Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions packages/opencode/src/cli/cmd/tui/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,16 @@ async function getTerminalBackgroundColor(): Promise<"dark" | "light"> {
})
}

export function tui(input: { url: string; args: Args; directory?: string; onExit?: () => Promise<void> }) {
import type { EventSource } from "./context/sdk"

export function tui(input: {
url: string
args: Args
directory?: string
fetch?: typeof fetch
events?: EventSource
onExit?: () => Promise<void>
}) {
// promise to prevent immediate exit
return new Promise<void>(async (resolve) => {
const mode = await getTerminalBackgroundColor()
Expand All @@ -117,7 +126,12 @@ export function tui(input: { url: string; args: Args; directory?: string; onExit
<KVProvider>
<ToastProvider>
<RouteProvider>
<SDKProvider url={input.url} directory={input.directory}>
<SDKProvider
url={input.url}
directory={input.directory}
fetch={input.fetch}
events={input.events}
>
<SyncProvider>
<ThemeProvider mode={mode}>
<LocalProvider>
Expand Down
77 changes: 48 additions & 29 deletions packages/opencode/src/cli/cmd/tui/context/sdk.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,66 @@ import { createSimpleContext } from "./helper"
import { createGlobalEmitter } from "@solid-primitives/event-bus"
import { batch, onCleanup, onMount } from "solid-js"

export type EventSource = {
on: (handler: (event: Event) => void) => () => void
}

export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
name: "SDK",
init: (props: { url: string; directory?: string }) => {
init: (props: { url: string; directory?: string; fetch?: typeof fetch; events?: EventSource }) => {
const abort = new AbortController()
const sdk = createOpencodeClient({
baseUrl: props.url,
signal: abort.signal,
directory: props.directory,
fetch: props.fetch,
})

const emitter = createGlobalEmitter<{
[key in Event["type"]]: Extract<Event, { type: key }>
}>()

let queue: Event[] = []
let timer: Timer | undefined
let last = 0

const flush = () => {
if (queue.length === 0) return
const events = queue
queue = []
timer = undefined
last = Date.now()
// Batch all event emissions so all store updates result in a single render
batch(() => {
for (const event of events) {
emitter.emit(event.type, event)
}
})
}

const handleEvent = (event: Event) => {
queue.push(event)
const elapsed = Date.now() - last

if (timer) return
// If we just flushed recently (within 16ms), batch this with future events
// Otherwise, process immediately to avoid latency
if (elapsed < 16) {
timer = setTimeout(flush, 16)
return
}
flush()
}

onMount(async () => {
// If an event source is provided, use it instead of SSE
if (props.events) {
const unsub = props.events.on(handleEvent)
onCleanup(unsub)
return
}

// Fall back to SSE
while (true) {
if (abort.signal.aborted) break
const events = await sdk.event.subscribe(
Expand All @@ -26,36 +71,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
signal: abort.signal,
},
)
let queue: Event[] = []
let timer: Timer | undefined
let last = 0

const flush = () => {
if (queue.length === 0) return
const events = queue
queue = []
timer = undefined
last = Date.now()
// Batch all event emissions so all store updates result in a single render
batch(() => {
for (const event of events) {
emitter.emit(event.type, event)
}
})
}

for await (const event of events.stream) {
queue.push(event)
const elapsed = Date.now() - last

if (timer) continue
// If we just flushed recently (within 16ms), batch this with future events
// Otherwise, process immediately to avoid latency
if (elapsed < 16) {
timer = setTimeout(flush, 16)
continue
}
flush()
handleEvent(event)
}

// Flush any remaining events
Expand All @@ -68,6 +86,7 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({

onCleanup(() => {
abort.abort()
if (timer) clearTimeout(timer)
})

return { client: sdk, event: emitter, url: props.url }
Expand Down
63 changes: 60 additions & 3 deletions packages/opencode/src/cli/cmd/tui/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,39 @@ import { UI } from "@/cli/ui"
import { iife } from "@/util/iife"
import { Log } from "@/util/log"
import { withNetworkOptions, resolveNetworkOptions } from "@/cli/network"
import type { Event } from "@opencode-ai/sdk/v2"
import type { EventSource } from "./context/sdk"

declare global {
const OPENCODE_WORKER_PATH: string
}

type RpcClient = ReturnType<typeof Rpc.client<typeof rpc>>

function createWorkerFetch(client: RpcClient): typeof fetch {
const fn = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
const request = new Request(input, init)
const body = request.body ? await request.text() : undefined
const result = await client.call("fetch", {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers.entries()),
body,
})
return new Response(result.body, {
status: result.status,
headers: result.headers,
})
}
return fn as typeof fetch
}

function createEventSource(client: RpcClient): EventSource {
return {
on: (handler) => client.on<Event>("event", handler),
}
}

export const TuiThreadCommand = cmd({
command: "$0 [project]",
describe: "start opencode tui",
Expand Down Expand Up @@ -80,16 +108,45 @@ export const TuiThreadCommand = cmd({
process.on("SIGUSR2", async () => {
await client.call("reload", undefined)
})
const opts = await resolveNetworkOptions(args)
const server = await client.call("server", opts)

const prompt = await iife(async () => {
const piped = !process.stdin.isTTY ? await Bun.stdin.text() : undefined
if (!args.prompt) return piped
return piped ? piped + "\n" + args.prompt : args.prompt
})

// Check if server should be started (port or hostname explicitly set in CLI or config)
const networkOpts = await resolveNetworkOptions(args)
const shouldStartServer =
process.argv.includes("--port") ||
process.argv.includes("--hostname") ||
process.argv.includes("--mdns") ||
networkOpts.mdns ||
networkOpts.port !== 0 ||
networkOpts.hostname !== "127.0.0.1"

// Subscribe to events from worker
await client.call("subscribe", { directory: cwd })

let url: string
let customFetch: typeof fetch | undefined
let events: EventSource | undefined

if (shouldStartServer) {
// Start HTTP server for external access
const server = await client.call("server", networkOpts)
url = server.url
} else {
// Use direct RPC communication (no HTTP)
url = "http://opencode.internal"
customFetch = createWorkerFetch(client)
events = createEventSource(client)
}

const tuiPromise = tui({
url: server.url,
url,
fetch: customFetch,
events,
args: {
continue: args.continue,
sessionID: args.session,
Expand Down
57 changes: 42 additions & 15 deletions packages/opencode/src/cli/cmd/tui/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
import { Rpc } from "@/util/rpc"
import { upgrade } from "@/cli/upgrade"
import type { BunWebSocketData } from "hono/bun"
import { Config } from "@/config/config"
import { Bus } from "@/bus"
import { GlobalBus } from "@/bus/global"
import type { BunWebSocketData } from "hono/bun"

await Log.init({
print: process.argv.includes("--print-logs"),
Expand All @@ -29,20 +31,47 @@ process.on("uncaughtException", (e) => {
})
})

let server: Bun.Server<BunWebSocketData>
// Subscribe to global events and forward them via RPC
GlobalBus.on("event", (event) => {
Rpc.emit("global.event", event)
})

let server: Bun.Server<BunWebSocketData> | undefined

export const rpc = {
async server(input: { port: number; hostname: string; mdns?: boolean }) {
if (server) await server.stop(true)
try {
server = Server.listen(input)
return {
url: server.url.toString(),
}
} catch (e) {
console.error(e)
throw e
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
const request = new Request(input.url, {
method: input.method,
headers: input.headers,
body: input.body,
})
const response = await Server.App().fetch(request)
const body = await response.text()
return {
status: response.status,
headers: Object.fromEntries(response.headers.entries()),
body,
}
},
async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
if (server) await server.stop(true)
server = Server.listen(input)
return { url: server.url.toString() }
},
async subscribe(input: { directory: string }) {
return Instance.provide({
directory: input.directory,
init: InstanceBootstrap,
fn: async () => {
Bus.subscribeAll((event) => {
Rpc.emit("event", event)
})
// Emit connected event
Rpc.emit("event", { type: "server.connected", properties: {} })
return { subscribed: true }
},
})
},
async checkUpgrade(input: { directory: string }) {
await Instance.provide({
directory: input.directory,
Expand All @@ -59,9 +88,7 @@ export const rpc = {
async shutdown() {
Log.Default.info("worker shutting down")
await Instance.disposeAll()
// TODO: this should be awaited, but ws connections are
// causing this to hang, need to revisit this
server.stop(true)
if (server) server.stop(true)
},
}

Expand Down
4 changes: 4 additions & 0 deletions packages/opencode/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,10 @@ export namespace Server {
host: "app.opencode.ai",
},
})
response.headers.set(
"Content-Security-Policy",
"default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'",
)
return response
}) as unknown as Hono,
)
Expand Down
24 changes: 24 additions & 0 deletions packages/opencode/src/util/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ export namespace Rpc {
}
}

export function emit(event: string, data: unknown) {
postMessage(JSON.stringify({ type: "rpc.event", event, data }))
}

export function client<T extends Definition>(target: {
postMessage: (data: string) => void | null
onmessage: ((this: Worker, ev: MessageEvent<any>) => any) | null
}) {
const pending = new Map<number, (result: any) => void>()
const listeners = new Map<string, Set<(data: any) => void>>()
let id = 0
target.onmessage = async (evt) => {
const parsed = JSON.parse(evt.data)
Expand All @@ -28,6 +33,14 @@ export namespace Rpc {
pending.delete(parsed.id)
}
}
if (parsed.type === "rpc.event") {
const handlers = listeners.get(parsed.event)
if (handlers) {
for (const handler of handlers) {
handler(parsed.data)
}
}
}
}
return {
call<Method extends keyof T>(method: Method, input: Parameters<T[Method]>[0]): Promise<ReturnType<T[Method]>> {
Expand All @@ -37,6 +50,17 @@ export namespace Rpc {
target.postMessage(JSON.stringify({ type: "rpc.request", method, input, id: requestId }))
})
},
on<Data>(event: string, handler: (data: Data) => void) {
let handlers = listeners.get(event)
if (!handlers) {
handlers = new Set()
listeners.set(event, handlers)
}
handlers.add(handler)
return () => {
handlers!.delete(handler)
}
},
}
}
}