better:尝试重构pfc

This commit is contained in:
SengokuCola
2025-04-08 00:23:08 +08:00
parent 0d7068acab
commit cc190ac2b9
6 changed files with 797 additions and 308 deletions

View File

@@ -2,7 +2,7 @@
#Prefrontal cortex
import datetime
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal
from typing import List, Optional, Dict, Any, Tuple, Literal, Set
from enum import Enum
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
@@ -19,7 +19,9 @@ from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .reply_checker import ReplyChecker
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .chat_states import NotificationHandler, Notification, NotificationType
import time
from dataclasses import dataclass, field
logger = get_module_logger("pfc")
@@ -42,6 +44,99 @@ class ConversationState(Enum):
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]
@dataclass
class DecisionInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
# 消息相关
last_message_time: Optional[float] = None
last_message_content: Optional[str] = None
last_message_sender: Optional[str] = None
new_messages_count: int = 0
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list)
# 对话状态
is_cold_chat: bool = False
cold_chat_duration: float = 0.0
last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None
# 对话参与者
active_users: Set[str] = field(default_factory=set)
bot_id: str = field(default="")
def update_from_message(self, message: Dict[str, Any]):
"""从消息更新信息
Args:
message: 消息数据
"""
self.last_message_time = message["time"]
self.last_message_content = message.get("processed_plain_text", "")
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info.user_id == self.bot_id:
self.last_bot_speak_time = message["time"]
else:
self.last_user_speak_time = message["time"]
self.active_users.add(user_info.user_id)
self.new_messages_count += 1
self.unprocessed_messages.append(message)
def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态
Args:
is_cold: 是否冷场
current_time: 当前时间
"""
self.is_cold_chat = is_cold
if is_cold and self.last_message_time:
self.cold_chat_duration = current_time - self.last_message_time
def get_active_duration(self) -> float:
"""获取当前活跃时长
Returns:
float: 最后一条消息到现在的时长(秒)
"""
if not self.last_message_time:
return 0.0
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间
Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间
Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
def clear_unprocessed_messages(self):
"""清空未处理消息列表"""
self.unprocessed_messages.clear()
self.new_messages_count = 0
# Forward reference for type hints
DecisionInfoType = DecisionInfo
class ActionPlanner:
"""行动规划器"""
@@ -62,22 +157,24 @@ class ActionPlanner:
method: str,
reasoning: str,
action_history: List[Dict[str, str]] = None,
chat_observer: Optional[ChatObserver] = None, # 添加chat_observer参数
decision_info: DecisionInfoType = None # Use DecisionInfoType here
) -> Tuple[str, str]:
"""规划下一步行动
Args:
goal: 对话目标
method: 实现方法
reasoning: 目标原因
action_history: 行动历史记录
decision_info: 决策信息
Returns:
Tuple[str, str]: (行动类型, 行动原因)
"""
# 构建提示词
# 获取最近20条消息
self.chat_observer.waiting_start_time = time.time()
logger.debug(f"开始规划行动:当前目标: {goal}")
# 获取最近20条消息
messages = self.chat_observer.get_message_history(limit=20)
chat_history_text = ""
for msg in messages:
@@ -92,22 +189,42 @@ class ActionPlanner:
# 构建action历史文本
action_history_text = ""
if action_history:
if action_history[-1]['action'] == "direct_reply":
action_history_text = "你刚刚发言回复了对方"
if action_history and action_history[-1]['action'] == "direct_reply":
action_history_text = "你刚刚发言回复了对方"
# 构建决策信息文本
decision_info_text = ""
if decision_info:
decision_info_text = "当前对话状态:\n"
if decision_info.is_cold_chat:
decision_info_text += f"对话处于冷场状态,已持续{int(decision_info.cold_chat_duration)}\n"
if decision_info.new_messages_count > 0:
decision_info_text += f"{decision_info.new_messages_count}条新消息未处理\n"
user_response_time = decision_info.get_user_response_time()
if user_response_time:
decision_info_text += f"距离用户上次发言已过去{int(user_response_time)}\n"
bot_response_time = decision_info.get_bot_response_time()
if bot_response_time:
decision_info_text += f"距离你上次发言已过去{int(bot_response_time)}\n"
if decision_info.active_users:
decision_info_text += f"当前活跃用户数: {len(decision_info.active_users)}\n"
# 获取时间信息
time_info = self.chat_observer.get_time_info()
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
prompt = f"""现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
{personality_text}
当前对话目标:{goal}
实现该对话目标的方式:{method}
产生该对话目标的原因:{reasoning}
{time_info}
{decision_info_text}
{action_history_text}
最近的对话记录:
{chat_history_text}
{action_history_text}
请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言:
行动类型:
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择
@@ -413,16 +530,23 @@ class Waiter:
Returns:
bool: 是否超时True表示超时
"""
wait_start_time = self.chat_observer.waiting_start_time
while not self.chat_observer.new_message_after(wait_start_time):
await asyncio.sleep(1)
logger.info("等待中...")
# 检查是否超过60秒
# 使用当前时间作为等待开始时间
wait_start_time = time.time()
self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间
while True:
# 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息")
return False
# 检查是否超时
if time.time() - wait_start_time > 300:
logger.info("等待超过300秒结束对话")
return True
logger.info("等待结束")
return False
await asyncio.sleep(1)
logger.info("等待中...")
class ReplyGenerator:
@@ -519,16 +643,16 @@ class ReplyGenerator:
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.info(f"生成的回复: {content}")
is_new = self.chat_observer.check()
logger.debug(f"再看一眼聊天记录,{'' if is_new else '没有'}新消息")
# is_new = self.chat_observer.check()
# logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息")
# 如果有新消息,重新生成回复
if is_new:
logger.info("检测到新消息,重新生成回复")
return await self.generate(
goal, chat_history, knowledge_cache,
None, retry_count
)
# if is_new:
# logger.info("检测到新消息,重新生成回复")
# return await self.generate(
# goal, chat_history, knowledge_cache,
# None, retry_count
# )
return content
@@ -555,12 +679,69 @@ class ReplyGenerator:
return await self.reply_checker.check(reply, goal, retry_count)
class PFCNotificationHandler(NotificationHandler):
"""PFC的通知处理器"""
def __init__(self, conversation: 'Conversation'):
self.conversation = conversation
self.logger = get_module_logger("pfc_notification")
self.decision_info = conversation.decision_info
async def handle_notification(self, notification: Notification):
"""处理通知"""
try:
if not notification or not hasattr(notification, 'data') or notification.data is None:
self.logger.error("收到无效的通知notification 或 data 为空")
return
if notification.type == NotificationType.NEW_MESSAGE:
# 处理新消息通知
message = notification.data
if not isinstance(message, dict):
self.logger.error(f"无效的消息格式: {type(message)}")
return
content = message.get('content', '')
self.logger.info(f"收到新消息通知: {content[:30] if content else ''}...")
# 更新决策信息
try:
self.decision_info.update_from_message(message)
except Exception as e:
self.logger.error(f"更新决策信息失败: {e}")
return
# 触发对话系统更新
self.conversation.chat_observer.trigger_update()
elif notification.type == NotificationType.COLD_CHAT:
# 处理冷场通知
try:
is_cold = bool(notification.data.get("is_cold", False))
# 更新决策信息
self.decision_info.update_cold_chat_status(is_cold, time.time())
if is_cold:
self.logger.info("检测到对话冷场")
else:
self.logger.info("对话恢复活跃")
except Exception as e:
self.logger.error(f"处理冷场状态失败: {e}")
return
except Exception as e:
self.logger.error(f"处理通知时出错: {str(e)}")
# 添加更详细的错误信息
self.logger.error(f"通知类型: {getattr(notification, 'type', None)}")
self.logger.error(f"通知数据: {getattr(notification, 'data', None)}")
class Conversation:
# 类级别的实例管理
_instances: Dict[str, 'Conversation'] = {}
_instance_lock = asyncio.Lock() # 类级别的全局锁
_init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件
_initializing: Dict[str, bool] = {} # 标记是否正在初始化
_instance_lock = asyncio.Lock()
_init_events: Dict[str, asyncio.Event] = {}
_initializing: Dict[str, bool] = {}
@classmethod
async def get_instance(cls, stream_id: str) -> Optional['Conversation']:
@@ -573,102 +754,89 @@ class Conversation:
Optional[Conversation]: 对话实例如果创建或等待失败则返回None
"""
try:
# 使用全局锁来确保线程安全
async with cls._instance_lock:
# 如果已经在初始化中,等待初始化完成
if stream_id in cls._initializing and cls._initializing[stream_id]:
# 释放锁等待初始化
cls._instance_lock.release()
try:
await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.error(f"等待实例 {stream_id} 初始化超时")
return None
finally:
await cls._instance_lock.acquire()
# 如果实例不存在,创建新实例
if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id)
cls._init_events[stream_id] = asyncio.Event()
cls._initializing[stream_id] = True
logger.info(f"创建新的对话实例: {stream_id}")
# 检查是否已经有实例
if stream_id in cls._instances:
return cls._instances[stream_id]
async with cls._instance_lock:
# 再次检查,防止在获取锁的过程中其他线程创建了实例
if stream_id in cls._instances:
return cls._instances[stream_id]
# 如果正在初始化,等待初始化完成
if stream_id in cls._initializing and cls._initializing[stream_id]:
event = cls._init_events.get(stream_id)
if event:
try:
# 在等待之前释放锁
cls._instance_lock.release()
await asyncio.wait_for(event.wait(), timeout=10.0) # 增加超时时间到10秒
# 重新获取锁
await cls._instance_lock.acquire()
if stream_id in cls._instances:
return cls._instances[stream_id]
except asyncio.TimeoutError:
logger.error(f"等待实例 {stream_id} 初始化超时")
# 清理超时的初始化状态
cls._initializing[stream_id] = False
if stream_id in cls._init_events:
del cls._init_events[stream_id]
return None
# 创建新实例
logger.info(f"创建新的对话实例: {stream_id}")
cls._initializing[stream_id] = True
cls._init_events[stream_id] = asyncio.Event()
# 在锁保护下创建实例
instance = cls(stream_id)
cls._instances[stream_id] = instance
# 启动实例初始化(在后台运行)
asyncio.create_task(instance._initialize())
return instance
except Exception as e:
logger.error(f"获取对话实例失败: {e}")
return None
async def _initialize(self):
"""初始化实例(在后台运行)"""
try:
logger.info(f"开始初始化对话实例: {self.stream_id}")
self.chat_observer.start() # 启动观察器
await asyncio.sleep(1) # 给观察器一些启动时间
# 获取初始目标
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
# 标记初始化完成
self.__class__._initializing[self.stream_id] = False
if self.stream_id in self.__class__._init_events:
self.__class__._init_events[self.stream_id].set()
# 启动对话循环
asyncio.create_task(self._conversation_loop())
except Exception as e:
logger.error(f"初始化对话实例失败: {e}")
# 清理失败的初始化
self.__class__._initializing[self.stream_id] = False
if self.stream_id in self.__class__._init_events:
self.__class__._init_events[self.stream_id].set()
if self.stream_id in self.__class__._instances:
del self.__class__._instances[self.stream_id]
@classmethod
async def remove_instance(cls, stream_id: str):
"""删除对话实例
Args:
stream_id: 聊天流ID
"""
async with cls._instance_lock:
if stream_id in cls._instances:
# 停止相关组件
instance = cls._instances[stream_id]
instance.chat_observer.stop()
# 删除实例
del cls._instances[stream_id]
if stream_id in cls._init_events:
del cls._init_events[stream_id]
if stream_id in cls._initializing:
del cls._initializing[stream_id]
logger.info(f"已删除对话实例 {stream_id}")
def __init__(self, stream_id: str):
"""初始化对话系统"""
self.stream_id = stream_id
self.state = ConversationState.INIT
self.current_goal: Optional[str] = None
self.current_method: Optional[str] = None
self.goal_reasoning: Optional[str] = None
self.generated_reply: Optional[str] = None
self.should_continue = True
# 初始化聊天观察器
self.chat_observer = ChatObserver.get_instance(stream_id)
# 添加action历史记录
self.action_history: List[Dict[str, str]] = []
# 知识缓存
self.knowledge_cache: Dict[str, str] = {} # 确保初始化为字典
# 初始化各个组件
self.goal_analyzer = GoalAnalyzer(self.stream_id)
self.action_planner = ActionPlanner(self.stream_id)
self.reply_generator = ReplyGenerator(self.stream_id)
self.knowledge_fetcher = KnowledgeFetcher()
self.direct_sender = DirectMessageSender()
self.waiter = Waiter(self.stream_id)
# 创建聊天流
self.chat_stream = chat_manager.get_stream(self.stream_id)
def _clear_knowledge_cache(self):
"""清空知识缓存"""
self.knowledge_cache.clear() # 使用clear方法清空字典
async def start(self):
"""开始对话流程"""
try:
logger.info("对话系统启动")
self.should_continue = True
self.chat_observer.start() # 启动观察器
await asyncio.sleep(1)
# 启动对话循环
await self._conversation_loop()
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
raise
finally:
# 标记初始化完成
self._init_events[self.stream_id].set()
self._initializing[self.stream_id] = False
async def _conversation_loop(self):
"""对话循环"""
@@ -681,17 +849,21 @@ class Conversation:
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(
self.current_goal,
self.current_method,
self.goal_reasoning,
self.action_history, # 传入action历史
self.chat_observer # 传入chat_observer
self.action_history,
self.decision_info # 传入决策信息
)
# 执行行动
await self._handle_action(action, reason)
# 清理已处理的消息
self.decision_info.clear_unprocessed_messages()
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
@@ -742,87 +914,6 @@ class Conversation:
self.current_goal
)
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache
)
# 检查使用新目标生成的回复是否合适
is_suitable, reason, _ = await self.reply_generator.check_reply(
self.generated_reply,
self.current_goal
)
if is_suitable:
# 如果新目标的回复合适,调整目标优先级
await self.goal_analyzer._update_goals(
self.current_goal,
self.current_method,
self.goal_reasoning
)
else:
# 如果新目标还是不合适,重新思考目标
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
return
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply # 将不合适的回复作为previous_reply传入
)
while self.chat_observer.check():
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache
)
is_suitable = True # 假设使用新目标后回复是合适的
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply # 将不合适的回复作为previous_reply传入
)
await self._send_reply()
elif action == "fetch_knowledge":
@@ -836,59 +927,6 @@ class Conversation:
if knowledge != "未找到相关知识":
self.knowledge_cache[sources] = knowledge
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache
)
# 检查回复是否合适
is_suitable, reason, need_replan = await self.reply_generator.check_reply(
self.generated_reply,
self.current_goal
)
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标获取知识并生成回复
knowledge, sources = await self.knowledge_fetcher.fetch(
self.current_goal,
[self._convert_to_message(msg) for msg in messages]
)
if knowledge != "未找到相关知识":
self.knowledge_cache[sources] = knowledge
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache
)
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply # 将不合适的回复作为previous_reply传入
)
await self._send_reply()
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING