Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,18 @@ export namespace Config {
.min(0)
.optional()
.describe("Token buffer for compaction. Leaves enough window to avoid overflow during compaction."),
checkpointThreshold: z
.number()
.min(0)
.max(1)
.optional()
.describe("Fraction of context capacity to trigger background checkpoint (default: 0.50)"),
swapThreshold: z
.number()
.min(0)
.max(1)
.optional()
.describe("Fraction of context capacity to trigger buffer swap (default: 0.75)"),
})
.optional(),
experimental: z
Expand Down
79 changes: 79 additions & 0 deletions packages/opencode/src/session/__tests__/double-buffer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, test, expect, beforeEach } from "bun:test"
import { DoubleBuffer } from "../double-buffer"

describe("DoubleBuffer", () => {
const sessionID = "test-session"

beforeEach(() => {
DoubleBuffer.resetState(sessionID)
})

test("starts in normal phase", () => {
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("normal")
expect(state.checkpointSummary).toBeNull()
expect(state.generation).toBe(0)
})

test("beginCheckpoint transitions to checkpoint_pending", () => {
DoubleBuffer.beginCheckpoint(sessionID, 10)
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("checkpoint_pending")
expect(state.checkpointMessageCount).toBe(10)
})

test("finishCheckpoint transitions to concurrent", () => {
DoubleBuffer.beginCheckpoint(sessionID, 10)
DoubleBuffer.finishCheckpoint(sessionID, "Summary of conversation so far")
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("concurrent")
expect(state.checkpointSummary).toBe("Summary of conversation so far")
expect(state.generation).toBe(1)
})

test("completeSwap resets to normal", () => {
DoubleBuffer.beginCheckpoint(sessionID, 10)
DoubleBuffer.finishCheckpoint(sessionID, "Summary")
DoubleBuffer.completeSwap(sessionID)
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("normal")
expect(state.checkpointSummary).toBeNull()
expect(state.generation).toBe(1) // generation preserved
})

test("full lifecycle: normal -> checkpoint -> concurrent -> swap -> normal", () => {
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("normal")

DoubleBuffer.beginCheckpoint(sessionID, 5)
expect(state.phase).toBe("checkpoint_pending")

DoubleBuffer.finishCheckpoint(sessionID, "Checkpoint summary")
expect(state.phase).toBe("concurrent")
expect(state.generation).toBe(1)

DoubleBuffer.completeSwap(sessionID)
expect(state.phase).toBe("normal")
expect(state.checkpointSummary).toBeNull()
expect(state.generation).toBe(1)
})

test("multiple generations increment correctly", () => {
for (let i = 0; i < 3; i++) {
DoubleBuffer.beginCheckpoint(sessionID, 10)
DoubleBuffer.finishCheckpoint(sessionID, `Summary gen ${i + 1}`)
DoubleBuffer.completeSwap(sessionID)
}
const state = DoubleBuffer.getState(sessionID)
expect(state.generation).toBe(3)
})

test("resetState clears everything", () => {
DoubleBuffer.beginCheckpoint(sessionID, 10)
DoubleBuffer.finishCheckpoint(sessionID, "Summary")
DoubleBuffer.resetState(sessionID)
const state = DoubleBuffer.getState(sessionID)
expect(state.phase).toBe("normal")
expect(state.generation).toBe(0)
})
})
103 changes: 103 additions & 0 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,109 @@ export namespace SessionCompaction {
return count >= usable
}

