🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-06-19 12:22:36 +00:00
parent 0467f97e7c
commit 7ed3ecb561
26 changed files with 450 additions and 450 deletions

View File

@@ -15,7 +15,7 @@ from src.plugin_system.apis import (
message_api,
person_api,
send_api,
utils_api
utils_api,
)
# 导出所有API模块使它们可以通过 apis.xxx 方式访问
@@ -29,5 +29,5 @@ __all__ = [
"message_api",
"person_api",
"send_api",
"utils_api"
"utils_api",
]

View File

@@ -6,7 +6,7 @@
from src.plugin_system.apis import chat_api
streams = chat_api.get_all_group_streams()
chat_type = chat_api.get_stream_type(stream)
或者:
from src.plugin_system.apis.chat_api import ChatManager as chat
streams = chat.get_all_group_streams()
@@ -24,14 +24,14 @@ logger = get_logger("chat_api")
class ChatManager:
"""聊天管理器 - 专门负责聊天信息的查询和管理"""
@staticmethod
def get_all_streams(platform: str = "qq") -> List[ChatStream]:
"""获取所有聊天流
Args:
platform: 平台筛选,默认为"qq"
Returns:
List[ChatStream]: 聊天流列表
"""
@@ -44,14 +44,14 @@ class ChatManager:
except Exception as e:
logger.error(f"[ChatAPI] 获取聊天流失败: {e}")
return streams
@staticmethod
def get_group_streams(platform: str = "qq") -> List[ChatStream]:
"""获取所有群聊聊天流
Args:
platform: 平台筛选,默认为"qq"
Returns:
List[ChatStream]: 群聊聊天流列表
"""
@@ -64,14 +64,14 @@ class ChatManager:
except Exception as e:
logger.error(f"[ChatAPI] 获取群聊流失败: {e}")
return streams
@staticmethod
def get_private_streams(platform: str = "qq") -> List[ChatStream]:
"""获取所有私聊聊天流
Args:
platform: 平台筛选,默认为"qq"
Returns:
List[ChatStream]: 私聊聊天流列表
"""
@@ -84,15 +84,15 @@ class ChatManager:
except Exception as e:
logger.error(f"[ChatAPI] 获取私聊流失败: {e}")
return streams
@staticmethod
def get_stream_by_group_id(group_id: str, platform: str = "qq") -> Optional[ChatStream]:
"""根据群ID获取聊天流
Args:
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
Optional[ChatStream]: 聊天流对象如果未找到返回None
"""
@@ -109,15 +109,15 @@ class ChatManager:
except Exception as e:
logger.error(f"[ChatAPI] 查找群聊流失败: {e}")
return None
@staticmethod
def get_stream_by_user_id(user_id: str, platform: str = "qq") -> Optional[ChatStream]:
"""根据用户ID获取私聊流
Args:
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
Optional[ChatStream]: 聊天流对象如果未找到返回None
"""
@@ -134,74 +134,78 @@ class ChatManager:
except Exception as e:
logger.error(f"[ChatAPI] 查找私聊流失败: {e}")
return None
@staticmethod
def get_stream_type(chat_stream: ChatStream) -> str:
"""获取聊天流类型
Args:
chat_stream: 聊天流对象
Returns:
str: 聊天类型 ("group", "private", "unknown")
"""
if not chat_stream:
return "unknown"
if hasattr(chat_stream, "group_info"):
return "group" if chat_stream.group_info else "private"
return "unknown"
@staticmethod
def get_stream_info(chat_stream: ChatStream) -> Dict[str, Any]:
"""获取聊天流详细信息
Args:
chat_stream: 聊天流对象
Returns:
Dict[str, Any]: 聊天流信息字典
"""
if not chat_stream:
return {}
try:
info = {
"stream_id": chat_stream.stream_id,
"platform": chat_stream.platform,
"type": ChatManager.get_stream_type(chat_stream),
}
if chat_stream.group_info:
info.update({
"group_id": chat_stream.group_info.group_id,
"group_name": getattr(chat_stream.group_info, "group_name", "未知群聊"),
})
info.update(
{
"group_id": chat_stream.group_info.group_id,
"group_name": getattr(chat_stream.group_info, "group_name", "未知群聊"),
}
)
if chat_stream.user_info:
info.update({
"user_id": chat_stream.user_info.user_id,
"user_name": chat_stream.user_info.user_nickname,
})
info.update(
{
"user_id": chat_stream.user_info.user_id,
"user_name": chat_stream.user_info.user_nickname,
}
)
return info
except Exception as e:
logger.error(f"[ChatAPI] 获取聊天流信息失败: {e}")
return {}
@staticmethod
def get_recent_messages_from_obs(observations: List[Any], count: int = 5) -> List[Dict[str, Any]]:
"""从观察对象获取最近的消息
Args:
observations: 观察对象列表
count: 要获取的消息数量
Returns:
List[Dict]: 消息列表,每个消息包含发送者、内容等信息
"""
messages = []
try:
if observations and len(observations) > 0:
obs = observations[0]
@@ -219,13 +223,13 @@ class ChatManager:
logger.debug(f"[ChatAPI] 获取到 {len(messages)} 条最近消息")
except Exception as e:
logger.error(f"[ChatAPI] 获取最近消息失败: {e}")
return messages
@staticmethod
def get_streams_summary() -> Dict[str, int]:
"""获取聊天流统计摘要
Returns:
Dict[str, int]: 包含各种统计信息的字典
"""
@@ -233,14 +237,14 @@ class ChatManager:
all_streams = ChatManager.get_all_streams()
group_streams = ChatManager.get_group_streams()
private_streams = ChatManager.get_private_streams()
summary = {
"total_streams": len(all_streams),
"group_streams": len(group_streams),
"private_streams": len(private_streams),
"qq_streams": len([s for s in all_streams if s.platform == "qq"]),
}
logger.debug(f"[ChatAPI] 聊天流统计: {summary}")
return summary
except Exception as e:
@@ -252,6 +256,7 @@ class ChatManager:
# 模块级别的便捷函数 - 类似 requests.get(), requests.post() 的设计
# =============================================================================
def get_all_streams(platform: str = "qq") -> List[ChatStream]:
"""获取所有聊天流的便捷函数"""
return ChatManager.get_all_streams(platform)
@@ -289,4 +294,4 @@ def get_stream_info(chat_stream: ChatStream) -> Dict[str, Any]:
def get_streams_summary() -> Dict[str, int]:
"""获取聊天流统计摘要的便捷函数"""
return ChatManager.get_streams_summary()
return ChatManager.get_streams_summary()

