修复私聊PFC

This commit is contained in:
114514
2025-04-23 23:48:42 +08:00
parent 7281c13a12
commit 2732f40714
13 changed files with 681 additions and 282 deletions

View File

@@ -495,6 +495,9 @@ class BotConfig:
"llm_observation", "llm_observation",
"llm_sub_heartflow", "llm_sub_heartflow",
"llm_heartflow", "llm_heartflow",
"llm_PFC_action_planner",
"llm_PFC_chat",
"llm_PFC_reply_checker",
] ]
for item in config_list: for item in config_list:

View File

@@ -1,4 +1,5 @@
from typing import Tuple import time
from typing import Tuple, List, Dict, Any, Optional # 确保导入了必要的类型
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from ..models.utils_model import LLMRequest from ..models.utils_model import LLMRequest
from ...config.config import global_config from ...config.config import global_config
@@ -10,7 +11,8 @@ from .conversation_info import ConversationInfo
logger = get_module_logger("action_planner") logger = get_module_logger("action_planner")
# 注意:这个 ActionPlannerInfo 类似乎没有在 ActionPlanner 中使用,
# 如果确实没用,可以考虑移除,但暂时保留以防万一。
class ActionPlannerInfo: class ActionPlannerInfo:
def __init__(self): def __init__(self):
self.done_action = [] self.done_action = []
@@ -18,18 +20,18 @@ class ActionPlannerInfo:
self.knowledge_list = [] self.knowledge_list = []
self.memory_list = [] self.memory_list = []
# ActionPlanner 类定义,顶格
class ActionPlanner: class ActionPlanner:
"""行动规划器""" """行动规划器"""
def __init__(self, stream_id: str): def __init__(self, stream_id: str):
self.llm = LLMRequest( self.llm = LLMRequest(
model=global_config.llm_normal, model=global_config.llm_PFC_action_planner,
temperature=global_config.llm_normal["temp"], temperature=global_config.llm_PFC_action_planner["temp"],
max_tokens=1000, max_tokens=1500,
request_type="action_planning", request_type="action_planning",
) )
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2) self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=3)
self.identity_detail_info = Individuality.get_instance().get_prompt(type="identity", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id) self.chat_observer = ChatObserver.get_instance(stream_id)
@@ -43,140 +45,250 @@ class ActionPlanner:
Returns: Returns:
Tuple[str, str]: (行动类型, 行动原因) Tuple[str, str]: (行动类型, 行动原因)
""" """
# --- 获取 Bot 上次发言时间信息 ---
time_since_last_bot_message_info = ""
try:
bot_id = str(global_config.BOT_QQ)
if hasattr(observation_info, 'chat_history') and observation_info.chat_history:
for i in range(len(observation_info.chat_history) - 1, -1, -1):
msg = observation_info.chat_history[i]
if not isinstance(msg, dict):
continue
sender_info = msg.get('user_info', {})
sender_id = str(sender_info.get('user_id')) if isinstance(sender_info, dict) else None
msg_time = msg.get('time')
if sender_id == bot_id and msg_time:
time_diff = time.time() - msg_time
if time_diff < 60.0:
time_since_last_bot_message_info = f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n"
break
else:
logger.debug("Observation info chat history is empty or not available for bot time check.")
except AttributeError:
logger.warning("ObservationInfo object might not have chat_history attribute yet for bot time check.")
except Exception as e:
logger.warning(f"获取 Bot 上次发言时间时出错: {e}")
# --- 获取 Bot 上次发言时间信息结束 ---
timeout_context = ""
try: # 添加 try-except 以增加健壮性
if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list:
last_goal_tuple = conversation_info.goal_list[-1]
if isinstance(last_goal_tuple, tuple) and len(last_goal_tuple) > 0:
last_goal_text = last_goal_tuple[0]
if isinstance(last_goal_text, str) and "分钟,思考接下来要做什么" in last_goal_text:
try:
timeout_minutes_text = last_goal_text.split('')[0].replace('你等待了','')
timeout_context = f"重要提示:你刚刚因为对方长时间({timeout_minutes_text})没有回复而结束了等待,这可能代表在对方看来本次聊天已结束,请基于此情况规划下一步,不要重复等待前的发言。\n"
except Exception:
timeout_context = f"重要提示:你刚刚因为对方长时间没有回复而结束了等待,这可能代表在对方看来本次聊天已结束,请基于此情况规划下一步,不要重复等待前的发言。\n"
else:
logger.debug("Conversation info goal_list is empty or not available for timeout check.")
except AttributeError:
logger.warning("ConversationInfo object might not have goal_list attribute yet for timeout check.")
except Exception as e:
logger.warning(f"检查超时目标时出错: {e}")
# 构建提示词 # 构建提示词
logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}") logger.debug(f"开始规划行动:当前目标: {getattr(conversation_info, 'goal_list', '不可用')}") # 使用 getattr
# 构建对话目标 # 构建对话目标 (goals_str)
goals_str = "" goals_str = ""
if conversation_info.goal_list: try: # 添加 try-except
for goal_reason in conversation_info.goal_list: if hasattr(conversation_info, 'goal_list') and conversation_info.goal_list:
# 处理字典或元组格式 for goal_reason in conversation_info.goal_list:
if isinstance(goal_reason, tuple): if isinstance(goal_reason, tuple) and len(goal_reason) > 0:
# 假设元组的第一个元素是目标,第二个元素是原因 goal = goal_reason[0]
goal = goal_reason[0] reasoning = goal_reason[1] if len(goal_reason) > 1 else "没有明确原因"
reasoning = goal_reason[1] if len(goal_reason) > 1 else "没有明确原因" elif isinstance(goal_reason, dict):
elif isinstance(goal_reason, dict): goal = goal_reason.get("goal", "目标内容缺失")
goal = goal_reason.get("goal") reasoning = goal_reason.get("reasoning", "没有明确原因")
reasoning = goal_reason.get("reasoning", "没有明确原因") else:
else: goal = str(goal_reason)
# 如果是其他类型,尝试转为字符串 reasoning = "没有明确原因"
goal = str(goal_reason) goal = str(goal) if goal is not None else "目标内容缺失"
reasoning = "没有明确原因" reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
goal_str += f"- 目标:{goal}\n 原因:{reasoning}\n"
if not goals_str: # 如果循环后 goals_str 仍为空
goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
except AttributeError:
logger.warning("ConversationInfo object might not have goal_list attribute yet.")
goals_str = "- 获取对话目标时出错。\n"
except Exception as e:
logger.error(f"构建对话目标字符串时出错: {e}")
goals_str = "- 构建对话目标时出错。\n"
goal_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n" # 获取聊天历史记录 (chat_history_text)
goals_str += goal_str
else:
goal = "目前没有明确对话目标"
reasoning = "目前没有明确对话目标,最好思考一个对话目标"
goals_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
# 获取聊天历史记录
chat_history_list = (
observation_info.chat_history[-20:]
if len(observation_info.chat_history) >= 20
else observation_info.chat_history
)
chat_history_text = "" chat_history_text = ""
for msg in chat_history_list: try:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n" if hasattr(observation_info, 'chat_history') and observation_info.chat_history:
chat_history_list = observation_info.chat_history[-20:]
for msg in chat_history_list:
if isinstance(msg, dict) and 'detailed_plain_text' in msg:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
elif isinstance(msg, str):
chat_history_text += f"{msg}\n"
if not chat_history_text: # 如果历史记录是空列表
chat_history_text = "还没有聊天记录。\n"
else:
chat_history_text = "还没有聊天记录。\n"
if observation_info.new_messages_count > 0: if hasattr(observation_info, 'new_messages_count') and observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages if hasattr(observation_info, 'unprocessed_messages') and observation_info.unprocessed_messages:
new_messages_list = observation_info.unprocessed_messages
chat_history_text += f"--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n"
for msg in new_messages_list:
if isinstance(msg, dict) and 'detailed_plain_text' in msg:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
elif isinstance(msg, str):
chat_history_text += f"{msg}\n"
# 清理消息应该由调用者或 observation_info 内部逻辑处理,这里不再调用 clear
# if hasattr(observation_info, 'clear_unprocessed_messages'):
# observation_info.clear_unprocessed_messages()
else:
logger.warning("ObservationInfo has new_messages_count > 0 but unprocessed_messages is empty or missing.")
except AttributeError:
logger.warning("ObservationInfo object might be missing expected attributes for chat history.")
chat_history_text = "获取聊天记录时出错。\n"
except Exception as e:
logger.error(f"处理聊天记录时发生未知错误: {e}")
chat_history_text = "处理聊天记录时出错。\n"
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
observation_info.clear_unprocessed_messages() # 构建 Persona 文本 (persona_text)
identity_details_only = self.identity_detail_info
identity_addon = ""
if isinstance(identity_details_only, str):
pronouns = ["", "", ""]
original_details = identity_details_only
for p in pronouns:
if identity_details_only.startswith(p):
identity_details_only = identity_details_only[len(p):]
break
if identity_details_only.endswith(""):
identity_details_only = identity_details_only[:-1]
cleaned_details = identity_details_only.strip(', ')
if cleaned_details:
identity_addon = f"并且{cleaned_details}"
persona_text = f"你的名字是{self.name}{self.personality_info}{identity_addon}"
personality_text = f"你的名字是{self.name}{self.personality_info}" # --- 构建更清晰的行动历史和上一次行动结果 ---
action_history_summary = "你最近执行的行动历史:\n"
last_action_context = "关于你【上一次尝试】的行动:\n"
# 构建action历史文本 action_history_list = []
action_history_list = ( try: # 添加 try-except
conversation_info.done_action[-10:] if hasattr(conversation_info, 'done_action') and conversation_info.done_action:
if len(conversation_info.done_action) >= 10 action_history_list = conversation_info.done_action[-5:]
else conversation_info.done_action else:
) logger.debug("Conversation info done_action is empty or not available.")
action_history_text = "你之前做的事情是:" except AttributeError:
for action in action_history_list: logger.warning("ConversationInfo object might not have done_action attribute yet.")
if isinstance(action, dict): except Exception as e:
action_type = action.get("action") logger.error(f"访问行动历史时出错: {e}")
action_reason = action.get("reason")
action_status = action.get("status")
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
elif isinstance(action, tuple):
# 假设元组的格式是(action_type, action_reason, action_status)
action_type = action[0] if len(action) > 0 else "未知行动"
action_reason = action[1] if len(action) > 1 else "未知原因"
action_status = action[2] if len(action) > 2 else "done"
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动 if not action_history_list:
action_history_summary += "- 还没有执行过行动。\n"
last_action_context += "- 这是你规划的第一个行动。\n"
else:
for i, action_data in enumerate(action_history_list):
action_type = "未知"
plan_reason = "未知"
status = "未知"
final_reason = ""
action_time = ""
当前对话目标:{goals_str} if isinstance(action_data, dict):
action_type = action_data.get("action", "未知")
plan_reason = action_data.get("plan_reason", "未知规划原因")
status = action_data.get("status", "未知")
final_reason = action_data.get("final_reason", "")
action_time = action_data.get("time", "")
elif isinstance(action_data, tuple):
if len(action_data) > 0: action_type = action_data[0]
if len(action_data) > 1: plan_reason = action_data[1]
if len(action_data) > 2: status = action_data[2]
if status == "recall" and len(action_data) > 3: final_reason = action_data[3]
{action_history_text} reason_text = f", 失败/取消原因: {final_reason}" if final_reason else ""
summary_line = f"- 时间:{action_time}, 尝试行动:'{action_type}', 状态:{status}{reason_text}"
action_history_summary += summary_line + "\n"
最近的对话记录: if i == len(action_history_list) - 1:
{chat_history_text} last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n"
last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n"
if status == "done":
last_action_context += f"- 该行动已【成功执行】。\n"
elif status == "recall":
last_action_context += f"- 但该行动最终【未能执行/被取消】。\n"
if final_reason:
last_action_context += f"- 【重要】失败/取消的具体原因是: “{final_reason}\n"
else:
last_action_context += f"- 【重要】失败/取消原因未明确记录。\n"
else:
last_action_context += f"- 该行动当前状态: {status}\n"
请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言: # --- 构建最终的 Prompt ---
行动类型 prompt = f"""{persona_text}。现在你在参与一场QQ聊天请根据以下【所有信息】审慎决策下一步行动可以发言可以等待可以倾听可以调取知识
【当前对话目标】
{goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。\n"}
【最近行动历史概要】
{action_history_summary}
【上一次行动的详细情况和结果】
{last_action_context}
【时间和超时提示】
{time_since_last_bot_message_info}{timeout_context}
【最近的对话记录】(包括你已成功发送的消息 和 新收到的消息)
{chat_history_text if chat_history_text.strip() else "还没有聊天记录。\n"}
--- 行动决策指南 ---
1. **仔细分析【上一次行动的详细情况和结果】**。如果上次行动是 direct_reply 且因“内容与你上一条发言完全相同”或“高度相似”而被取消(status: recall),那么【绝对不要】立即再次规划 direct_reply。在这种特定情况下你应该优先考虑 wait (等待用户的新回应) 或 rethink_goal (如果对话似乎因此卡住了)。
2. 结合【当前对话目标】和【最近的对话记录】来判断是否需要回应、回应什么。如果【最近的对话记录】中有新的用户消息,通常需要 direct_reply。如果上次行动成功或者上次失败的原因不是重复可以根据对话内容考虑 direct_reply。
3. 注意【时间和超时提示】,如果对方长时间未回复(例如在 timeout_context 中提示end_conversation 可能更合适。
4. 只有在你确信需要发言(比如回应新消息、追问、深入话题),并且上一次行动没有因重复被拒时,才应优先选择 direct_reply。
--- 可选行动类型 ---
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择 fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择
wait: 当你做出了发言,对方尚未回复时暂时等待对方的回复 wait: 等待对方回复(尤其是在你刚发言后、或上次发言因重复被拒时、或不确定做什么时,这是较安全的选择)
listening: 倾听对方发言,当你认为对方发言尚未结束时采用 listening: 倾听对方发言,当你认为对方发言尚未结束时采用
direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言 direct_reply: 直接回复或发送新消息,允许适当的追问和深入话题,**但是请务必遵守上面的决策指南,避免在因重复被拒后立即使用,也不要在对方没有回复的情况下过多的“消息轰炸”或重复发言**
rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标 rethink_goal: 重新思考对话目标,当发现对话目标不再适用或对话卡住时选择,注意私聊的环境是灵活的,有可能需要经常选择
end_conversation: 结束对话,长时间没回复或者当你觉得谈话暂时结束时选择,停止该场对话 end_conversation: 决定结束对话,对方长时间没回复或者当你觉得谈话暂时结束时可以选择
请以JSON格式输出,包含以下字段 请以JSON格式输出你的决策
1. action: 行动类型,注意你之前的行为 {{
2. reason: 选择行动的原因,注意你之前的行为(简要解释) "action": "选择行动类型 (必须是上面列表中的一个)",
"reason": "选择该行动的详细原因 (必须解释你是如何根据“上一次行动结果”、“对话记录”和“决策指南”做出判断的)"
}}
注意请严格按照JSON格式输出不要包含任何其他内容。""" 注意请严格按照JSON格式输出不要包含任何其他内容。"""
logger.debug(f"发送到LLM的提示词: {prompt}") logger.debug(f"发送到LLM的提示词 (已更新): {prompt}")
try: try:
content, _ = await self.llm.generate_response_async(prompt) content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}") logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容
success, result = get_items_from_json( success, result = get_items_from_json(
content, "action", "reason", default_values={"action": "direct_reply", "reason": "没有明确原因"} content, "action", "reason",
default_values={"action": "wait", "reason": "LLM返回格式错误或未提供原因默认等待"}
) )
if not success: action = result.get("action", "wait")
return "direct_reply", "JSON解析失败选择直接回复" reason = result.get("reason", "LLM未提供原因默认等待")
action = result["action"]
reason = result["reason"]
# 验证action类型 # 验证action类型
if action not in [ valid_actions = ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal", "end_conversation"]
"direct_reply", if action not in valid_actions:
"fetch_knowledge", logger.warning(f"LLM返回了未知的行动类型: '{action}',强制改为 wait")
"wait", reason = f"(原始行动'{action}'无效已强制改为wait) {reason}"
"listening", action = "wait"
"rethink_goal",
"end_conversation",
]:
logger.warning(f"未知的行动类型: {action}默认使用listening")
action = "listening"
logger.info(f"规划的行动: {action}") logger.info(f"规划的行动: {action}")
logger.info(f"行动原因: {reason}") logger.info(f"行动原因: {reason}")
return action, reason return action, reason
except Exception as e: except Exception as e:
logger.error(f"规划行动时出错: {str(e)}") logger.error(f"规划行动时调用 LLM 或处理结果出错: {str(e)}")
return "direct_reply", "发生错误,选择直接回复" return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}"

View File

@@ -119,6 +119,7 @@ class ChatObserver:
self.last_cold_chat_check = current_time self.last_cold_chat_check = current_time
# 判断是否冷场 # 判断是否冷场
is_cold = False
if self.last_message_time is None: if self.last_message_time is None:
is_cold = True is_cold = True
else: else:
@@ -354,7 +355,7 @@ class ChatObserver:
Returns: Returns:
List[Dict[str, Any]]: 缓存的消息历史列表 List[Dict[str, Any]]: 缓存的消息历史列表
""" """
return self.message_cache[:limit] return self.message_cache[-limit:]
def get_last_message(self) -> Optional[Dict[str, Any]]: def get_last_message(self) -> Optional[Dict[str, Any]]:
"""获取最后一条消息 """获取最后一条消息
@@ -364,7 +365,7 @@ class ChatObserver:
""" """
if not self.message_cache: if not self.message_cache:
return None return None
return self.message_cache[0] return self.message_cache[-1]
def __str__(self): def __str__(self):
return f"ChatObserver for {self.stream_id}" return f"ChatObserver for {self.stream_id}"

View File

@@ -1,5 +1,8 @@
import time
import asyncio import asyncio
import datetime import datetime
from .message_storage import MongoDBMessageStorage
from ...config.config import global_config
from typing import Dict, Any from typing import Dict, Any
from ..chat.message import Message from ..chat.message import Message
from .pfc_types import ConversationState from .pfc_types import ConversationState
@@ -70,7 +73,42 @@ class Conversation:
logger.error(f"初始化对话实例:注册信息组件失败: {e}") logger.error(f"初始化对话实例:注册信息组件失败: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise raise
try:
logger.info(f"{self.stream_id} 加载初始聊天记录...")
storage = MongoDBMessageStorage() # 创建存储实例
# 获取当前时间点之前最多 N 条消息 (比如 30 条)
# get_messages_before 返回的是按时间正序排列的列表
initial_messages = await storage.get_messages_before(
chat_id=self.stream_id,
time_point=time.time(),
limit=30 # 加载最近20条作为初始上下文可以调整
)
if initial_messages:
# 将加载的消息填充到 ObservationInfo 的 chat_history
self.observation_info.chat_history = initial_messages
self.observation_info.chat_history_count = len(initial_messages)
# 更新 ObservationInfo 中的时间戳等信息
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get('time')
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
self.observation_info.last_message_sender = last_user_info.user_id
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
# (可选)可以遍历 initial_messages 来设置 last_bot_speak_time 和 last_user_speak_time
# 这里为了简化,只用了最后一条消息的时间,如果需要精确的发言者时间需要遍历
logger.info(f"成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}")
# 让 ChatObserver 从加载的最后一条消息之后开始同步
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
else:
logger.info("没有找到初始聊天记录。")
except Exception as load_err:
logger.error(f"加载初始聊天记录时出错: {load_err}")
# 出错也要继续,只是没有历史记录而已
# 组件准备完成,启动该论对话 # 组件准备完成,启动该论对话
self.should_continue = True self.should_continue = True
asyncio.create_task(self.start()) asyncio.create_task(self.start())
@@ -86,24 +124,76 @@ class Conversation:
async def _plan_and_action_loop(self): async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块""" """思考步PFC核心循环模块"""
# 获取最近的消息历史
while self.should_continue: while self.should_continue:
# 使用决策信息来辅助行动规划 try:
action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info) # --- 在规划前记录当前新消息数量 ---
if self._check_new_messages_after_planning(): initial_new_message_count = 0
continue if hasattr(self.observation_info, 'new_messages_count'):
initial_new_message_count = self.observation_info.new_messages_count
else:
logger.warning("ObservationInfo missing 'new_messages_count' before planning.")
# 执行行动 # 使用决策信息来辅助行动规划
await self._handle_action(action, reason, self.observation_info, self.conversation_info) action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info) # 注意plan 函数内部现在不应再调用 clear_unprocessed_messages
for goal in self.conversation_info.goal_list: # --- 规划后检查是否有 *更多* 新消息到达 ---
# 检查goal是否为元组类型如果是元组则使用索引访问如果是字典则使用get方法 current_new_message_count = 0
if isinstance(goal, tuple): if hasattr(self.observation_info, 'new_messages_count'):
# 假设元组的第一个元素是目标内容 current_new_message_count = self.observation_info.new_messages_count
print(f"goal: {goal}") else:
if goal[0] == "结束对话": logger.warning("ObservationInfo missing 'new_messages_count' after planning.")
self.should_continue = False
break if current_new_message_count > initial_new_message_count:
# 只有当规划期间消息数量 *增加* 了,才认为需要重新规划
logger.info(f"规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划")
await asyncio.sleep(0.1) # 短暂延时
continue # 跳过本次行动,重新规划
# --- 如果没有在规划期间收到更多新消息,则准备执行行动 ---
# --- 清理未处理消息:移到这里,在执行动作前 ---
# 只有当确实有新消息被 planner 看到,并且 action 是要处理它们的时候才清理
if initial_new_message_count > 0 and action == "direct_reply":
if hasattr(self.observation_info, 'clear_unprocessed_messages'):
# 确保 clear_unprocessed_messages 方法存在
logger.debug(f"准备执行 direct_reply清理 {initial_new_message_count} 条规划时已知的新消息。")
self.observation_info.clear_unprocessed_messages()
# 手动重置计数器,确保状态一致性(理想情况下 clear 方法会做这个)
if hasattr(self.observation_info, 'new_messages_count'):
self.observation_info.new_messages_count = 0
else:
logger.error("无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!")
# 这里可能需要考虑是否继续执行 action或者抛出错误
# --- 执行行动 ---
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
goal_ended = False
if hasattr(self.conversation_info, 'goal_list') and self.conversation_info.goal_list:
for goal in self.conversation_info.goal_list:
if isinstance(goal, tuple) and len(goal) > 0 and goal[0] == "结束对话":
goal_ended = True
break
elif isinstance(goal, dict) and goal.get("goal") == "结束对话":
goal_ended = True
break
if goal_ended:
self.should_continue = False
logger.info("检测到'结束对话'目标,停止循环。")
# break # 可以选择在这里直接跳出循环
except Exception as loop_err:
logger.error(f"PFC主循环出错: {loop_err}")
logger.error(traceback.format_exc())
# 发生严重错误时可以考虑停止,或者至少等待一下再继续
await asyncio.sleep(1) # 发生错误时等待1秒
#添加短暂的异步睡眠
if self.should_continue: # 只有在还需要继续循环时才 sleep
await asyncio.sleep(0.1) # 等待 0.1 秒,给其他任务执行时间
logger.info(f"PFC 循环结束 for stream_id: {self.stream_id}") # 添加日志表明循环正常结束
def _check_new_messages_after_planning(self): def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息""" """检查在规划后是否有新消息"""
@@ -113,8 +203,7 @@ class Conversation:
return True return True
return False return False
@staticmethod def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
def _convert_to_message(msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象""" """将消息字典转换为Message对象"""
try: try:
chat_info = msg_dict.get("chat_info", {}) chat_info = msg_dict.get("chat_info", {})
@@ -124,7 +213,7 @@ class Conversation:
return Message( return Message(
message_id=msg_dict["message_id"], message_id=msg_dict["message_id"],
chat_stream=chat_stream, chat_stream=chat_stream,
timestamp=msg_dict["time"], time=msg_dict["time"],
user_info=user_info, user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""), processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""), detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
@@ -137,92 +226,152 @@ class Conversation:
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo
): ):
"""处理规划的行动""" """处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}") logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史先设置为stop完成后再设置为done # 记录action历史先设置为start完成后再设置为done (这个 update 移到后面执行成功后再做)
conversation_info.done_action.append( current_action_record = {
{ "action": action,
"action": action, "plan_reason": reason, #使用 plan_reason 存储规划原因
"reason": reason, "status": "start", # 初始状态为 start
"status": "start", "time": datetime.datetime.now().strftime("%H:%M:%S"),
"time": datetime.datetime.now().strftime("%H:%M:%S"), "final_reason": None
} }
) conversation_info.done_action.append(current_action_record)
# 获取刚刚添加记录的索引,方便后面更新状态
action_index = len(conversation_info.done_action) - 1
# --- 根据不同的 action 执行 ---
if action == "direct_reply": if action == "direct_reply":
self.waiter.wait_accumulated_time = 0 # --- 这个 if 块内部的所有代码都需要正确缩进 ---
self.waiter.wait_accumulated_time = 0 # 重置等待时间
self.state = ConversationState.GENERATING self.state = ConversationState.GENERATING
# 生成回复
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info) self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info)
print(f"生成回复: {self.generated_reply}") logger.info(f"生成回复: {self.generated_reply}") # 使用 logger
# # 检查回复是否合适 # --- 调用 ReplyChecker 检查回复 ---
# is_suitable, reason, need_replan = await self.reply_generator.check_reply( is_suitable = False # 先假定不合适,检查通过再改为 True
# self.generated_reply, check_reason = "检查未执行" # 用不同的变量名存储检查原因
# self.current_goal need_replan = False
# ) try:
# 尝试获取当前主要目标
if self._check_new_messages_after_planning(): current_goal_str = conversation_info.goal_list[0][0] if conversation_info.goal_list else ""
logger.info("333333发现新消息重新考虑行动")
conversation_info.done_action[-1].update( # 调用检查器
{ is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
"status": "recall", reply=self.generated_reply,
"time": datetime.datetime.now().strftime("%H:%M:%S"), goal=current_goal_str,
} chat_history=observation_info.chat_history, # 传入最新的历史记录!
retry_count=0
) )
return None logger.info(f"回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}")
await self._send_reply() except Exception as check_err:
logger.error(f"调用 ReplyChecker 时出错: {check_err}")
check_reason = f"检查过程出错: {check_err}" # 记录错误原因
# is_suitable 保持 False
conversation_info.done_action[-1].update( # --- 处理检查结果 ---
{ if is_suitable:
# 回复合适,继续执行
# 检查是否有新消息进来
if self._check_new_messages_after_planning():
logger.info("检查到新消息,取消发送已生成的回复,重新规划行动")
# 更新 action 状态为 recall
conversation_info.done_action[action_index].update({
"status": "recall",
"reason": f"有新消息,取消发送: {self.generated_reply}", # 更新原因
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
return None # 退出 _handle_action
# 发送回复
await self._send_reply() # 这个函数内部会处理自己的错误
# 更新 action 历史状态为 done
conversation_info.done_action[action_index].update({
"status": "done", "status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"), "time": datetime.datetime.now().strftime("%H:%M:%S"),
} })
)
return None else:
# 回复不合适
logger.warning(f"生成的回复被 ReplyChecker 拒绝: '{self.generated_reply}'. 原因: {check_reason}")
# 更新 action 状态为 recall (因为没执行发送)
conversation_info.done_action[action_index].update({
"status": "recall",
"final_reason": check_reason,
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
# 如果检查器建议重新规划
if need_replan:
logger.info("ReplyChecker 建议重新规划目标。")
# 可选:在此处清空目标列表以强制重新规划
# conversation_info.goal_list = []
# 注意:不发送消息,也不执行后面的代码
# --- 之前重复的代码块已被删除 ---
elif action == "fetch_knowledge": elif action == "fetch_knowledge":
self.waiter.wait_accumulated_time = 0 self.waiter.wait_accumulated_time = 0
self.state = ConversationState.FETCHING self.state = ConversationState.FETCHING
knowledge = "TODO:知识" knowledge = "TODO:知识"
topic = "TODO:关键词" topic = "TODO:关键词"
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}") logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
if knowledge: if knowledge:
if topic not in self.conversation_info.knowledge_list: pass # 简单处理
self.conversation_info.knowledge_list.append({"topic": topic, "knowledge": knowledge}) # 标记 action 为 done
return None conversation_info.done_action[action_index].update({
else: "status": "done",
self.conversation_info.knowledge_list[topic] += knowledge "time": datetime.datetime.now().strftime("%H:%M:%S"),
return None })
return None
elif action == "rethink_goal": elif action == "rethink_goal":
self.waiter.wait_accumulated_time = 0 self.waiter.wait_accumulated_time = 0
self.state = ConversationState.RETHINKING self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info) await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
return None # 标记 action 为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
elif action == "listening": elif action == "listening":
self.state = ConversationState.LISTENING self.state = ConversationState.LISTENING
logger.info("倾听对方发言...") logger.info("倾听对方发言...")
await self.waiter.wait_listening(conversation_info) await self.waiter.wait_listening(conversation_info)
return None # listening 和 wait 通常在完成后不需要标记为 done因为它们是持续状态
# 但如果需要记录,可以在 waiter 返回后标记。目前逻辑是 waiter 返回后主循环继续。
# 为了统一,可以暂时在这里也标记一下(或者都不标记)
conversation_info.done_action[action_index].update({
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
elif action == "end_conversation": elif action == "end_conversation":
self.should_continue = False self.should_continue = False # 设置循环停止标志
logger.info("决定结束对话...") logger.info("决定结束对话...")
return None # 标记 action 为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
# 这里不需要 return主循环会在下一轮检查 should_continue
else: # wait else: # 对应 'wait' 动作
self.state = ConversationState.WAITING self.state = ConversationState.WAITING
logger.info("等待更多信息...") logger.info("等待更多信息...")
await self.waiter.wait(self.conversation_info) await self.waiter.wait(self.conversation_info)
return None # 同 listening可以考虑是否标记状态
conversation_info.done_action[action_index].update({
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
async def _send_timeout_message(self): async def _send_timeout_message(self):
"""发送超时结束消息""" """发送超时结束消息"""
@@ -245,12 +394,53 @@ class Conversation:
return return
try: try:
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=self.generated_reply) # 外层 try: 捕获发送消息和后续处理中的主要错误
self.chat_observer.trigger_update() # 触发立即更新 current_time = time.time() # 获取当前时间戳
if not await self.chat_observer.wait_for_update(): reply_content = self.generated_reply # 获取要发送的内容
logger.warning("等待消息更新超时")
# 发送消息
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
logger.info(f"消息已发送: {reply_content}") # 可以在发送后加个日志确认
# --- 添加的立即更新状态逻辑开始 ---
try:
# 内层 try: 专门捕获手动更新状态时可能出现的错误
# 创建一个代表刚刚发送的消息的字典
bot_message_info = {
"message_id": f"bot_sent_{current_time}", # 创建一个简单的唯一ID
"time": current_time,
"user_info": UserInfo( # 使用 UserInfo 类构建用户信息
user_id=str(global_config.BOT_QQ),
user_nickname=global_config.BOT_NICKNAME,
platform=self.chat_stream.platform # 从 chat_stream 获取平台信息
).to_dict(), # 转换为字典格式存储
"processed_plain_text": reply_content, # 使用发送的内容
"detailed_plain_text": f"{int(current_time)},{global_config.BOT_NICKNAME}:{reply_content}", # 构造一个简单的详细文本, 时间戳取整
# 可以根据需要添加其他字段,保持与 observation_info.chat_history 中其他消息结构一致
}
# 直接更新 ObservationInfo 实例
if self.observation_info:
self.observation_info.chat_history.append(bot_message_info) # 将消息添加到历史记录末尾
self.observation_info.last_bot_speak_time = current_time # 更新 Bot 最后发言时间
self.observation_info.last_message_time = current_time # 更新最后消息时间
logger.debug("已手动将Bot发送的消息添加到 ObservationInfo")
else:
logger.warning("无法手动更新 ObservationInfo实例不存在")
except Exception as update_err:
logger.error(f"手动更新 ObservationInfo 时出错: {update_err}")
# --- 添加的立即更新状态逻辑结束 ---
# 原有的触发更新和等待代码
self.chat_observer.trigger_update()
if not await self.chat_observer.wait_for_update():
logger.warning("等待 ChatObserver 更新完成超时")
self.state = ConversationState.ANALYZING # 更新对话状态
self.state = ConversationState.ANALYZING
except Exception as e: except Exception as e:
logger.error(f"发送消息失败: {str(e)}") # 这是外层 try 对应的 except
self.state = ConversationState.ANALYZING logger.error(f"发送消息或更新状态时失败: {str(e)}")
self.state = ConversationState.ANALYZING # 出错也要尝试恢复状态

View File

@@ -4,7 +4,7 @@ from ..chat.chat_stream import ChatStream
from ..chat.message import Message from ..chat.message import Message
from ..message.message_base import Seg from ..message.message_base import Seg
from src.plugins.chat.message import MessageSending, MessageSet from src.plugins.chat.message import MessageSending, MessageSet
from src.plugins.chat.messagesender import message_manager from src.plugins.chat.message_sender import message_manager
logger = get_module_logger("message_sender") logger = get_module_logger("message_sender")
@@ -15,8 +15,8 @@ class DirectMessageSender:
def __init__(self): def __init__(self):
pass pass
@staticmethod
async def send_message( async def send_message(
self,
chat_stream: ChatStream, chat_stream: ChatStream,
content: str, content: str,
reply_to_message: Optional[Message] = None, reply_to_message: Optional[Message] = None,

View File

@@ -50,16 +50,21 @@ class MessageStorage(ABC):
class MongoDBMessageStorage(MessageStorage): class MongoDBMessageStorage(MessageStorage):
"""MongoDB消息存储实现""" """MongoDB消息存储实现"""
def __init__(self):
self.db = db
async def get_messages_after(self, chat_id: str, message_time: float) -> List[Dict[str, Any]]: async def get_messages_after(self, chat_id: str, message_time: float) -> List[Dict[str, Any]]:
query = {"chat_id": chat_id, "time": {"$gt": message_time}} query = {"chat_id": chat_id}
# print(f"storage_check_message: {message_time}") # print(f"storage_check_message: {message_time}")
return list(db.messages.find(query).sort("time", 1)) query["time"] = {"$gt": message_time}
return list(self.db.messages.find(query).sort("time", 1))
async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]: async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]:
query = {"chat_id": chat_id, "time": {"$lt": time_point}} query = {"chat_id": chat_id, "time": {"$lt": time_point}}
messages = list(db.messages.find(query).sort("time", -1).limit(limit)) messages = list(self.db.messages.find(query).sort("time", -1).limit(limit))
# 将消息按时间正序排列 # 将消息按时间正序排列
messages.reverse() messages.reverse()
@@ -68,7 +73,7 @@ class MongoDBMessageStorage(MessageStorage):
async def has_new_messages(self, chat_id: str, after_time: float) -> bool: async def has_new_messages(self, chat_id: str, after_time: float) -> bool:
query = {"chat_id": chat_id, "time": {"$gt": after_time}} query = {"chat_id": chat_id, "time": {"$gt": after_time}}
return db.messages.find_one(query) is not None return self.db.messages.find_one(query) is not None
# # 创建一个内存消息存储实现,用于测试 # # 创建一个内存消息存储实现,用于测试

View File

@@ -120,10 +120,6 @@ class ObservationInfo:
# #spec # #spec
# meta_plan_trigger: bool = False # meta_plan_trigger: bool = False
def __init__(self):
self.last_message_id = None
self.chat_observer = None
def __post_init__(self): def __post_init__(self):
"""初始化后创建handler""" """初始化后创建handler"""
self.chat_observer = None self.chat_observer = None
@@ -133,7 +129,7 @@ class ObservationInfo:
"""绑定到指定的chat_observer """绑定到指定的chat_observer
Args: Args:
chat_observer: 要绑定的ChatObserver实例 stream_id: 聊天流ID
""" """
self.chat_observer = chat_observer self.chat_observer = chat_observer
self.chat_observer.notification_manager.register_handler( self.chat_observer.notification_manager.register_handler(
@@ -175,8 +171,7 @@ class ObservationInfo:
self.last_bot_speak_time = message["time"] self.last_bot_speak_time = message["time"]
else: else:
self.last_user_speak_time = message["time"] self.last_user_speak_time = message["time"]
if user_info.user_id is not None: self.active_users.add(user_info.user_id)
self.active_users.add(str(user_info.user_id))
self.new_messages_count += 1 self.new_messages_count += 1
self.unprocessed_messages.append(message) self.unprocessed_messages.append(message)
@@ -232,7 +227,7 @@ class ObservationInfo:
"""清空未处理消息列表""" """清空未处理消息列表"""
# 将未处理消息添加到历史记录中 # 将未处理消息添加到历史记录中
for message in self.unprocessed_messages: for message in self.unprocessed_messages:
self.chat_history.append(message) # TODO NEED FIX TYPE??? self.chat_history.append(message)
# 清空未处理消息列表 # 清空未处理消息列表
self.has_unread_messages = False self.has_unread_messages = False
self.unprocessed_messages.clear() self.unprocessed_messages.clear()

View File

@@ -34,7 +34,8 @@ class GoalAnalyzer:
model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="conversation_goal" model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="conversation_goal"
) )
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2) self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=3)
self.identity_detail_info = Individuality.get_instance().get_prompt(type="identity", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME self.name = global_config.BOT_NICKNAME
self.nick_name = global_config.BOT_ALIAS_NAMES self.nick_name = global_config.BOT_ALIAS_NAMES
self.chat_observer = ChatObserver.get_instance(stream_id) self.chat_observer = ChatObserver.get_instance(stream_id)
@@ -93,15 +94,28 @@ class GoalAnalyzer:
observation_info.clear_unprocessed_messages() observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}" identity_details_only = self.identity_detail_info
identity_addon = ""
if isinstance(identity_details_only, str):
pronouns = ["", "", ""]
for p in pronouns:
if identity_details_only.startswith(p):
identity_details_only = identity_details_only[len(p):]
break
if identity_details_only.endswith(""):
identity_details_only = identity_details_only[:-1]
cleaned_details = identity_details_only.strip(', ')
if cleaned_details:
identity_addon = f"并且{cleaned_details}"
persona_text = f"你的名字是{self.name}{self.personality_info}{identity_addon}"
# 构建action历史文本 # 构建action历史文本
action_history_list = conversation_info.done_action action_history_list = conversation_info.done_action
action_history_text = "你之前做的事情是:" action_history_text = "你之前做的事情是:"
for action in action_history_list: for action in action_history_list:
action_history_text += f"{action}\n" action_history_text += f"{action}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。 prompt = f"""{persona_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。
这些目标应该反映出对话的不同方面和意图。 这些目标应该反映出对话的不同方面和意图。
{action_history_text} {action_history_text}
@@ -160,16 +174,16 @@ class GoalAnalyzer:
# 返回第一个目标作为当前主要目标(如果有) # 返回第一个目标作为当前主要目标(如果有)
if result: if result:
first_goal = result[0] first_goal = result[0]
return first_goal.get("goal", ""), "", first_goal.get("reasoning", "") return (first_goal.get("goal", ""), "", first_goal.get("reasoning", ""))
else: else:
# 单个目标的情况 # 单个目标的情况
goal = result.get("goal", "") goal = result.get("goal", "")
reasoning = result.get("reasoning", "") reasoning = result.get("reasoning", "")
conversation_info.goal_list.append((goal, reasoning)) conversation_info.goal_list.append((goal, reasoning))
return goal, "", reasoning return (goal, "", reasoning)
# 如果解析失败,返回默认值 # 如果解析失败,返回默认值
return "", "", "" return ("", "", "")
async def _update_goals(self, new_goal: str, method: str, reasoning: str): async def _update_goals(self, new_goal: str, method: str, reasoning: str):
"""更新目标列表 """更新目标列表
@@ -195,8 +209,7 @@ class GoalAnalyzer:
if len(self.goals) > self.max_goals: if len(self.goals) > self.max_goals:
self.goals.pop() # 移除最老的目标 self.goals.pop() # 移除最老的目标
@staticmethod def _calculate_similarity(self, goal1: str, goal2: str) -> float:
def _calculate_similarity(goal1: str, goal2: str) -> float:
"""简单计算两个目标之间的相似度 """简单计算两个目标之间的相似度
这里使用一个简单的实现,实际可以使用更复杂的文本相似度算法 这里使用一个简单的实现,实际可以使用更复杂的文本相似度算法
@@ -244,9 +257,25 @@ class GoalAnalyzer:
sender = "你说" sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
personality_text = f"你的名字是{self.name}{self.personality_info}" identity_details_only = self.identity_detail_info
identity_addon = ""
if isinstance(identity_details_only, str):
pronouns = ["", "", ""]
for p in pronouns:
if identity_details_only.startswith(p):
identity_details_only = identity_details_only[len(p):]
break
if identity_details_only.endswith(""):
identity_details_only = identity_details_only[:-1]
cleaned_details = identity_details_only.strip(', ')
if cleaned_details:
identity_addon = f"并且{cleaned_details}"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天 persona_text = f"你的名字是{self.name}{self.personality_info}{identity_addon}"
# ===> Persona 文本构建结束 <===
# --- 修改 Prompt 字符串,使用 persona_text ---
prompt = f"""{persona_text}。现在你在参与一场QQ聊天
当前对话目标:{goal} 当前对话目标:{goal}
产生该对话目标的原因:{reasoning} 产生该对话目标的原因:{reasoning}
@@ -300,8 +329,7 @@ class DirectMessageSender:
self.logger = get_module_logger("direct_sender") self.logger = get_module_logger("direct_sender")
self.storage = MessageStorage() self.storage = MessageStorage()
@staticmethod async def send_via_ws(self, message: MessageSending) -> None:
async def send_via_ws(message: MessageSending) -> None:
try: try:
await global_api.send_message(message) await global_api.send_message(message)
except Exception as e: except Exception as e:
@@ -352,7 +380,7 @@ class DirectMessageSender:
# logger.info(f"发送消息到{end_point}") # logger.info(f"发送消息到{end_point}")
# logger.info(message_json) # logger.info(message_json)
try: try:
await global_api.send_message_rest(end_point, message_json) await global_api.send_message_REST(end_point, message_json)
except Exception as e: except Exception as e:
logger.error(f"REST方式发送失败出现错误: {str(e)}") logger.error(f"REST方式发送失败出现错误: {str(e)}")
logger.info("尝试使用ws发送") logger.info("尝试使用ws发送")

View File

@@ -19,8 +19,7 @@ class KnowledgeFetcher:
request_type="knowledge_fetch", request_type="knowledge_fetch",
) )
@staticmethod async def fetch(self, query: str, chat_history: List[Message]) -> Tuple[str, str]:
async def fetch(query: str, chat_history: List[Message]) -> Tuple[str, str]:
"""获取相关知识 """获取相关知识
Args: Args:

View File

@@ -1,6 +1,6 @@
import json import json
import datetime import datetime
from typing import Tuple from typing import Tuple, List, Dict, Any
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from ..models.utils_model import LLMRequest from ..models.utils_model import LLMRequest
from ...config.config import global_config from ...config.config import global_config
@@ -15,13 +15,13 @@ class ReplyChecker:
def __init__(self, stream_id: str): def __init__(self, stream_id: str):
self.llm = LLMRequest( self.llm = LLMRequest(
model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="reply_check" model=global_config.llm_PFC_reply_checker, temperature=0.55, max_tokens=1000, request_type="reply_check"
) )
self.name = global_config.BOT_NICKNAME self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id) self.chat_observer = ChatObserver.get_instance(stream_id)
self.max_retries = 2 # 最大重试次数 self.max_retries = 2 # 最大重试次数
async def check(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]: async def check(self, reply: str, goal: str, chat_history: List[Dict[str, Any]], retry_count: int = 0) -> Tuple[bool, str, bool]:
"""检查生成的回复是否合适 """检查生成的回复是否合适
Args: Args:
@@ -32,10 +32,41 @@ class ReplyChecker:
Returns: Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
""" """
# 获取最新的消息记录 # 不再从 observer 获取,直接使用传入的 chat_history
messages = self.chat_observer.get_cached_messages(limit=5) # messages = self.chat_observer.get_cached_messages(limit=20)
chat_history_text = "" chat_history_text = ""
for msg in messages: try:
# 筛选出最近由 Bot 自己发送的消息
bot_messages = []
for msg in reversed(chat_history):
user_info = UserInfo.from_dict(msg.get("user_info", {}))
if str(user_info.user_id) == str(global_config.BOT_QQ): # 确保比较的是字符串
bot_messages.append(msg.get('processed_plain_text', ''))
if len(bot_messages) >= 2: # 只和最近的两条比较
break
# 进行比较
if bot_messages:
# 可以用简单比较,或者更复杂的相似度库 (如 difflib)
# 简单比较:是否完全相同
if reply == bot_messages[0]: # 和最近一条完全一样
logger.warning(f"ReplyChecker 检测到回复与上一条 Bot 消息完全相同: '{reply}'")
return False, "回复内容与你上一条发言完全相同,请修改,可以选择深入话题或寻找其它话题或等待", False # 不合适,无需重新规划
# 2. 相似度检查 (如果精确匹配未通过)
import difflib # 导入 difflib 库
# 计算编辑距离相似度ratio() 返回 0 到 1 之间的浮点数
similarity_ratio = difflib.SequenceMatcher(None, reply, bot_messages[0]).ratio()
logger.debug(f"ReplyChecker - 相似度: {similarity_ratio:.2f}")
# 设置一个相似度阈值
similarity_threshold = 0.9
if similarity_ratio > similarity_threshold:
logger.warning(f"ReplyChecker 检测到回复与上一条 Bot 消息高度相似 (相似度 {similarity_ratio:.2f}): '{reply}'")
return False, f"拒绝发送:回复内容与你上一条发言高度相似 (相似度 {similarity_ratio:.2f}),请修改,可以选择深入话题或寻找其它话题或等待。", False
except Exception as self_check_err:
logger.error(f"检查自身重复发言时出错: {self_check_err}")
for msg in chat_history[-20:]:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {})) user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}" sender = user_info.user_nickname or f"用户{user_info.user_id}"
@@ -43,7 +74,7 @@ class ReplyChecker:
sender = "你说" sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n" chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
prompt = f"""请检查以下回复是否合适: prompt = f"""请检查以下回复或消息是否合适:
当前对话目标:{goal} 当前对话目标:{goal}
最新的对话记录: 最新的对话记录:
@@ -52,12 +83,18 @@ class ReplyChecker:
待检查的回复: 待检查的回复:
{reply} {reply}
请检查以下几点: 结合聊天记录检查以下几点:
1. 回复是否依然符合当前对话目标和实现方式 1. 回复是否依然符合当前对话目标和实现方式
2. 回复是否与最新的对话记录保持一致性 2. 回复是否与最新的对话记录保持一致性
3. 回复是否重复发言,重复表达 3. 回复是否重复发言,重复表达同质内容(尤其是只是换一种方式表达了相同的含义)
4. 回复是否包含违法违规内容(政治敏感、暴力等) 4. 回复是否包含政治敏感内容
5. 回复是否以你的角度发言,不要把""说的话当做对方说的话,这是你自己说的话 5. 回复是否以你的角度发言,不要把""说的话当做对方说的话,这是你自己说的话(不要自己回复自己的消息)
6. 回复是否通俗易懂
7. 回复是否有些多余,例如在对方没有回复的情况下,依然连续多次“消息轰炸”
8. 回复是否使用了完全没必要的修辞
9. 回复是否逻辑通顺
10. 回复是否太过冗长了通常私聊的每条消息长度在20字以内除非特殊情况
11. 在连续多次发送消息的情况下,当前回复是否衔接自然,会不会显得奇怪
请以JSON格式输出包含以下字段 请以JSON格式输出包含以下字段
1. suitable: 是否合适 (true/false) 1. suitable: 是否合适 (true/false)

View File

@@ -1,4 +1,4 @@
from typing import Tuple from typing import Tuple, List, Dict, Any
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from ..models.utils_model import LLMRequest from ..models.utils_model import LLMRequest
from ...config.config import global_config from ...config.config import global_config
@@ -16,12 +16,13 @@ class ReplyGenerator:
def __init__(self, stream_id: str): def __init__(self, stream_id: str):
self.llm = LLMRequest( self.llm = LLMRequest(
model=global_config.llm_normal, model=global_config.llm_PFC_chat,
temperature=global_config.llm_normal["temp"], temperature=global_config.llm_PFC_chat["temp"],
max_tokens=300, max_tokens=300,
request_type="reply_generation", request_type="reply_generation",
) )
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2) self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=3)
self.identity_detail_info = Individuality.get_instance().get_prompt(type="identity", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id) self.chat_observer = ChatObserver.get_instance(stream_id)
self.reply_checker = ReplyChecker(stream_id) self.reply_checker = ReplyChecker(stream_id)
@@ -30,8 +31,11 @@ class ReplyGenerator:
"""生成回复 """生成回复
Args: Args:
observation_info: 观察信息 goal: 对话目标
conversation_info: 对话信息 chat_history: 聊天历史
knowledge_cache: 知识缓存
previous_reply: 上一次生成的回复(如果有)
retry_count: 当前重试次数
Returns: Returns:
str: 生成的回复 str: 生成的回复
@@ -82,8 +86,20 @@ class ReplyGenerator:
observation_info.clear_unprocessed_messages() observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}" identity_details_only = self.identity_detail_info
identity_addon = ""
if isinstance(identity_details_only, str):
pronouns = ["", "", ""]
for p in pronouns:
if identity_details_only.startswith(p):
identity_details_only = identity_details_only[len(p):]
break
if identity_details_only.endswith(""):
identity_details_only = identity_details_only[:-1]
cleaned_details = identity_details_only.strip(', ')
if cleaned_details:
identity_addon = f"并且{cleaned_details}"
persona_text = f"你的名字是{self.name}{self.personality_info}{identity_addon}"
# 构建action历史文本 # 构建action历史文本
action_history_list = ( action_history_list = (
conversation_info.done_action[-10:] conversation_info.done_action[-10:]
@@ -114,21 +130,23 @@ class ReplyGenerator:
elif action_status == "done": elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n" action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请根据以下信息生成回复 prompt = f"""{persona_text}。现在你在参与一场QQ聊天请根据以下信息生成一条新消息
当前对话目标:{goals_str} 当前对话目标:{goals_str}
最近的聊天记录: 最近的聊天记录:
{chat_history_text} {chat_history_text}
请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该: 请根据上述信息,结合聊天记录,发一条消息(可以是回复,补充,深入话题,或追问等等)。该消息应该:
1. 符合对话目标,以""的角度发言 1. 符合对话目标,以""的角度发言(不要自己与自己对话!)
2. 体现你的性格特征 2. 符合你的性格特征和身份细节
3. 自然流畅,像正常聊天一样,简短 3. 自然流畅,像正常聊天一样,简短通常20字以内除非特殊情况
4. 适当利用相关知识,但不要生硬引用 4. 适当利用相关知识,但不要生硬引用
5. 自然、得体,结合聊天记录逻辑合理,且没有重复表达同质内容
**注意:如果聊天记录中最新的消息是你自己发送的,那么你的思路不应该是“回复”,而是应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等;**
请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。 请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。
请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话 可以回复得自然随意自然一些,就像真人一样,注意把握聊天内容,整体风格可以平和、简短,不要刻意突出自身学科背景,不要说你说过的话,可以简短,多简短都可以,但是避免冗长。
请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。 请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。 不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。
@@ -151,10 +169,10 @@ class ReplyGenerator:
return content return content
except Exception as e: except Exception as e:
logger.error(f"生成回复时出错: {str(e)}") logger.error(f"生成回复时出错: {e}")
return "抱歉,我现在有点混乱,让我重新思考一下..." return "抱歉,我现在有点混乱,让我重新思考一下..."
async def check_reply(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]: async def check_reply(self, reply: str, goal: str, chat_history: List[Dict[str, Any]], retry_count: int = 0) -> Tuple[bool, str, bool]:
"""检查回复是否合适 """检查回复是否合适
Args: Args:
@@ -165,4 +183,4 @@ class ReplyGenerator:
Returns: Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
""" """
return await self.reply_checker.check(reply, goal, retry_count) return await self.reply_checker.check(reply, goal, chat_history, retry_count)

View File

@@ -1,85 +1,74 @@
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from .chat_observer import ChatObserver from .chat_observer import ChatObserver
from .conversation_info import ConversationInfo from .conversation_info import ConversationInfo
from src.individuality.individuality import Individuality # from src.individuality.individuality import Individuality # 不再需要
from ...config.config import global_config from ...config.config import global_config
import time import time
import asyncio import asyncio
logger = get_module_logger("waiter") logger = get_module_logger("waiter")
# --- 在这里设定你想要的超时时间(秒) ---
# 例如: 120 秒 = 2 分钟
DESIRED_TIMEOUT_SECONDS = 300
class Waiter: class Waiter:
"""快 速 等 待""" """等待处理类"""
def __init__(self, stream_id: str): def __init__(self, stream_id: str):
self.chat_observer = ChatObserver.get_instance(stream_id) self.chat_observer = ChatObserver.get_instance(stream_id)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME self.name = global_config.BOT_NICKNAME
# self.wait_accumulated_time = 0 # 不再需要累加计时
self.wait_accumulated_time = 0
async def wait(self, conversation_info: ConversationInfo) -> bool: async def wait(self, conversation_info: ConversationInfo) -> bool:
"""等待 """等待用户新消息或超时"""
Returns:
bool: 是否超时True表示超时
"""
# 使用当前时间作为等待开始时间
wait_start_time = time.time() wait_start_time = time.time()
self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间 logger.info(f"进入常规等待状态 (超时: {DESIRED_TIMEOUT_SECONDS} 秒)...")
while True: while True:
# 检查是否有新消息 # 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time): if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息") logger.info("等待结束,收到新消息")
return False return False # 返回 False 表示不是超时
# 检查是否超时 # 检查是否超时
if time.time() - wait_start_time > 300: elapsed_time = time.time() - wait_start_time
self.wait_accumulated_time += 300 if elapsed_time > DESIRED_TIMEOUT_SECONDS:
logger.info(f"等待超过 {DESIRED_TIMEOUT_SECONDS} 秒...添加思考目标。")
logger.info("等待超过300秒结束对话")
wait_goal = { wait_goal = {
"goal": f"你等待了{self.wait_accumulated_time / 60}分钟,思考接下来要做什么", "goal": f"你等待了{elapsed_time / 60:.1f}分钟,注意可能在对方看来聊天已经结束,思考接下来要做什么",
"reason": "对方很久没有回复你的消息了", "reason": "对方很久没有回复你的消息了",
} }
conversation_info.goal_list.append(wait_goal) conversation_info.goal_list.append(wait_goal)
print(f"添加目标: {wait_goal}") logger.info(f"添加目标: {wait_goal}")
return True # 返回 True 表示超时
return True await asyncio.sleep(5) # 每 5 秒检查一次
logger.info("等待中...") # 可以考虑把这个频繁日志注释掉,只在超时或收到消息时输出
await asyncio.sleep(1)
logger.info("等待中...")
async def wait_listening(self, conversation_info: ConversationInfo) -> bool: async def wait_listening(self, conversation_info: ConversationInfo) -> bool:
"""等待倾听 """倾听用户发言或超时"""
Returns:
bool: 是否超时True表示超时
"""
# 使用当前时间作为等待开始时间
wait_start_time = time.time() wait_start_time = time.time()
self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间 logger.info(f"进入倾听等待状态 (超时: {DESIRED_TIMEOUT_SECONDS} 秒)...")
while True: while True:
# 检查是否有新消息 # 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time): if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息") logger.info("倾听等待结束,收到新消息")
return False return False # 返回 False 表示不是超时
# 检查是否超时 # 检查是否超时
if time.time() - wait_start_time > 300: elapsed_time = time.time() - wait_start_time
self.wait_accumulated_time += 300 if elapsed_time > DESIRED_TIMEOUT_SECONDS:
logger.info("等待超过300秒结束对话") logger.info(f"倾听等待超过 {DESIRED_TIMEOUT_SECONDS} 秒...添加思考目标。")
wait_goal = { wait_goal = {
"goal": f"你等待了{self.wait_accumulated_time / 60}分钟,思考接下来要做什么", # 保持 goal 文本一致
"goal": f"你等待了{elapsed_time / 60:.1f}分钟,注意可能在对方看来聊天已经结束,思考接下来要做什么",
"reason": "对方话说一半消失了,很久没有回复", "reason": "对方话说一半消失了,很久没有回复",
} }
conversation_info.goal_list.append(wait_goal) conversation_info.goal_list.append(wait_goal)
print(f"添加目标: {wait_goal}") logger.info(f"添加目标: {wait_goal}")
return True # 返回 True 表示超时
return True await asyncio.sleep(5) # 每 5 秒检查一次
logger.info("倾听等待中...") # 同上,可以考虑注释掉
await asyncio.sleep(1)
logger.info("等待中...")

View File

@@ -184,7 +184,7 @@ response_max_sentence_num = 4 # 回复允许的最大句子数
[remote] #发送统计信息,主要是看全球有多少只麦麦 [remote] #发送统计信息,主要是看全球有多少只麦麦
enable = true enable = true
[experimental] #实验性功能,不一定完善或者根本不能用 [experimental] #实验性功能
enable_friend_chat = false # 是否启用好友聊天 enable_friend_chat = false # 是否启用好友聊天
pfc_chatting = false # 是否启用PFC聊天该功能仅作用于私聊与回复模式独立 pfc_chatting = false # 是否启用PFC聊天该功能仅作用于私聊与回复模式独立
@@ -273,3 +273,25 @@ name = "Qwen/Qwen2.5-32B-Instruct"
provider = "SILICONFLOW" provider = "SILICONFLOW"
pri_in = 1.26 pri_in = 1.26
pri_out = 1.26 pri_out = 1.26
#私聊PFC需要开启PFC功能默认三个模型均为硅基流动v3如果需要支持多人同时私聊或频繁调用建议把其中的一个或两个换成官方v3或其它模型以免撞到429
[model.llm_PFC_action_planner]
name = "Pro/deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
temp = 0.3
pri_in = 2
pri_out = 8
[model.llm_PFC_chat]
name = "Pro/deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
temp = 0.3
pri_in = 2
pri_out = 8
[model.llm_PFC_reply_checker]
name = "Pro/deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
pri_in = 2
pri_out = 8