Skip to content

Commit

Permalink
PrepareNextSlot scheduler (#4209)
Browse files Browse the repository at this point in the history
* Implement PrepareNextSlot scheduler

* Remove doing advanced state transition in importBlock function

* chore: fix test names

* Call updateHead() to reduce reorg possibility

* chore: fix lint
  • Loading branch information
twoeths authored Jul 6, 2022
1 parent 0b4c4bf commit 1b03e81
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 262 deletions.
92 changes: 27 additions & 65 deletions packages/lodestar/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ import {
computeStartSlotAtEpoch,
getEffectiveBalanceIncrementsZeroInactive,
computeEpochAtSlot,
isBellatrixStateType,
RootCache,
processSlots,
} from "@chainsafe/lodestar-beacon-state-transition";
import {IForkChoice, OnBlockPrecachedData, ForkChoiceError, ForkChoiceErrorCode} from "@chainsafe/lodestar-fork-choice";
import {ILogger} from "@chainsafe/lodestar-utils";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {IMetrics} from "../../metrics/index.js";
import {IExecutionEngine, PayloadId} from "../../execution/engine/interface.js";
import {IExecutionEngine} from "../../execution/engine/interface.js";
import {IBeaconDb} from "../../db/index.js";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js";
Expand All @@ -24,7 +22,6 @@ import {ChainEventEmitter} from "../emitter.js";
import {LightClientServer} from "../lightClient/index.js";
import {SeenAggregatedAttestations} from "../seenCache/seenAggregateAndProof.js";
import {SeenBlockAttesters} from "../seenCache/seenBlockAttesters.js";
import {prepareExecutionPayload} from "../factory/block/body.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
Expand Down Expand Up @@ -225,8 +222,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:

// Emit ChainEvent.forkChoiceHead event
const oldHead = chain.forkChoice.getHead();
chain.forkChoice.updateHead();
const newHead = chain.forkChoice.getHead();
const newHead = chain.forkChoice.updateHead();
const currFinalizedEpoch = chain.forkChoice.getFinalizedCheckpoint().epoch;

if (newHead.blockRoot !== oldHead.blockRoot) {
Expand Down Expand Up @@ -268,34 +264,32 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
}
}

void maybeIssueNextProposerEngineFcU(chain, postState).then((payloadId) => {
// NOTE: forkChoice.fsStore.finalizedCheckpoint MUST only change is response to an onBlock event
// Notify execution layer of head and finalized updates only if has already
// not been done via payloadId generation. But even if this fcU follows the
// payloadId one, there is no harm as the ELs will just ignore it.
if (payloadId === null && (newHead.blockRoot !== oldHead.blockRoot || currFinalizedEpoch !== prevFinalizedEpoch)) {
/**
* On post BELLATRIX_EPOCH but pre TTD, blocks include empty execution payload with a zero block hash.
* The consensus clients must not send notifyForkchoiceUpdate before TTD since the execution client will error.
* So we must check that:
* - `headBlockHash !== null` -> Pre BELLATRIX_EPOCH
* - `headBlockHash !== ZERO_HASH` -> Pre TTD
*/
const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
/**
* After BELLATRIX_EPOCH and TTD it's okay to send a zero hash block hash for the finalized block. This will happen if
* the current finalized block does not contain any execution payload at all (pre MERGE_EPOCH) or if it contains a
* zero block hash (pre TTD)
*/
const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
if (headBlockHash !== ZERO_HASH_HEX) {
chain.executionEngine.notifyForkchoiceUpdate(headBlockHash, safeBlockHash, finalizedBlockHash).catch((e) => {
chain.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
});
}
// NOTE: forkChoice.fsStore.finalizedCheckpoint MUST only change in response to an onBlock event
// Notifying EL of head and finalized updates as below is usually done within the 1st 4s of the slot.
// If there is an advanced payload generation in the next slot, we'll notify EL again 4s before next
// slot via PrepareNextSlotScheduler. There is no harm updating the ELs with same data, it will just ignore it.
if (newHead.blockRoot !== oldHead.blockRoot || currFinalizedEpoch !== prevFinalizedEpoch) {
/**
* On post BELLATRIX_EPOCH but pre TTD, blocks include empty execution payload with a zero block hash.
* The consensus clients must not send notifyForkchoiceUpdate before TTD since the execution client will error.
* So we must check that:
* - `headBlockHash !== null` -> Pre BELLATRIX_EPOCH
* - `headBlockHash !== ZERO_HASH` -> Pre TTD
*/
const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
/**
* After BELLATRIX_EPOCH and TTD it's okay to send a zero hash block hash for the finalized block. This will happen if
* the current finalized block does not contain any execution payload at all (pre MERGE_EPOCH) or if it contains a
* zero block hash (pre TTD)
*/
const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
if (headBlockHash !== ZERO_HASH_HEX) {
chain.executionEngine.notifyForkchoiceUpdate(headBlockHash, safeBlockHash, finalizedBlockHash).catch((e) => {
chain.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
});
}
});
}

// Emit ChainEvent.block event
//
Expand Down Expand Up @@ -329,38 +323,6 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
});
}

