fix:简化了身份配置

This commit is contained in:
SengokuCola
2025-05-27 10:50:47 +08:00
parent 8d2e649527
commit 57c9dacb99
9 changed files with 24 additions and 694 deletions

View File

@@ -32,6 +32,12 @@
- 示例插件:禁言插件
- 示例插件:豆包绘图插件
**人格**
- 简化了人格身份的配置
**语音**
- 麦麦可以决定自行发送语音消息需要搭配tts适配器
**新增表达方式学习**
- 自主学习群聊中的表达方式,更贴近群友
- 可自定义的学习频率和开关

View File

@@ -1,63 +0,0 @@
# -*- coding: utf-8 -*-
import asyncio
import time
import sys
import os
# 添加项目根目录到系统路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
from src.chat.memory_system.Hippocampus import HippocampusManager
from rich.traceback import install
install(extra_lines=3)
async def test_memory_system():
"""测试记忆系统的主要功能"""
try:
# 初始化记忆系统
print("开始初始化记忆系统...")
hippocampus_manager = HippocampusManager.get_instance()
hippocampus_manager.initialize()
print("记忆系统初始化完成")
# 测试记忆构建
# print("开始测试记忆构建...")
# await hippocampus_manager.build_memory()
# print("记忆构建完成")
# 测试记忆检索
test_text = "千石可乐在群里聊天"
# test_text = '''千石可乐分不清AI的陪伴和人类的陪伴,是这样吗?'''
print(f"开始测试记忆检索,测试文本: {test_text}\n")
memories = await hippocampus_manager.get_memory_from_text(
text=test_text, max_memory_num=3, max_memory_length=2, max_depth=3, fast_retrieval=False
)
await asyncio.sleep(1)
print("检索到的记忆:")
for topic, memory_items in memories:
print(f"主题: {topic}")
print(f"- {memory_items}")
except Exception as e:
print(f"测试过程中出现错误: {e}")
raise
async def main():
"""主函数"""
try:
start_time = time.time()
await test_memory_system()
end_time = time.time()
print(f"测试完成,总耗时: {end_time - start_time:.2f}")
except Exception as e:
print(f"程序执行出错: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,365 +0,0 @@
# -*- coding: utf-8 -*-
import os
import sys
import time
from pathlib import Path
import datetime
from rich.console import Console
from Hippocampus import Hippocampus # 海马体和记忆图
from dotenv import load_dotenv
from rich.traceback import install
install(extra_lines=3)
"""
我想 总有那么一个瞬间
你会想和某天才变态少女助手一样
往Bot的海马体里插上几个电极 不是吗
Let's do some dirty job.
"""
# 获取当前文件的目录
current_dir = Path(__file__).resolve().parent
# 获取项目根目录(上三层目录)
project_root = current_dir.parent.parent.parent
# env.dev文件路径
env_path = project_root / ".env.dev"
# from chat.config import global_config
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
sys.path.append(root_path)
from src.common.logger import get_module_logger # noqa E402
from common.database.database import db # noqa E402
logger = get_module_logger("mem_alter")
console = Console()
# 加载环境变量
if env_path.exists():
logger.info(f"{env_path} 加载环境变量")
load_dotenv(env_path)
else:
logger.warning(f"未找到环境变量文件: {env_path}")
logger.info("将使用默认配置")
# 查询节点信息
def query_mem_info(hippocampus: Hippocampus):
while True:
query = input("\n请输入新的查询概念(输入'退出'以结束):")
if query.lower() == "退出":
break
items_list = hippocampus.memory_graph.get_related_item(query)
if items_list:
have_memory = False
first_layer, second_layer = items_list
if first_layer:
have_memory = True
print("\n直接相关的记忆:")
for item in first_layer:
print(f"- {item}")
if second_layer:
have_memory = True
print("\n间接相关的记忆:")
for item in second_layer:
print(f"- {item}")
if not have_memory:
print("\n未找到相关记忆。")
else:
print("未找到相关记忆。")
# 增加概念节点
def add_mem_node(hippocampus: Hippocampus):
while True:
concept = input("请输入节点概念名:\n")
result = db.graph_data.nodes.count_documents({"concept": concept})
if result != 0:
console.print("[yellow]已存在名为“{concept}”的节点,行为已取消[/yellow]")
continue
memory_items = list()
while True:
context = input("请输入节点描述信息(输入'终止'以结束)")
if context.lower() == "终止":
break
memory_items.append(context)
current_time = datetime.datetime.now().timestamp()
hippocampus.memory_graph.G.add_node(
concept, memory_items=memory_items, created_time=current_time, last_modified=current_time
)
# 删除概念节点(及连接到它的边)
def remove_mem_node(hippocampus: Hippocampus):
concept = input("请输入节点概念名:\n")
result = db.graph_data.nodes.count_documents({"concept": concept})
if result == 0:
console.print(f"[red]不存在名为“{concept}”的节点[/red]")
edges = db.graph_data.edges.find({"$or": [{"source": concept}, {"target": concept}]})
for edge in edges:
console.print(f"[yellow]存在边“{edge['source']} -> {edge['target']}”, 请慎重考虑[/yellow]")
console.print(f"[yellow]确定要移除名为“{concept}”的节点以及其相关边吗[/yellow]")
destory = console.input(f"[red]请输入“{concept}”以删除节点 其他输入将被视为取消操作[/red]\n")
if destory == concept:
hippocampus.memory_graph.G.remove_node(concept)
else:
logger.info("[green]删除操作已取消[/green]")
# 增加节点间边
def add_mem_edge(hippocampus: Hippocampus):
while True:
source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n")
if source.lower() == "退出":
break
if db.graph_data.nodes.count_documents({"concept": source}) == 0:
console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
continue
target = input("请输入 **第二个节点** 名称:\n")
if db.graph_data.nodes.count_documents({"concept": target}) == 0:
console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
continue
if source == target:
console.print(f"[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]")
continue
hippocampus.memory_graph.connect_dot(source, target)
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
if edge["strength"] == 1:
console.print(f"[green]成功创建边“{source} <-> {target}默认权重1[/green]")
else:
console.print(
f"[yellow]边“{source} <-> {target}”已存在,"
f"更新权重: {edge['strength'] - 1} <-> {edge['strength']}[/yellow]"
)
# 删除节点间边
def remove_mem_edge(hippocampus: Hippocampus):
while True:
source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n")
if source.lower() == "退出":
break
if db.graph_data.nodes.count_documents({"concept": source}) == 0:
console.print("[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
continue
target = input("请输入 **第二个节点** 名称:\n")
if db.graph_data.nodes.count_documents({"concept": target}) == 0:
console.print("[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
continue
if source == target:
console.print("[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]")
continue
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
if edge is None:
console.print("[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]")
continue
else:
accept = console.input("[orange]请输入“确认”以确认删除操作(其他输入视为取消)[/orange]\n")
if accept.lower() == "确认":
hippocampus.memory_graph.G.remove_edge(source, target)
console.print(f"[green]边“{source} <-> {target}”已删除。[green]")
# 修改节点信息
def alter_mem_node(hippocampus: Hippocampus):
batch_environment = dict()
while True:
concept = input("请输入节点概念名(输入'终止'以结束):\n")
if concept.lower() == "终止":
break
_, node = hippocampus.memory_graph.get_dot(concept)
if node is None:
console.print(f"[yellow]“{concept}”节点不存在,操作已取消。[/yellow]")
continue
console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]")
console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]")
console.print("[red]你已经被警告过了。[/red]\n")
node_environment = {"concept": "<节点名>", "memory_items": "<记忆文本数组>"}
console.print(
"[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]"
)
console.print(
f"[green] env 会被初始化为[/green]\n{node_environment}\n[green]且会在用户代码执行完毕后被提交 [/green]"
)
console.print(
"[yellow]为便于书写临时脚本请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]"
)
# 拷贝数据以防操作炸了
node_environment = dict(node)
node_environment["concept"] = concept
while True:
def user_exec(script, env, batch_env):
return eval(script, env, batch_env)
try:
command = console.input()
except KeyboardInterrupt:
# 稍微防一下小天才
try:
if isinstance(node_environment["memory_items"], list):
node["memory_items"] = node_environment["memory_items"]
else:
raise Exception
except Exception as e:
console.print(
f"[red]我不知道你做了什么但显然nodeEnviroment['memory_items']已经不是个数组了,"
f"操作已取消: {str(e)}[/red]"
)
break
try:
user_exec(command, node_environment, batch_environment)
except Exception as e:
console.print(e)
console.print(
"[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]"
)
# 修改边信息
def alter_mem_edge(hippocampus: Hippocampus):
batch_enviroment = dict()
while True:
source = input("请输入 **第一个节点** 名称(输入'终止'以结束):\n")
if source.lower() == "终止":
break
if hippocampus.memory_graph.get_dot(source) is None:
console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
continue
target = input("请输入 **第二个节点** 名称:\n")
if hippocampus.memory_graph.get_dot(target) is None:
console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
continue
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
if edge is None:
console.print(f"[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]")
continue
console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]")
console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]")
console.print("[red]你已经被警告过了。[/red]\n")
edge_environment = {"source": "<节点名>", "target": "<节点名>", "strength": "<强度值,装在一个list里>"}
console.print(
"[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]"
)
console.print(
f"[green] env 会被初始化为[/green]\n{edge_environment}\n[green]且会在用户代码执行完毕后被提交 [/green]"
)
console.print(
"[yellow]为便于书写临时脚本请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]"
)
# 拷贝数据以防操作炸了
edge_environment["strength"] = [edge["strength"]]
edge_environment["source"] = source
edge_environment["target"] = target
while True:
def user_exec(script, env, batch_env):
return eval(script, env, batch_env)
try:
command = console.input()
except KeyboardInterrupt:
# 稍微防一下小天才
try:
if isinstance(edge_environment["strength"][0], int):
edge["strength"] = edge_environment["strength"][0]
else:
raise Exception
except Exception as e:
console.print(
f"[red]我不知道你做了什么但显然edgeEnviroment['strength']已经不是个int了"
f"操作已取消: {str(e)}[/red]"
)
break
try:
user_exec(command, edge_environment, batch_enviroment)
except Exception as e:
console.print(e)
console.print(
"[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]"
)
async def main():
start_time = time.time()
# 创建海马体
hippocampus = Hippocampus()
# 从数据库同步数据
hippocampus.entorhinal_cortex.sync_memory_from_db()
end_time = time.time()
logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m")
while True:
try:
query = int(
input(
"""请输入操作类型
0 -> 查询节点; 1 -> 增加节点; 2 -> 移除节点; 3 -> 增加边; 4 -> 移除边;
5 -> 修改节点; 6 -> 修改边; 其他任意输入 -> 退出
"""
)
)
except ValueError:
query = -1
if query == 0:
query_mem_info(hippocampus.memory_graph)
elif query == 1:
add_mem_node(hippocampus)
elif query == 2:
remove_mem_node(hippocampus)
elif query == 3:
add_mem_edge(hippocampus)
elif query == 4:
remove_mem_edge(hippocampus)
elif query == 5:
alter_mem_node(hippocampus)
elif query == 6:
alter_mem_edge(hippocampus)
else:
print("已结束操作")
break
hippocampus.entorhinal_cortex.sync_memory_to_db()
if __name__ == "__main__":
import asyncio
asyncio.run(main())

