Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/node/runtime/SSHRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const shescape = {
},
};

/**
 * Emit a progress step telling the user that SSH is unavailable and a retry
 * is pending.
 *
 * The wait is rounded up to whole seconds and floored at one second, so the
 * message never reports "0s".
 *
 * @param initLogger Logger whose `logStep` receives the message.
 * @param waitMs Time until the next attempt, in milliseconds.
 */
function logSSHBackoffWait(initLogger: InitLogger, waitMs: number): void {
  const wholeSeconds = Math.ceil(waitMs / 1000);
  const displaySeconds = Math.max(1, wholeSeconds);
  initLogger.logStep("SSH unavailable; retrying in " + displaySeconds + "s...");
}

// Re-export SSHRuntimeConfig from connection pool (defined there to avoid circular deps)
export type { SSHRuntimeConfig } from "./sshConnectionPool";

Expand Down Expand Up @@ -122,9 +127,11 @@ export class SSHRuntime implements Runtime {
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
}

// Ensure connection is healthy before executing
// This provides backoff protection and singleflighting for concurrent requests
await sshConnectionPool.acquireConnection(this.config);
// Ensure connection is healthy before executing.
// This provides backoff protection and singleflighting for concurrent requests.
await sshConnectionPool.acquireConnection(this.config, {
abortSignal: options.abortSignal,
});

// Build command parts
const parts: string[] = [];
Expand Down Expand Up @@ -437,7 +444,7 @@ export class SSHRuntime implements Runtime {
*/
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
// Ensure connection is healthy before executing
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
await sshConnectionPool.acquireConnection(this.config, { timeoutMs });

const sshArgs = this.buildSSHArgs();
sshArgs.push(this.config.host, command);
Expand Down Expand Up @@ -623,7 +630,13 @@ export class SSHRuntime implements Runtime {
initLogger.logStderr(`Could not get origin URL: ${getErrorMessage(error)}`);
}

// Step 2: Create bundle locally and pipe to remote file via SSH
// Step 2: Ensure the SSH host is reachable before doing expensive local work
await sshConnectionPool.acquireConnection(this.config, {
abortSignal,
onWait: (waitMs) => logSSHBackoffWait(initLogger, waitMs),
});

// Step 3: Create bundle locally and pipe to remote file via SSH
initLogger.logStep(`Creating git bundle...`);
await new Promise<void>((resolve, reject) => {
// Check if aborted before spawning
Expand Down Expand Up @@ -670,7 +683,7 @@ export class SSHRuntime implements Runtime {
});
});

// Step 3: Clone from bundle on remote using this.exec
// Step 4: Clone from bundle on remote using this.exec
initLogger.logStep(`Cloning repository on remote...`);

// Expand tilde in destination path for git clone
Expand All @@ -693,7 +706,7 @@ export class SSHRuntime implements Runtime {
throw new Error(`Failed to clone repository: ${cloneStderr || cloneStdout}`);
}

// Step 4: Create local tracking branches for all remote branches
// Step 5: Create local tracking branches for all remote branches
// This ensures that branch names like "custom-trunk" can be used directly
// in git checkout commands, rather than needing "origin/custom-trunk"
initLogger.logStep(`Creating local tracking branches...`);
Expand All @@ -708,7 +721,7 @@ export class SSHRuntime implements Runtime {
await createTrackingBranchesStream.exitCode;
// Don't fail if this fails - some branches may already exist

// Step 5: Update origin remote if we have an origin URL
// Step 6: Update origin remote if we have an origin URL
if (originUrl) {
initLogger.logStep(`Setting origin remote to ${originUrl}...`);
const setOriginStream = await this.exec(
Expand Down Expand Up @@ -740,7 +753,7 @@ export class SSHRuntime implements Runtime {
await removeOriginStream.exitCode;
}

// Step 5: Remove bundle file
// Step 7: Remove bundle file
initLogger.logStep(`Cleaning up bundle file...`);
const rmStream = await this.exec(`rm ${bundleTempPath}`, {
cwd: "~",
Expand Down
48 changes: 42 additions & 6 deletions src/node/runtime/sshConnectionPool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ describe("SSHConnectionPool", () => {
};

// Trigger a failure via acquireConnection (will fail to connect)
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
await expect(
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 })
).rejects.toThrow();

// Verify we're now in backoff
const healthBefore = pool.getConnectionHealth(config);
Expand Down Expand Up @@ -231,6 +233,38 @@ describe("SSHConnectionPool", () => {
expect(elapsed).toBeLessThan(50);
});

// Verifies the default (waiting) mode of acquireConnection: when the host is in
// backoff, the call sleeps via the injected `sleep` seam rather than rejecting.
test("waits through backoff (bounded) instead of throwing", async () => {
const pool = new SSHConnectionPool();
const config: SSHRuntimeConfig = {
host: "test.example.com",
srcBaseDir: "/work",
};

// Put host into backoff without doing a real probe.
pool.reportFailure(config, "Connection refused");
expect(pool.getConnectionHealth(config)?.backoffUntil).toBeDefined();

// Record the durations passed to each callback so they can be compared below.
const sleepCalls: number[] = [];
const onWaitCalls: number[] = [];

// No maxWaitMs is passed, so the default wait budget applies. The injected
// `sleep` resolves immediately, so the test never waits in real time.
await pool.acquireConnection(config, {
onWait: (ms) => {
onWaitCalls.push(ms);
},
sleep: (ms) => {
sleepCalls.push(ms);
// Simulate time passing / recovery.
pool.markHealthy(config);
return Promise.resolve();
},
});

// Exactly one wait cycle is expected: one onWait notification and one sleep,
// both given the same positive duration.
expect(sleepCalls.length).toBe(1);
expect(onWaitCalls.length).toBe(1);
expect(sleepCalls[0]).toBeGreaterThan(0);
expect(onWaitCalls[0]).toBe(sleepCalls[0]);
// The health recorded by markHealthy inside the sleep stub is still in place.
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
});
test("throws immediately when in backoff", async () => {
const pool = new SSHConnectionPool();
const config: SSHRuntimeConfig = {
Expand All @@ -239,10 +273,12 @@ describe("SSHConnectionPool", () => {
};

// Trigger a failure to put connection in backoff
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
await expect(
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 })
).rejects.toThrow();

// Second call should throw immediately with backoff message
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
await expect(pool.acquireConnection(config, { maxWaitMs: 0 })).rejects.toThrow(/in backoff/);
});

test("getControlPath returns deterministic path", () => {
Expand Down Expand Up @@ -270,9 +306,9 @@ describe("SSHConnectionPool", () => {

// All concurrent calls should share the same probe and get same result
const results = await Promise.allSettled([
pool.acquireConnection(config, 1000),
pool.acquireConnection(config, 1000),
pool.acquireConnection(config, 1000),
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
pool.acquireConnection(config, { timeoutMs: 1000, maxWaitMs: 0 }),
]);

// All should be rejected (connection fails)
Expand Down
195 changes: 158 additions & 37 deletions src/node/runtime/sshConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,65 @@ const BACKOFF_SCHEDULE = [1, 5, 10, 20, 40, 60];
*/
const HEALTHY_TTL_MS = 15 * 1000; // 15 seconds

// Health-check probe timeout used when the caller does not supply `timeoutMs`.
const DEFAULT_PROBE_TIMEOUT_MS = 10_000;
// Wait budget used when the caller does not supply `maxWaitMs`.
const DEFAULT_MAX_WAIT_MS = 2 * 60 * 1000; // 2 minutes

/**
 * Options accepted by `acquireConnection`. Every field is optional.
 */
export interface AcquireConnectionOptions {
/** Timeout (ms) for the health check probe. Defaults to DEFAULT_PROBE_TIMEOUT_MS. */
timeoutMs?: number;

/**
 * Max time to wait (ms) for a host to become healthy (waits + probes).
 *
 * - Omit to use the default, DEFAULT_MAX_WAIT_MS (waits through backoff).
 * - Set to 0 to fail fast: a host in backoff, or a failed probe, rejects
 *   immediately instead of waiting.
 */
maxWaitMs?: number;

/**
 * Optional abort signal to cancel any waiting.
 *
 * It is checked at the start of each wait/probe cycle and is also passed to
 * the sleep function.
 */
abortSignal?: AbortSignal;

/**
 * Called when acquireConnection is waiting due to backoff, immediately before
 * each sleep, with the number of milliseconds about to be slept.
 *
 * Useful for user-facing progress logs (e.g. workspace init).
 */
onWait?: (waitMs: number) => void;

/**
 * Test seam.
 *
 * If provided, this is used for sleeping between wait cycles in place of the
 * built-in abortable sleep. It receives the wait duration and the caller's
 * abort signal.
 */
sleep?: (ms: number, abortSignal?: AbortSignal) => Promise<void>;
}

/**
 * Pause for `ms` milliseconds, ending early with a rejection if the optional
 * abort signal fires.
 *
 * @param ms Duration to sleep. Zero or negative values return immediately,
 *   without consulting the abort signal.
 * @param abortSignal Optional signal that cancels the sleep.
 * @throws Error("Operation aborted") if the signal is already aborted on entry
 *   (for a positive `ms`) or aborts while the sleep is pending.
 */
async function sleepWithAbort(ms: number, abortSignal?: AbortSignal): Promise<void> {
  // Nothing to wait for.
  if (ms <= 0) {
    return;
  }

  if (abortSignal?.aborted) {
    throw new Error("Operation aborted");
  }

  await new Promise<void>((resolve, reject) => {
    // Release both the timer and the abort listener. Whichever path settles
    // the promise calls this first, so nothing is left registered.
    function release(): void {
      clearTimeout(timeoutHandle);
      abortSignal?.removeEventListener("abort", handleAbort);
    }

    function handleAbort(): void {
      release();
      reject(new Error("Operation aborted"));
    }

    const timeoutHandle = setTimeout(() => {
      release();
      resolve();
    }, ms);

    abortSignal?.addEventListener("abort", handleAbort);
  });
}

/**
* SSH Connection Pool
*
Expand All @@ -80,51 +139,113 @@ export class SSHConnectionPool {
/**
* Ensure connection is healthy before proceeding.
*
* @param config SSH configuration
* @param timeoutMs Timeout for health check probe (default: 10s)
* @throws Error if connection is in backoff or health check fails
* By default, acquireConnection waits through backoff (bounded) so user-facing
* actions don’t immediately fail during transient SSH outages.
*
* Callers can opt into fail-fast behavior by passing `{ maxWaitMs: 0 }`.
*/
async acquireConnection(config: SSHRuntimeConfig, timeoutMs = 10000): Promise<void> {
async acquireConnection(config: SSHRuntimeConfig, timeoutMs?: number): Promise<void>;
async acquireConnection(
config: SSHRuntimeConfig,
options?: AcquireConnectionOptions
): Promise<void>;
async acquireConnection(
config: SSHRuntimeConfig,
timeoutMsOrOptions: number | AcquireConnectionOptions = DEFAULT_PROBE_TIMEOUT_MS
): Promise<void> {
const options: AcquireConnectionOptions =
typeof timeoutMsOrOptions === "number"
? { timeoutMs: timeoutMsOrOptions }
: (timeoutMsOrOptions ?? {});

const timeoutMs = options.timeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS;
const sleep = options.sleep ?? sleepWithAbort;

const maxWaitMs = options.maxWaitMs ?? DEFAULT_MAX_WAIT_MS;
const shouldWait = maxWaitMs > 0;

const key = makeConnectionKey(config);
const health = this.health.get(key);
const startTime = Date.now();

// Check if in backoff
if (health?.backoffUntil && health.backoffUntil > new Date()) {
const remainingSecs = Math.ceil((health.backoffUntil.getTime() - Date.now()) / 1000);
throw new Error(
`SSH connection to ${config.host} is in backoff for ${remainingSecs}s. ` +
`Last error: ${health.lastError ?? "unknown"}`
);
}
while (true) {
if (options.abortSignal?.aborted) {
throw new Error("Operation aborted");
}

// Return immediately if known healthy and not stale
if (health?.status === "healthy") {
const age = Date.now() - (health.lastSuccess?.getTime() ?? 0);
if (age < HEALTHY_TTL_MS) {
log.debug(`SSH connection to ${config.host} is known healthy, skipping probe`);
return;
const health = this.health.get(key);

// If in backoff: either fail fast or wait (bounded).
if (health?.backoffUntil && health.backoffUntil > new Date()) {
const remainingMs = health.backoffUntil.getTime() - Date.now();
const remainingSecs = Math.ceil(remainingMs / 1000);

if (!shouldWait) {
throw new Error(
`SSH connection to ${config.host} is in backoff for ${remainingSecs}s. ` +
`Last error: ${health.lastError ?? "unknown"}`
);
}

const elapsedMs = Date.now() - startTime;
const budgetMs = Math.max(0, maxWaitMs - elapsedMs);
if (budgetMs <= 0) {
throw new Error(
`SSH connection to ${config.host} did not become healthy within ${maxWaitMs}ms. ` +
`Last error: ${health.lastError ?? "unknown"}`
);
}

const waitMs = Math.min(remainingMs, budgetMs);
options.onWait?.(waitMs);
await sleep(waitMs, options.abortSignal);
continue;
}
log.debug(
`SSH connection to ${config.host} health is stale (${Math.round(age / 1000)}s), re-probing`
);
}

// Check for inflight probe - singleflighting
const existing = this.inflight.get(key);
if (existing) {
log.debug(`SSH connection to ${config.host} has inflight probe, waiting...`);
return existing;
}
// Return immediately if known healthy and not stale.
if (health?.status === "healthy") {
const age = Date.now() - (health.lastSuccess?.getTime() ?? 0);
if (age < HEALTHY_TTL_MS) {
log.debug(`SSH connection to ${config.host} is known healthy, skipping probe`);
return;
}
log.debug(
`SSH connection to ${config.host} health is stale (${Math.round(age / 1000)}s), re-probing`
);
}

// Start new probe
log.debug(`SSH connection to ${config.host} needs probe, starting health check`);
const probe = this.probeConnection(config, timeoutMs, key);
this.inflight.set(key, probe);
// Check for inflight probe - singleflighting.
const existing = this.inflight.get(key);
if (existing) {
log.debug(`SSH connection to ${config.host} has inflight probe, waiting...`);
try {
await existing;
return;
} catch (error) {
// Probe failed; if we're in wait mode we'll loop and sleep through the backoff.
if (!shouldWait) {
throw error;
}
continue;
}
}

// Start new probe.
log.debug(`SSH connection to ${config.host} needs probe, starting health check`);
const probe = this.probeConnection(config, timeoutMs, key);
this.inflight.set(key, probe);

try {
await probe;
} finally {
this.inflight.delete(key);
try {
await probe;
return;
} catch (error) {
if (!shouldWait) {
throw error;
}
// In wait mode: probeConnection() recorded backoff; loop and wait.
continue;
} finally {
this.inflight.delete(key);
}
}
}

Expand Down