From 72d011f699c012dbec1085e7d3d7ba2b12059d33 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Thu, 5 Jun 2025 16:15:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E4=BC=98=E5=8C=96=E8=A1=A8=E8=BE=BE?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E5=AD=A6=E4=B9=A0=EF=BC=8C=E5=A4=AA=E4=B9=85?= =?UTF-8?q?=E6=B2=A1=E5=AD=A6=E7=9A=84=E4=BC=9A=E6=8A=9B=E5=BC=83=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E4=BE=9B=E6=A3=80=E6=9F=A5=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/analyze_expressions.py | 203 ++++++++++++++++++ .../expressors/exprssion_learner.py | 92 ++++++-- .../info_processors/relationship_processor.py | 2 +- src/individuality/expression_style.py | 46 ++-- src/person_info/impression_update_task.py | 6 +- src/person_info/relationship_manager.py | 9 +- 6 files changed, 316 insertions(+), 42 deletions(-) create mode 100644 scripts/analyze_expressions.py diff --git a/scripts/analyze_expressions.py b/scripts/analyze_expressions.py new file mode 100644 index 000000000..62e4480ba --- /dev/null +++ b/scripts/analyze_expressions.py @@ -0,0 +1,203 @@ +import os +import json +import time +import re +from datetime import datetime +from typing import Dict, List, Any +import pandas as pd +from pathlib import Path +import sqlite3 + +def clean_group_name(name: str) -> str: + """清理群组名称,只保留中文和英文字符""" + # 提取中文和英文字符 + cleaned = re.sub(r'[^\u4e00-\u9fa5a-zA-Z]', '', name) + # 如果清理后为空,使用当前日期 + if not cleaned: + cleaned = datetime.now().strftime("%Y%m%d") + return cleaned + +def get_group_name(stream_id: str) -> str: + """从数据库中获取群组名称""" + conn = sqlite3.connect("data/maibot.db") + cursor = conn.cursor() + + cursor.execute( + """ + SELECT group_name, user_nickname, platform + FROM chat_streams + WHERE stream_id = ? + """, + (stream_id,), + ) + + result = cursor.fetchone() + conn.close() + + if result: + group_name, user_nickname, platform = result + if group_name: + return clean_group_name(group_name) + if user_nickname: + return clean_group_name(user_nickname) + if platform: + return clean_group_name(f"{platform}{stream_id[:8]}") + return stream_id + +def load_expressions(chat_id: str) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + """加载指定群组的表达方式""" + learnt_style_file = os.path.join("data", "expression", "learnt_style", str(chat_id), "expressions.json") + learnt_grammar_file = os.path.join("data", "expression", "learnt_grammar", str(chat_id), "expressions.json") + personality_file = os.path.join("data", "expression", "personality", "expressions.json") + + style_expressions = [] + grammar_expressions = [] + personality_expressions = [] + + if os.path.exists(learnt_style_file): + with open(learnt_style_file, "r", encoding="utf-8") as f: + style_expressions = json.load(f) + + if os.path.exists(learnt_grammar_file): + with open(learnt_grammar_file, "r", encoding="utf-8") as f: + grammar_expressions = json.load(f) + + if os.path.exists(personality_file): + with open(personality_file, "r", encoding="utf-8") as f: + personality_expressions = json.load(f) + + return style_expressions, grammar_expressions, personality_expressions + +def format_time(timestamp: float) -> str: + """格式化时间戳为可读字符串""" + return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") + +def write_expressions(f, expressions: List[Dict[str, Any]], title: str): + """写入表达方式列表""" + if not expressions: + f.write(f"{title}:暂无数据\n") + f.write("-" * 40 + "\n") + return + + f.write(f"{title}:\n") + for expr in expressions: + count = expr.get("count", 0) + last_active = expr.get("last_active_time", time.time()) + f.write(f"场景: {expr['situation']}\n") + f.write(f"表达: {expr['style']}\n") + f.write(f"计数: {count:.2f}\n") + f.write(f"最后活跃: {format_time(last_active)}\n") + f.write("-" * 40 + "\n") + +def write_group_report(group_file: str, group_name: str, chat_id: str, style_exprs: List[Dict[str, Any]], grammar_exprs: List[Dict[str, Any]]): + """写入群组详细报告""" + with open(group_file, "w", encoding="utf-8") as gf: + gf.write(f"群组: {group_name} (ID: {chat_id})\n") + gf.write("=" * 80 + "\n\n") + + # 写入语言风格 + gf.write("【语言风格】\n") + gf.write("=" * 40 + "\n") + write_expressions(gf, style_exprs, "语言风格") + gf.write("\n") + + # 写入句法特点 + gf.write("【句法特点】\n") + gf.write("=" * 40 + "\n") + write_expressions(gf, grammar_exprs, "句法特点") + +def analyze_expressions(): + """分析所有群组的表达方式""" + # 获取所有群组ID + style_dir = os.path.join("data", "expression", "learnt_style") + chat_ids = [d for d in os.listdir(style_dir) if os.path.isdir(os.path.join(style_dir, d))] + + # 创建输出目录 + output_dir = "data/expression_analysis" + personality_dir = os.path.join(output_dir, "personality") + os.makedirs(output_dir, exist_ok=True) + os.makedirs(personality_dir, exist_ok=True) + + # 生成时间戳 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # 创建总报告 + summary_file = os.path.join(output_dir, f"summary_{timestamp}.txt") + with open(summary_file, "w", encoding="utf-8") as f: + f.write(f"表达方式分析报告 - 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write("=" * 80 + "\n\n") + + # 先处理人格表达 + personality_exprs = [] + personality_file = os.path.join("data", "expression", "personality", "expressions.json") + if os.path.exists(personality_file): + with open(personality_file, "r", encoding="utf-8") as pf: + personality_exprs = json.load(pf) + + # 保存人格表达总数 + total_personality = len(personality_exprs) + + # 排序并取前20条 + personality_exprs.sort(key=lambda x: x.get("count", 0), reverse=True) + personality_exprs = personality_exprs[:20] + + # 写入人格表达报告 + personality_report = os.path.join(personality_dir, f"expressions_{timestamp}.txt") + with open(personality_report, "w", encoding="utf-8") as pf: + pf.write("【人格表达方式】\n") + pf.write("=" * 40 + "\n") + write_expressions(pf, personality_exprs, "人格表达") + + # 写入总报告摘要中的人格表达部分 + f.write("【人格表达方式】\n") + f.write("=" * 40 + "\n") + f.write(f"人格表达总数: {total_personality} (显示前20条)\n") + f.write(f"详细报告: {personality_report}\n") + f.write("-" * 40 + "\n\n") + + # 处理各个群组的表达方式 + f.write("【群组表达方式】\n") + f.write("=" * 40 + "\n\n") + + for chat_id in chat_ids: + style_exprs, grammar_exprs, _ = load_expressions(chat_id) + + # 保存总数 + total_style = len(style_exprs) + total_grammar = len(grammar_exprs) + + # 分别排序 + style_exprs.sort(key=lambda x: x.get("count", 0), reverse=True) + grammar_exprs.sort(key=lambda x: x.get("count", 0), reverse=True) + + # 只取前20条 + style_exprs = style_exprs[:20] + grammar_exprs = grammar_exprs[:20] + + # 获取群组名称 + group_name = get_group_name(chat_id) + + # 创建群组子目录(使用清理后的名称) + safe_group_name = clean_group_name(group_name) + group_dir = os.path.join(output_dir, f"{safe_group_name}_{chat_id}") + os.makedirs(group_dir, exist_ok=True) + + # 写入群组详细报告 + group_file = os.path.join(group_dir, f"expressions_{timestamp}.txt") + write_group_report(group_file, group_name, chat_id, style_exprs, grammar_exprs) + + # 写入总报告摘要 + f.write(f"群组: {group_name} (ID: {chat_id})\n") + f.write("-" * 40 + "\n") + f.write(f"语言风格总数: {total_style} (显示前20条)\n") + f.write(f"句法特点总数: {total_grammar} (显示前20条)\n") + f.write(f"详细报告: {group_file}\n") + f.write("-" * 40 + "\n\n") + + print(f"分析报告已生成:") + print(f"总报告: {summary_file}") + print(f"人格表达报告: {personality_report}") + print(f"各群组详细报告位于: {output_dir}") + +if __name__ == "__main__": + analyze_expressions() \ No newline at end of file diff --git a/src/chat/focus_chat/expressors/exprssion_learner.py b/src/chat/focus_chat/expressors/exprssion_learner.py index d7da9af11..2cc1b0b34 100644 --- a/src/chat/focus_chat/expressors/exprssion_learner.py +++ b/src/chat/focus_chat/expressors/exprssion_learner.py @@ -12,6 +12,8 @@ import json MAX_EXPRESSION_COUNT = 300 +DECAY_DAYS = 30 # 30天衰减到0.01 +DECAY_MIN = 0.01 # 最小衰减值 logger = get_logger("expressor") @@ -30,9 +32,10 @@ def init_prompt() -> None: 当"xxx"时,可以"xxx", xxx不超过10个字 例如: -当"表示十分惊叹"时,使用"我嘞个xxxx" +当"表示十分惊叹,有些意外"时,使用"我嘞个xxxx" 当"表示讽刺的赞同,不想讲道理"时,使用"对对对" -当"想说明某个观点,但懒得明说",使用"懂的都懂" +当"想说明某个观点,但懒得明说,或者不便明说",使用"懂的都懂" +当"表示意外的夸赞,略带戏谑意味"时,使用"这么强!" 注意不要总结你自己(SELF)的发言 现在请你概括 @@ -109,16 +112,62 @@ class ExpressionLearner: """ 学习并存储表达方式,分别学习语言风格和句法特点 """ - learnt_style: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="style", num=15) - if not learnt_style: - return [] + for i in range(3): + learnt_style: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="style", num=15) + if not learnt_style: + return [] - learnt_grammar: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="grammar", num=15) - if not learnt_grammar: - return [] + for i in range(1): + learnt_grammar: Optional[List[Tuple[str, str, str]]] = await self.learn_and_store(type="grammar", num=15) + if not learnt_grammar: + return [] return learnt_style, learnt_grammar + def calculate_decay_factor(self, time_diff_days: float) -> float: + """ + 计算衰减因子 + 当时间差为0天或30天时,衰减值为0.01 + 当时间差为7天时,衰减值为1.0 + 使用二次函数进行曲线插值 + """ + if time_diff_days <= 0 or time_diff_days >= DECAY_DAYS: + return DECAY_MIN + + # 使用二次函数进行插值 + # 将7天作为顶点,0天和30天作为两个端点 + # 使用顶点式:y = a(x-h)^2 + k,其中(h,k)为顶点 + h = 7.0 # 顶点x坐标 + k = 1.0 # 顶点y坐标 + + # 计算a值,使得x=0和x=30时y=0.01 + # 0.01 = a(0-7)^2 + 1 + # 0.01 = a(30-7)^2 + 1 + # 解得a = -0.99/49 + a = -0.99 / 49 + + # 计算衰减因子 + decay = a * (time_diff_days - h) ** 2 + k + return max(DECAY_MIN, min(1.0, decay)) + + def apply_decay_to_expressions(self, expressions: List[Dict[str, Any]], current_time: float) -> List[Dict[str, Any]]: + """ + 对表达式列表应用衰减 + 返回衰减后的表达式列表,移除count小于0的项 + """ + result = [] + for expr in expressions: + last_active = expr.get("last_active_time", current_time) + time_diff_days = (current_time - last_active) / (24 * 3600) # 转换为天 + + decay_factor = self.calculate_decay_factor(time_diff_days) + expr["count"] = expr.get("count", 1) * decay_factor + + if expr["count"] > 0: + result.append(expr) + + return result + async def learn_and_store(self, type: str, num: int = 10) -> List[Tuple[str, str, str]]: """ 选择从当前到最近1小时内的随机num条消息,然后学习这些消息的表达方式 @@ -130,7 +179,7 @@ class ExpressionLearner: type_str = "句法特点" else: raise ValueError(f"Invalid type: {type}") - # logger.info(f"开始学习{type_str}...") + res = await self.learn_expression(type, num) if res is None: @@ -146,7 +195,6 @@ class ExpressionLearner: for _chat_id, situation, style in learnt_expressions: learnt_expressions_str += f"{situation}->{style}\n" logger.info(f"在 {group_name} 学习到{type_str}:\n{learnt_expressions_str}") - # learnt_expressions: List[(chat_id, situation, style)] if not learnt_expressions: logger.info(f"没有学习到{type_str}") @@ -158,29 +206,27 @@ class ExpressionLearner: if chat_id not in chat_dict: chat_dict[chat_id] = [] chat_dict[chat_id].append({"situation": situation, "style": style}) + + current_time = time.time() + # 存储到/data/expression/对应chat_id/expressions.json for chat_id, expr_list in chat_dict.items(): dir_path = os.path.join("data", "expression", f"learnt_{type}", str(chat_id)) os.makedirs(dir_path, exist_ok=True) file_path = os.path.join(dir_path, "expressions.json") + # 若已存在,先读出合并 + old_data: List[Dict[str, Any]] = [] if os.path.exists(file_path): - old_data: List[Dict[str, str, str]] = [] try: with open(file_path, "r", encoding="utf-8") as f: old_data = json.load(f) except Exception: old_data = [] - else: - old_data = [] - # 超过最大数量时,20%概率移除count=1的项 - if len(old_data) >= MAX_EXPRESSION_COUNT: - new_old_data = [] - for item in old_data: - if item.get("count", 1) == 1 and random.random() < 0.2: - continue # 20%概率移除 - new_old_data.append(item) - old_data = new_old_data + + # 应用衰减 + old_data = self.apply_decay_to_expressions(old_data, current_time) + # 合并逻辑 for new_expr in expr_list: found = False @@ -194,12 +240,16 @@ class ExpressionLearner: old_expr["situation"] = new_expr["situation"] old_expr["style"] = new_expr["style"] old_expr["count"] = old_expr.get("count", 1) + 1 + old_expr["last_active_time"] = current_time break if not found: new_expr["count"] = 1 + new_expr["last_active_time"] = current_time old_data.append(new_expr) + with open(file_path, "w", encoding="utf-8") as f: json.dump(old_data, f, ensure_ascii=False, indent=2) + return learnt_expressions async def learn_expression(self, type: str, num: int = 10) -> Optional[Tuple[List[Tuple[str, str, str]], str]]: diff --git a/src/chat/focus_chat/info_processors/relationship_processor.py b/src/chat/focus_chat/info_processors/relationship_processor.py index 19c31f99c..d0614ad2e 100644 --- a/src/chat/focus_chat/info_processors/relationship_processor.py +++ b/src/chat/focus_chat/info_processors/relationship_processor.py @@ -49,7 +49,7 @@ class RelationshipProcessor(BaseProcessor): self.llm_model = LLMRequest( model=global_config.model.relation, max_tokens=800, - request_type="focus.processor.self_identify", + request_type="relation", ) name = chat_manager.get_stream_name(self.subheartflow_id) diff --git a/src/individuality/expression_style.py b/src/individuality/expression_style.py index fe81eac5b..20d20bfcc 100644 --- a/src/individuality/expression_style.py +++ b/src/individuality/expression_style.py @@ -7,15 +7,18 @@ from typing import List, Tuple import os import json from datetime import datetime +from src.individuality.individuality import individuality logger = get_logger("expressor") def init_prompt() -> None: personality_expression_prompt = """ -{personality} +你的人物设定:{personality} -请从以上人设中总结出这个角色可能的语言风格,你必须严格根据人设引申,不要输出例子 +你说话的表达方式:{expression_style} + +请从以上表达方式中总结出这个角色可能的语言风格,你必须严格根据人设引申,不要输出例子 思考回复的特殊内容和情感 思考有没有特殊的梗,一并总结成语言风格 总结成如下格式的规律,总结的内容要详细,但具有概括性: @@ -80,19 +83,27 @@ class PersonalityExpression: """ 检查data/expression/personality目录,不存在则创建。 用peronality变量作为chat_str,调用LLM生成表达风格,解析后count=100,存储到expressions.json。 - 如果expression_style发生变化,则删除旧的expressions.json并重置计数。 + 如果expression_style、personality或identity发生变化,则删除旧的expressions.json并重置计数。 对于相同的expression_style,最多计算self.max_calculations次。 """ os.makedirs(os.path.dirname(self.expressions_file_path), exist_ok=True) current_style_text = global_config.expression.expression_style + current_personality = individuality.get_personality_prompt(x_person=2, level=2) + current_identity = individuality.get_identity_prompt(x_person=2, level=2) + meta_data = self._read_meta_data() last_style_text = meta_data.get("last_style_text") + last_personality = meta_data.get("last_personality") + last_identity = meta_data.get("last_identity") count = meta_data.get("count", 0) - if current_style_text != last_style_text: - logger.info(f"表达风格已从 '{last_style_text}' 变为 '{current_style_text}'。重置计数。") + # 检查是否有任何变化 + if (current_style_text != last_style_text or + current_personality != last_personality or + current_identity != last_identity): + logger.info(f"检测到变化:\n风格: '{last_style_text}' -> '{current_style_text}'\n人格: '{last_personality}' -> '{current_personality}'\n身份: '{last_identity}' -> '{current_identity}'") count = 0 if os.path.exists(self.expressions_file_path): try: @@ -102,11 +113,13 @@ class PersonalityExpression: logger.error(f"删除旧的表达文件 {self.expressions_file_path} 失败: {e}") if count >= self.max_calculations: - logger.debug(f"对于风格 '{current_style_text}' 已达到最大计算次数 ({self.max_calculations})。跳过提取。") - # 即使跳过,也更新元数据以反映当前风格已被识别且计数已满 + logger.debug(f"对于当前配置已达到最大计算次数 ({self.max_calculations})。跳过提取。") + # 即使跳过,也更新元数据以反映当前配置已被识别且计数已满 self._write_meta_data( { "last_style_text": current_style_text, + "last_personality": current_personality, + "last_identity": current_identity, "count": count, "last_update_time": meta_data.get("last_update_time"), } @@ -116,18 +129,20 @@ class PersonalityExpression: # 构建prompt prompt = await global_prompt_manager.format_prompt( "personality_expression_prompt", - personality=current_style_text, + personality=current_personality, + expression_style=current_style_text, ) - # logger.info(f"个性表达方式提取prompt: {prompt}") try: response, _ = await self.express_learn_model.generate_response_async(prompt) except Exception as e: logger.error(f"个性表达方式提取失败: {e}") - # 如果提取失败,保存当前的风格和未增加的计数 + # 如果提取失败,保存当前的配置和未增加的计数 self._write_meta_data( { "last_style_text": current_style_text, + "last_personality": current_personality, + "last_identity": current_identity, "count": count, "last_update_time": meta_data.get("last_update_time"), } @@ -135,7 +150,6 @@ class PersonalityExpression: return logger.info(f"个性表达方式提取response: {response}") - # chat_id用personality # 转为dict并count=100 if response != "": @@ -183,9 +197,15 @@ class PersonalityExpression: count += 1 current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self._write_meta_data( - {"last_style_text": current_style_text, "count": count, "last_update_time": current_time} + { + "last_style_text": current_style_text, + "last_personality": current_personality, + "last_identity": current_identity, + "count": count, + "last_update_time": current_time + } ) - logger.info(f"成功处理。风格 '{current_style_text}' 的计数现在是 {count},最后更新时间:{current_time}。") + logger.info(f"成功处理。当前配置的计数现在是 {count},最后更新时间:{current_time}。") else: logger.warning(f"个性表达方式提取失败,模型返回空内容: {response}") diff --git a/src/person_info/impression_update_task.py b/src/person_info/impression_update_task.py index 52e09725b..56da12ca2 100644 --- a/src/person_info/impression_update_task.py +++ b/src/person_info/impression_update_task.py @@ -17,12 +17,12 @@ class ImpressionUpdateTask(AsyncTask): super().__init__( task_name="impression_update", wait_before_start=5, # 启动后等待10秒 - run_interval=10, # 每1分钟运行一次 + run_interval=20, # 每1分钟运行一次 ) async def run(self): try: - if random.random() < 0.5: + if random.random() < 0.1: # 获取最近10分钟的消息 current_time = int(time.time()) start_time = current_time - 6000 # 10分钟前 @@ -30,7 +30,7 @@ class ImpressionUpdateTask(AsyncTask): else: now = int(time.time()) # 30天前的时间戳 - month_ago = now - 30 * 24 * 60 * 60 + month_ago = now - 90 * 24 * 60 * 60 # 随机选择一个小时的起点 random_start = random.randint(month_ago, now - 3600) start_time = random_start diff --git a/src/person_info/relationship_manager.py b/src/person_info/relationship_manager.py index 331e377b4..2b55e8c92 100644 --- a/src/person_info/relationship_manager.py +++ b/src/person_info/relationship_manager.py @@ -228,7 +228,7 @@ class RelationshipManager: readable_messages = build_readable_messages( messages=user_messages, replace_bot_name=True, - timestamp_mode="relative", + timestamp_mode="normal", truncate=False) @@ -263,7 +263,8 @@ class RelationshipManager: new_impression, _ = await self.relationship_llm.generate_response_async(prompt=prompt) - logger.debug(f"new_impression: {new_impression}") + logger.info(f"prompt: {prompt}") + logger.info(f"new_impression: {new_impression}") prompt_json = f""" 你的名字是{global_config.bot.nickname},别名是{alias_str}。 @@ -274,8 +275,8 @@ class RelationshipManager: 请用json格式总结对{person_name}(昵称:{nickname})的印象,要求: 1.总结出这个人的最核心的性格,可能在这段话里看不出,总结不出来的话,就输出空字符串 -2.尝试猜测这个人的性别,如果看不出来,就输出空字符串 -3.尝试猜测自己与这个人的关系,你与ta的交互,还可以思考是积极还是消极,以及具体内容 +2.尝试猜测这个人的性别 +3.尝试猜测自己与这个人的关系,你与ta的交互,思考是积极还是消极,以及具体内容 4.尝试猜测这个人的身份,比如职业,兴趣爱好,生活状态等 5.尝试总结你与他之间是否有一些独特的梗,如果有,就输出梗的内容,如果没有,就输出空字符串