feat: 重构 KFC 回复动作,优化回复生成流程和上下文注入

This commit is contained in:
Windpicker-owo
2025-11-30 21:01:02 +08:00
parent 1288fd6cf7
commit 9d0488ef5a
3 changed files with 263 additions and 91 deletions

View File

@@ -1,34 +1,53 @@
"""
KFC 回复动作模块
KFC 的 reply 动作与 AFC 不同
- 不调用 LLM 生成回复content 由 Replyer 提前生成
- 动作本身只负责发送 content 参数中的内容
KFC 的 reply 动作:
- 完整的回复流程在 execute() 中实现
- 调用 Replyer 生成回复文本
- 回复后处理(系统格式词过滤、分段发送、错字生成等)
- 发送回复消息
与 AFC 类似,但使用 KFC 专属的 Replyer 和 Session 系统。
"""
from typing import ClassVar
import asyncio
from typing import TYPE_CHECKING, ClassVar, Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system import ActionActivationType, BaseAction, ChatMode
from src.plugin_system.apis import send_api
if TYPE_CHECKING:
from ..session import KokoroSession
logger = get_logger("kfc_reply_action")
class KFCReplyAction(BaseAction):
"""KFC Reply 动作 - 发送已生成的回复内容
"""KFC Reply 动作 - 完整的私聊回复流程
特点:
- 不调用 LLM直接发送 content 参数中的内容
- content 由 Replyer 提前生成
- 完整的回复流程:生成回复 → 后处理 → 分段发送
- 使用 KFC 专属的 Replyer 生成回复
- 支持系统格式词过滤、分段发送、错字生成等后处理
- 仅限 KokoroFlowChatter 使用
注意:使用 kfc_reply 作为动作名称以避免与 AFC 的 reply 动作冲突
action_data 参数:
- user_id: 用户ID必需用于获取 Session
- user_name: 用户名称(必需)
- thought: Planner 生成的想法/内心独白(必需)
- situation_type: 情况类型(可选,默认 "new_message"
- extra_context: 额外上下文(可选)
- content: 预生成的回复内容(可选,如果提供则直接发送)
- should_quote_reply: 是否引用原消息(可选,默认 false
- enable_splitter: 是否启用分段发送(可选,默认 true
- enable_chinese_typo: 是否启用错字生成(可选,默认 true
"""
# 动作基本信息
action_name = "kfc_reply"
action_description = "发送回复消息。content 参数包含要发送的内容"
action_description = "发送回复消息。会根据当前对话情境生成并发送回复"
# 激活设置
activation_type = ActionActivationType.ALWAYS
@@ -40,45 +59,230 @@ class KFCReplyAction(BaseAction):
# 动作参数定义
action_parameters: ClassVar = {
"content": "要发送的回复内容(必需,由 Replyer 生成)",
"content": "要发送的回复内容(可选,如果不提供则自动生成)",
"should_quote_reply": "是否引用原消息可选true/false默认 false",
}
# 动作使用场景
action_require: ClassVar = [
"发送回复消息时使用",
"content 参数必须包含要发送的内容",
"需要发送回复消息时使用",
"私聊场景的标准回复动作",
]
# 关联类型
associated_types: ClassVar[list[str]] = ["text"]
async def execute(self) -> tuple[bool, str]:
"""执行 reply 动作 - 发送 content 中的内容"""
"""执行 reply 动作 - 完整的回复流程"""
try:
# 获取要发送的内容
# 1. 检查是否有预生成的内容
content = self.action_data.get("content", "")
if not content:
logger.warning(f"{self.log_prefix} content 为空,跳过发送")
return True, ""
# 2. 需要生成回复,获取必要信息
user_id = self.action_data.get("user_id")
user_name = self.action_data.get("user_name", "用户")
thought = self.action_data.get("thought", "")
situation_type = self.action_data.get("situation_type", "new_message")
extra_context = self.action_data.get("extra_context")
if not user_id:
logger.warning(f"{self.log_prefix} 缺少 user_id无法生成回复")
return False, ""
# 3. 获取 Session
session = await self._get_session(user_id)
if not session:
logger.warning(f"{self.log_prefix} 无法获取 Session: {user_id}")
return False, ""
# 4. 调用 Replyer 生成回复
success, content = await self._generate_reply(
session=session,
user_name=user_name,
thought=thought,
situation_type=situation_type,
extra_context=extra_context,
)
if not success or not content:
logger.warning(f"{self.log_prefix} 回复生成失败")
return False, ""
# 获取是否引用
should_quote = self.action_data.get("should_quote_reply", False)
# 5. 回复后处理(系统格式词过滤 + 分段处理)
enable_splitter = self.action_data.get("enable_splitter", True)
enable_chinese_typo = self.action_data.get("enable_chinese_typo", True)
# 发送消息
await send_api.text_to_stream(
text=content,
stream_id=self.chat_stream.stream_id,
reply_to_message=self.action_message,
set_reply=should_quote and bool(self.action_message),
typing=False,
processed_segments = self._post_process_reply(
content=content,
enable_splitter=enable_splitter,
enable_chinese_typo=enable_chinese_typo,
)
logger.info(f"{self.log_prefix} KFC reply 动作执行成功")
return True, content
if not processed_segments:
logger.warning(f"{self.log_prefix} 回复后处理后内容为空")
return False, ""
# 6. 分段发送回复
should_quote = self.action_data.get("should_quote_reply", False)
reply_text = await self._send_segments(
segments=processed_segments,
should_quote=should_quote,
)
logger.info(f"{self.log_prefix} KFC reply 动作执行成功: {reply_text[:50]}...")
return True, reply_text
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 回复任务被取消")
return False, ""
except Exception as e:
logger.error(f"{self.log_prefix} KFC reply 动作执行失败: {e}")
import traceback
traceback.print_exc()
return False, ""
def _post_process_reply(
self,
content: str,
enable_splitter: bool = True,
enable_chinese_typo: bool = True,
) -> list[str]:
"""
回复后处理
包括:
1. 系统格式词过滤(移除 [回复...]、[表情包:...]、@<...> 等)
2. 分段处理(根据标点分句、智能合并)
3. 错字生成(拟人化)
Args:
content: 原始回复内容
enable_splitter: 是否启用分段
enable_chinese_typo: 是否启用错字生成
Returns:
处理后的文本段落列表
"""
try:
from src.chat.utils.utils import filter_system_format_content, process_llm_response
# 1. 过滤系统格式词
filtered_content = filter_system_format_content(content)
if not filtered_content or not filtered_content.strip():
logger.warning(f"{self.log_prefix} 过滤系统格式词后内容为空")
return []
# 2. 分段处理 + 错字生成
processed_segments = process_llm_response(
filtered_content,
enable_splitter=enable_splitter,
enable_chinese_typo=enable_chinese_typo,
)
# 过滤空段落
processed_segments = [seg for seg in processed_segments if seg and seg.strip()]
logger.debug(
f"{self.log_prefix} 回复后处理完成: "
f"原始长度={len(content)}, 过滤后长度={len(filtered_content)}, "
f"分段数={len(processed_segments)}"
)
return processed_segments
except Exception as e:
logger.error(f"{self.log_prefix} 回复后处理失败: {e}")
# 失败时返回原始内容
return [content] if content else []
async def _send_segments(
self,
segments: list[str],
should_quote: bool = False,
) -> str:
"""
分段发送回复
Args:
segments: 要发送的文本段落列表
should_quote: 是否引用原消息(仅第一条消息引用)
Returns:
完整的回复文本(所有段落拼接)
"""
reply_text = ""
first_sent = False
# 获取分段发送的间隔时间
typing_delay = 0.5
if global_config and hasattr(global_config, 'response_splitter'):
typing_delay = getattr(global_config.response_splitter, "typing_delay", 0.5)
for segment in segments:
if not segment or not segment.strip():
continue
reply_text += segment
# 发送消息
if not first_sent:
# 第一条消息:可能需要引用
await send_api.text_to_stream(
text=segment,
stream_id=self.chat_stream.stream_id,
reply_to_message=self.action_message,
set_reply=should_quote and bool(self.action_message),
typing=False,
)
first_sent = True
else:
# 后续消息:模拟打字延迟
if typing_delay > 0:
await asyncio.sleep(typing_delay)
await send_api.text_to_stream(
text=segment,
stream_id=self.chat_stream.stream_id,
reply_to_message=None,
set_reply=False,
typing=True,
)
return reply_text
async def _get_session(self, user_id: str) -> Optional["KokoroSession"]:
"""获取用户 Session"""
try:
from ..session import get_session_manager
session_manager = get_session_manager()
return await session_manager.get_session(user_id, self.chat_stream.stream_id)
except Exception as e:
logger.error(f"{self.log_prefix} 获取 Session 失败: {e}")
return None
async def _generate_reply(
self,
session: "KokoroSession",
user_name: str,
thought: str,
situation_type: str,
extra_context: Optional[dict] = None,
) -> tuple[bool, str]:
"""调用 Replyer 生成回复"""
try:
from ..replyer import generate_reply_text
return await generate_reply_text(
session=session,
user_name=user_name,
thought=thought,
situation_type=situation_type,
chat_stream=self.chat_stream,
extra_context=extra_context,
)
except Exception as e:
logger.error(f"{self.log_prefix} 生成回复失败: {e}")
return False, ""

View File

@@ -3,8 +3,8 @@ Kokoro Flow Chatter - Chatter 主类
极简设计,只负责:
1. 收到消息
2. 调用 Replyer 生成响应
3. 执行动作
2. 调用 Planner 生成规划
3. 执行动作(回复在 Action.execute() 中生成)
4. 更新 Session
"""
@@ -20,7 +20,6 @@ from src.plugin_system.base.component_types import ChatType
from .models import SessionStatus
from .planner import generate_plan
from .replyer import generate_reply_text
from .session import get_session_manager
if TYPE_CHECKING:
@@ -153,30 +152,19 @@ class KokoroFlowChatter(BaseChatter):
available_actions=available_actions,
)
# 10. 对于需要回复的动作,调用 Replyer 生成实际文本
processed_actions = []
# 10. 为 kfc_reply 动作注入必要的上下文信息
for action in plan_response.actions:
if action.type == "kfc_reply":
# 调用 replyer 生成回复文本
success, reply_text = await generate_reply_text(
session=session,
user_name=user_name,
thought=plan_response.thought,
situation_type=situation_type,
chat_stream=chat_stream,
)
if success and reply_text:
# 更新 action 的 content
action.params["content"] = reply_text
else:
logger.warning("[KFC] 回复生成失败,跳过该动作")
continue
processed_actions.append(action)
# 注入回复生成所需的上下文
action.params["user_id"] = user_id
action.params["user_name"] = user_name
action.params["thought"] = plan_response.thought
action.params["situation_type"] = situation_type
# 11. 执行动作
# 11. 执行动作(回复生成在 Action.execute() 中完成)
exec_results = []
has_reply = False
for action in processed_actions:
for action in plan_response.actions:
result = await self.action_manager.execute_action(
action_name=action.type,
chat_id=self.stream_id,
@@ -193,7 +181,7 @@ class KokoroFlowChatter(BaseChatter):
# 12. 记录 Bot 规划到 mental_log
session.add_bot_planning(
thought=plan_response.thought,
actions=[a.to_dict() for a in processed_actions],
actions=[a.to_dict() for a in plan_response.actions],
expected_reaction=plan_response.expected_reaction,
max_wait_seconds=plan_response.max_wait_seconds,
)
@@ -222,7 +210,7 @@ class KokoroFlowChatter(BaseChatter):
logger.info(
f"{SOFT_PURPLE}[KFC]{RESET} 处理完成: "
f"user={user_name}, situation={situation_type}, "
f"actions={[a.type for a in processed_actions]}, "
f"actions={[a.type for a in plan_response.actions]}, "
f"wait={plan_response.max_wait_seconds}s"
)

View File

@@ -22,7 +22,7 @@ from src.plugin_system.apis.unified_scheduler import TriggerType, unified_schedu
from .models import EventType, SessionStatus
from .planner import generate_plan
from .replyer import _clean_reply_text, generate_reply_text
from .replyer import _clean_reply_text
from .session import KokoroSession, get_session_manager
if TYPE_CHECKING:
@@ -412,26 +412,16 @@ class ProactiveThinker:
available_actions=action_manager.get_using_actions(),
)
# 对于需要回复的动作,调用 Replyer 生成实际文本
processed_actions = []
# 为 kfc_reply 动作注入必要的上下文信息
for action in plan_response.actions:
if action.type == "kfc_reply":
success, reply_text = await generate_reply_text(
session=session,
user_name=user_name,
thought=plan_response.thought,
situation_type="timeout",
chat_stream=chat_stream,
)
if success and reply_text:
action.params["content"] = reply_text
else:
logger.warning("[ProactiveThinker] 回复生成失败,跳过该动作")
continue
processed_actions.append(action)
action.params["user_id"] = session.user_id
action.params["user_name"] = user_name
action.params["thought"] = plan_response.thought
action.params["situation_type"] = "timeout"
# 执行动作
for action in processed_actions:
# 执行动作(回复生成在 Action.execute() 中完成)
for action in plan_response.actions:
await action_manager.execute_action(
action_name=action.type,
chat_id=session.stream_id,
@@ -445,7 +435,7 @@ class ProactiveThinker:
# 记录到 mental_log
session.add_bot_planning(
thought=plan_response.thought,
actions=[a.to_dict() for a in processed_actions],
actions=[a.to_dict() for a in plan_response.actions],
expected_reaction=plan_response.expected_reaction,
max_wait_seconds=plan_response.max_wait_seconds,
)
@@ -466,7 +456,7 @@ class ProactiveThinker:
logger.info(
f"[ProactiveThinker] 超时决策完成: user={session.user_id}, "
f"actions={[a.type for a in processed_actions]}, "
f"actions={[a.type for a in plan_response.actions]}, "
f"continue_wait={plan_response.max_wait_seconds > 0}"
)
@@ -612,27 +602,17 @@ class ProactiveThinker:
await self.session_manager.save_session(session.user_id)
return
# 对于需要回复的动作,调用 Replyer 生成实际文本
processed_actions = []
# 为 kfc_reply 动作注入必要的上下文信息
for action in plan_response.actions:
if action.type == "kfc_reply":
success, reply_text = await generate_reply_text(
session=session,
user_name=user_name,
thought=plan_response.thought,
situation_type="proactive",
chat_stream=chat_stream,
extra_context=extra_context,
)
if success and reply_text:
action.params["content"] = reply_text
else:
logger.warning("[ProactiveThinker] 回复生成失败,跳过该动作")
continue
processed_actions.append(action)
action.params["user_id"] = session.user_id
action.params["user_name"] = user_name
action.params["thought"] = plan_response.thought
action.params["situation_type"] = "proactive"
action.params["extra_context"] = extra_context
# 执行动作
for action in processed_actions:
# 执行动作(回复生成在 Action.execute() 中完成)
for action in plan_response.actions:
await action_manager.execute_action(
action_name=action.type,
chat_id=session.stream_id,
@@ -646,7 +626,7 @@ class ProactiveThinker:
# 记录到 mental_log
session.add_bot_planning(
thought=plan_response.thought,
actions=[a.to_dict() for a in processed_actions],
actions=[a.to_dict() for a in plan_response.actions],
expected_reaction=plan_response.expected_reaction,
max_wait_seconds=plan_response.max_wait_seconds,
)
@@ -664,7 +644,7 @@ class ProactiveThinker:
logger.info(
f"[ProactiveThinker] 主动发起完成: user={session.user_id}, "
f"actions={[a.type for a in processed_actions]}"
f"actions={[a.type for a in plan_response.actions]}"
)
except Exception as e: