diff --git a/plugins/take_picture_plugin/plugin.py b/plugins/take_picture_plugin/plugin.py index bbe189526..75bd7ed8e 100644 --- a/plugins/take_picture_plugin/plugin.py +++ b/plugins/take_picture_plugin/plugin.py @@ -106,9 +106,9 @@ class TakePictureAction(BaseAction): bot_nickname = self.api.get_global_config("bot.nickname", "麦麦") bot_personality = self.api.get_global_config("personality.personality_core", "") - personality_sides = self.api.get_global_config("personality.personality_sides", []) - if personality_sides: - bot_personality += random.choice(personality_sides) + personality_side = self.api.get_global_config("personality.personality_side", []) + if personality_side: + bot_personality += random.choice(personality_side) # 准备模板变量 template_vars = {"name": bot_nickname, "personality": bot_personality} diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 88506eaa2..d4714f813 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -91,9 +91,7 @@ class HeartFChatting: # 新增:消息计数器和疲惫阈值 self._message_count = 0 # 发送的消息计数 - # 基于exit_focus_threshold动态计算疲惫阈值 - # 基础值30条,通过exit_focus_threshold调节:threshold越小,越容易疲惫 - self._message_threshold = max(10, int(30 * global_config.chat.exit_focus_threshold)) + self._message_threshold = max(10, int(30 * global_config.chat.focus_value)) self._fatigue_triggered = False # 是否已触发疲惫退出 self.action_manager = ActionManager() @@ -127,7 +125,7 @@ class HeartFChatting: self.priority_manager = None logger.info( - f"{self.log_prefix} HeartFChatting 初始化完成,消息疲惫阈值: {self._message_threshold}条(基于exit_focus_threshold={global_config.chat.exit_focus_threshold}计算,仅在auto模式下生效)" + f"{self.log_prefix} HeartFChatting 初始化完成" ) self.energy_value = 100 @@ -195,7 +193,7 @@ class HeartFChatting: async def _loopbody(self): if self.loop_mode == "focus": - self.energy_value -= 5 * (1 / global_config.chat.exit_focus_threshold) + self.energy_value -= 5 * global_config.chat.focus_value if self.energy_value <= 0: self.loop_mode = "normal" return True @@ -211,7 +209,7 @@ class HeartFChatting: filter_bot=True, ) - if len(new_messages_data) > 4 * global_config.chat.auto_focus_threshold: + if len(new_messages_data) > 4 * global_config.chat.focus_value: self.loop_mode = "focus" self.energy_value = 100 return True diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index dbc417aad..f0478c51c 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -39,90 +39,3 @@ class SubHeartflow: async def initialize(self): """异步初始化方法,创建兴趣流并确定聊天类型""" await self.heart_fc_instance.start() - - # async def _stop_heart_fc_chat(self): - # """停止并清理 HeartFChatting 实例""" - # if self.heart_fc_instance.running: - # logger.info(f"{self.log_prefix} 结束专注聊天...") - # try: - # await self.heart_fc_instance.shutdown() - # except Exception as e: - # logger.error(f"{self.log_prefix} 关闭 HeartFChatting 实例时出错: {e}") - # logger.error(traceback.format_exc()) - # else: - # logger.info(f"{self.log_prefix} 没有专注聊天实例,无需停止专注聊天") - - # async def _start_heart_fc_chat(self) -> bool: - # """启动 HeartFChatting 实例,确保 NormalChat 已停止""" - # try: - # # 如果任务已完成或不存在,则尝试重新启动 - # if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): - # logger.info(f"{self.log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") - # try: - # # 添加超时保护 - # await asyncio.wait_for(self.heart_fc_instance.start(), timeout=15.0) - # logger.info(f"{self.log_prefix} HeartFChatting 循环已启动。") - # return True - # except Exception as e: - # logger.error(f"{self.log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") - # logger.error(traceback.format_exc()) - # # 出错时清理实例,准备重新创建 - # self.heart_fc_instance = None # type: ignore - # return False - # else: - # # 任务正在运行 - # logger.debug(f"{self.log_prefix} HeartFChatting 已在运行中。") - # return True # 已经在运行 - - # except Exception as e: - # logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") - # logger.error(traceback.format_exc()) - # return False - - # def is_in_focus_cooldown(self) -> bool: - # """检查是否在focus模式的冷却期内 - - # Returns: - # bool: 如果在冷却期内返回True,否则返回False - # """ - # if self.last_focus_exit_time == 0: - # return False - - # # 基础冷却时间10分钟,受auto_focus_threshold调控 - # base_cooldown = 10 * 60 # 10分钟转换为秒 - # cooldown_duration = base_cooldown / global_config.chat.auto_focus_threshold - - # current_time = time.time() - # elapsed_since_exit = current_time - self.last_focus_exit_time - - # is_cooling = elapsed_since_exit < cooldown_duration - - # if is_cooling: - # remaining_time = cooldown_duration - elapsed_since_exit - # remaining_minutes = remaining_time / 60 - # logger.debug( - # f"[{self.log_prefix}] focus冷却中,剩余时间: {remaining_minutes:.1f}分钟 (阈值: {global_config.chat.auto_focus_threshold})" - # ) - - # return is_cooling - - # def get_cooldown_progress(self) -> float: - # """获取冷却进度,返回0-1之间的值 - - # Returns: - # float: 0表示刚开始冷却,1表示冷却完成 - # """ - # if self.last_focus_exit_time == 0: - # return 1.0 # 没有冷却,返回1表示完全恢复 - - # # 基础冷却时间10分钟,受auto_focus_threshold调控 - # base_cooldown = 10 * 60 # 10分钟转换为秒 - # cooldown_duration = base_cooldown / global_config.chat.auto_focus_threshold - - # current_time = time.time() - # elapsed_since_exit = current_time - self.last_focus_exit_time - - # if elapsed_since_exit >= cooldown_duration: - # return 1.0 # 冷却完成 - - # return elapsed_since_exit / cooldown_duration diff --git a/src/chat/planner_actions/action_modifier.py b/src/chat/planner_actions/action_modifier.py index d47b7d004..3ed82d671 100644 --- a/src/chat/planner_actions/action_modifier.py +++ b/src/chat/planner_actions/action_modifier.py @@ -436,72 +436,72 @@ class ActionModifier: logger.debug(f"{self.log_prefix}动作 {action_name} 未匹配到任何关键词: {activation_keywords}") return False - async def analyze_loop_actions(self, history_loop: List[CycleDetail]) -> List[tuple[str, str]]: - """分析最近的循环内容并决定动作的移除 + # async def analyze_loop_actions(self, history_loop: List[CycleDetail]) -> List[tuple[str, str]]: + # """分析最近的循环内容并决定动作的移除 - Returns: - List[Tuple[str, str]]: 包含要删除的动作及原因的元组列表 - [("action3", "some reason")] - """ - removals = [] + # Returns: + # List[Tuple[str, str]]: 包含要删除的动作及原因的元组列表 + # [("action3", "some reason")] + # """ + # removals = [] - # 获取最近10次循环 - recent_cycles = history_loop[-10:] if len(history_loop) > 10 else history_loop - if not recent_cycles: - return removals + # # 获取最近10次循环 + # recent_cycles = history_loop[-10:] if len(history_loop) > 10 else history_loop + # if not recent_cycles: + # return removals - reply_sequence = [] # 记录最近的动作序列 + # reply_sequence = [] # 记录最近的动作序列 - for cycle in recent_cycles: - action_result = cycle.loop_plan_info.get("action_result", {}) - action_type = action_result.get("action_type", "unknown") - reply_sequence.append(action_type == "reply") + # for cycle in recent_cycles: + # action_result = cycle.loop_plan_info.get("action_result", {}) + # action_type = action_result.get("action_type", "unknown") + # reply_sequence.append(action_type == "reply") - # 计算连续回复的相关阈值 + # # 计算连续回复的相关阈值 - max_reply_num = int(global_config.focus_chat.consecutive_replies * 3.2) - sec_thres_reply_num = int(global_config.focus_chat.consecutive_replies * 2) - one_thres_reply_num = int(global_config.focus_chat.consecutive_replies * 1.5) + # max_reply_num = int(global_config.focus_chat.consecutive_replies * 3.2) + # sec_thres_reply_num = int(global_config.focus_chat.consecutive_replies * 2) + # one_thres_reply_num = int(global_config.focus_chat.consecutive_replies * 1.5) - # 获取最近max_reply_num次的reply状态 - if len(reply_sequence) >= max_reply_num: - last_max_reply_num = reply_sequence[-max_reply_num:] - else: - last_max_reply_num = reply_sequence[:] + # # 获取最近max_reply_num次的reply状态 + # if len(reply_sequence) >= max_reply_num: + # last_max_reply_num = reply_sequence[-max_reply_num:] + # else: + # last_max_reply_num = reply_sequence[:] - # 详细打印阈值和序列信息,便于调试 - logger.info( - f"连续回复阈值: max={max_reply_num}, sec={sec_thres_reply_num}, one={one_thres_reply_num}," - f"最近reply序列: {last_max_reply_num}" - ) - # print(f"consecutive_replies: {consecutive_replies}") + # # 详细打印阈值和序列信息,便于调试 + # logger.info( + # f"连续回复阈值: max={max_reply_num}, sec={sec_thres_reply_num}, one={one_thres_reply_num}," + # f"最近reply序列: {last_max_reply_num}" + # ) + # # print(f"consecutive_replies: {consecutive_replies}") - # 根据最近的reply情况决定是否移除reply动作 - if len(last_max_reply_num) >= max_reply_num and all(last_max_reply_num): - # 如果最近max_reply_num次都是reply,直接移除 - reason = f"连续回复过多(最近{len(last_max_reply_num)}次全是reply,超过阈值{max_reply_num})" - removals.append(("reply", reason)) - # reply_count = len(last_max_reply_num) - no_reply_count - elif len(last_max_reply_num) >= sec_thres_reply_num and all(last_max_reply_num[-sec_thres_reply_num:]): - # 如果最近sec_thres_reply_num次都是reply,40%概率移除 - removal_probability = 0.4 / global_config.focus_chat.consecutive_replies - if random.random() < removal_probability: - reason = ( - f"连续回复较多(最近{sec_thres_reply_num}次全是reply,{removal_probability:.2f}概率移除,触发移除)" - ) - removals.append(("reply", reason)) - elif len(last_max_reply_num) >= one_thres_reply_num and all(last_max_reply_num[-one_thres_reply_num:]): - # 如果最近one_thres_reply_num次都是reply,20%概率移除 - removal_probability = 0.2 / global_config.focus_chat.consecutive_replies - if random.random() < removal_probability: - reason = ( - f"连续回复检测(最近{one_thres_reply_num}次全是reply,{removal_probability:.2f}概率移除,触发移除)" - ) - removals.append(("reply", reason)) - else: - logger.debug(f"{self.log_prefix}连续回复检测:无需移除reply动作,最近回复模式正常") + # # 根据最近的reply情况决定是否移除reply动作 + # if len(last_max_reply_num) >= max_reply_num and all(last_max_reply_num): + # # 如果最近max_reply_num次都是reply,直接移除 + # reason = f"连续回复过多(最近{len(last_max_reply_num)}次全是reply,超过阈值{max_reply_num})" + # removals.append(("reply", reason)) + # # reply_count = len(last_max_reply_num) - no_reply_count + # elif len(last_max_reply_num) >= sec_thres_reply_num and all(last_max_reply_num[-sec_thres_reply_num:]): + # # 如果最近sec_thres_reply_num次都是reply,40%概率移除 + # removal_probability = 0.4 / global_config.focus_chat.consecutive_replies + # if random.random() < removal_probability: + # reason = ( + # f"连续回复较多(最近{sec_thres_reply_num}次全是reply,{removal_probability:.2f}概率移除,触发移除)" + # ) + # removals.append(("reply", reason)) + # elif len(last_max_reply_num) >= one_thres_reply_num and all(last_max_reply_num[-one_thres_reply_num:]): + # # 如果最近one_thres_reply_num次都是reply,20%概率移除 + # removal_probability = 0.2 / global_config.focus_chat.consecutive_replies + # if random.random() < removal_probability: + # reason = ( + # f"连续回复检测(最近{one_thres_reply_num}次全是reply,{removal_probability:.2f}概率移除,触发移除)" + # ) + # removals.append(("reply", reason)) + # else: + # logger.debug(f"{self.log_prefix}连续回复检测:无需移除reply动作,最近回复模式正常") - return removals + # return removals # def get_available_actions_count(self, mode: str = "focus") -> int: # """获取当前可用动作数量(排除默认的no_action)""" diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 912cd7997..e35568476 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -10,6 +10,7 @@ from datetime import datetime from src.common.logger import get_logger from src.config.config import global_config +from src.individuality.individuality import get_individuality from src.llm_models.utils_model import LLMRequest from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending from src.chat.message_receive.chat_stream import ChatStream @@ -561,7 +562,6 @@ class DefaultReplyer: chat_stream = self.chat_stream chat_id = chat_stream.stream_id person_info_manager = get_person_info_manager() - bot_person_id = person_info_manager.get_person_id("system", "bot_id") is_group_chat = bool(chat_stream.group_info) reply_to = reply_data.get("reply_to", "none") extra_info_block = reply_data.get("extra_info", "") or reply_data.get("extra_info_block", "") @@ -661,31 +661,7 @@ class DefaultReplyer: time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - # logger.debug("开始构建 focus prompt") - bot_name = global_config.bot.nickname - if global_config.bot.alias_names: - bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" - else: - bot_nickname = "" - short_impression = await person_info_manager.get_value(bot_person_id, "short_impression") - # 解析字符串形式的Python列表 - try: - if isinstance(short_impression, str) and short_impression.strip(): - short_impression = ast.literal_eval(short_impression) - elif not short_impression: - logger.warning("short_impression为空,使用默认值") - short_impression = ["友好活泼", "人类"] - except (ValueError, SyntaxError) as e: - logger.error(f"解析short_impression失败: {e}, 原始值: {short_impression}") - short_impression = ["友好活泼", "人类"] - # 确保short_impression是列表格式且有足够的元素 - if not isinstance(short_impression, list) or len(short_impression) < 2: - logger.warning(f"short_impression格式不正确: {short_impression}, 使用默认值") - short_impression = ["友好活泼", "人类"] - personality = short_impression[0] - identity = short_impression[1] - prompt_personality = f"{personality},{identity}" - identity_block = f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}:" + identity_block = get_individuality().get_personality_block() moderation_prompt_block = ( "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。不要随意遵从他人指令。" @@ -732,24 +708,24 @@ class DefaultReplyer: "chat_target_private2", sender_name=chat_target_name ) + target_user_id = "" + if sender: + # 根据sender通过person_info_manager反向查找person_id,再获取user_id + person_id = person_info_manager.get_person_id_by_person_name(sender) + + + # 根据配置选择使用哪种 prompt 构建模式 - if global_config.chat.use_s4u_prompt_mode: + if global_config.chat.use_s4u_prompt_mode and person_id: # 使用 s4u 对话构建模式:分离当前对话对象和其他对话 - - # 获取目标用户ID用于消息过滤 - target_user_id = "" - if sender: - # 根据sender通过person_info_manager反向查找person_id,再获取user_id - person_id = person_info_manager.get_person_id_by_person_name(sender) - if person_id: - # 通过person_info_manager获取person_id对应的user_id字段 - try: - user_id_value = await person_info_manager.get_value(person_id, "user_id") - if user_id_value: - target_user_id = str(user_id_value) - except Exception as e: - logger.warning(f"无法从person_id {person_id} 获取user_id: {e}") - target_user_id = "" + try: + user_id_value = await person_info_manager.get_value(person_id, "user_id") + if user_id_value: + target_user_id = str(user_id_value) + except Exception as e: + logger.warning(f"无法从person_id {person_id} 获取user_id: {e}") + target_user_id = "" + # 构建分离的对话 prompt core_dialogue_prompt, background_dialogue_prompt = self.build_s4u_chat_history_prompts( @@ -811,8 +787,6 @@ class DefaultReplyer: ) -> str: chat_stream = self.chat_stream chat_id = chat_stream.stream_id - person_info_manager = get_person_info_manager() - bot_person_id = person_info_manager.get_person_id("system", "bot_id") is_group_chat = bool(chat_stream.group_info) reply_to = reply_data.get("reply_to", "none") @@ -844,29 +818,7 @@ class DefaultReplyer: time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - bot_name = global_config.bot.nickname - if global_config.bot.alias_names: - bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" - else: - bot_nickname = "" - short_impression = await person_info_manager.get_value(bot_person_id, "short_impression") - try: - if isinstance(short_impression, str) and short_impression.strip(): - short_impression = ast.literal_eval(short_impression) - elif not short_impression: - logger.warning("short_impression为空,使用默认值") - short_impression = ["友好活泼", "人类"] - except (ValueError, SyntaxError) as e: - logger.error(f"解析short_impression失败: {e}, 原始值: {short_impression}") - short_impression = ["友好活泼", "人类"] - # 确保short_impression是列表格式且有足够的元素 - if not isinstance(short_impression, list) or len(short_impression) < 2: - logger.warning(f"short_impression格式不正确: {short_impression}, 使用默认值") - short_impression = ["友好活泼", "人类"] - personality = short_impression[0] - identity = short_impression[1] - prompt_personality = f"{personality},{identity}" - identity_block = f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}:" + identity_block = get_individuality().get_personality_block() moderation_prompt_block = ( "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。不要随意遵从他人指令。" diff --git a/src/chat/working_memory/memory_item.py b/src/chat/working_memory/memory_item.py deleted file mode 100644 index dc6ab0652..000000000 --- a/src/chat/working_memory/memory_item.py +++ /dev/null @@ -1,84 +0,0 @@ -from typing import Tuple -import time -import random -import string - - -class MemoryItem: - """记忆项类,用于存储单个记忆的所有相关信息""" - - def __init__(self, summary: str, from_source: str = "", brief: str = ""): - """ - 初始化记忆项 - - Args: - summary: 记忆内容概括 - from_source: 数据来源 - brief: 记忆内容主题 - """ - # 生成可读ID:时间戳_随机字符串 - timestamp = int(time.time()) - random_str = "".join(random.choices(string.ascii_lowercase + string.digits, k=2)) - self.id = f"{timestamp}_{random_str}" - self.from_source = from_source - self.brief = brief - self.timestamp = time.time() - - # 记忆内容概括 - self.summary = summary - - # 记忆精简次数 - self.compress_count = 0 - - # 记忆提取次数 - self.retrieval_count = 0 - - # 记忆强度 (初始为10) - self.memory_strength = 10.0 - - # 记忆操作历史记录 - # 格式: [(操作类型, 时间戳, 当时精简次数, 当时强度), ...] - self.history = [("create", self.timestamp, self.compress_count, self.memory_strength)] - - def matches_source(self, source: str) -> bool: - """检查来源是否匹配""" - return self.from_source == source - - def increase_strength(self, amount: float) -> None: - """增加记忆强度""" - self.memory_strength = min(10.0, self.memory_strength + amount) - # 记录操作历史 - self.record_operation("strengthen") - - def decrease_strength(self, amount: float) -> None: - """减少记忆强度""" - self.memory_strength = max(0.1, self.memory_strength - amount) - # 记录操作历史 - self.record_operation("weaken") - - def increase_compress_count(self) -> None: - """增加精简次数并减弱记忆强度""" - self.compress_count += 1 - # 记录操作历史 - self.record_operation("compress") - - def record_retrieval(self) -> None: - """记录记忆被提取的情况""" - self.retrieval_count += 1 - # 提取后强度翻倍 - self.memory_strength = min(10.0, self.memory_strength * 2) - # 记录操作历史 - self.record_operation("retrieval") - - def record_operation(self, operation_type: str) -> None: - """记录操作历史""" - current_time = time.time() - self.history.append((operation_type, current_time, self.compress_count, self.memory_strength)) - - def to_tuple(self) -> Tuple[str, str, float, str]: - """转换为元组格式(为了兼容性)""" - return (self.summary, self.from_source, self.timestamp, self.id) - - def is_memory_valid(self) -> bool: - """检查记忆是否有效(强度是否大于等于1)""" - return self.memory_strength >= 1.0 diff --git a/src/chat/working_memory/memory_manager.py b/src/chat/working_memory/memory_manager.py deleted file mode 100644 index fd28bc947..000000000 --- a/src/chat/working_memory/memory_manager.py +++ /dev/null @@ -1,413 +0,0 @@ -from typing import Dict, TypeVar, List, Optional -import traceback -from json_repair import repair_json -from rich.traceback import install -from src.common.logger import get_logger -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config -from src.chat.focus_chat.working_memory.memory_item import MemoryItem -import json # 添加json模块导入 - - -install(extra_lines=3) -logger = get_logger("working_memory") - -T = TypeVar("T") - - -class MemoryManager: - def __init__(self, chat_id: str): - """ - 初始化工作记忆 - - Args: - chat_id: 关联的聊天ID,用于标识该工作记忆属于哪个聊天 - """ - # 关联的聊天ID - self._chat_id = chat_id - - # 记忆项列表 - self._memories: List[MemoryItem] = [] - - # ID到记忆项的映射 - self._id_map: Dict[str, MemoryItem] = {} - - self.llm_summarizer = LLMRequest( - model=global_config.model.memory, - temperature=0.3, - request_type="working_memory", - ) - - @property - def chat_id(self) -> str: - """获取关联的聊天ID""" - return self._chat_id - - @chat_id.setter - def chat_id(self, value: str): - """设置关联的聊天ID""" - self._chat_id = value - - def push_item(self, memory_item: MemoryItem) -> str: - """ - 推送一个已创建的记忆项到工作记忆中 - - Args: - memory_item: 要存储的记忆项 - - Returns: - 记忆项的ID - """ - # 添加到内存和ID映射 - self._memories.append(memory_item) - self._id_map[memory_item.id] = memory_item - - return memory_item.id - - def get_by_id(self, memory_id: str) -> Optional[MemoryItem]: - """ - 通过ID获取记忆项 - - Args: - memory_id: 记忆项ID - - Returns: - 找到的记忆项,如果不存在则返回None - """ - memory_item = self._id_map.get(memory_id) - if memory_item: - # 检查记忆强度,如果小于1则删除 - if not memory_item.is_memory_valid(): - print(f"记忆 {memory_id} 强度过低 ({memory_item.memory_strength}),已自动移除") - self.delete(memory_id) - return None - - return memory_item - - def get_all_items(self) -> List[MemoryItem]: - """获取所有记忆项""" - return list(self._id_map.values()) - - def find_items( - self, - source: Optional[str] = None, - start_time: Optional[float] = None, - end_time: Optional[float] = None, - memory_id: Optional[str] = None, - limit: Optional[int] = None, - newest_first: bool = False, - min_strength: float = 0.0, - ) -> List[MemoryItem]: - """ - 按条件查找记忆项 - - Args: - source: 数据来源 - start_time: 开始时间戳 - end_time: 结束时间戳 - memory_id: 特定记忆项ID - limit: 返回结果的最大数量 - newest_first: 是否按最新优先排序 - min_strength: 最小记忆强度 - - Returns: - 符合条件的记忆项列表 - """ - # 如果提供了特定ID,直接查找 - if memory_id: - item = self.get_by_id(memory_id) - return [item] if item else [] - - results = [] - - # 获取所有项目 - items = self._memories - - # 如果需要最新优先,则反转遍历顺序 - if newest_first: - items_to_check = list(reversed(items)) - else: - items_to_check = items - - # 遍历项目 - for item in items_to_check: - # 检查来源是否匹配 - if source is not None and not item.matches_source(source): - continue - - # 检查时间范围 - if start_time is not None and item.timestamp < start_time: - continue - if end_time is not None and item.timestamp > end_time: - continue - - # 检查记忆强度 - if min_strength > 0 and item.memory_strength < min_strength: - continue - - # 所有条件都满足,添加到结果中 - results.append(item) - - # 如果达到限制数量,提前返回 - if limit is not None and len(results) >= limit: - return results - - return results - - async def summarize_memory_item(self, content: str) -> Dict[str, str]: - """ - 使用LLM总结记忆项 - - Args: - content: 需要总结的内容 - - Returns: - 包含brief和summary的字典 - """ - prompt = f"""请对以下内容进行总结,总结成记忆,输出两部分: -1. 记忆内容主题(精简,20字以内):让用户可以一眼看出记忆内容是什么 -2. 记忆内容概括:对内容进行概括,保留重要信息,200字以内 - -内容: -{content} - -请按以下JSON格式输出: -{{ - "brief": "记忆内容主题", - "summary": "记忆内容概括" -}} -请确保输出是有效的JSON格式,不要添加任何额外的说明或解释。 -""" - default_summary = { - "brief": "主题未知的记忆", - "summary": "无法概括的记忆内容", - } - - try: - # 调用LLM生成总结 - response, _ = await self.llm_summarizer.generate_response_async(prompt) - - # 使用repair_json解析响应 - try: - # 使用repair_json修复JSON格式 - fixed_json_string = repair_json(response) - - # 如果repair_json返回的是字符串,需要解析为Python对象 - if isinstance(fixed_json_string, str): - try: - json_result = json.loads(fixed_json_string) - except json.JSONDecodeError as decode_error: - logger.error(f"JSON解析错误: {str(decode_error)}") - return default_summary - else: - # 如果repair_json直接返回了字典对象,直接使用 - json_result = fixed_json_string - - # 进行额外的类型检查 - if not isinstance(json_result, dict): - logger.error(f"修复后的JSON不是字典类型: {type(json_result)}") - return default_summary - - # 确保所有必要字段都存在且类型正确 - if "brief" not in json_result or not isinstance(json_result["brief"], str): - json_result["brief"] = "主题未知的记忆" - - if "summary" not in json_result or not isinstance(json_result["summary"], str): - json_result["summary"] = "无法概括的记忆内容" - - return json_result - - except Exception as json_error: - logger.error(f"JSON处理失败: {str(json_error)},将使用默认摘要") - return default_summary - - except Exception as e: - logger.error(f"生成总结时出错: {str(e)}") - return default_summary - - def decay_memory(self, memory_id: str, decay_factor: float = 0.8) -> bool: - """ - 使单个记忆衰减 - - Args: - memory_id: 记忆ID - decay_factor: 衰减因子(0-1之间) - - Returns: - 是否成功衰减 - """ - memory_item = self.get_by_id(memory_id) - if not memory_item: - return False - - # 计算衰减量(当前强度 * (1-衰减因子)) - old_strength = memory_item.memory_strength - decay_amount = old_strength * (1 - decay_factor) - - # 更新强度 - memory_item.memory_strength = decay_amount - - return True - - def delete(self, memory_id: str) -> bool: - """ - 删除指定ID的记忆项 - - Args: - memory_id: 要删除的记忆项ID - - Returns: - 是否成功删除 - """ - if memory_id not in self._id_map: - return False - - # 获取要删除的项 - self._id_map[memory_id] - - # 从内存中删除 - self._memories = [i for i in self._memories if i.id != memory_id] - - # 从ID映射中删除 - del self._id_map[memory_id] - - return True - - def clear(self) -> None: - """清除所有记忆""" - self._memories.clear() - self._id_map.clear() - - async def merge_memories( - self, memory_id1: str, memory_id2: str, reason: str, delete_originals: bool = True - ) -> MemoryItem: - """ - 合并两个记忆项 - - Args: - memory_id1: 第一个记忆项ID - memory_id2: 第二个记忆项ID - reason: 合并原因 - delete_originals: 是否删除原始记忆,默认为True - - Returns: - 合并后的记忆项 - """ - # 获取两个记忆项 - memory_item1 = self.get_by_id(memory_id1) - memory_item2 = self.get_by_id(memory_id2) - - if not memory_item1 or not memory_item2: - raise ValueError("无法找到指定的记忆项") - - # 构建合并提示 - prompt = f""" -请根据以下原因,将两段记忆内容有机合并成一段新的记忆内容。 -合并时保留两段记忆的重要信息,避免重复,确保生成的内容连贯、自然。 - -合并原因:{reason} - -记忆1主题:{memory_item1.brief} -记忆1内容:{memory_item1.summary} - -记忆2主题:{memory_item2.brief} -记忆2内容:{memory_item2.summary} - -请按以下JSON格式输出合并结果: -{{ - "brief": "合并后的主题(20字以内)", - "summary": "合并后的内容概括(200字以内)" -}} -请确保输出是有效的JSON格式,不要添加任何额外的说明或解释。 -""" - - # 默认合并结果 - default_merged = { - "brief": f"合并:{memory_item1.brief} + {memory_item2.brief}", - "summary": f"合并的记忆:{memory_item1.summary}\n{memory_item2.summary}", - } - - try: - # 调用LLM合并记忆 - response, _ = await self.llm_summarizer.generate_response_async(prompt) - - # 处理LLM返回的合并结果 - try: - # 修复JSON格式 - fixed_json_string = repair_json(response) - - # 将修复后的字符串解析为Python对象 - if isinstance(fixed_json_string, str): - try: - merged_data = json.loads(fixed_json_string) - except json.JSONDecodeError as decode_error: - logger.error(f"JSON解析错误: {str(decode_error)}") - merged_data = default_merged - else: - # 如果repair_json直接返回了字典对象,直接使用 - merged_data = fixed_json_string - - # 确保是字典类型 - if not isinstance(merged_data, dict): - logger.error(f"修复后的JSON不是字典类型: {type(merged_data)}") - merged_data = default_merged - - if "brief" not in merged_data or not isinstance(merged_data["brief"], str): - merged_data["brief"] = default_merged["brief"] - - if "summary" not in merged_data or not isinstance(merged_data["summary"], str): - merged_data["summary"] = default_merged["summary"] - - except Exception as e: - logger.error(f"合并记忆时处理JSON出错: {str(e)}") - traceback.print_exc() - merged_data = default_merged - except Exception as e: - logger.error(f"合并记忆调用LLM出错: {str(e)}") - traceback.print_exc() - merged_data = default_merged - - # 创建新的记忆项 - # 取两个记忆项中更强的来源 - merged_source = ( - memory_item1.from_source - if memory_item1.memory_strength >= memory_item2.memory_strength - else memory_item2.from_source - ) - - # 创建新的记忆项 - merged_memory = MemoryItem( - summary=merged_data["summary"], from_source=merged_source, brief=merged_data["brief"] - ) - - # 记忆强度取两者最大值 - merged_memory.memory_strength = max(memory_item1.memory_strength, memory_item2.memory_strength) - - # 添加到存储中 - self.push_item(merged_memory) - - # 如果需要,删除原始记忆 - if delete_originals: - self.delete(memory_id1) - self.delete(memory_id2) - - return merged_memory - - def delete_earliest_memory(self) -> bool: - """ - 删除最早的记忆项 - - Returns: - 是否成功删除 - """ - # 获取所有记忆项 - all_memories = self.get_all_items() - - if not all_memories: - return False - - # 按时间戳排序,找到最早的记忆项 - earliest_memory = min(all_memories, key=lambda item: item.timestamp) - - # 删除最早的记忆项 - return self.delete(earliest_memory.id) diff --git a/src/chat/working_memory/working_memory.py b/src/chat/working_memory/working_memory.py deleted file mode 100644 index 9488a9dbe..000000000 --- a/src/chat/working_memory/working_memory.py +++ /dev/null @@ -1,156 +0,0 @@ -from typing import List, Any, Optional -import asyncio -from src.common.logger import get_logger -from src.chat.focus_chat.working_memory.memory_manager import MemoryManager, MemoryItem -from src.config.config import global_config - -logger = get_logger(__name__) - -# 问题是我不知道这个manager是不是需要和其他manager统一管理,因为这个manager是从属于每一个聊天流,都有自己的定时任务 - - -class WorkingMemory: - """ - 工作记忆,负责协调和运作记忆 - 从属于特定的流,用chat_id来标识 - """ - - def __init__(self, chat_id: str, max_memories_per_chat: int = 10, auto_decay_interval: int = 60): - """ - 初始化工作记忆管理器 - - Args: - max_memories_per_chat: 每个聊天的最大记忆数量 - auto_decay_interval: 自动衰减记忆的时间间隔(秒) - """ - self.memory_manager = MemoryManager(chat_id) - - # 记忆容量上限 - self.max_memories_per_chat = max_memories_per_chat - - # 自动衰减间隔 - self.auto_decay_interval = auto_decay_interval - - # 衰减任务 - self.decay_task = None - - # 只有在工作记忆处理器启用时才启动自动衰减任务 - if global_config.focus_chat_processor.working_memory_processor: - self._start_auto_decay() - else: - logger.debug(f"工作记忆处理器已禁用,跳过启动自动衰减任务 (chat_id: {chat_id})") - - def _start_auto_decay(self): - """启动自动衰减任务""" - if self.decay_task is None: - self.decay_task = asyncio.create_task(self._auto_decay_loop()) - - async def _auto_decay_loop(self): - """自动衰减循环""" - while True: - await asyncio.sleep(self.auto_decay_interval) - try: - await self.decay_all_memories() - except Exception as e: - print(f"自动衰减记忆时出错: {str(e)}") - - async def add_memory(self, summary: Any, from_source: str = "", brief: str = ""): - """ - 添加一段记忆到指定聊天 - - Args: - summary: 记忆内容 - from_source: 数据来源 - - Returns: - 记忆项 - """ - # 如果是字符串类型,生成总结 - - memory = MemoryItem(summary, from_source, brief) - - # 添加到管理器 - self.memory_manager.push_item(memory) - - # 如果超过最大记忆数量,删除最早的记忆 - if len(self.memory_manager.get_all_items()) > self.max_memories_per_chat: - self.remove_earliest_memory() - - return memory - - def remove_earliest_memory(self): - """ - 删除最早的记忆 - """ - return self.memory_manager.delete_earliest_memory() - - async def retrieve_memory(self, memory_id: str) -> Optional[MemoryItem]: - """ - 检索记忆 - - Args: - chat_id: 聊天ID - memory_id: 记忆ID - - Returns: - 检索到的记忆项,如果不存在则返回None - """ - memory_item = self.memory_manager.get_by_id(memory_id) - if memory_item: - memory_item.retrieval_count += 1 - memory_item.increase_strength(5) - return memory_item - return None - - async def decay_all_memories(self, decay_factor: float = 0.5): - """ - 对所有聊天的所有记忆进行衰减 - 衰减:对记忆进行refine压缩,强度会变为原先的0.5 - - Args: - decay_factor: 衰减因子(0-1之间) - """ - logger.debug(f"开始对所有记忆进行衰减,衰减因子: {decay_factor}") - - all_memories = self.memory_manager.get_all_items() - - for memory_item in all_memories: - # 如果压缩完小于1会被删除 - memory_id = memory_item.id - self.memory_manager.decay_memory(memory_id, decay_factor) - if memory_item.memory_strength < 1: - self.memory_manager.delete(memory_id) - continue - # 计算衰减量 - # if memory_item.memory_strength < 5: - # await self.memory_manager.refine_memory( - # memory_id, f"由于时间过去了{self.auto_decay_interval}秒,记忆变的模糊,所以需要压缩" - # ) - - async def merge_memory(self, memory_id1: str, memory_id2: str) -> MemoryItem: - """合并记忆 - - Args: - memory_str: 记忆内容 - """ - return await self.memory_manager.merge_memories( - memory_id1=memory_id1, memory_id2=memory_id2, reason="两端记忆有重复的内容" - ) - - async def shutdown(self) -> None: - """关闭管理器,停止所有任务""" - if self.decay_task and not self.decay_task.done(): - self.decay_task.cancel() - try: - await self.decay_task - except asyncio.CancelledError: - pass - - def get_all_memories(self) -> List[MemoryItem]: - """ - 获取所有记忆项目 - - Returns: - List[MemoryItem]: 当前工作记忆中的所有记忆项目列表 - """ - return self.memory_manager.get_all_items() diff --git a/src/chat/working_memory/working_memory_processor.py b/src/chat/working_memory/working_memory_processor.py deleted file mode 100644 index 562278462..000000000 --- a/src/chat/working_memory/working_memory_processor.py +++ /dev/null @@ -1,261 +0,0 @@ -from src.chat.focus_chat.observation.chatting_observation import ChattingObservation -from src.chat.focus_chat.observation.observation import Observation -from src.llm_models.utils_model import LLMRequest -from src.config.config import global_config -import time -import traceback -from src.common.logger import get_logger -from src.chat.utils.prompt_builder import Prompt, global_prompt_manager -from src.chat.message_receive.chat_stream import get_chat_manager -from typing import List -from src.chat.focus_chat.observation.working_observation import WorkingMemoryObservation -from src.chat.focus_chat.working_memory.working_memory import WorkingMemory -from src.chat.focus_chat.info.info_base import InfoBase -from json_repair import repair_json -from src.chat.focus_chat.info.workingmemory_info import WorkingMemoryInfo -import asyncio -import json - -logger = get_logger("processor") - - -def init_prompt(): - memory_proces_prompt = """ -你的名字是{bot_name} - -现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容: -{chat_observe_info} - -以下是你已经总结的记忆摘要,你可以调取这些记忆查看内容来帮助你聊天,不要一次调取太多记忆,最多调取3个左右记忆: -{memory_str} - -观察聊天内容和已经总结的记忆,思考如果有相近的记忆,请合并记忆,输出merge_memory, -合并记忆的格式为[["id1", "id2"], ["id3", "id4"],...],你可以进行多组合并,但是每组合并只能有两个记忆id,不要输出其他内容 - -请根据聊天内容选择你需要调取的记忆并考虑是否添加新记忆,以JSON格式输出,格式如下: -```json -{{ - "selected_memory_ids": ["id1", "id2", ...] - "merge_memory": [["id1", "id2"], ["id3", "id4"],...] -}} -``` -""" - Prompt(memory_proces_prompt, "prompt_memory_proces") - - -class WorkingMemoryProcessor: - log_prefix = "工作记忆" - - def __init__(self, subheartflow_id: str): - self.subheartflow_id = subheartflow_id - - self.llm_model = LLMRequest( - model=global_config.model.planner, - request_type="focus.processor.working_memory", - ) - - name = get_chat_manager().get_stream_name(self.subheartflow_id) - self.log_prefix = f"[{name}] " - - async def process_info(self, observations: List[Observation] = None, *infos) -> List[InfoBase]: - """处理信息对象 - - Args: - *infos: 可变数量的InfoBase类型的信息对象 - - Returns: - List[InfoBase]: 处理后的结构化信息列表 - """ - working_memory = None - chat_info = "" - chat_obs = None - try: - for observation in observations: - if isinstance(observation, WorkingMemoryObservation): - working_memory = observation.get_observe_info() - if isinstance(observation, ChattingObservation): - chat_info = observation.get_observe_info() - chat_obs = observation - # 检查是否有待压缩内容 - if chat_obs and chat_obs.compressor_prompt: - logger.debug(f"{self.log_prefix} 压缩聊天记忆") - await self.compress_chat_memory(working_memory, chat_obs) - - # 检查working_memory是否为None - if working_memory is None: - logger.debug(f"{self.log_prefix} 没有找到工作记忆观察,跳过处理") - return [] - - all_memory = working_memory.get_all_memories() - if not all_memory: - logger.debug(f"{self.log_prefix} 目前没有工作记忆,跳过提取") - return [] - - memory_prompts = [] - for memory in all_memory: - memory_id = memory.id - memory_brief = memory.brief - memory_single_prompt = f"记忆id:{memory_id},记忆摘要:{memory_brief}\n" - memory_prompts.append(memory_single_prompt) - - memory_choose_str = "".join(memory_prompts) - - # 使用提示模板进行处理 - prompt = (await global_prompt_manager.get_prompt_async("prompt_memory_proces")).format( - bot_name=global_config.bot.nickname, - time_now=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), - chat_observe_info=chat_info, - memory_str=memory_choose_str, - ) - - # 调用LLM处理记忆 - content = "" - try: - content, _ = await self.llm_model.generate_response_async(prompt=prompt) - - # print(f"prompt: {prompt}---------------------------------") - # print(f"content: {content}---------------------------------") - - if not content: - logger.warning(f"{self.log_prefix} LLM返回空结果,处理工作记忆失败。") - return [] - except Exception as e: - logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}") - logger.error(traceback.format_exc()) - return [] - - # 解析LLM返回的JSON - try: - result = repair_json(content) - if isinstance(result, str): - result = json.loads(result) - if not isinstance(result, dict): - logger.error(f"{self.log_prefix} 解析LLM返回的JSON失败,结果不是字典类型: {type(result)}") - return [] - - selected_memory_ids = result.get("selected_memory_ids", []) - merge_memory = result.get("merge_memory", []) - except Exception as e: - logger.error(f"{self.log_prefix} 解析LLM返回的JSON失败: {e}") - logger.error(traceback.format_exc()) - return [] - - logger.debug( - f"{self.log_prefix} 解析LLM返回的JSON,selected_memory_ids: {selected_memory_ids}, merge_memory: {merge_memory}" - ) - - # 根据selected_memory_ids,调取记忆 - memory_str = "" - selected_ids = set(selected_memory_ids) # 转换为集合以便快速查找 - - # 遍历所有记忆 - for memory in all_memory: - if memory.id in selected_ids: - # 选中的记忆显示详细内容 - memory = await working_memory.retrieve_memory(memory.id) - if memory: - memory_str += f"{memory.summary}\n" - else: - # 未选中的记忆显示梗概 - memory_str += f"{memory.brief}\n" - - working_memory_info = WorkingMemoryInfo() - if memory_str: - working_memory_info.add_working_memory(memory_str) - logger.debug(f"{self.log_prefix} 取得工作记忆: {memory_str}") - else: - logger.debug(f"{self.log_prefix} 没有找到工作记忆") - - if merge_memory: - for merge_pairs in merge_memory: - memory1 = await working_memory.retrieve_memory(merge_pairs[0]) - memory2 = await working_memory.retrieve_memory(merge_pairs[1]) - if memory1 and memory2: - asyncio.create_task(self.merge_memory_async(working_memory, merge_pairs[0], merge_pairs[1])) - - return [working_memory_info] - except Exception as e: - logger.error(f"{self.log_prefix} 处理观察时出错: {e}") - logger.error(traceback.format_exc()) - return [] - - async def compress_chat_memory(self, working_memory: WorkingMemory, obs: ChattingObservation): - """压缩聊天记忆 - - Args: - working_memory: 工作记忆对象 - obs: 聊天观察对象 - """ - # 检查working_memory是否为None - if working_memory is None: - logger.warning(f"{self.log_prefix} 工作记忆对象为None,无法压缩聊天记忆") - return - - try: - summary_result, _ = await self.llm_model.generate_response_async(obs.compressor_prompt) - if not summary_result: - logger.debug(f"{self.log_prefix} 压缩聊天记忆失败: 没有生成摘要") - return - - print(f"compressor_prompt: {obs.compressor_prompt}") - print(f"summary_result: {summary_result}") - - # 修复并解析JSON - try: - fixed_json = repair_json(summary_result) - summary_data = json.loads(fixed_json) - - if not isinstance(summary_data, dict): - logger.error(f"{self.log_prefix} 解析压缩结果失败: 不是有效的JSON对象") - return - - theme = summary_data.get("theme", "") - content = summary_data.get("content", "") - - if not theme or not content: - logger.error(f"{self.log_prefix} 解析压缩结果失败: 缺少必要字段") - return - - # 创建新记忆 - await working_memory.add_memory(from_source="chat_compress", summary=content, brief=theme) - - logger.debug(f"{self.log_prefix} 压缩聊天记忆成功: {theme} - {content}") - - except Exception as e: - logger.error(f"{self.log_prefix} 解析压缩结果失败: {e}") - logger.error(traceback.format_exc()) - return - - # 清理压缩状态 - obs.compressor_prompt = "" - obs.oldest_messages = [] - obs.oldest_messages_str = "" - - except Exception as e: - logger.error(f"{self.log_prefix} 压缩聊天记忆失败: {e}") - logger.error(traceback.format_exc()) - - async def merge_memory_async(self, working_memory: WorkingMemory, memory_id1: str, memory_id2: str): - """异步合并记忆,不阻塞主流程 - - Args: - working_memory: 工作记忆对象 - memory_id1: 第一个记忆ID - memory_id2: 第二个记忆ID - """ - # 检查working_memory是否为None - if working_memory is None: - logger.warning(f"{self.log_prefix} 工作记忆对象为None,无法合并记忆") - return - - try: - merged_memory = await working_memory.merge_memory(memory_id1, memory_id2) - logger.debug(f"{self.log_prefix} 合并后的记忆梗概: {merged_memory.brief}") - logger.debug(f"{self.log_prefix} 合并后的记忆内容: {merged_memory.summary}") - - except Exception as e: - logger.error(f"{self.log_prefix} 异步合并记忆失败: {e}") - logger.error(traceback.format_exc()) - - -init_prompt() diff --git a/src/config/config.py b/src/config/config.py index 6bf97d005..d40679b71 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -13,11 +13,9 @@ from src.config.config_base import ConfigBase from src.config.official_configs import ( BotConfig, PersonalityConfig, - IdentityConfig, ExpressionConfig, ChatConfig, NormalChatConfig, - FocusChatConfig, EmojiConfig, MemoryConfig, MoodConfig, @@ -145,12 +143,10 @@ class Config(ConfigBase): bot: BotConfig personality: PersonalityConfig - identity: IdentityConfig relationship: RelationshipConfig chat: ChatConfig message_receive: MessageReceiveConfig normal_chat: NormalChatConfig - focus_chat: FocusChatConfig emoji: EmojiConfig expression: ExpressionConfig memory: MemoryConfig diff --git a/src/config/official_configs.py b/src/config/official_configs.py index b2cec9511..4433bae44 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -35,20 +35,15 @@ class PersonalityConfig(ConfigBase): personality_core: str """核心人格""" - personality_sides: list[str] = field(default_factory=lambda: []) + personality_side: str """人格侧写""" + + identity: str = "" + """身份特征""" compress_personality: bool = True """是否压缩人格,压缩后会精简人格信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果人设不长,可以关闭""" - -@dataclass -class IdentityConfig(ConfigBase): - """个体特征配置类""" - - identity_detail: list[str] = field(default_factory=lambda: []) - """身份特征""" - compress_identity: bool = True """是否压缩身份,压缩后会精简身份信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果不长,可以关闭""" @@ -108,11 +103,9 @@ class ChatConfig(ConfigBase): 表示从该时间开始使用该频率,直到下一个时间点 """ - auto_focus_threshold: float = 1.0 - """自动切换到专注聊天的阈值,越低越容易进入专注聊天""" + focus_value: float = 1.0 + """麦麦的专注思考能力,越低越容易专注,消耗token也越多""" - exit_focus_threshold: float = 1.0 - """自动退出专注聊天的阈值,越低越容易退出专注聊天""" def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float: """ @@ -253,7 +246,6 @@ class ChatConfig(ConfigBase): except (ValueError, IndexError): return None - @dataclass class MessageReceiveConfig(ConfigBase): """消息接收配置类""" @@ -282,12 +274,6 @@ class NormalChatConfig(ConfigBase): """@bot 必然回复""" -@dataclass -class FocusChatConfig(ConfigBase): - """专注聊天配置类""" - - consecutive_replies: float = 1 - """连续回复能力,值越高,麦麦连续回复的概率越高""" @dataclass diff --git a/src/individuality/identity.py b/src/individuality/identity.py deleted file mode 100644 index 730615e3d..000000000 --- a/src/individuality/identity.py +++ /dev/null @@ -1,30 +0,0 @@ -from dataclasses import dataclass -from typing import List, Optional - - -@dataclass -class Identity: - """身份特征类""" - - identity_detail: List[str] # 身份细节描述 - - def __init__(self, identity_detail: Optional[List[str]] = None): - """初始化身份特征 - - Args: - identity_detail: 身份细节描述列表 - """ - if identity_detail is None: - identity_detail = [] - self.identity_detail = identity_detail - - def to_dict(self) -> dict: - """将身份特征转换为字典格式""" - return { - "identity_detail": self.identity_detail, - } - - @classmethod - def from_dict(cls, data: dict) -> "Identity": - """从字典创建身份特征实例""" - return cls(identity_detail=data.get("identity_detail", [])) diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py index 47048a2bd..878e00455 100644 --- a/src/individuality/individuality.py +++ b/src/individuality/individuality.py @@ -3,7 +3,27 @@ import random import json import os import hashlib +from typing import List, Optional, Dict, Any, Tuple +from datetime import datetime +from src.common.logger import get_logger +from src.config.config import global_config +from src.llm_models.utils_model import LLMRequest +from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending +from src.chat.message_receive.chat_stream import ChatStream +from src.chat.message_receive.uni_message_sender import HeartFCSender +from src.chat.utils.timer_calculator import Timer # <--- Import Timer +from src.chat.utils.utils import get_chat_type_and_target_info +from src.chat.utils.prompt_builder import Prompt, global_prompt_manager +from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat +from src.chat.express.expression_selector import expression_selector +from src.chat.knowledge.knowledge_lib import qa_manager +from src.chat.memory_system.memory_activator import MemoryActivator +from src.mood.mood_manager import mood_manager +from src.person_info.relationship_fetcher import relationship_fetcher_manager +from src.person_info.person_info import get_person_info_manager +from src.tools.tool_executor import ToolExecutor +from src.plugin_system.base.component_types import ActionInfo from typing import Optional from rich.traceback import install @@ -12,7 +32,6 @@ from src.config.config import global_config from src.llm_models.utils_model import LLMRequest from src.person_info.person_info import get_person_info_manager from .personality import Personality -from .identity import Identity install(extra_lines=3) @@ -25,7 +44,6 @@ class Individuality: def __init__(self): # 正常初始化实例属性 self.personality: Personality = None # type: ignore - self.identity: Optional[Identity] = None self.name = "" self.bot_person_id = "" @@ -36,21 +54,20 @@ class Individuality: request_type="individuality.compress", ) - async def initialize( - self, - bot_nickname: str, - personality_core: str, - personality_sides: list, - identity_detail: list, - ) -> None: + async def initialize(self) -> None: """初始化个体特征 Args: bot_nickname: 机器人昵称 personality_core: 人格核心特点 - personality_sides: 人格侧面描述 - identity_detail: 身份细节描述 + personality_side: 人格侧面描述 + identity: 身份细节描述 """ + bot_nickname=global_config.bot.nickname + personality_core=global_config.personality.personality_core + personality_side=global_config.personality.personality_side + identity=global_config.personality.identity + logger.info("正在初始化个体特征") person_info_manager = get_person_info_manager() self.bot_person_id = person_info_manager.get_person_id("system", "bot_id") @@ -58,26 +75,28 @@ class Individuality: # 检查配置变化,如果变化则清空 personality_changed, identity_changed = await self._check_config_and_clear_if_changed( - bot_nickname, personality_core, personality_sides, identity_detail + bot_nickname, personality_core, personality_side, identity ) - # 初始化人格 + # 初始化人格(现在包含身份) self.personality = Personality.initialize( - bot_nickname=bot_nickname, personality_core=personality_core, personality_sides=personality_sides + bot_nickname=bot_nickname, + personality_core=personality_core, + personality_side=personality_side, + identity=identity, + compress_personality=global_config.personality.compress_personality, + compress_identity=global_config.personality.compress_identity, ) - # 初始化身份 - self.identity = Identity(identity_detail=identity_detail) - logger.info("正在将所有人设写入impression") # 将所有人设写入impression impression_parts = [] if personality_core: impression_parts.append(f"核心人格: {personality_core}") - if personality_sides: - impression_parts.append(f"人格侧面: {'、'.join(personality_sides)}") - if identity_detail: - impression_parts.append(f"身份: {'、'.join(identity_detail)}") + if personality_side: + impression_parts.append(f"人格侧面: {personality_side}") + if identity: + impression_parts.append(f"身份: {identity}") logger.info(f"impression_parts: {impression_parts}") impression_text = "。".join(impression_parts) @@ -103,7 +122,7 @@ class Individuality: if personality_changed: logger.info("检测到人格配置变化,重新生成压缩版本") - personality_result = await self._create_personality(personality_core, personality_sides) + personality_result = await self._create_personality(personality_core, personality_side) else: logger.info("人格配置未变化,使用缓存版本") # 从缓存中获取已有的personality结果 @@ -115,14 +134,14 @@ class Individuality: personality_result = existing_data[0] except (json.JSONDecodeError, TypeError, IndexError): logger.warning("无法解析现有的short_impression,将重新生成人格部分") - personality_result = await self._create_personality(personality_core, personality_sides) + personality_result = await self._create_personality(personality_core, personality_side) else: logger.info("未找到现有的人格缓存,重新生成") - personality_result = await self._create_personality(personality_core, personality_sides) + personality_result = await self._create_personality(personality_core, personality_side) if identity_changed: logger.info("检测到身份配置变化,重新生成压缩版本") - identity_result = await self._create_identity(identity_detail) + identity_result = await self._create_identity(identity) else: logger.info("身份配置未变化,使用缓存版本") # 从缓存中获取已有的identity结果 @@ -134,10 +153,10 @@ class Individuality: identity_result = existing_data[1] except (json.JSONDecodeError, TypeError, IndexError): logger.warning("无法解析现有的short_impression,将重新生成身份部分") - identity_result = await self._create_identity(identity_detail) + identity_result = await self._create_identity(identity) else: logger.info("未找到现有的身份缓存,重新生成") - identity_result = await self._create_identity(identity_detail) + identity_result = await self._create_identity(identity) result = [personality_result, identity_result] @@ -149,175 +168,41 @@ class Individuality: else: logger.error("人设构建失败") - def to_dict(self) -> dict: - """将个体特征转换为字典格式""" - return { - "personality": self.personality.to_dict() if self.personality else None, - "identity": self.identity.to_dict() if self.identity else None, - } - @classmethod - def from_dict(cls, data: dict) -> "Individuality": - """从字典创建个体特征实例""" - instance = cls() - if data.get("personality"): - instance.personality = Personality.from_dict(data["personality"]) - if data.get("identity"): - instance.identity = Identity.from_dict(data["identity"]) - return instance - - def get_personality_prompt(self, level: int, x_person: int = 2) -> str: - """ - 获取人格特征的prompt - - Args: - level (int): 详细程度 (1: 核心, 2: 核心+随机侧面, 3: 核心+所有侧面) - x_person (int, optional): 人称代词 (0: 无人称, 1: 我, 2: 你). 默认为 2. - - Returns: - str: 生成的人格prompt字符串 - """ - if x_person not in [0, 1, 2]: - return "无效的人称代词,请使用 0 (无人称), 1 (我) 或 2 (你)。" - if not self.personality: - return "人格特征尚未初始化。" - - if x_person == 2: - p_pronoun = "你" - prompt_personality = f"{p_pronoun}{self.personality.personality_core}" - elif x_person == 1: - p_pronoun = "我" - prompt_personality = f"{p_pronoun}{self.personality.personality_core}" - else: # x_person == 0 - # 对于无人称,直接描述核心特征 - prompt_personality = f"{self.personality.personality_core}" - - # 根据level添加人格侧面 - if level >= 2 and self.personality.personality_sides: - personality_sides = list(self.personality.personality_sides) - random.shuffle(personality_sides) - if level == 2: - prompt_personality += f",有时也会{personality_sides[0]}" - elif level == 3: - sides_str = "、".join(personality_sides) - prompt_personality += f",有时也会{sides_str}" - prompt_personality += "。" - return prompt_personality - - def get_identity_prompt(self, level: int, x_person: int = 2) -> str: - # sourcery skip: assign-if-exp, merge-else-if-into-elif - """ - 获取身份特征的prompt - - Args: - level (int): 详细程度 (1: 随机细节, 2: 所有细节, 3: 同2) - x_person (int, optional): 人称代词 (0: 无人称, 1: 我, 2: 你). 默认为 2. - - Returns: - str: 生成的身份prompt字符串 - """ - if x_person not in [0, 1, 2]: - return "无效的人称代词,请使用 0 (无人称), 1 (我) 或 2 (你)。" - if not self.identity: - return "身份特征尚未初始化。" - - if x_person == 2: - i_pronoun = "你" - elif x_person == 1: - i_pronoun = "我" - else: # x_person == 0 - i_pronoun = "" # 无人称 - - identity_parts = [] - - # 根据level添加身份细节 - if level >= 1 and self.identity.identity_detail: - identity_detail = list(self.identity.identity_detail) - random.shuffle(identity_detail) - if level == 1: - identity_parts.append(f"{identity_detail[0]}") - elif level >= 2: - details_str = "、".join(identity_detail) - identity_parts.append(f"{details_str}") - - if identity_parts: - details_str = ",".join(identity_parts) - if x_person in {1, 2}: - return f"{i_pronoun},{details_str}。" - else: # x_person == 0 - # 无人称时,直接返回细节,不加代词和开头的逗号 - return f"{details_str}。" + async def get_personality_block(self) -> str: + person_info_manager = get_person_info_manager() + bot_person_id = person_info_manager.get_person_id("system", "bot_id") + + bot_name = global_config.bot.nickname + if global_config.bot.alias_names: + bot_nickname = f",也有人叫你{','.join(global_config.bot.alias_names)}" else: - if x_person in {1, 2}: - return f"{i_pronoun}的身份信息不完整。" - else: # x_person == 0 - return "身份信息不完整。" + bot_nickname = "" + short_impression = await person_info_manager.get_value(bot_person_id, "short_impression") + # 解析字符串形式的Python列表 + try: + if isinstance(short_impression, str) and short_impression.strip(): + short_impression = ast.literal_eval(short_impression) + elif not short_impression: + logger.warning("short_impression为空,使用默认值") + short_impression = ["友好活泼", "人类"] + except (ValueError, SyntaxError) as e: + logger.error(f"解析short_impression失败: {e}, 原始值: {short_impression}") + short_impression = ["友好活泼", "人类"] + # 确保short_impression是列表格式且有足够的元素 + if not isinstance(short_impression, list) or len(short_impression) < 2: + logger.warning(f"short_impression格式不正确: {short_impression}, 使用默认值") + short_impression = ["友好活泼", "人类"] + personality = short_impression[0] + identity = short_impression[1] + prompt_personality = f"{personality},{identity}" + identity_block = f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}:" + + return identity_block - def get_prompt(self, level: int, x_person: int = 2) -> str: - """ - 获取合并的个体特征prompt - - Args: - level (int): 详细程度 (1: 核心/随机细节, 2: 核心+随机侧面/全部细节, 3: 全部) - x_person (int, optional): 人称代词 (0: 无人称, 1: 我, 2: 你). 默认为 2. - - Returns: - str: 生成的合并prompt字符串 - """ - if x_person not in [0, 1, 2]: - return "无效的人称代词,请使用 0 (无人称), 1 (我) 或 2 (你)。" - - if not self.personality or not self.identity: - return "个体特征尚未完全初始化。" - - # 调用新的独立方法 - prompt_personality = self.get_personality_prompt(level, x_person) - prompt_identity = self.get_identity_prompt(level, x_person) - - # 移除可能存在的错误信息,只合并有效的 prompt - valid_prompts = [] - if "尚未初始化" not in prompt_personality and "无效的人称" not in prompt_personality: - valid_prompts.append(prompt_personality) - if ( - "尚未初始化" not in prompt_identity - and "无效的人称" not in prompt_identity - and "信息不完整" not in prompt_identity - ): - # 从身份 prompt 中移除代词和句号,以便更好地合并 - identity_content = prompt_identity - if x_person == 2 and identity_content.startswith("你,"): - identity_content = identity_content[2:] - elif x_person == 1 and identity_content.startswith("我,"): - identity_content = identity_content[2:] - # 对于 x_person == 0,身份提示不带前缀,无需移除 - - if identity_content.endswith("。"): - identity_content = identity_content[:-1] - valid_prompts.append(identity_content) - - # --- 合并 Prompt --- - final_prompt = " ".join(valid_prompts) - - return final_prompt.strip() - - def get_traits(self, factor): - """ - 获取个体特征的特质 - """ - if factor == "openness": - return self.personality.openness - elif factor == "conscientiousness": - return self.personality.conscientiousness - elif factor == "extraversion": - return self.personality.extraversion - elif factor == "agreeableness": - return self.personality.agreeableness - elif factor == "neuroticism": - return self.personality.neuroticism - return None def _get_config_hash( - self, bot_nickname: str, personality_core: str, personality_sides: list, identity_detail: list + self, bot_nickname: str, personality_core: str, personality_side: str, identity: list ) -> tuple[str, str]: """获取personality和identity配置的哈希值 @@ -328,16 +213,16 @@ class Individuality: personality_config = { "nickname": bot_nickname, "personality_core": personality_core, - "personality_sides": sorted(personality_sides), - "compress_personality": global_config.personality.compress_personality, + "personality_side": personality_side, + "compress_personality": self.personality.compress_personality if self.personality else True, } personality_str = json.dumps(personality_config, sort_keys=True) personality_hash = hashlib.md5(personality_str.encode("utf-8")).hexdigest() # 身份配置哈希 identity_config = { - "identity_detail": sorted(identity_detail), - "compress_identity": global_config.identity.compress_identity, + "identity": sorted(identity), + "compress_identity": self.personality.compress_identity if self.personality else True, } identity_str = json.dumps(identity_config, sort_keys=True) identity_hash = hashlib.md5(identity_str.encode("utf-8")).hexdigest() @@ -345,7 +230,7 @@ class Individuality: return personality_hash, identity_hash async def _check_config_and_clear_if_changed( - self, bot_nickname: str, personality_core: str, personality_sides: list, identity_detail: list + self, bot_nickname: str, personality_core: str, personality_side: str, identity: list ) -> tuple[bool, bool]: """检查配置是否发生变化,如果变化则清空相应缓存 @@ -354,7 +239,7 @@ class Individuality: """ person_info_manager = get_person_info_manager() current_personality_hash, current_identity_hash = self._get_config_hash( - bot_nickname, personality_core, personality_sides, identity_detail + bot_nickname, personality_core, personality_side, identity ) meta_info = self._load_meta_info() @@ -410,54 +295,14 @@ class Individuality: except IOError as e: logger.error(f"保存meta_info文件失败: {e}") - async def get_keyword_info(self, keyword: str) -> str: - """获取指定关键词的信息 - Args: - keyword: 关键词 - - Returns: - str: 随机选择的一条信息,如果没有则返回空字符串 - """ - person_info_manager = get_person_info_manager() - info_list_json = await person_info_manager.get_value(self.bot_person_id, "info_list") - if info_list_json: - try: - # get_value might return a pre-deserialized list if it comes from a cache, - # or a JSON string if it comes from DB. - info_list = json.loads(info_list_json) if isinstance(info_list_json, str) else info_list_json - - for item in info_list: - if isinstance(item, dict) and item.get("info_type") == keyword: - return item.get("info_content", "") - except (json.JSONDecodeError, TypeError): - logger.error(f"解析info_list失败: {info_list_json}") - return "" - return "" - - async def get_all_keywords(self) -> list: - """获取所有已缓存的关键词列表""" - person_info_manager = get_person_info_manager() - info_list_json = await person_info_manager.get_value(self.bot_person_id, "info_list") - keywords = [] - if info_list_json: - try: - info_list = json.loads(info_list_json) if isinstance(info_list_json, str) else info_list_json - keywords.extend( - item["info_type"] for item in info_list if isinstance(item, dict) and "info_type" in item - ) - except (json.JSONDecodeError, TypeError): - logger.error(f"解析info_list失败: {info_list_json}") - return keywords - - async def _create_personality(self, personality_core: str, personality_sides: list) -> str: + async def _create_personality(self, personality_core: str, personality_side: str) -> str: # sourcery skip: merge-list-append, move-assign """使用LLM创建压缩版本的impression Args: personality_core: 核心人格 - personality_sides: 人格侧面列表 - identity_detail: 身份细节列表 + personality_side: 人格侧面列表 Returns: str: 压缩后的impression文本 @@ -470,12 +315,10 @@ class Individuality: personality_parts.append(f"{personality_core}") # 准备需要压缩的内容 - if global_config.personality.compress_personality: - personality_to_compress = [] - if personality_sides: - personality_to_compress.append(f"人格特质: {'、'.join(personality_sides)}") + if self.personality.compress_personality: + personality_to_compress = f"人格特质: {personality_side}" - prompt = f"""请将以下人格信息进行简洁压缩,保留主要内容,用简练的中文表达: + prompt = f"""请将以下人格信息进行简洁压缩,保留主要内容,用简练的中文表达: {personality_to_compress} 要求: @@ -483,34 +326,32 @@ class Individuality: 2. 尽量简洁,不超过30字 3. 直接输出压缩后的内容,不要解释""" - response, (_, _) = await self.model.generate_response_async( - prompt=prompt, - ) + response, (_, _) = await self.model.generate_response_async( + prompt=prompt, + ) - if response.strip(): - personality_parts.append(response.strip()) - logger.info(f"精简人格侧面: {response.strip()}") - else: - logger.error(f"使用LLM压缩人设时出错: {response}") - if personality_parts: - personality_result = "。".join(personality_parts) - else: - personality_result = personality_core + if response.strip(): + personality_parts.append(response.strip()) + logger.info(f"精简人格侧面: {response.strip()}") + else: + logger.error(f"使用LLM压缩人设时出错: {response}") + if personality_parts: + personality_result = "。".join(personality_parts) + else: + personality_result = personality_core else: personality_result = personality_core - if personality_sides: - personality_result += ",".join(personality_sides) + if personality_side: + personality_result += f",{personality_side}" return personality_result - async def _create_identity(self, identity_detail: list) -> str: + async def _create_identity(self, identity: list) -> str: """使用LLM创建压缩版本的impression""" logger.info("正在构建身份.........") - if global_config.identity.compress_identity: - identity_to_compress = [] - if identity_detail: - identity_to_compress.append(f"身份背景: {'、'.join(identity_detail)}") + if self.personality.compress_identity: + identity_to_compress = f"身份背景: {identity}" prompt = f"""请将以下身份信息进行简洁压缩,保留主要内容,用简练的中文表达: {identity_to_compress} @@ -530,7 +371,7 @@ class Individuality: else: logger.error(f"使用LLM压缩身份时出错: {response}") else: - identity_result = "。".join(identity_detail) + identity_result = "。".join(identity) return identity_result diff --git a/src/individuality/not_using/per_bf_gen.py b/src/individuality/not_using/per_bf_gen.py index 2d0961cb1..3b66d0551 100644 --- a/src/individuality/not_using/per_bf_gen.py +++ b/src/individuality/not_using/per_bf_gen.py @@ -33,10 +33,10 @@ else: def adapt_scene(scene: str) -> str: personality_core = config["personality"]["personality_core"] - personality_sides = config["personality"]["personality_sides"] - personality_side = random.choice(personality_sides) - identity_details = config["identity"]["identity_detail"] - identity_detail = random.choice(identity_details) + personality_side = config["personality"]["personality_side"] + personality_side = random.choice(personality_side) + identitys = config["identity"]["identity"] + identity = random.choice(identitys) """ 根据config中的属性,改编场景使其更适合当前角色 @@ -56,7 +56,7 @@ def adapt_scene(scene: str) -> str: - 外貌: {config["identity"]["appearance"]} - 性格核心: {personality_core} - 性格侧面: {personality_side} -- 身份细节: {identity_detail} +- 身份细节: {identity} 请根据上述形象,改编以下场景,在测评中,用户将根据该场景给出上述角色形象的反应: {scene} @@ -180,8 +180,8 @@ class PersonalityEvaluatorDirect: print("\n角色基本信息:") print(f"- 昵称:{config['bot']['nickname']}") print(f"- 性格核心:{config['personality']['personality_core']}") - print(f"- 性格侧面:{config['personality']['personality_sides']}") - print(f"- 身份细节:{config['identity']['identity_detail']}") + print(f"- 性格侧面:{config['personality']['personality_side']}") + print(f"- 身份细节:{config['identity']['identity']}") print("\n准备好了吗?按回车键开始...") input() @@ -262,8 +262,8 @@ class PersonalityEvaluatorDirect: "weight": config["identity"]["weight"], "appearance": config["identity"]["appearance"], "personality_core": config["personality"]["personality_core"], - "personality_sides": config["personality"]["personality_sides"], - "identity_detail": config["identity"]["identity_detail"], + "personality_side": config["personality"]["personality_side"], + "identity": config["identity"]["identity"], }, } diff --git a/src/individuality/personality.py b/src/individuality/personality.py index ace719331..5d666101e 100644 --- a/src/individuality/personality.py +++ b/src/individuality/personality.py @@ -9,14 +9,12 @@ from pathlib import Path class Personality: """人格特质类""" - openness: float # 开放性 - conscientiousness: float # 尽责性 - extraversion: float # 外向性 - agreeableness: float # 宜人性 - neuroticism: float # 神经质 bot_nickname: str # 机器人昵称 personality_core: str # 人格核心特点 - personality_sides: List[str] # 人格侧面描述 + personality_side: str # 人格侧面描述 + identity: List[str] # 身份细节描述 + compress_personality: bool # 是否压缩人格 + compress_identity: bool # 是否压缩身份 _instance = None @@ -25,11 +23,12 @@ class Personality: cls._instance = super().__new__(cls) return cls._instance - def __init__(self, personality_core: str = "", personality_sides: Optional[List[str]] = None): - if personality_sides is None: - personality_sides = [] + def __init__(self, personality_core: str = "", personality_side: str = "", identity: List[str] = None): self.personality_core = personality_core - self.personality_sides = personality_sides + self.personality_side = personality_side + self.identity = identity + self.compress_personality = True + self.compress_identity = True @classmethod def get_instance(cls) -> "Personality": @@ -42,51 +41,17 @@ class Personality: cls._instance = cls() return cls._instance - def _init_big_five_personality(self): # sourcery skip: extract-method - """初始化大五人格特质""" - # 构建文件路径 - personality_file = Path("data/personality") / f"{self.bot_nickname}_personality.per" - - # 如果文件存在,读取文件 - if personality_file.exists(): - with open(personality_file, "r", encoding="utf-8") as f: - personality_data = json.load(f) - self.openness = personality_data.get("openness", 0.5) - self.conscientiousness = personality_data.get("conscientiousness", 0.5) - self.extraversion = personality_data.get("extraversion", 0.5) - self.agreeableness = personality_data.get("agreeableness", 0.5) - self.neuroticism = personality_data.get("neuroticism", 0.5) - else: - # 如果文件不存在,根据personality_core和personality_core来设置大五人格特质 - if "活泼" in self.personality_core or "开朗" in self.personality_sides: - self.extraversion = 0.8 - self.neuroticism = 0.2 - else: - self.extraversion = 0.3 - self.neuroticism = 0.5 - if "认真" in self.personality_core or "负责" in self.personality_sides: - self.conscientiousness = 0.9 - else: - self.conscientiousness = 0.5 - - if "友善" in self.personality_core or "温柔" in self.personality_sides: - self.agreeableness = 0.9 - else: - self.agreeableness = 0.5 - - if "创新" in self.personality_core or "开放" in self.personality_sides: - self.openness = 0.8 - else: - self.openness = 0.5 - @classmethod - def initialize(cls, bot_nickname: str, personality_core: str, personality_sides: List[str]) -> "Personality": + def initialize(cls, bot_nickname: str, personality_core: str, personality_side: str, identity: List[str] = None, compress_personality: bool = True, compress_identity: bool = True) -> "Personality": """初始化人格特质 Args: bot_nickname: 机器人昵称 personality_core: 人格核心特点 - personality_sides: 人格侧面描述 + personality_side: 人格侧面描述 + identity: 身份细节描述 + compress_personality: 是否压缩人格 + compress_identity: 是否压缩身份 Returns: Personality: 初始化后的人格特质实例 @@ -94,21 +59,21 @@ class Personality: instance = cls.get_instance() instance.bot_nickname = bot_nickname instance.personality_core = personality_core - instance.personality_sides = personality_sides - instance._init_big_five_personality() + instance.personality_side = personality_side + instance.identity = identity + instance.compress_personality = compress_personality + instance.compress_identity = compress_identity return instance def to_dict(self) -> Dict: """将人格特质转换为字典格式""" return { - "openness": self.openness, - "conscientiousness": self.conscientiousness, - "extraversion": self.extraversion, - "agreeableness": self.agreeableness, - "neuroticism": self.neuroticism, "bot_nickname": self.bot_nickname, "personality_core": self.personality_core, - "personality_sides": self.personality_sides, + "personality_side": self.personality_side, + "identity": self.identity, + "compress_personality": self.compress_personality, + "compress_identity": self.compress_identity, } @classmethod diff --git a/src/main.py b/src/main.py index 0e85f6945..e97054477 100644 --- a/src/main.py +++ b/src/main.py @@ -116,12 +116,7 @@ class MainSystem: self.app.register_message_handler(chat_bot.message_process) # 初始化个体特征 - await self.individuality.initialize( - bot_nickname=global_config.bot.nickname, - personality_core=global_config.personality.personality_core, - personality_sides=global_config.personality.personality_sides, - identity_detail=global_config.identity.identity_detail, - ) + await self.individuality.initialize() logger.info("个体特征初始化成功") try: diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 923ad49f8..d6d13017c 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "4.0.2" +version = "4.1.1" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请在修改后将version的值进行变更 @@ -21,25 +21,12 @@ alias_names = ["麦叠", "牢麦"] # 麦麦的别名 # 建议50字以内,描述人格的核心特质 personality_core = "是一个积极向上的女大学生" # 人格的细节,可以描述人格的一些侧面,条数任意,不能为0,不宜太多 -personality_sides = [ - "用一句话或几句话描述人格的一些侧面", - "用一句话或几句话描述人格的一些侧面", - "用一句话或几句话描述人格的一些侧面", -] - -compress_personality = false # 是否压缩人格,压缩后会精简人格信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果人设不长,可以关闭 - - -[identity] +personality_side = "用一句话或几句话描述人格的侧面特质" #アイデンティティがない 生まれないらららら # 可以描述外貌,性别,身高,职业,属性等等描述,条数任意,不能为0 -identity_detail = [ - "年龄为19岁", - "是女孩子", - "身高为160cm", - "有橙色的短发", -] +identity = "年龄为19岁,是女孩子,身高为160cm,有黑色的短发" +compress_personality = false # 是否压缩人格,压缩后会精简人格信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果人设不长,可以关闭 compress_identity = true # 是否压缩身份,压缩后会精简身份信息,节省token消耗并提高回复性能,但是会丢失一些信息,如果不长,可以关闭 [expression] @@ -62,18 +49,18 @@ enable_relationship = true # 是否启用关系系统 relation_frequency = 1 # 关系频率,麦麦构建关系的频率 [chat] #麦麦的聊天通用设置 -auto_focus_threshold = 1 # 自动切换到专注聊天的阈值,越低越容易进入专注聊天 -exit_focus_threshold = 1 # 自动退出专注聊天的阈值,越低越容易退出专注聊天 -# 普通模式下,麦麦会针对感兴趣的消息进行回复,token消耗量较低 -# 专注模式下,麦麦会进行主动的观察,并给出回复,token消耗量略高,但是回复时机更准确 -# 自动模式下,麦麦会根据消息内容自动切换到专注模式或普通模式 +focus_value = 1 +# 麦麦的专注思考能力,越低越容易专注,消耗token也越多 +# 专注时能更好把握发言时机,能够进行持久的连续对话 max_context_size = 25 # 上下文长度 thinking_timeout = 20 # 麦麦一次回复最长思考规划时间,超过这个时间的思考会放弃(往往是api反应太慢) replyer_random_probability = 0.5 # 首要replyer模型被选择的概率 +use_s4u_prompt_mode = false # 是否使用 s4u 对话构建模式,该模式会更好的把握当前对话对象的对话内容,但是对群聊整理理解能力较差(测试功能!!可能有未知问题!!) + + talk_frequency = 1 # 麦麦回复频率,越高,麦麦回复越频繁 -use_s4u_prompt_mode = false # 是否使用 s4u 对话构建模式,该模式会更好的把握当前对话对象的对话内容,但是对群聊整理理解能力较差 time_based_talk_frequency = ["8:00,1", "12:00,1.5", "18:00,2", "01:00,0.5"] # 基于时段的回复频率配置(可选) @@ -87,7 +74,6 @@ talk_frequency_adjust = [ ["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], ["qq:1919810:private", "8:20,1", "12:10,2", "20:10,1.5", "00:10,0.2"] ] - # 基于聊天流的个性化时段频率配置(可选) # 格式:talk_frequency_adjust = [["platform:id:type", "HH:MM,frequency", ...], ...] # 说明: @@ -120,9 +106,6 @@ response_interested_rate_amplifier = 1 # 麦麦回复兴趣度放大系数 mentioned_bot_inevitable_reply = true # 提及 bot 必然回复 at_bot_inevitable_reply = true # @bot 必然回复(包含提及) -[focus_chat] #专注聊天 -consecutive_replies = 1 # 连续回复能力,值越高,麦麦连续回复的概率越高 - [tool] enable_in_normal_chat = false # 是否在普通聊天中启用工具 enable_in_focus_chat = true # 是否在专注聊天中启用工具