fix:进一步模块化,修复观察错位问题

This commit is contained in:
SengokuCola
2025-04-25 18:12:11 +08:00
parent 1e75082141
commit d7ca0255fe
16 changed files with 1217 additions and 668 deletions

View File

@@ -43,6 +43,7 @@ class InterestChatting:
max_probability=max_reply_probability,
state_change_callback: Optional[Callable[[ChatState], None]] = None,
):
# 基础属性初始化
self.interest_level: float = 0.0
self.last_update_time: float = time.time()
self.decay_rate_per_second: float = decay_rate
@@ -56,16 +57,26 @@ class InterestChatting:
self.max_reply_probability: float = max_probability
self.current_reply_probability: float = 0.0
self.is_above_threshold: bool = False
# 任务相关属性初始化
self.update_task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._task_lock = asyncio.Lock()
self._is_running = False
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
self.update_interval = 1.0
self.start_updates(self.update_interval) # 初始化时启动后台更新任务
self.above_threshold = False
self.start_hfc_probability = 0.0
@classmethod
async def create(cls, *args, **kwargs):
"""异步工厂方法,用于创建并初始化 InterestChatting 实例"""
instance = cls(*args, **kwargs)
await instance.start_updates(instance.update_interval)
return instance
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time()
@@ -141,59 +152,74 @@ class InterestChatting:
# --- 新增后台更新任务相关方法 ---
async def _run_update_loop(self, update_interval: float = 1.0):
"""后台循环,定期更新兴趣和回复概率。"""
while not self._stop_event.is_set():
try:
if self.interest_level != 0:
await self._calculate_decay()
try:
while not self._stop_event.is_set():
try:
if self.interest_level != 0:
await self._calculate_decay()
await self._update_reply_probability()
await self._update_reply_probability()
# 等待下一个周期或停止事件
await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
except asyncio.TimeoutError:
# 正常超时,继续循环
continue
except asyncio.CancelledError:
logger.info("InterestChatting 更新循环被取消。")
break
except Exception as e:
logger.error(f"InterestChatting 更新循环出错: {e}")
logger.error(traceback.format_exc())
# 防止错误导致CPU飙升稍作等待
await asyncio.sleep(5)
logger.info("InterestChatting 更新循环已停止。")
# 等待下一个周期或停止事件
await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
except asyncio.TimeoutError:
# 正常超时,继续循环
continue
except Exception as e:
logger.error(f"InterestChatting 更新循环出错: {e}")
logger.error(traceback.format_exc())
# 防止错误导致CPU飙升稍作等待
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info("InterestChatting 更新循环被取消。")
finally:
self._is_running = False
logger.info("InterestChatting 更新循环已停止。")
def start_updates(self, update_interval: float = 1.0):
"""启动后台更新任务"""
if self.update_task is None or self.update_task.done():
self._stop_event.clear()
self.update_task = asyncio.create_task(self._run_update_loop(update_interval))
logger.debug("后台兴趣更新任务已创建并启动。")
else:
logger.debug("后台兴趣更新任务已在运行中。")
async def start_updates(self, update_interval: float = 1.0):
"""启动后台更新任务,使用锁确保并发安全"""
async with self._task_lock:
if self._is_running:
logger.debug("后台兴趣更新任务已在运行中。")
return
# 清理已完成或已取消的任务
if self.update_task and (self.update_task.done() or self.update_task.cancelled()):
self.update_task = None
if not self.update_task:
self._stop_event.clear()
self._is_running = True
self.update_task = asyncio.create_task(self._run_update_loop(update_interval))
logger.debug("后台兴趣更新任务已创建并启动。")
async def stop_updates(self):
"""停止后台更新任务"""
if self.update_task and not self.update_task.done():
"""停止后台更新任务,使用锁确保并发安全"""
async with self._task_lock:
if not self._is_running:
logger.debug("后台兴趣更新任务未运行。")
return
logger.info("正在停止 InterestChatting 后台更新任务...")
self._stop_event.set() # 发送停止信号
try:
# 等待任务结束,设置超时
await asyncio.wait_for(self.update_task, timeout=5.0)
logger.info("InterestChatting 后台更新任务已成功停止。")
except asyncio.TimeoutError:
logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
self.update_task.cancel()
self._stop_event.set()
if self.update_task and not self.update_task.done():
try:
await self.update_task # 等待取消完成
except asyncio.CancelledError:
logger.info("InterestChatting 后台更新任务已被取消")
except Exception as e:
logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
finally:
self.update_task = None
else:
logger.debug("InterestChatting 后台更新任务未运行或已完成。")
# 等待任务结束,设置超时
await asyncio.wait_for(self.update_task, timeout=5.0)
logger.info("InterestChatting 后台更新任务已成功停止")
except asyncio.TimeoutError:
logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
self.update_task.cancel()
try:
await self.update_task # 等待取消完成
except asyncio.CancelledError:
logger.info("InterestChatting 后台更新任务已被取消。")
except Exception as e:
logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
finally:
self.update_task = None
self._is_running = False
# --- 结束 新增方法 ---
@@ -214,7 +240,7 @@ class SubHeartflow:
# 聊天状态管理
self.chat_state: ChatStateInfo = ChatStateInfo() # 该sub_heartflow的聊天状态信息
self.interest_chatting = InterestChatting(state_change_callback=self.set_chat_state)
self.interest_chatting = None # 将在 initialize 中创建
# 活动状态管理
self.last_active_time = time.time() # 最后活跃时间
@@ -234,6 +260,11 @@ class SubHeartflow:
self.log_prefix = chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id
async def initialize(self):
"""异步初始化方法"""
self.interest_chatting = await InterestChatting.create(state_change_callback=self.set_chat_state)
logger.debug(f"{self.log_prefix} InterestChatting 实例已创建并初始化。")
async def add_time_current_state(self, add_time: float):
self.current_state_time += add_time
@@ -412,7 +443,7 @@ class SubHeartflow:
- 负责子心流的主要后台循环
- 每30秒检查一次停止标志
"""
logger.info(f"{self.log_prefix} 子心流开始工作...")
logger.trace(f"{self.log_prefix} 子心流开始工作...")
while not self.should_stop:
await asyncio.sleep(30) # 30秒检查一次停止标志