refactor(chat): 重构聊天循环逻辑,简化CycleProcessor职责
将`no_reply`计数器管理、睡眠压力增加等逻辑从`CycleProcessor`上移至`HeartFChatting`主循环中,使其职责更清晰。`CycleProcessor.observe`现在直接返回执行的动作类型,简化了其内部状态管理。 主要变更: - `CycleProcessor`不再处理回复生成、并行任务和最终循环信息的构建,这些复杂的逻辑被移除,极大地简化了该类。 - `HeartFChatting`现在负责根据`observe`返回的动作类型来管理`no_reply`计数器和睡眠压力。 - 删除了`CycleProcessor.execute_plan`方法,主动思考的回复流程被重构,直接调用`generator_api`和`response_handler.send_response`。 - 移除了`response_handler`中已废弃的`generate_response`方法。
This commit is contained in:
committed by
Windpicker-owo
parent
2e436dff33
commit
311c67f0c4
@@ -94,7 +94,7 @@ class CycleProcessor:
|
||||
|
||||
return loop_info, reply_text, cycle_timers
|
||||
|
||||
async def observe(self, interest_value: float = 0.0) -> bool:
|
||||
async def observe(self, interest_value: float = 0.0) -> str:
|
||||
"""
|
||||
观察和处理单次思考循环的核心方法
|
||||
|
||||
@@ -326,226 +326,7 @@ class CycleProcessor:
|
||||
self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers)
|
||||
|
||||
action_type = actions[0]["action_type"] if actions else "no_action"
|
||||
# 管理no_reply计数器:当执行了非no_reply动作时,重置计数器
|
||||
if action_type != "no_reply":
|
||||
# no_reply逻辑已集成到heartFC_chat.py中,直接重置计数器
|
||||
self.context.chat_instance.recent_interest_records.clear()
|
||||
self.context.no_reply_consecutive = 0
|
||||
logger.debug(f"{self.log_prefix} 执行了{action_type}动作,重置no_reply计数器")
|
||||
return True
|
||||
|
||||
if action_type == "no_reply":
|
||||
self.context.no_reply_consecutive += 1
|
||||
self.context.chat_instance._determine_form_type()
|
||||
|
||||
# 在一轮动作执行完毕后,增加睡眠压力
|
||||
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
|
||||
if action_type not in ["no_reply", "no_action"]:
|
||||
self.context.energy_manager.increase_sleep_pressure()
|
||||
|
||||
return True
|
||||
|
||||
async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]):
|
||||
"""
|
||||
执行一个已经制定好的计划
|
||||
"""
|
||||
action_type = action_result.get("action_type", "error")
|
||||
|
||||
# 这里我们需要为执行计划创建一个新的循环追踪
|
||||
cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True)
|
||||
loop_start_time = time.time()
|
||||
|
||||
if action_type == "reply":
|
||||
# 主动思考不应该直接触发简单回复,但为了逻辑完整性,我们假设它会调用response_handler
|
||||
# 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理
|
||||
await self._handle_reply_action(
|
||||
target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result}
|
||||
)
|
||||
else:
|
||||
await self._handle_other_actions(
|
||||
action_type,
|
||||
action_result.get("reasoning", ""),
|
||||
action_result.get("action_data", {}),
|
||||
action_result.get("is_parallel", False),
|
||||
None,
|
||||
target_message,
|
||||
cycle_timers,
|
||||
thinking_id,
|
||||
{"action_result": action_result},
|
||||
loop_start_time,
|
||||
)
|
||||
|
||||
async def _handle_reply_action(
|
||||
self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result
|
||||
):
|
||||
"""
|
||||
处理回复类型的动作
|
||||
|
||||
Args:
|
||||
message_data: 消息数据
|
||||
available_actions: 可用动作列表
|
||||
gen_task: 预先创建的生成任务(可能为None)
|
||||
loop_start_time: 循环开始时间
|
||||
cycle_timers: 循环计时器
|
||||
thinking_id: 思考ID
|
||||
plan_result: 规划结果
|
||||
|
||||
功能说明:
|
||||
- 根据聊天模式决定是否使用预生成的回复或实时生成
|
||||
- 在NORMAL模式下使用异步生成提高效率
|
||||
- 在FOCUS模式下同步生成确保及时响应
|
||||
- 发送生成的回复并结束循环
|
||||
"""
|
||||
# 初始化reply_to_str以避免UnboundLocalError
|
||||
reply_to_str = None
|
||||
|
||||
if self.context.loop_mode == ChatMode.NORMAL:
|
||||
if not gen_task:
|
||||
reply_to_str = await self._build_reply_to_str(message_data)
|
||||
gen_task = asyncio.create_task(
|
||||
self.response_handler.generate_response(
|
||||
message_data=message_data,
|
||||
available_actions=available_actions,
|
||||
reply_to=reply_to_str,
|
||||
request_type="chat.replyer.normal",
|
||||
)
|
||||
)
|
||||
else:
|
||||
# 如果gen_task已存在但reply_to_str还未构建,需要构建它
|
||||
if reply_to_str is None:
|
||||
reply_to_str = await self._build_reply_to_str(message_data)
|
||||
|
||||
try:
|
||||
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
response_set = None
|
||||
else:
|
||||
reply_to_str = await self._build_reply_to_str(message_data)
|
||||
response_set = await self.response_handler.generate_response(
|
||||
message_data=message_data,
|
||||
available_actions=available_actions,
|
||||
reply_to=reply_to_str,
|
||||
request_type="chat.replyer.focus",
|
||||
)
|
||||
|
||||
if response_set:
|
||||
loop_info, _, _ = await self.response_handler.generate_and_send_reply(
|
||||
response_set, reply_to_str, loop_start_time, message_data, cycle_timers, thinking_id, plan_result
|
||||
)
|
||||
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
|
||||
|
||||
async def _handle_other_actions(
|
||||
self,
|
||||
action_type,
|
||||
reasoning,
|
||||
action_data,
|
||||
is_parallel,
|
||||
gen_task,
|
||||
action_message,
|
||||
cycle_timers,
|
||||
thinking_id,
|
||||
plan_result,
|
||||
loop_start_time,
|
||||
):
|
||||
"""
|
||||
处理非回复类型的动作(如no_reply、自定义动作等)
|
||||
|
||||
Args:
|
||||
action_type: 动作类型
|
||||
reasoning: 动作理由
|
||||
action_data: 动作数据
|
||||
is_parallel: 是否并行执行
|
||||
gen_task: 生成任务
|
||||
action_message: 动作消息
|
||||
cycle_timers: 循环计时器
|
||||
thinking_id: 思考ID
|
||||
plan_result: 规划结果
|
||||
loop_start_time: 循环开始时间
|
||||
|
||||
功能说明:
|
||||
- 在NORMAL模式下可能并行执行回复生成和动作处理
|
||||
- 等待所有异步任务完成
|
||||
- 整合回复和动作的执行结果
|
||||
- 构建最终循环信息并结束循环
|
||||
"""
|
||||
background_reply_task = None
|
||||
if self.context.loop_mode == ChatMode.NORMAL and is_parallel and gen_task:
|
||||
background_reply_task = asyncio.create_task(
|
||||
self._handle_parallel_reply(
|
||||
gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
|
||||
)
|
||||
)
|
||||
|
||||
background_action_task = asyncio.create_task(
|
||||
self._handle_action(action_type, reasoning, action_data, cycle_timers, thinking_id, action_message)
|
||||
)
|
||||
|
||||
reply_loop_info, action_success, action_reply_text, action_command = None, False, "", ""
|
||||
|
||||
if background_reply_task:
|
||||
results = await asyncio.gather(background_reply_task, background_action_task, return_exceptions=True)
|
||||
reply_result, action_result_val = results
|
||||
if not isinstance(reply_result, BaseException) and reply_result is not None:
|
||||
reply_loop_info, _, _ = reply_result
|
||||
else:
|
||||
reply_loop_info = None
|
||||
|
||||
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
|
||||
action_success, action_reply_text, action_command = action_result_val
|
||||
else:
|
||||
action_success, action_reply_text, action_command = False, "", ""
|
||||
else:
|
||||
results = await asyncio.gather(background_action_task, return_exceptions=True)
|
||||
if results and len(results) > 0:
|
||||
action_result_val = results[0] # Get the actual result from the tuple
|
||||
else:
|
||||
action_result_val = (False, "", "")
|
||||
|
||||
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
|
||||
action_success, action_reply_text, action_command = action_result_val
|
||||
else:
|
||||
action_success, action_reply_text, action_command = False, "", ""
|
||||
|
||||
loop_info = self._build_final_loop_info(
|
||||
reply_loop_info, action_success, action_reply_text, action_command, plan_result
|
||||
)
|
||||
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
|
||||
|
||||
async def _handle_parallel_reply(
|
||||
self, gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
|
||||
):
|
||||
"""
|
||||
处理并行回复生成
|
||||
|
||||
Args:
|
||||
gen_task: 回复生成任务
|
||||
loop_start_time: 循环开始时间
|
||||
action_message: 动作消息
|
||||
cycle_timers: 循环计时器
|
||||
thinking_id: 思考ID
|
||||
plan_result: 规划结果
|
||||
|
||||
Returns:
|
||||
tuple: (循环信息, 回复文本, 计时器信息) 或 None
|
||||
|
||||
功能说明:
|
||||
- 等待并行回复生成任务完成(带超时)
|
||||
- 构建回复目标字符串
|
||||
- 发送生成的回复
|
||||
- 返回循环信息供上级方法使用
|
||||
"""
|
||||
try:
|
||||
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
return None, "", {}
|
||||
|
||||
if not response_set:
|
||||
return None, "", {}
|
||||
|
||||
reply_to_str = await self._build_reply_to_str(action_message)
|
||||
return await self.response_handler.generate_and_send_reply(
|
||||
response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
|
||||
)
|
||||
return action_type
|
||||
|
||||
async def _handle_action(
|
||||
self, action, reasoning, action_data, cycle_timers, thinking_id, action_message
|
||||
@@ -594,12 +375,12 @@ class CycleProcessor:
|
||||
if "reply" in available_actions:
|
||||
fallback_action = "reply"
|
||||
elif available_actions:
|
||||
fallback_action = list(available_actions.keys())[0]
|
||||
fallback_action = list(available_actions.keys())
|
||||
|
||||
if fallback_action and fallback_action != action:
|
||||
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
|
||||
action_handler = self.context.action_manager.create_action(
|
||||
action_name=fallback_action,
|
||||
action_name=fallback_action if isinstance(fallback_action, list) else fallback_action,
|
||||
action_data=action_data,
|
||||
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
|
||||
cycle_timers=cycle_timers,
|
||||
@@ -619,43 +400,3 @@ class CycleProcessor:
|
||||
logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}")
|
||||
traceback.print_exc()
|
||||
return False, "", ""
|
||||
|
||||
def _build_final_loop_info(self, reply_loop_info, action_success, action_reply_text, action_command, plan_result):
|
||||
"""
|
||||
构建最终的循环信息
|
||||
|
||||
Args:
|
||||
reply_loop_info: 回复循环信息(可能为None)
|
||||
action_success: 动作执行是否成功
|
||||
action_reply_text: 动作回复文本
|
||||
action_command: 动作命令
|
||||
plan_result: 规划结果
|
||||
|
||||
Returns:
|
||||
dict: 完整的循环信息,包含规划信息和动作信息
|
||||
|
||||
功能说明:
|
||||
- 如果有回复循环信息,则在其基础上添加动作信息
|
||||
- 如果没有回复信息,则创建新的循环信息结构
|
||||
- 整合所有执行结果供循环跟踪器记录
|
||||
"""
|
||||
if reply_loop_info:
|
||||
loop_info = reply_loop_info
|
||||
loop_info["loop_action_info"].update(
|
||||
{
|
||||
"action_taken": action_success,
|
||||
"command": action_command,
|
||||
"taken_time": time.time(),
|
||||
}
|
||||
)
|
||||
else:
|
||||
loop_info = {
|
||||
"loop_plan_info": {"action_result": plan_result.get("action_result", {})},
|
||||
"loop_action_info": {
|
||||
"action_taken": action_success,
|
||||
"reply_text": action_reply_text,
|
||||
"command": action_command,
|
||||
"taken_time": time.time(),
|
||||
},
|
||||
}
|
||||
return loop_info
|
||||
|
||||
@@ -342,31 +342,36 @@ class HeartFChatting:
|
||||
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
|
||||
|
||||
# 根据聊天模式处理新消息
|
||||
# 统一使用 _should_process_messages 判断是否应该处理
|
||||
should_process, interest_value = await self._should_process_messages(recent_messages)
|
||||
if should_process:
|
||||
self.context.last_read_time = time.time()
|
||||
await self.cycle_processor.observe(interest_value=interest_value)
|
||||
else:
|
||||
# Normal模式:消息数量不足,等待
|
||||
if not should_process:
|
||||
# 消息数量不足或兴趣不够,等待
|
||||
await asyncio.sleep(0.5)
|
||||
return True
|
||||
return True # Skip rest of the logic for this iteration
|
||||
|
||||
if not await self._should_process_messages(recent_messages):
|
||||
return has_new_messages
|
||||
# Messages should be processed
|
||||
action_type = await self.cycle_processor.observe(interest_value=interest_value)
|
||||
|
||||
# 处理新消息
|
||||
for message in recent_messages:
|
||||
await self.cycle_processor.observe(interest_value=interest_value)
|
||||
# 管理no_reply计数器
|
||||
if action_type != "no_reply":
|
||||
self.recent_interest_records.clear()
|
||||
self.context.no_reply_consecutive = 0
|
||||
logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作,重置no_reply计数器")
|
||||
else: # action_type == "no_reply"
|
||||
self.context.no_reply_consecutive += 1
|
||||
self._determine_form_type()
|
||||
|
||||
# 在一轮动作执行完毕后,增加睡眠压力
|
||||
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
|
||||
if action_type not in ["no_reply", "no_action"]:
|
||||
self.context.energy_manager.increase_sleep_pressure()
|
||||
|
||||
# 如果成功观察,增加能量值并重置累积兴趣值
|
||||
if has_new_messages:
|
||||
self.context.energy_value += 1 / global_config.chat.focus_value
|
||||
# 重置累积兴趣值,因为消息已经被成功处理
|
||||
self.context.breaking_accumulated_interest = 0.0
|
||||
logger.info(
|
||||
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
|
||||
)
|
||||
self.context.energy_value += 1 / global_config.chat.focus_value
|
||||
# 重置累积兴趣值,因为消息已经被成功处理
|
||||
self.context.breaking_accumulated_interest = 0.0
|
||||
logger.info(
|
||||
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
|
||||
)
|
||||
|
||||
# 更新上一帧的睡眠状态
|
||||
self.context.was_sleeping = is_sleeping
|
||||
|
||||
@@ -6,6 +6,7 @@ from src.common.logger import get_logger
|
||||
from src.plugin_system.base.component_types import ChatMode
|
||||
from ..hfc_context import HfcContext
|
||||
from .events import ProactiveTriggerEvent
|
||||
from src.plugin_system.apis import generator_api
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..cycle_processor import CycleProcessor
|
||||
@@ -103,23 +104,19 @@ class ProactiveThinker:
|
||||
|
||||
# 如果决策不是 do_nothing,则执行
|
||||
if action_result and action_result.get("action_type") != "do_nothing":
|
||||
# 在主动思考时,如果 target_message 为 None,则默认选取最新 message 作为 target_message
|
||||
if target_message is None and self.context.chat_stream and self.context.chat_stream.context:
|
||||
from src.chat.message_receive.message import MessageRecv
|
||||
|
||||
latest_message = self.context.chat_stream.context.get_last_message()
|
||||
if isinstance(latest_message, MessageRecv):
|
||||
user_info = latest_message.message_info.user_info
|
||||
target_message = {
|
||||
"chat_info_platform": latest_message.message_info.platform,
|
||||
"user_platform": user_info.platform if user_info else None,
|
||||
"user_id": user_info.user_id if user_info else None,
|
||||
"processed_plain_text": latest_message.processed_plain_text,
|
||||
"is_mentioned": latest_message.is_mentioned,
|
||||
}
|
||||
|
||||
# 将决策结果交给 cycle_processor 的后续流程处理
|
||||
await self.cycle_processor.execute_plan(action_result, target_message)
|
||||
if action_result.get("action_type") == "reply":
|
||||
success, response_set, _ = await generator_api.generate_reply(
|
||||
chat_stream=self.context.chat_stream,
|
||||
reply_message=action_result["action_message"],
|
||||
available_actions={},
|
||||
enable_tool=False,
|
||||
request_type="chat.replyer.proactive",
|
||||
from_plugin=False,
|
||||
)
|
||||
if success and response_set:
|
||||
await self.cycle_processor.response_handler.send_response(
|
||||
response_set, time.time(), action_result["action_message"]
|
||||
)
|
||||
else:
|
||||
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
|
||||
|
||||
|
||||
@@ -172,101 +172,3 @@ class ResponseHandler:
|
||||
)
|
||||
|
||||
return reply_text
|
||||
|
||||
# TODO: 已废弃
|
||||
async def generate_response(
|
||||
self,
|
||||
message_data: dict,
|
||||
available_actions: Optional[Dict[str, Any]],
|
||||
reply_to: str,
|
||||
request_type: str = "chat.replyer.normal",
|
||||
) -> Optional[list]:
|
||||
"""
|
||||
生成回复内容
|
||||
|
||||
Args:
|
||||
message_data: 消息数据
|
||||
available_actions: 可用动作列表
|
||||
reply_to: 回复目标
|
||||
request_type: 请求类型,默认为普通回复
|
||||
|
||||
Returns:
|
||||
list: 生成的回复内容列表,失败时返回None
|
||||
|
||||
功能说明:
|
||||
- 在生成回复前进行反注入检测(提高效率)
|
||||
- 调用生成器API生成回复
|
||||
- 根据配置启用或禁用工具功能
|
||||
- 处理生成失败的情况
|
||||
- 记录生成过程中的错误和异常
|
||||
"""
|
||||
try:
|
||||
# === 反注入检测(仅在需要生成回复时) ===
|
||||
# 执行反注入检测(直接使用字典格式)
|
||||
anti_injector = get_anti_injector()
|
||||
result, modified_content, reason = await anti_injector.process_message(
|
||||
message_data, self.context.chat_stream
|
||||
)
|
||||
|
||||
# 根据反注入结果处理消息数据
|
||||
await anti_injector.handle_message_storage(result, modified_content, reason or "", message_data)
|
||||
|
||||
if result == ProcessResult.BLOCKED_BAN:
|
||||
# 用户被封禁 - 直接阻止回复生成
|
||||
anti_injector_logger.warning(f"用户被反注入系统封禁,阻止回复生成: {reason}")
|
||||
return None
|
||||
elif result == ProcessResult.BLOCKED_INJECTION:
|
||||
# 消息被阻止(危险内容等) - 直接阻止回复生成
|
||||
anti_injector_logger.warning(f"消息被反注入系统阻止,阻止回复生成: {reason}")
|
||||
return None
|
||||
elif result == ProcessResult.COUNTER_ATTACK:
|
||||
# 反击模式:生成反击消息作为回复
|
||||
anti_injector_logger.info(f"反击模式启动,生成反击回复: {reason}")
|
||||
if modified_content:
|
||||
# 返回反击消息作为回复内容
|
||||
return [("text", modified_content)]
|
||||
else:
|
||||
# 没有反击内容时阻止回复生成
|
||||
return None
|
||||
|
||||
# 检查是否需要加盾处理
|
||||
safety_prompt = None
|
||||
if result == ProcessResult.SHIELDED:
|
||||
# 获取安全系统提示词并注入
|
||||
shield = anti_injector.shield
|
||||
safety_prompt = shield.get_safety_system_prompt()
|
||||
await Prompt.create_async(safety_prompt, "anti_injection_safety_prompt")
|
||||
anti_injector_logger.info(f"消息已被反注入系统加盾处理,已注入安全提示词: {reason}")
|
||||
|
||||
# 处理被修改的消息内容(用于生成回复)
|
||||
modified_reply_to = reply_to
|
||||
if modified_content:
|
||||
# 更新消息内容用于生成回复
|
||||
anti_injector_logger.info(f"消息内容已被反注入系统修改,使用修改后内容生成回复: {reason}")
|
||||
# 解析原始reply_to格式:"发送者:消息内容"
|
||||
if ":" in reply_to:
|
||||
sender_part, _ = reply_to.split(":", 1)
|
||||
modified_reply_to = f"{sender_part}:{modified_content}"
|
||||
else:
|
||||
# 如果格式不标准,直接使用修改后的内容
|
||||
modified_reply_to = modified_content
|
||||
|
||||
# === 正常的回复生成流程 ===
|
||||
success, reply_set, _ = await generator_api.generate_reply(
|
||||
chat_stream=self.context.chat_stream,
|
||||
reply_to=modified_reply_to, # 使用可能被修改的内容
|
||||
available_actions=available_actions,
|
||||
enable_tool=global_config.tool.enable_tool,
|
||||
request_type=request_type,
|
||||
from_plugin=False,
|
||||
)
|
||||
|
||||
if not success or not reply_set:
|
||||
logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败")
|
||||
return None
|
||||
|
||||
return reply_set
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{self.context.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user