refactor(chat): 移除旧的聊天频率分析器模块(原来我还有个测试性功能没写啊,那删了吧())
整个 `frequency_analyzer` 模块已被移除。此模块的功能(如主动聊天触发)已被弃用或计划在未来通过更通用的机制重新实现,例如基于事件总线和插件系统的交互策略。 移除此模块可以简化代码库,并消除对已移动或重构的旧组件(如 AFC 管理器)的依赖。
This commit is contained in:
@@ -1,147 +0,0 @@
|
||||
"""
|
||||
Chat Frequency Analyzer
|
||||
=======================
|
||||
|
||||
本模块负责分析用户的聊天时间戳,以识别出他们最活跃的聊天时段(高峰时段)。
|
||||
|
||||
核心功能:
|
||||
- 使用滑动窗口算法来检测时间戳集中的区域。
|
||||
- 提供接口查询指定用户当前是否处于其聊天高峰时段内。
|
||||
- 结果会被缓存以提高性能。
|
||||
|
||||
可配置参数:
|
||||
- ANALYSIS_WINDOW_HOURS: 用于分析的时间窗口大小(小时)。
|
||||
- MIN_CHATS_FOR_PEAK: 在一个窗口内需要多少次聊天才能被认为是高峰时段。
|
||||
- MIN_GAP_BETWEEN_PEAKS_HOURS: 两个独立高峰时段之间的最小间隔(小时)。
|
||||
"""
|
||||
|
||||
import time as time_module
|
||||
from datetime import datetime, time, timedelta
|
||||
|
||||
from .tracker import chat_frequency_tracker
|
||||
|
||||
# --- 可配置参数 ---
|
||||
# 用于分析的时间窗口大小(小时)
|
||||
ANALYSIS_WINDOW_HOURS = 2
|
||||
# 触发高峰时段所需的最小聊天次数
|
||||
MIN_CHATS_FOR_PEAK = 4
|
||||
# 两个独立高峰时段之间的最小间隔(小时)
|
||||
MIN_GAP_BETWEEN_PEAKS_HOURS = 1
|
||||
|
||||
|
||||
class ChatFrequencyAnalyzer:
|
||||
"""
|
||||
分析聊天时间戳,以识别用户的高频聊天时段。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 缓存分析结果,避免重复计算
|
||||
# 格式: { "chat_id": (timestamp_of_analysis, [peak_windows]) }
|
||||
self._analysis_cache: dict[str, tuple[float, list[tuple[time, time]]]] = {}
|
||||
self._cache_ttl_seconds = 60 * 30 # 缓存30分钟
|
||||
|
||||
@staticmethod
|
||||
def _find_peak_windows(timestamps: list[float]) -> list[tuple[datetime, datetime]]:
|
||||
"""
|
||||
使用滑动窗口算法来识别时间戳列表中的高峰时段。
|
||||
|
||||
Args:
|
||||
timestamps (List[float]): 按时间排序的聊天时间戳。
|
||||
|
||||
Returns:
|
||||
List[Tuple[datetime, datetime]]: 识别出的高峰时段列表,每个元组代表一个时间窗口的开始和结束。
|
||||
"""
|
||||
if len(timestamps) < MIN_CHATS_FOR_PEAK:
|
||||
return []
|
||||
|
||||
# 将时间戳转换为 datetime 对象
|
||||
datetimes = [datetime.fromtimestamp(ts) for ts in timestamps]
|
||||
datetimes.sort()
|
||||
|
||||
peak_windows: list[tuple[datetime, datetime]] = []
|
||||
window_start_idx = 0
|
||||
|
||||
for i in range(len(datetimes)):
|
||||
# 移动窗口的起始点
|
||||
while datetimes[i] - datetimes[window_start_idx] > timedelta(hours=ANALYSIS_WINDOW_HOURS):
|
||||
window_start_idx += 1
|
||||
|
||||
# 检查当前窗口是否满足高峰条件
|
||||
if i - window_start_idx + 1 >= MIN_CHATS_FOR_PEAK:
|
||||
current_window_start = datetimes[window_start_idx]
|
||||
current_window_end = datetimes[i]
|
||||
|
||||
# 合并重叠或相邻的高峰时段
|
||||
if peak_windows and current_window_start - peak_windows[-1][1] < timedelta(
|
||||
hours=MIN_GAP_BETWEEN_PEAKS_HOURS
|
||||
):
|
||||
# 扩展上一个窗口的结束时间
|
||||
peak_windows[-1] = (peak_windows[-1][0], current_window_end)
|
||||
else:
|
||||
peak_windows.append((current_window_start, current_window_end))
|
||||
|
||||
return peak_windows
|
||||
|
||||
def get_peak_chat_times(self, chat_id: str) -> list[tuple[time, time]]:
|
||||
"""
|
||||
获取指定用户的高峰聊天时间段。
|
||||
|
||||
Args:
|
||||
chat_id (str): 聊天标识符。
|
||||
|
||||
Returns:
|
||||
List[Tuple[time, time]]: 高峰时段的列表,每个元组包含开始和结束时间 (time 对象)。
|
||||
"""
|
||||
# 检查缓存
|
||||
cached_timestamp, cached_windows = self._analysis_cache.get(chat_id, (0, []))
|
||||
if time_module.time() - cached_timestamp < self._cache_ttl_seconds:
|
||||
return cached_windows
|
||||
|
||||
timestamps = chat_frequency_tracker.get_timestamps_for_chat(chat_id)
|
||||
if not timestamps:
|
||||
return []
|
||||
|
||||
peak_datetime_windows = self._find_peak_windows(timestamps)
|
||||
|
||||
# 将 datetime 窗口转换为 time 窗口,并进行归一化处理
|
||||
peak_time_windows = []
|
||||
for start_dt, end_dt in peak_datetime_windows:
|
||||
# TODO:这里可以添加更复杂的逻辑来处理跨天的平均时间
|
||||
# 为简化,我们直接使用窗口的起止时间
|
||||
peak_time_windows.append((start_dt.time(), end_dt.time()))
|
||||
|
||||
# 更新缓存
|
||||
self._analysis_cache[chat_id] = (time_module.time(), peak_time_windows)
|
||||
|
||||
return peak_time_windows
|
||||
|
||||
def is_in_peak_time(self, chat_id: str, now: datetime | None = None) -> bool:
|
||||
"""
|
||||
检查当前时间是否处于用户的高峰聊天时段内。
|
||||
|
||||
Args:
|
||||
chat_id (str): 聊天标识符。
|
||||
now (Optional[datetime]): 要检查的时间,默认为当前时间。
|
||||
|
||||
Returns:
|
||||
bool: 如果处于高峰时段则返回 True,否则返回 False。
|
||||
"""
|
||||
if now is None:
|
||||
now = datetime.now()
|
||||
|
||||
now_time = now.time()
|
||||
peak_times = self.get_peak_chat_times(chat_id)
|
||||
|
||||
for start_time, end_time in peak_times:
|
||||
if start_time <= end_time: # 同一天
|
||||
if start_time <= now_time <= end_time:
|
||||
return True
|
||||
else: # 跨天
|
||||
if now_time >= start_time or now_time <= end_time:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# 创建一个全局单例
|
||||
chat_frequency_analyzer = ChatFrequencyAnalyzer()
|
||||
@@ -1,78 +0,0 @@
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import orjson
|
||||
|
||||
from src.common.logger import get_logger
|
||||
|
||||
# 数据存储路径
|
||||
DATA_DIR = Path("data/frequency_analyzer")
|
||||
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
TRACKER_FILE = DATA_DIR / "chat_timestamps.json"
|
||||
|
||||
logger = get_logger("ChatFrequencyTracker")
|
||||
|
||||
|
||||
class ChatFrequencyTracker:
|
||||
"""
|
||||
负责跟踪和存储用户聊天启动时间戳。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._timestamps: dict[str, list[float]] = self._load_timestamps()
|
||||
|
||||
@staticmethod
|
||||
def _load_timestamps() -> dict[str, list[float]]:
|
||||
"""从本地文件加载时间戳数据。"""
|
||||
if not TRACKER_FILE.exists():
|
||||
return {}
|
||||
try:
|
||||
with open(TRACKER_FILE, "rb") as f:
|
||||
data = orjson.loads(f.read())
|
||||
logger.info(f"成功从 {TRACKER_FILE} 加载了聊天时间戳数据。")
|
||||
return data
|
||||
except orjson.JSONDecodeError:
|
||||
logger.warning(f"无法解析 {TRACKER_FILE},将创建一个新的空数据文件。")
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.error(f"加载聊天时间戳数据时发生未知错误: {e}")
|
||||
return {}
|
||||
|
||||
def _save_timestamps(self):
|
||||
"""将当前的时间戳数据保存到本地文件。"""
|
||||
try:
|
||||
with open(TRACKER_FILE, "wb") as f:
|
||||
f.write(orjson.dumps(self._timestamps))
|
||||
except Exception as e:
|
||||
logger.error(f"保存聊天时间戳数据到 {TRACKER_FILE} 时失败: {e}")
|
||||
|
||||
def record_chat_start(self, chat_id: str):
|
||||
"""
|
||||
记录一次聊天会话的开始。
|
||||
|
||||
Args:
|
||||
chat_id (str): 唯一的聊天标识符 (例如,用户ID)。
|
||||
"""
|
||||
now = time.time()
|
||||
if chat_id not in self._timestamps:
|
||||
self._timestamps[chat_id] = []
|
||||
|
||||
self._timestamps[chat_id].append(now)
|
||||
logger.debug(f"为 chat_id '{chat_id}' 记录了新的聊天时间: {now}")
|
||||
self._save_timestamps()
|
||||
|
||||
def get_timestamps_for_chat(self, chat_id: str) -> list[float] | None:
|
||||
"""
|
||||
获取指定聊天的所有时间戳记录。
|
||||
|
||||
Args:
|
||||
chat_id (str): 聊天标识符。
|
||||
|
||||
Returns:
|
||||
Optional[List[float]]: 时间戳列表,如果不存在则返回 None。
|
||||
"""
|
||||
return self._timestamps.get(chat_id)
|
||||
|
||||
|
||||
# 创建一个全局单例
|
||||
chat_frequency_tracker = ChatFrequencyTracker()
|
||||
@@ -1,103 +0,0 @@
|
||||
"""
|
||||
Frequency-Based Proactive Trigger
|
||||
=================================
|
||||
|
||||
本模块实现了一个周期性任务,用于根据用户的聊天频率来智能地触发主动思考。
|
||||
|
||||
核心功能:
|
||||
- 定期运行,检查所有已知的私聊用户。
|
||||
- 调用 ChatFrequencyAnalyzer 判断当前是否处于用户的高峰聊天时段。
|
||||
- 如果满足条件(高峰时段、角色清醒、聊天循环空闲),则触发一次主动思考。
|
||||
- 包含冷却机制,以避免在同一个高峰时段内重复打扰用户。
|
||||
|
||||
可配置参数:
|
||||
- TRIGGER_CHECK_INTERVAL_SECONDS: 触发器检查的周期(秒)。
|
||||
- COOLDOWN_HOURS: 在同一个高峰时段内触发一次后的冷却时间(小时)。
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from src.common.logger import get_logger
|
||||
|
||||
# AFC manager has been moved to chatter plugin
|
||||
# TODO: 需要重新实现主动思考和睡眠管理功能
|
||||
from .analyzer import chat_frequency_analyzer
|
||||
|
||||
logger = get_logger("FrequencyBasedTrigger")
|
||||
|
||||
# --- 可配置参数 ---
|
||||
# 触发器检查周期(秒)
|
||||
TRIGGER_CHECK_INTERVAL_SECONDS = 60 * 5 # 5分钟
|
||||
# 冷却时间(小时),确保在一个高峰时段只触发一次
|
||||
COOLDOWN_HOURS = 3
|
||||
|
||||
|
||||
class FrequencyBasedTrigger:
|
||||
"""
|
||||
一个周期性任务,根据聊天频率分析结果来触发主动思考。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# TODO: 需要重新实现睡眠管理器
|
||||
self._task: asyncio.Task | None = None
|
||||
# 记录上次为用户触发的时间,用于冷却控制
|
||||
# 格式: { "chat_id": timestamp }
|
||||
self._last_triggered: dict[str, float] = {}
|
||||
|
||||
async def _run_trigger_cycle(self):
|
||||
"""触发器的主要循环逻辑。"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS)
|
||||
logger.debug("开始执行频率触发器检查...")
|
||||
|
||||
# 1. TODO: 检查角色是否清醒 - 需要重新实现睡眠状态检查
|
||||
# 暂时跳过睡眠检查
|
||||
# if self._sleep_manager.is_sleeping():
|
||||
# logger.debug("角色正在睡眠,跳过本次频率触发检查。")
|
||||
# continue
|
||||
|
||||
# 2. 获取所有已知的聊天ID
|
||||
# 注意:AFC管理器已移至chatter插件,此功能暂时禁用
|
||||
# all_chat_ids = list(afc_manager.affinity_flow_chatters.keys())
|
||||
all_chat_ids = [] # 暂时禁用此功能
|
||||
if not all_chat_ids:
|
||||
continue
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
for chat_id in all_chat_ids:
|
||||
# 3. 检查是否处于冷却时间内
|
||||
last_triggered_time = self._last_triggered.get(chat_id, 0)
|
||||
if time.time() - last_triggered_time < COOLDOWN_HOURS * 3600:
|
||||
continue
|
||||
|
||||
# 4. 检查当前是否是该用户的高峰聊天时间
|
||||
if chat_frequency_analyzer.is_in_peak_time(chat_id, now):
|
||||
# 5. 检查用户当前是否已有活跃的处理任务
|
||||
# 注意:AFC管理器已移至chatter插件,此功能暂时禁用
|
||||
# chatter = afc_manager.get_or_create_chatter(chat_id)
|
||||
logger.info(f"检测到用户 {chat_id} 处于聊天高峰期,但AFC功能已移至chatter插件")
|
||||
continue
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("频率触发器任务被取消。")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"频率触发器循环发生未知错误: {e}", exc_info=True)
|
||||
# 发生错误后,等待更长时间再重试,避免刷屏
|
||||
await asyncio.sleep(TRIGGER_CHECK_INTERVAL_SECONDS * 2)
|
||||
|
||||
def start(self):
|
||||
"""启动触发器任务。"""
|
||||
if self._task is None or self._task.done():
|
||||
self._task = asyncio.create_task(self._run_trigger_cycle())
|
||||
logger.info("基于聊天频率的主动思考触发器已启动。")
|
||||
|
||||
def stop(self):
|
||||
"""停止触发器任务。"""
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
logger.info("基于聊天频率的主动思考触发器已停止。")
|
||||
Reference in New Issue
Block a user