From 9d0488ef5a0834f20d7c487432f6e9649a0766d6 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sun, 30 Nov 2025 21:01:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=20KFC=20=E5=9B=9E?= =?UTF-8?q?=E5=A4=8D=E5=8A=A8=E4=BD=9C=EF=BC=8C=E4=BC=98=E5=8C=96=E5=9B=9E?= =?UTF-8?q?=E5=A4=8D=E7=94=9F=E6=88=90=E6=B5=81=E7=A8=8B=E5=92=8C=E4=B8=8A?= =?UTF-8?q?=E4=B8=8B=E6=96=87=E6=B3=A8=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kokoro_flow_chatter/actions/reply.py | 258 ++++++++++++++++-- .../built_in/kokoro_flow_chatter/chatter.py | 36 +-- .../kokoro_flow_chatter/proactive_thinker.py | 60 ++-- 3 files changed, 263 insertions(+), 91 deletions(-) diff --git a/src/plugins/built_in/kokoro_flow_chatter/actions/reply.py b/src/plugins/built_in/kokoro_flow_chatter/actions/reply.py index c43da326b..fc6edb1b8 100644 --- a/src/plugins/built_in/kokoro_flow_chatter/actions/reply.py +++ b/src/plugins/built_in/kokoro_flow_chatter/actions/reply.py @@ -1,34 +1,53 @@ """ KFC 回复动作模块 -KFC 的 reply 动作与 AFC 不同: -- 不调用 LLM 生成回复,content 由 Replyer 提前生成 -- 动作本身只负责发送 content 参数中的内容 +KFC 的 reply 动作: +- 完整的回复流程在 execute() 中实现 +- 调用 Replyer 生成回复文本 +- 回复后处理(系统格式词过滤、分段发送、错字生成等) +- 发送回复消息 + +与 AFC 类似,但使用 KFC 专属的 Replyer 和 Session 系统。 """ -from typing import ClassVar +import asyncio +from typing import TYPE_CHECKING, ClassVar, Optional from src.common.logger import get_logger +from src.config.config import global_config from src.plugin_system import ActionActivationType, BaseAction, ChatMode from src.plugin_system.apis import send_api +if TYPE_CHECKING: + from ..session import KokoroSession + logger = get_logger("kfc_reply_action") class KFCReplyAction(BaseAction): - """KFC Reply 动作 - 发送已生成的回复内容 + """KFC Reply 动作 - 完整的私聊回复流程 特点: - - 不调用 LLM,直接发送 content 参数中的内容 - - content 由 Replyer 提前生成 + - 完整的回复流程:生成回复 → 后处理 → 分段发送 + - 使用 KFC 专属的 Replyer 生成回复 + - 支持系统格式词过滤、分段发送、错字生成等后处理 - 仅限 KokoroFlowChatter 使用 - 注意:使用 kfc_reply 作为动作名称以避免与 AFC 的 reply 动作冲突 + action_data 参数: + - user_id: 用户ID(必需,用于获取 Session) + - user_name: 用户名称(必需) + - thought: Planner 生成的想法/内心独白(必需) + - situation_type: 情况类型(可选,默认 "new_message") + - extra_context: 额外上下文(可选) + - content: 预生成的回复内容(可选,如果提供则直接发送) + - should_quote_reply: 是否引用原消息(可选,默认 false) + - enable_splitter: 是否启用分段发送(可选,默认 true) + - enable_chinese_typo: 是否启用错字生成(可选,默认 true) """ # 动作基本信息 action_name = "kfc_reply" - action_description = "发送回复消息。content 参数包含要发送的内容。" + action_description = "发送回复消息。会根据当前对话情境生成并发送回复。" # 激活设置 activation_type = ActionActivationType.ALWAYS @@ -40,45 +59,230 @@ class KFCReplyAction(BaseAction): # 动作参数定义 action_parameters: ClassVar = { - "content": "要发送的回复内容(必需,由 Replyer 生成)", + "content": "要发送的回复内容(可选,如果不提供则自动生成)", "should_quote_reply": "是否引用原消息(可选,true/false,默认 false)", } # 动作使用场景 action_require: ClassVar = [ - "发送回复消息时使用", - "content 参数必须包含要发送的内容", + "需要发送回复消息时使用", + "私聊场景的标准回复动作", ] # 关联类型 associated_types: ClassVar[list[str]] = ["text"] async def execute(self) -> tuple[bool, str]: - """执行 reply 动作 - 发送 content 中的内容""" + """执行 reply 动作 - 完整的回复流程""" try: - # 获取要发送的内容 + # 1. 检查是否有预生成的内容 content = self.action_data.get("content", "") + if not content: - logger.warning(f"{self.log_prefix} content 为空,跳过发送") - return True, "" + # 2. 需要生成回复,获取必要信息 + user_id = self.action_data.get("user_id") + user_name = self.action_data.get("user_name", "用户") + thought = self.action_data.get("thought", "") + situation_type = self.action_data.get("situation_type", "new_message") + extra_context = self.action_data.get("extra_context") + + if not user_id: + logger.warning(f"{self.log_prefix} 缺少 user_id,无法生成回复") + return False, "" + + # 3. 获取 Session + session = await self._get_session(user_id) + if not session: + logger.warning(f"{self.log_prefix} 无法获取 Session: {user_id}") + return False, "" + + # 4. 调用 Replyer 生成回复 + success, content = await self._generate_reply( + session=session, + user_name=user_name, + thought=thought, + situation_type=situation_type, + extra_context=extra_context, + ) + + if not success or not content: + logger.warning(f"{self.log_prefix} 回复生成失败") + return False, "" - # 获取是否引用 - should_quote = self.action_data.get("should_quote_reply", False) + # 5. 回复后处理(系统格式词过滤 + 分段处理) + enable_splitter = self.action_data.get("enable_splitter", True) + enable_chinese_typo = self.action_data.get("enable_chinese_typo", True) - # 发送消息 - await send_api.text_to_stream( - text=content, - stream_id=self.chat_stream.stream_id, - reply_to_message=self.action_message, - set_reply=should_quote and bool(self.action_message), - typing=False, + processed_segments = self._post_process_reply( + content=content, + enable_splitter=enable_splitter, + enable_chinese_typo=enable_chinese_typo, ) - logger.info(f"{self.log_prefix} KFC reply 动作执行成功") - return True, content + if not processed_segments: + logger.warning(f"{self.log_prefix} 回复后处理后内容为空") + return False, "" + # 6. 分段发送回复 + should_quote = self.action_data.get("should_quote_reply", False) + reply_text = await self._send_segments( + segments=processed_segments, + should_quote=should_quote, + ) + + logger.info(f"{self.log_prefix} KFC reply 动作执行成功: {reply_text[:50]}...") + return True, reply_text + + except asyncio.CancelledError: + logger.debug(f"{self.log_prefix} 回复任务被取消") + return False, "" except Exception as e: logger.error(f"{self.log_prefix} KFC reply 动作执行失败: {e}") import traceback traceback.print_exc() return False, "" + + def _post_process_reply( + self, + content: str, + enable_splitter: bool = True, + enable_chinese_typo: bool = True, + ) -> list[str]: + """ + 回复后处理 + + 包括: + 1. 系统格式词过滤(移除 [回复...]、[表情包:...]、@<...> 等) + 2. 分段处理(根据标点分句、智能合并) + 3. 错字生成(拟人化) + + Args: + content: 原始回复内容 + enable_splitter: 是否启用分段 + enable_chinese_typo: 是否启用错字生成 + + Returns: + 处理后的文本段落列表 + """ + try: + from src.chat.utils.utils import filter_system_format_content, process_llm_response + + # 1. 过滤系统格式词 + filtered_content = filter_system_format_content(content) + + if not filtered_content or not filtered_content.strip(): + logger.warning(f"{self.log_prefix} 过滤系统格式词后内容为空") + return [] + + # 2. 分段处理 + 错字生成 + processed_segments = process_llm_response( + filtered_content, + enable_splitter=enable_splitter, + enable_chinese_typo=enable_chinese_typo, + ) + + # 过滤空段落 + processed_segments = [seg for seg in processed_segments if seg and seg.strip()] + + logger.debug( + f"{self.log_prefix} 回复后处理完成: " + f"原始长度={len(content)}, 过滤后长度={len(filtered_content)}, " + f"分段数={len(processed_segments)}" + ) + + return processed_segments + + except Exception as e: + logger.error(f"{self.log_prefix} 回复后处理失败: {e}") + # 失败时返回原始内容 + return [content] if content else [] + + async def _send_segments( + self, + segments: list[str], + should_quote: bool = False, + ) -> str: + """ + 分段发送回复 + + Args: + segments: 要发送的文本段落列表 + should_quote: 是否引用原消息(仅第一条消息引用) + + Returns: + 完整的回复文本(所有段落拼接) + """ + reply_text = "" + first_sent = False + + # 获取分段发送的间隔时间 + typing_delay = 0.5 + if global_config and hasattr(global_config, 'response_splitter'): + typing_delay = getattr(global_config.response_splitter, "typing_delay", 0.5) + + for segment in segments: + if not segment or not segment.strip(): + continue + + reply_text += segment + + # 发送消息 + if not first_sent: + # 第一条消息:可能需要引用 + await send_api.text_to_stream( + text=segment, + stream_id=self.chat_stream.stream_id, + reply_to_message=self.action_message, + set_reply=should_quote and bool(self.action_message), + typing=False, + ) + first_sent = True + else: + # 后续消息:模拟打字延迟 + if typing_delay > 0: + await asyncio.sleep(typing_delay) + + await send_api.text_to_stream( + text=segment, + stream_id=self.chat_stream.stream_id, + reply_to_message=None, + set_reply=False, + typing=True, + ) + + return reply_text + + async def _get_session(self, user_id: str) -> Optional["KokoroSession"]: + """获取用户 Session""" + try: + from ..session import get_session_manager + + session_manager = get_session_manager() + return await session_manager.get_session(user_id, self.chat_stream.stream_id) + except Exception as e: + logger.error(f"{self.log_prefix} 获取 Session 失败: {e}") + return None + + async def _generate_reply( + self, + session: "KokoroSession", + user_name: str, + thought: str, + situation_type: str, + extra_context: Optional[dict] = None, + ) -> tuple[bool, str]: + """调用 Replyer 生成回复""" + try: + from ..replyer import generate_reply_text + + return await generate_reply_text( + session=session, + user_name=user_name, + thought=thought, + situation_type=situation_type, + chat_stream=self.chat_stream, + extra_context=extra_context, + ) + except Exception as e: + logger.error(f"{self.log_prefix} 生成回复失败: {e}") + return False, "" diff --git a/src/plugins/built_in/kokoro_flow_chatter/chatter.py b/src/plugins/built_in/kokoro_flow_chatter/chatter.py index 63d69b632..6c2b0e802 100644 --- a/src/plugins/built_in/kokoro_flow_chatter/chatter.py +++ b/src/plugins/built_in/kokoro_flow_chatter/chatter.py @@ -3,8 +3,8 @@ Kokoro Flow Chatter - Chatter 主类 极简设计,只负责: 1. 收到消息 -2. 调用 Replyer 生成响应 -3. 执行动作 +2. 调用 Planner 生成规划 +3. 执行动作(回复在 Action.execute() 中生成) 4. 更新 Session """ @@ -20,7 +20,6 @@ from src.plugin_system.base.component_types import ChatType from .models import SessionStatus from .planner import generate_plan -from .replyer import generate_reply_text from .session import get_session_manager if TYPE_CHECKING: @@ -153,30 +152,19 @@ class KokoroFlowChatter(BaseChatter): available_actions=available_actions, ) - # 10. 对于需要回复的动作,调用 Replyer 生成实际文本 - processed_actions = [] + # 10. 为 kfc_reply 动作注入必要的上下文信息 for action in plan_response.actions: if action.type == "kfc_reply": - # 调用 replyer 生成回复文本 - success, reply_text = await generate_reply_text( - session=session, - user_name=user_name, - thought=plan_response.thought, - situation_type=situation_type, - chat_stream=chat_stream, - ) - if success and reply_text: - # 更新 action 的 content - action.params["content"] = reply_text - else: - logger.warning("[KFC] 回复生成失败,跳过该动作") - continue - processed_actions.append(action) + # 注入回复生成所需的上下文 + action.params["user_id"] = user_id + action.params["user_name"] = user_name + action.params["thought"] = plan_response.thought + action.params["situation_type"] = situation_type - # 11. 执行动作 + # 11. 执行动作(回复生成在 Action.execute() 中完成) exec_results = [] has_reply = False - for action in processed_actions: + for action in plan_response.actions: result = await self.action_manager.execute_action( action_name=action.type, chat_id=self.stream_id, @@ -193,7 +181,7 @@ class KokoroFlowChatter(BaseChatter): # 12. 记录 Bot 规划到 mental_log session.add_bot_planning( thought=plan_response.thought, - actions=[a.to_dict() for a in processed_actions], + actions=[a.to_dict() for a in plan_response.actions], expected_reaction=plan_response.expected_reaction, max_wait_seconds=plan_response.max_wait_seconds, ) @@ -222,7 +210,7 @@ class KokoroFlowChatter(BaseChatter): logger.info( f"{SOFT_PURPLE}[KFC]{RESET} 处理完成: " f"user={user_name}, situation={situation_type}, " - f"actions={[a.type for a in processed_actions]}, " + f"actions={[a.type for a in plan_response.actions]}, " f"wait={plan_response.max_wait_seconds}s" ) diff --git a/src/plugins/built_in/kokoro_flow_chatter/proactive_thinker.py b/src/plugins/built_in/kokoro_flow_chatter/proactive_thinker.py index fffc1719b..0cee53028 100644 --- a/src/plugins/built_in/kokoro_flow_chatter/proactive_thinker.py +++ b/src/plugins/built_in/kokoro_flow_chatter/proactive_thinker.py @@ -22,7 +22,7 @@ from src.plugin_system.apis.unified_scheduler import TriggerType, unified_schedu from .models import EventType, SessionStatus from .planner import generate_plan -from .replyer import _clean_reply_text, generate_reply_text +from .replyer import _clean_reply_text from .session import KokoroSession, get_session_manager if TYPE_CHECKING: @@ -412,26 +412,16 @@ class ProactiveThinker: available_actions=action_manager.get_using_actions(), ) - # 对于需要回复的动作,调用 Replyer 生成实际文本 - processed_actions = [] + # 为 kfc_reply 动作注入必要的上下文信息 for action in plan_response.actions: if action.type == "kfc_reply": - success, reply_text = await generate_reply_text( - session=session, - user_name=user_name, - thought=plan_response.thought, - situation_type="timeout", - chat_stream=chat_stream, - ) - if success and reply_text: - action.params["content"] = reply_text - else: - logger.warning("[ProactiveThinker] 回复生成失败,跳过该动作") - continue - processed_actions.append(action) + action.params["user_id"] = session.user_id + action.params["user_name"] = user_name + action.params["thought"] = plan_response.thought + action.params["situation_type"] = "timeout" - # 执行动作 - for action in processed_actions: + # 执行动作(回复生成在 Action.execute() 中完成) + for action in plan_response.actions: await action_manager.execute_action( action_name=action.type, chat_id=session.stream_id, @@ -445,7 +435,7 @@ class ProactiveThinker: # 记录到 mental_log session.add_bot_planning( thought=plan_response.thought, - actions=[a.to_dict() for a in processed_actions], + actions=[a.to_dict() for a in plan_response.actions], expected_reaction=plan_response.expected_reaction, max_wait_seconds=plan_response.max_wait_seconds, ) @@ -466,7 +456,7 @@ class ProactiveThinker: logger.info( f"[ProactiveThinker] 超时决策完成: user={session.user_id}, " - f"actions={[a.type for a in processed_actions]}, " + f"actions={[a.type for a in plan_response.actions]}, " f"continue_wait={plan_response.max_wait_seconds > 0}" ) @@ -612,27 +602,17 @@ class ProactiveThinker: await self.session_manager.save_session(session.user_id) return - # 对于需要回复的动作,调用 Replyer 生成实际文本 - processed_actions = [] + # 为 kfc_reply 动作注入必要的上下文信息 for action in plan_response.actions: if action.type == "kfc_reply": - success, reply_text = await generate_reply_text( - session=session, - user_name=user_name, - thought=plan_response.thought, - situation_type="proactive", - chat_stream=chat_stream, - extra_context=extra_context, - ) - if success and reply_text: - action.params["content"] = reply_text - else: - logger.warning("[ProactiveThinker] 回复生成失败,跳过该动作") - continue - processed_actions.append(action) + action.params["user_id"] = session.user_id + action.params["user_name"] = user_name + action.params["thought"] = plan_response.thought + action.params["situation_type"] = "proactive" + action.params["extra_context"] = extra_context - # 执行动作 - for action in processed_actions: + # 执行动作(回复生成在 Action.execute() 中完成) + for action in plan_response.actions: await action_manager.execute_action( action_name=action.type, chat_id=session.stream_id, @@ -646,7 +626,7 @@ class ProactiveThinker: # 记录到 mental_log session.add_bot_planning( thought=plan_response.thought, - actions=[a.to_dict() for a in processed_actions], + actions=[a.to_dict() for a in plan_response.actions], expected_reaction=plan_response.expected_reaction, max_wait_seconds=plan_response.max_wait_seconds, ) @@ -664,7 +644,7 @@ class ProactiveThinker: logger.info( f"[ProactiveThinker] 主动发起完成: user={session.user_id}, " - f"actions={[a.type for a in processed_actions]}" + f"actions={[a.type for a in plan_response.actions]}" ) except Exception as e: