feat:PFC重构,模块化拆分

This commit is contained in:
SengokuCola
2025-04-08 22:31:47 +08:00
parent e3b2d5b88c
commit 687c3f6710
12 changed files with 661 additions and 385 deletions

View File

@@ -1,27 +1,25 @@
import datetime
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal, Set
from enum import Enum
from typing import List, Dict, Tuple
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg
from ..chat.message import Message
from ..message.message_base import UserInfo
from ..models.utils_model import LLM_request
from ..config.config import global_config
from src.plugins.chat.message import MessageSending
from ..message.api import global_api
from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver
from .reply_checker import ReplyChecker
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .chat_states import NotificationHandler, Notification, NotificationType
import time
from dataclasses import dataclass, field
from .pfc import DecisionInfo, DecisionInfoType
from .observation_info import ObservationInfo
from .conversation import ConversationInfo
logger = get_module_logger("action_planner")
class ActionPlannerInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []
class ActionPlanner:
"""行动规划器"""
@@ -38,73 +36,60 @@ class ActionPlanner:
async def plan(
self,
goal: str,
method: str,
reasoning: str,
action_history: List[Dict[str, str]] = None,
decision_info: DecisionInfoType = None # Use DecisionInfoType here
observation_info: ObservationInfo,
conversation_info: ConversationInfo
) -> Tuple[str, str]:
"""规划下一步行动
Args:
goal: 对话目标
method: 实现方法
reasoning: 目标原因
action_history: 行动历史记录
decision_info: 决策信息
observation_info: 决策信息
conversation_info: 对话信息
Returns:
Tuple[str, str]: (行动类型, 行动原因)
"""
# 构建提示词
logger.debug(f"开始规划行动:当前目标: {goal}")
logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}")
# 获取最近20条消息
messages = self.chat_observer.get_message_history(limit=20)
#构建对话目标
if conversation_info.goal_list:
goal, reasoning = conversation_info.goal_list[-1]
else:
goal = "目前没有明确对话目标"
reasoning = "目前没有明确对话目标,最好思考一个对话目标"
# 获取聊天历史记录
chat_history_list = observation_info.chat_history
chat_history_text = ""
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
for msg in chat_history_list:
chat_history_text += f"{msg}\n"
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg}\n"
observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_text = ""
if action_history and action_history[-1]['action'] == "direct_reply":
action_history_text = "你刚刚发言回复了对方"
action_history_list = conversation_info.action_history
action_history_text = "你之前做的事情是:"
for action in action_history_list:
action_history_text += f"{action}\n"
# 构建决策信息文本
decision_info_text = ""
if decision_info:
decision_info_text = "当前对话状态:\n"
if decision_info.is_cold_chat:
decision_info_text += f"对话处于冷场状态,已持续{int(decision_info.cold_chat_duration)}\n"
if decision_info.new_messages_count > 0:
decision_info_text += f"{decision_info.new_messages_count}条新消息未处理\n"
user_response_time = decision_info.get_user_response_time()
if user_response_time:
decision_info_text += f"距离用户上次发言已过去{int(user_response_time)}\n"
bot_response_time = decision_info.get_bot_response_time()
if bot_response_time:
decision_info_text += f"距离你上次发言已过去{int(bot_response_time)}\n"
if decision_info.active_users:
decision_info_text += f"当前活跃用户数: {len(decision_info.active_users)}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
当前对话目标:{goal}
实现该对话目标的方式:{method}
产生该对话目标的原因:{reasoning}
{decision_info_text}
{action_history_text}
最近的对话记录:
@@ -117,7 +102,6 @@ wait: 当你做出了发言,对方尚未回复时等待对方的回复
listening: 倾听对方发言,当你认为对方发言尚未结束时采用
direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言
rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标
judge_conversation: 判断对话是否结束,当发现对话目标已经达到或者希望停止对话时选择,会判断对话是否结束
请以JSON格式输出包含以下字段
1. action: 行动类型,注意你之前的行为
@@ -134,7 +118,7 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到
success, result = get_items_from_json(
content,
"action", "reason",
default_values={"action": "direct_reply", "reason": "默认原因"}
default_values={"action": "direct_reply", "reason": "没有明确原因"}
)
if not success:
@@ -144,7 +128,7 @@ judge_conversation: 判断对话是否结束,当发现对话目标已经达到
reason = result["reason"]
# 验证action类型
if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "judge_conversation"]:
if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal"]:
logger.warning(f"未知的行动类型: {action}默认使用listening")
action = "listening"

