Merge pull request #895 from MaiM-with-u/dev

Dev 0.6.3 update
This commit is contained in:
SengokuCola
2025-04-30 18:54:14 +08:00
committed by GitHub
172 changed files with 17559 additions and 8682 deletions

View File

@@ -1,182 +1,492 @@
from typing import Tuple
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..config.config import global_config
import time
from typing import Tuple, Optional # 增加了 Optional
from src.common.logger_manager import get_logger
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
logger = get_module_logger("action_planner")
from src.plugins.utils.chat_message_builder import build_readable_messages
class ActionPlannerInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []
logger = get_logger("pfc_action_planner")
# --- 定义 Prompt 模板 ---
# Prompt(1): 首次回复或非连续回复时的决策 Prompt
PROMPT_INITIAL_REPLY = """{persona_text}。现在你在参与一场QQ私聊请根据以下【所有信息】审慎且灵活的决策下一步行动可以回复可以倾听可以调取知识甚至可以屏蔽对方
【当前对话目标】
{goals_str}
{knowledge_info_str}
【最近行动历史概要】
{action_history_summary}
【上一次行动的详细情况和结果】
{last_action_context}
【时间和超时提示】
{time_since_last_bot_message_info}{timeout_context}
【最近的对话记录】(包括你已成功发送的消息 和 新收到的消息)
{chat_history_text}
------
可选行动类型以及解释:
fetch_knowledge: 需要调取知识或记忆,当需要专业知识或特定信息时选择,对方若提到你不太认识的人名或实体也可以尝试选择
listening: 倾听对方发言,当你认为对方话才说到一半,发言明显未结束时选择
direct_reply: 直接回复对方
rethink_goal: 思考一个对话目标,当你觉得目前对话需要目标,或当前目标不再适用,或话题卡住时选择。注意私聊的环境是灵活的,有可能需要经常选择
end_conversation: 结束对话,对方长时间没回复或者当你觉得对话告一段落时可以选择
block_and_ignore: 更加极端的结束对话方式,直接结束对话并在一段时间内无视对方所有发言(屏蔽),当对话让你感到十分不适,或你遭到各类骚扰时选择
请以JSON格式输出你的决策
{{
"action": "选择的行动类型 (必须是上面列表中的一个)",
"reason": "选择该行动的详细原因 (必须有解释你是如何根据“上一次行动结果”、“对话记录”和自身设定人设做出合理判断的)"
}}
注意请严格按照JSON格式输出不要包含任何其他内容。"""
# Prompt(2): 上一次成功回复后,决定继续发言时的决策 Prompt
PROMPT_FOLLOW_UP = """{persona_text}。现在你在参与一场QQ私聊刚刚你已经回复了对方请根据以下【所有信息】审慎且灵活的决策下一步行动可以继续发送新消息可以等待可以倾听可以调取知识甚至可以屏蔽对方
【当前对话目标】
{goals_str}
{knowledge_info_str}
【最近行动历史概要】
{action_history_summary}
【上一次行动的详细情况和结果】
{last_action_context}
【时间和超时提示】
{time_since_last_bot_message_info}{timeout_context}
【最近的对话记录】(包括你已成功发送的消息 和 新收到的消息)
{chat_history_text}
------
可选行动类型以及解释:
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择,对方若提到你不太认识的人名或实体也可以尝试选择
wait: 暂时不说话,留给对方交互空间,等待对方回复(尤其是在你刚发言后、或上次发言因重复、发言过多被拒时、或不确定做什么时,这是不错的选择)
listening: 倾听对方发言(虽然你刚发过言,但如果对方立刻回复且明显话没说完,可以选择这个)
send_new_message: 发送一条新消息继续对话,允许适当的追问、补充、深入话题,或开启相关新话题。**但是避免在因重复被拒后立即使用,也不要在对方没有回复的情况下过多的“消息轰炸”或重复发言**
rethink_goal: 思考一个对话目标,当你觉得目前对话需要目标,或当前目标不再适用,或话题卡住时选择。注意私聊的环境是灵活的,有可能需要经常选择
end_conversation: 结束对话,对方长时间没回复或者当你觉得对话告一段落时可以选择
block_and_ignore: 更加极端的结束对话方式,直接结束对话并在一段时间内无视对方所有发言(屏蔽),当对话让你感到十分不适,或你遭到各类骚扰时选择
请以JSON格式输出你的决策
{{
"action": "选择的行动类型 (必须是上面列表中的一个)",
"reason": "选择该行动的详细原因 (必须有解释你是如何根据“上一次行动结果”、“对话记录”和自身设定人设做出合理判断的。请说明你为什么选择继续发言而不是等待,以及打算发送什么类型的新消息连续发言,必须记录已经发言了几次)"
}}
注意请严格按照JSON格式输出不要包含任何其他内容。"""
# 新增Prompt(3): 决定是否在结束对话前发送告别语
PROMPT_END_DECISION = """{persona_text}。刚刚你决定结束一场 QQ 私聊。
【你们之前的聊天记录】
{chat_history_text}
你觉得你们的对话已经完整结束了吗?有时候,在对话自然结束后再说点什么可能会有点奇怪,但有时也可能需要一条简短的消息来圆满结束。
如果觉得确实有必要再发一条简短、自然、符合你人设的告别消息(比如 "好,下次再聊~""嗯,先这样吧"),就输出 "yes"
如果觉得当前状态下直接结束对话更好,没有必要再发消息,就输出 "no"
请以 JSON 格式输出你的选择:
{{
"say_bye": "yes/no",
"reason": "选择 yes 或 no 的原因和内心想法 (简要说明)"
}}
注意:请严格按照 JSON 格式输出,不要包含任何其他内容。"""
# ActionPlanner 类定义,顶格
class ActionPlanner:
"""行动规划器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=1000,
def __init__(self, stream_id: str, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_PFC_action_planner,
temperature=global_config.llm_PFC_action_planner["temp"],
max_tokens=1500,
request_type="action_planning",
)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.personality_info = Individuality.get_instance().get_prompt(x_person=2, level=3)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
self.private_name = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
# self.action_planner_info = ActionPlannerInfo() # 移除未使用的变量
async def plan(self, observation_info: ObservationInfo, conversation_info: ConversationInfo) -> Tuple[str, str]:
# 修改 plan 方法签名,增加 last_successful_reply_action 参数
async def plan(
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo,
last_successful_reply_action: Optional[str],
) -> Tuple[str, str]:
"""规划下一步行动
Args:
observation_info: 决策信息
conversation_info: 对话信息
last_successful_reply_action: 上一次成功的回复动作类型 ('direct_reply''send_new_message' 或 None)
Returns:
Tuple[str, str]: (行动类型, 行动原因)
"""
# 构建提示词
logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}")
# --- 获取 Bot 上次发言时间信息 ---
# (这部分逻辑不变)
time_since_last_bot_message_info = ""
try:
bot_id = str(global_config.BOT_QQ)
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
for i in range(len(observation_info.chat_history) - 1, -1, -1):
msg = observation_info.chat_history[i]
if not isinstance(msg, dict):
continue
sender_info = msg.get("user_info", {})
sender_id = str(sender_info.get("user_id")) if isinstance(sender_info, dict) else None
msg_time = msg.get("time")
if sender_id == bot_id and msg_time:
time_diff = time.time() - msg_time
if time_diff < 60.0:
time_since_last_bot_message_info = (
f"提示:你上一条成功发送的消息是在 {time_diff:.1f} 秒前。\n"
)
break
else:
logger.debug(
f"[私聊][{self.private_name}]Observation info chat history is empty or not available for bot time check."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo object might not have chat_history attribute yet for bot time check."
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]获取 Bot 上次发言时间时出错: {e}")
# 构建对话目标
# --- 获取超时提示信息 ---
# (这部分逻辑不变)
timeout_context = ""
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
last_goal_dict = conversation_info.goal_list[-1]
if isinstance(last_goal_dict, dict) and "goal" in last_goal_dict:
last_goal_text = last_goal_dict["goal"]
if isinstance(last_goal_text, str) and "分钟,思考接下来要做什么" in last_goal_text:
try:
timeout_minutes_text = last_goal_text.split("")[0].replace("你等待了", "")
timeout_context = f"重要提示:对方已经长时间({timeout_minutes_text})没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n"
except Exception:
timeout_context = "重要提示:对方已经长时间没有回复你的消息了(这可能代表对方繁忙/不想回复/没注意到你的消息等情况,或在对方看来本次聊天已告一段落),请基于此情况规划下一步。\n"
else:
logger.debug(
f"[私聊][{self.private_name}]Conversation info goal_list is empty or not available for timeout check."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet for timeout check."
)
except Exception as e:
logger.warning(f"[私聊][{self.private_name}]检查超时目标时出错: {e}")
# --- 构建通用 Prompt 参数 ---
logger.debug(
f"[私聊][{self.private_name}]开始规划行动:当前目标: {getattr(conversation_info, 'goal_list', '不可用')}"
)
# 构建对话目标 (goals_str)
goals_str = ""
if conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
# 处理字典或元组格式
if isinstance(goal_reason, tuple):
# 假设元组的第一个元素是目标,第二个元素是原因
goal = goal_reason[0]
reasoning = goal_reason[1] if len(goal_reason) > 1 else "没有明确原因"
elif isinstance(goal_reason, dict):
goal = goal_reason.get("goal")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
# 如果是其他类型,尝试转为字符串
goal = str(goal_reason)
reasoning = "没有明确原因"
try:
if hasattr(conversation_info, "goal_list") and conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
if isinstance(goal_reason, dict):
goal = goal_reason.get("goal", "目标内容缺失")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
goal = str(goal_reason)
reasoning = "没有明确原因"
goal_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
goals_str += goal_str
else:
goal = "目前没有明确对话目标"
reasoning = "目前没有明确对话目标,最好思考一个对话目标"
goals_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
goal = str(goal) if goal is not None else "目标内容缺失"
reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n"
# 获取聊天历史记录
chat_history_list = (
observation_info.chat_history[-20:]
if len(observation_info.chat_history) >= 20
else observation_info.chat_history
)
if not goals_str:
goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
else:
goals_str = "- 目前没有明确对话目标,请考虑设定一个。\n"
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have goal_list attribute yet."
)
goals_str = "- 获取对话目标时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建对话目标字符串时出错: {e}")
goals_str = "- 构建对话目标时出错。\n"
# --- 知识信息字符串构建开始 ---
knowledge_info_str = "【已获取的相关知识和记忆】\n"
try:
# 检查 conversation_info 是否有 knowledge_list 并且不为空
if hasattr(conversation_info, "knowledge_list") and conversation_info.knowledge_list:
# 最多只显示最近的 5 条知识,防止 Prompt 过长
recent_knowledge = conversation_info.knowledge_list[-5:]
for i, knowledge_item in enumerate(recent_knowledge):
if isinstance(knowledge_item, dict):
query = knowledge_item.get("query", "未知查询")
knowledge = knowledge_item.get("knowledge", "无知识内容")
source = knowledge_item.get("source", "未知来源")
# 只取知识内容的前 2000 个字,避免太长
knowledge_snippet = knowledge[:2000] + "..." if len(knowledge) > 2000 else knowledge
knowledge_info_str += (
f"{i + 1}. 关于 '{query}' 的知识 (来源: {source}):\n {knowledge_snippet}\n"
)
else:
# 处理列表里不是字典的异常情况
knowledge_info_str += f"{i + 1}. 发现一条格式不正确的知识记录。\n"
if not recent_knowledge: # 如果 knowledge_list 存在但为空
knowledge_info_str += "- 暂无相关知识和记忆。\n"
else:
# 如果 conversation_info 没有 knowledge_list 属性,或者列表为空
knowledge_info_str += "- 暂无相关知识记忆。\n"
except AttributeError:
logger.warning(f"[私聊][{self.private_name}]ConversationInfo 对象可能缺少 knowledge_list 属性。")
knowledge_info_str += "- 获取知识列表时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建知识信息字符串时出错: {e}")
knowledge_info_str += "- 处理知识列表时出错。\n"
# --- 知识信息字符串构建结束 ---
# 获取聊天历史记录 (chat_history_text)
chat_history_text = ""
for msg in chat_history_list:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
try:
if hasattr(observation_info, "chat_history") and observation_info.chat_history:
chat_history_text = observation_info.chat_history_str
if not chat_history_text:
chat_history_text = "还没有聊天记录。\n"
else:
chat_history_text = "还没有聊天记录。\n"
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
if hasattr(observation_info, "new_messages_count") and observation_info.new_messages_count > 0:
if hasattr(observation_info, "unprocessed_messages") and observation_info.unprocessed_messages:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += (
f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
)
else:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo has new_messages_count > 0 but unprocessed_messages is empty or missing."
)
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo object might be missing expected attributes for chat history."
)
chat_history_text = "获取聊天记录时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]处理聊天记录时发生未知错误: {e}")
chat_history_text = "处理聊天记录时出错。\n"
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
# 构建 Persona 文本 (persona_text)
persona_text = f"你的名字是{self.name}{self.personality_info}"
observation_info.clear_unprocessed_messages()
# 构建行动历史和上一次行动结果 (action_history_summary, last_action_context)
# (这部分逻辑不变)
action_history_summary = "你最近执行的行动历史:\n"
last_action_context = "关于你【上一次尝试】的行动:\n"
action_history_list = []
try:
if hasattr(conversation_info, "done_action") and conversation_info.done_action:
action_history_list = conversation_info.done_action[-5:]
else:
logger.debug(f"[私聊][{self.private_name}]Conversation info done_action is empty or not available.")
except AttributeError:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo object might not have done_action attribute yet."
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]访问行动历史时出错: {e}")
personality_text = f"你的名字是{self.name}{self.personality_info}"
if not action_history_list:
action_history_summary += "- 还没有执行过行动。\n"
last_action_context += "- 这是你规划的第一个行动。\n"
else:
for i, action_data in enumerate(action_history_list):
action_type = "未知"
plan_reason = "未知"
status = "未知"
final_reason = ""
action_time = ""
# 构建action历史文本
action_history_list = (
conversation_info.done_action[-10:]
if len(conversation_info.done_action) >= 10
else conversation_info.done_action
if isinstance(action_data, dict):
action_type = action_data.get("action", "未知")
plan_reason = action_data.get("plan_reason", "未知规划原因")
status = action_data.get("status", "未知")
final_reason = action_data.get("final_reason", "")
action_time = action_data.get("time", "")
elif isinstance(action_data, tuple):
# 假设旧格式兼容
if len(action_data) > 0:
action_type = action_data[0]
if len(action_data) > 1:
plan_reason = action_data[1] # 可能是规划原因或最终原因
if len(action_data) > 2:
status = action_data[2]
if status == "recall" and len(action_data) > 3:
final_reason = action_data[3]
elif status == "done" and action_type in ["direct_reply", "send_new_message"]:
plan_reason = "成功发送" # 简化显示
reason_text = f", 失败/取消原因: {final_reason}" if final_reason else ""
summary_line = f"- 时间:{action_time}, 尝试行动:'{action_type}', 状态:{status}{reason_text}"
action_history_summary += summary_line + "\n"
if i == len(action_history_list) - 1:
last_action_context += f"- 上次【规划】的行动是: '{action_type}'\n"
last_action_context += f"- 当时规划的【原因】是: {plan_reason}\n"
if status == "done":
last_action_context += "- 该行动已【成功执行】。\n"
# 记录这次成功的行动类型,供下次决策
# self.last_successful_action_type = action_type # 不在这里记录,由 conversation 控制
elif status == "recall":
last_action_context += "- 但该行动最终【未能执行/被取消】。\n"
if final_reason:
last_action_context += f"- 【重要】失败/取消的具体原因是: “{final_reason}\n"
else:
last_action_context += "- 【重要】失败/取消原因未明确记录。\n"
# self.last_successful_action_type = None # 行动失败,清除记录
else:
last_action_context += f"- 该行动当前状态: {status}\n"
# self.last_successful_action_type = None # 非完成状态,清除记录
# --- 选择 Prompt ---
if last_successful_reply_action in ["direct_reply", "send_new_message"]:
prompt_template = PROMPT_FOLLOW_UP
logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_FOLLOW_UP (追问决策)")
else:
prompt_template = PROMPT_INITIAL_REPLY
logger.debug(f"[私聊][{self.private_name}]使用 PROMPT_INITIAL_REPLY (首次/非连续回复决策)")
# --- 格式化最终的 Prompt ---
prompt = prompt_template.format(
persona_text=persona_text,
goals_str=goals_str if goals_str.strip() else "- 目前没有明确对话目标,请考虑设定一个。",
action_history_summary=action_history_summary,
last_action_context=last_action_context,
time_since_last_bot_message_info=time_since_last_bot_message_info,
timeout_context=timeout_context,
chat_history_text=chat_history_text if chat_history_text.strip() else "还没有聊天记录。",
knowledge_info_str=knowledge_info_str,
)
action_history_text = "你之前做的事情是:"
for action in action_history_list:
if isinstance(action, dict):
action_type = action.get("action")
action_reason = action.get("reason")
action_status = action.get("status")
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
elif isinstance(action, tuple):
# 假设元组的格式是(action_type, action_reason, action_status)
action_type = action[0] if len(action) > 0 else "未知行动"
action_reason = action[1] if len(action) > 1 else "未知原因"
action_status = action[2] if len(action) > 2 else "done"
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
当前对话目标:{goals_str}
{action_history_text}
最近的对话记录:
{chat_history_text}
请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言:
行动类型:
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择
wait: 当你做出了发言,对方尚未回复时暂时等待对方的回复
listening: 倾听对方发言,当你认为对方发言尚未结束时采用
direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言
rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标
end_conversation: 结束对话,长时间没回复或者当你觉得谈话暂时结束时选择,停止该场对话
请以JSON格式输出包含以下字段
1. action: 行动类型,注意你之前的行为
2. reason: 选择该行动的原因,注意你之前的行为(简要解释)
注意请严格按照JSON格式输出不要包含任何其他内容。"""
logger.debug(f"发送到LLM的提示词: {prompt}")
logger.debug(f"[私聊][{self.private_name}]发送到LLM的最终提示词:\n------\n{prompt}\n------")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
logger.debug(f"[私聊][{self.private_name}]LLM (行动规划) 原始返回内容: {content}")
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content, "action", "reason", default_values={"action": "direct_reply", "reason": "没有明确原因"}
# --- 初始行动规划解析 ---
success, initial_result = get_items_from_json(
content,
self.private_name,
"action",
"reason",
default_values={"action": "wait", "reason": "LLM返回格式错误或未提供原因默认等待"},
)
if not success:
return "direct_reply", "JSON解析失败选择直接回复"
initial_action = initial_result.get("action", "wait")
initial_reason = initial_result.get("reason", "LLM未提供原因默认等待")
action = result["action"]
reason = result["reason"]
# 检查是否需要进行结束对话决策 ---
if initial_action == "end_conversation":
logger.info(f"[私聊][{self.private_name}]初步规划结束对话,进入告别决策...")
# 验证action类型
if action not in [
"direct_reply",
"fetch_knowledge",
"wait",
"listening",
"rethink_goal",
"end_conversation",
]:
logger.warning(f"未知的行动类型: {action}默认使用listening")
action = "listening"
# 使用新的 PROMPT_END_DECISION
end_decision_prompt = PROMPT_END_DECISION.format(
persona_text=persona_text, # 复用之前的 persona_text
chat_history_text=chat_history_text, # 复用之前的 chat_history_text
)
logger.info(f"规划的行动: {action}")
logger.info(f"行动原因: {reason}")
return action, reason
logger.debug(
f"[私聊][{self.private_name}]发送到LLM的结束决策提示词:\n------\n{end_decision_prompt}\n------"
)
try:
end_content, _ = await self.llm.generate_response_async(end_decision_prompt) # 再次调用LLM
logger.debug(f"[私聊][{self.private_name}]LLM (结束决策) 原始返回内容: {end_content}")
# 解析结束决策的JSON
end_success, end_result = get_items_from_json(
end_content,
self.private_name,
"say_bye",
"reason",
default_values={"say_bye": "no", "reason": "结束决策LLM返回格式错误默认不告别"},
required_types={"say_bye": str, "reason": str}, # 明确类型
)
say_bye_decision = end_result.get("say_bye", "no").lower() # 转小写方便比较
end_decision_reason = end_result.get("reason", "未提供原因")
if end_success and say_bye_decision == "yes":
# 决定要告别,返回新的 'say_goodbye' 动作
logger.info(
f"[私聊][{self.private_name}]结束决策: yes, 准备生成告别语. 原因: {end_decision_reason}"
)
# 注意:这里的 reason 可以考虑拼接初始原因和结束决策原因,或者只用结束决策原因
final_action = "say_goodbye"
final_reason = f"决定发送告别语。决策原因: {end_decision_reason} (原结束理由: {initial_reason})"
return final_action, final_reason
else:
# 决定不告别 (包括解析失败或明确说no)
logger.info(
f"[私聊][{self.private_name}]结束决策: no, 直接结束对话. 原因: {end_decision_reason}"
)
# 返回原始的 'end_conversation' 动作
final_action = "end_conversation"
final_reason = initial_reason # 保持原始的结束理由
return final_action, final_reason
except Exception as end_e:
logger.error(f"[私聊][{self.private_name}]调用结束决策LLM或处理结果时出错: {str(end_e)}")
# 出错时,默认执行原始的结束对话
logger.warning(f"[私聊][{self.private_name}]结束决策出错,将按原计划执行 end_conversation")
return "end_conversation", initial_reason # 返回原始动作和原因
else:
action = initial_action
reason = initial_reason
# 验证action类型 (保持不变)
valid_actions = [
"direct_reply",
"send_new_message",
"fetch_knowledge",
"wait",
"listening",
"rethink_goal",
"end_conversation", # 仍然需要验证,因为可能从上面决策后返回
"block_and_ignore",
"say_goodbye", # 也要验证这个新动作
]
if action not in valid_actions:
logger.warning(f"[私聊][{self.private_name}]LLM返回了未知的行动类型: '{action}',强制改为 wait")
reason = f"(原始行动'{action}'无效已强制改为wait) {reason}"
action = "wait"
logger.info(f"[私聊][{self.private_name}]规划的行动: {action}")
logger.info(f"[私聊][{self.private_name}]行动原因: {reason}")
return action, reason
except Exception as e:
logger.error(f"规划行动时出错: {str(e)}")
return "direct_reply", "发生错误,选择直接回复"
# 外层异常处理保持不变
logger.error(f"[私聊][{self.private_name}]规划行动时调用 LLM 或处理结果出错: {str(e)}")
return "wait", f"行动规划处理中发生错误,暂时等待: {str(e)}"

View File

@@ -3,8 +3,8 @@ import asyncio
import traceback
from typing import Optional, Dict, Any, List
from src.common.logger import get_module_logger
from ..message.message_base import UserInfo
from ..config.config import global_config
from maim_message import UserInfo
from ...config.config import global_config
from .chat_states import NotificationManager, create_new_message_notification, create_cold_chat_notification
from .message_storage import MongoDBMessageStorage
@@ -18,7 +18,7 @@ class ChatObserver:
_instances: Dict[str, "ChatObserver"] = {}
@classmethod
def get_instance(cls, stream_id: str) -> "ChatObserver":
def get_instance(cls, stream_id: str, private_name: str) -> "ChatObserver":
"""获取或创建观察器实例
Args:
@@ -28,10 +28,10 @@ class ChatObserver:
ChatObserver: 观察器实例
"""
if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id)
cls._instances[stream_id] = cls(stream_id, private_name)
return cls._instances[stream_id]
def __init__(self, stream_id: str):
def __init__(self, stream_id: str, private_name: str):
"""初始化观察器
Args:
@@ -41,6 +41,7 @@ class ChatObserver:
raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.")
self.stream_id = stream_id
self.private_name = private_name
self.message_storage = MongoDBMessageStorage()
# self.last_user_speak_time: Optional[float] = None # 对方上次发言时间
@@ -76,12 +77,12 @@ class ChatObserver:
Returns:
bool: 是否有新消息
"""
logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}")
logger.debug(f"[私聊][{self.private_name}]检查距离上一次观察之后是否有了新消息: {self.last_check_time}")
new_message_exists = await self.message_storage.has_new_messages(self.stream_id, self.last_check_time)
if new_message_exists:
logger.debug("发现新消息")
logger.debug(f"[私聊][{self.private_name}]发现新消息")
self.last_check_time = time.time()
return new_message_exists
@@ -94,15 +95,13 @@ class ChatObserver:
"""
try:
# 发送新消息通知
# logger.info(f"发送新ccchandleer消息通知: {message}")
notification = create_new_message_notification(
sender="chat_observer", target="observation_info", message=message
)
# logger.info(f"发送新消ddddd息通知: {notification}")
# print(self.notification_manager)
await self.notification_manager.send_notification(notification)
except Exception as e:
logger.error(f"添加消息到历史记录时出错: {e}")
logger.error(f"[私聊][{self.private_name}]添加消息到历史记录时出错: {e}")
print(traceback.format_exc())
# 检查并更新冷场状态
@@ -142,11 +141,13 @@ class ChatObserver:
"""
if self.last_message_time is None:
logger.debug("没有最后消息时间,返回 False")
logger.debug(f"[私聊][{self.private_name}]没有最后消息时间,返回 False")
return False
has_new = self.last_message_time > time_point
logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point} = {has_new}")
logger.debug(
f"[私聊][{self.private_name}]判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point} = {has_new}"
)
return has_new
def get_message_history(
@@ -215,7 +216,7 @@ class ChatObserver:
if new_messages:
self.last_message_read = new_messages[-1]["message_id"]
logger.debug(f"获取指定时间点111之前的消息: {new_messages}")
logger.debug(f"[私聊][{self.private_name}]获取指定时间点111之前的消息: {new_messages}")
return new_messages
@@ -228,9 +229,9 @@ class ChatObserver:
# messages = await self._fetch_new_messages_before(start_time)
# for message in messages:
# await self._add_message_to_history(message)
# logger.debug(f"缓冲消息: {messages}")
# logger.debug(f"[私聊][{self.private_name}]缓冲消息: {messages}")
# except Exception as e:
# logger.error(f"缓冲消息出错: {e}")
# logger.error(f"[私聊][{self.private_name}]缓冲消息出错: {e}")
while self._running:
try:
@@ -258,8 +259,8 @@ class ChatObserver:
self._update_complete.set()
except Exception as e:
logger.error(f"更新循环出错: {e}")
logger.error(traceback.format_exc())
logger.error(f"[私聊][{self.private_name}]更新循环出错: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
self._update_complete.set() # 即使出错也要设置完成事件
def trigger_update(self):
@@ -279,7 +280,7 @@ class ChatObserver:
await asyncio.wait_for(self._update_complete.wait(), timeout=timeout)
return True
except asyncio.TimeoutError:
logger.warning(f"等待更新完成超时({timeout}秒)")
logger.warning(f"[私聊][{self.private_name}]等待更新完成超时({timeout}秒)")
return False
def start(self):
@@ -289,7 +290,7 @@ class ChatObserver:
self._running = True
self._task = asyncio.create_task(self._update_loop())
logger.info(f"ChatObserver for {self.stream_id} started")
logger.debug(f"[私聊][{self.private_name}]ChatObserver for {self.stream_id} started")
def stop(self):
"""停止观察器"""
@@ -298,7 +299,7 @@ class ChatObserver:
self._update_complete.set() # 设置完成事件以解除等待
if self._task:
self._task.cancel()
logger.info(f"ChatObserver for {self.stream_id} stopped")
logger.debug(f"[私聊][{self.private_name}]ChatObserver for {self.stream_id} stopped")
async def process_chat_history(self, messages: list):
"""处理聊天历史
@@ -316,7 +317,7 @@ class ChatObserver:
else:
self.update_user_speak_time(msg["time"])
except Exception as e:
logger.warning(f"处理消息时间时出错: {e}")
logger.warning(f"[私聊][{self.private_name}]处理消息时间时出错: {e}")
continue
def update_check_time(self):
@@ -355,7 +356,7 @@ class ChatObserver:
Returns:
List[Dict[str, Any]]: 缓存的消息历史列表
"""
return self.message_cache[:limit]
return self.message_cache[-limit:]
def get_last_message(self) -> Optional[Dict[str, Any]]:
"""获取最后一条消息
@@ -365,7 +366,7 @@ class ChatObserver:
"""
if not self.message_cache:
return None
return self.message_cache[0]
return self.message_cache[-1]
def __str__(self):
return f"ChatObserver for {self.stream_id}"

View File

@@ -98,15 +98,11 @@ class NotificationManager:
notification_type: 要处理的通知类型
handler: 处理器实例
"""
print(1145145511114445551111444)
if target not in self._handlers:
# print("没11有target")
self._handlers[target] = {}
if notification_type not in self._handlers[target]:
# print("没11有notification_type")
self._handlers[target][notification_type] = []
# print(self._handlers[target][notification_type])
# print(f"注册1111111111111111111111处理器: {target} {notification_type} {handler}")
self._handlers[target][notification_type].append(handler)
# print(self._handlers[target][notification_type])
@@ -132,7 +128,6 @@ class NotificationManager:
async def send_notification(self, notification: Notification):
"""发送通知"""
self._notification_history.append(notification)
# print("kaishichul-----------------------------------i")
# 如果是状态通知,更新活跃状态
if isinstance(notification, StateNotification):
@@ -145,10 +140,9 @@ class NotificationManager:
target = notification.target
if target in self._handlers:
handlers = self._handlers[target].get(notification.type, [])
# print(1111111)
print(handlers)
# print(handlers)
for handler in handlers:
print(f"调用处理器: {handler}")
# print(f"调用处理器: {handler}")
await handler.handle_notification(notification)
def get_active_states(self) -> Set[NotificationType]:

View File

@@ -1,37 +1,46 @@
import time
import asyncio
import datetime
from typing import Dict, Any
# from .message_storage import MongoDBMessageStorage
from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
# from ...config.config import global_config
from typing import Dict, Any, Optional
from ..chat.message import Message
from .pfc_types import ConversationState
from .pfc import ChatObserver, GoalAnalyzer, DirectMessageSender
from src.common.logger import get_module_logger
from .pfc import ChatObserver, GoalAnalyzer
from .message_sender import DirectMessageSender
from src.common.logger_manager import get_logger
from .action_planner import ActionPlanner
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .conversation_info import ConversationInfo # 确保导入 ConversationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo
from maim_message import UserInfo
from src.plugins.chat.chat_stream import chat_manager
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .waiter import Waiter
import traceback
logger = get_module_logger("pfc_conversation")
logger = get_logger("pfc")
class Conversation:
"""对话类,负责管理单个对话的状态和行为"""
def __init__(self, stream_id: str):
def __init__(self, stream_id: str, private_name: str):
"""初始化对话实例
Args:
stream_id: 聊天流ID
"""
self.stream_id = stream_id
self.private_name = private_name
self.state = ConversationState.INIT
self.should_continue = False
self.ignore_until_timestamp: Optional[float] = None
# 回复相关
self.generated_reply = ""
@@ -40,37 +49,76 @@ class Conversation:
"""初始化实例,注册所有组件"""
try:
self.action_planner = ActionPlanner(self.stream_id)
self.goal_analyzer = GoalAnalyzer(self.stream_id)
self.reply_generator = ReplyGenerator(self.stream_id)
self.knowledge_fetcher = KnowledgeFetcher()
self.waiter = Waiter(self.stream_id)
self.direct_sender = DirectMessageSender()
self.action_planner = ActionPlanner(self.stream_id, self.private_name)
self.goal_analyzer = GoalAnalyzer(self.stream_id, self.private_name)
self.reply_generator = ReplyGenerator(self.stream_id, self.private_name)
self.knowledge_fetcher = KnowledgeFetcher(self.private_name)
self.waiter = Waiter(self.stream_id, self.private_name)
self.direct_sender = DirectMessageSender(self.private_name)
# 获取聊天流信息
self.chat_stream = chat_manager.get_stream(self.stream_id)
self.stop_action_planner = False
except Exception as e:
logger.error(f"初始化对话实例:注册运行组件失败: {e}")
logger.error(traceback.format_exc())
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册运行组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
raise
try:
# 决策所需要的信息,包括自身自信和观察信息两部分
# 注册观察器和观测信息
self.chat_observer = ChatObserver.get_instance(self.stream_id)
self.chat_observer = ChatObserver.get_instance(self.stream_id, self.private_name)
self.chat_observer.start()
self.observation_info = ObservationInfo()
self.observation_info = ObservationInfo(self.private_name)
self.observation_info.bind_to_chat_observer(self.chat_observer)
# print(self.chat_observer.get_cached_messages(limit=)
self.conversation_info = ConversationInfo()
except Exception as e:
logger.error(f"初始化对话实例:注册信息组件失败: {e}")
logger.error(traceback.format_exc())
logger.error(f"[私聊][{self.private_name}]初始化对话实例:注册信息组件失败: {e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
raise
try:
logger.info(f"[私聊][{self.private_name}]为 {self.stream_id} 加载初始聊天记录...")
initial_messages = get_raw_msg_before_timestamp_with_chat( #
chat_id=self.stream_id,
timestamp=time.time(),
limit=30, # 加载最近30条作为初始上下文可以调整
)
chat_talking_prompt = await build_readable_messages(
initial_messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
if initial_messages:
# 将加载的消息填充到 ObservationInfo 的 chat_history
self.observation_info.chat_history = initial_messages
self.observation_info.chat_history_str = chat_talking_prompt + "\n"
self.observation_info.chat_history_count = len(initial_messages)
# 更新 ObservationInfo 中的时间戳等信息
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get("time")
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
self.observation_info.last_message_sender = last_user_info.user_id
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
logger.info(
f"[私聊][{self.private_name}]成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
)
# 让 ChatObserver 从加载的最后一条消息之后开始同步
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
else:
logger.info(f"[私聊][{self.private_name}]没有找到初始聊天记录。")
except Exception as load_err:
logger.error(f"[私聊][{self.private_name}]加载初始聊天记录时出错: {load_err}")
# 出错也要继续,只是没有历史记录而已
# 组件准备完成,启动该论对话
self.should_continue = True
asyncio.create_task(self.start())
@@ -78,142 +126,562 @@ class Conversation:
async def start(self):
"""开始对话流程"""
try:
logger.info("对话系统启动中...")
logger.info(f"[私聊][{self.private_name}]对话系统启动中...")
asyncio.create_task(self._plan_and_action_loop())
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
logger.error(f"[私聊][{self.private_name}]启动对话系统失败: {e}")
raise
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
# 获取最近的消息历史
while self.should_continue:
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info)
if self._check_new_messages_after_planning():
# 忽略逻辑
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
await asyncio.sleep(30)
continue
elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp:
logger.info(f"[私聊][{self.private_name}]忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None
self.should_continue = False
continue
try:
# --- 在规划前记录当前新消息数量 ---
initial_new_message_count = 0
if hasattr(self.observation_info, "new_messages_count"):
initial_new_message_count = self.observation_info.new_messages_count + 1 # 算上麦麦自己发的那一条
else:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' before planning."
)
# 执行行动
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# --- 调用 Action Planner ---
# 传递 self.conversation_info.last_successful_reply_action
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info, self.conversation_info.last_successful_reply_action
)
for goal in self.conversation_info.goal_list:
# 检查goal是否为元组类型如果是元组则使用索引访问如果是字典则使用get方法
if isinstance(goal, tuple):
# 假设元组的第一个元素是目标内容
print(f"goal: {goal}")
if goal[0] == "结束对话":
self.should_continue = False
break
# --- 规划后检查是否有 *更多* 新消息到达 ---
current_new_message_count = 0
if hasattr(self.observation_info, "new_messages_count"):
current_new_message_count = self.observation_info.new_messages_count
else:
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo missing 'new_messages_count' after planning."
)
if current_new_message_count > initial_new_message_count + 2:
logger.info(
f"[私聊][{self.private_name}]规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划"
)
# 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了
self.conversation_info.last_successful_reply_action = None
await asyncio.sleep(0.1)
continue
# 包含 send_new_message
if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]:
if hasattr(self.observation_info, "clear_unprocessed_messages"):
logger.debug(
f"[私聊][{self.private_name}]准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。"
)
await self.observation_info.clear_unprocessed_messages()
if hasattr(self.observation_info, "new_messages_count"):
self.observation_info.new_messages_count = 0
else:
logger.error(
f"[私聊][{self.private_name}]无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!"
)
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# 检查是否需要结束对话 (逻辑不变)
goal_ended = False
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
for goal_item in self.conversation_info.goal_list:
if isinstance(goal_item, dict):
current_goal = goal_item.get("goal")
if current_goal == "结束对话":
goal_ended = True
break
if goal_ended:
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]检测到'结束对话'目标,停止循环。")
except Exception as loop_err:
logger.error(f"[私聊][{self.private_name}]PFC主循环出错: {loop_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
await asyncio.sleep(1)
if self.should_continue:
await asyncio.sleep(0.1)
logger.info(f"[私聊][{self.private_name}]PFC 循环结束 for stream_id: {self.stream_id}")
def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息"""
if self.observation_info.new_messages_count > 0:
logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动")
# 如果需要,可以在这里添加逻辑来根据新消息重新决定行动
# 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性
if not hasattr(self, "observation_info") or not hasattr(self.observation_info, "new_messages_count"):
logger.warning(
f"[私聊][{self.private_name}]ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。"
)
return False # 或者根据需要抛出错误
if self.observation_info.new_messages_count > 2:
logger.info(
f"[私聊][{self.private_name}]生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划"
)
# 如果有新消息,也应该重置上次回复状态
if hasattr(self, "conversation_info"): # 确保 conversation_info 已初始化
self.conversation_info.last_successful_reply_action = None
else:
logger.warning(
f"[私聊][{self.private_name}]ConversationInfo 未初始化,无法重置 last_successful_reply_action。"
)
return True
return False
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
chat_info = msg_dict.get("chat_info", {})
chat_stream = ChatStream.from_dict(chat_info)
# 尝试从 msg_dict 直接获取 chat_stream如果失败则从全局 chat_manager 获取
chat_info = msg_dict.get("chat_info")
if chat_info and isinstance(chat_info, dict):
chat_stream = ChatStream.from_dict(chat_info)
elif self.chat_stream: # 使用实例变量中的 chat_stream
chat_stream = self.chat_stream
else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id)
chat_stream = chat_manager.get_stream(self.stream_id)
if not chat_stream:
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict["message_id"],
chat_stream=chat_stream,
time=msg_dict["time"],
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID
chat_stream=chat_stream, # 使用确定的 chat_stream
time=msg_dict.get("time", time.time()), # 提供默认时间
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"转换消息时出错: {e}")
raise
logger.warning(f"[私聊][{self.private_name}]转换消息时出错: {e}")
# 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
async def _handle_action(
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo
):
"""处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史先设置为stop完成后再设置为done
conversation_info.done_action.append(
{
"action": action,
"reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
logger.debug(f"[私聊][{self.private_name}]执行行动: {action}, 原因: {reason}")
if action == "direct_reply":
self.waiter.wait_accumulated_time = 0
# 记录action历史 (逻辑不变)
current_action_record = {
"action": action,
"plan_reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
# 确保 done_action 列表存在
if not hasattr(conversation_info, "done_action"):
conversation_info.done_action = []
conversation_info.done_action.append(current_action_record)
action_index = len(conversation_info.done_action) - 1
self.state = ConversationState.GENERATING
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info)
print(f"生成回复: {self.generated_reply}")
action_successful = False # 用于标记动作是否成功完成
# # 检查回复是否合适
# is_suitable, reason, need_replan = await self.reply_generator.check_reply(
# self.generated_reply,
# self.current_goal
# )
# --- 根据不同的 action 执行 ---
if self._check_new_messages_after_planning():
logger.info("333333发现新消息重新考虑行动")
conversation_info.done_action[-1].update(
{
"status": "recall",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
# send_new_message 失败后执行 wait
if action == "send_new_message":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(
f"[私聊][{self.private_name}]尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
)
return None
self.state = ConversationState.GENERATING
await self._send_reply()
# 1. 生成回复 (调用 generate 时传入 action_type)
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="send_new_message"
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的追问回复: {self.generated_reply}"
)
conversation_info.done_action[-1].update(
# 2. 检查回复 (逻辑不变)
self.state = ConversationState.CHECKING
try:
current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else ""
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次追问检查建议重新规划,停止尝试。原因: {check_reason}"
)
break
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
break
# 循环结束,处理最终结果
if is_suitable:
# 检查是否有新消息
if self._check_new_messages_after_planning():
logger.info(f"[私聊][{self.private_name}]生成追问回复期间收到新消息,取消发送,重新规划行动")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"}
)
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send
# --- 在这里调用 _send_reply ---
await self._send_reply() # <--- 调用恢复后的函数
# 更新状态: 标记上次成功是 send_new_message
self.conversation_info.last_successful_reply_action = "send_new_message"
action_successful = True # 标记动作成功
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,追问回复决定打回动作决策。打回原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后打回: {check_reason}"}
)
else:
# 追问失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 追问失败,下次用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 执行 Wait 操作
logger.info(f"[私聊][{self.private_name}]由于无法生成合适追问回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
elif action == "direct_reply":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(
f"[私聊][{self.private_name}]尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)..."
)
self.state = ConversationState.GENERATING
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="direct_reply"
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次生成的首次回复: {self.generated_reply}"
)
# 2. 检查回复
self.state = ConversationState.CHECKING
try:
current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else ""
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}"
)
break
except Exception as check_err:
logger.error(
f"[私聊][{self.private_name}]第 {reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}"
)
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
break
# 循环结束,处理最终结果
if is_suitable:
# 检查是否有新消息
if self._check_new_messages_after_planning():
logger.info(f"[私聊][{self.private_name}]生成首次回复期间收到新消息,取消发送,重新规划行动")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"}
)
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send
# --- 在这里调用 _send_reply ---
await self._send_reply() # <--- 调用恢复后的函数
# 更新状态: 标记上次成功是 direct_reply
self.conversation_info.last_successful_reply_action = "direct_reply"
action_successful = True # 标记动作成功
elif need_replan:
# 打回动作决策
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,首次回复决定打回动作决策。打回原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后打回: {check_reason}"}
)
else:
# 首次回复失败
logger.warning(
f"[私聊][{self.private_name}]经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}"
)
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 首次回复失败,下次还是用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 执行 Wait 操作 (保持原有逻辑)
logger.info(f"[私聊][{self.private_name}]由于无法生成合适首次回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
elif action == "fetch_knowledge":
self.state = ConversationState.FETCHING
knowledge_query = reason
try:
# 检查 knowledge_fetcher 是否存在
if not hasattr(self, "knowledge_fetcher"):
logger.error(f"[私聊][{self.private_name}]KnowledgeFetcher 未初始化,无法获取知识。")
raise AttributeError("KnowledgeFetcher not initialized")
knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history)
logger.info(f"[私聊][{self.private_name}]获取到知识: {knowledge[:100]}..., 来源: {source}")
if knowledge:
# 确保 knowledge_list 存在
if not hasattr(conversation_info, "knowledge_list"):
conversation_info.knowledge_list = []
conversation_info.knowledge_list.append(
{"query": knowledge_query, "knowledge": knowledge, "source": source}
)
action_successful = True
except Exception as fetch_err:
logger.error(f"[私聊][{self.private_name}]获取知识时出错: {str(fetch_err)}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"获取知识失败: {str(fetch_err)}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
try:
# 检查 goal_analyzer 是否存在
if not hasattr(self, "goal_analyzer"):
logger.error(f"[私聊][{self.private_name}]GoalAnalyzer 未初始化,无法重新思考目标。")
raise AttributeError("GoalAnalyzer not initialized")
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
action_successful = True
except Exception as rethink_err:
logger.error(f"[私聊][{self.private_name}]重新思考目标时出错: {rethink_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info(f"[私聊][{self.private_name}]倾听对方发言...")
try:
# 检查 waiter 是否存在
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法倾听。")
raise AttributeError("Waiter not initialized")
await self.waiter.wait_listening(conversation_info)
action_successful = True # Listening 完成就算成功
except Exception as listen_err:
logger.error(f"[私聊][{self.private_name}]倾听时出错: {listen_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "say_goodbye":
self.state = ConversationState.GENERATING # 也可以定义一个新的状态,如 ENDING
logger.info(f"[私聊][{self.private_name}]执行行动: 生成并发送告别语...")
try:
# 1. 生成告别语 (使用 'say_goodbye' action_type)
self.generated_reply = await self.reply_generator.generate(
observation_info, conversation_info, action_type="say_goodbye"
)
logger.info(f"[私聊][{self.private_name}]生成的告别语: {self.generated_reply}")
# 2. 直接发送告别语 (不经过检查)
if self.generated_reply: # 确保生成了内容
await self._send_reply() # 调用发送方法
# 发送成功后,标记动作成功
action_successful = True
logger.info(f"[私聊][{self.private_name}]告别语已发送。")
else:
logger.warning(f"[私聊][{self.private_name}]未能生成告别语内容,无法发送。")
action_successful = False # 标记动作失败
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": "未能生成告别语内容"}
)
# 3. 无论是否发送成功,都准备结束对话
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]发送告别语流程结束,即将停止对话实例。")
except Exception as goodbye_err:
logger.error(f"[私聊][{self.private_name}]生成或发送告别语时出错: {goodbye_err}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
# 即使出错,也结束对话
self.should_continue = False
action_successful = False # 标记动作失败
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"生成或发送告别语时出错: {goodbye_err}"}
)
elif action == "end_conversation":
# 这个分支现在只会在 action_planner 最终决定不告别时被调用
self.should_continue = False
logger.info(f"[私聊][{self.private_name}]收到最终结束指令,停止对话...")
action_successful = True # 标记这个指令本身是成功的
elif action == "block_and_ignore":
logger.info(f"[私聊][{self.private_name}]不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(
f"[私聊][{self.private_name}]将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}"
)
self.state = ConversationState.IGNORED
action_successful = True # 标记动作成功
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info(f"[私聊][{self.private_name}]等待更多信息...")
try:
# 检查 waiter 是否存在
if not hasattr(self, "waiter"):
logger.error(f"[私聊][{self.private_name}]Waiter 未初始化,无法等待。")
raise AttributeError("Waiter not initialized")
_timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # Wait 完成就算成功
except Exception as wait_err:
logger.error(f"[私聊][{self.private_name}]等待时出错: {wait_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
# --- 更新 Action History 状态 ---
# 只有当动作本身成功时,才更新状态为 done
if action_successful:
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 重置状态: 对于非回复类动作的成功,清除上次回复状态
if action not in ["direct_reply", "send_new_message"]:
self.conversation_info.last_successful_reply_action = None
logger.debug(f"[私聊][{self.private_name}]动作 {action} 成功完成,重置 last_successful_reply_action")
# 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action
elif action == "fetch_knowledge":
self.waiter.wait_accumulated_time = 0
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning(f"[私聊][{self.private_name}]没有生成回复内容,无法发送。")
return
self.state = ConversationState.FETCHING
knowledge = "TODO:知识"
topic = "TODO:关键词"
try:
_current_time = time.time()
reply_content = self.generated_reply
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
# 发送消息 (确保 direct_sender 和 chat_stream 有效)
if not hasattr(self, "direct_sender") or not self.direct_sender:
logger.error(f"[私聊][{self.private_name}]DirectMessageSender 未初始化,无法发送回复。")
return
if not self.chat_stream:
logger.error(f"[私聊][{self.private_name}]ChatStream 未初始化,无法发送回复。")
return
if knowledge:
if topic not in self.conversation_info.knowledge_list:
self.conversation_info.knowledge_list.append({"topic": topic, "knowledge": knowledge})
else:
self.conversation_info.knowledge_list[topic] += knowledge
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
elif action == "rethink_goal":
self.waiter.wait_accumulated_time = 0
# 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息
# 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持)
# 暂时注释掉,观察是否影响 ObservationInfo 的更新
# self.chat_observer.trigger_update()
# if not await self.chat_observer.wait_for_update():
# logger.warning(f"[私聊][{self.private_name}]等待 ChatObserver 更新完成超时")
self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
self.state = ConversationState.ANALYZING # 更新状态
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info("倾听对方发言...")
await self.waiter.wait_listening(conversation_info)
elif action == "end_conversation":
self.should_continue = False
logger.info("决定结束对话...")
else: # wait
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
await self.waiter.wait(self.conversation_info)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]发送消息或更新状态时失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}")
self.state = ConversationState.ANALYZING
async def _send_timeout_message(self):
"""发送超时结束消息"""
@@ -227,21 +695,4 @@ class Conversation:
chat_stream=self.chat_stream, content="TODO:超时消息", reply_to_message=latest_message
)
except Exception as e:
logger.error(f"发送超时消息失败: {str(e)}")
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning("没有生成回复")
return
try:
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=self.generated_reply)
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
self.state = ConversationState.ANALYZING
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
self.state = ConversationState.ANALYZING
logger.error(f"[私聊][{self.private_name}]发送超时消息失败: {str(e)}")

View File

@@ -1,6 +1,10 @@
from typing import Optional
class ConversationInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []
self.last_successful_reply_action: Optional[str] = None

View File

@@ -1,10 +1,14 @@
import time
from typing import Optional
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..chat.message import Message
from ..message.message_base import Seg
from maim_message import UserInfo, Seg
from src.plugins.chat.message import MessageSending, MessageSet
from src.plugins.chat.message_sender import message_manager
from ..storage.storage import MessageStorage
from ...config.config import global_config
logger = get_module_logger("message_sender")
@@ -12,8 +16,9 @@ logger = get_module_logger("message_sender")
class DirectMessageSender:
"""直接消息发送器"""
def __init__(self):
pass
def __init__(self, private_name: str):
self.private_name = private_name
self.storage = MessageStorage()
async def send_message(
self,
@@ -30,21 +35,44 @@ class DirectMessageSender:
"""
try:
# 创建消息内容
segments = [Seg(type="text", data={"text": content})]
segments = Seg(type="seglist", data=[Seg(type="text", data=content)])
# 检查是否需要引用回复
if reply_to_message:
reply_id = reply_to_message.message_id
message_sending = MessageSending(segments=segments, reply_to_id=reply_id)
else:
message_sending = MessageSending(segments=segments)
# 获取麦麦的信息
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=chat_stream.platform,
)
# 用当前时间作为message_id和之前那套sender一样
message_id = f"dm{round(time.time(), 2)}"
# 构建消息对象
message = MessageSending(
message_id=message_id,
chat_stream=chat_stream,
bot_user_info=bot_user_info,
sender_info=reply_to_message.message_info.user_info if reply_to_message else None,
message_segment=segments,
reply=reply_to_message,
is_head=True,
is_emoji=False,
thinking_start_time=time.time(),
)
# 处理消息
await message.process()
# 不知道有什么用先留下来了和之前那套sender一样
_message_json = message.to_dict()
# 发送消息
message_set = MessageSet(chat_stream, message_sending.message_id)
message_set.add_message(message_sending)
message_manager.add_message(message_set)
logger.info(f"PFC消息已发送: {content}")
message_set = MessageSet(chat_stream, message_id)
message_set.add_message(message)
await message_manager.add_message(message_set)
await self.storage.store_message(message, chat_stream)
logger.info(f"[私聊][{self.private_name}]PFC消息已发送: {content}")
except Exception as e:
logger.error(f"PFC消息发送失败: {str(e)}")
logger.error(f"[私聊][{self.private_name}]PFC消息发送失败: {str(e)}")
raise

View File

@@ -1,12 +1,12 @@
# Programmable Friendly Conversationalist
# Prefrontal cortex
from typing import List, Optional, Dict, Any, Set
from ..message.message_base import UserInfo
from maim_message import UserInfo
import time
from dataclasses import dataclass, field
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
from .chat_states import NotificationHandler, NotificationType
from .chat_states import NotificationHandler, NotificationType, Notification
from src.plugins.utils.chat_message_builder import build_readable_messages
import traceback # 导入 traceback 用于调试
logger = get_module_logger("observation_info")
@@ -14,187 +14,287 @@ logger = get_module_logger("observation_info")
class ObservationInfoHandler(NotificationHandler):
"""ObservationInfo的通知处理器"""
def __init__(self, observation_info: "ObservationInfo"):
def __init__(self, observation_info: "ObservationInfo", private_name: str):
"""初始化处理器
Args:
observation_info: 要更新的ObservationInfo实例
private_name: 私聊对象的名称,用于日志记录
"""
self.observation_info = observation_info
# 将 private_name 存储在 handler 实例中
self.private_name = private_name
async def handle_notification(self, notification):
async def handle_notification(self, notification: Notification): # 添加类型提示
# 获取通知类型和数据
notification_type = notification.type
data = notification.data
if notification_type == NotificationType.NEW_MESSAGE:
# 处理新消息通知
logger.debug(f"收到新消息通知data: {data}")
message_id = data.get("message_id")
processed_plain_text = data.get("processed_plain_text")
detailed_plain_text = data.get("detailed_plain_text")
user_info = data.get("user_info")
time_value = data.get("time")
try: # 添加错误处理块
if notification_type == NotificationType.NEW_MESSAGE:
# 处理新消息通知
# logger.debug(f"[私聊][{self.private_name}]收到新消息通知data: {data}") # 可以在需要时取消注释
message_id = data.get("message_id")
processed_plain_text = data.get("processed_plain_text")
detailed_plain_text = data.get("detailed_plain_text")
user_info_dict = data.get("user_info") # 先获取字典
time_value = data.get("time")
message = {
"message_id": message_id,
"processed_plain_text": processed_plain_text,
"detailed_plain_text": detailed_plain_text,
"user_info": user_info,
"time": time_value,
}
# 确保 user_info 是字典类型再创建 UserInfo 对象
user_info = None
if isinstance(user_info_dict, dict):
try:
user_info = UserInfo.from_dict(user_info_dict)
except Exception as e:
logger.error(
f"[私聊][{self.private_name}]从字典创建 UserInfo 时出错: {e}, 字典内容: {user_info_dict}"
)
# 可以选择在这里返回或记录错误,避免后续代码出错
return
elif user_info_dict is not None:
logger.warning(
f"[私聊][{self.private_name}]收到的 user_info 不是预期的字典类型: {type(user_info_dict)}"
)
# 根据需要处理非字典情况,这里暂时返回
return
self.observation_info.update_from_message(message)
message = {
"message_id": message_id,
"processed_plain_text": processed_plain_text,
"detailed_plain_text": detailed_plain_text,
"user_info": user_info_dict, # 存储原始字典或 UserInfo 对象,取决于你的 update_from_message 如何处理
"time": time_value,
}
# 传递 UserInfo 对象(如果成功创建)或原始字典
await self.observation_info.update_from_message(message, user_info) # 修改:传递 user_info 对象
elif notification_type == NotificationType.COLD_CHAT:
# 处理冷场通知
is_cold = data.get("is_cold", False)
self.observation_info.update_cold_chat_status(is_cold, time.time())
elif notification_type == NotificationType.COLD_CHAT:
# 处理冷场通知
is_cold = data.get("is_cold", False)
await self.observation_info.update_cold_chat_status(is_cold, time.time()) # 修改:改为 await 调用
elif notification_type == NotificationType.ACTIVE_CHAT:
# 处理活跃通知
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == NotificationType.ACTIVE_CHAT:
# 处理活跃通知 (通常由 COLD_CHAT 的反向状态处理)
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == NotificationType.BOT_SPEAKING:
# 处理机器人说话通知
self.observation_info.is_typing = False
self.observation_info.last_bot_speak_time = time.time()
elif notification_type == NotificationType.BOT_SPEAKING:
# 处理机器人说话通知 (按需实现)
self.observation_info.is_typing = False
self.observation_info.last_bot_speak_time = time.time()
elif notification_type == NotificationType.USER_SPEAKING:
# 处理用户说话通知
self.observation_info.is_typing = False
self.observation_info.last_user_speak_time = time.time()
elif notification_type == NotificationType.USER_SPEAKING:
# 处理用户说话通知
self.observation_info.is_typing = False
self.observation_info.last_user_speak_time = time.time()
elif notification_type == NotificationType.MESSAGE_DELETED:
# 处理消息删除通知
message_id = data.get("message_id")
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
]
elif notification_type == NotificationType.MESSAGE_DELETED:
# 处理消息删除通知
message_id = data.get("message_id")
# 从 unprocessed_messages 中移除被删除的消息
original_count = len(self.observation_info.unprocessed_messages)
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages if msg.get("message_id") != message_id
]
if len(self.observation_info.unprocessed_messages) < original_count:
logger.info(f"[私聊][{self.private_name}]移除了未处理的消息 (ID: {message_id})")
elif notification_type == NotificationType.USER_JOINED:
# 处理用户加入通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(user_id)
elif notification_type == NotificationType.USER_JOINED:
# 处理用户加入通知 (如果适用私聊场景)
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(str(user_id)) # 确保是字符串
elif notification_type == NotificationType.USER_LEFT:
# 处理用户离开通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(user_id)
elif notification_type == NotificationType.USER_LEFT:
# 处理用户离开通知 (如果适用私聊场景)
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(str(user_id)) # 确保是字符串
elif notification_type == NotificationType.ERROR:
# 处理错误通知
error_msg = data.get("error", "")
logger.error(f"收到错误通知: {error_msg}")
elif notification_type == NotificationType.ERROR:
# 处理错误通知
error_msg = data.get("error", "未提供错误信息")
logger.error(f"[私聊][{self.private_name}]收到错误通知: {error_msg}")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]处理通知时发生错误: {e}")
logger.error(traceback.format_exc()) # 打印详细堆栈信息
@dataclass
class ObservationInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
# --- 修改:添加 private_name 字段 ---
private_name: str = field(init=True) # 让 dataclass 的 __init__ 接收 private_name
# data_list
chat_history: List[str] = field(default_factory=list)
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list)
chat_history: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict
chat_history_str: str = ""
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list) # 修改:明确类型为 Dict
active_users: Set[str] = field(default_factory=set)
# data
last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None
last_message_time: Optional[float] = None
# 添加 last_message_id
last_message_id: Optional[str] = None
last_message_content: str = ""
last_message_sender: Optional[str] = None
bot_id: Optional[str] = None
chat_history_count: int = 0
new_messages_count: int = 0
cold_chat_duration: float = 0.0
cold_chat_start_time: Optional[float] = None # 用于计算冷场持续时间
cold_chat_duration: float = 0.0 # 缓存计算结果
# state
is_typing: bool = False
has_unread_messages: bool = False
is_typing: bool = False # 可能表示对方正在输入
# has_unread_messages: bool = False # 这个状态可以通过 new_messages_count > 0 判断
is_cold_chat: bool = False
changed: bool = False
changed: bool = False # 用于标记状态是否有变化,以便外部模块决定是否重新规划
# #spec
# #spec (暂时注释掉,如果不需要)
# meta_plan_trigger: bool = False
# --- 修改:移除 __post_init__ 的参数 ---
def __post_init__(self):
"""初始化后创建handler"""
self.chat_observer = None
self.handler = ObservationInfoHandler(self)
"""初始化后创建handler并进行必要的设置"""
self.chat_observer: Optional[ChatObserver] = None # 添加类型提示
self.handler = ObservationInfoHandler(self, self.private_name)
def bind_to_chat_observer(self, chat_observer: ChatObserver):
"""绑定到指定的chat_observer
Args:
stream_id: 聊天流ID
chat_observer: 要绑定的 ChatObserver 实例
"""
if self.chat_observer:
logger.warning(f"[私聊][{self.private_name}]尝试重复绑定 ChatObserver")
return
self.chat_observer = chat_observer
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
print("1919810------------------------绑定-----------------------------")
try:
# 注册关心的通知类型
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.register_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
# 可以根据需要注册更多通知类型
# self.chat_observer.notification_manager.register_handler(
# target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
# )
logger.info(f"[私聊][{self.private_name}]成功绑定到 ChatObserver")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]绑定到 ChatObserver 时出错: {e}")
self.chat_observer = None # 绑定失败,重置
def unbind_from_chat_observer(self):
"""解除与chat_observer的绑定"""
if self.chat_observer:
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
self.chat_observer = None
if self.chat_observer and hasattr(self.chat_observer, "notification_manager"): # 增加检查
try:
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.NEW_MESSAGE, handler=self.handler
)
self.chat_observer.notification_manager.unregister_handler(
target="observation_info", notification_type=NotificationType.COLD_CHAT, handler=self.handler
)
# 如果注册了其他类型,也要在这里注销
# self.chat_observer.notification_manager.unregister_handler(
# target="observation_info", notification_type=NotificationType.MESSAGE_DELETED, handler=self.handler
# )
logger.info(f"[私聊][{self.private_name}]成功从 ChatObserver 解绑")
except Exception as e:
logger.error(f"[私聊][{self.private_name}]从 ChatObserver 解绑时出错: {e}")
finally: # 确保 chat_observer 被重置
self.chat_observer = None
else:
logger.warning(f"[私聊][{self.private_name}]尝试解绑时 ChatObserver 不存在或无效")
def update_from_message(self, message: Dict[str, Any]):
# 修改:update_from_message 接收 UserInfo 对象
async def update_from_message(self, message: Dict[str, Any], user_info: Optional[UserInfo]):
"""从消息更新信息
Args:
message: 消息数据
message: 消息数据字典
user_info: 解析后的 UserInfo 对象 (可能为 None)
"""
# print("1919810-----------------------------------------------------")
# logger.debug(f"更新信息from_message: {message}")
self.last_message_time = message["time"]
self.last_message_id = message["message_id"]
message_time = message.get("time")
message_id = message.get("message_id")
processed_text = message.get("processed_plain_text", "")
self.last_message_content = message.get("processed_plain_text", "")
# 只有在新消息到达时才更新 last_message 相关信息
if message_time and message_time > (self.last_message_time or 0):
self.last_message_time = message_time
self.last_message_id = message_id
self.last_message_content = processed_text
# 重置冷场计时器
self.is_cold_chat = False
self.cold_chat_start_time = None
self.cold_chat_duration = 0.0
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info:
sender_id = str(user_info.user_id) # 确保是字符串
self.last_message_sender = sender_id
# 更新发言时间
if sender_id == self.bot_id:
self.last_bot_speak_time = message_time
else:
self.last_user_speak_time = message_time
self.active_users.add(sender_id) # 用户发言则认为其活跃
else:
logger.warning(
f"[私聊][{self.private_name}]处理消息更新时缺少有效的 UserInfo 对象, message_id: {message_id}"
)
self.last_message_sender = None # 发送者未知
if user_info.user_id == self.bot_id:
self.last_bot_speak_time = message["time"]
# 将原始消息字典添加到未处理列表
self.unprocessed_messages.append(message)
self.new_messages_count = len(self.unprocessed_messages) # 直接用列表长度
# logger.debug(f"[私聊][{self.private_name}]消息更新: last_time={self.last_message_time}, new_count={self.new_messages_count}")
self.update_changed() # 标记状态已改变
else:
self.last_user_speak_time = message["time"]
self.active_users.add(user_info.user_id)
self.new_messages_count += 1
self.unprocessed_messages.append(message)
self.update_changed()
# 如果消息时间戳不是最新的,可能不需要处理,或者记录一个警告
pass
# logger.warning(f"[私聊][{self.private_name}]收到过时或无效时间戳的消息: ID={message_id}, time={message_time}")
def update_changed(self):
"""更新changed状态"""
"""标记状态已改变,并重置标记"""
# logger.debug(f"[私聊][{self.private_name}]状态标记为已改变 (changed=True)")
self.changed = True
def update_cold_chat_status(self, is_cold: bool, current_time: float):
async def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态
Args:
is_cold: 是否冷场
current_time: 当前时间
is_cold: 是否处于冷场状态
current_time: 当前时间
"""
self.is_cold_chat = is_cold
if is_cold and self.last_message_time:
self.cold_chat_duration = current_time - self.last_message_time
if is_cold != self.is_cold_chat: # 仅在状态变化时更新
self.is_cold_chat = is_cold
if is_cold:
# 进入冷场状态
self.cold_chat_start_time = (
self.last_message_time or current_time
) # 从最后消息时间开始算,或从当前时间开始
logger.info(f"[私聊][{self.private_name}]进入冷场状态,开始时间: {self.cold_chat_start_time}")
else:
# 结束冷场状态
if self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
logger.info(f"[私聊][{self.private_name}]结束冷场状态,持续时间: {self.cold_chat_duration:.2f}")
self.cold_chat_start_time = None # 重置开始时间
self.update_changed() # 状态变化,标记改变
# 即使状态没变,如果是冷场状态,也更新持续时间
if self.is_cold_chat and self.cold_chat_start_time:
self.cold_chat_duration = current_time - self.cold_chat_start_time
def get_active_duration(self) -> float:
"""获取当前活跃时长
"""获取当前活跃时长 (距离最后一条消息的时间)
Returns:
float: 最后一条消息到现在的时长(秒)
@@ -204,7 +304,7 @@ class ObservationInfo:
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间
"""获取用户最后响应时间 (距离用户最后发言的时间)
Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
@@ -214,7 +314,7 @@ class ObservationInfo:
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间
"""获取机器人最后响应时间 (距离机器人最后发言的时间)
Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
@@ -223,13 +323,39 @@ class ObservationInfo:
return None
return time.time() - self.last_bot_speak_time
def clear_unprocessed_messages(self):
"""清空未处理消息列表"""
# 将未处理消息添加到历史记录中
for message in self.unprocessed_messages:
self.chat_history.append(message)
# 清空未处理消息列表
self.has_unread_messages = False
async def clear_unprocessed_messages(self):
"""未处理消息移入历史记录,并更新相关状态"""
if not self.unprocessed_messages:
return # 没有未处理消息,直接返回
# logger.debug(f"[私聊][{self.private_name}]处理 {len(self.unprocessed_messages)} 条未处理消息...")
# 将未处理消息添加到历史记录中 (确保历史记录有长度限制,避免无限增长)
max_history_len = 100 # 示例最多保留100条历史记录
self.chat_history.extend(self.unprocessed_messages)
if len(self.chat_history) > max_history_len:
self.chat_history = self.chat_history[-max_history_len:]
# 更新历史记录字符串 (只使用最近一部分生成例如20条)
history_slice_for_str = self.chat_history[-20:]
try:
self.chat_history_str = await build_readable_messages(
history_slice_for_str,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0, # read_mark 可能需要根据逻辑调整
)
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建聊天记录字符串时出错: {e}")
self.chat_history_str = "[构建聊天记录出错]" # 提供错误提示
# 清空未处理消息列表和计数
# cleared_count = len(self.unprocessed_messages)
self.unprocessed_messages.clear()
self.chat_history_count = len(self.chat_history)
self.new_messages_count = 0
# self.has_unread_messages = False # 这个状态可以通过 new_messages_count 判断
self.chat_history_count = len(self.chat_history) # 更新历史记录总数
# logger.debug(f"[私聊][{self.private_name}]已处理 {cleared_count} 条消息,当前历史记录 {self.chat_history_count} 条。")
self.update_changed() # 状态改变

View File

@@ -1,24 +1,13 @@
# Programmable Friendly Conversationalist
# Prefrontal cortex
import datetime
# import asyncio
from typing import List, Optional, Tuple, TYPE_CHECKING
from typing import List, Tuple, TYPE_CHECKING
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg
from ..chat.message import Message
from ..models.utils_model import LLM_request
from ..config.config import global_config
from src.plugins.chat.message import MessageSending
from ..message.api import global_api
from ..storage.storage import MessageStorage
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .conversation_info import ConversationInfo
from .observation_info import ObservationInfo
import time
from src.plugins.utils.chat_message_builder import build_readable_messages
if TYPE_CHECKING:
pass
@@ -29,15 +18,16 @@ logger = get_module_logger("pfc")
class GoalAnalyzer:
"""对话目标分析器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
def __init__(self, stream_id: str, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="conversation_goal"
)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.personality_info = Individuality.get_instance().get_prompt(x_person=2, level=3)
self.name = global_config.BOT_NICKNAME
self.nick_name = global_config.BOT_ALIAS_NAMES
self.chat_observer = ChatObserver.get_instance(stream_id)
self.private_name = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
# 多目标存储结构
self.goals = [] # 存储多个目标
@@ -58,16 +48,10 @@ class GoalAnalyzer:
goals_str = ""
if conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
# 处理字典或元组格式
if isinstance(goal_reason, tuple):
# 假设元组的第一个元素是目标,第二个元素是原因
goal = goal_reason[0]
reasoning = goal_reason[1] if len(goal_reason) > 1 else "没有明确原因"
elif isinstance(goal_reason, dict):
goal = goal_reason.get("goal")
if isinstance(goal_reason, dict):
goal = goal_reason.get("goal", "目标内容缺失")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
# 如果是其他类型,尝试转为字符串
goal = str(goal_reason)
reasoning = "没有明确原因"
@@ -79,29 +63,29 @@ class GoalAnalyzer:
goals_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
# 获取聊天历史记录
chat_history_list = observation_info.chat_history
chat_history_text = ""
for msg in chat_history_list:
chat_history_text += f"{msg}\n"
chat_history_text = observation_info.chat_history_str
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg}\n"
observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}"
# await observation_info.clear_unprocessed_messages()
persona_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_list = conversation_info.done_action
action_history_text = "你之前做的事情是:"
for action in action_history_list:
action_history_text += f"{action}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。
prompt = f"""{persona_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。
这些目标应该反映出对话的不同方面和意图。
{action_history_text}
@@ -124,27 +108,32 @@ class GoalAnalyzer:
输出格式示例:
[
{{
{{
"goal": "回答用户关于Python编程的具体问题",
"reasoning": "用户提出了关于Python的技术问题需要专业且准确的解答"
}},
{{
}},
{{
"goal": "回答用户关于python安装的具体问题",
"reasoning": "用户提出了关于Python的技术问题需要专业且准确的解答"
}}
}}
]"""
logger.debug(f"发送到LLM的提示词: {prompt}")
logger.debug(f"[私聊][{self.private_name}]发送到LLM的提示词: {prompt}")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
logger.debug(f"[私聊][{self.private_name}]LLM原始返回内容: {content}")
except Exception as e:
logger.error(f"分析对话目标时出错: {str(e)}")
logger.error(f"[私聊][{self.private_name}]分析对话目标时出错: {str(e)}")
content = ""
# 使用改进后的get_items_from_json函数处理JSON数组
success, result = get_items_from_json(
content, "goal", "reasoning", required_types={"goal": str, "reasoning": str}, allow_array=True
content,
self.private_name,
"goal",
"reasoning",
required_types={"goal": str, "reasoning": str},
allow_array=True,
)
if success:
@@ -153,9 +142,7 @@ class GoalAnalyzer:
# 清空现有目标列表并添加新目标
conversation_info.goal_list = []
for item in result:
goal = item.get("goal", "")
reasoning = item.get("reasoning", "")
conversation_info.goal_list.append((goal, reasoning))
conversation_info.goal_list.append(item)
# 返回第一个目标作为当前主要目标(如果有)
if result:
@@ -163,9 +150,7 @@ class GoalAnalyzer:
return (first_goal.get("goal", ""), "", first_goal.get("reasoning", ""))
else:
# 单个目标的情况
goal = result.get("goal", "")
reasoning = result.get("reasoning", "")
conversation_info.goal_list.append((goal, reasoning))
conversation_info.goal_list.append(result)
return (goal, "", reasoning)
# 如果解析失败,返回默认值
@@ -234,18 +219,19 @@ class GoalAnalyzer:
async def analyze_conversation(self, goal, reasoning):
messages = self.chat_observer.get_cached_messages()
chat_history_text = ""
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
chat_history_text = await build_readable_messages(
messages,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
personality_text = f"你的名字是{self.name}{self.personality_info}"
persona_text = f"你的名字是{self.name}{self.personality_info}"
# ===> Persona 文本构建结束 <===
prompt = f"""{personality_text}。现在你在参与一场QQ聊天
# --- 修改 Prompt 字符串,使用 persona_text ---
prompt = f"""{persona_text}。现在你在参与一场QQ聊天
当前对话目标:{goal}
产生该对话目标的原因:{reasoning}
@@ -266,11 +252,12 @@ class GoalAnalyzer:
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
logger.debug(f"[私聊][{self.private_name}]LLM原始返回内容: {content}")
# 尝试解析JSON
success, result = get_items_from_json(
content,
self.private_name,
"goal_achieved",
"stop_conversation",
"reason",
@@ -278,7 +265,7 @@ class GoalAnalyzer:
)
if not success:
logger.error("无法解析对话分析结果JSON")
logger.error(f"[私聊][{self.private_name}]无法解析对话分析结果JSON")
return False, False, "解析结果失败"
goal_achieved = result["goal_achieved"]
@@ -288,75 +275,67 @@ class GoalAnalyzer:
return goal_achieved, stop_conversation, reason
except Exception as e:
logger.error(f"分析对话状态时出错: {str(e)}")
logger.error(f"[私聊][{self.private_name}]分析对话状态时出错: {str(e)}")
return False, False, f"分析出错: {str(e)}"
class DirectMessageSender:
"""直接发送消息到平台的发送器"""
# 先注释掉,万一以后出问题了还能开回来(((
# class DirectMessageSender:
# """直接发送消息到平台的发送器"""
def __init__(self):
self.logger = get_module_logger("direct_sender")
self.storage = MessageStorage()
# def __init__(self, private_name: str):
# self.logger = get_module_logger("direct_sender")
# self.storage = MessageStorage()
# self.private_name = private_name
async def send_via_ws(self, message: MessageSending) -> None:
try:
await global_api.send_message(message)
except Exception as e:
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件") from e
# async def send_via_ws(self, message: MessageSending) -> None:
# try:
# await global_api.send_message(message)
# except Exception as e:
# raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件") from e
async def send_message(
self,
chat_stream: ChatStream,
content: str,
reply_to_message: Optional[Message] = None,
) -> None:
"""直接发送消息到平台
# async def send_message(
# self,
# chat_stream: ChatStream,
# content: str,
# reply_to_message: Optional[Message] = None,
# ) -> None:
# """直接发送消息到平台
Args:
chat_stream: 聊天流
content: 消息内容
reply_to_message: 要回复的消息
"""
# 构建消息对象
message_segment = Seg(type="text", data=content)
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=chat_stream.platform,
)
# Args:
# chat_stream: 聊天流
# content: 消息内容
# reply_to_message: 要回复的消息
# """
# # 构建消息对象
# message_segment = Seg(type="text", data=content)
# bot_user_info = UserInfo(
# user_id=global_config.BOT_QQ,
# user_nickname=global_config.BOT_NICKNAME,
# platform=chat_stream.platform,
# )
message = MessageSending(
message_id=f"dm{round(time.time(), 2)}",
chat_stream=chat_stream,
bot_user_info=bot_user_info,
sender_info=reply_to_message.message_info.user_info if reply_to_message else None,
message_segment=message_segment,
reply=reply_to_message,
is_head=True,
is_emoji=False,
thinking_start_time=time.time(),
)
# message = MessageSending(
# message_id=f"dm{round(time.time(), 2)}",
# chat_stream=chat_stream,
# bot_user_info=bot_user_info,
# sender_info=reply_to_message.message_info.user_info if reply_to_message else None,
# message_segment=message_segment,
# reply=reply_to_message,
# is_head=True,
# is_emoji=False,
# thinking_start_time=time.time(),
# )
# 处理消息
await message.process()
# # 处理消息
# await message.process()
message_json = message.to_dict()
# _message_json = message.to_dict()
# 发送消息
try:
end_point = global_config.api_urls.get(message.message_info.platform, None)
if end_point:
# logger.info(f"发送消息到{end_point}")
# logger.info(message_json)
try:
await global_api.send_message_REST(end_point, message_json)
except Exception as e:
logger.error(f"REST方式发送失败出现错误: {str(e)}")
logger.info("尝试使用ws发送")
await self.send_via_ws(message)
else:
await self.send_via_ws(message)
logger.success(f"PFC消息已发送: {content}")
except Exception as e:
logger.error(f"PFC消息发送失败: {str(e)}")
# # 发送消息
# try:
# await self.send_via_ws(message)
# await self.storage.store_message(message, chat_stream)
# logger.success(f"[私聊][{self.private_name}]PFC消息已发送: {content}")
# except Exception as e:
# logger.error(f"[私聊][{self.private_name}]PFC消息发送失败: {str(e)}")

View File

@@ -1,9 +1,11 @@
from typing import List, Tuple
from src.common.logger import get_module_logger
from src.plugins.memory_system.Hippocampus import HippocampusManager
from ..models.utils_model import LLM_request
from ..config.config import global_config
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from ..chat.message import Message
from ..knowledge.knowledge_lib import qa_manager
from ..utils.chat_message_builder import build_readable_messages
logger = get_module_logger("knowledge_fetcher")
@@ -11,13 +13,33 @@ logger = get_module_logger("knowledge_fetcher")
class KnowledgeFetcher:
"""知识调取器"""
def __init__(self):
self.llm = LLM_request(
def __init__(self, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=1000,
request_type="knowledge_fetch",
)
self.private_name = private_name
def _lpmm_get_knowledge(self, query: str) -> str:
"""获取相关知识
Args:
query: 查询内容
Returns:
str: 构造好的,带相关度的知识
"""
logger.debug(f"[私聊][{self.private_name}]正在从LPMM知识库中获取知识")
try:
knowledge_info = qa_manager.get_knowledge(query)
logger.debug(f"[私聊][{self.private_name}]LPMM知识库查询结果: {knowledge_info:150}")
return knowledge_info
except Exception as e:
logger.error(f"[私聊][{self.private_name}]LPMM知识库搜索工具执行失败: {str(e)}")
return "未找到匹配的知识"
async def fetch(self, query: str, chat_history: List[Message]) -> Tuple[str, str]:
"""获取相关知识
@@ -30,10 +52,13 @@ class KnowledgeFetcher:
Tuple[str, str]: (获取的知识, 知识来源)
"""
# 构建查询上下文
chat_history_text = ""
for msg in chat_history:
# sender = msg.message_info.user_info.user_nickname or f"用户{msg.message_info.user_info.user_id}"
chat_history_text += f"{msg.detailed_plain_text}\n"
chat_history_text = await build_readable_messages(
chat_history,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
# 从记忆中获取相关知识
related_memory = await HippocampusManager.get_instance().get_memory_from_text(
@@ -43,13 +68,18 @@ class KnowledgeFetcher:
max_depth=3,
fast_retrieval=False,
)
knowledge_text = ""
sources_text = "无记忆匹配" # 默认值
if related_memory:
knowledge = ""
sources = []
for memory in related_memory:
knowledge += memory[1] + "\n"
knowledge_text += memory[1] + "\n"
sources.append(f"记忆片段{memory[0]}")
return knowledge.strip(), "".join(sources)
knowledge_text = knowledge_text.strip()
sources_text = "".join(sources)
return "未找到相关知识", "无记忆匹配"
knowledge_text += "\n现在有以下**知识**可供参考:\n "
knowledge_text += self._lpmm_get_knowledge(query)
knowledge_text += "\n请记住这些**知识**,并根据**知识**回答问题。\n"
return knowledge_text or "未找到相关知识", sources_text or "无记忆匹配"

View File

@@ -1,3 +1,4 @@
import time
from typing import Dict, Optional
from src.common.logger import get_module_logger
from .conversation import Conversation
@@ -27,7 +28,7 @@ class PFCManager:
cls._instance = PFCManager()
return cls._instance
async def get_or_create_conversation(self, stream_id: str) -> Optional[Conversation]:
async def get_or_create_conversation(self, stream_id: str, private_name: str) -> Optional[Conversation]:
"""获取或创建对话实例
Args:
@@ -38,25 +39,41 @@ class PFCManager:
"""
# 检查是否已经有实例
if stream_id in self._initializing and self._initializing[stream_id]:
logger.debug(f"会话实例正在初始化中: {stream_id}")
logger.debug(f"[私聊][{private_name}]会话实例正在初始化中: {stream_id}")
return None
if stream_id in self._instances and self._instances[stream_id].should_continue:
logger.debug(f"使用现有会话实例: {stream_id}")
logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}")
return self._instances[stream_id]
if stream_id in self._instances:
instance = self._instances[stream_id]
if (
hasattr(instance, "ignore_until_timestamp")
and instance.ignore_until_timestamp
and time.time() < instance.ignore_until_timestamp
):
logger.debug(f"[私聊][{private_name}]会话实例当前处于忽略状态: {stream_id}")
# 返回 None 阻止交互。或者可以返回实例但标记它被忽略了喵?
# 还是返回 None 吧喵。
return None
# 检查 should_continue 状态
if instance.should_continue:
logger.debug(f"[私聊][{private_name}]使用现有会话实例: {stream_id}")
return instance
# else: 实例存在但不应继续
try:
# 创建新实例
logger.info(f"创建新的对话实例: {stream_id}")
logger.info(f"[私聊][{private_name}]创建新的对话实例: {stream_id}")
self._initializing[stream_id] = True
# 创建实例
conversation_instance = Conversation(stream_id)
conversation_instance = Conversation(stream_id, private_name)
self._instances[stream_id] = conversation_instance
# 启动实例初始化
await self._initialize_conversation(conversation_instance)
except Exception as e:
logger.error(f"创建会话实例失败: {stream_id}, 错误: {e}")
logger.error(f"[私聊][{private_name}]创建会话实例失败: {stream_id}, 错误: {e}")
return None
return conversation_instance
@@ -68,20 +85,21 @@ class PFCManager:
conversation: 要初始化的会话实例
"""
stream_id = conversation.stream_id
private_name = conversation.private_name
try:
logger.info(f"开始初始化会话实例: {stream_id}")
logger.info(f"[私聊][{private_name}]开始初始化会话实例: {stream_id}")
# 启动初始化流程
await conversation._initialize()
# 标记初始化完成
self._initializing[stream_id] = False
logger.info(f"会话实例 {stream_id} 初始化完成")
logger.info(f"[私聊][{private_name}]会话实例 {stream_id} 初始化完成")
except Exception as e:
logger.error(f"管理器初始化会话实例失败: {stream_id}, 错误: {e}")
logger.error(traceback.format_exc())
logger.error(f"[私聊][{private_name}]管理器初始化会话实例失败: {stream_id}, 错误: {e}")
logger.error(f"[私聊][{private_name}]{traceback.format_exc()}")
# 清理失败的初始化
async def get_conversation(self, stream_id: str) -> Optional[Conversation]:

View File

@@ -17,6 +17,7 @@ class ConversationState(Enum):
LISTENING = "倾听"
ENDED = "结束"
JUDGING = "判断"
IGNORED = "屏蔽"
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]

View File

@@ -8,6 +8,7 @@ logger = get_module_logger("pfc_utils")
def get_items_from_json(
content: str,
private_name: str,
*items: str,
default_values: Optional[Dict[str, Any]] = None,
required_types: Optional[Dict[str, type]] = None,
@@ -78,9 +79,9 @@ def get_items_from_json(
if valid_items:
return True, valid_items
except json.JSONDecodeError:
logger.debug("JSON数组解析失败尝试解析单个JSON对象")
logger.debug(f"[私聊][{private_name}]JSON数组解析失败尝试解析单个JSON对象")
except Exception as e:
logger.debug(f"尝试解析JSON数组时出错: {str(e)}")
logger.debug(f"[私聊][{private_name}]尝试解析JSON数组时出错: {str(e)}")
# 尝试解析JSON对象
try:
@@ -93,10 +94,10 @@ def get_items_from_json(
try:
json_data = json.loads(json_match.group())
except json.JSONDecodeError:
logger.error("提取的JSON内容解析失败")
logger.error(f"[私聊][{private_name}]提取的JSON内容解析失败")
return False, result
else:
logger.error("无法在返回内容中找到有效的JSON")
logger.error(f"[私聊][{private_name}]无法在返回内容中找到有效的JSON")
return False, result
# 提取字段
@@ -106,20 +107,20 @@ def get_items_from_json(
# 验证必需字段
if not all(item in result for item in items):
logger.error(f"JSON缺少必要字段实际内容: {json_data}")
logger.error(f"[私聊][{private_name}]JSON缺少必要字段实际内容: {json_data}")
return False, result
# 验证字段类型
if required_types:
for field, expected_type in required_types.items():
if field in result and not isinstance(result[field], expected_type):
logger.error(f"{field} 必须是 {expected_type.__name__} 类型")
logger.error(f"[私聊][{private_name}]{field} 必须是 {expected_type.__name__} 类型")
return False, result
# 验证字符串字段不为空
for field in items:
if isinstance(result[field], str) and not result[field].strip():
logger.error(f"{field} 不能为空")
logger.error(f"[私聊][{private_name}]{field} 不能为空")
return False, result
return True, result

View File

@@ -1,11 +1,10 @@
import json
import datetime
from typing import Tuple
from typing import Tuple, List, Dict, Any
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..config.config import global_config
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from ..message.message_base import UserInfo
from maim_message import UserInfo
logger = get_module_logger("reply_checker")
@@ -13,15 +12,18 @@ logger = get_module_logger("reply_checker")
class ReplyChecker:
"""回复检查器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="reply_check"
def __init__(self, stream_id: str, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_PFC_reply_checker, temperature=0.50, max_tokens=1000, request_type="reply_check"
)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
self.max_retries = 2 # 最大重试次数
self.private_name = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.max_retries = 3 # 最大重试次数
async def check(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]:
async def check(
self, reply: str, goal: str, chat_history: List[Dict[str, Any]], chat_history_text: str, retry_count: int = 0
) -> Tuple[bool, str, bool]:
"""检查生成的回复是否合适
Args:
@@ -32,42 +34,86 @@ class ReplyChecker:
Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
"""
# 获取最新的消息记录
messages = self.chat_observer.get_cached_messages(limit=5)
chat_history_text = ""
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
# 不再从 observer 获取,直接使用传入的 chat_history
# messages = self.chat_observer.get_cached_messages(limit=20)
try:
# 筛选出最近由 Bot 自己发送的消息
bot_messages = []
for msg in reversed(chat_history):
user_info = UserInfo.from_dict(msg.get("user_info", {}))
if str(user_info.user_id) == str(global_config.BOT_QQ): # 确保比较的是字符串
bot_messages.append(msg.get("processed_plain_text", ""))
if len(bot_messages) >= 2: # 只和最近的两条比较
break
# 进行比较
if bot_messages:
# 可以用简单比较,或者更复杂的相似度库 (如 difflib)
# 简单比较:是否完全相同
if reply == bot_messages[0]: # 和最近一条完全一样
logger.warning(
f"[私聊][{self.private_name}]ReplyChecker 检测到回复与上一条 Bot 消息完全相同: '{reply}'"
)
return (
False,
"被逻辑检查拒绝:回复内容与你上一条发言完全相同,可以选择深入话题或寻找其它话题或等待",
True,
) # 不合适,需要返回至决策层
# 2. 相似度检查 (如果精确匹配未通过)
import difflib # 导入 difflib 库
prompt = f"""请检查以下回复是否合适:
# 计算编辑距离相似度ratio() 返回 0 到 1 之间的浮点数
similarity_ratio = difflib.SequenceMatcher(None, reply, bot_messages[0]).ratio()
logger.debug(f"[私聊][{self.private_name}]ReplyChecker - 相似度: {similarity_ratio:.2f}")
# 设置一个相似度阈值
similarity_threshold = 0.9
if similarity_ratio > similarity_threshold:
logger.warning(
f"[私聊][{self.private_name}]ReplyChecker 检测到回复与上一条 Bot 消息高度相似 (相似度 {similarity_ratio:.2f}): '{reply}'"
)
return (
False,
f"被逻辑检查拒绝:回复内容与你上一条发言高度相似 (相似度 {similarity_ratio:.2f}),可以选择深入话题或寻找其它话题或等待。",
True,
)
except Exception as e:
import traceback
logger.error(f"[私聊][{self.private_name}]检查回复时出错: 类型={type(e)}, 值={e}")
logger.error(f"[私聊][{self.private_name}]{traceback.format_exc()}") # 打印详细的回溯信息
prompt = f"""你是一个聊天逻辑检查器,请检查以下回复或消息是否合适:
当前对话目标:{goal}
最新的对话记录:
{chat_history_text}
待检查的回复
待检查的消息
{reply}
请检查以下几点:
1. 回复是否依然符合当前对话目标和实现方式
2. 回复是否与最新的对话记录保持一致性
3. 回复是否重复发言,重复表达
4. 回复是否包含违法违规内容(政治敏感、暴力等)
5. 回复是否以你的角度发言,不要""说的话当做对方说的话,这是你自己说的话
结合聊天记录检查以下几点:
1. 这条消息是否依然符合当前对话目标和实现方式
2. 这条消息是否与最新的对话记录保持一致性
3. 是否存在重复发言,重复表达同质内容(尤其是只是换一种方式表达了相同的含义)
4. 这条消息是否包含违规内容(例如血腥暴力,政治敏感等)
5. 这条消息是否以发送者的角度发言不要让发送者自己回复自己的消息)
6. 这条消息是否通俗易懂
7. 这条消息是否有些多余例如在对方没有回复的情况下依然连续多次“消息轰炸”尤其是已经连续发送3条信息的情况这很可能不合理需要着重判断
8. 这条消息是否使用了完全没必要的修辞
9. 这条消息是否逻辑通顺
10. 这条消息是否太过冗长了通常私聊的每条消息长度在20字以内除非特殊情况
11. 在连续多次发送消息的情况下,这条消息是否衔接自然,会不会显得奇怪(例如连续两条消息中部分内容重叠)
请以JSON格式输出包含以下字段
1. suitable: 是否合适 (true/false)
2. reason: 原因说明
3. need_replan: 是否需要重新规划对话目标 (true/false),当发现当前对话目标不再适合时设为true
3. need_replan: 是否需要重新决策 (true/false),当你认为此时已经不适合发消息,需要规划其它行动时,设为true
输出格式示例:
{{
"suitable": true,
"reason": "回复符合要求,内容得体",
"reason": "回复符合要求,虽然有可能略微偏离目标,但是整体内容流畅得体",
"need_replan": false
}}
@@ -75,7 +121,7 @@ class ReplyChecker:
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"检查回复的原始返回: {content}")
logger.debug(f"[私聊][{self.private_name}]检查回复的原始返回: {content}")
# 清理内容尝试提取JSON部分
content = content.strip()
@@ -128,7 +174,7 @@ class ReplyChecker:
return suitable, reason, need_replan
except Exception as e:
logger.error(f"检查回复时出错: {e}")
logger.error(f"[私聊][{self.private_name}]检查回复时出错: {e}")
# 如果出错且已达到最大重试次数,建议重新规划
if retry_count >= self.max_retries:
return False, "多次检查失败,建议重新规划", True

