From 0ccffedbe169640c068f571ec04e8adda58641f8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 16 Apr 2025 16:43:45 +0100 Subject: [PATCH 1/3] fix: refill release concurrency token bucket queue when runs resume before checkpoints are created --- internal-packages/run-engine/package.json | 5 +- .../run-engine/src/engine/index.ts | 1 + .../releaseConcurrencyTokenBucketQueue.ts | 109 +++++ .../src/engine/systems/dequeueSystem.ts | 10 + .../systems/releaseConcurrencySystem.ts | 54 +++ .../src/engine/systems/waitpointSystem.ts | 7 + .../engine/tests/releaseConcurrency.test.ts | 417 ++++++++++++++++++ ...releaseConcurrencyTokenBucketQueue.test.ts | 110 +++++ 8 files changed, 711 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index 65c36c5d34..f7fa531396 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -36,9 +36,10 @@ "scripts": { "clean": "rimraf dist", "typecheck": "tsc --noEmit -p tsconfig.build.json", - "test": "vitest --sequence.concurrent=false --no-file-parallelism", + "test": "vitest --sequence.concurrent=false --no-file-parallelism --run", + "test:dev": "vitest --sequence.concurrent=false --no-file-parallelism", "test:coverage": "vitest --sequence.concurrent=false --no-file-parallelism --coverage.enabled", "build": "pnpm run clean && tsc -p tsconfig.build.json", "dev": "tsc --watch -p tsconfig.build.json" } -} +} \ No newline at end of file diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 21993d809e..4f4131170b 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -305,6 +305,7 @@ export class RunEngine { executionSnapshotSystem: this.executionSnapshotSystem, runAttemptSystem: this.runAttemptSystem, machines: this.options.machines, + releaseConcurrencySystem: this.releaseConcurrencySystem, }); } diff --git a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts index d932803694..48ed249326 100644 --- a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts +++ b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts @@ -259,6 +259,59 @@ export class ReleaseConcurrencyTokenBucketQueue { }); } + public async getReleaseQueueMetrics(releaseQueueDescriptor: T) { + const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); + const currentTokensRaw = await this.redis.get(this.#bucketKey(releaseQueue)); + const queueLength = await this.redis.zcard(this.#queueKey(releaseQueue)); + + const currentTokens = currentTokensRaw ? Number(currentTokensRaw) : undefined; + + return { currentTokens, queueLength }; + } + + /** + * Refill a token only if the releaserId is not in the release queue. + * Returns true if the token was refilled, false if the releaserId was found in the queue. + */ + public async refillTokenIfNotInQueue( + releaseQueueDescriptor: T, + releaserId: string + ): Promise { + const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor); + const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); + + if (maxTokens === 0) { + this.logger.debug("No tokens available, skipping refill", { + releaseQueueDescriptor, + releaserId, + maxTokens, + releaseQueue, + }); + + return false; + } + + const result = await this.redis.refillTokenIfNotInQueue( + this.masterQueuesKey, + this.#bucketKey(releaseQueue), + this.#queueKey(releaseQueue), + this.#metadataKey(releaseQueue), + releaseQueue, + releaserId, + String(maxTokens) + ); + + this.logger.debug("Attempted to refill token if not in queue", { + releaseQueueDescriptor, + releaserId, + maxTokens, + releaseQueue, + result, + }); + + return result === "true"; + } + /** * Get the next queue that has available capacity and process one item from it * Returns true if an item was processed, false if no items were available @@ -783,6 +836,51 @@ end return true `, }); + + this.redis.defineCommand("refillTokenIfNotInQueue", { + numberOfKeys: 4, + lua: ` +local masterQueuesKey = KEYS[1] +local bucketKey = KEYS[2] +local queueKey = KEYS[3] +local metadataKey = KEYS[4] + +local releaseQueue = ARGV[1] +local releaserId = ARGV[2] +local maxTokens = tonumber(ARGV[3]) + +-- Check if the releaserId is in the queue +local score = redis.call("ZSCORE", queueKey, releaserId) +if score then + -- Item is in queue, don't refill token + return redis.status_reply("false") +end + +-- Return the token to the bucket +local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens) +local remainingTokens = currentTokens + 1 + +-- Don't exceed maxTokens +if remainingTokens > maxTokens then + remainingTokens = maxTokens +end + +redis.call("SET", bucketKey, remainingTokens) + +-- Clean up any metadata just in case +redis.call("HDEL", metadataKey, releaserId) + +-- Update the master queue based on remaining queue length +local queueLength = redis.call("ZCARD", queueKey) +if queueLength > 0 then + redis.call("ZADD", masterQueuesKey, remainingTokens, releaseQueue) +else + redis.call("ZREM", masterQueuesKey, releaseQueue) +end + +return redis.status_reply("true") + `, + }); } } @@ -839,6 +937,17 @@ declare module "@internal/redis" { releaserId: string, callback?: Callback ): Result; + + refillTokenIfNotInQueue( + masterQueuesKey: string, + bucketKey: string, + queueKey: string, + metadataKey: string, + releaseQueue: string, + releaserId: string, + maxTokens: string, + callback?: Callback + ): Result; } } diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 912c9e2d25..9776a98518 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -11,23 +11,27 @@ import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./execution import { RunAttemptSystem } from "./runAttemptSystem.js"; import { SystemResources } from "./systems.js"; import { sendNotificationToWorker } from "../eventBus.js"; +import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js"; export type DequeueSystemOptions = { resources: SystemResources; machines: RunEngineOptions["machines"]; executionSnapshotSystem: ExecutionSnapshotSystem; runAttemptSystem: RunAttemptSystem; + releaseConcurrencySystem: ReleaseConcurrencySystem; }; export class DequeueSystem { private readonly $: SystemResources; private readonly executionSnapshotSystem: ExecutionSnapshotSystem; private readonly runAttemptSystem: RunAttemptSystem; + private readonly releaseConcurrencySystem: ReleaseConcurrencySystem; constructor(private readonly options: DequeueSystemOptions) { this.$ = options.resources; this.executionSnapshotSystem = options.executionSnapshotSystem; this.runAttemptSystem = options.runAttemptSystem; + this.releaseConcurrencySystem = options.releaseConcurrencySystem; } /** @@ -158,6 +162,12 @@ export class DequeueSystem { } ); + if (snapshot.previousSnapshotId) { + await this.releaseConcurrencySystem.refillTokensForSnapshot( + snapshot.previousSnapshotId + ); + } + await sendNotificationToWorker({ runId, snapshot: newSnapshot, diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index 6798780b4a..5aa1fa6ffc 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -69,6 +69,52 @@ export class ReleaseConcurrencySystem { await this.releaseConcurrencyQueue.quit(); } + public async refillTokensForSnapshot(snapshotId: string | undefined): Promise; + public async refillTokensForSnapshot(snapshot: TaskRunExecutionSnapshot): Promise; + public async refillTokensForSnapshot( + snapshotOrId: TaskRunExecutionSnapshot | string | undefined + ) { + if (!this.releaseConcurrencyQueue) { + return; + } + + if (typeof snapshotOrId === "undefined") { + return; + } + + const snapshot = + typeof snapshotOrId === "string" + ? await this.$.prisma.taskRunExecutionSnapshot.findFirst({ + where: { id: snapshotOrId }, + }) + : snapshotOrId; + + if (!snapshot) { + this.$.logger.error("Snapshot not found", { + snapshotId: snapshotOrId, + }); + + return; + } + + if (snapshot.executionStatus !== "EXECUTING_WITH_WAITPOINTS") { + this.$.logger.debug("Snapshot is not in a valid state to refill tokens", { + snapshot, + }); + + return; + } + + await this.releaseConcurrencyQueue.refillTokenIfNotInQueue( + { + orgId: snapshot.organizationId, + projectId: snapshot.projectId, + envId: snapshot.environmentId, + }, + snapshot.id + ); + } + public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) { if (!this.releaseConcurrencyQueue) { return; @@ -86,11 +132,19 @@ export class ReleaseConcurrencySystem { public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) { if (!this.releaseConcurrencyQueue) { + this.$.logger.debug("Release concurrency queue not enabled, skipping release", { + snapshotId: snapshot.id, + }); + return; } // Go ahead and release concurrency immediately if the run is in a development environment if (snapshot.environmentType === "DEVELOPMENT") { + this.$.logger.debug("Immediate release of concurrency for development environment", { + snapshotId: snapshot.id, + }); + return await this.executeReleaseConcurrencyForSnapshot(snapshot.id); } diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 2d4d6b3381..9500412457 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -510,6 +510,11 @@ export class WaitpointSystem { await this.$.runLock.lock([runId], 5000, async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); + this.$.logger.debug("continueRunIfUnblocked", { + runId, + snapshot, + }); + //run is still executing, send a message to the worker if (isExecuting(snapshot.executionStatus)) { const result = await this.$.runQueue.reacquireConcurrency( @@ -543,6 +548,8 @@ export class WaitpointSystem { } ); + await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot); + await sendNotificationToWorker({ runId, snapshot: newSnapshot, diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts index 43560d60f8..3602810e6a 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts @@ -3,6 +3,7 @@ import { trace } from "@internal/tracing"; import { RunEngine } from "../index.js"; import { setTimeout } from "node:timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; +import { EventBusEventArgs } from "../eventBus.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -1087,4 +1088,420 @@ describe("RunEngine Releasing Concurrency", () => { expect(queueConcurrencyAfterReturn).toBe(1); } ); + + containerTest( + "refills token bucket after waitpoint completion when snapshot not in release queue", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + releaseConcurrency: { + maxTokensRatio: 0.1, // 10% of the concurrency limit = 1 token + maxRetries: 3, + consumersCount: 1, + pollInterval: 500, + batchSize: 1, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + }, + prisma + ); + + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + const queueConcurrency = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrency).toBe(1); + + const envConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrency).toBe(1); + + // create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING"); + + // create a manual waitpoint + const result = await engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + + // Block the run, not specifying any release concurrency option + const executingWithWaitpointSnapshot = await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: result.waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + }); + + expect(executingWithWaitpointSnapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Now confirm the environment concurrency has been released + const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrencyAfter).toBe(0); + + // And confirm the release concurrency system has consumed the token + const queueMetrics = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetrics?.currentTokens).toBe(0); + + await engine.completeWaitpoint({ + id: result.waitpoint.id, + }); + + await setTimeout(1_000); + + const executionData2 = await engine.getRunExecutionData({ runId: run.id }); + expect(executionData2?.snapshot.executionStatus).toBe("EXECUTING"); + + const queueMetricsAfter = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetricsAfter?.currentTokens).toBe(1); + } + ); + + containerTest( + "refills token bucket after waitpoint completion when unable to reacquire concurrency, after dequeuing the queued executing run", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + releaseConcurrency: { + maxTokensRatio: 1, + maxRetries: 3, + consumersCount: 1, + pollInterval: 500, + batchSize: 1, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + const taskIdentifier = "test-task"; + + await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier, + undefined, + undefined, + { + concurrencyLimit: 1, + } + ); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + }, + prisma + ); + + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + const queueConcurrency = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrency).toBe(1); + + const envConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrency).toBe(1); + + // create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING"); + + // create a manual waitpoint + const result = await engine.createManualWaitpoint({ + environmentId: authenticatedEnvironment.id, + projectId: authenticatedEnvironment.projectId, + }); + + // Block the run, specifying the release concurrency option as true + const executingWithWaitpointSnapshot = await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: result.waitpoint.id, + projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, + releaseConcurrency: true, + }); + + expect(executingWithWaitpointSnapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); + + // Now confirm the environment concurrency has been released + const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrencyAfter).toBe(0); + + const queueConcurrencyAfter = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrencyAfter).toBe(0); + + // And confirm the release concurrency system has consumed the token + const queueMetrics = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetrics?.currentTokens).toBe(9); + + // Create and start second run on the same queue + const secondRun = await engine.trigger( + { + number: 2, + friendlyId: "run_second", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345-second", + spanId: "s12345-second", + masterQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + // Dequeue and start the second run + const dequeuedSecond = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: secondRun.masterQueue, + maxRunCount: 10, + }); + + // Now confirm the environment concurrency has been released + const envConcurrencyAfterSecond = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + + expect(envConcurrencyAfterSecond).toBe(1); + + const queueConcurrencyAfterSecond = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrencyAfterSecond).toBe(1); + + const secondAttempt = await engine.startRunAttempt({ + runId: dequeuedSecond[0].run.id, + snapshotId: dequeuedSecond[0].snapshot.id, + }); + + expect(secondAttempt.snapshot.executionStatus).toBe("EXECUTING"); + + // Complete the waitpoint that's blocking the first run + await engine.completeWaitpoint({ + id: result.waitpoint.id, + }); + + await setTimeout(1_000); + + // Verify that the first run could not reacquire the concurrency so it's back in the queue + const executionData2 = await engine.getRunExecutionData({ runId: run.id }); + expect(executionData2?.snapshot.executionStatus).toBe("QUEUED_EXECUTING"); + + const queueMetricsAfter = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + // We've consumed 1 token, so we should have 9 left + expect(queueMetricsAfter?.currentTokens).toBe(9); + + // Complete the second run so the first run can be dequeued + await engine.completeRunAttempt({ + runId: dequeuedSecond[0].run.id, + snapshotId: secondAttempt.snapshot.id, + completion: { + ok: true, + id: dequeuedSecond[0].run.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + await setTimeout(500); + + // Check the current concurrency of the queue/environment + const queueConcurrencyAfterSecondFinished = await engine.runQueue.currentConcurrencyOfQueue( + authenticatedEnvironment, + `task/${taskIdentifier}` + ); + + expect(queueConcurrencyAfterSecondFinished).toBe(0); + + const envConcurrencyAfterSecondFinished = + await engine.runQueue.currentConcurrencyOfEnvironment(authenticatedEnvironment); + + expect(envConcurrencyAfterSecondFinished).toBe(0); + + let event: EventBusEventArgs<"workerNotification">[0] | undefined = undefined; + engine.eventBus.on("workerNotification", (result) => { + event = result; + }); + + // Verify the first run is back in the queue + const queuedRun = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + // We don't actually return the run here from dequeuing, it's instead sent to the cluster as a workerNotification + expect(queuedRun.length).toBe(0); + + assertNonNullable(event); + const notificationEvent = event as EventBusEventArgs<"workerNotification">[0]; + expect(notificationEvent.run.id).toBe(run.id); + expect(notificationEvent.snapshot.executionStatus).toBe("EXECUTING"); + + // Make sure the token bucket is refilled + const queueMetricsAfterSecondFinished = + await engine.releaseConcurrencySystem.releaseConcurrencyQueue?.getReleaseQueueMetrics({ + orgId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + envId: authenticatedEnvironment.id, + }); + + expect(queueMetricsAfterSecondFinished?.currentTokens).toBe(10); + } + ); }); diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts index caed8ccff9..1de58a2ccd 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts @@ -713,4 +713,114 @@ describe("ReleaseConcurrencyQueue", () => { await queue.quit(); } ); + + redisTest( + "refillTokenIfNotInQueue should refill token when releaserId is not in queue", + async ({ redisContainer }) => { + const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 2); + + try { + // Use up all tokens + await queue.attemptToRelease({ name: "test-queue" }, "run1"); + await queue.attemptToRelease({ name: "test-queue" }, "run2"); + + // Verify tokens were used + expect(executedRuns).toHaveLength(2); + + // Try to refill token for a releaserId that's not in queue + const wasRefilled = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run3"); + expect(wasRefilled).toBe(true); + + // Verify we can now execute a new run + await queue.attemptToRelease({ name: "test-queue" }, "run3"); + await setTimeout(100); + expect(executedRuns).toHaveLength(3); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "refillTokenIfNotInQueue should not refill token when releaserId is in queue", + async ({ redisContainer }) => { + const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 1); + + try { + // Use the only token + await queue.attemptToRelease({ name: "test-queue" }, "run1"); + expect(executedRuns).toHaveLength(1); + + // Queue up run2 + await queue.attemptToRelease({ name: "test-queue" }, "run2"); + expect(executedRuns).toHaveLength(1); // run2 is queued + + // Try to refill token for run2 which is in queue + const wasRefilled = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run2"); + expect(wasRefilled).toBe(false); + + // Verify run2 is still queued by refilling a token normally + await queue.refillTokens({ name: "test-queue" }, 1); + await setTimeout(100); + expect(executedRuns).toHaveLength(2); + expect(executedRuns[1]).toEqual({ releaseQueue: "test-queue", runId: "run2" }); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "refillTokenIfNotInQueue should handle multiple queues independently", + async ({ redisContainer }) => { + const { queue, executedRuns } = createReleaseConcurrencyQueue(redisContainer, 1); + + try { + // Use tokens in both queues + await queue.attemptToRelease({ name: "queue1" }, "run1"); + await queue.attemptToRelease({ name: "queue2" }, "run2"); + expect(executedRuns).toHaveLength(2); + + // Queue up more runs + await queue.attemptToRelease({ name: "queue1" }, "run3"); + await queue.attemptToRelease({ name: "queue2" }, "run4"); + expect(executedRuns).toHaveLength(2); // run3 and run4 are queued + + // Try to refill tokens for different releaserIds + const wasRefilled1 = await queue.refillTokenIfNotInQueue({ name: "queue1" }, "run5"); + const wasRefilled2 = await queue.refillTokenIfNotInQueue({ name: "queue2" }, "run4"); + + expect(wasRefilled1).toBe(true); // run5 not in queue1 + expect(wasRefilled2).toBe(false); // run4 is in queue2 + + // Verify queue1 can execute a new run with the refilled token + await queue.attemptToRelease({ name: "queue1" }, "run5"); + await setTimeout(100); + expect(executedRuns).toHaveLength(3); + expect(executedRuns[2]).toEqual({ releaseQueue: "queue1", runId: "run5" }); + } finally { + await queue.quit(); + } + } + ); + + redisTest("refillTokenIfNotInQueue should not exceed maxTokens", async ({ redisContainer }) => { + const { queue } = createReleaseConcurrencyQueue(redisContainer, 1); + + try { + // First refill should work + const firstRefill = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run1"); + expect(firstRefill).toBe(true); + + // Second refill should work but not exceed maxTokens + const secondRefill = await queue.refillTokenIfNotInQueue({ name: "test-queue" }, "run2"); + expect(secondRefill).toBe(true); + + // Get metrics to verify token count + const metrics = await queue.getReleaseQueueMetrics({ name: "test-queue" }); + expect(metrics.currentTokens).toBe(1); // Should not exceed maxTokens + } finally { + await queue.quit(); + } + }); }); From 97838df0a20df362fbe46bc0aacab0eb2486d1d3 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 16 Apr 2025 16:55:13 +0100 Subject: [PATCH 2/3] Fix the engine package.json tests --- internal-packages/run-engine/package.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index f7fa531396..30c9cff7a8 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -36,8 +36,7 @@ "scripts": { "clean": "rimraf dist", "typecheck": "tsc --noEmit -p tsconfig.build.json", - "test": "vitest --sequence.concurrent=false --no-file-parallelism --run", - "test:dev": "vitest --sequence.concurrent=false --no-file-parallelism", + "test": "vitest --sequence.concurrent=false --no-file-parallelism", "test:coverage": "vitest --sequence.concurrent=false --no-file-parallelism --coverage.enabled", "build": "pnpm run clean && tsc -p tsconfig.build.json", "dev": "tsc --watch -p tsconfig.build.json" From 2bbdaeec8043da5fb8b79b7a0d24be79bb27af87 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 16 Apr 2025 16:57:52 +0100 Subject: [PATCH 3/3] Remove log --- .../run-engine/src/engine/systems/waitpointSystem.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 9500412457..687d077822 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -510,11 +510,6 @@ export class WaitpointSystem { await this.$.runLock.lock([runId], 5000, async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); - this.$.logger.debug("continueRunIfUnblocked", { - runId, - snapshot, - }); - //run is still executing, send a message to the worker if (isExecuting(snapshot.executionStatus)) { const result = await this.$.runQueue.reacquireConcurrency(