from src.plugin_system import BaseEventHandler from src.plugin_system.base.base_event import HandlerResult from .src.send_handler import send_handler from .event_types import NapcatEvent from src.common.logger import get_logger logger = get_logger("napcat_adapter") class SetProfileHandler(BaseEventHandler): handler_name: str = "napcat_set_qq_profile_handler" handler_description: str = "设置账号信息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_PROFILE] async def execute(self,params:dict): raw = params.get("raw",{}) nickname = params.get("nickname","") personal_note = params.get("personal_note","") sex = params.get("sex","") if params.get("raw",""): nickname = raw.get("nickname","") personal_note = raw.get("personal_note","") sex = raw.get("sex","") if not nickname: logger.error("事件 napcat_set_qq_profile 缺少必要参数: nickname ") return HandlerResult(False,False,{"status":"error"}) payload = { "nickname": nickname, "personal_note": personal_note, "sex": sex } response = await send_handler.send_message_to_napcat(action="set_qq_profile",params=payload) if response.get("status","") == "ok": if response.get("data","").get("result","") == 0: return HandlerResult(True,True,response) else: logger.error(f"事件 napcat_set_qq_profile 请求失败!err={response.get("data","").get("errMsg","")}") return HandlerResult(False,False,response) else: logger.error("事件 napcat_set_qq_profile 请求失败!") return HandlerResult(False,False,{"status":"error"}) class GetOnlineClientsHandler(BaseEventHandler): handler_name: str = "napcat_get_online_clients_handler" handler_description: str = "获取当前账号在线客户端列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_ONLINE_CLIENTS] async def execute(self, params: dict): raw = params.get("raw", {}) no_cache = params.get("no_cache", False) if params.get("raw", ""): no_cache = raw.get("no_cache", False) payload = { "no_cache": no_cache } response = await send_handler.send_message_to_napcat(action="get_online_clients", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_online_clients 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetOnlineStatusHandler(BaseEventHandler): handler_name: str = "napcat_set_online_status_handler" handler_description: str = "设置在线状态" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_ONLINE_STATUS] async def execute(self, params: dict): raw = params.get("raw", {}) status = params.get("status", "") ext_status = params.get("ext_status", "0") battery_status = params.get("battery_status", "0") if params.get("raw", ""): status = raw.get("status", "") ext_status = raw.get("ext_status", "0") battery_status = raw.get("battery_status", "0") if not status: logger.error("事件 napcat_set_online_status 缺少必要参数: status") return HandlerResult(False, False, {"status": "error"}) payload = { "status": status, "ext_status": ext_status, "battery_status": battery_status } response = await send_handler.send_message_to_napcat(action="set_online_status", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_online_status 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetFriendsWithCategoryHandler(BaseEventHandler): handler_name: str = "napcat_get_friends_with_category_handler" handler_description: str = "获取好友分组列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_FRIENDS_WITH_CATEGORY] async def execute(self, params: dict): payload = {} response = await send_handler.send_message_to_napcat(action="get_friends_with_category", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_friends_with_category 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetAvatarHandler(BaseEventHandler): handler_name: str = "napcat_set_qq_avatar_handler" handler_description: str = "设置头像" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_AVATAR] async def execute(self, params: dict): raw = params.get("raw", {}) file = params.get("file", "") if params.get("raw", ""): file = raw.get("file", "") if not file: logger.error("事件 napcat_set_qq_avatar 缺少必要参数: file") return HandlerResult(False, False, {"status": "error"}) payload = { "file": file } response = await send_handler.send_message_to_napcat(action="set_qq_avatar", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_qq_avatar 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SendLikeHandler(BaseEventHandler): handler_name: str = "napcat_send_like_handler" handler_description: str = "点赞" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SEND_LIKE] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") times = params.get("times", 1) if params.get("raw", ""): user_id = raw.get("user_id", "") times = raw.get("times", 1) if not user_id: logger.error("事件 napcat_send_like 缺少必要参数: user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id), "times": times } response = await send_handler.send_message_to_napcat(action="send_like", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_like 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetFriendAddRequestHandler(BaseEventHandler): handler_name: str = "napcat_set_friend_add_request_handler" handler_description: str = "处理好友请求" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_FRIEND_ADD_REQUEST] async def execute(self, params: dict): raw = params.get("raw", {}) flag = params.get("flag", "") approve = params.get("approve", True) remark = params.get("remark", "") if params.get("raw", ""): flag = raw.get("flag", "") approve = raw.get("approve", True) remark = raw.get("remark", "") if not flag or approve is None or remark is None: logger.error("事件 napcat_set_friend_add_request 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "flag": flag, "approve": approve, "remark": remark } response = await send_handler.send_message_to_napcat(action="set_friend_add_request", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_friend_add_request 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetSelfLongnickHandler(BaseEventHandler): handler_name: str = "napcat_set_self_longnick_handler" handler_description: str = "设置个性签名" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_SELF_LONGNICK] async def execute(self, params: dict): raw = params.get("raw", {}) longNick = params.get("longNick", "") if params.get("raw", ""): longNick = raw.get("longNick", "") if not longNick: logger.error("事件 napcat_set_self_longnick 缺少必要参数: longNick") return HandlerResult(False, False, {"status": "error"}) payload = { "longNick": longNick } response = await send_handler.send_message_to_napcat(action="set_self_longnick", params=payload) if response.get("status", "") == "ok": if response.get("data", {}).get("result", "") == 0: return HandlerResult(True, True, response) else: logger.error(f"事件 napcat_set_self_longnick 请求失败!err={response.get('data', {}).get('errMsg', '')}") return HandlerResult(False, False, response) else: logger.error("事件 napcat_set_self_longnick 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetLoginInfoHandler(BaseEventHandler): handler_name: str = "napcat_get_login_info_handler" handler_description: str = "获取登录号信息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_LOGIN_INFO] async def execute(self, params: dict): payload = {} response = await send_handler.send_message_to_napcat(action="get_login_info", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_login_info 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetRecentContactHandler(BaseEventHandler): handler_name: str = "napcat_get_recent_contact_handler" handler_description: str = "最近消息列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_RECENT_CONTACT] async def execute(self, params: dict): raw = params.get("raw", {}) count = params.get("count", 20) if params.get("raw", ""): count = raw.get("count", 20) payload = { "count": count } response = await send_handler.send_message_to_napcat(action="get_recent_contact", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_recent_contact 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetStrangerInfoHandler(BaseEventHandler): handler_name: str = "napcat_get_stranger_info_handler" handler_description: str = "获取(指定)账号信息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_STRANGER_INFO] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") if params.get("raw", ""): user_id = raw.get("user_id", "") if not user_id: logger.error("事件 napcat_get_stranger_info 缺少必要参数: user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id) } response = await send_handler.send_message_to_napcat(action="get_stranger_info", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_stranger_info 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetFriendListHandler(BaseEventHandler): handler_name: str = "napcat_get_friend_list_handler" handler_description: str = "获取好友列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_FRIEND_LIST] async def execute(self, params: dict): raw = params.get("raw", {}) no_cache = params.get("no_cache", False) if params.get("raw", ""): no_cache = raw.get("no_cache", False) payload = { "no_cache": no_cache } response = await send_handler.send_message_to_napcat(action="get_friend_list", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_friend_list 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetProfileLikeHandler(BaseEventHandler): handler_name: str = "napcat_get_profile_like_handler" handler_description: str = "获取点赞列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_PROFILE_LIKE] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") start = params.get("start", 0) count = params.get("count", 10) if params.get("raw", ""): user_id = raw.get("user_id", "") start = raw.get("start", 0) count = raw.get("count", 10) payload = { "start": start, "count": count } if user_id: payload["user_id"] = str(user_id) response = await send_handler.send_message_to_napcat(action="get_profile_like", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_profile_like 请求失败!") return HandlerResult(False, False, {"status": "error"}) class DeleteFriendHandler(BaseEventHandler): handler_name: str = "napcat_delete_friend_handler" handler_description: str = "删除好友" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.DELETE_FRIEND] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") temp_block = params.get("temp_block", False) temp_both_del = params.get("temp_both_del", False) if params.get("raw", ""): user_id = raw.get("user_id", "") temp_block = raw.get("temp_block", False) temp_both_del = raw.get("temp_both_del", False) if not user_id or temp_block is None or temp_both_del is None: logger.error("事件 napcat_delete_friend 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id), "temp_block": temp_block, "temp_both_del": temp_both_del } response = await send_handler.send_message_to_napcat(action="delete_friend", params=payload) if response.get("status", "") == "ok": if response.get("data", {}).get("result", "") == 0: return HandlerResult(True, True, response) else: logger.error(f"事件 napcat_delete_friend 请求失败!err={response.get('data', {}).get('errMsg', '')}") return HandlerResult(False, False, response) else: logger.error("事件 napcat_delete_friend 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetUserStatusHandler(BaseEventHandler): handler_name: str = "napcat_get_user_status_handler" handler_description: str = "获取(指定)用户状态" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_USER_STATUS] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") if params.get("raw", ""): user_id = raw.get("user_id", "") if not user_id: logger.error("事件 napcat_get_user_status 缺少必要参数: user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id) } response = await send_handler.send_message_to_napcat(action="get_user_status", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_user_status 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetStatusHandler(BaseEventHandler): handler_name: str = "napcat_get_status_handler" handler_description: str = "获取状态" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_STATUS] async def execute(self, params: dict): payload = {} response = await send_handler.send_message_to_napcat(action="get_status", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_status 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetMiniAppArkHandler(BaseEventHandler): handler_name: str = "napcat_get_mini_app_ark_handler" handler_description: str = "获取小程序卡片" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.GET_MINI_APP_ARK] async def execute(self, params: dict): raw = params.get("raw", {}) type = params.get("type", "") title = params.get("title", "") desc = params.get("desc", "") picUrl = params.get("picUrl", "") jumpUrl = params.get("jumpUrl", "") webUrl = params.get("webUrl", "") rawArkData = params.get("rawArkData", False) if params.get("raw", ""): type = raw.get("type", "") title = raw.get("title", "") desc = raw.get("desc", "") picUrl = raw.get("picUrl", "") jumpUrl = raw.get("jumpUrl", "") webUrl = raw.get("webUrl", "") rawArkData = raw.get("rawArkData", False) if not type or not title or not desc or not picUrl or not jumpUrl: logger.error("事件 napcat_get_mini_app_ark 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "type": type, "title": title, "desc": desc, "picUrl": picUrl, "jumpUrl": jumpUrl, "webUrl": webUrl, "rawArkData": rawArkData } response = await send_handler.send_message_to_napcat(action="get_mini_app_ark", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_mini_app_ark 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetDiyOnlineStatusHandler(BaseEventHandler): handler_name: str = "napcat_set_diy_online_status_handler" handler_description: str = "设置自定义在线状态" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.ACCOUNT.SET_DIY_ONLINE_STATUS] async def execute(self, params: dict): raw = params.get("raw", {}) face_id = params.get("face_id", "") face_type = params.get("face_type", "0") wording = params.get("wording", "") if params.get("raw", ""): face_id = raw.get("face_id", "") face_type = raw.get("face_type", "0") wording = raw.get("wording", "") if not face_id: logger.error("事件 napcat_set_diy_online_status 缺少必要参数: face_id") return HandlerResult(False, False, {"status": "error"}) payload = { "face_id": str(face_id), "face_type": str(face_type), "wording": wording } response = await send_handler.send_message_to_napcat(action="set_diy_online_status", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_diy_online_status 请求失败!") return HandlerResult(False, False, {"status": "error"}) # ===MESSAGE=== class SendPrivateMsgHandler(BaseEventHandler): handler_name: str = "napcat_send_private_msg_handler" handler_description: str = "发送私聊消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.SEND_PRIVATE_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") message = params.get("message", "") if params.get("raw", ""): user_id = raw.get("user_id", "") message = raw.get("message", "") if not user_id or not message: logger.error("事件 napcat_send_private_msg 缺少必要参数: user_id 或 message") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id), "message": message } response = await send_handler.send_message_to_napcat(action="send_private_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_private_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SendPokeHandler(BaseEventHandler): handler_name: str = "napcat_send_poke_handler" handler_description: str = "发送戳一戳" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.SEND_POKE] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") group_id = params.get("group_id", None) if params.get("raw", ""): user_id = raw.get("user_id", "") group_id = raw.get("group_id", None) if not user_id: logger.error("事件 napcat_send_poke 缺少必要参数: user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id) } if group_id is not None: payload["group_id"] = str(group_id) response = await send_handler.send_message_to_napcat(action="send_poke", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_poke 请求失败!") return HandlerResult(False, False, {"status": "error"}) class DeleteMsgHandler(BaseEventHandler): handler_name: str = "napcat_delete_msg_handler" handler_description: str = "撤回消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.DELETE_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") if params.get("raw", ""): message_id = raw.get("message_id", "") if not message_id: logger.error("事件 napcat_delete_msg 缺少必要参数: message_id") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id) } response = await send_handler.send_message_to_napcat(action="delete_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_delete_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupMsgHistoryHandler(BaseEventHandler): handler_name: str = "napcat_get_group_msg_history_handler" handler_description: str = "获取群历史消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.GET_GROUP_MSG_HISTORY] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") message_seq = params.get("message_seq", 0) count = params.get("count", 20) reverseOrder = params.get("reverseOrder", False) if params.get("raw", ""): group_id = raw.get("group_id", "") message_seq = raw.get("message_seq", 0) count = raw.get("count", 20) reverseOrder = raw.get("reverseOrder", False) if not group_id: logger.error("事件 napcat_get_group_msg_history 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "message_seq": int(message_seq), "count": int(count), "reverseOrder": bool(reverseOrder) } response = await send_handler.send_message_to_napcat(action="get_group_msg_history", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_msg_history 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetMsgHandler(BaseEventHandler): handler_name: str = "napcat_get_msg_handler" handler_description: str = "获取消息详情" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.GET_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") if params.get("raw", ""): message_id = raw.get("message_id", "") if not message_id: logger.error("事件 napcat_get_msg 缺少必要参数: message_id") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id) } response = await send_handler.send_message_to_napcat(action="get_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetForwardMsgHandler(BaseEventHandler): handler_name: str = "napcat_get_forward_msg_handler" handler_description: str = "获取合并转发消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.GET_FORWARD_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") if params.get("raw", ""): message_id = raw.get("message_id", "") if not message_id: logger.error("事件 napcat_get_forward_msg 缺少必要参数: message_id") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id) } response = await send_handler.send_message_to_napcat(action="get_forward_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_forward_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetMsgEmojiLikeHandler(BaseEventHandler): handler_name: str = "napcat_set_msg_emoji_like_handler" handler_description: str = "贴表情" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.SET_MSG_EMOJI_LIKE] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") emoji_id = params.get("emoji_id", 0) set_flag = params.get("set", True) if params.get("raw", ""): message_id = raw.get("message_id", "") emoji_id = raw.get("emoji_id", 0) set_flag = raw.get("set", True) if not message_id or emoji_id is None or set_flag is None: logger.error("事件 napcat_set_msg_emoji_like 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id), "emoji_id": int(emoji_id), "set": bool(set_flag) } response = await send_handler.send_message_to_napcat(action="set_msg_emoji_like", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_msg_emoji_like 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetFriendMsgHistoryHandler(BaseEventHandler): handler_name: str = "napcat_get_friend_msg_history_handler" handler_description: str = "获取好友历史消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.GET_FRIEND_MSG_HISTORY] async def execute(self, params: dict): raw = params.get("raw", {}) user_id = params.get("user_id", "") message_seq = params.get("message_seq", 0) count = params.get("count", 20) reverseOrder = params.get("reverseOrder", False) if params.get("raw", ""): user_id = raw.get("user_id", "") message_seq = raw.get("message_seq", 0) count = raw.get("count", 20) reverseOrder = raw.get("reverseOrder", False) if not user_id: logger.error("事件 napcat_get_friend_msg_history 缺少必要参数: user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "user_id": str(user_id), "message_seq": int(message_seq), "count": int(count), "reverseOrder": bool(reverseOrder) } response = await send_handler.send_message_to_napcat(action="get_friend_msg_history", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_friend_msg_history 请求失败!") return HandlerResult(False, False, {"status": "error"}) class FetchEmojiLikeHandler(BaseEventHandler): handler_name: str = "napcat_fetch_emoji_like_handler" handler_description: str = "获取贴表情详情" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.FETCH_EMOJI_LIKE] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") emoji_id = params.get("emoji_id", "") emoji_type = params.get("emoji_type", "") count = params.get("count", 20) if params.get("raw", ""): message_id = raw.get("message_id", "") emoji_id = raw.get("emoji_id", "") emoji_type = raw.get("emoji_type", "") count = raw.get("count", 20) if not message_id or not emoji_id or not emoji_type: logger.error("事件 napcat_fetch_emoji_like 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id), "emojiId": str(emoji_id), "emojiType": str(emoji_type), "count": int(count) } response = await send_handler.send_message_to_napcat(action="fetch_emoji_like", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_fetch_emoji_like 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SendForwardMsgHandler(BaseEventHandler): handler_name: str = "napcat_send_forward_msg_handler" handler_description: str = "发送合并转发消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.SEND_FORWARD_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) messages = params.get("messages", {}) news = params.get("news", {}) prompt = params.get("prompt", "") summary = params.get("summary", "") source = params.get("source", "") group_id = params.get("group_id", None) user_id = params.get("user_id", None) if params.get("raw", ""): messages = raw.get("messages", {}) news = raw.get("news", {}) prompt = raw.get("prompt", "") summary = raw.get("summary", "") source = raw.get("source", "") group_id = raw.get("group_id", None) user_id = raw.get("user_id", None) if not messages or not news or not prompt or not summary or not source: logger.error("事件 napcat_send_forward_msg 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "messages": messages, "news": news, "prompt": prompt, "summary": summary, "source": source } if group_id is not None: payload["group_id"] = str(group_id) if user_id is not None: payload["user_id"] = str(user_id) response = await send_handler.send_message_to_napcat(action="send_forward_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_forward_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SendGroupAiRecordHandler(BaseEventHandler): handler_name: str = "napcat_send_group_ai_record_handler" handler_description: str = "发送群AI语音" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.MESSAGE.SEND_GROUP_AI_RECORD] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") character = params.get("character", "") text = params.get("text", "") if params.get("raw", ""): group_id = raw.get("group_id", "") character = raw.get("character", "") text = raw.get("text", "") if not group_id or not character or not text: logger.error("事件 napcat_send_group_ai_record 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "character": character, "text": text } response = await send_handler.send_message_to_napcat(action="send_group_ai_record", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_group_ai_record 请求失败!") return HandlerResult(False, False, {"status": "error"}) # ===GROUP=== class GetGroupInfoHandler(BaseEventHandler): handler_name: str = "napcat_get_group_info_handler" handler_description: str = "获取群信息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_group_info 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="get_group_info", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_info 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupAddOptionHandler(BaseEventHandler): handler_name: str = "napcat_set_group_add_option_handler" handler_description: str = "设置群添加选项" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_OPTION] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") add_type = params.get("add_type", "") group_question = params.get("group_question", "") group_answer = params.get("group_answer", "") if params.get("raw", ""): group_id = raw.get("group_id", "") add_type = raw.get("add_type", "") group_question = raw.get("group_question", "") group_answer = raw.get("group_answer", "") if not group_id or not add_type: logger.error("事件 napcat_set_group_add_option 缺少必要参数: group_id 或 add_type") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "add_type": str(add_type) } if group_question: payload["group_question"] = group_question if group_answer: payload["group_answer"] = group_answer response = await send_handler.send_message_to_napcat(action="set_group_add_option", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_add_option 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupKickMembersHandler(BaseEventHandler): handler_name: str = "napcat_set_group_kick_members_handler" handler_description: str = "批量踢出群成员" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK_MEMBERS] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", []) reject_add_request = params.get("reject_add_request", False) if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", []) reject_add_request = raw.get("reject_add_request", False) if not group_id or not user_id: logger.error("事件 napcat_set_group_kick_members 缺少必要参数: group_id 或 user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": user_id, "reject_add_request": bool(reject_add_request) } response = await send_handler.send_message_to_napcat(action="set_group_kick_members", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_kick_members 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupRemarkHandler(BaseEventHandler): handler_name: str = "napcat_set_group_remark_handler" handler_description: str = "设置群备注" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_REMARK] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") remark = params.get("remark", "") if params.get("raw", ""): group_id = raw.get("group_id", "") remark = raw.get("remark", "") if not group_id or not remark: logger.error("事件 napcat_set_group_remark 缺少必要参数: group_id 或 remark") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "remark": remark } response = await send_handler.send_message_to_napcat(action="set_group_remark", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_remark 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupKickHandler(BaseEventHandler): handler_name: str = "napcat_set_group_kick_handler" handler_description: str = "群踢人" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_KICK] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") reject_add_request = params.get("reject_add_request", False) if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") reject_add_request = raw.get("reject_add_request", False) if not group_id or not user_id: logger.error("事件 napcat_set_group_kick 缺少必要参数: group_id 或 user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id), "reject_add_request": bool(reject_add_request) } response = await send_handler.send_message_to_napcat(action="set_group_kick", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_kick 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupSystemMsgHandler(BaseEventHandler): handler_name: str = "napcat_get_group_system_msg_handler" handler_description: str = "获取群系统消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SYSTEM_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) count = params.get("count", 20) if params.get("raw", ""): count = raw.get("count", 20) if count is None: logger.error("事件 napcat_get_group_system_msg 缺少必要参数: count") return HandlerResult(False, False, {"status": "error"}) payload = { "count": int(count) } response = await send_handler.send_message_to_napcat(action="get_group_system_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_system_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupBanHandler(BaseEventHandler): handler_name: str = "napcat_set_group_ban_handler" handler_description: str = "群禁言" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_BAN] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") duration = params.get("duration", 0) if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") duration = raw.get("duration", 0) if not group_id or not user_id or duration is None: logger.error("事件 napcat_set_group_ban 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id), "duration": int(duration) } response = await send_handler.send_message_to_napcat(action="set_group_ban", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_ban 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetEssenceMsgListHandler(BaseEventHandler): handler_name: str = "napcat_get_essence_msg_list_handler" handler_description: str = "获取群精华消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_ESSENCE_MSG_LIST] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_essence_msg_list 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="get_essence_msg_list", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_essence_msg_list 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupWholeBanHandler(BaseEventHandler): handler_name: str = "napcat_set_group_whole_ban_handler" handler_description: str = "全体禁言" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_WHOLE_BAN] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") enable = params.get("enable", True) if params.get("raw", ""): group_id = raw.get("group_id", "") enable = raw.get("enable", True) if not group_id or enable is None: logger.error("事件 napcat_set_group_whole_ban 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "enable": bool(enable) } response = await send_handler.send_message_to_napcat(action="set_group_whole_ban", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_whole_ban 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupPortraitHandler(BaseEventHandler): handler_name: str = "napcat_set_group_portrait_handler" handler_description: str = "设置群头像" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_PORTRAINT] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") file_path = params.get("file", "") if params.get("raw", ""): group_id = raw.get("group_id", "") file_path = raw.get("file", "") if not group_id or not file_path: logger.error("事件 napcat_set_group_portrait 缺少必要参数: group_id 或 file") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "file": file_path } response = await send_handler.send_message_to_napcat(action="set_group_portrait", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_portrait 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupAdminHandler(BaseEventHandler): handler_name: str = "napcat_set_group_admin_handler" handler_description: str = "设置群管理" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADMIN] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") enable = params.get("enable", True) if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") enable = raw.get("enable", True) if not group_id or not user_id or enable is None: logger.error("事件 napcat_set_group_admin 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id), "enable": bool(enable) } response = await send_handler.send_message_to_napcat(action="set_group_admin", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_admin 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupCardHandler(BaseEventHandler): handler_name: str = "napcat_set_group_card_handler" handler_description: str = "设置群成员名片" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_CARD] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") card = params.get("card", "") if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") card = raw.get("card", "") if not group_id or not user_id: logger.error("事件 napcat_set_group_card 缺少必要参数: group_id 或 user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id) } if card: payload["card"] = card response = await send_handler.send_message_to_napcat(action="group_card", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_card 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetEssenceMsgHandler(BaseEventHandler): handler_name: str = "napcat_set_essence_msg_handler" handler_description: str = "设置群精华消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_ESSENCE_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") if params.get("raw", ""): message_id = raw.get("message_id", "") if not message_id: logger.error("事件 napcat_set_essence_msg 缺少必要参数: message_id") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id) } response = await send_handler.send_message_to_napcat(action="set_essence_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_essence_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupNameHandler(BaseEventHandler): handler_name: str = "napcat_set_group_name_handler" handler_description: str = "设置群名" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_NAME] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") group_name = params.get("group_name", "") if params.get("raw", ""): group_id = raw.get("group_id", "") group_name = raw.get("group_name", "") if not group_id or not group_name: logger.error("事件 napcat_set_group_name 缺少必要参数: group_id 或 group_name") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "group_name": group_name } response = await send_handler.send_message_to_napcat(action="set_group_name", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_name 请求失败!") return HandlerResult(False, False, {"status": "error"}) class DeleteEssenceMsgHandler(BaseEventHandler): handler_name: str = "napcat_delete_essence_msg_handler" handler_description: str = "删除群精华消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.DELETE_ESSENCE_MSG] async def execute(self, params: dict): raw = params.get("raw", {}) message_id = params.get("message_id", "") if params.get("raw", ""): message_id = raw.get("message_id", "") if not message_id: logger.error("事件 napcat_delete_essence_msg 缺少必要参数: message_id") return HandlerResult(False, False, {"status": "error"}) payload = { "message_id": str(message_id) } response = await send_handler.send_message_to_napcat(action="delete_essence_msg", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_delete_essence_msg 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupLeaveHandler(BaseEventHandler): handler_name: str = "napcat_set_group_leave_handler" handler_description: str = "退群" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_LEAVE] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_set_group_leave 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="set_group_leave", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_leave 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SendGroupNoticeHandler(BaseEventHandler): handler_name: str = "napcat_send_group_notice_handler" handler_description: str = "发送群公告" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SEND_GROUP_NOTICE] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") content = params.get("content", "") image = params.get("image", "") if params.get("raw", ""): group_id = raw.get("group_id", "") content = raw.get("content", "") image = raw.get("image", "") if not group_id or not content: logger.error("事件 napcat_send_group_notice 缺少必要参数: group_id 或 content") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "content": content } if image: payload["image"] = image response = await send_handler.send_message_to_napcat(action="_send_group_notice", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_send_group_notice 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupSpecialTitleHandler(BaseEventHandler): handler_name: str = "napcat_set_group_special_title_handler" handler_description: str = "设置群头衔" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SPECIAL_TITLE] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") special_title = params.get("special_title", "") if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") special_title = raw.get("special_title", "") if not group_id or not user_id: logger.error("事件 napcat_set_group_special_title 缺少必要参数: group_id 或 user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id) } if special_title: payload["special_title"] = special_title response = await send_handler.send_message_to_napcat(action="set_group_special_title", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_special_title 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupNoticeHandler(BaseEventHandler): handler_name: str = "napcat_get_group_notice_handler" handler_description: str = "获取群公告" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_NOTICE] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_group_notice 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="_get_group_notice", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_notice 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupAddRequestHandler(BaseEventHandler): handler_name: str = "napcat_set_group_add_request_handler" handler_description: str = "处理加群请求" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_ADD_REQUEST] async def execute(self, params: dict): raw = params.get("raw", {}) flag = params.get("flag", "") approve = params.get("approve", True) reason = params.get("reason", "") if params.get("raw", ""): flag = raw.get("flag", "") approve = raw.get("approve", True) reason = raw.get("reason", "") if not flag or approve is None: logger.error("事件 napcat_set_group_add_request 缺少必要参数") return HandlerResult(False, False, {"status": "error"}) payload = { "flag": flag, "approve": bool(approve) } if reason: payload["reason"] = reason response = await send_handler.send_message_to_napcat(action="set_group_add_request", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_add_request 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupListHandler(BaseEventHandler): handler_name: str = "napcat_get_group_list_handler" handler_description: str = "获取群列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_LIST] async def execute(self, params: dict): raw = params.get("raw", {}) no_cache = params.get("no_cache", False) if params.get("raw", ""): no_cache = raw.get("no_cache", False) payload = { "no_cache": bool(no_cache) } response = await send_handler.send_message_to_napcat(action="get_group_list", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_list 请求失败!") return HandlerResult(False, False, {"status": "error"}) class DeleteGroupNoticeHandler(BaseEventHandler): handler_name: str = "napcat_del_group_notice_handler" handler_description: str = "删除群公告" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.DELETE_GROUP_NOTICE] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") notice_id = params.get("notice_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") notice_id = raw.get("notice_id", "") if not group_id or not notice_id: logger.error("事件 napcat_del_group_notice 缺少必要参数: group_id 或 notice_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "notice_id": notice_id } response = await send_handler.send_message_to_napcat(action="_del_group_notice", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_del_group_notice 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupMemberInfoHandler(BaseEventHandler): handler_name: str = "napcat_get_group_member_info_handler" handler_description: str = "获取群成员信息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_INFO] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") user_id = params.get("user_id", "") no_cache = params.get("no_cache", False) if params.get("raw", ""): group_id = raw.get("group_id", "") user_id = raw.get("user_id", "") no_cache = raw.get("no_cache", False) if not group_id or not user_id: logger.error("事件 napcat_get_group_member_info 缺少必要参数: group_id 或 user_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "user_id": str(user_id), "no_cache": bool(no_cache) } response = await send_handler.send_message_to_napcat(action="get_group_member_info", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_member_info 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupMemberListHandler(BaseEventHandler): handler_name: str = "napcat_get_group_member_list_handler" handler_description: str = "获取群成员列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_MEMBER_LIST] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") no_cache = params.get("no_cache", False) if params.get("raw", ""): group_id = raw.get("group_id", "") no_cache = raw.get("no_cache", False) if not group_id: logger.error("事件 napcat_get_group_member_list 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id), "no_cache": bool(no_cache) } response = await send_handler.send_message_to_napcat(action="get_group_member_list", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_member_list 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupHonorInfoHandler(BaseEventHandler): handler_name: str = "napcat_get_group_honor_info_handler" handler_description: str = "获取群荣誉" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_HONOR_INFO] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") type = params.get("type", "") if params.get("raw", ""): group_id = raw.get("group_id", "") type = raw.get("type", "") if not group_id: logger.error("事件 napcat_get_group_honor_info 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } if type: payload["type"] = type response = await send_handler.send_message_to_napcat(action="get_group_honor_info", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_honor_info 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupInfoExHandler(BaseEventHandler): handler_name: str = "napcat_get_group_info_ex_handler" handler_description: str = "获取群信息ex" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_INFO_EX] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_group_info_ex 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="get_group_info_ex", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_info_ex 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupAtAllRemainHandler(BaseEventHandler): handler_name: str = "napcat_get_group_at_all_remain_handler" handler_description: str = "获取群 @全体成员 剩余次数" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_AT_ALL_REMAIN] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_group_at_all_remain 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="get_group_at_all_remain", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_at_all_remain 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupShutListHandler(BaseEventHandler): handler_name: str = "napcat_get_group_shut_list_handler" handler_description: str = "获取群禁言列表" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_SHUT_LIST] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_get_group_shut_list 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="get_group_shut_list", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_shut_list 请求失败!") return HandlerResult(False, False, {"status": "error"}) class GetGroupIgnoredNotifiesHandler(BaseEventHandler): handler_name: str = "napcat_get_group_ignored_notifies_handler" handler_description: str = "获取群过滤系统消息" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.GET_GROUP_IGNORED_NOTIFIES] async def execute(self, params: dict): payload = {} response = await send_handler.send_message_to_napcat(action="get_group_ignored_notifies", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_get_group_ignored_notifies 请求失败!") return HandlerResult(False, False, {"status": "error"}) class SetGroupSignHandler(BaseEventHandler): handler_name: str = "napcat_set_group_sign_handler" handler_description: str = "群打卡" weight: int = 100 intercept_message: bool = False init_subscribe = [NapcatEvent.GROUP.SET_GROUP_SIGN] async def execute(self, params: dict): raw = params.get("raw", {}) group_id = params.get("group_id", "") if params.get("raw", ""): group_id = raw.get("group_id", "") if not group_id: logger.error("事件 napcat_set_group_sign 缺少必要参数: group_id") return HandlerResult(False, False, {"status": "error"}) payload = { "group_id": str(group_id) } response = await send_handler.send_message_to_napcat(action="set_group_sign", params=payload) if response.get("status", "") == "ok": return HandlerResult(True, True, response) else: logger.error("事件 napcat_set_group_sign 请求失败!") return HandlerResult(False, False, {"status": "error"})