refactor(event): 优化AFTER_SEND事件的异步触发逻辑并增强日志记录
This commit is contained in:
@@ -30,14 +30,27 @@ async def send_message(message: MessageSending, show_log=True) -> bool:
|
|||||||
from src.plugin_system.base.component_types import EventType
|
from src.plugin_system.base.component_types import EventType
|
||||||
|
|
||||||
if message.chat_stream:
|
if message.chat_stream:
|
||||||
await event_manager.trigger_event(
|
logger.info(f"[发送完成] 准备触发 AFTER_SEND 事件,stream_id={message.chat_stream.stream_id}")
|
||||||
EventType.AFTER_SEND,
|
|
||||||
permission_group="SYSTEM",
|
# 使用 asyncio.create_task 来异步触发事件,避免阻塞
|
||||||
stream_id=message.chat_stream.stream_id,
|
async def trigger_event_async():
|
||||||
message=message,
|
try:
|
||||||
)
|
logger.info(f"[事件触发] 开始异步触发 AFTER_SEND 事件")
|
||||||
|
await event_manager.trigger_event(
|
||||||
|
EventType.AFTER_SEND,
|
||||||
|
permission_group="SYSTEM",
|
||||||
|
stream_id=message.chat_stream.stream_id,
|
||||||
|
message=message,
|
||||||
|
)
|
||||||
|
logger.info(f"[事件触发] AFTER_SEND 事件触发完成")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[事件触发] 异步触发事件失败: {e}", exc_info=True)
|
||||||
|
|
||||||
|
# 创建异步任务,不等待完成
|
||||||
|
asyncio.create_task(trigger_event_async())
|
||||||
|
logger.info(f"[发送完成] AFTER_SEND 事件已提交到异步任务")
|
||||||
except Exception as event_error:
|
except Exception as event_error:
|
||||||
logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}")
|
logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}", exc_info=True)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@@ -34,6 +34,8 @@ class ProactiveThinkingReplyHandler(BaseEventHandler):
|
|||||||
Returns:
|
Returns:
|
||||||
HandlerResult: 处理结果
|
HandlerResult: 处理结果
|
||||||
"""
|
"""
|
||||||
|
logger.info("[事件] ProactiveThinkingReplyHandler 开始执行")
|
||||||
|
|
||||||
if not kwargs:
|
if not kwargs:
|
||||||
return HandlerResult(success=True, continue_process=True, message=None)
|
return HandlerResult(success=True, continue_process=True, message=None)
|
||||||
|
|
||||||
@@ -42,6 +44,8 @@ class ProactiveThinkingReplyHandler(BaseEventHandler):
|
|||||||
logger.warning("Reply事件缺少stream_id参数")
|
logger.warning("Reply事件缺少stream_id参数")
|
||||||
return HandlerResult(success=True, continue_process=True, message=None)
|
return HandlerResult(success=True, continue_process=True, message=None)
|
||||||
|
|
||||||
|
logger.info(f"[事件] 收到 AFTER_SEND 事件,stream_id={stream_id}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from src.config.config import global_config
|
from src.config.config import global_config
|
||||||
|
|
||||||
@@ -56,7 +60,9 @@ class ProactiveThinkingReplyHandler(BaseEventHandler):
|
|||||||
logger.info(f"检测到reply事件,聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复")
|
logger.info(f"检测到reply事件,聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复")
|
||||||
|
|
||||||
# 重置定时任务(这会自动清除暂停标记并创建新任务)
|
# 重置定时任务(这会自动清除暂停标记并创建新任务)
|
||||||
|
logger.debug(f"[事件] 准备调用 schedule_proactive_thinking")
|
||||||
success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
|
success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
|
||||||
|
logger.debug(f"[事件] schedule_proactive_thinking 调用完成,success={success}")
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
if was_paused:
|
if was_paused:
|
||||||
|
|||||||
@@ -330,13 +330,13 @@ class ProactiveThinkingPlanner:
|
|||||||
【最近的聊天记录】
|
【最近的聊天记录】
|
||||||
{context['recent_chat_history']}
|
{context['recent_chat_history']}
|
||||||
{expression_habits}
|
{expression_habits}
|
||||||
请生成一条简短的消息,用于活跃气氛或刷存在感。要求:
|
请生成一条简短的消息,用于水群。要求:
|
||||||
1. 非常简短(5-15字)
|
1. 非常简短(5-15字)
|
||||||
2. 轻松随意,不要有明确的话题或问题
|
2. 轻松随意,不要有明确的话题或问题
|
||||||
3. 可以是:问候、表达心情、随口一句话
|
3. 可以是:问候、表达心情、随口一句话
|
||||||
4. 符合你的人设和当前聊天风格
|
4. 符合你的人设和当前聊天风格
|
||||||
5. 如果有表达方式参考,在合适时自然使用
|
5. 如果有表达方式参考,在合适时自然使用
|
||||||
|
6. 合理参考历史记录
|
||||||
直接输出消息内容,不要解释:"""
|
直接输出消息内容,不要解释:"""
|
||||||
|
|
||||||
else: # throw_topic
|
else: # throw_topic
|
||||||
@@ -438,7 +438,7 @@ async def execute_proactive_thinking(stream_id: str):
|
|||||||
|
|
||||||
config = global_config.proactive_thinking
|
config = global_config.proactive_thinking
|
||||||
|
|
||||||
logger.info(f"开始为聊天流 {stream_id} 执行主动思考")
|
logger.info(f"[主动思考] 开始为聊天流 {stream_id} 执行主动思考")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 0. 前置检查
|
# 0. 前置检查
|
||||||
@@ -453,22 +453,26 @@ async def execute_proactive_thinking(stream_id: str):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# 1. 搜集信息
|
# 1. 搜集信息
|
||||||
|
logger.info(f"[主动思考] 步骤1:搜集上下文信息")
|
||||||
context = await _planner.gather_context(stream_id)
|
context = await _planner.gather_context(stream_id)
|
||||||
if not context:
|
if not context:
|
||||||
logger.warning(f"无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考")
|
logger.warning(f"[主动思考] 无法搜集聊天流 {stream_id} 的上下文,跳过本次主动思考")
|
||||||
return
|
return
|
||||||
|
logger.info(f"[主动思考] 上下文搜集完成")
|
||||||
|
|
||||||
# 检查兴趣分数阈值
|
# 检查兴趣分数阈值
|
||||||
interest_score = context.get('interest_score', 0.5)
|
interest_score = context.get('interest_score', 0.5)
|
||||||
if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score):
|
if not proactive_thinking_scheduler._check_interest_score_threshold(interest_score):
|
||||||
logger.info(f"聊天流 {stream_id} 兴趣分数不在阈值范围内")
|
logger.info(f"[主动思考] 聊天流 {stream_id} 兴趣分数不在阈值范围内")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. 进行决策
|
# 2. 进行决策
|
||||||
|
logger.info(f"[主动思考] 步骤2:LLM决策")
|
||||||
decision = await _planner.make_decision(context)
|
decision = await _planner.make_decision(context)
|
||||||
if not decision:
|
if not decision:
|
||||||
logger.warning(f"决策失败,跳过本次主动思考")
|
logger.warning(f"[主动思考] 决策失败,跳过本次主动思考")
|
||||||
return
|
return
|
||||||
|
logger.info(f"[主动思考] 决策完成")
|
||||||
|
|
||||||
action = decision.get("action", "do_nothing")
|
action = decision.get("action", "do_nothing")
|
||||||
reasoning = decision.get("reasoning", "无")
|
reasoning = decision.get("reasoning", "无")
|
||||||
@@ -483,13 +487,18 @@ async def execute_proactive_thinking(stream_id: str):
|
|||||||
return
|
return
|
||||||
|
|
||||||
elif action == "simple_bubble":
|
elif action == "simple_bubble":
|
||||||
logger.info(f"决策:简单冒个泡。理由:{reasoning}")
|
logger.info(f"[主动思考] 决策:简单冒个泡。理由:{reasoning}")
|
||||||
|
|
||||||
# 生成简单的消息
|
# 生成简单的消息
|
||||||
|
logger.info(f"[主动思考] 步骤3:生成冒泡回复")
|
||||||
reply = await _planner.generate_reply(context, "simple_bubble")
|
reply = await _planner.generate_reply(context, "simple_bubble")
|
||||||
if reply:
|
if reply:
|
||||||
await send_api.text_to_stream(stream_id=stream_id, text=reply)
|
logger.info(f"[主动思考] 步骤4:发送消息")
|
||||||
logger.info(f"已发送冒泡消息到 {stream_id}")
|
await send_api.text_to_stream(
|
||||||
|
stream_id=stream_id,
|
||||||
|
text=reply,
|
||||||
|
)
|
||||||
|
logger.info(f"[主动思考] 已发送冒泡消息到 {stream_id}")
|
||||||
|
|
||||||
# 增加每日计数
|
# 增加每日计数
|
||||||
proactive_thinking_scheduler._increment_daily_count(stream_id)
|
proactive_thinking_scheduler._increment_daily_count(stream_id)
|
||||||
@@ -497,23 +506,30 @@ async def execute_proactive_thinking(stream_id: str):
|
|||||||
# 更新统计
|
# 更新统计
|
||||||
if config.enable_statistics:
|
if config.enable_statistics:
|
||||||
_update_statistics(stream_id, action)
|
_update_statistics(stream_id, action)
|
||||||
|
|
||||||
|
logger.info(f"[主动思考] simple_bubble 执行完成")
|
||||||
# 冒泡后可以继续主动思考,不需要暂停
|
# 冒泡后可以继续主动思考,不需要暂停
|
||||||
|
|
||||||
elif action == "throw_topic":
|
elif action == "throw_topic":
|
||||||
topic = decision.get("topic", "")
|
topic = decision.get("topic", "")
|
||||||
logger.info(f"决策:抛出话题。理由:{reasoning},话题:{topic}")
|
logger.info(f"[主动思考] 决策:抛出话题。理由:{reasoning},话题:{topic}")
|
||||||
|
|
||||||
if not topic:
|
if not topic:
|
||||||
logger.warning("选择了抛出话题但未提供话题内容,降级为冒泡")
|
logger.warning("[主动思考] 选择了抛出话题但未提供话题内容,降级为冒泡")
|
||||||
|
logger.info(f"[主动思考] 步骤3:生成降级冒泡回复")
|
||||||
reply = await _planner.generate_reply(context, "simple_bubble")
|
reply = await _planner.generate_reply(context, "simple_bubble")
|
||||||
else:
|
else:
|
||||||
# 生成基于话题的消息
|
# 生成基于话题的消息
|
||||||
|
logger.info(f"[主动思考] 步骤3:生成话题回复")
|
||||||
reply = await _planner.generate_reply(context, "throw_topic", topic)
|
reply = await _planner.generate_reply(context, "throw_topic", topic)
|
||||||
|
|
||||||
if reply:
|
if reply:
|
||||||
await send_api.text_to_stream(stream_id=stream_id, text=reply)
|
logger.info(f"[主动思考] 步骤4:发送消息")
|
||||||
logger.info(f"已发送话题消息到 {stream_id}")
|
await send_api.text_to_stream(
|
||||||
|
stream_id=stream_id,
|
||||||
|
text=reply,
|
||||||
|
)
|
||||||
|
logger.info(f"[主动思考] 已发送话题消息到 {stream_id}")
|
||||||
|
|
||||||
# 增加每日计数
|
# 增加每日计数
|
||||||
proactive_thinking_scheduler._increment_daily_count(stream_id)
|
proactive_thinking_scheduler._increment_daily_count(stream_id)
|
||||||
@@ -524,15 +540,13 @@ async def execute_proactive_thinking(stream_id: str):
|
|||||||
|
|
||||||
# 抛出话题后暂停主动思考(如果配置了冷却时间)
|
# 抛出话题后暂停主动思考(如果配置了冷却时间)
|
||||||
if config.topic_throw_cooldown > 0:
|
if config.topic_throw_cooldown > 0:
|
||||||
|
logger.info(f"[主动思考] 步骤5:暂停任务")
|
||||||
await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题")
|
await proactive_thinking_scheduler.pause_proactive_thinking(stream_id, reason="已抛出话题")
|
||||||
|
logger.info(f"[主动思考] 已暂停聊天流 {stream_id} 的主动思考,等待用户回复")
|
||||||
# 设置定时恢复(在reply_reset_enabled关闭时使用)
|
|
||||||
if not config.reply_reset_enabled:
|
logger.info(f"[主动思考] throw_topic 执行完成")
|
||||||
import asyncio
|
|
||||||
async def resume_after_cooldown():
|
logger.info(f"[主动思考] 聊天流 {stream_id} 的主动思考执行完成")
|
||||||
await asyncio.sleep(config.topic_throw_cooldown)
|
|
||||||
await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
|
|
||||||
asyncio.create_task(resume_after_cooldown())
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"执行主动思考失败: {e}", exc_info=True)
|
logger.error(f"[主动思考] 执行主动思考失败: {e}", exc_info=True)
|
||||||
|
|||||||
@@ -85,8 +85,11 @@ class UnifiedScheduler:
|
|||||||
"""处理来自 event_manager 的事件通知
|
"""处理来自 event_manager 的事件通知
|
||||||
|
|
||||||
此方法由 event_manager 在触发事件时直接调用
|
此方法由 event_manager 在触发事件时直接调用
|
||||||
|
|
||||||
|
注意:此方法不能在持有 self._lock 的情况下调用,
|
||||||
|
否则会导致死锁(因为回调可能再次触发事件)
|
||||||
"""
|
"""
|
||||||
# 获取订阅该事件的所有任务
|
# 获取订阅该事件的所有任务(快速复制,减少锁持有时间)
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
event_tasks = [
|
event_tasks = [
|
||||||
task
|
task
|
||||||
@@ -97,15 +100,18 @@ class UnifiedScheduler:
|
|||||||
]
|
]
|
||||||
|
|
||||||
if not event_tasks:
|
if not event_tasks:
|
||||||
logger.debug(f"事件 '{event_name}' 没有对应的调度任务")
|
logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务")
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(f"事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
|
logger.info(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
|
||||||
|
|
||||||
tasks_to_remove = []
|
tasks_to_remove = []
|
||||||
|
|
||||||
|
# 在锁外执行回调,避免死锁
|
||||||
for task in event_tasks:
|
for task in event_tasks:
|
||||||
try:
|
try:
|
||||||
|
logger.debug(f"[调度器] 执行事件任务: {task.task_name}")
|
||||||
|
|
||||||
# 执行回调,传入事件参数
|
# 执行回调,传入事件参数
|
||||||
if event_params:
|
if event_params:
|
||||||
if asyncio.iscoroutinefunction(task.callback):
|
if asyncio.iscoroutinefunction(task.callback):
|
||||||
@@ -121,14 +127,17 @@ class UnifiedScheduler:
|
|||||||
# 如果不是循环任务,标记为删除
|
# 如果不是循环任务,标记为删除
|
||||||
if not task.is_recurring:
|
if not task.is_recurring:
|
||||||
tasks_to_remove.append(task.schedule_id)
|
tasks_to_remove.append(task.schedule_id)
|
||||||
|
|
||||||
|
logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True)
|
logger.error(f"[调度器] 执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True)
|
||||||
|
|
||||||
# 移除已完成的一次性任务
|
# 移除已完成的一次性任务
|
||||||
async with self._lock:
|
if tasks_to_remove:
|
||||||
for schedule_id in tasks_to_remove:
|
async with self._lock:
|
||||||
await self._remove_task_internal(schedule_id)
|
for schedule_id in tasks_to_remove:
|
||||||
|
await self._remove_task_internal(schedule_id)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""启动调度器"""
|
"""启动调度器"""
|
||||||
@@ -190,37 +199,54 @@ class UnifiedScheduler:
|
|||||||
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
|
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
|
||||||
|
|
||||||
async def _check_and_trigger_tasks(self):
|
async def _check_and_trigger_tasks(self):
|
||||||
"""检查并触发到期任务"""
|
"""检查并触发到期任务
|
||||||
|
|
||||||
|
注意:为了避免死锁,回调执行必须在锁外进行
|
||||||
|
"""
|
||||||
|
current_time = datetime.now()
|
||||||
|
|
||||||
|
# 第一阶段:在锁内快速收集需要触发的任务
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
tasks_to_remove = []
|
tasks_to_trigger = []
|
||||||
current_time = datetime.now()
|
|
||||||
|
|
||||||
for schedule_id, task in list(self._tasks.items()):
|
for schedule_id, task in list(self._tasks.items()):
|
||||||
if not task.is_active:
|
if not task.is_active:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
should_trigger = await self._should_trigger_task(task, current_time)
|
should_trigger = await self._should_trigger_task(task, current_time)
|
||||||
|
|
||||||
if should_trigger:
|
if should_trigger:
|
||||||
# 执行回调
|
tasks_to_trigger.append(task)
|
||||||
await self._execute_callback(task)
|
|
||||||
|
|
||||||
# 更新任务状态
|
|
||||||
task.last_triggered_at = current_time
|
|
||||||
task.trigger_count += 1
|
|
||||||
|
|
||||||
# 如果不是循环任务,标记为删除
|
|
||||||
if not task.is_recurring:
|
|
||||||
tasks_to_remove.append(schedule_id)
|
|
||||||
logger.info(f"一次性任务 {task.task_name} 已完成,将被移除")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
|
logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
|
||||||
|
|
||||||
|
# 第二阶段:在锁外执行回调(避免死锁)
|
||||||
|
tasks_to_remove = []
|
||||||
|
|
||||||
|
for task in tasks_to_trigger:
|
||||||
|
try:
|
||||||
|
logger.debug(f"[调度器] 触发定时任务: {task.task_name}")
|
||||||
|
|
||||||
|
# 执行回调
|
||||||
|
await self._execute_callback(task)
|
||||||
|
|
||||||
# 移除已完成的一次性任务
|
# 更新任务状态
|
||||||
for schedule_id in tasks_to_remove:
|
task.last_triggered_at = current_time
|
||||||
await self._remove_task_internal(schedule_id)
|
task.trigger_count += 1
|
||||||
|
|
||||||
|
# 如果不是循环任务,标记为删除
|
||||||
|
if not task.is_recurring:
|
||||||
|
tasks_to_remove.append(task.schedule_id)
|
||||||
|
logger.info(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True)
|
||||||
|
|
||||||
|
# 第三阶段:在锁内移除已完成的任务
|
||||||
|
if tasks_to_remove:
|
||||||
|
async with self._lock:
|
||||||
|
for schedule_id in tasks_to_remove:
|
||||||
|
await self._remove_task_internal(schedule_id)
|
||||||
|
|
||||||
async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool:
|
async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool:
|
||||||
"""判断任务是否应该触发"""
|
"""判断任务是否应该触发"""
|
||||||
|
|||||||
Reference in New Issue
Block a user