迁移:3804124,9e9e796
(feat:将no_reply内置、fix:优化reply,填补缺失值)
This commit is contained in:
@@ -1,13 +1,16 @@
|
||||
import asyncio
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional, Dict, Any
|
||||
from typing import Optional, Dict, Any, Tuple
|
||||
|
||||
from src.chat.message_receive.chat_stream import get_chat_manager
|
||||
from src.chat.utils.timer_calculator import Timer
|
||||
from src.common.logger import get_logger
|
||||
from src.config.config import global_config
|
||||
from src.chat.planner_actions.planner import ActionPlanner
|
||||
from src.chat.planner_actions.action_modifier import ActionModifier
|
||||
from src.person_info.person_info import get_person_info_manager
|
||||
from src.plugin_system.apis import database_api
|
||||
from src.plugin_system.base.component_types import ChatMode
|
||||
from src.mais4u.constant_s4u import ENABLE_S4U
|
||||
from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
|
||||
@@ -28,6 +31,8 @@ class CycleProcessor:
|
||||
response_handler: 响应处理器,负责生成和发送回复
|
||||
cycle_tracker: 循环跟踪器,负责记录和管理每次思考循环的信息
|
||||
"""
|
||||
self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]"
|
||||
|
||||
self.context = context
|
||||
self.response_handler = response_handler
|
||||
self.cycle_tracker = cycle_tracker
|
||||
@@ -35,7 +40,54 @@ class CycleProcessor:
|
||||
self.action_modifier = ActionModifier(
|
||||
action_manager=self.context.action_manager, chat_id=self.context.stream_id
|
||||
)
|
||||
|
||||
async def _send_and_store_reply(
|
||||
self,
|
||||
response_set,
|
||||
reply_to_str,
|
||||
loop_start_time,
|
||||
action_message,
|
||||
cycle_timers: Dict[str, float],
|
||||
thinking_id,
|
||||
plan_result,
|
||||
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
|
||||
with Timer("回复发送", cycle_timers):
|
||||
reply_text = await self._send_response(response_set, reply_to_str, loop_start_time, action_message)
|
||||
|
||||
# 存储reply action信息
|
||||
person_info_manager = get_person_info_manager()
|
||||
person_id = person_info_manager.get_person_id(
|
||||
action_message.get("chat_info_platform", ""),
|
||||
action_message.get("user_id", ""),
|
||||
)
|
||||
person_name = await person_info_manager.get_value(person_id, "person_name")
|
||||
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
|
||||
|
||||
await database_api.store_action_info(
|
||||
chat_stream=self.chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=action_prompt_display,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reply_text": reply_text, "reply_to": reply_to_str},
|
||||
action_name="reply",
|
||||
)
|
||||
|
||||
# 构建循环信息
|
||||
loop_info: Dict[str, Any] = {
|
||||
"loop_plan_info": {
|
||||
"action_result": plan_result.get("action_result", {}),
|
||||
},
|
||||
"loop_action_info": {
|
||||
"action_taken": True,
|
||||
"reply_text": reply_text,
|
||||
"command": "",
|
||||
"taken_time": time.time(),
|
||||
},
|
||||
}
|
||||
|
||||
return loop_info, reply_text, cycle_timers
|
||||
|
||||
async def observe(self, message_data: Optional[Dict[str, Any]] = None) -> bool:
|
||||
"""
|
||||
观察和处理单次思考循环的核心方法
|
||||
@@ -79,34 +131,24 @@ class CycleProcessor:
|
||||
at_bot_mentioned = (global_config.chat.mentioned_bot_inevitable_reply and is_mentioned_bot) or (
|
||||
global_config.chat.at_bot_inevitable_reply and is_mentioned_bot
|
||||
)
|
||||
|
||||
|
||||
# 专注模式下提及bot必定回复
|
||||
if self.context.loop_mode == ChatMode.FOCUS and at_bot_mentioned and "no_reply" in available_actions:
|
||||
available_actions = {k: v for k, v in available_actions.items() if k != "no_reply"}
|
||||
|
||||
# 检查是否在normal模式下没有可用动作(除了reply相关动作)
|
||||
skip_planner = False
|
||||
gen_task = None
|
||||
|
||||
if self.context.loop_mode == ChatMode.NORMAL:
|
||||
non_reply_actions = {
|
||||
k: v for k, v in available_actions.items() if k not in ["reply", "no_reply", "no_action"]
|
||||
}
|
||||
if not non_reply_actions:
|
||||
skip_planner = True
|
||||
logger.info(f"{self.log_prefix} Normal模式下没有可用动作,直接回复")
|
||||
logger.info(f"Normal模式下没有可用动作,直接回复")
|
||||
plan_result = self._get_direct_reply_plan(loop_start_time)
|
||||
target_message = message_data
|
||||
|
||||
# 如果normal模式且不跳过规划器,开始一个回复生成进程,先准备好回复(其实是和planer同时进行的)
|
||||
if not skip_planner:
|
||||
reply_to_str = await self._build_reply_to_str(message_data)
|
||||
gen_task = asyncio.create_task(
|
||||
self.response_handler.generate_response(
|
||||
message_data=message_data,
|
||||
available_actions=available_actions,
|
||||
reply_to=reply_to_str,
|
||||
request_type="chat.replyer.normal",
|
||||
)
|
||||
)
|
||||
|
||||
# Focus模式
|
||||
if not skip_planner:
|
||||
@@ -123,57 +165,237 @@ class CycleProcessor:
|
||||
with Timer("规划器", cycle_timers):
|
||||
plan_result, target_message = await self.action_planner.plan(mode=self.context.loop_mode)
|
||||
|
||||
action_result = plan_result.get("action_result", {}) if isinstance(plan_result, dict) else {}
|
||||
if not isinstance(action_result, dict):
|
||||
action_result = {}
|
||||
action_result = plan_result.get("action_result", {})
|
||||
|
||||
action_type = action_result.get("action_type", "error")
|
||||
action_data = action_result.get("action_data", {})
|
||||
reasoning = action_result.get("reasoning", "未提供理由")
|
||||
is_parallel = action_result.get("is_parallel", True)
|
||||
|
||||
action_data["loop_start_time"] = loop_start_time
|
||||
action_message = message_data or target_message
|
||||
|
||||
is_private_chat = self.context.chat_stream.group_info is None if self.context.chat_stream else False
|
||||
if self.context.loop_mode == ChatMode.FOCUS and is_private_chat and action_type == "no_reply":
|
||||
action_type = "reply"
|
||||
# is_private_chat = self.context.chat_stream.group_info is None if self.context.chat_stream else False
|
||||
|
||||
# 重构后的动作处理逻辑:先汇总所有动作,然后并行执行
|
||||
actions = []
|
||||
|
||||
if action_type == "reply":
|
||||
# 使用 action_planner 获取的 target_message,如果为空则使用原始 message_data
|
||||
actual_message = target_message or message_data
|
||||
await self._handle_reply_action(
|
||||
actual_message, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result
|
||||
# 1. 添加Planner取得的动作
|
||||
actions.append({
|
||||
"action_type": action_type,
|
||||
"reasoning": reasoning,
|
||||
"action_data": action_data,
|
||||
"action_message": action_message,
|
||||
"available_actions": available_actions # 添加这个字段
|
||||
})
|
||||
|
||||
# 2. 如果不是reply动作且需要并行执行,额外添加reply动作
|
||||
if action_type != "reply" and is_parallel:
|
||||
actions.append({
|
||||
"action_type": "reply",
|
||||
"action_message": action_message,
|
||||
"available_actions": available_actions
|
||||
})
|
||||
|
||||
async def execute_action(action_info):
|
||||
"""执行单个动作的通用函数"""
|
||||
try:
|
||||
if action_info["action_type"] == "no_reply":
|
||||
# 直接处理no_reply逻辑,不再通过动作系统
|
||||
reason = action_info.get("reasoning", "选择不回复")
|
||||
logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
|
||||
|
||||
# 存储no_reply信息到数据库
|
||||
await database_api.store_action_info(
|
||||
chat_stream=self.context.chat_stream,
|
||||
action_build_into_prompt=False,
|
||||
action_prompt_display=reason,
|
||||
action_done=True,
|
||||
thinking_id=thinking_id,
|
||||
action_data={"reason": reason},
|
||||
action_name="no_reply",
|
||||
)
|
||||
|
||||
return {
|
||||
"action_type": "no_reply",
|
||||
"success": True,
|
||||
"reply_text": "",
|
||||
"command": ""
|
||||
}
|
||||
elif action_info["action_type"] != "reply":
|
||||
# 执行普通动作
|
||||
with Timer("动作执行", cycle_timers):
|
||||
success, reply_text, command = await self._handle_action(
|
||||
action_info["action_type"],
|
||||
action_info["reasoning"],
|
||||
action_info["action_data"],
|
||||
cycle_timers,
|
||||
thinking_id,
|
||||
action_info["action_message"]
|
||||
)
|
||||
return {
|
||||
"action_type": action_info["action_type"],
|
||||
"success": success,
|
||||
"reply_text": reply_text,
|
||||
"command": command
|
||||
}
|
||||
else:
|
||||
# 执行回复动作
|
||||
reply_to_str = await self._build_reply_to_str(action_info["action_message"])
|
||||
request_type = "chat.replyer"
|
||||
|
||||
# 生成回复
|
||||
gather_timeout = global_config.chat.thinking_timeout
|
||||
try:
|
||||
response_set = await asyncio.wait_for(
|
||||
self.response_handler._generate_response(
|
||||
message_data=action_info["action_message"],
|
||||
available_actions=action_info["available_actions"],
|
||||
reply_to=reply_to_str,
|
||||
request_type=request_type,
|
||||
),
|
||||
timeout=gather_timeout
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s,已跳过"
|
||||
)
|
||||
return {
|
||||
"action_type": "reply",
|
||||
"success": False,
|
||||
"reply_text": "",
|
||||
"loop_info": None
|
||||
}
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
|
||||
return {
|
||||
"action_type": "reply",
|
||||
"success": False,
|
||||
"reply_text": "",
|
||||
"loop_info": None
|
||||
}
|
||||
|
||||
if not response_set:
|
||||
logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空")
|
||||
return {
|
||||
"action_type": "reply",
|
||||
"success": False,
|
||||
"reply_text": "",
|
||||
"loop_info": None
|
||||
}
|
||||
|
||||
# TODO: Where is my fucking _send_and_store_reply?
|
||||
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
|
||||
response_set,
|
||||
reply_to_str,
|
||||
loop_start_time,
|
||||
action_info["action_message"],
|
||||
cycle_timers,
|
||||
thinking_id,
|
||||
plan_result,
|
||||
)
|
||||
return {
|
||||
"action_type": "reply",
|
||||
"success": True,
|
||||
"reply_text": reply_text,
|
||||
"loop_info": loop_info
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
|
||||
return {
|
||||
"action_type": action_info["action_type"],
|
||||
"success": False,
|
||||
"reply_text": "",
|
||||
"loop_info": None,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
# 创建所有动作的后台任务
|
||||
action_tasks = [asyncio.create_task(execute_action(action)) for action in actions]
|
||||
|
||||
# 并行执行所有任务
|
||||
results = await asyncio.gather(*action_tasks, return_exceptions=True)
|
||||
|
||||
# 处理执行结果
|
||||
reply_loop_info = None
|
||||
reply_text_from_reply = ""
|
||||
action_success = False
|
||||
action_reply_text = ""
|
||||
action_command = ""
|
||||
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, BaseException):
|
||||
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
|
||||
continue
|
||||
|
||||
action_info = actions[i]
|
||||
if result["action_type"] != "reply":
|
||||
action_success = result["success"]
|
||||
action_reply_text = result["reply_text"]
|
||||
action_command = result.get("command", "")
|
||||
elif result["action_type"] == "reply":
|
||||
if result["success"]:
|
||||
reply_loop_info = result["loop_info"]
|
||||
reply_text_from_reply = result["reply_text"]
|
||||
else:
|
||||
logger.warning(f"{self.log_prefix} 回复动作执行失败")
|
||||
|
||||
# 构建最终的循环信息
|
||||
if reply_loop_info:
|
||||
# 如果有回复信息,使用回复的loop_info作为基础
|
||||
loop_info = reply_loop_info
|
||||
# 更新动作执行信息
|
||||
loop_info["loop_action_info"].update(
|
||||
{
|
||||
"action_taken": action_success,
|
||||
"command": action_command,
|
||||
"taken_time": time.time(),
|
||||
}
|
||||
)
|
||||
reply_text = reply_text_from_reply
|
||||
else:
|
||||
await self._handle_other_actions(
|
||||
action_type,
|
||||
reasoning,
|
||||
action_data,
|
||||
is_parallel,
|
||||
gen_task,
|
||||
target_message or message_data,
|
||||
cycle_timers,
|
||||
thinking_id,
|
||||
plan_result,
|
||||
loop_start_time,
|
||||
)
|
||||
|
||||
self.context.last_action = action_type
|
||||
|
||||
# 处理no_reply相关的逻辑
|
||||
if action_type != "no_reply":
|
||||
self.context.no_reply_consecutive = 0
|
||||
if hasattr(self.context, 'chat_instance') and self.context.chat_instance:
|
||||
self.context.chat_instance.recent_interest_records.clear()
|
||||
logger.info(f"{self.context.log_prefix} 执行了{action_type}动作,重置no_reply计数器和兴趣度记录")
|
||||
|
||||
if action_type == "no_reply":
|
||||
self.context.no_reply_consecutive += 1
|
||||
# 调用HeartFChatting中的_determine_form_type方法
|
||||
if hasattr(self.context, 'chat_instance') and self.context.chat_instance:
|
||||
self.context.chat_instance._determine_form_type()
|
||||
# 没有回复信息,构建纯动作的loop_info
|
||||
loop_info = {
|
||||
"loop_plan_info": {
|
||||
"action_result": plan_result.get("action_result", {}),
|
||||
},
|
||||
"loop_action_info": {
|
||||
"action_taken": action_success,
|
||||
"reply_text": action_reply_text,
|
||||
"command": action_command,
|
||||
"taken_time": time.time(),
|
||||
},
|
||||
}
|
||||
reply_text = action_reply_text
|
||||
|
||||
self.last_action = action_type
|
||||
|
||||
if ENABLE_S4U:
|
||||
await stop_typing()
|
||||
|
||||
self.context.chat_instance.cycle_tracker.end_cycle(loop_info, cycle_timers)
|
||||
self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers)
|
||||
|
||||
if self.context.loop_mode == ChatMode.NORMAL:
|
||||
await self.context.chat_instance.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
|
||||
|
||||
# 管理no_reply计数器:当执行了非no_reply动作时,重置计数器
|
||||
if action_type != "no_reply" and action_type != "no_action":
|
||||
# no_reply逻辑已集成到heartFC_chat.py中,直接重置计数器
|
||||
self.context.chat_instance.recent_interest_records.clear()
|
||||
self.context.no_reply_consecutive = 0
|
||||
logger.info(f"{self.log_prefix} 执行了{action_type}动作,重置no_reply计数器")
|
||||
return True
|
||||
elif action_type == "no_action":
|
||||
# 当执行回复动作时,也重置no_reply计数
|
||||
self.context.chat_instance.recent_interest_records.clear()
|
||||
self.context.no_reply_consecutive = 0
|
||||
logger.info(f"{self.log_prefix} 执行了回复动作,重置no_reply计数器")
|
||||
|
||||
if action_type == "no_reply":
|
||||
self.context.no_reply_consecutive += 1
|
||||
self.context.chat_instance._determine_form_type()
|
||||
|
||||
# 在一轮动作执行完毕后,增加睡眠压力
|
||||
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
|
||||
if action_type not in ["no_reply", "no_action"]:
|
||||
|
||||
Reference in New Issue
Block a user