Files
Mofox-Core/plugins/napcat_adapter_plugin/event_handlers.py
Windpicker-owo 79eec5f66e 修复ada插件
2025-08-30 22:06:00 +08:00

1860 lines
72 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from src.plugin_system import BaseEventHandler
from src.plugin_system.base.base_event import HandlerResult
from .src.send_handler import send_handler
from .event_types import NapcatEvent
from src.common.logger import get_logger
logger = get_logger("napcat_adapter")
class SetProfileHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_qq_profile`` action (set account profile)."""

    handler_name: str = "napcat_set_qq_profile_handler"
    handler_description: str = "设置账号信息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_PROFILE]

    async def execute(self, params: dict) -> HandlerResult:
        """Send a ``set_qq_profile`` request built from ``params``.

        ``nickname`` (required), ``personal_note`` and ``sex`` are read from
        ``params["raw"]`` when that entry is truthy, otherwise from ``params``
        itself; each defaults to an empty string.

        Returns ``HandlerResult(True, True, response)`` when the reply status
        is ``"ok"`` and ``data["result"] == 0``. Every other outcome is logged
        and reported as a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        nickname = args.get("nickname", "")
        personal_note = args.get("personal_note", "")
        sex = args.get("sex", "")

        if not nickname:
            logger.error("事件 napcat_set_qq_profile 缺少必要参数: nickname ")
            return HandlerResult(False, False, {"status": "error"})

        payload = {
            "nickname": nickname,
            "personal_note": personal_note,
            "sex": sex,
        }
        response = await send_handler.send_message_to_napcat(
            action="set_qq_profile", params=payload
        )
        if response.get("status", "") != "ok":
            logger.error("事件 napcat_set_qq_profile 请求失败!")
            return HandlerResult(False, False, {"status": "error"})

        # "data" may be missing or not a mapping; normalise it so the lookups
        # below cannot raise AttributeError (the old default was a str).
        data = response.get("data")
        if not isinstance(data, dict):
            data = {}
        if data.get("result", "") == 0:
            return HandlerResult(True, True, response)

        # Resolve the value outside the f-string: reusing the enclosing quote
        # character inside an f-string is a SyntaxError before Python 3.12.
        err_msg = data.get("errMsg", "")
        logger.error(f"事件 napcat_set_qq_profile 请求失败,err={err_msg}")
        return HandlerResult(False, False, response)
class GetOnlineClientsHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_online_clients`` action."""

    handler_name: str = "napcat_get_online_clients_handler"
    handler_description: str = "获取当前账号在线客户端列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_ONLINE_CLIENTS]

    async def execute(self, params: dict) -> HandlerResult:
        """Request the online-client list of the current account.

        ``no_cache`` (default ``False``) is read from ``params["raw"]`` when
        that entry is truthy, otherwise from ``params``. The reply is wrapped
        in a success result when its status is ``"ok"``; any other status is
        logged and reported as a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        request = {"no_cache": args.get("no_cache", False)}

        reply = await send_handler.send_message_to_napcat(
            action="get_online_clients", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_online_clients 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetOnlineStatusHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_online_status`` action."""

    handler_name: str = "napcat_set_online_status_handler"
    handler_description: str = "设置在线状态"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_ONLINE_STATUS]

    async def execute(self, params: dict) -> HandlerResult:
        """Set the online status of the account.

        ``status`` (required), ``ext_status`` and ``battery_status`` (both
        default ``"0"``) are read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. A missing ``status`` or a reply
        whose status is not ``"ok"`` is logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        status = args.get("status", "")
        ext_status = args.get("ext_status", "0")
        battery_status = args.get("battery_status", "0")

        if not status:
            logger.error("事件 napcat_set_online_status 缺少必要参数: status")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "status": status,
            "ext_status": ext_status,
            "battery_status": battery_status,
        }
        reply = await send_handler.send_message_to_napcat(
            action="set_online_status", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_online_status 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetFriendsWithCategoryHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_friends_with_category`` action."""

    handler_name: str = "napcat_get_friends_with_category_handler"
    handler_description: str = "获取好友分组列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_FRIENDS_WITH_CATEGORY]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the categorised friend list.

        ``params`` is not consulted; an empty payload is always sent. The
        reply is wrapped in a success result when its status is ``"ok"``,
        otherwise the failure is logged and a failure result is returned.
        """
        reply = await send_handler.send_message_to_napcat(
            action="get_friends_with_category", params={}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_friends_with_category 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetAvatarHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_qq_avatar`` action."""

    handler_name: str = "napcat_set_qq_avatar_handler"
    handler_description: str = "设置头像"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_AVATAR]

    async def execute(self, params: dict) -> HandlerResult:
        """Set the account avatar.

        ``file`` (required) is read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. A missing ``file`` or a reply whose
        status is not ``"ok"`` is logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        avatar_file = args.get("file", "")

        if not avatar_file:
            logger.error("事件 napcat_set_qq_avatar 缺少必要参数: file")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="set_qq_avatar", params={"file": avatar_file}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_qq_avatar 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SendLikeHandler(BaseEventHandler):
    """Event handler for the NapCat ``send_like`` action."""

    handler_name: str = "napcat_send_like_handler"
    handler_description: str = "点赞"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SEND_LIKE]

    async def execute(self, params: dict) -> HandlerResult:
        """Send profile likes to a user.

        ``user_id`` (required, sent as ``str``) and ``times`` (default ``1``)
        are read from ``params["raw"]`` when that entry is truthy, otherwise
        from ``params``. A missing ``user_id`` or a reply whose status is not
        ``"ok"`` is logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")
        like_count = args.get("times", 1)

        if not target:
            logger.error("事件 napcat_send_like 缺少必要参数: user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"user_id": str(target), "times": like_count}
        reply = await send_handler.send_message_to_napcat(
            action="send_like", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_like 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetFriendAddRequestHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_friend_add_request`` action."""

    handler_name: str = "napcat_set_friend_add_request_handler"
    handler_description: str = "处理好友请求"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_FRIEND_ADD_REQUEST]

    async def execute(self, params: dict) -> HandlerResult:
        """Answer a pending friend request.

        ``flag`` (required), ``approve`` (default ``True``) and ``remark``
        (default ``""``) are read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. The call is rejected when ``flag``
        is falsy or when ``approve`` / ``remark`` is explicitly ``None``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        flag = args.get("flag", "")
        approve = args.get("approve", True)
        remark = args.get("remark", "")

        complete = flag and approve is not None and remark is not None
        if not complete:
            logger.error("事件 napcat_set_friend_add_request 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {"flag": flag, "approve": approve, "remark": remark}
        reply = await send_handler.send_message_to_napcat(
            action="set_friend_add_request", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_friend_add_request 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetSelfLongnickHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_self_longnick`` action."""

    handler_name: str = "napcat_set_self_longnick_handler"
    handler_description: str = "设置个性签名"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_SELF_LONGNICK]

    async def execute(self, params: dict) -> HandlerResult:
        """Set the account signature.

        ``longNick`` (required) is read from ``params["raw"]`` when that entry
        is truthy, otherwise from ``params``. Success requires a reply with
        status ``"ok"`` and ``data["result"] == 0``; a non-zero result is
        logged with ``data["errMsg"]`` and returned as a failure carrying the
        full reply.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        signature = args.get("longNick", "")

        if not signature:
            logger.error("事件 napcat_set_self_longnick 缺少必要参数: longNick")
            return HandlerResult(False, False, {"status": "error"})

        response = await send_handler.send_message_to_napcat(
            action="set_self_longnick", params={"longNick": signature}
        )
        if response.get("status", "") != "ok":
            logger.error("事件 napcat_set_self_longnick 请求失败!")
            return HandlerResult(False, False, {"status": "error"})

        if response.get("data", {}).get("result", "") == 0:
            return HandlerResult(True, True, response)

        logger.error(f"事件 napcat_set_self_longnick 请求失败,err={response.get('data', {}).get('errMsg', '')}")
        return HandlerResult(False, False, response)
class GetLoginInfoHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_login_info`` action."""

    handler_name: str = "napcat_get_login_info_handler"
    handler_description: str = "获取登录号信息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_LOGIN_INFO]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch information about the logged-in account.

        ``params`` is not consulted; an empty payload is always sent. The
        reply is wrapped in a success result when its status is ``"ok"``,
        otherwise the failure is logged and a failure result is returned.
        """
        reply = await send_handler.send_message_to_napcat(
            action="get_login_info", params={}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_login_info 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetRecentContactHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_recent_contact`` action."""

    handler_name: str = "napcat_get_recent_contact_handler"
    handler_description: str = "最近消息列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_RECENT_CONTACT]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the recent-contact list.

        ``count`` (default ``20``) is read from ``params["raw"]`` when that
        entry is truthy, otherwise from ``params``. A reply whose status is
        not ``"ok"`` is logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        request = {"count": args.get("count", 20)}

        reply = await send_handler.send_message_to_napcat(
            action="get_recent_contact", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_recent_contact 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetStrangerInfoHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_stranger_info`` action."""

    handler_name: str = "napcat_get_stranger_info_handler"
    handler_description: str = "获取(指定)账号信息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_STRANGER_INFO]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch information about a given account.

        ``user_id`` (required, sent as ``str``) is read from ``params["raw"]``
        when that entry is truthy, otherwise from ``params``. A missing
        ``user_id`` or a reply whose status is not ``"ok"`` is logged and
        yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")

        if not target:
            logger.error("事件 napcat_get_stranger_info 缺少必要参数: user_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="get_stranger_info", params={"user_id": str(target)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_stranger_info 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetFriendListHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_friend_list`` action."""

    handler_name: str = "napcat_get_friend_list_handler"
    handler_description: str = "获取好友列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_FRIEND_LIST]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the friend list.

        ``no_cache`` (default ``False``) is read from ``params["raw"]`` when
        that entry is truthy, otherwise from ``params``. A reply whose status
        is not ``"ok"`` is logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        request = {"no_cache": args.get("no_cache", False)}

        reply = await send_handler.send_message_to_napcat(
            action="get_friend_list", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_friend_list 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetProfileLikeHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_profile_like`` action."""

    handler_name: str = "napcat_get_profile_like_handler"
    handler_description: str = "获取点赞列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_PROFILE_LIKE]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the profile-like list.

        ``start`` (default ``0``), ``count`` (default ``10``) and the optional
        ``user_id`` are read from ``params["raw"]`` when that entry is truthy,
        otherwise from ``params``. ``user_id`` is only added to the payload
        (as ``str``) when it is truthy.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")

        request = {
            "start": args.get("start", 0),
            "count": args.get("count", 10),
        }
        if target:
            request["user_id"] = str(target)

        reply = await send_handler.send_message_to_napcat(
            action="get_profile_like", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_profile_like 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class DeleteFriendHandler(BaseEventHandler):
    """Event handler for the NapCat ``delete_friend`` action."""

    handler_name: str = "napcat_delete_friend_handler"
    handler_description: str = "删除好友"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.DELETE_FRIEND]

    async def execute(self, params: dict) -> HandlerResult:
        """Delete a friend.

        ``user_id`` (required, sent as ``str``), ``temp_block`` and
        ``temp_both_del`` (both default ``False``) are read from
        ``params["raw"]`` when that entry is truthy, otherwise from ``params``.
        The call is rejected when ``user_id`` is falsy or either flag is
        explicitly ``None``. Success requires status ``"ok"`` and
        ``data["result"] == 0``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")
        temp_block = args.get("temp_block", False)
        temp_both_del = args.get("temp_both_del", False)

        complete = target and temp_block is not None and temp_both_del is not None
        if not complete:
            logger.error("事件 napcat_delete_friend 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "user_id": str(target),
            "temp_block": temp_block,
            "temp_both_del": temp_both_del,
        }
        response = await send_handler.send_message_to_napcat(
            action="delete_friend", params=request
        )
        if response.get("status", "") != "ok":
            logger.error("事件 napcat_delete_friend 请求失败!")
            return HandlerResult(False, False, {"status": "error"})

        if response.get("data", {}).get("result", "") == 0:
            return HandlerResult(True, True, response)

        logger.error(f"事件 napcat_delete_friend 请求失败,err={response.get('data', {}).get('errMsg', '')}")
        return HandlerResult(False, False, response)
class GetUserStatusHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_user_status`` action."""

    handler_name: str = "napcat_get_user_status_handler"
    handler_description: str = "获取(指定)用户状态"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_USER_STATUS]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the status of a given user.

        ``user_id`` (required, sent as ``str``) is read from ``params["raw"]``
        when that entry is truthy, otherwise from ``params``. A missing
        ``user_id`` or a reply whose status is not ``"ok"`` is logged and
        yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")

        if not target:
            logger.error("事件 napcat_get_user_status 缺少必要参数: user_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="get_user_status", params={"user_id": str(target)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_user_status 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetStatusHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_status`` action."""

    handler_name: str = "napcat_get_status_handler"
    handler_description: str = "获取状态"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_STATUS]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the current status.

        ``params`` is not consulted; an empty payload is always sent. The
        reply is wrapped in a success result when its status is ``"ok"``,
        otherwise the failure is logged and a failure result is returned.
        """
        reply = await send_handler.send_message_to_napcat(
            action="get_status", params={}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_status 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetMiniAppArkHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_mini_app_ark`` action."""

    handler_name: str = "napcat_get_mini_app_ark_handler"
    handler_description: str = "获取小程序卡片"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.GET_MINI_APP_ARK]

    async def execute(self, params: dict) -> HandlerResult:
        """Request a mini-app card.

        ``type``, ``title``, ``desc``, ``picUrl`` and ``jumpUrl`` are
        required; ``webUrl`` (default ``""``) and ``rawArkData`` (default
        ``False``) are optional. All are read from ``params["raw"]`` when that
        entry is truthy, otherwise from ``params``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        # Local is named app_type so the builtin ``type`` is not shadowed.
        app_type = args.get("type", "")
        title = args.get("title", "")
        desc = args.get("desc", "")
        pic_url = args.get("picUrl", "")
        jump_url = args.get("jumpUrl", "")
        web_url = args.get("webUrl", "")
        raw_ark_data = args.get("rawArkData", False)

        if not all((app_type, title, desc, pic_url, jump_url)):
            logger.error("事件 napcat_get_mini_app_ark 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "type": app_type,
            "title": title,
            "desc": desc,
            "picUrl": pic_url,
            "jumpUrl": jump_url,
            "webUrl": web_url,
            "rawArkData": raw_ark_data,
        }
        reply = await send_handler.send_message_to_napcat(
            action="get_mini_app_ark", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_mini_app_ark 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetDiyOnlineStatusHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_diy_online_status`` action."""

    handler_name: str = "napcat_set_diy_online_status_handler"
    handler_description: str = "设置自定义在线状态"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.ACCOUNT.SET_DIY_ONLINE_STATUS]

    async def execute(self, params: dict) -> HandlerResult:
        """Set a custom online status.

        ``face_id`` (required), ``face_type`` (default ``"0"``) and
        ``wording`` (default ``""``) are read from ``params["raw"]`` when that
        entry is truthy, otherwise from ``params``. ``face_id`` and
        ``face_type`` are sent as ``str``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        face_id = args.get("face_id", "")
        face_type = args.get("face_type", "0")
        wording = args.get("wording", "")

        if not face_id:
            logger.error("事件 napcat_set_diy_online_status 缺少必要参数: face_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "face_id": str(face_id),
            "face_type": str(face_type),
            "wording": wording,
        }
        reply = await send_handler.send_message_to_napcat(
            action="set_diy_online_status", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_diy_online_status 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
# ===MESSAGE===
class SendPrivateMsgHandler(BaseEventHandler):
    """Event handler for the NapCat ``send_private_msg`` action."""

    handler_name: str = "napcat_send_private_msg_handler"
    handler_description: str = "发送私聊消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.SEND_PRIVATE_MSG]

    async def execute(self, params: dict) -> HandlerResult:
        """Send a private message.

        ``user_id`` (sent as ``str``) and ``message`` are both required and
        are read from ``params["raw"]`` when that entry is truthy, otherwise
        from ``params``. Missing arguments or a reply whose status is not
        ``"ok"`` are logged and yield a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        recipient = args.get("user_id", "")
        message = args.get("message", "")

        if not (recipient and message):
            logger.error("事件 napcat_send_private_msg 缺少必要参数: user_id 或 message")
            return HandlerResult(False, False, {"status": "error"})

        request = {"user_id": str(recipient), "message": message}
        reply = await send_handler.send_message_to_napcat(
            action="send_private_msg", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_private_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SendPokeHandler(BaseEventHandler):
    """Event handler for the NapCat ``send_poke`` action."""

    handler_name: str = "napcat_send_poke_handler"
    handler_description: str = "发送戳一戳"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.SEND_POKE]

    async def execute(self, params: dict) -> HandlerResult:
        """Send a poke to a user.

        ``user_id`` (required) and ``group_id`` (optional, default ``None``)
        are read from ``params["raw"]`` when that entry is truthy, otherwise
        from ``params``. ``group_id`` is only included in the payload when it
        is not ``None``; both ids are sent as ``str``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        target = args.get("user_id", "")
        group = args.get("group_id", None)

        if not target:
            logger.error("事件 napcat_send_poke 缺少必要参数: user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"user_id": str(target)}
        if group is not None:
            request["group_id"] = str(group)

        reply = await send_handler.send_message_to_napcat(
            action="send_poke", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_poke 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class DeleteMsgHandler(BaseEventHandler):
    """Event handler for the NapCat ``delete_msg`` action (recall a message)."""

    handler_name: str = "napcat_delete_msg_handler"
    handler_description: str = "撤回消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.DELETE_MSG]

    async def execute(self, params: dict) -> HandlerResult:
        """Recall a message.

        ``message_id`` (required, sent as ``str``) is read from
        ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. A missing id or a reply whose status is not ``"ok"`` is
        logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        msg_id = args.get("message_id", "")

        if not msg_id:
            logger.error("事件 napcat_delete_msg 缺少必要参数: message_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="delete_msg", params={"message_id": str(msg_id)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_delete_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupMsgHistoryHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_group_msg_history`` action."""

    handler_name: str = "napcat_get_group_msg_history_handler"
    handler_description: str = "获取群历史消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.GET_GROUP_MSG_HISTORY]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch message history of a group.

        ``group_id`` (required), ``message_seq`` (default ``0``), ``count``
        (default ``20``) and ``reverseOrder`` (default ``False``) are read
        from ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. Values are coerced with ``str``/``int``/``bool`` before
        sending, so a non-numeric ``message_seq`` or ``count`` raises from
        ``int()``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        group = args.get("group_id", "")
        seq = args.get("message_seq", 0)
        limit = args.get("count", 20)
        reverse = args.get("reverseOrder", False)

        if not group:
            logger.error("事件 napcat_get_group_msg_history 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group),
            "message_seq": int(seq),
            "count": int(limit),
            "reverseOrder": bool(reverse),
        }
        reply = await send_handler.send_message_to_napcat(
            action="get_group_msg_history", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_msg_history 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetMsgHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_msg`` action."""

    handler_name: str = "napcat_get_msg_handler"
    handler_description: str = "获取消息详情"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.GET_MSG]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch the details of a message.

        ``message_id`` (required, sent as ``str``) is read from
        ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. A missing id or a reply whose status is not ``"ok"`` is
        logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        msg_id = args.get("message_id", "")

        if not msg_id:
            logger.error("事件 napcat_get_msg 缺少必要参数: message_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="get_msg", params={"message_id": str(msg_id)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetForwardMsgHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_forward_msg`` action."""

    handler_name: str = "napcat_get_forward_msg_handler"
    handler_description: str = "获取合并转发消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.GET_FORWARD_MSG]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch a merged-forward message.

        ``message_id`` (required, sent as ``str``) is read from
        ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. A missing id or a reply whose status is not ``"ok"`` is
        logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        msg_id = args.get("message_id", "")

        if not msg_id:
            logger.error("事件 napcat_get_forward_msg 缺少必要参数: message_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="get_forward_msg", params={"message_id": str(msg_id)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_forward_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetMsgEmojiLikeHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_msg_emoji_like`` action."""

    handler_name: str = "napcat_set_msg_emoji_like_handler"
    handler_description: str = "贴表情"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.SET_MSG_EMOJI_LIKE]

    async def execute(self, params: dict) -> HandlerResult:
        """Add or remove an emoji reaction on a message.

        ``message_id`` (required), ``emoji_id`` (default ``0``) and ``set``
        (default ``True``) are read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. The call is rejected when
        ``message_id`` is falsy or when ``emoji_id`` / ``set`` is explicitly
        ``None``. Values are coerced with ``str``/``int``/``bool`` before
        sending.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        msg_id = args.get("message_id", "")
        emoji = args.get("emoji_id", 0)
        enable = args.get("set", True)

        complete = msg_id and emoji is not None and enable is not None
        if not complete:
            logger.error("事件 napcat_set_msg_emoji_like 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "message_id": str(msg_id),
            "emoji_id": int(emoji),
            "set": bool(enable),
        }
        reply = await send_handler.send_message_to_napcat(
            action="set_msg_emoji_like", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_msg_emoji_like 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetFriendMsgHistoryHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_friend_msg_history`` action."""

    handler_name: str = "napcat_get_friend_msg_history_handler"
    handler_description: str = "获取好友历史消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.GET_FRIEND_MSG_HISTORY]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch message history with a friend.

        ``user_id`` (required), ``message_seq`` (default ``0``), ``count``
        (default ``20``) and ``reverseOrder`` (default ``False``) are read
        from ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. Values are coerced with ``str``/``int``/``bool`` before
        sending, so a non-numeric ``message_seq`` or ``count`` raises from
        ``int()``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        friend = args.get("user_id", "")
        seq = args.get("message_seq", 0)
        limit = args.get("count", 20)
        reverse = args.get("reverseOrder", False)

        if not friend:
            logger.error("事件 napcat_get_friend_msg_history 缺少必要参数: user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "user_id": str(friend),
            "message_seq": int(seq),
            "count": int(limit),
            "reverseOrder": bool(reverse),
        }
        reply = await send_handler.send_message_to_napcat(
            action="get_friend_msg_history", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_friend_msg_history 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class FetchEmojiLikeHandler(BaseEventHandler):
    """Event handler for the NapCat ``fetch_emoji_like`` action."""

    handler_name: str = "napcat_fetch_emoji_like_handler"
    handler_description: str = "获取贴表情详情"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.FETCH_EMOJI_LIKE]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch emoji-reaction details of a message.

        ``message_id``, ``emoji_id`` and ``emoji_type`` are required;
        ``count`` defaults to ``20``. All are read from ``params["raw"]`` when
        that entry is truthy, otherwise from ``params``. The payload uses the
        keys ``emojiId`` / ``emojiType`` and coerces ``count`` with ``int``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        msg_id = args.get("message_id", "")
        emoji = args.get("emoji_id", "")
        kind = args.get("emoji_type", "")
        limit = args.get("count", 20)

        if not all((msg_id, emoji, kind)):
            logger.error("事件 napcat_fetch_emoji_like 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "message_id": str(msg_id),
            "emojiId": str(emoji),
            "emojiType": str(kind),
            "count": int(limit),
        }
        reply = await send_handler.send_message_to_napcat(
            action="fetch_emoji_like", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_fetch_emoji_like 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SendForwardMsgHandler(BaseEventHandler):
    """Event handler for the NapCat ``send_forward_msg`` action."""

    handler_name: str = "napcat_send_forward_msg_handler"
    handler_description: str = "发送合并转发消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.SEND_FORWARD_MSG]

    async def execute(self, params: dict) -> HandlerResult:
        """Send a merged-forward message.

        ``messages``, ``news``, ``prompt``, ``summary`` and ``source`` are all
        required; ``group_id`` and ``user_id`` are optional (default
        ``None``). All are read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. Each optional id is added to the
        payload (as ``str``) only when it is not ``None``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        messages = args.get("messages", {})
        news = args.get("news", {})
        prompt = args.get("prompt", "")
        summary = args.get("summary", "")
        source = args.get("source", "")
        group = args.get("group_id", None)
        user = args.get("user_id", None)

        if not all((messages, news, prompt, summary, source)):
            logger.error("事件 napcat_send_forward_msg 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "messages": messages,
            "news": news,
            "prompt": prompt,
            "summary": summary,
            "source": source,
        }
        if group is not None:
            request["group_id"] = str(group)
        if user is not None:
            request["user_id"] = str(user)

        reply = await send_handler.send_message_to_napcat(
            action="send_forward_msg", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_forward_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SendGroupAiRecordHandler(BaseEventHandler):
    """Event handler for the NapCat ``send_group_ai_record`` action."""

    handler_name: str = "napcat_send_group_ai_record_handler"
    handler_description: str = "发送群AI语音"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.MESSAGE.SEND_GROUP_AI_RECORD]

    async def execute(self, params: dict) -> HandlerResult:
        """Send an AI voice record to a group.

        ``group_id`` (sent as ``str``), ``character`` and ``text`` are all
        required and are read from ``params["raw"]`` when that entry is
        truthy, otherwise from ``params``. Missing arguments or a reply whose
        status is not ``"ok"`` are logged and yield a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        group = args.get("group_id", "")
        character = args.get("character", "")
        text = args.get("text", "")

        if not all((group, character, text)):
            logger.error("事件 napcat_send_group_ai_record 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group),
            "character": character,
            "text": text,
        }
        reply = await send_handler.send_message_to_napcat(
            action="send_group_ai_record", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_group_ai_record 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
# ===GROUP===
class GetGroupInfoHandler(BaseEventHandler):
    """Event handler for the NapCat ``get_group_info`` action."""

    handler_name: str = "napcat_get_group_info_handler"
    handler_description: str = "获取群信息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO]

    async def execute(self, params: dict) -> HandlerResult:
        """Fetch information about a group.

        ``group_id`` (required, sent as ``str``) is read from
        ``params["raw"]`` when that entry is truthy, otherwise from
        ``params``. A missing id or a reply whose status is not ``"ok"`` is
        logged and yields a failure result.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        group = args.get("group_id", "")

        if not group:
            logger.error("事件 napcat_get_group_info 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        reply = await send_handler.send_message_to_napcat(
            action="get_group_info", params={"group_id": str(group)}
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_info 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupAddOptionHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_group_add_option`` action."""

    handler_name: str = "napcat_set_group_add_option_handler"
    handler_description: str = "设置群添加选项"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_OPTION]

    async def execute(self, params: dict) -> HandlerResult:
        """Set the join option of a group.

        ``group_id`` and ``add_type`` are required (both sent as ``str``);
        ``group_question`` and ``group_answer`` are optional and only added to
        the payload when truthy. All are read from ``params["raw"]`` when that
        entry is truthy, otherwise from ``params``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        group = args.get("group_id", "")
        add_type = args.get("add_type", "")
        question = args.get("group_question", "")
        answer = args.get("group_answer", "")

        if not (group and add_type):
            logger.error("事件 napcat_set_group_add_option 缺少必要参数: group_id 或 add_type")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group), "add_type": str(add_type)}
        if question:
            request["group_question"] = question
        if answer:
            request["group_answer"] = answer

        reply = await send_handler.send_message_to_napcat(
            action="set_group_add_option", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_add_option 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupKickMembersHandler(BaseEventHandler):
    """Event handler for the NapCat ``set_group_kick_members`` action."""

    handler_name: str = "napcat_set_group_kick_members_handler"
    handler_description: str = "批量踢出群成员"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK_MEMBERS]

    async def execute(self, params: dict) -> HandlerResult:
        """Kick several members from a group in one request.

        ``group_id`` (sent as ``str``) and ``user_id`` (default ``[]``,
        forwarded unchanged) are required; ``reject_add_request`` defaults to
        ``False`` and is coerced with ``bool``. All are read from
        ``params["raw"]`` when that entry is truthy, otherwise from ``params``.
        """
        # A truthy "raw" entry replaces the top-level params as argument source.
        args = params.get("raw") or params
        group = args.get("group_id", "")
        members = args.get("user_id", [])
        reject = args.get("reject_add_request", False)

        if not (group and members):
            logger.error("事件 napcat_set_group_kick_members 缺少必要参数: group_id 或 user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group),
            "user_id": members,
            "reject_add_request": bool(reject),
        }
        reply = await send_handler.send_message_to_napcat(
            action="set_group_kick_members", params=request
        )
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_kick_members 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupRemarkHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_REMARK`` and sends ``set_group_remark`` to NapCat."""

    handler_name: str = "napcat_set_group_remark_handler"
    handler_description: str = "设置群备注"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_REMARK]

    async def execute(self, params: dict):
        """Build and send the ``set_group_remark`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``remark`` (both required).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        remark = args.get("remark", "")

        if not (group_id and remark):
            logger.error("事件 napcat_set_group_remark 缺少必要参数: group_id 或 remark")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "remark": remark}
        reply = await send_handler.send_message_to_napcat(action="set_group_remark", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_remark 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupKickHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_KICK`` and sends ``set_group_kick`` to NapCat."""

    handler_name: str = "napcat_set_group_kick_handler"
    handler_description: str = "群踢人"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK]

    async def execute(self, params: dict):
        """Build and send the ``set_group_kick`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                an optional ``reject_add_request`` flag (defaults to
                ``False``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        member_id = args.get("user_id", "")
        reject = args.get("reject_add_request", False)

        if not (group_id and member_id):
            logger.error("事件 napcat_set_group_kick 缺少必要参数: group_id 或 user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group_id),
            "user_id": str(member_id),
            "reject_add_request": bool(reject),
        }
        reply = await send_handler.send_message_to_napcat(action="set_group_kick", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_kick 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupSystemMsgHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_SYSTEM_MSG`` and sends ``get_group_system_msg`` to NapCat."""

    handler_name: str = "napcat_get_group_system_msg_handler"
    handler_description: str = "获取群系统消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SYSTEM_MSG]

    async def execute(self, params: dict):
        """Build and send the ``get_group_system_msg`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with an optional ``count`` (defaults to ``20``);
                an explicit ``None`` is rejected, anything else is passed
                through ``int()``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        limit = (raw_args or params).get("count", 20)

        if limit is None:
            logger.error("事件 napcat_get_group_system_msg 缺少必要参数: count")
            return HandlerResult(False, False, {"status": "error"})

        request = {"count": int(limit)}
        reply = await send_handler.send_message_to_napcat(action="get_group_system_msg", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_system_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupBanHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_BAN`` and sends ``set_group_ban`` to NapCat."""

    handler_name: str = "napcat_set_group_ban_handler"
    handler_description: str = "群禁言"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_BAN]

    async def execute(self, params: dict):
        """Build and send the ``set_group_ban`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                ``duration`` (defaults to ``0``; an explicit ``None`` is
                rejected, anything else is passed through ``int()``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        member_id = args.get("user_id", "")
        duration = args.get("duration", 0)

        if not (group_id and member_id) or duration is None:
            logger.error("事件 napcat_set_group_ban 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group_id),
            "user_id": str(member_id),
            "duration": int(duration),
        }
        reply = await send_handler.send_message_to_napcat(action="set_group_ban", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_ban 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetEssenceMsgListHandler(BaseEventHandler):
    """Subscribes to ``GET_ESSENCE_MSG_LIST`` and sends ``get_essence_msg_list`` to NapCat."""

    handler_name: str = "napcat_get_essence_msg_list_handler"
    handler_description: str = "获取群精华消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_ESSENCE_MSG_LIST]

    async def execute(self, params: dict):
        """Build and send the ``get_essence_msg_list`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_get_essence_msg_list 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="get_essence_msg_list", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_essence_msg_list 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupWholeBanHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_WHOLE_BAN`` and sends ``set_group_whole_ban`` to NapCat."""

    handler_name: str = "napcat_set_group_whole_ban_handler"
    handler_description: str = "全体禁言"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_WHOLE_BAN]

    async def execute(self, params: dict):
        """Build and send the ``set_group_whole_ban`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id`` and an ``enable``
                flag (defaults to ``True``; an explicit ``None`` is rejected,
                anything else is passed through ``bool()``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        enabled = args.get("enable", True)

        if not group_id or enabled is None:
            logger.error("事件 napcat_set_group_whole_ban 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "enable": bool(enabled)}
        reply = await send_handler.send_message_to_napcat(action="set_group_whole_ban", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_whole_ban 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupPortraitHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_PORTRAINT`` and sends ``set_group_portrait`` to NapCat."""

    handler_name: str = "napcat_set_group_portrait_handler"
    handler_description: str = "设置群头像"
    weight: int = 100
    intercept_message: bool = False
    # NOTE: "PORTRAINT" is the spelling of the enum member as referenced here.
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_PORTRAINT]

    async def execute(self, params: dict):
        """Build and send the ``set_group_portrait`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``file`` (both required;
                ``file`` is forwarded unchanged).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        image_file = args.get("file", "")

        if not (group_id and image_file):
            logger.error("事件 napcat_set_group_portrait 缺少必要参数: group_id 或 file")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "file": image_file}
        reply = await send_handler.send_message_to_napcat(action="set_group_portrait", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_portrait 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupAdminHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_ADMIN`` and sends ``set_group_admin`` to NapCat."""

    handler_name: str = "napcat_set_group_admin_handler"
    handler_description: str = "设置群管理"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADMIN]

    async def execute(self, params: dict):
        """Build and send the ``set_group_admin`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                an ``enable`` flag (defaults to ``True``; an explicit
                ``None`` is rejected, anything else is passed through
                ``bool()``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        member_id = args.get("user_id", "")
        enabled = args.get("enable", True)

        if not (group_id and member_id) or enabled is None:
            logger.error("事件 napcat_set_group_admin 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group_id),
            "user_id": str(member_id),
            "enable": bool(enabled),
        }
        reply = await send_handler.send_message_to_napcat(action="set_group_admin", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_admin 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupCardHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_CARD`` and sends ``set_group_card`` to NapCat."""

    handler_name: str = "napcat_set_group_card_handler"
    handler_description: str = "设置群成员名片"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_CARD]

    async def execute(self, params: dict):
        """Build and send the ``set_group_card`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                ``card`` (sent only when truthy).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw or params
        group_id = args.get("group_id", "")
        user_id = args.get("user_id", "")
        card = args.get("card", "")

        if not group_id or not user_id:
            logger.error("事件 napcat_set_group_card 缺少必要参数: group_id 或 user_id")
            return HandlerResult(False, False, {"status": "error"})

        payload = {
            "group_id": str(group_id),
            "user_id": str(user_id),
        }
        if card:
            payload["card"] = card

        # The OneBot v11 action is "set_group_card"; the previous "group_card"
        # is not a defined action name.
        response = await send_handler.send_message_to_napcat(action="set_group_card", params=payload)
        if response.get("status", "") == "ok":
            return HandlerResult(True, True, response)
        logger.error("事件 napcat_set_group_card 请求失败!")
        return HandlerResult(False, False, {"status": "error"})
class SetEssenceMsgHandler(BaseEventHandler):
    """Subscribes to ``SET_ESSENCE_MSG`` and sends ``set_essence_msg`` to NapCat."""

    handler_name: str = "napcat_set_essence_msg_handler"
    handler_description: str = "设置群精华消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_ESSENCE_MSG]

    async def execute(self, params: dict):
        """Build and send the ``set_essence_msg`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``message_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        message_id = (raw_args or params).get("message_id", "")

        if not message_id:
            logger.error("事件 napcat_set_essence_msg 缺少必要参数: message_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"message_id": str(message_id)}
        reply = await send_handler.send_message_to_napcat(action="set_essence_msg", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_essence_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupNameHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_NAME`` and sends ``set_group_name`` to NapCat."""

    handler_name: str = "napcat_set_group_name_handler"
    handler_description: str = "设置群名"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_NAME]

    async def execute(self, params: dict):
        """Build and send the ``set_group_name`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``group_name`` (both
                required).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        new_name = args.get("group_name", "")

        if not (group_id and new_name):
            logger.error("事件 napcat_set_group_name 缺少必要参数: group_id 或 group_name")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "group_name": new_name}
        reply = await send_handler.send_message_to_napcat(action="set_group_name", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_name 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class DeleteEssenceMsgHandler(BaseEventHandler):
    """Subscribes to ``DELETE_ESSENCE_MSG`` and sends ``delete_essence_msg`` to NapCat."""

    handler_name: str = "napcat_delete_essence_msg_handler"
    handler_description: str = "删除群精华消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.DELETE_ESSENCE_MSG]

    async def execute(self, params: dict):
        """Build and send the ``delete_essence_msg`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``message_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        message_id = (raw_args or params).get("message_id", "")

        if not message_id:
            logger.error("事件 napcat_delete_essence_msg 缺少必要参数: message_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"message_id": str(message_id)}
        reply = await send_handler.send_message_to_napcat(action="delete_essence_msg", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_delete_essence_msg 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupLeaveHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_LEAVE`` and sends ``set_group_leave`` to NapCat."""

    handler_name: str = "napcat_set_group_leave_handler"
    handler_description: str = "退群"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_LEAVE]

    async def execute(self, params: dict):
        """Build and send the ``set_group_leave`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_set_group_leave 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="set_group_leave", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_leave 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SendGroupNoticeHandler(BaseEventHandler):
    """Subscribes to ``SEND_GROUP_NOTICE`` and sends ``_send_group_notice`` to NapCat."""

    handler_name: str = "napcat_send_group_notice_handler"
    handler_description: str = "发送群公告"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SEND_GROUP_NOTICE]

    async def execute(self, params: dict):
        """Build and send the ``_send_group_notice`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``content`` (required) and
                ``image`` (sent only when truthy).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        text = args.get("content", "")
        picture = args.get("image", "")

        if not (group_id and text):
            logger.error("事件 napcat_send_group_notice 缺少必要参数: group_id 或 content")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "content": text}
        if picture:
            request["image"] = picture

        reply = await send_handler.send_message_to_napcat(action="_send_group_notice", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_send_group_notice 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupSpecialTitleHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_SPECIAL_TITLE`` and sends ``set_group_special_title`` to NapCat."""

    handler_name: str = "napcat_set_group_special_title_handler"
    handler_description: str = "设置群头衔"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SPECIAL_TITLE]

    async def execute(self, params: dict):
        """Build and send the ``set_group_special_title`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                ``special_title`` (sent only when truthy).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        member_id = args.get("user_id", "")
        title = args.get("special_title", "")

        if not (group_id and member_id):
            logger.error("事件 napcat_set_group_special_title 缺少必要参数: group_id 或 user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "user_id": str(member_id)}
        if title:
            request["special_title"] = title

        reply = await send_handler.send_message_to_napcat(action="set_group_special_title", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_special_title 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupNoticeHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_NOTICE`` and sends ``_get_group_notice`` to NapCat."""

    handler_name: str = "napcat_get_group_notice_handler"
    handler_description: str = "获取群公告"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_NOTICE]

    async def execute(self, params: dict):
        """Build and send the ``_get_group_notice`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_get_group_notice 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="_get_group_notice", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_notice 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupAddRequestHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_ADD_REQUEST`` and sends ``set_group_add_request`` to NapCat."""

    handler_name: str = "napcat_set_group_add_request_handler"
    handler_description: str = "处理加群请求"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_REQUEST]

    async def execute(self, params: dict):
        """Build and send the ``set_group_add_request`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``flag`` (forwarded unchanged),
                an ``approve`` flag (defaults to ``True``; an explicit
                ``None`` is rejected) and ``reason`` (sent only when truthy).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        flag = args.get("flag", "")
        approved = args.get("approve", True)
        reason = args.get("reason", "")

        if not flag or approved is None:
            logger.error("事件 napcat_set_group_add_request 缺少必要参数")
            return HandlerResult(False, False, {"status": "error"})

        request = {"flag": flag, "approve": bool(approved)}
        if reason:
            request["reason"] = reason

        reply = await send_handler.send_message_to_napcat(action="set_group_add_request", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_add_request 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupListHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_LIST`` and sends ``get_group_list`` to NapCat."""

    handler_name: str = "napcat_get_group_list_handler"
    handler_description: str = "获取群列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_LIST]

    async def execute(self, params: dict):
        """Build and send the ``get_group_list`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with an optional ``no_cache`` flag (defaults to
                ``False``, passed through ``bool()``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        skip_cache = (raw_args or params).get("no_cache", False)

        request = {"no_cache": bool(skip_cache)}
        reply = await send_handler.send_message_to_napcat(action="get_group_list", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_list 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class DeleteGroupNoticeHandler(BaseEventHandler):
    """Subscribes to ``DELETE_GROUP_NOTICE`` and sends ``_del_group_notice`` to NapCat."""

    handler_name: str = "napcat_del_group_notice_handler"
    handler_description: str = "删除群公告"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.DELETE_GROUP_NOTICE]

    async def execute(self, params: dict):
        """Build and send the ``_del_group_notice`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``notice_id`` (both
                required; ``notice_id`` is forwarded unchanged).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        notice_id = args.get("notice_id", "")

        if not (group_id and notice_id):
            logger.error("事件 napcat_del_group_notice 缺少必要参数: group_id 或 notice_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "notice_id": notice_id}
        reply = await send_handler.send_message_to_napcat(action="_del_group_notice", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_del_group_notice 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupMemberInfoHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_MEMBER_INFO`` and sends ``get_group_member_info`` to NapCat."""

    handler_name: str = "napcat_get_group_member_info_handler"
    handler_description: str = "获取群成员信息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_INFO]

    async def execute(self, params: dict):
        """Build and send the ``get_group_member_info`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with ``group_id`` and ``user_id`` (required) and
                an optional ``no_cache`` flag (defaults to ``False``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        member_id = args.get("user_id", "")
        skip_cache = args.get("no_cache", False)

        if not (group_id and member_id):
            logger.error("事件 napcat_get_group_member_info 缺少必要参数: group_id 或 user_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {
            "group_id": str(group_id),
            "user_id": str(member_id),
            "no_cache": bool(skip_cache),
        }
        reply = await send_handler.send_message_to_napcat(action="get_group_member_info", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_member_info 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupMemberListHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_MEMBER_LIST`` and sends ``get_group_member_list`` to NapCat."""

    handler_name: str = "napcat_get_group_member_list_handler"
    handler_description: str = "获取群成员列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_LIST]

    async def execute(self, params: dict):
        """Build and send the ``get_group_member_list`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id`` and an optional
                ``no_cache`` flag (defaults to ``False``).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        skip_cache = args.get("no_cache", False)

        if not group_id:
            logger.error("事件 napcat_get_group_member_list 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id), "no_cache": bool(skip_cache)}
        reply = await send_handler.send_message_to_napcat(action="get_group_member_list", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_member_list 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupHonorInfoHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_HONOR_INFO`` and sends ``get_group_honor_info`` to NapCat."""

    handler_name: str = "napcat_get_group_honor_info_handler"
    handler_description: str = "获取群荣誉"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_HONOR_INFO]

    async def execute(self, params: dict):
        """Build and send the ``get_group_honor_info`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id`` and ``type`` (sent
                only when truthy).

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        args = raw_args or params
        group_id = args.get("group_id", "")
        # Named honor_type locally so the builtin ``type`` is not shadowed.
        honor_type = args.get("type", "")

        if not group_id:
            logger.error("事件 napcat_get_group_honor_info 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        if honor_type:
            request["type"] = honor_type

        reply = await send_handler.send_message_to_napcat(action="get_group_honor_info", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_honor_info 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupInfoExHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_INFO_EX`` and sends ``get_group_info_ex`` to NapCat."""

    handler_name: str = "napcat_get_group_info_ex_handler"
    handler_description: str = "获取群信息ex"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO_EX]

    async def execute(self, params: dict):
        """Build and send the ``get_group_info_ex`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_get_group_info_ex 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="get_group_info_ex", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_info_ex 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupAtAllRemainHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_AT_ALL_REMAIN`` and sends ``get_group_at_all_remain`` to NapCat."""

    handler_name: str = "napcat_get_group_at_all_remain_handler"
    handler_description: str = "获取群 @全体成员 剩余次数"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_AT_ALL_REMAIN]

    async def execute(self, params: dict):
        """Build and send the ``get_group_at_all_remain`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_get_group_at_all_remain 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="get_group_at_all_remain", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_at_all_remain 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupShutListHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_SHUT_LIST`` and sends ``get_group_shut_list`` to NapCat."""

    handler_name: str = "napcat_get_group_shut_list_handler"
    handler_description: str = "获取群禁言列表"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SHUT_LIST]

    async def execute(self, params: dict):
        """Build and send the ``get_group_shut_list`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_get_group_shut_list 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="get_group_shut_list", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_shut_list 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class GetGroupIgnoredNotifiesHandler(BaseEventHandler):
    """Subscribes to ``GET_GROUP_IGNORED_NOTIFIES`` and sends ``get_group_ignored_notifies`` to NapCat."""

    handler_name: str = "napcat_get_group_ignored_notifies_handler"
    handler_description: str = "获取群过滤系统消息"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.GET_GROUP_IGNORED_NOTIFIES]

    async def execute(self, params: dict):
        """Send the ``get_group_ignored_notifies`` request with an empty payload.

        Args:
            params: Not read by this handler.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        reply = await send_handler.send_message_to_napcat(action="get_group_ignored_notifies", params={})
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_get_group_ignored_notifies 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)
class SetGroupSignHandler(BaseEventHandler):
    """Subscribes to ``SET_GROUP_SIGN`` and sends ``set_group_sign`` to NapCat."""

    handler_name: str = "napcat_set_group_sign_handler"
    handler_description: str = "群打卡"
    weight: int = 100
    intercept_message: bool = False
    init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SIGN]

    async def execute(self, params: dict):
        """Build and send the ``set_group_sign`` request.

        Arguments are read from ``params["raw"]`` when that entry is truthy,
        otherwise from the top level of ``params``.

        Args:
            params: Mapping with a required ``group_id``.

        Returns:
            ``HandlerResult(True, True, response)`` when the reply's
            ``status`` is ``"ok"``; otherwise
            ``HandlerResult(False, False, {"status": "error"})``.
        """
        raw_args = params.get("raw", {})
        # A truthy "raw" entry replaces the top-level arguments entirely.
        group_id = (raw_args or params).get("group_id", "")

        if not group_id:
            logger.error("事件 napcat_set_group_sign 缺少必要参数: group_id")
            return HandlerResult(False, False, {"status": "error"})

        request = {"group_id": str(group_id)}
        reply = await send_handler.send_message_to_napcat(action="set_group_sign", params=request)
        if reply.get("status", "") != "ok":
            logger.error("事件 napcat_set_group_sign 请求失败!")
            return HandlerResult(False, False, {"status": "error"})
        return HandlerResult(True, True, reply)