This commit is contained in:
minecraft1024a
2025-11-01 10:59:38 +08:00
committed by Windpicker-owo
parent 4a0bd5845e
commit 12c2b806a2
10 changed files with 30 additions and 30 deletions

View File

@@ -24,20 +24,20 @@ class MessageUpdateBatcher:
优化: 将多个消息ID更新操作批量处理减少数据库连接次数
"""
def __init__(self, batch_size: int = 20, flush_interval: float = 2.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.pending_updates: deque = deque()
self._lock = asyncio.Lock()
self._flush_task = None
async def start(self):
"""启动自动刷新任务"""
if self._flush_task is None:
self._flush_task = asyncio.create_task(self._auto_flush_loop())
logger.debug("消息更新批处理器已启动")
async def stop(self):
"""停止批处理器"""
if self._flush_task:
@@ -47,29 +47,29 @@ class MessageUpdateBatcher:
except asyncio.CancelledError:
pass
self._flush_task = None
# 刷新剩余的更新
await self.flush()
logger.debug("消息更新批处理器已停止")
async def add_update(self, mmc_message_id: str, qq_message_id: str):
"""添加消息ID更新到批处理队列"""
async with self._lock:
self.pending_updates.append((mmc_message_id, qq_message_id))
# 如果达到批量大小,立即刷新
if len(self.pending_updates) >= self.batch_size:
await self.flush()
async def flush(self):
"""执行批量更新"""
async with self._lock:
if not self.pending_updates:
return
updates = list(self.pending_updates)
self.pending_updates.clear()
try:
async with get_db_session() as session:
updated_count = 0
@@ -81,15 +81,15 @@ class MessageUpdateBatcher:
)
if result.rowcount > 0:
updated_count += 1
await session.commit()
if updated_count > 0:
logger.debug(f"批量更新了 {updated_count}/{len(updates)} 条消息ID")
except Exception as e:
logger.error(f"批量更新消息ID失败: {e}")
async def _auto_flush_loop(self):
"""自动刷新循环"""
while True: