Merge branch 'debug' into feature

This commit is contained in:
tcmofashi
2025-03-06 01:55:47 +08:00
26 changed files with 1692 additions and 1564 deletions

View File

@@ -0,0 +1,165 @@
from typing import Dict, List, Union, Optional, Any
import base64
import os
"""
OneBot v11 Message Segment Builder
This module provides classes for building message segments that conform to the
OneBot v11 standard. These segments can be used to construct complex messages
for sending through bots that implement the OneBot interface.
"""
class Segment:
"""Base class for all message segments."""
def __init__(self, type_: str, data: Dict[str, Any]):
self.type = type_
self.data = data
def to_dict(self) -> Dict[str, Any]:
"""Convert the segment to a dictionary format."""
return {
"type": self.type,
"data": self.data
}
class Text(Segment):
"""Text message segment."""
def __init__(self, text: str):
super().__init__("text", {"text": text})
class Face(Segment):
"""Face/emoji message segment."""
def __init__(self, face_id: int):
super().__init__("face", {"id": str(face_id)})
class Image(Segment):
"""Image message segment."""
@classmethod
def from_url(cls, url: str) -> 'Image':
"""Create an Image segment from a URL."""
return cls(url=url)
@classmethod
def from_path(cls, path: str) -> 'Image':
"""Create an Image segment from a file path."""
with open(path, 'rb') as f:
file_b64 = base64.b64encode(f.read()).decode('utf-8')
return cls(file=f"base64://{file_b64}")
def __init__(self, file: str = None, url: str = None, cache: bool = True):
data = {}
if file:
data["file"] = file
if url:
data["url"] = url
if not cache:
data["cache"] = "0"
super().__init__("image", data)
class At(Segment):
"""@Someone message segment."""
def __init__(self, user_id: Union[int, str]):
data = {"qq": str(user_id)}
super().__init__("at", data)
class Record(Segment):
"""Voice message segment."""
def __init__(self, file: str, magic: bool = False, cache: bool = True):
data = {"file": file}
if magic:
data["magic"] = "1"
if not cache:
data["cache"] = "0"
super().__init__("record", data)
class Video(Segment):
"""Video message segment."""
def __init__(self, file: str):
super().__init__("video", {"file": file})
class Reply(Segment):
"""Reply message segment."""
def __init__(self, message_id: int):
super().__init__("reply", {"id": str(message_id)})
class MessageBuilder:
"""Helper class for building complex messages."""
def __init__(self):
self.segments: List[Segment] = []
def text(self, text: str) -> 'MessageBuilder':
"""Add a text segment."""
self.segments.append(Text(text))
return self
def face(self, face_id: int) -> 'MessageBuilder':
"""Add a face/emoji segment."""
self.segments.append(Face(face_id))
return self
def image(self, file: str = None) -> 'MessageBuilder':
"""Add an image segment."""
self.segments.append(Image(file=file))
return self
def at(self, user_id: Union[int, str]) -> 'MessageBuilder':
"""Add an @someone segment."""
self.segments.append(At(user_id))
return self
def record(self, file: str, magic: bool = False) -> 'MessageBuilder':
"""Add a voice record segment."""
self.segments.append(Record(file, magic))
return self
def video(self, file: str) -> 'MessageBuilder':
"""Add a video segment."""
self.segments.append(Video(file))
return self
def reply(self, message_id: int) -> 'MessageBuilder':
"""Add a reply segment."""
self.segments.append(Reply(message_id))
return self
def build(self) -> List[Dict[str, Any]]:
"""Build the message into a list of segment dictionaries."""
return [segment.to_dict() for segment in self.segments]
'''Convenience functions
def text(content: str) -> Dict[str, Any]:
"""Create a text message segment."""
return Text(content).to_dict()
def image_url(url: str) -> Dict[str, Any]:
"""Create an image message segment from URL."""
return Image.from_url(url).to_dict()
def image_path(path: str) -> Dict[str, Any]:
"""Create an image message segment from file path."""
return Image.from_path(path).to_dict()
def at(user_id: Union[int, str]) -> Dict[str, Any]:
"""Create an @someone message segment."""
return At(user_id).to_dict()'''

View File

@@ -13,6 +13,7 @@ from .willing_manager import willing_manager
from nonebot.rule import to_me
from .bot import chat_bot
from .emoji_manager import emoji_manager
import time
# 获取驱动器
@@ -86,19 +87,27 @@ async def _(bot: Bot):
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
await chat_bot.handle_message(event, bot)
'''
@scheduler.scheduled_job("interval", seconds=300000, id="monitor_relationships")
async def monitor_relationships():
"""每15秒打印一次关系数据"""
relationship_manager.print_all_relationships()
'''
# 添加build_memory定时任务
@scheduler.scheduled_job("interval", seconds=global_config.build_memory_interval, id="build_memory")
async def build_memory_task():
"""每30秒执行一次记忆构建"""
print("\033[1;32m[记忆构建]\033[0m 开始构建记忆...")
await hippocampus.build_memory(chat_size=30)
print("\033[1;32m[记忆构建]\033[0m 记忆构建完成")
print("\033[1;32m[记忆构建]\033[0m -------------------------------------------开始构建记忆-------------------------------------------")
start_time = time.time()
await hippocampus.operation_build_memory(chat_size=20)
end_time = time.time()
print(f"\033[1;32m[记忆构建]\033[0m -------------------------------------------记忆构建完成:耗时: {end_time - start_time:.2f} 秒-------------------------------------------")
@scheduler.scheduled_job("interval", seconds=global_config.forget_memory_interval, id="forget_memory")
async def forget_memory_task():
"""每30秒执行一次记忆构建"""
# print("\033[1;32m[记忆遗忘]\033[0m 开始遗忘记忆...")
# await hippocampus.operation_forget_topic(percentage=0.1)
# print("\033[1;32m[记忆遗忘]\033[0m 记忆遗忘完成")
@scheduler.scheduled_job("interval", seconds=global_config.build_memory_interval + 10, id="merge_memory")
async def merge_memory_task():
"""每30秒执行一次记忆构建"""
# print("\033[1;32m[记忆整合]\033[0m 开始整合")
# await hippocampus.operation_merge_memory(percentage=0.1)
# print("\033[1;32m[记忆整合]\033[0m 记忆整合完成")

View File

@@ -69,11 +69,9 @@ class ChatBot:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time))
identifier=topic_identifier.identify_topic()
if global_config.topic_extract=='llm':
topic=await identifier(message.processed_plain_text)
else:
topic=identifier(message.detailed_plain_text)
topic=await topic_identifier.identify_topic_llm(message.processed_plain_text)
# topic1 = topic_identifier.identify_topic_jieba(message.processed_plain_text)
# topic2 = await topic_identifier.identify_topic_llm(message.processed_plain_text)

View File

@@ -26,7 +26,8 @@ class BotConfig:
talk_frequency_down_groups = set()
ban_user_id = set()
build_memory_interval: int = 60 # 记忆构建间隔(秒)
build_memory_interval: int = 30 # 记忆构建间隔(秒)
forget_memory_interval: int = 300 # 记忆遗忘间隔(秒)
EMOJI_CHECK_INTERVAL: int = 120 # 表情包检查间隔(分钟)
EMOJI_REGISTER_INTERVAL: int = 10 # 表情包注册间隔(分钟)
@@ -155,6 +156,7 @@ class BotConfig:
if "memory" in toml_dict:
memory_config = toml_dict["memory"]
config.build_memory_interval = memory_config.get("build_memory_interval", config.build_memory_interval)
config.forget_memory_interval = memory_config.get("forget_memory_interval", config.forget_memory_interval)
# 群组配置
if "groups" in toml_dict:
@@ -188,6 +190,6 @@ global_config = BotConfig.load_config(config_path=bot_config_path)
if not global_config.enable_advance_output:
# logger.remove()
logger.remove()
pass

View File

