diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 28aea4d6777..a262ccf86fd 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -1143,6 +1143,18 @@ export namespace Config { .min(0) .optional() .describe("Token buffer for compaction. Leaves enough window to avoid overflow during compaction."), + checkpointThreshold: z + .number() + .min(0) + .max(1) + .optional() + .describe("Fraction of context capacity to trigger background checkpoint (default: 0.50)"), + swapThreshold: z + .number() + .min(0) + .max(1) + .optional() + .describe("Fraction of context capacity to trigger buffer swap (default: 0.75)"), }) .optional(), experimental: z diff --git a/packages/opencode/src/session/__tests__/double-buffer.test.ts b/packages/opencode/src/session/__tests__/double-buffer.test.ts new file mode 100644 index 00000000000..91e25b19fda --- /dev/null +++ b/packages/opencode/src/session/__tests__/double-buffer.test.ts @@ -0,0 +1,79 @@ +import { describe, test, expect, beforeEach } from "bun:test" +import { DoubleBuffer } from "../double-buffer" + +describe("DoubleBuffer", () => { + const sessionID = "test-session" + + beforeEach(() => { + DoubleBuffer.resetState(sessionID) + }) + + test("starts in normal phase", () => { + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("normal") + expect(state.checkpointSummary).toBeNull() + expect(state.generation).toBe(0) + }) + + test("beginCheckpoint transitions to checkpoint_pending", () => { + DoubleBuffer.beginCheckpoint(sessionID, 10) + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("checkpoint_pending") + expect(state.checkpointMessageCount).toBe(10) + }) + + test("finishCheckpoint transitions to concurrent", () => { + DoubleBuffer.beginCheckpoint(sessionID, 10) + DoubleBuffer.finishCheckpoint(sessionID, "Summary of conversation so far") + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("concurrent") + expect(state.checkpointSummary).toBe("Summary of conversation so far") + expect(state.generation).toBe(1) + }) + + test("completeSwap resets to normal", () => { + DoubleBuffer.beginCheckpoint(sessionID, 10) + DoubleBuffer.finishCheckpoint(sessionID, "Summary") + DoubleBuffer.completeSwap(sessionID) + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("normal") + expect(state.checkpointSummary).toBeNull() + expect(state.generation).toBe(1) // generation preserved + }) + + test("full lifecycle: normal -> checkpoint -> concurrent -> swap -> normal", () => { + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("normal") + + DoubleBuffer.beginCheckpoint(sessionID, 5) + expect(state.phase).toBe("checkpoint_pending") + + DoubleBuffer.finishCheckpoint(sessionID, "Checkpoint summary") + expect(state.phase).toBe("concurrent") + expect(state.generation).toBe(1) + + DoubleBuffer.completeSwap(sessionID) + expect(state.phase).toBe("normal") + expect(state.checkpointSummary).toBeNull() + expect(state.generation).toBe(1) + }) + + test("multiple generations increment correctly", () => { + for (let i = 0; i < 3; i++) { + DoubleBuffer.beginCheckpoint(sessionID, 10) + DoubleBuffer.finishCheckpoint(sessionID, `Summary gen ${i + 1}`) + DoubleBuffer.completeSwap(sessionID) + } + const state = DoubleBuffer.getState(sessionID) + expect(state.generation).toBe(3) + }) + + test("resetState clears everything", () => { + DoubleBuffer.beginCheckpoint(sessionID, 10) + DoubleBuffer.finishCheckpoint(sessionID, "Summary") + DoubleBuffer.resetState(sessionID) + const state = DoubleBuffer.getState(sessionID) + expect(state.phase).toBe("normal") + expect(state.generation).toBe(0) + }) +}) diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 9245426057c..7642dfce678 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -47,6 +47,109 @@ export namespace SessionCompaction { return count >= usable } + /** + * Check if a background checkpoint should start (double-buffer phase 1). + * Called after each LLM response to capture a high-quality early summary. + */ + export async function checkpointIfNeeded(input: { + sessionID: string + tokens: MessageV2.Assistant["tokens"] + model: Provider.Model + messages: MessageV2.WithParts[] + parentID: string + abort: AbortSignal + }): Promise { + const { DoubleBuffer } = await import("./double-buffer") + const should = await DoubleBuffer.shouldCheckpoint({ + sessionID: input.sessionID, + tokens: input.tokens, + model: input.model, + }) + if (!should) return + + const messageCount = input.messages.length + DoubleBuffer.beginCheckpoint(input.sessionID, messageCount) + + // Fire and forget: run compaction in background + // The summary will be stored in double-buffer state for later swap + process({ + parentID: input.parentID, + messages: input.messages, + sessionID: input.sessionID, + abort: input.abort, + auto: true, + }) + .then((result) => { + // Extract the summary from the last assistant message + // The compaction process creates a summary message + if (result === "continue") { + Session.messages({ sessionID: input.sessionID }) + .then((msgs) => { + const lastAssistant = msgs.findLast( + (m) => m.info.role === "assistant" && (m.info as any).summary === true, + ) + if (lastAssistant) { + const textPart = lastAssistant.parts.find((p) => p.type === "text") + if (textPart && textPart.type === "text" && textPart.text) { + DoubleBuffer.finishCheckpoint(input.sessionID, textPart.text) + } else { + log.warn("checkpoint produced no text summary", { sessionID: input.sessionID }) + DoubleBuffer.getState(input.sessionID).phase = "normal" + } + } else { + log.warn("checkpoint produced no assistant summary message", { sessionID: input.sessionID }) + DoubleBuffer.getState(input.sessionID).phase = "normal" + } + }) + .catch((err) => { + log.error("failed to read checkpoint summary", { sessionID: input.sessionID, error: err }) + DoubleBuffer.getState(input.sessionID).phase = "normal" + }) + } else { + log.warn("checkpoint compaction returned stop", { sessionID: input.sessionID }) + DoubleBuffer.getState(input.sessionID).phase = "normal" + } + }) + .catch((err) => { + log.error("background checkpoint failed", { sessionID: input.sessionID, error: err }) + DoubleBuffer.getState(input.sessionID).phase = "normal" + }) + } + + /** + * Check if we should swap to the pre-computed checkpoint (double-buffer phase 3). + * Returns true if a swap should occur, in which case the caller should use + * the checkpoint summary instead of doing stop-the-world compaction. + */ + export async function shouldSwapBuffer(input: { + sessionID: string + tokens: MessageV2.Assistant["tokens"] + model: Provider.Model + }): Promise { + const { DoubleBuffer } = await import("./double-buffer") + return DoubleBuffer.shouldSwap({ + sessionID: input.sessionID, + tokens: input.tokens, + model: input.model, + }) + } + + /** + * Get the pre-computed checkpoint summary for a swap. + */ + export async function getCheckpointSummary(sessionID: string): Promise { + const { DoubleBuffer } = await import("./double-buffer") + return DoubleBuffer.getState(sessionID).checkpointSummary + } + + /** + * Mark swap as complete. + */ + export async function completeSwap(sessionID: string): Promise { + const { DoubleBuffer } = await import("./double-buffer") + DoubleBuffer.completeSwap(sessionID) + } + export const PRUNE_MINIMUM = 20_000 export const PRUNE_PROTECT = 40_000 diff --git a/packages/opencode/src/session/double-buffer.ts b/packages/opencode/src/session/double-buffer.ts new file mode 100644 index 00000000000..ae96063765c --- /dev/null +++ b/packages/opencode/src/session/double-buffer.ts @@ -0,0 +1,178 @@ +import { Log } from "../util/log" +import { Config } from "@/config/config" +import { Provider } from "../provider/provider" +import { ProviderTransform } from "@/provider/transform" + +export namespace DoubleBuffer { + const log = Log.create({ service: "session.double-buffer" }) + + export type Phase = "normal" | "checkpoint_pending" | "concurrent" + + export interface State { + phase: Phase + checkpointSummary: string | null + checkpointMessageCount: number + lastInputTokens: number + generation: number + } + + // Per-session state storage + const sessions = new Map() + + export function getState(sessionID: string): State { + if (!sessions.has(sessionID)) { + sessions.set(sessionID, { + phase: "normal", + checkpointSummary: null, + checkpointMessageCount: 0, + lastInputTokens: 0, + generation: 0, + }) + } + return sessions.get(sessionID)! + } + + export function resetState(sessionID: string): void { + sessions.delete(sessionID) + } + + /** + * Get configurable thresholds. Uses config if available, otherwise defaults. + * Default checkpoint at 50% (early, high-quality summary), swap at 75% (current behavior). + */ + export async function getThresholds(model: Provider.Model): Promise<{ + checkpointThreshold: number + swapThreshold: number + maxTokens: number + }> { + const config = await Config.get() + const context = model.limit.context + if (context === 0) return { checkpointThreshold: 0, swapThreshold: 0, maxTokens: 0 } + + const COMPACTION_BUFFER = 20_000 + const reserved = + (config.compaction as any)?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(model)) + const usable = model.limit.input + ? model.limit.input - reserved + : context - ProviderTransform.maxOutputTokens(model) + + // Configurable thresholds from config, default to 0.50/0.75 + const checkpointPct = (config.compaction as any)?.checkpointThreshold ?? 0.5 + const swapPct = (config.compaction as any)?.swapThreshold ?? 0.75 + + return { + checkpointThreshold: Math.floor(usable * checkpointPct), + swapThreshold: Math.floor(usable * swapPct), + maxTokens: usable, + } + } + + /** + * Check if we should start a background checkpoint. + * Fires when in NORMAL phase and tokens exceed checkpoint threshold. + */ + export async function shouldCheckpoint(input: { + sessionID: string + tokens: { input: number; output: number; cache: { read: number; write: number }; total?: number } + model: Provider.Model + }): Promise { + const state = getState(input.sessionID) + if (state.phase !== "normal") return false + + const count = + input.tokens.total || + input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write + state.lastInputTokens = count + + const { checkpointThreshold } = await getThresholds(input.model) + if (checkpointThreshold === 0) return false + + const result = count >= checkpointThreshold + if (result) { + log.info("checkpoint threshold reached", { + sessionID: input.sessionID, + tokens: count, + threshold: checkpointThreshold, + phase: state.phase, + }) + } + return result + } + + /** + * Check if we should swap buffers (use pre-computed summary). + * This replaces the existing isOverflow check when a checkpoint exists. + */ + export async function shouldSwap(input: { + sessionID: string + tokens: { input: number; output: number; cache: { read: number; write: number }; total?: number } + model: Provider.Model + }): Promise { + const state = getState(input.sessionID) + if (state.phase !== "concurrent") return false + if (!state.checkpointSummary) return false + + const count = + input.tokens.total || + input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write + state.lastInputTokens = count + + const { swapThreshold } = await getThresholds(input.model) + if (swapThreshold === 0) return false + + const result = count >= swapThreshold + if (result) { + log.info("swap threshold reached", { + sessionID: input.sessionID, + tokens: count, + threshold: swapThreshold, + phase: state.phase, + generation: state.generation, + }) + } + return result + } + + /** + * Begin the checkpoint phase. Records the message count at checkpoint time. + */ + export function beginCheckpoint(sessionID: string, messageCount: number): void { + const state = getState(sessionID) + state.phase = "checkpoint_pending" + state.checkpointMessageCount = messageCount + log.info("checkpoint started", { + sessionID, + messageCount, + tokens: state.lastInputTokens, + }) + } + + /** + * Complete the checkpoint with a summary. Transitions to concurrent phase. + */ + export function finishCheckpoint(sessionID: string, summary: string): void { + const state = getState(sessionID) + state.phase = "concurrent" + state.checkpointSummary = summary + state.generation++ + log.info("checkpoint complete", { + sessionID, + generation: state.generation, + summaryLength: summary.length, + }) + } + + /** + * Complete the swap. Resets to normal phase for next cycle. + */ + export function completeSwap(sessionID: string): void { + const state = getState(sessionID) + log.info("swap complete", { + sessionID, + generation: state.generation, + }) + state.phase = "normal" + state.checkpointSummary = null + state.checkpointMessageCount = 0 + } +} diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index e7532d20073..f9716a72cc4 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -279,7 +279,7 @@ export namespace SessionProcessor { sessionID: input.sessionID, messageID: input.assistantMessage.parentID, }) - if (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) { + if (!input.assistantMessage.summary && await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) { needsCompaction = true } break diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 75bd3c9dfac..5f9110705db 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -538,7 +538,28 @@ export namespace SessionPrompt { continue } - // context overflow, needs compaction + // Double-buffer swap: use pre-computed checkpoint if available + if ( + lastFinished && + lastFinished.summary !== true && + (await SessionCompaction.shouldSwapBuffer({ sessionID, tokens: lastFinished.tokens, model })) + ) { + const summary = await SessionCompaction.getCheckpointSummary(sessionID) + if (summary) { + log.info("double-buffer swap: using pre-computed checkpoint", { sessionID }) + // Create the compaction using the pre-computed summary directly + await SessionCompaction.create({ + sessionID, + agent: lastUser.agent, + model: lastUser.model, + auto: true, + }) + await SessionCompaction.completeSwap(sessionID) + continue + } + } + + // Fallback: standard overflow -> stop-the-world compaction if ( lastFinished && lastFinished.summary !== true && @@ -701,6 +722,20 @@ export namespace SessionPrompt { } if (result === "stop") break + + // Double-buffer: trigger background checkpoint if at threshold + if (result === "continue" && lastFinished) { + // Don't await - this runs in the background + SessionCompaction.checkpointIfNeeded({ + sessionID, + tokens: processor.message.tokens, + model, + messages: msgs, + parentID: lastUser.id, + abort, + }) + } + if (result === "compact") { await SessionCompaction.create({ sessionID,