database_api_doc

This commit is contained in:
UnCLAS-Prommer
2025-07-27 13:33:16 +08:00
parent dc1b996321
commit d872d63feb
2 changed files with 157 additions and 202 deletions

View File

@@ -6,72 +6,51 @@
```python ```python
from src.plugin_system.apis import database_api from src.plugin_system.apis import database_api
# 或者
from src.plugin_system import database_api
``` ```
## 主要功能 ## 主要功能
### 1. 通用数据库查询 ### 1. 通用数据库操作
#### `db_query(model_class, query_type="get", filters=None, data=None, limit=None, order_by=None, single_result=False)`
执行数据库查询操作的通用接口
**参数:**
- `model_class`Peewee模型类如ActionRecords、Messages等
- `query_type`:查询类型,可选值: "get", "create", "update", "delete", "count"
- `filters`:过滤条件字典,键为字段名,值为要匹配的值
- `data`:用于创建或更新的数据字典
- `limit`:限制结果数量
- `order_by`:排序字段列表,使用字段名,前缀'-'表示降序
- `single_result`:是否只返回单个结果
**返回:**
根据查询类型返回不同的结果:
- "get":返回查询结果列表或单个结果
- "create":返回创建的记录
- "update":返回受影响的行数
- "delete":返回受影响的行数
- "count":返回记录数量
### 2. 便捷查询函数
#### `db_save(model_class, data, key_field=None, key_value=None)`
保存数据到数据库(创建或更新)
**参数:**
- `model_class`Peewee模型类
- `data`:要保存的数据字典
- `key_field`:用于查找现有记录的字段名
- `key_value`:用于查找现有记录的字段值
**返回:**
- `Dict[str, Any]`保存后的记录数据失败时返回None
#### `db_get(model_class, filters=None, order_by=None, limit=None)`
简化的查询函数
**参数:**
- `model_class`Peewee模型类
- `filters`:过滤条件字典
- `order_by`:排序字段
- `limit`:限制结果数量
**返回:**
- `Union[List[Dict], Dict, None]`:查询结果
### 3. 专用函数
#### `store_action_info(...)`
存储动作信息的专用函数
## 使用示例
### 1. 基本查询操作
```python ```python
from src.plugin_system.apis import database_api async def db_query(
from src.common.database.database_model import Messages, ActionRecords model_class: Type[Model],
data: Optional[Dict[str, Any]] = None,
query_type: Optional[str] = "get",
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
order_by: Optional[List[str]] = None,
single_result: Optional[bool] = False,
) -> Union[List[Dict[str, Any]], Dict[str, Any], None]:
```
执行数据库查询操作的通用接口。
# 查询最近10条消息 **Args:**
- `model_class`: Peewee模型类。
- Peewee模型类可以在`src.common.database.database_model`模块中找到,如`ActionRecords``Messages`等。
- `data`: 用于创建或更新的数据
- `query_type`: 查询类型
- 可选值: `get`, `create`, `update`, `delete`, `count`
- `filters`: 过滤条件字典,键为字段名,值为要匹配的值。
- `limit`: 限制结果数量。
- `order_by`: 排序字段列表,使用字段名,前缀'-'表示降序。
- 排序字段,前缀`-`表示降序,例如`-time`表示按时间字段(即`time`字段)降序
- `single_result`: 是否只返回单个结果。
**Returns:**
- 根据查询类型返回不同的结果:
- `get`: 返回查询结果列表或单个结果。(如果 `single_result=True`
- `create`: 返回创建的记录。
- `update`: 返回受影响的行数。
- `delete`: 返回受影响的行数。
- `count`: 返回记录数量。
#### 示例
1. 查询最近10条消息
```python
messages = await database_api.db_query( messages = await database_api.db_query(
Messages, Messages,
query_type="get", query_type="get",
@@ -79,180 +58,159 @@ messages = await database_api.db_query(
limit=10, limit=10,
order_by=["-time"] order_by=["-time"]
) )
# 查询单条记录
message = await database_api.db_query(
Messages,
query_type="get",
filters={"message_id": "msg_123"},
single_result=True
)
``` ```
2. 创建一条记录
### 2. 创建记录
```python ```python
# 创建新的动作记录
new_record = await database_api.db_query( new_record = await database_api.db_query(
ActionRecords, ActionRecords,
data={"action_id": "123", "time": time.time(), "action_name": "TestAction"},
query_type="create", query_type="create",
data={
"action_id": "action_123",
"time": time.time(),
"action_name": "TestAction",
"action_done": True
}
) )
print(f"创建了记录: {new_record['id']}")
``` ```
3. 更新记录
### 3. 更新记录
```python ```python
# 更新动作状态
updated_count = await database_api.db_query( updated_count = await database_api.db_query(
ActionRecords, ActionRecords,
data={"action_done": True},
query_type="update", query_type="update",
filters={"action_id": "action_123"}, filters={"action_id": "123"},
data={"action_done": True, "completion_time": time.time()}
) )
print(f"更新了 {updated_count} 条记录")
``` ```
4. 删除记录
### 4. 删除记录
```python ```python
# 删除过期记录
deleted_count = await database_api.db_query( deleted_count = await database_api.db_query(
ActionRecords, ActionRecords,
query_type="delete", query_type="delete",
filters={"time__lt": time.time() - 86400} # 删除24小时前的记录 filters={"action_id": "123"}
) )
print(f"删除了 {deleted_count} 条过期记录")
``` ```
5. 计数
### 5. 统计查询
```python ```python
# 统计消息数量 count = await database_api.db_query(
message_count = await database_api.db_query(
Messages, Messages,
query_type="count", query_type="count",
filters={"chat_id": chat_stream.stream_id} filters={"chat_id": chat_stream.stream_id}
) )
print(f"该聊天有 {message_count} 条消息")
``` ```
### 6. 使用便捷函 ### 2. 数据库保存
```python
async def db_save(
model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None
) -> Optional[Dict[str, Any]]:
```
保存数据到数据库(创建或更新)
如果提供了key_field和key_value会先尝试查找匹配的记录进行更新
如果没有找到匹配记录或未提供key_field和key_value则创建新记录。
**Args:**
- `model_class`: Peewee模型类。
- `data`: 要保存的数据字典。
- `key_field`: 用于查找现有记录的字段名,例如"action_id"。
- `key_value`: 用于查找现有记录的字段值。
**Returns:**
- `Optional[Dict[str, Any]]`: 保存后的记录数据失败时返回None。
#### 示例
创建或更新一条记录
```python ```python
# 使用db_save进行创建或更新
record = await database_api.db_save( record = await database_api.db_save(
ActionRecords, ActionRecords,
{ {
"action_id": "action_123", "action_id": "123",
"time": time.time(), "time": time.time(),
"action_name": "TestAction", "action_name": "TestAction",
"action_done": True "action_done": True
}, },
key_field="action_id", key_field="action_id",
key_value="action_123" key_value="123"
) )
```
# 使用db_get进行简单查询 ### 3. 数据库获取
recent_messages = await database_api.db_get( ```python
async def db_get(
model_class: Type[Model],
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
order_by: Optional[str] = None,
single_result: Optional[bool] = False,
) -> Union[List[Dict[str, Any]], Dict[str, Any], None]:
```
从数据库获取记录
这是db_query方法的简化版本专注于数据检索操作。
**Args:**
- `model_class`: Peewee模型类。
- `filters`: 过滤条件字典,键为字段名,值为要匹配的值。
- `limit`: 限制结果数量。
- `order_by`: 排序字段,使用字段名,前缀'-'表示降序。
- `single_result`: 是否只返回单个结果如果为True则返回单个记录字典或None否则返回记录字典列表或空列表
**Returns:**
- `Union[List[Dict], Dict, None]`: 查询结果列表或单个结果(如果`single_result=True`失败时返回None。
#### 示例
1. 获取单个记录
```python
record = await database_api.db_get(
ActionRecords,
filters={"action_id": "123"},
limit=1
)
```
2. 获取最近10条记录
```python
records = await database_api.db_get(
Messages, Messages,
filters={"chat_id": chat_stream.stream_id}, filters={"chat_id": chat_stream.stream_id},
limit=10,
order_by="-time", order_by="-time",
limit=5
) )
``` ```
## 高级用法 ### 4. 动作信息存储
### 复杂查询示例
```python ```python
# 查询特定用户在特定时间段的消息 async def store_action_info(
user_messages = await database_api.db_query( chat_stream=None,
Messages, action_build_into_prompt: bool = False,
query_type="get", action_prompt_display: str = "",
filters={ action_done: bool = True,
"user_id": "123456", thinking_id: str = "",
"time__gte": start_time, # 大于等于开始时间 action_data: Optional[dict] = None,
"time__lt": end_time # 小于结束时间 action_name: str = "",
}, ) -> Optional[Dict[str, Any]]:
order_by=["-time"], ```
limit=50 存储动作信息到数据库,是一种针对 Action 的 `db_save()` 的封装函数。
将Action执行的相关信息保存到ActionRecords表中用于后续的记忆和上下文构建。
**Args:**
- `chat_stream`: 聊天流对象包含聊天ID等信息。
- `action_build_into_prompt`: 是否将动作信息构建到提示中。
- `action_prompt_display`: 动作提示的显示文本。
- `action_done`: 动作是否完成。
- `thinking_id`: 思考过程的ID。
- `action_data`: 动作的数据字典。
- `action_name`: 动作的名称。
**Returns:**
- `Optional[Dict[str, Any]]`: 存储后的记录数据失败时返回None。
#### 示例
```python
record = await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=True,
action_prompt_display="执行了回复动作",
action_done=True,
thinking_id="thinking_123",
action_data={"content": "Hello"},
action_name="reply_action"
) )
```
# 批量处理
for message in user_messages:
print(f"消息内容: {message['plain_text']}")
print(f"发送时间: {message['time']}")
```
### 插件中的数据持久化
```python
from src.plugin_system.base import BasePlugin
from src.plugin_system.apis import database_api
class DataPlugin(BasePlugin):
async def handle_action(self, action_data, chat_stream):
# 保存插件数据
plugin_data = {
"plugin_name": self.plugin_name,
"chat_id": chat_stream.stream_id,
"data": json.dumps(action_data),
"created_time": time.time()
}
# 使用自定义表模型(需要先定义)
record = await database_api.db_save(
PluginData, # 假设的插件数据模型
plugin_data,
key_field="plugin_name",
key_value=self.plugin_name
)
return {"success": True, "record_id": record["id"]}
```
## 数据模型
### 常用模型类
系统提供了以下常用的数据模型:
- `Messages`:消息记录
- `ActionRecords`:动作记录
- `UserInfo`:用户信息
- `GroupInfo`:群组信息
### 字段说明
#### Messages模型主要字段
- `message_id`消息ID
- `chat_id`聊天ID
- `user_id`用户ID
- `plain_text`:纯文本内容
- `time`:时间戳
#### ActionRecords模型主要字段
- `action_id`动作ID
- `action_name`:动作名称
- `action_done`:是否完成
- `time`:创建时间
## 注意事项
1. **异步操作**所有数据库API都是异步的必须使用`await`
2. **错误处理**函数内置错误处理失败时返回None或空列表
3. **数据类型**:返回的都是字典格式的数据,不是模型对象
4. **性能考虑**:使用`limit`参数避免查询大量数据
5. **过滤条件**支持简单的等值过滤复杂查询需要使用原生Peewee语法
6. **事务**如需事务支持建议直接使用Peewee的事务功能

