From 0fe15dac52cbecbd854a285495f5c0a414be647c Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sun, 30 Nov 2025 13:05:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BD=BF=E7=94=A8=E6=8F=90=E7=A4=BA?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=92=8C=E4=BC=9A=E8=AF=9D=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=9D=A5=E5=AE=9E=E7=8E=B0Kokoro=20Flow=20Chatter=20V2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在Kokoro Flow Chatter V2中添加提示模块以管理提示信息。 - 创建一个构建器,用于根据用户交互和会话上下文构建提示。 - 为不同场景(新消息、及时回复等)注册各种提示模板。 - 开发一个回复模块,使用LLM API生成回复。 - 实现会话管理以处理用户交互并维护状态。 - 引入心理日志条目以追踪用户与机器人的交互情况。 - 确保各模块中都有适当的日志记录和错误处理。 --- .../kokoro_flow_chatter_v2/__init__.py | 66 +++ .../kokoro_flow_chatter_v2/action_executor.py | 228 ++++++++ .../kokoro_flow_chatter_v2/chatter.py | 263 +++++++++ .../built_in/kokoro_flow_chatter_v2/config.py | 221 ++++++++ .../kokoro_flow_chatter_v2/context_builder.py | 338 ++++++++++++ .../built_in/kokoro_flow_chatter_v2/models.py | 320 +++++++++++ .../built_in/kokoro_flow_chatter_v2/plugin.py | 105 ++++ .../proactive_thinker.py | 500 ++++++++++++++++++ .../kokoro_flow_chatter_v2/prompt/__init__.py | 16 + .../kokoro_flow_chatter_v2/prompt/builder.py | 388 ++++++++++++++ .../kokoro_flow_chatter_v2/prompt/prompts.py | 217 ++++++++ .../kokoro_flow_chatter_v2/replyer.py | 107 ++++ .../kokoro_flow_chatter_v2/session.py | 386 ++++++++++++++ 13 files changed, 3155 insertions(+) create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/__init__.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/action_executor.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/chatter.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/config.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/context_builder.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/models.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/plugin.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/proactive_thinker.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/prompt/__init__.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/prompt/builder.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/prompt/prompts.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/replyer.py create mode 100644 src/plugins/built_in/kokoro_flow_chatter_v2/session.py diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/__init__.py b/src/plugins/built_in/kokoro_flow_chatter_v2/__init__.py new file mode 100644 index 000000000..a7e8604e8 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/__init__.py @@ -0,0 +1,66 @@ +""" +Kokoro Flow Chatter V2 - 私聊特化的心流聊天器 + +重构版本,核心设计理念: +1. Chatter 职责极简化:只负责"收到消息 → 规划执行" +2. Session 状态简化:只有 IDLE 和 WAITING 两种状态 +3. 独立的 Replyer:专属的提示词构建和 LLM 交互 +4. 独立的主动思考器:负责等待管理和主动发起 +5. 大模板 + 小模板:线性叙事风格的提示词架构 +""" + +from .models import ( + EventType, + SessionStatus, + MentalLogEntry, + WaitingConfig, + ActionModel, + LLMResponse, +) +from .session import KokoroSession, SessionManager, get_session_manager +from .chatter import KokoroFlowChatterV2 +from .replyer import generate_response +from .action_executor import ActionExecutor +from .proactive_thinker import ( + ProactiveThinker, + get_proactive_thinker, + start_proactive_thinker, + stop_proactive_thinker, +) +from .config import ( + KokoroFlowChatterV2Config, + get_config, + load_config, + reload_config, +) +from .plugin import KokoroFlowChatterV2Plugin + +__all__ = [ + # Models + "EventType", + "SessionStatus", + "MentalLogEntry", + "WaitingConfig", + "ActionModel", + "LLMResponse", + # Session + "KokoroSession", + "SessionManager", + "get_session_manager", + # Core Components + "KokoroFlowChatterV2", + "generate_response", + "ActionExecutor", + # Proactive Thinker + "ProactiveThinker", + "get_proactive_thinker", + "start_proactive_thinker", + "stop_proactive_thinker", + # Config + "KokoroFlowChatterV2Config", + "get_config", + "load_config", + "reload_config", + # Plugin + "KokoroFlowChatterV2Plugin", +] diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/action_executor.py b/src/plugins/built_in/kokoro_flow_chatter_v2/action_executor.py new file mode 100644 index 000000000..aa0fe2ef1 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/action_executor.py @@ -0,0 +1,228 @@ +""" +Kokoro Flow Chatter V2 - 动作执行器 + +负责执行 LLM 决策的动作 +""" + +import asyncio +import time +from typing import TYPE_CHECKING, Any, Optional + +from src.chat.planner_actions.action_manager import ChatterActionManager +from src.common.logger import get_logger +from src.plugin_system.apis import send_api + +from .models import ActionModel, LLMResponse + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream + +logger = get_logger("kfc_v2_action_executor") + + +class ActionExecutor: + """ + 动作执行器 + + 职责: + - 执行 reply、poke_user 等动作 + - 通过 ActionManager 执行动态注册的动作 + """ + + # 内置动作(不通过 ActionManager) + BUILTIN_ACTIONS = {"reply", "do_nothing"} + + def __init__(self, stream_id: str): + self.stream_id = stream_id + self._action_manager = ChatterActionManager() + self._available_actions: dict = {} + + # 统计 + self._stats = { + "total_executed": 0, + "successful": 0, + "failed": 0, + } + + async def load_actions(self) -> dict: + """加载可用动作""" + await self._action_manager.load_actions(self.stream_id) + self._available_actions = self._action_manager.get_using_actions() + logger.debug(f"[ActionExecutor] 加载了 {len(self._available_actions)} 个动作") + return self._available_actions + + def get_available_actions(self) -> dict: + """获取可用动作""" + return self._available_actions.copy() + + async def execute( + self, + response: LLMResponse, + chat_stream: Optional["ChatStream"], + ) -> dict[str, Any]: + """ + 执行动作列表 + + Args: + response: LLM 响应 + chat_stream: 聊天流 + + Returns: + 执行结果 + """ + results = [] + has_reply = False + reply_content = "" + + for action in response.actions: + try: + result = await self._execute_action(action, chat_stream) + results.append(result) + + if result.get("success"): + self._stats["successful"] += 1 + if action.type in ("reply", "respond"): + has_reply = True + reply_content = action.params.get("content", "") + else: + self._stats["failed"] += 1 + + except Exception as e: + logger.error(f"[ActionExecutor] 执行动作失败 {action.type}: {e}") + results.append({ + "action_type": action.type, + "success": False, + "error": str(e), + }) + self._stats["failed"] += 1 + + self._stats["total_executed"] += 1 + + return { + "success": all(r.get("success", False) for r in results), + "results": results, + "has_reply": has_reply, + "reply_content": reply_content, + } + + async def _execute_action( + self, + action: ActionModel, + chat_stream: Optional["ChatStream"], + ) -> dict[str, Any]: + """执行单个动作""" + action_type = action.type + + if action_type == "reply": + return await self._execute_reply(action, chat_stream) + + elif action_type == "do_nothing": + logger.debug("[ActionExecutor] 执行 do_nothing") + return {"action_type": "do_nothing", "success": True} + + elif action_type == "poke_user": + return await self._execute_via_manager(action, chat_stream) + + elif action_type in self._available_actions: + return await self._execute_via_manager(action, chat_stream) + + else: + logger.warning(f"[ActionExecutor] 未知动作类型: {action_type}") + return { + "action_type": action_type, + "success": False, + "error": f"未知动作类型: {action_type}", + } + + async def _execute_reply( + self, + action: ActionModel, + chat_stream: Optional["ChatStream"], + ) -> dict[str, Any]: + """执行回复动作""" + content = action.params.get("content", "") + + if not content: + return { + "action_type": "reply", + "success": False, + "error": "回复内容为空", + } + + try: + # 消息后处理(分割、错别字等) + processed_messages = await self._process_reply_content(content) + + all_success = True + for msg in processed_messages: + success = await send_api.text_to_stream( + text=msg, + stream_id=self.stream_id, + typing=True, + ) + if not success: + all_success = False + + return { + "action_type": "reply", + "success": all_success, + "reply_text": content, + } + + except Exception as e: + logger.error(f"[ActionExecutor] 发送回复失败: {e}") + return { + "action_type": "reply", + "success": False, + "error": str(e), + } + + async def _process_reply_content(self, content: str) -> list[str]: + """处理回复内容(分割、错别字等)""" + try: + # 复用 v1 的后处理器 + from src.plugins.built_in.kokoro_flow_chatter.response_post_processor import ( + process_reply_content, + ) + + messages = await process_reply_content(content) + return messages if messages else [content] + + except Exception as e: + logger.warning(f"[ActionExecutor] 消息处理失败,使用原始内容: {e}") + return [content] + + async def _execute_via_manager( + self, + action: ActionModel, + chat_stream: Optional["ChatStream"], + ) -> dict[str, Any]: + """通过 ActionManager 执行动作""" + try: + result = await self._action_manager.execute_action( + action_name=action.type, + chat_id=self.stream_id, + target_message=None, + reasoning=f"KFC决策: {action.type}", + action_data=action.params, + thinking_id=None, + log_prefix="[KFC V2]", + ) + + return { + "action_type": action.type, + "success": result.get("success", False), + "result": result, + } + + except Exception as e: + logger.error(f"[ActionExecutor] ActionManager 执行失败: {e}") + return { + "action_type": action.type, + "success": False, + "error": str(e), + } + + def get_stats(self) -> dict: + """获取统计信息""" + return self._stats.copy() diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/chatter.py b/src/plugins/built_in/kokoro_flow_chatter_v2/chatter.py new file mode 100644 index 000000000..410226b95 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/chatter.py @@ -0,0 +1,263 @@ +""" +Kokoro Flow Chatter V2 - Chatter 主类 + +极简设计,只负责: +1. 收到消息 +2. 调用 Replyer 生成响应 +3. 执行动作 +4. 更新 Session +""" + +import asyncio +import time +from typing import TYPE_CHECKING, Any, ClassVar, Optional + +from src.common.data_models.message_manager_data_model import StreamContext +from src.common.logger import get_logger +from src.plugin_system.base.base_chatter import BaseChatter +from src.plugin_system.base.component_types import ChatType + +from .action_executor import ActionExecutor +from .models import EventType, SessionStatus +from .replyer import generate_response +from .session import get_session_manager + +if TYPE_CHECKING: + from src.chat.planner_actions.action_manager import ChatterActionManager + from src.common.data_models.database_data_model import DatabaseMessages + +logger = get_logger("kfc_v2_chatter") + +# 控制台颜色 +SOFT_PURPLE = "\033[38;5;183m" +RESET = "\033[0m" + + +class KokoroFlowChatterV2(BaseChatter): + """ + Kokoro Flow Chatter V2 - 私聊特化的心流聊天器 + + 核心设计: + - Chatter 只负责 "收到消息 → 规划执行" 的流程 + - 无论 Session 之前是什么状态,流程都一样 + - 区别只体现在提示词中 + + 不负责: + - 等待超时处理(由 ProactiveThinker 负责) + - 连续思考(由 ProactiveThinker 负责) + - 主动发起对话(由 ProactiveThinker 负责) + """ + + chatter_name: str = "KokoroFlowChatterV2" + chatter_description: str = "心流聊天器 V2 - 私聊特化的深度情感交互处理器" + chat_types: ClassVar[list[ChatType]] = [ChatType.PRIVATE] + + def __init__( + self, + stream_id: str, + action_manager: "ChatterActionManager", + plugin_config: dict | None = None, + ): + super().__init__(stream_id, action_manager, plugin_config) + + # 核心组件 + self.session_manager = get_session_manager() + self.action_executor = ActionExecutor(stream_id) + + # 并发控制 + self._lock = asyncio.Lock() + self._processing = False + + # 统计 + self._stats = { + "messages_processed": 0, + "successful_responses": 0, + "failed_responses": 0, + } + + logger.info(f"{SOFT_PURPLE}[KFC V2]{RESET} 初始化完成: stream_id={stream_id}") + + async def execute(self, context: StreamContext) -> dict: + """ + 执行聊天处理 + + 流程: + 1. 获取 Session + 2. 获取未读消息 + 3. 记录用户消息到 mental_log + 4. 确定 situation_type(根据之前的等待状态) + 5. 调用 Replyer 生成响应 + 6. 执行动作 + 7. 更新 Session(记录 Bot 规划,设置等待状态) + 8. 保存 Session + """ + async with self._lock: + self._processing = True + + try: + # 1. 获取未读消息 + unread_messages = context.get_unread_messages() + if not unread_messages: + return self._build_result(success=True, message="no_unread_messages") + + # 2. 取最后一条消息作为主消息 + target_message = unread_messages[-1] + user_info = target_message.user_info + + if not user_info: + return self._build_result(success=False, message="no_user_info") + + user_id = str(user_info.user_id) + user_name = user_info.user_nickname or user_id + + # 3. 获取或创建 Session + session = await self.session_manager.get_session(user_id, self.stream_id) + + # 4. 确定 situation_type(根据之前的等待状态) + situation_type = self._determine_situation_type(session) + + # 5. 记录用户消息到 mental_log + for msg in unread_messages: + msg_content = msg.processed_plain_text or msg.display_message or "" + msg_user_name = msg.user_info.user_nickname if msg.user_info else user_name + msg_user_id = str(msg.user_info.user_id) if msg.user_info else user_id + + session.add_user_message( + content=msg_content, + user_name=msg_user_name, + user_id=msg_user_id, + timestamp=msg.time, + ) + + # 6. 加载可用动作 + await self.action_executor.load_actions() + available_actions = self.action_executor.get_available_actions() + + # 7. 获取聊天流 + chat_stream = await self._get_chat_stream() + + # 8. 调用 Replyer 生成响应 + response = await generate_response( + session=session, + user_name=user_name, + situation_type=situation_type, + chat_stream=chat_stream, + available_actions=available_actions, + ) + + # 9. 执行动作 + exec_result = await self.action_executor.execute(response, chat_stream) + + # 10. 记录 Bot 规划到 mental_log + session.add_bot_planning( + thought=response.thought, + actions=[a.to_dict() for a in response.actions], + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + + # 11. 更新 Session 状态 + if response.max_wait_seconds > 0: + session.start_waiting( + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + else: + session.end_waiting() + + # 12. 标记消息为已读 + for msg in unread_messages: + context.mark_message_as_read(str(msg.message_id)) + + # 13. 保存 Session + await self.session_manager.save_session(user_id) + + # 14. 更新统计 + self._stats["messages_processed"] += len(unread_messages) + if exec_result.get("has_reply"): + self._stats["successful_responses"] += 1 + + logger.info( + f"{SOFT_PURPLE}[KFC V2]{RESET} 处理完成: " + f"user={user_name}, situation={situation_type}, " + f"actions={[a.type for a in response.actions]}, " + f"wait={response.max_wait_seconds}s" + ) + + return self._build_result( + success=True, + message="processed", + has_reply=exec_result.get("has_reply", False), + thought=response.thought, + situation_type=situation_type, + ) + + except Exception as e: + self._stats["failed_responses"] += 1 + logger.error(f"[KFC V2] 处理失败: {e}") + import traceback + traceback.print_exc() + return self._build_result(success=False, message=str(e), error=True) + + finally: + self._processing = False + + def _determine_situation_type(self, session) -> str: + """ + 确定当前情况类型 + + 根据 Session 之前的状态决定提示词的 situation_type + """ + if session.status == SessionStatus.WAITING: + # 之前在等待 + if session.waiting_config.is_timeout(): + # 超时了才收到回复 + return "reply_late" + else: + # 在预期内收到回复 + return "reply_in_time" + else: + # 之前是 IDLE + return "new_message" + + async def _get_chat_stream(self): + """获取聊天流对象""" + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + if chat_manager: + return await chat_manager.get_stream(self.stream_id) + except Exception as e: + logger.warning(f"[KFC V2] 获取 chat_stream 失败: {e}") + return None + + def _build_result( + self, + success: bool, + message: str = "", + error: bool = False, + **kwargs, + ) -> dict: + """构建返回结果""" + result = { + "success": success, + "stream_id": self.stream_id, + "message": message, + "error": error, + "timestamp": time.time(), + } + result.update(kwargs) + return result + + def get_stats(self) -> dict[str, Any]: + """获取统计信息""" + return { + **self._stats, + "action_executor_stats": self.action_executor.get_stats(), + } + + @property + def is_processing(self) -> bool: + """是否正在处理""" + return self._processing diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/config.py b/src/plugins/built_in/kokoro_flow_chatter_v2/config.py new file mode 100644 index 000000000..37e6c068e --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/config.py @@ -0,0 +1,221 @@ +""" +Kokoro Flow Chatter V2 - 配置 + +可以通过 TOML 配置文件覆盖默认值 +""" + +from dataclasses import dataclass, field +from typing import List, Optional + + +@dataclass +class WaitingDefaults: + """等待配置默认值""" + + # 默认最大等待时间(秒) + default_max_wait_seconds: int = 300 + + # 最小等待时间 + min_wait_seconds: int = 30 + + # 最大等待时间 + max_wait_seconds: int = 1800 + + +@dataclass +class ProactiveConfig: + """主动思考配置""" + + # 是否启用主动思考 + enabled: bool = True + + # 沉默阈值(秒),超过此时间考虑主动发起 + silence_threshold_seconds: int = 7200 + + # 两次主动发起最小间隔(秒) + min_interval_between_proactive: int = 1800 + + # 勿扰时段开始(HH:MM 格式) + quiet_hours_start: str = "23:00" + + # 勿扰时段结束 + quiet_hours_end: str = "07:00" + + # 主动发起概率(0.0 ~ 1.0) + trigger_probability: float = 0.3 + + +@dataclass +class PromptConfig: + """提示词配置""" + + # 活动记录保留条数 + max_activity_entries: int = 30 + + # 每条记录最大字符数 + max_entry_length: int = 500 + + # 是否包含人物关系信息 + include_relation: bool = True + + # 是否包含记忆信息 + include_memory: bool = True + + +@dataclass +class SessionConfig: + """会话配置""" + + # Session 持久化目录(相对于 data/) + session_dir: str = "kokoro_flow_chatter_v2/sessions" + + # Session 自动过期时间(秒),超过此时间未活动自动清理 + session_expire_seconds: int = 86400 * 7 # 7 天 + + # 活动记录保留上限 + max_mental_log_entries: int = 100 + + +@dataclass +class LLMConfig: + """LLM 配置""" + + # 模型名称(空则使用默认) + model_name: str = "" + + # Temperature + temperature: float = 0.8 + + # 最大 Token + max_tokens: int = 1024 + + # 请求超时(秒) + timeout: float = 60.0 + + +@dataclass +class KokoroFlowChatterV2Config: + """Kokoro Flow Chatter V2 总配置""" + + # 是否启用 + enabled: bool = True + + # 启用的消息源类型(空列表表示全部) + enabled_stream_types: List[str] = field(default_factory=lambda: ["private"]) + + # 等待配置 + waiting: WaitingDefaults = field(default_factory=WaitingDefaults) + + # 主动思考配置 + proactive: ProactiveConfig = field(default_factory=ProactiveConfig) + + # 提示词配置 + prompt: PromptConfig = field(default_factory=PromptConfig) + + # 会话配置 + session: SessionConfig = field(default_factory=SessionConfig) + + # LLM 配置 + llm: LLMConfig = field(default_factory=LLMConfig) + + # 调试模式 + debug: bool = False + + +# 全局配置单例 +_config: Optional[KokoroFlowChatterV2Config] = None + + +def get_config() -> KokoroFlowChatterV2Config: + """获取全局配置""" + global _config + if _config is None: + _config = load_config() + return _config + + +def load_config() -> KokoroFlowChatterV2Config: + """从全局配置加载 KFC V2 配置""" + from src.config.config import global_config + + config = KokoroFlowChatterV2Config() + + # 尝试从全局配置读取 + if not global_config: + return config + + try: + if hasattr(global_config, 'kokoro_flow_chatter_v2'): + kfc_cfg = getattr(global_config, 'kokoro_flow_chatter_v2') + + # 基础配置 + if hasattr(kfc_cfg, 'enabled'): + config.enabled = kfc_cfg.enabled + if hasattr(kfc_cfg, 'enabled_stream_types'): + config.enabled_stream_types = list(kfc_cfg.enabled_stream_types) + if hasattr(kfc_cfg, 'debug'): + config.debug = kfc_cfg.debug + + # 等待配置 + if hasattr(kfc_cfg, 'waiting'): + wait_cfg = kfc_cfg.waiting + config.waiting = WaitingDefaults( + default_max_wait_seconds=getattr(wait_cfg, 'default_max_wait_seconds', 300), + min_wait_seconds=getattr(wait_cfg, 'min_wait_seconds', 30), + max_wait_seconds=getattr(wait_cfg, 'max_wait_seconds', 1800), + ) + + # 主动思考配置 + if hasattr(kfc_cfg, 'proactive'): + pro_cfg = kfc_cfg.proactive + config.proactive = ProactiveConfig( + enabled=getattr(pro_cfg, 'enabled', True), + silence_threshold_seconds=getattr(pro_cfg, 'silence_threshold_seconds', 7200), + min_interval_between_proactive=getattr(pro_cfg, 'min_interval_between_proactive', 1800), + quiet_hours_start=getattr(pro_cfg, 'quiet_hours_start', "23:00"), + quiet_hours_end=getattr(pro_cfg, 'quiet_hours_end', "07:00"), + trigger_probability=getattr(pro_cfg, 'trigger_probability', 0.3), + ) + + # 提示词配置 + if hasattr(kfc_cfg, 'prompt'): + pmt_cfg = kfc_cfg.prompt + config.prompt = PromptConfig( + max_activity_entries=getattr(pmt_cfg, 'max_activity_entries', 30), + max_entry_length=getattr(pmt_cfg, 'max_entry_length', 500), + include_relation=getattr(pmt_cfg, 'include_relation', True), + include_memory=getattr(pmt_cfg, 'include_memory', True), + ) + + # 会话配置 + if hasattr(kfc_cfg, 'session'): + sess_cfg = kfc_cfg.session + config.session = SessionConfig( + session_dir=getattr(sess_cfg, 'session_dir', "kokoro_flow_chatter_v2/sessions"), + session_expire_seconds=getattr(sess_cfg, 'session_expire_seconds', 86400 * 7), + max_mental_log_entries=getattr(sess_cfg, 'max_mental_log_entries', 100), + ) + + # LLM 配置 + if hasattr(kfc_cfg, 'llm'): + llm_cfg = kfc_cfg.llm + config.llm = LLMConfig( + model_name=getattr(llm_cfg, 'model_name', ""), + temperature=getattr(llm_cfg, 'temperature', 0.8), + max_tokens=getattr(llm_cfg, 'max_tokens', 1024), + timeout=getattr(llm_cfg, 'timeout', 60.0), + ) + + except Exception as e: + from src.common.logger import get_logger + logger = get_logger("kfc_v2_config") + logger.warning(f"加载 KFC V2 配置失败,使用默认值: {e}") + + return config + + +def reload_config() -> KokoroFlowChatterV2Config: + """重新加载配置""" + global _config + _config = load_config() + return _config diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/context_builder.py b/src/plugins/built_in/kokoro_flow_chatter_v2/context_builder.py new file mode 100644 index 000000000..d3633ebfe --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/context_builder.py @@ -0,0 +1,338 @@ +""" +Kokoro Flow Chatter V2 上下文构建器 + +为 KFC V2 提供完整的情境感知能力。 +包含: +- 关系信息 (relation_info) +- 记忆块 (memory_block) +- 表达习惯 (expression_habits) +- 日程信息 (schedule) +- 时间信息 (time) +""" + +import asyncio +import time +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Any, Optional + +from src.common.logger import get_logger +from src.config.config import global_config +from src.person_info.person_info import get_person_info_manager, PersonInfoManager + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream + from src.common.data_models.message_manager_data_model import StreamContext + +logger = get_logger("kfc_v2_context_builder") + + +def _get_config(): + """获取全局配置(带类型断言)""" + assert global_config is not None, "global_config 未初始化" + return global_config + + +class KFCContextBuilder: + """ + KFC V2 上下文构建器 + + 为提示词提供完整的情境感知数据。 + """ + + def __init__(self, chat_stream: "ChatStream"): + self.chat_stream = chat_stream + self.chat_id = chat_stream.stream_id + self.platform = chat_stream.platform + self.is_group_chat = bool(chat_stream.group_info) + + async def build_all_context( + self, + sender_name: str, + target_message: str, + context: Optional["StreamContext"] = None, + ) -> dict[str, str]: + """ + 并行构建所有上下文模块 + + Args: + sender_name: 发送者名称 + target_message: 目标消息内容 + context: 聊天流上下文(可选) + + Returns: + dict: 包含所有上下文块的字典 + """ + chat_history = await self._get_chat_history_text(context) + + tasks = { + "relation_info": self._build_relation_info(sender_name, target_message), + "memory_block": self._build_memory_block(chat_history, target_message), + "expression_habits": self._build_expression_habits(chat_history, target_message), + "schedule": self._build_schedule_block(), + "time": self._build_time_block(), + } + + results = {} + try: + task_results = await asyncio.gather( + *[self._wrap_task(name, coro) for name, coro in tasks.items()], + return_exceptions=True + ) + + for result in task_results: + if isinstance(result, tuple): + name, value = result + results[name] = value + else: + logger.warning(f"上下文构建任务异常: {result}") + except Exception as e: + logger.error(f"并行构建上下文失败: {e}") + + return results + + async def _wrap_task(self, name: str, coro) -> tuple[str, str]: + """包装任务以返回名称和结果""" + try: + result = await coro + return (name, result or "") + except Exception as e: + logger.error(f"构建 {name} 失败: {e}") + return (name, "") + + async def _get_chat_history_text( + self, + context: Optional["StreamContext"] = None, + limit: int = 20, + ) -> str: + """获取聊天历史文本""" + if context is None: + return "" + + try: + from src.chat.utils.chat_message_builder import build_readable_messages + + messages = context.get_messages(limit=limit, include_unread=True) + if not messages: + return "" + + msg_dicts = [msg.flatten() for msg in messages] + + return await build_readable_messages( + msg_dicts, + replace_bot_name=True, + timestamp_mode="relative", + truncate=True, + ) + except Exception as e: + logger.error(f"获取聊天历史失败: {e}") + return "" + + async def _build_relation_info(self, sender_name: str, target_message: str) -> str: + """构建关系信息块""" + config = _get_config() + + if sender_name == f"{config.bot.nickname}(你)": + return "你将要回复的是你自己发送的消息。" + + person_info_manager = get_person_info_manager() + person_id = await person_info_manager.get_person_id_by_person_name(sender_name) + + if not person_id: + logger.debug(f"未找到用户 {sender_name} 的ID") + return f"你完全不认识{sender_name},这是你们的第一次互动。" + + try: + from src.person_info.relationship_fetcher import relationship_fetcher_manager + + relationship_fetcher = relationship_fetcher_manager.get_fetcher(self.chat_id) + + user_relation_info = await relationship_fetcher.build_relation_info(person_id, points_num=5) + stream_impression = await relationship_fetcher.build_chat_stream_impression(self.chat_id) + + parts = [] + if user_relation_info: + parts.append(f"### 你与 {sender_name} 的关系\n{user_relation_info}") + if stream_impression: + scene_type = "这个群" if self.is_group_chat else "你们的私聊" + parts.append(f"### 你对{scene_type}的印象\n{stream_impression}") + + if parts: + return "\n\n".join(parts) + else: + return f"你与{sender_name}还没有建立深厚的关系,这是早期的互动阶段。" + + except Exception as e: + logger.error(f"获取关系信息失败: {e}") + return f"你与{sender_name}是普通朋友关系。" + + async def _build_memory_block(self, chat_history: str, target_message: str) -> str: + """构建记忆块(使用三层记忆系统)""" + config = _get_config() + + if not (config.memory and config.memory.enable): + return "" + + try: + from src.memory_graph.manager_singleton import get_unified_memory_manager + from src.memory_graph.utils.three_tier_formatter import memory_formatter + + unified_manager = get_unified_memory_manager() + if not unified_manager: + logger.debug("[三层记忆] 管理器未初始化") + return "" + + search_result = await unified_manager.search_memories( + query_text=target_message, + use_judge=True, + recent_chat_history=chat_history, + ) + + if not search_result: + return "" + + perceptual_blocks = search_result.get("perceptual_blocks", []) + short_term_memories = search_result.get("short_term_memories", []) + long_term_memories = search_result.get("long_term_memories", []) + + formatted_memories = await memory_formatter.format_all_tiers( + perceptual_blocks=perceptual_blocks, + short_term_memories=short_term_memories, + long_term_memories=long_term_memories + ) + + total_count = len(perceptual_blocks) + len(short_term_memories) + len(long_term_memories) + if total_count > 0 and formatted_memories.strip(): + logger.info( + f"[三层记忆] 检索到 {total_count} 条记忆 " + f"(感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})" + ) + return f"### 🧠 相关记忆\n\n{formatted_memories}" + + return "" + + except Exception as e: + logger.error(f"[三层记忆] 检索失败: {e}") + return "" + + async def _build_expression_habits(self, chat_history: str, target_message: str) -> str: + """构建表达习惯块""" + config = _get_config() + + use_expression, _, _ = config.expression.get_expression_config_for_chat(self.chat_id) + if not use_expression: + return "" + + try: + from src.chat.express.expression_selector import expression_selector + + style_habits = [] + grammar_habits = [] + + selected_expressions = await expression_selector.select_suitable_expressions( + chat_id=self.chat_id, + chat_history=chat_history, + target_message=target_message, + max_num=8, + min_num=2 + ) + + if selected_expressions: + for expr in selected_expressions: + if isinstance(expr, dict) and "situation" in expr and "style" in expr: + expr_type = expr.get("type", "style") + habit_str = f"当{expr['situation']}时,使用 {expr['style']}" + if expr_type == "grammar": + grammar_habits.append(habit_str) + else: + style_habits.append(habit_str) + + parts = [] + if style_habits: + parts.append("**语言风格习惯**:\n" + "\n".join(f"- {h}" for h in style_habits)) + if grammar_habits: + parts.append("**句法习惯**:\n" + "\n".join(f"- {h}" for h in grammar_habits)) + + if parts: + return "### 💬 你的表达习惯\n\n" + "\n\n".join(parts) + + return "" + + except Exception as e: + logger.error(f"构建表达习惯失败: {e}") + return "" + + async def _build_schedule_block(self) -> str: + """构建日程信息块""" + config = _get_config() + + if not config.planning_system.schedule_enable: + return "" + + try: + from src.schedule.schedule_manager import schedule_manager + + activity_info = schedule_manager.get_current_activity() + if not activity_info: + return "" + + activity = activity_info.get("activity") + time_range = activity_info.get("time_range") + now = datetime.now() + + if time_range: + try: + start_str, end_str = time_range.split("-") + start_time = datetime.strptime(start_str.strip(), "%H:%M").replace( + year=now.year, month=now.month, day=now.day + ) + end_time = datetime.strptime(end_str.strip(), "%H:%M").replace( + year=now.year, month=now.month, day=now.day + ) + + if end_time < start_time: + end_time += timedelta(days=1) + if now < start_time: + now += timedelta(days=1) + + duration_minutes = (now - start_time).total_seconds() / 60 + remaining_minutes = (end_time - now).total_seconds() / 60 + + return ( + f"你当前正在「{activity}」," + f"从{start_time.strftime('%H:%M')}开始,预计{end_time.strftime('%H:%M')}结束," + f"已进行{duration_minutes:.0f}分钟,还剩约{remaining_minutes:.0f}分钟。" + ) + except (ValueError, AttributeError): + pass + + return f"你当前正在「{activity}」" + + except Exception as e: + logger.error(f"构建日程块失败: {e}") + return "" + + async def _build_time_block(self) -> str: + """构建时间信息块""" + now = datetime.now() + weekdays = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] + weekday = weekdays[now.weekday()] + return f"{now.strftime('%Y年%m月%d日')} {weekday} {now.strftime('%H:%M:%S')}" + + +async def build_kfc_context( + chat_stream: "ChatStream", + sender_name: str, + target_message: str, + context: Optional["StreamContext"] = None, +) -> dict[str, str]: + """ + 便捷函数:构建KFC所需的所有上下文 + """ + builder = KFCContextBuilder(chat_stream) + return await builder.build_all_context(sender_name, target_message, context) + + +__all__ = [ + "KFCContextBuilder", + "build_kfc_context", +] diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/models.py b/src/plugins/built_in/kokoro_flow_chatter_v2/models.py new file mode 100644 index 000000000..9327f3dfb --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/models.py @@ -0,0 +1,320 @@ +""" +Kokoro Flow Chatter V2 - 数据模型 + +定义核心数据结构: +- EventType: 活动流事件类型 +- SessionStatus: 会话状态(仅 IDLE 和 WAITING) +- MentalLogEntry: 心理活动日志条目 +- WaitingConfig: 等待配置 +- ActionModel: 动作模型 +- LLMResponse: LLM 响应结构 +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any +import time + + +class EventType(Enum): + """ + 活动流事件类型 + + 用于标记 mental_log 中不同类型的事件, + 每种类型对应一个提示词小模板 + """ + # 用户相关 + USER_MESSAGE = "user_message" # 用户发送消息 + + # Bot 行动相关 + BOT_PLANNING = "bot_planning" # Bot 规划(thought + actions) + + # 等待相关 + WAITING_START = "waiting_start" # 开始等待 + WAITING_UPDATE = "waiting_update" # 等待期间心理变化 + REPLY_RECEIVED_IN_TIME = "reply_in_time" # 在预期内收到回复 + REPLY_RECEIVED_LATE = "reply_late" # 超出预期收到回复 + WAIT_TIMEOUT = "wait_timeout" # 等待超时 + + # 主动思考相关 + PROACTIVE_TRIGGER = "proactive_trigger" # 主动思考触发(长期沉默) + + def __str__(self) -> str: + return self.value + + +class SessionStatus(Enum): + """ + 会话状态 + + 极简设计,只有两种稳定状态: + - IDLE: 空闲,没有期待回复 + - WAITING: 等待对方回复中 + """ + IDLE = "idle" + WAITING = "waiting" + + def __str__(self) -> str: + return self.value + + +@dataclass +class WaitingConfig: + """ + 等待配置 + + 当 Bot 发送消息后设置的等待参数 + """ + expected_reaction: str = "" # 期望对方如何回应 + max_wait_seconds: int = 0 # 最长等待时间(秒),0 表示不等待 + started_at: float = 0.0 # 开始等待的时间戳 + last_thinking_at: float = 0.0 # 上次连续思考的时间戳 + thinking_count: int = 0 # 连续思考次数 + + def is_active(self) -> bool: + """是否正在等待""" + return self.max_wait_seconds > 0 and self.started_at > 0 + + def get_elapsed_seconds(self) -> float: + """获取已等待时间(秒)""" + if not self.is_active(): + return 0.0 + return time.time() - self.started_at + + def get_elapsed_minutes(self) -> float: + """获取已等待时间(分钟)""" + return self.get_elapsed_seconds() / 60 + + def is_timeout(self) -> bool: + """是否已超时""" + if not self.is_active(): + return False + return self.get_elapsed_seconds() >= self.max_wait_seconds + + def get_progress(self) -> float: + """获取等待进度 (0.0 - 1.0)""" + if not self.is_active() or self.max_wait_seconds <= 0: + return 0.0 + return min(self.get_elapsed_seconds() / self.max_wait_seconds, 1.0) + + def to_dict(self) -> dict[str, Any]: + return { + "expected_reaction": self.expected_reaction, + "max_wait_seconds": self.max_wait_seconds, + "started_at": self.started_at, + "last_thinking_at": self.last_thinking_at, + "thinking_count": self.thinking_count, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WaitingConfig": + return cls( + expected_reaction=data.get("expected_reaction", ""), + max_wait_seconds=data.get("max_wait_seconds", 0), + started_at=data.get("started_at", 0.0), + last_thinking_at=data.get("last_thinking_at", 0.0), + thinking_count=data.get("thinking_count", 0), + ) + + def reset(self) -> None: + """重置等待配置""" + self.expected_reaction = "" + self.max_wait_seconds = 0 + self.started_at = 0.0 + self.last_thinking_at = 0.0 + self.thinking_count = 0 + + +@dataclass +class MentalLogEntry: + """ + 心理活动日志条目 + + 记录活动流中的每一个事件节点, + 用于构建线性叙事风格的提示词 + """ + event_type: EventType + timestamp: float + + # 通用字段 + content: str = "" # 事件内容(消息文本、动作描述等) + + # 用户消息相关 + user_name: str = "" # 发送者名称 + user_id: str = "" # 发送者 ID + + # Bot 规划相关 + thought: str = "" # 内心想法 + actions: list[dict] = field(default_factory=list) # 执行的动作列表 + expected_reaction: str = "" # 期望的回应 + max_wait_seconds: int = 0 # 设定的等待时间 + + # 等待相关 + elapsed_seconds: float = 0.0 # 已等待时间 + waiting_thought: str = "" # 等待期间的想法 + mood: str = "" # 当前心情 + + # 元数据 + metadata: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "event_type": str(self.event_type), + "timestamp": self.timestamp, + "content": self.content, + "user_name": self.user_name, + "user_id": self.user_id, + "thought": self.thought, + "actions": self.actions, + "expected_reaction": self.expected_reaction, + "max_wait_seconds": self.max_wait_seconds, + "elapsed_seconds": self.elapsed_seconds, + "waiting_thought": self.waiting_thought, + "mood": self.mood, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "MentalLogEntry": + event_type_str = data.get("event_type", "user_message") + try: + event_type = EventType(event_type_str) + except ValueError: + event_type = EventType.USER_MESSAGE + + return cls( + event_type=event_type, + timestamp=data.get("timestamp", time.time()), + content=data.get("content", ""), + user_name=data.get("user_name", ""), + user_id=data.get("user_id", ""), + thought=data.get("thought", ""), + actions=data.get("actions", []), + expected_reaction=data.get("expected_reaction", ""), + max_wait_seconds=data.get("max_wait_seconds", 0), + elapsed_seconds=data.get("elapsed_seconds", 0.0), + waiting_thought=data.get("waiting_thought", ""), + mood=data.get("mood", ""), + metadata=data.get("metadata", {}), + ) + + def get_time_str(self, format: str = "%H:%M") -> str: + """获取格式化的时间字符串""" + return time.strftime(format, time.localtime(self.timestamp)) + + +@dataclass +class ActionModel: + """ + 动作模型 + + 表示 LLM 决策的单个动作 + """ + type: str # 动作类型 + params: dict[str, Any] = field(default_factory=dict) # 动作参数 + reason: str = "" # 选择该动作的理由 + + def to_dict(self) -> dict[str, Any]: + result = {"type": self.type} + if self.reason: + result["reason"] = self.reason + result.update(self.params) + return result + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ActionModel": + action_type = data.get("type", "do_nothing") + reason = data.get("reason", "") + params = {k: v for k, v in data.items() if k not in ("type", "reason")} + return cls(type=action_type, params=params, reason=reason) + + def get_description(self) -> str: + """获取动作的文字描述""" + if self.type == "reply": + content = self.params.get("content", "") + return f'发送消息:"{content[:50]}{"..." if len(content) > 50 else ""}"' + elif self.type == "poke_user": + return "戳了戳对方" + elif self.type == "do_nothing": + return "什么都没做" + elif self.type == "send_emoji": + emoji = self.params.get("emoji", "") + return f"发送表情:{emoji}" + else: + return f"执行动作:{self.type}" + + +@dataclass +class LLMResponse: + """ + LLM 响应结构 + + 定义 LLM 输出的 JSON 格式 + """ + thought: str # 内心想法 + actions: list[ActionModel] # 动作列表 + expected_reaction: str = "" # 期望对方的回应 + max_wait_seconds: int = 0 # 最长等待时间(0 = 不等待) + + # 可选字段 + mood: str = "" # 当前心情 + + def to_dict(self) -> dict[str, Any]: + return { + "thought": self.thought, + "actions": [a.to_dict() for a in self.actions], + "expected_reaction": self.expected_reaction, + "max_wait_seconds": self.max_wait_seconds, + "mood": self.mood, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "LLMResponse": + actions_data = data.get("actions", []) + actions = [ActionModel.from_dict(a) for a in actions_data] if actions_data else [] + + # 如果没有动作,添加默认的 do_nothing + if not actions: + actions = [ActionModel(type="do_nothing")] + + # 处理 max_wait_seconds,确保在合理范围内 + max_wait = data.get("max_wait_seconds", 0) + try: + max_wait = int(max_wait) + max_wait = max(0, min(max_wait, 1800)) # 0-30分钟 + except (ValueError, TypeError): + max_wait = 0 + + return cls( + thought=data.get("thought", ""), + actions=actions, + expected_reaction=data.get("expected_reaction", ""), + max_wait_seconds=max_wait, + mood=data.get("mood", ""), + ) + + @classmethod + def create_error_response(cls, error_message: str) -> "LLMResponse": + """创建错误响应""" + return cls( + thought=f"出现了问题:{error_message}", + actions=[ActionModel(type="do_nothing")], + expected_reaction="", + max_wait_seconds=0, + ) + + def has_reply(self) -> bool: + """是否包含回复动作""" + return any(a.type in ("reply", "respond") for a in self.actions) + + def get_reply_content(self) -> str: + """获取回复内容""" + for action in self.actions: + if action.type in ("reply", "respond"): + return action.params.get("content", "") + return "" + + def get_actions_description(self) -> str: + """获取所有动作的文字描述""" + descriptions = [a.get_description() for a in self.actions] + return " + ".join(descriptions) diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/plugin.py b/src/plugins/built_in/kokoro_flow_chatter_v2/plugin.py new file mode 100644 index 000000000..9af94ccc7 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/plugin.py @@ -0,0 +1,105 @@ +""" +Kokoro Flow Chatter V2 - 插件注册 + +注册 Chatter +""" + +from typing import Any, ClassVar + +from src.common.logger import get_logger +from src.plugin_system.base.base_plugin import BasePlugin +from src.plugin_system.base.component_types import ChatterInfo +from src.plugin_system.decorators import register_plugin + +from .chatter import KokoroFlowChatterV2 +from .config import get_config +from .proactive_thinker import start_proactive_thinker, stop_proactive_thinker + +logger = get_logger("kfc_v2_plugin") + + +@register_plugin +class KokoroFlowChatterV2Plugin(BasePlugin): + """ + Kokoro Flow Chatter V2 插件 + + 专为私聊设计的增强 Chatter: + - 线性叙事提示词架构 + - 等待机制与心理状态演变 + - 主动思考能力 + """ + + plugin_name: str = "kokoro_flow_chatter_v2" + enable_plugin: bool = True + plugin_priority: int = 50 # 高于默认 Chatter + dependencies: ClassVar[list[str]] = [] + python_dependencies: ClassVar[list[str]] = [] + config_file_name: str = "config.toml" + + # 状态 + _is_started: bool = False + + async def on_plugin_loaded(self): + """插件加载时""" + config = get_config() + + if not config.enabled: + logger.info("[KFC V2] 插件已禁用") + return + + logger.info("[KFC V2] 插件已加载") + + # 启动主动思考器 + if config.proactive.enabled: + try: + await start_proactive_thinker() + logger.info("[KFC V2] 主动思考器已启动") + self._is_started = True + except Exception as e: + logger.error(f"[KFC V2] 启动主动思考器失败: {e}") + + async def on_plugin_unloaded(self): + """插件卸载时""" + try: + await stop_proactive_thinker() + logger.info("[KFC V2] 主动思考器已停止") + self._is_started = False + except Exception as e: + logger.warning(f"[KFC V2] 停止主动思考器失败: {e}") + + def get_plugin_components(self): + """返回组件列表""" + config = get_config() + + if not config.enabled: + return [] + + components = [] + + try: + # 注册 Chatter + components.append(( + KokoroFlowChatterV2.get_chatter_info(), + KokoroFlowChatterV2, + )) + logger.debug("[KFC V2] 成功加载 KokoroFlowChatterV2 组件") + except Exception as e: + logger.error(f"[KFC V2] 加载组件失败: {e}") + + return components + + def get_plugin_info(self) -> dict[str, Any]: + """获取插件信息""" + return { + "name": self.plugin_name, + "display_name": "Kokoro Flow Chatter V2", + "version": "2.0.0", + "author": "MoFox", + "description": "专为私聊设计的增强 Chatter", + "features": [ + "线性叙事提示词架构", + "心理活动流记录", + "等待机制与超时处理", + "主动思考能力", + ], + } diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/proactive_thinker.py b/src/plugins/built_in/kokoro_flow_chatter_v2/proactive_thinker.py new file mode 100644 index 000000000..382b9500f --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/proactive_thinker.py @@ -0,0 +1,500 @@ +""" +Kokoro Flow Chatter V2 - 主动思考器 + +独立组件,负责: +1. 等待期间的连续思考(更新心理状态) +2. 等待超时决策(继续等 or 做点什么) +3. 长期沉默后主动发起对话 + +通过 UnifiedScheduler 定期触发,与 Chatter 解耦 +""" + +import asyncio +import random +import time +from datetime import datetime +from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional + +from src.common.logger import get_logger +from src.config.config import global_config +from src.plugin_system.apis.unified_scheduler import TriggerType, unified_scheduler + +from .action_executor import ActionExecutor +from .models import EventType, SessionStatus +from .replyer import generate_response +from .session import KokoroSession, get_session_manager + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream + +logger = get_logger("kfc_v2_proactive_thinker") + + +class ProactiveThinker: + """ + 主动思考器 + + 独立于 Chatter,负责处理: + 1. 等待期间的连续思考 + 2. 等待超时 + 3. 长期沉默后主动发起 + + 核心逻辑: + - 定期检查所有 WAITING 状态的 Session + - 触发连续思考或超时决策 + - 定期检查长期沉默的 Session,考虑主动发起 + """ + + # 连续思考触发点(等待进度百分比) + THINKING_TRIGGERS = [0.3, 0.6, 0.85] + + # 任务名称 + TASK_WAITING_CHECK = "kfc_v2_waiting_check" + TASK_PROACTIVE_CHECK = "kfc_v2_proactive_check" + + def __init__(self): + self.session_manager = get_session_manager() + + # 配置 + self._load_config() + + # 调度任务 ID + self._waiting_schedule_id: Optional[str] = None + self._proactive_schedule_id: Optional[str] = None + self._running = False + + # 统计 + self._stats = { + "waiting_checks": 0, + "continuous_thinking_triggered": 0, + "timeout_decisions": 0, + "proactive_triggered": 0, + } + + def _load_config(self) -> None: + """加载配置""" + # 默认配置 + self.waiting_check_interval = 15.0 # 等待检查间隔(秒) + self.proactive_check_interval = 300.0 # 主动思考检查间隔(秒) + self.silence_threshold = 7200 # 沉默阈值(秒) + self.min_proactive_interval = 1800 # 两次主动思考最小间隔(秒) + self.quiet_hours_start = "23:00" + self.quiet_hours_end = "07:00" + + # 从全局配置读取 + if global_config and hasattr(global_config, 'kokoro_flow_chatter'): + kfc_config = global_config.kokoro_flow_chatter + if hasattr(kfc_config, 'proactive_thinking'): + proactive_cfg = kfc_config.proactive_thinking + self.silence_threshold = getattr(proactive_cfg, 'silence_threshold_seconds', 7200) + self.min_proactive_interval = getattr(proactive_cfg, 'min_interval_between_proactive', 1800) + + async def start(self) -> None: + """启动主动思考器""" + if self._running: + logger.warning("[ProactiveThinker] 已在运行中") + return + + self._running = True + + # 注册等待检查任务 + self._waiting_schedule_id = await unified_scheduler.create_schedule( + callback=self._check_waiting_sessions, + trigger_type=TriggerType.TIME, + trigger_config={"delay_seconds": self.waiting_check_interval}, + is_recurring=True, + task_name=self.TASK_WAITING_CHECK, + force_overwrite=True, + timeout=60.0, + ) + + # 注册主动思考检查任务 + self._proactive_schedule_id = await unified_scheduler.create_schedule( + callback=self._check_proactive_sessions, + trigger_type=TriggerType.TIME, + trigger_config={"delay_seconds": self.proactive_check_interval}, + is_recurring=True, + task_name=self.TASK_PROACTIVE_CHECK, + force_overwrite=True, + timeout=120.0, + ) + + logger.info("[ProactiveThinker] 已启动") + + async def stop(self) -> None: + """停止主动思考器""" + if not self._running: + return + + self._running = False + + if self._waiting_schedule_id: + await unified_scheduler.remove_schedule(self._waiting_schedule_id) + if self._proactive_schedule_id: + await unified_scheduler.remove_schedule(self._proactive_schedule_id) + + logger.info("[ProactiveThinker] 已停止") + + # ======================== + # 等待检查 + # ======================== + + async def _check_waiting_sessions(self) -> None: + """检查所有等待中的 Session""" + self._stats["waiting_checks"] += 1 + + sessions = await self.session_manager.get_waiting_sessions() + if not sessions: + return + + # 并行处理 + tasks = [ + asyncio.create_task(self._process_waiting_session(s)) + for s in sessions + ] + await asyncio.gather(*tasks, return_exceptions=True) + + async def _process_waiting_session(self, session: KokoroSession) -> None: + """处理单个等待中的 Session""" + try: + if session.status != SessionStatus.WAITING: + return + + if not session.waiting_config.is_active(): + return + + # 检查是否超时 + if session.waiting_config.is_timeout(): + await self._handle_timeout(session) + return + + # 检查是否需要触发连续思考 + progress = session.waiting_config.get_progress() + if self._should_trigger_thinking(session, progress): + await self._handle_continuous_thinking(session, progress) + + except Exception as e: + logger.error(f"[ProactiveThinker] 处理等待 Session 失败 {session.user_id}: {e}") + + def _should_trigger_thinking(self, session: KokoroSession, progress: float) -> bool: + """判断是否应触发连续思考""" + # 计算应该触发的次数 + expected_count = sum(1 for t in self.THINKING_TRIGGERS if progress >= t) + + if session.waiting_config.thinking_count >= expected_count: + return False + + # 确保两次思考之间有间隔 + if session.waiting_config.last_thinking_at > 0: + elapsed = time.time() - session.waiting_config.last_thinking_at + if elapsed < 30: # 至少 30 秒间隔 + return False + + return True + + async def _handle_continuous_thinking( + self, + session: KokoroSession, + progress: float, + ) -> None: + """处理连续思考""" + self._stats["continuous_thinking_triggered"] += 1 + + # 生成等待中的想法 + thought = self._generate_waiting_thought(session, progress) + + # 记录到 mental_log + session.add_waiting_update( + waiting_thought=thought, + mood="", # 可以根据进度设置心情 + ) + + # 更新思考计数 + session.waiting_config.thinking_count += 1 + session.waiting_config.last_thinking_at = time.time() + + # 保存 + await self.session_manager.save_session(session.user_id) + + logger.debug( + f"[ProactiveThinker] 连续思考: user={session.user_id}, " + f"progress={progress:.1%}, thought={thought[:30]}..." + ) + + def _generate_waiting_thought(self, session: KokoroSession, progress: float) -> str: + """生成等待中的想法""" + elapsed_minutes = session.waiting_config.get_elapsed_minutes() + + if progress < 0.4: + thoughts = [ + f"已经等了 {elapsed_minutes:.0f} 分钟了,对方可能在忙吧...", + "不知道对方在做什么呢", + "再等等看吧", + ] + elif progress < 0.7: + thoughts = [ + f"等了 {elapsed_minutes:.0f} 分钟了,有点担心...", + "对方是不是忘记回复了?", + "嗯...还是没有消息", + ] + else: + thoughts = [ + f"已经等了 {elapsed_minutes:.0f} 分钟了,感觉有点焦虑", + "要不要主动说点什么呢...", + "快到时间了,对方还是没回", + ] + + return random.choice(thoughts) + + async def _handle_timeout(self, session: KokoroSession) -> None: + """处理等待超时""" + self._stats["timeout_decisions"] += 1 + + logger.info(f"[ProactiveThinker] 等待超时: user={session.user_id}") + + try: + # 获取聊天流 + chat_stream = await self._get_chat_stream(session.stream_id) + + # 加载动作 + action_executor = ActionExecutor(session.stream_id) + await action_executor.load_actions() + + # 调用 Replyer 生成超时决策 + response = await generate_response( + session=session, + user_name=session.user_id, # 这里可以改进,获取真实用户名 + situation_type="timeout", + chat_stream=chat_stream, + available_actions=action_executor.get_available_actions(), + ) + + # 执行动作 + exec_result = await action_executor.execute(response, chat_stream) + + # 记录到 mental_log + session.add_bot_planning( + thought=response.thought, + actions=[a.to_dict() for a in response.actions], + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + + # 更新状态 + if response.max_wait_seconds > 0: + # 继续等待 + session.start_waiting( + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + else: + # 不再等待 + session.end_waiting() + + # 保存 + await self.session_manager.save_session(session.user_id) + + logger.info( + f"[ProactiveThinker] 超时决策完成: user={session.user_id}, " + f"actions={[a.type for a in response.actions]}, " + f"continue_wait={response.max_wait_seconds > 0}" + ) + + except Exception as e: + logger.error(f"[ProactiveThinker] 处理超时失败: {e}") + # 出错时结束等待 + session.end_waiting() + await self.session_manager.save_session(session.user_id) + + # ======================== + # 主动思考(长期沉默) + # ======================== + + async def _check_proactive_sessions(self) -> None: + """检查是否有需要主动发起对话的 Session""" + # 检查是否在勿扰时段 + if self._is_quiet_hours(): + return + + sessions = await self.session_manager.get_all_sessions() + current_time = time.time() + + for session in sessions: + try: + trigger_reason = self._should_trigger_proactive(session, current_time) + if trigger_reason: + await self._handle_proactive(session, trigger_reason) + except Exception as e: + logger.error(f"[ProactiveThinker] 检查主动思考失败 {session.user_id}: {e}") + + def _is_quiet_hours(self) -> bool: + """检查是否在勿扰时段""" + try: + now = datetime.now() + current_minutes = now.hour * 60 + now.minute + + start_parts = self.quiet_hours_start.split(":") + start_minutes = int(start_parts[0]) * 60 + int(start_parts[1]) + + end_parts = self.quiet_hours_end.split(":") + end_minutes = int(end_parts[0]) * 60 + int(end_parts[1]) + + if start_minutes <= end_minutes: + return start_minutes <= current_minutes < end_minutes + else: + return current_minutes >= start_minutes or current_minutes < end_minutes + except: + return False + + def _should_trigger_proactive( + self, + session: KokoroSession, + current_time: float, + ) -> Optional[str]: + """判断是否应触发主动思考""" + # 只检查 IDLE 状态的 Session + if session.status != SessionStatus.IDLE: + return None + + # 检查沉默时长 + silence_duration = current_time - session.last_activity_at + if silence_duration < self.silence_threshold: + return None + + # 检查距离上次主动思考的间隔 + if session.last_proactive_at: + time_since_last = current_time - session.last_proactive_at + if time_since_last < self.min_proactive_interval: + return None + + # 概率触发(避免每次检查都触发) + if random.random() > 0.3: # 30% 概率 + return None + + silence_hours = silence_duration / 3600 + return f"沉默了 {silence_hours:.1f} 小时" + + async def _handle_proactive( + self, + session: KokoroSession, + trigger_reason: str, + ) -> None: + """处理主动思考""" + self._stats["proactive_triggered"] += 1 + + logger.info(f"[ProactiveThinker] 主动思考触发: user={session.user_id}, reason={trigger_reason}") + + try: + # 获取聊天流 + chat_stream = await self._get_chat_stream(session.stream_id) + + # 加载动作 + action_executor = ActionExecutor(session.stream_id) + await action_executor.load_actions() + + # 计算沉默时长 + silence_seconds = time.time() - session.last_activity_at + if silence_seconds < 3600: + silence_duration = f"{silence_seconds / 60:.0f} 分钟" + else: + silence_duration = f"{silence_seconds / 3600:.1f} 小时" + + # 调用 Replyer + response = await generate_response( + session=session, + user_name=session.user_id, + situation_type="proactive", + chat_stream=chat_stream, + available_actions=action_executor.get_available_actions(), + extra_context={ + "trigger_reason": trigger_reason, + "silence_duration": silence_duration, + }, + ) + + # 检查是否决定不打扰 + is_do_nothing = ( + len(response.actions) == 0 or + (len(response.actions) == 1 and response.actions[0].type == "do_nothing") + ) + + if is_do_nothing: + logger.info(f"[ProactiveThinker] 决定不打扰: user={session.user_id}") + session.last_proactive_at = time.time() + await self.session_manager.save_session(session.user_id) + return + + # 执行动作 + exec_result = await action_executor.execute(response, chat_stream) + + # 记录到 mental_log + session.add_bot_planning( + thought=response.thought, + actions=[a.to_dict() for a in response.actions], + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + + # 更新状态 + session.last_proactive_at = time.time() + if response.max_wait_seconds > 0: + session.start_waiting( + expected_reaction=response.expected_reaction, + max_wait_seconds=response.max_wait_seconds, + ) + + # 保存 + await self.session_manager.save_session(session.user_id) + + logger.info( + f"[ProactiveThinker] 主动发起完成: user={session.user_id}, " + f"actions={[a.type for a in response.actions]}" + ) + + except Exception as e: + logger.error(f"[ProactiveThinker] 主动思考失败: {e}") + + async def _get_chat_stream(self, stream_id: str): + """获取聊天流""" + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + if chat_manager: + return await chat_manager.get_stream(stream_id) + except Exception as e: + logger.warning(f"[ProactiveThinker] 获取 chat_stream 失败: {e}") + return None + + def get_stats(self) -> dict: + """获取统计信息""" + return { + **self._stats, + "is_running": self._running, + } + + +# 全局单例 +_proactive_thinker: Optional[ProactiveThinker] = None + + +def get_proactive_thinker() -> ProactiveThinker: + """获取全局主动思考器""" + global _proactive_thinker + if _proactive_thinker is None: + _proactive_thinker = ProactiveThinker() + return _proactive_thinker + + +async def start_proactive_thinker() -> ProactiveThinker: + """启动主动思考器""" + thinker = get_proactive_thinker() + await thinker.start() + return thinker + + +async def stop_proactive_thinker() -> None: + """停止主动思考器""" + global _proactive_thinker + if _proactive_thinker: + await _proactive_thinker.stop() diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/__init__.py b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/__init__.py new file mode 100644 index 000000000..501e3b92f --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/__init__.py @@ -0,0 +1,16 @@ +""" +Kokoro Flow Chatter V2 - 提示词模块 + +使用项目统一的 Prompt 管理系统管理所有提示词模板 +""" + +# 导入 prompts 模块以注册提示词 +from . import prompts # noqa: F401 +from .builder import PromptBuilder, get_prompt_builder +from .prompts import PROMPT_NAMES + +__all__ = [ + "PromptBuilder", + "get_prompt_builder", + "PROMPT_NAMES", +] diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/builder.py b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/builder.py new file mode 100644 index 000000000..83bf5e91d --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/builder.py @@ -0,0 +1,388 @@ +""" +Kokoro Flow Chatter V2 - 提示词构建器 + +使用项目统一的 Prompt 管理系统构建提示词 +""" + +import time +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from src.chat.utils.prompt import global_prompt_manager +from src.common.logger import get_logger +from src.config.config import global_config + +from ..models import EventType, MentalLogEntry, SessionStatus +from ..session import KokoroSession + +# 导入模板注册(确保模板被注册到 global_prompt_manager) +from . import prompts as _ # noqa: F401 +from .prompts import PROMPT_NAMES + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream + +logger = get_logger("kfc_v2_prompt_builder") + + +class PromptBuilder: + """ + 提示词构建器 + + 使用统一的 Prompt 管理系统构建提示词: + 1. 构建活动流(从 mental_log 生成线性叙事) + 2. 构建当前情况描述 + 3. 使用 global_prompt_manager 格式化最终提示词 + """ + + def __init__(self): + self._context_builder = None + + async def build_prompt( + self, + session: KokoroSession, + user_name: str, + situation_type: str = "new_message", + chat_stream: Optional["ChatStream"] = None, + available_actions: Optional[dict] = None, + extra_context: Optional[dict] = None, + ) -> str: + """ + 构建完整的提示词 + + Args: + session: 会话对象 + user_name: 用户名称 + situation_type: 情况类型 (new_message/reply_in_time/reply_late/timeout/proactive) + chat_stream: 聊天流对象 + available_actions: 可用动作字典 + extra_context: 额外上下文(如 trigger_reason) + + Returns: + 完整的提示词 + """ + extra_context = extra_context or {} + + # 1. 构建人设块 + persona_block = self._build_persona_block() + + # 2. 构建关系块 + relation_block = await self._build_relation_block(user_name, chat_stream) + + # 3. 构建活动流 + activity_stream = await self._build_activity_stream(session, user_name) + + # 4. 构建当前情况 + current_situation = await self._build_current_situation( + session, user_name, situation_type, extra_context + ) + + # 5. 构建可用动作 + actions_block = self._build_actions_block(available_actions) + + # 6. 获取输出格式 + output_format = await self._get_output_format() + + # 7. 使用统一的 prompt 管理系统格式化 + prompt = await global_prompt_manager.format_prompt( + PROMPT_NAMES["main"], + user_name=user_name, + persona_block=persona_block, + relation_block=relation_block, + activity_stream=activity_stream or "(这是你们第一次聊天)", + current_situation=current_situation, + available_actions=actions_block, + output_format=output_format, + ) + + return prompt + + def _build_persona_block(self) -> str: + """构建人设块""" + if global_config is None: + return "你是一个温暖、真诚的人。" + + personality = global_config.personality + parts = [] + + if personality.personality_core: + parts.append(personality.personality_core) + + if personality.personality_side: + parts.append(personality.personality_side) + + if personality.identity: + parts.append(personality.identity) + + if personality.reply_style: + parts.append(f"\n### 说话风格\n{personality.reply_style}") + + return "\n\n".join(parts) if parts else "你是一个温暖、真诚的人。" + + async def _build_relation_block( + self, + user_name: str, + chat_stream: Optional["ChatStream"], + ) -> str: + """构建关系块""" + if not chat_stream: + return f"你与 {user_name} 还不太熟悉,这是早期的交流阶段。" + + try: + # 延迟导入上下文构建器 + if self._context_builder is None: + from ..context_builder import KFCContextBuilder + self._context_builder = KFCContextBuilder + + builder = self._context_builder(chat_stream) + context_data = await builder.build_all_context( + sender_name=user_name, + target_message="", + context=None, + ) + + relation_info = context_data.get("relation_info", "") + if relation_info: + return relation_info + + except Exception as e: + logger.warning(f"构建关系块失败: {e}") + + return f"你与 {user_name} 还不太熟悉,这是早期的交流阶段。" + + async def _build_activity_stream( + self, + session: KokoroSession, + user_name: str, + ) -> str: + """ + 构建活动流 + + 将 mental_log 中的事件按时间顺序转换为线性叙事 + 使用统一的 prompt 模板 + """ + entries = session.get_recent_entries(limit=30) + if not entries: + return "" + + parts = [] + + for entry in entries: + part = await self._format_entry(entry, user_name) + if part: + parts.append(part) + + return "\n\n".join(parts) + + async def _format_entry(self, entry: MentalLogEntry, user_name: str) -> str: + """格式化单个活动日志条目""" + + if entry.event_type == EventType.USER_MESSAGE: + # 用户消息 + result = await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_user_message"], + time=entry.get_time_str(), + user_name=entry.user_name or user_name, + content=entry.content, + ) + + # 如果有回复状态元数据,添加说明 + reply_status = entry.metadata.get("reply_status") + if reply_status == "in_time": + elapsed = entry.metadata.get("elapsed_seconds", 0) / 60 + max_wait = entry.metadata.get("max_wait_seconds", 0) / 60 + result += await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_reply_in_time"], + elapsed_minutes=elapsed, + max_wait_minutes=max_wait, + ) + elif reply_status == "late": + elapsed = entry.metadata.get("elapsed_seconds", 0) / 60 + max_wait = entry.metadata.get("max_wait_seconds", 0) / 60 + result += await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_reply_late"], + elapsed_minutes=elapsed, + max_wait_minutes=max_wait, + ) + + return result + + elif entry.event_type == EventType.BOT_PLANNING: + # Bot 规划 + actions_desc = self._format_actions(entry.actions) + + if entry.max_wait_seconds > 0: + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_bot_planning"], + thought=entry.thought or "(没有特别的想法)", + actions_description=actions_desc, + expected_reaction=entry.expected_reaction or "随便怎么回应都行", + max_wait_minutes=entry.max_wait_seconds / 60, + ) + else: + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_bot_planning_no_wait"], + thought=entry.thought or "(没有特别的想法)", + actions_description=actions_desc, + ) + + elif entry.event_type == EventType.WAITING_UPDATE: + # 等待中心理变化 + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_waiting_update"], + elapsed_minutes=entry.elapsed_seconds / 60, + waiting_thought=entry.waiting_thought or "还在等...", + ) + + elif entry.event_type == EventType.PROACTIVE_TRIGGER: + # 主动思考触发 + silence = entry.metadata.get("silence_duration", "一段时间") + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["entry_proactive_trigger"], + silence_duration=silence, + ) + + return "" + + def _format_actions(self, actions: list[dict]) -> str: + """格式化动作列表为可读描述""" + if not actions: + return "(无动作)" + + descriptions = [] + for action in actions: + action_type = action.get("type", "unknown") + + if action_type == "reply": + content = action.get("content", "") + if len(content) > 50: + content = content[:50] + "..." + descriptions.append(f"发送消息:「{content}」") + elif action_type == "poke_user": + descriptions.append("戳了戳对方") + elif action_type == "do_nothing": + descriptions.append("什么都不做") + elif action_type == "send_emoji": + emoji = action.get("emoji", "") + descriptions.append(f"发送表情:{emoji}") + else: + descriptions.append(f"执行动作:{action_type}") + + return "、".join(descriptions) + + async def _build_current_situation( + self, + session: KokoroSession, + user_name: str, + situation_type: str, + extra_context: dict, + ) -> str: + """构建当前情况描述""" + current_time = datetime.now().strftime("%Y年%m月%d日 %H:%M") + + if situation_type == "new_message": + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_new_message"], + current_time=current_time, + user_name=user_name, + ) + + elif situation_type == "reply_in_time": + elapsed = session.waiting_config.get_elapsed_seconds() + max_wait = session.waiting_config.max_wait_seconds + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_reply_in_time"], + current_time=current_time, + user_name=user_name, + elapsed_minutes=elapsed / 60, + max_wait_minutes=max_wait / 60, + ) + + elif situation_type == "reply_late": + elapsed = session.waiting_config.get_elapsed_seconds() + max_wait = session.waiting_config.max_wait_seconds + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_reply_late"], + current_time=current_time, + user_name=user_name, + elapsed_minutes=elapsed / 60, + max_wait_minutes=max_wait / 60, + ) + + elif situation_type == "timeout": + elapsed = session.waiting_config.get_elapsed_seconds() + max_wait = session.waiting_config.max_wait_seconds + expected = session.waiting_config.expected_reaction + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_timeout"], + current_time=current_time, + user_name=user_name, + elapsed_minutes=elapsed / 60, + max_wait_minutes=max_wait / 60, + expected_reaction=expected or "对方能回复点什么", + ) + + elif situation_type == "proactive": + silence = extra_context.get("silence_duration", "一段时间") + trigger_reason = extra_context.get("trigger_reason", "") + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_proactive"], + current_time=current_time, + user_name=user_name, + silence_duration=silence, + trigger_reason=trigger_reason, + ) + + # 默认使用 new_message + return await global_prompt_manager.format_prompt( + PROMPT_NAMES["situation_new_message"], + current_time=current_time, + user_name=user_name, + ) + + def _build_actions_block(self, available_actions: Optional[dict]) -> str: + """构建可用动作块""" + if not available_actions: + return self._get_default_actions_block() + + lines = [] + for name, info in available_actions.items(): + desc = getattr(info, "description", "") or f"执行 {name}" + lines.append(f"- `{name}`: {desc}") + + return "\n".join(lines) if lines else self._get_default_actions_block() + + def _get_default_actions_block(self) -> str: + """获取默认的动作列表""" + return """- `reply`: 发送文字消息(参数:content) +- `poke_user`: 戳一戳对方 +- `do_nothing`: 什么都不做""" + + async def _get_output_format(self) -> str: + """获取输出格式模板""" + try: + prompt = await global_prompt_manager.get_prompt_async( + PROMPT_NAMES["output_format"] + ) + return prompt.template + except KeyError: + # 如果模板未注册,返回默认格式 + return """请用 JSON 格式回复: +{ + "thought": "你的想法", + "actions": [{"type": "reply", "content": "你的回复"}], + "expected_reaction": "期待的反应", + "max_wait_seconds": 300 +}""" + + +# 全局单例 +_prompt_builder: Optional[PromptBuilder] = None + + +def get_prompt_builder() -> PromptBuilder: + """获取全局提示词构建器""" + global _prompt_builder + if _prompt_builder is None: + _prompt_builder = PromptBuilder() + return _prompt_builder diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/prompts.py b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/prompts.py new file mode 100644 index 000000000..77f1da867 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/prompt/prompts.py @@ -0,0 +1,217 @@ +""" +Kokoro Flow Chatter V2 - 提示词模板注册 + +使用项目统一的 Prompt 管理系统注册所有 KFC V2 使用的提示词模板 +""" + +from src.chat.utils.prompt import Prompt + +# ================================================================================================= +# KFC V2 主提示词模板 +# ================================================================================================= + +KFC_V2_MAIN_PROMPT = Prompt( + name="kfc_v2_main", + template="""# 你与 {user_name} 的私聊 + +## 1. 你是谁 +{persona_block} + +## 2. 你与 {user_name} 的关系 +{relation_block} + +## 3. 你们之间发生的事(活动流) +以下是你和 {user_name} 最近的互动历史,按时间顺序记录了你们的对话和你的心理活动: + +{activity_stream} + +## 4. 当前情况 +{current_situation} + +## 5. 你可以做的事情 +{available_actions} + +## 6. 你的回复格式 +{output_format} +""", +) + +# ================================================================================================= +# 输出格式模板 +# ================================================================================================= + +KFC_V2_OUTPUT_FORMAT = Prompt( + name="kfc_v2_output_format", + template="""请用以下 JSON 格式回复: +```json +{{ + "thought": "你脑子里在想什么,越自然越好", + "actions": [ + {{"type": "reply", "content": "你要说的话"}}, + {{"type": "其他动作", "参数": "值"}} + ], + "expected_reaction": "你期待对方的反应是什么", + "max_wait_seconds": 300 +}} +``` + +说明: +- `thought`:你的内心独白,记录你此刻的想法和感受 +- `actions`:你要执行的动作列表,可以组合多个 +- `expected_reaction`:你期待对方如何回应(用于判断是否需要等待) +- `max_wait_seconds`:设定等待时间(秒),0 表示不等待,超时后你会考虑是否要主动说点什么 +- 即使什么都不想做,也放一个 `{{"type": "do_nothing"}}`""", +) + +# ================================================================================================= +# 情景模板 - 根据不同情境使用不同的当前情况描述 +# ================================================================================================= + +KFC_V2_SITUATION_NEW_MESSAGE = Prompt( + name="kfc_v2_situation_new_message", + template="""现在是 {current_time}。 + +{user_name} 刚刚给你发了消息。这是一次新的对话发起(不是对你之前消息的回复)。 + +请决定你要怎么回应。你可以: +- 发送文字消息回复 +- 发表情包 +- 戳一戳对方 +- 什么都不做(如果觉得没必要回复) +- 或者组合多个动作""", +) + +KFC_V2_SITUATION_REPLY_IN_TIME = Prompt( + name="kfc_v2_situation_reply_in_time", + template="""现在是 {current_time}。 + +你之前发了消息后一直在等 {user_name} 的回复。 +等了大约 {elapsed_minutes:.1f} 分钟(你原本打算最多等 {max_wait_minutes:.1f} 分钟)。 +现在 {user_name} 回复了! + +请决定你接下来要怎么回应。""", +) + +KFC_V2_SITUATION_REPLY_LATE = Prompt( + name="kfc_v2_situation_reply_late", + template="""现在是 {current_time}。 + +你之前发了消息后在等 {user_name} 的回复。 +你原本打算最多等 {max_wait_minutes:.1f} 分钟,但实际等了 {elapsed_minutes:.1f} 分钟才收到回复。 +虽然有点迟,但 {user_name} 终于回复了。 + +请决定你接下来要怎么回应。(可以选择轻轻抱怨一下迟到,也可以装作没在意)""", +) + +KFC_V2_SITUATION_TIMEOUT = Prompt( + name="kfc_v2_situation_timeout", + template="""现在是 {current_time}。 + +你之前发了消息后一直在等 {user_name} 的回复。 +你原本打算最多等 {max_wait_minutes:.1f} 分钟,现在已经等了 {elapsed_minutes:.1f} 分钟了,对方还是没回。 +你期待的反应是:"{expected_reaction}" + +你需要决定: +1. 继续等待(设置新的 max_wait_seconds) +2. 主动说点什么打破沉默 +3. 做点别的事情(戳一戳、发表情等) +4. 算了不等了(max_wait_seconds = 0)""", +) + +KFC_V2_SITUATION_PROACTIVE = Prompt( + name="kfc_v2_situation_proactive", + template="""现在是 {current_time}。 + +你和 {user_name} 已经有一段时间没聊天了(沉默了 {silence_duration})。 +{trigger_reason} + +你在想要不要主动找 {user_name} 聊点什么。 + +请决定: +1. 主动发起对话(想个话题开场) +2. 发个表情或戳一戳试探一下 +3. 算了,现在不是好时机(do_nothing) + +如果决定发起对话,想想用什么自然的方式开场,不要太突兀。""", +) + +# ================================================================================================= +# 活动流条目模板 - 用于构建 activity_stream +# ================================================================================================= + +# 用户消息条目 +KFC_V2_ENTRY_USER_MESSAGE = Prompt( + name="kfc_v2_entry_user_message", + template="""【{time}】{user_name} 说: +"{content}" +""", +) + +# Bot 规划条目(有等待) +KFC_V2_ENTRY_BOT_PLANNING = Prompt( + name="kfc_v2_entry_bot_planning", + template="""【你的想法】 +内心:{thought} +行动:{actions_description} +期待:{expected_reaction} +决定等待:最多 {max_wait_minutes:.1f} 分钟 +""", +) + +# Bot 规划条目(无等待) +KFC_V2_ENTRY_BOT_PLANNING_NO_WAIT = Prompt( + name="kfc_v2_entry_bot_planning_no_wait", + template="""【你的想法】 +内心:{thought} +行动:{actions_description} +(不打算等对方回复) +""", +) + +# 等待期间心理变化 +KFC_V2_ENTRY_WAITING_UPDATE = Prompt( + name="kfc_v2_entry_waiting_update", + template="""【等待中... {elapsed_minutes:.1f} 分钟过去了】 +你想:{waiting_thought} +""", +) + +# 收到及时回复时的标注 +KFC_V2_ENTRY_REPLY_IN_TIME = Prompt( + name="kfc_v2_entry_reply_in_time", + template="""→ (对方在你预期时间内回复了,等了 {elapsed_minutes:.1f} 分钟) +""", +) + +# 收到迟到回复时的标注 +KFC_V2_ENTRY_REPLY_LATE = Prompt( + name="kfc_v2_entry_reply_late", + template="""→ (对方回复迟了,你原本只打算等 {max_wait_minutes:.1f} 分钟,实际等了 {elapsed_minutes:.1f} 分钟) +""", +) + +# 主动思考触发 +KFC_V2_ENTRY_PROACTIVE_TRIGGER = Prompt( + name="kfc_v2_entry_proactive_trigger", + template="""【沉默了 {silence_duration}】 +你开始考虑要不要主动找对方聊点什么... +""", +) + +# 导出所有模板名称,方便外部引用 +PROMPT_NAMES = { + "main": "kfc_v2_main", + "output_format": "kfc_v2_output_format", + "situation_new_message": "kfc_v2_situation_new_message", + "situation_reply_in_time": "kfc_v2_situation_reply_in_time", + "situation_reply_late": "kfc_v2_situation_reply_late", + "situation_timeout": "kfc_v2_situation_timeout", + "situation_proactive": "kfc_v2_situation_proactive", + "entry_user_message": "kfc_v2_entry_user_message", + "entry_bot_planning": "kfc_v2_entry_bot_planning", + "entry_bot_planning_no_wait": "kfc_v2_entry_bot_planning_no_wait", + "entry_waiting_update": "kfc_v2_entry_waiting_update", + "entry_reply_in_time": "kfc_v2_entry_reply_in_time", + "entry_reply_late": "kfc_v2_entry_reply_late", + "entry_proactive_trigger": "kfc_v2_entry_proactive_trigger", +} diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/replyer.py b/src/plugins/built_in/kokoro_flow_chatter_v2/replyer.py new file mode 100644 index 000000000..a8c417cf2 --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/replyer.py @@ -0,0 +1,107 @@ +""" +Kokoro Flow Chatter V2 - Replyer + +简化的回复生成模块,使用插件系统的 llm_api +""" + +from typing import TYPE_CHECKING, Optional + +from src.common.logger import get_logger +from src.plugin_system.apis import llm_api +from src.utils.json_parser import extract_and_parse_json + +from .models import LLMResponse +from .prompt.builder import get_prompt_builder +from .session import KokoroSession + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream + +logger = get_logger("kfc_v2_replyer") + + +async def generate_response( + session: KokoroSession, + user_name: str, + situation_type: str = "new_message", + chat_stream: Optional["ChatStream"] = None, + available_actions: Optional[dict] = None, + extra_context: Optional[dict] = None, +) -> LLMResponse: + """ + 生成回复 + + Args: + session: 会话对象 + user_name: 用户名称 + situation_type: 情况类型 + chat_stream: 聊天流对象 + available_actions: 可用动作字典 + extra_context: 额外上下文 + + Returns: + LLMResponse 对象 + """ + try: + # 1. 构建提示词 + prompt_builder = get_prompt_builder() + prompt = await prompt_builder.build_prompt( + session=session, + user_name=user_name, + situation_type=situation_type, + chat_stream=chat_stream, + available_actions=available_actions, + extra_context=extra_context, + ) + + logger.debug(f"[KFC Replyer] 构建的提示词:\n{prompt}") + + # 2. 获取模型配置并调用 LLM + models = llm_api.get_available_models() + replyer_config = models.get("replyer") + + if not replyer_config: + logger.error("[KFC Replyer] 未找到 replyer 模型配置") + return LLMResponse.create_error_response("未找到 replyer 模型配置") + + success, raw_response, reasoning, model_name = await llm_api.generate_with_model( + prompt=prompt, + model_config=replyer_config, + request_type="kokoro_flow_chatter_v2", + ) + + if not success: + logger.error(f"[KFC Replyer] LLM 调用失败: {raw_response}") + return LLMResponse.create_error_response(raw_response) + + logger.debug(f"[KFC Replyer] LLM 响应 (model={model_name}):\n{raw_response}") + + # 3. 解析响应 + return _parse_response(raw_response) + + except Exception as e: + logger.error(f"[KFC Replyer] 生成失败: {e}") + import traceback + traceback.print_exc() + return LLMResponse.create_error_response(str(e)) + + +def _parse_response(raw_response: str) -> LLMResponse: + """解析 LLM 响应""" + data = extract_and_parse_json(raw_response, strict=False) + + if not data or not isinstance(data, dict): + logger.warning(f"[KFC Replyer] 无法解析 JSON: {raw_response[:200]}...") + return LLMResponse.create_error_response("无法解析响应格式") + + response = LLMResponse.from_dict(data) + + if response.thought: + logger.info( + f"[KFC Replyer] 解析成功: thought={response.thought[:50]}..., " + f"actions={[a.type for a in response.actions]}" + ) + else: + logger.warning("[KFC Replyer] 响应缺少 thought") + + return response diff --git a/src/plugins/built_in/kokoro_flow_chatter_v2/session.py b/src/plugins/built_in/kokoro_flow_chatter_v2/session.py new file mode 100644 index 000000000..df321070d --- /dev/null +++ b/src/plugins/built_in/kokoro_flow_chatter_v2/session.py @@ -0,0 +1,386 @@ +""" +Kokoro Flow Chatter V2 - 会话管理 + +极简的会话状态管理: +- Session 只有 IDLE 和 WAITING 两种状态 +- 包含 mental_log(心理活动历史) +- 包含 waiting_config(等待配置) +""" + +import asyncio +import json +import os +import time +from pathlib import Path +from typing import Optional + +from src.common.logger import get_logger + +from .models import ( + EventType, + MentalLogEntry, + SessionStatus, + WaitingConfig, +) + +logger = get_logger("kfc_v2_session") + + +class KokoroSession: + """ + Kokoro Flow Chatter V2 会话 + + 为每个私聊用户维护一个独立的会话,包含: + - 基本信息(user_id, stream_id) + - 状态(只有 IDLE 和 WAITING) + - 心理活动历史(mental_log) + - 等待配置(waiting_config) + """ + + # 心理活动日志最大保留条数 + MAX_MENTAL_LOG_SIZE = 50 + + def __init__( + self, + user_id: str, + stream_id: str, + ): + self.user_id = user_id + self.stream_id = stream_id + + # 状态(只有 IDLE 和 WAITING) + self._status: SessionStatus = SessionStatus.IDLE + + # 心理活动历史 + self.mental_log: list[MentalLogEntry] = [] + + # 等待配置 + self.waiting_config: WaitingConfig = WaitingConfig() + + # 时间戳 + self.created_at: float = time.time() + self.last_activity_at: float = time.time() + + # 统计 + self.total_interactions: int = 0 + + # 上次主动思考时间 + self.last_proactive_at: Optional[float] = None + + @property + def status(self) -> SessionStatus: + return self._status + + @status.setter + def status(self, value: SessionStatus) -> None: + old_status = self._status + self._status = value + if old_status != value: + logger.debug(f"Session {self.user_id} 状态变更: {old_status} → {value}") + + def add_entry(self, entry: MentalLogEntry) -> None: + """添加心理活动日志条目""" + self.mental_log.append(entry) + self.last_activity_at = time.time() + + # 保持日志在合理大小 + if len(self.mental_log) > self.MAX_MENTAL_LOG_SIZE: + self.mental_log = self.mental_log[-self.MAX_MENTAL_LOG_SIZE:] + + def add_user_message( + self, + content: str, + user_name: str, + user_id: str, + timestamp: Optional[float] = None, + ) -> MentalLogEntry: + """添加用户消息事件""" + entry = MentalLogEntry( + event_type=EventType.USER_MESSAGE, + timestamp=timestamp or time.time(), + content=content, + user_name=user_name, + user_id=user_id, + ) + + # 如果之前在等待,记录收到回复的情况 + if self.status == SessionStatus.WAITING and self.waiting_config.is_active(): + elapsed = self.waiting_config.get_elapsed_seconds() + max_wait = self.waiting_config.max_wait_seconds + + if elapsed <= max_wait: + entry.metadata["reply_status"] = "in_time" + entry.metadata["elapsed_seconds"] = elapsed + entry.metadata["max_wait_seconds"] = max_wait + else: + entry.metadata["reply_status"] = "late" + entry.metadata["elapsed_seconds"] = elapsed + entry.metadata["max_wait_seconds"] = max_wait + + self.add_entry(entry) + return entry + + def add_bot_planning( + self, + thought: str, + actions: list[dict], + expected_reaction: str = "", + max_wait_seconds: int = 0, + timestamp: Optional[float] = None, + ) -> MentalLogEntry: + """添加 Bot 规划事件""" + entry = MentalLogEntry( + event_type=EventType.BOT_PLANNING, + timestamp=timestamp or time.time(), + thought=thought, + actions=actions, + expected_reaction=expected_reaction, + max_wait_seconds=max_wait_seconds, + ) + self.add_entry(entry) + self.total_interactions += 1 + return entry + + def add_waiting_update( + self, + waiting_thought: str, + mood: str = "", + timestamp: Optional[float] = None, + ) -> MentalLogEntry: + """添加等待期间的心理变化""" + entry = MentalLogEntry( + event_type=EventType.WAITING_UPDATE, + timestamp=timestamp or time.time(), + waiting_thought=waiting_thought, + mood=mood, + elapsed_seconds=self.waiting_config.get_elapsed_seconds(), + ) + self.add_entry(entry) + return entry + + def start_waiting( + self, + expected_reaction: str, + max_wait_seconds: int, + ) -> None: + """开始等待""" + if max_wait_seconds <= 0: + # 不等待,直接进入 IDLE + self.status = SessionStatus.IDLE + self.waiting_config.reset() + return + + self.status = SessionStatus.WAITING + self.waiting_config = WaitingConfig( + expected_reaction=expected_reaction, + max_wait_seconds=max_wait_seconds, + started_at=time.time(), + last_thinking_at=0.0, + thinking_count=0, + ) + logger.debug( + f"Session {self.user_id} 开始等待: " + f"max_wait={max_wait_seconds}s, expected={expected_reaction[:30]}..." + ) + + def end_waiting(self) -> None: + """结束等待""" + self.status = SessionStatus.IDLE + self.waiting_config.reset() + + def get_recent_entries(self, limit: int = 20) -> list[MentalLogEntry]: + """获取最近的心理活动日志""" + return self.mental_log[-limit:] if self.mental_log else [] + + def get_last_bot_message(self) -> Optional[str]: + """获取最后一条 Bot 发送的消息""" + for entry in reversed(self.mental_log): + if entry.event_type == EventType.BOT_PLANNING: + for action in entry.actions: + if action.get("type") in ("reply", "respond"): + return action.get("content", "") + return None + + def to_dict(self) -> dict: + """转换为字典(用于持久化)""" + return { + "user_id": self.user_id, + "stream_id": self.stream_id, + "status": str(self.status), + "mental_log": [e.to_dict() for e in self.mental_log], + "waiting_config": self.waiting_config.to_dict(), + "created_at": self.created_at, + "last_activity_at": self.last_activity_at, + "total_interactions": self.total_interactions, + "last_proactive_at": self.last_proactive_at, + } + + @classmethod + def from_dict(cls, data: dict) -> "KokoroSession": + """从字典创建会话""" + session = cls( + user_id=data.get("user_id", ""), + stream_id=data.get("stream_id", ""), + ) + + # 状态 + status_str = data.get("status", "idle") + try: + session._status = SessionStatus(status_str) + except ValueError: + session._status = SessionStatus.IDLE + + # 心理活动历史 + mental_log_data = data.get("mental_log", []) + session.mental_log = [MentalLogEntry.from_dict(e) for e in mental_log_data] + + # 等待配置 + waiting_data = data.get("waiting_config", {}) + session.waiting_config = WaitingConfig.from_dict(waiting_data) + + # 时间戳 + session.created_at = data.get("created_at", time.time()) + session.last_activity_at = data.get("last_activity_at", time.time()) + session.total_interactions = data.get("total_interactions", 0) + session.last_proactive_at = data.get("last_proactive_at") + + return session + + +class SessionManager: + """ + 会话管理器 + + 负责会话的创建、获取、保存和清理 + """ + + _instance: Optional["SessionManager"] = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__( + self, + data_dir: str = "data/kokoro_flow_chatter_v2/sessions", + max_session_age_days: int = 30, + ): + if hasattr(self, "_initialized") and self._initialized: + return + + self._initialized = True + self.data_dir = Path(data_dir) + self.max_session_age_days = max_session_age_days + + # 内存缓存 + self._sessions: dict[str, KokoroSession] = {} + self._locks: dict[str, asyncio.Lock] = {} + + # 确保数据目录存在 + self.data_dir.mkdir(parents=True, exist_ok=True) + + logger.info(f"SessionManager 初始化完成: {self.data_dir}") + + def _get_lock(self, user_id: str) -> asyncio.Lock: + """获取用户级别的锁""" + if user_id not in self._locks: + self._locks[user_id] = asyncio.Lock() + return self._locks[user_id] + + def _get_file_path(self, user_id: str) -> Path: + """获取会话文件路径""" + safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in user_id) + return self.data_dir / f"{safe_id}.json" + + async def get_session(self, user_id: str, stream_id: str) -> KokoroSession: + """获取或创建会话""" + async with self._get_lock(user_id): + # 检查内存缓存 + if user_id in self._sessions: + session = self._sessions[user_id] + session.stream_id = stream_id # 更新 stream_id + return session + + # 尝试从文件加载 + session = await self._load_from_file(user_id) + if session: + session.stream_id = stream_id + self._sessions[user_id] = session + return session + + # 创建新会话 + session = KokoroSession(user_id=user_id, stream_id=stream_id) + self._sessions[user_id] = session + logger.info(f"创建新会话: {user_id}") + return session + + async def _load_from_file(self, user_id: str) -> Optional[KokoroSession]: + """从文件加载会话""" + file_path = self._get_file_path(user_id) + if not file_path.exists(): + return None + + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + session = KokoroSession.from_dict(data) + logger.debug(f"从文件加载会话: {user_id}") + return session + except Exception as e: + logger.error(f"加载会话失败 {user_id}: {e}") + return None + + async def save_session(self, user_id: str) -> bool: + """保存会话到文件""" + async with self._get_lock(user_id): + if user_id not in self._sessions: + return False + + session = self._sessions[user_id] + file_path = self._get_file_path(user_id) + + try: + data = session.to_dict() + temp_path = file_path.with_suffix(".json.tmp") + + with open(temp_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + os.replace(temp_path, file_path) + return True + except Exception as e: + logger.error(f"保存会话失败 {user_id}: {e}") + return False + + async def save_all(self) -> int: + """保存所有会话""" + count = 0 + for user_id in list(self._sessions.keys()): + if await self.save_session(user_id): + count += 1 + return count + + async def get_waiting_sessions(self) -> list[KokoroSession]: + """获取所有处于等待状态的会话""" + return [s for s in self._sessions.values() if s.status == SessionStatus.WAITING] + + async def get_all_sessions(self) -> list[KokoroSession]: + """获取所有会话""" + return list(self._sessions.values()) + + def get_session_sync(self, user_id: str) -> Optional[KokoroSession]: + """同步获取会话(仅从内存)""" + return self._sessions.get(user_id) + + +# 全局单例 +_session_manager: Optional[SessionManager] = None + + +def get_session_manager() -> SessionManager: + """获取全局会话管理器""" + global _session_manager + if _session_manager is None: + _session_manager = SessionManager() + return _session_manager