This commit is contained in:
雅诺狐
2025-10-05 17:50:13 +08:00
22 changed files with 1145 additions and 458 deletions

View File

@@ -18,7 +18,7 @@
- [x] Add sticker emotion analysis
- [x] Add proactive-thinking configuration
- [x] Add schedule management
- [ ] Add MCP SSE support
- [x] Add MCP SSE support
- [ ] Add multi-emotion speech synthesis based on GPT-Sovits, as a plugin
- [ ] Add speech synthesis based on Open Voice, as a plugin
- [x] Give videos in chat messages a videoid, just like imageid

docs/MCP_SSE_USAGE.md Normal file
View File

@@ -0,0 +1,274 @@
# MCP SSE Client Usage Guide

## Overview

The MCP (Model Context Protocol) SSE (Server-Sent Events) client communicates with MCP-compatible servers over the SSE protocol. It is integrated into MoFox Bot's LLM model client system.

## Features

- ✅ SSE streaming responses
- ✅ Multi-turn conversations
- ✅ Tool calls (function calling)
- ✅ Multimodal content (text + images)
- ✅ Automatic interrupt handling
- ✅ Full token usage statistics

## Configuration

### 1. Install dependencies

The dependencies are already declared for the project:

```bash
pip install "mcp>=0.9.0" "sse-starlette>=2.2.1"
```

Or with uv:

```bash
uv sync
```

### 2. Configure an API provider

Add an MCP SSE provider to the configuration:
```python
# Add to your configuration
api_providers = [
    {
        "name": "mcp_provider",
        "client_type": "mcp_sse",  # use the MCP SSE client
        "base_url": "https://your-mcp-server.com",
        "api_key": "your-api-key",
        "timeout": 60
    }
]
```
### 3. Configure a model

```python
models = [
    {
        "name": "mcp_model",
        "api_provider": "mcp_provider",
        "model_identifier": "your-model-name",
        "force_stream_mode": True  # MCP SSE always streams
    }
]
```
## Usage examples

### Basic conversation

```python
from src.llm_models.model_client.base_client import client_registry
from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType
from src.config.api_ada_configs import APIProvider, ModelInfo

# Get a client
api_provider = APIProvider(
    name="mcp_provider",
    client_type="mcp_sse",
    base_url="https://your-mcp-server.com",
    api_key="your-api-key"
)
client = client_registry.get_client_class_instance(api_provider)

# Build the messages
messages = [
    MessageBuilder()
    .set_role(RoleType.User)
    .add_text_content("Hello, please introduce yourself")
    .build()
]

# Get the response
model_info = ModelInfo(
    name="mcp_model",
    api_provider="mcp_provider",
    model_identifier="your-model-name"
)
response = await client.get_response(
    model_info=model_info,
    message_list=messages,
    max_tokens=1024,
    temperature=0.7
)
print(response.content)
```
### Tool calls

```python
from src.llm_models.payload_content.tool_option import (
    ToolOptionBuilder,
    ToolParamType
)

# Define the tools
tools = [
    ToolOptionBuilder()
    .set_name("get_weather")
    .set_description("Get weather information for a given city")
    .add_param(
        name="city",
        param_type=ToolParamType.STRING,
        description="City name",
        required=True
    )
    .build()
]

# Send the request
response = await client.get_response(
    model_info=model_info,
    message_list=messages,
    tool_options=tools,
    max_tokens=1024,
    temperature=0.7
)

# Check for tool calls
if response.tool_calls:
    for tool_call in response.tool_calls:
        print(f"Tool called: {tool_call.func_name}")
        print(f"Arguments: {tool_call.args}")
```
### Multimodal conversation

```python
import base64

# Read and encode the image
with open("image.jpg", "rb") as f:
    image_data = base64.b64encode(f.read()).decode("utf-8")

# Build a multimodal message
messages = [
    MessageBuilder()
    .set_role(RoleType.User)
    .add_text_content("What is in this picture?")
    .add_image_content("jpg", image_data)
    .build()
]

response = await client.get_response(
    model_info=model_info,
    message_list=messages
)
```
### Interrupt handling

```python
import asyncio

from src.llm_models.exceptions import ReqAbortException

# Create an interrupt event
interrupt_flag = asyncio.Event()

# Set the interrupt from another coroutine
async def interrupt_after_delay():
    await asyncio.sleep(5)
    interrupt_flag.set()

asyncio.create_task(interrupt_after_delay())

try:
    response = await client.get_response(
        model_info=model_info,
        message_list=messages,
        interrupt_flag=interrupt_flag
    )
except ReqAbortException:
    print("Request was interrupted")
```
## MCP protocol details

The MCP SSE client follows this protocol:

### Request format

```json
{
  "model": "model-name",
  "messages": [
    {
      "role": "user",
      "content": "message content"
    }
  ],
  "max_tokens": 1024,
  "temperature": 0.7,
  "stream": true,
  "tools": [
    {
      "name": "tool_name",
      "description": "tool description",
      "input_schema": {
        "type": "object",
        "properties": {...},
        "required": [...]
      }
    }
  ]
}
```
### SSE event types

The client handles the following SSE events:

1. **content_block_start** - a content block begins
2. **content_block_delta** - an incremental content chunk
3. **content_block_stop** - a content block ends
4. **message_delta** - a message metadata update
5. **message_stop** - the message is complete
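The event flow above can be sketched as a small dispatcher that accumulates text deltas until the message stops. The payload field names (`delta`, `text`) are illustrative assumptions about the event body, not a guaranteed wire format:

```python
import json

def handle_sse_event(event_type: str, data: str, buffer: list[str]) -> bool:
    """Process one SSE event; return False when the message is finished."""
    payload = json.loads(data) if data else {}
    if event_type == "content_block_delta":
        # Incremental text chunk (assumed payload shape)
        buffer.append(payload.get("delta", {}).get("text", ""))
    elif event_type == "message_stop":
        return False  # end of message
    # content_block_start/stop and message_delta carry metadata only
    return True

buffer: list[str] = []
events = [
    ("content_block_start", "{}"),
    ("content_block_delta", '{"delta": {"text": "Hel"}}'),
    ("content_block_delta", '{"delta": {"text": "lo"}}'),
    ("content_block_stop", "{}"),
    ("message_stop", "{}"),
]
for event_type, data in events:
    if not handle_sse_event(event_type, data, buffer):
        break
print("".join(buffer))  # -> Hello
```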
## Limitations

Current limitations of the MCP SSE client:

- ❌ No embedding support
- ❌ No audio transcription support
- ✅ Streaming responses only (inherent to SSE)

## Troubleshooting

### Connection failures

Check:

1. Whether base_url is correct
2. Whether the API key is valid
3. Whether the network connection is working
4. Whether the server supports the SSE protocol

### Parse errors

Check:

1. Whether the server's SSE output conforms to the MCP specification
2. The detailed error messages in the logs

### Tool call failures

Check:

1. Whether the tool definition's schema is correct
2. Whether the server supports tool calling
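For the schema check in particular, a quick sanity pass over a tool definition catches the most common mistakes. This helper is a hypothetical illustration, not part of the project:

```python
def check_tool_schema(tool: dict) -> list[str]:
    """Return a list of problems found in a tool definition."""
    problems = []
    for key in ("name", "description", "input_schema"):
        if key not in tool:
            problems.append(f"missing '{key}'")
    schema = tool.get("input_schema", {})
    if schema.get("type") != "object":
        problems.append("input_schema.type should be 'object'")
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in properties:
            problems.append(f"required param '{name}' not in properties")
    return problems

good = {
    "name": "get_weather",
    "description": "Get weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}
bad = {"name": "broken", "input_schema": {"type": "array", "required": ["x"]}}
print(check_tool_schema(good))  # -> []
print(check_tool_schema(bad))
```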
## Related documentation

- [MCP protocol specification](https://github.com/anthropics/mcp)
- [SSE specification](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
- [MoFox Bot documentation](../README.md)

## Changelog

### v0.8.1

- ✅ Added the MCP SSE client
- ✅ Streaming responses and tool calls
- ✅ Multimodal content

View File

@@ -76,6 +76,8 @@ dependencies = [
"aiosqlite>=0.21.0",
"inkfox>=0.1.0",
"rrjieba>=0.1.13",
"mcp>=0.9.0",
"sse-starlette>=2.2.1",
]
[[tool.uv.index]]

View File

@@ -69,4 +69,6 @@ lunar_python
fuzzywuzzy
python-multipart
aiofiles
inkfox
inkfox
mcp
sse-starlette

View File

@@ -1,3 +1,4 @@
import asyncio
import hashlib
import random
import time
@@ -304,14 +305,6 @@ class ExpressionSelector:
try:
# start_time = time.time()
content, (reasoning_content, model_name, _) = await self.llm_model.generate_response_async(prompt=prompt)
# logger.info(f"LLM请求时间: {model_name} {time.time() - start_time} \n{prompt}")
# logger.info(f"模型名称: {model_name}")
# logger.info(f"LLM返回结果: {content}")
# if reasoning_content:
# logger.info(f"LLM推理: {reasoning_content}")
# else:
# logger.info(f"LLM推理: 无")
if not content:
logger.warning("LLM返回空结果")
@@ -338,7 +331,7 @@ class ExpressionSelector:
# 对选中的所有表达方式一次性更新count数
if valid_expressions:
await self.update_expressions_count_batch(valid_expressions, 0.006)
asyncio.create_task(self.update_expressions_count_batch(valid_expressions, 0.006))
# logger.info(f"LLM从{len(all_expressions)}个情境中选择了{len(valid_expressions)}个")
return valid_expressions

View File

@@ -997,7 +997,6 @@ class MemorySystem:
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
# get_stream 为异步方法,需要 await
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream or not hasattr(chat_stream, "context_manager"):

View File

@@ -13,6 +13,7 @@ from src.common.data_models.database_data_model import DatabaseMessages
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.base.component_types import ChatType
from .distribution_manager import stream_loop_manager
@@ -54,7 +55,13 @@ class SingleStreamContextManager:
bool: 是否成功添加
"""
try:
self.context.add_message(message)
# 直接操作上下文的消息列表
message.is_read = False
self.context.unread_messages.append(message)
# 自动检测和更新chat type
self._detect_chat_type(message)
# 在上下文管理器中计算兴趣值
await self._calculate_message_interest(message)
self.total_messages += 1
@@ -78,7 +85,28 @@ class SingleStreamContextManager:
bool: 是否成功更新
"""
try:
self.context.update_message_info(message_id, **updates)
# 直接在未读消息中查找并更新
for message in self.context.unread_messages:
if message.message_id == message_id:
if "interest_value" in updates:
message.interest_value = updates["interest_value"]
if "actions" in updates:
message.actions = updates["actions"]
if "should_reply" in updates:
message.should_reply = updates["should_reply"]
break
# 在历史消息中查找并更新
for message in self.context.history_messages:
if message.message_id == message_id:
if "interest_value" in updates:
message.interest_value = updates["interest_value"]
if "actions" in updates:
message.actions = updates["actions"]
if "should_reply" in updates:
message.should_reply = updates["should_reply"]
break
logger.debug(f"更新单流上下文消息: {self.stream_id}/{message_id}")
return True
except Exception as e:
@@ -259,36 +287,17 @@ class SingleStreamContextManager:
logger.error(f"计算消息兴趣度时发生错误: {e}", exc_info=True)
return 0.5
async def add_message_async(self, message: DatabaseMessages, skip_energy_update: bool = False) -> bool:
"""异步实现的 add_message将消息添加到 context并 await 能量更新与分发。"""
try:
self.context.add_message(message)
# 在上下文管理器中计算兴趣值
await self._calculate_message_interest(message)
self.total_messages += 1
self.last_access_time = time.time()
# 启动流的循环任务(如果还未启动)
asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id))
logger.debug(f"添加消息到单流上下文(异步): {self.stream_id}")
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 (async) {self.stream_id}: {e}", exc_info=True)
return False
async def update_message_async(self, message_id: str, updates: dict[str, Any]) -> bool:
"""异步实现的 update_message更新消息并在需要时 await 能量更新。"""
try:
self.context.update_message_info(message_id, **updates)
logger.debug(f"更新单流上下文消息(异步): {self.stream_id}/{message_id}")
return True
except Exception as e:
logger.error(f"更新单流上下文消息失败 (async) {self.stream_id}/{message_id}: {e}", exc_info=True)
return False
def _detect_chat_type(self, message: DatabaseMessages):
"""根据消息内容自动检测聊天类型"""
# 只有在第一次添加消息时才检测聊天类型,避免后续消息改变类型
if len(self.context.unread_messages) == 1: # 只有这条消息
# 如果消息包含群组信息,则为群聊
if hasattr(message, "chat_info_group_id") and message.chat_info_group_id:
self.context.chat_type = ChatType.GROUP
elif hasattr(message, "chat_info_group_name") and message.chat_info_group_name:
self.context.chat_type = ChatType.GROUP
else:
self.context.chat_type = ChatType.PRIVATE
async def clear_context_async(self) -> bool:
"""异步实现的 clear_context清空消息并 await 能量重算。"""

View File

@@ -23,8 +23,6 @@ class StreamLoopManager:
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
# 跟踪流使用的管理器类型
self.stream_management_type: dict[str, str] = {} # stream_id -> "adaptive" or "fallback"
# 统计信息
self.stats: dict[str, Any] = {
@@ -115,7 +113,6 @@ class StreamLoopManager:
return True
# 使用自适应流管理器获取槽位
use_adaptive = False
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
@@ -132,21 +129,14 @@ class StreamLoopManager:
)
if slot_acquired:
use_adaptive = True
logger.debug(f"成功获取流处理槽位: {stream_id} (优先级: {priority.name})")
else:
logger.debug(f"自适应管理器拒绝槽位请求: {stream_id},尝试回退方案")
else:
logger.debug("自适应管理器未运行,使用原始方法")
logger.debug("自适应管理器未运行")
except Exception as e:
logger.debug(f"自适应管理器获取槽位失败,使用原始方法: {e}")
# 如果自适应管理器失败或未运行,使用回退方案
if not use_adaptive:
if not await self._fallback_acquire_slot(stream_id, force):
logger.debug(f"回退方案也失败: {stream_id}")
return False
logger.debug(f"自适应管理器获取槽位失败: {e}")
# 创建流循环任务
try:
@@ -155,68 +145,22 @@ class StreamLoopManager:
name=f"stream_loop_{stream_id}"
)
self.stream_loops[stream_id] = loop_task
# 记录管理器类型
self.stream_management_type[stream_id] = "adaptive" if use_adaptive else "fallback"
# 更新统计信息
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.info(f"启动流循环任务: {stream_id} (管理器: {'adaptive' if use_adaptive else 'fallback'})")
logger.info(f"启动流循环任务: {stream_id}")
return True
except Exception as e:
logger.error(f"启动流循环任务失败 {stream_id}: {e}")
# 释放槽位
if use_adaptive:
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
except:
pass
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
return False
async def _fallback_acquire_slot(self, stream_id: str, force: bool) -> bool:
"""回退方案:获取槽位(原始方法)"""
# 判断是否需要强制分发
should_force = force or await self._should_force_dispatch_for_stream(stream_id)
# 检查是否超过最大并发限制
current_streams = len(self.stream_loops)
if current_streams >= self.max_concurrent_streams and not should_force:
logger.warning(
f"超过最大并发流数限制({current_streams}/{self.max_concurrent_streams}),无法启动流 {stream_id}"
)
return False
# 处理强制分发情况
if should_force and current_streams >= self.max_concurrent_streams:
logger.warning(
f"{stream_id} 未读消息积压严重(>{self.force_dispatch_unread_threshold}),突破并发限制强制启动分发 (当前: {current_streams}/{self.max_concurrent_streams})"
)
# 检查是否有现有的分发循环,如果有则先移除
if stream_id in self.stream_loops:
logger.info(f"发现现有流循环 {stream_id},将先移除再重新创建")
existing_task = self.stream_loops[stream_id]
if not existing_task.done():
existing_task.cancel()
# 创建异步任务来等待取消完成,并添加异常处理
cancel_task = asyncio.create_task(
self._wait_for_task_cancel(stream_id, existing_task),
name=f"cancel_existing_loop_{stream_id}"
)
# 为取消任务添加异常处理,避免孤儿任务
cancel_task.add_done_callback(
lambda task: logger.debug(f"取消任务完成: {stream_id}") if not task.exception()
else logger.error(f"取消任务异常: {stream_id} - {task.exception()}")
)
# 从字典中移除
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
return True
def _determine_stream_priority(self, stream_id: str) -> "StreamPriority":
"""确定流优先级"""
try:
@@ -237,20 +181,6 @@ class StreamLoopManager:
from src.chat.message_manager.adaptive_stream_manager import StreamPriority
return StreamPriority.NORMAL
# 创建流循环任务
try:
task = asyncio.create_task(
self._stream_loop(stream_id),
name=f"stream_loop_{stream_id}" # 为任务添加名称,便于调试
)
self.stream_loops[stream_id] = task
self.stats["total_loops"] += 1
logger.info(f"启动流循环: {stream_id} (当前总数: {len(self.stream_loops)})")
return True
except Exception as e:
logger.error(f"创建流循环任务失败: {stream_id} - {e}")
return False
async def stop_stream_loop(self, stream_id: str) -> bool:
"""停止指定流的循环任务
@@ -342,17 +272,6 @@ class StreamLoopManager:
# 4. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
if has_messages:
updated_unread_count = self._get_unread_count(context)
if self._needs_force_dispatch_for_context(context, updated_unread_count):
interval = min(interval, max(self.force_dispatch_min_interval, 0.0))
logger.debug(
"%s 未读消息仍有 %d 条,使用加速分发间隔 %.2fs",
stream_id,
updated_unread_count,
interval,
)
# 5. sleep等待下次检查
logger.info(f"{stream_id} 等待 {interval:.2f}s")
await asyncio.sleep(interval)
@@ -378,23 +297,14 @@ class StreamLoopManager:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
# 根据管理器类型释放相应的槽位
management_type = self.stream_management_type.get(stream_id, "fallback")
if management_type == "adaptive":
# 释放自适应管理器的槽位
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
logger.debug(f"释放自适应流处理槽位: {stream_id}")
except Exception as e:
logger.debug(f"释放自适应流处理槽位失败: {e}")
else:
logger.debug(f"{stream_id} 使用回退方案,无需释放自适应槽位")
# 清理管理器类型记录
if stream_id in self.stream_management_type:
del self.stream_management_type[stream_id]
# 释放自适应管理器的槽位
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
logger.debug(f"释放自适应流处理槽位: {stream_id}")
except Exception as e:
logger.debug(f"释放自适应流处理槽位失败: {e}")
logger.info(f"流循环结束: {stream_id}")
@@ -417,7 +327,7 @@ class StreamLoopManager:
logger.error(f"获取流上下文失败 {stream_id}: {e}")
return None
async def _has_messages_to_process(self, context: Any) -> bool:
async def _has_messages_to_process(self, context: StreamContext) -> bool:
"""检查是否有消息需要处理
Args:
@@ -464,7 +374,7 @@ class StreamLoopManager:
success = results.get("success", False)
if success:
await self._refresh_focus_energy(stream_id)
asyncio.create_task(self._refresh_focus_energy(stream_id))
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
@@ -553,16 +463,16 @@ class StreamLoopManager:
logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}")
return False
def _get_unread_count(self, context: Any) -> int:
def _get_unread_count(self, context: StreamContext) -> int:
try:
unread_messages = getattr(context, "unread_messages", None)
unread_messages = context.unread_messages
if unread_messages is None:
return 0
return len(unread_messages)
except Exception:
return 0
def _needs_force_dispatch_for_context(self, context: Any, unread_count: int | None = None) -> bool:
def _needs_force_dispatch_for_context(self, context: StreamContext, unread_count: int | None = None) -> bool:
if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0:
return False

View File

@@ -1,5 +1,6 @@
import os
import re
import time
import traceback
from typing import Any
@@ -468,55 +469,110 @@ class ChatBot:
template_group_name = None
async def preprocess():
# 存储消息到数据库
from .storage import MessageStorage
try:
await MessageStorage.store_message(message, message.chat_stream)
logger.debug(f"消息已存储到数据库: {message.message_info.message_id}")
except Exception as e:
logger.error(f"存储消息到数据库失败: {e}")
traceback.print_exc()
# 使用消息管理器处理消息(保持原有功能)
from src.common.data_models.database_data_model import DatabaseMessages
message_info = message.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(message.chat_stream, "user_info", None)
group_info = getattr(message.chat_stream, "group_info", None)
message_id = message_info.message_id or ""
message_time = message_info.time if message_info.time is not None else time.time()
is_mentioned = None
if isinstance(message.is_mentioned, bool):
is_mentioned = message.is_mentioned
elif isinstance(message.is_mentioned, (int, float)):
is_mentioned = message.is_mentioned != 0
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
group_id = getattr(group_info, "group_id", None)
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message.message_info.message_id,
time=message.message_info.time,
message_id=message_id,
time=float(message_time),
chat_id=message.chat_stream.stream_id,
processed_plain_text=message.processed_plain_text,
display_message=message.processed_plain_text,
is_mentioned=message.is_mentioned,
is_at=message.is_at,
is_emoji=message.is_emoji,
is_picid=message.is_picid,
is_command=message.is_command,
is_notify=message.is_notify,
user_id=message.message_info.user_info.user_id,
user_nickname=message.message_info.user_info.user_nickname,
user_cardname=message.message_info.user_info.user_cardname,
user_platform=message.message_info.user_info.platform,
is_mentioned=is_mentioned,
is_at=bool(message.is_at) if message.is_at is not None else None,
is_emoji=bool(message.is_emoji),
is_picid=bool(message.is_picid),
is_command=bool(message.is_command),
is_notify=bool(message.is_notify),
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=message.chat_stream.stream_id,
chat_info_platform=message.chat_stream.platform,
chat_info_create_time=message.chat_stream.create_time,
chat_info_last_active_time=message.chat_stream.last_active_time,
chat_info_user_id=message.chat_stream.user_info.user_id,
chat_info_user_nickname=message.chat_stream.user_info.user_nickname,
chat_info_user_cardname=message.chat_stream.user_info.user_cardname,
chat_info_user_platform=message.chat_stream.user_info.platform,
chat_info_create_time=float(message.chat_stream.create_time),
chat_info_last_active_time=float(message.chat_stream.last_active_time),
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 如果是群聊,添加群组信息
if message.chat_stream.group_info:
db_message.chat_info_group_id = message.chat_stream.group_info.group_id
db_message.chat_info_group_name = message.chat_stream.group_info.group_name
db_message.chat_info_group_platform = message.chat_stream.group_info.platform
# 兼容历史逻辑:显式设置群聊相关属性,便于后续逻辑通过 hasattr 判断
if group_info:
setattr(db_message, "chat_info_group_id", group_id)
setattr(db_message, "chat_info_group_name", group_name)
setattr(db_message, "chat_info_group_platform", group_platform)
else:
setattr(db_message, "chat_info_group_id", None)
setattr(db_message, "chat_info_group_name", None)
setattr(db_message, "chat_info_group_platform", None)
# 添加消息到消息管理器
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
# 先交给消息管理器处理,计算兴趣度等衍生数据
try:
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}")
except Exception as e:
logger.error(f"消息添加到消息管理器失败: {e}")
# 将兴趣度结果同步回原始消息,便于后续流程使用
message.interest_value = getattr(db_message, "interest_value", getattr(message, "interest_value", 0.0))
setattr(message, "should_reply", getattr(db_message, "should_reply", getattr(message, "should_reply", False)))
setattr(message, "should_act", getattr(db_message, "should_act", getattr(message, "should_act", False)))
# 存储消息到数据库,只进行一次写入
try:
await MessageStorage.store_message(message, message.chat_stream)
logger.debug(
"消息已存储到数据库: %s (interest=%.3f, should_reply=%s, should_act=%s)",
message.message_info.message_id,
getattr(message, "interest_value", -1.0),
getattr(message, "should_reply", None),
getattr(message, "should_act", None),
)
except Exception as e:
logger.error(f"存储消息到数据库失败: {e}")
traceback.print_exc()
if template_group_name:
async with global_prompt_manager.async_message_scope(template_group_name):

View File

@@ -33,6 +33,10 @@ class ChatterActionManager:
self._using_actions = component_registry.get_default_actions()
self.log_prefix: str = "ChatterActionManager"
# 批量存储支持
self._batch_storage_enabled = False
self._pending_actions = []
self._current_chat_id = None
# === 执行Action方法 ===
@@ -184,19 +188,29 @@ class ChatterActionManager:
reason = reasoning or "选择不回复"
logger.info(f"{log_prefix} 选择不回复,原因: {reason}")
# 存储no_reply信息到数据库
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
)
# 存储no_reply信息到数据库(支持批量存储)
if self._batch_storage_enabled:
self.add_action_to_batch(
action_name="no_reply",
action_data={"reason": reason},
thinking_id=thinking_id or "",
action_done=True,
action_build_into_prompt=False,
action_prompt_display=reason
)
else:
asyncio.create_task(database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name="no_reply",
))
# 自动清空所有未读消息
await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "no_reply"))
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
@@ -214,12 +228,12 @@ class ChatterActionManager:
# 记录执行的动作到目标消息
if success:
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data))
# 自动清空所有未读消息
if clear_unread_messages:
await self._clear_all_unread_messages(chat_stream.stream_id, action_name)
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, action_name))
# 重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id))
return {
"action_type": action_name,
@@ -260,13 +274,13 @@ class ChatterActionManager:
)
# 记录回复动作到目标消息
await self._record_action_to_message(chat_stream, "reply", target_message, action_data)
asyncio.create_task(self._record_action_to_message(chat_stream, "reply", target_message, action_data))
if clear_unread_messages:
await self._clear_all_unread_messages(chat_stream.stream_id, "reply")
asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "reply"))
# 回复成功,重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id))
return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info}
@@ -474,16 +488,26 @@ class ChatterActionManager:
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
# 存储动作信息到数据库
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
# 存储动作信息到数据库(支持批量存储)
if self._batch_storage_enabled:
self.add_action_to_batch(
action_name="reply",
action_data={"reply_text": reply_text},
thinking_id=thinking_id or "",
action_done=True,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display
)
else:
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
# 构建循环信息
loop_info: dict[str, Any] = {
@@ -579,3 +603,71 @@ class ChatterActionManager:
)
return reply_text
def enable_batch_storage(self, chat_id: str):
"""启用批量存储模式"""
self._batch_storage_enabled = True
self._current_chat_id = chat_id
self._pending_actions.clear()
logger.debug(f"已启用批量存储模式chat_id: {chat_id}")
def disable_batch_storage(self):
"""禁用批量存储模式"""
self._batch_storage_enabled = False
self._current_chat_id = None
logger.debug("已禁用批量存储模式")
def add_action_to_batch(self, action_name: str, action_data: dict, thinking_id: str = "",
action_done: bool = True, action_build_into_prompt: bool = False,
action_prompt_display: str = ""):
"""添加动作到批量存储列表"""
if not self._batch_storage_enabled:
return False
action_record = {
"action_name": action_name,
"action_data": action_data,
"thinking_id": thinking_id,
"action_done": action_done,
"action_build_into_prompt": action_build_into_prompt,
"action_prompt_display": action_prompt_display,
"timestamp": time.time()
}
self._pending_actions.append(action_record)
logger.debug(f"已添加动作到批量存储列表: {action_name} (当前待处理: {len(self._pending_actions)} 个)")
return True
async def flush_batch_storage(self, chat_stream):
"""批量存储所有待处理的动作记录"""
if not self._pending_actions:
logger.debug("没有待处理的动作需要批量存储")
return
try:
logger.info(f"开始批量存储 {len(self._pending_actions)} 个动作记录")
# 批量存储所有动作
stored_count = 0
for action_data in self._pending_actions:
try:
result = await database_api.store_action_info(
chat_stream=chat_stream,
action_name=action_data.get("action_name", ""),
action_data=action_data.get("action_data", {}),
action_done=action_data.get("action_done", True),
action_build_into_prompt=action_data.get("action_build_into_prompt", False),
action_prompt_display=action_data.get("action_prompt_display", ""),
thinking_id=action_data.get("thinking_id", "")
)
if result:
stored_count += 1
except Exception as e:
logger.error(f"存储单个动作记录失败: {e}")
logger.info(f"批量存储完成: 成功存储 {stored_count}/{len(self._pending_actions)} 个动作记录")
# 清空待处理列表
self._pending_actions.clear()
except Exception as e:
logger.error(f"批量存储动作记录时发生错误: {e}")

View File

@@ -287,13 +287,13 @@ class DefaultReplyer:
try:
# 构建 Prompt
with Timer("构建Prompt", {}): # 内部计时器,可选保留
prompt = await asyncio.create_task(self.build_prompt_reply_context(
prompt = await self.build_prompt_reply_context(
reply_to=reply_to,
extra_info=extra_info,
available_actions=available_actions,
enable_tool=enable_tool,
reply_message=reply_message,
))
)
if not prompt:
logger.warning("构建prompt失败跳过回复生成")

View File

@@ -175,7 +175,6 @@ class PromptManager:
self._prompts = {}
self._counter = 0
self._context = PromptContext()
self._lock = asyncio.Lock()
@asynccontextmanager
async def async_message_scope(self, message_id: str | None = None):
@@ -190,10 +189,9 @@ class PromptManager:
logger.debug(f"从上下文中获取提示词: {name} {context_prompt}")
return context_prompt
async with self._lock:
if name not in self._prompts:
raise KeyError(f"Prompt '{name}' not found")
return self._prompts[name]
if name not in self._prompts:
raise KeyError(f"Prompt '{name}' not found")
return self._prompts[name]
def generate_name(self, template: str) -> str:
"""为未命名的prompt生成名称"""

View File

@@ -53,38 +53,8 @@ class StreamContext(BaseDataModel):
priority_mode: str | None = None
priority_info: dict | None = None
def add_message(self, message: "DatabaseMessages"):
"""添加消息到上下文"""
message.is_read = False
self.unread_messages.append(message)
# 自动检测和更新chat type
self._detect_chat_type(message)
def update_message_info(
self, message_id: str, interest_value: float = None, actions: list = None, should_reply: bool = None
):
"""
更新消息信息
Args:
message_id: 消息ID
interest_value: 兴趣度值
actions: 执行的动作列表
should_reply: 是否应该回复
"""
# 在未读消息中查找并更新
for message in self.unread_messages:
if message.message_id == message_id:
message.update_message_info(interest_value, actions, should_reply)
break
# 在历史消息中查找并更新
for message in self.history_messages:
if message.message_id == message_id:
message.update_message_info(interest_value, actions, should_reply)
break
def add_action_to_message(self, message_id: str, action: str):
"""
向指定消息添加执行的动作
@@ -105,42 +75,8 @@ class StreamContext(BaseDataModel):
message.add_action(action)
break
def _detect_chat_type(self, message: "DatabaseMessages"):
"""根据消息内容自动检测聊天类型"""
# 只有在第一次添加消息时才检测聊天类型,避免后续消息改变类型
if len(self.unread_messages) == 1: # 只有这条消息
# 如果消息包含群组信息,则为群聊
if hasattr(message, "chat_info_group_id") and message.chat_info_group_id:
self.chat_type = ChatType.GROUP
elif hasattr(message, "chat_info_group_name") and message.chat_info_group_name:
self.chat_type = ChatType.GROUP
else:
self.chat_type = ChatType.PRIVATE
def update_chat_type(self, chat_type: ChatType):
"""手动更新聊天类型"""
self.chat_type = chat_type
def set_chat_mode(self, chat_mode: ChatMode):
"""设置聊天模式"""
self.chat_mode = chat_mode
def is_group_chat(self) -> bool:
"""检查是否为群聊"""
return self.chat_type == ChatType.GROUP
def is_private_chat(self) -> bool:
"""检查是否为私聊"""
return self.chat_type == ChatType.PRIVATE
def get_chat_type_display(self) -> str:
"""获取聊天类型的显示名称"""
if self.chat_type == ChatType.GROUP:
return "群聊"
elif self.chat_type == ChatType.PRIVATE:
return "私聊"
else:
return "未知类型"
def mark_message_as_read(self, message_id: str):
"""标记消息为已读"""

View File

@@ -36,6 +36,9 @@ class DatabaseConfig(ValidatedConfigBase):
connection_pool_size: int = Field(default=10, ge=1, description="连接池大小")
connection_timeout: int = Field(default=10, ge=1, description="连接超时时间")
# 批量动作记录存储配置
batch_action_storage_enabled: bool = Field(default=True, description="是否启用批量保存动作记录(开启后将多个动作一次性写入数据库,提升性能)")
class BotConfig(ValidatedConfigBase):
"""QQ机器人配置类"""
@@ -685,6 +688,7 @@ class AffinityFlowConfig(ValidatedConfigBase):
base_relationship_score: float = Field(default=0.5, description="基础人物关系分")
class ProactiveThinkingConfig(ValidatedConfigBase):
"""主动思考(主动发起对话)功能配置"""

View File

@@ -6,3 +6,5 @@ if "openai" in used_client_types:
from . import openai_client # noqa: F401
if "aiohttp_gemini" in used_client_types:
from . import aiohttp_gemini_client # noqa: F401
if "mcp_sse" in used_client_types:
from . import mcp_sse_client # noqa: F401

View File

@@ -0,0 +1,410 @@
"""
MCP (Model Context Protocol) SSE (Server-Sent Events) 客户端实现
支持通过SSE协议与MCP服务器进行通信
"""
import asyncio
import io
import json
from collections.abc import Callable
from typing import Any
import aiohttp
import orjson
from json_repair import repair_json
from src.common.logger import get_logger
from src.config.api_ada_configs import APIProvider, ModelInfo
from ..exceptions import (
NetworkConnectionError,
ReqAbortException,
RespNotOkException,
)
from ..payload_content.message import Message, RoleType
from ..payload_content.resp_format import RespFormat
from ..payload_content.tool_option import ToolCall, ToolOption
from .base_client import APIResponse, BaseClient, UsageRecord, client_registry
logger = get_logger("MCP-SSE客户端")
def _convert_messages_to_mcp(messages: list[Message]) -> list[dict[str, Any]]:
"""
将消息列表转换为MCP协议格式
:param messages: 消息列表
:return: MCP格式的消息列表
"""
mcp_messages = []
for message in messages:
mcp_msg: dict[str, Any] = {
"role": message.role.value,
}
# 处理内容
if isinstance(message.content, str):
mcp_msg["content"] = message.content
elif isinstance(message.content, list):
# 处理多模态内容
content_parts = []
for item in message.content:
if isinstance(item, tuple):
# 图片内容MIME 类型需用 image/jpeg 而非 image/jpg
image_format = "jpeg" if item[0].lower() == "jpg" else item[0].lower()
content_parts.append({
"type": "image",
"source": {
"type": "base64",
"media_type": f"image/{image_format}",
"data": item[1],
},
})
elif isinstance(item, str):
# 文本内容
content_parts.append({"type": "text", "text": item})
mcp_msg["content"] = content_parts
# 添加工具调用ID如果是工具消息
if message.role == RoleType.Tool and message.tool_call_id:
mcp_msg["tool_call_id"] = message.tool_call_id
mcp_messages.append(mcp_msg)
return mcp_messages
def _convert_tools_to_mcp(tool_options: list[ToolOption]) -> list[dict[str, Any]]:
"""
将工具选项转换为MCP协议格式
:param tool_options: 工具选项列表
:return: MCP格式的工具列表
"""
mcp_tools = []
for tool in tool_options:
mcp_tool = {
"name": tool.name,
"description": tool.description,
}
if tool.params:
properties = {}
required = []
for param in tool.params:
properties[param.name] = {
"type": param.param_type.value,
"description": param.description,
}
if param.enum_values:
properties[param.name]["enum"] = param.enum_values
if param.required:
required.append(param.name)
mcp_tool["input_schema"] = {
"type": "object",
"properties": properties,
"required": required,
}
mcp_tools.append(mcp_tool)
return mcp_tools
async def _parse_sse_stream(
session: aiohttp.ClientSession,
url: str,
payload: dict[str, Any],
headers: dict[str, str],
interrupt_flag: asyncio.Event | None = None,
) -> tuple[APIResponse, tuple[int, int, int] | None]:
"""
解析SSE流式响应
:param session: aiohttp会话
:param url: 请求URL
:param payload: 请求负载
:param headers: 请求头
:param interrupt_flag: 中断标志
:return: API响应和使用记录
"""
content_buffer = io.StringIO()
reasoning_buffer = io.StringIO()
tool_calls_buffer: list[tuple[str, str, dict[str, Any]]] = []
usage_record = None
try:
async with session.post(url, json=payload, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise RespNotOkException(
response.status, f"MCP SSE请求失败: {error_text}"
)
# 解析SSE流
async for line in response.content:
if interrupt_flag and interrupt_flag.is_set():
raise ReqAbortException("请求被外部信号中断")
decoded_line = line.decode("utf-8").strip()
# 跳过空行和注释
if not decoded_line or decoded_line.startswith(":"):
continue
# 解析SSE事件
if decoded_line.startswith("data: "):
data_str = decoded_line[6:] # 移除"data: "前缀
# 跳过[DONE]标记
if data_str == "[DONE]":
break
try:
event_data = orjson.loads(data_str)
except orjson.JSONDecodeError:
logger.warning(f"无法解析SSE数据: {data_str}")
continue
# 处理不同类型的事件
event_type = event_data.get("type")
if event_type == "content_block_start":
# 内容块开始
block = event_data.get("content_block", {})
if block.get("type") == "text":
pass # 准备接收文本内容
elif block.get("type") == "tool_use":
# 工具调用开始
tool_calls_buffer.append(
(
block.get("id", ""),
block.get("name", ""),
{},
)
)
elif event_type == "content_block_delta":
# 内容块增量
delta = event_data.get("delta", {})
delta_type = delta.get("type")
if delta_type == "text_delta":
# 文本增量
text = delta.get("text", "")
content_buffer.write(text)
elif delta_type == "input_json_delta":
# 工具调用参数增量
if tool_calls_buffer:
partial_json = delta.get("partial_json", "")
# 累积JSON片段
current_args = tool_calls_buffer[-1][2]
if "_json_buffer" not in current_args:
current_args["_json_buffer"] = ""
current_args["_json_buffer"] += partial_json
elif event_type == "content_block_stop":
# 内容块结束
if tool_calls_buffer:
# 解析完整的工具调用参数
last_call = tool_calls_buffer[-1]
if "_json_buffer" in last_call[2]:
json_str = last_call[2].pop("_json_buffer")
try:
parsed_args = orjson.loads(repair_json(json_str))
tool_calls_buffer[-1] = (
last_call[0],
last_call[1],
parsed_args if isinstance(parsed_args, dict) else {},
)
except orjson.JSONDecodeError as e:
logger.error(f"解析工具调用参数失败: {e}")
elif event_type == "message_delta":
# 消息元数据更新
delta = event_data.get("delta", {})
stop_reason = delta.get("stop_reason")
if stop_reason:
logger.debug(f"消息结束原因: {stop_reason}")
# 提取使用统计
usage = event_data.get("usage", {})
if usage:
usage_record = (
usage.get("input_tokens", 0),
usage.get("output_tokens", 0),
usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
)
elif event_type == "message_stop":
# 消息结束
break
except aiohttp.ClientError as e:
raise NetworkConnectionError() from e
except Exception as e:
logger.error(f"解析SSE流时发生错误: {e}")
raise
# 构建响应
response = APIResponse()
if content_buffer.tell() > 0:
response.content = content_buffer.getvalue()
if reasoning_buffer.tell() > 0:
response.reasoning_content = reasoning_buffer.getvalue()
if tool_calls_buffer:
response.tool_calls = [
ToolCall(call_id, func_name, args)
for call_id, func_name, args in tool_calls_buffer
]
# 关闭缓冲区
content_buffer.close()
reasoning_buffer.close()
return response, usage_record
@client_registry.register_client_class("mcp_sse")
class MCPSSEClient(BaseClient):
"""
MCP SSE客户端实现
支持通过Server-Sent Events协议与MCP服务器通信
"""
def __init__(self, api_provider: APIProvider):
super().__init__(api_provider)
self._session: aiohttp.ClientSession | None = None
async def _get_session(self) -> aiohttp.ClientSession:
"""获取或创建aiohttp会话"""
if self._session is None or self._session.closed:
timeout = aiohttp.ClientTimeout(total=self.api_provider.timeout)
self._session = aiohttp.ClientSession(timeout=timeout)
return self._session
async def close(self):
"""关闭客户端会话"""
if self._session and not self._session.closed:
await self._session.close()
async def get_response(
self,
model_info: ModelInfo,
message_list: list[Message],
tool_options: list[ToolOption] | None = None,
max_tokens: int = 1024,
temperature: float = 0.7,
response_format: RespFormat | None = None,
stream_response_handler: Callable[[Any, asyncio.Event | None], tuple[APIResponse, tuple[int, int, int]]]
| None = None,
async_response_parser: Callable[[Any], tuple[APIResponse, tuple[int, int, int]]] | None = None,
interrupt_flag: asyncio.Event | None = None,
extra_params: dict[str, Any] | None = None,
) -> APIResponse:
"""
获取对话响应
:param model_info: 模型信息
:param message_list: 对话消息列表
:param tool_options: 工具选项
:param max_tokens: 最大token数
:param temperature: 温度参数
:param response_format: 响应格式
:param stream_response_handler: 流式响应处理器
:param async_response_parser: 异步响应解析器
:param interrupt_flag: 中断标志
:param extra_params: 额外参数
:return: API响应
"""
session = await self._get_session()
# 构建请求负载
payload: dict[str, Any] = {
"model": model_info.model_identifier,
"messages": _convert_messages_to_mcp(message_list),
"max_tokens": max_tokens,
"temperature": temperature,
"stream": True, # MCP SSE始终使用流式
}
# 添加工具
if tool_options:
payload["tools"] = _convert_tools_to_mcp(tool_options)
# 添加额外参数
if extra_params:
payload.update(extra_params)
# 构建请求头
headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream",
"Authorization": f"Bearer {self.api_provider.get_api_key()}",
}
# 发送请求并解析响应
url = f"{self.api_provider.base_url}/v1/messages"
try:
response, usage_record = await _parse_sse_stream(
session, url, payload, headers, interrupt_flag
)
except Exception as e:
logger.error(f"MCP SSE请求失败: {e}")
raise
# 添加使用记录
if usage_record:
response.usage = UsageRecord(
model_name=model_info.name,
provider_name=model_info.api_provider,
prompt_tokens=usage_record[0],
completion_tokens=usage_record[1],
total_tokens=usage_record[2],
)
return response
async def get_embedding(
self,
model_info: ModelInfo,
embedding_input: str,
extra_params: dict[str, Any] | None = None,
) -> APIResponse:
"""
获取文本嵌入
MCP协议暂不支持嵌入功能
:param model_info: 模型信息
:param embedding_input: 嵌入输入文本
:return: 嵌入响应
"""
raise NotImplementedError("MCP SSE客户端暂不支持嵌入功能")
async def get_audio_transcriptions(
self,
model_info: ModelInfo,
audio_base64: str,
extra_params: dict[str, Any] | None = None,
) -> APIResponse:
"""
获取音频转录
MCP协议暂不支持音频转录功能
:param model_info: 模型信息
:param audio_base64: base64编码的音频数据
:return: 音频转录响应
"""
raise NotImplementedError("MCP SSE客户端暂不支持音频转录功能")
def get_support_image_formats(self) -> list[str]:
"""
获取支持的图片格式
:return: 支持的图片格式列表
"""
return ["jpg", "jpeg", "png", "webp", "gif"]
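上面 `_parse_sse_stream` 的核心是逐行解析 SSE 数据帧:跳过空行与 `:` 开头的注释、剥离 `data: ` 前缀、遇到 `[DONE]` 终止。这部分行级规则可以抽成一个独立小函数示意(仅为示意,省略了事件类型分发与工具调用参数累积):

```python
import json

def parse_sse_lines(lines: list[str]) -> list[dict]:
    """按与上文相同的规则解析 SSE 行,返回事件字典列表。"""
    events = []
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith(":"):  # 跳过空行和注释
            continue
        if line.startswith("data: "):
            data = line[6:]  # 移除 "data: " 前缀
            if data == "[DONE]":  # 流结束标记,停止解析
                break
            events.append(json.loads(data))
    return events

sample = [
    ": keep-alive",
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "你好"}}',
    "data: [DONE]",
    'data: {"type": "ignored"}',
]
print(parse_sse_lines(sample))
```

注意 `[DONE]` 之后的行不会被解析,这与上文流式循环中的 `break` 行为一致。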

View File

@@ -32,7 +32,7 @@ class InterestCalculationResult:
):
self.success = success
self.message_id = message_id
self.interest_value = max(0.0, min(1.0, interest_value)) # 确保在0-1范围内
self.interest_value = interest_value
self.should_take_action = should_take_action
self.should_reply = should_reply
self.should_act = should_act

View File

@@ -4,6 +4,7 @@
"""
import time
import orjson
from typing import TYPE_CHECKING
from src.chat.interest_system import bot_interest_manager
@@ -42,6 +43,9 @@ class AffinityInterestCalculator(BaseInterestCalculator):
self.reply_threshold = affinity_config.reply_action_interest_threshold # 回复动作兴趣阈值
self.mention_threshold = affinity_config.mention_bot_adjustment_threshold # 提及bot后的调整阈值
# 兴趣匹配系统配置
self.use_smart_matching = True
# 连续不回复概率提升
self.no_reply_count = 0
self.max_no_reply_count = affinity_config.max_no_reply_count
@@ -71,7 +75,11 @@ class AffinityInterestCalculator(BaseInterestCalculator):
start_time = time.time()
message_id = getattr(message, "message_id", "")
content = getattr(message, "processed_plain_text", "")
user_id = getattr(message, "user_info", {}).user_id if hasattr(message, "user_info") and hasattr(message.user_info, "user_id") else ""
user_info = getattr(message, "user_info", None)
if user_info and hasattr(user_info, "user_id"):
user_id = user_info.user_id
else:
user_id = ""
logger.debug(f"[Affinity兴趣计算] 开始处理消息 {message_id}")
logger.debug(f"[Affinity兴趣计算] 消息内容: {content[:50]}...")
@@ -111,10 +119,18 @@ class AffinityInterestCalculator(BaseInterestCalculator):
logger.debug(f"[Affinity兴趣计算] 应用不回复提升后: {total_score:.3f}{adjusted_score:.3f}")
# 6. 决定是否回复和执行动作
should_reply = adjusted_score > self.reply_threshold
should_take_action = adjusted_score > (self.reply_threshold + 0.1)
logger.debug(f"[Affinity兴趣计算] 阈值判断: {adjusted_score:.3f} > 回复阈值:{self.reply_threshold:.3f}? = {should_reply}")
logger.debug(f"[Affinity兴趣计算] 阈值判断: {adjusted_score:.3f} > 动作阈值:{self.reply_threshold + 0.1:.3f}? = {should_take_action}")
reply_threshold = self.reply_threshold
action_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
should_reply = adjusted_score >= reply_threshold
should_take_action = adjusted_score >= action_threshold
logger.debug(
f"[Affinity兴趣计算] 阈值判断: {adjusted_score:.3f} >= 回复阈值:{reply_threshold:.3f}? = {should_reply}"
)
logger.debug(
f"[Affinity兴趣计算] 阈值判断: {adjusted_score:.3f} >= 动作阈值:{action_threshold:.3f}? = {should_take_action}"
)
calculation_time = time.time() - start_time
@@ -140,7 +156,7 @@ class AffinityInterestCalculator(BaseInterestCalculator):
error_message=str(e)
)
async def _calculate_interest_match_score(self, content: str, keywords: list[str] = None) -> float:
async def _calculate_interest_match_score(self, content: str, keywords: list[str] | None = None) -> float:
"""计算兴趣匹配度(使用智能兴趣匹配系统)"""
# 调试日志:检查各个条件
@@ -158,7 +174,7 @@ class AffinityInterestCalculator(BaseInterestCalculator):
try:
# 使用机器人的兴趣标签系统进行智能匹配
match_result = await bot_interest_manager.calculate_interest_match(content, keywords)
match_result = await bot_interest_manager.calculate_interest_match(content, keywords or [])
logger.debug(f"兴趣匹配结果: {match_result}")
if match_result:
@@ -241,7 +257,6 @@ class AffinityInterestCalculator(BaseInterestCalculator):
key_words = getattr(message, "key_words", "")
if key_words:
try:
import orjson
extracted = orjson.loads(key_words)
if isinstance(extracted, list):
keywords = extracted
@@ -253,7 +268,6 @@ class AffinityInterestCalculator(BaseInterestCalculator):
key_words_lite = getattr(message, "key_words_lite", "")
if key_words_lite:
try:
import orjson
extracted = orjson.loads(key_words_lite)
if isinstance(extracted, list):
keywords = extracted
@@ -296,6 +310,3 @@ class AffinityInterestCalculator(BaseInterestCalculator):
self.no_reply_count = 0
else:
self.no_reply_count = min(self.no_reply_count + 1, self.max_no_reply_count)
# 是否使用智能兴趣匹配(作为类属性)
use_smart_matching = True
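上文改动后的阈值判断是两条独立的比较:达到 `reply_action_interest_threshold` 才回复,达到 `non_reply_action_interest_threshold` 才执行动作。这一决策可以抽成纯函数示意(阈值数值仅为演示):

```python
def decide_actions(adjusted_score: float, reply_threshold: float, action_threshold: float) -> tuple[bool, bool]:
    """与上文一致:返回 (should_reply, should_take_action)。"""
    should_reply = adjusted_score >= reply_threshold
    should_take_action = adjusted_score >= action_threshold
    return should_reply, should_take_action

print(decide_actions(0.6, 0.7, 0.4))  # → (False, True):不回复,但可执行非回复动作
```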

View File

@@ -69,6 +69,13 @@ class ChatterPlanExecutor:
action_types = [action.action_type for action in plan.decided_actions]
logger.info(f"选择动作: {', '.join(action_types) if action_types else ''}")
# 根据配置决定是否启用批量存储模式
if global_config.database.batch_action_storage_enabled:
self.action_manager.enable_batch_storage(plan.chat_id)
logger.debug("已启用批量存储模式")
else:
logger.debug("批量存储功能已禁用,使用立即存储模式")
execution_results = []
reply_actions = []
other_actions = []
@@ -102,6 +109,9 @@ class ChatterPlanExecutor:
f"规划执行完成: 总数={len(plan.decided_actions)}, 成功={successful_count}, 失败={len(execution_results) - successful_count}"
)
# 批量存储所有待处理的动作
await self._flush_action_manager_batch_storage(plan)
return {
"executed_count": len(plan.decided_actions),
"successful_count": successful_count,
@@ -220,11 +230,11 @@ class ChatterPlanExecutor:
except Exception as e:
error_message = str(e)
logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}")
'''
# 记录用户关系追踪
if success and action_info.action_message:
await self._track_user_interaction(action_info, plan, reply_content)
'''
execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time)
@@ -395,6 +405,7 @@ class ChatterPlanExecutor:
# 移除执行时间列表以避免返回过大数据
stats.pop("execution_times", None)
return stats
def reset_stats(self):
@@ -422,3 +433,26 @@ class ChatterPlanExecutor:
}
for i, time_val in enumerate(recent_times)
]
async def _flush_action_manager_batch_storage(self, plan: Plan):
"""使用 action_manager 的批量存储功能存储所有待处理的动作"""
try:
# 通过 chat_id 获取真实的 chat_stream 对象
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(plan.chat_id)
if chat_stream:
# 调用 action_manager 的批量存储
await self.action_manager.flush_batch_storage(chat_stream)
logger.info("批量存储完成:通过 action_manager 存储所有动作记录")
# 禁用批量存储模式
self.action_manager.disable_batch_storage()
except Exception as e:
logger.error(f"批量存储动作记录时发生错误: {e}")
# 确保在出错时也禁用批量存储模式
self.action_manager.disable_batch_storage()
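上面批量存储的生命周期是:启用 → 执行期间累积 → 刷写入库 → 无论成败都退出批量模式。这一"出错也要关闭"的要点可以用 try/finally 模式示意(类与方法名为演示用的假设,非项目实际实现):

```python
class FakeActionManager:
    """演示用的最小动作管理器:只记录批量模式开关与刷写状态。"""
    def __init__(self):
        self.batch_enabled = False
        self.flushed = False

    def enable_batch_storage(self, chat_id: str):
        self.batch_enabled = True

    def flush_batch_storage(self):
        self.flushed = True

    def disable_batch_storage(self):
        self.batch_enabled = False

def run_plan(manager: FakeActionManager, chat_id: str):
    manager.enable_batch_storage(chat_id)
    try:
        manager.flush_batch_storage()
    finally:
        # 与上文相同的要点:即使刷写抛异常也要退出批量模式
        manager.disable_batch_storage()

m = FakeActionManager()
run_plan(m, "chat-1")
print(m.flushed, m.batch_enabled)  # True False
```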

View File

@@ -3,6 +3,7 @@
集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。
"""
import asyncio
from dataclasses import asdict
from typing import TYPE_CHECKING, Any
@@ -12,6 +13,7 @@ from src.mood.mood_manager import mood_manager
from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor
from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter
from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator
from src.plugin_system.base.component_types import ChatMode
if TYPE_CHECKING:
from src.chat.planner_actions.action_manager import ChatterActionManager
@@ -59,7 +61,7 @@ class ChatterActionPlanner:
"other_actions_executed": 0,
}
async def plan(self, context: "StreamContext" = None) -> tuple[list[dict], dict | None]:
async def plan(self, context: "StreamContext | None" = None) -> tuple[list[dict[str, Any]], Any | None]:
"""
执行完整的增强版规划流程。
@@ -81,7 +83,7 @@ class ChatterActionPlanner:
self.planner_stats["failed_plans"] += 1
return [], None
async def _enhanced_plan_flow(self, context: "StreamContext") -> tuple[list[dict], dict | None]:
async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]:
"""执行增强版规划流程"""
try:
# 在规划前,先进行动作修改
@@ -91,47 +93,48 @@ class ChatterActionPlanner:
await action_modifier.modify_actions()
# 1. 生成初始 Plan
initial_plan = await self.generator.generate(context.chat_mode)
chat_mode = context.chat_mode if context else ChatMode.NORMAL
initial_plan = await self.generator.generate(chat_mode)
# 确保Plan中包含所有当前可用的动作
initial_plan.available_actions = self.action_manager.get_using_actions()
unread_messages = context.get_unread_messages() if context else []
# 2. 使用新的兴趣度管理系统进行评分
score = 0.0
should_reply = False
reply_not_available = False
max_message_interest = 0.0
reply_not_available = True
interest_updates: list[dict[str, Any]] = []
aggregate_should_act = False
aggregate_should_reply = False
if unread_messages:
# 直接使用消息中已计算的标志,无需重复计算兴趣值
for message in unread_messages:
try:
message_interest = getattr(message, "interest_value", 0.3)
raw_interest = getattr(message, "interest_value", 0.3)
if raw_interest is None:
raw_interest = 0.0
message_interest = float(raw_interest)
max_message_interest = max(max_message_interest, message_interest)
message_should_reply = getattr(message, "should_reply", False)
message_should_act = getattr(message, "should_act", False)
# 确保interest_value不是None
if message_interest is None:
message_interest = 0.3
# 更新最高兴趣度消息
if message_interest > score:
score = message_interest
if message_should_reply:
should_reply = True
else:
reply_not_available = True
# 如果should_act为false强制设为no_action
if not message_should_act:
reply_not_available = True
logger.debug(
f"消息 {message.message_id} 预计算标志: interest={message_interest:.3f}, "
f"should_reply={message_should_reply}, should_act={message_should_act}"
)
if message_should_reply:
aggregate_should_reply = True
aggregate_should_act = True
reply_not_available = False
break
if message_should_act:
aggregate_should_act = True
except Exception as e:
logger.warning(f"处理消息 {message.message_id} 失败: {e}")
message.interest_value = 0.0
@@ -142,22 +145,27 @@ class ChatterActionPlanner:
"message_id": message.message_id,
"interest_value": 0.0,
"should_reply": False,
"should_act": False,
}
)
if interest_updates:
await self._commit_interest_updates(interest_updates)
asyncio.create_task(self._commit_interest_updates(interest_updates))
# 检查兴趣度是否达到非回复动作阈值
non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold
if score < non_reply_action_interest_threshold:
logger.info(f"兴趣度 {score:.3f} 低于阈值 {non_reply_action_interest_threshold:.3f},不执行动作")
if not aggregate_should_act:
logger.info("所有未读消息低于兴趣度阈值,不执行动作")
# 直接返回 no_action
from src.common.data_models.info_data_model import ActionPlannerInfo
no_action = ActionPlannerInfo(
action_type="no_action",
reasoning=f"兴趣度评分 {score:.3f} 未达阈值 {non_reply_action_interest_threshold:.3f}",
reasoning=(
"所有未读消息兴趣度未达阈值 "
f"{non_reply_action_interest_threshold:.3f}"
f"(最高兴趣度 {max_message_interest:.3f})"
),
action_data={},
action_message=None,
)
@@ -169,9 +177,6 @@ class ChatterActionPlanner:
plan_filter = ChatterPlanFilter(self.chat_id, available_actions)
filtered_plan = await plan_filter.filter(reply_not_available, initial_plan)
# 检查filtered_plan是否有reply动作用于统计
has_reply_action = any(decision.action_type == "reply" for decision in filtered_plan.decided_actions)
# 5. 使用 PlanExecutor 执行 Plan
execution_result = await self.executor.execute(filtered_plan)
@@ -213,7 +218,7 @@ class ChatterActionPlanner:
except Exception as e:
logger.warning(f"批量更新数据库兴趣度失败: {e}")
def _update_stats_from_execution_result(self, execution_result: dict[str, any]):
def _update_stats_from_execution_result(self, execution_result: dict[str, Any]):
"""根据执行结果更新规划器统计"""
if not execution_result:
return
@@ -237,7 +242,7 @@ class ChatterActionPlanner:
self.planner_stats["replies_generated"] += reply_count
self.planner_stats["other_actions_executed"] += other_count
def _build_return_result(self, plan: "Plan") -> tuple[list[dict], dict | None]:
def _build_return_result(self, plan: "Plan") -> tuple[list[dict[str, Any]], Any | None]:
"""构建返回结果"""
final_actions = plan.decided_actions or []
final_target_message = next((act.action_message for act in final_actions if act.action_message), None)
@@ -254,7 +259,7 @@ class ChatterActionPlanner:
return final_actions_dict, final_target_message_dict
def get_planner_stats(self) -> dict[str, any]:
def get_planner_stats(self) -> dict[str, Any]:
"""获取规划器统计"""
return self.planner_stats.copy()
@@ -263,7 +268,7 @@ class ChatterActionPlanner:
chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id)
return chat_mood.mood_state
def get_mood_stats(self) -> dict[str, any]:
def get_mood_stats(self) -> dict[str, Any]:
"""获取情绪状态统计"""
chat_mood = mood_manager.get_mood_by_chat_id(self.chat_id)
return {

View File

@@ -19,191 +19,138 @@ def init_prompts():
# 并要求模型以 JSON 格式输出一个或多个动作组合。
Prompt(
"""
{mood_block}
{time_block}
{mood_block}
{identity_block}
{schedule_block}
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
{chat_context_description}
## 📜 已读历史消息(仅供参考)
{actions_before_now_block}
## 📜 已读历史(仅供理解,不可作为动作对象)
{read_history_block}
## 📬 未读历史消息(动作执行对象)
## 📬 未读历史(只能对这里的消息执行动作)
{unread_history_block}
{moderation_prompt}
**任务: 构建一个完整的响应**
你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成:
1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。
2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。
# 目标
你的任务是根据当前对话,给出一个或多个动作,构成一次完整的响应组合。
- 主要动作:通常是 reply(如需回复)。
- 辅助动作(可选):如 emoji、poke_user 等,用于增强表达。
**决策流程:**
1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。**
2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。**
3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。
4. **核心:如果有多条未读消息都需要回应(例如多人@你),你应该并行处理,在`actions`列表中生成多个`reply`动作。**
5. 首先,决定是否要对未读消息进行 `reply`(如果有)。
6. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
7. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。
8. 如果用户明确要求了某个动作,请务必优先满足。
# 决策流程
1. 已读仅供参考,不能对已读执行任何动作。
2. 目标消息必须来自未读历史,并使用其前缀 <m...> 作为 target_message_id。
3. 优先级:
- 直接针对你:@你、回复你、点名提问、引用你的消息。
- 与你强相关的话题或你熟悉的问题。
- 其他与上下文弱相关的内容,最后考虑。
{mentioned_bonus}
4. 多目标:若多人同时需要回应,请在 actions 中并行生成多个 reply每个都指向各自的 target_message_id。
5. 避免:表情包/纯表情/无信息的消息;对这类消息通常不回复或选择 no_action/no_reply。
6. 风格:保持人设一致;避免重复你说过的话;避免冗余和口头禅。
**重要提醒:**
- **回复消息时必须遵循对话的流程,不要重复已经说过的话。**
- **确保回复与上下文紧密相关,回应要针对用户的消息内容。**
- **保持角色设定的一致性,使用符合你性格的语言风格。**
- **不要对表情包消息做出回应!**
**输出格式:**
请严格按照以下 JSON 格式输出,包含 `thinking` 和 `actions` 字段:
**重要概念:将“内心思考”作为思绪流的体现**
`thinking` 字段是本次决策的核心。它并非一个简单的“理由”,而是 **一个模拟人类在回应前,头脑中自然浮现的、未经修饰的思绪流**。你需要完全代入 {identity_block} 的角色,将那一刻的想法自然地记录下来。
**内心思考的要点:**
* **自然流露**: 不要使用“决定”、“所以”、“因此”等结论性或汇报式的词语。你的思考应该像日记一样,是给自己看的,充满了不确定性和情绪的自然流动。
* **展现过程**: 重点在于展现 **思考的过程**,而不是 **决策的结果**。描述你看到了什么,想到了什么,感受到了什么。
* **使用昵称**: 在你的思绪流中,请直接使用用户的昵称来指代他们,而不是`<m1>`, `<m2>`这样的消息ID。
* **严禁技术术语**: 严禁在思考中提及任何数字化的度量(如兴趣度、分数)或内部技术术语。请完全使用角色自身的感受和语言来描述思考过程。
# 思绪流规范thinking
- 真实、自然、非结论化,像给自己看的随笔。
- 描述你看到/想到/感觉到的过程,不要出现“因此/我决定”等总结词。
- 直接使用对方昵称,而不是 <m1>/<m2> 这样的标签。
- 禁止出现“兴趣度、分数”等技术术语或内部实现细节。
## 可用动作列表
{action_options_text}
### 单动作示例:
## 输出格式(只输出 JSON不要多余文本或代码块
示例(单动作):
```json
{{
"thinking": "在这里写下你的思绪流...",
"actions": [
{{
"action_type": "动作类型reply, emoji等",
"action_type": "reply",
"reasoning": "选择该动作的理由",
"action_data": {{
"target_message_id": "目标消息ID",
"content": "回复内容或其他动作所需数据"
"target_message_id": "m123",
"content": "你的回复内容"
}}
}}
]
}}
```
### **多重回复示例 (核心功能)**
当有多人与你互动时,你需要同时回应他们,甚至可以同时处理三个!
示例(多重回复,并行):
```json
{{
"thinking": "哇,群里好热闹呀!张三、李四、王五都在@我!让我看看...张三在问我昨天推荐的电影好不好看,这个得好好分享一下观后感。李四在说他家的猫咪学会了新技能,好可爱,得夸夸他!王五好像遇到点麻烦,在问我一个技术问题,这个得优先、详细地解答一下!得一个个来!",
"thinking": "在这里写下你的思绪流...",
"actions": [
{{
"action_type": "reply",
"reasoning": "回应张三关于电影的提问,并分享我的看法。",
"reasoning": "理由A",
"action_data": {{
"target_message_id": "m124",
"content": "张三!你问的那部电影我昨天也看啦,真的超赞!特别是最后那个反转,简直让人意想不到!"
"content": "对A的回复"
}}
}},
{{
"action_type": "reply",
"reasoning": "回应李四分享的趣事,表达赞美和羡慕。",
"reasoning": "理由B",
"action_data": {{
"target_message_id": "m125",
"content": "哇,李四你家猫咪也太聪明了吧!居然会握手了!好羡慕呀!"
}}
}},
{{
"action_type": "reply",
"reasoning": "优先回应王五的技术求助,并提供详细的解答。",
"action_data": {{
"target_message_id": "m126",
"content": "王五别急,你说的那个问题我之前也遇到过。你试试看是不是配置文件里的`enable_magic`选项没有设置成`true`?如果还不行你再把错误截图发我看看。"
"content": "对B的回复"
}}
}}
]
}}
```
**强制规则**:
- 对于每一个需要目标消息的动作(如`reply`, `poke_user`, `set_emoji_like`),你 **必须** 在`action_data`中提供准确的`target_message_id`这个ID来源于`## 未读历史消息`中消息前的`<m...>`标签。
- 当你选择的动作需要参数时(例如 `set_emoji_like` 需要 `emoji` 参数),你 **必须** 在`action_data`中提供所有必需的参数及其对应的值。
- 如果没有合适的回复对象或不需要回复,输出空的 actions 数组。
# 强制规则
- 需要目标消息的动作reply/poke_user/set_emoji_like 等),必须提供准确的 target_message_id(来自未读历史里的 <m...> 标签)。
- 当动作需要额外参数时,必须在 action_data 中补全。
- 私聊场景只允许使用 reply群聊可选用辅助动作。
- 如果没有合适的目标或无需动作,请输出:
```json
{{
"thinking": "说明为什么不需要回复",
"thinking": "说明为什么不需要动作/不需要回复",
"actions": []
}}
```
{no_action_block}
""",
"planner_prompt",
)
# 主动规划器提示词,用于主动场景和前瞻性规划
# 主动规划器提示词,用于主动场景和前瞻性规划(与 plan_filter 的传参严格对齐)
Prompt(
"""
{mood_block}
{time_block}
{mood_block}
{identity_block}
{schedule_block}
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
## 🧠 近期记忆与状态
{long_term_memory_block}
## 📜 已读历史消息(仅供参考)
{read_history_block}
## 🗣️ 最近聊天概览
{chat_content_block}
## 📬 未读历史消息(动作执行对象)
{unread_history_block}
## ⏱️ 你刚刚的动作
{actions_before_now_block}
{moderation_prompt}
# 任务
基于当前语境,主动构建一次响应动作组合:
- 主要动作通常是 reply如果需要回复)。
- 如在群聊且气氛合适,可选择一个辅助动作(如 emoji、poke_user增强表达。
- 如果刚刚已经连续发言且无人回应,可考虑 no_reply什么都不做。
**任务: 构建一个完整的响应**
你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成:
1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。
2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。
**决策流程:**
1. **重要:已读历史消息仅作为当前聊天情景的参考,帮助你理解对话上下文。**
2. **重要:所有动作的执行对象只能是未读历史消息中的消息,不能对已读消息执行动作。**
3. 在未读历史消息中,优先对兴趣值高的消息做出动作(兴趣值标注在消息末尾)。
4. 首先,决定是否要对未读消息进行 `reply`(如果有)。
5. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。
6. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。
7. 如果用户明确要求了某个动作,请务必优先满足。
**动作限制:**
- 在私聊中,你只能使用 `reply` 动作。私聊中不允许使用任何其他动作。
- 在群聊中,你可以自由选择是否使用辅助动作。
**重要提醒:**
- **回复消息时必须遵循对话的流程,不要重复已经说过的话。**
- **确保回复与上下文紧密相关,回应要针对用户的消息内容。**
- **保持角色设定的一致性,使用符合你性格的语言风格。**
**输出格式:**
请严格按照以下 JSON 格式输出,包含 `thinking` 和 `actions` 字段:
```json
{{
"thinking": "你的思考过程,分析当前情况并说明为什么选择这些动作",
"actions": [
{{
"action_type": "动作类型reply, emoji等",
"reasoning": "选择该动作的理由",
"action_data": {{
"target_message_id": "目标消息ID",
"content": "回复内容或其他动作所需数据"
}}
}}
]
}}
```
如果没有合适的回复对象或不需要回复,输出空的 actions 数组:
```json
{{
"thinking": "说明为什么不需要回复",
"actions": []
}}
```
# 输出要求
- thinking 为思绪流(自然、非结论化,不含技术术语或“兴趣度”等字眼)。
- 严格只输出 JSON结构与普通规划器一致包含 "thinking" 与 "actions"(数组)。
- 对需要目标消息的动作,提供准确的 target_message_id(若无可用目标,可返回空 actions)。
""",
"proactive_planner_prompt",
)
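两个规划器提示词都要求模型严格输出包含 `thinking` 与 `actions` 的 JSON并且需要目标消息的动作必须携带 `target_message_id`。解析侧对这一约定的最小校验可以这样示意(函数为演示用,非项目实际的解析实现):

```python
import json

def parse_plan(raw: str) -> dict:
    """校验规划器输出:必须含 thinking 与 actions 列表,目标类动作必须带 target_message_id。"""
    plan = json.loads(raw)
    if "thinking" not in plan or not isinstance(plan.get("actions"), list):
        raise ValueError("规划输出缺少 thinking 或 actions 字段")
    for action in plan["actions"]:
        if action.get("action_type") in ("reply", "poke_user", "set_emoji_like"):
            if not action.get("action_data", {}).get("target_message_id"):
                raise ValueError(f"动作 {action['action_type']} 缺少 target_message_id")
    return plan

raw = '{"thinking": "没什么需要回应的", "actions": []}'
print(parse_plan(raw))
```

空 `actions` 数组按提示词约定是合法输出,对应"无需动作"的情况。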

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.1.8"
version = "7.2.1"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -40,6 +40,9 @@ mysql_sql_mode = "TRADITIONAL" # SQL模式
connection_pool_size = 10 # 连接池大小仅MySQL有效
connection_timeout = 10 # 连接超时时间(秒)
# 批量动作记录存储配置
batch_action_storage_enabled = true # 是否启用批量保存动作记录(开启后将多个动作一次性写入数据库,提升性能)
[permission] # 权限系统配置
# Master用户配置拥有最高权限无视所有权限节点
# 格式:[[platform, user_id], ...]