ref:调整文件位置和命名,结构更清晰

This commit is contained in:
SengokuCola
2025-07-06 18:47:08 +08:00
parent 498d72384f
commit 1de15bcc31
23 changed files with 227 additions and 354 deletions

View File

@@ -1,135 +0,0 @@
import time
import os
from typing import Optional, Dict, Any
from src.common.logger import get_logger
import json
logger = get_logger("hfc") # Logger Name Changed
log_dir = "log/log_cycle_debug/"
class CycleDetail:
"""循环信息记录类"""
def __init__(self, cycle_id: int):
self.cycle_id = cycle_id
self.prefix = ""
self.thinking_id = ""
self.start_time = time.time()
self.end_time: Optional[float] = None
self.timers: Dict[str, float] = {}
# 新字段
self.loop_observation_info: Dict[str, Any] = {}
self.loop_processor_info: Dict[str, Any] = {} # 前处理器信息
self.loop_plan_info: Dict[str, Any] = {}
self.loop_action_info: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""将循环信息转换为字典格式"""
def convert_to_serializable(obj, depth=0, seen=None):
if seen is None:
seen = set()
# 防止递归过深
if depth > 5: # 降低递归深度限制
return str(obj)
# 防止循环引用
obj_id = id(obj)
if obj_id in seen:
return str(obj)
seen.add(obj_id)
try:
if hasattr(obj, "to_dict"):
# 对于有to_dict方法的对象直接调用其to_dict方法
return obj.to_dict()
elif isinstance(obj, dict):
# 对于字典,只保留基本类型和可序列化的值
return {
k: convert_to_serializable(v, depth + 1, seen)
for k, v in obj.items()
if isinstance(k, (str, int, float, bool))
}
elif isinstance(obj, (list, tuple)):
# 对于列表和元组,只保留可序列化的元素
return [
convert_to_serializable(item, depth + 1, seen)
for item in obj
if not isinstance(item, (dict, list, tuple))
or isinstance(item, (str, int, float, bool, type(None)))
]
elif isinstance(obj, (str, int, float, bool, type(None))):
return obj
else:
return str(obj)
finally:
seen.remove(obj_id)
return {
"cycle_id": self.cycle_id,
"start_time": self.start_time,
"end_time": self.end_time,
"timers": self.timers,
"thinking_id": self.thinking_id,
"loop_observation_info": convert_to_serializable(self.loop_observation_info),
"loop_processor_info": convert_to_serializable(self.loop_processor_info),
"loop_plan_info": convert_to_serializable(self.loop_plan_info),
"loop_action_info": convert_to_serializable(self.loop_action_info),
}
def complete_cycle(self):
"""完成循环,记录结束时间"""
self.end_time = time.time()
# 处理 prefix只保留中英文字符和基本标点
if not self.prefix:
self.prefix = "group"
else:
# 只保留中文、英文字母、数字和基本标点
allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
self.prefix = (
"".join(char for char in self.prefix if "\u4e00" <= char <= "\u9fff" or char in allowed_chars)
or "group"
)
# current_time_minute = time.strftime("%Y%m%d_%H%M", time.localtime())
# try:
# self.log_cycle_to_file(
# log_dir + self.prefix + f"/{current_time_minute}_cycle_" + str(self.cycle_id) + ".json"
# )
# except Exception as e:
# logger.warning(f"写入文件日志,可能是群名称包含非法字符: {e}")
def log_cycle_to_file(self, file_path: str):
"""将循环信息写入文件"""
# 如果目录不存在,则创建目
dir_name = os.path.dirname(file_path)
# 去除特殊字符,保留字母、数字、下划线、中划线和中文
dir_name = "".join(
char for char in dir_name if char.isalnum() or char in ["_", "-", "/"] or "\u4e00" <= char <= "\u9fff"
)
# print("dir_name:", dir_name)
if dir_name and not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
# 写入文件
file_path = os.path.join(dir_name, os.path.basename(file_path))
# print("file_path:", file_path)
with open(file_path, "a", encoding="utf-8") as f:
f.write(json.dumps(self.to_dict(), ensure_ascii=False) + "\n")
def set_thinking_id(self, thinking_id: str):
"""设置思考消息ID"""
self.thinking_id = thinking_id
def set_loop_info(self, loop_info: Dict[str, Any]):
"""设置循环信息"""
self.loop_observation_info = loop_info["loop_observation_info"]
self.loop_processor_info = loop_info["loop_processor_info"]
self.loop_plan_info = loop_info["loop_plan_info"]
self.loop_action_info = loop_info["loop_action_info"]

View File

@@ -9,17 +9,14 @@ from rich.traceback import install
from src.chat.utils.prompt_builder import global_prompt_manager
from src.common.logger import get_logger
from src.chat.utils.timer_calculator import Timer
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.heartFC_Cycleinfo import CycleDetail
from src.chat.focus_chat.observation.observation import Observation
from src.chat.focus_chat.info.info_base import InfoBase
from src.chat.focus_chat.info_processors.chattinginfo_processor import ChattingInfoProcessor
from src.chat.focus_chat.info_processors.working_memory_processor import WorkingMemoryProcessor
from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservation
from src.chat.heart_flow.observation.working_observation import WorkingMemoryObservation
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.heart_flow.observation.actions_observation import ActionObservation
from src.chat.focus_chat.memory_activator import MemoryActivator
from src.chat.focus_chat.observation.hfcloop_observation import HFCloopObservation
from src.chat.focus_chat.observation.working_observation import WorkingMemoryObservation
from src.chat.focus_chat.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.observation.actions_observation import ActionObservation
from src.chat.focus_chat.info_processors.base_processor import BaseProcessor
from src.chat.planner_actions.planner_focus import ActionPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
@@ -28,6 +25,7 @@ from src.config.config import global_config
from src.chat.focus_chat.hfc_performance_logger import HFCPerformanceLogger
from src.chat.focus_chat.hfc_version_manager import get_hfc_version
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.focus_chat.hfc_utils import CycleDetail
install(extra_lines=3)
@@ -76,8 +74,6 @@ class HeartFChatting:
self.chat_stream = get_chat_manager().get_stream(self.stream_id)
self.log_prefix = f"[{get_chat_manager().get_stream_name(self.stream_id) or self.stream_id}]"
self.memory_activator = MemoryActivator()
self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
# 新增:消息计数器和疲惫阈值

View File

@@ -1,152 +0,0 @@
from src.chat.memory_system.Hippocampus import hippocampus_manager
from src.config.config import global_config
from src.chat.message_receive.message import MessageRecv
from src.chat.message_receive.storage import MessageStorage
from src.chat.heart_flow.heartflow import heartflow
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.utils import is_mentioned_bot_in_message
from src.chat.utils.timer_calculator import Timer
from src.common.logger import get_logger
import re
import math
import traceback
from typing import Optional, Tuple
from src.person_info.relationship_manager import get_relationship_manager
# from ..message_receive.message_buffer import message_buffer
logger = get_logger("chat")
async def _handle_error(error: Exception, context: str, message: Optional[MessageRecv] = None) -> None:
"""统一的错误处理函数
Args:
error: 捕获到的异常
context: 错误发生的上下文描述
message: 可选的消息对象,用于记录相关消息内容
"""
logger.error(f"{context}: {error}")
logger.error(traceback.format_exc())
if message and hasattr(message, "raw_message"):
logger.error(f"相关消息原始内容: {message.raw_message}")
async def _process_relationship(message: MessageRecv) -> None:
"""处理用户关系逻辑
Args:
message: 消息对象,包含用户信息
"""
platform = message.message_info.platform
user_id = message.message_info.user_info.user_id
nickname = message.message_info.user_info.user_nickname
cardname = message.message_info.user_info.user_cardname or nickname
relationship_manager = get_relationship_manager()
is_known = await relationship_manager.is_known_some_one(platform, user_id)
if not is_known:
logger.info(f"首次认识用户: {nickname}")
await relationship_manager.first_knowing_some_one(platform, user_id, nickname, cardname)
async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool]:
"""计算消息的兴趣度
Args:
message: 待处理的消息对象
Returns:
Tuple[float, bool]: (兴趣度, 是否被提及)
"""
is_mentioned, _ = is_mentioned_bot_in_message(message)
interested_rate = 0.0
if global_config.memory.enable_memory:
with Timer("记忆激活"):
interested_rate = await hippocampus_manager.get_activate_from_text(
message.processed_plain_text,
fast_retrieval=True,
)
logger.debug(f"记忆激活率: {interested_rate:.2f}")
text_len = len(message.processed_plain_text)
# 根据文本长度调整兴趣度长度越大兴趣度越高但增长率递减最低0.01最高0.05
# 采用对数函数实现递减增长
base_interest = 0.01 + (0.05 - 0.01) * (math.log10(text_len + 1) / math.log10(1000 + 1))
base_interest = min(max(base_interest, 0.01), 0.05)
interested_rate += base_interest
if is_mentioned:
interest_increase_on_mention = 1
interested_rate += interest_increase_on_mention
return interested_rate, is_mentioned
class HeartFCMessageReceiver:
"""心流处理器,负责处理接收到的消息并计算兴趣度"""
def __init__(self):
"""初始化心流处理器,创建消息存储实例"""
self.storage = MessageStorage()
async def process_message(self, message: MessageRecv) -> None:
"""处理接收到的原始消息数据
主要流程:
1. 消息解析与初始化
2. 消息缓冲处理
3. 过滤检查
4. 兴趣度计算
5. 关系处理
Args:
message_data: 原始消息字符串
"""
try:
# 1. 消息解析与初始化
groupinfo = message.message_info.group_info
userinfo = message.message_info.user_info
messageinfo = message.message_info
chat = await get_chat_manager().get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
await self.storage.store_message(message, chat)
subheartflow = await heartflow.get_or_create_subheartflow(chat.stream_id)
message.update_chat_stream(chat)
# 6. 兴趣度计算与更新
interested_rate, is_mentioned = await _calculate_interest(message)
subheartflow.add_message_to_normal_chat_cache(message, interested_rate, is_mentioned)
# 7. 日志记录
mes_name = chat.group_info.group_name if chat.group_info else "私聊"
# current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
current_talk_frequency = global_config.chat.get_current_talk_frequency(chat.stream_id)
# 如果消息中包含图片标识,则日志展示为图片
picid_match = re.search(r"\[picid:([^\]]+)\]", message.processed_plain_text)
if picid_match:
logger.info(f"[{mes_name}]{userinfo.user_nickname}: [图片] [当前回复频率: {current_talk_frequency}]")
else:
logger.info(
f"[{mes_name}]{userinfo.user_nickname}:{message.processed_plain_text}[当前回复频率: {current_talk_frequency}]"
)
# 8. 关系处理
if global_config.relationship.enable_relationship:
await _process_relationship(message)
except Exception as e:
await _handle_error(e, "消息处理失败", message)

View File

@@ -5,9 +5,143 @@ from src.chat.message_receive.chat_stream import ChatStream
from src.chat.message_receive.message import UserInfo
from src.common.logger import get_logger
import json
import time
import os
from typing import Optional, Dict, Any
from src.common.logger import get_logger
import json
logger = get_logger(__name__)
log_dir = "log/log_cycle_debug/"
class CycleDetail:
"""循环信息记录类"""
def __init__(self, cycle_id: int):
self.cycle_id = cycle_id
self.prefix = ""
self.thinking_id = ""
self.start_time = time.time()
self.end_time: Optional[float] = None
self.timers: Dict[str, float] = {}
# 新字段
self.loop_observation_info: Dict[str, Any] = {}
self.loop_processor_info: Dict[str, Any] = {} # 前处理器信息
self.loop_plan_info: Dict[str, Any] = {}
self.loop_action_info: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""将循环信息转换为字典格式"""
def convert_to_serializable(obj, depth=0, seen=None):
if seen is None:
seen = set()
# 防止递归过深
if depth > 5: # 降低递归深度限制
return str(obj)
# 防止循环引用
obj_id = id(obj)
if obj_id in seen:
return str(obj)
seen.add(obj_id)
try:
if hasattr(obj, "to_dict"):
# 对于有to_dict方法的对象直接调用其to_dict方法
return obj.to_dict()
elif isinstance(obj, dict):
# 对于字典,只保留基本类型和可序列化的值
return {
k: convert_to_serializable(v, depth + 1, seen)
for k, v in obj.items()
if isinstance(k, (str, int, float, bool))
}
elif isinstance(obj, (list, tuple)):
# 对于列表和元组,只保留可序列化的元素
return [
convert_to_serializable(item, depth + 1, seen)
for item in obj
if not isinstance(item, (dict, list, tuple))
or isinstance(item, (str, int, float, bool, type(None)))
]
elif isinstance(obj, (str, int, float, bool, type(None))):
return obj
else:
return str(obj)
finally:
seen.remove(obj_id)
return {
"cycle_id": self.cycle_id,
"start_time": self.start_time,
"end_time": self.end_time,
"timers": self.timers,
"thinking_id": self.thinking_id,
"loop_observation_info": convert_to_serializable(self.loop_observation_info),
"loop_processor_info": convert_to_serializable(self.loop_processor_info),
"loop_plan_info": convert_to_serializable(self.loop_plan_info),
"loop_action_info": convert_to_serializable(self.loop_action_info),
}
def complete_cycle(self):
"""完成循环,记录结束时间"""
self.end_time = time.time()
# 处理 prefix只保留中英文字符和基本标点
if not self.prefix:
self.prefix = "group"
else:
# 只保留中文、英文字母、数字和基本标点
allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
self.prefix = (
"".join(char for char in self.prefix if "\u4e00" <= char <= "\u9fff" or char in allowed_chars)
or "group"
)
# current_time_minute = time.strftime("%Y%m%d_%H%M", time.localtime())
# try:
# self.log_cycle_to_file(
# log_dir + self.prefix + f"/{current_time_minute}_cycle_" + str(self.cycle_id) + ".json"
# )
# except Exception as e:
# logger.warning(f"写入文件日志,可能是群名称包含非法字符: {e}")
def log_cycle_to_file(self, file_path: str):
"""将循环信息写入文件"""
# 如果目录不存在,则创建目
dir_name = os.path.dirname(file_path)
# 去除特殊字符,保留字母、数字、下划线、中划线和中文
dir_name = "".join(
char for char in dir_name if char.isalnum() or char in ["_", "-", "/"] or "\u4e00" <= char <= "\u9fff"
)
# print("dir_name:", dir_name)
if dir_name and not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
# 写入文件
file_path = os.path.join(dir_name, os.path.basename(file_path))
# print("file_path:", file_path)
with open(file_path, "a", encoding="utf-8") as f:
f.write(json.dumps(self.to_dict(), ensure_ascii=False) + "\n")
def set_thinking_id(self, thinking_id: str):
"""设置思考消息ID"""
self.thinking_id = thinking_id
def set_loop_info(self, loop_info: Dict[str, Any]):
"""设置循环信息"""
self.loop_observation_info = loop_info["loop_observation_info"]
self.loop_processor_info = loop_info["loop_processor_info"]
self.loop_plan_info = loop_info["loop_plan_info"]
self.loop_action_info = loop_info["loop_action_info"]
async def create_empty_anchor_message(
platform: str, group_info: dict, chat_stream: ChatStream

View File

@@ -1,97 +0,0 @@
from typing import Dict, Optional
from dataclasses import dataclass
from .info_base import InfoBase
@dataclass
class ChatInfo(InfoBase):
"""聊天信息类
用于记录和管理聊天相关的信息包括聊天ID、名称和类型等。
继承自 InfoBase 类,使用字典存储具体数据。
Attributes:
type (str): 信息类型标识符,固定为 "chat"
Data Fields:
chat_id (str): 聊天的唯一标识符
chat_name (str): 聊天的名称
chat_type (str): 聊天的类型
"""
type: str = "chat"
def set_chat_id(self, chat_id: str) -> None:
"""设置聊天ID
Args:
chat_id (str): 聊天的唯一标识符
"""
self.data["chat_id"] = chat_id
def set_chat_name(self, chat_name: str) -> None:
"""设置聊天名称
Args:
chat_name (str): 聊天的名称
"""
self.data["chat_name"] = chat_name
def set_chat_type(self, chat_type: str) -> None:
"""设置聊天类型
Args:
chat_type (str): 聊天的类型
"""
self.data["chat_type"] = chat_type
def get_chat_id(self) -> Optional[str]:
"""获取聊天ID
Returns:
Optional[str]: 聊天的唯一标识符,如果未设置则返回 None
"""
return self.get_info("chat_id")
def get_chat_name(self) -> Optional[str]:
"""获取聊天名称
Returns:
Optional[str]: 聊天的名称,如果未设置则返回 None
"""
return self.get_info("chat_name")
def get_chat_type(self) -> Optional[str]:
"""获取聊天类型
Returns:
Optional[str]: 聊天的类型,如果未设置则返回 None
"""
return self.get_info("chat_type")
def get_type(self) -> str:
"""获取信息类型
Returns:
str: 当前信息对象的类型标识符
"""
return self.type
def get_data(self) -> Dict[str, str]:
"""获取所有信息数据
Returns:
Dict[str, str]: 包含所有信息数据的字典
"""
return self.data
def get_info(self, key: str) -> Optional[str]:
"""获取特定属性的信息
Args:
key: 要获取的属性键名
Returns:
Optional[str]: 属性值,如果键不存在则返回 None
"""
return self.data.get(key)

View File

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import List, Any
from src.chat.focus_chat.info.info_base import InfoBase
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.observation.observation import Observation
from src.common.logger import get_logger
logger = get_logger("base_processor")

View File

@@ -1,10 +1,10 @@
from typing import List, Any
from src.chat.focus_chat.info.obs_info import ObsInfo
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.observation.observation import Observation
from src.chat.focus_chat.info.info_base import InfoBase
from .base_processor import BaseProcessor
from src.common.logger import get_logger
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.observation.chatting_observation import ChattingObservation
from datetime import datetime
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config

View File

@@ -1,5 +1,5 @@
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.observation.observation import Observation
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
import time
@@ -9,7 +9,7 @@ from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.message_receive.chat_stream import get_chat_manager
from .base_processor import BaseProcessor
from typing import List
from src.chat.heart_flow.observation.working_observation import WorkingMemoryObservation
from src.chat.focus_chat.observation.working_observation import WorkingMemoryObservation
from src.chat.focus_chat.working_memory.working_memory import WorkingMemory
from src.chat.focus_chat.info.info_base import InfoBase
from json_repair import repair_json

View File

@@ -1,154 +0,0 @@
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config
from src.common.logger import get_logger
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from datetime import datetime
from src.chat.memory_system.Hippocampus import hippocampus_manager
from typing import List, Dict
import difflib
import json
from json_repair import repair_json
logger = get_logger("memory_activator")
def get_keywords_from_json(json_str):
"""
从JSON字符串中提取关键词列表
Args:
json_str: JSON格式的字符串
Returns:
List[str]: 关键词列表
"""
try:
# 使用repair_json修复JSON格式
fixed_json = repair_json(json_str)
# 如果repair_json返回的是字符串需要解析为Python对象
if isinstance(fixed_json, str):
result = json.loads(fixed_json)
else:
# 如果repair_json直接返回了字典对象直接使用
result = fixed_json
# 提取关键词
keywords = result.get("keywords", [])
return keywords
except Exception as e:
logger.error(f"解析关键词JSON失败: {e}")
return []
def init_prompt():
# --- Group Chat Prompt ---
memory_activator_prompt = """
你是一个记忆分析器,你需要根据以下信息来进行回忆
以下是一段聊天记录,请根据这些信息,总结出几个关键词作为记忆回忆的触发词
聊天记录:
{obs_info_text}
你想要回复的消息:
{target_message}
历史关键词(请避免重复提取这些关键词):
{cached_keywords}
请输出一个json格式包含以下字段
{{
"keywords": ["关键词1", "关键词2", "关键词3",......]
}}
不要输出其他多余内容只输出json格式就好
"""
Prompt(memory_activator_prompt, "memory_activator_prompt")
class MemoryActivator:
def __init__(self):
# TODO: API-Adapter修改标记
self.summary_model = LLMRequest(
model=global_config.model.memory_summary,
temperature=0.7,
request_type="memory_activator",
)
self.running_memory = []
self.cached_keywords = set() # 用于缓存历史关键词
async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> List[Dict]:
"""
激活记忆
Args:
observations: 现有的进行观察后的 观察列表
Returns:
List[Dict]: 激活的记忆列表
"""
# 如果记忆系统被禁用,直接返回空列表
if not global_config.memory.enable_memory:
return []
# 将缓存的关键词转换为字符串用于prompt
cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词"
prompt = await global_prompt_manager.format_prompt(
"memory_activator_prompt",
obs_info_text=chat_history_prompt,
target_message=target_message,
cached_keywords=cached_keywords_str,
)
# logger.debug(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await self.summary_model.generate_response_async(prompt)
keywords = list(get_keywords_from_json(response))
# 更新关键词缓存
if keywords:
# 限制缓存大小最多保留10个关键词
if len(self.cached_keywords) > 10:
# 转换为列表,移除最早的关键词
cached_list = list(self.cached_keywords)
self.cached_keywords = set(cached_list[-8:])
# 添加新的关键词到缓存
self.cached_keywords.update(keywords)
# 调用记忆系统获取相关记忆
related_memory = await hippocampus_manager.get_memory_from_topic(
valid_keywords=keywords, max_memory_num=3, max_memory_length=2, max_depth=3
)
logger.info(f"当前记忆关键词: {self.cached_keywords} 。获取到的记忆: {related_memory}")
# 激活时所有已有记忆的duration+1达到3则移除
for m in self.running_memory[:]:
m["duration"] = m.get("duration", 1) + 1
self.running_memory = [m for m in self.running_memory if m["duration"] < 3]
if related_memory:
for topic, memory in related_memory:
# 检查是否已存在相同topic或相似内容相似度>=0.7)的记忆
exists = any(
m["topic"] == topic or difflib.SequenceMatcher(None, m["content"], memory).ratio() >= 0.7
for m in self.running_memory
)
if not exists:
self.running_memory.append(
{"topic": topic, "content": memory, "timestamp": datetime.now().isoformat(), "duration": 1}
)
logger.debug(f"添加新记忆: {topic} - {memory}")
# 限制同时加载的记忆条数最多保留最后3条
if len(self.running_memory) > 3:
self.running_memory = self.running_memory[-3:]
return self.running_memory
init_prompt()

View File

@@ -0,0 +1,46 @@
# 定义了来自外部世界的信息
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
from datetime import datetime
from src.common.logger import get_logger
from src.chat.planner_actions.action_manager import ActionManager
logger = get_logger("observation")
# 特殊的观察,专门用于观察动作
# 所有观察的基类
class ActionObservation:
def __init__(self, observe_id):
self.observe_info = ""
self.observe_id = observe_id
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
self.action_manager: ActionManager = None
self.all_actions = {}
self.all_using_actions = {}
def get_observe_info(self):
return self.observe_info
def set_action_manager(self, action_manager: ActionManager):
self.action_manager = action_manager
self.all_actions = self.action_manager.get_registered_actions()
async def observe(self):
action_info_block = ""
self.all_using_actions = self.action_manager.get_using_actions()
for action_name, action_info in self.all_using_actions.items():
action_info_block += f"\n{action_name}: {action_info.get('description', '')}"
action_info_block += "\n注意,除了上面动作选项之外,你在群聊里不能做其他任何事情,这是你能力的边界\n"
self.observe_info = action_info_block
def to_dict(self) -> dict:
"""将观察对象转换为可序列化的字典"""
return {
"observe_info": self.observe_info,
"observe_id": self.observe_id,
"last_observe_time": self.last_observe_time,
"all_actions": self.all_actions,
"all_using_actions": self.all_using_actions,
}

View File

@@ -0,0 +1,183 @@
from datetime import datetime
from src.config.config import global_config
from src.chat.utils.chat_message_builder import (
get_raw_msg_before_timestamp_with_chat,
build_readable_messages,
get_raw_msg_by_timestamp_with_chat,
num_new_messages_since,
get_person_id_list,
)
from src.chat.utils.prompt_builder import global_prompt_manager, Prompt
from src.chat.focus_chat.observation.observation import Observation
from src.common.logger import get_logger
from src.chat.utils.utils import get_chat_type_and_target_info
logger = get_logger("observation")
# 定义提示模板
Prompt(
"""这是{chat_type_description},请总结以下聊天记录的主题:
{chat_logs}
请概括这段聊天记录的主题和主要内容
主题简短的概括包括时间人物和事件不要超过20个字
内容具体的信息内容包括人物、事件和信息不要超过200个字不要分点。
请用json格式返回格式如下
{{
"theme": "主题,例如 2025-06-14 10:00:00 群聊 麦麦 和 网友 讨论了 游戏 的话题",
"content": "内容,可以是对聊天记录的概括,也可以是聊天记录的详细内容"
}}
""",
"chat_summary_prompt",
)
class ChattingObservation(Observation):
def __init__(self, chat_id):
super().__init__(chat_id)
self.chat_id = chat_id
self.platform = "qq"
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id)
self.talking_message = []
self.talking_message_str = ""
self.talking_message_str_truncate = ""
self.talking_message_str_short = ""
self.talking_message_str_truncate_short = ""
self.name = global_config.bot.nickname
self.nick_name = global_config.bot.alias_names
self.max_now_obs_len = global_config.chat.max_context_size
self.overlap_len = global_config.focus_chat.compressed_length
self.person_list = []
self.compressor_prompt = ""
self.oldest_messages = []
self.oldest_messages_str = ""
self.last_observe_time = datetime.now().timestamp()
initial_messages = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 10)
initial_messages_short = get_raw_msg_before_timestamp_with_chat(self.chat_id, self.last_observe_time, 5)
self.last_observe_time = initial_messages[-1]["time"] if initial_messages else self.last_observe_time
self.talking_message = initial_messages
self.talking_message_short = initial_messages_short
self.talking_message_str = build_readable_messages(self.talking_message, show_actions=True)
self.talking_message_str_truncate = build_readable_messages(
self.talking_message, show_actions=True, truncate=True
)
self.talking_message_str_short = build_readable_messages(self.talking_message_short, show_actions=True)
self.talking_message_str_truncate_short = build_readable_messages(
self.talking_message_short, show_actions=True, truncate=True
)
def to_dict(self) -> dict:
"""将观察对象转换为可序列化的字典"""
return {
"chat_id": self.chat_id,
"platform": self.platform,
"is_group_chat": self.is_group_chat,
"chat_target_info": self.chat_target_info,
"talking_message_str": self.talking_message_str,
"talking_message_str_truncate": self.talking_message_str_truncate,
"talking_message_str_short": self.talking_message_str_short,
"talking_message_str_truncate_short": self.talking_message_str_truncate_short,
"name": self.name,
"nick_name": self.nick_name,
"last_observe_time": self.last_observe_time,
}
def get_observe_info(self, ids=None):
return self.talking_message_str
async def observe(self):
# 自上一次观察的新消息
new_messages_list = get_raw_msg_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=self.last_observe_time,
timestamp_end=datetime.now().timestamp(),
limit=self.max_now_obs_len,
limit_mode="latest",
)
# print(f"new_messages_list: {new_messages_list}")
last_obs_time_mark = self.last_observe_time
if new_messages_list:
self.last_observe_time = new_messages_list[-1]["time"]
self.talking_message.extend(new_messages_list)
if len(self.talking_message) > self.max_now_obs_len:
# 计算需要移除的消息数量,保留最新的 max_now_obs_len 条
messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len
oldest_messages = self.talking_message[:messages_to_remove_count]
self.talking_message = self.talking_message[messages_to_remove_count:]
# 构建压缩提示
oldest_messages_str = build_readable_messages(
messages=oldest_messages, timestamp_mode="normal_no_YMD", read_mark=0, show_actions=True
)
# 根据聊天类型选择提示模板
prompt_template_name = "chat_summary_prompt"
if self.is_group_chat:
chat_type_description = "qq群聊的聊天记录"
else:
chat_target_name = "对方"
if self.chat_target_info:
chat_target_name = (
self.chat_target_info.get("person_name")
or self.chat_target_info.get("user_nickname")
or chat_target_name
)
chat_type_description = f"你和{chat_target_name}的私聊记录"
prompt = await global_prompt_manager.format_prompt(
prompt_template_name,
chat_type_description=chat_type_description,
chat_logs=oldest_messages_str,
)
self.compressor_prompt = prompt
# 构建当前消息
self.talking_message_str = build_readable_messages(
messages=self.talking_message,
timestamp_mode="lite",
read_mark=last_obs_time_mark,
show_actions=True,
)
self.talking_message_str_truncate = build_readable_messages(
messages=self.talking_message,
timestamp_mode="normal_no_YMD",
read_mark=last_obs_time_mark,
truncate=True,
show_actions=True,
)
# 构建简短版本 - 使用最新一半的消息
half_count = len(self.talking_message) // 2
recent_messages = self.talking_message[-half_count:] if half_count > 0 else self.talking_message
self.talking_message_str_short = build_readable_messages(
messages=recent_messages,
timestamp_mode="lite",
read_mark=last_obs_time_mark,
show_actions=True,
)
self.talking_message_str_truncate_short = build_readable_messages(
messages=recent_messages,
timestamp_mode="normal_no_YMD",
read_mark=last_obs_time_mark,
truncate=True,
show_actions=True,
)
self.person_list = await get_person_id_list(self.talking_message)
# logger.debug(
# f"Chat {self.chat_id} - 现在聊天内容:{self.talking_message_str}"
# )
async def has_new_messages_since(self, timestamp: float) -> bool:
"""检查指定时间戳之后是否有新消息"""
count = num_new_messages_since(chat_id=self.chat_id, timestamp_start=timestamp)
return count > 0

