Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f0b81bb
feat(ai): align start event types with AG-UI
thruflo Feb 10, 2026
30d00e1
feat(ai): add MessageStreamState type for per-message stream tracking
thruflo Feb 10, 2026
f03493b
feat(ai): refactor StreamProcessor to per-message state
thruflo Feb 10, 2026
f036f55
feat(ai): replace STATE_SNAPSHOT with MESSAGES_SNAPSHOT event
thruflo Feb 10, 2026
3f1ccb5
feat(ai-client): add SessionAdapter interface and createDefaultSession
thruflo Feb 10, 2026
202244a
feat(ai-client): refactor ChatClient to use SessionAdapter subscripti…
thruflo Feb 10, 2026
d4cc2b1
fix(ai-preact): thread option through.
thruflo Feb 10, 2026
f45dd77
fix(ai): finalizeStream when RUN_FINISHED.
thruflo Feb 10, 2026
6d1c733
fix(ai-client): handle reload during active stream with generation co…
thruflo Feb 10, 2026
a217426
docs: remove proposal docs.
thruflo Feb 10, 2026
fd1c50c
fix(ai, ai-client): address stream lifecycle edge cases from PR review
thruflo Feb 10, 2026
8c628ee
fix(ai-client): fix reload failures from stale stream state and waite…
thruflo Feb 11, 2026
36d6a93
ci: apply automated fixes
autofix-ci[bot] Feb 11, 2026
2abc6c7
fix(ai): resolve eslint errors in stream processor
thruflo Feb 11, 2026
c5e1aa3
fix(ai-client): resolve eslint errors in chat-client and session-adapter
thruflo Feb 11, 2026
537b73c
fix(ai-client): propagate send() errors to subscribe() consumers
thruflo Feb 11, 2026
fc52ef7
fix(ai): map 'tool' role to 'assistant' in message state to fix lookups
thruflo Feb 11, 2026
ed1cddb
fix(ai): normalize chunk.delta to avoid "undefined" string concatenation
thruflo Feb 12, 2026
fd7c226
fix(ai): use || instead of ?? for chunk.delta fallback to satisfy eslint
thruflo Feb 12, 2026
64f5517
fix(ai): reset stream flags on MESSAGES_SNAPSHOT to avoid stale state
thruflo Feb 12, 2026
7155e87
refactor(ai-client): finalize connection adapter unification
samwillis Feb 18, 2026
1bcfdfe
Combine the SessionAdapter with the ConnectionAdapter
samwillis Feb 19, 2026
04e3e70
Merge branch 'main' into thruflo/durable-session-support
jherr Feb 23, 2026
fe307a1
ci: apply automated fixes
autofix-ci[bot] Feb 23, 2026
4e3a6cd
Merge remote-tracking branch 'origin/main' into thruflo/durable-sessi…
samwillis Mar 9, 2026
620ab8e
fix: return booleans from chat client streamResponse
samwillis Mar 9, 2026
098e07e
Merge remote-tracking branch 'origin/main' into thruflo/durable-sessi…
samwillis Mar 13, 2026
3b1c3b1
chore: add changeset for durable chat updates
samwillis Mar 13, 2026
3d858c1
feat: add sessionGenerating lifecycle for shared generation activity
samwillis Mar 3, 2026
ac7905e
fix: make stream finalization run-aware and fix reconnect message dedup
samwillis Mar 4, 2026
78bcfcb
fix: clean up framework client type exports
samwillis Mar 9, 2026
16f983c
chore: add changeset for subscription lifecycle updates
samwillis Mar 13, 2026
0503d13
Merge branch 'main' into pr-356-sub-lifecycle
jherr Mar 14, 2026
7d4147d
Merge remote-tracking branch 'origin/main' into pr-356-sub-lifecycle
jherr Mar 15, 2026
da5bfc3
ci: apply automated fixes
autofix-ci[bot] Mar 15, 2026
4abfbfd
fix: resolve approval lookup after RUN_FINISHED and update smoke test…
jherr Mar 15, 2026
20d7a9d
fix: treat input-complete as non-terminal in smoke test tool tracking
jherr Mar 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/fresh-donkeys-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@tanstack/ai': patch
'@tanstack/ai-client': patch
'@tanstack/ai-preact': patch
'@tanstack/ai-react': patch
'@tanstack/ai-solid': patch
'@tanstack/ai-svelte': patch
'@tanstack/ai-vue': patch
---

