feat: 遥测模块重构
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -35,7 +35,6 @@ config/bot_config.toml
|
|||||||
config/bot_config.toml.bak
|
config/bot_config.toml.bak
|
||||||
config/lpmm_config.toml
|
config/lpmm_config.toml
|
||||||
config/lpmm_config.toml.bak
|
config/lpmm_config.toml.bak
|
||||||
src/plugins/remote/client_uuid.json
|
|
||||||
(测试版)麦麦生成人格.bat
|
(测试版)麦麦生成人格.bat
|
||||||
(临时版)麦麦开始学习.bat
|
(临时版)麦麦开始学习.bat
|
||||||
src/plugins/utils/statistic.py
|
src/plugins/utils/statistic.py
|
||||||
|
|||||||
@@ -3,9 +3,10 @@ import time
|
|||||||
|
|
||||||
from maim_message import MessageServer
|
from maim_message import MessageServer
|
||||||
|
|
||||||
|
from .plugins.remote.remote import TelemetryHeartBeatTask
|
||||||
from .manager.async_task_manager import async_task_manager
|
from .manager.async_task_manager import async_task_manager
|
||||||
from .plugins.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask
|
from .plugins.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask
|
||||||
from src.manager.mood_manager import logger, MoodPrintTask, MoodUpdateTask
|
from .manager.mood_manager import MoodPrintTask, MoodUpdateTask
|
||||||
from .plugins.schedule.schedule_generator import bot_schedule
|
from .plugins.schedule.schedule_generator import bot_schedule
|
||||||
from .plugins.emoji_system.emoji_manager import emoji_manager
|
from .plugins.emoji_system.emoji_manager import emoji_manager
|
||||||
from .plugins.person_info.person_info import person_info_manager
|
from .plugins.person_info.person_info import person_info_manager
|
||||||
@@ -18,7 +19,6 @@ from .plugins.storage.storage import MessageStorage
|
|||||||
from .config.config import global_config
|
from .config.config import global_config
|
||||||
from .plugins.chat.bot import chat_bot
|
from .plugins.chat.bot import chat_bot
|
||||||
from .common.logger_manager import get_logger
|
from .common.logger_manager import get_logger
|
||||||
from .plugins.remote import heartbeat_thread # noqa: F401
|
|
||||||
from .individuality.individuality import Individuality
|
from .individuality.individuality import Individuality
|
||||||
from .common.server import global_server, Server
|
from .common.server import global_server, Server
|
||||||
from rich.traceback import install
|
from rich.traceback import install
|
||||||
@@ -59,6 +59,9 @@ class MainSystem:
|
|||||||
# 添加统计信息输出任务
|
# 添加统计信息输出任务
|
||||||
await async_task_manager.add_task(StatisticOutputTask())
|
await async_task_manager.add_task(StatisticOutputTask())
|
||||||
|
|
||||||
|
# 添加遥测心跳任务
|
||||||
|
await async_task_manager.add_task(TelemetryHeartBeatTask())
|
||||||
|
|
||||||
# 启动API服务器
|
# 启动API服务器
|
||||||
start_api_server()
|
start_api_server()
|
||||||
logger.success("API服务器启动成功")
|
logger.success("API服务器启动成功")
|
||||||
|
|||||||
@@ -1,4 +0,0 @@
|
|||||||
from .remote import main
|
|
||||||
|
|
||||||
# 启动心跳线程
|
|
||||||
heartbeat_thread = main()
|
|
||||||
@@ -1,248 +1,142 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
import platform
|
import platform
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import threading
|
|
||||||
import subprocess
|
|
||||||
|
|
||||||
# from loguru import logger
|
# from loguru import logger
|
||||||
from src.common.logger_manager import get_logger
|
from src.common.logger_manager import get_logger
|
||||||
from src.config.config import global_config
|
from src.config.config import global_config
|
||||||
|
from src.manager.async_task_manager import AsyncTask
|
||||||
|
from src.manager.local_store_manager import local_storage
|
||||||
|
|
||||||
logger = get_logger("remote")
|
logger = get_logger("remote")
|
||||||
|
|
||||||
# --- 使用向上导航的方式定义路径 ---
|
TELEMETRY_SERVER_URL = "http://localhost:8080"
|
||||||
|
"""遥测服务地址"""
|
||||||
# 1. 获取当前文件 (remote.py) 所在的目录
|
|
||||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
||||||
|
|
||||||
# 2. 从当前目录向上导航三级找到项目根目录
|
|
||||||
# (src/plugins/remote/ -> src/plugins/ -> src/ -> project_root)
|
|
||||||
root_dir = os.path.abspath(os.path.join(current_dir, "..", "..", ".."))
|
|
||||||
|
|
||||||
# 3. 定义 data 目录的路径 (位于项目根目录下)
|
|
||||||
data_dir = os.path.join(root_dir, "data")
|
|
||||||
|
|
||||||
# 4. 定义 UUID 文件在 data 目录下的完整路径
|
|
||||||
UUID_FILE = os.path.join(data_dir, "client_uuid.json")
|
|
||||||
|
|
||||||
# --- 路径定义结束 ---
|
|
||||||
|
|
||||||
|
|
||||||
# 生成或获取客户端唯一ID
|
class TelemetryHeartBeatTask(AsyncTask):
|
||||||
def get_unique_id():
|
HEARTBEAT_INTERVAL = 300
|
||||||
# --- 在尝试读写 UUID_FILE 之前确保 data 目录存在 ---
|
|
||||||
# 将目录检查和创建逻辑移到这里,在首次需要写入前执行
|
|
||||||
try:
|
|
||||||
# exist_ok=True 意味着如果目录已存在也不会报错
|
|
||||||
os.makedirs(data_dir, exist_ok=True)
|
|
||||||
except OSError as e:
|
|
||||||
# 处理可能的权限错误等
|
|
||||||
logger.error(f"无法创建数据目录 {data_dir}: {e}")
|
|
||||||
# 根据你的错误处理逻辑,可能需要在这里返回错误或抛出异常
|
|
||||||
# 暂且返回 None 或抛出,避免继续执行导致问题
|
|
||||||
raise RuntimeError(f"无法创建必要的数据目录 {data_dir}") from e
|
|
||||||
# --- 目录检查结束 ---
|
|
||||||
|
|
||||||
# 检查是否已经有保存的UUID
|
def __init__(self):
|
||||||
if os.path.exists(UUID_FILE):
|
super().__init__(task_name="Telemetry Heart Beat Task", run_interval=self.HEARTBEAT_INTERVAL)
|
||||||
try:
|
self.server_url = TELEMETRY_SERVER_URL
|
||||||
with open(UUID_FILE, "r", encoding="utf-8") as f: # 指定 encoding
|
"""遥测服务地址"""
|
||||||
data = json.load(f)
|
|
||||||
if "client_id" in data:
|
|
||||||
logger.debug(f"从本地文件读取客户端ID: {UUID_FILE}")
|
|
||||||
return data["client_id"]
|
|
||||||
except (json.JSONDecodeError, IOError) as e:
|
|
||||||
logger.warning(f"读取UUID文件 {UUID_FILE} 出错: {e},将生成新的UUID")
|
|
||||||
except Exception as e: # 捕捉其他可能的异常
|
|
||||||
logger.error(f"读取UUID文件 {UUID_FILE} 时发生未知错误: {e}")
|
|
||||||
|
|
||||||
# 如果没有保存的UUID或读取出错,则生成新的
|
self.client_uuid = local_storage["mmc_uuid"] if "mmc_uuid" in local_storage else None
|
||||||
client_id = generate_unique_id()
|
"""客户端UUID"""
|
||||||
logger.info(f"生成新的客户端ID: {client_id}")
|
|
||||||
|
|
||||||
# 保存UUID到文件
|
self.info_dict = self._get_sys_info()
|
||||||
try:
|
"""系统信息字典"""
|
||||||
# 再次确认目录存在 (虽然理论上前面已创建,但更保险)
|
|
||||||
os.makedirs(data_dir, exist_ok=True)
|
|
||||||
with open(UUID_FILE, "w", encoding="utf-8") as f: # 指定 encoding
|
|
||||||
json.dump({"client_id": client_id}, f, indent=4) # 添加 indent 使json可读
|
|
||||||
logger.info(f"已保存新生成的客户端ID到本地文件: {UUID_FILE}")
|
|
||||||
except IOError as e:
|
|
||||||
logger.error(f"保存UUID时出错: {UUID_FILE} - {e}")
|
|
||||||
except Exception as e: # 捕捉其他可能的异常
|
|
||||||
logger.error(f"保存UUID文件 {UUID_FILE} 时发生未知错误: {e}")
|
|
||||||
|
|
||||||
return client_id
|
@staticmethod
|
||||||
|
def _get_sys_info() -> dict[str, str]:
|
||||||
|
"""获取系统信息"""
|
||||||
|
info_dict = {
|
||||||
|
"os_type": "Unknown",
|
||||||
|
"py_version": platform.python_version(),
|
||||||
|
"mmc_version": global_config.MAI_VERSION,
|
||||||
|
}
|
||||||
|
|
||||||
|
match platform.system():
|
||||||
|
case "Windows":
|
||||||
|
info_dict["os_type"] = "Windows"
|
||||||
|
case "Linux":
|
||||||
|
info_dict["os_type"] = "Linux"
|
||||||
|
case "Darwin":
|
||||||
|
info_dict["os_type"] = "macOS"
|
||||||
|
case _:
|
||||||
|
info_dict["os_type"] = "Unknown"
|
||||||
|
|
||||||
# 生成客户端唯一ID
|
return info_dict
|
||||||
def generate_unique_id():
|
|
||||||
# 基于机器码生成唯一ID,同一台机器上生成的UUID是固定的,只要机器码不变
|
|
||||||
import hashlib
|
|
||||||
|
|
||||||
system_info = platform.system()
|
async def _req_uuid(self) -> bool:
|
||||||
machine_code = None
|
"""
|
||||||
|
向服务端请求UUID(不应在已存在UUID的情况下调用,会覆盖原有的UUID)
|
||||||
|
"""
|
||||||
|
|
||||||
try:
|
if "deploy_time" not in local_storage:
|
||||||
if system_info == "Windows":
|
logger.error("本地存储中缺少部署时间,无法请求UUID")
|
||||||
# 使用wmic命令获取主机UUID(更稳定)
|
|
||||||
result = subprocess.check_output(
|
|
||||||
"wmic csproduct get uuid", shell=True, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL
|
|
||||||
)
|
|
||||||
lines = result.decode(errors="ignore").splitlines()
|
|
||||||
# 过滤掉空行和表头,只取有效UUID
|
|
||||||
uuids = [line.strip() for line in lines if line.strip() and line.strip().lower() != "uuid"]
|
|
||||||
if uuids:
|
|
||||||
uuid_val = uuids[0]
|
|
||||||
# logger.debug(f"主机UUID: {uuid_val}")
|
|
||||||
# 增加无效值判断
|
|
||||||
if uuid_val and uuid_val.lower() not in ["to be filled by o.e.m.", "none", "", "standard"]:
|
|
||||||
machine_code = uuid_val
|
|
||||||
elif system_info == "Linux":
|
|
||||||
# 优先读取 /etc/machine-id,其次 /var/lib/dbus/machine-id,取第一个非空且内容有效的
|
|
||||||
for path in ["/etc/machine-id", "/var/lib/dbus/machine-id"]:
|
|
||||||
if os.path.exists(path):
|
|
||||||
with open(path, "r") as f:
|
|
||||||
code = f.read().strip()
|
|
||||||
# 只要内容非空且不是全0
|
|
||||||
if code and set(code) != {"0"}:
|
|
||||||
machine_code = code
|
|
||||||
break
|
|
||||||
elif system_info == "Darwin":
|
|
||||||
# macOS: 使用IOPlatformUUID
|
|
||||||
result = subprocess.check_output(
|
|
||||||
"ioreg -rd1 -c IOPlatformExpertDevice | awk '/IOPlatformUUID/'", shell=True
|
|
||||||
)
|
|
||||||
uuid_line = result.decode(errors="ignore")
|
|
||||||
# 解析出 "IOPlatformUUID" = "xxxx-xxxx-xxxx-xxxx"
|
|
||||||
import re
|
|
||||||
|
|
||||||
m = re.search(r'"IOPlatformUUID"\s*=\s*"([^"]+)"', uuid_line)
|
|
||||||
if m:
|
|
||||||
uuid_val = m.group(1)
|
|
||||||
logger.debug(f"IOPlatformUUID: {uuid_val}")
|
|
||||||
if uuid_val and uuid_val.lower() not in ["to be filled by o.e.m.", "none", "", "standard"]:
|
|
||||||
machine_code = uuid_val
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"获取机器码失败: {e}")
|
|
||||||
|
|
||||||
# 如果主板序列号无效,尝试用MAC地址
|
|
||||||
if not machine_code:
|
|
||||||
try:
|
|
||||||
mac = uuid.getnode()
|
|
||||||
if (mac >> 40) % 2 == 0: # 不是本地伪造MAC
|
|
||||||
machine_code = str(mac)
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"获取MAC地址失败: {e}")
|
|
||||||
|
|
||||||
def md5_to_uuid(md5hex):
|
|
||||||
# 将32位md5字符串格式化为8-4-4-4-12的UUID格式
|
|
||||||
return f"{md5hex[0:8]}-{md5hex[8:12]}-{md5hex[12:16]}-{md5hex[16:20]}-{md5hex[20:32]}"
|
|
||||||
|
|
||||||
if machine_code:
|
|
||||||
# print(f"machine_code={machine_code!r}") # 可用于调试
|
|
||||||
md5 = hashlib.md5(machine_code.encode("utf-8")).hexdigest()
|
|
||||||
uuid_str = md5_to_uuid(md5)
|
|
||||||
else:
|
|
||||||
uuid_str = str(uuid.uuid4())
|
|
||||||
|
|
||||||
unique_id = f"{system_info}-{uuid_str}"
|
|
||||||
return unique_id
|
|
||||||
|
|
||||||
|
|
||||||
def send_heartbeat(server_url, client_id):
|
|
||||||
"""向服务器发送心跳"""
|
|
||||||
sys = platform.system()
|
|
||||||
try:
|
|
||||||
headers = {"Client-ID": client_id, "User-Agent": f"HeartbeatClient/{client_id[:8]}"}
|
|
||||||
data = json.dumps(
|
|
||||||
{"system": sys, "Version": global_config.MAI_VERSION},
|
|
||||||
)
|
|
||||||
logger.debug(f"正在发送心跳到服务器: {server_url}")
|
|
||||||
logger.debug(f"心跳数据: {data}")
|
|
||||||
response = requests.post(f"{server_url}/api/clients", headers=headers, data=data)
|
|
||||||
|
|
||||||
if response.status_code == 201:
|
|
||||||
data = response.json()
|
|
||||||
logger.debug(f"心跳发送成功。服务器响应: {data}")
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.debug(f"心跳发送失败。状态码: {response.status_code}, 响应内容: {response.text}")
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
try_count: int = 0
|
||||||
|
while True:
|
||||||
|
# 如果不存在,则向服务端请求一个新的UUID(注册客户端)
|
||||||
|
logger.info("正在向遥测服务端请求UUID...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.post(
|
||||||
|
f"{TELEMETRY_SERVER_URL}/stat/reg_client",
|
||||||
|
json={"deploy_time": local_storage["deploy_time"]},
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
client_id = data.get("mmc_uuid")
|
||||||
|
if client_id:
|
||||||
|
# 将UUID存储到本地
|
||||||
|
local_storage["mmc_uuid"] = client_id
|
||||||
|
self.client_uuid = client_id
|
||||||
|
logger.info(f"成功获取UUID: {self.client_uuid}")
|
||||||
|
return True # 成功获取UUID,返回True
|
||||||
|
else:
|
||||||
|
logger.error("无效的服务端响应")
|
||||||
|
else:
|
||||||
|
logger.error(f"请求UUID失败,状态码: {response.status_code}, 响应内容: {response.text}")
|
||||||
|
except requests.RequestException as e:
|
||||||
|
logger.error(f"请求UUID时出错: {e}") # 可能是网络问题
|
||||||
|
|
||||||
|
# 请求失败,重试次数+1
|
||||||
|
try_count += 1
|
||||||
|
if try_count > 3:
|
||||||
|
# 如果超过3次仍然失败,则退出
|
||||||
|
logger.error("获取UUID失败,请检查网络连接或服务端状态")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
# 如果可以重试,等待后继续(指数退避)
|
||||||
|
await asyncio.sleep(4**try_count)
|
||||||
|
|
||||||
|
async def _send_heartbeat(self):
|
||||||
|
"""向服务器发送心跳"""
|
||||||
|
try:
|
||||||
|
headers = {
|
||||||
|
"Client-UUID": self.client_uuid,
|
||||||
|
"User-Agent": f"HeartbeatClient/{self.client_uuid[:8]}",
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(f"正在发送心跳到服务器: {self.server_url}")
|
||||||
|
|
||||||
|
response = requests.post(
|
||||||
|
f"{self.server_url}/stat/client_heartbeat",
|
||||||
|
headers=headers,
|
||||||
|
json=self.info_dict,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 处理响应
|
||||||
|
if 200 <= response.status_code < 300:
|
||||||
|
# 成功
|
||||||
|
logger.debug(f"心跳发送成功,状态码: {response.status_code}")
|
||||||
|
elif response.status_code == 403:
|
||||||
|
# 403 Forbidden
|
||||||
|
logger.error(
|
||||||
|
"心跳发送失败,403 Forbidden: 可能是UUID无效或未注册。"
|
||||||
|
"处理措施:重置UUID,下次发送心跳时将尝试重新注册。"
|
||||||
|
)
|
||||||
|
self.client_uuid = None
|
||||||
|
del local_storage["mmc_uuid"] # 删除本地存储的UUID
|
||||||
|
else:
|
||||||
|
# 其他错误
|
||||||
|
logger.error(f"心跳发送失败,状态码: {response.status_code}, 响应内容: {response.text}")
|
||||||
|
|
||||||
except requests.RequestException as e:
|
except requests.RequestException as e:
|
||||||
# 如果请求异常,可能是网络问题,不记录错误
|
logger.error(f"心跳发送失败: {e}")
|
||||||
logger.debug(f"发送心跳时出错: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
class HeartbeatThread(threading.Thread):
|
|
||||||
"""心跳线程类"""
|
|
||||||
|
|
||||||
def __init__(self, server_url, interval):
|
|
||||||
super().__init__(daemon=True) # 设置为守护线程,主程序结束时自动结束
|
|
||||||
self.server_url = server_url
|
|
||||||
self.interval = interval
|
|
||||||
self.client_id = get_unique_id()
|
|
||||||
self.running = True
|
|
||||||
self.stop_event = threading.Event() # 添加事件对象用于可中断的等待
|
|
||||||
self.last_heartbeat_time = 0 # 记录上次发送心跳的时间
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""线程运行函数"""
|
|
||||||
logger.debug(f"心跳线程已启动,客户端ID: {self.client_id}")
|
|
||||||
|
|
||||||
while self.running:
|
|
||||||
# 发送心跳
|
# 发送心跳
|
||||||
if send_heartbeat(self.server_url, self.client_id):
|
|
||||||
logger.info(f"{self.interval}秒后发送下一次心跳...")
|
|
||||||
else:
|
|
||||||
logger.info(f"{self.interval}秒后重试...")
|
|
||||||
|
|
||||||
self.last_heartbeat_time = time.time()
|
|
||||||
|
|
||||||
# 使用可中断的等待代替 sleep
|
|
||||||
# 每秒检查一次是否应该停止或发送心跳
|
|
||||||
remaining_wait = self.interval
|
|
||||||
while remaining_wait > 0 and self.running:
|
|
||||||
# 每次最多等待1秒,便于及时响应停止请求
|
|
||||||
wait_time = min(1, remaining_wait)
|
|
||||||
if self.stop_event.wait(wait_time):
|
|
||||||
break # 如果事件被设置,立即退出等待
|
|
||||||
remaining_wait -= wait_time
|
|
||||||
|
|
||||||
# 检查是否由于外部原因导致间隔异常延长
|
|
||||||
if time.time() - self.last_heartbeat_time >= self.interval * 1.5:
|
|
||||||
logger.warning("检测到心跳间隔异常延长,立即发送心跳")
|
|
||||||
break
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""停止线程"""
|
|
||||||
self.running = False
|
|
||||||
self.stop_event.set() # 设置事件,中断等待
|
|
||||||
logger.debug("心跳线程已收到停止信号")
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
if global_config.remote_enable:
|
if global_config.remote_enable:
|
||||||
"""主函数,启动心跳线程"""
|
if self.client_uuid is None:
|
||||||
# 配置
|
if not await self._req_uuid():
|
||||||
server_url = "http://hyybuth.xyz:10058"
|
logger.error("获取UUID失败,跳过此次心跳")
|
||||||
# server_url = "http://localhost:10058"
|
return
|
||||||
heartbeat_interval = 300 # 5分钟(秒)
|
|
||||||
|
|
||||||
# 创建并启动心跳线程
|
await self._send_heartbeat()
|
||||||
heartbeat_thread = HeartbeatThread(server_url, heartbeat_interval)
|
|
||||||
heartbeat_thread.start()
|
|
||||||
|
|
||||||
return heartbeat_thread # 返回线程对象,便于外部控制
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# --- 测试用例 ---
|
|
||||||
if __name__ == "__main__":
|
|
||||||
print("测试唯一ID生成:")
|
|
||||||
print("唯一ID:", get_unique_id())
|
|
||||||
|
|||||||
Reference in New Issue
Block a user