refac:tool去处理器化

This commit is contained in:
SengokuCola
2025-07-01 17:47:56 +08:00
parent 6b6f99659d
commit 3544daeadb
12 changed files with 522 additions and 702 deletions

View File

@@ -25,7 +25,6 @@ class CycleDetail:
self.loop_processor_info: Dict[str, Any] = {} # 前处理器信息
self.loop_plan_info: Dict[str, Any] = {}
self.loop_action_info: Dict[str, Any] = {}
self.loop_post_processor_info: Dict[str, Any] = {} # 后处理器信息
def to_dict(self) -> Dict[str, Any]:
"""将循环信息转换为字典格式"""
@@ -80,7 +79,6 @@ class CycleDetail:
"loop_processor_info": convert_to_serializable(self.loop_processor_info),
"loop_plan_info": convert_to_serializable(self.loop_plan_info),
"loop_action_info": convert_to_serializable(self.loop_action_info),
"loop_post_processor_info": convert_to_serializable(self.loop_post_processor_info),
}
def complete_cycle(self):
@@ -135,4 +133,3 @@ class CycleDetail:
self.loop_processor_info = loop_info["loop_processor_info"]
self.loop_plan_info = loop_info["loop_plan_info"]
self.loop_action_info = loop_info["loop_action_info"]
self.loop_post_processor_info = loop_info["loop_post_processor_info"]

View File

@@ -19,7 +19,7 @@ from src.chat.heart_flow.observation.working_observation import WorkingMemoryObs
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.heart_flow.observation.structure_observation import StructureObservation
from src.chat.heart_flow.observation.actions_observation import ActionObservation
from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor
from src.chat.focus_chat.memory_activator import MemoryActivator
from src.chat.focus_chat.info_processors.base_processor import BaseProcessor
from src.chat.focus_chat.planners.planner_factory import PlannerFactory
@@ -34,8 +34,7 @@ from src.person_info.relationship_builder_manager import relationship_builder_ma
install(extra_lines=3)
# 超时常量配置
ACTION_MODIFICATION_TIMEOUT = 15.0 # 动作修改任务超时时限(秒)
# 注释:原来的动作修改超时常量已移除,因为改为顺序执行
# 定义观察器映射:键是观察器名称,值是 (观察器类, 初始化参数)
OBSERVATION_CLASSES = {
@@ -51,11 +50,6 @@ PROCESSOR_CLASSES = {
"WorkingMemoryProcessor": (WorkingMemoryProcessor, "working_memory_processor"),
}
# 定义后期处理器映射:在规划后、动作执行前运行的处理器
POST_PLANNING_PROCESSOR_CLASSES = {
"ToolProcessor": (ToolProcessor, "tool_use_processor"),
}
logger = get_logger("hfc") # Logger Name Changed
@@ -128,23 +122,11 @@ class HeartFChatting:
if not config_key or getattr(config_processor_settings, config_key, True):
self.enabled_processor_names.append(proc_name)
# 初始化后期处理器(规划后执行的处理器)
self.enabled_post_planning_processor_names = []
for proc_name, (_proc_class, config_key) in POST_PLANNING_PROCESSOR_CLASSES.items():
# 对于关系相关处理器,需要同时检查关系配置项
if not config_key or getattr(config_processor_settings, config_key, True):
self.enabled_post_planning_processor_names.append(proc_name)
# logger.info(f"{self.log_prefix} 将启用的处理器: {self.enabled_processor_names}")
# logger.info(f"{self.log_prefix} 将启用的后期处理器: {self.enabled_post_planning_processor_names}")
self.processors: List[BaseProcessor] = []
self._register_default_processors()
# 初始化后期处理器
self.post_planning_processors: List[BaseProcessor] = []
self._register_post_planning_processors()
self.action_manager = ActionManager()
self.action_planner = PlannerFactory.create_planner(
log_prefix=self.log_prefix, action_manager=self.action_manager
@@ -186,7 +168,7 @@ class HeartFChatting:
# 检查是否需要跳过WorkingMemoryObservation
if name == "WorkingMemoryObservation":
# 如果工作记忆处理器被禁用则跳过WorkingMemoryObservation
if not global_config.focus_chat_processor.working_memory_processor:
if not global_config.focus_chat.working_memory_processor:
logger.debug(f"{self.log_prefix} 工作记忆处理器已禁用,跳过注册观察器 {name}")
continue
@@ -211,16 +193,13 @@ class HeartFChatting:
processor_info = PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key)
if processor_info:
processor_actual_class = processor_info[0] # 获取实际的类定义
# 根据处理器类名判断是否需要 subheartflow_id
if name in [
"WorkingMemoryProcessor",
]:
self.processors.append(processor_actual_class(subheartflow_id=self.stream_id))
elif name == "ChattingInfoProcessor":
# 根据处理器类名判断构造参数
if name == "ChattingInfoProcessor":
self.processors.append(processor_actual_class())
elif name == "WorkingMemoryProcessor":
self.processors.append(processor_actual_class(subheartflow_id=self.stream_id))
else:
# 对于PROCESSOR_CLASSES中定义但此处未明确处理构造的处理器
# (例如, 新增了一个处理器到PROCESSOR_CLASSES, 它不需要id, 也不叫ChattingInfoProcessor)
try:
self.processors.append(processor_actual_class()) # 尝试无参构造
logger.debug(f"{self.log_prefix} 注册处理器 {name} (尝试无参构造).")
@@ -239,46 +218,7 @@ class HeartFChatting:
else:
logger.warning(f"{self.log_prefix} 没有注册任何处理器。这可能是由于配置错误或所有处理器都被禁用了。")
def _register_post_planning_processors(self):
"""根据 self.enabled_post_planning_processor_names 注册后期处理器"""
self.post_planning_processors = [] # 清空已有的
for name in self.enabled_post_planning_processor_names: # 'name' is "PersonImpressionpProcessor", etc.
processor_info = POST_PLANNING_PROCESSOR_CLASSES.get(name) # processor_info is (ProcessorClass, config_key)
if processor_info:
processor_actual_class = processor_info[0] # 获取实际的类定义
# 根据处理器类名判断是否需要 subheartflow_id
if name in [
"ToolProcessor",
"RelationshipBuildProcessor",
"RealTimeInfoProcessor",
"ExpressionSelectorProcessor",
]:
self.post_planning_processors.append(processor_actual_class(subheartflow_id=self.stream_id))
else:
# 对于POST_PLANNING_PROCESSOR_CLASSES中定义但此处未明确处理构造的处理器
# (例如, 新增了一个处理器到POST_PLANNING_PROCESSOR_CLASSES, 它不需要id, 也不叫PersonImpressionpProcessor)
try:
self.post_planning_processors.append(processor_actual_class()) # 尝试无参构造
logger.debug(f"{self.log_prefix} 注册后期处理器 {name} (尝试无参构造).")
except TypeError:
logger.error(
f"{self.log_prefix} 后期处理器 {name} 构造失败。它可能需要参数(如 subheartflow_id但未在注册逻辑中明确处理。"
)
else:
# 这理论上不应该发生,因为 enabled_post_planning_processor_names 是从 POST_PLANNING_PROCESSOR_CLASSES 的键生成的
logger.warning(
f"{self.log_prefix} 在 POST_PLANNING_PROCESSOR_CLASSES 中未找到名为 '{name}' 的处理器定义,将跳过注册。"
)
if self.post_planning_processors:
logger.info(
f"{self.log_prefix} 已注册后期处理器: {[p.__class__.__name__ for p in self.post_planning_processors]}"
)
else:
logger.warning(
f"{self.log_prefix} 没有注册任何后期处理器。这可能是由于配置错误或所有后期处理器都被禁用了。"
)
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@@ -460,19 +400,7 @@ class HeartFChatting:
("\n前处理器耗时: " + "; ".join(processor_time_strings)) if processor_time_strings else ""
)
# 新增:输出每个后处理器的耗时
post_processor_time_costs = self._current_cycle_detail.loop_post_processor_info.get(
"post_processor_time_costs", {}
)
post_processor_time_strings = []
for pname, ptime in post_processor_time_costs.items():
formatted_ptime = f"{ptime * 1000:.2f}毫秒" if ptime < 1 else f"{ptime:.2f}"
post_processor_time_strings.append(f"{pname}: {formatted_ptime}")
post_processor_time_log = (
("\n后处理器耗时: " + "; ".join(post_processor_time_strings))
if post_processor_time_strings
else ""
)
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
@@ -480,7 +408,6 @@ class HeartFChatting:
f"动作: {self._current_cycle_detail.loop_plan_info.get('action_result', {}).get('action_type', '未知动作')}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
+ processor_time_log
+ post_processor_time_log
)
# 记录性能数据
@@ -491,8 +418,7 @@ class HeartFChatting:
"action_type": action_result.get("action_type", "unknown"),
"total_time": self._current_cycle_detail.end_time - self._current_cycle_detail.start_time,
"step_times": cycle_timers.copy(),
"processor_time_costs": processor_time_costs, # 处理器时间
"post_processor_time_costs": post_processor_time_costs, # 后处理器时间
"processor_time_costs": processor_time_costs, # 处理器时间
"reasoning": action_result.get("reasoning", ""),
"success": self._current_cycle_detail.loop_action_info.get("action_taken", False),
}
@@ -634,122 +560,7 @@ class HeartFChatting:
return all_plan_info, processor_time_costs
async def _process_post_planning_processors_with_timing(
self, observations: List[Observation], action_type: str, action_data: dict
) -> tuple[dict, dict]:
"""
处理后期处理器(规划后执行的处理器)并收集详细时间统计
包括:关系处理器、表达选择器、记忆激活器
参数:
observations: 观察器列表
action_type: 动作类型
action_data: 原始动作数据
返回:
tuple[dict, dict]: (更新后的动作数据, 后处理器时间统计)
"""
logger.info(f"{self.log_prefix} 开始执行后期处理器(带详细统计)")
# 创建所有后期任务
task_list = []
task_to_name_map = {}
task_start_times = {}
post_processor_time_costs = {}
# 添加后期处理器任务
for processor in self.post_planning_processors:
processor_name = processor.__class__.__name__
async def run_processor_with_timeout_and_timing(proc=processor, name=processor_name):
start_time = time.time()
try:
result = await asyncio.wait_for(
proc.process_info(observations=observations, action_type=action_type, action_data=action_data),
30,
)
end_time = time.time()
post_processor_time_costs[name] = end_time - start_time
logger.debug(f"{self.log_prefix} 后期处理器 {name} 耗时: {end_time - start_time:.3f}")
return result
except Exception as e:
end_time = time.time()
post_processor_time_costs[name] = end_time - start_time
logger.warning(f"{self.log_prefix} 后期处理器 {name} 执行异常,耗时: {end_time - start_time:.3f}")
raise e
task = asyncio.create_task(run_processor_with_timeout_and_timing())
task_list.append(task)
task_to_name_map[task] = ("processor", processor_name)
task_start_times[task] = time.time()
logger.info(f"{self.log_prefix} 启动后期处理器任务: {processor_name}")
# 如果没有任何后期任务,直接返回
if not task_list:
logger.info(f"{self.log_prefix} 没有启用的后期处理器或记忆激活器")
return action_data, {}
# 等待所有任务完成
pending_tasks = set(task_list)
all_post_plan_info = []
while pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
task_type, task_name = task_to_name_map[task]
try:
result = await task
if task_type == "processor":
logger.info(f"{self.log_prefix} 后期处理器 {task_name} 已完成!")
if result is not None:
all_post_plan_info.extend(result)
else:
logger.warning(f"{self.log_prefix} 后期处理器 {task_name} 返回了 None")
except asyncio.TimeoutError:
# 对于超时任务,记录已用时间
elapsed_time = time.time() - task_start_times[task]
if task_type == "processor":
post_processor_time_costs[task_name] = elapsed_time
logger.warning(
f"{self.log_prefix} 后期处理器 {task_name} 超时(>30s已跳过耗时: {elapsed_time:.3f}"
)
except Exception as e:
# 对于异常任务,记录已用时间
elapsed_time = time.time() - task_start_times[task]
if task_type == "processor":
post_processor_time_costs[task_name] = elapsed_time
logger.error(
f"{self.log_prefix} 后期处理器 {task_name} 执行失败,耗时: {elapsed_time:.3f}秒. 错误: {e}",
exc_info=True,
)
# 将后期处理器的结果整合到 action_data 中
updated_action_data = action_data.copy()
structured_info = ""
for info in all_post_plan_info:
if isinstance(info, StructuredInfo):
structured_info = info.get_processed_info()
if structured_info:
updated_action_data["structured_info"] = structured_info
if all_post_plan_info:
logger.info(f"{self.log_prefix} 后期处理完成,产生了 {len(all_post_plan_info)} 个信息项")
# 输出详细统计信息
if post_processor_time_costs:
stats_str = ", ".join(
[f"{name}: {time_cost:.3f}s" for name, time_cost in post_processor_time_costs.items()]
)
logger.info(f"{self.log_prefix} 后期处理器详细耗时统计: {stats_str}")
return updated_action_data, post_processor_time_costs
async def _observe_process_plan_action_loop(self, cycle_timers: dict, thinking_id: str) -> dict:
try:
@@ -765,10 +576,10 @@ class HeartFChatting:
await self.relationship_builder.build_relation()
# 并行执行调整动作、回忆和处理器阶段
with Timer("调整动作、处理", cycle_timers):
# 创建并行任务
async def modify_actions_task():
# 顺序执行调整动作和处理器阶段
# 第一步:动作修改
with Timer("动作修改", cycle_timers):
try:
# 调用完整的动作修改流程
await self.action_modifier.modify_actions(
observations=self.observations,
@@ -776,44 +587,17 @@ class HeartFChatting:
await self.action_observation.observe()
self.observations.append(self.action_observation)
return True
# 创建两个并行任务为LLM调用添加超时保护
action_modify_task = asyncio.create_task(
asyncio.wait_for(modify_actions_task(), timeout=ACTION_MODIFICATION_TIMEOUT)
)
processor_task = asyncio.create_task(self._process_processors(self.observations))
# 等待两个任务完成,使用超时保护和详细错误处理
action_modify_result = None
all_plan_info = []
processor_time_costs = {}
try:
action_modify_result, (all_plan_info, processor_time_costs) = await asyncio.gather(
action_modify_task, processor_task, return_exceptions=True
)
# 检查各个任务的结果
if isinstance(action_modify_result, Exception):
if isinstance(action_modify_result, asyncio.TimeoutError):
logger.error(f"{self.log_prefix} 动作修改任务超时")
else:
logger.error(f"{self.log_prefix} 动作修改任务失败: {action_modify_result}")
processor_result = (all_plan_info, processor_time_costs)
if isinstance(processor_result, Exception):
if isinstance(processor_result, asyncio.TimeoutError):
logger.error(f"{self.log_prefix} 处理器任务超时")
else:
logger.error(f"{self.log_prefix} 处理器任务失败: {processor_result}")
all_plan_info = []
processor_time_costs = {}
else:
all_plan_info, processor_time_costs = processor_result
logger.debug(f"{self.log_prefix} 动作修改完成")
except Exception as e:
logger.error(f"{self.log_prefix} 并行任务gather失败: {e}")
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 继续执行,不中断流程
# 第二步:信息处理器
with Timer("信息处理器", cycle_timers):
try:
all_plan_info, processor_time_costs = await self._process_processors(self.observations)
except Exception as e:
logger.error(f"{self.log_prefix} 信息处理器失败: {e}")
# 设置默认值以继续执行
all_plan_info = []
processor_time_costs = {}
@@ -833,7 +617,6 @@ class HeartFChatting:
"observed_messages": plan_result.get("observed_messages", ""),
}
# 修正将后期处理器从执行动作Timer中分离出来
action_type, action_data, reasoning = (
plan_result.get("action_result", {}).get("action_type", "error"),
plan_result.get("action_result", {}).get("action_data", {}),
@@ -849,22 +632,7 @@ class HeartFChatting:
logger.debug(f"{self.log_prefix} 麦麦想要:'{action_str}'")
# 添加:单独计时后期处理器,并收集详细统计
post_processor_time_costs = {}
if action_type != "no_reply":
with Timer("后期处理器", cycle_timers):
logger.debug(f"{self.log_prefix} 执行后期处理器(动作类型: {action_type}")
# 记录详细的后处理器时间
post_start_time = time.time()
action_data, post_processor_time_costs = await self._process_post_planning_processors_with_timing(
self.observations, action_type, action_data
)
post_end_time = time.time()
logger.info(f"{self.log_prefix} 后期处理器总耗时: {post_end_time - post_start_time:.3f}")
else:
logger.debug(f"{self.log_prefix} 跳过后期处理器(动作类型: {action_type}")
# 修正:纯动作执行计时
# 动作执行计时
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_type, reasoning, action_data, cycle_timers, thinking_id
@@ -877,17 +645,11 @@ class HeartFChatting:
"taken_time": time.time(),
}
# 添加后处理器统计到loop_info
loop_post_processor_info = {
"post_processor_time_costs": post_processor_time_costs,
}
loop_info = {
"loop_observation_info": loop_observation_info,
"loop_processor_info": loop_processor_info,
"loop_plan_info": loop_plan_info,
"loop_action_info": loop_action_info,
"loop_post_processor_info": loop_post_processor_info, # 新增
}
return loop_info

