fix:统计和person_info现已成为异步,巨爽

This commit is contained in:
SengokuCola
2025-06-22 17:13:43 +08:00
parent ca47144fd9
commit ce50f59c0a
6 changed files with 227 additions and 23 deletions

View File

@@ -344,7 +344,7 @@ class ChatManager:
async def load_all_streams(self): async def load_all_streams(self):
"""从数据库加载所有聊天流""" """从数据库加载所有聊天流"""
logger.info("正在从数据库加载所有聊天流")
def _db_load_all_streams_sync(): def _db_load_all_streams_sync():
loaded_streams_data = [] loaded_streams_data = []
for model_instance in ChatStreams.select(): for model_instance in ChatStreams.select():

View File

@@ -1,6 +1,8 @@
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, Tuple, List from typing import Any, Dict, Tuple, List
import asyncio
import concurrent.futures
from src.common.logger import get_logger from src.common.logger import get_logger
@@ -187,16 +189,81 @@ class StatisticOutputTask(AsyncTask):
async def run(self):
    """Collect statistics off the event loop, then emit console and HTML output concurrently.

    Heavy DB reads (_collect_all_statistics) and report rendering run in a
    thread pool so the asyncio event loop stays responsive; console and HTML
    output are independent and therefore gathered in parallel.
    """
    try:
        now = datetime.now()
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated in this position since Python 3.10.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            logger.info("正在收集统计数据...")
            # Collection must finish before either output can start.
            stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
            logger.info("统计数据收集完成")
            # Console output and HTML report do not depend on each other.
            console_task = loop.run_in_executor(executor, self._statistic_console_output, stats, now)
            html_task = loop.run_in_executor(executor, self._generate_html_report, stats, now)
            await asyncio.gather(console_task, html_task)
            logger.info("统计数据输出完成")
    except Exception as e:
        logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}")
async def run_async_background(self):
    """Alternative: run the statistics output as a fully non-blocking background task.

    Returns the created asyncio.Task (backward compatible: callers that ignored
    the previous None return are unaffected). A strong reference is also kept
    on self because a bare create_task() result may be garbage-collected before
    it finishes (see asyncio documentation on task references).
    """

    async def _async_collect_and_output():
        try:
            now = datetime.now()
            loop = asyncio.get_running_loop()
            # concurrent.futures is already imported at module level;
            # the original re-imported it locally for no reason.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                logger.info("正在后台收集统计数据...")
                # run_in_executor already returns an awaitable future;
                # wrapping it in create_task was redundant.
                stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
                logger.info("统计数据收集完成")
                # Console and HTML outputs are independent -> run concurrently.
                await asyncio.gather(
                    loop.run_in_executor(executor, self._statistic_console_output, stats, now),
                    loop.run_in_executor(executor, self._generate_html_report, stats, now),
                )
                logger.info("统计数据后台输出完成")
        except Exception as e:
            logger.exception(f"后台统计数据输出过程中发生异常:{e}")

    # Keep a strong reference so the background task cannot be GC'd mid-flight.
    self._background_stats_task = asyncio.create_task(_async_collect_and_output())
    return self._background_stats_task
# -- 以下为统计数据收集方法 -- # -- 以下为统计数据收集方法 --
@staticmethod @staticmethod
@@ -1148,3 +1215,97 @@ class StatisticOutputTask(AsyncTask):
</script> </script>
</div> </div>
""" """
class AsyncStatisticOutputTask(AsyncTask):
    """Fully asynchronous statistics output task — higher-throughput variant.

    run() schedules collection and report generation as a background task and
    returns immediately; the heavy work executes in a thread pool. All
    statistics logic is delegated to StatisticOutputTask.
    """

    def __init__(self, record_file_path: str = "maibot_statistics.html"):
        # Start with no delay; repeat every 300 seconds.
        super().__init__(task_name="Async Statistics Data Output Task", wait_before_start=0, run_interval=300)

        # Reuse StatisticOutputTask's initialization by copying its computed state.
        # NOTE(review): building a throwaway instance just to copy three
        # attributes is wasteful — consider extracting the shared setup into a
        # helper on StatisticOutputTask. TODO confirm with maintainers.
        temp_stat_task = StatisticOutputTask(record_file_path)
        self.name_mapping = temp_stat_task.name_mapping
        self.record_file_path = temp_stat_task.record_file_path
        self.stat_period = temp_stat_task.stat_period
        # Strong reference to the in-flight background task so it is not GC'd.
        self._background_task = None

    async def run(self):
        """Kick off collection/output in the background and return immediately."""

        async def _async_collect_and_output():
            try:
                now = datetime.now()
                # get_running_loop() is the non-deprecated call inside a coroutine.
                loop = asyncio.get_running_loop()
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    logger.info("正在后台收集统计数据...")
                    # run_in_executor already returns an awaitable future;
                    # wrapping it in create_task was redundant.
                    stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
                    logger.info("统计数据收集完成")
                    # Console and HTML outputs are independent -> run concurrently.
                    await asyncio.gather(
                        loop.run_in_executor(executor, self._statistic_console_output, stats, now),
                        loop.run_in_executor(executor, self._generate_html_report, stats, now),
                    )
                    logger.info("统计数据后台输出完成")
            except Exception as e:
                logger.exception(f"后台统计数据输出过程中发生异常:{e}")

        # Keep a strong reference: a bare create_task() result may be
        # garbage-collected before completion (asyncio documentation).
        self._background_task = asyncio.create_task(_async_collect_and_output())

    # ---- delegation to StatisticOutputTask ----
    # Instance methods: assigning the plain function makes it bind to our
    # instances exactly as the hand-written `return StatisticOutputTask.X(self, ...)`
    # wrappers did, without the boilerplate.
    _collect_all_statistics = StatisticOutputTask._collect_all_statistics
    _statistic_console_output = StatisticOutputTask._statistic_console_output
    _generate_html_report = StatisticOutputTask._generate_html_report
    _collect_message_count_for_period = StatisticOutputTask._collect_message_count_for_period
    _format_chat_stat = StatisticOutputTask._format_chat_stat
    _generate_chart_data = StatisticOutputTask._generate_chart_data
    _collect_interval_data = StatisticOutputTask._collect_interval_data
    _generate_chart_tab = StatisticOutputTask._generate_chart_tab

    # Static methods must be re-wrapped in staticmethod() so that access via an
    # instance does not inject self.
    _collect_model_request_for_period = staticmethod(StatisticOutputTask._collect_model_request_for_period)
    _collect_online_time_for_period = staticmethod(StatisticOutputTask._collect_online_time_for_period)
    _format_total_stat = staticmethod(StatisticOutputTask._format_total_stat)
    _format_model_classified_stat = staticmethod(StatisticOutputTask._format_model_classified_stat)