View File

@@ -70,6 +70,11 @@ class ChatObserver:
self.last_cold_chat_check: float = time.time()
self.is_cold_chat_state: bool = False
self.update_event = asyncio.Event()
self.update_interval = 5 # 更新间隔(秒)
self.message_cache = []
self.update_running = False
async def check(self) -> bool:
"""检查距离上一次观察之后是否有了新消息
@@ -368,3 +373,71 @@ class ChatObserver:
time_info += f"\n距离对方上次发言已经过去了{int(user_speak_ago)}"
return time_info
def start_periodic_update(self):
"""启动观察器的定期更新"""
if not self.update_running:
self.update_running = True
asyncio.create_task(self._periodic_update())
async def _periodic_update(self):
"""定期更新消息历史"""
try:
while self.update_running:
await self._update_message_history()
await asyncio.sleep(self.update_interval)
except Exception as e:
logger.error(f"定期更新消息历史时出错: {str(e)}")
async def _update_message_history(self) -> bool:
"""更新消息历史
Returns:
bool: 是否有新消息
"""
try:
messages = await self.message_storage.get_messages_for_stream(
self.stream_id,
limit=50
)
if not messages:
return False
# 检查是否有新消息
has_new_messages = False
if messages and (not self.message_cache or messages[0]["message_id"] != self.message_cache[0]["message_id"]):
has_new_messages = True
self.message_cache = messages
if has_new_messages:
self.update_event.set()
self.update_event.clear()
return True
return False
except Exception as e:
logger.error(f"更新消息历史时出错: {str(e)}")
return False
def get_message_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""获取消息历史
Args:
limit: 获取的最大消息数量
Returns:
List[Dict[str, Any]]: 消息历史列表
"""
return self.message_cache[:limit]
def get_last_message(self) -> Optional[Dict[str, Any]]:
"""获取最后一条消息
Returns:
Optional[Dict[str, Any]]: 最后一条消息如果没有则返回None
"""
if not self.message_cache:
return None
return self.message_cache[0]

View File

