From 5b29d9785ba531af19b6fb9a3a8f6d9a61b8b945 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 2 Dec 2020 18:08:28 +0000 Subject: [PATCH 1/9] feat(events): add events system --- src/interfaces.ts | 137 ++++++++++++++++++++++++++++++++++++++ src/lib.ts | 5 ++ src/main.ts | 163 +++++++++++++++++++++++++++++----------------- src/runner.ts | 3 + src/signals.ts | 10 ++- src/worker.ts | 91 +++++++++++++++++++------- 6 files changed, 325 insertions(+), 84 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index b8bb1931..d5357829 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,7 +1,9 @@ +import { EventEmitter } from "events"; import { Pool, PoolClient, QueryResult, QueryResultRow } from "pg"; import { Release } from "./lib"; import { Logger } from "./logger"; +import { Signal } from "./signals"; /* * Terminology: @@ -197,6 +199,7 @@ export interface Runner { stop: () => Promise; addJob: AddJobFunction; promise: Promise; + events: WorkerEvents; } export interface TaskSpec { @@ -280,6 +283,11 @@ export interface SharedOptions { * Graphile worker will skip the execution of any jobs that contain these flags */ forbiddenFlags?: null | string[] | ForbiddenFlagsFn; + + /** + * An EventEmitter instance to which we'll emit events. + */ + events?: WorkerEvents; } /** @@ -334,3 +342,132 @@ export interface RunnerOptions extends WorkerPoolOptions { } export interface WorkerUtilsOptions extends SharedOptions {} + +type BaseEventMap = Record; +type EventMapKey = string & keyof TEventMap; +type EventCallback = (params: TPayload) => void; + +interface TypedEventEmitter + extends EventEmitter { + addListener>( + eventName: TEventName, + callback: EventCallback, + ): this; + on>( + eventName: TEventName, + callback: EventCallback, + ): this; + once>( + eventName: TEventName, + callback: EventCallback, + ): this; + + removeListener>( + eventName: TEventName, + callback: EventCallback, + ): this; + off>( + eventName: TEventName, + callback: EventCallback, + ): this; + + emit>( + eventName: TEventName, + params: TEventMap[TEventName], + ): boolean; +} + +/** + * These are the events that a worker instance supports. + */ +export type WorkerEvents = TypedEventEmitter<{ + /** + * When a worker pool is created + */ + "pool:create": { workerPool: WorkerPool }; + + /** + * When a worker pool attempts to connect to PG ready to issue a LISTEN + * statement + */ + "pool:listen:connecting": { workerPool: WorkerPool }; + + /** + * When a worker pool starts listening for jobs via PG LISTEN + */ + "pool:listen:success": { workerPool: WorkerPool; client: PoolClient }; + + /** + * When a worker pool faces an error on their PG LISTEN client + */ + "pool:listen:error": { + workerPool: WorkerPool; + error: any; + client: PoolClient; + }; + + /** + * When a worker pool is released + */ + "pool:release": { pool: WorkerPool }; + + /** + * When a worker pool starts a graceful shutdown + */ + "pool:gracefulShutdown": { pool: WorkerPool; message: string }; + + /** + * When a worker pool graceful shutdown throws an error + */ + "pool:gracefulShutdown:error": { pool: WorkerPool; error: any }; + + /** + * When a worker is created + */ + "worker:create": { worker: Worker; tasks: TaskList }; + + /** + * When a worker calls get_job but there are no available jobs + */ + "worker:getJob:error": { worker: Worker; error: any }; + + /** + * When a worker calls get_job but there are no available jobs + */ + "worker:getJob:empty": { worker: Worker }; + + /** + * When a worker is created + */ + "worker:fatalError": { worker: Worker; error: any; jobError: any | null }; + + /** + * When a job is retrieved by get_job + */ + "job:start": { worker: Worker; job: Job }; + + /** + * When a job completes successfully + */ + "job:success": { worker: Worker; job: Job }; + + /** + * When a job throws an error + */ + "job:error": { worker: Worker; job: Job; error: any }; + + /** + * When a job fails permanently (emitted after job:error when appropriate) + */ + "job:failed": { worker: Worker; job: Job; error: any }; + + /** + * When the runner is terminated by a signal + */ + gracefulShutdown: { signal: Signal }; + + /** + * When the runner is stopped + */ + stop: {}; +}>; diff --git a/src/lib.ts b/src/lib.ts index d7d449a9..ad0b91a1 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -1,4 +1,5 @@ import * as assert from "assert"; +import { EventEmitter } from "events"; import { Client, Pool } from "pg"; import { defaults } from "./config"; @@ -8,11 +9,13 @@ import { RunnerOptions, SharedOptions, WithPgClient, + WorkerEvents, } from "./interfaces"; import { defaultLogger, Logger, LogScope } from "./logger"; import { migrate } from "./migrate"; interface CompiledSharedOptions { + events: WorkerEvents; logger: Logger; workerSchema: string; escapedWorkerSchema: string; @@ -33,9 +36,11 @@ export function processSharedOptions( const { logger = defaultLogger, schema: workerSchema = defaults.schema, + events = new EventEmitter(), } = options; const escapedWorkerSchema = Client.prototype.escapeIdentifier(workerSchema); compiled = { + events, logger, workerSchema, escapedWorkerSchema, diff --git a/src/main.ts b/src/main.ts index f04d4a43..da4f7098 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "events"; import { Pool, PoolClient } from "pg"; import { inspect } from "util"; @@ -11,6 +12,7 @@ import { Job, TaskList, Worker, + WorkerEvents, WorkerOptions, WorkerPool, WorkerPoolOptions, @@ -25,14 +27,38 @@ const allWorkerPools: Array = []; // Exported for testing only export { allWorkerPools as _allWorkerPools }; +/** + * All pools share the same signal handlers, so we need to broadcast + * gracefulShutdown to all the pools' events; we use this event emitter to + * aggregate these requests. + */ +let _signalHandlersEventEmitter: WorkerEvents = new EventEmitter(); + +/** + * Only register the signal handlers once _globally_. + */ let _registeredSignalHandlers = false; + +/** + * Only trigger graceful shutdown once. + */ let _shuttingDown = false; -function registerSignalHandlers(logger: Logger) { + +/** + * This will register the signal handlers to make sure the worker shuts down + * gracefully if it can. It will only register signal handlers once; even if + * you call it multiple times it will always use the first logger it is passed, + * future calls will register the events but take no further actions. + */ +function registerSignalHandlers(logger: Logger, events: WorkerEvents) { if (_shuttingDown) { throw new Error( "System has already gone into shutdown, should not be spawning new workers now!", ); } + _signalHandlersEventEmitter.on("gracefulShutdown", (o) => + events.emit("gracefulShutdown", o), + ); if (_registeredSignalHandlers) { return; } @@ -54,6 +80,7 @@ function registerSignalHandlers(logger: Logger) { return; } _shuttingDown = true; + _signalHandlersEventEmitter.emit("gracefulShutdown", { signal }); Promise.all( allWorkerPools.map((pool) => pool.gracefulShutdown(`Forced worker shutdown due to ${signal}`), @@ -73,7 +100,7 @@ export function runTaskList( tasks: TaskList, pgPool: Pool, ): WorkerPool { - const { logger, escapedWorkerSchema } = processSharedOptions(options); + const { logger, escapedWorkerSchema, events } = processSharedOptions(options); logger.debug(`Worker pool options are ${inspect(options)}`, { options }); const { concurrency = defaults.concurrentJobs, @@ -83,7 +110,7 @@ export function runTaskList( if (!noHandleSignals) { // Clean up when certain signals occur - registerSignalHandlers(logger); + registerSignalHandlers(logger, events); } const promise = deferred(); @@ -105,12 +132,69 @@ export function runTaskList( } }; + // This is a representation of us that can be interacted with externally + const workerPool: WorkerPool = { + release: async () => { + events.emit("pool:release", { pool: this }); + unlistenForChanges(); + promise.resolve(); + await Promise.all(workers.map((worker) => worker.release())); + const idx = allWorkerPools.indexOf(workerPool); + allWorkerPools.splice(idx, 1); + }, + + // Make sure we clean up after ourselves even if a signal is caught + async gracefulShutdown(message: string) { + events.emit("pool:gracefulShutdown", { pool: this, message }); + try { + logger.debug(`Attempting graceful shutdown`); + // Release all our workers' jobs + const workerIds = workers.map((worker) => worker.workerId); + const jobsInProgress: Array = workers + .map((worker) => worker.getActiveJob()) + .filter((job): job is Job => !!job); + // Remove all the workers - we're shutting them down manually + workers.splice(0, workers.length).map((worker) => worker.release()); + logger.debug(`Releasing the jobs '${workerIds.join(", ")}'`, { + workerIds, + }); + const { rows: cancelledJobs } = await pgPool.query( + ` + SELECT ${escapedWorkerSchema}.fail_job(job_queues.locked_by, jobs.id, $2) + FROM ${escapedWorkerSchema}.jobs + INNER JOIN ${escapedWorkerSchema}.job_queues ON (job_queues.queue_name = jobs.queue_name) + WHERE job_queues.locked_by = ANY($1::text[]) AND jobs.id = ANY($3::int[]); + `, + [workerIds, message, jobsInProgress.map((job) => job.id)], + ); + logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { + cancelledJobs, + }); + logger.debug("Jobs released"); + } catch (e) { + events.emit("pool:gracefulShutdown:error", { pool: this, error: e }); + logger.error(`Error occurred during graceful shutdown: ${e.message}`, { + error: e, + }); + } + // Remove ourself from the list of worker pools + this.release(); + }, + + promise, + }; + + // Ensure that during a forced shutdown we get cleaned up too + allWorkerPools.push(workerPool); + events.emit("pool:create", { workerPool }); + const listenForChanges = ( err: Error | undefined, client: PoolClient, release: () => void, ) => { if (err) { + events.emit("pool:listen:error", { workerPool, client, error: err }); logger.error( `Error connecting with notify listener (trying again in 5 seconds): ${err.message}`, { error: err }, @@ -121,6 +205,7 @@ export function runTaskList( }, 5000); return; } + events.emit("pool:listen:success", { workerPool, client }); listenForChangesClient = client; client.on("notification", () => { if (listenForChangesClient === client) { @@ -129,11 +214,9 @@ export function runTaskList( } }); - // Subscribe to jobs:insert message - client.query('LISTEN "jobs:insert"'); - // On error, release this client and try again client.on("error", (e: Error) => { + events.emit("pool:listen:error", { workerPool, client, error: e }); logger.error(`Error with database notify listener: ${e.message}`, { error: e, }); @@ -145,9 +228,13 @@ export function runTaskList( error: e, }); } + events.emit("pool:listen:connecting", { workerPool }); pgPool.connect(listenForChanges); }); + // Subscribe to jobs:insert message + client.query('LISTEN "jobs:insert"'); + const supportedTaskNames = Object.keys(tasks); logger.info( @@ -158,60 +245,9 @@ export function runTaskList( }; // Create a client dedicated to listening for new jobs. + events.emit("pool:listen:connecting", { workerPool }); pgPool.connect(listenForChanges); - // This is a representation of us that can be interacted with externally - const workerPool = { - release: async () => { - unlistenForChanges(); - promise.resolve(); - await Promise.all(workers.map((worker) => worker.release())); - const idx = allWorkerPools.indexOf(workerPool); - allWorkerPools.splice(idx, 1); - }, - - // Make sure we clean up after ourselves even if a signal is caught - async gracefulShutdown(message: string) { - try { - logger.debug(`Attempting graceful shutdown`); - // Release all our workers' jobs - const workerIds = workers.map((worker) => worker.workerId); - const jobsInProgress: Array = workers - .map((worker) => worker.getActiveJob()) - .filter((job): job is Job => !!job); - // Remove all the workers - we're shutting them down manually - workers.splice(0, workers.length).map((worker) => worker.release()); - logger.debug(`Releasing the jobs '${workerIds.join(", ")}'`, { - workerIds, - }); - const { rows: cancelledJobs } = await pgPool.query( - ` - SELECT ${escapedWorkerSchema}.fail_job(job_queues.locked_by, jobs.id, $2) - FROM ${escapedWorkerSchema}.jobs - INNER JOIN ${escapedWorkerSchema}.job_queues ON (job_queues.queue_name = jobs.queue_name) - WHERE job_queues.locked_by = ANY($1::text[]) AND jobs.id = ANY($3::int[]); - `, - [workerIds, message, jobsInProgress.map((job) => job.id)], - ); - logger.debug(`Cancelled ${cancelledJobs.length} jobs`, { - cancelledJobs, - }); - logger.debug("Jobs released"); - } catch (e) { - logger.error(`Error occurred during graceful shutdown: ${e.message}`, { - error: e, - }); - } - // Remove ourself from the list of worker pools - this.release(); - }, - - promise, - }; - - // Ensure that during a forced shutdown we get cleaned up too - allWorkerPools.push(workerPool); - // Spawn our workers; they can share clients from the pool. const withPgClient = makeWithPgClientFromPool(pgPool); for (let i = 0; i < concurrency; i++) { @@ -227,6 +263,11 @@ export const runTaskListOnce = ( options: WorkerOptions, tasks: TaskList, client: PoolClient, -) => - makeNewWorker(options, tasks, makeWithPgClientFromClient(client), false) - .promise; +) => { + return makeNewWorker( + options, + tasks, + makeWithPgClientFromClient(client), + false, + ).promise; +}; diff --git a/src/runner.ts b/src/runner.ts index 43f16e01..5cfdf156 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -75,6 +75,7 @@ export const run = async ( release, releasers, addJob, + events, } = await getUtilsAndReleasersFromOptions(options); try { @@ -89,6 +90,7 @@ export const run = async ( async stop() { if (running) { running = false; + events.emit("stop", {}); await release(); } else { throw new Error("Runner is already stopped"); @@ -96,6 +98,7 @@ export const run = async ( }, addJob, promise: workerPool.promise, + events, }; } catch (e) { await release(); diff --git a/src/signals.ts b/src/signals.ts index 0536768f..55deece2 100644 --- a/src/signals.ts +++ b/src/signals.ts @@ -1,3 +1,11 @@ +export type Signal = + | "SIGUSR2" + | "SIGINT" + | "SIGTERM" + | "SIGPIPE" + | "SIGHUP" + | "SIGABRT"; + export default [ "SIGUSR2", "SIGINT", @@ -5,4 +13,4 @@ export default [ "SIGPIPE", "SIGHUP", "SIGABRT", -] as Array<"SIGUSR2" | "SIGINT" | "SIGTERM" | "SIGPIPE" | "SIGHUP" | "SIGABRT">; +] as Array; diff --git a/src/worker.ts b/src/worker.ts index fec27dc4..26972311 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -30,6 +30,7 @@ export function makeNewWorker( escapedWorkerSchema, logger, maxContiguousErrors, + events, } = processSharedOptions(options, { scope: { label: "worker", @@ -62,6 +63,29 @@ export function makeNewWorker( return promise; }; + const nudge = () => { + assert(active, "nudge called after worker terminated"); + if (doNextTimer) { + // Must be idle; call early + doNext(); + return true; + } else { + again = true; + // Not idle; find someone else! + return false; + } + }; + + const worker: Worker = { + nudge, + workerId, + release, + promise, + getActiveJob: () => activeJob, + }; + + events.emit("worker:create", { worker, tasks }); + logger.debug(`Spawned`); let contiguousErrors = 0; @@ -99,7 +123,7 @@ export function makeNewWorker( text: // TODO: breaking change; change this to more optimal: // `SELECT id, queue_name, task_identifier, payload FROM ...`, - `SELECT * FROM ${escapedWorkerSchema}.get_job($1, $2, forbidden_flags := $3::text[]);`, + `SELECT * FROM ${escapedWorkerSchema}.get_job($1, $2, forbidden_flags := $3::text[]); `, values: [ workerId, supportedTaskNames, @@ -112,7 +136,14 @@ export function makeNewWorker( // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates activeJob = jobRow && jobRow.id ? jobRow : null; + + if (activeJob) { + events.emit("job:start", { worker, job: activeJob }); + } else { + events.emit("worker:getJob:empty", { worker }); + } } catch (err) { + events.emit("worker:getJob:error", { worker, error: err }); if (continuous) { contiguousErrors++; logger.debug( @@ -188,6 +219,24 @@ export function makeNewWorker( const durationRaw = process.hrtime(startTimestamp); const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; if (err) { + try { + events.emit("job:error", { worker, job, error: err }); + } catch (e) { + logger.error( + "Error occurred in event emitter for 'job:error'; this is an issue in your application code and you should fix it", + ); + } + if (job.attempts >= job.max_attempts) { + try { + // Failed forever + events.emit("job:failed", { worker, job, error: err }); + } catch (e) { + logger.error( + "Error occurred in event emitter for 'job:failed'; this is an issue in your application code and you should fix it", + ); + } + } + const { message: rawMessage, stack } = err; /** @@ -215,6 +264,13 @@ export function makeNewWorker( }), ); } else { + try { + events.emit("job:success", { worker, job }); + } catch (e) { + logger.error( + "Error occurred in event emitter for 'job:success'; this is an issue in your application code and you should fix it", + ); + } if (!process.env.NO_LOG_SUCCESS) { logger.info( `Completed task ${job.id} (${ @@ -235,6 +291,18 @@ export function makeNewWorker( ); } } catch (fatalError) { + try { + events.emit("worker:fatalError", { + worker, + error: fatalError, + jobError: err, + }); + } catch (e) { + logger.error( + "Error occurred in event emitter for 'worker:fatalError'; this is an issue in your application code and you should fix it", + ); + } + const when = err ? `after failure '${err.message}'` : "after success"; logger.error( `Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, @@ -255,30 +323,9 @@ export function makeNewWorker( } }; - const nudge = () => { - assert(active, "nudge called after worker terminated"); - if (doNextTimer) { - // Must be idle; call early - doNext(); - return true; - } else { - again = true; - // Not idle; find someone else! - return false; - } - }; - // Start! doNext(); - const worker = { - nudge, - workerId, - release, - promise, - getActiveJob: () => activeJob, - }; - // For tests promise["worker"] = worker; From 2d522e75b621ce1d364433dd2cc2151117598035 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 2 Dec 2020 18:40:11 +0000 Subject: [PATCH 2/9] Revert unnecessary change --- src/main.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/main.ts b/src/main.ts index da4f7098..86f349cc 100644 --- a/src/main.ts +++ b/src/main.ts @@ -263,11 +263,6 @@ export const runTaskListOnce = ( options: WorkerOptions, tasks: TaskList, client: PoolClient, -) => { - return makeNewWorker( - options, - tasks, - makeWithPgClientFromClient(client), - false, - ).promise; -}; +) => + makeNewWorker(options, tasks, makeWithPgClientFromClient(client), false) + .promise; From b492f1d032c185e5fef34d61d875b69fa599e35b Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 7 Dec 2020 16:53:36 +0000 Subject: [PATCH 3/9] Document events --- README.md | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 47ceafbd..c21d2277 100644 --- a/README.md +++ b/README.md @@ -262,16 +262,10 @@ the APIs for running jobs. ### `run(options: RunnerOptions): Promise` Runs until either stopped by a signal event like `SIGINT` or by calling the -`stop()` function on the Runner object `run()` resolves to. +`stop()` method on the resolved object. -The Runner object also contains a `addJob` method (see [`addJob`](#addjob)) that -can be used to enqueue jobs: - -```js -await runner.addJob("testTask", { - thisIsThePayload: true, -}); -``` +The the resolved 'Runner' object has a number of helpers on it, see +[Runner object](#runner-object) for more information. ### `runOnce(options: RunnerOptions): Promise` @@ -319,6 +313,145 @@ One of these must be provided (in order of priority): - [PostgreSQL environmental variables](https://www.postgresql.org/docs/current/libpq-envars.html), including at least `PGDATABASE` (NOTE: not all envvars are supported) +### `Runner` object + +The `run` method above resolves to a 'Runner' object that has the following +methods and properties: + +- `stop(): Promise` - stops the runner from accepting new jobs, and + returns a promise that resolves when all the in progress tasks (if any) are + complete. +- `addJob: AddJobFunction` - see [`addJob`](#addjob). +- `promise: Promise` - a promise that resolves once the runner has + completed. +- `events: WorkerEvents` - a Node.js `EventEmitter` that exposes certain events + within the runner (see [`WorkerEvents`](#workerevents)). + +#### Example: adding a job with `runner.addJob` + +See [`addJob`](#addjob) for more details. + +```js +await runner.addJob("testTask", { + thisIsThePayload: true, +}); +``` + +#### Example: listening to an event with `runner.events` + +See [`WorkerEvents`](#workerevents) for more details. + +```js +runner.events.on("job.success", ({ worker, job }) => { + console.log(`Hooray! Worker ${worker.workerId} completed job ${job.id}`); +}); +``` + +### `WorkerEvents` + +We support a large number of events via an EventEmitter. You can either retrieve +the event emitter via the `events` property on the `Runner` object, or you can +create your own event emitter and pass it to Graphile Worker via the +`WorkerOptions.events` option (this is primarily useful for getting events from +the other Graphile Worker entrypoints). + +Details of what events we support and what data is available on the event +payload is detailed below in TypeScript syntax: + +``` +export type WorkerEvents = TypedEventEmitter<{ + /** + * When a worker pool is created + */ + "pool:create": { workerPool: WorkerPool }; + + /** + * When a worker pool attempts to connect to PG ready to issue a LISTEN + * statement + */ + "pool:listen:connecting": { workerPool: WorkerPool }; + + /** + * When a worker pool starts listening for jobs via PG LISTEN + */ + "pool:listen:success": { workerPool: WorkerPool; client: PoolClient }; + + /** + * When a worker pool faces an error on their PG LISTEN client + */ + "pool:listen:error": { + workerPool: WorkerPool; + error: any; + client: PoolClient; + }; + + /** + * When a worker pool is released + */ + "pool:release": { pool: WorkerPool }; + + /** + * When a worker pool starts a graceful shutdown + */ + "pool:gracefulShutdown": { pool: WorkerPool; message: string }; + + /** + * When a worker pool graceful shutdown throws an error + */ + "pool:gracefulShutdown:error": { pool: WorkerPool; error: any }; + + /** + * When a worker is created + */ + "worker:create": { worker: Worker; tasks: TaskList }; + + /** + * When a worker calls get_job but there are no available jobs + */ + "worker:getJob:error": { worker: Worker; error: any }; + + /** + * When a worker calls get_job but there are no available jobs + */ + "worker:getJob:empty": { worker: Worker }; + + /** + * When a worker is created + */ + "worker:fatalError": { worker: Worker; error: any; jobError: any | null }; + + /** + * When a job is retrieved by get_job + */ + "job:start": { worker: Worker; job: Job }; + + /** + * When a job completes successfully + */ + "job:success": { worker: Worker; job: Job }; + + /** + * When a job throws an error + */ + "job:error": { worker: Worker; job: Job; error: any }; + + /** + * When a job fails permanently (emitted after job:error when appropriate) + */ + "job:failed": { worker: Worker; job: Job; error: any }; + + /** + * When the runner is terminated by a signal + */ + gracefulShutdown: { signal: Signal }; + + /** + * When the runner is stopped + */ + stop: {}; +}>; +``` + ## Library usage: queueing jobs You can also use the `graphile-worker` library to queue jobs using one of the From 185f20290b56a38c7a27b999ab919f3d328d12d6 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 7 Dec 2020 16:57:01 +0000 Subject: [PATCH 4/9] Add 'events' option --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index c21d2277..1170931b 100644 --- a/README.md +++ b/README.md @@ -301,6 +301,9 @@ The following options for these methods are available. - `schema` can be used to change the default `graphile_worker` schema to something else (equivalent to `--schema` on the CLI) - `forbiddenFlags` see [Forbidden flags](#forbidden-flags) below +- `events`: pass your own `new EventEmitter()` if you want to customize the + options, get earlier events (before the runner object resolves), or want to + get events from alternative Graphile Worker entrypoints. Exactly one of either `taskDirectory` or `taskList` must be provided (except for `runMigrations` which doesn't require a task list). From e16faa59f84ff86200e03a00ff619c830f8119f0 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 9 Dec 2020 10:56:40 +0000 Subject: [PATCH 5/9] Typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1170931b..2679ec97 100644 --- a/README.md +++ b/README.md @@ -345,7 +345,7 @@ await runner.addJob("testTask", { See [`WorkerEvents`](#workerevents) for more details. ```js -runner.events.on("job.success", ({ worker, job }) => { +runner.events.on("job:success", ({ worker, job }) => { console.log(`Hooray! Worker ${worker.workerId} completed job ${job.id}`); }); ``` From f2fb48310fe8aa6d57b7b3ad6dc617976f0dbb77 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 9 Dec 2020 10:56:53 +0000 Subject: [PATCH 6/9] Don't create new options object, unnecessary --- src/main.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main.ts b/src/main.ts index 86f349cc..3dbb2942 100644 --- a/src/main.ts +++ b/src/main.ts @@ -102,11 +102,7 @@ export function runTaskList( ): WorkerPool { const { logger, escapedWorkerSchema, events } = processSharedOptions(options); logger.debug(`Worker pool options are ${inspect(options)}`, { options }); - const { - concurrency = defaults.concurrentJobs, - noHandleSignals, - ...workerOptions - } = options; + const { concurrency = defaults.concurrentJobs, noHandleSignals } = options; if (!noHandleSignals) { // Clean up when certain signals occur @@ -251,7 +247,7 @@ export function runTaskList( // Spawn our workers; they can share clients from the pool. const withPgClient = makeWithPgClientFromPool(pgPool); for (let i = 0; i < concurrency; i++) { - workers.push(makeNewWorker(workerOptions, tasks, withPgClient)); + workers.push(makeNewWorker(options, tasks, withPgClient)); } // TODO: handle when a worker shuts down (spawn a new one) From e93d023fdb9fd63f9ffd2fc317853880100cd89f Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 9 Dec 2020 11:40:27 +0000 Subject: [PATCH 7/9] Add basic tests for some events. --- __tests__/events.test.ts | 126 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 __tests__/events.test.ts diff --git a/__tests__/events.test.ts b/__tests__/events.test.ts new file mode 100644 index 00000000..635b1550 --- /dev/null +++ b/__tests__/events.test.ts @@ -0,0 +1,126 @@ +import { EventEmitter } from "events"; +import { Pool } from "pg"; + +import { run } from "../src"; +import deferred, { Deferred } from "../src/deferred"; +import { Task, TaskList, WorkerSharedOptions } from "../src/interfaces"; +import { + ESCAPED_GRAPHILE_WORKER_SCHEMA, + jobCount, + reset, + sleep, + sleepUntil, + withPgPool, +} from "./helpers"; + +const EVENTS = [ + "pool:create", + "pool:listen:connecting", + "pool:listen:success", + "pool:listen:error", + "pool:release", + "pool:gracefulShutdown", + "pool:gracefulShutdown:error", + "worker:create", + "worker:getJob:error", + "worker:getJob:empty", + "worker:fatalError", + "job:start", + "job:success", + "job:error", + "job:failed", + "gracefulShutdown", + "stop", +]; + +const addJob = (pgPool: Pool, id?: string | number) => + pgPool.query( + `select ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.add_job('job1', json_build_object('id', $1::text), 'serial')`, + [String(id != null ? id : Math.random())], + ); + +const options: WorkerSharedOptions = {}; + +test("emits the expected events", () => + withPgPool(async (pgPool) => { + await reset(pgPool, options); + + // Build the tasks + const jobPromises: { + [id: string]: Deferred; + } = {}; + const job1: Task = jest.fn(({ id }: { id: string }) => { + const jobPromise = deferred(); + if (jobPromises[id]) { + throw new Error("Job with this id already registered"); + } + jobPromises[id] = jobPromise; + return jobPromise; + }); + const tasks: TaskList = { + job1, + }; + + // Run the worker + const events = new EventEmitter(); + + const emittedEvents: Array<{ event: string; payload: any }> = []; + function createListener(event: string) { + return (payload: any) => { + emittedEvents.push({ event, payload }); + }; + } + + EVENTS.forEach((event) => { + events.on(event, createListener(event)); + }); + + const CONCURRENCY = 3; + const runner = await run({ + concurrency: CONCURRENCY, + pgPool, + taskList: tasks, + events, + }); + + expect(runner.events).toEqual(events); + + const eventCount = (name: string) => + emittedEvents.map((obj) => obj.event).filter((n) => n === name).length; + + // NOTE: these are the events that get emitted _before_ `run` resolves; so + // you can only receive these if you pass an EventEmitter to run manually. + expect(eventCount("pool:create")).toEqual(1); + expect(eventCount("pool:listen:connecting")).toEqual(1); + expect(eventCount("worker:create")).toEqual(CONCURRENCY); + + let finished = false; + runner.promise.then(() => { + finished = true; + }); + + for (let i = 0; i < 5; i++) { + await addJob(pgPool, i); + } + + for (let i = 0; i < 5; i++) { + await sleepUntil(() => !!jobPromises[i]); + expect(eventCount("job:start")).toEqual(i + 1); + expect(eventCount("job:success")).toEqual(i); + jobPromises[i].resolve(); + await sleepUntil(() => eventCount("job:success") === i + 1); + } + + await sleep(1); + expect(finished).toBeFalsy(); + expect(eventCount("stop")).toEqual(0); + expect(eventCount("pool:release")).toEqual(0); + await runner.stop(); + expect(eventCount("stop")).toEqual(1); + expect(job1).toHaveBeenCalledTimes(5); + await sleep(1); + expect(finished).toBeTruthy(); + await runner.promise; + expect(eventCount("pool:release")).toEqual(1); + expect(await jobCount(pgPool)).toEqual(0); + })); From 4667b91c9445e216010a7750e98019db1090c228 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 9 Dec 2020 11:48:55 +0000 Subject: [PATCH 8/9] Add worker:release and worker:stop events --- README.md | 10 ++++++++++ __tests__/events.test.ts | 5 +++++ src/interfaces.ts | 10 ++++++++++ src/worker.ts | 9 +++++++++ 4 files changed, 34 insertions(+) diff --git a/README.md b/README.md index 2679ec97..fc56f087 100644 --- a/README.md +++ b/README.md @@ -408,6 +408,16 @@ export type WorkerEvents = TypedEventEmitter<{ */ "worker:create": { worker: Worker; tasks: TaskList }; + /** + * When a worker release is requested + */ + "worker:release": { worker: Worker }; + + /** + * When a worker stops (normally after a release) + */ + "worker:stop": { worker: Worker; error?: any }; + /** * When a worker calls get_job but there are no available jobs */ diff --git a/__tests__/events.test.ts b/__tests__/events.test.ts index 635b1550..daca7777 100644 --- a/__tests__/events.test.ts +++ b/__tests__/events.test.ts @@ -22,6 +22,8 @@ const EVENTS = [ "pool:gracefulShutdown", "pool:gracefulShutdown:error", "worker:create", + "worker:release", + "worker:stop", "worker:getJob:error", "worker:getJob:empty", "worker:fatalError", @@ -114,6 +116,7 @@ test("emits the expected events", () => await sleep(1); expect(finished).toBeFalsy(); expect(eventCount("stop")).toEqual(0); + expect(eventCount("worker:release")).toEqual(0); expect(eventCount("pool:release")).toEqual(0); await runner.stop(); expect(eventCount("stop")).toEqual(1); @@ -121,6 +124,8 @@ test("emits the expected events", () => await sleep(1); expect(finished).toBeTruthy(); await runner.promise; + expect(eventCount("worker:release")).toEqual(CONCURRENCY); + expect(eventCount("worker:stop")).toEqual(CONCURRENCY); expect(eventCount("pool:release")).toEqual(1); expect(await jobCount(pgPool)).toEqual(0); })); diff --git a/src/interfaces.ts b/src/interfaces.ts index d5357829..6cf5ea22 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -426,6 +426,16 @@ export type WorkerEvents = TypedEventEmitter<{ */ "worker:create": { worker: Worker; tasks: TaskList }; + /** + * When a worker release is requested + */ + "worker:release": { worker: Worker }; + + /** + * When a worker stops (normally after a release) + */ + "worker:stop": { worker: Worker; error?: any }; + /** * When a worker calls get_job but there are no available jobs */ diff --git a/src/worker.ts b/src/worker.ts index 26972311..5080f045 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -38,6 +38,14 @@ export function makeNewWorker( }, }); const promise = deferred(); + promise.then( + () => { + events.emit("worker:stop", { worker }); + }, + (error) => { + events.emit("worker:stop", { worker, error }); + }, + ); let activeJob: Job | null = null; let doNextTimer: NodeJS.Timer | null = null; @@ -56,6 +64,7 @@ export function makeNewWorker( return; } active = false; + events.emit("worker:release", { worker }); if (cancelDoNext()) { // Nothing in progress; resolve the promise promise.resolve(); From 606024e922cbc2e07095b750c48b6eee3b82c8c9 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Wed, 9 Dec 2020 12:12:33 +0000 Subject: [PATCH 9/9] Add examples from the README --- examples/readme/README.md | 10 ++++++++ examples/readme/events.js | 42 +++++++++++++++++++++++++++++++++ examples/readme/tasks/task_2.js | 4 ++++ 3 files changed, 56 insertions(+) create mode 100644 examples/readme/README.md create mode 100644 examples/readme/events.js create mode 100644 examples/readme/tasks/task_2.js diff --git a/examples/readme/README.md b/examples/readme/README.md new file mode 100644 index 00000000..08cebd64 --- /dev/null +++ b/examples/readme/README.md @@ -0,0 +1,10 @@ +Examples from the README, mostly for testing. + +- `events.js` is a combination of the + [Quickstart: library](https://github.com/graphile/worker/blob/main/README.md#quickstart-library) + example with the + [Example: listening to an event with `runner.events`](https://github.com/graphile/worker/blob/main/README.md#example-listening-to-an-event-with-runnerevents) + example; it's designed to be run standalone +- `tasks/task_2.js` to be used with `await addJob("task_2", { foo: "bar" });`; + to run this you can run `graphile-worker -c your_database_here` in this folder + and it should pick up the task automatically. diff --git a/examples/readme/events.js b/examples/readme/events.js new file mode 100644 index 00000000..3d061129 --- /dev/null +++ b/examples/readme/events.js @@ -0,0 +1,42 @@ +const { run, quickAddJob } = require(/* "graphile-worker" */ "../.."); + +async function main() { + // Run a worker to execute jobs: + const runner = await run({ + connectionString: "postgres:///my_db", + concurrency: 5, + // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc + noHandleSignals: false, + pollInterval: 1000, + // you can set the taskList or taskDirectory but not both + taskList: { + hello: async (payload, helpers) => { + const { name } = payload; + helpers.logger.info(`Hello, ${name}`); + }, + }, + // or: + // taskDirectory: `${__dirname}/tasks`, + }); + + runner.events.on("job:success", ({ worker, job }) => { + console.log(`Hooray! Worker ${worker.workerId} completed job ${job.id}`); + }); + + // Or add a job to be executed: + await quickAddJob( + // makeWorkerUtils options + { connectionString: "postgres:///my_db" }, + + // Task identifier + "hello", + + // Payload + { name: "Bobby Tables" }, + ); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/examples/readme/tasks/task_2.js b/examples/readme/tasks/task_2.js new file mode 100644 index 00000000..ee64cdb4 --- /dev/null +++ b/examples/readme/tasks/task_2.js @@ -0,0 +1,4 @@ +module.exports = async (payload, helpers) => { + // async is optional, but best practice + helpers.logger.debug(`Received ${JSON.stringify(payload)}`); +};