From 8ef00ee5713899d28f5c95edf497a7b94df64c79 Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Fri, 7 Mar 2025 00:09:36 +0800
Subject: [PATCH] v0.5.9
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixed the memory system flooding the log output
Added a new and improved typo generator
Added memory topic filtering
---
 README.md                                |   3 +-
 src/plugins/chat/emoji_manager.py        |  10 -
 src/plugins/chat/prompt_builder.py       |   3 +-
 src/plugins/chat/utils.py                |  90 +-
 src/plugins/memory_system/memory.py      |  27 +-
 .../memory_system/memory_manual_build.py  |  33 +-
 src/plugins/utils/typo_generator.py      | 437 +++++++++
 src/test/typo.py                         | 827 ++++++++----------
 8 files changed, 883 insertions(+), 547 deletions(-)
 create mode 100644 src/plugins/utils/typo_generator.py

diff --git a/README.md b/README.md
index 73e1c3094..96c857bc7 100644
--- a/README.md
+++ b/README.md
@@ -89,7 +89,8 @@
 - 改进表情包发送逻辑
 - 自动生成的回复逻辑,例如自生成的回复方向,回复风格
 - 采用截断生成加快麦麦的反应速度
-- 改进发送消息的触发:
+- 改进发送消息的触发
+- 

 ## 📌 注意事项

 纯编程外行,面向cursor编程,很多代码史一样多多包涵

diff --git a/src/plugins/chat/emoji_manager.py b/src/plugins/chat/emoji_manager.py
index 4b81302b1..ede0d7135 100644
--- a/src/plugins/chat/emoji_manager.py
+++ b/src/plugins/chat/emoji_manager.py
@@ -29,16 +29,6 @@ config = driver.config
 class EmojiManager:
     _instance = None
     EMOJI_DIR = "data/emoji"  # 表情包存储目录
-    
-    EMOTION_KEYWORDS = {
-        'happy': ['开心', '快乐', '高兴', '欢喜', '笑', '喜悦', '兴奋', '愉快', '乐', '好'],
-        'angry': ['生气', '愤怒', '恼火', '不爽', '火大', '怒', '气愤', '恼怒', '发火', '不满'],
-        'sad': ['伤心', '难过', '悲伤', '痛苦', '哭', '忧伤', '悲痛', '哀伤', '委屈', '失落'],
-        'surprised': ['惊讶', '震惊', '吃惊', '意外', '惊', '诧异', '惊奇', '惊喜', '不敢相信', '目瞪口呆'],
-        'disgusted': ['恶心', '讨厌', '厌恶', '反感', '嫌弃', '恶', '嫌恶', '憎恶', '不喜欢', '烦'],
-        'fearful': ['害怕', '恐惧', '惊恐', '担心', '怕', '惊吓', '惊慌', '畏惧', '胆怯', '惧'],
-        'neutral': ['普通', '一般', '还行', '正常', '平静', '平淡', '一般般', '凑合', '还好', '就这样']
-    }

     def __new__(cls):
         if cls._instance is None:

diff --git a/src/plugins/chat/prompt_builder.py b/src/plugins/chat/prompt_builder.py
index 1c510e251..57795283b 100644
--- a/src/plugins/chat/prompt_builder.py
+++ b/src/plugins/chat/prompt_builder.py
@@ -84,7 +84,8 @@ class PromptBuilder:
         relevant_memories = await hippocampus.get_relevant_memories(
             text=message_txt,
             max_topics=5,
-            similarity_threshold=0.4
+            similarity_threshold=0.4,
+            max_memory_num=5
         )

         if relevant_memories:
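The max_memory_num argument added above caps how many memories a single topic may inject into the prompt; the cap itself is enforced in memory.py further down by random sampling. The behavior in isolation (values are illustrative):

    import random

    first_layer = ['记忆A', '记忆B', '记忆C', '记忆D', '记忆E', '记忆F']  # memories found for one topic
    max_memory_num = 5
    if len(first_layer) > max_memory_num:
        # keep a random subset rather than flooding the prompt with everything
        first_layer = random.sample(first_layer, max_memory_num)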
diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py
index 63daf6680..18f1ed7a9 100644
--- a/src/plugins/chat/utils.py
+++ b/src/plugins/chat/utils.py
@@ -13,6 +13,7 @@ from nonebot import get_driver
 from ..models.utils_model import LLM_request
 import aiohttp
 import jieba
+from ..utils.typo_generator import ChineseTypoGenerator

 driver = get_driver()
 config = driver.config
@@ -285,75 +286,6 @@ def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
     print(f"处理后的句子: {sentences_done}")
     return sentences_done

-# 常见的错别字映射
-TYPO_DICT = {
-    '的': '地得',
-    '了': '咯啦勒',
-    '吗': '嘛麻',
-    '吧': '八把罢',
-    '是': '事',
-    '在': '再在',
-    '和': '合',
-    '有': '又',
-    '我': '沃窝喔',
-    '你': '泥尼拟',
-    '他': '它她塔祂',
-    '们': '门',
-    '啊': '阿哇',
-    '呢': '呐捏',
-    '都': '豆读毒',
-    '很': '狠',
-    '会': '回汇',
-    '去': '趣取曲',
-    '做': '作坐',
-    '想': '相像',
-    '说': '说税睡',
-    '看': '砍堪刊',
-    '来': '来莱赖',
-    '好': '号毫豪',
-    '给': '给既继',
-    '过': '锅果裹',
-    '能': '嫩',
-    '为': '位未',
-    '什': '甚深伸',
-    '么': '末麽嘛',
-    '话': '话花划',
-    '知': '织直值',
-    '道': '到',
-    '听': '听停挺',
-    '见': '见件建',
-    '觉': '觉脚搅',
-    '得': '得德锝',
-    '着': '着找招',
-    '像': '向象想',
-    '等': '等灯登',
-    '谢': '谢写卸',
-    '对': '对队',
-    '里': '里理鲤',
-    '啦': '啦拉喇',
-    '吃': '吃持迟',
-    '哦': '哦喔噢',
-    '呀': '呀压',
-    '要': '药',
-    '太': '太抬台',
-    '快': '块',
-    '点': '店',
-    '以': '以已',
-    '因': '因应',
-    '啥': '啥沙傻',
-    '行': '行型形',
-    '哈': '哈蛤铪',
-    '嘿': '嘿黑嗨',
-    '嗯': '嗯恩摁',
-    '哎': '哎爱埃',
-    '呜': '呜屋污',
-    '喂': '喂位未',
-    '嘛': '嘛麻马',
-    '嗨': '嗨害亥',
-    '哇': '哇娃蛙',
-    '咦': '咦意易',
-    '嘻': '嘻西希'
-}

 def random_remove_punctuation(text: str) -> str:
     """随机处理标点符号,模拟人类打字习惯

     result += char
     return result

-def add_typos(text: str) -> str:
-    TYPO_RATE = 0.02  # 控制错别字出现的概率(2%)
-    result = ""
-    for char in text:
-        if char in TYPO_DICT and random.random() < TYPO_RATE:
-            # 从可能的错别字中随机选择一个
-            typos = TYPO_DICT[char]
-            result += random.choice(typos)
-        else:
-            result += char
-    return result

 def process_llm_response(text: str) -> List[str]:
     # processed_response = process_text_with_typos(content)
     if len(text) > 200:
         print(f"回复过长 ({len(text)} 字符),返回默认回复")
         return ['懒得说']
     # 处理长消息
-    sentences = split_into_sentences_w_remove_punctuation(add_typos(text))
+    typo_generator = ChineseTypoGenerator(
+        error_rate=0.03,
+        min_freq=7,
+        tone_error_rate=0.2,
+        word_replace_rate=0.02
+    )
+    typoed_text = typo_generator.create_typo_sentence(text)[0]
+    sentences = split_into_sentences_w_remove_punctuation(typoed_text)
     # 检查分割后的消息数量是否过多(超过3条)
     if len(sentences) > 4:
         print(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
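Every reply now passes through ChineseTypoGenerator before sentence splitting; create_typo_sentence returns a (text, info) tuple, so the [0] above keeps only the rewritten text. The same pipeline, runnable on its own (import path assumed from this patch's layout):

    from src.plugins.utils.typo_generator import ChineseTypoGenerator

    typo_generator = ChineseTypoGenerator(
        error_rate=0.03,        # per-character replacement probability
        min_freq=7,             # minimum frequency for a candidate character
        tone_error_rate=0.2,    # chance of sampling wrong-tone homophones
        word_replace_rate=0.02  # chance of swapping a whole word for a homophone word
    )
    typoed_text, typo_info = typo_generator.create_typo_sentence("今天天气真好")
    print(typoed_text)
    print(typo_generator.format_typo_info(typo_info))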
diff --git a/src/plugins/memory_system/memory.py b/src/plugins/memory_system/memory.py
index cdb6e6e1b..840980783 100644
--- a/src/plugins/memory_system/memory.py
+++ b/src/plugins/memory_system/memory.py
@@ -181,13 +181,19 @@ class Hippocampus:
         topic_num = self.calculate_topic_num(input_text, compress_rate)
         topics_response = await self.llm_model_get_topic.generate_response(self.find_topic_llm(input_text, topic_num))
         # 修改话题处理逻辑
-        print(f"话题: {topics_response[0]}")
-        topics = [topic.strip() for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") if topic.strip()]
-        print(f"话题: {topics}")
+        # 定义需要过滤的关键词
+        filter_keywords = ['表情包', '图片', '回复', '聊天记录']

-        # 创建所有话题的请求任务
+        # 过滤topics
+        topics = [topic.strip() for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") if topic.strip()]
+        filtered_topics = [topic for topic in topics if not any(keyword in topic for keyword in filter_keywords)]
+
+        # print(f"原始话题: {topics}")
+        print(f"过滤后话题: {filtered_topics}")
+
+        # 使用过滤后的话题继续处理
         tasks = []
-        for topic in topics:
+        for topic in filtered_topics:
             topic_what_prompt = self.topic_what(input_text, topic)
             # 创建异步任务
             task = self.llm_model_summary.generate_response_async(topic_what_prompt)
@@ -501,9 +507,9 @@ class Hippocampus:
             list: 识别出的主题列表
         """
         topics_response = await self.llm_model_get_topic.generate_response(self.find_topic_llm(text, 5))
-        print(f"话题: {topics_response[0]}")
+        # print(f"话题: {topics_response[0]}")
         topics = [topic.strip() for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") if topic.strip()]
-        print(f"话题: {topics}")
+        # print(f"话题: {topics}")

         return topics

@@ -579,7 +585,7 @@ class Hippocampus:
         print(f"\033[1;32m[记忆激活]\033[0m 识别出的主题: {identified_topics}")

         if not identified_topics:
-            print(f"\033[1;32m[记忆激活]\033[0m 未识别出主题,返回0")
+            # print(f"\033[1;32m[记忆激活]\033[0m 未识别出主题,返回0")
             return 0

         # 查找相似主题
@@ -644,7 +650,7 @@ class Hippocampus:

         return int(activation)

-    async def get_relevant_memories(self, text: str, max_topics: int = 5, similarity_threshold: float = 0.4) -> list:
+    async def get_relevant_memories(self, text: str, max_topics: int = 5, similarity_threshold: float = 0.4, max_memory_num: int = 5) -> list:
         """根据输入文本获取相关的记忆内容"""
         # 识别主题
         identified_topics = await self._identify_topics(text)
@@ -665,6 +671,9 @@ class Hippocampus:
             # 获取该主题的记忆内容
             first_layer, _ = self.memory_graph.get_related_item(topic, depth=1)
             if first_layer:
+                # 如果记忆条数超过限制,随机选择指定数量的记忆
+                if len(first_layer) > max_memory_num:
+                    first_layer = random.sample(first_layer, max_memory_num)
                 # 为每条记忆添加来源主题和相似度信息
                 for memory in first_layer:
                     relevant_memories.append({
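The keyword filter above drops LLM-proposed topics that describe the chat medium rather than its content. A quick illustration with hypothetical topics:

    filter_keywords = ['表情包', '图片', '回复', '聊天记录']
    topics = ['早餐', '表情包大战', '考试周', '群聊天记录']
    filtered_topics = [t for t in topics if not any(k in t for k in filter_keywords)]
    # filtered_topics == ['早餐', '考试周']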
diff --git a/src/plugins/memory_system/memory_manual_build.py b/src/plugins/memory_system/memory_manual_build.py
index d6aa2f669..950f01afa 100644
--- a/src/plugins/memory_system/memory_manual_build.py
+++ b/src/plugins/memory_system/memory_manual_build.py
@@ -234,16 +234,22 @@ class Hippocampus:
     async def memory_compress(self, input_text, compress_rate=0.1):
         print(input_text)

-        #获取topics
         topic_num = self.calculate_topic_num(input_text, compress_rate)
-        topics_response = await self.llm_model_get_topic.generate_response_async(self.find_topic_llm(input_text, topic_num))
+        topics_response = self.llm_model_get_topic.generate_response(self.find_topic_llm(input_text, topic_num))
         # 修改话题处理逻辑
+        # 定义需要过滤的关键词
+        filter_keywords = ['表情包', '图片', '回复', '聊天记录']
+
+        # 过滤topics
         topics = [topic.strip() for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") if topic.strip()]
-        print(f"话题: {topics}")
+        filtered_topics = [topic for topic in topics if not any(keyword in topic for keyword in filter_keywords)]
+
+        # print(f"原始话题: {topics}")
+        print(f"过滤后话题: {filtered_topics}")

         # 创建所有话题的请求任务
         tasks = []
-        for topic in topics:
+        for topic in filtered_topics:
             topic_what_prompt = self.topic_what(input_text, topic)
             # 创建异步任务
             task = self.llm_model_small.generate_response_async(topic_what_prompt)
@@ -650,7 +656,22 @@ def visualize_graph_lite(memory_graph: Memory_graph, color_by_memory: bool = Fal
     G = memory_graph.G

     # 创建一个新图用于可视化
-    H = G.copy()
+    H = G.copy()
+
+    # 过滤掉内容数量小于2的节点
+    nodes_to_remove = []
+    for node in H.nodes():
+        memory_items = H.nodes[node].get('memory_items', [])
+        memory_count = len(memory_items) if isinstance(memory_items, list) else (1 if memory_items else 0)
+        if memory_count < 2:
+            nodes_to_remove.append(node)
+
+    H.remove_nodes_from(nodes_to_remove)
+
+    # 如果没有符合条件的节点,直接返回
+    if len(H.nodes()) == 0:
+        print("没有找到内容数量大于等于2的节点")
+        return

     # 计算节点大小和颜色
     node_colors = []
@@ -704,7 +725,7 @@ def visualize_graph_lite(memory_graph: Memory_graph, color_by_memory: bool = Fal
                            edge_color='gray',
                            width=1.5)  # 统一的边宽度

-    title = '记忆图谱可视化 - 节点大小表示记忆数量\n节点颜色:蓝(弱连接)到红(强连接)渐变,边的透明度表示连接强度\n连接强度越大的节点距离越近'
+    title = '记忆图谱可视化(仅显示内容≥2的节点)\n节点大小表示记忆数量\n节点颜色:蓝(弱连接)到红(强连接)渐变,边的透明度表示连接强度\n连接强度越大的节点距离越近'
     plt.title(title, fontsize=16, fontfamily='SimHei')
     plt.show()

diff --git a/src/plugins/utils/typo_generator.py b/src/plugins/utils/typo_generator.py
new file mode 100644
index 000000000..16834200f
--- /dev/null
+++ b/src/plugins/utils/typo_generator.py
@@ -0,0 +1,437 @@
+"""
+错别字生成器 - 基于拼音和字频的中文错别字生成工具
+"""
+
+from pypinyin import pinyin, Style
+from collections import defaultdict
+import json
+import os
+import jieba
+from pathlib import Path
+import random
+import math
+import time
+
+class ChineseTypoGenerator:
+    def __init__(self, 
+                 error_rate=0.3,
+                 min_freq=5,
+                 tone_error_rate=0.2,
+                 word_replace_rate=0.3,
+                 max_freq_diff=200):
+        """
+        初始化错别字生成器
+        
+        参数:
+            error_rate: 单字替换概率
+            min_freq: 最小字频阈值
+            tone_error_rate: 声调错误概率
+            word_replace_rate: 整词替换概率
+            max_freq_diff: 最大允许的频率差异
+        """
+        self.error_rate = error_rate
+        self.min_freq = min_freq
+        self.tone_error_rate = tone_error_rate
+        self.word_replace_rate = word_replace_rate
+        self.max_freq_diff = max_freq_diff
+
+        # 加载数据
+        print("正在加载汉字数据库,请稍候...")
+        self.pinyin_dict = self._create_pinyin_dict()
+        self.char_frequency = self._load_or_create_char_frequency()
+
+    def _load_or_create_char_frequency(self):
+        """
+        加载或创建汉字频率字典
+        """
+        cache_file = Path("char_frequency.json")
+
+        # 如果缓存文件存在,直接加载
+        if cache_file.exists():
+            with open(cache_file, 'r', encoding='utf-8') as f:
+                return json.load(f)
+
+        # 使用内置的词频文件
+        char_freq = defaultdict(int)
+        dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
+
+        # 读取jieba的词典文件
+        with open(dict_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                word, freq = line.strip().split()[:2]
+                # 对词中的每个字进行频率累加
+                for char in word:
+                    if self._is_chinese_char(char):
+                        char_freq[char] += int(freq)
+
+        # 归一化频率值
+        max_freq = max(char_freq.values())
+        normalized_freq = {char: freq/max_freq * 1000 for char, freq in char_freq.items()}
+
+        # 保存到缓存文件
+        with open(cache_file, 'w', encoding='utf-8') as f:
+            json.dump(normalized_freq, f, ensure_ascii=False, indent=2)
+
+        return normalized_freq
+
+    def _create_pinyin_dict(self):
+        """
+        创建拼音到汉字的映射字典
+        """
+        # 常用汉字范围
+        chars = [chr(i) for i in range(0x4e00, 0x9fff)]
+        pinyin_dict = defaultdict(list)
+
+        # 为每个汉字建立拼音映射
+        for char in chars:
+            try:
+                py = pinyin(char, style=Style.TONE3)[0][0]
+                pinyin_dict[py].append(char)
+            except Exception:
+                continue
+
+        return pinyin_dict
+
+    def _is_chinese_char(self, char):
+        """
+        判断是否为汉字
+        """
+        try:
+            return '\u4e00' <= char <= '\u9fff'
+        except:
+            return False
+
+    def _get_pinyin(self, sentence):
+        """
+        将中文句子拆分成单个汉字并获取其拼音
+        """
+        # 将句子拆分成单个字符
+        characters = list(sentence)
+
+        # 获取每个字符的拼音
+        result = []
+        for char in characters:
+            # 跳过空格和非汉字字符
+            if char.isspace() or not self._is_chinese_char(char):
+                continue
+            # 获取拼音(数字声调)
+            py = pinyin(char, style=Style.TONE3)[0][0]
+            result.append((char, py))
+
+        return result
+
+    def _get_similar_tone_pinyin(self, py):
+        """
+        获取相似声调的拼音
+        """
+        # 检查拼音是否为空或无效
+        if not py or len(py) < 1:
+            return py
+
+        # 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
+        if not py[-1].isdigit():
+            # 为非数字结尾的拼音添加数字声调1
+            return py + '1'
+
+        base = py[:-1]  # 去掉声调
+        tone = int(py[-1])  # 获取声调
+
+        # 处理轻声(通常用5表示)或无效声调
+        if tone not in [1, 2, 3, 4]:
+            return base + str(random.choice([1, 2, 3, 4]))
+
+        # 正常处理声调
+        possible_tones = [1, 2, 3, 4]
+        possible_tones.remove(tone)  # 移除原声调
+        new_tone = random.choice(possible_tones)  # 随机选择一个新声调
+        return base + str(new_tone)
+
+    def _calculate_replacement_probability(self, orig_freq, target_freq):
+        """
+        根据频率差计算替换概率
+        """
+        if target_freq > orig_freq:
+            return 1.0  # 如果替换字频率更高,保持原有概率
+
+        freq_diff = orig_freq - target_freq
+        if freq_diff > self.max_freq_diff:
+            return 0.0  # 频率差太大,不替换
+
+        # 使用指数衰减函数计算概率
+        # 频率差为0时概率为1,频率差为max_freq_diff时概率接近0
+        return math.exp(-3 * freq_diff / self.max_freq_diff)
+
+    def _get_similar_frequency_chars(self, char, py, num_candidates=5):
+        """
+        获取与给定字频率相近的同音字,可能包含声调错误
+        """
+        homophones = []
+
+        # 有一定概率使用错误声调
+        if random.random() < self.tone_error_rate:
+            wrong_tone_py = self._get_similar_tone_pinyin(py)
+            homophones.extend(self.pinyin_dict[wrong_tone_py])
+
+        # 添加正确声调的同音字
+        homophones.extend(self.pinyin_dict[py])
+
+        if not homophones:
+            return None
+
+        # 获取原字的频率
+        orig_freq = self.char_frequency.get(char, 0)
+
+        # 计算所有同音字与原字的频率差,并过滤掉低频字
+        freq_diff = [(h, self.char_frequency.get(h, 0))
+                     for h in homophones
+                     if h != char and self.char_frequency.get(h, 0) >= self.min_freq]
+
+        if not freq_diff:
+            return None
+
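+        # 补充说明(示意数值):下面的替换概率由 _calculate_replacement_probability 给出,
+        # 即 exp(-3 * freq_diff / max_freq_diff)。以默认 max_freq_diff=200 为例:
+        #   freq_diff = 0   -> 1.00
+        #   freq_diff = 100 -> exp(-1.5) ≈ 0.22
+        #   freq_diff = 200 -> exp(-3.0) ≈ 0.05
+        # 候选字越生僻,被选中的可能性越低。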
+        # 计算每个候选字的替换概率
+        candidates_with_prob = []
+        for h, freq in freq_diff:
+            prob = self._calculate_replacement_probability(orig_freq, freq)
+            if prob > 0:  # 只保留有效概率的候选字
+                candidates_with_prob.append((h, prob))
+
+        if not candidates_with_prob:
+            return None
+
+        # 根据概率排序
+        candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
+
+        # 返回概率最高的几个字
+        return [char for char, _ in candidates_with_prob[:num_candidates]]
+
+    def _get_word_pinyin(self, word):
+        """
+        获取词语的拼音列表
+        """
+        return [py[0] for py in pinyin(word, style=Style.TONE3)]
+
+    def _segment_sentence(self, sentence):
+        """
+        使用jieba分词,返回词语列表
+        """
+        return list(jieba.cut(sentence))
+
+    def _get_word_homophones(self, word):
+        """
+        获取整个词的同音词,只返回高频的有意义词语
+        """
+        if len(word) == 1:
+            return []
+
+        # 获取词的拼音
+        word_pinyin = self._get_word_pinyin(word)
+
+        # 遍历所有可能的同音字组合
+        candidates = []
+        for py in word_pinyin:
+            chars = self.pinyin_dict.get(py, [])
+            if not chars:
+                return []
+            candidates.append(chars)
+
+        # 生成所有可能的组合
+        import itertools
+        all_combinations = itertools.product(*candidates)
+
+        # 获取jieba词典和词频信息
+        dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
+        valid_words = {}  # 改用字典存储词语及其频率
+        with open(dict_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                parts = line.strip().split()
+                if len(parts) >= 2:
+                    word_text = parts[0]
+                    word_freq = float(parts[1])  # 获取词频
+                    valid_words[word_text] = word_freq
+
+        # 获取原词的词频作为参考
+        original_word_freq = valid_words.get(word, 0)
+        min_word_freq = original_word_freq * 0.1  # 设置最小词频为原词频的10%
+
+        # 过滤和计算频率
+        homophones = []
+        for combo in all_combinations:
+            new_word = ''.join(combo)
+            if new_word != word and new_word in valid_words:
+                new_word_freq = valid_words[new_word]
+                # 只保留词频达到阈值的词
+                if new_word_freq >= min_word_freq:
+                    # 计算词的平均字频(考虑字频和词频)
+                    char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word)
+                    # 综合评分:结合词频和字频
+                    combined_score = (new_word_freq * 0.7 + char_avg_freq * 0.3)
+                    if combined_score >= self.min_freq:
+                        homophones.append((new_word, combined_score))
+
+        # 按综合分数排序并限制返回数量
+        sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
+        return [word for word, _ in sorted_homophones[:5]]  # 限制返回前5个结果
+
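+    # 补充说明(示意数值):下面 create_typo_sentence 中,多字词内部的单字使用
+    # 衰减后的替换率 error_rate * 0.7 ** (len(word) - 1)。例如 error_rate=0.03 时,
+    # 三字词约为 0.03 * 0.49 ≈ 0.015/字;而整词同音替换会先以 word_replace_rate
+    # 的概率尝试。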
+    def create_typo_sentence(self, sentence):
+        """
+        创建包含同音字错误的句子,支持词语级别和字级别的替换
+
+        参数:
+            sentence: 输入的中文句子
+
+        返回:
+            typo_sentence: 包含错别字的句子
+            typo_info: 错别字信息列表
+        """
+        result = []
+        typo_info = []
+
+        # 分词
+        words = self._segment_sentence(sentence)
+
+        for word in words:
+            # 如果是标点符号或空格,直接添加
+            if all(not self._is_chinese_char(c) for c in word):
+                result.append(word)
+                continue
+
+            # 获取词语的拼音
+            word_pinyin = self._get_word_pinyin(word)
+
+            # 尝试整词替换
+            if len(word) > 1 and random.random() < self.word_replace_rate:
+                word_homophones = self._get_word_homophones(word)
+                if word_homophones:
+                    typo_word = random.choice(word_homophones)
+                    # 计算词的平均频率
+                    orig_freq = sum(self.char_frequency.get(c, 0) for c in word) / len(word)
+                    typo_freq = sum(self.char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
+
+                    # 添加到结果中
+                    result.append(typo_word)
+                    typo_info.append((word, typo_word,
+                                      ' '.join(word_pinyin),
+                                      ' '.join(self._get_word_pinyin(typo_word)),
+                                      orig_freq, typo_freq))
+                    continue
+
+            # 如果不进行整词替换,则进行单字替换
+            if len(word) == 1:
+                char = word
+                py = word_pinyin[0]
+                if random.random() < self.error_rate:
+                    similar_chars = self._get_similar_frequency_chars(char, py)
+                    if similar_chars:
+                        typo_char = random.choice(similar_chars)
+                        typo_freq = self.char_frequency.get(typo_char, 0)
+                        orig_freq = self.char_frequency.get(char, 0)
+                        replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
+                        if random.random() < replace_prob:
+                            result.append(typo_char)
+                            typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
+                            typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
+                            continue
+                result.append(char)
+            else:
+                # 处理多字词的单字替换
+                word_result = []
+                for i, (char, py) in enumerate(zip(word, word_pinyin)):
+                    # 词中的字替换概率降低
+                    word_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
+
+                    if random.random() < word_error_rate:
+                        similar_chars = self._get_similar_frequency_chars(char, py)
+                        if similar_chars:
+                            typo_char = random.choice(similar_chars)
+                            typo_freq = self.char_frequency.get(typo_char, 0)
+                            orig_freq = self.char_frequency.get(char, 0)
+                            replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
+                            if random.random() < replace_prob:
+                                word_result.append(typo_char)
+                                typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
+                                typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
+                                continue
+                    word_result.append(char)
+                result.append(''.join(word_result))
+
+        return ''.join(result), typo_info
+
+    def format_typo_info(self, typo_info):
+        """
+        格式化错别字信息
+
+        参数:
+            typo_info: 错别字信息列表
+
+        返回:
+            格式化后的错别字信息字符串
+        """
+        if not typo_info:
+            return "未生成错别字"
+
+        result = []
+        for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
+            # 判断是否为词语替换
+            is_word = ' ' in orig_py
+            if is_word:
+                error_type = "整词替换"
+            else:
+                tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
+                error_type = "声调错误" if tone_error else "同音字替换"
+
+            result.append(f"原文:{orig}({orig_py}) [频率:{orig_freq:.2f}] -> "
+                          f"替换:{typo}({typo_py}) [频率:{typo_freq:.2f}] [{error_type}]")
+
+        return "\n".join(result)
+
+    def set_params(self, **kwargs):
+        """
+        设置参数
+
+        可设置参数:
+            error_rate: 单字替换概率
+            min_freq: 最小字频阈值
+            tone_error_rate: 声调错误概率
+            word_replace_rate: 整词替换概率
+            max_freq_diff: 最大允许的频率差异
+        """
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+                print(f"参数 {key} 已设置为 {value}")
+            else:
+                print(f"警告: 参数 {key} 不存在")
+
+def main():
+    # 创建错别字生成器实例
+    typo_generator = ChineseTypoGenerator(
+        error_rate=0.03,
+        min_freq=7,
+        tone_error_rate=0.02,
+        word_replace_rate=0.3
+    )
+
+    # 获取用户输入
+    sentence = input("请输入中文句子:")
+
+    # 创建包含错别字的句子
+    start_time = time.time()
+    typo_sentence, typo_info = typo_generator.create_typo_sentence(sentence)
+
+    # 打印结果
+    print("\n原句:", sentence)
+    print("错字版:", typo_sentence)
+
+    # 打印错别字信息
+    if typo_info:
+        print("\n错别字信息:")
+        print(typo_generator.format_typo_info(typo_info))
+
+    # 计算并打印总耗时
+    end_time = time.time()
+    total_time = end_time - start_time
+    print(f"\n总耗时:{total_time:.2f}秒")
+
+if __name__ == "__main__":
+    main()
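Worth noting before the test-script diff: the constructor above is comparatively expensive — _create_pinyin_dict walks the whole CJK range (0x4e00-0x9fff) and _load_or_create_char_frequency writes a char_frequency.json cache into the working directory on first run — yet process_llm_response creates a fresh instance per reply. A module-level reuse pattern is one possible refactor (a sketch, not part of the patch; names are hypothetical):

    from src.plugins.utils.typo_generator import ChineseTypoGenerator

    _TYPO_GENERATOR = None

    def get_typo_generator():
        # Build once, reuse for every reply; __init__ is the costly step.
        global _TYPO_GENERATOR
        if _TYPO_GENERATOR is None:
            _TYPO_GENERATOR = ChineseTypoGenerator(
                error_rate=0.03, min_freq=7,
                tone_error_rate=0.2, word_replace_rate=0.02)
        return _TYPO_GENERATOR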
diff --git a/src/test/typo.py b/src/test/typo.py
index c452589ce..16834200f 100644
--- a/src/test/typo.py
+++ b/src/test/typo.py
@@ -1,455 +1,376 @@
 """
-错别字生成器 - 流程说明
-
-整体替换逻辑:
-1. 数据准备
-   - 加载字频词典:使用jieba词典计算汉字使用频率
-   - 创建拼音映射:建立拼音到汉字的映射关系
-   - 加载词频信息:从jieba词典获取词语使用频率
-
-2. 分词处理
-   - 使用jieba将输入句子分词
-   - 区分单字词和多字词
-   - 保留标点符号和空格
-
-3. 词语级别替换(针对多字词)
-   - 触发条件:词长>1 且 随机概率<0.3
-   - 替换流程:
-     a. 获取词语拼音
-     b. 生成所有可能的同音字组合
-     c. 过滤条件:
-        - 必须是jieba词典中的有效词
-        - 词频必须达到原词频的10%以上
-        - 综合评分(词频70%+字频30%)必须达到阈值
-     d. 按综合评分排序,选择最合适的替换词
-
-4. 字级别替换(针对单字词或未进行整词替换的多字词)
-   - 单字替换概率:0.3
-   - 多字词中的单字替换概率:0.3 * (0.7 ^ (词长-1))
-   - 替换流程:
-     a. 获取字的拼音
-     b. 声调错误处理(20%概率)
-     c. 获取同音字列表
-     d. 过滤条件:
-        - 字频必须达到最小阈值
-        - 频率差异不能过大(指数衰减计算)
-     e. 按频率排序选择替换字
-
-5. 频率控制机制
-   - 字频控制:使用归一化的字频(0-1000范围)
-   - 词频控制:使用jieba词典中的词频
-   - 频率差异计算:使用指数衰减函数
-   - 最小频率阈值:确保替换字/词不会太生僻
-
-6. 输出信息
-   - 原文和错字版本的对照
-   - 每个替换的详细信息(原字/词、替换后字/词、拼音、频率)
-   - 替换类型说明(整词替换/声调错误/同音字替换)
-   - 词语分析和完整拼音
-
-注意事项:
-1. 所有替换都必须使用有意义的词语
-2. 替换词的使用频率不能过低
-3. 多字词优先考虑整词替换
-4. 考虑声调变化的情况
-5. 保持标点符号和空格不变
+错别字生成器 - 基于拼音和字频的中文错别字生成工具
 """

 from pypinyin import pinyin, Style
 from collections import defaultdict
 import json
 import os
-import unicodedata
 import jieba
-import jieba.posseg as pseg
 from pathlib import Path
 import random
 import math
 import time

-def load_or_create_char_frequency():
-    """
-    加载或创建汉字频率字典
-    """
-    cache_file = Path("char_frequency.json")
-    
-    # 如果缓存文件存在,直接加载
-    if cache_file.exists():
-        with open(cache_file, 'r', encoding='utf-8') as f:
-            return json.load(f)
-    
-    # 使用内置的词频文件
-    char_freq = defaultdict(int)
-    dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
-    
-    # 读取jieba的词典文件
-    with open(dict_path, 'r', encoding='utf-8') as f:
-        for line in f:
-            word, freq = line.strip().split()[:2]
-            # 对词中的每个字进行频率累加
-            for char in word:
-                if is_chinese_char(char):
-                    char_freq[char] += int(freq)
-    
-    # 归一化频率值
-    max_freq = max(char_freq.values())
-    normalized_freq = {char: freq/max_freq * 1000 for char, freq in char_freq.items()}
-    
-    # 保存到缓存文件
-    with open(cache_file, 'w', encoding='utf-8') as f:
-        json.dump(normalized_freq, f, ensure_ascii=False, indent=2)
-    
-    return normalized_freq
-
-# 创建拼音到汉字的映射字典
-def create_pinyin_dict():
-    """
-    创建拼音到汉字的映射字典
-    """
-    # 常用汉字范围
-    chars = [chr(i) for i in range(0x4e00, 0x9fff)]
-    pinyin_dict = defaultdict(list)
-    
-    # 为每个汉字建立拼音映射
-    for char in chars:
-        try:
-            py = pinyin(char, style=Style.TONE3)[0][0]
-            pinyin_dict[py].append(char)
-        except Exception:
-            continue
-    
-    return pinyin_dict
-
-def is_chinese_char(char):
-    """
-    判断是否为汉字
-    """
-    try:
-        return '\u4e00' <= char <= '\u9fff'
-    except:
-        return False
-
-def get_pinyin(sentence):
-    """
-    将中文句子拆分成单个汉字并获取其拼音
-    :param sentence: 输入的中文句子
-    :return: 每个汉字及其拼音的列表
-    """
-    # 将句子拆分成单个字符
-    characters = list(sentence)
-    
-    # 获取每个字符的拼音
-    result = []
-    for char in characters:
-        # 跳过空格和非汉字字符
-        if char.isspace() or not is_chinese_char(char):
-            continue
-        # 获取拼音(数字声调)
-        py = pinyin(char, style=Style.TONE3)[0][0]
-        result.append((char, py))
-    
-    return result
-
-def get_homophone(char, py, pinyin_dict, char_frequency, min_freq=5):
-    """
-    获取同音字,按照使用频率排序
-    """
-    homophones = pinyin_dict[py]
-    # 移除原字并过滤低频字
-    if char in homophones:
-        homophones.remove(char)
-    
-    # 过滤掉低频字
-    homophones = [h for h in homophones if char_frequency.get(h, 0) >= min_freq]
-    
-    # 按照字频排序
-    sorted_homophones = sorted(homophones,
-                               key=lambda x: char_frequency.get(x, 0),
-                               reverse=True)
-    
-    # 只返回前10个同音字,避免输出过多
-    return sorted_homophones[:10]
-
-def get_similar_tone_pinyin(py):
-    """
-    获取相似声调的拼音
-    例如:'ni3' 可能返回 'ni2' 或 'ni4'
-    处理特殊情况:
-    1. 轻声(如 'de5' 或 'le')
-    2. 非数字结尾的拼音
-    """
-    # 检查拼音是否为空或无效
-    if not py or len(py) < 1:
-        return py
+class ChineseTypoGenerator:
+    def __init__(self, 
+                 error_rate=0.3,
+                 min_freq=5,
+                 tone_error_rate=0.2,
+                 word_replace_rate=0.3,
+                 max_freq_diff=200):
+        """
+        初始化错别字生成器

-    # 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
-    if not py[-1].isdigit():
-        # 为非数字结尾的拼音添加数字声调1
-        return py + '1'
-    
-    base = py[:-1]  # 去掉声调
-    tone = int(py[-1])  # 获取声调
-    
-    # 处理轻声(通常用5表示)或无效声调
-    if tone not in [1, 2, 3, 4]:
-        return base + str(random.choice([1, 2, 3, 4]))
-    
-    # 正常处理声调
-    possible_tones = [1, 2, 3, 4]
-    possible_tones.remove(tone)  # 移除原声调
-    new_tone = random.choice(possible_tones)  # 随机选择一个新声调
-    return base + str(new_tone)
+        参数:
+            error_rate: 单字替换概率
+            min_freq: 最小字频阈值
+            tone_error_rate: 声调错误概率
+            word_replace_rate: 整词替换概率
+            max_freq_diff: 最大允许的频率差异
+        """
+        self.error_rate = error_rate
+        self.min_freq = min_freq
+        self.tone_error_rate = tone_error_rate
+        self.word_replace_rate = word_replace_rate
+        self.max_freq_diff = max_freq_diff

-def calculate_replacement_probability(orig_freq, target_freq, max_freq_diff=200):
-    """
-    根据频率差计算替换概率
-    频率差越大,概率越低
-    :param orig_freq: 原字频率
-    :param target_freq: 目标字频率
-    :param max_freq_diff: 最大允许的频率差
-    :return: 0-1之间的概率值
-    """
-    if target_freq > orig_freq:
-        return 1.0  # 如果替换字频率更高,保持原有概率
-    
-    freq_diff = orig_freq - target_freq
-    if freq_diff > max_freq_diff:
-        return 0.0  # 频率差太大,不替换
-    
-    # 使用指数衰减函数计算概率
-    # 频率差为0时概率为1,频率差为max_freq_diff时概率接近0
-    return math.exp(-3 * freq_diff / max_freq_diff)
+        # 加载数据
+        print("正在加载汉字数据库,请稍候...")
+        self.pinyin_dict = self._create_pinyin_dict()
+        self.char_frequency = self._load_or_create_char_frequency()

-def get_similar_frequency_chars(char, py, pinyin_dict, char_frequency, num_candidates=5, min_freq=5, tone_error_rate=0.2):
-    """
-    获取与给定字频率相近的同音字,可能包含声调错误
-    """
-    homophones = []
-    
-    # 有20%的概率使用错误声调
-    if random.random() < tone_error_rate:
-        wrong_tone_py = get_similar_tone_pinyin(py)
-        homophones.extend(pinyin_dict[wrong_tone_py])
-    
-    # 添加正确声调的同音字
-    homophones.extend(pinyin_dict[py])
-    
-    if not homophones:
-        return None
+    def _load_or_create_char_frequency(self):
+        """
+        加载或创建汉字频率字典
+        """
+        cache_file = Path("char_frequency.json")

-    # 获取原字的频率
-    orig_freq = char_frequency.get(char, 0)
+        # 如果缓存文件存在,直接加载
+        if cache_file.exists():
+            with open(cache_file, 'r', encoding='utf-8') as f:
+                return json.load(f)

-    # 计算所有同音字与原字的频率差,并过滤掉低频字
-    freq_diff = [(h, char_frequency.get(h, 0))
-                 for h in homophones
-                 if h != char and char_frequency.get(h, 0) >= min_freq]
-    
-    if not freq_diff:
-        return None
-    
-    # 计算每个候选字的替换概率
-    candidates_with_prob = []
-    for h, freq in freq_diff:
-        prob = calculate_replacement_probability(orig_freq, freq)
-        if prob > 0:  # 只保留有效概率的候选字
-            candidates_with_prob.append((h, prob))
-    
-    if not candidates_with_prob:
-        return None
-    
-    # 根据概率排序
-    candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
-    
-    # 返回概率最高的几个字
-    return [char for char, _ in candidates_with_prob[:num_candidates]]
-
-def get_word_pinyin(word):
-    """
-    获取词语的拼音列表
-    """
-    return [py[0] for py in pinyin(word, style=Style.TONE3)]
-
-def segment_sentence(sentence):
-    """
-    使用jieba分词,返回词语列表
-    """
-    return list(jieba.cut(sentence))
-
-def get_word_homophones(word, pinyin_dict, char_frequency, min_freq=5):
-    """
-    获取整个词的同音词,只返回高频的有意义词语
-    :param word: 输入词语
-    :param pinyin_dict: 拼音字典
-    :param char_frequency: 字频字典
-    :param min_freq: 最小频率阈值
-    :return: 同音词列表
-    """
-    if len(word) == 1:
-        return []

-    # 获取词的拼音
-    word_pinyin = get_word_pinyin(word)
-    word_pinyin_str = ''.join(word_pinyin)
-    
-    # 创建词语频率字典
-    word_freq = defaultdict(float)
-    
-    # 遍历所有可能的同音字组合
-    candidates = []
-    for py in word_pinyin:
-        chars = pinyin_dict.get(py, [])
-        if not chars:
-            return []
-        candidates.append(chars)
-    
-    # 生成所有可能的组合
-    import itertools
-    all_combinations = itertools.product(*candidates)
-    
-    # 获取jieba词典和词频信息
-    dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
-    valid_words = {}  # 改用字典存储词语及其频率
-    with open(dict_path, 'r', encoding='utf-8') as f:
-        for line in f:
-            parts = line.strip().split()
-            if len(parts) >= 2:
-                word_text = parts[0]
-                word_freq = float(parts[1])  # 获取词频
-                valid_words[word_text] = word_freq
-    
-    # 获取原词的词频作为参考
-    original_word_freq = valid_words.get(word, 0)
-    min_word_freq = original_word_freq * 0.1  # 设置最小词频为原词频的10%
-    
-    # 过滤和计算频率
-    homophones = []
-    for combo in all_combinations:
-        new_word = ''.join(combo)
-        if new_word != word and new_word in valid_words:
-            new_word_freq = valid_words[new_word]
-            # 只保留词频达到阈值的词
-            if new_word_freq >= min_word_freq:
-                # 计算词的平均字频(考虑字频和词频)
-                char_avg_freq = sum(char_frequency.get(c, 0) for c in new_word) / len(new_word)
-                # 综合评分:结合词频和字频
-                combined_score = (new_word_freq * 0.7 + char_avg_freq * 0.3)
-                if combined_score >= min_freq:
-                    homophones.append((new_word, combined_score))
-    
-    # 按综合分数排序并限制返回数量
-    sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
-    return [word for word, _ in sorted_homophones[:5]]  # 限制返回前5个结果
-
-def create_typo_sentence(sentence, pinyin_dict, char_frequency, error_rate=0.5, min_freq=5, tone_error_rate=0.2, word_replace_rate=0.3):
-    """
-    创建包含同音字错误的句子,支持词语级别和字级别的替换
-    只使用高频的有意义词语进行替换
-    """
-    result = []
-    typo_info = []
-    
-    # 分词
-    words = segment_sentence(sentence)
-    
-    for word in words:
-        # 如果是标点符号或空格,直接添加
-        if all(not is_chinese_char(c) for c in word):
-            result.append(word)
-            continue
-        
-        # 获取词语的拼音
-        word_pinyin = get_word_pinyin(word)
+        # 使用内置的词频文件
+        char_freq = defaultdict(int)
+        dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
+
+        # 读取jieba的词典文件
+        with open(dict_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                word, freq = line.strip().split()[:2]
+                # 对词中的每个字进行频率累加
+                for char in word:
+                    if self._is_chinese_char(char):
+                        char_freq[char] += int(freq)
+
+        # 归一化频率值
+        max_freq = max(char_freq.values())
+        normalized_freq = {char: freq/max_freq * 1000 for char, freq in char_freq.items()}
+
+        # 保存到缓存文件
+        with open(cache_file, 'w', encoding='utf-8') as f:
+            json.dump(normalized_freq, f, ensure_ascii=False, indent=2)
+
+        return normalized_freq
+
+    def _create_pinyin_dict(self):
+        """
+        创建拼音到汉字的映射字典
+        """
+        # 常用汉字范围
+        chars = [chr(i) for i in range(0x4e00, 0x9fff)]
+        pinyin_dict = defaultdict(list)
+
+        # 为每个汉字建立拼音映射
+        for char in chars:
+            try:
+                py = pinyin(char, style=Style.TONE3)[0][0]
+                pinyin_dict[py].append(char)
+            except Exception:
                 continue

-        # 尝试整词替换
-        if len(word) > 1 and random.random() < word_replace_rate:
-            word_homophones = get_word_homophones(word, pinyin_dict, char_frequency, min_freq)
-            if word_homophones:
-                typo_word = random.choice(word_homophones)
-                # 计算词的平均频率
-                orig_freq = sum(char_frequency.get(c, 0) for c in word) / len(word)
-                typo_freq = sum(char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
-                
-                # 添加到结果中
-                result.append(typo_word)
-                typo_info.append((word, typo_word,
-                                  ' '.join(word_pinyin),
-                                  ' '.join(get_word_pinyin(typo_word)),
-                                  orig_freq, typo_freq))
+    return pinyin_dict
+
+    def _is_chinese_char(self, char):
+        """
+        判断是否为汉字
+        """
+        try:
+            return '\u4e00' <= char <= '\u9fff'
+        except:
+            return False
+
+    def _get_pinyin(self, sentence):
+        """
+        将中文句子拆分成单个汉字并获取其拼音
+        """
+        # 将句子拆分成单个字符
+        characters = list(sentence)
+
+        # 获取每个字符的拼音
+        result = []
+        for char in characters:
+            # 跳过空格和非汉字字符
+            if char.isspace() or not self._is_chinese_char(char):
+                continue
+            # 获取拼音(数字声调)
+            py = pinyin(char, style=Style.TONE3)[0][0]
+            result.append((char, py))
+
+        return result
+
+    def _get_similar_tone_pinyin(self, py):
+        """
+        获取相似声调的拼音
+        """
+        # 检查拼音是否为空或无效
+        if not py or len(py) < 1:
+            return py
+
+        # 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
+        if not py[-1].isdigit():
+            # 为非数字结尾的拼音添加数字声调1
+            return py + '1'
+
+        base = py[:-1]  # 去掉声调
+        tone = int(py[-1])  # 获取声调
+
+        # 处理轻声(通常用5表示)或无效声调
+        if tone not in [1, 2, 3, 4]:
+            return base + str(random.choice([1, 2, 3, 4]))
+
+        # 正常处理声调
+        possible_tones = [1, 2, 3, 4]
+        possible_tones.remove(tone)  # 移除原声调
+        new_tone = random.choice(possible_tones)  # 随机选择一个新声调
+        return base + str(new_tone)
+
+    def _calculate_replacement_probability(self, orig_freq, target_freq):
+        """
+        根据频率差计算替换概率
+        """
+        if target_freq > orig_freq:
+            return 1.0  # 如果替换字频率更高,保持原有概率
+
+        freq_diff = orig_freq - target_freq
+        if freq_diff > self.max_freq_diff:
+            return 0.0  # 频率差太大,不替换
+
+        # 使用指数衰减函数计算概率
+        # 频率差为0时概率为1,频率差为max_freq_diff时概率接近0
+        return math.exp(-3 * freq_diff / self.max_freq_diff)
+
+    def _get_similar_frequency_chars(self, char, py, num_candidates=5):
+        """
+        获取与给定字频率相近的同音字,可能包含声调错误
+        """
+        homophones = []
+
+        # 有一定概率使用错误声调
+        if random.random() < self.tone_error_rate:
+            wrong_tone_py = self._get_similar_tone_pinyin(py)
+            homophones.extend(self.pinyin_dict[wrong_tone_py])
+
+        # 添加正确声调的同音字
+        homophones.extend(self.pinyin_dict[py])
+
+        if not homophones:
+            return None
+
+        # 获取原字的频率
+        orig_freq = self.char_frequency.get(char, 0)
+
+        # 计算所有同音字与原字的频率差,并过滤掉低频字
+        freq_diff = [(h, self.char_frequency.get(h, 0))
+                     for h in homophones
+                     if h != char and self.char_frequency.get(h, 0) >= self.min_freq]
+
+        if not freq_diff:
+            return None
+
+        # 计算每个候选字的替换概率
+        candidates_with_prob = []
+        for h, freq in freq_diff:
+            prob = self._calculate_replacement_probability(orig_freq, freq)
+            if prob > 0:  # 只保留有效概率的候选字
+                candidates_with_prob.append((h, prob))
+
+        if not candidates_with_prob:
+            return None
+
+        # 根据概率排序
+        candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
+
+        # 返回概率最高的几个字
+        return [char for char, _ in candidates_with_prob[:num_candidates]]
+
+    def _get_word_pinyin(self, word):
+        """
+        获取词语的拼音列表
+        """
+        return [py[0] for py in pinyin(word, style=Style.TONE3)]
+
+    def _segment_sentence(self, sentence):
+        """
+        使用jieba分词,返回词语列表
+        """
+        return list(jieba.cut(sentence))
+
+    def _get_word_homophones(self, word):
+        """
+        获取整个词的同音词,只返回高频的有意义词语
+        """
         if len(word) == 1:
-            char = word
-            py = word_pinyin[0]
-            if random.random() < error_rate:
-                similar_chars = get_similar_frequency_chars(char, py, pinyin_dict, char_frequency,
-                                                            min_freq=min_freq, tone_error_rate=tone_error_rate)
-                if similar_chars:
-                    typo_char = random.choice(similar_chars)
-                    typo_freq = char_frequency.get(typo_char, 0)
-                    orig_freq = char_frequency.get(char, 0)
-                    replace_prob = calculate_replacement_probability(orig_freq, typo_freq)
-                    if random.random() < replace_prob:
-                        result.append(typo_char)
-                        typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
-                        typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
-                        continue
-            result.append(char)
-        else:
-            # 处理多字词的单字替换
-            word_result = []
-            for i, (char, py) in enumerate(zip(word, word_pinyin)):
-                # 词中的字替换概率降低
-                word_error_rate = error_rate * (0.7 ** (len(word) - 1))
+            return []
+
+        # 获取词的拼音
+        word_pinyin = self._get_word_pinyin(word)
+
+        # 
+        # 遍历所有可能的同音字组合
+        candidates = []
+        for py in word_pinyin:
+            chars = self.pinyin_dict.get(py, [])
+            if not chars:
+                return []
+            candidates.append(chars)
+
+        # 生成所有可能的组合
+        import itertools
+        all_combinations = itertools.product(*candidates)
+
+        # 获取jieba词典和词频信息
+        dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
+        valid_words = {}  # 改用字典存储词语及其频率
+        with open(dict_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                parts = line.strip().split()
+                if len(parts) >= 2:
+                    word_text = parts[0]
+                    word_freq = float(parts[1])  # 获取词频
+                    valid_words[word_text] = word_freq
+
+        # 获取原词的词频作为参考
+        original_word_freq = valid_words.get(word, 0)
+        min_word_freq = original_word_freq * 0.1  # 设置最小词频为原词频的10%
+
+        # 过滤和计算频率
+        homophones = []
+        for combo in all_combinations:
+            new_word = ''.join(combo)
+            if new_word != word and new_word in valid_words:
+                new_word_freq = valid_words[new_word]
+                # 只保留词频达到阈值的词
+                if new_word_freq >= min_word_freq:
+                    # 计算词的平均字频(考虑字频和词频)
+                    char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word)
+                    # 综合评分:结合词频和字频
+                    combined_score = (new_word_freq * 0.7 + char_avg_freq * 0.3)
+                    if combined_score >= self.min_freq:
+                        homophones.append((new_word, combined_score))
+
+        # 按综合分数排序并限制返回数量
+        sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
+        return [word for word, _ in sorted_homophones[:5]]  # 限制返回前5个结果
+
+    def create_typo_sentence(self, sentence):
+        """
+        创建包含同音字错误的句子,支持词语级别和字级别的替换
+
+        参数:
+            sentence: 输入的中文句子
+
+        返回:
+            typo_sentence: 包含错别字的句子
+            typo_info: 错别字信息列表
+        """
+        result = []
+        typo_info = []
+
+        # 分词
+        words = self._segment_sentence(sentence)
+
+        for word in words:
+            # 如果是标点符号或空格,直接添加
+            if all(not self._is_chinese_char(c) for c in word):
+                result.append(word)
+                continue

-            if random.random() < word_error_rate:
-                similar_chars = get_similar_frequency_chars(char, py, pinyin_dict, char_frequency,
-                                                            min_freq=min_freq, tone_error_rate=tone_error_rate)
+            # 获取词语的拼音
+            word_pinyin = self._get_word_pinyin(word)
+
+            # 尝试整词替换
+            if len(word) > 1 and random.random() < self.word_replace_rate:
+                word_homophones = self._get_word_homophones(word)
+                if word_homophones:
+                    typo_word = random.choice(word_homophones)
+                    # 计算词的平均频率
+                    orig_freq = sum(self.char_frequency.get(c, 0) for c in word) / len(word)
+                    typo_freq = sum(self.char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
+
+                    # 添加到结果中
+                    result.append(typo_word)
+                    typo_info.append((word, typo_word,
+                                      ' '.join(word_pinyin),
+                                      ' '.join(self._get_word_pinyin(typo_word)),
+                                      orig_freq, typo_freq))
+                    continue
+
+            # 如果不进行整词替换,则进行单字替换
+            if len(word) == 1:
+                char = word
+                py = word_pinyin[0]
+                if random.random() < self.error_rate:
+                    similar_chars = self._get_similar_frequency_chars(char, py)
                     if similar_chars:
                         typo_char = random.choice(similar_chars)
-                        typo_freq = char_frequency.get(typo_char, 0)
-                        orig_freq = char_frequency.get(char, 0)
-                        replace_prob = calculate_replacement_probability(orig_freq, typo_freq)
+                        typo_freq = self.char_frequency.get(typo_char, 0)
+                        orig_freq = self.char_frequency.get(char, 0)
+                        replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
                         if random.random() < replace_prob:
-                            word_result.append(typo_char)
+                            result.append(typo_char)
                             typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
                             typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
                             continue
-                word_result.append(char)
-            result.append(''.join(word_result))
-    
-    return ''.join(result), typo_info
+                result.append(char)
+            else:
+                # 处理多字词的单字替换
+                word_result = []
+                for i, (char, py) in enumerate(zip(word, word_pinyin)):
+                    # 词中的字替换概率降低
+                    word_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
+
+                    if random.random() < word_error_rate:
+                        similar_chars = self._get_similar_frequency_chars(char, py)
+                        if similar_chars:
+                            typo_char = random.choice(similar_chars)
+                            typo_freq = self.char_frequency.get(typo_char, 0)
+                            orig_freq = self.char_frequency.get(char, 0)
+                            replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
+                            if random.random() < replace_prob:
+                                word_result.append(typo_char)
+                                typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
+                                typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
+                                continue
+                    word_result.append(char)
+                result.append(''.join(word_result))
+
+        return ''.join(result), typo_info

-def format_frequency(freq):
-    """
-    格式化频率显示
-    """
-    return f"{freq:.2f}"
-
-def main():
-    # 记录开始时间
-    start_time = time.time()
-    
-    # 首先创建拼音字典和加载字频统计
-    print("正在加载汉字数据库,请稍候...")
-    pinyin_dict = create_pinyin_dict()
-    char_frequency = load_or_create_char_frequency()
-    
-    # 获取用户输入
-    sentence = input("请输入中文句子:")
-    
-    # 创建包含错别字的句子
-    typo_sentence, typo_info = create_typo_sentence(sentence, pinyin_dict, char_frequency,
-                                                    error_rate=0.3, min_freq=5,
-                                                    tone_error_rate=0.2, word_replace_rate=0.3)
-    
-    # 打印结果
-    print("\n原句:", sentence)
-    print("错字版:", typo_sentence)
-    
-    if typo_info:
-        print("\n错别字信息:")
+    def format_typo_info(self, typo_info):
+        """
+        格式化错别字信息
+
+        参数:
+            typo_info: 错别字信息列表
+
+        返回:
+            格式化后的错别字信息字符串
+        """
+        if not typo_info:
+            return "未生成错别字"
+
+        result = []
         for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
             # 判断是否为词语替换
             is_word = ' ' in orig_py
             if is_word:
                 error_type = "整词替换"
             else:
@@ -459,25 +380,53 @@ def main():
                 tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
                 error_type = "声调错误" if tone_error else "同音字替换"

-            print(f"原文:{orig}({orig_py}) [频率:{format_frequency(orig_freq)}] -> "
-                  f"替换:{typo}({typo_py}) [频率:{format_frequency(typo_freq)}] [{error_type}]")
+            result.append(f"原文:{orig}({orig_py}) [频率:{orig_freq:.2f}] -> "
+                          f"替换:{typo}({typo_py}) [频率:{typo_freq:.2f}] [{error_type}]")
+
+        return "\n".join(result)

-    # 获取拼音结果
-    result = get_pinyin(sentence)
+    def set_params(self, **kwargs):
+        """
+        设置参数
+
+        可设置参数:
+            error_rate: 单字替换概率
+            min_freq: 最小字频阈值
+            tone_error_rate: 声调错误概率
+            word_replace_rate: 整词替换概率
+            max_freq_diff: 最大允许的频率差异
+        """
+        for key, value in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+                print(f"参数 {key} 已设置为 {value}")
+            else:
+                print(f"警告: 参数 {key} 不存在")
+
+def main():
+    # 创建错别字生成器实例
+    typo_generator = ChineseTypoGenerator(
+        error_rate=0.03,
+        min_freq=7,
+        tone_error_rate=0.02,
+        word_replace_rate=0.3
+    )

-    # 打印完整拼音
-    print("\n完整拼音:")
-    print(" ".join(py for _, py in result))
+    # 获取用户输入
+    sentence = input("请输入中文句子:")

-    # 打印词语分析
-    print("\n词语分析:")
-    words = segment_sentence(sentence)
-    for word in words:
-        if any(is_chinese_char(c) for c in word):
-            word_pinyin = get_word_pinyin(word)
-            print(f"词语:{word}")
-            print(f"拼音:{' '.join(word_pinyin)}")
-            print("---")
+    # 创建包含错别字的句子
+    start_time = time.time()
+    typo_sentence, typo_info = typo_generator.create_typo_sentence(sentence)
+
+    # 打印结果
+    print("\n原句:", sentence)
+    print("错字版:", typo_sentence)
+
+    # 打印错别字信息
+    if typo_info:
+        print("\n错别字信息:")
+        print(typo_generator.format_typo_info(typo_info))

     # 计算并打印总耗时
     end_time = time.time()
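Worth noting how the tone-error path ties into pypinyin: Style.TONE3 renders syllables with a trailing tone digit ('ni3', 'hao3'), which is exactly what _get_similar_tone_pinyin manipulates. A small sketch of the mapping it implements (illustrative inputs):

    from pypinyin import pinyin, Style

    print(pinyin('你好', style=Style.TONE3))  # [['ni3'], ['hao3']]
    # _get_similar_tone_pinyin('ni3') -> one of 'ni1' / 'ni2' / 'ni4' (original tone excluded)
    # _get_similar_tone_pinyin('de')  -> 'de1' (non-digit tail: tone 1 appended)
    # _get_similar_tone_pinyin('ma5') -> one of 'ma1'..'ma4' (neutral tone replaced at random)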