This commit is contained in:
tt-P607
2025-11-28 00:47:13 +08:00
13 changed files with 1118 additions and 161 deletions

View File

@@ -1,6 +1,6 @@
[project]
name = "MoFox-Bot"
version = "0.12.0"
version = "0.13.0"
description = "MoFox-Bot 是一个基于大语言模型的可交互智能体"
requires-python = ">=3.11,<=3.13"
dependencies = [

View File

@@ -58,6 +58,7 @@ from sqlalchemy import (
Table,
inspect,
text,
types as sqltypes,
)
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.exc import SQLAlchemyError
@@ -191,7 +192,7 @@ def get_database_config_from_toml(db_type: str) -> dict | None:
def create_sqlite_engine(sqlite_path: str) -> Engine:
"""创建 SQLite 引擎"""
"""<EFBFBD><EFBFBD><EFBFBD><EFBFBD> SQLite <EFBFBD><EFBFBD><EFBFBD><EFBFBD>"""
if not os.path.isabs(sqlite_path):
sqlite_path = os.path.join(PROJECT_ROOT, sqlite_path)
@@ -200,28 +201,18 @@ def create_sqlite_engine(sqlite_path: str) -> Engine:
url = f"sqlite:///{sqlite_path}"
logger.info("使用 SQLite 数据库: %s", sqlite_path)
return create_engine(url, future=True)
def create_mysql_engine(
host: str,
port: int,
database: str,
user: str,
password: str,
charset: str = "utf8mb4",
) -> Engine:
"""创建 MySQL 引擎"""
# 延迟导入 pymysql以便友好提示
try:
import pymysql # noqa: F401
except ImportError:
logger.error("需要安装 pymysql 才能连接 MySQL: pip install pymysql")
raise
url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}"
logger.info("使用 MySQL 数据库: %s@%s:%s/%s", user, host, port, database)
return create_engine(url, future=True)
engine = create_engine(
url,
future=True,
connect_args={
"timeout": 30, # wait a bit if the db is locked
"check_same_thread": False,
},
)
# Increase busy timeout to reduce "database is locked" errors on SQLite
with engine.connect() as conn:
conn.execute(text("PRAGMA busy_timeout=30000"))
return engine
def create_postgresql_engine(
@@ -324,22 +315,35 @@ def get_table_row_count(conn: Connection, table: Table) -> int:
def copy_table_structure(source_table: Table, target_metadata: MetaData, target_engine: Engine) -> Table:
"""在目标数据库中创建与源表结构相同的表
"""复制表结构到目标数据库,使其结构保持一致"""
target_is_sqlite = target_engine.dialect.name == "sqlite"
target_is_pg = target_engine.dialect.name == "postgresql"
Args:
source_table: 源表对象
target_metadata: 目标元数据对象
target_engine: 目标数据库引擎
columns = []
for c in source_table.columns:
new_col = c.copy()
Returns:
Table: 目标表对象
"""
# 复制表结构
# SQLite 不支持 nextval 等 server_default
if target_is_sqlite:
new_col.server_default = None
# PostgreSQL 需要将部分 SQLite 特有类型转换
if target_is_pg:
col_type = new_col.type
# SQLite DATETIME -> 通用 DateTime
if isinstance(col_type, sqltypes.DateTime) or col_type.__class__.__name__ in {"DATETIME", "DateTime"}:
new_col.type = sqltypes.DateTime()
# TEXT(50) 等长度受限的 TEXT 在 PG 无效,改用 String(length)
elif isinstance(col_type, sqltypes.Text) and getattr(col_type, "length", None):
new_col.type = sqltypes.String(length=col_type.length)
columns.append(new_col)
# 为避免迭代约束集合时出现 “Set changed size during iteration”这里不复制表级约束
target_table = Table(
source_table.name,
target_metadata,
*[c.copy() for c in source_table.columns],
*[c.copy() for c in source_table.constraints],
*columns,
)
target_metadata.create_all(target_engine, tables=[target_table])
return target_table
@@ -383,8 +387,6 @@ def migrate_table_data(
logger.error("查询表 %s 失败: %s", source_table.name, e)
return 0, 1
columns = source_table.columns.keys()
def insert_batch(rows: list[dict]):
nonlocal migrated_rows, error_count
if not rows:
@@ -398,8 +400,18 @@ def migrate_table_data(
error_count += len(rows)
batch: list[dict] = []
null_char_replacements = 0
for row in result:
row_dict = {col: row[col] for col in columns}
# Use column objects to access row mapping to avoid quoted_name keys
row_dict = {}
for col in source_table.columns:
val = row._mapping[col]
if isinstance(val, str) and "\x00" in val:
val = val.replace("\x00", "")
null_char_replacements += 1
row_dict[col.key] = val
batch.append(row_dict)
if len(batch) >= batch_size:
insert_batch(batch)
@@ -414,6 +426,12 @@ def migrate_table_data(
migrated_rows,
error_count,
)
if null_char_replacements:
logger.warning(
"%s%d 个字符串值包含 NUL 已被移除后写入目标库",
source_table.name,
null_char_replacements,
)
return migrated_rows, error_count
@@ -535,6 +553,14 @@ class DatabaseMigrator:
# 目标数据库配置
target_config = self._load_target_config()
# 防止源/目标 SQLite 指向同一路径导致自我覆盖及锁
if (
self.source_type == "sqlite"
and self.target_type == "sqlite"
and os.path.abspath(source_config.get("path", "")) == os.path.abspath(target_config.get("path", ""))
):
raise ValueError("源数据库与目标数据库不能是同一个 SQLite 文件,请为目标指定不同的路径")
# 创建引擎
self.source_engine = create_engine_by_type(self.source_type, source_config)
self.target_engine = create_engine_by_type(self.target_type, target_config)
@@ -589,32 +615,36 @@ class DatabaseMigrator:
return sorted_tables
def _drop_target_tables(self, conn: Connection):
"""删除目标数据库中已经存在的表(谨慎操作
def _drop_target_tables(self):
"""删除目标数据库中已有的表(如果有
这里为了避免冲突,迁移前会询问用户是否删除目标库中已经存在的同名表。
使用 Engine.begin() 进行连接以支持 autobegin 和 begin 兼容 SQLAlchemy 2.0 的写法
"""
inspector = inspect(conn)
existing_tables = inspector.get_table_names()
if not existing_tables:
logger.info("目标数据库中没有已存在的表,无需删除")
if self.target_engine is None:
logger.warning("目标数据库引擎尚未初始化,无法删除表")
return
logger.info("目标数据库中当前存在的表: %s", ", ".join(existing_tables))
if confirm_action("是否删除目标数据库中已有的所有表?此操作不可恢复!", default=False):
with conn.begin():
with self.target_engine.begin() as conn:
inspector = inspect(conn)
existing_tables = inspector.get_table_names()
if not existing_tables:
logger.info("目标数据库中没有已存在的表,无需删除")
return
logger.info("目标数据库中的当前表: %s", ", ".join(existing_tables))
if confirm_action("是否删除目标数据库中现有的表列表?此操作不可撤销", default=False):
for table_name in existing_tables:
try:
logger.info("删除目标数据库表: %s", table_name)
logger.info("删除目标数据库表: %s", table_name)
conn.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE"))
except SQLAlchemyError as e:
logger.error("删除 %s 失败: %s", table_name, e)
logger.error("删除 %s 失败: %s", table_name, e)
self.stats["errors"].append(
f"删除 {table_name} 失败: {e}"
f"删除 {table_name} 失败: {e}"
)
else:
logger.info("用户选择保留目标数据库中已有的表,可能会与迁移数据发生冲突。")
else:
logger.info("跳过删除目标数据库中的表,继续迁移过程")
def migrate(self):
"""执行迁移操作"""
@@ -630,24 +660,24 @@ class DatabaseMigrator:
logger.info("按依赖顺序迁移表: %s", ", ".join(t.name for t in tables))
# 删除目标库中已有表(可选)
with self.target_engine.connect() as target_conn:
self._drop_target_tables(target_conn)
self._drop_target_tables()
# 开始迁移
with self.source_engine.connect() as source_conn, self.target_engine.connect() as target_conn:
with self.source_engine.connect() as source_conn:
for source_table in tables:
try:
# 在目标库中创建表结构
target_table = copy_table_structure(source_table, MetaData(), self.target_engine)
# 迁移数据
migrated_rows, error_count = migrate_table_data(
source_conn,
target_conn,
source_table,
target_table,
batch_size=self.batch_size,
)
# 每张表单独事务,避免退出上下文被自动回滚
with self.target_engine.begin() as target_conn:
migrated_rows, error_count = migrate_table_data(
source_conn,
target_conn,
source_table,
target_table,
batch_size=self.batch_size,
)
self.stats["tables_migrated"] += 1
self.stats["rows_migrated"] += migrated_rows
@@ -937,7 +967,7 @@ def interactive_setup() -> dict:
if target_type == "sqlite":
target_path = _ask_str(
"目标 SQLite 文件路径(若不存在会自动创建)",
default="data/MaiBot_target.db",
default="data/MaiBot.db",
)
target_config = {"path": target_path}
else:

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""重置 PostgreSQL 序列值
迁移数据后PostgreSQL 的序列(用于自增主键)可能没有更新到正确的值,
导致插入新记录时出现主键冲突。此脚本会自动检测并重置所有序列。
使用方法:
python scripts/reset_pg_sequences.py --host localhost --port 5432 --database maibot --user postgres --password your_password
"""
import argparse
import psycopg
def reset_sequences(host: str, port: int, database: str, user: str, password: str):
"""重置所有序列值"""
conn_str = f"host={host} port={port} dbname={database} user={user} password={password}"
print(f"连接到 PostgreSQL: {host}:{port}/{database}")
conn = psycopg.connect(conn_str)
conn.autocommit = True
# 查询所有序列及其关联的表和列
query = """
SELECT
t.relname AS table_name,
a.attname AS column_name,
s.relname AS sequence_name
FROM pg_class s
JOIN pg_depend d ON d.objid = s.oid
JOIN pg_class t ON d.refobjid = t.oid
JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
WHERE s.relkind = 'S'
"""
cursor = conn.execute(query)
sequences = cursor.fetchall()
print(f"发现 {len(sequences)} 个序列")
reset_count = 0
for table_name, col_name, seq_name in sequences:
try:
# 获取当前最大 ID
max_result = conn.execute(f'SELECT MAX("{col_name}") FROM "{table_name}"')
max_id = max_result.fetchone()[0]
if max_id is not None:
# 重置序列
conn.execute(f"SELECT setval('{seq_name}', {max_id}, true)")
print(f"{seq_name} -> {max_id}")
reset_count += 1
else:
print(f" - {seq_name}: 表为空,跳过")
except Exception as e:
print(f"{table_name}.{col_name}: {e}")
conn.close()
print(f"\n✅ 重置完成!共重置 {reset_count} 个序列")
def main():
parser = argparse.ArgumentParser(description="重置 PostgreSQL 序列值")
parser.add_argument("--host", default="localhost", help="PostgreSQL 主机")
parser.add_argument("--port", type=int, default=5432, help="PostgreSQL 端口")
parser.add_argument("--database", default="maibot", help="数据库名")
parser.add_argument("--user", default="postgres", help="用户名")
parser.add_argument("--password", required=True, help="密码")
args = parser.parse_args()
reset_sequences(args.host, args.port, args.database, args.user, args.password)
if __name__ == "__main__":
main()

View File

@@ -367,6 +367,7 @@ class BotInterestManager:
self.embedding_dimension,
current_dim,
)
return embedding
else:
raise RuntimeError(f"❌ 返回的embedding为空: {embedding}")

View File

@@ -93,29 +93,18 @@ class MessageManager:
logger.info("消息管理器已停止")
async def add_message(self, stream_id: str, message: DatabaseMessages):
"""添加消息到指定聊天流"""
"""添加消息到指定聊天流
注意Notice 消息已在 MessageHandler._handle_notice_message 中单独处理,
不再经过此方法。此方法仅处理普通消息。
"""
try:
# 检查是否为notice消息
if self._is_notice_message(message):
# Notice消息处理 - 添加到全局管理器
logger.debug(f"检测到notice消息: notice_type={getattr(message, 'notice_type', None)}")
await self._handle_notice_message(stream_id, message)
# 根据配置决定是否继续处理(触发聊天流程)
if not global_config.notice.enable_notice_trigger_chat:
logger.debug(f"Notice消息将被忽略不触发聊天流程: {stream_id}")
return # 停止处理,不进入未读消息队列
else:
logger.debug(f"Notice消息将触发聊天流程: {stream_id}")
# 继续执行,将消息添加到未读队列
# 普通消息处理
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if not chat_stream:
logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在")
return
# 启动steam loop任务如果尚未启动
# 启动 stream loop 任务(如果尚未启动)
await stream_loop_manager.start_stream_loop(stream_id)
await self._check_and_handle_interruption(chat_stream, message)
await chat_stream.context.add_message(message)

View File

@@ -120,9 +120,6 @@ class MessageHandler:
# 注册前置钩子:消息预处理和过滤
runtime.register_before_hook(self._before_hook)
# 注册后置钩子:存储、情绪更新等
runtime.register_after_hook(self._after_hook)
# 注册错误钩子:统一异常处理
runtime.register_error_hook(self._error_hook)
@@ -140,6 +137,24 @@ class MessageHandler:
message_type="adapter_response",
)
# 注册 notice 消息处理器(处理通知消息,如戳一戳、禁言等)
def _is_notice_message(env: MessageEnvelope) -> bool:
"""检查是否为 notice 消息"""
message_info = env.get("message_info")
if not isinstance(message_info, dict):
return False
additional_config = message_info.get("additional_config")
if isinstance(additional_config, dict):
return additional_config.get("is_notice", False)
return False
runtime.add_route(
predicate=_is_notice_message,
handler=self._handle_notice_message,
name="notice_message_handler",
message_type="notice",
)
# 注册默认消息处理器(处理所有其他消息)
runtime.add_route(
predicate=lambda _: True, # 匹配所有消息
@@ -204,14 +219,6 @@ class MessageHandler:
await MessageStorage.update_message(dict(envelope))
raise UserWarning("Echo 消息已处理")
async def _after_hook(self, envelope: MessageEnvelope) -> None:
"""
后置钩子:消息后处理
在消息处理完成后执行的清理工作
"""
# 后置处理逻辑(如有需要)
pass
async def _error_hook(self, envelope: MessageEnvelope, exc: BaseException) -> None:
"""
@@ -235,6 +242,165 @@ class MessageHandler:
await self._handle_adapter_response(seg_data)
return None
async def _handle_notice_message(self, envelope: MessageEnvelope) -> MessageEnvelope | None:
"""
Notice 消息专属处理器:处理通知消息(戳一戳、禁言、表情回复等)
Notice 消息与普通消息不同,它们不需要完整的消息处理链:
1. 不触发命令处理
2. 存储到数据库
3. 添加到全局 Notice 管理器
4. 触发 ON_NOTICE_RECEIVED 事件供插件监听
"""
try:
message_info = envelope.get("message_info")
if not isinstance(message_info, dict):
logger.debug("Notice 消息缺少 message_info已跳过")
return None
# 获取 notice 配置
additional_config = message_info.get("additional_config", {})
if not isinstance(additional_config, dict):
additional_config = {}
notice_type = additional_config.get("notice_type", "unknown")
is_public_notice = additional_config.get("is_public_notice", False)
# 获取用户和群组信息
group_info = message_info.get("group_info")
user_info = message_info.get("user_info")
if not user_info:
logger.debug("Notice 消息缺少用户信息,已跳过")
return None
# 获取或创建聊天流
platform = message_info.get("platform", "unknown")
from src.chat.message_receive.chat_stream import get_chat_manager
chat = await get_chat_manager().get_or_create_stream(
platform=platform,
user_info=DatabaseUserInfo.from_dict(user_info) if user_info else None, # type: ignore
group_info=DatabaseGroupInfo.from_dict(group_info) if group_info else None,
)
# 将消息信封转换为 DatabaseMessages
from src.chat.message_receive.message_processor import process_message_from_dict
message = await process_message_from_dict(
message_dict=envelope,
stream_id=chat.stream_id,
platform=chat.platform
)
# 填充聊天流时间信息
message.chat_info.create_time = chat.create_time
message.chat_info.last_active_time = chat.last_active_time
# 标记为 notice 消息
message.is_notify = True
message.notice_type = notice_type
# 打印接收日志
chat_name = chat.group_info.group_name if chat.group_info else "私聊"
user_nickname = message.user_info.user_nickname if message.user_info else "未知用户"
logger.info(f"[Notice][{chat_name}][{notice_type}] {user_nickname}: {message.processed_plain_text}\u001b[0m")
# 存储消息到数据库
await MessageStorage.store_message(message, chat)
# 添加到全局 Notice 管理器
await self._add_notice_to_manager(message, chat.stream_id, is_public_notice, notice_type)
# 触发 notice 事件(可供插件监听)
await event_manager.trigger_event(
EventType.ON_NOTICE_RECEIVED,
permission_group="USER",
message=message,
notice_type=notice_type,
chat_stream=chat,
)
# 根据配置决定是否触发聊天流程
if global_config and global_config.notice and global_config.notice.enable_notice_trigger_chat:
logger.debug(f"Notice 消息将触发聊天流程: {chat.stream_id}")
# 添加到聊天流上下文,触发正常的消息处理流程
from src.chat.message_manager.distribution_manager import stream_loop_manager
await stream_loop_manager.start_stream_loop(chat.stream_id)
await chat.context.add_message(message)
else:
logger.debug(f"Notice 消息不触发聊天流程: {chat.stream_id}")
return None
except Exception as e:
logger.error(f"处理 Notice 消息时出错: {e}")
import traceback
traceback.print_exc()
return None
async def _add_notice_to_manager(
self,
message: DatabaseMessages,
stream_id: str,
is_public_notice: bool,
notice_type: str
) -> None:
"""将 notice 消息添加到全局 Notice 管理器
Args:
message: 数据库消息对象
stream_id: 聊天流ID
is_public_notice: 是否为公共 notice
notice_type: notice 类型
"""
try:
from src.chat.message_manager.global_notice_manager import NoticeScope
# 确定作用域
scope = NoticeScope.PUBLIC if is_public_notice else NoticeScope.STREAM
# 获取 TTL
ttl = self._get_notice_ttl(notice_type)
# 添加到全局 notice 管理器
success = message_manager.notice_manager.add_notice(
message=message,
scope=scope,
target_stream_id=stream_id if scope == NoticeScope.STREAM else None,
ttl=ttl
)
if success:
logger.debug(
f"Notice 消息已添加到全局管理器: message_id={message.message_id}, "
f"scope={scope.value}, stream={stream_id}, ttl={ttl}s"
)
else:
logger.warning(f"Notice 消息添加失败: message_id={message.message_id}")
except Exception as e:
logger.error(f"添加 notice 到管理器失败: {e}")
def _get_notice_ttl(self, notice_type: str) -> int:
"""根据 notice 类型获取生存时间(秒)
Args:
notice_type: notice 类型
Returns:
int: TTL 秒数
"""
ttl_mapping = {
"poke": 1800, # 戳一戳 30 分钟
"emoji_like": 3600, # 表情回复 1 小时
"group_ban": 7200, # 禁言 2 小时
"group_lift_ban": 7200, # 解禁 2 小时
"group_whole_ban": 3600, # 全体禁言 1 小时
"group_whole_lift_ban": 3600, # 解除全体禁言 1 小时
"group_upload": 3600, # 群文件上传 1 小时
}
return ttl_mapping.get(notice_type, 3600) # 默认 1 小时
async def _handle_normal_message(self, envelope: MessageEnvelope) -> MessageEnvelope | None:
"""
默认消息处理器:处理普通消息
@@ -309,6 +475,21 @@ class MessageHandler:
# 处理命令和后续流程
await self._process_commands(message, chat)
# 触发消息事件
result = await event_manager.trigger_event(
EventType.ON_MESSAGE,
permission_group="SYSTEM",
message=message
)
if result and not result.all_continue_process():
raise UserWarning(
f"插件{result.get_summary().get('stopped_handlers', '')}于消息到达时取消了消息处理"
)
# 预处理消息
await self._preprocess_message(message, chat)
except UserWarning as uw:
logger.info(str(uw))
except Exception as e:
@@ -317,22 +498,6 @@ class MessageHandler:
return None
# 保留旧的 process_message 方法用于向后兼容
async def process_message(self, envelope: MessageEnvelope) -> None:
"""
处理接收到的消息信封(向后兼容)
注意:此方法已被 MessageRuntime 路由取代。
如果直接调用此方法,它会委托给 runtime.handle_message()。
Args:
envelope: 消息信封(来自适配器)
"""
if self._runtime:
await self._runtime.handle_message(envelope)
else:
# 如果 runtime 未设置,使用旧的处理流程
await self._handle_normal_message(envelope)
async def _process_commands(self, message: DatabaseMessages, chat: "ChatStream") -> None:
"""处理命令和继续消息流程"""
@@ -354,20 +519,6 @@ class MessageHandler:
logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}")
return
# 触发消息事件
result = await event_manager.trigger_event(
EventType.ON_MESSAGE,
permission_group="SYSTEM",
message=message
)
if result and not result.all_continue_process():
raise UserWarning(
f"插件{result.get_summary().get('stopped_handlers', '')}于消息到达时取消了消息处理"
)
# 预处理消息
await self._preprocess_message(message, chat)
except UserWarning as uw:
logger.info(str(uw))
except Exception as e:

View File

@@ -84,11 +84,12 @@ async def check_and_migrate_database(existing_engine=None):
try:
# 检查并添加缺失的列
db_columns = await connection.run_sync(
db_columns_info = await connection.run_sync(
lambda conn: {
col["name"] for col in inspector.get_columns(table_name)
col["name"]: col for col in inspector.get_columns(table_name)
}
)
db_columns = set(db_columns_info.keys())
model_columns = {col.name for col in table.c}
missing_columns = model_columns - db_columns
@@ -144,7 +145,12 @@ async def check_and_migrate_database(existing_engine=None):
# 提交列添加事务
await connection.commit()
else:
logger.info(f"'{table_name}' 的列结构一致。")
logger.debug(f"'{table_name}' 的列结构一致。")
# 3. 检查并修复列类型不匹配(仅 PostgreSQL
await _check_and_fix_column_types(
connection, inspector, table_name, table, db_columns_info
)
# 检查并创建缺失的索引
db_indexes = await connection.run_sync(
@@ -225,3 +231,126 @@ async def drop_all_tables(existing_engine=None):
await connection.run_sync(Base.metadata.drop_all)
logger.warning("所有数据库表已删除。")
# =============================================================================
# 列类型修复辅助函数
# =============================================================================
# 已知需要修复的列类型映射
# 格式: {(表名, 列名): (期望的Python类型类别, PostgreSQL USING 子句)}
# Python类型类别: "boolean", "integer", "float", "string"
_BOOLEAN_USING_CLAUSE = (
"boolean",
"USING CASE WHEN {column} IS NULL THEN FALSE "
"WHEN {column} = 0 THEN FALSE ELSE TRUE END"
)
_COLUMN_TYPE_FIXES = {
# messages 表的布尔列
("messages", "is_public_notice"): _BOOLEAN_USING_CLAUSE,
("messages", "should_reply"): _BOOLEAN_USING_CLAUSE,
("messages", "should_act"): _BOOLEAN_USING_CLAUSE,
("messages", "is_mentioned"): _BOOLEAN_USING_CLAUSE,
("messages", "is_emoji"): _BOOLEAN_USING_CLAUSE,
("messages", "is_picid"): _BOOLEAN_USING_CLAUSE,
("messages", "is_command"): _BOOLEAN_USING_CLAUSE,
("messages", "is_notify"): _BOOLEAN_USING_CLAUSE,
}
def _get_expected_pg_type(python_type_category: str) -> str:
"""获取期望的 PostgreSQL 类型名称"""
mapping = {
"boolean": "boolean",
"integer": "integer",
"float": "double precision",
"string": "text",
}
return mapping.get(python_type_category, "text")
def _normalize_pg_type(type_name: str) -> str:
"""标准化 PostgreSQL 类型名称用于比较"""
type_name = type_name.lower().strip()
# 处理常见的别名
aliases = {
"bool": "boolean",
"int": "integer",
"int4": "integer",
"int8": "bigint",
"float8": "double precision",
"float4": "real",
"numeric": "numeric",
"decimal": "numeric",
}
return aliases.get(type_name, type_name)
async def _check_and_fix_column_types(connection, inspector, table_name, table, db_columns_info):
"""检查并修复列类型不匹配的问题(仅 PostgreSQL
Args:
connection: 数据库连接
inspector: SQLAlchemy inspector
table_name: 表名
table: SQLAlchemy Table 对象
db_columns_info: 数据库中列的信息字典
"""
# 获取数据库方言
def get_dialect_name(conn):
return conn.dialect.name
dialect_name = await connection.run_sync(get_dialect_name)
# 目前只处理 PostgreSQL
if dialect_name != "postgresql":
return
for (fix_table, fix_column), (expected_type_category, using_clause) in _COLUMN_TYPE_FIXES.items():
if fix_table != table_name:
continue
if fix_column not in db_columns_info:
continue
col_info = db_columns_info[fix_column]
current_type = _normalize_pg_type(str(col_info.get("type", "")))
expected_type = _get_expected_pg_type(expected_type_category)
# 如果类型已经正确,跳过
if current_type == expected_type:
continue
# 检查是否需要修复:如果当前是 numeric 但期望是 boolean
if current_type == "numeric" and expected_type == "boolean":
logger.warning(
f"发现列类型不匹配: {table_name}.{fix_column} "
f"(当前: {current_type}, 期望: {expected_type})"
)
# PostgreSQL 需要先删除默认值,再修改类型,最后重新设置默认值
using_sql = using_clause.format(column=fix_column)
drop_default_sql = f"ALTER TABLE {table_name} ALTER COLUMN {fix_column} DROP DEFAULT"
alter_type_sql = f"ALTER TABLE {table_name} ALTER COLUMN {fix_column} TYPE BOOLEAN {using_sql}"
set_default_sql = f"ALTER TABLE {table_name} ALTER COLUMN {fix_column} SET DEFAULT FALSE"
try:
def execute_alter(conn):
# 步骤 1: 删除默认值
try:
conn.execute(text(drop_default_sql))
except Exception:
pass # 如果没有默认值,忽略错误
# 步骤 2: 修改类型
conn.execute(text(alter_type_sql))
# 步骤 3: 重新设置默认值
conn.execute(text(set_default_sql))
await connection.run_sync(execute_alter)
await connection.commit()
logger.info(f"成功修复列类型: {table_name}.{fix_column} -> BOOLEAN")
except Exception as e:
logger.error(f"修复列类型失败 {table_name}.{fix_column}: {e}")
await connection.rollback()

View File

@@ -64,7 +64,7 @@ TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "template")
# 考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码
# 对该字段的更新请严格参照语义化版本规范https://semver.org/lang/zh-CN/
MMC_VERSION = "0.13.0-alpha.3"
MMC_VERSION = "0.13.0-alpha.4"
# 全局配置变量
_CONFIG_INITIALIZED = False

View File

@@ -106,6 +106,7 @@ class EventType(Enum):
ON_START = "on_start" # 启动事件,用于调用按时任务
ON_STOP = "on_stop"
ON_MESSAGE = "on_message"
ON_NOTICE_RECEIVED = "on_notice_received" # Notice 消息事件(戳一戳、禁言等)
ON_PLAN = "on_plan"
POST_LLM = "post_llm"
AFTER_LLM = "after_llm"

View File

@@ -99,8 +99,47 @@ class NapcatAdapter(BaseAdapter):
self.meta_event_handler.set_plugin_config(self.plugin.config)
self.send_handler.set_plugin_config(self.plugin.config)
# 注册 notice 事件到 event manager
await self._register_notice_events()
logger.info("Napcat 适配器已加载")
async def _register_notice_events(self) -> None:
"""注册 notice 相关事件到 event manager"""
from src.plugin_system.core.event_manager import event_manager
from .src.event_types import NapcatEvent
# 定义所有 notice 事件类型
notice_events = [
NapcatEvent.ON_RECEIVED.POKE,
NapcatEvent.ON_RECEIVED.EMOJI_LIEK,
NapcatEvent.ON_RECEIVED.GROUP_UPLOAD,
NapcatEvent.ON_RECEIVED.GROUP_BAN,
NapcatEvent.ON_RECEIVED.GROUP_LIFT_BAN,
NapcatEvent.ON_RECEIVED.FRIEND_RECALL,
NapcatEvent.ON_RECEIVED.GROUP_RECALL,
NapcatEvent.ON_RECEIVED.FRIEND_INPUT,
]
# 注册所有事件
registered_count = 0
for event_type in notice_events:
try:
# 使用同步的 register_event 方法注册事件
success = event_manager.register_event(
event_name=event_type,
allowed_triggers=["napcat_adapter_plugin"], # 只允许此插件触发
)
if success:
registered_count += 1
logger.debug(f"已注册 notice 事件: {event_type}")
else:
logger.debug(f"notice 事件已存在: {event_type}")
except Exception as e:
logger.warning(f"注册 notice 事件失败: {event_type}, 错误: {e}")
logger.info(f"已注册 {registered_count} 个新 notice 事件类型(共 {len(notice_events)} 个)")
async def on_adapter_unloaded(self) -> None:
"""适配器卸载时的清理"""
logger.info("Napcat 适配器正在关闭...")
@@ -133,22 +172,28 @@ class NapcatAdapter(BaseAdapter):
if not future.done():
future.set_result(raw)
# 消息事件
if post_type == "message":
return await self.message_handler.handle_raw_message(raw) # type: ignore[return-value]
try:
# 消息事件
if post_type == "message":
return await self.message_handler.handle_raw_message(raw) # type: ignore[return-value]
# 通知事件
elif post_type == "notice":
return await self.notice_handler.handle_notice(raw) # type: ignore[return-value]
# 通知事件
elif post_type == "notice":
return await self.notice_handler.handle_notice(raw) # type: ignore[return-value]
# 元事件
elif post_type == "meta_event":
return await self.meta_event_handler.handle_meta_event(raw) # type: ignore[return-value]
# 未知事件类型
else:
return
# 事件
elif post_type == "meta_event":
return await self.meta_event_handler.handle_meta_event(raw) # type: ignore[return-value]
# 未知事件类型
else:
return None
except ValueError as ve:
logger.warning(f"处理 Napcat 事件时数据无效: {ve}")
return None
except Exception as e:
logger.error(f"处理 Napcat 事件失败: {e}, 原始数据: {raw}")
return None
async def _send_platform_message(self, envelope: MessageEnvelope) -> None: # type: ignore[override]
"""
将 MessageEnvelope 转换并发送到 Napcat
@@ -156,7 +201,10 @@ class NapcatAdapter(BaseAdapter):
这里不直接通过 WebSocket 发送 envelope
而是调用 Napcat APIsend_group_msg, send_private_msg 等)
"""
await self.send_handler.handle_message(envelope)
try:
await self.send_handler.handle_message(envelope)
except Exception as e:
logger.error(f"发送 Napcat 消息失败: {e}")
async def send_napcat_api(self, action: str, params: Dict[str, Any], timeout: float = 30.0) -> Dict[str, Any]:
"""
@@ -265,6 +313,10 @@ class NapcatAdapterPlugin(BasePlugin):
"private_list": ConfigField(type=list, default=[], description="私聊名单;根据名单模式过滤"),
"ban_user_id": ConfigField(type=list, default=[], description="全局封禁的用户 ID 列表"),
"ban_qq_bot": ConfigField(type=bool, default=False, description="是否屏蔽其他 QQ 机器人消息"),
"enable_poke": ConfigField(type=bool, default=True, description="是否启用戳一戳消息处理"),
"ignore_non_self_poke": ConfigField(type=bool, default=False, description="是否忽略不是针对自己的戳一戳消息"),
"poke_debounce_seconds": ConfigField(type=float, default=2.0, description="戳一戳防抖时间(秒)"),
"enable_emoji_like": ConfigField(type=bool, default=True, description="是否启用群聊表情回复处理"),
},
}

View File

@@ -108,7 +108,7 @@ ACCEPT_FORMAT = [
]
# 插件名称
PLUGIN_NAME = "NEW_napcat_adapter"
PLUGIN_NAME = "napcat_adapter_plugin"
# QQ表情映射表
QQ_FACE = {

View File

@@ -0,0 +1,37 @@
"""Napcat 适配器事件类型定义"""
class NapcatEvent:
"""Napcat 适配器事件类型"""
class ON_RECEIVED:
"""接收事件"""
FRIEND_INPUT = "napcat.on_received.friend_input" # 好友正在输入
EMOJI_LIEK = "napcat.on_received.emoji_like" # 表情回复(注意:保持原来的拼写)
POKE = "napcat.on_received.poke" # 戳一戳
GROUP_UPLOAD = "napcat.on_received.group_upload" # 群文件上传
GROUP_BAN = "napcat.on_received.group_ban" # 群禁言
GROUP_LIFT_BAN = "napcat.on_received.group_lift_ban" # 群解禁
FRIEND_RECALL = "napcat.on_received.friend_recall" # 好友消息撤回
GROUP_RECALL = "napcat.on_received.group_recall" # 群消息撤回
class MESSAGE:
"""消息相关事件"""
GET_MSG = "napcat.message.get_msg" # 获取消息
class GROUP:
"""群组相关事件"""
SET_GROUP_BAN = "napcat.group.set_group_ban" # 设置群禁言
SET_GROUP_WHOLE_BAN = "napcat.group.set_group_whole_ban" # 设置全员禁言
SET_GROUP_KICK = "napcat.group.set_group_kick" # 踢出群聊
class FRIEND:
"""好友相关事件"""
SEND_LIKE = "napcat.friend.send_like" # 发送点赞
__all__ = ["NapcatEvent"]

View File

@@ -2,40 +2,530 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Optional
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from mofox_wire import MessageBuilder, SegPayload, UserInfoPayload
from src.common.logger import get_logger
from src.plugin_system.apis import config_api
from ...event_models import ACCEPT_FORMAT, NoticeType, QQ_FACE, PLUGIN_NAME
from ..utils import get_group_info, get_member_info, get_self_info, get_stranger_info, get_message_detail
if TYPE_CHECKING:
from ...plugin import NapcatAdapter
from ....plugin import NapcatAdapter
logger = get_logger("napcat_adapter")
class NoticeHandler:
"""处理 Napcat 通知事件(戳一戳、表情回复等)"""
"""处理 Napcat 通知事件(戳一戳、表情回复、禁言、文件上传等)"""
def __init__(self, adapter: "NapcatAdapter"):
self.adapter = adapter
self.plugin_config: Optional[Dict[str, Any]] = None
# 戳一戳防抖时间戳
self.last_poke_time: float = 0.0
def set_plugin_config(self, config: Dict[str, Any]) -> None:
"""设置插件配置"""
self.plugin_config = config
def _get_config(self, key: str, default: Any = None) -> Any:
"""获取插件配置的辅助方法"""
if not self.plugin_config:
return default
return config_api.get_plugin_config(self.plugin_config, key, default)
async def handle_notice(self, raw: Dict[str, Any]):
"""处理通知事件"""
# 简化版本:返回一个空的 MessageEnvelope
import time
import uuid
return {
"direction": "incoming",
"message_info": {
"platform": "qq",
"message_id": str(uuid.uuid4()),
"time": time.time(),
},
"message_segment": {"type": "text", "data": "[通知事件]"},
"timestamp_ms": int(time.time() * 1000),
"""
处理通知事件
Args:
raw: OneBot 原始通知数据
Returns:
MessageEnvelope (dict) or None
"""
notice_type = raw.get("notice_type")
message_time: float = time.time()
self_id = raw.get("self_id")
group_id = raw.get("group_id")
user_id = raw.get("user_id")
target_id = raw.get("target_id")
handled_segment: SegPayload | None = None
user_info: UserInfoPayload | None = None
system_notice: bool = False
notice_config: Dict[str, Any] = {
"is_notice": False,
"is_public_notice": False,
"target_id": target_id,
}
match notice_type:
case NoticeType.friend_recall:
logger.info("好友撤回一条消息")
logger.info(f"撤回消息ID{raw.get('message_id')}, 撤回时间:{raw.get('time')}")
logger.warning("暂时不支持撤回消息处理")
return None
case NoticeType.group_recall:
logger.info("群内用户撤回一条消息")
logger.info(f"撤回消息ID{raw.get('message_id')}, 撤回时间:{raw.get('time')}")
logger.warning("暂时不支持撤回消息处理")
return None
case NoticeType.notify:
sub_type = raw.get("sub_type")
match sub_type:
case NoticeType.Notify.poke:
if self._get_config("features.enable_poke", True):
logger.debug("处理戳一戳消息")
handled_segment, user_info = await self._handle_poke_notify(raw, group_id, user_id)
if handled_segment and user_info:
notice_config["notice_type"] = "poke"
notice_config["is_notice"] = True
else:
logger.warning("戳一戳消息被禁用,取消戳一戳处理")
return None
case NoticeType.Notify.input_status:
from src.plugin_system.core.event_manager import event_manager
from ...event_types import NapcatEvent
await event_manager.trigger_event(
NapcatEvent.ON_RECEIVED.FRIEND_INPUT,
permission_group=PLUGIN_NAME
)
return None
case _:
logger.warning(f"不支持的notify类型: {notice_type}.{sub_type}")
return None
case NoticeType.group_msg_emoji_like:
if self._get_config("features.enable_emoji_like", True):
logger.debug("处理群聊表情回复")
handled_segment, user_info = await self._handle_group_emoji_like_notify(
raw, group_id, user_id
)
if handled_segment and user_info:
notice_config["notice_type"] = "emoji_like"
notice_config["is_notice"] = True
else:
logger.warning("群聊表情回复被禁用,取消群聊表情回复处理")
return None
case NoticeType.group_ban:
sub_type = raw.get("sub_type")
match sub_type:
case NoticeType.GroupBan.ban:
logger.info("处理群禁言")
handled_segment, user_info = await self._handle_ban_notify(raw, group_id)
if handled_segment and user_info:
system_notice = True
user_id_in_ban = raw.get("user_id")
if user_id_in_ban == 0:
notice_config["notice_type"] = "group_whole_ban"
else:
notice_config["notice_type"] = "group_ban"
notice_config["is_notice"] = True
case NoticeType.GroupBan.lift_ban:
logger.info("处理解除群禁言")
handled_segment, user_info = await self._handle_lift_ban_notify(raw, group_id)
if handled_segment and user_info:
system_notice = True
user_id_in_ban = raw.get("user_id")
if user_id_in_ban == 0:
notice_config["notice_type"] = "group_whole_lift_ban"
else:
notice_config["notice_type"] = "group_lift_ban"
notice_config["is_notice"] = True
case _:
logger.warning(f"不支持的group_ban类型: {notice_type}.{sub_type}")
return None
case NoticeType.group_upload:
logger.info("群文件上传")
if user_id == self_id:
logger.info("检测到机器人自己上传文件,忽略此通知")
return None
handled_segment, user_info = await self._handle_group_upload_notify(
raw, group_id, user_id, self_id
)
if handled_segment and user_info:
notice_config["notice_type"] = "group_upload"
notice_config["is_notice"] = True
case _:
logger.warning(f"不支持的notice类型: {notice_type}")
return None
if not handled_segment or not user_info:
logger.warning("notice处理失败或不支持")
return None
# 使用 MessageBuilder 构建消息
msg_builder = MessageBuilder()
(
msg_builder.direction("incoming")
.message_id("notice")
.timestamp_ms(int(message_time * 1000))
.from_user(
user_id=str(user_info.get("user_id", "")),
platform="qq",
nickname=user_info.get("user_nickname", ""),
cardname=user_info.get("user_cardname", ""),
)
)
# 如果是群消息,添加群信息
if group_id:
fetched_group_info = await get_group_info(group_id)
group_name: str | None = None
if fetched_group_info:
group_name = fetched_group_info.get("group_name")
else:
logger.warning("无法获取notice消息所在群的名称")
msg_builder.from_group(
group_id=str(group_id),
platform="qq",
name=group_name or "",
)
# 设置格式信息
content_format = [handled_segment.get("type", "text")]
if "notify" not in content_format:
content_format.append("notify")
msg_builder.format_info(
content_format=content_format,
accept_format=ACCEPT_FORMAT,
)
# 设置消息段
msg_builder.seg_list([handled_segment])
# 设置 additional_config包含 notice 相关配置)
envelope = msg_builder.build()
envelope["message_info"]["additional_config"] = notice_config
return envelope
async def _handle_poke_notify(
self, raw: Dict[str, Any], group_id: Any, user_id: Any
) -> Tuple[SegPayload | None, UserInfoPayload | None]:
"""处理戳一戳通知"""
self_info: dict | None = await get_self_info()
if not self_info:
logger.error("自身信息获取失败")
return None, None
self_id = raw.get("self_id")
target_id = raw.get("target_id")
# 防抖检查:如果是针对机器人的戳一戳,检查防抖时间
if self_id == target_id:
current_time = time.time()
debounce_seconds = self._get_config("features.poke_debounce_seconds", 2.0)
if self.last_poke_time > 0:
time_diff = current_time - self.last_poke_time
if time_diff < debounce_seconds:
logger.debug(
f"戳一戳防抖:用户 {user_id} 的戳一戳被忽略(距离上次戳一戳 {time_diff:.2f} 秒)"
)
return None, None
self.last_poke_time = current_time
target_name: str | None = None
raw_info: list = raw.get("raw_info", [])
if group_id:
user_qq_info: dict | None = await get_member_info(group_id, user_id)
else:
user_qq_info: dict | None = await get_stranger_info(user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname", "QQ用户")
user_cardname = user_qq_info.get("card", "")
else:
user_name = "QQ用户"
user_cardname = ""
logger.debug("无法获取戳一戳对方的用户昵称")
# 计算显示名称
display_name = ""
if self_id == target_id:
target_name = self_info.get("nickname", "")
elif self_id == user_id:
# 不发送机器人戳别人的消息
return None, None
else:
# 如果配置为忽略不是针对自己的戳一戳则直接返回None
if self._get_config("features.ignore_non_self_poke", False):
logger.debug("忽略不是针对自己的戳一戳消息")
return None, None
if group_id:
fetched_member_info: dict | None = await get_member_info(group_id, target_id)
if fetched_member_info:
target_name = fetched_member_info.get("nickname", "QQ用户")
else:
target_name = "QQ用户"
logger.debug("无法获取被戳一戳方的用户昵称")
display_name = user_name
else:
return None, None
# 解析戳一戳文本
first_txt: str = "戳了戳"
second_txt: str = ""
try:
if len(raw_info) > 2:
first_txt = raw_info[2].get("txt", "戳了戳")
if len(raw_info) > 4:
second_txt = raw_info[4].get("txt", "")
except Exception as e:
logger.warning(f"解析戳一戳消息失败: {str(e)},将使用默认文本")
user_info: UserInfoPayload = {
"platform": "qq",
"user_id": str(user_id),
"user_nickname": user_name,
"user_cardname": user_cardname,
}
seg_data: SegPayload = {
"type": "text",
"data": f"{display_name}{first_txt}{target_name}{second_txt}这是QQ的一个功能用于提及某人但没那么明显",
}
return seg_data, user_info
async def _handle_group_emoji_like_notify(
self, raw: Dict[str, Any], group_id: Any, user_id: Any
) -> Tuple[SegPayload | None, UserInfoPayload | None]:
"""处理群聊表情回复通知"""
if not group_id:
logger.error("群ID不能为空无法处理群聊表情回复通知")
return None, None
user_qq_info: dict | None = await get_member_info(group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname", "QQ用户")
user_cardname = user_qq_info.get("card", "")
else:
user_name = "QQ用户"
user_cardname = ""
logger.debug("无法获取表情回复对方的用户昵称")
# 触发事件
from src.plugin_system.core.event_manager import event_manager
from ...event_types import NapcatEvent
target_message = await get_message_detail(raw.get("message_id", ""))
target_message_text = ""
if target_message:
target_message_text = target_message.get("raw_message", "")
else:
logger.error("未找到对应消息")
return None, None
if len(target_message_text) > 15:
target_message_text = target_message_text[:15] + "..."
user_info: UserInfoPayload = {
"platform": "qq",
"user_id": str(user_id),
"user_nickname": user_name,
"user_cardname": user_cardname,
}
likes_list = raw.get("likes", [])
like_emoji_id = ""
if likes_list and len(likes_list) > 0:
like_emoji_id = str(likes_list[0].get("emoji_id", ""))
# 触发表情回复事件
await event_manager.trigger_event(
NapcatEvent.ON_RECEIVED.EMOJI_LIEK,
permission_group=PLUGIN_NAME,
group_id=group_id,
user_id=user_id,
message_id=raw.get("message_id", ""),
emoji_id=like_emoji_id,
)
emoji_text = QQ_FACE.get(like_emoji_id, f"[表情{like_emoji_id}]")
seg_data: SegPayload = {
"type": "text",
"data": f"{user_name}使用Emoji表情{emoji_text}回应了消息[{target_message_text}]",
}
return seg_data, user_info
async def _handle_group_upload_notify(
self, raw: Dict[str, Any], group_id: Any, user_id: Any, self_id: Any
) -> Tuple[SegPayload | None, UserInfoPayload | None]:
"""处理群文件上传通知"""
if not group_id:
logger.error("群ID不能为空无法处理群文件上传通知")
return None, None
user_qq_info: dict | None = await get_member_info(group_id, user_id)
if user_qq_info:
user_name = user_qq_info.get("nickname", "QQ用户")
user_cardname = user_qq_info.get("card", "")
else:
user_name = "QQ用户"
user_cardname = ""
logger.debug("无法获取上传文件的用户昵称")
file_info = raw.get("file")
if not file_info:
logger.error("群文件上传通知中缺少文件信息")
return None, None
user_info: UserInfoPayload = {
"platform": "qq",
"user_id": str(user_id),
"user_nickname": user_name,
"user_cardname": user_cardname,
}
file_name = file_info.get("name", "未知文件")
file_size = file_info.get("size", 0)
seg_data: SegPayload = {
"type": "text",
"data": f"{user_name} 上传了文件: {file_name} (大小: {file_size} 字节)",
}
return seg_data, user_info
async def _handle_ban_notify(
self, raw: Dict[str, Any], group_id: Any
) -> Tuple[SegPayload | None, UserInfoPayload | None]:
"""处理群禁言通知"""
if not group_id:
logger.error("群ID不能为空无法处理禁言通知")
return None, None
# 获取操作者信息
operator_id = raw.get("operator_id")
operator_nickname: str = "QQ用户"
operator_cardname: str = ""
member_info: dict | None = await get_member_info(group_id, operator_id)
if member_info:
operator_nickname = member_info.get("nickname", "QQ用户")
operator_cardname = member_info.get("card", "")
else:
logger.warning("无法获取禁言执行者的昵称,消息可能会无效")
operator_info: UserInfoPayload = {
"platform": "qq",
"user_id": str(operator_id),
"user_nickname": operator_nickname,
"user_cardname": operator_cardname,
}
# 获取被禁言者信息
user_id = raw.get("user_id")
banned_user_info: Dict[str, Any] | None = None
user_nickname: str = "QQ用户"
user_cardname: str = ""
sub_type: str = ""
duration = raw.get("duration")
if duration is None:
logger.error("禁言时长不能为空,无法处理禁言通知")
return None, None
if user_id == 0: # 全体禁言
sub_type = "whole_ban"
else: # 单人禁言
sub_type = "ban"
fetched_member_info: dict | None = await get_member_info(group_id, user_id)
if fetched_member_info:
user_nickname = fetched_member_info.get("nickname", "QQ用户")
user_cardname = fetched_member_info.get("card", "")
banned_user_info = {
"platform": "qq",
"user_id": str(user_id),
"user_nickname": user_nickname,
"user_cardname": user_cardname,
}
seg_data: SegPayload = {
"type": "notify",
"data": {
"sub_type": sub_type,
"duration": duration,
"banned_user_info": banned_user_info,
},
}
return seg_data, operator_info
async def _handle_lift_ban_notify(
self, raw: Dict[str, Any], group_id: Any
) -> Tuple[SegPayload | None, UserInfoPayload | None]:
"""处理解除群禁言通知"""
if not group_id:
logger.error("群ID不能为空无法处理解除禁言通知")
return None, None
# 获取操作者信息
operator_id = raw.get("operator_id")
operator_nickname: str = "QQ用户"
operator_cardname: str = ""
member_info: dict | None = await get_member_info(group_id, operator_id)
if member_info:
operator_nickname = member_info.get("nickname", "QQ用户")
operator_cardname = member_info.get("card", "")
else:
logger.warning("无法获取解除禁言执行者的昵称,消息可能会无效")
operator_info: UserInfoPayload = {
"platform": "qq",
"user_id": str(operator_id),
"user_nickname": operator_nickname,
"user_cardname": operator_cardname,
}
# 获取被解除禁言者信息
sub_type: str = ""
user_nickname: str = "QQ用户"
user_cardname: str = ""
lifted_user_info: Dict[str, Any] | None = None
user_id = raw.get("user_id")
if user_id == 0: # 全体禁言解除
sub_type = "whole_lift_ban"
else: # 单人禁言解除
sub_type = "lift_ban"
fetched_member_info: dict | None = await get_member_info(group_id, user_id)
if fetched_member_info:
user_nickname = fetched_member_info.get("nickname", "QQ用户")
user_cardname = fetched_member_info.get("card", "")
else:
logger.warning("无法获取解除禁言消息发送者的昵称,消息可能会无效")
lifted_user_info = {
"platform": "qq",
"user_id": str(user_id),
"user_nickname": user_nickname,
"user_cardname": user_cardname,
}
seg_data: SegPayload = {
"type": "notify",
"data": {
"sub_type": sub_type,
"lifted_user_info": lifted_user_info,
}
}
return seg_data, operator_info