feat: support webchat message editing, branching, and regeneration#7608
feat: support webchat message editing, branching, and regeneration#7608SweetenedSuzuka wants to merge 7 commits intoAstrBotDevs:masterfrom
Conversation
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The
ChatRouteclass has grown quite large with many new private helpers (branching, sanitization, conversation sync); consider extracting the branching/edit/regenerate logic into a dedicated service/module to keep the route handler focused and easier to maintain. - Both
chat.get_sessionsandchatui_project.get_project_sessionscallget_preferencesfor allbranch_metarecords and then filter bysession_ids, which can become inefficient as data grows; consider adding a DB method that fetches preferences by(scope, key, scope_ids)to avoid scanning all entries. - The SSE/WebSocket handling for
message_savedis now duplicated betweenchat.stream,live_chat._handle_chat_message, and the frontendprocessStreamPayload; consider centralizing the event payload shape and processing logic to reduce divergence and make future changes safer.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `ChatRoute` class has grown quite large with many new private helpers (branching, sanitization, conversation sync); consider extracting the branching/edit/regenerate logic into a dedicated service/module to keep the route handler focused and easier to maintain.
- Both `chat.get_sessions` and `chatui_project.get_project_sessions` call `get_preferences` for all `branch_meta` records and then filter by `session_ids`, which can become inefficient as data grows; consider adding a DB method that fetches preferences by `(scope, key, scope_ids)` to avoid scanning all entries.
- The SSE/WebSocket handling for `message_saved` is now duplicated between `chat.stream`, `live_chat._handle_chat_message`, and the frontend `processStreamPayload`; consider centralizing the event payload shape and processing logic to reduce divergence and make future changes safer.
## Individual Comments
### Comment 1
<location path="astrbot/dashboard/routes/chat.py" line_range="317" />
<code_context>
)
return record
+ def _sanitize_message_content(self, content: dict) -> dict:
+ """Normalize message content before persisting it."""
+ if not isinstance(content, dict):
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new conversation-history, content-handling, and branching/regeneration logic into dedicated services/utilities so the route remains a thin HTTP layer.
You can keep the new features but reduce complexity by pulling the non-HTTP logic into small collaborators and pure utilities. This keeps the route thin and makes the dense helper web testable in isolation.
### 1. Extract conversation-history sync into a service
`_sync_conversation_history_message`, `_find_conversation_history_index`, `_extract_conversation_text`, `_extract_conversation_think`, `_is_displayable_conversation_message`, `_trim_conversation_history`, `_clone_current_conversation`, `_rewrite_current_conversation` all belong to a “conversation history” concern rather than the route.
You can move them into a dedicated `ConversationHistoryService` and keep the handler as an orchestrator:
```python
# conversation_history_service.py
class ConversationHistoryService:
def __init__(self, conv_mgr):
self._conv_mgr = conv_mgr
def find_message_index(self, history: list[dict], *, role: str, ordinal: int,
old_text: str, old_reasoning: str) -> int | None:
# inline from _find_conversation_history_index + helpers, but pure
...
async def sync_platform_edit(
self,
session,
existing_record,
updated_content: dict,
) -> None:
# logic currently in _sync_conversation_history_message
...
async def rewrite_current_conversation(
self,
session,
*,
max_user_messages: int | None = None,
max_assistant_messages: int | None = None,
) -> None:
# logic currently in _rewrite_current_conversation
...
```
Then the route becomes trivial and easier to read:
```python
# in ChatRoute.__init__
self.conv_history_svc = ConversationHistoryService(self.conv_mgr)
# in update_message
await self.db.update_platform_message_history(message_id=message_id, content=content)
await self.conv_history_svc.sync_platform_edit(
session=session,
existing_record=record,
updated_content=content,
)
```
And in `regenerate_message`:
```python
await self.db.delete_platform_message_histories(
[source_user_record.id, target_record.id]
)
await self.conv_history_svc.rewrite_current_conversation(
session,
max_user_messages=preserved_user_count,
max_assistant_messages=preserved_bot_count,
)
```
This keeps all behavior, but the route no longer needs to know about ordinals, text/thinking extraction, or history trimming details.
### 2. Consolidate content extraction/replacement utilities
These helpers are tightly coupled and duplicated:
- `_extract_platform_message_text`
- `_extract_conversation_text`
- `_extract_conversation_think`
- `_replace_user_conversation_content`
- `_replace_assistant_conversation_content`
They can live in a tiny utility module and be reused by both update and regeneration flows:
```python
# conversation_content_utils.py
def extract_platform_text(content: dict | None) -> str:
...
def extract_conversation_text(message: dict) -> str:
...
def extract_conversation_think(message: dict) -> str:
...
def replace_user_content(original_content, edited_text: str) -> str | list[dict]:
...
def replace_assistant_content(
original_content, edited_text: str, reasoning: str
) -> str | list[dict]:
...
```
Usage in the route (and in the history service):
```python
from .conversation_content_utils import (
extract_platform_text,
replace_user_content,
replace_assistant_content,
)
edited_text = extract_platform_text(updated_content)
if conversation_role == "user":
target_message["content"] = replace_user_content(original_content, edited_text)
else:
target_message["content"] = replace_assistant_content(
original_content, edited_text, updated_content.get("reasoning", "")
)
```
This removes the content rules from the route class and ensures both edit/regenerate paths use the same rules.
### 3. Extract branching/regeneration orchestration into a small domain service
`branch_session` and `regenerate_message` currently mix:
- permission checking,
- platform history scanning,
- ID remapping,
- session cloning,
- and conversation trimming.
You can keep HTTP concerns in the route and move the rest into a service, e.g. `SessionBranchService`:
```python
# session_branch_service.py
class SessionBranchService:
def __init__(self, db, platform_history_mgr, conv_history_svc, umop_config_router):
self.db = db
self.platform_history_mgr = platform_history_mgr
self.conv_history_svc = conv_history_svc
self.umop_config_router = umop_config_router
async def branch_session(self, source_session, username: str, display_name: str | None):
# logic from _get_sorted_platform_history, _create_branch_session,
# _remap_reply_message_ids, and branch_session body
...
async def prepare_regeneration(self, session, message_id: int):
# selection logic from regenerate_message:
# - get sorted history
# - find target bot record
# - find corresponding user record
# - compute preserved counts
# Return a simple DTO
...
```
Then the route endpoints become focused:
```python
# in ChatRoute.__init__
self.session_branch_svc = SessionBranchService(
self.db, self.platform_history_mgr, self.conv_history_svc, self.umop_config_router
)
# in branch_session
cloned_session, project_info, branch_meta = await self.session_branch_svc.branch_session(
session, username, display_name
)
# in regenerate_message
plan = await self.session_branch_svc.prepare_regeneration(session, message_id)
if isinstance(plan, Error):
return Response().error(plan.message).__dict__
await self.db.delete_platform_message_histories(plan.removed_ids)
await self.conv_history_svc.rewrite_current_conversation(
session,
max_user_messages=plan.preserved_user_count,
max_assistant_messages=plan.preserved_bot_count,
)
```
This keeps the new features but breaks the “god object” pattern: the route only does auth, parsing, and calling services; the services encapsulate the domain logic with narrower, testable APIs.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Code Review
This pull request introduces features for branching chat sessions, regenerating bot messages, and editing existing messages. It includes backend database updates, new API routes, and significant frontend enhancements to support these workflows. Feedback highlights a security vulnerability in the session retrieval endpoint where ownership checks are missing, performance concerns regarding batch database operations and large history fetches, and a CSS syntax error in the frontend styling.
| for history in history_list: | ||
| cloned_content = deepcopy(history.content or {}) | ||
| message_parts = cloned_content.get("message") | ||
| if isinstance(message_parts, list): | ||
| self._remap_reply_message_ids(message_parts, old_to_new_message_ids) | ||
| cloned_content["message"] = strip_message_parts_path_fields( | ||
| message_parts | ||
| ) | ||
|
|
||
| new_record = await self.platform_history_mgr.insert( | ||
| platform_id=cloned_session.platform_id, | ||
| user_id=cloned_session.session_id, | ||
| content=cloned_content, | ||
| sender_id=history.sender_id, | ||
| sender_name=history.sender_name, | ||
| ) | ||
| old_to_new_message_ids[str(history.id)] = new_record.id |
There was a problem hiding this comment.
Performing database inserts in a loop inside branch_session is inefficient, especially for long conversation histories. Each call to platform_history_mgr.insert likely starts and commits a new transaction. This can lead to significant performance degradation and potential database lock contention on SQLite. Consider implementing a batch insert method in the history manager or wrapping the loop in a single transaction.
There was a problem hiding this comment.
这个问题是合理的。当前实现确实是在 branch_session 中逐条写入历史记录。
不过不建议当前 PR 处理,主要是因为这会涉及 history manager / DB 写入路径的扩展,改动面会超出本次功能修复范围。
| platform_history = await self.platform_history_mgr.get( | ||
| platform_id=session.platform_id, | ||
| user_id=session.session_id, | ||
| page=1, | ||
| page_size=100000, | ||
| ) |
There was a problem hiding this comment.
Fetching up to 100,000 history records to find a message's ordinal is inefficient and can cause memory issues for large sessions. Consider using a more targeted SQL query (e.g., using COUNT(*) with a filter on created_at and id) to determine the message's position in the history relative to its role.
There was a problem hiding this comment.
这个问题也是合理的。当前为了定位消息在对应角色历史中的位置,确实会读取较大范围的历史记录。
同样不建议当前 PR 处理,因为如果改为更定向的查询方式,需要一起调整当前消息定位与会话同步逻辑。这样会明显扩大本次 PR 的实现范围。
|
粗看之下,ChatRoute过重,建议拆服务,是有道理的; |
|
@sourcery-ai review |
|
还没改完,等一会( |
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- In
Chat.vuestyles around.session-text/.session-branch-text, there appears to be a strayfont-size: 14px;andfont-weight: 500;outside of any selector after your additions; this likely broke the original.session-titlerules and should be cleaned up so those declarations live inside the intended class. - The branching label/summary logic is duplicated between
Chat.vue(branchSummary) andProjectView.vue(branchLabel); consider extracting a shared helper (or reusing one component/composable) to centralize the formatting ofbranch_metaand avoid divergence in the future.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `Chat.vue` styles around `.session-text`/`.session-branch-text`, there appears to be a stray `font-size: 14px;` and `font-weight: 500;` outside of any selector after your additions; this likely broke the original `.session-title` rules and should be cleaned up so those declarations live inside the intended class.
- The branching label/summary logic is duplicated between `Chat.vue` (`branchSummary`) and `ProjectView.vue` (`branchLabel`); consider extracting a shared helper (or reusing one component/composable) to centralize the formatting of `branch_meta` and avoid divergence in the future.
## Individual Comments
### Comment 1
<location path="astrbot/dashboard/routes/chat.py" line_range="1409-1418" />
<code_context>
+ async def update_message(self):
</code_context>
<issue_to_address>
**issue (bug_risk):** Updated content is trusted from the client; there’s no server-side check that the message type is unchanged.
In `update_message`, the new `content` is taken directly from the client, and `_sanitize_message_content` only checks that `content.type` is a non-empty string. This lets a client change a persisted `user` message into a `bot` message (or any other type), which can break history semantics and `_sync_conversation_history_message` logic. You can harden this by requiring `content['type']` to match `record.content.get('type')`, and optionally restricting it to an allowed set (e.g. `{'user', 'bot'}`).
</issue_to_address>
### Comment 2
<location path="astrbot/dashboard/routes/chat.py" line_range="347" />
<code_context>
+ if mapped_id is not None:
+ part["message_id"] = mapped_id
+
+ def _build_webchat_unified_msg_origin(self, session) -> str:
+ message_type = (
+ MessageType.GROUP_MESSAGE
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting conversation history, regeneration, branching, and session serialization logic into dedicated services so this route focuses only on HTTP validation and orchestration.
The new features are solid, but a lot of domain logic has accumulated in this route. You can keep all behavior while reducing complexity by extracting a couple of focused services and hiding low‑level details behind higher‑level operations.
### 1. Extract conversation history ops into a service
All of these are tightly coupled and share the same domain:
- `_build_webchat_unified_msg_origin`
- `_extract_platform_message_text`
- `_extract_conversation_text`
- `_extract_conversation_think`
- `_is_displayable_conversation_message`
- `_replace_user_conversation_content`
- `_replace_assistant_conversation_content`
- `_find_conversation_history_index`
- `_trim_conversation_history`
- `_clone_current_conversation`
- `_rewrite_current_conversation`
- `_sync_conversation_history_message`
You can move these into a `ConversationHistoryService` and let the route depend on coarse methods like `sync_edited_message`, `clone_current`, `rewrite_current`.
Example (minimal, showing the pattern, not full implementation):
```python
# conversation_history_service.py
class ConversationHistoryService:
def __init__(self, conv_mgr, platform_history_mgr):
self._conv_mgr = conv_mgr
self._platform_history_mgr = platform_history_mgr
def build_unified_origin(self, session) -> str:
message_type = (
MessageType.GROUP_MESSAGE if session.is_group else MessageType.FRIEND_MESSAGE
)
return f"{session.platform_id}:{message_type.value}:{session.platform_id}!{session.creator}!{session.session_id}"
async def sync_edited_message(self, *, session, existing_record, updated_content: dict) -> None:
unified_origin = self.build_unified_origin(session)
conversation_id = await self._conv_mgr.get_curr_conversation_id(unified_origin)
if not conversation_id:
return
conversation = await self._conv_mgr.get_conversation(
unified_msg_origin=unified_origin,
conversation_id=conversation_id,
)
if not conversation:
return
history = json.loads(conversation.history) or []
# use private helpers inside the service:
index = self._find_message_index_in_history(
history=history,
existing_record=existing_record,
updated_content=updated_content,
)
if index is None:
return
history[index] = self._apply_message_update(
history[index],
existing_record,
updated_content,
)
await self._conv_mgr.update_conversation(
unified_msg_origin=unified_origin,
conversation_id=conversation_id,
history=history,
)
async def rewrite_current(
self,
*,
session,
max_user_messages: int | None = None,
max_assistant_messages: int | None = None,
) -> None:
unified_origin = self.build_unified_origin(session)
conversation_id = await self._conv_mgr.get_curr_conversation_id(unified_origin)
if not conversation_id:
return
conversation = await self._conv_mgr.get_conversation(
unified_msg_origin=unified_origin,
conversation_id=conversation_id,
)
if not conversation:
return
history = json.loads(conversation.history) or []
trimmed = self._trim(history,
max_user_messages=max_user_messages,
max_assistant_messages=max_assistant_messages)
await self._conv_mgr.update_conversation(
unified_msg_origin=unified_origin,
conversation_id=conversation_id,
history=trimmed,
)
# private helpers (_extract_conversation_text, _trim, etc.) stay inside service
```
Route code then becomes thinner:
```python
# in route __init__
self.history_service = ConversationHistoryService(
conv_mgr=self.conv_mgr,
platform_history_mgr=self.platform_history_mgr,
)
# in update_message
await self.db.update_platform_message_history(
message_id=message_id, content=content
)
await self.history_service.sync_edited_message(
session=session,
existing_record=record,
updated_content=content,
)
```
```python
# in regenerate_message
await self.db.delete_platform_message_histories([source_user_record.id, target_record.id])
await self.history_service.rewrite_current(
session=session,
max_user_messages=preserved_user_count,
max_assistant_messages=preserved_bot_count,
)
```
This keeps all current behavior but makes conversation logic testable and reusable outside the route.
### 2. Encapsulate regeneration reasoning
`regenerate_message` hand‑rolls a lot of history traversal:
- find target bot record
- enforce “latest bot” / “latest turn” invariants
- compute preserved user/bot counts
You can move this into the same history service as explicit high‑level operations, reducing the route to orchestration.
Example:
```python
# conversation_history_service.py
class ConversationHistoryService:
# ...
async def get_sorted_platform_history(self, session) -> list:
history = await self._platform_history_mgr.get(
platform_id=session.platform_id,
user_id=session.session_id,
page=1,
page_size=100000,
)
history.sort(key=lambda h: (h.created_at, h.id))
return history
def find_last_turn_for_regeneration(self, history_list, message_id: int):
target_index = next(
(i for i, h in enumerate(history_list) if h.id == message_id),
None,
)
if target_index is None:
raise ValueError("not_found")
target = history_list[target_index]
if target.content.get("type") != "bot":
raise ValueError("not_bot")
if target_index != len(history_list) - 1:
raise ValueError("not_latest_bot")
user_index = None
for i in range(target_index - 1, -1, -1):
content = history_list[i].content
if isinstance(content, dict) and content.get("type") == "user":
user_index = i
break
if user_index is None:
raise ValueError("no_user")
if user_index != len(history_list) - 2:
raise ValueError("not_latest_turn")
preserved = history_list[:user_index]
preserved_user = sum(
1 for h in preserved
if isinstance(h.content, dict) and h.content.get("type") == "user"
)
preserved_bot = sum(
1 for h in preserved
if isinstance(h.content, dict) and h.content.get("type") == "bot"
)
return {
"user_record": history_list[user_index],
"bot_record": target,
"preserved_user_count": preserved_user,
"preserved_bot_count": preserved_bot,
}
```
Then `regenerate_message` simplifies to:
```python
async def regenerate_message(self):
# validation as-is ...
history_list = await self.history_service.get_sorted_platform_history(session)
try:
turn = self.history_service.find_last_turn_for_regeneration(
history_list, message_id
)
except ValueError as exc:
# map error codes to existing error messages; keeps behavior unchanged
code = str(exc)
if code == "not_found":
return Response().error(f"Message {message_id} not found").__dict__
if code == "not_bot":
return Response().error("Only bot messages can be regenerated").__dict__
if code == "not_latest_bot":
return (
Response()
.error("Only the latest bot message can be regenerated in place")
.__dict__
)
if code == "no_user":
return Response().error("No user message found for regeneration").__dict__
if code == "not_latest_turn":
return (
Response()
.error("Only the latest user/bot turn can be regenerated in place")
.__dict__
)
raise
source_user_record = turn["user_record"]
target_record = turn["bot_record"]
await self.db.delete_platform_message_histories(
[source_user_record.id, target_record.id]
)
await self.history_service.rewrite_current(
session=session,
max_user_messages=turn["preserved_user_count"],
max_assistant_messages=turn["preserved_bot_count"],
)
regenerated_parts = deepcopy(source_user_record.content.get("message", []))
# build response_data as now
```
The invariants are now encoded in one place and easier to maintain.
### 3. Extract branching into a dedicated service
Branching spans:
- `_get_sorted_platform_history`
- `_remap_reply_message_ids`
- `_build_branch_display_name`
- `_create_branch_session`
- `_clone_session_route`
- `_clone_current_conversation`
- `_get_branch_meta` / `_set_branch_meta`
- parts of `branch_session`
These are all specific to “branch a session” and can live behind a `SessionBranchingService`, leaving the route as validation + one call.
Example:
```python
# session_branching_service.py
class SessionBranchingService:
def __init__(self, db, platform_history_mgr, conv_mgr, umop_config_router, branch_meta_scope: str, branch_meta_key: str):
self._db = db
self._history_mgr = platform_history_mgr
self._conv_mgr = conv_mgr
self._umop_config_router = umop_config_router
self._branch_meta_scope = branch_meta_scope
self._branch_meta_key = branch_meta_key
async def get_branch_meta(self, session_id: str) -> dict | None:
preference = await self._db.get_preference(
self._branch_meta_scope,
session_id,
self._branch_meta_key,
)
if not preference or not isinstance(preference.value, dict):
return None
return preference.value
async def branch_session(
self,
*,
source_session,
creator: str,
display_name: str | None = None,
branch_type: str = "branch",
) -> dict:
# wraps the current _create_branch_session + history copy logic,
# remapping reply ids and returning the same response shape used now
# ...
return {
"session": cloned_session,
"project": project_info,
"branch_meta": branch_meta,
}
```
Route usage:
```python
# in __init__
self.branching_service = SessionBranchingService(
db=self.db,
platform_history_mgr=self.platform_history_mgr,
conv_mgr=self.conv_mgr,
umop_config_router=self.umop_config_router,
branch_meta_scope=self.branch_meta_scope,
branch_meta_key=self.branch_meta_key,
)
# in get_session
branch_meta = await self.branching_service.get_branch_meta(session_id)
# in branch_session
result = await self.branching_service.branch_session(
source_session=session,
creator=username,
display_name=display_name,
branch_type="branch",
)
cloned_session = result["session"]
project_info = result["project"]
branch_meta = result["branch_meta"]
response_data = self._serialize_session(cloned_session, branch_meta)
if project_info:
response_data["project"] = {
"project_id": project_info.project_id,
"title": project_info.title,
"emoji": project_info.emoji,
}
return Response().ok(data=response_data).__dict__
```
This keeps the persistence and behavior identical while removing branching internals from the route.
### 4. Centralize session serialization / presentation
`_serialize_session`, `_get_branch_meta`, `_set_branch_meta`, and the `branch_meta_*` constants are presentation/persistence concerns. You can move them into a small presenter or reuse the branching service for this.
Example:
```python
# session_presenter.py
class SessionPresenter:
def serialize(self, session, branch_meta: dict | None = None) -> dict:
return {
"session_id": session.session_id,
"platform_id": session.platform_id,
"creator": session.creator,
"display_name": session.display_name,
"is_group": session.is_group,
"created_at": to_utc_isoformat(session.created_at),
"updated_at": to_utc_isoformat(session.updated_at),
"branch_meta": branch_meta,
}
```
Route code:
```python
# in __init__
self.session_presenter = SessionPresenter()
# in get_sessions
sessions_data = [
self.session_presenter.serialize(
item["session"],
branch_meta_map.get(item["session"].session_id),
)
for item in sessions
]
```
Overall effect:
- Route remains responsible for HTTP validation and high‑level orchestration.
- Conversation editing/rewriting, regeneration invariants, and branching become focused, testable services.
- The behavior remains unchanged, but the mental load and coupling in this route file are significantly reduced.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
我只是想在Web聊天界面加一个和SillyTavern一样的编辑消息的功能,并支持重新生成最新一条消息而已 |
|
已处理 已处理 已处理 |
|
已经修复了代码检查出示的潜在风险,排除了问题。 |
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- Several helpers like
_get_sorted_platform_historyand_sync_conversation_history_messagefetch and sort up to 100000 history records on each call; consider introducing a configurable page size/constant or a more targeted lookup (e.g., by id or ordinal passed from the client) to avoid unnecessary full-history scans on large sessions. - In
branch_sessionyou clone platform history by iterating and inserting each record one-by-one; if sessions can be long, a bulk insert or batched approach at the database layer would reduce round-trips and improve performance. - The regeneration flow (
regenerate_message) relies on strict structural assumptions (last two messages must form a user/bot turn, and history is re-counted via_rewrite_current_conversation); it might be worth centralising this invariant check into a small helper and adding a short code comment to document the expected shape of history to make future changes safer.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Several helpers like `_get_sorted_platform_history` and `_sync_conversation_history_message` fetch and sort up to 100000 history records on each call; consider introducing a configurable page size/constant or a more targeted lookup (e.g., by id or ordinal passed from the client) to avoid unnecessary full-history scans on large sessions.
- In `branch_session` you clone platform history by iterating and inserting each record one-by-one; if sessions can be long, a bulk insert or batched approach at the database layer would reduce round-trips and improve performance.
- The regeneration flow (`regenerate_message`) relies on strict structural assumptions (last two messages must form a user/bot turn, and history is re-counted via `_rewrite_current_conversation`); it might be worth centralising this invariant check into a small helper and adding a short code comment to document the expected shape of history to make future changes safer.
## Individual Comments
### Comment 1
<location path="astrbot/dashboard/routes/chat.py" line_range="759-760" />
<code_context>
+ if source_conf_id:
+ await self.umop_config_router.update_route(target_umo, source_conf_id)
+
+ async def _get_sorted_platform_history(self, session) -> list:
+ history_list = await self.platform_history_mgr.get(
+ platform_id=session.platform_id,
+ user_id=session.session_id,
</code_context>
<issue_to_address>
**suggestion (performance):** The very large `page_size=100000` and in-memory sort for platform history may not scale well.
Several helpers (`_get_sorted_platform_history`, `_sync_conversation_history_message`, `branch_session`, `regenerate_message`) rely on fetching up to 100k history records and sorting them in memory, which can be expensive for active users or long-lived sessions. Consider reducing the limit (and making it configurable) and/or pushing the sort into the DB via an indexed `ORDER BY` so you avoid the in-memory sort and extra pass in Python.
Suggested implementation:
```python
if source_conf_id:
await self.umop_config_router.update_route(target_umo, source_conf_id)
page=1,
page_size=1000,
)
```
To fully implement the scaling improvements you described, you should also:
1. Make `page_size` configurable (e.g. via a settings/config module), and replace the hard-coded `1000` with that configuration constant in this file.
2. Update `platform_history_mgr.get(...)` to:
- Accept an `order_by` argument (e.g. `order_by=("created_at", "id")`).
- Apply ordering in the database query, ensuring there is an index on `(platform_id, session_id/user_id, created_at, id)` as appropriate.
3. Update `_get_sorted_platform_history`, `_sync_conversation_history_message`, `branch_session`, and `regenerate_message` to:
- Rely on the database-ordered result from `platform_history_mgr.get(...)` instead of performing in-memory sorting.
- Use the configurable `page_size` rather than any other hard-coded large limits if present.
</issue_to_address>
### Comment 2
<location path="astrbot/dashboard/routes/chat.py" line_range="318" />
<code_context>
)
return record
+ def _sanitize_message_content(self, content: dict) -> dict:
+ """Normalize message content before persisting it."""
+ if not isinstance(content, dict):
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new message-content utilities, conversation-history sync logic, and session-branching/regeneration orchestration into dedicated helper/service modules to keep the route focused on HTTP and permissions wiring.
You can keep all the new behavior but reduce route complexity by extracting a couple of focused helpers/services. The logic is already nicely grouped into coherent clusters; you can move those clusters out of the route without changing behavior.
### 1. Extract message-content utilities out of the route
These are generic and don’t need access to `self`:
- `_sanitize_message_content`
- `_extract_platform_message_text`
- `_remap_reply_message_ids`
Move them into a module like `message_content.py` and call them from the route. That removes low-level data-munging from the HTTP class and makes them easier to test in isolation.
```python
# message_content.py
from copy import deepcopy
from astrbot.core.platform.sources.webchat.message_parts_helper import (
strip_message_parts_path_fields,
)
def sanitize_message_content(content: dict) -> dict:
if not isinstance(content, dict):
raise ValueError("Missing key: content")
normalized = deepcopy(content)
message_type = normalized.get("type")
if not isinstance(message_type, str) or not message_type:
raise ValueError("Missing key: content.type")
message_parts = normalized.get("message")
if not isinstance(message_parts, list):
raise ValueError("Missing key: content.message")
normalized["message"] = strip_message_parts_path_fields(message_parts)
return normalized
def extract_platform_message_text(content: dict | None) -> str:
if not isinstance(content, dict):
return ""
message_parts = content.get("message")
if not isinstance(message_parts, list):
return ""
texts: list[str] = []
for part in message_parts:
if isinstance(part, dict) and part.get("type") == "plain":
text = part.get("text")
if isinstance(text, str):
texts.append(text)
return "".join(texts)
def remap_reply_message_ids(message_parts: list[dict], message_id_map: dict[str, int]) -> None:
for part in message_parts:
if not isinstance(part, dict) or part.get("type") != "reply":
continue
message_id = part.get("message_id")
if message_id is None:
continue
mapped_id = message_id_map.get(str(message_id))
if mapped_id is not None:
part["message_id"] = mapped_id
```
Route usage becomes thinner:
```python
# in ChatRoute
from .message_content import (
sanitize_message_content,
extract_platform_message_text,
remap_reply_message_ids,
)
async def update_message(self):
...
try:
message_id = int(message_id)
content = sanitize_message_content(content)
...
async def branch_session(self):
...
for history in history_list:
cloned_content = deepcopy(history.content or {})
message_parts = cloned_content.get("message")
if isinstance(message_parts, list):
remap_reply_message_ids(message_parts, old_to_new_message_ids)
cloned_content["message"] = strip_message_parts_path_fields(message_parts)
...
async def _sync_conversation_history_message(...):
...
old_text = extract_platform_message_text(existing_record.content)
edited_text = extract_platform_message_text(updated_content)
...
```
### 2. Extract conversation-history sync into a helper/service
All of these form one cohesive concern:
- `_extract_conversation_text`
- `_extract_conversation_think`
- `_is_displayable_conversation_message`
- `_replace_user_conversation_content`
- `_replace_assistant_conversation_content`
- `_find_conversation_history_index`
- `_trim_conversation_history`
- `_clone_current_conversation`
- `_rewrite_current_conversation`
- `_sync_conversation_history_message`
You can keep the algorithms intact but move them into `conversation_history_service.py`. The route then just passes dependencies and parameters.
```python
# conversation_history_service.py
import json
from copy import deepcopy
from typing import Any
from astrbot.core.utils.datetime_utils import to_utc_isoformat # if needed
from .message_content import extract_platform_message_text
class ConversationHistoryService:
def __init__(self, conv_mgr, platform_history_mgr, build_unified_origin):
self._conv_mgr = conv_mgr
self._platform_history_mgr = platform_history_mgr
# build_unified_origin: (session) -> str
self._build_unified_origin = build_unified_origin
# move _extract_conversation_text, _extract_conversation_think,
# _is_displayable_conversation_message, _replace_*,
# _find_conversation_history_index, _trim_conversation_history here
async def sync_message(self, session, existing_record, updated_content: dict) -> None:
unified = self._build_unified_origin(session)
conversation_id = await self._conv_mgr.get_curr_conversation_id(unified)
if not conversation_id:
return
conversation = await self._conv_mgr.get_conversation(
unified_msg_origin=unified,
conversation_id=conversation_id,
)
if not conversation:
return
history = json.loads(conversation.history)
if not isinstance(history, list):
return
platform_history = await self._platform_history_mgr.get(
platform_id=session.platform_id,
user_id=session.session_id,
page=1,
page_size=100000,
)
platform_history.sort(key=lambda item: (item.created_at, item.id))
role_type = existing_record.content.get("type")
...
# same logic currently in _sync_conversation_history_message
await self._conv_mgr.update_conversation(
unified_msg_origin=unified,
conversation_id=conversation_id,
history=history,
)
async def clone_current_conversation(self, source_session, target_session, *, max_user_messages=None, max_assistant_messages=None) -> None:
...
# move _clone_current_conversation body here
async def rewrite_current_conversation(self, session, *, max_user_messages=None, max_assistant_messages=None) -> None:
...
# move _rewrite_current_conversation body here
```
In the route:
```python
# in ChatRoute.__init__
self._conv_history_service = ConversationHistoryService(
self.conv_mgr,
self.platform_history_mgr,
self._build_webchat_unified_msg_origin,
)
async def update_message(self):
...
await self.db.update_platform_message_history(
message_id=message_id, content=content
)
await self._conv_history_service.sync_message(
session=session,
existing_record=record,
updated_content=content,
)
...
async def regenerate_message(self):
...
await self.db.delete_platform_message_histories(
[source_user_record.id, target_record.id]
)
await self._conv_history_service.rewrite_current_conversation(
session,
max_user_messages=preserved_user_count,
max_assistant_messages=preserved_bot_count,
)
...
```
This keeps all behaviors but makes it clear that “conversation history sync/clone/trim” is a separate concern from HTTP + DB wiring.
### 3. Extract branching/regeneration orchestration into a session-branching service
These methods define a clear “branching” domain:
- `_build_webchat_unified_msg_origin` (or pass as dependency)
- `_serialize_session`
- `_get_branch_meta`, `_set_branch_meta`
- `_clone_session_route`
- `_get_sorted_platform_history`
- `_find_message_index`
- `_build_branch_display_name`
- `_create_branch_session`
- `branch_session`
- parts of `regenerate_message` that orchestrate DB + cloning + branch meta
A service like `SessionBranchingService` can expose clear APIs:
```python
# session_branching_service.py
from copy import deepcopy
from astrbot.core.platform.sources.webchat.message_parts_helper import (
strip_message_parts_path_fields,
)
from .message_content import remap_reply_message_ids
class SessionBranchingService:
def __init__(self, db, platform_history_mgr, umop_config_router, branch_meta_scope, branch_meta_key):
self._db = db
self._platform_history_mgr = platform_history_mgr
self._umop_config_router = umop_config_router
self._branch_meta_scope = branch_meta_scope
self._branch_meta_key = branch_meta_key
async def get_branch_meta(self, session_id: str) -> dict | None:
pref = await self._db.get_preference(
self._branch_meta_scope,
session_id,
self._branch_meta_key,
)
if not pref or not isinstance(pref.value, dict):
return None
return pref.value
async def create_branch_session_with_history(self, source_session, username: str, *, display_name: str | None, branch_type: str):
history_list = await self._platform_history_mgr.get(
platform_id=source_session.platform_id,
user_id=source_session.session_id,
page=1,
page_size=100000,
)
history_list.sort(key=lambda history: (history.created_at, history.id))
cloned_session, project_info, branch_meta = await self._create_branch_session(
source_session,
username,
display_name=display_name,
branch_type=branch_type,
)
old_to_new_ids: dict[str, int] = {}
for history in history_list:
cloned_content = deepcopy(history.content or {})
message_parts = cloned_content.get("message")
if isinstance(message_parts, list):
remap_reply_message_ids(message_parts, old_to_new_ids)
cloned_content["message"] = strip_message_parts_path_fields(message_parts)
new_record = await self._platform_history_mgr.insert(
platform_id=cloned_session.platform_id,
user_id=cloned_session.session_id,
content=cloned_content,
sender_id=history.sender_id,
sender_name=history.sender_name,
)
old_to_new_ids[str(history.id)] = new_record.id
return cloned_session, project_info, branch_meta
# move _build_branch_display_name, _create_branch_session, _clone_session_route, etc., here
```
Then your route reduces to permission checks + delegating:
```python
# in ChatRoute.__init__
from .session_branching_service import SessionBranchingService
self.branch_meta_scope = "webchat_session"
self.branch_meta_key = "branch_meta"
self._branching_service = SessionBranchingService(
self.db,
self.platform_history_mgr,
self.umop_config_router,
self.branch_meta_scope,
self.branch_meta_key,
)
async def get_session(self):
...
branch_meta = await self._branching_service.get_branch_meta(session_id)
...
async def branch_session(self):
...
cloned_session, project_info, branch_meta = await self._branching_service.create_branch_session_with_history(
session,
username,
display_name=display_name,
branch_type="branch",
)
response_data = self._serialize_session(cloned_session, branch_meta)
...
```
You can follow the same pattern for the “regeneration” part of `regenerate_message` (the DB deletions + preserved history counts + conversation rewrite) by moving that orchestration into the branching service or a separate `RegenerationService`.
---
These small extractions keep all current behavior and data formats but:
- make the route methods mostly about HTTP I/O and permissions,
- localize the complex algorithms in domain helpers with clear responsibilities,
- give you better test boundaries for conversation sync, branching, and message-content handling.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
ce36040 to
f6b8fe5
Compare
|
不小心推送错仓库了,已经回退。 |
|
@Soulter @zouyonghe @LIghtJUNction @advent259141 |
|
可以简单解释一下你是如何做到编辑、分支和重新生成的吗? 我理解在 AstrBot 里面做到这个是比较复杂的。 我之前的想法是:
这样才可以做到编辑、重新生成。 为什么要这样设计?因为 ChatUI 下用户可能会调用指令,这样不会进入 LLM 历史记录。 至于分支就更复杂了。 cc @zouyonghe @LIghtJUNction @advent259141 如果有更好的点子可以一起讨论 |
对不起,我犯了一个很大的错误,我完全没有考虑到工具这个情况。 我的方案其实很单纯,就是当编辑消息的时候,首先直接编辑PlatformMessageHistory中的对应信息,然后利用session_id和message_id调出PlatformMessageHistory和Conversation中的文本,然后根据它在PlatformMessageHistory中是第X条用户/AI消息,直接写进Conversation的第X条用户/AI消息,的确没有考虑到调用工具的情况。因为之前用的时候有莫名其妙发现ChatUI的消息发送后失踪的情况,所以姑且做了个万一出错就根据具体的消息内容去进行字段匹配的备用措施。 之所以一开始单纯用第X条消息作为锚点,是纯粹没考虑到只在ChatUI显示而不在消息记录的情况,没有理解到两个地方数据源不同的用意,认为它只是一种莫名其妙的设计残留。 所以,我目前做的这个方案是不可行的,除非只用来文字聊天。 |
|
但是我仍然想要实现这个功能,因为它对于我来说确实非常重要。 我将重新构思解决方案,但这可能要涉及到更多的修改与重构,在目前的框架下我认为这是困难的。 |
中文
修改内容
验证
python -m ruff check .npm exec vue-tsc -- --noEmitnpm run buildEnglish
Changes
Validation
python -m ruff check .npm exec vue-tsc -- --noEmitnpm run buildSummary by Sourcery
Add WebChat support for editing persisted messages, branching sessions, and in-place regeneration of the latest assistant reply while keeping platform history and conversation context in sync.
New Features:
Enhancements: