This commit is contained in:
雅诺狐
2025-12-08 15:48:40 +08:00
parent 084192843b
commit 3edcc9d169
137 changed files with 2194 additions and 2237 deletions

View File

@@ -11,17 +11,17 @@
import asyncio
import time
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, AsyncIterator, Callable, Awaitable
from typing import TYPE_CHECKING, Any
from src.chat.chatter_manager import ChatterManager
from src.chat.energy_system import energy_manager
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.message_receive.chat_stream import get_chat_manager
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
from src.common.data_models.message_manager_data_model import StreamContext
logger = get_logger("stream_loop_manager")
@@ -36,7 +36,7 @@ logger = get_logger("stream_loop_manager")
class ConversationTick:
"""
会话事件标记 - 表示一次待处理的会话事件
这是一个轻量级的事件信号,不存储消息数据。
未读消息由 StreamContext 管理,能量值由 energy_manager 管理。
"""
@@ -61,10 +61,10 @@ async def conversation_loop(
) -> AsyncIterator[ConversationTick]:
"""
会话循环生成器 - 按需产出 Tick 事件
替代原有的无限循环任务,改为事件驱动的生成器模式。
只有调用 __anext__() 时才会执行,完全由消费者控制节奏。
Args:
stream_id: 流ID
get_context_func: 获取 StreamContext 的异步函数
@@ -72,13 +72,13 @@ async def conversation_loop(
flush_cache_func: 刷新缓存消息的异步函数
check_force_dispatch_func: 检查是否需要强制分发的函数
is_running_func: 检查是否继续运行的函数
Yields:
ConversationTick: 会话事件
"""
tick_count = 0
last_interval = None
while is_running_func():
try:
# 1. 获取流上下文
@@ -87,17 +87,17 @@ async def conversation_loop(
logger.warning(f" [生成器] stream={stream_id[:8]}, 无法获取流上下文")
await asyncio.sleep(10.0)
continue
# 2. 刷新缓存消息到未读列表
await flush_cache_func(stream_id)
# 3. 检查是否有消息需要处理
unread_messages = context.get_unread_messages()
unread_count = len(unread_messages) if unread_messages else 0
# 4. 检查是否需要强制分发
force_dispatch = check_force_dispatch_func(context, unread_count)
# 5. 如果有消息,产出 Tick
if unread_count > 0 or force_dispatch:
tick_count += 1
@@ -106,18 +106,18 @@ async def conversation_loop(
force_dispatch=force_dispatch,
tick_count=tick_count,
)
# 6. 计算并等待下次检查间隔
has_messages = unread_count > 0
interval = await calculate_interval_func(stream_id, has_messages)
# 只在间隔发生变化时输出日志
if last_interval is None or abs(interval - last_interval) > 0.01:
logger.debug(f"[生成器] stream={stream_id[:8]}, 等待间隔: {interval:.2f}s")
last_interval = interval
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.info(f" [生成器] stream={stream_id[:8]}, 被取消")
break
@@ -137,16 +137,16 @@ async def run_chat_stream(
) -> None:
"""
聊天流驱动器 - 消费 Tick 事件并调用 Chatter
替代原有的 _stream_loop_worker结构更清晰。
Args:
stream_id: 流ID
manager: StreamLoopManager 实例
"""
task_id = id(asyncio.current_task())
logger.debug(f" [驱动器] stream={stream_id[:8]}, 任务ID={task_id}, 启动")
try:
# 创建生成器
tick_generator = conversation_loop(
@@ -157,7 +157,7 @@ async def run_chat_stream(
check_force_dispatch_func=manager._needs_force_dispatch_for_context,
is_running_func=lambda: manager.is_running,
)
# 消费 Tick 事件
async for tick in tick_generator:
try:
@@ -165,7 +165,7 @@ async def run_chat_stream(
context = await manager._get_stream_context(stream_id)
if not context:
continue
# 并发保护:检查是否正在处理
if context.is_chatter_processing:
if manager._recover_stale_chatter_state(stream_id, context):
@@ -173,19 +173,19 @@ async def run_chat_stream(
else:
logger.debug(f" [驱动器] stream={stream_id[:8]}, Chatter正在处理跳过此Tick")
continue
# 日志
if tick.force_dispatch:
logger.info(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 强制分发")
else:
logger.debug(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 开始处理")
# 更新能量值
try:
await manager._update_stream_energy(stream_id, context)
except Exception as e:
logger.debug(f"更新能量失败: {e}")
# 处理消息
assert global_config is not None
try:
@@ -196,7 +196,7 @@ async def run_chat_stream(
except asyncio.TimeoutError:
logger.warning(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 处理超时")
success = False
# 更新统计
manager.stats["total_process_cycles"] += 1
if success:
@@ -205,13 +205,13 @@ async def run_chat_stream(
else:
manager.stats["total_failures"] += 1
logger.debug(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 处理失败")
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f" [驱动器] stream={stream_id[:8]}, 处理Tick时出错: {e}")
manager.stats["total_failures"] += 1
except asyncio.CancelledError:
logger.info(f" [驱动器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消")
finally:
@@ -233,7 +233,7 @@ async def run_chat_stream(
class StreamLoopManager:
"""
流循环管理器 - 基于 Generator + Tick 的事件驱动模式
管理所有聊天流的生命周期,为每个流创建独立的驱动器任务。
"""
@@ -321,11 +321,11 @@ class StreamLoopManager:
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
"""
启动指定流的驱动器任务
Args:
stream_id: 流ID
force: 是否强制启动(会先取消现有任务)
Returns:
bool: 是否成功启动
"""
@@ -379,10 +379,10 @@ class StreamLoopManager:
async def stop_stream_loop(self, stream_id: str) -> bool:
"""
停止指定流的驱动器任务
Args:
stream_id: 流ID
Returns:
bool: 是否成功停止
"""
@@ -446,11 +446,11 @@ class StreamLoopManager:
async def _process_stream_messages(self, stream_id: str, context: "StreamContext") -> bool:
"""
处理流消息
Args:
stream_id: 流ID
context: 流上下文
Returns:
bool: 是否处理成功
"""
@@ -468,7 +468,7 @@ class StreamLoopManager:
chatter_task = None
try:
start_time = time.time()
# 检查未读消息
unread_messages = context.get_unread_messages()
if not unread_messages:
@@ -521,7 +521,7 @@ class StreamLoopManager:
logger.warning(f"处理失败: {stream_id} - {results.get('error_message', '未知错误')}")
return success
except asyncio.CancelledError:
if chatter_task and not chatter_task.done():
chatter_task.cancel()
@@ -557,7 +557,7 @@ class StreamLoopManager:
# 检查是否有消息提及 Bot
bot_name = getattr(global_config.bot, "nickname", "")
bot_aliases = getattr(global_config.bot, "alias_names", [])
mention_keywords = [bot_name] + list(bot_aliases) if bot_name else list(bot_aliases)
mention_keywords = [bot_name, *list(bot_aliases)] if bot_name else list(bot_aliases)
mention_keywords = [k for k in mention_keywords if k]
for msg in unread_messages: