diff --git a/changelogs/changelog.md b/changelogs/changelog.md index 160d21a81..0d1127b0b 100644 --- a/changelogs/changelog.md +++ b/changelogs/changelog.md @@ -10,6 +10,7 @@ - 修复log出错问题 - 修复focus吞第一条消息问题 - 可关闭聊天规划处理器(建议关闭) +- 优化记忆同步速度,修复记忆构建缺少chat_id的问题 diff --git a/scripts/mongodb_to_sqlite.py b/scripts/mongodb_to_sqlite.py index edd27e435..da34c7d44 100644 --- a/scripts/mongodb_to_sqlite.py +++ b/scripts/mongodb_to_sqlite.py @@ -250,8 +250,6 @@ class MongoToSQLiteMigrator: "nickname": "nickname", "relationship_value": "relationship_value", "konw_time": "know_time", - "msg_interval": "msg_interval", - "msg_interval_list": "msg_interval_list", }, unique_fields=["person_id"], ), diff --git a/src/chat/focus_chat/planners/planner_complex.py b/src/chat/focus_chat/planners/planner_complex.py index 9a0f7e68a..7f4aed7f7 100644 --- a/src/chat/focus_chat/planners/planner_complex.py +++ b/src/chat/focus_chat/planners/planner_complex.py @@ -79,7 +79,7 @@ class ActionPlanner(BasePlanner): super().__init__(log_prefix, action_manager) # LLM规划器配置 self.planner_llm = LLMRequest( - model=global_config.model.focus_planner, + model=global_config.model.planner, max_tokens=1000, request_type="focus.planner", # 用于动作规划 ) diff --git a/src/chat/focus_chat/planners/planner_simple.py b/src/chat/focus_chat/planners/planner_simple.py index 7a81101f9..60818a2f4 100644 --- a/src/chat/focus_chat/planners/planner_simple.py +++ b/src/chat/focus_chat/planners/planner_simple.py @@ -75,7 +75,7 @@ class ActionPlanner(BasePlanner): super().__init__(log_prefix, action_manager) # LLM规划器配置 self.planner_llm = LLMRequest( - model=global_config.model.focus_planner, + model=global_config.model.planner, max_tokens=1000, request_type="focus.planner", # 用于动作规划 ) diff --git a/src/chat/message_receive/message_buffer.py b/src/chat/message_receive/message_buffer.py deleted file mode 100644 index f513b22a5..000000000 --- a/src/chat/message_receive/message_buffer.py +++ /dev/null @@ -1,216 +0,0 @@ -from src.person_info.person_info import person_info_manager -from src.common.logger_manager import get_logger -import asyncio -from dataclasses import dataclass, field -from .message import MessageRecv -from maim_message import BaseMessageInfo, GroupInfo -import hashlib -from typing import Dict -from collections import OrderedDict -import random -import time -from ...config.config import global_config - -logger = get_logger("message_buffer") - - -@dataclass -class CacheMessages: - message: MessageRecv - cache_determination: asyncio.Event = field(default_factory=asyncio.Event) # 判断缓冲是否产生结果 - result: str = "U" - - -class MessageBuffer: - def __init__(self): - self.buffer_pool: Dict[str, OrderedDict[str, CacheMessages]] = {} - self.lock = asyncio.Lock() - - @staticmethod - def get_person_id_(platform: str, user_id: str, group_info: GroupInfo): - """获取唯一id""" - if group_info: - group_id = group_info.group_id - else: - group_id = "私聊" - key = f"{platform}_{user_id}_{group_id}" - return hashlib.md5(key.encode()).hexdigest() - - async def start_caching_messages(self, message: MessageRecv): - """添加消息,启动缓冲""" - if not global_config.chat.message_buffer: - person_id = person_info_manager.get_person_id( - message.message_info.user_info.platform, message.message_info.user_info.user_id - ) - asyncio.create_task(self.save_message_interval(person_id, message.message_info)) - return - person_id_ = self.get_person_id_( - message.message_info.platform, message.message_info.user_info.user_id, message.message_info.group_info - ) - - async with self.lock: - if person_id_ not in self.buffer_pool: - self.buffer_pool[person_id_] = OrderedDict() - - # 标记该用户之前的未处理消息 - for cache_msg in self.buffer_pool[person_id_].values(): - if cache_msg.result == "U": - cache_msg.result = "F" - cache_msg.cache_determination.set() - logger.debug(f"被新消息覆盖信息id: {cache_msg.message.message_info.message_id}") - - # 查找最近的处理成功消息(T) - recent_f_count = 0 - for msg_id in reversed(self.buffer_pool[person_id_]): - msg = self.buffer_pool[person_id_][msg_id] - if msg.result == "T": - break - elif msg.result == "F": - recent_f_count += 1 - - # 判断条件:最近T之后有超过3-5条F - if recent_f_count >= random.randint(3, 5): - new_msg = CacheMessages(message=message, result="T") - new_msg.cache_determination.set() - self.buffer_pool[person_id_][message.message_info.message_id] = new_msg - logger.debug(f"快速处理消息(已堆积{recent_f_count}条F): {message.message_info.message_id}") - return - - # 添加新消息 - self.buffer_pool[person_id_][message.message_info.message_id] = CacheMessages(message=message) - - # 启动3秒缓冲计时器 - person_id = person_info_manager.get_person_id( - message.message_info.user_info.platform, message.message_info.user_info.user_id - ) - asyncio.create_task(self.save_message_interval(person_id, message.message_info)) - asyncio.create_task(self._debounce_processor(person_id_, message.message_info.message_id, person_id)) - - async def _debounce_processor(self, person_id_: str, message_id: str, person_id: str): - """等待3秒无新消息""" - interval_time = await person_info_manager.get_value(person_id, "msg_interval") - if not isinstance(interval_time, (int, str)) or not str(interval_time).isdigit(): - logger.debug("debounce_processor无效的时间") - return - interval_time = max(0.5, int(interval_time) / 1000) - await asyncio.sleep(interval_time) - - async with self.lock: - if person_id_ not in self.buffer_pool or message_id not in self.buffer_pool[person_id_]: - logger.debug(f"消息已被清理,msgid: {message_id}") - return - - cache_msg = self.buffer_pool[person_id_][message_id] - if cache_msg.result == "U": - cache_msg.result = "T" - cache_msg.cache_determination.set() - - async def query_buffer_result(self, message: MessageRecv) -> bool: - """查询缓冲结果,并清理""" - if not global_config.chat.message_buffer: - return True - person_id_ = self.get_person_id_( - message.message_info.platform, message.message_info.user_info.user_id, message.message_info.group_info - ) - - async with self.lock: - user_msgs = self.buffer_pool.get(person_id_, {}) - cache_msg = user_msgs.get(message.message_info.message_id) - - if not cache_msg: - logger.debug(f"查询异常,消息不存在,msgid: {message.message_info.message_id}") - return False # 消息不存在或已清理 - - try: - await asyncio.wait_for(cache_msg.cache_determination.wait(), timeout=10) - result = cache_msg.result == "T" - - if result: - async with self.lock: # 再次加锁 - # 清理所有早于当前消息的已处理消息, 收集所有早于当前消息的F消息的processed_plain_text - keep_msgs = OrderedDict() # 用于存放 T 消息之后的消息 - collected_texts = [] # 用于收集 T 消息及之前 F 消息的文本 - process_target_found = False - - # 遍历当前用户的所有缓冲消息 - for msg_id, cache_msg in self.buffer_pool[person_id_].items(): - # 如果找到了目标处理消息 (T 状态) - if msg_id == message.message_info.message_id: - process_target_found = True - # 收集这条 T 消息的文本 (如果有) - if ( - hasattr(cache_msg.message, "processed_plain_text") - and cache_msg.message.processed_plain_text - ): - collected_texts.append(cache_msg.message.processed_plain_text) - # 不立即放入 keep_msgs,因为它之前的 F 消息也处理完了 - - # 如果已经找到了目标 T 消息,之后的消息需要保留 - elif process_target_found: - keep_msgs[msg_id] = cache_msg - - # 如果还没找到目标 T 消息,说明是之前的消息 (F 或 U) - else: - if cache_msg.result == "F": - # 收集这条 F 消息的文本 (如果有) - if ( - hasattr(cache_msg.message, "processed_plain_text") - and cache_msg.message.processed_plain_text - ): - collected_texts.append(cache_msg.message.processed_plain_text) - elif cache_msg.result == "U": - # 理论上不应该在 T 消息之前还有 U 消息,记录日志 - logger.warning( - f"异常状态:在目标 T 消息 {message.message_info.message_id} 之前发现未处理的 U 消息 {cache_msg.message.message_info.message_id}" - ) - # 也可以选择收集其文本 - if ( - hasattr(cache_msg.message, "processed_plain_text") - and cache_msg.message.processed_plain_text - ): - collected_texts.append(cache_msg.message.processed_plain_text) - - # 更新当前消息 (message) 的 processed_plain_text - # 只有在收集到的文本多于一条,或者只有一条但与原始文本不同时才合并 - if collected_texts: - # 使用 OrderedDict 去重,同时保留原始顺序 - unique_texts = list(OrderedDict.fromkeys(collected_texts)) - merged_text = ",".join(unique_texts) - - # 只有在合并后的文本与原始文本不同时才更新 - # 并且确保不是空合并 - if merged_text and merged_text != message.processed_plain_text: - message.processed_plain_text = merged_text - # 如果合并了文本,原消息不再视为纯 emoji - if hasattr(message, "is_emoji"): - message.is_emoji = False - logger.debug( - f"合并了 {len(unique_texts)} 条消息的文本内容到当前消息 {message.message_info.message_id}" - ) - - # 更新缓冲池,只保留 T 消息之后的消息 - self.buffer_pool[person_id_] = keep_msgs - return result - except asyncio.TimeoutError: - logger.debug(f"查询超时消息id: {message.message_info.message_id}") - return False - - @staticmethod - async def save_message_interval(person_id: str, message: BaseMessageInfo): - message_interval_list = await person_info_manager.get_value(person_id, "msg_interval_list") - now_time_ms = int(round(time.time() * 1000)) - if len(message_interval_list) < 1000: - message_interval_list.append(now_time_ms) - else: - message_interval_list.pop(0) - message_interval_list.append(now_time_ms) - data = { - "platform": message.platform, - "user_id": message.user_info.user_id, - "nickname": message.user_info.user_nickname, - "konw_time": int(time.time()), - } - await person_info_manager.update_one_field(person_id, "msg_interval_list", message_interval_list, data) - - -message_buffer = MessageBuffer() diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index bd2646371..2e814aa69 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -214,11 +214,10 @@ class PersonInfo(BaseModel): platform = TextField() # 平台 user_id = TextField(index=True) # 用户ID nickname = TextField() # 用户昵称 + person_impression = TextField(null=True) # 个人印象 relationship_value = IntegerField(default=0) # 关系值 know_time = FloatField() # 认识时间 (时间戳) - msg_interval = IntegerField() # 消息间隔 - # msg_interval_list: 存储为 JSON 字符串的列表 - msg_interval_list = TextField(null=True) + class Meta: # database = db # 继承自 BaseModel @@ -334,7 +333,7 @@ def create_tables(): def initialize_database(): """ 检查所有定义的表是否存在,如果不存在则创建它们。 - 检查所有表的所有字段是否存在,如果缺失则警告用户并退出程序。 + 检查所有表的所有字段是否存在,如果缺失则自动添加。 """ import sys @@ -350,44 +349,56 @@ def initialize_database(): Knowledges, ThinkingLog, RecalledMessages, - GraphNodes, # 添加图节点表 - GraphEdges, # 添加图边表 + GraphNodes, + GraphEdges, ] - needs_creation = False try: with db: # 管理 table_exists 检查的连接 for model in models: table_name = model._meta.table_name if not db.table_exists(model): - logger.warning(f"表 '{table_name}' 未找到。") - needs_creation = True - break # 一个表丢失,无需进一步检查。 - if not needs_creation: + logger.warning(f"表 '{table_name}' 未找到,正在创建...") + db.create_tables([model]) + logger.info(f"表 '{table_name}' 创建成功") + continue + # 检查字段 - for model in models: - table_name = model._meta.table_name - cursor = db.execute_sql(f"PRAGMA table_info('{table_name}')") - existing_columns = {row[1] for row in cursor.fetchall()} - model_fields = model._meta.fields - for field_name in model_fields: - if field_name not in existing_columns: - logger.error(f"表 '{table_name}' 缺失字段 '{field_name}',请手动迁移数据库结构后重启程序。") - sys.exit(1) + cursor = db.execute_sql(f"PRAGMA table_info('{table_name}')") + existing_columns = {row[1] for row in cursor.fetchall()} + model_fields = model._meta.fields + + for field_name, field_obj in model_fields.items(): + if field_name not in existing_columns: + logger.info(f"表 '{table_name}' 缺失字段 '{field_name}',正在添加...") + # 获取字段类型 + field_type = field_obj.__class__.__name__ + sql_type = { + 'TextField': 'TEXT', + 'IntegerField': 'INTEGER', + 'FloatField': 'FLOAT', + 'DoubleField': 'DOUBLE', + 'BooleanField': 'INTEGER', + 'DateTimeField': 'DATETIME' + }.get(field_type, 'TEXT') + + # 构建 ALTER TABLE 语句 + alter_sql = f'ALTER TABLE {table_name} ADD COLUMN {field_name} {sql_type}' + if field_obj.null: + alter_sql += ' NULL' + else: + alter_sql += ' NOT NULL' + if hasattr(field_obj, 'default') and field_obj.default is not None: + alter_sql += f' DEFAULT {field_obj.default}' + + db.execute_sql(alter_sql) + logger.info(f"字段 '{field_name}' 添加成功") except Exception as e: logger.exception(f"检查表或字段是否存在时出错: {e}") # 如果检查失败(例如数据库不可用),则退出 return - if needs_creation: - logger.info("正在初始化数据库:一个或多个表丢失。正在尝试创建所有定义的表...") - try: - create_tables() # 此函数有其自己的 'with db:' 上下文管理。 - logger.info("数据库表创建过程完成。") - except Exception as e: - logger.exception(f"创建表期间出错: {e}") - else: - logger.info("所有数据库表及字段均已存在。") + logger.info("数据库初始化完成") # 模块加载时调用初始化函数 diff --git a/src/config/official_configs.py b/src/config/official_configs.py index f7c504c5a..347d8f2d0 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -451,8 +451,8 @@ class ModelConfig(ConfigBase): focus_tool_use: dict[str, Any] = field(default_factory=lambda: {}) """专注工具使用模型配置""" - focus_planner: dict[str, Any] = field(default_factory=lambda: {}) - """专注规划模型配置""" + planner: dict[str, Any] = field(default_factory=lambda: {}) + """规划模型配置""" focus_expressor: dict[str, Any] = field(default_factory=lambda: {}) """专注表达器模型配置""" diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index 11f8dd2bf..84442a2d5 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -6,17 +6,10 @@ import hashlib from typing import Any, Callable, Dict import datetime import asyncio -import numpy as np from src.llm_models.utils_model import LLMRequest from src.config.config import global_config from src.individuality.individuality import individuality -import matplotlib - -matplotlib.use("Agg") -import matplotlib.pyplot as plt -from pathlib import Path -import pandas as pd import json # 新增导入 import re @@ -31,7 +24,6 @@ PersonInfoManager 类方法功能摘要: 6. get_values - 批量获取字段值(任一字段无效则返回空字典) 7. del_all_undefined_field - 清理全集合中未定义的字段 8. get_specific_value_list - 根据指定条件,返回person_id,value字典 -9. personal_habit_deduction - 定时推断个人习惯 """ @@ -46,8 +38,6 @@ person_info_default = { "nickname": "Unknown", # 提供非None的默认值 "relationship_value": 0, "know_time": 0, # 修正拼写:konw_time -> know_time - "msg_interval": 2000, - "msg_interval_list": [], # 将作为 JSON 字符串存储在 Peewee 的 TextField "user_cardname": None, # 注意:此字段不在 PersonInfo Peewee 模型中 "user_avatar": None, # 注意:此字段不在 PersonInfo Peewee 模型中 } @@ -135,11 +125,6 @@ class PersonInfoManager: if key in model_fields and key not in final_data: final_data[key] = default_value - if "msg_interval_list" in final_data and isinstance(final_data["msg_interval_list"], list): - final_data["msg_interval_list"] = json.dumps(final_data["msg_interval_list"]) - elif "msg_interval_list" not in final_data and "msg_interval_list" in model_fields: - final_data["msg_interval_list"] = json.dumps([]) - def _db_create_sync(p_data: dict): try: PersonInfo.create(**p_data) @@ -162,10 +147,7 @@ class PersonInfoManager: def _db_update_sync(p_id: str, f_name: str, val): record = PersonInfo.get_or_none(PersonInfo.person_id == p_id) if record: - if f_name == "msg_interval_list" and isinstance(val, list): - setattr(record, f_name, json.dumps(val)) - else: - setattr(record, f_name, val) + setattr(record, f_name, val) record.save() return True, False return False, True @@ -366,12 +348,6 @@ class PersonInfoManager: record = PersonInfo.get_or_none(PersonInfo.person_id == p_id) if record: val = getattr(record, f_name) - if f_name == "msg_interval_list" and isinstance(val, str): - try: - return json.loads(val) - except json.JSONDecodeError: - logger.warning(f"无法解析 {p_id} 的 msg_interval_list JSON: {val}") - return copy.deepcopy(person_info_default.get(f_name, [])) return val return None @@ -410,13 +386,7 @@ class PersonInfoManager: if record: value = getattr(record, field_name) - if field_name == "msg_interval_list" and isinstance(value, str): - try: - result[field_name] = json.loads(value) - except json.JSONDecodeError: - logger.warning(f"无法解析 {person_id} 的 msg_interval_list JSON: {value}") - result[field_name] = copy.deepcopy(person_info_default.get(field_name, [])) - elif value is not None: + if value is not None: result[field_name] = value else: result[field_name] = copy.deepcopy(person_info_default.get(field_name)) @@ -425,14 +395,6 @@ class PersonInfoManager: return result - # @staticmethod - # async def del_all_undefined_field(): - # """删除所有项里的未定义字段 - 对于Peewee (SQL),此操作通常不适用,因为模式是固定的。""" - # logger.info( - # "del_all_undefined_field: 对于使用Peewee的SQL数据库,此操作通常不适用或不需要,因为表结构是预定义的。" - # ) - # return - @staticmethod async def get_specific_value_list( field_name: str, @@ -450,17 +412,8 @@ class PersonInfoManager: try: for record in PersonInfo.select(PersonInfo.person_id, getattr(PersonInfo, f_name)): value = getattr(record, f_name) - if f_name == "msg_interval_list" and isinstance(value, str): - try: - processed_value = json.loads(value) - except json.JSONDecodeError: - logger.warning(f"跳过记录 {record.person_id},无法解析 msg_interval_list: {value}") - continue - else: - processed_value = value - - if way(processed_value): - found_results[record.person_id] = processed_value + if way(value): + found_results[record.person_id] = value except Exception as e_query: logger.error(f"数据库查询失败 (Peewee specific_value_list for {f_name}): {str(e_query)}", exc_info=True) return found_results @@ -471,86 +424,6 @@ class PersonInfoManager: logger.error(f"执行 get_specific_value_list 线程时出错: {str(e)}", exc_info=True) return {} - async def personal_habit_deduction(self): - """启动个人信息推断,每天根据一定条件推断一次""" - try: - while 1: - await asyncio.sleep(600) - current_time_dt = datetime.datetime.now() - logger.info(f"个人信息推断启动: {current_time_dt.strftime('%Y-%m-%d %H:%M:%S')}") - - msg_interval_map_generated = False - msg_interval_lists_map = await self.get_specific_value_list( - "msg_interval_list", lambda x: isinstance(x, list) and len(x) >= 100 - ) - - for person_id, actual_msg_interval_list in msg_interval_lists_map.items(): - await asyncio.sleep(0.3) - try: - time_interval = [] - for t1, t2 in zip(actual_msg_interval_list, actual_msg_interval_list[1:]): - delta = t2 - t1 - if delta > 0: - time_interval.append(delta) - - time_interval = [t for t in time_interval if 200 <= t <= 8000] - - if len(time_interval) >= 30 + 10: - time_interval.sort() - msg_interval_map_generated = True - log_dir = Path("logs/person_info") - log_dir.mkdir(parents=True, exist_ok=True) - plt.figure(figsize=(10, 6)) - time_series_original = pd.Series(time_interval) - plt.hist( - time_series_original, - bins=50, - density=True, - alpha=0.4, - color="pink", - label="Histogram (Original Filtered)", - ) - time_series_original.plot( - kind="kde", color="mediumpurple", linewidth=1, label="Density (Original Filtered)" - ) - plt.grid(True, alpha=0.2) - plt.xlim(0, 8000) - plt.title(f"Message Interval Distribution (User: {person_id[:8]}...)") - plt.xlabel("Interval (ms)") - plt.ylabel("Density") - plt.legend(framealpha=0.9, facecolor="white") - img_path = log_dir / f"interval_distribution_{person_id[:8]}.png" - plt.savefig(img_path) - plt.close() - - trimmed_interval = time_interval[5:-5] - if trimmed_interval: - msg_interval_val = int(round(np.percentile(trimmed_interval, 37))) - await self.update_one_field(person_id, "msg_interval", msg_interval_val) - logger.trace( - f"用户{person_id}的msg_interval通过头尾截断和37分位数更新为{msg_interval_val}" - ) - else: - logger.trace(f"用户{person_id}截断后数据为空,无法计算msg_interval") - else: - logger.trace( - f"用户{person_id}有效消息间隔数量 ({len(time_interval)}) 不足进行推断 (需要至少 {30 + 10} 条)" - ) - except Exception as e_inner: - logger.trace(f"用户{person_id}消息间隔计算失败: {type(e_inner).__name__}: {str(e_inner)}") - continue - - if msg_interval_map_generated: - logger.trace("已保存分布图到: logs/person_info") - - current_time_dt_end = datetime.datetime.now() - logger.trace(f"个人信息推断结束: {current_time_dt_end.strftime('%Y-%m-%d %H:%M:%S')}") - await asyncio.sleep(86400) - - except Exception as e: - logger.error(f"个人信息推断运行时出错: {str(e)}") - logger.exception("详细错误信息:") - async def get_or_create_person( self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None ) -> str: diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 9334405d4..e26e2dfb2 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "2.11.0" +version = "2.11.1" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -214,6 +214,13 @@ provider = "SILICONFLOW" pri_in = 0.35 pri_out = 0.35 +[model.planner] #决策:负责决定麦麦该做什么,麦麦的决策模型 +name = "Pro/deepseek-ai/DeepSeek-V3" +provider = "SILICONFLOW" +pri_in = 2 +pri_out = 8 +temp = 0.3 + #嵌入模型 [model.embedding] @@ -266,15 +273,6 @@ pri_out = 2 temp = 0.7 enable_thinking = false # 是否启用思考(qwen3 only) -[model.focus_planner] #决策:认真聊天时,负责决定麦麦该做什么 -name = "Pro/deepseek-ai/DeepSeek-V3" -# name = "Qwen/Qwen3-30B-A3B" -provider = "SILICONFLOW" -# enable_thinking = false # 是否启用思考(qwen3 only) -pri_in = 2 -pri_out = 8 -temp = 0.3 - #表达器模型,用于表达麦麦的想法,生成最终回复,对语言风格影响极大 #也用于表达方式学习 [model.focus_expressor]