This commit is contained in:
tt-P607
2025-10-20 10:05:45 +08:00
18 changed files with 1149 additions and 16 deletions

View File

@@ -1,3 +1,50 @@
# Notice 系统重构 (2025-10-19)
## 🔧 核心改进
1. **移除硬编码的 Notice 类型判定**
- 之前某些 `notice_type` 会被硬编码为公共 notice`group_whole_ban``system_announcement` 等)
- 现在 notice 的作用域完全由 `additional_config` 中的 `is_public_notice` 字段决定
- 提供了更灵活和可控的 notice 管理方式
2. **修改的文件**
- `src/chat/message_manager/message_manager.py`: 移除 `_determine_notice_scope` 方法中的硬编码逻辑
- `src/chat/utils/prompt_params.py`: 添加缺失的 `notice_block` 字段
3. **新增文档**
- `docs/guides/notice_system_guide.md`: 完整的 Notice 系统使用指南
## 💡 使用方式
### 流级 Notice默认
```python
additional_config = {
"is_notice": True,
"is_public_notice": False, # 或者不设置
"notice_type": "group_ban"
}
```
### 公共 Notice
```python
additional_config = {
"is_notice": True,
"is_public_notice": True, # 显式设置为公共
"notice_type": "system_announcement"
}
```
## ⚠️ 迁移注意
如果你的插件依赖以下 notice 类型自动成为公共 notice
- `group_whole_ban`
- `group_whole_lift_ban`
- `system_announcement`
- `platform_maintenance`
请在 `additional_config` 中显式添加 `"is_public_notice": True`
---
# 插件API与规范修改
1. 现在`plugin_system``__init__.py`文件中包含了所有插件API的导入用户可以直接使用`from src.plugin_system import *`来导入所有API。

View File

@@ -0,0 +1,248 @@
# Notice 系统使用指南
## 概述
Notice 系统用于管理和展示系统通知消息,支持两种作用域:
- **公共 NoticePublic**: 对所有聊天流可见
- **流级 NoticeStream**: 仅对特定聊天流可见
## Notice 配置
### 1. 消息标记为 Notice
在消息的 `additional_config` 中设置以下字段:
```python
additional_config = {
"is_notice": True, # 标记为notice消息
"notice_type": "group_ban", # notice类型可选
"is_public_notice": False, # 是否为公共notice
}
```
### 2. Notice 作用域
Notice 的作用域完全由 `is_public_notice` 字段决定:
#### 流级 Notice默认
```python
additional_config = {
"is_notice": True,
"is_public_notice": False, # 或者不设置该字段
}
```
- 仅在消息所属的聊天流中可见
- 适用于:群禁言、群解禁、戳一戳等群内事件
#### 公共 Notice
```python
additional_config = {
"is_notice": True,
"is_public_notice": True, # 明确设置为公共
}
```
- 在所有聊天流中可见
- 适用于:系统公告、平台维护通知等全局事件
### 3. Notice 类型
通过 `notice_type` 字段可以对 notice 进行分类:
```python
# 常见的 notice 类型
notice_types = {
"group_ban": "群禁言",
"group_lift_ban": "群解禁",
"group_whole_ban": "全员禁言",
"group_whole_lift_ban": "全员解禁",
"poke": "戳一戳",
"system_announcement": "系统公告",
"platform_maintenance": "平台维护",
}
```
### 4. Notice 生存时间TTL
Notice 消息会在一定时间后自动过期,默认为 1 小时3600 秒)。
不同类型的 notice 可以有不同的 TTL
- 临时事件(戳一戳): 5 分钟
- 群管理事件(禁言/解禁): 1 小时
- 重要公告: 24 小时
## 使用示例
### 示例 1: 群禁言通知(流级)
```python
from src.common.data_models.database_data_model import DatabaseMessages
message = DatabaseMessages(
chat_id="group_123456",
sender_id="10001",
raw_message="用户 张三 被管理员禁言 10 分钟",
additional_config={
"is_notice": True,
"is_public_notice": False, # 仅该群可见
"notice_type": "group_ban",
"target_id": "user_12345",
}
)
```
### 示例 2: 系统维护公告(公共)
```python
message = DatabaseMessages(
chat_id="system",
sender_id="system",
raw_message="系统将于今晚 23:00 进行维护,预计 1 小时",
additional_config={
"is_notice": True,
"is_public_notice": True, # 所有聊天流可见
"notice_type": "platform_maintenance",
}
)
```
### 示例 3: 在插件中发送 Notice
```python
from src.api import send_private_message, send_group_message
# 发送群内 notice
await send_group_message(
group_id=123456,
message="管理员已开启全员禁言",
additional_config={
"is_notice": True,
"is_public_notice": False,
"notice_type": "group_whole_ban",
}
)
# 发送公共 notice
await send_group_message(
group_id=123456, # 任意有效的群号
message="🔔 Bot 将在 5 分钟后重启进行更新",
additional_config={
"is_notice": True,
"is_public_notice": True,
"notice_type": "system_announcement",
}
)
```
## Notice 在 Prompt 中的展示
当启用 `notice_in_prompt` 配置时notice 消息会被自动添加到 AI 的提示词中:
```
## 📢 最近的系统通知
[群禁言] 用户 张三 被管理员禁言 10 分钟 (5分钟前)
[戳一戳] 李四 戳了戳 你 (刚刚)
[系统公告] Bot 将在 5 分钟后重启进行更新 (2分钟前)
```
## 配置选项
`bot_config.toml` 中配置 notice 系统:
```toml
[notice]
# 是否在 prompt 中显示 notice
notice_in_prompt = true
# prompt 中显示的 notice 数量限制
notice_prompt_limit = 5
```
## 注意事项
1. **作用域控制**:
- `is_public_notice` 字段是唯一决定 notice 作用域的因素
- 不要依赖 `notice_type` 来控制作用域
2. **性能考虑**:
- Notice 消息会自动过期清理
- 每种类型最多存储 100 条 notice
- 每 5 分钟自动清理过期消息
3. **兼容性**:
- 如果不设置 `is_public_notice`,默认为流级 notice
- 旧代码中基于 `notice_type` 的判断已被移除
## 迁移指南
如果你的代码中依赖了以下 notice 类型自动成为公共 notice 的行为:
- `group_whole_ban`
- `group_whole_lift_ban`
- `system_announcement`
- `platform_maintenance`
请在消息的 `additional_config` 中显式设置:
```python
# 修改前(依赖硬编码)
additional_config = {
"is_notice": True,
"notice_type": "system_announcement",
# 会自动成为公共 notice
}
# 修改后(显式指定)
additional_config = {
"is_notice": True,
"notice_type": "system_announcement",
"is_public_notice": True, # 显式设置
}
```
## API 参考
### GlobalNoticeManager
```python
from src.chat.message_manager.global_notice_manager import global_notice_manager
# 添加 notice
success = global_notice_manager.add_notice(
message=db_message,
scope=NoticeScope.PUBLIC, # 或 NoticeScope.STREAM
target_stream_id="group_123456", # STREAM 模式必需
ttl=3600 # 生存时间(秒)
)
# 获取可访问的 notice
notices = global_notice_manager.get_accessible_notices(
stream_id="group_123456",
limit=10
)
# 获取格式化的 notice 文本
text = global_notice_manager.get_notice_text(
stream_id="group_123456",
limit=5
)
```
## 常见问题
### Q: Notice 不显示在 prompt 中?
A: 检查配置:
1. `bot_config.toml``notice.notice_in_prompt = true`
2. 确认消息的 `is_notice = True`
3. 确认 notice 未过期
### Q: 如何让 notice 对所有群可见?
A: 在 `additional_config` 中设置 `is_public_notice = True`
### Q: 如何设置自定义的 notice 类型?
A: 在 `additional_config` 中设置任意字符串作为 `notice_type`
### Q: Notice 什么时候会被清理?
A:
1. 超过 TTL 时间后自动清理
2. 每种类型超过 100 条时,移除最旧的
3. 手动调用清理 API