View File

@@ -152,10 +152,7 @@ async def db_query(
except DoesNotExist: except DoesNotExist:
# 记录不存在 # 记录不存在
if query_type == "get" and single_result: return None if query_type == "get" and single_result else []
return None
return []
except Exception as e: except Exception as e:
logger.error(f"[DatabaseAPI] 数据库操作出错: {e}") logger.error(f"[DatabaseAPI] 数据库操作出错: {e}")
traceback.print_exc() traceback.print_exc()
@@ -170,7 +167,8 @@ async def db_query(
async def db_save( async def db_save(
model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None model_class: Type[Model], data: Dict[str, Any], key_field: Optional[str] = None, key_value: Optional[Any] = None
) -> Union[Dict[str, Any], None]: ) -> Optional[Dict[str, Any]]:
# sourcery skip: inline-immediately-returned-variable
"""保存数据到数据库(创建或更新) """保存数据到数据库(创建或更新)
如果提供了key_field和key_value会先尝试查找匹配的记录进行更新 如果提供了key_field和key_value会先尝试查找匹配的记录进行更新
@@ -203,10 +201,9 @@ async def db_save(
try: try:
# 如果提供了key_field和key_value尝试更新现有记录 # 如果提供了key_field和key_value尝试更新现有记录
if key_field and key_value is not None: if key_field and key_value is not None:
# 查找现有记录 if existing_records := list(
existing_records = list(model_class.select().where(getattr(model_class, key_field) == key_value).limit(1)) model_class.select().where(getattr(model_class, key_field) == key_value).limit(1)
):
if existing_records:
# 更新现有记录 # 更新现有记录
existing_record = existing_records[0] existing_record = existing_records[0]
for field, value in data.items(): for field, value in data.items():
@@ -244,8 +241,8 @@ async def db_get(
Args: Args:
model_class: Peewee模型类 model_class: Peewee模型类
filters: 过滤条件,字段名和值的字典 filters: 过滤条件,字段名和值的字典
order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段即time字段降序
limit: 结果数量限制 limit: 结果数量限制
order_by: 排序字段,前缀'-'表示降序,例如'-time'表示按时间字段即time字段降序
single_result: 是否只返回单个结果如果为True则返回单个记录字典或None否则返回记录字典列表或空列表 single_result: 是否只返回单个结果如果为True则返回单个记录字典或None否则返回记录字典列表或空列表
Returns: Returns:
@@ -310,7 +307,7 @@ async def store_action_info(
thinking_id: str = "", thinking_id: str = "",
action_data: Optional[dict] = None, action_data: Optional[dict] = None,
action_name: str = "", action_name: str = "",
) -> Union[Dict[str, Any], None]: ) -> Optional[Dict[str, Any]]:
"""存储动作信息到数据库 """存储动作信息到数据库
将Action执行的相关信息保存到ActionRecords表中用于后续的记忆和上下文构建。 将Action执行的相关信息保存到ActionRecords表中用于后续的记忆和上下文构建。