View File

@@ -1,171 +1,228 @@
from typing import Tuple
from typing import Tuple, List, Dict, Any
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..config.config import global_config
from ..models.utils_model import LLMRequest
from ...config.config import global_config
from .chat_observer import ChatObserver
from .reply_checker import ReplyChecker
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from src.plugins.utils.chat_message_builder import build_readable_messages
logger = get_module_logger("reply_generator")
# --- 定义 Prompt 模板 ---
class ReplyGenerator:
"""回复生成器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=300,
request_type="reply_generation",
)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
self.reply_checker = ReplyChecker(stream_id)
async def generate(self, observation_info: ObservationInfo, conversation_info: ConversationInfo) -> str:
"""生成回复
Args:
goal: 对话目标
chat_history: 聊天历史
knowledge_cache: 知识缓存
previous_reply: 上一次生成的回复(如果有)
retry_count: 当前重试次数
Returns:
str: 生成的回复
"""
# 构建提示词
logger.debug(f"开始生成回复:当前目标: {conversation_info.goal_list}")
# 构建对话目标
goals_str = ""
if conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
# 处理字典或元组格式
if isinstance(goal_reason, tuple):
# 假设元组的第一个元素是目标,第二个元素是原因
goal = goal_reason[0]
reasoning = goal_reason[1] if len(goal_reason) > 1 else "没有明确原因"
elif isinstance(goal_reason, dict):
goal = goal_reason.get("goal")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
# 如果是其他类型,尝试转为字符串
goal = str(goal_reason)
reasoning = "没有明确原因"
goal_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
goals_str += goal_str
else:
goal = "目前没有明确对话目标"
reasoning = "目前没有明确对话目标,最好思考一个对话目标"
goals_str = f"目标:{goal},产生该对话目标的原因:{reasoning}\n"
# 获取聊天历史记录
chat_history_list = (
observation_info.chat_history[-20:]
if len(observation_info.chat_history) >= 20
else observation_info.chat_history
)
chat_history_text = ""
for msg in chat_history_list:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg.get('detailed_plain_text', '')}\n"
observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_list = (
conversation_info.done_action[-10:]
if len(conversation_info.done_action) >= 10
else conversation_info.done_action
)
action_history_text = "你之前做的事情是:"
for action in action_history_list:
if isinstance(action, dict):
action_type = action.get("action")
action_reason = action.get("reason")
action_status = action.get("status")
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
elif isinstance(action, tuple):
# 假设元组的格式是(action_type, action_reason, action_status)
action_type = action[0] if len(action) > 0 else "未知行动"
action_reason = action[1] if len(action) > 1 else "未知原因"
action_status = action[2] if len(action) > 2 else "done"
if action_status == "recall":
action_history_text += (
f"原本打算:{action_type},但是因为有新消息,你发现这个行动不合适,所以你没做\n"
)
elif action_status == "done":
action_history_text += f"你之前做了:{action_type},原因:{action_reason}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请根据以下信息生成回复
# Prompt for direct_reply (首次回复)
PROMPT_DIRECT_REPLY = """{persona_text}。现在你在参与一场QQ私聊请根据以下信息生成一条回复
当前对话目标:{goals_str}
{knowledge_info_str}
最近的聊天记录:
{chat_history_text}
请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该:
1. 符合对话目标,以""的角度发言
2. 体现你的性格特征
3. 自然流畅,像正常聊天一样,简短
4. 适当利用相关知识,但不要生硬引用
请根据上述信息,结合聊天记录,回复对方。该回复应该:
1. 符合对话目标,以""的角度发言(不要自己与自己对话!)
2. 符合你的性格特征和身份细节
3. 通俗易懂,自然流畅,像正常聊天一样,简短通常20字以内除非特殊情况
4. 可以适当利用相关知识,但不要生硬引用
5. 自然、得体,结合聊天记录逻辑合理,且没有重复表达同质内容
请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。
请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话
可以回复得自然随意自然一些,就像真人一样,注意把握聊天内容,整体风格可以平和、简短,不要刻意突出自身学科背景,不要说你说过的话,可以简短,多简短都可以,但是避免冗长。
请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。
请直接输出回复内容,不需要任何额外格式。"""
# Prompt for send_new_message (追问/补充)
PROMPT_SEND_NEW_MESSAGE = """{persona_text}。现在你在参与一场QQ私聊**刚刚你已经发送了一条或多条消息**,现在请根据以下信息再发一条新消息:
当前对话目标:{goals_str}
{knowledge_info_str}
最近的聊天记录:
{chat_history_text}
请根据上述信息,结合聊天记录,继续发一条新消息(例如对之前消息的补充,深入话题,或追问等等)。该消息应该:
1. 符合对话目标,以""的角度发言(不要自己与自己对话!)
2. 符合你的性格特征和身份细节
3. 通俗易懂自然流畅像正常聊天一样简短通常20字以内除非特殊情况
4. 可以适当利用相关知识,但不要生硬引用
5. 跟之前你发的消息自然的衔接,逻辑合理,且没有重复表达同质内容或部分重叠内容
请注意把握聊天内容,不用太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。
这条消息可以自然随意自然一些,就像真人一样,注意把握聊天内容,整体风格可以平和、简短,不要刻意突出自身学科背景,不要说你说过的话,可以简短,多简短都可以,但是避免冗长。
请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出消息内容。
不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。
请直接输出回复内容,不需要任何额外格式。"""
# Prompt for say_goodbye (告别语生成)
PROMPT_FAREWELL = """{persona_text}。你在参与一场 QQ 私聊,现在对话似乎已经结束,你决定再发一条最后的消息来圆满结束。
最近的聊天记录:
{chat_history_text}
请根据上述信息,结合聊天记录,构思一条**简短、自然、符合你人设**的最后的消息。
这条消息应该:
1. 从你自己的角度发言。
2. 符合你的性格特征和身份细节。
3. 通俗易懂,自然流畅,通常很简短。
4. 自然地为这场对话画上句号,避免开启新话题或显得冗长、刻意。
请像真人一样随意自然,**简洁是关键**。
不要输出多余内容包括前后缀、冒号、引号、括号、表情包、at或@等)。
请直接输出最终的告别消息内容,不需要任何额外格式。"""
class ReplyGenerator:
"""回复生成器"""
def __init__(self, stream_id: str, private_name: str):
self.llm = LLMRequest(
model=global_config.llm_PFC_chat,
temperature=global_config.llm_PFC_chat["temp"],
max_tokens=300,
request_type="reply_generation",
)
self.personality_info = Individuality.get_instance().get_prompt(x_person=2, level=3)
self.name = global_config.BOT_NICKNAME
self.private_name = private_name
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.reply_checker = ReplyChecker(stream_id, private_name)
# 修改 generate 方法签名,增加 action_type 参数
async def generate(
self, observation_info: ObservationInfo, conversation_info: ConversationInfo, action_type: str
) -> str:
"""生成回复
Args:
observation_info: 观察信息
conversation_info: 对话信息
action_type: 当前执行的动作类型 ('direct_reply''send_new_message')
Returns:
str: 生成的回复
"""
# 构建提示词
logger.debug(
f"[私聊][{self.private_name}]开始生成回复 (动作类型: {action_type}):当前目标: {conversation_info.goal_list}"
)
# --- 构建通用 Prompt 参数 ---
# (这部分逻辑基本不变)
# 构建对话目标 (goals_str)
goals_str = ""
if conversation_info.goal_list:
for goal_reason in conversation_info.goal_list:
if isinstance(goal_reason, dict):
goal = goal_reason.get("goal", "目标内容缺失")
reasoning = goal_reason.get("reasoning", "没有明确原因")
else:
goal = str(goal_reason)
reasoning = "没有明确原因"
goal = str(goal) if goal is not None else "目标内容缺失"
reasoning = str(reasoning) if reasoning is not None else "没有明确原因"
goals_str += f"- 目标:{goal}\n 原因:{reasoning}\n"
else:
goals_str = "- 目前没有明确对话目标\n" # 简化无目标情况
# --- 新增:构建知识信息字符串 ---
knowledge_info_str = "【供参考的相关知识和记忆】\n" # 稍微改下标题,表明是供参考
try:
# 检查 conversation_info 是否有 knowledge_list 并且不为空
if hasattr(conversation_info, "knowledge_list") and conversation_info.knowledge_list:
# 最多只显示最近的 5 条知识
recent_knowledge = conversation_info.knowledge_list[-5:]
for i, knowledge_item in enumerate(recent_knowledge):
if isinstance(knowledge_item, dict):
query = knowledge_item.get("query", "未知查询")
knowledge = knowledge_item.get("knowledge", "无知识内容")
source = knowledge_item.get("source", "未知来源")
# 只取知识内容的前 2000 个字
knowledge_snippet = knowledge[:2000] + "..." if len(knowledge) > 2000 else knowledge
knowledge_info_str += (
f"{i + 1}. 关于 '{query}' (来源: {source}): {knowledge_snippet}\n" # 格式微调,更简洁
)
else:
knowledge_info_str += f"{i + 1}. 发现一条格式不正确的知识记录。\n"
if not recent_knowledge:
knowledge_info_str += "- 暂无。\n" # 更简洁的提示
else:
knowledge_info_str += "- 暂无。\n"
except AttributeError:
logger.warning(f"[私聊][{self.private_name}]ConversationInfo 对象可能缺少 knowledge_list 属性。")
knowledge_info_str += "- 获取知识列表时出错。\n"
except Exception as e:
logger.error(f"[私聊][{self.private_name}]构建知识信息字符串时出错: {e}")
knowledge_info_str += "- 处理知识列表时出错。\n"
# 获取聊天历史记录 (chat_history_text)
chat_history_text = observation_info.chat_history_str
if observation_info.new_messages_count > 0 and observation_info.unprocessed_messages:
new_messages_list = observation_info.unprocessed_messages
new_messages_str = await build_readable_messages(
new_messages_list,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
)
chat_history_text += f"\n--- 以下是 {observation_info.new_messages_count} 条新消息 ---\n{new_messages_str}"
elif not chat_history_text:
chat_history_text = "还没有聊天记录。"
# 构建 Persona 文本 (persona_text)
persona_text = f"你的名字是{self.name}{self.personality_info}"
# --- 选择 Prompt ---
if action_type == "send_new_message":
prompt_template = PROMPT_SEND_NEW_MESSAGE
logger.info(f"[私聊][{self.private_name}]使用 PROMPT_SEND_NEW_MESSAGE (追问生成)")
elif action_type == "say_goodbye": # 处理告别动作
prompt_template = PROMPT_FAREWELL
logger.info(f"[私聊][{self.private_name}]使用 PROMPT_FAREWELL (告别语生成)")
else: # 默认使用 direct_reply 的 prompt (包括 'direct_reply' 或其他未明确处理的类型)
prompt_template = PROMPT_DIRECT_REPLY
logger.info(f"[私聊][{self.private_name}]使用 PROMPT_DIRECT_REPLY (首次/非连续回复生成)")
# --- 格式化最终的 Prompt ---
prompt = prompt_template.format(
persona_text=persona_text,
goals_str=goals_str,
chat_history_text=chat_history_text,
knowledge_info_str=knowledge_info_str,
)
# --- 调用 LLM 生成 ---
logger.debug(f"[私聊][{self.private_name}]发送到LLM的生成提示词:\n------\n{prompt}\n------")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.info(f"生成的回复: {content}")
# is_new = self.chat_observer.check()
# logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息")
# 如果有新消息,重新生成回复
# if is_new:
# logger.info("检测到新消息,重新生成回复")
# return await self.generate(
# goal, chat_history, knowledge_cache,
# None, retry_count
# )
logger.debug(f"[私聊][{self.private_name}]生成的回复: {content}")
# 移除旧的检查新消息逻辑,这应该由 conversation 控制流处理
return content
except Exception as e:
logger.error(f"生成回复时出错: {e}")
logger.error(f"[私聊][{self.private_name}]生成回复时出错: {e}")
return "抱歉,我现在有点混乱,让我重新思考一下..."
async def check_reply(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]:
# check_reply 方法保持不变
async def check_reply(
self, reply: str, goal: str, chat_history: List[Dict[str, Any]], chat_history_str: str, retry_count: int = 0
) -> Tuple[bool, str, bool]:
"""检查回复是否合适
Args:
reply: 生成的回复
goal: 对话目标
retry_count: 当前重试次数
Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
(此方法逻辑保持不变)
"""
return await self.reply_checker.check(reply, goal, retry_count)
return await self.reply_checker.check(reply, goal, chat_history, chat_history_str, retry_count)

View File

@@ -1,85 +1,79 @@
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
from .conversation_info import ConversationInfo
from src.individuality.individuality import Individuality
from ..config.config import global_config
# from src.individuality.individuality import Individuality # 不再需要
from ...config.config import global_config
import time
import asyncio
logger = get_module_logger("waiter")
# --- 在这里设定你想要的超时时间(秒) ---
# 例如: 120 秒 = 2 分钟
DESIRED_TIMEOUT_SECONDS = 300
class Waiter:
"""快 速 等 待"""
"""等待处理类"""
def __init__(self, stream_id: str):
self.chat_observer = ChatObserver.get_instance(stream_id)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
def __init__(self, stream_id: str, private_name: str):
self.chat_observer = ChatObserver.get_instance(stream_id, private_name)
self.name = global_config.BOT_NICKNAME
self.wait_accumulated_time = 0
self.private_name = private_name
# self.wait_accumulated_time = 0 # 不再需要累加计时
async def wait(self, conversation_info: ConversationInfo) -> bool:
"""等待
Returns:
bool: 是否超时True表示超时
"""
# 使用当前时间作为等待开始时间
"""等待用户新消息或超时"""
wait_start_time = time.time()
self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间
logger.info(f"[私聊][{self.private_name}]进入常规等待状态 (超时: {DESIRED_TIMEOUT_SECONDS} 秒)...")
while True:
# 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息")
return False
logger.info(f"[私聊][{self.private_name}]等待结束,收到新消息")
return False # 返回 False 表示不是超时
# 检查是否超时
if time.time() - wait_start_time > 300:
self.wait_accumulated_time += 300
logger.info("等待超过300秒结束对话")
elapsed_time = time.time() - wait_start_time
if elapsed_time > DESIRED_TIMEOUT_SECONDS:
logger.info(f"[私聊][{self.private_name}]等待超过 {DESIRED_TIMEOUT_SECONDS} 秒...添加思考目标。")
wait_goal = {
"goal": f"你等待了{self.wait_accumulated_time / 60}分钟,思考接下来要做什么",
"reason": "对方很久没有回复你的消息了",
"goal": f"你等待了{elapsed_time / 60:.1f}分钟,注意可能在对方看来聊天已经结束,思考接下来要做什么",
"reasoning": "对方很久没有回复你的消息了",
}
conversation_info.goal_list.append(wait_goal)
print(f"添加目标: {wait_goal}")
logger.info(f"[私聊][{self.private_name}]添加目标: {wait_goal}")
return True # 返回 True 表示超时
return True
await asyncio.sleep(1)
logger.info("等待中...")
await asyncio.sleep(5) # 每 5 秒检查一次
logger.debug(
f"[私聊][{self.private_name}]等待中..."
) # 可以考虑把这个频繁日志注释掉,只在超时或收到消息时输出
async def wait_listening(self, conversation_info: ConversationInfo) -> bool:
"""等待倾听
Returns:
bool: 是否超时True表示超时
"""
# 使用当前时间作为等待开始时间
"""倾听用户发言或超时"""
wait_start_time = time.time()
self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间
logger.info(f"[私聊][{self.private_name}]进入倾听等待状态 (超时: {DESIRED_TIMEOUT_SECONDS} 秒)...")
while True:
# 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息")
return False
logger.info(f"[私聊][{self.private_name}]倾听等待结束,收到新消息")
return False # 返回 False 表示不是超时
# 检查是否超时
if time.time() - wait_start_time > 300:
self.wait_accumulated_time += 300
logger.info("等待超过300秒结束对话")
elapsed_time = time.time() - wait_start_time
if elapsed_time > DESIRED_TIMEOUT_SECONDS:
logger.info(f"[私聊][{self.private_name}]倾听等待超过 {DESIRED_TIMEOUT_SECONDS} 秒...添加思考目标。")
wait_goal = {
"goal": f"你等待了{self.wait_accumulated_time / 60}分钟,思考接下来要做什么",
"reason": "对方话说一半消失了,很久没有回复",
# 保持 goal 文本一致
"goal": f"你等待了{elapsed_time / 60:.1f}分钟,对方似乎话说一半突然消失了,可能忙去了?也可能忘记了回复?要问问吗?还是结束对话?或继续等待?思考接下来要做什么",
"reasoning": "对方话说一半消失了,很久没有回复",
}
conversation_info.goal_list.append(wait_goal)
print(f"添加目标: {wait_goal}")
logger.info(f"[私聊][{self.private_name}]添加目标: {wait_goal}")
return True # 返回 True 表示超时
return True
await asyncio.sleep(1)
logger.info("等待中...")
await asyncio.sleep(5) # 每 5 秒检查一次
logger.debug(f"[私聊][{self.private_name}]倾听等待中...") # 同上,可以考虑注释掉