View File

@@ -1,71 +0,0 @@
from dataclasses import dataclass
from typing import List, Dict
from .info_base import InfoBase
@dataclass
class ExpressionSelectionInfo(InfoBase):
"""表达选择信息类
用于存储和管理选中的表达方式信息。
Attributes:
type (str): 信息类型标识符,默认为 "expression_selection"
data (Dict[str, Any]): 包含选中表达方式的数据字典
"""
type: str = "expression_selection"
def get_selected_expressions(self) -> List[Dict[str, str]]:
"""获取选中的表达方式列表
Returns:
List[Dict[str, str]]: 选中的表达方式列表
"""
return self.get_info("selected_expressions") or []
def set_selected_expressions(self, expressions: List[Dict[str, str]]) -> None:
"""设置选中的表达方式列表
Args:
expressions: 选中的表达方式列表
"""
self.data["selected_expressions"] = expressions
def get_expressions_count(self) -> int:
"""获取选中表达方式的数量
Returns:
int: 表达方式数量
"""
return len(self.get_selected_expressions())
def get_processed_info(self) -> str:
"""获取处理后的信息
Returns:
str: 处理后的信息字符串
"""
expressions = self.get_selected_expressions()
if not expressions:
return ""
# 格式化表达方式为可读文本
formatted_expressions = []
for expr in expressions:
situation = expr.get("situation", "")
style = expr.get("style", "")
expr.get("type", "")
if situation and style:
formatted_expressions.append(f"{situation}时,使用 {style}")
return "\n".join(formatted_expressions)
def get_expressions_for_action_data(self) -> List[Dict[str, str]]:
"""获取用于action_data的表达方式数据
Returns:
List[Dict[str, str]]: 格式化后的表达方式数据
"""
return self.get_selected_expressions()

