Skip to content

Commit

Permalink
feat: aggressively pull blobs as soon as we see the block (ChainSafe#…
Browse files Browse the repository at this point in the history
…6499)

* feat: aggressively pull blobs as soon as we see the block

* fixes

* some logging improvement

* improve log

* fix tsc

* improve comments

* apply feedback

* refactor to a separate event and handler for unknownblock input

* fix e2e

* reduce diff

* add metric to track availability resolution

* fix circular include issue
  • Loading branch information
g11tech authored Mar 5, 2024
1 parent 10c1b11 commit d66f607
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 73 deletions.
27 changes: 25 additions & 2 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Arra
export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise<BlockInputBlobs>}
| {
type: BlockInputType.blobsPromise;
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
}
);

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
Expand Down Expand Up @@ -85,7 +90,8 @@ export const getBlockInput = {
source: BlockSource,
blobsCache: BlobsCache,
blockBytes: Uint8Array | null,
availabilityPromise: Promise<BlockInputBlobs>
availabilityPromise: Promise<BlockInputBlobs>,
resolveAvailability: (blobs: BlockInputBlobs) => void
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
Expand All @@ -97,10 +103,27 @@ export const getBlockInput = {
blobsCache,
blockBytes,
availabilityPromise,
resolveAvailability,
};
},
};

export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
const blobs = [];
const blobsBytes = [];

for (let index = 0; index < blobsCache.size; index++) {
const blobCache = blobsCache.get(index);
if (blobCache === undefined) {
throw Error(`Missing blobSidecar at index=${index}`);
}
const {blobSidecar, blobBytes} = blobCache;
blobs.push(blobSidecar);
blobsBytes.push(blobBytes);
}
return {blobs, blobsBytes};
}

export enum AttestationImportOpt {
Skip,
Force,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation} from "./types.js";

// proposer boost is not available post 3 sec so try pulling using unknown block hash
// post 3 sec after throwing the availability error
const BLOB_AVAILABILITY_TIMEOUT = 3_000;
// we can now wait for full 12 seconds because unavailable block sync will try pulling
// the blobs from the network anyway after 500ms of seeing the block
const BLOB_AVAILABILITY_TIMEOUT = 12_000;

/**
* Verifies some early cheap sanity checks on the block before running the full state transition.
Expand Down Expand Up @@ -59,7 +59,7 @@ export async function verifyBlocksDataAvailability(
}

async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64},
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
opts: ImportBlockOpts
): Promise<DataAvailableStatus> {
Expand Down Expand Up @@ -102,7 +102,7 @@ async function maybeValidateBlobs(
* which may try unknownblock/blobs fill (by root).
*/
async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64},
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
availabilityPromise: Promise<T>
): Promise<T> {
Expand All @@ -114,6 +114,7 @@ async function raceWithCutoff<T>(
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime));
chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime});

