主动思考定时任务优化,目前有个问题at动作会认为当建环境环是群聊,我多次尝试解决不了……唉

This commit is contained in:
tt-P607
2025-09-08 22:32:19 +08:00
parent 7180223622
commit 76646e5d85
4 changed files with 165 additions and 85 deletions

View File

@@ -11,3 +11,4 @@ class ProactiveTriggerEvent:
source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager"
reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo"
metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息
related_message_id: Optional[str] = None # 关联的消息ID用于加载上下文

View File

@@ -1,6 +1,7 @@
import time
import traceback
import orjson
import re
from typing import TYPE_CHECKING, Dict, Any
from src.common.logger import get_logger
@@ -15,7 +16,8 @@ from src.plugin_system.base.component_types import ComponentType
from src.config.config import global_config
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id
from src.mood.mood_manager import mood_manager
from src.common.database.sqlalchemy_database_api import store_action_info
from src.common.database.sqlalchemy_database_api import store_action_info, db_get
from src.common.database.sqlalchemy_models import Messages
if TYPE_CHECKING:
from ..cycle_processor import CycleProcessor
@@ -118,85 +120,156 @@ class ProactiveThinker:
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
# 调用规划器的 PROACTIVE 模式,让其决定下一步的行动
actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
# 如果是提醒事件,跳过规划器,直接构建默认动作
if trigger_event.source == "reminder_system":
# 1. 获取原始消息上下文
action_message = {}
if trigger_event.related_message_id:
# 直接将从数据库获取的完整消息记录作为 action_message
action_message = await db_get(
Messages, {"message_id": trigger_event.related_message_id}, single_result=True
) or {}
# 通常只关心规划出的第一个动作
action_result = actions[0] if actions else {}
# 2. 智能确定@对象
reason_text = trigger_event.reason.replace("定时提醒:", "").strip()
user_name_match = re.search(r"艾特一下(\S+)", reason_text)
if user_name_match:
user_name = user_name_match.group(1)
at_message = reason_text.replace(f"艾特一下{user_name}", "").strip()
elif action_message.get("user_nickname"):
user_name = action_message.get("user_nickname")
at_message = reason_text
else:
user_name = ""
at_message = reason_text
action_type = action_result.get("action_type")
# 3. 构建动作
action_result = {
"action_type": "at_user",
"reasoning": "执行定时提醒",
"action_data": {
"user_name": user_name,
"at_message": at_message or "时间到啦!"
},
"action_message": action_message
}
# 4. 执行或回退
try:
success, _, _ = await self.cycle_processor._handle_action(
action=action_result["action_type"],
reasoning=action_result["reasoning"],
action_data=action_result["action_data"],
cycle_timers={},
thinking_id="",
action_message=action_result["action_message"]
)
if not success:
raise Exception("at_user action failed")
except Exception:
logger.warning(f"{self.context.log_prefix} at_user动作执行失败回退到proactive_reply")
fallback_action = {
"action_type": "proactive_reply",
"action_data": {"topic": trigger_event.reason},
"action_message": action_message
}
await self._generate_proactive_content_and_send(fallback_action, trigger_event)
if action_type == "proactive_reply":
await self._generate_proactive_content_and_send(action_result)
elif action_type != "do_nothing":
logger.warning(f"{self.context.log_prefix} 主动思考返回了未知的动作类型: {action_type}")
else:
# 如果规划结果是“什么都不做”,则记录日志
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
# 对于其他来源的主动思考,正常调用规划器
actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
action_result = actions[0] if actions else {}
action_type = action_result.get("action_type")
if action_type == "proactive_reply":
await self._generate_proactive_content_and_send(action_result, trigger_event)
elif action_type not in ["do_nothing", "no_action"]:
await self.cycle_processor._handle_action(
action=action_result["action_type"],
reasoning=action_result.get("reasoning", ""),
action_data=action_result.get("action_data", {}),
cycle_timers={},
thinking_id="",
action_message=action_result.get("action_message")
)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())
async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any]):
async def _get_reminder_context(self, message_id: str) -> str:
"""获取提醒消息的上下文"""
try:
# 只获取那一条消息
message_record = await db_get(Messages, {"message_id": message_id}, single_result=True)
if message_record:
# 使用 build_readable_messages_with_id 来格式化单条消息
chat_context_block, _ = build_readable_messages_with_id(messages=[message_record])
return chat_context_block
return "无法加载相关的聊天记录。"
except Exception as e:
logger.error(f"{self.context.log_prefix} 获取提醒上下文失败: {e}")
return "无法加载相关的聊天记录。"
async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any], trigger_event: ProactiveTriggerEvent):
"""
获取实时信息,构建最终的生成提示词,并生成和发送主动回复。
Args:
action_result (Dict[str, Any]): 规划器返回的动作结果。
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
topic = action_result.get("action_data", {}).get("topic", "随便聊聊")
logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'")
# 1. 获取日程信息
schedule_block = "你今天没有日程安排。"
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity}"
# 2. 网络搜索
news_block = "暂时没有获取到最新资讯。"
try:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool:
# 检查工具的execute方法签名使用正确的参数名
try:
search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10)
except TypeError:
# 如果search_query不工作尝试其他可能的参数名
if trigger_event.source != "reminder_system":
try:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool:
try:
search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10)
search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10)
except TypeError:
# 跳过网络搜索,避免影响主动思考
logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索")
news_block = "跳过网络搜索。"
search_result_dict = None
if search_result_dict and not search_result_dict.get("error"):
news_block = search_result_dict.get("content", "未能提取有效资讯。")
elif search_result_dict:
logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}")
else:
logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}")
try:
search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10)
except TypeError:
logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索")
news_block = "跳过网络搜索。"
search_result_dict = None
if search_result_dict and not search_result_dict.get("error"):
news_block = search_result_dict.get("content", "未能提取有效资讯。")
elif search_result_dict:
logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}")
else:
logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}")
# 3. 获取最新的聊天上下文
message_list = get_raw_msg_before_timestamp_with_chat(
chat_id=self.context.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.3),
)
chat_context_block, _ = build_readable_messages_with_id(messages=message_list)
if trigger_event.source == "reminder_system" and trigger_event.related_message_id:
chat_context_block = await self._get_reminder_context(trigger_event.related_message_id)
else:
message_list = get_raw_msg_before_timestamp_with_chat(
chat_id=self.context.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.3),
)
chat_context_block, _ = build_readable_messages_with_id(messages=message_list)
# 4. 使用决策模型进行二次确认(节省珍贵的回复模型调用)
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
bot_name = global_config.bot.nickname
# 构建二次确认提示词
confirmation_prompt = f"""# 主动回复二次确认
## 基本信息
@@ -219,7 +292,6 @@ class ProactiveThinker:
请严格按照上述格式输出,不要添加任何解释。"""
# 使用决策模型进行二次确认
planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner,
request_type="planner"
@@ -227,12 +299,10 @@ class ProactiveThinker:
confirmation_result, _ = await planner_llm.generate_response_async(prompt=confirmation_prompt)
# 检查二次确认结果
if not confirmation_result or "SKIP_PROACTIVE_REPLY" in confirmation_result:
logger.info(f"{self.context.log_prefix} 决策模型二次确认决定跳过主动回复")
return
# 5. 只有通过二次确认才调用珍贵的回复模型
bot_name = global_config.bot.nickname
personality = global_config.personality
identity_block = (

View File

@@ -199,7 +199,8 @@ class HeartFCMessageReceiver:
"chat_id": chat.stream_id,
"content": reminder_event.content,
"confidence": reminder_event.confidence,
"created_at": datetime.now().isoformat()
"created_at": datetime.now().isoformat(),
"original_message_id": message.message_info.message_id
}
success = await event_scheduler.schedule_event(

View File

@@ -484,6 +484,7 @@ class ActionPlanner:
mode: ChatMode = ChatMode.FOCUS,
loop_start_time: float = 0.0,
available_actions: Optional[Dict[str, ActionInfo]] = None,
pseudo_message: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""
[注释] "大脑"规划器。
@@ -513,6 +514,8 @@ class ActionPlanner:
truncate=False,
show_actions=False,
)
if pseudo_message:
chat_content_block_short += f"\n[m99] 刚刚, 用户: {pseudo_message}"
self.last_obs_time_mark = time.time()
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
@@ -522,8 +525,8 @@ class ActionPlanner:
# --- 2. 启动小脑并行思考 ---
all_sub_planner_results: List[Dict[str, Any]] = []
# PROACTIVE模式下禁用小脑,避免与大脑的主动思考决策冲突
if mode != ChatMode.PROACTIVE:
# PROACTIVE模式下,只有在有伪消息(来自提醒)时才激活小脑
if mode != ChatMode.PROACTIVE or pseudo_message:
try:
sub_planner_actions: Dict[str, ActionInfo] = {}
for action_name, action_info in available_actions.items():
@@ -607,40 +610,45 @@ class ActionPlanner:
action, reasoning = "no_reply", f"大脑处理错误: {e}"
# --- 4. 整合大脑和小脑的决策 ---
# 如果是私聊且开启了强制回复则将no_reply强制改为reply
if not is_group_chat and global_config.chat.force_reply_private and action == "no_reply":
action = "reply"
reasoning = "私聊强制回复"
logger.info(f"{self.log_prefix}私聊强制回复已触发,将动作从 'no_reply' 修改为 'reply'")
is_parallel = True
for info in all_sub_planner_results:
action_type = info.get("action_type")
if action_type and action_type not in ["no_action", "no_reply"]:
action_info = available_actions.get(action_type)
if action_info and not action_info.parallel_action:
is_parallel = False
break
action_data["loop_start_time"] = loop_start_time
final_actions: List[Dict[str, Any]] = []
if is_parallel:
logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行")
if action not in ["no_action", "no_reply"]:
final_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions,
}
)
final_actions.extend(all_sub_planner_results)
# 特殊规则如果是提醒任务且大脑决定do_nothing则忽略大脑采用小脑的决策
if mode == ChatMode.PROACTIVE and pseudo_message and action == "do_nothing":
logger.info(f"{self.log_prefix}提醒任务触发大脑决策为do_nothing忽略大脑并采用小脑决策")
final_actions = all_sub_planner_results
else:
logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)")
final_actions.extend(all_sub_planner_results)
# 如果是私聊且开启了强制回复则将no_reply强制改为reply
if not is_group_chat and global_config.chat.force_reply_private and action == "no_reply":
action = "reply"
reasoning = "私聊强制回复"
logger.info(f"{self.log_prefix}私聊强制回复已触发,将动作从 'no_reply' 修改为 'reply'")
is_parallel = True
for info in all_sub_planner_results:
action_type = info.get("action_type")
if action_type and action_type not in ["no_action", "no_reply"]:
action_info = available_actions.get(action_type)
if action_info and not action_info.parallel_action:
is_parallel = False
break
action_data["loop_start_time"] = loop_start_time
final_actions: List[Dict[str, Any]] = []
if is_parallel:
logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行")
if action not in ["no_action", "no_reply"]:
final_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions,
}
)
final_actions.extend(all_sub_planner_results)
else:
logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)")
final_actions.extend(all_sub_planner_results)
final_actions = self._filter_no_actions(final_actions)