From 49c2bc854c02871998b39c12e4d2ce25c65d34f8 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 18 May 2025 18:15:38 +0800 Subject: [PATCH] =?UTF-8?q?refactor=EF=BC=9A=E9=87=8D=E6=9E=84=E8=81=8A?= =?UTF-8?q?=E5=A4=A9=E7=8A=B6=E6=80=81=E5=88=87=E6=8D=A2=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=EF=BC=8C=E7=A7=BB=E9=99=A4=E9=99=90=E9=A2=9D=EF=BC=8C=E7=B2=BE?= =?UTF-8?q?=E7=AE=80=E5=88=87=E6=8D=A2=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/HeartFC_system.md | 2 +- src/chat/focus_chat/heartFC_chat.py | 9 +- .../focus_chat/heartflow_prompt_builder.py | 6 +- src/chat/focus_chat/info/action_info.py | 83 +++ .../info_processors/action_processor.py | 126 +++++ .../info_processors/self_processor.py | 5 +- .../focus_chat/planners/action_manager.py | 13 - .../actions/exit_focus_chat_action.py | 108 ++++ .../planners/actions/no_reply_action.py | 16 - .../planners/actions/plugin_action.py | 2 - .../planners/actions/reply_action.py | 5 - src/chat/focus_chat/planners/planner.py | 59 +- src/chat/heart_flow/background_tasks.py | 73 +-- src/chat/heart_flow/interest_logger.py | 212 ------- src/chat/heart_flow/mai_state_manager.py | 90 +-- .../observation/hfcloop_observation.py | 5 +- src/chat/heart_flow/sub_heartflow.py | 60 +- src/chat/heart_flow/subheartflow_manager.py | 524 ++---------------- src/chat/message_receive/bot.py | 41 -- src/chat/normal_chat/normal_chat.py | 10 +- src/chat/utils/utils.py | 73 ++- src/common/logger.py | 36 ++ src/common/logger_manager.py | 4 + template/bot_config_template.toml | 8 +- 24 files changed, 541 insertions(+), 1029 deletions(-) create mode 100644 src/chat/focus_chat/info/action_info.py create mode 100644 src/chat/focus_chat/info_processors/action_processor.py create mode 100644 src/chat/focus_chat/planners/actions/exit_focus_chat_action.py delete mode 100644 src/chat/heart_flow/interest_logger.py diff --git a/docs/HeartFC_system.md b/docs/HeartFC_system.md index a55f1c973..e48a7b5d7 100644 --- a/docs/HeartFC_system.md +++ b/docs/HeartFC_system.md @@ -149,7 +149,7 @@ c HeartFChatting工作方式 - **状态及含义**: - `ChatState.ABSENT` (不参与/没在看): 初始或停用状态。子心流不观察新信息,不进行思考,也不回复。 - `ChatState.CHAT` (随便看看/水群): 普通聊天模式。激活 `NormalChatInstance`。 - * `ChatState.FOCUSED` (专注/认真水群): 专注聊天模式。激活 `HeartFlowChatInstance`。 + * `ChatState.FOCUSED` (专注/认真聊天): 专注聊天模式。激活 `HeartFlowChatInstance`。 - **选择**: 子心流可以根据外部指令(来自 `SubHeartflowManager`)或内部逻辑(未来的扩展)选择进入 `ABSENT` 状态(不回复不观察),或进入 `CHAT` / `FOCUSED` 中的一种回复模式。 - **状态转换机制** (由 `SubHeartflowManager` 驱动,更细致的说明): - **初始状态**: 新创建的 `SubHeartflow` 默认为 `ABSENT` 状态。 diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 0f5371a36..4f17f9bdf 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -68,7 +68,6 @@ class HeartFChatting: self, chat_id: str, observations: list[Observation], - on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]], ): """ HeartFChatting 初始化函数 @@ -76,12 +75,10 @@ class HeartFChatting: 参数: chat_id: 聊天流唯一标识符(如stream_id) observations: 关联的观察列表 - on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的异步回调函数 """ # 基础属性 self.stream_id: str = chat_id # 聊天流ID self.chat_stream: Optional[ChatStream] = None # 关联的聊天流 - self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback self.log_prefix: str = str(chat_id) # Initial default, will be updated self.hfcloop_observation = HFCloopObservation(observe_id=self.stream_id) self.chatting_observation = observations[0] @@ -165,7 +162,7 @@ class HeartFChatting: 启动 HeartFChatting 的主循环。 注意:调用此方法前必须确保已经成功初始化。 """ - logger.info(f"{self.log_prefix} 开始认真水群(HFC)...") + logger.info(f"{self.log_prefix} 开始认真聊天(HFC)...") await self._start_loop_if_needed() async def _start_loop_if_needed(self): @@ -463,11 +460,7 @@ class HeartFChatting: observations=self.all_observations, expressor=self.expressor, chat_stream=self.chat_stream, - current_cycle=self._current_cycle, log_prefix=self.log_prefix, - on_consecutive_no_reply_callback=self.on_consecutive_no_reply_callback, - # total_no_reply_count=self.total_no_reply_count, - # total_waiting_time=self.total_waiting_time, shutting_down=self._shutting_down, ) diff --git a/src/chat/focus_chat/heartflow_prompt_builder.py b/src/chat/focus_chat/heartflow_prompt_builder.py index d8d2b836f..532ceccd1 100644 --- a/src/chat/focus_chat/heartflow_prompt_builder.py +++ b/src/chat/focus_chat/heartflow_prompt_builder.py @@ -234,7 +234,8 @@ class PromptBuilder: reply_style2=reply_style2_chosen, keywords_reaction_prompt=keywords_reaction_prompt, prompt_ger=prompt_ger, - moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + # moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + moderation_prompt="", ) else: template_name = "reasoning_prompt_private_main" @@ -256,7 +257,8 @@ class PromptBuilder: reply_style2=reply_style2_chosen, keywords_reaction_prompt=keywords_reaction_prompt, prompt_ger=prompt_ger, - moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + # moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + moderation_prompt="", ) # --- End choosing template --- diff --git a/src/chat/focus_chat/info/action_info.py b/src/chat/focus_chat/info/action_info.py new file mode 100644 index 000000000..1bb6b96a6 --- /dev/null +++ b/src/chat/focus_chat/info/action_info.py @@ -0,0 +1,83 @@ +from typing import Dict, Optional, Any, List +from dataclasses import dataclass +from .info_base import InfoBase + + +@dataclass +class ActionInfo(InfoBase): + """动作信息类 + + 用于管理和记录动作的变更信息,包括需要添加或移除的动作。 + 继承自 InfoBase 类,使用字典存储具体数据。 + + Attributes: + type (str): 信息类型标识符,固定为 "action" + + Data Fields: + add_actions (List[str]): 需要添加的动作列表 + remove_actions (List[str]): 需要移除的动作列表 + reason (str): 变更原因说明 + """ + + type: str = "action" + + def get_type(self) -> str: + """获取信息类型""" + return self.type + + def get_data(self) -> Dict[str, Any]: + """获取信息数据""" + return self.data + + def set_action_changes(self, action_changes: Dict[str, List[str]]) -> None: + """设置动作变更信息 + + Args: + action_changes (Dict[str, List[str]]): 包含要增加和删除的动作列表 + { + "add": ["action1", "action2"], + "remove": ["action3"] + } + """ + self.data["add_actions"] = action_changes.get("add", []) + self.data["remove_actions"] = action_changes.get("remove", []) + + def set_reason(self, reason: str) -> None: + """设置变更原因 + + Args: + reason (str): 动作变更的原因说明 + """ + self.data["reason"] = reason + + def get_add_actions(self) -> List[str]: + """获取需要添加的动作列表 + + Returns: + List[str]: 需要添加的动作列表 + """ + return self.data.get("add_actions", []) + + def get_remove_actions(self) -> List[str]: + """获取需要移除的动作列表 + + Returns: + List[str]: 需要移除的动作列表 + """ + return self.data.get("remove_actions", []) + + def get_reason(self) -> Optional[str]: + """获取变更原因 + + Returns: + Optional[str]: 动作变更的原因说明,如果未设置则返回 None + """ + return self.data.get("reason") + + def has_changes(self) -> bool: + """检查是否有动作变更 + + Returns: + bool: 如果有任何动作需要添加或移除则返回True + """ + return bool(self.get_add_actions() or self.get_remove_actions()) \ No newline at end of file diff --git a/src/chat/focus_chat/info_processors/action_processor.py b/src/chat/focus_chat/info_processors/action_processor.py new file mode 100644 index 000000000..a952b38c8 --- /dev/null +++ b/src/chat/focus_chat/info_processors/action_processor.py @@ -0,0 +1,126 @@ +from typing import List, Optional, Any +from src.chat.focus_chat.info.obs_info import ObsInfo +from src.chat.heart_flow.observation.observation import Observation +from src.chat.focus_chat.info.info_base import InfoBase +from src.chat.focus_chat.info.action_info import ActionInfo +from .base_processor import BaseProcessor +from src.common.logger_manager import get_logger +from src.chat.heart_flow.observation.chatting_observation import ChattingObservation +from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservation +from src.chat.focus_chat.info.cycle_info import CycleInfo +from datetime import datetime +from typing import Dict +from src.chat.models.utils_model import LLMRequest +from src.config.config import global_config +import random + +logger = get_logger("processor") + + +class ActionProcessor(BaseProcessor): + """动作处理器 + + 用于处理Observation对象,将其转换为ObsInfo对象。 + """ + + log_prefix = "聊天信息处理" + + def __init__(self): + """初始化观察处理器""" + super().__init__() + # TODO: API-Adapter修改标记 + self.model_summary = LLMRequest( + model=global_config.model.observation, temperature=0.7, max_tokens=300, request_type="chat_observation" + ) + + async def process_info( + self, + observations: Optional[List[Observation]] = None, + running_memorys: Optional[List[Dict]] = None, + **kwargs: Any, + ) -> List[InfoBase]: + """处理Observation对象 + + Args: + infos: InfoBase对象列表 + observations: 可选的Observation对象列表 + **kwargs: 其他可选参数 + + Returns: + List[InfoBase]: 处理后的ObsInfo实例列表 + """ + # print(f"observations: {observations}") + processed_infos = [] + + # 处理Observation对象 + if observations: + for obs in observations: + + if isinstance(obs, HFCloopObservation): + + + # 创建动作信息 + action_info = ActionInfo() + action_changes = await self.analyze_loop_actions(obs) + if action_changes["add"] or action_changes["remove"]: + action_info.set_action_changes(action_changes) + # 设置变更原因 + reasons = [] + if action_changes["add"]: + reasons.append(f"添加动作{action_changes['add']}因为检测到大量无回复") + if action_changes["remove"]: + reasons.append(f"移除动作{action_changes['remove']}因为检测到连续回复") + action_info.set_reason(" | ".join(reasons)) + processed_infos.append(action_info) + + return processed_infos + + + async def analyze_loop_actions(self, obs: HFCloopObservation) -> Dict[str, List[str]]: + """分析最近的循环内容并决定动作的增减 + + Returns: + Dict[str, List[str]]: 包含要增加和删除的动作 + { + "add": ["action1", "action2"], + "remove": ["action3"] + } + """ + result = {"add": [], "remove": []} + + # 获取最近10次循环 + recent_cycles = obs.history_loop[-10:] if len(obs.history_loop) > 10 else obs.history_loop + if not recent_cycles: + return result + + # 统计no_reply的数量 + no_reply_count = 0 + reply_sequence = [] # 记录最近的动作序列 + + for cycle in recent_cycles: + action_type = cycle.loop_plan_info["action_result"]["action_type"] + if action_type == "no_reply": + no_reply_count += 1 + reply_sequence.append(action_type == "reply") + + # 检查no_reply比例 + if len(recent_cycles) >= 5 and (no_reply_count / len(recent_cycles)) >= 0.8: + result["add"].append("exit_focus_chat") + + # 获取最近三次的reply状态 + last_three = reply_sequence[-3:] if len(reply_sequence) >= 3 else reply_sequence + + # 根据最近的reply情况决定是否移除reply动作 + if len(last_three) >= 3 and all(last_three): + # 如果最近三次都是reply,直接移除 + result["remove"].append("reply") + elif len(last_three) >= 2 and all(last_three[-2:]): + # 如果最近两次都是reply,40%概率移除 + if random.random() < 0.4: + result["remove"].append("reply") + elif last_three and last_three[-1]: + # 如果最近一次是reply,20%概率移除 + if random.random() < 0.2: + result["remove"].append("reply") + + return result diff --git a/src/chat/focus_chat/info_processors/self_processor.py b/src/chat/focus_chat/info_processors/self_processor.py index 5114e49b6..4e7e8544b 100644 --- a/src/chat/focus_chat/info_processors/self_processor.py +++ b/src/chat/focus_chat/info_processors/self_processor.py @@ -22,7 +22,7 @@ logger = get_logger("processor") def init_prompt(): indentify_prompt = """ 你的名字是{bot_name},你是一个AI,但是你伪装成了一个人类,你的人格是,{prompt_personality}。 -你对外的形象是一只橙色的鱼,头上有绿色的树叶,你用的头像也是这个。 +你的头像形象是一只橙色的鱼,头上有绿色的树叶。 {relation_prompt} {memory_str} @@ -36,6 +36,9 @@ def init_prompt(): 3. 你的自我认同是否有助于你的回答,如果你需要自我相关的信息来帮你参与聊天,请输出,否则请输出十个字以内的简短自我认同 4. 一般情况下不用输出自我认同,只需要输出十几个字的简短自我认同就好,除非有明显需要自我认同的场景 +请回复的平淡一些,简短一些,说中文,不要浮夸,平淡一些。 +请注意不要输出多余内容(包括前后缀,冒号和引号,括号(),表情包,at或 @等 )。只输出内容。 + """ Prompt(indentify_prompt, "indentify_prompt") diff --git a/src/chat/focus_chat/planners/action_manager.py b/src/chat/focus_chat/planners/action_manager.py index 2ee7f349d..60ab0babf 100644 --- a/src/chat/focus_chat/planners/action_manager.py +++ b/src/chat/focus_chat/planners/action_manager.py @@ -137,11 +137,7 @@ class ActionManager: observations: List[Observation], expressor: DefaultExpressor, chat_stream: ChatStream, - current_cycle: CycleDetail, log_prefix: str, - on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]], - # total_no_reply_count: int = 0, - # total_waiting_time: float = 0.0, shutting_down: bool = False, ) -> Optional[BaseAction]: """ @@ -156,11 +152,7 @@ class ActionManager: observations: 观察列表 expressor: 表达器 chat_stream: 聊天流 - current_cycle: 当前循环信息 log_prefix: 日志前缀 - on_consecutive_no_reply_callback: 连续不回复回调 - total_no_reply_count: 连续不回复计数 - total_waiting_time: 累计等待时间 shutting_down: 是否正在关闭 Returns: @@ -179,7 +171,6 @@ class ActionManager: try: # 创建动作实例 instance = handler_class( - action_name=action_name, action_data=action_data, reasoning=reasoning, cycle_timers=cycle_timers, @@ -187,11 +178,7 @@ class ActionManager: observations=observations, expressor=expressor, chat_stream=chat_stream, - current_cycle=current_cycle, log_prefix=log_prefix, - on_consecutive_no_reply_callback=on_consecutive_no_reply_callback, - # total_no_reply_count=total_no_reply_count, - # total_waiting_time=total_waiting_time, shutting_down=shutting_down, ) diff --git a/src/chat/focus_chat/planners/actions/exit_focus_chat_action.py b/src/chat/focus_chat/planners/actions/exit_focus_chat_action.py new file mode 100644 index 000000000..6aeb68ccd --- /dev/null +++ b/src/chat/focus_chat/planners/actions/exit_focus_chat_action.py @@ -0,0 +1,108 @@ +import asyncio +import traceback +from src.common.logger_manager import get_logger +from src.chat.utils.timer_calculator import Timer +from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action +from typing import Tuple, List, Callable, Coroutine +from src.chat.heart_flow.observation.observation import Observation +from src.chat.heart_flow.observation.chatting_observation import ChattingObservation +from src.chat.heart_flow.sub_heartflow import SubHeartFlow +from src.chat.message_receive.chat_stream import ChatStream +from src.chat.heart_flow.heartflow import heartflow +from src.chat.heart_flow.sub_heartflow import ChatState + +logger = get_logger("action_taken") + + +@register_action +class ExitFocusChatAction(BaseAction): + """退出专注聊天动作处理类 + + 处理决定退出专注聊天的动作。 + 执行后会将所属的sub heartflow转变为normal_chat状态。 + """ + + action_name = "exit_focus_chat" + action_description = "退出专注聊天,转为普通聊天模式" + action_parameters = {} + action_require = [ + "很长时间没有回复,你决定退出专注聊天", + "当前内容不需要持续专注关注,你决定退出专注聊天", + "聊天内容已经完成,你决定退出专注聊天", + ] + default = True + + def __init__( + self, + action_data: dict, + reasoning: str, + cycle_timers: dict, + thinking_id: str, + observations: List[Observation], + log_prefix: str, + chat_stream: ChatStream, + shutting_down: bool = False, + **kwargs, + ): + """初始化退出专注聊天动作处理器 + + Args: + action_data: 动作数据 + reasoning: 执行该动作的理由 + cycle_timers: 计时器字典 + thinking_id: 思考ID + observations: 观察列表 + log_prefix: 日志前缀 + shutting_down: 是否正在关闭 + """ + super().__init__(action_data, reasoning, cycle_timers, thinking_id) + self.observations = observations + self.log_prefix = log_prefix + self._shutting_down = shutting_down + self.chat_id = chat_stream.stream_id + + + + async def handle_action(self) -> Tuple[bool, str]: + """ + 处理退出专注聊天的情况 + + 工作流程: + 1. 将sub heartflow转换为normal_chat状态 + 2. 等待新消息、超时或关闭信号 + 3. 根据等待结果更新连续不回复计数 + 4. 如果达到阈值,触发回调 + + Returns: + Tuple[bool, str]: (是否执行成功, 状态转换消息) + """ + try: + # 转换状态 + status_message = "" + self.sub_heartflow = await heartflow.get_or_create_subheartflow(self.chat_id) + if self.sub_heartflow: + try: + # 转换为normal_chat状态 + await self.sub_heartflow.change_chat_state(ChatState.NORMAL_CHAT) + status_message = "已成功切换到普通聊天模式" + logger.info(f"{self.log_prefix} {status_message}") + except Exception as e: + error_msg = f"切换到普通聊天模式失败: {str(e)}" + logger.error(f"{self.log_prefix} {error_msg}") + return False, error_msg + else: + warning_msg = "未找到有效的sub heartflow实例,无法切换状态" + logger.warning(f"{self.log_prefix} {warning_msg}") + return False, warning_msg + + + return True, status_message + + except asyncio.CancelledError: + logger.info(f"{self.log_prefix} 处理 'exit_focus_chat' 时等待被中断 (CancelledError)") + raise + except Exception as e: + error_msg = f"处理 'exit_focus_chat' 时发生错误: {str(e)}" + logger.error(f"{self.log_prefix} {error_msg}") + logger.error(traceback.format_exc()) + return False, error_msg \ No newline at end of file diff --git a/src/chat/focus_chat/planners/actions/no_reply_action.py b/src/chat/focus_chat/planners/actions/no_reply_action.py index c6852fbe1..6e31d5abb 100644 --- a/src/chat/focus_chat/planners/actions/no_reply_action.py +++ b/src/chat/focus_chat/planners/actions/no_reply_action.py @@ -6,14 +6,12 @@ from src.chat.focus_chat.planners.actions.base_action import BaseAction, registe from typing import Tuple, List, Callable, Coroutine from src.chat.heart_flow.observation.observation import Observation from src.chat.heart_flow.observation.chatting_observation import ChattingObservation -from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail from src.chat.focus_chat.hfc_utils import parse_thinking_id_to_timestamp logger = get_logger("action_taken") # 常量定义 WAITING_TIME_THRESHOLD = 300 # 等待新消息时间阈值,单位秒 -CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值 @register_action @@ -40,11 +38,7 @@ class NoReplyAction(BaseAction): cycle_timers: dict, thinking_id: str, observations: List[Observation], - on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]], - current_cycle: CycleDetail, log_prefix: str, - # total_no_reply_count: int = 0, - # total_waiting_time: float = 0.0, shutting_down: bool = False, **kwargs, ): @@ -57,20 +51,12 @@ class NoReplyAction(BaseAction): cycle_timers: 计时器字典 thinking_id: 思考ID observations: 观察列表 - on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的回调函数 - current_cycle: 当前循环信息 log_prefix: 日志前缀 - total_no_reply_count: 连续不回复计数 - total_waiting_time: 累计等待时间 shutting_down: 是否正在关闭 """ super().__init__(action_data, reasoning, cycle_timers, thinking_id) self.observations = observations - self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback - self._current_cycle = current_cycle self.log_prefix = log_prefix - # self.total_no_reply_count = total_no_reply_count - # self.total_waiting_time = total_waiting_time self._shutting_down = shutting_down async def handle_action(self) -> Tuple[bool, str]: @@ -93,8 +79,6 @@ class NoReplyAction(BaseAction): with Timer("等待新消息", self.cycle_timers): # 等待新消息、超时或关闭信号,并获取结果 await self._wait_for_new_message(observation, self.thinking_id, self.log_prefix) - # 从计时器获取实际等待时间 - _current_waiting = self.cycle_timers.get("等待新消息", 0.0) return True, "" # 不回复动作没有回复文本 diff --git a/src/chat/focus_chat/planners/actions/plugin_action.py b/src/chat/focus_chat/planners/actions/plugin_action.py index 5e8ddd998..94754d021 100644 --- a/src/chat/focus_chat/planners/actions/plugin_action.py +++ b/src/chat/focus_chat/planners/actions/plugin_action.py @@ -30,8 +30,6 @@ class PluginAction(BaseAction): self._services["expressor"] = kwargs["expressor"] if "chat_stream" in kwargs: self._services["chat_stream"] = kwargs["chat_stream"] - if "current_cycle" in kwargs: - self._services["current_cycle"] = kwargs["current_cycle"] self.log_prefix = kwargs.get("log_prefix", "") diff --git a/src/chat/focus_chat/planners/actions/reply_action.py b/src/chat/focus_chat/planners/actions/reply_action.py index 07e35b458..45a4340d5 100644 --- a/src/chat/focus_chat/planners/actions/reply_action.py +++ b/src/chat/focus_chat/planners/actions/reply_action.py @@ -6,7 +6,6 @@ from typing import Tuple, List from src.chat.heart_flow.observation.observation import Observation from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor from src.chat.message_receive.chat_stream import ChatStream -from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail from src.chat.heart_flow.observation.chatting_observation import ChattingObservation from src.chat.focus_chat.hfc_utils import create_empty_anchor_message @@ -41,7 +40,6 @@ class ReplyAction(BaseAction): def __init__( self, - action_name: str, action_data: dict, reasoning: str, cycle_timers: dict, @@ -49,7 +47,6 @@ class ReplyAction(BaseAction): observations: List[Observation], expressor: DefaultExpressor, chat_stream: ChatStream, - current_cycle: CycleDetail, log_prefix: str, **kwargs, ): @@ -64,14 +61,12 @@ class ReplyAction(BaseAction): observations: 观察列表 expressor: 表达器 chat_stream: 聊天流 - current_cycle: 当前循环信息 log_prefix: 日志前缀 """ super().__init__(action_data, reasoning, cycle_timers, thinking_id) self.observations = observations self.expressor = expressor self.chat_stream = chat_stream - self._current_cycle = current_cycle self.log_prefix = log_prefix async def handle_action(self) -> Tuple[bool, str]: diff --git a/src/chat/focus_chat/planners/planner.py b/src/chat/focus_chat/planners/planner.py index 116419ee1..ca35d3096 100644 --- a/src/chat/focus_chat/planners/planner.py +++ b/src/chat/focus_chat/planners/planner.py @@ -8,12 +8,12 @@ from src.chat.focus_chat.info.info_base import InfoBase from src.chat.focus_chat.info.obs_info import ObsInfo from src.chat.focus_chat.info.cycle_info import CycleInfo from src.chat.focus_chat.info.mind_info import MindInfo +from src.chat.focus_chat.info.action_info import ActionInfo from src.chat.focus_chat.info.structured_info import StructuredInfo from src.common.logger_manager import get_logger from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.individuality.individuality import Individuality from src.chat.focus_chat.planners.action_manager import ActionManager -from src.chat.focus_chat.planners.action_manager import ActionInfo logger = get_logger("planner") @@ -87,34 +87,68 @@ class ActionPlanner: action = "no_reply" # 默认动作 reasoning = "规划器初始化默认" + action_data = {} try: # 获取观察信息 extra_info: list[str] = [] + + # 首先处理动作变更 + for info in all_plan_info: + if isinstance(info, ActionInfo) and info.has_changes(): + add_actions = info.get_add_actions() + remove_actions = info.get_remove_actions() + reason = info.get_reason() + + # 处理动作的增加 + for action_name in add_actions: + if action_name in self.action_manager.get_registered_actions(): + self.action_manager.add_action_to_using(action_name) + logger.debug(f"{self.log_prefix}添加动作: {action_name}, 原因: {reason}") + + # 处理动作的移除 + for action_name in remove_actions: + self.action_manager.remove_action_from_using(action_name) + logger.debug(f"{self.log_prefix}移除动作: {action_name}, 原因: {reason}") + + # 如果当前选择的动作被移除了,更新为no_reply + if action in remove_actions: + action = "no_reply" + reasoning = f"之前选择的动作{action}已被移除,原因: {reason}" + + # 继续处理其他信息 for info in all_plan_info: if isinstance(info, ObsInfo): - # logger.debug(f"{self.log_prefix} 观察信息: {info}") observed_messages = info.get_talking_message() observed_messages_str = info.get_talking_message_str_truncate() chat_type = info.get_chat_type() - if chat_type == "group": - is_group_chat = True - else: - is_group_chat = False + is_group_chat = (chat_type == "group") elif isinstance(info, MindInfo): - # logger.debug(f"{self.log_prefix} 思维信息: {info}") current_mind = info.get_current_mind() elif isinstance(info, CycleInfo): - # logger.debug(f"{self.log_prefix} 循环信息: {info}") cycle_info = info.get_observe_info() elif isinstance(info, StructuredInfo): - # logger.debug(f"{self.log_prefix} 结构化信息: {info}") _structured_info = info.get_data() - else: - logger.debug(f"{self.log_prefix} 其他信息: {info}") + elif not isinstance(info, ActionInfo): # 跳过已处理的ActionInfo extra_info.append(info.get_processed_info()) + # 获取当前可用的动作 current_available_actions = self.action_manager.get_using_actions() + + # 如果没有可用动作,直接返回no_reply + if not current_available_actions: + logger.warning(f"{self.log_prefix}没有可用的动作,将使用no_reply") + action = "no_reply" + reasoning = "没有可用的动作" + return { + "action_result": { + "action_type": action, + "action_data": action_data, + "reasoning": reasoning + }, + "current_mind": current_mind, + "observed_messages": observed_messages + } # --- 构建提示词 (调用修改后的 PromptBuilder 方法) --- prompt = await self.build_planner_prompt( @@ -181,7 +215,7 @@ class ActionPlanner: except Exception as outer_e: logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}") traceback.print_exc() - action = "no_reply" # 发生未知错误,标记为 error 动作 + action = "no_reply" reasoning = f"Planner 内部处理错误: {outer_e}" logger.debug( @@ -202,7 +236,6 @@ class ActionPlanner: "observed_messages": observed_messages, } - # 返回结果字典 return plan_result async def build_planner_prompt( diff --git a/src/chat/heart_flow/background_tasks.py b/src/chat/heart_flow/background_tasks.py index d9fa1c9d3..28b248bdc 100644 --- a/src/chat/heart_flow/background_tasks.py +++ b/src/chat/heart_flow/background_tasks.py @@ -1,13 +1,9 @@ import asyncio import traceback from typing import Optional, Coroutine, Callable, Any, List - from src.common.logger_manager import get_logger - -# Need manager types for dependency injection from src.chat.heart_flow.mai_state_manager import MaiStateManager, MaiStateInfo from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager -from src.chat.heart_flow.interest_logger import InterestLogger logger = get_logger("background_tasks") @@ -62,23 +58,18 @@ class BackgroundTaskManager: mai_state_info: MaiStateInfo, # Needs current state info mai_state_manager: MaiStateManager, subheartflow_manager: SubHeartflowManager, - interest_logger: InterestLogger, ): self.mai_state_info = mai_state_info self.mai_state_manager = mai_state_manager self.subheartflow_manager = subheartflow_manager - self.interest_logger = interest_logger # Task references self._state_update_task: Optional[asyncio.Task] = None self._cleanup_task: Optional[asyncio.Task] = None - self._logging_task: Optional[asyncio.Task] = None - self._normal_chat_timeout_check_task: Optional[asyncio.Task] = None self._hf_judge_state_update_task: Optional[asyncio.Task] = None self._into_focus_task: Optional[asyncio.Task] = None self._private_chat_activation_task: Optional[asyncio.Task] = None # 新增私聊激活任务引用 self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks - self._detect_command_from_gui_task: Optional[asyncio.Task] = None # 新增GUI命令检测任务引用 async def start_tasks(self): """启动所有后台任务 @@ -97,30 +88,12 @@ class BackgroundTaskManager: f"聊天状态更新任务已启动 间隔:{STATE_UPDATE_INTERVAL_SECONDS}s", "_state_update_task", ), - ( - lambda: self._run_normal_chat_timeout_check_cycle(NORMAL_CHAT_TIMEOUT_CHECK_INTERVAL_SECONDS), - "debug", - f"聊天超时检查任务已启动 间隔:{NORMAL_CHAT_TIMEOUT_CHECK_INTERVAL_SECONDS}s", - "_normal_chat_timeout_check_task", - ), - ( - lambda: self._run_absent_into_chat(HF_JUDGE_STATE_UPDATE_INTERVAL_SECONDS), - "debug", - f"状态评估任务已启动 间隔:{HF_JUDGE_STATE_UPDATE_INTERVAL_SECONDS}s", - "_hf_judge_state_update_task", - ), ( self._run_cleanup_cycle, "info", f"清理任务已启动 间隔:{CLEANUP_INTERVAL_SECONDS}s", "_cleanup_task", ), - ( - self._run_logging_cycle, - "info", - f"日志任务已启动 间隔:{LOG_INTERVAL_SECONDS}s", - "_logging_task", - ), # 新增兴趣评估任务配置 ( self._run_into_focus_cycle, @@ -136,13 +109,6 @@ class BackgroundTaskManager: f"私聊激活检查任务已启动 间隔:{PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS}s", "_private_chat_activation_task", ), - # 新增GUI命令检测任务配置 - # ( - # lambda: self._run_detect_command_from_gui_cycle(3), - # "debug", - # f"GUI命令检测任务已启动 间隔:{3}s", - # "_detect_command_from_gui_task", - # ), ] # 统一启动所有任务 @@ -207,7 +173,6 @@ class BackgroundTaskManager: if state_changed: current_state = self.mai_state_info.get_current_state() - await self.subheartflow_manager.enforce_subheartflow_limits() # 状态转换处理 @@ -218,15 +183,6 @@ class BackgroundTaskManager: logger.info("检测到离线,停用所有子心流") await self.subheartflow_manager.deactivate_all_subflows() - async def _perform_absent_into_chat(self): - """调用llm检测是否转换ABSENT-CHAT状态""" - logger.debug("[状态评估任务] 开始基于LLM评估子心流状态...") - await self.subheartflow_manager.sbhf_absent_into_chat() - - async def _normal_chat_timeout_check_work(self): - """检查处于CHAT状态的子心流是否因长时间未发言而超时,并将其转为ABSENT""" - logger.debug("[聊天超时检查] 开始检查处于CHAT状态的子心流...") - await self.subheartflow_manager.sbhf_chat_into_absent() async def _perform_cleanup_work(self): """执行子心流清理任务 @@ -253,42 +209,23 @@ class BackgroundTaskManager: # 记录最终清理结果 logger.info(f"[清理任务] 清理完成, 共停止 {stopped_count}/{len(flows_to_stop)} 个子心流") - async def _perform_logging_work(self): - """执行一轮状态日志记录。""" - await self.interest_logger.log_all_states() # --- 新增兴趣评估工作函数 --- async def _perform_into_focus_work(self): """执行一轮子心流兴趣评估与提升检查。""" # 直接调用 subheartflow_manager 的方法,并传递当前状态信息 await self.subheartflow_manager.sbhf_absent_into_focus() - - # --- 结束新增 --- - - # --- 结束新增 --- - - # --- Specific Task Runners --- # + async def _run_state_update_cycle(self, interval: int): await _run_periodic_loop(task_name="State Update", interval=interval, task_func=self._perform_state_update_work) - async def _run_absent_into_chat(self, interval: int): - await _run_periodic_loop(task_name="Into Chat", interval=interval, task_func=self._perform_absent_into_chat) - async def _run_normal_chat_timeout_check_cycle(self, interval: int): - await _run_periodic_loop( - task_name="Normal Chat Timeout Check", interval=interval, task_func=self._normal_chat_timeout_check_work - ) async def _run_cleanup_cycle(self): await _run_periodic_loop( task_name="Subflow Cleanup", interval=CLEANUP_INTERVAL_SECONDS, task_func=self._perform_cleanup_work ) - async def _run_logging_cycle(self): - await _run_periodic_loop( - task_name="State Logging", interval=LOG_INTERVAL_SECONDS, task_func=self._perform_logging_work - ) - # --- 新增兴趣评估任务运行器 --- async def _run_into_focus_cycle(self): await _run_periodic_loop( @@ -304,11 +241,3 @@ class BackgroundTaskManager: interval=interval, task_func=self.subheartflow_manager.sbhf_absent_private_into_focus, ) - - # # 有api之后删除 - # async def _run_detect_command_from_gui_cycle(self, interval: int): - # await _run_periodic_loop( - # task_name="Detect Command from GUI", - # interval=interval, - # task_func=self.subheartflow_manager.detect_command_from_gui, - # ) diff --git a/src/chat/heart_flow/interest_logger.py b/src/chat/heart_flow/interest_logger.py deleted file mode 100644 index b33f449db..000000000 --- a/src/chat/heart_flow/interest_logger.py +++ /dev/null @@ -1,212 +0,0 @@ -import asyncio -import time -import json -import os -import traceback -from typing import TYPE_CHECKING, Dict, List - -from src.common.logger_manager import get_logger - -# Need chat_manager to get stream names -from src.chat.message_receive.chat_stream import chat_manager - -if TYPE_CHECKING: - from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager - from src.chat.heart_flow.sub_heartflow import SubHeartflow - from src.chat.heart_flow.heartflow import Heartflow # 导入 Heartflow 类型 - - -logger = get_logger("interest") - -# Consider moving log directory/filename constants here -LOG_DIRECTORY = "logs/interest" -HISTORY_LOG_FILENAME = "interest_history.log" - - -def _ensure_log_directory(): - """确保日志目录存在。""" - os.makedirs(LOG_DIRECTORY, exist_ok=True) - logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") - - -def _clear_and_create_log_file(): - """清除日志文件并创建新的日志文件。""" - if os.path.exists(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)): - os.remove(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)) - with open(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME), "w", encoding="utf-8") as f: - f.write("") - - -class InterestLogger: - """负责定期记录主心流和所有子心流的状态到日志文件。""" - - def __init__(self, subheartflow_manager: "SubHeartflowManager", heartflow: "Heartflow"): - """ - 初始化 InterestLogger。 - - Args: - subheartflow_manager: 子心流管理器实例。 - heartflow: 主心流实例,用于获取主心流状态。 - """ - self.subheartflow_manager = subheartflow_manager - self.heartflow = heartflow # 存储 Heartflow 实例 - self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) - _ensure_log_directory() - _clear_and_create_log_file() - - async def get_all_subflow_states(self) -> Dict[str, Dict]: - """并发获取所有活跃子心流的当前完整状态。""" - all_flows: List["SubHeartflow"] = self.subheartflow_manager.get_all_subheartflows() - tasks = [] - results = {} - - if not all_flows: - # logger.debug("未找到任何子心流状态") - return results - - for subheartflow in all_flows: - if await self.subheartflow_manager.get_or_create_subheartflow(subheartflow.subheartflow_id): - tasks.append( - asyncio.create_task(subheartflow.get_full_state(), name=f"get_state_{subheartflow.subheartflow_id}") - ) - else: - logger.warning(f"子心流 {subheartflow.subheartflow_id} 在创建任务前已消失") - - if tasks: - done, pending = await asyncio.wait(tasks, timeout=5.0) - - if pending: - logger.warning(f"获取子心流状态超时,有 {len(pending)} 个任务未完成") - for task in pending: - task.cancel() - - for task in done: - stream_id_str = task.get_name().split("get_state_")[-1] - stream_id = stream_id_str - - if task.cancelled(): - logger.warning(f"获取子心流 {stream_id} 状态的任务已取消(超时)", exc_info=False) - elif task.exception(): - exc = task.exception() - logger.warning(f"获取子心流 {stream_id} 状态出错: {exc}") - else: - result = task.result() - results[stream_id] = result - - logger.trace(f"成功获取 {len(results)} 个子心流的完整状态") - return results - - async def log_all_states(self): - """获取主心流状态和所有子心流的完整状态并写入日志文件。""" - try: - current_timestamp = time.time() - - # main_mind = self.heartflow.current_mind - # 获取 Mai 状态名称 - mai_state_name = self.heartflow.current_state.get_current_state().name - - all_subflow_states = await self.get_all_subflow_states() - - log_entry_base = { - "timestamp": round(current_timestamp, 2), - # "main_mind": main_mind, - "mai_state": mai_state_name, - "subflow_count": len(all_subflow_states), - "subflows": [], - } - - if not all_subflow_states: - # logger.debug("没有获取到任何子心流状态,仅记录主心流状态") - with open(self._history_log_file_path, "a", encoding="utf-8") as f: - f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n") - return - - subflow_details = [] - items_snapshot = list(all_subflow_states.items()) - for stream_id, state in items_snapshot: - group_name = stream_id - try: - chat_stream = chat_manager.get_stream(stream_id) - if chat_stream: - if chat_stream.group_info: - group_name = chat_stream.group_info.group_name - elif chat_stream.user_info: - group_name = f"私聊_{chat_stream.user_info.user_nickname}" - except Exception as e: - logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}") - - interest_state = state.get("interest_state", {}) - - subflow_entry = { - "stream_id": stream_id, - "group_name": group_name, - "sub_mind": state.get("current_mind", "未知"), - "sub_chat_state": state.get("chat_state", "未知"), - "interest_level": interest_state.get("interest_level", 0.0), - "start_hfc_probability": interest_state.get("start_hfc_probability", 0.0), - # "is_above_threshold": interest_state.get("is_above_threshold", False), - } - subflow_details.append(subflow_entry) - - log_entry_base["subflows"] = subflow_details - - with open(self._history_log_file_path, "a", encoding="utf-8") as f: - f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n") - - except IOError as e: - logger.error(f"写入状态日志到 {self._history_log_file_path} 出错: {e}") - except Exception as e: - logger.error(f"记录状态时发生意外错误: {e}") - logger.error(traceback.format_exc()) - - async def api_get_all_states(self): - """获取主心流和所有子心流的状态。""" - try: - current_timestamp = time.time() - - # main_mind = self.heartflow.current_mind - # 获取 Mai 状态名称 - mai_state_name = self.heartflow.current_state.get_current_state().name - - all_subflow_states = await self.get_all_subflow_states() - - log_entry_base = { - "timestamp": round(current_timestamp, 2), - # "main_mind": main_mind, - "mai_state": mai_state_name, - "subflow_count": len(all_subflow_states), - "subflows": [], - } - - subflow_details = [] - items_snapshot = list(all_subflow_states.items()) - for stream_id, state in items_snapshot: - group_name = stream_id - try: - chat_stream = chat_manager.get_stream(stream_id) - if chat_stream: - if chat_stream.group_info: - group_name = chat_stream.group_info.group_name - elif chat_stream.user_info: - group_name = f"私聊_{chat_stream.user_info.user_nickname}" - except Exception as e: - logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}") - - interest_state = state.get("interest_state", {}) - - subflow_entry = { - "stream_id": stream_id, - "group_name": group_name, - "sub_mind": state.get("current_mind", "未知"), - "sub_chat_state": state.get("chat_state", "未知"), - "interest_level": interest_state.get("interest_level", 0.0), - "start_hfc_probability": interest_state.get("start_hfc_probability", 0.0), - # "is_above_threshold": interest_state.get("is_above_threshold", False), - } - subflow_details.append(subflow_entry) - - log_entry_base["subflows"] = subflow_details - return subflow_details - except Exception as e: - logger.error(f"记录状态时发生意外错误: {e}") - logger.error(traceback.format_exc()) diff --git a/src/chat/heart_flow/mai_state_manager.py b/src/chat/heart_flow/mai_state_manager.py index 017656ad2..c5e272796 100644 --- a/src/chat/heart_flow/mai_state_manager.py +++ b/src/chat/heart_flow/mai_state_manager.py @@ -13,67 +13,24 @@ logger = get_logger("mai_state") # The line `enable_unlimited_hfc_chat = False` is setting a configuration parameter that controls # whether a specific debugging feature is enabled or not. When `enable_unlimited_hfc_chat` is set to # `False`, it means that the debugging feature for unlimited focused chatting is disabled. -enable_unlimited_hfc_chat = True # 调试用:无限专注聊天 -# enable_unlimited_hfc_chat = False +# enable_unlimited_hfc_chat = True # 调试用:无限专注聊天 +enable_unlimited_hfc_chat = False prevent_offline_state = True -# 目前默认不启用OFFLINE状态 - -MAX_NORMAL_CHAT_NUM_PEEKING = int(global_config.chat.base_normal_chat_num / 2) -MAX_NORMAL_CHAT_NUM_NORMAL = global_config.chat.base_normal_chat_num -MAX_NORMAL_CHAT_NUM_FOCUSED = global_config.chat.base_normal_chat_num + 1 - -# 不同状态下专注聊天的最大消息数 -MAX_FOCUSED_CHAT_NUM_PEEKING = int(global_config.chat.base_focused_chat_num / 2) -MAX_FOCUSED_CHAT_NUM_NORMAL = global_config.chat.base_focused_chat_num -MAX_FOCUSED_CHAT_NUM_FOCUSED = global_config.chat.base_focused_chat_num + 2 - -# -- 状态定义 -- +# 目前默认不启用OFFLINE状 class MaiState(enum.Enum): """ 聊天状态: OFFLINE: 不在线:回复概率极低,不会进行任何聊天 - PEEKING: 看一眼手机:回复概率较低,会进行一些普通聊天 NORMAL_CHAT: 正常看手机:回复概率较高,会进行一些普通聊天和少量的专注聊天 FOCUSED_CHAT: 专注聊天:回复概率极高,会进行专注聊天和少量的普通聊天 """ OFFLINE = "不在线" - PEEKING = "看一眼手机" NORMAL_CHAT = "正常看手机" FOCUSED_CHAT = "专心看手机" - def get_normal_chat_max_num(self): - # 调试用 - if enable_unlimited_hfc_chat: - return 1000 - - if self == MaiState.OFFLINE: - return 0 - elif self == MaiState.PEEKING: - return MAX_NORMAL_CHAT_NUM_PEEKING - elif self == MaiState.NORMAL_CHAT: - return MAX_NORMAL_CHAT_NUM_NORMAL - elif self == MaiState.FOCUSED_CHAT: - return MAX_NORMAL_CHAT_NUM_FOCUSED - return None - - def get_focused_chat_max_num(self): - # 调试用 - if enable_unlimited_hfc_chat: - return 1000 - - if self == MaiState.OFFLINE: - return 0 - elif self == MaiState.PEEKING: - return MAX_FOCUSED_CHAT_NUM_PEEKING - elif self == MaiState.NORMAL_CHAT: - return MAX_FOCUSED_CHAT_NUM_NORMAL - elif self == MaiState.FOCUSED_CHAT: - return MAX_FOCUSED_CHAT_NUM_FOCUSED - return None - class MaiStateInfo: def __init__(self): @@ -143,34 +100,18 @@ class MaiStateManager: _time_since_last_min_check = current_time - current_state_info.last_min_check_time next_state: Optional[MaiState] = None - # 辅助函数:根据 prevent_offline_state 标志调整目标状态 def _resolve_offline(candidate_state: MaiState) -> MaiState: - # 现在不再切换到OFFLINE,直接返回当前状态 if candidate_state == MaiState.OFFLINE: return current_status return candidate_state if current_status == MaiState.OFFLINE: logger.info("当前[离线],没看手机,思考要不要上线看看......") - elif current_status == MaiState.PEEKING: - logger.info("当前[看一眼手机],思考要不要继续聊下去......") elif current_status == MaiState.NORMAL_CHAT: logger.info("当前在[正常看手机]思考要不要继续聊下去......") elif current_status == MaiState.FOCUSED_CHAT: logger.info("当前在[专心看手机]思考要不要继续聊下去......") - # 1. 移除每分钟概率切换到OFFLINE的逻辑 - # if time_since_last_min_check >= 60: - # if current_status != MaiState.OFFLINE: - # if random.random() < 0.03: # 3% 概率切换到 OFFLINE - # potential_next = MaiState.OFFLINE - # resolved_next = _resolve_offline(potential_next) - # logger.debug(f"概率触发下线,resolve 为 {resolved_next.value}") - # # 只有当解析后的状态与当前状态不同时才设置 next_state - # if resolved_next != current_status: - # next_state = resolved_next - - # 2. 状态持续时间规则 (只有在规则1没有触发状态改变时才检查) if next_state is None: time_limit_exceeded = False choices_list = [] @@ -178,44 +119,33 @@ class MaiStateManager: rule_id = "" if current_status == MaiState.OFFLINE: - # OFFLINE 状态不再自动切换,直接返回 None return None - elif current_status == MaiState.PEEKING: - if time_in_current_status >= 600: # PEEKING 最多持续 600 秒 - time_limit_exceeded = True - rule_id = "2.2 (From PEEKING)" - weights = [50, 50] - choices_list = [MaiState.NORMAL_CHAT, MaiState.FOCUSED_CHAT] elif current_status == MaiState.NORMAL_CHAT: if time_in_current_status >= 300: # NORMAL_CHAT 最多持续 300 秒 time_limit_exceeded = True rule_id = "2.3 (From NORMAL_CHAT)" - weights = [50, 50] - choices_list = [MaiState.PEEKING, MaiState.FOCUSED_CHAT] + weights = [100] + choices_list = [MaiState.FOCUSED_CHAT] elif current_status == MaiState.FOCUSED_CHAT: if time_in_current_status >= 600: # FOCUSED_CHAT 最多持续 600 秒 time_limit_exceeded = True rule_id = "2.4 (From FOCUSED_CHAT)" - weights = [50, 50] - choices_list = [MaiState.NORMAL_CHAT, MaiState.PEEKING] + weights = [100] + choices_list = [MaiState.NORMAL_CHAT] if time_limit_exceeded: next_state_candidate = random.choices(choices_list, weights=weights, k=1)[0] resolved_candidate = _resolve_offline(next_state_candidate) logger.debug( - f"规则{rule_id}:时间到,随机选择 {next_state_candidate.value},resolve 为 {resolved_candidate.value}" + f"规则{rule_id}:时间到,切换到 {next_state_candidate.value},resolve 为 {resolved_candidate.value}" ) - next_state = resolved_candidate # 直接使用解析后的状态 + next_state = resolved_candidate - # 注意:enable_unlimited_hfc_chat 优先级高于 prevent_offline_state - # 如果触发了这个,它会覆盖上面规则2设置的 next_state if enable_unlimited_hfc_chat: logger.debug("调试用:开挂了,强制切换到专注聊天") next_state = MaiState.FOCUSED_CHAT - # --- 最终决策 --- # - # 如果决定了下一个状态,且这个状态与当前状态不同,则返回下一个状态 if next_state is not None and next_state != current_status: return next_state else: - return None # 没有状态转换发生或无需重置计时器 + return None diff --git a/src/chat/heart_flow/observation/hfcloop_observation.py b/src/chat/heart_flow/observation/hfcloop_observation.py index 82c9c879a..d712b83be 100644 --- a/src/chat/heart_flow/observation/hfcloop_observation.py +++ b/src/chat/heart_flow/observation/hfcloop_observation.py @@ -17,7 +17,9 @@ class HFCloopObservation: self.observe_id = observe_id self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间 self.history_loop: List[CycleDetail] = [] - self.action_manager = ActionManager() + self.action_manager: ActionManager = None + + self.all_actions = {} def get_observe_info(self): return self.observe_info @@ -27,6 +29,7 @@ class HFCloopObservation: def set_action_manager(self, action_manager: ActionManager): self.action_manager = action_manager + self.all_actions = self.action_manager.get_registered_actions() async def observe(self): recent_active_cycles: List[CycleDetail] = [] diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index 157c1c957..c440f8cfd 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -89,6 +89,14 @@ class SubHeartflow: await self.interest_chatting.initialize() logger.debug(f"{self.log_prefix} InterestChatting 实例已初始化。") + # 创建并初始化 normal_chat_instance + chat_stream = chat_manager.get_stream(self.chat_id) + if chat_stream: + self.normal_chat_instance = NormalChat(chat_stream=chat_stream,interest_dict=self.get_interest_dict()) + await self.normal_chat_instance.initialize() + await self.normal_chat_instance.start_chat() + logger.info(f"{self.log_prefix} NormalChat 实例已创建并启动。") + def update_last_chat_state_time(self): self.chat_state_last_time = time.time() - self.chat_state_changed_time @@ -181,8 +189,7 @@ class SubHeartflow: # 创建 HeartFChatting 实例,并传递 从构造函数传入的 回调函数 self.heart_fc_instance = HeartFChatting( chat_id=self.subheartflow_id, - observations=self.observations, # 传递所有观察者 - on_consecutive_no_reply_callback=self.hfc_no_reply_callback, # <-- Use stored callback + observations=self.observations, ) # 初始化并启动 HeartFChatting @@ -200,55 +207,41 @@ class SubHeartflow: self.heart_fc_instance = None # 创建或初始化异常,清理实例 return False - async def change_chat_state(self, new_state: "ChatState"): - """更新sub_heartflow的聊天状态,并管理 HeartFChatting 和 NormalChat 实例及任务""" + async def change_chat_state(self, new_state: ChatState) -> None: + """ + 改变聊天状态。 + 如果转换到CHAT或FOCUSED状态时超过限制,会保持当前状态。 + """ current_state = self.chat_state.chat_status + state_changed = False + log_prefix = f"[{self.log_prefix}]" - if current_state == new_state: - return - - log_prefix = self.log_prefix - state_changed = False # 标记状态是否实际发生改变 - - # --- 状态转换逻辑 --- if new_state == ChatState.CHAT: - # 移除限额检查逻辑 - logger.debug(f"{log_prefix} 准备进入或保持 聊天 状态") - if current_state == ChatState.FOCUSED: - if await self._start_normal_chat(rewind=False): - # logger.info(f"{log_prefix} 成功进入或保持 NormalChat 状态。") - state_changed = True - else: - logger.error(f"{log_prefix} 从FOCUSED状态启动 NormalChat 失败,无法进入 CHAT 状态。") - # 考虑是否需要回滚状态或采取其他措施 - return # 启动失败,不改变状态 + logger.debug(f"{log_prefix} 准备进入或保持 普通聊天 状态") + if await self._start_normal_chat(): + logger.debug(f"{log_prefix} 成功进入或保持 NormalChat 状态。") + state_changed = True else: - if await self._start_normal_chat(rewind=True): - # logger.info(f"{log_prefix} 成功进入或保持 NormalChat 状态。") - state_changed = True - else: - logger.error(f"{log_prefix} 从ABSENT状态启动 NormalChat 失败,无法进入 CHAT 状态。") - # 考虑是否需要回滚状态或采取其他措施 - return # 启动失败,不改变状态 + logger.error(f"{log_prefix} 启动 NormalChat 失败,无法进入 CHAT 状态。") + # 启动失败时,保持当前状态 + return elif new_state == ChatState.FOCUSED: - # 移除限额检查逻辑 logger.debug(f"{log_prefix} 准备进入或保持 专注聊天 状态") if await self._start_heart_fc_chat(): logger.debug(f"{log_prefix} 成功进入或保持 HeartFChatting 状态。") state_changed = True else: logger.error(f"{log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。") - # 启动失败,状态回滚到之前的状态或ABSENT?这里保持不改变 - return # 启动失败,不改变状态 + # 启动失败时,保持当前状态 + return elif new_state == ChatState.ABSENT: logger.info(f"{log_prefix} 进入 ABSENT 状态,停止所有聊天活动...") self.clear_interest_dict() - await self._stop_normal_chat() await self._stop_heart_fc_chat() - state_changed = True # 总是可以成功转换到 ABSENT + state_changed = True # --- 更新状态和最后活动时间 --- if state_changed: @@ -263,7 +256,6 @@ class SubHeartflow: self.chat_state_last_time = 0 self.chat_state_changed_time = time.time() else: - # 如果因为某些原因(如启动失败)没有成功改变状态,记录一下 logger.debug( f"{log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。" ) diff --git a/src/chat/heart_flow/subheartflow_manager.py b/src/chat/heart_flow/subheartflow_manager.py index bf4ddf7e1..22bab6a40 100644 --- a/src/chat/heart_flow/subheartflow_manager.py +++ b/src/chat/heart_flow/subheartflow_manager.py @@ -1,26 +1,14 @@ import asyncio import time import random -from typing import Dict, Any, Optional, List, Tuple -import json # 导入 json 模块 -import functools # <-- 新增导入 - -# 导入日志模块 +from typing import Dict, Any, Optional, List +import functools from src.common.logger_manager import get_logger - -# 导入聊天流管理模块 from src.chat.message_receive.chat_stream import chat_manager - -# 导入心流相关类 from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState from src.chat.heart_flow.mai_state_manager import MaiStateInfo from src.chat.heart_flow.observation.chatting_observation import ChattingObservation - -# 导入LLM请求工具 -from src.chat.models.utils_model import LLMRequest from src.config.config import global_config -from src.individuality.individuality import Individuality -import traceback # 初始化日志记录器 @@ -74,15 +62,6 @@ class SubHeartflowManager: self._lock = asyncio.Lock() # 用于保护 self.subheartflows 的访问 self.mai_state_info: MaiStateInfo = mai_state_info # 存储传入的 MaiStateInfo 实例 - # 为 LLM 状态评估创建一个 LLMRequest 实例 - # 使用与 Heartflow 相同的模型和参数 - # TODO: API-Adapter修改标记 - self.llm_state_evaluator = LLMRequest( - model=global_config.model.heartflow, # 与 Heartflow 一致 - temperature=0.6, # 与 Heartflow 一致 - max_tokens=1000, # 与 Heartflow 一致 (虽然可能不需要这么多) - request_type="subheartflow_state_eval", # 保留特定的请求类型 - ) async def force_change_state(self, subflow_id: Any, target_state: ChatState) -> bool: """强制改变指定子心流的状态""" @@ -156,10 +135,6 @@ class SubHeartflowManager: logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True) return None - # --- 新增:内部方法,用于尝试将单个子心流设置为 ABSENT --- - - # --- 结束新增 --- - async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool: """停止指定的子心流并将其状态设置为 ABSENT""" log_prefix = "[子心流管理]" @@ -190,54 +165,6 @@ class SubHeartflowManager: return flows_to_stop - async def enforce_subheartflow_limits(self): - """根据主状态限制停止超额子心流(优先停不活跃的)""" - # 使用 self.mai_state_info 获取当前状态和限制 - current_mai_state = self.mai_state_info.get_current_state() - normal_limit = current_mai_state.get_normal_chat_max_num() - focused_limit = current_mai_state.get_focused_chat_max_num() - logger.debug(f"[限制] 状态:{current_mai_state.value}, 普通限:{normal_limit}, 专注限:{focused_limit}") - - # 分类统计当前子心流 - normal_flows = [] - focused_flows = [] - for flow_id, flow in list(self.subheartflows.items()): - if flow.chat_state.chat_status == ChatState.CHAT: - normal_flows.append((flow_id, getattr(flow, "last_active_time", 0))) - elif flow.chat_state.chat_status == ChatState.FOCUSED: - focused_flows.append((flow_id, getattr(flow, "last_active_time", 0))) - - logger.debug(f"[限制] 当前数量 - 普通:{len(normal_flows)}, 专注:{len(focused_flows)}") - stopped = 0 - - # 处理普通聊天超额 - if len(normal_flows) > normal_limit: - excess = len(normal_flows) - normal_limit - logger.info(f"[限制] 普通聊天超额({len(normal_flows)}>{normal_limit}), 停止{excess}个") - normal_flows.sort(key=lambda x: x[1]) - for flow_id, _ in normal_flows[:excess]: - if await self.sleep_subheartflow(flow_id, f"普通聊天超额(限{normal_limit})"): - stopped += 1 - - # 处理专注聊天超额(需重新统计) - focused_flows = [ - (fid, t) - for fid, f in list(self.subheartflows.items()) - if (t := getattr(f, "last_active_time", 0)) and f.chat_state.chat_status == ChatState.FOCUSED - ] - if len(focused_flows) > focused_limit: - excess = len(focused_flows) - focused_limit - logger.info(f"[限制] 专注聊天超额({len(focused_flows)}>{focused_limit}), 停止{excess}个") - focused_flows.sort(key=lambda x: x[1]) - for flow_id, _ in focused_flows[:excess]: - if await self.sleep_subheartflow(flow_id, f"专注聊天超额(限{focused_limit})"): - stopped += 1 - - if stopped: - logger.info(f"[限制] 已停止{stopped}个子心流, 剩余:{len(self.subheartflows)}") - else: - logger.debug(f"[限制] 无需停止, 当前总数:{len(self.subheartflows)}") - async def deactivate_all_subflows(self): """将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)""" log_prefix = "[停用]" @@ -273,27 +200,14 @@ class SubHeartflowManager: ) async def sbhf_absent_into_focus(self): - """评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)""" + """评估子心流兴趣度,满足条件则提升到FOCUSED状态(基于start_hfc_probability)""" try: current_state = self.mai_state_info.get_current_state() - focused_limit = current_state.get_focused_chat_max_num() - # --- 新增:检查是否允许进入 FOCUS 模式 --- # + # 检查是否允许进入 FOCUS 模式 if not global_config.chat.allow_focus_mode: if int(time.time()) % 60 == 0: # 每60秒输出一次日志避免刷屏 logger.trace("未开启 FOCUSED 状态 (allow_focus_mode=False)") - return # 如果不允许,直接返回 - # --- 结束新增 --- - - logger.info(f"当前状态 ({current_state.value}) 可以在{focused_limit}个群 专注聊天") - - if focused_limit <= 0: - # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流") - return - - current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED) - if current_focused_count >= focused_limit: - logger.debug(f"已达专注上限 ({current_focused_count}/{focused_limit})") return for sub_hf in list(self.subheartflows.values()): @@ -321,11 +235,6 @@ class SubHeartflowManager: if random.random() >= sub_hf.interest_chatting.start_hfc_probability: continue - # 再次检查是否达到上限 - if current_focused_count >= focused_limit: - logger.debug(f"{stream_name} 已达专注上限") - break - # 获取最新状态并执行提升 current_subflow = self.subheartflows.get(flow_id) if not current_subflow: @@ -338,283 +247,57 @@ class SubHeartflowManager: # 执行状态提升 await current_subflow.change_chat_state(ChatState.FOCUSED) - # 验证提升结果 - if ( - final_subflow := self.subheartflows.get(flow_id) - ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED: - current_focused_count += 1 except Exception as e: logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True) - async def sbhf_absent_into_chat(self): + + async def sbhf_focus_into_absent_or_chat(self, subflow_id: Any): """ - 随机选一个 ABSENT 状态的 *群聊* 子心流,评估是否应转换为 CHAT 状态。 - 每次调用最多转换一个。 - 私聊会被忽略。 - """ - current_mai_state = self.mai_state_info.get_current_state() - chat_limit = current_mai_state.get_normal_chat_max_num() - - async with self._lock: - # 1. 筛选出所有 ABSENT 状态的 *群聊* 子心流 - absent_group_subflows = [ - hf - for hf in self.subheartflows.values() - if hf.chat_state.chat_status == ChatState.ABSENT and hf.is_group_chat - ] - - if not absent_group_subflows: - # logger.debug("没有摸鱼的群聊子心流可以评估。") # 日志太频繁 - return # 没有目标,直接返回 - - # 2. 随机选一个幸运儿 - sub_hf_to_evaluate = random.choice(absent_group_subflows) - flow_id = sub_hf_to_evaluate.subheartflow_id - stream_name = chat_manager.get_stream_name(flow_id) or flow_id - log_prefix = f"[{stream_name}]" - - # 3. 检查 CHAT 上限 - current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT) - if current_chat_count >= chat_limit: - logger.info(f"{log_prefix} 想看看能不能聊,但是聊天太多了, ({current_chat_count}/{chat_limit}) 满了。") - return # 满了,这次就算了 - - # --- 获取 FOCUSED 计数 --- - current_focused_count = self.count_subflows_by_state_nolock(ChatState.FOCUSED) - focused_limit = current_mai_state.get_focused_chat_max_num() - - # --- 新增:获取聊天和专注群名 --- - chatting_group_names = [] - focused_group_names = [] - for flow_id, hf in self.subheartflows.items(): - stream_name = chat_manager.get_stream_name(flow_id) or str(flow_id) # 保证有名字 - if hf.chat_state.chat_status == ChatState.CHAT: - chatting_group_names.append(stream_name) - elif hf.chat_state.chat_status == ChatState.FOCUSED: - focused_group_names.append(stream_name) - # --- 结束新增 --- - - # --- 获取观察信息和构建 Prompt --- - first_observation = sub_hf_to_evaluate.observations[0] # 喵~第一个观察者肯定存在的说 - await first_observation.observe() - current_chat_log = first_observation.talking_message_str or "当前没啥聊天内容。" - _observation_summary = f"在[{stream_name}]这个群中,你最近看群友聊了这些:\n{current_chat_log}" - - _mai_state_description = f"你当前状态: {current_mai_state.value}。" - individuality = Individuality.get_instance() - personality_prompt = individuality.get_prompt(x_person=2, level=2) - prompt_personality = f"你正在扮演名为{individuality.name}的人类,{personality_prompt}" - - # --- 修改:在 prompt 中加入当前聊天计数和群名信息 (条件显示) --- - chat_status_lines = [] - if chatting_group_names: - chat_status_lines.append( - f"正在这些群闲聊 ({current_chat_count}/{chat_limit}): {', '.join(chatting_group_names)}" - ) - if focused_group_names: - chat_status_lines.append( - f"正在这些群专注的聊天 ({current_focused_count}/{focused_limit}): {', '.join(focused_group_names)}" - ) - - chat_status_prompt = "当前没有在任何群聊中。" # 默认消息喵~ - if chat_status_lines: - chat_status_prompt = "当前聊天情况,你已经参与了下面这几个群的聊天:\n" + "\n".join( - chat_status_lines - ) # 拼接状态信息 - - prompt = ( - f"{prompt_personality}\n" - f"{chat_status_prompt}\n" # <-- 喵!用了新的状态信息~ - f"你当前尚未加入 [{stream_name}] 群聊天。\n" - f"{_observation_summary}\n---\n" - f"基于以上信息,你想不想开始在这个群闲聊?\n" - f"请说明理由,并以 JSON 格式回答,包含 'decision' (布尔值) 和 'reason' (字符串)。\n" - f'例如:{{"decision": true, "reason": "看起来挺热闹的,插个话"}}\n' - f'例如:{{"decision": false, "reason": "已经聊了好多,休息一下"}}\n' - f"请只输出有效的 JSON 对象。" - ) - # --- 结束修改 --- - - # --- 4. LLM 评估是否想聊 --- - yao_kai_shi_liao_ma, reason = await self._llm_evaluate_state_transition(prompt) - - if reason: - if yao_kai_shi_liao_ma: - logger.info(f"{log_prefix} 打算开始聊,原因是: {reason}") - else: - logger.info(f"{log_prefix} 不打算聊,原因是: {reason}") - else: - logger.info(f"{log_prefix} 结果: {yao_kai_shi_liao_ma}") - - if yao_kai_shi_liao_ma is None: - logger.debug(f"{log_prefix} 问AI想不想聊失败了,这次算了。") - return # 评估失败,结束 - - if not yao_kai_shi_liao_ma: - # logger.info(f"{log_prefix} 现在不想聊这个群。") - return # 不想聊,结束 - - # --- 5. AI想聊,再次检查额度并尝试转换 --- - # 再次检查以防万一 - current_chat_count_before_change = self.count_subflows_by_state_nolock(ChatState.CHAT) - if current_chat_count_before_change < chat_limit: - logger.info( - f"{log_prefix} 想聊,而且还有精力 ({current_chat_count_before_change}/{chat_limit}),这就去聊!" - ) - await sub_hf_to_evaluate.change_chat_state(ChatState.CHAT) - # 确认转换成功 - if sub_hf_to_evaluate.chat_state.chat_status == ChatState.CHAT: - logger.debug(f"{log_prefix} 成功进入聊天状态!本次评估圆满结束。") - else: - logger.warning( - f"{log_prefix} 奇怪,尝试进入聊天状态失败了。当前状态: {sub_hf_to_evaluate.chat_state.chat_status.value}" - ) - else: - logger.warning( - f"{log_prefix} AI说想聊,但是刚问完就没空位了 ({current_chat_count_before_change}/{chat_limit})。真不巧,下次再说吧。" - ) - # 无论转换成功与否,本次评估都结束了 - - # 锁在这里自动释放 - - # --- 新增:单独检查 CHAT 状态超时的任务 --- - async def sbhf_chat_into_absent(self): - """定期检查处于 CHAT 状态的子心流是否因长时间未发言而超时,并将其转为 ABSENT。""" - log_prefix_task = "[聊天超时检查]" - transitioned_to_absent = 0 - checked_count = 0 - - async with self._lock: - subflows_snapshot = list(self.subheartflows.values()) - checked_count = len(subflows_snapshot) - - if not subflows_snapshot: - return - - for sub_hf in subflows_snapshot: - # 只检查 CHAT 状态的子心流 - if sub_hf.chat_state.chat_status != ChatState.CHAT: - continue - - flow_id = sub_hf.subheartflow_id - stream_name = chat_manager.get_stream_name(flow_id) or flow_id - log_prefix = f"[{stream_name}]({log_prefix_task})" - - should_deactivate = False - reason = "" - - try: - last_bot_dong_zuo_time = sub_hf.get_normal_chat_last_speak_time() - - if last_bot_dong_zuo_time > 0: - current_time = time.time() - time_since_last_bb = current_time - last_bot_dong_zuo_time - minutes_since_last_bb = time_since_last_bb / 60 - - # 60分钟强制退出 - if minutes_since_last_bb >= 60: - should_deactivate = True - reason = "超过60分钟未发言,强制退出" - else: - # 根据时间区间确定退出概率 - exit_probability = 0 - if minutes_since_last_bb < 5: - exit_probability = 0.01 # 1% - elif minutes_since_last_bb < 15: - exit_probability = 0.02 # 2% - elif minutes_since_last_bb < 30: - exit_probability = 0.04 # 4% - else: - exit_probability = 0.08 # 8% - - # 随机判断是否退出 - if random.random() < exit_probability: - should_deactivate = True - reason = f"已{minutes_since_last_bb:.1f}分钟未发言,触发{exit_probability * 100:.0f}%退出概率" - - except AttributeError: - logger.error( - f"{log_prefix} 无法获取 Bot 最后 BB 时间,请确保 SubHeartflow 相关实现正确。跳过超时检查。" - ) - except Exception as e: - logger.error(f"{log_prefix} 检查 Bot 超时状态时出错: {e}", exc_info=True) - - # 执行状态转换(如果超时) - if should_deactivate: - logger.debug(f"{log_prefix} 因超时 ({reason}),尝试转换为 ABSENT 状态。") - await sub_hf.change_chat_state(ChatState.ABSENT) - # 再次检查确保状态已改变 - if sub_hf.chat_state.chat_status == ChatState.ABSENT: - transitioned_to_absent += 1 - logger.info(f"{log_prefix} 不看了。") - else: - logger.warning(f"{log_prefix} 尝试因超时转换为 ABSENT 失败。") - - if transitioned_to_absent > 0: - logger.debug( - f"{log_prefix_task} 完成,共检查 {checked_count} 个子心流,{transitioned_to_absent} 个因超时转为 ABSENT。" - ) - - # --- 结束新增 --- - - async def _llm_evaluate_state_transition(self, prompt: str) -> Tuple[Optional[bool], Optional[str]]: - """ - 使用 LLM 评估是否应进行状态转换,期望 LLM 返回 JSON 格式。 + 接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 CHAT。 + 通常在连续多次 "no_reply" 后被调用。 + 对于私聊和群聊,都转换为 CHAT。 Args: - prompt: 提供给 LLM 的提示信息,要求返回 {"decision": true/false}。 - - Returns: - Optional[bool]: 如果成功解析 LLM 的 JSON 响应并提取了 'decision' 键的值,则返回该布尔值。 - 如果 LLM 调用失败、返回无效 JSON 或 JSON 中缺少 'decision' 键或其值不是布尔型,则返回 None。 + subflow_id: 需要转换状态的子心流 ID。 """ - log_prefix = "[LLM状态评估]" - try: - # --- 真实的 LLM 调用 --- - response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt) - # logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估") - logger.debug(f"{log_prefix} 原始输入: {prompt}") - logger.debug(f"{log_prefix} 原始评估结果: {response_text}") + async with self._lock: + subflow = self.subheartflows.get(subflow_id) + if not subflow: + logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 CHAT") + return - # --- 解析 JSON 响应 --- - try: - # 尝试去除可能的Markdown代码块标记 - cleaned_response = response_text.strip().strip("`").strip() - if cleaned_response.startswith("json"): - cleaned_response = cleaned_response[4:].strip() + stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id + current_state = subflow.chat_state.chat_status - data = json.loads(cleaned_response) - decision = data.get("decision") # 使用 .get() 避免 KeyError - reason = data.get("reason") + if current_state == ChatState.FOCUSED: + target_state = ChatState.CHAT + log_reason = "转为CHAT" - if isinstance(decision, bool): - logger.debug(f"{log_prefix} LLM评估结果 (来自JSON): {'建议转换' if decision else '建议不转换'}") - - return decision, reason - else: - logger.warning( - f"{log_prefix} LLM 返回的 JSON 中 'decision' 键的值不是布尔型: {decision}。响应: {response_text}" + logger.info( + f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 尝试转换为 {target_state.value} ({log_reason})" + ) + try: + # 从HFC到CHAT时,清空兴趣字典 + subflow.clear_interest_dict() + await subflow.change_chat_state(target_state) + final_state = subflow.chat_state.chat_status + if final_state == target_state: + logger.debug(f"[状态转换请求] {stream_name} 状态已成功转换为 {final_state.value}") + else: + logger.warning( + f"[状态转换请求] 尝试将 {stream_name} 转换为 {target_state.value} 后,状态实际为 {final_state.value}" + ) + except Exception as e: + logger.error( + f"[状态转换请求] 转换 {stream_name} 到 {target_state.value} 时出错: {e}", exc_info=True ) - return None, None # 值类型不正确 - - except json.JSONDecodeError as json_err: - logger.warning(f"{log_prefix} LLM 返回的响应不是有效的 JSON: {json_err}。响应: {response_text}") - # 尝试在非JSON响应中查找关键词作为后备方案 (可选) - if "true" in response_text.lower(): - logger.debug(f"{log_prefix} 在非JSON响应中找到 'true',解释为建议转换") - return True, None - if "false" in response_text.lower(): - logger.debug(f"{log_prefix} 在非JSON响应中找到 'false',解释为建议不转换") - return False, None - return None, None # JSON 解析失败,也未找到关键词 - except Exception as parse_err: # 捕获其他可能的解析错误 - logger.warning(f"{log_prefix} 解析 LLM JSON 响应时发生意外错误: {parse_err}。响应: {response_text}") - return None, None - - except Exception as e: - logger.error(f"{log_prefix} 调用 LLM 或处理其响应时出错: {e}", exc_info=True) - traceback.print_exc() - return None, None # LLM 调用或处理失败 + elif current_state == ChatState.ABSENT: + logger.debug(f"[状态转换请求] {stream_name} 处于 ABSENT 状态,尝试转为 CHAT") + await subflow.change_chat_state(ChatState.CHAT) + else: + logger.debug( + f"[状态转换请求] {stream_name} 当前状态为 {current_state.value},无需转换" + ) def count_subflows_by_state(self, state: ChatState) -> int: """统计指定状态的子心流数量""" @@ -637,23 +320,6 @@ class SubHeartflowManager: count += 1 return count - def get_active_subflow_minds(self) -> List[str]: - """获取所有活跃(非ABSENT)子心流的当前想法""" - minds = [] - for subheartflow in self.subheartflows.values(): - # 检查子心流是否活跃(非ABSENT状态) - if subheartflow.chat_state.chat_status != ChatState.ABSENT: - minds.append(subheartflow.sub_mind.current_mind) - return minds - - def update_main_mind_in_subflows(self, main_mind: str): - """更新所有子心流的主心流想法""" - updated_count = sum( - 1 - for _, subheartflow in list(self.subheartflows.items()) - if subheartflow.subheartflow_id in self.subheartflows - ) - logger.debug(f"[子心流管理器] 更新了{updated_count}个子心流的主想法") async def delete_subflow(self, subheartflow_id: Any): """删除指定的子心流。""" @@ -670,91 +336,13 @@ class SubHeartflowManager: else: logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}") - # --- 新增:处理 HFC 无回复回调的专用方法 --- # + async def _handle_hfc_no_reply(self, subheartflow_id: Any): """处理来自 HeartFChatting 的连续无回复信号 (通过 partial 绑定 ID)""" - # 注意:这里不需要再获取锁,因为 sbhf_focus_into_absent 内部会处理锁 + # 注意:这里不需要再获取锁,因为 sbhf_focus_into_absent_or_chat 内部会处理锁 logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号") await self.sbhf_focus_into_absent_or_chat(subheartflow_id) - # --- 结束新增 --- # - - # --- 新增:处理来自 HeartFChatting 的状态转换请求 --- # - async def sbhf_focus_into_absent_or_chat(self, subflow_id: Any): - """ - 接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 ABSENT 或 CHAT。 - 通常在连续多次 "no_reply" 后被调用。 - 对于私聊,总是转换为 ABSENT。 - 对于群聊,随机决定转换为 ABSENT 或 CHAT (如果 CHAT 未达上限)。 - - Args: - subflow_id: 需要转换状态的子心流 ID。 - """ - async with self._lock: - subflow = self.subheartflows.get(subflow_id) - if not subflow: - logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 ABSENT/CHAT") - return - - stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id - current_state = subflow.chat_state.chat_status - - if current_state == ChatState.FOCUSED: - target_state = ChatState.ABSENT # Default target - log_reason = "默认转换 (私聊或群聊)" - - # --- Modify logic based on chat type --- # - if subflow.is_group_chat: - # Group chat: Decide between ABSENT or CHAT - if random.random() < 0.5: # 50% chance to try CHAT - current_mai_state = self.mai_state_info.get_current_state() - chat_limit = current_mai_state.get_normal_chat_max_num() - current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT) - - if current_chat_count < chat_limit: - target_state = ChatState.CHAT - log_reason = f"群聊随机选择 CHAT (当前 {current_chat_count}/{chat_limit})" - else: - target_state = ChatState.ABSENT # Fallback to ABSENT if CHAT limit reached - log_reason = ( - f"群聊随机选择 CHAT 但已达上限 ({current_chat_count}/{chat_limit}),转为 ABSENT" - ) - else: # 50% chance to go directly to ABSENT - target_state = ChatState.ABSENT - log_reason = "群聊随机选择 ABSENT" - else: - # Private chat: Always go to ABSENT - target_state = ChatState.ABSENT - log_reason = "私聊退出 FOCUSED,转为 ABSENT" - # --- End modification --- # - - logger.info( - f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 尝试转换为 {target_state.value} ({log_reason})" - ) - try: - # 从HFC到CHAT时,清空兴趣字典 - subflow.clear_interest_dict() - await subflow.change_chat_state(target_state) - final_state = subflow.chat_state.chat_status - if final_state == target_state: - logger.debug(f"[状态转换请求] {stream_name} 状态已成功转换为 {final_state.value}") - else: - logger.warning( - f"[状态转换请求] 尝试将 {stream_name} 转换为 {target_state.value} 后,状态实际为 {final_state.value}" - ) - except Exception as e: - logger.error( - f"[状态转换请求] 转换 {stream_name} 到 {target_state.value} 时出错: {e}", exc_info=True - ) - elif current_state == ChatState.ABSENT: - logger.debug(f"[状态转换请求] {stream_name} 已处于 ABSENT 状态,无需转换") - else: - logger.warning( - f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换" - ) - - # --- 结束新增 --- # - # --- 新增:处理私聊从 ABSENT 直接到 FOCUSED 的逻辑 --- # async def sbhf_absent_private_into_focus(self): """检查 ABSENT 状态的私聊子心流是否有新活动,若有且未达 FOCUSED 上限,则直接转换为 FOCUSED。""" @@ -762,19 +350,8 @@ class SubHeartflowManager: transitioned_count = 0 checked_count = 0 - # --- 获取当前状态和 FOCUSED 上限 --- # - current_mai_state = self.mai_state_info.get_current_state() - focused_limit = current_mai_state.get_focused_chat_max_num() - # --- 检查是否允许 FOCUS 模式 --- # if not global_config.chat.allow_focus_mode: - # Log less frequently to avoid spam - # if int(time.time()) % 60 == 0: - # logger.debug(f"{log_prefix_task} 配置不允许进入 FOCUSED 状态") - return - - if focused_limit <= 0: - # logger.debug(f"{log_prefix_task} 当前状态 ({current_mai_state.value}) 不允许 FOCUSED 子心流") return async with self._lock: @@ -795,12 +372,6 @@ class SubHeartflowManager: # --- 遍历评估每个符合条件的私聊 --- # for sub_hf in eligible_subflows: - # --- 再次检查 FOCUSED 上限,因为可能有多个同时激活 --- # - if current_focused_count >= focused_limit: - logger.debug( - f"{log_prefix_task} 已达专注上限 ({current_focused_count}/{focused_limit}),停止检查后续私聊。" - ) - break # 已满,无需再检查其他私聊 flow_id = sub_hf.subheartflow_id stream_name = chat_manager.get_stream_name(flow_id) or flow_id @@ -824,9 +395,6 @@ class SubHeartflowManager: # --- 如果活跃且未达上限,则尝试转换 --- # if is_active: - logger.info( - f"{log_prefix} 检测到活跃且未达专注上限 ({current_focused_count}/{focused_limit}),尝试转换为 FOCUSED。" - ) await sub_hf.change_chat_state(ChatState.FOCUSED) # 确认转换成功 if sub_hf.chat_state.chat_status == ChatState.FOCUSED: diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 88bf141a1..3b9a6f929 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -78,25 +78,6 @@ class ChatBot: group_info = message.message_info.group_info user_info = message.message_info.user_info - # 用户黑名单拦截 - # if userinfo.user_id in global_config.chat_target.ban_user_id: - # logger.debug(f"用户{userinfo.user_id}被禁止回复") - # return - - # if groupinfo is None: - # logger.trace("检测到私聊消息,检查") - # # 好友黑名单拦截 - # if userinfo.user_id not in global_config.experimental.talk_allowed_private: - # # logger.debug(f"用户{userinfo.user_id}没有私聊权限") - # return - - # 群聊黑名单拦截 - # print(groupinfo.group_id) - # print(global_config.chat_target.talk_allowed_groups) - # if groupinfo is not None and groupinfo.group_id not in global_config.chat_target.talk_allowed_groups: - # logger.debug(f"群{groupinfo.group_id}被禁止回复") - # return - # 确认从接口发来的message是否有自定义的prompt模板信息 if message.message_info.template_info and not message.message_info.template_info.template_default: template_group_name = message.message_info.template_info.template_name @@ -114,28 +95,6 @@ class ChatBot: # 如果在私聊中 if group_info is None: logger.trace("检测到私聊消息") - # 是否在配置信息中开启私聊模式 - # if global_config.experimental.enable_friend_chat: - # logger.trace("私聊模式已启用") - # # 是否进入PFC - # if global_config.enable_pfc_chatting: - # logger.trace("进入PFC私聊处理流程") - # userinfo = message.message_info.user_info - # messageinfo = message.message_info - # # 创建聊天流 - # logger.trace(f"为{userinfo.user_id}创建/获取聊天流") - # chat = await chat_manager.get_or_create_stream( - # platform=messageinfo.platform, - # user_info=userinfo, - # group_info=groupinfo, - # ) - # message.update_chat_stream(chat) - # await self.only_process_chat.process_message(message) - # await self._create_pfc_chat(message) - # # 禁止PFC,进入普通的心流消息处理逻辑 - # else: - # logger.trace("进入普通心流私聊处理") - # await self.heartflow_processor.process_message(message_data) if global_config.experimental.pfc_chatting: logger.trace("进入PFC私聊处理流程") # 创建聊天流 diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 96cc2b8cb..bd5322137 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -22,11 +22,11 @@ from src.chat.emoji_system.emoji_manager import emoji_manager from src.chat.normal_chat.willing.willing_manager import willing_manager from src.config.config import global_config -logger = get_logger("chat") +logger = get_logger("normal_chat") class NormalChat: - def __init__(self, chat_stream: ChatStream, interest_dict: dict = None): + def __init__(self, chat_stream: ChatStream, interest_dict: dict = {}): """初始化 NormalChat 实例。只进行同步操作。""" # Basic info from chat_stream (sync) @@ -200,7 +200,7 @@ class NormalChat: logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") break - # 获取待处理消息列表 + items_to_process = list(self.interest_dict.items()) if not items_to_process: continue @@ -481,7 +481,7 @@ class NormalChat: try: if exc := task.exception(): logger.error(f"[{self.stream_name}] 任务异常: {exc}") - logger.error(traceback.format_exc()) + traceback.print_exc() except asyncio.CancelledError: logger.debug(f"[{self.stream_name}] 任务已取消") except Exception as e: @@ -522,4 +522,4 @@ class NormalChat: logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。") except Exception as e: logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}") - logger.error(traceback.format_exc()) + traceback.print_exc() diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index a5b601c43..6d9ce0719 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -15,6 +15,8 @@ from ..models.utils_model import LLMRequest from .typo_generator import ChineseTypoGenerator from ...common.database.database import db from ...config.config import global_config +from ...common.database.database_model import Messages +from ...common.message_repository import find_messages, count_messages logger = get_module_logger("chat_utils") @@ -108,20 +110,12 @@ async def get_embedding(text, request_type="embedding"): def get_recent_group_detailed_plain_text(chat_stream_id: str, limit: int = 12, combine=False): - recent_messages = list( - db.messages.find( - {"chat_id": chat_stream_id}, - { - "time": 1, # 返回时间字段 - "chat_id": 1, - "chat_info": 1, - "user_info": 1, - "message_id": 1, # 返回消息ID字段 - "detailed_plain_text": 1, # 返回处理后的文本字段 - }, - ) - .sort("time", -1) - .limit(limit) + filter_query = {"chat_id": chat_stream_id} + sort_order = [("time", -1)] + recent_messages = find_messages( + message_filter=filter_query, + sort=sort_order, + limit=limit ) if not recent_messages: @@ -143,17 +137,14 @@ def get_recent_group_detailed_plain_text(chat_stream_id: str, limit: int = 12, c return message_detailed_plain_text_list -def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> list: +def get_recent_group_speaker(chat_stream_id: str, sender, limit: int = 12) -> list: # 获取当前群聊记录内发言的人 - recent_messages = list( - db.messages.find( - {"chat_id": chat_stream_id}, - { - "user_info": 1, - }, - ) - .sort("time", -1) - .limit(limit) + filter_query = {"chat_id": chat_stream_id} + sort_order = [("time", -1)] + recent_messages = find_messages( + message_filter=filter_query, + sort=sort_order, + limit=limit ) if not recent_messages: @@ -161,7 +152,12 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li who_chat_in_group = [] for msg_db_data in recent_messages: - user_info = UserInfo.from_dict(msg_db_data["user_info"]) + user_info = UserInfo.from_dict({ + "platform": msg_db_data["user_platform"], + "user_id": msg_db_data["user_id"], + "user_nickname": msg_db_data["user_nickname"], + "user_cardname": msg_db_data.get("user_cardname", "") + }) if ( (user_info.platform, user_info.user_id) != sender and user_info.user_id != global_config.bot.qq_account @@ -581,26 +577,23 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) - logger.error("stream_id 不能为空") return 0, 0 - # 直接查询时间范围内的消息 - # time > start_time AND time <= end_time - query = {"chat_id": stream_id, "time": {"$gt": start_time, "$lte": end_time}} + # 使用message_repository中的count_messages和find_messages函数 + + + # 构建查询条件 + filter_query = {"chat_id": stream_id, "time": {"$gt": start_time, "$lte": end_time}} try: - # 执行查询 - messages_cursor = db.messages.find(query) + # 先获取消息数量 + count = count_messages(filter_query) + + # 获取消息内容计算总长度 + messages = find_messages(message_filter=filter_query) + total_length = sum(len(msg.get("processed_plain_text", "")) for msg in messages) - # 遍历结果计算数量和长度 - for msg in messages_cursor: - count += 1 - total_length += len(msg.get("processed_plain_text", "")) - - # logger.debug(f"查询范围 ({start_time}, {end_time}] 内找到 {count} 条消息,总长度 {total_length}") return count, total_length - except PyMongoError as e: - logger.error(f"查询 stream_id={stream_id} 在 ({start_time}, {end_time}] 范围内的消息时出错: {e}") - return 0, 0 - except Exception as e: # 保留一个通用异常捕获以防万一 + except Exception as e: logger.error(f"计算消息数量时发生意外错误: {e}") return 0, 0 diff --git a/src/common/logger.py b/src/common/logger.py index adc15fe71..394d9de90 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -276,6 +276,40 @@ CHAT_STYLE_CONFIG = { }, } +# Topic日志样式配置 +NORMAL_CHAT_STYLE_CONFIG = { + "advanced": { + "console_format": ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "一般水群 | " + "{message}" + ), + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 一般水群 | {message}", + }, + "simple": { + "console_format": "{time:HH:mm:ss} | 一般水群 | {message}", # noqa: E501 + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 一般水群 | {message}", + }, +} + +# Topic日志样式配置 +FOCUS_CHAT_STYLE_CONFIG = { + "advanced": { + "console_format": ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "专注水群 | " + "{message}" + ), + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 专注水群 | {message}", + }, + "simple": { + "console_format": "{time:HH:mm:ss} | 专注水群 | {message}", # noqa: E501 + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 专注水群 | {message}", + }, +} + REMOTE_STYLE_CONFIG = { "advanced": { "console_format": ( @@ -915,6 +949,8 @@ API_SERVER_STYLE_CONFIG = API_SERVER_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT els INTEREST_CHAT_STYLE_CONFIG = ( INTEREST_CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INTEREST_CHAT_STYLE_CONFIG["advanced"] ) +NORMAL_CHAT_STYLE_CONFIG = NORMAL_CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else NORMAL_CHAT_STYLE_CONFIG["advanced"] +FOCUS_CHAT_STYLE_CONFIG = FOCUS_CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else FOCUS_CHAT_STYLE_CONFIG["advanced"] def is_registered_module(record: dict) -> bool: diff --git a/src/common/logger_manager.py b/src/common/logger_manager.py index 48d415bd9..523059313 100644 --- a/src/common/logger_manager.py +++ b/src/common/logger_manager.py @@ -21,6 +21,8 @@ from src.common.logger import ( WILLING_STYLE_CONFIG, PFC_ACTION_PLANNER_STYLE_CONFIG, MAI_STATE_CONFIG, + NORMAL_CHAT_STYLE_CONFIG, + FOCUS_CHAT_STYLE_CONFIG, LPMM_STYLE_CONFIG, HFC_STYLE_CONFIG, OBSERVATION_STYLE_CONFIG, @@ -94,6 +96,8 @@ MODULE_LOGGER_CONFIGS = { "init": INIT_STYLE_CONFIG, # 初始化 "interest_chat": INTEREST_CHAT_STYLE_CONFIG, # 兴趣 "api": API_SERVER_STYLE_CONFIG, # API服务器 + "normal_chat": NORMAL_CHAT_STYLE_CONFIG, # 一般水群 + "focus_chat": FOCUS_CHAT_STYLE_CONFIG, # 专注水群 # ...如有更多模块,继续添加... } diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index b66c3b180..943422029 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "2.1.0" +version = "2.2.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -55,8 +55,6 @@ qq="http://127.0.0.1:18002/api/message" allow_focus_mode = false # 是否允许专注聊天状态 # 是否启用heart_flowC(HFC)模式 # 启用后麦麦会自主选择进入heart_flowC模式(持续一段时间),进行主动的观察和回复,并给出回复,比较消耗token -base_normal_chat_num = 999 # 最多允许多少个群进行普通聊天 -base_focused_chat_num = 4 # 最多允许多少个群进行专注聊天 chat.observation_context_size = 15 # 观察到的最长上下文大小,建议15,太短太长都会导致脑袋尖尖 message_buffer = true # 启用消息缓冲器?启用此项以解决消息的拆分问题,但会使麦麦的回复延迟 @@ -226,14 +224,14 @@ provider = "SILICONFLOW" pri_in = 0 pri_out = 0 -[model.sub_heartflow] #心流:认真水群时,生成麦麦的内心想法,必须使用具有工具调用能力的模型 +[model.sub_heartflow] #心流:认真聊天时,生成麦麦的内心想法,必须使用具有工具调用能力的模型 name = "Pro/deepseek-ai/DeepSeek-V3" provider = "SILICONFLOW" pri_in = 2 pri_out = 8 temp = 0.3 #模型的温度,新V3建议0.1-0.3 -[model.plan] #决策:认真水群时,负责决定麦麦该做什么 +[model.plan] #决策:认真聊天时,负责决定麦麦该做什么 name = "Pro/deepseek-ai/DeepSeek-V3" provider = "SILICONFLOW" pri_in = 2