feat(chat): 实现并发消息处理系统
引入了一个全新的并发消息处理系统,以显著提升在高活跃度群聊中的响应速度。 在此之前,消息管理器对每个聊天流(如一个群聊)内的所有消息进行串行处理,导致用户需要排队等待机器人响应。新系统引入了可配置的并发模式: - 通过 `concurrent_message_processing` 开关启用。 - 允许并行处理来自同一群聊中不同用户的消息。 - 通过 `process_by_user_id` 保证对同一用户的消息处理仍然是串行的,以维持上下文的连贯性。 - 使用 `concurrent_per_user_limit` 控制并发处理的用户数量。 为了支持此功能,对 `MessageManager` 进行了大规模重构,用更高效的独立流检查机制取代了旧的全局轮询和优先级排序逻辑。同时,清理和移除了大量已废弃或冗余的配置项,简化了整体配置。 BREAKING CHANGE: 移除了多个已废弃的 `ChatConfig` 配置项,包括 `mentioned_bot_inevitable_reply`, `at_bot_inevitable_reply`, `focus_value`, `group_chat_mode` 等。这些功能已被新的 AFC 逻辑或其它机制取代。请参考最新的配置文件模板进行更新。
This commit is contained in:
@@ -79,7 +79,9 @@ class ChatterManager:
|
||||
del self.instances[stream_id]
|
||||
logger.info(f"清理不活跃聊天流实例: {stream_id}")
|
||||
|
||||
async def process_stream_context(self, stream_id: str, context: StreamContext) -> dict:
|
||||
async def process_stream_context(
|
||||
self, stream_id: str, context: StreamContext, unread_messages: Optional[List[Any]] = None
|
||||
) -> dict:
|
||||
"""处理流上下文"""
|
||||
chat_type = context.chat_type
|
||||
logger.debug(f"处理流 {stream_id},聊天类型: {chat_type.value}")
|
||||
@@ -104,9 +106,14 @@ class ChatterManager:
|
||||
|
||||
self.stats["streams_processed"] += 1
|
||||
try:
|
||||
result = await self.instances[stream_id].execute(context)
|
||||
# 如果提供了 unread_messages,则传递给 execute 方法
|
||||
if unread_messages:
|
||||
result = await self.instances[stream_id].execute(context, unread_messages)
|
||||
else:
|
||||
result = await self.instances[stream_id].execute(context)
|
||||
|
||||
self.stats["successful_executions"] += 1
|
||||
|
||||
|
||||
# 记录处理结果
|
||||
success = result.get("success", False)
|
||||
actions_count = result.get("actions_count", 0)
|
||||
|
||||
Reference in New Issue
Block a user