-
-
Notifications
You must be signed in to change notification settings - Fork 317
/
Copy paththreadPool.ts
70 lines (65 loc) · 2.16 KB
/
threadPool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import {spawn, Pool, Worker, ModuleThread, QueuedTask} from "@chainsafe/threads";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI} from "./types.js";
import {maxPoolSize} from "./poolSize.js";
/**
 * Thread pool to decrypt keystores.
 *
 * Each worker is spawned from `./worker.js`. The pool registers an `abort` listener on the
 * provided signal that terminates the workers, and `completed()` unregisters that listener
 * once the pool is done, whether it finished successfully or not.
 */
export class DecryptKeystoresThreadPool {
  private readonly pool: Pool<ModuleThread<DecryptKeystoreWorkerAPI>>;
  /** Every task queued since construction or since the last `cancel()` call */
  private tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
  /** Kept as a field so the exact same function reference can be unregistered later */
  private readonly terminatePoolHandler: () => void;

  /**
   * @param keystoreCount Number of keystores expected to be decrypted, caps the pool size
   * @param signal Aborting this signal terminates the worker threads
   */
  constructor(
    keystoreCount: number,
    private readonly signal: AbortSignal
  ) {
    this.pool = Pool(
      () =>
        spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
          // The number below is big enough to almost disable the timeout
          // which helps during tests run on unpredictably slow hosts
          timeout: 5 * 60 * 1000,
        }),
      {
        // Adjust worker pool size based on keystore count
        size: Math.min(keystoreCount, maxPoolSize),
        // Decrypt keystores in sequence, increasing concurrency does not improve performance
        concurrency: 1,
      }
    );

    // Terminate worker threads when process receives exit signal
    // NOTE(review): `true` presumably forces termination without waiting for queued tasks - confirm
    this.terminatePoolHandler = () => {
      void this.pool.terminate(true);
    };
    signal.addEventListener("abort", this.terminatePoolHandler, {once: true});
  }

  /**
   * Add keystore to the task queue to be decrypted
   *
   * @param args Arguments forwarded to the worker's `decryptKeystore`
   * @param onDecrypted Called with the decrypted secret key bytes
   * @param onError Called if the task rejects, and also if `onDecrypted` itself throws,
   * because the `catch` is chained after the `then`
   */
  queue(
    args: DecryptKeystoreArgs,
    onDecrypted: (secretKeyBytes: Uint8Array) => void,
    onError: (e: Error) => void
  ): void {
    const task = this.pool.queue((thread) => thread.decryptKeystore(args));
    this.tasks.push(task);
    // Fire and forget, the outcome is reported through the callbacks
    void task.then(onDecrypted).catch(onError);
  }

  /**
   * Resolves once all queued tasks are completed and terminates worker threads.
   * Errors during executing can be captured in `onError` handler for each task.
   *
   * Rejects if waiting for the pool or terminating it rejects. The `abort` listener is
   * removed in both cases so that it does not stay registered on the signal.
   */
  async completed(): Promise<void> {
    try {
      await this.pool.settled(true);
      await this.pool.terminate();
    } finally {
      this.signal.removeEventListener("abort", this.terminatePoolHandler);
    }
  }

  /**
   * Cancel all pending tasks
   *
   * Calls `cancel()` on every tracked task and then forgets them, so a repeated call
   * has nothing left to cancel.
   */
  cancel(): void {
    for (const task of this.tasks) {
      task.cancel();
    }
    this.tasks = [];
  }
}