Skip to content

Commit

Permalink
Merge 9911750 into 6d01593
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Aug 26, 2024
2 parents 6d01593 + 9911750 commit 6076e72
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 12 deletions.
70 changes: 58 additions & 12 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {fromHexString} from "@chainsafe/ssz";
import {phase0, Slot, RootHex, BeaconBlock} from "@lodestar/types";
import {phase0, Slot, RootHex, BeaconBlock, SignedBeaconBlock} from "@lodestar/types";
import {
CachedBeaconStateAllForks,
computeEpochAtSlot,
Expand All @@ -8,6 +8,7 @@ import {
DataAvailableStatus,
processSlots,
stateTransition,
StateHashTreeRootSource,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Logger, toRootHex} from "@lodestar/utils";
Expand Down Expand Up @@ -145,7 +146,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
caller: RegenCaller,
opts?: StateCloneOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
Expand All @@ -156,6 +157,13 @@ export class StateRegenerator implements IStateRegeneratorInternal {
return cachedStateCtx;
}

// in block gossip validation (getPreState() call), dontTransferCache is specified as true because we only want to transfer cache in verifyBlocksStateTransitionOnly()
// but here we want to process blocks as fast as possible so force to transfer cache in this case
if (opts && allowDiskReload) {
// if there is no `opts` specified, it already means "false"
opts.dontTransferCache = false;
}

// Otherwise we have to use the fork choice to traverse backwards, block by block,
// searching the state caches
// then replay blocks forward to the desired stateRoot
Expand All @@ -166,6 +174,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const blocksToReplay = [block];
let state: CachedBeaconStateAllForks | null = null;
const {checkpointStateCache} = this.modules;

const getSeedStateTimer = this.modules.metrics?.regenGetState.getSeedState.startTimer({caller});
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.blockStateCache.get(b.stateRoot, opts);
Expand All @@ -181,26 +191,58 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}
blocksToReplay.push(b);
}
getSeedStateTimer?.();

if (state === null) {
throw new RegenError({
code: RegenErrorCode.NO_SEED_STATE,
});
}

const blockCount = blocksToReplay.length;
const MAX_EPOCH_TO_PROCESS = 5;
if (blocksToReplay.length > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
if (blockCount > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
throw new RegenError({
code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED,
stateRoot,
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
this.modules.metrics?.regenGetState.blockCount.observe({caller}, blockCount);

const replaySlots = new Array<Slot>(blockCount);
const blockPromises = new Array<Promise<SignedBeaconBlock | null>>(blockCount);

const protoBlocksAsc = blocksToReplay.reverse();
for (const [i, protoBlock] of protoBlocksAsc.entries()) {
replaySlots[i] = protoBlock.slot;
blockPromises[i] = this.modules.db.block.get(fromHexString(protoBlock.blockRoot));
}

const logCtx = {stateRoot, replaySlots: replaySlots.join(",")};
this.modules.logger.debug("Replaying blocks to get state", logCtx);

const loadBlocksTimer = this.modules.metrics?.regenGetState.loadBlocks.startTimer({caller});
const blockOrNulls = await Promise.all(blockPromises);
loadBlocksTimer?.();

const blocksByRoot = new Map<RootHex, SignedBeaconBlock>();
for (const [i, blockOrNull] of blockOrNulls.entries()) {
// checking early here helps prevent unneccessary state transition below
if (blockOrNull === null) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: protoBlocksAsc[i].blockRoot,
});
}
blocksByRoot.set(protoBlocksAsc[i].blockRoot, blockOrNull);
}

const stateTransitionTimer = this.modules.metrics?.regenGetState.stateTransition.startTimer({caller});
for (const b of protoBlocksAsc) {
const block = blocksByRoot.get(b.blockRoot);
// just to make compiler happy, we checked in the above for loop already
if (block === undefined) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: b.blockRoot,
Expand All @@ -224,7 +266,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
this.modules.metrics
);

const hashTreeRootTimer = this.modules.metrics?.stateHashTreeRootTime.startTimer({
source: StateHashTreeRootSource.regenState,
});
const stateRoot = toRootHex(state.hashTreeRoot());
hashTreeRootTimer?.();

if (b.stateRoot !== stateRoot) {
throw new RegenError({
slot: b.slot,
Expand All @@ -238,17 +285,16 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// also with allowDiskReload flag, we "reload" it to the state cache too
this.modules.blockStateCache.add(state);
}

// this avoids keeping our node busy processing blocks
await nextEventLoop();
} catch (e) {
throw new RegenError({
code: RegenErrorCode.STATE_TRANSITION_ERROR,
error: e as Error,
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});
stateTransitionTimer?.();

this.modules.logger.debug("Replayed blocks to get state", {...logCtx, stateSlot: state.slot});

return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
newCachedState.commit();
const stateRoot = toRootHex(newCachedState.hashTreeRoot());
timer?.();

// load all cache in order for consumers (usually regen.getState()) to process blocks faster
newCachedState.validators.getAllReadonlyValues();
newCachedState.balances.getAll();
this.logger.debug("Reload: cached state load successful", {
...logMeta,
stateSlot: newCachedState.slot,
Expand Down
28 changes: 28 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,34 @@ export function createLodestarMetrics(
help: "UnhandledPromiseRejection total count",
}),

// regen.getState metrics
regenGetState: {
blockCount: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_block_count",
help: "Block count in regen.getState",
labelNames: ["caller"],
buckets: [4, 8, 16, 32, 64],
}),
getSeedState: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_get_seed_state_seconds",
help: "Duration of get seed state in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
loadBlocks: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_load_blocks_seconds",
help: "Duration of load blocks in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
stateTransition: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_state_transition_seconds",
help: "Duration of state transition in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
},

// Precompute next epoch transition
precomputeNextEpochTransition: {
count: register.counter<{result: string}>({
Expand Down
1 change: 1 addition & 0 deletions packages/state-transition/src/stateTransition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum StateHashTreeRootSource {
blockTransition = "block_transition",
prepareNextSlot = "prepare_next_slot",
prepareNextEpoch = "prepare_next_epoch",
regenState = "regen_state",
computeNewStateRoot = "compute_new_state_root",
}

Expand Down

0 comments on commit 6076e72

Please sign in to comment.