diff --git a/apps/coordinator/package.json b/apps/coordinator/package.json index 1268a1c620..c860adb1c8 100644 --- a/apps/coordinator/package.json +++ b/apps/coordinator/package.json @@ -17,10 +17,10 @@ "license": "MIT", "dependencies": { "@trigger.dev/core": "workspace:*", - "execa": "^8.0.1", "nanoid": "^5.0.6", "prom-client": "^15.1.0", - "socket.io": "4.7.4" + "socket.io": "4.7.4", + "tinyexec": "^0.3.0" }, "devDependencies": { "@types/node": "^18", diff --git a/apps/coordinator/src/chaosMonkey.ts b/apps/coordinator/src/chaosMonkey.ts index 978ec463d5..e2bc147674 100644 --- a/apps/coordinator/src/chaosMonkey.ts +++ b/apps/coordinator/src/chaosMonkey.ts @@ -1,4 +1,3 @@ -import type { Execa$ } from "execa"; import { setTimeout as timeout } from "node:timers/promises"; class ChaosMonkeyError extends Error { @@ -35,11 +34,9 @@ export class ChaosMonkey { } async call({ - $, throwErrors = !this.disableErrors, addDelays = !this.disableDelays, }: { - $?: Execa$; throwErrors?: boolean; addDelays?: boolean; } = {}) { @@ -60,11 +57,7 @@ export class ChaosMonkey { chaosEvents.push(async () => { console.log("🍌 Chaos monkey: Add delay"); - if ($) { - await $`sleep ${this.delayInSeconds}`; - } else { - await timeout(this.delayInSeconds * 1000); - } + await timeout(this.delayInSeconds * 1000); }); } @@ -72,11 +65,7 @@ export class ChaosMonkey { chaosEvents.push(async () => { console.log("🍌 Chaos monkey: Throw error"); - if ($) { - await $`false`; - } else { - throw new ChaosMonkey.Error("🍌 Chaos monkey: Throw error"); - } + throw new ChaosMonkey.Error("🍌 Chaos monkey: Throw error"); }); } diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index ad14d8145c..2d7876c77f 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -1,10 +1,13 @@ import { ExponentialBackoff } from "@trigger.dev/core/v3/apps"; -import { testDockerCheckpoint, isExecaChildProcess } from "@trigger.dev/core/v3/apps"; +import { testDockerCheckpoint } from "@trigger.dev/core/v3/apps"; import { SimpleLogger } from "@trigger.dev/core/v3/apps"; -import { $ } from "execa"; import { nanoid } from "nanoid"; import fs from "node:fs/promises"; import { ChaosMonkey } from "./chaosMonkey"; +import { Buildah, Crictl, Exec } from "./exec"; +import { setTimeout } from "node:timers/promises"; +import { TempFileCleaner } from "./cleaner"; +import { numFromEnv, boolFromEnv } from "./util"; type CheckpointerInitializeReturn = { canCheckpoint: boolean; @@ -48,8 +51,6 @@ type CheckpointerOptions = { chaosMonkey?: ChaosMonkey; }; -class CheckpointAbortError extends Error {} - async function getFileSize(filePath: string): Promise { try { const stats = await fs.stat(filePath); @@ -95,7 +96,6 @@ export class Checkpointer { private registryTlsVerify: boolean; private disableCheckpointSupport: boolean; - private checkpointPath: string; private simulateCheckpointFailure: boolean; private simulateCheckpointFailureSeconds: number; @@ -103,6 +103,7 @@ export class Checkpointer { private simulatePushFailureSeconds: number; private chaosMonkey: ChaosMonkey; + private tmpCleaner?: TempFileCleaner; constructor(private opts: CheckpointerOptions) { this.#dockerMode = opts.dockerMode; @@ -112,7 +113,6 @@ export class Checkpointer { this.registryTlsVerify = opts.registryTlsVerify ?? true; this.disableCheckpointSupport = opts.disableCheckpointSupport ?? false; - this.checkpointPath = opts.checkpointPath ?? "/checkpoints"; this.simulateCheckpointFailure = opts.simulateCheckpointFailure ?? false; this.simulateCheckpointFailureSeconds = opts.simulateCheckpointFailureSeconds ?? 300; @@ -120,6 +120,7 @@ export class Checkpointer { this.simulatePushFailureSeconds = opts.simulatePushFailureSeconds ?? 300; this.chaosMonkey = opts.chaosMonkey ?? new ChaosMonkey(!!process.env.CHAOS_MONKEY_ENABLED); + this.tmpCleaner = this.#createTmpCleaner(); } async init(): Promise { @@ -138,16 +139,15 @@ export class Checkpointer { this.#logger.error(testCheckpoint.message, testCheckpoint.error ?? ""); return this.#getInitReturn(false); - } else { - try { - await $`buildah login --get-login ${this.registryHost}`; - } catch (error) { - this.#logger.error(`No checkpoint support: Not logged in to registry ${this.registryHost}`); - return this.#getInitReturn(false); - } } - return this.#getInitReturn(true); + const canLogin = await Buildah.canLogin(this.registryHost); + + if (!canLogin) { + this.#logger.error(`No checkpoint support: Not logged in to registry ${this.registryHost}`); + } + + return this.#getInitReturn(canLogin); } #getInitReturn(canCheckpoint: boolean): CheckpointerInitializeReturn { @@ -185,7 +185,7 @@ export class Checkpointer { if (this.#dockerMode) { return basename; } else { - return `${this.checkpointPath}/${basename}.tar`; + return Crictl.getExportLocation(basename); } } @@ -291,7 +291,7 @@ export class Checkpointer { }); this.#waitingForRetry.add(runId); - await new Promise((resolve) => setTimeout(resolve, delay.milliseconds)); + await setTimeout(delay.milliseconds); if (!this.#waitingForRetry.has(runId)) { this.#logger.log("Checkpoint canceled while waiting for retry", { runId }); @@ -402,73 +402,60 @@ export class Checkpointer { const controller = new AbortController(); this.#abortControllers.set(runId, controller); - const assertNotAborted = (abortMessage?: string) => { - if (controller.signal.aborted) { - throw new CheckpointAbortError(abortMessage); - } - - this.#logger.debug("Not aborted", { abortMessage }); + const onAbort = () => { + this.#logger.error("Checkpoint aborted", { options }); + controller.signal.removeEventListener("abort", onAbort); }; - - const $$ = $({ signal: controller.signal }); + controller.signal.addEventListener("abort", onAbort); const shortCode = nanoid(8); const imageRef = this.#getImageRef(projectRef, deploymentVersion, shortCode); const exportLocation = this.#getExportLocation(projectRef, deploymentVersion, shortCode); + const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal }); + const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal }); + const cleanup = async () => { + const metadata = { + runId, + exportLocation, + imageRef, + }; + if (this.#dockerMode) { + this.#logger.debug("Skipping cleanup in docker mode", metadata); return; } - try { - await $`rm ${exportLocation}`; - this.#logger.log("Deleted checkpoint archive", { exportLocation }); + this.#logger.log("Cleaning up", metadata); - await $`buildah rmi ${imageRef}`; - this.#logger.log("Deleted checkpoint image", { imageRef }); + try { + await buildah.cleanup(); + await crictl.cleanup(); } catch (error) { - this.#logger.error("Failure during checkpoint cleanup", { exportLocation, error }); + this.#logger.error("Error during cleanup", { ...metadata, error }); } + + this.#abortControllers.delete(runId); + controller.signal.removeEventListener("abort", onAbort); }; try { - assertNotAborted("chaosMonkey.call"); - await this.chaosMonkey.call({ $: $$ }); + await this.chaosMonkey.call(); this.#logger.log("Checkpointing:", { options }); const containterName = this.#getRunContainerName(runId); - const containterNameWithAttempt = this.#getRunContainerName(runId, attemptNumber); // Create checkpoint (docker) if (this.#dockerMode) { - try { - if (this.opts.forceSimulate || !this.#canCheckpoint) { - this.#logger.log("Simulating checkpoint"); - this.#logger.debug(await $$`docker pause ${containterNameWithAttempt}`); - } else { - if (this.simulateCheckpointFailure) { - if (performance.now() < this.simulateCheckpointFailureSeconds * 1000) { - this.#logger.error("Simulating checkpoint failure", { options }); - throw new Error("SIMULATE_CHECKPOINT_FAILURE"); - } - } - - if (leaveRunning) { - this.#logger.debug( - await $$`docker checkpoint create --leave-running ${containterNameWithAttempt} ${exportLocation}` - ); - } else { - this.#logger.debug( - await $$`docker checkpoint create ${containterNameWithAttempt} ${exportLocation}` - ); - } - } - } catch (error) { - this.#logger.error("Failed while creating docker checkpoint", { exportLocation }); - throw error; - } + await this.#createDockerCheckpoint( + controller.signal, + runId, + exportLocation, + leaveRunning, + attemptNumber + ); this.#logger.log("checkpoint created:", { runId, @@ -490,13 +477,7 @@ export class Checkpointer { return { success: false, reason: "SKIP_RETRYING" }; } - assertNotAborted("cmd: crictl ps"); - const containerId = this.#logger.debug( - // @ts-expect-error - await $`crictl ps` - .pipeStdout($({ stdin: "pipe" })`grep ${containterName}`) - .pipeStdout($({ stdin: "pipe" })`cut -f1 ${"-d "}`) - ); + const containerId = await crictl.ps(containterName, true); if (!containerId.stdout) { this.#logger.error("could not find container id", { options, containterName }); @@ -513,8 +494,7 @@ export class Checkpointer { } // Create checkpoint - assertNotAborted("cmd: crictl checkpoint"); - this.#logger.debug(await $$`crictl checkpoint --export=${exportLocation} ${containerId}`); + await crictl.checkpoint(containerId.stdout, exportLocation); const postCheckpoint = performance.now(); // Print checkpoint size @@ -522,28 +502,20 @@ export class Checkpointer { this.#logger.log("checkpoint archive created", { size, options }); // Create image from checkpoint - assertNotAborted("cmd: buildah from scratch"); - const container = this.#logger.debug(await $$`buildah from scratch`); + const workingContainer = await buildah.from("scratch"); const postFrom = performance.now(); - assertNotAborted("cmd: buildah add"); - this.#logger.debug(await $$`buildah add ${container} ${exportLocation} /`); + await buildah.add(workingContainer.stdout, exportLocation, "/"); const postAdd = performance.now(); - assertNotAborted("cmd: buildah config"); - this.#logger.debug( - await $$`buildah config --annotation=io.kubernetes.cri-o.annotations.checkpoint.name=counter ${container}` - ); + await buildah.config(workingContainer.stdout, [ + `io.kubernetes.cri-o.annotations.checkpoint.name=${shortCode}`, + ]); const postConfig = performance.now(); - assertNotAborted("cmd: buildah commit"); - this.#logger.debug(await $$`buildah commit ${container} ${imageRef}`); + await buildah.commit(workingContainer.stdout, imageRef); const postCommit = performance.now(); - assertNotAborted("cmd: buildah rm"); - this.#logger.debug(await $$`buildah rm ${container}`); - const postRm = performance.now(); - if (this.simulatePushFailure) { if (performance.now() < this.simulatePushFailureSeconds * 1000) { this.#logger.error("Simulating push failure", { options }); @@ -552,10 +524,7 @@ export class Checkpointer { } // Push checkpoint image - assertNotAborted("cmd: buildah push"); - this.#logger.debug( - await $$`buildah push --tls-verify=${String(this.registryTlsVerify)} ${imageRef}` - ); + await buildah.push(imageRef, this.registryTlsVerify); const postPush = performance.now(); const perf = { @@ -564,8 +533,7 @@ export class Checkpointer { "buildah add": postAdd - postFrom, "buildah config": postConfig - postAdd, "buildah commit": postCommit - postConfig, - "buildah rm": postRm - postCommit, - "buildah push": postPush - postRm, + "buildah push": postPush - postCommit, }; this.#logger.log("Checkpointed and pushed image to:", { location: imageRef, perf }); @@ -578,30 +546,77 @@ export class Checkpointer { }, }; } catch (error) { - if (error instanceof CheckpointAbortError) { - this.#logger.error("Checkpoint canceled: CheckpointAbortError", { options, error }); + if (error instanceof Exec.Result) { + if (error.aborted) { + this.#logger.error("Checkpoint canceled: Exec", { options }); + + return { success: false, reason: "CANCELED" }; + } else { + this.#logger.error("Checkpoint command error", { options, error }); + + return { success: false, reason: "ERROR" }; + } + } + + this.#logger.error("Unhandled checkpoint error", { options, error }); + + return { success: false, reason: "ERROR" }; + } finally { + await cleanup(); + if (controller.signal.aborted) { + this.#logger.error("Checkpoint canceled: Cleanup", { options }); + + // Overrides any prior return value return { success: false, reason: "CANCELED" }; } + } + } - if (isExecaChildProcess(error)) { - if (error.isCanceled) { - this.#logger.error("Checkpoint canceled: ExecaChildProcess", { options, error }); + async #createDockerCheckpoint( + abortSignal: AbortSignal, + runId: string, + exportLocation: string, + leaveRunning: boolean, + attemptNumber?: number + ) { + const containterNameWithAttempt = this.#getRunContainerName(runId, attemptNumber); + const exec = new Exec({ logger: this.#logger, abortSignal }); - return { success: false, reason: "CANCELED" }; + try { + if (this.opts.forceSimulate || !this.#canCheckpoint) { + this.#logger.log("Simulating checkpoint"); + + await exec.x("docker", ["pause", containterNameWithAttempt]); + + return; + } + + if (this.simulateCheckpointFailure) { + if (performance.now() < this.simulateCheckpointFailureSeconds * 1000) { + this.#logger.error("Simulating checkpoint failure", { + runId, + exportLocation, + leaveRunning, + attemptNumber, + }); + + throw new Error("SIMULATE_CHECKPOINT_FAILURE"); } + } - this.#logger.error("Checkpoint command error", { options, error }); + const args = ["checkpoint", "create"]; - return { success: false, reason: "ERROR" }; + if (leaveRunning) { + args.push("--leave-running"); } - this.#logger.error("Unhandled checkpoint error", { options, error }); + args.push(containterNameWithAttempt, exportLocation); - return { success: false, reason: "ERROR" }; - } finally { - this.#abortControllers.delete(runId); - await cleanup(); + await exec.x("docker", args); + } catch (error) { + this.#logger.error("Failed while creating docker checkpoint", { exportLocation }); + throw error; } } @@ -620,4 +635,34 @@ export class Checkpointer { #getRunContainerName(suffix: string, attemptNumber?: number) { return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`; } + + #createTmpCleaner() { + if (!boolFromEnv("TMP_CLEANER_ENABLED", false)) { + return; + } + + const defaultPaths = [Buildah.tmpDir, Crictl.checkpointDir].filter(Boolean); + const pathsOverride = process.env.TMP_CLEANER_PATHS_OVERRIDE?.split(",").filter(Boolean) ?? []; + const paths = pathsOverride.length ? pathsOverride : defaultPaths; + + if (paths.length === 0) { + this.#logger.error("TempFileCleaner enabled but no paths to clean", { + defaultPaths, + pathsOverride, + TMP_CLEANER_PATHS_OVERRIDE: process.env.TMP_CLEANER_PATHS_OVERRIDE, + }); + + return; + } + const cleaner = new TempFileCleaner({ + paths, + maxAgeMinutes: numFromEnv("TMP_CLEANER_MAX_AGE_MINUTES", 60), + intervalSeconds: numFromEnv("TMP_CLEANER_INTERVAL_SECONDS", 300), + leadingEdge: boolFromEnv("TMP_CLEANER_LEADING_EDGE", false), + }); + + cleaner.start(); + + return cleaner; + } } diff --git a/apps/coordinator/src/cleaner.ts b/apps/coordinator/src/cleaner.ts new file mode 100644 index 0000000000..a3b50e2e91 --- /dev/null +++ b/apps/coordinator/src/cleaner.ts @@ -0,0 +1,102 @@ +import { SimpleLogger } from "@trigger.dev/core/v3/apps"; +import { Exec } from "./exec"; +import { setTimeout } from "timers/promises"; + +interface TempFileCleanerOptions { + paths: string[]; + maxAgeMinutes: number; + intervalSeconds: number; + leadingEdge?: boolean; +} + +export class TempFileCleaner { + private logger = new SimpleLogger("[tmp-cleaner]"); + private enabled = false; + private exec = new Exec({ logger: this.logger }); + + constructor(private opts: TempFileCleanerOptions) {} + + async start() { + this.logger.log("start", this.opts); + this.enabled = true; + + if (!this.opts.leadingEdge) { + await this.wait(); + } + + while (this.enabled) { + try { + await this.clean(); + } catch (error) { + this.logger.error("error during tick", error); + } + + await this.wait(); + } + } + + stop() { + this.logger.log("stop", this.opts); + this.enabled = false; + } + + private wait() { + return setTimeout(this.opts.intervalSeconds * 1000); + } + + private async clean() { + for (const path of this.opts.paths) { + try { + await this.cleanSingle(path); + } catch (error) { + this.logger.error("error while cleaning", { path, error }); + } + } + } + + private async cleanSingle(startingPoint: string) { + const maxAgeMinutes = this.opts.maxAgeMinutes; + + const ignoreStartingPoint = ["!", "-path", startingPoint]; + const onlyDirectDescendants = ["-maxdepth", "1"]; + const onlyOldFiles = ["-mmin", `+${maxAgeMinutes}`]; + + const baseArgs = [ + startingPoint, + ...ignoreStartingPoint, + ...onlyDirectDescendants, + ...onlyOldFiles, + ]; + + const duArgs = ["-exec", "du", "-ch", "{}", "+"]; + const rmArgs = ["-exec", "rm", "-rf", "{}", "+"]; + + const du = this.x("find", [...baseArgs, ...duArgs]); + const duOutput = await du; + + const duLines = duOutput.stdout.trim().split("\n"); + const fileCount = duLines.length - 1; // last line is the total + const fileSize = duLines.at(-1)?.trim().split(/\s+/)[0]; + + if (fileCount === 0) { + this.logger.log("nothing to delete", { startingPoint, maxAgeMinutes }); + return; + } + + this.logger.log("deleting old files", { fileCount, fileSize, startingPoint, maxAgeMinutes }); + + const rm = this.x("find", [...baseArgs, ...rmArgs]); + const rmOutput = await rm; + + if (rmOutput.stderr.length > 0) { + this.logger.error("delete unsuccessful", rmOutput); + return; + } + + this.logger.log("deleted old files", { fileCount, fileSize, startingPoint, maxAgeMinutes }); + } + + private get x() { + return this.exec.x.bind(this.exec); + } +} diff --git a/apps/coordinator/src/exec.ts b/apps/coordinator/src/exec.ts new file mode 100644 index 0000000000..c0d8c0c862 --- /dev/null +++ b/apps/coordinator/src/exec.ts @@ -0,0 +1,282 @@ +import { SimpleLogger } from "@trigger.dev/core/v3/apps"; +import { randomUUID } from "crypto"; +import { homedir } from "os"; +import { type Result, x } from "tinyexec"; + +class TinyResult { + pid?: number; + exitCode?: number; + aborted: boolean; + killed: boolean; + + constructor(result: Result) { + this.pid = result.pid; + this.exitCode = result.exitCode; + this.aborted = result.aborted; + this.killed = result.killed; + } +} + +interface ExecOptions { + logger?: SimpleLogger; + abortSignal?: AbortSignal; + logOutput?: boolean; + trimArgs?: boolean; + neverThrow?: boolean; +} + +export class Exec { + private logger: SimpleLogger; + private abortSignal: AbortSignal | undefined; + + private logOutput: boolean; + private trimArgs: boolean; + private neverThrow: boolean; + + constructor(opts: ExecOptions) { + this.logger = opts.logger ?? new SimpleLogger(); + this.abortSignal = opts.abortSignal; + + this.logOutput = opts.logOutput ?? true; + this.trimArgs = opts.trimArgs ?? true; + this.neverThrow = opts.neverThrow ?? false; + } + + async x( + command: string, + args?: string[], + opts?: { neverThrow?: boolean; ignoreAbort?: boolean } + ) { + const argsTrimmed = this.trimArgs ? args?.map((arg) => arg.trim()) : args; + + const commandWithFirstArg = `${command}${argsTrimmed?.length ? ` ${argsTrimmed[0]}` : ""}`; + this.logger.debug(`exec: ${commandWithFirstArg}`, { command, args, argsTrimmed }); + + const result = x(command, argsTrimmed, { + signal: opts?.ignoreAbort ? undefined : this.abortSignal, + // We don't use this as it doesn't cover killed and aborted processes + // throwOnError: true, + }); + + const output = await result; + + const metadata = { + command, + argsRaw: args, + argsTrimmed, + ...output, + }; + + if (this.logOutput) { + this.logger.debug(`output: ${commandWithFirstArg}`, metadata); + } + + if (this.neverThrow || opts?.neverThrow) { + return output; + } + + if (result.aborted) { + this.logger.error(`aborted: ${commandWithFirstArg}`, metadata); + throw new TinyResult(result); + } + + if (result.killed) { + this.logger.error(`killed: ${commandWithFirstArg}`, metadata); + throw new TinyResult(result); + } + + if (result.exitCode !== 0) { + this.logger.error(`non-zero exit: ${commandWithFirstArg}`, metadata); + throw new TinyResult(result); + } + + return output; + } + + static Result = TinyResult; +} + +interface BuildahOptions { + id?: string; + abortSignal?: AbortSignal; +} + +export class Buildah { + private id: string; + private logger: SimpleLogger; + private exec: Exec; + + private containers = new Set(); + private images = new Set(); + + constructor(opts: BuildahOptions) { + this.id = opts.id ?? randomUUID(); + this.logger = new SimpleLogger(`[buildah][${this.id}]`); + + this.exec = new Exec({ + logger: this.logger, + abortSignal: opts.abortSignal, + }); + + this.logger.log("initiaized", { opts }); + } + + private get x() { + return this.exec.x.bind(this.exec); + } + + async from(baseImage: string) { + const output = await this.x("buildah", ["from", baseImage]); + this.containers.add(output.stdout); + return output; + } + + async add(container: string, src: string, dest: string) { + return await this.x("buildah", ["add", container, src, dest]); + } + + async config(container: string, annotations: string[]) { + const args = ["config"]; + + for (const annotation of annotations) { + args.push(`--annotation=${annotation}`); + } + + args.push(container); + + return await this.x("buildah", args); + } + + async commit(container: string, imageRef: string) { + const output = await this.x("buildah", ["commit", container, imageRef]); + this.images.add(output.stdout); + return output; + } + + async push(imageRef: string, registryTlsVerify?: boolean) { + return await this.x("buildah", [ + "push", + `--tls-verify=${String(!!registryTlsVerify)}`, + imageRef, + ]); + } + + async cleanup() { + if (this.containers.size > 0) { + try { + const output = await this.x("buildah", ["rm", ...this.containers], { ignoreAbort: true }); + this.containers.clear(); + + if (output.stderr.length > 0) { + this.logger.error("failed to remove some containers", { output }); + } + } catch (error) { + this.logger.error("failed to clean up containers", { error, containers: this.containers }); + } + } else { + this.logger.debug("no containers to clean up"); + } + + if (this.images.size > 0) { + try { + const output = await this.x("buildah", ["rmi", ...this.images], { ignoreAbort: true }); + this.images.clear(); + + if (output.stderr.length > 0) { + this.logger.error("failed to remove some images", { output }); + } + } catch (error) { + this.logger.error("failed to clean up images", { error, images: this.images }); + } + } else { + this.logger.debug("no images to clean up"); + } + } + + static async canLogin(registryHost: string) { + try { + await x("buildah", ["login", "--get-login", registryHost], { throwOnError: true }); + return true; + } catch (error) { + return false; + } + } + + static get tmpDir() { + return process.env.TMPDIR ?? "/var/tmp"; + } + + static get storageRootDir() { + return process.getuid?.() === 0 + ? "/var/lib/containers/storage" + : `${homedir()}/.local/share/containers/storage`; + } +} + +interface CrictlOptions { + id?: string; + abortSignal?: AbortSignal; +} + +export class Crictl { + private id: string; + private logger: SimpleLogger; + private exec: Exec; + + private archives = new Set(); + + constructor(opts: CrictlOptions) { + this.id = opts.id ?? randomUUID(); + this.logger = new SimpleLogger(`[crictl][${this.id}]`); + + this.exec = new Exec({ + logger: this.logger, + abortSignal: opts.abortSignal, + }); + + this.logger.log("initiaized", { opts }); + } + + private get x() { + return this.exec.x.bind(this.exec); + } + + async ps(containerName: string, quiet?: boolean) { + return await this.x("crictl", ["ps", "--name", containerName, quiet ? "--quiet" : ""]); + } + + async checkpoint(containerId: string, exportLocation: string) { + const output = await this.x("crictl", [ + "checkpoint", + `--export=${exportLocation}`, + containerId, + ]); + this.archives.add(exportLocation); + return output; + } + + async cleanup() { + if (this.archives.size > 0) { + try { + const output = await this.x("rm", ["-v", ...this.archives], { ignoreAbort: true }); + this.archives.clear(); + + if (output.stderr.length > 0) { + this.logger.error("failed to remove some archives", { output }); + } + } catch (error) { + this.logger.error("failed to clean up archives", { error, archives: this.archives }); + } + } else { + this.logger.debug("no archives to clean up"); + } + } + + static getExportLocation(identifier: string) { + return `${this.checkpointDir}/${identifier}.tar`; + } + + static get checkpointDir() { + return process.env.CRI_CHECKPOINT_DIR ?? "/checkpoints"; + } +} diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index ca3f1b427a..d52edfb3aa 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -14,6 +14,7 @@ import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps"; import { SimpleLogger } from "@trigger.dev/core/v3/apps"; import { ChaosMonkey } from "./chaosMonkey"; import { Checkpointer } from "./checkpointer"; +import { boolFromEnv, numFromEnv } from "./util"; import { collectDefaultMetrics, register, Gauge } from "prom-client"; collectDefaultMetrics(); @@ -22,26 +23,6 @@ const HTTP_SERVER_PORT = Number(process.env.HTTP_SERVER_PORT || 8020); const NODE_NAME = process.env.NODE_NAME || "coordinator"; const DEFAULT_RETRY_DELAY_THRESHOLD_IN_MS = 30_000; -const boolFromEnv = (env: string, defaultValue: boolean): boolean => { - const value = process.env[env]; - - if (!value) { - return defaultValue; - } - - return ["1", "true"].includes(value); -}; - -const numFromEnv = (env: string, defaultValue: number): number => { - const value = process.env[env]; - - if (!value) { - return defaultValue; - } - - return parseInt(value, 10); -}; - const PLATFORM_ENABLED = ["1", "true"].includes(process.env.PLATFORM_ENABLED ?? "true"); const PLATFORM_HOST = process.env.PLATFORM_HOST || "127.0.0.1"; const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030; @@ -66,7 +47,6 @@ class TaskCoordinator { heartbeat: this.#sendRunHeartbeat.bind(this), registryHost: process.env.REGISTRY_HOST, registryNamespace: process.env.REGISTRY_NAMESPACE, - checkpointPath: process.env.CHECKPOINT_PATH, registryTlsVerify: boolFromEnv("REGISTRY_TLS_VERIFY", true), disableCheckpointSupport: boolFromEnv("DISABLE_CHECKPOINT_SUPPORT", false), simulatePushFailure: boolFromEnv("SIMULATE_PUSH_FAILURE", false), diff --git a/apps/coordinator/src/util.ts b/apps/coordinator/src/util.ts new file mode 100644 index 0000000000..74bb605ee2 --- /dev/null +++ b/apps/coordinator/src/util.ts @@ -0,0 +1,19 @@ +export const boolFromEnv = (env: string, defaultValue: boolean): boolean => { + const value = process.env[env]; + + if (!value) { + return defaultValue; + } + + return ["1", "true"].includes(value); +}; + +export const numFromEnv = (env: string, defaultValue: number): number => { + const value = process.env[env]; + + if (!value) { + return defaultValue; + } + + return parseInt(value, 10); +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 608b27b3ed..414c875357 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -77,9 +77,6 @@ importers: '@trigger.dev/core': specifier: workspace:* version: link:../../packages/core - execa: - specifier: ^8.0.1 - version: 8.0.1 nanoid: specifier: ^5.0.6 version: 5.0.6 @@ -89,6 +86,9 @@ importers: socket.io: specifier: 4.7.4 version: 4.7.4 + tinyexec: + specifier: ^0.3.0 + version: 0.3.0 devDependencies: '@types/node': specifier: ^18 @@ -27494,6 +27494,10 @@ packages: resolution: {integrity: sha512-au8dwv4xKSDR+Fw52csDo3wcDztPdne2oM1o/7LFro4h6bdFmvyUAeAfX40pwDtzHgRFqz1XWaUqgKS2G83/ig==} dev: false + /tinyexec@0.3.0: + resolution: {integrity: sha512-tVGE0mVJPGb0chKhqmsoosjsS+qUnJVGJpZgsHYQcGoPlG3B51R3PouqTgEGH2Dc9jjFyOqOpix6ZHNMXp1FZg==} + dev: false + /tinyglobby@0.2.2: resolution: {integrity: sha512-mZ2sDMaySvi1PkTp4lTo1In2zjU+cY8OvZsfwrDrx3YGRbXPX1/cbPwCR9zkm3O/Fz9Jo0F1HNgIQ1b8BepqyQ==} engines: {node: '>=12.0.0'}