Skip to content

Commit 73bb2dd

Browse files
committed
fix checkpoint cancellation
1 parent 1c33bab commit 73bb2dd

File tree

2 files changed

+145
-122
lines changed

2 files changed

+145
-122
lines changed

apps/coordinator/src/checkpointer.ts

Lines changed: 113 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type CheckpointAndPushResult =
2727
| { success: true; checkpoint: CheckpointData }
2828
| {
2929
success: false;
30-
reason?: "CANCELED" | "DISABLED" | "ERROR" | "IN_PROGRESS" | "NO_SUPPORT" | "SKIP_RETRYING";
30+
reason?: "CANCELED" | "ERROR" | "SKIP_RETRYING";
3131
};
3232

3333
type CheckpointData = {
@@ -87,9 +87,14 @@ export class Checkpointer {
8787
#dockerMode: boolean;
8888

8989
#logger = new SimpleStructuredLogger("checkpointer");
90-
#abortControllers = new Map<string, AbortController>();
90+
9191
#failedCheckpoints = new Map<string, unknown>();
92-
#waitingToCheckpoint = new Set<string>();
92+
93+
// Indexed by run ID
94+
#runAbortControllers = new Map<
95+
string,
96+
{ signal: AbortSignal; abort: AbortController["abort"] }
97+
>();
9398

9499
private registryHost: string;
95100
private registryNamespace: string;
@@ -196,25 +201,73 @@ export class Checkpointer {
196201
const start = performance.now();
197202
this.#logger.log(`checkpointAndPush() start`, { start, opts });
198203

199-
let interval: NodeJS.Timer | undefined;
204+
const { runId } = opts;
200205

206+
let interval: NodeJS.Timer | undefined;
201207
if (opts.shouldHeartbeat) {
202208
interval = setInterval(() => {
203-
this.#logger.log("Sending heartbeat", { runId: opts.runId });
204-
this.opts.heartbeat(opts.runId);
209+
this.#logger.log("Sending heartbeat", { runId });
210+
this.opts.heartbeat(runId);
205211
}, 20_000);
206212
}
207213

214+
const controller = new AbortController();
215+
const signal = controller.signal;
216+
const abort = controller.abort.bind(controller);
217+
218+
const onAbort = () => {
219+
this.#logger.error("Checkpoint aborted", { runId, options: opts });
220+
};
221+
222+
signal.addEventListener("abort", onAbort, { once: true });
223+
224+
const removeCurrentAbortController = () => {
225+
const controller = this.#runAbortControllers.get(runId);
226+
227+
// Ensure only the current controller is removed
228+
if (controller && controller.signal === signal) {
229+
this.#runAbortControllers.delete(runId);
230+
}
231+
232+
// Remove the abort listener in case it hasn't fired
233+
signal.removeEventListener("abort", onAbort);
234+
};
235+
236+
if (!this.#dockerMode && !this.#canCheckpoint) {
237+
this.#logger.error("No checkpoint support. Simulation requires docker.");
238+
this.#failCheckpoint(runId, "NO_SUPPORT");
239+
return;
240+
}
241+
242+
if (this.#isRunCheckpointing(runId)) {
243+
this.#logger.error("Checkpoint procedure already in progress", { options: opts });
244+
this.#failCheckpoint(runId, "IN_PROGRESS");
245+
return;
246+
}
247+
248+
// This is a new checkpoint, clear any last failure for this run
249+
this.#clearFailedCheckpoint(runId);
250+
251+
if (this.disableCheckpointSupport) {
252+
this.#logger.error("Checkpoint support disabled", { options: opts });
253+
this.#failCheckpoint(runId, "DISABLED");
254+
return;
255+
}
256+
257+
this.#runAbortControllers.set(runId, { signal, abort });
258+
208259
try {
209-
const result = await this.#checkpointAndPushWithBackoff(opts, delayMs);
260+
const result = await this.#checkpointAndPushWithBackoff(opts, { delayMs, signal });
210261

211262
const end = performance.now();
212263
this.#logger.log(`checkpointAndPush() end`, {
213264
start,
214265
end,
215266
diff: end - start,
267+
diffWithoutDelay: end - start - (delayMs ?? 0),
216268
opts,
217269
success: result.success,
270+
delayMs,
218271
});
219272

220273
if (!result.success) {
@@ -226,40 +279,41 @@ export class Checkpointer {
226279
if (opts.shouldHeartbeat) {
227280
clearInterval(interval);
228281
}
282+
removeCurrentAbortController();
229283
}
230284
}
231285

232-
#isCheckpointing(runId: string) {
233-
return this.#abortControllers.has(runId) || this.#waitingToCheckpoint.has(runId);
286+
#isRunCheckpointing(runId: string) {
287+
return this.#runAbortControllers.has(runId);
234288
}
235289

236-
cancelCheckpoint(runId: string): boolean {
290+
cancelAllCheckpointsForRun(runId: string): boolean {
291+
this.#logger.log("cancelAllCheckpointsForRun: call", { runId });
292+
237293
// If the last checkpoint failed, pretend we canceled it
238294
// This ensures tasks don't wait for external resume messages to continue
239295
if (this.#hasFailedCheckpoint(runId)) {
296+
this.#logger.log("cancelAllCheckpointsForRun: hasFailedCheckpoint", { runId });
240297
this.#clearFailedCheckpoint(runId);
241298
return true;
242299
}
243300

244-
if (this.#waitingToCheckpoint.has(runId)) {
245-
this.#waitingToCheckpoint.delete(runId);
246-
return true;
247-
}
248-
249-
const controller = this.#abortControllers.get(runId);
301+
const controller = this.#runAbortControllers.get(runId);
250302

251303
if (!controller) {
252-
this.#logger.debug("Nothing to cancel", { runId });
304+
this.#logger.debug("cancelAllCheckpointsForRun: no abort controller", { runId });
253305
return false;
254306
}
255307

256-
if (controller.signal.aborted) {
257-
this.#logger.debug("Controller already aborted", { runId });
308+
const { abort, signal } = controller;
309+
310+
if (signal.aborted) {
311+
this.#logger.debug("cancelAllCheckpointsForRun: signal already aborted", { runId });
258312
return false;
259313
}
260314

261-
controller.abort("cancelCheckpoint()");
262-
this.#abortControllers.delete(runId);
315+
abort("cancelCheckpoint()");
316+
this.#runAbortControllers.delete(runId);
263317

264318
return true;
265319
}
@@ -272,19 +326,16 @@ export class Checkpointer {
272326
deploymentVersion,
273327
attemptNumber,
274328
}: CheckpointAndPushOptions,
275-
delayMs?: number
329+
{ delayMs, signal }: { delayMs?: number; signal: AbortSignal }
276330
): Promise<CheckpointAndPushResult> {
277331
if (delayMs && delayMs > 0) {
278332
this.#logger.log("Delaying checkpoint", { runId, delayMs });
279333

280-
this.#waitingToCheckpoint.add(runId);
281-
await setTimeout(delayMs);
282-
283-
if (!this.#waitingToCheckpoint.has(runId)) {
334+
try {
335+
await setTimeout(delayMs, undefined, { signal });
336+
} catch (error) {
284337
this.#logger.log("Checkpoint canceled during initial delay", { runId });
285338
return { success: false, reason: "CANCELED" };
286-
} else {
287-
this.#waitingToCheckpoint.delete(runId);
288339
}
289340
}
290341

@@ -310,24 +361,24 @@ export class Checkpointer {
310361
delay,
311362
});
312363

313-
this.#waitingToCheckpoint.add(runId);
314-
await setTimeout(delay.milliseconds);
315-
316-
if (!this.#waitingToCheckpoint.has(runId)) {
317-
this.#logger.log("Checkpoint canceled while waiting for retry", { runId });
364+
try {
365+
await setTimeout(delay.milliseconds, undefined, { signal });
366+
} catch (error) {
367+
this.#logger.log("Checkpoint canceled during retry delay", { runId });
318368
return { success: false, reason: "CANCELED" };
319-
} else {
320-
this.#waitingToCheckpoint.delete(runId);
321369
}
322370
}
323371

324-
const result = await this.#checkpointAndPush({
325-
runId,
326-
leaveRunning,
327-
projectRef,
328-
deploymentVersion,
329-
attemptNumber,
330-
});
372+
const result = await this.#checkpointAndPush(
373+
{
374+
runId,
375+
leaveRunning,
376+
projectRef,
377+
deploymentVersion,
378+
attemptNumber,
379+
},
380+
{ signal }
381+
);
331382

