From 687c3f6710e48fc9e059f2ffe5d02c709973418d Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Tue, 8 Apr 2025 22:31:47 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9APFC=E9=87=8D=E6=9E=84=EF=BC=8C?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=8C=96=E6=8B=86=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/PFC/action_planner.py | 108 +++++------ src/plugins/PFC/chat_observer.py | 73 +++++++ src/plugins/PFC/conversation.py | 179 +++++++++-------- src/plugins/PFC/decision_info.py | 116 ----------- src/plugins/PFC/message_sender.py | 49 +++++ src/plugins/PFC/notification_handler.py | 71 +++++++ src/plugins/PFC/observation_info.py | 246 ++++++++++++++++++++++++ src/plugins/PFC/pfc.py | 121 ++---------- src/plugins/PFC/pfc_manager.py | 2 +- src/plugins/PFC/pfc_types.py | 21 ++ src/plugins/PFC/reply_generator.py | 15 +- src/plugins/PFC/waiter.py | 45 +++++ 12 files changed, 661 insertions(+), 385 deletions(-) delete mode 100644 src/plugins/PFC/decision_info.py create mode 100644 src/plugins/PFC/message_sender.py create mode 100644 src/plugins/PFC/notification_handler.py create mode 100644 src/plugins/PFC/observation_info.py create mode 100644 src/plugins/PFC/pfc_types.py create mode 100644 src/plugins/PFC/waiter.py diff --git a/src/plugins/PFC/action_planner.py b/src/plugins/PFC/action_planner.py index c24cc0903..d4f13452e 100644 --- a/src/plugins/PFC/action_planner.py +++ b/src/plugins/PFC/action_planner.py @@ -1,27 +1,25 @@ import datetime -import asyncio -from typing import List, Optional, Dict, Any, Tuple, Literal, Set -from enum import Enum +from typing import List, Dict, Tuple from src.common.logger import get_module_logger -from ..chat.chat_stream import ChatStream -from ..message.message_base import UserInfo, Seg -from ..chat.message import Message +from ..message.message_base import UserInfo from ..models.utils_model import LLM_request from ..config.config import global_config -from src.plugins.chat.message import MessageSending -from ..message.api import global_api -from ..storage.storage import MessageStorage from .chat_observer import ChatObserver -from .reply_checker import ReplyChecker from .pfc_utils import get_items_from_json from src.individuality.individuality import Individuality -from .chat_states import NotificationHandler, Notification, NotificationType -import time -from dataclasses import dataclass, field -from .pfc import DecisionInfo, DecisionInfoType +from .observation_info import ObservationInfo +from .conversation import ConversationInfo logger = get_module_logger("action_planner") +class ActionPlannerInfo: + def __init__(self): + self.done_action = [] + self.goal_list = [] + self.knowledge_list = [] + self.memory_list = [] + + class ActionPlanner: """行动规划器""" @@ -38,73 +36,60 @@ class ActionPlanner: async def plan( self, - goal: str, - method: str, - reasoning: str, - action_history: List[Dict[str, str]] = None, - decision_info: DecisionInfoType = None # Use DecisionInfoType here + observation_info: ObservationInfo, + conversation_info: ConversationInfo ) -> Tuple[str, str]: """规划下一步行动 Args: - goal: 对话目标 - method: 实现方法 - reasoning: 目标原因 - action_history: 行动历史记录 - decision_info: 决策信息 + observation_info: 决策信息 + conversation_info: 对话信息 Returns: Tuple[str, str]: (行动类型, 行动原因) """ # 构建提示词 - logger.debug(f"开始规划行动:当前目标: {goal}") + logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}") - # 获取最近20条消息 - messages = self.chat_observer.get_message_history(limit=20) + #构建对话目标 + if conversation_info.goal_list: + goal, reasoning = conversation_info.goal_list[-1] + else: + goal = "目前没有明确对话目标" + reasoning = "目前没有明确对话目标,最好思考一个对话目标" + + + # 获取聊天历史记录 + chat_history_list = observation_info.chat_history chat_history_text = "" - for msg in messages: - time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") - user_info = UserInfo.from_dict(msg.get("user_info", {})) - sender = user_info.user_nickname or f"用户{user_info.user_id}" - if sender == self.name: - sender = "你说" - chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" + for msg in chat_history_list: + chat_history_text += f"{msg}\n" + + if observation_info.new_messages_count > 0: + new_messages_list = observation_info.unprocessed_messages + + chat_history_text += f"有{observation_info.new_messages_count}条新消息:\n" + for msg in new_messages_list: + chat_history_text += f"{msg}\n" + + observation_info.clear_unprocessed_messages() + personality_text = f"你的名字是{self.name},{self.personality_info}" # 构建action历史文本 - action_history_text = "" - if action_history and action_history[-1]['action'] == "direct_reply": - action_history_text = "你刚刚发言回复了对方" + action_history_list = conversation_info.action_history + action_history_text = "你之前做的事情是:" + for action in action_history_list: + action_history_text += f"{action}\n" - # 构建决策信息文本 - decision_info_text = "" - if decision_info: - decision_info_text = "当前对话状态:\n" - if decision_info.is_cold_chat: - decision_info_text += f"对话处于冷场状态,已持续{int(decision_info.cold_chat_duration)}秒\n" - - if decision_info.new_messages_count > 0: - decision_info_text += f"有{decision_info.new_messages_count}条新消息未处理\n" - - user_response_time = decision_info.get_user_response_time() - if user_response_time: - decision_info_text += f"距离用户上次发言已过去{int(user_response_time)}秒\n" - - bot_response_time = decision_info.get_bot_response_time() - if bot_response_time: - decision_info_text += f"距离你上次发言已过去{int(bot_response_time)}秒\n" - - if decision_info.active_users: - decision_info_text += f"当前活跃用户数: {len(decision_info.active_users)}\n" + prompt = f"""{personality_text}。现在你在参与一场QQ聊天,请分析以下内容,根据信息决定下一步行动: 当前对话目标:{goal} -实现该对话目标的方式:{method} 产生该对话目标的原因:{reasoning} -{decision_info_text} {action_history_text} 最近的对话记录: @@ -117,7 +102,6 @@ wait: 当你做出了发言,对方尚未回复时等待对方的回复 listening: 倾听对方发言,当你认为对方发言尚未结束时采用 direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言 rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标 -judge_conversation: 判断对话是否结束,当发现对话目标已经达到或者希望停止对话时选择,会判断对话是否结束 请以JSON格式输出,包含以下字段: 1. action: 行动类型,注意你之前的行为 @@ -134,7 +118,7 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到 success, result = get_items_from_json( content, "action", "reason", - default_values={"action": "direct_reply", "reason": "默认原因"} + default_values={"action": "direct_reply", "reason": "没有明确原因"} ) if not success: @@ -144,7 +128,7 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到 reason = result["reason"] # 验证action类型 - if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "judge_conversation"]: + if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal"]: logger.warning(f"未知的行动类型: {action},默认使用listening") action = "listening" diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index 2fda95d2c..0a6b3bfb6 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -69,6 +69,11 @@ class ChatObserver: self.cold_chat_threshold: float = 60.0 # 60秒无消息判定为冷场 self.last_cold_chat_check: float = time.time() self.is_cold_chat_state: bool = False + + self.update_event = asyncio.Event() + self.update_interval = 5 # 更新间隔(秒) + self.message_cache = [] + self.update_running = False async def check(self) -> bool: """检查距离上一次观察之后是否有了新消息 @@ -368,3 +373,71 @@ class ChatObserver: time_info += f"\n距离对方上次发言已经过去了{int(user_speak_ago)}秒" return time_info + + def start_periodic_update(self): + """启动观察器的定期更新""" + if not self.update_running: + self.update_running = True + asyncio.create_task(self._periodic_update()) + + async def _periodic_update(self): + """定期更新消息历史""" + try: + while self.update_running: + await self._update_message_history() + await asyncio.sleep(self.update_interval) + except Exception as e: + logger.error(f"定期更新消息历史时出错: {str(e)}") + + async def _update_message_history(self) -> bool: + """更新消息历史 + + Returns: + bool: 是否有新消息 + """ + try: + messages = await self.message_storage.get_messages_for_stream( + self.stream_id, + limit=50 + ) + + if not messages: + return False + + # 检查是否有新消息 + has_new_messages = False + if messages and (not self.message_cache or messages[0]["message_id"] != self.message_cache[0]["message_id"]): + has_new_messages = True + + self.message_cache = messages + + if has_new_messages: + self.update_event.set() + self.update_event.clear() + return True + return False + + except Exception as e: + logger.error(f"更新消息历史时出错: {str(e)}") + return False + + def get_message_history(self, limit: int = 50) -> List[Dict[str, Any]]: + """获取消息历史 + + Args: + limit: 获取的最大消息数量 + + Returns: + List[Dict[str, Any]]: 消息历史列表 + """ + return self.message_cache[:limit] + + def get_last_message(self) -> Optional[Dict[str, Any]]: + """获取最后一条消息 + + Returns: + Optional[Dict[str, Any]]: 最后一条消息,如果没有则返回None + """ + if not self.message_cache: + return None + return self.message_cache[0] diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 5321b9c45..0d9457980 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -2,21 +2,31 @@ import asyncio import datetime from typing import Dict, Any from ..chat.message import Message -from .pfc import ConversationState, ChatObserver,GoalAnalyzer, Waiter, DirectMessageSender, PFCNotificationHandler +from .pfc_types import ConversationState +from .pfc import ChatObserver, GoalAnalyzer, Waiter, DirectMessageSender, PFCNotificationHandler from src.common.logger import get_module_logger from .action_planner import ActionPlanner -from .decision_info import DecisionInfo +from .observation_info import ObservationInfo from .reply_generator import ReplyGenerator from ..chat.chat_stream import ChatStream from ..message.message_base import UserInfo from ..config.config import global_config from src.plugins.chat.chat_stream import chat_manager from .pfc_KnowledgeFetcher import KnowledgeFetcher +from .chat_states import NotificationType import time import traceback logger = get_module_logger("pfc_conversation") +class ConversationInfo: + def __init__(self): + self.done_action = [] + self.goal_list = [] + self.knowledge_list = [] + self.memory_list = [] + + class Conversation: """对话类,负责管理单个对话的状态和行为""" @@ -31,96 +41,86 @@ class Conversation: self.state = ConversationState.INIT self.should_continue = False - # 目标和规划 - self.current_goal = "保持友好的对话" - self.current_method = "以友好的态度回应" - self.goal_reasoning = "确保对话顺利进行" - - # 知识缓存和行动历史 - self.knowledge_cache = {} - self.action_history = [] - # 回复相关 self.generated_reply = "" async def _initialize(self): """初始化实例,注册所有组件""" + try: - - self.chat_observer = ChatObserver.get_instance(self.stream_id) self.action_planner = ActionPlanner(self.stream_id) self.goal_analyzer = GoalAnalyzer(self.stream_id) self.reply_generator = ReplyGenerator(self.stream_id) self.knowledge_fetcher = KnowledgeFetcher() self.waiter = Waiter(self.stream_id) self.direct_sender = DirectMessageSender() - + # 获取聊天流信息 self.chat_stream = chat_manager.get_stream(self.stream_id) - # 决策信息 - self.decision_info = DecisionInfo() - self.decision_info.bot_id = global_config.BOT_QQ - # 创建通知处理器 self.notification_handler = PFCNotificationHandler(self) - + + self.stop_action_planner = False except Exception as e: - logger.error(f"初始化对话实例:注册组件失败: {e}") + logger.error(f"初始化对话实例:注册运行组件失败: {e}") logger.error(traceback.format_exc()) raise + try: - start_time = time.time() - self.chat_observer.start() # 启动观察器 - logger.info(f"观察器启动完成,耗时: {time.time() - start_time:.2f}秒") - - await asyncio.sleep(1) # 给观察器一些启动时间 - - total_time = time.time() - start_time - logger.info(f"实例初始化完成,总耗时: {total_time:.2f}秒") - - self.should_continue = True - asyncio.create_task(self.start()) + #决策所需要的信息,包括自身自信和观察信息两部分 + #注册观察器和观测信息 + self.chat_observer = ChatObserver.get_instance(self.stream_id) + self.chat_observer.start() + self.observation_info = ObservationInfo() + self.observation_info.bind_to_chat_observer(self.stream_id) + #对话信息 + self.conversation_info = ConversationInfo() except Exception as e: - logger.error(f"初始化对话实例失败: {e}") + logger.error(f"初始化对话实例:注册信息组件失败: {e}") logger.error(traceback.format_exc()) raise + + # 组件准备完成,启动该论对话 + self.should_continue = True + asyncio.create_task(self.start()) + async def start(self): """开始对话流程""" try: - logger.info("对话系统启动") - while self.should_continue: - await self._do_a_step() + logger.info("对话系统启动中...") + asyncio.create_task(self._plan_and_action_loop()) except Exception as e: logger.error(f"启动对话系统失败: {e}") raise - - async def _do_a_step(self): - """思考步""" + + + async def _plan_and_action_loop(self): + """思考步,PFC核心循环模块""" # 获取最近的消息历史 - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - - self.chat_observer.trigger_update() # 触发立即更新 - if not await self.chat_observer.wait_for_update(): - logger.warning("等待消息更新超时") - - # 使用决策信息来辅助行动规划 - action, reason = await self.action_planner.plan( - self.current_goal, - self.current_method, - self.goal_reasoning, - self.action_history, - self.decision_info # 传入决策信息 - ) - - # 执行行动 - await self._handle_action(action, reason) - - # # 清理已处理的消息 - # self.decision_info.clear_unprocessed_messages() + while self.should_continue: + # 使用决策信息来辅助行动规划 + action, reason = await self.action_planner.plan( + self.observation_info, + self.conversation_info + ) + if self._check_new_messages_after_planning(): + continue + + # 执行行动 + await self._handle_action(action, reason, self.observation_info, self.conversation_info) + + async def _check_new_messages_after_planning(self): + """检查在规划后是否有新消息""" + if self.observation_info.new_messages_count > 0: + logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动") + # 如果需要,可以在这里添加逻辑来根据新消息重新决定行动 + return True + return False + def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message: """将消息字典转换为Message对象""" @@ -141,20 +141,18 @@ class Conversation: logger.warning(f"转换消息时出错: {e}") raise - async def _handle_action(self, action: str, reason: str): + async def _handle_action(self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo): """处理规划的行动""" logger.info(f"执行行动: {action}, 原因: {reason}") - # 记录action历史 - self.action_history.append({ + # 记录action历史,先设置为stop,完成后再设置为done + conversation_info.action_history.append({ "action": action, "reason": reason, + "status": "stop", "time": datetime.datetime.now().strftime("%H:%M:%S") }) - # 只保留最近的10条记录 - if len(self.action_history) > 10: - self.action_history = self.action_history[-10:] if action == "direct_reply": self.state = ConversationState.GENERATING @@ -174,37 +172,34 @@ class Conversation: await self._send_reply() - elif action == "fetch_knowledge": - self.state = ConversationState.GENERATING - messages = self.chat_observer.get_message_history(limit=30) - knowledge, sources = await self.knowledge_fetcher.fetch( - self.current_goal, - [self._convert_to_message(msg) for msg in messages] - ) - logger.info(f"获取到知识,来源: {sources}") + conversation_info.action_history.append({ + "action": action, + "reason": reason, + "status": "done", + "time": datetime.datetime.now().strftime("%H:%M:%S") + }) - if knowledge != "未找到相关知识": - self.knowledge_cache[sources] = knowledge + elif action == "fetch_knowledge": + self.state = ConversationState.FETCHING + knowledge = "TODO:知识" + topic = "TODO:关键词" + + logger.info(f"假装获取到知识{knowledge},关键词是: {topic}") + + if knowledge: + if topic not in self.conversation_info.knowledge_list: + self.conversation_info.knowledge_list.append({ + "topic": topic, + "knowledge": knowledge + }) + else: + self.conversation_info.knowledge_list[topic] += knowledge elif action == "rethink_goal": self.state = ConversationState.RETHINKING - self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal() - - elif action == "judge_conversation": - self.state = ConversationState.JUDGING - self.goal_achieved, self.stop_conversation, self.reason = await self.goal_analyzer.analyze_conversation(self.current_goal, self.goal_reasoning) - - # 如果当前目标达成但还有其他目标 - if self.goal_achieved and not self.stop_conversation: - alternative_goals = await self.goal_analyzer.get_alternative_goals() - if alternative_goals: - # 切换到下一个目标 - self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0] - logger.info(f"当前目标已达成,切换到新目标: {self.current_goal}") - return - - if self.stop_conversation: - await self._stop_conversation() + goal_list = observation_info.goal_list + new_goal_list = await self.goal_analyzer.analyze_goal(goal_list) + observation_info.goal_list = new_goal_list elif action == "listening": self.state = ConversationState.LISTENING @@ -230,7 +225,7 @@ class Conversation: latest_message = self._convert_to_message(messages[0]) await self.direct_sender.send_message( chat_stream=self.chat_stream, - content="抱歉,由于等待时间过长,我需要先去忙别的了。下次再聊吧~", + content="TODO:超时消息", reply_to_message=latest_message ) except Exception as e: diff --git a/src/plugins/PFC/decision_info.py b/src/plugins/PFC/decision_info.py deleted file mode 100644 index 29beb3484..000000000 --- a/src/plugins/PFC/decision_info.py +++ /dev/null @@ -1,116 +0,0 @@ -#Programmable Friendly Conversationalist -#Prefrontal cortex -import datetime -import asyncio -from typing import List, Optional, Dict, Any, Tuple, Literal, Set -from enum import Enum -from src.common.logger import get_module_logger -from ..chat.chat_stream import ChatStream -from ..message.message_base import UserInfo, Seg -from ..chat.message import Message -from ..models.utils_model import LLM_request -from ..config.config import global_config -from src.plugins.chat.message import MessageSending -from ..message.api import global_api -from ..storage.storage import MessageStorage -from .chat_observer import ChatObserver -from .reply_generator import ReplyGenerator -from .pfc_utils import get_items_from_json -from src.individuality.individuality import Individuality -from .chat_states import NotificationHandler, Notification, NotificationType -import time -from dataclasses import dataclass, field -from .conversation import Conversation - - -@dataclass -class DecisionInfo: - """决策信息类,用于收集和管理来自chat_observer的通知信息""" - - # 消息相关 - last_message_time: Optional[float] = None - last_message_content: Optional[str] = None - last_message_sender: Optional[str] = None - new_messages_count: int = 0 - unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) - - # 对话状态 - is_cold_chat: bool = False - cold_chat_duration: float = 0.0 - last_bot_speak_time: Optional[float] = None - last_user_speak_time: Optional[float] = None - - # 对话参与者 - active_users: Set[str] = field(default_factory=set) - bot_id: str = field(default="") - - def update_from_message(self, message: Dict[str, Any]): - """从消息更新信息 - - Args: - message: 消息数据 - """ - self.last_message_time = message["time"] - self.last_message_content = message.get("processed_plain_text", "") - - user_info = UserInfo.from_dict(message.get("user_info", {})) - self.last_message_sender = user_info.user_id - - if user_info.user_id == self.bot_id: - self.last_bot_speak_time = message["time"] - else: - self.last_user_speak_time = message["time"] - self.active_users.add(user_info.user_id) - - self.new_messages_count += 1 - self.unprocessed_messages.append(message) - - def update_cold_chat_status(self, is_cold: bool, current_time: float): - """更新冷场状态 - - Args: - is_cold: 是否冷场 - current_time: 当前时间 - """ - self.is_cold_chat = is_cold - if is_cold and self.last_message_time: - self.cold_chat_duration = current_time - self.last_message_time - - def get_active_duration(self) -> float: - """获取当前活跃时长 - - Returns: - float: 最后一条消息到现在的时长(秒) - """ - if not self.last_message_time: - return 0.0 - return time.time() - self.last_message_time - - def get_user_response_time(self) -> Optional[float]: - """获取用户响应时间 - - Returns: - Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None - """ - if not self.last_user_speak_time: - return None - return time.time() - self.last_user_speak_time - - def get_bot_response_time(self) -> Optional[float]: - """获取机器人响应时间 - - Returns: - Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None - """ - if not self.last_bot_speak_time: - return None - return time.time() - self.last_bot_speak_time - - def clear_unprocessed_messages(self): - """清空未处理消息列表""" - self.unprocessed_messages.clear() - self.new_messages_count = 0 - - -# Forward reference for type hints -DecisionInfoType = DecisionInfo \ No newline at end of file diff --git a/src/plugins/PFC/message_sender.py b/src/plugins/PFC/message_sender.py new file mode 100644 index 000000000..6df1e7ded --- /dev/null +++ b/src/plugins/PFC/message_sender.py @@ -0,0 +1,49 @@ +from typing import Optional +from src.common.logger import get_module_logger +from ..chat.chat_stream import ChatStream +from ..chat.message import Message +from ..message.message_base import Seg +from src.plugins.chat.message import MessageSending + +logger = get_module_logger("message_sender") + +class DirectMessageSender: + """直接消息发送器""" + + def __init__(self): + pass + + async def send_message( + self, + chat_stream: ChatStream, + content: str, + reply_to_message: Optional[Message] = None, + ) -> None: + """发送消息到聊天流 + + Args: + chat_stream: 聊天流 + content: 消息内容 + reply_to_message: 要回复的消息(可选) + """ + try: + # 创建消息内容 + segments = [Seg(type="text", data={"text": content})] + + # 检查是否需要引用回复 + if reply_to_message: + reply_id = reply_to_message.message_id + message_sending = MessageSending( + segments=segments, + reply_to_id=reply_id + ) + else: + message_sending = MessageSending(segments=segments) + + # 发送消息 + await chat_stream.send_message(message_sending) + logger.info(f"消息已发送: {content}") + + except Exception as e: + logger.error(f"发送消息失败: {str(e)}") + raise \ No newline at end of file diff --git a/src/plugins/PFC/notification_handler.py b/src/plugins/PFC/notification_handler.py new file mode 100644 index 000000000..38c0d0dee --- /dev/null +++ b/src/plugins/PFC/notification_handler.py @@ -0,0 +1,71 @@ +from typing import TYPE_CHECKING +from src.common.logger import get_module_logger +from .chat_states import NotificationHandler, Notification, NotificationType + +if TYPE_CHECKING: + from .conversation import Conversation + +logger = get_module_logger("notification_handler") + +class PFCNotificationHandler(NotificationHandler): + """PFC通知处理器""" + + def __init__(self, conversation: 'Conversation'): + """初始化PFC通知处理器 + + Args: + conversation: 对话实例 + """ + self.conversation = conversation + + async def handle_notification(self, notification: Notification): + """处理通知 + + Args: + notification: 通知对象 + """ + logger.debug(f"收到通知: {notification.type.name}, 数据: {notification.data}") + + # 根据通知类型执行不同的处理 + if notification.type == NotificationType.NEW_MESSAGE: + # 新消息通知 + await self._handle_new_message(notification) + elif notification.type == NotificationType.COLD_CHAT: + # 冷聊天通知 + await self._handle_cold_chat(notification) + elif notification.type == NotificationType.COMMAND: + # 命令通知 + await self._handle_command(notification) + else: + logger.warning(f"未知的通知类型: {notification.type.name}") + + async def _handle_new_message(self, notification: Notification): + """处理新消息通知 + + Args: + notification: 通知对象 + """ + + # 更新决策信息 + observation_info = self.conversation.observation_info + observation_info.last_message_time = notification.data.get("time", 0) + observation_info.add_unprocessed_message(notification.data) + + # 手动触发观察器更新 + self.conversation.chat_observer.trigger_update() + + async def _handle_cold_chat(self, notification: Notification): + """处理冷聊天通知 + + Args: + notification: 通知对象 + """ + # 获取冷聊天信息 + cold_duration = notification.data.get("duration", 0) + + # 更新决策信息 + observation_info = self.conversation.observation_info + observation_info.conversation_cold_duration = cold_duration + + logger.info(f"对话已冷: {cold_duration}秒") + \ No newline at end of file diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py new file mode 100644 index 000000000..2967f10e3 --- /dev/null +++ b/src/plugins/PFC/observation_info.py @@ -0,0 +1,246 @@ +#Programmable Friendly Conversationalist +#Prefrontal cortex +from typing import List, Optional, Dict, Any, Set +from ..message.message_base import UserInfo +import time +from dataclasses import dataclass, field +from src.common.logger import get_module_logger +from .chat_observer import ChatObserver +from .chat_states import NotificationHandler + +logger = get_module_logger("observation_info") + +class ObservationInfoHandler(NotificationHandler): + """ObservationInfo的通知处理器""" + + def __init__(self, observation_info: 'ObservationInfo'): + """初始化处理器 + + Args: + observation_info: 要更新的ObservationInfo实例 + """ + self.observation_info = observation_info + + async def handle_notification(self, notification: Dict[str, Any]): + """处理通知 + + Args: + notification: 通知数据 + """ + notification_type = notification.get("type") + data = notification.get("data", {}) + + if notification_type == "NEW_MESSAGE": + # 处理新消息通知 + logger.debug(f"收到新消息通知data: {data}") + message = data.get("message", {}) + self.observation_info.update_from_message(message) + # self.observation_info.has_unread_messages = True + # self.observation_info.new_unread_message.append(message.get("processed_plain_text", "")) + + elif notification_type == "COLD_CHAT": + # 处理冷场通知 + is_cold = data.get("is_cold", False) + self.observation_info.update_cold_chat_status(is_cold, time.time()) + + elif notification_type == "ACTIVE_CHAT": + # 处理活跃通知 + is_active = data.get("is_active", False) + self.observation_info.is_cold = not is_active + + elif notification_type == "BOT_SPEAKING": + # 处理机器人说话通知 + self.observation_info.is_typing = False + self.observation_info.last_bot_speak_time = time.time() + + elif notification_type == "USER_SPEAKING": + # 处理用户说话通知 + self.observation_info.is_typing = False + self.observation_info.last_user_speak_time = time.time() + + elif notification_type == "MESSAGE_DELETED": + # 处理消息删除通知 + message_id = data.get("message_id") + self.observation_info.unprocessed_messages = [ + msg for msg in self.observation_info.unprocessed_messages + if msg.get("message_id") != message_id + ] + + elif notification_type == "USER_JOINED": + # 处理用户加入通知 + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.add(user_id) + + elif notification_type == "USER_LEFT": + # 处理用户离开通知 + user_id = data.get("user_id") + if user_id: + self.observation_info.active_users.discard(user_id) + + elif notification_type == "ERROR": + # 处理错误通知 + error_msg = data.get("error", "") + logger.error(f"收到错误通知: {error_msg}") + +@dataclass +class ObservationInfo: + """决策信息类,用于收集和管理来自chat_observer的通知信息""" + + #data_list + chat_history: List[str] = field(default_factory=list) + unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) + active_users: Set[str] = field(default_factory=set) + + #data + last_bot_speak_time: Optional[float] = None + last_user_speak_time: Optional[float] = None + last_message_time: Optional[float] = None + last_message_content: str = "" + last_message_sender: Optional[str] = None + bot_id: Optional[str] = None + new_messages_count: int = 0 + cold_chat_duration: float = 0.0 + + #state + is_typing: bool = False + has_unread_messages: bool = False + is_cold_chat: bool = False + changed: bool = False + + # #spec + # meta_plan_trigger: bool = False + + def __post_init__(self): + """初始化后创建handler""" + self.chat_observer = None + self.handler = ObservationInfoHandler(self) + + def bind_to_chat_observer(self, stream_id: str): + """绑定到指定的chat_observer + + Args: + stream_id: 聊天流ID + """ + self.chat_observer = ChatObserver.get_instance(stream_id) + self.chat_observer.notification_manager.register_handler( + target="observation_info", + notification_type="NEW_MESSAGE", + handler=self.handler + ) + self.chat_observer.notification_manager.register_handler( + target="observation_info", + notification_type="COLD_CHAT", + handler=self.handler + ) + + def unbind_from_chat_observer(self): + """解除与chat_observer的绑定""" + if self.chat_observer: + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", + notification_type="NEW_MESSAGE", + handler=self.handler + ) + self.chat_observer.notification_manager.unregister_handler( + target="observation_info", + notification_type="COLD_CHAT", + handler=self.handler + ) + self.chat_observer = None + + def update_from_message(self, message: Dict[str, Any]): + """从消息更新信息 + + Args: + message: 消息数据 + """ + logger.debug(f"更新信息from_message: {message}") + self.last_message_time = message["time"] + self.last_message_content = message.get("processed_plain_text", "") + + user_info = UserInfo.from_dict(message.get("user_info", {})) + self.last_message_sender = user_info.user_id + + if user_info.user_id == self.bot_id: + self.last_bot_speak_time = message["time"] + else: + self.last_user_speak_time = message["time"] + self.active_users.add(user_info.user_id) + + self.new_messages_count += 1 + self.unprocessed_messages.append(message) + + self.update_changed() + + def update_changed(self): + """更新changed状态""" + self.changed = True + # self.meta_plan_trigger = True + + def update_cold_chat_status(self, is_cold: bool, current_time: float): + """更新冷场状态 + + Args: + is_cold: 是否冷场 + current_time: 当前时间 + """ + self.is_cold_chat = is_cold + if is_cold and self.last_message_time: + self.cold_chat_duration = current_time - self.last_message_time + + def get_active_duration(self) -> float: + """获取当前活跃时长 + + Returns: + float: 最后一条消息到现在的时长(秒) + """ + if not self.last_message_time: + return 0.0 + return time.time() - self.last_message_time + + def get_user_response_time(self) -> Optional[float]: + """获取用户响应时间 + + Returns: + Optional[float]: 用户最后发言到现在的时长(秒),如果没有用户发言则返回None + """ + if not self.last_user_speak_time: + return None + return time.time() - self.last_user_speak_time + + def get_bot_response_time(self) -> Optional[float]: + """获取机器人响应时间 + + Returns: + Optional[float]: 机器人最后发言到现在的时长(秒),如果没有机器人发言则返回None + """ + if not self.last_bot_speak_time: + return None + return time.time() - self.last_bot_speak_time + + def clear_unprocessed_messages(self): + """清空未处理消息列表""" + # 将未处理消息添加到历史记录中 + for message in self.unprocessed_messages: + if "processed_plain_text" in message: + self.chat_history.append(message["processed_plain_text"]) + # 清空未处理消息列表 + self.has_unread_messages = False + self.unprocessed_messages.clear() + self.new_messages_count = 0 + + def add_unprocessed_message(self, message: Dict[str, Any]): + """添加未处理的消息 + + Args: + message: 消息数据 + """ + # 防止重复添加同一消息 + message_id = message.get("message_id") + if message_id and not any(m.get("message_id") == message_id for m in self.unprocessed_messages): + self.unprocessed_messages.append(message) + self.new_messages_count += 1 + + # 同时更新其他消息相关信息 + self.update_from_message(message) \ No newline at end of file diff --git a/src/plugins/PFC/pfc.py b/src/plugins/PFC/pfc.py index 072066660..65a2ac982 100644 --- a/src/plugins/PFC/pfc.py +++ b/src/plugins/PFC/pfc.py @@ -2,8 +2,7 @@ #Prefrontal cortex import datetime import asyncio -from typing import List, Optional, Dict, Any, Tuple, Literal, Set -from enum import Enum +from typing import List, Optional, Tuple, TYPE_CHECKING from src.common.logger import get_module_logger from ..chat.chat_stream import ChatStream from ..message.message_base import UserInfo, Seg @@ -14,34 +13,20 @@ from src.plugins.chat.message import MessageSending from ..message.api import global_api from ..storage.storage import MessageStorage from .chat_observer import ChatObserver -from .reply_generator import ReplyGenerator from .pfc_utils import get_items_from_json from src.individuality.individuality import Individuality from .chat_states import NotificationHandler, Notification, NotificationType +from .waiter import Waiter +from .message_sender import DirectMessageSender +from .notification_handler import PFCNotificationHandler import time -from dataclasses import dataclass, field -from .conversation import Conversation + +if TYPE_CHECKING: + from .conversation import Conversation logger = get_module_logger("pfc") -class ConversationState(Enum): - """对话状态""" - INIT = "初始化" - RETHINKING = "重新思考" - ANALYZING = "分析历史" - PLANNING = "规划目标" - GENERATING = "生成回复" - CHECKING = "检查回复" - SENDING = "发送消息" - WAITING = "等待" - LISTENING = "倾听" - ENDED = "结束" - JUDGING = "判断" - - -ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] - class GoalAnalyzer: """对话目标分析器""" @@ -249,42 +234,33 @@ class GoalAnalyzer: {{ "goal_achieved": true, "stop_conversation": false, - "reason": "用户已经得到了满意的回答,但我仍希望继续聊天" + "reason": "虽然目标已达成,但对话仍然有继续的价值" }}""" - logger.debug(f"发送到LLM的提示词: {prompt}") + try: content, _ = await self.llm.generate_response_async(prompt) logger.debug(f"LLM原始返回内容: {content}") - # 使用简化函数提取JSON内容 + # 尝试解析JSON success, result = get_items_from_json( content, "goal_achieved", "stop_conversation", "reason", - required_types={ - "goal_achieved": bool, - "stop_conversation": bool, - "reason": str - } + required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str} ) if not success: - return False, False, "确保对话顺利进行" + logger.error("无法解析对话分析结果JSON") + return False, False, "解析结果失败" + + goal_achieved = result["goal_achieved"] + stop_conversation = result["stop_conversation"] + reason = result["reason"] - # 如果当前目标达成,从目标列表中移除 - if result["goal_achieved"] and not result["stop_conversation"]: - for i, (g, _, _) in enumerate(self.goals): - if g == goal: - self.goals.pop(i) - # 如果还有其他目标,不停止对话 - if self.goals: - result["stop_conversation"] = False - break - - return result["goal_achieved"], result["stop_conversation"], result["reason"] + return goal_achieved, stop_conversation, reason except Exception as e: - logger.error(f"分析对话目标时出错: {str(e)}") - return False, False, "确保对话顺利进行" + logger.error(f"分析对话状态时出错: {str(e)}") + return False, False, f"分析出错: {str(e)}" class Waiter: @@ -319,63 +295,6 @@ class Waiter: logger.info("等待中...") -class PFCNotificationHandler(NotificationHandler): - """PFC的通知处理器""" - - def __init__(self, conversation: 'Conversation'): - self.conversation = conversation - self.logger = get_module_logger("pfc_notification") - self.decision_info = conversation.decision_info - - async def handle_notification(self, notification: Notification): - """处理通知""" - try: - if not notification or not hasattr(notification, 'data') or notification.data is None: - self.logger.error("收到无效的通知:notification 或 data 为空") - return - - if notification.type == NotificationType.NEW_MESSAGE: - # 处理新消息通知 - message = notification.data - if not isinstance(message, dict): - self.logger.error(f"无效的消息格式: {type(message)}") - return - - content = message.get('content', '') - self.logger.info(f"收到新消息通知: {content[:30] if content else ''}...") - - # 更新决策信息 - try: - self.decision_info.update_from_message(message) - except Exception as e: - self.logger.error(f"更新决策信息失败: {e}") - return - - # 触发对话系统更新 - self.conversation.chat_observer.trigger_update() - - elif notification.type == NotificationType.COLD_CHAT: - # 处理冷场通知 - try: - is_cold = bool(notification.data.get("is_cold", False)) - # 更新决策信息 - self.decision_info.update_cold_chat_status(is_cold, time.time()) - - if is_cold: - self.logger.info("检测到对话冷场") - else: - self.logger.info("对话恢复活跃") - except Exception as e: - self.logger.error(f"处理冷场状态失败: {e}") - return - - except Exception as e: - self.logger.error(f"处理通知时出错: {str(e)}") - # 添加更详细的错误信息 - self.logger.error(f"通知类型: {getattr(notification, 'type', None)}") - self.logger.error(f"通知数据: {getattr(notification, 'data', None)}") - - class DirectMessageSender: """直接发送消息到平台的发送器""" diff --git a/src/plugins/PFC/pfc_manager.py b/src/plugins/PFC/pfc_manager.py index 7e5f4cdb4..9a36bef19 100644 --- a/src/plugins/PFC/pfc_manager.py +++ b/src/plugins/PFC/pfc_manager.py @@ -1,6 +1,6 @@ from typing import Dict, Optional from src.common.logger import get_module_logger -from .pfc import Conversation +from .conversation import Conversation import traceback logger = get_module_logger("pfc_manager") diff --git a/src/plugins/PFC/pfc_types.py b/src/plugins/PFC/pfc_types.py new file mode 100644 index 000000000..d7ad8e91f --- /dev/null +++ b/src/plugins/PFC/pfc_types.py @@ -0,0 +1,21 @@ +from enum import Enum +from typing import Literal + + +class ConversationState(Enum): + """对话状态""" + INIT = "初始化" + RETHINKING = "重新思考" + ANALYZING = "分析历史" + PLANNING = "规划目标" + GENERATING = "生成回复" + CHECKING = "检查回复" + SENDING = "发送消息" + FETCHING = "获取知识" + WAITING = "等待" + LISTENING = "倾听" + ENDED = "结束" + JUDGING = "判断" + + +ActionType = Literal["direct_reply", "fetch_knowledge", "wait"] \ No newline at end of file diff --git a/src/plugins/PFC/reply_generator.py b/src/plugins/PFC/reply_generator.py index 70be6eebc..6ef5f1e20 100644 --- a/src/plugins/PFC/reply_generator.py +++ b/src/plugins/PFC/reply_generator.py @@ -1,24 +1,13 @@ import datetime -import asyncio -from typing import List, Optional, Dict, Any, Tuple, Literal, Set -from enum import Enum +from typing import List, Optional, Dict, Tuple from src.common.logger import get_module_logger -from ..chat.chat_stream import ChatStream -from ..message.message_base import UserInfo, Seg +from ..message.message_base import UserInfo from ..chat.message import Message from ..models.utils_model import LLM_request from ..config.config import global_config -from src.plugins.chat.message import MessageSending -from ..message.api import global_api -from ..storage.storage import MessageStorage from .chat_observer import ChatObserver from .reply_checker import ReplyChecker -from .pfc_utils import get_items_from_json from src.individuality.individuality import Individuality -from .chat_states import NotificationHandler, Notification, NotificationType -import time -from dataclasses import dataclass, field -from .conversation import Conversation logger = get_module_logger("reply_generator") diff --git a/src/plugins/PFC/waiter.py b/src/plugins/PFC/waiter.py new file mode 100644 index 000000000..0e1bf59f3 --- /dev/null +++ b/src/plugins/PFC/waiter.py @@ -0,0 +1,45 @@ +from src.common.logger import get_module_logger +from .chat_observer import ChatObserver + +logger = get_module_logger("waiter") + +class Waiter: + """等待器,用于等待对话流中的事件""" + + def __init__(self, stream_id: str): + self.stream_id = stream_id + self.chat_observer = ChatObserver.get_instance(stream_id) + + async def wait(self, timeout: float = 20.0) -> bool: + """等待用户回复或超时 + + Args: + timeout: 超时时间(秒) + + Returns: + bool: 如果因为超时返回则为True,否则为False + """ + try: + message_before = self.chat_observer.get_last_message() + + # 等待新消息 + logger.debug(f"等待新消息,超时时间: {timeout}秒") + + is_timeout = await self.chat_observer.wait_for_update(timeout=timeout) + if is_timeout: + logger.debug("等待超时,没有收到新消息") + return True + + # 检查是否是新消息 + message_after = self.chat_observer.get_last_message() + if message_before and message_after and message_before.get("message_id") == message_after.get("message_id"): + # 如果消息ID相同,说明没有新消息 + logger.debug("没有收到新消息") + return True + + logger.debug("收到新消息") + return False + + except Exception as e: + logger.error(f"等待时出错: {str(e)}") + return True \ No newline at end of file