feat(KFC): 更新聊天处理器和回复模块,优化动作名称及上下文构建逻辑

This commit is contained in:
Windpicker-owo
2025-11-30 15:52:01 +08:00
parent fc85338d0b
commit c6f34992d1
10 changed files with 229 additions and 50 deletions

View File

@@ -57,12 +57,40 @@ class ChatterManager:
self.stats["chatters_registered"] += 1 self.stats["chatters_registered"] += 1
def get_chatter_class(self, chat_type: ChatType) -> type | None: def get_chatter_class_for_chat_type(self, chat_type: ChatType) -> type | None:
"""获取指定聊天类型的聊天处理器类""" """
if chat_type in self.chatter_classes: 获取指定聊天类型的最佳聊天处理器类
return self.chatter_classes[chat_type][0]
优先级规则:
1. 优先选择明确匹配当前聊天类型的 Chatter如 PRIVATE 或 GROUP
2. 如果没有精确匹配,才使用 ALL 类型的 Chatter
Args:
chat_type: 聊天类型
Returns:
最佳匹配的聊天处理器类,如果没有匹配则返回 None
"""
# 1. 首先尝试精确匹配(排除 ALL 类型)
if chat_type != ChatType.ALL and chat_type in self.chatter_classes:
chatter_list = self.chatter_classes[chat_type]
if chatter_list:
logger.debug(f"找到精确匹配的聊天处理器: {chatter_list[0].__name__} for {chat_type.value}")
return chatter_list[0]
# 2. 如果没有精确匹配,回退到 ALL 类型
if ChatType.ALL in self.chatter_classes:
chatter_list = self.chatter_classes[ChatType.ALL]
if chatter_list:
logger.debug(f"使用通用聊天处理器: {chatter_list[0].__name__} for {chat_type.value}")
return chatter_list[0]
return None return None
def get_chatter_class(self, chat_type: ChatType) -> type | None:
"""获取指定聊天类型的聊天处理器类(兼容旧接口)"""
return self.get_chatter_class_for_chat_type(chat_type)
def get_supported_chat_types(self) -> list[ChatType]: def get_supported_chat_types(self) -> list[ChatType]:
"""获取支持的聊天类型列表""" """获取支持的聊天类型列表"""
return list(self.chatter_classes.keys()) return list(self.chatter_classes.keys())
@@ -112,29 +140,29 @@ class ChatterManager:
logger.error("schedule unread cleanup failed", stream_id=stream_id, error=runtime_error) logger.error("schedule unread cleanup failed", stream_id=stream_id, error=runtime_error)
async def process_stream_context(self, stream_id: str, context: "StreamContext") -> dict: async def process_stream_context(self, stream_id: str, context: "StreamContext") -> dict:
"""处理流上下文""" """
处理流上下文
每个聊天流只能有一个活跃的 Chatter 组件。
选择优先级:明确指定聊天类型的 Chatter > ALL 类型的 Chatter
"""
chat_type = context.chat_type chat_type = context.chat_type
chat_type_value = chat_type.value chat_type_value = chat_type.value
logger.debug("处理流上下文", stream_id=stream_id, chat_type=chat_type_value) logger.debug("处理流上下文", stream_id=stream_id, chat_type=chat_type_value)
self._ensure_chatter_registry() self._ensure_chatter_registry()
chatter_class = self.get_chatter_class(chat_type) # 检查是否已有该流的 Chatter 实例
if not chatter_class: stream_instance = self.instances.get(stream_id)
all_chatter_class = self.get_chatter_class(ChatType.ALL)
if all_chatter_class: if stream_instance is None:
chatter_class = all_chatter_class # 使用新的优先级选择逻辑获取最佳 Chatter 类
logger.info( chatter_class = self.get_chatter_class_for_chat_type(chat_type)
"回退到通用聊天处理器",
stream_id=stream_id, if not chatter_class:
requested_type=chat_type_value,
fallback=ChatType.ALL.value,
)
else:
raise ValueError(f"No chatter registered for chat type {chat_type}") raise ValueError(f"No chatter registered for chat type {chat_type}")
stream_instance = self.instances.get(stream_id) # 创建新实例
if stream_instance is None:
stream_instance = chatter_class(stream_id=stream_id, action_manager=self.action_manager) stream_instance = chatter_class(stream_id=stream_id, action_manager=self.action_manager)
self.instances[stream_id] = stream_instance self.instances[stream_id] = stream_instance
logger.info( logger.info(
@@ -143,6 +171,13 @@ class ChatterManager:
chatter_class=chatter_class.__name__, chatter_class=chatter_class.__name__,
chat_type=chat_type_value, chat_type=chat_type_value,
) )
else:
# 已有实例,直接使用(每个流只有一个活跃的 Chatter
logger.debug(
"使用已有聊天处理器实例",
stream_id=stream_id,
chatter_class=stream_instance.__class__.__name__,
)
self.stats["streams_processed"] += 1 self.stats["streams_processed"] += 1
try: try:

View File

@@ -22,10 +22,12 @@ class KFCReplyAction(BaseAction):
- 不调用 LLM直接发送 content 参数中的内容 - 不调用 LLM直接发送 content 参数中的内容
- content 由 Replyer 提前生成 - content 由 Replyer 提前生成
- 仅限 KokoroFlowChatterV2 使用 - 仅限 KokoroFlowChatterV2 使用
注意:使用 kfc_reply 作为动作名称以避免与 AFC 的 reply 动作冲突
""" """
# 动作基本信息 # 动作基本信息
action_name = "reply" action_name = "kfc_reply"
action_description = "发送回复消息。content 参数包含要发送的内容。" action_description = "发送回复消息。content 参数包含要发送的内容。"
# 激活设置 # 激活设置

View File

@@ -160,7 +160,7 @@ class KokoroFlowChatterV2(BaseChatter):
log_prefix="[KFC V2]", log_prefix="[KFC V2]",
) )
exec_results.append(result) exec_results.append(result)
if result.get("success") and action.type in ("reply", "respond"): if result.get("success") and action.type in ("kfc_reply", "respond"):
has_reply = True has_reply = True
# 10. 记录 Bot 规划到 mental_log # 10. 记录 Bot 规划到 mental_log

View File

@@ -50,6 +50,7 @@ class KFCContextBuilder:
sender_name: str, sender_name: str,
target_message: str, target_message: str,
context: Optional["StreamContext"] = None, context: Optional["StreamContext"] = None,
user_id: Optional[str] = None,
) -> dict[str, str]: ) -> dict[str, str]:
""" """
并行构建所有上下文模块 并行构建所有上下文模块
@@ -58,6 +59,7 @@ class KFCContextBuilder:
sender_name: 发送者名称 sender_name: 发送者名称
target_message: 目标消息内容 target_message: 目标消息内容
context: 聊天流上下文(可选) context: 聊天流上下文(可选)
user_id: 用户ID可选用于精确查找关系信息
Returns: Returns:
dict: 包含所有上下文块的字典 dict: 包含所有上下文块的字典
@@ -65,7 +67,7 @@ class KFCContextBuilder:
chat_history = await self._get_chat_history_text(context) chat_history = await self._get_chat_history_text(context)
tasks = { tasks = {
"relation_info": self._build_relation_info(sender_name, target_message), "relation_info": self._build_relation_info(sender_name, target_message, user_id),
"memory_block": self._build_memory_block(chat_history, target_message), "memory_block": self._build_memory_block(chat_history, target_message),
"expression_habits": self._build_expression_habits(chat_history, target_message), "expression_habits": self._build_expression_habits(chat_history, target_message),
"schedule": self._build_schedule_block(), "schedule": self._build_schedule_block(),
@@ -127,7 +129,7 @@ class KFCContextBuilder:
logger.error(f"获取聊天历史失败: {e}") logger.error(f"获取聊天历史失败: {e}")
return "" return ""
async def _build_relation_info(self, sender_name: str, target_message: str) -> str: async def _build_relation_info(self, sender_name: str, target_message: str, user_id: Optional[str] = None) -> str:
"""构建关系信息块""" """构建关系信息块"""
config = _get_config() config = _get_config()
@@ -135,11 +137,20 @@ class KFCContextBuilder:
return "你将要回复的是你自己发送的消息。" return "你将要回复的是你自己发送的消息。"
person_info_manager = get_person_info_manager() person_info_manager = get_person_info_manager()
person_id = await person_info_manager.get_person_id_by_person_name(sender_name)
# 优先使用 user_id + platform 获取 person_id
person_id = None
if user_id and self.platform:
person_id = person_info_manager.get_person_id(self.platform, user_id)
logger.debug(f"通过 platform={self.platform}, user_id={user_id} 获取 person_id={person_id}")
# 如果没有找到,尝试通过 person_name 查找
if not person_id:
person_id = await person_info_manager.get_person_id_by_person_name(sender_name)
if not person_id: if not person_id:
logger.debug(f"未找到用户 {sender_name}ID") logger.debug(f"未找到用户 {sender_name} person_id")
return f"完全不认识{sender_name},这是你们的第一次互动" return f"{sender_name}还没有建立深厚的关系,这是早期的互动阶段"
try: try:
from src.person_info.relationship_fetcher import relationship_fetcher_manager from src.person_info.relationship_fetcher import relationship_fetcher_manager
@@ -324,12 +335,13 @@ async def build_kfc_context(
sender_name: str, sender_name: str,
target_message: str, target_message: str,
context: Optional["StreamContext"] = None, context: Optional["StreamContext"] = None,
user_id: Optional[str] = None,
) -> dict[str, str]: ) -> dict[str, str]:
""" """
便捷函数构建KFC所需的所有上下文 便捷函数构建KFC所需的所有上下文
""" """
builder = KFCContextBuilder(chat_stream) builder = KFCContextBuilder(chat_stream)
return await builder.build_all_context(sender_name, target_message, context) return await builder.build_all_context(sender_name, target_message, context, user_id)
__all__ = [ __all__ = [

View File

@@ -230,7 +230,7 @@ class ActionModel:
def get_description(self) -> str: def get_description(self) -> str:
"""获取动作的文字描述""" """获取动作的文字描述"""
if self.type == "reply": if self.type == "kfc_reply":
content = self.params.get("content", "") content = self.params.get("content", "")
return f'发送消息:"{content[:50]}{"..." if len(content) > 50 else ""}"' return f'发送消息:"{content[:50]}{"..." if len(content) > 50 else ""}"'
elif self.type == "poke_user": elif self.type == "poke_user":
@@ -305,12 +305,12 @@ class LLMResponse:
def has_reply(self) -> bool: def has_reply(self) -> bool:
"""是否包含回复动作""" """是否包含回复动作"""
return any(a.type in ("reply", "respond") for a in self.actions) return any(a.type in ("kfc_reply", "respond") for a in self.actions)
def get_reply_content(self) -> str: def get_reply_content(self) -> str:
"""获取回复内容""" """获取回复内容"""
for action in self.actions: for action in self.actions:
if action.type in ("reply", "respond"): if action.type in ("kfc_reply", "respond"):
return action.params.get("content", "") return action.params.get("content", "")
return "" return ""

View File

@@ -163,6 +163,16 @@ class ProactiveThinker:
if not session.waiting_config.is_active(): if not session.waiting_config.is_active():
return return
# 防止与 Chatter 并发处理:如果 Session 刚刚被更新5秒内跳过
# 这样可以避免 Chatter 正在处理时ProactiveThinker 也开始处理
time_since_last_activity = time.time() - session.last_activity_at
if time_since_last_activity < 5:
logger.debug(
f"[ProactiveThinker] Session {session.user_id} 刚有活动 "
f"({time_since_last_activity:.1f}s ago),跳过处理"
)
return
# 检查是否超时 # 检查是否超时
if session.waiting_config.is_timeout(): if session.waiting_config.is_timeout():
await self._handle_timeout(session) await self._handle_timeout(session)
@@ -250,6 +260,19 @@ class ProactiveThinker:
"""处理等待超时""" """处理等待超时"""
self._stats["timeout_decisions"] += 1 self._stats["timeout_decisions"] += 1
# 再次检查 Session 状态,防止在等待过程中被 Chatter 处理
if session.status != SessionStatus.WAITING:
logger.debug(f"[ProactiveThinker] Session {session.user_id} 已不在等待状态,跳过超时处理")
return
# 再次检查最近活动时间
time_since_last_activity = time.time() - session.last_activity_at
if time_since_last_activity < 5:
logger.debug(
f"[ProactiveThinker] Session {session.user_id} 刚有活动,跳过超时处理"
)
return
logger.info(f"[ProactiveThinker] 等待超时: user={session.user_id}") logger.info(f"[ProactiveThinker] 等待超时: user={session.user_id}")
try: try:
@@ -391,6 +414,14 @@ class ProactiveThinker:
"""处理主动思考""" """处理主动思考"""
self._stats["proactive_triggered"] += 1 self._stats["proactive_triggered"] += 1
# 再次检查最近活动时间,防止与 Chatter 并发
time_since_last_activity = time.time() - session.last_activity_at
if time_since_last_activity < 5:
logger.debug(
f"[ProactiveThinker] Session {session.user_id} 刚有活动,跳过主动思考"
)
return
logger.info(f"[ProactiveThinker] 主动思考触发: user={session.user_id}, reason={trigger_reason}") logger.info(f"[ProactiveThinker] 主动思考触发: user={session.user_id}, reason={trigger_reason}")
try: try:

View File

@@ -63,11 +63,14 @@ class PromptBuilder:
""" """
extra_context = extra_context or {} extra_context = extra_context or {}
# 获取 user_id从 session 中)
user_id = session.user_id if session else None
# 1. 构建人设块 # 1. 构建人设块
persona_block = self._build_persona_block() persona_block = self._build_persona_block()
# 2. 构建关系块 # 2. 构建关系块
relation_block = await self._build_relation_block(user_name, chat_stream) relation_block = await self._build_relation_block(user_name, chat_stream, user_id)
# 3. 构建活动流 # 3. 构建活动流
activity_stream = await self._build_activity_stream(session, user_name) activity_stream = await self._build_activity_stream(session, user_name)
@@ -123,6 +126,7 @@ class PromptBuilder:
self, self,
user_name: str, user_name: str,
chat_stream: Optional["ChatStream"], chat_stream: Optional["ChatStream"],
user_id: Optional[str] = None,
) -> str: ) -> str:
"""构建关系块""" """构建关系块"""
if not chat_stream: if not chat_stream:
@@ -139,6 +143,7 @@ class PromptBuilder:
sender_name=user_name, sender_name=user_name,
target_message="", target_message="",
context=None, context=None,
user_id=user_id,
) )
relation_info = context_data.get("relation_info", "") relation_info = context_data.get("relation_info", "")
@@ -253,7 +258,7 @@ class PromptBuilder:
for action in actions: for action in actions:
action_type = action.get("type", "unknown") action_type = action.get("type", "unknown")
if action_type == "reply": if action_type == "kfc_reply":
content = action.get("content", "") content = action.get("content", "")
if len(content) > 50: if len(content) > 50:
content = content[:50] + "..." content = content[:50] + "..."
@@ -341,22 +346,111 @@ class PromptBuilder:
) )
def _build_actions_block(self, available_actions: Optional[dict]) -> str: def _build_actions_block(self, available_actions: Optional[dict]) -> str:
"""构建可用动作块""" """
构建可用动作块
参考 AFC planner 的格式,为每个动作展示:
- 动作名和描述
- 使用场景
- JSON 示例(含参数)
"""
if not available_actions: if not available_actions:
return self._get_default_actions_block() return self._get_default_actions_block()
lines = [] action_blocks = []
for name, info in available_actions.items(): for action_name, action_info in available_actions.items():
desc = getattr(info, "description", "") or f"执行 {name}" block = self._format_single_action(action_name, action_info)
lines.append(f"- `{name}`: {desc}") if block:
action_blocks.append(block)
return "\n".join(lines) if lines else self._get_default_actions_block() return "\n".join(action_blocks) if action_blocks else self._get_default_actions_block()
def _format_single_action(self, action_name: str, action_info) -> str:
"""
格式化单个动作为详细说明块
Args:
action_name: 动作名称
action_info: ActionInfo 对象
Returns:
格式化后的动作说明
"""
# 获取动作描述
description = getattr(action_info, "description", "") or f"执行 {action_name}"
# 获取使用场景
action_require = getattr(action_info, "action_require", []) or []
require_text = "\n".join(f" - {req}" for req in action_require) if action_require else " - 根据情况使用"
# 获取参数定义
action_parameters = getattr(action_info, "action_parameters", {}) or {}
# 构建 action_data JSON 示例
if action_parameters:
param_lines = []
for param_name, param_desc in action_parameters.items():
param_lines.append(f' "{param_name}": "<{param_desc}>"')
action_data_json = "{\n" + ",\n".join(param_lines) + "\n }"
else:
action_data_json = "{}"
# 构建完整的动作块
return f"""### {action_name}
**描述**: {description}
**使用场景**:
{require_text}
**示例**:
```json
{{
"type": "{action_name}",
{f'"content": "<你要说的内容>"' if action_name == "kfc_reply" else self._build_params_example(action_parameters)}
}}
```
"""
def _build_params_example(self, action_parameters: dict) -> str:
"""构建参数示例字符串"""
if not action_parameters:
return '"_comment": "此动作无需额外参数"'
parts = []
for param_name, param_desc in action_parameters.items():
parts.append(f'"{param_name}": "<{param_desc}>"')
return ",\n ".join(parts)
def _get_default_actions_block(self) -> str: def _get_default_actions_block(self) -> str:
"""获取默认的动作列表""" """获取默认的动作列表"""
return """- `reply`: 发送文字消息参数content return """### kfc_reply
- `poke_user`: 戳一戳对方 **描述**: 发送回复消息
- `do_nothing`: 什么都不做"""
**使用场景**:
- 需要回复对方消息时使用
**示例**:
```json
{
"type": "kfc_reply",
"content": "你要说的话"
}
```
### do_nothing
**描述**: 什么都不做
**使用场景**:
- 当前不需要回应时使用
**示例**:
```json
{
"type": "do_nothing"
}
```"""
async def _get_output_format(self) -> str: async def _get_output_format(self) -> str:
"""获取输出格式模板""" """获取输出格式模板"""
@@ -370,7 +464,7 @@ class PromptBuilder:
return """请用 JSON 格式回复: return """请用 JSON 格式回复:
{ {
"thought": "你的想法", "thought": "你的想法",
"actions": [{"type": "reply", "content": "你的回复"}], "actions": [{"type": "kfc_reply", "content": "你的回复"}],
"expected_reaction": "期待的反应", "expected_reaction": "期待的反应",
"max_wait_seconds": 300 "max_wait_seconds": 300
}""" }"""

View File

@@ -47,20 +47,23 @@ KFC_V2_OUTPUT_FORMAT = Prompt(
{{ {{
"thought": "你脑子里在想什么,越自然越好", "thought": "你脑子里在想什么,越自然越好",
"actions": [ "actions": [
{{"type": "reply", "content": "你要说的话"}}, {{"type": "动作名称", ...动作参数}}
{{"type": "其他动作", "参数": ""}}
], ],
"expected_reaction": "你期待对方的反应是什么", "expected_reaction": "你期待对方的反应是什么",
"max_wait_seconds": 300 "max_wait_seconds": 300
}} }}
``` ```
说明 ### 字段说明
- `thought`:你的内心独白,记录你此刻的想法和感受 - `thought`:你的内心独白,记录你此刻的想法和感受。要自然,不要技术性语言。
- `actions`:你要执行的动作列表,可以组合多个 - `actions`:你要执行的动作列表。每个动作是一个对象,必须包含 `type` 字段指定动作类型,其他字段根据动作类型不同而不同(参考上面每个动作的示例)。
- `expected_reaction`:你期待对方如何回应(用于判断是否需要等待) - `expected_reaction`:你期待对方如何回应(用于判断是否需要等待)
- `max_wait_seconds`设定等待时间0 表示不等待,超时后你会考虑是否要主动说点什么 - `max_wait_seconds`设定等待时间0 表示不等待,超时后你会考虑是否要主动说点什么
- 即使什么都不想做,也放一个 `{{"type": "do_nothing"}}`""",
### 注意事项
- 动作参数直接写在动作对象里,不需要 `action_data` 包装
- 即使什么都不想做,也放一个 `{{"type": "do_nothing"}}`
- 可以组合多个动作,比如先发消息再发表情""",
) )
# ================================================================================================= # =================================================================================================

View File

@@ -54,7 +54,9 @@ async def generate_response(
extra_context=extra_context, extra_context=extra_context,
) )
logger.debug(f"[KFC Replyer] 构建的提示词:\n{prompt}") from src.config.config import global_config
if global_config and global_config.debug.show_prompt:
logger.info(f"[KFC Replyer] 生成的提示词:\n{prompt}")
# 2. 获取模型配置并调用 LLM # 2. 获取模型配置并调用 LLM
models = llm_api.get_available_models() models = llm_api.get_available_models()

View File

@@ -197,7 +197,7 @@ class KokoroSession:
for entry in reversed(self.mental_log): for entry in reversed(self.mental_log):
if entry.event_type == EventType.BOT_PLANNING: if entry.event_type == EventType.BOT_PLANNING:
for action in entry.actions: for action in entry.actions:
if action.get("type") in ("reply", "respond"): if action.get("type") in ("kfc_reply", "respond"):
return action.get("content", "") return action.get("content", "")
return None return None