Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal-packages/run-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@
"build": "pnpm run clean && tsc -p tsconfig.build.json",
"dev": "tsc --watch -p tsconfig.build.json"
}
}
}
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ export class RunEngine {
executionSnapshotSystem: this.executionSnapshotSystem,
runAttemptSystem: this.runAttemptSystem,
machines: this.options.machines,
releaseConcurrencySystem: this.releaseConcurrencySystem,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,59 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
});
}

public async getReleaseQueueMetrics(releaseQueueDescriptor: T) {
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
const currentTokensRaw = await this.redis.get(this.#bucketKey(releaseQueue));
const queueLength = await this.redis.zcard(this.#queueKey(releaseQueue));

const currentTokens = currentTokensRaw ? Number(currentTokensRaw) : undefined;

return { currentTokens, queueLength };
}

/**
* Refill a token only if the releaserId is not in the release queue.
* Returns true if the token was refilled, false if the releaserId was found in the queue.
*/
public async refillTokenIfNotInQueue(
releaseQueueDescriptor: T,
releaserId: string
): Promise<boolean> {
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);

if (maxTokens === 0) {
this.logger.debug("No tokens available, skipping refill", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
});

return false;
}

const result = await this.redis.refillTokenIfNotInQueue(
this.masterQueuesKey,
this.#bucketKey(releaseQueue),
this.#queueKey(releaseQueue),
this.#metadataKey(releaseQueue),
releaseQueue,
releaserId,
String(maxTokens)
);

this.logger.debug("Attempted to refill token if not in queue", {
releaseQueueDescriptor,
releaserId,
maxTokens,
releaseQueue,
result,
});

return result === "true";
}

/**
* Get the next queue that has available capacity and process one item from it
* Returns true if an item was processed, false if no items were available
Expand Down Expand Up @@ -783,6 +836,51 @@ end
return true
`,
});

this.redis.defineCommand("refillTokenIfNotInQueue", {
numberOfKeys: 4,
lua: `
local masterQueuesKey = KEYS[1]
local bucketKey = KEYS[2]
local queueKey = KEYS[3]
local metadataKey = KEYS[4]

local releaseQueue = ARGV[1]
local releaserId = ARGV[2]
local maxTokens = tonumber(ARGV[3])

-- Check if the releaserId is in the queue
local score = redis.call("ZSCORE", queueKey, releaserId)
if score then
-- Item is in queue, don't refill token
return redis.status_reply("false")
end

-- Return the token to the bucket
local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
local remainingTokens = currentTokens + 1

-- Don't exceed maxTokens
if remainingTokens > maxTokens then
remainingTokens = maxTokens
end

redis.call("SET", bucketKey, remainingTokens)

-- Clean up any metadata just in case
redis.call("HDEL", metadataKey, releaserId)

-- Update the master queue based on remaining queue length
local queueLength = redis.call("ZCARD", queueKey)
if queueLength > 0 then
redis.call("ZADD", masterQueuesKey, remainingTokens, releaseQueue)
else
redis.call("ZREM", masterQueuesKey, releaseQueue)
end

return redis.status_reply("true")
`,
});
}
}

Expand Down Expand Up @@ -839,6 +937,17 @@ declare module "@internal/redis" {
releaserId: string,
callback?: Callback<void>
): Result<void, Context>;

refillTokenIfNotInQueue(
masterQueuesKey: string,
bucketKey: string,
queueKey: string,
metadataKey: string,
releaseQueue: string,
releaserId: string,
maxTokens: string,
callback?: Callback<string>
): Result<string, Context>;
}
}

Expand Down
10 changes: 10 additions & 0 deletions internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@ import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./execution
import { RunAttemptSystem } from "./runAttemptSystem.js";
import { SystemResources } from "./systems.js";
import { sendNotificationToWorker } from "../eventBus.js";
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";

export type DequeueSystemOptions = {
resources: SystemResources;
machines: RunEngineOptions["machines"];
executionSnapshotSystem: ExecutionSnapshotSystem;
runAttemptSystem: RunAttemptSystem;
releaseConcurrencySystem: ReleaseConcurrencySystem;
};

export class DequeueSystem {
private readonly $: SystemResources;
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
private readonly runAttemptSystem: RunAttemptSystem;
private readonly releaseConcurrencySystem: ReleaseConcurrencySystem;

constructor(private readonly options: DequeueSystemOptions) {
this.$ = options.resources;
this.executionSnapshotSystem = options.executionSnapshotSystem;
this.runAttemptSystem = options.runAttemptSystem;
this.releaseConcurrencySystem = options.releaseConcurrencySystem;
}

/**
Expand Down Expand Up @@ -158,6 +162,12 @@ export class DequeueSystem {
}
);

if (snapshot.previousSnapshotId) {
await this.releaseConcurrencySystem.refillTokensForSnapshot(
snapshot.previousSnapshotId
);
}

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,52 @@ export class ReleaseConcurrencySystem {
await this.releaseConcurrencyQueue.quit();
}

public async refillTokensForSnapshot(snapshotId: string | undefined): Promise<void>;
public async refillTokensForSnapshot(snapshot: TaskRunExecutionSnapshot): Promise<void>;
public async refillTokensForSnapshot(
snapshotOrId: TaskRunExecutionSnapshot | string | undefined
) {
if (!this.releaseConcurrencyQueue) {
return;
}

if (typeof snapshotOrId === "undefined") {
return;
}

const snapshot =
typeof snapshotOrId === "string"
? await this.$.prisma.taskRunExecutionSnapshot.findFirst({
where: { id: snapshotOrId },
})
: snapshotOrId;

if (!snapshot) {
this.$.logger.error("Snapshot not found", {
snapshotId: snapshotOrId,
});

return;
}

if (snapshot.executionStatus !== "EXECUTING_WITH_WAITPOINTS") {
this.$.logger.debug("Snapshot is not in a valid state to refill tokens", {
snapshot,
});

return;
}

await this.releaseConcurrencyQueue.refillTokenIfNotInQueue(
{
orgId: snapshot.organizationId,
projectId: snapshot.projectId,
envId: snapshot.environmentId,
},
snapshot.id
);
}

public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) {
if (!this.releaseConcurrencyQueue) {
return;
Expand All @@ -86,11 +132,19 @@ export class ReleaseConcurrencySystem {

public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) {
if (!this.releaseConcurrencyQueue) {
this.$.logger.debug("Release concurrency queue not enabled, skipping release", {
snapshotId: snapshot.id,
});

return;
}

// Go ahead and release concurrency immediately if the run is in a development environment
if (snapshot.environmentType === "DEVELOPMENT") {
this.$.logger.debug("Immediate release of concurrency for development environment", {
snapshotId: snapshot.id,
});

return await this.executeReleaseConcurrencyForSnapshot(snapshot.id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ export class WaitpointSystem {
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
Expand Down
Loading