@@ -1,251 +0,0 @@
from typing import Union, List, Optional, Deque, Dict
from nonebot.adapters.onebot.v11 import Bot, MessageSegment
import asyncio
import random
import os
from .message import Message, Message_Thinking, MessageSet
from .cq_code import CQCode
from collections import deque
import time
from .storage import MessageStorage
from .config import global_config
from .cq_code import cq_code_tool
if os.name == "nt":
from .message_visualizer import message_visualizer
class SendTemp:
"""单个群组的临时消息队列管理器"""
def __init__(self, group_id: int, max_size: int = 100):
self.group_id = group_id
self.max_size = max_size
self.messages: Deque[Union[Message, Message_Thinking]] = deque(maxlen=max_size)
self.last_send_time = 0
def add(self, message: Message) -> None:
"""按时间顺序添加消息到队列"""
if not self.messages:
self.messages.append(message)
return
# 按时间顺序插入
if message.time >= self.messages[-1].time:
self.messages.append(message)
return
# 使用二分查找找到合适的插入位置
messages_list = list(self.messages)
left, right = 0, len(messages_list)
while left < right:
mid = (left + right) // 2
if messages_list[mid].time < message.time:
left = mid + 1
else:
right = mid
# 重建消息队列,保持时间顺序
new_messages = deque(maxlen=self.max_size)
new_messages.extend(messages_list[:left])
new_messages.append(message)
new_messages.extend(messages_list[left:])
self.messages = new_messages
def get_earliest_message(self) -> Optional[Message]:
"""获取时间最早的消息"""
message = self.messages.popleft() if self.messages else None
return message
def clear(self) -> None:
"""清空队列"""
self.messages.clear()
def get_all(self, group_id: Optional[int] = None) -> List[Union[Message, Message_Thinking]]:
"""获取所有待发送的消息"""
if group_id is None:
return list(self.messages)
return [msg for msg in self.messages if msg.group_id == group_id]
def peek_next(self) -> Optional[Union[Message, Message_Thinking]]:
"""查看下一条要发送的消息(不移除)"""
return self.messages[0] if self.messages else None
def has_messages(self) -> bool:
"""检查是否有待发送的消息"""
return bool(self.messages)
def count(self, group_id: Optional[int] = None) -> int:
"""获取待发送消息数量"""
if group_id is None:
return len(self.messages)
return len([msg for msg in self.messages if msg.group_id == group_id])
def get_last_send_time(self) -> float:
"""获取最后一次发送时间"""
return self.last_send_time
def update_send_time(self):
"""更新最后发送时间"""
self.last_send_time = time.time()
class SendTempContainer:
"""管理所有群组的消息缓存容器"""
def __init__(self):
self.temp_queues: Dict[int, SendTemp] = {}
def get_queue(self, group_id: int) -> SendTemp:
"""获取或创建群组的消息队列"""
if group_id not in self.temp_queues:
self.temp_queues[group_id] = SendTemp(group_id)
return self.temp_queues[group_id]
def add_message(self, message: Message) -> None:
"""添加消息到对应群组的队列"""
queue = self.get_queue(message.group_id)
queue.add(message)
def get_group_messages(self, group_id: int) -> List[Union[Message, Message_Thinking]]:
"""获取指定群组的所有待发送消息"""
queue = self.get_queue(group_id)
return queue.get_all()
def has_messages(self, group_id: int) -> bool:
"""检查指定群组是否有待发送消息"""
queue = self.get_queue(group_id)
return queue.has_messages()
def get_all_groups(self) -> List[int]:
"""获取所有有待发送消息的群组ID"""
return list(self.temp_queues.keys())
def update_thinking_message(self, message_obj: Union[Message, MessageSet]) -> bool:
queue = self.get_queue(message_obj.group_id)
# 使用列表解析找到匹配的消息索引
matching_indices = [
i for i, msg in enumerate(queue.messages)
if msg.message_id == message_obj.message_id
]
if not matching_indices:
return False
index = matching_indices[0] # 获取第一个匹配的索引
# 将消息转换为列表以便修改
messages = list(queue.messages)
# 根据消息类型处理
if isinstance(message_obj, MessageSet):
messages.pop(index)
# 在原位置插入新消息组
for i, single_message in enumerate(message_obj.messages):
messages.insert(index + i, single_message)
# print(f"\033[1;34m[调试]\033[0m 添加消息组中的第{i+1}条消息: {single_message}")
else:
# 直接替换原消息
messages[index] = message_obj
# print(f"\033[1;34m[调试]\033[0m 已更新消息: {message_obj}")
# 重建队列
queue.messages.clear()
for msg in messages:
queue.messages.append(msg)
return True
class MessageSendControl:
"""消息发送控制器"""
def __init__(self):
self.typing_speed = (0.1, 0.3) # 每个字符的打字时间范围(秒)
self.message_interval = (0.5, 1) # 多条消息间的间隔时间范围(秒)
self.max_retry = 3 # 最大重试次数
self.send_temp_container = SendTempContainer()
self._running = True
self._paused = False
self._current_bot = None
self.storage = MessageStorage() # 添加存储实例
try:
message_visualizer.start()
except(NameError):
pass
async def process_group_messages(self, group_id: int):
queue = self.send_temp_container.get_queue(group_id)
if queue.has_messages():
message = queue.peek_next()
# 处理消息的逻辑
if isinstance(message, Message_Thinking):
message.update_thinking_time()
thinking_time = message.thinking_time
if message.interupt:
print(f"\033[1;34m[调试]\033[0m 思考不打算回复,移除")
queue.get_earliest_message()
return
elif thinking_time < 90: # 最少思考2秒
if int(thinking_time) % 15 == 0:
print(f"\033[1;34m[调试]\033[0m 消息正在思考中,已思考{thinking_time:.1f}")
return
else:
print(f"\033[1;34m[调试]\033[0m 思考消息超时,移除")
queue.get_earliest_message() # 移除超时的思考消息
return
elif isinstance(message, Message):
message = queue.get_earliest_message()
if message and message.processed_plain_text:
print(f"- 群组: {group_id} - 内容: {message.processed_plain_text}")
cost_time = round(time.time(), 2) - message.time
if cost_time > 40:
message.processed_plain_text = cq_code_tool.create_reply_cq(message.message_id) + message.processed_plain_text
cur_time = time.time()
await self._current_bot.send_group_msg(
group_id=group_id,
message=str(message.processed_plain_text),
auto_escape=False
)
cost_time = round(time.time(), 2) - cur_time
print(f"\033[1;34m[调试]\033[0m 消息发送时间: {cost_time}")
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time))
print(f"\033[1;32m群 {group_id} 消息, 用户 {global_config.BOT_NICKNAME}, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}")
if message.is_emoji:
message.processed_plain_text = "[表情包]"
await self.storage.store_message(message, None)
else:
await self.storage.store_message(message, None)
queue.update_send_time()
if queue.has_messages():
await asyncio.sleep(
random.uniform(
self.message_interval[0],
self.message_interval[1]
)
)
async def start_processor(self, bot: Bot):
"""启动消息处理器"""
self._current_bot = bot
while self._running:
await asyncio.sleep(1.5)
tasks = []
for group_id in self.send_temp_container.get_all_groups():
tasks.append(self.process_group_messages(group_id))
# 并行处理所有群组的消息
await asyncio.gather(*tasks)
try:
message_visualizer.update_content(self.send_temp_container)
except(NameError):
pass
def set_typing_speed(self, min_speed: float, max_speed: float):
"""设置打字速度范围"""
self.typing_speed = (min_speed, max_speed)
# 创建全局实例
message_sender_control = MessageSendControl()

View File