332383
if (result.success) {
333384
return result;
@@ -339,24 +390,6 @@ export class Checkpointer {
339390
return result;
340391
}
341392

342-
if (result.reason === "IN_PROGRESS") {
343-
this.#logger.log("Checkpoint already in progress, won't retry", { runId });
344-
this.#failCheckpoint(runId, result.reason);
345-
return result;
346-
}
347-
348-
if (result.reason === "NO_SUPPORT") {
349-
this.#logger.log("No checkpoint support, won't retry", { runId });
350-
this.#failCheckpoint(runId, result.reason);
351-
return result;
352-
}
353-
354-
if (result.reason === "DISABLED") {
355-
this.#logger.log("Checkpoint support disabled, won't retry", { runId });
356-
this.#failCheckpoint(runId, result.reason);
357-
return result;
358-
}
359-
360393
if (result.reason === "SKIP_RETRYING") {
361394
this.#logger.log("Skipping retrying", { runId });
362395
return result;
@@ -384,13 +417,16 @@ export class Checkpointer {
384417
return { success: false, reason: "ERROR" };
385418
}
386419

387-
async #checkpointAndPush({
388-
runId,
389-
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
390-
projectRef,
391-
deploymentVersion,
392-
attemptNumber,
393-
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
420+
async #checkpointAndPush(
421+
{
422+
runId,
423+
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
424+
projectRef,
425+
deploymentVersion,
426+
attemptNumber,
427+
}: CheckpointAndPushOptions,
428+
{ signal }: { signal: AbortSignal }
429+
): Promise<CheckpointAndPushResult> {
394430
await this.init();
395431

396432
const options = {
@@ -401,47 +437,12 @@ export class Checkpointer {
401437
attemptNumber,
402438
};
403439

404-
if (!this.#dockerMode && !this.#canCheckpoint) {
405-
this.#logger.error("No checkpoint support. Simulation requires docker.");
406-
return { success: false, reason: "NO_SUPPORT" };
407-
}
408-
409-
if (this.#isCheckpointing(runId)) {
410-
this.#logger.error("Checkpoint procedure already in progress", { options });
411-
return { success: false, reason: "IN_PROGRESS" };
412-
}
413-
414-
// This is a new checkpoint, clear any last failure for this run
415-
this.#clearFailedCheckpoint(runId);
416-
417-
if (this.disableCheckpointSupport) {
418-
this.#logger.error("Checkpoint support disabled", { options });
419-
return { success: false, reason: "DISABLED" };
420-
}
421-
422-
const controller = new AbortController();
423-
this.#abortControllers.set(runId, controller);
424-
425-
const onAbort = () => {
426-
this.#logger.error("Checkpoint aborted", { options });
427-
controller.signal.removeEventListener("abort", onAbort);
428-
};
429-
controller.signal.addEventListener("abort", onAbort);
430-
431440
const shortCode = nanoid(8);
432441
const imageRef = this.#getImageRef(projectRef, deploymentVersion, shortCode);
433442
const exportLocation = this.#getExportLocation(projectRef, deploymentVersion, shortCode);
434443

435-
const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
436-
const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
437-
438-
const removeCurrentAbortController = () => {
439-
// Ensure only the current controller is removed
440-
if (this.#abortControllers.get(runId) === controller) {
441-
this.#abortControllers.delete(runId);
442-
}
443-
controller.signal.removeEventListener("abort", onAbort);
444-
};
444+
const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: signal });
445+
const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: signal });
445446

