diff --git a/plugins/napcat_adapter_plugin/plugin.py b/plugins/napcat_adapter_plugin/plugin.py index b5f31e167..48ae8603d 100644 --- a/plugins/napcat_adapter_plugin/plugin.py +++ b/plugins/napcat_adapter_plugin/plugin.py @@ -7,7 +7,6 @@ from . import event_types, CONSTS, event_handlers from typing import List from src.plugin_system import BasePlugin, BaseEventHandler, register_plugin, EventType, ConfigField -from src.plugin_system.base.base_event import HandlerResult from src.plugin_system.core.event_manager import event_manager from src.common.logger import get_logger @@ -37,6 +36,7 @@ def get_classes_in_module(module): classes.append(member) return classes + async def message_recv(server_connection: Server.ServerConnection): await message_handler.set_server_connection(server_connection) asyncio.create_task(notice_handler.set_server_connection(server_connection)) @@ -47,7 +47,7 @@ async def message_recv(server_connection: Server.ServerConnection): try: # 首先尝试解析原始消息 decoded_raw_message: dict = json.loads(raw_message) - + # 检查是否是切片消息 (来自 MMC) if chunker.is_chunk_message(decoded_raw_message): logger.debug("接收到切片消息,尝试重组") @@ -61,14 +61,14 @@ async def message_recv(server_connection: Server.ServerConnection): # 切片尚未完整,继续等待更多切片 logger.debug("等待更多切片...") continue - + # 处理完整消息(可能是重组后的,也可能是原本就完整的) post_type = decoded_raw_message.get("post_type") if post_type in ["meta_event", "message", "notice"]: await message_queue.put(decoded_raw_message) elif post_type is None: await put_response(decoded_raw_message) - + except json.JSONDecodeError as e: logger.error(f"消息解析失败: {e}") logger.debug(f"原始消息: {raw_message[:500]}...") @@ -76,6 +76,7 @@ async def message_recv(server_connection: Server.ServerConnection): logger.error(f"处理消息时出错: {e}") logger.debug(f"原始消息: {raw_message[:500]}...") + async def message_process(): """消息处理主循环""" logger.info("消息处理器已启动") @@ -84,7 +85,7 @@ async def message_process(): try: # 使用超时等待,以便能够响应取消请求 message = await asyncio.wait_for(message_queue.get(), timeout=1.0) - + post_type = message.get("post_type") if post_type == "message": await message_handler.handle_raw_message(message) @@ -94,10 +95,10 @@ async def message_process(): await notice_handler.handle_notice(message) else: logger.warning(f"未知的post_type: {post_type}") - + message_queue.task_done() await asyncio.sleep(0.05) - + except asyncio.TimeoutError: # 超时是正常的,继续循环 continue @@ -112,7 +113,7 @@ async def message_process(): except ValueError: pass await asyncio.sleep(0.1) - + except asyncio.CancelledError: logger.info("消息处理器已停止") raise @@ -132,6 +133,7 @@ async def message_process(): except Exception as e: logger.debug(f"清理消息队列时出错: {e}") + async def napcat_server(): """启动 Napcat WebSocket 连接(支持正向和反向连接)""" mode = global_config.napcat_server.mode @@ -143,63 +145,61 @@ async def napcat_server(): logger.error(f"启动 WebSocket 连接失败: {e}") raise + async def graceful_shutdown(): """优雅关闭所有组件""" try: logger.info("正在关闭adapter...") - + # 停止消息重组器的清理任务 try: await reassembler.stop_cleanup_task() except Exception as e: logger.warning(f"停止消息重组器清理任务时出错: {e}") - + # 停止功能管理器文件监控 try: await features_manager.stop_file_watcher() except Exception as e: logger.warning(f"停止功能管理器文件监控时出错: {e}") - + # 关闭消息处理器(包括消息缓冲器) try: await message_handler.shutdown() except Exception as e: logger.warning(f"关闭消息处理器时出错: {e}") - + # 关闭 WebSocket 连接 try: await websocket_manager.stop_connection() except Exception as e: logger.warning(f"关闭WebSocket连接时出错: {e}") - + # 关闭 MaiBot 连接 try: await mmc_stop_com() except Exception as e: logger.warning(f"关闭MaiBot连接时出错: {e}") - + # 取消所有剩余任务 current_task = asyncio.current_task() tasks = [t for t in asyncio.all_tasks() if t is not current_task and not t.done()] - + if tasks: logger.info(f"正在取消 {len(tasks)} 个剩余任务...") for task in tasks: task.cancel() - + # 等待任务取消完成,忽略 CancelledError try: - await asyncio.wait_for( - asyncio.gather(*tasks, return_exceptions=True), - timeout=10 - ) + await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=10) except asyncio.TimeoutError: logger.warning("部分任务取消超时") except Exception as e: logger.debug(f"任务取消过程中的异常(可忽略): {e}") - + logger.info("Adapter已成功关闭") - + except Exception as e: logger.error(f"Adapter关闭中出现错误: {e}") finally: @@ -214,6 +214,7 @@ async def graceful_shutdown(): except Exception: pass + class LauchNapcatAdapterHandler(BaseEventHandler): """自动启动Adapter""" @@ -245,6 +246,7 @@ class LauchNapcatAdapterHandler(BaseEventHandler): asyncio.create_task(message_process()) asyncio.create_task(check_timeout_response()) + class StopNapcatAdapterHandler(BaseEventHandler): """关闭Adapter""" @@ -257,7 +259,7 @@ class StopNapcatAdapterHandler(BaseEventHandler): async def execute(self, kwargs): await graceful_shutdown() return - + @register_plugin class NapcatAdapterPlugin(BasePlugin): @@ -295,7 +297,7 @@ class NapcatAdapterPlugin(BasePlugin): def get_plugin_components(self): self.register_events() - + components = [] components.append((LauchNapcatAdapterHandler.get_handler_info(), LauchNapcatAdapterHandler)) components.append((StopNapcatAdapterHandler.get_handler_info(), StopNapcatAdapterHandler)) diff --git a/plugins/napcat_adapter_plugin/src/config/official_configs.py b/plugins/napcat_adapter_plugin/src/config/official_configs.py index ba2c19e12..35bac1d57 100644 --- a/plugins/napcat_adapter_plugin/src/config/official_configs.py +++ b/plugins/napcat_adapter_plugin/src/config/official_configs.py @@ -58,14 +58,16 @@ class VoiceConfig(ConfigBase): use_tts: bool = False """是否启用TTS功能""" + @dataclass class SlicingConfig(ConfigBase): max_frame_size: int = 64 """WebSocket帧的最大大小,单位为字节,默认64KB""" - + delay_ms: int = 10 """切片发送间隔时间,单位为毫秒""" - + + @dataclass class DebugConfig(ConfigBase): level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" diff --git a/plugins/napcat_adapter_plugin/src/message_chunker.py b/plugins/napcat_adapter_plugin/src/message_chunker.py index 9ba27f3c0..f4e150711 100644 --- a/plugins/napcat_adapter_plugin/src/message_chunker.py +++ b/plugins/napcat_adapter_plugin/src/message_chunker.py @@ -3,6 +3,7 @@ 用于在 Ada 发送给 MMC 时进行消息切片,利用 WebSocket 协议的自动重组特性 仅在 Ada -> MMC 方向进行切片,其他方向(MMC -> Ada,Ada <-> Napcat)不切片 """ + import json import uuid import asyncio @@ -15,10 +16,9 @@ from src.common.logger import get_logger logger = get_logger("napcat_adapter") - class MessageChunker: """消息切片器,用于处理大消息的分片发送""" - + def __init__(self): self.max_chunk_size = global_config.slicing.max_frame_size * 1024 @@ -29,19 +29,21 @@ class MessageChunker: message_str = json.dumps(message, ensure_ascii=False) else: message_str = message - return len(message_str.encode('utf-8')) > self.max_chunk_size + return len(message_str.encode("utf-8")) > self.max_chunk_size except Exception as e: logger.error(f"检查消息大小时出错: {e}") return False - - def chunk_message(self, message: Union[str, Dict[str, Any]], chunk_id: Optional[str] = None) -> List[Dict[str, Any]]: + + def chunk_message( + self, message: Union[str, Dict[str, Any]], chunk_id: Optional[str] = None + ) -> List[Dict[str, Any]]: """ 将消息切片 - + Args: message: 要切片的消息(字符串或字典) chunk_id: 切片组ID,如果不提供则自动生成 - + Returns: 切片后的消息字典列表 """ @@ -51,30 +53,30 @@ class MessageChunker: message_str = json.dumps(message, ensure_ascii=False) else: message_str = message - + if not self.should_chunk_message(message_str): # 不需要切片的情况,如果输入是字典则返回字典,如果是字符串则包装成非切片标记的字典 if isinstance(message, dict): return [message] else: return [{"_original_message": message_str}] - + if chunk_id is None: chunk_id = str(uuid.uuid4()) - - message_bytes = message_str.encode('utf-8') + + message_bytes = message_str.encode("utf-8") total_size = len(message_bytes) - + # 计算需要多少个切片 num_chunks = (total_size + self.max_chunk_size - 1) // self.max_chunk_size - + chunks = [] for i in range(num_chunks): start_pos = i * self.max_chunk_size end_pos = min(start_pos + self.max_chunk_size, total_size) - + chunk_data = message_bytes[start_pos:end_pos] - + # 构建切片消息 chunk_message = { "__mmc_chunk_info__": { @@ -83,17 +85,17 @@ class MessageChunker: "total_chunks": num_chunks, "chunk_size": len(chunk_data), "total_size": total_size, - "timestamp": time.time() + "timestamp": time.time(), }, - "__mmc_chunk_data__": chunk_data.decode('utf-8', errors='ignore'), - "__mmc_is_chunked__": True + "__mmc_chunk_data__": chunk_data.decode("utf-8", errors="ignore"), + "__mmc_is_chunked__": True, } - + chunks.append(chunk_message) - + logger.debug(f"消息切片完成: {total_size} bytes -> {num_chunks} chunks (ID: {chunk_id})") return chunks - + except Exception as e: logger.error(f"消息切片时出错: {e}") # 出错时返回原消息 @@ -101,7 +103,7 @@ class MessageChunker: return [message] else: return [{"_original_message": message}] - + def is_chunk_message(self, message: Union[str, Dict[str, Any]]) -> bool: """判断是否是切片消息""" try: @@ -109,12 +111,12 @@ class MessageChunker: data = json.loads(message) else: data = message - + return ( - isinstance(data, dict) and - "__mmc_chunk_info__" in data and - "__mmc_chunk_data__" in data and - "__mmc_is_chunked__" in data + isinstance(data, dict) + and "__mmc_chunk_info__" in data + and "__mmc_chunk_data__" in data + and "__mmc_is_chunked__" in data ) except (json.JSONDecodeError, TypeError): return False @@ -122,17 +124,17 @@ class MessageChunker: class MessageReassembler: """消息重组器,用于重组接收到的切片消息""" - + def __init__(self, timeout: int = 30): self.timeout = timeout self.chunk_buffers: Dict[str, Dict[str, Any]] = {} self._cleanup_task = None - + async def start_cleanup_task(self): """启动清理任务""" if self._cleanup_task is None: self._cleanup_task = asyncio.create_task(self._cleanup_expired_chunks()) - + async def stop_cleanup_task(self): """停止清理任务""" if self._cleanup_task: @@ -142,35 +144,35 @@ class MessageReassembler: except asyncio.CancelledError: pass self._cleanup_task = None - + async def _cleanup_expired_chunks(self): """清理过期的切片缓冲区""" while True: try: await asyncio.sleep(10) # 每10秒检查一次 current_time = time.time() - + expired_chunks = [] for chunk_id, buffer_info in self.chunk_buffers.items(): - if current_time - buffer_info['timestamp'] > self.timeout: + if current_time - buffer_info["timestamp"] > self.timeout: expired_chunks.append(chunk_id) - + for chunk_id in expired_chunks: logger.warning(f"清理过期的切片缓冲区: {chunk_id}") del self.chunk_buffers[chunk_id] - + except asyncio.CancelledError: break except Exception as e: logger.error(f"清理过期切片时出错: {e}") - + async def add_chunk(self, message: Union[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ 添加切片,如果切片完整则返回重组后的消息 - + Args: message: 切片消息(字符串或字典) - + Returns: 如果切片完整则返回重组后的原始消息字典,否则返回None """ @@ -180,7 +182,7 @@ class MessageReassembler: chunk_data = json.loads(message) else: chunk_data = message - + # 检查是否是切片消息 if not chunker.is_chunk_message(chunk_data): # 不是切片消息,直接返回 @@ -192,38 +194,38 @@ class MessageReassembler: return {"text_message": chunk_data["_original_message"]} else: return chunk_data - + chunk_info = chunk_data["__mmc_chunk_info__"] chunk_content = chunk_data["__mmc_chunk_data__"] - + chunk_id = chunk_info["chunk_id"] chunk_index = chunk_info["chunk_index"] total_chunks = chunk_info["total_chunks"] chunk_timestamp = chunk_info.get("timestamp", time.time()) - + # 初始化缓冲区 if chunk_id not in self.chunk_buffers: self.chunk_buffers[chunk_id] = { "chunks": {}, "total_chunks": total_chunks, "received_chunks": 0, - "timestamp": chunk_timestamp + "timestamp": chunk_timestamp, } - + buffer = self.chunk_buffers[chunk_id] - + # 检查切片是否已经接收过 if chunk_index in buffer["chunks"]: logger.warning(f"重复接收切片: {chunk_id}#{chunk_index}") return None - + # 添加切片 buffer["chunks"][chunk_index] = chunk_content buffer["received_chunks"] += 1 buffer["timestamp"] = time.time() # 更新时间戳 - + logger.debug(f"接收切片: {chunk_id}#{chunk_index} ({buffer['received_chunks']}/{total_chunks})") - + # 检查是否接收完整 if buffer["received_chunks"] == total_chunks: # 重组消息 @@ -233,25 +235,25 @@ class MessageReassembler: logger.error(f"切片 {chunk_id}#{i} 缺失,无法重组") return None reassembled_message += buffer["chunks"][i] - + # 清理缓冲区 del self.chunk_buffers[chunk_id] - + logger.debug(f"消息重组完成: {chunk_id} ({len(reassembled_message)} chars)") - + # 尝试反序列化重组后的消息 try: return json.loads(reassembled_message) except json.JSONDecodeError: # 如果不能反序列化为JSON,则作为文本消息返回 return {"text_message": reassembled_message} - + return None - + except (json.JSONDecodeError, KeyError, TypeError) as e: logger.error(f"处理切片消息时出错: {e}") return None - + def get_pending_chunks_info(self) -> Dict[str, Any]: """获取待处理切片信息""" info = {} @@ -260,11 +262,11 @@ class MessageReassembler: "received": buffer["received_chunks"], "total": buffer["total_chunks"], "progress": f"{buffer['received_chunks']}/{buffer['total_chunks']}", - "age_seconds": time.time() - buffer["timestamp"] + "age_seconds": time.time() - buffer["timestamp"], } return info # 全局实例 chunker = MessageChunker() -reassembler = MessageReassembler() \ No newline at end of file +reassembler = MessageReassembler() diff --git a/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py b/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py index aad211106..1bd34ceac 100644 --- a/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py +++ b/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py @@ -743,31 +743,31 @@ class MessageHandler: """ message_data: dict = raw_message.get("data", {}) json_data = message_data.get("data", "") - + # 检查JSON消息格式 if not message_data or "data" not in message_data: logger.warning("JSON消息格式不正确") return Seg(type="json", data=json.dumps(message_data)) - + try: nested_data = json.loads(json_data) - + # 检查是否是QQ小程序分享消息 if "app" in nested_data and "com.tencent.miniapp" in str(nested_data.get("app", "")): logger.debug("检测到QQ小程序分享消息,开始提取信息") - + # 提取目标字段 extracted_info = {} - + # 提取 meta.detail_1 中的信息 meta = nested_data.get("meta", {}) detail_1 = meta.get("detail_1", {}) - + if detail_1: extracted_info["title"] = detail_1.get("title", "") extracted_info["desc"] = detail_1.get("desc", "") qqdocurl = detail_1.get("qqdocurl", "") - + # 从qqdocurl中提取b23.tv短链接 if qqdocurl and "b23.tv" in qqdocurl: # 查找b23.tv链接的起始位置 @@ -785,26 +785,29 @@ class MessageHandler: extracted_info["short_url"] = qqdocurl else: extracted_info["short_url"] = qqdocurl - + # 如果成功提取到关键信息,返回格式化的文本 if extracted_info.get("title") or extracted_info.get("desc") or extracted_info.get("short_url"): content_parts = [] - + if extracted_info.get("title"): content_parts.append(f"来源: {extracted_info['title']}") - + if extracted_info.get("desc"): content_parts.append(f"标题: {extracted_info['desc']}") - + if extracted_info.get("short_url"): content_parts.append(f"链接: {extracted_info['short_url']}") - + formatted_content = "\n".join(content_parts) - return Seg(type="text", data=f"这是一条小程序分享消息,可以根据来源,考虑使用对应解析工具\n{formatted_content}") - + return Seg( + type="text", + data=f"这是一条小程序分享消息,可以根据来源,考虑使用对应解析工具\n{formatted_content}", + ) + # 如果没有提取到关键信息,返回None return None - + except json.JSONDecodeError as e: logger.error(f"解析JSON消息失败: {e}") return None diff --git a/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py b/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py index b08b8a77e..653fe5444 100644 --- a/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py +++ b/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py @@ -1,4 +1,3 @@ -import json import asyncio from src.common.logger import get_logger @@ -28,36 +27,36 @@ class MessageSending: try: # 检查是否需要切片发送 message_dict = message_base.to_dict() - + if chunker.should_chunk_message(message_dict): - logger.info(f"消息过大,进行切片发送到 MaiBot") - + logger.info("消息过大,进行切片发送到 MaiBot") + # 切片消息 chunks = chunker.chunk_message(message_dict) - + # 逐个发送切片 for i, chunk in enumerate(chunks): - logger.debug(f"发送切片 {i+1}/{len(chunks)} 到 MaiBot") - + logger.debug(f"发送切片 {i + 1}/{len(chunks)} 到 MaiBot") + # 获取对应的客户端并发送切片 platform = message_base.message_info.platform if platform not in self.maibot_router.clients: logger.error(f"平台 {platform} 未连接") return False - + client = self.maibot_router.clients[platform] send_status = await client.send_message(chunk) - + if not send_status: - logger.error(f"发送切片 {i+1}/{len(chunks)} 失败") + logger.error(f"发送切片 {i + 1}/{len(chunks)} 失败") return False - + # 使用配置中的延迟时间 if i < len(chunks) - 1: delay_seconds = global_config.slicing.delay_ms / 1000.0 logger.debug(f"切片发送延迟: {global_config.slicing.delay_ms}毫秒") await asyncio.sleep(delay_seconds) - + logger.debug("所有切片发送完成") return True else: @@ -66,7 +65,7 @@ class MessageSending: if not send_status: raise RuntimeError("可能是路由未正确配置或连接异常") return send_status - + except Exception as e: logger.error(f"发送消息失败: {str(e)}") logger.error("请检查与MaiBot之间的连接") diff --git a/src/chat/chat_loop/cycle_processor.py b/src/chat/chat_loop/cycle_processor.py index b35905361..b446c697a 100644 --- a/src/chat/chat_loop/cycle_processor.py +++ b/src/chat/chat_loop/cycle_processor.py @@ -5,7 +5,6 @@ import math import random from typing import Optional, Dict, Any, Tuple -from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.utils.timer_calculator import Timer from src.common.logger import get_logger from src.config.config import global_config @@ -32,7 +31,7 @@ class CycleProcessor: context: HFC聊天上下文对象,包含聊天流、能量值等信息 response_handler: 响应处理器,负责生成和发送回复 cycle_tracker: 循环跟踪器,负责记录和管理每次思考循环的信息 - """ + """ self.context = context self.response_handler = response_handler self.cycle_tracker = cycle_tracker @@ -57,12 +56,12 @@ class CycleProcessor: # 存储reply action信息 person_info_manager = get_person_info_manager() - + # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 platform = action_message.get("chat_info_platform") if platform is None: platform = getattr(self.context.chat_stream, "platform", "unknown") - + person_id = person_info_manager.get_person_id( platform, action_message.get("user_id", ""), @@ -94,8 +93,8 @@ class CycleProcessor: } return loop_info, reply_text, cycle_timers - - async def observe(self,interest_value:float = 0.0) -> bool: + + async def observe(self, interest_value: float = 0.0) -> str: """ 观察和处理单次思考循环的核心方法 @@ -114,7 +113,7 @@ class CycleProcessor: """ action_type = "no_action" reply_text = "" # 初始化reply_text变量,避免UnboundLocalError - + # 使用sigmoid函数将interest_value转换为概率 # 当interest_value为0时,概率接近0(使用Focus模式) # 当interest_value很高时,概率接近1(使用Normal模式) @@ -127,16 +126,24 @@ class CycleProcessor: k = 2.0 # 控制曲线陡峭程度 x0 = 1.0 # 控制曲线中心点 return 1.0 / (1.0 + math.exp(-k * (interest_val - x0))) - - normal_mode_probability = calculate_normal_mode_probability(interest_value) * 0.5 / global_config.chat.get_current_talk_frequency(self.context.stream_id) - + + normal_mode_probability = ( + calculate_normal_mode_probability(interest_value) + * 0.5 + / global_config.chat.get_current_talk_frequency(self.context.stream_id) + ) + # 根据概率决定使用哪种模式 if random.random() < normal_mode_probability: mode = ChatMode.NORMAL - logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Normal planner模式") + logger.info( + f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Normal planner模式" + ) else: mode = ChatMode.FOCUS - logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Focus planner模式") + logger.info( + f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f},选择Focus planner模式" + ) cycle_timers, thinking_id = self.cycle_tracker.start_cycle() logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考") @@ -165,12 +172,14 @@ class CycleProcessor: from src.plugin_system.core.event_manager import event_manager from src.plugin_system import EventType - result = await event_manager.trigger_event(EventType.ON_PLAN,plugin_name="SYSTEM", stream_id=self.context.chat_stream) + result = await event_manager.trigger_event( + EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.chat_stream + ) if not result.all_continue_process(): raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成") - + with Timer("规划器", cycle_timers): - actions, _= await self.action_planner.plan( + actions, _ = await self.action_planner.plan( mode=mode, loop_start_time=loop_start_time, available_actions=available_actions, @@ -183,7 +192,7 @@ class CycleProcessor: # 直接处理no_reply逻辑,不再通过动作系统 reason = action_info.get("reasoning", "选择不回复") logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}") - + # 存储no_reply信息到数据库 await database_api.store_action_info( chat_stream=self.context.chat_stream, @@ -194,13 +203,8 @@ class CycleProcessor: action_data={"reason": reason}, action_name="no_reply", ) - - return { - "action_type": "no_reply", - "success": True, - "reply_text": "", - "command": "" - } + + return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} elif action_info["action_type"] != "reply": # 执行普通动作 with Timer("动作执行", cycle_timers): @@ -210,40 +214,32 @@ class CycleProcessor: action_info["action_data"], cycle_timers, thinking_id, - action_info["action_message"] + action_info["action_message"], ) return { "action_type": action_info["action_type"], "success": success, "reply_text": reply_text, - "command": command + "command": command, } else: try: success, response_set, _ = await generator_api.generate_reply( chat_stream=self.context.chat_stream, - reply_message = action_info["action_message"], + reply_message=action_info["action_message"], available_actions=available_actions, enable_tool=global_config.tool.enable_tool, request_type="chat.replyer", from_plugin=False, - ) + ) if not success or not response_set: - logger.info(f"对 {action_info['action_message'].get('processed_plain_text')} 的回复生成失败") - return { - "action_type": "reply", - "success": False, - "reply_text": "", - "loop_info": None - } + logger.info( + f"对 {action_info['action_message'].get('processed_plain_text')} 的回复生成失败" + ) + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} except asyncio.CancelledError: logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消") - return { - "action_type": "reply", - "success": False, - "reply_text": "", - "loop_info": None - } + return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None} loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply( response_set, @@ -253,12 +249,7 @@ class CycleProcessor: thinking_id, actions, ) - return { - "action_type": "reply", - "success": True, - "reply_text": reply_text, - "loop_info": loop_info - } + return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info} except Exception as e: logger.error(f"{self.log_prefix} 执行动作时出错: {e}") logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}") @@ -267,9 +258,9 @@ class CycleProcessor: "success": False, "reply_text": "", "loop_info": None, - "error": str(e) + "error": str(e), } - + # 创建所有动作的后台任务 action_tasks = [asyncio.create_task(execute_action(action)) for action in actions] @@ -282,12 +273,12 @@ class CycleProcessor: action_success = False action_reply_text = "" action_command = "" - + for i, result in enumerate(results): if isinstance(result, BaseException): logger.error(f"{self.log_prefix} 动作执行异常: {result}") continue - + action_info = actions[i] if result["action_type"] != "reply": action_success = result["success"] @@ -327,7 +318,7 @@ class CycleProcessor: }, } reply_text = action_reply_text - + if ENABLE_S4U: await stop_typing() @@ -335,226 +326,7 @@ class CycleProcessor: self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers) action_type = actions[0]["action_type"] if actions else "no_action" - # 管理no_reply计数器:当执行了非no_reply动作时,重置计数器 - if action_type != "no_reply": - # no_reply逻辑已集成到heartFC_chat.py中,直接重置计数器 - self.context.chat_instance.recent_interest_records.clear() - self.context.no_reply_consecutive = 0 - logger.debug(f"{self.log_prefix} 执行了{action_type}动作,重置no_reply计数器") - return True - - if action_type == "no_reply": - self.context.no_reply_consecutive += 1 - self.context.chat_instance._determine_form_type() - - # 在一轮动作执行完毕后,增加睡眠压力 - if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system: - if action_type not in ["no_reply", "no_action"]: - self.context.energy_manager.increase_sleep_pressure() - - return True - - async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]): - """ - 执行一个已经制定好的计划 - """ - action_type = action_result.get("action_type", "error") - - # 这里我们需要为执行计划创建一个新的循环追踪 - cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True) - loop_start_time = time.time() - - if action_type == "reply": - # 主动思考不应该直接触发简单回复,但为了逻辑完整性,我们假设它会调用response_handler - # 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理 - await self._handle_reply_action( - target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result} - ) - else: - await self._handle_other_actions( - action_type, - action_result.get("reasoning", ""), - action_result.get("action_data", {}), - action_result.get("is_parallel", False), - None, - target_message, - cycle_timers, - thinking_id, - {"action_result": action_result}, - loop_start_time, - ) - - async def _handle_reply_action( - self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result - ): - """ - 处理回复类型的动作 - - Args: - message_data: 消息数据 - available_actions: 可用动作列表 - gen_task: 预先创建的生成任务(可能为None) - loop_start_time: 循环开始时间 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - - 功能说明: - - 根据聊天模式决定是否使用预生成的回复或实时生成 - - 在NORMAL模式下使用异步生成提高效率 - - 在FOCUS模式下同步生成确保及时响应 - - 发送生成的回复并结束循环 - """ - # 初始化reply_to_str以避免UnboundLocalError - reply_to_str = None - - if self.context.loop_mode == ChatMode.NORMAL: - if not gen_task: - reply_to_str = await self._build_reply_to_str(message_data) - gen_task = asyncio.create_task( - self.response_handler.generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.normal", - ) - ) - else: - # 如果gen_task已存在但reply_to_str还未构建,需要构建它 - if reply_to_str is None: - reply_to_str = await self._build_reply_to_str(message_data) - - try: - response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout) - except asyncio.TimeoutError: - response_set = None - else: - reply_to_str = await self._build_reply_to_str(message_data) - response_set = await self.response_handler.generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.focus", - ) - - if response_set: - loop_info, _, _ = await self.response_handler.generate_and_send_reply( - response_set, reply_to_str, loop_start_time, message_data, cycle_timers, thinking_id, plan_result - ) - self.cycle_tracker.end_cycle(loop_info, cycle_timers) - - async def _handle_other_actions( - self, - action_type, - reasoning, - action_data, - is_parallel, - gen_task, - action_message, - cycle_timers, - thinking_id, - plan_result, - loop_start_time, - ): - """ - 处理非回复类型的动作(如no_reply、自定义动作等) - - Args: - action_type: 动作类型 - reasoning: 动作理由 - action_data: 动作数据 - is_parallel: 是否并行执行 - gen_task: 生成任务 - action_message: 动作消息 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - loop_start_time: 循环开始时间 - - 功能说明: - - 在NORMAL模式下可能并行执行回复生成和动作处理 - - 等待所有异步任务完成 - - 整合回复和动作的执行结果 - - 构建最终循环信息并结束循环 - """ - background_reply_task = None - if self.context.loop_mode == ChatMode.NORMAL and is_parallel and gen_task: - background_reply_task = asyncio.create_task( - self._handle_parallel_reply( - gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ) - ) - - background_action_task = asyncio.create_task( - self._handle_action(action_type, reasoning, action_data, cycle_timers, thinking_id, action_message) - ) - - reply_loop_info, action_success, action_reply_text, action_command = None, False, "", "" - - if background_reply_task: - results = await asyncio.gather(background_reply_task, background_action_task, return_exceptions=True) - reply_result, action_result_val = results - if not isinstance(reply_result, BaseException) and reply_result is not None: - reply_loop_info, _, _ = reply_result - else: - reply_loop_info = None - - if not isinstance(action_result_val, BaseException) and action_result_val is not None: - action_success, action_reply_text, action_command = action_result_val - else: - action_success, action_reply_text, action_command = False, "", "" - else: - results = await asyncio.gather(background_action_task, return_exceptions=True) - if results and len(results) > 0: - action_result_val = results[0] # Get the actual result from the tuple - else: - action_result_val = (False, "", "") - - if not isinstance(action_result_val, BaseException) and action_result_val is not None: - action_success, action_reply_text, action_command = action_result_val - else: - action_success, action_reply_text, action_command = False, "", "" - - loop_info = self._build_final_loop_info( - reply_loop_info, action_success, action_reply_text, action_command, plan_result - ) - self.cycle_tracker.end_cycle(loop_info, cycle_timers) - - async def _handle_parallel_reply( - self, gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ): - """ - 处理并行回复生成 - - Args: - gen_task: 回复生成任务 - loop_start_time: 循环开始时间 - action_message: 动作消息 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - - Returns: - tuple: (循环信息, 回复文本, 计时器信息) 或 None - - 功能说明: - - 等待并行回复生成任务完成(带超时) - - 构建回复目标字符串 - - 发送生成的回复 - - 返回循环信息供上级方法使用 - """ - try: - response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout) - except asyncio.TimeoutError: - return None, "", {} - - if not response_set: - return None, "", {} - - reply_to_str = await self._build_reply_to_str(action_message) - return await self.response_handler.generate_and_send_reply( - response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ) + return action_type async def _handle_action( self, action, reasoning, action_data, cycle_timers, thinking_id, action_message @@ -603,12 +375,12 @@ class CycleProcessor: if "reply" in available_actions: fallback_action = "reply" elif available_actions: - fallback_action = list(available_actions.keys())[0] + fallback_action = list(available_actions.keys()) if fallback_action and fallback_action != action: logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}") action_handler = self.context.action_manager.create_action( - action_name=fallback_action, + action_name=fallback_action if isinstance(fallback_action, list) else fallback_action, action_data=action_data, reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", cycle_timers=cycle_timers, @@ -628,43 +400,3 @@ class CycleProcessor: logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}") traceback.print_exc() return False, "", "" - - def _build_final_loop_info(self, reply_loop_info, action_success, action_reply_text, action_command, plan_result): - """ - 构建最终的循环信息 - - Args: - reply_loop_info: 回复循环信息(可能为None) - action_success: 动作执行是否成功 - action_reply_text: 动作回复文本 - action_command: 动作命令 - plan_result: 规划结果 - - Returns: - dict: 完整的循环信息,包含规划信息和动作信息 - - 功能说明: - - 如果有回复循环信息,则在其基础上添加动作信息 - - 如果没有回复信息,则创建新的循环信息结构 - - 整合所有执行结果供循环跟踪器记录 - """ - if reply_loop_info: - loop_info = reply_loop_info - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "command": action_command, - "taken_time": time.time(), - } - ) - else: - loop_info = { - "loop_plan_info": {"action_result": plan_result.get("action_result", {})}, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "command": action_command, - "taken_time": time.time(), - }, - } - return loop_info diff --git a/src/chat/chat_loop/cycle_tracker.py b/src/chat/chat_loop/cycle_tracker.py index 2647bb7c6..9f276383b 100644 --- a/src/chat/chat_loop/cycle_tracker.py +++ b/src/chat/chat_loop/cycle_tracker.py @@ -91,25 +91,24 @@ class CycleTracker: # 获取动作类型,兼容新旧格式 action_type = "未知动作" - if hasattr(self, '_current_cycle_detail') and self._current_cycle_detail: + if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail: loop_plan_info = self._current_cycle_detail.loop_plan_info if isinstance(loop_plan_info, dict): - action_result = loop_plan_info.get('action_result', {}) + action_result = loop_plan_info.get("action_result", {}) if isinstance(action_result, dict): # 旧格式:action_result是字典 - action_type = action_result.get('action_type', '未知动作') + action_type = action_result.get("action_type", "未知动作") elif isinstance(action_result, list) and action_result: # 新格式:action_result是actions列表 - action_type = action_result[0].get('action_type', '未知动作') + action_type = action_result[0].get("action_type", "未知动作") elif isinstance(loop_plan_info, list) and loop_plan_info: # 直接是actions列表的情况 - action_type = loop_plan_info[0].get('action_type', '未知动作') + action_type = loop_plan_info[0].get("action_type", "未知动作") if self.context.current_cycle_detail.end_time and self.context.current_cycle_detail.start_time: duration = self.context.current_cycle_detail.end_time - self.context.current_cycle_detail.start_time logger.info( f"{self.context.log_prefix} 第{self.context.current_cycle_detail.cycle_id}次思考," f"耗时: {duration:.1f}秒, " - f"选择动作: {action_type}" - + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") + f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "") ) diff --git a/src/chat/chat_loop/energy_manager.py b/src/chat/chat_loop/energy_manager.py index 9358664a0..cc3cf8d0e 100644 --- a/src/chat/chat_loop/energy_manager.py +++ b/src/chat/chat_loop/energy_manager.py @@ -3,7 +3,6 @@ import time from typing import Optional from src.common.logger import get_logger from src.config.config import global_config -from src.plugin_system.base.component_types import ChatMode from .hfc_context import HfcContext from src.schedule.schedule_manager import schedule_manager diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py index 41b072332..037ad620e 100644 --- a/src/chat/chat_loop/heartFC_chat.py +++ b/src/chat/chat_loop/heartFC_chat.py @@ -2,24 +2,24 @@ import asyncio import time import traceback import random -from typing import Optional, List, Dict, Any, Tuple +from typing import Optional, List, Dict, Any from collections import deque from src.common.logger import get_logger from src.config.config import global_config from src.person_info.relationship_builder_manager import relationship_builder_manager from src.chat.express.expression_learner import expression_learner_manager -from src.plugin_system.base.component_types import ChatMode from src.schedule.schedule_manager import schedule_manager, SleepState from src.plugin_system.apis import message_api from .hfc_context import HfcContext from .energy_manager import EnergyManager -from .proactive_thinker import ProactiveThinker +from .proactive.proactive_thinker import ProactiveThinker from .cycle_processor import CycleProcessor from .response_handler import ResponseHandler from .cycle_tracker import CycleTracker -from .wakeup_manager import WakeUpManager +from .sleep_manager.wakeup_manager import WakeUpManager +from .proactive.events import ProactiveTriggerEvent logger = get_logger("hfc") @@ -54,7 +54,8 @@ class HeartFChatting: self.context.chat_instance = self self._loop_task: Optional[asyncio.Task] = None - + self._proactive_monitor_task: Optional[asyncio.Task] = None + # 记录最近3次的兴趣度 self.recent_interest_records: deque = deque(maxlen=3) self._initialize_chat_mode() @@ -93,8 +94,12 @@ class HeartFChatting: self.context.relationship_builder = relationship_builder_manager.get_or_create_builder(self.context.stream_id) self.context.expression_learner = expression_learner_manager.get_expression_learner(self.context.stream_id) - #await self.energy_manager.start() - await self.proactive_thinker.start() + # 启动主动思考监视器 + if global_config.chat.enable_proactive_thinking: + self._proactive_monitor_task = asyncio.create_task(self._proactive_monitor_loop()) + self._proactive_monitor_task.add_done_callback(self._handle_proactive_monitor_completion) + logger.info(f"{self.context.log_prefix} 主动思考监视器已启动") + await self.wakeup_manager.start() self._loop_task = asyncio.create_task(self._main_chat_loop()) @@ -116,8 +121,12 @@ class HeartFChatting: return self.context.running = False - #await self.energy_manager.stop() - await self.proactive_thinker.stop() + # 停止主动思考监视器 + if self._proactive_monitor_task and not self._proactive_monitor_task.done(): + self._proactive_monitor_task.cancel() + await asyncio.sleep(0) + logger.info(f"{self.context.log_prefix} 主动思考监视器已停止") + await self.wakeup_manager.stop() if self._loop_task and not self._loop_task.done(): @@ -147,6 +156,151 @@ class HeartFChatting: except asyncio.CancelledError: logger.info(f"{self.context.log_prefix} HeartFChatting: 结束了聊天") + def _handle_proactive_monitor_completion(self, task: asyncio.Task): + """ + 处理主动思考监视器任务完成 + + Args: + task: 完成的异步任务对象 + + 功能说明: + - 处理任务异常完成的情况 + - 记录任务正常结束或被取消的日志 + """ + try: + if exception := task.exception(): + logger.error(f"{self.context.log_prefix} 主动思考监视器异常: {exception}") + else: + logger.info(f"{self.context.log_prefix} 主动思考监视器正常结束") + except asyncio.CancelledError: + logger.info(f"{self.context.log_prefix} 主动思考监视器被取消") + + async def _proactive_monitor_loop(self): + """ + 主动思考监视器循环 + + 功能说明: + - 定期检查是否需要进行主动思考 + - 计算聊天沉默时间,并与动态思考间隔比较 + - 当沉默时间超过阈值时,触发主动思考 + - 处理思考过程中的异常 + """ + while self.context.running: + await asyncio.sleep(15) + + if not self._should_enable_proactive_thinking(): + continue + + current_time = time.time() + silence_duration = current_time - self.context.last_message_time + target_interval = self._get_dynamic_thinking_interval() + + if silence_duration >= target_interval: + try: + formatted_time = self._format_duration(silence_duration) + event = ProactiveTriggerEvent( + source="silence_monitor", + reason=f"聊天已沉默 {formatted_time}", + metadata={"silence_duration": silence_duration}, + ) + await self.proactive_thinker.think(event) + self.context.last_message_time = current_time + except Exception as e: + logger.error(f"{self.context.log_prefix} 主动思考触发执行出错: {e}") + logger.error(traceback.format_exc()) + + def _should_enable_proactive_thinking(self) -> bool: + """ + 判断是否应启用主动思考 + + Returns: + bool: 如果应启用主动思考则返回True,否则返回False + + 功能说明: + - 检查全局配置和特定聊天设置 + - 支持按群聊和私聊分别配置 + - 支持白名单模式,只在特定聊天中启用 + """ + if not self.context.chat_stream: + return False + + is_group_chat = self.context.chat_stream.group_info is not None + + if is_group_chat and not global_config.chat.proactive_thinking_in_group: + return False + if not is_group_chat and not global_config.chat.proactive_thinking_in_private: + return False + + stream_parts = self.context.stream_id.split(":") + current_chat_identifier = f"{stream_parts}:{stream_parts}" if len(stream_parts) >= 2 else self.context.stream_id + + enable_list = getattr( + global_config.chat, + "proactive_thinking_enable_in_groups" if is_group_chat else "proactive_thinking_enable_in_private", + [], + ) + return not enable_list or current_chat_identifier in enable_list + + def _get_dynamic_thinking_interval(self) -> float: + """ + 获取动态思考间隔时间 + + Returns: + float: 思考间隔秒数 + + 功能说明: + - 尝试从timing_utils导入正态分布间隔函数 + - 根据配置计算动态间隔,增加随机性 + - 在无法导入或计算出错时,回退到固定的间隔 + """ + try: + from src.utils.timing_utils import get_normal_distributed_interval + + base_interval = global_config.chat.proactive_thinking_interval + delta_sigma = getattr(global_config.chat, "delta_sigma", 120) + + if base_interval <= 0: + base_interval = abs(base_interval) + if delta_sigma < 0: + delta_sigma = abs(delta_sigma) + + if base_interval == 0 and delta_sigma == 0: + return 300 + if delta_sigma == 0: + return base_interval + + sigma_percentage = delta_sigma / base_interval if base_interval > 0 else delta_sigma / 1000 + return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True) + + except ImportError: + logger.warning(f"{self.context.log_prefix} timing_utils不可用,使用固定间隔") + return max(300, abs(global_config.chat.proactive_thinking_interval)) + except Exception as e: + logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔") + return max(300, abs(global_config.chat.proactive_thinking_interval)) + + def _format_duration(self, seconds: float) -> str: + """ + 格式化时长为可读字符串 + + Args: + seconds: 时长秒数 + + Returns: + str: 格式化后的字符串 (例如 "1小时2分3秒") + """ + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + parts = [] + if hours > 0: + parts.append(f"{hours}小时") + if minutes > 0: + parts.append(f"{minutes}分") + if secs > 0 or not parts: + parts.append(f"{secs}秒") + return "".join(parts) + async def _main_chat_loop(self): """ 主聊天循环 @@ -238,30 +392,36 @@ class HeartFChatting: logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。") # 根据聊天模式处理新消息 - # 统一使用 _should_process_messages 判断是否应该处理 - should_process,interest_value = await self._should_process_messages(recent_messages if has_new_messages else None) - if should_process: - self.context.last_read_time = time.time() - await self.cycle_processor.observe(interest_value = interest_value) - else: - # Normal模式:消息数量不足,等待 + should_process, interest_value = await self._should_process_messages(recent_messages) + if not should_process: + # 消息数量不足或兴趣不够,等待 await asyncio.sleep(0.5) - return True - - if not await self._should_process_messages(recent_messages if has_new_messages else None): - return has_new_messages - - # 处理新消息 - for message in recent_messages: - await self.cycle_processor.observe(interest_value = interest_value) - + return True # Skip rest of the logic for this iteration + + # Messages should be processed + action_type = await self.cycle_processor.observe(interest_value=interest_value) + + # 管理no_reply计数器 + if action_type != "no_reply": + self.recent_interest_records.clear() + self.context.no_reply_consecutive = 0 + logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作,重置no_reply计数器") + else: # action_type == "no_reply" + self.context.no_reply_consecutive += 1 + self._determine_form_type() + + # 在一轮动作执行完毕后,增加睡眠压力 + if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system: + if action_type not in ["no_reply", "no_action"]: + self.context.energy_manager.increase_sleep_pressure() + # 如果成功观察,增加能量值并重置累积兴趣值 - if has_new_messages: - self.context.energy_value += 1 / global_config.chat.focus_value - # 重置累积兴趣值,因为消息已经被成功处理 - self.context.breaking_accumulated_interest = 0.0 - logger.info(f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值") - + self.context.energy_value += 1 / global_config.chat.focus_value + # 重置累积兴趣值,因为消息已经被成功处理 + self.context.breaking_accumulated_interest = 0.0 + logger.info( + f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值" + ) # 更新上一帧的睡眠状态 self.context.was_sleeping = is_sleeping @@ -282,7 +442,6 @@ class HeartFChatting: return has_new_messages - def _handle_wakeup_messages(self, messages): """ 处理休眠状态下的消息,累积唤醒度 @@ -321,73 +480,83 @@ class HeartFChatting: def _determine_form_type(self) -> str: """判断使用哪种形式的no_reply""" # 检查是否启用breaking模式 - if not global_config.chat.enable_breaking_mode: + if not getattr(global_config.chat, "enable_breaking_mode", False): logger.info(f"{self.context.log_prefix} breaking模式已禁用,使用waiting形式") self.context.focus_energy = 1 - return - + return "waiting" + # 如果连续no_reply次数少于3次,使用waiting形式 if self.context.no_reply_consecutive <= 3: self.context.focus_energy = 1 + return "waiting" else: # 使用累积兴趣值而不是最近3次的记录 total_interest = self.context.breaking_accumulated_interest - + # 计算调整后的阈值 adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.context.stream_id) - - logger.info(f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}") - + + logger.info( + f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}" + ) + # 如果累积兴趣值小于阈值,进入breaking形式 if total_interest < adjusted_threshold: logger.info(f"{self.context.log_prefix} 累积兴趣度不足,进入breaking形式") self.context.focus_energy = random.randint(3, 6) + return "breaking" else: logger.info(f"{self.context.log_prefix} 累积兴趣度充足,使用waiting形式") self.context.focus_energy = 1 + return "waiting" - async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool,float]: + async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]: """ 统一判断是否应该处理消息的函数 根据当前循环模式和消息内容决定是否继续处理 """ + if not new_message: + return False, 0.0 + new_message_count = len(new_message) - talk_frequency = global_config.chat.get_current_talk_frequency(self.context.chat_stream.stream_id) + talk_frequency = global_config.chat.get_current_talk_frequency(self.context.stream_id) modified_exit_count_threshold = self.context.focus_energy * 0.5 / talk_frequency modified_exit_interest_threshold = 1.5 / talk_frequency - + # 计算当前批次消息的兴趣值 batch_interest = 0.0 for msg_dict in new_message: interest_value = msg_dict.get("interest_value", 0.0) if msg_dict.get("processed_plain_text", ""): batch_interest += interest_value - + # 在breaking形式下累积所有消息的兴趣值 if new_message_count > 0: self.context.breaking_accumulated_interest += batch_interest total_interest = self.context.breaking_accumulated_interest else: total_interest = self.context.breaking_accumulated_interest - + if new_message_count >= modified_exit_count_threshold: # 记录兴趣度到列表 self.recent_interest_records.append(total_interest) # 重置累积兴趣值,因为已经达到了消息数量阈值 self.context.breaking_accumulated_interest = 0.0 - + logger.info( f"{self.context.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待,累积兴趣值: {total_interest:.2f}" ) - return True,total_interest/new_message_count + return True, total_interest / new_message_count # 检查累计兴趣值 if new_message_count > 0: # 只在兴趣值变化时输出log if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest: - logger.info(f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}") + logger.info( + f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}" + ) self._last_accumulated_interest = total_interest if total_interest >= modified_exit_interest_threshold: # 记录兴趣度到列表 @@ -397,13 +566,16 @@ class HeartFChatting: logger.info( f"{self.context.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待" ) - return True,total_interest/new_message_count - + return True, total_interest / new_message_count + # 每10秒输出一次等待状态 - if int(time.time() - self.context.last_read_time) > 0 and int(time.time() - self.context.last_read_time) % 10 == 0: + if ( + int(time.time() - self.context.last_read_time) > 0 + and int(time.time() - self.context.last_read_time) % 10 == 0 + ): logger.info( - f"{self.context.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..." + f"{self.context.log_prefix} 已等待{time.time() - self.context.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..." ) await asyncio.sleep(0.5) - - return False,0.0 + + return False, 0.0 diff --git a/src/chat/chat_loop/hfc_context.py b/src/chat/chat_loop/hfc_context.py index c924f713d..fca6a65e4 100644 --- a/src/chat/chat_loop/hfc_context.py +++ b/src/chat/chat_loop/hfc_context.py @@ -1,16 +1,15 @@ from typing import List, Optional, TYPE_CHECKING import time from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.common.logger import get_logger from src.person_info.relationship_builder_manager import RelationshipBuilder from src.chat.express.expression_learner import ExpressionLearner -from src.plugin_system.base.component_types import ChatMode from src.chat.planner_actions.action_manager import ActionManager from src.chat.chat_loop.hfc_utils import CycleDetail if TYPE_CHECKING: - from .wakeup_manager import WakeUpManager + from .sleep_manager.wakeup_manager import WakeUpManager from .energy_manager import EnergyManager + from .heartFC_chat import HeartFChatting class HfcContext: @@ -43,13 +42,13 @@ class HfcContext: self.energy_value = self.chat_stream.energy_value self.sleep_pressure = self.chat_stream.sleep_pressure - self.was_sleeping = False # 用于检测睡眠状态的切换 - + self.was_sleeping = False # 用于检测睡眠状态的切换 + self.last_message_time = time.time() self.last_read_time = time.time() - 10 - + # 从聊天流恢复breaking累积兴趣值 - self.breaking_accumulated_interest = getattr(self.chat_stream, 'breaking_accumulated_interest', 0.0) + self.breaking_accumulated_interest = getattr(self.chat_stream, "breaking_accumulated_interest", 0.0) self.action_manager = ActionManager() @@ -69,7 +68,7 @@ class HfcContext: # breaking形式下的累积兴趣值 self.breaking_accumulated_interest = 0.0 # 引用HeartFChatting实例,以便其他组件可以调用其方法 - self.chat_instance = None + self.chat_instance: Optional["HeartFChatting"] = None def save_context_state(self): """将当前状态保存到聊天流""" @@ -78,4 +77,4 @@ class HfcContext: self.chat_stream.sleep_pressure = self.sleep_pressure self.chat_stream.focus_energy = self.focus_energy self.chat_stream.no_reply_consecutive = self.no_reply_consecutive - self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest \ No newline at end of file + self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest diff --git a/src/chat/chat_loop/hfc_utils.py b/src/chat/chat_loop/hfc_utils.py index eeaac70f5..ae77b2378 100644 --- a/src/chat/chat_loop/hfc_utils.py +++ b/src/chat/chat_loop/hfc_utils.py @@ -1,13 +1,11 @@ import time from typing import Optional, Dict, Any, Union -from src.config.config import global_config from src.common.logger import get_logger from src.chat.message_receive.chat_stream import get_chat_manager from src.plugin_system.apis import send_api from maim_message.message_base import GroupInfo -from src.common.message_repository import count_messages logger = get_logger("hfc") @@ -122,6 +120,7 @@ class CycleDetail: self.loop_plan_info = loop_info["loop_plan_info"] self.loop_action_info = loop_info["loop_action_info"] + async def send_typing(): """ 发送打字状态指示 diff --git a/src/chat/chat_loop/proactive/events.py b/src/chat/chat_loop/proactive/events.py new file mode 100644 index 000000000..c273afef1 --- /dev/null +++ b/src/chat/chat_loop/proactive/events.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass, field +from typing import Optional, Dict, Any + + +@dataclass +class ProactiveTriggerEvent: + """ + 主动思考触发事件的数据类 + """ + + source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager" + reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo" + metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息 diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py new file mode 100644 index 000000000..e2be9fdc2 --- /dev/null +++ b/src/chat/chat_loop/proactive/proactive_thinker.py @@ -0,0 +1,125 @@ +import time +import traceback +from typing import TYPE_CHECKING + +from src.common.logger import get_logger +from src.plugin_system.base.component_types import ChatMode +from ..hfc_context import HfcContext +from .events import ProactiveTriggerEvent +from src.plugin_system.apis import generator_api + +if TYPE_CHECKING: + from ..cycle_processor import CycleProcessor + +logger = get_logger("hfc") + + +class ProactiveThinker: + def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"): + """ + 初始化主动思考器 + + Args: + context: HFC聊天上下文对象 + cycle_processor: 循环处理器,用于执行主动思考的结果 + + 功能说明: + - 接收主动思考事件并执行思考流程 + - 根据事件类型执行不同的前置操作(如修改情绪) + - 调用planner进行决策并由cycle_processor执行 + """ + self.context = context + self.cycle_processor = cycle_processor + + async def think(self, trigger_event: ProactiveTriggerEvent): + """ + 统一的API入口,用于触发主动思考 + + Args: + trigger_event: 描述触发上下文的事件对象 + """ + logger.info( + f"{self.context.log_prefix} 接收到主动思考事件: " + f"来源='{trigger_event.source}', 原因='{trigger_event.reason}'" + ) + + try: + # 1. 根据事件类型执行前置操作 + await self._prepare_for_thinking(trigger_event) + + # 2. 执行核心思考逻辑 + await self._execute_proactive_thinking(trigger_event) + + except Exception as e: + logger.error(f"{self.context.log_prefix} 主动思考 think 方法执行异常: {e}") + logger.error(traceback.format_exc()) + + async def _prepare_for_thinking(self, trigger_event: ProactiveTriggerEvent): + """ + 根据事件类型,执行思考前的准备工作,例如修改情绪 + + Args: + trigger_event: 触发事件 + """ + if trigger_event.source != "insomnia_manager": + return + + try: + from src.mood.mood_manager import mood_manager + + mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id) + new_mood = None + + if trigger_event.reason == "low_pressure": + new_mood = "精力过剩,毫无睡意" + elif trigger_event.reason == "random": + new_mood = "深夜emo,胡思乱想" + elif trigger_event.reason == "goodnight": + new_mood = "有点困了,准备睡觉了" + + if new_mood: + mood_obj.mood_state = new_mood + mood_obj.last_change_time = time.time() + logger.info( + f"{self.context.log_prefix} 因 '{trigger_event.reason}'," + f"情绪状态被强制更新为: {mood_obj.mood_state}" + ) + + except Exception as e: + logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}") + + async def _execute_proactive_thinking(self, trigger_event: ProactiveTriggerEvent): + """ + 执行主动思考的核心逻辑 + + Args: + trigger_event: 触发事件 + """ + try: + # 直接调用 planner 的 PROACTIVE 模式 + actions, target_message = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE) + + # 获取第一个规划出的动作作为主要决策 + action_result = actions[0] if actions else {} + + # 如果决策不是 do_nothing,则执行 + if action_result and action_result.get("action_type") != "do_nothing": + if action_result.get("action_type") == "reply": + success, response_set, _ = await generator_api.generate_reply( + chat_stream=self.context.chat_stream, + reply_message=action_result["action_message"], + available_actions={}, + enable_tool=False, + request_type="chat.replyer.proactive", + from_plugin=False, + ) + if success and response_set: + await self.cycle_processor.response_handler.send_response( + response_set, time.time(), action_result["action_message"] + ) + else: + logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") + + except Exception as e: + logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}") + logger.error(traceback.format_exc()) diff --git a/src/chat/chat_loop/proactive_thinker.py b/src/chat/chat_loop/proactive_thinker.py deleted file mode 100644 index 74816451f..000000000 --- a/src/chat/chat_loop/proactive_thinker.py +++ /dev/null @@ -1,353 +0,0 @@ -import asyncio -import time -import traceback -from typing import Optional, TYPE_CHECKING - -from src.common.logger import get_logger -from src.config.config import global_config -from src.plugin_system.base.component_types import ChatMode -from .hfc_context import HfcContext - -if TYPE_CHECKING: - from .cycle_processor import CycleProcessor - -logger = get_logger("hfc") - - -class ProactiveThinker: - def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"): - """ - 初始化主动思考器 - - Args: - context: HFC聊天上下文对象 - cycle_processor: 循环处理器,用于执行主动思考的结果 - - 功能说明: - - 管理机器人的主动发言功能 - - 根据沉默时间和配置触发主动思考 - - 提供私聊和群聊不同的思考提示模板 - - 使用3-sigma规则计算动态思考间隔 - """ - self.context = context - self.cycle_processor = cycle_processor - self._proactive_thinking_task: Optional[asyncio.Task] = None - - self.proactive_thinking_prompts = { - "private": """现在你和你朋友的私聊里面已经隔了{time}没有发送消息了,请你结合上下文以及你和你朋友之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择: - - 1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时) - 2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时) - - 请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""", - "group": """现在群里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择: - - 1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时) - 2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时) - - 请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""", - } - - async def start(self): - """ - 启动主动思考器 - - 功能说明: - - 检查运行状态和配置,避免重复启动 - - 只有在启用主动思考功能时才启动 - - 创建主动思考循环异步任务 - - 设置任务完成回调处理 - - 记录启动日志 - """ - if self.context.running and not self._proactive_thinking_task and global_config.chat.enable_proactive_thinking: - self._proactive_thinking_task = asyncio.create_task(self._proactive_thinking_loop()) - self._proactive_thinking_task.add_done_callback(self._handle_proactive_thinking_completion) - logger.info(f"{self.context.log_prefix} 主动思考器已启动") - - async def stop(self): - """ - 停止主动思考器 - - 功能说明: - - 取消正在运行的主动思考任务 - - 等待任务完全停止 - - 记录停止日志 - """ - if self._proactive_thinking_task and not self._proactive_thinking_task.done(): - self._proactive_thinking_task.cancel() - await asyncio.sleep(0) - logger.info(f"{self.context.log_prefix} 主动思考器已停止") - - def _handle_proactive_thinking_completion(self, task: asyncio.Task): - """ - 处理主动思考任务完成 - - Args: - task: 完成的异步任务对象 - - 功能说明: - - 处理任务正常完成或异常情况 - - 记录相应的日志信息 - - 区分取消和异常终止的情况 - """ - try: - if exception := task.exception(): - logger.error(f"{self.context.log_prefix} 主动思考循环异常: {exception}") - else: - logger.info(f"{self.context.log_prefix} 主动思考循环正常结束") - except asyncio.CancelledError: - logger.info(f"{self.context.log_prefix} 主动思考循环被取消") - - async def _proactive_thinking_loop(self): - """ - 主动思考的主循环 - - 功能说明: - - 每15秒检查一次是否需要主动思考 - - 只在FOCUS模式下进行主动思考 - - 检查是否启用主动思考功能 - - 计算沉默时间并与动态间隔比较 - - 达到条件时执行主动思考并更新最后消息时间 - - 处理执行过程中的异常 - """ - while self.context.running: - await asyncio.sleep(15) - - if self.context.loop_mode != ChatMode.FOCUS: - continue - - if not self._should_enable_proactive_thinking(): - continue - - current_time = time.time() - silence_duration = current_time - self.context.last_message_time - - target_interval = self._get_dynamic_thinking_interval() - - if silence_duration >= target_interval: - try: - await self._execute_proactive_thinking(silence_duration) - self.context.last_message_time = current_time - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考执行出错: {e}") - logger.error(traceback.format_exc()) - - def _should_enable_proactive_thinking(self) -> bool: - """ - 检查是否应该启用主动思考 - - Returns: - bool: 如果应该启用主动思考则返回True - - 功能说明: - - 检查聊天流是否存在 - - 检查当前聊天是否在启用列表中(按平台和类型分别检查) - - 根据聊天类型(群聊/私聊)和配置决定是否启用 - - 群聊需要proactive_thinking_in_group为True - - 私聊需要proactive_thinking_in_private为True - """ - if not self.context.chat_stream: - return False - - is_group_chat = self.context.chat_stream.group_info is not None - - # 检查基础开关 - if is_group_chat and not global_config.chat.proactive_thinking_in_group: - return False - if not is_group_chat and not global_config.chat.proactive_thinking_in_private: - return False - - # 获取当前聊天的完整标识 (platform:chat_id) - stream_parts = self.context.stream_id.split(":") - if len(stream_parts) >= 2: - platform = stream_parts[0] - chat_id = stream_parts[1] - current_chat_identifier = f"{platform}:{chat_id}" - else: - # 如果无法解析,则使用原始stream_id - current_chat_identifier = self.context.stream_id - - # 检查是否在启用列表中 - if is_group_chat: - # 群聊检查 - enable_list = getattr(global_config.chat, "proactive_thinking_enable_in_groups", []) - if enable_list and current_chat_identifier not in enable_list: - return False - else: - # 私聊检查 - enable_list = getattr(global_config.chat, "proactive_thinking_enable_in_private", []) - if enable_list and current_chat_identifier not in enable_list: - return False - - return True - - def _get_dynamic_thinking_interval(self) -> float: - """ - 获取动态思考间隔 - - Returns: - float: 计算得出的思考间隔时间(秒) - - 功能说明: - - 使用3-sigma规则计算正态分布的思考间隔 - - 基于base_interval和delta_sigma配置计算 - - 处理特殊情况(为0或负数的配置) - - 如果timing_utils不可用则使用固定间隔 - - 间隔范围被限制在1秒到86400秒(1天)之间 - """ - try: - from src.utils.timing_utils import get_normal_distributed_interval - - base_interval = global_config.chat.proactive_thinking_interval - delta_sigma = getattr(global_config.chat, "delta_sigma", 120) - - if base_interval < 0: - base_interval = abs(base_interval) - if delta_sigma < 0: - delta_sigma = abs(delta_sigma) - - if base_interval == 0 and delta_sigma == 0: - return 300 - elif base_interval == 0: - sigma_percentage = delta_sigma / 1000 - return get_normal_distributed_interval(0, sigma_percentage, 1, 86400, use_3sigma_rule=True) - elif delta_sigma == 0: - return base_interval - - sigma_percentage = delta_sigma / base_interval - return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True) - - except ImportError: - logger.warning(f"{self.context.log_prefix} timing_utils不可用,使用固定间隔") - return max(300, abs(global_config.chat.proactive_thinking_interval)) - except Exception as e: - logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔") - return max(300, abs(global_config.chat.proactive_thinking_interval)) - - def _format_duration(self, seconds: float) -> str: - """ - 格式化持续时间为中文描述 - - Args: - seconds: 持续时间(秒) - - Returns: - str: 格式化后的时间字符串,如"1小时30分45秒" - - 功能说明: - - 将秒数转换为小时、分钟、秒的组合 - - 只显示非零的时间单位 - - 如果所有单位都为0则显示"0秒" - - 用于主动思考日志的时间显示 - """ - hours = int(seconds // 3600) - minutes = int((seconds % 3600) // 60) - secs = int(seconds % 60) - - parts = [] - if hours > 0: - parts.append(f"{hours}小时") - if minutes > 0: - parts.append(f"{minutes}分") - if secs > 0 or not parts: - parts.append(f"{secs}秒") - - return "".join(parts) - - async def _execute_proactive_thinking(self, silence_duration: float): - """ - 执行主动思考 - - Args: - silence_duration: 沉默持续时间(秒) - """ - formatted_time = self._format_duration(silence_duration) - logger.info(f"{self.context.log_prefix} 触发主动思考,已沉默{formatted_time}") - - try: - # 直接调用 planner 的 PROACTIVE 模式 - action_result_tuple, target_message = await self.cycle_processor.action_planner.plan( - mode=ChatMode.PROACTIVE - ) - action_result = action_result_tuple.get("action_result") - - # 如果决策不是 do_nothing,则执行 - if action_result and action_result.get("action_type") != "do_nothing": - logger.info(f"{self.context.log_prefix} 主动思考决策: {action_result.get('action_type')}, 原因: {action_result.get('reasoning')}") - # 在主动思考时,如果 target_message 为 None,则默认选取最新 message 作为 target_message - if target_message is None and self.context.chat_stream and self.context.chat_stream.context: - from src.chat.message_receive.message import MessageRecv - latest_message = self.context.chat_stream.context.get_last_message() - if isinstance(latest_message, MessageRecv): - user_info = latest_message.message_info.user_info - target_message = { - "chat_info_platform": latest_message.message_info.platform, - "user_platform": user_info.platform if user_info else None, - "user_id": user_info.user_id if user_info else None, - "processed_plain_text": latest_message.processed_plain_text, - "is_mentioned": latest_message.is_mentioned, - } - - # 将决策结果交给 cycle_processor 的后续流程处理 - await self.cycle_processor.execute_plan(action_result, target_message) - else: - logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") - - except Exception as e: - logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}") - logger.error(traceback.format_exc()) - - async def trigger_insomnia_thinking(self, reason: str): - """ - 由外部事件(如失眠)触发的一次性主动思考 - - Args: - reason: 触发的原因 (e.g., "low_pressure", "random") - """ - logger.info(f"{self.context.log_prefix} 因“{reason}”触发失眠,开始深夜思考...") - - # 1. 根据原因修改情绪 - try: - from src.mood.mood_manager import mood_manager - - mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id) - if reason == "low_pressure": - mood_obj.mood_state = "精力过剩,毫无睡意" - elif reason == "random": - mood_obj.mood_state = "深夜emo,胡思乱想" - mood_obj.last_change_time = time.time() # 更新时间戳以允许后续的情绪回归 - logger.info(f"{self.context.log_prefix} 因失眠,情绪状态被强制更新为: {mood_obj.mood_state}") - except Exception as e: - logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}") - - # 2. 直接执行主动思考逻辑 - try: - # 传入一个象征性的silence_duration,因为它在这里不重要 - await self._execute_proactive_thinking(silence_duration=1) - except Exception as e: - logger.error(f"{self.context.log_prefix} 失眠思考执行出错: {e}") - logger.error(traceback.format_exc()) - - async def trigger_goodnight_thinking(self): - """ - 在失眠状态结束后,触发一次准备睡觉的主动思考 - """ - logger.info(f"{self.context.log_prefix} 失眠状态结束,准备睡觉,触发告别思考...") - - # 1. 设置一个准备睡觉的特定情绪 - try: - from src.mood.mood_manager import mood_manager - - mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id) - mood_obj.mood_state = "有点困了,准备睡觉了" - mood_obj.last_change_time = time.time() - logger.info(f"{self.context.log_prefix} 情绪状态更新为: {mood_obj.mood_state}") - except Exception as e: - logger.error(f"{self.context.log_prefix} 设置睡前情绪时出错: {e}") - - # 2. 直接执行主动思考逻辑 - try: - await self._execute_proactive_thinking(silence_duration=1) - except Exception as e: - logger.error(f"{self.context.log_prefix} 睡前告别思考执行出错: {e}") - logger.error(traceback.format_exc()) diff --git a/src/chat/chat_loop/response_handler.py b/src/chat/chat_loop/response_handler.py index 55dca45a3..b88f39fe0 100644 --- a/src/chat/chat_loop/response_handler.py +++ b/src/chat/chat_loop/response_handler.py @@ -64,7 +64,7 @@ class ResponseHandler: - 构建并返回完整的循环信息 - 用于上级方法的状态跟踪 """ - reply_text = await self.send_response(response_set, reply_to_str, loop_start_time, action_message) + reply_text = await self.send_response(response_set, loop_start_time, action_message) person_info_manager = get_person_info_manager() @@ -157,7 +157,7 @@ class ResponseHandler: await send_api.text_to_stream( text=data, stream_id=self.context.stream_id, - reply_to_message = message_data, + reply_to_message=message_data, set_reply=need_reply, typing=False, ) @@ -166,107 +166,9 @@ class ResponseHandler: await send_api.text_to_stream( text=data, stream_id=self.context.stream_id, - reply_to_message = message_data, - set_reply=need_reply, + reply_to_message=None, + set_reply=False, typing=True, ) return reply_text - - # TODO: 已废弃 - async def generate_response( - self, - message_data: dict, - available_actions: Optional[Dict[str, Any]], - reply_to: str, - request_type: str = "chat.replyer.normal", - ) -> Optional[list]: - """ - 生成回复内容 - - Args: - message_data: 消息数据 - available_actions: 可用动作列表 - reply_to: 回复目标 - request_type: 请求类型,默认为普通回复 - - Returns: - list: 生成的回复内容列表,失败时返回None - - 功能说明: - - 在生成回复前进行反注入检测(提高效率) - - 调用生成器API生成回复 - - 根据配置启用或禁用工具功能 - - 处理生成失败的情况 - - 记录生成过程中的错误和异常 - """ - try: - # === 反注入检测(仅在需要生成回复时) === - # 执行反注入检测(直接使用字典格式) - anti_injector = get_anti_injector() - result, modified_content, reason = await anti_injector.process_message( - message_data, self.context.chat_stream - ) - - # 根据反注入结果处理消息数据 - await anti_injector.handle_message_storage(result, modified_content, reason, message_data) - - if result == ProcessResult.BLOCKED_BAN: - # 用户被封禁 - 直接阻止回复生成 - anti_injector_logger.warning(f"用户被反注入系统封禁,阻止回复生成: {reason}") - return None - elif result == ProcessResult.BLOCKED_INJECTION: - # 消息被阻止(危险内容等) - 直接阻止回复生成 - anti_injector_logger.warning(f"消息被反注入系统阻止,阻止回复生成: {reason}") - return None - elif result == ProcessResult.COUNTER_ATTACK: - # 反击模式:生成反击消息作为回复 - anti_injector_logger.info(f"反击模式启动,生成反击回复: {reason}") - if modified_content: - # 返回反击消息作为回复内容 - return [("text", modified_content)] - else: - # 没有反击内容时阻止回复生成 - return None - - # 检查是否需要加盾处理 - safety_prompt = None - if result == ProcessResult.SHIELDED: - # 获取安全系统提示词并注入 - shield = anti_injector.shield - safety_prompt = shield.get_safety_system_prompt() - await Prompt.create_async(safety_prompt, "anti_injection_safety_prompt") - anti_injector_logger.info(f"消息已被反注入系统加盾处理,已注入安全提示词: {reason}") - - # 处理被修改的消息内容(用于生成回复) - modified_reply_to = reply_to - if modified_content: - # 更新消息内容用于生成回复 - anti_injector_logger.info(f"消息内容已被反注入系统修改,使用修改后内容生成回复: {reason}") - # 解析原始reply_to格式:"发送者:消息内容" - if ":" in reply_to: - sender_part, _ = reply_to.split(":", 1) - modified_reply_to = f"{sender_part}:{modified_content}" - else: - # 如果格式不标准,直接使用修改后的内容 - modified_reply_to = modified_content - - # === 正常的回复生成流程 === - success, reply_set, _ = await generator_api.generate_reply( - chat_stream=self.context.chat_stream, - reply_to=modified_reply_to, # 使用可能被修改的内容 - available_actions=available_actions, - enable_tool=global_config.tool.enable_tool, - request_type=request_type, - from_plugin=False, - ) - - if not success or not reply_set: - logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败") - return None - - return reply_set - - except Exception as e: - logger.error(f"{self.context.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}") - return None diff --git a/src/schedule/sleep_manager.py b/src/chat/chat_loop/sleep_manager/sleep_manager.py similarity index 99% rename from src/schedule/sleep_manager.py rename to src/chat/chat_loop/sleep_manager/sleep_manager.py index 8da39ba0e..3acda568a 100644 --- a/src/schedule/sleep_manager.py +++ b/src/chat/chat_loop/sleep_manager/sleep_manager.py @@ -10,7 +10,7 @@ from src.manager.local_store_manager import local_storage from src.plugin_system.apis import send_api, generator_api if TYPE_CHECKING: - from src.chat.chat_loop.wakeup_manager import WakeUpManager + from mmc.src.chat.chat_loop.sleep_manager.wakeup_manager import WakeUpManager logger = get_logger("sleep_manager") diff --git a/src/chat/chat_loop/wakeup_manager.py b/src/chat/chat_loop/sleep_manager/wakeup_manager.py similarity index 98% rename from src/chat/chat_loop/wakeup_manager.py rename to src/chat/chat_loop/sleep_manager/wakeup_manager.py index df5957b14..b72fe80de 100644 --- a/src/chat/chat_loop/wakeup_manager.py +++ b/src/chat/chat_loop/sleep_manager/wakeup_manager.py @@ -4,7 +4,7 @@ from typing import Optional from src.common.logger import get_logger from src.config.config import global_config from src.manager.local_store_manager import local_storage -from .hfc_context import HfcContext +from ..hfc_context import HfcContext logger = get_logger("wakeup") @@ -139,7 +139,7 @@ class WakeUpManager: # 只有在休眠且非失眠状态下才累积唤醒度 from src.schedule.schedule_manager import schedule_manager - from src.schedule.sleep_manager import SleepState + from mmc.src.chat.chat_loop.sleep_manager.sleep_manager import SleepState current_sleep_state = schedule_manager.get_current_sleep_state() if current_sleep_state != SleepState.SLEEPING: diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index 071f57a4a..b25fd1ab8 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -724,7 +724,7 @@ class EmojiManager: if not emoji.is_deleted and emoji.hash == emoji_hash: return emoji return None # 如果循环结束还没找到,则返回 None - + async def get_emoji_tag_by_hash(self, emoji_hash: str) -> Optional[str]: """根据哈希值获取已注册表情包的描述 @@ -755,7 +755,7 @@ class EmojiManager: except Exception as e: logger.error(f"获取表情包描述失败 (Hash: {emoji_hash}): {str(e)}") return None - + async def get_emoji_description_by_hash(self, emoji_hash: str) -> Optional[str]: """根据哈希值获取已注册表情包的描述 diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 29b03815c..c43901eab 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -85,6 +85,7 @@ class ChatStream: self.context: ChatMessageContext = None # type: ignore # 用于存储该聊天的上下文信息 self.focus_energy = 1 self.no_reply_consecutive = 0 + self.breaking_accumulated_interest = 0.0 def to_dict(self) -> dict: """转换为字典格式""" @@ -97,6 +98,7 @@ class ChatStream: "last_active_time": self.last_active_time, "energy_value": self.energy_value, "sleep_pressure": self.sleep_pressure, + "breaking_accumulated_interest": self.breaking_accumulated_interest, } @classmethod @@ -257,7 +259,7 @@ class ChatManager: "user_cardname": model_instance.user_cardname or "", } group_info_data = None - if model_instance.group_id: + if model_instance and getattr(model_instance, "group_id", None): group_info_data = { "platform": model_instance.group_platform, "group_id": model_instance.group_id, @@ -403,7 +405,7 @@ class ChatManager: "user_cardname": model_instance.user_cardname or "", } group_info_data = None - if model_instance.group_id: + if model_instance and getattr(model_instance, "group_id", None): group_info_data = { "platform": model_instance.group_platform, "group_id": model_instance.group_id, diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 930d1f5ea..1df006a1c 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -120,7 +120,7 @@ class MessageRecv(Message): self.priority_mode = "interest" self.priority_info = None self.interest_value: float = 0.0 - + self.key_words = [] self.key_words_lite = [] diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 8f9502e2d..8219ee761 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -20,7 +20,7 @@ class MessageStorage: if isinstance(keywords, list): return orjson.dumps(keywords).decode("utf-8") return "[]" - + @staticmethod def _deserialize_keywords(keywords_str: str) -> list: """将JSON字符串反序列化为关键词列表""" diff --git a/src/chat/planner_actions/action_modifier.py b/src/chat/planner_actions/action_modifier.py index b68e639f5..705f723c8 100644 --- a/src/chat/planner_actions/action_modifier.py +++ b/src/chat/planner_actions/action_modifier.py @@ -161,10 +161,8 @@ class ActionModifier: available_actions = list(self.action_manager.get_using_actions().keys()) available_actions_text = "、".join(available_actions) if available_actions else "无" - - logger.info( - f"{self.log_prefix} 当前可用动作: {available_actions_text}||移除: {removals_summary}" - ) + + logger.info(f"{self.log_prefix} 当前可用动作: {available_actions_text}||移除: {removals_summary}") def _check_action_associated_types(self, all_actions: Dict[str, ActionInfo], chat_context: ChatMessageContext): type_mismatched_actions: List[Tuple[str, str]] = [] diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index c08d52029..24fdd16eb 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -188,15 +188,12 @@ class ActionPlanner: param_text = "" if action_info.action_parameters: param_text = "\n" + "\n".join( - f' "{p_name}":"{p_desc}"' - for p_name, p_desc in action_info.action_parameters.items() + f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items() ) require_text = "\n".join(f"- {req}" for req in action_info.action_require) - using_action_prompt = await global_prompt_manager.get_prompt_async( - "action_prompt" - ) + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") action_options_block += using_action_prompt.format( action_name=action_name, action_description=action_info.description, @@ -205,9 +202,7 @@ class ActionPlanner: ) return action_options_block - def find_message_by_id( - self, message_id: str, message_id_list: list - ) -> Optional[Dict[str, Any]]: + def find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]: # sourcery skip: use-next """ 根据message_id从message_id_list中查找对应的原始消息 @@ -245,7 +240,7 @@ class ActionPlanner: async def plan( self, mode: ChatMode = ChatMode.FOCUS, - loop_start_time:float = 0.0, + loop_start_time: float = 0.0, available_actions: Optional[Dict[str, ActionInfo]] = None, ) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]: """ @@ -323,11 +318,15 @@ class ActionPlanner: # 如果获取的target_message为None,输出warning并重新plan if target_message is None: self.plan_retry_count += 1 - logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}") - + logger.warning( + f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}" + ) + # 如果连续三次plan均为None,输出error并选取最新消息 if self.plan_retry_count >= self.max_plan_retries: - logger.error(f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败,选择最新消息作为target_message") + logger.error( + f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败,选择最新消息作为target_message" + ) target_message = self.get_latest_message(message_id_list) self.plan_retry_count = 0 # 重置计数器 else: @@ -338,8 +337,7 @@ class ActionPlanner: self.plan_retry_count = 0 else: logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id") - - + if action != "no_reply" and action != "reply" and action not in current_available_actions: logger.warning( f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'" @@ -362,36 +360,35 @@ class ActionPlanner: is_parallel = False if mode == ChatMode.NORMAL and action in current_available_actions: is_parallel = current_available_actions[action].parallel_action - - + action_data["loop_start_time"] = loop_start_time - + actions = [] - + # 1. 添加Planner取得的动作 - actions.append({ - "action_type": action, - "reasoning": reasoning, - "action_data": action_data, - "action_message": target_message, - "available_actions": available_actions # 添加这个字段 - }) - - if action != "reply" and is_parallel: - actions.append({ - "action_type": "reply", + actions.append( + { + "action_type": action, + "reasoning": reasoning, + "action_data": action_data, "action_message": target_message, - "available_actions": available_actions - }) - - return actions,target_message + "available_actions": available_actions, # 添加这个字段 + } + ) + + if action != "reply" and is_parallel: + actions.append( + {"action_type": "reply", "action_message": target_message, "available_actions": available_actions} + ) + + return actions, target_message async def build_planner_prompt( self, is_group_chat: bool, # Now passed as argument chat_target_info: Optional[dict], # Now passed as argument current_available_actions: Dict[str, ActionInfo], - refresh_time :bool = False, + refresh_time: bool = False, mode: ChatMode = ChatMode.FOCUS, ) -> tuple[str, list]: # sourcery skip: use-join """构建 Planner LLM 的提示词 (获取模板并填充数据)""" @@ -400,21 +397,15 @@ class ActionPlanner: time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" bot_name = global_config.bot.nickname bot_nickname = ( - f",也有人叫你{','.join(global_config.bot.alias_names)}" - if global_config.bot.alias_names - else "" + f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else "" ) bot_core_personality = global_config.personality.personality_core - identity_block = ( - f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" - ) + identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" schedule_block = "" if global_config.schedule.enable: if current_activity := schedule_manager.get_current_activity(): - schedule_block = ( - f"你当前正在:{current_activity},但注意它与群聊的聊天无关。" - ) + schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。" mood_block = "" if global_config.mood.enable_mood: @@ -424,13 +415,9 @@ class ActionPlanner: # --- 根据模式构建不同的Prompt --- if mode == ChatMode.PROACTIVE: long_term_memory_block = await self._get_long_term_memory_context() - action_options_text = await self._build_action_options( - current_available_actions, mode - ) + action_options_text = await self._build_action_options(current_available_actions, mode) - prompt_template = await global_prompt_manager.get_prompt_async( - "proactive_planner_prompt" - ) + prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt") prompt = prompt_template.format( time_block=time_block, identity_block=identity_block, @@ -463,12 +450,8 @@ class ActionPlanner: limit=5, ) - actions_before_now_block = build_readable_actions( - actions=actions_before_now - ) - actions_before_now_block = ( - f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" - ) + actions_before_now_block = build_readable_actions(actions=actions_before_now) + actions_before_now_block = f"你刚刚选择并执行过的action是:\n{actions_before_now_block}" if refresh_time: self.last_obs_time_mark = time.time() @@ -504,30 +487,22 @@ class ActionPlanner: }}""" chat_context_description = "你现在正在一个群聊中" - chat_target_name = None + chat_target_name = None if not is_group_chat and chat_target_info: chat_target_name = ( - chat_target_info.get("person_name") - or chat_target_info.get("user_nickname") - or "对方" + chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方" ) chat_context_description = f"你正在和 {chat_target_name} 私聊" - action_options_block = await self._build_action_options( - current_available_actions, mode - ) + action_options_block = await self._build_action_options(current_available_actions, mode) moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。" custom_prompt_block = "" if global_config.custom_prompt.planner_custom_prompt_content: - custom_prompt_block = ( - global_config.custom_prompt.planner_custom_prompt_content - ) + custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content - planner_prompt_template = await global_prompt_manager.get_prompt_async( - "planner_prompt" - ) + planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt") prompt = planner_prompt_template.format( schedule_block=schedule_block, mood_block=mood_block, @@ -555,9 +530,7 @@ class ActionPlanner: """ is_group_chat = True is_group_chat, chat_target_info = get_chat_type_and_target_info(self.chat_id) - logger.debug( - f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}" - ) + logger.debug(f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}") current_available_actions_dict = self.action_manager.get_using_actions() @@ -568,13 +541,9 @@ class ActionPlanner: current_available_actions = {} for action_name in current_available_actions_dict: if action_name in all_registered_actions: - current_available_actions[action_name] = all_registered_actions[ - action_name - ] + current_available_actions[action_name] = all_registered_actions[action_name] else: - logger.warning( - f"{self.log_prefix}使用中的动作 {action_name} 未在已注册动作中找到" - ) + logger.warning(f"{self.log_prefix}使用中的动作 {action_name} 未在已注册动作中找到") # 将no_reply作为系统级特殊动作添加到可用动作中 # no_reply虽然是系统级决策,但需要让规划器认为它是可用的 diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 973565d37..bee1ad802 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -15,7 +15,6 @@ from src.chat.utils.prompt_utils import PromptUtils from src.mais4u.mai_think import mai_thinking_manager from src.common.logger import get_logger from src.config.config import global_config, model_config -from src.config.api_ada_configs import TaskConfig from src.individuality.individuality import get_individuality from src.llm_models.utils_model import LLMRequest from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending @@ -706,16 +705,16 @@ class DefaultReplyer: # 检查最新五条消息中是否包含bot自己说的消息 latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages) - + # logger.info(f"最新五条消息:{latest_5_messages}") # logger.info(f"最新五条消息中是否包含bot自己说的消息:{has_bot_message}") - + # 如果最新五条消息中不包含bot的消息,则返回空字符串 if not has_bot_message: core_dialogue_prompt = "" else: core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :] # 限制消息数量 - + core_dialogue_prompt_str = build_readable_messages( core_dialogue_list, replace_bot_name=True, @@ -819,7 +818,7 @@ class DefaultReplyer: mood_prompt = "" if reply_to: - #兼容旧的reply_to + # 兼容旧的reply_to sender, target = self._parse_reply_target(reply_to) else: # 获取 platform,如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值 @@ -830,7 +829,7 @@ class DefaultReplyer: ) person_name = await person_info_manager.get_value(person_id, "person_name") sender = person_name - target = reply_message.get('processed_plain_text') + target = reply_message.get("processed_plain_text") person_info_manager = get_person_info_manager() person_id = person_info_manager.get_person_id_by_person_name(sender) @@ -1021,7 +1020,7 @@ class DefaultReplyer: chat_stream = self.chat_stream chat_id = chat_stream.stream_id is_group_chat = bool(chat_stream.group_info) - + if reply_message: sender = reply_message.get("sender") target = reply_message.get("target") @@ -1178,7 +1177,9 @@ class DefaultReplyer: else: logger.debug(f"\n{prompt}\n") - content, (reasoning_content, model_name, tool_calls) = await self.express_model.generate_response_async(prompt) + content, (reasoning_content, model_name, tool_calls) = await self.express_model.generate_response_async( + prompt + ) logger.debug(f"replyer生成内容: {content}") return content, reasoning_content, model_name, tool_calls diff --git a/src/chat/replyer/replyer_manager.py b/src/chat/replyer/replyer_manager.py index 2613e49a1..2f64ab07f 100644 --- a/src/chat/replyer/replyer_manager.py +++ b/src/chat/replyer/replyer_manager.py @@ -1,7 +1,6 @@ -from typing import Dict, Optional, List, Tuple +from typing import Dict, Optional from src.common.logger import get_logger -from src.config.api_ada_configs import TaskConfig from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.replyer.default_generator import DefaultReplyer diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py index fac26806c..e6843874f 100644 --- a/src/chat/utils/chat_message_builder.py +++ b/src/chat/utils/chat_message_builder.py @@ -1250,7 +1250,7 @@ async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]: # 检查必要信息是否存在 且 不是机器人自己 if not all([platform, user_id]) or user_id == global_config.bot.qq_account: continue - + # 添加空值检查,防止 platform 为 None 时出错 if platform is None: platform = "unknown" diff --git a/src/chat/utils/prompt_utils.py b/src/chat/utils/prompt_utils.py index a5bb931dd..4eed6025f 100644 --- a/src/chat/utils/prompt_utils.py +++ b/src/chat/utils/prompt_utils.py @@ -4,17 +4,13 @@ """ import re -import time from typing import Dict, Any, Optional, Tuple from src.common.logger import get_logger from src.config.config import global_config -from src.chat.utils.chat_message_builder import ( - get_raw_msg_before_timestamp_with_chat, - build_readable_messages_with_id, -) from src.chat.message_receive.chat_stream import get_chat_manager from src.person_info.person_info import get_person_info_manager +from src.plugin_system.apis import cross_context_api logger = get_logger("prompt_utils") @@ -88,108 +84,24 @@ class PromptUtils: ) -> str: """ 构建跨群聊上下文 - 统一实现,完全继承DefaultReplyer功能 - - Args: - chat_id: 当前聊天ID - target_user_info: 目标用户信息 - current_prompt_mode: 当前提示模式 - - Returns: - str: 跨群上下文块 """ if not global_config.cross_context.enable: return "" - # 找到当前群聊所在的共享组 - target_group = None - current_stream = get_chat_manager().get_stream(chat_id) - if not current_stream or not current_stream.group_info: + other_chat_raw_ids = cross_context_api.get_context_groups(chat_id) + if not other_chat_raw_ids: return "" - try: - current_chat_raw_id = current_stream.group_info.group_id - except Exception as e: - logger.error(f"获取群聊ID失败: {e}") + chat_stream = get_chat_manager().get_stream(chat_id) + if not chat_stream: return "" - for group in global_config.cross_context.groups: - if str(current_chat_raw_id) in group.chat_ids: - target_group = group - break - - if not target_group: - return "" - - # 根据prompt_mode选择策略 - other_chat_raw_ids = [chat_id for chat_id in target_group.chat_ids if chat_id != str(current_chat_raw_id)] - - cross_context_messages = [] - if current_prompt_mode == "normal": - # normal模式:获取其他群聊的最近N条消息 - for chat_raw_id in other_chat_raw_ids: - stream_id = get_chat_manager().get_stream_id(current_stream.platform, chat_raw_id, is_group=True) - if not stream_id: - continue - - try: - messages = get_raw_msg_before_timestamp_with_chat( - chat_id=stream_id, - timestamp=time.time(), - limit=5, # 可配置 - ) - if messages: - chat_name = get_chat_manager().get_stream_name(stream_id) or stream_id - formatted_messages, _ = build_readable_messages_with_id(messages, timestamp_mode="relative") - cross_context_messages.append(f'[以下是来自"{chat_name}"的近期消息]\n{formatted_messages}') - except Exception as e: - logger.error(f"获取群聊{chat_raw_id}的消息失败: {e}") - continue - + return await cross_context_api.build_cross_context_normal(chat_stream, other_chat_raw_ids) elif current_prompt_mode == "s4u": - # s4u模式:获取当前发言用户在其他群聊的消息 - if target_user_info: - user_id = target_user_info.get("user_id") + return await cross_context_api.build_cross_context_s4u(chat_stream, other_chat_raw_ids, target_user_info) - if user_id: - for chat_raw_id in other_chat_raw_ids: - stream_id = get_chat_manager().get_stream_id( - current_stream.platform, chat_raw_id, is_group=True - ) - if not stream_id: - continue - - try: - messages = get_raw_msg_before_timestamp_with_chat( - chat_id=stream_id, - timestamp=time.time(), - limit=20, # 获取更多消息以供筛选 - ) - user_messages = [msg for msg in messages if msg.get("user_id") == user_id][ - -5: - ] # 筛选并取最近5条 - - if user_messages: - chat_name = get_chat_manager().get_stream_name(stream_id) or stream_id - user_name = ( - target_user_info.get("person_name") - or target_user_info.get("user_nickname") - or user_id - ) - formatted_messages, _ = build_readable_messages_with_id( - user_messages, timestamp_mode="relative" - ) - cross_context_messages.append( - f'[以下是"{user_name}"在"{chat_name}"的近期发言]\n{formatted_messages}' - ) - except Exception as e: - logger.error(f"获取用户{user_id}在群聊{chat_raw_id}的消息失败: {e}") - continue - - if not cross_context_messages: - return "" - - return "# 跨群上下文参考\n" + "\n\n".join(cross_context_messages) + "\n" + return "" @staticmethod def parse_reply_target_id(reply_to: str) -> str: diff --git a/src/chat/utils/utils_image.py b/src/chat/utils/utils_image.py index aee19eeaf..847d48fac 100644 --- a/src/chat/utils/utils_image.py +++ b/src/chat/utils/utils_image.py @@ -4,7 +4,6 @@ import time import hashlib import uuid import io -import asyncio import numpy as np from typing import Optional, Tuple, Dict, Any @@ -27,16 +26,15 @@ logger = get_logger("chat_image") def is_image_message(message: Dict[str, Any]) -> bool: """ 判断消息是否为图片消息 - + Args: message: 消息字典 - + Returns: bool: 是否为图片消息 """ return message.get("type") == "image" or ( - isinstance(message.get("content"), dict) and - message["content"].get("type") == "image" + isinstance(message.get("content"), dict) and message["content"].get("type") == "image" ) @@ -596,7 +594,6 @@ class ImageManager: return "", "[图片]" - # 创建全局单例 image_manager = None diff --git a/src/common/database/monthly_plan_db.py b/src/common/database/monthly_plan_db.py index cfe6501fc..7d254fa9f 100644 --- a/src/common/database/monthly_plan_db.py +++ b/src/common/database/monthly_plan_db.py @@ -62,10 +62,12 @@ def get_active_plans_for_month(month: str) -> List[MonthlyPlan]: """ with get_db_session() as session: try: - plans = session.query(MonthlyPlan).filter( - MonthlyPlan.target_month == month, - MonthlyPlan.status == 'active' - ).order_by(MonthlyPlan.created_at.desc()).all() + plans = ( + session.query(MonthlyPlan) + .filter(MonthlyPlan.target_month == month, MonthlyPlan.status == "active") + .order_by(MonthlyPlan.created_at.desc()) + .all() + ) return plans except Exception as e: logger.error(f"查询 {month} 的有效月度计划时发生错误: {e}") diff --git a/src/config/config.py b/src/config/config.py index 232c48fb6..06cb6370e 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -81,8 +81,8 @@ def get_key_comment(toml_table, key): return item.trivia.comment if hasattr(toml_table, "keys"): for k in toml_table.keys(): - if isinstance(k, KeyType) and k.key == key: # type: ignore - return k.trivia.comment # type: ignore + if isinstance(k, KeyType) and k.key == key: # type: ignore + return k.trivia.comment # type: ignore return None diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 67c2d4b6b..7dbc0ce0e 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -259,7 +259,6 @@ class NormalChatConfig(ValidatedConfigBase): """普通聊天配置类""" - class ExpressionRule(ValidatedConfigBase): """表达学习规则""" @@ -652,7 +651,10 @@ class ContextGroup(ValidatedConfigBase): """上下文共享组配置""" name: str = Field(..., description="共享组的名称") - chat_ids: List[str] = Field(..., description="属于该组的聊天ID列表") + chat_ids: List[List[str]] = Field( + ..., + description='属于该组的聊天ID列表,格式为 [["type", "chat_id"], ...],例如 [["group", "123456"], ["private", "789012"]]', + ) class CrossContextConfig(ValidatedConfigBase): diff --git a/src/main.py b/src/main.py index 10ae224db..e5869b3b7 100644 --- a/src/main.py +++ b/src/main.py @@ -29,9 +29,54 @@ from src.plugin_system.core.plugin_hot_reload import hot_reload_manager # 导入消息API和traceback模块 from src.common.message import get_global_api -# 条件导入记忆系统 -if global_config.memory.enable_memory: - from src.chat.memory_system.Hippocampus import hippocampus_manager +from src.chat.memory_system.Hippocampus import hippocampus_manager + +if not global_config.memory.enable_memory: + import src.chat.memory_system.Hippocampus as hippocampus_module + + class MockHippocampusManager: + def initialize(self): + pass + + def get_hippocampus(self): + return None + + async def build_memory(self): + pass + + async def forget_memory(self, percentage: float = 0.005): + pass + + async def consolidate_memory(self): + pass + + async def get_memory_from_text( + self, + text: str, + max_memory_num: int = 3, + max_memory_length: int = 2, + max_depth: int = 3, + fast_retrieval: bool = False, + ) -> list: + return [] + + async def get_memory_from_topic( + self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3 + ) -> list: + return [] + + async def get_activate_from_text( + self, text: str, max_depth: int = 3, fast_retrieval: bool = False + ) -> tuple[float, list[str]]: + return 0.0, [] + + def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list: + return [] + + def get_all_node_names(self) -> list: + return [] + + hippocampus_module.hippocampus_manager = MockHippocampusManager() # 插件系统现在使用统一的插件加载器 @@ -42,11 +87,7 @@ logger = get_logger("main") class MainSystem: def __init__(self): - # 根据配置条件性地初始化记忆系统 - if global_config.memory.enable_memory: - self.hippocampus_manager = hippocampus_manager - else: - self.hippocampus_manager = None + self.hippocampus_manager = hippocampus_manager self.individuality: Individuality = get_individuality() @@ -103,8 +144,6 @@ class MainSystem: else: loop.run_until_complete(async_memory_manager.shutdown()) logger.info("🛑 记忆管理器已停止") - except ImportError: - pass # 异步记忆优化器不存在 except Exception as e: logger.error(f"停止记忆管理器时出错: {e}") @@ -189,7 +228,6 @@ MoFox_Bot(第三方修改版) get_emoji_manager().initialize() logger.info("表情包管理器初始化成功") - # 启动情绪管理器 await mood_manager.start() logger.info("情绪管理器初始化成功") @@ -201,22 +239,18 @@ MoFox_Bot(第三方修改版) logger.info("聊天管理器初始化成功") - # 根据配置条件性地初始化记忆系统 - if global_config.memory.enable_memory: - if self.hippocampus_manager: - self.hippocampus_manager.initialize() - logger.info("记忆系统初始化成功") + # 初始化记忆系统 + self.hippocampus_manager.initialize() + logger.info("记忆系统初始化成功") - # 初始化异步记忆管理器 - try: - from src.chat.memory_system.async_memory_optimizer import async_memory_manager + # 初始化异步记忆管理器 + try: + from src.chat.memory_system.async_memory_optimizer import async_memory_manager - await async_memory_manager.initialize() - logger.info("记忆管理器初始化成功") - except Exception as e: - logger.error(f"记忆管理器初始化失败: {e}") - else: - logger.info("记忆系统已禁用,跳过初始化") + await async_memory_manager.initialize() + logger.info("记忆管理器初始化成功") + except Exception as e: + logger.error(f"记忆管理器初始化失败: {e}") # await asyncio.sleep(0.5) #防止logger输出飞了 @@ -265,15 +299,14 @@ MoFox_Bot(第三方修改版) self.server.run(), ] - # 根据配置条件性地添加记忆系统相关任务 - if global_config.memory.enable_memory and self.hippocampus_manager: - tasks.extend( - [ - self.build_memory_task(), - self.forget_memory_task(), - self.consolidate_memory_task(), - ] - ) + # 添加记忆系统相关任务 + tasks.extend( + [ + self.build_memory_task(), + self.forget_memory_task(), + self.consolidate_memory_task(), + ] + ) await asyncio.gather(*tasks) @@ -305,10 +338,6 @@ MoFox_Bot(第三方修改版) def sync_build_memory(): """在线程池中执行同步记忆构建""" - if not self.hippocampus_manager: - logger.error("尝试在禁用记忆系统时构建记忆,操作已取消。") - return - try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/src/plugin_system/apis/cross_context_api.py b/src/plugin_system/apis/cross_context_api.py new file mode 100644 index 000000000..8dd4aaf97 --- /dev/null +++ b/src/plugin_system/apis/cross_context_api.py @@ -0,0 +1,180 @@ +""" +跨群聊上下文API +""" + +import time +from typing import Dict, Any, Optional, List + +from src.common.logger import get_logger +from src.config.config import global_config +from src.chat.utils.chat_message_builder import ( + get_raw_msg_before_timestamp_with_chat, + build_readable_messages_with_id, +) +from src.chat.message_receive.chat_stream import get_chat_manager, ChatStream + +logger = get_logger("cross_context_api") + + +def get_context_groups(chat_id: str) -> Optional[List[List[str]]]: + """ + 获取当前聊天所在的共享组的其他聊天ID + """ + current_stream = get_chat_manager().get_stream(chat_id) + if not current_stream: + return None + + is_group = current_stream.group_info is not None + if is_group: + assert current_stream.group_info is not None + current_chat_raw_id = current_stream.group_info.group_id + else: + current_chat_raw_id = current_stream.user_info.user_id + current_type = "group" if is_group else "private" + + for group in global_config.cross_context.groups: + # 检查当前聊天的ID和类型是否在组的chat_ids中 + if [current_type, str(current_chat_raw_id)] in group.chat_ids: + # 返回组内其他聊天的 [type, id] 列表 + return [chat_info for chat_info in group.chat_ids if chat_info != [current_type, str(current_chat_raw_id)]] + + return None + + +async def build_cross_context_normal(chat_stream: ChatStream, other_chat_infos: List[List[str]]) -> str: + """ + 构建跨群聊/私聊上下文 (Normal模式) + """ + cross_context_messages = [] + for chat_type, chat_raw_id in other_chat_infos: + is_group = chat_type == "group" + stream_id = get_chat_manager().get_stream_id(chat_stream.platform, chat_raw_id, is_group=is_group) + if not stream_id: + continue + + try: + messages = get_raw_msg_before_timestamp_with_chat( + chat_id=stream_id, + timestamp=time.time(), + limit=5, # 可配置 + ) + if messages: + chat_name = get_chat_manager().get_stream_name(stream_id) or chat_raw_id + formatted_messages, _ = build_readable_messages_with_id(messages, timestamp_mode="relative") + cross_context_messages.append(f'[以下是来自"{chat_name}"的近期消息]\n{formatted_messages}') + except Exception as e: + logger.error(f"获取聊天 {chat_raw_id} 的消息失败: {e}") + continue + + if not cross_context_messages: + return "" + + return "# 跨上下文参考\n" + "\n\n".join(cross_context_messages) + "\n" + + +async def build_cross_context_s4u( + chat_stream: ChatStream, + other_chat_infos: List[List[str]], + target_user_info: Optional[Dict[str, Any]], +) -> str: + """ + 构建跨群聊/私聊上下文 (S4U模式) + """ + cross_context_messages = [] + if target_user_info: + user_id = target_user_info.get("user_id") + + if user_id: + for chat_type, chat_raw_id in other_chat_infos: + is_group = chat_type == "group" + stream_id = get_chat_manager().get_stream_id(chat_stream.platform, chat_raw_id, is_group=is_group) + if not stream_id: + continue + + try: + messages = get_raw_msg_before_timestamp_with_chat( + chat_id=stream_id, + timestamp=time.time(), + limit=20, # 获取更多消息以供筛选 + ) + user_messages = [msg for msg in messages if msg.get("user_id") == user_id][-5:] + + if user_messages: + chat_name = get_chat_manager().get_stream_name(stream_id) or chat_raw_id + user_name = ( + target_user_info.get("person_name") or target_user_info.get("user_nickname") or user_id + ) + formatted_messages, _ = build_readable_messages_with_id( + user_messages, timestamp_mode="relative" + ) + cross_context_messages.append( + f'[以下是"{user_name}"在"{chat_name}"的近期发言]\n{formatted_messages}' + ) + except Exception as e: + logger.error(f"获取用户 {user_id} 在聊天 {chat_raw_id} 的消息失败: {e}") + continue + + if not cross_context_messages: + return "" + + return "# 跨上下文参考\n" + "\n\n".join(cross_context_messages) + "\n" + + +async def get_chat_history_by_group_name(group_name: str) -> str: + """ + 根据互通组名字获取聊天记录 + """ + target_group = None + for group in global_config.cross_context.groups: + if group.name == group_name: + target_group = group + break + + if not target_group: + return f"找不到名为 {group_name} 的互通组。" + + if not target_group.chat_ids: + return f"互通组 {group_name} 中没有配置任何聊天。" + + chat_infos = target_group.chat_ids + chat_manager = get_chat_manager() + + cross_context_messages = [] + for chat_type, chat_raw_id in chat_infos: + is_group = chat_type == "group" + + found_stream = None + for stream in chat_manager.streams.values(): + if is_group: + if stream.group_info and stream.group_info.group_id == chat_raw_id: + found_stream = stream + break + else: # private + if stream.user_info and stream.user_info.user_id == chat_raw_id and not stream.group_info: + found_stream = stream + break + + if not found_stream: + logger.warning(f"在已加载的聊天流中找不到ID为 {chat_raw_id} 的聊天。") + continue + + stream_id = found_stream.stream_id + + try: + messages = get_raw_msg_before_timestamp_with_chat( + chat_id=stream_id, + timestamp=time.time(), + limit=5, # 可配置 + ) + if messages: + chat_name = get_chat_manager().get_stream_name(stream_id) or chat_raw_id + formatted_messages, _ = build_readable_messages_with_id(messages, timestamp_mode="relative") + cross_context_messages.append(f'[以下是来自"{chat_name}"的近期消息]\n{formatted_messages}') + except Exception as e: + logger.error(f"获取聊天 {chat_raw_id} 的消息失败: {e}") + continue + + if not cross_context_messages: + return f"无法从互通组 {group_name} 中获取任何聊天记录。" + + return "# 跨上下文参考\n" + "\n\n".join(cross_context_messages) + "\n" diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py index ee60b72b6..b20909e77 100644 --- a/src/plugin_system/apis/generator_api.py +++ b/src/plugin_system/apis/generator_api.py @@ -12,7 +12,6 @@ import traceback from typing import Tuple, Any, Dict, List, Optional from rich.traceback import install from src.common.logger import get_logger -from src.config.api_ada_configs import TaskConfig from src.chat.replyer.default_generator import DefaultReplyer from src.chat.message_receive.chat_stream import ChatStream from src.chat.utils.utils import process_llm_response @@ -107,9 +106,7 @@ async def generate_reply( """ try: # 获取回复器 - replyer = get_replyer( - chat_stream, chat_id, request_type=request_type - ) + replyer = get_replyer(chat_stream, chat_id, request_type=request_type) if not replyer: logger.error("[GeneratorAPI] 无法获取回复器") return False, [], None diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 5e24cb1b0..7a4d371a2 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -30,7 +30,6 @@ import traceback import time -import difflib import asyncio from typing import Optional, Union, Dict, Any from src.common.logger import get_logger @@ -41,8 +40,6 @@ from maim_message import UserInfo from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.message_receive.message import MessageSending, MessageRecv -from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, replace_user_references_async -from src.person_info.person_info import get_person_info_manager from maim_message import Seg from src.config.config import global_config @@ -51,6 +48,7 @@ logger = get_logger("send_api") # 适配器命令响应等待池 _adapter_response_pool: Dict[str, asyncio.Future] = {} + def message_dict_to_message_recv(message_dict: Dict[str, Any]) -> Optional[MessageRecv]: """查找要回复的消息 @@ -97,10 +95,11 @@ def message_dict_to_message_recv(message_dict: Dict[str, Any]) -> Optional[Messa } message_recv = MessageRecv(message_dict) - + logger.info(f"[SendAPI] 找到匹配的回复消息,发送者: {message_dict.get('user_nickname', '')}") return message_recv + def put_adapter_response(request_id: str, response_data: dict) -> None: """将适配器响应放入响应池""" if request_id in _adapter_response_pool: @@ -192,7 +191,7 @@ async def _send_to_target( anchor_message.update_chat_stream(target_stream) reply_to_platform_id = ( f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}" - ) + ) else: anchor_message = None reply_to_platform_id = None @@ -234,7 +233,6 @@ async def _send_to_target( return False - # ============================================================================= # 公共API函数 - 预定义类型的发送函数 # ============================================================================= @@ -274,7 +272,9 @@ async def text_to_stream( ) -async def emoji_to_stream(emoji_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = False) -> bool: +async def emoji_to_stream( + emoji_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = False +) -> bool: """向指定流发送表情包 Args: @@ -285,10 +285,14 @@ async def emoji_to_stream(emoji_base64: str, stream_id: str, storage_message: bo Returns: bool: 是否发送成功 """ - return await _send_to_target("emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply) + return await _send_to_target( + "emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply + ) -async def image_to_stream(image_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = False) -> bool: +async def image_to_stream( + image_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = False +) -> bool: """向指定流发送图片 Args: @@ -299,11 +303,17 @@ async def image_to_stream(image_base64: str, stream_id: str, storage_message: bo Returns: bool: 是否发送成功 """ - return await _send_to_target("image", image_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply) + return await _send_to_target( + "image", image_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply + ) async def command_to_stream( - command: Union[str, dict], stream_id: str, storage_message: bool = True, display_message: str = "", set_reply: bool = False + command: Union[str, dict], + stream_id: str, + storage_message: bool = True, + display_message: str = "", + set_reply: bool = False, ) -> bool: """向指定流发送命令 diff --git a/src/plugin_system/core/event_manager.py b/src/plugin_system/core/event_manager.py index 8f70b259b..7f92b1632 100644 --- a/src/plugin_system/core/event_manager.py +++ b/src/plugin_system/core/event_manager.py @@ -68,7 +68,7 @@ class EventManager: event = BaseEvent(event_name, allowed_subscribers, allowed_triggers) self._events[event_name] = event logger.debug(f"事件 {event_name} 注册成功") - + # 检查是否有缓存的订阅需要处理 self._process_pending_subscriptions(event_name) diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py index 2794e2fc5..07d33b773 100644 --- a/src/plugin_system/core/plugin_manager.py +++ b/src/plugin_system/core/plugin_manager.py @@ -200,7 +200,7 @@ class PluginManager: # 检查并调用 on_plugin_loaded 钩子(如果存在) if hasattr(plugin_instance, "on_plugin_loaded") and callable( - getattr(plugin_instance, "on_plugin_loaded") + plugin_instance.on_plugin_loaded ): logger.debug(f"为插件 '{plugin_name}' 调用 on_plugin_loaded 钩子") try: diff --git a/src/plugins/built_in/maizone_refactored/plugin.py b/src/plugins/built_in/maizone_refactored/plugin.py index a82db7511..c54872872 100644 --- a/src/plugins/built_in/maizone_refactored/plugin.py +++ b/src/plugins/built_in/maizone_refactored/plugin.py @@ -53,7 +53,9 @@ class MaiZoneRefactoredPlugin(BasePlugin): "enable_reply": ConfigField(type=bool, default=True, description="完成后是否回复"), "ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量"), "image_number": ConfigField(type=int, default=1, description="本地配图数量(1-9张)"), - "image_directory": ConfigField(type=str, default=(Path(__file__).parent / "images").as_posix(), description="图片存储目录") + "image_directory": ConfigField( + type=str, default=(Path(__file__).parent / "images").as_posix(), description="图片存储目录" + ), }, "read": { "permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"), @@ -75,7 +77,9 @@ class MaiZoneRefactoredPlugin(BasePlugin): "forbidden_hours_end": ConfigField(type=int, default=6, description="禁止发送的结束小时(24小时制)"), }, "cookie": { - "http_fallback_host": ConfigField(type=str, default="127.0.0.1", description="备用Cookie获取服务的主机地址"), + "http_fallback_host": ConfigField( + type=str, default="127.0.0.1", description="备用Cookie获取服务的主机地址" + ), "http_fallback_port": ConfigField(type=int, default=9999, description="备用Cookie获取服务的端口"), "napcat_token": ConfigField(type=str, default="", description="Napcat服务的认证Token(可选)"), }, @@ -95,14 +99,14 @@ class MaiZoneRefactoredPlugin(BasePlugin): image_service = ImageService(self.get_config) cookie_service = CookieService(self.get_config) reply_tracker_service = ReplyTrackerService() - + # 使用已创建的 reply_tracker_service 实例 qzone_service = QZoneService( - self.get_config, - content_service, - image_service, + self.get_config, + content_service, + image_service, cookie_service, - reply_tracker_service # 传入已创建的实例 + reply_tracker_service, # 传入已创建的实例 ) scheduler_service = SchedulerService(self.get_config, qzone_service) monitor_service = MonitorService(self.get_config, qzone_service) diff --git a/src/plugins/built_in/maizone_refactored/services/content_service.py b/src/plugins/built_in/maizone_refactored/services/content_service.py index 46f6017f0..27f2a0ee9 100644 --- a/src/plugins/built_in/maizone_refactored/services/content_service.py +++ b/src/plugins/built_in/maizone_refactored/services/content_service.py @@ -13,6 +13,7 @@ from src.common.logger import get_logger import imghdr import asyncio from src.plugin_system.apis import llm_api, config_api, generator_api +from src.plugin_system.apis.cross_context_api import get_chat_history_by_group_name from src.chat.message_receive.chat_stream import get_chat_manager from maim_message import UserInfo from src.llm_models.utils_model import LLMRequest @@ -87,6 +88,11 @@ class ContentService: if context: prompt += f"\n作为参考,这里有一些最近的聊天记录:\n---\n{context}\n---" + # 添加跨群聊上下文 + cross_context = await get_chat_history_by_group_name("maizone_context_group") + if cross_context and "找不到名为" not in cross_context: + prompt += f"\n\n---跨群聊参考---\n{cross_context}\n---" + # 添加历史记录以避免重复 prompt += "\n\n---历史说说记录---\n" history_block = await get_send_history(qq_account) @@ -232,7 +238,7 @@ class ContentService: for i in range(3): # 重试3次 try: async with aiohttp.ClientSession() as session: - async with session.get(image_url, timeout=30) as resp: + async with session.get(image_url, timeout=aiohttp.ClientTimeout(total=30)) as resp: if resp.status != 200: logger.error(f"下载图片失败: {image_url}, status: {resp.status}") await asyncio.sleep(2) diff --git a/src/plugins/built_in/maizone_refactored/services/qzone_service.py b/src/plugins/built_in/maizone_refactored/services/qzone_service.py index 6960dd5ab..2c7dddaec 100644 --- a/src/plugins/built_in/maizone_refactored/services/qzone_service.py +++ b/src/plugins/built_in/maizone_refactored/services/qzone_service.py @@ -272,8 +272,10 @@ class QZoneService: # 检查是否已经在持久化记录中标记为已回复 if not self.reply_tracker.has_replied(fid, comment_tid): # 记录日志以便追踪 - logger.debug(f"发现新评论需要回复 - 说说ID: {fid}, 评论ID: {comment_tid}, " - f"评论人: {comment.get('nickname', '')}, 内容: {comment.get('content', '')}") + logger.debug( + f"发现新评论需要回复 - 说说ID: {fid}, 评论ID: {comment_tid}, " + f"评论人: {comment.get('nickname', '')}, 内容: {comment.get('content', '')}" + ) comments_to_reply.append(comment) if not comments_to_reply: diff --git a/src/plugins/built_in/maizone_refactored/services/reply_tracker_service.py b/src/plugins/built_in/maizone_refactored/services/reply_tracker_service.py index 9df67d26a..0fa7edb99 100644 --- a/src/plugins/built_in/maizone_refactored/services/reply_tracker_service.py +++ b/src/plugins/built_in/maizone_refactored/services/reply_tracker_service.py @@ -41,7 +41,7 @@ class ReplyTrackerService: if not isinstance(data, dict): logger.error("加载的数据不是字典格式") return False - + for feed_id, comments in data.items(): if not isinstance(feed_id, str): logger.error(f"无效的说说ID格式: {feed_id}") @@ -70,12 +70,14 @@ class ReplyTrackerService: logger.warning("回复记录文件为空,将创建新的记录") self.replied_comments = {} return - + data = json.loads(file_content) if self._validate_data(data): self.replied_comments = data - logger.info(f"已加载 {len(self.replied_comments)} 条说说的回复记录," - f"总计 {sum(len(comments) for comments in self.replied_comments.values())} 条评论") + logger.info( + f"已加载 {len(self.replied_comments)} 条说说的回复记录," + f"总计 {sum(len(comments) for comments in self.replied_comments.values())} 条评论" + ) else: logger.error("加载的数据格式无效,将创建新的记录") self.replied_comments = {} @@ -112,12 +114,12 @@ class ReplyTrackerService: self._cleanup_old_records() # 创建临时文件 - temp_file = self.reply_record_file.with_suffix('.tmp') - + temp_file = self.reply_record_file.with_suffix(".tmp") + # 先写入临时文件 with open(temp_file, "w", encoding="utf-8") as f: json.dump(self.replied_comments, f, ensure_ascii=False, indent=2) - + # 如果写入成功,重命名为正式文件 if temp_file.stat().st_size > 0: # 确保写入成功 # 在Windows上,如果目标文件已存在,需要先删除它 @@ -128,7 +130,7 @@ class ReplyTrackerService: else: logger.error("临时文件写入失败,文件大小为0") temp_file.unlink() # 删除空的临时文件 - + except Exception as e: logger.error(f"保存回复记录失败: {e}", exc_info=True) # 尝试删除可能存在的临时文件 @@ -204,7 +206,7 @@ class ReplyTrackerService: # 确保将comment_id转换为字符串格式 comment_id_str = str(comment_id) - + if feed_id not in self.replied_comments: self.replied_comments[feed_id] = {} diff --git a/src/schedule/monthly_plan_manager.py b/src/schedule/monthly_plan_manager.py index 570cc00b1..bb966a48f 100644 --- a/src/schedule/monthly_plan_manager.py +++ b/src/schedule/monthly_plan_manager.py @@ -76,7 +76,7 @@ class MonthlyPlanManager: if len(plans) > max_plans: logger.warning(f"当前月度计划数量 ({len(plans)}) 超出上限 ({max_plans}),将自动删除多余的计划。") # 数据库查询结果已按创建时间降序排序(新的在前),直接截取超出上限的部分进行删除 - plans_to_delete = plans[:len(plans)-max_plans] + plans_to_delete = plans[: len(plans) - max_plans] delete_ids = [p.id for p in plans_to_delete] delete_plans_by_ids(delete_ids) # 重新获取计划列表 @@ -101,7 +101,7 @@ class MonthlyPlanManager: async def _generate_monthly_plans_logic(self, target_month: Optional[str] = None) -> bool: """ 生成指定月份的月度计划的核心逻辑 - + :param target_month: 目标月份,格式为 "YYYY-MM"。如果为 None,则为当前月份。 :return: 是否生成成功 """ @@ -291,6 +291,8 @@ class MonthlyPlanManager: except Exception as e: logger.error(f" 归档 {target_month} 月度计划时发生错误: {e}") + + class MonthlyPlanGenerationTask(AsyncTask): """每月初自动生成新月度计划的任务""" @@ -327,7 +329,7 @@ class MonthlyPlanGenerationTask(AsyncTask): current_month = next_month.strftime("%Y-%m") logger.info(f" 到达月初,开始生成 {current_month} 的月度计划...") await self.monthly_plan_manager._generate_monthly_plans_logic(current_month) - + except asyncio.CancelledError: logger.info(" 每月月度计划生成任务被取消。") break diff --git a/src/schedule/schedule_manager.py b/src/schedule/schedule_manager.py index e4457a3fc..e543eb706 100644 --- a/src/schedule/schedule_manager.py +++ b/src/schedule/schedule_manager.py @@ -15,10 +15,10 @@ from src.llm_models.utils_model import LLMRequest from src.common.logger import get_logger from json_repair import repair_json from src.manager.async_task_manager import AsyncTask, async_task_manager -from .sleep_manager import SleepManager, SleepState +from ..chat.chat_loop.sleep_manager.sleep_manager import SleepManager, SleepState if TYPE_CHECKING: - from src.chat.chat_loop.wakeup_manager import WakeUpManager + from src.chat.chat_loop.sleep_manager.wakeup_manager import WakeUpManager logger = get_logger("schedule_manager") @@ -165,14 +165,16 @@ class ScheduleManager: schedule_str = f"已成功加载今天的日程 ({today_str}):\n" if self.today_schedule: for item in self.today_schedule: - schedule_str += f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" + schedule_str += ( + f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" + ) logger.info(schedule_str) return # 成功加载,直接返回 else: logger.warning("数据库中的日程数据格式无效,将重新生成日程") else: logger.info(f"数据库中未找到今天的日程 ({today_str}),将调用 LLM 生成。") - + # 仅在需要时生成 await self.generate_and_save_schedule() diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 84833a6c8..fceec7d8b 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "6.7.1" +version = "6.7.2" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -476,28 +476,25 @@ pre_sleep_notification_groups = [] # 用于生成睡前消息的提示。AI会根据这个提示生成一句晚安问候。 pre_sleep_prompt = "我准备睡觉了,请生成一句简短自然的晚安问候。" -[cross_context] # 跨群聊上下文共享配置 +[cross_context] # 跨群聊/私聊上下文共享配置 # 这是总开关,用于一键启用或禁用此功能 -enable = false +enable = true # 在这里定义您的“共享组” -# 只有在同一个组内的群聊才会共享上下文 -# 注意:这里的chat_ids需要填写群号 +# 只有在同一个组内的聊天才会共享上下文 +# 格式:chat_ids = [["type", "id"], ["type", "id"], ...] +# type 可选 "group" 或 "private" [[cross_context.groups]] name = "项目A技术讨论组" chat_ids = [ - "111111", # 假设这是“开发群”的ID - "222222" # 假设这是“产品群”的ID + ["group", "169850076"], # 假设这是“开发群”的ID + ["group", "1025509724"], # 假设这是“产品群”的ID + ["private", "123456789"] # 假设这是某个用户的私聊 ] - -[maizone_intercom] -# QQ空间互通组配置 -# 启用后,发布说说时会读取指定互通组的上下文 -enable = false # 定义QQ空间互通组 # 同一个组的chat_id会共享上下文,用于生成更相关的说说 -[[maizone_intercom.groups]] +[[cross_context.maizone_context_group]] name = "Maizone默认互通组" chat_ids = [ - "111111", # 示例群聊1 - "222222" # 示例群聊2 + ["group", "111111"], # 示例群聊1 + ["private", "222222"] # 示例私聊2 ] \ No newline at end of file