From 1365099fd4fb8e80ca8627d129d20cd3e5be02e9 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 6 Jul 2025 20:14:09 +0800 Subject: [PATCH] =?UTF-8?q?remove=EF=BC=9A=E5=86=97=E4=BD=99=E7=9A=84sbhf?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=92=8Cfocus=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- changelogs/changelog.md | 1 + src/api/apiforgui.py | 34 -- ...loop_observation.py => focus_loop_info.py} | 42 +-- src/chat/focus_chat/heartFC_chat.py | 209 +---------- src/chat/focus_chat/hfc_performance_logger.py | 1 - src/chat/focus_chat/hfc_utils.py | 35 -- src/chat/focus_chat/hfc_version_manager.py | 2 +- src/chat/focus_chat/info/action_info.py | 83 ----- src/chat/focus_chat/info/cycle_info.py | 157 -------- src/chat/focus_chat/info/info_base.py | 69 ---- src/chat/focus_chat/info/obs_info.py | 165 --------- .../focus_chat/info/workingmemory_info.py | 86 ----- .../info_processors/base_processor.py | 51 --- .../info_processors/chattinginfo_processor.py | 142 -------- .../observation/actions_observation.py | 46 --- .../observation/chatting_observation.py | 183 ---------- .../focus_chat/observation/observation.py | 25 -- .../observation/working_observation.py | 34 -- src/chat/heart_flow/background_tasks.py | 173 --------- src/chat/heart_flow/heartflow.py | 106 ++---- .../heart_flow/heartflow_message_processor.py | 21 +- src/chat/heart_flow/sub_heartflow.py | 83 ----- src/chat/heart_flow/subheartflow_manager.py | 337 ------------------ src/chat/memory_system/memory_activator.py | 6 - src/chat/message_receive/__init__.py | 2 +- ...age_sender.py => normal_message_sender.py} | 0 .../uni_message_sender.py} | 0 src/chat/normal_chat/normal_chat.py | 2 +- src/chat/planner_actions/action_modifier.py | 18 +- src/chat/planner_actions/planner_focus.py | 138 +++---- src/chat/replyer/default_generator.py | 2 +- src/chat/utils/statistic.py | 6 +- .../working_memory/memory_item.py | 0 .../working_memory/memory_manager.py | 0 .../working_memory/working_memory.py | 0 .../working_memory_processor.py | 5 +- src/common/logger.py | 1 - src/config/official_configs.py | 8 - src/experimental/PFC/message_sender.py | 2 +- src/main.py | 7 +- src/plugin_system/apis/chat_api.py | 34 -- src/plugin_system/apis/send_api.py | 2 +- src/plugin_system/base/base_action.py | 1 - template/bot_config_template.toml | 23 +- 44 files changed, 132 insertions(+), 2210 deletions(-) rename src/chat/focus_chat/{observation/hfcloop_observation.py => focus_loop_info.py} (67%) delete mode 100644 src/chat/focus_chat/info/action_info.py delete mode 100644 src/chat/focus_chat/info/cycle_info.py delete mode 100644 src/chat/focus_chat/info/info_base.py delete mode 100644 src/chat/focus_chat/info/obs_info.py delete mode 100644 src/chat/focus_chat/info/workingmemory_info.py delete mode 100644 src/chat/focus_chat/info_processors/base_processor.py delete mode 100644 src/chat/focus_chat/info_processors/chattinginfo_processor.py delete mode 100644 src/chat/focus_chat/observation/actions_observation.py delete mode 100644 src/chat/focus_chat/observation/chatting_observation.py delete mode 100644 src/chat/focus_chat/observation/observation.py delete mode 100644 src/chat/focus_chat/observation/working_observation.py delete mode 100644 src/chat/heart_flow/background_tasks.py delete mode 100644 src/chat/heart_flow/subheartflow_manager.py rename src/chat/message_receive/{message_sender.py => normal_message_sender.py} (100%) rename src/chat/{focus_chat/heartFC_sender.py => message_receive/uni_message_sender.py} (100%) rename src/chat/{focus_chat => }/working_memory/memory_item.py (100%) rename src/chat/{focus_chat => }/working_memory/memory_manager.py (100%) rename src/chat/{focus_chat => }/working_memory/working_memory.py (100%) rename src/chat/{focus_chat/info_processors => working_memory}/working_memory_processor.py (98%) diff --git a/changelogs/changelog.md b/changelogs/changelog.md index eab206f1b..f31a46239 100644 --- a/changelogs/changelog.md +++ b/changelogs/changelog.md @@ -12,6 +12,7 @@ - normal的插件允许llm激活 - 合并action激活器 - emoji统一可选随机激活或llm激活 +- 移除observation和processor,简化focus的代码逻辑 ## [0.8.1] - 2025-7-5 diff --git a/src/api/apiforgui.py b/src/api/apiforgui.py index e1cffebb6..01685939e 100644 --- a/src/api/apiforgui.py +++ b/src/api/apiforgui.py @@ -1,7 +1,6 @@ from src.chat.heart_flow.heartflow import heartflow from src.chat.heart_flow.sub_heartflow import ChatState from src.common.logger import get_logger -import time logger = get_logger("api") @@ -20,39 +19,6 @@ async def forced_change_subheartflow_status(subheartflow_id: str, status: ChatSt return False -async def get_subheartflow_cycle_info(subheartflow_id: str, history_len: int) -> dict: - """获取子心流的循环信息""" - subheartflow_cycle_info = await heartflow.api_get_subheartflow_cycle_info(subheartflow_id, history_len) - logger.debug(f"子心流 {subheartflow_id} 循环信息: {subheartflow_cycle_info}") - if subheartflow_cycle_info: - return subheartflow_cycle_info - else: - logger.warning(f"子心流 {subheartflow_id} 循环信息未找到") - return None - - -async def get_normal_chat_replies(subheartflow_id: str, limit: int = 10) -> list: - """获取子心流的NormalChat回复记录 - - Args: - subheartflow_id: 子心流ID - limit: 最大返回数量,默认10条 - - Returns: - list: 回复记录列表,如果未找到则返回空列表 - """ - replies = await heartflow.api_get_normal_chat_replies(subheartflow_id, limit) - logger.debug(f"子心流 {subheartflow_id} NormalChat回复记录: 获取到 {len(replies) if replies else 0} 条") - if replies: - # 格式化时间戳为可读时间 - for reply in replies: - if "time" in reply: - reply["formatted_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["time"])) - return replies - else: - logger.warning(f"子心流 {subheartflow_id} NormalChat回复记录未找到") - return [] - async def get_all_states(): """获取所有状态""" diff --git a/src/chat/focus_chat/observation/hfcloop_observation.py b/src/chat/focus_chat/focus_loop_info.py similarity index 67% rename from src/chat/focus_chat/observation/hfcloop_observation.py rename to src/chat/focus_chat/focus_loop_info.py index ad7245f8a..2389f10c9 100644 --- a/src/chat/focus_chat/observation/hfcloop_observation.py +++ b/src/chat/focus_chat/focus_loop_info.py @@ -6,20 +6,16 @@ from src.chat.focus_chat.hfc_utils import CycleDetail from typing import List # Import the new utility function -logger = get_logger("observation") +logger = get_logger("loop_info") # 所有观察的基类 -class HFCloopObservation: +class FocusLoopInfo: def __init__(self, observe_id): - self.observe_info = "" self.observe_id = observe_id self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间 self.history_loop: List[CycleDetail] = [] - def get_observe_info(self): - return self.observe_info - def add_loop_info(self, loop_info: CycleDetail): self.history_loop.append(loop_info) @@ -50,11 +46,6 @@ class HFCloopObservation: action_taken_time_str = ( datetime.fromtimestamp(action_taken_time).strftime("%H:%M:%S") if action_taken_time > 0 else "未知时间" ) - # print(action_type) - # print(action_reasoning) - # print(is_taken) - # print(action_taken_time_str) - # print("--------------------------------") if action_reasoning != cycle_last_reason: cycle_last_reason = action_reasoning action_reasoning_str = f"你选择这个action的原因是:{action_reasoning}" @@ -71,9 +62,6 @@ class HFCloopObservation: else: action_detailed_str += f"{action_taken_time_str}时,你选择回复(action:{action_type},内容是:'{response_text}'),但是动作失败了。{action_reasoning_str}\n" elif action_type == "no_reply": - # action_detailed_str += ( - # f"{action_taken_time_str}时,你选择不回复(action:{action_type}),{action_reasoning_str}\n" - # ) pass else: if is_taken: @@ -88,17 +76,6 @@ class HFCloopObservation: else: cycle_info_block = "\n" - # 根据连续文本回复的数量构建提示信息 - if consecutive_text_replies >= 3: # 如果最近的三个活动都是文本回复 - cycle_info_block = f'你已经连续回复了三条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}",第三近: "{responses_for_prompt[2]}")。你回复的有点多了,请注意' - elif consecutive_text_replies == 2: # 如果最近的两个活动是文本回复 - cycle_info_block = f'你已经连续回复了两条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}"),请注意' - - # 包装提示块,增加可读性,即使没有连续回复也给个标记 - # if cycle_info_block: - # cycle_info_block = f"\n你最近的回复\n{cycle_info_block}\n" - # else: - # cycle_info_block = "\n" # 获取history_loop中最新添加的 if self.history_loop: @@ -112,17 +89,4 @@ class HFCloopObservation: else: cycle_info_block += f"距离你上一次阅读消息并思考和规划,已经过去了{time_diff}秒\n" else: - cycle_info_block += "你还没看过消息\n" - - self.observe_info = cycle_info_block - - def to_dict(self) -> dict: - """将观察对象转换为可序列化的字典""" - # 只序列化基本信息,避免循环引用 - return { - "observe_info": self.observe_info, - "observe_id": self.observe_id, - "last_observe_time": self.last_observe_time, - # 不序列化history_loop,避免循环引用 - "history_loop_count": len(self.history_loop), - } + cycle_info_block += "你还没看过消息\n" \ No newline at end of file diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 9e10da366..ac95a984b 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -9,15 +9,7 @@ from rich.traceback import install from src.chat.utils.prompt_builder import global_prompt_manager from src.common.logger import get_logger from src.chat.utils.timer_calculator import Timer -from src.chat.focus_chat.observation.observation import Observation -from src.chat.focus_chat.info.info_base import InfoBase -from src.chat.focus_chat.info_processors.chattinginfo_processor import ChattingInfoProcessor -from src.chat.focus_chat.info_processors.working_memory_processor import WorkingMemoryProcessor -from src.chat.focus_chat.observation.hfcloop_observation import HFCloopObservation -from src.chat.focus_chat.observation.working_observation import WorkingMemoryObservation -from src.chat.focus_chat.observation.chatting_observation import ChattingObservation -from src.chat.focus_chat.observation.actions_observation import ActionObservation -from src.chat.focus_chat.info_processors.base_processor import BaseProcessor +from src.chat.focus_chat.focus_loop_info import FocusLoopInfo from src.chat.planner_actions.planner_focus import ActionPlanner from src.chat.planner_actions.action_modifier import ActionModifier from src.chat.planner_actions.action_manager import ActionManager @@ -32,23 +24,8 @@ install(extra_lines=3) # 注释:原来的动作修改超时常量已移除,因为改为顺序执行 -# 定义观察器映射:键是观察器名称,值是 (观察器类, 初始化参数) -OBSERVATION_CLASSES = { - "ChattingObservation": (ChattingObservation, "chat_id"), - "WorkingMemoryObservation": (WorkingMemoryObservation, "observe_id"), - "HFCloopObservation": (HFCloopObservation, "observe_id"), -} - -# 定义处理器映射:键是处理器名称,值是 (处理器类, 可选的配置键名) -PROCESSOR_CLASSES = { - "ChattingInfoProcessor": (ChattingInfoProcessor, None), - "WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"), -} - logger = get_logger("hfc") # Logger Name Changed - - class HeartFChatting: """ 管理一个连续的Focus Chat循环 @@ -83,25 +60,14 @@ class HeartFChatting: self._message_threshold = max(10, int(30 * global_config.chat.exit_focus_threshold)) self._fatigue_triggered = False # 是否已触发疲惫退出 - # 初始化观察器 - self.observations: List[Observation] = [] - self._register_observations() - - # 根据配置文件和默认规则确定启用的处理器 - self.enabled_processor_names = ["ChattingInfoProcessor"] - if global_config.focus_chat.working_memory_processor: - self.enabled_processor_names.append("WorkingMemoryProcessor") - - self.processors: List[BaseProcessor] = [] - self._register_default_processors() + self.loop_info: FocusLoopInfo = FocusLoopInfo(observe_id=self.stream_id) self.action_manager = ActionManager() self.action_planner = ActionPlanner( - log_prefix=self.log_prefix, action_manager=self.action_manager + chat_id = self.stream_id, + action_manager=self.action_manager ) self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id) - self.action_observation = ActionObservation(observe_id=self.stream_id) - self.action_observation.set_action_manager(self.action_manager) self._processing_lock = asyncio.Lock() @@ -130,66 +96,8 @@ class HeartFChatting: f"{self.log_prefix} HeartFChatting 初始化完成,消息疲惫阈值: {self._message_threshold}条(基于exit_focus_threshold={global_config.chat.exit_focus_threshold}计算,仅在auto模式下生效)" ) - def _register_observations(self): - """注册所有观察器""" - self.observations = [] # 清空已有的 - - for name, (observation_class, param_name) in OBSERVATION_CLASSES.items(): - try: - # 检查是否需要跳过WorkingMemoryObservation - if name == "WorkingMemoryObservation": - # 如果工作记忆处理器被禁用,则跳过WorkingMemoryObservation - if not global_config.focus_chat.working_memory_processor: - logger.debug(f"{self.log_prefix} 工作记忆处理器已禁用,跳过注册观察器 {name}") - continue - - # 根据参数名使用正确的参数 - kwargs = {param_name: self.stream_id} - observation = observation_class(**kwargs) - self.observations.append(observation) - logger.debug(f"{self.log_prefix} 注册观察器 {name}") - except Exception as e: - logger.error(f"{self.log_prefix} 观察器 {name} 构造失败: {e}") - - if self.observations: - logger.info(f"{self.log_prefix} 已注册观察器: {[o.__class__.__name__ for o in self.observations]}") - else: - logger.warning(f"{self.log_prefix} 没有注册任何观察器") - - def _register_default_processors(self): - """根据 self.enabled_processor_names 注册信息处理器""" - self.processors = [] # 清空已有的 - - for name in self.enabled_processor_names: # 'name' is "ChattingInfoProcessor", etc. - processor_info = PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key) - if processor_info: - processor_actual_class = processor_info[0] # 获取实际的类定义 - # 根据处理器类名判断构造参数 - if name == "ChattingInfoProcessor": - self.processors.append(processor_actual_class()) - elif name == "WorkingMemoryProcessor": - self.processors.append(processor_actual_class(subheartflow_id=self.stream_id)) - else: - try: - self.processors.append(processor_actual_class()) # 尝试无参构造 - logger.debug(f"{self.log_prefix} 注册处理器 {name} (尝试无参构造).") - except TypeError: - logger.error( - f"{self.log_prefix} 处理器 {name} 构造失败。它可能需要参数(如 subheartflow_id)但未在注册逻辑中明确处理。" - ) - else: - logger.warning( - f"{self.log_prefix} 在 PROCESSOR_CLASSES 中未找到名为 '{name}' 的处理器定义,将跳过注册。" - ) - - if self.processors: - logger.info(f"{self.log_prefix} 已注册处理器: {[p.__class__.__name__ for p in self.processors]}") - else: - logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。") - async def start(self): """检查是否需要启动主循环,如果未激活则启动。""" - logger.debug(f"{self.log_prefix} 开始启动 HeartFChatting") # 如果循环已经激活,直接返回 if self._loop_active: @@ -210,8 +118,6 @@ class HeartFChatting: try: # 等待旧任务确实被取消 await asyncio.wait_for(self._loop_task, timeout=5.0) - except (asyncio.CancelledError, asyncio.TimeoutError): - pass # 忽略取消或超时错误 except Exception as e: logger.warning(f"{self.log_prefix} 等待旧任务取消时出错: {e}") self._loop_task = None # 清理旧任务引用 @@ -310,14 +216,11 @@ class HeartFChatting: logger.error(f"{self.log_prefix} 处理上下文时出错: {e}") # 为当前循环设置错误状态,防止后续重复报错 error_loop_info = { - "loop_observation_info": {}, - "loop_processor_info": {}, "loop_plan_info": { "action_result": { "action_type": "error", "action_data": {}, }, - "observed_messages": "", }, "loop_action_info": { "action_taken": False, @@ -335,14 +238,8 @@ class HeartFChatting: self._current_cycle_detail.set_loop_info(loop_info) - # 从observations列表中获取HFCloopObservation - hfcloop_observation = next( - (obs for obs in self.observations if isinstance(obs, HFCloopObservation)), None - ) - if hfcloop_observation: - hfcloop_observation.add_loop_info(self._current_cycle_detail) - else: - logger.warning(f"{self.log_prefix} 未找到HFCloopObservation实例") + + self.loop_info.add_loop_info(self._current_cycle_detail) self._current_cycle_detail.timers = cycle_timers @@ -391,15 +288,12 @@ class HeartFChatting: # 如果_current_cycle_detail存在但未完成,为其设置错误状态 if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"): error_loop_info = { - "loop_observation_info": {}, - "loop_processor_info": {}, "loop_plan_info": { "action_result": { "action_type": "error", "action_data": {}, "reasoning": f"循环处理失败: {e}", }, - "observed_messages": "", }, "loop_action_info": { "action_taken": False, @@ -445,65 +339,10 @@ class HeartFChatting: if acquired and self._processing_lock.locked(): self._processing_lock.release() - async def _process_processors(self, observations: List[Observation]) -> tuple[List[InfoBase], Dict[str, float]]: - # 记录并行任务开始时间 - parallel_start_time = time.time() - logger.debug(f"{self.log_prefix} 开始信息处理器并行任务") - - processor_tasks = [] - task_to_name_map = {} - - for processor in self.processors: - processor_name = processor.__class__.log_prefix - - async def run_with_timeout(proc=processor): - return await proc.process_info(observations=observations) - - task = asyncio.create_task(run_with_timeout()) - - processor_tasks.append(task) - task_to_name_map[task] = processor_name - logger.debug(f"{self.log_prefix} 启动处理器任务: {processor_name}") - - pending_tasks = set(processor_tasks) - all_plan_info: List[InfoBase] = [] - - while pending_tasks: - done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) - - for task in done: - processor_name = task_to_name_map[task] - task_completed_time = time.time() - duration_since_parallel_start = task_completed_time - parallel_start_time - - try: - result_list = await task - logger.debug(f"{self.log_prefix} 处理器 {processor_name} 已完成!") - if result_list is not None: - all_plan_info.extend(result_list) - else: - logger.warning(f"{self.log_prefix} 处理器 {processor_name} 返回了 None") - except Exception as e: - logger.error( - f"{self.log_prefix} 处理器 {processor_name} 执行失败,耗时 (自并行开始): {duration_since_parallel_start:.2f}秒. 错误: {e}", - exc_info=True, - ) - traceback.print_exc() - - - return all_plan_info - async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict: try: loop_start_time = time.time() - with Timer("观察", cycle_timers): - # 执行所有观察器的观察 - for observation in self.observations: - await observation.observe() - - loop_observation_info = { - "observations": self.observations, - } + await self.loop_info.observe() await self.relationship_builder.build_relation() @@ -513,37 +352,18 @@ class HeartFChatting: try: # 调用完整的动作修改流程 await self.action_modifier.modify_actions( - observations=self.observations, + loop_info = self.loop_info, mode="focus", ) - - await self.action_observation.observe() - self.observations.append(self.action_observation) - logger.debug(f"{self.log_prefix} 动作修改完成") except Exception as e: logger.error(f"{self.log_prefix} 动作修改失败: {e}") # 继续执行,不中断流程 - - try: - all_plan_info = await self._process_processors(self.observations) - except Exception as e: - logger.error(f"{self.log_prefix} 信息处理器失败: {e}") - # 设置默认值以继续执行 - all_plan_info = [] - - loop_processor_info = { - "all_plan_info": all_plan_info, - } - - logger.debug(f"{self.log_prefix} 并行阶段完成,准备进入规划器,plan_info数量: {len(all_plan_info)}") - with Timer("规划器", cycle_timers): - plan_result = await self.action_planner.plan(all_plan_info, loop_start_time) + plan_result = await self.action_planner.plan() loop_plan_info = { "action_result": plan_result.get("action_result", {}), - "observed_messages": plan_result.get("observed_messages", ""), } action_type, action_data, reasoning = ( @@ -551,6 +371,8 @@ class HeartFChatting: plan_result.get("action_result", {}).get("action_data", {}), plan_result.get("action_result", {}).get("reasoning", "未提供理由"), ) + + action_data["loop_start_time"] = loop_start_time if action_type == "reply": action_str = "回复" @@ -559,7 +381,7 @@ class HeartFChatting: else: action_str = action_type - logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'") + logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}',理由是:{reasoning}") # 动作执行计时 with Timer("动作执行", cycle_timers): @@ -575,8 +397,6 @@ class HeartFChatting: } loop_info = { - "loop_observation_info": loop_observation_info, - "loop_processor_info": loop_processor_info, "loop_plan_info": loop_plan_info, "loop_action_info": loop_action_info, } @@ -587,11 +407,8 @@ class HeartFChatting: logger.error(f"{self.log_prefix} FOCUS聊天处理失败: {e}") logger.error(traceback.format_exc()) return { - "loop_observation_info": {}, - "loop_processor_info": {}, "loop_plan_info": { "action_result": {"action_type": "error", "action_data": {}, "reasoning": f"处理失败: {e}"}, - "observed_messages": "", }, "loop_action_info": {"action_taken": False, "reply_text": "", "command": "", "taken_time": time.time()}, } @@ -636,7 +453,7 @@ class HeartFChatting: return False, "", "" if not action_handler: - logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}, 原因: {reasoning}") + logger.warning(f"{self.log_prefix} 未能创建动作处理器: {action}") return False, "", "" # 处理动作并获取结果 diff --git a/src/chat/focus_chat/hfc_performance_logger.py b/src/chat/focus_chat/hfc_performance_logger.py index 7ae3ea2de..88b4c66a3 100644 --- a/src/chat/focus_chat/hfc_performance_logger.py +++ b/src/chat/focus_chat/hfc_performance_logger.py @@ -41,7 +41,6 @@ class HFCPerformanceLogger: "action_type": cycle_data.get("action_type", "unknown"), "total_time": cycle_data.get("total_time", 0), "step_times": cycle_data.get("step_times", {}), - "processor_time_costs": cycle_data.get("processor_time_costs", {}), # 前处理器时间 "reasoning": cycle_data.get("reasoning", ""), "success": cycle_data.get("success", False), } diff --git a/src/chat/focus_chat/hfc_utils.py b/src/chat/focus_chat/hfc_utils.py index 0e7fe6a2c..7eeb9a7ab 100644 --- a/src/chat/focus_chat/hfc_utils.py +++ b/src/chat/focus_chat/hfc_utils.py @@ -5,7 +5,6 @@ from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.message import UserInfo from src.common.logger import get_logger import json -import os from typing import Dict, Any logger = get_logger(__name__) @@ -24,9 +23,6 @@ class CycleDetail: self.end_time: Optional[float] = None self.timers: Dict[str, float] = {} - # 新字段 - self.loop_observation_info: Dict[str, Any] = {} - self.loop_processor_info: Dict[str, Any] = {} # 前处理器信息 self.loop_plan_info: Dict[str, Any] = {} self.loop_action_info: Dict[str, Any] = {} @@ -79,8 +75,6 @@ class CycleDetail: "end_time": self.end_time, "timers": self.timers, "thinking_id": self.thinking_id, - "loop_observation_info": convert_to_serializable(self.loop_observation_info), - "loop_processor_info": convert_to_serializable(self.loop_processor_info), "loop_plan_info": convert_to_serializable(self.loop_plan_info), "loop_action_info": convert_to_serializable(self.loop_action_info), } @@ -100,41 +94,12 @@ class CycleDetail: or "group" ) - # current_time_minute = time.strftime("%Y%m%d_%H%M", time.localtime()) - - # try: - # self.log_cycle_to_file( - # log_dir + self.prefix + f"/{current_time_minute}_cycle_" + str(self.cycle_id) + ".json" - # ) - # except Exception as e: - # logger.warning(f"写入文件日志,可能是群名称包含非法字符: {e}") - - def log_cycle_to_file(self, file_path: str): - """将循环信息写入文件""" - # 如果目录不存在,则创建目 - dir_name = os.path.dirname(file_path) - # 去除特殊字符,保留字母、数字、下划线、中划线和中文 - dir_name = "".join( - char for char in dir_name if char.isalnum() or char in ["_", "-", "/"] or "\u4e00" <= char <= "\u9fff" - ) - # print("dir_name:", dir_name) - if dir_name and not os.path.exists(dir_name): - os.makedirs(dir_name, exist_ok=True) - # 写入文件 - - file_path = os.path.join(dir_name, os.path.basename(file_path)) - # print("file_path:", file_path) - with open(file_path, "a", encoding="utf-8") as f: - f.write(json.dumps(self.to_dict(), ensure_ascii=False) + "\n") - def set_thinking_id(self, thinking_id: str): """设置思考消息ID""" self.thinking_id = thinking_id def set_loop_info(self, loop_info: Dict[str, Any]): """设置循环信息""" - self.loop_observation_info = loop_info["loop_observation_info"] - self.loop_processor_info = loop_info["loop_processor_info"] self.loop_plan_info = loop_info["loop_plan_info"] self.loop_action_info = loop_info["loop_action_info"] diff --git a/src/chat/focus_chat/hfc_version_manager.py b/src/chat/focus_chat/hfc_version_manager.py index 91a3f51be..c41dff2a8 100644 --- a/src/chat/focus_chat/hfc_version_manager.py +++ b/src/chat/focus_chat/hfc_version_manager.py @@ -20,7 +20,7 @@ class HFCVersionManager: """HFC版本号管理器""" # 默认版本号 - DEFAULT_VERSION = "v5.0.0" + DEFAULT_VERSION = "v6.0.0" # 当前运行时版本号 _current_version: Optional[str] = None diff --git a/src/chat/focus_chat/info/action_info.py b/src/chat/focus_chat/info/action_info.py deleted file mode 100644 index 8c97029d0..000000000 --- a/src/chat/focus_chat/info/action_info.py +++ /dev/null @@ -1,83 +0,0 @@ -from typing import Dict, Optional, Any, List -from dataclasses import dataclass -from .info_base import InfoBase - - -@dataclass -class ActionInfo(InfoBase): - """动作信息类 - - 用于管理和记录动作的变更信息,包括需要添加或移除的动作。 - 继承自 InfoBase 类,使用字典存储具体数据。 - - Attributes: - type (str): 信息类型标识符,固定为 "action" - - Data Fields: - add_actions (List[str]): 需要添加的动作列表 - remove_actions (List[str]): 需要移除的动作列表 - reason (str): 变更原因说明 - """ - - type: str = "action" - - def get_type(self) -> str: - """获取信息类型""" - return self.type - - def get_data(self) -> Dict[str, Any]: - """获取信息数据""" - return self.data - - def set_action_changes(self, action_changes: Dict[str, List[str]]) -> None: - """设置动作变更信息 - - Args: - action_changes (Dict[str, List[str]]): 包含要增加和删除的动作列表 - { - "add": ["action1", "action2"], - "remove": ["action3"] - } - """ - self.data["add_actions"] = action_changes.get("add", []) - self.data["remove_actions"] = action_changes.get("remove", []) - - def set_reason(self, reason: str) -> None: - """设置变更原因 - - Args: - reason (str): 动作变更的原因说明 - """ - self.data["reason"] = reason - - def get_add_actions(self) -> List[str]: - """获取需要添加的动作列表 - - Returns: - List[str]: 需要添加的动作列表 - """ - return self.data.get("add_actions", []) - - def get_remove_actions(self) -> List[str]: - """获取需要移除的动作列表 - - Returns: - List[str]: 需要移除的动作列表 - """ - return self.data.get("remove_actions", []) - - def get_reason(self) -> Optional[str]: - """获取变更原因 - - Returns: - Optional[str]: 动作变更的原因说明,如果未设置则返回 None - """ - return self.data.get("reason") - - def has_changes(self) -> bool: - """检查是否有动作变更 - - Returns: - bool: 如果有任何动作需要添加或移除则返回True - """ - return bool(self.get_add_actions() or self.get_remove_actions()) diff --git a/src/chat/focus_chat/info/cycle_info.py b/src/chat/focus_chat/info/cycle_info.py deleted file mode 100644 index 3701aa153..000000000 --- a/src/chat/focus_chat/info/cycle_info.py +++ /dev/null @@ -1,157 +0,0 @@ -from typing import Dict, Optional, Any -from dataclasses import dataclass -from .info_base import InfoBase - - -@dataclass -class CycleInfo(InfoBase): - """循环信息类 - - 用于记录和管理心跳循环的相关信息,包括循环ID、时间信息、动作信息等。 - 继承自 InfoBase 类,使用字典存储具体数据。 - - Attributes: - type (str): 信息类型标识符,固定为 "cycle" - - Data Fields: - cycle_id (str): 当前循环的唯一标识符 - start_time (str): 循环开始的时间 - end_time (str): 循环结束的时间 - action (str): 在循环中采取的动作 - action_data (Dict[str, Any]): 动作相关的详细数据 - reason (str): 触发循环的原因 - observe_info (str): 当前的回复信息 - """ - - type: str = "cycle" - - def get_type(self) -> str: - """获取信息类型""" - return self.type - - def get_data(self) -> Dict[str, str]: - """获取信息数据""" - return self.data - - def get_info(self, key: str) -> Optional[str]: - """获取特定属性的信息 - - Args: - key: 要获取的属性键名 - - Returns: - 属性值,如果键不存在则返回 None - """ - return self.data.get(key) - - def set_cycle_id(self, cycle_id: str) -> None: - """设置循环ID - - Args: - cycle_id (str): 循环的唯一标识符 - """ - self.data["cycle_id"] = cycle_id - - def set_start_time(self, start_time: str) -> None: - """设置开始时间 - - Args: - start_time (str): 循环开始的时间,建议使用标准时间格式 - """ - self.data["start_time"] = start_time - - def set_end_time(self, end_time: str) -> None: - """设置结束时间 - - Args: - end_time (str): 循环结束的时间,建议使用标准时间格式 - """ - self.data["end_time"] = end_time - - def set_action(self, action: str) -> None: - """设置采取的动作 - - Args: - action (str): 在循环中执行的动作名称 - """ - self.data["action"] = action - - def set_action_data(self, action_data: Dict[str, Any]) -> None: - """设置动作数据 - - Args: - action_data (Dict[str, Any]): 动作相关的详细数据,将被转换为字符串存储 - """ - self.data["action_data"] = str(action_data) - - def set_reason(self, reason: str) -> None: - """设置原因 - - Args: - reason (str): 触发循环的原因说明 - """ - self.data["reason"] = reason - - def set_observe_info(self, observe_info: str) -> None: - """设置回复信息 - - Args: - observe_info (str): 当前的回复信息 - """ - self.data["observe_info"] = observe_info - - def get_cycle_id(self) -> Optional[str]: - """获取循环ID - - Returns: - Optional[str]: 循环的唯一标识符,如果未设置则返回 None - """ - return self.get_info("cycle_id") - - def get_start_time(self) -> Optional[str]: - """获取开始时间 - - Returns: - Optional[str]: 循环开始的时间,如果未设置则返回 None - """ - return self.get_info("start_time") - - def get_end_time(self) -> Optional[str]: - """获取结束时间 - - Returns: - Optional[str]: 循环结束的时间,如果未设置则返回 None - """ - return self.get_info("end_time") - - def get_action(self) -> Optional[str]: - """获取采取的动作 - - Returns: - Optional[str]: 在循环中执行的动作名称,如果未设置则返回 None - """ - return self.get_info("action") - - def get_action_data(self) -> Optional[str]: - """获取动作数据 - - Returns: - Optional[str]: 动作相关的详细数据(字符串形式),如果未设置则返回 None - """ - return self.get_info("action_data") - - def get_reason(self) -> Optional[str]: - """获取原因 - - Returns: - Optional[str]: 触发循环的原因说明,如果未设置则返回 None - """ - return self.get_info("reason") - - def get_observe_info(self) -> Optional[str]: - """获取回复信息 - - Returns: - Optional[str]: 当前的回复信息,如果未设置则返回 None - """ - return self.get_info("observe_info") diff --git a/src/chat/focus_chat/info/info_base.py b/src/chat/focus_chat/info/info_base.py deleted file mode 100644 index 53ad30230..000000000 --- a/src/chat/focus_chat/info/info_base.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import Dict, Optional, Any, List -from dataclasses import dataclass, field - - -@dataclass -class InfoBase: - """信息基类 - - 这是一个基础信息类,用于存储和管理各种类型的信息数据。 - 所有具体的信息类都应该继承自这个基类。 - - Attributes: - type (str): 信息类型标识符,默认为 "base" - data (Dict[str, Union[str, Dict, list]]): 存储具体信息数据的字典, - 支持存储字符串、字典、列表等嵌套数据结构 - """ - - type: str = "base" - data: Dict[str, Any] = field(default_factory=dict) - processed_info: str = "" - - def get_type(self) -> str: - """获取信息类型 - - Returns: - str: 当前信息对象的类型标识符 - """ - return self.type - - def get_data(self) -> Dict[str, Any]: - """获取所有信息数据 - - Returns: - Dict[str, Any]: 包含所有信息数据的字典 - """ - return self.data - - def get_info(self, key: str) -> Optional[Any]: - """获取特定属性的信息 - - Args: - key: 要获取的属性键名 - - Returns: - Optional[Any]: 属性值,如果键不存在则返回 None - """ - return self.data.get(key) - - def get_info_list(self, key: str) -> List[Any]: - """获取特定属性的信息列表 - - Args: - key: 要获取的属性键名 - - Returns: - List[Any]: 属性值列表,如果键不存在则返回空列表 - """ - value = self.data.get(key) - if isinstance(value, list): - return value - return [] - - def get_processed_info(self) -> str: - """获取处理后的信息 - - Returns: - str: 处理后的信息字符串 - """ - return self.processed_info diff --git a/src/chat/focus_chat/info/obs_info.py b/src/chat/focus_chat/info/obs_info.py deleted file mode 100644 index 9cc1e1e9b..000000000 --- a/src/chat/focus_chat/info/obs_info.py +++ /dev/null @@ -1,165 +0,0 @@ -from typing import Dict, Optional -from dataclasses import dataclass -from .info_base import InfoBase - - -@dataclass -class ObsInfo(InfoBase): - """OBS信息类 - - 用于记录和管理OBS相关的信息,包括说话消息、截断后的说话消息和聊天类型。 - 继承自 InfoBase 类,使用字典存储具体数据。 - - Attributes: - type (str): 信息类型标识符,固定为 "obs" - - Data Fields: - talking_message (str): 说话消息内容 - talking_message_str_truncate (str): 截断后的说话消息内容 - talking_message_str_short (str): 简短版本的说话消息内容(使用最新一半消息) - talking_message_str_truncate_short (str): 截断简短版本的说话消息内容(使用最新一半消息) - chat_type (str): 聊天类型,可以是 "private"(私聊)、"group"(群聊)或 "other"(其他) - """ - - type: str = "obs" - - def set_talking_message(self, message: str) -> None: - """设置说话消息 - - Args: - message (str): 说话消息内容 - """ - self.data["talking_message"] = message - - def set_talking_message_str_truncate(self, message: str) -> None: - """设置截断后的说话消息 - - Args: - message (str): 截断后的说话消息内容 - """ - self.data["talking_message_str_truncate"] = message - - def set_talking_message_str_short(self, message: str) -> None: - """设置简短版本的说话消息 - - Args: - message (str): 简短版本的说话消息内容 - """ - self.data["talking_message_str_short"] = message - - def set_talking_message_str_truncate_short(self, message: str) -> None: - """设置截断简短版本的说话消息 - - Args: - message (str): 截断简短版本的说话消息内容 - """ - self.data["talking_message_str_truncate_short"] = message - - def set_previous_chat_info(self, message: str) -> None: - """设置之前聊天信息 - - Args: - message (str): 之前聊天信息内容 - """ - self.data["previous_chat_info"] = message - - def set_chat_type(self, chat_type: str) -> None: - """设置聊天类型 - - Args: - chat_type (str): 聊天类型,可以是 "private"(私聊)、"group"(群聊)或 "other"(其他) - """ - if chat_type not in ["private", "group", "other"]: - chat_type = "other" - self.data["chat_type"] = chat_type - - def set_chat_target(self, chat_target: str) -> None: - """设置聊天目标 - - Args: - chat_target (str): 聊天目标,可以是 "private"(私聊)、"group"(群聊)或 "other"(其他) - """ - self.data["chat_target"] = chat_target - - def set_chat_id(self, chat_id: str) -> None: - """设置聊天ID - - Args: - chat_id (str): 聊天ID - """ - self.data["chat_id"] = chat_id - - def get_chat_id(self) -> Optional[str]: - """获取聊天ID - - Returns: - Optional[str]: 聊天ID,如果未设置则返回 None - """ - return self.get_info("chat_id") - - def get_talking_message(self) -> Optional[str]: - """获取说话消息 - - Returns: - Optional[str]: 说话消息内容,如果未设置则返回 None - """ - return self.get_info("talking_message") - - def get_talking_message_str_truncate(self) -> Optional[str]: - """获取截断后的说话消息 - - Returns: - Optional[str]: 截断后的说话消息内容,如果未设置则返回 None - """ - return self.get_info("talking_message_str_truncate") - - def get_talking_message_str_short(self) -> Optional[str]: - """获取简短版本的说话消息 - - Returns: - Optional[str]: 简短版本的说话消息内容,如果未设置则返回 None - """ - return self.get_info("talking_message_str_short") - - def get_talking_message_str_truncate_short(self) -> Optional[str]: - """获取截断简短版本的说话消息 - - Returns: - Optional[str]: 截断简短版本的说话消息内容,如果未设置则返回 None - """ - return self.get_info("talking_message_str_truncate_short") - - def get_chat_type(self) -> str: - """获取聊天类型 - - Returns: - str: 聊天类型,默认为 "other" - """ - return self.get_info("chat_type") or "other" - - def get_type(self) -> str: - """获取信息类型 - - Returns: - str: 当前信息对象的类型标识符 - """ - return self.type - - def get_data(self) -> Dict[str, str]: - """获取所有信息数据 - - Returns: - Dict[str, str]: 包含所有信息数据的字典 - """ - return self.data - - def get_info(self, key: str) -> Optional[str]: - """获取特定属性的信息 - - Args: - key: 要获取的属性键名 - - Returns: - Optional[str]: 属性值,如果键不存在则返回 None - """ - return self.data.get(key) diff --git a/src/chat/focus_chat/info/workingmemory_info.py b/src/chat/focus_chat/info/workingmemory_info.py deleted file mode 100644 index 0a3282edf..000000000 --- a/src/chat/focus_chat/info/workingmemory_info.py +++ /dev/null @@ -1,86 +0,0 @@ -from typing import Dict, Optional, List -from dataclasses import dataclass -from .info_base import InfoBase - - -@dataclass -class WorkingMemoryInfo(InfoBase): - type: str = "workingmemory" - - processed_info: str = "" - - def set_talking_message(self, message: str) -> None: - """设置说话消息 - - Args: - message (str): 说话消息内容 - """ - self.data["talking_message"] = message - - def set_working_memory(self, working_memory: List[str]) -> None: - """设置工作记忆列表 - - Args: - working_memory (List[str]): 工作记忆内容列表 - """ - self.data["working_memory"] = working_memory - - def add_working_memory(self, working_memory: str) -> None: - """添加一条工作记忆 - - Args: - working_memory (str): 工作记忆内容,格式为"记忆要点:xxx" - """ - working_memory_list = self.data.get("working_memory", []) - working_memory_list.append(working_memory) - self.data["working_memory"] = working_memory_list - - def get_working_memory(self) -> List[str]: - """获取所有工作记忆 - - Returns: - List[str]: 工作记忆内容列表,每条记忆格式为"记忆要点:xxx" - """ - return self.data.get("working_memory", []) - - def get_type(self) -> str: - """获取信息类型 - - Returns: - str: 当前信息对象的类型标识符 - """ - return self.type - - def get_data(self) -> Dict[str, List[str]]: - """获取所有信息数据 - - Returns: - Dict[str, List[str]]: 包含所有信息数据的字典 - """ - return self.data - - def get_info(self, key: str) -> Optional[List[str]]: - """获取特定属性的信息 - - Args: - key: 要获取的属性键名 - - Returns: - Optional[List[str]]: 属性值,如果键不存在则返回 None - """ - return self.data.get(key) - - def get_processed_info(self) -> str: - """获取处理后的信息 - - Returns: - str: 处理后的信息数据,所有记忆要点按行拼接 - """ - all_memory = self.get_working_memory() - memory_str = "" - for memory in all_memory: - memory_str += f"{memory}\n" - - self.processed_info = memory_str - - return self.processed_info diff --git a/src/chat/focus_chat/info_processors/base_processor.py b/src/chat/focus_chat/info_processors/base_processor.py deleted file mode 100644 index 26396580c..000000000 --- a/src/chat/focus_chat/info_processors/base_processor.py +++ /dev/null @@ -1,51 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List, Any -from src.chat.focus_chat.info.info_base import InfoBase -from src.chat.focus_chat.observation.observation import Observation -from src.common.logger import get_logger - -logger = get_logger("base_processor") - - -class BaseProcessor(ABC): - """信息处理器基类 - - 所有具体的信息处理器都应该继承这个基类,并实现process_info方法。 - 支持处理InfoBase和Observation类型的输入。 - """ - - log_prefix = "Base信息处理器" - - @abstractmethod - def __init__(self): - """初始化处理器""" - - @abstractmethod - async def process_info( - self, - observations: List[Observation] = None, - **kwargs: Any, - ) -> List[InfoBase]: - """处理信息对象的抽象方法 - - Args: - infos: InfoBase对象列表 - observations: 可选的Observation对象列表 - **kwargs: 其他可选参数 - - Returns: - List[InfoBase]: 处理后的InfoBase实例列表 - """ - pass - - def _create_processed_item(self, info_type: str, info_data: Any) -> dict: - """创建处理后的信息项 - - Args: - info_type: 信息类型 - info_data: 信息数据 - - Returns: - dict: 处理后的信息项 - """ - return {"type": info_type, "id": f"info_{info_type}", "content": info_data, "ttl": 3} diff --git a/src/chat/focus_chat/info_processors/chattinginfo_processor.py b/src/chat/focus_chat/info_processors/chattinginfo_processor.py deleted file mode 100644 index a4aea17c4..000000000 --- a/src/chat/focus_chat/info_processors/chattinginfo_processor.py +++ /dev/null @@ -1,142 +0,0 @@ -from typing import List, Any -from src.chat.focus_chat.info.obs_info import ObsInfo -from src.chat.focus_chat.observation.observation import Observation -from src.chat.focus_chat.info.info_base import InfoBase -from .base_processor import BaseProcessor -from src.common.logger import get_logger -from src.chat.focus_chat.observation.chatting_observation import ChattingObservation -from datetime import datetime -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config - -logger = get_logger("processor") - - -class ChattingInfoProcessor(BaseProcessor): - """观察处理器 - - 用于处理Observation对象,将其转换为ObsInfo对象。 - """ - - log_prefix = "聊天信息处理" - - def __init__(self): - """初始化观察处理器""" - super().__init__() - # TODO: API-Adapter修改标记 - self.model_summary = LLMRequest( - model=global_config.model.utils_small, - temperature=0.7, - request_type="focus.observation.chat", - ) - - async def process_info( - self, - observations: List[Observation] = None, - **kwargs: Any, - ) -> List[InfoBase]: - """处理Observation对象 - - Args: - infos: InfoBase对象列表 - observations: 可选的Observation对象列表 - **kwargs: 其他可选参数 - - Returns: - List[InfoBase]: 处理后的ObsInfo实例列表 - """ - # print(f"observations: {observations}") - processed_infos = [] - - # 处理Observation对象 - if observations: - for obs in observations: - # print(f"obs: {obs}") - if isinstance(obs, ChattingObservation): - obs_info = ObsInfo() - - # 设置聊天ID - if hasattr(obs, "chat_id"): - obs_info.set_chat_id(obs.chat_id) - - # 设置说话消息 - if hasattr(obs, "talking_message_str"): - # print(f"设置说话消息:obs.talking_message_str: {obs.talking_message_str}") - obs_info.set_talking_message(obs.talking_message_str) - - # 设置截断后的说话消息 - if hasattr(obs, "talking_message_str_truncate"): - # print(f"设置截断后的说话消息:obs.talking_message_str_truncate: {obs.talking_message_str_truncate}") - obs_info.set_talking_message_str_truncate(obs.talking_message_str_truncate) - - # 设置简短版本的说话消息 - if hasattr(obs, "talking_message_str_short"): - obs_info.set_talking_message_str_short(obs.talking_message_str_short) - - # 设置截断简短版本的说话消息 - if hasattr(obs, "talking_message_str_truncate_short"): - obs_info.set_talking_message_str_truncate_short(obs.talking_message_str_truncate_short) - - if hasattr(obs, "mid_memory_info"): - # print(f"设置之前聊天信息:obs.mid_memory_info: {obs.mid_memory_info}") - obs_info.set_previous_chat_info(obs.mid_memory_info) - - # 设置聊天类型 - is_group_chat = obs.is_group_chat - if is_group_chat: - chat_type = "group" - else: - chat_type = "private" - if hasattr(obs, "chat_target_info") and obs.chat_target_info: - obs_info.set_chat_target(obs.chat_target_info.get("person_name", "某人")) - obs_info.set_chat_type(chat_type) - - # logger.debug(f"聊天信息处理器处理后的信息: {obs_info}") - - processed_infos.append(obs_info) - - return processed_infos - - async def chat_compress(self, obs: ChattingObservation): - log_msg = "" - if obs.compressor_prompt: - summary = "" - try: - summary_result, _ = await self.model_summary.generate_response_async(obs.compressor_prompt) - summary = "没有主题的闲聊" - if summary_result: - summary = summary_result - except Exception as e: - log_msg = f"总结主题失败 for chat {obs.chat_id}: {e}" - logger.error(log_msg) - else: - log_msg = f"chat_compress 完成 for chat {obs.chat_id}, summary: {summary}" - logger.info(log_msg) - - mid_memory = { - "id": str(int(datetime.now().timestamp())), - "theme": summary, - "messages": obs.oldest_messages, # 存储原始消息对象 - "readable_messages": obs.oldest_messages_str, - # "timestamps": oldest_timestamps, - "chat_id": obs.chat_id, - "created_at": datetime.now().timestamp(), - } - - obs.mid_memories.append(mid_memory) - if len(obs.mid_memories) > obs.max_mid_memory_len: - obs.mid_memories.pop(0) # 移除最旧的 - - mid_memory_str = "之前聊天的内容概述是:\n" - for mid_memory_item in obs.mid_memories: # 重命名循环变量以示区分 - time_diff = int((datetime.now().timestamp() - mid_memory_item["created_at"]) / 60) - mid_memory_str += ( - f"距离现在{time_diff}分钟前(聊天记录id:{mid_memory_item['id']}):{mid_memory_item['theme']}\n" - ) - obs.mid_memory_info = mid_memory_str - - obs.compressor_prompt = "" - obs.oldest_messages = [] - obs.oldest_messages_str = "" - - return log_msg diff --git a/src/chat/focus_chat/observation/actions_observation.py b/src/chat/focus_chat/observation/actions_observation.py deleted file mode 100644 index 125032140..000000000 --- a/src/chat/focus_chat/observation/actions_observation.py +++ /dev/null @@ -1,46 +0,0 @@ -# 定义了来自外部世界的信息 -# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体 -from datetime import datetime -from src.common.logger import get_logger -from src.chat.planner_actions.action_manager import ActionManager - -logger = get_logger("observation") - - -# 特殊的观察,专门用于观察动作 -# 所有观察的基类 -class ActionObservation: - def __init__(self, observe_id): - self.observe_info = "" - self.observe_id = observe_id - self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间 - self.action_manager: ActionManager = None - - self.all_actions = {} - self.all_using_actions = {} - - def get_observe_info(self): - return self.observe_info - - def set_action_manager(self, action_manager: ActionManager): - self.action_manager = action_manager - self.all_actions = self.action_manager.get_registered_actions() - - async def observe(self): - action_info_block = "" - self.all_using_actions = self.action_manager.get_using_actions() - for action_name, action_info in self.all_using_actions.items(): - action_info_block += f"\n{action_name}: {action_info.get('description', '')}" - action_info_block += "\n注意,除了上面动作选项之外,你在群聊里不能做其他任何事情,这是你能力的边界\n" - - self.observe_info = action_info_block - - def to_dict(self) -> dict: - """将观察对象转换为可序列化的字典""" - return { - "observe_info": self.observe_info, - "observe_id": self.observe_id, - "last_observe_time": self.last_observe_time, - "all_actions": self.all_actions, - "all_using_actions": self.all_using_actions, - } diff --git a/src/chat/focus_chat/observation/chatting_observation.py b/src/chat/focus_chat/observation/chatting_observation.py deleted file mode 100644 index 201e313fa..000000000 --- a/src/chat/focus_chat/observation/chatting_observation.py +++ /dev/null @@ -1,183 +0,0 @@ -from datetime import datetime -from src.config.config import global_config -from src.chat.utils.chat_message_builder import ( - get_raw_msg_before_timestamp_with_chat, - build_readable_messages, - get_raw_msg_by_timestamp_with_chat, - num_new_messages_since, - get_person_id_list, -) -from src.chat.utils.prompt_builder import global_prompt_manager, Prompt -from src.chat.focus_chat.observation.observation import Observation -from src.common.logger import get_logger -from src.chat.utils.utils import get_chat_type_and_target_info - -logger = get_logger("observation") - -# 定义提示模板 -Prompt( - """这是{chat_type_description},请总结以下聊天记录的主题: -{chat_logs} -请概括这段聊天记录的主题和主要内容 -主题:简短的概括,包括时间,人物和事件,不要超过20个字 -内容:具体的信息内容,包括人物、事件和信息,不要超过200个字,不要分点。 - -请用json格式返回,格式如下: -{{ - "theme": "主题,例如 2025-06-14 10:00:00 群聊 麦麦 和 网友 讨论了 游戏 的话题", - "content": "内容,可以是对聊天记录的概括,也可以是聊天记录的详细内容" -}} -""", - "chat_summary_prompt", -) - - -class ChattingObservation(Observation): - def __init__(self, chat_id): - super().__init__(chat_id) - self.chat_id = chat_id - self.platform = "qq" - - self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id) - - self.talking_message = [] - self.talking_message_str = "" - self.talking_message_str_truncate = "" - self.talking_message_str_short = "" - self.talking_message_str_truncate_short = "" - self.name = global_config.bot.nickname - self.nick_name = global_config.bot.alias_names - self.max_now_obs_len = global_config.chat.max_context_size - self.overlap_len = global_config.focus_chat.compressed_length - self.person_list = [] - self.compressor_prompt = "" - self.oldest_messages = [] - self.oldest_messages_str = "" - - self.last_observe_time = datetime.now().timestamp() - initial_messages = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 10) - initial_messages_short = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 5) - self.last_observe_time = initial_messages[-1]["time"] if initial_messages else self.last_observe_time - self.talking_message = initial_messages - self.talking_message_short = initial_messages_short - self.talking_message_str = build_readable_messages(self.talking_message, show_actions=True) - self.talking_message_str_truncate = build_readable_messages( - self.talking_message, show_actions=True, truncate=True - ) - self.talking_message_str_short = build_readable_messages(self.talking_message_short, show_actions=True) - self.talking_message_str_truncate_short = build_readable_messages( - self.talking_message_short, show_actions=True, truncate=True - ) - - def to_dict(self) -> dict: - """将观察对象转换为可序列化的字典""" - return { - "chat_id": self.chat_id, - "platform": self.platform, - "is_group_chat": self.is_group_chat, - "chat_target_info": self.chat_target_info, - "talking_message_str": self.talking_message_str, - "talking_message_str_truncate": self.talking_message_str_truncate, - "talking_message_str_short": self.talking_message_str_short, - "talking_message_str_truncate_short": self.talking_message_str_truncate_short, - "name": self.name, - "nick_name": self.nick_name, - "last_observe_time": self.last_observe_time, - } - - def get_observe_info(self, ids=None): - return self.talking_message_str - - async def observe(self): - # 自上一次观察的新消息 - new_messages_list = get_raw_msg_by_timestamp_with_chat( - chat_id=self.chat_id, - timestamp_start=self.last_observe_time, - timestamp_end=datetime.now().timestamp(), - limit=self.max_now_obs_len, - limit_mode="latest", - ) - - # print(f"new_messages_list: {new_messages_list}") - - last_obs_time_mark = self.last_observe_time - if new_messages_list: - self.last_observe_time = new_messages_list[-1]["time"] - self.talking_message.extend(new_messages_list) - - if len(self.talking_message) > self.max_now_obs_len: - # 计算需要移除的消息数量,保留最新的 max_now_obs_len 条 - messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len - oldest_messages = self.talking_message[:messages_to_remove_count] - self.talking_message = self.talking_message[messages_to_remove_count:] - - # 构建压缩提示 - oldest_messages_str = build_readable_messages( - messages=oldest_messages, timestamp_mode="normal_no_YMD", read_mark=0, show_actions=True - ) - - # 根据聊天类型选择提示模板 - prompt_template_name = "chat_summary_prompt" - if self.is_group_chat: - chat_type_description = "qq群聊的聊天记录" - else: - chat_target_name = "对方" - if self.chat_target_info: - chat_target_name = ( - self.chat_target_info.get("person_name") - or self.chat_target_info.get("user_nickname") - or chat_target_name - ) - chat_type_description = f"你和{chat_target_name}的私聊记录" - - prompt = await global_prompt_manager.format_prompt( - prompt_template_name, - chat_type_description=chat_type_description, - chat_logs=oldest_messages_str, - ) - - self.compressor_prompt = prompt - - # 构建当前消息 - self.talking_message_str = build_readable_messages( - messages=self.talking_message, - timestamp_mode="lite", - read_mark=last_obs_time_mark, - show_actions=True, - ) - self.talking_message_str_truncate = build_readable_messages( - messages=self.talking_message, - timestamp_mode="normal_no_YMD", - read_mark=last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - # 构建简短版本 - 使用最新一半的消息 - half_count = len(self.talking_message) // 2 - recent_messages = self.talking_message[-half_count:] if half_count > 0 else self.talking_message - - self.talking_message_str_short = build_readable_messages( - messages=recent_messages, - timestamp_mode="lite", - read_mark=last_obs_time_mark, - show_actions=True, - ) - self.talking_message_str_truncate_short = build_readable_messages( - messages=recent_messages, - timestamp_mode="normal_no_YMD", - read_mark=last_obs_time_mark, - truncate=True, - show_actions=True, - ) - - self.person_list = await get_person_id_list(self.talking_message) - - # logger.debug( - # f"Chat {self.chat_id} - 现在聊天内容:{self.talking_message_str}" - # ) - - async def has_new_messages_since(self, timestamp: float) -> bool: - """检查指定时间戳之后是否有新消息""" - count = num_new_messages_since(chat_id=self.chat_id, timestamp_start=timestamp) - return count > 0 diff --git a/src/chat/focus_chat/observation/observation.py b/src/chat/focus_chat/observation/observation.py deleted file mode 100644 index 272f43d99..000000000 --- a/src/chat/focus_chat/observation/observation.py +++ /dev/null @@ -1,25 +0,0 @@ -# 定义了来自外部世界的信息 -# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体 -from datetime import datetime -from src.common.logger import get_logger - -logger = get_logger("observation") - - -# 所有观察的基类 -class Observation: - def __init__(self, observe_id): - self.observe_info = "" - self.observe_id = observe_id - self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间 - - def to_dict(self) -> dict: - """将观察对象转换为可序列化的字典""" - return { - "observe_info": self.observe_info, - "observe_id": self.observe_id, - "last_observe_time": self.last_observe_time, - } - - async def observe(self): - pass diff --git a/src/chat/focus_chat/observation/working_observation.py b/src/chat/focus_chat/observation/working_observation.py deleted file mode 100644 index 6052a120a..000000000 --- a/src/chat/focus_chat/observation/working_observation.py +++ /dev/null @@ -1,34 +0,0 @@ -# 定义了来自外部世界的信息 -# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体 -from datetime import datetime -from src.common.logger import get_logger -from src.chat.focus_chat.working_memory.working_memory import WorkingMemory -from src.chat.focus_chat.working_memory.memory_item import MemoryItem -from typing import List -# Import the new utility function - -logger = get_logger("observation") - - -# 所有观察的基类 -class WorkingMemoryObservation: - def __init__(self, observe_id): - self.observe_info = "" - self.observe_id = observe_id - self.last_observe_time = datetime.now().timestamp() - - self.working_memory = WorkingMemory(chat_id=observe_id) - - self.retrieved_working_memory = [] - - def get_observe_info(self): - return self.working_memory - - def add_retrieved_working_memory(self, retrieved_working_memory: List[MemoryItem]): - self.retrieved_working_memory.append(retrieved_working_memory) - - def get_retrieved_working_memory(self): - return self.retrieved_working_memory - - async def observe(self): - pass diff --git a/src/chat/heart_flow/background_tasks.py b/src/chat/heart_flow/background_tasks.py deleted file mode 100644 index b24dad32b..000000000 --- a/src/chat/heart_flow/background_tasks.py +++ /dev/null @@ -1,173 +0,0 @@ -import asyncio -import traceback -from typing import Optional, Coroutine, Callable, Any, List -from src.common.logger import get_logger -from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager -from src.config.config import global_config - -logger = get_logger("background_tasks") - - -# 新增私聊激活检查间隔 -PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS = 5 # 与兴趣评估类似,设为5秒 - -CLEANUP_INTERVAL_SECONDS = 1200 - - -async def _run_periodic_loop( - task_name: str, interval: int, task_func: Callable[..., Coroutine[Any, Any, None]], **kwargs -): - """周期性任务主循环""" - while True: - start_time = asyncio.get_event_loop().time() - # logger.debug(f"开始执行后台任务: {task_name}") - - try: - await task_func(**kwargs) # 执行实际任务 - except asyncio.CancelledError: - logger.info(f"任务 {task_name} 已取消") - break - except Exception as e: - logger.error(f"任务 {task_name} 执行出错: {e}") - logger.error(traceback.format_exc()) - - # 计算并执行间隔等待 - elapsed = asyncio.get_event_loop().time() - start_time - sleep_time = max(0, interval - elapsed) - # if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点 - # logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)") - await asyncio.sleep(sleep_time) - - logger.debug(f"任务循环结束: {task_name}") # 调整日志信息 - - -class BackgroundTaskManager: - """管理 Heartflow 的后台周期性任务。""" - - def __init__( - self, - subheartflow_manager: SubHeartflowManager, - ): - self.subheartflow_manager = subheartflow_manager - - # Task references - self._cleanup_task: Optional[asyncio.Task] = None - self._hf_judge_state_update_task: Optional[asyncio.Task] = None - self._private_chat_activation_task: Optional[asyncio.Task] = None # 新增私聊激活任务引用 - self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks - - async def start_tasks(self): - """启动所有后台任务 - - 功能说明: - - 启动核心后台任务: 状态更新、清理、日志记录、兴趣评估和随机停用 - - 每个任务启动前检查是否已在运行 - - 将任务引用保存到任务列表 - """ - - task_configs = [] - - # 根据 chat_mode 条件添加其他任务 - if not (global_config.chat.chat_mode == "normal"): - task_configs.extend( - [ - ( - self._run_cleanup_cycle, - "info", - f"清理任务已启动 间隔:{CLEANUP_INTERVAL_SECONDS}s", - "_cleanup_task", - ), - # 新增私聊激活任务配置 - ( - # Use lambda to pass the interval to the runner function - lambda: self._run_private_chat_activation_cycle(PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS), - "debug", - f"私聊激活检查任务已启动 间隔:{PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS}s", - "_private_chat_activation_task", - ), - ] - ) - - # 统一启动所有任务 - for task_func, log_level, log_msg, task_attr_name in task_configs: - # 检查任务变量是否存在且未完成 - current_task_var = getattr(self, task_attr_name) - if current_task_var is None or current_task_var.done(): - new_task = asyncio.create_task(task_func()) - setattr(self, task_attr_name, new_task) # 更新任务变量 - if new_task not in self._tasks: # 避免重复添加 - self._tasks.append(new_task) - - # 根据配置记录不同级别的日志 - getattr(logger, log_level)(log_msg) - else: - logger.warning(f"{task_attr_name}任务已在运行") - - async def stop_tasks(self): - """停止所有后台任务。 - - 该方法会: - 1. 遍历所有后台任务并取消未完成的任务 - 2. 等待所有取消操作完成 - 3. 清空任务列表 - """ - logger.info("正在停止所有后台任务...") - cancelled_count = 0 - - # 第一步:取消所有运行中的任务 - for task in self._tasks: - if task and not task.done(): - task.cancel() # 发送取消请求 - cancelled_count += 1 - - # 第二步:处理取消结果 - if cancelled_count > 0: - logger.debug(f"正在等待{cancelled_count}个任务完成取消...") - # 使用gather等待所有取消操作完成,忽略异常 - await asyncio.gather(*[t for t in self._tasks if t and t.cancelled()], return_exceptions=True) - logger.info(f"成功取消{cancelled_count}个后台任务") - else: - logger.info("没有需要取消的后台任务") - - # 第三步:清空任务列表 - self._tasks = [] # 重置任务列表 - - # 状态转换处理 - - async def _perform_cleanup_work(self): - """执行子心流清理任务 - 1. 获取需要清理的不活跃子心流列表 - 2. 逐个停止这些子心流 - 3. 记录清理结果 - """ - # 获取需要清理的子心流列表(包含ID和原因) - flows_to_stop = self.subheartflow_manager.get_inactive_subheartflows() - - if not flows_to_stop: - return # 没有需要清理的子心流直接返回 - - logger.info(f"准备删除 {len(flows_to_stop)} 个不活跃(1h)子心流") - stopped_count = 0 - - # 逐个停止子心流 - for flow_id in flows_to_stop: - success = await self.subheartflow_manager.delete_subflow(flow_id) - if success: - stopped_count += 1 - logger.debug(f"[清理任务] 已停止子心流 {flow_id}") - - # 记录最终清理结果 - logger.info(f"[清理任务] 清理完成, 共停止 {stopped_count}/{len(flows_to_stop)} 个子心流") - - async def _run_cleanup_cycle(self): - await _run_periodic_loop( - task_name="Subflow Cleanup", interval=CLEANUP_INTERVAL_SECONDS, task_func=self._perform_cleanup_work - ) - - # 新增私聊激活任务运行器 - async def _run_private_chat_activation_cycle(self, interval: int): - await _run_periodic_loop( - task_name="Private Chat Activation Check", - interval=interval, - task_func=self.subheartflow_manager.sbhf_absent_private_into_focus, - ) diff --git a/src/chat/heart_flow/heartflow.py b/src/chat/heart_flow/heartflow.py index c8c5d1295..7ab71fc39 100644 --- a/src/chat/heart_flow/heartflow.py +++ b/src/chat/heart_flow/heartflow.py @@ -1,84 +1,56 @@ from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState from src.common.logger import get_logger -from typing import Any, Optional, List -from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager -from src.chat.heart_flow.background_tasks import BackgroundTaskManager # Import BackgroundTaskManager - +from typing import Any, Optional +from typing import Dict +from src.chat.message_receive.chat_stream import get_chat_manager logger = get_logger("heartflow") class Heartflow: - """主心流协调器,负责初始化并协调各个子系统: - - 状态管理 (MaiState) - - 子心流管理 (SubHeartflow) - - 后台任务 (BackgroundTaskManager) - """ + """主心流协调器,负责初始化并协调聊天""" def __init__(self): - # 子心流管理 (在初始化时传入 current_state) - self.subheartflow_manager: SubHeartflowManager = SubHeartflowManager() - - # 后台任务管理器 (整合所有定时任务) - self.background_task_manager: BackgroundTaskManager = BackgroundTaskManager( - subheartflow_manager=self.subheartflow_manager, - ) + self.subheartflows: Dict[Any, "SubHeartflow"] = {} async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]: - """获取或创建一个新的SubHeartflow实例 - 委托给 SubHeartflowManager""" - # 不再需要传入 self.current_state - return await self.subheartflow_manager.get_or_create_subheartflow(subheartflow_id) + """获取或创建一个新的SubHeartflow实例""" + if subheartflow_id in self.subheartflows: + subflow = self.subheartflows.get(subheartflow_id) + if subflow: + return subflow + + try: + new_subflow = SubHeartflow( + subheartflow_id, + ) + + await new_subflow.initialize() + + # 注册子心流 + self.subheartflows[subheartflow_id] = new_subflow + heartflow_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id + logger.info(f"[{heartflow_name}] 开始接收消息") + + return new_subflow + except Exception as e: + logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True) + return None + async def force_change_subheartflow_status(self, subheartflow_id: str, status: ChatState) -> None: """强制改变子心流的状态""" # 这里的 message 是可选的,可能是一个消息对象,也可能是其他类型的数据 - return await self.subheartflow_manager.force_change_state(subheartflow_id, status) - - async def api_get_all_states(self): - """获取所有状态""" - return await self.interest_logger.api_get_all_states() - - async def api_get_subheartflow_cycle_info(self, subheartflow_id: str, history_len: int) -> Optional[dict]: - """获取子心流的循环信息""" - subheartflow = await self.subheartflow_manager.get_or_create_subheartflow(subheartflow_id) - if not subheartflow: - logger.warning(f"尝试获取不存在的子心流 {subheartflow_id} 的周期信息") - return None - heartfc_instance = subheartflow.heart_fc_instance - if not heartfc_instance: - logger.warning(f"子心流 {subheartflow_id} 没有心流实例,无法获取周期信息") - return None - - return heartfc_instance.get_cycle_history(last_n=history_len) - - async def api_get_normal_chat_replies(self, subheartflow_id: str, limit: int = 10) -> Optional[List[dict]]: - """获取子心流的NormalChat回复记录 - - Args: - subheartflow_id: 子心流ID - limit: 最大返回数量,默认10条 - - Returns: - Optional[List[dict]]: 回复记录列表,如果子心流不存在则返回None - """ - subheartflow = await self.subheartflow_manager.get_or_create_subheartflow(subheartflow_id) - if not subheartflow: - logger.warning(f"尝试获取不存在的子心流 {subheartflow_id} 的NormalChat回复记录") - return None - - return subheartflow.get_normal_chat_recent_replies(limit) - - async def heartflow_start_working(self): - """启动后台任务""" - await self.background_task_manager.start_tasks() - logger.info("[Heartflow] 后台任务已启动") - - # 根本不会用到这个函数吧,那样麦麦直接死了 - async def stop_working(self): - """停止所有任务和子心流""" - logger.info("[Heartflow] 正在停止任务和子心流...") - await self.background_task_manager.stop_tasks() - await self.subheartflow_manager.deactivate_all_subflows() - logger.info("[Heartflow] 所有任务和子心流已停止") + return await self.force_change_state(subheartflow_id, status) + + async def force_change_state(self, subflow_id: Any, target_state: ChatState) -> bool: + """强制改变指定子心流的状态""" + subflow = self.subheartflows.get(subflow_id) + if not subflow: + logger.warning(f"[强制状态转换]尝试转换不存在的子心流{subflow_id} 到 {target_state.value}") + return False + await subflow.change_chat_state(target_state) + logger.info(f"[强制状态转换]子心流 {subflow_id} 已转换到 {target_state.value}") + return True heartflow = Heartflow() diff --git a/src/chat/heart_flow/heartflow_message_processor.py b/src/chat/heart_flow/heartflow_message_processor.py index 56f4a73e2..f68139058 100644 --- a/src/chat/heart_flow/heartflow_message_processor.py +++ b/src/chat/heart_flow/heartflow_message_processor.py @@ -10,29 +10,13 @@ from src.common.logger import get_logger import re import math import traceback -from typing import Optional, Tuple +from typing import Tuple from src.person_info.relationship_manager import get_relationship_manager -# from ..message_receive.message_buffer import message_buffer logger = get_logger("chat") - -async def _handle_error(error: Exception, context: str, message: Optional[MessageRecv] = None) -> None: - """统一的错误处理函数 - - Args: - error: 捕获到的异常 - context: 错误发生的上下文描述 - message: 可选的消息对象,用于记录相关消息内容 - """ - logger.error(f"{context}: {error}") - logger.error(traceback.format_exc()) - if message and hasattr(message, "raw_message"): - logger.error(f"相关消息原始内容: {message.raw_message}") - - async def _process_relationship(message: MessageRecv) -> None: """处理用户关系逻辑 @@ -149,4 +133,5 @@ class HeartFCMessageReceiver: await _process_relationship(message) except Exception as e: - await _handle_error(e, "消息处理失败", message) + logger.error(f"消息处理失败: {e}") + print(traceback.format_exc()) diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index 6dee805a0..51b663dfe 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -44,10 +44,6 @@ class SubHeartflow: # 兴趣消息集合 self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {} - # 活动状态管理 - self.should_stop = False # 停止标志 - self.task: Optional[asyncio.Task] = None # 后台任务 - # focus模式退出冷却时间管理 self.last_focus_exit_time: float = 0 # 上次退出focus模式的时间 @@ -211,10 +207,6 @@ class SubHeartflow: await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) logger.info(f"{log_prefix} HeartFChatting 循环已启动。") return True - except asyncio.TimeoutError: - logger.error(f"{log_prefix} 启动现有 HeartFChatting 循环超时") - # 超时时清理实例,准备重新创建 - self.heart_fc_instance = None except Exception as e: logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") logger.error(traceback.format_exc()) @@ -231,7 +223,6 @@ class SubHeartflow: logger.debug(f"{log_prefix} 创建新的 HeartFChatting 实例") self.heart_fc_instance = HeartFChatting( chat_id=self.subheartflow_id, - # observations=self.observations, on_stop_focus_chat=self._handle_stop_focus_chat_request, ) @@ -241,10 +232,6 @@ class SubHeartflow: logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") return True - except asyncio.TimeoutError: - logger.error(f"{log_prefix} 创建或启动新 HeartFChatting 实例超时") - self.heart_fc_instance = None # 超时时清理实例 - return False except Exception as e: logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") logger.error(traceback.format_exc()) @@ -255,8 +242,6 @@ class SubHeartflow: logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") logger.error(traceback.format_exc()) return False - finally: - logger.debug(f"{self.log_prefix} _start_heart_fc_chat 完成") async def change_chat_state(self, new_state: ChatState) -> None: """ @@ -312,25 +297,6 @@ class SubHeartflow: f"{log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。" ) - - def get_normal_chat_last_speak_time(self) -> float: - if self.normal_chat_instance: - return self.normal_chat_instance.last_speak_time - return 0 - - def get_normal_chat_recent_replies(self, limit: int = 10) -> List[dict]: - """获取NormalChat实例的最近回复记录 - - Args: - limit: 最大返回数量,默认10条 - - Returns: - List[dict]: 最近的回复记录列表,如果没有NormalChat实例则返回空列表 - """ - if self.normal_chat_instance: - return self.normal_chat_instance.get_recent_replies(limit) - return [] - def add_message_to_normal_chat_cache(self, message: MessageRecv, interest_value: float, is_mentioned: bool): self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) # 如果字典长度超过10,删除最旧的消息 @@ -338,55 +304,6 @@ class SubHeartflow: oldest_key = next(iter(self.interest_dict)) self.interest_dict.pop(oldest_key) - def get_normal_chat_action_manager(self): - """获取NormalChat的ActionManager实例 - - Returns: - ActionManager: NormalChat的ActionManager实例,如果不存在则返回None - """ - if self.normal_chat_instance: - return self.normal_chat_instance.get_action_manager() - return None - - async def get_full_state(self) -> dict: - """获取子心流的完整状态,包括兴趣、思维和聊天状态。""" - return { - "interest_state": "interest_state", - "chat_state": self.chat_state.chat_status.value, - "chat_state_changed_time": self.chat_state_changed_time, - } - - async def shutdown(self): - """安全地关闭子心流及其管理的任务""" - if self.should_stop: - logger.info(f"{self.log_prefix} 子心流已在关闭过程中。") - return - - logger.info(f"{self.log_prefix} 开始关闭子心流...") - self.should_stop = True # 标记为停止,让后台任务退出 - - # 使用新的停止方法 - await self._stop_normal_chat() - await self._stop_heart_fc_chat() - - # 取消可能存在的旧后台任务 (self.task) - if self.task and not self.task.done(): - logger.debug(f"{self.log_prefix} 取消子心流主任务 (Shutdown)...") - self.task.cancel() - try: - await asyncio.wait_for(self.task, timeout=1.0) # 给点时间响应取消 - except asyncio.CancelledError: - logger.debug(f"{self.log_prefix} 子心流主任务已取消 (Shutdown)。") - except asyncio.TimeoutError: - logger.warning(f"{self.log_prefix} 等待子心流主任务取消超时 (Shutdown)。") - except Exception as e: - logger.error(f"{self.log_prefix} 等待子心流主任务取消时发生错误 (Shutdown): {e}") - - self.task = None # 清理任务引用 - self.chat_state.chat_status = ChatState.ABSENT # 状态重置为不参与 - - logger.info(f"{self.log_prefix} 子心流关闭完成。") - def is_in_focus_cooldown(self) -> bool: """检查是否在focus模式的冷却期内 diff --git a/src/chat/heart_flow/subheartflow_manager.py b/src/chat/heart_flow/subheartflow_manager.py deleted file mode 100644 index 587234cba..000000000 --- a/src/chat/heart_flow/subheartflow_manager.py +++ /dev/null @@ -1,337 +0,0 @@ -import asyncio -import time -from typing import Dict, Any, Optional, List -from src.common.logger import get_logger -from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState - - -# 初始化日志记录器 - -logger = get_logger("subheartflow_manager") - -# 子心流管理相关常量 -INACTIVE_THRESHOLD_SECONDS = 3600 # 子心流不活跃超时时间(秒) -NORMAL_CHAT_TIMEOUT_SECONDS = 30 * 60 # 30分钟 - - -async def _try_set_subflow_absent_internal(subflow: "SubHeartflow", log_prefix: str) -> bool: - """ - 尝试将给定的子心流对象状态设置为 ABSENT (内部方法,不处理锁)。 - - Args: - subflow: 子心流对象。 - log_prefix: 用于日志记录的前缀 (例如 "[子心流管理]" 或 "[停用]")。 - - Returns: - bool: 如果状态成功变为 ABSENT 或原本就是 ABSENT,返回 True;否则返回 False。 - """ - flow_id = subflow.subheartflow_id - stream_name = get_chat_manager().get_stream_name(flow_id) or flow_id - - if subflow.chat_state.chat_status != ChatState.ABSENT: - logger.debug(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT") - try: - await subflow.change_chat_state(ChatState.ABSENT) - # 再次检查以确认状态已更改 (change_chat_state 内部应确保) - if subflow.chat_state.chat_status == ChatState.ABSENT: - return True - else: - logger.warning( - f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}" - ) - return False - except Exception as e: - logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True) - return False - else: - logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态") - return True # 已经是目标状态,视为成功 - - -class SubHeartflowManager: - """管理所有活跃的 SubHeartflow 实例。""" - - def __init__(self): - self.subheartflows: Dict[Any, "SubHeartflow"] = {} - self._lock = asyncio.Lock() # 用于保护 self.subheartflows 的访问 - - async def force_change_state(self, subflow_id: Any, target_state: ChatState) -> bool: - """强制改变指定子心流的状态""" - async with self._lock: - subflow = self.subheartflows.get(subflow_id) - if not subflow: - logger.warning(f"[强制状态转换]尝试转换不存在的子心流{subflow_id} 到 {target_state.value}") - return False - await subflow.change_chat_state(target_state) - logger.info(f"[强制状态转换]子心流 {subflow_id} 已转换到 {target_state.value}") - return True - - def get_all_subheartflows(self) -> List["SubHeartflow"]: - """获取所有当前管理的 SubHeartflow 实例列表 (快照)。""" - return list(self.subheartflows.values()) - - async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]: - """获取或创建指定ID的子心流实例 - - Args: - subheartflow_id: 子心流唯一标识符 - mai_states 参数已被移除,使用 self.mai_state_info - - Returns: - 成功返回SubHeartflow实例,失败返回None - """ - async with self._lock: - # 检查是否已存在该子心流 - if subheartflow_id in self.subheartflows: - subflow = self.subheartflows[subheartflow_id] - if subflow.should_stop: - logger.warning(f"尝试获取已停止的子心流 {subheartflow_id},正在重新激活") - subflow.should_stop = False # 重置停止标志 - return subflow - - try: - new_subflow = SubHeartflow( - subheartflow_id, - ) - - # 然后再进行异步初始化,此时 SubHeartflow 内部若需启动 HeartFChatting,就能拿到 observation - await new_subflow.initialize() - - # 注册子心流 - self.subheartflows[subheartflow_id] = new_subflow - heartflow_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id - logger.info(f"[{heartflow_name}] 开始接收消息") - - return new_subflow - except Exception as e: - logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True) - return None - - async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool: - """停止指定的子心流并将其状态设置为 ABSENT""" - log_prefix = "[子心流管理]" - async with self._lock: # 加锁以安全访问字典 - subheartflow = self.subheartflows.get(subheartflow_id) - - stream_name = get_chat_manager().get_stream_name(subheartflow_id) or subheartflow_id - logger.info(f"{log_prefix} 正在停止 {stream_name}, 原因: {reason}") - - # 调用内部方法处理状态变更 - success = await _try_set_subflow_absent_internal(subheartflow, log_prefix) - - return success - # 锁在此处自动释放 - - def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): - """识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)""" - _current_time = time.time() - flows_to_stop = [] - - for subheartflow_id, subheartflow in list(self.subheartflows.items()): - state = subheartflow.chat_state.chat_status - if state != ChatState.ABSENT: - continue - subheartflow.update_last_chat_state_time() - _absent_last_time = subheartflow.chat_state_last_time - flows_to_stop.append(subheartflow_id) - - return flows_to_stop - - async def deactivate_all_subflows(self): - """将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)""" - log_prefix = "[停用]" - changed_count = 0 - processed_count = 0 - - async with self._lock: # 获取锁以安全迭代 - # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典 - flows_to_update = list(self.subheartflows.values()) - processed_count = len(flows_to_update) - if not flows_to_update: - logger.debug(f"{log_prefix} 无活跃子心流,无需操作") - return - - for subflow in flows_to_update: - # 记录原始状态,以便统计实际改变的数量 - original_state_was_absent = subflow.chat_state.chat_status == ChatState.ABSENT - - success = await _try_set_subflow_absent_internal(subflow, log_prefix) - - # 如果成功设置为 ABSENT 且原始状态不是 ABSENT,则计数 - if success and not original_state_was_absent: - if subflow.chat_state.chat_status == ChatState.ABSENT: - changed_count += 1 - else: - # 这种情况理论上不应发生,如果内部方法返回 True 的话 - stream_name = ( - get_chat_manager().get_stream_name(subflow.subheartflow_id) or subflow.subheartflow_id - ) - logger.warning(f"{log_prefix} 内部方法声称成功但 {stream_name} 状态未变为 ABSENT。") - # 锁在此处自动释放 - - logger.info( - f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count} 个非 ABSENT 子心流的状态更改为 ABSENT。" - ) - - # async def sbhf_normal_into_focus(self): - # """评估子心流兴趣度,满足条件则提升到FOCUSED状态(基于start_hfc_probability)""" - # try: - # for sub_hf in list(self.subheartflows.values()): - # flow_id = sub_hf.subheartflow_id - # stream_name = get_chat_manager().get_stream_name(flow_id) or flow_id - - # # 跳过已经是FOCUSED状态的子心流 - # if sub_hf.chat_state.chat_status == ChatState.FOCUSED: - # continue - - # if sub_hf.interest_chatting.start_hfc_probability == 0: - # continue - # else: - # logger.debug( - # f"{stream_name},现在状态: {sub_hf.chat_state.chat_status.value},进入专注概率: {sub_hf.interest_chatting.start_hfc_probability}" - # ) - - # if random.random() >= sub_hf.interest_chatting.start_hfc_probability: - # continue - - # # 获取最新状态并执行提升 - # current_subflow = self.subheartflows.get(flow_id) - # if not current_subflow: - # continue - - # logger.info( - # f"{stream_name} 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})" - # ) - - # # 执行状态提升 - # await current_subflow.change_chat_state(ChatState.FOCUSED) - - # except Exception as e: - # logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True) - - async def sbhf_focus_into_normal(self, subflow_id: Any): - """ - 接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 NORMAL。 - 通常在连续多次 "no_reply" 后被调用。 - 对于私聊和群聊,都转换为 NORMAL。 - - Args: - subflow_id: 需要转换状态的子心流 ID。 - """ - async with self._lock: - subflow = self.subheartflows.get(subflow_id) - if not subflow: - logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 NORMAL") - return - - stream_name = get_chat_manager().get_stream_name(subflow_id) or subflow_id - current_state = subflow.chat_state.chat_status - - if current_state == ChatState.FOCUSED: - target_state = ChatState.NORMAL - log_reason = "转为NORMAL" - - logger.info( - f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 尝试转换为 {target_state.value} ({log_reason})" - ) - try: - # 从HFC到CHAT时,清空兴趣字典 - subflow.interest_dict.clear() - await subflow.change_chat_state(target_state) - final_state = subflow.chat_state.chat_status - if final_state == target_state: - logger.debug(f"[状态转换请求] {stream_name} 状态已成功转换为 {final_state.value}") - else: - logger.warning( - f"[状态转换请求] 尝试将 {stream_name} 转换为 {target_state.value} 后,状态实际为 {final_state.value}" - ) - except Exception as e: - logger.error( - f"[状态转换请求] 转换 {stream_name} 到 {target_state.value} 时出错: {e}", exc_info=True - ) - elif current_state == ChatState.ABSENT: - logger.debug(f"[状态转换请求] {stream_name} 处于 ABSENT 状态,尝试转为 NORMAL") - await subflow.change_chat_state(ChatState.NORMAL) - else: - logger.debug(f"[状态转换请求] {stream_name} 当前状态为 {current_state.value},无需转换") - - async def delete_subflow(self, subheartflow_id: Any): - """删除指定的子心流。""" - async with self._lock: - subflow = self.subheartflows.pop(subheartflow_id, None) - if subflow: - logger.info(f"正在删除 SubHeartflow: {subheartflow_id}...") - try: - # 调用 shutdown 方法确保资源释放 - await subflow.shutdown() - logger.info(f"SubHeartflow {subheartflow_id} 已成功删除。") - except Exception as e: - logger.error(f"删除 SubHeartflow {subheartflow_id} 时出错: {e}", exc_info=True) - else: - logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}") - - # --- 新增:处理私聊从 ABSENT 直接到 FOCUSED 的逻辑 --- # - async def sbhf_absent_private_into_focus(self): - """检查 ABSENT 状态的私聊子心流是否有新活动,若有则直接转换为 FOCUSED。""" - log_prefix_task = "[私聊激活检查]" - transitioned_count = 0 - checked_count = 0 - - async with self._lock: - # --- 筛选出所有 ABSENT 状态的私聊子心流 --- # - eligible_subflows = [ - hf - for hf in self.subheartflows.values() - if hf.chat_state.chat_status == ChatState.ABSENT and not hf.is_group_chat - ] - checked_count = len(eligible_subflows) - - if not eligible_subflows: - # logger.debug(f"{log_prefix_task} 没有 ABSENT 状态的私聊子心流可以评估。") - return - - # --- 遍历评估每个符合条件的私聊 --- # - for sub_hf in eligible_subflows: - flow_id = sub_hf.subheartflow_id - stream_name = get_chat_manager().get_stream_name(flow_id) or flow_id - log_prefix = f"[{stream_name}]({log_prefix_task})" - - try: - # --- 检查是否有新活动 --- # - observation = sub_hf._get_primary_observation() # 获取主要观察者 - is_active = False - if observation: - # 检查自上次状态变为 ABSENT 后是否有新消息 - # 使用 chat_state_changed_time 可能更精确 - # 加一点点缓冲时间(例如 1 秒)以防时间戳完全相等 - timestamp_to_check = sub_hf.chat_state_changed_time - 1 - has_new = await observation.has_new_messages_since(timestamp_to_check) - if has_new: - is_active = True - logger.debug(f"{log_prefix} 检测到新消息,标记为活跃。") - else: - logger.warning(f"{log_prefix} 无法获取主要观察者来检查活动状态。") - - # --- 如果活跃,则尝试转换 --- # - if is_active: - await sub_hf.change_chat_state(ChatState.FOCUSED) - # 确认转换成功 - if sub_hf.chat_state.chat_status == ChatState.FOCUSED: - transitioned_count += 1 - logger.info(f"{log_prefix} 成功进入 FOCUSED 状态。") - else: - logger.warning( - f"{log_prefix} 尝试进入 FOCUSED 状态失败。当前状态: {sub_hf.chat_state.chat_status.value}" - ) - # else: # 不活跃,无需操作 - # logger.debug(f"{log_prefix} 未检测到新活动,保持 ABSENT。") - - except Exception as e: - logger.error(f"{log_prefix} 检查私聊活动或转换状态时出错: {e}", exc_info=True) - - # --- 循环结束后记录总结日志 --- # - if transitioned_count > 0: - logger.debug( - f"{log_prefix_task} 完成,共检查 {checked_count} 个私聊,{transitioned_count} 个转换为 FOCUSED。" - ) diff --git a/src/chat/memory_system/memory_activator.py b/src/chat/memory_system/memory_activator.py index eb783d483..8640f2a88 100644 --- a/src/chat/memory_system/memory_activator.py +++ b/src/chat/memory_system/memory_activator.py @@ -80,12 +80,6 @@ class MemoryActivator: async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> List[Dict]: """ 激活记忆 - - Args: - observations: 现有的进行观察后的 观察列表 - - Returns: - List[Dict]: 激活的记忆列表 """ # 如果记忆系统被禁用,直接返回空列表 if not global_config.memory.enable_memory: diff --git a/src/chat/message_receive/__init__.py b/src/chat/message_receive/__init__.py index a900de6b4..d01bea726 100644 --- a/src/chat/message_receive/__init__.py +++ b/src/chat/message_receive/__init__.py @@ -1,6 +1,6 @@ from src.chat.emoji_system.emoji_manager import get_emoji_manager from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.message_receive.message_sender import message_manager +from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage diff --git a/src/chat/message_receive/message_sender.py b/src/chat/message_receive/normal_message_sender.py similarity index 100% rename from src/chat/message_receive/message_sender.py rename to src/chat/message_receive/normal_message_sender.py diff --git a/src/chat/focus_chat/heartFC_sender.py b/src/chat/message_receive/uni_message_sender.py similarity index 100% rename from src/chat/focus_chat/heartFC_sender.py rename to src/chat/message_receive/uni_message_sender.py diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 5e6b14f63..a1f1e1bdf 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -12,7 +12,7 @@ from src.chat.utils.timer_calculator import Timer from src.common.message_repository import count_messages from src.chat.utils.prompt_builder import global_prompt_manager from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet -from src.chat.message_receive.message_sender import message_manager +from src.chat.message_receive.normal_message_sender import message_manager from src.chat.normal_chat.willing.willing_manager import get_willing_manager from src.chat.planner_actions.action_manager import ActionManager from src.person_info.relationship_builder_manager import relationship_builder_manager diff --git a/src/chat/planner_actions/action_modifier.py b/src/chat/planner_actions/action_modifier.py index 44acabf9c..439558dd1 100644 --- a/src/chat/planner_actions/action_modifier.py +++ b/src/chat/planner_actions/action_modifier.py @@ -1,7 +1,6 @@ from typing import List, Optional, Any, Dict -from src.chat.focus_chat.observation.observation import Observation from src.common.logger import get_logger -from src.chat.focus_chat.observation.hfcloop_observation import HFCloopObservation +from src.chat.focus_chat.focus_loop_info import FocusLoopInfo from src.chat.message_receive.chat_stream import get_chat_manager from src.config.config import global_config from src.llm_models.utils_model import LLMRequest @@ -44,8 +43,8 @@ class ActionModifier: async def modify_actions( self, + loop_info = None, mode: str = "focus", - observations: Optional[List[Observation]] = None, message_content: str = "", ): """ @@ -83,13 +82,10 @@ class ActionModifier: chat_content = chat_content + "\n" + f"现在,最新的消息是:{message_content}" # === 第一阶段:传统观察处理 === - if observations: - for obs in observations: - if isinstance(obs, HFCloopObservation): - # 获取适用于FOCUS模式的动作 - removals_from_loop = await self.analyze_loop_actions(obs) - if removals_from_loop: - removals_s1.extend(removals_from_loop) + if loop_info: + removals_from_loop = await self.analyze_loop_actions(loop_info) + if removals_from_loop: + removals_s1.extend(removals_from_loop) # 检查动作的关联类型 chat_context = self.chat_stream.context @@ -466,7 +462,7 @@ class ActionModifier: logger.debug(f"{self.log_prefix}动作 {action_name} 未匹配到任何关键词: {activation_keywords}") return False - async def analyze_loop_actions(self, obs: HFCloopObservation) -> List[tuple[str, str]]: + async def analyze_loop_actions(self, obs: FocusLoopInfo) -> List[tuple[str, str]]: """分析最近的循环内容并决定动作的移除 Returns: diff --git a/src/chat/planner_actions/planner_focus.py b/src/chat/planner_actions/planner_focus.py index c52b8b486..2aef5f429 100644 --- a/src/chat/planner_actions/planner_focus.py +++ b/src/chat/planner_actions/planner_focus.py @@ -1,18 +1,18 @@ import json # <--- 确保导入 json import traceback -from typing import List, Dict, Any, Optional +from typing import Dict, Any, Optional from rich.traceback import install from src.llm_models.utils_model import LLMRequest from src.config.config import global_config -from src.chat.focus_chat.info.info_base import InfoBase -from src.chat.focus_chat.info.obs_info import ObsInfo -from src.chat.focus_chat.info.action_info import ActionInfo from src.common.logger import get_logger from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.chat.planner_actions.action_manager import ActionManager from json_repair import repair_json from src.chat.utils.utils import get_chat_type_and_target_info from datetime import datetime +from src.chat.message_receive.chat_stream import get_chat_manager +from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat +import time logger = get_logger("planner") @@ -38,23 +38,6 @@ def init_prompt(): "simple_planner_prompt", ) - Prompt( - """ -{time_block} -{indentify_block} -你现在需要根据聊天内容,选择的合适的action来参与聊天。 -{chat_context_description},以下是具体的聊天内容: -{chat_content_block} -{moderation_prompt} -现在请你选择合适的action: - -{action_options_text} - -请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容: -""", - "simple_planner_prompt_private", - ) - Prompt( """ 动作:{action_name} @@ -69,8 +52,10 @@ def init_prompt(): class ActionPlanner: - def __init__(self, log_prefix: str, action_manager: ActionManager): - self.log_prefix = log_prefix + def __init__(self, chat_id: str, action_manager: ActionManager): + self.chat_id = chat_id + self.log_prefix = f"[{get_chat_manager().get_stream_name(chat_id) or chat_id}]" + self.action_manager = action_manager # LLM规划器配置 self.planner_llm = LLMRequest( @@ -82,17 +67,12 @@ class ActionPlanner: model=global_config.model.utils_small, request_type="focus.planner", # 用于动作规划 ) + + self.last_obs_time_mark = 0.0 - async def plan( - self, all_plan_info: List[InfoBase],loop_start_time: float - ) -> Dict[str, Any]: + async def plan(self) -> Dict[str, Any]: """ 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 - - 参数: - all_plan_info: 所有计划信息 - running_memorys: 回忆信息 - loop_start_time: 循环开始时间 """ action = "no_reply" # 默认动作 @@ -100,42 +80,36 @@ class ActionPlanner: action_data = {} try: - # 获取观察信息 - extra_info: list[str] = [] - - extra_info = [] - observed_messages = [] - observed_messages_str = "" - chat_type = "group" is_group_chat = True - chat_id = None # 添加chat_id变量 + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=self.chat_id, + timestamp=time.time(), + limit=global_config.chat.max_context_size, + ) - for info in all_plan_info: - if isinstance(info, ObsInfo): - observed_messages = info.get_talking_message() - observed_messages_str = info.get_talking_message_str_truncate_short() - chat_type = info.get_chat_type() - is_group_chat = chat_type == "group" - # 从ObsInfo中获取chat_id - chat_id = info.get_chat_id() - else: - extra_info.append(info.get_processed_info()) + chat_context = build_readable_messages( + messages=message_list_before_now, + timestamp_mode="normal_no_YMD", + read_mark=self.last_obs_time_mark, + truncate=True, + show_actions=True, + ) + + self.last_obs_time_mark = time.time() # 获取聊天类型和目标信息 chat_target_info = None - if chat_id: - try: - # 重新获取更准确的聊天信息 - is_group_chat_updated, chat_target_info = get_chat_type_and_target_info(chat_id) - # 如果获取成功,更新is_group_chat - if is_group_chat_updated is not None: - is_group_chat = is_group_chat_updated - logger.debug( - f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}" - ) - except Exception as e: - logger.warning(f"{self.log_prefix}获取聊天目标信息失败: {e}") - chat_target_info = None + + try: + # 重新获取更准确的聊天信息 + is_group_chat, chat_target_info = get_chat_type_and_target_info(self.chat_id) + logger.debug( + f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}" + ) + except Exception as e: + logger.warning(f"{self.log_prefix}获取聊天目标信息失败: {e}") + chat_target_info = None # 获取经过modify_actions处理后的最终可用动作集 # 注意:动作的激活判定现在在主循环的modify_actions中完成 @@ -164,14 +138,13 @@ class ActionPlanner: ) return { "action_result": {"action_type": action, "action_data": action_data, "reasoning": reasoning}, - "observed_messages": observed_messages, } # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- prompt = await self.build_planner_prompt( is_group_chat=is_group_chat, # <-- Pass HFC state chat_target_info=chat_target_info, # <-- 传递获取到的聊天目标信息 - observed_messages_str=observed_messages_str, # <-- Pass local variable + observed_messages_str=chat_context, # <-- Pass local variable current_available_actions=current_available_actions, # <-- Pass determined actions ) @@ -228,9 +201,6 @@ class ActionPlanner: if key not in ["action", "reasoning"]: action_data[key] = value - action_data["loop_start_time"] = loop_start_time - - # 对于reply动作不需要额外处理,因为相关字段已经在上面的循环中添加到action_data if extracted_action not in current_available_actions: logger.warning( @@ -265,7 +235,6 @@ class ActionPlanner: plan_result = { "action_result": action_result, - "observed_messages": observed_messages, "action_prompt": prompt, } @@ -276,7 +245,7 @@ class ActionPlanner: is_group_chat: bool, # Now passed as argument chat_target_info: Optional[dict], # Now passed as argument observed_messages_str: str, - current_available_actions: Dict[str, ActionInfo], + current_available_actions, ) -> str: """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: @@ -295,11 +264,9 @@ class ActionPlanner: chat_content_block = "你还未开始聊天" action_options_block = "" - # 根据聊天类型选择不同的动作prompt模板 - action_template_name = "action_prompt_private" if not is_group_chat else "action_prompt" for using_actions_name, using_actions_info in current_available_actions.items(): - using_action_prompt = await global_prompt_manager.get_prompt_async(action_template_name) + if using_actions_info["parameters"]: param_text = "\n" @@ -314,22 +281,13 @@ class ActionPlanner: require_text += f"- {require_item}\n" require_text = require_text.rstrip("\n") - # 根据模板类型决定是否包含description参数 - if action_template_name == "action_prompt_private": - # 私聊模板不包含description参数 - using_action_prompt = using_action_prompt.format( - action_name=using_actions_name, - action_parameters=param_text, - action_require=require_text, - ) - else: - # 群聊模板包含description参数 - using_action_prompt = using_action_prompt.format( - action_name=using_actions_name, - action_description=using_actions_info["description"], - action_parameters=param_text, - action_require=require_text, - ) + using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt") + using_action_prompt = using_action_prompt.format( + action_name=using_actions_name, + action_description=using_actions_info["description"], + action_parameters=param_text, + action_require=require_text, + ) action_options_block += using_action_prompt @@ -347,9 +305,7 @@ class ActionPlanner: bot_core_personality = global_config.personality.personality_core indentify_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}:" - # 根据聊天类型选择不同的prompt模板 - template_name = "simple_planner_prompt_private" if not is_group_chat else "simple_planner_prompt" - planner_prompt_template = await global_prompt_manager.get_prompt_async(template_name) + planner_prompt_template = await global_prompt_manager.get_prompt_async("simple_planner_prompt") prompt = planner_prompt_template.format( time_block=time_block, chat_context_description=chat_context_description, diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 62ff926f5..befa22230 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -9,7 +9,7 @@ from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest from src.config.config import global_config from src.chat.utils.timer_calculator import Timer # <--- Import Timer -from src.chat.focus_chat.heartFC_sender import HeartFCSender +from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.utils.utils import get_chat_type_and_target_info from src.chat.message_receive.chat_stream import ChatStream from src.chat.focus_chat.hfc_utils import parse_thinking_id_to_timestamp diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index bb3f53a1a..25d231c01 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -1243,7 +1243,7 @@ class StatisticOutputTask(AsyncTask): focus_chat_rows = "" if stat_data[FOCUS_AVG_TIMES_BY_CHAT_ACTION]: # 获取前三个阶段(不包括执行动作) - basic_stages = ["观察", "并行调整动作、处理", "规划器"] + basic_stages = ["观察", "规划器"] existing_basic_stages = [] for stage in basic_stages: # 检查是否有任何聊天流在这个阶段有数据 @@ -1352,7 +1352,7 @@ class StatisticOutputTask(AsyncTask): focus_action_stage_rows = "" if stat_data[FOCUS_AVG_TIMES_BY_ACTION]: # 获取所有阶段(按固定顺序) - stage_order = ["观察", "并行调整动作、处理", "规划器", "执行动作"] + stage_order = ["观察", "规划器", "执行动作"] all_stages = [] for stage in stage_order: if any(stage in stage_times for stage_times in stat_data[FOCUS_AVG_TIMES_BY_ACTION].values()): @@ -1618,7 +1618,7 @@ class StatisticOutputTask(AsyncTask): focus_version_stage_rows = "" if stat_data[FOCUS_AVG_TIMES_BY_VERSION]: # 基础三个阶段 - basic_stages = ["观察", "并行调整动作、处理", "规划器"] + basic_stages = ["观察", "规划器"] # 获取所有action类型用于执行时间列 all_action_types_for_exec = set() diff --git a/src/chat/focus_chat/working_memory/memory_item.py b/src/chat/working_memory/memory_item.py similarity index 100% rename from src/chat/focus_chat/working_memory/memory_item.py rename to src/chat/working_memory/memory_item.py diff --git a/src/chat/focus_chat/working_memory/memory_manager.py b/src/chat/working_memory/memory_manager.py similarity index 100% rename from src/chat/focus_chat/working_memory/memory_manager.py rename to src/chat/working_memory/memory_manager.py diff --git a/src/chat/focus_chat/working_memory/working_memory.py b/src/chat/working_memory/working_memory.py similarity index 100% rename from src/chat/focus_chat/working_memory/working_memory.py rename to src/chat/working_memory/working_memory.py diff --git a/src/chat/focus_chat/info_processors/working_memory_processor.py b/src/chat/working_memory/working_memory_processor.py similarity index 98% rename from src/chat/focus_chat/info_processors/working_memory_processor.py rename to src/chat/working_memory/working_memory_processor.py index ad2c88876..562278462 100644 --- a/src/chat/focus_chat/info_processors/working_memory_processor.py +++ b/src/chat/working_memory/working_memory_processor.py @@ -7,7 +7,6 @@ import traceback from src.common.logger import get_logger from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.chat.message_receive.chat_stream import get_chat_manager -from .base_processor import BaseProcessor from typing import List from src.chat.focus_chat.observation.working_observation import WorkingMemoryObservation from src.chat.focus_chat.working_memory.working_memory import WorkingMemory @@ -44,12 +43,10 @@ def init_prompt(): Prompt(memory_proces_prompt, "prompt_memory_proces") -class WorkingMemoryProcessor(BaseProcessor): +class WorkingMemoryProcessor: log_prefix = "工作记忆" def __init__(self, subheartflow_id: str): - super().__init__() - self.subheartflow_id = subheartflow_id self.llm_model = LLMRequest( diff --git a/src/common/logger.py b/src/common/logger.py index c0fa7be2d..6be06d241 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -352,7 +352,6 @@ MODULE_COLORS = { "heartflow_utils": "\033[38;5;219m", # 浅粉色 "sub_heartflow": "\033[38;5;207m", # 粉紫色 "subheartflow_manager": "\033[38;5;201m", # 深粉色 - "observation": "\033[38;5;141m", # 紫色 "background_tasks": "\033[38;5;240m", # 灰色 "chat_message": "\033[38;5;45m", # 青色 "chat_stream": "\033[38;5;51m", # 亮青色 diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 1c28ab7c8..e8ecb2885 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -295,20 +295,12 @@ class NormalChatConfig(ConfigBase): class FocusChatConfig(ConfigBase): """专注聊天配置类""" - compressed_length: int = 5 - """心流上下文压缩的最短压缩长度,超过心流观察到的上下文长度,会压缩,最短压缩长度为5""" - - compress_length_limit: int = 5 - """最多压缩份数,超过该数值的压缩上下文会被删除""" - think_interval: float = 1 """思考间隔(秒)""" consecutive_replies: float = 1 """连续回复能力,值越高,麦麦连续回复的概率越高""" - working_memory_processor: bool = False - """是否启用工作记忆处理器""" @dataclass diff --git a/src/experimental/PFC/message_sender.py b/src/experimental/PFC/message_sender.py index 841ebe450..d0816d8b5 100644 --- a/src/experimental/PFC/message_sender.py +++ b/src/experimental/PFC/message_sender.py @@ -5,7 +5,7 @@ from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.message import Message from maim_message import UserInfo, Seg from src.chat.message_receive.message import MessageSending, MessageSet -from src.chat.message_receive.message_sender import message_manager +from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage from src.config.config import global_config from rich.traceback import install diff --git a/src/main.py b/src/main.py index 768913c4b..fae064773 100644 --- a/src/main.py +++ b/src/main.py @@ -10,8 +10,7 @@ from src.manager.mood_manager import MoodPrintTask, MoodUpdateTask from src.chat.emoji_system.emoji_manager import get_emoji_manager from src.chat.normal_chat.willing.willing_manager import get_willing_manager from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.heart_flow.heartflow import heartflow -from src.chat.message_receive.message_sender import message_manager +from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage from src.config.config import global_config from src.chat.message_receive.bot import chat_bot @@ -142,10 +141,6 @@ class MainSystem: await message_manager.start() logger.info("全局消息管理器启动成功") - # 启动心流系统主循环 - asyncio.create_task(heartflow.heartflow_start_working()) - logger.info("心流系统启动成功") - init_time = int(1000 * (time.time() - init_start_time)) logger.info(f"初始化完成,神经元放电{init_time}次") except Exception as e: diff --git a/src/plugin_system/apis/chat_api.py b/src/plugin_system/apis/chat_api.py index 23a5a3be0..b56142a47 100644 --- a/src/plugin_system/apis/chat_api.py +++ b/src/plugin_system/apis/chat_api.py @@ -17,7 +17,6 @@ from src.common.logger import get_logger # 导入依赖 from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.focus_chat.info.obs_info import ObsInfo logger = get_logger("chat_api") @@ -193,39 +192,6 @@ class ChatManager: logger.error(f"[ChatAPI] 获取聊天流信息失败: {e}") return {} - @staticmethod - def get_recent_messages_from_obs(observations: List[Any], count: int = 5) -> List[Dict[str, Any]]: - """从观察对象获取最近的消息 - - Args: - observations: 观察对象列表 - count: 要获取的消息数量 - - Returns: - List[Dict]: 消息列表,每个消息包含发送者、内容等信息 - """ - messages = [] - - try: - if observations and len(observations) > 0: - obs = observations[0] - if hasattr(obs, "get_talking_message"): - obs: ObsInfo - raw_messages = obs.get_talking_message() - # 转换为简化格式 - for msg in raw_messages[-count:]: - simple_msg = { - "sender": msg.get("sender", "未知"), - "content": msg.get("content", ""), - "timestamp": msg.get("timestamp", 0), - } - messages.append(simple_msg) - logger.debug(f"[ChatAPI] 获取到 {len(messages)} 条最近消息") - except Exception as e: - logger.error(f"[ChatAPI] 获取最近消息失败: {e}") - - return messages - @staticmethod def get_streams_summary() -> Dict[str, int]: """获取聊天流统计摘要 diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index c0486e164..7a6bd1be1 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -28,7 +28,7 @@ from src.common.logger import get_logger # 导入依赖 from src.chat.message_receive.chat_stream import get_chat_manager -from src.chat.focus_chat.heartFC_sender import HeartFCSender +from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.message_receive.message import MessageSending, MessageRecv from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat from src.person_info.person_info import get_person_info_manager diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index a68091b96..cc5cbc261 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -44,7 +44,6 @@ class BaseAction(ABC): reasoning: 执行该动作的理由 cycle_timers: 计时器字典 thinking_id: 思考ID - observations: 观察列表 expressor: 表达器对象 replyer: 回复器对象 chat_stream: 聊天流对象 diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 478d62ed8..e269cdddf 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "3.4.0" +version = "3.5.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -61,12 +61,15 @@ enable_relationship = true # 是否启用关系系统 relation_frequency = 1 # 关系频率,麦麦构建关系的速度,仅在normal_chat模式下有效 [chat] #麦麦的聊天通用设置 -chat_mode = "normal" # 聊天模式 —— 普通模式:normal,专注模式:focus,自动auto:在普通模式和专注模式之间自动切换 -# chat_mode = "focus" -# chat_mode = "auto" +chat_mode = "normal" # 聊天模式 —— 普通模式:normal,专注模式:focus,在普通模式和专注模式之间自动切换 +auto_focus_threshold = 1 # 自动切换到专注聊天的阈值,越低越容易进入专注聊天 +exit_focus_threshold = 1 # 自动退出专注聊天的阈值,越低越容易退出专注聊天 +# 普通模式下,麦麦会针对感兴趣的消息进行回复,token消耗量较低 +# 专注模式下,麦麦会进行主动的观察,并给出回复,token消耗量略高,但是回复时机更准确 +# 自动模式下,麦麦会根据消息内容自动切换到专注模式或普通模式 max_context_size = 18 # 上下文长度 - +thinking_timeout = 20 # 麦麦一次回复最长思考规划时间,超过这个时间的思考会放弃(往往是api反应太慢) replyer_random_probability = 0.5 # 首要replyer模型被选择的概率 talk_frequency = 1 # 麦麦回复频率,越高,麦麦回复越频繁 @@ -96,13 +99,6 @@ talk_frequency_adjust = [ # - 时间支持跨天,例如 "00:10,0.3" 表示从凌晨0:10开始使用频率0.3 # - 系统会自动将 "platform:id:type" 转换为内部的哈希chat_id进行匹配 -auto_focus_threshold = 1 # 自动切换到专注聊天的阈值,越低越容易进入专注聊天 -exit_focus_threshold = 1 # 自动退出专注聊天的阈值,越低越容易退出专注聊天 -# 普通模式下,麦麦会针对感兴趣的消息进行回复,token消耗量较低 -# 专注模式下,麦麦会进行主动的观察和回复,并给出回复,token消耗量较高 -# 自动模式下,麦麦会根据消息内容自动切换到专注模式或普通模式 - -thinking_timeout = 30 # 麦麦一次回复最长思考规划时间,超过这个时间的思考会放弃(往往是api反应太慢) [message_receive] # 以下是消息过滤,可以根据规则过滤特定消息,将不会读取这些消息 @@ -127,9 +123,6 @@ enable_planner = true # 是否启用动作规划器(与focus_chat共享actions [focus_chat] #专注聊天 think_interval = 3 # 思考间隔 单位秒,可以有效减少消耗 consecutive_replies = 1 # 连续回复能力,值越高,麦麦连续回复的概率越高 -compressed_length = 8 # 不能大于observation_context_size,心流上下文压缩的最短压缩长度,超过心流观察到的上下文长度,会压缩,最短压缩长度为5 -compress_length_limit = 4 #最多压缩份数,超过该数值的压缩上下文会被删除 -working_memory_processor = false # 是否启用工作记忆处理器,消耗量大 [tool] enable_in_normal_chat = false # 是否在普通聊天中启用工具