View File

@@ -19,6 +19,7 @@ logger = get_logger("config_api")
# 配置访问API函数
# =============================================================================
def get_global_config(key: str, default: Any = None) -> Any:
"""
安全地从全局配置中获取一个值。
@@ -34,7 +35,7 @@ def get_global_config(key: str, default: Any = None) -> Any:
# 支持嵌套键访问
keys = key.split(".")
current = global_config
try:
for k in keys:
if hasattr(current, k):
@@ -79,6 +80,7 @@ def get_plugin_config(plugin_config: dict, key: str, default: Any = None) -> Any
# 用户信息API函数
# =============================================================================
async def get_user_id_by_person_name(person_name: str) -> tuple[str, str]:
"""根据用户名获取用户ID

View File

@@ -18,6 +18,7 @@ logger = get_logger("database_api")
# 通用数据库查询API函数
# =============================================================================
async def db_query(
model_class: Type[Model],
query_type: str = "get",
@@ -202,9 +203,7 @@ async def db_save(
# 如果提供了key_field和key_value尝试更新现有记录
if key_field and key_value is not None:
# 查找现有记录
existing_records = list(
model_class.select().where(getattr(model_class, key_field) == key_value).limit(1)
)
existing_records = list(model_class.select().where(getattr(model_class, key_field) == key_value).limit(1))
if existing_records:
# 更新现有记录
@@ -307,9 +306,9 @@ async def store_action_info(
action_name: str = "",
) -> Union[Dict[str, Any], None]:
"""存储动作信息到数据库
将Action执行的相关信息保存到ActionRecords表中用于后续的记忆和上下文构建。
Args:
chat_stream: 聊天流对象,包含聊天相关信息
action_build_into_prompt: 是否将此动作构建到提示中
@@ -318,11 +317,11 @@ async def store_action_info(
thinking_id: 关联的思考ID
action_data: 动作数据字典
action_name: 动作名称
Returns:
Dict[str, Any]: 保存的记录数据
None: 如果保存失败
示例:
record = await database_api.store_action_info(
chat_stream=chat_stream,
@@ -338,7 +337,7 @@ async def store_action_info(
import time
import json
from src.common.database.database_model import ActionRecords
# 构建动作记录数据
record_data = {
"action_id": thinking_id or str(int(time.time() * 1000000)), # 使用thinking_id或生成唯一ID
@@ -349,38 +348,39 @@ async def store_action_info(
"action_build_into_prompt": action_build_into_prompt,
"action_prompt_display": action_prompt_display,
}
# 从chat_stream获取聊天信息
if chat_stream:
record_data.update({
"chat_id": getattr(chat_stream, 'stream_id', ''),
"chat_info_stream_id": getattr(chat_stream, 'stream_id', ''),
"chat_info_platform": getattr(chat_stream, 'platform', ''),
})
record_data.update(
{
"chat_id": getattr(chat_stream, "stream_id", ""),
"chat_info_stream_id": getattr(chat_stream, "stream_id", ""),
"chat_info_platform": getattr(chat_stream, "platform", ""),
}
)
else:
# 如果没有chat_stream设置默认值
record_data.update({
"chat_id": "",
"chat_info_stream_id": "",
"chat_info_platform": "",
})
record_data.update(
{
"chat_id": "",
"chat_info_stream_id": "",
"chat_info_platform": "",
}
)
# 使用已有的db_save函数保存记录
saved_record = await db_save(
ActionRecords,
data=record_data,
key_field="action_id",
key_value=record_data["action_id"]
ActionRecords, data=record_data, key_field="action_id", key_value=record_data["action_id"]
)
if saved_record:
logger.info(f"[DatabaseAPI] 成功存储动作信息: {action_name} (ID: {record_data['action_id']})")
else:
logger.error(f"[DatabaseAPI] 存储动作信息失败: {action_name}")
return saved_record
except Exception as e:
logger.error(f"[DatabaseAPI] 存储动作信息时发生错误: {e}")
traceback.print_exc()
return None
return None

View File

@@ -20,35 +20,36 @@ logger = get_logger("emoji_api")
# 表情包获取API函数
# =============================================================================
async def get_by_description(description: str) -> Optional[Tuple[str, str, str]]:
"""根据描述选择表情包
Args:
description: 表情包的描述文本,例如"开心""难过""愤怒"
Returns:
Optional[Tuple[str, str, str]]: (base64编码, 表情包描述, 匹配的情感标签) 或 None
"""
try:
logger.info(f"[EmojiAPI] 根据描述获取表情包: {description}")
emoji_manager = get_emoji_manager()
emoji_result = await emoji_manager.get_emoji_for_text(description)
if not emoji_result:
logger.warning(f"[EmojiAPI] 未找到匹配描述 '{description}' 的表情包")
return None
emoji_path, emoji_description, matched_emotion = emoji_result
emoji_base64 = image_path_to_base64(emoji_path)
if not emoji_base64:
logger.error(f"[EmojiAPI] 无法将表情包文件转换为base64: {emoji_path}")
return None
logger.info(f"[EmojiAPI] 成功获取表情包: {emoji_description}, 匹配情感: {matched_emotion}")
return emoji_base64, emoji_description, matched_emotion
except Exception as e:
logger.error(f"[EmojiAPI] 获取表情包失败: {e}")
return None
@@ -56,43 +57,44 @@ async def get_by_description(description: str) -> Optional[Tuple[str, str, str]]
async def get_random() -> Optional[Tuple[str, str, str]]:
"""随机获取表情包
Returns:
Optional[Tuple[str, str, str]]: (base64编码, 表情包描述, 随机情感标签) 或 None
"""
try:
logger.info("[EmojiAPI] 随机获取表情包")
emoji_manager = get_emoji_manager()
all_emojis = emoji_manager.emoji_objects
if not all_emojis:
logger.warning("[EmojiAPI] 没有可用的表情包")
return None
# 过滤有效表情包
valid_emojis = [emoji for emoji in all_emojis if not emoji.is_deleted]
if not valid_emojis:
logger.warning("[EmojiAPI] 没有有效的表情包")
return None
# 随机选择
import random
selected_emoji = random.choice(valid_emojis)
emoji_base64 = image_path_to_base64(selected_emoji.full_path)
if not emoji_base64:
logger.error(f"[EmojiAPI] 无法转换表情包为base64: {selected_emoji.full_path}")
return None
matched_emotion = random.choice(selected_emoji.emotion) if selected_emoji.emotion else "随机表情"
# 记录使用次数
emoji_manager.record_usage(selected_emoji.hash)
logger.info(f"[EmojiAPI] 成功获取随机表情包: {selected_emoji.description}")
return emoji_base64, selected_emoji.description, matched_emotion
except Exception as e:
logger.error(f"[EmojiAPI] 获取随机表情包失败: {e}")
return None
@@ -100,44 +102,45 @@ async def get_random() -> Optional[Tuple[str, str, str]]:
async def get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]:
"""根据情感标签获取表情包
Args:
emotion: 情感标签,如"happy""sad""angry"
Returns:
Optional[Tuple[str, str, str]]: (base64编码, 表情包描述, 匹配的情感标签) 或 None
"""
try:
logger.info(f"[EmojiAPI] 根据情感获取表情包: {emotion}")
emoji_manager = get_emoji_manager()
all_emojis = emoji_manager.emoji_objects
# 筛选匹配情感的表情包
matching_emojis = []
for emoji_obj in all_emojis:
if not emoji_obj.is_deleted and emotion.lower() in [e.lower() for e in emoji_obj.emotion]:
matching_emojis.append(emoji_obj)
if not matching_emojis:
logger.warning(f"[EmojiAPI] 未找到匹配情感 '{emotion}' 的表情包")
return None
# 随机选择匹配的表情包
import random
selected_emoji = random.choice(matching_emojis)
emoji_base64 = image_path_to_base64(selected_emoji.full_path)
if not emoji_base64:
logger.error(f"[EmojiAPI] 无法转换表情包为base64: {selected_emoji.full_path}")
return None
# 记录使用次数
emoji_manager.record_usage(selected_emoji.hash)
logger.info(f"[EmojiAPI] 成功获取情感表情包: {selected_emoji.description}")
return emoji_base64, selected_emoji.description, emotion
except Exception as e:
logger.error(f"[EmojiAPI] 根据情感获取表情包失败: {e}")
return None
@@ -147,9 +150,10 @@ async def get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]:
# 表情包信息查询API函数
# =============================================================================
def get_count() -> int:
"""获取表情包数量
Returns:
int: 当前可用的表情包数量
"""
@@ -163,7 +167,7 @@ def get_count() -> int:
def get_info() -> dict:
"""获取表情包系统信息
Returns:
dict: 包含表情包数量、最大数量等信息
"""
@@ -181,18 +185,18 @@ def get_info() -> dict:
def get_emotions() -> list:
"""获取所有可用的情感标签
Returns:
list: 所有表情包的情感标签列表(去重)
"""
try:
emoji_manager = get_emoji_manager()
emotions = set()
for emoji_obj in emoji_manager.emoji_objects:
if not emoji_obj.is_deleted and emoji_obj.emotion:
emotions.update(emoji_obj.emotion)
return sorted(list(emotions))
except Exception as e:
logger.error(f"[EmojiAPI] 获取情感标签失败: {e}")
@@ -201,18 +205,18 @@ def get_emotions() -> list:
def get_descriptions() -> list:
"""获取所有表情包描述
Returns:
list: 所有可用表情包的描述列表
"""
try:
emoji_manager = get_emoji_manager()
descriptions = []
for emoji_obj in emoji_manager.emoji_objects:
if not emoji_obj.is_deleted and emoji_obj.description:
descriptions.append(emoji_obj.description)
return descriptions
except Exception as e:
logger.error(f"[EmojiAPI] 获取表情包描述失败: {e}")

View File

@@ -16,22 +16,22 @@ from src.chat.message_receive.chat_stream import get_chat_manager
logger = get_logger("generator_api")
# =============================================================================
# 回复器获取API函数
# =============================================================================
def get_replyer(chat_stream=None, platform: str = None, chat_id: str = None, is_group: bool = True) -> DefaultReplyer:
"""获取回复器对象
优先使用chat_stream如果没有则使用platform和chat_id组合
Args:
chat_stream: 聊天流对象(优先)
platform: 平台名称,如"qq"
chat_id: 聊天ID群ID或用户ID
is_group: 是否为群聊
Returns:
Optional[Any]: 回复器对象如果获取失败则返回None
"""
@@ -40,7 +40,7 @@ def get_replyer(chat_stream=None, platform: str = None, chat_id: str = None, is_
if chat_stream:
logger.debug("[GeneratorAPI] 使用聊天流获取回复器")
return DefaultReplyer(chat_stream=chat_stream)
# 使用平台和ID组合
if platform and chat_id:
logger.debug("[GeneratorAPI] 使用平台和ID获取回复器")
@@ -48,7 +48,7 @@ def get_replyer(chat_stream=None, platform: str = None, chat_id: str = None, is_
if not chat_manager:
logger.warning("[GeneratorAPI] 无法获取聊天管理器")
return None
# 查找对应的聊天流
target_stream = None
for _stream_id, stream in chat_manager.streams.items():
@@ -61,29 +61,31 @@ def get_replyer(chat_stream=None, platform: str = None, chat_id: str = None, is_
if str(stream.user_info.user_id) == str(chat_id):
target_stream = stream
break
return DefaultReplyer(chat_stream=target_stream)
logger.warning("[GeneratorAPI] 缺少必要参数,无法获取回复器")
return None
except Exception as e:
logger.error(f"[GeneratorAPI] 获取回复器失败: {e}")
return None
# =============================================================================
# 回复生成API函数
# =============================================================================
async def generate_reply(
chat_stream=None,
action_data: Dict[str, Any] = None,
platform: str = None,
chat_id: str = None,
is_group: bool = True
is_group: bool = True,
) -> Tuple[bool, List[Tuple[str, Any]]]:
"""生成回复
Args:
chat_stream: 聊天流对象(优先)
action_data: 动作数据
@@ -94,7 +96,7 @@ async def generate_reply(
platform: 平台名称(备用)
chat_id: 聊天ID备用
is_group: 是否为群聊(备用)
Returns:
Tuple[bool, List[Tuple[str, Any]]]: (是否成功, 回复集合)
"""
@@ -104,41 +106,42 @@ async def generate_reply(
if not replyer:
logger.error("[GeneratorAPI] 无法获取回复器")
return False, []
logger.info("[GeneratorAPI] 开始生成回复")
# 调用回复器生成回复
success, reply_set = await replyer.generate_reply_with_context(
reply_data=action_data or {},
)
if success:
logger.info(f"[GeneratorAPI] 回复生成成功,生成了 {len(reply_set)} 个回复项")
else:
logger.warning("[GeneratorAPI] 回复生成失败")
return success, reply_set or []
except Exception as e:
logger.error(f"[GeneratorAPI] 生成回复时出错: {e}")
return False, []
async def rewrite_reply(
chat_stream=None,
reply_data: Dict[str, Any] = None,
platform: str = None,
chat_id: str = None,
is_group: bool = True
is_group: bool = True,
) -> Tuple[bool, List[Tuple[str, Any]]]:
"""重写回复
Args:
chat_stream: 聊天流对象(优先)
action_data: 动作数据
platform: 平台名称(备用)
chat_id: 聊天ID备用
is_group: 是否为群聊(备用)
Returns:
Tuple[bool, List[Tuple[str, Any]]]: (是否成功, 回复集合)
"""
@@ -148,23 +151,21 @@ async def rewrite_reply(
if not replyer:
logger.error("[GeneratorAPI] 无法获取回复器")
return False, []
logger.info("[GeneratorAPI] 开始重写回复")
# 调用回复器重写回复
success, reply_set = await replyer.rewrite_reply_with_context(
reply_data=reply_data or {},
)
if success:
logger.info(f"[GeneratorAPI] 重写回复成功,生成了 {len(reply_set)} 个回复项")
else:
logger.warning("[GeneratorAPI] 重写回复失败")
return success, reply_set or []
except Exception as e:
logger.error(f"[GeneratorAPI] 重写回复时出错: {e}")
return False, []

View File

@@ -19,6 +19,7 @@ logger = get_logger("llm_api")
# LLM模型API函数
# =============================================================================
def get_available_models() -> Dict[str, Any]:
"""获取所有可用的模型配置

View File

@@ -32,18 +32,19 @@ from src.chat.utils.chat_message_builder import (
# 消息查询API函数
# =============================================================================
def get_messages_by_time(
start_time: float, end_time: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
"""
获取指定时间范围内的消息
Args:
start_time: 开始时间戳
end_time: 结束时间戳
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -55,14 +56,14 @@ def get_messages_by_time_in_chat(
) -> List[Dict[str, Any]]:
"""
获取指定聊天中指定时间范围内的消息
Args:
chat_id: 聊天ID
start_time: 开始时间戳
end_time: 结束时间戳
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -74,14 +75,14 @@ def get_messages_by_time_in_chat_inclusive(
) -> List[Dict[str, Any]]:
"""
获取指定聊天中指定时间范围内的消息(包含边界)
Args:
chat_id: 聊天ID
start_time: 开始时间戳(包含)
end_time: 结束时间戳(包含)
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -98,7 +99,7 @@ def get_messages_by_time_in_chat_for_users(
) -> List[Dict[str, Any]]:
"""
获取指定聊天中指定用户在指定时间范围内的消息
Args:
chat_id: 聊天ID
start_time: 开始时间戳
@@ -106,7 +107,7 @@ def get_messages_by_time_in_chat_for_users(
person_ids: 用户ID列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -118,13 +119,13 @@ def get_random_chat_messages(
) -> List[Dict[str, Any]]:
"""
随机选择一个聊天,返回该聊天在指定时间范围内的消息
Args:
start_time: 开始时间戳
end_time: 结束时间戳
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -136,14 +137,14 @@ def get_messages_by_time_for_users(
) -> List[Dict[str, Any]]:
"""
获取指定用户在所有聊天中指定时间范围内的消息
Args:
start_time: 开始时间戳
end_time: 结束时间戳
person_ids: 用户ID列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -153,11 +154,11 @@ def get_messages_by_time_for_users(
def get_messages_before_time(timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
"""
获取指定时间戳之前的消息
Args:
timestamp: 时间戳
limit: 限制返回的消息数量0为不限制
Returns:
消息列表
"""
@@ -167,29 +168,27 @@ def get_messages_before_time(timestamp: float, limit: int = 0) -> List[Dict[str,
def get_messages_before_time_in_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
"""
获取指定聊天中指定时间戳之前的消息
Args:
chat_id: 聊天ID
timestamp: 时间戳
limit: 限制返回的消息数量0为不限制
Returns:
消息列表
"""
return get_raw_msg_before_timestamp_with_chat(chat_id, timestamp, limit)
def get_messages_before_time_for_users(
timestamp: float, person_ids: list, limit: int = 0
) -> List[Dict[str, Any]]:
def get_messages_before_time_for_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]:
"""
获取指定用户在指定时间戳之前的消息
Args:
timestamp: 时间戳
person_ids: 用户ID列表
limit: 限制返回的消息数量0为不限制
Returns:
消息列表
"""
@@ -197,20 +196,17 @@ def get_messages_before_time_for_users(
def get_recent_messages(
chat_id: str,
hours: float = 24.0,
limit: int = 100,
limit_mode: str = "latest"
chat_id: str, hours: float = 24.0, limit: int = 100, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
"""
获取指定聊天中最近一段时间的消息
Args:
chat_id: 聊天ID
hours: 最近多少小时默认24小时
limit: 限制返回的消息数量默认100条
limit_mode: 当limit>0时生效'earliest'表示获取最早的记录,'latest'表示获取最新的记录
Returns:
消息列表
"""
@@ -223,35 +219,32 @@ def get_recent_messages(
# 消息计数API函数
# =============================================================================
def count_new_messages(
chat_id: str, start_time: float = 0.0, end_time: Optional[float] = None
) -> int:
def count_new_messages(chat_id: str, start_time: float = 0.0, end_time: Optional[float] = None) -> int:
"""
计算指定聊天中从开始时间到结束时间的新消息数量
Args:
chat_id: 聊天ID
start_time: 开始时间戳
end_time: 结束时间戳如果为None则使用当前时间
Returns:
新消息数量
"""
return num_new_messages_since(chat_id, start_time, end_time)
def count_new_messages_for_users(
chat_id: str, start_time: float, end_time: float, person_ids: list
) -> int:
def count_new_messages_for_users(chat_id: str, start_time: float, end_time: float, person_ids: list) -> int:
"""
计算指定聊天中指定用户从开始时间到结束时间的新消息数量
Args:
chat_id: 聊天ID
start_time: 开始时间戳
end_time: 结束时间戳
person_ids: 用户ID列表
Returns:
新消息数量
"""
@@ -262,6 +255,7 @@ def count_new_messages_for_users(
# 消息格式化API函数
# =============================================================================
def build_readable_messages_to_str(
messages: List[Dict[str, Any]],
replace_bot_name: bool = True,
@@ -273,7 +267,7 @@ def build_readable_messages_to_str(
) -> str:
"""
将消息列表构建成可读的字符串
Args:
messages: 消息列表
replace_bot_name: 是否将机器人的名称替换为""
@@ -282,7 +276,7 @@ def build_readable_messages_to_str(
read_mark: 已读标记时间戳,用于分割已读和未读消息
truncate: 是否截断长消息
show_actions: 是否显示动作记录
Returns:
格式化后的可读字符串
"""
@@ -300,29 +294,27 @@ async def build_readable_messages_with_details(
) -> Tuple[str, List[Tuple[float, str, str]]]:
"""
将消息列表构建成可读的字符串,并返回详细信息
Args:
messages: 消息列表
replace_bot_name: 是否将机器人的名称替换为""
merge_messages: 是否合并连续消息
timestamp_mode: 时间戳显示模式,'relative''absolute'
truncate: 是否截断长消息
Returns:
格式化后的可读字符串和详细信息元组列表(时间戳, 昵称, 内容)
"""
return await build_readable_messages_with_list(
messages, replace_bot_name, merge_messages, timestamp_mode, truncate
)
return await build_readable_messages_with_list(messages, replace_bot_name, merge_messages, timestamp_mode, truncate)
async def get_person_ids_from_messages(messages: List[Dict[str, Any]]) -> List[str]:
"""
从消息列表中提取不重复的用户ID列表
Args:
messages: 消息列表
Returns:
用户ID列表
"""

View File

@@ -18,16 +18,17 @@ logger = get_logger("person_api")
# 个人信息API函数
# =============================================================================
def get_person_id(platform: str, user_id: int) -> str:
"""根据平台和用户ID获取person_id
Args:
platform: 平台名称,如 "qq", "telegram"
user_id: 用户ID
Returns:
str: 唯一的person_idMD5哈希值
示例:
person_id = person_api.get_person_id("qq", 123456)
"""
@@ -40,15 +41,15 @@ def get_person_id(platform: str, user_id: int) -> str:
async def get_person_value(person_id: str, field_name: str, default: Any = None) -> Any:
"""根据person_id和字段名获取某个值
Args:
person_id: 用户的唯一标识ID
field_name: 要获取的字段名,如 "nickname", "impression"
default: 当字段不存在或获取失败时返回的默认值
Returns:
Any: 字段值或默认值
示例:
nickname = await person_api.get_person_value(person_id, "nickname", "未知用户")
impression = await person_api.get_person_value(person_id, "impression")
@@ -64,18 +65,18 @@ async def get_person_value(person_id: str, field_name: str, default: Any = None)
async def get_person_values(person_id: str, field_names: list, default_dict: dict = None) -> dict:
"""批量获取用户信息字段值
Args:
person_id: 用户的唯一标识ID
field_names: 要获取的字段名列表
default_dict: 默认值字典,键为字段名,值为默认值
Returns:
dict: 字段名到值的映射字典
示例:
values = await person_api.get_person_values(
person_id,
person_id,
["nickname", "impression", "know_times"],
{"nickname": "未知用户", "know_times": 0}
)
@@ -83,11 +84,11 @@ async def get_person_values(person_id: str, field_names: list, default_dict: dic
try:
person_info_manager = get_person_info_manager()
values = await person_info_manager.get_values(person_id, field_names)
# 如果获取成功,返回结果
if values:
return values
# 如果获取失败,构建默认值字典
result = {}
if default_dict:
@@ -96,9 +97,9 @@ async def get_person_values(person_id: str, field_names: list, default_dict: dic
else:
for field in field_names:
result[field] = None
return result
except Exception as e:
logger.error(f"[PersonAPI] 批量获取用户信息失败: person_id={person_id}, fields={field_names}, error={e}")
# 返回默认值字典
@@ -114,14 +115,14 @@ async def get_person_values(person_id: str, field_names: list, default_dict: dic
async def is_person_known(platform: str, user_id: int) -> bool:
"""判断是否认识某个用户
Args:
platform: 平台名称
user_id: 用户ID
Returns:
bool: 是否认识该用户
示例:
known = await person_api.is_person_known("qq", 123456)
"""
@@ -135,13 +136,13 @@ async def is_person_known(platform: str, user_id: int) -> bool:
def get_person_id_by_name(person_name: str) -> str:
"""根据用户名获取person_id
Args:
person_name: 用户名
Returns:
str: person_id如果未找到返回空字符串
示例:
person_id = person_api.get_person_id_by_name("张三")
"""

View File

@@ -31,6 +31,7 @@ logger = get_logger("send_api")
# 内部实现函数(不暴露给外部)
# =============================================================================
async def _send_to_target(
message_type: str,
content: str,
@@ -41,7 +42,7 @@ async def _send_to_target(
storage_message: bool = True,
) -> bool:
"""向指定目标发送消息的内部实现
Args:
message_type: 消息类型,如"text""image""emoji"
content: 消息内容
@@ -49,41 +50,41 @@ async def _send_to_target(
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息的格式,如"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
try:
logger.info(f"[SendAPI] 发送{message_type}消息到 {stream_id}")
# 查找目标聊天流
target_stream = get_chat_manager().get_stream(stream_id)
if not target_stream:
logger.error(f"[SendAPI] 未找到聊天流: {stream_id}")
return False
# 创建发送器
heart_fc_sender = HeartFCSender()
# 生成消息ID
current_time = time.time()
message_id = f"send_api_{int(current_time * 1000)}"
# 构建机器人用户信息
bot_user_info = UserInfo(
user_id=global_config.bot.qq_account,
user_nickname=global_config.bot.nickname,
platform=target_stream.platform,
)
# 创建消息段
message_segment = Seg(type=message_type, data=content)
# 处理回复消息
anchor_message = None
if reply_to:
anchor_message = await _find_reply_message(target_stream, reply_to)
# 构建发送消息对象
bot_message = MessageSending(
message_id=message_id,
@@ -97,19 +98,19 @@ async def _send_to_target(
is_emoji=(message_type == "emoji"),
thinking_start_time=current_time,
)
# 发送消息
sent_msg = await heart_fc_sender.send_message(
bot_message, typing=typing, set_reply=(anchor_message is not None), storage_message=storage_message
)
if sent_msg:
logger.info(f"[SendAPI] 成功发送消息到 {stream_id}")
return True
else:
logger.error("[SendAPI] 发送消息失败")
return False
except Exception as e:
logger.error(f"[SendAPI] 发送消息时出错: {e}")
traceback.print_exc()
@@ -118,11 +119,11 @@ async def _send_to_target(
async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageRecv]:
"""查找要回复的消息
Args:
target_stream: 目标聊天流
reply_to: 回复格式,如"发送者:消息内容""发送者:消息内容"
Returns:
Optional[MessageRecv]: 找到的消息如果没找到则返回None
"""
@@ -139,20 +140,20 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR
if len(parts) != 2:
logger.warning(f"[SendAPI] reply_to格式不正确: {reply_to}")
return None
sender = parts[0].strip()
text = parts[1].strip()
# 获取聊天流的最新20条消息
reverse_talking_message = get_raw_msg_before_timestamp_with_chat(
target_stream.stream_id,
target_stream.stream_id,
time.time(), # 当前时间之前的消息
20 # 最新的20条消息
20, # 最新的20条消息
)
# 反转列表,使最新的消息在前面
reverse_talking_message = list(reversed(reverse_talking_message))
find_msg = None
for message in reverse_talking_message:
user_id = message["user_id"]
@@ -198,20 +199,20 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR
"format_info": format_info,
"template_info": template_info,
}
message_dict = {
"message_info": message_info,
"raw_message": find_msg.get("processed_plain_text"),
"detailed_plain_text": find_msg.get("processed_plain_text"),
"processed_plain_text": find_msg.get("processed_plain_text"),
}
find_rec_msg = MessageRecv(message_dict)
find_rec_msg.update_chat_stream(target_stream)
logger.info(f"[SendAPI] 找到匹配的回复消息,发送者: {sender}")
return find_rec_msg
except Exception as e:
logger.error(f"[SendAPI] 查找回复消息时出错: {e}")
traceback.print_exc()
@@ -222,34 +223,49 @@ async def _find_reply_message(target_stream, reply_to: str) -> Optional[MessageR
# 公共API函数 - 预定义类型的发送函数
# =============================================================================
async def text_to_group(text: str, group_id: str, platform: str = "qq", typing: bool = False, reply_to: str = "", storage_message: bool = True) -> bool:
async def text_to_group(
text: str,
group_id: str,
platform: str = "qq",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向群聊发送文本消息
Args:
text: 要发送的文本内容
group_id: 群聊ID
platform: 平台,默认为"qq"
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("text", text, stream_id, "", typing, reply_to, storage_message)
async def text_to_user(text: str, user_id: str, platform: str = "qq", typing: bool = False, reply_to: str = "", storage_message: bool = True) -> bool:
async def text_to_user(
text: str,
user_id: str,
platform: str = "qq",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向用户发送私聊文本消息
Args:
text: 要发送的文本内容
user_id: 用户ID
platform: 平台,默认为"qq"
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
@@ -259,12 +275,12 @@ async def text_to_user(text: str, user_id: str, platform: str = "qq", typing: bo
async def emoji_to_group(emoji_base64: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送表情包
Args:
emoji_base64: 表情包的base64编码
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
@@ -274,12 +290,12 @@ async def emoji_to_group(emoji_base64: str, group_id: str, platform: str = "qq",
async def emoji_to_user(emoji_base64: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送表情包
Args:
emoji_base64: 表情包的base64编码
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
@@ -289,12 +305,12 @@ async def emoji_to_user(emoji_base64: str, user_id: str, platform: str = "qq", s
async def image_to_group(image_base64: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送图片
Args:
image_base64: 图片的base64编码
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
@@ -304,40 +320,42 @@ async def image_to_group(image_base64: str, group_id: str, platform: str = "qq",
async def image_to_user(image_base64: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送图片
Args:
image_base64: 图片的base64编码
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target("image", image_base64, stream_id, "", typing=False)
async def command_to_group(command: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送命令
Args:
command: 命令
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("command", command, stream_id, "", typing=False, storage_message=storage_message)
async def command_to_user(command: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送命令
Args:
command: 命令
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
@@ -349,18 +367,19 @@ async def command_to_user(command: str, user_id: str, platform: str = "qq", stor
# 通用发送函数 - 支持任意消息类型
# =============================================================================
async def custom_to_group(
message_type: str,
content: str,
group_id: str,
platform: str = "qq",
message_type: str,
content: str,
group_id: str,
platform: str = "qq",
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True
storage_message: bool = True,
) -> bool:
"""向群聊发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
@@ -369,7 +388,7 @@ async def custom_to_group(
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
@@ -378,17 +397,17 @@ async def custom_to_group(
async def custom_to_user(
message_type: str,
content: str,
user_id: str,
platform: str = "qq",
message_type: str,
content: str,
user_id: str,
platform: str = "qq",
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True
storage_message: bool = True,
) -> bool:
"""向用户发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
@@ -397,7 +416,7 @@ async def custom_to_user(
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
@@ -414,10 +433,10 @@ async def custom_message(
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True
storage_message: bool = True,
) -> bool:
"""发送自定义消息的通用接口
Args:
message_type: 消息类型,如"text""image""emoji""video""file""audio"
content: 消息内容
@@ -427,19 +446,19 @@ async def custom_message(
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
示例:
# 发送视频到群聊
await send_api.custom_message("video", video_base64, "123456", True)
# 发送文件到用户
await send_api.custom_message("file", file_base64, "987654", False)
# 发送音频到群聊并回复特定消息
await send_api.custom_message("audio", audio_base64, "123456", True, reply_to="张三:你好")
"""
stream_id = get_chat_manager().get_stream_id(platform, target_id, is_group)
return await _send_to_target(message_type, content, stream_id, display_message, typing, reply_to, storage_message)
return await _send_to_target(message_type, content, stream_id, display_message, typing, reply_to, storage_message)

View File

@@ -24,6 +24,7 @@ logger = get_logger("utils_api")
# 文件操作API函数
# =============================================================================
def get_plugin_path(caller_frame=None) -> str:
"""获取调用者插件的路径
@@ -106,6 +107,7 @@ def write_json_file(file_path: str, data: Any, indent: int = 2) -> bool:
# 时间相关API函数
# =============================================================================
def get_timestamp() -> int:
"""获取当前时间戳
@@ -156,6 +158,7 @@ def parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int:
# 其他工具函数
# =============================================================================
def generate_unique_id() -> str:
"""生成唯一ID