From 06b4b7e4b9ce57738b6a8cd98a5ab5e2aecd6f13 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sun, 30 Nov 2025 12:13:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(log=5Fviewer):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=9F=A5=E7=9C=8B=E5=99=A8=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E5=AE=9E=E6=97=B6=E6=9F=A5=E7=9C=8B?= =?UTF-8?q?=E3=80=81=E6=90=9C=E7=B4=A2=E5=92=8C=E7=AD=9B=E9=80=89=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/log_viewer.py | 1107 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1107 insertions(+) create mode 100644 scripts/log_viewer.py diff --git a/scripts/log_viewer.py b/scripts/log_viewer.py new file mode 100644 index 000000000..88fff24ac --- /dev/null +++ b/scripts/log_viewer.py @@ -0,0 +1,1107 @@ +#!/usr/bin/env python3 +""" +MoFox-Core 日志查看器 +一个基于 HTTP 的日志查看服务,支持实时查看、搜索和筛选日志。 + +用法: + python scripts/log_viewer.py + python -m scripts.log_viewer [--port PORT] [--host HOST] +""" + +import argparse +import gzip +import json +import re +import sys +import tarfile +import threading +import webbrowser +from collections import defaultdict +from dataclasses import dataclass +from datetime import datetime +from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path +from typing import Any +from urllib.parse import parse_qs, urlparse + +# 添加项目根目录到路径(支持直接运行和模块运行) +PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +# 切换工作目录到项目根目录 +import os +os.chdir(PROJECT_ROOT) + +# 日志目录 +LOG_DIR = PROJECT_ROOT / "logs" + +# 从 logger.py 导入颜色和别名配置 +DEFAULT_MODULE_COLORS = {} +DEFAULT_MODULE_ALIASES = {} +try: + from src.common.logger import ( + DEFAULT_MODULE_ALIASES, + DEFAULT_MODULE_COLORS, + ) +except ImportError: + pass # 使用空字典 + + +@dataclass +class LogEntry: + """日志条目""" + + timestamp: str + level: str + logger_name: str + event: str + color: str | None = None + alias: str | None = None + extra: dict | None = None + line_number: int = 0 + file_name: str = "" + + +class LogReader: + """日志文件读取器""" + + def __init__(self, log_dir: Path): + self.log_dir = log_dir + self._cache: dict[str, list[LogEntry]] = {} + self._cache_mtime: dict[str, float] = {} + self._filter_cache: dict[str, tuple[list[LogEntry], str]] = {} # 筛选结果缓存 + self._lock = threading.Lock() + + def get_log_files(self) -> list[dict[str, Any]]: + """获取所有日志文件列表""" + files = [] + if not self.log_dir.exists(): + return files + + for f in sorted(self.log_dir.glob("app_*.log.jsonl*"), reverse=True): + try: + stat = f.stat() + is_compressed = f.suffix == ".gz" or ".tar.gz" in f.name + files.append( + { + "name": f.name, + "size": stat.st_size, + "size_human": self._human_size(stat.st_size), + "mtime": stat.st_mtime, + "mtime_human": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), + "compressed": is_compressed, + } + ) + except OSError: + continue + return files + + def _human_size(self, size: int) -> str: + """转换为人类可读的文件大小""" + for unit in ["B", "KB", "MB", "GB"]: + if size < 1024: + return f"{size:.1f} {unit}" + size /= 1024 + return f"{size:.1f} TB" + + def read_log_file(self, filename: str, use_cache: bool = True) -> list[LogEntry]: + """读取日志文件内容""" + filepath = self.log_dir / filename + if not filepath.exists(): + return [] + + # 检查缓存 + with self._lock: + if use_cache and filename in self._cache: + try: + current_mtime = filepath.stat().st_mtime + if self._cache_mtime.get(filename) == current_mtime: + return self._cache[filename] + except OSError: + pass + + entries = [] + try: + # 处理压缩文件 + if ".tar.gz" in filename: + entries = self._read_tar_gz(filepath) + elif filename.endswith(".gz"): + entries = self._read_gzip(filepath) + else: + entries = self._read_plain(filepath) + + # 更新缓存 + with self._lock: + self._cache[filename] = entries + try: + self._cache_mtime[filename] = filepath.stat().st_mtime + except OSError: + pass + + except Exception as e: + print(f"读取日志文件 {filename} 时出错: {e}") + + return entries + + def _read_plain(self, filepath: Path) -> list[LogEntry]: + """读取普通日志文件""" + entries = [] + with open(filepath, encoding="utf-8", errors="replace") as f: + for line_num, line in enumerate(f, 1): + entry = self._parse_line(line, line_num, filepath.name) + if entry: + entries.append(entry) + return entries + + def _read_gzip(self, filepath: Path) -> list[LogEntry]: + """读取 gzip 压缩的日志文件""" + entries = [] + with gzip.open(filepath, "rt", encoding="utf-8", errors="replace") as f: + for line_num, line in enumerate(f, 1): + entry = self._parse_line(line, line_num, filepath.name) + if entry: + entries.append(entry) + return entries + + def _read_tar_gz(self, filepath: Path) -> list[LogEntry]: + """读取 tar.gz 压缩的日志文件""" + entries = [] + try: + with tarfile.open(filepath, "r:gz") as tar: + for member in tar.getmembers(): + if member.isfile(): + f = tar.extractfile(member) + if f: + content = f.read().decode("utf-8", errors="replace") + for line_num, line in enumerate(content.splitlines(), 1): + entry = self._parse_line(line, line_num, filepath.name) + if entry: + entries.append(entry) + except Exception as e: + print(f"读取 tar.gz 文件 {filepath} 时出错: {e}") + return entries + + def _parse_line(self, line: str, line_num: int, filename: str) -> LogEntry | None: + """解析单行日志""" + line = line.strip() + if not line: + return None + + try: + data = json.loads(line) + logger_name = data.get("logger_name", "unknown") + + # 获取颜色和别名(优先使用日志中的,否则使用默认配置) + color = data.get("color") or DEFAULT_MODULE_COLORS.get(logger_name) + alias = data.get("alias") or DEFAULT_MODULE_ALIASES.get(logger_name) + + # 提取额外字段 + extra = {k: v for k, v in data.items() if k not in ("timestamp", "level", "logger_name", "event", "color", "alias")} + + return LogEntry( + timestamp=data.get("timestamp", ""), + level=data.get("level", "info"), + logger_name=logger_name, + event=data.get("event", ""), + color=color, + alias=alias, + extra=extra if extra else None, + line_number=line_num, + file_name=filename, + ) + except json.JSONDecodeError: + # 非 JSON 格式的行,尝试作为纯文本处理 + return LogEntry( + timestamp="", + level="info", + logger_name="raw", + event=line, + line_number=line_num, + file_name=filename, + ) + + def search_logs( + self, + filename: str, + query: str = "", + level: str = "", + logger: str = "", + start_time: str = "", + end_time: str = "", + limit: int = 1000, + offset: int = 0, + regex: bool = False, + ) -> tuple[list[LogEntry], int]: + """搜索和筛选日志""" + entries = self.read_log_file(filename) + + # 如果没有筛选条件,直接返回分页结果 + if not query and not level and not logger and not start_time and not end_time: + total = len(entries) + return entries[offset : offset + limit], total + + # 生成筛选条件的缓存 key + cache_key = f"{filename}:{query}:{level}:{logger}:{start_time}:{end_time}:{regex}" + + # 检查筛选缓存 + with self._lock: + cached = self._filter_cache.get(filename) + if cached and cached[1] == cache_key: + filtered = cached[0] + return filtered[offset : offset + limit], len(filtered) + + # 编译正则表达式(如果需要) + query_pattern = None + query_lower = "" + if query: + if regex: + try: + query_pattern = re.compile(query, re.IGNORECASE) + except re.error: + query_pattern = None + else: + query_lower = query.lower() + + filtered = [] + for entry in entries: + # 日志级别筛选 + if level and entry.level.lower() != level.lower(): + continue + + # Logger 名称筛选 + if logger and entry.logger_name.lower() != logger.lower(): + continue + + # 时间范围筛选 + if start_time and entry.timestamp < start_time: + continue + if end_time and entry.timestamp > end_time: + continue + + # 关键词搜索 + if query: + if query_pattern: + if not (query_pattern.search(entry.event) or query_pattern.search(entry.logger_name) or (entry.alias and query_pattern.search(entry.alias))): + continue + else: + search_text = f"{entry.event} {entry.logger_name} {entry.alias or ''}".lower() + if query_lower not in search_text: + continue + + filtered.append(entry) + + # 更新筛选缓存 + with self._lock: + self._filter_cache[filename] = (filtered, cache_key) + + total = len(filtered) + return filtered[offset : offset + limit], total + + def get_loggers(self, filename: str) -> list[dict[str, str]]: + """获取日志文件中的所有 logger""" + entries = self.read_log_file(filename) + loggers = {} + for entry in entries: + if entry.logger_name not in loggers: + loggers[entry.logger_name] = { + "name": entry.logger_name, + "alias": entry.alias or DEFAULT_MODULE_ALIASES.get(entry.logger_name, ""), + "color": entry.color or DEFAULT_MODULE_COLORS.get(entry.logger_name, ""), + } + return sorted(loggers.values(), key=lambda x: x["name"]) + + def get_stats(self, filename: str) -> dict[str, Any]: + """获取日志统计信息""" + entries = self.read_log_file(filename) + + level_counts = defaultdict(int) + logger_counts = defaultdict(int) + + for entry in entries: + level_counts[entry.level] += 1 + logger_counts[entry.logger_name] += 1 + + return { + "total": len(entries), + "by_level": dict(level_counts), + "by_logger": dict(sorted(logger_counts.items(), key=lambda x: -x[1])[:20]), + } + + +# HTML 模板 +HTML_TEMPLATE = """ + + + + + 日志查看器 + + + + + + +
+ + +
+
+
+ + +
+ +
+ + +
+ +
+ + +
+ + + + +
+ +
+ + + +
+ +
+ + +
+
+ +
+ +
+
+
+ + + + + 选择左侧文件查看日志 +
+
+ +
+
+
+ + + + +""" + + +class LogViewerHandler(SimpleHTTPRequestHandler): + """HTTP 请求处理器""" + + log_reader: LogReader = None # type: ignore + + def log_message(self, format, *args): + """自定义日志格式""" + print(f"[{datetime.now().strftime('%H:%M:%S')}] {args[0]}") + + def do_GET(self): + """处理 GET 请求""" + parsed = urlparse(self.path) + path = parsed.path + query = parse_qs(parsed.query) + + # API 路由 + if path == "/": + self.send_html(HTML_TEMPLATE) + elif path == "/api/files": + self.send_json(self.log_reader.get_log_files()) + elif path == "/api/logs": + self.handle_logs_api(query) + elif path == "/api/loggers": + filename = query.get("file", [""])[0] + self.send_json(self.log_reader.get_loggers(filename)) + elif path == "/api/stats": + filename = query.get("file", [""])[0] + self.send_json(self.log_reader.get_stats(filename)) + else: + self.send_error(404, "Not Found") + + def handle_logs_api(self, query: dict): + """处理日志搜索 API""" + filename = query.get("file", [""])[0] + search_query = query.get("query", [""])[0] + level = query.get("level", [""])[0] + logger = query.get("logger", [""])[0] + regex = query.get("regex", ["false"])[0].lower() == "true" + limit = int(query.get("limit", ["100"])[0]) + offset = int(query.get("offset", ["0"])[0]) + + logs, total = self.log_reader.search_logs( + filename=filename, + query=search_query, + level=level, + logger=logger, + limit=limit, + offset=offset, + regex=regex, + ) + + # 转换为可序列化的格式 + logs_data = [ + { + "timestamp": log.timestamp, + "level": log.level, + "logger_name": log.logger_name, + "event": log.event, + "color": log.color, + "alias": log.alias, + "extra": log.extra, + "line_number": log.line_number, + } + for log in logs + ] + + self.send_json({"logs": logs_data, "total": total}) + + def send_html(self, content: str): + """发送 HTML 响应""" + encoded = content.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + def send_json(self, data: Any): + """发送 JSON 响应""" + content = json.dumps(data, ensure_ascii=False, default=str) + encoded = content.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + +def run_server(host: str = "127.0.0.1", port: int = 8765, open_browser: bool = True): + """启动 HTTP 服务器""" + # 初始化日志读取器 + LogViewerHandler.log_reader = LogReader(LOG_DIR) + + server = HTTPServer((host, port), LogViewerHandler) + url = f"http://{host}:{port}" + + print(f"\n 📋 日志查看器已启动: {url}\n") + + # 自动打开浏览器 + if open_browser: + webbrowser.open(url) + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\n 服务器已停止") + server.shutdown() + + +def main(): + parser = argparse.ArgumentParser(description="日志查看器") + parser.add_argument("--host", default="127.0.0.1", help="服务器地址 (默认: 127.0.0.1)") + parser.add_argument("--port", type=int, default=8765, help="服务器端口 (默认: 8765)") + parser.add_argument("--no-browser", action="store_true", help="不自动打开浏览器") + args = parser.parse_args() + + run_server(args.host, args.port, open_browser=not args.no_browser) + + +if __name__ == "__main__": + main()