feat:统一normal和focus的动作调整,emoji统一可选随机激活或llm激活

This commit is contained in:
SengokuCola
2025-07-06 18:36:14 +08:00
parent 6c117742a9
commit 498d72384f
20 changed files with 217 additions and 748 deletions

View File

@@ -9,18 +9,17 @@ from src.plugin_system.apis import generator_api
from maim_message import UserInfo, Seg
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.utils.timer_calculator import Timer
from src.common.message_repository import count_messages
from src.chat.utils.prompt_builder import global_prompt_manager
from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
from src.chat.message_receive.message_sender import message_manager
from src.chat.normal_chat.willing.willing_manager import get_willing_manager
from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.planner_actions.action_manager import ActionManager
from src.person_info.relationship_builder_manager import relationship_builder_manager
from .priority_manager import PriorityManager
import traceback
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
from src.chat.planner_actions.planner_normal import NormalChatPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.manager.mood_manager import mood_manager
@@ -71,7 +70,7 @@ class NormalChat:
# Planner相关初始化
self.action_manager = ActionManager()
self.planner = NormalChatPlanner(self.stream_name, self.action_manager)
self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name)
self.action_modifier = ActionModifier(self.action_manager, self.stream_id)
self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner
# 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply}
@@ -569,8 +568,8 @@ class NormalChat:
available_actions = None
if self.enable_planner:
try:
await self.action_modifier.modify_actions_for_normal_chat(
self.chat_stream, self.recent_replies, message.processed_plain_text
await self.action_modifier.modify_actions(
mode="normal", message_content=message.processed_plain_text
)
available_actions = self.action_manager.get_using_actions_for_mode("normal")
except Exception as e:
@@ -1003,3 +1002,29 @@ class NormalChat:
except Exception as e:
logger.error(f"[{self.stream_name}] 清理思考消息 {thinking_id} 时出错: {e}")
def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict:
"""
Args:
minutes (int): 检索的分钟数默认30分钟
chat_id (str, optional): 指定的chat_id仅统计该chat下的消息。为None时统计全部。
Returns:
dict: {"bot_reply_count": int, "total_message_count": int}
"""
now = time.time()
start_time = now - minutes * 60
bot_id = global_config.bot.qq_account
filter_base = {"time": {"$gte": start_time}}
if chat_id is not None:
filter_base["chat_id"] = chat_id
# 总消息数
total_message_count = count_messages(filter_base)
# bot自身回复数
bot_filter = filter_base.copy()
bot_filter["user_id"] = bot_id
bot_reply_count = count_messages(bot_filter)
return {"bot_reply_count": bot_reply_count, "total_message_count": total_message_count}

View File

@@ -1,403 +0,0 @@
from typing import List, Any, Dict
from src.common.logger import get_logger
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
from src.config.config import global_config
import random
import time
import asyncio
logger = get_logger("normal_chat_action_modifier")
class NormalChatActionModifier:
"""Normal Chat动作修改器
负责根据Normal Chat的上下文和状态动态调整可用的动作集合
实现与Focus Chat类似的动作激活策略但将LLM_JUDGE转换为概率激活以提升性能
"""
def __init__(self, action_manager: ActionManager, stream_id: str, stream_name: str):
"""初始化动作修改器"""
self.action_manager = action_manager
self.stream_id = stream_id
self.stream_name = stream_name
self.log_prefix = f"[{stream_name}]动作修改器"
# 缓存所有注册的动作
self.all_actions = self.action_manager.get_registered_actions()
async def modify_actions_for_normal_chat(
self,
chat_stream,
recent_replies: List[dict],
message_content: str,
**kwargs: Any,
):
"""为Normal Chat修改可用动作集合
实现动作激活策略:
1. 基于关联类型的动态过滤
2. 基于激活类型的智能判定LLM_JUDGE转为概率激活
Args:
chat_stream: 聊天流对象
recent_replies: 最近的回复记录
message_content: 当前消息内容
**kwargs: 其他参数
"""
reasons = []
merged_action_changes = {"add": [], "remove": []}
type_mismatched_actions = [] # 在外层定义避免作用域问题
self.action_manager.restore_default_actions()
# 第一阶段:基于关联类型的动态过滤
if chat_stream:
chat_context = chat_stream.context if hasattr(chat_stream, "context") else None
if chat_context:
# 获取Normal模式下的可用动作已经过滤了mode_enable
current_using_actions = self.action_manager.get_using_actions_for_mode("normal")
# print(f"current_using_actions: {current_using_actions}")
for action_name in current_using_actions.keys():
if action_name in self.all_actions:
data = self.all_actions[action_name]
if data.get("associated_types"):
if not chat_context.check_types(data["associated_types"]):
type_mismatched_actions.append(action_name)
logger.debug(f"{self.log_prefix} 动作 {action_name} 关联类型不匹配,移除该动作")
if type_mismatched_actions:
merged_action_changes["remove"].extend(type_mismatched_actions)
reasons.append(f"移除{type_mismatched_actions}(关联类型不匹配)")
# 第二阶段:应用激活类型判定
# 构建聊天内容 - 使用与planner一致的方式
chat_content = ""
if chat_stream and hasattr(chat_stream, "stream_id"):
try:
# 获取消息历史使用与normal_chat_planner相同的方法
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=chat_stream.stream_id,
timestamp=time.time(),
limit=global_config.chat.max_context_size, # 使用相同的配置
)
# 构建可读的聊天上下文
chat_content = build_readable_messages(
message_list_before_now,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
show_actions=True,
)
logger.debug(f"{self.log_prefix} 成功构建聊天内容,长度: {len(chat_content)}")
except Exception as e:
logger.warning(f"{self.log_prefix} 构建聊天内容失败: {e}")
chat_content = ""
# 获取当前Normal模式下的动作集进行激活判定
current_actions = self.action_manager.get_using_actions_for_mode("normal")
# print(f"current_actions: {current_actions}")
# print(f"chat_content: {chat_content}")
final_activated_actions = await self._apply_normal_activation_filtering(
current_actions, chat_content, message_content, recent_replies
)
# print(f"final_activated_actions: {final_activated_actions}")
# 统一处理所有需要移除的动作,避免重复移除
all_actions_to_remove = set() # 使用set避免重复
# 添加关联类型不匹配的动作
if type_mismatched_actions:
all_actions_to_remove.update(type_mismatched_actions)
# 添加激活类型判定未通过的动作
for action_name in current_actions.keys():
if action_name not in final_activated_actions:
all_actions_to_remove.add(action_name)
# 统计移除原因(避免重复)
activation_failed_actions = [
name
for name in current_actions.keys()
if name not in final_activated_actions and name not in type_mismatched_actions
]
if activation_failed_actions:
reasons.append(f"移除{activation_failed_actions}(激活类型判定未通过)")
# 统一执行移除操作
for action_name in all_actions_to_remove:
success = self.action_manager.remove_action_from_using(action_name)
if success:
logger.debug(f"{self.log_prefix} 移除动作: {action_name}")
else:
logger.debug(f"{self.log_prefix} 动作 {action_name} 已经不在使用集中,跳过移除")
# 应用动作添加(如果有的话)
for action_name in merged_action_changes["add"]:
if action_name in self.all_actions:
success = self.action_manager.add_action_to_using(action_name)
if success:
logger.debug(f"{self.log_prefix} 添加动作: {action_name}")
# 记录变更原因
if reasons:
logger.info(f"{self.log_prefix} 动作调整完成: {' | '.join(reasons)}")
# 获取最终的Normal模式可用动作并记录
final_actions = self.action_manager.get_using_actions_for_mode("normal")
logger.debug(f"{self.log_prefix} 当前Normal模式可用动作: {list(final_actions.keys())}")
async def _apply_normal_activation_filtering(
self,
actions_with_info: Dict[str, Any],
chat_content: str = "",
message_content: str = "",
recent_replies: List[dict] = None,
) -> Dict[str, Any]:
"""
应用Normal模式的激活类型过滤逻辑
与Focus模式的区别
1. LLM_JUDGE类型转换为概率激活避免LLM调用
2. RANDOM类型保持概率激活
3. KEYWORD类型保持关键词匹配
4. ALWAYS类型直接激活
Args:
actions_with_info: 带完整信息的动作字典
chat_content: 聊天内容
message_content: 当前消息内容
recent_replies: 最近的回复记录列表
Returns:
Dict[str, Any]: 过滤后激活的actions字典
"""
activated_actions = {}
# 分类处理不同激活类型的actions
always_actions = {}
random_actions = {}
keyword_actions = {}
llm_judge_actions = {}
for action_name, action_info in actions_with_info.items():
# 使用normal_activation_type
activation_type = action_info.get("normal_activation_type", "always")
# 现在统一是字符串格式的激活类型值
if activation_type == "always":
always_actions[action_name] = action_info
elif activation_type == "random":
random_actions[action_name] = action_info
elif activation_type == "llm_judge":
llm_judge_actions[action_name] = action_info
elif activation_type == "keyword":
keyword_actions[action_name] = action_info
else:
logger.warning(f"{self.log_prefix}未知的激活类型: {activation_type},跳过处理")
# 1. 处理ALWAYS类型直接激活
for action_name, action_info in always_actions.items():
activated_actions[action_name] = action_info
logger.debug(f"{self.log_prefix}激活动作: {action_name},原因: ALWAYS类型直接激活")
# 2. 处理RANDOM类型概率激活
for action_name, action_info in random_actions.items():
probability = action_info.get("random_activation_probability", ActionManager.DEFAULT_RANDOM_PROBABILITY)
should_activate = random.random() < probability
if should_activate:
activated_actions[action_name] = action_info
logger.debug(f"{self.log_prefix}激活动作: {action_name},原因: RANDOM类型触发概率{probability}")
else:
logger.debug(f"{self.log_prefix}未激活动作: {action_name},原因: RANDOM类型未触发概率{probability}")
# 3. 处理KEYWORD类型关键词匹配
for action_name, action_info in keyword_actions.items():
should_activate = self._check_keyword_activation(action_name, action_info, chat_content, message_content)
if should_activate:
activated_actions[action_name] = action_info
keywords = action_info.get("activation_keywords", [])
logger.debug(f"{self.log_prefix}激活动作: {action_name},原因: KEYWORD类型匹配关键词{keywords}")
else:
keywords = action_info.get("activation_keywords", [])
logger.debug(f"{self.log_prefix}未激活动作: {action_name},原因: KEYWORD类型未匹配关键词{keywords}")
# 4. 处理LLM_JUDGE类型并行判定
if llm_judge_actions:
# 直接并行处理所有LLM判定actions
llm_results = await self._process_llm_judge_actions_parallel(
llm_judge_actions,
chat_content,
)
# 添加激活的LLM判定actions
for action_name, should_activate in llm_results.items():
if should_activate:
activated_actions[action_name] = llm_judge_actions[action_name]
logger.debug(f"{self.log_prefix}激活动作: {action_name},原因: LLM_JUDGE类型判定通过")
else:
logger.debug(f"{self.log_prefix}未激活动作: {action_name},原因: LLM_JUDGE类型判定未通过")
logger.debug(f"{self.log_prefix}Normal模式激活类型过滤完成: {list(activated_actions.keys())}")
return activated_actions
def _check_keyword_activation(
self,
action_name: str,
action_info: Dict[str, Any],
chat_content: str = "",
message_content: str = "",
) -> bool:
"""
检查是否匹配关键词触发条件
Args:
action_name: 动作名称
action_info: 动作信息
chat_content: 聊天内容(已经是格式化后的可读消息)
Returns:
bool: 是否应该激活此action
"""
activation_keywords = action_info.get("activation_keywords", [])
case_sensitive = action_info.get("keyword_case_sensitive", False)
if not activation_keywords:
logger.warning(f"{self.log_prefix}动作 {action_name} 设置为关键词触发但未配置关键词")
return False
# 使用构建好的聊天内容作为检索文本
search_text = chat_content + message_content
# 如果不区分大小写,转换为小写
if not case_sensitive:
search_text = search_text.lower()
# 检查每个关键词
matched_keywords = []
for keyword in activation_keywords:
check_keyword = keyword if case_sensitive else keyword.lower()
if check_keyword in search_text:
matched_keywords.append(keyword)
# print(f"search_text: {search_text}")
# print(f"activation_keywords: {activation_keywords}")
if matched_keywords:
logger.debug(f"{self.log_prefix}动作 {action_name} 匹配到关键词: {matched_keywords}")
return True
else:
logger.debug(f"{self.log_prefix}动作 {action_name} 未匹配到任何关键词: {activation_keywords}")
return False
async def _process_llm_judge_actions_parallel(
self,
llm_judge_actions: Dict[str, Any],
chat_content: str = "",
) -> Dict[str, bool]:
"""
并行处理LLM判定actions支持智能缓存
Args:
llm_judge_actions: 需要LLM判定的actions
chat_content: 聊天内容
Returns:
Dict[str, bool]: action名称到激活结果的映射
"""
# 生成当前上下文的哈希值
current_context_hash = self._generate_context_hash(chat_content)
current_time = time.time()
results = {}
tasks_to_run = {}
# 检查缓存
for action_name, action_info in llm_judge_actions.items():
cache_key = f"{action_name}_{current_context_hash}"
# 检查是否有有效的缓存
if (
cache_key in self._llm_judge_cache
and current_time - self._llm_judge_cache[cache_key]["timestamp"] < self._cache_expiry_time
):
results[action_name] = self._llm_judge_cache[cache_key]["result"]
logger.debug(
f"{self.log_prefix}使用缓存结果 {action_name}: {'激活' if results[action_name] else '未激活'}"
)
else:
# 需要进行LLM判定
tasks_to_run[action_name] = action_info
# 如果有需要运行的任务,并行执行
if tasks_to_run:
logger.debug(f"{self.log_prefix}并行执行LLM判定任务数: {len(tasks_to_run)}")
# 创建并行任务
tasks = []
task_names = []
for action_name, action_info in tasks_to_run.items():
task = self._llm_judge_action(
action_name,
action_info,
chat_content,
)
tasks.append(task)
task_names.append(action_name)
# 并行执行所有任务
try:
task_results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果并更新缓存
for _, (action_name, result) in enumerate(zip(task_names, task_results)):
if isinstance(result, Exception):
logger.error(f"{self.log_prefix}LLM判定action {action_name} 时出错: {result}")
results[action_name] = False
else:
results[action_name] = result
# 更新缓存
cache_key = f"{action_name}_{current_context_hash}"
self._llm_judge_cache[cache_key] = {"result": result, "timestamp": current_time}
logger.debug(f"{self.log_prefix}并行LLM判定完成耗时: {time.time() - current_time:.2f}s")
except Exception as e:
logger.error(f"{self.log_prefix}并行LLM判定失败: {e}")
# 如果并行执行失败为所有任务返回False
for action_name in tasks_to_run.keys():
results[action_name] = False
# 清理过期缓存
self._cleanup_expired_cache(current_time)
return results
def get_available_actions_count(self) -> int:
"""获取当前可用动作数量排除默认的no_action"""
current_actions = self.action_manager.get_using_actions_for_mode("normal")
# 排除no_action如果存在
filtered_actions = {k: v for k, v in current_actions.items() if k != "no_action"}
return len(filtered_actions)
def should_skip_planning(self) -> bool:
"""判断是否应该跳过规划过程"""
available_count = self.get_available_actions_count()
if available_count == 0:
logger.debug(f"{self.log_prefix} 没有可用动作,跳过规划")
return True
return False