View File

@@ -1,34 +0,0 @@
from typing import Dict, Any
from dataclasses import dataclass, field
from .info_base import InfoBase
@dataclass
class MindInfo(InfoBase):
"""思维信息类
用于存储和管理当前思维状态的信息。
Attributes:
type (str): 信息类型标识符,默认为 "mind"
data (Dict[str, Any]): 包含 current_mind 的数据字典
"""
type: str = "mind"
data: Dict[str, Any] = field(default_factory=lambda: {"current_mind": ""})
def get_current_mind(self) -> str:
"""获取当前思维状态
Returns:
str: 当前思维状态
"""
return self.get_info("current_mind") or ""
def set_current_mind(self, mind: str) -> None:
"""设置当前思维状态
Args:
mind: 要设置的思维状态
"""
self.data["current_mind"] = mind

View File

@@ -1,40 +0,0 @@
from dataclasses import dataclass
from .info_base import InfoBase
@dataclass
class RelationInfo(InfoBase):
"""关系信息类
用于存储和管理当前关系状态的信息。
Attributes:
type (str): 信息类型标识符,默认为 "relation"
data (Dict[str, Any]): 包含 current_relation 的数据字典
"""
type: str = "relation"
def get_relation_info(self) -> str:
"""获取当前关系状态
Returns:
str: 当前关系状态
"""
return self.get_info("relation_info") or ""
def set_relation_info(self, relation_info: str) -> None:
"""设置当前关系状态
Args:
relation_info: 要设置的关系状态
"""
self.data["relation_info"] = relation_info
def get_processed_info(self) -> str:
"""获取处理后的信息
Returns:
str: 处理后的信息
"""
return self.get_relation_info() or ""

