This commit is contained in:
Windpicker-owo
2025-09-09 14:55:04 +08:00
12 changed files with 938 additions and 462 deletions

View File

@@ -90,20 +90,20 @@ class CycleTracker:
timer_strings.append(f"{name}: {formatted_time}")
# 获取动作类型,兼容新旧格式
# 获取动作类型
action_type = "未知动作"
if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail:
loop_plan_info = self._current_cycle_detail.loop_plan_info
if isinstance(loop_plan_info, dict):
action_result = loop_plan_info.get("action_result", {})
if isinstance(action_result, dict):
# 旧格式action_result是字典
action_type = action_result.get("action_type", "未知动作")
elif isinstance(action_result, list) and action_result:
# 新格式action_result是actions列表
action_type = action_result[0].get("action_type", "未知动作")
elif isinstance(loop_plan_info, list) and loop_plan_info:
# 直接是actions列表的情况
action_type = loop_plan_info[0].get("action_type", "未知动作")
if self.context.current_cycle_detail:
loop_plan_info = self.context.current_cycle_detail.loop_plan_info
actions = loop_plan_info.get("action_result")
if isinstance(actions, list) and actions:
# 从actions列表中提取所有action_type
action_types = [a.get("action_type", "未知") for a in actions]
action_type = ", ".join(action_types)
elif isinstance(actions, dict):
# 兼容旧格式
action_type = actions.get("action_type", "未知动作")
if self.context.current_cycle_detail.end_time and self.context.current_cycle_detail.start_time:
duration = self.context.current_cycle_detail.end_time - self.context.current_cycle_detail.start_time

View File

@@ -0,0 +1,239 @@
"""
事件驱动的智能调度器
基于asyncio的精确定时事件调度系统替代轮询机制
"""
import asyncio
import time
import traceback
from datetime import datetime, timedelta
from typing import Dict, Callable, Any, Optional
from dataclasses import dataclass
from src.common.logger import get_logger
logger = get_logger("event_scheduler")
@dataclass
class ScheduledEvent:
"""调度事件数据类"""
event_id: str
trigger_time: datetime
callback: Callable
metadata: Dict[str, Any]
task: Optional[asyncio.Task] = None
class EventDrivenScheduler:
"""事件驱动的调度器"""
def __init__(self):
self.scheduled_events: Dict[str, ScheduledEvent] = {}
self._shutdown = False
async def schedule_event(
self,
event_id: str,
trigger_time: datetime,
callback: Callable,
metadata: Dict[str, Any] = None
) -> bool:
"""
调度一个事件在指定时间触发
Args:
event_id: 事件唯一标识
trigger_time: 触发时间
callback: 回调函数
metadata: 事件元数据
Returns:
bool: 调度成功返回True
"""
try:
if metadata is None:
metadata = {}
# 如果事件已存在,先取消
if event_id in self.scheduled_events:
await self.cancel_event(event_id)
# 计算延迟时间
now = datetime.now()
delay = (trigger_time - now).total_seconds()
if delay <= 0:
logger.warning(f"事件 {event_id} 的触发时间已过,立即执行")
# 立即执行
asyncio.create_task(self._execute_callback(event_id, callback, metadata))
return True
# 创建调度事件
scheduled_event = ScheduledEvent(
event_id=event_id,
trigger_time=trigger_time,
callback=callback,
metadata=metadata
)
# 创建异步任务
scheduled_event.task = asyncio.create_task(
self._wait_and_execute(scheduled_event)
)
self.scheduled_events[event_id] = scheduled_event
logger.info(f"调度事件 {event_id} 将在 {trigger_time} 触发 (延迟 {delay:.1f} 秒)")
return True
except Exception as e:
logger.error(f"调度事件失败: {e}")
return False
async def _wait_and_execute(self, event: ScheduledEvent):
"""等待并执行事件"""
try:
now = datetime.now()
delay = (event.trigger_time - now).total_seconds()
if delay > 0:
await asyncio.sleep(delay)
# 检查是否被取消
if self._shutdown or event.event_id not in self.scheduled_events:
return
# 执行回调
await self._execute_callback(event.event_id, event.callback, event.metadata)
except asyncio.CancelledError:
logger.info(f"事件 {event.event_id} 被取消")
except Exception as e:
logger.error(f"执行事件 {event.event_id} 时出错: {e}")
finally:
# 清理已完成的事件
if event.event_id in self.scheduled_events:
del self.scheduled_events[event.event_id]
async def _execute_callback(self, event_id: str, callback: Callable, metadata: Dict[str, Any]):
"""执行回调函数"""
try:
logger.info(f"执行调度事件: {event_id}")
# 根据回调函数签名调用
if asyncio.iscoroutinefunction(callback):
await callback(metadata)
else:
callback(metadata)
except Exception as e:
logger.error(f"执行回调函数失败: {e}")
logger.error(traceback.format_exc())
async def cancel_event(self, event_id: str) -> bool:
"""
取消一个调度事件
Args:
event_id: 事件ID
Returns:
bool: 取消成功返回True
"""
try:
if event_id in self.scheduled_events:
event = self.scheduled_events[event_id]
if event.task and not event.task.done():
event.task.cancel()
del self.scheduled_events[event_id]
logger.info(f"取消调度事件: {event_id}")
return True
return False
except Exception as e:
logger.error(f"取消事件失败: {e}")
return False
async def shutdown(self):
"""关闭调度器,取消所有事件"""
self._shutdown = True
for event_id in list(self.scheduled_events.keys()):
await self.cancel_event(event_id)
logger.info("事件调度器已关闭")
def get_scheduled_events(self) -> Dict[str, ScheduledEvent]:
"""获取所有调度事件"""
return self.scheduled_events.copy()
def get_event_count(self) -> int:
"""获取调度事件数量"""
return len(self.scheduled_events)
# 全局事件调度器实例
event_scheduler = EventDrivenScheduler()
# 便捷函数
async def schedule_reminder(
reminder_id: str,
reminder_time: datetime,
chat_id: str,
reminder_content: str,
callback: Callable
):
"""
调度提醒事件的便捷函数
Args:
reminder_id: 提醒唯一标识
reminder_time: 提醒时间
chat_id: 聊天ID
reminder_content: 提醒内容
callback: 回调函数
"""
metadata = {
"type": "reminder",
"chat_id": chat_id,
"content": reminder_content,
"created_at": datetime.now().isoformat()
}
return await event_scheduler.schedule_event(
event_id=reminder_id,
trigger_time=reminder_time,
callback=callback,
metadata=metadata
)
async def _execute_reminder_callback(subheartflow_id: str, reminder_text: str):
"""执行提醒回调函数"""
try:
# 获取对应的subheartflow实例
from src.chat.heart_flow.heartflow import heartflow
subflow = await heartflow.get_or_create_subheartflow(subheartflow_id)
if not subflow:
logger.error(f"无法获取subheartflow实例: {subheartflow_id}")
return
# 创建主动思考事件,触发完整的思考流程
from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent
event = ProactiveTriggerEvent(
source="reminder_system",
reason=f"定时提醒:{reminder_text}",
metadata={
"reminder_text": reminder_text,
"trigger_time": datetime.now().isoformat()
}
)
# 通过subflow的HeartFChatting实例触发主动思考
await subflow.heart_fc_instance.proactive_thinker.think(event)
logger.info(f"已触发提醒的主动思考,内容: {reminder_text}")
except Exception as e:
logger.error(f"执行提醒回调时发生错误: {e}")
import traceback
traceback.print_exc()

View File

@@ -11,3 +11,4 @@ class ProactiveTriggerEvent:
source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager"
reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo"
metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息
related_message_id: Optional[str] = None # 关联的消息ID用于加载上下文

View File

@@ -1,6 +1,7 @@
import time
import traceback
import orjson
import re
from typing import TYPE_CHECKING, Dict, Any
from src.common.logger import get_logger
@@ -15,7 +16,8 @@ from src.plugin_system.base.component_types import ComponentType
from src.config.config import global_config
from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, build_readable_messages_with_id
from src.mood.mood_manager import mood_manager
from src.common.database.sqlalchemy_database_api import store_action_info
from src.common.database.sqlalchemy_database_api import store_action_info, db_get
from src.common.database.sqlalchemy_models import Messages
if TYPE_CHECKING:
from ..cycle_processor import CycleProcessor
@@ -118,69 +120,203 @@ class ProactiveThinker:
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
# 调用规划器的 PROACTIVE 模式,让其决定下一步的行动
actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
# 如果是提醒事件,跳过规划器,直接构建默认动作
if trigger_event.source == "reminder_system":
# 1. 获取上下文信息
metadata = trigger_event.metadata or {}
action_message = metadata
reminder_content = trigger_event.reason.replace("定时提醒:", "").strip()
# 通常只关心规划出的第一个动作
action_result = actions[0] if actions else {}
# 2. 确定目标用户名
target_user_name = None
match = re.search(r"艾特一下([^,\s]+)", reminder_content)
if match:
target_user_name = match.group(1)
else:
from src.person_info.person_info import get_person_info_manager
user_id = metadata.get("user_id")
platform = metadata.get("platform")
if user_id and platform:
person_id = get_person_info_manager().get_person_id(platform, user_id)
target_user_name = await get_person_info_manager().get_value(person_id, "person_name")
action_type = action_result.get("action_type")
if not target_user_name:
logger.warning(f"无法从提醒 '{reminder_content}' 中确定目标用户,回退")
raise Exception("无法确定目标用户")
# 3. 构建动作
action_result = {
"action_type": "at_user",
"reasoning": "执行定时提醒",
"action_data": {
"user_name": target_user_name,
"at_message": reminder_content
},
"action_message": action_message
}
# 4. 执行或回退
try:
original_chat_id = metadata.get("chat_id")
if not original_chat_id:
if trigger_event.related_message_id:
db_message = await db_get(Messages, {"message_id": trigger_event.related_message_id}, single_result=True) or {}
original_chat_id = db_message.get("chat_id")
if not original_chat_id:
raise Exception("提醒事件中缺少chat_id")
from src.chat.heart_flow.heartflow import heartflow
subflow = await heartflow.get_or_create_subheartflow(original_chat_id)
if not subflow:
raise Exception(f"无法为chat_id {original_chat_id} 获取subflow")
success, _, _ = await subflow.heart_fc_instance.cycle_processor._handle_action(
action=action_result["action_type"],
reasoning=action_result["reasoning"],
action_data=action_result["action_data"],
cycle_timers={},
thinking_id="",
action_message=action_result["action_message"],
)
if not success:
raise Exception("at_user action failed")
except Exception as e:
logger.warning(f"{self.context.log_prefix} at_user动作执行失败: {e}回退到proactive_reply")
fallback_action = {
"action_type": "proactive_reply",
"action_data": {"topic": trigger_event.reason},
"action_message": action_message
}
await self._generate_proactive_content_and_send(fallback_action, trigger_event)
if action_type == "proactive_reply":
await self._generate_proactive_content_and_send(action_result)
elif action_type != "do_nothing":
logger.warning(f"{self.context.log_prefix} 主动思考返回了未知的动作类型: {action_type}")
else:
# 如果规划结果是“什么都不做”,则记录日志
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
# 对于其他来源的主动思考,正常调用规划器
actions, _ = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
action_result = actions[0] if actions else {}
action_type = action_result.get("action_type")
if action_type == "proactive_reply":
await self._generate_proactive_content_and_send(action_result, trigger_event)
elif action_type not in ["do_nothing", "no_action"]:
await self.cycle_processor._handle_action(
action=action_result["action_type"],
reasoning=action_result.get("reasoning", ""),
action_data=action_result.get("action_data", {}),
cycle_timers={},
thinking_id="",
action_message=action_result.get("action_message")
)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())
async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any]):
async def _get_reminder_context(self, message_id: str) -> str:
"""获取提醒消息的上下文"""
try:
# 只获取那一条消息
message_record = await db_get(Messages, {"message_id": message_id}, single_result=True)
if message_record:
# 使用 build_readable_messages_with_id 来格式化单条消息
chat_context_block, _ = build_readable_messages_with_id(messages=[message_record])
return chat_context_block
return "无法加载相关的聊天记录。"
except Exception as e:
logger.error(f"{self.context.log_prefix} 获取提醒上下文失败: {e}")
return "无法加载相关的聊天记录。"
async def _generate_proactive_content_and_send(self, action_result: Dict[str, Any], trigger_event: ProactiveTriggerEvent):
"""
获取实时信息,构建最终的生成提示词,并生成和发送主动回复。
Args:
action_result (Dict[str, Any]): 规划器返回的动作结果。
trigger_event (ProactiveTriggerEvent): 触发事件。
"""
try:
topic = action_result.get("action_data", {}).get("topic", "随便聊聊")
logger.info(f"{self.context.log_prefix} 主动思考确定主题: '{topic}'")
# 1. 获取日程信息
schedule_block = "你今天没有日程安排。"
if global_config.planning_system.schedule_enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = f"你当前正在:{current_activity}"
# 2. 网络搜索
news_block = "暂时没有获取到最新资讯。"
try:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool:
tool_args = {"query": topic, "max_results": 10}
# 调用工具,并传递参数
search_result_dict = await web_search_tool.execute(**tool_args)
if search_result_dict and not search_result_dict.get("error"):
news_block = search_result_dict.get("content", "未能提取有效资讯。")
if trigger_event.source != "reminder_system":
try:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool:
try:
search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10)
except TypeError:
try:
search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10)
except TypeError:
logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索")
news_block = "跳过网络搜索。"
search_result_dict = None
if search_result_dict and not search_result_dict.get("error"):
news_block = search_result_dict.get("content", "未能提取有效资讯。")
elif search_result_dict:
logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}")
else:
logger.warning(f"{self.context.log_prefix} 网络搜索返回错误: {search_result_dict.get('error')}")
else:
logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}")
logger.warning(f"{self.context.log_prefix} 未找到 web_search 工具实例。")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考时网络搜索失败: {e}")
# 3. 获取最新的聊天上下文
message_list = get_raw_msg_before_timestamp_with_chat(
chat_id=self.context.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.3),
if trigger_event.source == "reminder_system" and trigger_event.related_message_id:
chat_context_block = await self._get_reminder_context(trigger_event.related_message_id)
else:
message_list = get_raw_msg_before_timestamp_with_chat(
chat_id=self.context.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.3),
)
chat_context_block, _ = build_readable_messages_with_id(messages=message_list)
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
bot_name = global_config.bot.nickname
confirmation_prompt = f"""# 主动回复二次确认
## 基本信息
你的名字是{bot_name},准备主动发起关于"{topic}"的话题。
## 最近的聊天内容
{chat_context_block}
## 合理判断标准
请检查以下条件,如果**大部分条件都合理**就可以回复:
1. **时间合理性**当前时间是否在深夜凌晨2点-6点这种不适合主动聊天的时段
2. **内容价值**:这个话题"{topic}"是否有意义,不是完全无关紧要的内容?
3. **重复避免**你准备说的话题是否与最近2条消息明显重复
4. **自然性**:在当前上下文中主动提起这个话题是否自然合理?
## 输出要求
如果判断应该跳过比如深夜时段、完全无意义话题、明显重复内容输出SKIP_PROACTIVE_REPLY
其他情况都应该输出PROCEED_TO_REPLY
请严格按照上述格式输出,不要添加任何解释。"""
planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner,
request_type="planner"
)
chat_context_block, _ = build_readable_messages_with_id(messages=message_list)
# 4. 构建最终的生成提示词
confirmation_result, _ = await planner_llm.generate_response_async(prompt=confirmation_prompt)
if not confirmation_result or "SKIP_PROACTIVE_REPLY" in confirmation_result:
logger.info(f"{self.context.log_prefix} 决策模型二次确认决定跳过主动回复")
return
bot_name = global_config.bot.nickname
personality = global_config.personality
identity_block = (
@@ -200,29 +336,30 @@ class ProactiveThinker:
## 你今天的日程安排
{schedule_block}
## 关于你准备讨论的话题{topic}的最新信息
## 关于你准备讨论的话题"{topic}"的最新信息
{news_block}
## 最近的聊天内容
{chat_context_block}
## 任务
之前决定要发起一个关于“{topic}”的对话。现在,请结合以上所有信息,自然地开启这个话题
现在想要主动说些什么。话题是"{topic}",但这只是一个参考方向
根据最近的聊天内容,你可以:
- 如果是想关心朋友,就自然地询问他们的情况
- 如果想起了之前的话题,就问问后来怎么样了
- 如果有什么想分享的想法,就自然地开启话题
- 如果只是想闲聊,就随意地说些什么
## 要求
- 你的发言要听起来像是自发的,而不是在念报告。
- 巧妙地将日程安排或最新信息融入到你的开场白中。
- 风格要符合你的角色设定
- 直接输出你想说的内容,不要包含其他额外信息。
- 像真正的朋友一样,自然地表达关心或好奇
- 不要过于正式,要口语化和亲切
- 合你的角色设定,保持温暖的风格
- 直接输出你想说的,不要解释为什么要说
你的回复应该:
1. 可以分享你的看法、提出相关问题,或者开个合适的玩笑。
2. 目的是让对话更有趣、更深入。
3. 不要浮夸,不要夸张修辞,不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。
最终请输出一条简短、完整且口语化的回复。
请输出一条简短、自然的主动发言。
"""
# 5. 调用生成器API并发送
response_text = await generator_api.generate_response_custom(
chat_stream=self.context.chat_stream,
prompt=final_prompt,

View File

@@ -0,0 +1,260 @@
"""
智能提醒分析器
使用LLM分析用户消息识别提醒请求并提取时间和内容信息
"""
import re
import json
from datetime import datetime, timedelta
from typing import Optional
from src.common.logger import get_logger
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
logger = get_logger("smart_reminder")
class ReminderEvent:
"""提醒事件数据类"""
def __init__(self, user_id: str, reminder_time: datetime, content: str, confidence: float):
self.user_id = user_id
self.reminder_time = reminder_time
self.content = content
self.confidence = confidence
def __repr__(self):
return f"ReminderEvent(user_id={self.user_id}, time={self.reminder_time}, content={self.content}, confidence={self.confidence})"
def to_dict(self):
return {
'user_id': self.user_id,
'reminder_time': self.reminder_time.isoformat(),
'content': self.content,
'confidence': self.confidence
}
class SmartReminderAnalyzer:
"""智能提醒分析器"""
def __init__(self):
self.confidence_threshold = 0.7
# 使用规划器模型进行分析
self.analyzer_llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="reminder_analyzer"
)
async def analyze_message(self, user_id: str, message: str) -> Optional[ReminderEvent]:
"""分析消息是否包含提醒请求
Args:
user_id: 用户ID
message: 用户消息内容
Returns:
ReminderEvent对象如果没有检测到提醒请求则返回None
"""
if not message or len(message.strip()) == 0:
return None
logger.debug(f"分析消息中的提醒请求: {message}")
# 使用LLM分析消息
analysis_result = await self._analyze_with_llm(message)
if not analysis_result or analysis_result.get('confidence', 0) < 0.5: # 降低置信度阈值
return None
try:
# 解析时间
reminder_time = self._parse_relative_time(analysis_result['relative_time'])
if not reminder_time:
return None
# 创建提醒事件
reminder_event = ReminderEvent(
user_id=user_id,
reminder_time=reminder_time,
content=analysis_result.get('content', '提醒'),
confidence=analysis_result['confidence']
)
logger.info(f"检测到提醒请求: {reminder_event}")
return reminder_event
except Exception as e:
logger.error(f"创建提醒事件失败: {e}")
return None
async def _analyze_with_llm(self, message: str) -> Optional[dict]:
"""使用LLM分析消息中的提醒请求"""
try:
prompt = f"""分析以下消息是否包含提醒请求。
消息: {message}
当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
请判断用户是否想要设置提醒,如果是,请提取:
1. 是否包含提醒请求 (has_reminder: true/false)
2. 置信度 (confidence: 0.0-1.0)
3. 相对时间表达 (relative_time: 标准化的时间表达,例如将'半小时后'转换为'30分钟后', '明天下午三点'转换为'明天15点')
4. 提醒内容 (content: 提醒的具体内容)
5. 分析原因 (reasoning: 判断理由)
请以JSON格式输出:
{{
"has_reminder": true/false,
"confidence": 0.0-1.0,
"relative_time": "标准化的时间表达 (例如 '30分钟后', '2小时后')",
"content": "提醒内容",
"reasoning": "判断理由"
}}"""
response, _ = await self.analyzer_llm.generate_response_async(prompt=prompt)
if not response:
return None
# 解析JSON响应处理可能的markdown包装
try:
# 清理响应文本
cleaned_response = response.strip()
# 移除markdown代码块包装
if cleaned_response.startswith('```json'):
cleaned_response = cleaned_response[7:] # 移除 ```json
elif cleaned_response.startswith('```'):
cleaned_response = cleaned_response[3:] # 移除 ```
if cleaned_response.endswith('```'):
cleaned_response = cleaned_response[:-3] # 移除结尾的 ```
cleaned_response = cleaned_response.strip()
# 解析JSON
result = json.loads(cleaned_response)
if result.get('has_reminder', False):
logger.info(f"LLM分析结果: {result}")
return result
except json.JSONDecodeError as e:
logger.error(f"LLM响应JSON解析失败: {response}, Error: {e}")
# 尝试使用更宽松的JSON修复
try:
import re
# 提取JSON部分的正则表达式
json_match = re.search(r'\{.*\}', cleaned_response, re.DOTALL)
if json_match:
json_str = json_match.group()
result = json.loads(json_str)
if result.get('has_reminder', False):
logger.info(f"备用解析成功: {result}")
return result
except Exception as fallback_error:
logger.error(f"备用JSON解析也失败了: {fallback_error}")
except Exception as e:
logger.error(f"LLM分析失败: {e}")
return None
def _parse_relative_time(self, time_expr: str) -> Optional[datetime]:
"""解析时间表达式(支持相对时间和绝对时间)"""
try:
now = datetime.now()
# 1. 匹配相对时间X分钟后包括中文数字
# 先尝试匹配阿拉伯数字
minutes_match = re.search(r'(\d+)\s*分钟后', time_expr)
if minutes_match:
minutes = int(minutes_match.group(1))
result = now + timedelta(minutes=minutes)
logger.info(f"相对时间解析结果: timedelta(minutes={minutes}) -> {result}")
return result
# 匹配中文数字分钟
chinese_minutes_patterns = [
(r'一分钟后', 1), (r'二分钟后', 2), (r'两分钟后', 2), (r'三分钟后', 3), (r'四分钟后', 4), (r'五分钟后', 5),
(r'六分钟后', 6), (r'七分钟后', 7), (r'八分钟后', 8), (r'九分钟后', 9), (r'十分钟后', 10),
(r'十一分钟后', 11), (r'十二分钟后', 12), (r'十三分钟后', 13), (r'十四分钟后', 14), (r'十五分钟后', 15),
(r'二十分钟后', 20), (r'三十分钟后', 30), (r'四十分钟后', 40), (r'五十分钟后', 50), (r'六十分钟后', 60)
]
for pattern, minutes in chinese_minutes_patterns:
if re.search(pattern, time_expr):
result = now + timedelta(minutes=minutes)
logger.info(f"中文时间解析结果: {pattern} -> {minutes}分钟 -> {result}")
return result
# 2. 匹配相对时间X小时后
hours_match = re.search(r'(\d+)\s*小时后', time_expr)
if hours_match:
hours = int(hours_match.group(1))
result = now + timedelta(hours=hours)
logger.info(f"相对时间解析结果: timedelta(hours={hours})")
return result
# 3. 匹配相对时间X秒后
seconds_match = re.search(r'(\d+)\s*秒后', time_expr)
if seconds_match:
seconds = int(seconds_match.group(1))
result = now + timedelta(seconds=seconds)
logger.info(f"相对时间解析结果: timedelta(seconds={seconds})")
return result
# 4. 匹配明天+具体时间明天下午2点、明天上午10点
tomorrow_match = re.search(r'明天.*?(\d{1,2})\s*[点时]', time_expr)
if tomorrow_match:
hour = int(tomorrow_match.group(1))
# 如果是下午且小于12加12小时
if '下午' in time_expr and hour < 12:
hour += 12
elif '上午' in time_expr and hour == 12:
hour = 0
tomorrow = now + timedelta(days=1)
result = tomorrow.replace(hour=hour, minute=0, second=0, microsecond=0)
logger.info(f"绝对时间解析结果: 明天{hour}")
return result
# 5. 匹配今天+具体时间今天下午3点、今天晚上8点
today_match = re.search(r'今天.*?(\d{1,2})\s*[点时]', time_expr)
if today_match:
hour = int(today_match.group(1))
# 如果是下午且小于12加12小时
if '下午' in time_expr and hour < 12:
hour += 12
elif '晚上' in time_expr and hour < 12:
hour += 12
elif '上午' in time_expr and hour == 12:
hour = 0
result = now.replace(hour=hour, minute=0, second=0, microsecond=0)
# 如果时间已过,设为明天
if result <= now:
result += timedelta(days=1)
logger.info(f"绝对时间解析结果: 今天{hour}")
return result
# 6. 匹配纯数字时间14点、2点
pure_time_match = re.search(r'(\d{1,2})\s*[点时]', time_expr)
if pure_time_match:
hour = int(pure_time_match.group(1))
result = now.replace(hour=hour, minute=0, second=0, microsecond=0)
# 如果时间已过,设为明天
if result <= now:
result += timedelta(days=1)
logger.info(f"绝对时间解析结果: {hour}")
return result
except Exception as e:
logger.error(f"时间解析失败: {time_expr}, Error: {e}")
return None
# 全局智能提醒分析器实例
smart_reminder_analyzer = SmartReminderAnalyzer()

View File

@@ -2,6 +2,7 @@ import asyncio
import re
import math
import traceback
from datetime import datetime
from typing import Tuple, TYPE_CHECKING
@@ -16,6 +17,7 @@ from src.chat.utils.chat_message_builder import replace_user_references_sync
from src.common.logger import get_logger
from src.person_info.relationship_manager import get_relationship_manager
from src.mood.mood_manager import mood_manager
from src.chat.message_receive.chat_stream import get_chat_manager
if TYPE_CHECKING:
from src.chat.heart_flow.sub_heartflow import SubHeartflow
@@ -116,10 +118,11 @@ class HeartFCMessageReceiver:
主要流程:
1. 消息解析与初始化
2. 消息缓冲处理
3. 过滤检查
4. 兴趣度计算
5. 关系处理
2. 智能提醒分析
3. 消息缓冲处理
4. 过滤检查
5. 兴趣度计算
6. 关系处理
Args:
message_data: 原始消息字符串
@@ -129,7 +132,93 @@ class HeartFCMessageReceiver:
userinfo = message.message_info.user_info
chat = message.chat_stream
# 2. 兴趣度计算与更新
# 2. 智能提醒分析 - 检查用户是否请求提醒
from src.chat.chat_loop.proactive.smart_reminder_analyzer import smart_reminder_analyzer
from src.chat.chat_loop.proactive.event_scheduler import event_scheduler
try:
reminder_event = await smart_reminder_analyzer.analyze_message(
userinfo.user_id, # type: ignore
message.processed_plain_text
)
if reminder_event:
logger.info(f"检测到提醒请求: {reminder_event}")
# 创建提醒回调函数
async def reminder_callback(metadata):
"""提醒执行回调函数 - 触发完整的主动思考流程"""
try:
# 获取对应的subheartflow实例
from src.chat.heart_flow.heartflow import heartflow
subflow = await heartflow.get_or_create_subheartflow(metadata.get("chat_id"))
if not subflow:
logger.error(f"无法获取subheartflow实例: {metadata.get('chat_id')}")
return
# 创建主动思考事件,触发完整的思考流程
from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent
reminder_content = metadata.get('content', '提醒时间到了')
event = ProactiveTriggerEvent(
source="reminder_system",
reason=f"定时提醒:{reminder_content}",
metadata=metadata,
related_message_id=metadata.get("original_message_id")
)
# 通过subflow的HeartFChatting实例触发主动思考
await subflow.heart_fc_instance.proactive_thinker.think(event)
logger.info(f"已触发提醒的主动思考,内容: {reminder_content}")
except Exception as callback_error:
logger.error(f"执行提醒回调失败: {callback_error}")
import traceback
logger.error(traceback.format_exc())
# Fallback: 如果主动思考失败,直接发送提醒消息
try:
from src.plugin_system.apis.send_api import text_to_stream
reminder_content = metadata.get('content', '提醒时间到了')
await text_to_stream(
text=f"⏰ 提醒:{reminder_content}",
stream_id=metadata.get("chat_id"),
typing=False
)
logger.info(f"Fallback提醒消息已发送: {reminder_content}")
except Exception as fallback_error:
logger.error(f"Fallback提醒也失败了: {fallback_error}")
# 调度提醒事件
event_id = f"reminder_{reminder_event.user_id}_{int(reminder_event.reminder_time.timestamp())}"
metadata = {
"type": "reminder",
"user_id": reminder_event.user_id,
"platform": chat.platform,
"chat_id": chat.stream_id,
"content": reminder_event.content,
"confidence": reminder_event.confidence,
"created_at": datetime.now().isoformat(),
"original_message_id": message.message_info.message_id
}
success = await event_scheduler.schedule_event(
event_id=event_id,
trigger_time=reminder_event.reminder_time,
callback=reminder_callback,
metadata=metadata
)
if success:
logger.info(f"提醒事件调度成功: {event_id}")
else:
logger.error(f"提醒事件调度失败: {event_id}")
except Exception as e:
logger.error(f"智能提醒分析失败: {e}")
# 3. 兴趣度计算与更新
interested_rate, is_mentioned, keywords = await _calculate_interest(message)
message.interest_value = interested_rate
message.is_mentioned = is_mentioned

View File

@@ -50,6 +50,7 @@ def init_prompt():
{time_block}
{identity_block}
{users_in_chat}
{custom_prompt_block}
{chat_context_description},以下是具体的聊天内容。
{chat_content_block}
@@ -75,9 +76,9 @@ def init_prompt():
{action_options_text}
你必须从上面列出的可用action中选择一个并说明触发action的消息id不是消息原文和选择该action的原因。消息id格式:m+数字
你必须从上面列出的可用action中选择一个或多个并说明触发action的消息id不是消息原文和选择该action的原因。消息id格式:m+数字
请根据动作示例,以严格的 JSON 格式输出不要输出markdown格式```json等内容直接输出且仅包含 JSON 内容:
请根据动作示例,以严格的 JSON 格式输出,返回一个包含所有选定动作的JSON列表。如果只选择一个动作也请将其包含在列表中。如果没有任何合适的动作返回一个空列表[]。不要输出markdown格式```json等内容直接输出且仅包含 JSON 列表内容:
""",
"planner_prompt",
)
@@ -102,29 +103,38 @@ def init_prompt():
{actions_before_now_block}
## 任务
基于以上所有信息(特别是最近的聊天内容),分析当前情况,决定是否适合主动开启一个**新的、但又与当前氛围相关**的话题
你现在要决定是否主动说些什么。就像一个真实的人一样,有时候会突然想起之前聊到的话题,或者对朋友的近况感到好奇,想主动询问或关心一下
请基于聊天内容,用你的判断力来决定是否要主动发言。不要按照固定规则,而是像人类一样自然地思考:
- 是否想起了什么之前提到的事情,想问问后来怎么样了?
- 是否注意到朋友提到了什么值得关心的事情?
- 是否有什么话题突然想到,觉得现在聊聊很合适?
- 或者觉得现在保持沉默更好?
## 可用动作
动作proactive_reply
动作描述:在当前对话的基础上,主动发起一个新的对话,分享一个有趣的想法、见闻或者对未来的计划
- 当你觉得可以说些什么来活跃气氛,并且内容与当前聊天氛围不冲突
- 当你有一些新的想法或计划想要分享,并且可以自然地衔接当前话题
动作描述:主动发起对话,可以是关心朋友、询问近况、延续之前的话题,或分享想法
- 当你突然想起之前的话题,想询问进展
- 当你想关心朋友的情况
- 当你有什么想法想分享时
- 当你觉得现在是个合适的聊天时机时
{{
"action": "proactive_reply",
"reason": "决定主动发起对话的具体原因",
"topic": "你想要发起对话的主题或内容(需要简洁"
"reason": "决定主动发的具体原因",
"topic": "你想说的内容主题(简洁描述"
}}
动作do_nothing
动作描述:保持沉默,不主动发起任何动作或对话。
- 当你分析了所有信息后,觉得当前不是一个发起互动的好时机时
- 当最近的聊天内容很连贯,你的插入会打断别人
动作描述:保持沉默,不主动发起对话。
- 当你觉得现在不是合适的时机时
- 当最近已经说得够多了
- 当对话氛围不适合插入时
{{
"action": "do_nothing",
"reason":"决定保持沉默的具体原因"
"reason": "决定保持沉默的原因"
}}
你必须从上面列出的可用action中选择一个。
你必须从上面列出的可用action中选择一个。要像真人一样自然地思考和决策。
请以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"proactive_planner_prompt",
@@ -144,38 +154,6 @@ def init_prompt():
"action_prompt",
)
Prompt(
"""
{name_block}
{personality_block}
{chat_context_description}{time_block}现在请你根据以下聊天内容选择一个或多个合适的action。如果没有合适的action请选择no_action。,
{chat_content_block}
**要求**
1.action必须符合使用条件如果符合条件就选择
2.如果聊天内容不适合使用action即使符合条件也不要使用
3.{moderation_prompt}
4.请注意如果相同的内容已经被执行,请不要重复执行
这是你最近执行过的动作:
{actions_before_now_block}
**可用的action**
no_action不选择任何动作
{{
"action": "no_action",
"reason":"不动作的原因"
}}
{action_options_text}
请选择并说明触发action的消息id和选择该action的原因。消息id格式:m+数字
请根据动作示例,以严格的 JSON 格式输出,且仅包含 JSON 内容:
""",
"sub_planner_prompt",
)
class ActionPlanner:
def __init__(self, chat_id: str, action_manager: ActionManager):
@@ -187,11 +165,6 @@ class ActionPlanner:
self.planner_llm = LLMRequest(
model_set=model_config.model_task_config.planner, request_type="planner"
)
# --- 小脑 (新增) ---
self.planner_small_llm = LLMRequest(
model_set=model_config.model_task_config.planner_small, request_type="planner_small"
)
self.last_obs_time_mark = 0.0
async def _get_long_term_memory_context(self) -> str:
@@ -294,14 +267,14 @@ class ActionPlanner:
# 假设消息列表是按时间顺序排列的,最后一个是最新的
return message_id_list[-1].get("message")
def _parse_single_action(
async def _parse_single_action(
self,
action_json: dict,
message_id_list: list, # 使用 planner.py 的 list of dict
current_available_actions: list, # 使用 planner.py 的 list of tuple
) -> List[Dict[str, Any]]:
"""
[注释] 解析单个小脑LLM返回的action JSON并将其转换为标准化的字典。
[注释] 解析单个LLM返回的action JSON并将其转换为标准化的字典。
"""
parsed_actions = []
try:
@@ -310,7 +283,7 @@ class ActionPlanner:
action_data = {k: v for k, v in action_json.items() if k not in ["action", "reason"]}
target_message = None
if action != "no_action":
if action not in ["no_action", "no_reply"]:
if target_message_id := action_json.get("target_message_id"):
target_message = self.find_message_by_id(target_message_id, message_id_list)
if target_message is None:
@@ -320,7 +293,7 @@ class ActionPlanner:
logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id")
available_action_names = [name for name, _ in current_available_actions]
if action not in ["no_action", "reply"] and action not in available_action_names:
if action not in ["no_action", "no_reply", "reply"] and action not in available_action_names:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {available_action_names}),将强制使用 'no_action'"
)
@@ -338,6 +311,16 @@ class ActionPlanner:
"available_actions": available_actions_dict,
}
)
# 如果是at_user动作且只有user_name尝试转换为user_id
if action == "at_user" and "user_name" in action_data and "user_id" not in action_data:
user_name = action_data["user_name"]
from src.person_info.person_info import get_person_info_manager
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if user_info and user_info.get("user_id"):
action_data["user_id"] = user_info["user_id"]
logger.info(f"成功将用户名 '{user_name}' 解析为 user_id '{user_info['user_id']}'")
else:
logger.warning(f"无法将用户名 '{user_name}' 解析为 user_id")
except Exception as e:
logger.error(f"{self.log_prefix}解析单个action时出错: {e}")
parsed_actions.append(
@@ -362,285 +345,84 @@ class ActionPlanner:
# 如果都是 no_action则返回一个包含第一个 no_action 的列表,以保留 reason
return action_list[:1] if action_list else []
async def sub_plan(
self,
action_list: list, # 使用 planner.py 的 list of tuple
chat_content_block: str,
message_id_list: list, # 使用 planner.py 的 list of dict
is_group_chat: bool = False,
chat_target_info: Optional[dict] = None,
) -> List[Dict[str, Any]]:
"""
[注释] "小脑"规划器。接收一小组actions使用轻量级LLM判断其中哪些应该被触发。
这是一个独立的、并行的思考单元。返回一个包含action字典的列表。
"""
try:
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time() - 1200,
timestamp_end=time.time(),
limit=20,
)
action_names_in_list = [name for name, _ in action_list]
filtered_actions = [
record for record in actions_before_now if record.get("action_name") in action_names_in_list
]
actions_before_now_block = build_readable_actions(actions=filtered_actions)
chat_context_description = "你现在正在一个群聊中"
if not is_group_chat and chat_target_info:
chat_target_name = chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = ""
for using_actions_name, using_actions_info in action_list:
param_text = ""
if using_actions_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"'
for p_name, p_desc in using_actions_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in using_actions_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=using_actions_name,
action_description=using_actions_info.description,
action_parameters=param_text,
action_require=require_text,
)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
name_block = f"你的名字是{bot_name}{bot_nickname},请注意哪些是你自己的发言。"
# 构建人格信息块(仅在启用时)
personality_block = ""
if global_config.chat.include_personality:
personality_core = global_config.personality.personality_core
personality_side = global_config.personality.personality_side
if personality_core or personality_side:
personality_parts = []
if personality_core:
personality_parts.append(f"核心人格:{personality_core}")
if personality_side:
personality_parts.append(f"人格侧面:{personality_side}")
personality_block = "你的人格特征是:" + "".join(personality_parts)
planner_prompt_template = await global_prompt_manager.get_prompt_async("sub_planner_prompt")
prompt = planner_prompt_template.format(
time_block=time_block,
chat_context_description=chat_context_description,
chat_content_block=chat_content_block,
actions_before_now_block=actions_before_now_block,
action_options_text=action_options_block,
moderation_prompt=moderation_prompt_block,
name_block=name_block,
personality_block=personality_block,
)
except Exception as e:
logger.error(f"构建小脑提示词时出错: {e}\n{traceback.format_exc()}")
return [{"action_type": "no_action", "reasoning": f"构建小脑Prompt时出错: {e}"}]
action_dicts: List[Dict[str, Any]] = []
try:
llm_content, (reasoning_content, _, _) = await self.planner_small_llm.generate_response_async(prompt=prompt)
if global_config.debug.show_prompt:
logger.info(f"{self.log_prefix}小脑原始提示词: {prompt}")
logger.info(f"{self.log_prefix}小脑原始响应: {llm_content}")
else:
logger.debug(f"{self.log_prefix}小脑原始响应: {llm_content}")
if llm_content:
parsed_json = orjson.loads(repair_json(llm_content))
if isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict):
action_dicts.extend(self._parse_single_action(item, message_id_list, action_list))
elif isinstance(parsed_json, dict):
action_dicts.extend(self._parse_single_action(parsed_json, message_id_list, action_list))
except Exception as e:
logger.warning(f"{self.log_prefix}解析小脑响应JSON失败: {e}. LLM原始输出: '{llm_content}'")
action_dicts.append({"action_type": "no_action", "reasoning": f"解析小脑响应失败: {e}"})
if not action_dicts:
action_dicts.append({"action_type": "no_action", "reasoning": "小脑未返回有效action"})
return action_dicts
async def plan(
self,
mode: ChatMode = ChatMode.FOCUS,
loop_start_time: float = 0.0,
available_actions: Optional[Dict[str, ActionInfo]] = None,
pseudo_message: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""
[注释] "大脑"规划器。
1. 启动多个并行的"小脑"(sub_plan)来决定是否执行具体的actions。
2. 自己(大脑)则专注于决定是否进行聊天回复(reply)。
3. 整合大脑和小脑的决策,返回最终要执行的动作列表。
统一决策是否进行聊天回复(reply)以及执行哪些actions。
"""
# --- 1. 准备上下文信息 ---
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.chat_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
# 大脑使用较长的上下文
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal",
read_mark=self.last_obs_time_mark,
truncate=True,
show_actions=True,
)
# 小脑使用较短、较新的上下文
message_list_before_now_short = message_list_before_now[-int(global_config.chat.max_context_size * 0.3) :]
chat_content_block_short, message_id_list_short = build_readable_messages_with_id(
messages=message_list_before_now_short,
timestamp_mode="normal",
truncate=False,
show_actions=False,
)
self.last_obs_time_mark = time.time()
is_group_chat, chat_target_info, current_available_actions = self.get_necessary_info()
if available_actions is None:
available_actions = current_available_actions
# --- 2. 启动小脑并行思考 ---
all_sub_planner_results: List[Dict[str, Any]] = []
# --- 2. 大脑统一决策 ---
final_actions: List[Dict[str, Any]] = []
try:
sub_planner_actions: Dict[str, ActionInfo] = {}
for action_name, action_info in available_actions.items():
if action_info.activation_type in [ActionActivationType.LLM_JUDGE, ActionActivationType.ALWAYS]:
sub_planner_actions[action_name] = action_info
elif action_info.activation_type == ActionActivationType.RANDOM:
if random.random() < action_info.random_activation_probability:
sub_planner_actions[action_name] = action_info
elif action_info.activation_type == ActionActivationType.KEYWORD:
if any(keyword in chat_content_block_short for keyword in action_info.activation_keywords):
sub_planner_actions[action_name] = action_info
if sub_planner_actions:
sub_planner_actions_num = len(sub_planner_actions)
planner_size_config = global_config.chat.planner_size
sub_planner_size = int(planner_size_config) + (
1 if random.random() < planner_size_config - int(planner_size_config) else 0
)
sub_planner_num = math.ceil(sub_planner_actions_num / sub_planner_size)
logger.info(f"{self.log_prefix}使用{sub_planner_num}个小脑进行思考 (尺寸: {sub_planner_size})")
action_items = list(sub_planner_actions.items())
random.shuffle(action_items)
sub_planner_lists = [action_items[i::sub_planner_num] for i in range(sub_planner_num)]
sub_plan_tasks = [
self.sub_plan(
action_list=action_group,
chat_content_block=chat_content_block_short,
message_id_list=message_id_list_short,
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
)
for action_group in sub_planner_lists
]
sub_plan_results = await asyncio.gather(*sub_plan_tasks)
for sub_result in sub_plan_results:
all_sub_planner_results.extend(sub_result)
sub_actions_str = ", ".join(
a["action_type"] for a in all_sub_planner_results if a["action_type"] != "no_action"
) or "no_action"
logger.info(f"{self.log_prefix}小脑决策: [{sub_actions_str}]")
except Exception as e:
logger.error(f"{self.log_prefix}小脑调度过程中出错: {e}\n{traceback.format_exc()}")
# --- 3. 大脑独立思考是否回复 ---
action, reasoning, action_data, target_message = "no_reply", "大脑初始化默认", {}, None
try:
prompt, _ = await self.build_planner_prompt(
prompt, used_message_id_list = await self.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions={},
current_available_actions=available_actions,
mode=mode,
chat_content_block_override=chat_content_block,
message_id_list_override=message_id_list,
)
llm_content, _ = await self.planner_llm.generate_response_async(prompt=prompt)
if llm_content:
parsed_json = orjson.loads(repair_json(llm_content))
parsed_json = parsed_json[-1] if isinstance(parsed_json, list) and parsed_json else parsed_json
# 确保处理的是列表
if isinstance(parsed_json, dict):
action = parsed_json.get("action", "no_reply")
reasoning = parsed_json.get("reason", "未提供原因")
action_data = {k: v for k, v in parsed_json.items() if k not in ["action", "reason"]}
if action != "no_reply":
if target_id := parsed_json.get("target_message_id"):
target_message = self.find_message_by_id(target_id, message_id_list)
if not target_message:
target_message = self.get_latest_message(message_id_list)
logger.info(f"{self.log_prefix}大脑决策: [{action}]")
parsed_json = [parsed_json]
if isinstance(parsed_json, list):
for item in parsed_json:
if isinstance(item, dict):
final_actions.extend(await self._parse_single_action(item, used_message_id_list, list(available_actions.items())))
# 如果是私聊且开启了强制回复并且没有任何回复性action则强制添加reply
if not is_group_chat and global_config.chat.force_reply_private:
has_reply_action = any(a.get("action_type") == "reply" for a in final_actions)
if not has_reply_action:
final_actions.append({
"action_type": "reply",
"reasoning": "私聊强制回复",
"action_data": {},
"action_message": self.get_latest_message(used_message_id_list),
"available_actions": available_actions,
})
logger.info(f"{self.log_prefix}私聊强制回复已触发,添加 'reply' 动作")
logger.info(f"{self.log_prefix}大脑决策: {[a.get('action_type') for a in final_actions]}")
except Exception as e:
logger.error(f"{self.log_prefix}大脑处理过程中发生意外错误: {e}\n{traceback.format_exc()}")
action, reasoning = "no_reply", f"大脑处理错误: {e}"
# --- 4. 整合大脑和小脑的决策 ---
# 如果是私聊且开启了强制回复则将no_reply强制改为reply
if not is_group_chat and global_config.chat.force_reply_private and action == "no_reply":
action = "reply"
reasoning = "私聊强制回复"
logger.info(f"{self.log_prefix}私聊强制回复已触发,将动作从 'no_reply' 修改为 'reply'")
is_parallel = True
for info in all_sub_planner_results:
action_type = info.get("action_type")
if action_type and action_type not in ["no_action", "no_reply"]:
action_info = available_actions.get(action_type)
if action_info and not action_info.parallel_action:
is_parallel = False
break
action_data["loop_start_time"] = loop_start_time
final_actions: List[Dict[str, Any]] = []
if is_parallel:
logger.info(f"{self.log_prefix}决策模式: 大脑与小脑并行")
if action not in ["no_action", "no_reply"]:
final_actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions,
}
)
final_actions.extend(all_sub_planner_results)
else:
logger.info(f"{self.log_prefix}决策模式: 小脑优先 (检测到非并行action)")
final_actions.extend(all_sub_planner_results)
final_actions.append({"action_type": "no_action", "reasoning": f"大脑处理错误: {e}"})
# --- 3. 后处理 ---
final_actions = self._filter_no_actions(final_actions)
if not final_actions:
final_actions = [
{
"action_type": "no_action",
"reasoning": "所有规划器选择不执行动作",
"reasoning": "规划器选择不执行动作",
"action_data": {}, "action_message": None, "available_actions": available_actions
}
]
final_target_message = target_message
if not final_target_message and final_actions:
final_target_message = next((act.get("action_message") for act in final_actions if act.get("action_message")), None)
final_target_message = next((act.get("action_message") for act in final_actions if act.get("action_message")), None)
# 记录每个动作的原因
for action_info in final_actions:
action_type = action_info.get("action_type", "N/A")
reasoning = action_info.get("reasoning", "")
logger.info(f"{self.log_prefix}决策: [{action_type}],原因: {reasoning}")
actions_str = ", ".join([a.get('action_type', 'N/A') for a in final_actions])
logger.info(f"{self.log_prefix}最终执行动作 ({len(final_actions)}): [{actions_str}]")
@@ -653,8 +435,6 @@ class ActionPlanner:
chat_target_info: Optional[dict],
current_available_actions: Dict[str, ActionInfo],
mode: ChatMode = ChatMode.FOCUS,
chat_content_block_override: Optional[str] = None,
message_id_list_override: Optional[List] = None,
refresh_time: bool = False, # 添加缺失的参数
) -> tuple[str, list]:
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
@@ -688,7 +468,7 @@ class ActionPlanner:
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.2), # 主动思考时只看少量最近消息
)
chat_content_block, _ = build_readable_messages_with_id(
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_short,
timestamp_mode="normal",
truncate=False,
@@ -714,7 +494,7 @@ class ActionPlanner:
chat_content_block=chat_content_block or "最近没有聊天内容。",
actions_before_now_block=actions_before_now_block,
)
return prompt, []
return prompt, message_id_list
# --- FOCUS 和 NORMAL 模式的逻辑 ---
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
@@ -722,7 +502,6 @@ class ActionPlanner:
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal",
@@ -789,6 +568,14 @@ class ActionPlanner:
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
from src.person_info.person_info import get_person_info_manager
users_in_chat_str = ""
if is_group_chat and chat_target_info and chat_target_info.get("group_id"):
user_list = await get_person_info_manager().get_specific_value_list("person_name", lambda x: x is not None)
if user_list:
users_in_chat_str = "当前聊天中的用户列表(用于@\n" + "\n".join([f"- {name} (ID: {pid})" for pid, name in user_list.items()]) + "\n"
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
@@ -805,6 +592,7 @@ class ActionPlanner:
identity_block=identity_block,
custom_prompt_block=custom_prompt_block,
bot_name=bot_name,
users_in_chat=users_in_chat_str
)
return prompt, message_id_list
except Exception as e:

View File

@@ -135,7 +135,6 @@ class ModelTaskConfig(ValidatedConfigBase):
voice: TaskConfig = Field(..., description="语音识别模型配置")
tool_use: TaskConfig = Field(..., description="专注工具使用模型配置")
planner: TaskConfig = Field(..., description="规划模型配置")
planner_small: TaskConfig = Field(..., description="小脑sub-planner规划模型配置")
embedding: TaskConfig = Field(..., description="嵌入模型配置")
lpmm_entity_extract: TaskConfig = Field(..., description="LPMM实体提取模型配置")
lpmm_rdf_build: TaskConfig = Field(..., description="LPMM RDF构建模型配置")

View File

@@ -92,8 +92,6 @@ class ChatConfig(ValidatedConfigBase):
default_factory=list, description="启用主动思考的群聊范围格式platform:group_id为空则不限制"
)
delta_sigma: int = Field(default=120, description="采用正态分布随机时间间隔")
planner_size: float = Field(default=5.0, ge=1.0, description="小脑sub-planner的尺寸决定每个小脑处理多少个action")
include_personality: bool = Field(default=False, description="是否在小脑决策中包含角色人设信息")
def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
"""

View File

@@ -21,12 +21,12 @@ class AtAction(BaseAction):
# === 基本信息(必须填写)===
action_name = "at_user"
action_description = "发送艾特消息"
activation_type = ActionActivationType.LLM_JUDGE # 消息接收时激活(?)
activation_type = ActionActivationType.LLM_JUDGE
parallel_action = False
chat_type_allow = ChatType.GROUP
# === 功能描述(必须填写)===
action_parameters = {"user_name": "需要艾特用户的名字", "at_message": "艾特用户时要发送的消,注意消息里不要有@"}
action_parameters = {"user_name": "需要艾特用户的名字", "at_message": "艾特用户时要发送的消"}
action_require = [
"当需要艾特某个用户时使用",
"当你需要提醒特定用户查看消息时使用",
@@ -48,24 +48,43 @@ class AtAction(BaseAction):
if not user_name or not at_message:
logger.warning("艾特用户的动作缺少必要参数。")
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了艾特用户动作:艾特用户 {user_name} 并发送消息: {at_message},失败了,因为没有提供必要参数",
action_done=False,
)
return False, "缺少必要参数"
user_info = await get_person_info_manager().get_person_info_by_name(user_name)
if not user_info or not user_info.get("user_id"):
logger.info(f"找不到名为 '{user_name}' 的用户。")
from src.plugin_system.apis import send_api
from fuzzywuzzy import process
group_id = self.chat_stream.group_info.group_id
if not group_id:
return False, "无法获取群组ID"
response = await send_api.adapter_command_to_stream(
action="get_group_member_list",
params={"group_id": group_id},
stream_id=self.chat_id,
)
if response.get("status") != "ok":
return False, f"获取群成员列表失败: {response.get('message')}"
member_list = response.get("data", [])
if not member_list:
return False, "群成员列表为空"
# 使用模糊匹配找到最接近的用户名
choices = {member["card"] or member["nickname"]: member["user_id"] for member in member_list}
best_match, score = process.extractOne(user_name, choices.keys())
if score < 30: # 设置一个匹配度阈值
logger.info(f"找不到与 '{user_name}' 高度匹配的用户 (最佳匹配: {best_match}, 分数: {score})")
return False, "用户不存在"
user_id = choices[best_match]
user_info = {"user_id": user_id, "user_nickname": best_match}
try:
# 使用回复器生成艾特回复,而不是直接发送命令
from src.chat.replyer.default_generator import DefaultReplyer
from src.chat.message_receive.chat_stream import get_chat_manager
# 获取当前聊天流
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(self.chat_id)
@@ -73,97 +92,51 @@ class AtAction(BaseAction):
logger.error(f"找不到聊天流: {self.stream_id}")
return False, "聊天流不存在"
# 创建回复器实例
replyer = DefaultReplyer(chat_stream)
# 构建回复对象,将艾特消息作为回复目标
reply_to = f"{user_name}:{at_message}"
extra_info = f"你需要艾特用户 {user_name} 并回复他们说: {at_message}"
# 使用回复器生成回复
success, llm_response, prompt = await replyer.generate_reply_with_context(
reply_to=reply_to,
success, llm_response, _ = await replyer.generate_reply_with_context(
reply_to=f"{user_name}:{at_message}",
extra_info=extra_info,
enable_tool=False, # 艾特回复通常不需要工具调用
enable_tool=False,
from_plugin=False
)
if success and llm_response:
# 获取生成的回复内容
reply_content = llm_response.get("content", "")
if reply_content:
# 获取用户QQ号发送真正的艾特消息
user_id = user_info.get("user_id")
# 发送真正的艾特命令,使用回复器生成的智能内容
await self.send_command(
"SEND_AT_MESSAGE",
args={"qq_id": user_id, "text": reply_content},
display_message=f"艾特用户 {user_name} 并发送智能回复: {reply_content}",
)
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了艾特用户动作:艾特用户 {user_name} 并发送智能回复: {reply_content}",
action_done=True,
)
logger.info(f"成功通过回复器生成智能内容并发送真正的艾特消息给 {user_name}: {reply_content}")
return True, "智能艾特消息发送成功"
else:
logger.warning("回复器生成了空内容")
return False, "回复内容为空"
else:
if not success or not llm_response:
logger.error("回复器生成回复失败")
return False, "回复生成失败"
final_message = llm_response.get("content", "")
if not final_message:
logger.warning("回复器生成了空内容")
return False, "回复内容为空"
await self.send_command(
"SEND_AT_MESSAGE",
args={"group_id": self.chat_stream.group_info.group_id, "qq_id": user_id, "text": final_message},
display_message=f"艾特用户 {user_name} 并发送消息: {final_message}",
)
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行了艾特用户动作:艾特用户 {user_name} 并发送消息: {final_message}",
action_done=True,
)
logger.info(f"成功发送艾特消息给 {user_name}: {final_message}")
return True, "艾特消息发送成功"
except Exception as e:
logger.error(f"执行艾特用户动作时发生异常: {e}", exc_info=True)
await self.store_action_info(
action_build_into_prompt=True,
action_prompt_display=f"执行艾特用户动作失败:{str(e)}",
action_done=False,
)
return False, f"执行失败: {str(e)}"
class AtCommand(BaseCommand):
command_name: str = "at_user"
description: str = "通过名字艾特用户"
command_pattern: str = r"/at\s+@?(?P<name>[\S]+)(?:\s+(?P<text>.*))?"
async def execute(self) -> Tuple[bool, str, bool]:
name = self.matched_groups.get("name")
text = self.matched_groups.get("text", "")
if not name:
await self.send_text("请指定要艾特的用户名称。")
return False, "缺少用户名称", True
person_info_manager = get_person_info_manager()
user_info = await person_info_manager.get_person_info_by_name(name)
if not user_info or not user_info.get("user_id"):
await self.send_text(f"找不到名为 '{name}' 的用户。")
return False, "用户不存在", True
user_id = user_info.get("user_id")
await self.send_command(
"SEND_AT_MESSAGE",
args={"qq_id": user_id, "text": text},
display_message=f"艾特用户 {name} 并发送消息: {text}",
)
return True, "艾特消息已发送", True
@register_plugin
class AtUserPlugin(BasePlugin):
plugin_name: str = "at_user_plugin"
enable_plugin: bool = True
dependencies: list[str] = []
python_dependencies: list[str] = []
python_dependencies: list[str] = ["fuzzywuzzy", "python-Levenshtein"]
config_file_name: str = "config.toml"
config_schema: dict = {}

View File

@@ -1,5 +1,5 @@
[inner]
version = "6.7.9"
version = "6.8.0"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -173,10 +173,6 @@ delta_sigma = 120 # 正态分布的标准差,控制时间间隔的随机程度
# 实验建议:试试 proactive_thinking_interval=0 + delta_sigma 非常大 的纯随机模式!
# 结果保证生成的间隔永远为正数负数会取绝对值最小1秒最大24小时
# --- 大脑/小脑 Planner 配置 ---
planner_size = 5.0 # 小脑sub-planner的尺寸决定每个小脑处理多少个action。数值越小并行度越高但单个小脑的上下文越少。建议范围3.0-8.0
include_personality = false # 是否在小脑决策中包含角色人设信息personality_core、personality_side
[relationship]
enable_relationship = true # 是否启用关系系统

View File

@@ -1,5 +1,5 @@
[inner]
version = "1.3.3"
version = "1.3.4"
# 配置文件版本号迭代规则同bot_config.toml
@@ -142,10 +142,6 @@ model_list = ["siliconflow-deepseek-v3"]
temperature = 0.3
max_tokens = 800
[model_task_config.planner_small] #决策小脑负责决定具体action的模型建议使用速度快的小模型
model_list = ["qwen3-30b"]
temperature = 0.5
max_tokens = 800
[model_task_config.emotion] #负责麦麦的情绪变化
model_list = ["siliconflow-deepseek-v3"]