View File

@@ -1,126 +0,0 @@
import asyncio
import os
import time
from typing import Tuple, Union
import aiohttp
import requests
from src.common.logger import get_module_logger
from rich.traceback import install
install(extra_lines=3)
logger = get_module_logger("offline_llm")
class LLMRequestOff:
def __init__(self, model_name="deepseek-ai/DeepSeek-V3", **kwargs):
self.model_name = model_name
self.params = kwargs
self.api_key = os.getenv("SILICONFLOW_KEY")
self.base_url = os.getenv("SILICONFLOW_BASE_URL")
if not self.api_key or not self.base_url:
raise ValueError("环境变量未正确加载SILICONFLOW_KEY 或 SILICONFLOW_BASE_URL 未设置")
logger.info(f"API URL: {self.base_url}") # 使用 logger 记录 base_url
def generate_response(self, prompt: str) -> Union[str, Tuple[str, str]]:
"""根据输入的提示生成模型的响应"""
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
# 构建请求体
data = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
**self.params,
}
# 发送请求到完整的 chat/completions 端点
api_url = f"{self.base_url.rstrip('/')}/chat/completions"
logger.info(f"Request URL: {api_url}") # 记录请求的 URL
max_retries = 3
base_wait_time = 15 # 基础等待时间(秒)
for retry in range(max_retries):
try:
response = requests.post(api_url, headers=headers, json=data)
if response.status_code == 429:
wait_time = base_wait_time * (2**retry) # 指数退避
logger.warning(f"遇到请求限制(429),等待{wait_time}秒后重试...")
time.sleep(wait_time)
continue
response.raise_for_status() # 检查其他响应状态
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
reasoning_content = result["choices"][0]["message"].get("reasoning_content", "")
return content, reasoning_content
return "没有返回结果", ""
except Exception as e:
if retry < max_retries - 1: # 如果还有重试机会
wait_time = base_wait_time * (2**retry)
logger.error(f"[回复]请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
time.sleep(wait_time)
else:
logger.error(f"请求失败: {str(e)}")
return f"请求失败: {str(e)}", ""
logger.error("达到最大重试次数,请求仍然失败")
return "达到最大重试次数,请求仍然失败", ""
async def generate_response_async(self, prompt: str) -> Union[str, Tuple[str, str]]:
"""异步方式根据输入的提示生成模型的响应"""
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
# 构建请求体
data = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
**self.params,
}
# 发送请求到完整的 chat/completions 端点
api_url = f"{self.base_url.rstrip('/')}/chat/completions"
logger.info(f"Request URL: {api_url}") # 记录请求的 URL
max_retries = 3
base_wait_time = 15
async with aiohttp.ClientSession() as session:
for retry in range(max_retries):
try:
async with session.post(api_url, headers=headers, json=data) as response:
if response.status == 429:
wait_time = base_wait_time * (2**retry) # 指数退避
logger.warning(f"遇到请求限制(429),等待{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
continue
response.raise_for_status() # 检查其他响应状态
result = await response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
reasoning_content = result["choices"][0]["message"].get("reasoning_content", "")
return content, reasoning_content
return "没有返回结果", ""
except Exception as e:
if retry < max_retries - 1: # 如果还有重试机会
wait_time = base_wait_time * (2**retry)
logger.error(f"[回复]请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
await asyncio.sleep(wait_time)
else:
logger.error(f"请求失败: {str(e)}")
return f"请求失败: {str(e)}", ""
logger.error("达到最大重试次数,请求仍然失败")
return "达到最大重试次数,请求仍然失败", ""

View File

@@ -1,4 +1,4 @@
from ..person_info.person_info import person_info_manager
from src.person_info.person_info import person_info_manager
from src.common.logger_manager import get_logger
import asyncio
from dataclasses import dataclass, field

View File

@@ -41,21 +41,6 @@ class PersonalityConfig(ConfigBase):
class IdentityConfig(ConfigBase):
"""个体特征配置类"""
height: int = 170
"""身高(单位:厘米)"""
weight: float = 50
"""体重(单位:千克)"""
age: int = 18
"""年龄(单位:岁)"""
gender: str = ""
"""性别(男/女)"""
appearance: str = "可爱"
"""外貌描述"""
identity_detail: list[str] = field(default_factory=lambda: [])
"""身份特征"""

View File

@@ -7,99 +7,24 @@ class Identity:
"""身份特征类"""
identity_detail: List[str] # 身份细节描述
height: int # 身高(厘米)
weight: float # 体重(千克)
age: int # 年龄
gender: str # 性别
appearance: str # 外貌特征
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
identity_detail: List[str] = None,
height: int = 0,
weight: float = 0,
age: int = 0,
gender: str = "",
appearance: str = "",
):
def __init__(self, identity_detail: List[str] = None):
"""初始化身份特征
Args:
identity_detail: 身份细节描述列表
height: 身高(厘米)
weight: 体重(千克)
age: 年龄
gender: 性别
appearance: 外貌特征
"""
if identity_detail is None:
identity_detail = []
self.identity_detail = identity_detail
self.height = height
self.weight = weight
self.age = age
self.gender = gender
self.appearance = appearance
@classmethod
def get_instance(cls) -> "Identity":
"""获取Identity单例实例
Returns:
Identity: 单例实例
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
@classmethod
def initialize(
cls, identity_detail: List[str], height: int, weight: float, age: int, gender: str, appearance: str
) -> "Identity":
"""初始化身份特征
Args:
identity_detail: 身份细节描述列表
height: 身高(厘米)
weight: 体重(千克)
age: 年龄
gender: 性别
appearance: 外貌特征
Returns:
Identity: 初始化后的身份特征实例
"""
instance = cls.get_instance()
instance.identity_detail = identity_detail
instance.height = height
instance.weight = weight
instance.age = age
instance.gender = gender
instance.appearance = appearance
return instance
def to_dict(self) -> dict:
"""将身份特征转换为字典格式"""
return {
"identity_detail": self.identity_detail,
"height": self.height,
"weight": self.weight,
"age": self.age,
"gender": self.gender,
"appearance": self.appearance,
}
@classmethod
def from_dict(cls, data: dict) -> "Identity":
"""从字典创建身份特征实例"""
instance = cls.get_instance()
for key, value in data.items():
setattr(instance, key, value)
return instance
return cls(identity_detail=data.get("identity_detail", []))

View File

@@ -1,6 +1,4 @@
from typing import Optional
from numpy import double
from .personality import Personality
from .identity import Identity
from .expression_style import PersonalityExpression
@@ -27,11 +25,6 @@ class Individuality:
personality_core: str,
personality_sides: list,
identity_detail: list,
height: int,
weight: double,
age: int,
gender: str,
appearance: str,
) -> None:
"""初始化个体特征
@@ -40,11 +33,6 @@ class Individuality:
personality_core: 人格核心特点
personality_sides: 人格侧面描述
identity_detail: 身份细节描述
height: 身高(厘米)
weight: 体重(千克)
age: 年龄
gender: 性别
appearance: 外貌特征
"""
# 初始化人格
self.personality = Personality.initialize(
@@ -52,9 +40,7 @@ class Individuality:
)
# 初始化身份
self.identity = Identity.initialize(
identity_detail=identity_detail, height=height, weight=weight, age=age, gender=gender, appearance=appearance
)
self.identity = Identity(identity_detail=identity_detail)
await self.express_style.extract_and_store_personality_expressions()
@@ -120,7 +106,7 @@ class Individuality:
获取身份特征的prompt
Args:
level (int): 详细程度 (1: 随机细节, 2: 所有细节+外貌年龄性别, 3: 同2)
level (int): 详细程度 (1: 随机细节, 2: 所有细节, 3: 同2)
x_person (int, optional): 人称代词 (0: 无人称, 1: 我, 2: 你). 默认为 2.
Returns:
@@ -145,23 +131,10 @@ class Individuality:
identity_detail = list(self.identity.identity_detail)
random.shuffle(identity_detail)
if level == 1:
identity_parts.append(f"身份是{identity_detail[0]}")
identity_parts.append(f"{identity_detail[0]}")
elif level >= 2:
details_str = "".join(identity_detail)
identity_parts.append(f"身份是{details_str}")
# 根据level添加其他身份信息
if level >= 3:
if self.identity.appearance:
identity_parts.append(f"{self.identity.appearance}")
if self.identity.age > 0:
identity_parts.append(f"年龄大约{self.identity.age}")
if self.identity.gender:
identity_parts.append(f"性别是{self.identity.gender}")
if self.identity.height:
identity_parts.append(f"身高大约{self.identity.height}厘米")
if self.identity.weight:
identity_parts.append(f"体重大约{self.identity.weight}千克")
identity_parts.append(f"{details_str}")
if identity_parts:
details_str = "".join(identity_parts)

View File

@@ -1,5 +1,5 @@
[inner]
version = "2.4.0"
version = "2.5.0"
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
#如果你想要修改配置文件请在修改后将version的值进行变更
@@ -25,21 +25,18 @@ personality_sides = [
"用一句话或几句话描述人格的一些细节",
"用一句话或几句话描述人格的一些细节",
"用一句话或几句话描述人格的一些细节",
]# 条数任意不能为0, 该选项还在调试中,可能未完全生效
# 身份特点 部分选项仅在 专注聊天 有效
[identity] #アイデンティティがない 生まれないらららら
identity_detail = [
"身份特点",
"身份特点",
]# 条数任意不能为0
#外貌特征
age = 18 # 年龄 单位岁
gender = "女" # 性别
height = "170" # 身高单位cm
weight = "50" # 体重单位kg
appearance = "用一句或几句话描述外貌特征" # 外貌特征
# 身份特点
[identity] #アイデンティティがない 生まれないらららら
identity_detail = [
"年龄为19岁",
"是女孩子",
"身高为160cm",
"有橙色的短发",
]
# 可以描述外贸,性别,身高,职业,属性等等描述
# 条数任意不能为0
[chat] #麦麦的聊天通用设置
chat_mode = "normal" # 聊天模式 —— 普通模式normal专注模式focus在普通模式和专注模式之间自动切换
@@ -96,8 +93,6 @@ self_identify_processor = true # 是否启用自我识别处理器
tool_use_processor = true # 是否启用工具使用处理器
working_memory_processor = true # 是否启用工作记忆处理器
[expression]
# 表达方式
expression_style = "描述麦麦说话的表达风格,表达习惯"