diff --git a/src/chat/frequency_analyzer/analyzer.py b/src/chat/frequency_analyzer/analyzer.py new file mode 100644 index 000000000..bd6331465 --- /dev/null +++ b/src/chat/frequency_analyzer/analyzer.py @@ -0,0 +1,144 @@ +""" +Chat Frequency Analyzer +======================= + +本模块负责分析用户的聊天时间戳,以识别出他们最活跃的聊天时段(高峰时段)。 + +核心功能: +- 使用滑动窗口算法来检测时间戳集中的区域。 +- 提供接口查询指定用户当前是否处于其聊天高峰时段内。 +- 结果会被缓存以提高性能。 + +可配置参数: +- ANALYSIS_WINDOW_HOURS: 用于分析的时间窗口大小(小时)。 +- MIN_CHATS_FOR_PEAK: 在一个窗口内需要多少次聊天才能被认为是高峰时段。 +- MIN_GAP_BETWEEN_PEAKS_HOURS: 两个独立高峰时段之间的最小间隔(小时)。 +""" +import time as time_module +from datetime import datetime, timedelta, time +from typing import List, Tuple, Optional + +from .tracker import chat_frequency_tracker + +# --- 可配置参数 --- +# 用于分析的时间窗口大小(小时) +ANALYSIS_WINDOW_HOURS = 2 +# 触发高峰时段所需的最小聊天次数 +MIN_CHATS_FOR_PEAK = 4 +# 两个独立高峰时段之间的最小间隔(小时) +MIN_GAP_BETWEEN_PEAKS_HOURS = 1 + + +class ChatFrequencyAnalyzer: + """ + 分析聊天时间戳,以识别用户的高频聊天时段。 + """ + + def __init__(self): + # 缓存分析结果,避免重复计算 + # 格式: { "chat_id": (timestamp_of_analysis, [peak_windows]) } + self._analysis_cache: dict[str, tuple[float, list[tuple[time, time]]]] = {} + self._cache_ttl_seconds = 60 * 30 # 缓存30分钟 + + def _find_peak_windows(self, timestamps: List[float]) -> List[Tuple[datetime, datetime]]: + """ + 使用滑动窗口算法来识别时间戳列表中的高峰时段。 + + Args: + timestamps (List[float]): 按时间排序的聊天时间戳。 + + Returns: + List[Tuple[datetime, datetime]]: 识别出的高峰时段列表,每个元组代表一个时间窗口的开始和结束。 + """ + if len(timestamps) < MIN_CHATS_FOR_PEAK: + return [] + + # 将时间戳转换为 datetime 对象 + datetimes = [datetime.fromtimestamp(ts) for ts in timestamps] + datetimes.sort() + + peak_windows: List[Tuple[datetime, datetime]] = [] + window_start_idx = 0 + + for i in range(len(datetimes)): + # 移动窗口的起始点 + while datetimes[i] - datetimes[window_start_idx] > timedelta(hours=ANALYSIS_WINDOW_HOURS): + window_start_idx += 1 + + # 检查当前窗口是否满足高峰条件 + if i - window_start_idx + 1 >= MIN_CHATS_FOR_PEAK: + current_window_start = datetimes[window_start_idx] + current_window_end = datetimes[i] + + # 合并重叠或相邻的高峰时段 + if peak_windows and current_window_start - peak_windows[-1][1] < timedelta(hours=MIN_GAP_BETWEEN_PEAKS_HOURS): + # 扩展上一个窗口的结束时间 + peak_windows[-1] = (peak_windows[-1][0], current_window_end) + else: + peak_windows.append((current_window_start, current_window_end)) + + return peak_windows + + def get_peak_chat_times(self, chat_id: str) -> List[Tuple[time, time]]: + """ + 获取指定用户的高峰聊天时间段。 + + Args: + chat_id (str): 聊天标识符。 + + Returns: + List[Tuple[time, time]]: 高峰时段的列表,每个元组包含开始和结束时间 (time 对象)。 + """ + # 检查缓存 + cached_timestamp, cached_windows = self._analysis_cache.get(chat_id, (0, [])) + if time_module.time() - cached_timestamp < self._cache_ttl_seconds: + return cached_windows + + timestamps = chat_frequency_tracker.get_timestamps_for_chat(chat_id) + if not timestamps: + return [] + + peak_datetime_windows = self._find_peak_windows(timestamps) + + # 将 datetime 窗口转换为 time 窗口,并进行归一化处理 + peak_time_windows = [] + for start_dt, end_dt in peak_datetime_windows: + # TODO:这里可以添加更复杂的逻辑来处理跨天的平均时间 + # 为简化,我们直接使用窗口的起止时间 + peak_time_windows.append((start_dt.time(), end_dt.time())) + + # 更新缓存 + self._analysis_cache[chat_id] = (time_module.time(), peak_time_windows) + + return peak_time_windows + + def is_in_peak_time(self, chat_id: str, now: Optional[datetime] = None) -> bool: + """ + 检查当前时间是否处于用户的高峰聊天时段内。 + + Args: + chat_id (str): 聊天标识符。 + now (Optional[datetime]): 要检查的时间,默认为当前时间。 + + Returns: + bool: 如果处于高峰时段则返回 True,否则返回 False。 + """ + if now is None: + now = datetime.now() + + now_time = now.time() + peak_times = self.get_peak_chat_times(chat_id) + + for start_time, end_time in peak_times: + if start_time <= end_time: # 同一天 + if start_time <= now_time <= end_time: + return True + else: # 跨天 + if now_time >= start_time or now_time <= end_time: + return True + + return False + + +# 创建一个全局单例 +chat_frequency_analyzer = ChatFrequencyAnalyzer() diff --git a/src/chat/frequency_analyzer/tracker.py b/src/chat/frequency_analyzer/tracker.py new file mode 100644 index 000000000..bee9e4623 --- /dev/null +++ b/src/chat/frequency_analyzer/tracker.py @@ -0,0 +1,77 @@ +import orjson +import time +from typing import Dict, List, Optional +from pathlib import Path + +from src.common.logger import get_logger + +# 数据存储路径 +DATA_DIR = Path("data/frequency_analyzer") +DATA_DIR.mkdir(parents=True, exist_ok=True) +TRACKER_FILE = DATA_DIR / "chat_timestamps.json" + +logger = get_logger("ChatFrequencyTracker") + + +class ChatFrequencyTracker: + """ + 负责跟踪和存储用户聊天启动时间戳。 + """ + + def __init__(self): + self._timestamps: Dict[str, List[float]] = self._load_timestamps() + + def _load_timestamps(self) -> Dict[str, List[float]]: + """从本地文件加载时间戳数据。""" + if not TRACKER_FILE.exists(): + return {} + try: + with open(TRACKER_FILE, "rb") as f: + data = orjson.loads(f.read()) + logger.info(f"成功从 {TRACKER_FILE} 加载了聊天时间戳数据。") + return data + except orjson.JSONDecodeError: + logger.warning(f"无法解析 {TRACKER_FILE},将创建一个新的空数据文件。") + return {} + except Exception as e: + logger.error(f"加载聊天时间戳数据时发生未知错误: {e}") + return {} + + def _save_timestamps(self): + """将当前的时间戳数据保存到本地文件。""" + try: + with open(TRACKER_FILE, "wb") as f: + f.write(orjson.dumps(self._timestamps)) + except Exception as e: + logger.error(f"保存聊天时间戳数据到 {TRACKER_FILE} 时失败: {e}") + + def record_chat_start(self, chat_id: str): + """ + 记录一次聊天会话的开始。 + + Args: + chat_id (str): 唯一的聊天标识符 (例如,用户ID)。 + """ + now = time.time() + if chat_id not in self._timestamps: + self._timestamps[chat_id] = [] + + self._timestamps[chat_id].append(now) + logger.debug(f"为 chat_id '{chat_id}' 记录了新的聊天时间: {now}") + self._save_timestamps() + + def get_timestamps_for_chat(self, chat_id: str) -> Optional[List[float]]: + """ + 获取指定聊天的所有时间戳记录。 + + Args: + chat_id (str): 聊天标识符。 + + Returns: + Optional[List[float]]: 时间戳列表,如果不存在则返回 None。 + """ + return self._timestamps.get(chat_id) + + +# 创建一个全局单例 +chat_frequency_tracker = ChatFrequencyTracker() diff --git a/src/chat/frequency_analyzer/trigger.py b/src/chat/frequency_analyzer/trigger.py new file mode 100644 index 000000000..a6b4d8a3b --- /dev/null +++ b/src/chat/frequency_analyzer/trigger.py @@ -0,0 +1,119 @@ +""" +Frequency-Based Proactive Trigger +================================= + +本模块实现了一个周期性任务,用于根据用户的聊天频率来智能地触发主动思考。 + +核心功能: +- 定期运行,检查所有已知的私聊用户。 +- 调用 ChatFrequencyAnalyzer 判断当前是否处于用户的高峰聊天时段。 +- 如果满足条件(高峰时段、角色清醒、聊天循环空闲),则触发一次主动思考。 +- 包含冷却机制,以避免在同一个高峰时段内重复打扰用户。 + +可配置参数: +- TRIGGER_CHECK_INTERVAL_SECONDS: 触发器检查的周期(秒)。 +- COOLDOWN_HOURS: 在同一个高峰时段内触发一次后的冷却时间(小时)。 +""" +import asyncio +import time +from datetime import datetime +from typing import Dict, Optional + +from src.common.logger import get_logger +from src.chat.chat_loop.proactive.events import ProactiveTriggerEvent +from src.chat.heart_flow.heartflow import heartflow +from src.chat.chat_loop.sleep_manager.sleep_manager import SleepManager +from .analyzer import chat_frequency_analyzer + +logger = get_logger("FrequencyBasedTrigger") + +# --- 可配置参数 --- +# 触发器检查周期(秒) +TRIGGER_CHECK_INTERVAL_SECONDS = 60 * 5 # 5分钟 +# 冷却时间(小时),确保在一个高峰时段只触发一次 +COOLDOWN_HOURS = 3 + + +class FrequencyBasedTrigger: + """ + 一个周期性任务,根据聊天频率分析结果来触发主动思考。 + """ + + def __init__(self, sleep_manager: SleepManager): + self._sleep_manager = sleep_manager + self._task: Optional[asyncio.Task] = None + # 记录上次为用户触发的时间,用于冷却控制 + # 格式: { "chat_id": timestamp } + self._last_triggered: Dict[str, float] = {} + + async def _run_trigger_cycle(self): + """触发器的主要循环逻辑。""" + while True: + try: + await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS) + logger.debug("开始执行频率触发器检查...") + + # 1. 检查角色是否清醒 + if self._sleep_manager.is_sleeping(): + logger.debug("角色正在睡眠,跳过本次频率触发检查。") + continue + + # 2. 获取所有已知的聊天ID + # 【注意】这里我们假设所有 subheartflow 的 ID 就是 chat_id + all_chat_ids = list(heartflow.subheartflows.keys()) + if not all_chat_ids: + continue + + now = datetime.now() + + for chat_id in all_chat_ids: + # 3. 检查是否处于冷却时间内 + last_triggered_time = self._last_triggered.get(chat_id, 0) + if time.time() - last_triggered_time < COOLDOWN_HOURS * 3600: + continue + + # 4. 检查当前是否是该用户的高峰聊天时间 + if chat_frequency_analyzer.is_in_peak_time(chat_id, now): + + sub_heartflow = await heartflow.get_or_create_subheartflow(chat_id) + if not sub_heartflow: + logger.warning(f"无法为 {chat_id} 获取或创建 sub_heartflow。") + continue + + # 5. 检查用户当前是否已有活跃的思考或回复任务 + cycle_detail = sub_heartflow.heart_fc_instance.context.current_cycle_detail + if cycle_detail and not cycle_detail.end_time: + logger.debug(f"用户 {chat_id} 的聊天循环正忙(仍在周期 {cycle_detail.cycle_id} 中),本次不触发。") + continue + + logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,且聊天循环空闲,准备触发主动思考。") + + # 6. 直接调用 proactive_thinker + event = ProactiveTriggerEvent( + source="frequency_analyzer", + reason=f"User is in a high-frequency chat period." + ) + await sub_heartflow.heart_fc_instance.proactive_thinker.think(event) + + # 7. 更新触发时间,进入冷却 + self._last_triggered[chat_id] = time.time() + + except asyncio.CancelledError: + logger.info("频率触发器任务被取消。") + break + except Exception as e: + logger.error(f"频率触发器循环发生未知错误: {e}", exc_info=True) + # 发生错误后,等待更长时间再重试,避免刷屏 + await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS * 2) + + def start(self): + """启动触发器任务。""" + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._run_trigger_cycle()) + logger.info("基于聊天频率的主动思考触发器已启动。") + + def stop(self): + """停止触发器任务。""" + if self._task and not self._task.done(): + self._task.cancel() + logger.info("基于聊天频率的主动思考触发器已停止。")