diff --git a/docs/plugins/api/database-api.md b/docs/plugins/api/database-api.md index 174bef158..5b6b4468f 100644 --- a/docs/plugins/api/database-api.md +++ b/docs/plugins/api/database-api.md @@ -6,72 +6,51 @@ ```python from src.plugin_system.apis import database_api +# 或者 +from src.plugin_system import database_api ``` ## 主要功能 -### 1. 通用数据库查询 - -#### `db_query(model_class, query_type="get", filters=None, data=None, limit=None, order_by=None, single_result=False)` -执行数据库查询操作的通用接口 - -**参数:** -- `model_class`:Peewee模型类,如ActionRecords、Messages等 -- `query_type`:查询类型,可选值: "get", "create", "update", "delete", "count" -- `filters`:过滤条件字典,键为字段名,值为要匹配的值 -- `data`:用于创建或更新的数据字典 -- `limit`:限制结果数量 -- `order_by`:排序字段列表,使用字段名,前缀'-'表示降序 -- `single_result`:是否只返回单个结果 - -**返回:** -根据查询类型返回不同的结果: -- "get":返回查询结果列表或单个结果 -- "create":返回创建的记录 -- "update":返回受影响的行数 -- "delete":返回受影响的行数 -- "count":返回记录数量 - -### 2. 便捷查询函数 - -#### `db_save(model_class, data, key_field=None, key_value=None)` -保存数据到数据库(创建或更新) - -**参数:** -- `model_class`:Peewee模型类 -- `data`:要保存的数据字典 -- `key_field`:用于查找现有记录的字段名 -- `key_value`:用于查找现有记录的字段值 - -**返回:** -- `Dict[str, Any]`:保存后的记录数据,失败时返回None - -#### `db_get(model_class, filters=None, order_by=None, limit=None)` -简化的查询函数 - -**参数:** -- `model_class`:Peewee模型类 -- `filters`:过滤条件字典 -- `order_by`:排序字段 -- `limit`:限制结果数量 - -**返回:** -- `Union[List[Dict], Dict, None]`:查询结果 - -### 3. 专用函数 - -#### `store_action_info(...)` -存储动作信息的专用函数 - -## 使用示例 - -### 1. 基本查询操作 +### 1. 通用数据库操作 ```python -from src.plugin_system.apis import database_api -from src.common.database.database_model import Messages, ActionRecords +async def db_query( + model_class: Type[Model], + data: Optional[Dict[str, Any]] = None, + query_type: Optional[str] = "get", + filters: Optional[Dict[str, Any]] = None, + limit: Optional[int] = None, + order_by: Optional[List[str]] = None, + single_result: Optional[bool] = False, +) -> Union[List[Dict[str, Any]], Dict[str, Any], None]: +``` +执行数据库查询操作的通用接口。 -# 查询最近10条消息 +**Args:** +- `model_class`: Peewee模型类。 + - Peewee模型类可以在`src.common.database.database_model`模块中找到,如`ActionRecords`、`Messages`等。 +- `data`: 用于创建或更新的数据 +- `query_type`: 查询类型 + - 可选值: `get`, `create`, `update`, `delete`, `count`。 +- `filters`: 过滤条件字典,键为字段名,值为要匹配的值。 +- `limit`: 限制结果数量。 +- `order_by`: 排序字段列表,使用字段名,前缀'-'表示降序。 + - 排序字段,前缀`-`表示降序,例如`-time`表示按时间字段(即`time`字段)降序 +- `single_result`: 是否只返回单个结果。 + +**Returns:** +- 根据查询类型返回不同的结果: + - `get`: 返回查询结果列表或单个结果。(如果 `single_result=True`) + - `create`: 返回创建的记录。 + - `update`: 返回受影响的行数。 + - `delete`: 返回受影响的行数。 + - `count`: 返回记录数量。 + +#### 示例 + +1. 查询最近10条消息 +```python messages = await database_api.db_query( Messages, query_type="get", @@ -79,180 +58,159 @@ messages = await database_api.db_query( limit=10, order_by=["-time"] ) - -# 查询单条记录 -message = await database_api.db_query( - Messages, - query_type="get", - filters={"message_id": "msg_123"}, - single_result=True -) ``` - -### 2. 创建记录 - +2. 创建一条记录 ```python -# 创建新的动作记录 new_record = await database_api.db_query( ActionRecords, + data={"action_id": "123", "time": time.time(), "action_name": "TestAction"}, query_type="create", - data={ - "action_id": "action_123", - "time": time.time(), - "action_name": "TestAction", - "action_done": True - } ) - -print(f"创建了记录: {new_record['id']}") ``` - -### 3. 更新记录 - +3. 更新记录 ```python -# 更新动作状态 updated_count = await database_api.db_query( ActionRecords, + data={"action_done": True}, query_type="update", - filters={"action_id": "action_123"}, - data={"action_done": True, "completion_time": time.time()} + filters={"action_id": "123"}, ) - -print(f"更新了 {updated_count} 条记录") ``` - -### 4. 删除记录 - +4. 删除记录 ```python -# 删除过期记录 deleted_count = await database_api.db_query( ActionRecords, query_type="delete", - filters={"time__lt": time.time() - 86400} # 删除24小时前的记录 + filters={"action_id": "123"} ) - -print(f"删除了 {deleted_count} 条过期记录") ``` - -### 5. 统计查询 - +5. 计数 ```python -# 统计消息数量 -message_count = await database_api.db_query( +count = await database_api.db_query( Messages, query_type="count", filters={"chat_id": chat_stream.stream_id} ) - -print(f"该聊天有 {message_count} 条消息") ``` -### 6. 使用便捷函数 - +### 2. 数据库保存 +```python +async def db_save( + model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None +) -> Optional[Dict[str, Any]]: +``` +保存数据到数据库(创建或更新) + +如果提供了key_field和key_value,会先尝试查找匹配的记录进行更新; + +如果没有找到匹配记录,或未提供key_field和key_value,则创建新记录。 + +**Args:** +- `model_class`: Peewee模型类。 +- `data`: 要保存的数据字典。 +- `key_field`: 用于查找现有记录的字段名,例如"action_id"。 +- `key_value`: 用于查找现有记录的字段值。 + +**Returns:** +- `Optional[Dict[str, Any]]`: 保存后的记录数据,失败时返回None。 + +#### 示例 +创建或更新一条记录 ```python -# 使用db_save进行创建或更新 record = await database_api.db_save( ActionRecords, { - "action_id": "action_123", + "action_id": "123", "time": time.time(), "action_name": "TestAction", "action_done": True }, key_field="action_id", - key_value="action_123" + key_value="123" ) +``` -# 使用db_get进行简单查询 -recent_messages = await database_api.db_get( +### 3. 数据库获取 +```python +async def db_get( + model_class: Type[Model], + filters: Optional[Dict[str, Any]] = None, + limit: Optional[int] = None, + order_by: Optional[str] = None, + single_result: Optional[bool] = False, +) -> Union[List[Dict[str, Any]], Dict[str, Any], None]: +``` + +从数据库获取记录 + +这是db_query方法的简化版本,专注于数据检索操作。 + +**Args:** +- `model_class`: Peewee模型类。 +- `filters`: 过滤条件字典,键为字段名,值为要匹配的值。 +- `limit`: 限制结果数量。 +- `order_by`: 排序字段,使用字段名,前缀'-'表示降序。 +- `single_result`: 是否只返回单个结果,如果为True,则返回单个记录字典或None;否则返回记录字典列表或空列表 + +**Returns:** +- `Union[List[Dict], Dict, None]`: 查询结果列表或单个结果(如果`single_result=True`),失败时返回None。 + +#### 示例 +1. 获取单个记录 +```python +record = await database_api.db_get( + ActionRecords, + filters={"action_id": "123"}, + limit=1 +) +``` +2. 获取最近10条记录 +```python +records = await database_api.db_get( Messages, filters={"chat_id": chat_stream.stream_id}, + limit=10, order_by="-time", - limit=5 ) ``` -## 高级用法 - -### 复杂查询示例 - +### 4. 动作信息存储 ```python -# 查询特定用户在特定时间段的消息 -user_messages = await database_api.db_query( - Messages, - query_type="get", - filters={ - "user_id": "123456", - "time__gte": start_time, # 大于等于开始时间 - "time__lt": end_time # 小于结束时间 - }, - order_by=["-time"], - limit=50 +async def store_action_info( + chat_stream=None, + action_build_into_prompt: bool = False, + action_prompt_display: str = "", + action_done: bool = True, + thinking_id: str = "", + action_data: Optional[dict] = None, + action_name: str = "", +) -> Optional[Dict[str, Any]]: +``` +存储动作信息到数据库,是一种针对 Action 的 `db_save()` 的封装函数。 + +将Action执行的相关信息保存到ActionRecords表中,用于后续的记忆和上下文构建。 + +**Args:** +- `chat_stream`: 聊天流对象,包含聊天ID等信息。 +- `action_build_into_prompt`: 是否将动作信息构建到提示中。 +- `action_prompt_display`: 动作提示的显示文本。 +- `action_done`: 动作是否完成。 +- `thinking_id`: 思考过程的ID。 +- `action_data`: 动作的数据字典。 +- `action_name`: 动作的名称。 + +**Returns:** +- `Optional[Dict[str, Any]]`: 存储后的记录数据,失败时返回None。 + +#### 示例 +```python +record = await database_api.store_action_info( + chat_stream=chat_stream, + action_build_into_prompt=True, + action_prompt_display="执行了回复动作", + action_done=True, + thinking_id="thinking_123", + action_data={"content": "Hello"}, + action_name="reply_action" ) - -# 批量处理 -for message in user_messages: - print(f"消息内容: {message['plain_text']}") - print(f"发送时间: {message['time']}") -``` - -### 插件中的数据持久化 - -```python -from src.plugin_system.base import BasePlugin -from src.plugin_system.apis import database_api - -class DataPlugin(BasePlugin): - async def handle_action(self, action_data, chat_stream): - # 保存插件数据 - plugin_data = { - "plugin_name": self.plugin_name, - "chat_id": chat_stream.stream_id, - "data": json.dumps(action_data), - "created_time": time.time() - } - - # 使用自定义表模型(需要先定义) - record = await database_api.db_save( - PluginData, # 假设的插件数据模型 - plugin_data, - key_field="plugin_name", - key_value=self.plugin_name - ) - - return {"success": True, "record_id": record["id"]} -``` - -## 数据模型 - -### 常用模型类 -系统提供了以下常用的数据模型: - -- `Messages`:消息记录 -- `ActionRecords`:动作记录 -- `UserInfo`:用户信息 -- `GroupInfo`:群组信息 - -### 字段说明 - -#### Messages模型主要字段 -- `message_id`:消息ID -- `chat_id`:聊天ID -- `user_id`:用户ID -- `plain_text`:纯文本内容 -- `time`:时间戳 - -#### ActionRecords模型主要字段 -- `action_id`:动作ID -- `action_name`:动作名称 -- `action_done`:是否完成 -- `time`:创建时间 - -## 注意事项 - -1. **异步操作**:所有数据库API都是异步的,必须使用`await` -2. **错误处理**:函数内置错误处理,失败时返回None或空列表 -3. **数据类型**:返回的都是字典格式的数据,不是模型对象 -4. **性能考虑**:使用`limit`参数避免查询大量数据 -5. **过滤条件**:支持简单的等值过滤,复杂查询需要使用原生Peewee语法 -6. **事务**:如需事务支持,建议直接使用Peewee的事务功能 \ No newline at end of file +``` \ No newline at end of file diff --git a/src/plugin_system/apis/database_api.py b/src/plugin_system/apis/database_api.py index d46bfba39..8b253806b 100644 --- a/src/plugin_system/apis/database_api.py +++ b/src/plugin_system/apis/database_api.py @@ -152,10 +152,7 @@ async def db_query( except DoesNotExist: # 记录不存在 - if query_type == "get" and single_result: - return None - return [] - + return None if query_type == "get" and single_result else [] except Exception as e: logger.error(f"[DatabaseAPI] 数据库操作出错: {e}") traceback.print_exc() @@ -170,7 +167,8 @@ async def db_query( async def db_save( model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None -) -> Union[Dict[str, Any], None]: +) -> Optional[Dict[str, Any]]: + # sourcery skip: inline-immediately-returned-variable """保存数据到数据库(创建或更新) 如果提供了key_field和key_value,会先尝试查找匹配的记录进行更新; @@ -203,10 +201,9 @@ async def db_save( try: # 如果提供了key_field和key_value,尝试更新现有记录 if key_field and key_value is not None: - # 查找现有记录 - existing_records = list(model_class.select().where(getattr(model_class, key_field) == key_value).limit(1)) - - if existing_records: + if existing_records := list( + model_class.select().where(getattr(model_class, key_field) == key_value).limit(1) + ): # 更新现有记录 existing_record = existing_records[0] for field, value in data.items(): @@ -244,8 +241,8 @@ async def db_get( Args: model_class: Peewee模型类 filters: 过滤条件,字段名和值的字典 - order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段(即time字段)降序 limit: 结果数量限制 + order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段(即time字段)降序 single_result: 是否只返回单个结果,如果为True,则返回单个记录字典或None;否则返回记录字典列表或空列表 Returns: @@ -310,7 +307,7 @@ async def store_action_info( thinking_id: str = "", action_data: Optional[dict] = None, action_name: str = "", -) -> Union[Dict[str, Any], None]: +) -> Optional[Dict[str, Any]]: """存储动作信息到数据库 将Action执行的相关信息保存到ActionRecords表中,用于后续的记忆和上下文构建。