diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index e18dad077b..350b81e3ff 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -224,6 +224,7 @@ class ManagedSupervisor { try { await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, envId: message.environment.id, envType: message.environment.type, image: message.image, diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 0fd6416e8d..09695bc897 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -28,6 +28,8 @@ export class DockerWorkloadManager implements WorkloadManager { "run", "--detach", `--network=${env.DOCKER_NETWORK}`, + `--env=TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`, + `--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`, `--env=TRIGGER_ENV_ID=${opts.envId}`, `--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`, `--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 685ef42f10..8b3c48ffed 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -61,6 +61,14 @@ export class KubernetesWorkloadManager implements WorkloadManager { ], resources: this.#getResourcesForMachine(opts.machine), env: [ + { + name: "TRIGGER_DEQUEUED_AT_MS", + value: opts.dequeuedAt.getTime().toString(), + }, + { + name: "TRIGGER_POD_SCHEDULED_AT_MS", + value: Date.now().toString(), + }, { name: "TRIGGER_RUN_ID", value: opts.runFriendlyId, @@ -97,7 +105,11 @@ export class KubernetesWorkloadManager implements WorkloadManager { }, { name: "TRIGGER_WORKER_INSTANCE_NAME", - value: env.TRIGGER_WORKER_INSTANCE_NAME, + valueFrom: { + fieldRef: { + fieldPath: "spec.nodeName", + }, + }, }, { name: "OTEL_EXPORTER_OTLP_ENDPOINT", diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 6240975146..a5d7ed3c90 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -21,6 +21,7 @@ export interface WorkloadManagerCreateOptions { machine: MachinePreset; version: string; nextAttemptNumber?: number; + dequeuedAt: Date; // identifiers envId: string; envType: EnvironmentType; diff --git a/apps/webapp/app/utils/timelineSpanEvents.ts b/apps/webapp/app/utils/timelineSpanEvents.ts index 85d82164e1..0dcbd2a401 100644 --- a/apps/webapp/app/utils/timelineSpanEvents.ts +++ b/apps/webapp/app/utils/timelineSpanEvents.ts @@ -171,7 +171,7 @@ function getAdminOnlyForEvent(event: string): boolean { return true; } case "import": { - return true; + return false; } case "lazy_payload": { return true; diff --git a/apps/webapp/test/timelineSpanEvents.test.ts b/apps/webapp/test/timelineSpanEvents.test.ts index cf02db5c0a..adc02b6f57 100644 --- a/apps/webapp/test/timelineSpanEvents.test.ts +++ b/apps/webapp/test/timelineSpanEvents.test.ts @@ -62,11 +62,11 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false); // Only dequeue and fork events should be visible for non-admins - expect(result.length).toBe(2); + expect(result.length).toBe(3); expect(result.some((event) => event.name === "Dequeued")).toBe(true); expect(result.some((event) => event.name === "Launched")).toBe(true); expect(result.some((event) => event.name === "Attempt created")).toBe(false); - expect(result.some((event) => event.name.includes("Importing"))).toBe(false); + expect(result.some((event) => event.name.includes("Importing"))).toBe(true); }); test("should include all events when isAdmin is true", () => { @@ -220,7 +220,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { expect(result.some((event) => event.name === "Attempt created")).toBe(false); }); - test("should filter import events for non-admin when fork event exists", () => { + test.skip("should filter import events for non-admin when fork event exists", () => { const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false); // With fork event, import should be hidden for non-admins diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index 847152d48c..ccfc68e259 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -369,11 +369,15 @@ export class DevRunController { try { await this.cancelAttempt(); } catch (error) { - logger.debug("Failed to cancel attempt, shutting down", { + logger.debug("Failed to cancel attempt, killing task run process", { error, }); - //todo kill the process? + try { + await this.taskRunProcess?.kill("SIGKILL"); + } catch (error) { + logger.debug("Failed to cancel attempt, failed to kill task run process", { error }); + } return; } @@ -512,8 +516,6 @@ export class DevRunController { snapshot: snapshot.friendlyId, }); - // TODO: We may already be executing this run, this may be a new attempt - // This is the only case where incrementing the attempt number is allowed this.enterRunPhase(run, snapshot); const metrics = [ @@ -539,9 +541,6 @@ export class DevRunController { try { return await this.executeRun({ run, snapshot, execution, envVars, metrics }); } catch (error) { - // TODO: Handle the case where we're in the warm start phase or executing a new run - // This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch - logger.debug("Error while executing attempt", { error, }); @@ -570,8 +569,6 @@ export class DevRunController { error: completionResult.error, }); - // TODO: Maybe we should keep retrying for a while longer - this.runFinished(); return; } diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 2e89225c36..49352f2a14 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -8,6 +8,7 @@ import { type CompleteRunAttemptResult, HeartbeatService, type RunExecutionData, + type TaskRunExecutionMetrics, type TaskRunExecutionResult, type TaskRunFailedExecutionResult, WorkerManifest, @@ -25,6 +26,11 @@ import { assertExhaustive } from "../utilities/assertExhaustive.js"; import { setTimeout as sleep } from "timers/promises"; import { io, type Socket } from "socket.io-client"; +const DateEnv = z + .string() + .transform((val) => new Date(parseInt(val, 10))) + .pipe(z.date()); + // All IDs are friendly IDs const Env = z.object({ // Set at build time @@ -50,6 +56,10 @@ const Env = z.object({ TRIGGER_RUNNER_ID: z.string(), TRIGGER_METADATA_URL: z.string().optional(), + // Timeline metrics + TRIGGER_POD_SCHEDULED_AT_MS: DateEnv, + TRIGGER_DEQUEUED_AT_MS: DateEnv, + // May be overridden TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), @@ -238,6 +248,14 @@ class ManagedRunController { if (!response.success) { console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); + + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "heartbeat: failed", + properties: { + error: response.error, + }, + }); } }, intervalMs: this.heartbeatIntervalSeconds * 1000, @@ -620,6 +638,14 @@ class ManagedRunController { if (!continuationResult.success) { console.error("Failed to continue execution", { error: continuationResult.error }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "failed to continue execution", + properties: { + error: continuationResult.error, + }, + }); + this.waitForNextRun(); return; } @@ -734,10 +760,14 @@ class ManagedRunController { private async startAndExecuteRunAttempt({ runFriendlyId, snapshotFriendlyId, + dequeuedAt, + podScheduledAt, isWarmStart = false, }: { runFriendlyId: string; snapshotFriendlyId: string; + dequeuedAt?: Date; + podScheduledAt?: Date; isWarmStart?: boolean; }) { if (!this.socket) { @@ -749,6 +779,8 @@ class ManagedRunController { snapshot: { friendlyId: snapshotFriendlyId }, }); + const attemptStartedAt = Date.now(); + const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, { isWarmStart, }); @@ -756,10 +788,20 @@ class ManagedRunController { if (!start.success) { console.error("[ManagedRunController] Failed to start run", { error: start.error }); + this.sendDebugLog({ + runId: runFriendlyId, + message: "failed to start run attempt", + properties: { + error: start.error, + }, + }); + this.waitForNextRun(); return; } + const attemptDuration = Date.now() - attemptStartedAt; + const { run, snapshot, execution, envVars } = start.data; logger.debug("[ManagedRunController] Started run", { @@ -767,21 +809,49 @@ class ManagedRunController { snapshot: snapshot.friendlyId, }); - // TODO: We may already be executing this run, this may be a new attempt - // This is the only case where incrementing the attempt number is allowed this.enterRunPhase(run, snapshot); + const metrics = [ + { + name: "start", + event: "create_attempt", + timestamp: attemptStartedAt, + duration: attemptDuration, + }, + ] + .concat( + dequeuedAt + ? [ + { + name: "start", + event: "dequeue", + timestamp: dequeuedAt.getTime(), + duration: 0, + }, + ] + : [] + ) + .concat( + podScheduledAt + ? [ + { + name: "start", + event: "pod_scheduled", + timestamp: podScheduledAt.getTime(), + duration: 0, + }, + ] + : [] + ) satisfies TaskRunExecutionMetrics; + const taskRunEnv = { ...gatherProcessEnv(), ...envVars, }; try { - return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution }); + return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution, metrics }); } catch (error) { - // TODO: Handle the case where we're in the warm start phase or executing a new run - // This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch - console.error("Error while executing attempt", { error, }); @@ -810,7 +880,13 @@ class ManagedRunController { error: completionResult.error, }); - // TODO: Maybe we should keep retrying for a while longer + this.sendDebugLog({ + runId: run.friendlyId, + message: "completion: failed to submit after error", + properties: { + error: completionResult.error, + }, + }); this.waitForNextRun(); return; @@ -923,6 +999,7 @@ class ManagedRunController { this.startAndExecuteRunAttempt({ runFriendlyId: nextRun.run.friendlyId, snapshotFriendlyId: nextRun.snapshot.friendlyId, + dequeuedAt: nextRun.dequeuedAt, isWarmStart: true, }).finally(() => {}); return; @@ -1032,7 +1109,10 @@ class ManagedRunController { snapshot, envVars, execution, - }: WorkloadRunAttemptStartResponseBody) { + metrics, + }: WorkloadRunAttemptStartResponseBody & { + metrics?: TaskRunExecutionMetrics; + }) { this.snapshotPoller.start(); if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) { @@ -1058,6 +1138,7 @@ class ManagedRunController { payload: { execution, traceContext: execution.run.traceContext ?? {}, + metrics, }, messageId: run.friendlyId, env: envVars, @@ -1096,6 +1177,14 @@ class ManagedRunController { error: completionResult.error, }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "completion: failed to submit", + properties: { + error: completionResult.error, + }, + }); + this.waitForNextRun(); return; } @@ -1212,6 +1301,8 @@ class ManagedRunController { this.startAndExecuteRunAttempt({ runFriendlyId: env.TRIGGER_RUN_ID, snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID, + dequeuedAt: env.TRIGGER_DEQUEUED_AT_MS, + podScheduledAt: env.TRIGGER_POD_SCHEDULED_AT_MS, }).finally(() => {}); return; }