diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index 2fd16fa64705..63fe21df105a 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -29,7 +29,12 @@ export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Arra export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & ( | {type: BlockInputType.preDeneb} | ({type: BlockInputType.postDeneb} & BlockInputBlobs) - | {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise} + | { + type: BlockInputType.blobsPromise; + blobsCache: BlobsCache; + availabilityPromise: Promise; + resolveAvailability: (blobs: BlockInputBlobs) => void; + } ); export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean { @@ -85,7 +90,8 @@ export const getBlockInput = { source: BlockSource, blobsCache: BlobsCache, blockBytes: Uint8Array | null, - availabilityPromise: Promise + availabilityPromise: Promise, + resolveAvailability: (blobs: BlockInputBlobs) => void ): BlockInput { if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { throw Error(`Pre Deneb block slot ${block.message.slot}`); @@ -97,10 +103,27 @@ export const getBlockInput = { blobsCache, blockBytes, availabilityPromise, + resolveAvailability, }; }, }; +export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs { + const blobs = []; + const blobsBytes = []; + + for (let index = 0; index < blobsCache.size; index++) { + const blobCache = blobsCache.get(index); + if (blobCache === undefined) { + throw Error(`Missing blobSidecar at index=${index}`); + } + const {blobSidecar, blobBytes} = blobCache; + blobs.push(blobSidecar); + blobsBytes.push(blobBytes); + } + return {blobs, blobsBytes}; +} + export enum AttestationImportOpt { Skip, Force, diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts index 9c45469d56dd..de7a9575ce06 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts @@ -7,9 +7,9 @@ import {validateBlobSidecars} from "../validation/blobSidecar.js"; import {Metrics} from "../../metrics/metrics.js"; import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation} from "./types.js"; -// proposer boost is not available post 3 sec so try pulling using unknown block hash -// post 3 sec after throwing the availability error -const BLOB_AVAILABILITY_TIMEOUT = 3_000; +// we can now wait for full 12 seconds because unavailable block sync will try pulling +// the blobs from the network anyway after 500ms of seeing the block +const BLOB_AVAILABILITY_TIMEOUT = 12_000; /** * Verifies some early cheap sanity checks on the block before running the full state transition. @@ -59,7 +59,7 @@ export async function verifyBlocksDataAvailability( } async function maybeValidateBlobs( - chain: {config: ChainForkConfig; genesisTime: UintNum64}, + chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger}, blockInput: BlockInput, opts: ImportBlockOpts ): Promise { @@ -102,7 +102,7 @@ async function maybeValidateBlobs( * which may try unknownblock/blobs fill (by root). */ async function raceWithCutoff( - chain: {config: ChainForkConfig; genesisTime: UintNum64}, + chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger}, blockInput: BlockInput, availabilityPromise: Promise ): Promise { @@ -114,6 +114,7 @@ async function raceWithCutoff( 0 ); const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime)); + chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime}); try { await Promise.race([availabilityPromise, cutoffTimeout]); diff --git a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts index 1f23503d3957..1f7e992ebada 100644 --- a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts +++ b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts @@ -11,7 +11,14 @@ import { BlockInputBlobs, BlobsCache, GossipedInputType, + getBlockInputBlobs, } from "../blocks/types.js"; +import {Metrics} from "../../metrics/index.js"; + +export enum BlockInputAvailabilitySource { + GOSSIP = "gossip", + UNKNOWN_SYNC = "unknown_sync", +} type GossipedBlockInput = | {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null} @@ -52,7 +59,8 @@ export class SeenGossipBlockInput { getGossipBlockInput( config: ChainForkConfig, - gossipedInput: GossipedBlockInput + gossipedInput: GossipedBlockInput, + metrics: Metrics | null ): | { blockInput: BlockInput; @@ -113,6 +121,7 @@ export class SeenGossipBlockInput { if (blobKzgCommitments.length === blobsCache.size) { const allBlobs = getBlockInputBlobs(blobsCache); resolveAvailability(allBlobs); + metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP}); const {blobs, blobsBytes} = allBlobs; return { blockInput: getBlockInput.postDeneb( @@ -133,7 +142,8 @@ export class SeenGossipBlockInput { BlockSource.gossip, blobsCache, blockBytes ?? null, - availabilityPromise + availabilityPromise, + resolveAvailability ), blockInputMeta: { pending: GossipedInputType.blob, @@ -165,19 +175,3 @@ function getEmptyBlockInputCacheEntry(): BlockInputCacheType { const blobsCache = new Map(); return {availabilityPromise, resolveAvailability, blobsCache}; } - -function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs { - const blobs = []; - const blobsBytes = []; - - for (let index = 0; index < blobsCache.size; index++) { - const blobCache = blobsCache.get(index); - if (blobCache === undefined) { - throw Error(`Missing blobSidecar at index=${index}`); - } - const {blobSidecar, blobBytes} = blobCache; - blobs.push(blobSidecar); - blobsBytes.push(blobBytes); - } - return {blobs, blobsBytes}; -} diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 957a962de103..4f82969cff81 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -7,11 +7,12 @@ import {InsertOutcome} from "../../chain/opPools/types.js"; import {RegenCaller, RegenFnName} from "../../chain/regen/interface.js"; import {ReprocessStatus} from "../../chain/reprocess.js"; import {RejectReason} from "../../chain/seenCache/seenAttestationData.js"; +import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js"; import {ExecutionPayloadStatus} from "../../execution/index.js"; import {GossipType} from "../../network/index.js"; import {CannotAcceptWorkReason, ReprocessRejectReason} from "../../network/processor/index.js"; import {BackfillSyncMethod} from "../../sync/backfill/backfill.js"; -import {PendingBlockType} from "../../sync/interface.js"; +import {PendingBlockType} from "../../sync/index.js"; import {PeerSyncType, RangeSyncType} from "../../sync/utils/remoteSyncType.js"; import {LodestarMetadata} from "../options.js"; import {RegistryMetricCreator} from "../utils/registryMetricCreator.js"; @@ -592,6 +593,11 @@ export function createLodestarMetrics( help: "Time elapsed between block slot time and the time block received via unknown block sync", buckets: [0.5, 1, 2, 4, 6, 12], }), + resolveAvailabilitySource: register.gauge<{source: BlockInputAvailabilitySource}>({ + name: "lodestar_sync_blockinput_availability_source", + help: "Total number of blocks whose data availability was resolved", + labelNames: ["source"], + }), }, // Gossip sync committee diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index 67ea0a1dd0e0..65c5d56fb808 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -17,6 +17,7 @@ export enum NetworkEvent { // TODO remove this event, this is not a network-level concern, rather a chain / sync concern unknownBlockParent = "unknownBlockParent", unknownBlock = "unknownBlock", + unknownBlockInput = "unknownBlockInput", // Network processor events /** (Network -> App) A gossip message is ready for validation */ @@ -31,6 +32,7 @@ export type NetworkEventData = { [NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId}; [NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr}; [NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr}; + [NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr}; [NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage; [NetworkEvent.gossipMessageValidationResult]: { msgId: string; @@ -45,6 +47,7 @@ export const networkEventDirection: Record = { [NetworkEvent.reqRespRequest]: EventDirection.none, // Only used internally in NetworkCore [NetworkEvent.unknownBlockParent]: EventDirection.workerToMain, [NetworkEvent.unknownBlock]: EventDirection.workerToMain, + [NetworkEvent.unknownBlockInput]: EventDirection.workerToMain, [NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain, [NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker, }; diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 60186f8fb79f..d9efdd2b09a6 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -45,7 +45,7 @@ import {PeerAction} from "../peers/index.js"; import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js"; import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js"; import {validateGossipBlobSidecar} from "../../chain/validation/blobSidecar.js"; -import {BlockInput, GossipedInputType, BlobSidecarValidation} from "../../chain/blocks/types.js"; +import {BlockInput, GossipedInputType, BlobSidecarValidation, BlockInputType} from "../../chain/blocks/types.js"; import {sszDeserialize} from "../gossip/topic.js"; import {INetworkCore} from "../core/index.js"; import {INetwork} from "../interface.js"; @@ -118,11 +118,15 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler const recvToValLatency = Date.now() / 1000 - seenTimestampSec; // always set block to seen cache for all forks so that we don't need to download it - const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(config, { - type: GossipedInputType.block, - signedBlock, - blockBytes, - }); + const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput( + config, + { + type: GossipedInputType.block, + signedBlock, + blockBytes, + }, + metrics + ); const blockInput = blockInputRes.blockInput; // blockInput can't be returned null, improve by enforcing via return types if (blockInput === null) { @@ -187,11 +191,15 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec); const recvToValLatency = Date.now() / 1000 - seenTimestampSec; - const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(config, { - type: GossipedInputType.blob, - blobSidecar, - blobBytes, - }); + const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput( + config, + { + type: GossipedInputType.blob, + blobSidecar, + blobBytes, + }, + metrics + ); try { await validateGossipBlobSidecar(chain, blobSidecar, gossipIndex); @@ -242,6 +250,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler // Handler - MUST NOT `await`, to allow validation result to be propagated metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message); + // if blobs are not yet fully available start an aggressive blob pull + if (blockInput.type === BlockInputType.blobsPromise) { + events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr}); + } chain .processBlock(blockInput, { @@ -276,7 +288,6 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler if (e instanceof BlockError) { switch (e.type.code) { case BlockErrorCode.DATA_UNAVAILABLE: { - // TODO: create a newevent unknownBlobs and only pull blobs const slot = signedBlock.message.slot; const forkTypes = config.getForkTypes(slot); const rootHex = toHexString(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message)); diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts index c85464a05b61..9562588d56db 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts @@ -1,9 +1,11 @@ import {ChainForkConfig} from "@lodestar/config"; import {phase0, deneb} from "@lodestar/types"; import {ForkSeq} from "@lodestar/params"; -import {BlockInput, BlockSource} from "../../chain/blocks/types.js"; +import {BlockInput, BlockInputType, BlockSource, getBlockInputBlobs, getBlockInput} from "../../chain/blocks/types.js"; import {PeerIdStr} from "../../util/peerId.js"; import {INetwork} from "../interface.js"; +import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js"; +import {Metrics} from "../../metrics/index.js"; import {matchBlockWithBlobs} from "./beaconBlocksMaybeBlobsByRange.js"; export async function beaconBlocksMaybeBlobsByRoot( @@ -39,3 +41,52 @@ export async function beaconBlocksMaybeBlobsByRoot( // and here it should be infinity since all bobs should match return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot); } + +export async function unavailableBeaconBlobsByRoot( + config: ChainForkConfig, + network: INetwork, + peerId: PeerIdStr, + unavailableBlockInput: BlockInput, + metrics: Metrics | null +): Promise { + if (unavailableBlockInput.type !== BlockInputType.blobsPromise) { + return unavailableBlockInput; + } + + const blobIdentifiers: deneb.BlobIdentifier[] = []; + const {block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput; + + const slot = block.message.slot; + const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message); + + const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; + for (let index = 0; index < blobKzgCommitmentsLen; index++) { + if (blobsCache.has(index) === false) blobIdentifiers.push({blockRoot, index}); + } + + let allBlobSidecars: deneb.BlobSidecar[]; + if (blobIdentifiers.length > 0) { + allBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, blobIdentifiers); + } else { + allBlobSidecars = []; + } + + // add them in cache so that its reflected in all the blockInputs that carry this + // for e.g. a blockInput that might be awaiting blobs promise fullfillment in + // verifyBlocksDataAvailability + for (const blobSidecar of allBlobSidecars) { + blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null}); + } + + // check and see if all blobs are now available and in that case resolve availability + // if not this will error and the leftover blobs will be tried from another peer + const allBlobs = getBlockInputBlobs(blobsCache); + const {blobs, blobsBytes} = allBlobs; + if (blobs.length !== blobKzgCommitmentsLen) { + throw Error(`Not all blobs fetched missingBlobs=${blobKzgCommitmentsLen - blobs.length}`); + } + + resolveAvailability(allBlobs); + metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC}); + return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, blockBytes, blobsBytes); +} diff --git a/packages/beacon-node/src/sync/interface.ts b/packages/beacon-node/src/sync/interface.ts index 4dd2fd96e21a..1c1ae1ceedf7 100644 --- a/packages/beacon-node/src/sync/interface.ts +++ b/packages/beacon-node/src/sync/interface.ts @@ -2,7 +2,7 @@ import {Logger} from "@lodestar/utils"; import {RootHex, Slot, phase0} from "@lodestar/types"; import {BeaconConfig} from "@lodestar/config"; import {routes} from "@lodestar/api"; -import {BlockInput} from "../chain/blocks/types.js"; +import {BlockInput, BlockInputType} from "../chain/blocks/types.js"; import {INetwork} from "../network/index.js"; import {IBeaconChain} from "../chain/index.js"; import {Metrics} from "../metrics/index.js"; @@ -56,7 +56,7 @@ export interface SyncModules { } export type UnknownAndAncestorBlocks = { - unknowns: UnknownBlock[]; + unknowns: (UnknownBlock | UnknownBlockInput)[]; ancestors: DownloadedBlock[]; }; @@ -66,7 +66,7 @@ export type UnknownAndAncestorBlocks = { * - store 1 record with known parentBlockRootHex & blockInput, blockRootHex as key, status downloaded * - store 1 record with undefined parentBlockRootHex & blockInput, parentBlockRootHex as key, status pending */ -export type PendingBlock = UnknownBlock | DownloadedBlock; +export type PendingBlock = UnknownBlock | UnknownBlockInput | DownloadedBlock; type PendingBlockCommon = { blockRootHex: RootHex; @@ -80,6 +80,15 @@ export type UnknownBlock = PendingBlockCommon & { blockInput: null; }; +/** + * either the blobs are unknown or in future some blobs and even the block is unknown + */ +export type UnknownBlockInput = PendingBlockCommon & { + status: PendingBlockStatus.pending | PendingBlockStatus.fetching; + parentBlockRootHex: null; + blockInput: BlockInput & {type: BlockInputType.blobsPromise}; +}; + export type DownloadedBlock = PendingBlockCommon & { status: PendingBlockStatus.downloaded | PendingBlockStatus.processing; parentBlockRootHex: RootHex; @@ -102,4 +111,6 @@ export enum PendingBlockType { * During gossip time, we may get a block but the parent root is unknown (not in forkchoice). */ UNKNOWN_PARENT = "unknown_parent", + + UNKNOWN_BLOCKINPUT = "unknown_blockinput", } diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index cefe2617900a..15a145eb5f84 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -1,18 +1,21 @@ import {fromHexString, toHexString} from "@chainsafe/ssz"; import {ChainForkConfig} from "@lodestar/config"; import {Logger, pruneSetToMax} from "@lodestar/utils"; -import {Root, RootHex} from "@lodestar/types"; +import {Root, RootHex, deneb} from "@lodestar/types"; import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {sleep} from "@lodestar/utils"; import {INetwork, NetworkEvent, NetworkEventData, PeerAction} from "../network/index.js"; import {PeerIdStr} from "../util/peerId.js"; import {IBeaconChain} from "../chain/index.js"; -import {BlockInput} from "../chain/blocks/types.js"; +import {BlockInput, BlockInputType} from "../chain/blocks/types.js"; import {Metrics} from "../metrics/index.js"; import {shuffle} from "../util/shuffle.js"; import {byteArrayEquals} from "../util/bytes.js"; import {BlockError, BlockErrorCode} from "../chain/errors/index.js"; -import {beaconBlocksMaybeBlobsByRoot} from "../network/reqresp/beaconBlocksMaybeBlobsByRoot.js"; +import { + beaconBlocksMaybeBlobsByRoot, + unavailableBeaconBlobsByRoot, +} from "../network/reqresp/beaconBlocksMaybeBlobsByRoot.js"; import {wrapError} from "../util/wrapError.js"; import {PendingBlock, PendingBlockStatus, PendingBlockType} from "./interface.js"; import {getDescendantBlocks, getAllDescendantBlocks, getUnknownAndAncestorBlocks} from "./utils/pendingBlocksTree.js"; @@ -59,6 +62,7 @@ export class UnknownBlockSync { if (!this.subscribedToNetworkEvents) { this.logger.verbose("UnknownBlockSync enabled."); this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock); + this.network.events.on(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput); this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent); this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch); this.subscribedToNetworkEvents = true; @@ -71,6 +75,7 @@ export class UnknownBlockSync { unsubscribeFromNetwork(): void { this.logger.verbose("UnknownBlockSync disabled."); this.network.events.off(NetworkEvent.unknownBlock, this.onUnknownBlock); + this.network.events.off(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput); this.network.events.off(NetworkEvent.unknownBlockParent, this.onUnknownParent); this.network.events.off(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch); this.subscribedToNetworkEvents = false; @@ -98,6 +103,19 @@ export class UnknownBlockSync { } }; + /** + * Process an unknownBlockInput event and register the block in `pendingBlocks` Map. + */ + private onUnknownBlockInput = (data: NetworkEventData[NetworkEvent.unknownBlockInput]): void => { + try { + this.addUnknownBlock(data.blockInput, data.peer); + this.triggerUnknownBlockSearch(); + this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCKINPUT}); + } catch (e) { + this.logger.debug("Error handling unknownBlockInput event", {}, e as Error); + } + }; + /** * Process an unknownBlockParent event and register the block in `pendingBlocks` Map. */ @@ -147,19 +165,42 @@ export class UnknownBlockSync { this.addUnknownBlock(parentBlockRootHex, peerIdStr); } - private addUnknownBlock(blockRootHex: RootHex, peerIdStr?: string): void { + private addUnknownBlock(blockInputOrRootHex: RootHex | BlockInput, peerIdStr?: string): void { + let blockRootHex; + let blockInput: BlockInput | null; + + if (typeof blockInputOrRootHex === "string") { + blockRootHex = blockInputOrRootHex; + blockInput = null; + } else { + const {block} = blockInputOrRootHex; + blockRootHex = toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)); + blockInput = blockInputOrRootHex; + } + let pendingBlock = this.pendingBlocks.get(blockRootHex); if (!pendingBlock) { pendingBlock = { blockRootHex, parentBlockRootHex: null, - blockInput: null, + blockInput: blockInput?.type === BlockInputType.blobsPromise ? blockInput : null, peerIdStrs: new Set(), status: PendingBlockStatus.pending, downloadAttempts: 0, - }; + } as PendingBlock; this.pendingBlocks.set(blockRootHex, pendingBlock); - this.logger.verbose("Added unknown block to pendingBlocks", {root: blockRootHex}); + + if (pendingBlock.blockInput?.type === BlockInputType.blobsPromise) { + this.logger.verbose("Added blockInput with unknown blobs to pendingBlocks", { + root: blockRootHex, + slot: blockInput?.block.message.slot ?? "unknown", + }); + } else { + this.logger.verbose("Added unknown block to pendingBlocks", { + root: blockRootHex, + slot: blockInput?.block.message.slot ?? "unknown", + }); + } } if (peerIdStr) { @@ -226,13 +267,29 @@ export class UnknownBlockSync { return; } - this.logger.verbose("Downloading unknown block", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - }); + if (block.blockInput?.type === BlockInputType.blobsPromise) { + this.logger.verbose("Downloading unknown blockInput", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + blockInputType: block.blockInput.type, + slot: block.blockInput?.block.message.slot ?? "unknown", + }); + } else { + this.logger.verbose("Downloading unknown block", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + slot: block.blockInput?.block.message.slot ?? "unknown", + }); + } block.status = PendingBlockStatus.fetching; - const res = await wrapError(this.fetchUnknownBlockRoot(fromHexString(block.blockRootHex), connectedPeers)); + + let res; + if (block.blockInput === null) { + res = await wrapError(this.fetchUnknownBlockRoot(fromHexString(block.blockRootHex), connectedPeers)); + } else { + res = await wrapError(this.fetchUnavailableBlockInput(block.blockInput, connectedPeers)); + } if (res.err) this.metrics?.syncUnknownBlock.downloadedBlocksError.inc(); else this.metrics?.syncUnknownBlock.downloadedBlocksSuccess.inc(); @@ -252,11 +309,22 @@ export class UnknownBlockSync { this.metrics?.syncUnknownBlock.elapsedTimeTillReceived.observe(delaySec); const parentInForkchoice = this.chain.forkChoice.hasBlock(blockInput.block.message.parentRoot); - this.logger.verbose("Downloaded unknown block", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - parentInForkchoice, - }); + + if (block.blockInput.type === BlockInputType.blobsPromise) { + this.logger.verbose("Downloaded unknown blobs", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + parentInForkchoice, + blockInputType: blockInput.type, + }); + } else { + this.logger.verbose("Downloaded unknown block", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + parentInForkchoice, + blockInputType: blockInput.type, + }); + } if (parentInForkchoice) { // Bingo! Process block. Add to pending blocks anyway for recycle the cache that prevents duplicate processing @@ -437,6 +505,68 @@ export class UnknownBlockSync { } } + /** + * Fetches missing blobs for the blockinput, in future can also pull block is thats also missing + * along with the blobs (i.e. only some blobs are available) + */ + private async fetchUnavailableBlockInput( + unavailableBlockInput: BlockInput, + connectedPeers: PeerIdStr[] + ): Promise<{blockInput: BlockInput; peerIdStr: string}> { + if (unavailableBlockInput.type !== BlockInputType.blobsPromise) { + return {blockInput: unavailableBlockInput, peerIdStr: ""}; + } + + const shuffledPeers = shuffle(connectedPeers); + const unavailableBlock = unavailableBlockInput.block; + const blockRoot = this.config + .getForkTypes(unavailableBlock.message.slot) + .BeaconBlock.hashTreeRoot(unavailableBlock.message); + const blockRootHex = toHexString(blockRoot); + + const blobKzgCommitmentsLen = (unavailableBlock.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; + const pendingBlobs = blobKzgCommitmentsLen - unavailableBlockInput.blobsCache.size; + + let lastError: Error | null = null; + for (let i = 0; i < MAX_ATTEMPTS_PER_BLOCK; i++) { + const peer = shuffledPeers[i % shuffledPeers.length]; + try { + const blockInput = await unavailableBeaconBlobsByRoot( + this.config, + this.network, + peer, + unavailableBlockInput, + this.metrics + ); + + // Peer does not have the block, try with next peer + if (blockInput === undefined) { + continue; + } + + // Verify block root is correct + const block = blockInput.block.message; + const receivedBlockRoot = this.config.getForkTypes(block.slot).BeaconBlock.hashTreeRoot(block); + if (!byteArrayEquals(receivedBlockRoot, blockRoot)) { + throw Error(`Wrong block received by peer, got ${toHexString(receivedBlockRoot)} expected ${blockRootHex}`); + } + this.logger.debug("Fetched UnavailableBlockInput", {attempts: i, pendingBlobs, blobKzgCommitmentsLen}); + + return {blockInput, peerIdStr: peer}; + } catch (e) { + this.logger.debug("Error fetching UnavailableBlockInput", {attempt: i, blockRootHex, peer}, e as Error); + lastError = e as Error; + } + } + + if (lastError) { + lastError.message = `Error fetching UnavailableBlockInput after ${MAX_ATTEMPTS_PER_BLOCK} attempts: ${lastError.message}`; + throw lastError; + } else { + throw Error(`Error fetching UnavailableBlockInput after ${MAX_ATTEMPTS_PER_BLOCK}: unknown error`); + } + } + /** * Gets all descendant blocks of `block` recursively from `pendingBlocks`. * Assumes that if a parent block does not exist or is not processable, all descendant blocks are bad too. diff --git a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts index cd3f606dff53..ca93fd0181af 100644 --- a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts +++ b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts @@ -5,8 +5,10 @@ import { PendingBlock, PendingBlockStatus, UnknownAndAncestorBlocks, + UnknownBlockInput, UnknownBlock, } from "../interface.js"; +import {BlockInputType} from "../../chain/blocks/types.js"; export function getAllDescendantBlocks(blockRootHex: RootHex, blocks: Map): PendingBlock[] { // Do one pass over all blocks to index by parent @@ -57,16 +59,20 @@ export function getDescendantBlocks(blockRootHex: RootHex, blocks: Map): UnknownAndAncestorBlocks { - const unknowns: UnknownBlock[] = []; + const unknowns: (UnknownBlock | UnknownBlockInput)[] = []; const ancestors: DownloadedBlock[] = []; for (const block of blocks.values()) { const parentHex = block.parentBlockRootHex; - if (block.status === PendingBlockStatus.pending && block.blockInput == null && parentHex == null) { + if ( + block.status === PendingBlockStatus.pending && + (block.blockInput == null || block.blockInput.type === BlockInputType.blobsPromise) && + parentHex == null + ) { unknowns.push(block); } - if (parentHex && !blocks.has(parentHex)) { + if (block.status === PendingBlockStatus.downloaded && parentHex && !blocks.has(parentHex)) { ancestors.push(block); } } diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index a49c6923a9ea..976df5d2ae4d 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -15,7 +15,7 @@ import { ReqRespMethod, networkEventDirection, } from "../../../../src/network/index.js"; -import {BlockInputType, BlockSource} from "../../../../src/chain/blocks/types.js"; +import {BlockInputType, BlockSource, BlockInputBlobs, BlockInput} from "../../../../src/chain/blocks/types.js"; import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js"; import {NetworkWorkerApi} from "../../../../src/network/core/index.js"; @@ -92,6 +92,10 @@ describe.skip("data serialization through worker boundary", function () { rootHex: ZERO_HASH_HEX, peer, }, + [NetworkEvent.unknownBlockInput]: { + blockInput: getEmptyBlockInput(), + peer, + }, [NetworkEvent.pendingGossipsubMessage]: { topic: {type: GossipType.beacon_block, fork: ForkName.altair}, msg: { @@ -240,3 +244,23 @@ describe.skip("data serialization through worker boundary", function () { }); type Resolves> = T extends Promise ? (U extends void ? null : U) : never; + +function getEmptyBlockInput(): BlockInput { + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { + resolveAvailability = resolveCB; + }); + if (resolveAvailability === null) { + throw Error("Promise Constructor was not executed immediately"); + } + const blobsCache = new Map(); + return { + type: BlockInputType.blobsPromise, + block: ssz.deneb.SignedBeaconBlock.defaultValue(), + source: BlockSource.gossip, + blockBytes: ZERO_HASH, + blobsCache, + availabilityPromise, + resolveAvailability, + }; +} diff --git a/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts index 16af7b0df10e..ac7adb546663 100644 --- a/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts +++ b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts @@ -105,11 +105,15 @@ describe("SeenGossipBlockInput", () => { try { if (eventType === GossipedInputType.block) { - const blockInputRes = seenGossipBlockInput.getGossipBlockInput(config, { - type: GossipedInputType.block, - signedBlock, - blockBytes: null, - }); + const blockInputRes = seenGossipBlockInput.getGossipBlockInput( + config, + { + type: GossipedInputType.block, + signedBlock, + blockBytes: null, + }, + null + ); if (expectedResponseType instanceof Error) { expect.fail(`expected to fail with error: ${expectedResponseType.message}`); @@ -123,11 +127,15 @@ describe("SeenGossipBlockInput", () => { const blobSidecar = blobSidecars[index]; expect(blobSidecar).not.toBeUndefined(); - const blobInputRes = seenGossipBlockInput.getGossipBlockInput(config, { - type: GossipedInputType.blob, - blobSidecar, - blobBytes: null, - }); + const blobInputRes = seenGossipBlockInput.getGossipBlockInput( + config, + { + type: GossipedInputType.blob, + blobSidecar, + blobBytes: null, + }, + null + ); if (expectedResponseType instanceof Error) { expect.fail(`expected to fail with error: ${expectedResponseType.message}`);