@@ -1,271 +0,0 @@
from typing import List, Optional, Dict
from .message import Message
import time
from collections import deque
from datetime import datetime, timedelta
import os
import json
import asyncio
class MessageStream:
"""单个群组的消息流容器"""
def __init__(self, group_id: int, max_size: int = 1000):
self.group_id = group_id
self.messages = deque(maxlen=max_size)
self.max_size = max_size
self.last_save_time = time.time()
# 确保日志目录存在
self.log_dir = os.path.join("log", str(self.group_id))
os.makedirs(self.log_dir, exist_ok=True)
# 启动自动保存任务
asyncio.create_task(self._auto_save())
async def _auto_save(self):
"""每30秒自动保存一次消息记录"""
while True:
await asyncio.sleep(30) # 等待30秒
await self.save_to_log()
async def save_to_log(self):
"""将消息保存到日志文件"""
try:
current_time = time.time()
# 只有有新消息时才保存
if not self.messages or self.last_save_time == current_time:
return
# 生成日志文件名 (使用当前日期)
date_str = time.strftime("%Y-%m-%d", time.localtime(current_time))
log_file = os.path.join(self.log_dir, f"chat_{date_str}.log")
# 获取需要保存的新消息
new_messages = [
msg for msg in self.messages
if msg.time > self.last_save_time
]
if not new_messages:
return
# 将消息转换为可序列化的格式
message_logs = []
for msg in new_messages:
message_logs.append({
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.time)),
"user_id": msg.user_id,
"user_nickname": msg.user_nickname,
"user_cardname": msg.user_cardname,
"message_id": msg.message_id,
"raw_message": msg.raw_message,
"processed_text": msg.processed_plain_text
})
# 追加写入日志文件
with open(log_file, "a", encoding="utf-8") as f:
for log in message_logs:
f.write(json.dumps(log, ensure_ascii=False) + "\n")
self.last_save_time = current_time
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 保存群 {self.group_id} 的消息日志失败: {str(e)}")
def add_message(self, message: Message) -> None:
"""按时间顺序添加新消息到队列
使用改进的二分查找算法来保持消息的时间顺序,同时优化内存使用。
Args:
message: Message对象要添加的新消息
"""
# 空队列或消息应该添加到末尾的情况
if (not self.messages or
message.time >= self.messages[-1].time):
self.messages.append(message)
return
# 消息应该添加到开头的情况
if message.time <= self.messages[0].time:
self.messages.appendleft(message)
return
# 使用二分查找在现有队列中找到合适的插入位置
left, right = 0, len(self.messages) - 1
while left <= right:
mid = (left + right) // 2
if self.messages[mid].time < message.time:
left = mid + 1
else:
right = mid - 1
temp = list(self.messages)
temp.insert(left, message)
# 如果超出最大长度,移除多余的消息
if len(temp) > self.max_size:
temp = temp[-self.max_size:]
# 重建队列
self.messages = deque(temp, maxlen=self.max_size)
async def get_recent_messages_from_db(self, count: int = 10) -> List[Message]:
"""从数据库中获取最近的消息记录
Args:
count: 需要获取的消息数量
Returns:
List[Message]: 最近的消息列表
"""
try:
from ...common.database import Database
db = Database.get_instance()
# 从数据库中查询最近的消息
recent_messages = list(db.db.messages.find(
{"group_id": self.group_id},
# {
# "time": 1,
# "user_id": 1,
# "user_nickname": 1,
# # "user_cardname": 1,
# "message_id": 1,
# "raw_message": 1,
# "processed_text": 1
# }
).sort("time", -1).limit(count))
if not recent_messages:
return []
# 转换为 Message 对象
from .message import Message
messages = []
for msg_data in recent_messages:
try:
msg = Message(
time=msg_data["time"],
user_id=msg_data["user_id"],
user_nickname=msg_data.get("user_nickname", ""),
user_cardname=msg_data.get("user_cardname", ""),
message_id=msg_data["message_id"],
raw_message=msg_data["raw_message"],
processed_plain_text=msg_data.get("processed_text", ""),
group_id=self.group_id
)
messages.append(msg)
except KeyError:
print("[WARNING] 数据库中存在无效的消息")
continue
return list(reversed(messages)) # 返回按时间正序的消息
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 从数据库获取群 {self.group_id} 的最近消息记录失败: {str(e)}")
return []
def get_recent_messages(self, count: int = 10) -> List[Message]:
"""获取最近的n条消息从内存队列"""
print(f"\033[1;34m[调试]\033[0m 从内存获取群 {self.group_id} 的最近{count}条消息记录")
return list(self.messages)[-count:]
def get_messages_in_timerange(self,
start_time: Optional[float] = None,
end_time: Optional[float] = None) -> List[Message]:
"""获取时间范围内的消息"""
if start_time is None:
start_time = time.time() - 3600
if end_time is None:
end_time = time.time()
return [
msg for msg in self.messages
if start_time <= msg.time <= end_time
]
def get_user_messages(self, user_id: int, count: int = 10) -> List[Message]:
"""获取特定用户的最近消息"""
user_messages = [msg for msg in self.messages if msg.user_id == user_id]
return user_messages[-count:]
def clear_old_messages(self, hours: int = 24) -> None:
"""清理旧消息"""
cutoff_time = time.time() - (hours * 3600)
self.messages = deque(
[msg for msg in self.messages if msg.time > cutoff_time],
maxlen=self.max_size
)
class MessageStreamContainer:
"""管理所有群组的消息流容器"""
def __init__(self, max_size: int = 1000):
self.streams: Dict[int, MessageStream] = {}
self.max_size = max_size
async def save_all_logs(self):
"""保存所有群组的消息日志"""
for stream in self.streams.values():
await stream.save_to_log()
def add_message(self, message: Message) -> None:
"""添加消息到对应群组的消息流"""
if not message.group_id:
return
if message.group_id not in self.streams:
self.streams[message.group_id] = MessageStream(message.group_id, self.max_size)
self.streams[message.group_id].add_message(message)
def get_stream(self, group_id: int) -> Optional[MessageStream]:
"""获取特定群组的消息流"""
return self.streams.get(group_id)
def get_all_streams(self) -> Dict[int, MessageStream]:
"""获取所有群组的消息流"""
return self.streams
def clear_old_messages(self, hours: int = 24) -> None:
"""清理所有群组的旧消息"""
for stream in self.streams.values():
stream.clear_old_messages(hours)
def get_group_stats(self, group_id: int) -> Dict:
"""获取群组的消息统计信息"""
stream = self.streams.get(group_id)
if not stream:
return {
"total_messages": 0,
"unique_users": 0,
"active_hours": [],
"most_active_user": None
}
messages = stream.messages
user_counts = {}
hour_counts = {}
for msg in messages:
user_counts[msg.user_id] = user_counts.get(msg.user_id, 0) + 1
hour = datetime.fromtimestamp(msg.time).hour
hour_counts[hour] = hour_counts.get(hour, 0) + 1
most_active_user = max(user_counts.items(), key=lambda x: x[1])[0] if user_counts else None
active_hours = sorted(
hour_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5]
return {
"total_messages": len(messages),
"unique_users": len(user_counts),
"active_hours": active_hours,
"most_active_user": most_active_user
}
# 创建全局实例
message_stream_container = MessageStreamContainer()

