feat: 添加通知消息处理功能,支持戳一戳、禁言等事件,并更新相关逻辑

This commit is contained in:
Windpicker-owo
2025-11-27 22:54:58 +08:00
parent 3538716515
commit a06510b9b6
4 changed files with 187 additions and 35 deletions

View File

@@ -93,29 +93,18 @@ class MessageManager:
logger.info("消息管理器已停止")
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
"""添加消息到指定聊天流
注意Notice 消息已在 MessageHandler._handle_notice_message 中单独处理,
不再经过此方法。此方法仅处理普通消息。
"""
try:
# 检查是否为notice消息
if self._is_notice_message(message):
# Notice消息处理 - 添加到全局管理器
logger.debug(f"检测到notice消息: notice_type={getattr(message, 'notice_type', None)}")
await self._handle_notice_message(stream_id, message)
# 根据配置决定是否继续处理(触发聊天流程)
if not global_config.notice.enable_notice_trigger_chat:
logger.debug(f"Notice消息将被忽略不触发聊天流程: {stream_id}")
return # 停止处理,不进入未读消息队列
else:
logger.debug(f"Notice消息将触发聊天流程: {stream_id}")
# 继续执行,将消息添加到未读队列
# 普通消息处理
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
return
# 启动steam loop任务如果尚未启动
# 启动 stream loop 任务(如果尚未启动)
await stream_loop_manager.start_stream_loop(stream_id)
await self._check_and_handle_interruption(chat_stream, message)
await chat_stream.context.add_message(message)

View File

