Merge branch 'FPC-test' of https://github.com/smartmita/MaiBot into PFC-fix

This commit is contained in:
Bakadax
2025-04-27 17:44:29 +08:00
4 changed files with 525 additions and 360 deletions

View File

@@ -13,7 +13,7 @@ from .pfc import ChatObserver, GoalAnalyzer, DirectMessageSender
from src.common.logger import get_module_logger
from .action_planner import ActionPlanner
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .conversation_info import ConversationInfo # 确保导入 ConversationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from maim_message import UserInfo
@@ -104,9 +104,6 @@ class Conversation:
self.observation_info.last_message_sender = last_user_info.user_id
self.observation_info.last_message_content = last_msg.get("processed_plain_text", "")
# (可选)可以遍历 initial_messages 来设置 last_bot_speak_time 和 last_user_speak_time
# 这里为了简化,只用了最后一条消息的时间,如果需要精确的发言者时间需要遍历
logger.info(
f"成功加载 {len(initial_messages)} 条初始聊天记录。最后一条消息时间: {self.observation_info.last_message_time}"
)
@@ -136,18 +133,15 @@ class Conversation:
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
while self.should_continue:
# 忽略逻辑
if self.ignore_until_timestamp and time.time() < self.ignore_until_timestamp:
# 仍在忽略期间,等待下次检查
await asyncio.sleep(30) # 每 30 秒检查一次
continue # 跳过本轮循环的剩余部分
await asyncio.sleep(30)
continue
elif self.ignore_until_timestamp and time.time() >= self.ignore_until_timestamp:
# 忽略期结束,现在正常地结束对话
logger.info(f"忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None # 清除时间戳
self.should_continue = False # 现在停止循环
# (可选)在这里记录一个 'end_conversation' 动作
# 或者确保管理器会基于 should_continue 为 False 来清理它
continue # 跳过本轮循环的剩余部分,让它终止
logger.info(f"忽略时间已到 {self.stream_id},准备结束对话。")
self.ignore_until_timestamp = None
self.should_continue = False
continue
try:
# --- 在规划前记录当前新消息数量 ---
initial_new_message_count = 0
@@ -156,10 +150,13 @@ class Conversation:
else:
logger.warning("ObservationInfo missing 'new_messages_count' before planning.")
# 使用决策信息来辅助行动规划
# --- 调用 Action Planner ---
# 传递 self.conversation_info.last_successful_reply_action
action, reason = await self.action_planner.plan(
self.observation_info, self.conversation_info
) # 注意plan 函数内部现在不应再调用 clear_unprocessed_messages
self.observation_info,
self.conversation_info,
self.conversation_info.last_successful_reply_action
)
# --- 规划后检查是否有 *更多* 新消息到达 ---
current_new_message_count = 0
@@ -169,84 +166,101 @@ class Conversation:
logger.warning("ObservationInfo missing 'new_messages_count' after planning.")
if current_new_message_count > initial_new_message_count:
# 只有当规划期间消息数量 *增加* 了,才认为需要重新规划
logger.info(
f"规划期间发现新增消息 ({initial_new_message_count} -> {current_new_message_count}),跳过本次行动,重新规划"
)
await asyncio.sleep(0.1) # 短暂延时
continue # 跳过本次行动,重新规划
# 如果规划期间有新消息,也应该重置上次回复状态,因为现在要响应新消息了
self.conversation_info.last_successful_reply_action = None
await asyncio.sleep(0.1)
continue
# --- 如果没有在规划期间收到更多新消息,则准备执行行动 ---
# --- 清理未处理消息:移到这里,在执行动作前 ---
# 只有当确实有新消息被 planner 看到,并且 action 是要处理它们的时候才清理
if initial_new_message_count > 0 and action == "direct_reply":
# 包含 send_new_message
if initial_new_message_count > 0 and action in ["direct_reply", "send_new_message"]:
if hasattr(self.observation_info, "clear_unprocessed_messages"):
# 确保 clear_unprocessed_messages 方法存在
logger.debug(f"准备执行 direct_reply清理 {initial_new_message_count} 条规划时已知的新消息。")
logger.debug(f"准备执行 {action},清理 {initial_new_message_count} 条规划时已知的新消息。")
await self.observation_info.clear_unprocessed_messages()
# 手动重置计数器,确保状态一致性(理想情况下 clear 方法会做这个)
if hasattr(self.observation_info, "new_messages_count"):
self.observation_info.new_messages_count = 0
else:
logger.error("无法清理未处理消息: ObservationInfo 缺少 clear_unprocessed_messages 方法!")
# 这里可能需要考虑是否继续执行 action或者抛出错误
# --- 执行行动 ---
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
# 检查是否需要结束对话 (逻辑不变)
goal_ended = False
if hasattr(self.conversation_info, "goal_list") and self.conversation_info.goal_list:
for goal in self.conversation_info.goal_list:
if isinstance(goal, tuple) and len(goal) > 0 and goal[0] == "结束对话":
goal_ended = True
break
elif isinstance(goal, dict) and goal.get("goal") == "结束对话":
for goal_item in self.conversation_info.goal_list:
current_goal = None
if isinstance(goal_item, tuple) and len(goal_item) > 0:
current_goal = goal_item[0]
elif isinstance(goal_item, dict):
current_goal = goal_item.get("goal")
if current_goal == "结束对话":
goal_ended = True
break
if goal_ended:
self.should_continue = False
logger.info("检测到'结束对话'目标,停止循环。")
# break # 可以选择在这里直接跳出循环
except Exception as loop_err:
logger.error(f"PFC主循环出错: {loop_err}")
logger.error(traceback.format_exc())
# 发生严重错误时可以考虑停止,或者至少等待一下再继续
await asyncio.sleep(1) # 发生错误时等待1秒
# 添加短暂的异步睡眠
if self.should_continue: # 只有在还需要继续循环时才 sleep
await asyncio.sleep(0.1) # 等待 0.1 秒,给其他任务执行时间
await asyncio.sleep(1)
logger.info(f"PFC 循环结束 for stream_id: {self.stream_id}") # 添加日志表明循环正常结束
if self.should_continue:
await asyncio.sleep(0.1)
logger.info(f"PFC 循环结束 for stream_id: {self.stream_id}")
def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息"""
# 检查 ObservationInfo 是否已初始化并且有 new_messages_count 属性
if not hasattr(self, 'observation_info') or not hasattr(self.observation_info, 'new_messages_count'):
logger.warning("ObservationInfo 未初始化或缺少 'new_messages_count' 属性,无法检查新消息。")
return False # 或者根据需要抛出错误
if self.observation_info.new_messages_count > 0:
logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动")
# 如果需要,可以在这里添加逻辑来根据新消息重新决定行动
logger.info(f"生成/执行动作期间收到 {self.observation_info.new_messages_count} 条新消息,取消当前动作并重新规划")
# 如果有新消息,也应该重置上次回复状态
if hasattr(self, 'conversation_info'): # 确保 conversation_info 已初始化
self.conversation_info.last_successful_reply_action = None
else:
logger.warning("ConversationInfo 未初始化,无法重置 last_successful_reply_action。")
return True
return False
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
chat_info = msg_dict.get("chat_info", {})
chat_stream = ChatStream.from_dict(chat_info)
# 尝试从 msg_dict 直接获取 chat_stream如果失败则从全局 chat_manager 获取
chat_info = msg_dict.get("chat_info")
if chat_info and isinstance(chat_info, dict):
chat_stream = ChatStream.from_dict(chat_info)
elif self.chat_stream: # 使用实例变量中的 chat_stream
chat_stream = self.chat_stream
else: # Fallback: 尝试从 manager 获取 (可能需要 stream_id)
chat_stream = chat_manager.get_stream(self.stream_id)
if not chat_stream:
raise ValueError(f"无法确定 ChatStream for stream_id {self.stream_id}")
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict["message_id"],
chat_stream=chat_stream,
time=msg_dict["time"],
message_id=msg_dict.get("message_id", f"gen_{time.time()}"), # 提供默认 ID
chat_stream=chat_stream, # 使用确定的 chat_stream
time=msg_dict.get("time", time.time()), # 提供默认时间
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"转换消息时出错: {e}")
raise
# 可以选择返回 None 或重新抛出异常,这里选择重新抛出以指示问题
raise ValueError(f"无法将字典转换为 Message 对象: {e}") from e
async def _handle_action(
self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo
@@ -255,21 +269,27 @@ class Conversation:
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史先设置为start完成后再设置为done (这个 update 移到后面执行成功后再做)
# 记录action历史 (逻辑不变)
current_action_record = {
"action": action,
"plan_reason": reason, # 使用 plan_reason 存储规划原因
"status": "start", # 初始状态为 start
"plan_reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
# 确保 done_action 列表存在
if not hasattr(conversation_info, 'done_action'):
conversation_info.done_action = []
conversation_info.done_action.append(current_action_record)
# 获取刚刚添加记录的索引,方便后面更新状态
action_index = len(conversation_info.done_action) - 1
action_successful = False # 用于标记动作是否成功完成
# --- 根据不同的 action 执行 ---
if action == "direct_reply":
max_reply_attempts = 3 # 设置最大尝试次数(与 reply_checker.py 中的 max_retries 保持一致或稍大)
# send_new_message 失败后执行 wait
if action == "send_new_message":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
@@ -278,179 +298,301 @@ class Conversation:
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(f"尝试生成回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)...")
logger.info(f"尝试生成追问回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)...")
self.state = ConversationState.GENERATING
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info)
logger.info(f"{reply_attempt_count} 次生成的回复: {self.generated_reply}")
# 1. 生成回复 (调用 generate 时传入 action_type)
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info, action_type='send_new_message')
logger.info(f"{reply_attempt_count} 次生成的追问回复: {self.generated_reply}")
# 2. 检查回复
# 2. 检查回复 (逻辑不变)
self.state = ConversationState.CHECKING
try:
current_goal_str = conversation_info.goal_list[0]["goal"] if conversation_info.goal_list else ""
# 注意:这里传递的是 reply_attempt_count - 1 作为 retry_count 给 checker
current_goal_str = conversation_info.goal_list[0][0] if conversation_info.goal_list else ""
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1, # 传递当前尝试次数从0开始计数
retry_count=reply_attempt_count - 1,
)
logger.info(
f"{reply_attempt_count} 次检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
f"{reply_attempt_count}追问检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
if is_suitable:
final_reply_to_send = self.generated_reply # 保存合适的回复
break # 回复合适,跳出循环
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(f"{reply_attempt_count} 次检查建议重新规划,停止尝试。原因: {check_reason}")
break # 如果检查器建议重新规划,也停止尝试
# 如果不合适但不需要重新规划,循环会继续进行下一次尝试
logger.warning(f"{reply_attempt_count}追问检查建议重新规划,停止尝试。原因: {check_reason}")
break
except Exception as check_err:
logger.error(f"{reply_attempt_count} 次调用 ReplyChecker 时出错: {check_err}")
logger.error(f"{reply_attempt_count} 次调用 ReplyChecker (追问) 时出错: {check_err}")
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
# 如果检查本身出错,可以选择跳出循环或继续尝试
# 这里选择跳出循环,避免无限循环在检查错误上
break
# 循环结束,处理最终结果
if is_suitable:
# 回复合适且已保存在 final_reply_to_send 中
# 检查是否有新消息进来 (在所有尝试结束后再检查一次)
# 检查是否有新消息
if self._check_new_messages_after_planning():
logger.info("生成回复期间收到新消息,取消发送,重新规划行动")
logger.info("生成追问回复期间收到新消息,取消发送,重新规划行动")
conversation_info.done_action[action_index].update(
{
"status": "recall",
"final_reason": f"有新消息,取消发送: {final_reply_to_send}",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
{"status": "recall", "final_reason": f"有新消息,取消发送追问: {final_reply_to_send}"}
)
# 这里直接返回,不执行后续发送和wait
return
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send # 确保 self.generated_reply 是最终要发送的内容
await self._send_reply()
self.generated_reply = final_reply_to_send
# --- 在这里调用 _send_reply ---
await self._send_reply() # <--- 调用恢复后的函数
# 更新 action 历史状态为 done
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 更新状态: 标记上次成功是 send_new_message
self.conversation_info.last_successful_reply_action = 'send_new_message'
action_successful = True # 标记动作成功
else:
# 循环结束但没有找到合适的回复(达到最大次数或检查出错/建议重规划)
logger.warning(f"经过 {reply_attempt_count} 次尝试,未能生成合适的回复。最终原因: {check_reason}")
# 追问失败
logger.warning(f"经过 {reply_attempt_count} 次尝试,未能生成合适的追问回复。最终原因: {check_reason}")
conversation_info.done_action[action_index].update(
{
"status": "recall", # 标记为 recall 因为没有成功发送
"final_reason": f"尝试{reply_attempt_count}次后失败: {check_reason}",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
{"status": "recall", "final_reason": f"追问尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 追问失败,下次用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 执行 Wait 操作
logger.info("由于无法生成合适回复,执行 'wait' 操作...")
logger.info("由于无法生成合适追问回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
await self.waiter.wait(self.conversation_info)
wait_action_record = {
"action": "wait",
"plan_reason": "因 send_new_message 多次尝试失败而执行的后备等待",
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
elif action == "direct_reply":
max_reply_attempts = 3
reply_attempt_count = 0
is_suitable = False
need_replan = False
check_reason = "未进行尝试"
final_reply_to_send = ""
while reply_attempt_count < max_reply_attempts and not is_suitable:
reply_attempt_count += 1
logger.info(f"尝试生成首次回复 (第 {reply_attempt_count}/{max_reply_attempts} 次)...")
self.state = ConversationState.GENERATING
# 1. 生成回复
self.generated_reply = await self.reply_generator.generate(observation_info, conversation_info, action_type='direct_reply')
logger.info(f"{reply_attempt_count} 次生成的首次回复: {self.generated_reply}")
# 2. 检查回复
self.state = ConversationState.CHECKING
try:
current_goal_str = conversation_info.goal_list[0][0] if conversation_info.goal_list else ""
is_suitable, check_reason, need_replan = await self.reply_generator.check_reply(
reply=self.generated_reply,
goal=current_goal_str,
chat_history=observation_info.chat_history,
chat_history_str=observation_info.chat_history_str,
retry_count=reply_attempt_count - 1,
)
logger.info(
f"{reply_attempt_count} 次首次回复检查结果: 合适={is_suitable}, 原因='{check_reason}', 需重新规划={need_replan}"
)
if is_suitable:
final_reply_to_send = self.generated_reply
break
elif need_replan:
logger.warning(f"{reply_attempt_count} 次首次回复检查建议重新规划,停止尝试。原因: {check_reason}")
break
except Exception as check_err:
logger.error(f"{reply_attempt_count} 次调用 ReplyChecker (首次回复) 时出错: {check_err}")
check_reason = f"{reply_attempt_count} 次检查过程出错: {check_err}"
break
# 循环结束,处理最终结果
if is_suitable:
# 检查是否有新消息
if self._check_new_messages_after_planning():
logger.info("生成首次回复期间收到新消息,取消发送,重新规划行动")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"有新消息,取消发送首次回复: {final_reply_to_send}"}
)
return # 直接返回,重新规划
# 发送合适的回复
self.generated_reply = final_reply_to_send
# --- 在这里调用 _send_reply ---
await self._send_reply() # <--- 调用恢复后的函数
# 更新状态: 标记上次成功是 direct_reply
self.conversation_info.last_successful_reply_action = 'direct_reply'
action_successful = True # 标记动作成功
else:
# 首次回复失败
logger.warning(f"经过 {reply_attempt_count} 次尝试,未能生成合适的首次回复。最终原因: {check_reason}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"首次回复尝试{reply_attempt_count}次后失败: {check_reason}"}
)
# 重置状态: 首次回复失败,下次还是用初始 prompt
self.conversation_info.last_successful_reply_action = None
# 执行 Wait 操作 (保持原有逻辑)
logger.info("由于无法生成合适首次回复,执行 'wait' 操作...")
self.state = ConversationState.WAITING
# 直接调用 wait 方法
await self.waiter.wait(self.conversation_info)
# 可以选择添加一条新的 action 记录来表示这个 wait
wait_action_record = {
"action": "wait",
"plan_reason": "因 direct_reply 多次尝试失败而执行的后备等待",
"status": "done", # wait 完成后可以认为是 done
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
"final_reason": None,
}
conversation_info.done_action.append(wait_action_record)
elif action == "fetch_knowledge":
self.waiter.wait_accumulated_time = 0
self.state = ConversationState.FETCHING
knowledge = "TODO:知识"
topic = "TODO:关键词"
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
if knowledge:
pass # 简单处理
# 标记 action 为 done
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
knowledge_query = reason
try:
# 检查 knowledge_fetcher 是否存在
if not hasattr(self, 'knowledge_fetcher'):
logger.error("KnowledgeFetcher 未初始化,无法获取知识。")
raise AttributeError("KnowledgeFetcher not initialized")
knowledge, source = await self.knowledge_fetcher.fetch(knowledge_query, observation_info.chat_history)
logger.info(f"获取到知识: {knowledge[:100]}..., 来源: {source}")
if knowledge:
# 确保 knowledge_list 存在
if not hasattr(conversation_info, 'knowledge_list'):
conversation_info.knowledge_list = []
conversation_info.knowledge_list.append({"query": knowledge_query, "knowledge": knowledge, "source": source})
action_successful = True
except Exception as fetch_err:
logger.error(f"获取知识时出错: {fetch_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"获取知识失败: {fetch_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "rethink_goal":
self.waiter.wait_accumulated_time = 0
self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
# 标记 action 为 done
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
try:
# 检查 goal_analyzer 是否存在
if not hasattr(self, 'goal_analyzer'):
logger.error("GoalAnalyzer 未初始化,无法重新思考目标。")
raise AttributeError("GoalAnalyzer not initialized")
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
action_successful = True
except Exception as rethink_err:
logger.error(f"重新思考目标时出错: {rethink_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"重新思考目标失败: {rethink_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info("倾听对方发言...")
await self.waiter.wait_listening(conversation_info)
# listening 和 wait 通常在完成后不需要标记为 done因为它们是持续状态
# 但如果需要记录,可以在 waiter 返回后标记。目前逻辑是 waiter 返回后主循环继续。
# 为了统一,可以暂时在这里也标记一下(或者都不标记)
conversation_info.done_action[action_index].update(
{
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
try:
# 检查 waiter 是否存在
if not hasattr(self, 'waiter'):
logger.error("Waiter 未初始化,无法倾听。")
raise AttributeError("Waiter not initialized")
timeout_occurred = await self.waiter.wait_listening(conversation_info)
action_successful = True # Listening 完成就算成功
except Exception as listen_err:
logger.error(f"倾听时出错: {listen_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"倾听失败: {listen_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
elif action == "end_conversation":
self.should_continue = False # 设置循环停止标志
self.should_continue = False
logger.info("决定结束对话...")
# 标记 action 为 done
action_successful = True # 标记动作成功
elif action == "block_and_ignore":
logger.info("不想再理你了...")
ignore_duration_seconds = 10 * 60
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(f"将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}")
self.state = ConversationState.IGNORED
action_successful = True # 标记动作成功
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
try:
# 检查 waiter 是否存在
if not hasattr(self, 'waiter'):
logger.error("Waiter 未初始化,无法等待。")
raise AttributeError("Waiter not initialized")
timeout_occurred = await self.waiter.wait(self.conversation_info)
action_successful = True # Wait 完成就算成功
except Exception as wait_err:
logger.error(f"等待时出错: {wait_err}")
conversation_info.done_action[action_index].update(
{"status": "recall", "final_reason": f"等待失败: {wait_err}"}
)
self.conversation_info.last_successful_reply_action = None # 重置状态
# --- 更新 Action History 状态 ---
# 只有当动作本身成功时,才更新状态为 done
if action_successful:
conversation_info.done_action[action_index].update(
{
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
# 这里不需要 return主循环会在下一轮检查 should_continue
# 重置状态: 对于非回复类动作的成功,清除上次回复状态
if action not in ['direct_reply', 'send_new_message']:
self.conversation_info.last_successful_reply_action = None
logger.debug(f"动作 {action} 成功完成,重置 last_successful_reply_action")
# 如果动作是 recall 状态,在各自的处理逻辑中已经更新了 done_action
elif action == "block_and_ignore":
logger.info("不想再理你了...")
# 1. 标记对话为暂时忽略
ignore_duration_seconds = 10 * 60 # 10 分钟
self.ignore_until_timestamp = time.time() + ignore_duration_seconds
logger.info(f"将忽略此对话直到: {datetime.datetime.fromtimestamp(self.ignore_until_timestamp)}")
conversation_info.done_action[action_index].update(
{
"status": "done", # 或者一个自定义状态,比如 "ignored"
"final_reason": "Detected potential harassment, ignoring temporarily.", # 检测到潜在骚扰,暂时忽略
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
self.state = ConversationState.IGNORED
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning("没有生成回复内容,无法发送。")
return
else: # 对应 'wait' 动作
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
await self.waiter.wait(self.conversation_info)
# 同 listening可以考虑是否标记状态
conversation_info.done_action[action_index].update(
{
"status": "done", # 或 "completed"
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
try:
_current_time = time.time()
reply_content = self.generated_reply
# 发送消息 (确保 direct_sender 和 chat_stream 有效)
if not hasattr(self, 'direct_sender') or not self.direct_sender:
logger.error("DirectMessageSender 未初始化,无法发送回复。")
return
if not self.chat_stream:
logger.error("ChatStream 未初始化,无法发送回复。")
return
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
# 发送成功后,手动触发 observer 更新可能导致重复处理自己发送的消息
# 更好的做法是依赖 observer 的自动轮询或数据库触发器(如果支持)
# 暂时注释掉,观察是否影响 ObservationInfo 的更新
# self.chat_observer.trigger_update()
# if not await self.chat_observer.wait_for_update():
# logger.warning("等待 ChatObserver 更新完成超时")
self.state = ConversationState.ANALYZING # 更新状态
except Exception as e:
logger.error(f"发送消息或更新状态时失败: {str(e)}")
logger.error(traceback.format_exc())
self.state = ConversationState.ANALYZING
async def _send_timeout_message(self):
"""发送超时结束消息"""
@@ -465,29 +607,3 @@ class Conversation:
)
except Exception as e:
logger.error(f"发送超时消息失败: {str(e)}")
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning("没有生成回复")
return
try:
# 外层 try: 捕获发送消息和后续处理中的主要错误
_current_time = time.time() # 获取当前时间戳
reply_content = self.generated_reply # 获取要发送的内容
# 发送消息
await self.direct_sender.send_message(chat_stream=self.chat_stream, content=reply_content)
# 原有的触发更新和等待代码
self.chat_observer.trigger_update()
if not await self.chat_observer.wait_for_update():
logger.warning("等待 ChatObserver 更新完成超时")
self.state = ConversationState.ANALYZING # 更新对话状态
except Exception as e:
# 这是外层 try 对应的 except
logger.error(f"发送消息或更新状态时失败: {str(e)}")
self.state = ConversationState.ANALYZING # 出错也要尝试恢复状态