From 274aac71bd0f2d0419873068648a8a16b201e942 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 16 Jan 2025 17:40:14 +0000 Subject: [PATCH 1/4] Shared queue consumer telemetry improvements --- apps/webapp/app/env.server.ts | 3 +- .../app/v3/marqs/devQueueConsumer.server.ts | 1 - .../v3/marqs/sharedQueueConsumer.server.ts | 1675 ++++++++++------- apps/webapp/app/v3/tracer.server.ts | 13 +- packages/core/src/logger.ts | 5 + 5 files changed, 1050 insertions(+), 647 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 1dd46326cf..ad4834ce03 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -190,8 +190,7 @@ const EnvironmentSchema = z.object({ // Internal OTEL environment variables INTERNAL_OTEL_TRACE_EXPORTER_URL: z.string().optional(), - INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_NAME: z.string().optional(), - INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE: z.string().optional(), + INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS: z.string().optional(), INTERNAL_OTEL_TRACE_LOGGING_ENABLED: z.string().default("1"), // this means 1/20 traces or 5% of traces will be sampled (sampled = recorded) INTERNAL_OTEL_TRACE_SAMPLING_RATE: z.string().default("20"), diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index ca520d8199..8560312d70 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -280,7 +280,6 @@ export class DevQueueConsumer { kind: SpanKind.CONSUMER, attributes: { ...attributesFromAuthenticatedEnv(this.env), - [SEMINTATTRS_FORCE_RECORDING]: true, }, }, ROOT_CONTEXT diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 39037903cd..e5f57d93d8 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -1,4 +1,13 @@ -import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api"; +import { + Context, + ROOT_CONTEXT, + Span, + SpanKind, + SpanOptions, + SpanStatusCode, + context, + trace, +} from "@opentelemetry/api"; import { MachinePreset, ProdTaskRunExecution, @@ -19,19 +28,22 @@ import { BackgroundWorkerTask, Prisma, RuntimeEnvironment, - TaskRun, TaskRunStatus, } from "@trigger.dev/database"; import { z } from "zod"; import { $replica, prisma } from "~/db.server"; +import { env } from "~/env.server"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; +import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server"; +import { EnvironmentVariable } from "../environmentVariables/repository"; import { FailedTaskRunService } from "../failedTaskRun.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { socketIo } from "../handleSocketIo.server"; +import { machinePresetFromConfig, machinePresetFromRun } from "../machinePresets.server"; import { findCurrentWorkerDeployment, getWorkerDeploymentFromWorker, @@ -40,46 +52,61 @@ import { import { CrashTaskRunService } from "../services/crashTaskRun.server"; import { CreateTaskRunAttemptService } from "../services/createTaskRunAttempt.server"; import { RestoreCheckpointService } from "../services/restoreCheckpoint.server"; -import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server"; -import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; -import { EnvironmentVariable } from "../environmentVariables/repository"; -import { machinePresetFromConfig, machinePresetFromRun } from "../machinePresets.server"; -import { env } from "~/env.server"; import { FINAL_ATTEMPT_STATUSES, FINAL_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus, } from "../taskStatus"; +import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; +import { MessagePayload } from "./types"; const WithTraceContext = z.object({ traceparent: z.string().optional(), tracestate: z.string().optional(), }); +export const SharedQueueExecuteMessageBody = WithTraceContext.extend({ + type: z.literal("EXECUTE"), + taskIdentifier: z.string(), + checkpointEventId: z.string().optional(), + retryCheckpointsDisabled: z.boolean().optional(), +}); + +export type SharedQueueExecuteMessageBody = z.infer; + +export const SharedQueueResumeMessageBody = WithTraceContext.extend({ + type: z.literal("RESUME"), + completedAttemptIds: z.string().array(), + resumableAttemptId: z.string(), + checkpointEventId: z.string().optional(), +}); + +export type SharedQueueResumeMessageBody = z.infer; + +export const SharedQueueResumeAfterDurationMessageBody = WithTraceContext.extend({ + type: z.literal("RESUME_AFTER_DURATION"), + resumableAttemptId: z.string(), + checkpointEventId: z.string(), +}); + +export type SharedQueueResumeAfterDurationMessageBody = z.infer< + typeof SharedQueueResumeAfterDurationMessageBody +>; + +export const SharedQueueFailMessageBody = WithTraceContext.extend({ + type: z.literal("FAIL"), + reason: z.string(), +}); + +export type SharedQueueFailMessageBody = z.infer; + export const SharedQueueMessageBody = z.discriminatedUnion("type", [ - WithTraceContext.extend({ - type: z.literal("EXECUTE"), - taskIdentifier: z.string(), - checkpointEventId: z.string().optional(), - retryCheckpointsDisabled: z.boolean().optional(), - }), - WithTraceContext.extend({ - type: z.literal("RESUME"), - completedAttemptIds: z.string().array(), - resumableAttemptId: z.string(), - checkpointEventId: z.string().optional(), - }), - WithTraceContext.extend({ - type: z.literal("RESUME_AFTER_DURATION"), - resumableAttemptId: z.string(), - checkpointEventId: z.string(), - }), - WithTraceContext.extend({ - type: z.literal("FAIL"), - reason: z.string(), - }), + SharedQueueExecuteMessageBody, + SharedQueueResumeMessageBody, + SharedQueueResumeAfterDurationMessageBody, + SharedQueueFailMessageBody, ]); export type SharedQueueMessageBody = z.infer; @@ -93,23 +120,48 @@ export type SharedQueueConsumerOptions = { interval?: number; }; +type DoWorkInternalResult = + | { + reason: string; + attrs?: Record; + error?: Error | string; + interval?: number; + } + | undefined; + +type HandleMessageResult = + | { + action: "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop"; + interval?: number; + retryInMs?: number; + reason?: string; + attrs?: Record; + error?: Error | string; + } + | undefined; + export class SharedQueueConsumer { private _backgroundWorkers: Map = new Map(); private _deprecatedWorkers: Map = new Map(); private _enabled = false; private _options: Required; private _perTraceCountdown: number | undefined; - private _lastNewTrace: Date | undefined; + private _traceStartedAt: Date | undefined; private _currentSpanContext: Context | undefined; - private _taskFailures: number = 0; - private _taskSuccesses: number = 0; + private _reasonStats: Record = {}; private _currentSpan: Span | undefined; private _endSpanInNextIteration = false; private _tasks = sharedQueueTasks; private _id: string; + private _connectedAt: Date; + private _iterationsCount = 0; + private _totalIterationsCount = 0; + private _runningDurationInMs = 0; + private _currentMessage: MessagePayload | undefined; + private _currentMessageData: SharedQueueMessageBody | undefined; constructor( - private _sender: ZodMessageSender, + private _providerSender: ZodMessageSender, options: SharedQueueConsumerOptions = {} ) { this._options = { @@ -120,6 +172,7 @@ export class SharedQueueConsumer { }; this._id = generateFriendlyId("shared-queue", 6); + this._connectedAt = new Date(); } // This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it @@ -142,7 +195,7 @@ export class SharedQueueConsumer { return; } - const backgroundWorker = await prisma.backgroundWorker.findUnique({ + const backgroundWorker = await prisma.backgroundWorker.findFirst({ where: { friendlyId: id, runtimeEnvironmentId: envId, @@ -188,17 +241,18 @@ export class SharedQueueConsumer { this._enabled = true; this._perTraceCountdown = this._options.maximumItemsPerTrace; - this._lastNewTrace = new Date(); - this._taskFailures = 0; - this._taskSuccesses = 0; + this._traceStartedAt = new Date(); + this._reasonStats = {}; this.#doWork().finally(() => {}); } #endCurrentSpan() { if (this._currentSpan) { - this._currentSpan.setAttribute("tasks.period.failures", this._taskFailures); - this._currentSpan.setAttribute("tasks.period.successes", this._taskSuccesses); + for (const [reason, count] of Object.entries(this._reasonStats)) { + this._currentSpan.setAttribute(`reasons.${reason}`, count); + } + this._currentSpan.end(); } } @@ -212,19 +266,39 @@ export class SharedQueueConsumer { // Check if the trace has expired if ( this._perTraceCountdown === 0 || - Date.now() - this._lastNewTrace!.getTime() > this._options.traceTimeoutSeconds * 1000 || + Date.now() - this._traceStartedAt!.getTime() > this._options.traceTimeoutSeconds * 1000 || this._currentSpanContext === undefined || this._endSpanInNextIteration ) { this.#endCurrentSpan(); + const traceDurationInMs = this._traceStartedAt + ? Date.now() - this._traceStartedAt.getTime() + : undefined; + const iterationsPerSecond = traceDurationInMs + ? this._iterationsCount / (traceDurationInMs / 1000) + : undefined; + // Create a new trace this._currentSpan = tracer.startSpan( "SharedQueueConsumer.doWork()", { kind: SpanKind.CONSUMER, attributes: { - [SEMINTATTRS_FORCE_RECORDING]: true, + id: this._id, + iterations: this._iterationsCount, + total_iterations: this._totalIterationsCount, + options_maximumItemsPerTrace: this._options.maximumItemsPerTrace, + options_nextTickInterval: this._options.nextTickInterval, + options_interval: this._options.interval, + connected_at: this._connectedAt.toISOString(), + consumer_age_in_seconds: (Date.now() - this._connectedAt.getTime()) / 1000, + do_work_internal_per_second: this._iterationsCount / (this._runningDurationInMs / 1000), + running_duration_ms: this._runningDurationInMs, + trace_timeout_in_seconds: this._options.traceTimeoutSeconds, + trace_duration_ms: traceDurationInMs, + iterations_per_second: iterationsPerSecond, + iterations_per_minute: iterationsPerSecond ? iterationsPerSecond * 60 : undefined, }, }, ROOT_CONTEXT @@ -234,19 +308,81 @@ export class SharedQueueConsumer { this._currentSpanContext = trace.setSpan(ROOT_CONTEXT, this._currentSpan); this._perTraceCountdown = this._options.maximumItemsPerTrace; - this._lastNewTrace = new Date(); - this._taskFailures = 0; - this._taskSuccesses = 0; + this._traceStartedAt = new Date(); + this._reasonStats = {}; + this._iterationsCount = 0; + this._runningDurationInMs = 0; this._endSpanInNextIteration = false; } return context.with(this._currentSpanContext ?? ROOT_CONTEXT, async () => { - await this.#doWorkInternal(); - this._perTraceCountdown = this._perTraceCountdown! - 1; + await tracer.startActiveSpan("doWorkInternal()", async (span) => { + let nextInterval = this._options.interval; + + span.setAttributes({ + id: this._id, + total_iterations: this._totalIterationsCount, + iterations: this._iterationsCount, + }); + + const startAt = performance.now(); + + try { + const result = await this.#doWorkInternal(); + + if (result) { + this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1; + + span.setAttribute("reason", result.reason); + + if (result.attrs) { + for (const [key, value] of Object.entries(result.attrs)) { + if (value) { + span.setAttribute(key, value); + } + } + } + + if (result.error) { + span.recordException(result.error); + span.setStatus({ code: SpanStatusCode.ERROR }); + } + + if (typeof result.interval === "number") { + nextInterval = Math.max(result.interval, 0); // Cannot be negative + } + + span.setAttribute("nextInterval", nextInterval); + } else { + span.setAttribute("reason", "no_result"); + + this._reasonStats["no_result"] = (this._reasonStats["no_result"] ?? 0) + 1; + } + } catch (error) { + if (error instanceof Error) { + this._currentSpan?.recordException(error); + } else { + this._currentSpan?.recordException(new Error(String(error))); + } + + this._endSpanInNextIteration = true; + } finally { + this._runningDurationInMs = this._runningDurationInMs + (performance.now() - startAt); + this._iterationsCount++; + this._totalIterationsCount++; + this._perTraceCountdown = this._perTraceCountdown! - 1; + + span.end(); + + setTimeout(() => { + this.#doWork().finally(() => {}); + }, nextInterval); + } + }); }); } - async #doWorkInternal() { + async #doWorkInternal(): Promise { // Attempt to dequeue a message from the shared queue // If no message is available, reschedule the worker to run again in 1 second // If a message is available, find the BackgroundWorkerTask that matches the message's taskIdentifier @@ -257,11 +393,13 @@ export class SharedQueueConsumer { // When the task run completes, ack the message // Using a heartbeat mechanism, if the client keeps responding with a heartbeat, we'll keep the message processing and increase the visibility timeout. + this._currentMessage = undefined; + this._currentMessageData = undefined; + const message = await marqs?.dequeueMessageInSharedQueue(this._id); if (!message) { - this.#doMoreWork(this._options.nextTickInterval); - return; + return { reason: "no_message_dequeued", interval: this._options.nextTickInterval }; } logger.log("dequeueMessageInSharedQueue()", { queueMessage: message }); @@ -274,718 +412,949 @@ export class SharedQueueConsumer { error: messageBody.error, }); - await this.#ackAndDoMoreWork(message.messageId); - return; - } + await this.#ack(message.messageId); - // TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first. - - switch (messageBody.data.type) { - // MARK: EXECUTE - case "EXECUTE": { - const existingTaskRun = await prisma.taskRun.findUnique({ - where: { - id: message.messageId, - }, - }); + return { + reason: "failed_to_parse_message", + attrs: { message_id: message.messageId, message_version: message.version }, + error: messageBody.error, + }; + } - if (!existingTaskRun) { - logger.error("No existing task run", { - queueMessage: message.data, - messageId: message.messageId, - }); + const hydrateAttributes = (attrs: Record) => { + return { + ...attrs, + message_id: message.messageId, + message_version: message.version, + run_id: message.messageId, + message_type: messageBody.data.type, + }; + }; - // INFO: There used to be a race condition where tasks could be triggered, but execute messages could be dequeued before the run finished being created in the DB - // This should not be happening anymore. In case it does, consider reqeueuing here with a brief delay while limiting total retries. + this._currentMessage = message; + this._currentMessageData = messageBody.data; - await this.#ackAndDoMoreWork(message.messageId); - return; - } + const messageResult = await this.#handleMessage(message, messageBody.data); - const retryingFromCheckpoint = !!messageBody.data.checkpointEventId; + if (!messageResult) { + return { + reason: "no_message_result", + attrs: hydrateAttributes({}), + }; + } - const EXECUTABLE_RUN_STATUSES = { - fromCheckpoint: ["WAITING_TO_RESUME"] satisfies TaskRunStatus[], - withoutCheckpoint: ["PENDING", "RETRYING_AFTER_FAILURE"] satisfies TaskRunStatus[], + switch (messageResult.action) { + case "noop": { + return { + reason: messageResult.reason ?? "none_specified", + attrs: hydrateAttributes(messageResult.attrs ?? {}), + error: messageResult.error, + interval: messageResult.interval, + }; + } + case "ack_and_do_more_work": { + await this.#ack(message.messageId); + + return { + reason: messageResult.reason ?? "none_specified", + attrs: hydrateAttributes(messageResult.attrs ?? {}), + error: messageResult.error, + interval: messageResult.interval, + }; + } + case "nack_and_do_more_work": { + await this.#nack(message.messageId, messageResult.retryInMs); + + return { + reason: messageResult.reason ?? "none_specified", + attrs: hydrateAttributes(messageResult.attrs ?? {}), + error: messageResult.error, + interval: messageResult.interval, }; + } + case "nack": { + await marqs?.nackMessage(message.messageId); - if ( - (retryingFromCheckpoint && - !EXECUTABLE_RUN_STATUSES.fromCheckpoint.includes(existingTaskRun.status)) || - (!retryingFromCheckpoint && - !EXECUTABLE_RUN_STATUSES.withoutCheckpoint.includes(existingTaskRun.status)) - ) { - logger.error("Task run has invalid status for execution. Going to ack", { - queueMessage: message.data, - messageId: message.messageId, - taskRun: existingTaskRun.id, - status: existingTaskRun.status, - retryingFromCheckpoint, - }); + return { + reason: messageResult.reason ?? "none_specified", + attrs: hydrateAttributes(messageResult.attrs ?? {}), + error: messageResult.error, + }; + } + } + } - await this.#ackAndDoMoreWork(message.messageId); - return; + async #handleMessage( + message: MessagePayload, + data: SharedQueueMessageBody + ): Promise { + return await this.#startActiveSpan("handleMessage()", async (span) => { + // TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first. + switch (data.type) { + // MARK: EXECUTE + case "EXECUTE": { + return await this.#handleExecuteMessage(message, data); } - - // Check if the task run is locked to a specific worker, if not, use the current worker deployment - const deployment = existingTaskRun.lockedById - ? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById) - : existingTaskRun.lockedToVersionId - ? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId) - : await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId); - - if (!deployment || !deployment.worker) { - logger.error("No matching deployment found for task run", { - queueMessage: message.data, - messageId: message.messageId, - }); - - await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - - await this.#ackAndDoMoreWork(message.messageId); - return; + // MARK: DEP RESUME + // Resume after dependency completed with no remaining retries + case "RESUME": { + return await this.#handleResumeMessage(message, data); } - - if (!deployment.imageReference) { - logger.error("Deployment is missing an image reference", { - queueMessage: message.data, - messageId: message.messageId, - deployment: deployment.id, - }); - - await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - - await this.#ackAndDoMoreWork(message.messageId); - return; + // MARK: DURATION RESUME + // Resume after duration-based wait + case "RESUME_AFTER_DURATION": { + return await this.#handleResumeAfterDurationMessage(message, data); } - - const backgroundTask = deployment.worker.tasks.find( - (task) => task.slug === existingTaskRun.taskIdentifier - ); - - if (!backgroundTask) { - const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({ - where: { - slug: existingTaskRun.taskIdentifier, - projectId: existingTaskRun.projectId, - runtimeEnvironmentId: existingTaskRun.runtimeEnvironmentId, - }, - include: { - worker: { - include: { - deployment: { - include: {}, - }, - }, - }, - }, - }); - - if (nonCurrentTask) { - logger.warn("Task for this run exists but is not part of the current deploy", { - taskRun: existingTaskRun.id, - taskIdentifier: existingTaskRun.taskIdentifier, - }); - } else { - logger.warn("Task for this run has never been deployed", { - taskRun: existingTaskRun.id, - taskIdentifier: existingTaskRun.taskIdentifier, - }); - } - - await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - - // If this task is ever deployed, a new message will be enqueued after successful indexing - await this.#ackAndDoMoreWork(message.messageId); - return; - } - - const lockedTaskRun = await prisma.taskRun.update({ - where: { - id: message.messageId, - }, - data: { - lockedAt: new Date(), - lockedById: backgroundTask.id, - lockedToVersionId: deployment.worker.id, - taskVersion: deployment.worker.version, - sdkVersion: deployment.worker.sdkVersion, - cliVersion: deployment.worker.cliVersion, - startedAt: existingTaskRun.startedAt ?? new Date(), - baseCostInCents: env.CENTS_PER_RUN, - machinePreset: - existingTaskRun.machinePreset ?? - machinePresetFromConfig(backgroundTask.machineConfig ?? {}).name, - maxDurationInSeconds: getMaxDuration( - existingTaskRun.maxDurationInSeconds, - backgroundTask.maxDurationInSeconds - ), - }, - include: { - runtimeEnvironment: true, - attempts: { - take: 1, - orderBy: { number: "desc" }, - }, - tags: true, - checkpoints: { - take: 1, - orderBy: { - createdAt: "desc", - }, - }, - lockedBy: true, - }, - }); - - if (!lockedTaskRun) { - logger.warn("Failed to lock task run", { - taskRun: existingTaskRun.id, - taskIdentifier: existingTaskRun.taskIdentifier, - deployment: deployment.id, - backgroundWorker: deployment.worker.id, - messageId: message.messageId, - }); - - await this.#ackAndDoMoreWork(message.messageId); - return; + // MARK: FAIL + // Fail for whatever reason, usually runs that have been resumed but stopped heartbeating + case "FAIL": { + return await this.#handleFailMessage(message, data); } + } + }); + } - const queue = await prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, - name: sanitizeQueueName(lockedTaskRun.queue), - }, - }, - }); - - if (!queue) { - logger.debug("SharedQueueConsumer queue not found, so nacking message", { - queueMessage: message, - taskRunQueue: lockedTaskRun.queue, - runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, - }); - - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval); - return; - } + async #handleExecuteMessage( + message: MessagePayload, + data: SharedQueueExecuteMessageBody + ): Promise { + const existingTaskRun = await prisma.taskRun.findFirst({ + where: { + id: message.messageId, + }, + }); - if (!this._enabled) { - logger.debug("SharedQueueConsumer not enabled, so nacking message", { - queueMessage: message, - }); + if (!existingTaskRun) { + logger.error("No existing task run", { + queueMessage: message.data, + messageId: message.messageId, + }); - await marqs?.nackMessage(message.messageId); - return; - } + // INFO: There used to be a race condition where tasks could be triggered, but execute messages could be dequeued before the run finished being created in the DB + // This should not be happening anymore. In case it does, consider reqeueuing here with a brief delay while limiting total retries. + return { action: "ack_and_do_more_work", reason: "no_existing_task_run" }; + } - const nextAttemptNumber = lockedTaskRun.attempts[0] - ? lockedTaskRun.attempts[0].number + 1 - : 1; + const retryingFromCheckpoint = !!data.checkpointEventId; - const isRetry = - nextAttemptNumber > 1 && - (lockedTaskRun.status === "WAITING_TO_RESUME" || - lockedTaskRun.status === "RETRYING_AFTER_FAILURE"); + const EXECUTABLE_RUN_STATUSES = { + fromCheckpoint: ["WAITING_TO_RESUME"] satisfies TaskRunStatus[], + withoutCheckpoint: ["PENDING", "RETRYING_AFTER_FAILURE"] satisfies TaskRunStatus[], + }; - try { - if (messageBody.data.checkpointEventId) { - const restoreService = new RestoreCheckpointService(); + if ( + (retryingFromCheckpoint && + !EXECUTABLE_RUN_STATUSES.fromCheckpoint.includes(existingTaskRun.status)) || + (!retryingFromCheckpoint && + !EXECUTABLE_RUN_STATUSES.withoutCheckpoint.includes(existingTaskRun.status)) + ) { + logger.error("Task run has invalid status for execution. Going to ack", { + queueMessage: message.data, + messageId: message.messageId, + taskRun: existingTaskRun.id, + status: existingTaskRun.status, + retryingFromCheckpoint, + }); - const checkpoint = await restoreService.call({ - eventId: messageBody.data.checkpointEventId, - isRetry, - }); + return { + action: "ack_and_do_more_work", + reason: "invalid_run_status", + attrs: { status: existingTaskRun.status, retryingFromCheckpoint }, + }; + } - if (!checkpoint) { - logger.error("Failed to restore checkpoint", { - queueMessage: message.data, - messageId: message.messageId, - runStatus: lockedTaskRun.status, - isRetry, - }); + // Check if the task run is locked to a specific worker, if not, use the current worker deployment + const deployment = await this.#startActiveSpan("findCurrentWorkerDeployment", async (span) => { + return existingTaskRun.lockedById + ? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById) + : existingTaskRun.lockedToVersionId + ? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId) + : await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId); + }); - await this.#ackAndDoMoreWork(message.messageId); - return; - } + const worker = deployment?.worker; - break; - } + if (!deployment || !worker) { + logger.error("No matching deployment found for task run", { + queueMessage: message.data, + messageId: message.messageId, + }); - if (!deployment.worker.supportsLazyAttempts) { - try { - const service = new CreateTaskRunAttemptService(); - await service.call({ - runId: lockedTaskRun.id, - setToExecuting: false, - }); - } catch (error) { - logger.error("Failed to create task run attempt for outdate worker", { - error, - taskRun: lockedTaskRun.id, - }); + await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - const service = new CrashTaskRunService(); - await service.call(lockedTaskRun.id, { - errorCode: TaskRunErrorCodes.OUTDATED_SDK_VERSION, - }); + return { + action: "ack_and_do_more_work", + reason: "no_matching_deployment", + attrs: { + run_id: existingTaskRun.id, + locked_by_id: existingTaskRun.lockedById ?? undefined, + locked_to_version_id: existingTaskRun.lockedToVersionId ?? undefined, + environment_id: existingTaskRun.runtimeEnvironmentId, + }, + }; + } - await this.#ackAndDoMoreWork(message.messageId); - return; - } - } + const imageReference = deployment.imageReference; - if (isRetry && !messageBody.data.retryCheckpointsDisabled) { - socketIo.coordinatorNamespace.emit("READY_FOR_RETRY", { - version: "v1", - runId: lockedTaskRun.id, - }); + if (!imageReference) { + logger.error("Deployment is missing an image reference", { + queueMessage: message.data, + messageId: message.messageId, + deployment: deployment.id, + }); - // Retries for workers with disabled retry checkpoints will be handled just like normal attempts - } else { - const machine = - machinePresetFromRun(lockedTaskRun) ?? - machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {}); + await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - await this._sender.send("BACKGROUND_WORKER_MESSAGE", { - backgroundWorkerId: deployment.worker.friendlyId, - data: { - type: "SCHEDULE_ATTEMPT", - image: deployment.imageReference, - version: deployment.version, - machine, - nextAttemptNumber, - // identifiers - id: "placeholder", // TODO: Remove this completely in a future release - envId: lockedTaskRun.runtimeEnvironment.id, - envType: lockedTaskRun.runtimeEnvironment.type, - orgId: lockedTaskRun.runtimeEnvironment.organizationId, - projectId: lockedTaskRun.runtimeEnvironment.projectId, - runId: lockedTaskRun.id, - }, - }); - } - } catch (e) { - if (e instanceof Error) { - this._currentSpan?.recordException(e); - } else { - this._currentSpan?.recordException(new Error(String(e))); - } + return { + action: "ack_and_do_more_work", + reason: "missing_image_reference", + attrs: { + run_id: existingTaskRun.id, + deployment_id: deployment.id, + }, + }; + } - this._endSpanInNextIteration = true; + const backgroundTask = worker.tasks.find( + (task) => task.slug === existingTaskRun.taskIdentifier + ); - // We now need to unlock the task run and delete the task run attempt - await prisma.$transaction([ - prisma.taskRun.update({ - where: { - id: lockedTaskRun.id, - }, - data: { - lockedAt: null, - lockedById: null, - status: lockedTaskRun.status, - startedAt: existingTaskRun.startedAt, + if (!backgroundTask) { + const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({ + where: { + slug: existingTaskRun.taskIdentifier, + projectId: existingTaskRun.projectId, + runtimeEnvironmentId: existingTaskRun.runtimeEnvironmentId, + }, + include: { + worker: { + include: { + deployment: { + include: {}, }, - }), - ]); - - logger.error("SharedQueueConsumer errored, so nacking message", { - queueMessage: message, - error: e instanceof Error ? { name: e.name, message: e.message, stack: e.stack } : e, - }); - - await this.#nackAndDoMoreWork(message.messageId); - return; - } + }, + }, + }, + }); - break; + if (nonCurrentTask) { + logger.warn("Task for this run exists but is not part of the current deploy", { + taskRun: existingTaskRun.id, + taskIdentifier: existingTaskRun.taskIdentifier, + }); + } else { + logger.warn("Task for this run has never been deployed", { + taskRun: existingTaskRun.id, + taskIdentifier: existingTaskRun.taskIdentifier, + }); } - // MARK: DEP RESUME - // Resume after dependency completed with no remaining retries - case "RESUME": { - if (messageBody.data.checkpointEventId) { - try { - const restoreService = new RestoreCheckpointService(); - const checkpoint = await restoreService.call({ - eventId: messageBody.data.checkpointEventId, - }); + await this.#markRunAsWaitingForDeploy(existingTaskRun.id); - if (!checkpoint) { - logger.error("Failed to restore checkpoint", { - queueMessage: message.data, - messageId: message.messageId, - }); + // If this task is ever deployed, a new message will be enqueued after successful indexing + return { + action: "ack_and_do_more_work", + reason: "task_not_deployed", + attrs: { + run_id: existingTaskRun.id, + task_identifier: existingTaskRun.taskIdentifier, + }, + }; + } - await this.#ackAndDoMoreWork(message.messageId); - return; - } - } catch (e) { - if (e instanceof Error) { - this._currentSpan?.recordException(e); - } else { - this._currentSpan?.recordException(new Error(String(e))); - } + const lockedTaskRun = await prisma.taskRun.update({ + where: { + id: message.messageId, + }, + data: { + lockedAt: new Date(), + lockedById: backgroundTask.id, + lockedToVersionId: worker.id, + taskVersion: worker.version, + sdkVersion: worker.sdkVersion, + cliVersion: worker.cliVersion, + startedAt: existingTaskRun.startedAt ?? new Date(), + baseCostInCents: env.CENTS_PER_RUN, + machinePreset: + existingTaskRun.machinePreset ?? + machinePresetFromConfig(backgroundTask.machineConfig ?? {}).name, + maxDurationInSeconds: getMaxDuration( + existingTaskRun.maxDurationInSeconds, + backgroundTask.maxDurationInSeconds + ), + }, + include: { + runtimeEnvironment: true, + attempts: { + take: 1, + orderBy: { number: "desc" }, + }, + tags: true, + checkpoints: { + take: 1, + orderBy: { + createdAt: "desc", + }, + }, + lockedBy: true, + }, + }); - this._endSpanInNextIteration = true; + if (!lockedTaskRun) { + logger.warn("Failed to lock task run", { + taskRun: existingTaskRun.id, + taskIdentifier: existingTaskRun.taskIdentifier, + deployment: deployment.id, + backgroundWorker: worker.id, + messageId: message.messageId, + }); - await this.#nackAndDoMoreWork(message.messageId); - return; - } + return { + action: "nack_and_do_more_work", + reason: "failed_to_lock_task_run", + attrs: { + run_id: existingTaskRun.id, + task_identifier: existingTaskRun.taskIdentifier, + deployment_id: deployment.id, + background_worker_id: worker.id, + message_id: message.messageId, + }, + }; + } - this.#doMoreWork(); - return; - } + const queue = await prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, + name: sanitizeQueueName(lockedTaskRun.queue), + }, + }); - const resumableRun = await prisma.taskRun.findUnique({ - where: { - id: message.messageId, - status: { - notIn: FINAL_RUN_STATUSES, - }, - }, - }); + if (!queue) { + logger.debug("SharedQueueConsumer queue not found, so nacking message", { + queueMessage: message, + taskRunQueue: lockedTaskRun.queue, + runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, + }); - if (!resumableRun) { - logger.error("Resumable run not found", { - queueMessage: message.data, - messageId: message.messageId, - }); + return { + action: "nack_and_do_more_work", + reason: "queue_not_found", + attrs: { + queue_name: sanitizeQueueName(lockedTaskRun.queue), + runtime_environment_id: lockedTaskRun.runtimeEnvironmentId, + }, + interval: this._options.nextTickInterval, + }; + } - await this.#ackAndDoMoreWork(message.messageId); - return; - } + if (!this._enabled) { + logger.debug("SharedQueueConsumer not enabled, so nacking message", { + queueMessage: message, + }); - if (resumableRun.status !== "EXECUTING") { - logger.warn("Run is not executing, will try to resume anyway", { - queueMessage: message.data, - messageId: message.messageId, - runStatus: resumableRun.status, - }); - } + return { + action: "nack", + reason: "not_enabled", + attrs: { + message_id: message.messageId, + }, + }; + } - const resumableAttempt = await prisma.taskRunAttempt.findUnique({ - where: { - id: messageBody.data.resumableAttemptId, - }, - include: { - checkpoints: { - take: 1, - orderBy: { - createdAt: "desc", - }, - }, - }, + const nextAttemptNumber = lockedTaskRun.attempts[0] ? lockedTaskRun.attempts[0].number + 1 : 1; + + const isRetry = + nextAttemptNumber > 1 && + (lockedTaskRun.status === "WAITING_TO_RESUME" || + lockedTaskRun.status === "RETRYING_AFTER_FAILURE"); + + try { + if (data.checkpointEventId) { + const restoreService = new RestoreCheckpointService(); + + const checkpoint = await restoreService.call({ + eventId: data.checkpointEventId, + isRetry, }); - if (!resumableAttempt) { - logger.error("Resumable attempt not found", { + if (!checkpoint) { + logger.error("Failed to restore checkpoint", { queueMessage: message.data, messageId: message.messageId, + runStatus: lockedTaskRun.status, + isRetry, }); - await this.#ackAndDoMoreWork(message.messageId); - return; + return { + action: "ack_and_do_more_work", + reason: "failed_to_restore_checkpoint", + attrs: { + run_status: lockedTaskRun.status, + is_retry: isRetry, + }, + }; } - const queue = await prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId, - name: sanitizeQueueName(resumableRun.queue), - }, - }, - }); + return; + } - if (!queue) { - logger.debug("SharedQueueConsumer queue not found, so nacking message", { - queueName: sanitizeQueueName(resumableRun.queue), - attempt: resumableAttempt, + if (!worker.supportsLazyAttempts) { + try { + const service = new CreateTaskRunAttemptService(); + await service.call({ + runId: lockedTaskRun.id, + setToExecuting: false, + }); + } catch (error) { + logger.error("Failed to create task run attempt for outdate worker", { + error, + taskRun: lockedTaskRun.id, }); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval); - return; - } + const service = new CrashTaskRunService(); + await service.call(lockedTaskRun.id, { + errorCode: TaskRunErrorCodes.OUTDATED_SDK_VERSION, + }); - if (!this._enabled) { - await marqs?.nackMessage(message.messageId); - return; + return { + action: "ack_and_do_more_work", + reason: "failed_to_create_attempt", + attrs: { + message_id: message.messageId, + run_id: lockedTaskRun.id, + }, + error: error instanceof Error ? error : String(error), + }; } + } - const completions: TaskRunExecutionResult[] = []; - const executions: TaskRunExecution[] = []; + if (isRetry && !data.retryCheckpointsDisabled) { + socketIo.coordinatorNamespace.emit("READY_FOR_RETRY", { + version: "v1", + runId: lockedTaskRun.id, + }); - for (const completedAttemptId of messageBody.data.completedAttemptIds) { - const completedAttempt = await prisma.taskRunAttempt.findUnique({ - where: { - id: completedAttemptId, - taskRun: { - lockedAt: { - not: null, - }, - lockedById: { - not: null, - }, - }, + // Retries for workers with disabled retry checkpoints will be handled just like normal attempts + return { + action: "noop", + reason: "retry_checkpoints_disabled", + }; + } else { + const machine = + machinePresetFromRun(lockedTaskRun) ?? + machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {}); + + await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => { + await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", { + backgroundWorkerId: worker.friendlyId, + data: { + type: "SCHEDULE_ATTEMPT", + image: imageReference, + version: deployment.version, + machine, + nextAttemptNumber, + // identifiers + id: "placeholder", // TODO: Remove this completely in a future release + envId: lockedTaskRun.runtimeEnvironment.id, + envType: lockedTaskRun.runtimeEnvironment.type, + orgId: lockedTaskRun.runtimeEnvironment.organizationId, + projectId: lockedTaskRun.runtimeEnvironment.projectId, + runId: lockedTaskRun.id, }, }); + }); - if (!completedAttempt) { - logger.error("Completed attempt not found", { - queueMessage: message.data, - messageId: message.messageId, - }); + return { + action: "noop", + reason: "scheduled_attempt", + attrs: { + next_attempt_number: nextAttemptNumber, + }, + }; + } + } catch (e) { + // We now need to unlock the task run and delete the task run attempt + await prisma.$transaction([ + prisma.taskRun.update({ + where: { + id: lockedTaskRun.id, + }, + data: { + lockedAt: null, + lockedById: null, + status: lockedTaskRun.status, + startedAt: existingTaskRun.startedAt, + }, + }), + ]); - await this.#ackAndDoMoreWork(message.messageId); - return; - } + logger.error("SharedQueueConsumer errored, so nacking message", { + queueMessage: message, + error: e instanceof Error ? { name: e.name, message: e.message, stack: e.stack } : e, + }); - const completion = await this._tasks.getCompletionPayloadFromAttempt(completedAttempt.id); + return { + action: "nack_and_do_more_work", + reason: "failed_to_schedule_attempt", + error: e instanceof Error ? e : String(e), + }; + } + } - if (!completion) { - await this.#ackAndDoMoreWork(message.messageId); - return; - } + async #handleResumeMessage( + message: MessagePayload, + data: SharedQueueResumeMessageBody + ): Promise { + if (data.checkpointEventId) { + try { + const restoreService = new RestoreCheckpointService(); - completions.push(completion); + const checkpoint = await restoreService.call({ + eventId: data.checkpointEventId, + }); - const executionPayload = await this._tasks.getExecutionPayloadFromAttempt({ - id: completedAttempt.id, + if (!checkpoint) { + logger.error("Failed to restore checkpoint", { + queueMessage: message.data, + messageId: message.messageId, }); - if (!executionPayload) { - await this.#ackAndDoMoreWork(message.messageId); - return; - } - - executions.push(executionPayload.execution); + return { + action: "ack_and_do_more_work", + reason: "failed_to_restore_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, + }; } - try { - const resumeMessage = { - version: "v1" as const, - runId: resumableAttempt.taskRunId, - attemptId: resumableAttempt.id, - attemptFriendlyId: resumableAttempt.friendlyId, - completions, - executions, - }; + return { + action: "noop", + reason: "restored_checkpoint", + }; + } catch (e) { + return { + action: "nack_and_do_more_work", + reason: "failed_to_restore_checkpoint", + error: e instanceof Error ? e : String(e), + }; + } + } - logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { - resumeMessage, - message, - resumableRun, - }); + const resumableRun = await prisma.taskRun.findFirst({ + where: { + id: message.messageId, + status: { + notIn: FINAL_RUN_STATUSES, + }, + }, + }); - // The attempt should still be running so we can broadcast to all coordinators to resume immediately - const responses = await socketIo.coordinatorNamespace - .timeout(10_000) - .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); + if (!resumableRun) { + logger.error("Resumable run not found", { + queueMessage: message.data, + messageId: message.messageId, + }); - logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", { - resumeMessage, - responses, - message, - }); + return { + action: "ack_and_do_more_work", + reason: "run_not_found", + }; + } - if (responses.length === 0) { - logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", { - resumeMessage, - message, - }); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); - return; - } + if (resumableRun.status !== "EXECUTING") { + logger.warn("Run is not executing, will try to resume anyway", { + queueMessage: message.data, + messageId: message.messageId, + runStatus: resumableRun.status, + }); + } - const hasSuccess = responses.some((response) => response.success); + const resumableAttempt = await prisma.taskRunAttempt.findFirst({ + where: { + id: data.resumableAttemptId, + }, + include: { + checkpoints: { + take: 1, + orderBy: { + createdAt: "desc", + }, + }, + }, + }); - if (hasSuccess) { - this.#doMoreWork(); - return; - } + if (!resumableAttempt) { + logger.error("Resumable attempt not found", { + queueMessage: message.data, + messageId: message.messageId, + }); - // No coordinator was able to resume the run - logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { - resumeMessage, - responses, - message, - }); + return { + action: "ack_and_do_more_work", + reason: "attempt_not_found", + attrs: { + attempt_id: data.resumableAttemptId, + }, + }; + } - // Let's check if the run is frozen - if (resumableRun.status === "WAITING_TO_RESUME") { - logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK run is waiting to be restored", { - queueMessage: message.data, - messageId: message.messageId, - }); + const queue = await prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId, + name: sanitizeQueueName(resumableRun.queue), + }, + }); + + if (!queue) { + logger.debug("SharedQueueConsumer queue not found, so nacking message", { + queueName: sanitizeQueueName(resumableRun.queue), + attempt: resumableAttempt, + }); - try { - const restoreService = new RestoreCheckpointService(); + return { + action: "nack_and_do_more_work", + reason: "queue_not_found", + attrs: { + queue_name: sanitizeQueueName(resumableRun.queue), + }, + interval: this._options.nextTickInterval, + }; + } - const checkpointEvent = await restoreService.getLastCheckpointEventIfUnrestored( - resumableRun.id - ); + if (!this._enabled) { + return { + action: "nack", + reason: "not_enabled", + attrs: { + message_id: message.messageId, + }, + }; + } - if (checkpointEvent) { - // The last checkpoint hasn't been restored yet, so restore it - const checkpoint = await restoreService.call({ - eventId: checkpointEvent.id, - }); + const completions: TaskRunExecutionResult[] = []; + const executions: TaskRunExecution[] = []; - if (!checkpoint) { - logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK failed to restore checkpoint", { - queueMessage: message.data, - messageId: message.messageId, - }); + for (const completedAttemptId of data.completedAttemptIds) { + const completedAttempt = await prisma.taskRunAttempt.findFirst({ + where: { + id: completedAttemptId, + taskRun: { + lockedAt: { + not: null, + }, + lockedById: { + not: null, + }, + }, + }, + }); - await this.#ackAndDoMoreWork(message.messageId); - return; - } + if (!completedAttempt) { + logger.error("Completed attempt not found", { + queueMessage: message.data, + messageId: message.messageId, + }); - logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK restored checkpoint", { - queueMessage: message.data, - messageId: message.messageId, - checkpoint, - }); - - this.#doMoreWork(); - return; - } else { - logger.debug( - "RESUME_AFTER_DEPENDENCY_WITH_ACK run is frozen without last checkpoint event", - { - queueMessage: message.data, - messageId: message.messageId, - } - ); - } - } catch (e) { - if (e instanceof Error) { - this._currentSpan?.recordException(e); - } else { - this._currentSpan?.recordException(new Error(String(e))); - } + return { + action: "ack_and_do_more_work", + reason: "completed_attempt_not_found", + attrs: { + completed_attempt_id: completedAttemptId, + }, + }; + } - this._endSpanInNextIteration = true; + const completion = await this.#startActiveSpan( + "getCompletionPayloadFromAttempt", + async (span) => { + return await this._tasks.getCompletionPayloadFromAttempt(completedAttempt.id); + } + ); - await this.#nackAndDoMoreWork( - message.messageId, - this._options.nextTickInterval, - 5_000 - ); - return; - } - } + if (!completion) { + return { + action: "ack_and_do_more_work", + reason: "failed_to_get_completion_payload", + attrs: { + completed_attempt_id: completedAttemptId, + }, + }; + } - logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK retrying", { - queueMessage: message.data, - messageId: message.messageId, + completions.push(completion); + + const executionPayload = await this.#startActiveSpan( + "getExecutionPayloadFromAttempt", + async (span) => { + return await this._tasks.getExecutionPayloadFromAttempt({ + id: completedAttempt.id, }); + } + ); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); - return; - } catch (e) { - if (e instanceof Error) { - this._currentSpan?.recordException(e); - } else { - this._currentSpan?.recordException(new Error(String(e))); - } + if (!executionPayload) { + return { + action: "ack_and_do_more_work", + reason: "failed_to_get_execution_payload", + attrs: { + completed_attempt_id: completedAttemptId, + }, + }; + } - this._endSpanInNextIteration = true; + executions.push(executionPayload.execution); + } - logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK threw, nacking with delay", { - message, - error: e, - }); + try { + const resumeMessage = { + version: "v1" as const, + runId: resumableAttempt.taskRunId, + attemptId: resumableAttempt.id, + attemptFriendlyId: resumableAttempt.friendlyId, + completions, + executions, + }; - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); - return; + logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { + resumeMessage, + message, + resumableRun, + }); + + // The attempt should still be running so we can broadcast to all coordinators to resume immediately + const responses = await this.#startActiveSpan( + "emitResumeAfterDependencyWithAck", + async () => { + return await socketIo.coordinatorNamespace + .timeout(10_000) + .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); } + ); + + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", { + resumeMessage, + responses, + message, + }); + + if (responses.length === 0) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", { + resumeMessage, + message, + }); + + return { + action: "nack_and_do_more_work", + reason: "resume_after_dependency_with_ack_no_response", + attrs: { + resume_message: "RESUME_AFTER_DEPENDENCY_WITH_ACK", + }, + interval: this._options.nextTickInterval, + retryInMs: 5_000, + }; + } + + const hasSuccess = responses.some((response) => response.success); - break; + if (hasSuccess) { + return { + action: "noop", + reason: "resume_after_dependency_with_ack_success", + }; } - // MARK: DURATION RESUME - // Resume after duration-based wait - case "RESUME_AFTER_DURATION": { + + // No coordinator was able to resume the run + logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { + resumeMessage, + responses, + message, + }); + + // Let's check if the run is frozen + if (resumableRun.status === "WAITING_TO_RESUME") { + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK run is waiting to be restored", { + queueMessage: message.data, + messageId: message.messageId, + }); + try { const restoreService = new RestoreCheckpointService(); - const checkpoint = await restoreService.call({ - eventId: messageBody.data.checkpointEventId, - }); + const checkpointEvent = await restoreService.getLastCheckpointEventIfUnrestored( + resumableRun.id + ); + + if (checkpointEvent) { + // The last checkpoint hasn't been restored yet, so restore it + const checkpoint = await restoreService.call({ + eventId: checkpointEvent.id, + }); + + if (!checkpoint) { + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK failed to restore checkpoint", { + queueMessage: message.data, + messageId: message.messageId, + }); + + return { + action: "ack_and_do_more_work", + reason: "failed_to_restore_checkpoint", + attrs: { + checkpoint_event_id: checkpointEvent.id, + }, + }; + } - if (!checkpoint) { - logger.error("Failed to restore checkpoint", { + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK restored checkpoint", { queueMessage: message.data, messageId: message.messageId, + checkpoint, }); - await this.#ackAndDoMoreWork(message.messageId); - return; - } - } catch (e) { - if (e instanceof Error) { - this._currentSpan?.recordException(e); + return { + action: "noop", + reason: "restored_checkpoint", + }; } else { - this._currentSpan?.recordException(new Error(String(e))); + logger.debug( + "RESUME_AFTER_DEPENDENCY_WITH_ACK run is frozen without last checkpoint event", + { + queueMessage: message.data, + messageId: message.messageId, + } + ); + + return { + action: "noop", + reason: "resume_after_dependency_with_ack_frozen", + }; } + } catch (e) { + return { + action: "nack_and_do_more_work", + reason: "waiting_to_resume_threw", + error: e instanceof Error ? e : String(e), + interval: this._options.nextTickInterval, + retryInMs: 5_000, + }; + } + } - this._endSpanInNextIteration = true; + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK retrying", { + queueMessage: message.data, + messageId: message.messageId, + }); - await this.#nackAndDoMoreWork(message.messageId); - return; - } + return { + action: "nack_and_do_more_work", + reason: "resume_after_dependency_with_ack_retrying", + attrs: { + message_id: message.messageId, + }, + interval: this._options.nextTickInterval, + retryInMs: 5_000, + }; + } catch (e) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK threw, nacking with delay", { + message, + error: e, + }); - break; - } - // MARK: FAIL - // Fail for whatever reason, usually runs that have been resumed but stopped heartbeating - case "FAIL": { - const existingTaskRun = await prisma.taskRun.findUnique({ - where: { - id: message.messageId, - }, - }); + return { + action: "nack_and_do_more_work", + reason: "resume_after_dependency_with_ack_threw", + error: e instanceof Error ? e : String(e), + interval: this._options.nextTickInterval, + retryInMs: 5_000, + }; + } + } - if (!existingTaskRun) { - logger.error("No existing task run to fail", { - queueMessage: messageBody, - messageId: message.messageId, - }); + async #handleResumeAfterDurationMessage( + message: MessagePayload, + data: SharedQueueResumeAfterDurationMessageBody + ): Promise { + try { + const restoreService = new RestoreCheckpointService(); - await this.#ackAndDoMoreWork(message.messageId); - return; - } + const checkpoint = await restoreService.call({ + eventId: data.checkpointEventId, + }); - // TODO: Consider failing the attempt and retrying instead. This may not be a good idea, as dequeued FAIL messages tend to point towards critical, persistent errors. - const service = new CrashTaskRunService(); - await service.call(existingTaskRun.id, { - crashAttempts: true, - reason: messageBody.data.reason, + if (!checkpoint) { + logger.error("Failed to restore checkpoint", { + queueMessage: message.data, + messageId: message.messageId, }); - await this.#ackAndDoMoreWork(message.messageId); - return; + return { + action: "ack_and_do_more_work", + reason: "failed_to_restore_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, + }; } - } - this.#doMoreWork(); - return; + return { + action: "noop", + reason: "restored_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, + }; + } catch (e) { + return { + action: "nack_and_do_more_work", + reason: "restoring_checkpoint_threw", + error: e instanceof Error ? e : String(e), + }; + } } - #doMoreWork(intervalInMs = this._options.interval) { - setTimeout(() => this.#doWork(), intervalInMs); + async #handleFailMessage( + message: MessagePayload, + data: SharedQueueFailMessageBody + ): Promise { + const existingTaskRun = await prisma.taskRun.findFirst({ + where: { + id: message.messageId, + }, + }); + + if (!existingTaskRun) { + logger.error("No existing task run to fail", { + queueMessage: data, + messageId: message.messageId, + }); + + return { + action: "ack_and_do_more_work", + reason: "no_existing_task_run", + }; + } + + // TODO: Consider failing the attempt and retrying instead. This may not be a good idea, as dequeued FAIL messages tend to point towards critical, persistent errors. + const service = new CrashTaskRunService(); + await service.call(existingTaskRun.id, { + crashAttempts: true, + reason: data.reason, + }); + + return { + action: "ack_and_do_more_work", + reason: "message_failed", + }; } - async #ackAndDoMoreWork(messageId: string, intervalInMs?: number) { + async #ack(messageId: string) { await marqs?.acknowledgeMessage(messageId, "Acking and doing more work in SharedQueueConsumer"); - this.#doMoreWork(intervalInMs); } - async #nackAndDoMoreWork(messageId: string, queueIntervalInMs?: number, nackRetryInMs?: number) { + async #nack(messageId: string, nackRetryInMs?: number) { const retryAt = nackRetryInMs ? Date.now() + nackRetryInMs : undefined; await marqs?.nackMessage(messageId, retryAt); - this.#doMoreWork(queueIntervalInMs); } async #markRunAsWaitingForDeploy(runId: string) { @@ -1000,6 +1369,40 @@ export class SharedQueueConsumer { }, }); } + + async #startActiveSpan( + name: string, + fn: (span: Span) => Promise, + options?: SpanOptions + ): Promise { + return await tracer.startActiveSpan(name, options ?? {}, async (span) => { + if (this._currentMessage) { + span.setAttribute("message_id", this._currentMessage.messageId); + span.setAttribute("run_id", this._currentMessage.messageId); + span.setAttribute("message_version", this._currentMessage.version); + } + + if (this._currentMessageData) { + span.setAttribute("message_type", this._currentMessageData.type); + } + + try { + return await fn(span); + } catch (error) { + if (error instanceof Error) { + span.recordException(error); + } else { + span.recordException(String(error)); + } + + span.setStatus({ code: SpanStatusCode.ERROR }); + + throw error; + } finally { + span.end(); + } + }); + } } type AttemptForCompletion = Prisma.TaskRunAttemptGetPayload<{ @@ -1388,7 +1791,7 @@ class SharedQueueTasks { setToExecuting?: boolean, isRetrying?: boolean ): Promise { - const run = await prisma.taskRun.findUnique({ + const run = await prisma.taskRun.findFirst({ where: { id, }, @@ -1482,7 +1885,7 @@ class SharedQueueTasks { async taskHeartbeat(attemptFriendlyId: string) { logger.debug("[SharedQueueConsumer] taskHeartbeat()", { id: attemptFriendlyId }); - const taskRunAttempt = await prisma.taskRunAttempt.findUnique({ + const taskRunAttempt = await prisma.taskRunAttempt.findFirst({ where: { friendlyId: attemptFriendlyId }, }); diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts index ff669f3cf1..66b1fbfd4b 100644 --- a/apps/webapp/app/v3/tracer.server.ts +++ b/apps/webapp/app/v3/tracer.server.ts @@ -130,17 +130,14 @@ function getTracer() { }); if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) { + const headers = env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS + ? (JSON.parse(env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS) as Record) + : undefined; + const exporter = new OTLPTraceExporter({ url: env.INTERNAL_OTEL_TRACE_EXPORTER_URL, timeoutMillis: 15_000, - headers: - env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_NAME && - env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE - ? { - [env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_NAME]: - env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE, - } - : undefined, + headers: headers, }); provider.addSpanProcessor( diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 5d949c5dcb..56e20f0df6 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -109,6 +109,11 @@ export class Logger { currentSpan && currentSpan.isRecording() ? currentSpan?.spanContext().spanId : undefined, }; + // If the span is not recording, and it's a debug log, don't log it + if (currentSpan && !currentSpan.isRecording() && level === "debug") { + return; + } + loggerFunction(JSON.stringify(structuredLog, this.#jsonReplacer)); } } From 754a014cdf06733a4af514c491ac08288c951b63 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 16 Jan 2025 17:42:57 +0000 Subject: [PATCH 2/4] findUnique -> findFirst to stop the prisma dataloader issue on api key auth --- apps/webapp/app/models/runtimeEnvironment.server.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/models/runtimeEnvironment.server.ts b/apps/webapp/app/models/runtimeEnvironment.server.ts index 868a39bf11..eb3e07c978 100644 --- a/apps/webapp/app/models/runtimeEnvironment.server.ts +++ b/apps/webapp/app/models/runtimeEnvironment.server.ts @@ -5,7 +5,7 @@ import { getUsername } from "~/utils/username"; export type { RuntimeEnvironment }; export async function findEnvironmentByApiKey(apiKey: string) { - const environment = await prisma.runtimeEnvironment.findUnique({ + const environment = await prisma.runtimeEnvironment.findFirst({ where: { apiKey, }, @@ -25,7 +25,7 @@ export async function findEnvironmentByApiKey(apiKey: string) { } export async function findEnvironmentByPublicApiKey(apiKey: string) { - const environment = await prisma.runtimeEnvironment.findUnique({ + const environment = await prisma.runtimeEnvironment.findFirst({ where: { pkApiKey: apiKey, }, @@ -45,7 +45,7 @@ export async function findEnvironmentByPublicApiKey(apiKey: string) { } export async function findEnvironmentById(id: string) { - const environment = await prisma.runtimeEnvironment.findUnique({ + const environment = await prisma.runtimeEnvironment.findFirst({ where: { id, }, @@ -85,7 +85,7 @@ export async function createNewSession(environment: RuntimeEnvironment, ipAddres } export async function disconnectSession(environmentId: string) { - const environment = await prisma.runtimeEnvironment.findUnique({ + const environment = await prisma.runtimeEnvironment.findFirst({ where: { id: environmentId, }, From 7bc2ded627be1ad7e5fa88237208cde0bf9ca781 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 16 Jan 2025 18:29:08 +0000 Subject: [PATCH 3/4] Added action stats, audited all actions and improved reasons --- .../v3/marqs/sharedQueueConsumer.server.ts | 72 ++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index e5f57d93d8..b0cff44b3c 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -58,7 +58,7 @@ import { isFinalAttemptStatus, isFinalRunStatus, } from "../taskStatus"; -import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server"; +import { tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; import { MessagePayload } from "./types"; @@ -120,25 +120,26 @@ export type SharedQueueConsumerOptions = { interval?: number; }; +type HandleMessageAction = "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop"; + type DoWorkInternalResult = | { reason: string; attrs?: Record; error?: Error | string; interval?: number; + action?: HandleMessageAction; } | undefined; -type HandleMessageResult = - | { - action: "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop"; - interval?: number; - retryInMs?: number; - reason?: string; - attrs?: Record; - error?: Error | string; - } - | undefined; +type HandleMessageResult = { + action: HandleMessageAction; + interval?: number; + retryInMs?: number; + reason?: string; + attrs?: Record; + error?: Error | string; +}; export class SharedQueueConsumer { private _backgroundWorkers: Map = new Map(); @@ -149,6 +150,7 @@ export class SharedQueueConsumer { private _traceStartedAt: Date | undefined; private _currentSpanContext: Context | undefined; private _reasonStats: Record = {}; + private _actionStats: Record = {}; private _currentSpan: Span | undefined; private _endSpanInNextIteration = false; private _tasks = sharedQueueTasks; @@ -243,6 +245,7 @@ export class SharedQueueConsumer { this._perTraceCountdown = this._options.maximumItemsPerTrace; this._traceStartedAt = new Date(); this._reasonStats = {}; + this._actionStats = {}; this.#doWork().finally(() => {}); } @@ -250,7 +253,11 @@ export class SharedQueueConsumer { #endCurrentSpan() { if (this._currentSpan) { for (const [reason, count] of Object.entries(this._reasonStats)) { - this._currentSpan.setAttribute(`reasons.${reason}`, count); + this._currentSpan.setAttribute(`reasons_${reason}`, count); + } + + for (const [action, count] of Object.entries(this._actionStats)) { + this._currentSpan.setAttribute(`actions_${action}`, count); } this._currentSpan.end(); @@ -310,6 +317,7 @@ export class SharedQueueConsumer { this._perTraceCountdown = this._options.maximumItemsPerTrace; this._traceStartedAt = new Date(); this._reasonStats = {}; + this._actionStats = {}; this._iterationsCount = 0; this._runningDurationInMs = 0; this._endSpanInNextIteration = false; @@ -333,6 +341,10 @@ export class SharedQueueConsumer { if (result) { this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1; + if (result.action) { + this._actionStats[result.action] = (this._actionStats[result.action] ?? 0) + 1; + } + span.setAttribute("reason", result.reason); if (result.attrs) { @@ -357,6 +369,7 @@ export class SharedQueueConsumer { span.setAttribute("reason", "no_result"); this._reasonStats["no_result"] = (this._reasonStats["no_result"] ?? 0) + 1; + this._actionStats["no_result"] = (this._actionStats["no_result"] ?? 0) + 1; } } catch (error) { if (error instanceof Error) { @@ -436,13 +449,6 @@ export class SharedQueueConsumer { const messageResult = await this.#handleMessage(message, messageBody.data); - if (!messageResult) { - return { - reason: "no_message_result", - attrs: hydrateAttributes({}), - }; - } - switch (messageResult.action) { case "noop": { return { @@ -450,6 +456,7 @@ export class SharedQueueConsumer { attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, + action: "noop", }; } case "ack_and_do_more_work": { @@ -460,6 +467,7 @@ export class SharedQueueConsumer { attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, + action: "ack_and_do_more_work", }; } case "nack_and_do_more_work": { @@ -470,6 +478,7 @@ export class SharedQueueConsumer { attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, + action: "nack_and_do_more_work", }; } case "nack": { @@ -479,6 +488,7 @@ export class SharedQueueConsumer { reason: messageResult.reason ?? "none_specified", attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, + action: "nack", }; } } @@ -609,7 +619,6 @@ export class SharedQueueConsumer { action: "ack_and_do_more_work", reason: "missing_image_reference", attrs: { - run_id: existingTaskRun.id, deployment_id: deployment.id, }, }; @@ -710,7 +719,7 @@ export class SharedQueueConsumer { }); return { - action: "nack_and_do_more_work", + action: "ack_and_do_more_work", reason: "failed_to_lock_task_run", attrs: { run_id: existingTaskRun.id, @@ -791,11 +800,18 @@ export class SharedQueueConsumer { attrs: { run_status: lockedTaskRun.status, is_retry: isRetry, + checkpoint_event_id: data.checkpointEventId, }, }; } - return; + return { + action: "noop", + reason: "restored_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, + }; } if (!worker.supportsLazyAttempts) { @@ -806,7 +822,7 @@ export class SharedQueueConsumer { setToExecuting: false, }); } catch (error) { - logger.error("Failed to create task run attempt for outdate worker", { + logger.error("Failed to create task run attempt for outdated worker", { error, taskRun: lockedTaskRun.id, }); @@ -818,7 +834,7 @@ export class SharedQueueConsumer { return { action: "ack_and_do_more_work", - reason: "failed_to_create_attempt", + reason: "failed_to_create_attempt_for_outdated_worker", attrs: { message_id: message.messageId, run_id: lockedTaskRun.id, @@ -931,6 +947,9 @@ export class SharedQueueConsumer { return { action: "noop", reason: "restored_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, }; } catch (e) { return { @@ -992,7 +1011,7 @@ export class SharedQueueConsumer { return { action: "ack_and_do_more_work", - reason: "attempt_not_found", + reason: "resumable_attempt_not_found", attrs: { attempt_id: data.resumableAttemptId, }, @@ -1215,6 +1234,9 @@ export class SharedQueueConsumer { return { action: "noop", reason: "restored_checkpoint", + attrs: { + checkpoint_event_id: data.checkpointEventId, + }, }; } else { logger.debug( From 7b8fc17d5711acbf8aa2405f7957335f2fd0d2e4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 16 Jan 2025 19:11:10 +0000 Subject: [PATCH 4/4] Better debug logging filtering --- apps/webapp/app/v3/tracer.server.ts | 16 ++++++++++++---- packages/core/src/logger.ts | 4 ++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts index 66b1fbfd4b..04db57a762 100644 --- a/apps/webapp/app/v3/tracer.server.ts +++ b/apps/webapp/app/v3/tracer.server.ts @@ -130,14 +130,12 @@ function getTracer() { }); if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) { - const headers = env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS - ? (JSON.parse(env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS) as Record) - : undefined; + const headers = parseInternalTraceHeaders() ?? {}; const exporter = new OTLPTraceExporter({ url: env.INTERNAL_OTEL_TRACE_EXPORTER_URL, timeoutMillis: 15_000, - headers: headers, + headers, }); provider.addSpanProcessor( @@ -206,3 +204,13 @@ export function attributesFromAuthenticatedEnv(env: AuthenticatedEnvironment): A [SemanticEnvResources.USER_ID]: env.orgMember?.userId, }; } + +function parseInternalTraceHeaders(): Record | undefined { + try { + return env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS + ? (JSON.parse(env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADERS) as Record) + : undefined; + } catch { + return; + } +} diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 56e20f0df6..90ddc81b1d 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -109,9 +109,9 @@ export class Logger { currentSpan && currentSpan.isRecording() ? currentSpan?.spanContext().spanId : undefined, }; - // If the span is not recording, and it's a debug log, don't log it + // If the span is not recording, and it's a debug log, mark it so we can filter it out when we forward it if (currentSpan && !currentSpan.isRecording() && level === "debug") { - return; + structuredLog.skipForwarding = true; } loggerFunction(JSON.stringify(structuredLog, this.#jsonReplacer));