fix:再次尝试修复log占用

This commit is contained in:
SengokuCola
2025-06-15 21:30:05 +08:00
parent 51342da589
commit ef0973739a
3 changed files with 132 additions and 226 deletions

View File

@@ -25,9 +25,23 @@ def get_file_handler():
"""获取文件handler单例""" """获取文件handler单例"""
global _file_handler global _file_handler
if _file_handler is None: if _file_handler is None:
# 确保日志目录存在
LOG_DIR.mkdir(exist_ok=True)
# 检查是否已有其他handler在使用同一个文件
log_file_path = LOG_DIR / "app.log.jsonl"
root_logger = logging.getLogger()
# 检查现有handler避免重复创建
for handler in root_logger.handlers:
if isinstance(handler, logging.handlers.RotatingFileHandler):
if hasattr(handler, 'baseFilename') and Path(handler.baseFilename) == log_file_path:
_file_handler = handler
return _file_handler
# 使用带压缩功能的handler使用硬编码的默认值 # 使用带压缩功能的handler使用硬编码的默认值
_file_handler = CompressedRotatingFileHandler( _file_handler = CompressedRotatingFileHandler(
LOG_DIR / "app.log.jsonl", log_file_path,
maxBytes=10 * 1024 * 1024, # 10MB maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5, backupCount=5,
encoding="utf-8", encoding="utf-8",
@@ -58,51 +72,99 @@ class CompressedRotatingFileHandler(logging.handlers.RotatingFileHandler):
super().__init__(filename, "a", maxBytes, backupCount, encoding) super().__init__(filename, "a", maxBytes, backupCount, encoding)
self.compress = compress self.compress = compress
self.compress_level = compress_level self.compress_level = compress_level
self._rollover_lock = threading.Lock() # 添加轮转锁
def doRollover(self): def doRollover(self):
"""执行日志轮转,并压缩旧文件""" """执行日志轮转,并压缩旧文件"""
if self.stream: with self._rollover_lock:
self.stream.close() if self.stream:
self.stream = None self.stream.close()
self.stream = None
# 如果有备份文件数量限制 # 如果有备份文件数量限制
if self.backupCount > 0: if self.backupCount > 0:
# 删除最旧的压缩文件 # 删除最旧的压缩文件
old_gz = f"{self.baseFilename}.{self.backupCount}.gz" old_gz = f"{self.baseFilename}.{self.backupCount}.gz"
old_file = f"{self.baseFilename}.{self.backupCount}" old_file = f"{self.baseFilename}.{self.backupCount}"
if Path(old_gz).exists(): if Path(old_gz).exists():
Path(old_gz).unlink() self._safe_remove(old_gz)
if Path(old_file).exists(): if Path(old_file).exists():
Path(old_file).unlink() self._safe_remove(old_file)
# 重命名现有的备份文件 # 重命名现有的备份文件
for i in range(self.backupCount - 1, 0, -1): for i in range(self.backupCount - 1, 0, -1):
source_gz = f"{self.baseFilename}.{i}.gz" source_gz = f"{self.baseFilename}.{i}.gz"
dest_gz = f"{self.baseFilename}.{i + 1}.gz" dest_gz = f"{self.baseFilename}.{i + 1}.gz"
source_file = f"{self.baseFilename}.{i}" source_file = f"{self.baseFilename}.{i}"
dest_file = f"{self.baseFilename}.{i + 1}" dest_file = f"{self.baseFilename}.{i + 1}"
if Path(source_gz).exists(): if Path(source_gz).exists():
Path(source_gz).rename(dest_gz) self._safe_rename(source_gz, dest_gz)
elif Path(source_file).exists(): elif Path(source_file).exists():
Path(source_file).rename(dest_file) self._safe_rename(source_file, dest_file)
# 处理当前日志文件 # 处理当前日志文件
dest_file = f"{self.baseFilename}.1" dest_file = f"{self.baseFilename}.1"
if Path(self.baseFilename).exists(): if Path(self.baseFilename).exists():
Path(self.baseFilename).rename(dest_file) if self._safe_rename(self.baseFilename, dest_file):
# 在后台线程中压缩文件
if self.compress:
threading.Thread(target=self._compress_file, args=(dest_file,), daemon=True).start()
# 在后台线程中压缩文件 # 重新创建日志文件
if self.compress: if not self.delay:
threading.Thread(target=self._compress_file, args=(dest_file,), daemon=True).start() self.stream = self._open()
# 重新创建日志文件 def _safe_rename(self, source, dest):
if not self.delay: """安全重命名文件处理Windows文件占用问题"""
self.stream = self._open() max_retries = 5
retry_delay = 0.1
for attempt in range(max_retries):
try:
Path(source).rename(dest)
return True
except PermissionError as e:
if attempt < max_retries - 1:
print(f"[日志轮转] 重命名失败,重试 {attempt + 1}/{max_retries}: {source} -> {dest}")
time.sleep(retry_delay)
retry_delay *= 2 # 指数退避
else:
print(f"[日志轮转] 重命名最终失败: {source} -> {dest}, 错误: {e}")
return False
except Exception as e:
print(f"[日志轮转] 重命名错误: {source} -> {dest}, 错误: {e}")
return False
return False
def _safe_remove(self, filepath):
"""安全删除文件处理Windows文件占用问题"""
max_retries = 3
retry_delay = 0.1
for attempt in range(max_retries):
try:
Path(filepath).unlink()
return True
except PermissionError as e:
if attempt < max_retries - 1:
print(f"[日志轮转] 删除失败,重试 {attempt + 1}/{max_retries}: {filepath}")
time.sleep(retry_delay)
retry_delay *= 2
else:
print(f"[日志轮转] 删除最终失败: {filepath}, 错误: {e}")
return False
except Exception as e:
print(f"[日志轮转] 删除错误: {filepath}, 错误: {e}")
return False
return False
def _compress_file(self, filepath): def _compress_file(self, filepath):
"""在后台压缩文件""" """在后台压缩文件"""
# 等待一段时间确保文件写入完成
time.sleep(0.5)
try: try:
source_path = Path(filepath) source_path = Path(filepath)
if not source_path.exists(): if not source_path.exists():
@@ -110,21 +172,20 @@ class CompressedRotatingFileHandler(logging.handlers.RotatingFileHandler):
compressed_path = Path(f"{filepath}.gz") compressed_path = Path(f"{filepath}.gz")
# 记录原始大小
original_size = source_path.stat().st_size
with open(source_path, "rb") as f_in: with open(source_path, "rb") as f_in:
with gzip.open(compressed_path, "wb", compresslevel=self.compress_level) as f_out: with gzip.open(compressed_path, "wb", compresslevel=self.compress_level) as f_out:
shutil.copyfileobj(f_in, f_out) shutil.copyfileobj(f_in, f_out)
# 删除原文件 # 安全删除原文件
source_path.unlink() if self._safe_remove(filepath):
compressed_size = compressed_path.stat().st_size
# 记录压缩完成使用标准print避免循环日志 ratio = (1 - compressed_size / original_size) * 100 if original_size > 0 else 0
if source_path.exists(): print(f"[日志压缩] {source_path.name} -> {compressed_path.name} (压缩率: {ratio:.1f}%)")
original_size = source_path.stat().st_size
else: else:
original_size = 0 print(f"[日志压缩] 压缩完成但原文件删除失败: {filepath}")
compressed_size = compressed_path.stat().st_size
ratio = (1 - compressed_size / original_size) * 100 if original_size > 0 else 0
print(f"[日志压缩] {source_path.name} -> {compressed_path.name} (压缩率: {ratio:.1f}%)")
except Exception as e: except Exception as e:
print(f"[日志压缩] 压缩失败 {filepath}: {e}") print(f"[日志压缩] 压缩失败 {filepath}: {e}")
@@ -143,6 +204,31 @@ def close_handlers():
_console_handler = None _console_handler = None
def remove_duplicate_handlers():
"""移除重复的handler特别是文件handler"""
root_logger = logging.getLogger()
log_file_path = str(LOG_DIR / "app.log.jsonl")
# 收集所有文件handler
file_handlers = []
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.handlers.RotatingFileHandler):
if hasattr(handler, 'baseFilename') and handler.baseFilename == log_file_path:
file_handlers.append(handler)
# 如果有多个文件handler保留第一个关闭其他的
if len(file_handlers) > 1:
print(f"[日志系统] 检测到 {len(file_handlers)} 个重复的文件handler正在清理...")
for i, handler in enumerate(file_handlers[1:], 1):
print(f"[日志系统] 关闭重复的文件handler {i}")
root_logger.removeHandler(handler)
handler.close()
# 更新全局引用
global _file_handler
_file_handler = file_handlers[0]
# 读取日志配置 # 读取日志配置
def load_log_config(): def load_log_config():
"""从配置文件加载日志设置""" """从配置文件加载日志设置"""
@@ -612,6 +698,9 @@ def _immediate_setup():
file_handler.setFormatter(file_formatter) file_handler.setFormatter(file_formatter)
console_handler.setFormatter(console_formatter) console_handler.setFormatter(console_formatter)
# 清理重复的handler
remove_duplicate_handlers()
# 配置第三方库日志 # 配置第三方库日志
configure_third_party_loggers() configure_third_party_loggers()

View File

@@ -1,87 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试日志轮转错误的脚本
"""
import logging
import sys
import os
from pathlib import Path
# 添加src目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from common.logger import get_logger, force_initialize_logging
def test_log_rotation_with_error_detection():
"""测试日志轮转并捕获错误"""
print("开始测试日志轮转错误检测...")
# 强制初始化日志系统
force_initialize_logging()
logger = get_logger("error_test")
# 生成足够多的日志来强制轮转
large_message = "这是一条用于强制轮转的长消息。" * 200
print("开始生成日志以强制轮转...")
# 监控控制台输出中的错误信息
original_print = print
errors = []
def capture_print(*args, **kwargs):
message = ' '.join(str(arg) for arg in args)
if "重命名失败" in message or "删除失败" in message or "错误" in message:
errors.append(message)
original_print(*args, **kwargs)
# 临时替换print函数来捕获错误
import builtins
builtins.print = capture_print
try:
# 生成大量日志
for i in range(500):
logger.info(f"错误测试消息 {i}: {large_message}")
if i % 50 == 0:
original_print(f"已生成 {i} 条日志...")
# 等待一段时间让压缩线程完成
import time
time.sleep(2)
finally:
# 恢复原始print函数
builtins.print = original_print
print(f"\n检测到的错误信息:")
if errors:
for error in errors:
print(f" - {error}")
else:
print(" 没有检测到错误")
# 检查日志文件状态
log_dir = Path("logs")
if log_dir.exists():
log_files = list(log_dir.glob("app.log*"))
print(f"\n当前日志文件:")
for log_file in sorted(log_files):
size = log_file.stat().st_size / 1024 # KB
print(f" {log_file.name}: {size:.1f} KB")
return errors
if __name__ == "__main__":
errors = test_log_rotation_with_error_detection()
if errors:
print("\n⚠️ 发现错误,需要进一步修复")
sys.exit(1)
else:
print("\n✅ 测试通过,没有发现错误")
sys.exit(0)

View File

@@ -1,96 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试日志轮转修复的脚本
"""
import logging
import time
import threading
from pathlib import Path
import sys
import os
# 添加src目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from common.logger import get_logger, force_initialize_logging, get_log_stats
def test_concurrent_logging():
"""测试并发日志写入"""
logger = get_logger("test")
def log_worker(worker_id):
"""工作线程函数"""
for i in range(100):
logger.info(f"工作线程 {worker_id} - 消息 {i}: 这是一条测试日志消息,用于测试并发写入和轮转功能")
time.sleep(0.01)
# 创建多个线程并发写入日志
threads = []
for i in range(5):
thread = threading.Thread(target=log_worker, args=(i,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print("并发日志测试完成")
def test_log_rotation():
"""测试日志轮转"""
logger = get_logger("rotation_test")
# 生成大量日志来触发轮转
large_message = "这是一条很长的日志消息用于测试轮转功能。" * 100
print("开始生成大量日志以触发轮转...")
for i in range(1000):
logger.info(f"轮转测试消息 {i}: {large_message}")
if i % 100 == 0:
print(f"已生成 {i} 条日志...")
print("日志轮转测试完成")
def main():
"""主函数"""
print("开始测试日志系统修复...")
# 强制初始化日志系统
force_initialize_logging()
# 显示初始日志统计
stats = get_log_stats()
print(f"初始日志统计: {stats}")
# 测试并发日志
print("\n=== 测试并发日志写入 ===")
test_concurrent_logging()
# 测试日志轮转
print("\n=== 测试日志轮转 ===")
test_log_rotation()
# 显示最终日志统计
stats = get_log_stats()
print(f"\n最终日志统计: {stats}")
# 检查日志文件
log_dir = Path("logs")
if log_dir.exists():
log_files = list(log_dir.glob("app.log*"))
print(f"\n生成的日志文件:")
for log_file in sorted(log_files):
size = log_file.stat().st_size / 1024 / 1024 # MB
print(f" {log_file.name}: {size:.2f} MB")
print("\n测试完成!如果没有出现权限错误,说明修复成功。")
if __name__ == "__main__":
main()