ruff:format

This commit is contained in:
SengokuCola
2025-04-21 00:44:28 +08:00
parent 4223f054cc
commit 67c8beb558
9 changed files with 159 additions and 122 deletions

View File

@@ -5,7 +5,10 @@ from typing import List, Dict, Any, Optional
logger = get_module_logger(__name__) logger = get_module_logger(__name__)
def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] = None, limit: int = 0, limit_mode: str = 'latest') -> List[Dict[str, Any]]:
def find_messages(
filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]] = None, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
""" """
根据提供的过滤器、排序和限制条件查找消息。 根据提供的过滤器、排序和限制条件查找消息。
@@ -23,17 +26,17 @@ def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]]
results: List[Dict[str, Any]] = [] results: List[Dict[str, Any]] = []
if limit > 0: if limit > 0:
if limit_mode == 'earliest': if limit_mode == "earliest":
# 获取时间最早的 limit 条记录,已经是正序 # 获取时间最早的 limit 条记录,已经是正序
query = query.sort([('time', 1)]).limit(limit) query = query.sort([("time", 1)]).limit(limit)
results = list(query) results = list(query)
else: # 默认为 'latest' else: # 默认为 'latest'
# 获取时间最晚的 limit 条记录 # 获取时间最晚的 limit 条记录
query = query.sort([('time', -1)]).limit(limit) query = query.sort([("time", -1)]).limit(limit)
latest_results = list(query) latest_results = list(query)
# 将结果按时间正序排列 # 将结果按时间正序排列
# 假设消息文档中总是有 'time' 字段且可排序 # 假设消息文档中总是有 'time' 字段且可排序
results = sorted(latest_results, key=lambda msg: msg.get('time')) results = sorted(latest_results, key=lambda msg: msg.get("time"))
else: else:
# limit 为 0 时,应用传入的 sort 参数 # limit 为 0 时,应用传入的 sort 参数
if sort: if sort:
@@ -42,10 +45,14 @@ def find_messages(filter: Dict[str, Any], sort: Optional[List[tuple[str, int]]]
return results return results
except Exception as e: except Exception as e:
log_message = f"查找消息失败 (filter={filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n" + traceback.format_exc() log_message = (
f"查找消息失败 (filter={filter}, sort={sort}, limit={limit}, limit_mode={limit_mode}): {e}\n"
+ traceback.format_exc()
)
logger.error(log_message) logger.error(log_message)
return [] return []
def count_messages(filter: Dict[str, Any]) -> int: def count_messages(filter: Dict[str, Any]) -> int:
""" """
根据提供的过滤器计算消息数量。 根据提供的过滤器计算消息数量。
@@ -64,4 +71,5 @@ def count_messages(filter: Dict[str, Any]) -> int:
logger.error(log_message) logger.error(log_message)
return 0 return 0
# 你可以在这里添加更多与 messages 集合相关的数据库操作函数,例如 find_one_message, insert_message 等。 # 你可以在这里添加更多与 messages 集合相关的数据库操作函数,例如 find_one_message, insert_message 等。

View File

@@ -5,7 +5,12 @@ from src.plugins.models.utils_model import LLMRequest
from src.config.config import global_config from src.config.config import global_config
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
import traceback import traceback
from src.plugins.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages,get_raw_msg_by_timestamp_with_chat,num_new_messages_since from src.plugins.utils.chat_message_builder import (
get_raw_msg_before_timestamp_with_chat,
build_readable_messages,
get_raw_msg_by_timestamp_with_chat,
num_new_messages_since,
)
logger = get_module_logger("observation") logger = get_module_logger("observation")
@@ -46,7 +51,6 @@ class ChattingObservation(Observation):
self.talking_message = initial_messages # 将这些消息设为初始上下文 self.talking_message = initial_messages # 将这些消息设为初始上下文
self.talking_message_str = await build_readable_messages(self.talking_message) self.talking_message_str = await build_readable_messages(self.talking_message)
# 进行一次观察 返回观察结果observe_info # 进行一次观察 返回观察结果observe_info
def get_observe_info(self, ids=None): def get_observe_info(self, ids=None):
if ids: if ids:
@@ -81,7 +85,7 @@ class ChattingObservation(Observation):
timestamp_start=self.last_observe_time, timestamp_start=self.last_observe_time,
timestamp_end=datetime.now().timestamp(), # 使用当前时间作为结束时间戳 timestamp_end=datetime.now().timestamp(), # 使用当前时间作为结束时间戳
limit=self.max_now_obs_len, limit=self.max_now_obs_len,
limit_mode="latest" limit_mode="latest",
) )
print(f"2222222222222222221111111111111111获取到新消息{len(new_messages_list)}") print(f"2222222222222222221111111111111111获取到新消息{len(new_messages_list)}")
if new_messages_list: # 检查列表是否为空 if new_messages_list: # 检查列表是否为空
@@ -97,7 +101,9 @@ class ChattingObservation(Observation):
oldest_messages_str = await build_readable_messages(oldest_messages) oldest_messages_str = await build_readable_messages(oldest_messages)
# 调用 LLM 总结主题 # 调用 LLM 总结主题
prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n用一句话概括包括人物事件和主要信息,不要分点:" prompt = (
f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n用一句话概括包括人物事件和主要信息,不要分点:"
)
summary = "没有主题的闲聊" # 默认值 summary = "没有主题的闲聊" # 默认值
try: try:
summary_result, _ = await self.llm_summary.generate_response_async(prompt) summary_result, _ = await self.llm_summary.generate_response_async(prompt)

