Skip to content

Commit ca6c010

Browse files
Add extension in front of workflow binding for manually disposing engine stubs (#10919)
1 parent 69ad5c5 commit ca6c010

File tree

11 files changed

+225
-74
lines changed

11 files changed

+225
-74
lines changed

.changeset/tricky-squids-wait.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/vitest-pool-workers": minor
3+
"@cloudflare/workflows-shared": minor
4+
"miniflare": minor
5+
---
6+
7+
migrate workflow to use a wrapped binding

fixtures/workflow/tests/index.test.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ describe("Workflows", () => {
4848
await expect(fetchJson(`http://${ip}:${port}/create?workflowName=test`))
4949
.resolves.toMatchInlineSnapshot(`
5050
{
51-
"__LOCAL_DEV_STEP_OUTPUTS": [],
51+
"__LOCAL_DEV_STEP_OUTPUTS": [
52+
{
53+
"output": "First step result",
54+
},
55+
],
5256
"output": null,
5357
"status": "running",
5458
}
@@ -77,7 +81,11 @@ describe("Workflows", () => {
7781
await expect(fetchJson(`http://${ip}:${port}/create`)).resolves
7882
.toMatchInlineSnapshot(`
7983
{
80-
"__LOCAL_DEV_STEP_OUTPUTS": [],
84+
"__LOCAL_DEV_STEP_OUTPUTS": [
85+
{
86+
"output": "First step result",
87+
},
88+
],
8189
"output": null,
8290
"status": "running",
8391
}

packages/miniflare/src/plugins/workflows/index.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from "fs/promises";
22
import SCRIPT_WORKFLOWS_BINDING from "worker:workflows/binding";
3+
import SCRIPT_WORKFLOWS_WRAPPED_BINDING from "worker:workflows/wrapped-binding";
34
import { z } from "zod";
45
import { Service } from "../../runtime";
56
import { getUserServiceName } from "../core";
@@ -43,13 +44,21 @@ export const WORKFLOWS_PLUGIN: Plugin<
4344
return Object.entries(options.workflows ?? {}).map(
4445
([bindingName, workflow]) => ({
4546
name: bindingName,
46-
service: {
47-
name: getUserBindingServiceName(
48-
WORKFLOWS_PLUGIN_NAME,
49-
workflow.name,
50-
workflow.remoteProxyConnectionString
51-
),
52-
entrypoint: "WorkflowBinding",
47+
wrapped: {
48+
moduleName: `${WORKFLOWS_PLUGIN_NAME}:local-wrapped-binding`,
49+
innerBindings: [
50+
{
51+
name: "binding",
52+
service: {
53+
name: getUserBindingServiceName(
54+
WORKFLOWS_PLUGIN_NAME,
55+
workflow.name,
56+
workflow.remoteProxyConnectionString
57+
),
58+
entrypoint: "WorkflowBinding",
59+
},
60+
},
61+
],
5362
},
5463
})
5564
);
@@ -64,6 +73,20 @@ export const WORKFLOWS_PLUGIN: Plugin<
6473
);
6574
},
6675

76+
getExtensions({}) {
77+
return [
78+
{
79+
modules: [
80+
{
81+
name: `${WORKFLOWS_PLUGIN_NAME}:local-wrapped-binding`,
82+
esModule: SCRIPT_WORKFLOWS_WRAPPED_BINDING(),
83+
internal: true,
84+
},
85+
],
86+
},
87+
];
88+
},
89+
6790
async getServices({ options, sharedOptions, tmpPath, defaultPersistRoot }) {
6891
const persistPath = getPersistPath(
6992
WORKFLOWS_PLUGIN_NAME,
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { WorkflowBinding } from "@cloudflare/workflows-shared/src/binding";
2+
3+
class WorkflowImpl implements Workflow {
4+
constructor(private binding: WorkflowBinding) {
5+
this.binding = binding;
6+
}
7+
8+
async get(id: string): Promise<WorkflowInstance> {
9+
const instanceHandle = new InstanceImpl(id, this.binding);
10+
// throws instance.not_found if instance doesn't exist
11+
// this is needed for backwards compat
12+
await instanceHandle.status();
13+
return instanceHandle;
14+
}
15+
16+
async create(
17+
options?: WorkflowInstanceCreateOptions
18+
): Promise<WorkflowInstance> {
19+
using result = (await this.binding.create(options)) as WorkflowInstance &
20+
Disposable;
21+
22+
return new InstanceImpl(result.id, this.binding);
23+
}
24+
25+
async createBatch(
26+
options: WorkflowInstanceCreateOptions[]
27+
): Promise<WorkflowInstance[]> {
28+
const result = await this.binding.createBatch(options);
29+
return result.map((res) => {
30+
return new InstanceImpl(res.id, this.binding);
31+
});
32+
}
33+
34+
async unsafeGetBindingName(): Promise<string> {
35+
return this.binding.unsafeGetBindingName();
36+
}
37+
38+
async unsafeAbort(instanceId: string, reason?: string): Promise<void> {
39+
return this.binding.unsafeAbort(instanceId, reason);
40+
}
41+
42+
async unsafeGetInstanceModifier(instanceId: string): Promise<unknown> {
43+
return this.binding.unsafeGetInstanceModifier(instanceId);
44+
}
45+
46+
async unsafeWaitForStepResult(
47+
instanceId: string,
48+
name: string,
49+
index?: number
50+
): Promise<unknown> {
51+
return this.binding.unsafeWaitForStepResult(instanceId, name, index);
52+
}
53+
54+
async unsafeWaitForStatus(instanceId: string, status: string): Promise<void> {
55+
return await this.binding.unsafeWaitForStatus(instanceId, status);
56+
}
57+
}
58+
59+
class InstanceImpl implements WorkflowInstance {
60+
constructor(
61+
public id: string,
62+
private binding: WorkflowBinding
63+
) {}
64+
65+
public async pause(): Promise<void> {
66+
// Look for instance in namespace
67+
// Get engine stub
68+
// Call a few functions on stub
69+
throw new Error("Not implemented yet");
70+
}
71+
72+
public async resume(): Promise<void> {
73+
throw new Error("Not implemented yet");
74+
}
75+
76+
public async terminate(): Promise<void> {
77+
throw new Error("Not implemented yet");
78+
}
79+
80+
public async restart(): Promise<void> {
81+
throw new Error("Not implemented yet");
82+
}
83+
84+
public async status(): Promise<InstanceStatus> {
85+
const instance = (await this.binding.get(this.id)) as WorkflowInstance &
86+
Disposable;
87+
using res = (await instance.status()) as InstanceStatus & Disposable;
88+
instance[Symbol.dispose]();
89+
return structuredClone(res);
90+
}
91+
92+
public async sendEvent(args: {
93+
payload: unknown;
94+
type: string;
95+
}): Promise<void> {
96+
const instance = (await this.binding.get(this.id)) as WorkflowInstance &
97+
Disposable;
98+
const res = (await instance.sendEvent(args)) as void & Disposable;
99+
instance[Symbol.dispose]();
100+
res[Symbol.dispose]();
101+
}
102+
}
103+
104+
export function makeBinding(env: { binding: WorkflowBinding }): Workflow {
105+
return new WorkflowImpl(env.binding);
106+
}
107+
108+
export default makeBinding;

packages/miniflare/test/plugins/workflows/index.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ test("persists Workflow data on file-system between runs", async (t) => {
4242
let res = await mf.dispatchFetch("http://localhost");
4343
t.is(
4444
await res.text(),
45-
'{"status":"running","__LOCAL_DEV_STEP_OUTPUTS":[],"output":null}'
45+
'{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}'
4646
);
4747

4848
// there's no waitUntil in ava haha

packages/vite-plugin-cloudflare/playground/external-workflows/__tests__/workflows.spec.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,7 @@ import { getJsonResponse } from "../../__test-utils__";
44
test("creates a Workflow with an ID", async () => {
55
const instanceId = "external-workflows-test-id";
66

7-
expect(await getJsonResponse(`/create?id=${instanceId}`)).toEqual({
8-
id: instanceId,
9-
status: {
10-
status: "running",
11-
__LOCAL_DEV_STEP_OUTPUTS: [],
12-
output: null,
13-
},
14-
});
7+
await getJsonResponse(`/create?id=${instanceId}`);
158

169
await vi.waitFor(
1710
async () => {

packages/vite-plugin-cloudflare/playground/workflows/__tests__/workflows.spec.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,7 @@ import { getJsonResponse } from "../../__test-utils__";
44
test("creates a Workflow with an ID", async () => {
55
const instanceId = "workflows-test-id";
66

7-
expect(await getJsonResponse(`/create?id=${instanceId}`)).toEqual({
8-
id: instanceId,
9-
status: {
10-
status: "running",
11-
__LOCAL_DEV_STEP_OUTPUTS: [],
12-
output: null,
13-
},
14-
});
7+
await getJsonResponse(`/create?id=${instanceId}`);
158

169
await vi.waitFor(
1710
async () => {

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,31 @@ class WorkflowInstanceIntrospectorHandle
4545
{
4646
#workflow: WorkflowBinding;
4747
#instanceId: string;
48-
#instanceModifier: WorkflowInstanceModifier;
48+
#instanceModifier: WorkflowInstanceModifier | undefined;
49+
#instanceModifierPromise: Promise<WorkflowInstanceModifier> | undefined;
4950

5051
constructor(workflow: WorkflowBinding, instanceId: string) {
5152
this.#workflow = workflow;
5253
this.#instanceId = instanceId;
53-
54-
this.#instanceModifier = workflow.unsafeGetInstanceModifier(
55-
instanceId
56-
) as WorkflowInstanceModifier;
54+
this.#instanceModifierPromise = workflow
55+
.unsafeGetInstanceModifier(instanceId)
56+
.then((res) => {
57+
this.#instanceModifier = res as WorkflowInstanceModifier;
58+
this.#instanceModifierPromise = undefined;
59+
return this.#instanceModifier;
60+
});
5761
}
5862

5963
async modify(fn: ModifierCallback): Promise<WorkflowInstanceIntrospector> {
64+
if (this.#instanceModifierPromise !== undefined) {
65+
this.#instanceModifier = await this.#instanceModifierPromise;
66+
}
67+
if (this.#instanceModifier === undefined) {
68+
throw new Error(
69+
"could not apply modifications due to internal error. Retrying the test may resolve the issue."
70+
);
71+
}
72+
6073
await fn(this.#instanceModifier);
6174

6275
return this;

0 commit comments

Comments
 (0)