fix: tool call streaming output compatibility #6439
Weikjssss wants to merge 2 commits into AstrBotDevs:dev from
Conversation
# Conflicts: # astrbot/core/provider/sources/openai_source.py resolved by [CherryPick] version
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a compatibility issue on the OpenAI-compatible path of third-party model providers, where certain models (such as Claude) could have their tool-call streaming output truncated or ended prematurely. By introducing a robust fallback mechanism for streaming aggregation failures, it ensures that even when the SDK's stream-state aggregation is not fully compatible, the system still produces a complete, usable final response, whether text or tool calls.

Highlights
Changelog
Activity
Hey - I've found 1 issue, and left some high-level feedback:

- The fallback buffering in `_query_stream` (text, reasoning, tool calls) grows unbounded with the stream length; consider adding a size limit or chunking strategy to avoid excessive memory use for very long generations.
- The `_query_stream` method is becoming quite complex with interleaved state management and fallback logic; consider extracting the fallback buffer handling into a dedicated helper or dataclass to keep the control flow easier to reason about.
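The bounded-buffer idea suggested above could be sketched, for example, as a small helper dataclass. This is an illustrative sketch only; the class name and cap value are not from the PR, whose buffering currently lives inline in `_query_stream` without a size limit:

```python
from dataclasses import dataclass, field


@dataclass
class FallbackBuffer:
    """Accumulates stream deltas for fallback finalization, capped in size.

    Hypothetical helper: once max_chars is reached, further deltas are
    dropped and the buffer is marked truncated, bounding memory use for
    very long generations.
    """

    max_chars: int = 512_000  # illustrative cap, not a value from the PR
    truncated: bool = False
    _parts: list[str] = field(default_factory=list)
    _size: int = 0

    def append(self, delta: str) -> None:
        if self.truncated:
            return
        remaining = self.max_chars - self._size
        if remaining <= 0:
            self.truncated = True
            return
        piece = delta[:remaining]
        self._parts.append(piece)
        self._size += len(piece)
        if len(piece) < len(delta):
            self.truncated = True

    def text(self) -> str:
        return "".join(self._parts)
```

Extracting the buffer this way would also address the second point: `_query_stream` could hold one `FallbackBuffer` per stream and keep its own control flow focused on yielding deltas.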
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The fallback buffering in `_query_stream` (text, reasoning, tool calls) grows unbounded with the stream length; consider adding a size limit or chunking strategy to avoid excessive memory use for very long generations.
- The `_query_stream` method is becoming quite complex with interleaved state management and fallback logic; consider extracting the fallback buffer handling into a dedicated helper or dataclass to keep the control flow easier to reason about.
## Individual Comments
### Comment 1
<location path="astrbot/core/provider/sources/openai_source.py" line_range="323-332" />
<code_context>
async for chunk in stream:
+ if chunk.id:
+ fallback_id = chunk.id
try:
state.handle_chunk(chunk)
except Exception as e:
</code_context>
<issue_to_address>
**suggestion:** Swallowing JSON parse errors for tool arguments without logging may hinder debugging of malformed tool calls.
When `json.loads(raw_args)` fails, the exception is swallowed and replaced with `{ "_raw_arguments": raw_args }` without any logging. This preserves functionality but obscures upstream issues like malformed JSON or truncation. Please add at least a throttled debug or warning log with the model name and a truncated `raw_args` so operational issues with degraded tool call payloads are detectable.
Suggested implementation:
```python
async for chunk in stream:
if chunk.id:
fallback_id = chunk.id
try:
state.handle_chunk(chunk)
except Exception as e:
# Do not stop streaming on state aggregation failure. We still
# keep yielding chunk deltas and rely on fallback finalization.
state_ok = False
if not state_error_logged:
logger.warning(
f"Saving chunk state error for model {self.get_model()}: {e!r}",
)
state_error_logged = True
```
```python
try:
arguments = json.loads(raw_args)
except Exception as e:
# Log JSON parse failures for tool arguments in a throttled manner so that
# malformed or truncated tool payloads are observable without flooding logs.
if _should_log_tool_args_parse_error():
logger.warning(
"Failed to parse tool call arguments for model %s: %r. "
"Raw arguments (truncated to 256 chars): %r",
self.get_model(),
e,
raw_args[:256],
)
arguments = {"_raw_arguments": raw_args}
```
```python
import logging

logger = logging.getLogger(__name__)
# Throttle logging for noisy tool-argument JSON parse errors. This keeps
# operational visibility without flooding logs if a model starts emitting
# systematically malformed tool calls.
_TOOL_ARGS_PARSE_ERROR_COUNT = 0
_TOOL_ARGS_PARSE_ERROR_LOG_EVERY_N = 100
def _should_log_tool_args_parse_error() -> bool:
"""
Return True when we should emit a log entry for a tool-argument JSON parse error.
This uses a simple counter-based throttle (log every N errors). It is intentionally
lightweight and process-local; if tighter guarantees are needed across workers,
this can be replaced with a shared-rate-limiter implementation.
"""
    global _TOOL_ARGS_PARSE_ERROR_COUNT  # type: ignore[no-redef]
    _TOOL_ARGS_PARSE_ERROR_COUNT += 1
    return _TOOL_ARGS_PARSE_ERROR_COUNT % _TOOL_ARGS_PARSE_ERROR_LOG_EVERY_N == 1
```
1. Ensure that the `try/except` block for `json.loads(raw_args)` exists exactly as in the SEARCH block; if its structure or local variable names differ (`raw_args` vs `raw_arguments`, etc.), adjust the SEARCH/REPLACE snippet to match the actual code.
2. The helper `_should_log_tool_args_parse_error` assumes `logging` is already imported and `logger` is defined as shown; if your file defines `logger` differently or in another location, insert the helper and the `_TOOL_ARGS_PARSE_ERROR_*` globals near that definition instead.
3. If your codebase has an existing throttled-logging utility (e.g. `log_throttled`, `RateLimitedLogger`, etc.), replace the simple counter-based `_should_log_tool_args_parse_error` implementation with that shared utility to stay consistent with existing conventions.
</issue_to_address>
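The every-Nth counter throttle suggested in the comment above can be checked in isolation. This standalone demo (names simplified from the suggestion) shows that only the 1st, 101st, and 201st errors would be logged out of 250:

```python
# Standalone demo of the counter-based throttle: log every Nth error,
# starting with the first one. Process-local, no locking, as in the
# suggested _should_log_tool_args_parse_error helper.
_ERROR_COUNT = 0
_LOG_EVERY_N = 100


def _should_log() -> bool:
    global _ERROR_COUNT
    _ERROR_COUNT += 1
    return _ERROR_COUNT % _LOG_EVERY_N == 1


# Simulate 250 consecutive parse errors and record which ones get logged.
logged = [i for i in range(1, 251) if _should_log()]
# → [1, 101, 201]: the first error is visible immediately, later ones
#   are sampled, so a burst of malformed payloads cannot flood the logs.
```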
```diff
 try:
     state.handle_chunk(chunk)
 except Exception as e:
-    logger.warning("Saving chunk state error: " + str(e))
+    # Do not stop streaming on state aggregation failure. We still
+    # keep yielding chunk deltas and rely on fallback finalization.
+    state_ok = False
+    if not state_error_logged:
+        logger.warning(
+            f"Saving chunk state error for model {self.get_model()}: {e!r}",
+        )
 if len(chunk.choices) == 0:
```
Code Review
This PR introduces a reliable fallback mechanism for handling tool-call streaming output, effectively fixing the problem where stream chunks that are not fully compatible with some OpenAI-compatible services caused truncated output. The changes in `_query_stream` and the new `_collect_stream_tool_calls` and `_build_stream_fallback_response` methods are well designed: when the standard aggregation logic fails, they robustly rebuild the final response from the buffered raw fragments, ensuring the completeness of the text or tool-call result and significantly improving the reliability of tool calls in streaming mode. I have one suggestion about exception handling that would make the code more precise.
```python
if raw_args:
    try:
        parsed_args = json.loads(raw_args)
    except Exception:
```
> I also hit the case where tools could not be called when using the SiliconFlow API, and the log showed "Saving chunk state error". Is it this same error?

Yes, that's exactly this warning log.

Does this happen with all models?
I fixed this bug with a better approach, which introduces fewer changes and variables. #6829. Thanks for your PR!
Conflicts:
astrbot/core/provider/sources/openai_source.py resolved by [CherryPick] version
Under OpenAI-compatible endpoints from third-party model providers, the streaming chunks that certain models (such as Claude) return for tool calls are not fully compatible with AstrBot's current stream-aggregation implementation, causing the output after a tool call to be truncated or to end prematurely.

Previously, when `ChatCompletionStreamState` aggregation failed, only a log entry was written, and the final output could stop after just the beginning; this change adds a reliable fallback for that path, ensuring that a usable final response (text or tool calls) is always produced.

Modifications
Core file modified: `astrbot/core/provider/sources/openai_source.py`

- Added fallback buffering in `_query_stream`:
  - buffers text deltas (`delta.content`)
  - merges tool-call fragments by `tool_call_id` (name / arguments / extra_content)
- When `state.handle_chunk` fails: set `state_ok=False` and keep streaming
- When `get_final_completion` / parsing fails: build the final `LLMResponse` from the buffered data
- Tool arguments are parsed with `json.loads`; on failure, the raw payload is kept as `{"_raw_arguments": ...}` to avoid data loss

This is NOT a breaking change.
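The fragment-merging step above can be sketched as follows. This is a minimal standalone illustration of the buffering idea, not the PR's actual code: the function name and the plain-dict fragment shape are assumptions, but the keying by tool-call id, the concatenation of argument chunks, and the `{"_raw_arguments": ...}` fallback mirror the behavior described:

```python
import json


def merge_tool_call_fragments(fragments: list[dict]) -> list[dict]:
    """Merge streamed tool-call deltas into complete calls, keyed by id.

    Each fragment may carry an 'id' (only on the first delta of a call),
    a partial 'name', and a partial 'arguments' JSON string. Fragments
    without an id belong to the most recently seen call.
    """
    calls: dict[str, dict] = {}
    order: list[str] = []
    current_id = None
    for frag in fragments:
        if frag.get("id"):
            current_id = frag["id"]
            if current_id not in calls:
                calls[current_id] = {"name": "", "arguments": ""}
                order.append(current_id)
        if current_id is None:
            continue  # fragment arrived before any id; nothing to attach to
        if frag.get("name"):
            calls[current_id]["name"] += frag["name"]
        if frag.get("arguments"):
            calls[current_id]["arguments"] += frag["arguments"]

    merged = []
    for cid in order:
        raw = calls[cid]["arguments"]
        try:
            args = json.loads(raw) if raw else {}
        except Exception:
            # Keep the raw payload instead of dropping it, as the PR does.
            args = {"_raw_arguments": raw}
        merged.append({"id": cid, "name": calls[cid]["name"], "arguments": args})
    return merged
```

Keying by id rather than array index is what makes the merge robust against providers that re-send the id on every delta or interleave calls.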
Screenshots or Test Results

Verification commands and results:

Checklist
😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.

👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.

🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.

😮 My changes do not introduce malicious code.

I have read and understood all the above and confirm this PR follows the rules.

🚀 I confirm that this development is based on the dev branch and will be merged into the development branch, unless it is extremely urgent to merge into the main branch.

I did not read the above carefully before submitting.
Summary by Sourcery
Improve robustness of OpenAI-compatible streaming responses by adding fallbacks when stream state aggregation or final parsing fails.
Bug Fixes:
Enhancements: