🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-04-24 03:17:06 +00:00
parent b92e0891a1
commit 49c4d77c97
7 changed files with 238 additions and 193 deletions

View File

@@ -75,34 +75,36 @@ class Conversation:
raise
try:
logger.info(f"{self.stream_id} 加载初始聊天记录...")
storage = MongoDBMessageStorage() # 创建存储实例
storage = MongoDBMessageStorage() # 创建存储实例
# 获取当前时间点之前最多 N 条消息 (比如 30 条)
# get_messages_before 返回的是按时间正序排列的列表
initial_messages = await storage.get_messages_before(
chat_id=self.stream_id,
time_point=time.time(),
limit=30 # 加载最近20条作为初始上下文可以调整
chat_id=self.stream_id,
time_point=time.time(),
limit=30, # 加载最近20条作为初始上下文可以调整
)
if initial_messages:
# 将加载的消息填充到 ObservationInfo 的 chat_history
self.observation_info.chat_history = initial_messages
self.observation_info.chat_history_count = len(initial_messages)
# 更新 ObservationInfo 中的时间戳等信息
last_msg = initial_messages[-1]
self.observation_info.last_message_time = last_msg.get('time')
self.observation_info.last_message_time = last_msg.get("time")
last_user_info = UserInfo.from_dict(last_msg.get("user_info", {}))
self.observation_info.last_message_sender = last_user_info.user_id
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
# (可选)可以遍历 initial_messages 来设置 last_bot_speak_time 和 last_user_speak_time
# 这里为了简化,只用了最后一条消息的时间,如果需要精确的发言者时间需要遍历
logger.info(f"成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}")
logger.info(
f"成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
)
# 让 ChatObserver 从加载的最后一条消息之后开始同步
self.chat_observer.last_message_time = self.observation_info.last_message_time
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
self.chat_observer.last_message_read = last_msg # 更新 observer 的最后读取记录
else:
logger.info("没有找到初始聊天记录。")
@@ -128,49 +130,52 @@ class Conversation:
try:
# --- 在规划前记录当前新消息数量 ---
initial_new_message_count = 0
if hasattr(self.observation_info, 'new_messages_count'):
if hasattr(self.observation_info, "new_messages_count"):
initial_new_message_count = self.observation_info.new_messages_count
else:
logger.warning("ObservationInfo missing 'new_messages_count' before planning.")
logger.warning("ObservationInfo missing 'new_messages_count' before planning.")
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(self.observation_info, self.conversation_info) # 注意plan 函数内部现在不应再调用 clear_unprocessed_messages
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info
) # 注意plan 函数内部现在不应再调用 clear_unprocessed_messages
# --- 规划后检查是否有 *更多* 新消息到达 ---
current_new_message_count = 0
if hasattr(self.observation_info, 'new_messages_count'):
current_new_message_count = self.observation_info.new_messages_count
if hasattr(self.observation_info, "new_messages_count"):
current_new_message_count = self.observation_info.new_messages_count
else:
logger.warning("ObservationInfo missing 'new_messages_count' after planning.")
logger.warning("ObservationInfo missing 'new_messages_count' after planning.")
if current_new_message_count > initial_new_message_count:
# 只有当规划期间消息数量 *增加* 了,才认为需要重新规划
logger.info(f"规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划")
await asyncio.sleep(0.1) # 短暂延时
continue # 跳过本次行动,重新规划
logger.info(
f"规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划"
)
await asyncio.sleep(0.1) # 短暂延时
continue # 跳过本次行动,重新规划
# --- 如果没有在规划期间收到更多新消息,则准备执行行动 ---
# --- 清理未处理消息:移到这里,在执行动作前 ---
# 只有当确实有新消息被 planner 看到,并且 action 是要处理它们的时候才清理
if initial_new_message_count > 0 and action == "direct_reply":
if hasattr(self.observation_info, 'clear_unprocessed_messages'):
# 确保 clear_unprocessed_messages 方法存在
logger.debug(f"准备执行 direct_reply清理 {initial_new_message_count} 条规划时已知的新消息。")
self.observation_info.clear_unprocessed_messages()
# 手动重置计数器,确保状态一致性(理想情况下 clear 方法会做这个)
if hasattr(self.observation_info, 'new_messages_count'):
self.observation_info.new_messages_count = 0
if hasattr(self.observation_info, "clear_unprocessed_messages"):
# 确保 clear_unprocessed_messages 方法存在
logger.debug(f"准备执行 direct_reply清理 {initial_new_message_count} 条规划时已知的新消息。")
self.observation_info.clear_unprocessed_messages()
# 手动重置计数器,确保状态一致性(理想情况下 clear 方法会做这个)
if hasattr(self.observation_info, "new_messages_count"):
self.observation_info.new_messages_count = 0
else:
logger.error("无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!")
# 这里可能需要考虑是否继续执行 action或者抛出错误
logger.error("无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!")
# 这里可能需要考虑是否继续执行 action或者抛出错误
# --- 执行行动 ---
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
goal_ended = False
if hasattr(self.conversation_info, 'goal_list') and self.conversation_info.goal_list:
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
for goal in self.conversation_info.goal_list:
if isinstance(goal, tuple) and len(goal) > 0 and goal[0] == "结束对话":
goal_ended = True
@@ -185,15 +190,15 @@ class Conversation:
# break # 可以选择在这里直接跳出循环
except Exception as loop_err:
logger.error(f"PFC主循环出错: {loop_err}")
logger.error(traceback.format_exc())
# 发生严重错误时可以考虑停止,或者至少等待一下再继续
await asyncio.sleep(1) # 发生错误时等待1秒
#添加短暂的异步睡眠
if self.should_continue: # 只有在还需要继续循环时才 sleep
await asyncio.sleep(0.1) # 等待 0.1 秒,给其他任务执行时间
logger.error(f"PFC主循环出错: {loop_err}")
logger.error(traceback.format_exc())
# 发生严重错误时可以考虑停止,或者至少等待一下再继续
await asyncio.sleep(1) # 发生错误时等待1秒
# 添加短暂的异步睡眠
if self.should_continue: # 只有在还需要继续循环时才 sleep
await asyncio.sleep(0.1) # 等待 0.1 秒,给其他任务执行时间
logger.info(f"PFC 循环结束 for stream_id: {self.stream_id}") # 添加日志表明循环正常结束
logger.info(f"PFC 循环结束 for stream_id: {self.stream_id}") # 添加日志表明循环正常结束
def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息"""
@@ -226,16 +231,16 @@ class Conversation:
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo
):
"""处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史先设置为start完成后再设置为done (这个 update 移到后面执行成功后再做)
current_action_record = {
"action": action,
"plan_reason": reason, #使用 plan_reason 存储规划原因
"status": "start", # 初始状态为 start
"plan_reason": reason, # 使用 plan_reason 存储规划原因
"status": "start", # 初始状态为 start
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None
"final_reason": None,
}
conversation_info.done_action.append(current_action_record)
# 获取刚刚添加记录的索引,方便后面更新状态
@@ -244,33 +249,33 @@ class Conversation:
# --- 根据不同的 action 执行 ---
if action == "direct_reply":
# --- 这个 if 块内部的所有代码都需要正确缩进 ---
self.waiter.wait_accumulated_time = 0 # 重置等待时间
self.waiter.wait_accumulated_time = 0 # 重置等待时间
self.state = ConversationState.GENERATING
# 生成回复
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info)
logger.info(f"生成回复: {self.generated_reply}") # 使用 logger
logger.info(f"生成回复: {self.generated_reply}") # 使用 logger
# --- 调用 ReplyChecker 检查回复 ---
is_suitable = False # 先假定不合适,检查通过再改为 True
check_reason = "检查未执行" # 用不同的变量名存储检查原因
is_suitable = False # 先假定不合适,检查通过再改为 True
check_reason = "检查未执行" # 用不同的变量名存储检查原因
need_replan = False
try:
# 尝试获取当前主要目标
current_goal_str = conversation_info.goal_list[0][0] if conversation_info.goal_list else ""
# 调用检查器
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history, # 传入最新的历史记录!
retry_count=0
chat_history=observation_info.chat_history, # 传入最新的历史记录!
retry_count=0,
)
logger.info(f"回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}")
except Exception as check_err:
logger.error(f"调用 ReplyChecker 时出错: {check_err}")
check_reason = f"检查过程出错: {check_err}" # 记录错误原因
check_reason = f"检查过程出错: {check_err}" # 记录错误原因
# is_suitable 保持 False
# --- 处理检查结果 ---
@@ -280,38 +285,44 @@ class Conversation:
if self._check_new_messages_after_planning():
logger.info("检查到新消息,取消发送已生成的回复,重新规划行动")
# 更新 action 状态为 recall
conversation_info.done_action[action_index].update({
"status": "recall",
"reason": f"有新消息,取消发送: {self.generated_reply}", # 更新原因
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
return None # 退出 _handle_action
conversation_info.done_action[action_index].update(
{
"status": "recall",
"reason": f"有新消息,取消发送: {self.generated_reply}", # 更新原因
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
return None # 退出 _handle_action
# 发送回复
await self._send_reply() # 这个函数内部会处理自己的错误
await self._send_reply() # 这个函数内部会处理自己的错误
# 更新 action 历史状态为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
else:
# 回复不合适
logger.warning(f"生成的回复被 ReplyChecker 拒绝: '{self.generated_reply}'. 原因: {check_reason}")
# 更新 action 状态为 recall (因为没执行发送)
conversation_info.done_action[action_index].update({
"status": "recall",
"final_reason": check_reason,
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "recall",
"final_reason": check_reason,
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 如果检查器建议重新规划
if need_replan:
logger.info("ReplyChecker 建议重新规划目标。")
# 可选:在此处清空目标列表以强制重新规划
# conversation_info.goal_list = []
# conversation_info.goal_list = []
# 注意:不发送消息,也不执行后面的代码
# --- 之前重复的代码块已被删除 ---
@@ -323,22 +334,26 @@ class Conversation:
topic = "TODO:关键词"
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
if knowledge:
pass # 简单处理
pass # 简单处理
# 标记 action 为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
elif action == "rethink_goal":
self.waiter.wait_accumulated_time = 0
self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
# 标记 action 为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
elif action == "listening":
self.state = ConversationState.LISTENING
@@ -347,31 +362,36 @@ class Conversation:
# listening 和 wait 通常在完成后不需要标记为 done因为它们是持续状态
# 但如果需要记录,可以在 waiter 返回后标记。目前逻辑是 waiter 返回后主循环继续。
# 为了统一,可以暂时在这里也标记一下(或者都不标记)
conversation_info.done_action[action_index].update({
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
elif action == "end_conversation":
self.should_continue = False # 设置循环停止标志
self.should_continue = False # 设置循环停止标志
logger.info("决定结束对话...")
# 标记 action 为 done
conversation_info.done_action[action_index].update({
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 这里不需要 return主循环会在下一轮检查 should_continue
else: # 对应 'wait' 动作
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
await self.waiter.wait(self.conversation_info)
# 同 listening可以考虑是否标记状态
conversation_info.done_action[action_index].update({
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
})
conversation_info.done_action[action_index].update(
{
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
async def _send_timeout_message(self):
"""发送超时结束消息"""
@@ -395,35 +415,35 @@ class Conversation:
try:
# 外层 try: 捕获发送消息和后续处理中的主要错误
current_time = time.time() # 获取当前时间戳
reply_content = self.generated_reply # 获取要发送的内容
current_time = time.time() # 获取当前时间戳
reply_content = self.generated_reply # 获取要发送的内容
# 发送消息
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
logger.info(f"消息已发送: {reply_content}") # 可以在发送后加个日志确认
logger.info(f"消息已发送: {reply_content}") # 可以在发送后加个日志确认
# --- 添加的立即更新状态逻辑开始 ---
try:
# 内层 try: 专门捕获手动更新状态时可能出现的错误
# 创建一个代表刚刚发送的消息的字典
bot_message_info = {
"message_id": f"bot_sent_{current_time}", # 创建一个简单的唯一ID
"message_id": f"bot_sent_{current_time}", # 创建一个简单的唯一ID
"time": current_time,
"user_info": UserInfo( # 使用 UserInfo 类构建用户信息
user_id=str(global_config.BOT_QQ),
user_nickname=global_config.BOT_NICKNAME,
platform=self.chat_stream.platform # 从 chat_stream 获取平台信息
).to_dict(), # 转换为字典格式存储
"processed_plain_text": reply_content, # 使用发送的内容
"detailed_plain_text": f"{int(current_time)},{global_config.BOT_NICKNAME}:{reply_content}", # 构造一个简单的详细文本, 时间戳取整
"user_info": UserInfo( # 使用 UserInfo 类构建用户信息
user_id=str(global_config.BOT_QQ),
user_nickname=global_config.BOT_NICKNAME,
platform=self.chat_stream.platform, # 从 chat_stream 获取平台信息
).to_dict(), # 转换为字典格式存储
"processed_plain_text": reply_content, # 使用发送的内容
"detailed_plain_text": f"{int(current_time)},{global_config.BOT_NICKNAME}:{reply_content}", # 构造一个简单的详细文本, 时间戳取整
# 可以根据需要添加其他字段,保持与 observation_info.chat_history 中其他消息结构一致
}
# 直接更新 ObservationInfo 实例
if self.observation_info:
self.observation_info.chat_history.append(bot_message_info) # 将消息添加到历史记录末尾
self.observation_info.last_bot_speak_time = current_time # 更新 Bot 最后发言时间
self.observation_info.last_message_time = current_time # 更新最后消息时间
self.observation_info.chat_history.append(bot_message_info) # 将消息添加到历史记录末尾
self.observation_info.last_bot_speak_time = current_time # 更新 Bot 最后发言时间
self.observation_info.last_message_time = current_time # 更新最后消息时间
logger.debug("已手动将Bot发送的消息添加到 ObservationInfo")
else:
logger.warning("无法手动更新 ObservationInfo实例不存在")
@@ -432,15 +452,14 @@ class Conversation:
logger.error(f"手动更新 ObservationInfo 时出错: {update_err}")
# --- 添加的立即更新状态逻辑结束 ---
# 原有的触发更新和等待代码
self.chat_observer.trigger_update()
if not await self.chat_observer.wait_for_update():
logger.warning("等待 ChatObserver 更新完成超时")
self.state = ConversationState.ANALYZING # 更新对话状态
self.state = ConversationState.ANALYZING # 更新对话状态
except Exception as e:
# 这是外层 try 对应的 except
logger.error(f"发送消息或更新状态时失败: {str(e)}")
self.state = ConversationState.ANALYZING # 出错也要尝试恢复状态
self.state = ConversationState.ANALYZING # 出错也要尝试恢复状态