refactor(distribution_manager): 优化流循环管理逻辑,减少重复代码并改进异常处理

refactor(action_manager): 将异步存储操作改为非阻塞任务,提升性能
refactor(default_generator): 简化回复生成器中的消息处理逻辑
refactor(generator_api): 更新类型提示,增强代码可读性
refactor(affinity_chatter): 清理异常处理中的冗余代码,确保处理标记的正确清理
refactor(affinity_interest_calculator): 重命名阈值调整方法,提升代码一致性
refactor(plan_executor): 移除冗余的已读消息处理逻辑
refactor(planner): 优化规划器中的异常处理,确保正常模式的退出检查
This commit is contained in:
Windpicker-owo
2025-11-12 16:08:52 +08:00
parent 66c23e1ba1
commit 80736a0deb
8 changed files with 38 additions and 78 deletions

View File

@@ -108,6 +108,17 @@ class StreamLoopManager:
Returns:
bool: 是否成功启动
"""
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return False
# 快速路径:如果流已存在且不是强制启动,无需处理
if not force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动")
return True
# 获取或创建该流的启动锁
if stream_id not in self._stream_start_locks:
self._stream_start_locks[stream_id] = asyncio.Lock()
@@ -116,17 +127,6 @@ class StreamLoopManager:
# 使用锁防止并发启动同一个流的多个循环任务
async with lock:
# 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
return False
# 快速路径:如果流已存在且不是强制启动,无需处理
if not force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动")
return True
# 如果是强制启动且任务仍在运行,先取消旧任务
if force and context.stream_loop_task and not context.stream_loop_task.done():
logger.warning(f"⚠️ [流循环] stream={stream_id[:8]}, 强制启动模式:先取消现有任务")
@@ -238,7 +238,7 @@ class StreamLoopManager:
# 3. 在处理前更新能量值(用于下次间隔计算)
try:
await self._update_stream_energy(stream_id, context)
asyncio.create_task(self._update_stream_energy(stream_id, context))
except Exception as e:
logger.debug(f"更新流能量失败 {stream_id}: {e}")
@@ -370,9 +370,6 @@ class StreamLoopManager:
if last_message:
context.triggering_user_id = last_message.user_info.user_id
# 创建子任务用于刷新能量(不阻塞主流程)
asyncio.create_task(self._refresh_focus_energy(stream_id))
# 设置 Chatter 正在处理的标志
context.is_chatter_processing = True
logger.debug(f"设置 Chatter 处理标志: {stream_id}")
@@ -388,11 +385,6 @@ class StreamLoopManager:
success = results.get("success", False)
if success:
# 处理成功后,再次刷新缓存中可能的新消息
additional_messages = await self._flush_cached_messages_to_unread(stream_id)
if additional_messages:
logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:

View File

@@ -204,8 +204,7 @@ class ChatterActionManager:
action_prompt_display=reason,
)
else:
# 改为同步等待,确保存储完成
await database_api.store_action_info(
asyncio.create_task(database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason,
@@ -213,7 +212,7 @@ class ChatterActionManager:
thinking_id=thinking_id or "",
action_data={"reason": reason},
action_name="no_reply",
)
))
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
@@ -229,9 +228,9 @@ class ChatterActionManager:
target_message,
)
# 记录执行的动作到目标消息(改为同步等待)
# 记录执行的动作到目标消息
if success:
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data))
# 重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
@@ -314,13 +313,13 @@ class ChatterActionManager:
)
# 记录回复动作到目标消息
await self._record_action_to_message(chat_stream, action_name, target_message, action_data)
asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data))
# 回复成功,重置打断计数(改为同步等待)
# 回复成功,重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
return loop_info, reply_text, cycle_timers_reply
loop_info, reply_text, cycle_timers_reply = await asyncio.create_task(_after_reply())
loop_info, reply_text, _ = await _after_reply()
return {"action_type": action_name, "success": True, "reply_text": reply_text, "loop_info": loop_info}
except Exception as e:

View File

@@ -380,8 +380,8 @@ class DefaultReplyer:
if not prompt:
logger.warning("构建prompt失败跳过回复生成")
return False, None, None
from src.plugin_system.core.event_manager import event_manager
# 触发 POST_LLM 事件(请求 LLM 之前)
if not from_plugin:
result = await event_manager.trigger_event(
@@ -1202,19 +1202,11 @@ class DefaultReplyer:
return ""
else:
# 有 reply_message正常处理
# 统一处理 DatabaseMessages 对象和字典
if isinstance(reply_message, DatabaseMessages):
platform = reply_message.chat_info.platform
user_id = reply_message.user_info.user_id
user_nickname = reply_message.user_info.user_nickname
user_cardname = reply_message.user_info.user_cardname
processed_plain_text = reply_message.processed_plain_text
else:
platform = reply_message.get("chat_info_platform")
user_id = reply_message.get("user_id")
user_nickname = reply_message.get("user_nickname")
user_cardname = reply_message.get("user_cardname")
processed_plain_text = reply_message.get("processed_plain_text")
platform = reply_message.chat_info.platform
user_id = reply_message.user_info.user_id
user_nickname = reply_message.user_info.user_nickname
user_cardname = reply_message.user_info.user_cardname
processed_plain_text = reply_message.processed_plain_text
person_id = person_info_manager.get_person_id(
platform, # type: ignore
@@ -1237,7 +1229,7 @@ class DefaultReplyer:
current_user_id = await person_info_manager.get_value(person_id, "user_id")
current_platform = platform
if current_user_id == bot_user_id and current_platform == global_config.bot.platform:
if str(current_user_id) == bot_user_id and current_platform == global_config.bot.platform:
sender = f"{person_name}(你)"
else:
# 如果不是bot自己直接使用person_name