diff --git a/README.md b/README.md index fa97fec14..46e1fb77d 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,66 @@ # 麦麦!MaiCore-MaiMBot (编辑中) +
+
+ + ![Python Version](https://img.shields.io/badge/Python-3.9+-blue) + ![License](https://img.shields.io/github/license/SengokuCola/MaiMBot?label=协议) + ![Status](https://img.shields.io/badge/状态-开发中-yellow) + ![Contributors](https://img.shields.io/github/contributors/MaiM-with-u/MaiBot.svg?style=flat&label=贡献者) + ![forks](https://img.shields.io/github/forks/MaiM-with-u/MaiBot.svg?style=flat&label=分支数) + ![stars](https://img.shields.io/github/stars/MaiM-with-u/MaiBot?style=flat&label=星标数) + ![issues](https://img.shields.io/github/issues/MaiM-with-u/MaiBot) + +
+ +

+ + Logo + +
+ + 画师:略nd + + +

MaiBot(麦麦)

+

+ 一款专注于 群组聊天 的赛博网友 +
+ 探索本项目的文档 » +
+
+ + 报告Bug + · + 提出新特性 +

+ +

## 新版0.6.0部署前先阅读:https://docs.mai-mai.org/manual/usage/mmc_q_a -
- -![Python Version](https://img.shields.io/badge/Python-3.9+-blue) -![License](https://img.shields.io/github/license/SengokuCola/MaiMBot) -![Status](https://img.shields.io/badge/状态-开发中-yellow) - -
## 📝 项目简介 **🍔MaiCore是一个基于大语言模型的可交互智能体** -- LLM 提供对话能力 -- 动态Prompt构建器 -- 实时的思维系统 -- MongoDB 提供数据持久化支持 -- 可扩展,可支持多种平台和多种功能 + +- 💭 **智能对话系统**:基于LLM的自然语言交互 +- 🤔 **实时思维系统**:模拟人类思考过程 +- 💝 **情感表达系统**:丰富的表情包和情绪表达 +- 🧠 **持久记忆系统**:基于MongoDB的长期记忆存储 +- 🔄 **动态人格系统**:自适应的性格特征 + +
+ + 麦麦演示视频 +
+ 👆 点击观看麦麦演示视频 👆 +
+
+ + +### 📢 版本信息 **最新版本: v0.6.0** ([查看更新日志](changelogs/changelog.md)) > [!WARNING] @@ -28,19 +70,12 @@ > 次版本MaiBot将基于MaiCore运行,不再依赖于nonebot相关组件运行。 > MaiBot将通过nonebot的插件与nonebot建立联系,然后nonebot与QQ建立联系,实现MaiBot与QQ的交互 -**分支介绍:** -- main 稳定版本 -- dev 开发版(不知道什么意思就别下) -- classical 0.6.0以前的版本 +**分支说明:** +- `main`: 稳定发布版本 +- `dev`: 开发测试版本(不知道什么意思就别下) +- `classical`: 0.6.0之前的版本 -
- - 麦麦演示视频 -
- 👆 点击观看麦麦演示视频 👆 -
-
> [!WARNING] > - 项目处于活跃开发阶段,代码可能随时更改 @@ -49,6 +84,12 @@ > - 由于持续迭代,可能存在一些已知或未知的bug > - 由于开发中,可能消耗较多token +### ⚠️ 重要提示 + +- 升级到v0.6.0版本前请务必阅读:[升级指南](https://docs.mai-mai.org/manual/usage/mmc_q_a) +- 本版本基于MaiCore重构,通过nonebot插件与QQ平台交互 +- 项目处于活跃开发阶段,功能和API可能随时调整 + ### 💬交流群(开发和建议相关讨论)不一定有空回复,会优先写文档和代码 - [五群](https://qm.qq.com/q/JxvHZnxyec) 1022489779 - [一群](https://qm.qq.com/q/VQ3XZrWgMs) 766798517 【已满】 @@ -72,55 +113,35 @@ ## 🎯 功能介绍 -### 💬 聊天功能 -- 提供思维流(心流)聊天和推理聊天两种对话逻辑 -- 支持关键词检索主动发言:对消息的话题topic进行识别,如果检测到麦麦存储过的话题就会主动进行发言 -- 支持bot名字呼唤发言:检测到"麦麦"会主动发言,可配置 -- 支持多模型,多厂商自定义配置 -- 动态的prompt构建器,更拟人 -- 支持图片,转发消息,回复消息的识别 -- 支持私聊功能,可使用PFC模式的有目的多轮对话(实验性) +| 模块 | 主要功能 | 特点 | +|------|---------|------| +| 💬 聊天系统 | • 思维流/推理聊天
• 关键词主动发言
• 多模型支持
• 动态prompt构建
• 私聊功能(PFC) | 拟人化交互 | +| 🧠 思维流系统 | • 实时思考生成
• 自动启停机制
• 日程系统联动 | 智能化决策 | +| 🧠 记忆系统 2.0 | • 优化记忆抽取
• 海马体记忆机制
• 聊天记录概括 | 持久化记忆 | +| 😊 表情包系统 | • 情绪匹配发送
• GIF支持
• 自动收集与审查 | 丰富表达 | +| 📅 日程系统 | • 动态日程生成
• 自定义想象力
• 思维流联动 | 智能规划 | +| 👥 关系系统 2.0 | • 关系管理优化
• 丰富接口支持
• 个性化交互 | 深度社交 | +| 📊 统计系统 | • 使用数据统计
• LLM调用记录
• 实时控制台显示 | 数据可视 | +| 🔧 系统功能 | • 优雅关闭机制
• 自动数据保存
• 异常处理完善 | 稳定可靠 | -### 🧠 思维流系统 -- 思维流能够在回复前后进行思考,生成实时想法 -- 思维流自动启停机制,提升资源利用效率 -- 思维流与日程系统联动,实现动态日程生成 +## 📐 项目架构 -### 🧠 记忆系统 2.0 -- 优化记忆抽取策略和prompt结构 -- 改进海马体记忆提取机制,提升自然度 -- 对聊天记录进行概括存储,在需要时调用 +```mermaid +graph TD + A[MaiCore] --> B[对话系统] + A --> C[思维流系统] + A --> D[记忆系统] + A --> E[情感系统] + B --> F[多模型支持] + B --> G[动态Prompt] + C --> H[实时思考] + C --> I[日程联动] + D --> J[记忆存储] + D --> K[记忆检索] + E --> L[表情管理] + E --> M[情绪识别] +``` -### 😊 表情包系统 -- 支持根据发言内容发送对应情绪的表情包 -- 支持识别和处理gif表情包 -- 会自动偷群友的表情包 -- 表情包审查功能 -- 表情包文件完整性自动检查 -- 自动清理缓存图片 - -### 📅 日程系统 -- 动态更新的日程生成 -- 可自定义想象力程度 -- 与聊天情况交互(思维流模式下) - -### 👥 关系系统 2.0 -- 优化关系管理系统,适用于新版本 -- 提供更丰富的关系接口 -- 针对每个用户创建"关系",实现个性化回复 - -### 📊 统计系统 -- 详细的使用数据统计 -- LLM调用统计 -- 在控制台显示统计信息 - -### 🔧 系统功能 -- 支持优雅的shutdown机制 -- 自动保存功能,定期保存聊天记录和关系数据 -- 完善的异常处理机制 -- 可自定义时区设置 -- 优化的日志输出格式 -- 配置自动更新功能 ## 开发计划TODO:LIST @@ -157,7 +178,6 @@ MaiCore是一个开源项目,我们非常欢迎你的参与。你的贡献, ## 致谢 -- [nonebot2](https://github.com/nonebot/nonebot2): 跨平台 Python 异步聊天机器人框架 - [NapCat](https://github.com/NapNeko/NapCatQQ): 现代化的基于 NTQQ 的 Bot 协议端实现 ### 贡献者 diff --git a/bot.py b/bot.py index a0bf3a3cb..ca214967e 100644 --- a/bot.py +++ b/bot.py @@ -8,6 +8,7 @@ import time import platform from dotenv import load_dotenv from src.common.logger import get_module_logger +from src.common.crash_logger import install_crash_handler from src.main import MainSystem logger = get_module_logger("main_bot") @@ -193,6 +194,9 @@ def raw_main(): if platform.system().lower() != "windows": time.tzset() + # 安装崩溃日志处理器 + install_crash_handler() + check_eula() print("检查EULA和隐私条款完成") easter_egg() diff --git a/depends-data/maimai.png b/depends-data/maimai.png new file mode 100644 index 000000000..faccb856b Binary files /dev/null and b/depends-data/maimai.png differ diff --git a/depends-data/video.png b/depends-data/video.png new file mode 100644 index 000000000..84176b2d9 Binary files /dev/null and b/depends-data/video.png differ diff --git a/src/common/crash_logger.py b/src/common/crash_logger.py new file mode 100644 index 000000000..658e1bb02 --- /dev/null +++ b/src/common/crash_logger.py @@ -0,0 +1,72 @@ +import sys +import traceback +import logging +from pathlib import Path +from logging.handlers import RotatingFileHandler + +def setup_crash_logger(): + """设置崩溃日志记录器""" + # 创建logs/crash目录(如果不存在) + crash_log_dir = Path("logs/crash") + crash_log_dir.mkdir(parents=True, exist_ok=True) + + # 创建日志记录器 + crash_logger = logging.getLogger('crash_logger') + crash_logger.setLevel(logging.ERROR) + + # 设置日志格式 + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s\n' + '异常类型: %(exc_info)s\n' + '详细信息:\n%(message)s\n' + '-------------------\n' + ) + + # 创建按大小轮转的文件处理器(最大10MB,保留5个备份) + log_file = crash_log_dir / "crash.log" + file_handler = RotatingFileHandler( + log_file, + maxBytes=10*1024*1024, # 10MB + backupCount=5, + encoding='utf-8' + ) + file_handler.setFormatter(formatter) + crash_logger.addHandler(file_handler) + + return crash_logger + +def log_crash(exc_type, exc_value, exc_traceback): + """记录崩溃信息到日志文件""" + if exc_type is None: + return + + # 获取崩溃日志记录器 + crash_logger = logging.getLogger('crash_logger') + + # 获取完整的异常堆栈信息 + stack_trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) + + # 记录崩溃信息 + crash_logger.error( + stack_trace, + exc_info=(exc_type, exc_value, exc_traceback) + ) + +def install_crash_handler(): + """安装全局异常处理器""" + # 设置崩溃日志记录器 + setup_crash_logger() + + # 保存原始的异常处理器 + original_hook = sys.excepthook + + def exception_handler(exc_type, exc_value, exc_traceback): + """全局异常处理器""" + # 记录崩溃信息 + log_crash(exc_type, exc_value, exc_traceback) + + # 调用原始的异常处理器 + original_hook(exc_type, exc_value, exc_traceback) + + # 设置全局异常处理器 + sys.excepthook = exception_handler \ No newline at end of file diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py new file mode 100644 index 000000000..ad69fea1d --- /dev/null +++ b/src/plugins/PFC/action_planner.py @@ -0,0 +1,139 @@ +from typing import Tuple +from src.common.logger import get_module_logger +from ..models.utils_model import LLM_request +from ..config.config import global_config +from .chat_observer import ChatObserver +from .pfc_utils import get_items_from_json +from src.individuality.individuality import Individuality +from .observation_info import ObservationInfo +from .conversation_info import ConversationInfo + +logger = get_module_logger("action_planner") + +class ActionPlannerInfo: + def __init__(self): + self.done_action = [] + self.goal_list = [] + self.knowledge_list = [] + self.memory_list = [] + + +class ActionPlanner: + """行动规划器""" + + def __init__(self, stream_id: str): + self.llm = LLM_request( + model=global_config.llm_normal, + temperature=0.7, + max_tokens=1000, + request_type="action_planning" + ) + self.personality_info = Individuality.get_instance().get_prompt(type = "personality", x_person = 2, level = 2) + self.name = global_config.BOT_NICKNAME + self.chat_observer = ChatObserver.get_instance(stream_id) + + async def plan( + self, + observation_info: ObservationInfo, + conversation_info: ConversationInfo + ) -> Tuple[str, str]: + """规划下一步行动 + + Args: + observation_info: 决策信息 + conversation_info: 对话信息 + + Returns: + Tuple[str, str]: (行动类型, 行动原因) + """ + # 构建提示词 + logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}") + + #构建对话目标 + if conversation_info.goal_list: + goal, reasoning = conversation_info.goal_list[-1] + else: + goal = "目前没有明确对话目标" + reasoning = "目前没有明确对话目标,最好思考一个对话目标" + + + # 获取聊天历史记录 + chat_history_list = observation_info.chat_history + chat_history_text = "" + for msg in chat_history_list: + chat_history_text += f"{msg}\n" + + if observation_info.new_messages_count > 0: + new_messages_list = observation_info.unprocessed_messages + + chat_history_text += f"有{observation_info.new_messages_count}条新消息:\n" + for msg in new_messages_list: + chat_history_text += f"{msg}\n" + + observation_info.clear_unprocessed_messages() + + + personality_text = f"你的名字是{self.name},{self.personality_info}" + + # 构建action历史文本 + action_history_list = conversation_info.done_action + action_history_text = "你之前做的事情是:" + for action in action_history_list: + action_history_text += f"{action}\n" + + + + prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请分析以下内容,根据信息决定下一步行动: + +当前对话目标:{goal} +产生该对话目标的原因:{reasoning} + +{action_history_text} + +最近的对话记录: +{chat_history_text} + +请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言: +行动类型: +fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择 +wait: 当你做出了发言,对方尚未回复时等待对方的回复 +listening: 倾听对方发言,当你认为对方发言尚未结束时采用 +direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言 +rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标 + +请以JSON格式输出,包含以下字段: +1. action: 行动类型,注意你之前的行为 +2. reason: 选择该行动的原因,注意你之前的行为(简要解释) + +注意:请严格按照JSON格式输出,不要包含任何其他内容。""" + + logger.debug(f"发送到LLM的提示词: {prompt}") + try: + content, _ = await self.llm.generate_response_async(prompt) + logger.debug(f"LLM原始返回内容: {content}") + + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "action", "reason", + default_values={"action": "direct_reply", "reason": "没有明确原因"} + ) + + if not success: + return "direct_reply", "JSON解析失败,选择直接回复" + + action = result["action"] + reason = result["reason"] + + # 验证action类型 + if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal"]: + logger.warning(f"未知的行动类型: {action},默认使用listening") + action = "listening" + + logger.info(f"规划的行动: {action}") + logger.info(f"行动原因: {reason}") + return action, reason + + except Exception as e: + logger.error(f"规划行动时出错: {str(e)}") + return "direct_reply", "发生错误,选择直接回复" \ No newline at end of file diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index c918576e2..93618cf2d 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -2,9 +2,10 @@ import time import asyncio from typing import Optional, Dict, Any, List, Tuple from src.common.logger import get_module_logger -from src.common.database import db from ..message.message_base import UserInfo from ..config.config import global_config +from .chat_states import NotificationManager, create_new_message_notification, create_cold_chat_notification +from .message_storage import MessageStorage, MongoDBMessageStorage logger = get_module_logger("chat_observer") @@ -16,37 +17,41 @@ class ChatObserver: _instances: Dict[str, "ChatObserver"] = {} @classmethod - def get_instance(cls, stream_id: str) -> "ChatObserver": + def get_instance(cls, stream_id: str, message_storage: Optional[MessageStorage] = None) -> 'ChatObserver': """获取或创建观察器实例 Args: stream_id: 聊天流ID - + message_storage: 消息存储实现,如果为None则使用MongoDB实现 + Returns: ChatObserver: 观察器实例 """ if stream_id not in cls._instances: - cls._instances[stream_id] = cls(stream_id) + cls._instances[stream_id] = cls(stream_id, message_storage) return cls._instances[stream_id] - - def __init__(self, stream_id: str): + + def __init__(self, stream_id: str, message_storage: Optional[MessageStorage] = None): """初始化观察器 Args: stream_id: 聊天流ID + message_storage: 消息存储实现,如果为None则使用MongoDB实现 """ if stream_id in self._instances: raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.") self.stream_id = stream_id + self.message_storage = message_storage or MongoDBMessageStorage() + self.last_user_speak_time: Optional[float] = None # 对方上次发言时间 - self.last_bot_speak_time: Optional[float] = None # 机器人上次发言时间 - self.last_check_time: float = time.time() # 上次查看聊天记录时间 - self.last_message_read: Optional[str] = None # 最后读取的消息ID - self.last_message_time: Optional[float] = None # 最后一条消息的时间戳 - - self.waiting_start_time: Optional[float] = None # 等待开始时间 - + self.last_bot_speak_time: Optional[float] = None # 机器人上次发言时间 + self.last_check_time: float = time.time() # 上次查看聊天记录时间 + self.last_message_read: Optional[str] = None # 最后读取的消息ID + self.last_message_time: Optional[float] = None # 最后一条消息的时间戳 + + self.waiting_start_time: float = time.time() # 等待开始时间,初始化为当前时间 + # 消息历史记录 self.message_history: List[Dict[str, Any]] = [] # 所有消息历史 self.last_message_id: Optional[str] = None # 最后一条消息的ID @@ -57,48 +62,42 @@ class ChatObserver: self._task: Optional[asyncio.Task] = None self._update_event = asyncio.Event() # 触发更新的事件 self._update_complete = asyncio.Event() # 更新完成的事件 - - def check(self) -> bool: + + # 通知管理器 + self.notification_manager = NotificationManager() + + # 冷场检查配置 + self.cold_chat_threshold: float = 60.0 # 60秒无消息判定为冷场 + self.last_cold_chat_check: float = time.time() + self.is_cold_chat_state: bool = False + + self.update_event = asyncio.Event() + self.update_interval = 5 # 更新间隔(秒) + self.message_cache = [] + self.update_running = False + + async def check(self) -> bool: """检查距离上一次观察之后是否有了新消息 Returns: bool: 是否有新消息 """ logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}") - - query = {"chat_id": self.stream_id, "time": {"$gt": self.last_check_time}} - - # 只需要查询是否存在,不需要获取具体消息 - new_message_exists = db.messages.find_one(query) is not None - + + new_message_exists = await self.message_storage.has_new_messages( + self.stream_id, + self.last_check_time + ) + if new_message_exists: logger.debug("发现新消息") self.last_check_time = time.time() return new_message_exists - - def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: - """获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象""" - messages = self.get_message_history(self.last_check_time) - for message in messages: - self._add_message_to_history(message) - return messages, self.message_history - - def new_message_after(self, time_point: float) -> bool: - """判断是否在指定时间点后有新消息 - - Args: - time_point: 时间戳 - - Returns: - bool: 是否有新消息 - """ - logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point}") - return self.last_message_time is None or self.last_message_time > time_point - - def _add_message_to_history(self, message: Dict[str, Any]): - """添加消息到历史记录 - + + async def _add_message_to_history(self, message: Dict[str, Any]): + """添加消息到历史记录并发送通知 + Args: message: 消息数据 """ @@ -113,7 +112,76 @@ class ChatObserver: self.last_bot_speak_time = message["time"] else: self.last_user_speak_time = message["time"] - + + # 发送新消息通知 + notification = create_new_message_notification( + sender="chat_observer", + target="pfc", + message=message + ) + await self.notification_manager.send_notification(notification) + + # 检查并更新冷场状态 + await self._check_cold_chat() + + async def _check_cold_chat(self): + """检查是否处于冷场状态并发送通知""" + current_time = time.time() + + # 每10秒检查一次冷场状态 + if current_time - self.last_cold_chat_check < 10: + return + + self.last_cold_chat_check = current_time + + # 判断是否冷场 + is_cold = False + if self.last_message_time is None: + is_cold = True + else: + is_cold = (current_time - self.last_message_time) > self.cold_chat_threshold + + # 如果冷场状态发生变化,发送通知 + if is_cold != self.is_cold_chat_state: + self.is_cold_chat_state = is_cold + notification = create_cold_chat_notification( + sender="chat_observer", + target="pfc", + is_cold=is_cold + ) + await self.notification_manager.send_notification(notification) + + async def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象""" + messages = await self.message_storage.get_messages_after( + self.stream_id, + self.last_message_read + ) + for message in messages: + await self._add_message_to_history(message) + return messages, self.message_history + + def new_message_after(self, time_point: float) -> bool: + """判断是否在指定时间点后有新消息 + + Args: + time_point: 时间戳 + + Returns: + bool: 是否有新消息 + """ + if time_point is None: + logger.warning("time_point 为 None,返回 False") + return False + + if self.last_message_time is None: + logger.debug("没有最后消息时间,返回 False") + return False + + has_new = self.last_message_time > time_point + logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point} = {has_new}") + return has_new + def get_message_history( self, start_time: Optional[float] = None, @@ -156,15 +224,11 @@ class ChatObserver: Returns: List[Dict[str, Any]]: 新消息列表 """ - query = {"chat_id": self.stream_id} - if self.last_message_read: - # 获取ID大于last_message_read的消息 - last_message = db.messages.find_one({"message_id": self.last_message_read}) - if last_message: - query["time"] = {"$gt": last_message["time"]} - - new_messages = list(db.messages.find(query).sort("time", 1)) - + new_messages = await self.message_storage.get_messages_after( + self.stream_id, + self.last_message_read + ) + if new_messages: self.last_message_read = new_messages[-1]["message_id"] @@ -179,27 +243,24 @@ class ChatObserver: Returns: List[Dict[str, Any]]: 最多5条消息 """ - query = {"chat_id": self.stream_id, "time": {"$lt": time_point}} - - new_messages = list( - db.messages.find(query).sort("time", -1).limit(5) # 倒序获取5条 + new_messages = await self.message_storage.get_messages_before( + self.stream_id, + time_point ) - - # 将消息按时间正序排列 - new_messages.reverse() - + if new_messages: self.last_message_read = new_messages[-1]["message_id"] return new_messages - + + '''主要观察循环''' async def _update_loop(self): """更新循环""" try: start_time = time.time() messages = await self._fetch_new_messages_before(start_time) for message in messages: - self._add_message_to_history(message) + await self._add_message_to_history(message) except Exception as e: logger.error(f"缓冲消息出错: {e}") @@ -220,8 +281,8 @@ class ChatObserver: if new_messages: # 处理新消息 for message in new_messages: - self._add_message_to_history(message) - + await self._add_message_to_history(message) + # 设置完成事件 self._update_complete.set() @@ -312,3 +373,71 @@ class ChatObserver: time_info += f"\n距离对方上次发言已经过去了{int(user_speak_ago)}秒" return time_info + + def start_periodic_update(self): + """启动观察器的定期更新""" + if not self.update_running: + self.update_running = True + asyncio.create_task(self._periodic_update()) + + async def _periodic_update(self): + """定期更新消息历史""" + try: + while self.update_running: + await self._update_message_history() + await asyncio.sleep(self.update_interval) + except Exception as e: + logger.error(f"定期更新消息历史时出错: {str(e)}") + + async def _update_message_history(self) -> bool: + """更新消息历史 + + Returns: + bool: 是否有新消息 + """ + try: + messages = await self.message_storage.get_messages_for_stream( + self.stream_id, + limit=50 + ) + + if not messages: + return False + + # 检查是否有新消息 + has_new_messages = False + if messages and (not self.message_cache or messages[0]["message_id"] != self.message_cache[0]["message_id"]): + has_new_messages = True + + self.message_cache = messages + + if has_new_messages: + self.update_event.set() + self.update_event.clear() + return True + return False + + except Exception as e: + logger.error(f"更新消息历史时出错: {str(e)}") + return False + + def get_cached_messages(self, limit: int = 50) -> List[Dict[str, Any]]: + """获取缓存的消息历史 + + Args: + limit: 获取的最大消息数量,默认50 + + Returns: + List[Dict[str, Any]]: 缓存的消息历史列表 + """ + return self.message_cache[:limit] + + def get_last_message(self) -> Optional[Dict[str, Any]]: + """获取最后一条消息 + + Returns: + Optional[Dict[str, Any]]: 最后一条消息,如果没有则返回None + """ + if not self.message_cache: + return None + return self.message_cache[0] diff --git a/src/plugins/PFC/chat_states.py b/src/plugins/PFC/chat_states.py new file mode 100644 index 000000000..bb7cfc4a6 --- /dev/null +++ b/src/plugins/PFC/chat_states.py @@ -0,0 +1,267 @@ +from enum import Enum, auto +from typing import Optional, Dict, Any, List, Set +from dataclasses import dataclass +from datetime import datetime +from abc import ABC, abstractmethod + +class ChatState(Enum): + """聊天状态枚举""" + NORMAL = auto() # 正常状态 + NEW_MESSAGE = auto() # 有新消息 + COLD_CHAT = auto() # 冷场状态 + ACTIVE_CHAT = auto() # 活跃状态 + BOT_SPEAKING = auto() # 机器人正在说话 + USER_SPEAKING = auto() # 用户正在说话 + SILENT = auto() # 沉默状态 + ERROR = auto() # 错误状态 + +class NotificationType(Enum): + """通知类型枚举""" + NEW_MESSAGE = auto() # 新消息通知 + COLD_CHAT = auto() # 冷场通知 + ACTIVE_CHAT = auto() # 活跃通知 + BOT_SPEAKING = auto() # 机器人说话通知 + USER_SPEAKING = auto() # 用户说话通知 + MESSAGE_DELETED = auto() # 消息删除通知 + USER_JOINED = auto() # 用户加入通知 + USER_LEFT = auto() # 用户离开通知 + ERROR = auto() # 错误通知 + +@dataclass +class ChatStateInfo: + """聊天状态信息""" + state: ChatState + last_message_time: Optional[float] = None + last_message_content: Optional[str] = None + last_speaker: Optional[str] = None + message_count: int = 0 + cold_duration: float = 0.0 # 冷场持续时间(秒) + active_duration: float = 0.0 # 活跃持续时间(秒) + +@dataclass +class Notification: + """通知基类""" + type: NotificationType + timestamp: float + sender: str # 发送者标识 + target: str # 接收者标识 + data: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + "type": self.type.name, + "timestamp": self.timestamp, + "data": self.data + } + +@dataclass +class StateNotification(Notification): + """持续状态通知""" + is_active: bool = True + + def to_dict(self) -> Dict[str, Any]: + base_dict = super().to_dict() + base_dict["is_active"] = self.is_active + return base_dict + +class NotificationHandler(ABC): + """通知处理器接口""" + + @abstractmethod + async def handle_notification(self, notification: Notification): + """处理通知""" + pass + +class NotificationManager: + """通知管理器""" + + def __init__(self): + # 按接收者和通知类型存储处理器 + self._handlers: Dict[str, Dict[NotificationType, List[NotificationHandler]]] = {} + self._active_states: Set[NotificationType] = set() + self._notification_history: List[Notification] = [] + + def register_handler(self, target: str, notification_type: NotificationType, handler: NotificationHandler): + """注册通知处理器 + + Args: + target: 接收者标识(例如:"pfc") + notification_type: 要处理的通知类型 + handler: 处理器实例 + """ + if target not in self._handlers: + self._handlers[target] = {} + if notification_type not in self._handlers[target]: + self._handlers[target][notification_type] = [] + self._handlers[target][notification_type].append(handler) + + def unregister_handler(self, target: str, notification_type: NotificationType, handler: NotificationHandler): + """注销通知处理器 + + Args: + target: 接收者标识 + notification_type: 通知类型 + handler: 要注销的处理器实例 + """ + if target in self._handlers and notification_type in self._handlers[target]: + handlers = self._handlers[target][notification_type] + if handler in handlers: + handlers.remove(handler) + # 如果该类型的处理器列表为空,删除该类型 + if not handlers: + del self._handlers[target][notification_type] + # 如果该目标没有任何处理器,删除该目标 + if not self._handlers[target]: + del self._handlers[target] + + async def send_notification(self, notification: Notification): + """发送通知""" + self._notification_history.append(notification) + + # 如果是状态通知,更新活跃状态 + if isinstance(notification, StateNotification): + if notification.is_active: + self._active_states.add(notification.type) + else: + self._active_states.discard(notification.type) + + # 调用目标接收者的处理器 + target = notification.target + if target in self._handlers: + handlers = self._handlers[target].get(notification.type, []) + for handler in handlers: + await handler.handle_notification(notification) + + def get_active_states(self) -> Set[NotificationType]: + """获取当前活跃的状态""" + return self._active_states.copy() + + def is_state_active(self, state_type: NotificationType) -> bool: + """检查特定状态是否活跃""" + return state_type in self._active_states + + def get_notification_history(self, + sender: Optional[str] = None, + target: Optional[str] = None, + limit: Optional[int] = None) -> List[Notification]: + """获取通知历史 + + Args: + sender: 过滤特定发送者的通知 + target: 过滤特定接收者的通知 + limit: 限制返回数量 + """ + history = self._notification_history + + if sender: + history = [n for n in history if n.sender == sender] + if target: + history = [n for n in history if n.target == target] + + if limit is not None: + history = history[-limit:] + + return history + +# 一些常用的通知创建函数 +def create_new_message_notification(sender: str, target: str, message: Dict[str, Any]) -> Notification: + """创建新消息通知""" + return Notification( + type=NotificationType.NEW_MESSAGE, + timestamp=datetime.now().timestamp(), + sender=sender, + target=target, + data={ + "message_id": message.get("message_id"), + "content": message.get("content"), + "sender": message.get("sender"), + "time": message.get("time") + } + ) + +def create_cold_chat_notification(sender: str, target: str, is_cold: bool) -> StateNotification: + """创建冷场状态通知""" + return StateNotification( + type=NotificationType.COLD_CHAT, + timestamp=datetime.now().timestamp(), + sender=sender, + target=target, + data={"is_cold": is_cold}, + is_active=is_cold + ) + +def create_active_chat_notification(sender: str, target: str, is_active: bool) -> StateNotification: + """创建活跃状态通知""" + return StateNotification( + type=NotificationType.ACTIVE_CHAT, + timestamp=datetime.now().timestamp(), + sender=sender, + target=target, + data={"is_active": is_active}, + is_active=is_active + ) + +class ChatStateManager: + """聊天状态管理器""" + + def __init__(self): + self.current_state = ChatState.NORMAL + self.state_info = ChatStateInfo(state=ChatState.NORMAL) + self.state_history: list[ChatStateInfo] = [] + + def update_state(self, new_state: ChatState, **kwargs): + """更新聊天状态 + + Args: + new_state: 新的状态 + **kwargs: 其他状态信息 + """ + self.current_state = new_state + self.state_info.state = new_state + + # 更新其他状态信息 + for key, value in kwargs.items(): + if hasattr(self.state_info, key): + setattr(self.state_info, key, value) + + # 记录状态历史 + self.state_history.append(self.state_info) + + def get_current_state_info(self) -> ChatStateInfo: + """获取当前状态信息""" + return self.state_info + + def get_state_history(self) -> list[ChatStateInfo]: + """获取状态历史""" + return self.state_history + + def is_cold_chat(self, threshold: float = 60.0) -> bool: + """判断是否处于冷场状态 + + Args: + threshold: 冷场阈值(秒) + + Returns: + bool: 是否冷场 + """ + if not self.state_info.last_message_time: + return True + + current_time = datetime.now().timestamp() + return (current_time - self.state_info.last_message_time) > threshold + + def is_active_chat(self, threshold: float = 5.0) -> bool: + """判断是否处于活跃状态 + + Args: + threshold: 活跃阈值(秒) + + Returns: + bool: 是否活跃 + """ + if not self.state_info.last_message_time: + return False + + current_time = datetime.now().timestamp() + return (current_time - self.state_info.last_message_time) <= threshold \ No newline at end of file diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py new file mode 100644 index 000000000..dda380491 --- /dev/null +++ b/src/plugins/PFC/conversation.py @@ -0,0 +1,245 @@ +import asyncio +import datetime +from typing import Dict, Any +from ..chat.message import Message +from .pfc_types import ConversationState +from .pfc import ChatObserver, GoalAnalyzer, Waiter, DirectMessageSender +from src.common.logger import get_module_logger +from .action_planner import ActionPlanner +from .observation_info import ObservationInfo +from .conversation_info import ConversationInfo +from .reply_generator import ReplyGenerator +from ..chat.chat_stream import ChatStream +from ..message.message_base import UserInfo +from src.plugins.chat.chat_stream import chat_manager +from .pfc_KnowledgeFetcher import KnowledgeFetcher +import traceback + +logger = get_module_logger("pfc_conversation") + + +class Conversation: + """对话类,负责管理单个对话的状态和行为""" + + def __init__(self, stream_id: str): + """初始化对话实例 + + Args: + stream_id: 聊天流ID + """ + self.stream_id = stream_id + self.state = ConversationState.INIT + self.should_continue = False + + # 回复相关 + self.generated_reply = "" + + async def _initialize(self): + """初始化实例,注册所有组件""" + + try: + self.action_planner = ActionPlanner(self.stream_id) + self.goal_analyzer = GoalAnalyzer(self.stream_id) + self.reply_generator = ReplyGenerator(self.stream_id) + self.knowledge_fetcher = KnowledgeFetcher() + self.waiter = Waiter(self.stream_id) + self.direct_sender = DirectMessageSender() + + # 获取聊天流信息 + self.chat_stream = chat_manager.get_stream(self.stream_id) + + self.stop_action_planner = False + except Exception as e: + logger.error(f"初始化对话实例:注册运行组件失败: {e}") + logger.error(traceback.format_exc()) + raise + + + try: + #决策所需要的信息,包括自身自信和观察信息两部分 + #注册观察器和观测信息 + self.chat_observer = ChatObserver.get_instance(self.stream_id) + self.chat_observer.start() + self.observation_info = ObservationInfo() + self.observation_info.bind_to_chat_observer(self.stream_id) + + #对话信息 + self.conversation_info = ConversationInfo() + except Exception as e: + logger.error(f"初始化对话实例:注册信息组件失败: {e}") + logger.error(traceback.format_exc()) + raise + + # 组件准备完成,启动该论对话 + self.should_continue = True + asyncio.create_task(self.start()) + + + async def start(self): + """开始对话流程""" + try: + logger.info("对话系统启动中...") + asyncio.create_task(self._plan_and_action_loop()) + except Exception as e: + logger.error(f"启动对话系统失败: {e}") + raise + + + async def _plan_and_action_loop(self): + """思考步,PFC核心循环模块""" + # 获取最近的消息历史 + while self.should_continue: + # 使用决策信息来辅助行动规划 + action, reason = await self.action_planner.plan( + self.observation_info, + self.conversation_info + ) + if self._check_new_messages_after_planning(): + continue + + # 执行行动 + await self._handle_action(action, reason, self.observation_info, self.conversation_info) + + def _check_new_messages_after_planning(self): + """检查在规划后是否有新消息""" + if self.observation_info.new_messages_count > 0: + logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动") + # 如果需要,可以在这里添加逻辑来根据新消息重新决定行动 + return True + return False + + + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: + """将消息字典转换为Message对象""" + try: + chat_info = msg_dict.get("chat_info", {}) + chat_stream = ChatStream.from_dict(chat_info) + user_info = UserInfo.from_dict(msg_dict.get("user_info", {})) + + return Message( + message_id=msg_dict["message_id"], + chat_stream=chat_stream, + time=msg_dict["time"], + user_info=user_info, + processed_plain_text=msg_dict.get("processed_plain_text", ""), + detailed_plain_text=msg_dict.get("detailed_plain_text", "") + ) + except Exception as e: + logger.warning(f"转换消息时出错: {e}") + raise + + async def _handle_action(self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo): + """处理规划的行动""" + logger.info(f"执行行动: {action}, 原因: {reason}") + + # 记录action历史,先设置为stop,完成后再设置为done + conversation_info.done_action.append({ + "action": action, + "reason": reason, + "status": "start", + "time": datetime.datetime.now().strftime("%H:%M:%S") + }) + + + if action == "direct_reply": + self.state = ConversationState.GENERATING + self.generated_reply = await self.reply_generator.generate( + observation_info, + conversation_info + ) + + # # 检查回复是否合适 + # is_suitable, reason, need_replan = await self.reply_generator.check_reply( + # self.generated_reply, + # self.current_goal + # ) + + if self._check_new_messages_after_planning(): + return None + + await self._send_reply() + + conversation_info.done_action.append({ + "action": action, + "reason": reason, + "status": "done", + "time": datetime.datetime.now().strftime("%H:%M:%S") + }) + + elif action == "fetch_knowledge": + self.state = ConversationState.FETCHING + knowledge = "TODO:知识" + topic = "TODO:关键词" + + logger.info(f"假装获取到知识{knowledge},关键词是: {topic}") + + if knowledge: + if topic not in self.conversation_info.knowledge_list: + self.conversation_info.knowledge_list.append({ + "topic": topic, + "knowledge": knowledge + }) + else: + self.conversation_info.knowledge_list[topic] += knowledge + + elif action == "rethink_goal": + self.state = ConversationState.RETHINKING + await self.goal_analyzer.analyze_goal(conversation_info, observation_info) + + + elif action == "listening": + self.state = ConversationState.LISTENING + logger.info("倾听对方发言...") + if await self.waiter.wait(): # 如果返回True表示超时 + await self._send_timeout_message() + await self._stop_conversation() + + else: # wait + self.state = ConversationState.WAITING + logger.info("等待更多信息...") + if await self.waiter.wait(): # 如果返回True表示超时 + await self._send_timeout_message() + await self._stop_conversation() + + async def _send_timeout_message(self): + """发送超时结束消息""" + try: + messages = self.chat_observer.get_cached_messages(limit=1) + if not messages: + return + + latest_message = self._convert_to_message(messages[0]) + await self.direct_sender.send_message( + chat_stream=self.chat_stream, + content="TODO:超时消息", + reply_to_message=latest_message + ) + except Exception as e: + logger.error(f"发送超时消息失败: {str(e)}") + + async def _send_reply(self): + """发送回复""" + if not self.generated_reply: + logger.warning("没有生成回复") + return + + messages = self.chat_observer.get_cached_messages(limit=1) + if not messages: + logger.warning("没有最近的消息可以回复") + return + + latest_message = self._convert_to_message(messages[0]) + try: + await self.direct_sender.send_message( + chat_stream=self.chat_stream, + content=self.generated_reply, + reply_to_message=latest_message + ) + self.chat_observer.trigger_update() # 触发立即更新 + if not await self.chat_observer.wait_for_update(): + logger.warning("等待消息更新超时") + + self.state = ConversationState.ANALYZING + except Exception as e: + logger.error(f"发送消息失败: {str(e)}") + self.state = ConversationState.ANALYZING \ No newline at end of file diff --git a/src/plugins/PFC/conversation_info.py b/src/plugins/PFC/conversation_info.py new file mode 100644 index 000000000..5b8262a16 --- /dev/null +++ b/src/plugins/PFC/conversation_info.py @@ -0,0 +1,8 @@ + + +class ConversationInfo: + def __init__(self): + self.done_action = [] + self.goal_list = [] + self.knowledge_list = [] + self.memory_list = [] \ No newline at end of file diff --git a/src/plugins/PFC/message_sender.py b/src/plugins/PFC/message_sender.py new file mode 100644 index 000000000..6df1e7ded --- /dev/null +++ b/src/plugins/PFC/message_sender.py @@ -0,0 +1,49 @@ +from typing import Optional +from src.common.logger import get_module_logger +from ..chat.chat_stream import ChatStream +from ..chat.message import Message +from ..message.message_base import Seg +from src.plugins.chat.message import MessageSending + +logger = get_module_logger("message_sender") + +class DirectMessageSender: + """直接消息发送器""" + + def __init__(self): + pass + + async def send_message( + self, + chat_stream: ChatStream, + content: str, + reply_to_message: Optional[Message] = None, + ) -> None: + """发送消息到聊天流 + + Args: + chat_stream: 聊天流 + content: 消息内容 + reply_to_message: 要回复的消息(可选) + """ + try: + # 创建消息内容 + segments = [Seg(type="text", data={"text": content})] + + # 检查是否需要引用回复 + if reply_to_message: + reply_id = reply_to_message.message_id + message_sending = MessageSending( + segments=segments, + reply_to_id=reply_id + ) + else: + message_sending = MessageSending(segments=segments) + + # 发送消息 + await chat_stream.send_message(message_sending) + logger.info(f"消息已发送: {content}") + + except Exception as e: + logger.error(f"发送消息失败: {str(e)}") + raise \ No newline at end of file diff --git a/src/plugins/PFC/message_storage.py b/src/plugins/PFC/message_storage.py new file mode 100644 index 000000000..3c7cab8b3 --- /dev/null +++ b/src/plugins/PFC/message_storage.py @@ -0,0 +1,134 @@ +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Optional +from src.common.database import db + +class MessageStorage(ABC): + """消息存储接口""" + + @abstractmethod + async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]: + """获取指定消息ID之后的所有消息 + + Args: + chat_id: 聊天ID + message_id: 消息ID,如果为None则获取所有消息 + + Returns: + List[Dict[str, Any]]: 消息列表 + """ + pass + + @abstractmethod + async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]: + """获取指定时间点之前的消息 + + Args: + chat_id: 聊天ID + time_point: 时间戳 + limit: 最大消息数量 + + Returns: + List[Dict[str, Any]]: 消息列表 + """ + pass + + @abstractmethod + async def has_new_messages(self, chat_id: str, after_time: float) -> bool: + """检查是否有新消息 + + Args: + chat_id: 聊天ID + after_time: 时间戳 + + Returns: + bool: 是否有新消息 + """ + pass + +class MongoDBMessageStorage(MessageStorage): + """MongoDB消息存储实现""" + + def __init__(self): + self.db = db + + async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]: + query = {"chat_id": chat_id} + + if message_id: + # 获取ID大于message_id的消息 + last_message = self.db.messages.find_one({"message_id": message_id}) + if last_message: + query["time"] = {"$gt": last_message["time"]} + + return list( + self.db.messages.find(query).sort("time", 1) + ) + + async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]: + query = { + "chat_id": chat_id, + "time": {"$lt": time_point} + } + + messages = list( + self.db.messages.find(query).sort("time", -1).limit(limit) + ) + + # 将消息按时间正序排列 + messages.reverse() + return messages + + async def has_new_messages(self, chat_id: str, after_time: float) -> bool: + query = { + "chat_id": chat_id, + "time": {"$gt": after_time} + } + + return self.db.messages.find_one(query) is not None + +# # 创建一个内存消息存储实现,用于测试 +# class InMemoryMessageStorage(MessageStorage): +# """内存消息存储实现,主要用于测试""" + +# def __init__(self): +# self.messages: Dict[str, List[Dict[str, Any]]] = {} + +# async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]: +# if chat_id not in self.messages: +# return [] + +# messages = self.messages[chat_id] +# if not message_id: +# return messages + +# # 找到message_id的索引 +# try: +# index = next(i for i, m in enumerate(messages) if m["message_id"] == message_id) +# return messages[index + 1:] +# except StopIteration: +# return [] + +# async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]: +# if chat_id not in self.messages: +# return [] + +# messages = [ +# m for m in self.messages[chat_id] +# if m["time"] < time_point +# ] + +# return messages[-limit:] + +# async def has_new_messages(self, chat_id: str, after_time: float) -> bool: +# if chat_id not in self.messages: +# return False + +# return any(m["time"] > after_time for m in self.messages[chat_id]) + +# # 测试辅助方法 +# def add_message(self, chat_id: str, message: Dict[str, Any]): +# """添加测试消息""" +# if chat_id not in self.messages: +# self.messages[chat_id] = [] +# self.messages[chat_id].append(message) +# self.messages[chat_id].sort(key=lambda m: m["time"]) \ No newline at end of file diff --git a/src/plugins/PFC/notification_handler.py b/src/plugins/PFC/notification_handler.py new file mode 100644 index 000000000..38c0d0dee --- /dev/null +++ b/src/plugins/PFC/notification_handler.py @@ -0,0 +1,71 @@ +from typing import TYPE_CHECKING +from src.common.logger import get_module_logger +from .chat_states import NotificationHandler, Notification, NotificationType + +if TYPE_CHECKING: + from .conversation import Conversation + +logger = get_module_logger("notification_handler") + +class PFCNotificationHandler(NotificationHandler): + """PFC通知处理器""" + + def __init__(self, conversation: 'Conversation'): + """初始化PFC通知处理器 + + Args: + conversation: 对话实例 + """ + self.conversation = conversation + + async def handle_notification(self, notification: Notification): + """处理通知 + + Args: + notification: 通知对象 + """ + logger.debug(f"收到通知: {notification.type.name}, 数据: {notification.data}") + + # 根据通知类型执行不同的处理 + if notification.type == NotificationType.NEW_MESSAGE: + # 新消息通知 + await self._handle_new_message(notification) + elif notification.type == NotificationType.COLD_CHAT: + # 冷聊天通知 + await self._handle_cold_chat(notification) + elif notification.type == NotificationType.COMMAND: + # 命令通知 + await self._handle_command(notification) + else: + logger.warning(f"未知的通知类型: {notification.type.name}") + + async def _handle_new_message(self, notification: Notification): + """处理新消息通知 + + Args: + notification: 通知对象 + """ + + # 更新决策信息 + observation_info = self.conversation.observation_info + observation_info.last_message_time = notification.data.get("time", 0) + observation_info.add_unprocessed_message(notification.data) + + # 手动触发观察器更新 + self.conversation.chat_observer.trigger_update() + + async def _handle_cold_chat(self, notification: Notification): + """处理冷聊天通知 + + Args: + notification: 通知对象 + """ + # 获取冷聊天信息 + cold_duration = notification.data.get("duration", 0) + + # 更新决策信息 + observation_info = self.conversation.observation_info + observation_info.conversation_cold_duration = cold_duration + + logger.info(f"对话已冷: {cold_duration}秒") + \ No newline at end of file diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py new file mode 100644 index 000000000..2967f10e3 --- /dev/null +++ b/src/plugins/PFC/observation_info.py @@ -0,0 +1,246 @@ +#Programmable Friendly Conversationalist +#Prefrontal cortex +from typing import List, Optional, Dict, Any, Set +from ..message.message_base import UserInfo +import time +from dataclasses import dataclass, field +from src.common.logger import get_module_logger +from .chat_observer import ChatObserver +from .chat_states import NotificationHandler + +logger = get_module_logger("observation_info") + +class ObservationInfoHandler(NotificationHandler): + """ObservationInfo的通知处理器""" + + def __init__(self, observation_info: 'ObservationInfo'): + """初始化处理器 + + Args: + observation_info: 要更新的ObservationInfo实例 + """ + self.observation_info = observation_info + + async def handle_notification(self, notification: Dict[str, Any]): + """处理通知 + + Args: + notification: 通知数据 + """ + notification_type = notification.get("type") + data = notification.get("data", {}) + + if notification_type == "NEW_MESSAGE": + # 处理新消息通知 + logger.debug(f"收到新消息通知data: {data}") + message = data.get("message", {}) + self.observation_info.update_from_message(message) + # self.observation_info.has_unread_messages = True + # self.observation_info.new_unread_message.append(message.get("processed_plain_text", "")) + + elif notification_type == "COLD_CHAT": + # 处理冷场通知 + is_cold = data.get("is_cold", False) + self.observation_info.update_cold_chat_status(is_cold, time.time()) + + elif notification_type == "ACTIVE_CHAT": + # 处理活跃通知 + is_active = data.get("is_active", False) + self.observation_info.is_cold = not is_active + + elif notification_type == "BOT_SPEAKING": + # 处理机器人说话通知 + self.observation_info.is_typing = False + self.observation_info.last_bot_speak_time = time.time() + + elif notification_type == "USER_SPEAKING": + # 处理用户说话通知 + self.observation_info.is_typing = False + self.observation_info.last_user_speak_time = time.time() + + elif notification_type == "MESSAGE_DELETED": + # 处理消息删除通知 + message_id = data.get("message_id") + self.observation_info.unprocessed_messages = [ + msg for msg in self.observation_info.unprocessed_messages + if msg.get("message_id") != message_id + ] + + elif notification_type == "USER_JOINED": + # 处理用户加入通知 + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.add(user_id) + + elif notification_type == "USER_LEFT": + # 处理用户离开通知 + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.discard(user_id) + + elif notification_type == "ERROR": + # 处理错误通知 + error_msg = data.get("error", "") + logger.error(f"收到错误通知: {error_msg}") + +@dataclass +class ObservationInfo: + """决策信息类,用于收集和管理来自chat_observer的通知信息""" + + #data_list + chat_history: List[str] = field(default_factory=list) + unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) + active_users: Set[str] = field(default_factory=set) + + #data + last_bot_speak_time: Optional[float] = None + last_user_speak_time: Optional[float] = None + last_message_time: Optional[float] = None + last_message_content: str = "" + last_message_sender: Optional[str] = None + bot_id: Optional[str] = None + new_messages_count: int = 0 + cold_chat_duration: float = 0.0 + + #state + is_typing: bool = False + has_unread_messages: bool = False + is_cold_chat: bool = False + changed: bool = False + + # #spec + # meta_plan_trigger: bool = False + + def __post_init__(self): + """初始化后创建handler""" + self.chat_observer = None + self.handler = ObservationInfoHandler(self) + + def bind_to_chat_observer(self, stream_id: str): + """绑定到指定的chat_observer + + Args: + stream_id: 聊天流ID + """ + self.chat_observer = ChatObserver.get_instance(stream_id) + self.chat_observer.notification_manager.register_handler( + target="observation_info", + notification_type="NEW_MESSAGE", + handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", + notification_type="COLD_CHAT", + handler=self.handler + ) + + def unbind_from_chat_observer(self): + """解除与chat_observer的绑定""" + if self.chat_observer: + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", + notification_type="NEW_MESSAGE", + handler=self.handler + ) + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", + notification_type="COLD_CHAT", + handler=self.handler + ) + self.chat_observer = None + + def update_from_message(self, message: Dict[str, Any]): + """从消息更新信息 + + Args: + message: 消息数据 + """ + logger.debug(f"更新信息from_message: {message}") + self.last_message_time = message["time"] + self.last_message_content = message.get("processed_plain_text", "") + + user_info = UserInfo.from_dict(message.get("user_info", {})) + self.last_message_sender = user_info.user_id + + if user_info.user_id == self.bot_id: + self.last_bot_speak_time = message["time"] + else: + self.last_user_speak_time = message["time"] + self.active_users.add(user_info.user_id) + + self.new_messages_count += 1 + self.unprocessed_messages.append(message) + + self.update_changed() + + def update_changed(self): + """更新changed状态""" + self.changed = True + # self.meta_plan_trigger = True + + def update_cold_chat_status(self, is_cold: bool, current_time: float): + """更新冷场状态 + + Args: + is_cold: 是否冷场 + current_time: 当前时间 + """ + self.is_cold_chat = is_cold + if is_cold and self.last_message_time: + self.cold_chat_duration = current_time - self.last_message_time + + def get_active_duration(self) -> float: + """获取当前活跃时长 + + Returns: + float: 最后一条消息到现在的时长(秒) + """ + if not self.last_message_time: + return 0.0 + return time.time() - self.last_message_time + + def get_user_response_time(self) -> Optional[float]: + """获取用户响应时间 + + Returns: + Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None + """ + if not self.last_user_speak_time: + return None + return time.time() - self.last_user_speak_time + + def get_bot_response_time(self) -> Optional[float]: + """获取机器人响应时间 + + Returns: + Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None + """ + if not self.last_bot_speak_time: + return None + return time.time() - self.last_bot_speak_time + + def clear_unprocessed_messages(self): + """清空未处理消息列表""" + # 将未处理消息添加到历史记录中 + for message in self.unprocessed_messages: + if "processed_plain_text" in message: + self.chat_history.append(message["processed_plain_text"]) + # 清空未处理消息列表 + self.has_unread_messages = False + self.unprocessed_messages.clear() + self.new_messages_count = 0 + + def add_unprocessed_message(self, message: Dict[str, Any]): + """添加未处理的消息 + + Args: + message: 消息数据 + """ + # 防止重复添加同一消息 + message_id = message.get("message_id") + if message_id and not any(m.get("message_id") == message_id for m in self.unprocessed_messages): + self.unprocessed_messages.append(message) + self.new_messages_count += 1 + + # 同时更新其他消息相关信息 + self.update_from_message(message) \ No newline at end of file diff --git a/src/plugins/PFC/pfc.py b/src/plugins/PFC/pfc.py index 6816488aa..62b28acb4 100644 --- a/src/plugins/PFC/pfc.py +++ b/src/plugins/PFC/pfc.py @@ -2,8 +2,7 @@ # Prefrontal cortex import datetime import asyncio -from typing import List, Optional, Dict, Any, Tuple, Literal -from enum import Enum +from typing import List, Optional, Tuple, TYPE_CHECKING from src.common.logger import get_module_logger from ..chat.chat_stream import ChatStream from ..message.message_base import UserInfo, Seg @@ -11,153 +10,21 @@ from ..chat.message import Message from ..models.utils_model import LLM_request from ..config.config import global_config from src.plugins.chat.message import MessageSending -from src.plugins.chat.chat_stream import chat_manager from ..message.api import global_api from ..storage.storage import MessageStorage from .chat_observer import ChatObserver -from .pfc_KnowledgeFetcher import KnowledgeFetcher -from .reply_checker import ReplyChecker from .pfc_utils import get_items_from_json from src.individuality.individuality import Individuality +from .conversation_info import ConversationInfo +from .observation_info import ObservationInfo import time +if TYPE_CHECKING: + pass + logger = get_module_logger("pfc") -class ConversationState(Enum): - """对话状态""" - - INIT = "初始化" - RETHINKING = "重新思考" - ANALYZING = "分析历史" - PLANNING = "规划目标" - GENERATING = "生成回复" - CHECKING = "检查回复" - SENDING = "发送消息" - WAITING = "等待" - LISTENING = "倾听" - ENDED = "结束" - JUDGING = "判断" - - -ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] - - -class ActionPlanner: - """行动规划器""" - - def __init__(self, stream_id: str): - self.llm = LLM_request( - model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="action_planning" - ) - self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2) - self.name = global_config.BOT_NICKNAME - self.chat_observer = ChatObserver.get_instance(stream_id) - - async def plan( - self, - goal: str, - method: str, - reasoning: str, - action_history: List[Dict[str, str]] = None, - chat_observer: Optional[ChatObserver] = None, # 添加chat_observer参数 - ) -> Tuple[str, str]: - """规划下一步行动 - - Args: - goal: 对话目标 - reasoning: 目标原因 - action_history: 行动历史记录 - - Returns: - Tuple[str, str]: (行动类型, 行动原因) - """ - # 构建提示词 - # 获取最近20条消息 - self.chat_observer.waiting_start_time = time.time() - - messages = self.chat_observer.get_message_history(limit=20) - chat_history_text = "" - for msg in messages: - time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") - user_info = UserInfo.from_dict(msg.get("user_info", {})) - sender = user_info.user_nickname or f"用户{user_info.user_id}" - if sender == self.name: - sender = "你说" - chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" - - personality_text = f"你的名字是{self.name},{self.personality_info}" - - # 构建action历史文本 - action_history_text = "" - if action_history: - if action_history[-1]["action"] == "direct_reply": - action_history_text = "你刚刚发言回复了对方" - - # 获取时间信息 - time_info = self.chat_observer.get_time_info() - - prompt = f"""现在你在参与一场QQ聊天,请分析以下内容,根据信息决定下一步行动: -{personality_text} -当前对话目标:{goal} -实现该对话目标的方式:{method} -产生该对话目标的原因:{reasoning} -{time_info} -最近的对话记录: -{chat_history_text} -{action_history_text} -请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言: -行动类型: -fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择 -wait: 当你做出了发言,对方尚未回复时等待对方的回复 -listening: 倾听对方发言,当你认为对方发言尚未结束时采用 -direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言 -rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标 -judge_conversation: 判断对话是否结束,当发现对话目标已经达到或者希望停止对话时选择,会判断对话是否结束 - -请以JSON格式输出,包含以下字段: -1. action: 行动类型,注意你之前的行为 -2. reason: 选择该行动的原因,注意你之前的行为(简要解释) - -注意:请严格按照JSON格式输出,不要包含任何其他内容。""" - - logger.debug(f"发送到LLM的提示词: {prompt}") - try: - content, _ = await self.llm.generate_response_async(prompt) - logger.debug(f"LLM原始返回内容: {content}") - - # 使用简化函数提取JSON内容 - success, result = get_items_from_json( - content, "action", "reason", default_values={"action": "direct_reply", "reason": "默认原因"} - ) - - if not success: - return "direct_reply", "JSON解析失败,选择直接回复" - - action = result["action"] - reason = result["reason"] - - # 验证action类型 - if action not in [ - "direct_reply", - "fetch_knowledge", - "wait", - "listening", - "rethink_goal", - "judge_conversation", - ]: - logger.warning(f"未知的行动类型: {action},默认使用listening") - action = "listening" - - logger.info(f"规划的行动: {action}") - logger.info(f"行动原因: {reason}") - return action, reason - - except Exception as e: - logger.error(f"规划行动时出错: {str(e)}") - return "direct_reply", "发生错误,选择直接回复" - - class GoalAnalyzer: """对话目标分析器""" @@ -176,42 +43,55 @@ class GoalAnalyzer: self.max_goals = 3 # 同时保持的最大目标数量 self.current_goal_and_reason = None - async def analyze_goal(self) -> Tuple[str, str, str]: + async def analyze_goal(self, conversation_info: ConversationInfo, observation_info: ObservationInfo): """分析对话历史并设定目标 Args: - chat_history: 聊天历史记录列表 - + conversation_info: 对话信息 + observation_info: 观察信息 + Returns: Tuple[str, str, str]: (目标, 方法, 原因) """ - max_retries = 3 - for retry in range(max_retries): - try: - # 构建提示词 - messages = self.chat_observer.get_message_history(limit=20) - chat_history_text = "" - for msg in messages: - time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") - user_info = UserInfo.from_dict(msg.get("user_info", {})) - sender = user_info.user_nickname or f"用户{user_info.user_id}" - if sender == self.name: - sender = "你说" - chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" - - personality_text = f"你的名字是{self.name},{self.personality_info}" - - # 构建当前已有目标的文本 - existing_goals_text = "" - if self.goals: - existing_goals_text = "当前已有的对话目标:\n" - for i, (goal, _, reason) in enumerate(self.goals): - existing_goals_text += f"{i + 1}. 目标: {goal}, 原因: {reason}\n" - - prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请分析以下聊天记录,并根据你的性格特征确定多个明确的对话目标。 + #构建对话目标 + goal_list = conversation_info.goal_list + goal_text = "" + for goal, reason in goal_list: + goal_text += f"目标:{goal};" + goal_text += f"原因:{reason}\n" + + + # 获取聊天历史记录 + chat_history_list = observation_info.chat_history + chat_history_text = "" + for msg in chat_history_list: + chat_history_text += f"{msg}\n" + + if observation_info.new_messages_count > 0: + new_messages_list = observation_info.unprocessed_messages + + chat_history_text += f"有{observation_info.new_messages_count}条新消息:\n" + for msg in new_messages_list: + chat_history_text += f"{msg}\n" + + observation_info.clear_unprocessed_messages() + + + personality_text = f"你的名字是{self.name},{self.personality_info}" + + # 构建action历史文本 + action_history_list = conversation_info.done_action + action_history_text = "你之前做的事情是:" + for action in action_history_list: + action_history_text += f"{action}\n" + + + prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请分析以下聊天记录,并根据你的性格特征确定多个明确的对话目标。 这些目标应该反映出对话的不同方面和意图。 -{existing_goals_text} +{action_history_text} +当前对话目标: +{goal_text} 聊天记录: {chat_history_text} @@ -222,54 +102,37 @@ class GoalAnalyzer: 3. 添加新目标 4. 删除不再相关的目标 -请以JSON格式输出一个当前最主要的对话目标,包含以下字段: +请以JSON格式输出当前的所有对话目标,包含以下字段: 1. goal: 对话目标(简短的一句话) 2. reasoning: 对话原因,为什么设定这个目标(简要解释) 输出格式示例: {{ - "goal": "回答用户关于Python编程的具体问题", - "reasoning": "用户提出了关于Python的技术问题,需要专业且准确的解答" +"goal": "回答用户关于Python编程的具体问题", +"reasoning": "用户提出了关于Python的技术问题,需要专业且准确的解答" +}}, +{{ +"goal": "回答用户关于python安装的具体问题", +"reasoning": "用户提出了关于Python的技术问题,需要专业且准确的解答" }}""" - logger.debug(f"发送到LLM的提示词: {prompt}") - content, _ = await self.llm.generate_response_async(prompt) - logger.debug(f"LLM原始返回内容: {content}") - - # 使用简化函数提取JSON内容 - success, result = get_items_from_json( - content, "goal", "reasoning", required_types={"goal": str, "reasoning": str} - ) - - if not success: - logger.error(f"无法解析JSON,重试第{retry + 1}次") - continue - - goal = result["goal"] - reasoning = result["reasoning"] - - # 使用默认的方法 - method = "以友好的态度回应" - - # 更新目标列表 - await self._update_goals(goal, method, reasoning) - - # 返回当前最主要的目标 - if self.goals: - current_goal, current_method, current_reasoning = self.goals[0] - return current_goal, current_method, current_reasoning - else: - return goal, method, reasoning - - except Exception as e: - logger.error(f"分析对话目标时出错: {str(e)},重试第{retry + 1}次") - if retry == max_retries - 1: - return "保持友好的对话", "以友好的态度回应", "确保对话顺利进行" - continue - - # 所有重试都失败后的默认返回 - return "保持友好的对话", "以友好的态度回应", "确保对话顺利进行" + logger.debug(f"发送到LLM的提示词: {prompt}") + content, _ = await self.llm.generate_response_async(prompt) + logger.debug(f"LLM原始返回内容: {content}") + + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "goal", "reasoning", + required_types={"goal": str, "reasoning": str} + ) + #TODO + + + conversation_info.goal_list.append(result) + + async def _update_goals(self, new_goal: str, method: str, reasoning: str): """更新目标列表 @@ -332,7 +195,7 @@ class GoalAnalyzer: return self.goals[1:].copy() async def analyze_conversation(self, goal, reasoning): - messages = self.chat_observer.get_message_history() + messages = self.chat_observer.get_cached_messages() chat_history_text = "" for msg in messages: time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") @@ -360,40 +223,33 @@ class GoalAnalyzer: {{ "goal_achieved": true, "stop_conversation": false, - "reason": "用户已经得到了满意的回答,但我仍希望继续聊天" + "reason": "虽然目标已达成,但对话仍然有继续的价值" }}""" - logger.debug(f"发送到LLM的提示词: {prompt}") + try: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - - # 使用简化函数提取JSON内容 + + # 尝试解析JSON success, result = get_items_from_json( content, - "goal_achieved", - "stop_conversation", - "reason", - required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str}, + "goal_achieved", "stop_conversation", "reason", + required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str} ) if not success: - return False, False, "确保对话顺利进行" - - # 如果当前目标达成,从目标列表中移除 - if result["goal_achieved"] and not result["stop_conversation"]: - for i, (g, _, _) in enumerate(self.goals): - if g == goal: - self.goals.pop(i) - # 如果还有其他目标,不停止对话 - if self.goals: - result["stop_conversation"] = False - break - - return result["goal_achieved"], result["stop_conversation"], result["reason"] - + logger.error("无法解析对话分析结果JSON") + return False, False, "解析结果失败" + + goal_achieved = result["goal_achieved"] + stop_conversation = result["stop_conversation"] + reason = result["reason"] + + return goal_achieved, stop_conversation, reason + except Exception as e: - logger.error(f"分析对话目标时出错: {str(e)}") - return False, False, "确保对话顺利进行" + logger.error(f"分析对话状态时出错: {str(e)}") + return False, False, f"分析出错: {str(e)}" class Waiter: @@ -410,563 +266,24 @@ class Waiter: Returns: bool: 是否超时(True表示超时) """ - wait_start_time = self.chat_observer.waiting_start_time - while not self.chat_observer.new_message_after(wait_start_time): - await asyncio.sleep(1) - logger.info("等待中...") - # 检查是否超过60秒 + # 使用当前时间作为等待开始时间 + wait_start_time = time.time() + self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间 + + while True: + # 检查是否有新消息 + if self.chat_observer.new_message_after(wait_start_time): + logger.info("等待结束,收到新消息") + return False + + # 检查是否超时 if time.time() - wait_start_time > 300: logger.info("等待超过300秒,结束对话") return True - logger.info("等待结束") - return False - - -class ReplyGenerator: - """回复生成器""" - - def __init__(self, stream_id: str): - self.llm = LLM_request( - model=global_config.llm_normal, temperature=0.7, max_tokens=300, request_type="reply_generation" - ) - self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2) - self.name = global_config.BOT_NICKNAME - self.chat_observer = ChatObserver.get_instance(stream_id) - self.reply_checker = ReplyChecker(stream_id) - - async def generate( - self, - goal: str, - chat_history: List[Message], - knowledge_cache: Dict[str, str], - previous_reply: Optional[str] = None, - retry_count: int = 0, - ) -> str: - """生成回复 - - Args: - goal: 对话目标 - chat_history: 聊天历史 - knowledge_cache: 知识缓存 - previous_reply: 上一次生成的回复(如果有) - retry_count: 当前重试次数 - - Returns: - str: 生成的回复 - """ - # 构建提示词 - logger.debug(f"开始生成回复:当前目标: {goal}") - self.chat_observer.trigger_update() # 触发立即更新 - if not await self.chat_observer.wait_for_update(): - logger.warning("等待消息更新超时") - - messages = self.chat_observer.get_message_history(limit=20) - chat_history_text = "" - for msg in messages: - time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") - user_info = UserInfo.from_dict(msg.get("user_info", {})) - sender = user_info.user_nickname or f"用户{user_info.user_id}" - if sender == self.name: - sender = "你说" - chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" - - # 整理知识缓存 - knowledge_text = "" - if knowledge_cache: - knowledge_text = "\n相关知识:" - if isinstance(knowledge_cache, dict): - for _source, content in knowledge_cache.items(): - knowledge_text += f"\n{content}" - elif isinstance(knowledge_cache, list): - for item in knowledge_cache: - knowledge_text += f"\n{item}" - - # 添加上一次生成的回复信息 - previous_reply_text = "" - if previous_reply: - previous_reply_text = f"\n上一次生成的回复(需要改进):\n{previous_reply}" - - personality_text = f"你的名字是{self.name},{self.personality_info}" - - prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请根据以下信息生成回复: - -当前对话目标:{goal} -{knowledge_text} -{previous_reply_text} -最近的聊天记录: -{chat_history_text} - -请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该: -1. 符合对话目标,以"你"的角度发言 -2. 体现你的性格特征 -3. 自然流畅,像正常聊天一样,简短 -4. 适当利用相关知识,但不要生硬引用 -{"5. 改进上一次回复中的问题" if previous_reply else ""} - -请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清"你"和对方说的话,不要把"你"说的话当做对方说的话,这是你自己说的话。 -请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 -请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。 -不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。 - -请直接输出回复内容,不需要任何额外格式。""" - - try: - content, _ = await self.llm.generate_response_async(prompt) - logger.info(f"生成的回复: {content}") - is_new = self.chat_observer.check() - logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息") - - # 如果有新消息,重新生成回复 - if is_new: - logger.info("检测到新消息,重新生成回复") - return await self.generate(goal, chat_history, knowledge_cache, None, retry_count) - - return content - - except Exception as e: - logger.error(f"生成回复时出错: {e}") - return "抱歉,我现在有点混乱,让我重新思考一下..." - - async def check_reply(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]: - """检查回复是否合适 - - Args: - reply: 生成的回复 - goal: 对话目标 - retry_count: 当前重试次数 - - Returns: - Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) - """ - return await self.reply_checker.check(reply, goal, retry_count) - - -class Conversation: - # 类级别的实例管理 - _instances: Dict[str, "Conversation"] = {} - _instance_lock = asyncio.Lock() # 类级别的全局锁 - _init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件 - _initializing: Dict[str, bool] = {} # 标记是否正在初始化 - - @classmethod - async def get_instance(cls, stream_id: str) -> Optional["Conversation"]: - """获取或创建对话实例 - - Args: - stream_id: 聊天流ID - - Returns: - Optional[Conversation]: 对话实例,如果创建或等待失败则返回None - """ - try: - # 使用全局锁来确保线程安全 - async with cls._instance_lock: - # 如果已经在初始化中,等待初始化完成 - if stream_id in cls._initializing and cls._initializing[stream_id]: - # 释放锁等待初始化 - cls._instance_lock.release() - try: - await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0) - except asyncio.TimeoutError: - logger.error(f"等待实例 {stream_id} 初始化超时") - return None - finally: - await cls._instance_lock.acquire() - - # 如果实例不存在,创建新实例 - if stream_id not in cls._instances: - cls._instances[stream_id] = cls(stream_id) - cls._init_events[stream_id] = asyncio.Event() - cls._initializing[stream_id] = True - logger.info(f"创建新的对话实例: {stream_id}") - - return cls._instances[stream_id] - except Exception as e: - logger.error(f"获取对话实例失败: {e}") - return None - - @classmethod - async def remove_instance(cls, stream_id: str): - """删除对话实例 - - Args: - stream_id: 聊天流ID - """ - async with cls._instance_lock: - if stream_id in cls._instances: - # 停止相关组件 - instance = cls._instances[stream_id] - instance.chat_observer.stop() - # 删除实例 - del cls._instances[stream_id] - if stream_id in cls._init_events: - del cls._init_events[stream_id] - if stream_id in cls._initializing: - del cls._initializing[stream_id] - logger.info(f"已删除对话实例 {stream_id}") - - def __init__(self, stream_id: str): - """初始化对话系统""" - self.stream_id = stream_id - self.state = ConversationState.INIT - self.current_goal: Optional[str] = None - self.current_method: Optional[str] = None - self.goal_reasoning: Optional[str] = None - self.generated_reply: Optional[str] = None - self.should_continue = True - - # 初始化聊天观察器 - self.chat_observer = ChatObserver.get_instance(stream_id) - - # 添加action历史记录 - self.action_history: List[Dict[str, str]] = [] - - # 知识缓存 - self.knowledge_cache: Dict[str, str] = {} # 确保初始化为字典 - - # 初始化各个组件 - self.goal_analyzer = GoalAnalyzer(self.stream_id) - self.action_planner = ActionPlanner(self.stream_id) - self.reply_generator = ReplyGenerator(self.stream_id) - self.knowledge_fetcher = KnowledgeFetcher() - self.direct_sender = DirectMessageSender() - self.waiter = Waiter(self.stream_id) - - # 创建聊天流 - self.chat_stream = chat_manager.get_stream(self.stream_id) - - def _clear_knowledge_cache(self): - """清空知识缓存""" - self.knowledge_cache.clear() # 使用clear方法清空字典 - - async def start(self): - """开始对话流程""" - try: - logger.info("对话系统启动") - self.should_continue = True - self.chat_observer.start() # 启动观察器 + await asyncio.sleep(1) - # 启动对话循环 - await self._conversation_loop() - except Exception as e: - logger.error(f"启动对话系统失败: {e}") - raise - finally: - # 标记初始化完成 - self._init_events[self.stream_id].set() - self._initializing[self.stream_id] = False + logger.info("等待中...") - async def _conversation_loop(self): - """对话循环""" - # 获取最近的消息历史 - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - - while self.should_continue: - # 执行行动 - self.chat_observer.trigger_update() # 触发立即更新 - if not await self.chat_observer.wait_for_update(): - logger.warning("等待消息更新超时") - - action, reason = await self.action_planner.plan( - self.current_goal, - self.current_method, - self.goal_reasoning, - self.action_history, # 传入action历史 - self.chat_observer, # 传入chat_observer - ) - - # 执行行动 - await self._handle_action(action, reason) - - def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: - """将消息字典转换为Message对象""" - try: - chat_info = msg_dict.get("chat_info", {}) - chat_stream = ChatStream.from_dict(chat_info) - user_info = UserInfo.from_dict(msg_dict.get("user_info", {})) - - return Message( - message_id=msg_dict["message_id"], - chat_stream=chat_stream, - time=msg_dict["time"], - user_info=user_info, - processed_plain_text=msg_dict.get("processed_plain_text", ""), - detailed_plain_text=msg_dict.get("detailed_plain_text", ""), - ) - except Exception as e: - logger.warning(f"转换消息时出错: {e}") - raise - - async def _handle_action(self, action: str, reason: str): - """处理规划的行动""" - logger.info(f"执行行动: {action}, 原因: {reason}") - - # 记录action历史 - self.action_history.append( - {"action": action, "reason": reason, "time": datetime.datetime.now().strftime("%H:%M:%S")} - ) - - # 只保留最近的10条记录 - if len(self.action_history) > 10: - self.action_history = self.action_history[-10:] - - if action == "direct_reply": - self.state = ConversationState.GENERATING - messages = self.chat_observer.get_message_history(limit=30) - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - ) - - # 检查回复是否合适 - is_suitable, reason, need_replan = await self.reply_generator.check_reply( - self.generated_reply, self.current_goal - ) - - if not is_suitable: - logger.warning(f"生成的回复不合适,原因: {reason}") - if need_replan: - # 尝试切换到其他备选目标 - alternative_goals = await self.goal_analyzer.get_alternative_goals() - if alternative_goals: - # 有备选目标,尝试使用下一个目标 - self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0] - logger.info(f"切换到备选目标: {self.current_goal}") - # 使用新目标生成回复 - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - ) - # 检查使用新目标生成的回复是否合适 - is_suitable, reason, _ = await self.reply_generator.check_reply( - self.generated_reply, self.current_goal - ) - if is_suitable: - # 如果新目标的回复合适,调整目标优先级 - await self.goal_analyzer._update_goals( - self.current_goal, self.current_method, self.goal_reasoning - ) - else: - # 如果新目标还是不合适,重新思考目标 - self.state = ConversationState.RETHINKING - ( - self.current_goal, - self.current_method, - self.goal_reasoning, - ) = await self.goal_analyzer.analyze_goal() - return - else: - # 没有备选目标,重新分析 - self.state = ConversationState.RETHINKING - ( - self.current_goal, - self.current_method, - self.goal_reasoning, - ) = await self.goal_analyzer.analyze_goal() - return - else: - # 重新生成回复 - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - self.generated_reply, # 将不合适的回复作为previous_reply传入 - ) - - while self.chat_observer.check(): - if not is_suitable: - logger.warning(f"生成的回复不合适,原因: {reason}") - if need_replan: - # 尝试切换到其他备选目标 - alternative_goals = await self.goal_analyzer.get_alternative_goals() - if alternative_goals: - # 有备选目标,尝试使用下一个目标 - self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0] - logger.info(f"切换到备选目标: {self.current_goal}") - # 使用新目标生成回复 - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - ) - is_suitable = True # 假设使用新目标后回复是合适的 - else: - # 没有备选目标,重新分析 - self.state = ConversationState.RETHINKING - ( - self.current_goal, - self.current_method, - self.goal_reasoning, - ) = await self.goal_analyzer.analyze_goal() - return - else: - # 重新生成回复 - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - self.generated_reply, # 将不合适的回复作为previous_reply传入 - ) - - await self._send_reply() - - elif action == "fetch_knowledge": - self.state = ConversationState.GENERATING - messages = self.chat_observer.get_message_history(limit=30) - knowledge, sources = await self.knowledge_fetcher.fetch( - self.current_goal, [self._convert_to_message(msg) for msg in messages] - ) - logger.info(f"获取到知识,来源: {sources}") - - if knowledge != "未找到相关知识": - self.knowledge_cache[sources] = knowledge - - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - ) - - # 检查回复是否合适 - is_suitable, reason, need_replan = await self.reply_generator.check_reply( - self.generated_reply, self.current_goal - ) - - if not is_suitable: - logger.warning(f"生成的回复不合适,原因: {reason}") - if need_replan: - # 尝试切换到其他备选目标 - alternative_goals = await self.goal_analyzer.get_alternative_goals() - if alternative_goals: - # 有备选目标,尝试使用 - self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0] - logger.info(f"切换到备选目标: {self.current_goal}") - # 使用新目标获取知识并生成回复 - knowledge, sources = await self.knowledge_fetcher.fetch( - self.current_goal, [self._convert_to_message(msg) for msg in messages] - ) - if knowledge != "未找到相关知识": - self.knowledge_cache[sources] = knowledge - - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - ) - else: - # 没有备选目标,重新分析 - self.state = ConversationState.RETHINKING - ( - self.current_goal, - self.current_method, - self.goal_reasoning, - ) = await self.goal_analyzer.analyze_goal() - return - else: - # 重新生成回复 - self.generated_reply = await self.reply_generator.generate( - self.current_goal, - self.current_method, - [self._convert_to_message(msg) for msg in messages], - self.knowledge_cache, - self.generated_reply, # 将不合适的回复作为previous_reply传入 - ) - - await self._send_reply() - - elif action == "rethink_goal": - self.state = ConversationState.RETHINKING - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - - elif action == "judge_conversation": - self.state = ConversationState.JUDGING - self.goal_achieved, self.stop_conversation, self.reason = await self.goal_analyzer.analyze_conversation( - self.current_goal, self.goal_reasoning - ) - - # 如果当前目标达成但还有其他目标 - if self.goal_achieved and not self.stop_conversation: - alternative_goals = await self.goal_analyzer.get_alternative_goals() - if alternative_goals: - # 切换到下一个目标 - self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0] - logger.info(f"当前目标已达成,切换到新目标: {self.current_goal}") - return - - if self.stop_conversation: - await self._stop_conversation() - - elif action == "listening": - self.state = ConversationState.LISTENING - logger.info("倾听对方发言...") - if await self.waiter.wait(): # 如果返回True表示超时 - await self._send_timeout_message() - await self._stop_conversation() - - else: # wait - self.state = ConversationState.WAITING - logger.info("等待更多信息...") - if await self.waiter.wait(): # 如果返回True表示超时 - await self._send_timeout_message() - await self._stop_conversation() - - async def _stop_conversation(self): - """完全停止对话""" - logger.info("停止对话") - self.should_continue = False - self.state = ConversationState.ENDED - # 删除实例(这会同时停止chat_observer) - await self.remove_instance(self.stream_id) - - async def _send_timeout_message(self): - """发送超时结束消息""" - try: - messages = self.chat_observer.get_message_history(limit=1) - if not messages: - return - - latest_message = self._convert_to_message(messages[0]) - await self.direct_sender.send_message( - chat_stream=self.chat_stream, - content="抱歉,由于等待时间过长,我需要先去忙别的了。下次再聊吧~", - reply_to_message=latest_message, - ) - except Exception as e: - logger.error(f"发送超时消息失败: {str(e)}") - - async def _send_reply(self): - """发送回复""" - if not self.generated_reply: - logger.warning("没有生成回复") - return - - messages = self.chat_observer.get_message_history(limit=1) - if not messages: - logger.warning("没有最近的消息可以回复") - return - - latest_message = self._convert_to_message(messages[0]) - try: - await self.direct_sender.send_message( - chat_stream=self.chat_stream, content=self.generated_reply, reply_to_message=latest_message - ) - self.chat_observer.trigger_update() # 触发立即更新 - if not await self.chat_observer.wait_for_update(): - logger.warning("等待消息更新超时") - - self.state = ConversationState.ANALYZING - except Exception as e: - logger.error(f"发送消息失败: {str(e)}") - self.state = ConversationState.ANALYZING class DirectMessageSender: diff --git a/src/plugins/PFC/pfc_manager.py b/src/plugins/PFC/pfc_manager.py new file mode 100644 index 000000000..9a36bef19 --- /dev/null +++ b/src/plugins/PFC/pfc_manager.py @@ -0,0 +1,97 @@ +from typing import Dict, Optional +from src.common.logger import get_module_logger +from .conversation import Conversation +import traceback + +logger = get_module_logger("pfc_manager") + +class PFCManager: + """PFC对话管理器,负责管理所有对话实例""" + + # 单例模式 + _instance = None + + # 会话实例管理 + _instances: Dict[str, Conversation] = {} + _initializing: Dict[str, bool] = {} + + @classmethod + def get_instance(cls) -> 'PFCManager': + """获取管理器单例 + + Returns: + PFCManager: 管理器实例 + """ + if cls._instance is None: + cls._instance = PFCManager() + return cls._instance + + async def get_or_create_conversation(self, stream_id: str) -> Optional[Conversation]: + """获取或创建对话实例 + + Args: + stream_id: 聊天流ID + + Returns: + Optional[Conversation]: 对话实例,创建失败则返回None + """ + # 检查是否已经有实例 + if stream_id in self._initializing and self._initializing[stream_id]: + logger.debug(f"会话实例正在初始化中: {stream_id}") + return None + + if stream_id in self._instances: + logger.debug(f"使用现有会话实例: {stream_id}") + return self._instances[stream_id] + + try: + # 创建新实例 + logger.info(f"创建新的对话实例: {stream_id}") + self._initializing[stream_id] = True + # 创建实例 + conversation_instance = Conversation(stream_id) + self._instances[stream_id] = conversation_instance + + # 启动实例初始化 + await self._initialize_conversation(conversation_instance) + except Exception as e: + logger.error(f"创建会话实例失败: {stream_id}, 错误: {e}") + return None + + return conversation_instance + + + async def _initialize_conversation(self, conversation: Conversation): + """初始化会话实例 + + Args: + conversation: 要初始化的会话实例 + """ + stream_id = conversation.stream_id + + try: + logger.info(f"开始初始化会话实例: {stream_id}") + # 启动初始化流程 + await conversation._initialize() + + # 标记初始化完成 + self._initializing[stream_id] = False + + logger.info(f"会话实例 {stream_id} 初始化完成") + + except Exception as e: + logger.error(f"管理器初始化会话实例失败: {stream_id}, 错误: {e}") + logger.error(traceback.format_exc()) + # 清理失败的初始化 + + + async def get_conversation(self, stream_id: str) -> Optional[Conversation]: + """获取已存在的会话实例 + + Args: + stream_id: 聊天流ID + + Returns: + Optional[Conversation]: 会话实例,不存在则返回None + """ + return self._instances.get(stream_id) \ No newline at end of file diff --git a/src/plugins/PFC/pfc_types.py b/src/plugins/PFC/pfc_types.py new file mode 100644 index 000000000..d7ad8e91f --- /dev/null +++ b/src/plugins/PFC/pfc_types.py @@ -0,0 +1,21 @@ +from enum import Enum +from typing import Literal + + +class ConversationState(Enum): + """对话状态""" + INIT = "初始化" + RETHINKING = "重新思考" + ANALYZING = "分析历史" + PLANNING = "规划目标" + GENERATING = "生成回复" + CHECKING = "检查回复" + SENDING = "发送消息" + FETCHING = "获取知识" + WAITING = "等待" + LISTENING = "倾听" + ENDED = "结束" + JUDGING = "判断" + + +ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] \ No newline at end of file diff --git a/src/plugins/PFC/reply_checker.py b/src/plugins/PFC/reply_checker.py index c53feba9b..6889f7ca8 100644 --- a/src/plugins/PFC/reply_checker.py +++ b/src/plugins/PFC/reply_checker.py @@ -33,7 +33,7 @@ class ReplyChecker: Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) """ # 获取最新的消息记录 - messages = self.chat_observer.get_message_history(limit=5) + messages = self.chat_observer.get_cached_messages(limit=5) chat_history_text = "" for msg in messages: time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py new file mode 100644 index 000000000..beec9dd3e --- /dev/null +++ b/src/plugins/PFC/reply_generator.py @@ -0,0 +1,126 @@ +from typing import Tuple +from src.common.logger import get_module_logger +from ..models.utils_model import LLM_request +from ..config.config import global_config +from .chat_observer import ChatObserver +from .reply_checker import ReplyChecker +from src.individuality.individuality import Individuality +from .observation_info import ObservationInfo +from .conversation_info import ConversationInfo + +logger = get_module_logger("reply_generator") + + +class ReplyGenerator: + """回复生成器""" + + def __init__(self, stream_id: str): + self.llm = LLM_request( + model=global_config.llm_normal, + temperature=0.7, + max_tokens=300, + request_type="reply_generation" + ) + self.personality_info = Individuality.get_instance().get_prompt(type = "personality", x_person = 2, level = 2) + self.name = global_config.BOT_NICKNAME + self.chat_observer = ChatObserver.get_instance(stream_id) + self.reply_checker = ReplyChecker(stream_id) + + async def generate( + self, + observation_info: ObservationInfo, + conversation_info: ConversationInfo + ) -> str: + """生成回复 + + Args: + goal: 对话目标 + chat_history: 聊天历史 + knowledge_cache: 知识缓存 + previous_reply: 上一次生成的回复(如果有) + retry_count: 当前重试次数 + + Returns: + str: 生成的回复 + """ + # 构建提示词 + logger.debug(f"开始生成回复:当前目标: {conversation_info.goal_list}") + + goal_list = conversation_info.goal_list + goal_text = "" + for goal, reason in goal_list: + goal_text += f"目标:{goal};" + goal_text += f"原因:{reason}\n" + + # 获取聊天历史记录 + chat_history_list = observation_info.chat_history + chat_history_text = "" + for msg in chat_history_list: + chat_history_text += f"{msg}\n" + + + # 整理知识缓存 + knowledge_text = "" + knowledge_list = conversation_info.knowledge_list + for knowledge in knowledge_list: + knowledge_text += f"知识:{knowledge}\n" + + personality_text = f"你的名字是{self.name},{self.personality_info}" + + prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请根据以下信息生成回复: + +当前对话目标:{goal_text} +{knowledge_text} +最近的聊天记录: +{chat_history_text} + +请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该: +1. 符合对话目标,以"你"的角度发言 +2. 体现你的性格特征 +3. 自然流畅,像正常聊天一样,简短 +4. 适当利用相关知识,但不要生硬引用 + +请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清"你"和对方说的话,不要把"你"说的话当做对方说的话,这是你自己说的话。 +请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 +请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。 +不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。 + +请直接输出回复内容,不需要任何额外格式。""" + + try: + content, _ = await self.llm.generate_response_async(prompt) + logger.info(f"生成的回复: {content}") + # is_new = self.chat_observer.check() + # logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息") + + # 如果有新消息,重新生成回复 + # if is_new: + # logger.info("检测到新消息,重新生成回复") + # return await self.generate( + # goal, chat_history, knowledge_cache, + # None, retry_count + # ) + + return content + + except Exception as e: + logger.error(f"生成回复时出错: {e}") + return "抱歉,我现在有点混乱,让我重新思考一下..." + + async def check_reply( + self, + reply: str, + goal: str, + retry_count: int = 0 + ) -> Tuple[bool, str, bool]: + """检查回复是否合适 + + Args: + reply: 生成的回复 + goal: 对话目标 + retry_count: 当前重试次数 + + Returns: + Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) + """ + return await self.reply_checker.check(reply, goal, retry_count) \ No newline at end of file diff --git a/src/plugins/PFC/waiter.py b/src/plugins/PFC/waiter.py new file mode 100644 index 000000000..0e1bf59f3 --- /dev/null +++ b/src/plugins/PFC/waiter.py @@ -0,0 +1,45 @@ +from src.common.logger import get_module_logger +from .chat_observer import ChatObserver + +logger = get_module_logger("waiter") + +class Waiter: + """等待器,用于等待对话流中的事件""" + + def __init__(self, stream_id: str): + self.stream_id = stream_id + self.chat_observer = ChatObserver.get_instance(stream_id) + + async def wait(self, timeout: float = 20.0) -> bool: + """等待用户回复或超时 + + Args: + timeout: 超时时间(秒) + + Returns: + bool: 如果因为超时返回则为True,否则为False + """ + try: + message_before = self.chat_observer.get_last_message() + + # 等待新消息 + logger.debug(f"等待新消息,超时时间: {timeout}秒") + + is_timeout = await self.chat_observer.wait_for_update(timeout=timeout) + if is_timeout: + logger.debug("等待超时,没有收到新消息") + return True + + # 检查是否是新消息 + message_after = self.chat_observer.get_last_message() + if message_before and message_after and message_before.get("message_id") == message_after.get("message_id"): + # 如果消息ID相同,说明没有新消息 + logger.debug("没有收到新消息") + return True + + logger.debug("收到新消息") + return False + + except Exception as e: + logger.error(f"等待时出错: {str(e)}") + return True \ No newline at end of file diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 58a5b20c3..40a00a3ab 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -1,14 +1,13 @@ from ..moods.moods import MoodManager # 导入情绪管理器 from ..config.config import global_config from .message import MessageRecv -from ..PFC.pfc import Conversation, ConversationState +from ..PFC.pfc_manager import PFCManager from .chat_stream import chat_manager from ..chat_module.only_process.only_message_process import MessageProcessor from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from ..chat_module.think_flow_chat.think_flow_chat import ThinkFlowChat from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat -import asyncio import traceback # 定义日志配置 @@ -31,10 +30,15 @@ class ChatBot: self.think_flow_chat = ThinkFlowChat() self.reasoning_chat = ReasoningChat() self.only_process_chat = MessageProcessor() + + # 创建初始化PFC管理器的任务,会在_ensure_started时执行 + self.pfc_manager = PFCManager.get_instance() async def _ensure_started(self): """确保所有任务已启动""" if not self._started: + logger.info("确保ChatBot所有任务已启动") + self._started = True async def _create_PFC_chat(self, message: MessageRecv): @@ -42,27 +46,11 @@ class ChatBot: chat_id = str(message.chat_stream.stream_id) if global_config.enable_pfc_chatting: - # 获取或创建对话实例 - conversation = await Conversation.get_instance(chat_id) - if conversation is None: - logger.error(f"创建或获取对话实例失败: {chat_id}") - return + + await self.pfc_manager.get_or_create_conversation(chat_id) - # 如果是新创建的实例,启动对话系统 - if conversation.state == ConversationState.INIT: - asyncio.create_task(conversation.start()) - logger.info(f"为聊天 {chat_id} 创建新的对话实例") - elif conversation.state == ConversationState.ENDED: - # 如果实例已经结束,重新创建 - await Conversation.remove_instance(chat_id) - conversation = await Conversation.get_instance(chat_id) - if conversation is None: - logger.error(f"重新创建对话实例失败: {chat_id}") - return - asyncio.create_task(conversation.start()) - logger.info(f"为聊天 {chat_id} 重新创建对话实例") except Exception as e: - logger.error(f"创建PFC聊天流失败: {e}") + logger.error(f"创建PFC聊天失败: {e}") async def message_process(self, message_data: str) -> None: """处理转化后的统一格式消息 @@ -90,6 +78,9 @@ class ChatBot: - 性能计时 """ try: + # 确保所有任务已启动 + await self._ensure_started() + message = MessageRecv(message_data) groupinfo = message.message_info.group_info userinfo = message.message_info.user_info diff --git a/src/plugins/config/config.py b/src/plugins/config/config.py index f751d2478..eccb3bc0b 100644 --- a/src/plugins/config/config.py +++ b/src/plugins/config/config.py @@ -24,10 +24,11 @@ config_config = LogConfig( # 配置主程序日志格式 logger = get_module_logger("config", config=config_config) -# 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 -is_test = False -mai_version_main = "0.6.1" -mai_version_fix = "" +#考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 +is_test = True +mai_version_main = "0.6.2" +mai_version_fix = "snapshot-1" + if mai_version_fix: if is_test: mai_version = f"test-{mai_version_main}-{mai_version_fix}" @@ -454,6 +455,7 @@ class BotConfig: config.emoji_response_penalty = willing_config.get( "emoji_response_penalty", config.emoji_response_penalty ) + if config.INNER_VERSION in SpecifierSet(">=1.2.5"): config.mentioned_bot_inevitable_reply = willing_config.get( "mentioned_bot_inevitable_reply", config.mentioned_bot_inevitable_reply ) diff --git a/src/plugins/utils/statistic.py b/src/plugins/utils/statistic.py index eef10c01d..4b9afff39 100644 --- a/src/plugins/utils/statistic.py +++ b/src/plugins/utils/statistic.py @@ -2,7 +2,7 @@ import threading import time from collections import defaultdict from datetime import datetime, timedelta -from typing import Any, Dict +from typing import Any, Dict, List from src.common.logger import get_module_logger from ...common.database import db @@ -22,6 +22,7 @@ class LLMStatistics: self.stats_thread = None self.console_thread = None self._init_database() + self.name_dict: Dict[List] = {} def _init_database(self): """初始化数据库集合""" @@ -137,16 +138,24 @@ class LLMStatistics: # user_id = str(doc.get("user_info", {}).get("user_id", "unknown")) chat_info = doc.get("chat_info", {}) user_info = doc.get("user_info", {}) + message_time = doc.get("time", 0) group_info = chat_info.get("group_info") if chat_info else {} # print(f"group_info: {group_info}") group_name = None if group_info: + group_id = f"g{group_info.get('group_id')}" group_name = group_info.get("group_name", f"群{group_info.get('group_id')}") if user_info and not group_name: + group_id = f"u{user_info['user_id']}" group_name = user_info["user_nickname"] + if self.name_dict.get(group_id): + if message_time > self.name_dict.get(group_id)[1]: + self.name_dict[group_id] = [group_name, message_time] + else: + self.name_dict[group_id] = [group_name, message_time] # print(f"group_name: {group_name}") stats["messages_by_user"][user_id] += 1 - stats["messages_by_chat"][group_name] += 1 + stats["messages_by_chat"][group_id] += 1 return stats @@ -187,7 +196,7 @@ class LLMStatistics: tokens = stats["tokens_by_model"][model_name] cost = stats["costs_by_model"][model_name] output.append( - data_fmt.format(model_name[:32] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) + data_fmt.format(model_name[:30] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) ) output.append("") @@ -221,8 +230,8 @@ class LLMStatistics: # 添加聊天统计 output.append("群组统计:") output.append(("群组名称 消息数量")) - for group_name, count in sorted(stats["messages_by_chat"].items()): - output.append(f"{group_name[:32]:<32} {count:>10}") + for group_id, count in sorted(stats["messages_by_chat"].items()): + output.append(f"{self.name_dict[group_id][0][:32]:<32} {count:>10}") return "\n".join(output) @@ -250,7 +259,7 @@ class LLMStatistics: tokens = stats["tokens_by_model"][model_name] cost = stats["costs_by_model"][model_name] output.append( - data_fmt.format(model_name[:32] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) + data_fmt.format(model_name[:30] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) ) output.append("") @@ -284,8 +293,8 @@ class LLMStatistics: # 添加聊天统计 output.append("群组统计:") output.append(("群组名称 消息数量")) - for group_name, count in sorted(stats["messages_by_chat"].items()): - output.append(f"{group_name[:32]:<32} {count:>10}") + for group_id, count in sorted(stats["messages_by_chat"].items()): + output.append(f"{self.name_dict[group_id][0][:32]:<32} {count:>10}") return "\n".join(output) diff --git a/src/plugins/willing/mode_dynamic.py b/src/plugins/willing/mode_dynamic.py index ce188c56c..3d2ca6e77 100644 --- a/src/plugins/willing/mode_dynamic.py +++ b/src/plugins/willing/mode_dynamic.py @@ -158,12 +158,12 @@ class WillingManager: logger.debug(f"被提及, 当前意愿: {current_willing}") if is_emoji: - current_willing *= 0.1 + current_willing = global_config.emoji_response_penalty * 0.1 logger.debug(f"表情包, 当前意愿: {current_willing}") # 根据话题兴趣度适当调整 if interested_rate > 0.5: - current_willing += (interested_rate - 0.5) * 0.5 + current_willing += (interested_rate - 0.5) * 0.5 * global_config.response_interested_rate_amplifier # 根据当前模式计算回复概率 base_probability = 0.0 @@ -180,7 +180,7 @@ class WillingManager: base_probability = 0.30 if msg_count >= 15 else 0.03 * min(msg_count, 10) # 考虑回复意愿的影响 - reply_probability = base_probability * current_willing + reply_probability = base_probability * current_willing * global_config.response_willing_amplifier # 检查群组权限(如果是群聊) if chat_stream.group_info and config: diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index d7a5cdaea..70cf0e0b7 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "1.2.4" +version = "1.2.5" #以下是给开发人员阅读的,一般用户不需要阅读