Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 212 additions & 1 deletion apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,9 @@ describe("ProviderCommandReactor", () => {

const readModel = await Effect.runPromise(harness.engine.getReadModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(thread?.session).toBeNull();
expect(thread?.session).not.toBeNull();
expect(thread?.session?.status).toBe("stopped");
expect(thread?.session?.lastError).toContain("Provider turn start failed");
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
Expand Down Expand Up @@ -1129,13 +1131,36 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.threadId).toBe("thread-1");
expect(thread?.session?.providerName).toBe("codex");
expect(thread?.session?.runtimeMode).toBe("approval-required");
expect(thread?.session?.status).toBe("error");
expect(thread?.session?.lastError).toContain("Provider turn start failed");
expect(
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
).toMatchObject({
payload: {
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
},
});

// Verify recovery: a subsequent turn with the correct provider succeeds.
await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.make("cmd-turn-start-provider-switch-recovery"),
threadId: ThreadId.make("thread-1"),
message: {
messageId: asMessageId("user-message-provider-switch-recovery"),
role: "user",
text: "recover",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);

await waitFor(() => harness.sendTurn.mock.calls.length === 2);
expect(harness.sendTurn.mock.calls.length).toBe(2);
});

it("does not stop the active session when restart fails before rebind", async () => {
Expand Down Expand Up @@ -1570,4 +1595,190 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.threadId).toBe("thread-1");
expect(thread?.session?.activeTurnId).toBeNull();
});

it("sets session to error when sendTurn fails with an existing session", async () => {
  const harness = await createHarness();
  const timestamp = new Date().toISOString();
  const threadId = ThreadId.make("thread-1");

  // Dispatches a user turn on thread-1; only the ids and the text differ
  // between the two turns in this scenario.
  const startTurn = (ids: { command: string; message: string }, text: string) =>
    Effect.runPromise(
      harness.engine.dispatch({
        type: "thread.turn.start",
        commandId: CommandId.make(ids.command),
        threadId,
        message: {
          messageId: asMessageId(ids.message),
          role: "user",
          text,
          attachments: [],
        },
        interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
        runtimeMode: "approval-required",
        createdAt: timestamp,
      }),
    );

  // Reads the current projection of thread-1 from the engine read model.
  const loadThread = async () => {
    const snapshot = await Effect.runPromise(harness.engine.getReadModel());
    return snapshot.threads.find((entry) => entry.id === threadId);
  };

  // First turn goes through, so a session exists before the failure.
  await startTurn(
    { command: "cmd-turn-start-err-session-1", message: "user-message-err-session-1" },
    "first",
  );
  await waitFor(() => harness.sendTurn.mock.calls.length === 1);

  // The next sendTurn call (and only that one) fails.
  harness.sendTurn.mockImplementationOnce(
    () =>
      Effect.fail(
        new ProviderAdapterRequestError({
          provider: "codex",
          method: "sendTurn",
          detail: "simulated failure",
        }),
      ) as never,
  );

  await startTurn(
    { command: "cmd-turn-start-err-session-2", message: "user-message-err-session-2" },
    "second",
  );

  await waitFor(async () => {
    const polled = await loadThread();
    return polled?.session?.status === "error";
  });

  const thread = await loadThread();
  const session = thread?.session;
  expect(session?.status).toBe("error");
  expect(session?.lastError).toContain("Provider turn start failed");
  expect(session?.activeTurnId).toBeNull();

  const failureActivity = thread?.activities.find(
    (activity) => activity.kind === "provider.turn.start.failed",
  );
  expect(failureActivity).toBeDefined();
});

it("sets session to stopped with lastError when turn start fails without a prior session", async () => {
  const harness = await createHarness({
    threadModelSelection: { provider: "codex", model: "gpt-5-codex" },
  });
  const timestamp = new Date().toISOString();
  const threadId = ThreadId.make("thread-1");

  // Reads the current projection of thread-1 from the engine read model.
  const loadThread = async () => {
    const snapshot = await Effect.runPromise(harness.engine.getReadModel());
    return snapshot.threads.find((entry) => entry.id === threadId);
  };

  // The turn asks for a provider that conflicts with the thread's own, so it
  // fails before any session has been created.
  const mismatchedTurn = harness.engine.dispatch({
    type: "thread.turn.start",
    commandId: CommandId.make("cmd-turn-start-no-session"),
    threadId,
    message: {
      messageId: asMessageId("user-message-no-session"),
      role: "user",
      text: "hello",
      attachments: [],
    },
    modelSelection: {
      provider: "claudeAgent",
      model: "claude-opus-4-6",
    },
    interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
    runtimeMode: "approval-required",
    createdAt: timestamp,
  });
  await Effect.runPromise(mismatchedTurn);

  // Expect "stopped" rather than "error": no real provider binding exists.
  await waitFor(async () => {
    const polled = await loadThread();
    return polled?.session?.status === "stopped";
  });

  const thread = await loadThread();
  const session = thread?.session;
  expect(session).not.toBeNull();
  expect(session?.status).toBe("stopped");
  expect(session?.lastError).toContain("Provider turn start failed");

  const failureActivity = thread?.activities.find(
    (activity) => activity.kind === "provider.turn.start.failed",
  );
  expect(failureActivity).toBeDefined();
});

