feat: 使用提示管理和会话处理来实现Kokoro Flow Chatter V2

- 在Kokoro Flow Chatter V2中添加提示模块以管理提示信息。
- 创建一个构建器,用于根据用户交互和会话上下文构建提示。
- 为不同场景(新消息、及时回复等)注册各种提示模板。
- 开发一个回复模块,使用LLM API生成回复。
- 实现会话管理以处理用户交互并维护状态。
- 引入心理日志条目以追踪用户与机器人的交互情况。
- 确保各模块中都有适当的日志记录和错误处理。
This commit is contained in:
Windpicker-owo
2025-11-30 13:05:26 +08:00
parent 4245228cb7
commit 0fe15dac52
13 changed files with 3155 additions and 0 deletions

View File

@@ -0,0 +1,66 @@
"""
Kokoro Flow Chatter V2 - 私聊特化的心流聊天器
重构版本,核心设计理念:
1. Chatter 职责极简化:只负责"收到消息 → 规划执行"
2. Session 状态简化:只有 IDLE 和 WAITING 两种状态
3. 独立的 Replyer专属的提示词构建和 LLM 交互
4. 独立的主动思考器:负责等待管理和主动发起
5. 大模板 + 小模板:线性叙事风格的提示词架构
"""
from .models import (
EventType,
SessionStatus,
MentalLogEntry,
WaitingConfig,
ActionModel,
LLMResponse,
)
from .session import KokoroSession, SessionManager, get_session_manager
from .chatter import KokoroFlowChatterV2
from .replyer import generate_response
from .action_executor import ActionExecutor
from .proactive_thinker import (
ProactiveThinker,
get_proactive_thinker,
start_proactive_thinker,
stop_proactive_thinker,
)
from .config import (
KokoroFlowChatterV2Config,
get_config,
load_config,
reload_config,
)
from .plugin import KokoroFlowChatterV2Plugin
__all__ = [
# Models
"EventType",
"SessionStatus",
"MentalLogEntry",
"WaitingConfig",
"ActionModel",
"LLMResponse",
# Session
"KokoroSession",
"SessionManager",
"get_session_manager",
# Core Components
"KokoroFlowChatterV2",
"generate_response",
"ActionExecutor",
# Proactive Thinker
"ProactiveThinker",
"get_proactive_thinker",
"start_proactive_thinker",
"stop_proactive_thinker",
# Config
"KokoroFlowChatterV2Config",
"get_config",
"load_config",
"reload_config",
# Plugin
"KokoroFlowChatterV2Plugin",
]

View File

@@ -0,0 +1,228 @@
"""
Kokoro Flow Chatter V2 - 动作执行器
负责执行 LLM 决策的动作
"""
import asyncio
import time
from typing import TYPE_CHECKING, Any, Optional
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.common.logger import get_logger
from src.plugin_system.apis import send_api
from .models import ActionModel, LLMResponse
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger("kfc_v2_action_executor")
class ActionExecutor:
"""
动作执行器
职责:
- 执行 reply、poke_user 等动作
- 通过 ActionManager 执行动态注册的动作
"""
# 内置动作(不通过 ActionManager
BUILTIN_ACTIONS = {"reply", "do_nothing"}
def __init__(self, stream_id: str):
self.stream_id = stream_id
self._action_manager = ChatterActionManager()
self._available_actions: dict = {}
# 统计
self._stats = {
"total_executed": 0,
"successful": 0,
"failed": 0,
}
async def load_actions(self) -> dict:
"""加载可用动作"""
await self._action_manager.load_actions(self.stream_id)
self._available_actions = self._action_manager.get_using_actions()
logger.debug(f"[ActionExecutor] 加载了 {len(self._available_actions)} 个动作")
return self._available_actions
def get_available_actions(self) -> dict:
"""获取可用动作"""
return self._available_actions.copy()
async def execute(
self,
response: LLMResponse,
chat_stream: Optional["ChatStream"],
) -> dict[str, Any]:
"""
执行动作列表
Args:
response: LLM 响应
chat_stream: 聊天流
Returns:
执行结果
"""
results = []
has_reply = False
reply_content = ""
for action in response.actions:
try:
result = await self._execute_action(action, chat_stream)
results.append(result)
if result.get("success"):
self._stats["successful"] += 1
if action.type in ("reply", "respond"):
has_reply = True
reply_content = action.params.get("content", "")
else:
self._stats["failed"] += 1
except Exception as e:
logger.error(f"[ActionExecutor] 执行动作失败 {action.type}: {e}")
results.append({
"action_type": action.type,
"success": False,
"error": str(e),
})
self._stats["failed"] += 1
self._stats["total_executed"] += 1
return {
"success": all(r.get("success", False) for r in results),
"results": results,
"has_reply": has_reply,
"reply_content": reply_content,
}
async def _execute_action(
self,
action: ActionModel,
chat_stream: Optional["ChatStream"],
) -> dict[str, Any]:
"""执行单个动作"""
action_type = action.type
if action_type == "reply":
return await self._execute_reply(action, chat_stream)
elif action_type == "do_nothing":
logger.debug("[ActionExecutor] 执行 do_nothing")
return {"action_type": "do_nothing", "success": True}
elif action_type == "poke_user":
return await self._execute_via_manager(action, chat_stream)
elif action_type in self._available_actions:
return await self._execute_via_manager(action, chat_stream)
else:
logger.warning(f"[ActionExecutor] 未知动作类型: {action_type}")
return {
"action_type": action_type,
"success": False,
"error": f"未知动作类型: {action_type}",
}
async def _execute_reply(
self,
action: ActionModel,
chat_stream: Optional["ChatStream"],
) -> dict[str, Any]:
"""执行回复动作"""
content = action.params.get("content", "")
if not content:
return {
"action_type": "reply",
"success": False,
"error": "回复内容为空",
}
try:
# 消息后处理(分割、错别字等)
processed_messages = await self._process_reply_content(content)
all_success = True
for msg in processed_messages:
success = await send_api.text_to_stream(
text=msg,
stream_id=self.stream_id,
typing=True,
)
if not success:
all_success = False
return {
"action_type": "reply",
"success": all_success,
"reply_text": content,
}
except Exception as e:
logger.error(f"[ActionExecutor] 发送回复失败: {e}")
return {
"action_type": "reply",
"success": False,
"error": str(e),
}
async def _process_reply_content(self, content: str) -> list[str]:
"""处理回复内容(分割、错别字等)"""
try:
# 复用 v1 的后处理器
from src.plugins.built_in.kokoro_flow_chatter.response_post_processor import (
process_reply_content,
)
messages = await process_reply_content(content)
return messages if messages else [content]
except Exception as e:
logger.warning(f"[ActionExecutor] 消息处理失败,使用原始内容: {e}")
return [content]
async def _execute_via_manager(
self,
action: ActionModel,
chat_stream: Optional["ChatStream"],
) -> dict[str, Any]:
"""通过 ActionManager 执行动作"""
try:
result = await self._action_manager.execute_action(
action_name=action.type,
chat_id=self.stream_id,
target_message=None,
reasoning=f"KFC决策: {action.type}",
action_data=action.params,
thinking_id=None,
log_prefix="[KFC V2]",
)
return {
"action_type": action.type,
"success": result.get("success", False),
"result": result,
}
except Exception as e:
logger.error(f"[ActionExecutor] ActionManager 执行失败: {e}")
return {
"action_type": action.type,
"success": False,
"error": str(e),
}
def get_stats(self) -> dict:
"""获取统计信息"""
return self._stats.copy()

View File

@@ -0,0 +1,263 @@
"""
Kokoro Flow Chatter V2 - Chatter 主类
极简设计,只负责:
1. 收到消息
2. 调用 Replyer 生成响应
3. 执行动作
4. 更新 Session
"""
import asyncio
import time
from typing import TYPE_CHECKING, Any, ClassVar, Optional
from src.common.data_models.message_manager_data_model import StreamContext
from src.common.logger import get_logger
from src.plugin_system.base.base_chatter import BaseChatter
from src.plugin_system.base.component_types import ChatType
from .action_executor import ActionExecutor
from .models import EventType, SessionStatus
from .replyer import generate_response
from .session import get_session_manager
if TYPE_CHECKING:
from src.chat.planner_actions.action_manager import ChatterActionManager
from src.common.data_models.database_data_model import DatabaseMessages
logger = get_logger("kfc_v2_chatter")
# 控制台颜色
SOFT_PURPLE = "\033[38;5;183m"
RESET = "\033[0m"
class KokoroFlowChatterV2(BaseChatter):
"""
Kokoro Flow Chatter V2 - 私聊特化的心流聊天器
核心设计:
- Chatter 只负责 "收到消息 → 规划执行" 的流程
- 无论 Session 之前是什么状态,流程都一样
- 区别只体现在提示词中
不负责:
- 等待超时处理(由 ProactiveThinker 负责)
- 连续思考(由 ProactiveThinker 负责)
- 主动发起对话(由 ProactiveThinker 负责)
"""
chatter_name: str = "KokoroFlowChatterV2"
chatter_description: str = "心流聊天器 V2 - 私聊特化的深度情感交互处理器"
chat_types: ClassVar[list[ChatType]] = [ChatType.PRIVATE]
def __init__(
self,
stream_id: str,
action_manager: "ChatterActionManager",
plugin_config: dict | None = None,
):
super().__init__(stream_id, action_manager, plugin_config)
# 核心组件
self.session_manager = get_session_manager()
self.action_executor = ActionExecutor(stream_id)
# 并发控制
self._lock = asyncio.Lock()
self._processing = False
# 统计
self._stats = {
"messages_processed": 0,
"successful_responses": 0,
"failed_responses": 0,
}
logger.info(f"{SOFT_PURPLE}[KFC V2]{RESET} 初始化完成: stream_id={stream_id}")
async def execute(self, context: StreamContext) -> dict:
"""
执行聊天处理
流程:
1. 获取 Session
2. 获取未读消息
3. 记录用户消息到 mental_log
4. 确定 situation_type根据之前的等待状态
5. 调用 Replyer 生成响应
6. 执行动作
7. 更新 Session记录 Bot 规划,设置等待状态)
8. 保存 Session
"""
async with self._lock:
self._processing = True
try:
# 1. 获取未读消息
unread_messages = context.get_unread_messages()
if not unread_messages:
return self._build_result(success=True, message="no_unread_messages")
# 2. 取最后一条消息作为主消息
target_message = unread_messages[-1]
user_info = target_message.user_info
if not user_info:
return self._build_result(success=False, message="no_user_info")
user_id = str(user_info.user_id)
user_name = user_info.user_nickname or user_id
# 3. 获取或创建 Session
session = await self.session_manager.get_session(user_id, self.stream_id)
# 4. 确定 situation_type根据之前的等待状态
situation_type = self._determine_situation_type(session)
# 5. 记录用户消息到 mental_log
for msg in unread_messages:
msg_content = msg.processed_plain_text or msg.display_message or ""
msg_user_name = msg.user_info.user_nickname if msg.user_info else user_name
msg_user_id = str(msg.user_info.user_id) if msg.user_info else user_id
session.add_user_message(
content=msg_content,
user_name=msg_user_name,
user_id=msg_user_id,
timestamp=msg.time,
)
# 6. 加载可用动作
await self.action_executor.load_actions()
available_actions = self.action_executor.get_available_actions()
# 7. 获取聊天流
chat_stream = await self._get_chat_stream()
# 8. 调用 Replyer 生成响应
response = await generate_response(
session=session,
user_name=user_name,
situation_type=situation_type,
chat_stream=chat_stream,
available_actions=available_actions,
)
# 9. 执行动作
exec_result = await self.action_executor.execute(response, chat_stream)
# 10. 记录 Bot 规划到 mental_log
session.add_bot_planning(
thought=response.thought,
actions=[a.to_dict() for a in response.actions],
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
# 11. 更新 Session 状态
if response.max_wait_seconds > 0:
session.start_waiting(
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
else:
session.end_waiting()
# 12. 标记消息为已读
for msg in unread_messages:
context.mark_message_as_read(str(msg.message_id))
# 13. 保存 Session
await self.session_manager.save_session(user_id)
# 14. 更新统计
self._stats["messages_processed"] += len(unread_messages)
if exec_result.get("has_reply"):
self._stats["successful_responses"] += 1
logger.info(
f"{SOFT_PURPLE}[KFC V2]{RESET} 处理完成: "
f"user={user_name}, situation={situation_type}, "
f"actions={[a.type for a in response.actions]}, "
f"wait={response.max_wait_seconds}s"
)
return self._build_result(
success=True,
message="processed",
has_reply=exec_result.get("has_reply", False),
thought=response.thought,
situation_type=situation_type,
)
except Exception as e:
self._stats["failed_responses"] += 1
logger.error(f"[KFC V2] 处理失败: {e}")
import traceback
traceback.print_exc()
return self._build_result(success=False, message=str(e), error=True)
finally:
self._processing = False
def _determine_situation_type(self, session) -> str:
"""
确定当前情况类型
根据 Session 之前的状态决定提示词的 situation_type
"""
if session.status == SessionStatus.WAITING:
# 之前在等待
if session.waiting_config.is_timeout():
# 超时了才收到回复
return "reply_late"
else:
# 在预期内收到回复
return "reply_in_time"
else:
# 之前是 IDLE
return "new_message"
async def _get_chat_stream(self):
"""获取聊天流对象"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
if chat_manager:
return await chat_manager.get_stream(self.stream_id)
except Exception as e:
logger.warning(f"[KFC V2] 获取 chat_stream 失败: {e}")
return None
def _build_result(
self,
success: bool,
message: str = "",
error: bool = False,
**kwargs,
) -> dict:
"""构建返回结果"""
result = {
"success": success,
"stream_id": self.stream_id,
"message": message,
"error": error,
"timestamp": time.time(),
}
result.update(kwargs)
return result
def get_stats(self) -> dict[str, Any]:
"""获取统计信息"""
return {
**self._stats,
"action_executor_stats": self.action_executor.get_stats(),
}
@property
def is_processing(self) -> bool:
"""是否正在处理"""
return self._processing

View File

@@ -0,0 +1,221 @@
"""
Kokoro Flow Chatter V2 - 配置
可以通过 TOML 配置文件覆盖默认值
"""
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class WaitingDefaults:
"""等待配置默认值"""
# 默认最大等待时间(秒)
default_max_wait_seconds: int = 300
# 最小等待时间
min_wait_seconds: int = 30
# 最大等待时间
max_wait_seconds: int = 1800
@dataclass
class ProactiveConfig:
"""主动思考配置"""
# 是否启用主动思考
enabled: bool = True
# 沉默阈值(秒),超过此时间考虑主动发起
silence_threshold_seconds: int = 7200
# 两次主动发起最小间隔(秒)
min_interval_between_proactive: int = 1800
# 勿扰时段开始HH:MM 格式)
quiet_hours_start: str = "23:00"
# 勿扰时段结束
quiet_hours_end: str = "07:00"
# 主动发起概率0.0 ~ 1.0
trigger_probability: float = 0.3
@dataclass
class PromptConfig:
"""提示词配置"""
# 活动记录保留条数
max_activity_entries: int = 30
# 每条记录最大字符数
max_entry_length: int = 500
# 是否包含人物关系信息
include_relation: bool = True
# 是否包含记忆信息
include_memory: bool = True
@dataclass
class SessionConfig:
"""会话配置"""
# Session 持久化目录(相对于 data/
session_dir: str = "kokoro_flow_chatter_v2/sessions"
# Session 自动过期时间(秒),超过此时间未活动自动清理
session_expire_seconds: int = 86400 * 7 # 7 天
# 活动记录保留上限
max_mental_log_entries: int = 100
@dataclass
class LLMConfig:
"""LLM 配置"""
# 模型名称(空则使用默认)
model_name: str = ""
# Temperature
temperature: float = 0.8
# 最大 Token
max_tokens: int = 1024
# 请求超时(秒)
timeout: float = 60.0
@dataclass
class KokoroFlowChatterV2Config:
"""Kokoro Flow Chatter V2 总配置"""
# 是否启用
enabled: bool = True
# 启用的消息源类型(空列表表示全部)
enabled_stream_types: List[str] = field(default_factory=lambda: ["private"])
# 等待配置
waiting: WaitingDefaults = field(default_factory=WaitingDefaults)
# 主动思考配置
proactive: ProactiveConfig = field(default_factory=ProactiveConfig)
# 提示词配置
prompt: PromptConfig = field(default_factory=PromptConfig)
# 会话配置
session: SessionConfig = field(default_factory=SessionConfig)
# LLM 配置
llm: LLMConfig = field(default_factory=LLMConfig)
# 调试模式
debug: bool = False
# 全局配置单例
_config: Optional[KokoroFlowChatterV2Config] = None
def get_config() -> KokoroFlowChatterV2Config:
"""获取全局配置"""
global _config
if _config is None:
_config = load_config()
return _config
def load_config() -> KokoroFlowChatterV2Config:
"""从全局配置加载 KFC V2 配置"""
from src.config.config import global_config
config = KokoroFlowChatterV2Config()
# 尝试从全局配置读取
if not global_config:
return config
try:
if hasattr(global_config, 'kokoro_flow_chatter_v2'):
kfc_cfg = getattr(global_config, 'kokoro_flow_chatter_v2')
# 基础配置
if hasattr(kfc_cfg, 'enabled'):
config.enabled = kfc_cfg.enabled
if hasattr(kfc_cfg, 'enabled_stream_types'):
config.enabled_stream_types = list(kfc_cfg.enabled_stream_types)
if hasattr(kfc_cfg, 'debug'):
config.debug = kfc_cfg.debug
# 等待配置
if hasattr(kfc_cfg, 'waiting'):
wait_cfg = kfc_cfg.waiting
config.waiting = WaitingDefaults(
default_max_wait_seconds=getattr(wait_cfg, 'default_max_wait_seconds', 300),
min_wait_seconds=getattr(wait_cfg, 'min_wait_seconds', 30),
max_wait_seconds=getattr(wait_cfg, 'max_wait_seconds', 1800),
)
# 主动思考配置
if hasattr(kfc_cfg, 'proactive'):
pro_cfg = kfc_cfg.proactive
config.proactive = ProactiveConfig(
enabled=getattr(pro_cfg, 'enabled', True),
silence_threshold_seconds=getattr(pro_cfg, 'silence_threshold_seconds', 7200),
min_interval_between_proactive=getattr(pro_cfg, 'min_interval_between_proactive', 1800),
quiet_hours_start=getattr(pro_cfg, 'quiet_hours_start', "23:00"),
quiet_hours_end=getattr(pro_cfg, 'quiet_hours_end', "07:00"),
trigger_probability=getattr(pro_cfg, 'trigger_probability', 0.3),
)
# 提示词配置
if hasattr(kfc_cfg, 'prompt'):
pmt_cfg = kfc_cfg.prompt
config.prompt = PromptConfig(
max_activity_entries=getattr(pmt_cfg, 'max_activity_entries', 30),
max_entry_length=getattr(pmt_cfg, 'max_entry_length', 500),
include_relation=getattr(pmt_cfg, 'include_relation', True),
include_memory=getattr(pmt_cfg, 'include_memory', True),
)
# 会话配置
if hasattr(kfc_cfg, 'session'):
sess_cfg = kfc_cfg.session
config.session = SessionConfig(
session_dir=getattr(sess_cfg, 'session_dir', "kokoro_flow_chatter_v2/sessions"),
session_expire_seconds=getattr(sess_cfg, 'session_expire_seconds', 86400 * 7),
max_mental_log_entries=getattr(sess_cfg, 'max_mental_log_entries', 100),
)
# LLM 配置
if hasattr(kfc_cfg, 'llm'):
llm_cfg = kfc_cfg.llm
config.llm = LLMConfig(
model_name=getattr(llm_cfg, 'model_name', ""),
temperature=getattr(llm_cfg, 'temperature', 0.8),
max_tokens=getattr(llm_cfg, 'max_tokens', 1024),
timeout=getattr(llm_cfg, 'timeout', 60.0),
)
except Exception as e:
from src.common.logger import get_logger
logger = get_logger("kfc_v2_config")
logger.warning(f"加载 KFC V2 配置失败,使用默认值: {e}")
return config
def reload_config() -> KokoroFlowChatterV2Config:
"""重新加载配置"""
global _config
_config = load_config()
return _config

View File

@@ -0,0 +1,338 @@
"""
Kokoro Flow Chatter V2 上下文构建器
为 KFC V2 提供完整的情境感知能力。
包含:
- 关系信息 (relation_info)
- 记忆块 (memory_block)
- 表达习惯 (expression_habits)
- 日程信息 (schedule)
- 时间信息 (time)
"""
import asyncio
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.person_info.person_info import get_person_info_manager, PersonInfoManager
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
from src.common.data_models.message_manager_data_model import StreamContext
logger = get_logger("kfc_v2_context_builder")
def _get_config():
"""获取全局配置(带类型断言)"""
assert global_config is not None, "global_config 未初始化"
return global_config
class KFCContextBuilder:
"""
KFC V2 上下文构建器
为提示词提供完整的情境感知数据。
"""
def __init__(self, chat_stream: "ChatStream"):
self.chat_stream = chat_stream
self.chat_id = chat_stream.stream_id
self.platform = chat_stream.platform
self.is_group_chat = bool(chat_stream.group_info)
async def build_all_context(
self,
sender_name: str,
target_message: str,
context: Optional["StreamContext"] = None,
) -> dict[str, str]:
"""
并行构建所有上下文模块
Args:
sender_name: 发送者名称
target_message: 目标消息内容
context: 聊天流上下文(可选)
Returns:
dict: 包含所有上下文块的字典
"""
chat_history = await self._get_chat_history_text(context)
tasks = {
"relation_info": self._build_relation_info(sender_name, target_message),
"memory_block": self._build_memory_block(chat_history, target_message),
"expression_habits": self._build_expression_habits(chat_history, target_message),
"schedule": self._build_schedule_block(),
"time": self._build_time_block(),
}
results = {}
try:
task_results = await asyncio.gather(
*[self._wrap_task(name, coro) for name, coro in tasks.items()],
return_exceptions=True
)
for result in task_results:
if isinstance(result, tuple):
name, value = result
results[name] = value
else:
logger.warning(f"上下文构建任务异常: {result}")
except Exception as e:
logger.error(f"并行构建上下文失败: {e}")
return results
async def _wrap_task(self, name: str, coro) -> tuple[str, str]:
"""包装任务以返回名称和结果"""
try:
result = await coro
return (name, result or "")
except Exception as e:
logger.error(f"构建 {name} 失败: {e}")
return (name, "")
async def _get_chat_history_text(
self,
context: Optional["StreamContext"] = None,
limit: int = 20,
) -> str:
"""获取聊天历史文本"""
if context is None:
return ""
try:
from src.chat.utils.chat_message_builder import build_readable_messages
messages = context.get_messages(limit=limit, include_unread=True)
if not messages:
return ""
msg_dicts = [msg.flatten() for msg in messages]
return await build_readable_messages(
msg_dicts,
replace_bot_name=True,
timestamp_mode="relative",
truncate=True,
)
except Exception as e:
logger.error(f"获取聊天历史失败: {e}")
return ""
async def _build_relation_info(self, sender_name: str, target_message: str) -> str:
"""构建关系信息块"""
config = _get_config()
if sender_name == f"{config.bot.nickname}(你)":
return "你将要回复的是你自己发送的消息。"
person_info_manager = get_person_info_manager()
person_id = await person_info_manager.get_person_id_by_person_name(sender_name)
if not person_id:
logger.debug(f"未找到用户 {sender_name} 的ID")
return f"你完全不认识{sender_name},这是你们的第一次互动。"
try:
from src.person_info.relationship_fetcher import relationship_fetcher_manager
relationship_fetcher = relationship_fetcher_manager.get_fetcher(self.chat_id)
user_relation_info = await relationship_fetcher.build_relation_info(person_id, points_num=5)
stream_impression = await relationship_fetcher.build_chat_stream_impression(self.chat_id)
parts = []
if user_relation_info:
parts.append(f"### 你与 {sender_name} 的关系\n{user_relation_info}")
if stream_impression:
scene_type = "这个群" if self.is_group_chat else "你们的私聊"
parts.append(f"### 你对{scene_type}的印象\n{stream_impression}")
if parts:
return "\n\n".join(parts)
else:
return f"你与{sender_name}还没有建立深厚的关系,这是早期的互动阶段。"
except Exception as e:
logger.error(f"获取关系信息失败: {e}")
return f"你与{sender_name}是普通朋友关系。"
async def _build_memory_block(self, chat_history: str, target_message: str) -> str:
"""构建记忆块(使用三层记忆系统)"""
config = _get_config()
if not (config.memory and config.memory.enable):
return ""
try:
from src.memory_graph.manager_singleton import get_unified_memory_manager
from src.memory_graph.utils.three_tier_formatter import memory_formatter
unified_manager = get_unified_memory_manager()
if not unified_manager:
logger.debug("[三层记忆] 管理器未初始化")
return ""
search_result = await unified_manager.search_memories(
query_text=target_message,
use_judge=True,
recent_chat_history=chat_history,
)
if not search_result:
return ""
perceptual_blocks = search_result.get("perceptual_blocks", [])
short_term_memories = search_result.get("short_term_memories", [])
long_term_memories = search_result.get("long_term_memories", [])
formatted_memories = await memory_formatter.format_all_tiers(
perceptual_blocks=perceptual_blocks,
short_term_memories=short_term_memories,
long_term_memories=long_term_memories
)
total_count = len(perceptual_blocks) + len(short_term_memories) + len(long_term_memories)
if total_count > 0 and formatted_memories.strip():
logger.info(
f"[三层记忆] 检索到 {total_count} 条记忆 "
f"(感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})"
)
return f"### 🧠 相关记忆\n\n{formatted_memories}"
return ""
except Exception as e:
logger.error(f"[三层记忆] 检索失败: {e}")
return ""
async def _build_expression_habits(self, chat_history: str, target_message: str) -> str:
"""构建表达习惯块"""
config = _get_config()
use_expression, _, _ = config.expression.get_expression_config_for_chat(self.chat_id)
if not use_expression:
return ""
try:
from src.chat.express.expression_selector import expression_selector
style_habits = []
grammar_habits = []
selected_expressions = await expression_selector.select_suitable_expressions(
chat_id=self.chat_id,
chat_history=chat_history,
target_message=target_message,
max_num=8,
min_num=2
)
if selected_expressions:
for expr in selected_expressions:
if isinstance(expr, dict) and "situation" in expr and "style" in expr:
expr_type = expr.get("type", "style")
habit_str = f"{expr['situation']}时,使用 {expr['style']}"
if expr_type == "grammar":
grammar_habits.append(habit_str)
else:
style_habits.append(habit_str)
parts = []
if style_habits:
parts.append("**语言风格习惯**\n" + "\n".join(f"- {h}" for h in style_habits))
if grammar_habits:
parts.append("**句法习惯**\n" + "\n".join(f"- {h}" for h in grammar_habits))
if parts:
return "### 💬 你的表达习惯\n\n" + "\n\n".join(parts)
return ""
except Exception as e:
logger.error(f"构建表达习惯失败: {e}")
return ""
async def _build_schedule_block(self) -> str:
"""构建日程信息块"""
config = _get_config()
if not config.planning_system.schedule_enable:
return ""
try:
from src.schedule.schedule_manager import schedule_manager
activity_info = schedule_manager.get_current_activity()
if not activity_info:
return ""
activity = activity_info.get("activity")
time_range = activity_info.get("time_range")
now = datetime.now()
if time_range:
try:
start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").replace(
year=now.year, month=now.month, day=now.day
)
end_time = datetime.strptime(end_str.strip(), "%H:%M").replace(
year=now.year, month=now.month, day=now.day
)
if end_time < start_time:
end_time += timedelta(days=1)
if now < start_time:
now += timedelta(days=1)
duration_minutes = (now - start_time).total_seconds() / 60
remaining_minutes = (end_time - now).total_seconds() / 60
return (
f"你当前正在「{activity}」,"
f"{start_time.strftime('%H:%M')}开始,预计{end_time.strftime('%H:%M')}结束,"
f"已进行{duration_minutes:.0f}分钟,还剩约{remaining_minutes:.0f}分钟。"
)
except (ValueError, AttributeError):
pass
return f"你当前正在「{activity}"
except Exception as e:
logger.error(f"构建日程块失败: {e}")
return ""
async def _build_time_block(self) -> str:
"""构建时间信息块"""
now = datetime.now()
weekdays = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
weekday = weekdays[now.weekday()]
return f"{now.strftime('%Y年%m月%d')} {weekday} {now.strftime('%H:%M:%S')}"
async def build_kfc_context(
chat_stream: "ChatStream",
sender_name: str,
target_message: str,
context: Optional["StreamContext"] = None,
) -> dict[str, str]:
"""
便捷函数构建KFC所需的所有上下文
"""
builder = KFCContextBuilder(chat_stream)
return await builder.build_all_context(sender_name, target_message, context)
__all__ = [
"KFCContextBuilder",
"build_kfc_context",
]

View File

@@ -0,0 +1,320 @@
"""
Kokoro Flow Chatter V2 - 数据模型
定义核心数据结构:
- EventType: 活动流事件类型
- SessionStatus: 会话状态(仅 IDLE 和 WAITING
- MentalLogEntry: 心理活动日志条目
- WaitingConfig: 等待配置
- ActionModel: 动作模型
- LLMResponse: LLM 响应结构
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import time
class EventType(Enum):
"""
活动流事件类型
用于标记 mental_log 中不同类型的事件,
每种类型对应一个提示词小模板
"""
# 用户相关
USER_MESSAGE = "user_message" # 用户发送消息
# Bot 行动相关
BOT_PLANNING = "bot_planning" # Bot 规划thought + actions
# 等待相关
WAITING_START = "waiting_start" # 开始等待
WAITING_UPDATE = "waiting_update" # 等待期间心理变化
REPLY_RECEIVED_IN_TIME = "reply_in_time" # 在预期内收到回复
REPLY_RECEIVED_LATE = "reply_late" # 超出预期收到回复
WAIT_TIMEOUT = "wait_timeout" # 等待超时
# 主动思考相关
PROACTIVE_TRIGGER = "proactive_trigger" # 主动思考触发(长期沉默)
def __str__(self) -> str:
return self.value
class SessionStatus(Enum):
"""
会话状态
极简设计,只有两种稳定状态:
- IDLE: 空闲,没有期待回复
- WAITING: 等待对方回复中
"""
IDLE = "idle"
WAITING = "waiting"
def __str__(self) -> str:
return self.value
@dataclass
class WaitingConfig:
"""
等待配置
当 Bot 发送消息后设置的等待参数
"""
expected_reaction: str = "" # 期望对方如何回应
max_wait_seconds: int = 0 # 最长等待时间0 表示不等待
started_at: float = 0.0 # 开始等待的时间戳
last_thinking_at: float = 0.0 # 上次连续思考的时间戳
thinking_count: int = 0 # 连续思考次数
def is_active(self) -> bool:
"""是否正在等待"""
return self.max_wait_seconds > 0 and self.started_at > 0
def get_elapsed_seconds(self) -> float:
"""获取已等待时间(秒)"""
if not self.is_active():
return 0.0
return time.time() - self.started_at
def get_elapsed_minutes(self) -> float:
"""获取已等待时间(分钟)"""
return self.get_elapsed_seconds() / 60
def is_timeout(self) -> bool:
"""是否已超时"""
if not self.is_active():
return False
return self.get_elapsed_seconds() >= self.max_wait_seconds
def get_progress(self) -> float:
"""获取等待进度 (0.0 - 1.0)"""
if not self.is_active() or self.max_wait_seconds <= 0:
return 0.0
return min(self.get_elapsed_seconds() / self.max_wait_seconds, 1.0)
def to_dict(self) -> dict[str, Any]:
return {
"expected_reaction": self.expected_reaction,
"max_wait_seconds": self.max_wait_seconds,
"started_at": self.started_at,
"last_thinking_at": self.last_thinking_at,
"thinking_count": self.thinking_count,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "WaitingConfig":
return cls(
expected_reaction=data.get("expected_reaction", ""),
max_wait_seconds=data.get("max_wait_seconds", 0),
started_at=data.get("started_at", 0.0),
last_thinking_at=data.get("last_thinking_at", 0.0),
thinking_count=data.get("thinking_count", 0),
)
def reset(self) -> None:
"""重置等待配置"""
self.expected_reaction = ""
self.max_wait_seconds = 0
self.started_at = 0.0
self.last_thinking_at = 0.0
self.thinking_count = 0
@dataclass
class MentalLogEntry:
"""
心理活动日志条目
记录活动流中的每一个事件节点,
用于构建线性叙事风格的提示词
"""
event_type: EventType
timestamp: float
# 通用字段
content: str = "" # 事件内容(消息文本、动作描述等)
# 用户消息相关
user_name: str = "" # 发送者名称
user_id: str = "" # 发送者 ID
# Bot 规划相关
thought: str = "" # 内心想法
actions: list[dict] = field(default_factory=list) # 执行的动作列表
expected_reaction: str = "" # 期望的回应
max_wait_seconds: int = 0 # 设定的等待时间
# 等待相关
elapsed_seconds: float = 0.0 # 已等待时间
waiting_thought: str = "" # 等待期间的想法
mood: str = "" # 当前心情
# 元数据
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"event_type": str(self.event_type),
"timestamp": self.timestamp,
"content": self.content,
"user_name": self.user_name,
"user_id": self.user_id,
"thought": self.thought,
"actions": self.actions,
"expected_reaction": self.expected_reaction,
"max_wait_seconds": self.max_wait_seconds,
"elapsed_seconds": self.elapsed_seconds,
"waiting_thought": self.waiting_thought,
"mood": self.mood,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MentalLogEntry":
event_type_str = data.get("event_type", "user_message")
try:
event_type = EventType(event_type_str)
except ValueError:
event_type = EventType.USER_MESSAGE
return cls(
event_type=event_type,
timestamp=data.get("timestamp", time.time()),
content=data.get("content", ""),
user_name=data.get("user_name", ""),
user_id=data.get("user_id", ""),
thought=data.get("thought", ""),
actions=data.get("actions", []),
expected_reaction=data.get("expected_reaction", ""),
max_wait_seconds=data.get("max_wait_seconds", 0),
elapsed_seconds=data.get("elapsed_seconds", 0.0),
waiting_thought=data.get("waiting_thought", ""),
mood=data.get("mood", ""),
metadata=data.get("metadata", {}),
)
def get_time_str(self, format: str = "%H:%M") -> str:
"""获取格式化的时间字符串"""
return time.strftime(format, time.localtime(self.timestamp))
@dataclass
class ActionModel:
"""
动作模型
表示 LLM 决策的单个动作
"""
type: str # 动作类型
params: dict[str, Any] = field(default_factory=dict) # 动作参数
reason: str = "" # 选择该动作的理由
def to_dict(self) -> dict[str, Any]:
result = {"type": self.type}
if self.reason:
result["reason"] = self.reason
result.update(self.params)
return result
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ActionModel":
action_type = data.get("type", "do_nothing")
reason = data.get("reason", "")
params = {k: v for k, v in data.items() if k not in ("type", "reason")}
return cls(type=action_type, params=params, reason=reason)
def get_description(self) -> str:
"""获取动作的文字描述"""
if self.type == "reply":
content = self.params.get("content", "")
return f'发送消息:"{content[:50]}{"..." if len(content) > 50 else ""}"'
elif self.type == "poke_user":
return "戳了戳对方"
elif self.type == "do_nothing":
return "什么都没做"
elif self.type == "send_emoji":
emoji = self.params.get("emoji", "")
return f"发送表情:{emoji}"
else:
return f"执行动作:{self.type}"
@dataclass
class LLMResponse:
"""
LLM 响应结构
定义 LLM 输出的 JSON 格式
"""
thought: str # 内心想法
actions: list[ActionModel] # 动作列表
expected_reaction: str = "" # 期望对方的回应
max_wait_seconds: int = 0 # 最长等待时间0 = 不等待)
# 可选字段
mood: str = "" # 当前心情
def to_dict(self) -> dict[str, Any]:
return {
"thought": self.thought,
"actions": [a.to_dict() for a in self.actions],
"expected_reaction": self.expected_reaction,
"max_wait_seconds": self.max_wait_seconds,
"mood": self.mood,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "LLMResponse":
actions_data = data.get("actions", [])
actions = [ActionModel.from_dict(a) for a in actions_data] if actions_data else []
# 如果没有动作,添加默认的 do_nothing
if not actions:
actions = [ActionModel(type="do_nothing")]
# 处理 max_wait_seconds确保在合理范围内
max_wait = data.get("max_wait_seconds", 0)
try:
max_wait = int(max_wait)
max_wait = max(0, min(max_wait, 1800)) # 0-30分钟
except (ValueError, TypeError):
max_wait = 0
return cls(
thought=data.get("thought", ""),
actions=actions,
expected_reaction=data.get("expected_reaction", ""),
max_wait_seconds=max_wait,
mood=data.get("mood", ""),
)
@classmethod
def create_error_response(cls, error_message: str) -> "LLMResponse":
"""创建错误响应"""
return cls(
thought=f"出现了问题:{error_message}",
actions=[ActionModel(type="do_nothing")],
expected_reaction="",
max_wait_seconds=0,
)
def has_reply(self) -> bool:
"""是否包含回复动作"""
return any(a.type in ("reply", "respond") for a in self.actions)
def get_reply_content(self) -> str:
"""获取回复内容"""
for action in self.actions:
if action.type in ("reply", "respond"):
return action.params.get("content", "")
return ""
def get_actions_description(self) -> str:
"""获取所有动作的文字描述"""
descriptions = [a.get_description() for a in self.actions]
return " + ".join(descriptions)

View File

@@ -0,0 +1,105 @@
"""
Kokoro Flow Chatter V2 - 插件注册
注册 Chatter
"""
from typing import Any, ClassVar
from src.common.logger import get_logger
from src.plugin_system.base.base_plugin import BasePlugin
from src.plugin_system.base.component_types import ChatterInfo
from src.plugin_system.decorators import register_plugin
from .chatter import KokoroFlowChatterV2
from .config import get_config
from .proactive_thinker import start_proactive_thinker, stop_proactive_thinker
logger = get_logger("kfc_v2_plugin")
@register_plugin
class KokoroFlowChatterV2Plugin(BasePlugin):
"""
Kokoro Flow Chatter V2 插件
专为私聊设计的增强 Chatter
- 线性叙事提示词架构
- 等待机制与心理状态演变
- 主动思考能力
"""
plugin_name: str = "kokoro_flow_chatter_v2"
enable_plugin: bool = True
plugin_priority: int = 50 # 高于默认 Chatter
dependencies: ClassVar[list[str]] = []
python_dependencies: ClassVar[list[str]] = []
config_file_name: str = "config.toml"
# 状态
_is_started: bool = False
async def on_plugin_loaded(self):
"""插件加载时"""
config = get_config()
if not config.enabled:
logger.info("[KFC V2] 插件已禁用")
return
logger.info("[KFC V2] 插件已加载")
# 启动主动思考器
if config.proactive.enabled:
try:
await start_proactive_thinker()
logger.info("[KFC V2] 主动思考器已启动")
self._is_started = True
except Exception as e:
logger.error(f"[KFC V2] 启动主动思考器失败: {e}")
async def on_plugin_unloaded(self):
"""插件卸载时"""
try:
await stop_proactive_thinker()
logger.info("[KFC V2] 主动思考器已停止")
self._is_started = False
except Exception as e:
logger.warning(f"[KFC V2] 停止主动思考器失败: {e}")
def get_plugin_components(self):
"""返回组件列表"""
config = get_config()
if not config.enabled:
return []
components = []
try:
# 注册 Chatter
components.append((
KokoroFlowChatterV2.get_chatter_info(),
KokoroFlowChatterV2,
))
logger.debug("[KFC V2] 成功加载 KokoroFlowChatterV2 组件")
except Exception as e:
logger.error(f"[KFC V2] 加载组件失败: {e}")
return components
def get_plugin_info(self) -> dict[str, Any]:
"""获取插件信息"""
return {
"name": self.plugin_name,
"display_name": "Kokoro Flow Chatter V2",
"version": "2.0.0",
"author": "MoFox",
"description": "专为私聊设计的增强 Chatter",
"features": [
"线性叙事提示词架构",
"心理活动流记录",
"等待机制与超时处理",
"主动思考能力",
],
}

View File

@@ -0,0 +1,500 @@
"""
Kokoro Flow Chatter V2 - 主动思考器
独立组件,负责:
1. 等待期间的连续思考(更新心理状态)
2. 等待超时决策(继续等 or 做点什么)
3. 长期沉默后主动发起对话
通过 UnifiedScheduler 定期触发,与 Chatter 解耦
"""
import asyncio
import random
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.apis.unified_scheduler import TriggerType, unified_scheduler
from .action_executor import ActionExecutor
from .models import EventType, SessionStatus
from .replyer import generate_response
from .session import KokoroSession, get_session_manager
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger("kfc_v2_proactive_thinker")
class ProactiveThinker:
"""
主动思考器
独立于 Chatter负责处理
1. 等待期间的连续思考
2. 等待超时
3. 长期沉默后主动发起
核心逻辑:
- 定期检查所有 WAITING 状态的 Session
- 触发连续思考或超时决策
- 定期检查长期沉默的 Session考虑主动发起
"""
# 连续思考触发点(等待进度百分比)
THINKING_TRIGGERS = [0.3, 0.6, 0.85]
# 任务名称
TASK_WAITING_CHECK = "kfc_v2_waiting_check"
TASK_PROACTIVE_CHECK = "kfc_v2_proactive_check"
def __init__(self):
self.session_manager = get_session_manager()
# 配置
self._load_config()
# 调度任务 ID
self._waiting_schedule_id: Optional[str] = None
self._proactive_schedule_id: Optional[str] = None
self._running = False
# 统计
self._stats = {
"waiting_checks": 0,
"continuous_thinking_triggered": 0,
"timeout_decisions": 0,
"proactive_triggered": 0,
}
def _load_config(self) -> None:
"""加载配置"""
# 默认配置
self.waiting_check_interval = 15.0 # 等待检查间隔(秒)
self.proactive_check_interval = 300.0 # 主动思考检查间隔(秒)
self.silence_threshold = 7200 # 沉默阈值(秒)
self.min_proactive_interval = 1800 # 两次主动思考最小间隔(秒)
self.quiet_hours_start = "23:00"
self.quiet_hours_end = "07:00"
# 从全局配置读取
if global_config and hasattr(global_config, 'kokoro_flow_chatter'):
kfc_config = global_config.kokoro_flow_chatter
if hasattr(kfc_config, 'proactive_thinking'):
proactive_cfg = kfc_config.proactive_thinking
self.silence_threshold = getattr(proactive_cfg, 'silence_threshold_seconds', 7200)
self.min_proactive_interval = getattr(proactive_cfg, 'min_interval_between_proactive', 1800)
async def start(self) -> None:
"""启动主动思考器"""
if self._running:
logger.warning("[ProactiveThinker] 已在运行中")
return
self._running = True
# 注册等待检查任务
self._waiting_schedule_id = await unified_scheduler.create_schedule(
callback=self._check_waiting_sessions,
trigger_type=TriggerType.TIME,
trigger_config={"delay_seconds": self.waiting_check_interval},
is_recurring=True,
task_name=self.TASK_WAITING_CHECK,
force_overwrite=True,
timeout=60.0,
)
# 注册主动思考检查任务
self._proactive_schedule_id = await unified_scheduler.create_schedule(
callback=self._check_proactive_sessions,
trigger_type=TriggerType.TIME,
trigger_config={"delay_seconds": self.proactive_check_interval},
is_recurring=True,
task_name=self.TASK_PROACTIVE_CHECK,
force_overwrite=True,
timeout=120.0,
)
logger.info("[ProactiveThinker] 已启动")
async def stop(self) -> None:
"""停止主动思考器"""
if not self._running:
return
self._running = False
if self._waiting_schedule_id:
await unified_scheduler.remove_schedule(self._waiting_schedule_id)
if self._proactive_schedule_id:
await unified_scheduler.remove_schedule(self._proactive_schedule_id)
logger.info("[ProactiveThinker] 已停止")
# ========================
# 等待检查
# ========================
async def _check_waiting_sessions(self) -> None:
"""检查所有等待中的 Session"""
self._stats["waiting_checks"] += 1
sessions = await self.session_manager.get_waiting_sessions()
if not sessions:
return
# 并行处理
tasks = [
asyncio.create_task(self._process_waiting_session(s))
for s in sessions
]
await asyncio.gather(*tasks, return_exceptions=True)
async def _process_waiting_session(self, session: KokoroSession) -> None:
"""处理单个等待中的 Session"""
try:
if session.status != SessionStatus.WAITING:
return
if not session.waiting_config.is_active():
return
# 检查是否超时
if session.waiting_config.is_timeout():
await self._handle_timeout(session)
return
# 检查是否需要触发连续思考
progress = session.waiting_config.get_progress()
if self._should_trigger_thinking(session, progress):
await self._handle_continuous_thinking(session, progress)
except Exception as e:
logger.error(f"[ProactiveThinker] 处理等待 Session 失败 {session.user_id}: {e}")
def _should_trigger_thinking(self, session: KokoroSession, progress: float) -> bool:
"""判断是否应触发连续思考"""
# 计算应该触发的次数
expected_count = sum(1 for t in self.THINKING_TRIGGERS if progress >= t)
if session.waiting_config.thinking_count >= expected_count:
return False
# 确保两次思考之间有间隔
if session.waiting_config.last_thinking_at > 0:
elapsed = time.time() - session.waiting_config.last_thinking_at
if elapsed < 30: # 至少 30 秒间隔
return False
return True
async def _handle_continuous_thinking(
self,
session: KokoroSession,
progress: float,
) -> None:
"""处理连续思考"""
self._stats["continuous_thinking_triggered"] += 1
# 生成等待中的想法
thought = self._generate_waiting_thought(session, progress)
# 记录到 mental_log
session.add_waiting_update(
waiting_thought=thought,
mood="", # 可以根据进度设置心情
)
# 更新思考计数
session.waiting_config.thinking_count += 1
session.waiting_config.last_thinking_at = time.time()
# 保存
await self.session_manager.save_session(session.user_id)
logger.debug(
f"[ProactiveThinker] 连续思考: user={session.user_id}, "
f"progress={progress:.1%}, thought={thought[:30]}..."
)
def _generate_waiting_thought(self, session: KokoroSession, progress: float) -> str:
"""生成等待中的想法"""
elapsed_minutes = session.waiting_config.get_elapsed_minutes()
if progress < 0.4:
thoughts = [
f"已经等了 {elapsed_minutes:.0f} 分钟了,对方可能在忙吧...",
"不知道对方在做什么呢",
"再等等看吧",
]
elif progress < 0.7:
thoughts = [
f"等了 {elapsed_minutes:.0f} 分钟了,有点担心...",
"对方是不是忘记回复了?",
"嗯...还是没有消息",
]
else:
thoughts = [
f"已经等了 {elapsed_minutes:.0f} 分钟了,感觉有点焦虑",
"要不要主动说点什么呢...",
"快到时间了,对方还是没回",
]
return random.choice(thoughts)
async def _handle_timeout(self, session: KokoroSession) -> None:
"""处理等待超时"""
self._stats["timeout_decisions"] += 1
logger.info(f"[ProactiveThinker] 等待超时: user={session.user_id}")
try:
# 获取聊天流
chat_stream = await self._get_chat_stream(session.stream_id)
# 加载动作
action_executor = ActionExecutor(session.stream_id)
await action_executor.load_actions()
# 调用 Replyer 生成超时决策
response = await generate_response(
session=session,
user_name=session.user_id, # 这里可以改进,获取真实用户名
situation_type="timeout",
chat_stream=chat_stream,
available_actions=action_executor.get_available_actions(),
)
# 执行动作
exec_result = await action_executor.execute(response, chat_stream)
# 记录到 mental_log
session.add_bot_planning(
thought=response.thought,
actions=[a.to_dict() for a in response.actions],
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
# 更新状态
if response.max_wait_seconds > 0:
# 继续等待
session.start_waiting(
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
else:
# 不再等待
session.end_waiting()
# 保存
await self.session_manager.save_session(session.user_id)
logger.info(
f"[ProactiveThinker] 超时决策完成: user={session.user_id}, "
f"actions={[a.type for a in response.actions]}, "
f"continue_wait={response.max_wait_seconds > 0}"
)
except Exception as e:
logger.error(f"[ProactiveThinker] 处理超时失败: {e}")
# 出错时结束等待
session.end_waiting()
await self.session_manager.save_session(session.user_id)
# ========================
# 主动思考(长期沉默)
# ========================
async def _check_proactive_sessions(self) -> None:
"""检查是否有需要主动发起对话的 Session"""
# 检查是否在勿扰时段
if self._is_quiet_hours():
return
sessions = await self.session_manager.get_all_sessions()
current_time = time.time()
for session in sessions:
try:
trigger_reason = self._should_trigger_proactive(session, current_time)
if trigger_reason:
await self._handle_proactive(session, trigger_reason)
except Exception as e:
logger.error(f"[ProactiveThinker] 检查主动思考失败 {session.user_id}: {e}")
def _is_quiet_hours(self) -> bool:
"""检查是否在勿扰时段"""
try:
now = datetime.now()
current_minutes = now.hour * 60 + now.minute
start_parts = self.quiet_hours_start.split(":")
start_minutes = int(start_parts[0]) * 60 + int(start_parts[1])
end_parts = self.quiet_hours_end.split(":")
end_minutes = int(end_parts[0]) * 60 + int(end_parts[1])
if start_minutes <= end_minutes:
return start_minutes <= current_minutes < end_minutes
else:
return current_minutes >= start_minutes or current_minutes < end_minutes
except:
return False
def _should_trigger_proactive(
self,
session: KokoroSession,
current_time: float,
) -> Optional[str]:
"""判断是否应触发主动思考"""
# 只检查 IDLE 状态的 Session
if session.status != SessionStatus.IDLE:
return None
# 检查沉默时长
silence_duration = current_time - session.last_activity_at
if silence_duration < self.silence_threshold:
return None
# 检查距离上次主动思考的间隔
if session.last_proactive_at:
time_since_last = current_time - session.last_proactive_at
if time_since_last < self.min_proactive_interval:
return None
# 概率触发(避免每次检查都触发)
if random.random() > 0.3: # 30% 概率
return None
silence_hours = silence_duration / 3600
return f"沉默了 {silence_hours:.1f} 小时"
async def _handle_proactive(
self,
session: KokoroSession,
trigger_reason: str,
) -> None:
"""处理主动思考"""
self._stats["proactive_triggered"] += 1
logger.info(f"[ProactiveThinker] 主动思考触发: user={session.user_id}, reason={trigger_reason}")
try:
# 获取聊天流
chat_stream = await self._get_chat_stream(session.stream_id)
# 加载动作
action_executor = ActionExecutor(session.stream_id)
await action_executor.load_actions()
# 计算沉默时长
silence_seconds = time.time() - session.last_activity_at
if silence_seconds < 3600:
silence_duration = f"{silence_seconds / 60:.0f} 分钟"
else:
silence_duration = f"{silence_seconds / 3600:.1f} 小时"
# 调用 Replyer
response = await generate_response(
session=session,
user_name=session.user_id,
situation_type="proactive",
chat_stream=chat_stream,
available_actions=action_executor.get_available_actions(),
extra_context={
"trigger_reason": trigger_reason,
"silence_duration": silence_duration,
},
)
# 检查是否决定不打扰
is_do_nothing = (
len(response.actions) == 0 or
(len(response.actions) == 1 and response.actions[0].type == "do_nothing")
)
if is_do_nothing:
logger.info(f"[ProactiveThinker] 决定不打扰: user={session.user_id}")
session.last_proactive_at = time.time()
await self.session_manager.save_session(session.user_id)
return
# 执行动作
exec_result = await action_executor.execute(response, chat_stream)
# 记录到 mental_log
session.add_bot_planning(
thought=response.thought,
actions=[a.to_dict() for a in response.actions],
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
# 更新状态
session.last_proactive_at = time.time()
if response.max_wait_seconds > 0:
session.start_waiting(
expected_reaction=response.expected_reaction,
max_wait_seconds=response.max_wait_seconds,
)
# 保存
await self.session_manager.save_session(session.user_id)
logger.info(
f"[ProactiveThinker] 主动发起完成: user={session.user_id}, "
f"actions={[a.type for a in response.actions]}"
)
except Exception as e:
logger.error(f"[ProactiveThinker] 主动思考失败: {e}")
async def _get_chat_stream(self, stream_id: str):
"""获取聊天流"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
if chat_manager:
return await chat_manager.get_stream(stream_id)
except Exception as e:
logger.warning(f"[ProactiveThinker] 获取 chat_stream 失败: {e}")
return None
def get_stats(self) -> dict:
"""获取统计信息"""
return {
**self._stats,
"is_running": self._running,
}
# 全局单例
_proactive_thinker: Optional[ProactiveThinker] = None
def get_proactive_thinker() -> ProactiveThinker:
"""获取全局主动思考器"""
global _proactive_thinker
if _proactive_thinker is None:
_proactive_thinker = ProactiveThinker()
return _proactive_thinker
async def start_proactive_thinker() -> ProactiveThinker:
"""启动主动思考器"""
thinker = get_proactive_thinker()
await thinker.start()
return thinker
async def stop_proactive_thinker() -> None:
"""停止主动思考器"""
global _proactive_thinker
if _proactive_thinker:
await _proactive_thinker.stop()

View File

@@ -0,0 +1,16 @@
"""
Kokoro Flow Chatter V2 - 提示词模块
使用项目统一的 Prompt 管理系统管理所有提示词模板
"""
# 导入 prompts 模块以注册提示词
from . import prompts # noqa: F401
from .builder import PromptBuilder, get_prompt_builder
from .prompts import PROMPT_NAMES
__all__ = [
"PromptBuilder",
"get_prompt_builder",
"PROMPT_NAMES",
]

View File

@@ -0,0 +1,388 @@
"""
Kokoro Flow Chatter V2 - 提示词构建器
使用项目统一的 Prompt 管理系统构建提示词
"""
import time
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from src.chat.utils.prompt import global_prompt_manager
from src.common.logger import get_logger
from src.config.config import global_config
from ..models import EventType, MentalLogEntry, SessionStatus
from ..session import KokoroSession
# 导入模板注册(确保模板被注册到 global_prompt_manager
from . import prompts as _ # noqa: F401
from .prompts import PROMPT_NAMES
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger("kfc_v2_prompt_builder")
class PromptBuilder:
"""
提示词构建器
使用统一的 Prompt 管理系统构建提示词:
1. 构建活动流(从 mental_log 生成线性叙事)
2. 构建当前情况描述
3. 使用 global_prompt_manager 格式化最终提示词
"""
def __init__(self):
self._context_builder = None
async def build_prompt(
self,
session: KokoroSession,
user_name: str,
situation_type: str = "new_message",
chat_stream: Optional["ChatStream"] = None,
available_actions: Optional[dict] = None,
extra_context: Optional[dict] = None,
) -> str:
"""
构建完整的提示词
Args:
session: 会话对象
user_name: 用户名称
situation_type: 情况类型 (new_message/reply_in_time/reply_late/timeout/proactive)
chat_stream: 聊天流对象
available_actions: 可用动作字典
extra_context: 额外上下文(如 trigger_reason
Returns:
完整的提示词
"""
extra_context = extra_context or {}
# 1. 构建人设块
persona_block = self._build_persona_block()
# 2. 构建关系块
relation_block = await self._build_relation_block(user_name, chat_stream)
# 3. 构建活动流
activity_stream = await self._build_activity_stream(session, user_name)
# 4. 构建当前情况
current_situation = await self._build_current_situation(
session, user_name, situation_type, extra_context
)
# 5. 构建可用动作
actions_block = self._build_actions_block(available_actions)
# 6. 获取输出格式
output_format = await self._get_output_format()
# 7. 使用统一的 prompt 管理系统格式化
prompt = await global_prompt_manager.format_prompt(
PROMPT_NAMES["main"],
user_name=user_name,
persona_block=persona_block,
relation_block=relation_block,
activity_stream=activity_stream or "(这是你们第一次聊天)",
current_situation=current_situation,
available_actions=actions_block,
output_format=output_format,
)
return prompt
def _build_persona_block(self) -> str:
"""构建人设块"""
if global_config is None:
return "你是一个温暖、真诚的人。"
personality = global_config.personality
parts = []
if personality.personality_core:
parts.append(personality.personality_core)
if personality.personality_side:
parts.append(personality.personality_side)
if personality.identity:
parts.append(personality.identity)
if personality.reply_style:
parts.append(f"\n### 说话风格\n{personality.reply_style}")
return "\n\n".join(parts) if parts else "你是一个温暖、真诚的人。"
async def _build_relation_block(
self,
user_name: str,
chat_stream: Optional["ChatStream"],
) -> str:
"""构建关系块"""
if not chat_stream:
return f"你与 {user_name} 还不太熟悉,这是早期的交流阶段。"
try:
# 延迟导入上下文构建器
if self._context_builder is None:
from ..context_builder import KFCContextBuilder
self._context_builder = KFCContextBuilder
builder = self._context_builder(chat_stream)
context_data = await builder.build_all_context(
sender_name=user_name,
target_message="",
context=None,
)
relation_info = context_data.get("relation_info", "")
if relation_info:
return relation_info
except Exception as e:
logger.warning(f"构建关系块失败: {e}")
return f"你与 {user_name} 还不太熟悉,这是早期的交流阶段。"
async def _build_activity_stream(
self,
session: KokoroSession,
user_name: str,
) -> str:
"""
构建活动流
将 mental_log 中的事件按时间顺序转换为线性叙事
使用统一的 prompt 模板
"""
entries = session.get_recent_entries(limit=30)
if not entries:
return ""
parts = []
for entry in entries:
part = await self._format_entry(entry, user_name)
if part:
parts.append(part)
return "\n\n".join(parts)
async def _format_entry(self, entry: MentalLogEntry, user_name: str) -> str:
"""格式化单个活动日志条目"""
if entry.event_type == EventType.USER_MESSAGE:
# 用户消息
result = await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_user_message"],
time=entry.get_time_str(),
user_name=entry.user_name or user_name,
content=entry.content,
)
# 如果有回复状态元数据,添加说明
reply_status = entry.metadata.get("reply_status")
if reply_status == "in_time":
elapsed = entry.metadata.get("elapsed_seconds", 0) / 60
max_wait = entry.metadata.get("max_wait_seconds", 0) / 60
result += await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_reply_in_time"],
elapsed_minutes=elapsed,
max_wait_minutes=max_wait,
)
elif reply_status == "late":
elapsed = entry.metadata.get("elapsed_seconds", 0) / 60
max_wait = entry.metadata.get("max_wait_seconds", 0) / 60
result += await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_reply_late"],
elapsed_minutes=elapsed,
max_wait_minutes=max_wait,
)
return result
elif entry.event_type == EventType.BOT_PLANNING:
# Bot 规划
actions_desc = self._format_actions(entry.actions)
if entry.max_wait_seconds > 0:
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_bot_planning"],
thought=entry.thought or "(没有特别的想法)",
actions_description=actions_desc,
expected_reaction=entry.expected_reaction or "随便怎么回应都行",
max_wait_minutes=entry.max_wait_seconds / 60,
)
else:
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_bot_planning_no_wait"],
thought=entry.thought or "(没有特别的想法)",
actions_description=actions_desc,
)
elif entry.event_type == EventType.WAITING_UPDATE:
# 等待中心理变化
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_waiting_update"],
elapsed_minutes=entry.elapsed_seconds / 60,
waiting_thought=entry.waiting_thought or "还在等...",
)
elif entry.event_type == EventType.PROACTIVE_TRIGGER:
# 主动思考触发
silence = entry.metadata.get("silence_duration", "一段时间")
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["entry_proactive_trigger"],
silence_duration=silence,
)
return ""
def _format_actions(self, actions: list[dict]) -> str:
"""格式化动作列表为可读描述"""
if not actions:
return "(无动作)"
descriptions = []
for action in actions:
action_type = action.get("type", "unknown")
if action_type == "reply":
content = action.get("content", "")
if len(content) > 50:
content = content[:50] + "..."
descriptions.append(f"发送消息:「{content}")
elif action_type == "poke_user":
descriptions.append("戳了戳对方")
elif action_type == "do_nothing":
descriptions.append("什么都不做")
elif action_type == "send_emoji":
emoji = action.get("emoji", "")
descriptions.append(f"发送表情:{emoji}")
else:
descriptions.append(f"执行动作:{action_type}")
return "".join(descriptions)
async def _build_current_situation(
self,
session: KokoroSession,
user_name: str,
situation_type: str,
extra_context: dict,
) -> str:
"""构建当前情况描述"""
current_time = datetime.now().strftime("%Y年%m月%d%H:%M")
if situation_type == "new_message":
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_new_message"],
current_time=current_time,
user_name=user_name,
)
elif situation_type == "reply_in_time":
elapsed = session.waiting_config.get_elapsed_seconds()
max_wait = session.waiting_config.max_wait_seconds
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_reply_in_time"],
current_time=current_time,
user_name=user_name,
elapsed_minutes=elapsed / 60,
max_wait_minutes=max_wait / 60,
)
elif situation_type == "reply_late":
elapsed = session.waiting_config.get_elapsed_seconds()
max_wait = session.waiting_config.max_wait_seconds
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_reply_late"],
current_time=current_time,
user_name=user_name,
elapsed_minutes=elapsed / 60,
max_wait_minutes=max_wait / 60,
)
elif situation_type == "timeout":
elapsed = session.waiting_config.get_elapsed_seconds()
max_wait = session.waiting_config.max_wait_seconds
expected = session.waiting_config.expected_reaction
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_timeout"],
current_time=current_time,
user_name=user_name,
elapsed_minutes=elapsed / 60,
max_wait_minutes=max_wait / 60,
expected_reaction=expected or "对方能回复点什么",
)
elif situation_type == "proactive":
silence = extra_context.get("silence_duration", "一段时间")
trigger_reason = extra_context.get("trigger_reason", "")
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_proactive"],
current_time=current_time,
user_name=user_name,
silence_duration=silence,
trigger_reason=trigger_reason,
)
# 默认使用 new_message
return await global_prompt_manager.format_prompt(
PROMPT_NAMES["situation_new_message"],
current_time=current_time,
user_name=user_name,
)
def _build_actions_block(self, available_actions: Optional[dict]) -> str:
"""构建可用动作块"""
if not available_actions:
return self._get_default_actions_block()
lines = []
for name, info in available_actions.items():
desc = getattr(info, "description", "") or f"执行 {name}"
lines.append(f"- `{name}`: {desc}")
return "\n".join(lines) if lines else self._get_default_actions_block()
def _get_default_actions_block(self) -> str:
"""获取默认的动作列表"""
return """- `reply`: 发送文字消息参数content
- `poke_user`: 戳一戳对方
- `do_nothing`: 什么都不做"""
async def _get_output_format(self) -> str:
"""获取输出格式模板"""
try:
prompt = await global_prompt_manager.get_prompt_async(
PROMPT_NAMES["output_format"]
)
return prompt.template
except KeyError:
# 如果模板未注册,返回默认格式
return """请用 JSON 格式回复:
{
"thought": "你的想法",
"actions": [{"type": "reply", "content": "你的回复"}],
"expected_reaction": "期待的反应",
"max_wait_seconds": 300
}"""
# 全局单例
_prompt_builder: Optional[PromptBuilder] = None
def get_prompt_builder() -> PromptBuilder:
"""获取全局提示词构建器"""
global _prompt_builder
if _prompt_builder is None:
_prompt_builder = PromptBuilder()
return _prompt_builder

View File

@@ -0,0 +1,217 @@
"""
Kokoro Flow Chatter V2 - 提示词模板注册
使用项目统一的 Prompt 管理系统注册所有 KFC V2 使用的提示词模板
"""
from src.chat.utils.prompt import Prompt
# =================================================================================================
# KFC V2 主提示词模板
# =================================================================================================
KFC_V2_MAIN_PROMPT = Prompt(
name="kfc_v2_main",
template="""# 你与 {user_name} 的私聊
## 1. 你是谁
{persona_block}
## 2. 你与 {user_name} 的关系
{relation_block}
## 3. 你们之间发生的事(活动流)
以下是你和 {user_name} 最近的互动历史,按时间顺序记录了你们的对话和你的心理活动:
{activity_stream}
## 4. 当前情况
{current_situation}
## 5. 你可以做的事情
{available_actions}
## 6. 你的回复格式
{output_format}
""",
)
# =================================================================================================
# 输出格式模板
# =================================================================================================
KFC_V2_OUTPUT_FORMAT = Prompt(
name="kfc_v2_output_format",
template="""请用以下 JSON 格式回复:
```json
{{
"thought": "你脑子里在想什么,越自然越好",
"actions": [
{{"type": "reply", "content": "你要说的话"}},
{{"type": "其他动作", "参数": ""}}
],
"expected_reaction": "你期待对方的反应是什么",
"max_wait_seconds": 300
}}
```
说明:
- `thought`:你的内心独白,记录你此刻的想法和感受
- `actions`:你要执行的动作列表,可以组合多个
- `expected_reaction`:你期待对方如何回应(用于判断是否需要等待)
- `max_wait_seconds`设定等待时间0 表示不等待,超时后你会考虑是否要主动说点什么
- 即使什么都不想做,也放一个 `{{"type": "do_nothing"}}`""",
)
# =================================================================================================
# 情景模板 - 根据不同情境使用不同的当前情况描述
# =================================================================================================
KFC_V2_SITUATION_NEW_MESSAGE = Prompt(
name="kfc_v2_situation_new_message",
template="""现在是 {current_time}
{user_name} 刚刚给你发了消息。这是一次新的对话发起(不是对你之前消息的回复)。
请决定你要怎么回应。你可以:
- 发送文字消息回复
- 发表情包
- 戳一戳对方
- 什么都不做(如果觉得没必要回复)
- 或者组合多个动作""",
)
KFC_V2_SITUATION_REPLY_IN_TIME = Prompt(
name="kfc_v2_situation_reply_in_time",
template="""现在是 {current_time}
你之前发了消息后一直在等 {user_name} 的回复。
等了大约 {elapsed_minutes:.1f} 分钟(你原本打算最多等 {max_wait_minutes:.1f} 分钟)。
现在 {user_name} 回复了!
请决定你接下来要怎么回应。""",
)
KFC_V2_SITUATION_REPLY_LATE = Prompt(
name="kfc_v2_situation_reply_late",
template="""现在是 {current_time}
你之前发了消息后在等 {user_name} 的回复。
你原本打算最多等 {max_wait_minutes:.1f} 分钟,但实际等了 {elapsed_minutes:.1f} 分钟才收到回复。
虽然有点迟,但 {user_name} 终于回复了。
请决定你接下来要怎么回应。(可以选择轻轻抱怨一下迟到,也可以装作没在意)""",
)
KFC_V2_SITUATION_TIMEOUT = Prompt(
name="kfc_v2_situation_timeout",
template="""现在是 {current_time}
你之前发了消息后一直在等 {user_name} 的回复。
你原本打算最多等 {max_wait_minutes:.1f} 分钟,现在已经等了 {elapsed_minutes:.1f} 分钟了,对方还是没回。
你期待的反应是:"{expected_reaction}"
你需要决定:
1. 继续等待(设置新的 max_wait_seconds
2. 主动说点什么打破沉默
3. 做点别的事情(戳一戳、发表情等)
4. 算了不等了max_wait_seconds = 0""",
)
KFC_V2_SITUATION_PROACTIVE = Prompt(
name="kfc_v2_situation_proactive",
template="""现在是 {current_time}
你和 {user_name} 已经有一段时间没聊天了(沉默了 {silence_duration})。
{trigger_reason}
你在想要不要主动找 {user_name} 聊点什么。
请决定:
1. 主动发起对话(想个话题开场)
2. 发个表情或戳一戳试探一下
3. 算了现在不是好时机do_nothing
如果决定发起对话,想想用什么自然的方式开场,不要太突兀。""",
)
# =================================================================================================
# 活动流条目模板 - 用于构建 activity_stream
# =================================================================================================
# 用户消息条目
KFC_V2_ENTRY_USER_MESSAGE = Prompt(
name="kfc_v2_entry_user_message",
template="""{time}{user_name} 说:
"{content}"
""",
)
# Bot 规划条目(有等待)
KFC_V2_ENTRY_BOT_PLANNING = Prompt(
name="kfc_v2_entry_bot_planning",
template="""【你的想法】
内心:{thought}
行动:{actions_description}
期待:{expected_reaction}
决定等待:最多 {max_wait_minutes:.1f} 分钟
""",
)
# Bot 规划条目(无等待)
KFC_V2_ENTRY_BOT_PLANNING_NO_WAIT = Prompt(
name="kfc_v2_entry_bot_planning_no_wait",
template="""【你的想法】
内心:{thought}
行动:{actions_description}
(不打算等对方回复)
""",
)
# 等待期间心理变化
KFC_V2_ENTRY_WAITING_UPDATE = Prompt(
name="kfc_v2_entry_waiting_update",
template="""【等待中... {elapsed_minutes:.1f} 分钟过去了】
你想:{waiting_thought}
""",
)
# 收到及时回复时的标注
KFC_V2_ENTRY_REPLY_IN_TIME = Prompt(
name="kfc_v2_entry_reply_in_time",
template="""→ (对方在你预期时间内回复了,等了 {elapsed_minutes:.1f} 分钟)
""",
)
# 收到迟到回复时的标注
KFC_V2_ENTRY_REPLY_LATE = Prompt(
name="kfc_v2_entry_reply_late",
template="""→ (对方回复迟了,你原本只打算等 {max_wait_minutes:.1f} 分钟,实际等了 {elapsed_minutes:.1f} 分钟)
""",
)
# 主动思考触发
KFC_V2_ENTRY_PROACTIVE_TRIGGER = Prompt(
name="kfc_v2_entry_proactive_trigger",
template="""【沉默了 {silence_duration}
你开始考虑要不要主动找对方聊点什么...
""",
)
# 导出所有模板名称,方便外部引用
PROMPT_NAMES = {
"main": "kfc_v2_main",
"output_format": "kfc_v2_output_format",
"situation_new_message": "kfc_v2_situation_new_message",
"situation_reply_in_time": "kfc_v2_situation_reply_in_time",
"situation_reply_late": "kfc_v2_situation_reply_late",
"situation_timeout": "kfc_v2_situation_timeout",
"situation_proactive": "kfc_v2_situation_proactive",
"entry_user_message": "kfc_v2_entry_user_message",
"entry_bot_planning": "kfc_v2_entry_bot_planning",
"entry_bot_planning_no_wait": "kfc_v2_entry_bot_planning_no_wait",
"entry_waiting_update": "kfc_v2_entry_waiting_update",
"entry_reply_in_time": "kfc_v2_entry_reply_in_time",
"entry_reply_late": "kfc_v2_entry_reply_late",
"entry_proactive_trigger": "kfc_v2_entry_proactive_trigger",
}

View File

@@ -0,0 +1,107 @@
"""
Kokoro Flow Chatter V2 - Replyer
简化的回复生成模块,使用插件系统的 llm_api
"""
from typing import TYPE_CHECKING, Optional
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api
from src.utils.json_parser import extract_and_parse_json
from .models import LLMResponse
from .prompt.builder import get_prompt_builder
from .session import KokoroSession
if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger("kfc_v2_replyer")
async def generate_response(
session: KokoroSession,
user_name: str,
situation_type: str = "new_message",
chat_stream: Optional["ChatStream"] = None,
available_actions: Optional[dict] = None,
extra_context: Optional[dict] = None,
) -> LLMResponse:
"""
生成回复
Args:
session: 会话对象
user_name: 用户名称
situation_type: 情况类型
chat_stream: 聊天流对象
available_actions: 可用动作字典
extra_context: 额外上下文
Returns:
LLMResponse 对象
"""
try:
# 1. 构建提示词
prompt_builder = get_prompt_builder()
prompt = await prompt_builder.build_prompt(
session=session,
user_name=user_name,
situation_type=situation_type,
chat_stream=chat_stream,
available_actions=available_actions,
extra_context=extra_context,
)
logger.debug(f"[KFC Replyer] 构建的提示词:\n{prompt}")
# 2. 获取模型配置并调用 LLM
models = llm_api.get_available_models()
replyer_config = models.get("replyer")
if not replyer_config:
logger.error("[KFC Replyer] 未找到 replyer 模型配置")
return LLMResponse.create_error_response("未找到 replyer 模型配置")
success, raw_response, reasoning, model_name = await llm_api.generate_with_model(
prompt=prompt,
model_config=replyer_config,
request_type="kokoro_flow_chatter_v2",
)
if not success:
logger.error(f"[KFC Replyer] LLM 调用失败: {raw_response}")
return LLMResponse.create_error_response(raw_response)
logger.debug(f"[KFC Replyer] LLM 响应 (model={model_name}):\n{raw_response}")
# 3. 解析响应
return _parse_response(raw_response)
except Exception as e:
logger.error(f"[KFC Replyer] 生成失败: {e}")
import traceback
traceback.print_exc()
return LLMResponse.create_error_response(str(e))
def _parse_response(raw_response: str) -> LLMResponse:
"""解析 LLM 响应"""
data = extract_and_parse_json(raw_response, strict=False)
if not data or not isinstance(data, dict):
logger.warning(f"[KFC Replyer] 无法解析 JSON: {raw_response[:200]}...")
return LLMResponse.create_error_response("无法解析响应格式")
response = LLMResponse.from_dict(data)
if response.thought:
logger.info(
f"[KFC Replyer] 解析成功: thought={response.thought[:50]}..., "
f"actions={[a.type for a in response.actions]}"
)
else:
logger.warning("[KFC Replyer] 响应缺少 thought")
return response

View File

@@ -0,0 +1,386 @@
"""
Kokoro Flow Chatter V2 - 会话管理
极简的会话状态管理:
- Session 只有 IDLE 和 WAITING 两种状态
- 包含 mental_log心理活动历史
- 包含 waiting_config等待配置
"""
import asyncio
import json
import os
import time
from pathlib import Path
from typing import Optional
from src.common.logger import get_logger
from .models import (
EventType,
MentalLogEntry,
SessionStatus,
WaitingConfig,
)
logger = get_logger("kfc_v2_session")
class KokoroSession:
"""
Kokoro Flow Chatter V2 会话
为每个私聊用户维护一个独立的会话,包含:
- 基本信息user_id, stream_id
- 状态(只有 IDLE 和 WAITING
- 心理活动历史mental_log
- 等待配置waiting_config
"""
# 心理活动日志最大保留条数
MAX_MENTAL_LOG_SIZE = 50
def __init__(
self,
user_id: str,
stream_id: str,
):
self.user_id = user_id
self.stream_id = stream_id
# 状态(只有 IDLE 和 WAITING
self._status: SessionStatus = SessionStatus.IDLE
# 心理活动历史
self.mental_log: list[MentalLogEntry] = []
# 等待配置
self.waiting_config: WaitingConfig = WaitingConfig()
# 时间戳
self.created_at: float = time.time()
self.last_activity_at: float = time.time()
# 统计
self.total_interactions: int = 0
# 上次主动思考时间
self.last_proactive_at: Optional[float] = None
@property
def status(self) -> SessionStatus:
return self._status
@status.setter
def status(self, value: SessionStatus) -> None:
old_status = self._status
self._status = value
if old_status != value:
logger.debug(f"Session {self.user_id} 状态变更: {old_status}{value}")
def add_entry(self, entry: MentalLogEntry) -> None:
"""添加心理活动日志条目"""
self.mental_log.append(entry)
self.last_activity_at = time.time()
# 保持日志在合理大小
if len(self.mental_log) > self.MAX_MENTAL_LOG_SIZE:
self.mental_log = self.mental_log[-self.MAX_MENTAL_LOG_SIZE:]
def add_user_message(
self,
content: str,
user_name: str,
user_id: str,
timestamp: Optional[float] = None,
) -> MentalLogEntry:
"""添加用户消息事件"""
entry = MentalLogEntry(
event_type=EventType.USER_MESSAGE,
timestamp=timestamp or time.time(),
content=content,
user_name=user_name,
user_id=user_id,
)
# 如果之前在等待,记录收到回复的情况
if self.status == SessionStatus.WAITING and self.waiting_config.is_active():
elapsed = self.waiting_config.get_elapsed_seconds()
max_wait = self.waiting_config.max_wait_seconds
if elapsed <= max_wait:
entry.metadata["reply_status"] = "in_time"
entry.metadata["elapsed_seconds"] = elapsed
entry.metadata["max_wait_seconds"] = max_wait
else:
entry.metadata["reply_status"] = "late"
entry.metadata["elapsed_seconds"] = elapsed
entry.metadata["max_wait_seconds"] = max_wait
self.add_entry(entry)
return entry
def add_bot_planning(
self,
thought: str,
actions: list[dict],
expected_reaction: str = "",
max_wait_seconds: int = 0,
timestamp: Optional[float] = None,
) -> MentalLogEntry:
"""添加 Bot 规划事件"""
entry = MentalLogEntry(
event_type=EventType.BOT_PLANNING,
timestamp=timestamp or time.time(),
thought=thought,
actions=actions,
expected_reaction=expected_reaction,
max_wait_seconds=max_wait_seconds,
)
self.add_entry(entry)
self.total_interactions += 1
return entry
def add_waiting_update(
self,
waiting_thought: str,
mood: str = "",
timestamp: Optional[float] = None,
) -> MentalLogEntry:
"""添加等待期间的心理变化"""
entry = MentalLogEntry(
event_type=EventType.WAITING_UPDATE,
timestamp=timestamp or time.time(),
waiting_thought=waiting_thought,
mood=mood,
elapsed_seconds=self.waiting_config.get_elapsed_seconds(),
)
self.add_entry(entry)
return entry
def start_waiting(
self,
expected_reaction: str,
max_wait_seconds: int,
) -> None:
"""开始等待"""
if max_wait_seconds <= 0:
# 不等待,直接进入 IDLE
self.status = SessionStatus.IDLE
self.waiting_config.reset()
return
self.status = SessionStatus.WAITING
self.waiting_config = WaitingConfig(
expected_reaction=expected_reaction,
max_wait_seconds=max_wait_seconds,
started_at=time.time(),
last_thinking_at=0.0,
thinking_count=0,
)
logger.debug(
f"Session {self.user_id} 开始等待: "
f"max_wait={max_wait_seconds}s, expected={expected_reaction[:30]}..."
)
def end_waiting(self) -> None:
"""结束等待"""
self.status = SessionStatus.IDLE
self.waiting_config.reset()
def get_recent_entries(self, limit: int = 20) -> list[MentalLogEntry]:
"""获取最近的心理活动日志"""
return self.mental_log[-limit:] if self.mental_log else []
def get_last_bot_message(self) -> Optional[str]:
"""获取最后一条 Bot 发送的消息"""
for entry in reversed(self.mental_log):
if entry.event_type == EventType.BOT_PLANNING:
for action in entry.actions:
if action.get("type") in ("reply", "respond"):
return action.get("content", "")
return None
def to_dict(self) -> dict:
"""转换为字典(用于持久化)"""
return {
"user_id": self.user_id,
"stream_id": self.stream_id,
"status": str(self.status),
"mental_log": [e.to_dict() for e in self.mental_log],
"waiting_config": self.waiting_config.to_dict(),
"created_at": self.created_at,
"last_activity_at": self.last_activity_at,
"total_interactions": self.total_interactions,
"last_proactive_at": self.last_proactive_at,
}
@classmethod
def from_dict(cls, data: dict) -> "KokoroSession":
"""从字典创建会话"""
session = cls(
user_id=data.get("user_id", ""),
stream_id=data.get("stream_id", ""),
)
# 状态
status_str = data.get("status", "idle")
try:
session._status = SessionStatus(status_str)
except ValueError:
session._status = SessionStatus.IDLE
# 心理活动历史
mental_log_data = data.get("mental_log", [])
session.mental_log = [MentalLogEntry.from_dict(e) for e in mental_log_data]
# 等待配置
waiting_data = data.get("waiting_config", {})
session.waiting_config = WaitingConfig.from_dict(waiting_data)
# 时间戳
session.created_at = data.get("created_at", time.time())
session.last_activity_at = data.get("last_activity_at", time.time())
session.total_interactions = data.get("total_interactions", 0)
session.last_proactive_at = data.get("last_proactive_at")
return session
class SessionManager:
"""
会话管理器
负责会话的创建、获取、保存和清理
"""
_instance: Optional["SessionManager"] = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
data_dir: str = "data/kokoro_flow_chatter_v2/sessions",
max_session_age_days: int = 30,
):
if hasattr(self, "_initialized") and self._initialized:
return
self._initialized = True
self.data_dir = Path(data_dir)
self.max_session_age_days = max_session_age_days
# 内存缓存
self._sessions: dict[str, KokoroSession] = {}
self._locks: dict[str, asyncio.Lock] = {}
# 确保数据目录存在
self.data_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"SessionManager 初始化完成: {self.data_dir}")
def _get_lock(self, user_id: str) -> asyncio.Lock:
"""获取用户级别的锁"""
if user_id not in self._locks:
self._locks[user_id] = asyncio.Lock()
return self._locks[user_id]
def _get_file_path(self, user_id: str) -> Path:
"""获取会话文件路径"""
safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in user_id)
return self.data_dir / f"{safe_id}.json"
async def get_session(self, user_id: str, stream_id: str) -> KokoroSession:
"""获取或创建会话"""
async with self._get_lock(user_id):
# 检查内存缓存
if user_id in self._sessions:
session = self._sessions[user_id]
session.stream_id = stream_id # 更新 stream_id
return session
# 尝试从文件加载
session = await self._load_from_file(user_id)
if session:
session.stream_id = stream_id
self._sessions[user_id] = session
return session
# 创建新会话
session = KokoroSession(user_id=user_id, stream_id=stream_id)
self._sessions[user_id] = session
logger.info(f"创建新会话: {user_id}")
return session
async def _load_from_file(self, user_id: str) -> Optional[KokoroSession]:
"""从文件加载会话"""
file_path = self._get_file_path(user_id)
if not file_path.exists():
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
session = KokoroSession.from_dict(data)
logger.debug(f"从文件加载会话: {user_id}")
return session
except Exception as e:
logger.error(f"加载会话失败 {user_id}: {e}")
return None
async def save_session(self, user_id: str) -> bool:
"""保存会话到文件"""
async with self._get_lock(user_id):
if user_id not in self._sessions:
return False
session = self._sessions[user_id]
file_path = self._get_file_path(user_id)
try:
data = session.to_dict()
temp_path = file_path.with_suffix(".json.tmp")
with open(temp_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(temp_path, file_path)
return True
except Exception as e:
logger.error(f"保存会话失败 {user_id}: {e}")
return False
async def save_all(self) -> int:
"""保存所有会话"""
count = 0
for user_id in list(self._sessions.keys()):
if await self.save_session(user_id):
count += 1
return count
async def get_waiting_sessions(self) -> list[KokoroSession]:
"""获取所有处于等待状态的会话"""
return [s for s in self._sessions.values() if s.status == SessionStatus.WAITING]
async def get_all_sessions(self) -> list[KokoroSession]:
"""获取所有会话"""
return list(self._sessions.values())
def get_session_sync(self, user_id: str) -> Optional[KokoroSession]:
"""同步获取会话(仅从内存)"""
return self._sessions.get(user_id)
# 全局单例
_session_manager: Optional[SessionManager] = None
def get_session_manager() -> SessionManager:
"""获取全局会话管理器"""
global _session_manager
if _session_manager is None:
_session_manager = SessionManager()
return _session_manager