@@ -2,11 +2,19 @@ import { CoordinatorToPlatformMessages } from "@trigger.dev/core/v3";
2
2
import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket" ;
3
3
import type { Checkpoint , CheckpointRestoreEvent } from "@trigger.dev/database" ;
4
4
import { logger } from "~/services/logger.server" ;
5
- import { generateFriendlyId } from "../friendlyIdentifiers" ;
6
5
import { marqs } from "~/v3/marqs/index.server" ;
7
- import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server" ;
6
+ import { generateFriendlyId } from "../friendlyIdentifiers" ;
7
+ import {
8
+ FINAL_ATTEMPT_STATUSES ,
9
+ isFinalAttemptStatus ,
10
+ isFinalRunStatus ,
11
+ isFreezableAttemptStatus ,
12
+ isFreezableRunStatus ,
13
+ } from "../taskStatus" ;
8
14
import { BaseService } from "./baseService.server" ;
9
- import { isFinalRunStatus , isFreezableAttemptStatus , isFreezableRunStatus } from "../taskStatus" ;
15
+ import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server" ;
16
+ import { ResumeBatchRunService } from "./resumeBatchRun.server" ;
17
+ import { ResumeTaskDependencyService } from "./resumeTaskDependency.server" ;
10
18
11
19
export class CreateCheckpointService extends BaseService {
12
20
public async call (
@@ -89,6 +97,9 @@ export class CreateCheckpointService extends BaseService {
89
97
} ;
90
98
}
91
99
100
+ //sleep to test slow checkpoints
101
+ // await new Promise((resolve) => setTimeout(resolve, 60_000));
102
+
92
103
const checkpoint = await this . _prisma . checkpoint . create ( {
93
104
data : {
94
105
friendlyId : generateFriendlyId ( "checkpoint" ) ,
@@ -140,10 +151,133 @@ export class CreateCheckpointService extends BaseService {
140
151
dependencyFriendlyRunId : reason . friendlyId ,
141
152
} ) ;
142
153
143
- keepRunAlive = await this . #isRunCompleted( reason . friendlyId ) ;
144
-
145
- if ( ! keepRunAlive ) {
146
- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
154
+ if ( checkpointEvent ) {
155
+ const dependency = await this . _prisma . taskRunDependency . findFirst ( {
156
+ select : {
157
+ id : true ,
158
+ taskRunId : true ,
159
+ } ,
160
+ where : {
161
+ taskRun : {
162
+ friendlyId : reason . friendlyId ,
163
+ } ,
164
+ } ,
165
+ } ) ;
166
+
167
+ logger . log ( "CreateCheckpointService: Created checkpoint WAIT_FOR_TASK" , {
168
+ checkpointId : checkpoint . id ,
169
+ runFriendlyId : reason . friendlyId ,
170
+ dependencyId : dependency ?. id ,
171
+ } ) ;
172
+
173
+ if ( ! dependency ) {
174
+ logger . error ( "CreateCheckpointService: Dependency not found" , {
175
+ friendlyId : reason . friendlyId ,
176
+ } ) ;
177
+
178
+ return {
179
+ success : true ,
180
+ checkpoint,
181
+ event : checkpointEvent ,
182
+ keepRunAlive : false ,
183
+ } ;
184
+ }
185
+
186
+ const childRun = await this . _prisma . taskRun . findFirst ( {
187
+ select : {
188
+ id : true ,
189
+ status : true ,
190
+ } ,
191
+ where : {
192
+ id : dependency . taskRunId ,
193
+ } ,
194
+ } ) ;
195
+
196
+ if ( ! childRun ) {
197
+ logger . error ( "CreateCheckpointService: Dependency child run not found" , {
198
+ taskRunId : dependency . taskRunId ,
199
+ runFriendlyId : reason . friendlyId ,
200
+ dependencyId : dependency . id ,
201
+ } ) ;
202
+
203
+ return {
204
+ success : true ,
205
+ checkpoint,
206
+ event : checkpointEvent ,
207
+ keepRunAlive : false ,
208
+ } ;
209
+ }
210
+
211
+ const isFinished = isFinalRunStatus ( childRun . status ) ;
212
+ if ( ! isFinished ) {
213
+ logger . debug ( "CreateCheckpointService: Dependency child run not finished" , {
214
+ taskRunId : dependency . taskRunId ,
215
+ runFriendlyId : reason . friendlyId ,
216
+ dependencyId : dependency . id ,
217
+ childRunStatus : childRun . status ,
218
+ childRunId : childRun . id ,
219
+ } ) ;
220
+
221
+ return {
222
+ success : true ,
223
+ checkpoint,
224
+ event : checkpointEvent ,
225
+ keepRunAlive : false ,
226
+ } ;
227
+ }
228
+
229
+ const lastAttempt = await this . _prisma . taskRunAttempt . findFirst ( {
230
+ select : {
231
+ id : true ,
232
+ status : true ,
233
+ } ,
234
+ where : {
235
+ taskRunId : dependency . taskRunId ,
236
+ } ,
237
+ orderBy : {
238
+ createdAt : "desc" ,
239
+ } ,
240
+ } ) ;
241
+
242
+ if ( ! lastAttempt ) {
243
+ logger . debug ( "CreateCheckpointService: Dependency child attempt not found" , {
244
+ taskRunId : dependency . taskRunId ,
245
+ runFriendlyId : reason . friendlyId ,
246
+ dependencyId : dependency ?. id ,
247
+ } ) ;
248
+ return {
249
+ success : true ,
250
+ checkpoint,
251
+ event : checkpointEvent ,
252
+ keepRunAlive : false ,
253
+ } ;
254
+ }
255
+
256
+ if ( ! isFinalAttemptStatus ( lastAttempt . status ) ) {
257
+ logger . debug ( "CreateCheckpointService: Dependency child attempt not final" , {
258
+ taskRunId : dependency . taskRunId ,
259
+ runFriendlyId : reason . friendlyId ,
260
+ dependencyId : dependency . id ,
261
+ lastAttemptId : lastAttempt . id ,
262
+ lastAttemptStatus : lastAttempt . status ,
263
+ } ) ;
264
+
265
+ return {
266
+ success : true ,
267
+ checkpoint,
268
+ event : checkpointEvent ,
269
+ keepRunAlive : false ,
270
+ } ;
271
+ }
272
+
273
+ await ResumeTaskDependencyService . enqueue ( dependency . id , lastAttempt . id , this . _prisma ) ;
274
+
275
+ return {
276
+ success : true ,
277
+ checkpoint,
278
+ event : checkpointEvent ,
279
+ keepRunAlive : false ,
280
+ } ;
147
281
}
148
282
149
283
break ;
@@ -154,10 +288,37 @@ export class CreateCheckpointService extends BaseService {
154
288
batchDependencyFriendlyId : reason . batchFriendlyId ,
155
289
} ) ;
156
290
157
- keepRunAlive = await this . #isBatchCompleted( reason . batchFriendlyId ) ;
158
-
159
- if ( ! keepRunAlive ) {
160
- await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
291
+ if ( checkpointEvent ) {
292
+ const batchRun = await this . _prisma . batchTaskRun . findFirst ( {
293
+ select : {
294
+ id : true ,
295
+ } ,
296
+ where : {
297
+ friendlyId : reason . batchFriendlyId ,
298
+ } ,
299
+ } ) ;
300
+
301
+ if ( ! batchRun ) {
302
+ logger . error ( "CreateCheckpointService: Batch not found" , {
303
+ friendlyId : reason . batchFriendlyId ,
304
+ } ) ;
305
+
306
+ return {
307
+ success : true ,
308
+ checkpoint,
309
+ event : checkpointEvent ,
310
+ keepRunAlive : false ,
311
+ } ;
312
+ }
313
+
314
+ await ResumeBatchRunService . enqueue ( batchRun . id , this . _prisma ) ;
315
+
316
+ return {
317
+ success : true ,
318
+ checkpoint,
319
+ event : checkpointEvent ,
320
+ keepRunAlive : false ,
321
+ } ;
161
322
}
162
323
163
324
break ;
@@ -206,34 +367,4 @@ export class CreateCheckpointService extends BaseService {
206
367
keepRunAlive,
207
368
} ;
208
369
}
209
-
210
- async #isBatchCompleted( friendlyId : string ) : Promise < boolean > {
211
- const batch = await this . _prisma . batchTaskRun . findUnique ( {
212
- where : {
213
- friendlyId,
214
- } ,
215
- } ) ;
216
-
217
- if ( ! batch ) {
218
- logger . error ( "Batch not found" , { friendlyId } ) ;
219
- return false ;
220
- }
221
-
222
- return batch . status === "COMPLETED" ;
223
- }
224
-
225
- async #isRunCompleted( friendlyId : string ) : Promise < boolean > {
226
- const run = await this . _prisma . taskRun . findUnique ( {
227
- where : {
228
- friendlyId,
229
- } ,
230
- } ) ;
231
-
232
- if ( ! run ) {
233
- logger . error ( "Run not found" , { friendlyId } ) ;
234
- return false ;
235
- }
236
-
237
- return isFinalRunStatus ( run . status ) ;
238
- }
239
370
}
0 commit comments