diff --git a/bot.py b/bot.py index d547c360e..770f5365b 100644 --- a/bot.py +++ b/bot.py @@ -119,7 +119,6 @@ async def graceful_shutdown(): for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) - except Exception as e: logger.error(f"麦麦关闭失败: {e}") diff --git a/requirements.txt b/requirements.txt index 06068d888..91ae096c1 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/scripts/interest_monitor_gui.py b/scripts/interest_monitor_gui.py index 1f03b9695..0c44507c8 100644 --- a/scripts/interest_monitor_gui.py +++ b/scripts/interest_monitor_gui.py @@ -28,8 +28,26 @@ matplotlib.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei"] matplotlib.rcParams["axes.unicode_minus"] = False # 解决负号'-'显示为方块的问题 +def get_random_color(): + """生成随机颜色用于区分线条""" + return "#{:06x}".format(random.randint(0, 0xFFFFFF)) + + +def format_timestamp(ts): + """辅助函数:格式化时间戳,处理 None 或无效值""" + if ts is None: + return "N/A" + try: + # 假设 ts 是 float 类型的时间戳 + dt_object = datetime.fromtimestamp(float(ts)) + return dt_object.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + return "Invalid Time" + + class InterestMonitorApp: def __init__(self, root): + self._main_mind_loaded = None self.root = root self.root.title(WINDOW_TITLE) self.root.geometry("1800x800") # 调整窗口大小以适应图表 @@ -173,10 +191,6 @@ class InterestMonitorApp: """当 Combobox 选择改变时调用,更新单个流的图表""" self.update_single_stream_plot() - def get_random_color(self): - """生成随机颜色用于区分线条""" - return "#{:06x}".format(random.randint(0, 0xFFFFFF)) - def load_main_mind_history(self): """只读取包含main_mind的日志行,维护历史想法队列""" if not os.path.exists(LOG_FILE_PATH): @@ -332,7 +346,7 @@ class InterestMonitorApp: new_probability_history[stream_id] = deque(maxlen=MAX_HISTORY_POINTS) # 创建概率 deque # 检查是否已有颜色,没有则分配 if stream_id not in self.stream_colors: - self.stream_colors[stream_id] = self.get_random_color() + self.stream_colors[stream_id] = get_random_color() # *** 存储此 stream_id 最新的显示名称 *** new_stream_display_names[stream_id] = group_name @@ -593,17 +607,6 @@ class InterestMonitorApp: # --- 新增:重新绘制画布 --- self.canvas_single.draw() - def format_timestamp(self, ts): - """辅助函数:格式化时间戳,处理 None 或无效值""" - if ts is None: - return "N/A" - try: - # 假设 ts 是 float 类型的时间戳 - dt_object = datetime.fromtimestamp(float(ts)) - return dt_object.strftime("%Y-%m-%d %H:%M:%S") - except (ValueError, TypeError): - return "Invalid Time" - def update_single_stream_details(self, stream_id): """更新单个流详情区域的标签内容""" if stream_id: @@ -616,8 +619,8 @@ class InterestMonitorApp: self.single_stream_sub_mind.set(f"想法: {sub_mind}") self.single_stream_chat_state.set(f"状态: {chat_state}") self.single_stream_threshold.set(f"阈值以上: {'是' if threshold else '否'}") - self.single_stream_last_active.set(f"最后活跃: {self.format_timestamp(last_active_ts)}") - self.single_stream_last_interaction.set(f"最后交互: {self.format_timestamp(last_interaction_ts)}") + self.single_stream_last_active.set(f"最后活跃: {format_timestamp(last_active_ts)}") + self.single_stream_last_interaction.set(f"最后交互: {format_timestamp(last_interaction_ts)}") else: # 如果没有选择流,则清空详情 self.single_stream_sub_mind.set("想法: N/A") diff --git a/src/common/logger.py b/src/common/logger.py index 6c95935ea..a0d621d9b 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -321,7 +321,7 @@ CHAT_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 见闻 | {message}", }, "simple": { - "console_format": ("{time:MM-DD HH:mm} | 见闻 | {message}"), # noqa: E501 + "console_format": "{time:MM-DD HH:mm} | 见闻 | {message}", # noqa: E501 "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 见闻 | {message}", }, } @@ -353,7 +353,7 @@ SUB_HEARTFLOW_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦小脑袋 | {message}", }, "simple": { - "console_format": ("{time:MM-DD HH:mm} | 麦麦水群 | {message}"), # noqa: E501 + "console_format": "{time:MM-DD HH:mm} | 麦麦水群 | {message}", # noqa: E501 "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦水群 | {message}", }, } @@ -369,7 +369,7 @@ SUB_HEARTFLOW_MIND_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦小脑袋 | {message}", }, "simple": { - "console_format": ("{time:MM-DD HH:mm} | 麦麦小脑袋 | {message}"), # noqa: E501 + "console_format": "{time:MM-DD HH:mm} | 麦麦小脑袋 | {message}", # noqa: E501 "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦小脑袋 | {message}", }, } @@ -385,7 +385,7 @@ SUBHEARTFLOW_MANAGER_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦水群[管理] | {message}", }, "simple": { - "console_format": ("{time:MM-DD HH:mm} | 麦麦水群[管理] | {message}"), # noqa: E501 + "console_format": "{time:MM-DD HH:mm} | 麦麦水群[管理] | {message}", # noqa: E501 "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦水群[管理] | {message}", }, } @@ -633,7 +633,7 @@ HFC_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 专注聊天 | {message}", }, "simple": { - "console_format": ("{time:MM-DD HH:mm} | 专注聊天 | {message}"), + "console_format": "{time:MM-DD HH:mm} | 专注聊天 | {message}", "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 专注聊天 | {message}", }, } @@ -1031,7 +1031,7 @@ def add_custom_style_handler( # retention=current_config["retention"], # compression=current_config["compression"], # encoding="utf-8", - # filter=lambda record: record["extra"].get("module") == module_name + # message_filter=lambda record: record["extra"].get("module") == module_name # and record["extra"].get("custom_style") == style_name, # enqueue=True, # ) diff --git a/src/common/message_repository.py b/src/common/message_repository.py index fc7b7e542..72643f913 100644 --- a/src/common/message_repository.py +++ b/src/common/message_repository.py @@ -7,13 +7,16 @@ logger = get_module_logger(__name__) def find_messages( - filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] = None, limit: int = 0, limit_mode: str = "latest" + message_filter: Dict[str, Any], + sort: Optional[List[tuple[str, int]]] = None, + limit: int = 0, + limit_mode: str = "latest", ) -> List[Dict[str, Any]]: """ 根据提供的过滤器、排序和限制条件查找消息。 Args: - filter: MongoDB 查询过滤器。 + message_filter: MongoDB 查询过滤器。 sort: MongoDB 排序条件列表,例如 [('time', 1)]。仅在 limit 为 0 时生效。 limit: 返回的最大文档数,0表示不限制。 limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录(结果仍按时间正序排列)。默认为 'latest'。 @@ -22,7 +25,7 @@ def find_messages( 消息文档列表,如果出错则返回空列表。 """ try: - query = db.messages.find(filter) + query = db.messages.find(message_filter) results: List[Dict[str, Any]] = [] if limit > 0: @@ -46,28 +49,28 @@ def find_messages( return results except Exception as e: log_message = ( - f"查找消息失败 (filter={filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n" + f"查找消息失败 (filter={message_filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n" + traceback.format_exc() ) logger.error(log_message) return [] -def count_messages(filter: Dict[str, Any]) -> int: +def count_messages(message_filter: Dict[str, Any]) -> int: """ 根据提供的过滤器计算消息数量。 Args: - filter: MongoDB 查询过滤器。 + message_filter: MongoDB 查询过滤器。 Returns: 符合条件的消息数量,如果出错则返回 0。 """ try: - count = db.messages.count_documents(filter) + count = db.messages.count_documents(message_filter) return count except Exception as e: - log_message = f"计数消息失败 (filter={filter}): {e}\n" + traceback.format_exc() + log_message = f"计数消息失败 (message_filter={message_filter}): {e}\n" + traceback.format_exc() logger.error(log_message) return 0 diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py index 56fee2a91..383606530 100644 --- a/src/heart_flow/background_tasks.py +++ b/src/heart_flow/background_tasks.py @@ -25,6 +25,33 @@ STATE_UPDATE_INTERVAL_SECONDS = 60 LOG_INTERVAL_SECONDS = 3 +async def _run_periodic_loop( + task_name: str, interval: int, task_func: Callable[..., Coroutine[Any, Any, None]], **kwargs +): + """周期性任务主循环""" + while True: + start_time = asyncio.get_event_loop().time() + # logger.debug(f"开始执行后台任务: {task_name}") + + try: + await task_func(**kwargs) # 执行实际任务 + except asyncio.CancelledError: + logger.info(f"任务 {task_name} 已取消") + break + except Exception as e: + logger.error(f"任务 {task_name} 执行出错: {e}") + logger.error(traceback.format_exc()) + + # 计算并执行间隔等待 + elapsed = asyncio.get_event_loop().time() - start_time + sleep_time = max(0, interval - elapsed) + # if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点 + # logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)") + await asyncio.sleep(sleep_time) + + logger.debug(f"任务循环结束: {task_name}") # 调整日志信息 + + class BackgroundTaskManager: """管理 Heartflow 的后台周期性任务。""" @@ -143,32 +170,6 @@ class BackgroundTaskManager: # 第三步:清空任务列表 self._tasks = [] # 重置任务列表 - async def _run_periodic_loop( - self, task_name: str, interval: int, task_func: Callable[..., Coroutine[Any, Any, None]], **kwargs - ): - """周期性任务主循环""" - while True: - start_time = asyncio.get_event_loop().time() - # logger.debug(f"开始执行后台任务: {task_name}") - - try: - await task_func(**kwargs) # 执行实际任务 - except asyncio.CancelledError: - logger.info(f"任务 {task_name} 已取消") - break - except Exception as e: - logger.error(f"任务 {task_name} 执行出错: {e}") - logger.error(traceback.format_exc()) - - # 计算并执行间隔等待 - elapsed = asyncio.get_event_loop().time() - start_time - sleep_time = max(0, interval - elapsed) - # if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点 - # logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)") - await asyncio.sleep(sleep_time) - - logger.debug(f"任务循环结束: {task_name}") # 调整日志信息 - async def _perform_state_update_work(self): """执行状态更新工作""" previous_status = self.mai_state_info.get_current_state() @@ -249,33 +250,33 @@ class BackgroundTaskManager: # --- Specific Task Runners --- # async def _run_state_update_cycle(self, interval: int): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="State Update", interval=interval, task_func=self._perform_state_update_work ) async def _run_absent_into_chat(self, interval: int): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="Into Chat", interval=interval, task_func=self._perform_absent_into_chat ) async def _run_normal_chat_timeout_check_cycle(self, interval: int): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="Normal Chat Timeout Check", interval=interval, task_func=self._normal_chat_timeout_check_work ) async def _run_cleanup_cycle(self): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="Subflow Cleanup", interval=CLEANUP_INTERVAL_SECONDS, task_func=self._perform_cleanup_work ) async def _run_logging_cycle(self): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="State Logging", interval=LOG_INTERVAL_SECONDS, task_func=self._perform_logging_work ) # --- 新增兴趣评估任务运行器 --- async def _run_into_focus_cycle(self): - await self._run_periodic_loop( + await _run_periodic_loop( task_name="Into Focus", interval=INTEREST_EVAL_INTERVAL_SECONDS, task_func=self._perform_into_focus_work, diff --git a/src/heart_flow/interest_logger.py b/src/heart_flow/interest_logger.py index 04cdb6f43..06d3f1cb0 100644 --- a/src/heart_flow/interest_logger.py +++ b/src/heart_flow/interest_logger.py @@ -23,6 +23,12 @@ LOG_DIRECTORY = "logs/interest" HISTORY_LOG_FILENAME = "interest_history.log" +def _ensure_log_directory(): + """确保日志目录存在。""" + os.makedirs(LOG_DIRECTORY, exist_ok=True) + logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") + + class InterestLogger: """负责定期记录主心流和所有子心流的状态到日志文件。""" @@ -37,12 +43,7 @@ class InterestLogger: self.subheartflow_manager = subheartflow_manager self.heartflow = heartflow # 存储 Heartflow 实例 self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) - self._ensure_log_directory() - - def _ensure_log_directory(self): - """确保日志目录存在。""" - os.makedirs(LOG_DIRECTORY, exist_ok=True) - logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") + _ensure_log_directory() async def get_all_subflow_states(self) -> Dict[str, Dict]: """并发获取所有活跃子心流的当前完整状态。""" diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py index fbf1be870..f1716d249 100644 --- a/src/heart_flow/sub_mind.py +++ b/src/heart_flow/sub_mind.py @@ -86,6 +86,7 @@ def calculate_replacement_probability(similarity: float) -> float: class SubMind: def __init__(self, subheartflow_id: str, chat_state: ChatStateInfo, observations: Observation): + self.last_active_time = None self.subheartflow_id = subheartflow_id self.llm_model = LLMRequest( diff --git a/src/plugins/PFC/chat_observer.py b/src/plugins/PFC/chat_observer.py index 102c95028..2822e1111 100644 --- a/src/plugins/PFC/chat_observer.py +++ b/src/plugins/PFC/chat_observer.py @@ -37,6 +37,10 @@ class ChatObserver: Args: stream_id: 聊天流ID """ + self.last_check_time = None + self.last_check_time = None + self.last_bot_speak_time = None + self.last_user_speak_time = None if stream_id in self._instances: raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.") diff --git a/src/plugins/PFC/message_storage.py b/src/plugins/PFC/message_storage.py index b57f5d2b5..cd6a01e34 100644 --- a/src/plugins/PFC/message_storage.py +++ b/src/plugins/PFC/message_storage.py @@ -51,11 +51,9 @@ class MongoDBMessageStorage(MessageStorage): """MongoDB消息存储实现""" async def get_messages_after(self, chat_id: str, message_time: float) -> List[Dict[str, Any]]: - query = {"chat_id": chat_id} + query = {"chat_id": chat_id, "time": {"$gt": message_time}} # print(f"storage_check_message: {message_time}") - query["time"] = {"$gt": message_time} - return list(db.messages.find(query).sort("time", 1)) async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]: diff --git a/src/plugins/PFC/observation_info.py b/src/plugins/PFC/observation_info.py index 35f393014..af7f537bc 100644 --- a/src/plugins/PFC/observation_info.py +++ b/src/plugins/PFC/observation_info.py @@ -158,6 +158,10 @@ class ObservationInfo: # meta_plan_trigger: bool = False # --- 修改:移除 __post_init__ 的参数 --- + def __init__(self): + self.chat_observer = None + self.chat_observer = None + def __post_init__(self): """初始化后创建handler并进行必要的设置""" self.chat_observer: Optional[ChatObserver] = None # 添加类型提示 diff --git a/src/plugins/PFC/pfc.py b/src/plugins/PFC/pfc.py index d6f4c5192..e12cb2429 100644 --- a/src/plugins/PFC/pfc.py +++ b/src/plugins/PFC/pfc.py @@ -147,14 +147,14 @@ class GoalAnalyzer: # 返回第一个目标作为当前主要目标(如果有) if result: first_goal = result[0] - return (first_goal.get("goal", ""), "", first_goal.get("reasoning", "")) + return first_goal.get("goal", ""), "", first_goal.get("reasoning", "") else: # 单个目标的情况 conversation_info.goal_list.append(result) - return (goal, "", reasoning) + return goal, "", reasoning # 如果解析失败,返回默认值 - return ("", "", "") + return "", "", "" async def _update_goals(self, new_goal: str, method: str, reasoning: str): """更新目标列表 diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 8051d0a8e..7ca1483cc 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -1,3 +1,5 @@ +from typing import Dict, Any + from ..moods.moods import MoodManager # 导入情绪管理器 from ...config.config import global_config from .message import MessageRecv @@ -46,7 +48,7 @@ class ChatBot: except Exception as e: logger.error(f"创建PFC聊天失败: {e}") - async def message_process(self, message_data: str) -> None: + async def message_process(self, message_data: Dict[str, Any]) -> None: """处理转化后的统一格式消息 这个函数本质是预处理一些数据,根据配置信息和消息内容,预处理消息,并分发到合适的消息处理器中 heart_flow模式:使用思维流系统进行回复 diff --git a/src/plugins/chat/message_sender.py b/src/plugins/chat/message_sender.py index 493397bbd..30d943d97 100644 --- a/src/plugins/chat/message_sender.py +++ b/src/plugins/chat/message_sender.py @@ -17,6 +17,40 @@ from src.common.logger_manager import get_logger logger = get_logger("sender") +async def send_via_ws(message: MessageSending) -> None: + """通过 WebSocket 发送消息""" + try: + await send_message(message) + except Exception as e: + logger.error(f"WS发送失败: {e}") + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + + +async def send_message( + message: MessageSending, +) -> None: + """发送消息(核心发送逻辑)""" + + # --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) --- + typing_time = calculate_typing_time( + input_string=message.processed_plain_text, + thinking_start_time=message.thinking_start_time, + is_emoji=message.is_emoji, + ) + # logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志 + await asyncio.sleep(typing_time) + # logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志 + # --- 结束打字延迟 --- + + message_preview = truncate_message(message.processed_plain_text) + + try: + await send_via_ws(message) + logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式 + except Exception as e: + logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") + + class MessageSender: """发送器 (不再是单例)""" @@ -29,39 +63,6 @@ class MessageSender: """设置当前bot实例""" pass - async def send_via_ws(self, message: MessageSending) -> None: - """通过 WebSocket 发送消息""" - try: - await global_api.send_message(message) - except Exception as e: - logger.error(f"WS发送失败: {e}") - raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e - - async def send_message( - self, - message: MessageSending, - ) -> None: - """发送消息(核心发送逻辑)""" - - # --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) --- - typing_time = calculate_typing_time( - input_string=message.processed_plain_text, - thinking_start_time=message.thinking_start_time, - is_emoji=message.is_emoji, - ) - # logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志 - await asyncio.sleep(typing_time) - # logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志 - # --- 结束打字延迟 --- - - message_preview = truncate_message(message.processed_plain_text) - - try: - await self.send_via_ws(message) - logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式 - except Exception as e: - logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") - class MessageContainer: """单个聊天流的发送/思考消息容器""" @@ -119,7 +120,7 @@ class MessageContainer: """移除指定的消息对象,如果消息存在则返回True,否则返回False""" try: _initial_len = len(self.messages) - # 使用列表推导式或 filter 创建新列表,排除要删除的元素 + # 使用列表推导式或 message_filter 创建新列表,排除要删除的元素 # self.messages = [msg for msg in self.messages if msg is not message_to_remove] # 或者直接 remove (如果确定对象唯一性) if message_to_remove in self.messages: @@ -146,6 +147,7 @@ class MessageManager: """管理所有聊天流的消息容器 (不再是单例)""" def __init__(self): + self._processor_task = None self.containers: Dict[str, MessageContainer] = {} self.storage = MessageStorage() # 添加 storage 实例 self._running = True # 处理器运行状态 @@ -226,7 +228,7 @@ class MessageManager: await message.process() # 预处理消息内容 # 使用全局 message_sender 实例 - await message_sender.send_message(message) + await send_message(message) await self.storage.store_message(message, message.chat_stream) # 移除消息要在发送 *之后* diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index 71980f484..16581f3aa 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -263,7 +263,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: if char in separators: # 检查分割条件:如果分隔符左右都是英文字母,则不分割 can_split = True - if i > 0 and i < len(text) - 1: + if 0 < i < len(text) - 1: prev_char = text[i - 1] next_char = text[i + 1] # if is_english_letter(prev_char) and is_english_letter(next_char) and char == ' ': # 原计划只对空格应用此规则,现应用于所有分隔符 diff --git a/src/plugins/emoji_system/emoji_manager.py b/src/plugins/emoji_system/emoji_manager.py index d6da4ce3f..a75a4f904 100644 --- a/src/plugins/emoji_system/emoji_manager.py +++ b/src/plugins/emoji_system/emoji_manager.py @@ -16,7 +16,6 @@ from ..chat.utils_image import image_path_to_base64, image_manager from ..models.utils_model import LLMRequest from src.common.logger_manager import get_logger - logger = get_logger("emoji") BASE_DIR = os.path.join("data") @@ -24,7 +23,6 @@ EMOJI_DIR = os.path.join(BASE_DIR, "emoji") # 表情包存储目录 EMOJI_REGISTED_DIR = os.path.join(BASE_DIR, "emoji_registed") # 已注册的表情包注册目录 MAX_EMOJI_FOR_PROMPT = 20 # 最大允许的表情包描述数量于图片替换的 prompt 中 - """ 还没经过测试,有些地方数据库和内存数据同步可能不完全 @@ -225,6 +223,140 @@ class MaiEmoji: return False +def _emoji_objects_to_readable_list(emoji_objects): + """将表情包对象列表转换为可读的字符串列表 + + 参数: + emoji_objects: MaiEmoji对象列表 + + 返回: + list[str]: 可读的表情包信息字符串列表 + """ + emoji_info_list = [] + for i, emoji in enumerate(emoji_objects): + # 转换时间戳为可读时间 + time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time)) + # 构建每个表情包的信息字符串 + emoji_info = f"编号: {i + 1}\n描述: {emoji.description}\n使用次数: {emoji.usage_count}\n添加时间: {time_str}\n" + emoji_info_list.append(emoji_info) + return emoji_info_list + + +def _to_emoji_objects(data): + emoji_objects = [] + load_errors = 0 + emoji_data_list = list(data) + + for emoji_data in emoji_data_list: + full_path = emoji_data.get("full_path") + if not full_path: + logger.warning(f"[加载错误] 数据库记录缺少 'full_path' 字段: {emoji_data.get('_id')}") + load_errors += 1 + continue # 跳过缺少 full_path 的记录 + + try: + # 使用 full_path 初始化 MaiEmoji 对象 + emoji = MaiEmoji(full_path=full_path) + + # 设置从数据库加载的属性 + emoji.hash = emoji_data.get("hash", "") + # 如果 hash 为空,也跳过?取决于业务逻辑 + if not emoji.hash: + logger.warning(f"[加载错误] 数据库记录缺少 'hash' 字段: {full_path}") + load_errors += 1 + continue + + emoji.description = emoji_data.get("description", "") + emoji.emotion = emoji_data.get("emotion", []) + emoji.usage_count = emoji_data.get("usage_count", 0) + # 优先使用 last_used_time,否则用 timestamp,最后用当前时间 + last_used = emoji_data.get("last_used_time") + timestamp = emoji_data.get("timestamp") + emoji.last_used_time = ( + last_used if last_used is not None else (timestamp if timestamp is not None else time.time()) + ) + emoji.register_time = timestamp if timestamp is not None else time.time() + emoji.format = emoji_data.get("format", "") # 加载格式 + + # 不需要再手动设置 path 和 filename,__init__ 会自动处理 + + emoji_objects.append(emoji) + + except ValueError as ve: # 捕获 __init__ 可能的错误 + logger.error(f"[加载错误] 初始化 MaiEmoji 失败 ({full_path}): {ve}") + load_errors += 1 + except Exception as e: + logger.error(f"[加载错误] 处理数据库记录时出错 ({full_path}): {str(e)}") + load_errors += 1 + return emoji_objects, load_errors + return emoji_objects, load_errors + + +def _ensure_emoji_dir(): + """确保表情存储目录存在""" + os.makedirs(EMOJI_DIR, exist_ok=True) + os.makedirs(EMOJI_REGISTED_DIR, exist_ok=True) + + +async def clear_temp_emoji(): + """清理临时表情包 + 清理/data/emoji和/data/image目录下的所有文件 + 当目录中文件数超过100时,会全部删除 + """ + + logger.info("[清理] 开始清理缓存...") + + for need_clear in (os.path.join(BASE_DIR, "emoji"), os.path.join(BASE_DIR, "image")): + if os.path.exists(need_clear): + files = os.listdir(need_clear) + # 如果文件数超过50就全部删除 + if len(files) > 100: + for filename in files: + file_path = os.path.join(need_clear, filename) + if os.path.isfile(file_path): + os.remove(file_path) + logger.debug(f"[清理] 删除: {filename}") + + logger.success("[清理] 完成") + + +async def clean_unused_emojis(emoji_dir, emoji_objects): + """清理指定目录中未被 emoji_objects 追踪的表情包文件""" + if not os.path.exists(emoji_dir): + logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") + return + + try: + # 获取内存中所有有效表情包的完整路径集合 + tracked_full_paths = {emoji.full_path for emoji in emoji_objects if not emoji.is_deleted} + cleaned_count = 0 + + # 遍历指定目录中的所有文件 + for file_name in os.listdir(emoji_dir): + file_full_path = os.path.join(emoji_dir, file_name) + + # 确保处理的是文件而不是子目录 + if not os.path.isfile(file_full_path): + continue + + # 如果文件不在被追踪的集合中,则删除 + if file_full_path not in tracked_full_paths: + try: + os.remove(file_full_path) + logger.info(f"[清理] 删除未追踪的表情包文件: {file_full_path}") + cleaned_count += 1 + except Exception as e: + logger.error(f"[错误] 删除文件时出错 ({file_full_path}): {str(e)}") + + if cleaned_count > 0: + logger.success(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。") + else: + logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。") + + except Exception as e: + logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {str(e)}") + + class EmojiManager: _instance = None @@ -235,6 +367,7 @@ class EmojiManager: return cls._instance def __init__(self): + self._initialized = None self._scan_task = None self.vlm = LLMRequest(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji") self.llm_emotion_judge = LLMRequest( @@ -248,23 +381,18 @@ class EmojiManager: logger.info("启动表情包管理器") - def _ensure_emoji_dir(self): - """确保表情存储目录存在""" - os.makedirs(EMOJI_DIR, exist_ok=True) - os.makedirs(EMOJI_REGISTED_DIR, exist_ok=True) - def initialize(self): """初始化数据库连接和表情目录""" if not self._initialized: try: self._ensure_emoji_collection() - self._ensure_emoji_dir() + _ensure_emoji_dir() self._initialized = True # 更新表情包数量 # 启动时执行一次完整性检查 # await self.check_emoji_file_integrity() - except Exception: - logger.exception("初始化表情管理器失败") + except Exception as e: + logger.exception(f"初始化表情管理器失败: {e}") def _ensure_db(self): """确保数据库已初始化""" @@ -291,12 +419,12 @@ class EmojiManager: db.emoji.create_index([("embedding", "2dsphere")]) db.emoji.create_index([("filename", 1)], unique=True) - def record_usage(self, hash: str): + def record_usage(self, emoji_hash: str): """记录表情使用次数""" try: - db.emoji.update_one({"hash": hash}, {"$inc": {"usage_count": 1}}) + db.emoji.update_one({"hash": emoji_hash}, {"$inc": {"usage_count": 1}}) for emoji in self.emoji_objects: - if emoji.hash == hash: + if emoji.hash == emoji_hash: emoji.usage_count += 1 break @@ -458,7 +586,7 @@ class EmojiManager: self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove] # 清理 EMOJI_REGISTED_DIR 目录中未被追踪的文件 - await self.clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects) + await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects) # 输出清理结果 if removed_count > 0: @@ -477,7 +605,7 @@ class EmojiManager: while True: logger.info("[扫描] 开始检查表情包完整性...") await self.check_emoji_file_integrity() - await self.clear_temp_emoji() + await clear_temp_emoji() logger.info("[扫描] 开始扫描新表情包...") # 检查表情包目录是否存在 @@ -531,51 +659,7 @@ class EmojiManager: self._ensure_db() logger.info("[数据库] 开始加载所有表情包记录...") - all_emoji_data = list(db.emoji.find()) - emoji_objects = [] - load_errors = 0 - - for emoji_data in all_emoji_data: - full_path = emoji_data.get("full_path") - if not full_path: - logger.warning(f"[加载错误] 数据库记录缺少 'full_path' 字段: {emoji_data.get('_id')}") - load_errors += 1 - continue # 跳过缺少 full_path 的记录 - - try: - # 使用 full_path 初始化 MaiEmoji 对象 - emoji = MaiEmoji(full_path=full_path) - - # 设置从数据库加载的属性 - emoji.hash = emoji_data.get("hash", "") - # 如果 hash 为空,也跳过?取决于业务逻辑 - if not emoji.hash: - logger.warning(f"[加载错误] 数据库记录缺少 'hash' 字段: {full_path}") - load_errors += 1 - continue - - emoji.description = emoji_data.get("description", "") - emoji.emotion = emoji_data.get("emotion", []) - emoji.usage_count = emoji_data.get("usage_count", 0) - # 优先使用 last_used_time,否则用 timestamp,最后用当前时间 - last_used = emoji_data.get("last_used_time") - timestamp = emoji_data.get("timestamp") - emoji.last_used_time = ( - last_used if last_used is not None else (timestamp if timestamp is not None else time.time()) - ) - emoji.register_time = timestamp if timestamp is not None else time.time() - emoji.format = emoji_data.get("format", "") # 加载格式 - - # 不需要再手动设置 path 和 filename,__init__ 会自动处理 - - emoji_objects.append(emoji) - - except ValueError as ve: # 捕获 __init__ 可能的错误 - logger.error(f"[加载错误] 初始化 MaiEmoji 失败 ({full_path}): {ve}") - load_errors += 1 - except Exception as e: - logger.error(f"[加载错误] 处理数据库记录时出错 ({full_path}): {str(e)}") - load_errors += 1 + emoji_objects, load_errors = _to_emoji_objects(db.emoji.find()) # 更新内存中的列表和数量 self.emoji_objects = emoji_objects @@ -590,11 +674,11 @@ class EmojiManager: self.emoji_objects = [] # 加载失败则清空列表 self.emoji_num = 0 - async def get_emoji_from_db(self, hash=None): + async def get_emoji_from_db(self, emoji_hash=None): """获取指定哈希值的表情包并初始化为MaiEmoji类对象列表 (主要用于调试或特定查找) 参数: - hash: 可选,如果提供则只返回指定哈希值的表情包 + emoji_hash: 可选,如果提供则只返回指定哈希值的表情包 返回: list[MaiEmoji]: 表情包对象列表 @@ -603,49 +687,14 @@ class EmojiManager: self._ensure_db() query = {} - if hash: - query = {"hash": hash} + if emoji_hash: + query = {"hash": emoji_hash} else: logger.warning( "[查询] 未提供 hash,将尝试加载所有表情包,建议使用 get_all_emoji_from_db 更新管理器状态。" ) - emoji_data_list = list(db.emoji.find(query)) - emoji_objects = [] - load_errors = 0 - - for emoji_data in emoji_data_list: - full_path = emoji_data.get("full_path") - if not full_path: - logger.warning(f"[加载错误] 数据库记录缺少 'full_path' 字段: {emoji_data.get('_id')}") - load_errors += 1 - continue - - try: - emoji = MaiEmoji(full_path=full_path) - emoji.hash = emoji_data.get("hash", "") - if not emoji.hash: - logger.warning(f"[加载错误] 数据库记录缺少 'hash' 字段: {full_path}") - load_errors += 1 - continue - - emoji.description = emoji_data.get("description", "") - emoji.emotion = emoji_data.get("emotion", []) - emoji.usage_count = emoji_data.get("usage_count", 0) - last_used = emoji_data.get("last_used_time") - timestamp = emoji_data.get("timestamp") - emoji.last_used_time = ( - last_used if last_used is not None else (timestamp if timestamp is not None else time.time()) - ) - emoji.register_time = timestamp if timestamp is not None else time.time() - emoji.format = emoji_data.get("format", "") - emoji_objects.append(emoji) - except ValueError as ve: - logger.error(f"[加载错误] 初始化 MaiEmoji 失败 ({full_path}): {ve}") - load_errors += 1 - except Exception as e: - logger.error(f"[加载错误] 处理数据库记录时出错 ({full_path}): {str(e)}") - load_errors += 1 + emoji_objects, load_errors = _to_emoji_objects(db.emoji.find(query)) if load_errors > 0: logger.warning(f"[查询] 加载过程中出现 {load_errors} 个错误。") @@ -656,17 +705,17 @@ class EmojiManager: logger.error(f"[错误] 从数据库获取表情包对象失败: {str(e)}") return [] - async def get_emoji_from_manager(self, hash) -> Optional[MaiEmoji]: + async def get_emoji_from_manager(self, emoji_hash) -> Optional[MaiEmoji]: """从内存中的 emoji_objects 列表获取表情包 参数: - hash: 要查找的表情包哈希值 + emoji_hash: 要查找的表情包哈希值 返回: MaiEmoji 或 None: 如果找到则返回 MaiEmoji 对象,否则返回 None """ for emoji in self.emoji_objects: # 确保对象未被标记为删除且哈希值匹配 - if not emoji.is_deleted and emoji.hash == hash: + if not emoji.is_deleted and emoji.hash == emoji_hash: return emoji return None # 如果循环结束还没找到,则返回 None @@ -709,26 +758,6 @@ class EmojiManager: logger.error(traceback.format_exc()) return False - def _emoji_objects_to_readable_list(self, emoji_objects): - """将表情包对象列表转换为可读的字符串列表 - - 参数: - emoji_objects: MaiEmoji对象列表 - - 返回: - list[str]: 可读的表情包信息字符串列表 - """ - emoji_info_list = [] - for i, emoji in enumerate(emoji_objects): - # 转换时间戳为可读时间 - time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time)) - # 构建每个表情包的信息字符串 - emoji_info = ( - f"编号: {i + 1}\n描述: {emoji.description}\n使用次数: {emoji.usage_count}\n添加时间: {time_str}\n" - ) - emoji_info_list.append(emoji_info) - return emoji_info_list - async def replace_a_emoji(self, new_emoji: MaiEmoji): """替换一个表情包 @@ -755,7 +784,7 @@ class EmojiManager: ) # 将表情包信息转换为可读的字符串 - emoji_info_list = self._emoji_objects_to_readable_list(selected_emojis) + emoji_info_list = _emoji_objects_to_readable_list(selected_emojis) # 构建提示词 prompt = ( @@ -853,7 +882,7 @@ class EmojiManager: ''' content, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format) if content == "否": - return None, [] + return "", [] # 分析情感含义 emotion_prompt = f""" @@ -989,76 +1018,6 @@ class EmojiManager: logger.error(f"[错误] 删除异常处理文件时出错: {remove_error}") return False - async def clear_temp_emoji(self): - """清理临时表情包 - 清理/data/emoji和/data/image目录下的所有文件 - 当目录中文件数超过100时,会全部删除 - """ - - logger.info("[清理] 开始清理缓存...") - - # 清理emoji目录 - emoji_dir = os.path.join(BASE_DIR, "emoji") - if os.path.exists(emoji_dir): - files = os.listdir(emoji_dir) - # 如果文件数超过50就全部删除 - if len(files) > 100: - for filename in files: - file_path = os.path.join(emoji_dir, filename) - if os.path.isfile(file_path): - os.remove(file_path) - logger.debug(f"[清理] 删除: {filename}") - - # 清理image目录 - image_dir = os.path.join(BASE_DIR, "image") - if os.path.exists(image_dir): - files = os.listdir(image_dir) - # 如果文件数超过50就全部删除 - if len(files) > 100: - for filename in files: - file_path = os.path.join(image_dir, filename) - if os.path.isfile(file_path): - os.remove(file_path) - logger.debug(f"[清理] 删除图片: {filename}") - - logger.success("[清理] 完成") - - async def clean_unused_emojis(self, emoji_dir, emoji_objects): - """清理指定目录中未被 emoji_objects 追踪的表情包文件""" - if not os.path.exists(emoji_dir): - logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") - return - - try: - # 获取内存中所有有效表情包的完整路径集合 - tracked_full_paths = {emoji.full_path for emoji in emoji_objects if not emoji.is_deleted} - cleaned_count = 0 - - # 遍历指定目录中的所有文件 - for file_name in os.listdir(emoji_dir): - file_full_path = os.path.join(emoji_dir, file_name) - - # 确保处理的是文件而不是子目录 - if not os.path.isfile(file_full_path): - continue - - # 如果文件不在被追踪的集合中,则删除 - if file_full_path not in tracked_full_paths: - try: - os.remove(file_full_path) - logger.info(f"[清理] 删除未追踪的表情包文件: {file_full_path}") - cleaned_count += 1 - except Exception as e: - logger.error(f"[错误] 删除文件时出错 ({file_full_path}): {str(e)}") - - if cleaned_count > 0: - logger.success(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。") - else: - logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。") - - except Exception as e: - logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {str(e)}") - # 创建全局单例 emoji_manager = EmojiManager() diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 73d679e4e..47d420dd7 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -144,6 +144,25 @@ class SenderError(HeartFCError): pass +async def _handle_cycle_delay(action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str): + """处理循环延迟""" + cycle_duration = time.monotonic() - cycle_start_time + + try: + sleep_duration = 0.0 + if not action_taken_this_cycle and cycle_duration < 1: + sleep_duration = 1 - cycle_duration + elif cycle_duration < 0.2: + sleep_duration = 0.2 + + if sleep_duration > 0: + await asyncio.sleep(sleep_duration) + + except asyncio.CancelledError: + logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.") + raise + + class HeartFChatting: """ 管理一个连续的Plan-Replier-Sender循环 @@ -327,7 +346,7 @@ class HeartFChatting: self._current_cycle.timers = cycle_timers # 防止循环过快消耗资源 - await self._handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix) + await _handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix) # 完成当前循环并保存历史 self._current_cycle.complete_cycle() @@ -715,24 +734,6 @@ class HeartFChatting: if not self._shutting_down: logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}") - async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str): - """处理循环延迟""" - cycle_duration = time.monotonic() - cycle_start_time - - try: - sleep_duration = 0.0 - if not action_taken_this_cycle and cycle_duration < 1: - sleep_duration = 1 - cycle_duration - elif cycle_duration < 0.2: - sleep_duration = 0.2 - - if sleep_duration > 0: - await asyncio.sleep(sleep_duration) - - except asyncio.CancelledError: - logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.") - raise - async def _get_submind_thinking(self, cycle_timers: dict) -> str: """ 获取子思维的思考结果 diff --git a/src/plugins/heartFC_chat/heartFC_sender.py b/src/plugins/heartFC_chat/heartFC_sender.py index 9e65edcfb..a41036679 100644 --- a/src/plugins/heartFC_chat/heartFC_sender.py +++ b/src/plugins/heartFC_chat/heartFC_sender.py @@ -12,6 +12,22 @@ from src.plugins.chat.utils import calculate_typing_time logger = get_logger("sender") +async def send_message(message: MessageSending) -> None: + """合并后的消息发送函数,包含WS发送和日志记录""" + message_preview = truncate_message(message.processed_plain_text) + + try: + # 直接调用API发送消息 + await send_message(message) + logger.success(f"发送消息 '{message_preview}' 成功") + + except Exception as e: + logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") + if not message.message_info.platform: + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + raise e # 重新抛出其他异常 + + class HeartFCSender: """管理消息的注册、即时处理、发送和存储,并跟踪思考状态。""" @@ -21,21 +37,6 @@ class HeartFCSender: self.thinking_messages: Dict[str, Dict[str, MessageThinking]] = {} self._thinking_lock = asyncio.Lock() # 保护 thinking_messages 的锁 - async def send_message(self, message: MessageSending) -> None: - """合并后的消息发送函数,包含WS发送和日志记录""" - message_preview = truncate_message(message.processed_plain_text) - - try: - # 直接调用API发送消息 - await global_api.send_message(message) - logger.success(f"发送消息 '{message_preview}' 成功") - - except Exception as e: - logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") - if not message.message_info.platform: - raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e - raise e # 重新抛出其他异常 - async def register_thinking(self, thinking_message: MessageThinking): """注册一个思考中的消息。""" if not thinking_message.chat_stream or not thinking_message.message_info.message_id: @@ -73,7 +74,7 @@ class HeartFCSender: thinking_message = self.thinking_messages.get(chat_id, {}).get(message_id) return thinking_message.thinking_start_time if thinking_message else None - async def type_and_send_message(self, message: MessageSending, type=False): + async def type_and_send_message(self, message: MessageSending, typing=False): """ 立即处理、发送并存储单个 MessageSending 消息。 调用此方法前,应先调用 register_thinking 注册对应的思考消息。 @@ -100,7 +101,7 @@ class HeartFCSender: await message.process() - if type: + if typing: typing_time = calculate_typing_time( input_string=message.processed_plain_text, thinking_start_time=message.thinking_start_time, @@ -108,7 +109,7 @@ class HeartFCSender: ) await asyncio.sleep(typing_time) - await self.send_message(message) + await send_message(message) await self.storage.store_message(message, message.chat_stream) except Exception as e: @@ -136,7 +137,7 @@ class HeartFCSender: await asyncio.sleep(0.5) - await self.send_message(message) # 使用现有的发送方法 + await send_message(message) # 使用现有的发送方法 await self.storage.store_message(message, message.chat_stream) # 使用现有的存储方法 except Exception as e: diff --git a/src/plugins/heartFC_chat/heartflow_processor.py b/src/plugins/heartFC_chat/heartflow_processor.py index f7f3819cf..5bd63b14a 100644 --- a/src/plugins/heartFC_chat/heartflow_processor.py +++ b/src/plugins/heartFC_chat/heartflow_processor.py @@ -12,11 +12,134 @@ from ..chat.chat_stream import chat_manager from ..chat.message_buffer import message_buffer from ..utils.timer_calculator import Timer from src.plugins.person_info.relationship_manager import relationship_manager -from typing import Optional, Tuple +from typing import Optional, Tuple, Dict, Any logger = get_logger("chat") +async def _handle_error(error: Exception, context: str, message: Optional[MessageRecv] = None) -> None: + """统一的错误处理函数 + + Args: + error: 捕获到的异常 + context: 错误发生的上下文描述 + message: 可选的消息对象,用于记录相关消息内容 + """ + logger.error(f"{context}: {error}") + logger.error(traceback.format_exc()) + if message and hasattr(message, "raw_message"): + logger.error(f"相关消息原始内容: {message.raw_message}") + + +async def _process_relationship(message: MessageRecv) -> None: + """处理用户关系逻辑 + + Args: + message: 消息对象,包含用户信息 + """ + platform = message.message_info.platform + user_id = message.message_info.user_info.user_id + nickname = message.message_info.user_info.user_nickname + cardname = message.message_info.user_info.user_cardname or nickname + + is_known = await relationship_manager.is_known_some_one(platform, user_id) + + if not is_known: + logger.info(f"首次认识用户: {nickname}") + await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname, "") + elif not await relationship_manager.is_qved_name(platform, user_id): + logger.info(f"给用户({nickname},{cardname})取名: {nickname}") + await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname, "") + + +async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool]: + """计算消息的兴趣度 + + Args: + message: 待处理的消息对象 + + Returns: + Tuple[float, bool]: (兴趣度, 是否被提及) + """ + is_mentioned, _ = is_mentioned_bot_in_message(message) + interested_rate = 0.0 + + with Timer("记忆激活"): + interested_rate = await HippocampusManager.get_instance().get_activate_from_text( + message.processed_plain_text, + fast_retrieval=True, + ) + logger.trace(f"记忆激活率: {interested_rate:.2f}") + + if is_mentioned: + interest_increase_on_mention = 1 + interested_rate += interest_increase_on_mention + + return interested_rate, is_mentioned + + +def _get_message_type(message: MessageRecv) -> str: + """获取消息类型 + + Args: + message: 消息对象 + + Returns: + str: 消息类型 + """ + if message.message_segment.type != "seglist": + return message.message_segment.type + + if ( + isinstance(message.message_segment.data, list) + and all(isinstance(x, Seg) for x in message.message_segment.data) + and len(message.message_segment.data) == 1 + ): + return message.message_segment.data[0].type + + return "seglist" + + +def _check_ban_words(text: str, chat, userinfo) -> bool: + """检查消息是否包含过滤词 + + Args: + text: 待检查的文本 + chat: 聊天对象 + userinfo: 用户信息 + + Returns: + bool: 是否包含过滤词 + """ + for word in global_config.ban_words: + if word in text: + chat_name = chat.group_info.group_name if chat.group_info else "私聊" + logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") + logger.info(f"[过滤词识别]消息中含有{word},filtered") + return True + return False + + +def _check_ban_regex(text: str, chat, userinfo) -> bool: + """检查消息是否匹配过滤正则表达式 + + Args: + text: 待检查的文本 + chat: 聊天对象 + userinfo: 用户信息 + + Returns: + bool: 是否匹配过滤正则 + """ + for pattern in global_config.ban_msgs_regex: + if pattern.search(text): + chat_name = chat.group_info.group_name if chat.group_info else "私聊" + logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") + logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") + return True + return False + + class HeartFCProcessor: """心流处理器,负责处理接收到的消息并计算兴趣度""" @@ -24,86 +147,7 @@ class HeartFCProcessor: """初始化心流处理器,创建消息存储实例""" self.storage = MessageStorage() - async def _handle_error(self, error: Exception, context: str, message: Optional[MessageRecv] = None) -> None: - """统一的错误处理函数 - - Args: - error: 捕获到的异常 - context: 错误发生的上下文描述 - message: 可选的消息对象,用于记录相关消息内容 - """ - logger.error(f"{context}: {error}") - logger.error(traceback.format_exc()) - if message and hasattr(message, "raw_message"): - logger.error(f"相关消息原始内容: {message.raw_message}") - - async def _process_relationship(self, message: MessageRecv) -> None: - """处理用户关系逻辑 - - Args: - message: 消息对象,包含用户信息 - """ - platform = message.message_info.platform - user_id = message.message_info.user_info.user_id - nickname = message.message_info.user_info.user_nickname - cardname = message.message_info.user_info.user_cardname or nickname - - is_known = await relationship_manager.is_known_some_one(platform, user_id) - - if not is_known: - logger.info(f"首次认识用户: {nickname}") - await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname, "") - elif not await relationship_manager.is_qved_name(platform, user_id): - logger.info(f"给用户({nickname},{cardname})取名: {nickname}") - await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname, "") - - async def _calculate_interest(self, message: MessageRecv) -> Tuple[float, bool]: - """计算消息的兴趣度 - - Args: - message: 待处理的消息对象 - - Returns: - Tuple[float, bool]: (兴趣度, 是否被提及) - """ - is_mentioned, _ = is_mentioned_bot_in_message(message) - interested_rate = 0.0 - - with Timer("记忆激活"): - interested_rate = await HippocampusManager.get_instance().get_activate_from_text( - message.processed_plain_text, - fast_retrieval=True, - ) - logger.trace(f"记忆激活率: {interested_rate:.2f}") - - if is_mentioned: - interest_increase_on_mention = 1 - interested_rate += interest_increase_on_mention - - return interested_rate, is_mentioned - - def _get_message_type(self, message: MessageRecv) -> str: - """获取消息类型 - - Args: - message: 消息对象 - - Returns: - str: 消息类型 - """ - if message.message_segment.type != "seglist": - return message.message_segment.type - - if ( - isinstance(message.message_segment.data, list) - and all(isinstance(x, Seg) for x in message.message_segment.data) - and len(message.message_segment.data) == 1 - ): - return message.message_segment.data[0].type - - return "seglist" - - async def process_message(self, message_data: str) -> None: + async def process_message(self, message_data: Dict[str, Any]) -> None: """处理接收到的原始消息数据 主要流程: @@ -138,7 +182,7 @@ class HeartFCProcessor: await message.process() # 3. 过滤检查 - if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex( + if _check_ban_words(message.processed_plain_text, chat, userinfo) or _check_ban_regex( message.raw_message, chat, userinfo ): return @@ -146,7 +190,7 @@ class HeartFCProcessor: # 4. 缓冲检查 buffer_result = await message_buffer.query_buffer_result(message) if not buffer_result: - msg_type = self._get_message_type(message) + msg_type = _get_message_type(message) type_messages = { "text": f"触发缓冲,消息:{message.processed_plain_text}", "image": "触发缓冲,表情包/图片等待中", @@ -160,7 +204,7 @@ class HeartFCProcessor: logger.trace(f"存储成功: {message.processed_plain_text}") # 6. 兴趣度计算与更新 - interested_rate, is_mentioned = await self._calculate_interest(message) + interested_rate, is_mentioned = await _calculate_interest(message) await subheartflow.interest_chatting.increase_interest(value=interested_rate) subheartflow.interest_chatting.add_interest_dict(message, interested_rate, is_mentioned) @@ -175,45 +219,7 @@ class HeartFCProcessor: ) # 8. 关系处理 - await self._process_relationship(message) + await _process_relationship(message) except Exception as e: - await self._handle_error(e, "消息处理失败", message) - - def _check_ban_words(self, text: str, chat, userinfo) -> bool: - """检查消息是否包含过滤词 - - Args: - text: 待检查的文本 - chat: 聊天对象 - userinfo: 用户信息 - - Returns: - bool: 是否包含过滤词 - """ - for word in global_config.ban_words: - if word in text: - chat_name = chat.group_info.group_name if chat.group_info else "私聊" - logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") - logger.info(f"[过滤词识别]消息中含有{word},filtered") - return True - return False - - def _check_ban_regex(self, text: str, chat, userinfo) -> bool: - """检查消息是否匹配过滤正则表达式 - - Args: - text: 待检查的文本 - chat: 聊天对象 - userinfo: 用户信息 - - Returns: - bool: 是否匹配过滤正则 - """ - for pattern in global_config.ban_msgs_regex: - if pattern.search(text): - chat_name = chat.group_info.group_name if chat.group_info else "私聊" - logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}") - logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered") - return True - return False + await _handle_error(e, "消息处理失败", message) diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index 40819f013..9b83f2652 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -151,6 +151,96 @@ JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query": Prompt("\n你有以下这些**知识**:\n{prompt_info}\n请你**记住上面的知识**,之后可能会用到。\n", "knowledge_prompt") +async def _build_prompt_focus( + reason, current_mind_info, structured_info, chat_stream, sender_name +) -> tuple[str, str]: + individuality = Individuality.get_instance() + prompt_personality = individuality.get_prompt(x_person=0, level=2) + # 日程构建 + # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' + + if chat_stream.group_info: + chat_in_group = True + else: + chat_in_group = False + + message_list_before_now = get_raw_msg_before_timestamp_with_chat( + chat_id=chat_stream.stream_id, + timestamp=time.time(), + limit=global_config.observation_context_size, + ) + + chat_talking_prompt = await build_readable_messages( + message_list_before_now, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="normal", + read_mark=0.0, + truncate=True, + ) + + # 中文高手(新加的好玩功能) + prompt_ger = "" + if random.random() < 0.04: + prompt_ger += "你喜欢用倒装句" + if random.random() < 0.02: + prompt_ger += "你喜欢用反问句" + + reply_styles1 = [ + ("给出日常且口语化的回复,平淡一些", 0.4), # 40%概率 + ("给出非常简短的回复", 0.4), # 40%概率 + ("给出缺失主语的回复,简短", 0.15), # 15%概率 + ("给出带有语病的回复,朴实平淡", 0.05), # 5%概率 + ] + reply_style1_chosen = random.choices( + [style[0] for style in reply_styles1], weights=[style[1] for style in reply_styles1], k=1 + )[0] + + reply_styles2 = [ + ("不要回复的太有条理,可以有个性", 0.6), # 60%概率 + ("不要回复的太有条理,可以复读", 0.15), # 15%概率 + ("回复的认真一些", 0.2), # 20%概率 + ("可以回复单个表情符号", 0.05), # 5%概率 + ] + reply_style2_chosen = random.choices( + [style[0] for style in reply_styles2], weights=[style[1] for style in reply_styles2], k=1 + )[0] + + if structured_info: + structured_info_prompt = await global_prompt_manager.format_prompt( + "info_from_tools", structured_info=structured_info + ) + else: + structured_info_prompt = "" + + logger.debug("开始构建prompt") + + prompt = await global_prompt_manager.format_prompt( + "heart_flow_prompt", + info_from_tools=structured_info_prompt, + chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") + if chat_in_group + else await global_prompt_manager.get_prompt_async("chat_target_private1"), + chat_talking_prompt=chat_talking_prompt, + bot_name=global_config.BOT_NICKNAME, + prompt_personality=prompt_personality, + chat_target_2=await global_prompt_manager.get_prompt_async("chat_target_group2") + if chat_in_group + else await global_prompt_manager.get_prompt_async("chat_target_private2"), + current_mind_info=current_mind_info, + reply_style2=reply_style2_chosen, + reply_style1=reply_style1_chosen, + reason=reason, + prompt_ger=prompt_ger, + moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), + sender_name=sender_name, + ) + + logger.debug(f"focus_chat_prompt: \n{prompt}") + + return prompt + + class PromptBuilder: def __init__(self): self.prompt_built = "" @@ -170,7 +260,7 @@ class PromptBuilder: return await self._build_prompt_normal(chat_stream, message_txt, sender_name) elif build_mode == "focus": - return await self._build_prompt_focus( + return await _build_prompt_focus( reason, current_mind_info, structured_info, @@ -179,95 +269,6 @@ class PromptBuilder: ) return None - async def _build_prompt_focus( - self, reason, current_mind_info, structured_info, chat_stream, sender_name - ) -> tuple[str, str]: - individuality = Individuality.get_instance() - prompt_personality = individuality.get_prompt(x_person=0, level=2) - # 日程构建 - # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' - - if chat_stream.group_info: - chat_in_group = True - else: - chat_in_group = False - - message_list_before_now = get_raw_msg_before_timestamp_with_chat( - chat_id=chat_stream.stream_id, - timestamp=time.time(), - limit=global_config.observation_context_size, - ) - - chat_talking_prompt = await build_readable_messages( - message_list_before_now, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="normal", - read_mark=0.0, - truncate=True, - ) - - # 中文高手(新加的好玩功能) - prompt_ger = "" - if random.random() < 0.04: - prompt_ger += "你喜欢用倒装句" - if random.random() < 0.02: - prompt_ger += "你喜欢用反问句" - - reply_styles1 = [ - ("给出日常且口语化的回复,平淡一些", 0.4), # 40%概率 - ("给出非常简短的回复", 0.4), # 40%概率 - ("给出缺失主语的回复,简短", 0.15), # 15%概率 - ("给出带有语病的回复,朴实平淡", 0.05), # 5%概率 - ] - reply_style1_chosen = random.choices( - [style[0] for style in reply_styles1], weights=[style[1] for style in reply_styles1], k=1 - )[0] - - reply_styles2 = [ - ("不要回复的太有条理,可以有个性", 0.6), # 60%概率 - ("不要回复的太有条理,可以复读", 0.15), # 15%概率 - ("回复的认真一些", 0.2), # 20%概率 - ("可以回复单个表情符号", 0.05), # 5%概率 - ] - reply_style2_chosen = random.choices( - [style[0] for style in reply_styles2], weights=[style[1] for style in reply_styles2], k=1 - )[0] - - if structured_info: - structured_info_prompt = await global_prompt_manager.format_prompt( - "info_from_tools", structured_info=structured_info - ) - else: - structured_info_prompt = "" - - logger.debug("开始构建prompt") - - prompt = await global_prompt_manager.format_prompt( - "heart_flow_prompt", - info_from_tools=structured_info_prompt, - chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1") - if chat_in_group - else await global_prompt_manager.get_prompt_async("chat_target_private1"), - chat_talking_prompt=chat_talking_prompt, - bot_name=global_config.BOT_NICKNAME, - prompt_personality=prompt_personality, - chat_target_2=await global_prompt_manager.get_prompt_async("chat_target_group2") - if chat_in_group - else await global_prompt_manager.get_prompt_async("chat_target_private2"), - current_mind_info=current_mind_info, - reply_style2=reply_style2_chosen, - reply_style1=reply_style1_chosen, - reason=reason, - prompt_ger=prompt_ger, - moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), - sender_name=sender_name, - ) - - logger.debug(f"focus_chat_prompt: \n{prompt}") - - return prompt - async def _build_prompt_normal(self, chat_stream, message_txt: str, sender_name: str = "某人") -> tuple[str, str]: individuality = Individuality.get_instance() prompt_personality = individuality.get_prompt(x_person=2, level=2) diff --git a/src/plugins/knowledge/src/qa_manager.py b/src/plugins/knowledge/src/qa_manager.py index a09879a17..11067d0e5 100644 --- a/src/plugins/knowledge/src/qa_manager.py +++ b/src/plugins/knowledge/src/qa_manager.py @@ -27,7 +27,7 @@ class QAManager: self.kg_manager = kg_manager self.llm_client_list = { "embedding": llm_client_embedding, - "filter": llm_client_filter, + "message_filter": llm_client_filter, "qa": llm_client_qa, } diff --git a/src/plugins/respon_info_catcher/info_catcher.py b/src/plugins/respon_info_catcher/info_catcher.py index 5cb67a16d..08be4a76b 100644 --- a/src/plugins/respon_info_catcher/info_catcher.py +++ b/src/plugins/respon_info_catcher/info_catcher.py @@ -185,32 +185,24 @@ class InfoCatcher: try: # 将消息对象转换为可序列化的字典喵~ - thinking_log_data = { - "chat_id": self.chat_id, - # "response_mode": self.response_mode, # 这个也删掉喵~ - "trigger_text": self.trigger_response_text, - "response_text": self.response_text, - "trigger_info": { + thinking_log_data = {"chat_id": self.chat_id, "trigger_text": self.trigger_response_text, + "response_text": self.response_text, "trigger_info": { "time": self.trigger_response_time, "message": self.message_to_dict(self.trigger_response_message), - }, - "response_info": { + }, "response_info": { "time": self.response_time, "message": self.response_messages, - }, - "timing_results": self.timing_results, - "chat_history": self.message_list_to_dict(self.chat_history), - "chat_history_in_thinking": self.message_list_to_dict(self.chat_history_in_thinking), - "chat_history_after_response": self.message_list_to_dict(self.chat_history_after_response), - } + }, "timing_results": self.timing_results, "chat_history": self.message_list_to_dict(self.chat_history), + "chat_history_in_thinking": self.message_list_to_dict(self.chat_history_in_thinking), + "chat_history_after_response": self.message_list_to_dict( + self.chat_history_after_response), "heartflow_data": self.heartflow_data, + "reasoning_data": self.reasoning_data} # 根据不同的响应模式添加相应的数据喵~ # 现在直接都加上去好了喵~ # if self.response_mode == "heart_flow": # thinking_log_data["mode_specific_data"] = self.heartflow_data # elif self.response_mode == "reasoning": # thinking_log_data["mode_specific_data"] = self.reasoning_data - thinking_log_data["heartflow_data"] = self.heartflow_data - thinking_log_data["reasoning_data"] = self.reasoning_data # 将数据插入到 thinking_log 集合中喵~ db.thinking_log.insert_one(thinking_log_data) diff --git a/src/plugins/schedule/schedule_generator.py b/src/plugins/schedule/schedule_generator.py index 761fcb7dc..ee7bdee15 100644 --- a/src/plugins/schedule/schedule_generator.py +++ b/src/plugins/schedule/schedule_generator.py @@ -30,6 +30,7 @@ class ScheduleGenerator: def __init__(self): # 使用离线LLM模型 + self.enable_output = None self.llm_scheduler_all = LLMRequest( model=global_config.llm_reasoning, temperature=global_config.SCHEDULE_TEMPERATURE + 0.3, diff --git a/src/plugins/utils/chat_message_builder.py b/src/plugins/utils/chat_message_builder.py index a7eef4431..75fca69cf 100644 --- a/src/plugins/utils/chat_message_builder.py +++ b/src/plugins/utils/chat_message_builder.py @@ -123,7 +123,7 @@ def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp return 0 # 起始时间大于等于结束时间,没有新消息 filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}} - return count_messages(filter=filter_query) + return count_messages(message_filter=filter_query) def num_new_messages_since_with_users( @@ -137,7 +137,7 @@ def num_new_messages_since_with_users( "time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}, } - return count_messages(filter=filter_query) + return count_messages(message_filter=filter_query) async def _build_readable_messages_internal( @@ -227,7 +227,7 @@ async def _build_readable_messages_internal( replace_content = "......(太长了)" truncated_content = content - if limit > 0 and original_len > limit: + if 0 < limit < original_len: truncated_content = f"{content[:limit]}{replace_content}" message_details.append((timestamp, name, truncated_content)) diff --git a/src/plugins/willing/mode_custom.py b/src/plugins/willing/mode_custom.py index c3a5c3078..4b2e8f3c3 100644 --- a/src/plugins/willing/mode_custom.py +++ b/src/plugins/willing/mode_custom.py @@ -2,5 +2,23 @@ from .willing_manager import BaseWillingManager class CustomWillingManager(BaseWillingManager): + async def async_task_starter(self) -> None: + pass + + async def before_generate_reply_handle(self, message_id: str): + pass + + async def after_generate_reply_handle(self, message_id: str): + pass + + async def not_reply_handle(self, message_id: str): + pass + + async def get_reply_probability(self, message_id: str): + pass + + async def bombing_buffer_message_handle(self, message_id: str): + pass + def __init__(self): super().__init__()