feat: add buffered intermediate messages for non-streaming agent loop#7627

Open
alicesainta wants to merge 2 commits into AstrBotDevs:master from alicesainta:master

Conversation


@alicesainta alicesainta commented Apr 17, 2026

Fixes #7599
In non-streaming multi-step agent tool-call scenarios, the intermediate text from each LLM step is sent immediately, so the user receives multiple fragmented messages.
This PR adds an optional buffering capability: once the agent finishes, the intermediate texts are merged and sent as a single message, improving the user experience while keeping the default behavior unchanged.

Modifications

  • New config option: provider_settings.buffer_intermediate_messages (default: false)

  • Modified astrbot/core/astr_agent_run_util.py

    • run_agent() gains a buffer_intermediate_messages parameter
    • In non-streaming mode with this option enabled, intermediate llm_result texts are buffered
    • Once the agent finishes, the buffered content is merged into a single LLM_RESULT output
  • Modified astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py

    • Reads buffer_intermediate_messages and passes it through to run_agent() / run_live_agent()
  • Modified astrbot/core/config/default.py

    • Added buffer_intermediate_messages: false to the default config
    • Added config metadata (type, description, display condition)
  • This is NOT a breaking change.
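To make the shape of the config change concrete, here is a rough sketch of what the default.py addition might look like. The metadata keys and condition syntax below are assumptions for illustration, not AstrBot's actual config schema:

```python
# Hypothetical sketch of the default.py addition. The metadata keys
# ("type", "description", "hint", "condition") are illustrative only
# and may not match AstrBot's real config schema.
DEFAULT_PROVIDER_SETTINGS = {
    "buffer_intermediate_messages": False,  # new option, off by default
}

CONFIG_METADATA = {
    "buffer_intermediate_messages": {
        "type": "bool",
        "description": "Merge intermediate agent texts into a single message",
        "hint": "Only takes effect when streaming_response is disabled",
        # Display condition: only shown for non-streaming local agents.
        "condition": {"provider_settings.streaming_response": False},
    },
}
```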

Screenshots or Test Results

Verification Steps:

  1. streaming_response=false, buffer_intermediate_messages=true
    • Send a request that requires multi-step tool calls
    • Result: intermediate texts are not sent one by one; they are merged into a single final message
  2. streaming_response=false, buffer_intermediate_messages=false
    • Result: original behavior preserved; each step's text is sent separately
  3. streaming_response=true (buffer_intermediate_messages set to either value)
    • Result: streaming behavior is unaffected
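The merge semantics that step 1 verifies can be sketched with stand-in types (MessageChain here is a toy class, not AstrBot's real one):

```python
# Toy sketch of the buffering behavior checked in step 1: intermediate
# texts from each agent step are collected and merged into one chain.
# MessageChain is a stand-in, not AstrBot's actual class.
class MessageChain:
    def __init__(self, chain=None):
        self.chain = list(chain or [])


def merge_chains(chains):
    """Concatenate the segments of several chains into one chain."""
    merged = MessageChain()
    for c in chains:
        merged.chain.extend(c.chain)
    return merged


# Three intermediate agent steps, each producing one text segment:
steps = [
    MessageChain(["Let me search for that..."]),
    MessageChain(["Reading the tool results..."]),
    MessageChain(["Here is the final answer."]),
]
merged = merge_chains(steps)
print(len(merged.chain))  # → 3 segments, delivered as one message
```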

Local checks:

uv run ruff format .
uv run ruff check .

Output:

ruff format: files left unchanged
ruff check: All checks passed

Checklist / 检查清单

😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.

👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.

🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.

😮 My changes do not introduce malicious code.

Summary by Sourcery

Add an optional configuration to buffer and merge intermediate non-streaming agent messages into a single response while preserving existing behavior by default.

New Features:

  • Introduce a provider_settings.buffer_intermediate_messages option to control buffering of intermediate agent messages in non-streaming mode.

Enhancements:

  • Extend agent runner and live agent execution paths to support buffering and merging of intermediate LLM_RESULT chains when configured.
  • Expose the new buffering option through the chat provider template metadata with description, hint, and UI conditions tied to non-streaming local agents.

@auto-assign auto-assign bot requested review from Soulter and advent259141 April 17, 2026 12:48
@dosubot dosubot bot added size:M This PR changes 30-99 lines, ignoring generated files. area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. labels Apr 17, 2026

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • The if buffer_intermediate_messages and not stream_to_general and not agent_runner.streaming and resp.type == "llm_result" guard is nested inside if stream_to_general or not agent_runner.streaming: and already gated by stream_to_general / agent_runner.streaming, so you can simplify the condition to just if buffer_intermediate_messages and not stream_to_general and resp.type == "llm_result" (or restructure the outer branch) to reduce redundancy and improve readability.
  • The buffering/merging path for llm_result is currently inlined inside run_agent; consider extracting this into a small helper (e.g., buffer_and_maybe_flush_llm_chain(...)) so the main loop is easier to follow and the buffering behavior is centralized for future adjustments.

## Individual Comments

### Comment 1
<location path="astrbot/core/astr_agent_run_util.py" line_range="201" />
<code_context>
                     continue

                 if stream_to_general or not agent_runner.streaming:
+                    if (
+                        buffer_intermediate_messages
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the buffering logic into helper functions and optionally grouping related flags into a config object to simplify the control flow and function signatures.

You can keep the feature while simplifying the control flow and reducing duplication by extracting the buffering logic and precomputing the combined predicate.

### 1. Extract buffering logic out of the main loop

Instead of the deep inline conditional + merging logic:

```python
if stream_to_general or not agent_runner.streaming:
    if (
        buffer_intermediate_messages
        and not stream_to_general
        and not agent_runner.streaming
        and resp.type == "llm_result"
    ):
        buffered_llm_chains.append(resp.data["chain"])
        if not agent_runner.done():
            continue

        merged_chain = MessageChain()
        for chain in buffered_llm_chains:
            merged_chain.chain.extend(chain.chain)
        buffered_llm_chains.clear()

        astr_event.set_result(
            MessageEventResult(
                chain=merged_chain.chain,
                result_content_type=ResultContentType.LLM_RESULT,
            ),
        )
        yield
        astr_event.clear_result()
        continue

    content_typ = ...
```

you can push this into small helpers. One caveat: the `yield` has to stay in the main loop, because a `yield` inside an awaited helper would make that helper an async generator (and `return True` inside an async generator is a syntax error). The buffering helper therefore returns the merged chain once the agent is done, or `None` while it is still accumulating:

```python
def _can_buffer_llm_result(
    buffer_intermediate_messages: bool,
    stream_to_general: bool,
    agent_runner: AgentRunner,
) -> bool:
    return (
        buffer_intermediate_messages
        and not stream_to_general
        and not agent_runner.streaming
    )


def _merge_message_chains(chains: list[MessageChain]) -> MessageChain:
    merged = MessageChain()
    for chain in chains:
        merged.chain.extend(chain.chain)
    chains.clear()
    return merged


def _buffer_llm_result(
    resp,
    agent_runner: AgentRunner,
    buffered_llm_chains: list[MessageChain],
) -> MessageChain | None:
    """Buffer the chain; return the merged chain once the agent is done."""
    buffered_llm_chains.append(resp.data["chain"])
    if not agent_runner.done():
        return None  # keep buffering
    return _merge_message_chains(buffered_llm_chains)
```

Then the main loop becomes much more linear and easier to read:

```python
if stream_to_general or not agent_runner.streaming:
    if resp.type == "llm_result" and _can_buffer_llm_result(
        buffer_intermediate_messages,
        stream_to_general,
        agent_runner,
    ):
        merged_chain = _buffer_llm_result(
            resp, agent_runner, buffered_llm_chains
        )
        if merged_chain is not None:
            astr_event.set_result(
                MessageEventResult(
                    chain=merged_chain.chain,
                    result_content_type=ResultContentType.LLM_RESULT,
                ),
            )
            yield
            astr_event.clear_result()
        continue

    content_typ = (
        ResultContentType.LLM_RESULT
        if resp.type == "llm_result"
        else ResultContentType.GENERAL_RESULT
    )
    astr_event.set_result(
        MessageEventResult(
            chain=resp.data["chain"].chain,
            result_content_type=content_typ,
        ),
    )
    yield
    astr_event.clear_result()
```

This removes the inline merging logic, eliminates duplicated predicates inside the nested `if`, and makes the intent of the buffering path explicit.

### 2. Reduce signature bloat using a simple config object (optional but helps)

If you want to address the growing parameter list without changing behavior, you can group the related flags into a small config object and pass that instead of multiple booleans:

```python
@dataclass
class AgentRunOptions:
    show_tool_use: bool = True
    show_tool_call_result: bool = False
    show_reasoning: bool = False
    buffer_intermediate_messages: bool = False
```

Use it like this:

```python
async def run_agent(
    agent_runner: AgentRunner,
    max_step: int = 30,
    stream_to_general: bool = False,
    options: AgentRunOptions | None = None,
) -> AsyncGenerator[MessageChain | None, None]:
    options = options or AgentRunOptions()
    ...
    if (
        resp.type == "llm_result"
        and _can_buffer_llm_result(
            options.buffer_intermediate_messages,
            stream_to_general,
            agent_runner,
        )
    ):
        ...
```

And callers such as `run_live_agent` and `_run_agent_feeder` just pass `options` instead of four separate flags:

```python
async def run_live_agent(
    agent_runner: AgentRunner,
    tts_provider: TTSProvider | None = None,
    max_step: int = 30,
    options: AgentRunOptions | None = None,
) -> AsyncGenerator[MessageChain | None, None]:
    options = options or AgentRunOptions()
    ...
    async for chain in run_agent(
        agent_runner,
        max_step=max_step,
        stream_to_general=False,
        options=options,
    ):
        yield chain
```

This keeps all existing behavior intact while making `run_agent`’s core logic and the call chain significantly easier to follow.
</issue_to_address>


Comment thread astrbot/core/astr_agent_run_util.py

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a feature to buffer and merge intermediate messages from an agent when running in non-streaming mode, controlled by a new buffer_intermediate_messages configuration setting. The implementation includes changes to the agent runner utilities, configuration schemas, and the internal agent pipeline stage. A critical issue was identified in the buffering logic where messages could be lost if the agent terminates unexpectedly. Furthermore, the current implementation yields None instead of the merged MessageChain, which breaks downstream consumers like the Text-to-Speech (TTS) engine in Live Mode. A suggestion was provided to ensure the buffer is flushed correctly and the merged chain is yielded.

Comment thread astrbot/core/astr_agent_run_util.py Outdated

Labels

area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. size:M This PR changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature Request: add a "buffer intermediate messages" config option to merge multi-step agent intermediate texts into a single message

1 participant