View File

@@ -0,0 +1,400 @@
"""
全局Notice管理器
用于统一管理所有notice消息将notice与正常消息分离
"""
import time
import threading
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.logger import get_logger
logger = get_logger("global_notice_manager")
class NoticeScope(Enum):
"""Notice作用域"""
PUBLIC = "public" # 公共notice所有聊天流可见
STREAM = "stream" # 特定聊天流notice
@dataclass
class NoticeMessage:
"""Notice消息数据结构"""
message: DatabaseMessages
scope: NoticeScope
target_stream_id: Optional[str] = None # 如果是STREAM类型指定目标流ID
timestamp: float = field(default_factory=time.time)
ttl: int = 3600 # 默认1小时过期
def is_expired(self) -> bool:
"""检查是否过期"""
return time.time() - self.timestamp > self.ttl
def is_accessible_by_stream(self, stream_id: str) -> bool:
"""检查聊天流是否可以访问此notice"""
if self.scope == NoticeScope.PUBLIC:
return True
return self.target_stream_id == stream_id
class GlobalNoticeManager:
"""全局Notice管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self._initialized = True
self._notices: Dict[str, deque[NoticeMessage]] = defaultdict(deque)
self._max_notices_per_type = 100 # 每种类型最大存储数量
self._cleanup_interval = 300 # 5分钟清理一次过期消息
self._last_cleanup_time = time.time()
# 统计信息
self.stats = {
"total_notices": 0,
"public_notices": 0,
"stream_notices": 0,
"expired_notices": 0,
"last_cleanup_time": 0,
}
logger.info("全局Notice管理器初始化完成")
def add_notice(
self,
message: DatabaseMessages,
scope: NoticeScope = NoticeScope.STREAM,
target_stream_id: Optional[str] = None,
ttl: Optional[int] = None
) -> bool:
"""添加notice消息
Args:
message: 数据库消息对象
scope: notice作用域
target_stream_id: 目标聊天流ID仅在STREAM模式下有效
ttl: 生存时间默认为1小时
Returns:
bool: 是否添加成功
"""
try:
# 验证消息是否为notice类型
if not self._is_notice_message(message):
logger.warning(f"尝试添加非notice消息: {message.message_id}")
return False
# 验证参数
if scope == NoticeScope.STREAM and not target_stream_id:
logger.error("STREAM类型的notice必须指定target_stream_id")
return False
# 创建notice消息
notice = NoticeMessage(
message=message,
scope=scope,
target_stream_id=target_stream_id,
ttl=ttl or 3600 # 默认1小时
)
# 确定存储键
storage_key = self._get_storage_key(scope, target_stream_id, message)
# 添加到存储
self._notices[storage_key].append(notice)
# 限制数量
if len(self._notices[storage_key]) > self._max_notices_per_type:
# 移除最旧的消息
removed = self._notices[storage_key].popleft()
logger.debug(f"移除过期notice: {removed.message.message_id}")
# 更新统计
self.stats["total_notices"] += 1
if scope == NoticeScope.PUBLIC:
self.stats["public_notices"] += 1
else:
self.stats["stream_notices"] += 1
# 定期清理过期消息
self._cleanup_expired_notices()
logger.info(f"✅ Notice已添加: id={message.message_id}, type={self._get_notice_type(message)}, scope={scope.value}, target={target_stream_id}, storage_key={storage_key}, ttl={ttl}s")
return True
except Exception as e:
logger.error(f"添加notice消息失败: {e}")
return False
def get_accessible_notices(self, stream_id: str, limit: int = 20) -> List[NoticeMessage]:
"""获取指定聊天流可访问的notice消息
Args:
stream_id: 聊天流ID
limit: 最大返回数量
Returns:
List[NoticeMessage]: 可访问的notice消息列表按时间倒序排列
"""
try:
accessible_notices = []
current_time = time.time()
# 清理过期消息
if current_time - self._last_cleanup_time > self._cleanup_interval:
self._cleanup_expired_notices()
# 收集可访问的notice
for storage_key, notices in self._notices.items():
for notice in notices:
if notice.is_expired():
continue
if notice.is_accessible_by_stream(stream_id):
accessible_notices.append(notice)
# 按时间倒序排列
accessible_notices.sort(key=lambda x: x.timestamp, reverse=True)
# 限制数量
return accessible_notices[:limit]
except Exception as e:
logger.error(f"获取可访问notice失败: {e}")
return []
def get_notice_text(self, stream_id: str, limit: int = 10) -> str:
"""获取格式化的notice文本用于构建提示词
Args:
stream_id: 聊天流ID
limit: 最大notice数量
Returns:
str: 格式化的notice文本块不包含标题由调用方添加
"""
try:
notices = self.get_accessible_notices(stream_id, limit)
if not notices:
logger.debug(f"没有可访问的notice消息: stream_id={stream_id}")
return ""
# 构建notice文本块不包含标题和结束线
notice_lines = []
for notice in notices:
message = notice.message
notice_type = self._get_notice_type(message)
# 格式化notice消息
if notice_type:
notice_line = f"[{notice_type}] {message.processed_plain_text or message.raw_message}"
else:
notice_line = f"[通知] {message.processed_plain_text or message.raw_message}"
# 添加时间信息(相对时间)
time_diff = int(time.time() - notice.timestamp)
if time_diff < 60:
time_str = "刚刚"
elif time_diff < 3600:
time_str = f"{time_diff // 60}分钟前"
elif time_diff < 86400:
time_str = f"{time_diff // 3600}小时前"
else:
time_str = f"{time_diff // 86400}天前"
notice_line += f" ({time_str})"
notice_lines.append(notice_line)
result = "\n".join(notice_lines)
logger.debug(f"获取notice文本成功: stream_id={stream_id}, 数量={len(notices)}")
return result
except Exception as e:
logger.error(f"获取notice文本失败: {e}", exc_info=True)
return ""
def clear_notices(self, stream_id: Optional[str] = None, notice_type: Optional[str] = None) -> int:
"""清理notice消息
Args:
stream_id: 聊天流ID如果为None则清理所有流
notice_type: notice类型如果为None则清理所有类型
Returns:
int: 清理的消息数量
"""
try:
removed_count = 0
# 需要移除的键
keys_to_remove = []
for storage_key, notices in self._notices.items():
new_notices = deque()
for notice in notices:
should_remove = True
# 检查流ID过滤
if stream_id is not None:
if notice.scope == NoticeScope.STREAM:
if notice.target_stream_id != stream_id:
should_remove = False
else:
# 公共notice只有当指定清理所有流时才清理
should_remove = False
# 检查notice类型过滤
if should_remove and notice_type is not None:
message_type = self._get_notice_type(notice.message)
if message_type != notice_type:
should_remove = False
if should_remove:
removed_count += 1
else:
new_notices.append(notice)
if new_notices:
self._notices[storage_key] = new_notices
else:
keys_to_remove.append(storage_key)
# 移除空的键
for key in keys_to_remove:
del self._notices[key]
logger.info(f"清理notice消息: {removed_count}")
return removed_count
except Exception as e:
logger.error(f"清理notice消息失败: {e}")
return 0
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
# 更新实时统计
total_active_notices = sum(len(notices) for notices in self._notices.values())
self.stats["total_notices"] = total_active_notices
self.stats["active_keys"] = len(self._notices)
self.stats["last_cleanup_time"] = int(self._last_cleanup_time)
# 添加详细的存储键信息
storage_keys_info = {}
for key, notices in self._notices.items():
storage_keys_info[key] = {
"count": len(notices),
"oldest": min((n.timestamp for n in notices), default=0),
"newest": max((n.timestamp for n in notices), default=0),
}
self.stats["storage_keys"] = storage_keys_info
return self.stats.copy()
def _is_notice_message(self, message: DatabaseMessages) -> bool:
"""检查消息是否为notice类型"""
try:
# 首先检查消息的is_notify字段
if hasattr(message, 'is_notify') and message.is_notify:
return True
# 检查消息的附加配置
if hasattr(message, 'additional_config') and message.additional_config:
if isinstance(message.additional_config, dict):
return message.additional_config.get("is_notice", False)
elif isinstance(message.additional_config, str):
# 兼容JSON字符串格式
import json
config = json.loads(message.additional_config)
return config.get("is_notice", False)
# 检查消息类型或其他标识
return False
except Exception as e:
logger.debug(f"检查notice类型失败: {e}")
return False
def _get_storage_key(self, scope: NoticeScope, target_stream_id: Optional[str], message: DatabaseMessages) -> str:
"""生成存储键"""
if scope == NoticeScope.PUBLIC:
return "public"
else:
notice_type = self._get_notice_type(message) or "default"
return f"stream_{target_stream_id}_{notice_type}"
def _get_notice_type(self, message: DatabaseMessages) -> Optional[str]:
"""获取notice类型"""
try:
if hasattr(message, 'additional_config') and message.additional_config:
if isinstance(message.additional_config, dict):
return message.additional_config.get("notice_type")
elif isinstance(message.additional_config, str):
import json
config = json.loads(message.additional_config)
return config.get("notice_type")
return None
except Exception:
return None
def _cleanup_expired_notices(self) -> int:
"""清理过期的notice消息"""
try:
current_time = time.time()
if current_time - self._last_cleanup_time < self._cleanup_interval:
return 0
removed_count = 0
keys_to_remove = []
for storage_key, notices in self._notices.items():
new_notices = deque()
for notice in notices:
if notice.is_expired():
removed_count += 1
self.stats["expired_notices"] += 1
else:
new_notices.append(notice)
if new_notices:
self._notices[storage_key] = new_notices
else:
keys_to_remove.append(storage_key)
# 移除空的键
for key in keys_to_remove:
del self._notices[key]
self._last_cleanup_time = current_time
if removed_count > 0:
logger.debug(f"清理过期notice: {removed_count}")
return removed_count
except Exception as e:
logger.error(f"清理过期notice失败: {e}")
return 0
# 创建全局单例实例
global_notice_manager = GlobalNoticeManager()

View File

@@ -7,7 +7,7 @@ import asyncio
import random
import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Dict, Optional
from src.chat.chatter_manager import ChatterManager
from src.chat.message_receive.chat_stream import ChatStream
@@ -20,6 +20,8 @@ from src.plugin_system.apis.chat_api import get_chat_manager
from .distribution_manager import stream_loop_manager
from .sleep_system.state_manager import SleepState, sleep_state_manager
from .global_notice_manager import global_notice_manager, NoticeScope
if TYPE_CHECKING:
pass
@@ -52,6 +54,9 @@ class MessageManager:
# 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager
# 全局Notice管理器
self.notice_manager = global_notice_manager
async def start(self):
"""启动消息管理器"""
if self.is_running:
@@ -153,6 +158,14 @@ class MessageManager:
# TODO: 在这里为 WOKEN_UP_ANGRY 等未来状态添加特殊处理逻辑
try:
# 检查是否为notice消息
if self._is_notice_message(message):
# Notice消息处理 - 不进入未读消息
logger.info(f"📢 检测到notice消息: message_id={message.message_id}, is_notify={message.is_notify}, notice_type={getattr(message, 'notice_type', None)}")
await self._handle_notice_message(stream_id, message)
return
# 普通消息处理
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
@@ -617,6 +630,141 @@ class MessageManager:
"processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]),
}
# ===== Notice管理相关方法 =====
def _is_notice_message(self, message: DatabaseMessages) -> bool:
"""检查消息是否为notice类型"""
try:
# 首先检查消息的is_notify字段
if hasattr(message, 'is_notify') and message.is_notify:
return True
# 检查消息的附加配置
if hasattr(message, 'additional_config') and message.additional_config:
if isinstance(message.additional_config, dict):
return message.additional_config.get("is_notice", False)
elif isinstance(message.additional_config, str):
# 兼容JSON字符串格式
import json
config = json.loads(message.additional_config)
return config.get("is_notice", False)
return False
except Exception as e:
logger.debug(f"检查notice类型失败: {e}")
return False
async def _handle_notice_message(self, stream_id: str, message: DatabaseMessages) -> None:
"""处理notice消息将其添加到全局notice管理器"""
try:
# 获取notice作用域
scope = self._determine_notice_scope(message, stream_id)
# 添加到全局notice管理器
success = self.notice_manager.add_notice(
message=message,
scope=scope,
target_stream_id=stream_id if scope == NoticeScope.STREAM else None,
ttl=self._get_notice_ttl(message)
)
if success:
logger.info(f"✅ Notice消息已添加到全局管理器: message_id={message.message_id}, scope={scope.value}, stream={stream_id}, ttl={self._get_notice_ttl(message)}s")
else:
logger.warning(f"❌ Notice消息添加失败: message_id={message.message_id}")
except Exception as e:
logger.error(f"处理notice消息失败: {e}")
def _determine_notice_scope(self, message: DatabaseMessages, stream_id: str) -> NoticeScope:
"""确定notice的作用域
作用域完全由 additional_config 中的 is_public_notice 字段决定:
- is_public_notice=True: 公共notice所有聊天流可见
- is_public_notice=False 或未设置: 特定聊天流notice
"""
try:
# 检查附加配置中的公共notice标志
if hasattr(message, 'additional_config') and message.additional_config:
if isinstance(message.additional_config, dict):
is_public = message.additional_config.get("is_public_notice", False)
elif isinstance(message.additional_config, str):
import json
config = json.loads(message.additional_config)
is_public = config.get("is_public_notice", False)
else:
is_public = False
if is_public:
logger.debug(f"Notice被标记为公共: message_id={message.message_id}")
return NoticeScope.PUBLIC
# 默认为特定聊天流notice
return NoticeScope.STREAM
except Exception as e:
logger.debug(f"确定notice作用域失败: {e}")
return NoticeScope.STREAM
def _get_notice_type(self, message: DatabaseMessages) -> Optional[str]:
"""获取notice类型"""
try:
if hasattr(message, 'additional_config') and message.additional_config:
if isinstance(message.additional_config, dict):
return message.additional_config.get("notice_type")
elif isinstance(message.additional_config, str):
import json
config = json.loads(message.additional_config)
return config.get("notice_type")
return None
except Exception:
return None
def _get_notice_ttl(self, message: DatabaseMessages) -> int:
"""获取notice的生存时间"""
try:
# 根据notice类型设置不同的TTL
notice_type = self._get_notice_type(message)
ttl_mapping = {
"poke": 1800, # 戳一戳30分钟
"emoji_like": 3600, # 表情回复1小时
"group_ban": 7200, # 禁言2小时
"group_lift_ban": 7200, # 解禁2小时
"group_whole_ban": 3600, # 全体禁言1小时
"group_whole_lift_ban": 3600, # 解除全体禁言1小时
}
return ttl_mapping.get(notice_type, 3600) # 默认1小时
except Exception:
return 3600
def get_notice_text(self, stream_id: str, limit: int = 10) -> str:
"""获取指定聊天流的notice文本用于构建提示词"""
try:
return self.notice_manager.get_notice_text(stream_id, limit)
except Exception as e:
logger.error(f"获取notice文本失败: {e}")
return ""
def clear_notices(self, stream_id: Optional[str] = None, notice_type: Optional[str] = None) -> int:
"""清理notice消息"""
try:
return self.notice_manager.clear_notices(stream_id, notice_type)
except Exception as e:
logger.error(f"清理notice失败: {e}")
return 0
def get_notice_stats(self) -> Dict[str, Any]:
"""获取notice管理器统计信息"""
try:
return self.notice_manager.get_stats()
except Exception as e:
logger.error(f"获取notice统计失败: {e}")
return {}
# 创建全局消息管理器实例
message_manager = MessageManager()

View File

@@ -301,12 +301,35 @@ class ChatBot:
return False, None, True # 出错时继续处理消息
async def handle_notice_message(self, message: MessageRecv):
"""处理notice消息
notice消息是系统事件通知如禁言、戳一戳等具有以下特点
1. 默认不触发聊天流程,只记录
2. 可通过配置开启触发聊天流程
3. 会在提示词中展示
"""
# 检查是否是notice消息
if message.is_notify:
logger.info(f"收到notice消息: {message.notice_type}")
# 根据配置决定是否触发聊天流程
if not global_config.notice.enable_notice_trigger_chat:
logger.debug("notice消息不触发聊天流程配置已关闭")
return True # 返回True表示已处理不继续后续流程
else:
logger.debug("notice消息触发聊天流程配置已开启")
return False # 返回False表示继续处理触发聊天流程
# 兼容旧的notice判断方式
if message.message_info.message_id == "notice":
message.is_notify = True
logger.info("notice消息")
# print(message)
logger.info("旧格式notice消息")
return True
# 同样根据配置决定
if not global_config.notice.enable_notice_trigger_chat:
return True
else:
return False
# 处理适配器响应消息
if hasattr(message, "message_segment") and message.message_segment:
@@ -379,19 +402,31 @@ class ChatBot:
# 确保所有任务已启动
await self._ensure_started()
platform = message_data["message_info"].get("platform")
# 控制握手等消息可能缺少 message_info这里直接跳过避免 KeyError
if not isinstance(message_data, dict):
logger.warning(f"收到无法解析的消息类型: {type(message_data)},已跳过")
return
message_info = message_data.get("message_info")
if not isinstance(message_info, dict):
logger.debug(
"收到缺少 message_info 的消息,已跳过。可用字段: %s",
", ".join(message_data.keys()),
)
return
platform = message_info.get("platform")
if platform == "amaidesu_default":
await self.do_s4u(message_data)
return
if message_data["message_info"].get("group_info") is not None:
message_data["message_info"]["group_info"]["group_id"] = str(
message_data["message_info"]["group_info"]["group_id"]
if message_info.get("group_info") is not None:
message_info["group_info"]["group_id"] = str(
message_info["group_info"]["group_id"]
)
if message_data["message_info"].get("user_info") is not None:
message_data["message_info"]["user_info"]["user_id"] = str(
message_data["message_info"]["user_info"]["user_id"]
if message_info.get("user_info") is not None:
message_info["user_info"]["user_id"] = str(
message_info["user_info"]["user_id"]
)
# print(message_data)
# logger.debug(str(message_data))
@@ -425,6 +460,107 @@ class ChatBot:
f"[{chat_name}]{message.message_info.user_info.user_nickname}:{message.processed_plain_text}\u001b[0m"
)
# 处理notice消息
notice_handled = await self.handle_notice_message(message)
if notice_handled:
# notice消息已处理需要先添加到message_manager再存储
try:
from src.common.data_models.database_data_model import DatabaseMessages
import time
message_info = message.message_info
msg_user_info = getattr(message_info, "user_info", None)
stream_user_info = getattr(message.chat_stream, "user_info", None)
group_info = getattr(message.chat_stream, "group_info", None)
message_id = message_info.message_id or ""
message_time = message_info.time if message_info.time is not None else time.time()
user_id = ""
user_nickname = ""
user_cardname = None
user_platform = ""
if msg_user_info:
user_id = str(getattr(msg_user_info, "user_id", "") or "")
user_nickname = getattr(msg_user_info, "user_nickname", "") or ""
user_cardname = getattr(msg_user_info, "user_cardname", None)
user_platform = getattr(msg_user_info, "platform", "") or ""
elif stream_user_info:
user_id = str(getattr(stream_user_info, "user_id", "") or "")
user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
user_cardname = getattr(stream_user_info, "user_cardname", None)
user_platform = getattr(stream_user_info, "platform", "") or ""
chat_user_id = str(getattr(stream_user_info, "user_id", "") or "")
chat_user_nickname = getattr(stream_user_info, "user_nickname", "") or ""
chat_user_cardname = getattr(stream_user_info, "user_cardname", None)
chat_user_platform = getattr(stream_user_info, "platform", "") or ""
group_id = getattr(group_info, "group_id", None)
group_name = getattr(group_info, "group_name", None)
group_platform = getattr(group_info, "platform", None)
# 构建additional_config确保包含is_notice标志
import json
additional_config_dict = {
"is_notice": True,
"notice_type": message.notice_type or "unknown",
"is_public_notice": bool(message.is_public_notice),
}
# 如果message_info有additional_config合并进来
if hasattr(message_info, 'additional_config') and message_info.additional_config:
if isinstance(message_info.additional_config, dict):
additional_config_dict.update(message_info.additional_config)
elif isinstance(message_info.additional_config, str):
try:
existing_config = json.loads(message_info.additional_config)
additional_config_dict.update(existing_config)
except Exception:
pass
additional_config_json = json.dumps(additional_config_dict)
# 创建数据库消息对象
db_message = DatabaseMessages(
message_id=message_id,
time=float(message_time),
chat_id=message.chat_stream.stream_id,
processed_plain_text=message.processed_plain_text,
display_message=message.processed_plain_text,
is_notify=bool(message.is_notify),
is_public_notice=bool(message.is_public_notice),
notice_type=message.notice_type,
additional_config=additional_config_json,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_stream_id=message.chat_stream.stream_id,
chat_info_platform=message.chat_stream.platform,
chat_info_create_time=float(message.chat_stream.create_time),
chat_info_last_active_time=float(message.chat_stream.last_active_time),
chat_info_user_id=chat_user_id,
chat_info_user_nickname=chat_user_nickname,
chat_info_user_cardname=chat_user_cardname,
chat_info_user_platform=chat_user_platform,
chat_info_group_id=group_id,
chat_info_group_name=group_name,
chat_info_group_platform=group_platform,
)
# 添加到message_manager这会将notice添加到全局notice管理器
await message_manager.add_message(message.chat_stream.stream_id, db_message)
logger.info(f"✅ Notice消息已添加到message_manager: type={message.notice_type}, stream={message.chat_stream.stream_id}")
except Exception as e:
logger.error(f"Notice消息添加到message_manager失败: {e}", exc_info=True)
# 存储后直接返回
await MessageStorage.store_message(message, chat)
logger.debug("notice消息已存储跳过后续处理")
return
# 过滤检查
if _check_ban_words(message.processed_plain_text, chat, user_info) or _check_ban_regex( # type: ignore
message.raw_message, # type: ignore
@@ -522,6 +658,8 @@ class ChatBot:
is_picid=bool(message.is_picid),
is_command=bool(message.is_command),
is_notify=bool(message.is_notify),
is_public_notice=bool(message.is_public_notice),
notice_type=message.notice_type,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,

View File

@@ -203,6 +203,8 @@ class ChatStream:
is_video=getattr(message, "is_video", False),
is_command=getattr(message, "is_command", False),
is_notify=getattr(message, "is_notify", False),
is_public_notice=getattr(message, "is_public_notice", False),
notice_type=getattr(message, "notice_type", None),
# 消息内容
processed_plain_text=getattr(message, "processed_plain_text", ""),
display_message=getattr(message, "processed_plain_text", ""), # 默认使用processed_plain_text

View File

@@ -121,7 +121,9 @@ class MessageRecv(Message):
self.is_voice = False
self.is_video = False
self.is_mentioned = None
self.is_notify = False
self.is_notify = False # 是否为notice消息
self.is_public_notice = False # 是否为公共notice
self.notice_type = None # notice类型
self.is_at = False
self.is_command = False
@@ -132,6 +134,12 @@ class MessageRecv(Message):
self.key_words = []
self.key_words_lite = []
# 解析additional_config中的notice信息
if self.message_info.additional_config and isinstance(self.message_info.additional_config, dict):
self.is_notify = self.message_info.additional_config.get("is_notice", False)
self.is_public_notice = self.message_info.additional_config.get("is_public_notice", False)
self.notice_type = self.message_info.additional_config.get("notice_type")
def update_chat_stream(self, chat_stream: "ChatStream"):
self.chat_stream = chat_stream

View File

@@ -222,6 +222,8 @@ class OptimizedChatStream:
is_video=getattr(message, "is_video", False),
is_command=getattr(message, "is_command", False),
is_notify=getattr(message, "is_notify", False),
is_public_notice=getattr(message, "is_public_notice", False),
notice_type=getattr(message, "notice_type", None),
processed_plain_text=getattr(message, "processed_plain_text", ""),
display_message=getattr(message, "processed_plain_text", ""),
priority_mode=getattr(message, "priority_mode", None),

View File

@@ -95,6 +95,8 @@ def init_prompt():
### 📬 未读历史消息(动作执行对象)
{unread_history_prompt}
{notice_block}
## 表达方式
- *你需要参考你的回复风格:*
{reply_style}
@@ -180,6 +182,8 @@ If you need to use the search tool, please directly call the function "lpmm_sear
{relation_info_block}
{extra_info_block}
{notice_block}
{cross_context_block}
{identity}
如果有人说你是人机,你可以用一种阴阳怪气的口吻来回应
@@ -780,6 +784,55 @@ class DefaultReplyer:
return keywords_reaction_prompt
async def build_notice_block(self, chat_id: str) -> str:
"""构建notice信息块
使用全局notice管理器获取notice消息并格式化展示
Args:
chat_id: 聊天ID即stream_id
Returns:
str: 格式化的notice信息文本如果没有notice或未启用则返回空字符串
"""
try:
logger.debug(f"开始构建notice块chat_id={chat_id}")
# 检查是否启用notice in prompt
if not hasattr(global_config, 'notice'):
logger.debug("notice配置不存在")
return ""
if not global_config.notice.notice_in_prompt:
logger.debug("notice_in_prompt配置未启用")
return ""
# 使用全局notice管理器获取notice文本
from src.chat.message_manager.message_manager import message_manager
limit = getattr(global_config.notice, 'notice_prompt_limit', 5)
logger.debug(f"获取notice文本limit={limit}")
notice_text = message_manager.get_notice_text(chat_id, limit)
if notice_text and notice_text.strip():
# 添加标题和格式化
notice_lines = []
notice_lines.append("## 📢 最近的系统通知")
notice_lines.append("")
notice_lines.append(notice_text)
notice_lines.append("")
result = "\n".join(notice_lines)
logger.info(f"notice块构建成功chat_id={chat_id}, 长度={len(result)}")
return result
else:
logger.debug(f"没有可用的notice文本chat_id={chat_id}")
return ""
except Exception as e:
logger.error(f"构建notice块失败chat_id={chat_id}: {e}", exc_info=True)
return ""
async def _time_and_run_task(self, coroutine, name: str) -> tuple[str, Any, float]:
"""计时并运行异步任务的辅助函数
@@ -1226,7 +1279,7 @@ class DefaultReplyer:
from src.chat.utils.prompt import Prompt
# 并行执行六个构建任务
# 并行执行任务
tasks = {
"expression_habits": asyncio.create_task(
self._time_and_run_task(
@@ -1254,6 +1307,9 @@ class DefaultReplyer:
"cross_context",
)
),
"notice_block": asyncio.create_task(
self._time_and_run_task(self.build_notice_block(chat_id), "notice_block")
),
}
# 设置超时
@@ -1272,6 +1328,7 @@ class DefaultReplyer:
"tool_info": "",
"prompt_info": "",
"cross_context": "",
"notice_block": "",
}
logger.info(f"为超时任务 {task_name} 提供默认值")
return task_name, default_values[task_name], timeout
@@ -1304,6 +1361,7 @@ class DefaultReplyer:
tool_info = results_dict["tool_info"]
prompt_info = results_dict["prompt_info"]
cross_context_block = results_dict["cross_context"]
notice_block = results_dict["notice_block"]
# 检查是否为视频分析结果,并注入引导语
if target and ("[视频内容]" in target or "好的,我将根据您提供的" in target):
@@ -1444,6 +1502,7 @@ class DefaultReplyer:
tool_info_block=tool_info,
knowledge_prompt=prompt_info,
cross_context_block=cross_context_block,
notice_block=notice_block,
keywords_reaction_prompt=keywords_reaction_prompt,
extra_info_block=extra_info_block,
time_block=time_block,
@@ -1582,6 +1641,9 @@ class DefaultReplyer:
else:
reply_target_block = ""
# 构建notice_block
notice_block = await self.build_notice_block(chat_id)
if is_group_chat:
await global_prompt_manager.get_prompt_async("chat_target_group1")
await global_prompt_manager.get_prompt_async("chat_target_group2")
@@ -1613,6 +1675,7 @@ class DefaultReplyer:
# 添加已构建的表达习惯和关系信息
expression_habits_block=expression_habits_block,
relation_info_block=relation_info,
notice_block=notice_block,
bot_name=global_config.bot.nickname,
bot_nickname=",".join(global_config.bot.alias_names) if global_config.bot.alias_names else "",
)