View File

@@ -0,0 +1,128 @@
# 定义了来自外部世界的信息
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
from datetime import datetime
from src.common.logger import get_logger
from src.chat.focus_chat.hfc_utils import CycleDetail
from typing import List
# Import the new utility function
logger = get_logger("observation")
# 所有观察的基类
class HFCloopObservation:
def __init__(self, observe_id):
self.observe_info = ""
self.observe_id = observe_id
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
self.history_loop: List[CycleDetail] = []
def get_observe_info(self):
return self.observe_info
def add_loop_info(self, loop_info: CycleDetail):
self.history_loop.append(loop_info)
async def observe(self):
recent_active_cycles: List[CycleDetail] = []
for cycle in reversed(self.history_loop):
# 只关心实际执行了动作的循环
# action_taken = cycle.loop_action_info["action_taken"]
# if action_taken:
recent_active_cycles.append(cycle)
if len(recent_active_cycles) == 5:
break
cycle_info_block = ""
action_detailed_str = ""
consecutive_text_replies = 0
responses_for_prompt = []
cycle_last_reason = ""
# 检查这最近的活动循环中有多少是连续的文本回复 (从最近的开始看)
for cycle in recent_active_cycles:
action_result = cycle.loop_plan_info.get("action_result", {})
action_type = action_result.get("action_type", "unknown")
action_reasoning = action_result.get("reasoning", "未提供理由")
is_taken = cycle.loop_action_info.get("action_taken", False)
action_taken_time = cycle.loop_action_info.get("taken_time", 0)
action_taken_time_str = (
datetime.fromtimestamp(action_taken_time).strftime("%H:%M:%S") if action_taken_time > 0 else "未知时间"
)
# print(action_type)
# print(action_reasoning)
# print(is_taken)
# print(action_taken_time_str)
# print("--------------------------------")
if action_reasoning != cycle_last_reason:
cycle_last_reason = action_reasoning
action_reasoning_str = f"你选择这个action的原因是:{action_reasoning}"
else:
action_reasoning_str = ""
if action_type == "reply":
consecutive_text_replies += 1
response_text = cycle.loop_action_info.get("reply_text", "")
responses_for_prompt.append(response_text)
if is_taken:
action_detailed_str += f"{action_taken_time_str}时,你选择回复(action:{action_type},内容是:'{response_text}')。{action_reasoning_str}\n"
else:
action_detailed_str += f"{action_taken_time_str}时,你选择回复(action:{action_type},内容是:'{response_text}'),但是动作失败了。{action_reasoning_str}\n"
elif action_type == "no_reply":
# action_detailed_str += (
# f"{action_taken_time_str}时,你选择不回复(action:{action_type}){action_reasoning_str}\n"
# )
pass
else:
if is_taken:
action_detailed_str += (
f"{action_taken_time_str}时,你选择执行了(action:{action_type}){action_reasoning_str}\n"
)
else:
action_detailed_str += f"{action_taken_time_str}时,你选择执行了(action:{action_type}),但是动作失败了。{action_reasoning_str}\n"
if action_detailed_str:
cycle_info_block = f"\n你最近做的事:\n{action_detailed_str}\n"
else:
cycle_info_block = "\n"
# 根据连续文本回复的数量构建提示信息
if consecutive_text_replies >= 3: # 如果最近的三个活动都是文本回复
cycle_info_block = f'你已经连续回复了三条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}",第三近: "{responses_for_prompt[2]}")。你回复的有点多了,请注意'
elif consecutive_text_replies == 2: # 如果最近的两个活动是文本回复
cycle_info_block = f'你已经连续回复了两条消息(最近: "{responses_for_prompt[0]}",第二近: "{responses_for_prompt[1]}"),请注意'
# 包装提示块,增加可读性,即使没有连续回复也给个标记
# if cycle_info_block:
# cycle_info_block = f"\n你最近的回复\n{cycle_info_block}\n"
# else:
# cycle_info_block = "\n"
# 获取history_loop中最新添加的
if self.history_loop:
last_loop = self.history_loop[0]
start_time = last_loop.start_time
end_time = last_loop.end_time
if start_time is not None and end_time is not None:
time_diff = int(end_time - start_time)
if time_diff > 60:
cycle_info_block += f"距离你上一次阅读消息并思考和规划,已经过去了{int(time_diff / 60)}分钟\n"
else:
cycle_info_block += f"距离你上一次阅读消息并思考和规划,已经过去了{time_diff}\n"
else:
cycle_info_block += "你还没看过消息\n"
self.observe_info = cycle_info_block
def to_dict(self) -> dict:
"""将观察对象转换为可序列化的字典"""
# 只序列化基本信息,避免循环引用
return {
"observe_info": self.observe_info,
"observe_id": self.observe_id,
"last_observe_time": self.last_observe_time,
# 不序列化history_loop避免循环引用
"history_loop_count": len(self.history_loop),
}

