This commit is contained in:
minecraft1024a
2025-08-22 13:24:27 +08:00
9 changed files with 1271 additions and 12 deletions

View File

@@ -329,7 +329,35 @@ class CycleProcessor:
action_message=action_message,
)
if not action_handler:
return False, "", ""
# 动作处理器创建失败,尝试回退机制
logger.warning(f"{self.context.log_prefix} 创建动作处理器失败: {action},尝试回退方案")
# 获取当前可用的动作
available_actions = self.context.action_manager.get_using_actions()
fallback_action = None
# 回退优先级reply > 第一个可用动作
if "reply" in available_actions:
fallback_action = "reply"
elif available_actions:
fallback_action = list(available_actions.keys())[0]
if fallback_action and fallback_action != action:
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
action_handler = self.context.action_manager.create_action(
action_name=fallback_action,
action_data=action_data,
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
cycle_timers=cycle_timers,
thinking_id=thinking_id,
chat_stream=self.context.chat_stream,
log_prefix=self.context.log_prefix,
action_message=action_message,
)
if not action_handler:
logger.error(f"{self.context.log_prefix} 回退方案也失败,无法创建任何动作处理器")
return False, "", ""
success, reply_text = await action_handler.handle_action()
return success, reply_text, ""

View File

@@ -0,0 +1,390 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Action组件诊断和修复脚本
检查no_reply等核心Action是否正确注册并尝试修复相关问题
"""
import sys
import os
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../'))
from src.common.logger import get_logger
from src.plugin_system.core.component_registry import component_registry
from src.plugin_system.core.plugin_manager import plugin_manager
from src.plugin_system.base.component_types import ComponentType
logger = get_logger("action_diagnostics")
class ActionDiagnostics:
"""Action组件诊断器"""
def __init__(self):
self.required_actions = ["no_reply", "reply", "emoji", "at_user"]
def check_plugin_loading(self) -> Dict[str, Any]:
"""检查插件加载状态"""
logger.info("开始检查插件加载状态...")
result = {
"plugins_loaded": False,
"total_plugins": 0,
"loaded_plugins": [],
"failed_plugins": [],
"core_actions_plugin": None
}
try:
# 加载所有插件
plugin_manager.load_all_plugins()
# 获取插件统计信息
stats = plugin_manager.get_stats()
result["plugins_loaded"] = True
result["total_plugins"] = stats.get("total_plugins", 0)
# 检查是否有core_actions插件
for plugin_name in plugin_manager.loaded_plugins:
result["loaded_plugins"].append(plugin_name)
if "core_actions" in plugin_name.lower():
result["core_actions_plugin"] = plugin_name
logger.info(f"插件加载成功,总数: {result['total_plugins']}")
logger.info(f"已加载插件: {result['loaded_plugins']}")
except Exception as e:
logger.error(f"插件加载失败: {e}")
result["error"] = str(e)
return result
def check_action_registry(self) -> Dict[str, Any]:
"""检查Action注册状态"""
logger.info("开始检查Action组件注册状态...")
result = {
"registered_actions": [],
"missing_actions": [],
"default_actions": {},
"total_actions": 0
}
try:
# 获取所有注册的Action
all_components = component_registry.get_all_components(ComponentType.ACTION)
result["total_actions"] = len(all_components)
for name, info in all_components.items():
result["registered_actions"].append(name)
logger.debug(f"已注册Action: {name} (插件: {info.plugin_name})")
# 检查必需的Action是否存在
for required_action in self.required_actions:
if required_action not in all_components:
result["missing_actions"].append(required_action)
logger.warning(f"缺失必需Action: {required_action}")
else:
logger.info(f"找到必需Action: {required_action}")
# 获取默认Action
default_actions = component_registry.get_default_actions()
result["default_actions"] = {name: info.plugin_name for name, info in default_actions.items()}
logger.info(f"总注册Action数量: {result['total_actions']}")
logger.info(f"缺失Action: {result['missing_actions']}")
except Exception as e:
logger.error(f"Action注册检查失败: {e}")
result["error"] = str(e)
return result
def check_specific_action(self, action_name: str) -> Dict[str, Any]:
"""检查特定Action的详细信息"""
logger.info(f"检查Action详细信息: {action_name}")
result = {
"exists": False,
"component_info": None,
"component_class": None,
"is_default": False,
"plugin_name": None
}
try:
# 检查组件信息
component_info = component_registry.get_component_info(action_name, ComponentType.ACTION)
if component_info:
result["exists"] = True
result["component_info"] = {
"name": component_info.name,
"description": component_info.description,
"plugin_name": component_info.plugin_name,
"version": component_info.version
}
result["plugin_name"] = component_info.plugin_name
logger.info(f"找到Action组件信息: {action_name}")
else:
logger.warning(f"未找到Action组件信息: {action_name}")
return result
# 检查组件类
component_class = component_registry.get_component_class(action_name, ComponentType.ACTION)
if component_class:
result["component_class"] = component_class.__name__
logger.info(f"找到Action组件类: {component_class.__name__}")
else:
logger.warning(f"未找到Action组件类: {action_name}")
# 检查是否为默认Action
default_actions = component_registry.get_default_actions()
result["is_default"] = action_name in default_actions
logger.info(f"Action {action_name} 检查完成: 存在={result['exists']}, 默认={result['is_default']}")
except Exception as e:
logger.error(f"检查Action {action_name} 失败: {e}")
result["error"] = str(e)
return result
def attempt_fix_missing_actions(self) -> Dict[str, Any]:
"""尝试修复缺失的Action"""
logger.info("尝试修复缺失的Action组件...")
result = {
"fixed_actions": [],
"still_missing": [],
"errors": []
}
try:
# 重新加载插件
plugin_manager.load_all_plugins()
# 再次检查Action注册状态
registry_check = self.check_action_registry()
for required_action in self.required_actions:
if required_action in registry_check["missing_actions"]:
try:
# 尝试手动注册核心Action
if required_action == "no_reply":
self._register_no_reply_action()
result["fixed_actions"].append(required_action)
else:
result["still_missing"].append(required_action)
except Exception as e:
error_msg = f"修复Action {required_action} 失败: {e}"
logger.error(error_msg)
result["errors"].append(error_msg)
result["still_missing"].append(required_action)
logger.info(f"Action修复完成: 已修复={result['fixed_actions']}, 仍缺失={result['still_missing']}")
except Exception as e:
error_msg = f"Action修复过程失败: {e}"
logger.error(error_msg)
result["errors"].append(error_msg)
return result
def _register_no_reply_action(self):
"""手动注册no_reply Action"""
try:
from src.plugins.built_in.core_actions.no_reply import NoReplyAction
from src.plugin_system.base.component_types import ActionInfo
# 创建Action信息
action_info = ActionInfo(
name="no_reply",
description="暂时不回复消息",
plugin_name="built_in.core_actions",
version="1.0.0"
)
# 注册Action
success = component_registry._register_action_component(action_info, NoReplyAction)
if success:
logger.info("手动注册no_reply Action成功")
else:
raise Exception("注册失败")
except Exception as e:
raise Exception(f"手动注册no_reply Action失败: {e}")
def run_full_diagnosis(self) -> Dict[str, Any]:
"""运行完整诊断"""
logger.info("🔧 开始Action组件完整诊断")
logger.info("=" * 60)
diagnosis_result = {
"plugin_status": {},
"registry_status": {},
"action_details": {},
"fix_attempts": {},
"summary": {}
}
# 1. 检查插件加载
logger.info("\n📦 步骤1: 检查插件加载状态")
diagnosis_result["plugin_status"] = self.check_plugin_loading()
# 2. 检查Action注册
logger.info("\n📋 步骤2: 检查Action注册状态")
diagnosis_result["registry_status"] = self.check_action_registry()
# 3. 检查特定Action详细信息
logger.info("\n🔍 步骤3: 检查特定Action详细信息")
diagnosis_result["action_details"] = {}
for action in self.required_actions:
diagnosis_result["action_details"][action] = self.check_specific_action(action)
# 4. 尝试修复缺失的Action
if diagnosis_result["registry_status"].get("missing_actions"):
logger.info("\n🔧 步骤4: 尝试修复缺失的Action")
diagnosis_result["fix_attempts"] = self.attempt_fix_missing_actions()
# 5. 生成诊断摘要
logger.info("\n📊 步骤5: 生成诊断摘要")
diagnosis_result["summary"] = self._generate_summary(diagnosis_result)
self._print_diagnosis_results(diagnosis_result)
return diagnosis_result
def _generate_summary(self, diagnosis_result: Dict[str, Any]) -> Dict[str, Any]:
"""生成诊断摘要"""
summary = {
"overall_status": "unknown",
"critical_issues": [],
"recommendations": []
}
try:
# 检查插件加载状态
if not diagnosis_result["plugin_status"].get("plugins_loaded"):
summary["critical_issues"].append("插件加载失败")
summary["recommendations"].append("检查插件系统配置")
# 检查必需Action
missing_actions = diagnosis_result["registry_status"].get("missing_actions", [])
if "no_reply" in missing_actions:
summary["critical_issues"].append("缺失no_reply Action")
summary["recommendations"].append("检查core_actions插件是否正确加载")
# 检查修复结果
if diagnosis_result.get("fix_attempts"):
still_missing = diagnosis_result["fix_attempts"].get("still_missing", [])
if still_missing:
summary["critical_issues"].append(f"修复后仍缺失Action: {still_missing}")
summary["recommendations"].append("需要手动修复插件注册问题")
# 确定整体状态
if not summary["critical_issues"]:
summary["overall_status"] = "healthy"
elif len(summary["critical_issues"]) <= 2:
summary["overall_status"] = "warning"
else:
summary["overall_status"] = "critical"
except Exception as e:
summary["critical_issues"].append(f"摘要生成失败: {e}")
summary["overall_status"] = "error"
return summary
def _print_diagnosis_results(self, diagnosis_result: Dict[str, Any]):
"""打印诊断结果"""
logger.info("\n" + "=" * 60)
logger.info("📈 诊断结果摘要")
logger.info("=" * 60)
summary = diagnosis_result.get("summary", {})
overall_status = summary.get("overall_status", "unknown")
# 状态指示器
status_indicators = {
"healthy": "✅ 系统健康",
"warning": "⚠️ 存在警告",
"critical": "❌ 存在严重问题",
"error": "💥 诊断出错",
"unknown": "❓ 状态未知"
}
logger.info(f"🎯 整体状态: {status_indicators.get(overall_status, overall_status)}")
# 关键问题
critical_issues = summary.get("critical_issues", [])
if critical_issues:
logger.info("\n🚨 关键问题:")
for issue in critical_issues:
logger.info(f"{issue}")
# 建议
recommendations = summary.get("recommendations", [])
if recommendations:
logger.info("\n💡 建议:")
for rec in recommendations:
logger.info(f"{rec}")
# 详细状态
plugin_status = diagnosis_result.get("plugin_status", {})
if plugin_status.get("plugins_loaded"):
logger.info(f"\n📦 插件状态: 已加载 {plugin_status.get('total_plugins', 0)} 个插件")
else:
logger.info("\n📦 插件状态: ❌ 插件加载失败")
registry_status = diagnosis_result.get("registry_status", {})
total_actions = registry_status.get("total_actions", 0)
missing_actions = registry_status.get("missing_actions", [])
logger.info(f"📋 Action状态: 已注册 {total_actions} 个,缺失 {len(missing_actions)}")
if missing_actions:
logger.info(f" 缺失的Action: {missing_actions}")
logger.info("\n" + "=" * 60)
def main():
"""主函数"""
diagnostics = ActionDiagnostics()
try:
result = diagnostics.run_full_diagnosis()
# 保存诊断结果
import json
with open("action_diagnosis_results.json", "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False, default=str)
logger.info("📄 诊断结果已保存到: action_diagnosis_results.json")
# 根据诊断结果返回适当的退出代码
summary = result.get("summary", {})
overall_status = summary.get("overall_status", "unknown")
if overall_status == "healthy":
return 0
elif overall_status == "warning":
return 1
else:
return 2
except KeyboardInterrupt:
logger.info("❌ 诊断被用户中断")
return 3
except Exception as e:
logger.error(f"❌ 诊断执行失败: {e}")
import traceback
traceback.print_exc()
return 4
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO)
exit_code = main()
sys.exit(exit_code)

View File

@@ -0,0 +1,240 @@
# -*- coding: utf-8 -*-
"""
异步瞬时记忆包装器
提供对现有瞬时记忆系统的异步包装,支持超时控制和回退机制
"""
import asyncio
import time
from typing import Optional, List, Dict, Any
from src.common.logger import get_logger
from src.config.config import global_config
logger = get_logger("async_instant_memory_wrapper")
class AsyncInstantMemoryWrapper:
"""异步瞬时记忆包装器"""
def __init__(self, chat_id: str):
self.chat_id = chat_id
self.llm_memory = None
self.vector_memory = None
self.cache: Dict[str, tuple[Any, float]] = {} # 缓存:(结果, 时间戳)
self.cache_ttl = 300 # 缓存5分钟
self.default_timeout = 3.0 # 默认超时3秒
# 延迟加载记忆系统
self._initialize_memory_systems()
def _initialize_memory_systems(self):
"""延迟初始化记忆系统"""
try:
# 初始化LLM记忆系统
from src.chat.memory_system.instant_memory import InstantMemory
self.llm_memory = InstantMemory(self.chat_id)
logger.debug(f"LLM瞬时记忆系统已初始化: {self.chat_id}")
except Exception as e:
logger.warning(f"LLM瞬时记忆系统初始化失败: {e}")
try:
# 初始化向量记忆系统
from src.chat.memory_system.vector_instant_memory import VectorInstantMemoryV2
self.vector_memory = VectorInstantMemoryV2(self.chat_id)
logger.debug(f"向量瞬时记忆系统已初始化: {self.chat_id}")
except Exception as e:
logger.warning(f"向量瞬时记忆系统初始化失败: {e}")
def _get_cache_key(self, operation: str, content: str) -> str:
"""生成缓存键"""
return f"{operation}_{self.chat_id}_{hash(content)}"
def _is_cache_valid(self, cache_key: str) -> bool:
"""检查缓存是否有效"""
if cache_key not in self.cache:
return False
_, timestamp = self.cache[cache_key]
return time.time() - timestamp < self.cache_ttl
def _get_cached_result(self, cache_key: str) -> Optional[Any]:
"""获取缓存结果"""
if self._is_cache_valid(cache_key):
result, _ = self.cache[cache_key]
return result
return None
def _cache_result(self, cache_key: str, result: Any):
"""缓存结果"""
self.cache[cache_key] = (result, time.time())
async def store_memory_async(self, content: str, timeout: float = None) -> bool:
"""异步存储记忆(带超时控制)"""
if timeout is None:
timeout = self.default_timeout
success_count = 0
total_systems = 0
# 异步存储到LLM记忆系统
if self.llm_memory:
total_systems += 1
try:
await asyncio.wait_for(
self.llm_memory.create_and_store_memory(content),
timeout=timeout
)
success_count += 1
logger.debug(f"LLM记忆存储成功: {content[:50]}...")
except asyncio.TimeoutError:
logger.warning(f"LLM记忆存储超时: {content[:50]}...")
except Exception as e:
logger.error(f"LLM记忆存储失败: {e}")
# 异步存储到向量记忆系统
if self.vector_memory:
total_systems += 1
try:
await asyncio.wait_for(
self.vector_memory.store_message(content),
timeout=timeout
)
success_count += 1
logger.debug(f"向量记忆存储成功: {content[:50]}...")
except asyncio.TimeoutError:
logger.warning(f"向量记忆存储超时: {content[:50]}...")
except Exception as e:
logger.error(f"向量记忆存储失败: {e}")
return success_count > 0
async def retrieve_memory_async(self, query: str, timeout: float = None,
use_cache: bool = True) -> Optional[Any]:
"""异步检索记忆(带缓存和超时控制)"""
if timeout is None:
timeout = self.default_timeout
# 检查缓存
if use_cache:
cache_key = self._get_cache_key("retrieve", query)
cached_result = self._get_cached_result(cache_key)
if cached_result is not None:
logger.debug(f"记忆检索命中缓存: {query[:30]}...")
return cached_result
# 尝试多种记忆系统
results = []
# 从向量记忆系统检索(优先,速度快)
if self.vector_memory:
try:
vector_result = await asyncio.wait_for(
self.vector_memory.get_memory_for_context(query),
timeout=timeout * 0.6 # 给向量系统60%的时间
)
if vector_result:
results.append(vector_result)
logger.debug(f"向量记忆检索成功: {query[:30]}...")
except asyncio.TimeoutError:
logger.warning(f"向量记忆检索超时: {query[:30]}...")
except Exception as e:
logger.error(f"向量记忆检索失败: {e}")
# 从LLM记忆系统检索备用更准确但较慢
if self.llm_memory and len(results) == 0: # 只有向量检索失败时才使用LLM
try:
llm_result = await asyncio.wait_for(
self.llm_memory.get_memory(query),
timeout=timeout * 0.4 # 给LLM系统40%的时间
)
if llm_result:
results.extend(llm_result)
logger.debug(f"LLM记忆检索成功: {query[:30]}...")
except asyncio.TimeoutError:
logger.warning(f"LLM记忆检索超时: {query[:30]}...")
except Exception as e:
logger.error(f"LLM记忆检索失败: {e}")
# 合并结果
final_result = None
if results:
if len(results) == 1:
final_result = results[0]
else:
# 合并多个结果
if isinstance(results[0], str):
final_result = "\n".join(str(r) for r in results)
elif isinstance(results[0], list):
final_result = []
for r in results:
if isinstance(r, list):
final_result.extend(r)
else:
final_result.append(r)
else:
final_result = results[0] # 使用第一个结果
# 缓存结果
if use_cache and final_result is not None:
cache_key = self._get_cache_key("retrieve", query)
self._cache_result(cache_key, final_result)
return final_result
async def get_memory_with_fallback(self, query: str, max_timeout: float = 2.0) -> str:
"""获取记忆的回退方法,保证不会长时间阻塞"""
try:
# 首先尝试快速检索
result = await self.retrieve_memory_async(query, timeout=max_timeout)
if result:
if isinstance(result, list):
return "\n".join(str(item) for item in result)
return str(result)
return ""
except Exception as e:
logger.error(f"记忆检索完全失败: {e}")
return ""
def store_memory_background(self, content: str):
"""在后台存储记忆(发后即忘模式)"""
async def background_store():
try:
await self.store_memory_async(content, timeout=10.0) # 后台任务可以用更长超时
except Exception as e:
logger.error(f"后台记忆存储失败: {e}")
# 创建后台任务
asyncio.create_task(background_store())
def get_status(self) -> Dict[str, Any]:
"""获取包装器状态"""
return {
"chat_id": self.chat_id,
"llm_memory_available": self.llm_memory is not None,
"vector_memory_available": self.vector_memory is not None,
"cache_entries": len(self.cache),
"cache_ttl": self.cache_ttl,
"default_timeout": self.default_timeout
}
def clear_cache(self):
"""清理缓存"""
self.cache.clear()
logger.info(f"记忆缓存已清理: {self.chat_id}")
# 缓存包装器实例,避免重复创建
_wrapper_cache: Dict[str, AsyncInstantMemoryWrapper] = {}
def get_async_instant_memory(chat_id: str) -> AsyncInstantMemoryWrapper:
"""获取异步瞬时记忆包装器实例"""
if chat_id not in _wrapper_cache:
_wrapper_cache[chat_id] = AsyncInstantMemoryWrapper(chat_id)
return _wrapper_cache[chat_id]
def clear_wrapper_cache():
"""清理包装器缓存"""
global _wrapper_cache
_wrapper_cache.clear()
logger.info("异步瞬时记忆包装器缓存已清理")

View File

@@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
"""
异步记忆系统优化器
解决记忆系统阻塞主程序的问题,将同步操作改为异步非阻塞操作
"""
import asyncio
import time
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass
from queue import Queue
import threading
from concurrent.futures import ThreadPoolExecutor
from src.common.logger import get_logger
from src.config.config import global_config
logger = get_logger("async_memory_optimizer")
@dataclass
class MemoryTask:
"""记忆任务数据结构"""
task_id: str
task_type: str # "store", "retrieve", "build"
chat_id: str
content: str
priority: int = 1 # 1=低优先级, 2=中优先级, 3=高优先级
callback: Optional[Callable] = None
created_at: float = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
class AsyncMemoryQueue:
"""异步记忆任务队列管理器"""
def __init__(self, max_workers: int = 3):
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = asyncio.Queue()
self.running_tasks: Dict[str, asyncio.Task] = {}
self.completed_tasks: Dict[str, Any] = {}
self.failed_tasks: Dict[str, str] = {}
self.is_running = False
self.worker_tasks: List[asyncio.Task] = []
async def start(self):
"""启动异步队列处理器"""
if self.is_running:
return
self.is_running = True
# 启动多个工作协程
for i in range(self.max_workers):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self.worker_tasks.append(worker)
logger.info(f"异步记忆队列已启动,工作线程数: {self.max_workers}")
async def stop(self):
"""停止队列处理器"""
self.is_running = False
# 等待所有工作任务完成
for task in self.worker_tasks:
task.cancel()
await asyncio.gather(*self.worker_tasks, return_exceptions=True)
self.executor.shutdown(wait=True)
logger.info("异步记忆队列已停止")
async def _worker(self, worker_name: str):
"""工作协程,处理队列中的任务"""
logger.info(f"记忆处理工作线程 {worker_name} 启动")
while self.is_running:
try:
# 等待任务超时1秒避免永久阻塞
task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
# 执行任务
await self._execute_task(task, worker_name)
except asyncio.TimeoutError:
# 超时正常,继续下一次循环
continue
except Exception as e:
logger.error(f"工作线程 {worker_name} 处理任务时出错: {e}")
async def _execute_task(self, task: MemoryTask, worker_name: str):
"""执行具体的记忆任务"""
try:
logger.debug(f"[{worker_name}] 开始处理任务: {task.task_type} - {task.task_id}")
start_time = time.time()
# 根据任务类型执行不同的处理逻辑
result = None
if task.task_type == "store":
result = await self._handle_store_task(task)
elif task.task_type == "retrieve":
result = await self._handle_retrieve_task(task)
elif task.task_type == "build":
result = await self._handle_build_task(task)
else:
raise ValueError(f"未知的任务类型: {task.task_type}")
# 记录完成的任务
self.completed_tasks[task.task_id] = result
execution_time = time.time() - start_time
logger.debug(f"[{worker_name}] 任务完成: {task.task_id} (耗时: {execution_time:.2f}s)")
# 执行回调函数
if task.callback:
try:
if asyncio.iscoroutinefunction(task.callback):
await task.callback(result)
else:
task.callback(result)
except Exception as e:
logger.error(f"任务回调执行失败: {e}")
except Exception as e:
error_msg = f"任务执行失败: {e}"
logger.error(f"[{worker_name}] {error_msg}")
self.failed_tasks[task.task_id] = error_msg
# 执行错误回调
if task.callback:
try:
if asyncio.iscoroutinefunction(task.callback):
await task.callback(None)
else:
task.callback(None)
except:
pass
async def _handle_store_task(self, task: MemoryTask) -> Any:
"""处理记忆存储任务"""
# 这里需要根据具体的记忆系统来实现
# 为了避免循环导入,这里使用延迟导入
try:
from src.chat.memory_system.instant_memory import InstantMemory
instant_memory = InstantMemory(task.chat_id)
await instant_memory.create_and_store_memory(task.content)
return True
except Exception as e:
logger.error(f"记忆存储失败: {e}")
return False
async def _handle_retrieve_task(self, task: MemoryTask) -> Any:
"""处理记忆检索任务"""
try:
from src.chat.memory_system.instant_memory import InstantMemory
instant_memory = InstantMemory(task.chat_id)
memories = await instant_memory.get_memory(task.content)
return memories or []
except Exception as e:
logger.error(f"记忆检索失败: {e}")
return []
async def _handle_build_task(self, task: MemoryTask) -> Any:
"""处理记忆构建任务(海马体系统)"""
try:
# 延迟导入避免循环依赖
if global_config.memory.enable_memory:
from src.chat.memory_system.Hippocampus import hippocampus_manager
if hippocampus_manager._initialized:
await hippocampus_manager.build_memory()
return True
return False
except Exception as e:
logger.error(f"记忆构建失败: {e}")
return False
async def add_task(self, task: MemoryTask) -> str:
"""添加任务到队列"""
await self.task_queue.put(task)
self.running_tasks[task.task_id] = task
logger.debug(f"任务已加入队列: {task.task_type} - {task.task_id}")
return task.task_id
def get_task_result(self, task_id: str) -> Optional[Any]:
"""获取任务结果(非阻塞)"""
return self.completed_tasks.get(task_id)
def is_task_completed(self, task_id: str) -> bool:
"""检查任务是否完成"""
return task_id in self.completed_tasks or task_id in self.failed_tasks
def get_queue_status(self) -> Dict[str, Any]:
"""获取队列状态"""
return {
"is_running": self.is_running,
"queue_size": self.task_queue.qsize(),
"running_tasks": len(self.running_tasks),
"completed_tasks": len(self.completed_tasks),
"failed_tasks": len(self.failed_tasks),
"worker_count": len(self.worker_tasks)
}
class NonBlockingMemoryManager:
"""非阻塞记忆管理器"""
def __init__(self):
self.queue = AsyncMemoryQueue(max_workers=3)
self.cache: Dict[str, Any] = {}
self.cache_ttl: Dict[str, float] = {}
self.cache_timeout = 300 # 缓存5分钟
async def initialize(self):
"""初始化管理器"""
await self.queue.start()
logger.info("非阻塞记忆管理器已初始化")
async def shutdown(self):
"""关闭管理器"""
await self.queue.stop()
logger.info("非阻塞记忆管理器已关闭")
async def store_memory_async(self, chat_id: str, content: str,
callback: Optional[Callable] = None) -> str:
"""异步存储记忆(非阻塞)"""
task = MemoryTask(
task_id=f"store_{chat_id}_{int(time.time() * 1000)}",
task_type="store",
chat_id=chat_id,
content=content,
priority=1, # 存储优先级较低
callback=callback
)
return await self.queue.add_task(task)
async def retrieve_memory_async(self, chat_id: str, query: str,
callback: Optional[Callable] = None) -> str:
"""异步检索记忆(非阻塞)"""
# 先检查缓存
cache_key = f"retrieve_{chat_id}_{hash(query)}"
if self._is_cache_valid(cache_key):
result = self.cache[cache_key]
if callback:
if asyncio.iscoroutinefunction(callback):
await callback(result)
else:
callback(result)
return "cache_hit"
task = MemoryTask(
task_id=f"retrieve_{chat_id}_{int(time.time() * 1000)}",
task_type="retrieve",
chat_id=chat_id,
content=query,
priority=2, # 检索优先级中等
callback=self._create_cache_callback(cache_key, callback)
)
return await self.queue.add_task(task)
async def build_memory_async(self, callback: Optional[Callable] = None) -> str:
"""异步构建记忆(非阻塞)"""
task = MemoryTask(
task_id=f"build_memory_{int(time.time() * 1000)}",
task_type="build",
chat_id="system",
content="",
priority=1, # 构建优先级较低,避免影响用户体验
callback=callback
)
return await self.queue.add_task(task)
def _is_cache_valid(self, cache_key: str) -> bool:
"""检查缓存是否有效"""
if cache_key not in self.cache:
return False
return time.time() - self.cache_ttl.get(cache_key, 0) < self.cache_timeout
def _create_cache_callback(self, cache_key: str, original_callback: Optional[Callable]):
"""创建带缓存的回调函数"""
async def cache_callback(result):
# 存储到缓存
if result is not None:
self.cache[cache_key] = result
self.cache_ttl[cache_key] = time.time()
# 执行原始回调
if original_callback:
if asyncio.iscoroutinefunction(original_callback):
await original_callback(result)
else:
original_callback(result)
return cache_callback
def get_cached_memory(self, chat_id: str, query: str) -> Optional[Any]:
"""获取缓存的记忆(同步,立即返回)"""
cache_key = f"retrieve_{chat_id}_{hash(query)}"
if self._is_cache_valid(cache_key):
return self.cache[cache_key]
return None
def get_status(self) -> Dict[str, Any]:
"""获取管理器状态"""
status = self.queue.get_queue_status()
status.update({
"cache_entries": len(self.cache),
"cache_timeout": self.cache_timeout
})
return status
# 全局实例
async_memory_manager = NonBlockingMemoryManager()
# 便捷函数
async def store_memory_nonblocking(chat_id: str, content: str) -> str:
"""非阻塞存储记忆的便捷函数"""
return await async_memory_manager.store_memory_async(chat_id, content)
async def retrieve_memory_nonblocking(chat_id: str, query: str) -> Optional[Any]:
"""非阻塞检索记忆的便捷函数,支持缓存"""
# 先尝试从缓存获取
cached_result = async_memory_manager.get_cached_memory(chat_id, query)
if cached_result is not None:
return cached_result
# 缓存未命中,启动异步检索
await async_memory_manager.retrieve_memory_async(chat_id, query)
return None # 返回None表示需要异步获取
async def build_memory_nonblocking() -> str:
"""非阻塞构建记忆的便捷函数"""
return await async_memory_manager.build_memory_async()

View File

@@ -323,18 +323,69 @@ class ActionPlanner:
)
reasoning = f"LLM 返回了当前不可用的动作 '{action}' (可用: {list(current_available_actions.keys())})。原始理由: {reasoning}"
action = "no_reply"
# 检查no_reply是否可用如果不可用则使用reply作为终极回退
if "no_reply" not in current_available_actions:
if "reply" in current_available_actions:
action = "reply"
reasoning += " (no_reply不可用使用reply作为回退)"
logger.warning(f"{self.log_prefix}no_reply不可用使用reply作为回退")
else:
# 如果连reply都不可用使用第一个可用的动作
if current_available_actions:
action = list(current_available_actions.keys())[0]
reasoning += f" (no_reply和reply都不可用使用{action}作为回退)"
logger.warning(f"{self.log_prefix}no_reply和reply都不可用使用{action}作为回退")
else:
# 如果没有任何可用动作,这是一个严重错误
logger.error(f"{self.log_prefix}没有任何可用动作,系统状态异常")
action = "no_reply" # 仍然尝试no_reply让上层处理
# 对no_reply动作本身也进行可用性检查
elif action == "no_reply" and "no_reply" not in current_available_actions:
if "reply" in current_available_actions:
action = "reply"
reasoning = f"no_reply不可用自动回退到reply。原因: {reasoning}"
logger.warning(f"{self.log_prefix}no_reply不可用自动回退到reply")
elif current_available_actions:
action = list(current_available_actions.keys())[0]
reasoning = f"no_reply不可用自动回退到{action}。原因: {reasoning}"
logger.warning(f"{self.log_prefix}no_reply不可用自动回退到{action}")
else:
logger.error(f"{self.log_prefix}没有任何可用动作保持no_reply让上层处理")
except Exception as json_e:
logger.warning(f"{self.log_prefix}解析LLM响应JSON失败 {json_e}. LLM原始输出: '{llm_content}'")
traceback.print_exc()
reasoning = f"解析LLM响应JSON失败: {json_e}. 将使用默认动作 'no_reply'."
action = "no_reply"
# 检查no_reply是否可用
if "no_reply" not in current_available_actions:
if "reply" in current_available_actions:
action = "reply"
reasoning += " (no_reply不可用使用reply)"
elif current_available_actions:
action = list(current_available_actions.keys())[0]
reasoning += f" (no_reply不可用使用{action})"
except Exception as outer_e:
logger.error(f"{self.log_prefix}Planner 处理过程中发生意外错误,规划失败,将执行 no_reply: {outer_e}")
traceback.print_exc()
action = "no_reply"
reasoning = f"Planner 内部处理错误: {outer_e}"
# 检查no_reply是否可用
current_available_actions = self.action_manager.get_using_actions()
if "no_reply" not in current_available_actions:
if "reply" in current_available_actions:
action = "reply"
reasoning += " (no_reply不可用使用reply)"
elif current_available_actions:
action = list(current_available_actions.keys())[0]
reasoning += f" (no_reply不可用使用{action})"
else:
logger.error(f"{self.log_prefix}严重错误:没有任何可用动作")
is_parallel = False
if mode == ChatMode.NORMAL and action in current_available_actions:

View File

@@ -547,13 +547,79 @@ class DefaultReplyer:
)
if global_config.memory.enable_instant_memory:
# 异步存储聊天历史到向量记忆系统
asyncio.create_task(self.instant_memory.store_message(chat_history))
# 使用异步记忆包装器(最优化的非阻塞模式)
try:
from src.chat.memory_system.async_instant_memory_wrapper import get_async_instant_memory
# 获取异步记忆包装器
async_memory = get_async_instant_memory(self.chat_stream.stream_id)
# 后台存储聊天历史(完全非阻塞)
async_memory.store_memory_background(chat_history)
# 快速检索记忆最大超时2秒
instant_memory = await async_memory.get_memory_with_fallback(target, max_timeout=2.0)
logger.info(f"异步瞬时记忆:{instant_memory}")
except ImportError:
# 如果异步包装器不可用,尝试使用异步记忆管理器
try:
from src.chat.memory_system.async_memory_optimizer import (
retrieve_memory_nonblocking,
store_memory_nonblocking
)
# 异步存储聊天历史(非阻塞)
asyncio.create_task(store_memory_nonblocking(
chat_id=self.chat_stream.stream_id,
content=chat_history
))
# 尝试从缓存获取瞬时记忆
instant_memory = await retrieve_memory_nonblocking(
chat_id=self.chat_stream.stream_id,
query=target
)
# 如果没有缓存结果,快速检索一次
if instant_memory is None:
try:
instant_memory = await asyncio.wait_for(
self.instant_memory.get_memory_for_context(target),
timeout=1.5
)
except asyncio.TimeoutError:
logger.warning("瞬时记忆检索超时,使用空结果")
instant_memory = ""
logger.info(f"向量瞬时记忆:{instant_memory}")
except ImportError:
# 最后的fallback使用原有逻辑但加上超时控制
logger.warning("异步记忆系统不可用,使用带超时的同步方式")
# 异步存储聊天历史
asyncio.create_task(self.instant_memory.store_message(chat_history))
# 从向量记忆系统获取相关记忆上下文
instant_memory = await self.instant_memory.get_memory_for_context(target)
# 带超时的记忆检索
try:
instant_memory = await asyncio.wait_for(
self.instant_memory.get_memory_for_context(target),
timeout=1.0 # 最保守的1秒超时
)
except asyncio.TimeoutError:
logger.warning("瞬时记忆检索超时,跳过记忆获取")
instant_memory = ""
except Exception as e:
logger.error(f"瞬时记忆检索失败: {e}")
instant_memory = ""
logger.info(f"同步瞬时记忆:{instant_memory}")
logger.info(f"向量瞬时记忆:{instant_memory}")
except Exception as e:
logger.error(f"瞬时记忆系统异常: {e}")
instant_memory = ""
# 构建记忆字符串,即使某种记忆为空也要继续
memory_str = ""

View File

@@ -77,6 +77,22 @@ class MainSystem:
logger.info("🛑 插件热重载系统已停止")
except Exception as e:
logger.error(f"停止热重载系统时出错: {e}")
try:
# 停止异步记忆管理器
if global_config.memory.enable_memory:
from src.chat.memory_system.async_memory_optimizer import async_memory_manager
import asyncio
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(async_memory_manager.shutdown())
else:
loop.run_until_complete(async_memory_manager.shutdown())
logger.info("🛑 记忆管理器已停止")
except ImportError:
pass # 异步记忆优化器不存在
except Exception as e:
logger.error(f"停止记忆管理器时出错: {e}")
async def initialize(self):
"""初始化系统组件"""
@@ -92,7 +108,7 @@ class MainSystem:
("初墨小姐宇宙第一(不是)", 10), #15
("world.execute(me);", 10),
("正在尝试连接到MaiBot的服务器...连接失败...正在转接到maimaiDX", 10),
("你的bug就像星星一样多而我的代码像太阳一样一出来就看不见了。 (金日成...误💦)", 10),
("你的bug就像星星一样多而我的代码像太阳一样一出来就看不见了。", 10),
("温馨提示:请不要在代码中留下任何魔法数字,除非你知道它的含义。", 10),
("世界上只有10种人懂二进制的和不懂的。", 10),
("喵喵~你的麦麦被猫娘入侵了喵~", 15),
@@ -167,6 +183,16 @@ MaiMbot-Pro-Max(第三方修改版)
if self.hippocampus_manager:
self.hippocampus_manager.initialize()
logger.info("记忆系统初始化成功")
# 初始化异步记忆管理器
try:
from src.chat.memory_system.async_memory_optimizer import async_memory_manager
await async_memory_manager.initialize()
logger.info("记忆管理器初始化成功")
except ImportError:
logger.warning("异步记忆优化方法不可用,将回退使用同步模式")
except Exception as e:
logger.error(f"记忆管理器初始化失败: {e}")
else:
logger.info("记忆系统已禁用,跳过初始化")
@@ -223,8 +249,50 @@ MaiMbot-Pro-Max(第三方修改版)
"""记忆构建任务"""
while True:
await asyncio.sleep(global_config.memory.memory_build_interval)
logger.info("正在进行记忆构建")
await self.hippocampus_manager.build_memory() # type: ignore
try:
# 使用异步记忆管理器进行非阻塞记忆构建
from src.chat.memory_system.async_memory_optimizer import build_memory_nonblocking
logger.info("正在启动记忆构建")
# 定义构建完成的回调函数
def build_completed(result):
if result:
logger.info("记忆构建完成")
else:
logger.warning("记忆构建失败")
# 启动异步构建,不等待完成
task_id = await build_memory_nonblocking()
logger.info(f"记忆构建任务已提交:{task_id}")
except ImportError:
# 如果异步优化器不可用,使用原有的同步方式(但在单独的线程中运行)
logger.warning("记忆优化器不可用,使用线性运行执行记忆构建")
def sync_build_memory():
"""在线程池中执行同步记忆构建"""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(self.hippocampus_manager.build_memory())
logger.info("记忆构建完成")
return result
except Exception as e:
logger.error(f"记忆构建失败: {e}")
return None
finally:
loop.close()
# 在线程池中执行记忆构建
asyncio.get_event_loop().run_in_executor(None, sync_build_memory)
except Exception as e:
logger.error(f"记忆构建任务启动失败: {e}")
# fallback到原有的同步方式
logger.info("正在进行记忆构建(同步模式)")
await self.hippocampus_manager.build_memory() # type: ignore
async def forget_memory_task(self):
"""记忆遗忘任务"""

View File

@@ -14,9 +14,9 @@ logger = get_logger("no_reply_action")
class NoReplyAction(BaseAction):
"""不回复动作支持waiting和breaking两种形式."""
focus_activation_type = ActionActivationType.NEVER
normal_activation_type = ActionActivationType.NEVER
mode_enable = ChatMode.FOCUS
focus_activation_type = ActionActivationType.ALWAYS # 修复在focus模式下应该始终可用
normal_activation_type = ActionActivationType.ALWAYS # 修复在normal模式下应该始终可用
mode_enable = ChatMode.FOCUS | ChatMode.NORMAL # 修复:在所有模式下都可用
parallel_action = False
# 动作基本信息

View File

@@ -0,0 +1,79 @@
from typing import Tuple
# 导入新插件系统
from src.plugin_system import BaseAction, ActionActivationType, ChatMode
# 导入依赖的系统组件
from src.common.logger import get_logger
from src.plugin_system.apis import generator_api
logger = get_logger("reply_action")
class ReplyAction(BaseAction):
"""基本回复动作:确保系统始终有一个可用的回退动作!!!"""
focus_activation_type = ActionActivationType.ALWAYS
normal_activation_type = ActionActivationType.ALWAYS
mode_enable = ChatMode.FOCUS | ChatMode.NORMAL
parallel_action = False
# 动作基本信息
action_name = "reply"
action_description = "进行基本回复"
# 动作参数定义
action_parameters = {}
# 动作使用场景
action_require = [""]
# 关联类型
associated_types = []
async def execute(self) -> Tuple[bool, str]:
"""执行回复动作"""
try:
reason = self.action_data.get("reason", "")
logger.info(f"{self.log_prefix} 执行基本回复动作,原因: {reason}")
# 获取当前消息和上下文
if not self.chat_stream or not self.chat_stream.get_latest_message():
logger.warning(f"{self.log_prefix} 没有可回复的消息")
return False, ""
latest_message = self.chat_stream.get_latest_message()
# 使用生成器API生成回复
success, reply_set, _ = await generator_api.generate_reply(
target_message=latest_message.processed_plain_text,
chat_stream=self.chat_stream,
reasoning=reason,
action_message={}
)
if success and reply_set:
# 提取回复文本
reply_text = ""
for message_type, content in reply_set:
if message_type == "text":
reply_text += content
break
if reply_text:
logger.info(f"{self.log_prefix} 回复生成成功: {reply_text[:50]}...")
return True, reply_text
else:
logger.warning(f"{self.log_prefix} 生成的回复为空")
return False, ""
else:
logger.warning(f"{self.log_prefix} 回复生成失败")
return False, ""
except Exception as e:
logger.error(f"{self.log_prefix} 执行回复动作时发生异常: {e}")
import traceback
traceback.print_exc()
return False, ""