fix:再次尝试修复模式
This commit is contained in:
@@ -159,94 +159,119 @@ class NormalChat:
|
||||
后台任务方法,轮询当前实例关联chat的兴趣消息
|
||||
通常由start_monitoring_interest()启动
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
# 检查任务是否已被取消 - 移动到try块最开始
|
||||
if self._chat_task is None or self._chat_task.cancelled():
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出")
|
||||
break
|
||||
|
||||
# 检查是否已停用
|
||||
logger.debug(f"[{self.stream_name}] 兴趣监控任务开始")
|
||||
|
||||
try:
|
||||
while True:
|
||||
# 第一层检查:立即检查取消和停用状态
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控")
|
||||
logger.info(f"[{self.stream_name}] 检测到停用标志,退出兴趣监控")
|
||||
break
|
||||
|
||||
await asyncio.sleep(0.5) # 每0.5秒检查一次
|
||||
|
||||
# 再次检查取消状态
|
||||
if self._chat_task is None or self._chat_task.cancelled() or self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 检测到停止信号,退出")
|
||||
# 检查当前任务是否已被取消
|
||||
current_task = asyncio.current_task()
|
||||
if current_task and current_task.cancelled():
|
||||
logger.info(f"[{self.stream_name}] 当前任务已被取消,退出")
|
||||
break
|
||||
|
||||
items_to_process = list(self.interest_dict.items())
|
||||
if not items_to_process:
|
||||
continue
|
||||
|
||||
# 使用异步上下文管理器处理消息
|
||||
try:
|
||||
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
|
||||
# 在上下文内部再次检查取消状态
|
||||
if self._chat_task is None or self._chat_task.cancelled() or self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出")
|
||||
break
|
||||
# 短暂等待,让出控制权
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# 第二层检查:睡眠后再次检查状态
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 睡眠后检测到停用标志,退出")
|
||||
break
|
||||
|
||||
# 并行处理兴趣消息
|
||||
async def process_single_message(msg_id, message, interest_value, is_mentioned):
|
||||
"""处理单个兴趣消息"""
|
||||
try:
|
||||
# 在处理每个消息前检查停止状态
|
||||
if self._disabled or (self._chat_task and self._chat_task.cancelled()):
|
||||
return
|
||||
# 获取待处理消息
|
||||
items_to_process = list(self.interest_dict.items())
|
||||
if not items_to_process:
|
||||
# 没有消息时继续下一轮循环
|
||||
continue
|
||||
|
||||
# 处理消息
|
||||
if time.time() - self.start_time > 300:
|
||||
self.adjust_reply_frequency(duration=300 / 60)
|
||||
else:
|
||||
self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60)
|
||||
# 第三层检查:在处理消息前最后检查一次
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 处理消息前检测到停用标志,退出")
|
||||
break
|
||||
|
||||
await self.normal_response(
|
||||
message=message,
|
||||
is_mentioned=is_mentioned,
|
||||
interested_rate=interest_value * self.willing_amplifier,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}"
|
||||
)
|
||||
finally:
|
||||
self.interest_dict.pop(msg_id, None)
|
||||
# 使用异步上下文管理器处理消息
|
||||
try:
|
||||
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
|
||||
# 在上下文内部再次检查取消状态
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出")
|
||||
break
|
||||
|
||||
# 创建并行任务列表
|
||||
tasks = []
|
||||
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
|
||||
task = process_single_message(msg_id, message, interest_value, is_mentioned)
|
||||
tasks.append(task)
|
||||
# 并行处理兴趣消息
|
||||
async def process_single_message(msg_id, message, interest_value, is_mentioned):
|
||||
"""处理单个兴趣消息"""
|
||||
try:
|
||||
# 在处理每个消息前检查停止状态
|
||||
if self._disabled:
|
||||
logger.debug(f"[{self.stream_name}] 处理消息时检测到停用,跳过消息 {msg_id}")
|
||||
return
|
||||
|
||||
# 并行执行所有任务,限制并发数量避免资源过度消耗
|
||||
if tasks:
|
||||
# 使用信号量控制并发数,最多同时处理5个消息
|
||||
semaphore = asyncio.Semaphore(5)
|
||||
# 处理消息
|
||||
if time.time() - self.start_time > 300:
|
||||
self.adjust_reply_frequency(duration=300 / 60)
|
||||
else:
|
||||
self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60)
|
||||
|
||||
async def limited_process(task, sem):
|
||||
async with sem:
|
||||
await task
|
||||
await self.normal_response(
|
||||
message=message,
|
||||
is_mentioned=is_mentioned,
|
||||
interested_rate=interest_value * self.willing_amplifier,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"[{self.stream_name}] 处理消息 {msg_id} 时被取消")
|
||||
raise # 重新抛出取消异常
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}")
|
||||
# 不打印完整traceback,避免日志污染
|
||||
finally:
|
||||
# 无论如何都要清理消息
|
||||
self.interest_dict.pop(msg_id, None)
|
||||
|
||||
# 创建并行任务列表
|
||||
tasks = []
|
||||
for msg_id, (message, interest_value, is_mentioned) in items_to_process:
|
||||
task = process_single_message(msg_id, message, interest_value, is_mentioned)
|
||||
tasks.append(task)
|
||||
|
||||
# 并行执行所有任务,限制并发数量避免资源过度消耗
|
||||
if tasks:
|
||||
# 使用信号量控制并发数,最多同时处理5个消息
|
||||
semaphore = asyncio.Semaphore(5)
|
||||
|
||||
async def limited_process(task, sem):
|
||||
async with sem:
|
||||
await task
|
||||
|
||||
limited_tasks = [limited_process(task, semaphore) for task in tasks]
|
||||
await asyncio.gather(*limited_tasks, return_exceptions=True)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 处理上下文时任务被取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}")
|
||||
# 出错后短暂等待,避免快速重试
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
limited_tasks = [limited_process(task, semaphore) for task in tasks]
|
||||
await asyncio.gather(*limited_tasks, return_exceptions=True)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 处理上下文时任务被取消")
|
||||
logger.info(f"[{self.stream_name}] 主循环中任务被取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}")
|
||||
await asyncio.sleep(1)
|
||||
logger.error(f"[{self.stream_name}] 主循环出错: {e}")
|
||||
# 出错后等待一秒再继续
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 兴趣监控任务出错: {e}\n{traceback.format_exc()}")
|
||||
await asyncio.sleep(1) # 出错后等待一秒再继续
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 兴趣监控任务被取消")
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 兴趣监控任务严重错误: {e}")
|
||||
finally:
|
||||
logger.debug(f"[{self.stream_name}] 兴趣监控任务结束")
|
||||
|
||||
# 改为实例方法, 移除 chat 参数
|
||||
async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
|
||||
@@ -504,60 +529,112 @@ class NormalChat:
|
||||
# 改为实例方法, 移除 chat 参数
|
||||
|
||||
async def start_chat(self):
|
||||
"""启动聊天任务。""" # Ensure initialized before starting tasks
|
||||
self._disabled = False # 启动时重置停用标志
|
||||
|
||||
if self._chat_task is None or self._chat_task.done():
|
||||
# logger.info(f"[{self.stream_name}] 开始处理兴趣消息...")
|
||||
polling_task = asyncio.create_task(self._reply_interested_message())
|
||||
polling_task.add_done_callback(lambda t: self._handle_task_completion(t))
|
||||
self._chat_task = polling_task
|
||||
else:
|
||||
"""启动聊天任务。"""
|
||||
logger.debug(f"[{self.stream_name}] 开始启动聊天任务")
|
||||
|
||||
# 重置停用标志
|
||||
self._disabled = False
|
||||
|
||||
# 检查是否已有运行中的任务
|
||||
if self._chat_task and not self._chat_task.done():
|
||||
logger.info(f"[{self.stream_name}] 聊天轮询任务已在运行中。")
|
||||
return
|
||||
|
||||
# 清理可能存在的已完成任务引用
|
||||
if self._chat_task and self._chat_task.done():
|
||||
self._chat_task = None
|
||||
|
||||
try:
|
||||
logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务")
|
||||
polling_task = asyncio.create_task(self._reply_interested_message())
|
||||
|
||||
# 设置回调
|
||||
polling_task.add_done_callback(lambda t: self._handle_task_completion(t))
|
||||
|
||||
# 保存任务引用
|
||||
self._chat_task = polling_task
|
||||
|
||||
logger.debug(f"[{self.stream_name}] 聊天任务启动完成")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 启动聊天任务失败: {e}")
|
||||
self._chat_task = None
|
||||
raise
|
||||
|
||||
def _handle_task_completion(self, task: asyncio.Task):
|
||||
"""任务完成回调处理"""
|
||||
if task is not self._chat_task:
|
||||
logger.warning(f"[{self.stream_name}] 收到未知任务回调")
|
||||
return
|
||||
try:
|
||||
if exc := task.exception():
|
||||
logger.error(f"[{self.stream_name}] 任务异常: {exc}")
|
||||
traceback.print_exc()
|
||||
except asyncio.CancelledError:
|
||||
logger.debug(f"[{self.stream_name}] 任务已取消")
|
||||
# 简化回调逻辑,避免复杂的异常处理
|
||||
logger.debug(f"[{self.stream_name}] 任务完成回调被调用")
|
||||
|
||||
# 检查是否是我们管理的任务
|
||||
if task is not self._chat_task:
|
||||
# 如果已经不是当前任务(可能在stop_chat中已被清空),直接返回
|
||||
logger.debug(f"[{self.stream_name}] 回调的任务不是当前管理的任务")
|
||||
return
|
||||
|
||||
# 清理任务引用
|
||||
self._chat_task = None
|
||||
logger.debug(f"[{self.stream_name}] 任务引用已清理")
|
||||
|
||||
# 简单记录任务状态,不进行复杂处理
|
||||
if task.cancelled():
|
||||
logger.debug(f"[{self.stream_name}] 任务已取消")
|
||||
elif task.done():
|
||||
try:
|
||||
# 尝试获取异常,但不抛出
|
||||
exc = task.exception()
|
||||
if exc:
|
||||
logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}")
|
||||
else:
|
||||
logger.debug(f"[{self.stream_name}] 任务正常完成")
|
||||
except Exception as e:
|
||||
# 获取异常时也可能出错,静默处理
|
||||
logger.debug(f"[{self.stream_name}] 获取任务异常时出错: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 回调处理错误: {e}")
|
||||
finally:
|
||||
if self._chat_task is task:
|
||||
self._chat_task = None
|
||||
logger.debug(f"[{self.stream_name}] 任务清理完成")
|
||||
# 回调函数中的任何异常都要捕获,避免影响系统
|
||||
logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}")
|
||||
# 确保任务引用被清理
|
||||
self._chat_task = None
|
||||
|
||||
# 改为实例方法, 移除 stream_id 参数
|
||||
async def stop_chat(self):
|
||||
"""停止当前实例的兴趣监控任务。"""
|
||||
self._disabled = True # 停止时设置停用标志
|
||||
if self._chat_task and not self._chat_task.done():
|
||||
task = self._chat_task
|
||||
logger.debug(f"[{self.stream_name}] 尝试取消normal聊天任务。")
|
||||
task.cancel()
|
||||
try:
|
||||
# 添加超时机制,最多等待2秒
|
||||
await asyncio.wait_for(task, timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"[{self.stream_name}] 等待任务取消超时,强制结束")
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 结束一般聊天模式。")
|
||||
except Exception as e:
|
||||
# 回调函数 _handle_task_completion 会处理异常日志
|
||||
logger.warning(f"[{self.stream_name}] 等待监控任务取消时捕获到异常 (可能已在回调中记录): {e}")
|
||||
finally:
|
||||
# 确保任务状态更新,即使等待出错 (回调函数也会尝试更新)
|
||||
if self._chat_task is task:
|
||||
self._chat_task = None
|
||||
logger.debug(f"[{self.stream_name}] 开始停止聊天任务")
|
||||
|
||||
# 立即设置停用标志,防止新任务启动
|
||||
self._disabled = True
|
||||
|
||||
# 如果没有运行中的任务,直接返回
|
||||
if not self._chat_task or self._chat_task.done():
|
||||
logger.debug(f"[{self.stream_name}] 没有运行中的任务,直接完成停止")
|
||||
self._chat_task = None
|
||||
return
|
||||
|
||||
# 保存任务引用并立即清空,避免回调中的循环引用
|
||||
task_to_cancel = self._chat_task
|
||||
self._chat_task = None
|
||||
|
||||
logger.debug(f"[{self.stream_name}] 取消聊天任务")
|
||||
|
||||
# 尝试优雅取消任务
|
||||
task_to_cancel.cancel()
|
||||
|
||||
# 不等待任务完成,让它自然结束
|
||||
# 这样可以避免等待过程中的潜在递归问题
|
||||
|
||||
# 异步清理思考消息,不阻塞当前流程
|
||||
asyncio.create_task(self._cleanup_thinking_messages_async())
|
||||
|
||||
logger.debug(f"[{self.stream_name}] 聊天任务停止完成")
|
||||
|
||||
# 清理所有未处理的思考消息
|
||||
async def _cleanup_thinking_messages_async(self):
|
||||
"""异步清理思考消息,避免阻塞主流程"""
|
||||
try:
|
||||
# 添加短暂延迟,让任务有时间响应取消
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
container = await message_manager.get_container(self.stream_id)
|
||||
if container:
|
||||
# 查找并移除所有 MessageThinking 类型的消息
|
||||
@@ -567,8 +644,8 @@ class NormalChat:
|
||||
container.messages.remove(msg)
|
||||
logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。")
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 清理思考消息时出错: {e}")
|
||||
traceback.print_exc()
|
||||
logger.error(f"[{self.stream_name}] 异步清理思考消息时出错: {e}")
|
||||
# 不打印完整栈跟踪,避免日志污染
|
||||
|
||||
# 获取最近回复记录的方法
|
||||
def get_recent_replies(self, limit: int = 10) -> List[dict]:
|
||||
|
||||
Reference in New Issue
Block a user