Skip to content

Commit

Permalink
fix: single state tree at start up (#7056)
Browse files Browse the repository at this point in the history
* feat: use db state to load ws state

* feat: log state size

* fix: rename initStateFromAnchorState to checkAndPersistAnchorState

* fix: only persist anchor state if it's cp state

* fix: avoid redundant anchor state serialization
  • Loading branch information
twoeths authored and philknows committed Sep 11, 2024
1 parent 94e3878 commit fa67e62
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 50 deletions.
18 changes: 10 additions & 8 deletions packages/beacon-node/src/chain/initState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ export async function persistGenesisResult(
export async function persistAnchorState(
config: ChainForkConfig,
db: IBeaconDb,
anchorState: BeaconStateAllForks
anchorState: BeaconStateAllForks,
anchorStateBytes: Uint8Array
): Promise<void> {
if (anchorState.slot === GENESIS_SLOT) {
const genesisBlock = createGenesisBlock(config, anchorState);
await Promise.all([
db.blockArchive.add(genesisBlock),
db.block.add(genesisBlock),
db.stateArchive.add(anchorState),
db.stateArchive.putBinary(anchorState.slot, anchorStateBytes),
]);
} else {
await db.stateArchive.add(anchorState);
await db.stateArchive.putBinary(anchorState.slot, anchorStateBytes);
}
}

Expand Down Expand Up @@ -154,16 +155,17 @@ export async function initStateFromDb(
/**
* Initialize and persist an anchor state (either weak subjectivity or genesis)
*/
export async function initStateFromAnchorState(
export async function checkAndPersistAnchorState(
config: ChainForkConfig,
db: IBeaconDb,
logger: Logger,
anchorState: BeaconStateAllForks,
anchorStateBytes: Uint8Array,
{
isWithinWeakSubjectivityPeriod,
isCheckpointState,
}: {isWithinWeakSubjectivityPeriod: boolean; isCheckpointState: boolean}
): Promise<BeaconStateAllForks> {
): Promise<void> {
const expectedFork = config.getForkInfo(computeStartSlotAtEpoch(anchorState.fork.epoch));
const expectedForkVersion = toHex(expectedFork.version);
const stateFork = toHex(anchorState.fork.currentVersion);
Expand Down Expand Up @@ -191,9 +193,9 @@ export async function initStateFromAnchorState(
logger.warn("Checkpoint sync recommended, please use --help to see checkpoint sync options");
}

await persistAnchorState(config, db, anchorState);

return anchorState;
if (isCheckpointState || anchorState.slot === GENESIS_SLOT) {
await persistAnchorState(config, db, anchorState, anchorStateBytes);
}
}

export function initBeaconMetrics(metrics: Metrics, state: BeaconStateAllForks): void {
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export {initStateFromAnchorState, initStateFromDb, initStateFromEth1} from "./chain/index.js";
export {checkAndPersistAnchorState, initStateFromDb, initStateFromEth1} from "./chain/index.js";
export {BeaconDb, type IBeaconDb} from "./db/index.js";
export {Eth1Provider, type IEth1Provider} from "./eth1/index.js";
export {createNodeJsLibp2p, type NodeJsLibp2pOpts} from "./network/index.js";
Expand All @@ -20,4 +20,4 @@ export {RestApiServer} from "./api/rest/base.js";
export type {RestApiServerOpts, RestApiServerModules, RestApiServerMetrics} from "./api/rest/base.js";

// Export type util for CLI - TEMP move to lodestar-types eventually
export {getStateTypeFromBytes} from "./util/multifork.js";
export {getStateTypeFromBytes, getStateSlotFromBytes} from "./util/multifork.js";
101 changes: 71 additions & 30 deletions packages/cli/src/cmds/beacon/initBeaconState.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import {ssz} from "@lodestar/types";
import {createBeaconConfig, BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {Logger} from "@lodestar/utils";
import {Logger, formatBytes} from "@lodestar/utils";
import {
isWithinWeakSubjectivityPeriod,
ensureWithinWeakSubjectivityPeriod,
BeaconStateAllForks,
loadState,
loadStateAndValidators,
} from "@lodestar/state-transition";
import {
IBeaconDb,
IBeaconNodeOptions,
initStateFromAnchorState,
checkAndPersistAnchorState,
initStateFromEth1,
getStateTypeFromBytes,
} from "@lodestar/beacon-node";
Expand All @@ -25,32 +27,36 @@ import {
} from "../../networks/index.js";
import {BeaconArgs} from "./options.js";

type StateWithBytes = {state: BeaconStateAllForks; stateBytes: Uint8Array};

async function initAndVerifyWeakSubjectivityState(
config: BeaconConfig,
db: IBeaconDb,
logger: Logger,
store: BeaconStateAllForks,
wsState: BeaconStateAllForks,
dbStateBytes: StateWithBytes,
wsStateBytes: StateWithBytes,
wsCheckpoint: Checkpoint,
opts: {ignoreWeakSubjectivityCheck?: boolean} = {}
): Promise<{anchorState: BeaconStateAllForks; wsCheckpoint: Checkpoint}> {
const dbState = dbStateBytes.state;
const wsState = wsStateBytes.state;
// Check if the store's state and wsState are compatible
if (
store.genesisTime !== wsState.genesisTime ||
!ssz.Root.equals(store.genesisValidatorsRoot, wsState.genesisValidatorsRoot)
dbState.genesisTime !== wsState.genesisTime ||
!ssz.Root.equals(dbState.genesisValidatorsRoot, wsState.genesisValidatorsRoot)
) {
throw new Error(
"Db state and checkpoint state are not compatible, either clear the db or verify your checkpoint source"
);
}

// Pick the state which is ahead as an anchor to initialize the beacon chain
let anchorState = wsState;
let anchorState = wsStateBytes;
let anchorCheckpoint = wsCheckpoint;
let isCheckpointState = true;
if (store.slot > wsState.slot) {
anchorState = store;
anchorCheckpoint = getCheckpointFromState(store);
if (dbState.slot > wsState.slot) {
anchorState = dbStateBytes;
anchorCheckpoint = getCheckpointFromState(dbState);
isCheckpointState = false;
logger.verbose(
"Db state is ahead of the provided checkpoint state, using the db state to initialize the beacon chain"
Expand All @@ -59,19 +65,19 @@ async function initAndVerifyWeakSubjectivityState(

// Throw error unless user explicitly asked not to, in testnets can happen that wss period is too small
// that even some epochs of non finalization can cause finalized checkpoint to be out of valid range
const wssCheck = wrapFnError(() => ensureWithinWeakSubjectivityPeriod(config, anchorState, anchorCheckpoint));
const wssCheck = wrapFnError(() => ensureWithinWeakSubjectivityPeriod(config, anchorState.state, anchorCheckpoint));
const isWithinWeakSubjectivityPeriod = wssCheck.err === null;
if (!isWithinWeakSubjectivityPeriod && !opts.ignoreWeakSubjectivityCheck) {
throw wssCheck.err;
}

anchorState = await initStateFromAnchorState(config, db, logger, anchorState, {
await checkAndPersistAnchorState(config, db, logger, anchorState.state, anchorState.stateBytes, {
isWithinWeakSubjectivityPeriod,
isCheckpointState,
});

// Return the latest anchorState but still return original wsCheckpoint to validate in backfill
return {anchorState, wsCheckpoint};
return {anchorState: anchorState.state, wsCheckpoint};
}

/**
Expand All @@ -96,8 +102,20 @@ export async function initBeaconState(
}
// fetch the latest state stored in the db which will be used in all cases, if it exists, either
// i) used directly as the anchor state
// ii) used during verification of a weak subjectivity state,
const lastDbState = await db.stateArchive.lastValue();
// ii) used to load and verify a weak subjectivity state,
const lastDbSlot = await db.stateArchive.lastKey();
const stateBytes = lastDbSlot !== null ? await db.stateArchive.getBinary(lastDbSlot) : null;
let lastDbState: BeaconStateAllForks | null = null;
let lastDbValidatorsBytes: Uint8Array | null = null;
let lastDbStateWithBytes: StateWithBytes | null = null;
if (stateBytes) {
logger.verbose("Found the last archived state", {slot: lastDbSlot, size: formatBytes(stateBytes.length)});
const {state, validatorsBytes} = loadStateAndValidators(chainForkConfig, stateBytes);
lastDbState = state;
lastDbValidatorsBytes = validatorsBytes;
lastDbStateWithBytes = {state, stateBytes: stateBytes};
}

if (lastDbState) {
const config = createBeaconConfig(chainForkConfig, lastDbState.genesisValidatorsRoot);
const wssCheck = isWithinWeakSubjectivityPeriod(config, lastDbState, getCheckpointFromState(lastDbState));
Expand All @@ -107,27 +125,34 @@ export async function initBeaconState(
// Forcing to sync from checkpoint is only recommended if node is taking too long to sync from last db state.
// It is important to remind the user to remove this flag again unless it is absolutely necessary.
if (wssCheck) {
logger.warn("Forced syncing from checkpoint even though db state is within weak subjectivity period");
logger.warn(
`Forced syncing from checkpoint even though db state at slot ${lastDbState.slot} is within weak subjectivity period`
);
logger.warn("Please consider removing --forceCheckpointSync flag unless absolutely necessary");
}
} else {
// All cases when we want to directly use lastDbState as the anchor state:
// - if no checkpoint sync args provided, or
// - the lastDbState is within weak subjectivity period:
if ((!args.checkpointState && !args.checkpointSyncUrl) || wssCheck) {
const anchorState = await initStateFromAnchorState(config, db, logger, lastDbState, {
if (stateBytes === null) {
// this never happens
throw Error(`There is no stateBytes for the lastDbState at slot ${lastDbState.slot}`);
}
await checkAndPersistAnchorState(config, db, logger, lastDbState, stateBytes, {
isWithinWeakSubjectivityPeriod: wssCheck,
isCheckpointState: false,
});
return {anchorState};
return {anchorState: lastDbState};
}
}
}

// See if we can sync state using checkpoint sync args or else start from genesis
if (args.checkpointState) {
return readWSState(
lastDbState,
lastDbStateWithBytes,
lastDbValidatorsBytes,
{
checkpointState: args.checkpointState,
wssCheckpoint: args.wssCheckpoint,
Expand All @@ -139,7 +164,8 @@ export async function initBeaconState(
);
} else if (args.checkpointSyncUrl) {
return fetchWSStateFromBeaconApi(
lastDbState,
lastDbStateWithBytes,
lastDbValidatorsBytes,
{
checkpointSyncUrl: args.checkpointSyncUrl,
wssCheckpoint: args.wssCheckpoint,
Expand All @@ -153,10 +179,10 @@ export async function initBeaconState(
const genesisStateFile = args.genesisStateFile || getGenesisFileUrl(args.network || defaultNetwork);
if (genesisStateFile && !args.forceGenesis) {
const stateBytes = await downloadOrLoadFile(genesisStateFile);
let anchorState = getStateTypeFromBytes(chainForkConfig, stateBytes).deserializeToViewDU(stateBytes);
const anchorState = getStateTypeFromBytes(chainForkConfig, stateBytes).deserializeToViewDU(stateBytes);
const config = createBeaconConfig(chainForkConfig, anchorState.genesisValidatorsRoot);
const wssCheck = isWithinWeakSubjectivityPeriod(config, anchorState, getCheckpointFromState(anchorState));
anchorState = await initStateFromAnchorState(config, db, logger, anchorState, {
await checkAndPersistAnchorState(config, db, logger, anchorState, stateBytes, {
isWithinWeakSubjectivityPeriod: wssCheck,
isCheckpointState: true,
});
Expand All @@ -170,7 +196,8 @@ export async function initBeaconState(
}

async function readWSState(
lastDbState: BeaconStateAllForks | null,
lastDbStateBytes: StateWithBytes | null,
lastDbValidatorsBytes: Uint8Array | null,
wssOpts: {checkpointState: string; wssCheckpoint?: string; ignoreWeakSubjectivityCheck?: boolean},
chainForkConfig: ChainForkConfig,
db: IBeaconDb,
Expand All @@ -180,19 +207,28 @@ async function readWSState(
// if a weak subjectivity checkpoint has been provided, it is used for additional verification
// otherwise, the state itself is used for verification (not bad, because the trusted state has been explicitly provided)
const {checkpointState, wssCheckpoint, ignoreWeakSubjectivityCheck} = wssOpts;
const lastDbState = lastDbStateBytes?.state ?? null;

const stateBytes = await downloadOrLoadFile(checkpointState);
const wsState = getStateTypeFromBytes(chainForkConfig, stateBytes).deserializeToViewDU(stateBytes);
let wsState: BeaconStateAllForks;
if (lastDbState && lastDbValidatorsBytes) {
// use lastDbState to load wsState if possible to share the same state tree
wsState = loadState(chainForkConfig, lastDbState, stateBytes, lastDbValidatorsBytes).state;
} else {
wsState = getStateTypeFromBytes(chainForkConfig, stateBytes).deserializeToViewDU(stateBytes);
}
const config = createBeaconConfig(chainForkConfig, wsState.genesisValidatorsRoot);
const store = lastDbState ?? wsState;
const wsStateBytes = {state: wsState, stateBytes};
const store = lastDbStateBytes ?? wsStateBytes;
const checkpoint = wssCheckpoint ? getCheckpointFromArg(wssCheckpoint) : getCheckpointFromState(wsState);
return initAndVerifyWeakSubjectivityState(config, db, logger, store, wsState, checkpoint, {
return initAndVerifyWeakSubjectivityState(config, db, logger, store, wsStateBytes, checkpoint, {
ignoreWeakSubjectivityCheck,
});
}

async function fetchWSStateFromBeaconApi(
lastDbState: BeaconStateAllForks | null,
lastDbStateBytes: StateWithBytes | null,
lastDbValidatorsBytes: Uint8Array | null,
wssOpts: {checkpointSyncUrl: string; wssCheckpoint?: string; ignoreWeakSubjectivityCheck?: boolean},
chainForkConfig: ChainForkConfig,
db: IBeaconDb,
Expand All @@ -213,10 +249,15 @@ async function fetchWSStateFromBeaconApi(
throw e;
}

const {wsState, wsCheckpoint} = await fetchWeakSubjectivityState(chainForkConfig, logger, wssOpts);
const {wsState, wsStateBytes, wsCheckpoint} = await fetchWeakSubjectivityState(chainForkConfig, logger, wssOpts, {
lastDbState: lastDbStateBytes?.state ?? null,
lastDbValidatorsBytes,
});

const config = createBeaconConfig(chainForkConfig, wsState.genesisValidatorsRoot);
const store = lastDbState ?? wsState;
return initAndVerifyWeakSubjectivityState(config, db, logger, store, wsState, wsCheckpoint, {
const wsStateWithBytes = {state: wsState, stateBytes: wsStateBytes};
const store = lastDbStateBytes ?? wsStateWithBytes;
return initAndVerifyWeakSubjectivityState(config, db, logger, store, wsStateWithBytes, wsCheckpoint, {
ignoreWeakSubjectivityCheck: wssOpts.ignoreWeakSubjectivityCheck,
});
}
35 changes: 27 additions & 8 deletions packages/cli/src/networks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ import got from "got";
import {ENR} from "@chainsafe/enr";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {HttpHeader, MediaType, WireFormat, getClient} from "@lodestar/api";
import {getStateTypeFromBytes} from "@lodestar/beacon-node";
import {getStateSlotFromBytes} from "@lodestar/beacon-node";
import {ChainConfig, ChainForkConfig} from "@lodestar/config";
import {Checkpoint} from "@lodestar/types/phase0";
import {Slot} from "@lodestar/types";
import {fromHex, callFnWhenAwait, Logger} from "@lodestar/utils";
import {BeaconStateAllForks, getLatestBlockRoot, computeCheckpointEpochAtStateSlot} from "@lodestar/state-transition";
import {fromHex, callFnWhenAwait, Logger, formatBytes} from "@lodestar/utils";
import {
BeaconStateAllForks,
getLatestBlockRoot,
computeCheckpointEpochAtStateSlot,
loadState,
} from "@lodestar/state-transition";
import {parseBootnodesFile} from "../util/format.js";
import * as mainnet from "./mainnet.js";
import * as dev from "./dev.js";
Expand Down Expand Up @@ -140,8 +145,12 @@ export function readBootnodes(bootnodesFilePath: string): string[] {
export async function fetchWeakSubjectivityState(
config: ChainForkConfig,
logger: Logger,
{checkpointSyncUrl, wssCheckpoint}: {checkpointSyncUrl: string; wssCheckpoint?: string}
): Promise<{wsState: BeaconStateAllForks; wsCheckpoint: Checkpoint}> {
{checkpointSyncUrl, wssCheckpoint}: {checkpointSyncUrl: string; wssCheckpoint?: string},
{
lastDbState,
lastDbValidatorsBytes,
}: {lastDbState: BeaconStateAllForks | null; lastDbValidatorsBytes: Uint8Array | null}
): Promise<{wsState: BeaconStateAllForks; wsStateBytes: Uint8Array; wsCheckpoint: Checkpoint}> {
try {
let wsCheckpoint: Checkpoint | null;
let stateId: Slot | "finalized";
Expand Down Expand Up @@ -169,21 +178,31 @@ export async function fetchWeakSubjectivityState(
}
);

const stateBytes = await callFnWhenAwait(
const wsStateBytes = await callFnWhenAwait(
getStatePromise,
() => logger.info("Download in progress, please wait..."),
GET_STATE_LOG_INTERVAL
).then((res) => {
return res.ssz();
});

logger.info("Download completed", {stateId});
const wsSlot = getStateSlotFromBytes(wsStateBytes);
const logData = {stateId, size: formatBytes(wsStateBytes.length)};
logger.info("Download completed", typeof stateId === "number" ? logData : {...logData, slot: wsSlot});
// It should not be required to get fork type from bytes but Checkpointz does not return
// Eth-Consensus-Version header, see https://github.com/ethpandaops/checkpointz/issues/164
const wsState = getStateTypeFromBytes(config, stateBytes).deserializeToViewDU(stateBytes);
let wsState: BeaconStateAllForks;
if (lastDbState && lastDbValidatorsBytes) {
// use lastDbState to load wsState if possible to share the same state tree
wsState = loadState(config, lastDbState, wsStateBytes, lastDbValidatorsBytes).state;
} else {
const stateType = config.getForkTypes(wsSlot).BeaconState;
wsState = stateType.deserializeToViewDU(wsStateBytes);
}

return {
wsState,
wsStateBytes,
wsCheckpoint: wsCheckpoint ?? getCheckpointFromState(wsState),
};
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions packages/state-transition/src/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ export * from "./validator.js";
export * from "./weakSubjectivity.js";
export * from "./deposit.js";
export * from "./electra.js";
export * from "./loadState/index.js";
2 changes: 1 addition & 1 deletion packages/state-transition/src/util/loadState/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export {loadState} from "./loadState.js";
export {loadState, loadStateAndValidators} from "./loadState.js";
Loading

0 comments on commit fa67e62

Please sign in to comment.