ruff
This commit is contained in:
@@ -245,11 +245,11 @@ class ExpressionLearner:
|
||||
all_expressions = await session.execute(
|
||||
select(Expression).where(Expression.chat_id == self.chat_id)
|
||||
)
|
||||
|
||||
|
||||
for expr in all_expressions.scalars():
|
||||
# 确保create_date存在,如果不存在则使用last_active_time
|
||||
create_date = expr.create_date if expr.create_date is not None else expr.last_active_time
|
||||
|
||||
|
||||
expr_data = {
|
||||
"situation": expr.situation,
|
||||
"style": expr.style,
|
||||
@@ -259,13 +259,13 @@ class ExpressionLearner:
|
||||
"type": expr.type,
|
||||
"create_date": create_date,
|
||||
}
|
||||
|
||||
|
||||
# 根据类型分类
|
||||
if expr.type == "style":
|
||||
learnt_style_expressions.append(expr_data)
|
||||
elif expr.type == "grammar":
|
||||
learnt_grammar_expressions.append(expr_data)
|
||||
|
||||
|
||||
return learnt_style_expressions, learnt_grammar_expressions
|
||||
|
||||
async def _apply_global_decay_to_database(self, current_time: float) -> None:
|
||||
|
||||
@@ -354,7 +354,7 @@ class MessageManager:
|
||||
# 取消 stream_loop_task,子任务会通过 try-catch 自动取消
|
||||
try:
|
||||
stream_loop_task.cancel()
|
||||
|
||||
|
||||
# 等待任务真正结束(设置超时避免死锁)
|
||||
try:
|
||||
await asyncio.wait_for(stream_loop_task, timeout=2.0)
|
||||
|
||||
@@ -24,20 +24,20 @@ class MessageUpdateBatcher:
|
||||
|
||||
优化: 将多个消息ID更新操作批量处理,减少数据库连接次数
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, batch_size: int = 20, flush_interval: float = 2.0):
|
||||
self.batch_size = batch_size
|
||||
self.flush_interval = flush_interval
|
||||
self.pending_updates: deque = deque()
|
||||
self._lock = asyncio.Lock()
|
||||
self._flush_task = None
|
||||
|
||||
|
||||
async def start(self):
|
||||
"""启动自动刷新任务"""
|
||||
if self._flush_task is None:
|
||||
self._flush_task = asyncio.create_task(self._auto_flush_loop())
|
||||
logger.debug("消息更新批处理器已启动")
|
||||
|
||||
|
||||
async def stop(self):
|
||||
"""停止批处理器"""
|
||||
if self._flush_task:
|
||||
@@ -47,29 +47,29 @@ class MessageUpdateBatcher:
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._flush_task = None
|
||||
|
||||
|
||||
# 刷新剩余的更新
|
||||
await self.flush()
|
||||
logger.debug("消息更新批处理器已停止")
|
||||
|
||||
|
||||
async def add_update(self, mmc_message_id: str, qq_message_id: str):
|
||||
"""添加消息ID更新到批处理队列"""
|
||||
async with self._lock:
|
||||
self.pending_updates.append((mmc_message_id, qq_message_id))
|
||||
|
||||
|
||||
# 如果达到批量大小,立即刷新
|
||||
if len(self.pending_updates) >= self.batch_size:
|
||||
await self.flush()
|
||||
|
||||
|
||||
async def flush(self):
|
||||
"""执行批量更新"""
|
||||
async with self._lock:
|
||||
if not self.pending_updates:
|
||||
return
|
||||
|
||||
|
||||
updates = list(self.pending_updates)
|
||||
self.pending_updates.clear()
|
||||
|
||||
|
||||
try:
|
||||
async with get_db_session() as session:
|
||||
updated_count = 0
|
||||
@@ -81,15 +81,15 @@ class MessageUpdateBatcher:
|
||||
)
|
||||
if result.rowcount > 0:
|
||||
updated_count += 1
|
||||
|
||||
|
||||
await session.commit()
|
||||
|
||||
|
||||
if updated_count > 0:
|
||||
logger.debug(f"批量更新了 {updated_count}/{len(updates)} 条消息ID")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量更新消息ID失败: {e}")
|
||||
|
||||
|
||||
async def _auto_flush_loop(self):
|
||||
"""自动刷新循环"""
|
||||
while True:
|
||||
|
||||
@@ -628,7 +628,7 @@ class ChatterActionManager:
|
||||
if not first_replied:
|
||||
# 决定是否引用回复
|
||||
is_private_chat = not bool(chat_stream.group_info)
|
||||
|
||||
|
||||
# 如果明确指定了should_quote_reply,则使用指定值
|
||||
if should_quote_reply is not None:
|
||||
set_reply_flag = should_quote_reply and bool(message_data)
|
||||
@@ -641,7 +641,7 @@ class ChatterActionManager:
|
||||
logger.debug(
|
||||
f"📤 [ActionManager] 使用默认引用逻辑: 默认不引用(is_private={is_private_chat})"
|
||||
)
|
||||
|
||||
|
||||
logger.debug(
|
||||
f"📤 [ActionManager] 准备发送第一段回复。message_data: {message_data}, set_reply: {set_reply_flag}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user