View File

@@ -121,7 +121,6 @@ class SubHeartflow:
logger.error(f"[{self.subheartflow_id}] Error during pre-thinking observation: {e}") logger.error(f"[{self.subheartflow_id}] Error during pre-thinking observation: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
async def do_thinking_before_reply( async def do_thinking_before_reply(
self, self,
extra_info: str, extra_info: str,
@@ -177,7 +176,6 @@ class SubHeartflow:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 创建局部Random对象避免影响全局随机状态 # 创建局部Random对象避免影响全局随机状态
local_random = random.Random() local_random = random.Random()
current_minute = int(time.strftime("%M")) current_minute = int(time.strftime("%M"))
@@ -187,16 +185,13 @@ class SubHeartflow:
("继续生成你在这个聊天中的想法,在原来想法的基础上继续思考", 0.7), ("继续生成你在这个聊天中的想法,在原来想法的基础上继续思考", 0.7),
("生成你在这个聊天中的想法,在原来的想法上尝试新的话题", 0.1), ("生成你在这个聊天中的想法,在原来的想法上尝试新的话题", 0.1),
("生成你在这个聊天中的想法,不要太深入", 0.1), ("生成你在这个聊天中的想法,不要太深入", 0.1),
("继续生成你在这个聊天中的想法,进行深入思考", 0.1) ("继续生成你在这个聊天中的想法,进行深入思考", 0.1),
] ]
hf_do_next = local_random.choices( hf_do_next = local_random.choices(
[option[0] for option in hf_options], [option[0] for option in hf_options], weights=[option[1] for option in hf_options], k=1
weights=[option[1] for option in hf_options],
k=1
)[0] )[0]
prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_before")).format( prompt = (await global_prompt_manager.get_prompt_async("sub_heartflow_prompt_before")).format(
extra_info=extra_info_prompt, extra_info=extra_info_prompt,
# relation_prompt_all=relation_prompt_all, # relation_prompt_all=relation_prompt_all,
@@ -235,7 +230,6 @@ class SubHeartflow:
# logger.info(f"[{self.subheartflow_id}] 思考前脑内状态:{self.current_mind}") # logger.info(f"[{self.subheartflow_id}] 思考前脑内状态:{self.current_mind}")
return self.current_mind, self.past_mind return self.current_mind, self.past_mind
def update_current_mind(self, response): def update_current_mind(self, response):
self.past_mind.append(self.current_mind) self.past_mind.append(self.current_mind)
self.current_mind = response self.current_mind = response

View File

@@ -23,7 +23,7 @@ logger = get_module_logger("chat_utils")
def is_english_letter(char: str) -> bool: def is_english_letter(char: str) -> bool:
"""检查字符是否为英文字母(忽略大小写)""" """检查字符是否为英文字母(忽略大小写)"""
return 'a' <= char.lower() <= 'z' return "a" <= char.lower() <= "z"
def db_message_to_str(message_dict: Dict) -> str: def db_message_to_str(message_dict: Dict) -> str:
@@ -233,7 +233,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
List[str]: 分割和合并后的句子列表 List[str]: 分割和合并后的句子列表
""" """
# 处理两个汉字中间的换行符 # 处理两个汉字中间的换行符
text = re.sub(r'([\u4e00-\u9fff])\n([\u4e00-\u9fff])', r'\1。\2', text) text = re.sub(r"([\u4e00-\u9fff])\n([\u4e00-\u9fff])", r"\1。\2", text)
len_text = len(text) len_text = len(text)
if len_text < 3: if len_text < 3:
@@ -243,7 +243,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
return [text] return [text]
# 定义分隔符 # 定义分隔符
separators = {'', ',', ' ', '', ';'} separators = {"", ",", " ", "", ";"}
segments = [] segments = []
current_segment = "" current_segment = ""
@@ -255,8 +255,8 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
# 检查分割条件:如果分隔符左右都是英文字母,则不分割 # 检查分割条件:如果分隔符左右都是英文字母,则不分割
can_split = True can_split = True
if i > 0 and i < len(text) - 1: if i > 0 and i < len(text) - 1:
prev_char = text[i-1] prev_char = text[i - 1]
next_char = text[i+1] next_char = text[i + 1]
# if is_english_letter(prev_char) and is_english_letter(next_char) and char == ' ': # 原计划只对空格应用此规则,现应用于所有分隔符 # if is_english_letter(prev_char) and is_english_letter(next_char) and char == ' ': # 原计划只对空格应用此规则,现应用于所有分隔符
if is_english_letter(prev_char) and is_english_letter(next_char): if is_english_letter(prev_char) and is_english_letter(next_char):
can_split = False can_split = False
@@ -266,7 +266,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
if current_segment: if current_segment:
segments.append((current_segment, char)) segments.append((current_segment, char))
# 如果当前段为空,但分隔符是空格,则也添加一个空段(保留空格) # 如果当前段为空,但分隔符是空格,则也添加一个空段(保留空格)
elif char == ' ': elif char == " ":
segments.append(("", char)) segments.append(("", char))
current_segment = "" current_segment = ""
else: else:
@@ -307,7 +307,7 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
# 检查是否可以与下一段合并 # 检查是否可以与下一段合并
# 条件:不是最后一段,且随机数小于合并概率,且当前段有内容(避免合并空段) # 条件:不是最后一段,且随机数小于合并概率,且当前段有内容(避免合并空段)
if idx + 1 < len(segments) and random.random() < merge_probability and current_content: if idx + 1 < len(segments) and random.random() < merge_probability and current_content:
next_content, next_sep = segments[idx+1] next_content, next_sep = segments[idx + 1]
# 合并: (内容1 + 分隔符1 + 内容2, 分隔符2) # 合并: (内容1 + 分隔符1 + 内容2, 分隔符2)
# 只有当下一段也有内容时才合并文本,否则只传递分隔符 # 只有当下一段也有内容时才合并文本,否则只传递分隔符
if next_content: if next_content:

View File

@@ -177,8 +177,7 @@ class HeartFC_Processor:
message.message_info.platform, message.message_info.platform,
message.message_info.user_info.user_id, message.message_info.user_info.user_id,
message.message_info.user_info.user_nickname, message.message_info.user_info.user_nickname,
message.message_info.user_info.user_cardname message.message_info.user_info.user_cardname or message.message_info.user_info.user_nickname,
or message.message_info.user_info.user_nickname,
"", "",
) )
else: else:

