diff --git a/README.md b/README.md
index 656f536ad..26cd30f61 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@
-
+
👆 点击观看麦麦演示视频 👆
@@ -186,7 +185,7 @@ MaiCore是一个开源项目,我们非常欢迎你的参与。你的贡献,
感谢各位大佬!
-
+
**也感谢每一位给麦麦发展提出宝贵意见与建议的用户,感谢陪伴麦麦走到现在的你们**
diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py
index e7616ec27..2a489338a 100644
--- a/src/individuality/individuality.py
+++ b/src/individuality/individuality.py
@@ -105,3 +105,4 @@ class Individuality:
return self.personality.agreeableness
elif factor == "neuroticism":
return self.personality.neuroticism
+ return None
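This patch repeatedly appends an explicit `return None` to functions whose if/elif chains can otherwise fall off the end (individuality.py above; conversation.py, message.py, utils.py, person_info.py, and remote.py below). The likely motivation is a consistent-return lint rule such as flake8-return/ruff RET503 (implicit return). A minimal sketch of the pattern, with hypothetical names:

```python
# Sketch of the consistent-return pattern applied throughout this patch
# (hypothetical function, not project code): every branch that can return a
# value is matched by an explicit fallback, instead of Python's implicit None.
def get_factor(personality, factor: str):
    if factor == "openness":
        return personality.openness
    elif factor == "neuroticism":
        return personality.neuroticism
    return None  # explicit fallback keeps all return paths visible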
diff --git a/src/main.py b/src/main.py
index f113a732d..05196068b 100644
--- a/src/main.py
+++ b/src/main.py
@@ -118,11 +118,8 @@ class MainSystem:
await interest_manager.start_background_tasks()
logger.success("兴趣管理器后台任务启动成功")
- # 初始化 ReasoningChat 单例 (确保它在需要之前被创建)
- ReasoningChat.get_instance()
- logger.success("ReasoningChat 单例初始化成功")
-
- # 初始化并独立启动 HeartFC_Chat 控制器 (使用 get_instance 获取单例)
+ # 初始化并独立启动 HeartFC_Chat
+        HeartFCController()
-        heartfc_chat_instance = HeartFC_Controller.get_instance()
+        heartfc_chat_instance = HeartFCController.get_instance()
if heartfc_chat_instance:
await heartfc_chat_instance.start()
diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py
index 598468e89..9502b755c 100644
--- a/src/plugins/PFC/conversation.py
+++ b/src/plugins/PFC/conversation.py
@@ -180,6 +180,7 @@ class Conversation:
"time": datetime.datetime.now().strftime("%H:%M:%S"),
}
)
+ return None
elif action == "fetch_knowledge":
self.waiter.wait_accumulated_time = 0
@@ -193,28 +194,35 @@ class Conversation:
if knowledge:
if topic not in self.conversation_info.knowledge_list:
self.conversation_info.knowledge_list.append({"topic": topic, "knowledge": knowledge})
+ return None
else:
self.conversation_info.knowledge_list[topic] += knowledge
+ return None
+ return None
elif action == "rethink_goal":
self.waiter.wait_accumulated_time = 0
self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
+ return None
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info("倾听对方发言...")
await self.waiter.wait_listening(conversation_info)
+ return None
elif action == "end_conversation":
self.should_continue = False
logger.info("决定结束对话...")
+ return None
else: # wait
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
await self.waiter.wait(self.conversation_info)
+ return None
async def _send_timeout_message(self):
"""发送超时结束消息"""
diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py
index eaf829970..12ee7b6d3 100644
--- a/src/plugins/chat/bot.py
+++ b/src/plugins/chat/bot.py
@@ -7,7 +7,7 @@ from ..chat_module.only_process.only_message_process import MessageProcessor
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat
-from ..chat_module.heartFC_chat.heartFC_processor import HeartFC_Processor
+from ..chat_module.heartFC_chat.heartFC_processor import HeartFCProcessor
from ..utils.prompt_builder import Prompt, global_prompt_manager
import traceback
@@ -29,7 +29,7 @@ class ChatBot:
self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例
self.mood_manager.start_mood_update() # 启动情绪更新
self.reasoning_chat = ReasoningChat()
- self.heartFC_processor = HeartFC_Processor() # 新增
+ self.heartFC_processor = HeartFCProcessor() # 新增
# 创建初始化PFC管理器的任务,会在_ensure_started时执行
self.only_process_chat = MessageProcessor()
diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py
index cbea1fd92..87380e7c0 100644
--- a/src/plugins/chat/message.py
+++ b/src/plugins/chat/message.py
@@ -1,14 +1,13 @@
import time
from dataclasses import dataclass
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional
import urllib3
-from .utils_image import image_manager
-
-from ..message.message_base import Seg, UserInfo, BaseMessageInfo, MessageBase
-from .chat_stream import ChatStream
from src.common.logger import get_module_logger
+from .chat_stream import ChatStream
+from .utils_image import image_manager
+from ..message.message_base import Seg, UserInfo, BaseMessageInfo, MessageBase
logger = get_module_logger("chat_message")
@@ -207,7 +206,7 @@ class MessageProcessBase(Message):
# 处理单个消息段
return await self._process_single_segment(segment)
- async def _process_single_segment(self, seg: Seg) -> str:
+    async def _process_single_segment(self, seg: Seg) -> Optional[str]:
"""处理单个消息段
Args:
@@ -233,6 +232,7 @@ class MessageProcessBase(Message):
elif seg.type == "reply":
if self.reply and hasattr(self.reply, "processed_plain_text"):
return f"[回复:{self.reply.processed_plain_text}]"
+ return None
else:
return f"[{seg.type}:{str(seg.data)}]"
except Exception as e:
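`Optional[str]` is used here rather than the equivalent `Union[str, None]`, matching the `Optional[...]` annotations this patch adds elsewhere (utils.py, reasoning_chat.py); the two spellings are interchangeable to type checkers:

```python
from typing import Optional, Union

# Equivalent return annotations; Optional[X] is shorthand for Union[X, None].
def parse_a(raw: str) -> Union[str, None]: ...
def parse_b(raw: str) -> Optional[str]: ...
```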
diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py
index 9c98a16a5..3e4cfa52d 100644
--- a/src/plugins/chat/utils.py
+++ b/src/plugins/chat/utils.py
@@ -2,7 +2,7 @@ import random
import time
import re
from collections import Counter
-from typing import Dict, List
+from typing import Dict, List, Optional
import jieba
import numpy as np
@@ -688,7 +688,7 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) -
return 0, 0
-def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> str:
+def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> Optional[str]:
"""将时间戳转换为人类可读的时间格式
Args:
@@ -716,6 +716,7 @@ def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal"
return f"{int(diff / 86400)}天前:\n"
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":\n"
+ return None
def parse_text_timestamps(text: str, mode: str = "normal") -> str:
diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py
index 55790eb4c..51b1a05da 100644
--- a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py
+++ b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py
@@ -2,6 +2,7 @@ import traceback
from typing import Optional, Dict
import asyncio
from asyncio import Lock
+import threading
from ...moods.moods import MoodManager
from ...chat.emoji_manager import emoji_manager
from .heartFC_generator import ResponseGenerator
@@ -13,7 +14,7 @@ from src.do_tool.tool_use import ToolUser
from .interest import InterestManager
from src.plugins.chat.chat_stream import chat_manager
from .pf_chatting import PFChatting
-import threading # 导入 threading
+
# 定义日志配置
chat_config = LogConfig(
@@ -21,51 +22,73 @@ chat_config = LogConfig(
file_format=CHAT_STYLE_CONFIG["file_format"],
)
-logger = get_module_logger("HeartFC_Controller", config=chat_config)
+logger = get_module_logger("HeartFCController", config=chat_config)
# 检测群聊兴趣的间隔时间
INTEREST_MONITOR_INTERVAL_SECONDS = 1
-class HeartFC_Controller:
+# 线程安全单例:通过 __new__ + threading.Lock 实现(双重检查锁定)
+class HeartFCController:
_instance = None
- _lock = threading.Lock() # 使用 threading.Lock 替代 asyncio.Lock 以兼容 __new__
+ _lock = threading.Lock() # 使用 threading.Lock 保证 __new__ 线程安全
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
+ # Double-checked locking
if cls._instance is None:
+ logger.debug("创建 HeartFCController 单例实例...")
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
+ # 使用 _initialized 标志确保 __init__ 只执行一次
if self._initialized:
return
- with self.__class__._lock: # 使用类锁确保初始化线程安全
- if self._initialized:
+        # __new__ 保证只有一个实例,但多线程下 __init__ 仍可能被并发执行,
+        # 因此再用类锁保护一次性初始化逻辑,防止重复初始化。
+ with self.__class__._lock: # 确保初始化逻辑线程安全
+ if self._initialized: # 再次检查,防止锁等待期间其他线程已完成初始化
return
- logger.info("正在初始化 HeartFC_Controller 单例...")
+
+ logger.info("正在初始化 HeartFCController 单例...")
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
+        # 注意:start_mood_update 可能需要在应用主循环启动后调用
self.mood_manager.start_mood_update()
self.tool_user = ToolUser()
+        # 假设 InterestManager() 返回已正确配置的实例(其可能本身是单例)
self.interest_manager = InterestManager()
self._interest_monitor_task: Optional[asyncio.Task] = None
self.pf_chatting_instances: Dict[str, PFChatting] = {}
- self._pf_chatting_lock = Lock() # 这个可以是 asyncio.Lock,用于异步上下文
- self.emoji_manager = emoji_manager
- self.relationship_manager = relationship_manager
+        # _pf_chatting_lock(asyncio.Lock)在异步上下文中保护 pf_chatting_instances
+        self._pf_chatting_lock = asyncio.Lock()
+ self.emoji_manager = emoji_manager # 假设是全局或已初始化的实例
+ self.relationship_manager = relationship_manager # 假设是全局或已初始化的实例
+ # MessageManager 可能是类本身或单例实例,根据其设计确定
self.MessageManager = MessageManager
self._initialized = True
- logger.info("HeartFC_Controller 单例初始化完成。")
+ logger.info("HeartFCController 单例初始化完成。")
@classmethod
def get_instance(cls):
- """获取 HeartFC_Controller 的单例实例。"""
+ """获取 HeartFCController 的单例实例。"""
+ # 如果实例尚未创建,调用构造函数(这将触发 __new__ 和 __init__)
if cls._instance is None:
- logger.warning("HeartFC_Controller 实例在首次 get_instance 时创建,可能未在 main 中正确初始化。")
- cls() # 调用构造函数创建
+            # 首次调用时创建实例;__new__ 中的锁保证线程安全
+            cls()
+            logger.info("HeartFCController 实例在首次 get_instance 时创建。")
+        elif not cls._instance._initialized:
+            # 实例已创建但未完成初始化(检查实例属性;理论上仅当 __init__ 抛异常时发生)
+ logger.warning("HeartFCController 实例存在但尚未完成初始化。")
return cls._instance
# --- 新增:检查 PFChatting 状态的方法 --- #
@@ -83,9 +106,9 @@ class HeartFC_Controller:
async def start(self):
"""启动异步任务,如回复启动器"""
- logger.debug("HeartFC_Controller 正在启动异步任务...")
+ logger.debug("HeartFCController 正在启动异步任务...")
self._initialize_monitor_task()
- logger.info("HeartFC_Controller 异步任务启动完成")
+ logger.info("HeartFCController 异步任务启动完成")
def _initialize_monitor_task(self):
"""启动后台兴趣监控任务,可以检查兴趣是否足以开启心流对话"""
@@ -105,7 +128,7 @@ class HeartFC_Controller:
async with self._pf_chatting_lock:
if stream_id not in self.pf_chatting_instances:
logger.info(f"为流 {stream_id} 创建新的PFChatting实例")
- # 传递 self (HeartFC_Controller 实例) 进行依赖注入
+ # 传递 self (HeartFCController 实例) 进行依赖注入
instance = PFChatting(stream_id, self)
# 执行异步初始化
if not await instance._initialize():
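The controller now builds its singleton with double-checked locking in `__new__`: a `threading.Lock` guards construction (which can happen off the event loop), while `asyncio.Lock` is reserved for async state such as `pf_chatting_instances`. A standalone sketch of the pattern, with illustrative names:

```python
# Double-checked-locking singleton, as used by HeartFCController above.
# Names are illustrative; this is a sketch of the pattern, not project code.
import threading

class Controller:
    _instance = None
    _lock = threading.Lock()  # threading.Lock: __new__ may run outside the event loop
    _initialized = False

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:          # fast path, no lock taken
            with cls._lock:
                if cls._instance is None:  # re-check under the lock
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if self._initialized:              # instance attribute, set once below
            return
        with self.__class__._lock:
            if self._initialized:          # another thread may have finished first
                return
            self.state = {}                # one-time setup
            self._initialized = True       # set on the instance, not the class

assert Controller() is Controller()
```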
diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py
index 38c687791..f907a8be0 100644
--- a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py
+++ b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py
@@ -26,7 +26,7 @@ logger = get_module_logger("heartFC_processor", config=processor_config)
# INTEREST_INCREASE_THRESHOLD = 0.5
-class HeartFC_Processor:
+class HeartFCProcessor:
def __init__(self):
self.storage = MessageStorage()
self.interest_manager = InterestManager()
@@ -97,21 +97,21 @@ class HeartFC_Processor:
# 处理缓冲器结果 (Bombing logic)
if not buffer_result:
- F_type = "seglist"
+ f_type = "seglist"
if message.message_segment.type != "seglist":
- F_type = message.message_segment.type
+ f_type = message.message_segment.type
else:
if (
isinstance(message.message_segment.data, list)
and all(isinstance(x, Seg) for x in message.message_segment.data)
and len(message.message_segment.data) == 1
):
- F_type = message.message_segment.data[0].type
- if F_type == "text":
+ f_type = message.message_segment.data[0].type
+ if f_type == "text":
logger.debug(f"触发缓冲,消息:{message.processed_plain_text}")
- elif F_type == "image":
+ elif f_type == "image":
logger.debug("触发缓冲,表情包/图片等待中")
- elif F_type == "seglist":
+ elif f_type == "seglist":
logger.debug("触发缓冲,消息列表等待中")
return # 被缓冲器拦截,不生成回复
diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py
index 7e6acd537..e4486a795 100644
--- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py
+++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py
@@ -28,7 +28,7 @@ logger = get_module_logger("PFCLoop", config=interest_log_config) # Logger Name
# Forward declaration for type hinting
if TYPE_CHECKING:
- from .heartFC_controler import HeartFC_Controller
+ from .heartFC_controler import HeartFCController
PLANNER_TOOL_DEFINITION = [
{
@@ -64,7 +64,7 @@ class PFChatting:
只要计时器>0,循环就会继续。
"""
- def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFC_Controller"):
+ def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFCController"):
"""
初始化PFChatting实例。
@@ -377,6 +377,22 @@ class PFChatting:
)
action_taken_this_cycle = False
+ # --- Print Timer Results --- #
+ if cycle_timers: # 先检查cycle_timers是否非空
+ timer_strings = []
+ for name, elapsed in cycle_timers.items():
+ # 直接格式化存储在字典中的浮点数 elapsed
+ formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
+ timer_strings.append(f"{name}: {formatted_time}")
+
+ if timer_strings: # 如果有有效计时器数据才打印
+                    logger.debug(f"{log_prefix} Cycle Timers: {'; '.join(timer_strings)}")
+
+ # --- Timer Decrement --- #
+ cycle_duration = time.monotonic() - loop_cycle_start_time
+
except Exception as e_cycle:
logger.error(f"{log_prefix} 循环周期执行时发生错误: {e_cycle}")
logger.error(traceback.format_exc())
@@ -390,21 +406,6 @@ class PFChatting:
self._processing_lock.release()
logger.trace(f"{log_prefix} 循环释放了处理锁.")
- # --- Print Timer Results --- #
- if cycle_timers: # 先检查cycle_timers是否非空
- timer_strings = []
- for name, elapsed in cycle_timers.items():
- # 直接格式化存储在字典中的浮点数 elapsed
- formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
- timer_strings.append(f"{name}: {formatted_time}")
-
- if timer_strings: # 如果有有效计时器数据才打印
- logger.debug(
- f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}"
- )
-
- # --- Timer Decrement --- #
- cycle_duration = time.monotonic() - loop_cycle_start_time
async with self._timer_lock:
self._loop_timer -= cycle_duration
# Log timer decrement less aggressively
@@ -774,7 +775,7 @@ class PFChatting:
logger.error(traceback.format_exc())
return None
- # --- Methods moved from HeartFC_Controller start ---
+ # --- Methods moved from HeartFCController start ---
async def _create_thinking_message(self, anchor_message: Optional[MessageRecv]) -> Optional[str]:
"""创建思考消息 (尝试锚定到 anchor_message)"""
if not anchor_message or not anchor_message.chat_stream:
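Moving the timer report into the `try` block also moves the `cycle_duration` computation there, so a cycle that raises leaves `cycle_duration` unbound when the decrement after the `except` runs. A minimal exception-safe variant computes the duration in `finally` (illustrative sketch, not the project's loop):

```python
import time

def timed_cycle(body) -> float:
    """Run one loop cycle; the duration is measured even if body() raises."""
    start = time.monotonic()
    cycle_duration = 0.0
    try:
        body()
    finally:
        # Always defined here, whether or not body() raised.
        cycle_duration = time.monotonic() - start
        print(f"cycle took {cycle_duration:.3f}s")
    return cycle_duration
```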
diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
index be1c66280..50613a982 100644
--- a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
+++ b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
@@ -1,25 +1,26 @@
import time
-from random import random
import traceback
-from typing import List
-from ...memory_system.Hippocampus import HippocampusManager
-from ...moods.moods import MoodManager
-from ....config.config import global_config
-from ...chat.emoji_manager import emoji_manager
+from random import random
+from typing import List, Optional
+
+from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
+from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from .reasoning_generator import ResponseGenerator
+from ...chat.chat_stream import chat_manager
+from ...chat.emoji_manager import emoji_manager
from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet
+from ...chat.message_buffer import message_buffer
from ...chat.messagesender import message_manager
-from ...storage.storage import MessageStorage
from ...chat.utils import is_mentioned_bot_in_message
from ...chat.utils_image import image_path_to_base64
-from ...willing.willing_manager import willing_manager
+from ...memory_system.Hippocampus import HippocampusManager
from ...message import UserInfo, Seg
-from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
-from ...chat.chat_stream import chat_manager
+from ...moods.moods import MoodManager
from ...person_info.relationship_manager import relationship_manager
-from ...chat.message_buffer import message_buffer
-from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
+from ...storage.storage import MessageStorage
from ...utils.timer_calculater import Timer
+from ...willing.willing_manager import willing_manager
+from ....config.config import global_config
# 定义日志配置
chat_config = LogConfig(
@@ -61,7 +62,7 @@ class ReasoningChat:
return thinking_id
@staticmethod
- async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> MessageSending:
+ async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> Optional[MessageSending]:
"""发送回复消息"""
container = message_manager.get_container(chat.stream_id)
thinking_message = None
@@ -74,7 +75,7 @@ class ReasoningChat:
if not thinking_message:
logger.warning("未找到对应的思考消息,可能已超时被移除")
- return
+ return None
thinking_start_time = thinking_message.thinking_start_time
message_set = MessageSet(chat, thinking_id)
diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py
index 5ccdec5a5..738e47c4e 100644
--- a/src/plugins/memory_system/Hippocampus.py
+++ b/src/plugins/memory_system/Hippocampus.py
@@ -342,720 +342,6 @@ class Hippocampus:
memories.sort(key=lambda x: x[2], reverse=True)
return memories
- async def get_memory_from_text(
- self,
- text: str,
- max_memory_num: int = 3,
- max_memory_length: int = 2,
- max_depth: int = 3,
- fast_retrieval: bool = False,
- ) -> list:
- """从文本中提取关键词并获取相关记忆。
-
- Args:
- text (str): 输入文本
- max_memory_num (int, optional): 记忆数量限制。默认为3。
- max_memory_length (int, optional): 记忆长度限制。默认为2。
- max_depth (int, optional): 记忆检索深度。默认为2。
- fast_retrieval (bool, optional): 是否使用快速检索。默认为False。
- 如果为True,使用jieba分词和TF-IDF提取关键词,速度更快但可能不够准确。
- 如果为False,使用LLM提取关键词,速度较慢但更准确。
-
- Returns:
- list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity)
- - topic: str, 记忆主题
- - memory_items: list, 该主题下的记忆项列表
- - similarity: float, 与文本的相似度
- """
- if not text:
- return []
-
- if fast_retrieval:
- # 使用jieba分词提取关键词
- words = jieba.cut(text)
- # 过滤掉停用词和单字词
- keywords = [word for word in words if len(word) > 1]
- # 去重
- keywords = list(set(keywords))
- # 限制关键词数量
- keywords = keywords[:5]
- else:
- # 使用LLM提取关键词
- topic_num = min(5, max(1, int(len(text) * 0.1))) # 根据文本长度动态调整关键词数量
- # logger.info(f"提取关键词数量: {topic_num}")
- topics_response = await self.llm_topic_judge.generate_response(self.find_topic_llm(text, topic_num))
-
- # 提取关键词
- keywords = re.findall(r"<([^>]+)>", topics_response[0])
- if not keywords:
- keywords = []
- else:
- keywords = [
- keyword.strip()
- for keyword in ",".join(keywords).replace(",", ",").replace("、", ",").replace(" ", ",").split(",")
- if keyword.strip()
- ]
-
- # logger.info(f"提取的关键词: {', '.join(keywords)}")
-
- # 过滤掉不存在于记忆图中的关键词
- valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
- if not valid_keywords:
- # logger.info("没有找到有效的关键词节点")
- return []
-
- logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
-
- # 从每个关键词获取记忆
- all_memories = []
- activate_map = {} # 存储每个词的累计激活值
-
- # 对每个关键词进行扩散式检索
- for keyword in valid_keywords:
- logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):")
- # 初始化激活值
- activation_values = {keyword: 1.0}
- # 记录已访问的节点
- visited_nodes = {keyword}
- # 待处理的节点队列,每个元素是(节点, 激活值, 当前深度)
- nodes_to_process = [(keyword, 1.0, 0)]
-
- while nodes_to_process:
- current_node, current_activation, current_depth = nodes_to_process.pop(0)
-
- # 如果激活值小于0或超过最大深度,停止扩散
- if current_activation <= 0 or current_depth >= max_depth:
- continue
-
- # 获取当前节点的所有邻居
- neighbors = list(self.memory_graph.G.neighbors(current_node))
-
- for neighbor in neighbors:
- if neighbor in visited_nodes:
- continue
-
- # 获取连接强度
- edge_data = self.memory_graph.G[current_node][neighbor]
- strength = edge_data.get("strength", 1)
-
- # 计算新的激活值
- new_activation = current_activation - (1 / strength)
-
- if new_activation > 0:
- activation_values[neighbor] = new_activation
- visited_nodes.add(neighbor)
- nodes_to_process.append((neighbor, new_activation, current_depth + 1))
- logger.trace(
- f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})"
- ) # noqa: E501
-
- # 更新激活映射
- for node, activation_value in activation_values.items():
- if activation_value > 0:
- if node in activate_map:
- activate_map[node] += activation_value
- else:
- activate_map[node] = activation_value
-
- # 输出激活映射
- # logger.info("激活映射统计:")
- # for node, total_activation in sorted(activate_map.items(), key=lambda x: x[1], reverse=True):
- # logger.info(f"节点 '{node}': 累计激活值 = {total_activation:.2f}")
-
- # 基于激活值平方的独立概率选择
- remember_map = {}
- # logger.info("基于激活值平方的归一化选择:")
-
- # 计算所有激活值的平方和
- total_squared_activation = sum(activation**2 for activation in activate_map.values())
- if total_squared_activation > 0:
- # 计算归一化的激活值
- normalized_activations = {
- node: (activation**2) / total_squared_activation for node, activation in activate_map.items()
- }
-
- # 按归一化激活值排序并选择前max_memory_num个
- sorted_nodes = sorted(normalized_activations.items(), key=lambda x: x[1], reverse=True)[:max_memory_num]
-
- # 将选中的节点添加到remember_map
- for node, normalized_activation in sorted_nodes:
- remember_map[node] = activate_map[node] # 使用原始激活值
- logger.debug(
- f"节点 '{node}' (归一化激活值: {normalized_activation:.2f}, 激活值: {activate_map[node]:.2f})"
- )
- else:
- logger.info("没有有效的激活值")
-
- # 从选中的节点中提取记忆
- all_memories = []
- # logger.info("开始从选中的节点中提取记忆:")
- for node, activation in remember_map.items():
- logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):")
- node_data = self.memory_graph.G.nodes[node]
- memory_items = node_data.get("memory_items", [])
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
-
- if memory_items:
- logger.debug(f"节点包含 {len(memory_items)} 条记忆")
- # 计算每条记忆与输入文本的相似度
- memory_similarities = []
- for memory in memory_items:
- # 计算与输入文本的相似度
- memory_words = set(jieba.cut(memory))
- text_words = set(jieba.cut(text))
- all_words = memory_words | text_words
- v1 = [1 if word in memory_words else 0 for word in all_words]
- v2 = [1 if word in text_words else 0 for word in all_words]
- similarity = cosine_similarity(v1, v2)
- memory_similarities.append((memory, similarity))
-
- # 按相似度排序
- memory_similarities.sort(key=lambda x: x[1], reverse=True)
- # 获取最匹配的记忆
- top_memories = memory_similarities[:max_memory_length]
-
- # 添加到结果中
- for memory, similarity in top_memories:
- all_memories.append((node, [memory], similarity))
- # logger.info(f"选中记忆: {memory} (相似度: {similarity:.2f})")
- else:
- logger.info("节点没有记忆")
-
- # 去重(基于记忆内容)
- logger.debug("开始记忆去重:")
- seen_memories = set()
- unique_memories = []
- for topic, memory_items, activation_value in all_memories:
- memory = memory_items[0] # 因为每个topic只有一条记忆
- if memory not in seen_memories:
- seen_memories.add(memory)
- unique_memories.append((topic, memory_items, activation_value))
- logger.debug(f"保留记忆: {memory} (来自节点: {topic}, 激活值: {activation_value:.2f})")
- else:
- logger.debug(f"跳过重复记忆: {memory} (来自节点: {topic})")
-
- # 转换为(关键词, 记忆)格式
- result = []
- for topic, memory_items, _ in unique_memories:
- memory = memory_items[0] # 因为每个topic只有一条记忆
- result.append((topic, memory))
- logger.info(f"选中记忆: {memory} (来自节点: {topic})")
-
- return result
-
- async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float:
- """从文本中提取关键词并获取相关记忆。
-
- Args:
- text (str): 输入文本
- max_depth (int, optional): 记忆检索深度。默认为2。
- fast_retrieval (bool, optional): 是否使用快速检索。默认为False。
- 如果为True,使用jieba分词和TF-IDF提取关键词,速度更快但可能不够准确。
- 如果为False,使用LLM提取关键词,速度较慢但更准确。
-
- Returns:
- float: 激活节点数与总节点数的比值
- """
- if not text:
- return 0
-
- if fast_retrieval:
- # 使用jieba分词提取关键词
- words = jieba.cut(text)
- # 过滤掉停用词和单字词
- keywords = [word for word in words if len(word) > 1]
- # 去重
- keywords = list(set(keywords))
- # 限制关键词数量
- keywords = keywords[:5]
- else:
- # 使用LLM提取关键词
- topic_num = min(5, max(1, int(len(text) * 0.1))) # 根据文本长度动态调整关键词数量
- # logger.info(f"提取关键词数量: {topic_num}")
- topics_response = await self.llm_topic_judge.generate_response(self.find_topic_llm(text, topic_num))
-
- # 提取关键词
- keywords = re.findall(r"<([^>]+)>", topics_response[0])
- if not keywords:
- keywords = []
- else:
- keywords = [
- keyword.strip()
- for keyword in ",".join(keywords).replace(",", ",").replace("、", ",").replace(" ", ",").split(",")
- if keyword.strip()
- ]
-
- # logger.info(f"提取的关键词: {', '.join(keywords)}")
-
- # 过滤掉不存在于记忆图中的关键词
- valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
- if not valid_keywords:
- # logger.info("没有找到有效的关键词节点")
- return 0
-
- logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
-
- # 从每个关键词获取记忆
- activate_map = {} # 存储每个词的累计激活值
-
- # 对每个关键词进行扩散式检索
- for keyword in valid_keywords:
- logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):")
- # 初始化激活值
- activation_values = {keyword: 1.0}
- # 记录已访问的节点
- visited_nodes = {keyword}
- # 待处理的节点队列,每个元素是(节点, 激活值, 当前深度)
- nodes_to_process = [(keyword, 1.0, 0)]
-
- while nodes_to_process:
- current_node, current_activation, current_depth = nodes_to_process.pop(0)
-
- # 如果激活值小于0或超过最大深度,停止扩散
- if current_activation <= 0 or current_depth >= max_depth:
- continue
-
- # 获取当前节点的所有邻居
- neighbors = list(self.memory_graph.G.neighbors(current_node))
-
- for neighbor in neighbors:
- if neighbor in visited_nodes:
- continue
-
- # 获取连接强度
- edge_data = self.memory_graph.G[current_node][neighbor]
- strength = edge_data.get("strength", 1)
-
- # 计算新的激活值
- new_activation = current_activation - (1 / strength)
-
- if new_activation > 0:
- activation_values[neighbor] = new_activation
- visited_nodes.add(neighbor)
- nodes_to_process.append((neighbor, new_activation, current_depth + 1))
- # logger.debug(
- # f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})") # noqa: E501
-
- # 更新激活映射
- for node, activation_value in activation_values.items():
- if activation_value > 0:
- if node in activate_map:
- activate_map[node] += activation_value
- else:
- activate_map[node] = activation_value
-
- # 输出激活映射
- # logger.info("激活映射统计:")
- # for node, total_activation in sorted(activate_map.items(), key=lambda x: x[1], reverse=True):
- # logger.info(f"节点 '{node}': 累计激活值 = {total_activation:.2f}")
-
- # 计算激活节点数与总节点数的比值
- total_activation = sum(activate_map.values())
- logger.info(f"总激活值: {total_activation:.2f}")
- total_nodes = len(self.memory_graph.G.nodes())
- # activated_nodes = len(activate_map)
- activation_ratio = total_activation / total_nodes if total_nodes > 0 else 0
- activation_ratio = activation_ratio * 60
- logger.info(f"总激活值: {total_activation:.2f}, 总节点数: {total_nodes}, 激活: {activation_ratio}")
-
- return activation_ratio
-
-
-# 负责海马体与其他部分的交互
-class EntorhinalCortex:
- def __init__(self, hippocampus: Hippocampus):
- self.hippocampus = hippocampus
- self.memory_graph = hippocampus.memory_graph
- self.config = hippocampus.config
-
- def get_memory_sample(self):
- """从数据库获取记忆样本"""
- # 硬编码:每条消息最大记忆次数
- max_memorized_time_per_msg = 3
-
- # 创建双峰分布的记忆调度器
- sample_scheduler = MemoryBuildScheduler(
- n_hours1=self.config.memory_build_distribution[0],
- std_hours1=self.config.memory_build_distribution[1],
- weight1=self.config.memory_build_distribution[2],
- n_hours2=self.config.memory_build_distribution[3],
- std_hours2=self.config.memory_build_distribution[4],
- weight2=self.config.memory_build_distribution[5],
- total_samples=self.config.build_memory_sample_num,
- )
-
- timestamps = sample_scheduler.get_timestamp_array()
- logger.info(f"回忆往事: {[time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) for ts in timestamps]}")
- chat_samples = []
- for timestamp in timestamps:
- messages = self.random_get_msg_snippet(
- timestamp, self.config.build_memory_sample_length, max_memorized_time_per_msg
- )
- if messages:
- time_diff = (datetime.datetime.now().timestamp() - timestamp) / 3600
- logger.debug(f"成功抽取 {time_diff:.1f} 小时前的消息样本,共{len(messages)}条")
- chat_samples.append(messages)
- else:
- logger.debug(f"时间戳 {timestamp} 的消息样本抽取失败")
-
- return chat_samples
-
- @staticmethod
- def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list:
- """从数据库中随机获取指定时间戳附近的消息片段"""
- try_count = 0
- while try_count < 3:
- messages = get_closest_chat_from_db(length=chat_size, timestamp=target_timestamp)
- if messages:
- for message in messages:
- if message["memorized_times"] >= max_memorized_time_per_msg:
- messages = None
- break
- if messages:
- for message in messages:
- db.messages.update_one(
- {"_id": message["_id"]}, {"$set": {"memorized_times": message["memorized_times"] + 1}}
- )
- return messages
- try_count += 1
- return None
-
- async def sync_memory_to_db(self):
- """将记忆图同步到数据库"""
- # 获取数据库中所有节点和内存中所有节点
- db_nodes = list(db.graph_data.nodes.find())
- memory_nodes = list(self.memory_graph.G.nodes(data=True))
-
- # 转换数据库节点为字典格式,方便查找
- db_nodes_dict = {node["concept"]: node for node in db_nodes}
-
- # 检查并更新节点
- for concept, data in memory_nodes:
- memory_items = data.get("memory_items", [])
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
-
- # 计算内存中节点的特征值
- memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items)
-
- # 获取时间信息
- created_time = data.get("created_time", datetime.datetime.now().timestamp())
- last_modified = data.get("last_modified", datetime.datetime.now().timestamp())
-
- if concept not in db_nodes_dict:
- # 数据库中缺少的节点,添加
- node_data = {
- "concept": concept,
- "memory_items": memory_items,
- "hash": memory_hash,
- "created_time": created_time,
- "last_modified": last_modified,
- }
- db.graph_data.nodes.insert_one(node_data)
- else:
- # 获取数据库中节点的特征值
- db_node = db_nodes_dict[concept]
- db_hash = db_node.get("hash", None)
-
- # 如果特征值不同,则更新节点
- if db_hash != memory_hash:
- db.graph_data.nodes.update_one(
- {"concept": concept},
- {
- "$set": {
- "memory_items": memory_items,
- "hash": memory_hash,
- "created_time": created_time,
- "last_modified": last_modified,
- }
- },
- )
-
- # 处理边的信息
- db_edges = list(db.graph_data.edges.find())
- memory_edges = list(self.memory_graph.G.edges(data=True))
-
- # 创建边的哈希值字典
- db_edge_dict = {}
- for edge in db_edges:
- edge_hash = self.hippocampus.calculate_edge_hash(edge["source"], edge["target"])
- db_edge_dict[(edge["source"], edge["target"])] = {"hash": edge_hash, "strength": edge.get("strength", 1)}
-
- # 检查并更新边
- for source, target, data in memory_edges:
- edge_hash = self.hippocampus.calculate_edge_hash(source, target)
- edge_key = (source, target)
- strength = data.get("strength", 1)
-
- # 获取边的时间信息
- created_time = data.get("created_time", datetime.datetime.now().timestamp())
- last_modified = data.get("last_modified", datetime.datetime.now().timestamp())
-
- if edge_key not in db_edge_dict:
- # 添加新边
- edge_data = {
- "source": source,
- "target": target,
- "strength": strength,
- "hash": edge_hash,
- "created_time": created_time,
- "last_modified": last_modified,
- }
- db.graph_data.edges.insert_one(edge_data)
- else:
- # 检查边的特征值是否变化
- if db_edge_dict[edge_key]["hash"] != edge_hash:
- db.graph_data.edges.update_one(
- {"source": source, "target": target},
- {
- "$set": {
- "hash": edge_hash,
- "strength": strength,
- "created_time": created_time,
- "last_modified": last_modified,
- }
- },
- )
-
- def sync_memory_from_db(self):
- """从数据库同步数据到内存中的图结构"""
- current_time = datetime.datetime.now().timestamp()
- need_update = False
-
- # 清空当前图
- self.memory_graph.G.clear()
-
- # 从数据库加载所有节点
- nodes = list(db.graph_data.nodes.find())
- for node in nodes:
- concept = node["concept"]
- memory_items = node.get("memory_items", [])
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
-
- # 检查时间字段是否存在
- if "created_time" not in node or "last_modified" not in node:
- need_update = True
- # 更新数据库中的节点
- update_data = {}
- if "created_time" not in node:
- update_data["created_time"] = current_time
- if "last_modified" not in node:
- update_data["last_modified"] = current_time
-
- db.graph_data.nodes.update_one({"concept": concept}, {"$set": update_data})
- logger.info(f"[时间更新] 节点 {concept} 添加缺失的时间字段")
-
- # 获取时间信息(如果不存在则使用当前时间)
- created_time = node.get("created_time", current_time)
- last_modified = node.get("last_modified", current_time)
-
- # 添加节点到图中
- self.memory_graph.G.add_node(
- concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified
- )
-
- # 从数据库加载所有边
- edges = list(db.graph_data.edges.find())
- for edge in edges:
- source = edge["source"]
- target = edge["target"]
- strength = edge.get("strength", 1)
-
- # 检查时间字段是否存在
- if "created_time" not in edge or "last_modified" not in edge:
- need_update = True
- # 更新数据库中的边
- update_data = {}
- if "created_time" not in edge:
- update_data["created_time"] = current_time
- if "last_modified" not in edge:
- update_data["last_modified"] = current_time
-
- db.graph_data.edges.update_one({"source": source, "target": target}, {"$set": update_data})
- logger.info(f"[时间更新] 边 {source} - {target} 添加缺失的时间字段")
-
- # 获取时间信息(如果不存在则使用当前时间)
- created_time = edge.get("created_time", current_time)
- last_modified = edge.get("last_modified", current_time)
-
- # 只有当源节点和目标节点都存在时才添加边
- if source in self.memory_graph.G and target in self.memory_graph.G:
- self.memory_graph.G.add_edge(
- source, target, strength=strength, created_time=created_time, last_modified=last_modified
- )
-
- if need_update:
- logger.success("[数据库] 已为缺失的时间字段进行补充")
-
- async def resync_memory_to_db(self):
- """清空数据库并重新同步所有记忆数据"""
- start_time = time.time()
- logger.info("[数据库] 开始重新同步所有记忆数据...")
-
- # 清空数据库
- clear_start = time.time()
- db.graph_data.nodes.delete_many({})
- db.graph_data.edges.delete_many({})
- clear_end = time.time()
- logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒")
-
- # 获取所有节点和边
- memory_nodes = list(self.memory_graph.G.nodes(data=True))
- memory_edges = list(self.memory_graph.G.edges(data=True))
-
- # 重新写入节点
- node_start = time.time()
- for concept, data in memory_nodes:
- memory_items = data.get("memory_items", [])
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
-
- node_data = {
- "concept": concept,
- "memory_items": memory_items,
- "hash": self.hippocampus.calculate_node_hash(concept, memory_items),
- "created_time": data.get("created_time", datetime.datetime.now().timestamp()),
- "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
- }
- db.graph_data.nodes.insert_one(node_data)
- node_end = time.time()
- logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒")
-
- # 重新写入边
- edge_start = time.time()
- for source, target, data in memory_edges:
- edge_data = {
- "source": source,
- "target": target,
- "strength": data.get("strength", 1),
- "hash": self.hippocampus.calculate_edge_hash(source, target),
- "created_time": data.get("created_time", datetime.datetime.now().timestamp()),
- "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
- }
- db.graph_data.edges.insert_one(edge_data)
- edge_end = time.time()
- logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒")
-
- end_time = time.time()
- logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒")
- logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边")
-
-
-# 海马体
-class Hippocampus:
- def __init__(self):
- self.memory_graph = MemoryGraph()
- self.llm_topic_judge = None
- self.llm_summary_by_topic = None
- self.entorhinal_cortex = None
- self.parahippocampal_gyrus = None
- self.config = None
-
- def initialize(self, global_config):
- self.config = MemoryConfig.from_global_config(global_config)
- # 初始化子组件
- self.entorhinal_cortex = EntorhinalCortex(self)
- self.parahippocampal_gyrus = ParahippocampalGyrus(self)
- # 从数据库加载记忆图
- self.entorhinal_cortex.sync_memory_from_db()
- self.llm_topic_judge = LLMRequest(self.config.llm_topic_judge, request_type="memory")
- self.llm_summary_by_topic = LLMRequest(self.config.llm_summary_by_topic, request_type="memory")
-
- def get_all_node_names(self) -> list:
- """获取记忆图中所有节点的名字列表"""
- return list(self.memory_graph.G.nodes())
-
- @staticmethod
- def calculate_node_hash(concept, memory_items) -> int:
- """计算节点的特征值"""
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
- sorted_items = sorted(memory_items)
- content = f"{concept}:{'|'.join(sorted_items)}"
- return hash(content)
-
- @staticmethod
- def calculate_edge_hash(source, target) -> int:
- """计算边的特征值"""
- nodes = sorted([source, target])
- return hash(f"{nodes[0]}:{nodes[1]}")
-
- @staticmethod
- def find_topic_llm(text, topic_num):
- prompt = (
- f"这是一段文字:{text}。请你从这段话中总结出最多{topic_num}个关键的概念,可以是名词,动词,或者特定人物,帮我列出来,"
- f"将主题用逗号隔开,并加上<>,例如<主题1>,<主题2>......尽可能精简。只需要列举最多{topic_num}个话题就好,不要有序号,不要告诉我其他内容。"
-            f"如果确定找不出主题或者没有明显主题,返回<none>。"
- )
- return prompt
-
- @staticmethod
- def topic_what(text, topic, time_info):
- prompt = (
- f'这是一段文字,{time_info}:{text}。我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成一句自然的话,'
- f"可以包含时间和人物,以及具体的观点。只输出这句话就好"
- )
- return prompt
-
- @staticmethod
- def calculate_topic_num(text, compress_rate):
- """计算文本的话题数量"""
- information_content = calculate_information_content(text)
- topic_by_length = text.count("\n") * compress_rate
- topic_by_information_content = max(1, min(5, int((information_content - 3) * 2)))
- topic_num = int((topic_by_length + topic_by_information_content) / 2)
- logger.debug(
- f"topic_by_length: {topic_by_length}, topic_by_information_content: {topic_by_information_content}, "
- f"topic_num: {topic_num}"
- )
- return topic_num
-
- def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list:
- """从关键词获取相关记忆。
-
- Args:
- keyword (str): 关键词
- max_depth (int, optional): 记忆检索深度,默认为2。1表示只获取直接相关的记忆,2表示获取间接相关的记忆。
-
- Returns:
- list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity)
- - topic: str, 记忆主题
- - memory_items: list, 该主题下的记忆项列表
- - similarity: float, 与关键词的相似度
- """
- if not keyword:
- return []
-
- # 获取所有节点
- all_nodes = list(self.memory_graph.G.nodes())
- memories = []
-
- # 计算关键词的词集合
- keyword_words = set(jieba.cut(keyword))
-
- # 遍历所有节点,计算相似度
- for node in all_nodes:
- node_words = set(jieba.cut(node))
- all_words = keyword_words | node_words
- v1 = [1 if word in keyword_words else 0 for word in all_words]
- v2 = [1 if word in node_words else 0 for word in all_words]
- similarity = cosine_similarity(v1, v2)
-
- # 如果相似度超过阈值,获取该节点的记忆
- if similarity >= 0.3: # 可以调整这个阈值
- node_data = self.memory_graph.G.nodes[node]
- memory_items = node_data.get("memory_items", [])
- if not isinstance(memory_items, list):
- memory_items = [memory_items] if memory_items else []
-
- memories.append((node, memory_items, similarity))
-
- # 按相似度降序排序
- memories.sort(key=lambda x: x[2], reverse=True)
- return memories
-
async def get_memory_from_text(
self,
text: str,
@@ -1543,6 +829,287 @@ class Hippocampus:
return activation_ratio
+# 负责海马体与其他部分的交互
+class EntorhinalCortex:
+ def __init__(self, hippocampus: Hippocampus):
+ self.hippocampus = hippocampus
+ self.memory_graph = hippocampus.memory_graph
+ self.config = hippocampus.config
+
+ def get_memory_sample(self):
+ """从数据库获取记忆样本"""
+ # 硬编码:每条消息最大记忆次数
+ max_memorized_time_per_msg = 3
+
+ # 创建双峰分布的记忆调度器
+ sample_scheduler = MemoryBuildScheduler(
+ n_hours1=self.config.memory_build_distribution[0],
+ std_hours1=self.config.memory_build_distribution[1],
+ weight1=self.config.memory_build_distribution[2],
+ n_hours2=self.config.memory_build_distribution[3],
+ std_hours2=self.config.memory_build_distribution[4],
+ weight2=self.config.memory_build_distribution[5],
+ total_samples=self.config.build_memory_sample_num,
+ )
+
+ timestamps = sample_scheduler.get_timestamp_array()
+ logger.info(f"回忆往事: {[time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) for ts in timestamps]}")
+ chat_samples = []
+ for timestamp in timestamps:
+ messages = self.random_get_msg_snippet(
+ timestamp, self.config.build_memory_sample_length, max_memorized_time_per_msg
+ )
+ if messages:
+ time_diff = (datetime.datetime.now().timestamp() - timestamp) / 3600
+ logger.debug(f"成功抽取 {time_diff:.1f} 小时前的消息样本,共{len(messages)}条")
+ chat_samples.append(messages)
+ else:
+ logger.debug(f"时间戳 {timestamp} 的消息样本抽取失败")
+
+ return chat_samples
+
+ @staticmethod
+ def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list:
+ """从数据库中随机获取指定时间戳附近的消息片段"""
+ try_count = 0
+ while try_count < 3:
+ messages = get_closest_chat_from_db(length=chat_size, timestamp=target_timestamp)
+ if messages:
+ for message in messages:
+ if message["memorized_times"] >= max_memorized_time_per_msg:
+ messages = None
+ break
+ if messages:
+ for message in messages:
+ db.messages.update_one(
+ {"_id": message["_id"]}, {"$set": {"memorized_times": message["memorized_times"] + 1}}
+ )
+ return messages
+ try_count += 1
+ return None
+
+ async def sync_memory_to_db(self):
+ """将记忆图同步到数据库"""
+ # 获取数据库中所有节点和内存中所有节点
+ db_nodes = list(db.graph_data.nodes.find())
+ memory_nodes = list(self.memory_graph.G.nodes(data=True))
+
+ # 转换数据库节点为字典格式,方便查找
+ db_nodes_dict = {node["concept"]: node for node in db_nodes}
+
+ # 检查并更新节点
+ for concept, data in memory_nodes:
+ memory_items = data.get("memory_items", [])
+ if not isinstance(memory_items, list):
+ memory_items = [memory_items] if memory_items else []
+
+ # 计算内存中节点的特征值
+ memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items)
+
+ # 获取时间信息
+ created_time = data.get("created_time", datetime.datetime.now().timestamp())
+ last_modified = data.get("last_modified", datetime.datetime.now().timestamp())
+
+ if concept not in db_nodes_dict:
+ # 数据库中缺少的节点,添加
+ node_data = {
+ "concept": concept,
+ "memory_items": memory_items,
+ "hash": memory_hash,
+ "created_time": created_time,
+ "last_modified": last_modified,
+ }
+ db.graph_data.nodes.insert_one(node_data)
+ else:
+ # 获取数据库中节点的特征值
+ db_node = db_nodes_dict[concept]
+ db_hash = db_node.get("hash", None)
+
+ # 如果特征值不同,则更新节点
+ if db_hash != memory_hash:
+ db.graph_data.nodes.update_one(
+ {"concept": concept},
+ {
+ "$set": {
+ "memory_items": memory_items,
+ "hash": memory_hash,
+ "created_time": created_time,
+ "last_modified": last_modified,
+ }
+ },
+ )
+
+ # 处理边的信息
+ db_edges = list(db.graph_data.edges.find())
+ memory_edges = list(self.memory_graph.G.edges(data=True))
+
+ # 创建边的哈希值字典
+ db_edge_dict = {}
+ for edge in db_edges:
+ edge_hash = self.hippocampus.calculate_edge_hash(edge["source"], edge["target"])
+ db_edge_dict[(edge["source"], edge["target"])] = {"hash": edge_hash, "strength": edge.get("strength", 1)}
+
+ # 检查并更新边
+ for source, target, data in memory_edges:
+ edge_hash = self.hippocampus.calculate_edge_hash(source, target)
+ edge_key = (source, target)
+ strength = data.get("strength", 1)
+
+ # 获取边的时间信息
+ created_time = data.get("created_time", datetime.datetime.now().timestamp())
+ last_modified = data.get("last_modified", datetime.datetime.now().timestamp())
+
+ if edge_key not in db_edge_dict:
+ # 添加新边
+ edge_data = {
+ "source": source,
+ "target": target,
+ "strength": strength,
+ "hash": edge_hash,
+ "created_time": created_time,
+ "last_modified": last_modified,
+ }
+ db.graph_data.edges.insert_one(edge_data)
+ else:
+ # 检查边的特征值是否变化
+ if db_edge_dict[edge_key]["hash"] != edge_hash:
+ db.graph_data.edges.update_one(
+ {"source": source, "target": target},
+ {
+ "$set": {
+ "hash": edge_hash,
+ "strength": strength,
+ "created_time": created_time,
+ "last_modified": last_modified,
+ }
+ },
+ )
+
+ def sync_memory_from_db(self):
+ """从数据库同步数据到内存中的图结构"""
+ current_time = datetime.datetime.now().timestamp()
+ need_update = False
+
+ # 清空当前图
+ self.memory_graph.G.clear()
+
+ # 从数据库加载所有节点
+ nodes = list(db.graph_data.nodes.find())
+ for node in nodes:
+ concept = node["concept"]
+ memory_items = node.get("memory_items", [])
+ if not isinstance(memory_items, list):
+ memory_items = [memory_items] if memory_items else []
+
+ # 检查时间字段是否存在
+ if "created_time" not in node or "last_modified" not in node:
+ need_update = True
+ # 更新数据库中的节点
+ update_data = {}
+ if "created_time" not in node:
+ update_data["created_time"] = current_time
+ if "last_modified" not in node:
+ update_data["last_modified"] = current_time
+
+ db.graph_data.nodes.update_one({"concept": concept}, {"$set": update_data})
+ logger.info(f"[时间更新] 节点 {concept} 添加缺失的时间字段")
+
+ # 获取时间信息(如果不存在则使用当前时间)
+ created_time = node.get("created_time", current_time)
+ last_modified = node.get("last_modified", current_time)
+
+ # 添加节点到图中
+ self.memory_graph.G.add_node(
+ concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified
+ )
+
+ # 从数据库加载所有边
+ edges = list(db.graph_data.edges.find())
+ for edge in edges:
+ source = edge["source"]
+ target = edge["target"]
+ strength = edge.get("strength", 1)
+
+ # 检查时间字段是否存在
+ if "created_time" not in edge or "last_modified" not in edge:
+ need_update = True
+ # 更新数据库中的边
+ update_data = {}
+ if "created_time" not in edge:
+ update_data["created_time"] = current_time
+ if "last_modified" not in edge:
+ update_data["last_modified"] = current_time
+
+ db.graph_data.edges.update_one({"source": source, "target": target}, {"$set": update_data})
+ logger.info(f"[时间更新] 边 {source} - {target} 添加缺失的时间字段")
+
+ # 获取时间信息(如果不存在则使用当前时间)
+ created_time = edge.get("created_time", current_time)
+ last_modified = edge.get("last_modified", current_time)
+
+ # 只有当源节点和目标节点都存在时才添加边
+ if source in self.memory_graph.G and target in self.memory_graph.G:
+ self.memory_graph.G.add_edge(
+ source, target, strength=strength, created_time=created_time, last_modified=last_modified
+ )
+
+ if need_update:
+ logger.success("[数据库] 已为缺失的时间字段进行补充")
+
+ async def resync_memory_to_db(self):
+ """清空数据库并重新同步所有记忆数据"""
+ start_time = time.time()
+ logger.info("[数据库] 开始重新同步所有记忆数据...")
+
+ # 清空数据库
+ clear_start = time.time()
+ db.graph_data.nodes.delete_many({})
+ db.graph_data.edges.delete_many({})
+ clear_end = time.time()
+ logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒")
+
+ # 获取所有节点和边
+ memory_nodes = list(self.memory_graph.G.nodes(data=True))
+ memory_edges = list(self.memory_graph.G.edges(data=True))
+
+ # 重新写入节点
+ node_start = time.time()
+ for concept, data in memory_nodes:
+ memory_items = data.get("memory_items", [])
+ if not isinstance(memory_items, list):
+ memory_items = [memory_items] if memory_items else []
+
+ node_data = {
+ "concept": concept,
+ "memory_items": memory_items,
+ "hash": self.hippocampus.calculate_node_hash(concept, memory_items),
+ "created_time": data.get("created_time", datetime.datetime.now().timestamp()),
+ "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
+ }
+ db.graph_data.nodes.insert_one(node_data)
+ node_end = time.time()
+ logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒")
+
+ # 重新写入边
+ edge_start = time.time()
+ for source, target, data in memory_edges:
+ edge_data = {
+ "source": source,
+ "target": target,
+ "strength": data.get("strength", 1),
+ "hash": self.hippocampus.calculate_edge_hash(source, target),
+ "created_time": data.get("created_time", datetime.datetime.now().timestamp()),
+ "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()),
+ }
+ db.graph_data.edges.insert_one(edge_data)
+ edge_end = time.time()
+ logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒")
+
+ end_time = time.time()
+ logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒")
+ logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边")
+
+
# 负责整合,遗忘,合并记忆
class ParahippocampalGyrus:
def __init__(self, hippocampus: Hippocampus):
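The relocated retrieval methods implement spreading activation over the memory graph: each keyword node starts at activation 1.0, a hop to a neighbor costs `1/strength`, and traversal is pruned at `max_depth` or when activation reaches zero. A condensed sketch of that traversal, assuming a networkx graph as in the module:

```python
from collections import deque
import networkx as nx

def spread_activation(G: nx.Graph, keyword: str, max_depth: int = 3) -> dict:
    """Breadth-first spreading activation, mirroring Hippocampus's retrieval."""
    activation = {keyword: 1.0}
    queue = deque([(keyword, 1.0, 0)])
    while queue:
        node, act, depth = queue.popleft()
        if act <= 0 or depth >= max_depth:
            continue
        for neighbor in G.neighbors(node):
            if neighbor in activation:  # already visited
                continue
            strength = G[node][neighbor].get("strength", 1)
            new_act = act - 1 / strength  # weaker edges cost more activation
            if new_act > 0:
                activation[neighbor] = new_act
                queue.append((neighbor, new_act, depth + 1))
    return activation
```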
diff --git a/src/plugins/memory_system/manually_alter_memory.py b/src/plugins/memory_system/manually_alter_memory.py
index 818742113..1452d3d56 100644
--- a/src/plugins/memory_system/manually_alter_memory.py
+++ b/src/plugins/memory_system/manually_alter_memory.py
@@ -5,7 +5,8 @@ import time
from pathlib import Path
import datetime
from rich.console import Console
-from memory_manual_build import Memory_graph, Hippocampus # 海马体和记忆图
+from Hippocampus import Hippocampus  # 海马体(记忆图通过其 memory_graph 属性访问)
+
from dotenv import load_dotenv
@@ -45,13 +46,13 @@ else:
# 查询节点信息
-def query_mem_info(memory_graph: Memory_graph):
+def query_mem_info(hippocampus: Hippocampus):
while True:
query = input("\n请输入新的查询概念(输入'退出'以结束):")
if query.lower() == "退出":
break
- items_list = memory_graph.get_related_item(query)
+ items_list = hippocampus.memory_graph.get_related_item(query)
if items_list:
have_memory = False
first_layer, second_layer = items_list
@@ -312,14 +313,11 @@ def alter_mem_edge(hippocampus: Hippocampus):
async def main():
start_time = time.time()
- # 创建记忆图
- memory_graph = Memory_graph()
-
# 创建海马体
- hippocampus = Hippocampus(memory_graph)
+ hippocampus = Hippocampus()
# 从数据库同步数据
- hippocampus.sync_memory_from_db()
+ hippocampus.entorhinal_cortex.sync_memory_from_db()
end_time = time.time()
logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m")
@@ -338,7 +336,7 @@ async def main():
query = -1
if query == 0:
- query_mem_info(memory_graph)
+ query_mem_info(hippocampus.memory_graph)
elif query == 1:
add_mem_node(hippocampus)
elif query == 2:
@@ -355,7 +353,7 @@ async def main():
print("已结束操作")
break
- hippocampus.sync_memory_to_db()
+    await hippocampus.entorhinal_cortex.sync_memory_to_db()
if __name__ == "__main__":
diff --git a/src/plugins/message/message_base.py b/src/plugins/message/message_base.py
index 2f1776702..b853d469a 100644
--- a/src/plugins/message/message_base.py
+++ b/src/plugins/message/message_base.py
@@ -12,7 +12,6 @@ class Seg:
- 对于 text 类型,data 是字符串
- 对于 image 类型,data 是 base64 字符串
- 对于 seglist 类型,data 是 Seg 列表
- translated_data: 经过翻译处理的数据(可选)
"""
type: str
diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py
index 8105b330f..b4404988e 100644
--- a/src/plugins/person_info/person_info.py
+++ b/src/plugins/person_info/person_info.py
@@ -169,7 +169,7 @@ class PersonInfoManager:
"""给某个用户取名"""
if not person_id:
logger.debug("取名失败:person_id不能为空")
- return
+ return None
old_name = await self.get_value(person_id, "person_name")
old_reason = await self.get_value(person_id, "name_reason")
diff --git a/src/plugins/remote/remote.py b/src/plugins/remote/remote.py
index 0d119a3ec..5bc4dab14 100644
--- a/src/plugins/remote/remote.py
+++ b/src/plugins/remote/remote.py
@@ -134,3 +134,4 @@ def main():
heartbeat_thread.start()
return heartbeat_thread # 返回线程对象,便于外部控制
+ return None
diff --git a/template/template.env b/template/template.env
index 06e9b07ec..c1a6dd0dc 100644
--- a/template/template.env
+++ b/template/template.env
@@ -29,8 +29,18 @@ CHAT_ANY_WHERE_KEY=
SILICONFLOW_KEY=
# 定义日志相关配置
-SIMPLE_OUTPUT=true # 精简控制台输出格式
-CONSOLE_LOG_LEVEL=INFO # 自定义日志的默认控制台输出日志级别
-FILE_LOG_LEVEL=DEBUG # 自定义日志的默认文件输出日志级别
-DEFAULT_CONSOLE_LOG_LEVEL=SUCCESS # 原生日志的控制台输出日志级别(nonebot就是这一类)
-DEFAULT_FILE_LOG_LEVEL=DEBUG # 原生日志的默认文件输出日志级别(nonebot就是这一类)
\ No newline at end of file
+
+# 精简控制台输出格式
+SIMPLE_OUTPUT=true
+
+# 自定义日志的默认控制台输出日志级别
+CONSOLE_LOG_LEVEL=INFO
+
+# 自定义日志的默认文件输出日志级别
+FILE_LOG_LEVEL=DEBUG
+
+# 原生日志的控制台输出日志级别(nonebot就是这一类)
+DEFAULT_CONSOLE_LOG_LEVEL=SUCCESS
+
+# 原生日志的默认文件输出日志级别(nonebot就是这一类)
+DEFAULT_FILE_LOG_LEVEL=DEBUG
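Beyond readability, putting each comment on its own line avoids a parsing hazard: dotenv implementations differ on whether an inline `#` comment is stripped from an unquoted value, so `CONSOLE_LOG_LEVEL=INFO # comment` may load as `"INFO # comment"` under some loaders. A quick check against whichever loader the project uses (python-dotenv assumed here):

```python
# Sanity check for inline-comment handling; python-dotenv is an assumption,
# other dotenv implementations may keep the comment as part of the value.
from io import StringIO
from dotenv import dotenv_values

cfg = dotenv_values(stream=StringIO("CONSOLE_LOG_LEVEL=INFO # inline comment\n"))
print(repr(cfg["CONSOLE_LOG_LEVEL"]))  # 'INFO' if the loader strips comments
```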