it("uses stopped status when turn fails after a previously stopped session", async () => {
  const harness = await createHarness();
  const now = new Date().toISOString();

  // Reads the current projection of thread-1 from the engine read model.
  const loadThread = async () => {
    const readModel = await Effect.runPromise(harness.engine.getReadModel());
    return readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
  };

  // Establish and then stop a session.
  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.turn.start",
      commandId: CommandId.make("cmd-turn-start-stopped-1"),
      threadId: ThreadId.make("thread-1"),
      message: {
        messageId: asMessageId("user-message-stopped-1"),
        role: "user",
        text: "first",
        attachments: [],
      },
      interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
      runtimeMode: "approval-required",
      createdAt: now,
    }),
  );
  await waitFor(() => harness.sendTurn.mock.calls.length === 1);

  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.session.stop",
      commandId: CommandId.make("cmd-session-stop-for-rebind"),
      threadId: ThreadId.make("thread-1"),
      createdAt: now,
    }),
  );
  await waitFor(() => harness.stopSession.mock.calls.length === 1);

  // Make the next startSession fail so ensureSessionForThread fails
  // before a new binding is created.
  harness.startSession.mockImplementationOnce(
    (_: unknown, __: unknown) => Effect.fail(new Error("simulated start failure")) as never,
  );

  await Effect.runPromise(
    harness.engine.dispatch({
      type: "thread.turn.start",
      commandId: CommandId.make("cmd-turn-start-stopped-2"),
      threadId: ThreadId.make("thread-1"),
      message: {
        messageId: asMessageId("user-message-stopped-2"),
        role: "user",
        text: "second",
        attachments: [],
      },
      interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
      runtimeMode: "approval-required",
      createdAt: now,
    }),
  );

  // Wait for BOTH the failure activity and the session's lastError. The
  // reactor appends the activity first and writes the session afterwards, so
  // polling on the activity alone can return before lastError is set. The
  // status is already "stopped" at this point, so only lastError shows that
  // the failure handler has finished.
  await waitFor(async () => {
    const polled = await loadThread();
    const hasFailureActivity =
      polled?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
      false;
    const hasLastError =
      polled?.session?.lastError?.includes("Provider turn start failed") ?? false;
    return hasFailureActivity && hasLastError;
  });

  const thread = await loadThread();
  // Must be "stopped" — not "error" — because the previous session was
  // already stopped and no new binding was created before the failure.
  expect(thread?.session?.status).toBe("stopped");
  expect(thread?.session?.lastError).toContain("Provider turn start failed");
});
});
50 changes: 43 additions & 7 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,49 @@ const make = Effect.gen(function* () {
createdAt: event.payload.createdAt,
}).pipe(
Effect.catchCause((cause) =>
appendProviderFailureActivity({
threadId: event.payload.threadId,
kind: "provider.turn.start.failed",
summary: "Provider turn start failed",
detail: Cause.pretty(cause),
turnId: null,
createdAt: event.payload.createdAt,
Effect.gen(function* () {
const errorDetail = Cause.pretty(cause);
yield* appendProviderFailureActivity({
threadId: event.payload.threadId,
kind: "provider.turn.start.failed",
summary: "Provider turn start failed",
detail: errorDetail,
turnId: null,
createdAt: event.payload.createdAt,
});
// Transition the thread session so the client can detect the
// failure. Without this update isSendBusy stays true and the
// "Working for …" indicator hangs indefinitely.
//
// When a real provider binding exists, mark the session as
// "error" so the binding is preserved for potential recovery.
// When no session was bound yet (e.g. provider mismatch before
// session start), use "stopped" to avoid creating a synthetic
// session that looks bound without a backing ProviderService
// directory entry — ensureSessionForThread checks
// `status !== "stopped"` to decide whether to reuse a session.
const currentThread = yield* resolveThread(event.payload.threadId);
// A session record with status "stopped" has no backing
// ProviderService binding — treat it the same as no session.
// This mirrors the guard in ensureSessionForThread (line ~293).
const hasActiveSession =
currentThread?.session != null && currentThread.session.status !== "stopped";
const baseSession = currentThread?.session ?? {
threadId: event.payload.threadId,
providerName: thread.modelSelection.provider,
runtimeMode: thread.runtimeMode,
};
yield* setThreadSession({
threadId: event.payload.threadId,
session: {
...baseSession,
status: hasActiveSession ? "error" : "stopped",
activeTurnId: null,
lastError: `Provider turn start failed: ${errorDetail}`,
updatedAt: event.payload.createdAt,
},
createdAt: event.payload.createdAt,
});
}),
),
);
Expand Down
Loading