refactor(chat): 重构聊天循环逻辑,简化CycleProcessor职责

将`no_reply`计数器管理、睡眠压力增加等逻辑从`CycleProcessor`上移至`HeartFChatting`主循环中,使其职责更清晰。`CycleProcessor.observe`现在直接返回执行的动作类型,简化了其内部状态管理。

主要变更:
- `CycleProcessor`不再处理回复生成、并行任务和最终循环信息的构建,这些复杂的逻辑被移除,极大地简化了该类。
- `HeartFChatting`现在负责根据`observe`返回的动作类型来管理`no_reply`计数器和睡眠压力。
- 删除了`CycleProcessor.execute_plan`方法,主动思考的回复流程被重构,直接调用`generator_api`和`response_handler.send_response`。
- 移除了`response_handler`中已废弃的`generate_response`方法。
This commit is contained in:
minecraft1024a
2025-09-05 21:40:42 +08:00
parent e1fbdaad8c
commit 4b256721d3
4 changed files with 42 additions and 397 deletions

View File

@@ -94,7 +94,7 @@ class CycleProcessor:
return loop_info, reply_text, cycle_timers return loop_info, reply_text, cycle_timers
async def observe(self, interest_value: float = 0.0) -> bool: async def observe(self, interest_value: float = 0.0) -> str:
""" """
观察和处理单次思考循环的核心方法 观察和处理单次思考循环的核心方法
@@ -326,226 +326,7 @@ class CycleProcessor:
self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers) self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers)
action_type = actions[0]["action_type"] if actions else "no_action" action_type = actions[0]["action_type"] if actions else "no_action"
# 管理no_reply计数器当执行了非no_reply动作时重置计数器 return action_type
if action_type != "no_reply":
# no_reply逻辑已集成到heartFC_chat.py中直接重置计数器
self.context.chat_instance.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.log_prefix} 执行了{action_type}动作重置no_reply计数器")
return True
if action_type == "no_reply":
self.context.no_reply_consecutive += 1
self.context.chat_instance._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
return True
async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]):
"""
执行一个已经制定好的计划
"""
action_type = action_result.get("action_type", "error")
# 这里我们需要为执行计划创建一个新的循环追踪
cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True)
loop_start_time = time.time()
if action_type == "reply":
# 主动思考不应该直接触发简单回复但为了逻辑完整性我们假设它会调用response_handler
# 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理
await self._handle_reply_action(
target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result}
)
else:
await self._handle_other_actions(
action_type,
action_result.get("reasoning", ""),
action_result.get("action_data", {}),
action_result.get("is_parallel", False),
None,
target_message,
cycle_timers,
thinking_id,
{"action_result": action_result},
loop_start_time,
)
async def _handle_reply_action(
self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result
):
"""
处理回复类型的动作
Args:
message_data: 消息数据
available_actions: 可用动作列表
gen_task: 预先创建的生成任务可能为None
loop_start_time: 循环开始时间
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
功能说明:
- 根据聊天模式决定是否使用预生成的回复或实时生成
- 在NORMAL模式下使用异步生成提高效率
- 在FOCUS模式下同步生成确保及时响应
- 发送生成的回复并结束循环
"""
# 初始化reply_to_str以避免UnboundLocalError
reply_to_str = None
if self.context.loop_mode == ChatMode.NORMAL:
if not gen_task:
reply_to_str = await self._build_reply_to_str(message_data)
gen_task = asyncio.create_task(
self.response_handler.generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
else:
# 如果gen_task已存在但reply_to_str还未构建需要构建它
if reply_to_str is None:
reply_to_str = await self._build_reply_to_str(message_data)
try:
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
except asyncio.TimeoutError:
response_set = None
else:
reply_to_str = await self._build_reply_to_str(message_data)
response_set = await self.response_handler.generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus",
)
if response_set:
loop_info, _, _ = await self.response_handler.generate_and_send_reply(
response_set, reply_to_str, loop_start_time, message_data, cycle_timers, thinking_id, plan_result
)
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
async def _handle_other_actions(
self,
action_type,
reasoning,
action_data,
is_parallel,
gen_task,
action_message,
cycle_timers,
thinking_id,
plan_result,
loop_start_time,
):
"""
处理非回复类型的动作如no_reply、自定义动作等
Args:
action_type: 动作类型
reasoning: 动作理由
action_data: 动作数据
is_parallel: 是否并行执行
gen_task: 生成任务
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
loop_start_time: 循环开始时间
功能说明:
- 在NORMAL模式下可能并行执行回复生成和动作处理
- 等待所有异步任务完成
- 整合回复和动作的执行结果
- 构建最终循环信息并结束循环
"""
background_reply_task = None
if self.context.loop_mode == ChatMode.NORMAL and is_parallel and gen_task:
background_reply_task = asyncio.create_task(
self._handle_parallel_reply(
gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
)
background_action_task = asyncio.create_task(
self._handle_action(action_type, reasoning, action_data, cycle_timers, thinking_id, action_message)
)
reply_loop_info, action_success, action_reply_text, action_command = None, False, "", ""
if background_reply_task:
results = await asyncio.gather(background_reply_task, background_action_task, return_exceptions=True)
reply_result, action_result_val = results
if not isinstance(reply_result, BaseException) and reply_result is not None:
reply_loop_info, _, _ = reply_result
else:
reply_loop_info = None
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
action_success, action_reply_text, action_command = action_result_val
else:
action_success, action_reply_text, action_command = False, "", ""
else:
results = await asyncio.gather(background_action_task, return_exceptions=True)
if results and len(results) > 0:
action_result_val = results[0] # Get the actual result from the tuple
else:
action_result_val = (False, "", "")
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
action_success, action_reply_text, action_command = action_result_val
else:
action_success, action_reply_text, action_command = False, "", ""
loop_info = self._build_final_loop_info(
reply_loop_info, action_success, action_reply_text, action_command, plan_result
)
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
async def _handle_parallel_reply(
self, gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
):
"""
处理并行回复生成
Args:
gen_task: 回复生成任务
loop_start_time: 循环开始时间
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
Returns:
tuple: (循环信息, 回复文本, 计时器信息) 或 None
功能说明:
- 等待并行回复生成任务完成(带超时)
- 构建回复目标字符串
- 发送生成的回复
- 返回循环信息供上级方法使用
"""
try:
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
except asyncio.TimeoutError:
return None, "", {}
if not response_set:
return None, "", {}
reply_to_str = await self._build_reply_to_str(action_message)
return await self.response_handler.generate_and_send_reply(
response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
async def _handle_action( async def _handle_action(
self, action, reasoning, action_data, cycle_timers, thinking_id, action_message self, action, reasoning, action_data, cycle_timers, thinking_id, action_message
@@ -594,12 +375,12 @@ class CycleProcessor:
if "reply" in available_actions: if "reply" in available_actions:
fallback_action = "reply" fallback_action = "reply"
elif available_actions: elif available_actions:
fallback_action = list(available_actions.keys())[0] fallback_action = list(available_actions.keys())
if fallback_action and fallback_action != action: if fallback_action and fallback_action != action:
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}") logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
action_handler = self.context.action_manager.create_action( action_handler = self.context.action_manager.create_action(
action_name=fallback_action, action_name=fallback_action if isinstance(fallback_action, list) else fallback_action,
action_data=action_data, action_data=action_data,
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
cycle_timers=cycle_timers, cycle_timers=cycle_timers,
@@ -619,43 +400,3 @@ class CycleProcessor:
logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}") logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc() traceback.print_exc()
return False, "", "" return False, "", ""
def _build_final_loop_info(self, reply_loop_info, action_success, action_reply_text, action_command, plan_result):
"""
构建最终的循环信息
Args:
reply_loop_info: 回复循环信息可能为None
action_success: 动作执行是否成功
action_reply_text: 动作回复文本
action_command: 动作命令
plan_result: 规划结果
Returns:
dict: 完整的循环信息,包含规划信息和动作信息
功能说明:
- 如果有回复循环信息,则在其基础上添加动作信息
- 如果没有回复信息,则创建新的循环信息结构
- 整合所有执行结果供循环跟踪器记录
"""
if reply_loop_info:
loop_info = reply_loop_info
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
else:
loop_info = {
"loop_plan_info": {"action_result": plan_result.get("action_result", {})},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
}
return loop_info

View File

@@ -342,31 +342,36 @@ class HeartFChatting:
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。") logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
# 根据聊天模式处理新消息 # 根据聊天模式处理新消息
# 统一使用 _should_process_messages 判断是否应该处理
should_process, interest_value = await self._should_process_messages(recent_messages) should_process, interest_value = await self._should_process_messages(recent_messages)
if should_process: if not should_process:
self.context.last_read_time = time.time() # 消息数量不足或兴趣不够,等待
await self.cycle_processor.observe(interest_value=interest_value)
else:
# Normal模式消息数量不足等待
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
return True return True # Skip rest of the logic for this iteration
if not await self._should_process_messages(recent_messages): # Messages should be processed
return has_new_messages action_type = await self.cycle_processor.observe(interest_value=interest_value)
# 处理新消息 # 管理no_reply计数器
for message in recent_messages: if action_type != "no_reply":
await self.cycle_processor.observe(interest_value=interest_value) self.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作重置no_reply计数器")
else: # action_type == "no_reply"
self.context.no_reply_consecutive += 1
self._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
# 如果成功观察,增加能量值并重置累积兴趣值 # 如果成功观察,增加能量值并重置累积兴趣值
if has_new_messages: self.context.energy_value += 1 / global_config.chat.focus_value
self.context.energy_value += 1 / global_config.chat.focus_value # 重置累积兴趣值,因为消息已经被成功处理
# 重置累积兴趣值,因为消息已经被成功处理 self.context.breaking_accumulated_interest = 0.0
self.context.breaking_accumulated_interest = 0.0 logger.info(
logger.info( f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值" )
)
# 更新上一帧的睡眠状态 # 更新上一帧的睡眠状态
self.context.was_sleeping = is_sleeping self.context.was_sleeping = is_sleeping

View File

@@ -6,6 +6,7 @@ from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode from src.plugin_system.base.component_types import ChatMode
from ..hfc_context import HfcContext from ..hfc_context import HfcContext
from .events import ProactiveTriggerEvent from .events import ProactiveTriggerEvent
from src.plugin_system.apis import generator_api
if TYPE_CHECKING: if TYPE_CHECKING:
from ..cycle_processor import CycleProcessor from ..cycle_processor import CycleProcessor
@@ -103,23 +104,19 @@ class ProactiveThinker:
# 如果决策不是 do_nothing则执行 # 如果决策不是 do_nothing则执行
if action_result and action_result.get("action_type") != "do_nothing": if action_result and action_result.get("action_type") != "do_nothing":
# 在主动思考时,如果 target_message 为 None则默认选取最新 message 作为 target_message if action_result.get("action_type") == "reply":
if target_message is None and self.context.chat_stream and self.context.chat_stream.context: success, response_set, _ = await generator_api.generate_reply(
from src.chat.message_receive.message import MessageRecv chat_stream=self.context.chat_stream,
reply_message=action_result["action_message"],
latest_message = self.context.chat_stream.context.get_last_message() available_actions={},
if isinstance(latest_message, MessageRecv): enable_tool=False,
user_info = latest_message.message_info.user_info request_type="chat.replyer.proactive",
target_message = { from_plugin=False,
"chat_info_platform": latest_message.message_info.platform, )
"user_platform": user_info.platform if user_info else None, if success and response_set:
"user_id": user_info.user_id if user_info else None, await self.cycle_processor.response_handler.send_response(
"processed_plain_text": latest_message.processed_plain_text, response_set, time.time(), action_result["action_message"]
"is_mentioned": latest_message.is_mentioned, )
}
# 将决策结果交给 cycle_processor 的后续流程处理
await self.cycle_processor.execute_plan(action_result, target_message)
else: else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")

View File

@@ -172,101 +172,3 @@ class ResponseHandler:
) )
return reply_text return reply_text
# TODO: 已废弃
async def generate_response(
self,
message_data: dict,
available_actions: Optional[Dict[str, Any]],
reply_to: str,
request_type: str = "chat.replyer.normal",
) -> Optional[list]:
"""
生成回复内容
Args:
message_data: 消息数据
available_actions: 可用动作列表
reply_to: 回复目标
request_type: 请求类型,默认为普通回复
Returns:
list: 生成的回复内容列表失败时返回None
功能说明:
- 在生成回复前进行反注入检测(提高效率)
- 调用生成器API生成回复
- 根据配置启用或禁用工具功能
- 处理生成失败的情况
- 记录生成过程中的错误和异常
"""
try:
# === 反注入检测(仅在需要生成回复时) ===
# 执行反注入检测(直接使用字典格式)
anti_injector = get_anti_injector()
result, modified_content, reason = await anti_injector.process_message(
message_data, self.context.chat_stream
)
# 根据反注入结果处理消息数据
await anti_injector.handle_message_storage(result, modified_content, reason or "", message_data)
if result == ProcessResult.BLOCKED_BAN:
# 用户被封禁 - 直接阻止回复生成
anti_injector_logger.warning(f"用户被反注入系统封禁,阻止回复生成: {reason}")
return None
elif result == ProcessResult.BLOCKED_INJECTION:
# 消息被阻止(危险内容等) - 直接阻止回复生成
anti_injector_logger.warning(f"消息被反注入系统阻止,阻止回复生成: {reason}")
return None
elif result == ProcessResult.COUNTER_ATTACK:
# 反击模式:生成反击消息作为回复
anti_injector_logger.info(f"反击模式启动,生成反击回复: {reason}")
if modified_content:
# 返回反击消息作为回复内容
return [("text", modified_content)]
else:
# 没有反击内容时阻止回复生成
return None
# 检查是否需要加盾处理
safety_prompt = None
if result == ProcessResult.SHIELDED:
# 获取安全系统提示词并注入
shield = anti_injector.shield
safety_prompt = shield.get_safety_system_prompt()
await Prompt.create_async(safety_prompt, "anti_injection_safety_prompt")
anti_injector_logger.info(f"消息已被反注入系统加盾处理,已注入安全提示词: {reason}")
# 处理被修改的消息内容(用于生成回复)
modified_reply_to = reply_to
if modified_content:
# 更新消息内容用于生成回复
anti_injector_logger.info(f"消息内容已被反注入系统修改,使用修改后内容生成回复: {reason}")
# 解析原始reply_to格式"发送者:消息内容"
if ":" in reply_to:
sender_part, _ = reply_to.split(":", 1)
modified_reply_to = f"{sender_part}:{modified_content}"
else:
# 如果格式不标准,直接使用修改后的内容
modified_reply_to = modified_content
# === 正常的回复生成流程 ===
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.context.chat_stream,
reply_to=modified_reply_to, # 使用可能被修改的内容
available_actions=available_actions,
enable_tool=global_config.tool.enable_tool,
request_type=request_type,
from_plugin=False,
)
if not success or not reply_set:
logger.info(f"{message_data.get('processed_plain_text')} 的回复生成失败")
return None
return reply_set
except Exception as e:
logger.error(f"{self.context.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None