fix:修改配置文件模型名,移除message_buffer

This commit is contained in:
SengokuCola
2025-06-03 13:41:30 +08:00
parent 691172766e
commit 0bea30c052
9 changed files with 57 additions and 392 deletions

View File

@@ -10,6 +10,7 @@
- 修复log出错问题
- 修复focus吞第一条消息问题
- 可关闭聊天规划处理器(建议关闭)
- 优化记忆同步速度修复记忆构建缺少chat_id的问题

View File

@@ -250,8 +250,6 @@ class MongoToSQLiteMigrator:
"nickname": "nickname",
"relationship_value": "relationship_value",
"konw_time": "know_time",
"msg_interval": "msg_interval",
"msg_interval_list": "msg_interval_list",
},
unique_fields=["person_id"],
),

View File

@@ -79,7 +79,7 @@ class ActionPlanner(BasePlanner):
super().__init__(log_prefix, action_manager)
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.focus_planner,
model=global_config.model.planner,
max_tokens=1000,
request_type="focus.planner", # 用于动作规划
)

View File

@@ -75,7 +75,7 @@ class ActionPlanner(BasePlanner):
super().__init__(log_prefix, action_manager)
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.model.focus_planner,
model=global_config.model.planner,
max_tokens=1000,
request_type="focus.planner", # 用于动作规划
)

View File

@@ -1,216 +0,0 @@
from src.person_info.person_info import person_info_manager
from src.common.logger_manager import get_logger
import asyncio
from dataclasses import dataclass, field
from .message import MessageRecv
from maim_message import BaseMessageInfo, GroupInfo
import hashlib
from typing import Dict
from collections import OrderedDict
import random
import time
from ...config.config import global_config
logger = get_logger("message_buffer")
@dataclass
class CacheMessages:
message: MessageRecv
cache_determination: asyncio.Event = field(default_factory=asyncio.Event) # 判断缓冲是否产生结果
result: str = "U"
class MessageBuffer:
def __init__(self):
self.buffer_pool: Dict[str, OrderedDict[str, CacheMessages]] = {}
self.lock = asyncio.Lock()
@staticmethod
def get_person_id_(platform: str, user_id: str, group_info: GroupInfo):
"""获取唯一id"""
if group_info:
group_id = group_info.group_id
else:
group_id = "私聊"
key = f"{platform}_{user_id}_{group_id}"
return hashlib.md5(key.encode()).hexdigest()
async def start_caching_messages(self, message: MessageRecv):
"""添加消息,启动缓冲"""
if not global_config.chat.message_buffer:
person_id = person_info_manager.get_person_id(
message.message_info.user_info.platform, message.message_info.user_info.user_id
)
asyncio.create_task(self.save_message_interval(person_id, message.message_info))
return
person_id_ = self.get_person_id_(
message.message_info.platform, message.message_info.user_info.user_id, message.message_info.group_info
)
async with self.lock:
if person_id_ not in self.buffer_pool:
self.buffer_pool[person_id_] = OrderedDict()
# 标记该用户之前的未处理消息
for cache_msg in self.buffer_pool[person_id_].values():
if cache_msg.result == "U":
cache_msg.result = "F"
cache_msg.cache_determination.set()
logger.debug(f"被新消息覆盖信息id: {cache_msg.message.message_info.message_id}")
# 查找最近的处理成功消息(T)
recent_f_count = 0
for msg_id in reversed(self.buffer_pool[person_id_]):
msg = self.buffer_pool[person_id_][msg_id]
if msg.result == "T":
break
elif msg.result == "F":
recent_f_count += 1
# 判断条件最近T之后有超过3-5条F
if recent_f_count >= random.randint(3, 5):
new_msg = CacheMessages(message=message, result="T")
new_msg.cache_determination.set()
self.buffer_pool[person_id_][message.message_info.message_id] = new_msg
logger.debug(f"快速处理消息(已堆积{recent_f_count}条F): {message.message_info.message_id}")
return
# 添加新消息
self.buffer_pool[person_id_][message.message_info.message_id] = CacheMessages(message=message)
# 启动3秒缓冲计时器
person_id = person_info_manager.get_person_id(
message.message_info.user_info.platform, message.message_info.user_info.user_id
)
asyncio.create_task(self.save_message_interval(person_id, message.message_info))
asyncio.create_task(self._debounce_processor(person_id_, message.message_info.message_id, person_id))
async def _debounce_processor(self, person_id_: str, message_id: str, person_id: str):
"""等待3秒无新消息"""
interval_time = await person_info_manager.get_value(person_id, "msg_interval")
if not isinstance(interval_time, (int, str)) or not str(interval_time).isdigit():
logger.debug("debounce_processor无效的时间")
return
interval_time = max(0.5, int(interval_time) / 1000)
await asyncio.sleep(interval_time)
async with self.lock:
if person_id_ not in self.buffer_pool or message_id not in self.buffer_pool[person_id_]:
logger.debug(f"消息已被清理msgid: {message_id}")
return
cache_msg = self.buffer_pool[person_id_][message_id]
if cache_msg.result == "U":
cache_msg.result = "T"
cache_msg.cache_determination.set()
async def query_buffer_result(self, message: MessageRecv) -> bool:
"""查询缓冲结果,并清理"""
if not global_config.chat.message_buffer:
return True
person_id_ = self.get_person_id_(
message.message_info.platform, message.message_info.user_info.user_id, message.message_info.group_info
)
async with self.lock:
user_msgs = self.buffer_pool.get(person_id_, {})
cache_msg = user_msgs.get(message.message_info.message_id)
if not cache_msg:
logger.debug(f"查询异常消息不存在msgid: {message.message_info.message_id}")
return False # 消息不存在或已清理
try:
await asyncio.wait_for(cache_msg.cache_determination.wait(), timeout=10)
result = cache_msg.result == "T"
if result:
async with self.lock: # 再次加锁
# 清理所有早于当前消息的已处理消息, 收集所有早于当前消息的F消息的processed_plain_text
keep_msgs = OrderedDict() # 用于存放 T 消息之后的消息
collected_texts = [] # 用于收集 T 消息及之前 F 消息的文本
process_target_found = False
# 遍历当前用户的所有缓冲消息
for msg_id, cache_msg in self.buffer_pool[person_id_].items():
# 如果找到了目标处理消息 (T 状态)
if msg_id == message.message_info.message_id:
process_target_found = True
# 收集这条 T 消息的文本 (如果有)
if (
hasattr(cache_msg.message, "processed_plain_text")
and cache_msg.message.processed_plain_text
):
collected_texts.append(cache_msg.message.processed_plain_text)
# 不立即放入 keep_msgs因为它之前的 F 消息也处理完了
# 如果已经找到了目标 T 消息,之后的消息需要保留
elif process_target_found:
keep_msgs[msg_id] = cache_msg
# 如果还没找到目标 T 消息,说明是之前的消息 (F 或 U)
else:
if cache_msg.result == "F":
# 收集这条 F 消息的文本 (如果有)
if (
hasattr(cache_msg.message, "processed_plain_text")
and cache_msg.message.processed_plain_text
):
collected_texts.append(cache_msg.message.processed_plain_text)
elif cache_msg.result == "U":
# 理论上不应该在 T 消息之前还有 U 消息,记录日志
logger.warning(
f"异常状态:在目标 T 消息 {message.message_info.message_id} 之前发现未处理的 U 消息 {cache_msg.message.message_info.message_id}"
)
# 也可以选择收集其文本
if (
hasattr(cache_msg.message, "processed_plain_text")
and cache_msg.message.processed_plain_text
):
collected_texts.append(cache_msg.message.processed_plain_text)
# 更新当前消息 (message) 的 processed_plain_text
# 只有在收集到的文本多于一条,或者只有一条但与原始文本不同时才合并
if collected_texts:
# 使用 OrderedDict 去重,同时保留原始顺序
unique_texts = list(OrderedDict.fromkeys(collected_texts))
merged_text = "".join(unique_texts)
# 只有在合并后的文本与原始文本不同时才更新
# 并且确保不是空合并
if merged_text and merged_text != message.processed_plain_text:
message.processed_plain_text = merged_text
# 如果合并了文本,原消息不再视为纯 emoji
if hasattr(message, "is_emoji"):
message.is_emoji = False
logger.debug(
f"合并了 {len(unique_texts)} 条消息的文本内容到当前消息 {message.message_info.message_id}"
)
# 更新缓冲池,只保留 T 消息之后的消息
self.buffer_pool[person_id_] = keep_msgs
return result
except asyncio.TimeoutError:
logger.debug(f"查询超时消息id {message.message_info.message_id}")
return False
@staticmethod
async def save_message_interval(person_id: str, message: BaseMessageInfo):
message_interval_list = await person_info_manager.get_value(person_id, "msg_interval_list")
now_time_ms = int(round(time.time() * 1000))
if len(message_interval_list) < 1000:
message_interval_list.append(now_time_ms)
else:
message_interval_list.pop(0)
message_interval_list.append(now_time_ms)
data = {
"platform": message.platform,
"user_id": message.user_info.user_id,
"nickname": message.user_info.user_nickname,
"konw_time": int(time.time()),
}
await person_info_manager.update_one_field(person_id, "msg_interval_list", message_interval_list, data)
message_buffer = MessageBuffer()

View File

@@ -214,11 +214,10 @@ class PersonInfo(BaseModel):
platform = TextField() # 平台
user_id = TextField(index=True) # 用户ID
nickname = TextField() # 用户昵称
person_impression = TextField(null=True) # 个人印象
relationship_value = IntegerField(default=0) # 关系值
know_time = FloatField() # 认识时间 (时间戳)
msg_interval = IntegerField() # 消息间隔
# msg_interval_list: 存储为 JSON 字符串的列表
msg_interval_list = TextField(null=True)
class Meta:
# database = db # 继承自 BaseModel
@@ -334,7 +333,7 @@ def create_tables():
def initialize_database():
"""
检查所有定义的表是否存在,如果不存在则创建它们。
检查所有表的所有字段是否存在,如果缺失则警告用户并退出程序
检查所有表的所有字段是否存在,如果缺失则自动添加
"""
import sys
@@ -350,44 +349,56 @@ def initialize_database():
Knowledges,
ThinkingLog,
RecalledMessages,
GraphNodes, # 添加图节点表
GraphEdges, # 添加图边表
GraphNodes,
GraphEdges,
]
needs_creation = False
try:
with db: # 管理 table_exists 检查的连接
for model in models:
table_name = model._meta.table_name
if not db.table_exists(model):
logger.warning(f"'{table_name}' 未找到")
needs_creation = True
break # 一个表丢失,无需进一步检查。
if not needs_creation:
logger.warning(f"'{table_name}' 未找到,正在创建...")
db.create_tables([model])
logger.info(f"'{table_name}' 创建成功")
continue
# 检查字段
for model in models:
table_name = model._meta.table_name
cursor = db.execute_sql(f"PRAGMA table_info('{table_name}')")
existing_columns = {row[1] for row in cursor.fetchall()}
model_fields = model._meta.fields
for field_name in model_fields:
if field_name not in existing_columns:
logger.error(f"'{table_name}' 缺失字段 '{field_name}',请手动迁移数据库结构后重启程序。")
sys.exit(1)
cursor = db.execute_sql(f"PRAGMA table_info('{table_name}')")
existing_columns = {row[1] for row in cursor.fetchall()}
model_fields = model._meta.fields
for field_name, field_obj in model_fields.items():
if field_name not in existing_columns:
logger.info(f"'{table_name}' 缺失字段 '{field_name}',正在添加...")
# 获取字段类型
field_type = field_obj.__class__.__name__
sql_type = {
'TextField': 'TEXT',
'IntegerField': 'INTEGER',
'FloatField': 'FLOAT',
'DoubleField': 'DOUBLE',
'BooleanField': 'INTEGER',
'DateTimeField': 'DATETIME'
}.get(field_type, 'TEXT')
# 构建 ALTER TABLE 语句
alter_sql = f'ALTER TABLE {table_name} ADD COLUMN {field_name} {sql_type}'
if field_obj.null:
alter_sql += ' NULL'
else:
alter_sql += ' NOT NULL'
if hasattr(field_obj, 'default') and field_obj.default is not None:
alter_sql += f' DEFAULT {field_obj.default}'
db.execute_sql(alter_sql)
logger.info(f"字段 '{field_name}' 添加成功")
except Exception as e:
logger.exception(f"检查表或字段是否存在时出错: {e}")
# 如果检查失败(例如数据库不可用),则退出
return
if needs_creation:
logger.info("正在初始化数据库:一个或多个表丢失。正在尝试创建所有定义的表...")
try:
create_tables() # 此函数有其自己的 'with db:' 上下文管理。
logger.info("数据库表创建过程完成。")
except Exception as e:
logger.exception(f"创建表期间出错: {e}")
else:
logger.info("所有数据库表及字段均已存在。")
logger.info("数据库初始化完成")
# 模块加载时调用初始化函数

View File

@@ -451,8 +451,8 @@ class ModelConfig(ConfigBase):
focus_tool_use: dict[str, Any] = field(default_factory=lambda: {})
"""专注工具使用模型配置"""
focus_planner: dict[str, Any] = field(default_factory=lambda: {})
"""专注规划模型配置"""
planner: dict[str, Any] = field(default_factory=lambda: {})
"""规划模型配置"""
focus_expressor: dict[str, Any] = field(default_factory=lambda: {})
"""专注表达器模型配置"""

View File

@@ -6,17 +6,10 @@ import hashlib
from typing import Any, Callable, Dict
import datetime
import asyncio
import numpy as np
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.individuality.individuality import individuality
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
import json # 新增导入
import re
@@ -31,7 +24,6 @@ PersonInfoManager 类方法功能摘要:
6. get_values - 批量获取字段值(任一字段无效则返回空字典)
7. del_all_undefined_field - 清理全集合中未定义的字段
8. get_specific_value_list - 根据指定条件返回person_id,value字典
9. personal_habit_deduction - 定时推断个人习惯
"""
@@ -46,8 +38,6 @@ person_info_default = {
"nickname": "Unknown", # 提供非None的默认值
"relationship_value": 0,
"know_time": 0, # 修正拼写konw_time -> know_time
"msg_interval": 2000,
"msg_interval_list": [], # 将作为 JSON 字符串存储在 Peewee 的 TextField
"user_cardname": None, # 注意:此字段不在 PersonInfo Peewee 模型中
"user_avatar": None, # 注意:此字段不在 PersonInfo Peewee 模型中
}
@@ -135,11 +125,6 @@ class PersonInfoManager:
if key in model_fields and key not in final_data:
final_data[key] = default_value
if "msg_interval_list" in final_data and isinstance(final_data["msg_interval_list"], list):
final_data["msg_interval_list"] = json.dumps(final_data["msg_interval_list"])
elif "msg_interval_list" not in final_data and "msg_interval_list" in model_fields:
final_data["msg_interval_list"] = json.dumps([])
def _db_create_sync(p_data: dict):
try:
PersonInfo.create(**p_data)
@@ -162,10 +147,7 @@ class PersonInfoManager:
def _db_update_sync(p_id: str, f_name: str, val):
record = PersonInfo.get_or_none(PersonInfo.person_id == p_id)
if record:
if f_name == "msg_interval_list" and isinstance(val, list):
setattr(record, f_name, json.dumps(val))
else:
setattr(record, f_name, val)
setattr(record, f_name, val)
record.save()
return True, False
return False, True
@@ -366,12 +348,6 @@ class PersonInfoManager:
record = PersonInfo.get_or_none(PersonInfo.person_id == p_id)
if record:
val = getattr(record, f_name)
if f_name == "msg_interval_list" and isinstance(val, str):
try:
return json.loads(val)
except json.JSONDecodeError:
logger.warning(f"无法解析 {p_id} 的 msg_interval_list JSON: {val}")
return copy.deepcopy(person_info_default.get(f_name, []))
return val
return None
@@ -410,13 +386,7 @@ class PersonInfoManager:
if record:
value = getattr(record, field_name)
if field_name == "msg_interval_list" and isinstance(value, str):
try:
result[field_name] = json.loads(value)
except json.JSONDecodeError:
logger.warning(f"无法解析 {person_id} 的 msg_interval_list JSON: {value}")
result[field_name] = copy.deepcopy(person_info_default.get(field_name, []))
elif value is not None:
if value is not None:
result[field_name] = value
else:
result[field_name] = copy.deepcopy(person_info_default.get(field_name))
@@ -425,14 +395,6 @@ class PersonInfoManager:
return result
# @staticmethod
# async def del_all_undefined_field():
# """删除所有项里的未定义字段 - 对于Peewee (SQL),此操作通常不适用,因为模式是固定的。"""
# logger.info(
# "del_all_undefined_field: 对于使用Peewee的SQL数据库此操作通常不适用或不需要因为表结构是预定义的。"
# )
# return
@staticmethod
async def get_specific_value_list(
field_name: str,
@@ -450,17 +412,8 @@ class PersonInfoManager:
try:
for record in PersonInfo.select(PersonInfo.person_id, getattr(PersonInfo, f_name)):
value = getattr(record, f_name)
if f_name == "msg_interval_list" and isinstance(value, str):
try:
processed_value = json.loads(value)
except json.JSONDecodeError:
logger.warning(f"跳过记录 {record.person_id},无法解析 msg_interval_list: {value}")
continue
else:
processed_value = value
if way(processed_value):
found_results[record.person_id] = processed_value
if way(value):
found_results[record.person_id] = value
except Exception as e_query:
logger.error(f"数据库查询失败 (Peewee specific_value_list for {f_name}): {str(e_query)}", exc_info=True)
return found_results
@@ -471,86 +424,6 @@ class PersonInfoManager:
logger.error(f"执行 get_specific_value_list 线程时出错: {str(e)}", exc_info=True)
return {}
async def personal_habit_deduction(self):
"""启动个人信息推断,每天根据一定条件推断一次"""
try:
while 1:
await asyncio.sleep(600)
current_time_dt = datetime.datetime.now()
logger.info(f"个人信息推断启动: {current_time_dt.strftime('%Y-%m-%d %H:%M:%S')}")
msg_interval_map_generated = False
msg_interval_lists_map = await self.get_specific_value_list(
"msg_interval_list", lambda x: isinstance(x, list) and len(x) >= 100
)
for person_id, actual_msg_interval_list in msg_interval_lists_map.items():
await asyncio.sleep(0.3)
try:
time_interval = []
for t1, t2 in zip(actual_msg_interval_list, actual_msg_interval_list[1:]):
delta = t2 - t1
if delta > 0:
time_interval.append(delta)
time_interval = [t for t in time_interval if 200 <= t <= 8000]
if len(time_interval) >= 30 + 10:
time_interval.sort()
msg_interval_map_generated = True
log_dir = Path("logs/person_info")
log_dir.mkdir(parents=True, exist_ok=True)
plt.figure(figsize=(10, 6))
time_series_original = pd.Series(time_interval)
plt.hist(
time_series_original,
bins=50,
density=True,
alpha=0.4,
color="pink",
label="Histogram (Original Filtered)",
)
time_series_original.plot(
kind="kde", color="mediumpurple", linewidth=1, label="Density (Original Filtered)"
)
plt.grid(True, alpha=0.2)
plt.xlim(0, 8000)
plt.title(f"Message Interval Distribution (User: {person_id[:8]}...)")
plt.xlabel("Interval (ms)")
plt.ylabel("Density")
plt.legend(framealpha=0.9, facecolor="white")
img_path = log_dir / f"interval_distribution_{person_id[:8]}.png"
plt.savefig(img_path)
plt.close()
trimmed_interval = time_interval[5:-5]
if trimmed_interval:
msg_interval_val = int(round(np.percentile(trimmed_interval, 37)))
await self.update_one_field(person_id, "msg_interval", msg_interval_val)
logger.trace(
f"用户{person_id}的msg_interval通过头尾截断和37分位数更新为{msg_interval_val}"
)
else:
logger.trace(f"用户{person_id}截断后数据为空无法计算msg_interval")
else:
logger.trace(
f"用户{person_id}有效消息间隔数量 ({len(time_interval)}) 不足进行推断 (需要至少 {30 + 10} 条)"
)
except Exception as e_inner:
logger.trace(f"用户{person_id}消息间隔计算失败: {type(e_inner).__name__}: {str(e_inner)}")
continue
if msg_interval_map_generated:
logger.trace("已保存分布图到: logs/person_info")
current_time_dt_end = datetime.datetime.now()
logger.trace(f"个人信息推断结束: {current_time_dt_end.strftime('%Y-%m-%d %H:%M:%S')}")
await asyncio.sleep(86400)
except Exception as e:
logger.error(f"个人信息推断运行时出错: {str(e)}")
logger.exception("详细错误信息:")
async def get_or_create_person(
self, platform: str, user_id: int, nickname: str = None, user_cardname: str = None, user_avatar: str = None
) -> str:

View File

@@ -1,5 +1,5 @@
[inner]
version = "2.11.0"
version = "2.11.1"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请在修改后将version的值进行变更
@@ -214,6 +214,13 @@ provider = "SILICONFLOW"
pri_in = 0.35
pri_out = 0.35
[model.planner] #决策:负责决定麦麦该做什么,麦麦的决策模型
name = "Pro/deepseek-ai/DeepSeek-V3"
provider = "SILICONFLOW"
pri_in = 2
pri_out = 8
temp = 0.3
#嵌入模型
[model.embedding]
@@ -266,15 +273,6 @@ pri_out = 2
temp = 0.7
enable_thinking = false # 是否启用思考qwen3 only
[model.focus_planner] #决策:认真聊天时,负责决定麦麦该做什么
name = "Pro/deepseek-ai/DeepSeek-V3"
# name = "Qwen/Qwen3-30B-A3B"
provider = "SILICONFLOW"
# enable_thinking = false # 是否启用思考(qwen3 only)
pri_in = 2
pri_out = 8
temp = 0.3
#表达器模型,用于表达麦麦的想法,生成最终回复,对语言风格影响极大
#也用于表达方式学习
[model.focus_expressor]