@@ -2,21 +2,31 @@ import asyncio
import datetime
from typing import Dict, Any
from ..chat.message import Message
from .pfc import ConversationState, ChatObserver,GoalAnalyzer, Waiter, DirectMessageSender, PFCNotificationHandler
from .pfc_types import ConversationState
from .pfc import ChatObserver, GoalAnalyzer, Waiter, DirectMessageSender, PFCNotificationHandler
from src.common.logger import get_module_logger
from .action_planner import ActionPlanner
from .decision_info import DecisionInfo
from .observation_info import ObservationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo
from ..config.config import global_config
from src.plugins.chat.chat_stream import chat_manager
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .chat_states import NotificationType
import time
import traceback
logger = get_module_logger("pfc_conversation")
class ConversationInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []
class Conversation:
"""对话类,负责管理单个对话的状态和行为"""
@@ -31,23 +41,13 @@ class Conversation:
self.state = ConversationState.INIT
self.should_continue = False
# 目标和规划
self.current_goal = "保持友好的对话"
self.current_method = "以友好的态度回应"
self.goal_reasoning = "确保对话顺利进行"
# 知识缓存和行动历史
self.knowledge_cache = {}
self.action_history = []
# 回复相关
self.generated_reply = ""
async def _initialize(self):
"""初始化实例,注册所有组件"""
try:
self.chat_observer = ChatObserver.get_instance(self.stream_id)
try:
self.action_planner = ActionPlanner(self.stream_id)
self.goal_analyzer = GoalAnalyzer(self.stream_id)
self.reply_generator = ReplyGenerator(self.stream_id)
@@ -58,69 +58,69 @@ class Conversation:
# 获取聊天流信息
self.chat_stream = chat_manager.get_stream(self.stream_id)
# 决策信息
self.decision_info = DecisionInfo()
self.decision_info.bot_id = global_config.BOT_QQ
# 创建通知处理器
self.notification_handler = PFCNotificationHandler(self)
self.stop_action_planner = False
except Exception as e:
logger.error(f"初始化对话实例:注册组件失败: {e}")
logger.error(f"初始化对话实例:注册运行组件失败: {e}")
logger.error(traceback.format_exc())
raise
try:
start_time = time.time()
self.chat_observer.start() # 启动观察器
logger.info(f"观察器启动完成,耗时: {time.time() - start_time:.2f}")
await asyncio.sleep(1) # 给观察器一些启动时间
total_time = time.time() - start_time
logger.info(f"实例初始化完成,总耗时: {total_time:.2f}")
self.should_continue = True
asyncio.create_task(self.start())
#决策所需要的信息,包括自身自信和观察信息两部分
#注册观察器和观测信息
self.chat_observer = ChatObserver.get_instance(self.stream_id)
self.chat_observer.start()
self.observation_info = ObservationInfo()
self.observation_info.bind_to_chat_observer(self.stream_id)
#对话信息
self.conversation_info = ConversationInfo()
except Exception as e:
logger.error(f"初始化对话实例失败: {e}")
logger.error(f"初始化对话实例:注册信息组件失败: {e}")
logger.error(traceback.format_exc())
raise
# 组件准备完成,启动该论对话
self.should_continue = True
asyncio.create_task(self.start())
async def start(self):
"""开始对话流程"""
try:
logger.info("对话系统启动")
while self.should_continue:
await self._do_a_step()
logger.info("对话系统启动中...")
asyncio.create_task(self._plan_and_action_loop())
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
raise
async def _do_a_step(self):
"""思考步"""
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
# 获取最近的消息历史
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
while self.should_continue:
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(
self.observation_info,
self.conversation_info
)
if self._check_new_messages_after_planning():
continue
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
# 执行行动
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(
self.current_goal,
self.current_method,
self.goal_reasoning,
self.action_history,
self.decision_info # 传入决策信息
)
async def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息"""
if self.observation_info.new_messages_count > 0:
logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动")
# 如果需要,可以在这里添加逻辑来根据新消息重新决定行动
return True
return False
# 执行行动
await self._handle_action(action, reason)
# # 清理已处理的消息
# self.decision_info.clear_unprocessed_messages()
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
@@ -141,20 +141,18 @@ class Conversation:
logger.warning(f"转换消息时出错: {e}")
raise
async def _handle_action(self, action: str, reason: str):
async def _handle_action(self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo):
"""处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史
self.action_history.append({
# 记录action历史先设置为stop完成后再设置为done
conversation_info.action_history.append({
"action": action,
"reason": reason,
"status": "stop",
"time": datetime.datetime.now().strftime("%H:%M:%S")
})
# 只保留最近的10条记录
if len(self.action_history) > 10:
self.action_history = self.action_history[-10:]
if action == "direct_reply":
self.state = ConversationState.GENERATING
@@ -174,37 +172,34 @@ class Conversation:
await self._send_reply()
elif action == "fetch_knowledge":
self.state = ConversationState.GENERATING
messages = self.chat_observer.get_message_history(limit=30)
knowledge, sources = await self.knowledge_fetcher.fetch(
self.current_goal,
[self._convert_to_message(msg) for msg in messages]
)
logger.info(f"获取到知识,来源: {sources}")
conversation_info.action_history.append({
"action": action,
"reason": reason,
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S")
})
if knowledge != "未找到相关知识":
self.knowledge_cache[sources] = knowledge
elif action == "fetch_knowledge":
self.state = ConversationState.FETCHING
knowledge = "TODO:知识"
topic = "TODO:关键词"
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
if knowledge:
if topic not in self.conversation_info.knowledge_list:
self.conversation_info.knowledge_list.append({
"topic": topic,
"knowledge": knowledge
})
else:
self.conversation_info.knowledge_list[topic] += knowledge
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
elif action == "judge_conversation":
self.state = ConversationState.JUDGING
self.goal_achieved, self.stop_conversation, self.reason = await self.goal_analyzer.analyze_conversation(self.current_goal, self.goal_reasoning)
# 如果当前目标达成但还有其他目标
if self.goal_achieved and not self.stop_conversation:
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 切换到下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"当前目标已达成,切换到新目标: {self.current_goal}")
return
if self.stop_conversation:
await self._stop_conversation()
goal_list = observation_info.goal_list
new_goal_list = await self.goal_analyzer.analyze_goal(goal_list)
observation_info.goal_list = new_goal_list
elif action == "listening":
self.state = ConversationState.LISTENING
@@ -230,7 +225,7 @@ class Conversation:
latest_message = self._convert_to_message(messages[0])
await self.direct_sender.send_message(
chat_stream=self.chat_stream,
content="抱歉,由于等待时间过长,我需要先去忙别的了。下次再聊吧~",
content="TODO:超时消息",
reply_to_message=latest_message
)
except Exception as e:

View File

@@ -1,116 +0,0 @@
#Programmable Friendly Conversationalist
#Prefrontal cortex
import datetime
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal, Set
from enum import Enum
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg
from ..chat.message import Message
from ..models.utils_model import LLM_request
from ..config.config import global_config
from src.plugins.chat.message import MessageSending
from ..message.api import global_api
from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver
from .reply_generator import ReplyGenerator
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .chat_states import NotificationHandler, Notification, NotificationType
import time
from dataclasses import dataclass, field
from .conversation import Conversation
@dataclass
class DecisionInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
# 消息相关
last_message_time: Optional[float] = None
last_message_content: Optional[str] = None
last_message_sender: Optional[str] = None
new_messages_count: int = 0
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list)
# 对话状态
is_cold_chat: bool = False
cold_chat_duration: float = 0.0
last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None
# 对话参与者
active_users: Set[str] = field(default_factory=set)
bot_id: str = field(default="")
def update_from_message(self, message: Dict[str, Any]):
"""从消息更新信息
Args:
message: 消息数据
"""
self.last_message_time = message["time"]
self.last_message_content = message.get("processed_plain_text", "")
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info.user_id == self.bot_id:
self.last_bot_speak_time = message["time"]
else:
self.last_user_speak_time = message["time"]
self.active_users.add(user_info.user_id)
self.new_messages_count += 1
self.unprocessed_messages.append(message)
def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态
Args:
is_cold: 是否冷场
current_time: 当前时间
"""
self.is_cold_chat = is_cold
if is_cold and self.last_message_time:
self.cold_chat_duration = current_time - self.last_message_time
def get_active_duration(self) -> float:
"""获取当前活跃时长
Returns:
float: 最后一条消息到现在的时长(秒)
"""
if not self.last_message_time:
return 0.0
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间
Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间
Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
def clear_unprocessed_messages(self):
"""清空未处理消息列表"""
self.unprocessed_messages.clear()
self.new_messages_count = 0
# Forward reference for type hints
DecisionInfoType = DecisionInfo

View File

@@ -0,0 +1,49 @@
from typing import Optional
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..chat.message import Message
from ..message.message_base import Seg
from src.plugins.chat.message import MessageSending
logger = get_module_logger("message_sender")
class DirectMessageSender:
"""直接消息发送器"""
def __init__(self):
pass
async def send_message(
self,
chat_stream: ChatStream,
content: str,
reply_to_message: Optional[Message] = None,
) -> None:
"""发送消息到聊天流
Args:
chat_stream: 聊天流
content: 消息内容
reply_to_message: 要回复的消息(可选)
"""
try:
# 创建消息内容
segments = [Seg(type="text", data={"text": content})]
# 检查是否需要引用回复
if reply_to_message:
reply_id = reply_to_message.message_id
message_sending = MessageSending(
segments=segments,
reply_to_id=reply_id
)
else:
message_sending = MessageSending(segments=segments)
# 发送消息
await chat_stream.send_message(message_sending)
logger.info(f"消息已发送: {content}")
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
raise

View File

@@ -0,0 +1,71 @@
from typing import TYPE_CHECKING
from src.common.logger import get_module_logger
from .chat_states import NotificationHandler, Notification, NotificationType
if TYPE_CHECKING:
from .conversation import Conversation
logger = get_module_logger("notification_handler")
class PFCNotificationHandler(NotificationHandler):
"""PFC通知处理器"""
def __init__(self, conversation: 'Conversation'):
"""初始化PFC通知处理器
Args:
conversation: 对话实例
"""
self.conversation = conversation
async def handle_notification(self, notification: Notification):
"""处理通知
Args:
notification: 通知对象
"""
logger.debug(f"收到通知: {notification.type.name}, 数据: {notification.data}")
# 根据通知类型执行不同的处理
if notification.type == NotificationType.NEW_MESSAGE:
# 新消息通知
await self._handle_new_message(notification)
elif notification.type == NotificationType.COLD_CHAT:
# 冷聊天通知
await self._handle_cold_chat(notification)
elif notification.type == NotificationType.COMMAND:
# 命令通知
await self._handle_command(notification)
else:
logger.warning(f"未知的通知类型: {notification.type.name}")
async def _handle_new_message(self, notification: Notification):
"""处理新消息通知
Args:
notification: 通知对象
"""
# 更新决策信息
observation_info = self.conversation.observation_info
observation_info.last_message_time = notification.data.get("time", 0)
observation_info.add_unprocessed_message(notification.data)
# 手动触发观察器更新
self.conversation.chat_observer.trigger_update()
async def _handle_cold_chat(self, notification: Notification):
"""处理冷聊天通知
Args:
notification: 通知对象
"""
# 获取冷聊天信息
cold_duration = notification.data.get("duration", 0)
# 更新决策信息
observation_info = self.conversation.observation_info
observation_info.conversation_cold_duration = cold_duration
logger.info(f"对话已冷: {cold_duration}")

View File

@@ -0,0 +1,246 @@
#Programmable Friendly Conversationalist
#Prefrontal cortex
from typing import List, Optional, Dict, Any, Set
from ..message.message_base import UserInfo
import time
from dataclasses import dataclass, field
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
from .chat_states import NotificationHandler
logger = get_module_logger("observation_info")
class ObservationInfoHandler(NotificationHandler):
"""ObservationInfo的通知处理器"""
def __init__(self, observation_info: 'ObservationInfo'):
"""初始化处理器
Args:
observation_info: 要更新的ObservationInfo实例
"""
self.observation_info = observation_info
async def handle_notification(self, notification: Dict[str, Any]):
"""处理通知
Args:
notification: 通知数据
"""
notification_type = notification.get("type")
data = notification.get("data", {})
if notification_type == "NEW_MESSAGE":
# 处理新消息通知
logger.debug(f"收到新消息通知data: {data}")
message = data.get("message", {})
self.observation_info.update_from_message(message)
# self.observation_info.has_unread_messages = True
# self.observation_info.new_unread_message.append(message.get("processed_plain_text", ""))
elif notification_type == "COLD_CHAT":
# 处理冷场通知
is_cold = data.get("is_cold", False)
self.observation_info.update_cold_chat_status(is_cold, time.time())
elif notification_type == "ACTIVE_CHAT":
# 处理活跃通知
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == "BOT_SPEAKING":
# 处理机器人说话通知
self.observation_info.is_typing = False
self.observation_info.last_bot_speak_time = time.time()
elif notification_type == "USER_SPEAKING":
# 处理用户说话通知
self.observation_info.is_typing = False
self.observation_info.last_user_speak_time = time.time()
elif notification_type == "MESSAGE_DELETED":
# 处理消息删除通知
message_id = data.get("message_id")
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages
if msg.get("message_id") != message_id
]
elif notification_type == "USER_JOINED":
# 处理用户加入通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(user_id)
elif notification_type == "USER_LEFT":
# 处理用户离开通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(user_id)
elif notification_type == "ERROR":
# 处理错误通知
error_msg = data.get("error", "")
logger.error(f"收到错误通知: {error_msg}")
@dataclass
class ObservationInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
#data_list
chat_history: List[str] = field(default_factory=list)
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list)
active_users: Set[str] = field(default_factory=set)
#data
last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None
last_message_time: Optional[float] = None
last_message_content: str = ""
last_message_sender: Optional[str] = None
bot_id: Optional[str] = None
new_messages_count: int = 0
cold_chat_duration: float = 0.0
#state
is_typing: bool = False
has_unread_messages: bool = False
is_cold_chat: bool = False
changed: bool = False
# #spec
# meta_plan_trigger: bool = False
def __post_init__(self):
"""初始化后创建handler"""
self.chat_observer = None
self.handler = ObservationInfoHandler(self)
def bind_to_chat_observer(self, stream_id: str):
"""绑定到指定的chat_observer
Args:
stream_id: 聊天流ID
"""
self.chat_observer = ChatObserver.get_instance(stream_id)
self.chat_observer.notification_manager.register_handler(
target="observation_info",
notification_type="NEW_MESSAGE",
handler=self.handler
)
self.chat_observer.notification_manager.register_handler(
target="observation_info",
notification_type="COLD_CHAT",
handler=self.handler
)
def unbind_from_chat_observer(self):
"""解除与chat_observer的绑定"""
if self.chat_observer:
self.chat_observer.notification_manager.unregister_handler(
target="observation_info",
notification_type="NEW_MESSAGE",
handler=self.handler
)
self.chat_observer.notification_manager.unregister_handler(
target="observation_info",
notification_type="COLD_CHAT",
handler=self.handler
)
self.chat_observer = None
def update_from_message(self, message: Dict[str, Any]):
"""从消息更新信息
Args:
message: 消息数据
"""
logger.debug(f"更新信息from_message: {message}")
self.last_message_time = message["time"]
self.last_message_content = message.get("processed_plain_text", "")
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info.user_id == self.bot_id:
self.last_bot_speak_time = message["time"]
else:
self.last_user_speak_time = message["time"]
self.active_users.add(user_info.user_id)
self.new_messages_count += 1
self.unprocessed_messages.append(message)
self.update_changed()
def update_changed(self):
"""更新changed状态"""
self.changed = True
# self.meta_plan_trigger = True
def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态
Args:
is_cold: 是否冷场
current_time: 当前时间
"""
self.is_cold_chat = is_cold
if is_cold and self.last_message_time:
self.cold_chat_duration = current_time - self.last_message_time
def get_active_duration(self) -> float:
"""获取当前活跃时长
Returns:
float: 最后一条消息到现在的时长(秒)
"""
if not self.last_message_time:
return 0.0
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间
Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间
Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
def clear_unprocessed_messages(self):
"""清空未处理消息列表"""
# 将未处理消息添加到历史记录中
for message in self.unprocessed_messages:
if "processed_plain_text" in message:
self.chat_history.append(message["processed_plain_text"])
# 清空未处理消息列表
self.has_unread_messages = False
self.unprocessed_messages.clear()
self.new_messages_count = 0
def add_unprocessed_message(self, message: Dict[str, Any]):
"""添加未处理的消息
Args:
message: 消息数据
"""
# 防止重复添加同一消息
message_id = message.get("message_id")
if message_id and not any(m.get("message_id") == message_id for m in self.unprocessed_messages):
self.unprocessed_messages.append(message)
self.new_messages_count += 1
# 同时更新其他消息相关信息
self.update_from_message(message)

