diff --git a/.changeset/poor-otters-drop.md b/.changeset/poor-otters-drop.md new file mode 100644 index 000000000000..16f878924f05 --- /dev/null +++ b/.changeset/poor-otters-drop.md @@ -0,0 +1,10 @@ +--- +"miniflare": patch +"wrangler": patch +--- + +fix: add support for workers with assets when running multiple workers in one `wrangler dev` instance + +https://github.com/cloudflare/workers-sdk/pull/7251 added support for running multiple Workers in one `wrangler dev`/miniflare session. e.g. `wrangler dev -c wrangler.toml -c ../worker2/wrangler.toml`, which among other things, allowed cross-service RPC to Durable Objects. + +However this did not work in the same way as production when there was a Worker with assets - this PR should fix that. diff --git a/packages/miniflare/src/index.ts b/packages/miniflare/src/index.ts index 398456c1055c..5ea6ac189389 100644 --- a/packages/miniflare/src/index.ts +++ b/packages/miniflare/src/index.ts @@ -1130,14 +1130,6 @@ export class Miniflare { innerBindings: Worker_Binding[]; }[] = []; - if (this.#workerOpts[0].assets.assets) { - // This will be the UserWorker, or the vitest pool worker wrapping the UserWorker - // The asset plugin needs this so that it can set the binding between the RouterWorker and the UserWorker - // TODO: apply this to ever this.#workerOpts, not just the first (i.e this.#workerOpts[0]) - this.#workerOpts[0].assets.assets.workerName = - this.#workerOpts[0].core.name; - } - for (let i = 0; i < allWorkerOpts.length; i++) { const previousWorkerOpts = allPreviousWorkerOpts?.[i]; const workerOpts = allWorkerOpts[i]; @@ -1152,6 +1144,12 @@ export class Miniflare { } } + if (workerOpts.assets.assets) { + // This will be the UserWorker, or the vitest pool worker wrapping the UserWorker + // The asset plugin needs this so that it can set the binding between the RouterWorker and the UserWorker + workerOpts.assets.assets.workerName = workerOpts.core.name; + } + // Collect all bindings from this worker const workerBindings: Worker_Binding[] = []; allWorkerBindings.set(workerName, workerBindings); @@ -1202,6 +1200,20 @@ export class Miniflare { }); } } + if ("service" in binding) { + const targetWorkerName = binding.service?.name?.replace( + "core:user:", + "" + ); + const maybeAssetTargetService = allWorkerOpts.find( + (worker) => + worker.core.name === targetWorkerName && worker.assets.assets + ); + if (maybeAssetTargetService) { + assert(binding.service?.name); + binding.service.name = `${ROUTER_SERVICE_NAME}:${targetWorkerName}`; + } + } } } } @@ -1305,7 +1317,7 @@ export class Miniflare { !this.#workerOpts[0].core.name?.startsWith( "vitest-pool-workers-runner-" ) - ? ROUTER_SERVICE_NAME + ? `${ROUTER_SERVICE_NAME}:${this.#workerOpts[0].core.name}` : getUserServiceName(this.#workerOpts[0].core.name), loopbackPort, log: this.#log, @@ -1340,7 +1352,6 @@ export class Miniflare { "Ensure wrapped bindings don't have bindings to themselves." ); } - return { services: servicesArray, sockets, extensions }; } diff --git a/packages/miniflare/src/plugins/assets/index.ts b/packages/miniflare/src/plugins/assets/index.ts index abed3d1fd844..f0643051ab06 100644 --- a/packages/miniflare/src/plugins/assets/index.ts +++ b/packages/miniflare/src/plugins/assets/index.ts @@ -39,7 +39,9 @@ export const ASSETS_PLUGIN: Plugin = { { // binding between User Worker and Asset Worker name: options.assets.binding, - service: { name: ASSETS_SERVICE_NAME }, + service: { + name: `${ASSETS_SERVICE_NAME}:${options.assets.workerName}`, + }, }, ]; }, @@ -68,8 +70,10 @@ export const ASSETS_PLUGIN: Plugin = { options.assets.directory ); + const id = options.assets.workerName; + const namespaceService: Service = { - name: ASSETS_KV_SERVICE_NAME, + name: `${ASSETS_KV_SERVICE_NAME}:${id}`, worker: { compatibilityDate: "2023-07-24", compatibilityFlags: ["nodejs_compat"], @@ -93,7 +97,7 @@ export const ASSETS_PLUGIN: Plugin = { }; const assetService: Service = { - name: ASSETS_SERVICE_NAME, + name: `${ASSETS_SERVICE_NAME}:${id}`, worker: { compatibilityDate: "2024-08-01", modules: [ @@ -105,7 +109,9 @@ export const ASSETS_PLUGIN: Plugin = { bindings: [ { name: "ASSETS_KV_NAMESPACE", - kvNamespace: { name: ASSETS_KV_SERVICE_NAME }, + kvNamespace: { + name: `${ASSETS_KV_SERVICE_NAME}:${id}`, + }, }, { name: "ASSETS_MANIFEST", @@ -120,7 +126,7 @@ export const ASSETS_PLUGIN: Plugin = { }; const routerService: Service = { - name: ROUTER_SERVICE_NAME, + name: `${ROUTER_SERVICE_NAME}:${id}`, worker: { compatibilityDate: "2024-08-01", modules: [ @@ -132,11 +138,13 @@ export const ASSETS_PLUGIN: Plugin = { bindings: [ { name: "ASSET_WORKER", - service: { name: ASSETS_SERVICE_NAME }, + service: { + name: `${ASSETS_SERVICE_NAME}:${id}`, + }, }, { name: "USER_WORKER", - service: { name: getUserServiceName(options.assets.workerName) }, + service: { name: getUserServiceName(id) }, }, { name: "CONFIG", diff --git a/packages/miniflare/src/plugins/core/index.ts b/packages/miniflare/src/plugins/core/index.ts index 18341d16c729..08f5ea7fd549 100644 --- a/packages/miniflare/src/plugins/core/index.ts +++ b/packages/miniflare/src/plugins/core/index.ts @@ -262,7 +262,7 @@ function getCustomServiceDesignator( } else if (service === kCurrentWorker) { // Sets SELF binding to point to router worker instead if assets are present. serviceName = hasAssetsAndIsVitest - ? ROUTER_SERVICE_NAME + ? `${ROUTER_SERVICE_NAME}:${refererName}` : getUserServiceName(refererName); } else { // Regular user worker diff --git a/packages/wrangler/e2e/multiworker-dev.test.ts b/packages/wrangler/e2e/multiworker-dev.test.ts index d584242b085b..5fcbcef1be75 100644 --- a/packages/wrangler/e2e/multiworker-dev.test.ts +++ b/packages/wrangler/e2e/multiworker-dev.test.ts @@ -35,7 +35,6 @@ describe("multiworker", () => { let b: string; let c: string; let helper: WranglerE2ETestHelper; - beforeEach(async () => { workerName = generateResourceName("worker"); workerName2 = generateResourceName("worker"); @@ -446,4 +445,348 @@ describe("multiworker", () => { ); }); }); + + describe("workers with assets", async () => { + let workerNameAW: string; + let workerName4: string; + let aw: string; + let d: string; + describe("-> worker with assets -> regular worker", () => { + beforeEach(async () => { + aw = await makeRoot(); + workerNameAW = generateResourceName("worker-aw"); + workerName4 = generateResourceName("worker"); + await baseSeed(aw, { + "wrangler.toml": dedent` + name = "${workerNameAW}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + + [assets] + directory = "./public/" + binding = "ASSETS" + + [[services]] + binding = "DEE" + service = '${workerName4}' + + [[services]] + binding = "COUNTER" + service = '${workerName4}' + entrypoint = 'CounterService' + + [durable_objects] + bindings = [ + { name = "REFERENCED_DO", class_name = "MyDurableObject", script_name = "${workerName4}" } + ] + `, + "public/asset-binding.html": "

have an asset via a binding

", + "public/asset.html": "

have an asset directly

", + "src/index.ts": dedent/* javascript */ ` + export default { + async fetch(req, env) { + const url = new URL(req.url) + if (url.pathname === "/asset-via-binding") { + return env.ASSETS.fetch(new URL("asset-binding.html", req.url)); + } + if (url.pathname === "/worker") { + return new Response("hello world from a worker with assets"); + } + if (url.pathname === "/count") { + const counter = await env.COUNTER.newCounter() + await counter.increment(1) + await counter.increment(2) + await counter.increment(3) + return new Response(String(await counter.value)) + } + if (url.pathname === "/do") { + const id = env.REFERENCED_DO.idFromName(url.pathname); + const stub = env.REFERENCED_DO.get(id); + return stub.fetch(req); + } + if (url.pathname === "/do-rpc") { + const id = env.REFERENCED_DO.idFromName(url.pathname); + const stub = env.REFERENCED_DO.get(id); + return new Response(await stub.sayHello("through DO RPC")); + } + return env.DEE.fetch(req); + }, + }; + `, + "package.json": dedent` + { + "name": "aw", + "version": "0.0.0", + "private": true + } + `, + }); + + d = await makeRoot(); + await baseSeed(d, { + "wrangler.toml": dedent` + name = "${workerName4}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + `, + "src/index.ts": dedent/* javascript */ ` + import { DurableObject, WorkerEntrypoint, RpcTarget } from "cloudflare:workers"; + export default{ + async fetch(req, env) { + return new Response("hello world from dee"); + } + }; + class Counter extends RpcTarget { + #value = 0; + + increment(amount) { + this.#value += amount; + return this.#value; + } + + get value() { + return this.#value; + } + } + + export class CounterService extends WorkerEntrypoint { + async newCounter() { + return new Counter(); + } + } + + export class MyDurableObject extends DurableObject { + async fetch(request: Request) { + if (request.headers.has("X-Reset-Count")) { + await this.ctx.storage.put("count", 0); + } + let count: number = (await this.ctx.storage.get("count")) || 0; + await this.ctx.storage.put("count", ++count); + return Response.json({ count }); + } + + sayHello(name: string) { + return "Hello " + name; + } + } + `, + }); + }); + + it("can fetch a worker with assets", async () => { + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${d}/wrangler.toml`, + { cwd: aw } + ); + + const { url } = await worker.waitForReady(5_000); + await expect(fetchText(`${url}/asset`)).resolves.toBe( + "

have an asset directly

" + ); + await expect(fetchText(`${url}/asset-via-binding`)).resolves.toBe( + "

have an asset via a binding

" + ); + await expect( + fetch(`${url}/worker`).then((r) => r.text()) + ).resolves.toBe("hello world from a worker with assets"); + }); + + it("can fetch a regular worker through a worker with assets, including with rpc", async () => { + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${d}/wrangler.toml`, + { cwd: aw } + ); + + const { url } = await worker.waitForReady(5_000); + + await expect( + fetch(`${url}/hello-from-dee`).then((r) => r.text()) + ).resolves.toBe("hello world from dee"); + + await vi.waitFor( + async () => + await expect(fetchText(`${url}/count`)).resolves.toBe("6"), + { interval: 1000, timeout: 10_000 } + ); + }); + + it("can fetch a DO through a worker with assets, including with rpc", async () => { + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${d}/wrangler.toml`, + { cwd: aw } + ); + + const { url } = await worker.waitForReady(5_000); + + await vi.waitFor( + async () => + await expect(fetchText(`${url}/do-rpc`)).resolves.toBe( + "Hello through DO RPC" + ), + { interval: 1000, timeout: 10_000 } + ); + await vi.waitFor( + async () => + await expect( + fetchJson(`${url}/do`, { + headers: { + "X-Reset-Count": "true", + }, + }) + ).resolves.toMatchObject({ count: 1 }), + { interval: 1000, timeout: 10_000 } + ); + }); + }); + + describe("-> regular worker -> worker with assets", () => { + beforeEach(async () => { + await baseSeed(aw, { + "wrangler.toml": dedent` + name = "${workerNameAW}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + [assets] + directory = "./public/" + binding = "ASSETS" + `, + "src/index.ts": dedent/* javascript */ ` + export default { + async fetch(req, env) { + const url = new URL(req.url) + if (url.pathname === "/asset-via-binding") { + return env.ASSETS.fetch(new URL("asset-binding.html", req.url)); + } + return new Response("hello world from a worker with assets") + }, + add(a, b) { return a + b; } + }; + `, + }); + await baseSeed(d, { + "wrangler.toml": dedent` + name = "${workerName4}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + [[services]] + binding = "AW" + service = '${workerNameAW}' + `, + "src/index.ts": dedent/* javascript */ ` + export default{ + async fetch(req, env) { + const url = new URL(req.url) + if (url.pathname === "/rpc") { + return env.AW.add(1,2); + } + return env.AW.fetch(req); + } + };`, + }); + }); + + it("can fetch assets through a regular worker", async () => { + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${aw}/wrangler.toml`, + { cwd: d } + ); + + const { url } = await worker.waitForReady(5_000); + await expect(fetchText(`${url}/asset`)).resolves.toBe( + "

have an asset directly

" + ); + }); + + it("can fall back to user worker", async () => { + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${aw}/wrangler.toml`, + { cwd: d } + ); + + const { url } = await worker.waitForReady(5_000); + await expect(fetchText(`${url}/worker`)).resolves.toBe( + "hello world from a worker with assets" + ); + await expect(fetchText(`${url}/asset-via-binding`)).resolves.toBe( + "

have an asset via a binding

" + ); + }); + + it.fails("call rpc on a worker with assets", async () => { + // because it hits the router worker not the user worker, and add is not implemented there + // proxy/filter worker will fix this + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${aw}/wrangler.toml`, + { cwd: d } + ); + + const { url } = await worker.waitForReady(5_000); + await expect(fetchText(`${url}/rpc`)).resolves.toEqual(3); + }); + // wrangler dev just crashses because the named entrypoint does not exist on the router worker + it.fails( + "binding to named entrypoint on a worker with assets", + async () => { + await baseSeed(aw, { + "wrangler.toml": dedent` + name = "${workerNameAW}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + [assets] + directory = "./public/" + binding = "ASSETS" + `, + "src/index.ts": dedent/* javascript */ ` + import { WorkerEntrypoint } from "cloudflare:workers"; + export class NamedEntrypoint extends WorkerEntrypoint { + async add(a, b) { + return a + b; + } + async fetch(req) { + return new Response("hello world from the fetch handler of a named entrypoint") + } + } + export default { + async fetch(req, env) { + return new Response("hello world from a worker with assets") + }, + }; + `, + }); + await baseSeed(d, { + "wrangler.toml": dedent` + name = "${workerName4}" + main = "src/index.ts" + compatibility_date = "2024-11-01" + [[services]] + binding = "NAMED" + service = '${workerNameAW}' + entrypoint = "NamedEntrypoint" + `, + "src/index.ts": dedent/* javascript */ ` + export default{ + async fetch(req, env) { + const url = new URL(req.url) + if (url.pathname === "/named-rpc") { + return env.NAMED.add(1,2); + } + if (url.pathname === "/named-fetch") { + return env.NAMED.fetch(req); + } + return new Response("hello world"); + } + };`, + }); + + const worker = helper.runLongLived( + `wrangler dev -c wrangler.toml -c ${aw}/wrangler.toml`, + { cwd: d } + ); + + const { url } = await worker.waitForReady(5_000); + await expect(fetchText(`${url}/named-rpc`)).resolves.toEqual(3); + } + ); + }); + }); });