This commit is contained in:
Windpicker-owo
2025-07-28 22:57:20 +08:00
35 changed files with 1734 additions and 3146 deletions

View File

@@ -46,7 +46,6 @@ from .apis import (
person_api,
plugin_manage_api,
send_api,
utils_api,
register_plugin,
get_logger,
)
@@ -68,7 +67,6 @@ __all__ = [
"person_api",
"plugin_manage_api",
"send_api",
"utils_api",
"register_plugin",
"get_logger",
# 基础类

View File

@@ -17,7 +17,6 @@ from src.plugin_system.apis import (
person_api,
plugin_manage_api,
send_api,
utils_api,
)
from .logging_api import get_logger
from .plugin_register_api import register_plugin
@@ -35,7 +34,6 @@ __all__ = [
"person_api",
"plugin_manage_api",
"send_api",
"utils_api",
"get_logger",
"register_plugin",
]

View File

@@ -210,7 +210,7 @@ class ChatManager:
chat_stream: 聊天流对象
Returns:
Dict[str, Any]: 聊天流信息字典
Dict ({str: Any}): 聊天流信息字典
Raises:
TypeError: 如果 chat_stream 不是 ChatStream 类型
@@ -285,41 +285,41 @@ class ChatManager:
# =============================================================================
def get_all_streams(platform: Optional[str] | SpecialTypes = "qq"):
def get_all_streams(platform: Optional[str] | SpecialTypes = "qq") -> List[ChatStream]:
"""获取所有聊天流的便捷函数"""
return ChatManager.get_all_streams(platform)
def get_group_streams(platform: Optional[str] | SpecialTypes = "qq"):
def get_group_streams(platform: Optional[str] | SpecialTypes = "qq") -> List[ChatStream]:
"""获取群聊聊天流的便捷函数"""
return ChatManager.get_group_streams(platform)
def get_private_streams(platform: Optional[str] | SpecialTypes = "qq"):
def get_private_streams(platform: Optional[str] | SpecialTypes = "qq") -> List[ChatStream]:
"""获取私聊聊天流的便捷函数"""
return ChatManager.get_private_streams(platform)
def get_stream_by_group_id(group_id: str, platform: Optional[str] | SpecialTypes = "qq"):
def get_stream_by_group_id(group_id: str, platform: Optional[str] | SpecialTypes = "qq") -> Optional[ChatStream]:
"""根据群ID获取聊天流的便捷函数"""
return ChatManager.get_group_stream_by_group_id(group_id, platform)
def get_stream_by_user_id(user_id: str, platform: Optional[str] | SpecialTypes = "qq"):
def get_stream_by_user_id(user_id: str, platform: Optional[str] | SpecialTypes = "qq") -> Optional[ChatStream]:
"""根据用户ID获取私聊流的便捷函数"""
return ChatManager.get_private_stream_by_user_id(user_id, platform)
def get_stream_type(chat_stream: ChatStream):
def get_stream_type(chat_stream: ChatStream) -> str:
"""获取聊天流类型的便捷函数"""
return ChatManager.get_stream_type(chat_stream)
def get_stream_info(chat_stream: ChatStream):
def get_stream_info(chat_stream: ChatStream) -> Dict[str, Any]:
"""获取聊天流信息的便捷函数"""
return ChatManager.get_stream_info(chat_stream)
def get_streams_summary():
def get_streams_summary() -> Dict[str, int]:
"""获取聊天流统计摘要的便捷函数"""
return ChatManager.get_streams_summary()

View File

@@ -10,7 +10,6 @@
from typing import Any
from src.common.logger import get_logger
from src.config.config import global_config
from src.person_info.person_info import get_person_info_manager
logger = get_logger("config_api")
@@ -26,7 +25,7 @@ def get_global_config(key: str, default: Any = None) -> Any:
插件应使用此方法读取全局配置,以保证只读和隔离性。
Args:
key: 命名空间式配置键名,支持嵌套访问,如 "section.subsection.key",大小写敏感
key: 命名空间式配置键名,使用嵌套访问,如 "section.subsection.key",大小写敏感
default: 如果配置不存在时返回的默认值
Returns:
@@ -76,50 +75,3 @@ def get_plugin_config(plugin_config: dict, key: str, default: Any = None) -> Any
except Exception as e:
logger.warning(f"[ConfigAPI] 获取插件配置 {key} 失败: {e}")
return default
# =============================================================================
# 用户信息API函数
# =============================================================================
async def get_user_id_by_person_name(person_name: str) -> tuple[str, str]:
"""根据内部用户名获取用户ID
Args:
person_name: 用户名
Returns:
tuple[str, str]: (平台, 用户ID)
"""
try:
person_info_manager = get_person_info_manager()
person_id = person_info_manager.get_person_id_by_person_name(person_name)
user_id: str = await person_info_manager.get_value(person_id, "user_id") # type: ignore
platform: str = await person_info_manager.get_value(person_id, "platform") # type: ignore
return platform, user_id
except Exception as e:
logger.error(f"[ConfigAPI] 根据用户名获取用户ID失败: {e}")
return "", ""
async def get_person_info(person_id: str, key: str, default: Any = None) -> Any:
"""获取用户信息
Args:
person_id: 用户ID
key: 信息键名
default: 默认值
Returns:
Any: 用户信息值或默认值
"""
try:
person_info_manager = get_person_info_manager()
response = await person_info_manager.get_value(person_id, key)
if not response:
raise ValueError(f"[ConfigAPI] 获取用户 {person_id} 的信息 '{key}' 失败,返回默认值")
return response
except Exception as e:
logger.error(f"[ConfigAPI] 获取用户信息失败: {e}")
return default

View File

@@ -152,10 +152,7 @@ async def db_query(
except DoesNotExist:
# 记录不存在
if query_type == "get" and single_result:
return None
return []
return None if query_type == "get" and single_result else []
except Exception as e:
logger.error(f"[DatabaseAPI] 数据库操作出错: {e}")
traceback.print_exc()
@@ -170,7 +167,8 @@ async def db_query(
async def db_save(
model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None
) -> Union[Dict[str, Any], None]:
) -> Optional[Dict[str, Any]]:
# sourcery skip: inline-immediately-returned-variable
"""保存数据到数据库(创建或更新)
如果提供了key_field和key_value会先尝试查找匹配的记录进行更新
@@ -203,10 +201,9 @@ async def db_save(
try:
# 如果提供了key_field和key_value尝试更新现有记录
if key_field and key_value is not None:
# 查找现有记录
existing_records = list(model_class.select().where(getattr(model_class, key_field) == key_value).limit(1))
if existing_records:
if existing_records := list(
model_class.select().where(getattr(model_class, key_field) == key_value).limit(1)
):
# 更新现有记录
existing_record = existing_records[0]
for field, value in data.items():
@@ -244,8 +241,8 @@ async def db_get(
Args:
model_class: Peewee模型类
filters: 过滤条件,字段名和值的字典
order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段即time字段降序
limit: 结果数量限制
order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段即time字段降序
single_result: 是否只返回单个结果如果为True则返回单个记录字典或None否则返回记录字典列表或空列表
Returns:
@@ -310,7 +307,7 @@ async def store_action_info(
thinking_id: str = "",
action_data: Optional[dict] = None,
action_name: str = "",
) -> Union[Dict[str, Any], None]:
) -> Optional[Dict[str, Any]]:
"""存储动作信息到数据库
将Action执行的相关信息保存到ActionRecords表中用于后续的记忆和上下文构建。

View File

@@ -65,14 +65,14 @@ async def get_by_description(description: str) -> Optional[Tuple[str, str, str]]
return None
async def get_random(count: Optional[int] = 1) -> Optional[List[Tuple[str, str, str]]]:
async def get_random(count: Optional[int] = 1) -> List[Tuple[str, str, str]]:
"""随机获取指定数量的表情包
Args:
count: 要获取的表情包数量默认为1
Returns:
Optional[List[Tuple[str, str, str]]]: 包含(base64编码, 表情包描述, 随机情感标签)的元组列表,如果失败则为None
List[Tuple[str, str, str]]: 包含(base64编码, 表情包描述, 随机情感标签)的元组列表,失败则返回空列表
Raises:
TypeError: 如果count不是整数类型
@@ -94,13 +94,13 @@ async def get_random(count: Optional[int] = 1) -> Optional[List[Tuple[str, str,
if not all_emojis:
logger.warning("[EmojiAPI] 没有可用的表情包")
return None
return []
# 过滤有效表情包
valid_emojis = [emoji for emoji in all_emojis if not emoji.is_deleted]
if not valid_emojis:
logger.warning("[EmojiAPI] 没有有效的表情包")
return None
return []
if len(valid_emojis) < count:
logger.warning(
@@ -127,14 +127,14 @@ async def get_random(count: Optional[int] = 1) -> Optional[List[Tuple[str, str,
if not results and count > 0:
logger.warning("[EmojiAPI] 随机获取表情包失败,没有一个可以成功处理")
return None
return []
logger.info(f"[EmojiAPI] 成功获取 {len(results)} 个随机表情包")
return results
except Exception as e:
logger.error(f"[EmojiAPI] 获取随机表情包失败: {e}")
return None
return []
async def get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]:
@@ -162,10 +162,11 @@ async def get_by_emotion(emotion: str) -> Optional[Tuple[str, str, str]]:
# 筛选匹配情感的表情包
matching_emojis = []
for emoji_obj in all_emojis:
if not emoji_obj.is_deleted and emotion.lower() in [e.lower() for e in emoji_obj.emotion]:
matching_emojis.append(emoji_obj)
matching_emojis.extend(
emoji_obj
for emoji_obj in all_emojis
if not emoji_obj.is_deleted and emotion.lower() in [e.lower() for e in emoji_obj.emotion]
)
if not matching_emojis:
logger.warning(f"[EmojiAPI] 未找到匹配情感 '{emotion}' 的表情包")
return None
@@ -256,10 +257,11 @@ def get_descriptions() -> List[str]:
emoji_manager = get_emoji_manager()
descriptions = []
for emoji_obj in emoji_manager.emoji_objects:
if not emoji_obj.is_deleted and emoji_obj.description:
descriptions.append(emoji_obj.description)
descriptions.extend(
emoji_obj.description
for emoji_obj in emoji_manager.emoji_objects
if not emoji_obj.is_deleted and emoji_obj.description
)
return descriptions
except Exception as e:
logger.error(f"[EmojiAPI] 获取表情包描述失败: {e}")

View File

@@ -84,18 +84,23 @@ async def generate_reply(
enable_chinese_typo: bool = True,
return_prompt: bool = False,
model_configs: Optional[List[Dict[str, Any]]] = None,
request_type: str = "",
enable_timeout: bool = False,
request_type: str = "generator_api",
) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
"""生成回复
Args:
chat_stream: 聊天流对象(优先)
chat_id: 聊天ID备用
action_data: 动作数据
action_data: 动作数据向下兼容包含reply_to和extra_info
reply_to: 回复对象,格式为 "发送者:消息内容"
extra_info: 额外信息,用于补充上下文
available_actions: 可用动作
enable_tool: 是否启用工具调用
enable_splitter: 是否启用消息分割器
enable_chinese_typo: 是否启用错字生成器
return_prompt: 是否返回提示词
model_configs: 模型配置列表
request_type: 请求类型可选记录LLM使用
Returns:
Tuple[bool, List[Tuple[str, Any]], Optional[str]]: (是否成功, 回复集合, 提示词)
"""
@@ -107,7 +112,7 @@ async def generate_reply(
return False, [], None
logger.debug("[GeneratorAPI] 开始生成回复")
if not reply_to and action_data:
reply_to = action_data.get("reply_to", "")
if not extra_info and action_data:
@@ -118,7 +123,6 @@ async def generate_reply(
reply_to=reply_to,
extra_info=extra_info,
available_actions=available_actions,
enable_timeout=enable_timeout,
enable_tool=enable_tool,
)
reply_set = []
@@ -151,15 +155,24 @@ async def rewrite_reply(
enable_splitter: bool = True,
enable_chinese_typo: bool = True,
model_configs: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[bool, List[Tuple[str, Any]]]:
raw_reply: str = "",
reason: str = "",
reply_to: str = "",
return_prompt: bool = False,
) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
"""重写回复
Args:
chat_stream: 聊天流对象(优先)
reply_data: 回复数据
reply_data: 回复数据字典(向下兼容备用,当其他参数缺失时从此获取)
chat_id: 聊天ID备用
enable_splitter: 是否启用消息分割器
enable_chinese_typo: 是否启用错字生成器
model_configs: 模型配置列表
raw_reply: 原始回复内容
reason: 回复原因
reply_to: 回复对象
return_prompt: 是否返回提示词
Returns:
Tuple[bool, List[Tuple[str, Any]]]: (是否成功, 回复集合)
@@ -169,12 +182,23 @@ async def rewrite_reply(
replyer = get_replyer(chat_stream, chat_id, model_configs=model_configs)
if not replyer:
logger.error("[GeneratorAPI] 无法获取回复器")
return False, []
return False, [], None
logger.info("[GeneratorAPI] 开始重写回复")
# 如果参数缺失从reply_data中获取
if reply_data:
raw_reply = raw_reply or reply_data.get("raw_reply", "")
reason = reason or reply_data.get("reason", "")
reply_to = reply_to or reply_data.get("reply_to", "")
# 调用回复器重写回复
success, content = await replyer.rewrite_reply_with_context(reply_data=reply_data or {})
success, content, prompt = await replyer.rewrite_reply_with_context(
raw_reply=raw_reply,
reason=reason,
reply_to=reply_to,
return_prompt=return_prompt,
)
reply_set = []
if content:
reply_set = await process_human_text(content, enable_splitter, enable_chinese_typo)
@@ -184,14 +208,14 @@ async def rewrite_reply(
else:
logger.warning("[GeneratorAPI] 重写回复失败")
return success, reply_set
return success, reply_set, prompt if return_prompt else None
except ValueError as ve:
raise ve
except Exception as e:
logger.error(f"[GeneratorAPI] 重写回复时出错: {e}")
return False, []
return False, [], None
async def process_human_text(content: str, enable_splitter: bool, enable_chinese_typo: bool) -> List[Tuple[str, Any]]:
@@ -217,3 +241,27 @@ async def process_human_text(content: str, enable_splitter: bool, enable_chinese
except Exception as e:
logger.error(f"[GeneratorAPI] 处理人形文本时出错: {e}")
return []
async def generate_response_custom(
chat_stream: Optional[ChatStream] = None,
chat_id: Optional[str] = None,
model_configs: Optional[List[Dict[str, Any]]] = None,
prompt: str = "",
) -> Optional[str]:
replyer = get_replyer(chat_stream, chat_id, model_configs=model_configs)
if not replyer:
logger.error("[GeneratorAPI] 无法获取回复器")
return None
try:
logger.debug("[GeneratorAPI] 开始生成自定义回复")
response = await replyer.llm_generate_content(prompt)
if response:
logger.debug("[GeneratorAPI] 自定义回复生成成功")
return response
else:
logger.warning("[GeneratorAPI] 自定义回复生成失败")
return None
except Exception as e:
logger.error(f"[GeneratorAPI] 生成自定义回复时出错: {e}")
return None

View File

@@ -54,7 +54,7 @@ def get_available_models() -> Dict[str, Any]:
async def generate_with_model(
prompt: str, model_config: Dict[str, Any], request_type: str = "plugin.generate", **kwargs
) -> Tuple[bool, str, str, str]:
) -> Tuple[bool, str]:
"""使用指定模型生成内容
Args:
@@ -73,10 +73,11 @@ async def generate_with_model(
llm_request = LLMRequest(model=model_config, request_type=request_type, **kwargs)
response, (reasoning, model_name) = await llm_request.generate_response_async(prompt)
return True, response, reasoning, model_name
# TODO: 复活这个_
response, _ = await llm_request.generate_response_async(prompt)
return True, response
except Exception as e:
error_msg = f"生成内容时出错: {str(e)}"
logger.error(f"[LLMAPI] {error_msg}")
return False, error_msg, "", ""
return False, error_msg

View File

@@ -207,7 +207,7 @@ def get_random_chat_messages(
def get_messages_by_time_for_users(
start_time: float, end_time: float, person_ids: list, limit: int = 0, limit_mode: str = "latest"
start_time: float, end_time: float, person_ids: List[str], limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
"""
获取指定用户在所有聊天中指定时间范围内的消息
@@ -287,7 +287,7 @@ def get_messages_before_time_in_chat(
return get_raw_msg_before_timestamp_with_chat(chat_id, timestamp, limit)
def get_messages_before_time_for_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]:
def get_messages_before_time_for_users(timestamp: float, person_ids: List[str], limit: int = 0) -> List[Dict[str, Any]]:
"""
获取指定用户在指定时间戳之前的消息
@@ -372,7 +372,7 @@ def count_new_messages(chat_id: str, start_time: float = 0.0, end_time: Optional
return num_new_messages_since(chat_id, start_time, end_time)
def count_new_messages_for_users(chat_id: str, start_time: float, end_time: float, person_ids: list) -> int:
def count_new_messages_for_users(chat_id: str, start_time: float, end_time: float, person_ids: List[str]) -> int:
"""
计算指定聊天中指定用户从开始时间到结束时间的新消息数量

View File

@@ -1,10 +1,12 @@
from typing import Tuple, List
def list_loaded_plugins() -> List[str]:
"""
列出所有当前加载的插件。
Returns:
list: 当前加载的插件名称列表。
List[str]: 当前加载的插件名称列表。
"""
from src.plugin_system.core.plugin_manager import plugin_manager
@@ -16,17 +18,38 @@ def list_registered_plugins() -> List[str]:
列出所有已注册的插件。
Returns:
list: 已注册的插件名称列表。
List[str]: 已注册的插件名称列表。
"""
from src.plugin_system.core.plugin_manager import plugin_manager
return plugin_manager.list_registered_plugins()
def get_plugin_path(plugin_name: str) -> str:
"""
获取指定插件的路径。
Args:
plugin_name (str): 插件名称。
Returns:
str: 插件目录的绝对路径。
Raises:
ValueError: 如果插件不存在。
"""
from src.plugin_system.core.plugin_manager import plugin_manager
if plugin_path := plugin_manager.get_plugin_path(plugin_name):
return plugin_path
else:
raise ValueError(f"插件 '{plugin_name}' 不存在。")
async def remove_plugin(plugin_name: str) -> bool:
"""
卸载指定的插件。
**此函数是异步的,确保在异步环境中调用。**
Args:
@@ -43,7 +66,7 @@ async def remove_plugin(plugin_name: str) -> bool:
async def reload_plugin(plugin_name: str) -> bool:
"""
重新加载指定的插件。
**此函数是异步的,确保在异步环境中调用。**
Args:
@@ -71,6 +94,7 @@ def load_plugin(plugin_name: str) -> Tuple[bool, int]:
return plugin_manager.load_registered_plugin_classes(plugin_name)
def add_plugin_directory(plugin_directory: str) -> bool:
"""
添加插件目录。
@@ -84,6 +108,7 @@ def add_plugin_directory(plugin_directory: str) -> bool:
return plugin_manager.add_plugin_directory(plugin_directory)
def rescan_plugin_directory() -> Tuple[int, int]:
"""
重新扫描插件目录,加载新插件。
@@ -92,4 +117,4 @@ def rescan_plugin_directory() -> Tuple[int, int]:
"""
from src.plugin_system.core.plugin_manager import plugin_manager
return plugin_manager.rescan_plugin_directory()
return plugin_manager.rescan_plugin_directory()

View File

@@ -49,7 +49,7 @@ async def _send_to_target(
display_message: str = "",
typing: bool = False,
reply_to: str = "",
reply_to_platform_id: str = "",
reply_to_platform_id: Optional[str] = None,
storage_message: bool = True,
show_log: bool = True,
) -> bool:
@@ -60,8 +60,11 @@ async def _send_to_target(
content: 消息内容
stream_id: 目标流ID
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息格式,如"发送者:消息内容"
typing: 是否模拟打字等待。
reply_to: 回复消息格式"发送者:消息内容"
reply_to_platform_id: 回复消息,格式为"平台:用户ID",如果不提供则自动查找(插件开发者禁用!)
storage_message: 是否存储消息到数据库
show_log: 发送是否显示日志
Returns:
bool: 是否发送成功
@@ -97,6 +100,10 @@ async def _send_to_target(
anchor_message = None
if reply_to:
anchor_message = await _find_reply_message(target_stream, reply_to)
if anchor_message and anchor_message.message_info.user_info and not reply_to_platform_id:
reply_to_platform_id = (
f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}"
)
# 构建发送消息对象
bot_message = MessageSending(
@@ -262,12 +269,22 @@ async def text_to_stream(
stream_id: 聊天流ID
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
reply_to_platform_id: 回复消息,格式为"平台:用户ID",如果不提供则自动查找(插件开发者禁用!)
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target("text", text, stream_id, "", typing, reply_to, reply_to_platform_id, storage_message)
return await _send_to_target(
"text",
text,
stream_id,
"",
typing,
reply_to,
reply_to_platform_id=reply_to_platform_id,
storage_message=storage_message,
)
async def emoji_to_stream(emoji_base64: str, stream_id: str, storage_message: bool = True) -> bool:
@@ -350,249 +367,3 @@ async def custom_to_stream(
storage_message=storage_message,
show_log=show_log,
)
async def text_to_group(
text: str,
group_id: str,
platform: str = "qq",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向群聊发送文本消息
Args:
text: 要发送的文本内容
group_id: 群聊ID
platform: 平台,默认为"qq"
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("text", text, stream_id, "", typing, reply_to, storage_message=storage_message)
async def text_to_user(
text: str,
user_id: str,
platform: str = "qq",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向用户发送私聊文本消息
Args:
text: 要发送的文本内容
user_id: 用户ID
platform: 平台,默认为"qq"
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target("text", text, stream_id, "", typing, reply_to, storage_message=storage_message)
async def emoji_to_group(emoji_base64: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送表情包
Args:
emoji_base64: 表情包的base64编码
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message)
async def emoji_to_user(emoji_base64: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送表情包
Args:
emoji_base64: 表情包的base64编码
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target("emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message)
async def image_to_group(image_base64: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送图片
Args:
image_base64: 图片的base64编码
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("image", image_base64, stream_id, "", typing=False, storage_message=storage_message)
async def image_to_user(image_base64: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送图片
Args:
image_base64: 图片的base64编码
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target("image", image_base64, stream_id, "", typing=False)
async def command_to_group(command: str, group_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向群聊发送命令
Args:
command: 命令
group_id: 群聊ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target("command", command, stream_id, "", typing=False, storage_message=storage_message)
async def command_to_user(command: str, user_id: str, platform: str = "qq", storage_message: bool = True) -> bool:
"""向用户发送命令
Args:
command: 命令
user_id: 用户ID
platform: 平台,默认为"qq"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target("command", command, stream_id, "", typing=False, storage_message=storage_message)
# =============================================================================
# 通用发送函数 - 支持任意消息类型
# =============================================================================
async def custom_to_group(
message_type: str,
content: str,
group_id: str,
platform: str = "qq",
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向群聊发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
group_id: 群聊ID
platform: 平台,默认为"qq"
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, group_id, True)
return await _send_to_target(
message_type, content, stream_id, display_message, typing, reply_to, storage_message=storage_message
)
async def custom_to_user(
message_type: str,
content: str,
user_id: str,
platform: str = "qq",
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""向用户发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
user_id: 用户ID
platform: 平台,默认为"qq"
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
stream_id = get_chat_manager().get_stream_id(platform, user_id, False)
return await _send_to_target(
message_type, content, stream_id, display_message, typing, reply_to, storage_message=storage_message
)
async def custom_message(
message_type: str,
content: str,
target_id: str,
is_group: bool = True,
platform: str = "qq",
display_message: str = "",
typing: bool = False,
reply_to: str = "",
storage_message: bool = True,
) -> bool:
"""发送自定义消息的通用接口
Args:
message_type: 消息类型,如"text""image""emoji""video""file""audio"
content: 消息内容
target_id: 目标ID群ID或用户ID
is_group: 是否为群聊True为群聊False为私聊
platform: 平台,默认为"qq"
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
示例:
# 发送视频到群聊
await send_api.custom_message("video", video_base64, "123456", True)
# 发送文件到用户
await send_api.custom_message("file", file_base64, "987654", False)
# 发送音频到群聊并回复特定消息
await send_api.custom_message("audio", audio_base64, "123456", True, reply_to="张三:你好")
"""
stream_id = get_chat_manager().get_stream_id(platform, target_id, is_group)
return await _send_to_target(
message_type, content, stream_id, display_message, typing, reply_to, storage_message=storage_message
)

View File

@@ -1,168 +0,0 @@
"""工具类API模块
提供了各种辅助功能
使用方式:
from src.plugin_system.apis import utils_api
plugin_path = utils_api.get_plugin_path()
data = utils_api.read_json_file("data.json")
timestamp = utils_api.get_timestamp()
"""
import os
import json
import time
import inspect
import datetime
import uuid
from typing import Any, Optional
from src.common.logger import get_logger
logger = get_logger("utils_api")
# =============================================================================
# 文件操作API函数
# =============================================================================
def get_plugin_path(caller_frame=None) -> str:
"""获取调用者插件的路径
Args:
caller_frame: 调用者的栈帧默认为None自动获取
Returns:
str: 插件目录的绝对路径
"""
try:
if caller_frame is None:
caller_frame = inspect.currentframe().f_back # type: ignore
plugin_module_path = inspect.getfile(caller_frame) # type: ignore
plugin_dir = os.path.dirname(plugin_module_path)
return plugin_dir
except Exception as e:
logger.error(f"[UtilsAPI] 获取插件路径失败: {e}")
return ""
def read_json_file(file_path: str, default: Any = None) -> Any:
"""读取JSON文件
Args:
file_path: 文件路径,可以是相对于插件目录的路径
default: 如果文件不存在或读取失败时返回的默认值
Returns:
Any: JSON数据或默认值
"""
try:
# 如果是相对路径,则相对于调用者的插件目录
if not os.path.isabs(file_path):
caller_frame = inspect.currentframe().f_back # type: ignore
plugin_dir = get_plugin_path(caller_frame)
file_path = os.path.join(plugin_dir, file_path)
if not os.path.exists(file_path):
logger.warning(f"[UtilsAPI] 文件不存在: {file_path}")
return default
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error(f"[UtilsAPI] 读取JSON文件出错: {e}")
return default
def write_json_file(file_path: str, data: Any, indent: int = 2) -> bool:
"""写入JSON文件
Args:
file_path: 文件路径,可以是相对于插件目录的路径
data: 要写入的数据
indent: JSON缩进
Returns:
bool: 是否写入成功
"""
try:
# 如果是相对路径,则相对于调用者的插件目录
if not os.path.isabs(file_path):
caller_frame = inspect.currentframe().f_back # type: ignore
plugin_dir = get_plugin_path(caller_frame)
file_path = os.path.join(plugin_dir, file_path)
# 确保目录存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
return True
except Exception as e:
logger.error(f"[UtilsAPI] 写入JSON文件出错: {e}")
return False
# =============================================================================
# 时间相关API函数
# =============================================================================
def get_timestamp() -> int:
"""获取当前时间戳
Returns:
int: 当前时间戳(秒)
"""
return int(time.time())
def format_time(timestamp: Optional[int | float] = None, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
"""格式化时间
Args:
timestamp: 时间戳如果为None则使用当前时间
format_str: 时间格式字符串
Returns:
str: 格式化后的时间字符串
"""
try:
if timestamp is None:
timestamp = time.time()
return datetime.datetime.fromtimestamp(timestamp).strftime(format_str)
except Exception as e:
logger.error(f"[UtilsAPI] 格式化时间失败: {e}")
return ""
def parse_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> int:
"""解析时间字符串为时间戳
Args:
time_str: 时间字符串
format_str: 时间格式字符串
Returns:
int: 时间戳(秒)
"""
try:
dt = datetime.datetime.strptime(time_str, format_str)
return int(dt.timestamp())
except Exception as e:
logger.error(f"[UtilsAPI] 解析时间失败: {e}")
return 0
# =============================================================================
# 其他工具函数
# =============================================================================
def generate_unique_id() -> str:
"""生成唯一ID
Returns:
str: 唯一ID
"""
return str(uuid.uuid4())

View File

@@ -208,7 +208,7 @@ class BaseAction(ABC):
return False, f"等待新消息失败: {str(e)}"
async def send_text(
self, content: str, reply_to: str = "", reply_to_platform_id: str = "", typing: bool = False
self, content: str, reply_to: str = "", typing: bool = False
) -> bool:
"""发送文本消息
@@ -227,7 +227,6 @@ class BaseAction(ABC):
text=content,
stream_id=self.chat_id,
reply_to=reply_to,
reply_to_platform_id=reply_to_platform_id,
typing=typing,
)

View File

@@ -224,6 +224,18 @@ class PluginManager:
list: 已注册的插件类名称列表。
"""
return list(self.plugin_classes.keys())
def get_plugin_path(self, plugin_name: str) -> Optional[str]:
"""
获取指定插件的路径。
Args:
plugin_name: 插件名称
Returns:
Optional[str]: 插件目录的绝对路径如果插件不存在则返回None。
"""
return self.plugin_paths.get(plugin_name)
# === 私有方法 ===
# == 目录管理 ==
@@ -289,6 +301,7 @@ class PluginManager:
return False
module = module_from_spec(spec)
module.__package__ = module_name # 设置模块包名
spec.loader.exec_module(module)
logger.debug(f"插件模块加载成功: {plugin_file}")