View File

@@ -1,138 +0,0 @@
import subprocess
import threading
import queue
import os
import time
from typing import Dict
from .message import Message_Thinking
class MessageVisualizer:
def __init__(self):
self.process = None
self.message_queue = queue.Queue()
self.is_running = False
self.content_file = "message_queue_content.txt"
def start(self):
if self.process is None:
# 创建用于显示的批处理文件
with open("message_queue_window.bat", "w", encoding="utf-8") as f:
f.write('@echo off\n')
f.write('chcp 65001\n') # 设置UTF-8编码
f.write('title Message Queue Visualizer\n')
f.write('echo Waiting for message queue updates...\n')
f.write(':loop\n')
f.write('if exist "queue_update.txt" (\n')
f.write(' type "queue_update.txt" > "message_queue_content.txt"\n')
f.write(' del "queue_update.txt"\n')
f.write(' cls\n')
f.write(' type "message_queue_content.txt"\n')
f.write(')\n')
f.write('timeout /t 1 /nobreak >nul\n')
f.write('goto loop\n')
# 清空内容文件
with open(self.content_file, "w", encoding="utf-8") as f:
f.write("")
# 启动新窗口
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.process = subprocess.Popen(
['cmd', '/c', 'start', 'message_queue_window.bat'],
shell=True,
startupinfo=startupinfo
)
self.is_running = True
# 启动处理线程
threading.Thread(target=self._process_messages, daemon=True).start()
def _process_messages(self):
while self.is_running:
try:
# 获取新消息
text = self.message_queue.get(timeout=1)
# 写入更新文件
with open("queue_update.txt", "w", encoding="utf-8") as f:
f.write(text)
except queue.Empty:
continue
except Exception as e:
print(f"处理队列可视化内容时出错: {e}")
def update_content(self, send_temp_container):
"""更新显示内容"""
if not self.is_running:
return
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
display_text = f"Message Queue Status - {current_time}\n"
display_text += "=" * 50 + "\n\n"
# 遍历所有群组的队列
for group_id, queue in send_temp_container.temp_queues.items():
display_text += f"\n{'='*20} 群组: {queue.group_id} {'='*20}\n"
display_text += f"消息队列长度: {len(queue.messages)}\n"
display_text += f"最后发送时间: {time.strftime('%H:%M:%S', time.localtime(queue.last_send_time))}\n"
display_text += "\n消息队列内容:\n"
# 显示队列中的消息
if not queue.messages:
display_text += " [空队列]\n"
else:
for i, msg in enumerate(queue.messages):
msg_time = time.strftime("%H:%M:%S", time.localtime(msg.time))
display_text += f"\n--- 消息 {i+1} ---\n"
if isinstance(msg, Message_Thinking):
display_text += f"类型: \033[1;33m思考中消息\033[0m\n"
display_text += f"时间: {msg_time}\n"
display_text += f"消息ID: {msg.message_id}\n"
display_text += f"群组: {msg.group_id}\n"
display_text += f"用户: {msg.user_nickname}({msg.user_id})\n"
display_text += f"内容: {msg.thinking_text}\n"
display_text += f"思考时间: {int(msg.thinking_time)}\n"
else:
display_text += f"类型: 普通消息\n"
display_text += f"时间: {msg_time}\n"
display_text += f"消息ID: {msg.message_id}\n"
display_text += f"群组: {msg.group_id}\n"
display_text += f"用户: {msg.user_nickname}({msg.user_id})\n"
if hasattr(msg, 'is_emoji') and msg.is_emoji:
display_text += f"内容: [表情包消息]\n"
else:
# 显示原始消息和处理后的消息
display_text += f"原始内容: {msg.raw_message[:50]}...\n"
display_text += f"处理后内容: {msg.processed_plain_text[:50]}...\n"
if msg.reply_message:
display_text += f"回复消息: {str(msg.reply_message)[:50]}...\n"
display_text += f"\n{'-' * 50}\n"
# 添加统计信息
display_text += "\n总体统计:\n"
display_text += f"活跃群组数: {len(send_temp_container.temp_queues)}\n"
total_messages = sum(len(q.messages) for q in send_temp_container.temp_queues.values())
display_text += f"总消息数: {total_messages}\n"
thinking_messages = sum(
sum(1 for msg in q.messages if isinstance(msg, Message_Thinking))
for q in send_temp_container.temp_queues.values()
)
display_text += f"思考中消息数: {thinking_messages}\n"
self.message_queue.put(display_text)
def stop(self):
self.is_running = False
if self.process:
self.process.terminate()
self.process = None
# 清理文件
for file in ["message_queue_window.bat", "message_queue_content.txt", "queue_update.txt"]:
if os.path.exists(file):
os.remove(file)
# 创建全局单例
message_visualizer = MessageVisualizer()

View File

@@ -95,7 +95,11 @@ class ResponseGenerator:
# return None
# 生成回复
content, reasoning_content = await model.generate_response(prompt)
try:
content, reasoning_content = await model.generate_response(prompt)
except Exception as e:
print(f"生成回复时出错: {e}")
return None
# 保存到数据库
self._save_to_db(
@@ -138,9 +142,12 @@ class ResponseGenerator:
内容:{content}
输出:
'''
content, _ = await self.model_v3.generate_response(prompt)
return [content.strip()] if content else ["neutral"]
content=content.strip()
if content in ['happy','angry','sad','surprised','disgusted','fearful','neutral']:
return [content]
else:
return ["neutral"]
except Exception as e:
print(f"获取情感标签时出错: {e}")

View File

@@ -52,12 +52,16 @@ class Message_Sender:
await asyncio.sleep(typing_time)
# 发送消息
await self._current_bot.send_group_msg(
group_id=group_id,
message=message,
auto_escape=auto_escape
)
print(f"\033[1;34m[调试]\033[0m 发送消息{message}成功")
try:
await self._current_bot.send_group_msg(
group_id=group_id,
message=message,
auto_escape=auto_escape
)
print(f"\033[1;34m[调试]\033[0m 发送消息{message}成功")
except Exception as e:
print(f"发生错误 {e}")
print(f"\033[1;34m[调试]\033[0m 发送消息{message}失败")
class MessageContainer:

View File

@@ -36,7 +36,9 @@ class PromptBuilder:
memory_prompt = ''
start_time = time.time() # 记录开始时间
topic = topic_identifier.identify_topic_jieba(message_txt)
# topic = await topic_identifier.identify_topic_llm(message_txt)
topic = topic_identifier.identify_topic_snownlp(message_txt)
# print(f"\033[1;32m[pb主题识别]\033[0m 主题: {topic}")
all_first_layer_items = [] # 存储所有第一层记忆
@@ -64,15 +66,7 @@ class PromptBuilder:
if overlap:
# print(f"\033[1;32m[前额叶]\033[0m 发现主题 '{current_topic}' 和 '{other_topic}' 有共同的第二层记忆: {overlap}")
overlapping_second_layer.update(overlap)
# 合并所有需要的记忆
# if all_first_layer_items:
# print(f"\033[1;32m[前额叶]\033[0m 合并所有需要的记忆1: {all_first_layer_items}")
# if overlapping_second_layer:
# print(f"\033[1;32m[前额叶]\033[0m 合并所有需要的记忆2: {list(overlapping_second_layer)}")
# 使用集合去重
# 从每个来源随机选择2条记忆如果有的话
selected_first_layer = random.sample(all_first_layer_items, min(2, len(all_first_layer_items))) if all_first_layer_items else []
selected_second_layer = random.sample(list(overlapping_second_layer), min(2, len(overlapping_second_layer))) if overlapping_second_layer else []

View File

@@ -15,16 +15,6 @@ class TopicIdentifier:
self.llm_client = LLM_request(model=global_config.llm_topic_extract)
self.select=global_config.topic_extract
def identify_topic(self):
if self.select=='jieba':
return self.identify_topic_jieba
elif self.select=='snownlp':
return self.identify_topic_snownlp
elif self.select=='llm':
return self.identify_topic_llm
else:
return self.identify_topic_snownlp
async def identify_topic_llm(self, text: str) -> Optional[List[str]]:
"""识别消息主题,返回主题列表"""
@@ -48,56 +38,10 @@ class TopicIdentifier:
# 解析主题字符串为列表
topic_list = [t.strip() for t in topic.split(",") if t.strip()]
print(f"\033[1;32m[主题识别]\033[0m 主题: {topic_list}")
return topic_list if topic_list else None
def identify_topic_jieba(self, text: str) -> Optional[str]:
"""使用jieba识别主题"""
words = jieba.lcut(text)
# 去除停用词和标点符号
stop_words = {
'', '', '', '', '', '', '', '', '', '', '', '', '', '',
'因为', '所以', '如果', '虽然', '一个', '', '', '', '', '', '我们', '你们',
'他们', '', '', '', '', '', '', '', '', '', '', '', '', '',
'', '', '', '', '', '', '', '', '', '', '', '', '', '',
'', '', '什么', '怎么', '为什么', '怎样', '如何', '什么样', '这样', '那样', '这么',
'那么', '多少', '', '', '哪里', '哪儿', '什么时候', '何时', '为何', '怎么办',
'怎么样', '这些', '那些', '一些', '一点', '一下', '一直', '一定', '一般', '一样',
'一会儿', '一边', '一起',
# 添加更多量词
'', '', '', '', '', '', '', '', '', '', '', '', '',
'', '', '', '', '', '', '', '', '', '', '', '', '',
'', '', '', '', '', '', '', '', '', '', '', '', '',
# 添加更多介词
'', '按照', '', '', '', '比如', '', '除了', '', '', '对于',
'根据', '关于', '', '', '', '', '经过', '', '', '', '通过',
'', '', '', '为了', '围绕', '', '', '由于', '', '', '沿', '沿着',
'', '依照', '', '', '因为', '', '', '', '', '自从'
}
# 过滤掉停用词和标点符号,只保留名词和动词
filtered_words = []
for word in words:
if word not in stop_words and not word.strip() in {
'', '', '', '', '', '', '', '"', '"', ''', ''',
'', '', '', '', '', '', '', '', '·', '', '~',
'', '+', '=', '-', '/', '\\', '|', '*', '#', '@', '$', '%',
'^', '&', '[', ']', '{', '}', '<', '>', '`', '_', '.', ',',
';', ':', '\'', '"', '(', ')', '?', '!', '±', '×', '÷', '',
'', '', '', '', '', '', '', '', '', '', ''
}:
filtered_words.append(word)
# 统计词频
word_freq = {}
for word in filtered_words:
word_freq[word] = word_freq.get(word, 0) + 1
# 按词频排序取前3个
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
top_words = [word for word, freq in sorted_words[:3]]
return top_words if top_words else None
def identify_topic_snownlp(self, text: str) -> Optional[List[str]]:
"""使用 SnowNLP 进行主题识别
@@ -113,7 +57,7 @@ class TopicIdentifier:
try:
s = SnowNLP(text)
# 提取前3个关键词作为主题
keywords = s.keywords(3)
keywords = s.keywords(5)
return keywords if keywords else None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m SnowNLP 处理失败: {str(e)}")

View File

@@ -75,13 +75,11 @@ def cosine_similarity(v1, v2):
norm2 = np.linalg.norm(v2)
return dot_product / (norm1 * norm2)
def calculate_information_content(text):
def calculate_information_content(text):
"""计算文本的信息量(熵)"""
# 统计字符频率
char_count = Counter(text)
total_chars = len(text)
# 计算熵
entropy = 0
for count in char_count.values():
probability = count / total_chars
@@ -90,27 +88,37 @@ def calculate_information_content(text):
return entropy
def get_cloest_chat_from_db(db, length: int, timestamp: str):
# 从数据库中根据时间戳获取离其最近的聊天记录
"""从数据库中获取最接近指定时间戳的聊天记录,并记录读取次数"""
chat_text = ''
closest_record = db.db.messages.find_one({"time": {"$lte": timestamp}}, sort=[('time', -1)]) # 调试输出
# print(f"距离time最近的消息时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(closest_record['time'])))}")
closest_record = db.db.messages.find_one({"time": {"$lte": timestamp}}, sort=[('time', -1)])
if closest_record:
if closest_record and closest_record.get('memorized', 0) < 4:
closest_time = closest_record['time']
group_id = closest_record['group_id'] # 获取groupid
# 获取该时间戳之后的length条消息且groupid相同
chat_record = list(db.db.messages.find({"time": {"$gt": closest_time}, "group_id": group_id}).sort('time', 1).limit(length))
for record in chat_record:
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(record['time'])))
try:
displayname="[(%s)%s]%s" % (record["user_id"],record["user_nickname"],record["user_cardname"])
except:
displayname=record["user_nickname"] or "用户" + str(record["user_id"])
chat_text += f'[{time_str}] {displayname}: {record["processed_plain_text"]}\n' # 添加发送者和时间信息
chat_records = list(db.db.messages.find(
{"time": {"$gt": closest_time}, "group_id": group_id}
).sort('time', 1).limit(length))
# 更新每条消息的memorized属性
for record in chat_records:
# 检查当前记录的memorized值
current_memorized = record.get('memorized', 0)
if current_memorized > 3:
# print(f"消息已读取3次跳过")
return ''
# 更新memorized值
db.db.messages.update_one(
{"_id": record["_id"]},
{"$set": {"memorized": current_memorized + 1}}
)
chat_text += record["detailed_plain_text"]
return chat_text
return [] # 如果没有找到记录,返回空列表
print(f"消息已读取3次跳过")
return ''
def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
"""从数据库获取群组最近的消息记录

View File

@@ -7,6 +7,7 @@ from ...common.database import Database
import zlib # 用于 CRC32
import base64
from nonebot import get_driver
from loguru import logger
driver = get_driver()
config = driver.config
@@ -213,11 +214,11 @@ def storage_image(image_data: bytes) -> bytes:
print(f"\033[1;31m[错误]\033[0m 保存图片失败: {str(e)}")
return image_data
def compress_base64_image_by_scale(base64_data: str, scale: float = 0.5) -> str:
"""按比例压缩base64格式的图片
def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 1024 * 1024) -> str:
"""压缩base64格式的图片到指定大小
Args:
base64_data: base64编码的图片数据
scale: 压缩比例0-1之间的浮点数
target_size: 目标文件大小字节默认0.8MB
Returns:
str: 压缩后的base64图片数据
"""
@@ -225,34 +226,64 @@ def compress_base64_image_by_scale(base64_data: str, scale: float = 0.5) -> str:
# 将base64转换为字节数据
image_data = base64.b64decode(base64_data)
# 如果已经小于目标大小,直接返回原图
if len(image_data) <= target_size:
return base64_data
# 将字节数据转换为图片对象
img = Image.open(io.BytesIO(image_data))
# 如果是动图,直接返回原图
if getattr(img, 'is_animated', False):
return base64_data
# 获取原始尺寸
original_width, original_height = img.size
# 计算缩放比例
scale = min(1.0, (target_size / len(image_data)) ** 0.5)
# 计算新的尺寸
new_width = int(img.width * scale)
new_height = int(img.height * scale)
new_width = int(original_width * scale)
new_height = int(original_height * scale)
# 缩放图片
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 创建内存缓冲区
output_buffer = io.BytesIO()
# 转换为RGB模式去除透明通道
if img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
# 如果是GIF处理所有帧
if getattr(img, "is_animated", False):
frames = []
for frame_idx in range(img.n_frames):
img.seek(frame_idx)
new_frame = img.copy()
new_frame = new_frame.resize((new_width, new_height), Image.Resampling.LANCZOS)
frames.append(new_frame)
# 保存到缓冲区
frames[0].save(
output_buffer,
format='GIF',
save_all=True,
append_images=frames[1:],
optimize=True,
duration=img.info.get('duration', 100),
loop=img.info.get('loop', 0)
)
else:
# 处理静态图片
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 保存到缓冲区,保持原始格式
if img.format == 'PNG' and img.mode in ('RGBA', 'LA'):
resized_img.save(output_buffer, format='PNG', optimize=True)
else:
resized_img.save(output_buffer, format='JPEG', quality=95, optimize=True)
# 保存压缩后的图片
output = io.BytesIO()
img.save(output, format='JPEG', quality=85, optimize=True)
compressed_data = output.getvalue()
# 获取压缩后的数据并转换为base64
compressed_data = output_buffer.getvalue()
logger.success(f"压缩图片: {original_width}x{original_height} -> {new_width}x{new_height}")
logger.info(f"压缩前大小: {len(image_data)/1024:.1f}KB, 压缩后大小: {len(compressed_data)/1024:.1f}KB")
# 转换回base64
return base64.b64encode(compressed_data).decode('utf-8')
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 压缩图片失败: {str(e)}")
logger.error(f"压缩图片失败: {str(e)}")
import traceback
print(traceback.format_exc())
logger.error(traceback.format_exc())
return base64_data

View File

@@ -9,7 +9,7 @@ class WillingManager:
async def _decay_reply_willing(self):
"""定期衰减回复意愿"""
while True:
await asyncio.sleep(3)
await asyncio.sleep(5)
for group_id in self.group_reply_willing:
self.group_reply_willing[group_id] = max(0, self.group_reply_willing[group_id] * 0.6)
@@ -39,11 +39,11 @@ class WillingManager:
if interested_rate > 0.65:
print(f"兴趣度: {interested_rate}, 当前意愿: {current_willing}")
current_willing += interested_rate-0.5
current_willing += interested_rate-0.6
self.group_reply_willing[group_id] = min(current_willing, 3.0)
reply_probability = max((current_willing - 0.5) * 2, 0)
reply_probability = max((current_willing - 0.55) * 1.9, 0)
if group_id not in config.talk_allowed_groups:
current_willing = 0
reply_probability = 0
@@ -52,8 +52,8 @@ class WillingManager:
reply_probability = reply_probability / 3.5
reply_probability = min(reply_probability, 1)
if reply_probability < 0.1:
reply_probability = 0.1
if reply_probability < 0:
reply_probability = 0
return reply_probability
def change_reply_willing_sent(self, group_id: int):
@@ -65,7 +65,7 @@ class WillingManager:
"""发送消息后提高群组的回复意愿"""
current_willing = self.group_reply_willing.get(group_id, 0)
if current_willing < 1:
self.group_reply_willing[group_id] = min(1, current_willing + 0.3)
self.group_reply_willing[group_id] = min(1, current_willing + 0.2)
async def ensure_started(self):
"""确保衰减任务已启动"""