diff --git a/src/chat/chat_loop/proactive/event_scheduler.py b/src/chat/chat_loop/proactive/event_scheduler.py deleted file mode 100644 index 8c005cb3d..000000000 --- a/src/chat/chat_loop/proactive/event_scheduler.py +++ /dev/null @@ -1,242 +0,0 @@ -""" -事件驱动的智能调度器 -基于asyncio的精确定时事件调度系统,替代轮询机制 -""" - -import asyncio -import traceback -from datetime import datetime -from typing import Dict, Callable, Any, Optional -from dataclasses import dataclass -from src.common.logger import get_logger - -logger = get_logger("event_scheduler") - - -@dataclass -class ScheduledEvent: - """调度事件数据类""" - event_id: str - trigger_time: datetime - callback: Callable - metadata: Dict[str, Any] - task: Optional[asyncio.Task] = None - - -class EventDrivenScheduler: - """事件驱动的调度器""" - - def __init__(self): - self.scheduled_events: Dict[str, ScheduledEvent] = {} - self._shutdown = False - - async def schedule_event( - self, - event_id: str, - trigger_time: datetime, - callback: Callable, - metadata: Dict[str, Any] = None - ) -> bool: - """ - 调度一个事件在指定时间触发 - - Args: - event_id: 事件唯一标识 - trigger_time: 触发时间 - callback: 回调函数 - metadata: 事件元数据 - - Returns: - bool: 调度成功返回True - """ - try: - if metadata is None: - metadata = {} - - # 如果事件已存在,先取消 - if event_id in self.scheduled_events: - await self.cancel_event(event_id) - - # 计算延迟时间 - now = datetime.now() - delay = (trigger_time - now).total_seconds() - - if delay <= 0: - logger.warning(f"事件 {event_id} 的触发时间已过,立即执行") - # 立即执行 - asyncio.create_task(self._execute_callback(event_id, callback, metadata)) - return True - - # 创建调度事件 - scheduled_event = ScheduledEvent( - event_id=event_id, - trigger_time=trigger_time, - callback=callback, - metadata=metadata - ) - - # 创建异步任务 - scheduled_event.task = asyncio.create_task( - self._wait_and_execute(scheduled_event) - ) - - self.scheduled_events[event_id] = scheduled_event - logger.info(f"调度事件 {event_id} 将在 {trigger_time} 触发 (延迟 {delay:.1f} 秒)") - return True - - except Exception as e: - logger.error(f"调度事件失败: {e}") - return False - - async def _wait_and_execute(self, event: ScheduledEvent): - """等待并执行事件""" - try: - now = datetime.now() - delay = (event.trigger_time - now).total_seconds() - - if delay > 0: - await asyncio.sleep(delay) - - # 检查是否被取消 - if self._shutdown or event.event_id not in self.scheduled_events: - return - - # 执行回调 - await self._execute_callback(event.event_id, event.callback, event.metadata) - - except asyncio.CancelledError: - logger.info(f"事件 {event.event_id} 被取消") - except Exception as e: - logger.error(f"执行事件 {event.event_id} 时出错: {e}") - finally: - # 清理已完成的事件 - if event.event_id in self.scheduled_events: - del self.scheduled_events[event.event_id] - - async def _execute_callback(self, event_id: str, callback: Callable, metadata: Dict[str, Any]): - """执行回调函数""" - try: - logger.info(f"执行调度事件: {event_id}") - - # 根据回调函数签名调用 - if asyncio.iscoroutinefunction(callback): - await callback(metadata) - else: - callback(metadata) - - except Exception as e: - logger.error(f"执行回调函数失败: {e}") - logger.error(traceback.format_exc()) - - async def cancel_event(self, event_id: str) -> bool: - """ - 取消一个调度事件 - - Args: - event_id: 事件ID - - Returns: - bool: 取消成功返回True - """ - try: - if event_id in self.scheduled_events: - event = self.scheduled_events[event_id] - if event.task and not event.task.done(): - event.task.cancel() - del self.scheduled_events[event_id] - logger.info(f"取消调度事件: {event_id}") - return True - return False - except Exception as e: - logger.error(f"取消事件失败: {e}") - return False - - async def shutdown(self): - """关闭调度器,取消所有事件""" - self._shutdown = True - for event_id in list(self.scheduled_events.keys()): - await self.cancel_event(event_id) - logger.info("事件调度器已关闭") - - def get_scheduled_events(self) -> Dict[str, ScheduledEvent]: - """获取所有调度事件""" - return self.scheduled_events.copy() - - def get_event_count(self) -> int: - """获取调度事件数量""" - return len(self.scheduled_events) - - -# 全局事件调度器实例 -event_scheduler = EventDrivenScheduler() - - -# 便捷函数 -async def schedule_reminder( - reminder_id: str, - reminder_time: datetime, - chat_id: str, - reminder_content: str, - callback: Callable -): - """ - 调度提醒事件的便捷函数 - - Args: - reminder_id: 提醒唯一标识 - reminder_time: 提醒时间 - chat_id: 聊天ID - reminder_content: 提醒内容 - callback: 回调函数 - """ - metadata = { - "type": "reminder", - "chat_id": chat_id, - "content": reminder_content, - "created_at": datetime.now().isoformat() - } - - return await event_scheduler.schedule_event( - event_id=reminder_id, - trigger_time=reminder_time, - callback=callback, - metadata=metadata - ) - - -async def _execute_reminder_callback(subheartflow_id: str, reminder_text: str, original_message: str = None): - """执行提醒回调函数""" - try: - # 获取对应的subheartflow实例 - from src.chat.heart_flow.heartflow import heartflow - - subflow = await heartflow.get_or_create_subheartflow(subheartflow_id) - if not subflow: - logger.error(f"无法获取subheartflow实例: {subheartflow_id}") - return - - # 创建主动思考事件,触发完整的思考流程 - from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent - - # 使用原始消息来构造reason,如果没有原始消息则使用处理后的内容 - reason_content = original_message if original_message else reminder_text - - event = ProactiveTriggerEvent( - source="reminder_system", - reason=f"定时提醒:{reason_content}", # 这里传递完整的原始消息 - metadata={ - "reminder_text": reminder_text, - "original_message": original_message, - "trigger_time": datetime.now().isoformat() - } - ) - - # 通过subflow的HeartFChatting实例触发主动思考 - await subflow.heart_fc_instance.proactive_thinker.think(event) - - logger.info(f"已触发提醒的主动思考,内容: {reminder_text},没有传递那条消息吗?{original_message}") - - except Exception as e: - logger.error(f"执行提醒回调时发生错误: {e}") - import traceback - traceback.print_exc() \ No newline at end of file diff --git a/src/chat/chat_loop/proactive/proactive_thinker.py b/src/chat/chat_loop/proactive/proactive_thinker.py index 8f304b406..5f0761de7 100644 --- a/src/chat/chat_loop/proactive/proactive_thinker.py +++ b/src/chat/chat_loop/proactive/proactive_thinker.py @@ -162,10 +162,10 @@ class ProactiveThinker: web_search_tool = tool_api.get_tool_instance("web_search") if web_search_tool: try: - search_result_dict = await web_search_tool.execute(search_query=topic, max_results=10) + search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10}) except TypeError: try: - search_result_dict = await web_search_tool.execute(keyword=topic, max_results=10) + search_result_dict = await web_search_tool.execute(function_args={"keyword": topic, "max_results": 10}) except TypeError: logger.warning(f"{self.context.log_prefix} 网络搜索工具参数不匹配,跳过搜索") news_block = "跳过网络搜索。" diff --git a/src/chat/chat_loop/proactive/smart_reminder_analyzer.py b/src/chat/chat_loop/proactive/smart_reminder_analyzer.py deleted file mode 100644 index 3498cb2f5..000000000 --- a/src/chat/chat_loop/proactive/smart_reminder_analyzer.py +++ /dev/null @@ -1,260 +0,0 @@ -""" -智能提醒分析器 - -使用LLM分析用户消息,识别提醒请求并提取时间和内容信息 -""" - -import re -import json -from datetime import datetime, timedelta -from typing import Optional - -from src.common.logger import get_logger -from src.llm_models.utils_model import LLMRequest -from src.config.config import model_config - -logger = get_logger("smart_reminder") - - -class ReminderEvent: - """提醒事件数据类""" - def __init__(self, user_id: str, reminder_time: datetime, content: str, confidence: float): - self.user_id = user_id - self.reminder_time = reminder_time - self.content = content - self.confidence = confidence - - def __repr__(self): - return f"ReminderEvent(user_id={self.user_id}, time={self.reminder_time}, content={self.content}, confidence={self.confidence})" - - def to_dict(self): - return { - 'user_id': self.user_id, - 'reminder_time': self.reminder_time.isoformat(), - 'content': self.content, - 'confidence': self.confidence - } - - -class SmartReminderAnalyzer: - """智能提醒分析器""" - - def __init__(self): - self.confidence_threshold = 0.7 - # 使用规划器模型进行分析 - self.analyzer_llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, - request_type="reminder_analyzer" - ) - - async def analyze_message(self, user_id: str, message: str) -> Optional[ReminderEvent]: - """分析消息是否包含提醒请求 - - Args: - user_id: 用户ID - message: 用户消息内容 - - Returns: - ReminderEvent对象,如果没有检测到提醒请求则返回None - """ - if not message or len(message.strip()) == 0: - return None - - logger.debug(f"分析消息中的提醒请求: {message}") - - # 使用LLM分析消息 - analysis_result = await self._analyze_with_llm(message) - - if not analysis_result or analysis_result.get('confidence', 0) < 0.5: # 降低置信度阈值 - return None - - try: - # 解析时间 - reminder_time = self._parse_relative_time(analysis_result['relative_time']) - if not reminder_time: - return None - - # 创建提醒事件 - reminder_event = ReminderEvent( - user_id=user_id, - reminder_time=reminder_time, - content=analysis_result.get('content', '提醒'), - confidence=analysis_result['confidence'] - ) - - logger.info(f"检测到提醒请求: {reminder_event}") - return reminder_event - - except Exception as e: - logger.error(f"创建提醒事件失败: {e}") - return None - - async def _analyze_with_llm(self, message: str) -> Optional[dict]: - """使用LLM分析消息中的提醒请求""" - try: - prompt = f"""分析以下消息是否包含提醒请求。 - -消息: {message} -当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - -请判断用户是否想要设置提醒,如果是,请提取: -1. 是否包含提醒请求 (has_reminder: true/false) -2. 置信度 (confidence: 0.0-1.0) -3. 相对时间表达 (relative_time: 标准化的时间表达,例如将'半小时后'转换为'30分钟后', '明天下午三点'转换为'明天15点') -4. 提醒内容 (content: 提醒的具体内容) -5. 分析原因 (reasoning: 判断理由) - -请以JSON格式输出: -{{ - "has_reminder": true/false, - "confidence": 0.0-1.0, - "relative_time": "标准化的时间表达 (例如 '30分钟后', '2小时后')", - "content": "提醒内容", - "reasoning": "判断理由" -}}""" - - response, _ = await self.analyzer_llm.generate_response_async(prompt=prompt) - if not response: - return None - - # 解析JSON响应,处理可能的markdown包装 - try: - # 清理响应文本 - cleaned_response = response.strip() - - # 移除markdown代码块包装 - if cleaned_response.startswith('```json'): - cleaned_response = cleaned_response[7:] # 移除 ```json - elif cleaned_response.startswith('```'): - cleaned_response = cleaned_response[3:] # 移除 ``` - - if cleaned_response.endswith('```'): - cleaned_response = cleaned_response[:-3] # 移除结尾的 ``` - - cleaned_response = cleaned_response.strip() - - # 解析JSON - result = json.loads(cleaned_response) - if result.get('has_reminder', False): - logger.info(f"LLM分析结果: {result}") - return result - except json.JSONDecodeError as e: - logger.error(f"LLM响应JSON解析失败: {response}, Error: {e}") - # 尝试使用更宽松的JSON修复 - try: - import re - # 提取JSON部分的正则表达式 - json_match = re.search(r'\{.*\}', cleaned_response, re.DOTALL) - if json_match: - json_str = json_match.group() - result = json.loads(json_str) - if result.get('has_reminder', False): - logger.info(f"备用解析成功: {result}") - return result - except Exception as fallback_error: - logger.error(f"备用JSON解析也失败了: {fallback_error}") - - except Exception as e: - logger.error(f"LLM分析失败: {e}") - - return None - - def _parse_relative_time(self, time_expr: str) -> Optional[datetime]: - """解析时间表达式(支持相对时间和绝对时间)""" - try: - now = datetime.now() - - # 1. 匹配相对时间:X分钟后,包括中文数字 - # 先尝试匹配阿拉伯数字 - minutes_match = re.search(r'(\d+)\s*分钟后', time_expr) - if minutes_match: - minutes = int(minutes_match.group(1)) - result = now + timedelta(minutes=minutes) - logger.info(f"相对时间解析结果: timedelta(minutes={minutes}) -> {result}") - return result - - # 匹配中文数字分钟 - chinese_minutes_patterns = [ - (r'一分钟后', 1), (r'二分钟后', 2), (r'两分钟后', 2), (r'三分钟后', 3), (r'四分钟后', 4), (r'五分钟后', 5), - (r'六分钟后', 6), (r'七分钟后', 7), (r'八分钟后', 8), (r'九分钟后', 9), (r'十分钟后', 10), - (r'十一分钟后', 11), (r'十二分钟后', 12), (r'十三分钟后', 13), (r'十四分钟后', 14), (r'十五分钟后', 15), - (r'二十分钟后', 20), (r'三十分钟后', 30), (r'四十分钟后', 40), (r'五十分钟后', 50), (r'六十分钟后', 60) - ] - - for pattern, minutes in chinese_minutes_patterns: - if re.search(pattern, time_expr): - result = now + timedelta(minutes=minutes) - logger.info(f"中文时间解析结果: {pattern} -> {minutes}分钟 -> {result}") - return result - - # 2. 匹配相对时间:X小时后 - hours_match = re.search(r'(\d+)\s*小时后', time_expr) - if hours_match: - hours = int(hours_match.group(1)) - result = now + timedelta(hours=hours) - logger.info(f"相对时间解析结果: timedelta(hours={hours})") - return result - - # 3. 匹配相对时间:X秒后 - seconds_match = re.search(r'(\d+)\s*秒后', time_expr) - if seconds_match: - seconds = int(seconds_match.group(1)) - result = now + timedelta(seconds=seconds) - logger.info(f"相对时间解析结果: timedelta(seconds={seconds})") - return result - - # 4. 匹配明天+具体时间:明天下午2点、明天上午10点 - tomorrow_match = re.search(r'明天.*?(\d{1,2})\s*[点时]', time_expr) - if tomorrow_match: - hour = int(tomorrow_match.group(1)) - # 如果是下午且小于12,加12小时 - if '下午' in time_expr and hour < 12: - hour += 12 - elif '上午' in time_expr and hour == 12: - hour = 0 - - tomorrow = now + timedelta(days=1) - result = tomorrow.replace(hour=hour, minute=0, second=0, microsecond=0) - logger.info(f"绝对时间解析结果: 明天{hour}点") - return result - - # 5. 匹配今天+具体时间:今天下午3点、今天晚上8点 - today_match = re.search(r'今天.*?(\d{1,2})\s*[点时]', time_expr) - if today_match: - hour = int(today_match.group(1)) - # 如果是下午且小于12,加12小时 - if '下午' in time_expr and hour < 12: - hour += 12 - elif '晚上' in time_expr and hour < 12: - hour += 12 - elif '上午' in time_expr and hour == 12: - hour = 0 - - result = now.replace(hour=hour, minute=0, second=0, microsecond=0) - # 如果时间已过,设为明天 - if result <= now: - result += timedelta(days=1) - - logger.info(f"绝对时间解析结果: 今天{hour}点") - return result - - # 6. 匹配纯数字时间:14点、2点 - pure_time_match = re.search(r'(\d{1,2})\s*[点时]', time_expr) - if pure_time_match: - hour = int(pure_time_match.group(1)) - result = now.replace(hour=hour, minute=0, second=0, microsecond=0) - # 如果时间已过,设为明天 - if result <= now: - result += timedelta(days=1) - - logger.info(f"绝对时间解析结果: {hour}点") - return result - - except Exception as e: - logger.error(f"时间解析失败: {time_expr}, Error: {e}") - - return None - - -# 全局智能提醒分析器实例 -smart_reminder_analyzer = SmartReminderAnalyzer() \ No newline at end of file