View File

@@ -86,7 +86,6 @@ class PFChatting:
request_type="action_planning", request_type="action_planning",
) )
# Internal state for loop control # Internal state for loop control
self._loop_timer: float = 0.0 # Remaining time for the loop in seconds self._loop_timer: float = 0.0 # Remaining time for the loop in seconds
self._loop_active: bool = False # Is the loop currently running? self._loop_active: bool = False # Is the loop currently running?
@@ -284,7 +283,7 @@ class PFChatting:
replier_result = await self._replier_work( replier_result = await self._replier_work(
anchor_message=anchor_message, anchor_message=anchor_message,
thinking_id=thinking_id, thinking_id=thinking_id,
reason = reasoning, reason=reasoning,
) )
except Exception as e_replier: except Exception as e_replier:
logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}") logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}")
@@ -309,7 +308,9 @@ class PFChatting:
logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.") logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.")
self._cleanup_thinking_message(thinking_id) self._cleanup_thinking_message(thinking_id)
elif action == "emoji_reply": elif action == "emoji_reply":
logger.info(f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}") logger.info(
f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}"
)
action_taken_this_cycle = True action_taken_this_cycle = True
anchor = await self._get_anchor_message(observed_messages) anchor = await self._get_anchor_message(observed_messages)
if anchor: if anchor:
@@ -368,7 +369,9 @@ class PFChatting:
action_taken_this_cycle = False action_taken_this_cycle = False
else: # Unknown action from planner else: # Unknown action from planner
logger.warning(f"{log_prefix} PFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}") logger.warning(
f"{log_prefix} PFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}"
)
action_taken_this_cycle = False action_taken_this_cycle = False
except Exception as e_cycle: except Exception as e_cycle:
@@ -393,7 +396,9 @@ class PFChatting:
timer_strings.append(f"{name}: {formatted_time}") timer_strings.append(f"{name}: {formatted_time}")
if timer_strings: # 如果有有效计时器数据才打印 if timer_strings: # 如果有有效计时器数据才打印
logger.debug(f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}") logger.debug(
f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}"
)
# --- Timer Decrement --- # # --- Timer Decrement --- #
cycle_duration = time.monotonic() - loop_cycle_start_time cycle_duration = time.monotonic() - loop_cycle_start_time
@@ -495,13 +500,10 @@ class PFChatting:
"tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}}, "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}},
} }
response = await self.planner_llm._execute_request( response = await self.planner_llm._execute_request(
endpoint="/chat/completions", payload=payload, prompt=prompt endpoint="/chat/completions", payload=payload, prompt=prompt
) )
if len(response) == 3: if len(response) == 3:
_, _, tool_calls = response _, _, tool_calls = response
if tool_calls and isinstance(tool_calls, list) and len(tool_calls) > 0: if tool_calls and isinstance(tool_calls, list) and len(tool_calls) > 0:
@@ -665,7 +667,6 @@ class PFChatting:
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
await self._handle_emoji(emoji_anchor, response_set, send_emoji) await self._handle_emoji(emoji_anchor, response_set, send_emoji)
else: else:
# logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。") # logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。")
# 无需清理因为_send_response_messages返回None意味着已处理/已删除 # 无需清理因为_send_response_messages返回None意味着已处理/已删除
@@ -701,9 +702,7 @@ class PFChatting:
async def _build_planner_prompt(self, observed_messages_str: str, current_mind: Optional[str]) -> str: async def _build_planner_prompt(self, observed_messages_str: str, current_mind: Optional[str]) -> str:
"""构建 Planner LLM 的提示词""" """构建 Planner LLM 的提示词"""
prompt = ( prompt = f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生正在QQ聊天正在决定是否以及如何回应当前的聊天。\n"
f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生正在QQ聊天正在决定是否以及如何回应当前的聊天。\n"
)
if observed_messages_str: if observed_messages_str:
prompt += "观察到的最新聊天内容如下 (最近的消息在最后)\n---\n" prompt += "观察到的最新聊天内容如下 (最近的消息在最后)\n---\n"

View File

@@ -425,7 +425,9 @@ class PersonInfoManager:
logger.error(f"个人信息推断运行时出错: {str(e)}") logger.error(f"个人信息推断运行时出错: {str(e)}")
logger.exception("详细错误信息:") logger.exception("详细错误信息:")
async def get_or_create_person(self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None) -> str: async def get_or_create_person(
self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None
) -> str:
""" """
根据 platform 和 user_id 获取 person_id。 根据 platform 和 user_id 获取 person_id。
如果对应的用户不存在,则使用提供的可选信息创建新用户。 如果对应的用户不存在,则使用提供的可选信息创建新用户。
@@ -452,7 +454,7 @@ class PersonInfoManager:
"platform": platform, "platform": platform,
"user_id": user_id, "user_id": user_id,
"nickname": nickname, "nickname": nickname,
"konw_time": int(datetime.datetime.now().timestamp()) # 添加初次认识时间 "konw_time": int(datetime.datetime.now().timestamp()), # 添加初次认识时间
# 注意:这里没有添加 user_cardname 和 user_avatar因为它们不在 person_info_default 中 # 注意:这里没有添加 user_cardname 和 user_avatar因为它们不在 person_info_default 中
# 如果需要存储它们,需要先在 person_info_default 中定义 # 如果需要存储它们,需要先在 person_info_default 中定义
} }

