Merge branch 'debug' into debug

This commit is contained in:
SengokuCola
2025-03-12 00:29:59 +08:00
committed by GitHub
27 changed files with 1023 additions and 256 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
data/
data1/
mongodb/
NapCat.Framework.Windows.Once/
log/

2
bot.py
View File

@@ -63,7 +63,7 @@ def init_env():
# 首先加载基础环境变量.env
if os.path.exists(".env"):
load_dotenv(".env")
load_dotenv(".env",override=True)
logger.success("成功加载基础环境变量配置")

59
config/auto_update.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import shutil
import tomlkit
from pathlib import Path
def update_config():
# 获取根目录路径
root_dir = Path(__file__).parent.parent
template_dir = root_dir / "template"
config_dir = root_dir / "config"
# 定义文件路径
template_path = template_dir / "bot_config_template.toml"
old_config_path = config_dir / "bot_config.toml"
new_config_path = config_dir / "bot_config.toml"
# 读取旧配置文件
old_config = {}
if old_config_path.exists():
with open(old_config_path, "r", encoding="utf-8") as f:
old_config = tomlkit.load(f)
# 删除旧的配置文件
if old_config_path.exists():
os.remove(old_config_path)
# 复制模板文件到配置目录
shutil.copy2(template_path, new_config_path)
# 读取新配置文件
with open(new_config_path, "r", encoding="utf-8") as f:
new_config = tomlkit.load(f)
# 递归更新配置
def update_dict(target, source):
for key, value in source.items():
# 跳过version字段的更新
if key == "version":
continue
if key in target:
if isinstance(value, dict) and isinstance(target[key], (dict, tomlkit.items.Table)):
update_dict(target[key], value)
else:
try:
# 直接使用tomlkit的item方法创建新值
target[key] = tomlkit.item(value)
except (TypeError, ValueError):
# 如果转换失败,直接赋值
target[key] = value
# 将旧配置的值更新到新配置中
update_dict(new_config, old_config)
# 保存更新后的配置(保留注释和格式)
with open(new_config_path, "w", encoding="utf-8") as f:
f.write(tomlkit.dumps(new_config))
if __name__ == "__main__":
update_config()

View File