Add an explicit subscription lifecycle to `ChatClient` with `subscribe()`/`unsubscribe()`, `isSubscribed`, `connectionStatus`, and `sessionGenerating`, while keeping request lifecycle state separate from long-lived connection state for durable chat sessions.

Update the React, Preact, Solid, Svelte, and Vue chat bindings with `live` mode plus reactive subscription/session state, and improve `StreamProcessor` handling for concurrent runs and reconnects so active sessions do not finalize early or duplicate resumed assistant messages.
194 changes: 180 additions & 14 deletions packages/typescript/ai-client/src/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type { ChatClientEventEmitter } from './events'
import type {
ChatClientOptions,
ChatClientState,
ConnectionStatus,
MessagePart,
MultimodalContent,
ToolCallPart,
Expand All @@ -32,8 +33,10 @@ export class ChatClient {
private body: Record<string, any> = {}
private pendingMessageBody: Record<string, any> | undefined = undefined
private isLoading = false
private isSubscribed = false
private error: Error | undefined = undefined
private status: ChatClientState = 'ready'
private connectionStatus: ConnectionStatus = 'disconnected'
private abortController: AbortController | null = null
private events: ChatClientEventEmitter
private clientToolsRef: { current: Map<string, AnyClientTool> }
Expand All @@ -51,6 +54,8 @@ export class ChatClient {
// Tracks whether a queued checkForContinuation was skipped because
// continuationPending was true (chained approval scenario)
private continuationSkipped = false
private sessionGenerating = false
private activeRunIds = new Set<string>()

private callbacksRef: {
current: {
Expand All @@ -62,6 +67,9 @@ export class ChatClient {
onLoadingChange: (isLoading: boolean) => void
onErrorChange: (error: Error | undefined) => void
onStatusChange: (status: ChatClientState) => void
onSubscriptionChange: (isSubscribed: boolean) => void
onConnectionStatusChange: (status: ConnectionStatus) => void
onSessionGeneratingChange: (isGenerating: boolean) => void
onCustomEvent: (
eventType: string,
data: unknown,
Expand Down Expand Up @@ -94,6 +102,11 @@ export class ChatClient {
onLoadingChange: options.onLoadingChange || (() => {}),
onErrorChange: options.onErrorChange || (() => {}),
onStatusChange: options.onStatusChange || (() => {}),
onSubscriptionChange: options.onSubscriptionChange || (() => {}),
onConnectionStatusChange:
options.onConnectionStatusChange || (() => {}),
onSessionGeneratingChange:
options.onSessionGeneratingChange || (() => {}),
onCustomEvent: options.onCustomEvent || (() => {}),
},
}
Expand Down Expand Up @@ -259,6 +272,27 @@ export class ChatClient {
this.callbacksRef.current.onStatusChange(status)
}

private setIsSubscribed(isSubscribed: boolean): void {
this.isSubscribed = isSubscribed
this.callbacksRef.current.onSubscriptionChange(isSubscribed)
}

private setConnectionStatus(status: ConnectionStatus): void {
this.connectionStatus = status
this.callbacksRef.current.onConnectionStatusChange(status)
}

private setSessionGenerating(isGenerating: boolean): void {
if (this.sessionGenerating === isGenerating) return
this.sessionGenerating = isGenerating
this.callbacksRef.current.onSessionGeneratingChange(isGenerating)
}

private resetSessionGenerating(): void {
this.activeRunIds.clear()
this.setSessionGenerating(false)
}

private setError(error: Error | undefined): void {
this.error = error
this.callbacksRef.current.onErrorChange(error)
Expand All @@ -275,10 +309,15 @@ export class ChatClient {
this.processingResolve = null
}

private cancelInFlightStream(options?: { setReadyStatus?: boolean }): void {
private cancelInFlightStream(options?: {
setReadyStatus?: boolean
abortSubscription?: boolean
}): void {
this.abortController?.abort()
this.abortController = null
this.abortSubscriptionLoop()
if (options?.abortSubscription) {
this.abortSubscriptionLoop()
}
this.resolveProcessing()
this.setIsLoading(false)
if (options?.setReadyStatus) {
Expand All @@ -290,7 +329,15 @@ export class ChatClient {
const alreadyReported =
this.errorReportedGeneration === this.streamGeneration
this.setError(error)
this.setStatus('error')
// Preserve request-level error semantics even if a RUN_ERROR arrives
// slightly after loading flips false during stream teardown.
if (
this.isLoading ||
this.status === 'submitted' ||
this.status === 'streaming'
) {
this.setStatus('error')
}
if (!alreadyReported) {
this.errorReportedGeneration = this.streamGeneration
this.callbacksRef.current.onError(error)
Expand All @@ -304,13 +351,30 @@ export class ChatClient {
this.subscriptionAbortController = new AbortController()
const signal = this.subscriptionAbortController.signal

this.consumeSubscription(signal).catch((err) => {
if (err instanceof Error && err.name !== 'AbortError') {
this.reportStreamError(err)
}
// Resolve pending processing so streamResponse doesn't hang
this.resolveProcessing()
})
this.consumeSubscription(signal)
.catch((err) => {
if (err instanceof Error && err.name !== 'AbortError') {
this.setConnectionStatus('error')
this.resetSessionGenerating()
this.setIsSubscribed(false)
this.reportStreamError(err)
}
// Resolve pending processing so streamResponse doesn't hang
this.resolveProcessing()
})
.finally(() => {
// Ignore stale loops that were superseded by a restart.
if (this.subscriptionAbortController?.signal !== signal) {
return
}
this.subscriptionAbortController = null
if (!signal.aborted && this.isSubscribed) {
this.setIsSubscribed(false)
if (this.connectionStatus !== 'error') {
this.setConnectionStatus('disconnected')
}
}
})
}

/**
Expand All @@ -320,11 +384,25 @@ export class ChatClient {
const stream = this.connection.subscribe(signal)
for await (const chunk of stream) {
if (signal.aborted) break
if (this.connectionStatus === 'connecting') {
this.setConnectionStatus('connected')
}
this.callbacksRef.current.onChunk(chunk)
this.processor.processChunk(chunk)
if (chunk.type === 'RUN_STARTED') {
this.activeRunIds.add(chunk.runId)
this.setSessionGenerating(true)
}
// RUN_FINISHED / RUN_ERROR signal run completion — resolve processing
// (redundant if onStreamEnd already resolved it, harmless)
if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
if (chunk.runId) {
this.activeRunIds.delete(chunk.runId)
} else if (chunk.type === 'RUN_ERROR') {
// RUN_ERROR without runId is a session-level error; clear all runs
this.activeRunIds.clear()
}
this.setSessionGenerating(this.activeRunIds.size > 0)
this.resolveProcessing()
Comment on lines +398 to +406
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't resolve request completion from unrelated runs.

processingComplete is request-local, but this branch resolves it for every terminal chunk seen on the shared subscription. If another run in the same durable session finishes before the run started by this send(), await sendMessage() can return early and the completion path will run against partial local output. This needs request→run correlation before calling resolveProcessing().

Also applies to: 599-610

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/typescript/ai-client/src/chat-client.ts` around lines 394 - 402, The
code is resolving request-level completion (resolveProcessing) for any terminal
chunk on a shared subscription; fix by correlating terminal chunk runIds to the
specific send() request before calling resolveProcessing: when send() starts,
record the request's runId(s) (e.g., a per-request set like requestRunIds stored
on the ChatClient instance or returned by the server) and then in the
RUN_FINISHED / RUN_ERROR branch only delete from this.activeRunIds and call
this.resolveProcessing() if chunk.runId is present and matches one of the runIds
for the current request (or, for session-level RUN_ERROR with no runId, only
resolve if this request created no runId and is the originator), otherwise skip
calling resolveProcessing; use existing symbols activeRunIds,
setSessionGenerating, resolveProcessing, chunk.runId and send() to locate where
to add the request→run correlation check.

}
// Yield control back to event loop for UI updates
Expand All @@ -336,11 +414,15 @@ export class ChatClient {
* Ensure subscription loop is running, starting it if needed.
*/
private ensureSubscription(): void {
if (!this.isSubscribed) {
this.subscribe()
return
}
if (
!this.subscriptionAbortController ||
this.subscriptionAbortController.signal.aborted
) {
this.startSubscription()
this.subscribe({ restart: true })
}
}

Expand Down Expand Up @@ -593,6 +675,39 @@ export class ChatClient {
return streamCompletedSuccessfully
}

/**
* Start the client subscription loop.
* This controls the connection lifecycle independently from request lifecycle.
*/
subscribe(options?: { restart?: boolean }): void {
const restart = options?.restart === true
if (this.isSubscribed && !restart) {
return
}

if (this.isSubscribed && restart) {
this.abortSubscriptionLoop()
}

this.setIsSubscribed(true)
this.setConnectionStatus('connecting')
this.startSubscription()
}

/**
* Unsubscribe and fully tear down live behavior.
* This aborts an in-flight request and the subscription loop.
*/
unsubscribe(): void {
this.cancelInFlightStream({
setReadyStatus: true,
abortSubscription: true,
})
this.resetSessionGenerating()
this.setIsSubscribed(false)
this.setConnectionStatus('disconnected')
}

/**
* Reload the last assistant message
*/
Expand Down Expand Up @@ -789,6 +904,30 @@ export class ChatClient {
return this.status
}

/**
* Get whether the subscription loop is active
*/
getIsSubscribed(): boolean {
return this.isSubscribed
}

/**
* Get current connection lifecycle status
*/
getConnectionStatus(): ConnectionStatus {
return this.connectionStatus
}

/**
* Whether the shared session is actively generating.
* Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR).
* Unlike `isLoading` (request-local), this reflects shared generation
* activity visible to all subscribers (e.g. across tabs/devices).
*/
getSessionGenerating(): boolean {
return this.sessionGenerating
}

/**
* Get current error
*/
Expand All @@ -814,20 +953,35 @@ export class ChatClient {
onChunk?: (chunk: StreamChunk) => void
onFinish?: (message: UIMessage) => void
onError?: (error: Error) => void
onSubscriptionChange?: (isSubscribed: boolean) => void
onConnectionStatusChange?: (status: ConnectionStatus) => void
onSessionGeneratingChange?: (isGenerating: boolean) => void
onCustomEvent?: (
eventType: string,
data: unknown,
context: { toolCallId?: string },
) => void
}): void {
if (options.connection !== undefined) {
// Cancel any in-flight stream to avoid hanging on stale processing promises
const wasSubscribed = this.isSubscribed

if (this.isLoading) {
this.cancelInFlightStream({ setReadyStatus: true })
} else {
this.cancelInFlightStream({
setReadyStatus: true,
abortSubscription: true,
})
} else if (wasSubscribed) {
this.abortSubscriptionLoop()
}

this.resetSessionGenerating()
this.setIsSubscribed(false)
this.setConnectionStatus('disconnected')
this.connection = normalizeConnectionAdapter(options.connection)

if (wasSubscribed) {
this.subscribe()
}
}
if (options.body !== undefined) {
this.body = options.body
Expand All @@ -850,6 +1004,18 @@ export class ChatClient {
if (options.onError !== undefined) {
this.callbacksRef.current.onError = options.onError
}
if (options.onSubscriptionChange !== undefined) {
this.callbacksRef.current.onSubscriptionChange =
options.onSubscriptionChange
}
if (options.onConnectionStatusChange !== undefined) {
this.callbacksRef.current.onConnectionStatusChange =
options.onConnectionStatusChange
}
if (options.onSessionGeneratingChange !== undefined) {
this.callbacksRef.current.onSessionGeneratingChange =
options.onSessionGeneratingChange
}
if (options.onCustomEvent !== undefined) {
this.callbacksRef.current.onCustomEvent = options.onCustomEvent
}
Expand Down
1 change: 1 addition & 0 deletions packages/typescript/ai-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export type {
ChatRequestBody,
InferChatMessages,
ChatClientState,
ConnectionStatus,
// Multimodal content input type
MultimodalContent,
} from './types'
Expand Down
28 changes: 28 additions & 0 deletions packages/typescript/ai-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ export type ToolResultState =
*/
export type ChatClientState = 'ready' | 'submitted' | 'streaming' | 'error'

/**
* Connection lifecycle state for the subscription loop.
*/
export type ConnectionStatus =
| 'disconnected'
| 'connecting'
| 'connected'
| 'error'

/**
* Multimodal content input for sending messages with rich media.
* Allows sending text, images, audio, video, and documents to the LLM.
Expand Down Expand Up @@ -240,6 +249,25 @@ export interface ChatClientOptions<
*/
onStatusChange?: (status: ChatClientState) => void

/**
* Callback when subscription lifecycle changes.
* This is independent from request lifecycle (`isLoading`, `status`).
*/
onSubscriptionChange?: (isSubscribed: boolean) => void

/**
* Callback when connection lifecycle changes.
*/
onConnectionStatusChange?: (status: ConnectionStatus) => void

/**
* Callback when session generation activity changes.
* Derived from stream run events (RUN_STARTED / RUN_FINISHED / RUN_ERROR).
* Unlike `onLoadingChange` (request-local), this reflects shared generation
* activity visible to all subscribers (e.g. across tabs/devices).
*/
onSessionGeneratingChange?: (isGenerating: boolean) => void

/**
* Callback when a custom event is received from a server-side tool.
* Custom events are emitted by tools using `context.emitCustomEvent()` during execution.
Expand Down
Loading
Loading