View File

@@ -1,85 +0,0 @@
from typing import Dict, Optional, Any, List
from dataclasses import dataclass, field
@dataclass
class StructuredInfo:
"""信息基类
这是一个基础信息类,用于存储和管理各种类型的信息数据。
所有具体的信息类都应该继承自这个基类。
Attributes:
type (str): 信息类型标识符,默认为 "base"
data (Dict[str, Union[str, Dict, list]]): 存储具体信息数据的字典,
支持存储字符串、字典、列表等嵌套数据结构
"""
type: str = "structured_info"
data: Dict[str, Any] = field(default_factory=dict)
def get_type(self) -> str:
"""获取信息类型
Returns:
str: 当前信息对象的类型标识符
"""
return self.type
def get_data(self) -> Dict[str, Any]:
"""获取所有信息数据
Returns:
Dict[str, Any]: 包含所有信息数据的字典
"""
return self.data
def get_info(self, key: str) -> Optional[Any]:
"""获取特定属性的信息
Args:
key: 要获取的属性键名
Returns:
Optional[Any]: 属性值,如果键不存在则返回 None
"""
return self.data.get(key)
def get_info_list(self, key: str) -> List[Any]:
"""获取特定属性的信息列表
Args:
key: 要获取的属性键名
Returns:
List[Any]: 属性值列表,如果键不存在则返回空列表
"""
value = self.data.get(key)
if isinstance(value, list):
return value
return []
def set_info(self, key: str, value: Any) -> None:
"""设置特定属性的信息值
Args:
key: 要设置的属性键名
value: 要设置的属性值
"""
self.data[key] = value
def get_processed_info(self) -> str:
"""获取处理后的信息
Returns:
str: 处理后的信息字符串
"""
info_str = ""
# print(f"self.data: {self.data}")
for key, value in self.data.items():
# print(f"key: {key}, value: {value}")
info_str += f"信息类型:{key},信息内容:{value}\n"
return info_str

