diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index 4fa6951e2..6781145b6 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -57,6 +57,15 @@ class ChatObserver: self._update_event = asyncio.Event() # 触发更新的事件 self._update_complete = asyncio.Event() # 更新完成的事件 + def check(self) -> bool: + """检查距离上一次观察之后是否有了新消息 + + Returns: + bool: 是否有新消息 + """ + return self.new_message_after(self.last_check_time) + + def new_message_after(self, time_point: float) -> bool: """判断是否在指定时间点后有新消息 diff --git a/src/plugins/PFC/pfc.py b/src/plugins/PFC/pfc.py index 405ca02dc..db92fd80a 100644 --- a/src/plugins/PFC/pfc.py +++ b/src/plugins/PFC/pfc.py @@ -17,6 +17,7 @@ from ..storage.storage import MessageStorage from .chat_observer import ChatObserver from .pfc_KnowledgeFetcher import KnowledgeFetcher from .reply_checker import ReplyChecker +from .pfc_utils import get_items_from_json import json import time @@ -128,43 +129,18 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到 content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理内容,尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError: - logger.error("提取的JSON内容解析失败,返回默认行动") - return "direct_reply", "JSON解析失败,选择直接回复" - else: - # 如果找不到JSON,尝试从文本中提取行动和原因 - if "direct_reply" in content.lower(): - return "direct_reply", "从文本中提取的行动" - elif "fetch_knowledge" in content.lower(): - return "fetch_knowledge", "从文本中提取的行动" - elif "wait" in content.lower(): - return "wait", "从文本中提取的行动" - elif "listening" in content.lower(): - return "listening", "从文本中提取的行动" - elif "rethink_goal" in content.lower(): - return "rethink_goal", "从文本中提取的行动" - elif "judge_conversation" in content.lower(): - return "judge_conversation", "从文本中提取的行动" - else: - logger.error("无法从返回内容中提取行动类型") - return "direct_reply", "无法解析响应,选择直接回复" + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "action", "reason", + default_values={"action": "direct_reply", "reason": "默认原因"} + ) - # 验证JSON字段 - action = result.get("action", "direct_reply") - reason = result.get("reason", "默认原因") + if not success: + return "direct_reply", "JSON解析失败,选择直接回复" + + action = result["action"] + reason = result["reason"] # 验证action类型 if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "judge_conversation"]: @@ -195,6 +171,8 @@ class GoalAnalyzer: self.name = global_config.BOT_NICKNAME self.nick_name = global_config.BOT_ALIAS_NAMES self.chat_observer = ChatObserver.get_instance(stream_id) + + self.current_goal_and_reason = None async def analyze_goal(self) -> Tuple[str, str, str]: """分析对话历史并设定目标 @@ -239,48 +217,20 @@ class GoalAnalyzer: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理和验证返回内容 - if not content or not isinstance(content, str): - logger.error("LLM返回内容为空或格式不正确") - continue - - # 尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError: - logger.error(f"提取的JSON内容解析失败,重试第{retry + 1}次") - continue - else: - logger.error(f"无法在返回内容中找到有效的JSON,重试第{retry + 1}次") - continue + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "goal", "reasoning", + required_types={"goal": str, "reasoning": str} + ) - # 验证JSON字段 - if not all(key in result for key in ["goal", "reasoning"]): - logger.error(f"JSON缺少必要字段,实际内容: {result},重试第{retry + 1}次") + if not success: + logger.error(f"无法解析JSON,重试第{retry + 1}次") continue goal = result["goal"] reasoning = result["reasoning"] - # 验证字段内容 - if not isinstance(goal, str) or not isinstance(reasoning, str): - logger.error(f"JSON字段类型错误,goal和reasoning必须是字符串,重试第{retry + 1}次") - continue - - if not goal.strip() or not reasoning.strip(): - logger.error(f"JSON字段内容为空,重试第{retry + 1}次") - continue - # 使用默认的方法 method = "以友好的态度回应" return goal, method, reasoning @@ -330,58 +280,21 @@ class GoalAnalyzer: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 清理和验证返回内容 - if not content or not isinstance(content, str): - logger.error("LLM返回内容为空或格式不正确") - return False, False, "确保对话顺利进行" - - # 尝试提取JSON部分 - content = content.strip() - try: - # 尝试直接解析 - result = json.loads(content) - except json.JSONDecodeError: - # 如果直接解析失败,尝试查找和提取JSON部分 - import re - json_pattern = r'\{[^{}]*\}' - json_match = re.search(json_pattern, content) - if json_match: - try: - result = json.loads(json_match.group()) - except json.JSONDecodeError as e: - logger.error(f"提取的JSON内容解析失败: {e}") - return False, False, "确保对话顺利进行" - else: - logger.error("无法在返回内容中找到有效的JSON") - return False, False, "确保对话顺利进行" + # 使用简化函数提取JSON内容 + success, result = get_items_from_json( + content, + "goal_achieved", "stop_conversation", "reason", + required_types={ + "goal_achieved": bool, + "stop_conversation": bool, + "reason": str + } + ) - # 验证JSON字段 - if not all(key in result for key in ["goal_achieved", "stop_conversation", "reason"]): - logger.error(f"JSON缺少必要字段,实际内容: {result}") - return False, False, "确保对话顺利进行" - - goal_achieved = result["goal_achieved"] - stop_conversation = result["stop_conversation"] - reason = result["reason"] - - # 验证字段类型 - if not isinstance(goal_achieved, bool): - logger.error("goal_achieved 必须是布尔值") - return False, False, "确保对话顺利进行" - - if not isinstance(stop_conversation, bool): - logger.error("stop_conversation 必须是布尔值") - return False, False, "确保对话顺利进行" - - if not isinstance(reason, str): - logger.error("reason 必须是字符串") - return False, False, "确保对话顺利进行" - - if not reason.strip(): - logger.error("reason 不能为空") + if not success: return False, False, "确保对话顺利进行" - return goal_achieved, stop_conversation, reason + return result["goal_achieved"], result["stop_conversation"], result["reason"] except Exception as e: logger.error(f"分析对话目标时出错: {str(e)}") @@ -536,25 +449,66 @@ class ReplyGenerator: class Conversation: # 类级别的实例管理 _instances: Dict[str, 'Conversation'] = {} + _instance_lock = asyncio.Lock() # 类级别的全局锁 + _init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件 + _initializing: Dict[str, bool] = {} # 标记是否正在初始化 @classmethod - def get_instance(cls, stream_id: str) -> 'Conversation': - """获取或创建对话实例""" - if stream_id not in cls._instances: - cls._instances[stream_id] = cls(stream_id) - logger.info(f"创建新的对话实例: {stream_id}") - return cls._instances[stream_id] + async def get_instance(cls, stream_id: str) -> Optional['Conversation']: + """获取或创建对话实例 + + Args: + stream_id: 聊天流ID + + Returns: + Optional[Conversation]: 对话实例,如果创建或等待失败则返回None + """ + try: + # 使用全局锁来确保线程安全 + async with cls._instance_lock: + # 如果已经在初始化中,等待初始化完成 + if stream_id in cls._initializing and cls._initializing[stream_id]: + # 释放锁等待初始化 + cls._instance_lock.release() + try: + await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0) + except asyncio.TimeoutError: + logger.error(f"等待实例 {stream_id} 初始化超时") + return None + finally: + await cls._instance_lock.acquire() + + # 如果实例不存在,创建新实例 + if stream_id not in cls._instances: + cls._instances[stream_id] = cls(stream_id) + cls._init_events[stream_id] = asyncio.Event() + cls._initializing[stream_id] = True + logger.info(f"创建新的对话实例: {stream_id}") + + return cls._instances[stream_id] + except Exception as e: + logger.error(f"获取对话实例失败: {e}") + return None @classmethod - def remove_instance(cls, stream_id: str): - """删除对话实例""" - if stream_id in cls._instances: - # 停止相关组件 - instance = cls._instances[stream_id] - instance.chat_observer.stop() - # 删除实例 - del cls._instances[stream_id] - logger.info(f"已删除对话实例 {stream_id}") + async def remove_instance(cls, stream_id: str): + """删除对话实例 + + Args: + stream_id: 聊天流ID + """ + async with cls._instance_lock: + if stream_id in cls._instances: + # 停止相关组件 + instance = cls._instances[stream_id] + instance.chat_observer.stop() + # 删除实例 + del cls._instances[stream_id] + if stream_id in cls._init_events: + del cls._init_events[stream_id] + if stream_id in cls._initializing: + del cls._initializing[stream_id] + logger.info(f"已删除对话实例 {stream_id}") def __init__(self, stream_id: str): """初始化对话系统""" @@ -592,13 +546,21 @@ class Conversation: async def start(self): """开始对话流程""" - logger.info("对话系统启动") - self.should_continue = True - self.chat_observer.start() # 启动观察器 - await asyncio.sleep(1) - # 启动对话循环 - await self._conversation_loop() - + try: + logger.info("对话系统启动") + self.should_continue = True + self.chat_observer.start() # 启动观察器 + await asyncio.sleep(1) + # 启动对话循环 + await self._conversation_loop() + except Exception as e: + logger.error(f"启动对话系统失败: {e}") + raise + finally: + # 标记初始化完成 + self._init_events[self.stream_id].set() + self._initializing[self.stream_id] = False + async def _conversation_loop(self): """对话循环""" # 获取最近的消息历史 @@ -724,7 +686,7 @@ class Conversation: self.should_continue = False self.state = ConversationState.ENDED # 删除实例(这会同时停止chat_observer) - self.remove_instance(self.stream_id) + await self.remove_instance(self.stream_id) async def _send_timeout_message(self): """发送超时结束消息""" diff --git a/src/plugins/PFC/pfc_utils.py b/src/plugins/PFC/pfc_utils.py new file mode 100644 index 000000000..2b94e6c4d --- /dev/null +++ b/src/plugins/PFC/pfc_utils.py @@ -0,0 +1,72 @@ +import json +import re +from typing import Dict, Any, Optional, List, Tuple, Union +from src.common.logger import get_module_logger + +logger = get_module_logger("pfc_utils") + +def get_items_from_json( + content: str, + *items: str, + default_values: Optional[Dict[str, Any]] = None, + required_types: Optional[Dict[str, type]] = None +) -> Tuple[bool, Dict[str, Any]]: + """从文本中提取JSON内容并获取指定字段 + + Args: + content: 包含JSON的文本 + *items: 要提取的字段名 + default_values: 字段的默认值,格式为 {字段名: 默认值} + required_types: 字段的必需类型,格式为 {字段名: 类型} + + Returns: + Tuple[bool, Dict[str, Any]]: (是否成功, 提取的字段字典) + """ + content = content.strip() + result = {} + + # 设置默认值 + if default_values: + result.update(default_values) + + # 尝试解析JSON + try: + json_data = json.loads(content) + except json.JSONDecodeError: + # 如果直接解析失败,尝试查找和提取JSON部分 + json_pattern = r'\{[^{}]*\}' + json_match = re.search(json_pattern, content) + if json_match: + try: + json_data = json.loads(json_match.group()) + except json.JSONDecodeError: + logger.error("提取的JSON内容解析失败") + return False, result + else: + logger.error("无法在返回内容中找到有效的JSON") + return False, result + + # 提取字段 + for item in items: + if item in json_data: + result[item] = json_data[item] + + # 验证必需字段 + if not all(item in result for item in items): + logger.error(f"JSON缺少必要字段,实际内容: {json_data}") + return False, result + + # 验证字段类型 + if required_types: + for field, expected_type in required_types.items(): + if field in result and not isinstance(result[field], expected_type): + logger.error(f"{field} 必须是 {expected_type.__name__} 类型") + return False, result + + # 验证字符串字段不为空 + for field in items: + if isinstance(result[field], str) and not result[field].strip(): + logger.error(f"{field} 不能为空") + return False, result + + return True, result \ No newline at end of file diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index cd5b758f7..cfdfbdb32 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -42,11 +42,24 @@ class ChatBot: if global_config.enable_pfc_chatting: # 获取或创建对话实例 - conversation = Conversation.get_instance(chat_id) + conversation = await Conversation.get_instance(chat_id) + if conversation is None: + logger.error(f"创建或获取对话实例失败: {chat_id}") + return + # 如果是新创建的实例,启动对话系统 if conversation.state == ConversationState.INIT: asyncio.create_task(conversation.start()) logger.info(f"为聊天 {chat_id} 创建新的对话实例") + elif conversation.state == ConversationState.ENDED: + # 如果实例已经结束,重新创建 + await Conversation.remove_instance(chat_id) + conversation = await Conversation.get_instance(chat_id) + if conversation is None: + logger.error(f"重新创建对话实例失败: {chat_id}") + return + asyncio.create_task(conversation.start()) + logger.info(f"为聊天 {chat_id} 重新创建对话实例") except Exception as e: logger.error(f"创建PFC聊天流失败: {e}") diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py index e3015fe1e..af18fe6ae 100644 --- a/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py +++ b/src/plugins/chat_module/reasoning_chat/reasoning_prompt_builder.py @@ -142,12 +142,13 @@ class PromptBuilder: logger.info("开始构建prompt") prompt = f""" +{relation_prompt_all} {memory_prompt} {prompt_info} {schedule_prompt} {chat_target} {chat_talking_prompt} -现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。{relation_prompt_all}\n +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)},{prompt_personality}。 你正在{chat_target_2},现在请你读读之前的聊天记录,{mood_prompt},然后给出日常且口语化的回复,平淡一些, 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger} diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py b/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py index 3cd6096e7..d79878258 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_prompt_builder.py @@ -123,7 +123,7 @@ class PromptBuilder: {chat_talking_prompt} 你刚刚脑子里在想: {current_mind_info} -现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。{relation_prompt_all}\n +现在"{sender_name}"说的:{message_txt}。引起了你的注意,你想要在群里发言发言或者回复这条消息。\n 你的网名叫{global_config.BOT_NICKNAME},有人也叫你{"/".join(global_config.BOT_ALIAS_NAMES)},{prompt_personality}。 你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些, 尽量简短一些。{keywords_reaction_prompt}请注意把握聊天内容,不要回复的太有条理,可以有个性。{prompt_ger}