@@ -15,6 +15,7 @@ import {
15
15
RegisteredHookFunction ,
16
16
TaskCompleteResult ,
17
17
TaskInitOutput ,
18
+ TaskWait ,
18
19
} from "../lifecycleHooks/types.js" ;
19
20
import { recordSpanException , TracingSDK } from "../otel/index.js" ;
20
21
import { runTimelineMetrics } from "../run-timeline-metrics-api.js" ;
@@ -126,6 +127,14 @@ export class TaskExecutor {
126
127
127
128
parsedPayload = await this . #parsePayload( payloadResult ) ;
128
129
130
+ lifecycleHooks . registerOnWaitHookListener ( async ( wait ) => {
131
+ await this . #callOnWaitFunctions( wait , parsedPayload , ctx , initOutput , signal ) ;
132
+ } ) ;
133
+
134
+ lifecycleHooks . registerOnResumeHookListener ( async ( wait ) => {
135
+ await this . #callOnResumeFunctions( wait , parsedPayload , ctx , initOutput , signal ) ;
136
+ } ) ;
137
+
129
138
const executeTask = async ( payload : any ) => {
130
139
const [ runError , output ] = await tryCatch (
131
140
( async ( ) => {
@@ -383,6 +392,146 @@ export class TaskExecutor {
383
392
} ) ;
384
393
}
385
394
395
+ async #callOnWaitFunctions(
396
+ wait : TaskWait ,
397
+ payload : unknown ,
398
+ ctx : TaskRunContext ,
399
+ initOutput : TaskInitOutput ,
400
+ signal ?: AbortSignal
401
+ ) {
402
+ const globalWaitHooks = lifecycleHooks . getGlobalWaitHooks ( ) ;
403
+ const taskWaitHook = lifecycleHooks . getTaskWaitHook ( this . task . id ) ;
404
+
405
+ if ( globalWaitHooks . length === 0 && ! taskWaitHook ) {
406
+ return ;
407
+ }
408
+
409
+ const result = await runTimelineMetrics . measureMetric (
410
+ "trigger.dev/execution" ,
411
+ "onWait" ,
412
+ async ( ) => {
413
+ for ( const hook of globalWaitHooks ) {
414
+ const [ hookError ] = await tryCatch (
415
+ this . _tracer . startActiveSpan (
416
+ hook . name ? `onWait/${ hook . name } ` : "onWait/global" ,
417
+ async ( span ) => {
418
+ await hook . fn ( { payload, ctx, signal, task : this . task . id , wait, init : initOutput } ) ;
419
+ } ,
420
+ {
421
+ attributes : {
422
+ [ SemanticInternalAttributes . STYLE_ICON ] : "task-hook-onWait" ,
423
+ [ SemanticInternalAttributes . COLLAPSED ] : true ,
424
+ } ,
425
+ }
426
+ )
427
+ ) ;
428
+
429
+ if ( hookError ) {
430
+ throw hookError ;
431
+ }
432
+ }
433
+
434
+ if ( taskWaitHook ) {
435
+ const [ hookError ] = await tryCatch (
436
+ this . _tracer . startActiveSpan (
437
+ "onWait/task" ,
438
+ async ( span ) => {
439
+ await taskWaitHook ( {
440
+ payload,
441
+ ctx,
442
+ signal,
443
+ task : this . task . id ,
444
+ wait,
445
+ init : initOutput ,
446
+ } ) ;
447
+ } ,
448
+ {
449
+ attributes : {
450
+ [ SemanticInternalAttributes . STYLE_ICON ] : "task-hook-onWait" ,
451
+ [ SemanticInternalAttributes . COLLAPSED ] : true ,
452
+ } ,
453
+ }
454
+ )
455
+ ) ;
456
+
457
+ if ( hookError ) {
458
+ throw hookError ;
459
+ }
460
+ }
461
+ }
462
+ ) ;
463
+ }
464
+
465
+ async #callOnResumeFunctions(
466
+ wait : TaskWait ,
467
+ payload : unknown ,
468
+ ctx : TaskRunContext ,
469
+ initOutput : TaskInitOutput ,
470
+ signal ?: AbortSignal
471
+ ) {
472
+ const globalResumeHooks = lifecycleHooks . getGlobalResumeHooks ( ) ;
473
+ const taskResumeHook = lifecycleHooks . getTaskResumeHook ( this . task . id ) ;
474
+
475
+ if ( globalResumeHooks . length === 0 && ! taskResumeHook ) {
476
+ return ;
477
+ }
478
+
479
+ const result = await runTimelineMetrics . measureMetric (
480
+ "trigger.dev/execution" ,
481
+ "onResume" ,
482
+ async ( ) => {
483
+ for ( const hook of globalResumeHooks ) {
484
+ const [ hookError ] = await tryCatch (
485
+ this . _tracer . startActiveSpan (
486
+ hook . name ? `onResume/${ hook . name } ` : "onResume/global" ,
487
+ async ( span ) => {
488
+ await hook . fn ( { payload, ctx, signal, task : this . task . id , wait, init : initOutput } ) ;
489
+ } ,
490
+ {
491
+ attributes : {
492
+ [ SemanticInternalAttributes . STYLE_ICON ] : "task-hook-onResume" ,
493
+ [ SemanticInternalAttributes . COLLAPSED ] : true ,
494
+ } ,
495
+ }
496
+ )
497
+ ) ;
498
+
499
+ if ( hookError ) {
500
+ throw hookError ;
501
+ }
502
+ }
503
+
504
+ if ( taskResumeHook ) {
505
+ const [ hookError ] = await tryCatch (
506
+ this . _tracer . startActiveSpan (
507
+ "onResume/task" ,
508
+ async ( span ) => {
509
+ await taskResumeHook ( {
510
+ payload,
511
+ ctx,
512
+ signal,
513
+ task : this . task . id ,
514
+ wait,
515
+ init : initOutput ,
516
+ } ) ;
517
+ } ,
518
+ {
519
+ attributes : {
520
+ [ SemanticInternalAttributes . STYLE_ICON ] : "task-hook-onResume" ,
521
+ [ SemanticInternalAttributes . COLLAPSED ] : true ,
522
+ } ,
523
+ }
524
+ )
525
+ ) ;
526
+
527
+ if ( hookError ) {
528
+ throw hookError ;
529
+ }
530
+ }
531
+ }
532
+ ) ;
533
+ }
534
+
386
535
async #callInitFunctions( payload : unknown , ctx : TaskRunContext , signal ?: AbortSignal ) {
387
536
const globalInitHooks = lifecycleHooks . getGlobalInitHooks ( ) ;
388
537
const taskInitHook = lifecycleHooks . getTaskInitHook ( this . task . id ) ;
0 commit comments