async function maybeIssueNextProposerEngineFcU(
chain: ImportBlockModules,
state: CachedBeaconStateAllForks
): Promise<PayloadId | null> {
const prepareSlot = state.slot + 1;
const prepareEpoch = computeEpochAtSlot(prepareSlot);
// No need to try building block if we are not synced
if (prepareSlot !== chain.clock.currentSlot + 1 || prepareEpoch < chain.config.BELLATRIX_FORK_EPOCH) {
return null;
}
const prepareState = processSlots(state, prepareSlot);
// TODO wait till third/last interval of the slot to actual send an fcU
// so that any head change is accomodated before that. However this could
// be optimized if the last block receieved is already head. This will be
// especially meaningful for mev boost which might have more delays
// because of how protocol is designed
if (isBellatrixStateType(prepareState)) {
try {
const proposerIndex = prepareState.epochCtx.getBeaconProposer(prepareSlot);
const feeRecipient = chain.beaconProposerCache.get(proposerIndex);
if (feeRecipient) {
const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
return prepareExecutionPayload(chain, safeBlockHash, finalizedBlockHash, prepareState, feeRecipient);
}
} catch (e) {
chain.logger.error("Error on issuing next proposer engine fcU", {}, e as Error);
}
}
return null;
}

/**
* Returns the closest state to postState.currentJustifiedCheckpoint in the same fork as postState
*
Expand Down
4 changes: 2 additions & 2 deletions packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import {
} from "./opPools/index.js";
import {LightClientServer} from "./lightClient/index.js";
import {Archiver} from "./archiver/index.js";
import {PrecomputeNextEpochTransitionScheduler} from "./precomputeNextEpochTransition.js";
import {PrepareNextSlotScheduler} from "./prepareNextSlot.js";
import {ReprocessController} from "./reprocess.js";
import {SeenAggregatedAttestations} from "./seenCache/seenAggregateAndProof.js";
import {SeenBlockAttesters} from "./seenCache/seenBlockAttesters.js";
Expand Down Expand Up @@ -230,7 +230,7 @@ export class BeaconChain implements IBeaconChain {
this.lightClientServer = lightClientServer;

this.archiver = new Archiver(db, this, logger, signal, opts);
new PrecomputeNextEpochTransitionScheduler(this, this.config, metrics, this.logger, signal);
new PrepareNextSlotScheduler(this, this.config, metrics, this.logger, signal);

metrics?.opPool.aggregatedAttestationPoolSize.addCollect(() => this.onScrapeMetrics());

Expand Down
88 changes: 0 additions & 88 deletions packages/lodestar/src/chain/precomputeNextEpochTransition.ts

This file was deleted.

130 changes: 130 additions & 0 deletions packages/lodestar/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import {computeEpochAtSlot, isBellatrixStateType} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {ForkSeq, SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params";
import {Slot} from "@chainsafe/lodestar-types";
import {ILogger, sleep} from "@chainsafe/lodestar-utils";
import {GENESIS_EPOCH, ZERO_HASH_HEX} from "../constants/constants.js";
import {IMetrics} from "../metrics/index.js";
import {ChainEvent} from "./emitter.js";
import {prepareExecutionPayload} from "./factory/block/body.js";
import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";

/* With 12s slot times, this scheduler will run 4s before the start of each slot (`12 / 3 = 4`). */
const SCHEDULER_LOOKAHEAD_FACTOR = 3;

/* We don't want to do more epoch transition than this */
const PREPARE_EPOCH_LIMIT = 1;

