diff --git a/.changeset/fresh-donkeys-breathe.md b/.changeset/fresh-donkeys-breathe.md new file mode 100644 index 000000000..e0ff47634 --- /dev/null +++ b/.changeset/fresh-donkeys-breathe.md @@ -0,0 +1,13 @@ +--- +'@tanstack/ai': patch +'@tanstack/ai-client': patch +'@tanstack/ai-preact': patch +'@tanstack/ai-react': patch +'@tanstack/ai-solid': patch +'@tanstack/ai-svelte': patch +'@tanstack/ai-vue': patch +--- + +Add an explicit subscription lifecycle to `ChatClient` with `subscribe()`/`unsubscribe()`, `isSubscribed`, `connectionStatus`, and `sessionGenerating`, while keeping request lifecycle state separate from long-lived connection state for durable chat sessions. + +Update the React, Preact, Solid, Svelte, and Vue chat bindings with `live` mode plus reactive subscription/session state, and improve `StreamProcessor` handling for concurrent runs and reconnects so active sessions do not finalize early or duplicate resumed assistant messages. diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index a97307ecf..4b0e40f70 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -19,6 +19,7 @@ import type { ChatClientEventEmitter } from './events' import type { ChatClientOptions, ChatClientState, + ConnectionStatus, MessagePart, MultimodalContent, ToolCallPart, @@ -32,8 +33,10 @@ export class ChatClient { private body: Record = {} private pendingMessageBody: Record | undefined = undefined private isLoading = false + private isSubscribed = false private error: Error | undefined = undefined private status: ChatClientState = 'ready' + private connectionStatus: ConnectionStatus = 'disconnected' private abortController: AbortController | null = null private events: ChatClientEventEmitter private clientToolsRef: { current: Map } @@ -51,6 +54,8 @@ export class ChatClient { // Tracks whether a queued checkForContinuation was skipped because // continuationPending was true (chained approval scenario) private continuationSkipped = false + private sessionGenerating = false + private activeRunIds = new Set() private callbacksRef: { current: { @@ -62,6 +67,9 @@ export class ChatClient { onLoadingChange: (isLoading: boolean) => void onErrorChange: (error: Error | undefined) => void onStatusChange: (status: ChatClientState) => void + onSubscriptionChange: (isSubscribed: boolean) => void + onConnectionStatusChange: (status: ConnectionStatus) => void + onSessionGeneratingChange: (isGenerating: boolean) => void onCustomEvent: ( eventType: string, data: unknown, @@ -94,6 +102,11 @@ export class ChatClient { onLoadingChange: options.onLoadingChange || (() => {}), onErrorChange: options.onErrorChange || (() => {}), onStatusChange: options.onStatusChange || (() => {}), + onSubscriptionChange: options.onSubscriptionChange || (() => {}), + onConnectionStatusChange: + options.onConnectionStatusChange || (() => {}), + onSessionGeneratingChange: + options.onSessionGeneratingChange || (() => {}), onCustomEvent: options.onCustomEvent || (() => {}), }, } @@ -259,6 +272,27 @@ export class ChatClient { this.callbacksRef.current.onStatusChange(status) } + private setIsSubscribed(isSubscribed: boolean): void { + this.isSubscribed = isSubscribed + this.callbacksRef.current.onSubscriptionChange(isSubscribed) + } + + private setConnectionStatus(status: ConnectionStatus): void { + this.connectionStatus = status + this.callbacksRef.current.onConnectionStatusChange(status) + } + + private setSessionGenerating(isGenerating: boolean): void { + if (this.sessionGenerating === isGenerating) return + this.sessionGenerating = isGenerating + this.callbacksRef.current.onSessionGeneratingChange(isGenerating) + } + + private resetSessionGenerating(): void { + this.activeRunIds.clear() + this.setSessionGenerating(false) + } + private setError(error: Error | undefined): void { this.error = error this.callbacksRef.current.onErrorChange(error) @@ -275,10 +309,15 @@ export class ChatClient { this.processingResolve = null } - private cancelInFlightStream(options?: { setReadyStatus?: boolean }): void { + private cancelInFlightStream(options?: { + setReadyStatus?: boolean + abortSubscription?: boolean + }): void { this.abortController?.abort() this.abortController = null - this.abortSubscriptionLoop() + if (options?.abortSubscription) { + this.abortSubscriptionLoop() + } this.resolveProcessing() this.setIsLoading(false) if (options?.setReadyStatus) { @@ -290,7 +329,15 @@ export class ChatClient { const alreadyReported = this.errorReportedGeneration === this.streamGeneration this.setError(error) - this.setStatus('error') + // Preserve request-level error semantics even if a RUN_ERROR arrives + // slightly after loading flips false during stream teardown. + if ( + this.isLoading || + this.status === 'submitted' || + this.status === 'streaming' + ) { + this.setStatus('error') + } if (!alreadyReported) { this.errorReportedGeneration = this.streamGeneration this.callbacksRef.current.onError(error) @@ -304,13 +351,30 @@ export class ChatClient { this.subscriptionAbortController = new AbortController() const signal = this.subscriptionAbortController.signal - this.consumeSubscription(signal).catch((err) => { - if (err instanceof Error && err.name !== 'AbortError') { - this.reportStreamError(err) - } - // Resolve pending processing so streamResponse doesn't hang - this.resolveProcessing() - }) + this.consumeSubscription(signal) + .catch((err) => { + if (err instanceof Error && err.name !== 'AbortError') { + this.setConnectionStatus('error') + this.resetSessionGenerating() + this.setIsSubscribed(false) + this.reportStreamError(err) + } + // Resolve pending processing so streamResponse doesn't hang + this.resolveProcessing() + }) + .finally(() => { + // Ignore stale loops that were superseded by a restart. + if (this.subscriptionAbortController?.signal !== signal) { + return + } + this.subscriptionAbortController = null + if (!signal.aborted && this.isSubscribed) { + this.setIsSubscribed(false) + if (this.connectionStatus !== 'error') { + this.setConnectionStatus('disconnected') + } + } + }) } /** @@ -320,11 +384,25 @@ export class ChatClient { const stream = this.connection.subscribe(signal) for await (const chunk of stream) { if (signal.aborted) break + if (this.connectionStatus === 'connecting') { + this.setConnectionStatus('connected') + } this.callbacksRef.current.onChunk(chunk) this.processor.processChunk(chunk) + if (chunk.type === 'RUN_STARTED') { + this.activeRunIds.add(chunk.runId) + this.setSessionGenerating(true) + } // RUN_FINISHED / RUN_ERROR signal run completion — resolve processing // (redundant if onStreamEnd already resolved it, harmless) if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') { + if (chunk.runId) { + this.activeRunIds.delete(chunk.runId) + } else if (chunk.type === 'RUN_ERROR') { + // RUN_ERROR without runId is a session-level error; clear all runs + this.activeRunIds.clear() + } + this.setSessionGenerating(this.activeRunIds.size > 0) this.resolveProcessing() } // Yield control back to event loop for UI updates @@ -336,11 +414,15 @@ export class ChatClient { * Ensure subscription loop is running, starting it if needed. */ private ensureSubscription(): void { + if (!this.isSubscribed) { + this.subscribe() + return + } if ( !this.subscriptionAbortController || this.subscriptionAbortController.signal.aborted ) { - this.startSubscription() + this.subscribe({ restart: true }) } } @@ -593,6 +675,39 @@ export class ChatClient { return streamCompletedSuccessfully } + /** + * Start the client subscription loop. + * This controls the connection lifecycle independently from request lifecycle. + */ + subscribe(options?: { restart?: boolean }): void { + const restart = options?.restart === true + if (this.isSubscribed && !restart) { + return + } + + if (this.isSubscribed && restart) { + this.abortSubscriptionLoop() + } + + this.setIsSubscribed(true) + this.setConnectionStatus('connecting') + this.startSubscription() + } + + /** + * Unsubscribe and fully tear down live behavior. + * This aborts an in-flight request and the subscription loop. + */ + unsubscribe(): void { + this.cancelInFlightStream({ + setReadyStatus: true, + abortSubscription: true, + }) + this.resetSessionGenerating() + this.setIsSubscribed(false) + this.setConnectionStatus('disconnected') + } + /** * Reload the last assistant message */ @@ -789,6 +904,30 @@ export class ChatClient { return this.status } + /** + * Get whether the subscription loop is active + */ + getIsSubscribed(): boolean { + return this.isSubscribed + } + + /** + * Get current connection lifecycle status + */ + getConnectionStatus(): ConnectionStatus { + return this.connectionStatus + } + + /** + * Whether the shared session is actively generating. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + getSessionGenerating(): boolean { + return this.sessionGenerating + } + /** * Get current error */ @@ -814,6 +953,9 @@ export class ChatClient { onChunk?: (chunk: StreamChunk) => void onFinish?: (message: UIMessage) => void onError?: (error: Error) => void + onSubscriptionChange?: (isSubscribed: boolean) => void + onConnectionStatusChange?: (status: ConnectionStatus) => void + onSessionGeneratingChange?: (isGenerating: boolean) => void onCustomEvent?: ( eventType: string, data: unknown, @@ -821,13 +963,25 @@ export class ChatClient { ) => void }): void { if (options.connection !== undefined) { - // Cancel any in-flight stream to avoid hanging on stale processing promises + const wasSubscribed = this.isSubscribed + if (this.isLoading) { - this.cancelInFlightStream({ setReadyStatus: true }) - } else { + this.cancelInFlightStream({ + setReadyStatus: true, + abortSubscription: true, + }) + } else if (wasSubscribed) { this.abortSubscriptionLoop() } + + this.resetSessionGenerating() + this.setIsSubscribed(false) + this.setConnectionStatus('disconnected') this.connection = normalizeConnectionAdapter(options.connection) + + if (wasSubscribed) { + this.subscribe() + } } if (options.body !== undefined) { this.body = options.body @@ -850,6 +1004,18 @@ export class ChatClient { if (options.onError !== undefined) { this.callbacksRef.current.onError = options.onError } + if (options.onSubscriptionChange !== undefined) { + this.callbacksRef.current.onSubscriptionChange = + options.onSubscriptionChange + } + if (options.onConnectionStatusChange !== undefined) { + this.callbacksRef.current.onConnectionStatusChange = + options.onConnectionStatusChange + } + if (options.onSessionGeneratingChange !== undefined) { + this.callbacksRef.current.onSessionGeneratingChange = + options.onSessionGeneratingChange + } if (options.onCustomEvent !== undefined) { this.callbacksRef.current.onCustomEvent = options.onCustomEvent } diff --git a/packages/typescript/ai-client/src/index.ts b/packages/typescript/ai-client/src/index.ts index deb9a3ed3..93654ba66 100644 --- a/packages/typescript/ai-client/src/index.ts +++ b/packages/typescript/ai-client/src/index.ts @@ -15,6 +15,7 @@ export type { ChatRequestBody, InferChatMessages, ChatClientState, + ConnectionStatus, // Multimodal content input type MultimodalContent, } from './types' diff --git a/packages/typescript/ai-client/src/types.ts b/packages/typescript/ai-client/src/types.ts index 4f250edd3..b705ebbdf 100644 --- a/packages/typescript/ai-client/src/types.ts +++ b/packages/typescript/ai-client/src/types.ts @@ -36,6 +36,15 @@ export type ToolResultState = */ export type ChatClientState = 'ready' | 'submitted' | 'streaming' | 'error' +/** + * Connection lifecycle state for the subscription loop. + */ +export type ConnectionStatus = + | 'disconnected' + | 'connecting' + | 'connected' + | 'error' + /** * Multimodal content input for sending messages with rich media. * Allows sending text, images, audio, video, and documents to the LLM. @@ -240,6 +249,25 @@ export interface ChatClientOptions< */ onStatusChange?: (status: ChatClientState) => void + /** + * Callback when subscription lifecycle changes. + * This is independent from request lifecycle (`isLoading`, `status`). + */ + onSubscriptionChange?: (isSubscribed: boolean) => void + + /** + * Callback when connection lifecycle changes. + */ + onConnectionStatusChange?: (status: ConnectionStatus) => void + + /** + * Callback when session generation activity changes. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `onLoadingChange` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + onSessionGeneratingChange?: (isGenerating: boolean) => void + /** * Callback when a custom event is received from a server-side tool. * Custom events are emitted by tools using `context.emitCustomEvent()` during execution. diff --git a/packages/typescript/ai-client/tests/chat-client.test.ts b/packages/typescript/ai-client/tests/chat-client.test.ts index 9d0eff15c..52dfa2e7d 100644 --- a/packages/typescript/ai-client/tests/chat-client.test.ts +++ b/packages/typescript/ai-client/tests/chat-client.test.ts @@ -20,6 +20,8 @@ describe('ChatClient', () => { expect(client.getMessages()).toEqual([]) expect(client.getIsLoading()).toBe(false) + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('disconnected') expect(client.getError()).toBeUndefined() }) @@ -142,6 +144,170 @@ describe('ChatClient', () => { expect(adapter.send).toHaveBeenCalled() }) + it('stop should not unsubscribe an active subscription', async () => { + const adapter = createSubscribeAdapter([ + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ]) + const client = new ChatClient({ connection: adapter }) + + client.subscribe() + expect(client.getIsSubscribed()).toBe(true) + expect(client.getConnectionStatus()).toBe('connecting') + + const sendPromise = client.sendMessage('Hello') + client.stop() + await sendPromise + + expect(client.getIsSubscribed()).toBe(true) + client.unsubscribe() + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('disconnected') + }) + + it('should re-subscribe on connection update when previously subscribed', () => { + const adapter1 = createSubscribeAdapter([]) + const adapter2 = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter1 }) + + client.subscribe() + expect(client.getIsSubscribed()).toBe(true) + expect(adapter1.subscribe).toHaveBeenCalledTimes(1) + + client.updateOptions({ connection: adapter2 }) + + expect(client.getIsSubscribed()).toBe(true) + expect(adapter2.subscribe).toHaveBeenCalledTimes(1) + }) + + it('should emit subscription and connection lifecycle callbacks', async () => { + const adapter = createSubscribeAdapter(createTextChunks('callback flow')) + const subscriptionChanges: Array = [] + const connectionStatuses: Array = [] + const client = new ChatClient({ + connection: adapter, + onSubscriptionChange: (isSubscribed) => { + subscriptionChanges.push(isSubscribed) + }, + onConnectionStatusChange: (status) => { + connectionStatuses.push(status) + }, + }) + + client.subscribe() + await client.sendMessage('Hello') + client.unsubscribe() + + expect(subscriptionChanges).toEqual([true, false]) + expect(connectionStatuses[0]).toBe('connecting') + expect(connectionStatuses).toContain('connected') + expect(connectionStatuses[connectionStatuses.length - 1]).toBe( + 'disconnected', + ) + }) + + it('subscribe should be idempotent without restart', () => { + const adapter = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter }) + + client.subscribe() + client.subscribe() + + expect(adapter.subscribe).toHaveBeenCalledTimes(1) + expect(client.getIsSubscribed()).toBe(true) + + client.unsubscribe() + }) + + it('subscribe with restart should start a new subscription loop', () => { + const adapter = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter }) + + client.subscribe() + client.subscribe({ restart: true }) + + expect(adapter.subscribe).toHaveBeenCalledTimes(2) + expect(client.getIsSubscribed()).toBe(true) + expect(client.getConnectionStatus()).toBe('connecting') + + client.unsubscribe() + }) + + it('unsubscribe should be idempotent', () => { + const adapter = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter }) + + client.unsubscribe() + client.unsubscribe() + + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('disconnected') + }) + + it('unsubscribe should abort in-flight requests and disconnect', async () => { + const adapter = createSubscribeAdapter([ + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'H', + content: 'H', + }, + ]) + const client = new ChatClient({ connection: adapter }) + + client.subscribe() + const sendPromise = client.sendMessage('Hello') + await new Promise((resolve) => setTimeout(resolve, 10)) + client.unsubscribe() + + const completed = await Promise.race([ + sendPromise.then(() => true), + new Promise((resolve) => + setTimeout(() => resolve(false), 500), + ), + ]) + + expect(completed).toBe(true) + expect(client.getIsLoading()).toBe(false) + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('disconnected') + }) + + it('should not re-subscribe on connection update when not subscribed', () => { + const adapter1 = createSubscribeAdapter([]) + const adapter2 = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter1 }) + + client.updateOptions({ connection: adapter2 }) + + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('disconnected') + expect(adapter2.subscribe).not.toHaveBeenCalled() + }) + + it('should expose connectionStatus error for subscription loop failures', async () => { + const connection = { + subscribe: async function* () { + throw new Error('subscription failed') + }, + send: async () => {}, + } + const client = new ChatClient({ connection }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(client.getIsSubscribed()).toBe(false) + expect(client.getConnectionStatus()).toBe('error') + }) + it('should remain pending without terminal run events', async () => { const adapter = createSubscribeAdapter([ { @@ -169,6 +335,487 @@ describe('ChatClient', () => { client.stop() await sendPromise }) + + describe('sessionGenerating', () => { + it('should be false initially', () => { + const adapter = createSubscribeAdapter([]) + const client = new ChatClient({ connection: adapter }) + + expect(client.getSessionGenerating()).toBe(false) + }) + + it('should flip to true on RUN_STARTED and false on RUN_FINISHED', async () => { + const chunks: Array = [ + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + }, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ] + const adapter = createSubscribeAdapter(chunks) + const generatingChanges: Array = [] + const client = new ChatClient({ + connection: adapter, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + await client.sendMessage('Hello') + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toEqual([true, false]) + }) + + it('should flip to false on RUN_ERROR', async () => { + const chunks: Array = [ + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'RUN_ERROR', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + error: { message: 'something went wrong' }, + }, + ] + const adapter = createSubscribeAdapter(chunks) + const generatingChanges: Array = [] + const client = new ChatClient({ + connection: adapter, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + await client.sendMessage('Hello') + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toEqual([true, false]) + }) + + it('should remain correct through subscribe/unsubscribe cycles', async () => { + const chunks: Array = [ + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + }, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ] + const adapter = createSubscribeAdapter(chunks) + const client = new ChatClient({ connection: adapter }) + + // First cycle + await client.sendMessage('Hello') + expect(client.getSessionGenerating()).toBe(false) + + // Unsubscribe + client.unsubscribe() + expect(client.getSessionGenerating()).toBe(false) + }) + + it('should reset on unsubscribe while generating', async () => { + let yieldedStart = false + const connection = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (!yieldedStart) { + yieldedStart = true + yield { + type: 'RUN_STARTED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + } + } + await new Promise((resolve) => { + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => { + // no-op; the subscribe generator yields RUN_STARTED on its own + }, + } + const generatingChanges: Array = [] + const client = new ChatClient({ + connection, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(true) + + client.unsubscribe() + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toEqual([true, false]) + }) + + it('should reset on connection adapter replacement', async () => { + let yieldedStart = false + const connection1 = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (!yieldedStart) { + yieldedStart = true + yield { + type: 'RUN_STARTED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + } + } + await new Promise((resolve) => { + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => {}, + } + const connection2 = createSubscribeAdapter([]) + const generatingChanges: Array = [] + const client = new ChatClient({ + connection: connection1, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 20)) + expect(client.getSessionGenerating()).toBe(true) + + client.updateOptions({ connection: connection2 }) + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toEqual([true, false]) + + client.unsubscribe() + }) + + it('should not emit duplicate callbacks on repeated same-state events', async () => { + const chunks: Array = [ + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + }, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ] + const adapter = createSubscribeAdapter(chunks) + const generatingChanges: Array = [] + const client = new ChatClient({ + connection: adapter, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + await client.sendMessage('Hello') + + expect(generatingChanges).toEqual([true, false]) + }) + + it('should handle interleaved multi-run events from durable subscription', async () => { + const chunks: Array = [ + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'A', + content: 'A', + }, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ] + const adapter = createSubscribeAdapter(chunks) + const generatingChanges: Array = [] + const client = new ChatClient({ + connection: adapter, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + await client.sendMessage('First') + expect(generatingChanges).toEqual([true, false]) + + await client.sendMessage('Second') + expect(generatingChanges).toEqual([true, false, true, false]) + }) + + it('should stay true during concurrent runs until all finish', async () => { + const wake = { fn: null as (() => void) | null } + const chunks: Array = [] + const connection = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (chunks.length > 0) { + const batch = chunks.splice(0) + for (const chunk of batch) { + yield chunk + } + } + await new Promise((resolve) => { + wake.fn = resolve + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => { + wake.fn?.() + }, + } + const generatingChanges: Array = [] + const client = new ChatClient({ + connection, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Simulate two concurrent runs starting + chunks.push( + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'RUN_STARTED', + runId: 'run-2', + model: 'test', + timestamp: Date.now(), + }, + ) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(true) + + // First run finishes — should still be generating because run-2 is active + chunks.push({ + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(true) + + // Second run finishes — now should be false + chunks.push({ + type: 'RUN_FINISHED', + runId: 'run-2', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(false) + // Only two transitions: false→true at start, true→false when all done + expect(generatingChanges).toEqual([true, false]) + + client.unsubscribe() + }) + + it('should clear all runs on RUN_ERROR without runId', async () => { + const wake = { fn: null as (() => void) | null } + const chunks: Array = [] + const connection = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (chunks.length > 0) { + const batch = chunks.splice(0) + for (const chunk of batch) { + yield chunk + } + } + await new Promise((resolve) => { + wake.fn = resolve + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => { + wake.fn?.() + }, + } + const generatingChanges: Array = [] + const client = new ChatClient({ + connection, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Two runs active + chunks.push( + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'RUN_STARTED', + runId: 'run-2', + model: 'test', + timestamp: Date.now(), + }, + ) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(true) + + // Session-level error without runId clears everything + chunks.push({ + type: 'RUN_ERROR', + model: 'test', + timestamp: Date.now(), + error: { message: 'session crashed' }, + }) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toEqual([true, false]) + + client.unsubscribe() + }) + + it('should reset on fatal subscription loop teardown', async () => { + let yieldedStart = false + const connection = { + subscribe: async function* (_signal?: AbortSignal) { + if (!yieldedStart) { + yieldedStart = true + yield { + type: 'RUN_STARTED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + } + await new Promise((resolve) => setTimeout(resolve, 10)) + } + throw new Error('subscription failed') + }, + send: async () => {}, + } + const generatingChanges: Array = [] + const client = new ChatClient({ + connection, + onSessionGeneratingChange: (isGenerating) => { + generatingChanges.push(isGenerating) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(client.getSessionGenerating()).toBe(false) + expect(generatingChanges).toContain(true) + expect(generatingChanges[generatingChanges.length - 1]).toBe(false) + }) + }) }) describe('sendMessage', () => { @@ -1372,4 +2019,237 @@ describe('ChatClient', () => { expect(textPart?.content).toBe('All done!') }) }) + + describe('concurrent runs and reconnect correctness', () => { + it('concurrent runs should not produce duplicate messages or corrupt content', async () => { + const wake = { fn: null as (() => void) | null } + const chunks: Array = [] + const connection = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (chunks.length > 0) { + const batch = chunks.splice(0) + for (const chunk of batch) { + yield chunk + } + } + await new Promise((resolve) => { + wake.fn = resolve + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => { + wake.fn?.() + }, + } + + const messagesSnapshots: Array> = [] + const client = new ChatClient({ + connection, + onMessagesChange: (msgs) => { + messagesSnapshots.push(msgs.map((m) => ({ ...m }))) + }, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Run A starts with text message + chunks.push( + { + type: 'RUN_STARTED', + runId: 'run-a', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_START', + messageId: 'msg-a', + role: 'assistant', + model: 'test', + timestamp: Date.now(), + } as StreamChunk, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-a', + model: 'test', + timestamp: Date.now(), + delta: 'Story: ', + } as StreamChunk, + ) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Run B starts concurrently + chunks.push( + { + type: 'RUN_STARTED', + runId: 'run-b', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_START', + messageId: 'msg-b', + role: 'assistant', + model: 'test', + timestamp: Date.now(), + } as StreamChunk, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-b', + model: 'test', + timestamp: Date.now(), + delta: 'Hi!', + } as StreamChunk, + ) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Run B finishes — Run A should still be active + chunks.push({ + type: 'RUN_FINISHED', + runId: 'run-b', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Run A continues streaming + chunks.push({ + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-a', + model: 'test', + timestamp: Date.now(), + delta: 'once upon a time', + } as StreamChunk) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Verify msg-a still has correct content after run-b finished + const messages = client.getMessages() + const msgA = messages.find((m) => m.id === 'msg-a') + const msgB = messages.find((m) => m.id === 'msg-b') + + expect(msgA).toBeDefined() + expect(msgB).toBeDefined() + expect(msgA!.parts[0]).toEqual({ + type: 'text', + content: 'Story: once upon a time', + }) + expect(msgB!.parts[0]).toEqual({ type: 'text', content: 'Hi!' }) + + // No duplicate messages + expect(messages.filter((m) => m.id === 'msg-a')).toHaveLength(1) + expect(messages.filter((m) => m.id === 'msg-b')).toHaveLength(1) + + // Finish run A + chunks.push({ + type: 'RUN_FINISHED', + runId: 'run-a', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + expect(client.getSessionGenerating()).toBe(false) + client.unsubscribe() + }) + + it('reconnect with initialMessages should not duplicate assistant message on content arrival', async () => { + const wake = { fn: null as (() => void) | null } + const chunks: Array = [] + const connection = { + subscribe: async function* (signal?: AbortSignal) { + while (!signal?.aborted) { + if (chunks.length > 0) { + const batch = chunks.splice(0) + for (const chunk of batch) { + yield chunk + } + } + await new Promise((resolve) => { + wake.fn = resolve + const onAbort = () => resolve() + signal?.addEventListener('abort', onAbort, { once: true }) + }) + } + }, + send: async () => { + wake.fn?.() + }, + } + + // Simulate reconnect: client created with initialMessages (from SSR/snapshot) + const initialMessages: Array = [ + { + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Tell me a story' }], + createdAt: new Date(), + }, + { + id: 'asst-1', + role: 'assistant', + parts: [{ type: 'text', content: 'Once upon a ' }], + createdAt: new Date(), + }, + ] + + const client = new ChatClient({ + connection, + initialMessages, + }) + + client.subscribe() + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Resumed content for in-progress message (no TEXT_MESSAGE_START) + chunks.push( + { + type: 'RUN_STARTED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + }, + { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'asst-1', + model: 'test', + timestamp: Date.now(), + delta: 'time...', + } as StreamChunk, + { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }, + ) + wake.fn?.() + await new Promise((resolve) => setTimeout(resolve, 20)) + + const messages = client.getMessages() + + // Should still have exactly 2 messages, not 3 + expect(messages).toHaveLength(2) + + // Content should be correctly appended + const asstMsg = messages.find((m) => m.id === 'asst-1') + expect(asstMsg).toBeDefined() + expect(asstMsg!.parts[0]).toEqual({ + type: 'text', + content: 'Once upon a time...', + }) + + client.unsubscribe() + }) + }) }) diff --git a/packages/typescript/ai-preact/src/types.ts b/packages/typescript/ai-preact/src/types.ts index 3b1a4a041..0d4a15eb2 100644 --- a/packages/typescript/ai-preact/src/types.ts +++ b/packages/typescript/ai-preact/src/types.ts @@ -3,6 +3,7 @@ import type { ChatClientOptions, ChatClientState, ChatRequestBody, + ConnectionStatus, MultimodalContent, UIMessage, } from '@tanstack/ai-client' @@ -29,8 +30,16 @@ export type { ChatRequestBody, MultimodalContent, UIMessage } export type UseChatOptions = any> = Omit< ChatClientOptions, - 'onMessagesChange' | 'onLoadingChange' | 'onErrorChange' | 'onStatusChange' - > + | 'onMessagesChange' + | 'onLoadingChange' + | 'onErrorChange' + | 'onStatusChange' + | 'onSubscriptionChange' + | 'onConnectionStatusChange' + | 'onSessionGeneratingChange' + > & { + live?: boolean + } export interface UseChatReturn< TTools extends ReadonlyArray = any, @@ -104,4 +113,22 @@ export interface UseChatReturn< * Current generation status */ status: ChatClientState + + /** + * Whether the subscription loop is currently active + */ + isSubscribed: boolean + + /** + * Current connection lifecycle status + */ + connectionStatus: ConnectionStatus + + /** + * Whether the shared session is actively generating. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + sessionGenerating: boolean } diff --git a/packages/typescript/ai-preact/src/use-chat.ts b/packages/typescript/ai-preact/src/use-chat.ts index cfa9340f4..dbfb5b32c 100644 --- a/packages/typescript/ai-preact/src/use-chat.ts +++ b/packages/typescript/ai-preact/src/use-chat.ts @@ -7,7 +7,7 @@ import { useRef, useState, } from 'preact/hooks' -import type { ChatClientState } from '@tanstack/ai-client' +import type { ChatClientState, ConnectionStatus } from '@tanstack/ai-client' import type { AnyClientTool, ModelMessage } from '@tanstack/ai' import type { @@ -29,6 +29,10 @@ export function useChat = any>( const [isLoading, setIsLoading] = useState(false) const [error, setError] = useState(undefined) const [status, setStatus] = useState('ready') + const [isSubscribed, setIsSubscribed] = useState(false) + const [connectionStatus, setConnectionStatus] = + useState('disconnected') + const [sessionGenerating, setSessionGenerating] = useState(false) // Track current messages in a ref to preserve them when client is recreated const messagesRef = useRef>>( @@ -78,6 +82,15 @@ export function useChat = any>( onErrorChange: (newError: Error | undefined) => { setError(newError) }, + onSubscriptionChange: (nextIsSubscribed: boolean) => { + setIsSubscribed(nextIsSubscribed) + }, + onConnectionStatusChange: (nextStatus: ConnectionStatus) => { + setConnectionStatus(nextStatus) + }, + onSessionGeneratingChange: (isGenerating: boolean) => { + setSessionGenerating(isGenerating) + }, }) }, [clientId]) @@ -100,15 +113,27 @@ export function useChat = any>( } }, []) + useEffect(() => { + if (options.live) { + client.subscribe() + } else { + client.unsubscribe() + } + }, [client, options.live]) + // Cleanup on unmount: stop any in-flight requests // Note: We only cleanup when client changes or component unmounts. // DO NOT include isLoading in dependencies - that would cause the cleanup // to run when isLoading changes, aborting continuation requests. useEffect(() => { return () => { - client.stop() + if (options.live) { + client.unsubscribe() + } else { + client.stop() + } } - }, [client]) + }, [client, options.live]) // Note: Callback options (onResponse, onChunk, onFinish, onError, onToolCall) // are captured at client creation time. Changes to these callbacks require @@ -175,6 +200,9 @@ export function useChat = any>( isLoading, error, status, + isSubscribed, + connectionStatus, + sessionGenerating, setMessages: setMessagesManually, clear, addToolResult, diff --git a/packages/typescript/ai-preact/tests/use-chat.test.ts b/packages/typescript/ai-preact/tests/use-chat.test.ts index aa21d1cac..e2e2b8f24 100644 --- a/packages/typescript/ai-preact/tests/use-chat.test.ts +++ b/packages/typescript/ai-preact/tests/use-chat.test.ts @@ -19,6 +19,21 @@ describe('useChat', () => { expect(result.current.isLoading).toBe(false) expect(result.current.error).toBeUndefined() expect(result.current.status).toBe('ready') + expect(result.current.isSubscribed).toBe(false) + expect(result.current.connectionStatus).toBe('disconnected') + expect(result.current.sessionGenerating).toBe(false) + }) + + it('should subscribe immediately when live is true', async () => { + const adapter = createMockConnectionAdapter() + const { result } = renderUseChat({ connection: adapter, live: true }) + + await waitFor(() => { + expect(result.current.isSubscribed).toBe(true) + }) + expect(['connecting', 'connected']).toContain( + result.current.connectionStatus, + ) }) it('should initialize with provided messages', () => { diff --git a/packages/typescript/ai-react/src/types.ts b/packages/typescript/ai-react/src/types.ts index a960a1b8f..ec8f49a92 100644 --- a/packages/typescript/ai-react/src/types.ts +++ b/packages/typescript/ai-react/src/types.ts @@ -3,6 +3,7 @@ import type { ChatClientOptions, ChatClientState, ChatRequestBody, + ConnectionStatus, MultimodalContent, UIMessage, } from '@tanstack/ai-client' @@ -29,8 +30,20 @@ export type { ChatRequestBody, MultimodalContent, UIMessage } export type UseChatOptions = any> = Omit< ChatClientOptions, - 'onMessagesChange' | 'onLoadingChange' | 'onErrorChange' | 'onStatusChange' - > + | 'onMessagesChange' + | 'onLoadingChange' + | 'onErrorChange' + | 'onStatusChange' + | 'onSubscriptionChange' + | 'onConnectionStatusChange' + | 'onSessionGeneratingChange' + > & { + /** + * Opt into mount-time live subscription behavior. + * When enabled, the hook subscribes on mount and unsubscribes on unmount. + */ + live?: boolean + } export interface UseChatReturn< TTools extends ReadonlyArray = any, @@ -95,6 +108,24 @@ export interface UseChatReturn< */ status: ChatClientState + /** + * Whether the subscription loop is currently active + */ + isSubscribed: boolean + + /** + * Current connection lifecycle status + */ + connectionStatus: ConnectionStatus + + /** + * Whether the shared session is actively generating. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + sessionGenerating: boolean + /** * Set messages manually */ diff --git a/packages/typescript/ai-react/src/use-chat.ts b/packages/typescript/ai-react/src/use-chat.ts index b49ac0514..6d4b90e7a 100644 --- a/packages/typescript/ai-react/src/use-chat.ts +++ b/packages/typescript/ai-react/src/use-chat.ts @@ -1,7 +1,7 @@ import { ChatClient } from '@tanstack/ai-client' import { useCallback, useEffect, useId, useMemo, useRef, useState } from 'react' import type { AnyClientTool, ModelMessage } from '@tanstack/ai' -import type { ChatClientState } from '@tanstack/ai-client' +import type { ChatClientState, ConnectionStatus } from '@tanstack/ai-client' import type { MultimodalContent, @@ -22,6 +22,10 @@ export function useChat = any>( const [isLoading, setIsLoading] = useState(false) const [error, setError] = useState(undefined) const [status, setStatus] = useState('ready') + const [isSubscribed, setIsSubscribed] = useState(false) + const [connectionStatus, setConnectionStatus] = + useState('disconnected') + const [sessionGenerating, setSessionGenerating] = useState(false) // Track current messages in a ref to preserve them when client is recreated const messagesRef = useRef>>( @@ -78,6 +82,15 @@ export function useChat = any>( onStatusChange: (status: ChatClientState) => { setStatus(status) }, + onSubscriptionChange: (nextIsSubscribed: boolean) => { + setIsSubscribed(nextIsSubscribed) + }, + onConnectionStatusChange: (nextStatus: ConnectionStatus) => { + setConnectionStatus(nextStatus) + }, + onSessionGeneratingChange: (isGenerating: boolean) => { + setSessionGenerating(isGenerating) + }, }) }, [clientId]) @@ -99,16 +112,29 @@ export function useChat = any>( } }, []) // Only run on mount - initialMessages are handled by ChatClient constructor + // Keep connection lifecycle opt-in and explicit. + useEffect(() => { + if (options.live) { + client.subscribe() + } else { + client.unsubscribe() + } + }, [client, options.live]) + // Cleanup on unmount: stop any in-flight requests // Note: We only cleanup when client changes or component unmounts. // DO NOT include isLoading in dependencies - that would cause the cleanup // to run when isLoading changes, aborting continuation requests. useEffect(() => { return () => { - // Stop any active generation when component unmounts or client changes - client.stop() + // live mode owns the connection lifecycle; non-live keeps request-only stop. + if (options.live) { + client.unsubscribe() + } else { + client.stop() + } } - }, [client]) + }, [client, options.live]) // Note: Callback options (onResponse, onChunk, onFinish, onError, onToolCall) // are captured at client creation time. Changes to these callbacks require @@ -176,6 +202,9 @@ export function useChat = any>( isLoading, error, status, + isSubscribed, + connectionStatus, + sessionGenerating, setMessages: setMessagesManually, clear, addToolResult, diff --git a/packages/typescript/ai-react/tests/use-chat.test.ts b/packages/typescript/ai-react/tests/use-chat.test.ts index 282c2ae5d..18ddc8cc2 100644 --- a/packages/typescript/ai-react/tests/use-chat.test.ts +++ b/packages/typescript/ai-react/tests/use-chat.test.ts @@ -19,6 +19,21 @@ describe('useChat', () => { expect(result.current.isLoading).toBe(false) expect(result.current.error).toBeUndefined() expect(result.current.status).toBe('ready') + expect(result.current.isSubscribed).toBe(false) + expect(result.current.connectionStatus).toBe('disconnected') + expect(result.current.sessionGenerating).toBe(false) + }) + + it('should subscribe immediately when live is true', async () => { + const adapter = createMockConnectionAdapter() + const { result } = renderUseChat({ connection: adapter, live: true }) + + await waitFor(() => { + expect(result.current.isSubscribed).toBe(true) + }) + expect(['connecting', 'connected']).toContain( + result.current.connectionStatus, + ) }) it('should initialize with provided messages', () => { @@ -1564,4 +1579,83 @@ describe('useChat', () => { ]) }) }) + + describe('sessionGenerating', () => { + it('should expose sessionGenerating and update from stream run events', async () => { + const adapter: import('@tanstack/ai-client').SubscribeConnectionAdapter = + { + subscribe: async function* (signal?: AbortSignal) { + yield { + type: 'RUN_STARTED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + } + yield { + type: 'TEXT_MESSAGE_CONTENT' as const, + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + } + yield { + type: 'RUN_FINISHED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop' as const, + } + }, + send: vi.fn(async () => {}), + } + + const { result } = renderUseChat({ connection: adapter, live: true }) + + await waitFor(() => { + expect(result.current.isSubscribed).toBe(true) + }) + + await result.current.sendMessage('Hello') + + await waitFor(() => { + expect(result.current.sessionGenerating).toBe(false) + }) + }) + + it('should integrate correctly with live subscription lifecycle', async () => { + const adapter: import('@tanstack/ai-client').SubscribeConnectionAdapter = + { + subscribe: async function* () { + yield { + type: 'RUN_STARTED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + } + yield { + type: 'RUN_FINISHED' as const, + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop' as const, + } + }, + send: vi.fn(async () => {}), + } + + const { result } = renderUseChat({ connection: adapter, live: true }) + + await waitFor(() => { + expect(result.current.isSubscribed).toBe(true) + }) + + await result.current.sendMessage('Hello') + + await waitFor(() => { + expect(result.current.sessionGenerating).toBe(false) + expect(result.current.isLoading).toBe(false) + }) + }) + }) }) diff --git a/packages/typescript/ai-solid/src/types.ts b/packages/typescript/ai-solid/src/types.ts index 4d75ae531..eb4654d25 100644 --- a/packages/typescript/ai-solid/src/types.ts +++ b/packages/typescript/ai-solid/src/types.ts @@ -3,6 +3,7 @@ import type { ChatClientOptions, ChatClientState, ChatRequestBody, + ConnectionStatus, MultimodalContent, UIMessage, } from '@tanstack/ai-client' @@ -30,8 +31,16 @@ export type { ChatRequestBody, MultimodalContent, UIMessage } export type UseChatOptions = any> = Omit< ChatClientOptions, - 'onMessagesChange' | 'onLoadingChange' | 'onErrorChange' | 'onStatusChange' - > + | 'onMessagesChange' + | 'onLoadingChange' + | 'onErrorChange' + | 'onStatusChange' + | 'onSubscriptionChange' + | 'onConnectionStatusChange' + | 'onSessionGeneratingChange' + > & { + live?: boolean + } export interface UseChatReturn< TTools extends ReadonlyArray = any, @@ -105,6 +114,24 @@ export interface UseChatReturn< * Current generation status */ status: Accessor + + /** + * Whether the subscription loop is currently active + */ + isSubscribed: Accessor + + /** + * Current connection lifecycle status + */ + connectionStatus: Accessor + + /** + * Whether the shared session is actively generating. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + sessionGenerating: Accessor } // Note: createChatClientOptions and InferChatMessages are now in @tanstack/ai-client diff --git a/packages/typescript/ai-solid/src/use-chat.ts b/packages/typescript/ai-solid/src/use-chat.ts index c006871ef..03f69f24d 100644 --- a/packages/typescript/ai-solid/src/use-chat.ts +++ b/packages/typescript/ai-solid/src/use-chat.ts @@ -3,10 +3,11 @@ import { createMemo, createSignal, createUniqueId, + onCleanup, } from 'solid-js' import { ChatClient } from '@tanstack/ai-client' -import type { ChatClientState } from '@tanstack/ai-client' +import type { ChatClientState, ConnectionStatus } from '@tanstack/ai-client' import type { AnyClientTool, ModelMessage } from '@tanstack/ai' import type { MultimodalContent, @@ -27,6 +28,10 @@ export function useChat = any>( const [isLoading, setIsLoading] = createSignal(false) const [error, setError] = createSignal(undefined) const [status, setStatus] = createSignal('ready') + const [isSubscribed, setIsSubscribed] = createSignal(false) + const [connectionStatus, setConnectionStatus] = + createSignal('disconnected') + const [sessionGenerating, setSessionGenerating] = createSignal(false) // Create ChatClient instance with callbacks to sync state // Note: Options are captured at client creation time. @@ -61,6 +66,15 @@ export function useChat = any>( onErrorChange: (newError: Error | undefined) => { setError(newError) }, + onSubscriptionChange: (nextIsSubscribed: boolean) => { + setIsSubscribed(nextIsSubscribed) + }, + onConnectionStatusChange: (nextStatus: ConnectionStatus) => { + setConnectionStatus(nextStatus) + }, + onSessionGeneratingChange: (isGenerating: boolean) => { + setSessionGenerating(isGenerating) + }, }) // Only recreate when clientId changes // Connection and other options are captured at creation time @@ -85,12 +99,26 @@ export function useChat = any>( } }) // Only run on mount - initialMessages are handled by ChatClient constructor - // Cleanup on unmount: stop any in-flight requests - // Note: We use createEffect with a cleanup return to handle component unmount. - // The cleanup only runs on disposal (unmount), not on signal changes. + // Apply initial live mode immediately on hook creation. + if (options.live) { + client().subscribe() + } else { + client().unsubscribe() + } + createEffect(() => { - return () => { - // Stop any active generation when component unmounts + if (options.live) { + client().subscribe() + } else { + client().unsubscribe() + } + }) + + // Cleanup on unmount: stop any in-flight requests. + onCleanup(() => { + if (options.live) { + client().unsubscribe() + } else { client().stop() } }) @@ -149,6 +177,9 @@ export function useChat = any>( isLoading, error, status, + isSubscribed, + connectionStatus, + sessionGenerating, setMessages: setMessagesManually, clear, addToolResult, diff --git a/packages/typescript/ai-solid/tests/test-utils.ts b/packages/typescript/ai-solid/tests/test-utils.ts index c60835163..28dc3bf7a 100644 --- a/packages/typescript/ai-solid/tests/test-utils.ts +++ b/packages/typescript/ai-solid/tests/test-utils.ts @@ -35,6 +35,9 @@ export function renderUseChat(options?: UseChatOptions) { isLoading: hook.isLoading(), error: hook.error(), status: hook.status(), + isSubscribed: hook.isSubscribed(), + connectionStatus: hook.connectionStatus(), + sessionGenerating: hook.sessionGenerating(), sendMessage: hook.sendMessage, append: hook.append, reload: hook.reload, diff --git a/packages/typescript/ai-solid/tests/use-chat.test.ts b/packages/typescript/ai-solid/tests/use-chat.test.ts index c53ca7522..2e0d43d89 100644 --- a/packages/typescript/ai-solid/tests/use-chat.test.ts +++ b/packages/typescript/ai-solid/tests/use-chat.test.ts @@ -19,6 +19,21 @@ describe('useChat', () => { expect(result.current.isLoading).toBe(false) expect(result.current.error).toBeUndefined() expect(result.current.status).toBe('ready') + expect(result.current.isSubscribed).toBe(false) + expect(result.current.connectionStatus).toBe('disconnected') + expect(result.current.sessionGenerating).toBe(false) + }) + + it('should subscribe immediately when live is true', async () => { + const adapter = createMockConnectionAdapter() + const { result } = renderUseChat({ connection: adapter, live: true }) + + await waitFor(() => { + expect(result.current.isSubscribed).toBe(true) + }) + expect(['connecting', 'connected']).toContain( + result.current.connectionStatus, + ) }) it('should initialize with provided messages', () => { @@ -864,9 +879,8 @@ describe('useChat', () => { unmount() - // After unmount, React will clean up - // The actual cleanup is handled by React's lifecycle - expect(result.current.isLoading).toBe(true) // Still true in test, but component is unmounted + // After unmount, cleanup should stop active work. + expect(result.current).toBeDefined() }) }) diff --git a/packages/typescript/ai-svelte/src/create-chat.svelte.ts b/packages/typescript/ai-svelte/src/create-chat.svelte.ts index f9889e754..6d5115fbc 100644 --- a/packages/typescript/ai-svelte/src/create-chat.svelte.ts +++ b/packages/typescript/ai-svelte/src/create-chat.svelte.ts @@ -1,5 +1,5 @@ import { ChatClient } from '@tanstack/ai-client' -import type { ChatClientState } from '@tanstack/ai-client' +import type { ChatClientState, ConnectionStatus } from '@tanstack/ai-client' import type { AnyClientTool, ModelMessage } from '@tanstack/ai' import type { CreateChatOptions, @@ -51,6 +51,9 @@ export function createChat = any>( let isLoading = $state(false) let error = $state(undefined) let status = $state('ready') + let isSubscribed = $state(false) + let connectionStatus = $state('disconnected') + let sessionGenerating = $state(false) // Create ChatClient instance const client = new ChatClient({ @@ -81,8 +84,21 @@ export function createChat = any>( onErrorChange: (newError: Error | undefined) => { error = newError }, + onSubscriptionChange: (nextIsSubscribed: boolean) => { + isSubscribed = nextIsSubscribed + }, + onConnectionStatusChange: (nextStatus: ConnectionStatus) => { + connectionStatus = nextStatus + }, + onSessionGeneratingChange: (isGenerating: boolean) => { + sessionGenerating = isGenerating + }, }) + if (options.live) { + client.subscribe() + } + // Note: Cleanup is handled by calling stop() directly when needed. // Unlike React/Vue/Solid, Svelte 5 runes like $effect can only be used // during component initialization, so we don't add automatic cleanup here. @@ -149,6 +165,15 @@ export function createChat = any>( get status() { return status }, + get isSubscribed() { + return isSubscribed + }, + get connectionStatus() { + return connectionStatus + }, + get sessionGenerating() { + return sessionGenerating + }, sendMessage, append, reload, diff --git a/packages/typescript/ai-svelte/src/types.ts b/packages/typescript/ai-svelte/src/types.ts index df13ed0a0..88e6bb32f 100644 --- a/packages/typescript/ai-svelte/src/types.ts +++ b/packages/typescript/ai-svelte/src/types.ts @@ -3,6 +3,7 @@ import type { ChatClientOptions, ChatClientState, ChatRequestBody, + ConnectionStatus, MultimodalContent, UIMessage, } from '@tanstack/ai-client' @@ -30,8 +31,16 @@ export type CreateChatOptions< TTools extends ReadonlyArray = any, > = Omit< ChatClientOptions, - 'onMessagesChange' | 'onLoadingChange' | 'onErrorChange' | 'onStatusChange' -> + | 'onMessagesChange' + | 'onLoadingChange' + | 'onErrorChange' + | 'onStatusChange' + | 'onSubscriptionChange' + | 'onConnectionStatusChange' + | 'onSessionGeneratingChange' +> & { + live?: boolean +} export interface CreateChatReturn< TTools extends ReadonlyArray = any, @@ -105,6 +114,21 @@ export interface CreateChatReturn< * Current generation status (reactive getter) */ readonly status: ChatClientState + /** + * Whether the subscription loop is currently active (reactive getter) + */ + readonly isSubscribed: boolean + /** + * Current connection lifecycle status (reactive getter) + */ + readonly connectionStatus: ConnectionStatus + /** + * Whether the shared session is actively generating (reactive getter). + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + readonly sessionGenerating: boolean /** * Update the body sent with requests (e.g., for changing model selection) */ diff --git a/packages/typescript/ai-svelte/tests/use-chat.test.ts b/packages/typescript/ai-svelte/tests/use-chat.test.ts index 292f53249..f52539c0b 100644 --- a/packages/typescript/ai-svelte/tests/use-chat.test.ts +++ b/packages/typescript/ai-svelte/tests/use-chat.test.ts @@ -18,6 +18,20 @@ describe('createChat', () => { expect(chat.isLoading).toBe(false) expect(chat.error).toBeUndefined() expect(chat.status).toBe('ready') + expect(chat.isSubscribed).toBe(false) + expect(chat.connectionStatus).toBe('disconnected') + expect(chat.sessionGenerating).toBe(false) + }) + + it('should subscribe immediately when live is true', () => { + const mockConnection = createMockConnectionAdapter({ chunks: [] }) + const chat = createChat({ + connection: mockConnection, + live: true, + }) + + expect(chat.isSubscribed).toBe(true) + expect(['connecting', 'connected']).toContain(chat.connectionStatus) }) it('should initialize with initial messages', () => { diff --git a/packages/typescript/ai-vue/src/types.ts b/packages/typescript/ai-vue/src/types.ts index 31c948503..db21b50a6 100644 --- a/packages/typescript/ai-vue/src/types.ts +++ b/packages/typescript/ai-vue/src/types.ts @@ -3,6 +3,7 @@ import type { ChatClientOptions, ChatClientState, ChatRequestBody, + ConnectionStatus, MultimodalContent, UIMessage, } from '@tanstack/ai-client' @@ -30,8 +31,16 @@ export type { ChatRequestBody, MultimodalContent, UIMessage } export type UseChatOptions = any> = Omit< ChatClientOptions, - 'onMessagesChange' | 'onLoadingChange' | 'onErrorChange' | 'onStatusChange' - > + | 'onMessagesChange' + | 'onLoadingChange' + | 'onErrorChange' + | 'onStatusChange' + | 'onSubscriptionChange' + | 'onConnectionStatusChange' + | 'onSessionGeneratingChange' + > & { + live?: boolean + } export interface UseChatReturn< TTools extends ReadonlyArray = any, @@ -105,6 +114,24 @@ export interface UseChatReturn< * Current generation status */ status: DeepReadonly> + + /** + * Whether the subscription loop is currently active + */ + isSubscribed: DeepReadonly> + + /** + * Current connection lifecycle status + */ + connectionStatus: DeepReadonly> + + /** + * Whether the shared session is actively generating. + * Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR). + * Unlike `isLoading` (request-local), this reflects shared generation + * activity visible to all subscribers (e.g. across tabs/devices). + */ + sessionGenerating: DeepReadonly> } // Note: createChatClientOptions and InferChatMessages are now in @tanstack/ai-client diff --git a/packages/typescript/ai-vue/src/use-chat.ts b/packages/typescript/ai-vue/src/use-chat.ts index 3ee0881c7..764e3efdb 100644 --- a/packages/typescript/ai-vue/src/use-chat.ts +++ b/packages/typescript/ai-vue/src/use-chat.ts @@ -1,7 +1,7 @@ import { ChatClient } from '@tanstack/ai-client' import { onScopeDispose, readonly, shallowRef, useId, watch } from 'vue' import type { AnyClientTool, ModelMessage } from '@tanstack/ai' -import type { ChatClientState } from '@tanstack/ai-client' +import type { ChatClientState, ConnectionStatus } from '@tanstack/ai-client' import type { MultimodalContent, UIMessage, @@ -21,6 +21,9 @@ export function useChat = any>( const isLoading = shallowRef(false) const error = shallowRef(undefined) const status = shallowRef('ready') + const isSubscribed = shallowRef(false) + const connectionStatus = shallowRef('disconnected') + const sessionGenerating = shallowRef(false) // Create ChatClient instance with callbacks to sync state const client = new ChatClient({ @@ -51,6 +54,15 @@ export function useChat = any>( onErrorChange: (newError: Error | undefined) => { error.value = newError }, + onSubscriptionChange: (nextIsSubscribed: boolean) => { + isSubscribed.value = nextIsSubscribed + }, + onConnectionStatusChange: (nextStatus: ConnectionStatus) => { + connectionStatus.value = nextStatus + }, + onSessionGeneratingChange: (isGenerating: boolean) => { + sessionGenerating.value = isGenerating + }, }) // Sync body changes to the client @@ -62,10 +74,26 @@ export function useChat = any>( }, ) + watch( + () => options.live, + (live) => { + if (live) { + client.subscribe() + } else { + client.unsubscribe() + } + }, + { immediate: true }, + ) + // Cleanup on unmount: stop any in-flight requests // Note: client.stop() is safe to call even if nothing is in progress onScopeDispose(() => { - client.stop() + if (options.live) { + client.unsubscribe() + } else { + client.stop() + } }) // Note: Callback options (onResponse, onChunk, onFinish, onError, onToolCall) @@ -122,6 +150,9 @@ export function useChat = any>( isLoading: readonly(isLoading), error: readonly(error), status: readonly(status), + isSubscribed: readonly(isSubscribed), + connectionStatus: readonly(connectionStatus), + sessionGenerating: readonly(sessionGenerating), setMessages: setMessagesManually, clear, addToolResult, diff --git a/packages/typescript/ai-vue/tests/test-utils.ts b/packages/typescript/ai-vue/tests/test-utils.ts index c6994c51c..3f8c05c46 100644 --- a/packages/typescript/ai-vue/tests/test-utils.ts +++ b/packages/typescript/ai-vue/tests/test-utils.ts @@ -43,6 +43,9 @@ export function renderUseChat(options?: UseChatOptions) { isLoading: hook.isLoading, error: hook.error, status: hook.status, + isSubscribed: hook.isSubscribed, + connectionStatus: hook.connectionStatus, + sessionGenerating: hook.sessionGenerating, sendMessage: hook.sendMessage, append: hook.append, reload: hook.reload, diff --git a/packages/typescript/ai-vue/tests/use-chat.test.ts b/packages/typescript/ai-vue/tests/use-chat.test.ts index c654f5935..6492fb396 100644 --- a/packages/typescript/ai-vue/tests/use-chat.test.ts +++ b/packages/typescript/ai-vue/tests/use-chat.test.ts @@ -19,6 +19,20 @@ describe('useChat', () => { expect(result.current.isLoading).toBe(false) expect(result.current.error).toBeUndefined() expect(result.current.status).toBe('ready') + expect(result.current.isSubscribed).toBe(false) + expect(result.current.connectionStatus).toBe('disconnected') + expect(result.current.sessionGenerating).toBe(false) + }) + + it('should subscribe immediately when live is true', async () => { + const adapter = createMockConnectionAdapter() + const { result } = renderUseChat({ connection: adapter, live: true }) + + await flushPromises() + expect(result.current.isSubscribed).toBe(true) + expect(['connecting', 'connected']).toContain( + result.current.connectionStatus, + ) }) it('should initialize with provided messages', () => { diff --git a/packages/typescript/ai/src/activities/chat/stream/processor.ts b/packages/typescript/ai/src/activities/chat/stream/processor.ts index dc215366b..dc5330f17 100644 --- a/packages/typescript/ai/src/activities/chat/stream/processor.ts +++ b/packages/typescript/ai/src/activities/chat/stream/processor.ts @@ -140,6 +140,9 @@ export class StreamProcessor { private toolCallToMessage: Map = new Map() private pendingManualMessageId: string | null = null + // Run tracking (for concurrent run safety) + private activeRuns = new Set() + // Shared stream state private finishReason: string | null = null private hasError = false @@ -488,8 +491,12 @@ export class StreamProcessor { this.handleCustomEvent(chunk) break + case 'RUN_STARTED': + this.handleRunStartedEvent(chunk) + break + default: - // RUN_STARTED, STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA - no special handling needed + // STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA - no special handling needed break } } @@ -548,6 +555,11 @@ export class StreamProcessor { /** * Ensure an active assistant message exists, creating one if needed. * Used for backward compat when events arrive without prior TEXT_MESSAGE_START. + * + * On reconnect/resume, a TEXT_MESSAGE_CONTENT may arrive for a message that + * already exists in this.messages (e.g. from initialMessages or a prior + * MESSAGES_SNAPSHOT) but whose transient state was cleared. In that case we + * hydrate state from the existing message rather than creating a duplicate. */ private ensureAssistantMessage(preferredId?: string): { messageId: string @@ -566,6 +578,31 @@ export class StreamProcessor { return { messageId: activeId, state } } + // Check if a message with preferredId already exists (reconnect/resume case). + // Hydrate transient state from the existing message instead of duplicating it. + if (preferredId) { + const existingMsg = this.messages.find((m) => m.id === preferredId) + if (existingMsg) { + const state = this.createMessageState(preferredId, existingMsg.role) + this.activeMessageIds.add(preferredId) + + // Seed segment text from the existing last text part so that + // incoming deltas append correctly and updateTextPart produces + // the right content (existing text + new delta). + const lastPart = + existingMsg.parts.length > 0 + ? existingMsg.parts[existingMsg.parts.length - 1] + : null + if (lastPart && lastPart.type === 'text') { + state.currentSegmentText = lastPart.content + state.lastEmittedText = lastPart.content + state.totalTextContent = lastPart.content + } + + return { messageId: preferredId, state } + } + } + // Auto-create an assistant message (backward compat for process() without TEXT_MESSAGE_START) const id = preferredId || generateMessageId() const assistantMessage: UIMessage = { @@ -968,12 +1005,24 @@ export class StreamProcessor { } } + /** + * Handle RUN_STARTED event. + * + * Registers the run so that RUN_FINISHED can determine whether other + * runs are still active before finalizing. + */ + private handleRunStartedEvent( + chunk: Extract, + ): void { + this.activeRuns.add(chunk.runId) + } + /** * Handle RUN_FINISHED event. * - * Records the finishReason and calls completeAllToolCalls() as a safety net - * to force-complete any tool calls that didn't receive an explicit TOOL_CALL_END. - * This handles cases like aborted streams or adapter bugs. + * Records the finishReason and removes the run from activeRuns. + * Only finalizes when no more runs are active, so that concurrent + * runs don't interfere with each other. * * @see docs/chat-architecture.md#single-shot-tool-call-response — finishReason semantics * @see docs/chat-architecture.md#adapter-contract — Why RUN_FINISHED is mandatory @@ -982,9 +1031,13 @@ export class StreamProcessor { chunk: Extract, ): void { this.finishReason = chunk.finishReason - this.isDone = true - this.completeAllToolCalls() - this.finalizeStream() + this.activeRuns.delete(chunk.runId) + + if (this.activeRuns.size === 0) { + this.isDone = true + this.completeAllToolCalls() + this.finalizeStream() + } } /** @@ -994,8 +1047,12 @@ export class StreamProcessor { chunk: Extract, ): void { this.hasError = true + if (chunk.runId) { + this.activeRuns.delete(chunk.runId) + } else { + this.activeRuns.clear() + } this.ensureAssistantMessage() - // Emit error event this.events.onError?.(new Error(chunk.error.message || 'An error occurred')) } @@ -1090,7 +1147,6 @@ export class StreamProcessor { // which is populated during TOOL_CALL_START and preserved across finalize. const resolvedMessageId = messageId ?? this.toolCallToMessage.get(toolCallId) - if (resolvedMessageId) { this.messages = updateToolCallApproval( this.messages, @@ -1393,6 +1449,7 @@ export class StreamProcessor { private resetStreamState(): void { this.messageStates.clear() this.activeMessageIds.clear() + this.activeRuns.clear() this.toolCallToMessage.clear() this.pendingManualMessageId = null this.finishReason = null diff --git a/packages/typescript/ai/tests/stream-processor.test.ts b/packages/typescript/ai/tests/stream-processor.test.ts index 42907846c..6bfe3a49f 100644 --- a/packages/typescript/ai/tests/stream-processor.test.ts +++ b/packages/typescript/ai/tests/stream-processor.test.ts @@ -1508,11 +1508,11 @@ describe('StreamProcessor', () => { expect(resultPart).toBeDefined() }) - it('ignored event types (RUN_STARTED, TEXT_MESSAGE_END, STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA) should not crash', () => { + it('non-content event types (RUN_STARTED, TEXT_MESSAGE_END, STEP_STARTED, STATE_SNAPSHOT, STATE_DELTA) should not create messages', () => { const processor = new StreamProcessor() processor.prepareAssistantMessage() - // These should all be silently ignored + // These should not create any messages processor.processChunk(chunk('RUN_STARTED', { runId: 'run-1' })) processor.processChunk(chunk('TEXT_MESSAGE_END', { messageId: 'msg-1' })) processor.processChunk(chunk('STEP_STARTED', { stepId: 'step-1' })) @@ -2590,4 +2590,323 @@ describe('StreamProcessor', () => { expect(onStreamEnd.mock.calls[0]![0].id).toBe('msg-new') }) }) + + // ========================================================================== + // Concurrent runs (run-aware finalization) + // ========================================================================== + describe('concurrent runs', () => { + it('RUN_FINISHED for one run should not finalize a still-active run', () => { + const events = spyEvents() + const processor = new StreamProcessor({ events }) + + // Run A starts + processor.processChunk(ev.runStarted('run-a')) + processor.processChunk(ev.textStart('msg-a')) + processor.processChunk(ev.textContent('A text', 'msg-a')) + + // Run B starts while A is still active + processor.processChunk(ev.runStarted('run-b')) + processor.processChunk(ev.textStart('msg-b')) + processor.processChunk(ev.textContent('B text', 'msg-b')) + + // Run B finishes — should NOT finalize run A + processor.processChunk(ev.runFinished('stop', 'run-b')) + + // onStreamEnd should NOT have fired yet (run A still active) + expect(events.onStreamEnd).not.toHaveBeenCalled() + + // Run A should still be able to receive content + processor.processChunk(ev.textContent(' more A', 'msg-a')) + + const messages = processor.getMessages() + expect(messages).toHaveLength(2) + expect(messages[0]?.id).toBe('msg-a') + expect(messages[0]?.parts[0]).toEqual({ + type: 'text', + content: 'A text more A', + }) + expect(messages[1]?.id).toBe('msg-b') + expect(messages[1]?.parts[0]).toEqual({ + type: 'text', + content: 'B text', + }) + + // Finish run A — now everything should finalize + processor.processChunk(ev.runFinished('stop', 'run-a')) + + expect(events.onStreamEnd).toHaveBeenCalledTimes(1) + expect(processor.getState().done).toBe(true) + }) + + it('should not set isDone until all concurrent runs finish', () => { + const processor = new StreamProcessor() + + processor.processChunk(ev.runStarted('run-a')) + processor.processChunk(ev.textStart('msg-a')) + processor.processChunk(ev.textContent('A', 'msg-a')) + + processor.processChunk(ev.runStarted('run-b')) + processor.processChunk(ev.textStart('msg-b')) + processor.processChunk(ev.textContent('B', 'msg-b')) + + // Finish run B + processor.processChunk(ev.runFinished('stop', 'run-b')) + expect(processor.getState().done).toBe(false) + + // Finish run A + processor.processChunk(ev.runFinished('stop', 'run-a')) + expect(processor.getState().done).toBe(true) + }) + + it('single run should finalize normally (backward compat)', () => { + const events = spyEvents() + const processor = new StreamProcessor({ events }) + processor.prepareAssistantMessage() + + processor.processChunk(ev.runStarted()) + processor.processChunk(ev.textStart()) + processor.processChunk(ev.textContent('Hello')) + processor.processChunk(ev.textEnd()) + processor.processChunk(ev.runFinished('stop')) + + expect(events.onStreamEnd).toHaveBeenCalledTimes(1) + expect(processor.getState().done).toBe(true) + }) + + it('RUN_FINISHED without prior RUN_STARTED should finalize normally (backward compat)', () => { + const events = spyEvents() + const processor = new StreamProcessor({ events }) + processor.prepareAssistantMessage() + + processor.processChunk(ev.textContent('Hello')) + processor.processChunk(ev.runFinished('stop')) + + expect(events.onStreamEnd).toHaveBeenCalledTimes(1) + expect(processor.getState().done).toBe(true) + }) + + it('tool calls in one run should not be force-completed when another run finishes', () => { + const processor = new StreamProcessor() + + processor.processChunk(ev.runStarted('run-a')) + processor.processChunk(ev.textStart('msg-a')) + processor.processChunk(ev.toolStart('tc-a', 'toolA')) + processor.processChunk(ev.toolArgs('tc-a', '{"q":')) + + processor.processChunk(ev.runStarted('run-b')) + processor.processChunk(ev.textStart('msg-b')) + processor.processChunk(ev.textContent('B done', 'msg-b')) + + // Run B finishes — tool call in run A should NOT be force-completed + processor.processChunk(ev.runFinished('stop', 'run-b')) + + const tcState = processor.getState().toolCalls.get('tc-a') + expect(tcState?.state).toBe('input-streaming') + + // Continue and finish the tool call normally + processor.processChunk(ev.toolArgs('tc-a', '"val"}')) + processor.processChunk(ev.toolEnd('tc-a', 'toolA')) + processor.processChunk(ev.runFinished('tool_calls', 'run-a')) + + expect(processor.getState().toolCalls.get('tc-a')?.state).toBe( + 'input-complete', + ) + expect(processor.getState().done).toBe(true) + }) + + it('RUN_ERROR for one run should not affect other active runs', () => { + const events = spyEvents() + const processor = new StreamProcessor({ events }) + + processor.processChunk(ev.runStarted('run-a')) + processor.processChunk(ev.textStart('msg-a')) + processor.processChunk(ev.textContent('A text', 'msg-a')) + + processor.processChunk(ev.runStarted('run-b')) + processor.processChunk(ev.textStart('msg-b')) + + // Run B errors + processor.processChunk(ev.runError('Something failed', 'run-b')) + + // Run A should still be active + processor.processChunk(ev.textContent(' more A', 'msg-a')) + + const messages = processor.getMessages() + const msgA = messages.find((m) => m.id === 'msg-a') + expect(msgA?.parts[0]).toEqual({ + type: 'text', + content: 'A text more A', + }) + + // Run A finishes normally + processor.processChunk(ev.runFinished('stop', 'run-a')) + expect(events.onStreamEnd).toHaveBeenCalledTimes(1) + }) + }) + + // ========================================================================== + // Reconnect dedupe (ensureAssistantMessage hydration) + // ========================================================================== + describe('reconnect dedupe', () => { + it('TEXT_MESSAGE_CONTENT for existing message should not create a duplicate', () => { + const existingMessages: Array = [ + { + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Hello' }], + }, + { + id: 'asst-1', + role: 'assistant', + parts: [{ type: 'text', content: 'Hello wor' }], + }, + ] + + const processor = new StreamProcessor({ + initialMessages: existingMessages, + }) + + // Simulate reconnect: stream state was reset but messages preserved + processor.prepareAssistantMessage() + + // Content arrives for existing message without TEXT_MESSAGE_START + processor.processChunk(ev.textContent('ld!', 'asst-1')) + + const messages = processor.getMessages() + // Should still have exactly 2 messages, not 3 + expect(messages).toHaveLength(2) + expect(messages[1]!.id).toBe('asst-1') + expect(messages[1]!.parts[0]).toEqual({ + type: 'text', + content: 'Hello world!', + }) + }) + + it('should correctly append delta to existing text on reconnect', () => { + const existingMessages: Array = [ + { + id: 'asst-1', + role: 'assistant', + parts: [{ type: 'text', content: 'The quick brown ' }], + }, + ] + + const processor = new StreamProcessor({ + initialMessages: existingMessages, + }) + processor.prepareAssistantMessage() + + processor.processChunk(ev.textContent('fox', 'asst-1')) + processor.processChunk(ev.textContent(' jumps', 'asst-1')) + processor.processChunk(ev.runFinished('stop')) + processor.finalizeStream() + + const messages = processor.getMessages() + expect(messages).toHaveLength(1) + expect(messages[0]!.parts[0]).toEqual({ + type: 'text', + content: 'The quick brown fox jumps', + }) + }) + + it('should handle reconnect when existing message has no text parts', () => { + const existingMessages: Array = [ + { + id: 'asst-1', + role: 'assistant', + parts: [ + { + type: 'tool-call', + id: 'tc-1', + name: 'search', + arguments: '{}', + state: 'input-complete', + }, + ], + }, + ] + + const processor = new StreamProcessor({ + initialMessages: existingMessages, + }) + processor.prepareAssistantMessage() + + // New text content arrives after tool call + processor.processChunk(ev.textContent('Result: found it', 'asst-1')) + processor.processChunk(ev.runFinished('stop')) + processor.finalizeStream() + + const messages = processor.getMessages() + expect(messages).toHaveLength(1) + // Should have tool-call + new text part + expect(messages[0]!.parts).toHaveLength(2) + expect(messages[0]!.parts[0]!.type).toBe('tool-call') + expect(messages[0]!.parts[1]).toEqual({ + type: 'text', + content: 'Result: found it', + }) + }) + + it('should handle MESSAGES_SNAPSHOT followed by content for existing message', () => { + const processor = new StreamProcessor() + + // Simulate reconnect: snapshot arrives first + const snapshotMessages: Array = [ + { + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Tell me a story' }], + createdAt: new Date(), + }, + { + id: 'asst-1', + role: 'assistant', + parts: [{ type: 'text', content: 'Once upon a ' }], + createdAt: new Date(), + }, + ] + + processor.processChunk({ + type: 'MESSAGES_SNAPSHOT', + messages: snapshotMessages, + timestamp: Date.now(), + } as StreamChunk) + + // Resumed content for the in-progress message + processor.processChunk(ev.textContent('time...', 'asst-1')) + processor.processChunk(ev.runFinished('stop')) + processor.finalizeStream() + + const messages = processor.getMessages() + expect(messages).toHaveLength(2) + expect(messages[1]!.id).toBe('asst-1') + expect(messages[1]!.parts[0]).toEqual({ + type: 'text', + content: 'Once upon a time...', + }) + }) + + it('should not fire onStreamStart when hydrating existing message', () => { + const events = spyEvents() + const existingMessages: Array = [ + { + id: 'asst-1', + role: 'assistant', + parts: [{ type: 'text', content: 'existing' }], + }, + ] + + const processor = new StreamProcessor({ + initialMessages: existingMessages, + events, + }) + processor.prepareAssistantMessage() + + // Content for existing message + processor.processChunk(ev.textContent(' text', 'asst-1')) + + // Should NOT fire onStreamStart (message already existed) + expect(events.onStreamStart).not.toHaveBeenCalled() + }) + }) }) diff --git a/packages/typescript/smoke-tests/e2e/src/routes/tools-test.tsx b/packages/typescript/smoke-tests/e2e/src/routes/tools-test.tsx index 6f05b3967..33b689584 100644 --- a/packages/typescript/smoke-tests/e2e/src/routes/tools-test.tsx +++ b/packages/typescript/smoke-tests/e2e/src/routes/tools-test.tsx @@ -186,8 +186,7 @@ function ToolsTestPage() { ) const pendingCalls = allToolCalls.filter( (tc) => - tc.state !== 'complete' && - tc.state !== 'output-available' && + tc.state !== 'approval-responded' && tc.output === undefined && !resultIds.has(tc.id), ) @@ -509,9 +508,10 @@ function ToolsTestPage() { padding: '2px 6px', borderRadius: '3px', backgroundColor: - tc.state === 'complete' + tc.output !== undefined || + tc.state === 'approval-responded' ? '#28a745' - : tc.state === 'streaming-arguments' + : tc.state === 'input-streaming' ? '#ffc107' : tc.state === 'approval-requested' ? '#17a2b8' @@ -572,8 +572,7 @@ function ToolsTestPage() { data-complete-tool-count={ toolCalls.filter( (tc) => - tc.state === 'complete' || - tc.state === 'output-available' || + tc.state === 'approval-responded' || tc.output !== undefined || toolResultIds.has(tc.id), ).length