446447
const cleanup = async () => {
447448
const metadata = {
@@ -452,7 +453,6 @@ export class Checkpointer {
452453

453454
if (this.#dockerMode) {
454455
this.#logger.debug("Skipping cleanup in docker mode", metadata);
455-
removeCurrentAbortController();
456456
return;
457457
}
458458

@@ -464,28 +464,26 @@ export class Checkpointer {
464464
} catch (error) {
465465
this.#logger.error("Error during cleanup", { ...metadata, error });
466466
}
467-
468-
removeCurrentAbortController();
469467
};
470468

471469
try {
472470
await this.chaosMonkey.call();
473471

474-
this.#logger.log("Checkpointing:", { options });
472+
this.#logger.log("checkpointAndPush: checkpointing", { options });
475473

476474
const containterName = this.#getRunContainerName(runId);
477475

478476
// Create checkpoint (docker)
479477
if (this.#dockerMode) {
480478
await this.#createDockerCheckpoint(
481-
controller.signal,
479+
signal,
482480
runId,
483481
exportLocation,
484482
leaveRunning,
485483
attemptNumber
486484
);
487485

488-
this.#logger.log("checkpoint created:", {
486+
this.#logger.log("checkpointAndPush: checkpoint created", {
489487
runId,
490488
location: exportLocation,
491489
});
@@ -586,13 +584,16 @@ export class Checkpointer {
586584
}
587585
}
588586

589-
this.#logger.error("Unhandled checkpoint error", { options, error });
587+
this.#logger.error("Unhandled checkpoint error", {
588+
options,
589+
error: error instanceof Error ? error.message : error,
590+
});
590591

591592
return { success: false, reason: "ERROR" };
592593
} finally {
593594
await cleanup();
594595

595-
if (controller.signal.aborted) {
596+
if (signal.aborted) {
596597
this.#logger.error("Checkpoint canceled: Cleanup", { options });
597598

598599
// Overrides any prior return value

0 commit comments

Comments
 (0)