refactor(KFC): 从独立调度器迁移到统一调度器适配器

此提交重构了 Kokoro Flow Chatter (KFC) 插件,以集成新的 `UnifiedScheduler`。独立的 `scheduler.py` 已被移除,并由 `kfc_scheduler_adapter.py` 替代,它作为中央调度系统的适配器。

主要更改包括:
- 移除专用的 `BackgroundScheduler`,改用全局的 `UnifiedScheduler`。
- 实现 `KFCSchedulerAdapter` 来处理检查等待会话的逻辑。
- 修复超时处理逻辑中的一个关键错误:回调函数在任意 chatter 实例上执行时使用了错误的 `stream_id`。现在它正确使用 `session.stream_id`,确保消息发送给正确的用户。
- 通过使用 `asyncio.create_task` 并行处理会话来改进会话检查过程,防止一个长时间运行的会话阻塞其他会话。

此外,此提交还包括提示和操作描述的小幅改进为了更清晰和更稳健的改进。
This commit is contained in:
tt-P607
2025-11-29 16:32:10 +08:00
parent c4583e61d1
commit 49db3bd138
6 changed files with 98 additions and 453 deletions

View File

@@ -427,16 +427,23 @@ class KokoroFlowChatter(BaseChatter):
# 返回一个默认的JSON响应
return '{"thought": "出现了技术问题", "expected_user_reaction": "", "max_wait_seconds": 60, "actions": [{"type": "do_nothing"}]}'
async def _get_chat_stream(self):
"""获取聊天流对象"""
async def _get_chat_stream(self, stream_id: Optional[str] = None):
"""
获取聊天流对象
Args:
stream_id: 可选的stream_id若不提供则使用self.stream_id
在超时回调中应使用session.stream_id以避免发送到错误的用户
"""
target_stream_id = stream_id or self.stream_id
try:
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
if chat_manager:
return await chat_manager.get_stream(self.stream_id)
return await chat_manager.get_stream(target_stream_id)
except Exception as e:
logger.warning(f"[KFC] 获取chat_stream失败: {e}")
logger.warning(f"[KFC] 获取chat_stream失败 (stream_id={target_stream_id}): {e}")
return None
async def _on_session_timeout(self, session: KokoroSession) -> None:
@@ -445,15 +452,23 @@ class KokoroFlowChatter(BaseChatter):
当等待超时时,触发后续决策流程
注意此回调由全局调度器触发可能会在任意Chatter实例上执行。
因此必须使用session.stream_id而非self.stream_id来确保消息发送给正确的用户。
Args:
session: 超时的会话
"""
logger.info(f"[KFC] 处理超时决策: user={session.user_id}")
logger.info(f"[KFC] 处理超时决策: user={session.user_id}, stream_id={session.stream_id}")
self.stats["timeout_decisions"] += 1
try:
# 关键修复:使用 session 的 stream_id 创建正确的 ActionExecutor
# 因为全局调度器的回调可能在任意 Chatter 实例上执行
from .action_executor import ActionExecutor
timeout_action_executor = ActionExecutor(session.stream_id)
# V2: 加载可用动作
available_actions = await self.action_executor.load_actions()
available_actions = await timeout_action_executor.load_actions()
# 生成超时决策提示词V2: 传递可用动作)
system_prompt, user_prompt = self.prompt_generator.generate_timeout_decision_prompt(
@@ -466,11 +481,11 @@ class KokoroFlowChatter(BaseChatter):
self.stats["llm_calls"] += 1
# 解析响应
parsed_response = self.action_executor.parse_llm_response(llm_response)
parsed_response = timeout_action_executor.parse_llm_response(llm_response)
# 执行动作
chat_stream = await self._get_chat_stream()
execution_result = await self.action_executor.execute_actions(
# 关键修复:使用 session.stream_id 获取正确的 chat_stream
chat_stream = await self._get_chat_stream(session.stream_id)
execution_result = await timeout_action_executor.execute_actions(
parsed_response,
session,
chat_stream

View File

@@ -10,6 +10,7 @@ Kokoro Flow Chatter 调度器适配器
3. 与 UnifiedScheduler 的集成
"""
import asyncio
import time
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional
@@ -118,7 +119,10 @@ class KFCSchedulerAdapter:
self._schedule_id = None
async def _check_waiting_sessions(self) -> None:
"""检查所有等待中的会话(由 UnifiedScheduler 调用)"""
"""检查所有等待中的会话(由 UnifiedScheduler 调用)
优化:使用 asyncio.create_task 并行处理多个会话,避免顺序阻塞
"""
session_manager = get_session_manager()
waiting_sessions = await session_manager.get_all_waiting_sessions()
@@ -128,11 +132,31 @@ class KFCSchedulerAdapter:
if not waiting_sessions:
return
# 并行处理所有等待中的会话,避免一个会话阻塞其他会话
tasks = []
for session in waiting_sessions:
try:
await self._process_waiting_session(session)
except Exception as e:
logger.error(f"处理等待会话 {session.user_id} 时出错: {e}")
task = asyncio.create_task(
self._safe_process_waiting_session(session),
name=f"kfc_session_check_{session.user_id}"
)
tasks.append(task)
# 等待所有任务完成,但每个任务都有独立的异常处理
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _safe_process_waiting_session(self, session: KokoroSession) -> None:
"""安全地处理等待会话,带有超时保护"""
try:
# 给每个会话处理设置 60 秒超时LLM 调用可能需要较长时间)
await asyncio.wait_for(
self._process_waiting_session(session),
timeout=60.0
)
except asyncio.TimeoutError:
logger.error(f"处理等待会话 {session.user_id} 超时60秒")
except Exception as e:
logger.error(f"处理等待会话 {session.user_id} 时出错: {e}")
async def _process_waiting_session(self, session: KokoroSession) -> None:
"""

View File

@@ -121,7 +121,7 @@ class KokoroFlowChatterPlugin(BasePlugin):
)
# 初始化调度器
from .scheduler import initialize_scheduler
from .kfc_scheduler_adapter import initialize_scheduler
# 从 global_config 读取配置
check_interval = 10.0
@@ -153,7 +153,7 @@ class KokoroFlowChatterPlugin(BasePlugin):
# 停止调度器
if self._scheduler:
from .scheduler import shutdown_scheduler
from .kfc_scheduler_adapter import shutdown_scheduler
await shutdown_scheduler()
self._scheduler = None

View File

@@ -95,7 +95,10 @@ def build_rules_module() -> str:
2. **感受优先**:在做出任何回应前,先以""的方式体验和感受当前的情境
3. **真实的内心活动**:保持真实的内心独白,可以有情绪、猜测、犹豫、甚至不想搭理人的时候
4. **自然的社交距离**:像真人一样有自己的边界感,不会无条件迎合,也不会刻意疏远
5. **有自己的生活**:你有自己的事情要做、自己的兴趣爱好、自己的情绪起伏,不是随叫随到的存在"""
5. **有自己的生活**:你有自己的事情要做、自己的兴趣爱好、自己的情绪起伏,不是随叫随到的存在
### 严禁事项
- **禁止模仿[表情包xxx]格式**:聊天记录中的"[表情包:描述]"是系统对图片的识别转述,不是发送表情包的方式!如果你想发表情包,必须使用 `emoji` 动作而不是在reply内容中写"[表情包xxx]"这种假的格式"""
return f"""## 2. 核心行为准则 (Core Interaction Rules)
{safety_guidelines_str}
@@ -321,7 +324,7 @@ def build_output_module(
reply_style = global_config.personality.reply_style or ""
expression_habits = context_data.get("expression_habits", "")
# JSON 输出格式说明 - 强调 max_wait_seconds 可以为 0
# JSON 输出格式说明 - 强调 max_wait_seconds 的多种用途
json_format = """### JSON输出格式要求
你必须以JSON格式输出包含以下字段
@@ -344,9 +347,35 @@ def build_output_module(
- `thought` 是你的"",应该是自然的内心想法,包含情绪和不确定性
- `actions` 是你的"",是一个**可编排的剧本**,可以组合多个行动形成完整的互动体验
- 你可以根据情况组合多个动作,比如:先唱歌舒缓情绪,再发图片逗笑,最后用语音表达关心
- `max_wait_seconds` 应该根据对话的重要性和氛围动态调整上限900秒
- **重要**: 当话题已经自然结束、用户说"拜拜/晚安/再见"、或者你认为不需要继续等待用户回复时,设为 **0**
- 即使决定不做任何事,也要有 `{"type": "do_nothing"}` 动作"""
- 即使决定不做任何事,也要有 `{"type": "do_nothing"}` 动作
### `max_wait_seconds`:你的"短期思考窗口"
这个字段设定一个时间窗口,在这段时间内如果用户没有新消息,你会被再次唤醒。
把它理解为"我想在X秒后再想想这件事"——一个短期的主动思考机会。
**场景1定时任务/提醒**
用户说"两分钟后提醒我""过一会儿叫我" → 设置对应秒数,超时后执行提醒
**场景2期待用户回复**
你发了消息,想等用户回复 → 根据话题热度设置等待时间通常60-300秒
超时后你可以:追问、换个话题、或者决定不打扰
**场景3延续思考**
聊着聊着你突然想到什么,但现在不适合说 → 设置一个等待时间
超时后你可以分享那个想法,或者已经不重要了就算了
**何时设为 0**
- 话题自然结束(拜拜/晚安/再见)
- 你不打算继续这个对话
- 长时间的主动陪伴交给其他系统处理,不需要在这里等太久
**超时后你会怎样?**
超时后你会被唤醒,收到"等待超时"的提示。此时你可以自由决定:
- 执行之前承诺的任务(如提醒)
- 主动找话题聊聊
- 什么都不做do_nothing
请在 `thought` 中说明你设置这个时间的意图,这样超时后你能记得自己想干嘛。"""
parts = ["## 6. 表达方式与输出格式 (Expression Style & Output Format)"]

View File

@@ -1,424 +0,0 @@
"""
Kokoro Flow Chatter 后台调度器
负责处理等待状态的计时和超时决策,实现"连续体验"的核心功能:
- 定期检查等待中的会话
- 触发连续思考更新
- 处理等待超时事件
"""
import asyncio
import time
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional
from src.common.logger import get_logger
from .models import (
KokoroSession,
MentalLogEntry,
MentalLogEventType,
SessionStatus,
)
from .session_manager import get_session_manager
if TYPE_CHECKING:
from .chatter import KokoroFlowChatter
logger = get_logger("kokoro_scheduler")
class BackgroundScheduler:
"""
Kokoro Flow Chatter 后台调度器
核心功能:
1. 定期检查处于WAITING状态的会话
2. 在特定时间点触发"连续思考"
3. 处理等待超时并触发决策
4. 管理后台任务的生命周期
"""
# 连续思考触发点(等待进度的百分比)
CONTINUOUS_THINKING_TRIGGERS = [0.3, 0.6, 0.85]
def __init__(
self,
check_interval: float = 10.0,
on_timeout_callback: Optional[Callable[[KokoroSession], Coroutine[Any, Any, None]]] = None,
on_continuous_thinking_callback: Optional[Callable[[KokoroSession], Coroutine[Any, Any, None]]] = None,
):
"""
初始化后台调度器
Args:
check_interval: 检查间隔(秒)
on_timeout_callback: 超时回调函数
on_continuous_thinking_callback: 连续思考回调函数
"""
self.check_interval = check_interval
self.on_timeout_callback = on_timeout_callback
self.on_continuous_thinking_callback = on_continuous_thinking_callback
self._running = False
self._check_task: Optional[asyncio.Task] = None
self._pending_tasks: set[asyncio.Task] = set()
# 统计信息
self._stats = {
"total_checks": 0,
"timeouts_triggered": 0,
"continuous_thinking_triggered": 0,
"last_check_time": 0.0,
}
logger.info("BackgroundScheduler 初始化完成")
async def start(self) -> None:
"""启动调度器"""
if self._running:
logger.warning("调度器已在运行中")
return
self._running = True
self._check_task = asyncio.create_task(self._check_loop())
logger.info("BackgroundScheduler 已启动")
async def stop(self) -> None:
"""停止调度器"""
self._running = False
# 取消主检查任务
if self._check_task:
self._check_task.cancel()
try:
await self._check_task
except asyncio.CancelledError:
pass
# 取消所有待处理任务
for task in self._pending_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self._pending_tasks.clear()
logger.info("BackgroundScheduler 已停止")
async def _check_loop(self) -> None:
"""主检查循环"""
while self._running:
try:
await self._check_waiting_sessions()
self._stats["last_check_time"] = time.time()
self._stats["total_checks"] += 1
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"检查循环出错: {e}")
await asyncio.sleep(self.check_interval)
async def _check_waiting_sessions(self) -> None:
"""检查所有等待中的会话"""
session_manager = get_session_manager()
waiting_sessions = await session_manager.get_all_waiting_sessions()
if not waiting_sessions:
return
for session in waiting_sessions:
try:
await self._process_waiting_session(session)
except Exception as e:
logger.error(f"处理等待会话 {session.user_id} 时出错: {e}")
async def _process_waiting_session(self, session: KokoroSession) -> None:
"""
处理单个等待中的会话
Args:
session: 等待中的会话
"""
if session.status != SessionStatus.WAITING:
return
if session.waiting_since is None:
return
wait_duration = session.get_waiting_duration()
max_wait = session.max_wait_seconds
# 检查是否超时
if session.is_wait_timeout():
logger.info(f"会话 {session.user_id} 等待超时,触发决策")
await self._handle_timeout(session)
return
# 检查是否需要触发连续思考
wait_progress = wait_duration / max_wait if max_wait > 0 else 0
for trigger_point in self.CONTINUOUS_THINKING_TRIGGERS:
# 检查是否刚刚经过这个触发点
if self._should_trigger_continuous_thinking(
session,
wait_progress,
trigger_point
):
logger.debug(
f"会话 {session.user_id} 触发连续思考 "
f"(进度: {wait_progress:.1%}, 触发点: {trigger_point:.1%})"
)
await self._handle_continuous_thinking(session, wait_progress)
break
def _should_trigger_continuous_thinking(
self,
session: KokoroSession,
current_progress: float,
trigger_point: float,
) -> bool:
"""
判断是否应该触发连续思考
逻辑:
- 当前进度刚刚超过触发点
- 距离上次连续思考有足够间隔
- 还没有达到该触发点对应的思考次数
"""
# 已经超过了这个触发点
if current_progress < trigger_point:
return False
# 计算当前应该触发的思考次数
expected_count = sum(
1 for tp in self.CONTINUOUS_THINKING_TRIGGERS
if current_progress >= tp
)
# 如果还没达到预期的思考次数,触发一次
if session.continuous_thinking_count < expected_count:
# 确保间隔足够至少30秒
if session.last_continuous_thinking_at is None:
return True
time_since_last = time.time() - session.last_continuous_thinking_at
return time_since_last >= 30.0
return False
async def _handle_timeout(self, session: KokoroSession) -> None:
"""
处理等待超时
Args:
session: 超时的会话
"""
self._stats["timeouts_triggered"] += 1
# 更新会话状态
session.status = SessionStatus.FOLLOW_UP_PENDING
session.emotional_state.anxiety_level = 0.8 # 超时时焦虑程度较高
# 添加超时日志
timeout_entry = MentalLogEntry(
event_type=MentalLogEventType.TIMEOUT_DECISION,
timestamp=time.time(),
thought=f"等了{session.max_wait_seconds}秒了,对方还是没有回复...",
content="等待超时",
emotional_snapshot=session.emotional_state.to_dict(),
)
session.add_mental_log_entry(timeout_entry)
# 保存会话状态
session_manager = get_session_manager()
await session_manager.save_session(session.user_id)
# 调用超时回调
if self.on_timeout_callback:
task = asyncio.create_task(self._run_callback_safe(
self.on_timeout_callback,
session,
"timeout"
))
self._pending_tasks.add(task)
task.add_done_callback(self._pending_tasks.discard)
async def _handle_continuous_thinking(
self,
session: KokoroSession,
wait_progress: float,
) -> None:
"""
处理连续思考
Args:
session: 会话
wait_progress: 等待进度
"""
self._stats["continuous_thinking_triggered"] += 1
# 更新焦虑程度
session.emotional_state.update_anxiety_over_time(
session.get_waiting_duration(),
session.max_wait_seconds
)
# 更新连续思考计数
session.continuous_thinking_count += 1
session.last_continuous_thinking_at = time.time()
# 生成基于进度的内心想法
thought = self._generate_waiting_thought(session, wait_progress)
# 添加连续思考日志
thinking_entry = MentalLogEntry(
event_type=MentalLogEventType.CONTINUOUS_THINKING,
timestamp=time.time(),
thought=thought,
content="",
emotional_snapshot=session.emotional_state.to_dict(),
metadata={"wait_progress": wait_progress},
)
session.add_mental_log_entry(thinking_entry)
# 保存会话状态
session_manager = get_session_manager()
await session_manager.save_session(session.user_id)
# 调用连续思考回调如果需要LLM生成更自然的想法
if self.on_continuous_thinking_callback:
task = asyncio.create_task(self._run_callback_safe(
self.on_continuous_thinking_callback,
session,
"continuous_thinking"
))
self._pending_tasks.add(task)
task.add_done_callback(self._pending_tasks.discard)
def _generate_waiting_thought(
self,
session: KokoroSession,
wait_progress: float,
) -> str:
"""
生成等待中的内心想法简单版本不调用LLM
Args:
session: 会话
wait_progress: 等待进度
Returns:
str: 内心想法
"""
wait_seconds = session.get_waiting_duration()
wait_minutes = wait_seconds / 60
if wait_progress < 0.4:
thoughts = [
f"已经等了{wait_minutes:.1f}分钟了,对方可能在忙吧...",
f"嗯...{wait_minutes:.1f}分钟过去了,不知道对方在做什么",
"对方好像还没看到消息,再等等吧",
]
elif wait_progress < 0.7:
thoughts = [
f"等了{wait_minutes:.1f}分钟了,有点担心对方是不是不想回了",
f"{wait_minutes:.1f}分钟了,对方可能真的很忙?",
"时间过得好慢啊...不知道对方什么时候会回复",
]
else:
thoughts = [
f"已经等了{wait_minutes:.1f}分钟了,感觉有点焦虑...",
f"{wait_minutes:.0f}分钟了,对方是不是忘记回复了?",
"等了这么久,要不要主动说点什么呢...",
]
import random
return random.choice(thoughts)
async def _run_callback_safe(
self,
callback: Callable[[KokoroSession], Coroutine[Any, Any, None]],
session: KokoroSession,
callback_type: str,
) -> None:
"""安全地运行回调函数"""
try:
await callback(session)
except Exception as e:
logger.error(f"执行{callback_type}回调时出错 (user={session.user_id}): {e}")
def set_timeout_callback(
self,
callback: Callable[[KokoroSession], Coroutine[Any, Any, None]],
) -> None:
"""设置超时回调函数"""
self.on_timeout_callback = callback
def set_continuous_thinking_callback(
self,
callback: Callable[[KokoroSession], Coroutine[Any, Any, None]],
) -> None:
"""设置连续思考回调函数"""
self.on_continuous_thinking_callback = callback
def get_stats(self) -> dict[str, Any]:
"""获取统计信息"""
return {
**self._stats,
"is_running": self._running,
"pending_tasks": len(self._pending_tasks),
"check_interval": self.check_interval,
}
@property
def is_running(self) -> bool:
"""调度器是否正在运行"""
return self._running
# 全局调度器实例
_scheduler: Optional[BackgroundScheduler] = None
def get_scheduler() -> BackgroundScheduler:
"""获取全局调度器实例"""
global _scheduler
if _scheduler is None:
_scheduler = BackgroundScheduler()
return _scheduler
async def initialize_scheduler(
check_interval: float = 10.0,
on_timeout_callback: Optional[Callable[[KokoroSession], Coroutine[Any, Any, None]]] = None,
on_continuous_thinking_callback: Optional[Callable[[KokoroSession], Coroutine[Any, Any, None]]] = None,
) -> BackgroundScheduler:
"""
初始化并启动调度器
Args:
check_interval: 检查间隔
on_timeout_callback: 超时回调
on_continuous_thinking_callback: 连续思考回调
Returns:
BackgroundScheduler: 调度器实例
"""
global _scheduler
_scheduler = BackgroundScheduler(
check_interval=check_interval,
on_timeout_callback=on_timeout_callback,
on_continuous_thinking_callback=on_continuous_thinking_callback,
)
await _scheduler.start()
return _scheduler
async def shutdown_scheduler() -> None:
"""关闭调度器"""
global _scheduler
if _scheduler:
await _scheduler.stop()
_scheduler = None

View File

@@ -62,7 +62,7 @@ class TTSVoiceAction(BaseAction):
"""
action_name = "tts_voice_action"
action_description = "将你生成好的文本转换为语音并发送。你必须提供要转换的文本。"
action_description = "将你生成好的文本转换为语音并发送。注意:这是纯语音合成,只能说话,不能唱歌!"
mode_enable = ChatMode.ALL
parallel_action = False
@@ -70,7 +70,7 @@ class TTSVoiceAction(BaseAction):
action_parameters: ClassVar[dict] = {
"tts_voice_text": {
"type": "string",
"description": "需要转换为语音并发送的完整、自然、适合口语的文本内容。",
"description": "需要转换为语音并发送的完整、自然、适合口语的文本内容。注意:只能是说话内容,不能是歌词或唱歌!",
"required": True
},
"voice_style": {
@@ -100,14 +100,15 @@ class TTSVoiceAction(BaseAction):
}
action_require: ClassVar[list] = [
"在调用此动作时,你必须在 'text' 参数中提供要合成语音的完整回复内容。这是强制性的。",
"当用户明确请求使用语音进行回复时,例如‘发个语音听听’、‘用语音说’等",
"【核心限制】此动作只能用于说话绝对不能用于唱歌TTS无法发出有音调的歌声只会输出平淡的念白。如果用户要求唱歌不要使用此动作",
"在调用此动作时,你必须在 'tts_voice_text' 参数中提供要合成语音的完整回复内容。这是强制性的",
"当用户明确请求使用语音进行回复时,例如'发个语音听听''用语音说'等。",
"当对话内容适合用语音表达,例如讲故事、念诗、撒嬌或进行角色扮演时。",
"在表达特殊情感(如安慰、鼓励、庆祝)的场景下,可以主动使用语音来增强感染力。",
"不要在日常的、简短的问答或闲聊中频繁使用语音,避免打扰用户。",
"提供的 'text' 内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)''[歪头]'",
"**重要**:此动作专为语音合成设计,因此 'text' 参数的内容必须是纯净、标准的口语文本。请务必抑制你通常的、富有表现力的文本风格,不要使用任何辅助聊天或增强视觉效果的特殊符号(例如 '', '', '', '' 等),因为它们无法被正确合成为语音。",
"【**最终规则**】'text' 参数中,所有句子和停顿【必须】使用且只能使用以下四个标准标点符号:'' (逗号)、'' (句号)、'' (问号)、'' (叹号)。任何其他符号,特别是 '...''' 以及任何表情符号或装饰性符号,都【严禁】出现,否则将导致语音合成严重失败。"
"提供的 'tts_voice_text' 内容必须是纯粹的对话,不能包含任何括号或方括号括起来的动作、表情、或场景描述(例如,不要出现 '(笑)''[歪头]'",
"**重要**:此动作专为语音合成设计,因此 'tts_voice_text' 参数的内容必须是纯净、标准的口语文本。请务必抑制你通常的、富有表现力的文本风格,不要使用任何辅助聊天或增强视觉效果的特殊符号(例如 '', '', '', '' 等),因为它们无法被正确合成为语音。",
"【**最终规则**】'tts_voice_text' 参数中,所有句子和停顿【必须】使用且只能使用以下四个标准标点符号:'' (逗号)、'' (句号)、'' (问号)、'' (叹号)。任何其他符号,特别是 '...''' 以及任何表情符号或装饰性符号,都【严禁】出现,否则将导致语音合成严重失败。"
]
def __init__(self, *args, **kwargs):