feat: 添加KFC V2专属动作模块及相关功能,优化回复机制

This commit is contained in:
Windpicker-owo
2025-11-30 15:04:00 +08:00
parent c68bf4ad4f
commit 1750004ffa
13 changed files with 266 additions and 512 deletions

View File

@@ -1,13 +1,10 @@
import asyncio
import time
import traceback
from typing import Any, TYPE_CHECKING
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.logger import get_logger
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import database_api
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.component_types import ActionInfo, ComponentType
from src.plugin_system.core.component_registry import component_registry
@@ -20,23 +17,19 @@ logger = get_logger("action_manager")
class ChatterActionManager:
"""
动作管理器,用于管理各种类型的动作
现在统一使用新插件系统,简化了原有的新旧兼容逻辑。
动作管理器,用于管理和执行动作
职责:
- 加载和管理可用动作集
- 创建动作实例
- 执行动作(所有动作逻辑在 Action.execute() 中实现)
"""
def __init__(self):
"""初始化动作管理器"""
# 当前正在使用的动作集合,在规划开始时加载
self._using_actions: dict[str, ActionInfo] = {}
self.chat_id: str | None = None
self.log_prefix: str = "ChatterActionManager"
# 批量存储支持
self._batch_storage_enabled = False
self._pending_actions = []
self._current_chat_id = None
async def load_actions(self, stream_id: str | None):
"""根据 stream_id 加载当前可用的动作"""
@@ -44,8 +37,6 @@ class ChatterActionManager:
self._using_actions = component_registry.get_default_actions(stream_id)
logger.debug(f"已为 stream '{stream_id}' 加载 {len(self._using_actions)} 个可用动作: {list(self._using_actions.keys())}")
# === 执行Action方法 ===
@staticmethod
def create_action(
action_name: str,
@@ -70,12 +61,13 @@ class ChatterActionManager:
chat_stream: 聊天流
log_prefix: 日志前缀
shutting_down: 是否正在关闭
action_message: 目标消息
Returns:
Optional[BaseAction]: 创建的动作处理器实例如果动作名称未注册则返回None
BaseAction | None: 创建的动作处理器实例
"""
try:
# 获取组件类 - 明确指定查询Action类型
# 获取组件类
component_class: type[BaseAction] = component_registry.get_component_class(
action_name, ComponentType.ACTION
) # type: ignore
@@ -110,8 +102,6 @@ class ChatterActionManager:
except Exception as e:
logger.error(f"创建Action实例失败 {action_name}: {e}")
import traceback
logger.error(traceback.format_exc())
return None
@@ -119,17 +109,8 @@ class ChatterActionManager:
"""获取当前正在使用的动作集合"""
return self._using_actions.copy()
# === Modify相关方法 ===
def remove_action_from_using(self, action_name: str) -> bool:
"""
从当前使用的动作集中移除指定动作
Args:
action_name: 动作名称
Returns:
bool: 移除是否成功
"""
"""从当前使用的动作集中移除指定动作"""
if action_name not in self._using_actions:
logger.warning(f"移除失败: 动作 {action_name} 不在当前使用的动作集中")
return False
@@ -141,7 +122,6 @@ class ChatterActionManager:
async def restore_actions(self) -> None:
"""恢复到当前 stream_id 的默认动作集"""
actions_to_restore = list(self._using_actions.keys())
# 使用 self.chat_id 来恢复当前上下文的动作
await self.load_actions(self.chat_id)
logger.debug(f"恢复动作集: 从 {actions_to_restore} 恢复到 stream '{self.chat_id}' 的默认动作集 {list(self._using_actions.keys())}")
@@ -157,13 +137,13 @@ class ChatterActionManager:
clear_unread_messages: bool = True,
) -> Any:
"""
执行单个动作的通用函数
执行单个动作
所有动作(包括 reply/respond都通过 BaseAction.execute() 执行
所有动作逻辑都在 BaseAction.execute() 中实现
Args:
action_name: 动作名称
chat_id: 聊天id
chat_id: 聊天ID
target_message: 目标消息
reasoning: 执行理由
action_data: 动作数据
@@ -172,16 +152,16 @@ class ChatterActionManager:
clear_unread_messages: 是否清除未读消息
Returns:
执行结果
执行结果字典
"""
chat_stream = None
try:
# 通过chat_id获取chat_stream
# 获取 chat_stream
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(chat_id)
if not chat_stream:
logger.error(f"{log_prefix} 无法找到chat_id对应的chat_stream: {chat_id}")
logger.error(f"{log_prefix} 无法找到 chat_stream: {chat_id}")
return {
"action_type": action_name,
"success": False,
@@ -189,66 +169,75 @@ class ChatterActionManager:
"error": "chat_stream not found",
}
# 设置正在回复的状态
# 设置正在处理的状态
chat_stream.context.is_replying = True
# no_action 特殊处理
if action_name == "no_action":
return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""}
return {"action_type": "no_action", "success": True, "reply_text": ""}
# 统一通过 _handle_action 执行所有动作
success, reply_text, command = await self._handle_action(
chat_stream,
action_name,
reasoning,
action_data or {},
{}, # cycle_timers
thinking_id,
target_message,
# 创建并执行动作
action_handler = self.create_action(
action_name=action_name,
action_data=action_data or {},
reasoning=reasoning,
cycle_timers={},
thinking_id=thinking_id or "",
chat_stream=chat_stream,
log_prefix=log_prefix or self.log_prefix,
action_message=target_message,
)
# 记录执行的动作到目标消息
if not action_handler:
logger.error(f"{log_prefix} 创建动作处理器失败: {action_name}")
return {
"action_type": action_name,
"success": False,
"reply_text": "",
"error": f"Failed to create action handler: {action_name}",
}
# 执行动作
success, reply_text = await action_handler.handle_action()
# 记录动作到消息并存储动作信息
if success:
asyncio.create_task(self._record_action_to_message(chat_stream, action_name, target_message, action_data))
# 重置打断计数
await self._reset_interruption_count_after_action(chat_stream.stream_id)
asyncio.create_task(self._reset_interruption_count(chat_stream.stream_id))
# 统一存储动作信息
asyncio.create_task(
self._store_action_info(
action_handler=action_handler,
action_name=action_name,
reply_text=reply_text,
target_message=target_message,
)
)
return {
"action_type": action_name,
"success": success,
"reply_text": reply_text,
"command": command,
}
except Exception as e:
logger.error(f"{log_prefix} 执行动作时出错: {e}")
logger.error(f"{log_prefix} 错误信息: {traceback.format_exc()}")
logger.error(traceback.format_exc())
return {
"action_type": action_name,
"success": False,
"reply_text": "",
"loop_info": None,
"error": str(e),
}
finally:
# 确保重置正在回复的状态
if chat_stream:
chat_stream.context.is_replying = False
async def _record_action_to_message(self, chat_stream, action_name, target_message, action_data):
"""
记录执行的动作到目标消息中
Args:
chat_stream: ChatStream实例
action_name: 动作名称
target_message: 目标消息
action_data: 动作数据
"""
async def _record_action_to_message(self, chat_stream, action_name: str, target_message, action_data: dict | None):
"""记录执行的动作到目标消息"""
try:
from src.chat.message_manager.message_manager import message_manager
# 获取目标消息ID
target_message_id = None
if target_message:
target_message_id = target_message.message_id
@@ -256,362 +245,66 @@ class ChatterActionManager:
target_message_id = action_data.get("target_message_id")
if not target_message_id:
logger.debug(f"无法获取目标消息ID动作: {action_name}")
return
# 通过message_manager更新消息的动作记录并刷新focus_energy
await message_manager.add_action(
stream_id=chat_stream.stream_id, message_id=target_message_id, action=action_name
stream_id=chat_stream.stream_id,
message_id=target_message_id,
action=action_name,
)
logger.debug(f"已记录动作 {action_name} 到消息 {target_message_id} 并更新focus_energy")
logger.debug(f"已记录动作 {action_name} 到消息 {target_message_id}")
except Exception as e:
logger.error(f"记录动作到消息失败: {e}")
# 不抛出异常,避免影响主要功能
async def _reset_interruption_count_after_action(self, stream_id: str):
"""在动作执行成功后重置打断计数"""
async def _reset_interruption_count(self, stream_id: str):
"""重置打断计数"""
try:
from src.plugin_system.apis.chat_api import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
context = chat_stream.context
if context.interruption_count > 0:
old_count = context.interruption_count
# old_afc_adjustment = context.context.get_afc_threshold_adjustment()
await context.reset_interruption_count()
logger.debug(
f"动作执行成功,重置聊天流 {stream_id} 的打断计数: {old_count} -> 0"
)
if chat_stream and chat_stream.context.interruption_count > 0:
old_count = chat_stream.context.interruption_count
await chat_stream.context.reset_interruption_count()
logger.debug(f"重置打断计数: {old_count} -> 0")
except Exception as e:
logger.warning(f"重置打断计数时出错: {e}")
async def _handle_action(
self, chat_stream, action, reasoning, action_data, cycle_timers, thinking_id, action_message
) -> tuple[bool, str, str]:
"""
处理具体的动作执行
Args:
chat_stream: ChatStream实例
action: 动作名称
reasoning: 执行理由
action_data: 动作数据
cycle_timers: 循环计时器
thinking_id: 思考ID
action_message: 动作消息
Returns:
tuple: (执行是否成功, 回复文本, 命令文本)
功能说明:
- 创建对应的动作处理器
- 执行动作并捕获异常
- 返回执行结果供上级方法整合
"""
if not chat_stream:
return False, "", ""
try:
# 创建动作处理器
action_handler = self.create_action(
action_name=action,
action_data=action_data,
reasoning=reasoning,
cycle_timers=cycle_timers,
thinking_id=thinking_id,
chat_stream=chat_stream,
log_prefix=self.log_prefix,
action_message=action_message,
)
if not action_handler:
# 动作处理器创建失败,尝试回退机制
logger.warning(f"{self.log_prefix} 创建动作处理器失败: {action},尝试回退方案")
# 获取当前可用的动作
available_actions = self.get_using_actions()
fallback_action = None
# 回退优先级reply > 第一个可用动作
if "reply" in available_actions:
fallback_action = "reply"
elif available_actions:
fallback_action = next(iter(available_actions.keys()))
if fallback_action and fallback_action != action:
logger.info(f"{self.log_prefix} 使用回退动作: {fallback_action}")
action_handler = self.create_action(
action_name=fallback_action,
action_data=action_data,
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
cycle_timers=cycle_timers,
thinking_id=thinking_id,
chat_stream=chat_stream,
log_prefix=self.log_prefix,
action_message=action_message,
)
if not action_handler:
logger.error(f"{self.log_prefix} 回退方案也失败,无法创建任何动作处理器")
return False, "", ""
# 执行动作
success, reply_text = await action_handler.handle_action()
return success, reply_text, ""
except Exception as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc()
return False, "", ""
async def _send_and_store_reply(
self,
chat_stream: "ChatStream",
response_set,
loop_start_time,
action_message,
cycle_timers: dict[str, float],
thinking_id,
actions,
should_quote_reply: bool | None = None,
) -> tuple[str, dict[str, float]]:
"""
发送并存储回复信息
Args:
chat_stream: ChatStream实例
response_set: 回复内容集合
loop_start_time: 循环开始时间
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
actions: 动作列表
should_quote_reply: 是否应该引用回复原消息None表示自动决定
Returns:
Tuple[Dict[str, Any], str, Dict[str, float]]: 循环信息, 回复文本, 循环计时器
"""
# 发送回复
with Timer("回复发送", cycle_timers):
reply_text = await self.send_response(
chat_stream, response_set, loop_start_time, action_message, should_quote_reply
)
# 存储reply action信息
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
if action_message:
platform = action_message.chat_info.platform
user_id = action_message.user_info.user_id
else:
platform = getattr(chat_stream, "platform", "unknown")
user_id = ""
# 获取用户信息并生成回复提示
person_id = person_info_manager.get_person_id(
platform,
user_id,
)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
# 存储动作信息到数据库(支持批量存储)
if self._batch_storage_enabled:
self.add_action_to_batch(
action_name="reply",
action_data={"reply_text": reply_text},
thinking_id=thinking_id or "",
action_done=True,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
)
else:
await database_api.store_action_info(
chat_stream=chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=thinking_id,
action_data={"reply_text": reply_text},
action_name="reply",
)
return reply_text, cycle_timers
async def send_response(
self, chat_stream, reply_set, thinking_start_time, message_data, should_quote_reply: bool | None = None
) -> str:
"""
发送回复内容的具体实现
Args:
chat_stream: ChatStream实例
reply_set: 回复内容集合,包含多个回复段
thinking_start_time: 思考开始时间
message_data: 消息数据
should_quote_reply: 是否应该引用回复原消息None表示自动决定
Returns:
str: 完整的回复文本
功能说明:
- 检查是否有新消息需要回复
- 处理主动思考的"沉默"决定
- 根据消息数量决定是否添加回复引用
- 逐段发送回复内容,支持打字效果
- 正确处理元组格式的回复段
"""
current_time = time.time()
# 计算新消息数量
await message_api.count_new_messages(
chat_id=chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time
)
# 根据新消息数量决定是否需要引用回复
reply_text = ""
# 检查是否为主动思考消息
if message_data:
is_proactive_thinking = getattr(message_data, "message_type", None) == "proactive_thinking"
else:
is_proactive_thinking = True
logger.debug(f"[send_response] message_data: {message_data}")
first_replied = False
for reply_seg in reply_set:
# 调试日志验证reply_seg的格式
logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}")
# 修正:正确处理元组格式 (格式为: (type, content))
if isinstance(reply_seg, tuple) and len(reply_seg) >= 2:
_, data = reply_seg
else:
# 向下兼容:如果已经是字符串,则直接使用
data = str(reply_seg)
if isinstance(data, list):
data = "".join(map(str, data))
reply_text += data
# 如果是主动思考且内容为"沉默",则不发送
if is_proactive_thinking and data.strip() == "沉默":
logger.info(f"{self.log_prefix} 主动思考决定保持沉默,不发送消息")
continue
# 发送第一段回复
if not first_replied:
# 决定是否引用回复
is_private_chat = not bool(chat_stream.group_info)
# 如果明确指定了should_quote_reply则使用指定值
if should_quote_reply is not None:
set_reply_flag = should_quote_reply and bool(message_data)
logger.debug(
f"📤 [ActionManager] 使用planner指定的引用设置: should_quote_reply={should_quote_reply}"
)
else:
# 否则使用默认逻辑:默认不引用,让对话更流畅自然
set_reply_flag = False
logger.debug(
f"📤 [ActionManager] 使用默认引用逻辑: 默认不引用(is_private={is_private_chat})"
)
logger.debug(
f"📤 [ActionManager] 准备发送第一段回复。message_data: {message_data}, set_reply: {set_reply_flag}"
)
await send_api.text_to_stream(
text=data,
stream_id=chat_stream.stream_id,
reply_to_message=message_data,
set_reply=set_reply_flag,
typing=False,
)
first_replied = True
else:
# 发送后续回复
await send_api.text_to_stream(
text=data,
stream_id=chat_stream.stream_id,
reply_to_message=None,
set_reply=False,
typing=True,
)
return reply_text
def enable_batch_storage(self, chat_id: str):
"""启用批量存储模式"""
self._batch_storage_enabled = True
self._current_chat_id = chat_id
self._pending_actions.clear()
logger.debug(f"已启用批量存储模式chat_id: {chat_id}")
def disable_batch_storage(self):
"""禁用批量存储模式"""
self._batch_storage_enabled = False
self._current_chat_id = None
self._pending_actions = [] # 清空队列
logger.debug("已禁用批量存储模式")
def add_action_to_batch(
async def _store_action_info(
self,
action_handler: BaseAction,
action_name: str,
action_data: dict,
thinking_id: str = "",
action_done: bool = True,
action_build_into_prompt: bool = False,
action_prompt_display: str = "",
reply_text: str,
target_message: DatabaseMessages | None,
):
"""添加动作到批量存储列表"""
if not self._batch_storage_enabled:
return False
action_record = {
"action_name": action_name,
"action_data": action_data,
"thinking_id": thinking_id,
"action_done": action_done,
"action_build_into_prompt": action_build_into_prompt,
"action_prompt_display": action_prompt_display,
"timestamp": time.time(),
}
self._pending_actions.append(action_record)
logger.debug(f"已添加动作到批量存储列表: {action_name} (当前待处理: {len(self._pending_actions)} 个)")
return True
async def flush_batch_storage(self, chat_stream):
"""批量存储所有待处理的动作记录"""
if not self._pending_actions:
logger.debug("没有待处理的动作需要批量存储")
return
"""统一存储动作信息到数据库"""
try:
logger.info(f"开始批量存储 {len(self._pending_actions)} 个动作记录")
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import database_api
# 批量存储所有动作
stored_count = 0
for action_data in self._pending_actions:
try:
result = await database_api.store_action_info(
chat_stream=chat_stream,
action_name=action_data.get("action_name", ""),
action_data=action_data.get("action_data", {}),
action_done=action_data.get("action_done", True),
action_build_into_prompt=action_data.get("action_build_into_prompt", False),
action_prompt_display=action_data.get("action_prompt_display", ""),
thinking_id=action_data.get("thinking_id", ""),
)
if result:
stored_count += 1
except Exception as e:
logger.error(f"存储单个动作记录失败: {e}")
# 构建 action_prompt_display
action_prompt_display = ""
if reply_text:
person_info_manager = get_person_info_manager()
if target_message:
platform = target_message.chat_info.platform
user_id = target_message.user_info.user_id
person_id = person_info_manager.get_person_id(platform, user_id)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
else:
action_prompt_display = f"统一回应:{reply_text}"
logger.info(f"批量存储完成: 成功存储 {stored_count}/{len(self._pending_actions)} 个动作记录")
# 清空待处理列表
self._pending_actions.clear()
# 存储动作信息
await database_api.store_action_info(
chat_stream=action_handler.chat_stream,
action_build_into_prompt=False,
action_prompt_display=action_prompt_display,
action_done=True,
thinking_id=action_handler.thinking_id,
action_data={"reply_text": reply_text} if reply_text else action_handler.action_data,
action_name=action_name,
)
logger.debug(f"已存储动作信息: {action_name}")
except Exception as e:
logger.error(f"批量存储动作记录时发生错误: {e}")
logger.error(f"存储动作信息失败: {e}")