diff --git a/src/common/message_repository.py b/src/common/message_repository.py index 007de6e1a..fc7b7e542 100644 --- a/src/common/message_repository.py +++ b/src/common/message_repository.py @@ -5,7 +5,10 @@ from typing import List, Dict, Any, Optional logger = get_module_logger(__name__) -def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] = None, limit: int = 0, limit_mode: str = 'latest') -> List[Dict[str, Any]]: + +def find_messages( + filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] = None, limit: int = 0, limit_mode: str = "latest" +) -> List[Dict[str, Any]]: """ 根据提供的过滤器、排序和限制条件查找消息。 @@ -23,17 +26,17 @@ def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] results: List[Dict[str, Any]] = [] if limit > 0: - if limit_mode == 'earliest': + if limit_mode == "earliest": # 获取时间最早的 limit 条记录,已经是正序 - query = query.sort([('time', 1)]).limit(limit) + query = query.sort([("time", 1)]).limit(limit) results = list(query) else: # 默认为 'latest' # 获取时间最晚的 limit 条记录 - query = query.sort([('time', -1)]).limit(limit) + query = query.sort([("time", -1)]).limit(limit) latest_results = list(query) # 将结果按时间正序排列 # 假设消息文档中总是有 'time' 字段且可排序 - results = sorted(latest_results, key=lambda msg: msg.get('time')) + results = sorted(latest_results, key=lambda msg: msg.get("time")) else: # limit 为 0 时,应用传入的 sort 参数 if sort: @@ -42,10 +45,14 @@ def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] return results except Exception as e: - log_message = f"查找消息失败 (filter={filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n" + traceback.format_exc() + log_message = ( + f"查找消息失败 (filter={filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n" + + traceback.format_exc() + ) logger.error(log_message) return [] + def count_messages(filter: Dict[str, Any]) -> int: """ 根据提供的过滤器计算消息数量。 @@ -64,4 +71,5 @@ def count_messages(filter: Dict[str, Any]) -> int: logger.error(log_message) return 0 -# 你可以在这里添加更多与 messages 集合相关的数据库操作函数,例如 find_one_message, insert_message 等。 \ No newline at end of file + +# 你可以在这里添加更多与 messages 集合相关的数据库操作函数,例如 find_one_message, insert_message 等。 diff --git a/src/do_tool/tool_use.py b/src/do_tool/tool_use.py index 877ee6e9e..52c26f80e 100644 --- a/src/do_tool/tool_use.py +++ b/src/do_tool/tool_use.py @@ -33,7 +33,7 @@ class ToolUser: Returns: str: 构建好的提示词 """ - + if subheartflow: mid_memory_info = subheartflow.observations[0].mid_memory_info # print(f"intol111111111111111111111111111111111222222222222mid_memory_info:{mid_memory_info}") diff --git a/src/heart_flow/observation.py b/src/heart_flow/observation.py index 5c1a29e87..9903b184b 100644 --- a/src/heart_flow/observation.py +++ b/src/heart_flow/observation.py @@ -5,7 +5,12 @@ from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config from src.common.logger import get_module_logger import traceback -from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages,get_raw_msg_by_timestamp_with_chat,num_new_messages_since +from src.plugins.utils.chat_message_builder import ( + get_raw_msg_before_timestamp_with_chat, + build_readable_messages, + get_raw_msg_by_timestamp_with_chat, + num_new_messages_since, +) logger = get_module_logger("observation") @@ -40,13 +45,12 @@ class ChattingObservation(Observation): self.llm_summary = LLMRequest( model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation" ) - + async def initialize(self): initial_messages = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 10) self.talking_message = initial_messages # 将这些消息设为初始上下文 self.talking_message_str = await build_readable_messages(self.talking_message) - # 进行一次观察 返回观察结果observe_info def get_observe_info(self, ids=None): if ids: @@ -77,11 +81,11 @@ class ChattingObservation(Observation): # 查找新消息,最多获取 self.max_now_obs_len 条 print("2222222222222222221111111111111111开始观察") new_messages_list = get_raw_msg_by_timestamp_with_chat( - chat_id=self.chat_id, - timestamp_start=self.last_observe_time, - timestamp_end=datetime.now().timestamp(), # 使用当前时间作为结束时间戳 - limit=self.max_now_obs_len, - limit_mode="latest" + chat_id=self.chat_id, + timestamp_start=self.last_observe_time, + timestamp_end=datetime.now().timestamp(), # 使用当前时间作为结束时间戳 + limit=self.max_now_obs_len, + limit_mode="latest", ) print(f"2222222222222222221111111111111111获取到新消息{len(new_messages_list)}条") if new_messages_list: # 检查列表是否为空 @@ -93,11 +97,13 @@ class ChattingObservation(Observation): messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len oldest_messages = self.talking_message[:messages_to_remove_count] self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的 - + oldest_messages_str = await build_readable_messages(oldest_messages) # 调用 LLM 总结主题 - prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n用一句话概括包括人物事件和主要信息,不要分点:" + prompt = ( + f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n用一句话概括包括人物事件和主要信息,不要分点:" + ) summary = "没有主题的闲聊" # 默认值 try: summary_result, _ = await self.llm_summary.generate_response_async(prompt) @@ -131,10 +137,10 @@ class ChattingObservation(Observation): # except Exception as e: # 将异常处理移至此处以覆盖整个总结过程 # logger.error(f"处理和总结旧消息时出错 for chat {self.chat_id}: {e}") # traceback.print_exc() # 记录详细堆栈 - # print(f"处理后self.talking_message:{self.talking_message}") + # print(f"处理后self.talking_message:{self.talking_message}") self.talking_message_str = await build_readable_messages(self.talking_message) - + logger.trace( f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.talking_message_str}" ) diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 8a38554f7..439b2a3f0 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -121,7 +121,6 @@ class SubHeartflow: logger.error(f"[{self.subheartflow_id}] Error during pre-thinking observation: {e}") logger.error(traceback.format_exc()) - async def do_thinking_before_reply( self, extra_info: str, @@ -176,26 +175,22 @@ class SubHeartflow: prompt_personality += f",{random_detail}" time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - - + # 创建局部Random对象避免影响全局随机状态 local_random = random.Random() current_minute = int(time.strftime("%M")) local_random.seed(current_minute) # 用分钟作为种子确保每分钟内选择一致 - + hf_options = [ ("继续生成你在这个聊天中的想法,在原来想法的基础上继续思考", 0.7), - ("生成你在这个聊天中的想法,在原来的想法上尝试新的话题", 0.1), + ("生成你在这个聊天中的想法,在原来的想法上尝试新的话题", 0.1), ("生成你在这个聊天中的想法,不要太深入", 0.1), - ("继续生成你在这个聊天中的想法,进行深入思考", 0.1) + ("继续生成你在这个聊天中的想法,进行深入思考", 0.1), ] - + hf_do_next = local_random.choices( - [option[0] for option in hf_options], - weights=[option[1] for option in hf_options], - k=1 + [option[0] for option in hf_options], weights=[option[1] for option in hf_options], k=1 )[0] - prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_before")).format( extra_info=extra_info_prompt, @@ -235,7 +230,6 @@ class SubHeartflow: # logger.info(f"[{self.subheartflow_id}] 思考前脑内状态:{self.current_mind}") return self.current_mind, self.past_mind - def update_current_mind(self, response): self.past_mind.append(self.current_mind) self.current_mind = response diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index aefd9f0c4..9c98a16a5 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -23,7 +23,7 @@ logger = get_module_logger("chat_utils") def is_english_letter(char: str) -> bool: """检查字符是否为英文字母(忽略大小写)""" - return 'a' <= char.lower() <= 'z' + return "a" <= char.lower() <= "z" def db_message_to_str(message_dict: Dict) -> str: @@ -233,8 +233,8 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: List[str]: 分割和合并后的句子列表 """ # 处理两个汉字中间的换行符 - text = re.sub(r'([\u4e00-\u9fff])\n([\u4e00-\u9fff])', r'\1。\2', text) - + text = re.sub(r"([\u4e00-\u9fff])\n([\u4e00-\u9fff])", r"\1。\2", text) + len_text = len(text) if len_text < 3: if random.random() < 0.01: @@ -243,7 +243,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: return [text] # 定义分隔符 - separators = {',', ',', ' ', '。', ';'} + separators = {",", ",", " ", "。", ";"} segments = [] current_segment = "" @@ -255,19 +255,19 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: # 检查分割条件:如果分隔符左右都是英文字母,则不分割 can_split = True if i > 0 and i < len(text) - 1: - prev_char = text[i-1] - next_char = text[i+1] + prev_char = text[i - 1] + next_char = text[i + 1] # if is_english_letter(prev_char) and is_english_letter(next_char) and char == ' ': # 原计划只对空格应用此规则,现应用于所有分隔符 if is_english_letter(prev_char) and is_english_letter(next_char): - can_split = False + can_split = False if can_split: # 只有当当前段不为空时才添加 if current_segment: segments.append((current_segment, char)) # 如果当前段为空,但分隔符是空格,则也添加一个空段(保留空格) - elif char == ' ': - segments.append(("", char)) + elif char == " ": + segments.append(("", char)) current_segment = "" else: # 不分割,将分隔符加入当前段 @@ -287,7 +287,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: if not segments: # recovered_text = recover_kaomoji([text], mapping) # 恢复原文本中的颜文字 - 已移至上层处理 # return [s for s in recovered_text if s] # 返回非空结果 - return [text] if text else [] # 如果原始文本非空,则返回原始文本(可能只包含未被分割的字符或颜文字占位符) + return [text] if text else [] # 如果原始文本非空,则返回原始文本(可能只包含未被分割的字符或颜文字占位符) # 2. 概率合并 if len_text < 12: @@ -307,23 +307,23 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]: # 检查是否可以与下一段合并 # 条件:不是最后一段,且随机数小于合并概率,且当前段有内容(避免合并空段) if idx + 1 < len(segments) and random.random() < merge_probability and current_content: - next_content, next_sep = segments[idx+1] + next_content, next_sep = segments[idx + 1] # 合并: (内容1 + 分隔符1 + 内容2, 分隔符2) # 只有当下一段也有内容时才合并文本,否则只传递分隔符 if next_content: - merged_content = current_content + current_sep + next_content - merged_segments.append((merged_content, next_sep)) - else: # 下一段内容为空,只保留当前内容和下一段的分隔符 - merged_segments.append((current_content, next_sep)) + merged_content = current_content + current_sep + next_content + merged_segments.append((merged_content, next_sep)) + else: # 下一段内容为空,只保留当前内容和下一段的分隔符 + merged_segments.append((current_content, next_sep)) - idx += 2 # 跳过下一段,因为它已被合并 + idx += 2 # 跳过下一段,因为它已被合并 else: # 不合并,直接添加当前段 merged_segments.append((current_content, current_sep)) idx += 1 # 提取最终的句子内容 - final_sentences = [content for content, sep in merged_segments if content] # 只保留有内容的段 + final_sentences = [content for content, sep in merged_segments if content] # 只保留有内容的段 # 清理可能引入的空字符串 final_sentences = [s for s in final_sentences if s] @@ -414,7 +414,7 @@ def process_llm_response(text: str) -> List[str]: sentences.append(content) # 在所有句子处理完毕后,对包含占位符的列表进行恢复 sentences = recover_kaomoji(sentences, kaomoji_mapping) - + print(sentences) return sentences @@ -579,17 +579,17 @@ def get_western_ratio(paragraph): 原理:检查段落中字母数字字符的西文比例 通过is_english_letter函数判断每个字符是否为西文 只检查字母数字字符,忽略标点符号和空格等非字母数字字符 - + Args: paragraph: 要检查的文本段落 - + Returns: float: 西文字符比例(0.0-1.0),如果没有字母数字字符则返回0.0 """ alnum_chars = [char for char in paragraph if char.isalnum()] if not alnum_chars: return 0.0 - + western_count = sum(1 for char in alnum_chars if is_english_letter(char)) return western_count / len(alnum_chars) diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py index 3fbada3e0..37708a94f 100644 --- a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py +++ b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py @@ -177,8 +177,7 @@ class HeartFC_Processor: message.message_info.platform, message.message_info.user_info.user_id, message.message_info.user_info.user_nickname, - message.message_info.user_info.user_cardname - or message.message_info.user_info.user_nickname, + message.message_info.user_info.user_cardname or message.message_info.user_info.user_nickname, "", ) else: diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py index da6c889a9..59472fd14 100644 --- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py +++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py @@ -13,7 +13,7 @@ from src.common.logger import get_module_logger, LogConfig, PFC_STYLE_CONFIG # from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config from src.plugins.chat.utils_image import image_path_to_base64 # Local import needed after move -from src.plugins.utils.timer_calculater import Timer # <--- Import Timer +from src.plugins.utils.timer_calculater import Timer # <--- Import Timer # 定义日志配置 (使用 loguru 格式) interest_log_config = LogConfig( @@ -85,7 +85,6 @@ class PFChatting: max_tokens=1000, request_type="action_planning", ) - # Internal state for loop control self._loop_timer: float = 0.0 # Remaining time for the loop in seconds @@ -213,7 +212,7 @@ class PFChatting: try: thinking_id = "" while True: - cycle_timers = {} # <--- Initialize timers dict for this cycle + cycle_timers = {} # <--- Initialize timers dict for this cycle if self.heartfc_controller.MessageManager().check_if_sending_message_exist(self.stream_id, thinking_id): # logger.info(f"{log_prefix} PFChatting: 11111111111111111111111111111111麦麦还在发消息,等会再规划") @@ -238,7 +237,7 @@ class PFChatting: planner_start_db_time = 0.0 # 初始化 try: - with Timer("Total Cycle", cycle_timers) as _total_timer: # <--- Start total cycle timer + with Timer("Total Cycle", cycle_timers) as _total_timer: # <--- Start total cycle timer # Use try_acquire pattern or timeout? await self._processing_lock.acquire() acquired_lock = True @@ -249,7 +248,7 @@ class PFChatting: # --- Planner --- # planner_result = {} - with Timer("Planner", cycle_timers): # <--- Start Planner timer + with Timer("Planner", cycle_timers): # <--- Start Planner timer planner_result = await self._planner() action = planner_result.get("action", "error") reasoning = planner_result.get("reasoning", "Planner did not provide reasoning.") @@ -280,11 +279,11 @@ class PFChatting: replier_result = None try: # --- Replier Work --- # - with Timer("Replier", cycle_timers): # <--- Start Replier timer + with Timer("Replier", cycle_timers): # <--- Start Replier timer replier_result = await self._replier_work( anchor_message=anchor_message, thinking_id=thinking_id, - reason = reasoning, + reason=reasoning, ) except Exception as e_replier: logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}") @@ -293,7 +292,7 @@ class PFChatting: if replier_result: # --- Sender Work --- # try: - with Timer("Sender", cycle_timers): # <--- Start Sender timer + with Timer("Sender", cycle_timers): # <--- Start Sender timer await self._sender( thinking_id=thinking_id, anchor_message=anchor_message, @@ -309,13 +308,15 @@ class PFChatting: logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.") self._cleanup_thinking_message(thinking_id) elif action == "emoji_reply": - logger.info(f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}") + logger.info( + f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}" + ) action_taken_this_cycle = True anchor = await self._get_anchor_message(observed_messages) if anchor: try: # --- Handle Emoji (Moved) --- # - with Timer("Emoji Handler", cycle_timers): # <--- Start Emoji timer + with Timer("Emoji Handler", cycle_timers): # <--- Start Emoji timer await self._handle_emoji(anchor, [], emoji_query) except Exception as e_emoji: logger.error(f"{log_prefix} 循环: 发送表情失败: {e_emoji}") @@ -333,7 +334,7 @@ class PFChatting: observation = self.sub_hf._get_primary_observation() if observation: - with Timer("Wait New Msg", cycle_timers): # <--- Start Wait timer + with Timer("Wait New Msg", cycle_timers): # <--- Start Wait timer wait_start_time = time.monotonic() while True: # 检查计时器是否耗尽 @@ -368,7 +369,9 @@ class PFChatting: action_taken_this_cycle = False else: # Unknown action from planner - logger.warning(f"{log_prefix} PFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}") + logger.warning( + f"{log_prefix} PFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}" + ) action_taken_this_cycle = False except Exception as e_cycle: @@ -391,9 +394,11 @@ class PFChatting: # 直接格式化存储在字典中的浮点数 elapsed formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" timer_strings.append(f"{name}: {formatted_time}") - + if timer_strings: # 如果有有效计时器数据才打印 - logger.debug(f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}") + logger.debug( + f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}" + ) # --- Timer Decrement --- # cycle_duration = time.monotonic() - loop_cycle_start_time @@ -460,7 +465,7 @@ class PFChatting: if tool_result.get("used_tools", False): tool_result_info = tool_result.get("structured_info", {}) logger.debug(f"{log_prefix}[Planner] 规划前工具结果: {tool_result_info}") - + get_mid_memory_id = [ mem["content"] for mem in tool_result_info.get("mid_chat_mem", []) if "content" in mem ] @@ -495,13 +500,10 @@ class PFChatting: "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}}, } - - response = await self.planner_llm._execute_request( endpoint="/chat/completions", payload=payload, prompt=prompt ) - if len(response) == 3: _, _, tool_calls = response if tool_calls and isinstance(tool_calls, list) and len(tool_calls) > 0: @@ -665,7 +667,6 @@ class PFChatting: emoji_anchor = first_bot_msg if first_bot_msg else anchor_message await self._handle_emoji(emoji_anchor, response_set, send_emoji) - else: # logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。") # 无需清理,因为_send_response_messages返回None意味着已处理/已删除 @@ -701,9 +702,7 @@ class PFChatting: async def _build_planner_prompt(self, observed_messages_str: str, current_mind: Optional[str]) -> str: """构建 Planner LLM 的提示词""" - prompt = ( - f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生,正在QQ聊天,正在决定是否以及如何回应当前的聊天。\n" - ) + prompt = f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生,正在QQ聊天,正在决定是否以及如何回应当前的聊天。\n" if observed_messages_str: prompt += "观察到的最新聊天内容如下 (最近的消息在最后):\n---\n" diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py index 8f8bd5020..8105b330f 100644 --- a/src/plugins/person_info/person_info.py +++ b/src/plugins/person_info/person_info.py @@ -425,7 +425,9 @@ class PersonInfoManager: logger.error(f"个人信息推断运行时出错: {str(e)}") logger.exception("详细错误信息:") - async def get_or_create_person(self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None) -> str: + async def get_or_create_person( + self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None + ) -> str: """ 根据 platform 和 user_id 获取 person_id。 如果对应的用户不存在,则使用提供的可选信息创建新用户。 @@ -452,7 +454,7 @@ class PersonInfoManager: "platform": platform, "user_id": user_id, "nickname": nickname, - "konw_time": int(datetime.datetime.now().timestamp()) # 添加初次认识时间 + "konw_time": int(datetime.datetime.now().timestamp()), # 添加初次认识时间 # 注意:这里没有添加 user_cardname 和 user_avatar,因为它们不在 person_info_default 中 # 如果需要存储它们,需要先在 person_info_default 中定义 } diff --git a/src/plugins/utils/chat_message_builder.py b/src/plugins/utils/chat_message_builder.py index ac031d378..66f0776c8 100644 --- a/src/plugins/utils/chat_message_builder.py +++ b/src/plugins/utils/chat_message_builder.py @@ -1,14 +1,16 @@ from src.config.config import global_config + # 不再直接使用 db -# from src.common.database import db +# from src.common.database import db # 移除 logger 和 traceback,因为错误处理移至 repository # from src.common.logger import get_module_logger # import traceback -from typing import List, Dict, Any, Tuple # 确保类型提示被导入 -import time # 导入 time 模块以获取当前时间 +from typing import List, Dict, Any, Tuple # 确保类型提示被导入 +import time # 导入 time 模块以获取当前时间 # 导入新的 repository 函数 from src.common.message_repository import find_messages, count_messages + # 导入 PersonInfoManager 和时间转换工具 from src.plugins.person_info.person_info import person_info_manager from src.plugins.chat.utils import translate_timestamp_to_human_readable @@ -16,7 +18,10 @@ from src.plugins.chat.utils import translate_timestamp_to_human_readable # 不再需要文件级别的 logger # logger = get_module_logger(__name__) -def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]: + +def get_raw_msg_by_timestamp( + timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest" +) -> List[Dict[str, Any]]: """ 获取从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 @@ -24,62 +29,83 @@ def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit """ filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}} # 只有当 limit 为 0 时才应用外部 sort - sort_order = [('time', 1)] if limit == 0 else None + sort_order = [("time", 1)] if limit == 0 else None return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) -def get_raw_msg_by_timestamp_with_chat(chat_id: str, timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]: + +def get_raw_msg_by_timestamp_with_chat( + chat_id: str, timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest" +) -> List[Dict[str, Any]]: """获取在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 """ filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}} # 只有当 limit 为 0 时才应用外部 sort - sort_order = [('time', 1)] if limit == 0 else None + sort_order = [("time", 1)] if limit == 0 else None # 直接将 limit_mode 传递给 find_messages return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) -def get_raw_msg_by_timestamp_with_chat_users(chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]: + +def get_raw_msg_by_timestamp_with_chat_users( + chat_id: str, + timestamp_start: float, + timestamp_end: float, + person_ids: list, + limit: int = 0, + limit_mode: str = "latest", +) -> List[Dict[str, Any]]: """获取某些特定用户在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 """ - filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} + filter_query = { + "chat_id": chat_id, + "time": {"$gt": timestamp_start, "$lt": timestamp_end}, + "user_id": {"$in": person_ids}, + } # 只有当 limit 为 0 时才应用外部 sort - sort_order = [('time', 1)] if limit == 0 else None + sort_order = [("time", 1)] if limit == 0 else None return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) -def get_raw_msg_by_timestamp_with_users(timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]: + +def get_raw_msg_by_timestamp_with_users( + timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest" +) -> List[Dict[str, Any]]: """获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'。 """ filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} # 只有当 limit 为 0 时才应用外部 sort - sort_order = [('time', 1)] if limit == 0 else None + sort_order = [("time", 1)] if limit == 0 else None return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) + def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[Dict[str, Any]]: """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 """ filter_query = {"time": {"$lt": timestamp}} - sort_order = [('time', 1)] + sort_order = [("time", 1)] return find_messages(filter=filter_query, sort=sort_order, limit=limit) + def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[Dict[str, Any]]: """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 """ filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}} - sort_order = [('time', 1)] + sort_order = [("time", 1)] return find_messages(filter=filter_query, sort=sort_order, limit=limit) + def get_raw_msg_before_timestamp_with_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]: """获取指定时间戳之前的消息,按时间升序排序,返回消息列表 limit: 限制返回的消息数量,0为不限制 """ filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}} - sort_order = [('time', 1)] + sort_order = [("time", 1)] return find_messages(filter=filter_query, sort=sort_order, limit=limit) @@ -94,23 +120,31 @@ def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp # 确保 timestamp_start < _timestamp_end if timestamp_start >= _timestamp_end: # logger.warning(f"timestamp_start ({timestamp_start}) must be less than _timestamp_end ({_timestamp_end}). Returning 0.") - return 0 # 起始时间大于等于结束时间,没有新消息 + return 0 # 起始时间大于等于结束时间,没有新消息 filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}} return count_messages(filter=filter_query) -def num_new_messages_since_with_users(chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list) -> int: + +def num_new_messages_since_with_users( + chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list +) -> int: """检查某些特定用户在特定聊天在指定时间戳之间有多少新消息""" - if not person_ids: # 保持空列表检查 + if not person_ids: # 保持空列表检查 return 0 - filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} + filter_query = { + "chat_id": chat_id, + "time": {"$gt": timestamp_start, "$lt": timestamp_end}, + "user_id": {"$in": person_ids}, + } return count_messages(filter=filter_query) + async def _build_readable_messages_internal( messages: List[Dict[str, Any]], replace_bot_name: bool = True, merge_messages: bool = False, - timestamp_mode: str = "relative" # 新增参数控制时间戳格式 + timestamp_mode: str = "relative", # 新增参数控制时间戳格式 ) -> Tuple[str, List[Tuple[float, str, str]]]: """ 内部辅助函数,构建可读消息字符串和原始消息详情列表。 @@ -136,7 +170,7 @@ async def _build_readable_messages_internal( user_id = user_info.get("user_id") user_nickname = user_info.get("nickname") timestamp = msg.get("time") - content = msg.get("processed_plain_text", "") # 默认空字符串 + content = msg.get("processed_plain_text", "") # 默认空字符串 # 检查必要信息是否存在 if not all([platform, user_id, timestamp is not None]): @@ -158,7 +192,7 @@ async def _build_readable_messages_internal( if not message_details: return "", [] - + message_details.sort(key=lambda x: x[0]) # 按时间戳(第一个元素)升序排序,越早的消息排在前面 # 3: 合并连续消息 (如果 merge_messages 为 True) @@ -169,7 +203,7 @@ async def _build_readable_messages_internal( "name": message_details[0][1], "start_time": message_details[0][0], "end_time": message_details[0][0], - "content": [message_details[0][2]] + "content": [message_details[0][2]], } for i in range(1, len(message_details)): @@ -177,27 +211,24 @@ async def _build_readable_messages_internal( # 如果是同一个人发送的连续消息且时间间隔小于等于60秒 if name == current_merge["name"] and (timestamp - current_merge["end_time"] <= 60): current_merge["content"].append(content) - current_merge["end_time"] = timestamp # 更新最后消息时间 + current_merge["end_time"] = timestamp # 更新最后消息时间 else: # 保存上一个合并块 merged_messages.append(current_merge) # 开始新的合并块 - current_merge = { - "name": name, - "start_time": timestamp, - "end_time": timestamp, - "content": [content] - } + current_merge = {"name": name, "start_time": timestamp, "end_time": timestamp, "content": [content]} # 添加最后一个合并块 merged_messages.append(current_merge) - elif message_details: # 如果不合并消息,则每个消息都是一个独立的块 + elif message_details: # 如果不合并消息,则每个消息都是一个独立的块 for timestamp, name, content in message_details: - merged_messages.append({ - "name": name, - "start_time": timestamp, # 起始和结束时间相同 - "end_time": timestamp, - "content": [content] # 内容只有一个元素 - }) + merged_messages.append( + { + "name": name, + "start_time": timestamp, # 起始和结束时间相同 + "end_time": timestamp, + "content": [content], # 内容只有一个元素 + } + ) # 4 & 5: 格式化为字符串 output_lines = [] @@ -220,11 +251,12 @@ async def _build_readable_messages_internal( # 返回格式化后的字符串和原始的 message_details 列表 return formatted_string, message_details + async def build_readable_messages_with_list( messages: List[Dict[str, Any]], replace_bot_name: bool = True, merge_messages: bool = False, - timestamp_mode: str = "relative" + timestamp_mode: str = "relative", ) -> Tuple[str, List[Tuple[float, str, str]]]: """ 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。 @@ -235,11 +267,12 @@ async def build_readable_messages_with_list( ) return formatted_string, details_list + async def build_readable_messages( messages: List[Dict[str, Any]], replace_bot_name: bool = True, merge_messages: bool = False, - timestamp_mode: str = "relative" + timestamp_mode: str = "relative", ) -> str: """ 将消息列表转换为可读的文本格式。 @@ -249,7 +282,3 @@ async def build_readable_messages( messages, replace_bot_name, merge_messages, timestamp_mode ) return formatted_string - - - -