Skip to content

Commit d91ba05

Browse files
committed
Always complete batches, not only batchTriggerAndWait in deployed tasks
1 parent 140eb40 commit d91ba05

File tree

8 files changed

+172
-82
lines changed

8 files changed

+172
-82
lines changed

apps/webapp/app/v3/services/batchTriggerV2.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ export class BatchTriggerV2Service extends BaseService {
229229
dependentTaskAttemptId: dependentAttempt?.id,
230230
runCount: body.items.length,
231231
runIds: runs.map((r) => r.id),
232+
status: "COMPLETED",
233+
batchVersion: "v2",
232234
},
233235
});
234236

@@ -328,11 +330,12 @@ export class BatchTriggerV2Service extends BaseService {
328330
idempotencyKey: options.idempotencyKey,
329331
idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt,
330332
dependentTaskAttemptId: dependentAttempt?.id,
331-
runCount: body.items.length,
333+
runCount: newRunCount,
332334
runIds: runs.map((r) => r.id),
333335
payload: payloadPacket.data,
334336
payloadType: payloadPacket.dataType,
335337
options,
338+
batchVersion: "v2",
336339
},
337340
});
338341

@@ -409,6 +412,7 @@ export class BatchTriggerV2Service extends BaseService {
409412
payload: payloadPacket.data,
410413
payloadType: payloadPacket.dataType,
411414
options,
415+
batchVersion: "v2",
412416
},
413417
});
414418

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { BaseService } from "./baseService.server";
1414
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
1515
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
1616
import { socketIo } from "../handleSocketIo.server";
17+
import { ResumeBatchRunService } from "./resumeBatchRun.server";
1718

