feat: 优化长期记忆检索和合并操作,支持图结构扩展和智能合并

This commit is contained in:
Windpicker-owo
2025-11-19 11:33:10 +08:00
parent f260a2c2f4
commit edc1cd5555
4 changed files with 687 additions and 98 deletions

View File

@@ -115,75 +115,75 @@ class BaseEvent:
if not self.enabled:
return HandlerResultsCollection([])
# 使用锁确保同一事件不能同时激活多次
async with self.event_handle_lock:
sorted_subscribers = sorted(
self.subscribers, key=lambda h: h.weight if hasattr(h, "weight") and h.weight != -1 else 0, reverse=True
# 移除全局锁,允许同一事件并发触发
# async with self.event_handle_lock:
sorted_subscribers = sorted(
self.subscribers, key=lambda h: h.weight if hasattr(h, "weight") and h.weight != -1 else 0, reverse=True
)
if not sorted_subscribers:
return HandlerResultsCollection([])
concurrency_limit = None
if max_concurrency is not None:
concurrency_limit = max_concurrency if max_concurrency > 0 else None
if concurrency_limit:
concurrency_limit = min(concurrency_limit, len(sorted_subscribers))
semaphore = (
asyncio.Semaphore(concurrency_limit)
if concurrency_limit and concurrency_limit < len(sorted_subscribers)
else None
)
async def _run_handler(subscriber):
handler_name = (
subscriber.handler_name if hasattr(subscriber, "handler_name") else subscriber.__class__.__name__
)
if not sorted_subscribers:
return HandlerResultsCollection([])
async def _invoke():
return await self._execute_subscriber(subscriber, params)
concurrency_limit = None
if max_concurrency is not None:
concurrency_limit = max_concurrency if max_concurrency > 0 else None
if concurrency_limit:
concurrency_limit = min(concurrency_limit, len(sorted_subscribers))
semaphore = (
asyncio.Semaphore(concurrency_limit)
if concurrency_limit and concurrency_limit < len(sorted_subscribers)
else None
)
async def _run_handler(subscriber):
handler_name = (
subscriber.handler_name if hasattr(subscriber, "handler_name") else subscriber.__class__.__name__
)
async def _invoke():
return await self._execute_subscriber(subscriber, params)
try:
if handler_timeout and handler_timeout > 0:
result = await asyncio.wait_for(_invoke(), timeout=handler_timeout)
else:
result = await _invoke()
except asyncio.TimeoutError:
logger.warning(f"事件处理器 {handler_name} 执行超时 ({handler_timeout}s)")
return HandlerResult(False, True, f"timeout after {handler_timeout}s", handler_name)
except Exception as exc:
logger.error(f"事件处理器 {handler_name} 执行失败: {exc}")
return HandlerResult(False, True, str(exc), handler_name)
if not isinstance(result, HandlerResult):
return HandlerResult(True, True, result, handler_name)
if not result.handler_name:
result.handler_name = handler_name
return result
async def _guarded_run(subscriber):
if semaphore:
async with semaphore:
return await _run_handler(subscriber)
return await _run_handler(subscriber)
tasks = [asyncio.create_task(_guarded_run(subscriber)) for subscriber in sorted_subscribers]
results = await asyncio.gather(*tasks, return_exceptions=True)
processed_results: list[HandlerResult] = []
for subscriber, result in zip(sorted_subscribers, results):
handler_name = (
subscriber.handler_name if hasattr(subscriber, "handler_name") else subscriber.__class__.__name__
)
if isinstance(result, Exception):
logger.error(f"事件处理器 {handler_name} 执行失败: {result}")
processed_results.append(HandlerResult(False, True, str(result), handler_name))
try:
if handler_timeout and handler_timeout > 0:
result = await asyncio.wait_for(_invoke(), timeout=handler_timeout)
else:
processed_results.append(result)
result = await _invoke()
except asyncio.TimeoutError:
logger.warning(f"事件处理器 {handler_name} 执行超时 ({handler_timeout}s)")
return HandlerResult(False, True, f"timeout after {handler_timeout}s", handler_name)
except Exception as exc:
logger.error(f"事件处理器 {handler_name} 执行失败: {exc}")
return HandlerResult(False, True, str(exc), handler_name)
return HandlerResultsCollection(processed_results)
if not isinstance(result, HandlerResult):
return HandlerResult(True, True, result, handler_name)
if not result.handler_name:
result.handler_name = handler_name
return result
async def _guarded_run(subscriber):
if semaphore:
async with semaphore:
return await _run_handler(subscriber)
return await _run_handler(subscriber)
tasks = [asyncio.create_task(_guarded_run(subscriber)) for subscriber in sorted_subscribers]
results = await asyncio.gather(*tasks, return_exceptions=True)
processed_results: list[HandlerResult] = []
for subscriber, result in zip(sorted_subscribers, results):
handler_name = (
subscriber.handler_name if hasattr(subscriber, "handler_name") else subscriber.__class__.__name__
)
if isinstance(result, Exception):
logger.error(f"事件处理器 {handler_name} 执行失败: {result}")
processed_results.append(HandlerResult(False, True, str(result), handler_name))
else:
processed_results.append(result)
return HandlerResultsCollection(processed_results)
@staticmethod
async def _execute_subscriber(subscriber, params: dict) -> HandlerResult: