From 7216835bf7489804905751c6b52e75a8945e7974 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Duarte?= Date: Thu, 19 Dec 2024 16:45:25 +0000 Subject: [PATCH] Make workflow `instance.status()` return output equal to production workflows (#7575) * feat: Make `instance.status()` return output equal to production Previously,in local dev, the `output` field would return the list of successful steps outputs in the workflow. This is not expected behaviour compared to production workflows (where the output is the actual return of the `run` function), probably just a implementation detail not set left-over from before beta. This commit makes it so that `output` is equal to production behaviour. For observability sake, I kept the old step output list in a different field `__LOCAL_DEV_STEP_OUTPUTS` - I think this is a good enough compromise right now since we want local dev to be correct against prod first. We can remove `__LOCAL_DEV_STEP_OUTPUTS` later, once we figure out on how to add custom stuff to the devtools page. * fix types * fix tests * chore: add better types for readLogs --- .changeset/great-flowers-compete.md | 6 ++++ fixtures/workflow-multiple/src/index.ts | 4 +-- .../workflow-multiple/tests/index.test.ts | 12 ++++--- fixtures/workflow/src/index.ts | 2 +- fixtures/workflow/tests/index.test.ts | 12 ++++--- .../test/plugins/workflows/index.spec.ts | 15 +++++++-- packages/workflows-shared/src/binding.ts | 33 +++++++++++++++---- packages/workflows-shared/src/engine.ts | 33 ++++++++++++------- 8 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 .changeset/great-flowers-compete.md diff --git a/.changeset/great-flowers-compete.md b/.changeset/great-flowers-compete.md new file mode 100644 index 000000000000..96160dec4121 --- /dev/null +++ b/.changeset/great-flowers-compete.md @@ -0,0 +1,6 @@ +--- +"@cloudflare/workflows-shared": patch +"miniflare": patch +--- + +Make `Instance.status()` return type the same as production diff --git a/fixtures/workflow-multiple/src/index.ts b/fixtures/workflow-multiple/src/index.ts index 3498d07e2f24..adaf369e8e93 100644 --- a/fixtures/workflow-multiple/src/index.ts +++ b/fixtures/workflow-multiple/src/index.ts @@ -29,7 +29,7 @@ export class Demo extends WorkflowEntrypoint<{}, Params> { }; }); - return [result, result2, timestamp, payload, "workflow1"]; + return "i'm workflow1"; } } @@ -53,7 +53,7 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> { }; }); - return [result, result2, timestamp, payload, "workflow2"]; + return "i'm workflow2"; } } diff --git a/fixtures/workflow-multiple/tests/index.test.ts b/fixtures/workflow-multiple/tests/index.test.ts index 29268cd0ea29..4a986f4111cf 100644 --- a/fixtures/workflow-multiple/tests/index.test.ts +++ b/fixtures/workflow-multiple/tests/index.test.ts @@ -48,7 +48,8 @@ describe("Workflows", () => { id: "test", status: { status: "running", - output: [], + __LOCAL_DEV_STEP_OUTPUTS: [], + output: null, }, }; @@ -65,7 +66,8 @@ describe("Workflows", () => { id: "test", status: { status: "running", - output: [{ output: "First step result" }], + __LOCAL_DEV_STEP_OUTPUTS: [{ output: "First step result" }], + output: null, }, }; await Promise.all([ @@ -96,10 +98,11 @@ describe("Workflows", () => { id: "test", status: { status: "complete", - output: [ + __LOCAL_DEV_STEP_OUTPUTS: [ { output: "First step result" }, { output: "workflow1" }, ], + output: "i'm workflow1", }, }); }, @@ -113,10 +116,11 @@ describe("Workflows", () => { id: "test", status: { status: "complete", - output: [ + __LOCAL_DEV_STEP_OUTPUTS: [ { output: "First step result" }, { output: "workflow2" }, ], + output: "i'm workflow2", }, }); }, diff --git a/fixtures/workflow/src/index.ts b/fixtures/workflow/src/index.ts index 8d84551cefe6..1a62c4b33cb6 100644 --- a/fixtures/workflow/src/index.ts +++ b/fixtures/workflow/src/index.ts @@ -27,7 +27,7 @@ export class Demo extends WorkflowEntrypoint<{}, Params> { }; }); - return [result, result2, timestamp, payload]; + return "i'm a workflow output"; } } diff --git a/fixtures/workflow/tests/index.test.ts b/fixtures/workflow/tests/index.test.ts index cb81a544c637..046f8190f425 100644 --- a/fixtures/workflow/tests/index.test.ts +++ b/fixtures/workflow/tests/index.test.ts @@ -46,7 +46,8 @@ describe("Workflows", () => { fetchJson(`http://${ip}:${port}/create?workflowName=test`) ).resolves.toEqual({ status: "running", - output: [], + __LOCAL_DEV_STEP_OUTPUTS: [], + output: null, }); await vi.waitFor( @@ -55,7 +56,8 @@ describe("Workflows", () => { fetchJson(`http://${ip}:${port}/status?workflowName=test`) ).resolves.toEqual({ status: "running", - output: [{ output: "First step result" }], + __LOCAL_DEV_STEP_OUTPUTS: [{ output: "First step result" }], + output: null, }); }, { timeout: 5000 } @@ -67,10 +69,11 @@ describe("Workflows", () => { fetchJson(`http://${ip}:${port}/status?workflowName=test`) ).resolves.toEqual({ status: "complete", - output: [ + __LOCAL_DEV_STEP_OUTPUTS: [ { output: "First step result" }, { output: "Second step result" }, ], + output: "i'm a workflow output", }); }, { timeout: 5000 } @@ -80,7 +83,8 @@ describe("Workflows", () => { it("creates a workflow without id", async ({ expect }) => { await expect(fetchJson(`http://${ip}:${port}/create`)).resolves.toEqual({ status: "running", - output: [], + __LOCAL_DEV_STEP_OUTPUTS: [], + output: null, }); }); diff --git a/packages/miniflare/test/plugins/workflows/index.spec.ts b/packages/miniflare/test/plugins/workflows/index.spec.ts index ac19a418e868..0c8d9ddccb32 100644 --- a/packages/miniflare/test/plugins/workflows/index.spec.ts +++ b/packages/miniflare/test/plugins/workflows/index.spec.ts @@ -40,7 +40,10 @@ test("persists Workflow data on file-system between runs", async (t) => { t.teardown(() => mf.dispose()); let res = await mf.dispatchFetch("http://localhost"); - t.is(await res.text(), '{"status":"running","output":[]}'); + t.is( + await res.text(), + '{"status":"running","__LOCAL_DEV_STEP_OUTPUTS":[],"output":null}' + ); // there's no waitUntil in ava haha const begin = performance.now(); @@ -50,7 +53,10 @@ test("persists Workflow data on file-system between runs", async (t) => { const res = await mf.dispatchFetch("http://localhost"); console.log(test); test = await res.text(); - if (test === '{"status":"complete","output":["yes you are"]}') { + if ( + test === + '{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}' + ) { success = true; break; } @@ -68,5 +74,8 @@ test("persists Workflow data on file-system between runs", async (t) => { // state should be persisted now res = await mf.dispatchFetch("http://localhost"); - t.is(await res.text(), '{"status":"complete","output":["yes you are"]}'); + t.is( + await res.text(), + '{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}' + ); }); diff --git a/packages/workflows-shared/src/binding.ts b/packages/workflows-shared/src/binding.ts index 0e7a44ab6c7d..f10dd0981bb8 100644 --- a/packages/workflows-shared/src/binding.ts +++ b/packages/workflows-shared/src/binding.ts @@ -5,6 +5,7 @@ import type { DatabaseVersion, DatabaseWorkflow, Engine, + EngineLogs, } from "./engine"; type Env = { @@ -93,16 +94,34 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance { throw new Error("Not implemented yet"); } - public async status(): Promise { + public async status(): Promise< + InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] } + > { const status = await this.stub.getStatus(0, this.id); - const { logs } = await this.stub.readLogs(); - // @ts-expect-error TODO: Fix this + + // NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise + const { logs } = + await (this.stub.readLogs() as unknown as Promise); + + const workflowSuccessEvent = logs + .filter((log) => log.event === InstanceEvent.WORKFLOW_SUCCESS) + .at(0); + const filteredLogs = logs.filter( - // @ts-expect-error TODO: Fix this (log) => log.event === InstanceEvent.STEP_SUCCESS ); - // @ts-expect-error TODO: Fix this - const output = filteredLogs.map((log) => log.metadata.result); - return { status: instanceStatusName(status), output }; // output, error + const stepOutputs = filteredLogs.map((log) => log.metadata.result); + + const workflowOutput = + workflowSuccessEvent !== undefined + ? workflowSuccessEvent.metadata.result + : null; + + return { + status: instanceStatusName(status), + __LOCAL_DEV_STEP_OUTPUTS: stepOutputs, + // @ts-expect-error types are wrong, will remove this expect-error once I fix them + output: workflowOutput, + }; // output, error } } diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 08dd14a0b9b2..d2002b9d08f6 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -12,11 +12,7 @@ import { startGracePeriod, } from "./lib/gracePeriodSemaphore"; import { TimePriorityQueue } from "./lib/timePriorityQueue"; -import type { - InstanceLogsResponse, - InstanceMetadata, - RawInstanceLog, -} from "./instance"; +import type { InstanceMetadata, RawInstanceLog } from "./instance"; import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers"; export interface Env { @@ -53,6 +49,19 @@ export type DatabaseInstance = { ended_on: string | null; }; +export type Log = { + event: InstanceEvent; + group: string | null; + target: string | null; + metadata: { + result: unknown; + }; +}; + +export type EngineLogs = { + logs: Log[]; +}; + const ENGINE_STATUS_KEY = "ENGINE_STATUS"; export class Engine extends DurableObject { @@ -121,18 +130,20 @@ export class Engine extends DurableObject { return []; } - readLogs(): InstanceLogsResponse { + readLogs(): EngineLogs { const logs = [ - ...this.ctx.storage.sql.exec>( - "SELECT event, groupKey, target, metadata FROM states" - ), + ...this.ctx.storage.sql.exec<{ + event: InstanceEvent; + groupKey: string | null; + target: string | null; + metadata: string; + }>("SELECT event, groupKey, target, metadata FROM states"), ]; return { - // @ts-expect-error TODO: Fix this logs: logs.map((log) => ({ ...log, - metadata: JSON.parse(log.metadata as string), + metadata: JSON.parse(log.metadata), group: log.groupKey, })), };