try {
await Promise.race([availabilityPromise, cutoffTimeout]);
Expand Down
30 changes: 12 additions & 18 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ import {
BlockInputBlobs,
BlobsCache,
GossipedInputType,
getBlockInputBlobs,
} from "../blocks/types.js";
import {Metrics} from "../../metrics/index.js";

export enum BlockInputAvailabilitySource {
GOSSIP = "gossip",
UNKNOWN_SYNC = "unknown_sync",
}

type GossipedBlockInput =
| {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null}
Expand Down Expand Up @@ -52,7 +59,8 @@ export class SeenGossipBlockInput {

getGossipBlockInput(
config: ChainForkConfig,
gossipedInput: GossipedBlockInput
gossipedInput: GossipedBlockInput,
metrics: Metrics | null
):
| {
blockInput: BlockInput;
Expand Down Expand Up @@ -113,6 +121,7 @@ export class SeenGossipBlockInput {
if (blobKzgCommitments.length === blobsCache.size) {
const allBlobs = getBlockInputBlobs(blobsCache);
resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
return {
blockInput: getBlockInput.postDeneb(
Expand All @@ -133,7 +142,8 @@ export class SeenGossipBlockInput {
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise
availabilityPromise,
resolveAvailability
),
blockInputMeta: {
pending: GossipedInputType.blob,
Expand Down Expand Up @@ -165,19 +175,3 @@ function getEmptyBlockInputCacheEntry(): BlockInputCacheType {
const blobsCache = new Map();
return {availabilityPromise, resolveAvailability, blobsCache};
}

function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
const blobs = [];
const blobsBytes = [];

for (let index = 0; index < blobsCache.size; index++) {
const blobCache = blobsCache.get(index);
if (blobCache === undefined) {
throw Error(`Missing blobSidecar at index=${index}`);
}
const {blobSidecar, blobBytes} = blobCache;
blobs.push(blobSidecar);
blobsBytes.push(blobBytes);
}
return {blobs, blobsBytes};
}
8 changes: 7 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import {InsertOutcome} from "../../chain/opPools/types.js";
import {RegenCaller, RegenFnName} from "../../chain/regen/interface.js";
import {ReprocessStatus} from "../../chain/reprocess.js";
import {RejectReason} from "../../chain/seenCache/seenAttestationData.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {ExecutionPayloadStatus} from "../../execution/index.js";
import {GossipType} from "../../network/index.js";
import {CannotAcceptWorkReason, ReprocessRejectReason} from "../../network/processor/index.js";
import {BackfillSyncMethod} from "../../sync/backfill/backfill.js";
import {PendingBlockType} from "../../sync/interface.js";
import {PendingBlockType} from "../../sync/index.js";
import {PeerSyncType, RangeSyncType} from "../../sync/utils/remoteSyncType.js";
import {LodestarMetadata} from "../options.js";
import {RegistryMetricCreator} from "../utils/registryMetricCreator.js";
Expand Down Expand Up @@ -592,6 +593,11 @@ export function createLodestarMetrics(
help: "Time elapsed between block slot time and the time block received via unknown block sync",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
resolveAvailabilitySource: register.gauge<{source: BlockInputAvailabilitySource}>({
name: "lodestar_sync_blockinput_availability_source",
help: "Total number of blocks whose data availability was resolved",
labelNames: ["source"],
}),
},

// Gossip sync committee
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export enum NetworkEvent {
// TODO remove this event, this is not a network-level concern, rather a chain / sync concern
unknownBlockParent = "unknownBlockParent",
unknownBlock = "unknownBlock",
unknownBlockInput = "unknownBlockInput",

// Network processor events
/** (Network -> App) A gossip message is ready for validation */
Expand All @@ -31,6 +32,7 @@ export type NetworkEventData = {
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
[NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr};
[NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage;
[NetworkEvent.gossipMessageValidationResult]: {
msgId: string;
Expand All @@ -45,6 +47,7 @@ export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
[NetworkEvent.reqRespRequest]: EventDirection.none, // Only used internally in NetworkCore
[NetworkEvent.unknownBlockParent]: EventDirection.workerToMain,
[NetworkEvent.unknownBlock]: EventDirection.workerToMain,
[NetworkEvent.unknownBlockInput]: EventDirection.workerToMain,
[NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain,
[NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker,
};
Expand Down
35 changes: 23 additions & 12 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import {PeerAction} from "../peers/index.js";
import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js";
import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js";
import {validateGossipBlobSidecar} from "../../chain/validation/blobSidecar.js";
import {BlockInput, GossipedInputType, BlobSidecarValidation} from "../../chain/blocks/types.js";
import {BlockInput, GossipedInputType, BlobSidecarValidation, BlockInputType} from "../../chain/blocks/types.js";
import {sszDeserialize} from "../gossip/topic.js";
import {INetworkCore} from "../core/index.js";
import {INetwork} from "../interface.js";
Expand Down Expand Up @@ -118,11 +118,15 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
const recvToValLatency = Date.now() / 1000 - seenTimestampSec;

// always set block to seen cache for all forks so that we don't need to download it
const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(config, {
type: GossipedInputType.block,
signedBlock,
blockBytes,
});
const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(
config,
{
type: GossipedInputType.block,
signedBlock,
blockBytes,
},
metrics
);
const blockInput = blockInputRes.blockInput;
// blockInput can't be returned null, improve by enforcing via return types
if (blockInput === null) {
Expand Down Expand Up @@ -187,11 +191,15 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec);
const recvToValLatency = Date.now() / 1000 - seenTimestampSec;

const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(config, {
type: GossipedInputType.blob,
blobSidecar,
blobBytes,
});
const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(
config,
{
type: GossipedInputType.blob,
blobSidecar,
blobBytes,
},
metrics
);

try {
await validateGossipBlobSidecar(chain, blobSidecar, gossipIndex);
Expand Down Expand Up @@ -242,6 +250,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
// Handler - MUST NOT `await`, to allow validation result to be propagated

metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message);
// if blobs are not yet fully available start an aggressive blob pull
if (blockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr});
}

chain
.processBlock(blockInput, {
Expand Down Expand Up @@ -276,7 +288,6 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
if (e instanceof BlockError) {
switch (e.type.code) {
case BlockErrorCode.DATA_UNAVAILABLE: {
// TODO: create a newevent unknownBlobs and only pull blobs
const slot = signedBlock.message.slot;
const forkTypes = config.getForkTypes(slot);
const rootHex = toHexString(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {BlockInput, BlockSource} from "../../chain/blocks/types.js";
import {BlockInput, BlockInputType, BlockSource, getBlockInputBlobs, getBlockInput} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {Metrics} from "../../metrics/index.js";
import {matchBlockWithBlobs} from "./beaconBlocksMaybeBlobsByRange.js";

export async function beaconBlocksMaybeBlobsByRoot(
Expand Down Expand Up @@ -39,3 +41,52 @@ export async function beaconBlocksMaybeBlobsByRoot(
// and here it should be infinity since all bobs should match
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot);
}

export async function unavailableBeaconBlobsByRoot(
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
unavailableBlockInput: BlockInput,
metrics: Metrics | null
): Promise<BlockInput> {
if (unavailableBlockInput.type !== BlockInputType.blobsPromise) {
return unavailableBlockInput;
}

const blobIdentifiers: deneb.BlobIdentifier[] = [];
const {block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput;

const slot = block.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);

const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
if (blobsCache.has(index) === false) blobIdentifiers.push({blockRoot, index});
}

let allBlobSidecars: deneb.BlobSidecar[];
if (blobIdentifiers.length > 0) {
allBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, blobIdentifiers);
} else {
allBlobSidecars = [];
}

// add them in cache so that its reflected in all the blockInputs that carry this
// for e.g. a blockInput that might be awaiting blobs promise fullfillment in
// verifyBlocksDataAvailability
for (const blobSidecar of allBlobSidecars) {
blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null});
}

// check and see if all blobs are now available and in that case resolve availability
// if not this will error and the leftover blobs will be tried from another peer
const allBlobs = getBlockInputBlobs(blobsCache);
const {blobs, blobsBytes} = allBlobs;
if (blobs.length !== blobKzgCommitmentsLen) {
throw Error(`Not all blobs fetched missingBlobs=${blobKzgCommitmentsLen - blobs.length}`);
}

resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, blockBytes, blobsBytes);
}
17 changes: 14 additions & 3 deletions packages/beacon-node/src/sync/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Logger} from "@lodestar/utils";
import {RootHex, Slot, phase0} from "@lodestar/types";
import {BeaconConfig} from "@lodestar/config";
import {routes} from "@lodestar/api";
import {BlockInput} from "../chain/blocks/types.js";
import {BlockInput, BlockInputType} from "../chain/blocks/types.js";
import {INetwork} from "../network/index.js";
import {IBeaconChain} from "../chain/index.js";
import {Metrics} from "../metrics/index.js";
Expand Down Expand Up @@ -56,7 +56,7 @@ export interface SyncModules {
}

export type UnknownAndAncestorBlocks = {
unknowns: UnknownBlock[];
unknowns: (UnknownBlock | UnknownBlockInput)[];
ancestors: DownloadedBlock[];
};

Expand All @@ -66,7 +66,7 @@ export type UnknownAndAncestorBlocks = {
* - store 1 record with known parentBlockRootHex & blockInput, blockRootHex as key, status downloaded
* - store 1 record with undefined parentBlockRootHex & blockInput, parentBlockRootHex as key, status pending
*/
export type PendingBlock = UnknownBlock | DownloadedBlock;
export type PendingBlock = UnknownBlock | UnknownBlockInput | DownloadedBlock;

type PendingBlockCommon = {
blockRootHex: RootHex;
Expand All @@ -80,6 +80,15 @@ export type UnknownBlock = PendingBlockCommon & {
blockInput: null;
};

/**
* either the blobs are unknown or in future some blobs and even the block is unknown
*/
export type UnknownBlockInput = PendingBlockCommon & {
status: PendingBlockStatus.pending | PendingBlockStatus.fetching;
parentBlockRootHex: null;
blockInput: BlockInput & {type: BlockInputType.blobsPromise};
};

export type DownloadedBlock = PendingBlockCommon & {
status: PendingBlockStatus.downloaded | PendingBlockStatus.processing;
parentBlockRootHex: RootHex;
Expand All @@ -102,4 +111,6 @@ export enum PendingBlockType {
* During gossip time, we may get a block but the parent root is unknown (not in forkchoice).
*/
UNKNOWN_PARENT = "unknown_parent",

UNKNOWN_BLOCKINPUT = "unknown_blockinput",
}
Loading

0 comments on commit d66f607

Please sign in to comment.