feat(napcat): 补充全部群事件处理与事件定义

新增近 30 个群相关事件处理器,覆盖群信息、成员管理、禁言、公告、精华消息等功能;修正合并转发消息事件常量拼写;补全事件类型注解与详细文档。所有群事件统一使用 `NapcatEvent.GROUP.*` 枚举值与对应 Handler。
This commit is contained in:
Windpicker-owo
2025-08-30 17:55:13 +08:00
parent dbe2075090
commit efdda4d6f4
2 changed files with 1784 additions and 48 deletions

View File

@@ -549,6 +549,7 @@ class SetDiyOnlineStatusHandler(BaseEventHandler):
return HandlerResult(False, False, {"status": "error"})
# ===MESSAGE===
class SendPrivateMsgHandler(BaseEventHandler):
handler_name: str = "napcat_send_private_msg_handler"
handler_description: str = "发送私聊消息"
@@ -856,7 +857,7 @@ class SendForwardMsgHandler(BaseEventHandler):
handler_description: str = "发送合并转发消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.MESSAGE.SEND_FORWARF_MSG]
init_subscribe = [NapcatEvent.MESSAGE.SEND_FORWARD_MSG]
async def execute(self, params: dict):
raw = params.get("raw", {})
@@ -934,3 +935,925 @@ class SendGroupAiRecordHandler(BaseEventHandler):
else:
logger.error("事件 napcat_send_group_ai_record 请求失败!")
return HandlerResult(False, False, {"status": "error"})
# ===GROUP===
class GetGroupInfoHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_info_handler"
handler_description: str = "获取群信息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_group_info 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="get_group_info", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_info 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupAddOptionHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_add_option_handler"
handler_description: str = "设置群添加选项"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_OPTION]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
add_type = params.get("add_type", "")
group_question = params.get("group_question", "")
group_answer = params.get("group_answer", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
add_type = raw.get("add_type", "")
group_question = raw.get("group_question", "")
group_answer = raw.get("group_answer", "")
if not group_id or not add_type:
logger.error("事件 napcat_set_group_add_option 缺少必要参数: group_id 或 add_type")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"add_type": str(add_type)
}
if group_question:
payload["group_question"] = group_question
if group_answer:
payload["group_answer"] = group_answer
response = await send_handler.send_message_to_napcat(action="set_group_add_option", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_add_option 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupKickMembersHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_kick_members_handler"
handler_description: str = "批量踢出群成员"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK_MEMBERS]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", [])
reject_add_request = params.get("reject_add_request", False)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", [])
reject_add_request = raw.get("reject_add_request", False)
if not group_id or not user_id:
logger.error("事件 napcat_set_group_kick_members 缺少必要参数: group_id 或 user_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": user_id,
"reject_add_request": bool(reject_add_request)
}
response = await send_handler.send_message_to_napcat(action="set_group_kick_members", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_kick_members 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupRemarkHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_remark_handler"
handler_description: str = "设置群备注"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_REMARK]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
remark = params.get("remark", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
remark = raw.get("remark", "")
if not group_id or not remark:
logger.error("事件 napcat_set_group_remark 缺少必要参数: group_id 或 remark")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"remark": remark
}
response = await send_handler.send_message_to_napcat(action="set_group_remark", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_remark 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupKickHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_kick_handler"
handler_description: str = "群踢人"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
reject_add_request = params.get("reject_add_request", False)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
reject_add_request = raw.get("reject_add_request", False)
if not group_id or not user_id:
logger.error("事件 napcat_set_group_kick 缺少必要参数: group_id 或 user_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id),
"reject_add_request": bool(reject_add_request)
}
response = await send_handler.send_message_to_napcat(action="set_group_kick", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_kick 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupSystemMsgHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_system_msg_handler"
handler_description: str = "获取群系统消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SYSTEM_MSG]
async def execute(self, params: dict):
raw = params.get("raw", {})
count = params.get("count", 20)
if params.get("raw", ""):
count = raw.get("count", 20)
if count is None:
logger.error("事件 napcat_get_group_system_msg 缺少必要参数: count")
return HandlerResult(False, False, {"status": "error"})
payload = {
"count": int(count)
}
response = await send_handler.send_message_to_napcat(action="get_group_system_msg", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_system_msg 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupBanHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_ban_handler"
handler_description: str = "群禁言"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_BAN]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
duration = params.get("duration", 0)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
duration = raw.get("duration", 0)
if not group_id or not user_id or duration is None:
logger.error("事件 napcat_set_group_ban 缺少必要参数")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id),
"duration": int(duration)
}
response = await send_handler.send_message_to_napcat(action="set_group_ban", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_ban 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetEssenceMsgListHandler(BaseEventHandler):
handler_name: str = "napcat_get_essence_msg_list_handler"
handler_description: str = "获取群精华消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_ESSENCE_MSG_LIST]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_essence_msg_list 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="get_essence_msg_list", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_essence_msg_list 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupWholeBanHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_whole_ban_handler"
handler_description: str = "全体禁言"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_WHOLE_BAN]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
enable = params.get("enable", True)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
enable = raw.get("enable", True)
if not group_id or enable is None:
logger.error("事件 napcat_set_group_whole_ban 缺少必要参数")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"enable": bool(enable)
}
response = await send_handler.send_message_to_napcat(action="set_group_whole_ban", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_whole_ban 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupPortraitHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_portrait_handler"
handler_description: str = "设置群头像"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_PORTRAINT]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
file_path = params.get("file", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
file_path = raw.get("file", "")
if not group_id or not file_path:
logger.error("事件 napcat_set_group_portrait 缺少必要参数: group_id 或 file")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"file": file_path
}
response = await send_handler.send_message_to_napcat(action="set_group_portrait", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_portrait 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupAdminHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_admin_handler"
handler_description: str = "设置群管理"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADMIN]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
enable = params.get("enable", True)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
enable = raw.get("enable", True)
if not group_id or not user_id or enable is None:
logger.error("事件 napcat_set_group_admin 缺少必要参数")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id),
"enable": bool(enable)
}
response = await send_handler.send_message_to_napcat(action="set_group_admin", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_admin 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupCardHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_card_handler"
handler_description: str = "设置群成员名片"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_CARD]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
card = params.get("card", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
card = raw.get("card", "")
if not group_id or not user_id:
logger.error("事件 napcat_set_group_card 缺少必要参数: group_id 或 user_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id)
}
if card:
payload["card"] = card
response = await send_handler.send_message_to_napcat(action="group_card", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_card 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetEssenceMsgHandler(BaseEventHandler):
handler_name: str = "napcat_set_essence_msg_handler"
handler_description: str = "设置群精华消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_ESSENCE_MSG]
async def execute(self, params: dict):
raw = params.get("raw", {})
message_id = params.get("message_id", "")
if params.get("raw", ""):
message_id = raw.get("message_id", "")
if not message_id:
logger.error("事件 napcat_set_essence_msg 缺少必要参数: message_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"message_id": str(message_id)
}
response = await send_handler.send_message_to_napcat(action="set_essence_msg", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_essence_msg 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupNameHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_name_handler"
handler_description: str = "设置群名"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_NAME]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
group_name = params.get("group_name", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
group_name = raw.get("group_name", "")
if not group_id or not group_name:
logger.error("事件 napcat_set_group_name 缺少必要参数: group_id 或 group_name")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"group_name": group_name
}
response = await send_handler.send_message_to_napcat(action="set_group_name", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_name 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class DeleteEssenceMsgHandler(BaseEventHandler):
handler_name: str = "napcat_delete_essence_msg_handler"
handler_description: str = "删除群精华消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.DELETE_ESSENCE_MSG]
async def execute(self, params: dict):
raw = params.get("raw", {})
message_id = params.get("message_id", "")
if params.get("raw", ""):
message_id = raw.get("message_id", "")
if not message_id:
logger.error("事件 napcat_delete_essence_msg 缺少必要参数: message_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"message_id": str(message_id)
}
response = await send_handler.send_message_to_napcat(action="delete_essence_msg", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_delete_essence_msg 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupLeaveHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_leave_handler"
handler_description: str = "退群"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_LEAVE]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_set_group_leave 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="set_group_leave", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_leave 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SendGroupNoticeHandler(BaseEventHandler):
handler_name: str = "napcat_send_group_notice_handler"
handler_description: str = "发送群公告"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SEND_GROUP_NOTICE]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
content = params.get("content", "")
image = params.get("image", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
content = raw.get("content", "")
image = raw.get("image", "")
if not group_id or not content:
logger.error("事件 napcat_send_group_notice 缺少必要参数: group_id 或 content")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"content": content
}
if image:
payload["image"] = image
response = await send_handler.send_message_to_napcat(action="_send_group_notice", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_send_group_notice 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupSpecialTitleHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_special_title_handler"
handler_description: str = "设置群头衔"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SPECIAL_TITLE]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
special_title = params.get("special_title", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
special_title = raw.get("special_title", "")
if not group_id or not user_id:
logger.error("事件 napcat_set_group_special_title 缺少必要参数: group_id 或 user_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id)
}
if special_title:
payload["special_title"] = special_title
response = await send_handler.send_message_to_napcat(action="set_group_special_title", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_special_title 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupNoticeHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_notice_handler"
handler_description: str = "获取群公告"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_NOTICE]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_group_notice 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="_get_group_notice", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_notice 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupAddRequestHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_add_request_handler"
handler_description: str = "处理加群请求"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_REQUEST]
async def execute(self, params: dict):
raw = params.get("raw", {})
flag = params.get("flag", "")
approve = params.get("approve", True)
reason = params.get("reason", "")
if params.get("raw", ""):
flag = raw.get("flag", "")
approve = raw.get("approve", True)
reason = raw.get("reason", "")
if not flag or approve is None:
logger.error("事件 napcat_set_group_add_request 缺少必要参数")
return HandlerResult(False, False, {"status": "error"})
payload = {
"flag": flag,
"approve": bool(approve)
}
if reason:
payload["reason"] = reason
response = await send_handler.send_message_to_napcat(action="set_group_add_request", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_add_request 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupListHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_list_handler"
handler_description: str = "获取群列表"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_LIST]
async def execute(self, params: dict):
raw = params.get("raw", {})
no_cache = params.get("no_cache", False)
if params.get("raw", ""):
no_cache = raw.get("no_cache", False)
payload = {
"no_cache": bool(no_cache)
}
response = await send_handler.send_message_to_napcat(action="get_group_list", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_list 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class DeleteGroupNoticeHandler(BaseEventHandler):
handler_name: str = "napcat_del_group_notice_handler"
handler_description: str = "删除群公告"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.DELETE_GROUP_NOTICE]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
notice_id = params.get("notice_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
notice_id = raw.get("notice_id", "")
if not group_id or not notice_id:
logger.error("事件 napcat_del_group_notice 缺少必要参数: group_id 或 notice_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"notice_id": notice_id
}
response = await send_handler.send_message_to_napcat(action="_del_group_notice", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_del_group_notice 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupMemberInfoHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_member_info_handler"
handler_description: str = "获取群成员信息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_INFO]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
user_id = params.get("user_id", "")
no_cache = params.get("no_cache", False)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
user_id = raw.get("user_id", "")
no_cache = raw.get("no_cache", False)
if not group_id or not user_id:
logger.error("事件 napcat_get_group_member_info 缺少必要参数: group_id 或 user_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"user_id": str(user_id),
"no_cache": bool(no_cache)
}
response = await send_handler.send_message_to_napcat(action="get_group_member_info", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_member_info 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupMemberListHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_member_list_handler"
handler_description: str = "获取群成员列表"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_LIST]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
no_cache = params.get("no_cache", False)
if params.get("raw", ""):
group_id = raw.get("group_id", "")
no_cache = raw.get("no_cache", False)
if not group_id:
logger.error("事件 napcat_get_group_member_list 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id),
"no_cache": bool(no_cache)
}
response = await send_handler.send_message_to_napcat(action="get_group_member_list", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_member_list 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupHonorInfoHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_honor_info_handler"
handler_description: str = "获取群荣誉"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_HONOR_INFO]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
type = params.get("type", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
type = raw.get("type", "")
if not group_id:
logger.error("事件 napcat_get_group_honor_info 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
if type:
payload["type"] = type
response = await send_handler.send_message_to_napcat(action="get_group_honor_info", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_honor_info 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupInfoExHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_info_ex_handler"
handler_description: str = "获取群信息ex"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO_EX]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_group_info_ex 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="get_group_info_ex", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_info_ex 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupAtAllRemainHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_at_all_remain_handler"
handler_description: str = "获取群 @全体成员 剩余次数"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_AT_ALL_REMAIN]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_group_at_all_remain 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="get_group_at_all_remain", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_at_all_remain 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupShutListHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_shut_list_handler"
handler_description: str = "获取群禁言列表"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SHUT_LIST]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_get_group_shut_list 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="get_group_shut_list", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_shut_list 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class GetGroupIgnoredNotifiesHandler(BaseEventHandler):
handler_name: str = "napcat_get_group_ignored_notifies_handler"
handler_description: str = "获取群过滤系统消息"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.GET_GROUP_IGNORED_NOTIFIES]
async def execute(self, params: dict):
payload = {}
response = await send_handler.send_message_to_napcat(action="get_group_ignored_notifies", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_get_group_ignored_notifies 请求失败!")
return HandlerResult(False, False, {"status": "error"})
class SetGroupSignHandler(BaseEventHandler):
handler_name: str = "napcat_set_group_sign_handler"
handler_description: str = "群打卡"
weight: int = 100
intercept_message: bool = False
init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SIGN]
async def execute(self, params: dict):
raw = params.get("raw", {})
group_id = params.get("group_id", "")
if params.get("raw", ""):
group_id = raw.get("group_id", "")
if not group_id:
logger.error("事件 napcat_set_group_sign 缺少必要参数: group_id")
return HandlerResult(False, False, {"status": "error"})
payload = {
"group_id": str(group_id)
}
response = await send_handler.send_message_to_napcat(action="set_group_sign", params=payload)
if response.get("status", "") == "ok":
return HandlerResult(True, True, response)
else:
logger.error("事件 napcat_set_group_sign 请求失败!")
return HandlerResult(False, False, {"status": "error"})