fix:调整hf位置
This commit is contained in:
314
src/chat/heart_flow/background_tasks.py
Normal file
314
src/chat/heart_flow/background_tasks.py
Normal file
@@ -0,0 +1,314 @@
|
||||
import asyncio
|
||||
import traceback
|
||||
from typing import Optional, Coroutine, Callable, Any, List
|
||||
|
||||
from src.common.logger_manager import get_logger
|
||||
|
||||
# Need manager types for dependency injection
|
||||
from src.chat.heart_flow.mai_state_manager import MaiStateManager, MaiStateInfo
|
||||
from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager
|
||||
from src.chat.heart_flow.interest_logger import InterestLogger
|
||||
|
||||
|
||||
logger = get_logger("background_tasks")
|
||||
|
||||
|
||||
# 新增兴趣评估间隔
|
||||
INTEREST_EVAL_INTERVAL_SECONDS = 5
|
||||
# 新增聊天超时检查间隔
|
||||
NORMAL_CHAT_TIMEOUT_CHECK_INTERVAL_SECONDS = 60
|
||||
# 新增状态评估间隔
|
||||
HF_JUDGE_STATE_UPDATE_INTERVAL_SECONDS = 20
|
||||
# 新增私聊激活检查间隔
|
||||
PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS = 5 # 与兴趣评估类似,设为5秒
|
||||
|
||||
CLEANUP_INTERVAL_SECONDS = 1200
|
||||
STATE_UPDATE_INTERVAL_SECONDS = 60
|
||||
LOG_INTERVAL_SECONDS = 3
|
||||
|
||||
|
||||
async def _run_periodic_loop(
|
||||
task_name: str, interval: int, task_func: Callable[..., Coroutine[Any, Any, None]], **kwargs
|
||||
):
|
||||
"""周期性任务主循环"""
|
||||
while True:
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
# logger.debug(f"开始执行后台任务: {task_name}")
|
||||
|
||||
try:
|
||||
await task_func(**kwargs) # 执行实际任务
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"任务 {task_name} 已取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"任务 {task_name} 执行出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
# 计算并执行间隔等待
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
sleep_time = max(0, interval - elapsed)
|
||||
# if sleep_time < 0.1: # 任务超时处理, DEBUG 时可能干扰断点
|
||||
# logger.warning(f"任务 {task_name} 超时执行 ({elapsed:.2f}s > {interval}s)")
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
logger.debug(f"任务循环结束: {task_name}") # 调整日志信息
|
||||
|
||||
|
||||
class BackgroundTaskManager:
|
||||
"""管理 Heartflow 的后台周期性任务。"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mai_state_info: MaiStateInfo, # Needs current state info
|
||||
mai_state_manager: MaiStateManager,
|
||||
subheartflow_manager: SubHeartflowManager,
|
||||
interest_logger: InterestLogger,
|
||||
):
|
||||
self.mai_state_info = mai_state_info
|
||||
self.mai_state_manager = mai_state_manager
|
||||
self.subheartflow_manager = subheartflow_manager
|
||||
self.interest_logger = interest_logger
|
||||
|
||||
# Task references
|
||||
self._state_update_task: Optional[asyncio.Task] = None
|
||||
self._cleanup_task: Optional[asyncio.Task] = None
|
||||
self._logging_task: Optional[asyncio.Task] = None
|
||||
self._normal_chat_timeout_check_task: Optional[asyncio.Task] = None
|
||||
self._hf_judge_state_update_task: Optional[asyncio.Task] = None
|
||||
self._into_focus_task: Optional[asyncio.Task] = None
|
||||
self._private_chat_activation_task: Optional[asyncio.Task] = None # 新增私聊激活任务引用
|
||||
self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks
|
||||
self._detect_command_from_gui_task: Optional[asyncio.Task] = None # 新增GUI命令检测任务引用
|
||||
|
||||
async def start_tasks(self):
|
||||
"""启动所有后台任务
|
||||
|
||||
功能说明:
|
||||
- 启动核心后台任务: 状态更新、清理、日志记录、兴趣评估和随机停用
|
||||
- 每个任务启动前检查是否已在运行
|
||||
- 将任务引用保存到任务列表
|
||||
"""
|
||||
|
||||
# 任务配置列表: (任务函数, 任务名称, 日志级别, 额外日志信息, 任务对象引用属性名)
|
||||
task_configs = [
|
||||
(
|
||||
lambda: self._run_state_update_cycle(STATE_UPDATE_INTERVAL_SECONDS),
|
||||
"debug",
|
||||
f"聊天状态更新任务已启动 间隔:{STATE_UPDATE_INTERVAL_SECONDS}s",
|
||||
"_state_update_task",
|
||||
),
|
||||
(
|
||||
lambda: self._run_normal_chat_timeout_check_cycle(NORMAL_CHAT_TIMEOUT_CHECK_INTERVAL_SECONDS),
|
||||
"debug",
|
||||
f"聊天超时检查任务已启动 间隔:{NORMAL_CHAT_TIMEOUT_CHECK_INTERVAL_SECONDS}s",
|
||||
"_normal_chat_timeout_check_task",
|
||||
),
|
||||
(
|
||||
lambda: self._run_absent_into_chat(HF_JUDGE_STATE_UPDATE_INTERVAL_SECONDS),
|
||||
"debug",
|
||||
f"状态评估任务已启动 间隔:{HF_JUDGE_STATE_UPDATE_INTERVAL_SECONDS}s",
|
||||
"_hf_judge_state_update_task",
|
||||
),
|
||||
(
|
||||
self._run_cleanup_cycle,
|
||||
"info",
|
||||
f"清理任务已启动 间隔:{CLEANUP_INTERVAL_SECONDS}s",
|
||||
"_cleanup_task",
|
||||
),
|
||||
(
|
||||
self._run_logging_cycle,
|
||||
"info",
|
||||
f"日志任务已启动 间隔:{LOG_INTERVAL_SECONDS}s",
|
||||
"_logging_task",
|
||||
),
|
||||
# 新增兴趣评估任务配置
|
||||
(
|
||||
self._run_into_focus_cycle,
|
||||
"debug", # 设为debug,避免过多日志
|
||||
f"专注评估任务已启动 间隔:{INTEREST_EVAL_INTERVAL_SECONDS}s",
|
||||
"_into_focus_task",
|
||||
),
|
||||
# 新增私聊激活任务配置
|
||||
(
|
||||
# Use lambda to pass the interval to the runner function
|
||||
lambda: self._run_private_chat_activation_cycle(PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS),
|
||||
"debug",
|
||||
f"私聊激活检查任务已启动 间隔:{PRIVATE_CHAT_ACTIVATION_CHECK_INTERVAL_SECONDS}s",
|
||||
"_private_chat_activation_task",
|
||||
),
|
||||
# 新增GUI命令检测任务配置
|
||||
# (
|
||||
# lambda: self._run_detect_command_from_gui_cycle(3),
|
||||
# "debug",
|
||||
# f"GUI命令检测任务已启动 间隔:{3}s",
|
||||
# "_detect_command_from_gui_task",
|
||||
# ),
|
||||
]
|
||||
|
||||
# 统一启动所有任务
|
||||
for task_func, log_level, log_msg, task_attr_name in task_configs:
|
||||
# 检查任务变量是否存在且未完成
|
||||
current_task_var = getattr(self, task_attr_name)
|
||||
if current_task_var is None or current_task_var.done():
|
||||
new_task = asyncio.create_task(task_func())
|
||||
setattr(self, task_attr_name, new_task) # 更新任务变量
|
||||
if new_task not in self._tasks: # 避免重复添加
|
||||
self._tasks.append(new_task)
|
||||
|
||||
# 根据配置记录不同级别的日志
|
||||
getattr(logger, log_level)(log_msg)
|
||||
else:
|
||||
logger.warning(f"{task_attr_name}任务已在运行")
|
||||
|
||||
async def stop_tasks(self):
|
||||
"""停止所有后台任务。
|
||||
|
||||
该方法会:
|
||||
1. 遍历所有后台任务并取消未完成的任务
|
||||
2. 等待所有取消操作完成
|
||||
3. 清空任务列表
|
||||
"""
|
||||
logger.info("正在停止所有后台任务...")
|
||||
cancelled_count = 0
|
||||
|
||||
# 第一步:取消所有运行中的任务
|
||||
for task in self._tasks:
|
||||
if task and not task.done():
|
||||
task.cancel() # 发送取消请求
|
||||
cancelled_count += 1
|
||||
|
||||
# 第二步:处理取消结果
|
||||
if cancelled_count > 0:
|
||||
logger.debug(f"正在等待{cancelled_count}个任务完成取消...")
|
||||
# 使用gather等待所有取消操作完成,忽略异常
|
||||
await asyncio.gather(*[t for t in self._tasks if t and t.cancelled()], return_exceptions=True)
|
||||
logger.info(f"成功取消{cancelled_count}个后台任务")
|
||||
else:
|
||||
logger.info("没有需要取消的后台任务")
|
||||
|
||||
# 第三步:清空任务列表
|
||||
self._tasks = [] # 重置任务列表
|
||||
|
||||
async def _perform_state_update_work(self):
|
||||
"""执行状态更新工作"""
|
||||
previous_status = self.mai_state_info.get_current_state()
|
||||
next_state = self.mai_state_manager.check_and_decide_next_state(self.mai_state_info)
|
||||
|
||||
state_changed = False
|
||||
|
||||
if next_state is not None:
|
||||
state_changed = self.mai_state_info.update_mai_status(next_state)
|
||||
|
||||
# 处理保持离线状态的特殊情况
|
||||
if not state_changed and next_state == previous_status == self.mai_state_info.mai_status.OFFLINE:
|
||||
self.mai_state_info.reset_state_timer()
|
||||
logger.debug("[后台任务] 保持离线状态并重置计时器")
|
||||
state_changed = True # 触发后续处理
|
||||
|
||||
if state_changed:
|
||||
current_state = self.mai_state_info.get_current_state()
|
||||
await self.subheartflow_manager.enforce_subheartflow_limits()
|
||||
|
||||
# 状态转换处理
|
||||
|
||||
if (
|
||||
current_state == self.mai_state_info.mai_status.OFFLINE
|
||||
and previous_status != self.mai_state_info.mai_status.OFFLINE
|
||||
):
|
||||
logger.info("检测到离线,停用所有子心流")
|
||||
await self.subheartflow_manager.deactivate_all_subflows()
|
||||
|
||||
async def _perform_absent_into_chat(self):
|
||||
"""调用llm检测是否转换ABSENT-CHAT状态"""
|
||||
logger.debug("[状态评估任务] 开始基于LLM评估子心流状态...")
|
||||
await self.subheartflow_manager.sbhf_absent_into_chat()
|
||||
|
||||
async def _normal_chat_timeout_check_work(self):
|
||||
"""检查处于CHAT状态的子心流是否因长时间未发言而超时,并将其转为ABSENT"""
|
||||
logger.debug("[聊天超时检查] 开始检查处于CHAT状态的子心流...")
|
||||
await self.subheartflow_manager.sbhf_chat_into_absent()
|
||||
|
||||
async def _perform_cleanup_work(self):
|
||||
"""执行子心流清理任务
|
||||
1. 获取需要清理的不活跃子心流列表
|
||||
2. 逐个停止这些子心流
|
||||
3. 记录清理结果
|
||||
"""
|
||||
# 获取需要清理的子心流列表(包含ID和原因)
|
||||
flows_to_stop = self.subheartflow_manager.get_inactive_subheartflows()
|
||||
|
||||
if not flows_to_stop:
|
||||
return # 没有需要清理的子心流直接返回
|
||||
|
||||
logger.info(f"准备删除 {len(flows_to_stop)} 个不活跃(1h)子心流")
|
||||
stopped_count = 0
|
||||
|
||||
# 逐个停止子心流
|
||||
for flow_id in flows_to_stop:
|
||||
success = await self.subheartflow_manager.delete_subflow(flow_id)
|
||||
if success:
|
||||
stopped_count += 1
|
||||
logger.debug(f"[清理任务] 已停止子心流 {flow_id}")
|
||||
|
||||
# 记录最终清理结果
|
||||
logger.info(f"[清理任务] 清理完成, 共停止 {stopped_count}/{len(flows_to_stop)} 个子心流")
|
||||
|
||||
async def _perform_logging_work(self):
|
||||
"""执行一轮状态日志记录。"""
|
||||
await self.interest_logger.log_all_states()
|
||||
|
||||
# --- 新增兴趣评估工作函数 ---
|
||||
async def _perform_into_focus_work(self):
|
||||
"""执行一轮子心流兴趣评估与提升检查。"""
|
||||
# 直接调用 subheartflow_manager 的方法,并传递当前状态信息
|
||||
await self.subheartflow_manager.sbhf_absent_into_focus()
|
||||
|
||||
# --- 结束新增 ---
|
||||
|
||||
# --- 结束新增 ---
|
||||
|
||||
# --- Specific Task Runners --- #
|
||||
async def _run_state_update_cycle(self, interval: int):
|
||||
await _run_periodic_loop(task_name="State Update", interval=interval, task_func=self._perform_state_update_work)
|
||||
|
||||
async def _run_absent_into_chat(self, interval: int):
|
||||
await _run_periodic_loop(task_name="Into Chat", interval=interval, task_func=self._perform_absent_into_chat)
|
||||
|
||||
async def _run_normal_chat_timeout_check_cycle(self, interval: int):
|
||||
await _run_periodic_loop(
|
||||
task_name="Normal Chat Timeout Check", interval=interval, task_func=self._normal_chat_timeout_check_work
|
||||
)
|
||||
|
||||
async def _run_cleanup_cycle(self):
|
||||
await _run_periodic_loop(
|
||||
task_name="Subflow Cleanup", interval=CLEANUP_INTERVAL_SECONDS, task_func=self._perform_cleanup_work
|
||||
)
|
||||
|
||||
async def _run_logging_cycle(self):
|
||||
await _run_periodic_loop(
|
||||
task_name="State Logging", interval=LOG_INTERVAL_SECONDS, task_func=self._perform_logging_work
|
||||
)
|
||||
|
||||
# --- 新增兴趣评估任务运行器 ---
|
||||
async def _run_into_focus_cycle(self):
|
||||
await _run_periodic_loop(
|
||||
task_name="Into Focus",
|
||||
interval=INTEREST_EVAL_INTERVAL_SECONDS,
|
||||
task_func=self._perform_into_focus_work,
|
||||
)
|
||||
|
||||
# 新增私聊激活任务运行器
|
||||
async def _run_private_chat_activation_cycle(self, interval: int):
|
||||
await _run_periodic_loop(
|
||||
task_name="Private Chat Activation Check",
|
||||
interval=interval,
|
||||
task_func=self.subheartflow_manager.sbhf_absent_private_into_focus,
|
||||
)
|
||||
|
||||
# # 有api之后删除
|
||||
# async def _run_detect_command_from_gui_cycle(self, interval: int):
|
||||
# await _run_periodic_loop(
|
||||
# task_name="Detect Command from GUI",
|
||||
# interval=interval,
|
||||
# task_func=self.subheartflow_manager.detect_command_from_gui,
|
||||
# )
|
||||
17
src/chat/heart_flow/chat_state_info.py
Normal file
17
src/chat/heart_flow/chat_state_info.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from src.manager.mood_manager import mood_manager
|
||||
import enum
|
||||
|
||||
|
||||
class ChatState(enum.Enum):
|
||||
ABSENT = "没在看群"
|
||||
CHAT = "随便水群"
|
||||
FOCUSED = "认真水群"
|
||||
|
||||
|
||||
class ChatStateInfo:
|
||||
def __init__(self):
|
||||
self.chat_status: ChatState = ChatState.CHAT
|
||||
self.current_state_time = 120
|
||||
|
||||
self.mood_manager = mood_manager
|
||||
self.mood = self.mood_manager.get_mood_prompt()
|
||||
97
src/chat/heart_flow/heartflow.py
Normal file
97
src/chat/heart_flow/heartflow.py
Normal file
@@ -0,0 +1,97 @@
|
||||
from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState
|
||||
from src.chat.models.utils_model import LLMRequest
|
||||
from src.config.config import global_config
|
||||
from src.common.logger_manager import get_logger
|
||||
from typing import Any, Optional
|
||||
from src.tools.tool_use import ToolUser
|
||||
from src.chat.person_info.relationship_manager import relationship_manager # Module instance
|
||||
from src.chat.heart_flow.mai_state_manager import MaiStateInfo, MaiStateManager
|
||||
from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager
|
||||
from src.chat.heart_flow.interest_logger import InterestLogger # Import InterestLogger
|
||||
from src.chat.heart_flow.background_tasks import BackgroundTaskManager # Import BackgroundTaskManager
|
||||
|
||||
logger = get_logger("heartflow")
|
||||
|
||||
|
||||
class Heartflow:
|
||||
"""主心流协调器,负责初始化并协调各个子系统:
|
||||
- 状态管理 (MaiState)
|
||||
- 子心流管理 (SubHeartflow)
|
||||
- 思考过程 (Mind)
|
||||
- 日志记录 (InterestLogger)
|
||||
- 后台任务 (BackgroundTaskManager)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 核心状态
|
||||
self.current_mind = "什么也没想" # 当前主心流想法
|
||||
self.past_mind = [] # 历史想法记录
|
||||
|
||||
# 状态管理相关
|
||||
self.current_state: MaiStateInfo = MaiStateInfo() # 当前状态信息
|
||||
self.mai_state_manager: MaiStateManager = MaiStateManager() # 状态决策管理器
|
||||
|
||||
# 子心流管理 (在初始化时传入 current_state)
|
||||
self.subheartflow_manager: SubHeartflowManager = SubHeartflowManager(self.current_state)
|
||||
|
||||
# LLM模型配置
|
||||
self.llm_model = LLMRequest(
|
||||
model=global_config.llm_heartflow, temperature=0.6, max_tokens=1000, request_type="heart_flow"
|
||||
)
|
||||
|
||||
# 外部依赖模块
|
||||
self.tool_user_instance = ToolUser() # 工具使用模块
|
||||
self.relationship_manager_instance = relationship_manager # 关系管理模块
|
||||
|
||||
self.interest_logger: InterestLogger = InterestLogger(self.subheartflow_manager, self) # 兴趣日志记录器
|
||||
|
||||
# 后台任务管理器 (整合所有定时任务)
|
||||
self.background_task_manager: BackgroundTaskManager = BackgroundTaskManager(
|
||||
mai_state_info=self.current_state,
|
||||
mai_state_manager=self.mai_state_manager,
|
||||
subheartflow_manager=self.subheartflow_manager,
|
||||
interest_logger=self.interest_logger,
|
||||
)
|
||||
|
||||
async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]:
|
||||
"""获取或创建一个新的SubHeartflow实例 - 委托给 SubHeartflowManager"""
|
||||
# 不再需要传入 self.current_state
|
||||
return await self.subheartflow_manager.get_or_create_subheartflow(subheartflow_id)
|
||||
|
||||
async def force_change_subheartflow_status(self, subheartflow_id: str, status: ChatState) -> None:
|
||||
"""强制改变子心流的状态"""
|
||||
# 这里的 message 是可选的,可能是一个消息对象,也可能是其他类型的数据
|
||||
return await self.subheartflow_manager.force_change_state(subheartflow_id, status)
|
||||
|
||||
async def api_get_all_states(self):
|
||||
"""获取所有状态"""
|
||||
return await self.interest_logger.api_get_all_states()
|
||||
|
||||
async def api_get_subheartflow_cycle_info(self, subheartflow_id: str, history_len: int) -> Optional[dict]:
|
||||
"""获取子心流的循环信息"""
|
||||
subheartflow = await self.subheartflow_manager.get_or_create_subheartflow(subheartflow_id)
|
||||
if not subheartflow:
|
||||
logger.warning(f"尝试获取不存在的子心流 {subheartflow_id} 的周期信息")
|
||||
return None
|
||||
heartfc_instance = subheartflow.heart_fc_instance
|
||||
if not heartfc_instance:
|
||||
logger.warning(f"子心流 {subheartflow_id} 没有心流实例,无法获取周期信息")
|
||||
return None
|
||||
|
||||
return heartfc_instance.get_cycle_history(last_n=history_len)
|
||||
|
||||
async def heartflow_start_working(self):
|
||||
"""启动后台任务"""
|
||||
await self.background_task_manager.start_tasks()
|
||||
logger.info("[Heartflow] 后台任务已启动")
|
||||
|
||||
# 根本不会用到这个函数吧,那样麦麦直接死了
|
||||
async def stop_working(self):
|
||||
"""停止所有任务和子心流"""
|
||||
logger.info("[Heartflow] 正在停止任务和子心流...")
|
||||
await self.background_task_manager.stop_tasks()
|
||||
await self.subheartflow_manager.deactivate_all_subflows()
|
||||
logger.info("[Heartflow] 所有任务和子心流已停止")
|
||||
|
||||
|
||||
heartflow = Heartflow()
|
||||
200
src/chat/heart_flow/interest_chatting.py
Normal file
200
src/chat/heart_flow/interest_chatting.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import asyncio
|
||||
from src.config.config import global_config
|
||||
from typing import Optional, Dict
|
||||
import traceback
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.chat.message_receive.message import MessageRecv
|
||||
import math
|
||||
|
||||
|
||||
# 定义常量 (从 interest.py 移动过来)
|
||||
MAX_INTEREST = 15.0
|
||||
|
||||
logger = get_logger("interest_chatting")
|
||||
|
||||
PROBABILITY_INCREASE_RATE_PER_SECOND = 0.1
|
||||
PROBABILITY_DECREASE_RATE_PER_SECOND = 0.1
|
||||
MAX_REPLY_PROBABILITY = 1
|
||||
|
||||
|
||||
class InterestChatting:
|
||||
def __init__(
|
||||
self,
|
||||
decay_rate=global_config.default_decay_rate_per_second,
|
||||
max_interest=MAX_INTEREST,
|
||||
trigger_threshold=global_config.reply_trigger_threshold,
|
||||
max_probability=MAX_REPLY_PROBABILITY,
|
||||
):
|
||||
# 基础属性初始化
|
||||
self.interest_level: float = 0.0
|
||||
self.decay_rate_per_second: float = decay_rate
|
||||
self.max_interest: float = max_interest
|
||||
|
||||
self.trigger_threshold: float = trigger_threshold
|
||||
self.max_reply_probability: float = max_probability
|
||||
self.is_above_threshold: bool = False
|
||||
|
||||
# 任务相关属性初始化
|
||||
self.update_task: Optional[asyncio.Task] = None
|
||||
self._stop_event = asyncio.Event()
|
||||
self._task_lock = asyncio.Lock()
|
||||
self._is_running = False
|
||||
|
||||
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
|
||||
self.update_interval = 1.0
|
||||
|
||||
self.above_threshold = False
|
||||
self.start_hfc_probability = 0.0
|
||||
|
||||
async def initialize(self):
|
||||
async with self._task_lock:
|
||||
if self._is_running:
|
||||
logger.debug("后台兴趣更新任务已在运行中。")
|
||||
return
|
||||
|
||||
# 清理已完成或已取消的任务
|
||||
if self.update_task and (self.update_task.done() or self.update_task.cancelled()):
|
||||
self.update_task = None
|
||||
|
||||
if not self.update_task:
|
||||
self._stop_event.clear()
|
||||
self._is_running = True
|
||||
self.update_task = asyncio.create_task(self._run_update_loop(self.update_interval))
|
||||
logger.debug("后台兴趣更新任务已创建并启动。")
|
||||
|
||||
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
|
||||
"""添加消息到兴趣字典
|
||||
|
||||
参数:
|
||||
message: 接收到的消息
|
||||
interest_value: 兴趣值
|
||||
is_mentioned: 是否被提及
|
||||
|
||||
功能:
|
||||
1. 将消息添加到兴趣字典
|
||||
2. 更新最后交互时间
|
||||
3. 如果字典长度超过10,删除最旧的消息
|
||||
"""
|
||||
# 添加新消息
|
||||
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
|
||||
|
||||
# 如果字典长度超过10,删除最旧的消息
|
||||
if len(self.interest_dict) > 10:
|
||||
oldest_key = next(iter(self.interest_dict))
|
||||
self.interest_dict.pop(oldest_key)
|
||||
|
||||
async def _calculate_decay(self):
|
||||
"""计算兴趣值的衰减
|
||||
|
||||
参数:
|
||||
current_time: 当前时间戳
|
||||
|
||||
处理逻辑:
|
||||
1. 计算时间差
|
||||
2. 处理各种异常情况(负值/零值)
|
||||
3. 正常计算衰减
|
||||
4. 更新最后更新时间
|
||||
"""
|
||||
|
||||
# 处理极小兴趣值情况
|
||||
if self.interest_level < 1e-9:
|
||||
self.interest_level = 0.0
|
||||
return
|
||||
|
||||
# 异常情况处理
|
||||
if self.decay_rate_per_second <= 0:
|
||||
logger.warning(f"衰减率({self.decay_rate_per_second})无效,重置兴趣值为0")
|
||||
self.interest_level = 0.0
|
||||
return
|
||||
|
||||
# 正常衰减计算
|
||||
try:
|
||||
decay_factor = math.pow(self.decay_rate_per_second, self.update_interval)
|
||||
self.interest_level *= decay_factor
|
||||
except ValueError as e:
|
||||
logger.error(
|
||||
f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}"
|
||||
)
|
||||
self.interest_level = 0.0
|
||||
|
||||
async def _update_reply_probability(self):
|
||||
self.above_threshold = self.interest_level >= self.trigger_threshold
|
||||
if self.above_threshold:
|
||||
self.start_hfc_probability += PROBABILITY_INCREASE_RATE_PER_SECOND
|
||||
else:
|
||||
if self.start_hfc_probability > 0:
|
||||
self.start_hfc_probability = max(0, self.start_hfc_probability - PROBABILITY_DECREASE_RATE_PER_SECOND)
|
||||
|
||||
async def increase_interest(self, value: float):
|
||||
self.interest_level += value
|
||||
self.interest_level = min(self.interest_level, self.max_interest)
|
||||
|
||||
async def decrease_interest(self, value: float):
|
||||
self.interest_level -= value
|
||||
self.interest_level = max(self.interest_level, 0.0)
|
||||
|
||||
async def get_interest(self) -> float:
|
||||
return self.interest_level
|
||||
|
||||
async def get_state(self) -> dict:
|
||||
interest = self.interest_level # 直接使用属性值
|
||||
return {
|
||||
"interest_level": round(interest, 2),
|
||||
"start_hfc_probability": round(self.start_hfc_probability, 4),
|
||||
"above_threshold": self.above_threshold,
|
||||
}
|
||||
|
||||
# --- 新增后台更新任务相关方法 ---
|
||||
async def _run_update_loop(self, update_interval: float = 1.0):
|
||||
"""后台循环,定期更新兴趣和回复概率。"""
|
||||
try:
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
if self.interest_level != 0:
|
||||
await self._calculate_decay()
|
||||
|
||||
await self._update_reply_probability()
|
||||
|
||||
# 等待下一个周期或停止事件
|
||||
await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
|
||||
except asyncio.TimeoutError:
|
||||
# 正常超时,继续循环
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(f"InterestChatting 更新循环出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
# 防止错误导致CPU飙升,稍作等待
|
||||
await asyncio.sleep(5)
|
||||
except asyncio.CancelledError:
|
||||
logger.info("InterestChatting 更新循环被取消。")
|
||||
finally:
|
||||
self._is_running = False
|
||||
logger.info("InterestChatting 更新循环已停止。")
|
||||
|
||||
async def stop_updates(self):
|
||||
"""停止后台更新任务,使用锁确保并发安全"""
|
||||
async with self._task_lock:
|
||||
if not self._is_running:
|
||||
logger.debug("后台兴趣更新任务未运行。")
|
||||
return
|
||||
|
||||
logger.info("正在停止 InterestChatting 后台更新任务...")
|
||||
self._stop_event.set()
|
||||
|
||||
if self.update_task and not self.update_task.done():
|
||||
try:
|
||||
# 等待任务结束,设置超时
|
||||
await asyncio.wait_for(self.update_task, timeout=5.0)
|
||||
logger.info("InterestChatting 后台更新任务已成功停止。")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
|
||||
self.update_task.cancel()
|
||||
try:
|
||||
await self.update_task # 等待取消完成
|
||||
except asyncio.CancelledError:
|
||||
logger.info("InterestChatting 后台更新任务已被取消。")
|
||||
except Exception as e:
|
||||
logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
|
||||
finally:
|
||||
self.update_task = None
|
||||
self._is_running = False
|
||||
212
src/chat/heart_flow/interest_logger.py
Normal file
212
src/chat/heart_flow/interest_logger.py
Normal file
@@ -0,0 +1,212 @@
|
||||
import asyncio
|
||||
import time
|
||||
import json
|
||||
import os
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING, Dict, List
|
||||
|
||||
from src.common.logger_manager import get_logger
|
||||
|
||||
# Need chat_manager to get stream names
|
||||
from src.chat.message_receive.chat_stream import chat_manager
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.chat.heart_flow.subheartflow_manager import SubHeartflowManager
|
||||
from src.chat.heart_flow.sub_heartflow import SubHeartflow
|
||||
from src.chat.heart_flow.heartflow import Heartflow # 导入 Heartflow 类型
|
||||
|
||||
|
||||
logger = get_logger("interest")
|
||||
|
||||
# Consider moving log directory/filename constants here
|
||||
LOG_DIRECTORY = "logs/interest"
|
||||
HISTORY_LOG_FILENAME = "interest_history.log"
|
||||
|
||||
|
||||
def _ensure_log_directory():
|
||||
"""确保日志目录存在。"""
|
||||
os.makedirs(LOG_DIRECTORY, exist_ok=True)
|
||||
logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在")
|
||||
|
||||
|
||||
def _clear_and_create_log_file():
|
||||
"""清除日志文件并创建新的日志文件。"""
|
||||
if os.path.exists(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)):
|
||||
os.remove(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME))
|
||||
with open(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME), "w", encoding="utf-8") as f:
|
||||
f.write("")
|
||||
|
||||
|
||||
class InterestLogger:
|
||||
"""负责定期记录主心流和所有子心流的状态到日志文件。"""
|
||||
|
||||
def __init__(self, subheartflow_manager: "SubHeartflowManager", heartflow: "Heartflow"):
|
||||
"""
|
||||
初始化 InterestLogger。
|
||||
|
||||
Args:
|
||||
subheartflow_manager: 子心流管理器实例。
|
||||
heartflow: 主心流实例,用于获取主心流状态。
|
||||
"""
|
||||
self.subheartflow_manager = subheartflow_manager
|
||||
self.heartflow = heartflow # 存储 Heartflow 实例
|
||||
self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)
|
||||
_ensure_log_directory()
|
||||
_clear_and_create_log_file()
|
||||
|
||||
async def get_all_subflow_states(self) -> Dict[str, Dict]:
|
||||
"""并发获取所有活跃子心流的当前完整状态。"""
|
||||
all_flows: List["SubHeartflow"] = self.subheartflow_manager.get_all_subheartflows()
|
||||
tasks = []
|
||||
results = {}
|
||||
|
||||
if not all_flows:
|
||||
# logger.debug("未找到任何子心流状态")
|
||||
return results
|
||||
|
||||
for subheartflow in all_flows:
|
||||
if await self.subheartflow_manager.get_or_create_subheartflow(subheartflow.subheartflow_id):
|
||||
tasks.append(
|
||||
asyncio.create_task(subheartflow.get_full_state(), name=f"get_state_{subheartflow.subheartflow_id}")
|
||||
)
|
||||
else:
|
||||
logger.warning(f"子心流 {subheartflow.subheartflow_id} 在创建任务前已消失")
|
||||
|
||||
if tasks:
|
||||
done, pending = await asyncio.wait(tasks, timeout=5.0)
|
||||
|
||||
if pending:
|
||||
logger.warning(f"获取子心流状态超时,有 {len(pending)} 个任务未完成")
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
for task in done:
|
||||
stream_id_str = task.get_name().split("get_state_")[-1]
|
||||
stream_id = stream_id_str
|
||||
|
||||
if task.cancelled():
|
||||
logger.warning(f"获取子心流 {stream_id} 状态的任务已取消(超时)", exc_info=False)
|
||||
elif task.exception():
|
||||
exc = task.exception()
|
||||
logger.warning(f"获取子心流 {stream_id} 状态出错: {exc}")
|
||||
else:
|
||||
result = task.result()
|
||||
results[stream_id] = result
|
||||
|
||||
logger.trace(f"成功获取 {len(results)} 个子心流的完整状态")
|
||||
return results
|
||||
|
||||
async def log_all_states(self):
|
||||
"""获取主心流状态和所有子心流的完整状态并写入日志文件。"""
|
||||
try:
|
||||
current_timestamp = time.time()
|
||||
|
||||
# main_mind = self.heartflow.current_mind
|
||||
# 获取 Mai 状态名称
|
||||
mai_state_name = self.heartflow.current_state.get_current_state().name
|
||||
|
||||
all_subflow_states = await self.get_all_subflow_states()
|
||||
|
||||
log_entry_base = {
|
||||
"timestamp": round(current_timestamp, 2),
|
||||
# "main_mind": main_mind,
|
||||
"mai_state": mai_state_name,
|
||||
"subflow_count": len(all_subflow_states),
|
||||
"subflows": [],
|
||||
}
|
||||
|
||||
if not all_subflow_states:
|
||||
# logger.debug("没有获取到任何子心流状态,仅记录主心流状态")
|
||||
with open(self._history_log_file_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n")
|
||||
return
|
||||
|
||||
subflow_details = []
|
||||
items_snapshot = list(all_subflow_states.items())
|
||||
for stream_id, state in items_snapshot:
|
||||
group_name = stream_id
|
||||
try:
|
||||
chat_stream = chat_manager.get_stream(stream_id)
|
||||
if chat_stream:
|
||||
if chat_stream.group_info:
|
||||
group_name = chat_stream.group_info.group_name
|
||||
elif chat_stream.user_info:
|
||||
group_name = f"私聊_{chat_stream.user_info.user_nickname}"
|
||||
except Exception as e:
|
||||
logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}")
|
||||
|
||||
interest_state = state.get("interest_state", {})
|
||||
|
||||
subflow_entry = {
|
||||
"stream_id": stream_id,
|
||||
"group_name": group_name,
|
||||
"sub_mind": state.get("current_mind", "未知"),
|
||||
"sub_chat_state": state.get("chat_state", "未知"),
|
||||
"interest_level": interest_state.get("interest_level", 0.0),
|
||||
"start_hfc_probability": interest_state.get("start_hfc_probability", 0.0),
|
||||
# "is_above_threshold": interest_state.get("is_above_threshold", False),
|
||||
}
|
||||
subflow_details.append(subflow_entry)
|
||||
|
||||
log_entry_base["subflows"] = subflow_details
|
||||
|
||||
with open(self._history_log_file_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(log_entry_base, ensure_ascii=False) + "\n")
|
||||
|
||||
except IOError as e:
|
||||
logger.error(f"写入状态日志到 {self._history_log_file_path} 出错: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"记录状态时发生意外错误: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
async def api_get_all_states(self):
|
||||
"""获取主心流和所有子心流的状态。"""
|
||||
try:
|
||||
current_timestamp = time.time()
|
||||
|
||||
# main_mind = self.heartflow.current_mind
|
||||
# 获取 Mai 状态名称
|
||||
mai_state_name = self.heartflow.current_state.get_current_state().name
|
||||
|
||||
all_subflow_states = await self.get_all_subflow_states()
|
||||
|
||||
log_entry_base = {
|
||||
"timestamp": round(current_timestamp, 2),
|
||||
# "main_mind": main_mind,
|
||||
"mai_state": mai_state_name,
|
||||
"subflow_count": len(all_subflow_states),
|
||||
"subflows": [],
|
||||
}
|
||||
|
||||
subflow_details = []
|
||||
items_snapshot = list(all_subflow_states.items())
|
||||
for stream_id, state in items_snapshot:
|
||||
group_name = stream_id
|
||||
try:
|
||||
chat_stream = chat_manager.get_stream(stream_id)
|
||||
if chat_stream:
|
||||
if chat_stream.group_info:
|
||||
group_name = chat_stream.group_info.group_name
|
||||
elif chat_stream.user_info:
|
||||
group_name = f"私聊_{chat_stream.user_info.user_nickname}"
|
||||
except Exception as e:
|
||||
logger.trace(f"无法获取 stream_id {stream_id} 的群组名: {e}")
|
||||
|
||||
interest_state = state.get("interest_state", {})
|
||||
|
||||
subflow_entry = {
|
||||
"stream_id": stream_id,
|
||||
"group_name": group_name,
|
||||
"sub_mind": state.get("current_mind", "未知"),
|
||||
"sub_chat_state": state.get("chat_state", "未知"),
|
||||
"interest_level": interest_state.get("interest_level", 0.0),
|
||||
"start_hfc_probability": interest_state.get("start_hfc_probability", 0.0),
|
||||
# "is_above_threshold": interest_state.get("is_above_threshold", False),
|
||||
}
|
||||
subflow_details.append(subflow_entry)
|
||||
|
||||
log_entry_base["subflows"] = subflow_details
|
||||
return subflow_details
|
||||
except Exception as e:
|
||||
logger.error(f"记录状态时发生意外错误: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
226
src/chat/heart_flow/mai_state_manager.py
Normal file
226
src/chat/heart_flow/mai_state_manager.py
Normal file
@@ -0,0 +1,226 @@
|
||||
import enum
|
||||
import time
|
||||
import random
|
||||
from typing import List, Tuple, Optional
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.manager.mood_manager import mood_manager
|
||||
from src.config.config import global_config
|
||||
|
||||
logger = get_logger("mai_state")
|
||||
|
||||
|
||||
# -- 状态相关的可配置参数 (可以从 glocal_config 加载) --
|
||||
# The line `enable_unlimited_hfc_chat = False` is setting a configuration parameter that controls
|
||||
# whether a specific debugging feature is enabled or not. When `enable_unlimited_hfc_chat` is set to
|
||||
# `False`, it means that the debugging feature for unlimited focused chatting is disabled.
|
||||
enable_unlimited_hfc_chat = True # 调试用:无限专注聊天
|
||||
# enable_unlimited_hfc_chat = False
|
||||
prevent_offline_state = True
|
||||
# 目前默认不启用OFFLINE状态
|
||||
|
||||
# 不同状态下普通聊天的最大消息数
|
||||
base_normal_chat_num = global_config.base_normal_chat_num
|
||||
base_focused_chat_num = global_config.base_focused_chat_num
|
||||
|
||||
|
||||
MAX_NORMAL_CHAT_NUM_PEEKING = int(base_normal_chat_num / 2)
|
||||
MAX_NORMAL_CHAT_NUM_NORMAL = base_normal_chat_num
|
||||
MAX_NORMAL_CHAT_NUM_FOCUSED = base_normal_chat_num + 1
|
||||
|
||||
# 不同状态下专注聊天的最大消息数
|
||||
MAX_FOCUSED_CHAT_NUM_PEEKING = int(base_focused_chat_num / 2)
|
||||
MAX_FOCUSED_CHAT_NUM_NORMAL = base_focused_chat_num
|
||||
MAX_FOCUSED_CHAT_NUM_FOCUSED = base_focused_chat_num + 2
|
||||
|
||||
# -- 状态定义 --
|
||||
|
||||
|
||||
class MaiState(enum.Enum):
|
||||
"""
|
||||
聊天状态:
|
||||
OFFLINE: 不在线:回复概率极低,不会进行任何聊天
|
||||
PEEKING: 看一眼手机:回复概率较低,会进行一些普通聊天
|
||||
NORMAL_CHAT: 正常看手机:回复概率较高,会进行一些普通聊天和少量的专注聊天
|
||||
FOCUSED_CHAT: 专注聊天:回复概率极高,会进行专注聊天和少量的普通聊天
|
||||
"""
|
||||
|
||||
OFFLINE = "不在线"
|
||||
PEEKING = "看一眼手机"
|
||||
NORMAL_CHAT = "正常看手机"
|
||||
FOCUSED_CHAT = "专心看手机"
|
||||
|
||||
def get_normal_chat_max_num(self):
|
||||
# 调试用
|
||||
if enable_unlimited_hfc_chat:
|
||||
return 1000
|
||||
|
||||
if self == MaiState.OFFLINE:
|
||||
return 0
|
||||
elif self == MaiState.PEEKING:
|
||||
return MAX_NORMAL_CHAT_NUM_PEEKING
|
||||
elif self == MaiState.NORMAL_CHAT:
|
||||
return MAX_NORMAL_CHAT_NUM_NORMAL
|
||||
elif self == MaiState.FOCUSED_CHAT:
|
||||
return MAX_NORMAL_CHAT_NUM_FOCUSED
|
||||
return None
|
||||
|
||||
def get_focused_chat_max_num(self):
|
||||
# 调试用
|
||||
if enable_unlimited_hfc_chat:
|
||||
return 1000
|
||||
|
||||
if self == MaiState.OFFLINE:
|
||||
return 0
|
||||
elif self == MaiState.PEEKING:
|
||||
return MAX_FOCUSED_CHAT_NUM_PEEKING
|
||||
elif self == MaiState.NORMAL_CHAT:
|
||||
return MAX_FOCUSED_CHAT_NUM_NORMAL
|
||||
elif self == MaiState.FOCUSED_CHAT:
|
||||
return MAX_FOCUSED_CHAT_NUM_FOCUSED
|
||||
return None
|
||||
|
||||
|
||||
class MaiStateInfo:
|
||||
def __init__(self):
|
||||
self.mai_status: MaiState = MaiState.NORMAL_CHAT # 初始状态改为 NORMAL_CHAT
|
||||
self.mai_status_history: List[Tuple[MaiState, float]] = [] # 历史状态,包含 状态,时间戳
|
||||
self.last_status_change_time: float = time.time() # 状态最后改变时间
|
||||
self.last_min_check_time: float = time.time() # 上次1分钟规则检查时间
|
||||
|
||||
# Mood management is now part of MaiStateInfo
|
||||
self.mood_manager = mood_manager # Use singleton instance
|
||||
|
||||
def update_mai_status(self, new_status: MaiState) -> bool:
|
||||
"""
|
||||
更新聊天状态。
|
||||
|
||||
Args:
|
||||
new_status: 新的 MaiState 状态。
|
||||
|
||||
Returns:
|
||||
bool: 如果状态实际发生了改变则返回 True,否则返回 False。
|
||||
"""
|
||||
if new_status != self.mai_status:
|
||||
self.mai_status = new_status
|
||||
current_time = time.time()
|
||||
self.last_status_change_time = current_time
|
||||
self.last_min_check_time = current_time # Reset 1-min check on any state change
|
||||
self.mai_status_history.append((new_status, current_time))
|
||||
logger.info(f"麦麦状态更新为: {self.mai_status.value}")
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def reset_state_timer(self):
|
||||
"""
|
||||
重置状态持续时间计时器和一分钟规则检查计时器。
|
||||
通常在状态保持不变但需要重新开始计时的情况下调用(例如,保持 OFFLINE)。
|
||||
"""
|
||||
current_time = time.time()
|
||||
self.last_status_change_time = current_time
|
||||
self.last_min_check_time = current_time # Also reset the 1-min check timer
|
||||
logger.debug("MaiStateInfo 状态计时器已重置。")
|
||||
|
||||
def get_mood_prompt(self) -> str:
|
||||
"""获取当前的心情提示词"""
|
||||
# Delegate to the internal mood manager
|
||||
return self.mood_manager.get_mood_prompt()
|
||||
|
||||
def get_current_state(self) -> MaiState:
|
||||
"""获取当前的 MaiState"""
|
||||
return self.mai_status
|
||||
|
||||
|
||||
class MaiStateManager:
|
||||
"""管理 Mai 的整体状态转换逻辑"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def check_and_decide_next_state(current_state_info: MaiStateInfo) -> Optional[MaiState]:
|
||||
"""
|
||||
根据当前状态和规则检查是否需要转换状态,并决定下一个状态。
|
||||
"""
|
||||
current_time = time.time()
|
||||
current_status = current_state_info.mai_status
|
||||
time_in_current_status = current_time - current_state_info.last_status_change_time
|
||||
_time_since_last_min_check = current_time - current_state_info.last_min_check_time
|
||||
next_state: Optional[MaiState] = None
|
||||
|
||||
# 辅助函数:根据 prevent_offline_state 标志调整目标状态
|
||||
def _resolve_offline(candidate_state: MaiState) -> MaiState:
|
||||
# 现在不再切换到OFFLINE,直接返回当前状态
|
||||
if candidate_state == MaiState.OFFLINE:
|
||||
return current_status
|
||||
return candidate_state
|
||||
|
||||
if current_status == MaiState.OFFLINE:
|
||||
logger.info("当前[离线],没看手机,思考要不要上线看看......")
|
||||
elif current_status == MaiState.PEEKING:
|
||||
logger.info("当前[看一眼手机],思考要不要继续聊下去......")
|
||||
elif current_status == MaiState.NORMAL_CHAT:
|
||||
logger.info("当前在[正常看手机]思考要不要继续聊下去......")
|
||||
elif current_status == MaiState.FOCUSED_CHAT:
|
||||
logger.info("当前在[专心看手机]思考要不要继续聊下去......")
|
||||
|
||||
# 1. 移除每分钟概率切换到OFFLINE的逻辑
|
||||
# if time_since_last_min_check >= 60:
|
||||
# if current_status != MaiState.OFFLINE:
|
||||
# if random.random() < 0.03: # 3% 概率切换到 OFFLINE
|
||||
# potential_next = MaiState.OFFLINE
|
||||
# resolved_next = _resolve_offline(potential_next)
|
||||
# logger.debug(f"概率触发下线,resolve 为 {resolved_next.value}")
|
||||
# # 只有当解析后的状态与当前状态不同时才设置 next_state
|
||||
# if resolved_next != current_status:
|
||||
# next_state = resolved_next
|
||||
|
||||
# 2. 状态持续时间规则 (只有在规则1没有触发状态改变时才检查)
|
||||
if next_state is None:
|
||||
time_limit_exceeded = False
|
||||
choices_list = []
|
||||
weights = []
|
||||
rule_id = ""
|
||||
|
||||
if current_status == MaiState.OFFLINE:
|
||||
# OFFLINE 状态不再自动切换,直接返回 None
|
||||
return None
|
||||
elif current_status == MaiState.PEEKING:
|
||||
if time_in_current_status >= 600: # PEEKING 最多持续 600 秒
|
||||
time_limit_exceeded = True
|
||||
rule_id = "2.2 (From PEEKING)"
|
||||
weights = [50, 50]
|
||||
choices_list = [MaiState.NORMAL_CHAT, MaiState.FOCUSED_CHAT]
|
||||
elif current_status == MaiState.NORMAL_CHAT:
|
||||
if time_in_current_status >= 300: # NORMAL_CHAT 最多持续 300 秒
|
||||
time_limit_exceeded = True
|
||||
rule_id = "2.3 (From NORMAL_CHAT)"
|
||||
weights = [50, 50]
|
||||
choices_list = [MaiState.PEEKING, MaiState.FOCUSED_CHAT]
|
||||
elif current_status == MaiState.FOCUSED_CHAT:
|
||||
if time_in_current_status >= 600: # FOCUSED_CHAT 最多持续 600 秒
|
||||
time_limit_exceeded = True
|
||||
rule_id = "2.4 (From FOCUSED_CHAT)"
|
||||
weights = [50, 50]
|
||||
choices_list = [MaiState.NORMAL_CHAT, MaiState.PEEKING]
|
||||
|
||||
if time_limit_exceeded:
|
||||
next_state_candidate = random.choices(choices_list, weights=weights, k=1)[0]
|
||||
resolved_candidate = _resolve_offline(next_state_candidate)
|
||||
logger.debug(
|
||||
f"规则{rule_id}:时间到,随机选择 {next_state_candidate.value},resolve 为 {resolved_candidate.value}"
|
||||
)
|
||||
next_state = resolved_candidate # 直接使用解析后的状态
|
||||
|
||||
# 注意:enable_unlimited_hfc_chat 优先级高于 prevent_offline_state
|
||||
# 如果触发了这个,它会覆盖上面规则2设置的 next_state
|
||||
if enable_unlimited_hfc_chat:
|
||||
logger.debug("调试用:开挂了,强制切换到专注聊天")
|
||||
next_state = MaiState.FOCUSED_CHAT
|
||||
|
||||
# --- 最终决策 --- #
|
||||
# 如果决定了下一个状态,且这个状态与当前状态不同,则返回下一个状态
|
||||
if next_state is not None and next_state != current_status:
|
||||
return next_state
|
||||
else:
|
||||
return None # 没有状态转换发生或无需重置计时器
|
||||
258
src/chat/heart_flow/observation/chatting_observation.py
Normal file
258
src/chat/heart_flow/observation/chatting_observation.py
Normal file
@@ -0,0 +1,258 @@
|
||||
from datetime import datetime
|
||||
from src.chat.models.utils_model import LLMRequest
|
||||
from src.config.config import global_config
|
||||
import traceback
|
||||
from src.chat.utils.chat_message_builder import (
|
||||
get_raw_msg_before_timestamp_with_chat,
|
||||
build_readable_messages,
|
||||
get_raw_msg_by_timestamp_with_chat,
|
||||
num_new_messages_since,
|
||||
get_person_id_list,
|
||||
)
|
||||
from src.chat.utils.prompt_builder import global_prompt_manager
|
||||
from typing import Optional
|
||||
import difflib
|
||||
from src.chat.message_receive.message import MessageRecv # 添加 MessageRecv 导入
|
||||
from src.chat.heart_flow.observation.observation import Observation
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
|
||||
from src.chat.utils.prompt_builder import Prompt
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
Prompt(
|
||||
"""这是qq群聊的聊天记录,请总结以下聊天记录的主题:
|
||||
{chat_logs}
|
||||
请用一句话概括,包括人物、事件和主要信息,不要分点。""",
|
||||
"chat_summary_group_prompt", # Template for group chat
|
||||
)
|
||||
|
||||
Prompt(
|
||||
"""这是你和{chat_target}的私聊记录,请总结以下聊天记录的主题:
|
||||
{chat_logs}
|
||||
请用一句话概括,包括事件,时间,和主要信息,不要分点。""",
|
||||
"chat_summary_private_prompt", # Template for private chat
|
||||
)
|
||||
# --- End Prompt Template Definition ---
|
||||
|
||||
|
||||
# 聊天观察
|
||||
class ChattingObservation(Observation):
|
||||
def __init__(self, chat_id):
|
||||
super().__init__(chat_id)
|
||||
self.chat_id = chat_id
|
||||
|
||||
# --- Initialize attributes (defaults) ---
|
||||
self.is_group_chat: bool = False
|
||||
self.chat_target_info: Optional[dict] = None
|
||||
# --- End Initialization ---
|
||||
|
||||
# --- Other attributes initialized in __init__ ---
|
||||
self.talking_message = []
|
||||
self.talking_message_str = ""
|
||||
self.talking_message_str_truncate = ""
|
||||
self.name = global_config.BOT_NICKNAME
|
||||
self.nick_name = global_config.BOT_ALIAS_NAMES
|
||||
self.max_now_obs_len = global_config.observation_context_size
|
||||
self.overlap_len = global_config.compressed_length
|
||||
self.mid_memorys = []
|
||||
self.max_mid_memory_len = global_config.compress_length_limit
|
||||
self.mid_memory_info = ""
|
||||
self.person_list = []
|
||||
self.oldest_messages = []
|
||||
self.oldest_messages_str = ""
|
||||
self.compressor_prompt = ""
|
||||
self.llm_summary = LLMRequest(
|
||||
model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation"
|
||||
)
|
||||
|
||||
async def initialize(self):
|
||||
self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.chat_id)
|
||||
logger.debug(f"初始化observation: self.is_group_chat: {self.is_group_chat}")
|
||||
logger.debug(f"初始化observation: self.chat_target_info: {self.chat_target_info}")
|
||||
initial_messages = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 10)
|
||||
self.talking_message = initial_messages
|
||||
self.talking_message_str = await build_readable_messages(self.talking_message)
|
||||
|
||||
# 进行一次观察 返回观察结果observe_info
|
||||
def get_observe_info(self, ids=None):
|
||||
mid_memory_str = ""
|
||||
if ids:
|
||||
for id in ids:
|
||||
print(f"id:{id}")
|
||||
try:
|
||||
for mid_memory in self.mid_memorys:
|
||||
if mid_memory["id"] == id:
|
||||
mid_memory_by_id = mid_memory
|
||||
msg_str = ""
|
||||
for msg in mid_memory_by_id["messages"]:
|
||||
msg_str += f"{msg['detailed_plain_text']}"
|
||||
# time_diff = int((datetime.now().timestamp() - mid_memory_by_id["created_at"]) / 60)
|
||||
# mid_memory_str += f"距离现在{time_diff}分钟前:\n{msg_str}\n"
|
||||
mid_memory_str += f"{msg_str}\n"
|
||||
except Exception as e:
|
||||
logger.error(f"获取mid_memory_id失败: {e}")
|
||||
traceback.print_exc()
|
||||
return self.talking_message_str
|
||||
|
||||
return mid_memory_str + "现在群里正在聊:\n" + self.talking_message_str
|
||||
|
||||
else:
|
||||
mid_memory_str = "之前的聊天内容:\n"
|
||||
for mid_memory in self.mid_memorys:
|
||||
mid_memory_str += f"{mid_memory['theme']}\n"
|
||||
return mid_memory_str + "现在群里正在聊:\n" + self.talking_message_str
|
||||
|
||||
def serch_message_by_text(self, text: str) -> Optional[MessageRecv]:
|
||||
"""
|
||||
根据回复的纯文本
|
||||
1. 在talking_message中查找最新的,最匹配的消息
|
||||
2. 如果找到,则返回消息
|
||||
"""
|
||||
msg_list = []
|
||||
find_msg = None
|
||||
reverse_talking_message = list(reversed(self.talking_message))
|
||||
|
||||
for message in reverse_talking_message:
|
||||
if message["processed_plain_text"] == text:
|
||||
find_msg = message
|
||||
logger.debug(f"找到的锚定消息:find_msg: {find_msg}")
|
||||
break
|
||||
else:
|
||||
similarity = difflib.SequenceMatcher(None, text, message["processed_plain_text"]).ratio()
|
||||
msg_list.append({"message": message, "similarity": similarity})
|
||||
logger.debug(f"对锚定消息检查:message: {message['processed_plain_text']},similarity: {similarity}")
|
||||
if not find_msg:
|
||||
if msg_list:
|
||||
msg_list.sort(key=lambda x: x["similarity"], reverse=True)
|
||||
if msg_list[0]["similarity"] >= 0.5: # 只返回相似度大于等于0.5的消息
|
||||
find_msg = msg_list[0]["message"]
|
||||
else:
|
||||
logger.debug("没有找到锚定消息,相似度低")
|
||||
return None
|
||||
else:
|
||||
logger.debug("没有找到锚定消息,没有消息捕获")
|
||||
return None
|
||||
|
||||
# logger.debug(f"找到的锚定消息:find_msg: {find_msg}")
|
||||
group_info = find_msg.get("chat_info", {}).get("group_info")
|
||||
user_info = find_msg.get("chat_info", {}).get("user_info")
|
||||
|
||||
content_format = ""
|
||||
accept_format = ""
|
||||
template_items = {}
|
||||
|
||||
format_info = {"content_format": content_format, "accept_format": accept_format}
|
||||
template_info = {
|
||||
"template_items": template_items,
|
||||
}
|
||||
|
||||
message_info = {
|
||||
"platform": find_msg.get("platform"),
|
||||
"message_id": find_msg.get("message_id"),
|
||||
"time": find_msg.get("time"),
|
||||
"group_info": group_info,
|
||||
"user_info": user_info,
|
||||
"additional_config": find_msg.get("additional_config"),
|
||||
"format_info": format_info,
|
||||
"template_info": template_info,
|
||||
}
|
||||
message_dict = {
|
||||
"message_info": message_info,
|
||||
"raw_message": find_msg.get("processed_plain_text"),
|
||||
"detailed_plain_text": find_msg.get("processed_plain_text"),
|
||||
"processed_plain_text": find_msg.get("processed_plain_text"),
|
||||
}
|
||||
find_rec_msg = MessageRecv(message_dict)
|
||||
logger.debug(f"锚定消息处理后:find_rec_msg: {find_rec_msg}")
|
||||
return find_rec_msg
|
||||
|
||||
async def observe(self):
|
||||
# 自上一次观察的新消息
|
||||
new_messages_list = get_raw_msg_by_timestamp_with_chat(
|
||||
chat_id=self.chat_id,
|
||||
timestamp_start=self.last_observe_time,
|
||||
timestamp_end=datetime.now().timestamp(),
|
||||
limit=self.max_now_obs_len,
|
||||
limit_mode="latest",
|
||||
)
|
||||
|
||||
last_obs_time_mark = self.last_observe_time
|
||||
if new_messages_list:
|
||||
self.last_observe_time = new_messages_list[-1]["time"]
|
||||
self.talking_message.extend(new_messages_list)
|
||||
|
||||
if len(self.talking_message) > self.max_now_obs_len:
|
||||
# 计算需要移除的消息数量,保留最新的 max_now_obs_len 条
|
||||
messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len
|
||||
oldest_messages = self.talking_message[:messages_to_remove_count]
|
||||
self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的
|
||||
|
||||
oldest_messages_str = await build_readable_messages(
|
||||
messages=oldest_messages, timestamp_mode="normal", read_mark=0
|
||||
)
|
||||
|
||||
# --- Build prompt using template ---
|
||||
prompt = None # Initialize prompt as None
|
||||
try:
|
||||
# 构建 Prompt - 根据 is_group_chat 选择模板
|
||||
if self.is_group_chat:
|
||||
prompt_template_name = "chat_summary_group_prompt"
|
||||
prompt = await global_prompt_manager.format_prompt(
|
||||
prompt_template_name, chat_logs=oldest_messages_str
|
||||
)
|
||||
else:
|
||||
# For private chat, add chat_target to the prompt variables
|
||||
prompt_template_name = "chat_summary_private_prompt"
|
||||
# Determine the target name for the prompt
|
||||
chat_target_name = "对方" # Default fallback
|
||||
if self.chat_target_info:
|
||||
# Prioritize person_name, then nickname
|
||||
chat_target_name = (
|
||||
self.chat_target_info.get("person_name")
|
||||
or self.chat_target_info.get("user_nickname")
|
||||
or chat_target_name
|
||||
)
|
||||
|
||||
# Format the private chat prompt
|
||||
prompt = await global_prompt_manager.format_prompt(
|
||||
prompt_template_name,
|
||||
# Assuming the private prompt template uses {chat_target}
|
||||
chat_target=chat_target_name,
|
||||
chat_logs=oldest_messages_str,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"构建总结 Prompt 失败 for chat {self.chat_id}: {e}")
|
||||
# prompt remains None
|
||||
|
||||
if prompt: # Check if prompt was built successfully
|
||||
self.compressor_prompt = prompt
|
||||
self.oldest_messages = oldest_messages
|
||||
self.oldest_messages_str = oldest_messages_str
|
||||
|
||||
self.talking_message_str = await build_readable_messages(
|
||||
messages=self.talking_message,
|
||||
timestamp_mode="lite",
|
||||
read_mark=last_obs_time_mark,
|
||||
)
|
||||
self.talking_message_str_truncate = await build_readable_messages(
|
||||
messages=self.talking_message,
|
||||
timestamp_mode="normal",
|
||||
read_mark=last_obs_time_mark,
|
||||
truncate=True,
|
||||
)
|
||||
|
||||
self.person_list = await get_person_id_list(self.talking_message)
|
||||
|
||||
# print(f"self.11111person_list: {self.person_list}")
|
||||
|
||||
logger.trace(
|
||||
f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.talking_message_str}"
|
||||
)
|
||||
|
||||
async def has_new_messages_since(self, timestamp: float) -> bool:
|
||||
"""检查指定时间戳之后是否有新消息"""
|
||||
count = num_new_messages_since(chat_id=self.chat_id, timestamp_start=timestamp)
|
||||
return count > 0
|
||||
82
src/chat/heart_flow/observation/hfcloop_observation.py
Normal file
82
src/chat/heart_flow/observation/hfcloop_observation.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# 定义了来自外部世界的信息
|
||||
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
|
||||
from datetime import datetime
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
|
||||
from typing import List
|
||||
# Import the new utility function
|
||||
|
||||
logger = get_logger("observation")
|
||||
|
||||
|
||||
# 所有观察的基类
|
||||
class HFCloopObservation:
|
||||
def __init__(self, observe_id):
|
||||
self.observe_info = ""
|
||||
self.observe_id = observe_id
|
||||
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
|
||||
self.history_loop: List[CycleDetail] = []
|
||||
|
||||
def get_observe_info(self):
|
||||
return self.observe_info
|
||||
|
||||
def add_loop_info(self, loop_info: CycleDetail):
|
||||
# logger.debug(f"添加循环信息111111111111111111111111111111111111: {loop_info}")
|
||||
# print(f"添加循环信息111111111111111111111111111111111111: {loop_info}")
|
||||
print(f"action_taken: {loop_info.action_taken}")
|
||||
print(f"action_type: {loop_info.action_type}")
|
||||
print(f"response_info: {loop_info.response_info}")
|
||||
self.history_loop.append(loop_info)
|
||||
|
||||
async def observe(self):
|
||||
recent_active_cycles: List[CycleDetail] = []
|
||||
for cycle in reversed(self.history_loop):
|
||||
# 只关心实际执行了动作的循环
|
||||
if cycle.action_taken:
|
||||
recent_active_cycles.append(cycle)
|
||||
# 最多找最近的3个活动循环
|
||||
if len(recent_active_cycles) == 3:
|
||||
break
|
||||
|
||||
cycle_info_block = ""
|
||||
consecutive_text_replies = 0
|
||||
responses_for_prompt = []
|
||||
|
||||
# 检查这最近的活动循环中有多少是连续的文本回复 (从最近的开始看)
|
||||
for cycle in recent_active_cycles:
|
||||
if cycle.action_type == "reply":
|
||||
consecutive_text_replies += 1
|
||||
# 获取回复内容,如果不存在则返回'[空回复]'
|
||||
response_text = cycle.response_info.get("response_text", "[空回复]")
|
||||
responses_for_prompt.append(response_text)
|
||||
else:
|
||||
break
|
||||
|
||||
# 根据连续文本回复的数量构建提示信息
|
||||
# 注意: responses_for_prompt 列表是从最近到最远排序的
|
||||
if consecutive_text_replies >= 3: # 如果最近的三个活动都是文本回复
|
||||
cycle_info_block = f'你已经连续回复了三条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}",第三近: "{responses_for_prompt[2]}")。你回复的有点多了,请注意'
|
||||
elif consecutive_text_replies == 2: # 如果最近的两个活动是文本回复
|
||||
cycle_info_block = f'你已经连续回复了两条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}"),请注意'
|
||||
elif consecutive_text_replies == 1: # 如果最近的一个活动是文本回复
|
||||
cycle_info_block = f'你刚刚已经回复一条消息(内容: "{responses_for_prompt[0]}")'
|
||||
|
||||
# 包装提示块,增加可读性,即使没有连续回复也给个标记
|
||||
if cycle_info_block:
|
||||
cycle_info_block = f"\n你最近的回复\n{cycle_info_block}\n"
|
||||
else:
|
||||
# 如果最近的活动循环不是文本回复,或者没有活动循环
|
||||
cycle_info_block = "\n"
|
||||
|
||||
# 获取history_loop中最新添加的
|
||||
if self.history_loop:
|
||||
last_loop = self.history_loop[-1]
|
||||
start_time = last_loop.start_time
|
||||
end_time = last_loop.end_time
|
||||
if start_time is not None and end_time is not None:
|
||||
time_diff = int(end_time - start_time)
|
||||
cycle_info_block += f"\n距离你上一次阅读消息已经过去了{time_diff}分钟\n"
|
||||
else:
|
||||
cycle_info_block += "\n无法获取上一次阅读消息的时间\n"
|
||||
|
||||
self.observe_info = cycle_info_block
|
||||
55
src/chat/heart_flow/observation/memory_observation.py
Normal file
55
src/chat/heart_flow/observation/memory_observation.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from src.chat.heart_flow.observation.observation import Observation
|
||||
from datetime import datetime
|
||||
from src.common.logger_manager import get_logger
|
||||
import traceback
|
||||
|
||||
# Import the new utility function
|
||||
from src.chat.memory_system.Hippocampus import HippocampusManager
|
||||
import jieba
|
||||
from typing import List
|
||||
|
||||
logger = get_logger("memory")
|
||||
|
||||
|
||||
class MemoryObservation(Observation):
|
||||
def __init__(self, observe_id):
|
||||
super().__init__(observe_id)
|
||||
self.observe_info: str = ""
|
||||
self.context: str = ""
|
||||
self.running_memory: List[dict] = []
|
||||
|
||||
def get_observe_info(self):
|
||||
for memory in self.running_memory:
|
||||
self.observe_info += f"{memory['topic']}:{memory['content']}\n"
|
||||
return self.observe_info
|
||||
|
||||
async def observe(self):
|
||||
# ---------- 2. 获取记忆 ----------
|
||||
try:
|
||||
# 从聊天内容中提取关键词
|
||||
chat_words = set(jieba.cut(self.context))
|
||||
# 过滤掉停用词和单字词
|
||||
keywords = [word for word in chat_words if len(word) > 1]
|
||||
# 去重并限制数量
|
||||
keywords = list(set(keywords))[:5]
|
||||
|
||||
logger.debug(f"取的关键词: {keywords}")
|
||||
|
||||
# 调用记忆系统获取相关记忆
|
||||
related_memory = await HippocampusManager.get_instance().get_memory_from_topic(
|
||||
valid_keywords=keywords, max_memory_num=3, max_memory_length=2, max_depth=3
|
||||
)
|
||||
|
||||
logger.debug(f"获取到的记忆: {related_memory}")
|
||||
|
||||
if related_memory:
|
||||
for topic, memory in related_memory:
|
||||
# 将记忆添加到 running_memory
|
||||
self.running_memory.append(
|
||||
{"topic": topic, "content": memory, "timestamp": datetime.now().isoformat()}
|
||||
)
|
||||
logger.debug(f"添加新记忆: {topic} - {memory}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"观察 记忆时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
17
src/chat/heart_flow/observation/observation.py
Normal file
17
src/chat/heart_flow/observation/observation.py
Normal file
@@ -0,0 +1,17 @@
|
||||
# 定义了来自外部世界的信息
|
||||
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
|
||||
from datetime import datetime
|
||||
from src.common.logger_manager import get_logger
|
||||
|
||||
logger = get_logger("observation")
|
||||
|
||||
|
||||
# 所有观察的基类
|
||||
class Observation:
|
||||
def __init__(self, observe_id):
|
||||
self.observe_info = ""
|
||||
self.observe_id = observe_id
|
||||
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
|
||||
|
||||
async def observe(self):
|
||||
pass
|
||||
34
src/chat/heart_flow/observation/working_observation.py
Normal file
34
src/chat/heart_flow/observation/working_observation.py
Normal file
@@ -0,0 +1,34 @@
|
||||
# 定义了来自外部世界的信息
|
||||
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
|
||||
from datetime import datetime
|
||||
from src.common.logger_manager import get_logger
|
||||
|
||||
# Import the new utility function
|
||||
|
||||
logger = get_logger("observation")
|
||||
|
||||
|
||||
# 所有观察的基类
|
||||
class WorkingObservation:
|
||||
def __init__(self, observe_id):
|
||||
self.observe_info = ""
|
||||
self.observe_id = observe_id
|
||||
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
|
||||
self.history_loop = []
|
||||
self.structured_info = []
|
||||
|
||||
def get_observe_info(self):
|
||||
return self.structured_info
|
||||
|
||||
def add_structured_info(self, structured_info: dict):
|
||||
self.structured_info.append(structured_info)
|
||||
|
||||
async def observe(self):
|
||||
observed_structured_infos = []
|
||||
for structured_info in self.structured_info:
|
||||
if structured_info.get("ttl") > 0:
|
||||
structured_info["ttl"] -= 1
|
||||
observed_structured_infos.append(structured_info)
|
||||
logger.debug(f"观察到结构化信息仍旧在: {structured_info}")
|
||||
|
||||
self.structured_info = observed_structured_infos
|
||||
361
src/chat/heart_flow/sub_heartflow.py
Normal file
361
src/chat/heart_flow/sub_heartflow.py
Normal file
@@ -0,0 +1,361 @@
|
||||
from .observation.observation import Observation
|
||||
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Optional, List, Dict, Tuple, Callable, Coroutine
|
||||
import traceback
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.chat.message_receive.message import MessageRecv
|
||||
from src.chat.message_receive.chat_stream import chat_manager
|
||||
from src.chat.focus_chat.heartFC_chat import HeartFChatting
|
||||
from src.chat.normal_chat.normal_chat import NormalChat
|
||||
from src.chat.heart_flow.mai_state_manager import MaiStateInfo
|
||||
from src.chat.heart_flow.chat_state_info import ChatState, ChatStateInfo
|
||||
from .utils_chat import get_chat_type_and_target_info
|
||||
from .interest_chatting import InterestChatting
|
||||
|
||||
|
||||
logger = get_logger("sub_heartflow")
|
||||
|
||||
|
||||
class SubHeartflow:
|
||||
def __init__(
|
||||
self,
|
||||
subheartflow_id,
|
||||
mai_states: MaiStateInfo,
|
||||
hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]],
|
||||
):
|
||||
"""子心流初始化函数
|
||||
|
||||
Args:
|
||||
subheartflow_id: 子心流唯一标识符
|
||||
mai_states: 麦麦状态信息实例
|
||||
hfc_no_reply_callback: HFChatting 连续不回复时触发的回调
|
||||
"""
|
||||
# 基础属性,两个值是一样的
|
||||
self.subheartflow_id = subheartflow_id
|
||||
self.chat_id = subheartflow_id
|
||||
self.hfc_no_reply_callback = hfc_no_reply_callback
|
||||
|
||||
# 麦麦的状态
|
||||
self.mai_states = mai_states
|
||||
|
||||
# 这个聊天流的状态
|
||||
self.chat_state: ChatStateInfo = ChatStateInfo()
|
||||
self.chat_state_changed_time: float = time.time()
|
||||
self.chat_state_last_time: float = 0
|
||||
self.history_chat_state: List[Tuple[ChatState, float]] = []
|
||||
|
||||
# --- Initialize attributes ---
|
||||
self.is_group_chat: bool = False
|
||||
self.chat_target_info: Optional[dict] = None
|
||||
# --- End Initialization ---
|
||||
|
||||
# 兴趣检测器
|
||||
self.interest_chatting: InterestChatting = InterestChatting()
|
||||
|
||||
# 活动状态管理
|
||||
self.should_stop = False # 停止标志
|
||||
self.task: Optional[asyncio.Task] = None # 后台任务
|
||||
|
||||
# 随便水群 normal_chat 和 认真水群 focus_chat 实例
|
||||
# CHAT模式激活 随便水群 FOCUS模式激活 认真水群
|
||||
self.heart_fc_instance: Optional[HeartFChatting] = None # 该sub_heartflow的HeartFChatting实例
|
||||
self.normal_chat_instance: Optional[NormalChat] = None # 该sub_heartflow的NormalChat实例
|
||||
|
||||
# 观察,目前只有聊天观察,可以载入多个
|
||||
# 负责对处理过的消息进行观察
|
||||
self.observations: List[ChattingObservation] = [] # 观察列表
|
||||
# self.running_knowledges = [] # 运行中的知识,待完善
|
||||
|
||||
# 日志前缀 - Moved determination to initialize
|
||||
self.log_prefix = str(subheartflow_id) # Initial default prefix
|
||||
|
||||
async def initialize(self):
|
||||
"""异步初始化方法,创建兴趣流并确定聊天类型"""
|
||||
|
||||
# --- Use utility function to determine chat type and fetch info ---
|
||||
self.is_group_chat, self.chat_target_info = await get_chat_type_and_target_info(self.chat_id)
|
||||
# Update log prefix after getting info (potential stream name)
|
||||
self.log_prefix = (
|
||||
chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id
|
||||
) # Keep this line or adjust if utils provides name
|
||||
logger.debug(
|
||||
f"SubHeartflow {self.chat_id} initialized: is_group={self.is_group_chat}, target_info={self.chat_target_info}"
|
||||
)
|
||||
# --- End using utility function ---
|
||||
|
||||
# Initialize interest system (existing logic)
|
||||
await self.interest_chatting.initialize()
|
||||
logger.debug(f"{self.log_prefix} InterestChatting 实例已初始化。")
|
||||
|
||||
def update_last_chat_state_time(self):
|
||||
self.chat_state_last_time = time.time() - self.chat_state_changed_time
|
||||
|
||||
async def _stop_normal_chat(self):
|
||||
"""
|
||||
停止 NormalChat 实例
|
||||
切出 CHAT 状态时使用
|
||||
"""
|
||||
if self.normal_chat_instance:
|
||||
logger.info(f"{self.log_prefix} 离开CHAT模式,结束 随便水群")
|
||||
try:
|
||||
await self.normal_chat_instance.stop_chat() # 调用 stop_chat
|
||||
except Exception as e:
|
||||
logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
async def _start_normal_chat(self, rewind=False) -> bool:
|
||||
"""
|
||||
启动 NormalChat 实例,并进行异步初始化。
|
||||
进入 CHAT 状态时使用。
|
||||
确保 HeartFChatting 已停止。
|
||||
"""
|
||||
await self._stop_heart_fc_chat() # 确保 专注聊天已停止
|
||||
|
||||
log_prefix = self.log_prefix
|
||||
try:
|
||||
# 获取聊天流并创建 NormalChat 实例 (同步部分)
|
||||
chat_stream = chat_manager.get_stream(self.chat_id)
|
||||
if not chat_stream:
|
||||
logger.error(f"{log_prefix} 无法获取 chat_stream,无法启动 NormalChat。")
|
||||
return False
|
||||
if rewind:
|
||||
self.normal_chat_instance = NormalChat(chat_stream=chat_stream, interest_dict=self.get_interest_dict())
|
||||
else:
|
||||
self.normal_chat_instance = NormalChat(chat_stream=chat_stream)
|
||||
|
||||
# 进行异步初始化
|
||||
await self.normal_chat_instance.initialize()
|
||||
|
||||
# 启动聊天任务
|
||||
logger.info(f"{log_prefix} 开始普通聊天,随便水群...")
|
||||
await self.normal_chat_instance.start_chat() # start_chat now ensures init is called again if needed
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 启动 NormalChat 或其初始化时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
self.normal_chat_instance = None # 启动/初始化失败,清理实例
|
||||
return False
|
||||
|
||||
async def _stop_heart_fc_chat(self):
|
||||
"""停止并清理 HeartFChatting 实例"""
|
||||
if self.heart_fc_instance:
|
||||
logger.debug(f"{self.log_prefix} 结束专注聊天...")
|
||||
try:
|
||||
await self.heart_fc_instance.shutdown()
|
||||
except Exception as e:
|
||||
logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
finally:
|
||||
# 无论是否成功关闭,都清理引用
|
||||
self.heart_fc_instance = None
|
||||
|
||||
async def _start_heart_fc_chat(self) -> bool:
|
||||
"""启动 HeartFChatting 实例,确保 NormalChat 已停止"""
|
||||
await self._stop_normal_chat() # 确保普通聊天监控已停止
|
||||
self.clear_interest_dict() # 清理兴趣字典,准备专注聊天
|
||||
|
||||
log_prefix = self.log_prefix
|
||||
# 如果实例已存在,检查其循环任务状态
|
||||
if self.heart_fc_instance:
|
||||
# 如果任务已完成或不存在,则尝试重新启动
|
||||
if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done():
|
||||
logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...")
|
||||
try:
|
||||
await self.heart_fc_instance.start() # 启动循环
|
||||
logger.info(f"{log_prefix} HeartFChatting 循环已启动。")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
return False # 启动失败
|
||||
else:
|
||||
# 任务正在运行
|
||||
logger.debug(f"{log_prefix} HeartFChatting 已在运行中。")
|
||||
return True # 已经在运行
|
||||
|
||||
# 如果实例不存在,则创建并启动
|
||||
logger.info(f"{log_prefix} 麦麦准备开始专注聊天...")
|
||||
try:
|
||||
# 创建 HeartFChatting 实例,并传递 从构造函数传入的 回调函数
|
||||
self.heart_fc_instance = HeartFChatting(
|
||||
chat_id=self.subheartflow_id,
|
||||
observations=self.observations, # 传递所有观察者
|
||||
on_consecutive_no_reply_callback=self.hfc_no_reply_callback, # <-- Use stored callback
|
||||
)
|
||||
|
||||
# 初始化并启动 HeartFChatting
|
||||
if await self.heart_fc_instance._initialize():
|
||||
await self.heart_fc_instance.start()
|
||||
logger.debug(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"{log_prefix} HeartFChatting 初始化失败,无法进入专注模式。")
|
||||
self.heart_fc_instance = None # 初始化失败,清理实例
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
self.heart_fc_instance = None # 创建或初始化异常,清理实例
|
||||
return False
|
||||
|
||||
async def change_chat_state(self, new_state: "ChatState"):
|
||||
"""更新sub_heartflow的聊天状态,并管理 HeartFChatting 和 NormalChat 实例及任务"""
|
||||
current_state = self.chat_state.chat_status
|
||||
|
||||
if current_state == new_state:
|
||||
return
|
||||
|
||||
log_prefix = self.log_prefix
|
||||
state_changed = False # 标记状态是否实际发生改变
|
||||
|
||||
# --- 状态转换逻辑 ---
|
||||
if new_state == ChatState.CHAT:
|
||||
# 移除限额检查逻辑
|
||||
logger.debug(f"{log_prefix} 准备进入或保持 聊天 状态")
|
||||
if current_state == ChatState.FOCUSED:
|
||||
if await self._start_normal_chat(rewind=False):
|
||||
# logger.info(f"{log_prefix} 成功进入或保持 NormalChat 状态。")
|
||||
state_changed = True
|
||||
else:
|
||||
logger.error(f"{log_prefix} 从FOCUSED状态启动 NormalChat 失败,无法进入 CHAT 状态。")
|
||||
# 考虑是否需要回滚状态或采取其他措施
|
||||
return # 启动失败,不改变状态
|
||||
else:
|
||||
if await self._start_normal_chat(rewind=True):
|
||||
# logger.info(f"{log_prefix} 成功进入或保持 NormalChat 状态。")
|
||||
state_changed = True
|
||||
else:
|
||||
logger.error(f"{log_prefix} 从ABSENT状态启动 NormalChat 失败,无法进入 CHAT 状态。")
|
||||
# 考虑是否需要回滚状态或采取其他措施
|
||||
return # 启动失败,不改变状态
|
||||
|
||||
elif new_state == ChatState.FOCUSED:
|
||||
# 移除限额检查逻辑
|
||||
logger.debug(f"{log_prefix} 准备进入或保持 专注聊天 状态")
|
||||
if await self._start_heart_fc_chat():
|
||||
logger.debug(f"{log_prefix} 成功进入或保持 HeartFChatting 状态。")
|
||||
state_changed = True
|
||||
else:
|
||||
logger.error(f"{log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。")
|
||||
# 启动失败,状态回滚到之前的状态或ABSENT?这里保持不改变
|
||||
return # 启动失败,不改变状态
|
||||
|
||||
elif new_state == ChatState.ABSENT:
|
||||
logger.info(f"{log_prefix} 进入 ABSENT 状态,停止所有聊天活动...")
|
||||
self.clear_interest_dict()
|
||||
|
||||
await self._stop_normal_chat()
|
||||
await self._stop_heart_fc_chat()
|
||||
state_changed = True # 总是可以成功转换到 ABSENT
|
||||
|
||||
# --- 更新状态和最后活动时间 ---
|
||||
if state_changed:
|
||||
self.update_last_chat_state_time()
|
||||
self.history_chat_state.append((current_state, self.chat_state_last_time))
|
||||
|
||||
logger.info(
|
||||
f"{log_prefix} 麦麦的聊天状态从 {current_state.value} (持续了 {int(self.chat_state_last_time)} 秒) 变更为 {new_state.value}"
|
||||
)
|
||||
|
||||
self.chat_state.chat_status = new_state
|
||||
self.chat_state_last_time = 0
|
||||
self.chat_state_changed_time = time.time()
|
||||
else:
|
||||
# 如果因为某些原因(如启动失败)没有成功改变状态,记录一下
|
||||
logger.debug(
|
||||
f"{log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。"
|
||||
)
|
||||
|
||||
async def subheartflow_start_working(self):
|
||||
"""启动子心流的后台任务
|
||||
|
||||
功能说明:
|
||||
- 负责子心流的主要后台循环
|
||||
- 每30秒检查一次停止标志
|
||||
"""
|
||||
logger.trace(f"{self.log_prefix} 子心流开始工作...")
|
||||
|
||||
while not self.should_stop:
|
||||
await asyncio.sleep(30) # 30秒检查一次停止标志
|
||||
|
||||
logger.info(f"{self.log_prefix} 子心流后台任务已停止。")
|
||||
|
||||
def add_observation(self, observation: Observation):
|
||||
for existing_obs in self.observations:
|
||||
if existing_obs.observe_id == observation.observe_id:
|
||||
return
|
||||
self.observations.append(observation)
|
||||
|
||||
def remove_observation(self, observation: Observation):
|
||||
if observation in self.observations:
|
||||
self.observations.remove(observation)
|
||||
|
||||
def get_all_observations(self) -> list[Observation]:
|
||||
return self.observations
|
||||
|
||||
def _get_primary_observation(self) -> Optional[ChattingObservation]:
|
||||
if self.observations and isinstance(self.observations[0], ChattingObservation):
|
||||
return self.observations[0]
|
||||
logger.warning(f"SubHeartflow {self.subheartflow_id} 没有找到有效的 ChattingObservation")
|
||||
return None
|
||||
|
||||
async def get_interest_state(self) -> dict:
|
||||
return await self.interest_chatting.get_state()
|
||||
|
||||
def get_normal_chat_last_speak_time(self) -> float:
|
||||
if self.normal_chat_instance:
|
||||
return self.normal_chat_instance.last_speak_time
|
||||
return 0
|
||||
|
||||
def get_interest_dict(self) -> Dict[str, tuple[MessageRecv, float, bool]]:
|
||||
return self.interest_chatting.interest_dict
|
||||
|
||||
def clear_interest_dict(self):
|
||||
self.interest_chatting.interest_dict.clear()
|
||||
|
||||
async def get_full_state(self) -> dict:
|
||||
"""获取子心流的完整状态,包括兴趣、思维和聊天状态。"""
|
||||
interest_state = await self.get_interest_state()
|
||||
return {
|
||||
"interest_state": interest_state,
|
||||
"chat_state": self.chat_state.chat_status.value,
|
||||
"chat_state_changed_time": self.chat_state_changed_time,
|
||||
}
|
||||
|
||||
async def shutdown(self):
|
||||
"""安全地关闭子心流及其管理的任务"""
|
||||
if self.should_stop:
|
||||
logger.info(f"{self.log_prefix} 子心流已在关闭过程中。")
|
||||
return
|
||||
|
||||
logger.info(f"{self.log_prefix} 开始关闭子心流...")
|
||||
self.should_stop = True # 标记为停止,让后台任务退出
|
||||
|
||||
# 使用新的停止方法
|
||||
await self._stop_normal_chat()
|
||||
await self._stop_heart_fc_chat()
|
||||
|
||||
# 停止兴趣更新任务
|
||||
if self.interest_chatting:
|
||||
logger.info(f"{self.log_prefix} 停止兴趣系统后台任务...")
|
||||
await self.interest_chatting.stop_updates()
|
||||
|
||||
# 取消可能存在的旧后台任务 (self.task)
|
||||
if self.task and not self.task.done():
|
||||
logger.debug(f"{self.log_prefix} 取消子心流主任务 (Shutdown)...")
|
||||
self.task.cancel()
|
||||
try:
|
||||
await asyncio.wait_for(self.task, timeout=1.0) # 给点时间响应取消
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"{self.log_prefix} 子心流主任务已取消 (Shutdown)。")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"{self.log_prefix} 等待子心流主任务取消超时 (Shutdown)。")
|
||||
except Exception as e:
|
||||
logger.error(f"{self.log_prefix} 等待子心流主任务取消时发生错误 (Shutdown): {e}")
|
||||
|
||||
self.task = None # 清理任务引用
|
||||
self.chat_state.chat_status = ChatState.ABSENT # 状态重置为不参与
|
||||
|
||||
logger.info(f"{self.log_prefix} 子心流关闭完成。")
|
||||
849
src/chat/heart_flow/subheartflow_manager.py
Normal file
849
src/chat/heart_flow/subheartflow_manager.py
Normal file
@@ -0,0 +1,849 @@
|
||||
import asyncio
|
||||
import time
|
||||
import random
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
import json # 导入 json 模块
|
||||
import functools # <-- 新增导入
|
||||
|
||||
# 导入日志模块
|
||||
from src.common.logger_manager import get_logger
|
||||
|
||||
# 导入聊天流管理模块
|
||||
from src.chat.message_receive.chat_stream import chat_manager
|
||||
|
||||
# 导入心流相关类
|
||||
from src.chat.heart_flow.sub_heartflow import SubHeartflow, ChatState
|
||||
from src.chat.heart_flow.mai_state_manager import MaiStateInfo
|
||||
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
|
||||
|
||||
# 导入LLM请求工具
|
||||
from src.chat.models.utils_model import LLMRequest
|
||||
from src.config.config import global_config
|
||||
from src.individuality.individuality import Individuality
|
||||
import traceback
|
||||
|
||||
|
||||
# 初始化日志记录器
|
||||
|
||||
logger = get_logger("subheartflow_manager")
|
||||
|
||||
# 子心流管理相关常量
|
||||
INACTIVE_THRESHOLD_SECONDS = 3600 # 子心流不活跃超时时间(秒)
|
||||
NORMAL_CHAT_TIMEOUT_SECONDS = 30 * 60 # 30分钟
|
||||
|
||||
|
||||
async def _try_set_subflow_absent_internal(subflow: "SubHeartflow", log_prefix: str) -> bool:
|
||||
"""
|
||||
尝试将给定的子心流对象状态设置为 ABSENT (内部方法,不处理锁)。
|
||||
|
||||
Args:
|
||||
subflow: 子心流对象。
|
||||
log_prefix: 用于日志记录的前缀 (例如 "[子心流管理]" 或 "[停用]")。
|
||||
|
||||
Returns:
|
||||
bool: 如果状态成功变为 ABSENT 或原本就是 ABSENT,返回 True;否则返回 False。
|
||||
"""
|
||||
flow_id = subflow.subheartflow_id
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
|
||||
|
||||
if subflow.chat_state.chat_status != ChatState.ABSENT:
|
||||
logger.debug(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT")
|
||||
try:
|
||||
await subflow.change_chat_state(ChatState.ABSENT)
|
||||
# 再次检查以确认状态已更改 (change_chat_state 内部应确保)
|
||||
if subflow.chat_state.chat_status == ChatState.ABSENT:
|
||||
return True
|
||||
else:
|
||||
logger.warning(
|
||||
f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True)
|
||||
return False
|
||||
else:
|
||||
logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态")
|
||||
return True # 已经是目标状态,视为成功
|
||||
|
||||
|
||||
class SubHeartflowManager:
|
||||
"""管理所有活跃的 SubHeartflow 实例。"""
|
||||
|
||||
def __init__(self, mai_state_info: MaiStateInfo):
|
||||
self.subheartflows: Dict[Any, "SubHeartflow"] = {}
|
||||
self._lock = asyncio.Lock() # 用于保护 self.subheartflows 的访问
|
||||
self.mai_state_info: MaiStateInfo = mai_state_info # 存储传入的 MaiStateInfo 实例
|
||||
|
||||
# 为 LLM 状态评估创建一个 LLMRequest 实例
|
||||
# 使用与 Heartflow 相同的模型和参数
|
||||
self.llm_state_evaluator = LLMRequest(
|
||||
model=global_config.llm_heartflow, # 与 Heartflow 一致
|
||||
temperature=0.6, # 与 Heartflow 一致
|
||||
max_tokens=1000, # 与 Heartflow 一致 (虽然可能不需要这么多)
|
||||
request_type="subheartflow_state_eval", # 保留特定的请求类型
|
||||
)
|
||||
|
||||
async def force_change_state(self, subflow_id: Any, target_state: ChatState) -> bool:
|
||||
"""强制改变指定子心流的状态"""
|
||||
async with self._lock:
|
||||
subflow = self.subheartflows.get(subflow_id)
|
||||
if not subflow:
|
||||
logger.warning(f"[强制状态转换]尝试转换不存在的子心流{subflow_id} 到 {target_state.value}")
|
||||
return False
|
||||
await subflow.change_chat_state(target_state)
|
||||
logger.info(f"[强制状态转换]子心流 {subflow_id} 已转换到 {target_state.value}")
|
||||
return True
|
||||
|
||||
def get_all_subheartflows(self) -> List["SubHeartflow"]:
|
||||
"""获取所有当前管理的 SubHeartflow 实例列表 (快照)。"""
|
||||
return list(self.subheartflows.values())
|
||||
|
||||
async def get_or_create_subheartflow(self, subheartflow_id: Any) -> Optional["SubHeartflow"]:
|
||||
"""获取或创建指定ID的子心流实例
|
||||
|
||||
Args:
|
||||
subheartflow_id: 子心流唯一标识符
|
||||
mai_states 参数已被移除,使用 self.mai_state_info
|
||||
|
||||
Returns:
|
||||
成功返回SubHeartflow实例,失败返回None
|
||||
"""
|
||||
async with self._lock:
|
||||
# 检查是否已存在该子心流
|
||||
if subheartflow_id in self.subheartflows:
|
||||
subflow = self.subheartflows[subheartflow_id]
|
||||
if subflow.should_stop:
|
||||
logger.warning(f"尝试获取已停止的子心流 {subheartflow_id},正在重新激活")
|
||||
subflow.should_stop = False # 重置停止标志
|
||||
|
||||
subflow.last_active_time = time.time() # 更新活跃时间
|
||||
# logger.debug(f"获取到已存在的子心流: {subheartflow_id}")
|
||||
return subflow
|
||||
|
||||
try:
|
||||
# --- 使用 functools.partial 创建 HFC 回调 --- #
|
||||
# 将 manager 的 _handle_hfc_no_reply 方法与当前的 subheartflow_id 绑定
|
||||
hfc_callback = functools.partial(self._handle_hfc_no_reply, subheartflow_id)
|
||||
# --- 结束创建回调 --- #
|
||||
|
||||
# 初始化子心流, 传入 mai_state_info 和 partial 创建的回调
|
||||
new_subflow = SubHeartflow(
|
||||
subheartflow_id,
|
||||
self.mai_state_info,
|
||||
hfc_callback, # <-- 传递 partial 创建的回调
|
||||
)
|
||||
|
||||
# 异步初始化
|
||||
await new_subflow.initialize()
|
||||
|
||||
# 添加聊天观察者
|
||||
observation = ChattingObservation(chat_id=subheartflow_id)
|
||||
await observation.initialize()
|
||||
|
||||
new_subflow.add_observation(observation)
|
||||
|
||||
# 注册子心流
|
||||
self.subheartflows[subheartflow_id] = new_subflow
|
||||
heartflow_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
|
||||
logger.info(f"[{heartflow_name}] 开始接收消息")
|
||||
|
||||
# 启动后台任务
|
||||
asyncio.create_task(new_subflow.subheartflow_start_working())
|
||||
|
||||
return new_subflow
|
||||
except Exception as e:
|
||||
logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
# --- 新增:内部方法,用于尝试将单个子心流设置为 ABSENT ---
|
||||
|
||||
# --- 结束新增 ---
|
||||
|
||||
async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool:
|
||||
"""停止指定的子心流并将其状态设置为 ABSENT"""
|
||||
log_prefix = "[子心流管理]"
|
||||
async with self._lock: # 加锁以安全访问字典
|
||||
subheartflow = self.subheartflows.get(subheartflow_id)
|
||||
|
||||
stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
|
||||
logger.info(f"{log_prefix} 正在停止 {stream_name}, 原因: {reason}")
|
||||
|
||||
# 调用内部方法处理状态变更
|
||||
success = await _try_set_subflow_absent_internal(subheartflow, log_prefix)
|
||||
|
||||
return success
|
||||
# 锁在此处自动释放
|
||||
|
||||
def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS):
|
||||
"""识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)"""
|
||||
_current_time = time.time()
|
||||
flows_to_stop = []
|
||||
|
||||
for subheartflow_id, subheartflow in list(self.subheartflows.items()):
|
||||
state = subheartflow.chat_state.chat_status
|
||||
if state != ChatState.ABSENT:
|
||||
continue
|
||||
subheartflow.update_last_chat_state_time()
|
||||
_absent_last_time = subheartflow.chat_state_last_time
|
||||
flows_to_stop.append(subheartflow_id)
|
||||
|
||||
return flows_to_stop
|
||||
|
||||
async def enforce_subheartflow_limits(self):
|
||||
"""根据主状态限制停止超额子心流(优先停不活跃的)"""
|
||||
# 使用 self.mai_state_info 获取当前状态和限制
|
||||
current_mai_state = self.mai_state_info.get_current_state()
|
||||
normal_limit = current_mai_state.get_normal_chat_max_num()
|
||||
focused_limit = current_mai_state.get_focused_chat_max_num()
|
||||
logger.debug(f"[限制] 状态:{current_mai_state.value}, 普通限:{normal_limit}, 专注限:{focused_limit}")
|
||||
|
||||
# 分类统计当前子心流
|
||||
normal_flows = []
|
||||
focused_flows = []
|
||||
for flow_id, flow in list(self.subheartflows.items()):
|
||||
if flow.chat_state.chat_status == ChatState.CHAT:
|
||||
normal_flows.append((flow_id, getattr(flow, "last_active_time", 0)))
|
||||
elif flow.chat_state.chat_status == ChatState.FOCUSED:
|
||||
focused_flows.append((flow_id, getattr(flow, "last_active_time", 0)))
|
||||
|
||||
logger.debug(f"[限制] 当前数量 - 普通:{len(normal_flows)}, 专注:{len(focused_flows)}")
|
||||
stopped = 0
|
||||
|
||||
# 处理普通聊天超额
|
||||
if len(normal_flows) > normal_limit:
|
||||
excess = len(normal_flows) - normal_limit
|
||||
logger.info(f"[限制] 普通聊天超额({len(normal_flows)}>{normal_limit}), 停止{excess}个")
|
||||
normal_flows.sort(key=lambda x: x[1])
|
||||
for flow_id, _ in normal_flows[:excess]:
|
||||
if await self.sleep_subheartflow(flow_id, f"普通聊天超额(限{normal_limit})"):
|
||||
stopped += 1
|
||||
|
||||
# 处理专注聊天超额(需重新统计)
|
||||
focused_flows = [
|
||||
(fid, t)
|
||||
for fid, f in list(self.subheartflows.items())
|
||||
if (t := getattr(f, "last_active_time", 0)) and f.chat_state.chat_status == ChatState.FOCUSED
|
||||
]
|
||||
if len(focused_flows) > focused_limit:
|
||||
excess = len(focused_flows) - focused_limit
|
||||
logger.info(f"[限制] 专注聊天超额({len(focused_flows)}>{focused_limit}), 停止{excess}个")
|
||||
focused_flows.sort(key=lambda x: x[1])
|
||||
for flow_id, _ in focused_flows[:excess]:
|
||||
if await self.sleep_subheartflow(flow_id, f"专注聊天超额(限{focused_limit})"):
|
||||
stopped += 1
|
||||
|
||||
if stopped:
|
||||
logger.info(f"[限制] 已停止{stopped}个子心流, 剩余:{len(self.subheartflows)}")
|
||||
else:
|
||||
logger.debug(f"[限制] 无需停止, 当前总数:{len(self.subheartflows)}")
|
||||
|
||||
async def deactivate_all_subflows(self):
|
||||
"""将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)"""
|
||||
log_prefix = "[停用]"
|
||||
changed_count = 0
|
||||
processed_count = 0
|
||||
|
||||
async with self._lock: # 获取锁以安全迭代
|
||||
# 使用 list() 创建一个当前值的快照,防止在迭代时修改字典
|
||||
flows_to_update = list(self.subheartflows.values())
|
||||
processed_count = len(flows_to_update)
|
||||
if not flows_to_update:
|
||||
logger.debug(f"{log_prefix} 无活跃子心流,无需操作")
|
||||
return
|
||||
|
||||
for subflow in flows_to_update:
|
||||
# 记录原始状态,以便统计实际改变的数量
|
||||
original_state_was_absent = subflow.chat_state.chat_status == ChatState.ABSENT
|
||||
|
||||
success = await _try_set_subflow_absent_internal(subflow, log_prefix)
|
||||
|
||||
# 如果成功设置为 ABSENT 且原始状态不是 ABSENT,则计数
|
||||
if success and not original_state_was_absent:
|
||||
if subflow.chat_state.chat_status == ChatState.ABSENT:
|
||||
changed_count += 1
|
||||
else:
|
||||
# 这种情况理论上不应发生,如果内部方法返回 True 的话
|
||||
stream_name = chat_manager.get_stream_name(subflow.subheartflow_id) or subflow.subheartflow_id
|
||||
logger.warning(f"{log_prefix} 内部方法声称成功但 {stream_name} 状态未变为 ABSENT。")
|
||||
# 锁在此处自动释放
|
||||
|
||||
logger.info(
|
||||
f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count} 个非 ABSENT 子心流的状态更改为 ABSENT。"
|
||||
)
|
||||
|
||||
async def sbhf_absent_into_focus(self):
|
||||
"""评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)"""
|
||||
try:
|
||||
current_state = self.mai_state_info.get_current_state()
|
||||
focused_limit = current_state.get_focused_chat_max_num()
|
||||
|
||||
# --- 新增:检查是否允许进入 FOCUS 模式 --- #
|
||||
if not global_config.allow_focus_mode:
|
||||
if int(time.time()) % 60 == 0: # 每60秒输出一次日志避免刷屏
|
||||
logger.trace("未开启 FOCUSED 状态 (allow_focus_mode=False)")
|
||||
return # 如果不允许,直接返回
|
||||
# --- 结束新增 ---
|
||||
|
||||
logger.info(f"当前状态 ({current_state.value}) 可以在{focused_limit}个群 专注聊天")
|
||||
|
||||
if focused_limit <= 0:
|
||||
# logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
|
||||
return
|
||||
|
||||
current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
|
||||
if current_focused_count >= focused_limit:
|
||||
logger.debug(f"已达专注上限 ({current_focused_count}/{focused_limit})")
|
||||
return
|
||||
|
||||
for sub_hf in list(self.subheartflows.values()):
|
||||
flow_id = sub_hf.subheartflow_id
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
|
||||
|
||||
# 跳过非CHAT状态或已经是FOCUSED状态的子心流
|
||||
if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
|
||||
continue
|
||||
|
||||
if sub_hf.interest_chatting.start_hfc_probability == 0:
|
||||
continue
|
||||
else:
|
||||
logger.debug(
|
||||
f"{stream_name},现在状态: {sub_hf.chat_state.chat_status.value},进入专注概率: {sub_hf.interest_chatting.start_hfc_probability}"
|
||||
)
|
||||
|
||||
# 调试用
|
||||
from .mai_state_manager import enable_unlimited_hfc_chat
|
||||
|
||||
if not enable_unlimited_hfc_chat:
|
||||
if sub_hf.chat_state.chat_status != ChatState.CHAT:
|
||||
continue
|
||||
|
||||
if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
|
||||
continue
|
||||
|
||||
# 再次检查是否达到上限
|
||||
if current_focused_count >= focused_limit:
|
||||
logger.debug(f"{stream_name} 已达专注上限")
|
||||
break
|
||||
|
||||
# 获取最新状态并执行提升
|
||||
current_subflow = self.subheartflows.get(flow_id)
|
||||
if not current_subflow:
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"{stream_name} 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
|
||||
)
|
||||
|
||||
# 执行状态提升
|
||||
await current_subflow.change_chat_state(ChatState.FOCUSED)
|
||||
|
||||
# 验证提升结果
|
||||
if (
|
||||
final_subflow := self.subheartflows.get(flow_id)
|
||||
) and final_subflow.chat_state.chat_status == ChatState.FOCUSED:
|
||||
current_focused_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True)
|
||||
|
||||
async def sbhf_absent_into_chat(self):
|
||||
"""
|
||||
随机选一个 ABSENT 状态的 *群聊* 子心流,评估是否应转换为 CHAT 状态。
|
||||
每次调用最多转换一个。
|
||||
私聊会被忽略。
|
||||
"""
|
||||
current_mai_state = self.mai_state_info.get_current_state()
|
||||
chat_limit = current_mai_state.get_normal_chat_max_num()
|
||||
|
||||
async with self._lock:
|
||||
# 1. 筛选出所有 ABSENT 状态的 *群聊* 子心流
|
||||
absent_group_subflows = [
|
||||
hf
|
||||
for hf in self.subheartflows.values()
|
||||
if hf.chat_state.chat_status == ChatState.ABSENT and hf.is_group_chat
|
||||
]
|
||||
|
||||
if not absent_group_subflows:
|
||||
# logger.debug("没有摸鱼的群聊子心流可以评估。") # 日志太频繁
|
||||
return # 没有目标,直接返回
|
||||
|
||||
# 2. 随机选一个幸运儿
|
||||
sub_hf_to_evaluate = random.choice(absent_group_subflows)
|
||||
flow_id = sub_hf_to_evaluate.subheartflow_id
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
|
||||
log_prefix = f"[{stream_name}]"
|
||||
|
||||
# 3. 检查 CHAT 上限
|
||||
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
|
||||
if current_chat_count >= chat_limit:
|
||||
logger.info(f"{log_prefix} 想看看能不能聊,但是聊天太多了, ({current_chat_count}/{chat_limit}) 满了。")
|
||||
return # 满了,这次就算了
|
||||
|
||||
# --- 获取 FOCUSED 计数 ---
|
||||
current_focused_count = self.count_subflows_by_state_nolock(ChatState.FOCUSED)
|
||||
focused_limit = current_mai_state.get_focused_chat_max_num()
|
||||
|
||||
# --- 新增:获取聊天和专注群名 ---
|
||||
chatting_group_names = []
|
||||
focused_group_names = []
|
||||
for flow_id, hf in self.subheartflows.items():
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or str(flow_id) # 保证有名字
|
||||
if hf.chat_state.chat_status == ChatState.CHAT:
|
||||
chatting_group_names.append(stream_name)
|
||||
elif hf.chat_state.chat_status == ChatState.FOCUSED:
|
||||
focused_group_names.append(stream_name)
|
||||
# --- 结束新增 ---
|
||||
|
||||
# --- 获取观察信息和构建 Prompt ---
|
||||
first_observation = sub_hf_to_evaluate.observations[0] # 喵~第一个观察者肯定存在的说
|
||||
await first_observation.observe()
|
||||
current_chat_log = first_observation.talking_message_str or "当前没啥聊天内容。"
|
||||
_observation_summary = f"在[{stream_name}]这个群中,你最近看群友聊了这些:\n{current_chat_log}"
|
||||
|
||||
_mai_state_description = f"你当前状态: {current_mai_state.value}。"
|
||||
individuality = Individuality.get_instance()
|
||||
personality_prompt = individuality.get_prompt(x_person=2, level=2)
|
||||
prompt_personality = f"你正在扮演名为{individuality.name}的人类,{personality_prompt}"
|
||||
|
||||
# --- 修改:在 prompt 中加入当前聊天计数和群名信息 (条件显示) ---
|
||||
chat_status_lines = []
|
||||
if chatting_group_names:
|
||||
chat_status_lines.append(
|
||||
f"正在这些群闲聊 ({current_chat_count}/{chat_limit}): {', '.join(chatting_group_names)}"
|
||||
)
|
||||
if focused_group_names:
|
||||
chat_status_lines.append(
|
||||
f"正在这些群专注的聊天 ({current_focused_count}/{focused_limit}): {', '.join(focused_group_names)}"
|
||||
)
|
||||
|
||||
chat_status_prompt = "当前没有在任何群聊中。" # 默认消息喵~
|
||||
if chat_status_lines:
|
||||
chat_status_prompt = "当前聊天情况,你已经参与了下面这几个群的聊天:\n" + "\n".join(
|
||||
chat_status_lines
|
||||
) # 拼接状态信息
|
||||
|
||||
prompt = (
|
||||
f"{prompt_personality}\n"
|
||||
f"{chat_status_prompt}\n" # <-- 喵!用了新的状态信息~
|
||||
f"你当前尚未加入 [{stream_name}] 群聊天。\n"
|
||||
f"{_observation_summary}\n---\n"
|
||||
f"基于以上信息,你想不想开始在这个群闲聊?\n"
|
||||
f"请说明理由,并以 JSON 格式回答,包含 'decision' (布尔值) 和 'reason' (字符串)。\n"
|
||||
f'例如:{{"decision": true, "reason": "看起来挺热闹的,插个话"}}\n'
|
||||
f'例如:{{"decision": false, "reason": "已经聊了好多,休息一下"}}\n'
|
||||
f"请只输出有效的 JSON 对象。"
|
||||
)
|
||||
# --- 结束修改 ---
|
||||
|
||||
# --- 4. LLM 评估是否想聊 ---
|
||||
yao_kai_shi_liao_ma, reason = await self._llm_evaluate_state_transition(prompt)
|
||||
|
||||
if reason:
|
||||
if yao_kai_shi_liao_ma:
|
||||
logger.info(f"{log_prefix} 打算开始聊,原因是: {reason}")
|
||||
else:
|
||||
logger.info(f"{log_prefix} 不打算聊,原因是: {reason}")
|
||||
else:
|
||||
logger.info(f"{log_prefix} 结果: {yao_kai_shi_liao_ma}")
|
||||
|
||||
if yao_kai_shi_liao_ma is None:
|
||||
logger.debug(f"{log_prefix} 问AI想不想聊失败了,这次算了。")
|
||||
return # 评估失败,结束
|
||||
|
||||
if not yao_kai_shi_liao_ma:
|
||||
# logger.info(f"{log_prefix} 现在不想聊这个群。")
|
||||
return # 不想聊,结束
|
||||
|
||||
# --- 5. AI想聊,再次检查额度并尝试转换 ---
|
||||
# 再次检查以防万一
|
||||
current_chat_count_before_change = self.count_subflows_by_state_nolock(ChatState.CHAT)
|
||||
if current_chat_count_before_change < chat_limit:
|
||||
logger.info(
|
||||
f"{log_prefix} 想聊,而且还有精力 ({current_chat_count_before_change}/{chat_limit}),这就去聊!"
|
||||
)
|
||||
await sub_hf_to_evaluate.change_chat_state(ChatState.CHAT)
|
||||
# 确认转换成功
|
||||
if sub_hf_to_evaluate.chat_state.chat_status == ChatState.CHAT:
|
||||
logger.debug(f"{log_prefix} 成功进入聊天状态!本次评估圆满结束。")
|
||||
else:
|
||||
logger.warning(
|
||||
f"{log_prefix} 奇怪,尝试进入聊天状态失败了。当前状态: {sub_hf_to_evaluate.chat_state.chat_status.value}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"{log_prefix} AI说想聊,但是刚问完就没空位了 ({current_chat_count_before_change}/{chat_limit})。真不巧,下次再说吧。"
|
||||
)
|
||||
# 无论转换成功与否,本次评估都结束了
|
||||
|
||||
# 锁在这里自动释放
|
||||
|
||||
# --- 新增:单独检查 CHAT 状态超时的任务 ---
|
||||
async def sbhf_chat_into_absent(self):
|
||||
"""定期检查处于 CHAT 状态的子心流是否因长时间未发言而超时,并将其转为 ABSENT。"""
|
||||
log_prefix_task = "[聊天超时检查]"
|
||||
transitioned_to_absent = 0
|
||||
checked_count = 0
|
||||
|
||||
async with self._lock:
|
||||
subflows_snapshot = list(self.subheartflows.values())
|
||||
checked_count = len(subflows_snapshot)
|
||||
|
||||
if not subflows_snapshot:
|
||||
return
|
||||
|
||||
for sub_hf in subflows_snapshot:
|
||||
# 只检查 CHAT 状态的子心流
|
||||
if sub_hf.chat_state.chat_status != ChatState.CHAT:
|
||||
continue
|
||||
|
||||
flow_id = sub_hf.subheartflow_id
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
|
||||
log_prefix = f"[{stream_name}]({log_prefix_task})"
|
||||
|
||||
should_deactivate = False
|
||||
reason = ""
|
||||
|
||||
try:
|
||||
last_bot_dong_zuo_time = sub_hf.get_normal_chat_last_speak_time()
|
||||
|
||||
if last_bot_dong_zuo_time > 0:
|
||||
current_time = time.time()
|
||||
time_since_last_bb = current_time - last_bot_dong_zuo_time
|
||||
minutes_since_last_bb = time_since_last_bb / 60
|
||||
|
||||
# 60分钟强制退出
|
||||
if minutes_since_last_bb >= 60:
|
||||
should_deactivate = True
|
||||
reason = "超过60分钟未发言,强制退出"
|
||||
else:
|
||||
# 根据时间区间确定退出概率
|
||||
exit_probability = 0
|
||||
if minutes_since_last_bb < 5:
|
||||
exit_probability = 0.01 # 1%
|
||||
elif minutes_since_last_bb < 15:
|
||||
exit_probability = 0.02 # 2%
|
||||
elif minutes_since_last_bb < 30:
|
||||
exit_probability = 0.04 # 4%
|
||||
else:
|
||||
exit_probability = 0.08 # 8%
|
||||
|
||||
# 随机判断是否退出
|
||||
if random.random() < exit_probability:
|
||||
should_deactivate = True
|
||||
reason = f"已{minutes_since_last_bb:.1f}分钟未发言,触发{exit_probability * 100:.0f}%退出概率"
|
||||
|
||||
except AttributeError:
|
||||
logger.error(
|
||||
f"{log_prefix} 无法获取 Bot 最后 BB 时间,请确保 SubHeartflow 相关实现正确。跳过超时检查。"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 检查 Bot 超时状态时出错: {e}", exc_info=True)
|
||||
|
||||
# 执行状态转换(如果超时)
|
||||
if should_deactivate:
|
||||
logger.debug(f"{log_prefix} 因超时 ({reason}),尝试转换为 ABSENT 状态。")
|
||||
await sub_hf.change_chat_state(ChatState.ABSENT)
|
||||
# 再次检查确保状态已改变
|
||||
if sub_hf.chat_state.chat_status == ChatState.ABSENT:
|
||||
transitioned_to_absent += 1
|
||||
logger.info(f"{log_prefix} 不看了。")
|
||||
else:
|
||||
logger.warning(f"{log_prefix} 尝试因超时转换为 ABSENT 失败。")
|
||||
|
||||
if transitioned_to_absent > 0:
|
||||
logger.debug(
|
||||
f"{log_prefix_task} 完成,共检查 {checked_count} 个子心流,{transitioned_to_absent} 个因超时转为 ABSENT。"
|
||||
)
|
||||
|
||||
# --- 结束新增 ---
|
||||
|
||||
async def _llm_evaluate_state_transition(self, prompt: str) -> Tuple[Optional[bool], Optional[str]]:
|
||||
"""
|
||||
使用 LLM 评估是否应进行状态转换,期望 LLM 返回 JSON 格式。
|
||||
|
||||
Args:
|
||||
prompt: 提供给 LLM 的提示信息,要求返回 {"decision": true/false}。
|
||||
|
||||
Returns:
|
||||
Optional[bool]: 如果成功解析 LLM 的 JSON 响应并提取了 'decision' 键的值,则返回该布尔值。
|
||||
如果 LLM 调用失败、返回无效 JSON 或 JSON 中缺少 'decision' 键或其值不是布尔型,则返回 None。
|
||||
"""
|
||||
log_prefix = "[LLM状态评估]"
|
||||
try:
|
||||
# --- 真实的 LLM 调用 ---
|
||||
response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt)
|
||||
# logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估")
|
||||
logger.debug(f"{log_prefix} 原始输入: {prompt}")
|
||||
logger.debug(f"{log_prefix} 原始评估结果: {response_text}")
|
||||
|
||||
# --- 解析 JSON 响应 ---
|
||||
try:
|
||||
# 尝试去除可能的Markdown代码块标记
|
||||
cleaned_response = response_text.strip().strip("`").strip()
|
||||
if cleaned_response.startswith("json"):
|
||||
cleaned_response = cleaned_response[4:].strip()
|
||||
|
||||
data = json.loads(cleaned_response)
|
||||
decision = data.get("decision") # 使用 .get() 避免 KeyError
|
||||
reason = data.get("reason")
|
||||
|
||||
if isinstance(decision, bool):
|
||||
logger.debug(f"{log_prefix} LLM评估结果 (来自JSON): {'建议转换' if decision else '建议不转换'}")
|
||||
|
||||
return decision, reason
|
||||
else:
|
||||
logger.warning(
|
||||
f"{log_prefix} LLM 返回的 JSON 中 'decision' 键的值不是布尔型: {decision}。响应: {response_text}"
|
||||
)
|
||||
return None, None # 值类型不正确
|
||||
|
||||
except json.JSONDecodeError as json_err:
|
||||
logger.warning(f"{log_prefix} LLM 返回的响应不是有效的 JSON: {json_err}。响应: {response_text}")
|
||||
# 尝试在非JSON响应中查找关键词作为后备方案 (可选)
|
||||
if "true" in response_text.lower():
|
||||
logger.debug(f"{log_prefix} 在非JSON响应中找到 'true',解释为建议转换")
|
||||
return True, None
|
||||
if "false" in response_text.lower():
|
||||
logger.debug(f"{log_prefix} 在非JSON响应中找到 'false',解释为建议不转换")
|
||||
return False, None
|
||||
return None, None # JSON 解析失败,也未找到关键词
|
||||
except Exception as parse_err: # 捕获其他可能的解析错误
|
||||
logger.warning(f"{log_prefix} 解析 LLM JSON 响应时发生意外错误: {parse_err}。响应: {response_text}")
|
||||
return None, None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 调用 LLM 或处理其响应时出错: {e}", exc_info=True)
|
||||
traceback.print_exc()
|
||||
return None, None # LLM 调用或处理失败
|
||||
|
||||
def count_subflows_by_state(self, state: ChatState) -> int:
|
||||
"""统计指定状态的子心流数量"""
|
||||
count = 0
|
||||
# 遍历所有子心流实例
|
||||
for subheartflow in self.subheartflows.values():
|
||||
# 检查子心流状态是否匹配
|
||||
if subheartflow.chat_state.chat_status == state:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def count_subflows_by_state_nolock(self, state: ChatState) -> int:
|
||||
"""
|
||||
统计指定状态的子心流数量 (不上锁版本)。
|
||||
警告:仅应在已持有 self._lock 的上下文中使用此方法。
|
||||
"""
|
||||
count = 0
|
||||
for subheartflow in self.subheartflows.values():
|
||||
if subheartflow.chat_state.chat_status == state:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def get_active_subflow_minds(self) -> List[str]:
|
||||
"""获取所有活跃(非ABSENT)子心流的当前想法"""
|
||||
minds = []
|
||||
for subheartflow in self.subheartflows.values():
|
||||
# 检查子心流是否活跃(非ABSENT状态)
|
||||
if subheartflow.chat_state.chat_status != ChatState.ABSENT:
|
||||
minds.append(subheartflow.sub_mind.current_mind)
|
||||
return minds
|
||||
|
||||
def update_main_mind_in_subflows(self, main_mind: str):
|
||||
"""更新所有子心流的主心流想法"""
|
||||
updated_count = sum(
|
||||
1
|
||||
for _, subheartflow in list(self.subheartflows.items())
|
||||
if subheartflow.subheartflow_id in self.subheartflows
|
||||
)
|
||||
logger.debug(f"[子心流管理器] 更新了{updated_count}个子心流的主想法")
|
||||
|
||||
async def delete_subflow(self, subheartflow_id: Any):
|
||||
"""删除指定的子心流。"""
|
||||
async with self._lock:
|
||||
subflow = self.subheartflows.pop(subheartflow_id, None)
|
||||
if subflow:
|
||||
logger.info(f"正在删除 SubHeartflow: {subheartflow_id}...")
|
||||
try:
|
||||
# 调用 shutdown 方法确保资源释放
|
||||
await subflow.shutdown()
|
||||
logger.info(f"SubHeartflow {subheartflow_id} 已成功删除。")
|
||||
except Exception as e:
|
||||
logger.error(f"删除 SubHeartflow {subheartflow_id} 时出错: {e}", exc_info=True)
|
||||
else:
|
||||
logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}")
|
||||
|
||||
# --- 新增:处理 HFC 无回复回调的专用方法 --- #
|
||||
async def _handle_hfc_no_reply(self, subheartflow_id: Any):
|
||||
"""处理来自 HeartFChatting 的连续无回复信号 (通过 partial 绑定 ID)"""
|
||||
# 注意:这里不需要再获取锁,因为 sbhf_focus_into_absent 内部会处理锁
|
||||
logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号")
|
||||
await self.sbhf_focus_into_absent_or_chat(subheartflow_id)
|
||||
|
||||
# --- 结束新增 --- #
|
||||
|
||||
# --- 新增:处理来自 HeartFChatting 的状态转换请求 --- #
|
||||
async def sbhf_focus_into_absent_or_chat(self, subflow_id: Any):
|
||||
"""
|
||||
接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 ABSENT 或 CHAT。
|
||||
通常在连续多次 "no_reply" 后被调用。
|
||||
对于私聊,总是转换为 ABSENT。
|
||||
对于群聊,随机决定转换为 ABSENT 或 CHAT (如果 CHAT 未达上限)。
|
||||
|
||||
Args:
|
||||
subflow_id: 需要转换状态的子心流 ID。
|
||||
"""
|
||||
async with self._lock:
|
||||
subflow = self.subheartflows.get(subflow_id)
|
||||
if not subflow:
|
||||
logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 ABSENT/CHAT")
|
||||
return
|
||||
|
||||
stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id
|
||||
current_state = subflow.chat_state.chat_status
|
||||
|
||||
if current_state == ChatState.FOCUSED:
|
||||
target_state = ChatState.ABSENT # Default target
|
||||
log_reason = "默认转换 (私聊或群聊)"
|
||||
|
||||
# --- Modify logic based on chat type --- #
|
||||
if subflow.is_group_chat:
|
||||
# Group chat: Decide between ABSENT or CHAT
|
||||
if random.random() < 0.5: # 50% chance to try CHAT
|
||||
current_mai_state = self.mai_state_info.get_current_state()
|
||||
chat_limit = current_mai_state.get_normal_chat_max_num()
|
||||
current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT)
|
||||
|
||||
if current_chat_count < chat_limit:
|
||||
target_state = ChatState.CHAT
|
||||
log_reason = f"群聊随机选择 CHAT (当前 {current_chat_count}/{chat_limit})"
|
||||
else:
|
||||
target_state = ChatState.ABSENT # Fallback to ABSENT if CHAT limit reached
|
||||
log_reason = (
|
||||
f"群聊随机选择 CHAT 但已达上限 ({current_chat_count}/{chat_limit}),转为 ABSENT"
|
||||
)
|
||||
else: # 50% chance to go directly to ABSENT
|
||||
target_state = ChatState.ABSENT
|
||||
log_reason = "群聊随机选择 ABSENT"
|
||||
else:
|
||||
# Private chat: Always go to ABSENT
|
||||
target_state = ChatState.ABSENT
|
||||
log_reason = "私聊退出 FOCUSED,转为 ABSENT"
|
||||
# --- End modification --- #
|
||||
|
||||
logger.info(
|
||||
f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 尝试转换为 {target_state.value} ({log_reason})"
|
||||
)
|
||||
try:
|
||||
# 从HFC到CHAT时,清空兴趣字典
|
||||
subflow.clear_interest_dict()
|
||||
await subflow.change_chat_state(target_state)
|
||||
final_state = subflow.chat_state.chat_status
|
||||
if final_state == target_state:
|
||||
logger.debug(f"[状态转换请求] {stream_name} 状态已成功转换为 {final_state.value}")
|
||||
else:
|
||||
logger.warning(
|
||||
f"[状态转换请求] 尝试将 {stream_name} 转换为 {target_state.value} 后,状态实际为 {final_state.value}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[状态转换请求] 转换 {stream_name} 到 {target_state.value} 时出错: {e}", exc_info=True
|
||||
)
|
||||
elif current_state == ChatState.ABSENT:
|
||||
logger.debug(f"[状态转换请求] {stream_name} 已处于 ABSENT 状态,无需转换")
|
||||
else:
|
||||
logger.warning(
|
||||
f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换"
|
||||
)
|
||||
|
||||
# --- 结束新增 --- #
|
||||
|
||||
# --- 新增:处理私聊从 ABSENT 直接到 FOCUSED 的逻辑 --- #
|
||||
async def sbhf_absent_private_into_focus(self):
|
||||
"""检查 ABSENT 状态的私聊子心流是否有新活动,若有且未达 FOCUSED 上限,则直接转换为 FOCUSED。"""
|
||||
log_prefix_task = "[私聊激活检查]"
|
||||
transitioned_count = 0
|
||||
checked_count = 0
|
||||
|
||||
# --- 获取当前状态和 FOCUSED 上限 --- #
|
||||
current_mai_state = self.mai_state_info.get_current_state()
|
||||
focused_limit = current_mai_state.get_focused_chat_max_num()
|
||||
|
||||
# --- 检查是否允许 FOCUS 模式 --- #
|
||||
if not global_config.allow_focus_mode:
|
||||
# Log less frequently to avoid spam
|
||||
# if int(time.time()) % 60 == 0:
|
||||
# logger.debug(f"{log_prefix_task} 配置不允许进入 FOCUSED 状态")
|
||||
return
|
||||
|
||||
if focused_limit <= 0:
|
||||
# logger.debug(f"{log_prefix_task} 当前状态 ({current_mai_state.value}) 不允许 FOCUSED 子心流")
|
||||
return
|
||||
|
||||
async with self._lock:
|
||||
# --- 获取当前 FOCUSED 计数 (不上锁版本) --- #
|
||||
current_focused_count = self.count_subflows_by_state_nolock(ChatState.FOCUSED)
|
||||
|
||||
# --- 筛选出所有 ABSENT 状态的私聊子心流 --- #
|
||||
eligible_subflows = [
|
||||
hf
|
||||
for hf in self.subheartflows.values()
|
||||
if hf.chat_state.chat_status == ChatState.ABSENT and not hf.is_group_chat
|
||||
]
|
||||
checked_count = len(eligible_subflows)
|
||||
|
||||
if not eligible_subflows:
|
||||
# logger.debug(f"{log_prefix_task} 没有 ABSENT 状态的私聊子心流可以评估。")
|
||||
return
|
||||
|
||||
# --- 遍历评估每个符合条件的私聊 --- #
|
||||
for sub_hf in eligible_subflows:
|
||||
# --- 再次检查 FOCUSED 上限,因为可能有多个同时激活 --- #
|
||||
if current_focused_count >= focused_limit:
|
||||
logger.debug(
|
||||
f"{log_prefix_task} 已达专注上限 ({current_focused_count}/{focused_limit}),停止检查后续私聊。"
|
||||
)
|
||||
break # 已满,无需再检查其他私聊
|
||||
|
||||
flow_id = sub_hf.subheartflow_id
|
||||
stream_name = chat_manager.get_stream_name(flow_id) or flow_id
|
||||
log_prefix = f"[{stream_name}]({log_prefix_task})"
|
||||
|
||||
try:
|
||||
# --- 检查是否有新活动 --- #
|
||||
observation = sub_hf._get_primary_observation() # 获取主要观察者
|
||||
is_active = False
|
||||
if observation:
|
||||
# 检查自上次状态变为 ABSENT 后是否有新消息
|
||||
# 使用 chat_state_changed_time 可能更精确
|
||||
# 加一点点缓冲时间(例如 1 秒)以防时间戳完全相等
|
||||
timestamp_to_check = sub_hf.chat_state_changed_time - 1
|
||||
has_new = await observation.has_new_messages_since(timestamp_to_check)
|
||||
if has_new:
|
||||
is_active = True
|
||||
logger.debug(f"{log_prefix} 检测到新消息,标记为活跃。")
|
||||
else:
|
||||
logger.warning(f"{log_prefix} 无法获取主要观察者来检查活动状态。")
|
||||
|
||||
# --- 如果活跃且未达上限,则尝试转换 --- #
|
||||
if is_active:
|
||||
logger.info(
|
||||
f"{log_prefix} 检测到活跃且未达专注上限 ({current_focused_count}/{focused_limit}),尝试转换为 FOCUSED。"
|
||||
)
|
||||
await sub_hf.change_chat_state(ChatState.FOCUSED)
|
||||
# 确认转换成功
|
||||
if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
|
||||
transitioned_count += 1
|
||||
current_focused_count += 1 # 更新计数器以供本轮后续检查
|
||||
logger.info(f"{log_prefix} 成功进入 FOCUSED 状态。")
|
||||
else:
|
||||
logger.warning(
|
||||
f"{log_prefix} 尝试进入 FOCUSED 状态失败。当前状态: {sub_hf.chat_state.chat_status.value}"
|
||||
)
|
||||
# else: # 不活跃,无需操作
|
||||
# logger.debug(f"{log_prefix} 未检测到新活动,保持 ABSENT。")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"{log_prefix} 检查私聊活动或转换状态时出错: {e}", exc_info=True)
|
||||
|
||||
# --- 循环结束后记录总结日志 --- #
|
||||
if transitioned_count > 0:
|
||||
logger.debug(
|
||||
f"{log_prefix_task} 完成,共检查 {checked_count} 个私聊,{transitioned_count} 个转换为 FOCUSED。"
|
||||
)
|
||||
74
src/chat/heart_flow/utils_chat.py
Normal file
74
src/chat/heart_flow/utils_chat.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import asyncio
|
||||
from typing import Optional, Tuple, Dict
|
||||
from src.common.logger_manager import get_logger
|
||||
from src.chat.message_receive.chat_stream import chat_manager
|
||||
from src.chat.person_info.person_info import person_info_manager
|
||||
|
||||
logger = get_logger("heartflow_utils")
|
||||
|
||||
|
||||
async def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]:
|
||||
"""
|
||||
获取聊天类型(是否群聊)和私聊对象信息。
|
||||
|
||||
Args:
|
||||
chat_id: 聊天流ID
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[Dict]]:
|
||||
- bool: 是否为群聊 (True 是群聊, False 是私聊或未知)
|
||||
- Optional[Dict]: 如果是私聊,包含对方信息的字典;否则为 None。
|
||||
字典包含: platform, user_id, user_nickname, person_id, person_name
|
||||
"""
|
||||
is_group_chat = False # Default to private/unknown
|
||||
chat_target_info = None
|
||||
|
||||
try:
|
||||
chat_stream = await asyncio.to_thread(chat_manager.get_stream, chat_id) # Use to_thread if get_stream is sync
|
||||
# If get_stream is already async, just use: chat_stream = await chat_manager.get_stream(chat_id)
|
||||
|
||||
if chat_stream:
|
||||
if chat_stream.group_info:
|
||||
is_group_chat = True
|
||||
chat_target_info = None # Explicitly None for group chat
|
||||
elif chat_stream.user_info: # It's a private chat
|
||||
is_group_chat = False
|
||||
user_info = chat_stream.user_info
|
||||
platform = chat_stream.platform
|
||||
user_id = user_info.user_id
|
||||
|
||||
# Initialize target_info with basic info
|
||||
target_info = {
|
||||
"platform": platform,
|
||||
"user_id": user_id,
|
||||
"user_nickname": user_info.user_nickname,
|
||||
"person_id": None,
|
||||
"person_name": None,
|
||||
}
|
||||
|
||||
# Try to fetch person info
|
||||
try:
|
||||
# Assume get_person_id is sync (as per original code), keep using to_thread
|
||||
person_id = await asyncio.to_thread(person_info_manager.get_person_id, platform, user_id)
|
||||
person_name = None
|
||||
if person_id:
|
||||
# get_value is async, so await it directly
|
||||
person_name = await person_info_manager.get_value(person_id, "person_name")
|
||||
|
||||
target_info["person_id"] = person_id
|
||||
target_info["person_name"] = person_name
|
||||
except Exception as person_e:
|
||||
logger.warning(
|
||||
f"获取 person_id 或 person_name 时出错 for {platform}:{user_id} in utils: {person_e}"
|
||||
)
|
||||
|
||||
chat_target_info = target_info
|
||||
else:
|
||||
logger.warning(f"无法获取 chat_stream for {chat_id} in utils")
|
||||
# Keep defaults: is_group_chat=False, chat_target_info=None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取聊天类型和目标信息时出错 for {chat_id}: {e}", exc_info=True)
|
||||
# Keep defaults on error
|
||||
|
||||
return is_group_chat, chat_target_info
|
||||
Reference in New Issue
Block a user