@@ -450,7 +450,7 @@ export class TriggerTaskService extends BaseService {
450
450
) ,
451
451
0
452
452
)
453
- : null ;
453
+ : body . options . queue ?. concurrencyLimit ;
454
454
455
455
let taskQueue = await tx . taskQueue . findFirst ( {
456
456
where : {
@@ -459,74 +459,47 @@ export class TriggerTaskService extends BaseService {
459
459
} ,
460
460
} ) ;
461
461
462
- const existingConcurrencyLimit =
463
- typeof taskQueue ?. concurrencyLimit === "number"
464
- ? taskQueue . concurrencyLimit
465
- : undefined ;
466
-
467
- if ( taskQueue ) {
468
- if ( existingConcurrencyLimit !== concurrencyLimit ) {
469
- taskQueue = await tx . taskQueue . update ( {
470
- where : {
471
- id : taskQueue . id ,
472
- } ,
473
- data : {
474
- concurrencyLimit :
475
- typeof concurrencyLimit === "number" ? concurrencyLimit : null ,
476
- } ,
477
- } ) ;
478
-
479
- if ( typeof taskQueue . concurrencyLimit === "number" ) {
480
- logger . debug ( "TriggerTaskService: updating concurrency limit" , {
481
- runId : taskRun . id ,
482
- friendlyId : taskRun . friendlyId ,
483
- taskQueue,
484
- orgId : environment . organizationId ,
485
- projectId : environment . projectId ,
486
- existingConcurrencyLimit,
487
- concurrencyLimit,
488
- queueOptions : body . options ?. queue ,
489
- } ) ;
490
- await marqs ?. updateQueueConcurrencyLimits (
491
- environment ,
492
- taskQueue . name ,
493
- taskQueue . concurrencyLimit
494
- ) ;
495
- } else {
496
- logger . debug ( "TriggerTaskService: removing concurrency limit" , {
497
- runId : taskRun . id ,
498
- friendlyId : taskRun . friendlyId ,
499
- taskQueue,
500
- orgId : environment . organizationId ,
501
- projectId : environment . projectId ,
502
- existingConcurrencyLimit,
503
- concurrencyLimit,
504
- queueOptions : body . options ?. queue ,
505
- } ) ;
506
- await marqs ?. removeQueueConcurrencyLimits ( environment , taskQueue . name ) ;
507
- }
508
- }
509
- } else {
510
- const queueId = generateFriendlyId ( "queue" ) ;
511
-
462
+ if ( ! taskQueue ) {
463
+ // handle conflicts with existing queues
512
464
taskQueue = await tx . taskQueue . create ( {
513
465
data : {
514
- friendlyId : queueId ,
466
+ friendlyId : generateFriendlyId ( "queue" ) ,
515
467
name : queueName ,
516
468
concurrencyLimit,
517
469
runtimeEnvironmentId : environment . id ,
518
470
projectId : environment . projectId ,
519
471
type : "NAMED" ,
520
472
} ,
521
473
} ) ;
474
+ }
522
475
523
- if ( typeof taskQueue . concurrencyLimit === "number" ) {
524
- await marqs ?. updateQueueConcurrencyLimits (
525
- environment ,
526
- taskQueue . name ,
527
- taskQueue . concurrencyLimit
528
- ) ;
529
- }
476
+ if ( typeof concurrencyLimit === "number" ) {
477
+ logger . debug ( "TriggerTaskService: updating concurrency limit" , {
478
+ runId : taskRun . id ,
479
+ friendlyId : taskRun . friendlyId ,
480
+ taskQueue,
481
+ orgId : environment . organizationId ,
482
+ projectId : environment . projectId ,
483
+ concurrencyLimit,
484
+ queueOptions : body . options ?. queue ,
485
+ } ) ;
486
+
487
+ await marqs ?. updateQueueConcurrencyLimits (
488
+ environment ,
489
+ taskQueue . name ,
490
+ concurrencyLimit
491
+ ) ;
492
+ } else if ( concurrencyLimit === null ) {
493
+ logger . debug ( "TriggerTaskService: removing concurrency limit" , {
494
+ runId : taskRun . id ,
495
+ friendlyId : taskRun . friendlyId ,
496
+ taskQueue,
497
+ orgId : environment . organizationId ,
498
+ projectId : environment . projectId ,
499
+ queueOptions : body . options ?. queue ,
500
+ } ) ;
501
+
502
+ await marqs ?. removeQueueConcurrencyLimits ( environment , taskQueue . name ) ;
530
503
}
531
504
}
532
505
@@ -623,6 +596,18 @@ export class TriggerTaskService extends BaseService {
623
596
throw new ServiceValidationError (
624
597
`Cannot trigger ${ taskId } with a one-time use token as it has already been used.`
625
598
) ;
599
+ } else if (
600
+ Array . isArray ( target ) &&
601
+ target . length == 2 &&
602
+ typeof target [ 0 ] === "string" &&
603
+ typeof target [ 1 ] === "string" &&
604
+ target [ 0 ] == "runtimeEnvironmentId" &&
605
+ target [ 1 ] == "name" &&
606
+ error . message . includes ( "prisma.taskQueue.create" )
607
+ ) {
608
+ throw new Error (
609
+ `Failed to trigger ${ taskId } as the queue could not be created do to a unique constraint error, please try again.`
610
+ ) ;
626
611
} else {
627
612
throw new ServiceValidationError (
628
613
`Cannot trigger ${ taskId } as it has already been triggered with the same idempotency key.`
0 commit comments