diff --git a/bin/worker.js b/bin/worker.js index c43e55e..6cbcd60 100644 --- a/bin/worker.js +++ b/bin/worker.js @@ -2,6 +2,7 @@ import * as env from 'lib0/environment' import * as yredis from '@y/redis' +import * as Y from 'yjs' const redisPrefix = env.getConf('redis-prefix') || 'y' const postgresUrl = env.getConf('postgres') @@ -27,4 +28,28 @@ if (s3Endpoint) { store = createMemoryStorage() } -yredis.createWorker(store, redisPrefix) +let ydocUpdateCallback = env.getConf('ydoc-update-callback') +if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') { + ydocUpdateCallback += '/' +} + +/** + * @type {(room: string, ydoc: Y.Doc) => Promise} + */ +const updateCallback = async (room, ydoc) => { + if (ydocUpdateCallback != null) { + // call YDOC_UPDATE_CALLBACK here + const formData = new FormData() + // @todo only convert ydoc to updatev2 once + formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)])) + // @todo should add a timeout to fetch (see fetch signal abortcontroller) + const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' }) + if (!res.ok) { + console.error(`Issue sending data to YDOC_UPDATE_CALLBACK. status="${res.status}" statusText="${res.statusText}"`) + } + } +} + +yredis.createWorker(store, redisPrefix, { + updateCallback +}) diff --git a/src/api.js b/src/api.js index a4ea43a..c2f8b45 100644 --- a/src/api.js +++ b/src/api.js @@ -18,11 +18,6 @@ const logWorker = logging.createModuleLogger('@y/redis/api/worker') export const redisUrl = env.ensureConf('redis') -let ydocUpdateCallback = env.getConf('ydoc-update-callback') -if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') { - ydocUpdateCallback += '/' -} - /** * @param {string} a * @param {string} b @@ -254,12 +249,9 @@ export class Api { } /** - * @param {Object} opts - * @param {number} [opts.blockTime] - * @param {number} [opts.tryReclaimCount] - * @param {number} [opts.tryClaimCount] + * @param {WorkerOpts} opts */ - async consumeWorkerQueue ({ blockTime = 1000, tryReclaimCount = 5, tryClaimCount = 5 } = {}) { + async consumeWorkerQueue ({ blockTime = 1000, tryReclaimCount = 5, tryClaimCount = 5, updateCallback = async () => {} }) { /** * @type {Array<{stream: string, id: string}>} */ @@ -291,6 +283,8 @@ export class Api { logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) } else { const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) + // @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't + // register a timeout anymore const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid) const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0])) await this.store.persistDoc(room, docid, ydoc) @@ -304,17 +298,7 @@ export class Api { ]) logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime }) try { - if (ydocUpdateCallback != null) { - // call YDOC_UPDATE_CALLBACK here - const formData = new FormData() - // @todo only convert ydoc to updatev2 once - formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)])) - // @todo should add a timeout to fetch (see fetch signal abortcontroller) - const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' }) - if (!res.ok) { - console.error(`Issue sending data to YDOC_UPDATE_CALLBACK. status="${res.status}" statusText="${res.statusText}"`) - } - } + await updateCallback(room, ydoc) } catch (e) { console.error(e) } @@ -331,20 +315,30 @@ export class Api { } } +/** + * @typedef {Object} WorkerOpts + * @property {(room: string, ydoc: Y.Doc) => Promise} [WorkerOpts.updateCallback] + * @property {number} [WorkerOpts.blockTime] + * @property {number} [WorkerOpts.tryReclaimCount] + * @property {number} [WorkerOpts.tryClaimCount] + */ + /** * @param {import('./storage.js').AbstractStorage} store * @param {string} redisPrefix + * @param {WorkerOpts} opts */ -export const createWorker = async (store, redisPrefix) => { +export const createWorker = async (store, redisPrefix, opts) => { const a = await createApiClient(store, redisPrefix) - return new Worker(a) + return new Worker(a, opts) } export class Worker { /** * @param {Api} client + * @param {WorkerOpts} opts */ - constructor (client) { + constructor (client, opts) { this.client = client logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime }) ;(async () => { @@ -352,7 +346,7 @@ export class Worker { const timeDiff = startRedisTime.getTime() - time.getUnixTime() while (!client._destroyed) { try { - const tasks = await client.consumeWorkerQueue() + const tasks = await client.consumeWorkerQueue(opts) if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) { await promise.wait(client.redisMinMessageLifetime / 2) } diff --git a/tests/api.tests.js b/tests/api.tests.js index 3fa13a2..45cd2b4 100644 --- a/tests/api.tests.js +++ b/tests/api.tests.js @@ -44,7 +44,7 @@ const createTestCase = async tc => { } const createWorker = async () => { - const worker = await api.createWorker(store, redisPrefix) + const worker = await api.createWorker(store, redisPrefix, {}) worker.client.redisMinMessageLifetime = 200 worker.client.redisWorkerTimeout = 50 prevClients.push(worker.client) diff --git a/tests/ws.tests.js b/tests/ws.tests.js index cda24f9..49aa215 100644 --- a/tests/ws.tests.js +++ b/tests/ws.tests.js @@ -29,7 +29,7 @@ const createWsClient = (tc, room) => { } const createWorker = async () => { - const worker = await api.createWorker(utils.store, utils.redisPrefix) + const worker = await api.createWorker(utils.store, utils.redisPrefix, {}) worker.client.redisMinMessageLifetime = 500 worker.client.redisWorkerTimeout = 100 utils.prevClients.push(worker.client)