@@ -0,0 +1,444 @@
# 面向纯新手的Linux服务器麦麦部署指南
## 你得先有一个服务器
为了能使麦麦在你的电脑关机之后还能运行,你需要一台不间断开机的主机,也就是我们常说的服务器。
华为云、阿里云、腾讯云等等都是在国内可以选择的选择。
你可以去租一台最低配置的就足敷需要了,按月租大概十几块钱就能租到了。
我们假设你已经租好了一台Linux架构的云服务器。我用的是阿里云ubuntu24.04,其他的原理相似。
## 0.我们就从零开始吧
### 网络问题
为访问github相关界面推荐去下一款加速器新手可以试试watttoolkit。
### 安装包下载
#### MongoDB
对于ubuntu24.04 x86来说是这个
https://repo.mongodb.org/apt/ubuntu/dists/noble/mongodb-org/8.0/multiverse/binary-amd64/mongodb-org-server_8.0.5_amd64.deb
如果不是就在这里自行选择对应版本
https://www.mongodb.com/try/download/community-kubernetes-operator
#### Napcat
在这里选择对应版本。
https://github.com/NapNeko/NapCatQQ/releases/tag/v4.6.7
对于ubuntu24.04 x86来说是这个
https://dldir1.qq.com/qqfile/qq/QQNT/ee4bd910/linuxqq_3.2.16-32793_amd64.deb
#### 麦麦
https://github.com/SengokuCola/MaiMBot/archive/refs/tags/0.5.8-alpha.zip
下载这个官方压缩包。
### 路径
我把麦麦相关文件放在了/moi/mai里面你可以凭喜好更改记得适当调整下面涉及到的部分即可。
文件结构:
```
moi
└─ mai
├─ linuxqq_3.2.16-32793_amd64.deb
├─ mongodb-org-server_8.0.5_amd64.deb
└─ bot
└─ MaiMBot-0.5.8-alpha.zip
```
### 网络
你可以在你的服务器控制台网页更改防火墙规则允许6099808027017这几个端口的出入。
## 1.正式开始!
远程连接你的服务器你会看到一个黑框框闪着白方格这就是我们要进行设置的场所——终端了。以下的bash命令都是在这里输入。
## 2. Python的安装
- 导入 Python 的稳定版 PPA
```bash
sudo add-apt-repository ppa:deadsnakes/ppa
```
- 导入 PPA 后,更新 APT 缓存:
```bash
sudo apt update
```
- 在「终端」中执行以下命令来安装 Python 3.12
```bash
sudo apt install python3.12
```
- 验证安装是否成功:
```bash
python3.12 --version
```
- 在「终端」中,执行以下命令安装 pip
```bash
sudo apt install python3-pip
```
- 检查Pip是否安装成功
```bash
pip --version
```
- 安装必要组件
``` bash
sudo apt install python-is-python3
```
## 3.MongoDB的安装
``` bash
cd /moi/mai
```
``` bash
dpkg -i mongodb-org-server_8.0.5_amd64.deb
```
``` bash
mkdir -p /root/data/mongodb/{data,log}
```
## 4.MongoDB的运行
```bash
service mongod start
```
```bash
systemctl status mongod #通过这条指令检查运行状态
```
有需要的话可以把这个服务注册成开机自启
```bash
sudo systemctl enable mongod
```
## 5.napcat的安装
``` bash
curl -o napcat.sh https://nclatest.znin.net/NapNeko/NapCat-Installer/main/script/install.sh && sudo bash napcat.sh
```
上面的不行试试下面的
``` bash
dpkg -i linuxqq_3.2.16-32793_amd64.deb
apt-get install -f
dpkg -i linuxqq_3.2.16-32793_amd64.deb
```
成功的标志是输入``` napcat ```出来炫酷的彩虹色界面
## 6.napcat的运行
此时你就可以根据提示在```napcat```里面登录你的QQ号了。
```bash
napcat start <你的QQ号>
napcat status #检查运行状态
```
然后你就可以登录napcat的webui进行设置了
```http://<你服务器的公网IP>:6099/webui?token=napcat```
第一次是这个后续改了密码之后token就会对应修改。你也可以使用```napcat log <你的QQ号>```来查看webui地址。把里面的```127.0.0.1```改成<你服务器的公网IP>即可。
登录上之后在网络配置界面添加websocket客户端名称随便输一个url改成`ws://127.0.0.1:8080/onebot/v11/ws`保存之后点启用,就大功告成了。
## 7.麦麦的安装
### step 1 安装解压软件
```
sudo apt-get install unzip
```
### step 2 解压文件
```bash
cd /moi/mai/bot # 注意:要切换到压缩包的目录中去
unzip MaiMBot-0.5.8-alpha.zip
```
### step 3 进入虚拟环境安装库
```bash
cd /moi/mai/bot
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
### step 4 试运行
```bash
cd /moi/mai/bot
python -m venv venv
source venv/bin/activate
python bot.py
```
肯定运行不成功,不过你会发现结束之后多了一些文件
```
bot
├─ .env.prod
└─ config
└─ bot_config.toml
```
你要会vim直接在终端里修改也行不过也可以把它们下到本地改好再传上去
### step 5 文件配置
本项目需要配置两个主要文件:
1. `.env.prod` - 配置API服务和系统环境
2. `bot_config.toml` - 配置机器人行为和模型
#### API
你可以注册一个硅基流动的账号通过邀请码注册有14块钱的免费额度https://cloud.siliconflow.cn/i/7Yld7cfg。
#### 在.env.prod中定义API凭证
```
# API凭证配置
SILICONFLOW_KEY=your_key # 硅基流动API密钥
SILICONFLOW_BASE_URL=https://api.siliconflow.cn/v1/ # 硅基流动API地址
DEEP_SEEK_KEY=your_key # DeepSeek API密钥
DEEP_SEEK_BASE_URL=https://api.deepseek.com/v1 # DeepSeek API地址
CHAT_ANY_WHERE_KEY=your_key # ChatAnyWhere API密钥
CHAT_ANY_WHERE_BASE_URL=https://api.chatanywhere.tech/v1 # ChatAnyWhere API地址
```
#### 在bot_config.toml中引用API凭证
```
[model.llm_reasoning]
name = "Pro/deepseek-ai/DeepSeek-R1"
base_url = "SILICONFLOW_BASE_URL" # 引用.env.prod中定义的地址
key = "SILICONFLOW_KEY" # 引用.env.prod中定义的密钥
```
如需切换到其他API服务只需修改引用
```
[model.llm_reasoning]
name = "Pro/deepseek-ai/DeepSeek-R1"
base_url = "DEEP_SEEK_BASE_URL" # 切换为DeepSeek服务
key = "DEEP_SEEK_KEY" # 使用DeepSeek密钥
```
#### 配置文件详解
##### 环境配置文件 (.env.prod)
```
# API配置
SILICONFLOW_KEY=your_key
SILICONFLOW_BASE_URL=https://api.siliconflow.cn/v1/
DEEP_SEEK_KEY=your_key
DEEP_SEEK_BASE_URL=https://api.deepseek.com/v1
CHAT_ANY_WHERE_KEY=your_key
CHAT_ANY_WHERE_BASE_URL=https://api.chatanywhere.tech/v1
# 服务配置
HOST=127.0.0.1 # 如果使用Docker部署需要改成0.0.0.0否则QQ消息无法传入
PORT=8080
# 数据库配置
MONGODB_HOST=127.0.0.1 # 如果使用Docker部署需要改成数据库容器的名字默认是mongodb
MONGODB_PORT=27017
DATABASE_NAME=MegBot
MONGODB_USERNAME = "" # 数据库用户名
MONGODB_PASSWORD = "" # 数据库密码
MONGODB_AUTH_SOURCE = "" # 认证数据库
# 插件配置
PLUGINS=["src2.plugins.chat"]
```
##### 机器人配置文件 (bot_config.toml)
```
[bot]
qq = "机器人QQ号" # 必填
nickname = "麦麦" # 机器人昵称(你希望机器人怎么称呼它自己)
[personality]
prompt_personality = [
"曾经是一个学习地质的女大学生,现在学习心理学和脑科学,你会刷贴吧",
"是一个女大学生,你有黑色头发,你会刷小红书"
]
prompt_schedule = "一个曾经学习地质,现在学习心理学和脑科学的女大学生喜欢刷qq贴吧知乎和小红书"
[message]
min_text_length = 2 # 最小回复长度
max_context_size = 15 # 上下文记忆条数
emoji_chance = 0.2 # 表情使用概率
ban_words = [] # 禁用词列表
[emoji]
auto_save = true # 自动保存表情
enable_check = false # 启用表情审核
check_prompt = "符合公序良俗"
[groups]
talk_allowed = [] # 允许对话的群号
talk_frequency_down = [] # 降低回复频率的群号
ban_user_id = [] # 禁止回复的用户QQ号
[others]
enable_advance_output = true # 启用详细日志
enable_kuuki_read = true # 启用场景理解
# 模型配置
[model.llm_reasoning] # 推理模型
name = "Pro/deepseek-ai/DeepSeek-R1"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[model.llm_reasoning_minor] # 轻量推理模型
name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[model.llm_normal] # 对话模型
name = "Pro/deepseek-ai/DeepSeek-V3"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[model.llm_normal_minor] # 备用对话模型
name = "deepseek-ai/DeepSeek-V2.5"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[model.vlm] # 图像识别模型
name = "deepseek-ai/deepseek-vl2"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[model.embedding] # 文本向量模型
name = "BAAI/bge-m3"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
[topic.llm_topic]
name = "Pro/deepseek-ai/DeepSeek-V3"
base_url = "SILICONFLOW_BASE_URL"
key = "SILICONFLOW_KEY"
```
**step # 6** 运行
现在再运行
```bash
cd /moi/mai/bot
python -m venv venv
source venv/bin/activate
python bot.py
```
应该就能运行成功了。
## 8.事后配置
可是现在还有个问题只要你一关闭终端bot.py就会停止运行。那该怎么办呢我们可以把bot.py注册成服务。
重启服务器打开MongoDB和napcat服务。
新建一个文件,名为`bot.service`,内容如下
```
[Unit]
Description=maimai bot
[Service]
WorkingDirectory=/moi/mai/bot
ExecStart=/moi/mai/bot/venv/bin/python /moi/mai/bot/bot.py
Restart=on-failure
User=root
[Install]
WantedBy=multi-user.target
```
里面的路径视自己的情况更改。
把它放到`/etc/systemd/system`里面。
重新加载 `systemd` 配置:
```bash
sudo systemctl daemon-reload
```
启动服务:
```bash
sudo systemctl start bot.service # 启动服务
sudo systemctl restart bot.service # 或者重启服务
```
检查服务状态:
```bash
sudo systemctl status bot.service
```
现在再关闭终端检查麦麦能不能正常回复QQ信息。如果可以的话就大功告成了
## 9.命令速查
```bash
service mongod start # 启动mongod服务
napcat start <你的QQ号> # 登录napcat
cd /moi/mai/bot # 切换路径
python -m venv venv # 创建虚拟环境
source venv/bin/activate # 激活虚拟环境
sudo systemctl daemon-reload # 重新加载systemd配置
sudo systemctl start bot.service # 启动bot服务
sudo systemctl enable bot.service # 启动bot服务
sudo systemctl status bot.service # 检查bot服务状态
```
```
python bot.py
```

View File

@@ -21,6 +21,7 @@
buildInputs = [
pythonPackages.python
pythonPackages.venvShellHook
pythonPackages.numpy
];
postVenvCreation = ''

Binary file not shown.

12
run.py
View File

@@ -128,13 +128,17 @@ if __name__ == "__main__":
)
os.system("cls")
if choice == "1":
install_napcat()
install_mongodb()
confirm = input("首次安装将下载并配置所需组件\n1.确认\n2.取消\n")
if confirm == "1":
install_napcat()
install_mongodb()
else:
print("已取消安装")
elif choice == "2":
run_maimbot()
choice = input("是否启动推理可视化y/N").upper()
choice = input("是否启动推理可视化?(未完善)(y/N").upper()
if choice == "Y":
run_cmd(r"python src\gui\reasoning_gui.py")
choice = input("是否启动记忆可视化y/N").upper()
choice = input("是否启动记忆可视化?(未完善)(y/N").upper()
if choice == "Y":
run_cmd(r"python src/plugins/memory_system/memory_manual_build.py")

View File

@@ -1,8 +1,6 @@
from typing import Optional
from pymongo import MongoClient
class Database:
_instance: Optional["Database"] = None
@@ -51,24 +49,3 @@ class Database:
if cls._instance is None:
raise RuntimeError("Database not initialized")
return cls._instance
#测试用
def get_random_group_messages(self, group_id: str, limit: int = 5):
# 先随机获取一条消息
random_message = list(self.db.messages.aggregate([
{"$match": {"group_id": group_id}},
{"$sample": {"size": 1}}
]))[0]
# 获取该消息之后的消息
subsequent_messages = list(self.db.messages.find({
"group_id": group_id,
"time": {"$gt": random_message["time"]}
}).sort("time", 1).limit(limit))
# 将随机消息和后续消息合并
messages = [random_message] + subsequent_messages
return messages

View File

@@ -12,7 +12,6 @@ from nonebot.adapters.onebot.v11 import (
from ..memory_system.memory import hippocampus
from ..moods.moods import MoodManager # 导入情绪管理器
from .config import global_config
from .cq_code import CQCode, cq_code_tool # 导入CQCode模块
from .emoji_manager import emoji_manager # 导入表情包管理器
from .llm_generator import ResponseGenerator
from .message import MessageSending, MessageRecv, MessageThinking, MessageSet
@@ -244,7 +243,9 @@ class ChatBot:
# message_set 可以直接加入 message_manager
# print(f"\033[1;32m[回复]\033[0m 将回复载入发送容器")
logger.debug("添加message_set到message_manager")
message_manager.add_message(message_set)
bot_response_time = thinking_time_point

View File

@@ -161,6 +161,7 @@ class EmojiManager:
{'_id': selected_emoji['_id']},
{'$inc': {'usage_count': 1}}
)
logger.success(
f"找到匹配的表情包: {selected_emoji.get('description', '无描述')} (相似度: {similarity:.4f})")
# 稍微改一下文本描述,不然容易产生幻觉,描述已经包含 表情包 了
@@ -176,8 +177,10 @@ class EmojiManager:
logger.error(f"获取表情包失败: {str(e)}")
return None
async def _get_emoji_discription(self, image_base64: str) -> str:
"""获取表情包的标签使用image_manager的描述生成功能"""
try:
# 使用image_manager获取描述去掉前后的方括号和"表情包:"前缀
description = await image_manager.get_emoji_description(image_base64)
@@ -272,11 +275,14 @@ class EmojiManager:
# 获取表情包的描述
description = await self._get_emoji_discription(image_base64)
if global_config.EMOJI_CHECK:
check = await self._check_emoji(image_base64)
if '' not in check:
os.remove(image_path)
logger.info(f"描述: {description}")
logger.info(f"描述: {description}")
logger.info(f"其不满足过滤规则,被剔除 {check}")
continue
@@ -287,6 +293,7 @@ class EmojiManager:
if description is not None:
embedding = await get_embedding(description)
# 准备数据库记录
emoji_record = {
'filename': filename,
@@ -302,6 +309,7 @@ class EmojiManager:
logger.success(f"注册新表情包: {filename}")
logger.info(f"描述: {description}")
# 保存到images数据库
image_doc = {
'hash': image_hash,
@@ -389,5 +397,7 @@ class EmojiManager:
# 创建全局单例
emoji_manager = EmojiManager()

View File

@@ -8,7 +8,7 @@ from loguru import logger
from ...common.database import Database
from ..models.utils_model import LLM_request
from .config import global_config
from .message import MessageRecv, MessageThinking, MessageSending,Message
from .message import MessageRecv, MessageThinking, Message
from .prompt_builder import prompt_builder
from .relationship_manager import relationship_manager
from .utils import process_llm_response

View File

@@ -3,12 +3,13 @@ import html
import re
import json
from dataclasses import dataclass
from typing import Dict, ForwardRef, List, Optional, Union
from typing import Dict, List, Optional
import urllib3
from loguru import logger
from .utils_image import image_manager
from .message_base import Seg, GroupInfo, UserInfo, BaseMessageInfo, MessageBase
from .chat_stream import ChatStream, chat_manager

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass, asdict
from typing import List, Optional, Union, Any, Dict
from typing import List, Optional, Union, Dict
@dataclass
class Seg:

View File

@@ -1,12 +1,12 @@
import time
from dataclasses import dataclass
from typing import Dict, ForwardRef, List, Optional, Union
from typing import Dict, Optional
import urllib3
from .cq_code import CQCode, cq_code_tool
from .cq_code import cq_code_tool
from .utils_cq import parse_cq_code
from .utils_user import get_groupname, get_user_cardname, get_user_nickname
from .utils_user import get_groupname
from .message_base import Seg, GroupInfo, UserInfo, BaseMessageInfo, MessageBase
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

View File

@@ -5,12 +5,11 @@ from typing import Dict, List, Optional, Union
from loguru import logger
from nonebot.adapters.onebot.v11 import Bot
from .cq_code import cq_code_tool
from .message_cq import MessageSendCQ
from .message import MessageSending, MessageThinking, MessageRecv, MessageSet
from .storage import MessageStorage
from .config import global_config
from .chat_stream import chat_manager
class Message_Sender:

View File

@@ -9,7 +9,7 @@ from ..moods.moods import MoodManager
from ..schedule.schedule_generator import bot_schedule
from .config import global_config
from .utils import get_embedding, get_recent_group_detailed_plain_text
from .chat_stream import ChatStream, chat_manager
from .chat_stream import chat_manager
class PromptBuilder:

View File

@@ -1,6 +1,5 @@
import asyncio
from typing import Optional, Union
from typing import Optional, Union
from typing import Optional
from loguru import logger
from ...common.database import Database

View File

@@ -1,8 +1,6 @@
from typing import Optional, Union
from typing import Optional, Union
from ...common.database import Database
from .message_base import MessageBase
from .message import MessageSending, MessageRecv
from .chat_stream import ChatStream
from loguru import logger

View File

@@ -12,8 +12,8 @@ from loguru import logger
from ..models.utils_model import LLM_request
from ..utils.typo_generator import ChineseTypoGenerator
from .config import global_config
from .message import MessageThinking, MessageRecv,MessageSending,MessageProcessBase,Message
from .message_base import MessageBase,BaseMessageInfo,UserInfo,GroupInfo
from .message import MessageRecv,Message
from .message_base import UserInfo
from .chat_stream import ChatStream
from ..moods.moods import MoodManager
@@ -39,9 +39,13 @@ def db_message_to_str(message_dict: Dict) -> str:
def is_mentioned_bot_in_message(message: MessageRecv) -> bool:
"""检查消息是否提到了机器人"""
keywords = [global_config.BOT_NICKNAME]
nicknames = global_config.BOT_ALIAS_NAMES
for keyword in keywords:
if keyword in message.processed_plain_text:
return True
for nickname in nicknames:
if nickname in message.processed_plain_text:
return True
return False

View File

@@ -1,16 +1,12 @@
import base64
import io
import os
import time
import zlib
import aiohttp
import hashlib
from typing import Optional, Tuple, Union
from urllib.parse import urlparse
from typing import Optional, Union
from loguru import logger
from nonebot import get_driver
from PIL import Image
from ...common.database import Database
from ..chat.config import global_config

View File

@@ -1,13 +1,9 @@
import asyncio
from typing import Dict
from loguru import logger
from typing import Dict
from loguru import logger
from .config import global_config
from .message_base import UserInfo, GroupInfo
from .chat_stream import chat_manager,ChatStream
from .chat_stream import ChatStream
class WillingManager:

View File

@@ -1,199 +0,0 @@
import os
import sys
import time
import requests
from dotenv import load_dotenv
# 添加项目根目录到 Python 路径
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
sys.path.append(root_path)
# 加载根目录下的env.edv文件
env_path = os.path.join(root_path, ".env.dev")
if not os.path.exists(env_path):
raise FileNotFoundError(f"配置文件不存在: {env_path}")
load_dotenv(env_path)
from src.common.database import Database
# 从环境变量获取配置
Database.initialize(
uri=os.getenv("MONGODB_URI"),
host=os.getenv("MONGODB_HOST", "127.0.0.1"),
port=int(os.getenv("MONGODB_PORT", "27017")),
db_name=os.getenv("DATABASE_NAME", "MegBot"),
username=os.getenv("MONGODB_USERNAME"),
password=os.getenv("MONGODB_PASSWORD"),
auth_source=os.getenv("MONGODB_AUTH_SOURCE"),
)
class KnowledgeLibrary:
def __init__(self):
self.db = Database.get_instance()
self.raw_info_dir = "data/raw_info"
self._ensure_dirs()
self.api_key = os.getenv("SILICONFLOW_KEY")
if not self.api_key:
raise ValueError("SILICONFLOW_API_KEY 环境变量未设置")
def _ensure_dirs(self):
"""确保必要的目录存在"""
os.makedirs(self.raw_info_dir, exist_ok=True)
def get_embedding(self, text: str) -> list:
"""获取文本的embedding向量"""
url = "https://api.siliconflow.cn/v1/embeddings"
payload = {
"model": "BAAI/bge-m3",
"input": text,
"encoding_format": "float"
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
print(f"获取embedding失败: {response.text}")
return None
return response.json()['data'][0]['embedding']
def process_files(self):
"""处理raw_info目录下的所有txt文件"""
for filename in os.listdir(self.raw_info_dir):
if filename.endswith('.txt'):
file_path = os.path.join(self.raw_info_dir, filename)
self.process_single_file(file_path)
def process_single_file(self, file_path: str):
"""处理单个文件"""
try:
# 检查文件是否已处理
if self.db.db.processed_files.find_one({"file_path": file_path}):
print(f"文件已处理过,跳过: {file_path}")
return
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 按1024字符分段
segments = [content[i:i+600] for i in range(0, len(content), 300)]
# 处理每个分段
for segment in segments:
if not segment.strip(): # 跳过空段
continue
# 获取embedding
embedding = self.get_embedding(segment)
if not embedding:
continue
# 存储到数据库
doc = {
"content": segment,
"embedding": embedding,
"file_path": file_path,
"segment_length": len(segment)
}
# 使用文本内容的哈希值作为唯一标识
content_hash = hash(segment)
# 更新或插入文档
self.db.db.knowledges.update_one(
{"content_hash": content_hash},
{"$set": doc},
upsert=True
)
# 记录文件已处理
self.db.db.processed_files.insert_one({
"file_path": file_path,
"processed_time": time.time()
})
print(f"成功处理文件: {file_path}")
except Exception as e:
print(f"处理文件 {file_path} 时出错: {str(e)}")
def search_similar_segments(self, query: str, limit: int = 5) -> list:
"""搜索与查询文本相似的片段"""
query_embedding = self.get_embedding(query)
if not query_embedding:
return []
# 使用余弦相似度计算
pipeline = [
{
"$addFields": {
"dotProduct": {
"$reduce": {
"input": {"$range": [0, {"$size": "$embedding"}]},
"initialValue": 0,
"in": {
"$add": [
"$$value",
{"$multiply": [
{"$arrayElemAt": ["$embedding", "$$this"]},
{"$arrayElemAt": [query_embedding, "$$this"]}
]}
]
}
}
},
"magnitude1": {
"$sqrt": {
"$reduce": {
"input": "$embedding",
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
}
}
},
"magnitude2": {
"$sqrt": {
"$reduce": {
"input": query_embedding,
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
}
}
}
}
},
{
"$addFields": {
"similarity": {
"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]
}
}
},
{"$sort": {"similarity": -1}},
{"$limit": limit},
{"$project": {"content": 1, "similarity": 1, "file_path": 1}}
]
results = list(self.db.db.knowledges.aggregate(pipeline))
return results
# 创建单例实例
knowledge_library = KnowledgeLibrary()
if __name__ == "__main__":
# 测试知识库功能
print("开始处理知识库文件...")
knowledge_library.process_files()
# 测试搜索功能
test_query = "麦麦评价一下僕と花"
print(f"\n搜索与'{test_query}'相似的内容:")
results = knowledge_library.search_similar_segments(test_query)
for result in results:
print(f"相似度: {result['similarity']:.4f}")
print(f"内容: {result['content'][:100]}...")
print("-" * 50)

View File

@@ -10,7 +10,6 @@ from pathlib import Path
import matplotlib.pyplot as plt
import networkx as nx
import pymongo
from dotenv import load_dotenv
from loguru import logger
import jieba

View File

@@ -0,0 +1,383 @@
import os
import sys
import time
import requests
from dotenv import load_dotenv
import hashlib
from datetime import datetime
from tqdm import tqdm
from rich.console import Console
from rich.table import Table
# 添加项目根目录到 Python 路径
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
sys.path.append(root_path)
# 现在可以导入src模块
from src.common.database import Database
# 加载根目录下的env.edv文件
env_path = os.path.join(root_path, ".env.prod")
if not os.path.exists(env_path):
raise FileNotFoundError(f"配置文件不存在: {env_path}")
load_dotenv(env_path)
class KnowledgeLibrary:
def __init__(self):
# 初始化数据库连接
if Database._instance is None:
Database.initialize(
uri=os.getenv("MONGODB_URI"),
host=os.getenv("MONGODB_HOST", "127.0.0.1"),
port=int(os.getenv("MONGODB_PORT", "27017")),
db_name=os.getenv("DATABASE_NAME", "MegBot"),
username=os.getenv("MONGODB_USERNAME"),
password=os.getenv("MONGODB_PASSWORD"),
auth_source=os.getenv("MONGODB_AUTH_SOURCE"),
)
self.db = Database.get_instance()
self.raw_info_dir = "data/raw_info"
self._ensure_dirs()
self.api_key = os.getenv("SILICONFLOW_KEY")
if not self.api_key:
raise ValueError("SILICONFLOW_API_KEY 环境变量未设置")
self.console = Console()
def _ensure_dirs(self):
"""确保必要的目录存在"""
os.makedirs(self.raw_info_dir, exist_ok=True)
def read_file(self, file_path: str) -> str:
"""读取文件内容"""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def split_content(self, content: str, max_length: int = 512) -> list:
"""将内容分割成适当大小的块,保持段落完整性
Args:
content: 要分割的文本内容
max_length: 每个块的最大长度
Returns:
list: 分割后的文本块列表
"""
# 首先按段落分割
paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
chunks = []
current_chunk = []
current_length = 0
for para in paragraphs:
para_length = len(para)
# 如果单个段落就超过最大长度
if para_length > max_length:
# 如果当前chunk不为空先保存
if current_chunk:
chunks.append('\n'.join(current_chunk))
current_chunk = []
current_length = 0
# 将长段落按句子分割
sentences = [s.strip() for s in para.replace('', '\n').replace('', '\n').replace('', '\n').split('\n') if s.strip()]
temp_chunk = []
temp_length = 0
for sentence in sentences:
sentence_length = len(sentence)
if sentence_length > max_length:
# 如果单个句子超长,强制按长度分割
if temp_chunk:
chunks.append('\n'.join(temp_chunk))
temp_chunk = []
temp_length = 0
for i in range(0, len(sentence), max_length):
chunks.append(sentence[i:i + max_length])
elif temp_length + sentence_length + 1 <= max_length:
temp_chunk.append(sentence)
temp_length += sentence_length + 1
else:
chunks.append('\n'.join(temp_chunk))
temp_chunk = [sentence]
temp_length = sentence_length
if temp_chunk:
chunks.append('\n'.join(temp_chunk))
# 如果当前段落加上现有chunk不超过最大长度
elif current_length + para_length + 1 <= max_length:
current_chunk.append(para)
current_length += para_length + 1
else:
# 保存当前chunk并开始新的chunk
chunks.append('\n'.join(current_chunk))
current_chunk = [para]
current_length = para_length
# 添加最后一个chunk
if current_chunk:
chunks.append('\n'.join(current_chunk))
return chunks
def get_embedding(self, text: str) -> list:
"""获取文本的embedding向量"""
url = "https://api.siliconflow.cn/v1/embeddings"
payload = {
"model": "BAAI/bge-m3",
"input": text,
"encoding_format": "float"
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
print(f"获取embedding失败: {response.text}")
return None
return response.json()['data'][0]['embedding']
def process_files(self, knowledge_length:int=512):
"""处理raw_info目录下的所有txt文件"""
txt_files = [f for f in os.listdir(self.raw_info_dir) if f.endswith('.txt')]
if not txt_files:
self.console.print("[red]警告:在 {} 目录下没有找到任何txt文件[/red]".format(self.raw_info_dir))
self.console.print("[yellow]请将需要处理的文本文件放入该目录后再运行程序[/yellow]")
return
total_stats = {
"processed_files": 0,
"total_chunks": 0,
"failed_files": [],
"skipped_files": []
}
self.console.print(f"\n[bold blue]开始处理知识库文件 - 共{len(txt_files)}个文件[/bold blue]")
for filename in tqdm(txt_files, desc="处理文件进度"):
file_path = os.path.join(self.raw_info_dir, filename)
result = self.process_single_file(file_path, knowledge_length)
self._update_stats(total_stats, result, filename)
self._display_processing_results(total_stats)
def process_single_file(self, file_path: str, knowledge_length: int = 512):
"""处理单个文件"""
result = {
"status": "success",
"chunks_processed": 0,
"error": None
}
try:
current_hash = self.calculate_file_hash(file_path)
processed_record = self.db.db.processed_files.find_one({"file_path": file_path})
if processed_record:
if processed_record.get("hash") == current_hash:
if knowledge_length in processed_record.get("split_by", []):
result["status"] = "skipped"
return result
content = self.read_file(file_path)
chunks = self.split_content(content, knowledge_length)
for chunk in tqdm(chunks, desc=f"处理 {os.path.basename(file_path)} 的文本块", leave=False):
embedding = self.get_embedding(chunk)
if embedding:
knowledge = {
"content": chunk,
"embedding": embedding,
"source_file": file_path,
"split_length": knowledge_length,
"created_at": datetime.now()
}
self.db.db.knowledges.insert_one(knowledge)
result["chunks_processed"] += 1
split_by = processed_record.get("split_by", []) if processed_record else []
if knowledge_length not in split_by:
split_by.append(knowledge_length)
self.db.db.processed_files.update_one(
{"file_path": file_path},
{
"$set": {
"hash": current_hash,
"last_processed": datetime.now(),
"split_by": split_by
}
},
upsert=True
)
except Exception as e:
result["status"] = "failed"
result["error"] = str(e)
return result
def _update_stats(self, total_stats, result, filename):
"""更新总体统计信息"""
if result["status"] == "success":
total_stats["processed_files"] += 1
total_stats["total_chunks"] += result["chunks_processed"]
elif result["status"] == "failed":
total_stats["failed_files"].append((filename, result["error"]))
elif result["status"] == "skipped":
total_stats["skipped_files"].append(filename)
def _display_processing_results(self, stats):
"""显示处理结果统计"""
self.console.print("\n[bold green]处理完成!统计信息如下:[/bold green]")
table = Table(show_header=True, header_style="bold magenta")
table.add_column("统计项", style="dim")
table.add_column("数值")
table.add_row("成功处理文件数", str(stats["processed_files"]))
table.add_row("处理的知识块总数", str(stats["total_chunks"]))
table.add_row("跳过的文件数", str(len(stats["skipped_files"])))
table.add_row("失败的文件数", str(len(stats["failed_files"])))
self.console.print(table)
if stats["failed_files"]:
self.console.print("\n[bold red]处理失败的文件:[/bold red]")
for filename, error in stats["failed_files"]:
self.console.print(f"[red]- {filename}: {error}[/red]")
if stats["skipped_files"]:
self.console.print("\n[bold yellow]跳过的文件(已处理):[/bold yellow]")
for filename in stats["skipped_files"]:
self.console.print(f"[yellow]- {filename}[/yellow]")
def calculate_file_hash(self, file_path):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def search_similar_segments(self, query: str, limit: int = 5) -> list:
"""搜索与查询文本相似的片段"""
query_embedding = self.get_embedding(query)
if not query_embedding:
return []
# 使用余弦相似度计算
pipeline = [
{
"$addFields": {
"dotProduct": {
"$reduce": {
"input": {"$range": [0, {"$size": "$embedding"}]},
"initialValue": 0,
"in": {
"$add": [
"$$value",
{"$multiply": [
{"$arrayElemAt": ["$embedding", "$$this"]},
{"$arrayElemAt": [query_embedding, "$$this"]}
]}
]
}
}
},
"magnitude1": {
"$sqrt": {
"$reduce": {
"input": "$embedding",
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
}
}
},
"magnitude2": {
"$sqrt": {
"$reduce": {
"input": query_embedding,
"initialValue": 0,
"in": {"$add": ["$$value", {"$multiply": ["$$this", "$$this"]}]}
}
}
}
}
},
{
"$addFields": {
"similarity": {
"$divide": ["$dotProduct", {"$multiply": ["$magnitude1", "$magnitude2"]}]
}
}
},
{"$sort": {"similarity": -1}},
{"$limit": limit},
{"$project": {"content": 1, "similarity": 1, "file_path": 1}}
]
results = list(self.db.db.knowledges.aggregate(pipeline))
return results
# 创建单例实例
knowledge_library = KnowledgeLibrary()
if __name__ == "__main__":
console = Console()
console.print("[bold green]知识库处理工具[/bold green]")
while True:
console.print("\n请选择要执行的操作:")
console.print("[1] 麦麦开始学习")
console.print("[2] 麦麦全部忘光光(仅知识)")
console.print("[q] 退出程序")
choice = input("\n请输入选项: ").strip()
if choice.lower() == 'q':
console.print("[yellow]程序退出[/yellow]")
sys.exit(0)
elif choice == '2':
confirm = input("确定要删除所有知识吗?这个操作不可撤销!(y/n): ").strip().lower()
if confirm == 'y':
knowledge_library.db.db.knowledges.delete_many({})
console.print("[green]已清空所有知识![/green]")
continue
elif choice == '1':
if not os.path.exists(knowledge_library.raw_info_dir):
console.print(f"[yellow]创建目录:{knowledge_library.raw_info_dir}[/yellow]")
os.makedirs(knowledge_library.raw_info_dir, exist_ok=True)
# 询问分割长度
while True:
try:
length_input = input("请输入知识分割长度默认512输入q退出回车使用默认值: ").strip()
if length_input.lower() == 'q':
break
if not length_input: # 如果直接回车,使用默认值
knowledge_length = 512
break
knowledge_length = int(length_input)
if knowledge_length <= 0:
print("分割长度必须大于0请重新输入")
continue
break
except ValueError:
print("请输入有效的数字")
continue
if length_input.lower() == 'q':
continue
# 测试知识库功能
print(f"开始处理知识库文件,使用分割长度: {knowledge_length}...")
knowledge_library.process_files(knowledge_length=knowledge_length)
else:
console.print("[red]无效的选项,请重新选择[/red]")
continue

View File

@@ -0,0 +1,4 @@
更新版本后建议删除数据库messages中所有内容不然会出现报错
该操作不会影响你的记忆
如果显示配置文件版本过低运行根目录的bat

View File

@@ -0,0 +1,45 @@
@echo off
setlocal enabledelayedexpansion
chcp 65001
cd /d %~dp0
echo =====================================
echo 选择Python环境:
echo 1 - venv (推荐)
echo 2 - conda
echo =====================================
choice /c 12 /n /m "输入数字(1或2): "
if errorlevel 2 (
echo =====================================
set "CONDA_ENV="
set /p CONDA_ENV="请输入要激活的 conda 环境名称: "
:: 检查输入是否为空
if "!CONDA_ENV!"=="" (
echo 错误:环境名称不能为空
pause
exit /b 1
)
call conda activate !CONDA_ENV!
if errorlevel 1 (
echo 激活 conda 环境失败
pause
exit /b 1
)
echo Conda 环境 "!CONDA_ENV!" 激活成功
python config/auto_update.py
) else (
if exist "venv\Scripts\python.exe" (
venv\Scripts\python config/auto_update.py
) else (
echo =====================================
echo 错误: venv环境不存在请先创建虚拟环境
pause
exit /b 1
)
)
endlocal
pause

45
麦麦开始学习.bat Normal file
View File

@@ -0,0 +1,45 @@
@echo off
setlocal enabledelayedexpansion
chcp 65001
cd /d %~dp0
echo =====================================
echo 选择Python环境:
echo 1 - venv (推荐)
echo 2 - conda
echo =====================================
choice /c 12 /n /m "输入数字(1或2): "
if errorlevel 2 (
echo =====================================
set "CONDA_ENV="
set /p CONDA_ENV="请输入要激活的 conda 环境名称: "
:: 检查输入是否为空
if "!CONDA_ENV!"=="" (
echo 错误:环境名称不能为空
pause
exit /b 1
)
call conda activate !CONDA_ENV!
if errorlevel 1 (
echo 激活 conda 环境失败
pause
exit /b 1
)
echo Conda 环境 "!CONDA_ENV!" 激活成功
python src/plugins/zhishi/knowledge_library.py
) else (
if exist "venv\Scripts\python.exe" (
venv\Scripts\python src/plugins/zhishi/knowledge_library.py
) else (
echo =====================================
echo 错误: venv环境不存在请先创建虚拟环境
pause
exit /b 1
)
)
endlocal
pause