feat(distribution_manager): 在处理成功后添加短暂等待,确保清理操作完成

feat(message_manager): 简化清除未读消息逻辑,移除冗余注释
feat(action_manager): 移除自动清空未读消息的逻辑,改为手动处理
feat(plan_executor): 扩展回复类动作的分类,包含 respond 动作
This commit is contained in:
Windpicker-owo
2025-11-10 21:38:55 +08:00
parent c46df81bca
commit f3af3caf71
5 changed files with 16 additions and 86 deletions

View File

@@ -249,6 +249,10 @@ class StreamLoopManager:
self.stats["total_process_cycles"] += 1
if success:
logger.info(f"✅ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理成功")
# 🔒 处理成功后,等待一小段时间确保清理操作完成
# 这样可以避免在 chatter_manager 清除未读消息之前就进入下一轮循环
await asyncio.sleep(0.1)
else:
self.stats["total_failures"] += 1
logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败")
@@ -347,15 +351,9 @@ class StreamLoopManager:
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)
# 子任务跟踪 - 存储到 context 中以便打断时可以访问
child_tasks = set()
context._active_process_tasks = child_tasks # 临时存储子任务集合
chatter_task = None
try:
start_time = time.time()
# 注意缓存消息刷新已移至planner开始时执行动作修改器之后此处不再刷新
# 检查未读消息,如果为空则直接返回(优化:避免无效的 chatter 调用)
unread_messages = context.get_unread_messages()
if not unread_messages:
@@ -370,9 +368,7 @@ class StreamLoopManager:
context.triggering_user_id = last_message.user_info.user_id
# 创建子任务用于刷新能量(不阻塞主流程)
energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id))
child_tasks.add(energy_task)
energy_task.add_done_callback(lambda t: child_tasks.discard(t))
asyncio.create_task(self._refresh_focus_energy(stream_id))
# 设置 Chatter 正在处理的标志
context.is_chatter_processing = True
@@ -383,9 +379,7 @@ class StreamLoopManager:
self.chatter_manager.process_stream_context(stream_id, context),
name=f"chatter_process_{stream_id}"
)
child_tasks.add(chatter_task)
context._chatter_task = chatter_task # 保存主 chatter 任务引用
# 等待 chatter 任务完成
results = await chatter_task
success = results.get("success", False)
@@ -401,47 +395,19 @@ class StreamLoopManager:
else:
logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
except asyncio.CancelledError:
logger.info(f"流处理被取消,正在清理所有子任务: {stream_id}")
# 取消所有子任务(包括 chatter 任务和其后台任务)
cancel_count = 0
for child_task in list(child_tasks): # 使用 list() 创建副本避免迭代时修改
if not child_task.done():
child_task.cancel()
cancel_count += 1
if cancel_count > 0:
logger.info(f"已取消 {cancel_count} 个子任务: {stream_id}")
# 等待所有子任务真正结束(设置短超时)
try:
await asyncio.wait_for(
asyncio.gather(*child_tasks, return_exceptions=True),
timeout=1.0
)
except asyncio.TimeoutError:
logger.warning(f"等待子任务取消超时: {stream_id}")
return success
except asyncio.CancelledError:
if chatter_task and not chatter_task.done():
chatter_task.cancel()
raise
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
# 异常时也要清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
return False
finally:
# 清除 Chatter 处理标志
context.is_chatter_processing = False
logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 清理临时存储的任务引用
if hasattr(context, '_active_process_tasks'):
delattr(context, '_active_process_tasks')
if hasattr(context, '_chatter_task'):
delattr(context, '_chatter_task')
# 无论成功或失败,都要设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)

View File

@@ -436,11 +436,7 @@ class MessageManager:
logger.error(f"清除未读消息时发生错误: {e}")
async def clear_stream_unread_messages(self, stream_id: str):
"""清除指定聊天流的所有未读消息
此方法会标记剩余的未读消息为已读(移到历史),作为兜底保护。
正常情况下action_manager 应该已经标记了所有消息,这里只是确保没有遗漏。
"""
"""清除指定聊天流的所有未读消息"""
try:
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
@@ -453,9 +449,7 @@ class MessageManager:
unread_count = len(context.unread_messages)
# 如果还有未读消息,说明 action_manager 可能遗漏了,标记它们
if unread_count > 0:
logger.debug(f"🧹 [兜底清理] stream={stream_id[:8]}, 发现 {unread_count} 条剩余未读消息,标记为已读")
if unread_count > 0:
# 获取所有未读消息的 ID
message_ids = [msg.message_id for msg in context.unread_messages]
@@ -463,9 +457,8 @@ class MessageManager:
success = chat_stream.context_manager.mark_messages_as_read(message_ids)
if success:
logger.debug(f" [兜底清理] stream={stream_id[:8]}, 成功标记 {unread_count} 条消息为已读")
logger.debug(f"✅ stream={stream_id[:8]}, 成功标记 {unread_count} 条消息为已读")
else:
logger.warning(f"⚠️ [兜底清理] stream={stream_id[:8]}, 标记失败,直接清空")
context.unread_messages.clear()
else:
logger.debug(f"{stream_id[:8]} 没有剩余未读消息,无需清理")

View File

@@ -215,9 +215,6 @@ class ChatterActionManager:
action_name="no_reply",
)
# 自动清空所有未读消息(改为同步等待)
await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_name != "reply" and action_name != "respond" and action_name != "no_action":
@@ -235,9 +232,6 @@ class ChatterActionManager:
# 记录执行的动作到目标消息(改为同步等待)
if success:
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
# 自动清空所有未读消息
if clear_unread_messages:
await self._clear_all_unread_messages(chat_stream.stream_id, action_name)
# 重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
@@ -322,9 +316,6 @@ class ChatterActionManager:
# 记录回复动作到目标消息
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
if clear_unread_messages:
await self._clear_all_unread_messages(chat_stream.stream_id, action_name)
# 回复成功,重置打断计数(改为同步等待)
await self._reset_interruption_count_after_action(chat_stream.stream_id)
@@ -401,24 +392,6 @@ class ChatterActionManager:
except Exception as e:
logger.warning(f"重置打断计数时出错: {e}")
async def _clear_all_unread_messages(self, stream_id: str, action_name: str):
"""在动作执行成功后自动清空所有未读消息
Args:
stream_id: 聊天流ID
action_name: 动作名称
"""
try:
from src.chat.message_manager.message_manager import message_manager
# 清空所有未读消息
await message_manager.clear_all_unread_messages(stream_id)
logger.debug(f"[{action_name}] 已自动清空聊天流 {stream_id} 的所有未读消息")
except Exception as e:
logger.error(f"[{action_name}] 自动清空未读消息时出错: {e}")
# 不抛出异常,避免影响主要功能
async def _handle_action(
self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message
) -> tuple[bool, str, str]:

View File

@@ -78,8 +78,9 @@ class ChatterPlanExecutor:
other_actions = []
# 分类动作:回复动作和其他动作
# 回复类动作包括reply, proactive_reply, respond
for action_info in plan.decided_actions:
if action_info.action_type in ["reply", "proactive_reply"]:
if action_info.action_type in ["reply", "proactive_reply", "respond"]:
reply_actions.append(action_info)
else:
other_actions.append(action_info)

View File

@@ -363,9 +363,6 @@ class ChatterPlanFilter:
stream_context = chat_stream.context_manager
# 🔥 确保历史消息已从数据库加载
await stream_context.ensure_history_initialized()
# 获取真正的已读和未读消息
read_messages = stream_context.context.history_messages # 已读消息存储在history_messages中
if not read_messages: