This commit is contained in:
SengokuCola
2025-04-05 23:12:53 +08:00
9 changed files with 324 additions and 11 deletions

View File

@@ -56,8 +56,9 @@ class MainSystem:
self.mood_manager.start_mood_update(update_interval=global_config.mood_update_interval) self.mood_manager.start_mood_update(update_interval=global_config.mood_update_interval)
logger.success("情绪管理器启动成功") logger.success("情绪管理器启动成功")
# 检查并清除person_info冗余字段 # 检查并清除person_info冗余字段,启动个人习惯推断
await person_info_manager.del_all_undefined_field() await person_info_manager.del_all_undefined_field()
asyncio.create_task(person_info_manager.personal_habit_deduction())
# 启动愿望管理器 # 启动愿望管理器
await willing_manager.ensure_started() await willing_manager.ensure_started()

View File

@@ -0,0 +1,187 @@
from ..person_info.person_info import person_info_manager
from src.common.logger import get_module_logger
import asyncio
from dataclasses import dataclass, field
from .message import MessageRecv
from ..message.message_base import BaseMessageInfo
import hashlib
from typing import Dict
from collections import OrderedDict
import random
import time
from ..config.config import global_config
logger = get_module_logger("message_buffer")
@dataclass
class CacheMessages:
message: MessageRecv
cache_determination: asyncio.Event = field(default_factory=asyncio.Event) # 判断缓冲是否产生结果
result: str = "U"
class MessageBuffer:
def __init__(self):
self.buffer_pool: Dict[str, OrderedDict[str, CacheMessages]] = {}
self.lock = asyncio.Lock()
def get_person_id_(self, platform:str, user_id:str, group_id:str):
"""获取唯一id"""
group_id = group_id or "私聊"
key = f"{platform}_{user_id}_{group_id}"
return hashlib.md5(key.encode()).hexdigest()
async def start_caching_messages(self, message:MessageRecv):
"""添加消息,启动缓冲"""
if not global_config.message_buffer:
person_id = person_info_manager.get_person_id(message.message_info.user_info.platform,
message.message_info.user_info.user_id)
asyncio.create_task(self.save_message_interval(person_id, message.message_info))
return
person_id_ = self.get_person_id_(message.message_info.platform,
message.message_info.user_info.user_id,
message.message_info.group_info.group_id)
async with self.lock:
if person_id_ not in self.buffer_pool:
self.buffer_pool[person_id_] = OrderedDict()
# 标记该用户之前的未处理消息
for cache_msg in self.buffer_pool[person_id_].values():
if cache_msg.result == "U":
cache_msg.result = "F"
cache_msg.cache_determination.set()
logger.debug(f"被新消息覆盖信息id: {cache_msg.message.message_info.message_id}")
# 查找最近的处理成功消息(T)
recent_F_count = 0
for msg_id in reversed(self.buffer_pool[person_id_]):
msg = self.buffer_pool[person_id_][msg_id]
if msg.result == "T":
break
elif msg.result == "F":
recent_F_count += 1
# 判断条件最近T之后有超过3-5条F
if (recent_F_count >= random.randint(3, 5)):
new_msg = CacheMessages(message=message, result="T")
new_msg.cache_determination.set()
self.buffer_pool[person_id_][message.message_info.message_id] = new_msg
logger.debug(f"快速处理消息(已堆积{recent_F_count}条F): {message.message_info.message_id}")
return
# 添加新消息
self.buffer_pool[person_id_][message.message_info.message_id] = CacheMessages(message=message)
# 启动3秒缓冲计时器
person_id = person_info_manager.get_person_id(message.message_info.user_info.platform,
message.message_info.user_info.user_id)
asyncio.create_task(self.save_message_interval(person_id, message.message_info))
asyncio.create_task(self._debounce_processor(person_id_,
message.message_info.message_id,
person_id))
async def _debounce_processor(self, person_id_: str, message_id: str, person_id: str):
"""等待3秒无新消息"""
interval_time = await person_info_manager.get_value(person_id, "msg_interval")
if not isinstance(interval_time, (int, str)) or not str(interval_time).isdigit():
logger.debug("debounce_processor无效的时间")
return
interval_time = max(0.5, int(interval_time) / 1000)
await asyncio.sleep(interval_time)
async with self.lock:
if (person_id_ not in self.buffer_pool or
message_id not in self.buffer_pool[person_id_]):
logger.debug(f"消息已被清理msgid: {message_id}")
return
cache_msg = self.buffer_pool[person_id_][message_id]
if cache_msg.result == "U":
cache_msg.result = "T"
cache_msg.cache_determination.set()
async def query_buffer_result(self, message:MessageRecv) -> bool:
"""查询缓冲结果,并清理"""
if not global_config.message_buffer:
return True
person_id_ = self.get_person_id_(message.message_info.platform,
message.message_info.user_info.user_id,
message.message_info.group_info.group_id)
async with self.lock:
user_msgs = self.buffer_pool.get(person_id_, {})
cache_msg = user_msgs.get(message.message_info.message_id)
if not cache_msg:
logger.debug(f"查询异常消息不存在msgid: {message.message_info.message_id}")
return False # 消息不存在或已清理
try:
await asyncio.wait_for(cache_msg.cache_determination.wait(), timeout=10)
result = cache_msg.result == "T"
if result:
async with self.lock: # 再次加锁
# 清理所有早于当前消息的已处理消息, 收集所有早于当前消息的F消息的processed_plain_text
keep_msgs = OrderedDict()
combined_text = []
found = False
type = "text"
is_update = True
for msg_id, msg in self.buffer_pool[person_id_].items():
if msg_id == message.message_info.message_id:
found = True
type = msg.message.message_segment.type
combined_text.append(msg.message.processed_plain_text)
continue
if found:
keep_msgs[msg_id] = msg
elif msg.result == "F":
# 收集F消息的文本内容
if (hasattr(msg.message, 'processed_plain_text')
and msg.message.processed_plain_text):
if msg.message.message_segment.type == "text":
combined_text.append(msg.message.processed_plain_text)
elif msg.message.message_segment.type != "text":
is_update = False
elif msg.result == "U":
logger.debug(f"异常未处理信息id {msg.message.message_info.message_id}")
# 更新当前消息的processed_plain_text
if combined_text and combined_text[0] != message.processed_plain_text and is_update:
if type == "text":
message.processed_plain_text = "".join(combined_text)
logger.debug(f"整合了{len(combined_text)-1}条F消息的内容到当前消息")
elif type == "emoji":
combined_text.pop()
message.processed_plain_text = "".join(combined_text)
message.is_emoji = False
logger.debug(f"整合了{len(combined_text)-1}条F消息的内容覆盖当前emoji消息")
self.buffer_pool[person_id_] = keep_msgs
return result
except asyncio.TimeoutError:
logger.debug(f"查询超时消息id {message.message_info.message_id}")
return False
async def save_message_interval(self, person_id:str, message:BaseMessageInfo):
message_interval_list = await person_info_manager.get_value(person_id, "msg_interval_list")
now_time_ms = int(round(time.time() * 1000))
if len(message_interval_list) < 1000:
message_interval_list.append(now_time_ms)
else:
message_interval_list.pop(0)
message_interval_list.append(now_time_ms)
data = {
"platform" : message.platform,
"user_id" : message.user_info.user_id,
"nickname" : message.user_info.user_nickname,
"konw_time" : int(time.time())
}
await person_info_manager.update_one_field(person_id, "msg_interval_list", message_interval_list, data)
message_buffer = MessageBuffer()

View File

@@ -17,6 +17,7 @@ from ...message import UserInfo, Seg
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from ...chat.chat_stream import chat_manager from ...chat.chat_stream import chat_manager
from ...person_info.relationship_manager import relationship_manager from ...person_info.relationship_manager import relationship_manager
from ...chat.message_buffer import message_buffer
# 定义日志配置 # 定义日志配置
chat_config = LogConfig( chat_config = LogConfig(
@@ -143,6 +144,8 @@ class ReasoningChat:
userinfo = message.message_info.user_info userinfo = message.message_info.user_info
messageinfo = message.message_info messageinfo = message.message_info
# 消息加入缓冲池
await message_buffer.start_caching_messages(message)
# logger.info("使用推理聊天模式") # logger.info("使用推理聊天模式")
@@ -172,6 +175,17 @@ class ReasoningChat:
timer2 = time.time() timer2 = time.time()
timing_results["记忆激活"] = timer2 - timer1 timing_results["记忆激活"] = timer2 - timer1
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
buffer_result = await message_buffer.query_buffer_result(message)
if not buffer_result:
if message.message_segment.type == "text":
logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}")
elif message.message_segment.type == "image":
logger.info("触发缓冲,已炸飞表情包/图片")
elif message.message_segment.type == "seglist":
logger.info("触发缓冲,已炸飞消息列")
return
is_mentioned = is_mentioned_bot_in_message(message) is_mentioned = is_mentioned_bot_in_message(message)
# 计算回复意愿 # 计算回复意愿

View File

@@ -18,6 +18,7 @@ from src.heart_flow.heartflow import heartflow
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from ...chat.chat_stream import chat_manager from ...chat.chat_stream import chat_manager
from ...person_info.relationship_manager import relationship_manager from ...person_info.relationship_manager import relationship_manager
from ...chat.message_buffer import message_buffer
# 定义日志配置 # 定义日志配置
chat_config = LogConfig( chat_config = LogConfig(
@@ -163,6 +164,8 @@ class ThinkFlowChat:
userinfo = message.message_info.user_info userinfo = message.message_info.user_info
messageinfo = message.message_info messageinfo = message.message_info
# 消息加入缓冲池
await message_buffer.start_caching_messages(message)
# 创建聊天流 # 创建聊天流
chat = await chat_manager.get_or_create_stream( chat = await chat_manager.get_or_create_stream(
@@ -197,8 +200,20 @@ class ThinkFlowChat:
timing_results["记忆激活"] = timer2 - timer1 timing_results["记忆激活"] = timer2 - timer1
logger.debug(f"记忆激活: {interested_rate}") logger.debug(f"记忆激活: {interested_rate}")
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
buffer_result = await message_buffer.query_buffer_result(message)
if not buffer_result:
if message.message_segment.type == "text":
logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}")
elif message.message_segment.type == "image":
logger.info("触发缓冲,已炸飞表情包/图片")
elif message.message_segment.type == "seglist":
logger.info("触发缓冲,已炸飞消息列")
return
is_mentioned = is_mentioned_bot_in_message(message) is_mentioned = is_mentioned_bot_in_message(message)
# 计算回复意愿 # 计算回复意愿
current_willing_old = willing_manager.get_willing(chat_stream=chat) current_willing_old = willing_manager.get_willing(chat_stream=chat)
# current_willing_new = (heartflow.get_subheartflow(chat.stream_id).current_state.willing - 5) / 4 # current_willing_new = (heartflow.get_subheartflow(chat.stream_id).current_state.willing - 5) / 4

View File

@@ -162,6 +162,7 @@ class BotConfig:
emoji_chance: float = 0.2 # 发送表情包的基础概率 emoji_chance: float = 0.2 # 发送表情包的基础概率
thinking_timeout: int = 120 # 思考时间 thinking_timeout: int = 120 # 思考时间
max_response_length: int = 1024 # 最大回复长度 max_response_length: int = 1024 # 最大回复长度
message_buffer: bool = True # 消息缓冲器
ban_words = set() ban_words = set()
ban_msgs_regex = set() ban_msgs_regex = set()
@@ -505,6 +506,8 @@ class BotConfig:
if config.INNER_VERSION in SpecifierSet(">=0.0.11"): if config.INNER_VERSION in SpecifierSet(">=0.0.11"):
config.max_response_length = msg_config.get("max_response_length", config.max_response_length) config.max_response_length = msg_config.get("max_response_length", config.max_response_length)
if config.INNER_VERSION in SpecifierSet(">=1.1.4"):
config.message_buffer = msg_config.get("message_buffer", config.message_buffer)
def memory(parent: dict): def memory(parent: dict):
memory_config = parent["memory"] memory_config = parent["memory"]

View File

@@ -237,7 +237,7 @@ class MoodManager:
old_arousal = self.current_mood.arousal old_arousal = self.current_mood.arousal
old_mood = self.current_mood.text old_mood = self.current_mood.text
valence_change *= relationship_manager.gain_coefficient[relationship_manager.positive_feedback_value] valence_change = relationship_manager.feedback_to_mood(valence_change)
# 应用情绪强度 # 应用情绪强度
valence_change *= intensity valence_change *= intensity

View File

@@ -2,8 +2,14 @@ from src.common.logger import get_module_logger
from ...common.database import db from ...common.database import db
import copy import copy
import hashlib import hashlib
from typing import Any, Callable, Dict, TypeVar from typing import Any, Callable, Dict
T = TypeVar('T') # 泛型类型 import datetime
import asyncio
import numpy
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
""" """
PersonInfoManager 类方法功能摘要: PersonInfoManager 类方法功能摘要:
@@ -15,6 +21,7 @@ PersonInfoManager 类方法功能摘要:
6. get_values - 批量获取字段值(任一字段无效则返回空字典) 6. get_values - 批量获取字段值(任一字段无效则返回空字典)
7. del_all_undefined_field - 清理全集合中未定义的字段 7. del_all_undefined_field - 清理全集合中未定义的字段
8. get_specific_value_list - 根据指定条件返回person_id,value字典 8. get_specific_value_list - 根据指定条件返回person_id,value字典
9. personal_habit_deduction - 定时推断个人习惯
""" """
logger = get_module_logger("person_info") logger = get_module_logger("person_info")
@@ -30,6 +37,8 @@ person_info_default = {
# "impression" : None, # "impression" : None,
# "gender" : Unkown, # "gender" : Unkown,
"konw_time" : 0, "konw_time" : 0,
"msg_interval": 3000,
"msg_interval_list": []
} # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项 } # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项
class PersonInfoManager: class PersonInfoManager:
@@ -108,8 +117,9 @@ class PersonInfoManager:
if document and field_name in document: if document and field_name in document:
return document[field_name] return document[field_name]
else: else:
logger.debug(f"获取{person_id}{field_name}失败,已返回默认值{person_info_default[field_name]}") default_value = copy.deepcopy(person_info_default[field_name])
return person_info_default[field_name] logger.debug(f"获取{person_id}{field_name}失败,已返回默认值{default_value}")
return default_value
async def get_values(self, person_id: str, field_names: list) -> dict: async def get_values(self, person_id: str, field_names: list) -> dict:
"""获取指定person_id文档的多个字段值若不存在该字段则返回该字段的全局默认值""" """获取指定person_id文档的多个字段值若不存在该字段则返回该字段的全局默认值"""
@@ -133,7 +143,10 @@ class PersonInfoManager:
result = {} result = {}
for field in field_names: for field in field_names:
result[field] = document.get(field, person_info_default[field]) if document else person_info_default[field] result[field] = copy.deepcopy(
document.get(field, person_info_default[field])
if document else person_info_default[field]
)
return result return result
@@ -209,5 +222,76 @@ class PersonInfoManager:
except Exception as e: except Exception as e:
logger.error(f"数据库查询失败: {str(e)}", exc_info=True) logger.error(f"数据库查询失败: {str(e)}", exc_info=True)
return {} return {}
async def personal_habit_deduction(self):
"""启动个人信息推断,每天根据一定条件推断一次"""
try:
while(1):
await asyncio.sleep(60)
current_time = datetime.datetime.now()
logger.info(f"个人信息推断启动: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
person_info_manager = PersonInfoManager() # "msg_interval"推断
msg_interval_map = False
msg_interval_lists = await self.get_specific_value_list(
"msg_interval_list",
lambda x: isinstance(x, list) and len(x) >= 100
)
for person_id, msg_interval_list_ in msg_interval_lists.items():
try:
time_interval = []
for t1, t2 in zip(msg_interval_list_, msg_interval_list_[1:]):
delta = t2 - t1
if delta < 8000 and delta > 0: # 小于8秒
time_interval.append(delta)
if len(time_interval) > 30:
time_interval.sort()
# 画图(log)
msg_interval_map = True
log_dir = Path("logs/person_info")
log_dir.mkdir(parents=True, exist_ok=True)
plt.figure(figsize=(10, 6))
time_series = pd.Series(time_interval)
# 绘制直方图
plt.hist(time_series, bins=50, density=True, alpha=0.4, color='pink', label='Histogram')
# 绘制KDE曲线使用相同的实际数据
time_series.plot(kind='kde', color='mediumpurple', linewidth=1, label='Density')
plt.grid(True, alpha=0.2)
plt.xlim(0, 8000)
plt.title(f"Message Interval Distribution (User: {person_id[:8]}...)")
plt.xlabel("Interval (ms)")
plt.ylabel("Density")
plt.legend(framealpha=0.9, facecolor='white')
img_path = log_dir / f"interval_distribution_{person_id[:8]}.png"
plt.savefig(img_path)
plt.close()
# 画图
filtered_intervals = [t for t in time_interval if t >= 500]
if len(filtered_intervals) > 25:
msg_interval = int(round(numpy.percentile(filtered_intervals, 80)))
await self.update_one_field(person_id, "msg_interval", msg_interval)
logger.debug(f"用户{person_id}的msg_interval已经被更新为{msg_interval}")
except Exception as e:
logger.debug(f"处理用户{person_id}msg_interval推断时出错: {str(e)}")
continue
# 其他...
if msg_interval_map:
logger.info("已保存分布图到: logs/person_info")
logger.info(f"个人信息推断结束: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
await asyncio.sleep(86400)
except Exception as e:
logger.error(f"个人信息推断运行时出错: {str(e)}")
logger.exception("详细错误信息:")
person_info_manager = PersonInfoManager()

View File

@@ -63,7 +63,15 @@ class RelationshipManager:
value += value * mood_gain value += value * mood_gain
logger.info(f"当前relationship增益系数{mood_gain:.3f}") logger.info(f"当前relationship增益系数{mood_gain:.3f}")
return value return value
def feedback_to_mood(self, mood_value):
"""对情绪的反馈"""
coefficient = self.gain_coefficient[abs(self.positive_feedback_value)]
if (mood_value > 0 and self.positive_feedback_value > 0
or mood_value < 0 and self.positive_feedback_value < 0):
return mood_value*coefficient
else:
return mood_value/coefficient
async def calculate_update_relationship_value(self, chat_stream: ChatStream, label: str, stance: str) -> None: async def calculate_update_relationship_value(self, chat_stream: ChatStream, label: str, stance: str) -> None:
"""计算并变更关系值 """计算并变更关系值

View File

@@ -1,5 +1,5 @@
[inner] [inner]
version = "1.1.3" version = "1.1.4"
#以下是给开发人员阅读的,一般用户不需要阅读 #以下是给开发人员阅读的,一般用户不需要阅读
@@ -72,6 +72,7 @@ max_context_size = 12 # 麦麦获得的上文数量建议12太短太长都
emoji_chance = 0.2 # 麦麦使用表情包的概率 emoji_chance = 0.2 # 麦麦使用表情包的概率
thinking_timeout = 60 # 麦麦最长思考时间,超过这个时间的思考会放弃 thinking_timeout = 60 # 麦麦最长思考时间,超过这个时间的思考会放弃
max_response_length = 256 # 麦麦回答的最大token数 max_response_length = 256 # 麦麦回答的最大token数
message_buffer = true # 启用消息缓冲器?启用此项以解决消息的拆分问题,但会使麦麦的回复延迟
ban_words = [ ban_words = [
# "403","张三" # "403","张三"
] ]
@@ -236,4 +237,4 @@ pri_out = 1.26
name = "Qwen/Qwen2.5-32B-Instruct" name = "Qwen/Qwen2.5-32B-Instruct"
provider = "SILICONFLOW" provider = "SILICONFLOW"
pri_in = 1.26 pri_in = 1.26
pri_out = 1.26 pri_out = 1.26