/**
* Check if a background checkpoint should start (double-buffer phase 1).
* Called after each LLM response to capture a high-quality early summary.
*/
export async function checkpointIfNeeded(input: {
sessionID: string
tokens: MessageV2.Assistant["tokens"]
model: Provider.Model
messages: MessageV2.WithParts[]
parentID: string
abort: AbortSignal
}): Promise<void> {
const { DoubleBuffer } = await import("./double-buffer")
const should = await DoubleBuffer.shouldCheckpoint({
sessionID: input.sessionID,
tokens: input.tokens,
model: input.model,
})
if (!should) return

const messageCount = input.messages.length
DoubleBuffer.beginCheckpoint(input.sessionID, messageCount)

// Fire and forget: run compaction in background
// The summary will be stored in double-buffer state for later swap
process({
parentID: input.parentID,
messages: input.messages,
sessionID: input.sessionID,
abort: input.abort,
auto: true,
})
.then((result) => {
// Extract the summary from the last assistant message
// The compaction process creates a summary message
if (result === "continue") {
Session.messages({ sessionID: input.sessionID })
.then((msgs) => {
const lastAssistant = msgs.findLast(
(m) => m.info.role === "assistant" && (m.info as any).summary === true,
)
if (lastAssistant) {
const textPart = lastAssistant.parts.find((p) => p.type === "text")
if (textPart && textPart.type === "text" && textPart.text) {
DoubleBuffer.finishCheckpoint(input.sessionID, textPart.text)
} else {
log.warn("checkpoint produced no text summary", { sessionID: input.sessionID })
DoubleBuffer.getState(input.sessionID).phase = "normal"
}
} else {
log.warn("checkpoint produced no assistant summary message", { sessionID: input.sessionID })
DoubleBuffer.getState(input.sessionID).phase = "normal"
}
})
.catch((err) => {
log.error("failed to read checkpoint summary", { sessionID: input.sessionID, error: err })
DoubleBuffer.getState(input.sessionID).phase = "normal"
})
} else {
log.warn("checkpoint compaction returned stop", { sessionID: input.sessionID })
DoubleBuffer.getState(input.sessionID).phase = "normal"
}
})
.catch((err) => {
log.error("background checkpoint failed", { sessionID: input.sessionID, error: err })
DoubleBuffer.getState(input.sessionID).phase = "normal"
})
}

/**
* Check if we should swap to the pre-computed checkpoint (double-buffer phase 3).
* Returns true if a swap should occur, in which case the caller should use
* the checkpoint summary instead of doing stop-the-world compaction.
*/
export async function shouldSwapBuffer(input: {
sessionID: string
tokens: MessageV2.Assistant["tokens"]
model: Provider.Model
}): Promise<boolean> {
const { DoubleBuffer } = await import("./double-buffer")
return DoubleBuffer.shouldSwap({
sessionID: input.sessionID,
tokens: input.tokens,
model: input.model,
})
}

/**
* Get the pre-computed checkpoint summary for a swap.
*/
export async function getCheckpointSummary(sessionID: string): Promise<string | null> {
const { DoubleBuffer } = await import("./double-buffer")
return DoubleBuffer.getState(sessionID).checkpointSummary
}

/**
* Mark swap as complete.
*/
export async function completeSwap(sessionID: string): Promise<void> {
const { DoubleBuffer } = await import("./double-buffer")
DoubleBuffer.completeSwap(sessionID)
}

export const PRUNE_MINIMUM = 20_000
export const PRUNE_PROTECT = 40_000

Expand Down
178 changes: 178 additions & 0 deletions packages/opencode/src/session/double-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { Log } from "../util/log"
import { Config } from "@/config/config"
import { Provider } from "../provider/provider"
import { ProviderTransform } from "@/provider/transform"