View File

@@ -1,4 +1,5 @@
from src.config.config import global_config from src.config.config import global_config
# 不再直接使用 db # 不再直接使用 db
# from src.common.database import db # from src.common.database import db
# 移除 logger 和 traceback因为错误处理移至 repository # 移除 logger 和 traceback因为错误处理移至 repository
@@ -9,6 +10,7 @@ import time # 导入 time 模块以获取当前时间
# 导入新的 repository 函数 # 导入新的 repository 函数
from src.common.message_repository import find_messages, count_messages from src.common.message_repository import find_messages, count_messages
# 导入 PersonInfoManager 和时间转换工具 # 导入 PersonInfoManager 和时间转换工具
from src.plugins.person_info.person_info import person_info_manager from src.plugins.person_info.person_info import person_info_manager
from src.plugins.chat.utils import translate_timestamp_to_human_readable from src.plugins.chat.utils import translate_timestamp_to_human_readable
@@ -16,7 +18,10 @@ from src.plugins.chat.utils import translate_timestamp_to_human_readable
# 不再需要文件级别的 logger # 不再需要文件级别的 logger
# logger = get_module_logger(__name__) # logger = get_module_logger(__name__)
def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]:
def get_raw_msg_by_timestamp(
timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
""" """
获取从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 获取从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
@@ -24,62 +29,83 @@ def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit
""" """
filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}} filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}}
# 只有当 limit 为 0 时才应用外部 sort # 只有当 limit 为 0 时才应用外部 sort
sort_order = [('time', 1)] if limit == 0 else None sort_order = [("time", 1)] if limit == 0 else None
return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_by_timestamp_with_chat(chat_id: str, timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]:
def get_raw_msg_by_timestamp_with_chat(
chat_id: str, timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
"""获取在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 """获取在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest' limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
""" """
filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}} filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}}
# 只有当 limit 为 0 时才应用外部 sort # 只有当 limit 为 0 时才应用外部 sort
sort_order = [('time', 1)] if limit == 0 else None sort_order = [("time", 1)] if limit == 0 else None
# 直接将 limit_mode 传递给 find_messages # 直接将 limit_mode 传递给 find_messages
return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_by_timestamp_with_chat_users(chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]:
def get_raw_msg_by_timestamp_with_chat_users(
chat_id: str,
timestamp_start: float,
timestamp_end: float,
person_ids: list,
limit: int = 0,
limit_mode: str = "latest",
) -> List[Dict[str, Any]]:
"""获取某些特定用户在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 """获取某些特定用户在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest' limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
""" """
filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} filter_query = {
"chat_id": chat_id,
"time": {"$gt": timestamp_start, "$lt": timestamp_end},
"user_id": {"$in": person_ids},
}
# 只有当 limit 为 0 时才应用外部 sort # 只有当 limit 为 0 时才应用外部 sort
sort_order = [('time', 1)] if limit == 0 else None sort_order = [("time", 1)] if limit == 0 else None
return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_by_timestamp_with_users(timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest") -> List[Dict[str, Any]]:
def get_raw_msg_by_timestamp_with_users(
timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
"""获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表 """获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest' limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
""" """
filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} filter_query = {"time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}}
# 只有当 limit 为 0 时才应用外部 sort # 只有当 limit 为 0 时才应用外部 sort
sort_order = [('time', 1)] if limit == 0 else None sort_order = [("time", 1)] if limit == 0 else None
return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode) return find_messages(filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[Dict[str, Any]]: def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表 """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
""" """
filter_query = {"time": {"$lt": timestamp}} filter_query = {"time": {"$lt": timestamp}}
sort_order = [('time', 1)] sort_order = [("time", 1)]
return find_messages(filter=filter_query, sort=sort_order, limit=limit) return find_messages(filter=filter_query, sort=sort_order, limit=limit)
def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[Dict[str, Any]]: def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表 """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
""" """
filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}} filter_query = {"chat_id": chat_id, "time": {"$lt": timestamp}}
sort_order = [('time', 1)] sort_order = [("time", 1)]
return find_messages(filter=filter_query, sort=sort_order, limit=limit) return find_messages(filter=filter_query, sort=sort_order, limit=limit)
def get_raw_msg_before_timestamp_with_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]: def get_raw_msg_before_timestamp_with_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表 """获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制 limit: 限制返回的消息数量0为不限制
""" """
filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}} filter_query = {"time": {"$lt": timestamp}, "user_id": {"$in": person_ids}}
sort_order = [('time', 1)] sort_order = [("time", 1)]
return find_messages(filter=filter_query, sort=sort_order, limit=limit) return find_messages(filter=filter_query, sort=sort_order, limit=limit)
@@ -99,18 +125,26 @@ def num_new_messages_since(chat_id: str, timestamp_start: float = 0.0, timestamp
filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}} filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": _timestamp_end}}
return count_messages(filter=filter_query) return count_messages(filter=filter_query)
def num_new_messages_since_with_users(chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list) -> int:
def num_new_messages_since_with_users(
chat_id: str, timestamp_start: float, timestamp_end: float, person_ids: list
) -> int:
"""检查某些特定用户在特定聊天在指定时间戳之间有多少新消息""" """检查某些特定用户在特定聊天在指定时间戳之间有多少新消息"""
if not person_ids: # 保持空列表检查 if not person_ids: # 保持空列表检查
return 0 return 0
filter_query = {"chat_id": chat_id, "time": {"$gt": timestamp_start, "$lt": timestamp_end}, "user_id": {"$in": person_ids}} filter_query = {
"chat_id": chat_id,
"time": {"$gt": timestamp_start, "$lt": timestamp_end},
"user_id": {"$in": person_ids},
}
return count_messages(filter=filter_query) return count_messages(filter=filter_query)
async def _build_readable_messages_internal( async def _build_readable_messages_internal(
messages: List[Dict[str, Any]], messages: List[Dict[str, Any]],
replace_bot_name: bool = True, replace_bot_name: bool = True,
merge_messages: bool = False, merge_messages: bool = False,
timestamp_mode: str = "relative" # 新增参数控制时间戳格式 timestamp_mode: str = "relative", # 新增参数控制时间戳格式
) -> Tuple[str, List[Tuple[float, str, str]]]: ) -> Tuple[str, List[Tuple[float, str, str]]]:
""" """
内部辅助函数,构建可读消息字符串和原始消息详情列表。 内部辅助函数,构建可读消息字符串和原始消息详情列表。
@@ -169,7 +203,7 @@ async def _build_readable_messages_internal(
"name": message_details[0][1], "name": message_details[0][1],
"start_time": message_details[0][0], "start_time": message_details[0][0],
"end_time": message_details[0][0], "end_time": message_details[0][0],
"content": [message_details[0][2]] "content": [message_details[0][2]],
} }
for i in range(1, len(message_details)): for i in range(1, len(message_details)):
@@ -182,22 +216,19 @@ async def _build_readable_messages_internal(
# 保存上一个合并块 # 保存上一个合并块
merged_messages.append(current_merge) merged_messages.append(current_merge)
# 开始新的合并块 # 开始新的合并块
current_merge = { current_merge = {"name": name, "start_time": timestamp, "end_time": timestamp, "content": [content]}
"name": name,
"start_time": timestamp,
"end_time": timestamp,
"content": [content]
}
# 添加最后一个合并块 # 添加最后一个合并块
merged_messages.append(current_merge) merged_messages.append(current_merge)
elif message_details: # 如果不合并消息,则每个消息都是一个独立的块 elif message_details: # 如果不合并消息,则每个消息都是一个独立的块
for timestamp, name, content in message_details: for timestamp, name, content in message_details:
merged_messages.append({ merged_messages.append(
{
"name": name, "name": name,
"start_time": timestamp, # 起始和结束时间相同 "start_time": timestamp, # 起始和结束时间相同
"end_time": timestamp, "end_time": timestamp,
"content": [content] # 内容只有一个元素 "content": [content], # 内容只有一个元素
}) }
)
# 4 & 5: 格式化为字符串 # 4 & 5: 格式化为字符串
output_lines = [] output_lines = []
@@ -220,11 +251,12 @@ async def _build_readable_messages_internal(
# 返回格式化后的字符串和原始的 message_details 列表 # 返回格式化后的字符串和原始的 message_details 列表
return formatted_string, message_details return formatted_string, message_details
async def build_readable_messages_with_list( async def build_readable_messages_with_list(
messages: List[Dict[str, Any]], messages: List[Dict[str, Any]],
replace_bot_name: bool = True, replace_bot_name: bool = True,
merge_messages: bool = False, merge_messages: bool = False,
timestamp_mode: str = "relative" timestamp_mode: str = "relative",
) -> Tuple[str, List[Tuple[float, str, str]]]: ) -> Tuple[str, List[Tuple[float, str, str]]]:
""" """
将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。 将消息列表转换为可读的文本格式,并返回原始(时间戳, 昵称, 内容)列表。
@@ -235,11 +267,12 @@ async def build_readable_messages_with_list(
) )
return formatted_string, details_list return formatted_string, details_list
async def build_readable_messages( async def build_readable_messages(
messages: List[Dict[str, Any]], messages: List[Dict[str, Any]],
replace_bot_name: bool = True, replace_bot_name: bool = True,
merge_messages: bool = False, merge_messages: bool = False,
timestamp_mode: str = "relative" timestamp_mode: str = "relative",
) -> str: ) -> str:
""" """
将消息列表转换为可读的文本格式。 将消息列表转换为可读的文本格式。
@@ -249,7 +282,3 @@ async def build_readable_messages(
messages, replace_bot_name, merge_messages, timestamp_mode messages, replace_bot_name, merge_messages, timestamp_mode
) )
return formatted_string return formatted_string