1819
type BaseInput = {
1920
id: string;
@@ -81,6 +82,15 @@ export class FinalizeTaskRunService extends BaseService {
8182
await this.finalizeRunError(run, error);
8283
}
8384

85+
try {
86+
await this.#finalizeBatch(run);
87+
} catch (finalizeBatchError) {
88+
logger.error("FinalizeTaskRunService: Failed to finalize batch", {
89+
runId: run.id,
90+
error: finalizeBatchError,
91+
});
92+
}
93+
8494
//resume any dependencies
8595
const resumeService = new ResumeDependentParentsService(this._prisma);
8696
const result = await resumeService.call({ id: run.id });
@@ -135,6 +145,72 @@ export class FinalizeTaskRunService extends BaseService {
135145
return run as Output<T>;
136146
}
137147

148+
async #finalizeBatch(run: TaskRun) {
149+
if (!run.batchId) {
150+
return;
151+
}
152+
153+
logger.debug("FinalizeTaskRunService: Finalizing batch", { runId: run.id });
154+
155+
const environment = await this._prisma.runtimeEnvironment.findFirst({
156+
where: {
157+
id: run.runtimeEnvironmentId,
158+
},
159+
});
160+
161+
if (!environment) {
162+
return;
163+
}
164+
165+
const batchItems = await this._prisma.batchTaskRunItem.findMany({
166+
where: {
167+
taskRunId: run.id,
168+
},
169+
include: {
170+
batchTaskRun: {
171+
select: {
172+
id: true,
173+
dependentTaskAttemptId: true,
174+
},
175+
},
176+
},
177+
});
178+
179+
if (batchItems.length === 0) {
180+
return;
181+
}
182+
183+
if (batchItems.length > 10) {
184+
logger.error("FinalizeTaskRunService: More than 10 batch items", {
185+
runId: run.id,
186+
batchItems: batchItems.length,
187+
});
188+
189+
return;
190+
}
191+
192+
for (const item of batchItems) {
193+
// Don't do anything if this is a batchTriggerAndWait in a deployed task
194+
if (environment.type !== "DEVELOPMENT" && item.batchTaskRun.dependentTaskAttemptId) {
195+
continue;
196+
}
197+
198+
// Update the item to complete
199+
await this._prisma.batchTaskRunItem.update({
200+
where: {
201+
id: item.id,
202+
},
203+
data: {
204+
status: "COMPLETED",
205+
},
206+
});
207+
208+
// This won't resume because this batch does not have a dependent task attempt ID
209+
// or is in development, but this service will mark the batch as completed
210+
await ResumeBatchRunService.enqueue(item.batchTaskRunId, this._prisma);
211+
}
212+
}
213+
138214
async finalizeRunError(run: TaskRun, error: TaskRunError) {
139215
await this._prisma.taskRun.update({
140216
where: { id: run.id },

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,55 @@ export class ResumeBatchRunService extends BaseService {
1313
id: batchRunId,
1414
},
1515
include: {
16-
dependentTaskAttempt: {
16+
runtimeEnvironment: {
1717
include: {
18-
runtimeEnvironment: {
19-
include: {
20-
project: true,
21-
organization: true,
22-
},
23-
},
24-
taskRun: true,
18+
project: true,
19+
organization: true,
20+
},
21+
},
22+
items: {
23+
select: {
24+
status: true,
25+
taskRunAttemptId: true,
2526
},
2627
},
27-
items: true,
2828
},
2929
});
3030

31-
if (!batchRun || !batchRun.dependentTaskAttempt) {
31+
if (!batchRun) {
3232
logger.error(
3333
"ResumeBatchRunService: Batch run doesn't exist or doesn't have a dependent attempt",
3434
{
35-
batchRun,
35+
batchRunId,
3636
}
3737
);
3838
return;
3939
}
4040

4141
if (batchRun.status === "COMPLETED") {
4242
logger.debug("ResumeBatchRunService: Batch run is already completed", {
43-
batchRun: batchRun,
43+
batchRunId: batchRun.id,
44+
batchRun: {
45+
id: batchRun.id,
46+
status: batchRun.status,
47+
},
4448
});
4549
return;
4650
}
4751

4852
if (batchRun.items.some((item) => !finishedBatchRunStatuses.includes(item.status))) {
4953
logger.debug("ResumeBatchRunService: All items aren't yet completed", {
50-
batchRun: batchRun,
54+
batchRunId: batchRun.id,
55+
batchRun: {
56+
id: batchRun.id,
57+
status: batchRun.status,
58+
},
5159
});
5260
return;
5361
}
5462

55-
// This batch has a dependent attempt and just finalized, we should resume that attempt
56-
const environment = batchRun.dependentTaskAttempt.runtimeEnvironment;
57-
58-
// If we are in development, we don't need to resume the dependent task (that will happen automatically)
59-
if (environment.type === "DEVELOPMENT") {
63+
// If we are in development, or there is no dependent attempt, we can just mark the batch as completed and return
64+
if (batchRun.runtimeEnvironment.type === "DEVELOPMENT" || !batchRun.dependentTaskAttemptId) {
6065
// We need to update the batchRun status so we don't resume it again
6166
await this._prisma.batchTaskRun.update({
6267
where: {
@@ -69,12 +74,42 @@ export class ResumeBatchRunService extends BaseService {
6974
return;
7075
}
7176

72-
const dependentRun = batchRun.dependentTaskAttempt.taskRun;
77+
const dependentTaskAttempt = await this._prisma.taskRunAttempt.findFirst({
78+
where: {
79+
id: batchRun.dependentTaskAttemptId,
80+
},
81+
select: {
82+
status: true,
83+
id: true,
84+
taskRun: {
85+
select: {
86+
id: true,
87+
queue: true,
88+
taskIdentifier: true,
89+
concurrencyKey: true,
90+
},
91+
},
92+
},
93+
});
94+
95+
if (!dependentTaskAttempt) {
96+
logger.error("ResumeBatchRunService: Dependent attempt not found", {
97+
batchRunId: batchRun.id,
98+
dependentTaskAttemptId: batchRun.dependentTaskAttemptId,
99+
});
100+
101+
return;
102+
}
103+
104+
// This batch has a dependent attempt and just finalized, we should resume that attempt
105+
const environment = batchRun.runtimeEnvironment;
106+
107+
const dependentRun = dependentTaskAttempt.taskRun;
73108

74-
if (batchRun.dependentTaskAttempt.status === "PAUSED" && batchRun.checkpointEventId) {
109+
if (dependentTaskAttempt.status === "PAUSED" && batchRun.checkpointEventId) {
75110
logger.debug("ResumeBatchRunService: Attempt is paused and has a checkpoint event", {
76111
batchRunId: batchRun.id,
77-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
112+
dependentTaskAttempt: dependentTaskAttempt,
78113
checkpointEventId: batchRun.checkpointEventId,
79114
});
80115

@@ -83,7 +118,7 @@ export class ResumeBatchRunService extends BaseService {
83118
if (wasUpdated) {
84119
logger.debug("ResumeBatchRunService: Resuming dependent run with checkpoint", {
85120
batchRunId: batchRun.id,
86-
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
121+
dependentTaskAttemptId: dependentTaskAttempt.id,
87122
});
88123
await marqs?.enqueueMessage(
89124
environment,
@@ -92,37 +127,37 @@ export class ResumeBatchRunService extends BaseService {
92127
{
93128
type: "RESUME",
94129
completedAttemptIds: [],
95-
resumableAttemptId: batchRun.dependentTaskAttempt.id,
130+
resumableAttemptId: dependentTaskAttempt.id,
96131
checkpointEventId: batchRun.checkpointEventId,
97-
taskIdentifier: batchRun.dependentTaskAttempt.taskRun.taskIdentifier,
98-
projectId: batchRun.dependentTaskAttempt.runtimeEnvironment.projectId,
99-
environmentId: batchRun.dependentTaskAttempt.runtimeEnvironment.id,
100-
environmentType: batchRun.dependentTaskAttempt.runtimeEnvironment.type,
132+
taskIdentifier: dependentTaskAttempt.taskRun.taskIdentifier,
133+
projectId: environment.projectId,
134+
environmentId: environment.id,
135+
environmentType: environment.type,
101136
},
102137
dependentRun.concurrencyKey ?? undefined
103138
);
104139
} else {
105140
logger.debug("ResumeBatchRunService: with checkpoint was already completed", {
106141
batchRunId: batchRun.id,
107-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
142+
dependentTaskAttempt: dependentTaskAttempt,
108143
checkpointEventId: batchRun.checkpointEventId,
109144
hasCheckpointEvent: !!batchRun.checkpointEventId,
110145
});
111146
}
112147
} else {
113148
logger.debug("ResumeBatchRunService: attempt is not paused or there's no checkpoint event", {
114149
batchRunId: batchRun.id,
115-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
150+
dependentTaskAttempt: dependentTaskAttempt,
116151
checkpointEventId: batchRun.checkpointEventId,
117152
hasCheckpointEvent: !!batchRun.checkpointEventId,
118153
});
119154

120-
if (batchRun.dependentTaskAttempt.status === "PAUSED" && !batchRun.checkpointEventId) {
155+
if (dependentTaskAttempt.status === "PAUSED" && !batchRun.checkpointEventId) {
121156
// In case of race conditions the status can be PAUSED without a checkpoint event
122157
// When the checkpoint is created, it will continue the run
123158
logger.error("ResumeBatchRunService: attempt is paused but there's no checkpoint event", {
124159
batchRunId: batchRun.id,
125-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
160+
dependentTaskAttempt: dependentTaskAttempt,
126161
checkpointEventId: batchRun.checkpointEventId,
127162
hasCheckpointEvent: !!batchRun.checkpointEventId,
128163
});
@@ -134,24 +169,24 @@ export class ResumeBatchRunService extends BaseService {
134169
if (wasUpdated) {
135170
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
136171
batchRunId: batchRun.id,
137-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
172+
dependentTaskAttempt: dependentTaskAttempt,
138173
checkpointEventId: batchRun.checkpointEventId,
139174
hasCheckpointEvent: !!batchRun.checkpointEventId,
140175
});
141176
await marqs?.replaceMessage(dependentRun.id, {
142177
type: "RESUME",
143178
completedAttemptIds: batchRun.items.map((item) => item.taskRunAttemptId).filter(Boolean),
144-
resumableAttemptId: batchRun.dependentTaskAttempt.id,
179+
resumableAttemptId: dependentTaskAttempt.id,
145180
checkpointEventId: batchRun.checkpointEventId ?? undefined,
146-
taskIdentifier: batchRun.dependentTaskAttempt.taskRun.taskIdentifier,
147-
projectId: batchRun.dependentTaskAttempt.runtimeEnvironment.projectId,
148-
environmentId: batchRun.dependentTaskAttempt.runtimeEnvironment.id,
149-
environmentType: batchRun.dependentTaskAttempt.runtimeEnvironment.type,
181+
taskIdentifier: dependentTaskAttempt.taskRun.taskIdentifier,
182+
projectId: environment.projectId,
183+
environmentId: environment.id,
184+
environmentType: environment.type,
150185
});
151186
} else {
152187
logger.debug("ResumeBatchRunService: without checkpoint was already completed", {
153188
batchRunId: batchRun.id,
154-
dependentTaskAttempt: batchRun.dependentTaskAttempt,
189+
dependentTaskAttempt: dependentTaskAttempt,
155190
checkpointEventId: batchRun.checkpointEventId,
156191
hasCheckpointEvent: !!batchRun.checkpointEventId,
157192
});
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "BatchTaskRun" ADD COLUMN "batchVersion" TEXT NOT NULL DEFAULT 'v1';

internal-packages/database/prisma/schema.prisma

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,11 +2154,12 @@ model BatchTaskRun {
21542154
updatedAt DateTime @updatedAt
21552155
21562156
// new columns
2157-
runIds String[] @default([])
2158-
runCount Int @default(0)
2159-
payload String?
2160-
payloadType String @default("application/json")
2161-
options Json?
2157+
runIds String[] @default([])
2158+
runCount Int @default(0)
2159+
payload String?
2160+
payloadType String @default("application/json")
2161+
options Json?
2162+
batchVersion String @default("v1")
21622163
21632164
///all the below properties are engine v1 only
21642165
items BatchTaskRunItem[]

references/v3-catalog/src/management.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { configure, envvars, runs, schedules } from "@trigger.dev/sdk/v3";
1+
import { configure, envvars, runs, schedules, batch } from "@trigger.dev/sdk/v3";
22
import dotenv from "dotenv";
33
import { unfriendlyIdTask } from "./trigger/other.js";
44
import { spamRateLimiter, taskThatErrors } from "./trigger/retries.js";
@@ -255,9 +255,17 @@ async function doTriggerUnfriendlyTaskId() {
255255
console.log("completed run", completedRun);
256256
}
257257

258+
async function doBatchTrigger() {
259+
const response = await batch.triggerByTask([
260+
{ task: simpleChildTask, payload: { message: "Hello, World!" } },
261+
]);
262+
263+
console.log("batch trigger response", response);
264+
}
265+
258266
// doRuns().catch(console.error);
259267
// doListRuns().catch(console.error);
260268
// doScheduleLists().catch(console.error);
261-
doSchedules().catch(console.error);
269+
doBatchTrigger().catch(console.error);
262270
// doEnvVars().catch(console.error);
263271
// doTriggerUnfriendlyTaskId().catch(console.error);

0 commit comments

Comments
 (0)