#!/usr/bin/env python3
"""
MoFox-Core log viewer.

An HTTP-based log viewing service that supports live viewing, searching,
and filtering of logs.

Usage:
    python scripts/log_viewer.py
    python -m scripts.log_viewer [--port PORT] [--host HOST]
"""

import argparse
import gzip
import json
import os
import re
import sys
import tarfile
import threading
import webbrowser
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse

# Add the project root to sys.path (supports both direct and module execution).
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Switch the working directory to the project root.
os.chdir(PROJECT_ROOT)

# Log directory.
LOG_DIR = PROJECT_ROOT / "logs"

# Color and alias configuration imported from logger.py.
DEFAULT_MODULE_COLORS = {}
DEFAULT_MODULE_ALIASES = {}
try:
    from src.common.logger import (
        DEFAULT_MODULE_ALIASES,
        DEFAULT_MODULE_COLORS,
    )
except ImportError:
    pass  # Fall back to the empty dicts.


@dataclass
class LogEntry:
    """A single log entry."""

    timestamp: str
    level: str
    logger_name: str
    event: str
    color: str | None = None
    alias: str | None = None
    extra: dict | None = None
    line_number: int = 0
    file_name: str = ""


class LogReader:
    """Reads and parses log files."""

    def __init__(self, log_dir: Path):
        self.log_dir = log_dir
        self._cache: dict[str, list[LogEntry]] = {}
        self._cache_mtime: dict[str, float] = {}
        self._filter_cache: dict[str, tuple[list[LogEntry], str]] = {}  # Cached filter results.
        self._lock = threading.Lock()

    def get_log_files(self) -> list[dict[str, Any]]:
        """List all log files."""
        files = []
        if not self.log_dir.exists():
            return files
        for f in sorted(self.log_dir.glob("app_*.log.jsonl*"), reverse=True):
            try:
                stat = f.stat()
                is_compressed = f.suffix == ".gz" or ".tar.gz" in f.name
                files.append(
                    {
                        "name": f.name,
                        "size": stat.st_size,
                        "size_human": self._human_size(stat.st_size),
                        "mtime": stat.st_mtime,
                        "mtime_human": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
                        "compressed": is_compressed,
                    }
                )
            except OSError:
                continue
        return files

    def _human_size(self, size: int) -> str:
        """Format a byte count as a human-readable size."""
        for unit in ["B", "KB", "MB", "GB"]:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def read_log_file(self, filename: str, use_cache: bool = True) -> list[LogEntry]:
        """Read the contents of a log file."""
        filepath = self.log_dir / filename
        if not filepath.exists():
            return []

        # Check the cache first; it is valid only if the mtime is unchanged.
        with self._lock:
            if use_cache and filename in self._cache:
                try:
                    current_mtime = filepath.stat().st_mtime
                    if self._cache_mtime.get(filename) == current_mtime:
                        return self._cache[filename]
                except OSError:
                    pass

        entries = []
        try:
            # Dispatch on compression format.
            if ".tar.gz" in filename:
                entries = self._read_tar_gz(filepath)
            elif filename.endswith(".gz"):
                entries = self._read_gzip(filepath)
            else:
                entries = self._read_plain(filepath)

            # Update the cache.
            with self._lock:
                self._cache[filename] = entries
                try:
                    self._cache_mtime[filename] = filepath.stat().st_mtime
                except OSError:
                    pass
        except Exception as e:
            print(f"Error reading log file {filename}: {e}")

        return entries

    def _read_plain(self, filepath: Path) -> list[LogEntry]:
        """Read an uncompressed log file."""
        entries = []
        with open(filepath, encoding="utf-8", errors="replace") as f:
            for line_num, line in enumerate(f, 1):
                entry = self._parse_line(line, line_num, filepath.name)
                if entry:
                    entries.append(entry)
        return entries

    def _read_gzip(self, filepath: Path) -> list[LogEntry]:
        """Read a gzip-compressed log file."""
        entries = []
        with gzip.open(filepath, "rt", encoding="utf-8", errors="replace") as f:
            for line_num, line in enumerate(f, 1):
                entry = self._parse_line(line, line_num, filepath.name)
                if entry:
                    entries.append(entry)
        return entries
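
    # Note: every reader feeds raw lines to _parse_line. The expected JSONL
    # record shape (keys taken from _parse_line below; the values here are
    # illustrative, not from any real log) is:
    #   {"timestamp": "2025-01-01 12:00:00", "level": "info",
    #    "logger_name": "chat", "event": "message handled"}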

    def _read_tar_gz(self, filepath: Path) -> list[LogEntry]:
        """Read a tar.gz-compressed log file."""
        entries = []
        try:
            with tarfile.open(filepath, "r:gz") as tar:
                for member in tar.getmembers():
                    if member.isfile():
                        f = tar.extractfile(member)
                        if f:
                            content = f.read().decode("utf-8", errors="replace")
                            for line_num, line in enumerate(content.splitlines(), 1):
                                entry = self._parse_line(line, line_num, filepath.name)
                                if entry:
                                    entries.append(entry)
        except Exception as e:
            print(f"Error reading tar.gz file {filepath}: {e}")
        return entries

    def _parse_line(self, line: str, line_num: int, filename: str) -> LogEntry | None:
        """Parse a single log line."""
        line = line.strip()
        if not line:
            return None
        try:
            data = json.loads(line)
            logger_name = data.get("logger_name", "unknown")

            # Prefer the color/alias carried in the record; fall back to the defaults.
            color = data.get("color") or DEFAULT_MODULE_COLORS.get(logger_name)
            alias = data.get("alias") or DEFAULT_MODULE_ALIASES.get(logger_name)

            # Collect any extra fields.
            extra = {
                k: v
                for k, v in data.items()
                if k not in ("timestamp", "level", "logger_name", "event", "color", "alias")
            }

            return LogEntry(
                timestamp=data.get("timestamp", ""),
                level=data.get("level", "info"),
                logger_name=logger_name,
                event=data.get("event", ""),
                color=color,
                alias=alias,
                extra=extra if extra else None,
                line_number=line_num,
                file_name=filename,
            )
        except json.JSONDecodeError:
            # Not JSON; keep the line as raw text.
            return LogEntry(
                timestamp="",
                level="info",
                logger_name="raw",
                event=line,
                line_number=line_num,
                file_name=filename,
            )

    def search_logs(
        self,
        filename: str,
        query: str = "",
        level: str = "",
        logger: str = "",
        start_time: str = "",
        end_time: str = "",
        limit: int = 1000,
        offset: int = 0,
        regex: bool = False,
    ) -> tuple[list[LogEntry], int]:
        """Search and filter log entries."""
        entries = self.read_log_file(filename)

        # With no filters, return the paged slice directly.
        if not query and not level and not logger and not start_time and not end_time:
            total = len(entries)
            return entries[offset : offset + limit], total

        # Cache key derived from the full filter combination.
        cache_key = f"{filename}:{query}:{level}:{logger}:{start_time}:{end_time}:{regex}"

        # Check the filter cache.
        with self._lock:
            cached = self._filter_cache.get(filename)
            if cached and cached[1] == cache_key:
                filtered = cached[0]
                return filtered[offset : offset + limit], len(filtered)

        # Compile the regex if requested.
        query_pattern = None
        query_lower = ""
        if query:
            if regex:
                try:
                    query_pattern = re.compile(query, re.IGNORECASE)
                except re.error:
                    query_pattern = None
            else:
                query_lower = query.lower()

        filtered = []
        for entry in entries:
            # Filter by log level.
            if level and entry.level.lower() != level.lower():
                continue
            # Filter by logger name.
            if logger and entry.logger_name.lower() != logger.lower():
                continue
            # Filter by time range.
            if start_time and entry.timestamp < start_time:
                continue
            if end_time and entry.timestamp > end_time:
                continue
            # Keyword search.
            if query:
                if query_pattern:
                    if not (
                        query_pattern.search(entry.event)
                        or query_pattern.search(entry.logger_name)
                        or (entry.alias and query_pattern.search(entry.alias))
                    ):
                        continue
                else:
                    search_text = f"{entry.event} {entry.logger_name} {entry.alias or ''}".lower()
                    if query_lower not in search_text:
                        continue
            filtered.append(entry)

        # Update the filter cache.
        with self._lock:
            self._filter_cache[filename] = (filtered, cache_key)

        total = len(filtered)
        return filtered[offset : offset + limit], total
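
    # Design note: the filter cache keeps one result list per file, tagged with
    # the full filter combination. Paging through the same query therefore
    # reuses the filtered list instead of rescanning every entry; any new
    # combination simply overwrites the previous cached result for that file.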

    def get_loggers(self, filename: str) -> list[dict[str, str]]:
        """Collect every logger that appears in a log file."""
        entries = self.read_log_file(filename)
        loggers = {}
        for entry in entries:
            if entry.logger_name not in loggers:
                loggers[entry.logger_name] = {
                    "name": entry.logger_name,
                    "alias": entry.alias or DEFAULT_MODULE_ALIASES.get(entry.logger_name, ""),
                    "color": entry.color or DEFAULT_MODULE_COLORS.get(entry.logger_name, ""),
                }
        return sorted(loggers.values(), key=lambda x: x["name"])

    def get_stats(self, filename: str) -> dict[str, Any]:
        """Compute summary statistics for a log file."""
        entries = self.read_log_file(filename)
        level_counts = defaultdict(int)
        logger_counts = defaultdict(int)
        for entry in entries:
            level_counts[entry.level] += 1
            logger_counts[entry.logger_name] += 1
        return {
            "total": len(entries),
            "by_level": dict(level_counts),
            "by_logger": dict(sorted(logger_counts.items(), key=lambda x: -x[1])[:20]),
        }
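

# Minimal standalone usage sketch for LogReader (the file name below is
# illustrative; the methods and signatures are the ones defined above):
#   reader = LogReader(LOG_DIR)
#   files = reader.get_log_files()
#   logs, total = reader.search_logs("app_2025-01-01.log.jsonl", query="error", limit=50)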

# HTML template for the single-page viewer UI. Only a minimal shell is
# reproduced here (the page title and the empty-state hint); the original
# inline CSS/JS that drives the /api endpoints is not shown.
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>Log Viewer</title>
</head>
<body>
    <div class="placeholder">Select a log file on the left to view it</div>
</body>
</html>
"""


# Routes served by the handler (see do_GET):
#   GET /                      -> the HTML page
#   GET /api/files             -> available log files
#   GET /api/logs?file=...     -> search results (plus query/level/logger/regex/limit/offset)
#   GET /api/loggers?file=...  -> loggers present in the file
#   GET /api/stats?file=...    -> entry counts by level and logger
class LogViewerHandler(SimpleHTTPRequestHandler):
    """HTTP request handler."""

    log_reader: LogReader = None  # type: ignore

    def log_message(self, format, *args):
        """Custom access-log format."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}")

    def do_GET(self):
        """Handle GET requests."""
        parsed = urlparse(self.path)
        path = parsed.path
        query = parse_qs(parsed.query)

        # API routing.
        if path == "/":
            self.send_html(HTML_TEMPLATE)
        elif path == "/api/files":
            self.send_json(self.log_reader.get_log_files())
        elif path == "/api/logs":
            self.handle_logs_api(query)
        elif path == "/api/loggers":
            filename = query.get("file", [""])[0]
            self.send_json(self.log_reader.get_loggers(filename))
        elif path == "/api/stats":
            filename = query.get("file", [""])[0]
            self.send_json(self.log_reader.get_stats(filename))
        else:
            self.send_error(404, "Not Found")

    def handle_logs_api(self, query: dict):
        """Handle the log search API."""
        filename = query.get("file", [""])[0]
        search_query = query.get("query", [""])[0]
        level = query.get("level", [""])[0]
        logger = query.get("logger", [""])[0]
        regex = query.get("regex", ["false"])[0].lower() == "true"
        limit = int(query.get("limit", ["100"])[0])
        offset = int(query.get("offset", ["0"])[0])

        logs, total = self.log_reader.search_logs(
            filename=filename,
            query=search_query,
            level=level,
            logger=logger,
            limit=limit,
            offset=offset,
            regex=regex,
        )

        # Convert to a serializable form.
        logs_data = [
            {
                "timestamp": log.timestamp,
                "level": log.level,
                "logger_name": log.logger_name,
                "event": log.event,
                "color": log.color,
                "alias": log.alias,
                "extra": log.extra,
                "line_number": log.line_number,
            }
            for log in logs
        ]
        self.send_json({"logs": logs_data, "total": total})

    def send_html(self, content: str):
        """Send an HTML response."""
        encoded = content.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)

    def send_json(self, data: Any):
        """Send a JSON response."""
        content = json.dumps(data, ensure_ascii=False, default=str)
        encoded = content.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)


def run_server(host: str = "127.0.0.1", port: int = 8765, open_browser: bool = True):
    """Start the HTTP server."""
    # Initialize the shared log reader.
    LogViewerHandler.log_reader = LogReader(LOG_DIR)

    server = HTTPServer((host, port), LogViewerHandler)
    url = f"http://{host}:{port}"
    print(f"\n📋 Log viewer started at: {url}\n")

    # Open the browser automatically.
    if open_browser:
        webbrowser.open(url)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nServer stopped")
        server.shutdown()


def main():
    parser = argparse.ArgumentParser(description="Log viewer")
    parser.add_argument("--host", default="127.0.0.1", help="server address (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8765, help="server port (default: 8765)")
    parser.add_argument("--no-browser", action="store_true", help="do not open a browser automatically")
    args = parser.parse_args()

    run_server(args.host, args.port, open_browser=not args.no_browser)


if __name__ == "__main__":
    main()