This commit is contained in:
tt-P607
2025-10-23 20:25:33 +08:00
3 changed files with 92 additions and 30 deletions

View File

@@ -351,7 +351,7 @@ class StreamLoopManager:
return False
async def _process_stream_messages(self, stream_id: str, context: StreamContext) -> bool:
"""处理流消息
"""处理流消息 - 支持子任务管理
Args:
stream_id: 流ID
@@ -367,6 +367,9 @@ class StreamLoopManager:
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)
# 子任务跟踪
child_tasks = set()
try:
start_time = time.time()
@@ -375,6 +378,11 @@ class StreamLoopManager:
if cached_messages:
logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}")
# 创建子任务用于刷新能量(不阻塞主流程)
energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id))
child_tasks.add(energy_task)
energy_task.add_done_callback(lambda t: child_tasks.discard(t))
# 直接调用chatter_manager处理流上下文
task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
self.chatter_manager.set_processing_task(stream_id, task)
@@ -387,7 +395,6 @@ class StreamLoopManager:
if additional_messages:
logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}")
asyncio.create_task(self._refresh_focus_energy(stream_id))
process_time = time.time() - start_time
logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)")
else:
@@ -395,8 +402,19 @@ class StreamLoopManager:
return success
except asyncio.CancelledError:
logger.info(f"流处理被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
# 异常时也要清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
return False
finally:
# 无论成功或失败,都要设置处理状态为未处理

View File

@@ -435,7 +435,7 @@ class MessageManager:
await self._trigger_reprocess(chat_stream)
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑"""
"""重新处理聊天流的核心逻辑 - 支持子任务管理"""
try:
stream_id = chat_stream.stream_id
@@ -455,38 +455,45 @@ class MessageManager:
logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 创建新的处理任务
# 创建处理任务并使用try-catch实现子任务管理
task = asyncio.create_task(
self.chatter_manager.process_stream_context(stream_id, context),
self._managed_reprocess_with_cleanup(stream_id, context),
name=f"reprocess_{stream_id}_{int(time.time())}"
)
# 设置处理任务
self.chatter_manager.set_processing_task(stream_id, task)
# 等待处理完成(使用超时防止无限等待)
try:
result = await asyncio.wait_for(task, timeout=30.0)
success = result.get("success", False)
actions_count = result.get("actions_count", 0)
if success:
logger.info(f"✅ 聊天流 {stream_id} 重新处理成功: 执行了 {actions_count} 个动作")
else:
logger.warning(f"❌ 聊天流 {stream_id} 重新处理失败")
except asyncio.TimeoutError:
logger.warning(f"⏰ 聊天流 {stream_id} 重新处理超时")
if not task.done():
task.cancel()
except Exception as e:
logger.error(f"💥 聊天流 {stream_id} 重新处理出错: {e}")
if not task.done():
task.cancel()
# 等待完成,让它异步执行
# 如果需要等待,调用者会等待 chatter_manager.process_stream_context
except Exception as e:
logger.error(f"🚨 触发重新处理时出错: {e}")
async def _managed_reprocess_with_cleanup(self, stream_id: str, context):
"""带清理功能的重新处理"""
child_tasks = set() # 跟踪子任务
try:
# 处理流上下文
result = await self.chatter_manager.process_stream_context(stream_id, context)
return result
except asyncio.CancelledError:
logger.info(f"重新处理任务被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except Exception as e:
logger.error(f"重新处理任务执行出错: {stream_id} - {e}")
# 清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
try:

View File

@@ -279,6 +279,9 @@ class DefaultReplyer:
# 初始化聊天信息
await self._initialize_chat_info()
# 子任务跟踪 - 用于取消管理
child_tasks = set()
prompt = None
if available_actions is None:
available_actions = {}
@@ -349,18 +352,36 @@ class DefaultReplyer:
# 回复生成成功后,异步存储聊天记忆(不阻塞返回)
try:
await self._store_chat_memory_async(reply_to, reply_message)
# 将记忆存储作为子任务创建,可以被取消
memory_task = asyncio.create_task(
self._store_chat_memory_async(reply_to, reply_message),
name=f"store_memory_{self.chat_stream.stream_id}"
)
# 不等待完成,让它在后台运行
# 如果父任务被取消,这个子任务也会被垃圾回收
logger.debug(f"创建记忆存储子任务: {memory_task.get_name()}")
except Exception as memory_e:
# 记忆存储失败不应该影响回复生成的成功返回
logger.warning(f"记忆存储失败,但不影响回复生成: {memory_e}")
return True, llm_response, prompt
except asyncio.CancelledError:
logger.info(f"回复生成被取消: {self.chat_stream.stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except UserWarning as uw:
raise uw
except Exception as e:
logger.error(f"回复生成意外失败: {e}")
traceback.print_exc()
# 异常时也要清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
return False, None, prompt
async def rewrite_reply_with_context(
@@ -1338,7 +1359,15 @@ class DefaultReplyer:
logger.info(f"为超时任务 {task_name} 提供默认值")
return task_name, default_values[task_name], timeout
task_results = await asyncio.gather(*(get_task_result(name, task) for name, task in tasks.items()))
try:
task_results = await asyncio.gather(*(get_task_result(name, task) for name, task in tasks.items()))
except asyncio.CancelledError:
logger.info("Prompt构建任务被取消正在清理子任务")
# 取消所有未完成的子任务
for name, task in tasks.items():
if not task.done():
task.cancel()
raise
# 任务名称中英文映射
task_name_mapping = {
@@ -1611,10 +1640,14 @@ class DefaultReplyer:
)
# 并行执行2个构建任务
expression_habits_block, relation_info = await asyncio.gather(
self.build_expression_habits(chat_talking_prompt_half, target),
self.build_relation_info(sender, target),
)
try:
expression_habits_block, relation_info = await asyncio.gather(
self.build_expression_habits(chat_talking_prompt_half, target),
self.build_relation_info(sender, target),
)
except asyncio.CancelledError:
logger.info("表达式和关系信息构建被取消")
raise
keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target)
@@ -1980,6 +2013,10 @@ class DefaultReplyer:
logger.debug(f"已启动记忆存储任务,用户: {memory_user_display or memory_user_id}")
except asyncio.CancelledError:
logger.debug("记忆存储任务被取消")
# 这是正常情况,不需要清理子任务,因为是叶子节点
raise
except Exception as e:
logger.error(f"存储聊天记忆失败: {e}")