diff --git a/src/chat/express/expression_learner.py b/src/chat/express/expression_learner.py index 71bc2c355..4b32b2a9e 100644 --- a/src/chat/express/expression_learner.py +++ b/src/chat/express/expression_learner.py @@ -48,6 +48,7 @@ def init_prompt() -> None: 例如: 当"对某件事表示十分惊叹,有些意外"时,使用"我嘞个xxxx" 当"表示讽刺的赞同,不想讲道理"时,使用"对对对" +当"表达观点较复杂"时,使用"使用省略主语(3-6个字)"的句法 当"想说明某个具体的事实观点,但懒得明说,或者不便明说,或表达一种默契",使用"懂的都懂" 当"当涉及游戏相关时,表示意外的夸赞,略带戏谑意味"时,使用"这么强!" @@ -56,27 +57,6 @@ def init_prompt() -> None: """ Prompt(learn_style_prompt, "learn_style_prompt") - learn_grammar_prompt = """ -{chat_str} - -请从上面这段群聊中概括除了人名为"SELF"之外的人的语法和句法特点,只考虑纯文字,不要考虑表情包和图片 -1.不要总结【图片】,【动画表情】,[图片],[动画表情],不总结 表情符号 at @ 回复 和[回复] -2.不要涉及具体的人名,只考虑语法和句法特点, -3.语法和句法特点要包括,句子长短(具体字数),有何种语病,如何拆分句子。 -4. 例子仅供参考,请严格根据群聊内容总结!!! -总结成如下格式的规律,总结的内容要简洁,不浮夸: -当"xxx"时,可以"xxx" - -例如: -当"表达观点较复杂"时,使用"省略主语(3-6个字)"的句法 -当"不用详细说明的一般表达"时,使用"非常简洁的句子"的句法 -当"需要单纯简单的确认"时,使用"单字或几个字的肯定(1-2个字)"的句法 - -注意不要总结你自己(SELF)的发言 -现在请你概括 -""" - Prompt(learn_grammar_prompt, "learn_grammar_prompt") - class ExpressionLearner: def __init__(self, chat_id: str) -> None: @@ -176,13 +156,10 @@ class ExpressionLearner: # 学习语言风格 learnt_style = await self.learn_and_store(type="style", num=25) - # 学习句法特点 - learnt_grammar = await self.learn_and_store(type="grammar", num=10) - # 更新学习时间 self.last_learning_time = time.time() - if learnt_style or learnt_grammar: + if learnt_style: logger.info(f"聊天流 {self.chat_name} 表达学习完成") return True else: @@ -195,11 +172,10 @@ class ExpressionLearner: def get_expression_by_chat_id(self) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]: """ - 获取指定chat_id的style和grammar表达方式 + 获取指定chat_id的style表达方式(已禁用grammar的获取) 返回的每个表达方式字典中都包含了source_id, 用于后续的更新操作 """ learnt_style_expressions = [] - learnt_grammar_expressions = [] # 直接从数据库查询 style_query = Expression.select().where((Expression.chat_id == self.chat_id) & (Expression.type == "style")) @@ -217,26 +193,7 @@ class ExpressionLearner: "create_date": create_date, } ) - grammar_query = Expression.select().where((Expression.chat_id == self.chat_id) & (Expression.type == "grammar")) - for expr in grammar_query: - # 确保create_date存在,如果不存在则使用last_active_time - create_date = expr.create_date if expr.create_date is not None else expr.last_active_time - learnt_grammar_expressions.append( - { - "situation": expr.situation, - "style": expr.style, - "count": expr.count, - "last_active_time": expr.last_active_time, - "source_id": self.chat_id, - "type": "grammar", - "create_date": create_date, - } - ) - return learnt_style_expressions, learnt_grammar_expressions - - - - + return learnt_style_expressions @@ -298,25 +255,16 @@ class ExpressionLearner: return min(0.01, decay) - async def learn_and_store(self, type: str, num: int = 10) -> List[Tuple[str, str, str]]: - # sourcery skip: use-join + async def learn_and_store(self, num: int = 10) -> List[Tuple[str, str, str]]: """ 学习并存储表达方式 - type: "style" or "grammar" """ - if type == "style": - type_str = "语言风格" - elif type == "grammar": - type_str = "句法特点" - else: - raise ValueError(f"Invalid type: {type}") - # 检查是否允许在此聊天流中学习(在函数最前面检查) if not self.can_learn_for_chat(): logger.debug(f"聊天流 {self.chat_name} 不允许学习表达,跳过学习") return [] - res = await self.learn_expression(type, num) + res = await self.learn_expression(num) if res is None: return [] @@ -332,10 +280,10 @@ class ExpressionLearner: learnt_expressions_str = "" for _chat_id, situation, style in learnt_expressions: learnt_expressions_str += f"{situation}->{style}\n" - logger.info(f"在 {group_name} 学习到{type_str}:\n{learnt_expressions_str}") + logger.info(f"在 {group_name} 学习到表达风格:\n{learnt_expressions_str}") if not learnt_expressions: - logger.info(f"没有学习到{type_str}") + logger.info(f"没有学习到表达风格") return [] # 按chat_id分组 @@ -353,7 +301,7 @@ class ExpressionLearner: # 查找是否已存在相似表达方式 query = Expression.select().where( (Expression.chat_id == chat_id) - & (Expression.type == type) + & (Expression.type == "style") & (Expression.situation == new_expr["situation"]) & (Expression.style == new_expr["style"]) ) @@ -373,13 +321,13 @@ class ExpressionLearner: count=1, last_active_time=current_time, chat_id=chat_id, - type=type, + type="style", create_date=current_time, # 手动设置创建日期 ) # 限制最大数量 exprs = list( Expression.select() - .where((Expression.chat_id == chat_id) & (Expression.type == type)) + .where((Expression.chat_id == chat_id) & (Expression.type == "style")) .order_by(Expression.count.asc()) ) if len(exprs) > MAX_EXPRESSION_COUNT: @@ -388,20 +336,14 @@ class ExpressionLearner: expr.delete_instance() return learnt_expressions - async def learn_expression(self, type: str, num: int = 10) -> Optional[Tuple[List[Tuple[str, str, str]], str]]: + async def learn_expression(self, num: int = 10) -> Optional[Tuple[List[Tuple[str, str, str]], str]]: """从指定聊天流学习表达方式 Args: - type: "style" or "grammar" + num: 学习数量 """ - if type == "style": - type_str = "语言风格" - prompt = "learn_style_prompt" - elif type == "grammar": - type_str = "句法特点" - prompt = "learn_grammar_prompt" - else: - raise ValueError(f"Invalid type: {type}") + type_str = "语言风格" + prompt = "learn_style_prompt" current_time = time.time() @@ -510,9 +452,11 @@ class ExpressionLearnerManager: """ 自动将/data/expression/learnt_style 和 learnt_grammar 下所有expressions.json迁移到数据库。 迁移完成后在/data/expression/done.done写入标记文件,存在则跳过。 + 然后检查done.done2,如果没有就删除所有grammar表达并创建该标记文件。 """ base_dir = os.path.join("data", "expression") done_flag = os.path.join(base_dir, "done.done") + done_flag2 = os.path.join(base_dir, "done.done2") # 确保基础目录存在 try: @@ -524,98 +468,113 @@ class ExpressionLearnerManager: if os.path.exists(done_flag): logger.info("表达方式JSON已迁移,无需重复迁移。") - return + else: + logger.info("开始迁移表达方式JSON到数据库...") + migrated_count = 0 - logger.info("开始迁移表达方式JSON到数据库...") - migrated_count = 0 - - for type in ["learnt_style", "learnt_grammar"]: - type_str = "style" if type == "learnt_style" else "grammar" - type_dir = os.path.join(base_dir, type) - if not os.path.exists(type_dir): - logger.debug(f"目录不存在,跳过: {type_dir}") - continue - - try: - chat_ids = os.listdir(type_dir) - logger.debug(f"在 {type_dir} 中找到 {len(chat_ids)} 个聊天ID目录") - except Exception as e: - logger.error(f"读取目录失败 {type_dir}: {e}") - continue - - for chat_id in chat_ids: - expr_file = os.path.join(type_dir, chat_id, "expressions.json") - if not os.path.exists(expr_file): + for type in ["learnt_style", "learnt_grammar"]: + type_str = "style" if type == "learnt_style" else "grammar" + type_dir = os.path.join(base_dir, type) + if not os.path.exists(type_dir): + logger.debug(f"目录不存在,跳过: {type_dir}") continue + try: - with open(expr_file, "r", encoding="utf-8") as f: - expressions = json.load(f) - - if not isinstance(expressions, list): - logger.warning(f"表达方式文件格式错误,跳过: {expr_file}") - continue - - for expr in expressions: - if not isinstance(expr, dict): - continue - - situation = expr.get("situation") - style_val = expr.get("style") - count = expr.get("count", 1) - last_active_time = expr.get("last_active_time", time.time()) - - if not situation or not style_val: - logger.warning(f"表达方式缺少必要字段,跳过: {expr}") - continue - - # 查重:同chat_id+type+situation+style - from src.common.database.database_model import Expression - - query = Expression.select().where( - (Expression.chat_id == chat_id) - & (Expression.type == type_str) - & (Expression.situation == situation) - & (Expression.style == style_val) - ) - if query.exists(): - expr_obj = query.get() - expr_obj.count = max(expr_obj.count, count) - expr_obj.last_active_time = max(expr_obj.last_active_time, last_active_time) - expr_obj.save() - else: - Expression.create( - situation=situation, - style=style_val, - count=count, - last_active_time=last_active_time, - chat_id=chat_id, - type=type_str, - create_date=last_active_time, # 迁移时使用last_active_time作为创建时间 - ) - migrated_count += 1 - logger.info(f"已迁移 {expr_file} 到数据库,包含 {len(expressions)} 个表达方式") - except json.JSONDecodeError as e: - logger.error(f"JSON解析失败 {expr_file}: {e}") + chat_ids = os.listdir(type_dir) + logger.debug(f"在 {type_dir} 中找到 {len(chat_ids)} 个聊天ID目录") except Exception as e: - logger.error(f"迁移表达方式 {expr_file} 失败: {e}") + logger.error(f"读取目录失败 {type_dir}: {e}") + continue - # 标记迁移完成 - try: - # 确保done.done文件的父目录存在 - done_parent_dir = os.path.dirname(done_flag) - if not os.path.exists(done_parent_dir): - os.makedirs(done_parent_dir, exist_ok=True) - logger.debug(f"为done.done创建父目录: {done_parent_dir}") + for chat_id in chat_ids: + expr_file = os.path.join(type_dir, chat_id, "expressions.json") + if not os.path.exists(expr_file): + continue + try: + with open(expr_file, "r", encoding="utf-8") as f: + expressions = json.load(f) - with open(done_flag, "w", encoding="utf-8") as f: - f.write("done\n") - logger.info(f"表达方式JSON迁移已完成,共迁移 {migrated_count} 个表达方式,已写入done.done标记文件") - except PermissionError as e: - logger.error(f"权限不足,无法写入done.done标记文件: {e}") - except OSError as e: - logger.error(f"文件系统错误,无法写入done.done标记文件: {e}") - except Exception as e: - logger.error(f"写入done.done标记文件失败: {e}") + if not isinstance(expressions, list): + logger.warning(f"表达方式文件格式错误,跳过: {expr_file}") + continue + + for expr in expressions: + if not isinstance(expr, dict): + continue + + situation = expr.get("situation") + style_val = expr.get("style") + count = expr.get("count", 1) + last_active_time = expr.get("last_active_time", time.time()) + + if not situation or not style_val: + logger.warning(f"表达方式缺少必要字段,跳过: {expr}") + continue + + # 查重:同chat_id+type+situation+style + from src.common.database.database_model import Expression + + query = Expression.select().where( + (Expression.chat_id == chat_id) + & (Expression.type == type_str) + & (Expression.situation == situation) + & (Expression.style == style_val) + ) + if query.exists(): + expr_obj = query.get() + expr_obj.count = max(expr_obj.count, count) + expr_obj.last_active_time = max(expr_obj.last_active_time, last_active_time) + expr_obj.save() + else: + Expression.create( + situation=situation, + style=style_val, + count=count, + last_active_time=last_active_time, + chat_id=chat_id, + type=type_str, + create_date=last_active_time, # 迁移时使用last_active_time作为创建时间 + ) + migrated_count += 1 + logger.info(f"已迁移 {expr_file} 到数据库,包含 {len(expressions)} 个表达方式") + except json.JSONDecodeError as e: + logger.error(f"JSON解析失败 {expr_file}: {e}") + except Exception as e: + logger.error(f"迁移表达方式 {expr_file} 失败: {e}") + + # 标记迁移完成 + try: + # 确保done.done文件的父目录存在 + done_parent_dir = os.path.dirname(done_flag) + if not os.path.exists(done_parent_dir): + os.makedirs(done_parent_dir, exist_ok=True) + logger.debug(f"为done.done创建父目录: {done_parent_dir}") + + with open(done_flag, "w", encoding="utf-8") as f: + f.write("done\n") + logger.info(f"表达方式JSON迁移已完成,共迁移 {migrated_count} 个表达方式,已写入done.done标记文件") + except PermissionError as e: + logger.error(f"权限不足,无法写入done.done标记文件: {e}") + except OSError as e: + logger.error(f"文件系统错误,无法写入done.done标记文件: {e}") + except Exception as e: + logger.error(f"写入done.done标记文件失败: {e}") + + # 检查并处理grammar表达删除 + if not os.path.exists(done_flag2): + logger.info("开始删除所有grammar类型的表达...") + try: + deleted_count = self.delete_all_grammar_expressions() + logger.info(f"grammar表达删除完成,共删除 {deleted_count} 个表达") + + # 创建done.done2标记文件 + with open(done_flag2, "w", encoding="utf-8") as f: + f.write("done\n") + logger.info("已创建done.done2标记文件,grammar表达删除标记完成") + except Exception as e: + logger.error(f"删除grammar表达或创建标记文件失败: {e}") + else: + logger.info("grammar表达已删除,跳过重复删除") def _migrate_old_data_create_date(self): """ @@ -638,5 +597,40 @@ class ExpressionLearnerManager: except Exception as e: logger.error(f"迁移老数据创建日期失败: {e}") + def delete_all_grammar_expressions(self) -> int: + """ + 检查expression库中所有type为"grammar"的表达并全部删除 + + Returns: + int: 删除的grammar表达数量 + """ + try: + # 查询所有type为"grammar"的表达 + grammar_expressions = Expression.select().where(Expression.type == "grammar") + grammar_count = grammar_expressions.count() + + if grammar_count == 0: + logger.info("expression库中没有找到grammar类型的表达") + return 0 + + logger.info(f"找到 {grammar_count} 个grammar类型的表达,开始删除...") + + # 删除所有grammar类型的表达 + deleted_count = 0 + for expr in grammar_expressions: + try: + expr.delete_instance() + deleted_count += 1 + except Exception as e: + logger.error(f"删除grammar表达失败: {e}") + continue + + logger.info(f"成功删除 {deleted_count} 个grammar类型的表达") + return deleted_count + + except Exception as e: + logger.error(f"删除grammar表达过程中发生错误: {e}") + return 0 + expression_learner_manager = ExpressionLearnerManager() diff --git a/src/chat/express/expression_selector.py b/src/chat/express/expression_selector.py index 652c3aa67..c5d08b61d 100644 --- a/src/chat/express/expression_selector.py +++ b/src/chat/express/expression_selector.py @@ -124,8 +124,8 @@ class ExpressionSelector: return [chat_id] def get_random_expressions( - self, chat_id: str, total_num: int, style_percentage: float, grammar_percentage: float - ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + self, chat_id: str, total_num: int + ) -> List[Dict[str, Any]]: # sourcery skip: extract-duplicate-method, move-assign # 支持多chat_id合并抽选 related_chat_ids = self.get_related_chat_ids(chat_id) @@ -134,9 +134,6 @@ class ExpressionSelector: style_query = Expression.select().where( (Expression.chat_id.in_(related_chat_ids)) & (Expression.type == "style") ) - grammar_query = Expression.select().where( - (Expression.chat_id.in_(related_chat_ids)) & (Expression.type == "grammar") - ) style_exprs = [ { @@ -151,33 +148,13 @@ class ExpressionSelector: for expr in style_query ] - grammar_exprs = [ - { - "situation": expr.situation, - "style": expr.style, - "count": expr.count, - "last_active_time": expr.last_active_time, - "source_id": expr.chat_id, - "type": "grammar", - "create_date": expr.create_date if expr.create_date is not None else expr.last_active_time, - } - for expr in grammar_query - ] - - style_num = int(total_num * style_percentage) - grammar_num = int(total_num * grammar_percentage) # 按权重抽样(使用count作为权重) if style_exprs: style_weights = [expr.get("count", 1) for expr in style_exprs] - selected_style = weighted_sample(style_exprs, style_weights, style_num) + selected_style = weighted_sample(style_exprs, style_weights, total_num) else: selected_style = [] - if grammar_exprs: - grammar_weights = [expr.get("count", 1) for expr in grammar_exprs] - selected_grammar = weighted_sample(grammar_exprs, grammar_weights, grammar_num) - else: - selected_grammar = [] - return selected_style, selected_grammar + return selected_style def update_expressions_count_batch(self, expressions_to_update: List[Dict[str, Any]], increment: float = 0.1): """对一批表达方式更新count值,按chat_id+type分组后一次性写入数据库""" @@ -230,7 +207,7 @@ class ExpressionSelector: return [] # 1. 获取35个随机表达方式(现在按权重抽取) - style_exprs, grammar_exprs = self.get_random_expressions(chat_id, 30, 0.5, 0.5) + style_exprs = self.get_random_expressions(chat_id, 30) # 2. 构建所有表达方式的索引和情境列表 all_expressions = [] @@ -244,14 +221,6 @@ class ExpressionSelector: all_expressions.append(expr_with_type) all_situations.append(f"{len(all_expressions)}.{expr['situation']}") - # 添加grammar表达方式 - for expr in grammar_exprs: - if isinstance(expr, dict) and "situation" in expr and "style" in expr: - expr_with_type = expr.copy() - expr_with_type["type"] = "grammar" - all_expressions.append(expr_with_type) - all_situations.append(f"{len(all_expressions)}.{expr['situation']}") - if not all_expressions: logger.warning("没有找到可用的表达方式") return [] diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 027a9f0e8..52aac4312 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -327,10 +327,7 @@ class DefaultReplyer: use_expression, _, _ = global_config.expression.get_expression_config_for_chat(self.chat_stream.stream_id) if not use_expression: return "" - style_habits = [] - grammar_habits = [] - # 使用从处理器传来的选中表达方式 # LLM模式:调用LLM选择5-10个,然后随机选5个 selected_expressions = await expression_selector.select_suitable_expressions_llm( @@ -341,17 +338,12 @@ class DefaultReplyer: logger.debug(f"使用处理器选中的{len(selected_expressions)}个表达方式") for expr in selected_expressions: if isinstance(expr, dict) and "situation" in expr and "style" in expr: - expr_type = expr.get("type", "style") - if expr_type == "grammar": - grammar_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") - else: - style_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") + style_habits.append(f"当{expr['situation']}时,使用 {expr['style']}") else: logger.debug("没有从处理器获得表达方式,将使用空的表达方式") # 不再在replyer中进行随机选择,全部交给处理器处理 style_habits_str = "\n".join(style_habits) - grammar_habits_str = "\n".join(grammar_habits) # 动态构建expression habits块 expression_habits_block = "" @@ -361,14 +353,6 @@ class DefaultReplyer: "你可以参考以下的语言习惯,当情景合适就使用,但不要生硬使用,以合理的方式结合到你的回复中:" ) expression_habits_block += f"{style_habits_str}\n" - if grammar_habits_str.strip(): - expression_habits_title = ( - "你可以选择下面的句法进行回复,如果情景合适就使用,不要盲目使用,不要生硬使用,以合理的方式使用:" - ) - expression_habits_block += f"{grammar_habits_str}\n" - - if style_habits_str.strip() and grammar_habits_str.strip(): - expression_habits_title = "你可以参考以下的语言习惯和句法,如果情景合适就使用,不要盲目使用,不要生硬使用,以合理的方式结合到你的回复中:" return f"{expression_habits_title}\n{expression_habits_block}"