diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index fc251f4ff..30e0e83da 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -344,7 +344,7 @@ class ChatManager: async def load_all_streams(self): """从数据库加载所有聊天流""" - + logger.info("正在从数据库加载所有聊天流") def _db_load_all_streams_sync(): loaded_streams_data = [] for model_instance in ChatStreams.select(): diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index 68a70591c..81f820b2c 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -1,6 +1,8 @@ from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Dict, Tuple, List +import asyncio +import concurrent.futures from src.common.logger import get_logger @@ -185,18 +187,83 @@ class StatisticOutputTask(AsyncTask): logger.info("\n" + "\n".join(output)) async def run(self): - try: + try: now = datetime.now() - # 收集统计数据 - stats = self._collect_all_statistics(now) - - # 输出统计数据到控制台 - self._statistic_console_output(stats, now) - # 输出统计数据到html文件 - self._generate_html_report(stats, now) + + # 使用线程池并行执行耗时操作 + loop = asyncio.get_event_loop() + + # 在线程池中并行执行数据收集和之前的HTML生成(如果存在) + with concurrent.futures.ThreadPoolExecutor() as executor: + logger.info("正在收集统计数据...") + + # 数据收集任务 + collect_task = loop.run_in_executor( + executor, self._collect_all_statistics, now + ) + + # 等待数据收集完成 + stats = await collect_task + logger.info("统计数据收集完成") + + # 并行执行控制台输出和HTML报告生成 + console_task = loop.run_in_executor( + executor, self._statistic_console_output, stats, now + ) + html_task = loop.run_in_executor( + executor, self._generate_html_report, stats, now + ) + + # 等待两个输出任务完成 + await asyncio.gather(console_task, html_task) + + logger.info("统计数据输出完成") except Exception as e: logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}") + async def run_async_background(self): + """ + 备选方案:完全异步后台运行统计输出 + 使用此方法可以让统计任务完全非阻塞 + """ + async def _async_collect_and_output(): + try: + import concurrent.futures + + now = datetime.now() + loop = asyncio.get_event_loop() + + with concurrent.futures.ThreadPoolExecutor() as executor: + logger.info("正在后台收集统计数据...") + + # 创建后台任务,不等待完成 + collect_task = asyncio.create_task( + loop.run_in_executor(executor, self._collect_all_statistics, now) + ) + + stats = await collect_task + logger.info("统计数据收集完成") + + # 创建并发的输出任务 + output_tasks = [ + asyncio.create_task( + loop.run_in_executor(executor, self._statistic_console_output, stats, now) + ), + asyncio.create_task( + loop.run_in_executor(executor, self._generate_html_report, stats, now) + ) + ] + + # 等待所有输出任务完成 + await asyncio.gather(*output_tasks) + + logger.info("统计数据后台输出完成") + except Exception as e: + logger.exception(f"后台统计数据输出过程中发生异常:{e}") + + # 创建后台任务,立即返回 + asyncio.create_task(_async_collect_and_output()) + # -- 以下为统计数据收集方法 -- @staticmethod @@ -1148,3 +1215,97 @@ class StatisticOutputTask(AsyncTask): """ + + +class AsyncStatisticOutputTask(AsyncTask): + """完全异步的统计输出任务 - 更高性能版本""" + + def __init__(self, record_file_path: str = "maibot_statistics.html"): + # 延迟0秒启动,运行间隔300秒 + super().__init__(task_name="Async Statistics Data Output Task", wait_before_start=0, run_interval=300) + + # 直接复用 StatisticOutputTask 的初始化逻辑 + temp_stat_task = StatisticOutputTask(record_file_path) + self.name_mapping = temp_stat_task.name_mapping + self.record_file_path = temp_stat_task.record_file_path + self.stat_period = temp_stat_task.stat_period + + async def run(self): + """完全异步执行统计任务""" + async def _async_collect_and_output(): + try: + now = datetime.now() + loop = asyncio.get_event_loop() + + with concurrent.futures.ThreadPoolExecutor() as executor: + logger.info("正在后台收集统计数据...") + + # 数据收集任务 + collect_task = asyncio.create_task( + loop.run_in_executor(executor, self._collect_all_statistics, now) + ) + + stats = await collect_task + logger.info("统计数据收集完成") + + # 创建并发的输出任务 + output_tasks = [ + asyncio.create_task( + loop.run_in_executor(executor, self._statistic_console_output, stats, now) + ), + asyncio.create_task( + loop.run_in_executor(executor, self._generate_html_report, stats, now) + ) + ] + + # 等待所有输出任务完成 + await asyncio.gather(*output_tasks) + + logger.info("统计数据后台输出完成") + except Exception as e: + logger.exception(f"后台统计数据输出过程中发生异常:{e}") + + # 创建后台任务,立即返回 + asyncio.create_task(_async_collect_and_output()) + + # 复用 StatisticOutputTask 的所有方法 + def _collect_all_statistics(self, now: datetime): + return StatisticOutputTask._collect_all_statistics(self, now) + + def _statistic_console_output(self, stats: Dict[str, Any], now: datetime): + return StatisticOutputTask._statistic_console_output(self, stats, now) + + def _generate_html_report(self, stats: dict[str, Any], now: datetime): + return StatisticOutputTask._generate_html_report(self, stats, now) + + # 其他需要的方法也可以类似复用... + @staticmethod + def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]: + return StatisticOutputTask._collect_model_request_for_period(collect_period) + + @staticmethod + def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]: + return StatisticOutputTask._collect_online_time_for_period(collect_period, now) + + def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]: + return StatisticOutputTask._collect_message_count_for_period(self, collect_period) + + @staticmethod + def _format_total_stat(stats: Dict[str, Any]) -> str: + return StatisticOutputTask._format_total_stat(stats) + + @staticmethod + def _format_model_classified_stat(stats: Dict[str, Any]) -> str: + return StatisticOutputTask._format_model_classified_stat(stats) + + def _format_chat_stat(self, stats: Dict[str, Any]) -> str: + return StatisticOutputTask._format_chat_stat(self, stats) + + def _generate_chart_data(self, stat: dict[str, Any]) -> dict: + return StatisticOutputTask._generate_chart_data(self, stat) + + def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict: + return StatisticOutputTask._collect_interval_data(self, now, hours, interval_minutes) + + def _generate_chart_tab(self, chart_data: dict) -> str: + return StatisticOutputTask._generate_chart_tab(self, chart_data) diff --git a/src/common/database/database.py b/src/common/database/database.py index a2dab739d..59fcfac7c 100644 --- a/src/common/database/database.py +++ b/src/common/database/database.py @@ -69,4 +69,14 @@ _DB_FILE = os.path.join(_DB_DIR, "MaiBot.db") os.makedirs(_DB_DIR, exist_ok=True) # 全局 Peewee SQLite 数据库访问点 -db = SqliteDatabase(_DB_FILE) +db = SqliteDatabase( + _DB_FILE, + pragmas={ + 'journal_mode': 'wal', # WAL模式提高并发性能 + 'cache_size': -64 * 1000, # 64MB缓存 + 'foreign_keys': 1, + 'ignore_check_constraints': 0, + 'synchronous': 0, # 异步写入提高性能 + 'busy_timeout': 1000, # 1秒超时而不是3秒 + } +) diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py index a3174e76a..02a5a1fc6 100644 --- a/src/individuality/individuality.py +++ b/src/individuality/individuality.py @@ -44,9 +44,11 @@ class Individuality: personality_sides: 人格侧面描述 identity_detail: 身份细节描述 """ + logger.info("正在初始化个体特征") person_info_manager = get_person_info_manager() self.bot_person_id = person_info_manager.get_person_id("system", "bot_id") self.name = bot_nickname + # 检查配置变化,如果变化则清空 await self._check_config_and_clear_if_changed( @@ -61,6 +63,8 @@ class Individuality: # 初始化身份 self.identity = Identity(identity_detail=identity_detail) + + logger.info("正在将所有人设写入impression") # 将所有人设写入impression impression_parts = [] if personality_core: @@ -69,6 +73,7 @@ class Individuality: impression_parts.append(f"人格侧面: {'、'.join(personality_sides)}") if identity_detail: impression_parts.append(f"身份: {'、'.join(identity_detail)}") + logger.info(f"impression_parts: {impression_parts}") impression_text = "。".join(impression_parts) if impression_text: diff --git a/src/main.py b/src/main.py index 3857df0dc..1021f15fa 100644 --- a/src/main.py +++ b/src/main.py @@ -93,13 +93,20 @@ class MainSystem: # 添加情绪打印任务 await async_task_manager.add_task(MoodPrintTask()) + logger.info("情绪管理器初始化成功") + # 启动愿望管理器 await willing_manager.async_task_starter() + logger.info("willing管理器初始化成功") + # 初始化聊天管理器 + await get_chat_manager()._initialize() asyncio.create_task(get_chat_manager()._auto_save_task()) + logger.info("聊天管理器初始化成功") + # 根据配置条件性地初始化记忆系统 if global_config.memory.enable_memory: if self.hippocampus_manager: diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index 901a8723a..00961bb5f 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -61,6 +61,12 @@ class PersonInfoManager: ) try: db.connect(reuse_if_open=True) + # 设置连接池参数 + if hasattr(db, 'execute_sql'): + # 设置SQLite优化参数 + db.execute_sql('PRAGMA cache_size = -64000') # 64MB缓存 + db.execute_sql('PRAGMA temp_store = memory') # 临时存储在内存中 + db.execute_sql('PRAGMA mmap_size = 268435456') # 256MB内存映射 db.create_tables([PersonInfo], safe=True) except Exception as e: logger.error(f"数据库连接或 PersonInfo 表创建失败: {e}") @@ -159,34 +165,49 @@ class PersonInfoManager: async def update_one_field(self, person_id: str, field_name: str, value, data: dict = None): """更新某一个字段,会补全""" if field_name not in PersonInfo._meta.fields: - # if field_name in person_info_default: # Keep this check if some defaults are not DB fields - # logger.debug(f"更新'{field_name}'跳过,字段存在于默认配置但不在 PersonInfo Peewee 模型中。") - # return + logger.debug(f"更新'{field_name}'失败,未在 PersonInfo Peewee 模型中定义的字段。") return - # print(f"更新字段: {field_name},值: {value}") - processed_value = value if field_name in JSON_SERIALIZED_FIELDS: if isinstance(value, (list, dict)): processed_value = json.dumps(value, ensure_ascii=False, indent=None) elif value is None: # Store None as "[]" for JSON list fields processed_value = json.dumps([], ensure_ascii=False, indent=None) - # If value is already a string, assume it's pre-serialized or a non-JSON string. def _db_update_sync(p_id: str, f_name: str, val_to_set): - record = PersonInfo.get_or_none(PersonInfo.person_id == p_id) - if record: - setattr(record, f_name, val_to_set) - record.save() - return True, False # Found and updated, no creation needed - return False, True # Not found, needs creation + import time + start_time = time.time() + try: + record = PersonInfo.get_or_none(PersonInfo.person_id == p_id) + query_time = time.time() + + if record: + setattr(record, f_name, val_to_set) + record.save() + save_time = time.time() + + total_time = save_time - start_time + if total_time > 0.5: # 如果超过500ms就记录日志 + logger.warning(f"数据库更新操作耗时 {total_time:.3f}秒 (查询: {query_time-start_time:.3f}s, 保存: {save_time-query_time:.3f}s) person_id={p_id}, field={f_name}") + + return True, False # Found and updated, no creation needed + else: + total_time = time.time() - start_time + if total_time > 0.5: + logger.warning(f"数据库查询操作耗时 {total_time:.3f}秒 person_id={p_id}, field={f_name}") + return False, True # Not found, needs creation + except Exception as e: + total_time = time.time() - start_time + logger.error(f"数据库操作异常,耗时 {total_time:.3f}秒: {e}") + raise + found, needs_creation = await asyncio.to_thread(_db_update_sync, person_id, field_name, processed_value) if needs_creation: - logger.debug(f"更新时 {person_id} 不存在,将新建。") + logger.info(f"{person_id} 不存在,将新建。") creation_data = data if data is not None else {} # Ensure platform and user_id are present for context if available from 'data' # but primarily, set the field that triggered the update.