From 1152e607cc59a9869ae2858041474cb9fead012b Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 8 Sep 2023 14:45:56 +0200 Subject: [PATCH 1/3] Fix the close handler for the worker --- .../src/network/core/networkCoreWorker.ts | 26 ++++++------- .../network/core/networkCoreWorkerHandler.ts | 38 ++++++++++--------- .../beacon-node/src/network/core/types.ts | 3 ++ 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/packages/beacon-node/src/network/core/networkCoreWorker.ts b/packages/beacon-node/src/network/core/networkCoreWorker.ts index ef3408038343..a0c8ff22fe60 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorker.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorker.ts @@ -1,21 +1,18 @@ -import worker from "node:worker_threads"; import fs from "node:fs"; import path from "node:path"; -import {createFromProtobuf} from "@libp2p/peer-id-factory"; +import worker from "node:worker_threads"; +import type {ModuleThread} from "@chainsafe/threads"; import {expose} from "@chainsafe/threads/worker"; -import type {WorkerModule} from "@chainsafe/threads/dist/types/worker.js"; +import {createFromProtobuf} from "@libp2p/peer-id-factory"; import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config"; import {getNodeLogger} from "@lodestar/logger/node"; -import {collectNodeJSMetrics, RegistryMetricCreator} from "../../metrics/index.js"; +import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js"; import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; import {Clock} from "../../util/clock.js"; -import {wireEventsOnWorkerThread} from "../../util/workerEvents.js"; -import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; import {peerIdToString} from "../../util/peerId.js"; import {profileNodeJS} from "../../util/profile.js"; -import {getNetworkCoreWorkerMetrics} from "./metrics.js"; -import {NetworkWorkerApi, NetworkWorkerData} from "./types.js"; -import {NetworkCore} from "./networkCore.js"; +import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; +import {wireEventsOnWorkerThread} from "../../util/workerEvents.js"; import { NetworkWorkerThreadEventType, ReqRespBridgeEventBus, @@ -24,8 +21,11 @@ import { getReqRespBridgeRespEvents, reqRespBridgeEventDirection, } from "./events.js"; +import {getNetworkCoreWorkerMetrics} from "./metrics.js"; +import {NetworkCore} from "./networkCore.js"; +import {NetworkWorkerApi, NetworkWorkerData} from "./types.js"; -// Cloned data from instatiation +// Cloned data from instantiation const workerData = worker.workerData as NetworkWorkerData; const parentPort = worker.parentPort; // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions @@ -120,9 +120,9 @@ wireEventsOnWorkerThread( ); const libp2pWorkerApi: NetworkWorkerApi = { - close: () => { + close: async () => { abortController.abort(); - return core.close(); + await core.close(); }, scrapeMetrics: () => core.scrapeMetrics(), @@ -162,4 +162,4 @@ const libp2pWorkerApi: NetworkWorkerApi = { }, }; -expose(libp2pWorkerApi as WorkerModule); +expose(libp2pWorkerApi as ModuleThread); diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts index 46c06456c429..a32a5491b135 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts @@ -1,24 +1,23 @@ import worker_threads from "node:worker_threads"; -import {exportToProtobuf} from "@libp2p/peer-id-factory"; -import {PeerId} from "@libp2p/interface/peer-id"; import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/dist/src/score/peer-score.js"; import {PublishOpts} from "@chainsafe/libp2p-gossipsub/types"; -import {spawn, Thread, Worker} from "@chainsafe/threads"; +import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads"; +import {PeerId} from "@libp2p/interface/peer-id"; +import {exportToProtobuf} from "@libp2p/peer-id-factory"; import {routes} from "@lodestar/api"; -import {phase0} from "@lodestar/types"; -import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; import {BeaconConfig, chainConfigToJson} from "@lodestar/config"; import type {LoggerNode} from "@lodestar/logger/node"; +import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; +import {phase0} from "@lodestar/types"; +import {Metrics} from "../../metrics/index.js"; import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; +import {peerIdFromString} from "../../util/peerId.js"; import {wireEventsOnMainThread} from "../../util/workerEvents.js"; -import {Metrics} from "../../metrics/index.js"; -import {IncomingRequestArgs, OutgoingRequestArgs, GetReqRespHandlerFn} from "../reqresp/types.js"; import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; -import {CommitteeSubscription} from "../subnets/interface.js"; -import {PeerAction, PeerScoreStats} from "../peers/index.js"; import {NetworkOptions} from "../options.js"; -import {peerIdFromString} from "../../util/peerId.js"; -import {NetworkWorkerApi, NetworkWorkerData, INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js"; +import {PeerAction, PeerScoreStats} from "../peers/index.js"; +import {GetReqRespHandlerFn, IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js"; +import {CommitteeSubscription} from "../subnets/interface.js"; import { NetworkWorkerThreadEventType, ReqRespBridgeEventBus, @@ -27,6 +26,7 @@ import { getReqRespBridgeRespEvents, reqRespBridgeEventDirection, } from "./events.js"; +import {INetworkCore, MultiaddrStr, NetworkWorkerApi, NetworkWorkerData, PeerIdStr} from "./types.js"; export type WorkerNetworkCoreOpts = NetworkOptions & { metricsEnabled: boolean; @@ -47,7 +47,7 @@ export type WorkerNetworkCoreInitModules = { }; type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & { - workerApi: NetworkWorkerApi; + networkApiThread: ModuleThread; worker: Worker; }; @@ -81,6 +81,10 @@ export class WorkerNetworkCore implements INetworkCore { reqRespBridgeEventDirection ); + Thread.errors(modules.networkApiThread).subscribe((err) => { + this.modules.logger.error("Network worker thread error", undefined, err); + }); + const {metrics} = modules; if (metrics) { metrics.networkWorkerHandler.reqRespBridgeReqCallerPending.addCollect(() => { @@ -124,16 +128,16 @@ export class WorkerNetworkCore implements INetworkCore { } as ConstructorParameters[1]); // eslint-disable-next-line @typescript-eslint/no-explicit-any - const workerApi = (await spawn(worker, { + const networkApiThread = (await spawn(worker, { // A Lodestar Node may do very expensive task at start blocking the event loop and causing // the initialization to timeout. The number below is big enough to almost disable the timeout timeout: 5 * 60 * 1000, // TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satifies its contrains - })) as unknown as NetworkWorkerApi; + })) as unknown as ModuleThread; return new WorkerNetworkCore({ ...modules, - workerApi, + networkApiThread, worker, }); } @@ -141,7 +145,7 @@ export class WorkerNetworkCore implements INetworkCore { async close(): Promise { await this.getApi().close(); this.modules.logger.debug("terminating network worker"); - await Thread.terminate(this.modules.workerApi as unknown as Thread); + await Thread.terminate(this.getApi() as unknown as Thread); this.modules.logger.debug("terminated network worker"); } @@ -232,6 +236,6 @@ export class WorkerNetworkCore implements INetworkCore { } private getApi(): NetworkWorkerApi { - return this.modules.workerApi; + return this.modules.networkApiThread; } } diff --git a/packages/beacon-node/src/network/core/types.ts b/packages/beacon-node/src/network/core/types.ts index d36d339e9a97..790c532aa2a4 100644 --- a/packages/beacon-node/src/network/core/types.ts +++ b/packages/beacon-node/src/network/core/types.ts @@ -87,6 +87,9 @@ export type NetworkWorkerData = { * API exposed by the libp2p worker */ export type NetworkWorkerApi = INetworkCorePublic & { + // To satisfy the constraint of `ModuleThread` type + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [string: string]: (...args: any[]) => Promise | any; // Async method through worker boundary reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise; reStatusPeers(peers: PeerIdStr[]): Promise; From 8192e8068e0cd5c40632acab7b6e250907ff9593 Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 8 Sep 2023 15:53:08 +0200 Subject: [PATCH 2/3] Add retry to exit the thread --- .../network/core/networkCoreWorkerHandler.ts | 14 ++++++-- packages/beacon-node/src/util/workerEvents.ts | 34 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts index a32a5491b135..898ea7c10469 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts @@ -12,7 +12,7 @@ import {phase0} from "@lodestar/types"; import {Metrics} from "../../metrics/index.js"; import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; import {peerIdFromString} from "../../util/peerId.js"; -import {wireEventsOnMainThread} from "../../util/workerEvents.js"; +import {terminateWorkerThread, wireEventsOnMainThread} from "../../util/workerEvents.js"; import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; import {NetworkOptions} from "../options.js"; import {PeerAction, PeerScoreStats} from "../peers/index.js"; @@ -51,6 +51,9 @@ type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & { worker: Worker; }; +const networkWorkerExitTimeoutMs = 1000; +const networkWorkerExitRetryCount = 3; + /** * NetworkCore implementation using a Worker thread */ @@ -145,7 +148,12 @@ export class WorkerNetworkCore implements INetworkCore { async close(): Promise { await this.getApi().close(); this.modules.logger.debug("terminating network worker"); - await Thread.terminate(this.getApi() as unknown as Thread); + await terminateWorkerThread({ + worker: this.getApi(), + retryCount: networkWorkerExitRetryCount, + retryMs: networkWorkerExitTimeoutMs, + logger: this.modules.logger, + }); this.modules.logger.debug("terminated network worker"); } @@ -235,7 +243,7 @@ export class WorkerNetworkCore implements INetworkCore { return this.getApi().writeDiscv5Profile(durationMs, dirpath); } - private getApi(): NetworkWorkerApi { + private getApi(): ModuleThread { return this.modules.networkApiThread; } } diff --git a/packages/beacon-node/src/util/workerEvents.ts b/packages/beacon-node/src/util/workerEvents.ts index 8926b6d18cb4..cd61e6b95393 100644 --- a/packages/beacon-node/src/util/workerEvents.ts +++ b/packages/beacon-node/src/util/workerEvents.ts @@ -1,4 +1,7 @@ import {MessagePort, Worker} from "node:worker_threads"; +import {Thread} from "@chainsafe/threads"; +import {Logger} from "@lodestar/logger"; +import {sleep} from "@lodestar/utils"; import {StrictEventEmitterSingleArg} from "./strictEvents.js"; export type WorkerBridgeEvent = { @@ -85,3 +88,34 @@ export function wireEventsOnMainThread( } } } + +export async function terminateWorkerThread({ + worker, + retryMs, + retryCount, + logger, +}: { + worker: Thread; + retryMs: number; + retryCount: number; + logger?: Logger; +}): Promise { + const terminated = new Promise((resolve) => { + Thread.events(worker).subscribe((event) => { + if (event.type === "termination") { + resolve(true); + } + }); + }); + + for (let i = 0; i < retryCount; i++) { + await Thread.terminate(worker); + const result = await Promise.race([terminated, sleep(retryMs).then(() => false)]); + + if (result) return; + + logger?.warn("Worker thread failed to terminate, retrying..."); + } + + throw new Error(`Worker thread failed to terminate in ${retryCount * retryMs}ms.`); +} From 67c7eb5040967048ffbf6707ccd3a9b06d05faac Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 8 Sep 2023 17:15:19 +0200 Subject: [PATCH 3/3] Update code with feedback --- .../network/core/networkCoreWorkerHandler.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts index 898ea7c10469..73ca9e9c5fd0 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts @@ -47,12 +47,12 @@ export type WorkerNetworkCoreInitModules = { }; type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & { - networkApiThread: ModuleThread; + networkThreadApi: ModuleThread; worker: Worker; }; -const networkWorkerExitTimeoutMs = 1000; -const networkWorkerExitRetryCount = 3; +const NETWORK_WORKER_EXIT_TIMEOUT_MS = 1000; +const NETWORK_WORKER_EXIT_RETRY_COUNT = 3; /** * NetworkCore implementation using a Worker thread @@ -84,8 +84,8 @@ export class WorkerNetworkCore implements INetworkCore { reqRespBridgeEventDirection ); - Thread.errors(modules.networkApiThread).subscribe((err) => { - this.modules.logger.error("Network worker thread error", undefined, err); + Thread.errors(modules.networkThreadApi).subscribe((err) => { + this.modules.logger.error("Network worker thread error", {}, err); }); const {metrics} = modules; @@ -131,7 +131,7 @@ export class WorkerNetworkCore implements INetworkCore { } as ConstructorParameters[1]); // eslint-disable-next-line @typescript-eslint/no-explicit-any - const networkApiThread = (await spawn(worker, { + const networkThreadApi = (await spawn(worker, { // A Lodestar Node may do very expensive task at start blocking the event loop and causing // the initialization to timeout. The number below is big enough to almost disable the timeout timeout: 5 * 60 * 1000, @@ -140,7 +140,7 @@ export class WorkerNetworkCore implements INetworkCore { return new WorkerNetworkCore({ ...modules, - networkApiThread, + networkThreadApi, worker, }); } @@ -150,8 +150,8 @@ export class WorkerNetworkCore implements INetworkCore { this.modules.logger.debug("terminating network worker"); await terminateWorkerThread({ worker: this.getApi(), - retryCount: networkWorkerExitRetryCount, - retryMs: networkWorkerExitTimeoutMs, + retryCount: NETWORK_WORKER_EXIT_RETRY_COUNT, + retryMs: NETWORK_WORKER_EXIT_TIMEOUT_MS, logger: this.modules.logger, }); this.modules.logger.debug("terminated network worker"); @@ -244,6 +244,6 @@ export class WorkerNetworkCore implements INetworkCore { } private getApi(): ModuleThread { - return this.modules.networkApiThread; + return this.modules.networkThreadApi; } }