export namespace DoubleBuffer {
const log = Log.create({ service: "session.double-buffer" })

export type Phase = "normal" | "checkpoint_pending" | "concurrent"

export interface State {
phase: Phase
checkpointSummary: string | null
checkpointMessageCount: number
lastInputTokens: number
generation: number
}

// Per-session state storage
const sessions = new Map<string, State>()

export function getState(sessionID: string): State {
if (!sessions.has(sessionID)) {
sessions.set(sessionID, {
phase: "normal",
checkpointSummary: null,
checkpointMessageCount: 0,
lastInputTokens: 0,
generation: 0,
})
}
return sessions.get(sessionID)!
}

export function resetState(sessionID: string): void {
sessions.delete(sessionID)
}

/**
* Get configurable thresholds. Uses config if available, otherwise defaults.
* Default checkpoint at 50% (early, high-quality summary), swap at 75% (current behavior).
*/
export async function getThresholds(model: Provider.Model): Promise<{
checkpointThreshold: number
swapThreshold: number
maxTokens: number
}> {
const config = await Config.get()
const context = model.limit.context
if (context === 0) return { checkpointThreshold: 0, swapThreshold: 0, maxTokens: 0 }

const COMPACTION_BUFFER = 20_000
const reserved =
(config.compaction as any)?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(model))
const usable = model.limit.input
? model.limit.input - reserved
: context - ProviderTransform.maxOutputTokens(model)

// Configurable thresholds from config, default to 0.50/0.75
const checkpointPct = (config.compaction as any)?.checkpointThreshold ?? 0.5
const swapPct = (config.compaction as any)?.swapThreshold ?? 0.75

return {
checkpointThreshold: Math.floor(usable * checkpointPct),
swapThreshold: Math.floor(usable * swapPct),
maxTokens: usable,
}
}

/**
* Check if we should start a background checkpoint.
* Fires when in NORMAL phase and tokens exceed checkpoint threshold.
*/
export async function shouldCheckpoint(input: {
sessionID: string
tokens: { input: number; output: number; cache: { read: number; write: number }; total?: number }
model: Provider.Model
}): Promise<boolean> {
const state = getState(input.sessionID)
if (state.phase !== "normal") return false

const count =
input.tokens.total ||
input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
state.lastInputTokens = count

const { checkpointThreshold } = await getThresholds(input.model)
if (checkpointThreshold === 0) return false

const result = count >= checkpointThreshold
if (result) {
log.info("checkpoint threshold reached", {
sessionID: input.sessionID,
tokens: count,
threshold: checkpointThreshold,
phase: state.phase,
})
}
return result
}

/**
* Check if we should swap buffers (use pre-computed summary).
* This replaces the existing isOverflow check when a checkpoint exists.
*/
export async function shouldSwap(input: {
sessionID: string
tokens: { input: number; output: number; cache: { read: number; write: number }; total?: number }
model: Provider.Model
}): Promise<boolean> {
const state = getState(input.sessionID)
if (state.phase !== "concurrent") return false
if (!state.checkpointSummary) return false

const count =
input.tokens.total ||
input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
state.lastInputTokens = count

const { swapThreshold } = await getThresholds(input.model)
if (swapThreshold === 0) return false

const result = count >= swapThreshold
if (result) {
log.info("swap threshold reached", {
sessionID: input.sessionID,
tokens: count,
threshold: swapThreshold,
phase: state.phase,
generation: state.generation,
})
}
return result
}

/**
* Begin the checkpoint phase. Records the message count at checkpoint time.
*/
export function beginCheckpoint(sessionID: string, messageCount: number): void {
const state = getState(sessionID)
state.phase = "checkpoint_pending"
state.checkpointMessageCount = messageCount
log.info("checkpoint started", {
sessionID,
messageCount,
tokens: state.lastInputTokens,
})
}

/**
* Complete the checkpoint with a summary. Transitions to concurrent phase.
*/
export function finishCheckpoint(sessionID: string, summary: string): void {
const state = getState(sessionID)
state.phase = "concurrent"
state.checkpointSummary = summary
state.generation++
log.info("checkpoint complete", {
sessionID,
generation: state.generation,
summaryLength: summary.length,
})
}

/**
* Complete the swap. Resets to normal phase for next cycle.
*/
export function completeSwap(sessionID: string): void {
const state = getState(sessionID)
log.info("swap complete", {
sessionID,
generation: state.generation,
})
state.phase = "normal"
state.checkpointSummary = null
state.checkpointMessageCount = 0
}
}
2 changes: 1 addition & 1 deletion packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ export namespace SessionProcessor {
sessionID: input.sessionID,
messageID: input.assistantMessage.parentID,
})
if (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) {
if (!input.assistantMessage.summary && await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model })) {
needsCompaction = true
}
break
Expand Down
Loading
Loading