From c5a55df5ec0717984927ded3a5d4c8027d181ed6 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 26 Sep 2025 00:24:34 +0800 Subject: [PATCH 1/4] =?UTF-8?q?feat(chat):=20=E5=AE=9E=E7=8E=B0=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 引入了一个全新的并发消息处理系统,以显著提升在高活跃度群聊中的响应速度。 在此之前,消息管理器对每个聊天流(如一个群聊)内的所有消息进行串行处理,导致用户需要排队等待机器人响应。新系统引入了可配置的并发模式: - 通过 `concurrent_message_processing` 开关启用。 - 允许并行处理来自同一群聊中不同用户的消息。 - 通过 `process_by_user_id` 保证对同一用户的消息处理仍然是串行的,以维持上下文的连贯性。 - 使用 `concurrent_per_user_limit` 控制并发处理的用户数量。 为了支持此功能,对 `MessageManager` 进行了大规模重构,用更高效的独立流检查机制取代了旧的全局轮询和优先级排序逻辑。同时,清理和移除了大量已废弃或冗余的配置项,简化了整体配置。 BREAKING CHANGE: 移除了多个已废弃的 `ChatConfig` 配置项,包括 `mentioned_bot_inevitable_reply`, `at_bot_inevitable_reply`, `focus_value`, `group_chat_mode` 等。这些功能已被新的 AFC 逻辑或其它机制取代。请参考最新的配置文件模板进行更新。 --- src/chat/chatter_manager.py | 13 +- src/chat/message_manager/message_manager.py | 498 ++++++------------ src/config/official_configs.py | 125 +++-- .../affinity_flow_chatter/affinity_chatter.py | 9 +- .../affinity_flow_chatter/plan_filter.py | 4 - .../built_in/affinity_flow_chatter/planner.py | 18 +- template/bot_config_template.toml | 9 +- 7 files changed, 267 insertions(+), 409 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index c906fd901..f795f479b 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -79,7 +79,9 @@ class ChatterManager: del self.instances[stream_id] logger.info(f"清理不活跃聊天流实例: {stream_id}") - async def process_stream_context(self, stream_id: str, context: StreamContext) -> dict: + async def process_stream_context( + self, stream_id: str, context: StreamContext, unread_messages: Optional[List[Any]] = None + ) -> dict: """处理流上下文""" chat_type = context.chat_type logger.debug(f"处理流 {stream_id},聊天类型: {chat_type.value}") @@ -104,9 +106,14 @@ class ChatterManager: self.stats["streams_processed"] += 1 try: - result = await self.instances[stream_id].execute(context) + # 如果提供了 unread_messages,则传递给 execute 方法 + if unread_messages: + result = await self.instances[stream_id].execute(context, unread_messages) + else: + result = await self.instances[stream_id].execute(context) + self.stats["successful_executions"] += 1 - + # 记录处理结果 success = result.get("success", False) actions_count = result.get("actions_count", 0) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 551e60782..0e18ca084 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -7,7 +7,7 @@ import asyncio import random import time import traceback -from typing import Dict, Optional, Any, TYPE_CHECKING +from typing import Dict, Optional, Any, TYPE_CHECKING, List from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages @@ -30,10 +30,13 @@ class MessageManager: def __init__(self, check_interval: float = 5.0): self.stream_contexts: Dict[str, StreamContext] = {} - self.check_interval = check_interval # 检查间隔(秒) + self.check_interval = check_interval self.is_running = False self.manager_task: Optional[asyncio.Task] = None + # 并发控制信号量 + self.concurrent_semaphore: Optional[asyncio.Semaphore] = None + # 统计信息 self.stats = MessageManagerStats() @@ -53,6 +56,10 @@ class MessageManager: self.is_running = True self.manager_task = asyncio.create_task(self._manager_loop()) + if global_config.chat.concurrent_message_processing: + limit = global_config.chat.concurrent_per_user_limit + self.concurrent_semaphore = asyncio.Semaphore(limit) + logger.info(f"并发处理已启用,全局并发限制: {limit}") await self.wakeup_manager.start() logger.info("消息管理器已启动") @@ -65,8 +72,12 @@ class MessageManager: # 停止所有流处理任务 for context in self.stream_contexts.values(): - if context.processing_task and not context.processing_task.done(): + if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): context.processing_task.cancel() + if hasattr(context, 'user_processing_tasks'): + for task in context.user_processing_tasks.values(): + if task and not task.done(): + task.cancel() # 停止管理器任务 if self.manager_task and not self.manager_task.done(): @@ -80,9 +91,14 @@ class MessageManager: """添加消息到指定聊天流""" # 获取或创建流上下文 if stream_id not in self.stream_contexts: - self.stream_contexts[stream_id] = StreamContext(stream_id=stream_id) + context = StreamContext(stream_id=stream_id) + # 为并发处理添加队列和锁 + if global_config.chat.concurrent_message_processing: + context.send_lock = asyncio.Lock() + context.user_processing_tasks = {} + self.stream_contexts[stream_id] = context self.stats.total_streams += 1 - + context = self.stream_contexts[stream_id] context.set_chat_mode(ChatMode.FOCUS) context.add_message(message) @@ -100,10 +116,9 @@ class MessageManager: await self._check_streams_with_individual_intervals() # 计算下次检查时间(使用最小间隔或固定间隔) + next_check_delay = self.check_interval if global_config.chat.dynamic_distribution_enabled: next_check_delay = self._calculate_next_manager_delay() - else: - next_check_delay = self.check_interval await asyncio.sleep(next_check_delay) except asyncio.CancelledError: @@ -112,107 +127,63 @@ class MessageManager: logger.error(f"消息管理器循环出错: {e}") traceback.print_exc() - async def _check_all_streams(self): - """检查所有聊天流""" - active_streams = 0 - total_unread = 0 - - for stream_id, context in self.stream_contexts.items(): - if not context.is_active: - continue - - active_streams += 1 - - # 检查是否有未读消息 - unread_messages = context.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 更新统计 - self.stats.active_streams = active_streams - self.stats.total_unread_messages = total_unread - - async def _process_stream_messages(self, stream_id: str): - """处理指定聊天流的消息""" + async def _process_stream_messages(self, stream_id: str, unread_messages_override: List[DatabaseMessages]): + """ + 处理指定聊天流的消息 (非并发模式专用) + """ if stream_id not in self.stream_contexts: return context = self.stream_contexts[stream_id] + context.processing_task = asyncio.current_task() + user_id = unread_messages_override[0].user_info.user_id if unread_messages_override and hasattr(unread_messages_override[0], 'user_info') else None try: - # 获取未读消息 - unread_messages = context.get_unread_messages() - if not unread_messages: - return + await self._check_and_handle_interruption(context, stream_id, unread_messages_override, user_id) - # 检查是否需要打断现有处理 - await self._check_and_handle_interruption(context, stream_id) - - # --- 睡眠状态检查 --- if self.sleep_manager.is_sleeping(): - logger.info(f"Bot正在睡觉,检查聊天流 {stream_id} 是否有唤醒触发器。") - was_woken_up = False is_private = context.is_private_chat() - - for message in unread_messages: + for message in unread_messages_override: is_mentioned = message.is_mentioned or False if is_private or is_mentioned: if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned): was_woken_up = True - break # 一旦被吵醒,就跳出循环并处理消息 - + break if not was_woken_up: logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。") - return # 退出,不处理消息 - + self._clear_specific_unread_messages(context, unread_messages_override) + return logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。") - # --- 睡眠状态检查结束 --- - logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") - - # 直接使用StreamContext对象进行处理 - if unread_messages: - try: - # 记录当前chat type用于调试 - logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}") - - # 发送到chatter manager,传递StreamContext对象 - results = await self.chatter_manager.process_stream_context(stream_id, context) - - # 处理结果,标记消息为已读 - if results.get("success", False): - self._clear_all_unread_messages(context) - logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息") - else: - logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") - - except Exception as e: - logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}") - # 出现异常时也清除未读消息,避免重复处理 - self._clear_all_unread_messages(context) - raise - - logger.debug(f"聊天流 {stream_id} 消息处理完成") + logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages_override)} 条未读消息") + + results = await self.chatter_manager.process_stream_context(stream_id, context, unread_messages_override) + if results.get("success", False): + logger.debug(f"聊天流 {stream_id} 处理成功") + else: + logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") + + self._clear_specific_unread_messages(context, unread_messages_override) except asyncio.CancelledError: + logger.info(f"聊天流 {stream_id} 的处理任务被取消") + self._clear_specific_unread_messages(context, unread_messages_override) raise except Exception as e: - logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}") + logger.error(f"处理聊天流 {stream_id} 时发生异常: {e}") traceback.print_exc() - + self._clear_specific_unread_messages(context, unread_messages_override) + finally: + context.processing_task = None + def deactivate_stream(self, stream_id: str): """停用聊天流""" if stream_id in self.stream_contexts: context = self.stream_contexts[stream_id] context.is_active = False - # 取消处理任务 - if context.processing_task and not context.processing_task.done(): + if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): context.processing_task.cancel() logger.info(f"停用聊天流: {stream_id}") @@ -235,7 +206,7 @@ class MessageManager: unread_count=len(context.get_unread_messages()), history_count=len(context.history_messages), last_check_time=context.last_check_time, - has_active_task=bool(context.processing_task and not context.processing_task.done()), + has_active_task=bool(hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done()), ) def get_manager_stats(self) -> Dict[str, Any]: @@ -264,30 +235,49 @@ class MessageManager: del self.stream_contexts[stream_id] logger.info(f"清理不活跃聊天流: {stream_id}") - async def _check_and_handle_interruption(self, context: StreamContext, stream_id: str): + async def _check_and_handle_interruption( + self, context: StreamContext, stream_id: str, unread_messages: List[DatabaseMessages], user_id: Optional[str] = None + ): """检查并处理消息打断""" if not global_config.chat.interruption_enabled: return - # 检查是否有正在进行的处理任务 - if context.processing_task and not context.processing_task.done(): - # 计算打断概率 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.debug(f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},本次不进行打断") + return + + task_to_check = None + if global_config.chat.concurrent_message_processing and global_config.chat.process_by_user_id and user_id: + task_to_check = context.user_processing_tasks.get(user_id) + else: + task_to_check = context.processing_task + + if task_to_check and not task_to_check.done(): interruption_probability = context.calculate_interruption_probability( global_config.chat.interruption_max_limit, global_config.chat.interruption_probability_factor ) - # 根据概率决定是否打断 if random.random() < interruption_probability: - logger.info(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") + user_nickname = "" + if user_id and unread_messages: + for msg in unread_messages: + if hasattr(msg, "user_info") and msg.user_info and msg.user_info.user_id == user_id: + user_nickname = msg.user_info.user_nickname + break + + if user_nickname: + log_target = f"用户'{user_nickname}({user_id})'在聊天流 {stream_id}" + else: + log_target = f"用户 {user_id} 在聊天流 {stream_id}" if user_id else f"聊天流 {stream_id}" - # 取消现有任务 - context.processing_task.cancel() + logger.info(f"{log_target} 触发消息打断,打断概率: {interruption_probability:.2f}") + + task_to_check.cancel() try: - await context.processing_task + await task_to_check except asyncio.CancelledError: pass - # 增加打断计数并应用afc阈值降低 context.increment_interruption_count() context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction) logger.info( @@ -296,145 +286,47 @@ class MessageManager: else: logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - def _calculate_dynamic_distribution_interval(self) -> float: - """根据所有活跃聊天流的focus_energy动态计算分发周期""" - if not global_config.chat.dynamic_distribution_enabled: - return self.check_interval # 使用固定间隔 - - if not self.stream_contexts: - return self.check_interval # 默认间隔 - - # 计算活跃流的平均focus_energy - active_streams = [ctx for ctx in self.stream_contexts.values() if ctx.is_active] - if not active_streams: - return self.check_interval - - total_focus_energy = 0.0 - max_focus_energy = 0.0 - stream_count = 0 - - for context in active_streams: - if hasattr(context, 'chat_stream') and context.chat_stream: - focus_energy = context.chat_stream.focus_energy - total_focus_energy += focus_energy - max_focus_energy = max(max_focus_energy, focus_energy) - stream_count += 1 - - if stream_count == 0: - return self.check_interval - - avg_focus_energy = total_focus_energy / stream_count - - # 使用配置参数 - base_interval = global_config.chat.dynamic_distribution_base_interval - min_interval = global_config.chat.dynamic_distribution_min_interval - max_interval = global_config.chat.dynamic_distribution_max_interval - jitter_factor = global_config.chat.dynamic_distribution_jitter_factor - - # 根据平均兴趣度调整间隔 - # 高兴趣度 -> 更频繁检查 (间隔更短) - # 低兴趣度 -> 较少检查 (间隔更长) - if avg_focus_energy >= 0.7: - # 高兴趣度:1-5秒 - interval = base_interval * (1.0 - (avg_focus_energy - 0.7) * 2.0) - elif avg_focus_energy >= 0.4: - # 中等兴趣度:5-15秒 - interval = base_interval * (1.0 + (avg_focus_energy - 0.4) * 3.33) - else: - # 低兴趣度:15-30秒 - interval = base_interval * (3.0 + (0.4 - avg_focus_energy) * 5.0) - - # 添加随机扰动避免同步 - import random - jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) - final_interval = interval * jitter - - # 限制在合理范围内 - final_interval = max(min_interval, min(max_interval, final_interval)) - - logger.debug( - f"动态分发周期: {final_interval:.2f}s | " - f"平均兴趣度: {avg_focus_energy:.3f} | " - f"活跃流数: {stream_count}" - ) - - return final_interval - - def _calculate_stream_distribution_interval(self, context: StreamContext) -> float: + def _calculate_dynamic_distribution_interval(self, context: StreamContext) -> float: """计算单个聊天流的分发周期 - 基于阈值感知的focus_energy""" if not global_config.chat.dynamic_distribution_enabled: - return self.check_interval # 使用固定间隔 + return self.check_interval - # 获取该流的focus_energy(新的阈值感知版本) - focus_energy = 0.5 # 默认值 - avg_message_interest = 0.5 # 默认平均兴趣度 + focus_energy = 0.5 + avg_message_interest = 0.5 if hasattr(context, 'chat_stream') and context.chat_stream: focus_energy = context.chat_stream.focus_energy - # 获取平均消息兴趣度用于更精确的计算 if context.chat_stream.message_count > 0: avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count - # 获取AFC阈值用于参考,添加None值检查 reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4) non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2) high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8) - # 使用配置参数 base_interval = global_config.chat.dynamic_distribution_base_interval min_interval = global_config.chat.dynamic_distribution_min_interval max_interval = global_config.chat.dynamic_distribution_max_interval jitter_factor = global_config.chat.dynamic_distribution_jitter_factor - # 基于阈值感知的智能分发周期计算 if avg_message_interest >= high_match_threshold: - # 超高兴趣度:极快响应 (1-2秒) interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0 elif avg_message_interest >= reply_threshold: - # 高兴趣度:快速响应 (2-6秒) gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold) interval_multiplier = 0.6 + gap_from_reply * 0.4 elif avg_message_interest >= non_reply_threshold: - # 中等兴趣度:正常响应 (6-15秒) gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold) interval_multiplier = 1.2 + gap_from_non_reply * 1.8 else: - # 低兴趣度:缓慢响应 (15-30秒) gap_ratio = max(0, avg_message_interest / non_reply_threshold) interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0 - # 应用focus_energy微调 energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5 interval = base_interval * interval_multiplier * energy_adjustment - # 添加随机扰动避免同步 - import random jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) final_interval = interval * jitter - # 限制在合理范围内 final_interval = max(min_interval, min(max_interval, final_interval)) - - # 根据兴趣度级别调整日志级别 - if avg_message_interest >= high_match_threshold: - log_level = "info" - elif avg_message_interest >= reply_threshold: - log_level = "info" - else: - log_level = "debug" - - log_msg = ( - f"流 {context.stream_id} 分发周期: {final_interval:.2f}s | " - f"focus_energy: {focus_energy:.3f} | " - f"avg_interest: {avg_message_interest:.3f} | " - f"阈值参考: {non_reply_threshold:.2f}/{reply_threshold:.2f}/{high_match_threshold:.2f}" - ) - - if log_level == "info": - logger.info(log_msg) - else: - logger.debug(log_msg) - return final_interval def _calculate_next_manager_delay(self) -> float: @@ -442,7 +334,6 @@ class MessageManager: current_time = time.time() min_delay = float('inf') - # 找到最近需要检查的流 for context in self.stream_contexts.values(): if not context.is_active: continue @@ -451,14 +342,11 @@ class MessageManager: if time_until_check > 0: min_delay = min(min_delay, time_until_check) else: - min_delay = 0.1 # 立即检查 - break + return 0.1 - # 如果没有活跃流,使用默认间隔 if min_delay == float('inf'): return self.check_interval - # 确保最小延迟 return max(0.1, min(min_delay, self.check_interval)) async def _check_streams_with_individual_intervals(self): @@ -470,156 +358,106 @@ class MessageManager: if not context.is_active: continue - # 检查是否达到检查时间 if current_time >= context.next_check_time: - # 更新检查时间 context.last_check_time = current_time - - # 计算下次检查时间和分发周期 if global_config.chat.dynamic_distribution_enabled: context.distribution_interval = self._calculate_stream_distribution_interval(context) else: context.distribution_interval = self.check_interval - - # 设置下次检查时间 context.next_check_time = current_time + context.distribution_interval - # 检查未读消息 unread_messages = context.get_unread_messages() - if unread_messages: - processed_streams += 1 - self.stats.total_unread_messages = len(unread_messages) + if not unread_messages: + continue - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - focus_energy = context.chat_stream.focus_energy if hasattr(context, 'chat_stream') and context.chat_stream else 0.5 - - # 根据优先级记录日志 - if focus_energy >= 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s | " - f"未读消息: {len(unread_messages)}" - ) + processed_streams += 1 + + if global_config.chat.concurrent_message_processing: + if global_config.chat.process_by_user_id: + user_messages = {} + for msg in unread_messages: + user_id = msg.user_info.user_id if hasattr(msg, 'user_info') and msg.user_info else 'unknown_user' + if user_id not in user_messages: + user_messages[user_id] = [] + user_messages[user_id].append(msg) + + for user_id, messages in user_messages.items(): + await self._check_and_handle_interruption(context, stream_id, messages, user_id) + if not context.user_processing_tasks.get(user_id) or context.user_processing_tasks[user_id].done(): + task = asyncio.create_task(self._process_and_send_reply(context, messages)) + context.user_processing_tasks[user_id] = task + else: + # Fix: Ensure unread_messages is available in this branch + all_unread_messages = context.get_unread_messages() + if all_unread_messages: + if not global_config.chat.concurrent_message_processing: + await self._check_and_handle_interruption(context, stream_id, all_unread_messages) + if not context.processing_task or context.processing_task.done(): + context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id, all_unread_messages)) else: - logger.debug( - f"流 {stream_id} 开始处理 | " - f"focus_energy: {focus_energy:.3f} | " - f"分发周期: {context.distribution_interval:.2f}s" - ) - - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 更新活跃流计数 - active_count = sum(1 for ctx in self.stream_contexts.values() if ctx.is_active) - self.stats.active_streams = active_count - - if processed_streams > 0: - logger.debug( - f"本次循环处理了 {processed_streams} 个流 | " - f"活跃流总数: {active_count}" - ) - - async def _check_all_streams_with_priority(self): - """按优先级检查所有聊天流,高focus_energy的流优先处理""" - if not self.stream_contexts: + await self._check_and_handle_interruption(context, stream_id, all_unread_messages) + if not context.processing_task or context.processing_task.done(): + task = asyncio.create_task(self._process_and_send_reply(context, all_unread_messages)) + context.processing_task = task + # The original 'else' block for the 'if current_time >= context.next_check_time:' check + # was problematic. It seems it tried to process messages even when it wasn't time. + # Removing it should fix the UnboundLocalError and align with the logic of checking the time first. + + async def _process_and_send_reply(self, context: StreamContext, unread_messages: list): + """在后台处理单批消息并加锁发送 (并发模式专用)""" + if not self.concurrent_semaphore: + logger.error("并发信号量未初始化") return - # 获取活跃的聊天流并按focus_energy排序 - active_streams = [] - for stream_id, context in self.stream_contexts.items(): - if not context.is_active: - continue + user_id = unread_messages[0].user_info.user_id if global_config.chat.process_by_user_id and unread_messages and hasattr(unread_messages[0], 'user_info') else None - # 获取focus_energy,如果不存在则使用默认值 - focus_energy = 0.5 - if hasattr(context, 'chat_stream') and context.chat_stream: - focus_energy = context.chat_stream.focus_energy - - # 计算流优先级分数 - priority_score = self._calculate_stream_priority(context, focus_energy) - active_streams.append((priority_score, stream_id, context)) - - # 按优先级降序排序 - active_streams.sort(reverse=True, key=lambda x: x[0]) - - # 处理排序后的流 - active_stream_count = 0 - total_unread = 0 - - for priority_score, stream_id, context in active_streams: - active_stream_count += 1 - - # 检查是否有未读消息 - unread_messages = context.get_unread_messages() - if unread_messages: - total_unread += len(unread_messages) - - # 如果没有处理任务,创建一个 - if not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) - - # 高优先级流的额外日志 - if priority_score > 0.7: - logger.info( - f"高优先级流 {stream_id} 开始处理 | " - f"优先级: {priority_score:.3f} | " - f"未读消息: {len(unread_messages)}" - ) - - # 更新统计 - self.stats.active_streams = active_stream_count - self.stats.total_unread_messages = total_unread - - def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float: - """计算聊天流的优先级分数""" - # 基础优先级:focus_energy - base_priority = focus_energy - - # 未读消息数量加权 - unread_count = len(context.get_unread_messages()) - message_count_bonus = min(unread_count * 0.1, 0.3) # 最多30%加成 - - # 时间加权:最近活跃的流优先级更高 - current_time = time.time() - time_since_active = current_time - context.last_check_time - time_penalty = max(0, 1.0 - time_since_active / 3600.0) # 1小时内无惩罚 - - # 连续无回复惩罚 - if hasattr(context, 'chat_stream') and context.chat_stream: - consecutive_no_reply = context.chat_stream.consecutive_no_reply - no_reply_penalty = max(0, 1.0 - consecutive_no_reply * 0.05) # 每次无回复降低5% - else: - no_reply_penalty = 1.0 - - # 综合优先级计算 - final_priority = ( - base_priority * 0.6 + # 基础兴趣度权重60% - message_count_bonus * 0.2 + # 消息数量权重20% - time_penalty * 0.1 + # 时间权重10% - no_reply_penalty * 0.1 # 回复状态权重10% - ) - - return max(0.0, min(1.0, final_priority)) - - def _clear_all_unread_messages(self, context: StreamContext): - """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" - unread_messages = context.get_unread_messages() - if not unread_messages: - return - - logger.warning(f"正在清除 {len(unread_messages)} 条未读消息") - - # 将所有未读消息标记为已读并移动到历史记录 - for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表 + async with self.concurrent_semaphore: try: - context.mark_message_as_read(msg.message_id) - self.stats.total_processed_messages += 1 - logger.debug(f"强制清除消息 {msg.message_id},标记为已读") + # 思考和发送都在锁内,确保单次回复的原子性 + async with context.send_lock: + logger.debug(f"发送任务锁定聊天流 {context.stream_id},准备处理和回复") + + results = await self.chatter_manager.process_stream_context(context.stream_id, context, unread_messages) + + if results.get("success", False): + self._clear_specific_unread_messages(context, unread_messages) + logger.debug(f"聊天流 {context.stream_id} 并发处理成功,清除了 {len(unread_messages)} 条未读消息") + else: + logger.warning(f"聊天流 {context.stream_id} 并发处理失败: {results.get('error_message', '未知错误')}") + + reply_delay = random.uniform(1.5, 3.0) + await asyncio.sleep(reply_delay) + + logger.debug(f"发送任务解锁聊天流 {context.stream_id}") + + except asyncio.CancelledError: + logger.info(f"用户 {user_id} 的任务被取消") + self._clear_specific_unread_messages(context, unread_messages) # 取消时也清除消息 + raise except Exception as e: - logger.error(f"清除消息 {msg.message_id} 时出错: {e}") + logger.error(f"后台回复处理任务出错: {e}") + traceback.print_exc() + self._clear_specific_unread_messages(context, unread_messages) + finally: + if user_id and user_id in context.user_processing_tasks: + if context.user_processing_tasks[user_id] is asyncio.current_task(): + del context.user_processing_tasks[user_id] + def _clear_specific_unread_messages(self, context: StreamContext, messages_to_clear: list): + """清除指定上下文中的特定未读消息""" + if not messages_to_clear: + return + message_ids_to_clear = {msg.message_id for msg in messages_to_clear} + + context.unread_messages = [msg for msg in context.unread_messages if msg.message_id not in message_ids_to_clear] + + for msg in messages_to_clear: + context.history_messages.append(msg) + + if len(context.history_messages) > 100: + context.history_messages = context.history_messages[-100:] + + # 创建全局消息管理器实例 message_manager = MessageManager() diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 3ba341997..86b15bfb1 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -74,20 +74,9 @@ class ChatConfig(ValidatedConfigBase): """聊天配置类""" max_context_size: int = Field(default=18, description="最大上下文大小") - replyer_random_probability: float = Field(default=0.5, description="回复者随机概率") thinking_timeout: int = Field(default=40, description="思考超时时间") - talk_frequency: float = Field(default=1.0, description="聊天频率") - mentioned_bot_inevitable_reply: bool = Field(default=False, description="提到机器人的必然回复") - at_bot_inevitable_reply: bool = Field(default=False, description="@机器人的必然回复") allow_reply_self: bool = Field(default=False, description="是否允许回复自己说的话") talk_frequency_adjust: list[list[str]] = Field(default_factory=lambda: [], description="聊天频率调整") - focus_value: float = Field(default=1.0, description="专注值") - focus_mode_quiet_groups: List[str] = Field( - default_factory=list, - description='专注模式下需要保持安静的群组列表, 格式: ["platform:group_id1", "platform:group_id2"]', - ) - force_reply_private: bool = Field(default=False, description="强制回复私聊") - group_chat_mode: Literal["auto", "normal", "focus"] = Field(default="auto", description="群聊模式") timestamp_display_mode: Literal["normal", "normal_no_YMD", "relative"] = Field( default="normal_no_YMD", description="时间戳显示模式" ) @@ -128,46 +117,57 @@ class ChatConfig(ValidatedConfigBase): dynamic_distribution_jitter_factor: float = Field( default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子" ) - + + # 并发消息处理 + concurrent_message_processing: bool = Field( + default=False, description="是否启用并发消息处理,在同一聊天流中并行处理多个消息" + ) + concurrent_per_user_limit: int = Field( + default=3, description="在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数" + ) + process_by_user_id: bool = Field( + default=True, description="在并发模式下,是否按用户ID进行独立串行处理" + ) + def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float: """ 根据当前时间和聊天流获取对应的 talk_frequency - + Args: chat_stream_id: 聊天流ID,格式为 "platform:chat_id:type" - + Returns: float: 对应的频率值 """ if not self.talk_frequency_adjust: - return self.talk_frequency - + return 1.0 + # 优先检查聊天流特定的配置 if chat_stream_id: stream_frequency = self._get_stream_specific_frequency(chat_stream_id) if stream_frequency is not None: return stream_frequency - + # 检查全局时段配置(第一个元素为空字符串的配置) global_frequency = self._get_global_frequency() - return self.talk_frequency if global_frequency is None else global_frequency - + return 1.0 if global_frequency is None else global_frequency + def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]: """ 根据时间配置列表获取当前时段的频率 - + Args: time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...] - + Returns: float: 频率值,如果没有配置则返回 None """ from datetime import datetime - + current_time = datetime.now().strftime("%H:%M") current_hour, current_minute = map(int, current_time.split(":")) current_minutes = current_hour * 60 + current_minute - + # 解析时间频率配置 time_freq_pairs = [] for time_freq_str in time_freq_list: @@ -179,13 +179,13 @@ class ChatConfig(ValidatedConfigBase): time_freq_pairs.append((minutes, frequency)) except (ValueError, IndexError): continue - + if not time_freq_pairs: return None - + # 按时间排序 time_freq_pairs.sort(key=lambda x: x[0]) - + # 查找当前时间对应的频率 current_frequency = None for minutes, frequency in time_freq_pairs: @@ -193,20 +193,20 @@ class ChatConfig(ValidatedConfigBase): current_frequency = frequency else: break - + # 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑) if current_frequency is None and time_freq_pairs: current_frequency = time_freq_pairs[-1][1] - + return current_frequency - + def _get_stream_specific_frequency(self, chat_stream_id: str): """ 获取特定聊天流在当前时间的频率 - + Args: chat_stream_id: 聊天流ID(哈希值) - + Returns: float: 频率值,如果没有配置则返回 None """ @@ -214,30 +214,30 @@ class ChatConfig(ValidatedConfigBase): for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + stream_config_str = config_item[0] # 例如 "qq:1026294844:group" - + # 解析配置字符串并生成对应的 chat_id config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str) if config_chat_id is None: continue - + # 比较生成的 chat_id if config_chat_id != chat_stream_id: continue - + # 使用通用的时间频率解析方法 return self._get_time_based_frequency(config_item[1:]) - + return None - + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -245,42 +245,42 @@ class ChatConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def _get_global_frequency(self) -> Optional[float]: """ 获取全局默认频率配置 - + Returns: float: 频率值,如果没有配置则返回 None """ for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + # 检查是否为全局默认配置(第一个元素为空字符串) if config_item[0] == "": return self._get_time_based_frequency(config_item[1:]) - + return None @@ -313,10 +313,10 @@ class ExpressionConfig(ValidatedConfigBase): def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -324,52 +324,52 @@ class ExpressionConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]: """ 根据聊天流ID获取表达配置 - + Args: chat_stream_id: 聊天流ID,格式为哈希值 - + Returns: tuple: (是否使用表达, 是否学习表达, 学习间隔) """ if not self.rules: # 如果没有配置,使用默认值:启用表达,启用学习,强度1.0 return True, True, 1.0 - + # 优先检查聊天流特定的配置 if chat_stream_id: for rule in self.rules: if rule.chat_stream_id and self._parse_stream_config_to_chat_id(rule.chat_stream_id) == chat_stream_id: return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 检查全局配置(chat_stream_id为空字符串的配置) for rule in self.rules: if rule.chat_stream_id == "": return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 如果都没有匹配,返回默认值 return True, True, 1.0 @@ -443,7 +443,7 @@ class KeywordRuleConfig(ValidatedConfigBase): def __post_init__(self): import re - + if not self.keywords and not self.regex: raise ValueError("关键词规则必须至少包含keywords或regex中的一个") if not self.reaction: @@ -466,7 +466,6 @@ class CustomPromptConfig(ValidatedConfigBase): """自定义提示词配置类""" image_prompt: str = Field(default="", description="图片提示词") - planner_custom_prompt_enable: bool = Field(default=False, description="启用规划器自定义提示词") planner_custom_prompt_content: str = Field(default="", description="规划器自定义提示词内容") diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py index 08f5f7098..35416a297 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py @@ -53,12 +53,13 @@ class AffinityChatter(BaseChatter): } self.last_activity_time = time.time() - async def execute(self, context: StreamContext) -> dict: + async def execute(self, context: StreamContext, unread_messages: list | None = None) -> dict: """ 处理StreamContext对象 Args: context: StreamContext对象,包含聊天流的所有消息信息 + unread_messages: (可选) 指定要处理的未读消息列表,用于并发处理 Returns: 处理结果字典 @@ -68,10 +69,12 @@ class AffinityChatter(BaseChatter): learner = expression_learner_manager.get_expression_learner(self.stream_id) asyncio.create_task(learner.trigger_learning_for_chat()) - unread_messages = context.get_unread_messages() + # 如果没有提供未读消息列表,则从上下文中获取 + if unread_messages is None: + unread_messages = context.get_unread_messages() # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(context=context) + actions, target_message = await self.planner.plan(context=context, unread_messages=unread_messages) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index 4793e2835..f484713f2 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -218,10 +218,6 @@ class ChatterPlanFilter: self.last_obs_time_mark = time.time() mentioned_bonus = "" - if global_config.chat.mentioned_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你" - if global_config.chat.at_bot_inevitable_reply: - mentioned_bonus = "\n- 有人提到你,或者at你" if plan.mode == ChatMode.FOCUS: no_action_block = """ diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 56211d80e..ddf05b8f4 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -70,12 +70,15 @@ class ChatterActionPlanner: "other_actions_executed": 0, } - async def plan(self, context: "StreamContext" = None) -> Tuple[List[Dict], Optional[Dict]]: + async def plan( + self, context: "StreamContext" = None, unread_messages: Optional[List[Dict]] = None + ) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: context (StreamContext): 包含聊天流消息的上下文对象。 + unread_messages (Optional[List[Dict]]): (可选) 指定要处理的未读消息列表,用于并发处理 Returns: Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: @@ -85,14 +88,16 @@ class ChatterActionPlanner: try: self.planner_stats["total_plans"] += 1 - return await self._enhanced_plan_flow(context) - + return await self._enhanced_plan_flow(context, unread_messages) + except Exception as e: logger.error(f"规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 return [], None - async def _enhanced_plan_flow(self, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]: + async def _enhanced_plan_flow( + self, context: "StreamContext", unread_messages: Optional[List[Dict]] = None + ) -> Tuple[List[Dict], Optional[Dict]]: """执行增强版规划流程""" try: # 在规划前,先进行动作修改 @@ -106,7 +111,10 @@ class ChatterActionPlanner: # 确保Plan中包含所有当前可用的动作 initial_plan.available_actions = self.action_manager.get_using_actions() - unread_messages = context.get_unread_messages() if context else [] + # 如果没有提供未读消息列表,则从上下文中获取 + if unread_messages is None: + unread_messages = context.get_unread_messages() if context else [] + # 2. 兴趣度评分 - 只对未读消息进行评分 if unread_messages: bot_nickname = global_config.bot.nickname diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index c298ecc16..e9cc25b9c 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.0.2" +version = "7.0.4" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -132,6 +132,13 @@ dynamic_distribution_min_interval = 1.0 # 最小分发间隔(秒) dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒) dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子 +# 是否启用并发消息处理,在同一聊天流中并行处理多个消息 +concurrent_message_processing = false +# 在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数 +concurrent_per_user_limit = 3 +# 在并发模式下,是否按用户ID进行独立串行处理 +process_by_user_id = true + talk_frequency_adjust = [ ["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"], ["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], From 8d725911da32b065b011c9d4026811f46da7d3e1 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 26 Sep 2025 01:52:50 +0800 Subject: [PATCH 2/4] =?UTF-8?q?Revert:=20=E5=9B=9E=E9=80=80=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E7=9A=84=E7=9B=B8=E5=85=B3=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 由于并发消息处理的实现在测试中暴露出消息重复和目标ID丢失的问题,暂时回退至该功能合并前的稳定状态,以便进一步排查问题。 --- src/chat/chatter_manager.py | 13 +- src/chat/message_manager/message_manager.py | 540 ++++++++++++------ src/config/official_configs.py | 125 ++-- .../affinity_flow_chatter/affinity_chatter.py | 9 +- .../affinity_flow_chatter/plan_filter.py | 4 + .../built_in/affinity_flow_chatter/planner.py | 18 +- template/bot_config_template.toml | 9 +- 7 files changed, 430 insertions(+), 288 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index f795f479b..c906fd901 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -79,9 +79,7 @@ class ChatterManager: del self.instances[stream_id] logger.info(f"清理不活跃聊天流实例: {stream_id}") - async def process_stream_context( - self, stream_id: str, context: StreamContext, unread_messages: Optional[List[Any]] = None - ) -> dict: + async def process_stream_context(self, stream_id: str, context: StreamContext) -> dict: """处理流上下文""" chat_type = context.chat_type logger.debug(f"处理流 {stream_id},聊天类型: {chat_type.value}") @@ -106,14 +104,9 @@ class ChatterManager: self.stats["streams_processed"] += 1 try: - # 如果提供了 unread_messages,则传递给 execute 方法 - if unread_messages: - result = await self.instances[stream_id].execute(context, unread_messages) - else: - result = await self.instances[stream_id].execute(context) - + result = await self.instances[stream_id].execute(context) self.stats["successful_executions"] += 1 - + # 记录处理结果 success = result.get("success", False) actions_count = result.get("actions_count", 0) diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 0e18ca084..551e60782 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -7,7 +7,7 @@ import asyncio import random import time import traceback -from typing import Dict, Optional, Any, TYPE_CHECKING, List +from typing import Dict, Optional, Any, TYPE_CHECKING from src.common.logger import get_logger from src.common.data_models.database_data_model import DatabaseMessages @@ -30,13 +30,10 @@ class MessageManager: def __init__(self, check_interval: float = 5.0): self.stream_contexts: Dict[str, StreamContext] = {} - self.check_interval = check_interval + self.check_interval = check_interval # 检查间隔(秒) self.is_running = False self.manager_task: Optional[asyncio.Task] = None - # 并发控制信号量 - self.concurrent_semaphore: Optional[asyncio.Semaphore] = None - # 统计信息 self.stats = MessageManagerStats() @@ -56,10 +53,6 @@ class MessageManager: self.is_running = True self.manager_task = asyncio.create_task(self._manager_loop()) - if global_config.chat.concurrent_message_processing: - limit = global_config.chat.concurrent_per_user_limit - self.concurrent_semaphore = asyncio.Semaphore(limit) - logger.info(f"并发处理已启用,全局并发限制: {limit}") await self.wakeup_manager.start() logger.info("消息管理器已启动") @@ -72,12 +65,8 @@ class MessageManager: # 停止所有流处理任务 for context in self.stream_contexts.values(): - if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): + if context.processing_task and not context.processing_task.done(): context.processing_task.cancel() - if hasattr(context, 'user_processing_tasks'): - for task in context.user_processing_tasks.values(): - if task and not task.done(): - task.cancel() # 停止管理器任务 if self.manager_task and not self.manager_task.done(): @@ -91,14 +80,9 @@ class MessageManager: """添加消息到指定聊天流""" # 获取或创建流上下文 if stream_id not in self.stream_contexts: - context = StreamContext(stream_id=stream_id) - # 为并发处理添加队列和锁 - if global_config.chat.concurrent_message_processing: - context.send_lock = asyncio.Lock() - context.user_processing_tasks = {} - self.stream_contexts[stream_id] = context + self.stream_contexts[stream_id] = StreamContext(stream_id=stream_id) self.stats.total_streams += 1 - + context = self.stream_contexts[stream_id] context.set_chat_mode(ChatMode.FOCUS) context.add_message(message) @@ -116,9 +100,10 @@ class MessageManager: await self._check_streams_with_individual_intervals() # 计算下次检查时间(使用最小间隔或固定间隔) - next_check_delay = self.check_interval if global_config.chat.dynamic_distribution_enabled: next_check_delay = self._calculate_next_manager_delay() + else: + next_check_delay = self.check_interval await asyncio.sleep(next_check_delay) except asyncio.CancelledError: @@ -127,63 +112,107 @@ class MessageManager: logger.error(f"消息管理器循环出错: {e}") traceback.print_exc() - async def _process_stream_messages(self, stream_id: str, unread_messages_override: List[DatabaseMessages]): - """ - 处理指定聊天流的消息 (非并发模式专用) - """ + async def _check_all_streams(self): + """检查所有聊天流""" + active_streams = 0 + total_unread = 0 + + for stream_id, context in self.stream_contexts.items(): + if not context.is_active: + continue + + active_streams += 1 + + # 检查是否有未读消息 + unread_messages = context.get_unread_messages() + if unread_messages: + total_unread += len(unread_messages) + + # 如果没有处理任务,创建一个 + if not context.processing_task or context.processing_task.done(): + context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) + + # 更新统计 + self.stats.active_streams = active_streams + self.stats.total_unread_messages = total_unread + + async def _process_stream_messages(self, stream_id: str): + """处理指定聊天流的消息""" if stream_id not in self.stream_contexts: return context = self.stream_contexts[stream_id] - context.processing_task = asyncio.current_task() - user_id = unread_messages_override[0].user_info.user_id if unread_messages_override and hasattr(unread_messages_override[0], 'user_info') else None try: - await self._check_and_handle_interruption(context, stream_id, unread_messages_override, user_id) + # 获取未读消息 + unread_messages = context.get_unread_messages() + if not unread_messages: + return + # 检查是否需要打断现有处理 + await self._check_and_handle_interruption(context, stream_id) + + # --- 睡眠状态检查 --- if self.sleep_manager.is_sleeping(): + logger.info(f"Bot正在睡觉,检查聊天流 {stream_id} 是否有唤醒触发器。") + was_woken_up = False is_private = context.is_private_chat() - for message in unread_messages_override: + + for message in unread_messages: is_mentioned = message.is_mentioned or False if is_private or is_mentioned: if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned): was_woken_up = True - break + break # 一旦被吵醒,就跳出循环并处理消息 + if not was_woken_up: logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。") - self._clear_specific_unread_messages(context, unread_messages_override) - return - logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。") + return # 退出,不处理消息 - logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages_override)} 条未读消息") - - results = await self.chatter_manager.process_stream_context(stream_id, context, unread_messages_override) - if results.get("success", False): - logger.debug(f"聊天流 {stream_id} 处理成功") - else: - logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") - - self._clear_specific_unread_messages(context, unread_messages_override) + logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。") + # --- 睡眠状态检查结束 --- + + logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") + + # 直接使用StreamContext对象进行处理 + if unread_messages: + try: + # 记录当前chat type用于调试 + logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}") + + # 发送到chatter manager,传递StreamContext对象 + results = await self.chatter_manager.process_stream_context(stream_id, context) + + # 处理结果,标记消息为已读 + if results.get("success", False): + self._clear_all_unread_messages(context) + logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息") + else: + logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}") + + except Exception as e: + logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}") + # 出现异常时也清除未读消息,避免重复处理 + self._clear_all_unread_messages(context) + raise + + logger.debug(f"聊天流 {stream_id} 消息处理完成") except asyncio.CancelledError: - logger.info(f"聊天流 {stream_id} 的处理任务被取消") - self._clear_specific_unread_messages(context, unread_messages_override) raise except Exception as e: - logger.error(f"处理聊天流 {stream_id} 时发生异常: {e}") + logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}") traceback.print_exc() - self._clear_specific_unread_messages(context, unread_messages_override) - finally: - context.processing_task = None - + def deactivate_stream(self, stream_id: str): """停用聊天流""" if stream_id in self.stream_contexts: context = self.stream_contexts[stream_id] context.is_active = False - if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done(): + # 取消处理任务 + if context.processing_task and not context.processing_task.done(): context.processing_task.cancel() logger.info(f"停用聊天流: {stream_id}") @@ -206,7 +235,7 @@ class MessageManager: unread_count=len(context.get_unread_messages()), history_count=len(context.history_messages), last_check_time=context.last_check_time, - has_active_task=bool(hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done()), + has_active_task=bool(context.processing_task and not context.processing_task.done()), ) def get_manager_stats(self) -> Dict[str, Any]: @@ -235,49 +264,30 @@ class MessageManager: del self.stream_contexts[stream_id] logger.info(f"清理不活跃聊天流: {stream_id}") - async def _check_and_handle_interruption( - self, context: StreamContext, stream_id: str, unread_messages: List[DatabaseMessages], user_id: Optional[str] = None - ): + async def _check_and_handle_interruption(self, context: StreamContext, stream_id: str): """检查并处理消息打断""" if not global_config.chat.interruption_enabled: return - if context.interruption_count >= global_config.chat.interruption_max_limit: - logger.debug(f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},本次不进行打断") - return - - task_to_check = None - if global_config.chat.concurrent_message_processing and global_config.chat.process_by_user_id and user_id: - task_to_check = context.user_processing_tasks.get(user_id) - else: - task_to_check = context.processing_task - - if task_to_check and not task_to_check.done(): + # 检查是否有正在进行的处理任务 + if context.processing_task and not context.processing_task.done(): + # 计算打断概率 interruption_probability = context.calculate_interruption_probability( global_config.chat.interruption_max_limit, global_config.chat.interruption_probability_factor ) + # 根据概率决定是否打断 if random.random() < interruption_probability: - user_nickname = "" - if user_id and unread_messages: - for msg in unread_messages: - if hasattr(msg, "user_info") and msg.user_info and msg.user_info.user_id == user_id: - user_nickname = msg.user_info.user_nickname - break - - if user_nickname: - log_target = f"用户'{user_nickname}({user_id})'在聊天流 {stream_id}" - else: - log_target = f"用户 {user_id} 在聊天流 {stream_id}" if user_id else f"聊天流 {stream_id}" + logger.info(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") - logger.info(f"{log_target} 触发消息打断,打断概率: {interruption_probability:.2f}") - - task_to_check.cancel() + # 取消现有任务 + context.processing_task.cancel() try: - await task_to_check + await context.processing_task except asyncio.CancelledError: pass + # 增加打断计数并应用afc阈值降低 context.increment_interruption_count() context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction) logger.info( @@ -286,47 +296,145 @@ class MessageManager: else: logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - def _calculate_dynamic_distribution_interval(self, context: StreamContext) -> float: - """计算单个聊天流的分发周期 - 基于阈值感知的focus_energy""" + def _calculate_dynamic_distribution_interval(self) -> float: + """根据所有活跃聊天流的focus_energy动态计算分发周期""" if not global_config.chat.dynamic_distribution_enabled: + return self.check_interval # 使用固定间隔 + + if not self.stream_contexts: + return self.check_interval # 默认间隔 + + # 计算活跃流的平均focus_energy + active_streams = [ctx for ctx in self.stream_contexts.values() if ctx.is_active] + if not active_streams: return self.check_interval - focus_energy = 0.5 - avg_message_interest = 0.5 + total_focus_energy = 0.0 + max_focus_energy = 0.0 + stream_count = 0 - if hasattr(context, 'chat_stream') and context.chat_stream: - focus_energy = context.chat_stream.focus_energy - if context.chat_stream.message_count > 0: - avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count + for context in active_streams: + if hasattr(context, 'chat_stream') and context.chat_stream: + focus_energy = context.chat_stream.focus_energy + total_focus_energy += focus_energy + max_focus_energy = max(max_focus_energy, focus_energy) + stream_count += 1 - reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4) - non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2) - high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8) + if stream_count == 0: + return self.check_interval + avg_focus_energy = total_focus_energy / stream_count + + # 使用配置参数 base_interval = global_config.chat.dynamic_distribution_base_interval min_interval = global_config.chat.dynamic_distribution_min_interval max_interval = global_config.chat.dynamic_distribution_max_interval jitter_factor = global_config.chat.dynamic_distribution_jitter_factor - if avg_message_interest >= high_match_threshold: - interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0 - elif avg_message_interest >= reply_threshold: - gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold) - interval_multiplier = 0.6 + gap_from_reply * 0.4 - elif avg_message_interest >= non_reply_threshold: - gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold) - interval_multiplier = 1.2 + gap_from_non_reply * 1.8 + # 根据平均兴趣度调整间隔 + # 高兴趣度 -> 更频繁检查 (间隔更短) + # 低兴趣度 -> 较少检查 (间隔更长) + if avg_focus_energy >= 0.7: + # 高兴趣度:1-5秒 + interval = base_interval * (1.0 - (avg_focus_energy - 0.7) * 2.0) + elif avg_focus_energy >= 0.4: + # 中等兴趣度:5-15秒 + interval = base_interval * (1.0 + (avg_focus_energy - 0.4) * 3.33) else: - gap_ratio = max(0, avg_message_interest / non_reply_threshold) - interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0 - - energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5 - interval = base_interval * interval_multiplier * energy_adjustment + # 低兴趣度:15-30秒 + interval = base_interval * (3.0 + (0.4 - avg_focus_energy) * 5.0) + # 添加随机扰动避免同步 + import random jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) final_interval = interval * jitter + # 限制在合理范围内 final_interval = max(min_interval, min(max_interval, final_interval)) + + logger.debug( + f"动态分发周期: {final_interval:.2f}s | " + f"平均兴趣度: {avg_focus_energy:.3f} | " + f"活跃流数: {stream_count}" + ) + + return final_interval + + def _calculate_stream_distribution_interval(self, context: StreamContext) -> float: + """计算单个聊天流的分发周期 - 基于阈值感知的focus_energy""" + if not global_config.chat.dynamic_distribution_enabled: + return self.check_interval # 使用固定间隔 + + # 获取该流的focus_energy(新的阈值感知版本) + focus_energy = 0.5 # 默认值 + avg_message_interest = 0.5 # 默认平均兴趣度 + + if hasattr(context, 'chat_stream') and context.chat_stream: + focus_energy = context.chat_stream.focus_energy + # 获取平均消息兴趣度用于更精确的计算 + if context.chat_stream.message_count > 0: + avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count + + # 获取AFC阈值用于参考,添加None值检查 + reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4) + non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2) + high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8) + + # 使用配置参数 + base_interval = global_config.chat.dynamic_distribution_base_interval + min_interval = global_config.chat.dynamic_distribution_min_interval + max_interval = global_config.chat.dynamic_distribution_max_interval + jitter_factor = global_config.chat.dynamic_distribution_jitter_factor + + # 基于阈值感知的智能分发周期计算 + if avg_message_interest >= high_match_threshold: + # 超高兴趣度:极快响应 (1-2秒) + interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0 + elif avg_message_interest >= reply_threshold: + # 高兴趣度:快速响应 (2-6秒) + gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold) + interval_multiplier = 0.6 + gap_from_reply * 0.4 + elif avg_message_interest >= non_reply_threshold: + # 中等兴趣度:正常响应 (6-15秒) + gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold) + interval_multiplier = 1.2 + gap_from_non_reply * 1.8 + else: + # 低兴趣度:缓慢响应 (15-30秒) + gap_ratio = max(0, avg_message_interest / non_reply_threshold) + interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0 + + # 应用focus_energy微调 + energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5 + interval = base_interval * interval_multiplier * energy_adjustment + + # 添加随机扰动避免同步 + import random + jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor) + final_interval = interval * jitter + + # 限制在合理范围内 + final_interval = max(min_interval, min(max_interval, final_interval)) + + # 根据兴趣度级别调整日志级别 + if avg_message_interest >= high_match_threshold: + log_level = "info" + elif avg_message_interest >= reply_threshold: + log_level = "info" + else: + log_level = "debug" + + log_msg = ( + f"流 {context.stream_id} 分发周期: {final_interval:.2f}s | " + f"focus_energy: {focus_energy:.3f} | " + f"avg_interest: {avg_message_interest:.3f} | " + f"阈值参考: {non_reply_threshold:.2f}/{reply_threshold:.2f}/{high_match_threshold:.2f}" + ) + + if log_level == "info": + logger.info(log_msg) + else: + logger.debug(log_msg) + return final_interval def _calculate_next_manager_delay(self) -> float: @@ -334,6 +442,7 @@ class MessageManager: current_time = time.time() min_delay = float('inf') + # 找到最近需要检查的流 for context in self.stream_contexts.values(): if not context.is_active: continue @@ -342,11 +451,14 @@ class MessageManager: if time_until_check > 0: min_delay = min(min_delay, time_until_check) else: - return 0.1 + min_delay = 0.1 # 立即检查 + break + # 如果没有活跃流,使用默认间隔 if min_delay == float('inf'): return self.check_interval + # 确保最小延迟 return max(0.1, min(min_delay, self.check_interval)) async def _check_streams_with_individual_intervals(self): @@ -358,106 +470,156 @@ class MessageManager: if not context.is_active: continue + # 检查是否达到检查时间 if current_time >= context.next_check_time: + # 更新检查时间 context.last_check_time = current_time + + # 计算下次检查时间和分发周期 if global_config.chat.dynamic_distribution_enabled: context.distribution_interval = self._calculate_stream_distribution_interval(context) else: context.distribution_interval = self.check_interval + + # 设置下次检查时间 context.next_check_time = current_time + context.distribution_interval + # 检查未读消息 unread_messages = context.get_unread_messages() - if not unread_messages: - continue + if unread_messages: + processed_streams += 1 + self.stats.total_unread_messages = len(unread_messages) - processed_streams += 1 - - if global_config.chat.concurrent_message_processing: - if global_config.chat.process_by_user_id: - user_messages = {} - for msg in unread_messages: - user_id = msg.user_info.user_id if hasattr(msg, 'user_info') and msg.user_info else 'unknown_user' - if user_id not in user_messages: - user_messages[user_id] = [] - user_messages[user_id].append(msg) - - for user_id, messages in user_messages.items(): - await self._check_and_handle_interruption(context, stream_id, messages, user_id) - if not context.user_processing_tasks.get(user_id) or context.user_processing_tasks[user_id].done(): - task = asyncio.create_task(self._process_and_send_reply(context, messages)) - context.user_processing_tasks[user_id] = task - else: - # Fix: Ensure unread_messages is available in this branch - all_unread_messages = context.get_unread_messages() - if all_unread_messages: - if not global_config.chat.concurrent_message_processing: - await self._check_and_handle_interruption(context, stream_id, all_unread_messages) - if not context.processing_task or context.processing_task.done(): - context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id, all_unread_messages)) + # 如果没有处理任务,创建一个 + if not context.processing_task or context.processing_task.done(): + focus_energy = context.chat_stream.focus_energy if hasattr(context, 'chat_stream') and context.chat_stream else 0.5 + + # 根据优先级记录日志 + if focus_energy >= 0.7: + logger.info( + f"高优先级流 {stream_id} 开始处理 | " + f"focus_energy: {focus_energy:.3f} | " + f"分发周期: {context.distribution_interval:.2f}s | " + f"未读消息: {len(unread_messages)}" + ) else: - await self._check_and_handle_interruption(context, stream_id, all_unread_messages) - if not context.processing_task or context.processing_task.done(): - task = asyncio.create_task(self._process_and_send_reply(context, all_unread_messages)) - context.processing_task = task - # The original 'else' block for the 'if current_time >= context.next_check_time:' check - # was problematic. It seems it tried to process messages even when it wasn't time. - # Removing it should fix the UnboundLocalError and align with the logic of checking the time first. - - async def _process_and_send_reply(self, context: StreamContext, unread_messages: list): - """在后台处理单批消息并加锁发送 (并发模式专用)""" - if not self.concurrent_semaphore: - logger.error("并发信号量未初始化") + logger.debug( + f"流 {stream_id} 开始处理 | " + f"focus_energy: {focus_energy:.3f} | " + f"分发周期: {context.distribution_interval:.2f}s" + ) + + context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) + + # 更新活跃流计数 + active_count = sum(1 for ctx in self.stream_contexts.values() if ctx.is_active) + self.stats.active_streams = active_count + + if processed_streams > 0: + logger.debug( + f"本次循环处理了 {processed_streams} 个流 | " + f"活跃流总数: {active_count}" + ) + + async def _check_all_streams_with_priority(self): + """按优先级检查所有聊天流,高focus_energy的流优先处理""" + if not self.stream_contexts: return - user_id = unread_messages[0].user_info.user_id if global_config.chat.process_by_user_id and unread_messages and hasattr(unread_messages[0], 'user_info') else None + # 获取活跃的聊天流并按focus_energy排序 + active_streams = [] + for stream_id, context in self.stream_contexts.items(): + if not context.is_active: + continue - async with self.concurrent_semaphore: + # 获取focus_energy,如果不存在则使用默认值 + focus_energy = 0.5 + if hasattr(context, 'chat_stream') and context.chat_stream: + focus_energy = context.chat_stream.focus_energy + + # 计算流优先级分数 + priority_score = self._calculate_stream_priority(context, focus_energy) + active_streams.append((priority_score, stream_id, context)) + + # 按优先级降序排序 + active_streams.sort(reverse=True, key=lambda x: x[0]) + + # 处理排序后的流 + active_stream_count = 0 + total_unread = 0 + + for priority_score, stream_id, context in active_streams: + active_stream_count += 1 + + # 检查是否有未读消息 + unread_messages = context.get_unread_messages() + if unread_messages: + total_unread += len(unread_messages) + + # 如果没有处理任务,创建一个 + if not context.processing_task or context.processing_task.done(): + context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id)) + + # 高优先级流的额外日志 + if priority_score > 0.7: + logger.info( + f"高优先级流 {stream_id} 开始处理 | " + f"优先级: {priority_score:.3f} | " + f"未读消息: {len(unread_messages)}" + ) + + # 更新统计 + self.stats.active_streams = active_stream_count + self.stats.total_unread_messages = total_unread + + def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float: + """计算聊天流的优先级分数""" + # 基础优先级:focus_energy + base_priority = focus_energy + + # 未读消息数量加权 + unread_count = len(context.get_unread_messages()) + message_count_bonus = min(unread_count * 0.1, 0.3) # 最多30%加成 + + # 时间加权:最近活跃的流优先级更高 + current_time = time.time() + time_since_active = current_time - context.last_check_time + time_penalty = max(0, 1.0 - time_since_active / 3600.0) # 1小时内无惩罚 + + # 连续无回复惩罚 + if hasattr(context, 'chat_stream') and context.chat_stream: + consecutive_no_reply = context.chat_stream.consecutive_no_reply + no_reply_penalty = max(0, 1.0 - consecutive_no_reply * 0.05) # 每次无回复降低5% + else: + no_reply_penalty = 1.0 + + # 综合优先级计算 + final_priority = ( + base_priority * 0.6 + # 基础兴趣度权重60% + message_count_bonus * 0.2 + # 消息数量权重20% + time_penalty * 0.1 + # 时间权重10% + no_reply_penalty * 0.1 # 回复状态权重10% + ) + + return max(0.0, min(1.0, final_priority)) + + def _clear_all_unread_messages(self, context: StreamContext): + """清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读""" + unread_messages = context.get_unread_messages() + if not unread_messages: + return + + logger.warning(f"正在清除 {len(unread_messages)} 条未读消息") + + # 将所有未读消息标记为已读并移动到历史记录 + for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表 try: - # 思考和发送都在锁内,确保单次回复的原子性 - async with context.send_lock: - logger.debug(f"发送任务锁定聊天流 {context.stream_id},准备处理和回复") - - results = await self.chatter_manager.process_stream_context(context.stream_id, context, unread_messages) - - if results.get("success", False): - self._clear_specific_unread_messages(context, unread_messages) - logger.debug(f"聊天流 {context.stream_id} 并发处理成功,清除了 {len(unread_messages)} 条未读消息") - else: - logger.warning(f"聊天流 {context.stream_id} 并发处理失败: {results.get('error_message', '未知错误')}") - - reply_delay = random.uniform(1.5, 3.0) - await asyncio.sleep(reply_delay) - - logger.debug(f"发送任务解锁聊天流 {context.stream_id}") - - except asyncio.CancelledError: - logger.info(f"用户 {user_id} 的任务被取消") - self._clear_specific_unread_messages(context, unread_messages) # 取消时也清除消息 - raise + context.mark_message_as_read(msg.message_id) + self.stats.total_processed_messages += 1 + logger.debug(f"强制清除消息 {msg.message_id},标记为已读") except Exception as e: - logger.error(f"后台回复处理任务出错: {e}") - traceback.print_exc() - self._clear_specific_unread_messages(context, unread_messages) - finally: - if user_id and user_id in context.user_processing_tasks: - if context.user_processing_tasks[user_id] is asyncio.current_task(): - del context.user_processing_tasks[user_id] + logger.error(f"清除消息 {msg.message_id} 时出错: {e}") - def _clear_specific_unread_messages(self, context: StreamContext, messages_to_clear: list): - """清除指定上下文中的特定未读消息""" - if not messages_to_clear: - return - message_ids_to_clear = {msg.message_id for msg in messages_to_clear} - - context.unread_messages = [msg for msg in context.unread_messages if msg.message_id not in message_ids_to_clear] - - for msg in messages_to_clear: - context.history_messages.append(msg) - - if len(context.history_messages) > 100: - context.history_messages = context.history_messages[-100:] - - # 创建全局消息管理器实例 message_manager = MessageManager() diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 86b15bfb1..3ba341997 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -74,9 +74,20 @@ class ChatConfig(ValidatedConfigBase): """聊天配置类""" max_context_size: int = Field(default=18, description="最大上下文大小") + replyer_random_probability: float = Field(default=0.5, description="回复者随机概率") thinking_timeout: int = Field(default=40, description="思考超时时间") + talk_frequency: float = Field(default=1.0, description="聊天频率") + mentioned_bot_inevitable_reply: bool = Field(default=False, description="提到机器人的必然回复") + at_bot_inevitable_reply: bool = Field(default=False, description="@机器人的必然回复") allow_reply_self: bool = Field(default=False, description="是否允许回复自己说的话") talk_frequency_adjust: list[list[str]] = Field(default_factory=lambda: [], description="聊天频率调整") + focus_value: float = Field(default=1.0, description="专注值") + focus_mode_quiet_groups: List[str] = Field( + default_factory=list, + description='专注模式下需要保持安静的群组列表, 格式: ["platform:group_id1", "platform:group_id2"]', + ) + force_reply_private: bool = Field(default=False, description="强制回复私聊") + group_chat_mode: Literal["auto", "normal", "focus"] = Field(default="auto", description="群聊模式") timestamp_display_mode: Literal["normal", "normal_no_YMD", "relative"] = Field( default="normal_no_YMD", description="时间戳显示模式" ) @@ -117,57 +128,46 @@ class ChatConfig(ValidatedConfigBase): dynamic_distribution_jitter_factor: float = Field( default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子" ) - - # 并发消息处理 - concurrent_message_processing: bool = Field( - default=False, description="是否启用并发消息处理,在同一聊天流中并行处理多个消息" - ) - concurrent_per_user_limit: int = Field( - default=3, description="在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数" - ) - process_by_user_id: bool = Field( - default=True, description="在并发模式下,是否按用户ID进行独立串行处理" - ) - + def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float: """ 根据当前时间和聊天流获取对应的 talk_frequency - + Args: chat_stream_id: 聊天流ID,格式为 "platform:chat_id:type" - + Returns: float: 对应的频率值 """ if not self.talk_frequency_adjust: - return 1.0 - + return self.talk_frequency + # 优先检查聊天流特定的配置 if chat_stream_id: stream_frequency = self._get_stream_specific_frequency(chat_stream_id) if stream_frequency is not None: return stream_frequency - + # 检查全局时段配置(第一个元素为空字符串的配置) global_frequency = self._get_global_frequency() - return 1.0 if global_frequency is None else global_frequency - + return self.talk_frequency if global_frequency is None else global_frequency + def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]: """ 根据时间配置列表获取当前时段的频率 - + Args: time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...] - + Returns: float: 频率值,如果没有配置则返回 None """ from datetime import datetime - + current_time = datetime.now().strftime("%H:%M") current_hour, current_minute = map(int, current_time.split(":")) current_minutes = current_hour * 60 + current_minute - + # 解析时间频率配置 time_freq_pairs = [] for time_freq_str in time_freq_list: @@ -179,13 +179,13 @@ class ChatConfig(ValidatedConfigBase): time_freq_pairs.append((minutes, frequency)) except (ValueError, IndexError): continue - + if not time_freq_pairs: return None - + # 按时间排序 time_freq_pairs.sort(key=lambda x: x[0]) - + # 查找当前时间对应的频率 current_frequency = None for minutes, frequency in time_freq_pairs: @@ -193,20 +193,20 @@ class ChatConfig(ValidatedConfigBase): current_frequency = frequency else: break - + # 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑) if current_frequency is None and time_freq_pairs: current_frequency = time_freq_pairs[-1][1] - + return current_frequency - + def _get_stream_specific_frequency(self, chat_stream_id: str): """ 获取特定聊天流在当前时间的频率 - + Args: chat_stream_id: 聊天流ID(哈希值) - + Returns: float: 频率值,如果没有配置则返回 None """ @@ -214,30 +214,30 @@ class ChatConfig(ValidatedConfigBase): for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + stream_config_str = config_item[0] # 例如 "qq:1026294844:group" - + # 解析配置字符串并生成对应的 chat_id config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str) if config_chat_id is None: continue - + # 比较生成的 chat_id if config_chat_id != chat_stream_id: continue - + # 使用通用的时间频率解析方法 return self._get_time_based_frequency(config_item[1:]) - + return None - + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -245,42 +245,42 @@ class ChatConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def _get_global_frequency(self) -> Optional[float]: """ 获取全局默认频率配置 - + Returns: float: 频率值,如果没有配置则返回 None """ for config_item in self.talk_frequency_adjust: if not config_item or len(config_item) < 2: continue - + # 检查是否为全局默认配置(第一个元素为空字符串) if config_item[0] == "": return self._get_time_based_frequency(config_item[1:]) - + return None @@ -313,10 +313,10 @@ class ExpressionConfig(ValidatedConfigBase): def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ 解析流配置字符串并生成对应的 chat_id - + Args: stream_config_str: 格式为 "platform:id:type" 的字符串 - + Returns: str: 生成的 chat_id,如果解析失败则返回 None """ @@ -324,52 +324,52 @@ class ExpressionConfig(ValidatedConfigBase): parts = stream_config_str.split(":") if len(parts) != 3: return None - + platform = parts[0] id_str = parts[1] stream_type = parts[2] - + # 判断是否为群聊 is_group = stream_type == "group" - + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id import hashlib - + if is_group: components = [platform, str(id_str)] else: components = [platform, str(id_str), "private"] key = "_".join(components) return hashlib.md5(key.encode()).hexdigest() - + except (ValueError, IndexError): return None - + def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]: """ 根据聊天流ID获取表达配置 - + Args: chat_stream_id: 聊天流ID,格式为哈希值 - + Returns: tuple: (是否使用表达, 是否学习表达, 学习间隔) """ if not self.rules: # 如果没有配置,使用默认值:启用表达,启用学习,强度1.0 return True, True, 1.0 - + # 优先检查聊天流特定的配置 if chat_stream_id: for rule in self.rules: if rule.chat_stream_id and self._parse_stream_config_to_chat_id(rule.chat_stream_id) == chat_stream_id: return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 检查全局配置(chat_stream_id为空字符串的配置) for rule in self.rules: if rule.chat_stream_id == "": return rule.use_expression, rule.learn_expression, rule.learning_strength - + # 如果都没有匹配,返回默认值 return True, True, 1.0 @@ -443,7 +443,7 @@ class KeywordRuleConfig(ValidatedConfigBase): def __post_init__(self): import re - + if not self.keywords and not self.regex: raise ValueError("关键词规则必须至少包含keywords或regex中的一个") if not self.reaction: @@ -466,6 +466,7 @@ class CustomPromptConfig(ValidatedConfigBase): """自定义提示词配置类""" image_prompt: str = Field(default="", description="图片提示词") + planner_custom_prompt_enable: bool = Field(default=False, description="启用规划器自定义提示词") planner_custom_prompt_content: str = Field(default="", description="规划器自定义提示词内容") diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py index 35416a297..08f5f7098 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py @@ -53,13 +53,12 @@ class AffinityChatter(BaseChatter): } self.last_activity_time = time.time() - async def execute(self, context: StreamContext, unread_messages: list | None = None) -> dict: + async def execute(self, context: StreamContext) -> dict: """ 处理StreamContext对象 Args: context: StreamContext对象,包含聊天流的所有消息信息 - unread_messages: (可选) 指定要处理的未读消息列表,用于并发处理 Returns: 处理结果字典 @@ -69,12 +68,10 @@ class AffinityChatter(BaseChatter): learner = expression_learner_manager.get_expression_learner(self.stream_id) asyncio.create_task(learner.trigger_learning_for_chat()) - # 如果没有提供未读消息列表,则从上下文中获取 - if unread_messages is None: - unread_messages = context.get_unread_messages() + unread_messages = context.get_unread_messages() # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(context=context, unread_messages=unread_messages) + actions, target_message = await self.planner.plan(context=context) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index f484713f2..4793e2835 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -218,6 +218,10 @@ class ChatterPlanFilter: self.last_obs_time_mark = time.time() mentioned_bonus = "" + if global_config.chat.mentioned_bot_inevitable_reply: + mentioned_bonus = "\n- 有人提到你" + if global_config.chat.at_bot_inevitable_reply: + mentioned_bonus = "\n- 有人提到你,或者at你" if plan.mode == ChatMode.FOCUS: no_action_block = """ diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index ddf05b8f4..56211d80e 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -70,15 +70,12 @@ class ChatterActionPlanner: "other_actions_executed": 0, } - async def plan( - self, context: "StreamContext" = None, unread_messages: Optional[List[Dict]] = None - ) -> Tuple[List[Dict], Optional[Dict]]: + async def plan(self, context: "StreamContext" = None) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: context (StreamContext): 包含聊天流消息的上下文对象。 - unread_messages (Optional[List[Dict]]): (可选) 指定要处理的未读消息列表,用于并发处理 Returns: Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: @@ -88,16 +85,14 @@ class ChatterActionPlanner: try: self.planner_stats["total_plans"] += 1 - return await self._enhanced_plan_flow(context, unread_messages) - + return await self._enhanced_plan_flow(context) + except Exception as e: logger.error(f"规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 return [], None - async def _enhanced_plan_flow( - self, context: "StreamContext", unread_messages: Optional[List[Dict]] = None - ) -> Tuple[List[Dict], Optional[Dict]]: + async def _enhanced_plan_flow(self, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]: """执行增强版规划流程""" try: # 在规划前,先进行动作修改 @@ -111,10 +106,7 @@ class ChatterActionPlanner: # 确保Plan中包含所有当前可用的动作 initial_plan.available_actions = self.action_manager.get_using_actions() - # 如果没有提供未读消息列表,则从上下文中获取 - if unread_messages is None: - unread_messages = context.get_unread_messages() if context else [] - + unread_messages = context.get_unread_messages() if context else [] # 2. 兴趣度评分 - 只对未读消息进行评分 if unread_messages: bot_nickname = global_config.bot.nickname diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index e9cc25b9c..c298ecc16 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.0.4" +version = "7.0.2" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -132,13 +132,6 @@ dynamic_distribution_min_interval = 1.0 # 最小分发间隔(秒) dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒) dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子 -# 是否启用并发消息处理,在同一聊天流中并行处理多个消息 -concurrent_message_processing = false -# 在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数 -concurrent_per_user_limit = 3 -# 在并发模式下,是否按用户ID进行独立串行处理 -process_by_user_id = true - talk_frequency_adjust = [ ["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"], ["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], From 99595f239d16ebe641785c94a870b973e449c2cc Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 26 Sep 2025 02:10:43 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat(chat):=20=E5=AE=9E=E7=8E=B0=20focus=5F?= =?UTF-8?q?energy=20=E7=9A=84=E5=AE=9E=E6=97=B6=E6=9B=B4=E6=96=B0=E4=B8=8E?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为了解决 `focus_energy` 更新不及时,导致其无法准确反映当前对话兴趣度的问题,本次提交引入了一套新的实时更新与同步机制。 这确保了每当消息的兴趣度发生变化时,代表机器人注意力的 `focus_energy` 也能被立即重新计算和更新,使决策更加精准。 主要变更: 1. **手动更新**: 在 `ChatStream` 中新增 `update_focus_energy` 方法,允许外部逻辑在需要时手动触发 `focus_energy` 的重新计算。 2. **实时计算**: `ChatterActionPlanner` 在评估并更新消息兴趣度后,会立即调用 `update_focus_energy`,确保了兴趣度到注意力的即时传导。 3. **状态同步**: `ChatterManager` 在完成一次执行后,会主动将 `mood_manager` 中可能已更新的 `chat_stream` 同步回当前的 `StreamContext`,保证了整个处理流中数据的一致性。 --- src/chat/chatter_manager.py | 10 ++++++++++ src/chat/message_receive/chat_stream.py | 4 ++++ src/plugins/built_in/affinity_flow_chatter/planner.py | 4 +++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index c906fd901..be70f4969 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -107,6 +107,16 @@ class ChatterManager: result = await self.instances[stream_id].execute(context) self.stats["successful_executions"] += 1 + # 从 mood_manager 获取最新的 chat_stream 并同步回 StreamContext + try: + from src.mood.mood_manager import mood_manager + mood = mood_manager.get_mood_by_chat_id(stream_id) + if mood and mood.chat_stream: + context.chat_stream = mood.chat_stream + logger.debug(f"已将最新的 chat_stream 同步回流 {stream_id} 的 StreamContext") + except Exception as sync_e: + logger.error(f"同步 chat_stream 回 StreamContext 失败: {sync_e}") + # 记录处理结果 success = result.get("success", False) actions_count = result.get("actions_count", 0) diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 5eff1f493..dad71a41a 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -257,6 +257,10 @@ class ChatStream: self.last_interaction_time = time.time() self.focus_energy = self._calculate_dynamic_focus_energy() + def update_focus_energy(self): + """手动触发更新focus_energy""" + self.focus_energy = self._calculate_dynamic_focus_energy() + def record_action(self, is_reply: bool = False): """记录动作执行""" self.action_count += 1 diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 56211d80e..3c0b20c8f 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -135,7 +135,9 @@ class ChatterActionPlanner: chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id) if hasattr(chat_mood, 'chat_stream') and chat_mood.chat_stream: chat_mood.chat_stream.add_message_interest(score) - logger.debug(f"已更新聊天 {self.chat_id} 的ChatStream兴趣度,分数: {score:.3f}") + # 在这里同步更新 focus_energy + chat_mood.chat_stream.update_focus_energy() + logger.debug(f"已更新聊天 {self.chat_id} 的ChatStream兴趣度和Focus Energy,分数: {score:.3f}") # base_threshold = self.interest_scoring.reply_threshold # 检查兴趣度是否达到非回复动作阈值 From ef8d080228c82ebd9712e937572103f86d8c7864 Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 26 Sep 2025 02:14:48 +0800 Subject: [PATCH 4/4] =?UTF-8?q?refactor(chat):=20=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E6=95=B0=E6=8D=AE=E4=B8=AD=E7=9A=84=20relati?= =?UTF-8?q?onship=5Fscore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chat/message_receive/chat_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index dad71a41a..c59a6bbd7 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -534,7 +534,6 @@ class ChatManager: "action_count": s_data_dict.get("action_count", 0), "reply_count": s_data_dict.get("reply_count", 0), "last_interaction_time": s_data_dict.get("last_interaction_time", time.time()), - "relationship_score": s_data_dict.get("relationship_score", 0.3), "consecutive_no_reply": s_data_dict.get("consecutive_no_reply", 0), }