试图引爆定时提醒,我有一个更好的东西

This commit is contained in:
minecraft1024a
2025-09-13 12:19:15 +08:00
committed by Windpicker-owo
parent d1100913fa
commit 7d06dd1bce
3 changed files with 2 additions and 504 deletions

View File

@@ -1,242 +0,0 @@
"""
事件驱动的智能调度器
基于asyncio的精确定时事件调度系统替代轮询机制
"""
import asyncio
import traceback
from datetime import datetime
from typing import Dict, Callable, Any, Optional
from dataclasses import dataclass
from src.common.logger import get_logger
logger = get_logger("event_scheduler")
@dataclass
class ScheduledEvent:
"""调度事件数据类"""
event_id: str
trigger_time: datetime
callback: Callable
metadata: Dict[str, Any]
task: Optional[asyncio.Task] = None
class EventDrivenScheduler:
"""事件驱动的调度器"""
def __init__(self):
self.scheduled_events: Dict[str, ScheduledEvent] = {}
self._shutdown = False
async def schedule_event(
self,
event_id: str,
trigger_time: datetime,
callback: Callable,
metadata: Dict[str, Any] = None
) -> bool:
"""
调度一个事件在指定时间触发
Args:
event_id: 事件唯一标识
trigger_time: 触发时间
callback: 回调函数
metadata: 事件元数据
Returns:
bool: 调度成功返回True
"""
try:
if metadata is None:
metadata = {}
# 如果事件已存在,先取消
if event_id in self.scheduled_events:
await self.cancel_event(event_id)
# 计算延迟时间
now = datetime.now()
delay = (trigger_time - now).total_seconds()
if delay <= 0:
logger.warning(f"事件 {event_id} 的触发时间已过,立即执行")
# 立即执行
asyncio.create_task(self._execute_callback(event_id, callback, metadata))
return True
# 创建调度事件
scheduled_event = ScheduledEvent(
event_id=event_id,
trigger_time=trigger_time,
callback=callback,
metadata=metadata
)
# 创建异步任务
scheduled_event.task = asyncio.create_task(
self._wait_and_execute(scheduled_event)
)
self.scheduled_events[event_id] = scheduled_event
logger.info(f"调度事件 {event_id} 将在 {trigger_time} 触发 (延迟 {delay:.1f} 秒)")
return True
except Exception as e:
logger.error(f"调度事件失败: {e}")
return False
async def _wait_and_execute(self, event: ScheduledEvent):
"""等待并执行事件"""
try:
now = datetime.now()
delay = (event.trigger_time - now).total_seconds()
if delay > 0:
await asyncio.sleep(delay)
# 检查是否被取消
if self._shutdown or event.event_id not in self.scheduled_events:
return
# 执行回调
await self._execute_callback(event.event_id, event.callback, event.metadata)
except asyncio.CancelledError:
logger.info(f"事件 {event.event_id} 被取消")
except Exception as e:
logger.error(f"执行事件 {event.event_id} 时出错: {e}")
finally:
# 清理已完成的事件
if event.event_id in self.scheduled_events:
del self.scheduled_events[event.event_id]
async def _execute_callback(self, event_id: str, callback: Callable, metadata: Dict[str, Any]):
"""执行回调函数"""
try:
logger.info(f"执行调度事件: {event_id}")
# 根据回调函数签名调用
if asyncio.iscoroutinefunction(callback):
await callback(metadata)
else:
callback(metadata)
except Exception as e:
logger.error(f"执行回调函数失败: {e}")
logger.error(traceback.format_exc())
async def cancel_event(self, event_id: str) -> bool:
"""
取消一个调度事件
Args:
event_id: 事件ID
Returns:
bool: 取消成功返回True
"""
try:
if event_id in self.scheduled_events:
event = self.scheduled_events[event_id]
if event.task and not event.task.done():
event.task.cancel()
del self.scheduled_events[event_id]
logger.info(f"取消调度事件: {event_id}")
return True
return False
except Exception as e:
logger.error(f"取消事件失败: {e}")
return False
async def shutdown(self):
"""关闭调度器,取消所有事件"""
self._shutdown = True
for event_id in list(self.scheduled_events.keys()):
await self.cancel_event(event_id)
logger.info("事件调度器已关闭")
def get_scheduled_events(self) -> Dict[str, ScheduledEvent]:
"""获取所有调度事件"""
return self.scheduled_events.copy()
def get_event_count(self) -> int:
"""获取调度事件数量"""
return len(self.scheduled_events)
# 全局事件调度器实例
event_scheduler = EventDrivenScheduler()
# 便捷函数
async def schedule_reminder(
reminder_id: str,
reminder_time: datetime,
chat_id: str,
reminder_content: str,
callback: Callable
):
"""
调度提醒事件的便捷函数
Args:
reminder_id: 提醒唯一标识
reminder_time: 提醒时间
chat_id: 聊天ID
reminder_content: 提醒内容
callback: 回调函数
"""
metadata = {
"type": "reminder",
"chat_id": chat_id,
"content": reminder_content,
"created_at": datetime.now().isoformat()
}
return await event_scheduler.schedule_event(
event_id=reminder_id,
trigger_time=reminder_time,
callback=callback,
metadata=metadata
)
async def _execute_reminder_callback(subheartflow_id: str, reminder_text: str, original_message: str = None):
"""执行提醒回调函数"""
try:
# 获取对应的subheartflow实例
from src.chat.heart_flow.heartflow import heartflow
subflow = await heartflow.get_or_create_subheartflow(subheartflow_id)
if not subflow:
logger.error(f"无法获取subheartflow实例: {subheartflow_id}")
return
# 创建主动思考事件,触发完整的思考流程
from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent
# 使用原始消息来构造reason如果没有原始消息则使用处理后的内容
reason_content = original_message if original_message else reminder_text
event = ProactiveTriggerEvent(
source="reminder_system",
reason=f"定时提醒:{reason_content}", # 这里传递完整的原始消息
metadata={
"reminder_text": reminder_text,
"original_message": original_message,
"trigger_time": datetime.now().isoformat()
}
)
# 通过subflow的HeartFChatting实例触发主动思考
await subflow.heart_fc_instance.proactive_thinker.think(event)
logger.info(f"已触发提醒的主动思考,内容: {reminder_text},没有传递那条消息吗?{original_message}")
except Exception as e:
logger.error(f"执行提醒回调时发生错误: {e}")
import traceback
traceback.print_exc()

View File

@@ -162,10 +162,10 @@ class ProactiveThinker:
web_search_tool = tool_api.get_tool_instance("web_search")
if web_search_tool:
try:
search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10)
search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10})
except TypeError:
try:
search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10)
search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10})
except TypeError:
logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索")
news_block = "跳过网络搜索。"

View File

@@ -1,260 +0,0 @@
"""
智能提醒分析器
使用LLM分析用户消息识别提醒请求并提取时间和内容信息
"""
import re
import json
from datetime import datetime, timedelta
from typing import Optional
from src.common.logger import get_logger
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
logger = get_logger("smart_reminder")
class ReminderEvent:
"""提醒事件数据类"""
def __init__(self, user_id: str, reminder_time: datetime, content: str, confidence: float):
self.user_id = user_id
self.reminder_time = reminder_time
self.content = content
self.confidence = confidence
def __repr__(self):
return f"ReminderEvent(user_id={self.user_id}, time={self.reminder_time}, content={self.content}, confidence={self.confidence})"
def to_dict(self):
return {
'user_id': self.user_id,
'reminder_time': self.reminder_time.isoformat(),
'content': self.content,
'confidence': self.confidence
}
class SmartReminderAnalyzer:
"""智能提醒分析器"""
def __init__(self):
self.confidence_threshold = 0.7
# 使用规划器模型进行分析
self.analyzer_llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="reminder_analyzer"
)
async def analyze_message(self, user_id: str, message: str) -> Optional[ReminderEvent]:
"""分析消息是否包含提醒请求
Args:
user_id: 用户ID
message: 用户消息内容
Returns:
ReminderEvent对象如果没有检测到提醒请求则返回None
"""
if not message or len(message.strip()) == 0:
return None
logger.debug(f"分析消息中的提醒请求: {message}")
# 使用LLM分析消息
analysis_result = await self._analyze_with_llm(message)
if not analysis_result or analysis_result.get('confidence', 0) < 0.5: # 降低置信度阈值
return None
try:
# 解析时间
reminder_time = self._parse_relative_time(analysis_result['relative_time'])
if not reminder_time:
return None
# 创建提醒事件
reminder_event = ReminderEvent(
user_id=user_id,
reminder_time=reminder_time,
content=analysis_result.get('content', '提醒'),
confidence=analysis_result['confidence']
)
logger.info(f"检测到提醒请求: {reminder_event}")
return reminder_event
except Exception as e:
logger.error(f"创建提醒事件失败: {e}")
return None
async def _analyze_with_llm(self, message: str) -> Optional[dict]:
"""使用LLM分析消息中的提醒请求"""
try:
prompt = f"""分析以下消息是否包含提醒请求。
消息: {message}
当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
请判断用户是否想要设置提醒,如果是,请提取:
1. 是否包含提醒请求 (has_reminder: true/false)
2. 置信度 (confidence: 0.0-1.0)
3. 相对时间表达 (relative_time: 标准化的时间表达,例如将'半小时后'转换为'30分钟后', '明天下午三点'转换为'明天15点')
4. 提醒内容 (content: 提醒的具体内容)
5. 分析原因 (reasoning: 判断理由)
请以JSON格式输出:
{{
"has_reminder": true/false,
"confidence": 0.0-1.0,
"relative_time": "标准化的时间表达 (例如 '30分钟后', '2小时后')",
"content": "提醒内容",
"reasoning": "判断理由"
}}"""
response, _ = await self.analyzer_llm.generate_response_async(prompt=prompt)
if not response:
return None
# 解析JSON响应处理可能的markdown包装
try:
# 清理响应文本
cleaned_response = response.strip()
# 移除markdown代码块包装
if cleaned_response.startswith('```json'):
cleaned_response = cleaned_response[7:] # 移除 ```json
elif cleaned_response.startswith('```'):
cleaned_response = cleaned_response[3:] # 移除 ```
if cleaned_response.endswith('```'):
cleaned_response = cleaned_response[:-3] # 移除结尾的 ```
cleaned_response = cleaned_response.strip()
# 解析JSON
result = json.loads(cleaned_response)
if result.get('has_reminder', False):
logger.info(f"LLM分析结果: {result}")
return result
except json.JSONDecodeError as e:
logger.error(f"LLM响应JSON解析失败: {response}, Error: {e}")
# 尝试使用更宽松的JSON修复
try:
import re
# 提取JSON部分的正则表达式
json_match = re.search(r'\{.*\}', cleaned_response, re.DOTALL)
if json_match:
json_str = json_match.group()
result = json.loads(json_str)
if result.get('has_reminder', False):
logger.info(f"备用解析成功: {result}")
return result
except Exception as fallback_error:
logger.error(f"备用JSON解析也失败了: {fallback_error}")
except Exception as e:
logger.error(f"LLM分析失败: {e}")
return None
def _parse_relative_time(self, time_expr: str) -> Optional[datetime]:
"""解析时间表达式(支持相对时间和绝对时间)"""
try:
now = datetime.now()
# 1. 匹配相对时间X分钟后包括中文数字
# 先尝试匹配阿拉伯数字
minutes_match = re.search(r'(\d+)\s*分钟后', time_expr)
if minutes_match:
minutes = int(minutes_match.group(1))
result = now + timedelta(minutes=minutes)
logger.info(f"相对时间解析结果: timedelta(minutes={minutes}) -> {result}")
return result
# 匹配中文数字分钟
chinese_minutes_patterns = [
(r'一分钟后', 1), (r'二分钟后', 2), (r'两分钟后', 2), (r'三分钟后', 3), (r'四分钟后', 4), (r'五分钟后', 5),
(r'六分钟后', 6), (r'七分钟后', 7), (r'八分钟后', 8), (r'九分钟后', 9), (r'十分钟后', 10),
(r'十一分钟后', 11), (r'十二分钟后', 12), (r'十三分钟后', 13), (r'十四分钟后', 14), (r'十五分钟后', 15),
(r'二十分钟后', 20), (r'三十分钟后', 30), (r'四十分钟后', 40), (r'五十分钟后', 50), (r'六十分钟后', 60)
]
for pattern, minutes in chinese_minutes_patterns:
if re.search(pattern, time_expr):
result = now + timedelta(minutes=minutes)
logger.info(f"中文时间解析结果: {pattern} -> {minutes}分钟 -> {result}")
return result
# 2. 匹配相对时间X小时后
hours_match = re.search(r'(\d+)\s*小时后', time_expr)
if hours_match:
hours = int(hours_match.group(1))
result = now + timedelta(hours=hours)
logger.info(f"相对时间解析结果: timedelta(hours={hours})")
return result
# 3. 匹配相对时间X秒后
seconds_match = re.search(r'(\d+)\s*秒后', time_expr)
if seconds_match:
seconds = int(seconds_match.group(1))
result = now + timedelta(seconds=seconds)
logger.info(f"相对时间解析结果: timedelta(seconds={seconds})")
return result
# 4. 匹配明天+具体时间明天下午2点、明天上午10点
tomorrow_match = re.search(r'明天.*?(\d{1,2})\s*[点时]', time_expr)
if tomorrow_match:
hour = int(tomorrow_match.group(1))
# 如果是下午且小于12加12小时
if '下午' in time_expr and hour < 12:
hour += 12
elif '上午' in time_expr and hour == 12:
hour = 0
tomorrow = now + timedelta(days=1)
result = tomorrow.replace(hour=hour, minute=0, second=0, microsecond=0)
logger.info(f"绝对时间解析结果: 明天{hour}")
return result
# 5. 匹配今天+具体时间今天下午3点、今天晚上8点
today_match = re.search(r'今天.*?(\d{1,2})\s*[点时]', time_expr)
if today_match:
hour = int(today_match.group(1))
# 如果是下午且小于12加12小时
if '下午' in time_expr and hour < 12:
hour += 12
elif '晚上' in time_expr and hour < 12:
hour += 12
elif '上午' in time_expr and hour == 12:
hour = 0
result = now.replace(hour=hour, minute=0, second=0, microsecond=0)
# 如果时间已过,设为明天
if result <= now:
result += timedelta(days=1)
logger.info(f"绝对时间解析结果: 今天{hour}")
return result
# 6. 匹配纯数字时间14点、2点
pure_time_match = re.search(r'(\d{1,2})\s*[点时]', time_expr)
if pure_time_match:
hour = int(pure_time_match.group(1))
result = now.replace(hour=hour, minute=0, second=0, microsecond=0)
# 如果时间已过,设为明天
if result <= now:
result += timedelta(days=1)
logger.info(f"绝对时间解析结果: {hour}")
return result
except Exception as e:
logger.error(f"时间解析失败: {time_expr}, Error: {e}")
return None
# 全局智能提醒分析器实例
smart_reminder_analyzer = SmartReminderAnalyzer()