event重构暂时完成,待测试
This commit is contained in:
@@ -97,6 +97,13 @@ class CycleProcessor:
|
||||
if not skip_planner:
|
||||
plan_result, target_message = await self.action_planner.plan(mode=self.context.loop_mode)
|
||||
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
from src.plugin_system.base.component_types import EventType
|
||||
# 触发 ON_PLAN 事件
|
||||
result = await event_manager.trigger_event(EventType.ON_PLAN, stream_id=self.chat_stream.stream_id)
|
||||
if result and not result.all_continue_process():
|
||||
return
|
||||
|
||||
action_result = plan_result.get("action_result", {}) if isinstance(plan_result, dict) else {}
|
||||
if not isinstance(action_result, dict):
|
||||
action_result = {}
|
||||
|
||||
@@ -13,7 +13,7 @@ from src.chat.message_receive.message import MessageRecv, MessageRecvS4U
|
||||
from src.chat.message_receive.storage import MessageStorage
|
||||
from src.chat.heart_flow.heartflow_message_processor import HeartFCMessageReceiver
|
||||
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
|
||||
from src.plugin_system.core import component_registry, events_manager, global_announcement_manager
|
||||
from src.plugin_system.core import component_registry, event_manager, global_announcement_manager
|
||||
from src.plugin_system.base import BaseCommand, EventType
|
||||
from src.mais4u.mais4u_chat.s4u_msg_processor import S4UMessageProcessor
|
||||
|
||||
@@ -303,8 +303,9 @@ class ChatBot:
|
||||
logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}")
|
||||
return
|
||||
|
||||
if not await events_manager.handle_mai_events(EventType.ON_MESSAGE, message):
|
||||
return
|
||||
result = await event_manager.trigger_event(EventType.ON_MESSAGE,message=message)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers','')}于消息到达时取消了消息处理")
|
||||
|
||||
# 确认从接口发来的message是否有自定义的prompt模板信息
|
||||
if message.message_info.template_info and not message.message_info.template_info.template_default:
|
||||
|
||||
@@ -366,13 +366,12 @@ class DefaultReplyer:
|
||||
if not prompt:
|
||||
logger.warning("构建prompt失败,跳过回复生成")
|
||||
return False, None, None
|
||||
from src.plugin_system.core.events_manager import events_manager
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
|
||||
if not from_plugin:
|
||||
if not await events_manager.handle_mai_events(
|
||||
EventType.POST_LLM, None, prompt, None, stream_id=stream_id
|
||||
):
|
||||
raise UserWarning("插件于请求前中断了内容生成")
|
||||
result = await event_manager.trigger_event(EventType.POST_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于请求前中断了内容生成")
|
||||
|
||||
# 4. 调用 LLM 生成回复
|
||||
content = None
|
||||
@@ -388,10 +387,11 @@ class DefaultReplyer:
|
||||
"model": model_name,
|
||||
"tool_calls": tool_call,
|
||||
}
|
||||
if not from_plugin and not await events_manager.handle_mai_events(
|
||||
EventType.AFTER_LLM, None, prompt, llm_response, stream_id=stream_id
|
||||
):
|
||||
raise UserWarning("插件于请求后取消了内容生成")
|
||||
# 触发 AFTER_LLM 事件
|
||||
if not from_plugin:
|
||||
result = await event_manager.trigger_event(EventType.AFTER_LLM,prompt=prompt,llm_response=llm_response,stream_id=stream_id)
|
||||
if not result.all_continue_process():
|
||||
raise UserWarning(f"插件{result.get_summary().get("stopped_handlers","")}于请求后取消了内容生成")
|
||||
except UserWarning as e:
|
||||
raise e
|
||||
except Exception as llm_e:
|
||||
|
||||
10
src/main.py
10
src/main.py
@@ -19,6 +19,8 @@ from src.mood.mood_manager import mood_manager
|
||||
from rich.traceback import install
|
||||
from src.manager.schedule_manager import schedule_manager
|
||||
from src.schedule.monthly_plan_manager import MonthlyPlanManager
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
from src.plugin_system.base.component_types import EventType
|
||||
# from src.api.main import start_api_server
|
||||
|
||||
# 导入新的插件管理器和热重载管理器
|
||||
@@ -147,6 +149,9 @@ MaiMbot-Pro-Max(第三方修改版)
|
||||
# 添加统计信息输出任务
|
||||
await async_task_manager.add_task(StatisticOutputTask())
|
||||
|
||||
# 注册默认事件
|
||||
event_manager.init_default_events()
|
||||
|
||||
# 初始化权限管理器
|
||||
from src.plugin_system.core.permission_manager import PermissionManager
|
||||
from src.plugin_system.apis.permission_api import permission_api
|
||||
@@ -161,8 +166,10 @@ MaiMbot-Pro-Max(第三方修改版)
|
||||
# 加载所有actions,包括默认的和插件的
|
||||
plugin_manager.load_all_plugins()
|
||||
|
||||
# 启动插件热重载系统
|
||||
# 处理所有缓存的事件订阅(插件加载完成后)
|
||||
event_manager.process_all_pending_subscriptions()
|
||||
|
||||
# 启动插件热重载系统
|
||||
hot_reload_manager.start()
|
||||
|
||||
# 初始化表情管理器
|
||||
@@ -225,6 +232,7 @@ MaiMbot-Pro-Max(第三方修改版)
|
||||
|
||||
|
||||
try:
|
||||
await event_manager.trigger_event(EventType.ON_START)
|
||||
init_time = int(1000 * (time.time() - init_start_time))
|
||||
logger.info(f"初始化完成,神经元放电{init_time}次")
|
||||
except Exception as e:
|
||||
|
||||
@@ -72,7 +72,7 @@ class BaseTool(ABC):
|
||||
"""
|
||||
raise NotImplementedError("子类必须实现execute方法")
|
||||
|
||||
async def direct_execute(self, **function_args: dict[str, Any]) -> dict[str, Any]:
|
||||
async def direct_execute(self, **kwargs: dict[str, Any]) -> dict[str, Any]:
|
||||
"""直接执行工具函数(供插件调用)
|
||||
通过该方法,插件可以直接调用工具,而不需要传入字典格式的参数
|
||||
插件可以直接调用此方法,用更加明了的方式传入参数
|
||||
@@ -88,10 +88,10 @@ class BaseTool(ABC):
|
||||
"""
|
||||
parameter_required = [param[0] for param in self.parameters if param[3]] # 获取所有必填参数名
|
||||
for param_name in parameter_required:
|
||||
if param_name not in function_args:
|
||||
if param_name not in kwargs:
|
||||
raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {param_name}")
|
||||
|
||||
return await self.execute(function_args)
|
||||
return await self.execute(kwargs)
|
||||
|
||||
def get_config(self, key: str, default=None):
|
||||
"""获取插件配置值,使用嵌套键访问
|
||||
|
||||
@@ -12,9 +12,9 @@ class ComponentType(Enum):
|
||||
|
||||
ACTION = "action" # 动作组件
|
||||
COMMAND = "command" # 命令组件
|
||||
TOOL = "tool" # 服务组件(预留)
|
||||
TOOL = "tool" # 工具组件
|
||||
SCHEDULER = "scheduler" # 定时任务组件(预留)
|
||||
EVENT_HANDLER = "event_handler" # 事件处理组件(预留)
|
||||
EVENT_HANDLER = "event_handler" # 事件处理组件
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
@@ -188,6 +188,32 @@ class EventHandlerInfo(ComponentInfo):
|
||||
super().__post_init__()
|
||||
self.component_type = ComponentType.EVENT_HANDLER
|
||||
|
||||
@dataclass
|
||||
class EventInfo(ComponentInfo):
|
||||
"""事件组件信息"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
self.component_type = ComponentType.EVENT
|
||||
|
||||
# 事件类型枚举
|
||||
class EventType(Enum):
|
||||
"""
|
||||
事件类型枚举类
|
||||
"""
|
||||
|
||||
ON_START = "on_start" # 启动事件,用于调用按时任务
|
||||
ON_STOP = "on_stop" # 停止事件,用于调用按时任务
|
||||
ON_MESSAGE = "on_message"
|
||||
ON_PLAN = "on_plan"
|
||||
POST_LLM = "post_llm"
|
||||
AFTER_LLM = "after_llm"
|
||||
POST_SEND = "post_send"
|
||||
AFTER_SEND = "after_send"
|
||||
UNKNOWN = "unknown" # 未知事件类型
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
|
||||
@dataclass
|
||||
class PluginInfo:
|
||||
|
||||
@@ -6,14 +6,14 @@
|
||||
|
||||
from src.plugin_system.core.plugin_manager import plugin_manager
|
||||
from src.plugin_system.core.component_registry import component_registry
|
||||
from src.plugin_system.core.events_manager import events_manager
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
from src.plugin_system.core.global_announcement_manager import global_announcement_manager
|
||||
from src.plugin_system.core.plugin_hot_reload import hot_reload_manager
|
||||
|
||||
__all__ = [
|
||||
"plugin_manager",
|
||||
"component_registry",
|
||||
"events_manager",
|
||||
"event_manager",
|
||||
"global_announcement_manager",
|
||||
"hot_reload_manager",
|
||||
]
|
||||
|
||||
@@ -10,12 +10,14 @@ from src.plugin_system.base.component_types import (
|
||||
CommandInfo,
|
||||
EventHandlerInfo,
|
||||
PluginInfo,
|
||||
EventInfo,
|
||||
ComponentType,
|
||||
)
|
||||
from src.plugin_system.base.base_command import BaseCommand
|
||||
from src.plugin_system.base.base_action import BaseAction
|
||||
from src.plugin_system.base.base_tool import BaseTool
|
||||
from src.plugin_system.base.base_events_handler import BaseEventHandler
|
||||
from src.plugin_system.base.base_event import BaseEvent
|
||||
|
||||
logger = get_logger("component_registry")
|
||||
|
||||
@@ -220,14 +222,6 @@ class ComponentRegistry:
|
||||
logger.warning(f"EventHandler组件 {handler_name} 未启用")
|
||||
return True # 未启用,但是也是注册成功
|
||||
|
||||
from .events_manager import events_manager # 延迟导入防止循环导入问题
|
||||
|
||||
if events_manager.register_event_subscriber(handler_info, handler_class):
|
||||
self._enabled_event_handlers[handler_name] = handler_class
|
||||
return True
|
||||
else:
|
||||
logger.error(f"注册事件处理器 {handler_name} 失败")
|
||||
return False
|
||||
|
||||
# === 组件移除相关 ===
|
||||
|
||||
@@ -261,12 +255,14 @@ class ComponentRegistry:
|
||||
|
||||
case ComponentType.EVENT_HANDLER:
|
||||
# 移除EventHandler注册和事件订阅
|
||||
from .events_manager import events_manager # 延迟导入防止循环导入问题
|
||||
from .event_manager import event_manager # 延迟导入防止循环导入问题
|
||||
|
||||
self._event_handler_registry.pop(component_name, None)
|
||||
self._enabled_event_handlers.pop(component_name, None)
|
||||
try:
|
||||
await events_manager.unregister_event_subscriber(component_name)
|
||||
handler = event_manager.get_event_handler(component_name)
|
||||
for event in handler.subscribed_events:
|
||||
await event_manager.unsubscribe_handler_from_event(event, component_name)
|
||||
logger.debug(f"已移除EventHandler组件: {component_name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"移除EventHandler事件订阅时出错: {e}")
|
||||
@@ -336,9 +332,9 @@ class ComponentRegistry:
|
||||
assert isinstance(target_component_info, EventHandlerInfo)
|
||||
assert issubclass(target_component_class, BaseEventHandler)
|
||||
self._enabled_event_handlers[component_name] = target_component_class
|
||||
from .events_manager import events_manager # 延迟导入防止循环导入问题
|
||||
from .event_manager import event_manager # 延迟导入防止循环导入问题
|
||||
event_manager.register_event_handler(component_name)
|
||||
|
||||
events_manager.register_event_subscriber(target_component_info, target_component_class)
|
||||
namespaced_name = f"{component_type}.{component_name}"
|
||||
self._components[namespaced_name].enabled = True
|
||||
self._components_by_type[component_type][component_name].enabled = True
|
||||
@@ -369,9 +365,12 @@ class ComponentRegistry:
|
||||
self._llm_available_tools.pop(component_name)
|
||||
case ComponentType.EVENT_HANDLER:
|
||||
self._enabled_event_handlers.pop(component_name)
|
||||
from .events_manager import events_manager # 延迟导入防止循环导入问题
|
||||
from .event_manager import event_manager # 延迟导入防止循环导入问题
|
||||
|
||||
handler = event_manager.get_event_handler(component_name)
|
||||
for event in handler.subscribed_events:
|
||||
await event_manager.unsubscribe_handler_from_event(event, component_name)
|
||||
|
||||
await events_manager.unregister_event_subscriber(component_name)
|
||||
self._components[component_name].enabled = False
|
||||
self._components_by_type[component_type][component_name].enabled = False
|
||||
logger.info(f"组件 {component_name} 已禁用")
|
||||
|
||||
@@ -170,13 +170,13 @@ class EventManager:
|
||||
return True
|
||||
|
||||
def get_event_handler(self, handler_name: str) -> Optional[Type[BaseEventHandler]]:
|
||||
"""获取指定事件处理器类
|
||||
"""获取指定事件处理器实例
|
||||
|
||||
Args:
|
||||
handler_name (str): 处理器名称
|
||||
|
||||
Returns:
|
||||
Type[BaseEventHandler]: 处理器类,不存在返回None
|
||||
Type[BaseEventHandler]: 处理器实例,不存在返回None
|
||||
"""
|
||||
return self._event_handlers.get(handler_name)
|
||||
|
||||
|
||||
@@ -1,262 +0,0 @@
|
||||
import asyncio
|
||||
import contextlib
|
||||
from typing import List, Dict, Optional, Type, Tuple, Any
|
||||
|
||||
from src.chat.message_receive.message import MessageRecv
|
||||
from src.chat.message_receive.chat_stream import get_chat_manager
|
||||
from src.common.logger import get_logger
|
||||
from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages
|
||||
from src.plugin_system.base.base_events_handler import BaseEventHandler
|
||||
from .global_announcement_manager import global_announcement_manager
|
||||
|
||||
logger = get_logger("events_manager")
|
||||
|
||||
|
||||
class EventsManager:
|
||||
def __init__(self):
|
||||
# 有权重的 events 订阅者注册表
|
||||
self._events_subscribers: Dict[EventType, List[BaseEventHandler]] = {event: [] for event in EventType}
|
||||
self._handler_mapping: Dict[str, Type[BaseEventHandler]] = {} # 事件处理器映射表
|
||||
self._handler_tasks: Dict[str, List[asyncio.Task]] = {} # 事件处理器正在处理的任务
|
||||
|
||||
def register_event_subscriber(self, handler_info: EventHandlerInfo, handler_class: Type[BaseEventHandler]) -> bool:
|
||||
"""注册事件处理器
|
||||
|
||||
Args:
|
||||
handler_info (EventHandlerInfo): 事件处理器信息
|
||||
handler_class (Type[BaseEventHandler]): 事件处理器类
|
||||
|
||||
Returns:
|
||||
bool: 是否注册成功
|
||||
"""
|
||||
handler_name = handler_info.name
|
||||
|
||||
if handler_name in self._handler_mapping:
|
||||
logger.warning(f"事件处理器 {handler_name} 已存在,跳过注册")
|
||||
return True
|
||||
|
||||
if not issubclass(handler_class, BaseEventHandler):
|
||||
logger.error(f"类 {handler_class.__name__} 不是 BaseEventHandler 的子类")
|
||||
return False
|
||||
|
||||
self._handler_mapping[handler_name] = handler_class
|
||||
return self._insert_event_handler(handler_class, handler_info)
|
||||
|
||||
async def handle_mai_events(
|
||||
self,
|
||||
event_type: EventType,
|
||||
message: Optional[MessageRecv] = None,
|
||||
llm_prompt: Optional[str] = None,
|
||||
llm_response: Optional[Dict[str, Any]] = None,
|
||||
stream_id: Optional[str] = None,
|
||||
action_usage: Optional[List[str]] = None,
|
||||
) -> bool:
|
||||
"""处理 events"""
|
||||
from src.plugin_system.core import component_registry
|
||||
|
||||
continue_flag = True
|
||||
transformed_message: Optional[MaiMessages] = None
|
||||
if not message:
|
||||
assert stream_id, "如果没有消息,必须提供流ID"
|
||||
if event_type in [EventType.ON_MESSAGE, EventType.ON_PLAN, EventType.POST_LLM, EventType.AFTER_LLM]:
|
||||
transformed_message = self._build_message_from_stream(stream_id, llm_prompt, llm_response)
|
||||
else:
|
||||
transformed_message = self._transform_event_without_message(
|
||||
stream_id, llm_prompt, llm_response, action_usage
|
||||
)
|
||||
else:
|
||||
transformed_message = self._transform_event_message(message, llm_prompt, llm_response)
|
||||
for handler in self._events_subscribers.get(event_type, []):
|
||||
if transformed_message.stream_id:
|
||||
stream_id = transformed_message.stream_id
|
||||
if handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(stream_id):
|
||||
continue
|
||||
handler.set_plugin_config(component_registry.get_plugin_config(handler.plugin_name) or {})
|
||||
if handler.intercept_message:
|
||||
try:
|
||||
success, continue_processing, result = await handler.execute(transformed_message)
|
||||
if not success:
|
||||
logger.error(f"EventHandler {handler.handler_name} 执行失败: {result}")
|
||||
else:
|
||||
logger.debug(f"EventHandler {handler.handler_name} 执行成功: {result}")
|
||||
continue_flag = continue_flag and continue_processing
|
||||
except Exception as e:
|
||||
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}")
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
handler_task = asyncio.create_task(handler.execute(transformed_message))
|
||||
handler_task.add_done_callback(self._task_done_callback)
|
||||
handler_task.set_name(f"{handler.plugin_name}-{handler.handler_name}")
|
||||
if handler.handler_name not in self._handler_tasks:
|
||||
self._handler_tasks[handler.handler_name] = []
|
||||
self._handler_tasks[handler.handler_name].append(handler_task)
|
||||
except Exception as e:
|
||||
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}")
|
||||
continue
|
||||
return continue_flag
|
||||
|
||||
def _insert_event_handler(self, handler_class: Type[BaseEventHandler], handler_info: EventHandlerInfo) -> bool:
|
||||
"""插入事件处理器到对应的事件类型列表中并设置其插件配置"""
|
||||
if handler_class.event_type == EventType.UNKNOWN:
|
||||
logger.error(f"事件处理器 {handler_class.__name__} 的事件类型未知,无法注册")
|
||||
return False
|
||||
|
||||
handler_instance = handler_class()
|
||||
handler_instance.set_plugin_name(handler_info.plugin_name or "unknown")
|
||||
self._events_subscribers[handler_class.event_type].append(handler_instance)
|
||||
self._events_subscribers[handler_class.event_type].sort(key=lambda x: x.weight, reverse=True)
|
||||
|
||||
return True
|
||||
|
||||
def _remove_event_handler_instance(self, handler_class: Type[BaseEventHandler]) -> bool:
|
||||
"""从事件类型列表中移除事件处理器"""
|
||||
display_handler_name = handler_class.handler_name or handler_class.__name__
|
||||
if handler_class.event_type == EventType.UNKNOWN:
|
||||
logger.warning(f"事件处理器 {display_handler_name} 的事件类型未知,不存在于处理器列表中")
|
||||
return False
|
||||
|
||||
handlers = self._events_subscribers[handler_class.event_type]
|
||||
for i, handler in enumerate(handlers):
|
||||
if isinstance(handler, handler_class):
|
||||
del handlers[i]
|
||||
logger.debug(f"事件处理器 {display_handler_name} 已移除")
|
||||
return True
|
||||
|
||||
logger.warning(f"未找到事件处理器 {display_handler_name},无法移除")
|
||||
return False
|
||||
|
||||
def _transform_event_message(
|
||||
self, message: MessageRecv, llm_prompt: Optional[str] = None, llm_response: Optional[Dict[str, Any]] = None
|
||||
) -> MaiMessages:
|
||||
"""转换事件消息格式"""
|
||||
# 直接赋值部分内容
|
||||
transformed_message = MaiMessages(
|
||||
llm_prompt=llm_prompt,
|
||||
llm_response_content=llm_response.get("content") if llm_response else None,
|
||||
llm_response_reasoning=llm_response.get("reasoning") if llm_response else None,
|
||||
llm_response_model=llm_response.get("model") if llm_response else None,
|
||||
llm_response_tool_call=llm_response.get("tool_calls") if llm_response else None,
|
||||
raw_message=message.raw_message,
|
||||
additional_data=message.message_info.additional_config or {},
|
||||
)
|
||||
|
||||
# 消息段处理
|
||||
if message.message_segment.type == "seglist":
|
||||
transformed_message.message_segments = list(message.message_segment.data) # type: ignore
|
||||
else:
|
||||
transformed_message.message_segments = [message.message_segment]
|
||||
|
||||
# stream_id 处理
|
||||
if hasattr(message, "chat_stream") and message.chat_stream:
|
||||
transformed_message.stream_id = message.chat_stream.stream_id
|
||||
|
||||
# 处理后文本
|
||||
transformed_message.plain_text = message.processed_plain_text
|
||||
|
||||
# 基本信息
|
||||
if hasattr(message, "message_info") and message.message_info:
|
||||
if message.message_info.platform:
|
||||
transformed_message.message_base_info["platform"] = message.message_info.platform
|
||||
if message.message_info.group_info:
|
||||
transformed_message.is_group_message = True
|
||||
transformed_message.message_base_info.update(
|
||||
{
|
||||
"group_id": message.message_info.group_info.group_id,
|
||||
"group_name": message.message_info.group_info.group_name,
|
||||
}
|
||||
)
|
||||
if message.message_info.user_info:
|
||||
if not transformed_message.is_group_message:
|
||||
transformed_message.is_private_message = True
|
||||
transformed_message.message_base_info.update(
|
||||
{
|
||||
"user_id": message.message_info.user_info.user_id,
|
||||
"user_cardname": message.message_info.user_info.user_cardname, # 用户群昵称
|
||||
"user_nickname": message.message_info.user_info.user_nickname, # 用户昵称(用户名)
|
||||
}
|
||||
)
|
||||
|
||||
return transformed_message
|
||||
|
||||
def _build_message_from_stream(
|
||||
self, stream_id: str, llm_prompt: Optional[str] = None, llm_response: Optional[Dict[str, Any]] = None
|
||||
) -> MaiMessages:
|
||||
"""从流ID构建消息"""
|
||||
chat_stream = get_chat_manager().get_stream(stream_id)
|
||||
assert chat_stream, f"未找到流ID为 {stream_id} 的聊天流"
|
||||
message = chat_stream.context.get_last_message()
|
||||
return self._transform_event_message(message, llm_prompt, llm_response)
|
||||
|
||||
def _transform_event_without_message(
|
||||
self,
|
||||
stream_id: str,
|
||||
llm_prompt: Optional[str] = None,
|
||||
llm_response: Optional[Dict[str, Any]] = None,
|
||||
action_usage: Optional[List[str]] = None,
|
||||
) -> MaiMessages:
|
||||
"""没有message对象时进行转换"""
|
||||
chat_stream = get_chat_manager().get_stream(stream_id)
|
||||
assert chat_stream, f"未找到流ID为 {stream_id} 的聊天流"
|
||||
return MaiMessages(
|
||||
stream_id=stream_id,
|
||||
llm_prompt=llm_prompt,
|
||||
llm_response_content=(llm_response.get("content") if llm_response else None),
|
||||
llm_response_reasoning=(llm_response.get("reasoning") if llm_response else None),
|
||||
llm_response_model=llm_response.get("model") if llm_response else None,
|
||||
llm_response_tool_call=(llm_response.get("tool_calls") if llm_response else None),
|
||||
is_group_message=(not (not chat_stream.group_info)),
|
||||
is_private_message=(not chat_stream.group_info),
|
||||
action_usage=action_usage,
|
||||
additional_data={"response_is_processed": True},
|
||||
)
|
||||
|
||||
def _task_done_callback(self, task: asyncio.Task[Tuple[bool, bool, str | None]]):
|
||||
"""任务完成回调"""
|
||||
task_name = task.get_name() or "Unknown Task"
|
||||
try:
|
||||
success, _, result = task.result() # 忽略是否继续的标志,因为消息本身未被拦截
|
||||
if success:
|
||||
logger.debug(f"事件处理任务 {task_name} 已成功完成: {result}")
|
||||
else:
|
||||
logger.error(f"事件处理任务 {task_name} 执行失败: {result}")
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"事件处理任务 {task_name} 发生异常: {e}")
|
||||
finally:
|
||||
with contextlib.suppress(ValueError, KeyError):
|
||||
self._handler_tasks[task_name].remove(task)
|
||||
|
||||
async def cancel_handler_tasks(self, handler_name: str) -> None:
|
||||
tasks_to_be_cancelled = self._handler_tasks.get(handler_name, [])
|
||||
if remaining_tasks := [task for task in tasks_to_be_cancelled if not task.done()]:
|
||||
for task in remaining_tasks:
|
||||
task.cancel()
|
||||
try:
|
||||
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=5)
|
||||
logger.info(f"已取消事件处理器 {handler_name} 的所有任务")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"取消事件处理器 {handler_name} 的任务超时,开始强制取消")
|
||||
except Exception as e:
|
||||
logger.error(f"取消事件处理器 {handler_name} 的任务时发生异常: {e}")
|
||||
if handler_name in self._handler_tasks:
|
||||
del self._handler_tasks[handler_name]
|
||||
|
||||
async def unregister_event_subscriber(self, handler_name: str) -> bool:
|
||||
"""取消注册事件处理器"""
|
||||
if handler_name not in self._handler_mapping:
|
||||
logger.warning(f"事件处理器 {handler_name} 不存在,无法取消注册")
|
||||
return False
|
||||
|
||||
await self.cancel_handler_tasks(handler_name)
|
||||
|
||||
handler_class = self._handler_mapping.pop(handler_name)
|
||||
if not self._remove_event_handler_instance(handler_class):
|
||||
return False
|
||||
|
||||
logger.info(f"事件处理器 {handler_name} 已成功取消注册")
|
||||
return True
|
||||
|
||||
|
||||
events_manager = EventsManager()
|
||||
Reference in New Issue
Block a user