View File

@@ -69,4 +69,14 @@ _DB_FILE = os.path.join(_DB_DIR, "MaiBot.db")
os.makedirs(_DB_DIR, exist_ok=True) os.makedirs(_DB_DIR, exist_ok=True)
# 全局 Peewee SQLite 数据库访问点 # 全局 Peewee SQLite 数据库访问点
# Global Peewee SQLite database access point.
db = SqliteDatabase(
    _DB_FILE,
    pragmas={
        'journal_mode': 'wal',       # WAL mode improves read/write concurrency
        'cache_size': -64 * 1000,    # negative = KiB units, ~64MB page cache
        'foreign_keys': 1,           # enforce foreign-key constraints
        'ignore_check_constraints': 0,
        # synchronous=0 (OFF) can corrupt the database on power loss.
        # NORMAL (1) is the recommended pairing with WAL and nearly as fast.
        'synchronous': 1,
        'busy_timeout': 1000,        # wait up to 1s on a locked database instead of 3s default
    },
)

View File

@@ -44,10 +44,12 @@ class Individuality:
personality_sides: 人格侧面描述 personality_sides: 人格侧面描述
identity_detail: 身份细节描述 identity_detail: 身份细节描述
""" """
logger.info("正在初始化个体特征")
person_info_manager = get_person_info_manager() person_info_manager = get_person_info_manager()
self.bot_person_id = person_info_manager.get_person_id("system", "bot_id") self.bot_person_id = person_info_manager.get_person_id("system", "bot_id")
self.name = bot_nickname self.name = bot_nickname
# 检查配置变化,如果变化则清空 # 检查配置变化,如果变化则清空
await self._check_config_and_clear_if_changed( await self._check_config_and_clear_if_changed(
bot_nickname, personality_core, personality_sides, identity_detail bot_nickname, personality_core, personality_sides, identity_detail
@@ -61,6 +63,8 @@ class Individuality:
# 初始化身份 # 初始化身份
self.identity = Identity(identity_detail=identity_detail) self.identity = Identity(identity_detail=identity_detail)
logger.info("正在将所有人设写入impression")
# 将所有人设写入impression # 将所有人设写入impression
impression_parts = [] impression_parts = []
if personality_core: if personality_core:
@@ -69,6 +73,7 @@ class Individuality:
impression_parts.append(f"人格侧面: {''.join(personality_sides)}") impression_parts.append(f"人格侧面: {''.join(personality_sides)}")
if identity_detail: if identity_detail:
impression_parts.append(f"身份: {''.join(identity_detail)}") impression_parts.append(f"身份: {''.join(identity_detail)}")
logger.info(f"impression_parts: {impression_parts}")
impression_text = "".join(impression_parts) impression_text = "".join(impression_parts)
if impression_text: if impression_text:

View File

@@ -93,13 +93,20 @@ class MainSystem:
# 添加情绪打印任务 # 添加情绪打印任务
await async_task_manager.add_task(MoodPrintTask()) await async_task_manager.add_task(MoodPrintTask())
logger.info("情绪管理器初始化成功")
# 启动愿望管理器 # 启动愿望管理器
await willing_manager.async_task_starter() await willing_manager.async_task_starter()
logger.info("willing管理器初始化成功")
# 初始化聊天管理器 # 初始化聊天管理器
await get_chat_manager()._initialize() await get_chat_manager()._initialize()
asyncio.create_task(get_chat_manager()._auto_save_task()) asyncio.create_task(get_chat_manager()._auto_save_task())
logger.info("聊天管理器初始化成功")
# 根据配置条件性地初始化记忆系统 # 根据配置条件性地初始化记忆系统
if global_config.memory.enable_memory: if global_config.memory.enable_memory:
if self.hippocampus_manager: if self.hippocampus_manager:

View File

@@ -61,6 +61,12 @@ class PersonInfoManager:
) )
try: try:
db.connect(reuse_if_open=True) db.connect(reuse_if_open=True)
# 设置连接池参数
if hasattr(db, 'execute_sql'):
# 设置SQLite优化参数
db.execute_sql('PRAGMA cache_size = -64000') # 64MB缓存
db.execute_sql('PRAGMA temp_store = memory') # 临时存储在内存中
db.execute_sql('PRAGMA mmap_size = 268435456') # 256MB内存映射
db.create_tables([PersonInfo], safe=True) db.create_tables([PersonInfo], safe=True)
except Exception as e: except Exception as e:
logger.error(f"数据库连接或 PersonInfo 表创建失败: {e}") logger.error(f"数据库连接或 PersonInfo 表创建失败: {e}")
@@ -159,34 +165,49 @@ class PersonInfoManager:
async def update_one_field(self, person_id: str, field_name: str, value, data: dict = None): async def update_one_field(self, person_id: str, field_name: str, value, data: dict = None):
"""更新某一个字段,会补全""" """更新某一个字段,会补全"""
if field_name not in PersonInfo._meta.fields: if field_name not in PersonInfo._meta.fields:
# if field_name in person_info_default: # Keep this check if some defaults are not DB fields
# logger.debug(f"更新'{field_name}'跳过,字段存在于默认配置但不在 PersonInfo Peewee 模型中。")
# return
logger.debug(f"更新'{field_name}'失败,未在 PersonInfo Peewee 模型中定义的字段。") logger.debug(f"更新'{field_name}'失败,未在 PersonInfo Peewee 模型中定义的字段。")
return return
# print(f"更新字段: {field_name},值: {value}")
processed_value = value processed_value = value
if field_name in JSON_SERIALIZED_FIELDS: if field_name in JSON_SERIALIZED_FIELDS:
if isinstance(value, (list, dict)): if isinstance(value, (list, dict)):
processed_value = json.dumps(value, ensure_ascii=False, indent=None) processed_value = json.dumps(value, ensure_ascii=False, indent=None)
elif value is None: # Store None as "[]" for JSON list fields elif value is None: # Store None as "[]" for JSON list fields
processed_value = json.dumps([], ensure_ascii=False, indent=None) processed_value = json.dumps([], ensure_ascii=False, indent=None)
# If value is already a string, assume it's pre-serialized or a non-JSON string.
def _db_update_sync(p_id: str, f_name: str, val_to_set):
    """Synchronous DB update, intended to run via asyncio.to_thread.

    Returns (found, needs_creation): (True, False) when the record existed and
    was updated; (False, True) when the caller must create it. Slow operations
    are logged with query/save timing breakdown.
    """
    import time

    slow_threshold_s = 0.5  # warn when a DB round-trip exceeds this
    # perf_counter() is monotonic; wall-clock time.time() can jump on
    # clock adjustment and produce bogus (even negative) durations.
    start_time = time.perf_counter()
    try:
        record = PersonInfo.get_or_none(PersonInfo.person_id == p_id)
        query_time = time.perf_counter()
        if record:
            setattr(record, f_name, val_to_set)
            record.save()
            save_time = time.perf_counter()
            total_time = save_time - start_time
            if total_time > slow_threshold_s:
                logger.warning(f"数据库更新操作耗时 {total_time:.3f}秒 (查询: {query_time-start_time:.3f}s, 保存: {save_time-query_time:.3f}s) person_id={p_id}, field={f_name}")
            return True, False  # Found and updated, no creation needed
        else:
            total_time = time.perf_counter() - start_time
            if total_time > slow_threshold_s:
                logger.warning(f"数据库查询操作耗时 {total_time:.3f}秒 person_id={p_id}, field={f_name}")
            return False, True  # Not found, needs creation
    except Exception as e:
        total_time = time.perf_counter() - start_time
        logger.error(f"数据库操作异常,耗时 {total_time:.3f}秒: {e}")
        raise
found, needs_creation = await asyncio.to_thread(_db_update_sync, person_id, field_name, processed_value) found, needs_creation = await asyncio.to_thread(_db_update_sync, person_id, field_name, processed_value)
if needs_creation: if needs_creation:
logger.debug(f"更新时 {person_id} 不存在,将新建。") logger.info(f"{person_id} 不存在,将新建。")
creation_data = data if data is not None else {} creation_data = data if data is not None else {}
# Ensure platform and user_id are present for context if available from 'data' # Ensure platform and user_id are present for context if available from 'data'
# but primarily, set the field that triggered the update. # but primarily, set the field that triggered the update.