View File

@@ -1,306 +0,0 @@
import json
from typing import Dict, Any
from rich.traceback import install
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.common.logger import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.individuality.individuality import get_individuality
from src.chat.focus_chat.planners.action_manager import ActionManager
from src.chat.message_receive.message import MessageThinking
from json_repair import repair_json
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat
import time
import traceback
logger = get_logger("normal_chat_planner")
install(extra_lines=3)
def init_prompt():
Prompt(
"""
你的自我认知是:
{self_info_block}
请记住你的性格,身份和特点。
你是群内的一员,你现在正在参与群内的闲聊,以下是群内的聊天内容:
{chat_context}
基于以上聊天上下文和用户的最新消息选择最合适的action。
注意除了下面动作选项之外你在聊天中不能做其他任何事情这是你能力的边界现在请你选择合适的action:
{action_options_text}
重要说明:
- "no_action" 表示只进行普通聊天回复,不执行任何额外动作
- 其他action表示在普通回复的基础上执行相应的额外动作
你必须从上面列出的可用action中选择一个并说明原因。
{moderation_prompt}
请以动作的输出要求,以严格的 JSON 格式输出,且仅包含 JSON 内容。不要有任何其他文字或解释:
""",
"normal_chat_planner_prompt",
)
Prompt(
"""
动作:{action_name}
动作描述:{action_description}
{action_require}
{{
"action": "{action_name}",{action_parameters}
}}
""",
"normal_chat_action_prompt",
)
class NormalChatPlanner:
def __init__(self, log_prefix: str, action_manager: ActionManager):
self.log_prefix = log_prefix
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.planner,
request_type="normal.planner", # 用于normal_chat动作规划
)
self.action_manager = action_manager
async def plan(self, message: MessageThinking) -> Dict[str, Any]:
"""
Normal Chat 规划器: 使用LLM根据上下文决定做出什么动作。
参数:
message: 思考消息对象
sender_name: 发送者名称
"""
action = "no_action" # 默认动作改为no_action
reasoning = "规划器初始化默认"
action_data = {}
try:
# 设置默认值
nickname_str = ""
for nicknames in global_config.bot.alias_names:
nickname_str += f"{nicknames},"
name_block = f"你的名字是{global_config.bot.nickname},你的昵称有{nickname_str},有人也会用这些昵称称呼你。"
personality_block = get_individuality().get_personality_prompt(x_person=2, level=2)
identity_block = get_individuality().get_identity_prompt(x_person=2, level=2)
self_info = name_block + personality_block + identity_block
# 获取当前可用的动作使用Normal模式过滤
current_available_actions = self.action_manager.get_using_actions_for_mode("normal")
# 注意:动作的激活判定现在在 normal_chat_action_modifier 中完成
# 这里直接使用经过 action_modifier 处理后的最终动作集
# 符合职责分离原则ActionModifier负责动作管理Planner专注于决策
# 如果没有可用动作直接返回no_action
if not current_available_actions:
logger.debug(f"{self.log_prefix}规划器: 没有可用动作返回no_action")
return {
"action_result": {
"action_type": action,
"action_data": action_data,
"reasoning": reasoning,
"is_parallel": True,
},
"chat_context": "",
"action_prompt": "",
}
# 构建normal_chat的上下文 (使用与normal_chat相同的prompt构建方法)
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=message.chat_stream.stream_id,
timestamp=time.time(),
limit=global_config.chat.max_context_size,
)
chat_context = build_readable_messages(
message_list_before_now,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
show_actions=True,
)
# 构建planner的prompt
prompt = await self.build_planner_prompt(
self_info_block=self_info,
chat_context=chat_context,
current_available_actions=current_available_actions,
)
if not prompt:
logger.warning(f"{self.log_prefix}规划器: 构建提示词失败")
return {
"action_result": {
"action_type": action,
"action_data": action_data,
"reasoning": reasoning,
"is_parallel": False,
},
"chat_context": chat_context,
"action_prompt": "",
}
# 使用LLM生成动作决策
try:
content, (reasoning_content, model_name) = await self.planner_llm.generate_response_async(prompt)
logger.info(f"{self.log_prefix}规划器原始提示词: {prompt}")
logger.info(f"{self.log_prefix}规划器原始响应: {content}")
if reasoning_content:
logger.info(f"{self.log_prefix}规划器推理: {reasoning_content}")
# 解析JSON响应
try:
# 尝试修复JSON
fixed_json = repair_json(content)
action_result = json.loads(fixed_json)
action = action_result.get("action", "no_action")
reasoning = action_result.get("reasoning", "未提供原因")
# 提取其他参数作为action_data
action_data = {k: v for k, v in action_result.items() if k not in ["action", "reasoning"]}
# 验证动作是否在可用动作列表中,或者是特殊动作
if action not in current_available_actions:
logger.warning(f"{self.log_prefix}规划器选择了不可用的动作: {action}, 回退到no_action")
action = "no_action"
reasoning = f"选择的动作{action}不在可用列表中回退到no_action"
action_data = {}
except json.JSONDecodeError as e:
logger.warning(f"{self.log_prefix}规划器JSON解析失败: {e}, 内容: {content}")
action = "no_action"
reasoning = "JSON解析失败使用默认动作"
action_data = {}
except Exception as e:
logger.error(f"{self.log_prefix}规划器LLM调用失败: {e}")
action = "no_action"
reasoning = "LLM调用失败使用默认动作"
action_data = {}
except Exception as outer_e:
logger.error(f"{self.log_prefix}规划器异常: {outer_e}")
# 设置异常时的默认值
current_available_actions = {}
chat_context = "无法获取聊天上下文"
prompt = ""
action = "no_action"
reasoning = "规划器出现异常,使用默认动作"
action_data = {}
# 检查动作是否支持并行执行
is_parallel = False
if action in current_available_actions:
action_info = current_available_actions[action]
is_parallel = action_info.get("parallel_action", False)
logger.debug(
f"{self.log_prefix}规划器决策动作:{action}, 动作信息: '{action_data}', 理由: {reasoning}, 并行执行: {is_parallel}"
)
# 恢复到默认动作集
self.action_manager.restore_actions()
logger.debug(
f"{self.log_prefix}规划后恢复到默认动作集, 当前可用: {list(self.action_manager.get_using_actions().keys())}"
)
# 构建 action 记录
action_record = {
"action_type": action,
"action_data": action_data,
"reasoning": reasoning,
"timestamp": time.time(),
"model_name": model_name if "model_name" in locals() else None,
}
action_result = {
"action_type": action,
"action_data": action_data,
"reasoning": reasoning,
"is_parallel": is_parallel,
"action_record": json.dumps(action_record, ensure_ascii=False),
}
plan_result = {
"action_result": action_result,
"chat_context": chat_context,
"action_prompt": prompt,
}
return plan_result
async def build_planner_prompt(
self,
self_info_block: str,
chat_context: str,
current_available_actions: Dict[str, Any],
) -> str:
"""构建 Normal Chat Planner LLM 的提示词"""
try:
# 构建动作选项文本
action_options_text = ""
for action_name, action_info in current_available_actions.items():
action_description = action_info.get("description", "")
action_parameters = action_info.get("parameters", {})
action_require = action_info.get("require", [])
if action_parameters:
param_text = "\n"
# print(action_parameters)
for param_name, param_description in action_parameters.items():
param_text += f' "{param_name}":"{param_description}"\n'
param_text = param_text.rstrip("\n")
else:
param_text = ""
require_text = ""
for require_item in action_require:
require_text += f"- {require_item}\n"
require_text = require_text.rstrip("\n")
# 构建单个动作的提示
action_prompt = await global_prompt_manager.format_prompt(
"normal_chat_action_prompt",
action_name=action_name,
action_description=action_description,
action_parameters=param_text,
action_require=require_text,
)
action_options_text += action_prompt + "\n\n"
# 审核提示
moderation_prompt = "请确保你的回复符合平台规则,避免不当内容。"
# 使用模板构建最终提示词
prompt = await global_prompt_manager.format_prompt(
"normal_chat_planner_prompt",
self_info_block=self_info_block,
action_options_text=action_options_text,
moderation_prompt=moderation_prompt,
chat_context=chat_context,
)
return prompt
except Exception as e:
logger.error(f"{self.log_prefix}构建Planner提示词失败: {e}")
traceback.print_exc()
return ""
init_prompt()

View File

@@ -1,30 +0,0 @@
import time
from src.config.config import global_config
from src.common.message_repository import count_messages
def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict:
"""
Args:
minutes (int): 检索的分钟数默认30分钟
chat_id (str, optional): 指定的chat_id仅统计该chat下的消息。为None时统计全部。
Returns:
dict: {"bot_reply_count": int, "total_message_count": int}
"""
now = time.time()
start_time = now - minutes * 60
bot_id = global_config.bot.qq_account
filter_base = {"time": {"$gte": start_time}}
if chat_id is not None:
filter_base["chat_id"] = chat_id
# 总消息数
total_message_count = count_messages(filter_base)
# bot自身回复数
bot_filter = filter_base.copy()
bot_filter["user_id"] = bot_id
bot_reply_count = count_messages(bot_filter)
return {"bot_reply_count": bot_reply_count, "total_message_count": total_message_count}