feat: add buffered intermediate messages for non-streaming agent loop #7627
alicesainta wants to merge 2 commits into AstrBotDevs:master
Hey - I've found 1 issue, and left some high level feedback:
- The `if buffer_intermediate_messages and not stream_to_general and not agent_runner.streaming and resp.type == "llm_result"` guard is nested inside `if stream_to_general or not agent_runner.streaming:` and already gated by `stream_to_general` / `agent_runner.streaming`, so you can simplify the condition to just `if buffer_intermediate_messages and not stream_to_general and resp.type == "llm_result"` (or restructure the outer branch) to reduce redundancy and improve readability.
- The buffering/merging path for `llm_result` is currently inlined inside `run_agent`; consider extracting this into a small helper (e.g., `buffer_and_maybe_flush_llm_chain(...)`) so the main loop is easier to follow and the buffering behavior is centralized for future adjustments.
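The first point can be checked mechanically. The sketch below enumerates every combination of the three booleans and compares the guard as written with the simplified one. The function names are illustrative only, `streaming` stands in for `agent_runner.streaming`, and the `resp.type` check is left out because it is identical in both forms.

```python
from itertools import product


def original_guard(buffer: bool, stream_to_general: bool, streaming: bool) -> bool:
    # Guard as written in the PR (without the resp.type check).
    return buffer and not stream_to_general and not streaming


def simplified_guard(buffer: bool, stream_to_general: bool, streaming: bool) -> bool:
    # Guard proposed in the review: `not streaming` is dropped.
    return buffer and not stream_to_general


def guards_agree_inside_outer_branch() -> bool:
    """True if both guards match whenever the outer `if` condition holds."""
    for buffer, stream_to_general, streaming in product([False, True], repeat=3):
        if not (stream_to_general or not streaming):
            continue  # the outer branch is not taken for this combination
        if original_guard(buffer, stream_to_general, streaming) != simplified_guard(
            buffer, stream_to_general, streaming
        ):
            return False
    return True


def disagreements_everywhere() -> list[tuple[bool, bool, bool]]:
    """Combinations (buffer, stream_to_general, streaming) where the guards differ."""
    return [
        combo
        for combo in product([False, True], repeat=3)
        if original_guard(*combo) != simplified_guard(*combo)
    ]
```

The guards differ only when buffering is on, `stream_to_general` is off, and the runner is streaming, which is exactly the case the outer branch excludes. The simplification is therefore safe only while the guard stays nested inside that branch.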
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `if buffer_intermediate_messages and not stream_to_general and not agent_runner.streaming and resp.type == "llm_result"` guard is nested inside `if stream_to_general or not agent_runner.streaming:` and already gated by `stream_to_general` / `agent_runner.streaming`, so you can simplify the condition to just `if buffer_intermediate_messages and not stream_to_general and resp.type == "llm_result"` (or restructure the outer branch) to reduce redundancy and improve readability.
- The buffering/merging path for `llm_result` is currently inlined inside `run_agent`; consider extracting this into a small helper (e.g., `buffer_and_maybe_flush_llm_chain(...)`) so the main loop is easier to follow and the buffering behavior is centralized for future adjustments.
## Individual Comments
### Comment 1
<location path="astrbot/core/astr_agent_run_util.py" line_range="201" />
<code_context>
continue
if stream_to_general or not agent_runner.streaming:
+ if (
+ buffer_intermediate_messages
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the buffering logic into helper functions and optionally grouping related flags into a config object to simplify the control flow and function signatures.
You can keep the feature while simplifying the control flow and reducing duplication by extracting the buffering logic and precomputing the combined predicate.
### 1. Extract buffering logic out of the main loop
Instead of the deep inline conditional + merging logic:
```python
if stream_to_general or not agent_runner.streaming:
    if (
        buffer_intermediate_messages
        and not stream_to_general
        and not agent_runner.streaming
        and resp.type == "llm_result"
    ):
        buffered_llm_chains.append(resp.data["chain"])
        if not agent_runner.done():
            continue
        merged_chain = MessageChain()
        for chain in buffered_llm_chains:
            merged_chain.chain.extend(chain.chain)
        buffered_llm_chains.clear()
        astr_event.set_result(
            MessageEventResult(
                chain=merged_chain.chain,
                result_content_type=ResultContentType.LLM_RESULT,
            ),
        )
        yield
        astr_event.clear_result()
        continue
    content_typ = ...
```
you can push this into a small helper that returns the merged chain once the agent is done, or `None` while it is still buffering. The helper is a plain function and the `yield` stays in `run_agent`: a `yield` inside the helper would turn it into an async generator, which can neither `return True` nor be awaited.
```python
def _can_buffer_llm_result(
    buffer_intermediate_messages: bool,
    stream_to_general: bool,
    agent_runner: AgentRunner,
) -> bool:
    return (
        buffer_intermediate_messages
        and not stream_to_general
        and not agent_runner.streaming
    )


def _merge_message_chains(chains: list[MessageChain]) -> MessageChain:
    """Merge the buffered chains in order and empty the buffer."""
    merged = MessageChain()
    for chain in chains:
        merged.chain.extend(chain.chain)
    chains.clear()
    return merged


def _buffer_llm_result(
    resp,
    agent_runner: AgentRunner,
    buffered_llm_chains: list[MessageChain],
) -> MessageChain | None:
    """Buffer the chain; return the merged chain once the agent is done, else None."""
    buffered_llm_chains.append(resp.data["chain"])
    if not agent_runner.done():
        return None  # keep buffering
    return _merge_message_chains(buffered_llm_chains)
```
Then the main loop becomes much more linear and easier to read:
```python
if stream_to_general or not agent_runner.streaming:
    if (
        resp.type == "llm_result"
        and _can_buffer_llm_result(
            buffer_intermediate_messages,
            stream_to_general,
            agent_runner,
        )
    ):
        merged_chain = _buffer_llm_result(
            resp,
            agent_runner,
            buffered_llm_chains,
        )
        if merged_chain is not None:
            # The agent is done: emit everything buffered as one message.
            astr_event.set_result(
                MessageEventResult(
                    chain=merged_chain.chain,
                    result_content_type=ResultContentType.LLM_RESULT,
                ),
            )
            yield
            astr_event.clear_result()
        continue
    content_typ = (
        ResultContentType.LLM_RESULT
        if resp.type == "llm_result"
        else ResultContentType.GENERAL_RESULT
    )
    astr_event.set_result(
        MessageEventResult(
            chain=resp.data["chain"].chain,
            result_content_type=content_typ,
        ),
    )
    yield
    astr_event.clear_result()
```
This removes the inline merging logic, eliminates duplicated predicates inside the nested `if`, and makes the intent of the buffering path explicit.
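One property of the merge step is easy to miss: it keeps the step order and empties the buffer it is given. The sketch below shows this with `FakeMessageChain`, a hypothetical stand-in that models only the `chain` list of AstrBot's `MessageChain`; it is an illustration, not the project's class.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessageChain:
    # Hypothetical stand-in for MessageChain; only the `chain` list is modeled.
    chain: list[str] = field(default_factory=list)


def merge_message_chains(chains: list[FakeMessageChain]) -> FakeMessageChain:
    merged = FakeMessageChain()
    for chain in chains:
        merged.chain.extend(chain.chain)  # components are appended in step order
    chains.clear()  # side effect: the caller's buffer is emptied
    return merged


buffer = [
    FakeMessageChain(["Looking that up."]),
    FakeMessageChain(["Here is the answer."]),
]
merged = merge_message_chains(buffer)
```

Because the helper mutates its argument, the caller can reuse the same list for the next run without clearing it separately.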
### 2. Reduce signature bloat using a simple config object (optional but helps)
If you want to address the growing parameter list without changing behavior, you can group the related flags into a small config object and pass that instead of multiple booleans:
```python
from dataclasses import dataclass


@dataclass
class AgentRunOptions:
    show_tool_use: bool = True
    show_tool_call_result: bool = False
    show_reasoning: bool = False
    buffer_intermediate_messages: bool = False
```
Use it like this:
```python
async def run_agent(
    agent_runner: AgentRunner,
    max_step: int = 30,
    stream_to_general: bool = False,
    options: AgentRunOptions | None = None,
) -> AsyncGenerator[MessageChain | None, None]:
    options = options or AgentRunOptions()
    ...
    if (
        resp.type == "llm_result"
        and _can_buffer_llm_result(
            options.buffer_intermediate_messages,
            stream_to_general,
            agent_runner,
        )
    ):
        ...
```
And callers such as `run_live_agent` and `_run_agent_feeder` just pass `options` instead of four separate flags:
```python
async def run_live_agent(
    agent_runner: AgentRunner,
    tts_provider: TTSProvider | None = None,
    max_step: int = 30,
    options: AgentRunOptions | None = None,
) -> AsyncGenerator[MessageChain | None, None]:
    options = options or AgentRunOptions()
    ...
    async for chain in run_agent(
        agent_runner,
        max_step=max_step,
        stream_to_general=False,
        options=options,
    ):
        yield chain
```
This keeps all existing behavior intact while making `run_agent`’s core logic and the call chain significantly easier to follow.
</issue_to_address>
Code Review
This pull request introduces a feature to buffer and merge intermediate messages from an agent when running in non-streaming mode, controlled by a new buffer_intermediate_messages configuration setting. The implementation includes changes to the agent runner utilities, configuration schemas, and the internal agent pipeline stage. A critical issue was identified in the buffering logic where messages could be lost if the agent terminates unexpectedly. Furthermore, the current implementation yields None instead of the merged MessageChain, which breaks downstream consumers like the Text-to-Speech (TTS) engine in Live Mode. A suggestion was provided to ensure the buffer is flushed correctly and the merged chain is yielded.
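Both problems named here can be illustrated without AstrBot. In the sketch below, `FakeChain`, `fake_steps`, and `run_buffered` are hypothetical names, and the code is not the suggestion attached to this review. It only shows the general shape: flush whatever is buffered once the step source ends, even without a final "done" signal, and yield the merged chain itself rather than `None`.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field


@dataclass
class FakeChain:
    # Hypothetical stand-in for MessageChain.
    chain: list[str] = field(default_factory=list)


async def fake_steps() -> AsyncIterator[tuple[str, FakeChain]]:
    # Simulated agent that stops (e.g. at a step limit) without a "done" signal.
    yield "llm_result", FakeChain(["step 1"])
    yield "tool_call_result", FakeChain(["tool output"])
    yield "llm_result", FakeChain(["step 2"])


async def run_buffered(
    steps: AsyncIterator[tuple[str, FakeChain]],
) -> AsyncIterator[FakeChain]:
    buffered: list[FakeChain] = []
    async for resp_type, chain in steps:
        if resp_type == "llm_result":
            buffered.append(chain)  # hold intermediate LLM text back
            continue
        yield chain  # other responses pass through unchanged
    # Flush after the loop so buffered text is not lost when the source
    # ends early, and yield the merged chain for downstream consumers.
    if buffered:
        merged = FakeChain()
        for chain in buffered:
            merged.chain.extend(chain.chain)
        yield merged


async def collect() -> list[list[str]]:
    return [chain.chain async for chain in run_buffered(fake_steps())]
```

An exception raised by the step source would still skip the flush; handling that case is left out of this sketch.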
Fixes #7599
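In non-streaming, multi-step agent tool-calling scenarios, the intermediate text from each LLM step is sent immediately, so users receive several fragmented messages.
This PR adds optional buffering that merges the intermediate text into a single message once the agent finishes, improving the user experience while leaving the default behavior unchanged.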
Modifications
New config option:
- `provider_settings.buffer_intermediate_messages` (default `false`)

Modified `astrbot/core/astr_agent_run_util.py`:
- `run_agent()` gains a `buffer_intermediate_messages` parameter
- `llm_result` intermediate text is buffered
- the merged text is emitted as `LLM_RESULT` output

Modified `astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py`:
- passes `buffer_intermediate_messages` through to `run_agent()` / `run_live_agent()`

Modified `astrbot/core/config/default.py`:
- `buffer_intermediate_messages: false`

This is NOT a breaking change.
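The non-breaking claim rests on the new key defaulting to `false`, so configurations written before this PR behave as before. A minimal sketch of that fallback, assuming a plain dict of provider settings: `DEFAULT_PROVIDER_SETTINGS` and `buffering_enabled` are hypothetical names, and only the key itself comes from the PR description.

```python
DEFAULT_PROVIDER_SETTINGS = {
    # Assumed shape; only this key and its default are taken from the PR.
    "buffer_intermediate_messages": False,
}


def buffering_enabled(provider_settings: dict) -> bool:
    """Return the flag, falling back to the default when the key is absent."""
    merged = {**DEFAULT_PROVIDER_SETTINGS, **provider_settings}
    return bool(merged["buffer_intermediate_messages"])
```

An existing configuration without the key resolves to `False`, which matches the behavior before this change.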
Screenshots or Test Results
Verification Steps:
- `streaming_response=false`, `buffer_intermediate_messages=true`
- `streaming_response=false`, `buffer_intermediate_messages=false`
- `streaming_response=true` (`buffer_intermediate_messages` set to any value)

Local checks:
Output:
Checklist
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
😮 My changes do not introduce malicious code.
Summary by Sourcery
Add an optional configuration to buffer and merge intermediate non-streaming agent messages into a single response while preserving existing behavior by default.
New Features:
Enhancements: