302 lines
12 KiB
Python
302 lines
12 KiB
Python
import asyncio
|
||
import traceback
|
||
from typing import Optional, Coroutine, Callable, Any, List
|
||
|
||
from src.common.logger import get_module_logger, LogConfig, BACKGROUND_TASKS_STYLE_CONFIG
|
||
|
||
# Need manager types for dependency injection
|
||
from src.heart_flow.mai_state_manager import MaiStateManager, MaiStateInfo
|
||
from src.heart_flow.subheartflow_manager import SubHeartflowManager
|
||
from src.heart_flow.interest_logger import InterestLogger
|
||
|
||
background_tasks_log_config = LogConfig(
|
||
console_format=BACKGROUND_TASKS_STYLE_CONFIG["console_format"],
|
||
file_format=BACKGROUND_TASKS_STYLE_CONFIG["file_format"],
|
||
)
|
||
|
||
logger = get_module_logger("background_tasks", config=background_tasks_log_config)
|
||
|
||
# 新增随机停用间隔 (5 分钟)
|
||
RANDOM_DEACTIVATION_INTERVAL_SECONDS = 300
|
||
# 新增兴趣评估间隔
|
||
INTEREST_EVAL_INTERVAL_SECONDS = 5
|
||
|
||
|
||
class BackgroundTaskManager:
|
||
"""管理 Heartflow 的后台周期性任务。"""
|
||
|
||
def __init__(
|
||
self,
|
||
mai_state_info: MaiStateInfo, # Needs current state info
|
||
mai_state_manager: MaiStateManager,
|
||
subheartflow_manager: SubHeartflowManager,
|
||
interest_logger: InterestLogger,
|
||
update_interval: int,
|
||
cleanup_interval: int,
|
||
log_interval: int,
|
||
inactive_threshold: int,
|
||
# 新增兴趣评估间隔参数
|
||
interest_eval_interval: int = INTEREST_EVAL_INTERVAL_SECONDS,
|
||
# 新增随机停用间隔参数
|
||
random_deactivation_interval: int = RANDOM_DEACTIVATION_INTERVAL_SECONDS,
|
||
):
|
||
self.mai_state_info = mai_state_info
|
||
self.mai_state_manager = mai_state_manager
|
||
self.subheartflow_manager = subheartflow_manager
|
||
self.interest_logger = interest_logger
|
||
|
||
# Intervals
|
||
self.update_interval = update_interval
|
||
self.cleanup_interval = cleanup_interval
|
||
self.log_interval = log_interval
|
||
self.interest_eval_interval = interest_eval_interval # 存储兴趣评估间隔
|
||
self.random_deactivation_interval = random_deactivation_interval # 存储随机停用间隔
|
||
|
||
# Task references
|
||
self._state_update_task: Optional[asyncio.Task] = None
|
||
self._cleanup_task: Optional[asyncio.Task] = None
|
||
self._logging_task: Optional[asyncio.Task] = None
|
||
self._interest_eval_task: Optional[asyncio.Task] = None # 新增兴趣评估任务引用
|
||
self._random_deactivation_task: Optional[asyncio.Task] = None # 新增随机停用任务引用
|
||
self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks
|
||
|
||
async def start_tasks(self):
|
||
"""启动所有后台任务
|
||
|
||
功能说明:
|
||
- 启动核心后台任务: 状态更新、清理、日志记录、兴趣评估和随机停用
|
||
- 每个任务启动前检查是否已在运行
|
||
- 将任务引用保存到任务列表
|
||
"""
|
||
|
||
# 任务配置列表: (任务变量名, 任务函数, 任务名称, 日志级别, 额外日志信息, 任务对象引用属性名)
|
||
task_configs = [
|
||
(
|
||
self._state_update_task,
|
||
lambda: self._run_state_update_cycle(self.update_interval),
|
||
"hf_state_update",
|
||
"debug",
|
||
f"聊天状态更新任务已启动 间隔:{self.update_interval}s",
|
||
"_state_update_task",
|
||
),
|
||
(
|
||
self._cleanup_task,
|
||
self._run_cleanup_cycle,
|
||
"hf_cleanup",
|
||
"info",
|
||
f"清理任务已启动 间隔:{self.cleanup_interval}s 阈值:{self.inactive_threshold}s",
|
||
"_cleanup_task",
|
||
),
|
||
(
|
||
self._logging_task,
|
||
self._run_logging_cycle,
|
||
"hf_logging",
|
||
"info",
|
||
f"日志任务已启动 间隔:{self.log_interval}s",
|
||
"_logging_task",
|
||
),
|
||
# 新增兴趣评估任务配置
|
||
(
|
||
self._interest_eval_task,
|
||
self._run_interest_eval_cycle,
|
||
"hf_interest_eval",
|
||
"debug", # 设为debug,避免过多日志
|
||
f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s",
|
||
"_interest_eval_task",
|
||
),
|
||
# 新增随机停用任务配置
|
||
(
|
||
self._random_deactivation_task,
|
||
self._run_random_deactivation_cycle,
|
||
"hf_random_deactivation",
|
||
"debug", # 设为debug,避免过多日志
|
||
f"随机停用任务已启动 间隔:{self.random_deactivation_interval}s",
|
||
"_random_deactivation_task",
|
||
),
|
||
]
|
||
|
||
# 统一启动所有任务
|
||
for _task_var, task_func, task_name, log_level, log_msg, task_attr_name in task_configs:
|
||
# 检查任务变量是否存在且未完成
|
||
current_task_var = getattr(self, task_attr_name)
|
||
if current_task_var is None or current_task_var.done():
|
||
new_task = asyncio.create_task(task_func(), name=task_name)
|
||
setattr(self, task_attr_name, new_task) # 更新任务变量
|
||
if new_task not in self._tasks: # 避免重复添加
|
||
self._tasks.append(new_task)
|
||
|
||
# 根据配置记录不同级别的日志
|
||
getattr(logger, log_level)(log_msg)
|
||
else:
|
||
logger.warning(f"{task_name}任务已在运行")
|
||
|
||
async def stop_tasks(self):
|
||
"""停止所有后台任务。
|
||
|
||
该方法会:
|
||
1. 遍历所有后台任务并取消未完成的任务
|
||
2. 等待所有取消操作完成
|
||
3. 清空任务列表
|
||
"""
|
||
logger.info("正在停止所有后台任务...")
|
||
cancelled_count = 0
|
||
|
||
# 第一步:取消所有运行中的任务
|
||
for task in self._tasks:
|
||
if task and not task.done():
|
||
task.cancel() # 发送取消请求
|
||
cancelled_count += 1
|
||
|
||
# 第二步:处理取消结果
|
||
if cancelled_count > 0:
|
||
logger.debug(f"正在等待{cancelled_count}个任务完成取消...")
|
||
# 使用gather等待所有取消操作完成,忽略异常
|
||
await asyncio.gather(*[t for t in self._tasks if t and t.cancelled()], return_exceptions=True)
|
||
logger.info(f"成功取消{cancelled_count}个后台任务")
|
||
else:
|
||
logger.info("没有需要取消的后台任务")
|
||
|
||
# 第三步:清空任务列表
|
||
self._tasks = [] # 重置任务列表
|
||
|
||
async def _run_periodic_loop(
|
||
self, task_name: str, interval: int, task_func: Callable[..., Coroutine[Any, Any, None]], **kwargs
|
||
):
|
||
"""周期性任务主循环"""
|
||
while True:
|
||
start_time = asyncio.get_event_loop().time()
|
||
# logger.debug(f"开始执行后台任务: {task_name}")
|
||
|
||
try:
|
||
await task_func(**kwargs) # 执行实际任务
|
||
except asyncio.CancelledError:
|
||
logger.info(f"任务 {task_name} 已取消")
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"任务 {task_name} 执行出错: {e}")
|
||
logger.error(traceback.format_exc())
|
||
|
||
# 计算并执行间隔等待
|
||
elapsed = asyncio.get_event_loop().time() - start_time
|
||
sleep_time = max(0, interval - elapsed)
|
||
# if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点
|
||
# logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)")
|
||
await asyncio.sleep(sleep_time)
|
||
|
||
logger.debug(f"任务循环结束: {task_name}") # 调整日志信息
|
||
|
||
async def _perform_state_update_work(self):
|
||
"""执行状态更新工作"""
|
||
previous_status = self.mai_state_info.get_current_state()
|
||
next_state = self.mai_state_manager.check_and_decide_next_state(self.mai_state_info)
|
||
|
||
state_changed = False
|
||
|
||
if next_state is not None:
|
||
state_changed = self.mai_state_info.update_mai_status(next_state)
|
||
|
||
# 处理保持离线状态的特殊情况
|
||
if not state_changed and next_state == previous_status == self.mai_state_info.mai_status.OFFLINE:
|
||
self.mai_state_info.reset_state_timer()
|
||
logger.debug("[后台任务] 保持离线状态并重置计时器")
|
||
state_changed = True # 触发后续处理
|
||
|
||
if state_changed:
|
||
current_state = self.mai_state_info.get_current_state()
|
||
await self.subheartflow_manager.enforce_subheartflow_limits(current_state)
|
||
|
||
# 状态转换处理
|
||
if (
|
||
previous_status == self.mai_state_info.mai_status.OFFLINE
|
||
and current_state != self.mai_state_info.mai_status.OFFLINE
|
||
):
|
||
logger.info("[后台任务] 主状态激活,触发子流激活")
|
||
await self.subheartflow_manager.activate_random_subflows_to_chat(current_state)
|
||
elif (
|
||
current_state == self.mai_state_info.mai_status.OFFLINE
|
||
and previous_status != self.mai_state_info.mai_status.OFFLINE
|
||
):
|
||
logger.info("检测到离线,停用所有子心流")
|
||
await self.subheartflow_manager.deactivate_all_subflows()
|
||
|
||
async def _perform_cleanup_work(self):
|
||
"""执行子心流清理任务
|
||
1. 获取需要清理的不活跃子心流列表
|
||
2. 逐个停止这些子心流
|
||
3. 记录清理结果
|
||
"""
|
||
# 获取需要清理的子心流列表(包含ID和原因)
|
||
flows_to_stop = self.subheartflow_manager.get_inactive_subheartflows()
|
||
|
||
if not flows_to_stop:
|
||
return # 没有需要清理的子心流直接返回
|
||
|
||
logger.info(f"准备删除 {len(flows_to_stop)} 个不活跃(1h)子心流")
|
||
stopped_count = 0
|
||
|
||
# 逐个停止子心流
|
||
for flow_id in flows_to_stop:
|
||
success = await self.subheartflow_manager.delete_subflow(flow_id)
|
||
if success:
|
||
stopped_count += 1
|
||
logger.debug(f"[清理任务] 已停止子心流 {flow_id}")
|
||
|
||
# 记录最终清理结果
|
||
logger.info(f"[清理任务] 清理完成, 共停止 {stopped_count}/{len(flows_to_stop)} 个子心流")
|
||
|
||
async def _perform_logging_work(self):
|
||
"""执行一轮状态日志记录。"""
|
||
await self.interest_logger.log_all_states()
|
||
|
||
# --- 新增兴趣评估工作函数 ---
|
||
async def _perform_interest_eval_work(self):
|
||
"""执行一轮子心流兴趣评估与提升检查。"""
|
||
# 直接调用 subheartflow_manager 的方法,并传递当前状态信息
|
||
await self.subheartflow_manager.evaluate_interest_and_promote(self.mai_state_info)
|
||
|
||
# --- 结束新增 ---
|
||
|
||
# --- 新增随机停用工作函数 ---
|
||
async def _perform_random_deactivation_work(self):
|
||
"""执行一轮子心流随机停用检查。"""
|
||
await self.subheartflow_manager.randomly_deactivate_subflows()
|
||
|
||
# --- 结束新增 ---
|
||
|
||
# --- Specific Task Runners --- #
|
||
async def _run_state_update_cycle(self, interval: int):
|
||
await self._run_periodic_loop(
|
||
task_name="State Update", interval=interval, task_func=self._perform_state_update_work
|
||
)
|
||
|
||
async def _run_cleanup_cycle(self):
|
||
await self._run_periodic_loop(
|
||
task_name="Subflow Cleanup", interval=self.cleanup_interval, task_func=self._perform_cleanup_work
|
||
)
|
||
|
||
async def _run_logging_cycle(self):
|
||
await self._run_periodic_loop(
|
||
task_name="State Logging", interval=self.log_interval, task_func=self._perform_logging_work
|
||
)
|
||
|
||
# --- 新增兴趣评估任务运行器 ---
|
||
async def _run_interest_eval_cycle(self):
|
||
await self._run_periodic_loop(
|
||
task_name="Interest Evaluation",
|
||
interval=self.interest_eval_interval,
|
||
task_func=self._perform_interest_eval_work,
|
||
)
|
||
|
||
# --- 结束新增 ---
|
||
|
||
# --- 新增随机停用任务运行器 ---
|
||
async def _run_random_deactivation_cycle(self):
|
||
"""运行随机停用循环。"""
|
||
await self._run_periodic_loop(
|
||
task_name="Random Deactivation",
|
||
interval=self.random_deactivation_interval,
|
||
task_func=self._perform_random_deactivation_work,
|
||
)
|
||
|
||
# --- 结束新增 ---
|