diff --git a/.gitignore b/.gitignore index e8a931078..a8c972ab4 100644 --- a/.gitignore +++ b/.gitignore @@ -35,7 +35,6 @@ config/bot_config.toml config/bot_config.toml.bak config/lpmm_config.toml config/lpmm_config.toml.bak -src/plugins/remote/client_uuid.json (测试版)麦麦生成人格.bat (临时版)麦麦开始学习.bat src/plugins/utils/statistic.py diff --git a/src/main.py b/src/main.py index fbb40e3c5..09570a4f2 100644 --- a/src/main.py +++ b/src/main.py @@ -3,9 +3,10 @@ import time from maim_message import MessageServer +from .plugins.remote.remote import TelemetryHeartBeatTask from .manager.async_task_manager import async_task_manager from .plugins.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask -from src.manager.mood_manager import logger, MoodPrintTask, MoodUpdateTask +from .manager.mood_manager import MoodPrintTask, MoodUpdateTask from .plugins.schedule.schedule_generator import bot_schedule from .plugins.emoji_system.emoji_manager import emoji_manager from .plugins.person_info.person_info import person_info_manager @@ -18,7 +19,6 @@ from .plugins.storage.storage import MessageStorage from .config.config import global_config from .plugins.chat.bot import chat_bot from .common.logger_manager import get_logger -from .plugins.remote import heartbeat_thread # noqa: F401 from .individuality.individuality import Individuality from .common.server import global_server, Server from rich.traceback import install @@ -59,6 +59,9 @@ class MainSystem: # 添加统计信息输出任务 await async_task_manager.add_task(StatisticOutputTask()) + # 添加遥测心跳任务 + await async_task_manager.add_task(TelemetryHeartBeatTask()) + # 启动API服务器 start_api_server() logger.success("API服务器启动成功") diff --git a/src/plugins/remote/__init__.py b/src/plugins/remote/__init__.py deleted file mode 100644 index 4cbce96d1..000000000 --- a/src/plugins/remote/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .remote import main - -# 启动心跳线程 -heartbeat_thread = main() diff --git a/src/plugins/remote/remote.py b/src/plugins/remote/remote.py index 68b023969..1d26df01b 100644 --- a/src/plugins/remote/remote.py +++ b/src/plugins/remote/remote.py @@ -1,248 +1,142 @@ +import asyncio + import requests -import time -import uuid import platform -import os -import json -import threading -import subprocess # from loguru import logger from src.common.logger_manager import get_logger from src.config.config import global_config +from src.manager.async_task_manager import AsyncTask +from src.manager.local_store_manager import local_storage logger = get_logger("remote") -# --- 使用向上导航的方式定义路径 --- - -# 1. 获取当前文件 (remote.py) 所在的目录 -current_dir = os.path.dirname(os.path.abspath(__file__)) - -# 2. 从当前目录向上导航三级找到项目根目录 -# (src/plugins/remote/ -> src/plugins/ -> src/ -> project_root) -root_dir = os.path.abspath(os.path.join(current_dir, "..", "..", "..")) - -# 3. 定义 data 目录的路径 (位于项目根目录下) -data_dir = os.path.join(root_dir, "data") - -# 4. 定义 UUID 文件在 data 目录下的完整路径 -UUID_FILE = os.path.join(data_dir, "client_uuid.json") - -# --- 路径定义结束 --- +TELEMETRY_SERVER_URL = "http://localhost:8080" +"""遥测服务地址""" -# 生成或获取客户端唯一ID -def get_unique_id(): - # --- 在尝试读写 UUID_FILE 之前确保 data 目录存在 --- - # 将目录检查和创建逻辑移到这里,在首次需要写入前执行 - try: - # exist_ok=True 意味着如果目录已存在也不会报错 - os.makedirs(data_dir, exist_ok=True) - except OSError as e: - # 处理可能的权限错误等 - logger.error(f"无法创建数据目录 {data_dir}: {e}") - # 根据你的错误处理逻辑,可能需要在这里返回错误或抛出异常 - # 暂且返回 None 或抛出,避免继续执行导致问题 - raise RuntimeError(f"无法创建必要的数据目录 {data_dir}") from e - # --- 目录检查结束 --- +class TelemetryHeartBeatTask(AsyncTask): + HEARTBEAT_INTERVAL = 300 - # 检查是否已经有保存的UUID - if os.path.exists(UUID_FILE): - try: - with open(UUID_FILE, "r", encoding="utf-8") as f: # 指定 encoding - data = json.load(f) - if "client_id" in data: - logger.debug(f"从本地文件读取客户端ID: {UUID_FILE}") - return data["client_id"] - except (json.JSONDecodeError, IOError) as e: - logger.warning(f"读取UUID文件 {UUID_FILE} 出错: {e},将生成新的UUID") - except Exception as e: # 捕捉其他可能的异常 - logger.error(f"读取UUID文件 {UUID_FILE} 时发生未知错误: {e}") + def __init__(self): + super().__init__(task_name="Telemetry Heart Beat Task", run_interval=self.HEARTBEAT_INTERVAL) + self.server_url = TELEMETRY_SERVER_URL + """遥测服务地址""" - # 如果没有保存的UUID或读取出错,则生成新的 - client_id = generate_unique_id() - logger.info(f"生成新的客户端ID: {client_id}") + self.client_uuid = local_storage["mmc_uuid"] if "mmc_uuid" in local_storage else None + """客户端UUID""" - # 保存UUID到文件 - try: - # 再次确认目录存在 (虽然理论上前面已创建,但更保险) - os.makedirs(data_dir, exist_ok=True) - with open(UUID_FILE, "w", encoding="utf-8") as f: # 指定 encoding - json.dump({"client_id": client_id}, f, indent=4) # 添加 indent 使json可读 - logger.info(f"已保存新生成的客户端ID到本地文件: {UUID_FILE}") - except IOError as e: - logger.error(f"保存UUID时出错: {UUID_FILE} - {e}") - except Exception as e: # 捕捉其他可能的异常 - logger.error(f"保存UUID文件 {UUID_FILE} 时发生未知错误: {e}") + self.info_dict = self._get_sys_info() + """系统信息字典""" - return client_id + @staticmethod + def _get_sys_info() -> dict[str, str]: + """获取系统信息""" + info_dict = { + "os_type": "Unknown", + "py_version": platform.python_version(), + "mmc_version": global_config.MAI_VERSION, + } + match platform.system(): + case "Windows": + info_dict["os_type"] = "Windows" + case "Linux": + info_dict["os_type"] = "Linux" + case "Darwin": + info_dict["os_type"] = "macOS" + case _: + info_dict["os_type"] = "Unknown" -# 生成客户端唯一ID -def generate_unique_id(): - # 基于机器码生成唯一ID,同一台机器上生成的UUID是固定的,只要机器码不变 - import hashlib + return info_dict - system_info = platform.system() - machine_code = None + async def _req_uuid(self) -> bool: + """ + 向服务端请求UUID(不应在已存在UUID的情况下调用,会覆盖原有的UUID) + """ - try: - if system_info == "Windows": - # 使用wmic命令获取主机UUID(更稳定) - result = subprocess.check_output( - "wmic csproduct get uuid", shell=True, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL - ) - lines = result.decode(errors="ignore").splitlines() - # 过滤掉空行和表头,只取有效UUID - uuids = [line.strip() for line in lines if line.strip() and line.strip().lower() != "uuid"] - if uuids: - uuid_val = uuids[0] - # logger.debug(f"主机UUID: {uuid_val}") - # 增加无效值判断 - if uuid_val and uuid_val.lower() not in ["to be filled by o.e.m.", "none", "", "standard"]: - machine_code = uuid_val - elif system_info == "Linux": - # 优先读取 /etc/machine-id,其次 /var/lib/dbus/machine-id,取第一个非空且内容有效的 - for path in ["/etc/machine-id", "/var/lib/dbus/machine-id"]: - if os.path.exists(path): - with open(path, "r") as f: - code = f.read().strip() - # 只要内容非空且不是全0 - if code and set(code) != {"0"}: - machine_code = code - break - elif system_info == "Darwin": - # macOS: 使用IOPlatformUUID - result = subprocess.check_output( - "ioreg -rd1 -c IOPlatformExpertDevice | awk '/IOPlatformUUID/'", shell=True - ) - uuid_line = result.decode(errors="ignore") - # 解析出 "IOPlatformUUID" = "xxxx-xxxx-xxxx-xxxx" - import re - - m = re.search(r'"IOPlatformUUID"\s*=\s*"([^"]+)"', uuid_line) - if m: - uuid_val = m.group(1) - logger.debug(f"IOPlatformUUID: {uuid_val}") - if uuid_val and uuid_val.lower() not in ["to be filled by o.e.m.", "none", "", "standard"]: - machine_code = uuid_val - except Exception as e: - logger.debug(f"获取机器码失败: {e}") - - # 如果主板序列号无效,尝试用MAC地址 - if not machine_code: - try: - mac = uuid.getnode() - if (mac >> 40) % 2 == 0: # 不是本地伪造MAC - machine_code = str(mac) - except Exception as e: - logger.debug(f"获取MAC地址失败: {e}") - - def md5_to_uuid(md5hex): - # 将32位md5字符串格式化为8-4-4-4-12的UUID格式 - return f"{md5hex[0:8]}-{md5hex[8:12]}-{md5hex[12:16]}-{md5hex[16:20]}-{md5hex[20:32]}" - - if machine_code: - # print(f"machine_code={machine_code!r}") # 可用于调试 - md5 = hashlib.md5(machine_code.encode("utf-8")).hexdigest() - uuid_str = md5_to_uuid(md5) - else: - uuid_str = str(uuid.uuid4()) - - unique_id = f"{system_info}-{uuid_str}" - return unique_id - - -def send_heartbeat(server_url, client_id): - """向服务器发送心跳""" - sys = platform.system() - try: - headers = {"Client-ID": client_id, "User-Agent": f"HeartbeatClient/{client_id[:8]}"} - data = json.dumps( - {"system": sys, "Version": global_config.MAI_VERSION}, - ) - logger.debug(f"正在发送心跳到服务器: {server_url}") - logger.debug(f"心跳数据: {data}") - response = requests.post(f"{server_url}/api/clients", headers=headers, data=data) - - if response.status_code == 201: - data = response.json() - logger.debug(f"心跳发送成功。服务器响应: {data}") - return True - else: - logger.debug(f"心跳发送失败。状态码: {response.status_code}, 响应内容: {response.text}") + if "deploy_time" not in local_storage: + logger.error("本地存储中缺少部署时间,无法请求UUID") return False - except requests.RequestException as e: - # 如果请求异常,可能是网络问题,不记录错误 - logger.debug(f"发送心跳时出错: {e}") - return False + try_count: int = 0 + while True: + # 如果不存在,则向服务端请求一个新的UUID(注册客户端) + logger.info("正在向遥测服务端请求UUID...") + try: + response = requests.post( + f"{TELEMETRY_SERVER_URL}/stat/reg_client", + json={"deploy_time": local_storage["deploy_time"]}, + ) -class HeartbeatThread(threading.Thread): - """心跳线程类""" + if response.status_code == 200: + data = response.json() + client_id = data.get("mmc_uuid") + if client_id: + # 将UUID存储到本地 + local_storage["mmc_uuid"] = client_id + self.client_uuid = client_id + logger.info(f"成功获取UUID: {self.client_uuid}") + return True # 成功获取UUID,返回True + else: + logger.error("无效的服务端响应") + else: + logger.error(f"请求UUID失败,状态码: {response.status_code}, 响应内容: {response.text}") + except requests.RequestException as e: + logger.error(f"请求UUID时出错: {e}") # 可能是网络问题 - def __init__(self, server_url, interval): - super().__init__(daemon=True) # 设置为守护线程,主程序结束时自动结束 - self.server_url = server_url - self.interval = interval - self.client_id = get_unique_id() - self.running = True - self.stop_event = threading.Event() # 添加事件对象用于可中断的等待 - self.last_heartbeat_time = 0 # 记录上次发送心跳的时间 - - def run(self): - """线程运行函数""" - logger.debug(f"心跳线程已启动,客户端ID: {self.client_id}") - - while self.running: - # 发送心跳 - if send_heartbeat(self.server_url, self.client_id): - logger.info(f"{self.interval}秒后发送下一次心跳...") + # 请求失败,重试次数+1 + try_count += 1 + if try_count > 3: + # 如果超过3次仍然失败,则退出 + logger.error("获取UUID失败,请检查网络连接或服务端状态") + return False else: - logger.info(f"{self.interval}秒后重试...") + # 如果可以重试,等待后继续(指数退避) + await asyncio.sleep(4**try_count) - self.last_heartbeat_time = time.time() + async def _send_heartbeat(self): + """向服务器发送心跳""" + try: + headers = { + "Client-UUID": self.client_uuid, + "User-Agent": f"HeartbeatClient/{self.client_uuid[:8]}", + } - # 使用可中断的等待代替 sleep - # 每秒检查一次是否应该停止或发送心跳 - remaining_wait = self.interval - while remaining_wait > 0 and self.running: - # 每次最多等待1秒,便于及时响应停止请求 - wait_time = min(1, remaining_wait) - if self.stop_event.wait(wait_time): - break # 如果事件被设置,立即退出等待 - remaining_wait -= wait_time + logger.debug(f"正在发送心跳到服务器: {self.server_url}") - # 检查是否由于外部原因导致间隔异常延长 - if time.time() - self.last_heartbeat_time >= self.interval * 1.5: - logger.warning("检测到心跳间隔异常延长,立即发送心跳") - break + response = requests.post( + f"{self.server_url}/stat/client_heartbeat", + headers=headers, + json=self.info_dict, + ) - def stop(self): - """停止线程""" - self.running = False - self.stop_event.set() # 设置事件,中断等待 - logger.debug("心跳线程已收到停止信号") + # 处理响应 + if 200 <= response.status_code < 300: + # 成功 + logger.debug(f"心跳发送成功,状态码: {response.status_code}") + elif response.status_code == 403: + # 403 Forbidden + logger.error( + "心跳发送失败,403 Forbidden: 可能是UUID无效或未注册。" + "处理措施:重置UUID,下次发送心跳时将尝试重新注册。" + ) + self.client_uuid = None + del local_storage["mmc_uuid"] # 删除本地存储的UUID + else: + # 其他错误 + logger.error(f"心跳发送失败,状态码: {response.status_code}, 响应内容: {response.text}") + except requests.RequestException as e: + logger.error(f"心跳发送失败: {e}") -def main(): - if global_config.remote_enable: - """主函数,启动心跳线程""" - # 配置 - server_url = "http://hyybuth.xyz:10058" - # server_url = "http://localhost:10058" - heartbeat_interval = 300 # 5分钟(秒) + async def run(self): + # 发送心跳 + if global_config.remote_enable: + if self.client_uuid is None: + if not await self._req_uuid(): + logger.error("获取UUID失败,跳过此次心跳") + return - # 创建并启动心跳线程 - heartbeat_thread = HeartbeatThread(server_url, heartbeat_interval) - heartbeat_thread.start() - - return heartbeat_thread # 返回线程对象,便于外部控制 - return None - - -# --- 测试用例 --- -if __name__ == "__main__": - print("测试唯一ID生成:") - print("唯一ID:", get_unique_id()) + await self._send_heartbeat()