feat(proactive-thinking): 利用聚焦能量增强能量计算和调度

-重构关系EnergyCalculator,使用聊天流兴趣评分代替用户关系评分
-更新主动思维调度器,使用聊天流中的焦点能量进行间隔计算
-通过更详细的信息改进整个主动思维系统的日志记录
-将chat_stream参数添加到插件工具构造函数中,以更好地处理上下文
-增强调度和事件处理中的错误处理和调试信息
这些变化通过以下方式改善了主动思维系统:
1.使用聊天流中的实时焦点能量,而不是静态用户关系评分
2.根据当前对话参与度提供更准确和动态的日程安排
3.添加全面的日志记录,以便更好地进行调试和监控
4.通过工具中的chat_stream参数确保正确的上下文传播
This commit is contained in:
Windpicker-owo
2025-10-31 15:09:32 +08:00
parent 4aa19cb1ef
commit 94537cf57c
6 changed files with 96 additions and 62 deletions

View File

@@ -38,8 +38,8 @@ class BilibiliTool(BaseTool):
),
]
def __init__(self, plugin_config: dict | None = None):
super().__init__(plugin_config)
def __init__(self, plugin_config: dict | None = None, chat_stream=None):
super().__init__(plugin_config, chat_stream)
self.analyzer = get_bilibili_analyzer()
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:

View File

@@ -191,24 +191,35 @@ class RecencyEnergyCalculator(EnergyCalculator):
class RelationshipEnergyCalculator(EnergyCalculator):
"""关系能量计算器"""
"""关系能量计算器 - 基于聊天流兴趣度"""
async def calculate(self, context: dict[str, Any]) -> float:
"""基于关系计算能量"""
user_id = context.get("user_id")
if not user_id:
"""基于聊天流兴趣度计算能量"""
stream_id = context.get("stream_id")
if not stream_id:
return 0.3
# 使用统一的评分API获取关系分
# 从数据库获取聊天流兴趣分数
try:
from src.plugin_system.apis.scoring_api import scoring_api
from src.common.database.sqlalchemy_database_api import get_db_session
from src.common.database.sqlalchemy_models import ChatStreams
from sqlalchemy import select
relationship_score = await scoring_api.get_user_relationship_score(user_id)
logger.debug(f"使用统一评分API计算关系分: {relationship_score:.3f}")
return relationship_score
async with get_db_session() as session:
stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id)
result = await session.execute(stmt)
stream = result.scalar_one_or_none()
if stream and stream.stream_interest_score is not None:
interest_score = float(stream.stream_interest_score)
logger.debug(f"使用聊天流兴趣度计算关系能量: {interest_score:.3f}")
return interest_score
else:
logger.debug(f"聊天流 {stream_id} 无兴趣分数,使用默认值")
return 0.3
except Exception as e:
logger.warning(f"关系分计算失败,使用默认值: {e}")
logger.warning(f"获取聊天流兴趣度失败,使用默认值: {e}")
return 0.3 # 默认基础分
def get_weight(self) -> float:

View File

@@ -34,46 +34,51 @@ class ProactiveThinkingReplyHandler(BaseEventHandler):
Returns:
HandlerResult: 处理结果
"""
logger.info("[事件] ProactiveThinkingReplyHandler 开始执行")
logger.info("[主动思考事件] ProactiveThinkingReplyHandler 开始执行")
logger.info(f"[主动思考事件] 接收到的参数: {kwargs}")
if not kwargs:
logger.warning("[主动思考事件] kwargs 为空,跳过处理")
return HandlerResult(success=True, continue_process=True, message=None)
stream_id = kwargs.get("stream_id")
if not stream_id:
logger.warning("Reply事件缺少stream_id参数")
logger.warning(f"[主动思考事件] Reply事件缺少stream_id参数kwargs={kwargs}")
return HandlerResult(success=True, continue_process=True, message=None)
logger.info(f"[事件] 收到 AFTER_SEND 事件stream_id={stream_id}")
logger.info(f"[主动思考事件] 收到 AFTER_SEND 事件stream_id={stream_id}")
try:
from src.config.config import global_config
# 检查是否启用reply重置
logger.info(f"[主动思考事件] reply_reset_enabled={global_config.proactive_thinking.reply_reset_enabled}")
if not global_config.proactive_thinking.reply_reset_enabled:
logger.info(f"[主动思考事件] reply_reset_enabled 为 False跳过重置")
return HandlerResult(success=True, continue_process=True, message=None)
# 检查是否被暂停
was_paused = await proactive_thinking_scheduler.is_paused(stream_id)
logger.info(f"[主动思考事件] 聊天流 {stream_id} 暂停状态: {was_paused}")
if was_paused:
logger.info(f"检测到reply事件聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复")
logger.info(f"[主动思考事件] 检测到reply事件聊天流 {stream_id} 之前因抛出话题而暂停,现在恢复")
# 重置定时任务(这会自动清除暂停标记并创建新任务)
logger.debug(f"[事件] 准备调用 schedule_proactive_thinking")
logger.info(f"[主动思考事件] 准备调用 schedule_proactive_thinkingstream_id={stream_id}")
success = await proactive_thinking_scheduler.schedule_proactive_thinking(stream_id)
logger.debug(f"[事件] schedule_proactive_thinking 调用完成success={success}")
logger.info(f"[主动思考事件] schedule_proactive_thinking 调用完成success={success}")
if success:
if was_paused:
logger.info(f"聊天流 {stream_id} 的主动思考已恢复并重置")
logger.info(f"[主动思考事件] ✅ 聊天流 {stream_id} 的主动思考已恢复并重置")
else:
logger.debug(f"聊天流 {stream_id} 的主动思考定时任务已重置")
logger.info(f"[主动思考事件] ✅ 聊天流 {stream_id} 的主动思考定时任务已重置")
else:
logger.warning(f"重置聊天流 {stream_id} 的主动思考任务失败")
logger.warning(f"[主动思考事件] ❌ 重置聊天流 {stream_id} 的主动思考任务失败")
except Exception as e:
logger.error(f"处理reply事件时出错: {e}", exc_info=True)
logger.error(f"[主动思考事件] ❌ 处理reply事件时出错: {e}", exc_info=True)
# 总是继续处理其他handler
return HandlerResult(success=True, continue_process=True, message=None)

View File

@@ -44,37 +44,37 @@ class ProactiveThinkingScheduler:
from src.config.config import global_config
self.config = global_config.proactive_thinking
def _calculate_interval(self, interest_score: float) -> int:
"""根据兴趣分数计算触发间隔
def _calculate_interval(self, focus_energy: float) -> int:
"""根据 focus_energy 计算触发间隔
Args:
interest_score: 聊天流兴趣分数 (0.0-1.0)
focus_energy: 聊天流的 focus_energy 值 (0.0-1.0)
Returns:
int: 触发间隔(秒)
公式:
- 兴趣分数越高,间隔越短(更频繁思考)
- interval = base_interval * (factor - interest_score)
- 例如:interest_score=0.5 -> interval=1800*1.5=2700秒(45分钟)
- interest_score=0.8 -> interval=1800*1.2=2160秒(36分钟)
- interest_score=0.2 -> interval=1800*1.8=3240秒(54分钟)
- focus_energy 越高,间隔越短(更频繁思考)
- interval = base_interval * (factor - focus_energy)
- 例如:focus_energy=0.5 -> interval=1800*1.5=2700秒(45分钟)
- focus_energy=0.8 -> interval=1800*1.2=2160秒(36分钟)
- focus_energy=0.2 -> interval=1800*1.8=3240秒(54分钟)
"""
# 如果不使用兴趣分数,直接返回基础间隔
# 如果不使用 focus_energy,直接返回基础间隔
if not self.config.use_interest_score:
return self.config.base_interval
# 确保分数在有效范围内
interest_score = max(0.0, min(1.0, interest_score))
# 确保在有效范围内
focus_energy = max(0.0, min(1.0, focus_energy))
# 计算间隔:分数越高,系数越小,间隔越短
factor = self.config.interest_score_factor - interest_score
# 计算间隔:focus_energy 越高,系数越小,间隔越短
factor = self.config.interest_score_factor - focus_energy
interval = int(self.config.base_interval * factor)
# 限制在最小和最大间隔之间
interval = max(self.config.min_interval, min(self.config.max_interval, interval))
logger.debug(f"兴趣分数 {interest_score:.2f} -> 触发间隔 {interval}秒 ({interval/60:.1f}分钟)")
logger.debug(f"Focus Energy {focus_energy:.3f} -> 触发间隔 {interval}秒 ({interval/60:.1f}分钟)")
return interval
def _check_whitelist_blacklist(self, stream_config: str) -> bool:
@@ -195,28 +195,36 @@ class ProactiveThinkingScheduler:
else:
return current_time >= start or current_time <= end
async def _get_stream_interest_score(self, stream_id: str) -> float:
"""从数据库获取聊天流的兴趣分数
async def _get_stream_focus_energy(self, stream_id: str) -> float:
"""获取聊天流的 focus_energy
Args:
stream_id: 聊天流ID
Returns:
float: 兴趣分数默认0.5
float: focus_energy 值默认0.5
"""
try:
async with get_db_session() as session:
stmt = select(ChatStreams).where(ChatStreams.stream_id == stream_id)
result = await session.execute(stmt)
stream = result.scalar_one_or_none()
# 从聊天管理器获取聊天流
from src.chat.message_receive.chat_stream import get_chat_manager
if stream and stream.stream_interest_score is not None:
return float(stream.stream_interest_score)
else:
return 0.5 # 默认中等兴趣
logger.debug(f"[调度器] 获取聊天管理器")
chat_manager = get_chat_manager()
logger.debug(f"[调度器] 从聊天管理器获取聊天流 {stream_id}")
chat_stream = await chat_manager.get_stream(stream_id)
if chat_stream:
# 计算并获取最新的 focus_energy
logger.debug(f"[调度器] 找到聊天流,开始计算 focus_energy")
focus_energy = await chat_stream.calculate_focus_energy()
logger.info(f"[调度器] 聊天流 {stream_id} 的 focus_energy: {focus_energy:.3f}")
return focus_energy
else:
logger.warning(f"[调度器] ⚠️ 未找到聊天流 {stream_id},使用默认 focus_energy=0.5")
return 0.5
except Exception as e:
logger.error(f"获取聊天流 {stream_id} 兴趣分数失败: {e}")
logger.error(f"[调度器] ❌ 获取聊天流 {stream_id} 的 focus_energy 失败: {e}", exc_info=True)
return 0.5
async def schedule_proactive_thinking(self, stream_id: str) -> bool:
@@ -228,22 +236,28 @@ class ProactiveThinkingScheduler:
Returns:
bool: 是否成功创建/重置任务
"""
logger.info(f"[调度器] 开始为聊天流 {stream_id} 创建/重置主动思考任务")
try:
async with self._lock:
# 如果该流因抛出话题而暂停,先清除暂停标记
if stream_id in self._paused_streams:
logger.info(f"清除聊天流 {stream_id} 的暂停标记")
logger.info(f"[调度器] 清除聊天流 {stream_id} 的暂停标记")
self._paused_streams.discard(stream_id)
# 如果已经有任务,先移除
if stream_id in self._stream_schedules:
old_schedule_id = self._stream_schedules[stream_id]
logger.info(f"[调度器] 移除聊天流 {stream_id} 的旧任务schedule_id={old_schedule_id}")
await unified_scheduler.remove_schedule(old_schedule_id)
logger.debug(f"移除聊天流 {stream_id} 的旧任务")
logger.info(f"[调度器] 旧任务已移除")
# 获取兴趣分数并计算间隔
interest_score = await self._get_stream_interest_score(stream_id)
interval_seconds = self._calculate_interval(interest_score)
# 获取 focus_energy 并计算间隔
logger.info(f"[调度器] 开始获取聊天流 {stream_id} 的 focus_energy")
focus_energy = await self._get_stream_focus_energy(stream_id)
logger.info(f"[调度器] 获取到 focus_energy={focus_energy:.3f}")
interval_seconds = self._calculate_interval(focus_energy)
logger.info(f"[调度器] 计算得到触发间隔={interval_seconds}秒 ({interval_seconds/60:.1f}分钟)")
# 导入回调函数(延迟导入避免循环依赖)
from src.plugins.built_in.affinity_flow_chatter.proactive_thinking_executor import (
@@ -251,6 +265,7 @@ class ProactiveThinkingScheduler:
)
# 创建新任务
logger.info(f"[调度器] 开始创建新的调度任务")
schedule_id = await unified_scheduler.create_schedule(
callback=execute_proactive_thinking,
trigger_type=TriggerType.TIME,
@@ -263,19 +278,22 @@ class ProactiveThinkingScheduler:
)
self._stream_schedules[stream_id] = schedule_id
logger.info(f"[调度器] 新任务已创建schedule_id={schedule_id}")
# 计算下次触发时间
next_run_time = datetime.now() + timedelta(seconds=interval_seconds)
logger.info(
f"为聊天流 {stream_id} 创建主动思考任务\n"
f" - 兴趣分数: {interest_score:.2f}\n"
f"[调度器] ✅ 为聊天流 {stream_id} 创建主动思考任务成功\n"
f" - Focus Energy: {focus_energy:.3f}\n"
f" - 触发间隔: {interval_seconds}秒 ({interval_seconds/60:.1f}分钟)\n"
f" - 下次触发: {next_run_time.strftime('%Y-%m-%d %H:%M:%S')}"
f" - 下次触发: {next_run_time.strftime('%Y-%m-%d %H:%M:%S')}\n"
f" - Schedule ID: {schedule_id}"
)
return True
except Exception as e:
logger.error(f"[调度器] ❌ 为聊天流 {stream_id} 创建主动思考任务失败: {e}", exc_info=True)
logger.error(f"为聊天流 {stream_id} 创建主动思考任务失败: {e}", exc_info=True)
return False
@@ -308,7 +326,7 @@ class ProactiveThinkingScheduler:
return success
except Exception as e:
logger.error(f"暂停聊天流 {stream_id} 的主动思考任务失败: {e}", exc_info=True)
# 错误日志已在上面记录
return False
async def resume_proactive_thinking(self, stream_id: str) -> bool:

View File

@@ -34,8 +34,8 @@ class URLParserTool(BaseTool):
("urls", ToolParamType.STRING, "要理解的网站", True, None),
]
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
def __init__(self, plugin_config=None, chat_stream=None):
super().__init__(plugin_config, chat_stream)
self._initialize_exa_clients()
def _initialize_exa_clients(self):

View File

@@ -43,8 +43,8 @@ class WebSurfingTool(BaseTool):
),
] # type: ignore
def __init__(self, plugin_config=None):
super().__init__(plugin_config)
def __init__(self, plugin_config=None, chat_stream=None):
super().__init__(plugin_config, chat_stream)
# 初始化搜索引擎
self.engines = {
"exa": ExaSearchEngine(),