View File

@@ -1,186 +0,0 @@
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
import time
from src.common.logger import get_logger
from src.individuality.individuality import get_individuality
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.tools.tool_use import ToolUser
from src.chat.utils.json_utils import process_llm_tool_calls
from .base_processor import BaseProcessor
from typing import List
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.heart_flow.observation.structure_observation import StructureObservation
logger = get_logger("processor")
def init_prompt():
# ... 原有代码 ...
# 添加工具执行器提示词
tool_executor_prompt = """
你是一个专门执行工具的助手。你的名字是{bot_name}。现在是{time_now}
群里正在进行的聊天内容:
{chat_observe_info}
请仔细分析聊天内容,考虑以下几点:
1. 内容中是否包含需要查询信息的问题
2. 是否有明确的工具使用指令
If you need to use a tool, please directly call the corresponding tool function. If you do not need to use any tool, simply output "No tool needed".
"""
Prompt(tool_executor_prompt, "tool_executor_prompt")
class ToolProcessor(BaseProcessor):
log_prefix = "工具执行器"
def __init__(self, subheartflow_id: str):
super().__init__()
self.subheartflow_id = subheartflow_id
self.log_prefix = f"[{subheartflow_id}:ToolExecutor] "
self.llm_model = LLMRequest(
model=global_config.model.focus_tool_use,
request_type="focus.processor.tool",
)
self.structured_info = []
async def process_info(
self,
observations: List[Observation] = None,
action_type: str = None,
action_data: dict = None,
**kwargs,
) -> List[StructuredInfo]:
"""处理信息对象
Args:
observations: 可选的观察列表包含ChattingObservation和StructureObservation类型
action_type: 动作类型
action_data: 动作数据
**kwargs: 其他可选参数
Returns:
list: 处理后的结构化信息列表
"""
working_infos = []
result = []
if observations:
for observation in observations:
if isinstance(observation, ChattingObservation):
result, used_tools, prompt = await self.execute_tools(observation)
logger.info(f"工具调用结果: {result}")
# 更新WorkingObservation中的结构化信息
for observation in observations:
if isinstance(observation, StructureObservation):
for structured_info in result:
# logger.debug(f"{self.log_prefix} 更新WorkingObservation中的结构化信息: {structured_info}")
observation.add_structured_info(structured_info)
working_infos = observation.get_observe_info()
logger.debug(f"{self.log_prefix} 获取更新后WorkingObservation中的结构化信息: {working_infos}")
structured_info = StructuredInfo()
if working_infos:
for working_info in working_infos:
structured_info.set_info(key=working_info.get("type"), value=working_info.get("content"))
return [structured_info]
async def execute_tools(self, observation: ChattingObservation, action_type: str = None, action_data: dict = None):
"""
并行执行工具,返回结构化信息
参数:
sub_mind: 子思维对象
chat_target_name: 聊天目标名称,默认为"对方"
is_group_chat: 是否为群聊默认为False
return_details: 是否返回详细信息默认为False
cycle_info: 循环信息对象,可用于记录详细执行信息
action_type: 动作类型
action_data: 动作数据
返回:
如果return_details为False:
List[Dict]: 工具执行结果的结构化信息列表
如果return_details为True:
Tuple[List[Dict], List[str], str]: (工具执行结果列表, 使用的工具列表, 工具执行提示词)
"""
tool_instance = ToolUser()
tools = tool_instance._define_tools()
# logger.debug(f"observation: {observation}")
# logger.debug(f"observation.chat_target_info: {observation.chat_target_info}")
# logger.debug(f"observation.is_group_chat: {observation.is_group_chat}")
# logger.debug(f"observation.person_list: {observation.person_list}")
is_group_chat = observation.is_group_chat
# chat_observe_info = observation.get_observe_info()
chat_observe_info = observation.talking_message_str_truncate_short
# person_list = observation.person_list
# 获取时间信息
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 构建专用于工具调用的提示词
prompt = await global_prompt_manager.format_prompt(
"tool_executor_prompt",
chat_observe_info=chat_observe_info,
is_group_chat=is_group_chat,
bot_name=get_individuality().name,
time_now=time_now,
)
# 调用LLM专注于工具使用
# logger.info(f"开始执行工具调用{prompt}")
response, other_info = await self.llm_model.generate_response_async(prompt=prompt, tools=tools)
if len(other_info) == 3:
reasoning_content, model_name, tool_calls = other_info
else:
reasoning_content, model_name = other_info
tool_calls = None
# print("tooltooltooltooltooltooltooltooltooltooltooltooltooltooltooltooltool")
if tool_calls:
logger.info(f"获取到工具原始输出:\n{tool_calls}")
# 处理工具调用和结果收集类似于SubMind中的逻辑
new_structured_items = []
used_tools = [] # 记录使用了哪些工具
if tool_calls:
success, valid_tool_calls, error_msg = process_llm_tool_calls(tool_calls)
if success and valid_tool_calls:
for tool_call in valid_tool_calls:
try:
# 记录使用的工具名称
tool_name = tool_call.get("name", "unknown_tool")
used_tools.append(tool_name)
result = await tool_instance._execute_tool_call(tool_call)
name = result.get("type", "unknown_type")
content = result.get("content", "")
logger.info(f"工具{name},获得信息:{content}")
if result:
new_item = {
"type": result.get("type", "unknown_type"),
"id": result.get("id", f"tool_exec_{time.time()}"),
"content": result.get("content", ""),
"ttl": 3,
}
new_structured_items.append(new_item)
except Exception as e:
logger.error(f"{self.log_prefix}工具执行失败: {e}")
return new_structured_items, used_tools, prompt
init_prompt()