feat(chat): 实现延迟重处理并修复并发问题

本次提交为聊天处理逻辑带来了两项关键改进:

1.  **中断后的延迟重处理:**
    当机器人被新消息打断时,系统现在会等待一个短暂的延迟(例如0.5秒)再重新处理。这允许多条快速连续发送的消息被合并到同一次处理中,避免了因消息轰炸导致的多次、零碎的处理流程,提高了效率和响应的连贯性。

2.  **为聊天器添加并发锁:**
    在 `AffinityChatter` 中引入了 `asyncio.Lock`,以确保每个实例一次只处理一个流上下文。这可以防止在并发场景下可能出现的竞争条件,保证了数据处理的原子性和状态的一致性。
This commit is contained in:
tt-P607
2025-10-13 09:22:57 +08:00
committed by Windpicker-owo
parent c88b259c0e
commit 34701e34f1
2 changed files with 54 additions and 45 deletions

View File

@@ -7,7 +7,7 @@ import asyncio
import random
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Dict
from src.chat.chatter_manager import ChatterManager
from src.chat.message_receive.chat_stream import ChatStream
@@ -384,7 +384,8 @@ class MessageManager:
await chat_stream.context_manager.context.increment_interruption_count()
# 🚀 新增:打断后立即重新进入聊天流程
await self._trigger_immediate_reprocess(chat_stream)
# 🚀 新增:打断后延迟重新进入聊天流程,以合并短时间内的多条消息
asyncio.create_task(self._trigger_delayed_reprocess(chat_stream, delay=0.5))
# 检查是否已达到最大次数
if chat_stream.context_manager.context.interruption_count >= global_config.chat.interruption_max_limit:
@@ -398,8 +399,13 @@ class MessageManager:
else:
logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f},检测到 {len(all_processing_tasks)} 个任务")
async def _trigger_immediate_reprocess(self, chat_stream: ChatStream):
"""打断后立即重新进入聊天流程"""
async def _trigger_delayed_reprocess(self, chat_stream: ChatStream, delay: float):
"""打断后延迟重新进入聊天流程,以合并短时间内的多条消息"""
await asyncio.sleep(delay)
await self._trigger_reprocess(chat_stream)
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑"""
try:
stream_id = chat_stream.stream_id
@@ -519,6 +525,7 @@ class MessageManager:
self.message_caches[stream_id].append(message)
self.cache_stats["total_cached_messages"] += 1
if message.processed_plain_text:
logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...")
return True
except Exception as e:

View File

@@ -42,6 +42,7 @@ class AffinityChatter(BaseChatter):
"""
super().__init__(stream_id, action_manager)
self.planner = ChatterActionPlanner(stream_id, action_manager)
self._lock = asyncio.Lock()
# 处理器统计
self.stats = {
@@ -63,6 +64,7 @@ class AffinityChatter(BaseChatter):
Returns:
处理结果字典
"""
async with self._lock:
try:
# 触发表达学习
learner = await expression_learner_manager.get_expression_learner(self.stream_id)