diff --git a/changelogs/changelog_dev.md b/changelogs/changelog_dev.md index acfb7e03f..3a5f9740f 100644 --- a/changelogs/changelog_dev.md +++ b/changelogs/changelog_dev.md @@ -1,4 +1,9 @@ 这里放置了测试版本的细节更新 + +## [test-0.6.1-snapshot-1] - 2025-4-5 +- 修复pfc回复出错bug +- 修复表情包打字时间,不会卡表情包 + ## [test-0.6.0-snapshot-9] - 2025-4-4 - 可以识别gif表情包 diff --git a/flake.nix b/flake.nix index 404f7555c..23b82bb77 100644 --- a/flake.nix +++ b/flake.nix @@ -18,10 +18,11 @@ devShells.default = pkgs.mkShell { name = "python-venv"; venvDir = "./.venv"; - buildInputs = [ - pythonPackages.python - pythonPackages.venvShellHook - pythonPackages.numpy + buildInputs = with pythonPackages; [ + python + venvShellHook + scipy + numpy ]; postVenvCreation = '' @@ -35,4 +36,4 @@ ''; }; }); -} \ No newline at end of file +} diff --git a/scripts/run.sh b/scripts/run.sh index c1fe4973f..342a23feb 100644 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -4,7 +4,7 @@ # 适用于Arch/Ubuntu 24.10/Debian 12/CentOS 9 # 请小心使用任何一键脚本! -INSTALLER_VERSION="0.0.1-refactor" +INSTALLER_VERSION="0.0.2-refactor" LANG=C.UTF-8 # 如无法访问GitHub请修改此处镜像地址 @@ -62,7 +62,7 @@ show_menu() { "4" "启动Nonebot adapter" \ "5" "停止Nonebot adapter" \ "6" "重启Nonebot adapter" \ - "7" "更新MaiCore及其依赖" \ + "7" "拉取最新MaiCore仓库" \ "8" "切换分支" \ "9" "退出" 3>&1 1>&2 2>&3) @@ -111,6 +111,8 @@ show_menu() { # 更新依赖 update_dependencies() { + whiptail --title "⚠" --msgbox "更新后请阅读教程" 10 60 + systemctl stop ${SERVICE_NAME} cd "${INSTALL_DIR}/MaiBot" || { whiptail --msgbox "🚫 无法进入安装目录!" 10 60 return 1 @@ -126,8 +128,7 @@ update_dependencies() { return 1 fi deactivate - systemctl restart ${SERVICE_NAME} - whiptail --msgbox "✅ 依赖已更新并重启服务!" 10 60 + whiptail --msgbox "✅ 已停止服务并拉取最新仓库提交" 10 60 } # 切换分支 @@ -157,7 +158,7 @@ switch_branch() { whiptail --msgbox "🚫 代码拉取失败!" 10 60 return 1 fi - + systemctl stop ${SERVICE_NAME} source "${INSTALL_DIR}/venv/bin/activate" pip install -r requirements.txt deactivate @@ -165,8 +166,7 @@ switch_branch() { sed -i "s/^BRANCH=.*/BRANCH=${new_branch}/" /etc/maicore_install.conf BRANCH="${new_branch}" check_eula - systemctl restart ${SERVICE_NAME} - whiptail --msgbox "✅ 已切换到分支 ${new_branch} 并重启服务!" 10 60 + whiptail --msgbox "✅ 已停止服务并切换到分支 ${new_branch} !" 10 60 } check_eula() { @@ -228,6 +228,8 @@ run_installation() { fi fi + whiptail --title "ℹ️ 提示" --msgbox "如果您没有特殊需求,请优先使用docker方式部署。" 10 60 + # 协议确认 if ! (whiptail --title "ℹ️ [1/6] 使用协议" --yes-button "我同意" --no-button "我拒绝" --yesno "使用MaiCore及此脚本前请先阅读EULA协议及隐私协议\nhttps://github.com/MaiM-with-u/MaiBot/blob/refactor/EULA.md\nhttps://github.com/MaiM-with-u/MaiBot/blob/refactor/PRIVACY.md\n\n您是否同意上述协议?" 12 70); then exit 1 @@ -370,12 +372,13 @@ run_installation() { # 选择分支 choose_branch() { BRANCH=$(whiptail --title "🔀 选择分支" --radiolist "请选择要安装的分支:" 15 60 4 \ - "main" "稳定最新版(推荐)" ON \ - "classical" "经典版" OFF \ + "main" "稳定版本(推荐)" ON \ + "dev" "开发版(不知道什么意思就别选)" OFF \ + "classical" "经典版(0.6.0以前的版本)" OFF \ "custom" "自定义分支" OFF 3>&1 1>&2 2>&3) RETVAL=$? if [ $RETVAL -ne 0 ]; then - whiptail --msgbox "操作取消!" 10 60 + whiptail --msgbox "🚫 操作取消!" 10 60 exit 1 fi @@ -383,7 +386,7 @@ run_installation() { BRANCH=$(whiptail --title "🔀 自定义分支" --inputbox "请输入自定义分支名称:" 10 60 "refactor" 3>&1 1>&2 2>&3) RETVAL=$? if [ $RETVAL -ne 0 ]; then - whiptail --msgbox "输入取消!" 10 60 + whiptail --msgbox "🚫 输入取消!" 10 60 exit 1 fi if [[ -z "$BRANCH" ]]; then diff --git a/src/common/database.py b/src/common/database.py index a3e5b4e3b..ee0ead0bd 100644 --- a/src/common/database.py +++ b/src/common/database.py @@ -15,9 +15,16 @@ def __create_database_instance(): password = os.getenv("MONGODB_PASSWORD") auth_source = os.getenv("MONGODB_AUTH_SOURCE") - if uri and uri.startswith("mongodb://"): - # 优先使用URI连接 - return MongoClient(uri) + if uri: + # 支持标准mongodb://和mongodb+srv://连接字符串 + if uri.startswith(("mongodb://", "mongodb+srv://")): + return MongoClient(uri) + else: + raise ValueError( + "Invalid MongoDB URI format. URI must start with 'mongodb://' or 'mongodb+srv://'. " + "For MongoDB Atlas, use 'mongodb+srv://' format. " + "See: https://www.mongodb.com/docs/manual/reference/connection-string/" + ) if username and password: # 如果有用户名和密码,使用认证连接 diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index 4fa6951e2..532afc9db 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -1,6 +1,6 @@ import time import asyncio -from typing import Optional, Dict, Any, List +from typing import Optional, Dict, Any, List, Tuple from src.common.logger import get_module_logger from src.common.database import db from ..message.message_base import UserInfo @@ -57,6 +57,35 @@ class ChatObserver: self._update_event = asyncio.Event() # 触发更新的事件 self._update_complete = asyncio.Event() # 更新完成的事件 + def check(self) -> bool: + """检查距离上一次观察之后是否有了新消息 + + Returns: + bool: 是否有新消息 + """ + logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}") + + query = { + "chat_id": self.stream_id, + "time": {"$gt": self.last_check_time} + } + + # 只需要查询是否存在,不需要获取具体消息 + new_message_exists = db.messages.find_one(query) is not None + + if new_message_exists: + logger.debug("发现新消息") + self.last_check_time = time.time() + + return new_message_exists + + def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象""" + messages = self.get_message_history(self.last_check_time) + for message in messages: + self._add_message_to_history(message) + return messages, self.message_history + def new_message_after(self, time_point: float) -> bool: """判断是否在指定时间点后有新消息 @@ -66,6 +95,7 @@ class ChatObserver: Returns: bool: 是否有新消息 """ + logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point}") return self.last_message_time is None or self.last_message_time > time_point def _add_message_to_history(self, message: Dict[str, Any]): diff --git a/src/plugins/PFC/pfc.py b/src/plugins/PFC/pfc.py index 667a6f035..e02409ce8 100644 --- a/src/plugins/PFC/pfc.py +++ b/src/plugins/PFC/pfc.py @@ -17,6 +17,7 @@ from ..storage.storage import MessageStorage from .chat_observer import ChatObserver from .pfc_KnowledgeFetcher import KnowledgeFetcher from .reply_checker import ReplyChecker +from .pfc_utils import get_items_from_json import json import time @@ -128,43 +129,18 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到 content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理内容,尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError: - logger.error("提取的JSON内容解析失败,返回默认行动") - return "direct_reply", "JSON解析失败,选择直接回复" - else: - # 如果找不到JSON,尝试从文本中提取行动和原因 - if "direct_reply" in content.lower(): - return "direct_reply", "从文本中提取的行动" - elif "fetch_knowledge" in content.lower(): - return "fetch_knowledge", "从文本中提取的行动" - elif "wait" in content.lower(): - return "wait", "从文本中提取的行动" - elif "listening" in content.lower(): - return "listening", "从文本中提取的行动" - elif "rethink_goal" in content.lower(): - return "rethink_goal", "从文本中提取的行动" - elif "judge_conversation" in content.lower(): - return "judge_conversation", "从文本中提取的行动" - else: - logger.error("无法从返回内容中提取行动类型") - return "direct_reply", "无法解析响应,选择直接回复" + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "action", "reason", + default_values={"action": "direct_reply", "reason": "默认原因"} + ) - # 验证JSON字段 - action = result.get("action", "direct_reply") - reason = result.get("reason", "默认原因") + if not success: + return "direct_reply", "JSON解析失败,选择直接回复" + + action = result["action"] + reason = result["reason"] # 验证action类型 if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "judge_conversation"]: @@ -195,6 +171,8 @@ class GoalAnalyzer: self.name = global_config.BOT_NICKNAME self.nick_name = global_config.BOT_ALIAS_NAMES self.chat_observer = ChatObserver.get_instance(stream_id) + + self.current_goal_and_reason = None async def analyze_goal(self) -> Tuple[str, str, str]: """分析对话历史并设定目标 @@ -239,48 +217,20 @@ class GoalAnalyzer: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理和验证返回内容 - if not content or not isinstance(content, str): - logger.error("LLM返回内容为空或格式不正确") - continue - - # 尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError: - logger.error(f"提取的JSON内容解析失败,重试第{retry + 1}次") - continue - else: - logger.error(f"无法在返回内容中找到有效的JSON,重试第{retry + 1}次") - continue + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "goal", "reasoning", + required_types={"goal": str, "reasoning": str} + ) - # 验证JSON字段 - if not all(key in result for key in ["goal", "reasoning"]): - logger.error(f"JSON缺少必要字段,实际内容: {result},重试第{retry + 1}次") + if not success: + logger.error(f"无法解析JSON,重试第{retry + 1}次") continue goal = result["goal"] reasoning = result["reasoning"] - # 验证字段内容 - if not isinstance(goal, str) or not isinstance(reasoning, str): - logger.error(f"JSON字段类型错误,goal和reasoning必须是字符串,重试第{retry + 1}次") - continue - - if not goal.strip() or not reasoning.strip(): - logger.error(f"JSON字段内容为空,重试第{retry + 1}次") - continue - # 使用默认的方法 method = "以友好的态度回应" return goal, method, reasoning @@ -330,58 +280,21 @@ class GoalAnalyzer: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理和验证返回内容 - if not content or not isinstance(content, str): - logger.error("LLM返回内容为空或格式不正确") - return False, False, "确保对话顺利进行" - - # 尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError as e: - logger.error(f"提取的JSON内容解析失败: {e}") - return False, False, "确保对话顺利进行" - else: - logger.error("无法在返回内容中找到有效的JSON") - return False, False, "确保对话顺利进行" + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "goal_achieved", "stop_conversation", "reason", + required_types={ + "goal_achieved": bool, + "stop_conversation": bool, + "reason": str + } + ) - # 验证JSON字段 - if not all(key in result for key in ["goal_achieved", "stop_conversation", "reason"]): - logger.error(f"JSON缺少必要字段,实际内容: {result}") - return False, False, "确保对话顺利进行" - - goal_achieved = result["goal_achieved"] - stop_conversation = result["stop_conversation"] - reason = result["reason"] - - # 验证字段类型 - if not isinstance(goal_achieved, bool): - logger.error("goal_achieved 必须是布尔值") - return False, False, "确保对话顺利进行" - - if not isinstance(stop_conversation, bool): - logger.error("stop_conversation 必须是布尔值") - return False, False, "确保对话顺利进行" - - if not isinstance(reason, str): - logger.error("reason 必须是字符串") - return False, False, "确保对话顺利进行" - - if not reason.strip(): - logger.error("reason 不能为空") + if not success: return False, False, "确保对话顺利进行" - return goal_achieved, stop_conversation, reason + return result["goal_achieved"], result["stop_conversation"], result["reason"] except Exception as e: logger.error(f"分析对话目标时出错: {str(e)}") @@ -435,19 +348,18 @@ class ReplyGenerator: knowledge_cache: Dict[str, str], previous_reply: Optional[str] = None, retry_count: int = 0 - ) -> Tuple[str, bool]: + ) -> str: """生成回复 Args: goal: 对话目标 - method: 实现方式 chat_history: 聊天历史 knowledge_cache: 知识缓存 previous_reply: 上一次生成的回复(如果有) retry_count: 当前重试次数 Returns: - Tuple[str, bool]: (生成的回复, 是否需要重新规划) + str: 生成的回复 """ # 构建提示词 logger.debug(f"开始生成回复:当前目标: {goal}") @@ -508,53 +420,105 @@ class ReplyGenerator: try: content, _ = await self.llm.generate_response_async(prompt) logger.info(f"生成的回复: {content}") + is_new = self.chat_observer.check() + logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息") - # 检查生成的回复是否合适 - is_suitable, reason, need_replan = await self.reply_checker.check( - content, goal, retry_count - ) - - if not is_suitable: - logger.warning(f"生成的回复不合适,原因: {reason}") - if need_replan: - logger.info("需要重新规划对话目标") - return "让我重新思考一下...", True - else: - # 递归调用,将当前回复作为previous_reply传入 - return await self.generate( - goal, chat_history, knowledge_cache, - content, retry_count + 1 - ) + # 如果有新消息,重新生成回复 + if is_new: + logger.info("检测到新消息,重新生成回复") + return await self.generate( + goal, chat_history, knowledge_cache, + None, retry_count + ) - return content, False + return content except Exception as e: logger.error(f"生成回复时出错: {e}") - return "抱歉,我现在有点混乱,让我重新思考一下...", True + return "抱歉,我现在有点混乱,让我重新思考一下..." + + async def check_reply( + self, + reply: str, + goal: str, + retry_count: int = 0 + ) -> Tuple[bool, str, bool]: + """检查回复是否合适 + + Args: + reply: 生成的回复 + goal: 对话目标 + retry_count: 当前重试次数 + + Returns: + Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) + """ + return await self.reply_checker.check(reply, goal, retry_count) class Conversation: # 类级别的实例管理 _instances: Dict[str, 'Conversation'] = {} + _instance_lock = asyncio.Lock() # 类级别的全局锁 + _init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件 + _initializing: Dict[str, bool] = {} # 标记是否正在初始化 @classmethod - def get_instance(cls, stream_id: str) -> 'Conversation': - """获取或创建对话实例""" - if stream_id not in cls._instances: - cls._instances[stream_id] = cls(stream_id) - logger.info(f"创建新的对话实例: {stream_id}") - return cls._instances[stream_id] + async def get_instance(cls, stream_id: str) -> Optional['Conversation']: + """获取或创建对话实例 + + Args: + stream_id: 聊天流ID + + Returns: + Optional[Conversation]: 对话实例,如果创建或等待失败则返回None + """ + try: + # 使用全局锁来确保线程安全 + async with cls._instance_lock: + # 如果已经在初始化中,等待初始化完成 + if stream_id in cls._initializing and cls._initializing[stream_id]: + # 释放锁等待初始化 + cls._instance_lock.release() + try: + await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0) + except asyncio.TimeoutError: + logger.error(f"等待实例 {stream_id} 初始化超时") + return None + finally: + await cls._instance_lock.acquire() + + # 如果实例不存在,创建新实例 + if stream_id not in cls._instances: + cls._instances[stream_id] = cls(stream_id) + cls._init_events[stream_id] = asyncio.Event() + cls._initializing[stream_id] = True + logger.info(f"创建新的对话实例: {stream_id}") + + return cls._instances[stream_id] + except Exception as e: + logger.error(f"获取对话实例失败: {e}") + return None @classmethod - def remove_instance(cls, stream_id: str): - """删除对话实例""" - if stream_id in cls._instances: - # 停止相关组件 - instance = cls._instances[stream_id] - instance.chat_observer.stop() - # 删除实例 - del cls._instances[stream_id] - logger.info(f"已删除对话实例 {stream_id}") + async def remove_instance(cls, stream_id: str): + """删除对话实例 + + Args: + stream_id: 聊天流ID + """ + async with cls._instance_lock: + if stream_id in cls._instances: + # 停止相关组件 + instance = cls._instances[stream_id] + instance.chat_observer.stop() + # 删除实例 + del cls._instances[stream_id] + if stream_id in cls._init_events: + del cls._init_events[stream_id] + if stream_id in cls._initializing: + del cls._initializing[stream_id] + logger.info(f"已删除对话实例 {stream_id}") def __init__(self, stream_id: str): """初始化对话系统""" @@ -592,13 +556,21 @@ class Conversation: async def start(self): """开始对话流程""" - logger.info("对话系统启动") - self.should_continue = True - self.chat_observer.start() # 启动观察器 - await asyncio.sleep(1) - # 启动对话循环 - await self._conversation_loop() - + try: + logger.info("对话系统启动") + self.should_continue = True + self.chat_observer.start() # 启动观察器 + await asyncio.sleep(1) + # 启动对话循环 + await self._conversation_loop() + except Exception as e: + logger.error(f"启动对话系统失败: {e}") + raise + finally: + # 标记初始化完成 + self._init_events[self.stream_id].set() + self._initializing[self.stream_id] = False + async def _conversation_loop(self): """对话循环""" # 获取最近的消息历史 @@ -658,17 +630,53 @@ class Conversation: if action == "direct_reply": self.state = ConversationState.GENERATING messages = self.chat_observer.get_message_history(limit=30) - self.generated_reply, need_replan = await self.reply_generator.generate( + self.generated_reply = await self.reply_generator.generate( self.current_goal, self.current_method, [self._convert_to_message(msg) for msg in messages], self.knowledge_cache ) - if need_replan: - self.state = ConversationState.RETHINKING - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - else: - await self._send_reply() + + # 检查回复是否合适 + is_suitable, reason, need_replan = await self.reply_generator.check_reply( + self.generated_reply, + self.current_goal + ) + + if not is_suitable: + logger.warning(f"生成的回复不合适,原因: {reason}") + if need_replan: + self.state = ConversationState.RETHINKING + self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() + return + else: + # 重新生成回复 + self.generated_reply = await self.reply_generator.generate( + self.current_goal, + self.current_method, + [self._convert_to_message(msg) for msg in messages], + self.knowledge_cache, + self.generated_reply # 将不合适的回复作为previous_reply传入 + ) + + while self.chat_observer.check(): + if not is_suitable: + logger.warning(f"生成的回复不合适,原因: {reason}") + if need_replan: + self.state = ConversationState.RETHINKING + self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() + return + else: + # 重新生成回复 + self.generated_reply = await self.reply_generator.generate( + self.current_goal, + self.current_method, + [self._convert_to_message(msg) for msg in messages], + self.knowledge_cache, + self.generated_reply # 将不合适的回复作为previous_reply传入 + ) + + await self._send_reply() elif action == "fetch_knowledge": self.state = ConversationState.GENERATING @@ -682,17 +690,36 @@ class Conversation: if knowledge != "未找到相关知识": self.knowledge_cache[sources] = knowledge - self.generated_reply, need_replan = await self.reply_generator.generate( + self.generated_reply = await self.reply_generator.generate( self.current_goal, self.current_method, [self._convert_to_message(msg) for msg in messages], self.knowledge_cache ) - if need_replan: - self.state = ConversationState.RETHINKING - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - else: - await self._send_reply() + + # 检查回复是否合适 + is_suitable, reason, need_replan = await self.reply_generator.check_reply( + self.generated_reply, + self.current_goal + ) + + if not is_suitable: + logger.warning(f"生成的回复不合适,原因: {reason}") + if need_replan: + self.state = ConversationState.RETHINKING + self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() + return + else: + # 重新生成回复 + self.generated_reply = await self.reply_generator.generate( + self.current_goal, + self.current_method, + [self._convert_to_message(msg) for msg in messages], + self.knowledge_cache, + self.generated_reply # 将不合适的回复作为previous_reply传入 + ) + + await self._send_reply() elif action == "rethink_goal": self.state = ConversationState.RETHINKING @@ -724,7 +751,7 @@ class Conversation: self.should_continue = False self.state = ConversationState.ENDED # 删除实例(这会同时停止chat_observer) - self.remove_instance(self.stream_id) + await self.remove_instance(self.stream_id) async def _send_timeout_message(self): """发送超时结束消息""" @@ -821,7 +848,7 @@ class DirectMessageSender: if not end_point: raise ValueError(f"未找到平台:{chat_stream.platform} 的url配置") - await global_api.send_message(end_point, message_json) + await global_api.send_message_REST(end_point, message_json) # 存储消息 await self.storage.store_message(message, message.chat_stream) diff --git a/src/plugins/PFC/pfc_utils.py b/src/plugins/PFC/pfc_utils.py new file mode 100644 index 000000000..2b94e6c4d --- /dev/null +++ b/src/plugins/PFC/pfc_utils.py @@ -0,0 +1,72 @@ +import json +import re +from typing import Dict, Any, Optional, List, Tuple, Union +from src.common.logger import get_module_logger + +logger = get_module_logger("pfc_utils") + +def get_items_from_json( + content: str, + *items: str, + default_values: Optional[Dict[str, Any]] = None, + required_types: Optional[Dict[str, type]] = None +) -> Tuple[bool, Dict[str, Any]]: + """从文本中提取JSON内容并获取指定字段 + + Args: + content: 包含JSON的文本 + *items: 要提取的字段名 + default_values: 字段的默认值,格式为 {字段名: 默认值} + required_types: 字段的必需类型,格式为 {字段名: 类型} + + Returns: + Tuple[bool, Dict[str, Any]]: (是否成功, 提取的字段字典) + """ + content = content.strip() + result = {} + + # 设置默认值 + if default_values: + result.update(default_values) + + # 尝试解析JSON + try: + json_data = json.loads(content) + except json.JSONDecodeError: + # 如果直接解析失败,尝试查找和提取JSON部分 + json_pattern = r'\{[^{}]*\}' + json_match = re.search(json_pattern, content) + if json_match: + try: + json_data = json.loads(json_match.group()) + except json.JSONDecodeError: + logger.error("提取的JSON内容解析失败") + return False, result + else: + logger.error("无法在返回内容中找到有效的JSON") + return False, result + + # 提取字段 + for item in items: + if item in json_data: + result[item] = json_data[item] + + # 验证必需字段 + if not all(item in result for item in items): + logger.error(f"JSON缺少必要字段,实际内容: {json_data}") + return False, result + + # 验证字段类型 + if required_types: + for field, expected_type in required_types.items(): + if field in result and not isinstance(result[field], expected_type): + logger.error(f"{field} 必须是 {expected_type.__name__} 类型") + return False, result + + # 验证字符串字段不为空 + for field in items: + if isinstance(result[field], str) and not result[field].strip(): + logger.error(f"{field} 不能为空") + return False, result + + return True, result \ No newline at end of file diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 68afd2e76..32308bfa9 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -42,11 +42,24 @@ class ChatBot: if global_config.enable_pfc_chatting: # 获取或创建对话实例 - conversation = Conversation.get_instance(chat_id) + conversation = await Conversation.get_instance(chat_id) + if conversation is None: + logger.error(f"创建或获取对话实例失败: {chat_id}") + return + # 如果是新创建的实例,启动对话系统 if conversation.state == ConversationState.INIT: asyncio.create_task(conversation.start()) logger.info(f"为聊天 {chat_id} 创建新的对话实例") + elif conversation.state == ConversationState.ENDED: + # 如果实例已经结束,重新创建 + await Conversation.remove_instance(chat_id) + conversation = await Conversation.get_instance(chat_id) + if conversation is None: + logger.error(f"重新创建对话实例失败: {chat_id}") + return + asyncio.create_task(conversation.start()) + logger.info(f"为聊天 {chat_id} 重新创建对话实例") except Exception as e: logger.error(f"创建PFC聊天流失败: {e}") @@ -78,8 +91,13 @@ class ChatBot: try: message = MessageRecv(message_data) groupinfo = message.message_info.group_info - logger.debug(f"处理消息:{str(message_data)[:50]}...") + userinfo = message.message_info.user_info + logger.debug(f"处理消息:{str(message_data)[:80]}...") + if userinfo.user_id in global_config.ban_user_id: + logger.debug(f"用户{userinfo.user_id}被禁止回复") + return + if global_config.enable_pfc_chatting: try: if groupinfo is None and global_config.enable_friend_chat: @@ -96,11 +114,11 @@ class ChatBot: await self._create_PFC_chat(message) else: if groupinfo.group_id in global_config.talk_allowed_groups: - logger.debug(f"开始群聊模式{message_data}") + logger.debug(f"开始群聊模式{str(message_data)[:50]}...") if global_config.response_mode == "heart_flow": await self.think_flow_chat.process_message(message_data) elif global_config.response_mode == "reasoning": - logger.debug(f"开始推理模式{message_data}") + logger.debug(f"开始推理模式{str(message_data)[:50]}...") await self.reasoning_chat.process_message(message_data) else: logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}") diff --git a/src/plugins/chat/chat_stream.py b/src/plugins/chat/chat_stream.py index 8cddb9376..694e685fa 100644 --- a/src/plugins/chat/chat_stream.py +++ b/src/plugins/chat/chat_stream.py @@ -28,7 +28,7 @@ class ChatStream: self.platform = platform self.user_info = user_info self.group_info = group_info - self.create_time = data.get("create_time", int(time.time())) if data else int(time.time()) + self.create_time = data.get("create_time", time.time()) if data else time.time() self.last_active_time = data.get("last_active_time", self.create_time) if data else self.create_time self.saved = False @@ -60,7 +60,7 @@ class ChatStream: def update_active_time(self): """更新最后活跃时间""" - self.last_active_time = int(time.time()) + self.last_active_time = time.time() self.saved = False diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py index 22487831f..f3369d7bb 100644 --- a/src/plugins/chat/message.py +++ b/src/plugins/chat/message.py @@ -168,7 +168,7 @@ class MessageProcessBase(Message): # 调用父类初始化 super().__init__( message_id=message_id, - time=int(time.time()), + time=round(time.time(), 3), # 保留3位小数 chat_stream=chat_stream, user_info=bot_user_info, message_segment=message_segment, diff --git a/src/plugins/chat/message_sender.py b/src/plugins/chat/message_sender.py index 5b4adc8d1..566fe295e 100644 --- a/src/plugins/chat/message_sender.py +++ b/src/plugins/chat/message_sender.py @@ -43,6 +43,12 @@ class Message_Sender: # 按thinking_start_time排序,时间早的在前面 return recalled_messages + async def send_via_ws(self, message: MessageSending) -> None: + try: + await global_api.send_message(message) + except Exception as e: + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + async def send_message( self, message: MessageSending, @@ -58,8 +64,14 @@ class Message_Sender: logger.warning(f"消息“{message.processed_plain_text}”已被撤回,不发送") break if not is_recalled: - typing_time = calculate_typing_time(message.processed_plain_text) + # print(message.processed_plain_text + str(message.is_emoji)) + typing_time = calculate_typing_time( + input_string=message.processed_plain_text, + thinking_start_time=message.thinking_start_time, + is_emoji=message.is_emoji) + logger.debug(f"{message.processed_plain_text},{typing_time},计算输入时间结束") await asyncio.sleep(typing_time) + logger.debug(f"{message.processed_plain_text},{typing_time},等待输入时间结束") message_json = message.to_dict() @@ -69,14 +81,14 @@ class Message_Sender: if end_point: # logger.info(f"发送消息到{end_point}") # logger.info(message_json) - await global_api.send_message_REST(end_point, message_json) - else: try: - await global_api.send_message(message) + await global_api.send_message_REST(end_point, message_json) except Exception as e: - raise ValueError( - f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件" - ) from e + logger.error(f"REST方式发送失败,出现错误: {str(e)}") + logger.info("尝试使用ws发送") + await self.send_via_ws(message) + else: + await self.send_via_ws(message) logger.success(f"发送消息“{message_preview}”成功") except Exception as e: logger.error(f"发送消息“{message_preview}”失败: {str(e)}") @@ -214,6 +226,8 @@ class MessageManager: await message_earliest.process() + # print(f"message_earliest.thinking_start_tim22222e:{message_earliest.thinking_start_time}") + await message_sender.send_message(message_earliest) await self.storage.store_message(message_earliest, message_earliest.chat_stream) diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index 9646fe73b..26bd3a171 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -334,26 +334,19 @@ def process_llm_response(text: str) -> List[str]: return sentences -def calculate_typing_time(input_string: str, chinese_time: float = 0.2, english_time: float = 0.1) -> float: +def calculate_typing_time(input_string: str, thinking_start_time: float, chinese_time: float = 0.2, english_time: float = 0.1, is_emoji: bool = False) -> float: """ 计算输入字符串所需的时间,中文和英文字符有不同的输入时间 input_string (str): 输入的字符串 chinese_time (float): 中文字符的输入时间,默认为0.2秒 english_time (float): 英文字符的输入时间,默认为0.1秒 + is_emoji (bool): 是否为emoji,默认为False 特殊情况: - 如果只有一个中文字符,将使用3倍的中文输入时间 - 在所有输入结束后,额外加上回车时间0.3秒 + - 如果is_emoji为True,将使用固定1秒的输入时间 """ - - # 如果输入是列表,将其连接成字符串 - if isinstance(input_string, list): - input_string = ''.join(input_string) - - # 确保现在是字符串类型 - if not isinstance(input_string, str): - input_string = str(input_string) - mood_manager = MoodManager.get_instance() # 将0-1的唤醒度映射到-1到1 mood_arousal = mood_manager.current_mood.arousal @@ -376,7 +369,19 @@ def calculate_typing_time(input_string: str, chinese_time: float = 0.2, english_ else: # 其他字符(如英文) total_time += english_time - return total_time + 0.3 # 加上回车时间 + + if is_emoji: + total_time = 1 + + if time.time() - thinking_start_time > 10: + total_time = 1 + + # print(f"thinking_start_time:{thinking_start_time}") + # print(f"nowtime:{time.time()}") + # print(f"nowtime - thinking_start_time:{time.time() - thinking_start_time}") + # print(f"{total_time}") + + return total_time # 加上回车时间 def cosine_similarity(v1, v2): diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py index 87fc14045..4c0f035ea 100644 --- a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py +++ b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py @@ -145,12 +145,13 @@ class PromptBuilder: logger.info("开始构建prompt") prompt = f""" +{relation_prompt_all} {memory_prompt} {prompt_info} {schedule_prompt} {chat_target} {chat_talking_prompt} -现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。{relation_prompt_all}\n +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)},{prompt_personality}。 你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些, 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py index 725fd3f72..5fc752c4c 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py @@ -95,6 +95,8 @@ class ThinkFlowChat: ) if not mark_head: mark_head = True + + # print(f"thinking_start_time:{bot_message.thinking_start_time}") message_set.add_message(bot_message) message_manager.add_message(message_set) diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py b/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py index 3cd6096e7..d79878258 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py @@ -123,7 +123,7 @@ class PromptBuilder: {chat_talking_prompt} 你刚刚脑子里在想: {current_mind_info} -现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。{relation_prompt_all}\n +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)},{prompt_personality}。 你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些, 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} diff --git a/src/plugins/config/config.py b/src/plugins/config/config.py index c1bb35dbc..680c2bafb 100644 --- a/src/plugins/config/config.py +++ b/src/plugins/config/config.py @@ -25,8 +25,8 @@ config_config = LogConfig( logger = get_module_logger("config", config=config_config) #考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 -mai_version_main = "0.6.0" -mai_version_fix = "" +mai_version_main = "0.6.1" +mai_version_fix = "snapshot-1" if mai_version_fix: mai_version = f"{mai_version_main}-{mai_version_fix}" else: diff --git a/src/plugins/message/api.py b/src/plugins/message/api.py index a29ce429e..2a6a2b6fc 100644 --- a/src/plugins/message/api.py +++ b/src/plugins/message/api.py @@ -29,7 +29,10 @@ class BaseMessageHandler: try: tasks.append(handler(message)) except Exception as e: - raise RuntimeError(str(e)) from e + logger.error(f"消息处理出错: {str(e)}") + logger.error(traceback.format_exc()) + # 不抛出异常,而是记录错误并继续处理其他消息 + continue if tasks: await asyncio.gather(*tasks, return_exceptions=True) @@ -212,9 +215,8 @@ class MessageServer(BaseMessageHandler): try: async with session.post(url, json=data, headers={"Content-Type": "application/json"}) as response: return await response.json() - except Exception: - # logger.error(f"发送消息失败: {str(e)}") - pass + except Exception as e: + raise e class BaseMessageAPI: diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 7df6a6e8e..1906b00e5 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -162,7 +162,7 @@ pfc_chatting = false # 是否启用PFC聊天 # stream = : 用于指定模型是否是使用流式输出 # 如果不指定,则该项是 False -[model.llm_reasoning] #暂时未使用 +[model.llm_reasoning] #只在回复模式为reasoning时启用 name = "Pro/deepseek-ai/DeepSeek-R1" # name = "Qwen/QwQ-32B" provider = "SILICONFLOW"