fix: tool call streaming output compatibility #6439

Closed
Weikjssss wants to merge 2 commits into AstrBotDevs:dev from Weikjssss:fix/tool-call-stream-compatibility

Conversation

@Weikjssss Weikjssss commented Mar 16, 2026

Conflicts:

astrbot/core/provider/sources/openai_source.py resolved by [CherryPick] version

When using third-party model providers over the OpenAI-compatible path, the streaming chunks that certain models (such as Claude) return for tool calls are not fully compatible with AstrBot's current stream aggregation, so output following a tool call gets truncated or ends prematurely.
Previously, when ChatCompletionStreamState failed to aggregate, only a log entry was written, and the end result could be "only the beginning is emitted before the stream ends". This change adds a reliable fallback for that path, ensuring a usable final response (text or tool calls) is always produced.

Modifications / 改动点

  • Core file modified: astrbot/core/provider/sources/openai_source.py

    • New fallback buffers in _query_stream:
      • text buffer (delta.content)
      • reasoning buffer (reasoning)
      • tool-call buffer (merging name/arguments/extra_content per tool_call_id)
      • usage/id buffers
    • When state.handle_chunk fails:
      • mark state_ok=False
      • warn only on the first failure; downgrade later ones to debug to avoid log noise
      • keep yielding streamed deltas without interrupting
    • When get_final_completion or parsing fails:
      • build the final LLMResponse from the buffered data
      • tool-call arguments support fragment concatenation with json.loads at the final stage; on failure, keep {"_raw_arguments": ...} to avoid data loss
  • This is NOT a breaking change. / 这不是一个破坏性变更。
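The fragment concatenation and final-stage parsing described above can be sketched roughly like this (a minimal illustration; `merge_tool_call_fragments` and `parse_arguments` are hypothetical names, not the PR's actual helpers):

```python
import json


def merge_tool_call_fragments(fragments: list[dict]) -> dict:
    """Merge streamed tool-call deltas (split across chunks) into one call.

    Each fragment may carry only part of the call: an id, a piece of the
    function name, or a slice of the JSON arguments string.
    """
    call = {"id": "", "name": "", "arguments": ""}
    for frag in fragments:
        if frag.get("id"):
            call["id"] = frag["id"]
        call["name"] += frag.get("name", "")
        call["arguments"] += frag.get("arguments", "")
    return call


def parse_arguments(raw_args: str) -> dict:
    """json.loads only at the very end; keep the raw text if parsing fails."""
    try:
        return json.loads(raw_args)
    except json.JSONDecodeError:
        return {"_raw_arguments": raw_args}


fragments = [
    {"id": "call_1", "name": "get_", "arguments": '{"ci'},
    {"name": "weather", "arguments": 'ty": "Beijing"}'},
]
call = merge_tool_call_fragments(fragments)
print(call["name"])                        # get_weather
print(parse_arguments(call["arguments"]))  # {'city': 'Beijing'}
```

The point of deferring `json.loads` to the final stage is that each individual fragment is rarely valid JSON on its own; only the concatenation is.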

Screenshots or Test Results / 运行截图或测试结果

Verification commands and results:

image

Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.

  • 😮 My changes do not introduce malicious code.

  • ⚠️ I have read and understood all the above and confirm this PR follows the rules.

  • 🚀 I confirm that this development is based on the dev branch and will be merged into the development branch, unless it is extremely urgent to merge into the main branch.

  • ⚠️ I did not read the above carefully before submitting.

Summary by Sourcery

Improve robustness of OpenAI-compatible streaming responses by adding fallbacks when stream state aggregation or final parsing fails.

Bug Fixes:

  • Prevent truncation or premature termination of tool-call streaming responses when third-party OpenAI-compatible providers emit incompatible chunks.

Enhancements:

  • Add fallback buffering of text, reasoning, tool calls, usage, and id during streaming to construct a final LLMResponse when normal state aggregation or parsing is unavailable.
  • Support reconstruction and JSON parsing of fragmented tool-call arguments, preserving raw payloads when parsing fails for reliability.
  • Tidy choice-emptiness checks by using truthiness instead of explicit length comparisons in completion parsing and reasoning extraction.

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 16, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a compatibility issue in tool-call streaming output that can occur with certain models (such as Claude) when using third-party model providers over the OpenAI-compatible path, causing output to be truncated or to end prematurely. By introducing a robust fallback mechanism for stream aggregation failures, it ensures that even when the SDK's stream-state aggregation is not fully compatible, the system can still produce a complete, usable final response, whether text or a tool call.

Highlights

  • Fallback buffers introduced: _query_stream gains fallback buffers for text, reasoning, tool calls, usage, and id, so a usable response can still be built if the SDK's stream-state aggregation fails.
  • Improved error handling: when state.handle_chunk fails, streaming no longer stops; the state is flagged as unhealthy, a warning is logged only once, and later failures are downgraded to debug logs to reduce noise.
  • More reliable final responses: when get_final_completion or parsing fails, the final LLMResponse is built from the buffered data.
  • Better tool-call argument handling: arguments are concatenated from fragments, json.loads is attempted at the final stage, and the raw arguments are preserved on failure to prevent loss.
Changelog
  • astrbot/core/provider/sources/openai_source.py
    • Introduced the state_ok and state_error_logged flags to control behavior and log output when stream aggregation fails.
    • Added fallback buffers (fallback_text_parts, fallback_reasoning_parts, fallback_tool_calls, fallback_tool_call_idx_to_id, fallback_usage, fallback_id) that collect stream data when SDK aggregation fails.
    • Changed the exception handling around state.handle_chunk so streaming continues on aggregation failure, with warning frequency reduced.
    • Added final-response construction at the end of streaming based on state_ok and the parse result: the SDK aggregation result is preferred, falling back to the buffers on failure.
    • Added the static method _collect_stream_tool_calls, which collects tool-call information from stream deltas into the fallback tool-call buffer.
    • Added the method _build_stream_fallback_response, which builds the final LLMResponse from the buffered data, including JSON parsing of tool-call arguments with error preservation.
    • Replaced len(completion.choices) == 0 checks with not completion.choices for conciseness.
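The per-chunk collection step could look roughly like this (a sketch under assumed delta shapes; the actual _collect_stream_tool_calls operates on OpenAI SDK objects rather than plain dicts):

```python
def collect_stream_tool_calls(delta_tool_calls, buffers, idx_to_id):
    """Accumulate streamed tool-call deltas into fallback buffers.

    OpenAI-style streams key fragments by index; the id usually appears
    only on the first fragment, so we remember index -> id for later chunks.
    """
    for tc in delta_tool_calls:
        idx = tc.get("index", 0)
        if tc.get("id"):
            idx_to_id[idx] = tc["id"]
        call_id = idx_to_id.get(idx, f"idx_{idx}")
        buf = buffers.setdefault(call_id, {"name": "", "arguments": ""})
        fn = tc.get("function") or {}
        buf["name"] += fn.get("name") or ""
        buf["arguments"] += fn.get("arguments") or ""


buffers, idx_to_id = {}, {}
collect_stream_tool_calls(
    [{"index": 0, "id": "call_a", "function": {"name": "search", "arguments": '{"q":'}}],
    buffers, idx_to_id,
)
collect_stream_tool_calls(
    [{"index": 0, "function": {"arguments": ' "astrbot"}'}}],
    buffers, idx_to_id,
)
print(buffers["call_a"]["arguments"])  # {"q": "astrbot"}
```

Remembering the index-to-id mapping is what lets later fragments, which carry no id of their own, land in the right buffer.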
Activity
  • No human activity (such as comments or reviews) has been observed yet.

@dosubot dosubot bot added the area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. label Mar 16, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • The fallback buffering in _query_stream (text, reasoning, tool calls) grows unbounded with the stream length; consider adding a size limit or chunking strategy to avoid excessive memory use for very long generations.
  • The _query_stream method is becoming quite complex with interleaved state management and fallback logic; consider extracting the fallback buffer handling into a dedicated helper or dataclass to keep the control flow easier to reason about.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The fallback buffering in `_query_stream` (text, reasoning, tool calls) grows unbounded with the stream length; consider adding a size limit or chunking strategy to avoid excessive memory use for very long generations.
- The `_query_stream` method is becoming quite complex with interleaved state management and fallback logic; consider extracting the fallback buffer handling into a dedicated helper or dataclass to keep the control flow easier to reason about.

## Individual Comments

### Comment 1
<location path="astrbot/core/provider/sources/openai_source.py" line_range="323-332" />
<code_context>
         async for chunk in stream:
+            if chunk.id:
+                fallback_id = chunk.id
             try:
                 state.handle_chunk(chunk)
             except Exception as e:
</code_context>
<issue_to_address>
**suggestion:** Swallowing JSON parse errors for tool arguments without logging may hinder debugging of malformed tool calls.

When `json.loads(raw_args)` fails, the exception is swallowed and replaced with `{ "_raw_arguments": raw_args }` without any logging. This preserves functionality but obscures upstream issues like malformed JSON or truncation. Please add at least a throttled debug or warning log with the model name and a truncated `raw_args` so operational issues with degraded tool call payloads are detectable.

Suggested implementation:

```python
        async for chunk in stream:
            if chunk.id:
                fallback_id = chunk.id
            try:
                state.handle_chunk(chunk)
            except Exception as e:
                # Do not stop streaming on state aggregation failure. We still
                # keep yielding chunk deltas and rely on fallback finalization.
                state_ok = False
                if not state_error_logged:
                    logger.warning(
                        f"Saving chunk state error for model {self.get_model()}: {e!r}",
                    )
                    state_error_logged = True

```

```python
        try:
            arguments = json.loads(raw_args)
        except Exception as e:
            # Log JSON parse failures for tool arguments in a throttled manner so that
            # malformed or truncated tool payloads are observable without flooding logs.
            if _should_log_tool_args_parse_error():
                logger.warning(
                    "Failed to parse tool call arguments for model %s: %r. "
                    "Raw arguments (truncated to 256 chars): %r",
                    self.get_model(),
                    e,
                    raw_args[:256],
                )
            arguments = {"_raw_arguments": raw_args}

```

```python
logger = logging.getLogger(__name__)

# Throttle logging for noisy tool-argument JSON parse errors. This keeps
# operational visibility without flooding logs if a model starts emitting
# systematically malformed tool calls.
_TOOL_ARGS_PARSE_ERROR_COUNT = 0
_TOOL_ARGS_PARSE_ERROR_LOG_EVERY_N = 100


def _should_log_tool_args_parse_error() -> bool:
    """
    Return True when we should emit a log entry for a tool-argument JSON parse error.

    This uses a simple counter-based throttle (log every N errors). It is intentionally
    lightweight and process-local; if tighter guarantees are needed across workers,
    this can be replaced with a shared-rate-limiter implementation.
    """
    global _TOOL_ARGS_PARSE_ERROR_COUNT
    _TOOL_ARGS_PARSE_ERROR_COUNT += 1
    return _TOOL_ARGS_PARSE_ERROR_COUNT % _TOOL_ARGS_PARSE_ERROR_LOG_EVERY_N == 1

```

1. Ensure that the `try/except` block for `json.loads(raw_args)` exists exactly as in the SEARCH block; if its structure or local variable names differ (`raw_args` vs `raw_arguments`, etc.), adjust the SEARCH/REPLACE snippet to match the actual code.
2. The helper `_should_log_tool_args_parse_error` assumes `logging` is already imported and `logger` is defined as shown; if your file defines `logger` differently or in another location, insert the helper and the `_TOOL_ARGS_PARSE_ERROR_*` globals near that definition instead.
3. If your codebase has an existing throttled-logging utility (e.g. `log_throttled`, `RateLimitedLogger`, etc.), you should replace the simple counter-based `_should_log_tool_args_parse_error` implementation with that shared utility to stay consistent with existing conventions.
</issue_to_address>


Comment on lines 323 to +332

```diff
 try:
     state.handle_chunk(chunk)
 except Exception as e:
-    logger.warning("Saving chunk state error: " + str(e))
-    if len(chunk.choices) == 0:
+    # Do not stop streaming on state aggregation failure. We still
+    # keep yielding chunk deltas and rely on fallback finalization.
+    state_ok = False
+    if not state_error_logged:
+        logger.warning(
+            f"Saving chunk state error for model {self.get_model()}: {e!r}",
+        )
```

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This PR introduces a reliable fallback mechanism for tool-call streaming output, effectively fixing the truncation caused by not-fully-compatible stream chunks from some OpenAI-compatible services. The changes in _query_stream and the new _collect_stream_tool_calls and _build_stream_fallback_response methods are well designed: when the standard aggregation logic fails, they robustly rebuild the final response from buffered raw data fragments, guaranteeing complete text or tool-call results and significantly improving the reliability of tool calls in streaming mode. I have one suggestion about exception handling that would make the code more precise.

```python
if raw_args:
    try:
        parsed_args = json.loads(raw_args)
    except Exception:
```

medium

To improve precision and maintainability, consider replacing the broad except Exception: with the more specific except json.JSONDecodeError:. This makes the intent, handling JSON parse failures, explicit in the code, and avoids accidentally swallowing other runtime errors, keeping the error handling robust and easier to debug.

Suggested change

```diff
-except Exception:
+except json.JSONDecodeError:
```
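Worth noting: json.JSONDecodeError subclasses ValueError, so for a str input the narrower clause still catches every parse failure json.loads can raise:

```python
import json

try:
    json.loads('{"incomplete')
except json.JSONDecodeError as e:
    # JSONDecodeError is a ValueError subclass carrying position info.
    print(type(e).__name__, isinstance(e, ValueError))  # JSONDecodeError True
```

The narrower catch would, however, let a TypeError through if raw_args were ever not a string, which is arguably the desired behavior here.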

@mohen-ink

I also ran into tool calls failing when using the SiliconFlow API; the log showed "Saving chunk state error". Is it the same bug?

@Weikjssss
Author

I also ran into tool calls failing when using the SiliconFlow API; the log showed "Saving chunk state error". Is it the same bug?

Yes, that's exactly this warn log.

@Soulter Soulter changed the base branch from dev to master March 23, 2026 03:54
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Mar 23, 2026
@Soulter Soulter changed the base branch from master to dev March 23, 2026 03:55
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. and removed size:XXL This PR changes 1000+ lines, ignoring generated files. labels Mar 23, 2026
@Soulter
Member

Soulter commented Mar 23, 2026

I also ran into tool calls failing when using the SiliconFlow API; the log showed "Saving chunk state error". Is it the same bug?

Does this happen with all models?

@Soulter
Member

Soulter commented Mar 23, 2026

I fixed this bug with a better approach, which introduces fewer changes and variables. #6829. Thanks for your PR!

@Soulter Soulter closed this Mar 23, 2026