(payload: RpcPayload, opts?: ReqOpts): Promise;
fetchWithRetries(payload: RpcPayload, opts?: ReqOpts): Promise;
fetchBatch(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise;
+ emitter: JsonRpcHttpClientEventEmitter;
}
export class JsonRpcHttpClient implements IJsonRpcHttpClient {
@@ -57,7 +83,10 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* the token freshness +-5 seconds (via `iat` property of the token claim)
*/
private readonly jwtSecret?: Uint8Array;
+ private readonly jwtId?: string;
+ private readonly jwtVersion?: string;
private readonly metrics: JsonRpcHttpClientMetrics | null;
+ readonly emitter = new JsonRpcHttpClientEventEmitter();
constructor(
private readonly urls: string[],
@@ -76,6 +105,10 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* and it might deny responses to the RPC requests.
*/
jwtSecret?: Uint8Array;
+ /** If jwtSecret and jwtId are provided, jwtId will be included in JwtClaim.id */
+ jwtId?: string;
+ /** If jwtSecret and jwtVersion are provided, jwtVersion will be included in JwtClaim.clv. */
+ jwtVersion?: string;
/** Retry attempts */
retryAttempts?: number;
/** Retry delay, only relevant with retry attempts */
@@ -98,6 +131,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
}
this.jwtSecret = opts?.jwtSecret;
+ this.jwtId = opts?.jwtId;
+ this.jwtVersion = opts?.jwtVersion;
this.metrics = opts?.metrics ?? null;
this.metrics?.configUrlsCount.set(urls.length);
@@ -107,31 +142,39 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* Perform RPC request
*/
async fetch(payload: RpcPayload, opts?: ReqOpts): Promise {
- const res: RpcResponse = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
- return parseRpcResponse(res, payload);
+ return this.wrapWithEvents(
+ async () => {
+ const res: RpcResponse = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
+ return parseRpcResponse(res, payload);
+ },
+ {payload}
+ );
}
/**
* Perform RPC request with retry
*/
async fetchWithRetries(payload: RpcPayload, opts?: ReqOpts): Promise {
- const routeId = opts?.routeId ?? "unknown";
-
- const res = await retry>(
- async (attempt) => {
- /** If this is a retry, increment the retry counter for this method */
- if (attempt > 1) {
- this.opts?.metrics?.retryCount.inc({routeId});
+ return this.wrapWithEvents(async () => {
+ const routeId = opts?.routeId ?? "unknown";
+
+ const res = await retry>(
+ async (attempt) => {
+ /** If this is a retry, increment the retry counter for this method */
+ if (attempt > 1) {
+ this.opts?.metrics?.retryCount.inc({routeId});
+ }
+ return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
+ },
+ {
+ retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
+ retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
+ shouldRetry: opts?.shouldRetry,
+ signal: this.opts?.signal,
}
- return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
- },
- {
- retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
- retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0,
- shouldRetry: opts?.shouldRetry,
- }
- );
- return parseRpcResponse(res, payload);
+ );
+ return parseRpcResponse(res, payload);
+ }, payload);
}
/**
@@ -139,26 +182,41 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* Type-wise assumes all requests results have the same type
*/
async fetchBatch(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise {
- if (rpcPayloadArr.length === 0) return [];
-
- const resArr: RpcResponse[] = await this.fetchJson(
- rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
- opts
- );
+ return this.wrapWithEvents(async () => {
+ if (rpcPayloadArr.length === 0) return [];
+
+ const resArr: RpcResponse[] = await this.fetchJson(
+ rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})),
+ opts
+ );
+
+ if (!Array.isArray(resArr)) {
+ // Nethermind may reply to batch request with a JSON RPC error
+ if ((resArr as RpcResponseError).error !== undefined) {
+ throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
+ }
- if (!Array.isArray(resArr)) {
- // Nethermind may reply to batch request with a JSON RPC error
- if ((resArr as RpcResponseError).error !== undefined) {
- throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch");
+ throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
}
- throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`);
- }
+ return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
+ }, rpcPayloadArr);
+ }
- return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i]));
+ private async wrapWithEvents(func: () => Promise, payload?: unknown): Promise {
+ try {
+ const response = await func();
+ this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response});
+ return response;
+ } catch (error) {
+ this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error});
+ throw error;
+ }
}
private async fetchJson(json: T, opts?: ReqOpts): Promise {
+ if (this.urls.length === 0) throw Error("No url provided");
+
const routeId = opts?.routeId ?? "unknown";
let lastError: Error | null = null;
@@ -170,21 +228,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
try {
return await this.fetchJsonOneUrl(this.urls[i], json, opts);
} catch (e) {
+ lastError = e as Error;
if (this.opts?.shouldNotFallback?.(e as Error)) {
- throw e;
+ break;
}
-
- lastError = e as Error;
}
}
-
- if (lastError !== null) {
- throw lastError;
- } else if (this.urls.length === 0) {
- throw Error("No url provided");
- } else {
- throw Error("Unknown error");
- }
+ throw lastError ?? Error("Unknown error");
}
/**
@@ -213,7 +263,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
*
* Jwt auth spec: https://github.com/ethereum/execution-apis/pull/167
*/
- const token = encodeJwtToken({iat: Math.floor(new Date().getTime() / 1000)}, this.jwtSecret);
+ const jwtClaim: JwtClaim = {
+ iat: Math.floor(Date.now() / 1000),
+ id: this.jwtId,
+ clv: this.jwtVersion,
+ };
+
+ const token = encodeJwtToken(jwtClaim, this.jwtSecret);
headers["Authorization"] = `Bearer ${token}`;
}
@@ -238,7 +294,6 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
return bodyJson;
} catch (e) {
this.metrics?.requestErrors.inc({routeId});
-
if (controller.signal.aborted) {
// controller will abort on both parent signal abort + timeout of this specific request
if (this.opts?.signal?.aborted) {
diff --git a/packages/beacon-node/src/eth1/provider/jwt.ts b/packages/beacon-node/src/eth1/provider/jwt.ts
index d9ed24ded165..1e267120957f 100644
--- a/packages/beacon-node/src/eth1/provider/jwt.ts
+++ b/packages/beacon-node/src/eth1/provider/jwt.ts
@@ -4,8 +4,11 @@ import jwt from "jwt-simple";
const {encode, decode} = jwt;
-/** jwt token has iat which is issued at unix timestamp, and an optional exp for expiry */
-type JwtClaim = {iat: number; exp?: number};
+/**
+ * jwt token has iat which is issued at unix timestamp, an optional exp for expiry,
+ * an optional id as unique identifier, and an optional clv for client type/version
+ */
+export type JwtClaim = {iat: number; exp?: number; id?: string; clv?: string};
export function encodeJwtToken(
claim: JwtClaim,
diff --git a/packages/beacon-node/src/execution/builder/http.ts b/packages/beacon-node/src/execution/builder/http.ts
index 39ad6062edfe..bfe003372ced 100644
--- a/packages/beacon-node/src/execution/builder/http.ts
+++ b/packages/beacon-node/src/execution/builder/http.ts
@@ -1,8 +1,13 @@
import {byteArrayEquals, toHexString} from "@chainsafe/ssz";
import {allForks, bellatrix, Slot, Root, BLSPubkey, ssz, deneb, Wei} from "@lodestar/types";
+import {
+ parseSignedBlindedBlockOrContents,
+ parseExecutionPayloadAndBlobsBundle,
+ reconstructFullBlockOrContents,
+} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {getClient, Api as BuilderApi} from "@lodestar/api/builder";
-import {SLOTS_PER_EPOCH} from "@lodestar/params";
+import {SLOTS_PER_EPOCH, ForkExecution} from "@lodestar/params";
import {ApiError} from "@lodestar/api";
import {Metrics} from "../../metrics/metrics.js";
@@ -84,31 +89,43 @@ export class ExecutionBuilderHttp implements IExecutionBuilder {
}
async registerValidator(registrations: bellatrix.SignedValidatorRegistrationV1[]): Promise {
- ApiError.assert(await this.api.registerValidator(registrations));
+ ApiError.assert(
+ await this.api.registerValidator(registrations),
+ "Failed to forward validator registrations to connected builder"
+ );
}
async getHeader(
+ fork: ForkExecution,
slot: Slot,
parentHash: Root,
proposerPubKey: BLSPubkey
): Promise<{
header: allForks.ExecutionPayloadHeader;
- blockValue: Wei;
- blobKzgCommitments?: deneb.BlobKzgCommitments;
+ executionPayloadValue: Wei;
+ blindedBlobsBundle?: deneb.BlindedBlobsBundle;
}> {
const res = await this.api.getHeader(slot, parentHash, proposerPubKey);
ApiError.assert(res, "execution.builder.getheader");
- const {header, value: blockValue} = res.response.data.message;
- const {blobKzgCommitments} = res.response.data.message as {blobKzgCommitments?: deneb.BlobKzgCommitments};
- return {header, blockValue, blobKzgCommitments};
+ const {header, value: executionPayloadValue} = res.response.data.message;
+ const {blindedBlobsBundle} = res.response.data.message as deneb.BuilderBid;
+ return {header, executionPayloadValue, blindedBlobsBundle};
}
- async submitBlindedBlock(signedBlock: allForks.SignedBlindedBeaconBlock): Promise {
- const res = await this.api.submitBlindedBlock(signedBlock);
+ async submitBlindedBlock(
+ signedBlindedBlockOrContents: allForks.SignedBlindedBeaconBlockOrContents
+ ): Promise {
+ const res = await this.api.submitBlindedBlock(signedBlindedBlockOrContents);
ApiError.assert(res, "execution.builder.submitBlindedBlock");
- const executionPayload = res.response.data;
- const expectedTransactionsRoot = signedBlock.message.body.executionPayloadHeader.transactionsRoot;
- const actualTransactionsRoot = ssz.bellatrix.Transactions.hashTreeRoot(res.response.data.transactions);
+ const {data} = res.response;
+
+ const {executionPayload, blobsBundle} = parseExecutionPayloadAndBlobsBundle(data);
+ const {signedBlindedBlock, signedBlindedBlobSidecars} =
+ parseSignedBlindedBlockOrContents(signedBlindedBlockOrContents);
+
+ // some validations for execution payload
+ const expectedTransactionsRoot = signedBlindedBlock.message.body.executionPayloadHeader.transactionsRoot;
+ const actualTransactionsRoot = ssz.bellatrix.Transactions.hashTreeRoot(executionPayload.transactions);
if (!byteArrayEquals(expectedTransactionsRoot, actualTransactionsRoot)) {
throw Error(
`Invalid transactionsRoot of the builder payload, expected=${toHexString(
@@ -116,10 +133,8 @@ export class ExecutionBuilderHttp implements IExecutionBuilder {
)}, actual=${toHexString(actualTransactionsRoot)}`
);
}
- const fullySignedBlock: bellatrix.SignedBeaconBlock = {
- ...signedBlock,
- message: {...signedBlock.message, body: {...signedBlock.message.body, executionPayload}},
- };
- return fullySignedBlock;
+
+ const blobs = blobsBundle ? blobsBundle.blobs : null;
+ return reconstructFullBlockOrContents({signedBlindedBlock, signedBlindedBlobSidecars}, {executionPayload, blobs});
}
}
diff --git a/packages/beacon-node/src/execution/builder/interface.ts b/packages/beacon-node/src/execution/builder/interface.ts
index 6e008e30f828..e9a2cabb69ef 100644
--- a/packages/beacon-node/src/execution/builder/interface.ts
+++ b/packages/beacon-node/src/execution/builder/interface.ts
@@ -1,4 +1,5 @@
import {allForks, bellatrix, Root, Slot, BLSPubkey, deneb, Wei} from "@lodestar/types";
+import {ForkExecution} from "@lodestar/params";
export interface IExecutionBuilder {
/**
@@ -17,13 +18,16 @@ export interface IExecutionBuilder {
checkStatus(): Promise;
registerValidator(registrations: bellatrix.SignedValidatorRegistrationV1[]): Promise;
getHeader(
+ fork: ForkExecution,
slot: Slot,
parentHash: Root,
proposerPubKey: BLSPubkey
): Promise<{
header: allForks.ExecutionPayloadHeader;
- blockValue: Wei;
- blobKzgCommitments?: deneb.BlobKzgCommitments;
+ executionPayloadValue: Wei;
+ blindedBlobsBundle?: deneb.BlindedBlobsBundle;
}>;
- submitBlindedBlock(signedBlock: allForks.SignedBlindedBeaconBlock): Promise;
+ submitBlindedBlock(
+ signedBlock: allForks.SignedBlindedBeaconBlockOrContents
+ ): Promise;
}
diff --git a/packages/beacon-node/src/execution/engine/http.ts b/packages/beacon-node/src/execution/engine/http.ts
index 331e24fa7955..70df97ba1e4a 100644
--- a/packages/beacon-node/src/execution/engine/http.ts
+++ b/packages/beacon-node/src/execution/engine/http.ts
@@ -5,13 +5,13 @@ import {
ErrorJsonRpcResponse,
HttpRpcError,
IJsonRpcHttpClient,
+ JsonRpcHttpClientEvent,
ReqOpts,
} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
-import {IJson, RpcPayload} from "../../eth1/interface.js";
import {
ExecutionPayloadStatus,
ExecutePayloadResponse,
@@ -55,6 +55,14 @@ export type ExecutionEngineHttpOpts = {
* +-5 seconds interval.
*/
jwtSecretHex?: string;
+ /**
+ * An identifier string passed as CLI arg that will be set in `id` field of jwt claims
+ */
+ jwtId?: string;
+ /**
+ * A version string that will be set in `clv` field of jwt claims
+ */
+ jwtVersion?: string;
};
export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
@@ -110,7 +118,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
private readonly rpcFetchQueue: JobItemQueue<[EngineRequest], EngineResponse>;
private jobQueueProcessor = async ({method, params, methodOpts}: EngineRequest): Promise => {
- return this.fetchWithRetries(
+ return this.rpc.fetchWithRetries(
{method, params},
methodOpts
);
@@ -126,22 +134,14 @@ export class ExecutionEngineHttp implements IExecutionEngine {
metrics?.engineHttpProcessorQueue
);
this.logger = logger;
- }
- protected async fetchWithRetries(payload: RpcPayload, opts?: ReqOpts): Promise {
- try {
- const res = await this.rpc.fetchWithRetries(payload, opts);
+ this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => {
+ this.updateEngineState(getExecutionEngineState({payloadError: error, oldState: this.state}));
+ });
+
+ this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => {
this.updateEngineState(getExecutionEngineState({targetState: ExecutionEngineState.ONLINE, oldState: this.state}));
- return res;
- } catch (err) {
- this.updateEngineState(getExecutionEngineState({payloadError: err, oldState: this.state}));
-
- /*
- * TODO: For some error cases as abort, we may not want to escalate the error to the caller
- * But for now the higher level code handles such cases so we can just rethrow the error
- */
- throw err;
- }
+ });
}
/**
@@ -363,14 +363,14 @@ export class ExecutionEngineHttp implements IExecutionEngine {
async getPayload(
fork: ForkName,
payloadId: PayloadId
- ): Promise<{executionPayload: allForks.ExecutionPayload; blockValue: Wei; blobsBundle?: BlobsBundle}> {
+ ): Promise<{executionPayload: allForks.ExecutionPayload; executionPayloadValue: Wei; blobsBundle?: BlobsBundle}> {
const method =
ForkSeq[fork] >= ForkSeq.deneb
? "engine_getPayloadV3"
: ForkSeq[fork] >= ForkSeq.capella
? "engine_getPayloadV2"
: "engine_getPayloadV1";
- const payloadResponse = await this.fetchWithRetries<
+ const payloadResponse = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>(
@@ -390,10 +390,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> {
const method = "engine_getPayloadBodiesByHashV1";
assertReqSizeLimit(blockHashes.length, 32);
- const response = await this.fetchWithRetries<
+ const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
- >({method, params: blockHashes});
+ >({method, params: [blockHashes]});
return response.map(deserializeExecutionPayloadBody);
}
@@ -405,7 +405,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
assertReqSizeLimit(blockCount, 32);
const start = numToQuantity(startBlockNumber);
const count = numToQuantity(blockCount);
- const response = await this.fetchWithRetries<
+ const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [start, count]});
diff --git a/packages/beacon-node/src/execution/engine/index.ts b/packages/beacon-node/src/execution/engine/index.ts
index 210cba5fb489..743abf203de9 100644
--- a/packages/beacon-node/src/execution/engine/index.ts
+++ b/packages/beacon-node/src/execution/engine/index.ts
@@ -36,6 +36,8 @@ export function getExecutionEngineHttp(
signal: modules.signal,
metrics: modules.metrics?.executionEnginerHttpClient,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
+ jwtId: opts.jwtId,
+ jwtVersion: opts.jwtVersion,
});
return new ExecutionEngineHttp(rpc, modules);
}
diff --git a/packages/beacon-node/src/execution/engine/interface.ts b/packages/beacon-node/src/execution/engine/interface.ts
index f2ce00e43a45..9a7ee3963379 100644
--- a/packages/beacon-node/src/execution/engine/interface.ts
+++ b/packages/beacon-node/src/execution/engine/interface.ts
@@ -2,11 +2,11 @@ import {ForkName} from "@lodestar/params";
import {KZGCommitment, Blob, KZGProof} from "@lodestar/types/deneb";
import {Root, RootHex, allForks, capella, Wei} from "@lodestar/types";
-import {DATA, QUANTITY} from "../../eth1/provider/utils.js";
+import {DATA} from "../../eth1/provider/utils.js";
import {PayloadIdCache, PayloadId, WithdrawalV1} from "./payloadIdCache.js";
import {ExecutionPayloadBody} from "./types.js";
-export {PayloadIdCache, PayloadId, WithdrawalV1};
+export {PayloadIdCache, type PayloadId, type WithdrawalV1};
export enum ExecutionPayloadStatus {
/** given payload is valid */
@@ -70,12 +70,6 @@ export type PayloadAttributes = {
parentBeaconBlockRoot?: Uint8Array;
};
-export type TransitionConfigurationV1 = {
- terminalTotalDifficulty: QUANTITY;
- terminalBlockHash: DATA;
- terminalBlockNumber: QUANTITY;
-};
-
export type BlobsBundle = {
/**
* Execution payload `blockHash` for the caller to sanity-check the consistency with the `engine_getPayload` call
@@ -142,7 +136,7 @@ export interface IExecutionEngine {
getPayload(
fork: ForkName,
payloadId: PayloadId
- ): Promise<{executionPayload: allForks.ExecutionPayload; blockValue: Wei; blobsBundle?: BlobsBundle}>;
+ ): Promise<{executionPayload: allForks.ExecutionPayload; executionPayloadValue: Wei; blobsBundle?: BlobsBundle}>;
getPayloadBodiesByHash(blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;
diff --git a/packages/beacon-node/src/execution/engine/mock.ts b/packages/beacon-node/src/execution/engine/mock.ts
index c64528552ea4..83a5ea3a7ed6 100644
--- a/packages/beacon-node/src/execution/engine/mock.ts
+++ b/packages/beacon-node/src/execution/engine/mock.ts
@@ -136,7 +136,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
*/
private notifyNewPayload(
executionPayloadRpc: EngineApiRpcParamTypes["engine_newPayloadV1"][0],
- // TODO deneb: add versionedHashes validation
+ // add versionedHashes validation later if required
_versionedHashes?: EngineApiRpcParamTypes["engine_newPayloadV3"][1]
): EngineApiRpcReturnTypes["engine_newPayloadV1"] {
const blockHash = executionPayloadRpc.blockHash;
diff --git a/packages/beacon-node/src/execution/engine/types.ts b/packages/beacon-node/src/execution/engine/types.ts
index d400df03a585..4f24480e0b96 100644
--- a/packages/beacon-node/src/execution/engine/types.ts
+++ b/packages/beacon-node/src/execution/engine/types.ts
@@ -55,7 +55,7 @@ export type EngineApiRpcParamTypes = {
/**
* 1. Array of DATA - Array of block_hash field values of the ExecutionPayload structure
* */
- engine_getPayloadBodiesByHashV1: DATA[];
+ engine_getPayloadBodiesByHashV1: DATA[][];
/**
* 1. start: QUANTITY, 64 bits - Starting block number
@@ -102,12 +102,13 @@ export type EngineApiRpcReturnTypes = {
engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[];
};
-type ExecutionPayloadRpcWithBlockValue = {
+type ExecutionPayloadRpcWithValue = {
executionPayload: ExecutionPayloadRpc;
+ // even though CL tracks this as executionPayloadValue, EL returns this as blockValue
blockValue: QUANTITY;
blobsBundle?: BlobsBundleRpc;
};
-type ExecutionPayloadResponse = ExecutionPayloadRpc | ExecutionPayloadRpcWithBlockValue;
+type ExecutionPayloadResponse = ExecutionPayloadRpc | ExecutionPayloadRpcWithValue;
export type ExecutionPayloadBodyRpc = {transactions: DATA[]; withdrawals: WithdrawalV1[] | null};
@@ -199,25 +200,25 @@ export function serializeVersionedHashes(vHashes: VersionedHashes): VersionedHas
return vHashes.map(bytesToData);
}
-export function hasBlockValue(response: ExecutionPayloadResponse): response is ExecutionPayloadRpcWithBlockValue {
- return (response as ExecutionPayloadRpcWithBlockValue).blockValue !== undefined;
+export function hasPayloadValue(response: ExecutionPayloadResponse): response is ExecutionPayloadRpcWithValue {
+ return (response as ExecutionPayloadRpcWithValue).blockValue !== undefined;
}
export function parseExecutionPayload(
fork: ForkName,
response: ExecutionPayloadResponse
-): {executionPayload: allForks.ExecutionPayload; blockValue: Wei; blobsBundle?: BlobsBundle} {
+): {executionPayload: allForks.ExecutionPayload; executionPayloadValue: Wei; blobsBundle?: BlobsBundle} {
let data: ExecutionPayloadRpc;
- let blockValue: Wei;
+ let executionPayloadValue: Wei;
let blobsBundle: BlobsBundle | undefined;
- if (hasBlockValue(response)) {
- blockValue = quantityToBigint(response.blockValue);
+ if (hasPayloadValue(response)) {
+ executionPayloadValue = quantityToBigint(response.blockValue);
data = response.executionPayload;
blobsBundle = response.blobsBundle ? parseBlobsBundle(response.blobsBundle) : undefined;
} else {
data = response;
// Just set it to zero as default
- blockValue = BigInt(0);
+ executionPayloadValue = BigInt(0);
blobsBundle = undefined;
}
@@ -268,7 +269,7 @@ export function parseExecutionPayload(
(executionPayload as deneb.ExecutionPayload).excessBlobGas = quantityToBigint(excessBlobGas);
}
- return {executionPayload, blockValue, blobsBundle};
+ return {executionPayload, executionPayloadValue, blobsBundle};
}
export function serializePayloadAttributes(data: PayloadAttributes): PayloadAttributesRpc {
diff --git a/packages/beacon-node/src/execution/engine/utils.ts b/packages/beacon-node/src/execution/engine/utils.ts
index 13ad8d855062..e661af8daf70 100644
--- a/packages/beacon-node/src/execution/engine/utils.ts
+++ b/packages/beacon-node/src/execution/engine/utils.ts
@@ -1,7 +1,13 @@
import {isFetchError} from "@lodestar/api";
import {isErrorAborted} from "@lodestar/utils";
import {IJson, RpcPayload} from "../../eth1/interface.js";
-import {IJsonRpcHttpClient, ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
+import {
+ IJsonRpcHttpClient,
+ ErrorJsonRpcResponse,
+ HttpRpcError,
+ JsonRpcHttpClientEventEmitter,
+ JsonRpcHttpClientEvent,
+} from "../../eth1/provider/jsonRpcHttpClient.js";
import {isQueueErrorAborted} from "../../util/queue/errors.js";
import {ExecutionPayloadStatus, ExecutionEngineState} from "./interface.js";
@@ -11,16 +17,20 @@ export type JsonRpcBackend = {
};
export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
+ readonly emitter = new JsonRpcHttpClientEventEmitter();
+
constructor(private readonly backend: JsonRpcBackend) {}
async fetch(payload: RpcPayload): Promise {
- const handler = this.backend.handlers[payload.method];
- if (handler === undefined) {
- throw Error(`Unknown method ${payload.method}`);
- }
-
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- return handler(...(payload.params as any[])) as R;
+ return this.wrapWithEvents(async () => {
+ const handler = this.backend.handlers[payload.method];
+ if (handler === undefined) {
+ throw Error(`Unknown method ${payload.method}`);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ return handler(...(payload.params as any[])) as R;
+ }, payload);
}
fetchWithRetries(payload: RpcPayload): Promise {
@@ -30,6 +40,17 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
fetchBatch(rpcPayloadArr: RpcPayload[]): Promise {
return Promise.all(rpcPayloadArr.map((payload) => this.fetch(payload)));
}
+
+ private async wrapWithEvents(func: () => Promise, payload?: unknown): Promise {
+ try {
+ const response = await func();
+ this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response});
+ return response;
+ } catch (error) {
+ this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error});
+ throw error;
+ }
+ }
}
export const HTTP_FATAL_ERROR_CODES = ["ECONNREFUSED", "ENOTFOUND", "EAI_AGAIN"];
diff --git a/packages/beacon-node/src/index.ts b/packages/beacon-node/src/index.ts
index 69adb74d5471..aa555a1ab0ca 100644
--- a/packages/beacon-node/src/index.ts
+++ b/packages/beacon-node/src/index.ts
@@ -1,17 +1,23 @@
export {initStateFromAnchorState, initStateFromDb, initStateFromEth1} from "./chain/index.js";
-export {BeaconDb, IBeaconDb} from "./db/index.js";
-export {Eth1Provider, IEth1Provider} from "./eth1/index.js";
-export {createNodeJsLibp2p, NodeJsLibp2pOpts} from "./network/index.js";
+export {BeaconDb, type IBeaconDb} from "./db/index.js";
+export {Eth1Provider, type IEth1Provider} from "./eth1/index.js";
+export {createNodeJsLibp2p, type NodeJsLibp2pOpts} from "./network/index.js";
export * from "./node/index.js";
// Export metrics utilities to de-duplicate validator metrics
-export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer, getHttpMetricsServer} from "./metrics/index.js";
+export {
+ RegistryMetricCreator,
+ collectNodeJSMetrics,
+ type HttpMetricsServer,
+ getHttpMetricsServer,
+} from "./metrics/index.js";
// Export monitoring service to make it usable by validator
export {MonitoringService} from "./monitoring/index.js";
// Export generic RestApi server for CLI
-export {RestApiServer, RestApiServerOpts, RestApiServerModules, RestApiServerMetrics} from "./api/rest/base.js";
+export {RestApiServer} from "./api/rest/base.js";
+export type {RestApiServerOpts, RestApiServerModules, RestApiServerMetrics} from "./api/rest/base.js";
// Export type util for CLI - TEMP move to lodestar-types eventually
export {getStateTypeFromBytes} from "./util/multifork.js";
diff --git a/packages/beacon-node/src/metrics/metrics.ts b/packages/beacon-node/src/metrics/metrics.ts
index 5adb4bffac97..58a48e34bbd5 100644
--- a/packages/beacon-node/src/metrics/metrics.ts
+++ b/packages/beacon-node/src/metrics/metrics.ts
@@ -25,7 +25,7 @@ export function createMetrics(
const lodestar = createLodestarMetrics(register, opts.metadata, anchorState);
const genesisTime = anchorState.genesisTime;
- const validatorMonitor = createValidatorMonitor(lodestar, config, genesisTime, logger);
+ const validatorMonitor = createValidatorMonitor(lodestar, config, genesisTime, logger, opts);
// Register a single collect() function to run all validatorMonitor metrics
lodestar.validatorMonitor.validatorsConnected.addCollect(() => {
const clockSlot = getCurrentSlot(config, genesisTime);
diff --git a/packages/beacon-node/src/metrics/metrics/beacon.ts b/packages/beacon-node/src/metrics/metrics/beacon.ts
index 25f842e635af..2b763599f6e1 100644
--- a/packages/beacon-node/src/metrics/metrics/beacon.ts
+++ b/packages/beacon-node/src/metrics/metrics/beacon.ts
@@ -138,6 +138,25 @@ export function createBeaconMetrics(register: RegistryMetricCreator) {
labelNames: ["source"],
}),
+ blockProductionCaches: {
+ producedBlockRoot: register.gauge({
+ name: "beacon_blockroot_produced_cache_total",
+ help: "Count of cached produded block roots",
+ }),
+ producedBlindedBlockRoot: register.gauge({
+ name: "beacon_blinded_blockroot_produced_cache_total",
+ help: "Count of cached produded blinded block roots",
+ }),
+ producedBlobSidecarsCache: register.gauge({
+ name: "beacon_blobsidecars_produced_cache_total",
+ help: "Count of cached produced blob sidecars",
+ }),
+ producedBlindedBlobSidecarsCache: register.gauge({
+ name: "beacon_blinded_blobsidecars_produced_cache_total",
+ help: "Count of cached produced blinded blob sidecars",
+ }),
+ },
+
blockPayload: {
payloadAdvancePrepTime: register.histogram({
name: "beacon_block_payload_prepare_time",
diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts
index 840d47dfbd06..b6f0d78f3b06 100644
--- a/packages/beacon-node/src/metrics/metrics/lodestar.ts
+++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts
@@ -121,6 +121,12 @@ export function createLodestarMetrics(
help: "Current count of pending items in reqRespBridgeReqCaller data structure",
}),
},
+ networkWorkerWireEventsOnMainThreadLatency: register.histogram<"eventName">({
+ name: "lodestar_network_worker_wire_events_on_main_thread_latency_seconds",
+ help: "Latency in seconds to transmit network events to main thread across worker port",
+ labelNames: ["eventName"],
+ buckets: [0.001, 0.003, 0.01, 0.03, 0.1],
+ }),
regenQueue: {
length: register.gauge({
@@ -447,6 +453,10 @@ export function createLodestarMetrics(
name: "lodestar_bls_thread_pool_batchable_sig_sets_total",
help: "Count of total batchable signature sets",
}),
+ signatureDeserializationMainThreadDuration: register.gauge({
+ name: "lodestar_bls_thread_pool_signature_deserialization_main_thread_time_seconds",
+ help: "Total time spent deserializing signatures on main thread",
+ }),
},
// BLS time on single thread mode
@@ -640,6 +650,13 @@ export function createLodestarMetrics(
labelNames: ["error"],
}),
},
+ gossipBlob: {
+ receivedToGossipValidate: register.histogram({
+ name: "lodestar_gossip_blob_received_to_gossip_validate",
+ help: "Time elapsed between blob received and blob validated",
+ buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
+ }),
+ },
importBlock: {
persistBlockNoSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_no_serialized_data_count",
diff --git a/packages/beacon-node/src/metrics/options.ts b/packages/beacon-node/src/metrics/options.ts
index 9a19e033a291..1abfb6c7ffd1 100644
--- a/packages/beacon-node/src/metrics/options.ts
+++ b/packages/beacon-node/src/metrics/options.ts
@@ -1,4 +1,5 @@
import {HttpMetricsServerOpts} from "./server/index.js";
+import {ValidatorMonitorOpts} from "./validatorMonitor.js";
export type LodestarMetadata = {
/** "v0.16.0/developer/feature-1/ac99f2b5" */
@@ -9,11 +10,12 @@ export type LodestarMetadata = {
network: string;
};
-export type MetricsOptions = HttpMetricsServerOpts & {
- enabled: boolean;
- /** Optional metadata to send to Prometheus */
- metadata?: LodestarMetadata;
-};
+export type MetricsOptions = ValidatorMonitorOpts &
+ HttpMetricsServerOpts & {
+ enabled: boolean;
+ /** Optional metadata to send to Prometheus */
+ metadata?: LodestarMetadata;
+ };
export const defaultMetricsOptions: MetricsOptions = {
enabled: false,
diff --git a/packages/beacon-node/src/metrics/validatorMonitor.ts b/packages/beacon-node/src/metrics/validatorMonitor.ts
index 83d0647ea7fc..7bae06d8a170 100644
--- a/packages/beacon-node/src/metrics/validatorMonitor.ts
+++ b/packages/beacon-node/src/metrics/validatorMonitor.ts
@@ -9,12 +9,13 @@ import {
getBlockRootAtSlot,
ParticipationFlags,
} from "@lodestar/state-transition";
-import {Logger, MapDef, MapDefMax, toHex} from "@lodestar/utils";
+import {LogData, LogHandler, LogLevel, Logger, MapDef, MapDefMax, toHex} from "@lodestar/utils";
import {RootHex, allForks, altair, deneb} from "@lodestar/types";
import {ChainConfig, ChainForkConfig} from "@lodestar/config";
import {ForkSeq, INTERVALS_PER_SLOT, MIN_ATTESTATION_INCLUSION_DELAY, SLOTS_PER_EPOCH} from "@lodestar/params";
import {Epoch, Slot, ValidatorIndex} from "@lodestar/types";
import {IndexedAttestation, SignedAggregateAndProof} from "@lodestar/types/phase0";
+import {GENESIS_SLOT} from "../constants/constants.js";
import {LodestarMetrics} from "./metrics/lodestar.js";
/** The validator monitor collects per-epoch data about each monitored validator.
@@ -76,6 +77,11 @@ export type ValidatorMonitor = {
scrapeMetrics(slotClock: Slot): void;
};
+export type ValidatorMonitorOpts = {
+ /** Log validator monitor events as info */
+ validatorMonitorLogs?: boolean;
+};
+
/** Information required to reward some validator during the current and previous epoch. */
type ValidatorStatus = {
/** True if the validator has been slashed, ever. */
@@ -136,7 +142,7 @@ type EpochSummary = {
/** The delay between when the attestation should have been produced and when it was observed. */
attestationMinDelay: Seconds | null;
/** The number of times a validators attestation was seen in an aggregate. */
- attestationAggregateIncusions: number;
+ attestationAggregateInclusions: number;
/** The number of times a validators attestation was seen in a block. */
attestationBlockInclusions: number;
/** The minimum observed inclusion distance for an attestation for this epoch.. */
@@ -176,7 +182,7 @@ function getEpochSummary(validator: MonitoredValidator, epoch: Epoch): EpochSumm
summary = {
attestations: 0,
attestationMinDelay: null,
- attestationAggregateIncusions: 0,
+ attestationAggregateInclusions: 0,
attestationBlockInclusions: 0,
attestationMinBlockInclusionDistance: null,
blocks: 0,
@@ -239,8 +245,14 @@ export function createValidatorMonitor(
metrics: LodestarMetrics,
config: ChainForkConfig,
genesisTime: number,
- logger: Logger
+ logger: Logger,
+ opts: ValidatorMonitorOpts
): ValidatorMonitor {
+ const logLevel = opts.validatorMonitorLogs ? LogLevel.info : LogLevel.debug;
+ const log: LogHandler = (message: string, context?: LogData) => {
+ logger[logLevel](message, context);
+ };
+
/** The validators that require additional monitoring. */
const validators = new MapDef(() => ({
summaries: new Map(),
@@ -345,8 +357,8 @@ export function createValidatorMonitor(
}
if (!summary.isPrevSourceAttester || !summary.isPrevTargetAttester || !summary.isPrevHeadAttester) {
- logger.debug("Failed attestation in previous epoch", {
- validatorIndex: index,
+ log("Failed attestation in previous epoch", {
+ validator: index,
prevEpoch: currentEpoch - 1,
isPrevSourceAttester: summary.isPrevSourceAttester,
isPrevHeadAttester: summary.isPrevHeadAttester,
@@ -411,13 +423,13 @@ export function createValidatorMonitor(
if (validator) {
metrics.validatorMonitor.unaggregatedAttestationSubmittedSentPeers.observe(sentPeers);
metrics.validatorMonitor.unaggregatedAttestationDelaySeconds.observe({src: OpSource.api}, delaySec);
- logger.debug("Local validator published unaggregated attestation", {
- validatorIndex: index,
+ log("Published unaggregated attestation", {
+ validator: index,
slot: data.slot,
committeeIndex: data.index,
subnet,
sentPeers,
- delaySec,
+ delaySec: delaySec.toFixed(4),
});
const attestationSummary = validator.attestations
@@ -461,12 +473,12 @@ export function createValidatorMonitor(
const validator = validators.get(index);
if (validator) {
metrics.validatorMonitor.aggregatedAttestationDelaySeconds.observe({src: OpSource.api}, delaySec);
- logger.debug("Local validator published aggregated attestation", {
- validatorIndex: index,
+ log("Published aggregated attestation", {
+ validator: index,
slot: data.slot,
committeeIndex: data.index,
sentPeers,
- delaySec,
+ delaySec: delaySec.toFixed(4),
});
validator.attestations
@@ -481,15 +493,15 @@ export function createValidatorMonitor(
const src = OpSource.gossip;
const data = indexedAttestation.data;
const epoch = computeEpochAtSlot(data.slot);
- // Returns the duration between when a `AggregateAndproof` with `data` could be produced (2/3rd through the slot) and `seenTimestamp`.
+ // Returns the duration between when a `AggregateAndProof` with `data` could be produced (2/3rd through the slot) and `seenTimestamp`.
const delaySec = seenTimestampSec - (genesisTime + (data.slot + 2 / 3) * config.SECONDS_PER_SLOT);
const aggregatorIndex = signedAggregateAndProof.message.aggregatorIndex;
- const validtorAggregator = validators.get(aggregatorIndex);
- if (validtorAggregator) {
+ const validatorAggregator = validators.get(aggregatorIndex);
+ if (validatorAggregator) {
metrics.validatorMonitor.aggregatedAttestationTotal.inc({src});
metrics.validatorMonitor.aggregatedAttestationDelaySeconds.observe({src}, delaySec);
- const summary = getEpochSummary(validtorAggregator, epoch);
+ const summary = getEpochSummary(validatorAggregator, epoch);
summary.aggregates += 1;
summary.aggregateMinDelay = Math.min(delaySec, summary.aggregateMinDelay ?? Infinity);
}
@@ -500,11 +512,12 @@ export function createValidatorMonitor(
metrics.validatorMonitor.attestationInAggregateTotal.inc({src});
metrics.validatorMonitor.attestationInAggregateDelaySeconds.observe({src}, delaySec);
const summary = getEpochSummary(validator, epoch);
- summary.attestationAggregateIncusions += 1;
- logger.debug("Local validator attestation is included in AggregatedAndProof", {
- validatorIndex: index,
+ summary.attestationAggregateInclusions += 1;
+ log("Attestation is included in aggregate", {
+ validator: index,
slot: data.slot,
committeeIndex: data.index,
+ aggregatorIndex,
});
validator.attestations
@@ -562,8 +575,8 @@ export function createValidatorMonitor(
attestationSlot: indexedAttestation.data.slot,
});
- logger.debug("Local validator attestation is included in block", {
- validatorIndex: index,
+ log("Attestation is included in block", {
+ validator: index,
slot: data.slot,
committeeIndex: data.index,
inclusionDistance,
@@ -607,6 +620,11 @@ export function createValidatorMonitor(
// To guard against short re-orgs it will track the status of epoch N at the end of epoch N+1.
// This function **SHOULD** be called at the last slot of an epoch to have max possible information.
onceEveryEndOfEpoch(headState) {
+ if (headState.slot <= GENESIS_SLOT) {
+ // Before genesis, there won't be any validator activity
+ return;
+ }
+
// Prune validators not seen in a while
for (const [index, validator] of validators.entries()) {
if (Date.now() - validator.lastRegisteredTimeMs > RETAIN_REGISTERED_VALIDATORS_MS) {
@@ -627,8 +645,12 @@ export function createValidatorMonitor(
for (const [index, validator] of validators.entries()) {
const flags = parseParticipationFlags(previousEpochParticipation.get(index));
const attestationSummary = validator.attestations.get(prevEpoch)?.get(prevEpochTargetRoot);
- metrics.validatorMonitor.prevEpochAttestationSummary.inc({
- summary: renderAttestationSummary(config, rootCache, attestationSummary, flags),
+ const summary = renderAttestationSummary(config, rootCache, attestationSummary, flags);
+ metrics.validatorMonitor.prevEpochAttestationSummary.inc({summary});
+ log("Previous epoch attestation", {
+ validator: index,
+ epoch: prevEpoch,
+ summary,
});
}
}
@@ -639,9 +661,15 @@ export function createValidatorMonitor(
const validator = validators.get(validatorIndex);
if (validator) {
// If expected proposer is a tracked validator
- const summary = validator.summaries.get(prevEpoch);
- metrics.validatorMonitor.prevEpochBlockProposalSummary.inc({
- summary: renderBlockProposalSummary(config, rootCache, summary, SLOTS_PER_EPOCH * prevEpoch + slotIndex),
+ const epochSummary = validator.summaries.get(prevEpoch);
+ const proposalSlot = SLOTS_PER_EPOCH * prevEpoch + slotIndex;
+ const summary = renderBlockProposalSummary(config, rootCache, epochSummary, proposalSlot);
+ metrics.validatorMonitor.prevEpochBlockProposalSummary.inc({summary});
+ log("Previous epoch block proposal", {
+ validator: validatorIndex,
+ slot: proposalSlot,
+ epoch: prevEpoch,
+ summary,
});
}
}
@@ -698,7 +726,9 @@ export function createValidatorMonitor(
metrics.validatorMonitor.prevEpochAttestations.observe(summary.attestations);
if (summary.attestationMinDelay !== null)
metrics.validatorMonitor.prevEpochAttestationsMinDelaySeconds.observe(summary.attestationMinDelay);
- metrics.validatorMonitor.prevEpochAttestationAggregateInclusions.observe(summary.attestationAggregateIncusions);
+ metrics.validatorMonitor.prevEpochAttestationAggregateInclusions.observe(
+ summary.attestationAggregateInclusions
+ );
metrics.validatorMonitor.prevEpochAttestationBlockInclusions.observe(summary.attestationBlockInclusions);
if (summary.attestationMinBlockInclusionDistance !== null) {
metrics.validatorMonitor.prevEpochAttestationBlockMinInclusionDistance.observe(
@@ -968,8 +998,8 @@ function renderBlockProposalSummary(
}
if (rootCache.getBlockRootAtSlot(proposalSlot) === proposal.blockRoot) {
- // Cannonical state includes our block
- return "cannonical";
+ // Canonical state includes our block
+ return "canonical";
}
let out = "orphaned";
diff --git a/packages/beacon-node/src/monitoring/service.ts b/packages/beacon-node/src/monitoring/service.ts
index 66ed14ee42d0..f50f992ebe1f 100644
--- a/packages/beacon-node/src/monitoring/service.ts
+++ b/packages/beacon-node/src/monitoring/service.ts
@@ -186,9 +186,9 @@ export class MonitoringService {
// error was thrown by abort signal
if (signal.reason === FetchAbortReason.Close) {
- throw new ErrorAborted(`request to ${this.remoteServiceHost}`);
+ throw new ErrorAborted("request");
} else if (signal.reason === FetchAbortReason.Timeout) {
- throw new TimeoutError(`reached for request to ${this.remoteServiceHost}`);
+ throw new TimeoutError("request");
} else {
throw e;
}
diff --git a/packages/beacon-node/src/network/core/metrics.ts b/packages/beacon-node/src/network/core/metrics.ts
index 78bc88d52fe7..e5ce0bede447 100644
--- a/packages/beacon-node/src/network/core/metrics.ts
+++ b/packages/beacon-node/src/network/core/metrics.ts
@@ -333,6 +333,8 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
};
}
+export type NetworkCoreWorkerMetrics = ReturnType;
+
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export function getNetworkCoreWorkerMetrics(register: RegistryMetricCreator) {
return {
@@ -340,5 +342,11 @@ export function getNetworkCoreWorkerMetrics(register: RegistryMetricCreator) {
name: "lodestar_network_worker_reqresp_bridge_caller_pending_count",
help: "Current count of pending elements in respBridgeCaller",
}),
+ networkWorkerWireEventsOnWorkerThreadLatency: register.histogram<"eventName">({
+ name: "lodestar_network_worker_wire_events_on_worker_thread_latency_seconds",
+ help: "Latency in seconds to transmit network events to worker thread across parent port",
+ labelNames: ["eventName"],
+ buckets: [0.001, 0.003, 0.01, 0.03, 0.1],
+ }),
};
}
diff --git a/packages/beacon-node/src/network/core/networkCore.ts b/packages/beacon-node/src/network/core/networkCore.ts
index 34733739a996..498e556b040e 100644
--- a/packages/beacon-node/src/network/core/networkCore.ts
+++ b/packages/beacon-node/src/network/core/networkCore.ts
@@ -9,6 +9,7 @@ import {routes} from "@lodestar/api";
import {BeaconConfig} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
import {Epoch, phase0} from "@lodestar/types";
+import {withTimeout} from "@lodestar/utils";
import {ForkName} from "@lodestar/params";
import {ResponseIncoming} from "@lodestar/reqresp";
import {Libp2p} from "../interface.js";
@@ -268,7 +269,10 @@ export class NetworkCore implements INetworkCore {
this.logger.debug("network reqResp closed");
this.attnetsService.close();
this.syncnetsService.close();
- await this.libp2p.stop();
+ // In some cases, `libp2p.stop` never resolves, it is required
+ // to wrap the call with a timeout to allow for a timely shutdown
+ // See https://github.com/ChainSafe/lodestar/issues/6053
+ await withTimeout(async () => this.libp2p.stop(), 5000);
this.logger.debug("network lib2p closed");
this.closed = true;
diff --git a/packages/beacon-node/src/network/core/networkCoreWorker.ts b/packages/beacon-node/src/network/core/networkCoreWorker.ts
index ef3408038343..35303190a8f8 100644
--- a/packages/beacon-node/src/network/core/networkCoreWorker.ts
+++ b/packages/beacon-node/src/network/core/networkCoreWorker.ts
@@ -1,18 +1,18 @@
-import worker from "node:worker_threads";
import fs from "node:fs";
import path from "node:path";
-import {createFromProtobuf} from "@libp2p/peer-id-factory";
+import worker from "node:worker_threads";
+import type {ModuleThread} from "@chainsafe/threads";
import {expose} from "@chainsafe/threads/worker";
-import type {WorkerModule} from "@chainsafe/threads/dist/types/worker.js";
+import {createFromProtobuf} from "@libp2p/peer-id-factory";
import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config";
import {getNodeLogger} from "@lodestar/logger/node";
-import {collectNodeJSMetrics, RegistryMetricCreator} from "../../metrics/index.js";
+import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js";
import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
import {Clock} from "../../util/clock.js";
-import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
-import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {peerIdToString} from "../../util/peerId.js";
import {profileNodeJS} from "../../util/profile.js";
+import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
+import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
import {getNetworkCoreWorkerMetrics} from "./metrics.js";
import {NetworkWorkerApi, NetworkWorkerData} from "./types.js";
import {NetworkCore} from "./networkCore.js";
@@ -25,7 +25,7 @@ import {
reqRespBridgeEventDirection,
} from "./events.js";
-// Cloned data from instatiation
+// Cloned data from instantiation
const workerData = worker.workerData as NetworkWorkerData;
const parentPort = worker.parentPort;
// eslint-disable-next-line @typescript-eslint/strict-boolean-expressions
@@ -83,9 +83,9 @@ new AsyncIterableBridgeHandler(getReqRespBridgeReqEvents(reqRespBridgeEventBus),
);
const reqRespBridgeRespCaller = new AsyncIterableBridgeCaller(getReqRespBridgeRespEvents(reqRespBridgeEventBus));
+const networkCoreWorkerMetrics = metricsRegister ? getNetworkCoreWorkerMetrics(metricsRegister) : null;
// respBridgeCaller metrics
-if (metricsRegister) {
- const networkCoreWorkerMetrics = getNetworkCoreWorkerMetrics(metricsRegister);
+if (networkCoreWorkerMetrics) {
networkCoreWorkerMetrics.reqRespBridgeRespCallerPending.addCollect(() => {
networkCoreWorkerMetrics.reqRespBridgeRespCallerPending.set(reqRespBridgeRespCaller.pendingCount);
});
@@ -110,19 +110,21 @@ wireEventsOnWorkerThread(
NetworkWorkerThreadEventType.networkEvent,
events,
parentPort,
+ networkCoreWorkerMetrics,
networkEventDirection
);
wireEventsOnWorkerThread(
NetworkWorkerThreadEventType.reqRespBridgeEvents,
reqRespBridgeEventBus,
parentPort,
+ networkCoreWorkerMetrics,
reqRespBridgeEventDirection
);
const libp2pWorkerApi: NetworkWorkerApi = {
- close: () => {
+ close: async () => {
abortController.abort();
- return core.close();
+ await core.close();
},
scrapeMetrics: () => core.scrapeMetrics(),
@@ -162,4 +164,4 @@ const libp2pWorkerApi: NetworkWorkerApi = {
},
};
-expose(libp2pWorkerApi as WorkerModule);
+expose(libp2pWorkerApi as ModuleThread);
diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts
index 46c06456c429..ddffb5fc1460 100644
--- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts
+++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts
@@ -1,24 +1,24 @@
+import path from "node:path";
import worker_threads from "node:worker_threads";
-import {exportToProtobuf} from "@libp2p/peer-id-factory";
-import {PeerId} from "@libp2p/interface/peer-id";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/dist/src/score/peer-score.js";
import {PublishOpts} from "@chainsafe/libp2p-gossipsub/types";
-import {spawn, Thread, Worker} from "@chainsafe/threads";
+import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads";
+import {PeerId} from "@libp2p/interface/peer-id";
+import {exportToProtobuf} from "@libp2p/peer-id-factory";
import {routes} from "@lodestar/api";
-import {phase0} from "@lodestar/types";
-import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp";
import {BeaconConfig, chainConfigToJson} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
-import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
-import {wireEventsOnMainThread} from "../../util/workerEvents.js";
+import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp";
+import {phase0} from "@lodestar/types";
import {Metrics} from "../../metrics/index.js";
-import {IncomingRequestArgs, OutgoingRequestArgs, GetReqRespHandlerFn} from "../reqresp/types.js";
+import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
+import {peerIdFromString} from "../../util/peerId.js";
+import {terminateWorkerThread, wireEventsOnMainThread} from "../../util/workerEvents.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
-import {CommitteeSubscription} from "../subnets/interface.js";
-import {PeerAction, PeerScoreStats} from "../peers/index.js";
import {NetworkOptions} from "../options.js";
-import {peerIdFromString} from "../../util/peerId.js";
-import {NetworkWorkerApi, NetworkWorkerData, INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js";
+import {PeerAction, PeerScoreStats} from "../peers/index.js";
+import {GetReqRespHandlerFn, IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js";
+import {CommitteeSubscription} from "../subnets/interface.js";
import {
NetworkWorkerThreadEventType,
ReqRespBridgeEventBus,
@@ -27,6 +27,10 @@ import {
getReqRespBridgeRespEvents,
reqRespBridgeEventDirection,
} from "./events.js";
+import {INetworkCore, MultiaddrStr, NetworkWorkerApi, NetworkWorkerData, PeerIdStr} from "./types.js";
+
+// Worker constructor consider the path relative to the current working directory
+const workerDir = process.env.NODE_ENV === "test" ? "../../../lib/network/core/" : "./";
export type WorkerNetworkCoreOpts = NetworkOptions & {
metricsEnabled: boolean;
@@ -47,10 +51,13 @@ export type WorkerNetworkCoreInitModules = {
};
type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & {
- workerApi: NetworkWorkerApi;
+ networkThreadApi: ModuleThread;
worker: Worker;
};
+const NETWORK_WORKER_EXIT_TIMEOUT_MS = 1000;
+const NETWORK_WORKER_EXIT_RETRY_COUNT = 3;
+
/**
* NetworkCore implementation using a Worker thread
*/
@@ -72,15 +79,21 @@ export class WorkerNetworkCore implements INetworkCore {
NetworkWorkerThreadEventType.networkEvent,
modules.events,
modules.worker as unknown as worker_threads.Worker,
+ modules.metrics,
networkEventDirection
);
wireEventsOnMainThread(
NetworkWorkerThreadEventType.reqRespBridgeEvents,
this.reqRespBridgeEventBus,
modules.worker as unknown as worker_threads.Worker,
+ modules.metrics,
reqRespBridgeEventDirection
);
+ Thread.errors(modules.networkThreadApi).subscribe((err) => {
+ this.modules.logger.error("Network worker thread error", {}, err);
+ });
+
const {metrics} = modules;
if (metrics) {
metrics.networkWorkerHandler.reqRespBridgeReqCallerPending.addCollect(() => {
@@ -107,7 +120,7 @@ export class WorkerNetworkCore implements INetworkCore {
loggerOpts: modules.logger.toOpts(),
};
- const worker = new Worker("./networkCoreWorker.js", {
+ const worker = new Worker(path.join(workerDir, "networkCoreWorker.js"), {
workerData,
/**
* maxYoungGenerationSizeMb defaults to 152mb through the cli option defaults.
@@ -124,24 +137,30 @@ export class WorkerNetworkCore implements INetworkCore {
} as ConstructorParameters[1]);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- const workerApi = (await spawn(worker, {
+ const networkThreadApi = (await spawn(worker, {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
- // TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satifies its contrains
- })) as unknown as NetworkWorkerApi;
+ // TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satisfies its contrains
+ })) as unknown as ModuleThread;
return new WorkerNetworkCore({
...modules,
- workerApi,
+ networkThreadApi,
worker,
});
}
async close(): Promise {
+ this.modules.logger.debug("closing network core running in network worker");
await this.getApi().close();
this.modules.logger.debug("terminating network worker");
- await Thread.terminate(this.modules.workerApi as unknown as Thread);
+ await terminateWorkerThread({
+ worker: this.getApi(),
+ retryCount: NETWORK_WORKER_EXIT_RETRY_COUNT,
+ retryMs: NETWORK_WORKER_EXIT_TIMEOUT_MS,
+ logger: this.modules.logger,
+ });
this.modules.logger.debug("terminated network worker");
}
@@ -231,7 +250,7 @@ export class WorkerNetworkCore implements INetworkCore {
return this.getApi().writeDiscv5Profile(durationMs, dirpath);
}
- private getApi(): NetworkWorkerApi {
- return this.modules.workerApi;
+ private getApi(): ModuleThread {
+ return this.modules.networkThreadApi;
}
}
diff --git a/packages/beacon-node/src/network/core/types.ts b/packages/beacon-node/src/network/core/types.ts
index d36d339e9a97..790c532aa2a4 100644
--- a/packages/beacon-node/src/network/core/types.ts
+++ b/packages/beacon-node/src/network/core/types.ts
@@ -87,6 +87,9 @@ export type NetworkWorkerData = {
* API exposed by the libp2p worker
*/
export type NetworkWorkerApi = INetworkCorePublic & {
+ // To satisfy the constraint of `ModuleThread` type
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [string: string]: (...args: any[]) => Promise | any;
// Async method through worker boundary
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise;
reStatusPeers(peers: PeerIdStr[]): Promise;
diff --git a/packages/beacon-node/src/network/libp2p/noise.ts b/packages/beacon-node/src/network/libp2p/noise.ts
index fcd4f3c9354f..fcb00fe41893 100644
--- a/packages/beacon-node/src/network/libp2p/noise.ts
+++ b/packages/beacon-node/src/network/libp2p/noise.ts
@@ -1,36 +1,85 @@
+import crypto from "node:crypto";
import type {ConnectionEncrypter} from "@libp2p/interface/connection-encrypter";
-import {newInstance, ChaCha20Poly1305} from "@chainsafe/as-chacha20poly1305";
import {ICryptoInterface, noise, pureJsCrypto} from "@chainsafe/libp2p-noise";
import {digest} from "@chainsafe/as-sha256";
-
-type Bytes = Uint8Array;
-type Bytes32 = Uint8Array;
+import {newInstance, ChaCha20Poly1305} from "@chainsafe/as-chacha20poly1305";
const ctx = newInstance();
const asImpl = new ChaCha20Poly1305(ctx);
-// same to stablelib but we use as-chacha20poly1305 and as-sha256
-const lodestarCrypto: ICryptoInterface = {
- ...pureJsCrypto,
- hashSHA256(data: Uint8Array): Uint8Array {
- return digest(data);
+const CHACHA_POLY1305 = "chacha20-poly1305";
+
+const nodeCrypto: Pick