View File

@@ -2,8 +2,7 @@
#Prefrontal cortex
import datetime
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal, Set
from enum import Enum
from typing import List, Optional, Tuple, TYPE_CHECKING
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg
@@ -14,34 +13,20 @@ from src.plugins.chat.message import MessageSending
from ..message.api import global_api
from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver
from .reply_generator import ReplyGenerator
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .chat_states import NotificationHandler, Notification, NotificationType
from .waiter import Waiter
from .message_sender import DirectMessageSender
from .notification_handler import PFCNotificationHandler
import time
from dataclasses import dataclass, field
from .conversation import Conversation
if TYPE_CHECKING:
from .conversation import Conversation
logger = get_module_logger("pfc")
class ConversationState(Enum):
"""对话状态"""
INIT = "初始化"
RETHINKING = "重新思考"
ANALYZING = "分析历史"
PLANNING = "规划目标"
GENERATING = "生成回复"
CHECKING = "检查回复"
SENDING = "发送消息"
WAITING = "等待"
LISTENING = "倾听"
ENDED = "结束"
JUDGING = "判断"
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]
class GoalAnalyzer:
"""对话目标分析器"""
@@ -249,42 +234,33 @@ class GoalAnalyzer:
{{
"goal_achieved": true,
"stop_conversation": false,
"reason": "用户已经得到了满意的回答,但我仍希望继续聊天"
"reason": "虽然目标已达成,但对话仍然有继续的价值"
}}"""
logger.debug(f"发送到LLM的提示词: {prompt}")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容
# 尝试解析JSON
success, result = get_items_from_json(
content,
"goal_achieved", "stop_conversation", "reason",
required_types={
"goal_achieved": bool,
"stop_conversation": bool,
"reason": str
}
required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str}
)
if not success:
return False, False, "确保对话顺利进行"
logger.error("无法解析对话分析结果JSON")
return False, False, "解析结果失败"
# 如果当前目标达成,从目标列表中移除
if result["goal_achieved"] and not result["stop_conversation"]:
for i, (g, _, _) in enumerate(self.goals):
if g == goal:
self.goals.pop(i)
# 如果还有其他目标,不停止对话
if self.goals:
result["stop_conversation"] = False
break
goal_achieved = result["goal_achieved"]
stop_conversation = result["stop_conversation"]
reason = result["reason"]
return result["goal_achieved"], result["stop_conversation"], result["reason"]
return goal_achieved, stop_conversation, reason
except Exception as e:
logger.error(f"分析对话目标时出错: {str(e)}")
return False, False, "确保对话顺利进行"
logger.error(f"分析对话状态时出错: {str(e)}")
return False, False, f"分析出错: {str(e)}"
class Waiter:
@@ -319,63 +295,6 @@ class Waiter:
logger.info("等待中...")
class PFCNotificationHandler(NotificationHandler):
"""PFC的通知处理器"""
def __init__(self, conversation: 'Conversation'):
self.conversation = conversation
self.logger = get_module_logger("pfc_notification")
self.decision_info = conversation.decision_info
async def handle_notification(self, notification: Notification):
"""处理通知"""
try:
if not notification or not hasattr(notification, 'data') or notification.data is None:
self.logger.error("收到无效的通知notification 或 data 为空")
return
if notification.type == NotificationType.NEW_MESSAGE:
# 处理新消息通知
message = notification.data
if not isinstance(message, dict):
self.logger.error(f"无效的消息格式: {type(message)}")
return
content = message.get('content', '')
self.logger.info(f"收到新消息通知: {content[:30] if content else ''}...")
# 更新决策信息
try:
self.decision_info.update_from_message(message)
except Exception as e:
self.logger.error(f"更新决策信息失败: {e}")
return
# 触发对话系统更新
self.conversation.chat_observer.trigger_update()
elif notification.type == NotificationType.COLD_CHAT:
# 处理冷场通知
try:
is_cold = bool(notification.data.get("is_cold", False))
# 更新决策信息
self.decision_info.update_cold_chat_status(is_cold, time.time())
if is_cold:
self.logger.info("检测到对话冷场")
else:
self.logger.info("对话恢复活跃")
except Exception as e:
self.logger.error(f"处理冷场状态失败: {e}")
return
except Exception as e:
self.logger.error(f"处理通知时出错: {str(e)}")
# 添加更详细的错误信息
self.logger.error(f"通知类型: {getattr(notification, 'type', None)}")
self.logger.error(f"通知数据: {getattr(notification, 'data', None)}")
class DirectMessageSender:
"""直接发送消息到平台的发送器"""

View File

@@ -1,6 +1,6 @@
from typing import Dict, Optional
from src.common.logger import get_module_logger
from .pfc import Conversation
from .conversation import Conversation
import traceback
logger = get_module_logger("pfc_manager")

View File

@@ -0,0 +1,21 @@
from enum import Enum
from typing import Literal
class ConversationState(Enum):
"""对话状态"""
INIT = "初始化"
RETHINKING = "重新思考"
ANALYZING = "分析历史"
PLANNING = "规划目标"
GENERATING = "生成回复"
CHECKING = "检查回复"
SENDING = "发送消息"
FETCHING = "获取知识"
WAITING = "等待"
LISTENING = "倾听"
ENDED = "结束"
JUDGING = "判断"
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]

View File

@@ -1,24 +1,13 @@
import datetime
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal, Set
from enum import Enum
from typing import List, Optional, Dict, Tuple
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg
from ..message.message_base import UserInfo
from ..chat.message import Message
from ..models.utils_model import LLM_request
from ..config.config import global_config
from src.plugins.chat.message import MessageSending
from ..message.api import global_api
from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver
from .reply_checker import ReplyChecker
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .chat_states import NotificationHandler, Notification, NotificationType
import time
from dataclasses import dataclass, field
from .conversation import Conversation
logger = get_module_logger("reply_generator")

45
src/plugins/PFC/waiter.py Normal file
View File

@@ -0,0 +1,45 @@
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
logger = get_module_logger("waiter")
class Waiter:
"""等待器,用于等待对话流中的事件"""
def __init__(self, stream_id: str):
self.stream_id = stream_id
self.chat_observer = ChatObserver.get_instance(stream_id)
async def wait(self, timeout: float = 20.0) -> bool:
"""等待用户回复或超时
Args:
timeout: 超时时间(秒)
Returns:
bool: 如果因为超时返回则为True否则为False
"""
try:
message_before = self.chat_observer.get_last_message()
# 等待新消息
logger.debug(f"等待新消息,超时时间: {timeout}")
is_timeout = await self.chat_observer.wait_for_update(timeout=timeout)
if is_timeout:
logger.debug("等待超时,没有收到新消息")
return True
# 检查是否是新消息
message_after = self.chat_observer.get_last_message()
if message_before and message_after and message_before.get("message_id") == message_after.get("message_id"):
# 如果消息ID相同说明没有新消息
logger.debug("没有收到新消息")
return True
logger.debug("收到新消息")
return False
except Exception as e:
logger.error(f"等待时出错: {str(e)}")
return True