From 0beb62421c0b417627a3fa10463a7dec4ba49b59 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 26 Sep 2025 00:24:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E5=AE=9E=E7=8E=B0=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 引入了一个全新的并发消息处理系统,以显著提升在高活跃度群聊中的响应速度。 在此之前,消息管理器对每个聊天流(如一个群聊)内的所有消息进行串行处理,导致用户需要排队等待机器人响应。新系统引入了可配置的并发模式: - 通过 `concurrent_message_processing` 开关启用。 - 允许并行处理来自同一群聊中不同用户的消息。 - 通过 `process_by_user_id` 保证对同一用户的消息处理仍然是串行的,以维持上下文的连贯性。 - 使用 `concurrent_per_user_limit` 控制并发处理的用户数量。 为了支持此功能,对 `MessageManager` 进行了大规模重构,用更高效的独立流检查机制取代了旧的全局轮询和优先级排序逻辑。同时,清理和移除了大量已废弃或冗余的配置项,简化了整体配置。 BREAKING CHANGE: 移除了多个已废弃的 `ChatConfig` 配置项,包括 `mentioned_bot_inevitable_reply`, `at_bot_inevitable_reply`, `focus_value`, `group_chat_mode` 等。这些功能已被新的 AFC 逻辑或其它机制取代。请参考最新的配置文件模板进行更新。 --- src/chat/chatter_manager.py | 13 +- src/chat/message_manager/message_manager.py | 518 ++++++------------ src/config/official_configs.py | 131 +++-- .../affinity_flow_chatter/affinity_chatter.py | 9 +- .../affinity_flow_chatter/plan_filter.py | 4 - .../built_in/affinity_flow_chatter/planner.py | 18 +- template/bot_config_template.toml | 13 +- 7 files changed, 273 insertions(+), 433 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index c906fd901..f795f479b 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -79,7 +79,9 @@ class ChatterManager: del self.instances[stream_id] logger.info(f"清理不活跃聊天流实例: {stream_id}") - async def process_stream_context(self, stream_id: str, context: StreamContext) -> dict: + async def process_stream_context( + self, stream_id: str, context: StreamContext, unread_messages: Optional[List[Any]] = None + ) -> dict: """处理流上下文""" chat_type = context.chat_type logger.debug(f"处理流 {stream_id},聊天类型: {chat_type.value}") @@ -104,9 +106,14 @@ class ChatterManager: self.stats["streams_processed"] += 1 try: - result = await self.instances[stream_id].execute(context) + # 如果提供了 unread_messages,则传递给 execute 方法 + if unread_messages: + result = await self.instances[stream_id].execute(context, unread_messages) + else: + result = await self.instances[stream_id].execute(context) + self.stats["successful_executions"] += 1 - + # 记录处理结果 success = result.get("success", False) actions_count = result.get("actions_count", 0) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 85dc5e4d5..bab71c09c 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -7,7 +7,7 @@ import asyncio import random import time import traceback -from typing import Dict, Optional, Any, TYPE_CHECKING +from typing import Dict, Optional, Any, TYPE_CHECKING, List from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages @@ -29,10 +29,13 @@ class MessageManager: def __init__(self, check_interval: float = 5.0): self.stream_contexts: Dict[str, StreamContext] = {} - self.check_interval = check_interval # 检查间隔(秒) + self.check_interval = check_interval self.is_running = False self.manager_task: Optional[asyncio.Task] = None + # 并发控制信号量 + self.concurrent_semaphore: Optional[asyncio.Semaphore] = None + # 统计信息 self.stats = MessageManagerStats() @@ -52,6 +55,10 @@ class MessageManager: self.is_running = True self.manager_task = asyncio.create_task(self._manager_loop()) + if global_config.chat.concurrent_message_processing: + limit = global_config.chat.concurrent_per_user_limit + self.concurrent_semaphore = asyncio.Semaphore(limit) + logger.info(f"并发处理已启用,全局并发限制: {limit}") await self.wakeup_manager.start() logger.info("消息管理器已启动") @@ -64,8 +71,12 @@ class MessageManager: # 停止所有流处理任务 for context in self.stream_contexts.values(): - if context.processing_task and not context.processing_task.done(): + if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): context.processing_task.cancel() + if hasattr(context, 'user_processing_tasks'): + for task in context.user_processing_tasks.values(): + if task and not task.done(): + task.cancel() # 停止管理器任务 if self.manager_task and not self.manager_task.done(): @@ -79,9 +90,14 @@ class MessageManager: """添加消息到指定聊天流""" # 获取或创建流上下文 if stream_id not in self.stream_contexts: - self.stream_contexts[stream_id] = StreamContext(stream_id=stream_id) + context = StreamContext(stream_id=stream_id) + # 为并发处理添加队列和锁 + if global_config.chat.concurrent_message_processing: + context.send_lock = asyncio.Lock() + context.user_processing_tasks = {} + self.stream_contexts[stream_id] = context self.stats.total_streams += 1 - + context = self.stream_contexts[stream_id] context.set_chat_mode(ChatMode.FOCUS) context.add_message(message) @@ -99,10 +115,9 @@ class MessageManager: await self._check_streams_with_individual_intervals() # 计算下次检查时间(使用最小间隔或固定间隔) + next_check_delay = self.check_interval if global_config.chat.dynamic_distribution_enabled: next_check_delay = self._calculate_next_manager_delay() - else: - next_check_delay = self.check_interval await asyncio.sleep(next_check_delay) except asyncio.CancelledError: @@ -111,107 +126,63 @@ class MessageManager: logger.error(f"消息管理器循环出错: {e}") traceback.print_exc() - async def _check_all_streams(self): - """检查所有聊天流""" - active_streams = 0 - total_unread = 0 - - for stream_id, context in self.stream_contexts.items(): - if not context.is_active: - continue - - active_streams += 1 - - # 检查是否有未读消息 - unread_messages = context.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 更新统计 - self.stats.active_streams = active_streams - self.stats.total_unread_messages = total_unread - - async def _process_stream_messages(self, stream_id: str): - """处理指定聊天流的消息""" + async def _process_stream_messages(self, stream_id: str, unread_messages_override: List[DatabaseMessages]): + """ + 处理指定聊天流的消息 (非并发模式专用) + """ if stream_id not in self.stream_contexts: return context = self.stream_contexts[stream_id] + context.processing_task = asyncio.current_task() + user_id = unread_messages_override[0].user_info.user_id if unread_messages_override and hasattr(unread_messages_override[0], 'user_info') else None try: - # 获取未读消息 - unread_messages = context.get_unread_messages() - if not unread_messages: - return + await self._check_and_handle_interruption(context, stream_id, unread_messages_override, user_id) - # 检查是否需要打断现有处理 - await self._check_and_handle_interruption(context, stream_id) - - # --- 睡眠状态检查 --- if self.sleep_manager.is_sleeping(): - logger.info(f"Bot正在睡觉,检查聊天流 {stream_id} 是否有唤醒触发器。") - was_woken_up = False is_private = context.is_private_chat() - - for message in unread_messages: + for message in unread_messages_override: is_mentioned = message.is_mentioned or False if is_private or is_mentioned: if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned): was_woken_up = True - break # 一旦被吵醒,就跳出循环并处理消息 - + break if not was_woken_up: logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。") - return # 退出,不处理消息 - + self._clear_specific_unread_messages(context, unread_messages_override) + return logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。") - # --- 睡眠状态检查结束 --- - logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") - - # 直接使用StreamContext对象进行处理 - if unread_messages: - try: - # 记录当前chat type用于调试 - logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}") - - # 发送到chatter manager,传递StreamContext对象 - results = await self.chatter_manager.process_stream_context(stream_id, context) - - # 处理结果,标记消息为已读 - if results.get("success", False): - self._clear_all_unread_messages(context) - logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息") - else: - logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") - - except Exception as e: - logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}") - # 出现异常时也清除未读消息,避免重复处理 - self._clear_all_unread_messages(context) - raise - - logger.debug(f"聊天流 {stream_id} 消息处理完成") + logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages_override)} 条未读消息") + + results = await self.chatter_manager.process_stream_context(stream_id, context, unread_messages_override) + if results.get("success", False): + logger.debug(f"聊天流 {stream_id} 处理成功") + else: + logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") + + self._clear_specific_unread_messages(context, unread_messages_override) except asyncio.CancelledError: + logger.info(f"聊天流 {stream_id} 的处理任务被取消") + self._clear_specific_unread_messages(context, unread_messages_override) raise except Exception as e: - logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}") + logger.error(f"处理聊天流 {stream_id} 时发生异常: {e}") traceback.print_exc() - + self._clear_specific_unread_messages(context, unread_messages_override) + finally: + context.processing_task = None + def deactivate_stream(self, stream_id: str): """停用聊天流""" if stream_id in self.stream_contexts: context = self.stream_contexts[stream_id] context.is_active = False - # 取消处理任务 - if context.processing_task and not context.processing_task.done(): + if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): context.processing_task.cancel() logger.info(f"停用聊天流: {stream_id}") @@ -234,7 +205,7 @@ class MessageManager: unread_count=len(context.get_unread_messages()), history_count=len(context.history_messages), last_check_time=context.last_check_time, - has_active_task=bool(context.processing_task and not context.processing_task.done()), + has_active_task=bool(hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done()), ) def get_manager_stats(self) -> Dict[str, Any]: @@ -263,30 +234,49 @@ class MessageManager: del self.stream_contexts[stream_id] logger.info(f"清理不活跃聊天流: {stream_id}") - async def _check_and_handle_interruption(self, context: StreamContext, stream_id: str): + async def _check_and_handle_interruption( + self, context: StreamContext, stream_id: str, unread_messages: List[DatabaseMessages], user_id: Optional[str] = None + ): """检查并处理消息打断""" if not global_config.chat.interruption_enabled: return - # 检查是否有正在进行的处理任务 - if context.processing_task and not context.processing_task.done(): - # 计算打断概率 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.debug(f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},本次不进行打断") + return + + task_to_check = None + if global_config.chat.concurrent_message_processing and global_config.chat.process_by_user_id and user_id: + task_to_check = context.user_processing_tasks.get(user_id) + else: + task_to_check = context.processing_task + + if task_to_check and not task_to_check.done(): interruption_probability = context.calculate_interruption_probability( global_config.chat.interruption_max_limit, global_config.chat.interruption_probability_factor ) - # 根据概率决定是否打断 if random.random() < interruption_probability: - logger.info(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") + user_nickname = "" + if user_id and unread_messages: + for msg in unread_messages: + if hasattr(msg, "user_info") and msg.user_info and msg.user_info.user_id == user_id: + user_nickname = msg.user_info.user_nickname + break + + if user_nickname: + log_target = f"用户'{user_nickname}({user_id})'在聊天流 {stream_id}" + else: + log_target = f"用户 {user_id} 在聊天流 {stream_id}" if user_id else f"聊天流 {stream_id}" - # 取消现有任务 - context.processing_task.cancel() + logger.info(f"{log_target} 触发消息打断,打断概率: {interruption_probability:.2f}") + + task_to_check.cancel() try: - await context.processing_task + await task_to_check except asyncio.CancelledError: pass - # 增加打断计数并应用afc阈值降低 context.increment_interruption_count() context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction) logger.info( @@ -295,149 +285,47 @@ class MessageManager: else: logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - def _calculate_dynamic_distribution_interval(self) -> float: - """根据所有活跃聊天流的focus_energy动态计算分发周期""" - if not global_config.chat.dynamic_distribution_enabled: - return self.check_interval # 使用固定间隔 - - if not self.stream_contexts: - return self.check_interval # 默认间隔 - - # 计算活跃流的平均focus_energy - active_streams = [ctx for ctx in self.stream_contexts.values() if ctx.is_active] - if not active_streams: - return self.check_interval - - total_focus_energy = 0.0 - max_focus_energy = 0.0 - stream_count = 0 - - for context in active_streams: - from src.plugin_system.apis.chat_api import get_chat_manager - chat_stream = get_chat_manager().get_stream(context.stream_id) - if chat_stream: - focus_energy = chat_stream.focus_energy - total_focus_energy += focus_energy - max_focus_energy = max(max_focus_energy, focus_energy) - stream_count += 1 - - if stream_count == 0: - return self.check_interval - - avg_focus_energy = total_focus_energy / stream_count - - # 使用配置参数 - base_interval = global_config.chat.dynamic_distribution_base_interval - min_interval = global_config.chat.dynamic_distribution_min_interval - max_interval = global_config.chat.dynamic_distribution_max_interval - jitter_factor = global_config.chat.dynamic_distribution_jitter_factor - - # 根据平均兴趣度调整间隔 - # 高兴趣度 -> 更频繁检查 (间隔更短) - # 低兴趣度 -> 较少检查 (间隔更长) - if avg_focus_energy >= 0.7: - # 高兴趣度:1-5秒 - interval = base_interval * (1.0 - (avg_focus_energy - 0.7) * 2.0) - elif avg_focus_energy >= 0.4: - # 中等兴趣度:5-15秒 - interval = base_interval * (1.0 + (avg_focus_energy - 0.4) * 3.33) - else: - # 低兴趣度:15-30秒 - interval = base_interval * (3.0 + (0.4 - avg_focus_energy) * 5.0) - - # 添加随机扰动避免同步 - import random - jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) - final_interval = interval * jitter - - # 限制在合理范围内 - final_interval = max(min_interval, min(max_interval, final_interval)) - - logger.debug( - f"动态分发周期: {final_interval:.2f}s | " - f"平均兴趣度: {avg_focus_energy:.3f} | " - f"活跃流数: {stream_count}" - ) - - return final_interval - - def _calculate_stream_distribution_interval(self, context: StreamContext) -> float: + def _calculate_dynamic_distribution_interval(self, context: StreamContext) -> float: """计算单个聊天流的分发周期 - 基于阈值感知的focus_energy""" if not global_config.chat.dynamic_distribution_enabled: - return self.check_interval # 使用固定间隔 - - from src.plugin_system.apis.chat_api import get_chat_manager - chat_stream = get_chat_manager().get_stream(context.stream_id) - # 获取该流的focus_energy(新的阈值感知版本) - focus_energy = 0.5 # 默认值 - avg_message_interest = 0.5 # 默认平均兴趣度 + return self.check_interval - if chat_stream: - focus_energy = chat_stream.focus_energy - # 获取平均消息兴趣度用于更精确的计算 - if chat_stream.message_count > 0: - avg_message_interest = chat_stream.message_interest_total / chat_stream.message_count + focus_energy = 0.5 + avg_message_interest = 0.5 + + if hasattr(context, 'chat_stream') and context.chat_stream: + focus_energy = context.chat_stream.focus_energy + if context.chat_stream.message_count > 0: + avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count - # 获取AFC阈值用于参考,添加None值检查 reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4) non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2) high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8) - # 使用配置参数 base_interval = global_config.chat.dynamic_distribution_base_interval min_interval = global_config.chat.dynamic_distribution_min_interval max_interval = global_config.chat.dynamic_distribution_max_interval jitter_factor = global_config.chat.dynamic_distribution_jitter_factor - # 基于阈值感知的智能分发周期计算 if avg_message_interest >= high_match_threshold: - # 超高兴趣度:极快响应 (1-2秒) interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0 elif avg_message_interest >= reply_threshold: - # 高兴趣度:快速响应 (2-6秒) gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold) interval_multiplier = 0.6 + gap_from_reply * 0.4 elif avg_message_interest >= non_reply_threshold: - # 中等兴趣度:正常响应 (6-15秒) gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold) interval_multiplier = 1.2 + gap_from_non_reply * 1.8 else: - # 低兴趣度:缓慢响应 (15-30秒) gap_ratio = max(0, avg_message_interest / non_reply_threshold) interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0 - # 应用focus_energy微调 energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5 interval = base_interval * interval_multiplier * energy_adjustment - # 添加随机扰动避免同步 - import random jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) final_interval = interval * jitter - # 限制在合理范围内 final_interval = max(min_interval, min(max_interval, final_interval)) - - # 根据兴趣度级别调整日志级别 - if avg_message_interest >= high_match_threshold: - log_level = "info" - elif avg_message_interest >= reply_threshold: - log_level = "info" - else: - log_level = "debug" - - log_msg = ( - f"流 {context.stream_id} 分发周期: {final_interval:.2f}s | " - f"focus_energy: {focus_energy:.3f} | " - f"avg_interest: {avg_message_interest:.3f} | " - f"阈值参考: {non_reply_threshold:.2f}/{reply_threshold:.2f}/{high_match_threshold:.2f}" - ) - - if log_level == "info": - logger.info(log_msg) - else: - logger.debug(log_msg) - return final_interval def _calculate_next_manager_delay(self) -> float: @@ -445,7 +333,6 @@ class MessageManager: current_time = time.time() min_delay = float('inf') - # 找到最近需要检查的流 for context in self.stream_contexts.values(): if not context.is_active: continue @@ -454,14 +341,11 @@ class MessageManager: if time_until_check > 0: min_delay = min(min_delay, time_until_check) else: - min_delay = 0.1 # 立即检查 - break + return 0.1 - # 如果没有活跃流,使用默认间隔 if min_delay == float('inf'): return self.check_interval - # 确保最小延迟 return max(0.1, min(min_delay, self.check_interval)) async def _check_streams_with_individual_intervals(self): @@ -473,162 +357,106 @@ class MessageManager: if not context.is_active: continue - # 检查是否达到检查时间 if current_time >= context.next_check_time: - # 更新检查时间 context.last_check_time = current_time - - # 计算下次检查时间和分发周期 if global_config.chat.dynamic_distribution_enabled: context.distribution_interval = self._calculate_stream_distribution_interval(context) else: context.distribution_interval = self.check_interval - - # 设置下次检查时间 context.next_check_time = current_time + context.distribution_interval - # 检查未读消息 unread_messages = context.get_unread_messages() - if unread_messages: - processed_streams += 1 - self.stats.total_unread_messages = len(unread_messages) + if not unread_messages: + continue - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - from src.plugin_system.apis.chat_api import get_chat_manager - chat_stream = get_chat_manager().get_stream(context.stream_id) - focus_energy = chat_stream.focus_energy if chat_stream else 0.5 - - # 根据优先级记录日志 - if focus_energy >= 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s | " - f"未读消息: {len(unread_messages)}" - ) + processed_streams += 1 + + if global_config.chat.concurrent_message_processing: + if global_config.chat.process_by_user_id: + user_messages = {} + for msg in unread_messages: + user_id = msg.user_info.user_id if hasattr(msg, 'user_info') and msg.user_info else 'unknown_user' + if user_id not in user_messages: + user_messages[user_id] = [] + user_messages[user_id].append(msg) + + for user_id, messages in user_messages.items(): + await self._check_and_handle_interruption(context, stream_id, messages, user_id) + if not context.user_processing_tasks.get(user_id) or context.user_processing_tasks[user_id].done(): + task = asyncio.create_task(self._process_and_send_reply(context, messages)) + context.user_processing_tasks[user_id] = task + else: + # Fix: Ensure unread_messages is available in this branch + all_unread_messages = context.get_unread_messages() + if all_unread_messages: + if not global_config.chat.concurrent_message_processing: + await self._check_and_handle_interruption(context, stream_id, all_unread_messages) + if not context.processing_task or context.processing_task.done(): + context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id, all_unread_messages)) else: - logger.debug( - f"流 {stream_id} 开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s" - ) - - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 更新活跃流计数 - active_count = sum(1 for ctx in self.stream_contexts.values() if ctx.is_active) - self.stats.active_streams = active_count - - if processed_streams > 0: - logger.debug( - f"本次循环处理了 {processed_streams} 个流 | " - f"活跃流总数: {active_count}" - ) - - async def _check_all_streams_with_priority(self): - """按优先级检查所有聊天流,高focus_energy的流优先处理""" - if not self.stream_contexts: + await self._check_and_handle_interruption(context, stream_id, all_unread_messages) + if not context.processing_task or context.processing_task.done(): + task = asyncio.create_task(self._process_and_send_reply(context, all_unread_messages)) + context.processing_task = task + # The original 'else' block for the 'if current_time >= context.next_check_time:' check + # was problematic. It seems it tried to process messages even when it wasn't time. + # Removing it should fix the UnboundLocalError and align with the logic of checking the time first. + + async def _process_and_send_reply(self, context: StreamContext, unread_messages: list): + """在后台处理单批消息并加锁发送 (并发模式专用)""" + if not self.concurrent_semaphore: + logger.error("并发信号量未初始化") return - # 获取活跃的聊天流并按focus_energy排序 - active_streams = [] - for stream_id, context in self.stream_contexts.items(): - if not context.is_active: - continue + user_id = unread_messages[0].user_info.user_id if global_config.chat.process_by_user_id and unread_messages and hasattr(unread_messages[0], 'user_info') else None - # 获取focus_energy,如果不存在则使用默认值 - from src.plugin_system.apis.chat_api import get_chat_manager - chat_stream = get_chat_manager().get_stream(context.stream_id) - focus_energy = 0.5 - if chat_stream: - focus_energy = chat_stream.focus_energy - - # 计算流优先级分数 - priority_score = self._calculate_stream_priority(context, focus_energy) - active_streams.append((priority_score, stream_id, context)) - - # 按优先级降序排序 - active_streams.sort(reverse=True, key=lambda x: x[0]) - - # 处理排序后的流 - active_stream_count = 0 - total_unread = 0 - - for priority_score, stream_id, context in active_streams: - active_stream_count += 1 - - # 检查是否有未读消息 - unread_messages = context.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 高优先级流的额外日志 - if priority_score > 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"优先级: {priority_score:.3f} | " - f"未读消息: {len(unread_messages)}" - ) - - # 更新统计 - self.stats.active_streams = active_stream_count - self.stats.total_unread_messages = total_unread - - def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float: - """计算聊天流的优先级分数""" - from src.plugin_system.apis.chat_api import get_chat_manager - chat_stream = get_chat_manager().get_stream(context.stream_id) - # 基础优先级:focus_energy - base_priority = focus_energy - - # 未读消息数量加权 - unread_count = len(context.get_unread_messages()) - message_count_bonus = min(unread_count * 0.1, 0.3) # 最多30%加成 - - # 时间加权:最近活跃的流优先级更高 - current_time = time.time() - time_since_active = current_time - context.last_check_time - time_penalty = max(0, 1.0 - time_since_active / 3600.0) # 1小时内无惩罚 - - # 连续无回复惩罚 - if chat_stream: - consecutive_no_reply = chat_stream.consecutive_no_reply - no_reply_penalty = max(0, 1.0 - consecutive_no_reply * 0.05) # 每次无回复降低5% - else: - no_reply_penalty = 1.0 - - # 综合优先级计算 - final_priority = ( - base_priority * 0.6 + # 基础兴趣度权重60% - message_count_bonus * 0.2 + # 消息数量权重20% - time_penalty * 0.1 + # 时间权重10% - no_reply_penalty * 0.1 # 回复状态权重10% - ) - - return max(0.0, min(1.0, final_priority)) - - def _clear_all_unread_messages(self, context: StreamContext): - """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" - unread_messages = context.get_unread_messages() - if not unread_messages: - return - - logger.warning(f"正在清除 {len(unread_messages)} 条未读消息") - - # 将所有未读消息标记为已读并移动到历史记录 - for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表 + async with self.concurrent_semaphore: try: - context.mark_message_as_read(msg.message_id) - self.stats.total_processed_messages += 1 - logger.debug(f"强制清除消息 {msg.message_id},标记为已读") + # 思考和发送都在锁内,确保单次回复的原子性 + async with context.send_lock: + logger.debug(f"发送任务锁定聊天流 {context.stream_id},准备处理和回复") + + results = await self.chatter_manager.process_stream_context(context.stream_id, context, unread_messages) + + if results.get("success", False): + self._clear_specific_unread_messages(context, unread_messages) + logger.debug(f"聊天流 {context.stream_id} 并发处理成功,清除了 {len(unread_messages)} 条未读消息") + else: + logger.warning(f"聊天流 {context.stream_id} 并发处理失败: {results.get('error_message', '未知错误')}") + + reply_delay = random.uniform(1.5, 3.0) + await asyncio.sleep(reply_delay) + + logger.debug(f"发送任务解锁聊天流 {context.stream_id}") + + except asyncio.CancelledError: + logger.info(f"用户 {user_id} 的任务被取消") + self._clear_specific_unread_messages(context, unread_messages) # 取消时也清除消息 + raise except Exception as e: - logger.error(f"清除消息 {msg.message_id} 时出错: {e}") + logger.error(f"后台回复处理任务出错: {e}") + traceback.print_exc() + self._clear_specific_unread_messages(context, unread_messages) + finally: + if user_id and user_id in context.user_processing_tasks: + if context.user_processing_tasks[user_id] is asyncio.current_task(): + del context.user_processing_tasks[user_id] + def _clear_specific_unread_messages(self, context: StreamContext, messages_to_clear: list): + """清除指定上下文中的特定未读消息""" + if not messages_to_clear: + return + message_ids_to_clear = {msg.message_id for msg in messages_to_clear} + + context.unread_messages = [msg for msg in context.unread_messages if msg.message_id not in message_ids_to_clear] + + for msg in messages_to_clear: + context.history_messages.append(msg) + + if len(context.history_messages) > 100: + context.history_messages = context.history_messages[-100:] + + # 创建全局消息管理器实例 message_manager = MessageManager() diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 7afedfae7..4fefa6212 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -104,20 +104,9 @@ class ChatConfig(ValidatedConfigBase): """聊天配置类""" max_context_size: int = Field(default=18, description="最大上下文大小") - replyer_random_probability: float = Field(default=0.5, description="回复者随机概率") thinking_timeout: int = Field(default=40, description="思考超时时间") - talk_frequency: float = Field(default=1.0, description="聊天频率") - mentioned_bot_inevitable_reply: bool = Field(default=False, description="提到机器人的必然回复") - at_bot_inevitable_reply: bool = Field(default=False, description="@机器人的必然回复") allow_reply_self: bool = Field(default=False, description="是否允许回复自己说的话") talk_frequency_adjust: list[list[str]] = Field(default_factory=lambda: [], description="聊天频率调整") - focus_value: float = Field(default=1.0, description="专注值") - focus_mode_quiet_groups: List[str] = Field( - default_factory=list, - description='专注模式下需要保持安静的群组列表, 格式: ["platform:group_id1", "platform:group_id2"]', - ) - force_reply_private: bool = Field(default=False, description="强制回复私聊") - group_chat_mode: Literal["auto", "normal", "focus"] = Field(default="auto", description="群聊模式") timestamp_display_mode: Literal["normal", "normal_no_YMD", "relative"] = Field( default="normal_no_YMD", description="时间戳显示模式" ) @@ -158,47 +147,57 @@ class ChatConfig(ValidatedConfigBase): dynamic_distribution_jitter_factor: float = Field( default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子" ) - + + # 并发消息处理 + concurrent_message_processing: bool = Field( + default=False, description="是否启用并发消息处理,在同一聊天流中并行处理多个消息" + ) + concurrent_per_user_limit: int = Field( + default=3, description="在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数" + ) + process_by_user_id: bool = Field( + default=True, description="在并发模式下,是否按用户ID进行独立串行处理" + ) + def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float: """ 根据当前时间和聊天流获取对应的 talk_frequency - + Args: chat_stream_id: 聊天流ID,格式为 "platform:chat_id:type" - + Returns: float: 对应的频率值 """ if not self.talk_frequency_adjust: - return self.talk_frequency - + return 1.0 + # 优先检查聊天流特定的配置 if chat_stream_id: stream_frequency = self._get_stream_specific_frequency(chat_stream_id) if stream_frequency is not None: return stream_frequency - + # 检查全局时段配置(第一个元素为空字符串的配置) global_frequency = self._get_global_frequency() - return self.talk_frequency if global_frequency is None else global_frequency - - @staticmethod - def _get_time_based_frequency(time_freq_list: list[str]) -> Optional[float]: + return 1.0 if global_frequency is None else global_frequency + + def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]: """ 根据时间配置列表获取当前时段的频率 - + Args: time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...] - + Returns: float: 频率值,如果没有配置则返回 None """ from datetime import datetime - + current_time = datetime.now().strftime("%H:%M") current_hour, current_minute = map(int, current_time.split(":")) current_minutes = current_hour * 60 + current_minute - + # 解析时间频率配置 time_freq_pairs = [] for time_freq_str in time_freq_list: @@ -210,13 +209,13 @@ class ChatConfig(ValidatedConfigBase): time_freq_pairs.append((minutes, frequency)) except (ValueError, IndexError): continue - + if not time_freq_pairs: return None - + # 按时间排序 time_freq_pairs.sort(key=lambda x: x[0]) - + # 查找当前时间对应的频率 current_frequency = None for minutes, frequency in time_freq_pairs: @@ -224,20 +223,20 @@ class ChatConfig(ValidatedConfigBase): current_frequency = frequency else: break - + # 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑) if current_frequency is None and time_freq_pairs: current_frequency = time_freq_pairs[-1][1] - + return current_frequency - + def _get_stream_specific_frequency(self, chat_stream_id: str): """ 获取特定聊天流在当前时间的频率 - + Args: chat_stream_id: 聊天流ID(哈希值) - + Returns: float: 频率值,如果没有配置则返回 None """ @@ -245,31 +244,30 @@ class ChatConfig(ValidatedConfigBase): for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + stream_config_str = config_item[0] # 例如 "qq:1026294844:group" - + # 解析配置字符串并生成对应的 chat_id config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str) if config_chat_id is None: continue - + # 比较生成的 chat_id if config_chat_id != chat_stream_id: continue - + # 使用通用的时间频率解析方法 return self._get_time_based_frequency(config_item[1:]) - + return None - - @staticmethod - def _parse_stream_config_to_chat_id(stream_config_str: str) -> Optional[str]: + + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -277,42 +275,42 @@ class ChatConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def _get_global_frequency(self) -> Optional[float]: """ 获取全局默认频率配置 - + Returns: float: 频率值,如果没有配置则返回 None """ for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + # 检查是否为全局默认配置(第一个元素为空字符串) if config_item[0] == "": return self._get_time_based_frequency(config_item[1:]) - + return None @@ -346,10 +344,10 @@ class ExpressionConfig(ValidatedConfigBase): def _parse_stream_config_to_chat_id(stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -357,52 +355,52 @@ class ExpressionConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]: """ 根据聊天流ID获取表达配置 - + Args: chat_stream_id: 聊天流ID,格式为哈希值 - + Returns: tuple: (是否使用表达, 是否学习表达, 学习间隔) """ if not self.rules: # 如果没有配置,使用默认值:启用表达,启用学习,强度1.0 return True, True, 1.0 - + # 优先检查聊天流特定的配置 if chat_stream_id: for rule in self.rules: if rule.chat_stream_id and self._parse_stream_config_to_chat_id(rule.chat_stream_id) == chat_stream_id: return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 检查全局配置(chat_stream_id为空字符串的配置) for rule in self.rules: if rule.chat_stream_id == "": return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 如果都没有匹配,返回默认值 return True, True, 1.0 @@ -476,7 +474,7 @@ class KeywordRuleConfig(ValidatedConfigBase): def __post_init__(self): import re - + if not self.keywords and not self.regex: raise ValueError("关键词规则必须至少包含keywords或regex中的一个") if not self.reaction: @@ -499,7 +497,6 @@ class CustomPromptConfig(ValidatedConfigBase): """自定义提示词配置类""" image_prompt: str = Field(default="", description="图片提示词") - planner_custom_prompt_enable: bool = Field(default=False, description="启用规划器自定义提示词") planner_custom_prompt_content: str = Field(default="", description="规划器自定义提示词内容") diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py index 08f5f7098..35416a297 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py @@ -53,12 +53,13 @@ class AffinityChatter(BaseChatter): } self.last_activity_time = time.time() - async def execute(self, context: StreamContext) -> dict: + async def execute(self, context: StreamContext, unread_messages: list | None = None) -> dict: """ 处理StreamContext对象 Args: context: StreamContext对象,包含聊天流的所有消息信息 + unread_messages: (可选) 指定要处理的未读消息列表,用于并发处理 Returns: 处理结果字典 @@ -68,10 +69,12 @@ class AffinityChatter(BaseChatter): learner = expression_learner_manager.get_expression_learner(self.stream_id) asyncio.create_task(learner.trigger_learning_for_chat()) - unread_messages = context.get_unread_messages() + # 如果没有提供未读消息列表,则从上下文中获取 + if unread_messages is None: + unread_messages = context.get_unread_messages() # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(context=context) + actions, target_message = await self.planner.plan(context=context, unread_messages=unread_messages) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index abe5518ea..a10a59f17 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -218,10 +218,6 @@ class ChatterPlanFilter: self.last_obs_time_mark = time.time() mentioned_bonus = "" - if global_config.chat.mentioned_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你" - if global_config.chat.at_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你,或者at你" if plan.mode == ChatMode.FOCUS: no_action_block = """ diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index b3b86210c..64cf92d9c 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -70,12 +70,15 @@ class ChatterActionPlanner: "other_actions_executed": 0, } - async def plan(self, context: "StreamContext" = None) -> Tuple[List[Dict], Optional[Dict]]: + async def plan( + self, context: "StreamContext" = None, unread_messages: Optional[List[Dict]] = None + ) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: context (StreamContext): 包含聊天流消息的上下文对象。 + unread_messages (Optional[List[Dict]]): (可选) 指定要处理的未读消息列表,用于并发处理 Returns: Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: @@ -85,14 +88,16 @@ class ChatterActionPlanner: try: self.planner_stats["total_plans"] += 1 - return await self._enhanced_plan_flow(context) - + return await self._enhanced_plan_flow(context, unread_messages) + except Exception as e: logger.error(f"规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 return [], None - async def _enhanced_plan_flow(self, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]: + async def _enhanced_plan_flow( + self, context: "StreamContext", unread_messages: Optional[List[Dict]] = None + ) -> Tuple[List[Dict], Optional[Dict]]: """执行增强版规划流程""" try: # 在规划前,先进行动作修改 @@ -106,7 +111,10 @@ class ChatterActionPlanner: # 确保Plan中包含所有当前可用的动作 initial_plan.available_actions = self.action_manager.get_using_actions() - unread_messages = context.get_unread_messages() if context else [] + # 如果没有提供未读消息列表,则从上下文中获取 + if unread_messages is None: + unread_messages = context.get_unread_messages() if context else [] + # 2. 兴趣度评分 - 只对未读消息进行评分 if unread_messages: bot_nickname = global_config.bot.nickname diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 50f473c54..0a71e78ed 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.0.2" +version = "7.0.4" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -156,11 +156,12 @@ dynamic_distribution_min_interval = 1.0 # 最小分发间隔(秒) dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒) dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子 -focus_value_adjust = [ - ["", "8:00,1", "12:00,0.8", "18:00,1", "01:00,0.3"], - ["qq:114514:group", "12:20,0.6", "16:10,0.5", "20:10,0.8", "00:10,0.3"], - ["qq:1919810:private", "8:20,0.5", "12:10,0.8", "20:10,1", "00:10,0.2"] -] +# 是否启用并发消息处理,在同一聊天流中并行处理多个消息 +concurrent_message_processing = false +# 在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数 +concurrent_per_user_limit = 3 +# 在并发模式下,是否按用户ID进行独立串行处理 +process_by_user_id = true talk_frequency_adjust = [ ["", "8:00,0.5", "12:00,0.6", "18:00,0.8", "01:00,0.3"],