View File

@@ -0,0 +1,25 @@
# 定义了来自外部世界的信息
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
from datetime import datetime
from src.common.logger import get_logger
logger = get_logger("observation")
# 所有观察的基类
class Observation:
def __init__(self, observe_id):
self.observe_info = ""
self.observe_id = observe_id
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
def to_dict(self) -> dict:
"""将观察对象转换为可序列化的字典"""
return {
"observe_info": self.observe_info,
"observe_id": self.observe_id,
"last_observe_time": self.last_observe_time,
}
async def observe(self):
pass

View File

@@ -0,0 +1,34 @@
# 定义了来自外部世界的信息
# 外部世界可以是某个聊天 不同平台的聊天 也可以是任意媒体
from datetime import datetime
from src.common.logger import get_logger
from src.chat.focus_chat.working_memory.working_memory import WorkingMemory
from src.chat.focus_chat.working_memory.memory_item import MemoryItem
from typing import List
# Import the new utility function
logger = get_logger("observation")
# 所有观察的基类
class WorkingMemoryObservation:
def __init__(self, observe_id):
self.observe_info = ""
self.observe_id = observe_id
self.last_observe_time = datetime.now().timestamp()
self.working_memory = WorkingMemory(chat_id=observe_id)
self.retrieved_working_memory = []
def get_observe_info(self):
return self.working_memory
def add_retrieved_working_memory(self, retrieved_working_memory: List[MemoryItem]):
self.retrieved_working_memory.append(retrieved_working_memory)
def get_retrieved_working_memory(self):
return self.retrieved_working_memory
async def observe(self):
pass