diff --git a/.gitignore b/.gitignore index 5744424aa..9e1b96811 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ mongodb/ NapCat.Framework.Windows.Once/ log/ logs/ +out/ tool_call_benchmark.py run_maibot_core.bat run_napcat_adapter.bat diff --git a/bot.py b/bot.py index 41847a01f..7e2359690 100644 --- a/bot.py +++ b/bot.py @@ -226,6 +226,7 @@ def raw_main(): if __name__ == "__main__": + exit_code = 0 # 用于记录程序最终的退出状态 try: # 获取MainSystem实例 main_system = raw_main() @@ -241,13 +242,29 @@ if __name__ == "__main__": except KeyboardInterrupt: # loop.run_until_complete(global_api.stop()) logger.warning("收到中断信号,正在优雅关闭...") - loop.run_until_complete(graceful_shutdown()) - finally: - loop.close() + if loop and not loop.is_closed(): + try: + loop.run_until_complete(graceful_shutdown()) + except Exception as ge: # 捕捉优雅关闭时可能发生的错误 + logger.error(f"优雅关闭时发生错误: {ge}") + # except Exception as e: # 将主异常捕获移到外层 try...except + # logger.error(f"事件循环内发生错误: {str(e)} {str(traceback.format_exc())}") + # exit_code = 1 + # finally: # finally 块移到最外层,确保 loop 关闭和暂停总是执行 + # if loop and not loop.is_closed(): + # loop.close() + # # 在这里添加 input() 来暂停 + # input("按 Enter 键退出...") # <--- 添加这行 + # sys.exit(exit_code) # <--- 使用记录的退出码 except Exception as e: - logger.error(f"主程序异常: {str(e)} {str(traceback.format_exc())}") - if loop and not loop.is_closed(): - loop.run_until_complete(graceful_shutdown()) + logger.error(f"主程序发生异常: {str(e)} {str(traceback.format_exc())}") + exit_code = 1 # 标记发生错误 + finally: + # 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭) + if "loop" in locals() and loop and not loop.is_closed(): loop.close() - sys.exit(1) + logger.info("事件循环已关闭") + # 在程序退出前暂停,让你有机会看到输出 + input("按 Enter 键退出...") # <--- 添加这行 + sys.exit(exit_code) # <--- 使用记录的退出码 diff --git a/launcher.py b/launcher.py new file mode 100644 index 000000000..133c219e7 --- /dev/null +++ b/launcher.py @@ -0,0 +1,242 @@ +import flet as ft +import os +import atexit +import psutil # Keep for initial PID checks maybe, though state should handle it + +# --- Import refactored modules --- # +from src.MaiGoi.state import AppState +from src.MaiGoi.process_manager import ( + start_bot_and_show_console, + output_processor_loop, # Needed for restarting on navigate + cleanup_on_exit, + handle_disconnect, +) +from src.MaiGoi.ui_views import ( + create_main_view, + create_console_view, + create_adapters_view, + create_settings_view, + create_process_output_view, +) +from src.MaiGoi.config_manager import load_config + +# --- Global AppState instance --- # +# This holds all the state previously scattered as globals +app_state = AppState() + +# --- File Picker Result Handler Placeholder --- +# We need a placeholder function or logic to handle the result here if needed +# For now, the result will be handled within the adapters view itself. +# def handle_file_picker_result(e: ft.FilePickerResultEvent): +# print("File picker result in launcher (should be handled in view):", e.files) + +# --- atexit Cleanup Registration --- # +# Register the cleanup function from the process manager module +# It needs access to the app_state +atexit.register(cleanup_on_exit, app_state) +print("[Main Script] atexit cleanup handler from process_manager registered.", flush=True) + + +# --- Routing Logic --- # +def route_change(route: ft.RouteChangeEvent): + """Handles Flet route changes, creating and appending views.""" + page = route.page + target_route = route.route + + # Clear existing views before adding new ones + page.views.clear() + + # Always add the main view + main_view = create_main_view(page, app_state) + page.views.append(main_view) + + # --- Handle Specific Routes --- # + if target_route == "/console": + console_view = create_console_view(page, app_state) + page.views.append(console_view) + + # Check process status and potentially restart processor loop if needed + is_running = app_state.bot_pid is not None and psutil.pid_exists(app_state.bot_pid) + print( + f"[Route Change /console] Checking status: PID={app_state.bot_pid}, is_running={is_running}, stop_event={app_state.stop_event.is_set()}", + flush=True, + ) + + if is_running: + print("[Route Change /console] Process is running.", flush=True) + # If the processor loop was stopped (e.g., by navigating away or stop button), + # but the process is still running, restart the loop. + if app_state.stop_event.is_set(): + print("[Route Change /console] Stop event was set, clearing and restarting processor loop.", flush=True) + app_state.stop_event.clear() + # Make sure output_list_view is available before starting loop + if not app_state.output_list_view: + print("[Route Change /console] Warning: output_list_view is None when restarting loop. Creating.") + app_state.output_list_view = ft.ListView( + expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5 + ) + console_view.controls[1].controls[0].content = app_state.output_list_view # Update content in view + + page.run_task(output_processor_loop, page, app_state) + else: + print("[Route Change /console] Process is not running.", flush=True) + # Ensure console view shows the 'not running' state if needed + if app_state.output_list_view: + # Check if already has the message? Might add duplicates. + # Simple approach: just add it if the list is empty or last msg isn't it. + add_not_running_msg = True + if app_state.output_list_view.controls: + last_control = app_state.output_list_view.controls[-1] + # Check if it's Text and value is not None before checking content + if ( + isinstance(last_control, ft.Text) + and last_control.value is not None + and "Bot 进程未运行" in last_control.value + ): + add_not_running_msg = False + if add_not_running_msg: + app_state.output_list_view.controls.append(ft.Text("--- Bot 进程未运行 ---", italic=True)) + else: + # If list view doesn't exist here, create it and add the message + print("[Route Change /console] Creating ListView to show 'not running' message.") + app_state.output_list_view = ft.ListView( + expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5 + ) + app_state.output_list_view.controls.append(ft.Text("--- Bot 进程未运行 ---", italic=True)) + # Update the console view container's content + console_view.controls[1].controls[0].content = app_state.output_list_view + elif target_route == "/adapters": + adapters_view = create_adapters_view(page, app_state) + page.views.append(adapters_view) + elif target_route == "/settings": + settings_view = create_settings_view(page, app_state) + page.views.append(settings_view) + + # --- Handle Dynamic Adapter Output Route --- # + # Check if the route matches the pattern /adapters/ + elif target_route.startswith("/adapters/") and len(target_route.split("/")) == 3: + parts = target_route.split("/") + process_id = parts[2] # Extract the process ID (which is the script path for now) + print(f"[Route Change] Detected adapter output route for ID: {process_id}") + adapter_output_view = create_process_output_view(page, app_state, process_id) + if adapter_output_view: + page.views.append(adapter_output_view) + else: + # If view creation failed (e.g., process state not found), show error and stay on previous view? + # Or redirect back to /adapters? Let's go back to adapters list. + print(f"[Route Change] Failed to create output view for {process_id}. Redirecting to /adapters.") + # Avoid infinite loop if /adapters also fails + if len(page.views) > 1: # Ensure we don't pop the main view + page.views.pop() # Pop the failed view attempt + # Find the adapters view if it exists, otherwise just update + adapters_view_index = -1 + for i, view in enumerate(page.views): + if view.route == "/adapters": + adapters_view_index = i + break + if adapters_view_index == -1: # Adapters view wasn't in stack? Add it. + adapters_view = create_adapters_view(page, app_state) + page.views.append(adapters_view) + # Go back to the adapters list route to rebuild the view stack correctly + page.go("/adapters") + return # Prevent page.update() below + + # Update the page to show the correct view(s) + page.update() + + +def view_pop(e: ft.ViewPopEvent): + """Handles view popping (e.g., back navigation).""" + page = e.page + # Remove the top view + page.views.pop() + if page.views: + top_view = page.views[-1] + # Go to the route of the view now at the top of the stack + # This will trigger route_change again to rebuild the view stack correctly + page.go(top_view.route) + # else: print("Warning: Popped the last view.") + + +# --- Main Application Setup --- # +def main(page: ft.Page): + # Load initial config and store in state + loaded_config = load_config() + app_state.gui_config = loaded_config + app_state.adapter_paths = loaded_config.get("adapters", []).copy() # Get adapter paths + print(f"[Main] Initial adapters loaded: {app_state.adapter_paths}") + + # Set script_dir in AppState early + app_state.script_dir = os.path.dirname(os.path.abspath(__file__)) + print(f"[Main] Script directory set in state: {app_state.script_dir}", flush=True) + + # --- Setup File Picker --- # + # Create the FilePicker instance + # The on_result handler will be set dynamically in the view that uses it + app_state.file_picker = ft.FilePicker() + # Add the FilePicker to the page's overlay controls + page.overlay.append(app_state.file_picker) + print("[Main] FilePicker created and added to page overlay.") + + page.title = "MaiBot 启动器" + page.window_width = 500 + page.window_height = 650 # Increased height slightly for monitor + page.vertical_alignment = ft.MainAxisAlignment.START + page.horizontal_alignment = ft.CrossAxisAlignment.CENTER + page.theme_mode = ft.ThemeMode.SYSTEM + page.padding = 10 # Reduced padding slightly + + # --- Create the main 'Start Bot' button and store in state --- # + # This button needs to exist before the first route_change call + app_state.start_bot_button = ft.FilledButton( + "启动 MaiBot 主程序 (bot.py)", + icon=ft.icons.SMART_TOY_OUTLINED, + # The click handler now calls the function from process_manager + on_click=lambda _: start_bot_and_show_console(page, app_state), + expand=True, + tooltip="启动主程序并在新视图中显示控制台输出", + ) + print("[Main] Start Bot Button created and stored in state.", flush=True) + + # --- Routing Setup --- # + page.on_route_change = route_change + page.on_view_pop = view_pop + + # --- Disconnect Handler --- # + # Pass app_state to the disconnect handler + page.on_disconnect = lambda e: handle_disconnect(page, app_state, e) + print("[Main] Registered page.on_disconnect handler.", flush=True) + + # Prevent immediate close to allow cleanup + page.window_prevent_close = True + + # --- Initial Navigation --- # + # Trigger the initial route change to build the first view + page.go(page.route if page.route else "/") + + +# --- Run Flet App --- # +if __name__ == "__main__": + # No need to initialize globals here anymore, AppState handles it. + ft.app(target=main) + # This print will appear *after* the Flet window closes, + # but *before* the atexit handler runs. + print("[Main Script] Flet app exited. atexit handler should run next.", flush=True) + +# --- Removed Code Sections (Previously Globals and Functions) --- +# (Keep this comment block or similar for reference if desired) +# Removed: bot_process, bot_pid, output_queue, stop_event, interest_monitor_control, +# output_list_view, start_bot_button (now in AppState), +# is_auto_scroll_enabled (now in AppState) +# Removed: ansi_converter +# Removed: cleanup_on_exit (moved to process_manager) +# Removed: update_page_safe (moved to utils) +# Removed: show_snackbar (moved to utils) +# Removed: run_script (moved to utils) +# Removed: handle_disconnect (moved to process_manager) +# Removed: stop_bot_process (moved to process_manager) +# Removed: read_process_output (moved to process_manager) +# Removed: output_processor_loop (moved to process_manager) +# Removed: start_bot_and_show_console (moved to process_manager) +# Removed: create_console_view (moved to ui_views) +# (Main view creation logic also moved to ui_views within create_main_view) diff --git a/requirements.txt b/requirements.txt index 91ae096c1..5416220a6 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/src/MaiGoi/color_parser.py b/src/MaiGoi/color_parser.py new file mode 100644 index 000000000..305150d45 --- /dev/null +++ b/src/MaiGoi/color_parser.py @@ -0,0 +1,229 @@ +""" +Parses log lines containing ANSI escape codes or Loguru-style color tags +into a list of Flet TextSpan objects for colored output. +""" + +import re +import flet as ft + +# Basic ANSI SGR (Select Graphic Rendition) codes mapping +# See: https://en.wikipedia.org/wiki/ANSI_escape_code#SGR_(Select_Graphic_Rendition)_parameters +# Focusing on common foreground colors and styles used by Loguru +ANSI_CODES = { + # Styles + "1": ft.FontWeight.BOLD, + "3": ft.TextStyle(italic=True), # Italic + "4": ft.TextStyle(decoration=ft.TextDecoration.UNDERLINE), # Underline + "22": ft.FontWeight.NORMAL, # Reset bold + "23": ft.TextStyle(italic=False), # Reset italic + "24": ft.TextStyle(decoration=ft.TextDecoration.NONE), # Reset underline + # Foreground Colors (30-37) + "30": ft.colors.BLACK, + "31": ft.colors.RED, + "32": ft.colors.GREEN, + "33": ft.colors.YELLOW, + "34": ft.colors.BLUE, + "35": ft.colors.PINK, + "36": ft.colors.CYAN, + "37": ft.colors.WHITE, + "39": None, # Default foreground color + # Bright Foreground Colors (90-97) + "90": ft.colors.with_opacity(0.7, ft.colors.BLACK), # Often rendered as gray + "91": ft.colors.RED_ACCENT, # Or RED_400 / LIGHT_RED + "92": ft.colors.LIGHT_GREEN, # Or GREEN_ACCENT + "93": ft.colors.YELLOW_ACCENT, # Or LIGHT_YELLOW + "94": ft.colors.LIGHT_BLUE, # Or BLUE_ACCENT + "95": ft.colors.PINK, # ANSI bright magenta maps well to Flet's PINK + "96": ft.colors.CYAN_ACCENT, + "97": ft.colors.WHITE70, # Brighter white +} + +# Loguru simple tags mapping (add more as needed from your logger.py) +# Using lowercase for matching +LOGURU_TAGS = { + "red": ft.colors.RED, + "green": ft.colors.GREEN, + "yellow": ft.colors.YELLOW, + "blue": ft.colors.BLUE, + "magenta": ft.colors.PINK, + "cyan": ft.colors.CYAN, + "white": ft.colors.WHITE, + "light-yellow": ft.colors.YELLOW_ACCENT, # Or specific yellow shade + "light-green": ft.colors.LIGHT_GREEN, + "light-magenta": ft.colors.PINK, # Or specific magenta shade + "light-cyan": ft.colors.CYAN_ACCENT, # Or specific cyan shade + "light-blue": ft.colors.LIGHT_BLUE, + "fg #ffd700": "#FFD700", # Handle specific hex colors like emoji + "fg #3399ff": "#3399FF", # Handle specific hex colors like emoji + "fg #66ccff": "#66CCFF", + "fg #005ba2": "#005BA2", + "fg #7cffe6": "#7CFFE6", # 海马体 + "fg #37ffb4": "#37FFB4", # LPMM + "fg #00788a": "#00788A", # 远程 + "fg #3fc1c9": "#3FC1C9", # Tools + # Add other colors used in your logger.py simple formats +} + +# Regex to find ANSI codes (basic SGR, true-color fg) OR Loguru tags +# Added specific capture for 38;2;r;g;b +ANSI_COLOR_REGEX = re.compile( + r"(\x1b\[(?:(?:(?:3[0-7]|9[0-7]|1|3|4|22|23|24);?)+|39|0)m)" # Group 1: Basic SGR codes (like 31, 1;32, 0, 39) + r"|" + r"(\x1b\[38;2;(\d{1,3});(\d{1,3});(\d{1,3})m)" # Group 2: Truecolor FG ( captures full code, Grp 3: R, Grp 4: G, Grp 5: B ) + # r"|(\x1b\[48;2;...m)" # Placeholder for Truecolor BG if needed later + r"|" + r"(<(/?)([^>]+)?>)" # Group 6: Loguru tags ( Grp 7: slash, Grp 8: content ) +) + + +def parse_log_line_to_spans(line: str) -> list[ft.TextSpan]: + """ + Parses a log line potentially containing ANSI codes OR Loguru tags + into a list of Flet TextSpan objects. + Uses a style stack for basic nesting. + """ + spans = [] + current_pos = 0 + # Stack holds TextStyle objects. Base style is default. + style_stack = [ft.TextStyle()] + + for match in ANSI_COLOR_REGEX.finditer(line): + start, end = match.span() + basic_ansi_code = match.group(1) + truecolor_ansi_code = match.group(2) + tc_r, tc_g, tc_b = match.group(3), match.group(4), match.group(5) + loguru_full_tag = match.group(6) + loguru_closing_slash = match.group(7) + loguru_tag_content = match.group(8) + + current_style = style_stack[-1] + + if start > current_pos: + spans.append(ft.TextSpan(line[current_pos:start], current_style)) + + if basic_ansi_code: + # --- Handle Basic ANSI --- + params = basic_ansi_code[2:-1] + if not params or params == "0": # Reset code + style_stack = [ft.TextStyle()] # Reset stack + else: + temp_style_dict = { + k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"] + } + codes = params.split(";") + for code in filter(None, codes): + style_attr = ANSI_CODES.get(code) + if isinstance(style_attr, str): + temp_style_dict["color"] = style_attr + elif isinstance(style_attr, ft.FontWeight): + temp_style_dict["weight"] = None if code == "22" else style_attr + elif isinstance(style_attr, ft.TextStyle): + if style_attr.italic is not None: + temp_style_dict["italic"] = False if code == "23" else style_attr.italic + if style_attr.decoration is not None: + temp_style_dict["decoration"] = ( + ft.TextDecoration.NONE if code == "24" else style_attr.decoration + ) + elif style_attr is None and code == "39": + temp_style_dict["color"] = None + style_stack[-1] = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None}) + + elif truecolor_ansi_code: + # --- Handle Truecolor ANSI --- + try: + r, g, b = int(tc_r), int(tc_g), int(tc_b) + hex_color = f"#{r:02x}{g:02x}{b:02x}" + # print(f"--- TrueColor Debug: Parsed RGB ({r},{g},{b}) -> {hex_color} ---") + # Update color in the current style on stack top + temp_style_dict = { + k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"] + } + temp_style_dict["color"] = hex_color + style_stack[-1] = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None}) + except (ValueError, TypeError) as e: + print(f"Error parsing truecolor ANSI: {e}, Code: {truecolor_ansi_code}") + # Keep current style if parsing fails + + elif loguru_full_tag: + if loguru_closing_slash: + if len(style_stack) > 1: + style_stack.pop() + # print(f"--- Loguru Debug: Closing Tag processed. Stack size: {len(style_stack)} ---") + elif loguru_tag_content: # Opening tag + tag_lower = loguru_tag_content.lower() + style_attr = LOGURU_TAGS.get(tag_lower) + + # print(f"--- Loguru Debug: Opening Tag --- ") + # print(f" Raw Content : {repr(loguru_tag_content)}") + # print(f" Lowercase Key: {repr(tag_lower)}") + # print(f" Found Attr : {repr(style_attr)} --- ") + + temp_style_dict = { + k: getattr(current_style, k, None) for k in ["color", "weight", "italic", "decoration"] + } + + if style_attr: + if isinstance(style_attr, str): + temp_style_dict["color"] = style_attr + # print(f" Applied Color: {style_attr}") + # ... (handle other style types if needed) + + # Push the new style only if the tag was recognized and resulted in a change + # (or check if style_attr is not None) + new_style = ft.TextStyle(**{k: v for k, v in temp_style_dict.items() if v is not None}) + # Avoid pushing identical style + if new_style != current_style: + style_stack.append(new_style) + # print(f" Pushed Style. Stack size: {len(style_stack)}") + # else: + # print(f" Style unchanged, stack not pushed.") + # else: + # print(f" Tag NOT FOUND in LOGURU_TAGS.") + # else: Invalid tag format? + + current_pos = end + + # Add any remaining text after the last match + final_style = style_stack[-1] + if current_pos < len(line): + spans.append(ft.TextSpan(line[current_pos:], final_style)) + + return [span for span in spans if span.text] + + +if __name__ == "__main__": + # Example Usage & Testing + test_lines = [ + "This is normal text.", + "\\x1b[31mThis is red text.\\x1b[0m And back to normal.", + "\\x1b[1;32mThis is bold green.\\x1b[0m", + "Text with red tag inside.", + "Nested yellow bold yellow.", # Bold tag not handled yet + "Light green message", + "Emoji color", + "\\x1b[94mBright Blue ANSI\\x1b[0m", + "\\x1b[3mItalic ANSI\\x1b[0m", + # Example from user image (simplified) + "\\x1b[37m2025-05-03 23:00:44\\x1b[0m | \\x1b[1mINFO\\x1b[0m | \\x1b[96m配置\\x1b[0m | \\x1b[1m成功加载配置文件: ...\\x1b[0m", + "\\x1b[1mDEBUG\\x1b[0m | \\x1b[94m人物信息\\x1b[0m | \\x1b[1m已加载 81 个用户名\\x1b[0m", + "TIME | 模块 | 消息", # Loguru format string itself + ] + + # Simple print test (won't show colors in standard terminal) + for t_line in test_lines: + print(f"--- Input: {repr(t_line)} ---") + parsed_spans = parse_log_line_to_spans(t_line) + print("Parsed Spans:") + for s in parsed_spans: + print( + f" Text: {repr(s.text)}, Style: color={s.style.color}, weight={s.style.weight}, italic={s.style.italic}, decoration={s.style.decoration}" + ) + print("-" * 20) + + # To visually test with Flet, you'd run this in a simple Flet app: + # import flet as ft + # def main(page: ft.Page): + # page.add(ft.Column([ + # ft.Text(spans=parse_log_line_to_spans(line)) for line in test_lines + # ])) + # ft.app(target=main) diff --git a/src/MaiGoi/config_manager.py b/src/MaiGoi/config_manager.py new file mode 100644 index 000000000..92a4af668 --- /dev/null +++ b/src/MaiGoi/config_manager.py @@ -0,0 +1,73 @@ +import os +import toml +from pathlib import Path +from typing import Dict, Any + +CONFIG_DIR = "config" +CONFIG_FILE = "gui_config.toml" +DEFAULT_CONFIG = {"adapters": []} + + +def get_config_path() -> Path: + """Gets the full path to the config file.""" + # Assume script_dir is the project root for simplicity here + # A more robust solution might involve finding the project root explicitly + script_dir = Path(os.path.dirname(os.path.abspath(__file__))).parent.parent + config_path = script_dir / CONFIG_DIR / CONFIG_FILE + return config_path + + +def load_config() -> Dict[str, Any]: + """Loads the configuration from the TOML file.""" + config_path = get_config_path() + print(f"[Config] Loading config from: {config_path}") + try: + config_path.parent.mkdir(parents=True, exist_ok=True) # Ensure directory exists + if config_path.is_file(): + with open(config_path, "r", encoding="utf-8") as f: + config_data = toml.load(f) + # Ensure essential keys exist, merge with defaults if necessary + # For now, just check for 'adapters' + if "adapters" not in config_data: + config_data["adapters"] = DEFAULT_CONFIG["adapters"] + print("[Config] Config loaded successfully.") + return config_data + else: + print("[Config] Config file not found, using default config.") + # Save default config if file doesn't exist + save_config(DEFAULT_CONFIG) + return DEFAULT_CONFIG.copy() # Return a copy + except FileNotFoundError: + print("[Config] Config file not found (FileNotFoundError), using default config.") + save_config(DEFAULT_CONFIG) # Attempt to save default + return DEFAULT_CONFIG.copy() + except toml.TomlDecodeError as e: + print(f"[Config] Error decoding TOML file: {e}. Using default config.") + # Optionally: backup the corrupted file here + return DEFAULT_CONFIG.copy() + except Exception as e: + print(f"[Config] An unexpected error occurred loading config: {e}. Using default config.") + import traceback + + traceback.print_exc() + return DEFAULT_CONFIG.copy() + + +def save_config(config_data: Dict[str, Any]) -> bool: + """Saves the configuration dictionary to the TOML file.""" + config_path = get_config_path() + print(f"[Config] Saving config to: {config_path}") + try: + config_path.parent.mkdir(parents=True, exist_ok=True) # Ensure directory exists + with open(config_path, "w", encoding="utf-8") as f: + toml.dump(config_data, f) + print("[Config] Config saved successfully.") + return True + except IOError as e: + print(f"[Config] Error writing config file (IOError): {e}") + except Exception as e: + print(f"[Config] An unexpected error occurred saving config: {e}") + import traceback + + traceback.print_exc() + return False diff --git a/src/MaiGoi/flet_interest_monitor.py b/src/MaiGoi/flet_interest_monitor.py new file mode 100644 index 000000000..04823b221 --- /dev/null +++ b/src/MaiGoi/flet_interest_monitor.py @@ -0,0 +1,765 @@ +import flet as ft +import asyncio +import os +import json +import time +import traceback +import random +from datetime import datetime +from collections import deque + +# --- 配置 (可以从 launcher.py 传入或在此处定义) --- +LOG_FILE_PATH = os.path.join("logs", "interest", "interest_history.log") +REFRESH_INTERVAL_SECONDS = 1 # 刷新间隔(秒) +MAX_HISTORY_POINTS = 1000 # 图表数据点 (Tkinter version uses 1000) +MAX_STREAMS_TO_DISPLAY = 15 # 最多显示的流数量 (Tkinter version uses 15) +MAX_QUEUE_SIZE = 30 # 历史想法队列最大长度 (Tkinter version uses 30) +CHART_HEIGHT = 250 # 图表区域高度 + + +# --- 辅助函数 --- +def format_timestamp(ts): + """辅助函数:格式化时间戳,处理 None 或无效值""" + if ts is None: + return "N/A" + try: + # 假设 ts 是 float 类型的时间戳 + dt_object = datetime.fromtimestamp(float(ts)) + return dt_object.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + return "Invalid Time" + + +def get_random_flet_color(): + """生成一个随机的 Flet 颜色字符串。""" + r = random.randint(50, 200) + g = random.randint(50, 200) + b = random.randint(50, 200) + return f"#{r:02x}{g:02x}{b:02x}" + + +class InterestMonitorDisplay(ft.Column): + """一个 Flet 控件,用于显示兴趣监控图表和信息。""" + + def __init__(self): + super().__init__( + expand=True, + ) + # --- 状态变量 --- + self.log_reader_task = None + self.stream_history = {} # {stream_id: deque([(ts, interest), ...])} + self.probability_history = {} # {stream_id: deque([(ts, probability), ...])} + self.stream_display_names = {} # {stream_id: display_name} + self.stream_colors = {} # {stream_id: color_string} + self.selected_stream_id_for_details = None + self.last_log_read_time = 0 # 上次读取日志的时间戳 + + # --- 新增:存储其他参数 --- + # 顶层信息 (直接使用 Text 控件引用) + self.global_mai_state_text = ft.Text("状态: N/A", size=11) + self.global_main_mind_text = ft.Text( + "想法: N/A", size=11, overflow=ft.TextOverflow.ELLIPSIS, tooltip="完整想法请查看历史记录" + ) + self.global_subflow_count_text = ft.Text("子流数: 0", size=11) + # 子流最新状态 (key: stream_id) + self.stream_sub_minds = {} + self.stream_chat_states = {} + self.stream_threshold_status = {} + self.stream_last_active = {} + # self.stream_last_interaction = {} # Tkinter 有但日志似乎没有? + + # 新增:历史想法队列 + self.main_mind_history = deque(maxlen=MAX_QUEUE_SIZE) + self.last_main_mind_timestamp = 0 + + # --- UI 控件引用 --- + self.status_text = ft.Text("正在初始化监控器...", size=10, color=ft.colors.SECONDARY) + + # --- 全局信息 Row --- + self.global_info_row = ft.Row( + controls=[ + self.global_mai_state_text, + self.global_main_mind_text, + self.global_subflow_count_text, + ], + spacing=15, + wrap=False, # 防止换行 + ) + + # --- 图表控件 --- + self.main_chart = ft.LineChart(height=CHART_HEIGHT, expand=True) + # --- 新增:图例 Column --- + self.legend_column = ft.Column( + controls=[], + width=150, # 给图例固定宽度 + scroll=ft.ScrollMode.ADAPTIVE, # 如果图例过多则滚动 + spacing=2, + ) + + self.stream_dropdown = ft.Dropdown( + label="选择流查看详情", options=[], width=300, on_change=self.on_stream_selected + ) + self.detail_chart_interest = ft.LineChart(height=CHART_HEIGHT) + self.detail_chart_probability = ft.LineChart(height=CHART_HEIGHT) + + # --- 单个流详情文本控件 (Column) --- + self.detail_texts = ft.Column( + [ + # --- 添加新的 Text 控件来显示详情 --- + ft.Text("想法: N/A", size=11, no_wrap=True, overflow=ft.TextOverflow.ELLIPSIS), # index 0: sub_mind + ft.Text("状态: N/A", size=11), # index 1: chat_state + ft.Text("阈值以上: N/A", size=11), # index 2: threshold + ft.Text("最后活跃: N/A", size=11), # index 3: last_active + # ft.Text("最后交互: N/A", size=11), # 如果需要的话 + ], + spacing=2, + ) + + # --- 历史想法 ListView --- + self.mind_history_listview = ft.ListView(expand=True, spacing=5, padding=5) + + # --- 构建整体布局 --- + # 创建 Tabs 控件 (现在是 Column 的直接子控件) + self.tabs_control = ft.Tabs( + selected_index=0, + animation_duration=300, + tabs=[ + ft.Tab( + text="所有流兴趣度", + content=ft.Row( + controls=[ + self.main_chart, # 图表在左侧 + self.legend_column, # 图例在右侧 + ], + vertical_alignment=ft.CrossAxisAlignment.START, + expand=True, # 让 Row 扩展 + ), + ), + ft.Tab( + text="单个流详情", + content=ft.Column( + [ + self.stream_dropdown, + ft.Divider(height=5, color=ft.colors.TRANSPARENT), + self.detail_texts, # 显示文本信息的 Column + ft.Divider(height=10, color=ft.colors.TRANSPARENT), + ft.Row( + [ + ft.Column( + [ft.Text("兴趣度", weight=ft.FontWeight.BOLD), self.detail_chart_interest], + expand=1, + ), + ft.Column( + [ft.Text("HFC概率", weight=ft.FontWeight.BOLD), self.detail_chart_probability], + expand=1, + ), + ], + ), + ], + scroll=ft.ScrollMode.ADAPTIVE, # 自适应滚动 + ), + ), + # --- 添加历史想法 Tab --- + ft.Tab( + text="麦麦历史想法", + content=self.mind_history_listview, # 直接使用 ListView + ), + ], + expand=True, # 让 Tabs 在父 Column 中扩展 + ) + + self.controls = [ + self.status_text, + self.global_info_row, # 添加全局信息行 + # --- Tabs 直接放在 Column 里 --- # + self.tabs_control, + ] + + print("[InterestMonitor] 初始化完成") + + def did_mount(self): + print("[InterestMonitor] 控件已挂载,启动日志读取任务") + if self.page: + # --- 首次加载历史想法 (可以在这里或 log_reader_loop 首次运行时加载) --- + # self.page.run_task(self.load_and_process_log, initial_load=True) # 传递标志? + self.log_reader_task = self.page.run_task(self.log_reader_loop) + # self.page.run_task(self.update_charts) # update_charts 会在 loop 中调用 + else: + print("[InterestMonitor] 错误: 无法访问 self.page 来启动后台任务") + + def will_unmount(self): + print("[InterestMonitor] 控件将卸载,取消日志读取任务") + if self.log_reader_task: + self.log_reader_task.cancel() + print("[InterestMonitor] 日志读取任务已取消 (will_unmount)") + + async def log_reader_loop(self): + while True: + try: + mind_history_updated = await self.load_and_process_log() + await self.update_charts() + if mind_history_updated: + await self.refresh_mind_listview() + except asyncio.CancelledError: + print("[InterestMonitor] 日志读取循环被取消") + break + except Exception as e: + print(f"[InterestMonitor] 日志读取循环出错: {e}") + traceback.print_exc() + self.update_status(f"日志读取错误: {e}", ft.colors.ERROR) + + await asyncio.sleep(REFRESH_INTERVAL_SECONDS) + + async def load_and_process_log(self): + """读取并处理日志文件的新增内容。返回是否有新的 Main Mind 记录。""" + mind_history_updated = False # 跟踪是否有新的 Main Mind + if not os.path.exists(LOG_FILE_PATH): + self.update_status("日志文件未找到", ft.colors.WARNING) + return mind_history_updated + + try: + file_mod_time = os.path.getmtime(LOG_FILE_PATH) + if file_mod_time <= self.last_log_read_time: + return mind_history_updated + + print(f"[InterestMonitor] 检测到日志文件更新 (修改时间: {file_mod_time}), 正在读取...", flush=True) + + new_stream_history = {} + new_probability_history = {} + new_stream_display_names = {} + # 清理旧的子流状态,因为每次都重新读取文件 + self.stream_sub_minds.clear() + self.stream_chat_states.clear() + self.stream_threshold_status.clear() + self.stream_last_active.clear() + # self.stream_last_interaction.clear() + + read_count = 0 + error_count = 0 + # 注意:Flet 版本目前没有实现时间过滤,会读取整个文件 + + with open(LOG_FILE_PATH, "r", encoding="utf-8") as f: + for line in f: + read_count += 1 + try: + log_entry = json.loads(line.strip()) + if not isinstance(log_entry, dict): + continue + + entry_timestamp = log_entry.get("timestamp") + if entry_timestamp is None: + continue + + # --- 处理主兴趣流 --- # + stream_id = log_entry.get("stream_id") + interest = log_entry.get("interest") + probability = log_entry.get("probability") # 新增:获取概率 + + if stream_id is not None and interest is not None: + try: + interest_float = float(interest) + if stream_id not in new_stream_history: + new_stream_history[stream_id] = [] + # 避免重复添加相同时间戳的数据点 + if ( + not new_stream_history[stream_id] + or new_stream_history[stream_id][-1][0] < entry_timestamp + ): + new_stream_history[stream_id].append((entry_timestamp, interest_float)) + except (ValueError, TypeError): + pass # 忽略无法转换的值 + + # --- 处理概率 --- # + if stream_id is not None and probability is not None: + try: + prob_float = float(probability) + if stream_id not in new_probability_history: + new_probability_history[stream_id] = [] + if ( + not new_probability_history[stream_id] + or new_probability_history[stream_id][-1][0] < entry_timestamp + ): + new_probability_history[stream_id].append((entry_timestamp, prob_float)) + except (ValueError, TypeError): + pass # 忽略无法转换的值 + + # --- 处理子流 (subflows) --- # + subflows = log_entry.get("subflows") + if not isinstance(subflows, list): + continue + + for subflow_entry in subflows: + stream_id = subflow_entry.get("stream_id") + interest = subflow_entry.get("interest") + group_name = subflow_entry.get("group_name", stream_id) + # 新增: 获取子流概率 + probability = subflow_entry.get("probability") + + if stream_id is None or interest is None: + continue + try: + interest_float = float(interest) + except (ValueError, TypeError): + continue + + if stream_id not in new_stream_history: + new_stream_history[stream_id] = [] + self.stream_details[stream_id] = {"group_name": group_name} # 存储详情 + # 避免重复添加相同时间戳的数据点 + if ( + not new_stream_history[stream_id] + or new_stream_history[stream_id][-1][0] < entry_timestamp + ): + new_stream_history[stream_id].append((entry_timestamp, interest_float)) + + # --- 处理子流概率 --- # + if probability is not None: + try: + prob_float = float(probability) + if stream_id not in new_probability_history: + new_probability_history[stream_id] = [] + if ( + not new_probability_history[stream_id] + or new_probability_history[stream_id][-1][0] < entry_timestamp + ): + new_probability_history[stream_id].append((entry_timestamp, prob_float)) + except (ValueError, TypeError): + pass # 忽略无法转换的值 + + # --- 存储其他子流详情 (最新的会覆盖旧的) --- + self.stream_sub_minds[stream_id] = subflow_entry.get("sub_mind", "N/A") + self.stream_chat_states[stream_id] = subflow_entry.get("sub_chat_state", "N/A") + self.stream_threshold_status[stream_id] = subflow_entry.get("is_above_threshold", False) + self.stream_last_active[stream_id] = subflow_entry.get("chat_state_changed_time") + # self.stream_last_interaction[stream_id] = ... # 如果日志中有 + + except json.JSONDecodeError: + error_count += 1 + continue + except Exception as line_err: + print(f"处理日志行时出错: {line_err}") # 打印行级错误 + error_count += 1 + continue + + # 更新状态 + self.stream_history = new_stream_history + self.probability_history = new_probability_history + self.stream_display_names = new_stream_display_names + self.last_log_read_time = file_mod_time + + status_msg = f"日志读取于 {datetime.now().strftime('%H:%M:%S')}. 行数: {read_count}." + if error_count > 0: + status_msg += f" 跳过 {error_count} 无效行." + self.update_status(status_msg, ft.colors.ORANGE) + else: + self.update_status(status_msg, ft.colors.GREEN) + + # 更新全局信息控件 (如果 page 存在) + if self.page: + # TODO: Update global info based on latest log entry? Or aggregate? + # For now, let's update with the last processed entry's data if available + if log_entry: # Check if log_entry was populated + self.global_mai_state_text.value = f"状态: {log_entry.get('mai_state', 'N/A')}" + self.global_main_mind_text.value = f"想法: {log_entry.get('main_mind', 'N/A')}" + self.global_main_mind_text.tooltip = log_entry.get("main_mind", "N/A") + self.global_subflow_count_text.value = f"子流数: {log_entry.get('subflow_count', '0')}" + self.global_info_row.update() # Update the row + + # --- 处理 Main Mind 历史 --- + if "main_mind" in log_entry and entry_timestamp > self.last_main_mind_timestamp: + self.main_mind_history.append(log_entry) + self.last_main_mind_timestamp = entry_timestamp + mind_history_updated = True + + self.global_info_row.update() # 直接调用 update() + + # 更新下拉列表选项 + await self.update_dropdown_options() + + return mind_history_updated # 返回是否有新 mind 记录 + + except IOError as e: + print(f"读取日志文件时发生 IO 错误: {e}") + self.update_status(f"日志 IO 错误: {e}", ft.colors.ERROR) + except Exception as e: + print(f"处理日志时发生意外错误: {e}") + traceback.print_exc() + self.update_status(f"处理日志时出错: {e}", ft.colors.ERROR) + + return mind_history_updated + + async def refresh_mind_listview(self): + """刷新历史想法列表视图。""" + if not self.page: + return # 无法更新 + + self.mind_history_listview.controls.clear() + for entry in self.main_mind_history: + ts = entry.get("timestamp", 0) + dt_str = format_timestamp(ts) + main_mind = entry.get("main_mind", "") + mai_state = entry.get("mai_state", "") + subflow_count = entry.get("subflow_count", "") + # 使用 Markdown 加粗时间,简化显示 + text_content = f"**[{dt_str}]** 状态:{mai_state} 子流:{subflow_count}\n{main_mind}" + self.mind_history_listview.controls.append( + ft.Markdown(text_content, selectable=True, extension_set=ft.MarkdownExtensionSet.COMMON_MARK) + ) + + # print(f"[InterestMonitor] 刷新历史想法列表,共 {len(self.mind_history_listview.controls)} 条") + self.mind_history_listview.update() # 更新 ListView + # 滚动到底部 (如果需要) + # await asyncio.sleep(0.1) # 短暂延迟确保控件更新 + # self.mind_history_listview.scroll_to(offset=-1, duration=300) # 滚动到底部 + # 注意:scroll_to 可能需要在 page 上下文或特定条件下才有效 + + async def update_charts(self): + all_series = [] + legend_items = [] # 存储图例控件 + active_streams_sorted = sorted( + self.stream_history.items(), key=lambda item: item[1][-1][1] if item[1] else -1, reverse=True + )[:MAX_STREAMS_TO_DISPLAY] + + min_ts, max_ts = self.get_time_range(self.stream_history) + + for stream_id, history in active_streams_sorted: + if not history: + continue + try: + mpl_dates = [ts for ts, _ in history] + interests = [interest for _, interest in history] + if not mpl_dates: + continue + + data_points = [ft.LineChartDataPoint(x=ts, y=interest) for ts, interest in zip(mpl_dates, interests)] + all_series.append( + ft.LineChartData( + data_points=data_points, + color=self.stream_colors.get(stream_id, ft.colors.BLACK), + stroke_width=2, + ) + ) + # --- 创建图例项 --- + legend_color = self.stream_colors.get(stream_id, ft.colors.BLACK) + display_name = self.stream_display_names.get(stream_id, stream_id) + legend_items.append( + ft.Row( + controls=[ + ft.Container(width=10, height=10, bgcolor=legend_color, border_radius=2), + ft.Text(display_name, size=10, overflow=ft.TextOverflow.ELLIPSIS), + ], + spacing=5, + alignment=ft.MainAxisAlignment.START, + ) + ) + except Exception as plot_err: + print(f"绘制主图表/图例时跳过 Stream {stream_id}: {plot_err}") + continue + + # --- 更新主图表 --- + self.main_chart.data_series = all_series + self.main_chart.min_y = 0 + self.main_chart.max_y = 10 + self.main_chart.min_x = min_ts + self.main_chart.max_x = max_ts + + # --- 更新图例 --- + self.legend_column.controls = legend_items + + await self.update_detail_charts(self.selected_stream_id_for_details) + + if self.page: + # 更新整个控件,包含图表和图例的更新 + self.update() + + async def update_detail_charts(self, stream_id): + interest_series = [] + probability_series = [] + min_ts_detail, max_ts_detail = None, None + + # --- 兴趣度图 --- + if stream_id and stream_id in self.stream_history and self.stream_history[stream_id]: + min_ts_detail, max_ts_detail = self.get_time_range({stream_id: self.stream_history[stream_id]}) + try: + mpl_dates = [ts for ts, _ in self.stream_history[stream_id]] + interests = [interest for _, interest in self.stream_history[stream_id]] + if mpl_dates: + interest_data_points = [ + ft.LineChartDataPoint(x=ts, y=interest) for ts, interest in zip(mpl_dates, interests) + ] + interest_series.append( + ft.LineChartData( + data_points=interest_data_points, + color=self.stream_colors.get(stream_id, ft.colors.BLUE), + stroke_width=2, + ) + ) + except Exception as plot_err: + print(f"绘制详情兴趣图时出错 Stream {stream_id}: {plot_err}") + + # --- 概率图 --- + if stream_id and stream_id in self.probability_history and self.probability_history[stream_id]: + try: + prob_dates = [ts for ts, _ in self.probability_history[stream_id]] + probabilities = [prob for _, prob in self.probability_history[stream_id]] + if prob_dates: + if min_ts_detail is None: # 如果兴趣图没有数据,单独计算时间范围 + min_ts_detail, max_ts_detail = self.get_time_range( + {stream_id: self.probability_history[stream_id]}, is_prob=True + ) + else: # 合并时间范围 + min_prob_ts, max_prob_ts = self.get_time_range( + {stream_id: self.probability_history[stream_id]}, is_prob=True + ) + if min_prob_ts is not None: + min_ts_detail = min(min_ts_detail, min_prob_ts) + if max_prob_ts is not None: + max_ts_detail = max(max_ts_detail, max_prob_ts) + + probability_data_points = [ + ft.LineChartDataPoint(x=ts, y=prob) for ts, prob in zip(prob_dates, probabilities) + ] + probability_series.append( + ft.LineChartData( + data_points=probability_data_points, + color=self.stream_colors.get(stream_id, ft.colors.GREEN), + stroke_width=2, + ) + ) + except Exception as plot_err: + print(f"绘制详情概率图时出错 Stream {stream_id}: {plot_err}") + + self.detail_chart_interest.data_series = interest_series + self.detail_chart_interest.min_y = 0 + self.detail_chart_interest.max_y = 10 + self.detail_chart_interest.min_x = min_ts_detail + self.detail_chart_interest.max_x = max_ts_detail + + self.detail_chart_probability.data_series = probability_series + self.detail_chart_probability.min_y = 0 + self.detail_chart_probability.max_y = 1.05 + self.detail_chart_probability.min_x = min_ts_detail + self.detail_chart_probability.max_x = max_ts_detail + + await self.update_detail_texts(stream_id) + + async def update_dropdown_options(self): + current_value = self.stream_dropdown.value + options = [] + valid_stream_ids = set() + sorted_items = sorted(self.stream_display_names.items(), key=lambda item: item[1]) + for stream_id, display_name in sorted_items: + if stream_id in self.stream_history and self.stream_history[stream_id]: + # 使用 f"DisplayName (StreamID)" 格式? + option_text = f"{display_name}" + options.append(ft.dropdown.Option(key=stream_id, text=option_text)) + valid_stream_ids.add(stream_id) + + self.stream_dropdown.options = options + # Flet Dropdown 的 value 似乎是用 key (stream_id) 来匹配的 + if current_value not in valid_stream_ids: + # 如果之前的 stream_id 不再有效,尝试选择第一个,否则清空 + new_value = options[0].key if options else None + if self.stream_dropdown.value != new_value: # 避免不必要的更新循环 + self.stream_dropdown.value = new_value + self.selected_stream_id_for_details = new_value + await self.update_detail_charts(new_value) + + if self.page and self.stream_dropdown.page: # 确保控件已挂载再更新 + self.stream_dropdown.update() + + async def on_stream_selected(self, e): + selected_id = e.control.value # value 应该是 stream_id (key) + print(f"[InterestMonitor] 选择了 Stream ID: {selected_id}") + if self.selected_stream_id_for_details != selected_id: + self.selected_stream_id_for_details = selected_id + await self.update_detail_charts(selected_id) + # Dropdown 更新是自动的,但图表和文本需要手动触发父容器更新 + if self.page: + self.update() + + async def update_detail_texts(self, stream_id): + if not self.detail_texts or not hasattr(self.detail_texts, "controls") or len(self.detail_texts.controls) < 4: + print("[InterestMonitor] 错误:detail_texts 未正确初始化或控件不足") + return + + if stream_id: + sub_mind = self.stream_sub_minds.get(stream_id, "N/A") + chat_state = self.stream_chat_states.get(stream_id, "N/A") + threshold = self.stream_threshold_status.get(stream_id, False) + last_active_ts = self.stream_last_active.get(stream_id) + + self.detail_texts.controls[0].value = f"想法: {sub_mind}" + self.detail_texts.controls[0].tooltip = sub_mind # 添加 tooltip + self.detail_texts.controls[1].value = f"状态: {chat_state}" + self.detail_texts.controls[2].value = f"阈值以上: {'是' if threshold else '否'}" + self.detail_texts.controls[3].value = f"最后活跃: {format_timestamp(last_active_ts)}" + # self.detail_texts.controls[4].value = ... # 如果有更多详情 + else: + self.detail_texts.controls[0].value = "想法: N/A" + self.detail_texts.controls[0].tooltip = "N/A" + self.detail_texts.controls[1].value = "状态: N/A" + self.detail_texts.controls[2].value = "阈值以上: N/A" + self.detail_texts.controls[3].value = "最后活跃: N/A" + # self.detail_texts.controls[4].value = "N/A" + + if self.page and self.detail_texts.page: # 确保控件已挂载再更新 + self.detail_texts.update() + + def update_status(self, message: str, color: str = ft.colors.SECONDARY): + max_len = 150 + display_message = (message[:max_len] + "...") if len(message) > max_len else message + self.status_text.value = display_message + self.status_text.color = color + if self.page and self.status_text.page: + self.status_text.update() + + def get_time_range(self, history_dict, is_prob=False): + all_ts = [] + target_history_key = self.probability_history if is_prob else self.stream_history + for stream_id, _history in history_dict.items(): + # 使用正确的历史记录字典 + actual_history = target_history_key.get(stream_id) + if actual_history: + all_ts.extend([ts for ts, _ in actual_history]) + + if not all_ts: + now = time.time() + return now - 3600, now + + min_ts = min(all_ts) + max_ts = max(all_ts) + padding = (max_ts - min_ts) * 0.05 if max_ts > min_ts else 10 + return min_ts - padding, max_ts + padding + + +# --- 测试部分保持不变 --- +if __name__ == "__main__": + # ... (创建测试日志文件代码不变) ... + if not os.path.exists("logs/interest"): + os.makedirs("logs/interest") + test_log_path = LOG_FILE_PATH + with open(test_log_path, "w", encoding="utf-8") as f: + # ... (写入测试数据不变) ... + ts = time.time() + f.write( + json.dumps( + { + "timestamp": ts - 60, + "mai_state": "Idle", + "main_mind": "Start", + "subflow_count": 2, + "subflows": [ + { + "stream_id": "user1", + "group_name": "用户A", + "interest_level": 5, + "start_hfc_probability": 0.1, + "sub_mind": "Thinking about A", + "sub_chat_state": "Active", + "is_above_threshold": False, + "chat_state_changed_time": ts - 65, + }, + { + "stream_id": "user2", + "group_name": "用户B", + "interest_level": 3, + "start_hfc_probability": 0.05, + "sub_mind": "Thinking about B", + "sub_chat_state": "Idle", + "is_above_threshold": False, + "chat_state_changed_time": ts - 70, + }, + ], + } + ) + + "\n" + ) + f.write( + json.dumps( + { + "timestamp": ts - 30, + "mai_state": "Processing", + "main_mind": "Thinking", + "subflow_count": 2, + "subflows": [ + { + "stream_id": "user1", + "group_name": "用户A", + "interest_level": 6, + "start_hfc_probability": 0.2, + "sub_mind": "Processing A's request", + "sub_chat_state": "Active", + "is_above_threshold": True, + "chat_state_changed_time": ts - 65, + }, + { + "stream_id": "user2", + "group_name": "用户B", + "interest_level": 4, + "start_hfc_probability": 0.1, + "sub_mind": "Waiting for B", + "sub_chat_state": "Idle", + "is_above_threshold": False, + "chat_state_changed_time": ts - 70, + }, + ], + } + ) + + "\n" + ) + f.write( + json.dumps( + { + "timestamp": ts, + "mai_state": "Responding", + "main_mind": "Responding to A", + "subflow_count": 2, + "subflows": [ + { + "stream_id": "user1", + "group_name": "用户A", + "interest_level": 7, + "start_hfc_probability": 0.3, + "sub_mind": "Generating response A", + "sub_chat_state": "Active", + "is_above_threshold": True, + "chat_state_changed_time": ts - 65, + }, + { + "stream_id": "user2", + "group_name": "用户B", + "interest_level": 3, + "start_hfc_probability": 0.08, + "sub_mind": "Still waiting B", + "sub_chat_state": "Idle", + "is_above_threshold": False, + "chat_state_changed_time": ts - 70, + }, + ], + } + ) + + "\n" + ) + + async def main(page: ft.Page): + page.title = "Interest Monitor 测试" + page.vertical_alignment = ft.MainAxisAlignment.START + # --- 让窗口适应内容 --- + page.window_width = 800 # 增加宽度 + page.window_height = 650 # 增加高度 + page.padding = 10 # 统一内边距 + + monitor = InterestMonitorDisplay() + # --- 添加外层容器并设置属性 --- + container = ft.Container( + content=monitor, + expand=True, # 让容器扩展 + border=ft.border.all(1, ft.Colors.OUTLINE), + border_radius=ft.border_radius.all(5), + padding=10, + margin=ft.margin.only(top=10), + ) + page.add(container) # 将容器添加到页面 + + ft.app(target=main) diff --git a/src/MaiGoi/process_manager.py b/src/MaiGoi/process_manager.py new file mode 100644 index 000000000..81aa13567 --- /dev/null +++ b/src/MaiGoi/process_manager.py @@ -0,0 +1,595 @@ +import flet as ft +import subprocess +import os +import sys +import platform +import threading +import queue +import traceback +import asyncio +import psutil +from typing import Optional, TYPE_CHECKING, Tuple + +# Import the color parser and AppState/ManagedProcessState +from .color_parser import parse_log_line_to_spans + +if TYPE_CHECKING: + from .state import AppState +from .utils import show_snackbar, update_page_safe # Add import here + +# --- Helper Function to Update Button States (Mostly Unchanged for now) --- # + + +def update_buttons_state(page: Optional[ft.Page], app_state: "AppState", is_running: bool): + """Updates the state (text, icon, color, on_click) of the console button.""" + console_button = app_state.console_action_button + needs_update = False + + # --- Define Button Actions (Point to adapted functions) --- # + # start_action = lambda _: start_bot_and_show_console(page, app_state) if page else None + # stop_action = lambda _: stop_bot_process(page, app_state) if page else None # stop_bot_process now calls stop_managed_process + def _start_action(_): + if page: + start_bot_and_show_console(page, app_state) + + def _stop_action(_): + if page: + stop_bot_process(page, app_state) + + if console_button: + button_text_control = console_button.content if isinstance(console_button.content, ft.Text) else None + if button_text_control: + if is_running: + new_text = "停止 MaiCore" + new_color = ft.colors.with_opacity(0.6, ft.colors.RED_ACCENT_100) + new_onclick = _stop_action # Use def + if ( + button_text_control.value != new_text + or console_button.bgcolor != new_color + or console_button.on_click != new_onclick + ): + button_text_control.value = new_text + console_button.bgcolor = new_color + console_button.on_click = new_onclick + needs_update = True + else: + new_text = "MaiCore 主控室" + new_color = ft.colors.with_opacity(0.6, ft.colors.GREEN_ACCENT_100) + new_onclick = _start_action # Use def + if ( + button_text_control.value != new_text + or console_button.bgcolor != new_color + or console_button.on_click != new_onclick + ): + button_text_control.value = new_text + console_button.bgcolor = new_color + console_button.on_click = new_onclick + needs_update = True + else: + print("[Update Buttons] Warning: console_action_button content is not Text?") + + if needs_update and page: + print(f"[Update Buttons] State changed, triggering page update. is_running={is_running}") + # from .utils import update_page_safe # Moved import to top + page.run_task(update_page_safe, page) + + +# --- Generic Process Termination Helper --- +def _terminate_process_gracefully(process_id: str, handle: Optional[subprocess.Popen], pid: Optional[int]): + """Helper to attempt graceful termination, then kill.""" + stopped_cleanly = False + if handle and pid: + print(f"[_terminate] Attempting termination using handle for PID: {pid} (ID: {process_id})...", flush=True) + try: + if handle.poll() is None: + handle.terminate() + print(f"[_terminate] Sent terminate() to PID: {pid}. Waiting briefly...", flush=True) + try: + handle.wait(timeout=1.0) + print(f"[_terminate] Process PID: {pid} stopped after terminate().", flush=True) + stopped_cleanly = True + except subprocess.TimeoutExpired: + print(f"[_terminate] Terminate timed out for PID: {pid}. Attempting kill()...", flush=True) + try: + handle.kill() + print(f"[_terminate] Sent kill() to PID: {pid}.", flush=True) + except Exception as kill_err: + print(f"[_terminate] Error during kill() for PID: {pid}: {kill_err}", flush=True) + else: + print("[_terminate] Process poll() was not None before terminate (already stopped?).", flush=True) + stopped_cleanly = True # Already stopped + except Exception as e: + print(f"[_terminate] Error during terminate/wait for PID: {pid}: {e}", flush=True) + elif pid: + print( + f"[_terminate] No process handle, attempting psutil fallback for PID: {pid} (ID: {process_id})...", + flush=True, + ) + try: + if psutil.pid_exists(pid): + proc = psutil.Process(pid) + proc.terminate() + try: + proc.wait(timeout=1.0) + stopped_cleanly = True + except psutil.TimeoutExpired: + proc.kill() + print(f"[_terminate] psutil terminated/killed PID {pid}.", flush=True) + else: + print(f"[_terminate] psutil confirms PID {pid} does not exist.", flush=True) + stopped_cleanly = True # Already gone + except Exception as ps_err: + print(f"[_terminate] Error during psutil fallback for PID {pid}: {ps_err}", flush=True) + else: + print(f"[_terminate] Cannot terminate process ID '{process_id}': No handle or PID provided.", flush=True) + stopped_cleanly = True # Nothing to stop + + return stopped_cleanly + + +# --- Process Management Functions (Refactored for Multi-Process) --- # + + +def cleanup_on_exit(app_state: "AppState"): + """Registered with atexit to ensure ALL managed processes are killed on script exit.""" + print("--- [atexit Cleanup] Running cleanup function ---", flush=True) + # Iterate through a copy of the keys to avoid modification issues + process_ids = list(app_state.managed_processes.keys()) + print(f"[atexit Cleanup] Found managed process IDs: {process_ids}", flush=True) + + for process_id in process_ids: + process_state = app_state.managed_processes.get(process_id) + if process_state and process_state.pid: + print(f"[atexit Cleanup] Checking PID: {process_state.pid} for ID: {process_id}...", flush=True) + try: + # Use psutil directly as handles might be invalid in atexit + if psutil.pid_exists(process_state.pid): + print( + f"[atexit Cleanup] PID {process_state.pid} exists. Attempting termination/kill...", flush=True + ) + proc = psutil.Process(process_state.pid) + proc.terminate() + try: + proc.wait(timeout=0.5) + except psutil.TimeoutExpired: + proc.kill() + print( + f"[atexit Cleanup] psutil terminate/kill signal sent for PID {process_state.pid}.", flush=True + ) + else: + print(f"[atexit Cleanup] PID {process_state.pid} does not exist.", flush=True) + except psutil.NoSuchProcess: + print(f"[atexit Cleanup] psutil.NoSuchProcess error checking PID {process_state.pid}.", flush=True) + except Exception as ps_err: + print(f"[atexit Cleanup] Error cleaning up PID {process_state.pid}: {ps_err}", flush=True) + elif process_state: + print(f"[atexit Cleanup] Process ID '{process_id}' has no PID stored.", flush=True) + # else: Process ID might have been removed already + + print("--- [atexit Cleanup] Cleanup function finished ---", flush=True) + + +def handle_disconnect(page: Optional[ft.Page], app_state: "AppState", e): + """Handles UI disconnect. Sets the stop_event for the main bot.py process FOR NOW.""" + # TODO: In a full multi-process model, this might need to signal all running processes or be handled differently. + print(f"--- [Disconnect Event] Triggered! Setting main stop_event. Event data: {e} ---", flush=True) + if not app_state.stop_event.is_set(): # Still uses the old singleton event + app_state.stop_event.set() + print("[Disconnect Event] Main stop_event set. atexit handler will perform final cleanup.", flush=True) + + +# --- New Generic Stop Function --- +def stop_managed_process(process_id: str, page: Optional[ft.Page], app_state: "AppState"): + """Stops a specific managed process by its ID.""" + print(f"[Stop Managed] Request to stop process ID: '{process_id}'", flush=True) + process_state = app_state.managed_processes.get(process_id) + + if not process_state: + print(f"[Stop Managed] Process ID '{process_id}' not found in managed processes.", flush=True) + if page and process_id == "bot.py": # Show snackbar only for the main bot? + # from .utils import show_snackbar; show_snackbar(page, "Bot process not found or already stopped.") # Already imported at top + show_snackbar(page, "Bot process not found or already stopped.") + # If it's the main bot, ensure button state is correct + if process_id == "bot.py": + update_buttons_state(page, app_state, is_running=False) + return + + # Signal the specific stop event for this process + if not process_state.stop_event.is_set(): + print(f"[Stop Managed] Setting stop_event for ID: '{process_id}'", flush=True) + process_state.stop_event.set() + + # Attempt termination + _terminate_process_gracefully(process_id, process_state.process_handle, process_state.pid) + + # Update state in AppState dictionary + process_state.status = "stopped" + process_state.process_handle = None # Clear handle + process_state.pid = None # Clear PID + # Optionally remove the entry from the dictionary entirely? + # del app_state.managed_processes[process_id] + print(f"[Stop Managed] Marked process ID '{process_id}' as stopped in AppState.") + + # Update UI (specifically for the main bot for now) + if process_id == "bot.py": + # If the process being stopped is the main bot, update the console button + update_buttons_state(page, app_state, is_running=False) + # Also clear the old singleton state for compatibility + app_state.clear_process() # This now also updates the dict entry + + # TODO: Add UI update logic for other processes if a management view exists + + +# --- Adapted Old Stop Function (Calls the new generic one) --- +def stop_bot_process(page: Optional[ft.Page], app_state: "AppState"): + """(Called by Button) Stops the main bot.py process by calling stop_managed_process.""" + stop_managed_process("bot.py", page, app_state) + + +# --- Parameterized Reader Thread --- +def read_process_output( + app_state: "AppState", # Still pass app_state for global checks? Or remove? Let's keep for now. + process_handle: Optional[subprocess.Popen] = None, + output_queue: Optional[queue.Queue] = None, + stop_event: Optional[threading.Event] = None, + process_id: str = "bot.py", # ID for logging +): + """ + Background thread function to read raw output from a process and put it into a queue. + Defaults to using AppState singletons if specific handles/queues/events aren't provided. + """ + # Use provided arguments or default to AppState singletons + proc_handle = process_handle if process_handle is not None else app_state.bot_process + proc_queue = output_queue if output_queue is not None else app_state.output_queue + proc_stop_event = stop_event if stop_event is not None else app_state.stop_event + + if not proc_handle or not proc_handle.stdout: + if not proc_stop_event.is_set(): + print(f"[Reader Thread - {process_id}] Error: Process or stdout not available at start.", flush=True) + return + + print(f"[Reader Thread - {process_id}] Started.", flush=True) + try: + for line in iter(proc_handle.stdout.readline, ""): + if proc_stop_event.is_set(): + print(f"[Reader Thread - {process_id}] Stop event detected, exiting.", flush=True) + break + if line: + proc_queue.put(line.strip()) + else: + break # End of stream + except ValueError: + if not proc_stop_event.is_set(): + print(f"[Reader Thread - {process_id}] ValueError likely due to closed stdout.", flush=True) + except Exception as e: + if not proc_stop_event.is_set(): + print(f"[Reader Thread - {process_id}] Error reading output: {e}", flush=True) + finally: + if not proc_stop_event.is_set(): + try: + proc_queue.put(None) # Signal natural end + except Exception as q_err: + print(f"[Reader Thread - {process_id}] Error putting None signal: {q_err}", flush=True) + print(f"[Reader Thread - {process_id}] Finished.", flush=True) + + +# --- Parameterized Processor Loop --- +async def output_processor_loop( + page: Optional[ft.Page], + app_state: "AppState", # Pass AppState for PID checks and potentially global state access + process_id: str = "bot.py", # ID to identify the process and its state + # Defaults use AppState singletons for backward compatibility with bot.py + output_queue: Optional[queue.Queue] = None, + stop_event: Optional[threading.Event] = None, + target_list_view: Optional[ft.ListView] = None, +): + """ + Processes a specific output queue and updates the UI until stop_event is set. + Defaults to using AppState singletons if specific queue/event/view aren't provided. + """ + print(f"[Processor Loop - {process_id}] Started.", flush=True) + proc_queue = output_queue if output_queue is not None else app_state.output_queue + proc_stop_event = stop_event if stop_event is not None else app_state.stop_event + output_lv = target_list_view if target_list_view is not None else app_state.output_list_view + + # from .utils import update_page_safe # Moved to top + + while not proc_stop_event.is_set(): + lines_to_add = [] + process_ended_signal_received = False + + try: + while not proc_queue.empty(): + raw_line = proc_queue.get_nowait() + if raw_line is None: + process_ended_signal_received = True + print(f"[Processor Loop - {process_id}] Process ended signal received from reader.", flush=True) + lines_to_add.append(ft.Text(f"--- Process '{process_id}' Finished --- ", italic=True)) + break + else: + spans = parse_log_line_to_spans(raw_line) + lines_to_add.append(ft.Text(spans=spans, selectable=True, size=12)) + except queue.Empty: + pass + + if lines_to_add: + if proc_stop_event.is_set(): + break + + if output_lv: + output_lv.controls.extend(lines_to_add) + while len(output_lv.controls) > 1000: + output_lv.controls.pop(0) # Limit lines + if output_lv.visible and page: + try: + await update_page_safe(page) + except Exception: + pass + # else: print(f"[Processor Loop - {process_id}] Warning: target_list_view is None...") + + if process_ended_signal_received: + print( + f"[Processor Loop - {process_id}] Process ended naturally. Setting stop event and cleaning up.", + flush=True, + ) + if not proc_stop_event.is_set(): + proc_stop_event.set() + # Update the specific process state in the dictionary + proc_state = app_state.managed_processes.get(process_id) + if proc_state: + proc_state.status = "stopped" + proc_state.process_handle = None + proc_state.pid = None + # If it's the main bot, also update the old state and buttons + if process_id == "bot.py": + app_state.clear_process() # Clears old state and marks new as stopped + update_buttons_state(page, app_state, is_running=False) + break + + # Check if the specific process died unexpectedly using its PID from managed_processes + current_proc_state = app_state.managed_processes.get(process_id) + current_pid = current_proc_state.pid if current_proc_state else None + + if current_pid is not None and not psutil.pid_exists(current_pid) and not proc_stop_event.is_set(): + print( + f"[Processor Loop - {process_id}] Process PID {current_pid} ended unexpectedly. Setting stop event.", + flush=True, + ) + proc_stop_event.set() + if current_proc_state: # Update state + current_proc_state.status = "stopped" + current_proc_state.process_handle = None + current_proc_state.pid = None + # Add message to its specific output view + if output_lv: + output_lv.controls.append(ft.Text(f"--- Process '{process_id}' Ended Unexpectedly ---", italic=True)) + if page and output_lv.visible: + try: + await update_page_safe(page) + except Exception: + pass + # If it's the main bot, update buttons and old state + if process_id == "bot.py": + app_state.clear_process() + update_buttons_state(page, app_state, is_running=False) + break + + try: + await asyncio.sleep(0.2) + except asyncio.CancelledError: + print(f"[Processor Loop - {process_id}] Cancelled during sleep.", flush=True) + if not proc_stop_event.is_set(): + proc_stop_event.set() + break + + print(f"[Processor Loop - {process_id}] Exited.", flush=True) + + +# --- New Generic Start Function --- +def start_managed_process( + script_path: str, + display_name: str, + page: ft.Page, + app_state: "AppState", + # target_list_view: Optional[ft.ListView] = None # Removed parameter +) -> Tuple[bool, Optional[str]]: + """ + Starts a managed background process, creates its state, and starts reader/processor. + Returns (success: bool, message: Optional[str]) + """ + # from .utils import show_snackbar # Dynamic import - Already imported at top + from .state import ManagedProcessState # Dynamic import + + process_id = script_path # Use script path as ID for now, ensure uniqueness later if needed + + # Prevent duplicate starts if ID already exists and is running + existing_state = app_state.managed_processes.get(process_id) + if ( + existing_state + and existing_state.status == "running" + and existing_state.pid + and psutil.pid_exists(existing_state.pid) + ): + msg = f"Process '{display_name}' (ID: {process_id}) is already running." + print(f"[Start Managed] {msg}", flush=True) + # show_snackbar(page, msg) # Maybe too noisy? + return False, msg + + full_path = os.path.join(app_state.script_dir, script_path) + if not os.path.exists(full_path): + msg = f"Error: Script file not found {script_path}" + print(f"[Start Managed] {msg}", flush=True) + show_snackbar(page, msg, error=True) + return False, msg + + print(f"[Start Managed] Preparing to start NEW process: {display_name} ({script_path})", flush=True) + + # Create NEW state object for this process with its OWN queue and event + # UNLESS it's bot.py, in which case we still use the old singletons for now + is_main_bot = script_path == "bot.py" + new_queue = app_state.output_queue if is_main_bot else queue.Queue() + new_event = app_state.stop_event if is_main_bot else threading.Event() + + new_process_state = ManagedProcessState( + process_id=process_id, + script_path=script_path, + display_name=display_name, + output_queue=new_queue, + stop_event=new_event, + status="starting", + ) + # Add to managed processes *before* starting + app_state.managed_processes[process_id] = new_process_state + + # --- Create and store ListView if not main bot --- # + output_lv: Optional[ft.ListView] = None + if is_main_bot: + output_lv = app_state.output_list_view # Use the main console view + else: + # Create and store a new ListView for this specific process + output_lv = ft.ListView(expand=True, spacing=2, padding=5, auto_scroll=True) # Default auto_scroll on + new_process_state.output_list_view = output_lv + + # Add starting message to the determined ListView + if output_lv: + output_lv.controls.append(ft.Text(f"--- Starting {display_name} --- ", italic=True)) + else: # Should not happen if is_main_bot or created above + print(f"[Start Managed - {process_id}] Error: Could not determine target ListView.") + + try: + print(f"[Start Managed - {process_id}] Starting subprocess: {full_path}", flush=True) + sub_env = os.environ.copy() + # Set env vars if needed (e.g., for colorization) + sub_env["LOGURU_COLORIZE"] = "True" # Tell Loguru (inside bot.py) to colorize + sub_env["FORCE_COLOR"] = "1" # Force libraries that check this to use color + sub_env["SIMPLE_OUTPUT"] = "True" # Custom flag for bot.py to use simple format + print( + f"[Start Managed - {process_id}] Subprocess environment set: COLORIZE={sub_env.get('LOGURU_COLORIZE')}, FORCE_COLOR={sub_env.get('FORCE_COLOR')}, SIMPLE_OUTPUT={sub_env.get('SIMPLE_OUTPUT')}", + flush=True, + ) + + process = subprocess.Popen( + [sys.executable, "-u", full_path], # -u for unbuffered output + cwd=app_state.script_dir, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + errors="replace", + bufsize=1, + creationflags=subprocess.CREATE_NO_WINDOW if platform.system() == "Windows" else 0, + env=sub_env, + ) + + # Update the state with handle and PID + new_process_state.process_handle = process + new_process_state.pid = process.pid + new_process_state.status = "running" + print(f"[Start Managed - {process_id}] Subprocess started. PID: {process.pid}", flush=True) + + # If it's the main bot, also update the old state vars for compatibility + if is_main_bot: + app_state.bot_process = process + app_state.bot_pid = process.pid + update_buttons_state(page, app_state, is_running=True) + + # Start the PARAMETERIZED reader thread + output_thread = threading.Thread( + target=read_process_output, + args=(app_state, process, new_queue, new_event, process_id), # Pass specific objects + daemon=True, + ) + output_thread.start() + print(f"[Start Managed - {process_id}] Output reader thread started.", flush=True) + + # Start the PARAMETERIZED processor loop task + # Pass the determined output_lv (either main console or the new one) + page.run_task(output_processor_loop, page, app_state, process_id, new_queue, new_event, output_lv) + print(f"[Start Managed - {process_id}] Output processor loop scheduled.", flush=True) + + return True, f"Process '{display_name}' started successfully." + + except Exception as e: + print(f"[Start Managed - {process_id}] Error during startup:", flush=True) + traceback.print_exc() + # Clean up state if startup failed + new_process_state.status = "error" + new_process_state.process_handle = None + new_process_state.pid = None + if process_id in app_state.managed_processes: # Might be redundant check + app_state.managed_processes[process_id].status = "error" + + if is_main_bot: # Update UI/state for main bot failure + app_state.clear_process() + update_buttons_state(page, app_state, is_running=False) + + error_message = str(e) if str(e) else repr(e) + show_snackbar(page, f"Error running {script_path}: {error_message}", error=True) + return False, f"Error starting process '{display_name}': {error_message}" + + +# --- Adapted Old Start Function (Calls the new generic one for bot.py) --- +def start_bot_and_show_console(page: ft.Page, app_state: "AppState"): + """Starts bot.py or navigates to its console view, managing state via AppState.""" + script_path_relative = "bot.py" + display_name = "MaiCore" + # from .utils import show_snackbar, update_page_safe # Dynamic imports - Already imported at top + + # Check running status using OLD state for now + is_running = app_state.bot_pid is not None and psutil.pid_exists(app_state.bot_pid) + print( + f"[Start Bot Click] Current state: is_running={is_running} (PID={app_state.bot_pid}), stop_event={app_state.stop_event.is_set()}", + flush=True, + ) + + if is_running: + print("[Start Bot Click] Process is running. Navigating to console.", flush=True) + show_snackbar(page, "Bot process is already running, showing console.") + # Ensure processor loop is running (it uses the singleton stop_event) + if app_state.stop_event.is_set(): + print("[Start Bot Click] Stop event was set, clearing and restarting processor loop.", flush=True) + app_state.stop_event.clear() + # Start the processor loop using defaults (targets main console view) + page.run_task(output_processor_loop, page, app_state) + + if page.route != "/console": + page.go("/console") + else: + page.run_task(update_page_safe, page) + return + + # --- Start the bot process --- + print("[Start Bot Click] Process not running. Starting new process via start_managed_process.", flush=True) + + # Clear and setup OLD ListView from state (used by default processor loop) + if app_state.output_list_view: + app_state.output_list_view.controls.clear() + app_state.output_list_view.auto_scroll = app_state.is_auto_scroll_enabled + print("[Start Bot Click] Cleared console history.", flush=True) + else: + app_state.output_list_view = ft.ListView( + expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5 + ) + print( + f"[Start Bot Click] Created new ListView with auto_scroll={app_state.is_auto_scroll_enabled}.", flush=True + ) + + # Reset OLD state (clears queue, event) - this also resets the managed state entry + app_state.reset_process_state() + + # Call the generic start function, targeting the main console list view + # This will use the OLD singleton queue/event because script_path == "bot.py" + # and start the default (non-parameterized call) reader/processor + # The call below now implicitly passes app_state.output_list_view because is_main_bot=True inside start_managed_process + success, message = start_managed_process( + script_path=script_path_relative, + display_name=display_name, + page=page, + app_state=app_state, + # target_list_view=app_state.output_list_view # Removed parameter + ) + + if success: + # Navigate to console view + page.go("/console") + # else: Error message already shown by start_managed_process diff --git a/src/MaiGoi/state.py b/src/MaiGoi/state.py new file mode 100644 index 000000000..2accc735b --- /dev/null +++ b/src/MaiGoi/state.py @@ -0,0 +1,138 @@ +import flet as ft +import subprocess +import queue +import threading +from typing import Optional, List, Dict, Any +from dataclasses import dataclass, field + +# 从 flet_interest_monitor 导入,如果需要类型提示 +from .flet_interest_monitor import InterestMonitorDisplay + + +@dataclass +class ManagedProcessState: + """Holds the state for a single managed background process.""" + + process_id: str # Unique identifier (e.g., script path or UUID) + script_path: str + display_name: str + process_handle: Optional[subprocess.Popen] = None + pid: Optional[int] = None + output_queue: queue.Queue = field(default_factory=queue.Queue) + stop_event: threading.Event = field(default_factory=threading.Event) + status: str = "stopped" # e.g., "running", "stopped", "error" + # Store UI references if needed later, e.g., for dedicated output views + # output_view_controls: Optional[List[ft.Control]] = None + output_list_view: Optional[ft.ListView] = None # Added to hold the specific ListView for this process + + +class AppState: + """Holds the shared state of the launcher application.""" + + def __init__(self): + # Process related state + self.bot_process: Optional[subprocess.Popen] = None + self.bot_pid: Optional[int] = None + self.output_queue: queue.Queue = queue.Queue() + self.stop_event: threading.Event = threading.Event() + + # UI related state + self.output_list_view: Optional[ft.ListView] = None + self.start_bot_button: Optional[ft.FilledButton] = None + self.console_action_button: Optional[ft.ElevatedButton] = None + self.is_auto_scroll_enabled: bool = False + self.interest_monitor_control: Optional[InterestMonitorDisplay] = None + + # Script directory (useful for paths) + self.script_dir: str = "" # Will be set during initialization in launcher.py + + # --- Configuration State --- # + self.gui_config: Dict[str, Any] = {} # Loaded from gui_config.toml + self.adapter_paths: List[str] = [] # Specific list of adapter paths from config + + # --- Process Management State (NEW - For multi-process support) --- # + self.managed_processes: Dict[str, ManagedProcessState] = {} + + def reset_process_state(self): + """Resets variables related to the bot process.""" + print("[AppState] Resetting process state.", flush=True) + self.bot_process = None + self.bot_pid = None + # Clear the queue? Maybe not, might lose messages if reset mid-operation + # while not self.output_queue.empty(): + # try: self.output_queue.get_nowait() + # except queue.Empty: break + self.stop_event.clear() # Ensure stop event is cleared + + # --- Reset corresponding NEW state (if exists) --- + process_id = "bot.py" + if process_id in self.managed_processes: + # Ensure the managed state reflects the reset event/queue + # (Since they point to the same objects for now, this is redundant but good practice) + self.managed_processes[process_id].stop_event = self.stop_event + self.managed_processes[process_id].output_queue = self.output_queue + self.managed_processes[process_id].status = "stopped" # Ensure status is reset before start + print(f"[AppState] Reset NEW managed state event/queue pointers and status for ID: '{process_id}'.") + + def set_process(self, process: subprocess.Popen, script_path: str = "bot.py", display_name: str = "MaiCore"): + """ + Sets the process handle and PID. + Also updates the new managed_processes dictionary for compatibility. + """ + # --- Update OLD state --- + self.bot_process = process + self.bot_pid = process.pid + # Reset stop event for the new process run + self.stop_event.clear() + # NOTE: We keep the OLD output_queue and stop_event separate for now, + # as the current reader/processor loops use them directly. + # In the future, the reader/processor will use the queue/event + # from the ManagedProcessState object. + + # --- Update NEW state --- + process_id = script_path # Use script_path as ID for now + new_process_state = ManagedProcessState( + process_id=process_id, + script_path=script_path, + display_name=display_name, + process_handle=process, + pid=process.pid, + # IMPORTANT: For now, use the *old* queue/event for the bot.py entry + # to keep existing reader/processor working without immediate changes. + # A true multi-process implementation would give each process its own. + output_queue=self.output_queue, + stop_event=self.stop_event, + status="running", + ) + self.managed_processes[process_id] = new_process_state + print( + f"[AppState] Set OLD process state (PID: {self.bot_pid}) and added/updated NEW managed state for ID: '{process_id}'" + ) + + def clear_process(self): + """ + Clears the process handle and PID. + Also updates the status in the new managed_processes dictionary. + """ + old_pid = self.bot_pid + process_id = "bot.py" # Assuming clear is for the main bot process + + # --- Clear OLD state --- + self.bot_process = None + self.bot_pid = None + # Don't clear stop_event here, it should be set to signal stopping. + # Don't clear output_queue, might still contain final messages. + + # --- Update NEW state --- + if process_id in self.managed_processes: + self.managed_processes[process_id].process_handle = None + self.managed_processes[process_id].pid = None + self.managed_processes[process_id].status = "stopped" + # Keep queue and event references for now + print( + f"[AppState] Cleared OLD process state (was PID: {old_pid}) and marked NEW managed state for ID: '{process_id}' as stopped." + ) + else: + print( + f"[AppState] Cleared OLD process state (was PID: {old_pid}). No corresponding NEW state found for ID: '{process_id}'." + ) diff --git a/src/MaiGoi/ui_views.py b/src/MaiGoi/ui_views.py new file mode 100644 index 000000000..90bbbdcb0 --- /dev/null +++ b/src/MaiGoi/ui_views.py @@ -0,0 +1,745 @@ +import flet as ft +from typing import Optional, TYPE_CHECKING +import psutil +import os + +# Import components and state +from .flet_interest_monitor import InterestMonitorDisplay + +if TYPE_CHECKING: + from .state import AppState + + +def create_main_view(page: ft.Page, app_state: "AppState") -> ft.View: + """Creates the main view ('/') of the application.""" + # Get the main button from state (should be created in launcher.py main) + start_button = app_state.start_bot_button + if not start_button: + print("[Main View] Error: start_bot_button not initialized in state! Creating placeholder.") + start_button = ft.FilledButton("Error - Reload App") + app_state.start_bot_button = start_button # Store placeholder back just in case + + from .utils import run_script # Dynamic import to avoid cycles + + # --- Card Styling --- # + card_shadow = ft.BoxShadow( + spread_radius=1, + blur_radius=10, # Slightly more blur for frosted effect + color=ft.colors.with_opacity(0.2, ft.colors.BLACK87), + offset=ft.Offset(1, 2), + ) + # card_border = ft.border.all(1, ft.colors.with_opacity(0.5, ft.colors.SECONDARY)) # Optional: Remove border for cleaner glass look + card_radius = ft.border_radius.all(4) # Slightly softer edges for glass + # card_bgcolor = ft.colors.with_opacity(0.05, ft.colors.BLUE_GREY_50) # Subtle background + # Use a semi-transparent primary color for the frosted glass effect + card_bgcolor = ft.colors.with_opacity(0.65, ft.colors.PRIMARY_CONTAINER) # Example: using theme container color + + # --- Card Creation Function --- # + def create_action_card(icon: str, text: str, on_click_handler, tooltip: str = None): + # Removed icon parameter usage + return ft.Container( + content=ft.Row( + [ + # ft.Icon(name=icon, color=ft.colors.PRIMARY, size=20), # Icon Removed + ft.Text( + text, + weight=ft.FontWeight.BOLD, # Bolder text + size=20, # Even larger font size + expand=True, # Allow text to take available space + text_align=ft.TextAlign.CENTER, # Center text within the row + ), + ], + # alignment=ft.MainAxisAlignment.START, # Row alignment doesn't matter much with only text + vertical_alignment=ft.CrossAxisAlignment.CENTER, + # spacing=15, # Spacing removed as icon is gone + ), + width=300, # *** Explicitly set a fixed width for the card *** + # min_height=80, # Increase minimum height (Container doesn't have min_height) + border_radius=card_radius, + # border=card_border, # Border removed + bgcolor=card_bgcolor, + padding=ft.padding.symmetric(vertical=25, horizontal=20), # Further increase vertical padding for height + margin=ft.margin.only(bottom=20), # Increased bottom margin for more spacing + shadow=card_shadow, + on_click=on_click_handler, + tooltip=tooltip, + ink=True, # Add ripple effect on click + ) + + # --- Main Button Action --- # + # Need process_manager for the main button action + start_bot_card = create_action_card( + icon=ft.icons.SMART_TOY_OUTLINED, + text="启动麦麦Core", + on_click_handler=lambda _: page.go("/console"), + tooltip="打开 Bot 控制台视图 (在此启动 Bot)", + ) + # Note: We are not using app_state.start_bot_button directly here anymore + # The button state update logic in process_manager might need adjustment + # if we want this card's appearance to change (e.g., text to "返回控制台"). + # For now, it will always show "启动". + + # --- Define Popup Menu Items --- # + menu_items = [ + # ft.PopupMenuItem( + # text="麦麦学习", + # on_click=lambda _: run_script("start_lpmm.bat", page, app_state), + # ), + ft.PopupMenuItem( + text="人格生成(测试版)", + on_click=lambda _: run_script("start_personality.bat", page, app_state), + ), + # Add more items here if needed in the future + ] + + # --- Create "More..." Card Separately for Stack --- # + more_options_card_stack = ft.Container( + content=ft.Row( + [ + ft.Text( + "更多...", # Renamed text + weight=ft.FontWeight.BOLD, + size=14, # Smaller size for less emphasis + # expand=True, + text_align=ft.TextAlign.LEFT, + ), + ft.PopupMenuButton(items=menu_items, icon=ft.icons.MORE_VERT, tooltip="选择要运行的脚本"), + ], + vertical_alignment=ft.CrossAxisAlignment.CENTER, + spacing=5, # Reduced spacing + alignment=ft.MainAxisAlignment.END, # Align content to the end (right) of the row + ), + # width=150, # Reduced width + border_radius=card_radius, + bgcolor=card_bgcolor, + padding=ft.padding.symmetric(vertical=10, horizontal=15), # Reduced padding + # margin=ft.margin.only(bottom=20), # Margin handled by Stack positioning + shadow=card_shadow, + ) + + # --- Main Column of Cards --- # + main_cards_column = ft.Column( + controls=[ + ft.Container(height=15), # Top spacing + # start_button, # Removed direct button + start_bot_card, # Use the card + # --- Move Adapters Card Up --- # + create_action_card( + icon=ft.icons.EXTENSION_OUTLINED, # Example icon + text="启动适配器...", + on_click_handler=lambda _: page.go("/adapters"), + tooltip="管理和运行适配器脚本", + ), + # Re-add the LPMM script card + create_action_card( + icon=ft.icons.MODEL_TRAINING_OUTLINED, # Icon is not used visually but kept for consistency maybe + text="麦麦学习", + on_click_handler=lambda _: run_script("start_lpmm.bat", page, app_state), + tooltip="运行学习脚本 (start_lpmm.bat)", + ), + # more_options_card, # Add the new card with the popup menu (Moved to Stack) + # --- Add Adapters and Settings Cards --- # + create_action_card( + icon=ft.icons.SETTINGS_OUTLINED, # Example icon + text="设置", + on_click_handler=lambda _: page.go("/settings"), + tooltip="配置启动器选项", + ), + ], + # alignment=ft.MainAxisAlignment.START, # Default vertical alignment is START + horizontal_alignment=ft.CrossAxisAlignment.START, # Align cards to the START (left) + spacing=0, # Let card margin handle spacing + # expand=True, # Remove expand from the inner column if using Stack + ) + + return ft.View( + "/", # Main view route + [ + ft.AppBar( + # title=ft.Text("MaiBot 工具箱", size=18, weight=ft.FontWeight.W_600), # Larger, bolder title + # Use leading for custom title layout with a line + leading=ft.Row( + [ + ft.Container(width=4, height=28, bgcolor=ft.colors.PRIMARY, border_radius=2), # Vertical line + ft.Container(width=5), # Use a Container for simple horizontal spacing + ft.Text("MaiBot 工具箱", size=22, weight=ft.FontWeight.BOLD), # Larger title + ], + spacing=5, # Spacing within the row + vertical_alignment=ft.CrossAxisAlignment.CENTER, + ), + leading_width=300, # Adjust width to fit the custom leading widget + center_title=False, # Left-align title + ), + # --- Use Stack for Layout --- # + ft.Stack( + [ + # Main column of cards (aligned top-left implicitly) + main_cards_column, + # "More..." card aligned bottom-right + ft.Container( + content=more_options_card_stack, + # Use Stack positioning properties instead of alignment + right=10, # Distance from right edge + bottom=10, # Distance from bottom edge + ), + ], + expand=True, # Make Stack fill the available space + ), + # ft.Column( + # [ + # ft.Container(height=15), # Top spacing + # # start_button, # Removed direct button + # start_bot_card, # Use the card + # # Re-add the LPMM script card + # create_action_card( + # icon=ft.icons.MODEL_TRAINING_OUTLINED, # Icon is not used visually but kept for consistency maybe + # text="麦麦学习", + # on_click_handler=lambda _: run_script("start_lpmm.bat", page, app_state), + # tooltip="运行学习脚本 (start_lpmm.bat)" + # ), + # # more_options_card, # Add the new card with the popup menu (Moved to Stack) + # # --- Add Adapters and Settings Cards --- # + # create_action_card( + # icon=ft.icons.EXTENSION_OUTLINED, # Example icon + # text="适配器...", + # on_click_handler=lambda _: page.go("/adapters"), + # tooltip="管理和运行适配器脚本" + # ), + # create_action_card( + # icon=ft.icons.SETTINGS_OUTLINED, # Example icon + # text="设置", + # on_click_handler=lambda _: page.go("/settings"), + # tooltip="配置启动器选项" + # ), + # ], + # # alignment=ft.MainAxisAlignment.START, # Default vertical alignment is START + # horizontal_alignment=ft.CrossAxisAlignment.START, # Align cards to the START (left) + # spacing=0, # Let card margin handle spacing + # expand=True, + # ) + ], + padding=ft.padding.symmetric(horizontal=15), # Add horizontal padding to the view + scroll=ft.ScrollMode.ADAPTIVE, # Allow scrolling if content overflows + ) + + +def create_console_view(page: ft.Page, app_state: "AppState") -> ft.View: + """Creates the console output view ('/console'), including the interest monitor.""" + # Get UI elements from state + output_list_view = app_state.output_list_view + # start_button = app_state.start_bot_button # Variable is assigned but never used + from .process_manager import update_buttons_state # Dynamic import + + # Create ListView if it doesn't exist (as a fallback, should be created by start_bot) + if not output_list_view: + output_list_view = ft.ListView(expand=True, spacing=2, auto_scroll=app_state.is_auto_scroll_enabled, padding=5) + app_state.output_list_view = output_list_view # Store back to state + print("[Create Console View] Fallback: Created ListView.") + + # --- Create or get InterestMonitorDisplay instance --- # + # Ensure the same instance is used if the view is recreated + if app_state.interest_monitor_control is None: + print("[Create Console View] Creating InterestMonitorDisplay instance") + app_state.interest_monitor_control = InterestMonitorDisplay() # Store in state + else: + print("[Create Console View] Using existing InterestMonitorDisplay instance from state") + # Optional: Trigger reactivation if needed + # asyncio.create_task(app_state.interest_monitor_control.start_updates_if_needed()) + + interest_monitor = app_state.interest_monitor_control + + # --- Process Manager Functions (Import for button actions) --- + + # --- Auto-scroll toggle button callback (remains separate) --- # + def toggle_auto_scroll(e): + app_state.is_auto_scroll_enabled = not app_state.is_auto_scroll_enabled + lv = app_state.output_list_view # Get potentially updated list view + if lv: + lv.auto_scroll = app_state.is_auto_scroll_enabled + # Update button appearance (assuming button reference is available) + # e.control is the Container now + # We need to update the Text control stored in its data attribute + text_control = e.control.data if isinstance(e.control.data, ft.Text) else None + if text_control: + text_control.value = "自动滚动 开" if app_state.is_auto_scroll_enabled else "自动滚动 关" + else: + print("[toggle_auto_scroll] Warning: Could not find Text control in button data.") + # The icon and tooltip are on the Container itself (though tooltip might be better on Text?) + # e.control.icon = ft.icons.PLAY_ARROW if app_state.is_auto_scroll_enabled else ft.icons.PAUSE # Icon removed + e.control.tooltip = "切换控制台自动滚动" # Tooltip remains useful + print(f"Auto-scroll {'enabled' if app_state.is_auto_scroll_enabled else 'disabled'}.", flush=True) + # Update the container to reflect text changes + # page.run_task(update_page_safe, page) # This updates the whole page + e.control.update() # Try updating only the container first + + # --- Card Styling (Copied from create_main_view for reuse) --- # + card_shadow = ft.BoxShadow( + spread_radius=1, + blur_radius=10, + color=ft.colors.with_opacity(0.2, ft.colors.BLACK87), + offset=ft.Offset(1, 2), + ) + card_radius = ft.border_radius.all(4) + card_bgcolor = ft.colors.with_opacity(0.65, ft.colors.PRIMARY_CONTAINER) + card_padding = ft.padding.symmetric(vertical=8, horizontal=12) # Smaller padding for console buttons + + # --- Create Buttons --- # + # Create the main action button (Start/Stop) as a styled Container + console_action_button_text = ft.Text("...") # Placeholder text, updated by update_buttons_state + console_action_button = ft.Container( + content=console_action_button_text, + bgcolor=card_bgcolor, # Apply style + border_radius=card_radius, + shadow=card_shadow, + padding=card_padding, + ink=True, + # on_click is set by update_buttons_state + ) + app_state.console_action_button = console_action_button # Store container ref + + # Create the auto-scroll toggle button as a styled Container with Text + auto_scroll_text_content = "自动滚动 开" if app_state.is_auto_scroll_enabled else "自动滚动 关" + auto_scroll_text = ft.Text(auto_scroll_text_content, size=12) + toggle_button = ft.Container( + content=auto_scroll_text, + tooltip="切换控制台自动滚动", + on_click=toggle_auto_scroll, # Attach click handler here + bgcolor=card_bgcolor, # Apply style + border_radius=card_radius, + shadow=card_shadow, + padding=card_padding, + ink=True, + # Remove left margin + margin=ft.margin.only(right=10), + ) + # Store the text control inside the toggle button container for updating + toggle_button.data = auto_scroll_text # Store Text reference in data attribute + + # --- 附加信息区 Column (在 View 级别创建) --- + info_top_section = ft.Column( + controls=[ + ft.Text("附加信息 - 上", weight=ft.FontWeight.BOLD), + ft.Divider(), + ft.Text("..."), # 上半部分占位符 + ], + expand=True, # 让上半部分填充可用垂直空间 + scroll=ft.ScrollMode.ADAPTIVE, + ) + info_bottom_section = ft.Column( + controls=[ + ft.Text("附加信息 - 下", weight=ft.FontWeight.BOLD), + ft.Divider(), + ft.Text("..."), # 下半部分占位符 + # 将按钮放在底部 + # Wrap the Row in a Container to apply padding + ft.Container( + content=ft.Row( + [console_action_button, toggle_button], + # alignment=ft.MainAxisAlignment.SPACE_AROUND, + alignment=ft.MainAxisAlignment.START, # Align buttons to the start + ), + # Apply padding to the container holding the row + padding=ft.padding.only(bottom=10), + ), + ], + # height=100, # 可以给下半部分固定高度,或者让它自适应 + spacing=5, + # Remove padding from the Column itself + # padding=ft.padding.only(bottom=10) + ) + info_column = ft.Column( + controls=[ + # ft.Text("附加信息区", weight=ft.FontWeight.BOLD), + # ft.Divider(), + info_top_section, + info_bottom_section, + ], + width=250, # 增加宽度 + # scroll=ft.ScrollMode.ADAPTIVE, # 内部分区滚动,外部不需要 + spacing=10, # 分区之间的间距 + ) + + # --- Set Initial Button State --- # + # Call the helper AFTER the button is created and stored in state + is_initially_running = app_state.bot_pid is not None and psutil.pid_exists(app_state.bot_pid) + update_buttons_state(page, app_state, is_running=is_initially_running) + + # --- 视图布局 --- # + return ft.View( + "/console", # View route + [ + ft.AppBar(title=ft.Text("Mai控制台")), + # --- 主要内容区域改为 Row --- # + ft.Row( + controls=[ + # --- 左侧 Column (可扩展) --- # + ft.Column( + controls=[ + # 1. Console Output Area + ft.Container( + content=output_list_view, # From state + expand=5, # 在左侧 Column 内部分配比例 + border=ft.border.only(bottom=ft.border.BorderSide(1, ft.colors.OUTLINE)), + ), + # 2. Interest Monitor Area + ft.Container( + content=interest_monitor, # From state + expand=4, # 在左侧 Column 内部分配比例 + # border=ft.border.all(1, ft.colors.OUTLINE), # 可以去掉这里的边框 + # border_radius=ft.border_radius.all(5), + # padding=10, # 可以调整或去掉 + # margin=ft.margin.only(top=10), + ), + ], + expand=True, # 让左侧 Column 占据 Row 的大部分空间 + ), + # --- 右侧 Column (固定宽度) --- # + info_column, + ], + expand=True, # 让 Row 填满 AppBar 下方的空间 + ), + ], + padding=0, # View padding set to 0 + # Flet automatically handles calling will_unmount on UserControls like InterestMonitorDisplay + # when the view is removed or the app closes. + # on_disappear=lambda _: asyncio.create_task(interest_monitor.will_unmount_async()) if interest_monitor else None + ) + + +# --- Adapters View --- # +def create_adapters_view(page: ft.Page, app_state: "AppState") -> ft.View: + """Creates the view for managing adapters (/adapters).""" + # Import necessary functions + from .config_manager import save_config + from .utils import show_snackbar # Removed run_script import + + # Import process management functions + from .process_manager import start_managed_process, stop_managed_process + import psutil # To check if PID exists for status + + adapters_list_view = ft.ListView(expand=True, spacing=5) + + def update_adapters_list(): + """Refreshes the list view with current adapter paths and status-dependent buttons.""" + adapters_list_view.controls.clear() + for index, path in enumerate(app_state.adapter_paths): + process_id = path # Use path as the unique ID for now + process_state = app_state.managed_processes.get(process_id) + is_running = False + if ( + process_state + and process_state.status == "running" + and process_state.pid + and psutil.pid_exists(process_state.pid) + ): + is_running = True + + action_buttons = [] + if is_running: + # If running: View Output Button and Stop Button + action_buttons.append( + ft.IconButton( + ft.icons.VISIBILITY_OUTLINED, + tooltip="查看输出", + data=process_id, + on_click=lambda e: page.go(f"/adapters/{e.control.data}"), + icon_color=ft.colors.BLUE_GREY, # Neutral color + ) + ) + action_buttons.append( + ft.IconButton( + ft.icons.STOP_CIRCLE_OUTLINED, + tooltip="停止此适配器", + data=process_id, + # Call stop and then refresh the list view + on_click=lambda e: ( + stop_managed_process(e.control.data, page, app_state), + update_adapters_list(), + ), + icon_color=ft.colors.RED_ACCENT, + ) + ) + else: + # If stopped: Start Button + action_buttons.append( + ft.IconButton( + ft.icons.PLAY_ARROW_OUTLINED, + tooltip="启动此适配器脚本", + data=path, + on_click=lambda e: start_adapter_process(e, page, app_state), + icon_color=ft.colors.GREEN, + ) + ) + + adapters_list_view.controls.append( + ft.Row( + [ + ft.Text(path, expand=True, overflow=ft.TextOverflow.ELLIPSIS), + # Add action buttons based on state + *action_buttons, + # Keep the remove button + ft.IconButton( + ft.icons.DELETE_OUTLINE, + tooltip="移除此适配器", + data=index, # Store index to know which one to remove + on_click=remove_adapter, + icon_color=ft.colors.ERROR, + ), + ], + alignment=ft.MainAxisAlignment.SPACE_BETWEEN, + ) + ) + # Trigger update if the list view is part of the page + if adapters_list_view.page: + adapters_list_view.update() + + def remove_adapter(e): + """Removes an adapter path based on the button's data (index).""" + index_to_remove = e.control.data + if 0 <= index_to_remove < len(app_state.adapter_paths): + removed_path = app_state.adapter_paths.pop(index_to_remove) + app_state.gui_config["adapters"] = app_state.adapter_paths + if save_config(app_state.gui_config): + update_adapters_list() + show_snackbar(page, f"已移除: {removed_path}") + else: + show_snackbar(page, "保存配置失败,未能移除", error=True) + # Revert state + app_state.adapter_paths.insert(index_to_remove, removed_path) + app_state.gui_config["adapters"] = app_state.adapter_paths + else: + show_snackbar(page, "移除时发生错误:无效索引", error=True) + + # --- Start Adapter Process Handler --- # + def start_adapter_process(e, page: ft.Page, app_state: "AppState"): + """Handles the click event for the start adapter button.""" + path_to_run = e.control.data + if not path_to_run or not isinstance(path_to_run, str): + show_snackbar(page, "运行错误:无效的适配器路径", error=True) + return + + display_name = os.path.basename(path_to_run) # Use filename as display name + process_id = path_to_run # Use path as ID + print(f"[Adapters View] Requesting start for: {display_name} (ID: {process_id})") + + # Call the generic start function from process_manager + # It will create the specific ListView in the state + success, message = start_managed_process( + script_path=path_to_run, + display_name=display_name, + page=page, + app_state=app_state, + # No target_list_view needed here, it creates its own + ) + + if success: + show_snackbar(page, f"正在启动: {display_name}") + update_adapters_list() # Refresh button states + # Navigate to the specific output view for this process + page.go(f"/adapters/{process_id}") + else: + # Error message already shown by start_managed_process via snackbar + update_adapters_list() # Refresh button states even on failure + + # --- Initial population of the list --- # + update_adapters_list() + + new_adapter_path_field = ft.TextField(label="新适配器路径 (.py 文件)", expand=True) + + # --- File Picker Logic --- # + def pick_adapter_file_result(e: ft.FilePickerResultEvent): + """Callback when the file picker dialog closes.""" + if e.files: + selected_file = e.files[0] # Get the first selected file + new_adapter_path_field.value = selected_file.path + new_adapter_path_field.update() + show_snackbar(page, f"已选择文件: {os.path.basename(selected_file.path)}") + else: + show_snackbar(page, "未选择文件") + + def open_file_picker(e): + """Opens the file picker dialog.""" + if app_state.file_picker: + app_state.file_picker.on_result = pick_adapter_file_result + app_state.file_picker.pick_files( + allow_multiple=False, + allowed_extensions=["py"], # Only allow Python files + dialog_title="选择适配器 Python 文件", + ) + else: + show_snackbar(page, "错误:无法打开文件选择器", error=True) + + # Ensure the file picker's on_result is connected when the view is created + if app_state.file_picker: + app_state.file_picker.on_result = pick_adapter_file_result + else: + # This case shouldn't happen if launcher.py runs correctly + print("[create_adapters_view] Warning: FilePicker not available during view creation.") + + def add_adapter(e): + """Adds a new adapter path to the list and config.""" + new_path = new_adapter_path_field.value.strip() + + if not new_path: + show_snackbar(page, "请输入适配器路径", error=True) + return + # Basic validation (you might want more robust checks) + if not new_path.lower().endswith(".py"): + show_snackbar(page, "路径应指向一个 Python (.py) 文件", error=True) + return + # Optional: Check if the file actually exists? Might be too strict. + # if not os.path.exists(new_path): + # show_snackbar(page, f"文件未找到: {new_path}", error=True) + # return + + if new_path in app_state.adapter_paths: + show_snackbar(page, "此适配器路径已存在") + return + + app_state.adapter_paths.append(new_path) + app_state.gui_config["adapters"] = app_state.adapter_paths + + save_successful = save_config(app_state.gui_config) + + if save_successful: + new_adapter_path_field.value = "" # Clear input field + update_adapters_list() # Update the list view + new_adapter_path_field.update() # Update the input field visually + show_snackbar(page, "适配器已添加") + else: + show_snackbar(page, "保存配置失败", error=True) + # Revert state if save failed + try: # Add try-except just in case pop fails unexpectedly + app_state.adapter_paths.pop() + app_state.gui_config["adapters"] = app_state.adapter_paths + except IndexError: + pass # Silently ignore if list was empty during failed save + + return ft.View( + "/adapters", + [ + ft.AppBar(title=ft.Text("适配器管理"), bgcolor=ft.colors.SURFACE_VARIANT), + # Use a Container with the padding property instead + ft.Container( + padding=ft.padding.all(10), # Set padding property on the Container + content=ft.Column( # Place the original content inside the Container + [ + ft.Text("已配置的适配器:"), + adapters_list_view, # ListView for adapters + ft.Divider(), + ft.Row( + [ + new_adapter_path_field, + # --- Add Browse Button --- # + ft.IconButton( + ft.icons.FOLDER_OPEN_OUTLINED, + tooltip="浏览文件...", + on_click=open_file_picker, # Call the file picker opener + ), + ft.IconButton(ft.icons.ADD_CIRCLE_OUTLINE, tooltip="添加适配器", on_click=add_adapter), + ] + ), + ], + expand=True, + ), + ), + ], + ) + + +# --- Settings View --- # +def create_settings_view(page: ft.Page, app_state: "AppState") -> ft.View: + """Creates the settings view (/settings).""" + return ft.View( + "/settings", + [ + ft.AppBar(title=ft.Text("设置"), bgcolor=ft.colors.SURFACE_VARIANT), + # Pass padding value positionally, use content keyword argument + # ft.Padding(ft.padding.all(20), content=ft.Text("设置选项将在此处显示...")), + # Use a Container with the padding property instead + ft.Container( + padding=ft.padding.all(20), # Set padding property on the Container + content=ft.Text("设置选项将在此处显示..."), # Place the original content inside + ), + # Add settings controls here later + ], + ) + + +# --- Process Output View (for Adapters etc.) --- # +def create_process_output_view(page: ft.Page, app_state: "AppState", process_id: str) -> Optional[ft.View]: + """Creates a view to display the output of a specific managed process.""" + # Import stop function + from .process_manager import stop_managed_process + + process_state = app_state.managed_processes.get(process_id) + + if not process_state: + print(f"[Create Output View] Error: Process state not found for ID: {process_id}") + # Optionally show an error view or navigate back + # For now, return None, route_change might handle this + return None + + # Get or create the ListView for this process + # It should have been created and stored by start_managed_process + if process_state.output_list_view is None: + print(f"[Create Output View] Warning: ListView not found in state for {process_id}. Creating fallback.") + # Create a fallback, though this indicates an issue elsewhere + process_state.output_list_view = ft.ListView(expand=True, spacing=2, padding=5, auto_scroll=True) + process_state.output_list_view.controls.append( + ft.Text( + "--- Error: Output view created unexpectedly. Process might need restart. ---", + italic=True, + color=ft.colors.ERROR, + ) + ) + + output_lv = process_state.output_list_view + + # --- Stop Button --- # + stop_button = ft.ElevatedButton( + "停止进程", + icon=ft.icons.STOP_CIRCLE_OUTLINED, + on_click=lambda _: stop_managed_process(process_id, page, app_state), + bgcolor=ft.colors.with_opacity(0.6, ft.colors.RED_ACCENT_100), + color=ft.colors.WHITE, + tooltip=f"停止 {process_state.display_name}", + ) + + # --- Auto-scroll Toggle (Specific to this view) --- # + # Create a local state for this view's scroll toggle + is_this_view_auto_scroll = ft.Ref[bool]() + is_this_view_auto_scroll.current = True # Default to true + output_lv.auto_scroll = is_this_view_auto_scroll.current + + def toggle_this_view_auto_scroll(e): + is_this_view_auto_scroll.current = not is_this_view_auto_scroll.current + output_lv.auto_scroll = is_this_view_auto_scroll.current + e.control.text = "自动滚动 开" if is_this_view_auto_scroll.current else "自动滚动 关" + e.control.update() + print(f"Process '{process_id}' view auto-scroll set to: {is_this_view_auto_scroll.current}") + + auto_scroll_button = ft.OutlinedButton( + "自动滚动 开" if is_this_view_auto_scroll.current else "自动滚动 关", + # icon=ft.icons.SCROLLING, + icon=ft.icons.SWAP_VERT, # Use a valid icon for toggling + on_click=toggle_this_view_auto_scroll, + tooltip="切换此视图的自动滚动", + ) + + return ft.View( + route=f"/adapters/{process_id}", # Dynamic route + appbar=ft.AppBar( + title=ft.Text(f"输出: {process_state.display_name}"), + bgcolor=ft.colors.SURFACE_VARIANT, + actions=[ + stop_button, + auto_scroll_button, + ft.Container(width=5), # Spacer + ], + ), + controls=[ + output_lv # Display the specific ListView for this process + ], + padding=0, + ) diff --git a/src/MaiGoi/utils.py b/src/MaiGoi/utils.py new file mode 100644 index 000000000..20d58eec6 --- /dev/null +++ b/src/MaiGoi/utils.py @@ -0,0 +1,126 @@ +import flet as ft +import os +import sys +import subprocess +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from .state import AppState # Avoid circular import for type hinting + + +async def update_page_safe(page: Optional[ft.Page]): + """Safely call page.update() if the page object is valid.""" + if page: + try: + await page.update() + except Exception: + # Reduce noise, perhaps only print if debug is enabled later + # print(f"Error during safe page update: {e}") + pass # Silently ignore update errors, especially during shutdown + + +def show_snackbar(page: Optional[ft.Page], message: str, error: bool = False): + """Helper function to display a SnackBar.""" + if not page: + print(f"[Snackbar - No Page] {'Error' if error else 'Info'}: {message}") + return + try: + page.snack_bar = ft.SnackBar( + ft.Text(message), + bgcolor=ft.colors.ERROR if error else None, + open=True, + ) + page.update() + except Exception as e: + print(f"Error showing snackbar: {e}") + + +def run_script(script_path: str, page: Optional["ft.Page"], app_state: Optional["AppState"], is_python: bool = False): + """Runs a script file (.bat or .py) in a new process/window.""" + if not app_state or not app_state.script_dir: + print("[run_script] Error: AppState or script_dir not available.", flush=True) + if page: + show_snackbar(page, "错误:无法确定脚本目录", error=True) + return + + # Construct the full path to the script + full_script_path = os.path.join(app_state.script_dir, script_path) + print(f"[run_script] Attempting to run: {full_script_path}", flush=True) + + try: + if not os.path.exists(full_script_path): + print(f"[run_script] Error: Script file not found: {full_script_path}", flush=True) + if page: + show_snackbar(page, f"错误:脚本文件未找到\\n{script_path}", error=True) + return + + # --- Platform-specific execution --- # + if sys.platform == "win32": + if script_path.lower().endswith(".bat"): + print("[run_script] Using 'start cmd /k' for .bat on Windows.", flush=True) + # Use start cmd /k to keep the window open after script finishes + subprocess.Popen(f'start cmd /k "{full_script_path}"', shell=True, cwd=app_state.script_dir) + elif script_path.lower().endswith(".py"): + print("[run_script] Using Python executable for .py on Windows.", flush=True) + # Run Python script using the current interpreter in a new console window + # Using sys.executable ensures the correct Python environment is used. + # 'start' is a cmd command, so shell=True is needed. + # We don't use /k here, the Python process itself will keep the window open if needed (e.g., input()). + subprocess.Popen( + f'start "Running {script_path}" "{sys.executable}" "{full_script_path}"', + shell=True, + cwd=app_state.script_dir, + ) + else: + print( + f"[run_script] Attempting generic 'start' for unknown file type on Windows: {script_path}", + flush=True, + ) + # Try generic start for other file types, might open associated program + subprocess.Popen(f'start "{full_script_path}"', shell=True, cwd=app_state.script_dir) + else: # Linux/macOS + if script_path.lower().endswith(".py"): + print("[run_script] Using Python executable for .py on non-Windows.", flush=True) + # On Unix-like systems, we typically need a terminal emulator to see output. + # This example uses xterm, adjust if needed for other terminals (gnome-terminal, etc.) + # The '-e' flag is common for executing a command. + try: + subprocess.Popen(["xterm", "-e", sys.executable, full_script_path], cwd=app_state.script_dir) + except FileNotFoundError: + print( + "[run_script] xterm not found. Trying to run Python directly (output might be lost).", + flush=True, + ) + try: + subprocess.Popen([sys.executable, full_script_path], cwd=app_state.script_dir) + except Exception as e_direct: + print(f"[run_script] Error running Python script directly: {e_direct}", flush=True) + if page: + show_snackbar(page, f"运行脚本时出错: {e_direct}", error=True) + return + elif os.access(full_script_path, os.X_OK): # Check if it's executable + print("[run_script] Running executable script directly on non-Windows.", flush=True) + # Similar terminal issue might apply here if it's a console app + try: + subprocess.Popen([full_script_path], cwd=app_state.script_dir) + except Exception as e_exec: + print(f"[run_script] Error running executable script: {e_exec}", flush=True) + if page: + show_snackbar(page, f"运行脚本时出错: {e_exec}", error=True) + return + else: + print( + f"[run_script] Don't know how to run non-executable, non-python script on non-Windows: {script_path}", + flush=True, + ) + if page: + show_snackbar(page, f"无法运行此类型的文件: {script_path}", error=True) + return + + if page: + show_snackbar(page, f"正在尝试运行脚本: {script_path}") + + except Exception as e: + print(f"[run_script] Unexpected error running script '{script_path}': {e}", flush=True) + if page: + show_snackbar(page, f"运行脚本时发生意外错误: {e}", error=True) diff --git a/src/common/logger.py b/src/common/logger.py index a82c6d883..5b03c7cfe 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -358,6 +358,23 @@ SUB_HEARTFLOW_STYLE_CONFIG = { }, } +INTEREST_CHAT_STYLE_CONFIG = { + "advanced": { + "console_format": ( + "{time:YYYY-MM-DD HH:mm:ss} | " + "{level: <8} | " + "兴趣 | " + "{message}" + ), + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 兴趣 | {message}", + }, + "simple": { + "console_format": "{time:MM-DD HH:mm} | 兴趣 | {message}", # noqa: E501 + "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 兴趣 | {message}", + }, +} + + SUB_HEARTFLOW_MIND_STYLE_CONFIG = { "advanced": { "console_format": ( @@ -878,6 +895,9 @@ CHAT_MESSAGE_STYLE_CONFIG = ( ) CHAT_IMAGE_STYLE_CONFIG = CHAT_IMAGE_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else CHAT_IMAGE_STYLE_CONFIG["advanced"] INIT_STYLE_CONFIG = INIT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INIT_STYLE_CONFIG["advanced"] +INTEREST_CHAT_STYLE_CONFIG = ( + INTEREST_CHAT_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else INTEREST_CHAT_STYLE_CONFIG["advanced"] +) def is_registered_module(record: dict) -> bool: diff --git a/src/common/logger_manager.py b/src/common/logger_manager.py index 5c5538385..93be012b9 100644 --- a/src/common/logger_manager.py +++ b/src/common/logger_manager.py @@ -41,6 +41,7 @@ from src.common.logger import ( CHAT_MESSAGE_STYLE_CONFIG, CHAT_IMAGE_STYLE_CONFIG, INIT_STYLE_CONFIG, + INTEREST_CHAT_STYLE_CONFIG, ) # 可根据实际需要补充更多模块配置 @@ -86,6 +87,7 @@ MODULE_LOGGER_CONFIGS = { "chat_message": CHAT_MESSAGE_STYLE_CONFIG, # 聊天消息 "chat_image": CHAT_IMAGE_STYLE_CONFIG, # 聊天图片 "init": INIT_STYLE_CONFIG, # 初始化 + "interest_chat": INTEREST_CHAT_STYLE_CONFIG, # 兴趣 # ...如有更多模块,继续添加... } diff --git a/src/heart_flow/interest_chatting.py b/src/heart_flow/interest_chatting.py new file mode 100644 index 000000000..4525d09d2 --- /dev/null +++ b/src/heart_flow/interest_chatting.py @@ -0,0 +1,200 @@ +import asyncio +from src.config.config import global_config +from typing import Optional, Dict +import traceback +from src.common.logger_manager import get_logger +from src.plugins.chat.message import MessageRecv +import math + + +# 定义常量 (从 interest.py 移动过来) +MAX_INTEREST = 15.0 + +logger = get_logger("interest_chatting") + +PROBABILITY_INCREASE_RATE_PER_SECOND = 0.1 +PROBABILITY_DECREASE_RATE_PER_SECOND = 0.1 +MAX_REPLY_PROBABILITY = 1 + + +class InterestChatting: + def __init__( + self, + decay_rate=global_config.default_decay_rate_per_second, + max_interest=MAX_INTEREST, + trigger_threshold=global_config.reply_trigger_threshold, + max_probability=MAX_REPLY_PROBABILITY, + ): + # 基础属性初始化 + self.interest_level: float = 0.0 + self.decay_rate_per_second: float = decay_rate + self.max_interest: float = max_interest + + self.trigger_threshold: float = trigger_threshold + self.max_reply_probability: float = max_probability + self.is_above_threshold: bool = False + + # 任务相关属性初始化 + self.update_task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + self._task_lock = asyncio.Lock() + self._is_running = False + + self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {} + self.update_interval = 1.0 + + self.above_threshold = False + self.start_hfc_probability = 0.0 + + async def initialize(self): + async with self._task_lock: + if self._is_running: + logger.debug("后台兴趣更新任务已在运行中。") + return + + # 清理已完成或已取消的任务 + if self.update_task and (self.update_task.done() or self.update_task.cancelled()): + self.update_task = None + + if not self.update_task: + self._stop_event.clear() + self._is_running = True + self.update_task = asyncio.create_task(self._run_update_loop(self.update_interval)) + logger.debug("后台兴趣更新任务已创建并启动。") + + def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): + """添加消息到兴趣字典 + + 参数: + message: 接收到的消息 + interest_value: 兴趣值 + is_mentioned: 是否被提及 + + 功能: + 1. 将消息添加到兴趣字典 + 2. 更新最后交互时间 + 3. 如果字典长度超过10,删除最旧的消息 + """ + # 添加新消息 + self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) + + # 如果字典长度超过10,删除最旧的消息 + if len(self.interest_dict) > 10: + oldest_key = next(iter(self.interest_dict)) + self.interest_dict.pop(oldest_key) + + async def _calculate_decay(self): + """计算兴趣值的衰减 + + 参数: + current_time: 当前时间戳 + + 处理逻辑: + 1. 计算时间差 + 2. 处理各种异常情况(负值/零值) + 3. 正常计算衰减 + 4. 更新最后更新时间 + """ + + # 处理极小兴趣值情况 + if self.interest_level < 1e-9: + self.interest_level = 0.0 + return + + # 异常情况处理 + if self.decay_rate_per_second <= 0: + logger.warning(f"衰减率({self.decay_rate_per_second})无效,重置兴趣值为0") + self.interest_level = 0.0 + return + + # 正常衰减计算 + try: + decay_factor = math.pow(self.decay_rate_per_second, self.update_interval) + self.interest_level *= decay_factor + except ValueError as e: + logger.error( + f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}" + ) + self.interest_level = 0.0 + + async def _update_reply_probability(self): + self.above_threshold = self.interest_level >= self.trigger_threshold + if self.above_threshold: + self.start_hfc_probability += PROBABILITY_INCREASE_RATE_PER_SECOND + else: + if self.start_hfc_probability > 0: + self.start_hfc_probability = max(0, self.start_hfc_probability - PROBABILITY_DECREASE_RATE_PER_SECOND) + + async def increase_interest(self, value: float): + self.interest_level += value + self.interest_level = min(self.interest_level, self.max_interest) + + async def decrease_interest(self, value: float): + self.interest_level -= value + self.interest_level = max(self.interest_level, 0.0) + + async def get_interest(self) -> float: + return self.interest_level + + async def get_state(self) -> dict: + interest = self.interest_level # 直接使用属性值 + return { + "interest_level": round(interest, 2), + "start_hfc_probability": round(self.start_hfc_probability, 4), + "above_threshold": self.above_threshold, + } + + # --- 新增后台更新任务相关方法 --- + async def _run_update_loop(self, update_interval: float = 1.0): + """后台循环,定期更新兴趣和回复概率。""" + try: + while not self._stop_event.is_set(): + try: + if self.interest_level != 0: + await self._calculate_decay() + + await self._update_reply_probability() + + # 等待下一个周期或停止事件 + await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval) + except asyncio.TimeoutError: + # 正常超时,继续循环 + continue + except Exception as e: + logger.error(f"InterestChatting 更新循环出错: {e}") + logger.error(traceback.format_exc()) + # 防止错误导致CPU飙升,稍作等待 + await asyncio.sleep(5) + except asyncio.CancelledError: + logger.info("InterestChatting 更新循环被取消。") + finally: + self._is_running = False + logger.info("InterestChatting 更新循环已停止。") + + async def stop_updates(self): + """停止后台更新任务,使用锁确保并发安全""" + async with self._task_lock: + if not self._is_running: + logger.debug("后台兴趣更新任务未运行。") + return + + logger.info("正在停止 InterestChatting 后台更新任务...") + self._stop_event.set() + + if self.update_task and not self.update_task.done(): + try: + # 等待任务结束,设置超时 + await asyncio.wait_for(self.update_task, timeout=5.0) + logger.info("InterestChatting 后台更新任务已成功停止。") + except asyncio.TimeoutError: + logger.warning("停止 InterestChatting 后台任务超时,尝试取消...") + self.update_task.cancel() + try: + await self.update_task # 等待取消完成 + except asyncio.CancelledError: + logger.info("InterestChatting 后台更新任务已被取消。") + except Exception as e: + logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}") + finally: + self.update_task = None + self._is_running = False diff --git a/src/heart_flow/interest_logger.py b/src/heart_flow/interest_logger.py index 06d3f1cb0..38baf4785 100644 --- a/src/heart_flow/interest_logger.py +++ b/src/heart_flow/interest_logger.py @@ -29,6 +29,14 @@ def _ensure_log_directory(): logger.info(f"已确保日志目录 '{LOG_DIRECTORY}' 存在") +def _clear_and_create_log_file(): + """清除日志文件并创建新的日志文件。""" + if os.path.exists(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)): + os.remove(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME)) + with open(os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME), "w", encoding="utf-8") as f: + f.write("") + + class InterestLogger: """负责定期记录主心流和所有子心流的状态到日志文件。""" @@ -44,6 +52,7 @@ class InterestLogger: self.heartflow = heartflow # 存储 Heartflow 实例 self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) _ensure_log_directory() + _clear_and_create_log_file() async def get_all_subflow_states(self) -> Dict[str, Dict]: """并发获取所有活跃子心流的当前完整状态。""" diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index eb8bbabdb..66d50762e 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -1,215 +1,22 @@ from .observation import Observation, ChattingObservation import asyncio -from src.config.config import global_config import time from typing import Optional, List, Dict, Tuple, Callable, Coroutine import traceback from src.common.logger_manager import get_logger from src.plugins.chat.message import MessageRecv from src.plugins.chat.chat_stream import chat_manager -import math from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting from src.plugins.heartFC_chat.normal_chat import NormalChat from src.heart_flow.mai_state_manager import MaiStateInfo from src.heart_flow.chat_state_info import ChatState, ChatStateInfo from src.heart_flow.sub_mind import SubMind from .utils_chat import get_chat_type_and_target_info +from .interest_chatting import InterestChatting -# 定义常量 (从 interest.py 移动过来) -MAX_INTEREST = 15.0 - logger = get_logger("sub_heartflow") -PROBABILITY_INCREASE_RATE_PER_SECOND = 0.1 -PROBABILITY_DECREASE_RATE_PER_SECOND = 0.1 -MAX_REPLY_PROBABILITY = 1 - - -class InterestChatting: - def __init__( - self, - decay_rate=global_config.default_decay_rate_per_second, - max_interest=MAX_INTEREST, - trigger_threshold=global_config.reply_trigger_threshold, - max_probability=MAX_REPLY_PROBABILITY, - ): - # 基础属性初始化 - self.interest_level: float = 0.0 - self.decay_rate_per_second: float = decay_rate - self.max_interest: float = max_interest - - self.trigger_threshold: float = trigger_threshold - self.max_reply_probability: float = max_probability - self.is_above_threshold: bool = False - - # 任务相关属性初始化 - self.update_task: Optional[asyncio.Task] = None - self._stop_event = asyncio.Event() - self._task_lock = asyncio.Lock() - self._is_running = False - - self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {} - self.update_interval = 1.0 - - self.above_threshold = False - self.start_hfc_probability = 0.0 - - async def initialize(self): - async with self._task_lock: - if self._is_running: - logger.debug("后台兴趣更新任务已在运行中。") - return - - # 清理已完成或已取消的任务 - if self.update_task and (self.update_task.done() or self.update_task.cancelled()): - self.update_task = None - - if not self.update_task: - self._stop_event.clear() - self._is_running = True - self.update_task = asyncio.create_task(self._run_update_loop(self.update_interval)) - logger.debug("后台兴趣更新任务已创建并启动。") - - def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): - """添加消息到兴趣字典 - - 参数: - message: 接收到的消息 - interest_value: 兴趣值 - is_mentioned: 是否被提及 - - 功能: - 1. 将消息添加到兴趣字典 - 2. 更新最后交互时间 - 3. 如果字典长度超过10,删除最旧的消息 - """ - # 添加新消息 - self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) - - # 如果字典长度超过10,删除最旧的消息 - if len(self.interest_dict) > 10: - oldest_key = next(iter(self.interest_dict)) - self.interest_dict.pop(oldest_key) - - async def _calculate_decay(self): - """计算兴趣值的衰减 - - 参数: - current_time: 当前时间戳 - - 处理逻辑: - 1. 计算时间差 - 2. 处理各种异常情况(负值/零值) - 3. 正常计算衰减 - 4. 更新最后更新时间 - """ - - # 处理极小兴趣值情况 - if self.interest_level < 1e-9: - self.interest_level = 0.0 - return - - # 异常情况处理 - if self.decay_rate_per_second <= 0: - logger.warning(f"衰减率({self.decay_rate_per_second})无效,重置兴趣值为0") - self.interest_level = 0.0 - return - - # 正常衰减计算 - try: - decay_factor = math.pow(self.decay_rate_per_second, self.update_interval) - self.interest_level *= decay_factor - except ValueError as e: - logger.error( - f"衰减计算错误: {e} 参数: 衰减率={self.decay_rate_per_second} 时间差={self.update_interval} 当前兴趣={self.interest_level}" - ) - self.interest_level = 0.0 - - async def _update_reply_probability(self): - self.above_threshold = self.interest_level >= self.trigger_threshold - if self.above_threshold: - self.start_hfc_probability += PROBABILITY_INCREASE_RATE_PER_SECOND - else: - if self.start_hfc_probability > 0: - self.start_hfc_probability = max(0, self.start_hfc_probability - PROBABILITY_DECREASE_RATE_PER_SECOND) - - async def increase_interest(self, value: float): - self.interest_level += value - self.interest_level = min(self.interest_level, self.max_interest) - - async def decrease_interest(self, value: float): - self.interest_level -= value - self.interest_level = max(self.interest_level, 0.0) - - async def get_interest(self) -> float: - return self.interest_level - - async def get_state(self) -> dict: - interest = self.interest_level # 直接使用属性值 - return { - "interest_level": round(interest, 2), - "start_hfc_probability": round(self.start_hfc_probability, 4), - "above_threshold": self.above_threshold, - } - - # --- 新增后台更新任务相关方法 --- - async def _run_update_loop(self, update_interval: float = 1.0): - """后台循环,定期更新兴趣和回复概率。""" - try: - while not self._stop_event.is_set(): - try: - if self.interest_level != 0: - await self._calculate_decay() - - await self._update_reply_probability() - - # 等待下一个周期或停止事件 - await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval) - except asyncio.TimeoutError: - # 正常超时,继续循环 - continue - except Exception as e: - logger.error(f"InterestChatting 更新循环出错: {e}") - logger.error(traceback.format_exc()) - # 防止错误导致CPU飙升,稍作等待 - await asyncio.sleep(5) - except asyncio.CancelledError: - logger.info("InterestChatting 更新循环被取消。") - finally: - self._is_running = False - logger.info("InterestChatting 更新循环已停止。") - - async def stop_updates(self): - """停止后台更新任务,使用锁确保并发安全""" - async with self._task_lock: - if not self._is_running: - logger.debug("后台兴趣更新任务未运行。") - return - - logger.info("正在停止 InterestChatting 后台更新任务...") - self._stop_event.set() - - if self.update_task and not self.update_task.done(): - try: - # 等待任务结束,设置超时 - await asyncio.wait_for(self.update_task, timeout=5.0) - logger.info("InterestChatting 后台更新任务已成功停止。") - except asyncio.TimeoutError: - logger.warning("停止 InterestChatting 后台任务超时,尝试取消...") - self.update_task.cancel() - try: - await self.update_task # 等待取消完成 - except asyncio.CancelledError: - logger.info("InterestChatting 后台更新任务已被取消。") - except Exception as e: - logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}") - finally: - self.update_task = None - self._is_running = False - - # --- 结束 新增方法 --- - class SubHeartflow: def __init__( diff --git a/src/plugins/remote/remote.py b/src/plugins/remote/remote.py index 1ba32ec98..5d8802712 100644 --- a/src/plugins/remote/remote.py +++ b/src/plugins/remote/remote.py @@ -15,33 +15,67 @@ remote_log_config = LogConfig( ) logger = get_module_logger("remote", config=remote_log_config) -# UUID文件路径 -UUID_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "client_uuid.json") +# --- 使用向上导航的方式定义路径 --- + +# 1. 获取当前文件 (remote.py) 所在的目录 +current_dir = os.path.dirname(os.path.abspath(__file__)) + +# 2. 从当前目录向上导航三级找到项目根目录 +# (src/plugins/remote/ -> src/plugins/ -> src/ -> project_root) +root_dir = os.path.abspath(os.path.join(current_dir, "..", "..", "..")) + +# 3. 定义 data 目录的路径 (位于项目根目录下) +data_dir = os.path.join(root_dir, "data") + +# 4. 定义 UUID 文件在 data 目录下的完整路径 +UUID_FILE = os.path.join(data_dir, "client_uuid.json") + +# --- 路径定义结束 --- # 生成或获取客户端唯一ID def get_unique_id(): + # --- 在尝试读写 UUID_FILE 之前确保 data 目录存在 --- + # 将目录检查和创建逻辑移到这里,在首次需要写入前执行 + try: + # exist_ok=True 意味着如果目录已存在也不会报错 + os.makedirs(data_dir, exist_ok=True) + except OSError as e: + # 处理可能的权限错误等 + logger.error(f"无法创建数据目录 {data_dir}: {e}") + # 根据你的错误处理逻辑,可能需要在这里返回错误或抛出异常 + # 暂且返回 None 或抛出,避免继续执行导致问题 + raise RuntimeError(f"无法创建必要的数据目录 {data_dir}") from e + # --- 目录检查结束 --- + # 检查是否已经有保存的UUID if os.path.exists(UUID_FILE): try: - with open(UUID_FILE, "r") as f: + with open(UUID_FILE, "r", encoding="utf-8") as f: # 指定 encoding data = json.load(f) if "client_id" in data: - # print("从本地文件读取客户端ID") + logger.debug(f"从本地文件读取客户端ID: {UUID_FILE}") return data["client_id"] except (json.JSONDecodeError, IOError) as e: - print(f"读取UUID文件出错: {e},将生成新的UUID") + logger.warning(f"读取UUID文件 {UUID_FILE} 出错: {e},将生成新的UUID") + except Exception as e: # 捕捉其他可能的异常 + logger.error(f"读取UUID文件 {UUID_FILE} 时发生未知错误: {e}") # 如果没有保存的UUID或读取出错,则生成新的 client_id = generate_unique_id() + logger.info(f"生成新的客户端ID: {client_id}") # 保存UUID到文件 try: - with open(UUID_FILE, "w") as f: - json.dump({"client_id": client_id}, f) - logger.info("已保存新生成的客户端ID到本地文件") + # 再次确认目录存在 (虽然理论上前面已创建,但更保险) + os.makedirs(data_dir, exist_ok=True) + with open(UUID_FILE, "w", encoding="utf-8") as f: # 指定 encoding + json.dump({"client_id": client_id}, f, indent=4) # 添加 indent 使json可读 + logger.info(f"已保存新生成的客户端ID到本地文件: {UUID_FILE}") except IOError as e: - logger.error(f"保存UUID时出错: {e}") + logger.error(f"保存UUID时出错: {UUID_FILE} - {e}") + except Exception as e: # 捕捉其他可能的异常 + logger.error(f"保存UUID文件 {UUID_FILE} 时发生未知错误: {e}") return client_id diff --git a/![新版麦麦开始学习.bat b/start_lpmm.bat similarity index 100% rename from ![新版麦麦开始学习.bat rename to start_lpmm.bat diff --git a/(测试版)麦麦生成人格 copy.bat b/start_personality.bat similarity index 100% rename from (测试版)麦麦生成人格 copy.bat rename to start_personality.bat diff --git a/(临时版)聊天兴趣监控.bat.bat b/(临时版)聊天兴趣监控.bat.bat deleted file mode 100644 index f26d14dee..000000000 --- a/(临时版)聊天兴趣监控.bat.bat +++ /dev/null @@ -1,26 +0,0 @@ -@echo off -CHCP 65001 > nul -setlocal enabledelayedexpansion - -REM 查找venv虚拟环境 -set "venv_path=%~dp0venv\Scripts\activate.bat" -if not exist "%venv_path%" ( - echo 错误: 未找到虚拟环境,请确保venv目录存在 - pause - exit /b 1 -) - -REM 激活虚拟环境 -call "%venv_path%" -if %ERRORLEVEL% neq 0 ( - echo 错误: 虚拟环境激活失败 - pause - exit /b 1 -) - -echo 虚拟环境已激活,正在启动 GUI... - -REM 运行 Python 脚本 -python scripts/interest_monitor_gui.py - -pause \ No newline at end of file