diff --git a/.changeset/fluffy-seals-vanish.md b/.changeset/fluffy-seals-vanish.md new file mode 100644 index 000000000000..b3b5d65f5d86 --- /dev/null +++ b/.changeset/fluffy-seals-vanish.md @@ -0,0 +1,5 @@ +--- +"wrangler": patch +--- + +chore: small refactor of dev registry to split file-based and server-based registries into separate files diff --git a/packages/wrangler/src/dev-registry.ts b/packages/wrangler/src/dev-registry.ts deleted file mode 100644 index 895a05baabed..000000000000 --- a/packages/wrangler/src/dev-registry.ts +++ /dev/null @@ -1,433 +0,0 @@ -import events from "node:events"; -import { utimesSync } from "node:fs"; -import { - mkdir, - readdir, - readFile, - stat, - unlink, - writeFile, -} from "node:fs/promises"; -import { createServer } from "node:http"; -import net from "node:net"; -import path from "node:path"; -import * as util from "node:util"; -import bodyParser from "body-parser"; -import { watch } from "chokidar"; -import express from "express"; -import { createHttpTerminator } from "http-terminator"; -import { fetch } from "undici"; -import { version as wranglerVersion } from "../package.json"; -import { getFlag } from "./experimental-flags"; -import { getGlobalWranglerConfigPath } from "./global-wrangler-config-path"; -import { logger } from "./logger"; -import type { Binding } from "./api"; -import type { Config } from "./config"; -import type { HttpTerminator } from "http-terminator"; -import type { Server } from "node:http"; - -// Safety of `!`: `parseInt(undefined)` is NaN -// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -let DEV_REGISTRY_PORT = parseInt(process.env.WRANGLER_WORKER_REGISTRY_PORT!); -if (Number.isNaN(DEV_REGISTRY_PORT)) { - DEV_REGISTRY_PORT = 6284; -} -const DEV_REGISTRY_HOST = `http://127.0.0.1:${DEV_REGISTRY_PORT}`; - -const DEV_REGISTRY_PATH = path.join(getGlobalWranglerConfigPath(), "registry"); - -let globalServer: Server | null; -let globalTerminator: HttpTerminator; - -let globalWatcher: ReturnType | undefined; -let globalWorkers: WorkerRegistry | undefined; - -const heartbeats = new Map>(); - -export type WorkerRegistry = Record; - -export type WorkerEntrypointsDefinition = Record< - /* name */ "default" | string, - { host: string; port: number } | undefined ->; - -export type WorkerDefinition = { - port: number | undefined; - protocol: "http" | "https" | undefined; - host: string | undefined; - mode: "local" | "remote"; - headers?: Record; - entrypointAddresses?: WorkerEntrypointsDefinition; - durableObjects: { name: string; className: string }[]; - durableObjectsHost?: string; - durableObjectsPort?: number; -}; - -async function loadWorkerDefinitions(): Promise { - await mkdir(DEV_REGISTRY_PATH, { recursive: true }); - globalWorkers ??= {}; - const newWorkers = new Set(); - const workerDefinitions = await readdir(DEV_REGISTRY_PATH); - for (const workerName of workerDefinitions) { - try { - const file = await readFile( - path.join(DEV_REGISTRY_PATH, workerName), - "utf8" - ); - const stats = await stat(path.join(DEV_REGISTRY_PATH, workerName)); - // Cleanup existing workers older than 10 minutes - if (stats.mtime.getTime() < Date.now() - 600000) { - await unregisterWorker(workerName); - } else { - globalWorkers[workerName] = JSON.parse(file); - newWorkers.add(workerName); - } - } catch (e) { - // This can safely be ignored. It generally indicates the worker was too old and was removed by a parallel Wrangler process - logger.debug( - "Error while loading worker definition from the registry", - e - ); - } - } - - for (const worker of Object.keys(globalWorkers)) { - if (!newWorkers.has(worker)) { - delete globalWorkers[worker]; - } - } - return globalWorkers; -} - -/** - * A helper function to check whether our service registry is already running - */ -async function isPortAvailable() { - return new Promise((resolve, reject) => { - const netServer = net - .createServer() - .once("error", (err) => { - netServer.close(); - if ((err as unknown as { code: string }).code === "EADDRINUSE") { - resolve(false); - } else { - reject(err); - } - }) - .once("listening", () => { - netServer.close(); - resolve(true); - }); - netServer.listen(DEV_REGISTRY_PORT, "127.0.0.1"); - }); -} - -const jsonBodyParser = bodyParser.json(); - -export async function startWorkerRegistryServer(port: number) { - const app = express(); - - let workers: WorkerRegistry = {}; - app - .get("/workers", async (req, res) => { - res.json(workers); - }) - .post("/workers/:workerId", jsonBodyParser, async (req, res) => { - workers[req.params.workerId] = req.body; - res.json(null); - }) - .delete(`/workers/:workerId`, async (req, res) => { - delete workers[req.params.workerId]; - res.json(null); - }) - .delete("/workers", async (req, res) => { - workers = {}; - res.json(null); - }); - - const appServer = createServer(app); - const appTerminator = createHttpTerminator({ server: appServer }); - - const listeningPromise = events.once(appServer, "listening"); - appServer.listen(port, "127.0.0.1"); - await listeningPromise; - - return { server: appServer, terminator: appTerminator }; -} - -/** - * Start the service registry. It's a simple server - * that exposes endpoints for registering and unregistering - * services, as well as getting the state of the registry. - */ -export async function startWorkerRegistry( - listener?: (registry: WorkerRegistry | undefined) => void -) { - if (getFlag("FILE_BASED_REGISTRY")) { - globalWatcher ??= watch(DEV_REGISTRY_PATH, { - persistent: true, - }).on("all", async () => { - await loadWorkerDefinitions(); - listener?.({ ...globalWorkers }); - }); - return; - } - if ((await isPortAvailable()) && !globalServer) { - const result = await startWorkerRegistryServer(DEV_REGISTRY_PORT); - globalServer = result.server; - globalTerminator = result.terminator; - - /** - * The registry server may have already been started by another wrangler process. - * If wrangler processes are run in parallel, isPortAvailable() can return true - * while another process spins up the server - */ - globalServer.once("error", (err) => { - if ((err as unknown as { code: string }).code !== "EADDRINUSE") { - throw err; - } - }); - - /** - * The registry server may close. Reset the server to null for restart. - */ - globalServer.on("close", () => { - globalServer = null; - }); - } -} - -/** - * Stop the service registry. - */ -export async function stopWorkerRegistry() { - if (getFlag("FILE_BASED_REGISTRY") || globalWatcher) { - await globalWatcher?.close(); - for (const heartbeat of heartbeats) { - clearInterval(heartbeat[1]); - } - return; - } - await globalTerminator?.terminate(); - globalServer = null; -} - -/** - * Register a worker in the registry. - */ -export async function registerWorker( - name: string, - definition: WorkerDefinition -) { - if (getFlag("FILE_BASED_REGISTRY")) { - const existingHeartbeat = heartbeats.get(name); - if (existingHeartbeat) { - clearInterval(existingHeartbeat); - } - await mkdir(DEV_REGISTRY_PATH, { recursive: true }); - await writeFile( - path.join(DEV_REGISTRY_PATH, name), - // We don't currently do anything with the stored Wrangler version, - // but if we need to make breaking changes to this format in the future - // we can use this field to present useful messaging - JSON.stringify({ ...definition, wranglerVersion }, null, 2) - ); - heartbeats.set( - name, - setInterval(() => { - utimesSync(path.join(DEV_REGISTRY_PATH, name), new Date(), new Date()); - }, 30_000) - ); - return; - } - /** - * Prevent the dev registry be closed. - */ - await startWorkerRegistry(); - try { - return await fetch(`${DEV_REGISTRY_HOST}/workers/${name}`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(definition), - }); - } catch (e) { - if ( - !["ECONNRESET", "ECONNREFUSED"].includes( - (e as unknown as { cause?: { code?: string } }).cause?.code || "___" - ) - ) { - logger.error("Failed to register worker in local service registry", e); - } else { - logger.debug("Failed to register worker in local service registry", e); - } - } -} - -/** - * Unregister a worker from the registry. - */ -export async function unregisterWorker(name: string) { - if (getFlag("FILE_BASED_REGISTRY")) { - try { - await unlink(path.join(DEV_REGISTRY_PATH, name)); - const existingHeartbeat = heartbeats.get(name); - if (existingHeartbeat) { - clearInterval(existingHeartbeat); - } - } catch (e) { - logger.debug("failed to unregister worker", e); - } - return; - } - try { - await fetch(`${DEV_REGISTRY_HOST}/workers/${name}`, { - method: "DELETE", - }); - } catch (e) { - if ( - !["ECONNRESET", "ECONNREFUSED"].includes( - (e as unknown as { cause?: { code?: string } }).cause?.code || "___" - ) - ) { - throw e; - // logger.error("failed to unregister worker", e); - } - } -} - -/** - * Get the state of the service registry. - */ -export async function getRegisteredWorkers(): Promise< - WorkerRegistry | undefined -> { - if (getFlag("FILE_BASED_REGISTRY")) { - globalWorkers = await loadWorkerDefinitions(); - return { ...globalWorkers }; - } - - try { - const response = await fetch(`${DEV_REGISTRY_HOST}/workers`); - return (await response.json()) as WorkerRegistry; - } catch (e) { - if ( - !["ECONNRESET", "ECONNREFUSED"].includes( - (e as unknown as { cause?: { code?: string } }).cause?.code || "___" - ) - ) { - throw e; - } - } -} - -/** - * a function that takes your serviceNames and durableObjectNames and returns a - * list of the running workers that we're bound to - */ -export async function getBoundRegisteredWorkers( - { - name, - services, - durableObjects, - }: { - name: string | undefined; - services: - | Config["services"] - | Extract[] - | undefined; - durableObjects: - | Config["durable_objects"] - | { bindings: Extract[] } - | undefined; - }, - existingWorkerDefinitions?: WorkerRegistry | undefined -): Promise { - const serviceNames = (services || []).map( - (serviceBinding) => serviceBinding.service - ); - const durableObjectServices = ( - durableObjects || { bindings: [] } - ).bindings.map((durableObjectBinding) => durableObjectBinding.script_name); - - if (serviceNames.length === 0 && durableObjectServices.length === 0) { - return {}; - } - const workerDefinitions = - existingWorkerDefinitions ?? (await getRegisteredWorkers()); - - const filteredWorkers = Object.fromEntries( - Object.entries(workerDefinitions || {}).filter( - ([key, _value]) => - key !== name && // Always exclude current worker to avoid infinite loops - (serviceNames.includes(key) || durableObjectServices.includes(key)) - ) - ); - return filteredWorkers; -} - -/** - * A react-free version of the above hook - */ -export async function devRegistry( - cb: (workers: WorkerRegistry | undefined) => void -): Promise<(name?: string) => Promise> { - let previousRegistry: WorkerRegistry | undefined; - - let interval: ReturnType; - - let hasFailedToFetch = false; - - // The new file based registry supports a much more performant listener callback - if (getFlag("FILE_BASED_REGISTRY")) { - await startWorkerRegistry(async (registry) => { - if (!util.isDeepStrictEqual(registry, previousRegistry)) { - previousRegistry = registry; - cb(registry); - } - }); - } else { - try { - await startWorkerRegistry(); - } catch (err) { - logger.error("failed to start worker registry", err); - } - // Else we need to fall back to a polling based approach - interval = setInterval(async () => { - try { - const registry = await getRegisteredWorkers(); - if (!util.isDeepStrictEqual(registry, previousRegistry)) { - previousRegistry = registry; - cb(registry); - } - } catch (err) { - if (!hasFailedToFetch) { - hasFailedToFetch = true; - logger.warn("Failed to get worker definitions", err); - } - } - }, 300); - } - - return async (name) => { - interval && clearInterval(interval); - try { - const [unregisterResult, stopRegistryResult] = await Promise.allSettled([ - name ? unregisterWorker(name) : Promise.resolve(), - stopWorkerRegistry(), - ]); - if (unregisterResult.status === "rejected") { - logger.error("Failed to unregister worker", unregisterResult.reason); - } - if (stopRegistryResult.status === "rejected") { - logger.error( - "Failed to stop worker registry", - stopRegistryResult.reason - ); - } - } catch (err) { - logger.error("Failed to cleanup dev registry", err); - } - }; -} diff --git a/packages/wrangler/src/dev-registry/file-registry.ts b/packages/wrangler/src/dev-registry/file-registry.ts new file mode 100644 index 000000000000..77e5eae8b683 --- /dev/null +++ b/packages/wrangler/src/dev-registry/file-registry.ts @@ -0,0 +1,161 @@ +import { utimesSync } from "node:fs"; +import { + mkdir, + readdir, + readFile, + stat, + unlink, + writeFile, +} from "node:fs/promises"; +import path from "node:path"; +import * as util from "node:util"; +import { watch } from "chokidar"; +import { version as wranglerVersion } from "../../package.json"; +import { getGlobalWranglerConfigPath } from "../global-wrangler-config-path"; +import { logger } from "../logger"; +import type { WorkerDefinition, WorkerRegistry } from "./types"; + +const DEV_REGISTRY_PATH = path.join(getGlobalWranglerConfigPath(), "registry"); +const heartbeats = new Map(); +let globalWorkers: WorkerRegistry | undefined; +let globalWatcher: ReturnType | undefined; + +export const FileRegistry = { + devRegistry, + getRegisteredWorkers, + registerWorker, + startWorkerRegistry, + stopWorkerRegistry, + unregisterWorker, +}; + +async function devRegistry( + cb: (workers: WorkerRegistry | undefined) => void +): Promise<(name?: string) => Promise> { + let previousRegistry: WorkerRegistry | undefined; + + await startWorkerRegistry(async (registry) => { + if (!util.isDeepStrictEqual(registry, previousRegistry)) { + previousRegistry = registry; + cb(registry); + } + }); + + return async (name?: string) => { + try { + const [unregisterResult, stopRegistryResult] = await Promise.allSettled([ + name ? unregisterWorker(name) : Promise.resolve(), + stopWorkerRegistry(), + ]); + if (unregisterResult.status === "rejected") { + logger.error("Failed to unregister worker", unregisterResult.reason); + } + if (stopRegistryResult.status === "rejected") { + logger.error( + "Failed to stop worker registry", + stopRegistryResult.reason + ); + } + } catch (err) { + logger.error("Failed to cleanup dev registry", err); + } + }; +} + +async function startWorkerRegistry( + cb?: (registry: WorkerRegistry | undefined) => void +) { + globalWatcher ??= watch(DEV_REGISTRY_PATH, { + persistent: true, + }).on("all", async () => { + await loadWorkerDefinitions(); + cb?.({ ...globalWorkers }); + }); + return; +} + +async function stopWorkerRegistry() { + if (globalWatcher) { + await globalWatcher?.close(); + for (const heartbeat of heartbeats) { + clearInterval(heartbeat[1]); + } + return; + } +} + +async function registerWorker(name: string, definition: WorkerDefinition) { + const existingHeartbeat = heartbeats.get(name); + if (existingHeartbeat) { + clearInterval(existingHeartbeat); + } + await mkdir(DEV_REGISTRY_PATH, { recursive: true }); + await writeFile( + path.join(DEV_REGISTRY_PATH, name), + // We don't currently do anything with the stored Wrangler version, + // but if we need to make breaking changes to this format in the future + // we can use this field to present useful messaging + JSON.stringify({ ...definition, wranglerVersion }, null, 2) + ); + heartbeats.set( + name, + setInterval(() => { + utimesSync(path.join(DEV_REGISTRY_PATH, name), new Date(), new Date()); + }, 30_000) + ); + return; +} + +async function getRegisteredWorkers(): Promise { + globalWorkers = await loadWorkerDefinitions(); + return { ...globalWorkers }; +} + +async function unregisterWorker(name: string) { + try { + await unlink(path.join(DEV_REGISTRY_PATH, name)); + const existingHeartbeat = heartbeats.get(name); + if (existingHeartbeat) { + clearInterval(existingHeartbeat); + } + } catch (e) { + logger.debug("failed to unregister worker", e); + } + return; +} + +export async function loadWorkerDefinitions(): Promise { + await mkdir(DEV_REGISTRY_PATH, { recursive: true }); + globalWorkers ??= {}; + const newWorkers = new Set(); + const workerDefinitions = await readdir(DEV_REGISTRY_PATH); + for (const workerName of workerDefinitions) { + try { + const file = await readFile( + path.join(DEV_REGISTRY_PATH, workerName), + "utf8" + ); + const stats = await stat(path.join(DEV_REGISTRY_PATH, workerName)); + // Cleanup existing workers older than 10 minutes + if (stats.mtime.getTime() < Date.now() - 600000) { + await unregisterWorker(workerName); + } else { + globalWorkers[workerName] = JSON.parse(file); + newWorkers.add(workerName); + } + } catch (e) { + // This can safely be ignored. It generally indicates the worker was too old and was removed by a parallel Wrangler process + logger.debug( + "Error while loading worker definition from the registry", + e + ); + } + } + + for (const worker of Object.keys(globalWorkers)) { + if (!newWorkers.has(worker)) { + delete globalWorkers[worker]; + } + } + return globalWorkers; +} diff --git a/packages/wrangler/src/dev-registry/index.ts b/packages/wrangler/src/dev-registry/index.ts new file mode 100644 index 000000000000..61d7ad70d862 --- /dev/null +++ b/packages/wrangler/src/dev-registry/index.ts @@ -0,0 +1,138 @@ +import { getFlag } from "../experimental-flags"; +import { FileRegistry } from "./file-registry"; +import { ServerRegistry } from "./server-registry"; +import type { Binding } from "../api"; +import type { Config } from "../config"; +import type { + WorkerDefinition, + WorkerEntrypointsDefinition, + WorkerRegistry, +} from "./types"; + +export type { WorkerDefinition, WorkerRegistry, WorkerEntrypointsDefinition }; + +// Safety of `!`: `parseInt(undefined)` is NaN +// eslint-disable-next-line @typescript-eslint/no-non-null-assertion +let DEV_REGISTRY_PORT = parseInt(process.env.WRANGLER_WORKER_REGISTRY_PORT!); +if (Number.isNaN(DEV_REGISTRY_PORT)) { + DEV_REGISTRY_PORT = 6284; +} + +export const startWorkerRegistryServer = + ServerRegistry.startWorkerRegistryServer; + +/** + * Start the service registry. It's a simple server + * that exposes endpoints for registering and unregistering + * services, as well as getting the state of the registry. + */ +export async function startWorkerRegistry( + listener?: (registry: WorkerRegistry | undefined) => void +) { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.startWorkerRegistry(listener); + } + + return ServerRegistry.startWorkerRegistry(); +} + +/** + * Stop the service registry. + */ +export async function stopWorkerRegistry() { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.stopWorkerRegistry(); + } + return ServerRegistry.stopWorkerRegistry(); +} + +/** + * Register a worker in the registry. + */ +export async function registerWorker( + name: string, + definition: WorkerDefinition +) { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.registerWorker(name, definition); + } + return ServerRegistry.registerWorker(name, definition); +} + +/** + * Unregister a worker from the registry. + */ +export async function unregisterWorker(name: string) { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.unregisterWorker(name); + } + return ServerRegistry.unregisterWorker(name); +} + +/** + * Get the state of the service registry. + */ +export async function getRegisteredWorkers(): Promise< + WorkerRegistry | undefined +> { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.getRegisteredWorkers(); + } + + return ServerRegistry.getRegisteredWorkers(); +} + +/** + * a function that takes your serviceNames and durableObjectNames and returns a + * list of the running workers that we're bound to + */ +export async function getBoundRegisteredWorkers( + { + name, + services, + durableObjects, + }: { + name: string | undefined; + services: + | Config["services"] + | Extract[] + | undefined; + durableObjects: + | Config["durable_objects"] + | { bindings: Extract[] } + | undefined; + }, + existingWorkerDefinitions?: WorkerRegistry | undefined +): Promise { + const serviceNames = (services || []).map( + (serviceBinding) => serviceBinding.service + ); + const durableObjectServices = ( + durableObjects || { bindings: [] } + ).bindings.map((durableObjectBinding) => durableObjectBinding.script_name); + + if (serviceNames.length === 0 && durableObjectServices.length === 0) { + return {}; + } + const workerDefinitions = + existingWorkerDefinitions ?? (await getRegisteredWorkers()); + + const filteredWorkers = Object.fromEntries( + Object.entries(workerDefinitions || {}).filter( + ([key, _value]) => + key !== name && // Always exclude current worker to avoid infinite loops + (serviceNames.includes(key) || durableObjectServices.includes(key)) + ) + ); + return filteredWorkers; +} + +export async function devRegistry( + cb: (workers: WorkerRegistry | undefined) => void +): Promise<(name?: string) => Promise> { + if (getFlag("FILE_BASED_REGISTRY")) { + return FileRegistry.devRegistry(cb); + } + + return ServerRegistry.devRegistry(cb); +} diff --git a/packages/wrangler/src/dev-registry/server-registry.ts b/packages/wrangler/src/dev-registry/server-registry.ts new file mode 100644 index 000000000000..04bb8920e6a2 --- /dev/null +++ b/packages/wrangler/src/dev-registry/server-registry.ts @@ -0,0 +1,256 @@ +import events from "node:events"; +import { createServer } from "node:http"; +import net from "node:net"; +import * as util from "node:util"; +import bodyParser from "body-parser"; +import express from "express"; +import { createHttpTerminator } from "http-terminator"; +import { fetch } from "undici"; +import { logger } from "../logger"; +import type { WorkerDefinition, WorkerRegistry } from "./types"; +import type { watch } from "chokidar"; +import type { HttpTerminator } from "http-terminator"; +import type { Server } from "node:http"; + +let DEV_REGISTRY_PORT = parseInt(process.env.WRANGLER_WORKER_REGISTRY_PORT!); // eslint-disable-line @typescript-eslint/no-non-null-assertion +if (Number.isNaN(DEV_REGISTRY_PORT)) { + DEV_REGISTRY_PORT = 6284; +} +const DEV_REGISTRY_HOST = `http://127.0.0.1:${DEV_REGISTRY_PORT}`; +const jsonBodyParser = bodyParser.json(); +let globalServer: Server | null; +let globalTerminator: HttpTerminator; + +let globalWatcher: ReturnType | undefined; +const heartbeats = new Map>(); + +export const ServerRegistry = { + devRegistry, + getRegisteredWorkers, + registerWorker, + startWorkerRegistry, + startWorkerRegistryServer, + stopWorkerRegistry, + unregisterWorker, +}; + +async function devRegistry( + cb: (workers: WorkerRegistry | undefined) => void +): Promise<(name?: string) => Promise> { + let previousRegistry: WorkerRegistry | undefined; + let hasFailedToFetch = false; + + try { + await startWorkerRegistry(); + } catch (err) { + logger.error("failed to start worker registry", err); + } + + const interval = setInterval(async () => { + try { + const registry = await getRegisteredWorkers(); + if (!util.isDeepStrictEqual(registry, previousRegistry)) { + previousRegistry = registry; + cb(registry); + } + } catch (err) { + if (!hasFailedToFetch) { + hasFailedToFetch = true; + logger.warn("Failed to get worker definitions", err); + } + } + }, 300); + + return async (name?: string) => { + clearInterval(interval); + try { + const [unregisterResult, stopRegistryResult] = await Promise.allSettled([ + name ? unregisterWorker(name) : Promise.resolve(), + stopWorkerRegistry(), + ]); + if (unregisterResult.status === "rejected") { + logger.error("Failed to unregister worker", unregisterResult.reason); + } + if (stopRegistryResult.status === "rejected") { + logger.error( + "Failed to stop worker registry", + stopRegistryResult.reason + ); + } + } catch (err) { + logger.error("Failed to cleanup dev registry", err); + } + }; +} + +/** + * Get the state of the service registry. + */ +export async function getRegisteredWorkers(): Promise< + WorkerRegistry | undefined +> { + try { + const response = await fetch(`${DEV_REGISTRY_HOST}/workers`); + return (await response.json()) as WorkerRegistry; + } catch (e) { + if ( + !["ECONNRESET", "ECONNREFUSED"].includes( + (e as unknown as { cause?: { code?: string } }).cause?.code || "___" + ) + ) { + throw e; + } + } +} + +/** + * Register a worker in the registry. + */ +export async function registerWorker( + name: string, + definition: WorkerDefinition +) { + /** + * Prevent the dev registry be closed. + */ + await startWorkerRegistry(); + try { + return await fetch(`${DEV_REGISTRY_HOST}/workers/${name}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(definition), + }); + } catch (e) { + if ( + !["ECONNRESET", "ECONNREFUSED"].includes( + (e as unknown as { cause?: { code?: string } }).cause?.code || "___" + ) + ) { + logger.error("Failed to register worker in local service registry", e); + } else { + logger.debug("Failed to register worker in local service registry", e); + } + } +} + +/** + * Start the service registry. It's a simple server + * that exposes endpoints for registering and unregistering + * services, as well as getting the state of the registry. + */ +export async function startWorkerRegistry() { + if ((await isPortAvailable()) && !globalServer) { + const result = await startWorkerRegistryServer(DEV_REGISTRY_PORT); + globalServer = result.server; + globalTerminator = result.terminator; + + /** + * The registry server may have already been started by another wrangler process. + * If wrangler processes are run in parallel, isPortAvailable() can return true + * while another process spins up the server + */ + globalServer.once("error", (err) => { + if ((err as unknown as { code: string }).code !== "EADDRINUSE") { + throw err; + } + }); + + /** + * The registry server may close. Reset the server to null for restart. + */ + globalServer.on("close", () => { + globalServer = null; + }); + } +} + +export async function startWorkerRegistryServer(port: number) { + const app = express(); + + let workers: WorkerRegistry = {}; + app + .get("/workers", async (req, res) => { + res.json(workers); + }) + .post("/workers/:workerId", jsonBodyParser, async (req, res) => { + workers[req.params.workerId] = req.body; + res.json(null); + }) + .delete(`/workers/:workerId`, async (req, res) => { + delete workers[req.params.workerId]; + res.json(null); + }) + .delete("/workers", async (req, res) => { + workers = {}; + res.json(null); + }); + + const appServer = createServer(app); + const appTerminator = createHttpTerminator({ server: appServer }); + + const listeningPromise = events.once(appServer, "listening"); + appServer.listen(port, "127.0.0.1"); + await listeningPromise; + + return { server: appServer, terminator: appTerminator }; +} + +/** + * Stop the service registry. + */ +export async function stopWorkerRegistry() { + if (globalWatcher) { + await globalWatcher?.close(); + for (const heartbeat of heartbeats) { + clearInterval(heartbeat[1]); + } + return; + } + await globalTerminator?.terminate(); + globalServer = null; +} + +/** + * Unregister a worker from the registry. + */ +export async function unregisterWorker(name: string) { + try { + await fetch(`${DEV_REGISTRY_HOST}/workers/${name}`, { + method: "DELETE", + }); + } catch (e) { + if ( + !["ECONNRESET", "ECONNREFUSED"].includes( + (e as unknown as { cause?: { code?: string } }).cause?.code || "___" + ) + ) { + throw e; + // logger.error("failed to unregister worker", e); + } + } +} + +/** + * A helper function to check whether our service registry is already running + */ +async function isPortAvailable() { + return new Promise((resolve, reject) => { + const netServer = net + .createServer() + .once("error", (err) => { + netServer.close(); + if ((err as unknown as { code: string }).code === "EADDRINUSE") { + resolve(false); + } else { + reject(err); + } + }) + .once("listening", () => { + netServer.close(); + resolve(true); + }); + netServer.listen(DEV_REGISTRY_PORT, "127.0.0.1"); + }); +} diff --git a/packages/wrangler/src/dev-registry/types.ts b/packages/wrangler/src/dev-registry/types.ts new file mode 100644 index 000000000000..0cdf02eefab9 --- /dev/null +++ b/packages/wrangler/src/dev-registry/types.ts @@ -0,0 +1,18 @@ +export type WorkerRegistry = Record; + +export type WorkerEntrypointsDefinition = Record< + "default" | string, + { host: string; port: number } | undefined +>; + +export type WorkerDefinition = { + port: number | undefined; + protocol: "http" | "https" | undefined; + host: string | undefined; + mode: "local" | "remote"; + headers?: Record; + entrypointAddresses?: WorkerEntrypointsDefinition; + durableObjects: { name: string; className: string }[]; + durableObjectsHost?: string; + durableObjectsPort?: number; +}; diff --git a/packages/wrangler/src/dev.tsx b/packages/wrangler/src/dev.tsx index 01e4f4e3d946..aa390c510a51 100644 --- a/packages/wrangler/src/dev.tsx +++ b/packages/wrangler/src/dev.tsx @@ -503,6 +503,8 @@ async function updateDevEnvRegistry( await events.once(devEnv, "configUpdate"); } + // If the current bound workers in the registry are exactly the same as the workers defined in the config, + // then we don't need to update anything. if ( util.isDeepStrictEqual( boundWorkers,