/**
* At Bellatrix, if we are responsible for proposing in next slot, we want to prepare payload
* 4s (1/3 slot) before the start of next slot
*
* For all forks, when clock is 1/3 slot before an epoch, we want to prepare for the next epoch
* transition from our head so that:
* + validators vote for block head on time through attestation
* + validators propose blocks on time
* + For Bellatrix, to compute proposers of next epoch so that we can prepare new payloads
*
*/
export class PrepareNextSlotScheduler {
constructor(
private readonly chain: IBeaconChain,
private readonly config: IChainForkConfig,
private readonly metrics: IMetrics | null,
private readonly logger: ILogger,
private readonly signal: AbortSignal
) {
this.chain.emitter.on(ChainEvent.clockSlot, this.prepareForNextSlot);

this.signal.addEventListener(
"abort",
() => {
this.chain.emitter.off(ChainEvent.clockSlot, this.prepareForNextSlot);
},
{once: true}
);
}

/**
* Use clockSlot instead of clockEpoch to schedule the task at more exact time.
*/
prepareForNextSlot = async (clockSlot: Slot): Promise<void> => {
const prepareSlot = clockSlot + 1;
const prepareEpoch = computeEpochAtSlot(prepareSlot);
const isLastEpochSlot = (clockSlot + 1) % SLOTS_PER_EPOCH === 0;
if (this.config.getForkSeq(prepareEpoch) < ForkSeq.bellatrix && !isLastEpochSlot) {
return;
}

const slotMs = this.config.SECONDS_PER_SLOT * 1000;
// At 1/3 slot time before the next slot, we either prepare payload or precompute epoch transition
await sleep(slotMs - slotMs / SCHEDULER_LOOKAHEAD_FACTOR, this.signal);

// calling updateHead() here before we produce a block to reduce reorg possibility
const {slot: headSlot, blockRoot: headRoot} = this.chain.forkChoice.updateHead();
const nextEpoch = computeEpochAtSlot(clockSlot) + 1;
// Do nothing at pre genesis
if (nextEpoch <= GENESIS_EPOCH) return;

const headEpoch = computeEpochAtSlot(headSlot);
if (prepareEpoch - headEpoch > PREPARE_EPOCH_LIMIT) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "skip"}, 1);
this.logger.debug("Skipping PrepareNextSlotScheduler - head slot is too behind current slot", {
nextEpoch,
headSlot,
clockSlot,
});

return;
}

if (prepareEpoch > headEpoch) {
this.logger.verbose("Running PrepareNextSlotScheduler epoch transition", {nextEpoch, headSlot, prepareSlot});
}

// No need to wait for this or the clock drift
// Pre Bellatrix: we only do precompute state transition for the last slot of epoch
// For Bellatrix, we always do the `processSlots()` to prepare payload for the next slot
this.chain.regen
.getBlockSlotState(headRoot, prepareSlot, RegenCaller.precomputeEpoch)
.then((prepareState) => {
// assuming there is no reorg, it caches the checkpoint state & helps avoid doing a full state transition in the next slot
// + when gossip block comes, we need to validate and run state transition
// + if next slot is a skipped slot, it'd help getting target checkpoint state faster to validate attestations
if (prepareEpoch > headEpoch) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "success"}, 1);
const previousHits = this.chain.checkpointStateCache.updatePreComputedCheckpoint(headRoot, nextEpoch);
if (previousHits === 0) {
this.metrics?.precomputeNextEpochTransition.waste.inc();
}
this.metrics?.precomputeNextEpochTransition.hits.set(previousHits ?? 0);
this.logger.verbose("Completed PrepareNextSlotScheduler epoch transition", {
nextEpoch,
headSlot,
prepareSlot,
});
}

if (isBellatrixStateType(prepareState)) {
const proposerIndex = prepareState.epochCtx.getBeaconProposer(prepareSlot);
const feeRecipient = this.chain.beaconProposerCache.get(proposerIndex);
if (feeRecipient) {
const safeBlockHash = this.chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash =
this.chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
void prepareExecutionPayload(this.chain, safeBlockHash, finalizedBlockHash, prepareState, feeRecipient);
this.logger.verbose("PrepareNextSlotScheduler prepared new payload", {
prepareSlot,
proposerIndex,
feeRecipient,
});
}
}
})
.catch((e) => {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1);
this.logger.error("Failed to precompute epoch transition", nextEpoch, e);
});
};
}
4 changes: 2 additions & 2 deletions packages/lodestar/test/unit/api/impl/debug/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {INetwork, Network} from "../../../../../src/network/index.js";
import {IBeaconChain} from "../../../../../src/chain/index.js";
import {generateProtoBlock} from "../../../../utils/block.js";
import {StubbedBeaconDb} from "../../../../utils/stub/index.js";
import {generateState} from "../../../../utils/state.js";
import {generateCachedAltairState, generateState} from "../../../../utils/state.js";
import {setupApiImplTestServer} from "../index.test.js";
import {SinonStubFn} from "../../../../utils/types.js";

Expand Down Expand Up @@ -60,7 +60,7 @@ describe.skip("api - debug - beacon", function () {
});

it("getStateV2 - should be able to convert to json", async function () {
resolveStateIdStub.resolves(generateState({}, minimalConfig, true));
resolveStateIdStub.resolves(generateCachedAltairState({}, minimalConfig));
const {data: state} = await debugApi.getStateV2("something");
expect(() => ssz.altair.BeaconState.toJson(state as altair.BeaconState)).to.not.throw();
});
Expand Down
Loading

0 comments on commit 1b03e81

Please sign in to comment.