Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const chaosMonkey = new ChaosMonkey(
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
);

/**
 * Rejection reason used when a run does not signal that it is checkpointable
 * before the readiness timer fires.
 *
 * The handler that awaits the checkpointable state distinguishes this error
 * via `instanceof` and forwards `error.name` when crashing the run, so the
 * name must identify this class rather than the inherited default `"Error"`.
 */
class CheckpointReadinessTimeoutError extends Error {
  // A bare `extends Error` subclass inherits `name === "Error"`; set it explicitly
  // so crash reports and logs carry a meaningful error name.
  override name = "CheckpointReadinessTimeoutError";
}
/**
 * Rejection reason used when a pending wait for the checkpointable state is
 * abandoned because the checkpoint was cancelled.
 *
 * The waiting handler recognises it via `instanceof` and reports the attempt
 * as canceled instead of crashing the run.
 */
class CheckpointCancelError extends Error {
  // A bare `extends Error` subclass inherits `name === "Error"`; set it explicitly
  // so logged errors can be told apart from generic failures.
  override name = "CheckpointCancelError";
}

class TaskCoordinator {
#httpServer: ReturnType<typeof createServer>;
#checkpointer = new Checkpointer({
Expand Down Expand Up @@ -241,7 +244,7 @@ class TaskCoordinator {
return;
}

this.#checkpointer.cancelCheckpoint(message.runId);
this.#cancelCheckpoint(message.runId);

if (message.delayInMs) {
taskSocket.emit("REQUEST_EXIT", {
Expand Down Expand Up @@ -398,9 +401,14 @@ class TaskCoordinator {

let timeout: NodeJS.Timeout | undefined = undefined;

const CHECKPOINTABLE_TIMEOUT_SECONDS = 20;

const isCheckpointable = new Promise((resolve, reject) => {
// We set a reasonable timeout to prevent waiting forever
timeout = setTimeout(() => reject("timeout"), 20_000);
timeout = setTimeout(
() => reject(new CheckpointReadinessTimeoutError()),
CHECKPOINTABLE_TIMEOUT_SECONDS * 1000
);

this.#checkpointableTasks.set(socket.data.runId, { resolve, reject });
});
Expand All @@ -415,10 +423,24 @@ class TaskCoordinator {
} catch (error) {
logger.error("Error while waiting for checkpointable state", { error });

await crashRun({
name: "ReadyForCheckpointError",
message: `Failed to become checkpointable for ${reason}`,
});
if (error instanceof CheckpointReadinessTimeoutError) {
await crashRun({
name: error.name,
message: `Failed to become checkpointable in ${CHECKPOINTABLE_TIMEOUT_SECONDS}s for ${reason}`,
});

return {
success: false,
reason: "timeout",
};
}

if (error instanceof CheckpointCancelError) {
return {
success: false,
reason: "canceled",
};
}

return {
success: false,
Expand Down Expand Up @@ -1065,7 +1087,7 @@ class TaskCoordinator {

if (checkpointWait) {
// Stop waiting for task to reach checkpointable state
checkpointWait.reject("Checkpoint cancelled");
checkpointWait.reject(new CheckpointCancelError());
}

// Cancel checkpointing procedure
Expand Down