@@ -140,6 +140,24 @@ class MessageHandler:
message_type="adapter_response",
)
# 注册 notice 消息处理器(处理通知消息,如戳一戳、禁言等)
def _is_notice_message(env: MessageEnvelope) -> bool:
"""检查是否为 notice 消息"""
message_info = env.get("message_info")
if not isinstance(message_info, dict):
return False
additional_config = message_info.get("additional_config")
if isinstance(additional_config, dict):
return additional_config.get("is_notice", False)
return False
runtime.add_route(
predicate=_is_notice_message,
handler=self._handle_notice_message,
name="notice_message_handler",
message_type="notice",
)
# 注册默认消息处理器(处理所有其他消息)
runtime.add_route(
predicate=lambda _: True, # 匹配所有消息
@@ -235,6 +253,165 @@ class MessageHandler:
await self._handle_adapter_response(seg_data)
return None
async def _handle_notice_message(self, envelope: MessageEnvelope) -> MessageEnvelope | None:
"""
Notice 消息专属处理器:处理通知消息(戳一戳、禁言、表情回复等)
Notice 消息与普通消息不同,它们不需要完整的消息处理链:
1. 不触发命令处理
2. 存储到数据库
3. 添加到全局 Notice 管理器
4. 触发 ON_NOTICE_RECEIVED 事件供插件监听
"""
try:
message_info = envelope.get("message_info")
if not isinstance(message_info, dict):
logger.debug("Notice 消息缺少 message_info已跳过")
return None
# 获取 notice 配置
additional_config = message_info.get("additional_config", {})
if not isinstance(additional_config, dict):
additional_config = {}
notice_type = additional_config.get("notice_type", "unknown")
is_public_notice = additional_config.get("is_public_notice", False)
# 获取用户和群组信息
group_info = message_info.get("group_info")
user_info = message_info.get("user_info")
if not user_info:
logger.debug("Notice 消息缺少用户信息,已跳过")
return None
# 获取或创建聊天流
platform = message_info.get("platform", "unknown")
from src.chat.message_receive.chat_stream import get_chat_manager
chat = await get_chat_manager().get_or_create_stream(
platform=platform,
user_info=DatabaseUserInfo.from_dict(user_info) if user_info else None, # type: ignore
group_info=DatabaseGroupInfo.from_dict(group_info) if group_info else None,
)
# 将消息信封转换为 DatabaseMessages
from src.chat.message_receive.message_processor import process_message_from_dict
message = await process_message_from_dict(
message_dict=envelope,
stream_id=chat.stream_id,
platform=chat.platform
)
# 填充聊天流时间信息
message.chat_info.create_time = chat.create_time
message.chat_info.last_active_time = chat.last_active_time
# 标记为 notice 消息
message.is_notify = True
message.notice_type = notice_type
# 打印接收日志
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
user_nickname = message.user_info.user_nickname if message.user_info else "未知用户"
logger.info(f"[Notice][{chat_name}][{notice_type}] {user_nickname}: {message.processed_plain_text}\u001b[0m")
# 存储消息到数据库
await MessageStorage.store_message(message, chat)
# 添加到全局 Notice 管理器
await self._add_notice_to_manager(message, chat.stream_id, is_public_notice, notice_type)
# 触发 notice 事件(可供插件监听)
await event_manager.trigger_event(
EventType.ON_NOTICE_RECEIVED,
permission_group="USER",
message=message,
notice_type=notice_type,
chat_stream=chat,
)
# 根据配置决定是否触发聊天流程
if global_config and global_config.notice and global_config.notice.enable_notice_trigger_chat:
logger.debug(f"Notice 消息将触发聊天流程: {chat.stream_id}")
# 添加到聊天流上下文,触发正常的消息处理流程
from src.chat.message_manager.distribution_manager import stream_loop_manager
await stream_loop_manager.start_stream_loop(chat.stream_id)
await chat.context.add_message(message)
else:
logger.debug(f"Notice 消息不触发聊天流程: {chat.stream_id}")
return None
except Exception as e:
logger.error(f"处理 Notice 消息时出错: {e}")
import traceback
traceback.print_exc()
return None
async def _add_notice_to_manager(
self,
message: DatabaseMessages,
stream_id: str,
is_public_notice: bool,
notice_type: str
) -> None:
"""将 notice 消息添加到全局 Notice 管理器
Args:
message: 数据库消息对象
stream_id: 聊天流ID
is_public_notice: 是否为公共 notice
notice_type: notice 类型
"""
try:
from src.chat.message_manager.global_notice_manager import NoticeScope
# 确定作用域
scope = NoticeScope.PUBLIC if is_public_notice else NoticeScope.STREAM
# 获取 TTL
ttl = self._get_notice_ttl(notice_type)
# 添加到全局 notice 管理器
success = message_manager.notice_manager.add_notice(
message=message,
scope=scope,
target_stream_id=stream_id if scope == NoticeScope.STREAM else None,
ttl=ttl
)
if success:
logger.debug(
f"Notice 消息已添加到全局管理器: message_id={message.message_id}, "
f"scope={scope.value}, stream={stream_id}, ttl={ttl}s"
)
else:
logger.warning(f"Notice 消息添加失败: message_id={message.message_id}")
except Exception as e:
logger.error(f"添加 notice 到管理器失败: {e}")
def _get_notice_ttl(self, notice_type: str) -> int:
"""根据 notice 类型获取生存时间(秒)
Args:
notice_type: notice 类型
Returns:
int: TTL 秒数
"""
ttl_mapping = {
"poke": 1800, # 戳一戳 30 分钟
"emoji_like": 3600, # 表情回复 1 小时
"group_ban": 7200, # 禁言 2 小时
"group_lift_ban": 7200, # 解禁 2 小时
"group_whole_ban": 3600, # 全体禁言 1 小时
"group_whole_lift_ban": 3600, # 解除全体禁言 1 小时
"group_upload": 3600, # 群文件上传 1 小时
}
return ttl_mapping.get(notice_type, 3600) # 默认 1 小时
async def _handle_normal_message(self, envelope: MessageEnvelope) -> MessageEnvelope | None:
"""
默认消息处理器:处理普通消息
@@ -317,22 +494,6 @@ class MessageHandler:
return None
# 保留旧的 process_message 方法用于向后兼容
async def process_message(self, envelope: MessageEnvelope) -> None:
"""
处理接收到的消息信封(向后兼容)
注意:此方法已被 MessageRuntime 路由取代。
如果直接调用此方法,它会委托给 runtime.handle_message()。
Args:
envelope: 消息信封(来自适配器)
"""
if self._runtime:
await self._runtime.handle_message(envelope)
else:
# 如果 runtime 未设置,使用旧的处理流程
await self._handle_normal_message(envelope)
async def _process_commands(self, message: DatabaseMessages, chat: "ChatStream") -> None:
"""处理命令和继续消息流程"""

View File

@@ -106,6 +106,7 @@ class EventType(Enum):
ON_START = "on_start" # 启动事件,用于调用按时任务
ON_STOP = "on_stop"
ON_MESSAGE = "on_message"
ON_NOTICE_RECEIVED = "on_notice_received" # Notice 消息事件(戳一戳、禁言等)
ON_PLAN = "on_plan"
POST_LLM = "post_llm"
AFTER_LLM = "after_llm"

View File

@@ -210,8 +210,9 @@ class NoticeHandler:
msg_builder.seg_list([handled_segment])
# 设置 additional_config包含 notice 相关配置)
res = msg_builder.build()["message_info"]["additional_config"] = notice_config
return res
envelope = msg_builder.build()
envelope["message_info"]["additional_config"] = notice_config
return envelope
async def _handle_poke_notify(
self, raw: Dict[str, Any], group_id: Any, user_id: Any