Skip to content

Commit

Permalink
add option for custom updateCallback when registering worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed Jun 25, 2024
1 parent fa69dda commit 89ed541
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 28 deletions.
27 changes: 26 additions & 1 deletion bin/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import * as env from 'lib0/environment'
import * as yredis from '@y/redis'
import * as Y from 'yjs'

const redisPrefix = env.getConf('redis-prefix') || 'y'
const postgresUrl = env.getConf('postgres')
Expand All @@ -27,4 +28,28 @@ if (s3Endpoint) {
store = createMemoryStorage()
}

yredis.createWorker(store, redisPrefix)
let ydocUpdateCallback = env.getConf('ydoc-update-callback')
if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') {
ydocUpdateCallback += '/'
}

/**
* @type {(room: string, ydoc: Y.Doc) => Promise<void>}
*/
const updateCallback = async (room, ydoc) => {
if (ydocUpdateCallback != null) {
// call YDOC_UPDATE_CALLBACK here
const formData = new FormData()
// @todo only convert ydoc to updatev2 once
formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)]))
// @todo should add a timeout to fetch (see fetch signal abortcontroller)
const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' })
if (!res.ok) {
console.error(`Issue sending data to YDOC_UPDATE_CALLBACK. status="${res.status}" statusText="${res.statusText}"`)
}
}
}

yredis.createWorker(store, redisPrefix, {
updateCallback
})
44 changes: 19 additions & 25 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ const logWorker = logging.createModuleLogger('@y/redis/api/worker')

export const redisUrl = env.ensureConf('redis')

let ydocUpdateCallback = env.getConf('ydoc-update-callback')
if (ydocUpdateCallback != null && ydocUpdateCallback.slice(-1) !== '/') {
ydocUpdateCallback += '/'
}

/**
* @param {string} a
* @param {string} b
Expand Down Expand Up @@ -254,12 +249,9 @@ export class Api {
}

/**
* @param {Object} opts
* @param {number} [opts.blockTime]
* @param {number} [opts.tryReclaimCount]
* @param {number} [opts.tryClaimCount]
* @param {WorkerOpts} opts
*/
async consumeWorkerQueue ({ blockTime = 1000, tryReclaimCount = 5, tryClaimCount = 5 } = {}) {
async consumeWorkerQueue ({ blockTime = 1000, tryReclaimCount = 5, tryClaimCount = 5, updateCallback = async () => {} }) {
/**
* @type {Array<{stream: string, id: string}>}
*/
Expand Down Expand Up @@ -291,6 +283,8 @@ export class Api {
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
} else {
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
// @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't
// register a timeout anymore
const { ydoc, storeReferences, redisLastId } = await this.getDoc(room, docid)
const lastId = math.max(number.parseInt(redisLastId.split('-')[0]), number.parseInt(task.id.split('-')[0]))
await this.store.persistDoc(room, docid, ydoc)
Expand All @@ -304,17 +298,7 @@ export class Api {
])
logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime })
try {
if (ydocUpdateCallback != null) {
// call YDOC_UPDATE_CALLBACK here
const formData = new FormData()
// @todo only convert ydoc to updatev2 once
formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)]))
// @todo should add a timeout to fetch (see fetch signal abortcontroller)
const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' })
if (!res.ok) {
console.error(`Issue sending data to YDOC_UPDATE_CALLBACK. status="${res.status}" statusText="${res.statusText}"`)
}
}
await updateCallback(room, ydoc)
} catch (e) {
console.error(e)
}
Expand All @@ -331,28 +315,38 @@ export class Api {
}
}

/**
* @typedef {Object} WorkerOpts
* @property {(room: string, ydoc: Y.Doc) => Promise<void>} [WorkerOpts.updateCallback]
* @property {number} [WorkerOpts.blockTime]
* @property {number} [WorkerOpts.tryReclaimCount]
* @property {number} [WorkerOpts.tryClaimCount]
*/

/**
* @param {import('./storage.js').AbstractStorage} store
* @param {string} redisPrefix
* @param {WorkerOpts} opts
*/
export const createWorker = async (store, redisPrefix) => {
export const createWorker = async (store, redisPrefix, opts) => {
const a = await createApiClient(store, redisPrefix)
return new Worker(a)
return new Worker(a, opts)
}

export class Worker {
/**
* @param {Api} client
* @param {WorkerOpts} opts
*/
constructor (client) {
constructor (client, opts) {
this.client = client
logWorker('Created worker process ', { id: client.consumername, prefix: client.prefix, minMessageLifetime: client.redisMinMessageLifetime })
;(async () => {
const startRedisTime = await client.redis.time()
const timeDiff = startRedisTime.getTime() - time.getUnixTime()
while (!client._destroyed) {
try {
const tasks = await client.consumeWorkerQueue()
const tasks = await client.consumeWorkerQueue(opts)
if (tasks.length === 0 || (client.redisMinMessageLifetime > time.getUnixTime() + timeDiff - number.parseInt(tasks[0].id.split('-')[0]))) {
await promise.wait(client.redisMinMessageLifetime / 2)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/api.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const createTestCase = async tc => {
}

const createWorker = async () => {
const worker = await api.createWorker(store, redisPrefix)
const worker = await api.createWorker(store, redisPrefix, {})
worker.client.redisMinMessageLifetime = 200
worker.client.redisWorkerTimeout = 50
prevClients.push(worker.client)
Expand Down
2 changes: 1 addition & 1 deletion tests/ws.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const createWsClient = (tc, room) => {
}

const createWorker = async () => {
const worker = await api.createWorker(utils.store, utils.redisPrefix)
const worker = await api.createWorker(utils.store, utils.redisPrefix, {})
worker.client.redisMinMessageLifetime = 500
worker.client.redisWorkerTimeout = 100
utils.prevClients.push(worker.client)
Expand Down

0 comments on commit 89ed541

Please sign in to comment.