feat(chat): 实现延迟重处理并修复并发问题

本次提交为聊天处理逻辑带来了两项关键改进:

1.  **中断后的延迟重处理:**
    当机器人被新消息打断时,系统现在会等待一个短暂的延迟(当前为0.5秒)再重新处理。这允许多条快速连续发送的消息被合并到同一次处理中,避免了因消息轰炸导致的多次、零碎的处理流程,提高了效率和响应的连贯性。

2.  **为聊天器添加并发锁:**
    在 `AffinityChatter` 中引入了 `asyncio.Lock`,以确保每个实例一次只处理一个流上下文。这可以防止在并发场景下可能出现的竞争条件,保证了数据处理的原子性和状态的一致性。
This commit is contained in:
tt-P607
2025-10-13 09:22:57 +08:00
parent 95959b2c16
commit 109aef4623
2 changed files with 54 additions and 45 deletions

View File

@@ -7,7 +7,7 @@ import asyncio
import random
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Dict
from src.chat.chatter_manager import ChatterManager
from src.chat.message_receive.chat_stream import ChatStream
@@ -384,7 +384,8 @@ class MessageManager:
await chat_stream.context_manager.context.increment_interruption_count()
# 🚀 新增:打断后立即重新进入聊天流程
await self._trigger_immediate_reprocess(chat_stream)
# 🚀 新增:打断后延迟重新进入聊天流程,以合并短时间内的多条消息
asyncio.create_task(self._trigger_delayed_reprocess(chat_stream, delay=0.5))
# 检查是否已达到最大次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
@@ -398,8 +399,13 @@ class MessageManager:
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
"""打断后立即重新进入聊天流程"""
async def _trigger_delayed_reprocess(self, chat_stream: ChatStream, delay: float) -> None:
    """Re-enter the chat processing flow *delay* seconds after an interruption.

    The short pause lets several messages sent in quick succession be
    merged into one reprocessing pass instead of triggering a separate
    pass per message.

    Args:
        chat_stream: The chat stream to reprocess.
        delay: Seconds to wait before reprocessing (caller passes 0.5).
    """
    # Sleep first so any messages arriving within the window are batched.
    await asyncio.sleep(delay)
    await self._trigger_reprocess(chat_stream)
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑"""
try:
stream_id = chat_stream.stream_id
@@ -519,7 +525,8 @@ class MessageManager:
self.message_caches[stream_id].append(message)
self.cache_stats["total_cached_messages"] += 1
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
if message.processed_plain_text:
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
return True
except Exception as e:
logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}")

View File

@@ -42,6 +42,7 @@ class AffinityChatter(BaseChatter):
"""
super().__init__(stream_id, action_manager)
self.planner = ChatterActionPlanner(stream_id, action_manager)
self._lock = asyncio.Lock()
# 处理器统计
self.stats = {
@@ -63,55 +64,56 @@ class AffinityChatter(BaseChatter):
Returns:
处理结果字典
"""
try:
# 触发表达学习
learner = await expression_learner_manager.get_expression_learner(self.stream_id)
asyncio.create_task(learner.trigger_learning_for_chat())
async with self._lock:
try:
# 触发表达学习
learner = await expression_learner_manager.get_expression_learner(self.stream_id)
asyncio.create_task(learner.trigger_learning_for_chat())
unread_messages = context.get_unread_messages()
unread_messages = context.get_unread_messages()
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(context=context)
self.stats["plans_created"] += 1
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(context=context)
self.stats["plans_created"] += 1
# 执行动作(如果规划器返回了动作)
execution_result = {"executed_count": len(actions) if actions else 0}
if actions:
logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
# 执行动作(如果规划器返回了动作)
execution_result = {"executed_count": len(actions) if actions else 0}
if actions:
logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
# 更新统计
self.stats["messages_processed"] += 1
self.stats["actions_executed"] += execution_result.get("executed_count", 0)
self.stats["successful_executions"] += 1
self.last_activity_time = time.time()
# 更新统计
self.stats["messages_processed"] += 1
self.stats["actions_executed"] += execution_result.get("executed_count", 0)
self.stats["successful_executions"] += 1
self.last_activity_time = time.time()
result = {
"success": True,
"stream_id": self.stream_id,
"plan_created": True,
"actions_count": len(actions) if actions else 0,
"has_target_message": target_message is not None,
"unread_messages_processed": len(unread_messages),
**execution_result,
}
result = {
"success": True,
"stream_id": self.stream_id,
"plan_created": True,
"actions_count": len(actions) if actions else 0,
"has_target_message": target_message is not None,
"unread_messages_processed": len(unread_messages),
**execution_result,
}
logger.info(
f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}"
)
logger.info(
f"聊天流 {self.stream_id} StreamContext处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}"
)
return result
return result
except Exception as e:
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}")
self.stats["failed_executions"] += 1
self.last_activity_time = time.time()
except Exception as e:
logger.error(f"亲和力聊天处理器 {self.stream_id} 处理StreamContext时出错: {e}\n{traceback.format_exc()}")
self.stats["failed_executions"] += 1
self.last_activity_time = time.time()
return {
"success": False,
"stream_id": self.stream_id,
"error_message": str(e),
"executed_count": 0,
}
return {
"success": False,
"stream_id": self.stream_id,
"error_message": str(e),
"executed_count": 0,
}
def get_stats(self) -> dict[str, Any]:
"""