diff --git a/src/chat/chat_loop/cycle_processor.py b/src/chat/chat_loop/cycle_processor.py index acbdf474e..b446c697a 100644 --- a/src/chat/chat_loop/cycle_processor.py +++ b/src/chat/chat_loop/cycle_processor.py @@ -94,7 +94,7 @@ class CycleProcessor: return loop_info, reply_text, cycle_timers - async def observe(self, interest_value: float = 0.0) -> bool: + async def observe(self, interest_value: float = 0.0) -> str: """ 观察和处理单次思考循环的核心方法 @@ -326,226 +326,7 @@ class CycleProcessor: self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers) action_type = actions[0]["action_type"] if actions else "no_action" - # 管理no_reply计数器:当执行了非no_reply动作时,重置计数器 - if action_type != "no_reply": - # no_reply逻辑已集成到heartFC_chat.py中,直接重置计数器 - self.context.chat_instance.recent_interest_records.clear() - self.context.no_reply_consecutive = 0 - logger.debug(f"{self.log_prefix} 执行了{action_type}动作,重置no_reply计数器") - return True - - if action_type == "no_reply": - self.context.no_reply_consecutive += 1 - self.context.chat_instance._determine_form_type() - - # 在一轮动作执行完毕后,增加睡眠压力 - if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system: - if action_type not in ["no_reply", "no_action"]: - self.context.energy_manager.increase_sleep_pressure() - - return True - - async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]): - """ - 执行一个已经制定好的计划 - """ - action_type = action_result.get("action_type", "error") - - # 这里我们需要为执行计划创建一个新的循环追踪 - cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True) - loop_start_time = time.time() - - if action_type == "reply": - # 主动思考不应该直接触发简单回复,但为了逻辑完整性,我们假设它会调用response_handler - # 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理 - await self._handle_reply_action( - target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result} - ) - else: - await self._handle_other_actions( - action_type, - action_result.get("reasoning", ""), - action_result.get("action_data", {}), - action_result.get("is_parallel", False), - None, - target_message, - cycle_timers, - thinking_id, - {"action_result": action_result}, - loop_start_time, - ) - - async def _handle_reply_action( - self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result - ): - """ - 处理回复类型的动作 - - Args: - message_data: 消息数据 - available_actions: 可用动作列表 - gen_task: 预先创建的生成任务(可能为None) - loop_start_time: 循环开始时间 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - - 功能说明: - - 根据聊天模式决定是否使用预生成的回复或实时生成 - - 在NORMAL模式下使用异步生成提高效率 - - 在FOCUS模式下同步生成确保及时响应 - - 发送生成的回复并结束循环 - """ - # 初始化reply_to_str以避免UnboundLocalError - reply_to_str = None - - if self.context.loop_mode == ChatMode.NORMAL: - if not gen_task: - reply_to_str = await self._build_reply_to_str(message_data) - gen_task = asyncio.create_task( - self.response_handler.generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.normal", - ) - ) - else: - # 如果gen_task已存在但reply_to_str还未构建,需要构建它 - if reply_to_str is None: - reply_to_str = await self._build_reply_to_str(message_data) - - try: - response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout) - except asyncio.TimeoutError: - response_set = None - else: - reply_to_str = await self._build_reply_to_str(message_data) - response_set = await self.response_handler.generate_response( - message_data=message_data, - available_actions=available_actions, - reply_to=reply_to_str, - request_type="chat.replyer.focus", - ) - - if response_set: - loop_info, _, _ = await self.response_handler.generate_and_send_reply( - response_set, reply_to_str, loop_start_time, message_data, cycle_timers, thinking_id, plan_result - ) - self.cycle_tracker.end_cycle(loop_info, cycle_timers) - - async def _handle_other_actions( - self, - action_type, - reasoning, - action_data, - is_parallel, - gen_task, - action_message, - cycle_timers, - thinking_id, - plan_result, - loop_start_time, - ): - """ - 处理非回复类型的动作(如no_reply、自定义动作等) - - Args: - action_type: 动作类型 - reasoning: 动作理由 - action_data: 动作数据 - is_parallel: 是否并行执行 - gen_task: 生成任务 - action_message: 动作消息 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - loop_start_time: 循环开始时间 - - 功能说明: - - 在NORMAL模式下可能并行执行回复生成和动作处理 - - 等待所有异步任务完成 - - 整合回复和动作的执行结果 - - 构建最终循环信息并结束循环 - """ - background_reply_task = None - if self.context.loop_mode == ChatMode.NORMAL and is_parallel and gen_task: - background_reply_task = asyncio.create_task( - self._handle_parallel_reply( - gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ) - ) - - background_action_task = asyncio.create_task( - self._handle_action(action_type, reasoning, action_data, cycle_timers, thinking_id, action_message) - ) - - reply_loop_info, action_success, action_reply_text, action_command = None, False, "", "" - - if background_reply_task: - results = await asyncio.gather(background_reply_task, background_action_task, return_exceptions=True) - reply_result, action_result_val = results - if not isinstance(reply_result, BaseException) and reply_result is not None: - reply_loop_info, _, _ = reply_result - else: - reply_loop_info = None - - if not isinstance(action_result_val, BaseException) and action_result_val is not None: - action_success, action_reply_text, action_command = action_result_val - else: - action_success, action_reply_text, action_command = False, "", "" - else: - results = await asyncio.gather(background_action_task, return_exceptions=True) - if results and len(results) > 0: - action_result_val = results[0] # Get the actual result from the tuple - else: - action_result_val = (False, "", "") - - if not isinstance(action_result_val, BaseException) and action_result_val is not None: - action_success, action_reply_text, action_command = action_result_val - else: - action_success, action_reply_text, action_command = False, "", "" - - loop_info = self._build_final_loop_info( - reply_loop_info, action_success, action_reply_text, action_command, plan_result - ) - self.cycle_tracker.end_cycle(loop_info, cycle_timers) - - async def _handle_parallel_reply( - self, gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ): - """ - 处理并行回复生成 - - Args: - gen_task: 回复生成任务 - loop_start_time: 循环开始时间 - action_message: 动作消息 - cycle_timers: 循环计时器 - thinking_id: 思考ID - plan_result: 规划结果 - - Returns: - tuple: (循环信息, 回复文本, 计时器信息) 或 None - - 功能说明: - - 等待并行回复生成任务完成(带超时) - - 构建回复目标字符串 - - 发送生成的回复 - - 返回循环信息供上级方法使用 - """ - try: - response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout) - except asyncio.TimeoutError: - return None, "", {} - - if not response_set: - return None, "", {} - - reply_to_str = await self._build_reply_to_str(action_message) - return await self.response_handler.generate_and_send_reply( - response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result - ) + return action_type async def _handle_action( self, action, reasoning, action_data, cycle_timers, thinking_id, action_message @@ -594,12 +375,12 @@ class CycleProcessor: if "reply" in available_actions: fallback_action = "reply" elif available_actions: - fallback_action = list(available_actions.keys())[0] + fallback_action = list(available_actions.keys()) if fallback_action and fallback_action != action: logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}") action_handler = self.context.action_manager.create_action( - action_name=fallback_action, + action_name=fallback_action if isinstance(fallback_action, list) else fallback_action, action_data=action_data, reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", cycle_timers=cycle_timers, @@ -619,43 +400,3 @@ class CycleProcessor: logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}") traceback.print_exc() return False, "", "" - - def _build_final_loop_info(self, reply_loop_info, action_success, action_reply_text, action_command, plan_result): - """ - 构建最终的循环信息 - - Args: - reply_loop_info: 回复循环信息(可能为None) - action_success: 动作执行是否成功 - action_reply_text: 动作回复文本 - action_command: 动作命令 - plan_result: 规划结果 - - Returns: - dict: 完整的循环信息,包含规划信息和动作信息 - - 功能说明: - - 如果有回复循环信息,则在其基础上添加动作信息 - - 如果没有回复信息,则创建新的循环信息结构 - - 整合所有执行结果供循环跟踪器记录 - """ - if reply_loop_info: - loop_info = reply_loop_info - loop_info["loop_action_info"].update( - { - "action_taken": action_success, - "command": action_command, - "taken_time": time.time(), - } - ) - else: - loop_info = { - "loop_plan_info": {"action_result": plan_result.get("action_result", {})}, - "loop_action_info": { - "action_taken": action_success, - "reply_text": action_reply_text, - "command": action_command, - "taken_time": time.time(), - }, - } - return loop_info diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py index caa13affe..80a7aff76 100644 --- a/src/chat/chat_loop/heartFC_chat.py +++ b/src/chat/chat_loop/heartFC_chat.py @@ -342,31 +342,36 @@ class HeartFChatting: logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。") # 根据聊天模式处理新消息 - # 统一使用 _should_process_messages 判断是否应该处理 should_process, interest_value = await self._should_process_messages(recent_messages) - if should_process: - self.context.last_read_time = time.time() - await self.cycle_processor.observe(interest_value=interest_value) - else: - # Normal模式:消息数量不足,等待 + if not should_process: + # 消息数量不足或兴趣不够,等待 await asyncio.sleep(0.5) - return True + return True # Skip rest of the logic for this iteration - if not await self._should_process_messages(recent_messages): - return has_new_messages + # Messages should be processed + action_type = await self.cycle_processor.observe(interest_value=interest_value) - # 处理新消息 - for message in recent_messages: - await self.cycle_processor.observe(interest_value=interest_value) + # 管理no_reply计数器 + if action_type != "no_reply": + self.recent_interest_records.clear() + self.context.no_reply_consecutive = 0 + logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作,重置no_reply计数器") + else: # action_type == "no_reply" + self.context.no_reply_consecutive += 1 + self._determine_form_type() + + # 在一轮动作执行完毕后,增加睡眠压力 + if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system: + if action_type not in ["no_reply", "no_action"]: + self.context.energy_manager.increase_sleep_pressure() # 如果成功观察,增加能量值并重置累积兴趣值 - if has_new_messages: - self.context.energy_value += 1 / global_config.chat.focus_value - # 重置累积兴趣值,因为消息已经被成功处理 - self.context.breaking_accumulated_interest = 0.0 - logger.info( - f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值" - ) + self.context.energy_value += 1 / global_config.chat.focus_value + # 重置累积兴趣值,因为消息已经被成功处理 + self.context.breaking_accumulated_interest = 0.0 + logger.info( + f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值" + ) # 更新上一帧的睡眠状态 self.context.was_sleeping = is_sleeping diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py index 9cb2f45c6..e2be9fdc2 100644 --- a/src/chat/chat_loop/proactive/proactive_thinker.py +++ b/src/chat/chat_loop/proactive/proactive_thinker.py @@ -6,6 +6,7 @@ from src.common.logger import get_logger from src.plugin_system.base.component_types import ChatMode from ..hfc_context import HfcContext from .events import ProactiveTriggerEvent +from src.plugin_system.apis import generator_api if TYPE_CHECKING: from ..cycle_processor import CycleProcessor @@ -103,23 +104,19 @@ class ProactiveThinker: # 如果决策不是 do_nothing,则执行 if action_result and action_result.get("action_type") != "do_nothing": - # 在主动思考时,如果 target_message 为 None,则默认选取最新 message 作为 target_message - if target_message is None and self.context.chat_stream and self.context.chat_stream.context: - from src.chat.message_receive.message import MessageRecv - - latest_message = self.context.chat_stream.context.get_last_message() - if isinstance(latest_message, MessageRecv): - user_info = latest_message.message_info.user_info - target_message = { - "chat_info_platform": latest_message.message_info.platform, - "user_platform": user_info.platform if user_info else None, - "user_id": user_info.user_id if user_info else None, - "processed_plain_text": latest_message.processed_plain_text, - "is_mentioned": latest_message.is_mentioned, - } - - # 将决策结果交给 cycle_processor 的后续流程处理 - await self.cycle_processor.execute_plan(action_result, target_message) + if action_result.get("action_type") == "reply": + success, response_set, _ = await generator_api.generate_reply( + chat_stream=self.context.chat_stream, + reply_message=action_result["action_message"], + available_actions={}, + enable_tool=False, + request_type="chat.replyer.proactive", + from_plugin=False, + ) + if success and response_set: + await self.cycle_processor.response_handler.send_response( + response_set, time.time(), action_result["action_message"] + ) else: logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默") diff --git a/src/chat/chat_loop/response_handler.py b/src/chat/chat_loop/response_handler.py index cc9414e9b..7a5acfc53 100644 --- a/src/chat/chat_loop/response_handler.py +++ b/src/chat/chat_loop/response_handler.py @@ -172,101 +172,3 @@ class ResponseHandler: ) return reply_text - - # TODO: 已废弃 - async def generate_response( - self, - message_data: dict, - available_actions: Optional[Dict[str, Any]], - reply_to: str, - request_type: str = "chat.replyer.normal", - ) -> Optional[list]: - """ - 生成回复内容 - - Args: - message_data: 消息数据 - available_actions: 可用动作列表 - reply_to: 回复目标 - request_type: 请求类型,默认为普通回复 - - Returns: - list: 生成的回复内容列表,失败时返回None - - 功能说明: - - 在生成回复前进行反注入检测(提高效率) - - 调用生成器API生成回复 - - 根据配置启用或禁用工具功能 - - 处理生成失败的情况 - - 记录生成过程中的错误和异常 - """ - try: - # === 反注入检测(仅在需要生成回复时) === - # 执行反注入检测(直接使用字典格式) - anti_injector = get_anti_injector() - result, modified_content, reason = await anti_injector.process_message( - message_data, self.context.chat_stream - ) - - # 根据反注入结果处理消息数据 - await anti_injector.handle_message_storage(result, modified_content, reason or "", message_data) - - if result == ProcessResult.BLOCKED_BAN: - # 用户被封禁 - 直接阻止回复生成 - anti_injector_logger.warning(f"用户被反注入系统封禁,阻止回复生成: {reason}") - return None - elif result == ProcessResult.BLOCKED_INJECTION: - # 消息被阻止(危险内容等) - 直接阻止回复生成 - anti_injector_logger.warning(f"消息被反注入系统阻止,阻止回复生成: {reason}") - return None - elif result == ProcessResult.COUNTER_ATTACK: - # 反击模式:生成反击消息作为回复 - anti_injector_logger.info(f"反击模式启动,生成反击回复: {reason}") - if modified_content: - # 返回反击消息作为回复内容 - return [("text", modified_content)] - else: - # 没有反击内容时阻止回复生成 - return None - - # 检查是否需要加盾处理 - safety_prompt = None - if result == ProcessResult.SHIELDED: - # 获取安全系统提示词并注入 - shield = anti_injector.shield - safety_prompt = shield.get_safety_system_prompt() - await Prompt.create_async(safety_prompt, "anti_injection_safety_prompt") - anti_injector_logger.info(f"消息已被反注入系统加盾处理,已注入安全提示词: {reason}") - - # 处理被修改的消息内容(用于生成回复) - modified_reply_to = reply_to - if modified_content: - # 更新消息内容用于生成回复 - anti_injector_logger.info(f"消息内容已被反注入系统修改,使用修改后内容生成回复: {reason}") - # 解析原始reply_to格式:"发送者:消息内容" - if ":" in reply_to: - sender_part, _ = reply_to.split(":", 1) - modified_reply_to = f"{sender_part}:{modified_content}" - else: - # 如果格式不标准,直接使用修改后的内容 - modified_reply_to = modified_content - - # === 正常的回复生成流程 === - success, reply_set, _ = await generator_api.generate_reply( - chat_stream=self.context.chat_stream, - reply_to=modified_reply_to, # 使用可能被修改的内容 - available_actions=available_actions, - enable_tool=global_config.tool.enable_tool, - request_type=request_type, - from_plugin=False, - ) - - if not success or not reply_set: - logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败") - return None - - return reply_set - - except Exception as e: - logger.error(f"{self.context.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}") - return None