diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index e1ad6308c..70afbcfa8 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -7,7 +7,7 @@ import asyncio import random import time from collections import defaultdict, deque -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Dict from src.chat.chatter_manager import ChatterManager from src.chat.message_receive.chat_stream import ChatStream @@ -384,7 +384,8 @@ class MessageManager: await chat_stream.context_manager.context.increment_interruption_count() # 🚀 新增:打断后立即重新进入聊天流程 - await self._trigger_immediate_reprocess(chat_stream) + # 🚀 新增:打断后延迟重新进入聊天流程,以合并短时间内的多条消息 + asyncio.create_task(self._trigger_delayed_reprocess(chat_stream, delay=0.5)) # 检查是否已达到最大次数 if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit: @@ -398,8 +399,13 @@ class MessageManager: else: logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务") - async def _trigger_immediate_reprocess(self, chat_stream: ChatStream): - """打断后立即重新进入聊天流程""" + async def _trigger_delayed_reprocess(self, chat_stream: ChatStream, delay: float): + """打断后延迟重新进入聊天流程,以合并短时间内的多条消息""" + await asyncio.sleep(delay) + await self._trigger_reprocess(chat_stream) + + async def _trigger_reprocess(self, chat_stream: ChatStream): + """重新处理聊天流的核心逻辑""" try: stream_id = chat_stream.stream_id @@ -519,7 +525,8 @@ class MessageManager: self.message_caches[stream_id].append(message) self.cache_stats["total_cached_messages"] += 1 - logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...") + if message.processed_plain_text: + logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...") return True except Exception as e: logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}") diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py index 760ed07a6..a94e09c8c 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py @@ -42,6 +42,7 @@ class AffinityChatter(BaseChatter): """ super().__init__(stream_id, action_manager) self.planner = ChatterActionPlanner(stream_id, action_manager) + self._lock = asyncio.Lock() # 处理器统计 self.stats = { @@ -63,55 +64,56 @@ class AffinityChatter(BaseChatter): Returns: 处理结果字典 """ - try: - # 触发表达学习 - learner = await expression_learner_manager.get_expression_learner(self.stream_id) - asyncio.create_task(learner.trigger_learning_for_chat()) + async with self._lock: + try: + # 触发表达学习 + learner = await expression_learner_manager.get_expression_learner(self.stream_id) + asyncio.create_task(learner.trigger_learning_for_chat()) - unread_messages = context.get_unread_messages() + unread_messages = context.get_unread_messages() - # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(context=context) - self.stats["plans_created"] += 1 + # 使用增强版规划器处理消息 + actions, target_message = await self.planner.plan(context=context) + self.stats["plans_created"] += 1 - # 执行动作(如果规划器返回了动作) - execution_result = {"executed_count": len(actions) if actions else 0} - if actions: - logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作") + # 执行动作(如果规划器返回了动作) + execution_result = {"executed_count": len(actions) if actions else 0} + if actions: + logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作") - # 更新统计 - self.stats["messages_processed"] += 1 - self.stats["actions_executed"] += execution_result.get("executed_count", 0) - self.stats["successful_executions"] += 1 - self.last_activity_time = time.time() + # 更新统计 + self.stats["messages_processed"] += 1 + self.stats["actions_executed"] += execution_result.get("executed_count", 0) + self.stats["successful_executions"] += 1 + self.last_activity_time = time.time() - result = { - "success": True, - "stream_id": self.stream_id, - "plan_created": True, - "actions_count": len(actions) if actions else 0, - "has_target_message": target_message is not None, - "unread_messages_processed": len(unread_messages), - **execution_result, - } + result = { + "success": True, + "stream_id": self.stream_id, + "plan_created": True, + "actions_count": len(actions) if actions else 0, + "has_target_message": target_message is not None, + "unread_messages_processed": len(unread_messages), + **execution_result, + } - logger.info( - f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}" - ) + logger.info( + f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}" + ) - return result + return result - except Exception as e: - logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}") - self.stats["failed_executions"] += 1 - self.last_activity_time = time.time() + except Exception as e: + logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}") + self.stats["failed_executions"] += 1 + self.last_activity_time = time.time() - return { - "success": False, - "stream_id": self.stream_id, - "error_message": str(e), - "executed_count": 0, - } + return { + "success": False, + "stream_id": self.stream_id, + "error_message": str(e), + "executed_count": 0, + } def get_stats(self) -> dict[str, Any]: """