View File

@@ -1009,6 +1009,9 @@ async def build_readable_messages(
copy_messages = [msg.copy() for msg in messages]
if not copy_messages:
return ""
if show_actions and copy_messages:
# 获取所有消息的时间范围
min_time = min(msg.get("time", 0) for msg in copy_messages)

View File

@@ -305,6 +305,8 @@ class Prompt:
pre_built_params["knowledge_prompt"] = self.parameters.knowledge_prompt
if self.parameters.cross_context_block:
pre_built_params["cross_context_block"] = self.parameters.cross_context_block
if self.parameters.notice_block:
pre_built_params["notice_block"] = self.parameters.notice_block
# 根据参数确定要构建的项
if self.parameters.enable_expression and not pre_built_params.get("expression_habits_block"):
@@ -801,6 +803,7 @@ class Prompt:
"relation_info_block": context_data.get("relation_info_block", ""),
"extra_info_block": self.parameters.extra_info_block or context_data.get("extra_info_block", ""),
"cross_context_block": context_data.get("cross_context_block", ""),
"notice_block": self.parameters.notice_block or context_data.get("notice_block", ""),
"identity": self.parameters.identity_block or context_data.get("identity", ""),
"action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""),
"sender_name": self.parameters.sender or "未知用户",
@@ -830,6 +833,7 @@ class Prompt:
"relation_info_block": context_data.get("relation_info_block", ""),
"extra_info_block": self.parameters.extra_info_block or context_data.get("extra_info_block", ""),
"cross_context_block": context_data.get("cross_context_block", ""),
"notice_block": self.parameters.notice_block or context_data.get("notice_block", ""),
"identity": self.parameters.identity_block or context_data.get("identity", ""),
"action_descriptions": self.parameters.action_descriptions or context_data.get("action_descriptions", ""),
"schedule_block": self.parameters.schedule_block or context_data.get("schedule_block", ""),

View File

@@ -60,6 +60,7 @@ class PromptParameters:
reply_target_block: str = ""
mood_prompt: str = ""
action_descriptions: str = ""
notice_block: str = ""
# 可用动作信息
available_actions: dict[str, Any] | None = None

View File

@@ -67,7 +67,9 @@ class DatabaseMessages(BaseDataModel):
is_emoji: bool = False, # 是否为表情消息
is_picid: bool = False, # 是否为图片消息(包含图片 ID
is_command: bool = False, # 是否为命令消息(如 /help
is_notify: bool = False, # 是否为通知消息(如系统通知
is_notify: bool = False, # 是否为notice消息如禁言、戳一戳等系统事件
is_public_notice: bool = False, # 是否为公共notice所有聊天可见
notice_type: str | None = None, # notice类型由适配器指定如 "group_ban", "poke" 等)
selected_expressions: str | None = None, # 选择的表情或响应模板
is_read: bool = False, # 是否已读
user_id: str = "", # 用户 ID
@@ -110,6 +112,8 @@ class DatabaseMessages(BaseDataModel):
self.is_picid = is_picid
self.is_command = is_command
self.is_notify = is_notify
self.is_public_notice = is_public_notice
self.notice_type = notice_type
self.selected_expressions = selected_expressions
self.is_read = is_read
self.actions = actions
@@ -180,6 +184,8 @@ class DatabaseMessages(BaseDataModel):
"is_picid": self.is_picid,
"is_command": self.is_command,
"is_notify": self.is_notify,
"is_public_notice": self.is_public_notice,
"notice_type": self.notice_type,
"selected_expressions": self.selected_expressions,
"is_read": self.is_read,
"actions": self.actions,

View File

@@ -248,6 +248,8 @@ class Messages(Base):
is_picid: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_command: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_notify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_public_notice: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
notice_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
# 兴趣度系统字段
actions: Mapped[str | None] = mapped_column(Text, nullable=True)

View File

@@ -33,6 +33,7 @@ from src.config.official_configs import (
MessageReceiveConfig,
MoodConfig,
NormalChatConfig,
NoticeConfig,
PermissionConfig,
PersonalityConfig,
PlanningSystemConfig,
@@ -378,6 +379,7 @@ class Config(ValidatedConfigBase):
personality: PersonalityConfig = Field(..., description="个性配置")
chat: ChatConfig = Field(..., description="聊天配置")
message_receive: MessageReceiveConfig = Field(..., description="消息接收配置")
notice: NoticeConfig = Field(..., description="Notice消息配置")
normal_chat: NormalChatConfig = Field(..., description="普通聊天配置")
emoji: EmojiConfig = Field(..., description="表情配置")
expression: ExpressionConfig = Field(..., description="表达配置")

View File

@@ -150,6 +150,17 @@ class MessageReceiveConfig(ValidatedConfigBase):
ban_msgs_regex: list[str] = Field(default_factory=lambda: [], description="禁用消息正则列表")
class NoticeConfig(ValidatedConfigBase):
"""Notice消息配置类"""
enable_notice_trigger_chat: bool = Field(default=False, description="是否允许notice消息触发聊天流程")
notice_in_prompt: bool = Field(default=True, description="是否在提示词中展示最近的notice消息")
notice_prompt_limit: int = Field(default=5, ge=1, le=20, description="在提示词中展示的最大notice数量")
notice_time_window: int = Field(default=3600, ge=60, le=86400, description="notice时间窗口(秒)")
max_notices_per_chat: int = Field(default=30, ge=10, le=100, description="每个聊天保留的notice数量上限")
notice_retention_time: int = Field(default=86400, ge=3600, le=604800, description="notice保留时间(秒)")
class NormalChatConfig(ValidatedConfigBase):
"""普通聊天配置类"""

View File

@@ -180,6 +180,38 @@ class NoticeHandler:
group_name=group_name,
)
# 准备additional_config包含notice标志
notice_config = {
"is_notice": system_notice, # 禁言/解禁是系统通知
"is_public_notice": False, # 群内notice非公共
"target_id": target_id, # 在这里塞了一个target_id方便mmc那边知道被戳的人是谁
}
# 根据notice_type设置notice_type字段
if system_notice:
sub_type = raw_message.get("sub_type")
if notice_type == NoticeType.group_ban:
if sub_type == NoticeType.GroupBan.ban:
user_id_in_ban = raw_message.get("user_id")
if user_id_in_ban == 0:
notice_config["notice_type"] = "group_whole_ban"
else:
notice_config["notice_type"] = "group_ban"
elif sub_type == NoticeType.GroupBan.lift_ban:
user_id_in_ban = raw_message.get("user_id")
if user_id_in_ban == 0:
notice_config["notice_type"] = "group_whole_lift_ban"
else:
notice_config["notice_type"] = "group_lift_ban"
elif notice_type == NoticeType.notify:
sub_type = raw_message.get("sub_type")
if sub_type == NoticeType.Notify.poke:
notice_config["notice_type"] = "poke"
notice_config["is_notice"] = True # 戳一戳也是notice
elif notice_type == NoticeType.group_msg_emoji_like:
notice_config["notice_type"] = "emoji_like"
notice_config["is_notice"] = True # 表情回复也是notice
message_info: BaseMessageInfo = BaseMessageInfo(
platform=config_api.get_plugin_config(self.plugin_config, "maibot_server.platform_name", "qq"),
message_id="notice",
@@ -191,7 +223,7 @@ class NoticeHandler:
content_format=["text", "notify"],
accept_format=ACCEPT_FORMAT,
),
additional_config={"target_id": target_id}, # 在这里塞了一个target_id方便mmc那边知道被戳的人是谁
additional_config=notice_config, # 字典而不是JSON字符串
)
message_base: MessageBase = MessageBase(
@@ -504,6 +536,13 @@ class NoticeHandler:
group_name=group_name,
)
# 准备notice标志
notice_config = {
"is_notice": True,
"is_public_notice": False,
"notice_type": "group_lift_ban" if user_id != 0 else "group_whole_lift_ban",
}
message_info: BaseMessageInfo = BaseMessageInfo(
platform=config_api.get_plugin_config(self.plugin_config, "maibot_server.platform_name", "qq"),
message_id="notice",
@@ -512,6 +551,7 @@ class NoticeHandler:
group_info=group_info,
template_info=None,
format_info=None,
additional_config=notice_config, # 字典而不是JSON字符串
)
message_base: MessageBase = MessageBase(

View File

@@ -149,6 +149,14 @@ ban_msgs_regex = [
#"\\d{4}-\\d{2}-\\d{2}", # 匹配日期
]
[notice] # Notice消息配置
enable_notice_trigger_chat = false # 是否允许notice消息触发聊天流程默认关闭notice只会被记录但不会触发回复
notice_in_prompt = true # 是否在提示词中展示最近的notice消息
notice_prompt_limit = 5 # 在提示词中展示的最大notice数量
notice_time_window = 3600 # notice时间窗口只有这个时间范围内的notice会在提示词中展示默认1小时
max_notices_per_chat = 30 # 每个聊天保留的notice数量上限
notice_retention_time = 86400 # notice保留时间默认24小时
[anti_prompt_injection] # LLM反注入系统配置
enabled = false # 是否启用反注入系统
enabled_rules = false # 是否启用规则检测