Merge branch 'debug' into feature

This commit is contained in:
SengokuCola
2025-03-03 22:43:44 +08:00
committed by GitHub
10 changed files with 677 additions and 570 deletions

View File

@@ -97,8 +97,13 @@ class ChatBot:
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time))
topic = topic_identifier.identify_topic_jieba(message.processed_plain_text)
print(f"\033[1;32m[主题识别]\033[0m 主题: {topic}")
topic1 = topic_identifier.identify_topic_jieba(message.processed_plain_text)
topic2 = await topic_identifier.identify_topic_llm(message.processed_plain_text)
topic3 = topic_identifier.identify_topic_snownlp(message.processed_plain_text)
print(f"\033[1;32m[主题识别]\033[0m 使用jieba主题: {topic1}")
print(f"\033[1;32m[主题识别]\033[0m 使用llm主题: {topic2}")
print(f"\033[1;32m[主题识别]\033[0m 使用snownlp主题: {topic3}")
topic = topic3
all_num = 0
interested_num = 0

View File

@@ -4,19 +4,18 @@ from .message import Message
import jieba
from nonebot import get_driver
from .config import global_config
from snownlp import SnowNLP
from ..models.utils_model import LLM_request
driver = get_driver()
config = driver.config
class TopicIdentifier:
def __init__(self):
self.client = OpenAI(
api_key=config.siliconflow_key,
base_url=config.siliconflow_base_url
)
self.llm_client = LLM_request(model=global_config.llm_normal)
def identify_topic_llm(self, text: str) -> Optional[str]:
"""识别消息主题"""
async def identify_topic_llm(self, text: str) -> Optional[List[str]]:
"""识别消息主题,返回主题列表"""
prompt = f"""判断这条消息的主题,如果没有明显主题请回复"无主题",要求:
1. 主题通常2-4个字必须简短要求精准概括不要太具体。
@@ -24,33 +23,20 @@ class TopicIdentifier:
消息内容:{text}"""
response = self.client.chat.completions.create(
model=global_config.SILICONFLOW_MODEL_V3,
messages=[{"role": "user", "content": prompt}],
temperature=0.8,
max_tokens=10
)
# 使用 LLM_request 类进行请求
topic, _ = await self.llm_client.generate_response(prompt)
if not response or not response.choices:
print(f"\033[1;31m[错误]\033[0m OpenAI API 返回为空")
if not topic:
print(f"\033[1;31m[错误]\033[0m LLM API 返回为空")
return None
# 从 OpenAI API 响应中获取第一个选项的消息内容,并去除首尾空白字符
topic = response.choices[0].message.content.strip() if response.choices[0].message.content else None
if topic == "无主题":
return None
else:
# print(f"[主题分析结果]{text[:20]}... : {topic}")
split_topic = self.parse_topic(topic)
return split_topic
def parse_topic(self, topic: str) -> List[str]:
"""解析主题,返回主题列表"""
# 直接在这里处理主题解析
if not topic or topic == "无主题":
return []
return [t.strip() for t in topic.split(",") if t.strip()]
return None
# 解析主题字符串为列表
topic_list = [t.strip() for t in topic.split(",") if t.strip()]
return topic_list if topic_list else None
def identify_topic_jieba(self, text: str) -> Optional[str]:
"""使用jieba识别主题"""
@@ -80,9 +66,12 @@ class TopicIdentifier:
filtered_words = []
for word in words:
if word not in stop_words and not word.strip() in {
'', '', '', '', '', '', '', '"', '"', ''', ''',
'', '', '', '', '', '', '', '', '·', '', '~',
'', '+', '=', '-','[',']'
'', '', '', '', '', '', '', '"', '"', ''', ''',
'', '', '', '', '', '', '', '', '·', '', '~',
'', '+', '=', '-', '/', '\\', '|', '*', '#', '@', '$', '%',
'^', '&', '[', ']', '{', '}', '<', '>', '`', '_', '.', ',',
';', ':', '\'', '"', '(', ')', '?', '!', '±', '×', '÷', '',
'', '', '', '', '', '', '', '', '', '', ''
}:
filtered_words.append(word)
@@ -97,4 +86,25 @@ class TopicIdentifier:
return top_words if top_words else None
topic_identifier = TopicIdentifier()
def identify_topic_snownlp(self, text: str) -> Optional[List[str]]:
"""使用 SnowNLP 进行主题识别
Args:
text (str): 需要识别主题的文本
Returns:
Optional[List[str]]: 返回识别出的主题关键词列表,如果无法识别则返回 None
"""
if not text or len(text.strip()) == 0:
return None
try:
s = SnowNLP(text)
# 提取前3个关键词作为主题
keywords = s.keywords(3)
return keywords if keywords else None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m SnowNLP 处理失败: {str(e)}")
return None
topic_identifier = TopicIdentifier()

View File

@@ -2,7 +2,6 @@
import os
import sys
import jieba
from llm_module import LLMModel
import networkx as nx
import matplotlib.pyplot as plt
import math
@@ -10,10 +9,76 @@ from collections import Counter
import datetime
import random
import time
# from chat.config import global_config
from dotenv import load_dotenv
import sys
import asyncio
import aiohttp
from typing import Tuple
sys.path.append("C:/GitHub/MaiMBot") # 添加项目根目录到 Python 路径
from src.common.database import Database # 使用正确的导入语法
# 加载.env.dev文件
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), '.env.dev')
load_dotenv(env_path)
class LLMModel:
def __init__(self, model_name=os.getenv("SILICONFLOW_MODEL_V3"), **kwargs):
self.model_name = model_name
self.params = kwargs
self.api_key = os.getenv("SILICONFLOW_KEY")
self.base_url = os.getenv("SILICONFLOW_BASE_URL")
async def generate_response(self, prompt: str) -> Tuple[str, str]:
"""根据输入的提示生成模型的响应"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 构建请求体
data = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
**self.params
}
# 发送请求到完整的chat/completions端点
api_url = f"{self.base_url.rstrip('/')}/chat/completions"
max_retries = 3
base_wait_time = 15
for retry in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=data) as response:
if response.status == 429:
wait_time = base_wait_time * (2 ** retry) # 指数退避
print(f"遇到请求限制(429),等待{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
continue
response.raise_for_status() # 检查其他响应状态
result = await response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
reasoning_content = result["choices"][0]["message"].get("reasoning_content", "")
return content, reasoning_content
return "没有返回结果", ""
except Exception as e:
if retry < max_retries - 1: # 如果还有重试机会
wait_time = base_wait_time * (2 ** retry)
print(f"请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
await asyncio.sleep(wait_time)
else:
return f"请求失败: {str(e)}", ""
return "达到最大重试次数,请求仍然失败", ""
class Memory_graph:
def __init__(self):
@@ -158,12 +223,12 @@ class Memory_graph:
def main():
# 初始化数据库
Database.initialize(
host= os.getenv("MONGODB_HOST"),
port= int(os.getenv("MONGODB_PORT")),
db_name= os.getenv("DATABASE_NAME"),
username= os.getenv("MONGODB_USERNAME"),
password= os.getenv("MONGODB_PASSWORD"),
auth_source=os.getenv("MONGODB_AUTH_SOURCE")
host=os.getenv("MONGODB_HOST", "127.0.0.1"),
port=int(os.getenv("MONGODB_PORT", "27017")),
db_name=os.getenv("DATABASE_NAME", "MegBot"),
username=os.getenv("MONGODB_USERNAME", ""),
password=os.getenv("MONGODB_PASSWORD", ""),
auth_source=os.getenv("MONGODB_AUTH_SOURCE", "")
)
memory_graph = Memory_graph()
@@ -185,11 +250,14 @@ def main():
query = input("请输入新的查询概念(输入'退出'以结束):")
if query.lower() == '退出':
break
items_list = memory_graph.get_related_item(query)
if items_list:
# print(items_list)
for memory_item in items_list:
print(memory_item)
first_layer_items, second_layer_items = memory_graph.get_related_item(query)
if first_layer_items or second_layer_items:
print("\n第一层记忆:")
for item in first_layer_items:
print(item)
print("\n第二层记忆:")
for item in second_layer_items:
print(item)
else:
print("未找到相关记忆。")

View File

@@ -1,70 +0,0 @@
from textblob import TextBlob
import jieba
from translate import Translator
def analyze_emotion(text):
"""
分析文本的情感,返回情感极性和主观性得分
:param text: 输入文本
:return: (情感极性, 主观性) 元组
情感极性: -1(非常消极) 到 1(非常积极)
主观性: 0(客观) 到 1(主观)
"""
try:
# 创建翻译器
translator = Translator(to_lang="en", from_lang="zh")
# 如果是中文文本,先翻译成英文
# 因为TextBlob的情感分析主要基于英文
translated_text = translator.translate(text)
# 创建TextBlob对象
blob = TextBlob(translated_text)
# 获取情感极性和主观性
polarity = blob.sentiment.polarity
subjectivity = blob.sentiment.subjectivity
return polarity, subjectivity
except Exception as e:
print(f"分析过程中出现错误: {str(e)}")
return None, None
def get_emotion_description(polarity, subjectivity):
"""
根据情感极性和主观性生成描述性文字
"""
if polarity is None or subjectivity is None:
return "无法分析情感"
# 情感极性描述
if polarity > 0.5:
emotion = "非常积极"
elif polarity > 0:
emotion = "较为积极"
elif polarity == 0:
emotion = "中性"
elif polarity > -0.5:
emotion = "较为消极"
else:
emotion = "非常消极"
# 主观性描述
if subjectivity > 0.7:
subj = "非常主观"
elif subjectivity > 0.3:
subj = "较为主观"
else:
subj = "较为客观"
return f"情感倾向: {emotion}, 表达方式: {subj}"
if __name__ == "__main__":
# 测试样例
test_text = "今天天气真好,我感到非常开心!"
polarity, subjectivity = analyze_emotion(test_text)
print(f"测试文本: {test_text}")
print(f"情感极性: {polarity:.2f}")
print(f"主观性得分: {subjectivity:.2f}")
print(get_emotion_description(polarity, subjectivity))

View File

@@ -1,74 +0,0 @@
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
def setup_bert_analyzer():
"""
设置中文BERT情感分析器
"""
# 使用专门针对中文情感分析的模型
model_name = "uer/roberta-base-finetuned-jd-binary-chinese"
try:
# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
# 创建情感分析pipeline
analyzer = pipeline("sentiment-analysis",
model=model,
tokenizer=tokenizer)
return analyzer
except Exception as e:
print(f"模型加载错误: {str(e)}")
return None
def analyze_emotion_bert(text, analyzer):
"""
使用BERT模型进行中文情感分析
"""
try:
if not analyzer:
return None
# 进行情感分析
result = analyzer(text)[0]
return {
'label': result['label'],
'score': result['score']
}
except Exception as e:
print(f"分析过程中出现错误: {str(e)}")
return None
def get_emotion_description_bert(result):
"""
将BERT的情感分析结果转换为描述性文字
"""
if not result:
return "无法分析情感"
label = "积极" if result['label'] == 'positive' else "消极"
confidence = result['score']
if confidence > 0.9:
strength = "强烈"
elif confidence > 0.7:
strength = "明显"
else:
strength = "轻微"
return f"{strength}{label}"
if __name__ == "__main__":
# 初始化分析器
analyzer = setup_bert_analyzer()
# 测试样例
test_text = "这个产品质量很好,使用起来非常方便,推荐购买!"
result = analyze_emotion_bert(test_text, analyzer)
print(f"测试文本: {test_text}")
if result:
print(f"情感倾向: {get_emotion_description_bert(result)}")
print(f"置信度: {result['score']:.2f}")

View File

@@ -1,62 +0,0 @@
import hanlp
def analyze_emotion_hanlp(text):
"""
使用HanLP进行中文情感分析
"""
try:
# 使用更基础的模型
tokenizer = hanlp.load('PKU_NAME_MERGED_SIX_MONTHS_CONVSEG')
# 分词
words = tokenizer(text)
# 简单的情感词典方法
positive_words = {'', '', '优秀', '喜欢', '开心', '快乐', '美味', '推荐', '优质', '满意'}
negative_words = {'', '', '', '讨厌', '失望', '难受', '恶心', '不满', '差劲', '垃圾'}
# 计算情感得分
score = 0
for word in words:
if word in positive_words:
score += 1
elif word in negative_words:
score -= 1
# 归一化得分
if score > 0:
return 1
elif score < 0:
return 0
else:
return 0.5
except Exception as e:
print(f"分析过程中出现错误: {str(e)}")
return None
def get_emotion_description_hanlp(score):
"""
将HanLP的情感分析结果转换为描述性文字
"""
if score is None:
return "无法分析情感"
elif score == 1:
return "积极"
elif score == 0:
return "消极"
else:
return "中性"
if __name__ == "__main__":
# 测试样例
test_texts = [
"这家餐厅的服务态度很好,菜品也很美味!",
"这个产品质量太差了,一点都不值这个价",
"今天天气不错,但是工作很累"
]
for test_text in test_texts:
result = analyze_emotion_hanlp(test_text)
print(f"\n测试文本: {test_text}")
print(f"情感倾向: {get_emotion_description_hanlp(result)}")

488
src/test/typo_creator.py Normal file
View File

@@ -0,0 +1,488 @@
"""
错别字生成器 - 流程说明
整体替换逻辑:
1. 数据准备
- 加载字频词典使用jieba词典计算汉字使用频率
- 创建拼音映射:建立拼音到汉字的映射关系
- 加载词频信息从jieba词典获取词语使用频率
2. 分词处理
- 使用jieba将输入句子分词
- 区分单字词和多字词
- 保留标点符号和空格
3. 词语级别替换(针对多字词)
- 触发条件:词长>1 且 随机概率<0.3
- 替换流程:
a. 获取词语拼音
b. 生成所有可能的同音字组合
c. 过滤条件:
- 必须是jieba词典中的有效词
- 词频必须达到原词频的10%以上
- 综合评分(词频70%+字频30%)必须达到阈值
d. 按综合评分排序,选择最合适的替换词
4. 字级别替换(针对单字词或未进行整词替换的多字词)
- 单字替换概率0.3
- 多字词中的单字替换概率0.3 * (0.7 ^ (词长-1))
- 替换流程:
a. 获取字的拼音
b. 声调错误处理20%概率)
c. 获取同音字列表
d. 过滤条件:
- 字频必须达到最小阈值
- 频率差异不能过大(指数衰减计算)
e. 按频率排序选择替换字
5. 频率控制机制
- 字频控制使用归一化的字频0-1000范围
- 词频控制使用jieba词典中的词频
- 频率差异计算:使用指数衰减函数
- 最小频率阈值:确保替换字/词不会太生僻
6. 输出信息
- 原文和错字版本的对照
- 每个替换的详细信息(原字/词、替换后字/词、拼音、频率)
- 替换类型说明(整词替换/声调错误/同音字替换)
- 词语分析和完整拼音
注意事项:
1. 所有替换都必须使用有意义的词语
2. 替换词的使用频率不能过低
3. 多字词优先考虑整词替换
4. 考虑声调变化的情况
5. 保持标点符号和空格不变
"""
from pypinyin import pinyin, Style
from collections import defaultdict
import json
import os
import unicodedata
import jieba
import jieba.posseg as pseg
from pathlib import Path
import random
import math
import time
def load_or_create_char_frequency():
"""
加载或创建汉字频率字典
"""
cache_file = Path("char_frequency.json")
# 如果缓存文件存在,直接加载
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
# 使用内置的词频文件
char_freq = defaultdict(int)
dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
# 读取jieba的词典文件
with open(dict_path, 'r', encoding='utf-8') as f:
for line in f:
word, freq = line.strip().split()[:2]
# 对词中的每个字进行频率累加
for char in word:
if is_chinese_char(char):
char_freq[char] += int(freq)
# 归一化频率值
max_freq = max(char_freq.values())
normalized_freq = {char: freq/max_freq * 1000 for char, freq in char_freq.items()}
# 保存到缓存文件
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(normalized_freq, f, ensure_ascii=False, indent=2)
return normalized_freq
# 创建拼音到汉字的映射字典
def create_pinyin_dict():
"""
创建拼音到汉字的映射字典
"""
# 常用汉字范围
chars = [chr(i) for i in range(0x4e00, 0x9fff)]
pinyin_dict = defaultdict(list)
# 为每个汉字建立拼音映射
for char in chars:
try:
py = pinyin(char, style=Style.TONE3)[0][0]
pinyin_dict[py].append(char)
except Exception:
continue
return pinyin_dict
def is_chinese_char(char):
"""
判断是否为汉字
"""
try:
return '\u4e00' <= char <= '\u9fff'
except:
return False
def get_pinyin(sentence):
"""
将中文句子拆分成单个汉字并获取其拼音
:param sentence: 输入的中文句子
:return: 每个汉字及其拼音的列表
"""
# 将句子拆分成单个字符
characters = list(sentence)
# 获取每个字符的拼音
result = []
for char in characters:
# 跳过空格和非汉字字符
if char.isspace() or not is_chinese_char(char):
continue
# 获取拼音(数字声调)
py = pinyin(char, style=Style.TONE3)[0][0]
result.append((char, py))
return result
def get_homophone(char, py, pinyin_dict, char_frequency, min_freq=5):
"""
获取同音字,按照使用频率排序
"""
homophones = pinyin_dict[py]
# 移除原字并过滤低频字
if char in homophones:
homophones.remove(char)
# 过滤掉低频字
homophones = [h for h in homophones if char_frequency.get(h, 0) >= min_freq]
# 按照字频排序
sorted_homophones = sorted(homophones,
key=lambda x: char_frequency.get(x, 0),
reverse=True)
# 只返回前10个同音字避免输出过多
return sorted_homophones[:10]
def get_similar_tone_pinyin(py):
"""
获取相似声调的拼音
例如:'ni3' 可能返回 'ni2''ni4'
处理特殊情况:
1. 轻声(如 'de5''le'
2. 非数字结尾的拼音
"""
# 检查拼音是否为空或无效
if not py or len(py) < 1:
return py
# 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
if not py[-1].isdigit():
# 为非数字结尾的拼音添加数字声调1
return py + '1'
base = py[:-1] # 去掉声调
tone = int(py[-1]) # 获取声调
# 处理轻声通常用5表示或无效声调
if tone not in [1, 2, 3, 4]:
return base + str(random.choice([1, 2, 3, 4]))
# 正常处理声调
possible_tones = [1, 2, 3, 4]
possible_tones.remove(tone) # 移除原声调
new_tone = random.choice(possible_tones) # 随机选择一个新声调
return base + str(new_tone)
def calculate_replacement_probability(orig_freq, target_freq, max_freq_diff=200):
"""
根据频率差计算替换概率
频率差越大,概率越低
:param orig_freq: 原字频率
:param target_freq: 目标字频率
:param max_freq_diff: 最大允许的频率差
:return: 0-1之间的概率值
"""
if target_freq > orig_freq:
return 1.0 # 如果替换字频率更高,保持原有概率
freq_diff = orig_freq - target_freq
if freq_diff > max_freq_diff:
return 0.0 # 频率差太大,不替换
# 使用指数衰减函数计算概率
# 频率差为0时概率为1频率差为max_freq_diff时概率接近0
return math.exp(-3 * freq_diff / max_freq_diff)
def get_similar_frequency_chars(char, py, pinyin_dict, char_frequency, num_candidates=5, min_freq=5, tone_error_rate=0.2):
"""
获取与给定字频率相近的同音字,可能包含声调错误
"""
homophones = []
# 有20%的概率使用错误声调
if random.random() < tone_error_rate:
wrong_tone_py = get_similar_tone_pinyin(py)
homophones.extend(pinyin_dict[wrong_tone_py])
# 添加正确声调的同音字
homophones.extend(pinyin_dict[py])
if not homophones:
return None
# 获取原字的频率
orig_freq = char_frequency.get(char, 0)
# 计算所有同音字与原字的频率差,并过滤掉低频字
freq_diff = [(h, char_frequency.get(h, 0))
for h in homophones
if h != char and char_frequency.get(h, 0) >= min_freq]
if not freq_diff:
return None
# 计算每个候选字的替换概率
candidates_with_prob = []
for h, freq in freq_diff:
prob = calculate_replacement_probability(orig_freq, freq)
if prob > 0: # 只保留有效概率的候选字
candidates_with_prob.append((h, prob))
if not candidates_with_prob:
return None
# 根据概率排序
candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
# 返回概率最高的几个字
return [char for char, _ in candidates_with_prob[:num_candidates]]
def get_word_pinyin(word):
"""
获取词语的拼音列表
"""
return [py[0] for py in pinyin(word, style=Style.TONE3)]
def segment_sentence(sentence):
"""
使用jieba分词返回词语列表
"""
return list(jieba.cut(sentence))
def get_word_homophones(word, pinyin_dict, char_frequency, min_freq=5):
"""
获取整个词的同音词,只返回高频的有意义词语
:param word: 输入词语
:param pinyin_dict: 拼音字典
:param char_frequency: 字频字典
:param min_freq: 最小频率阈值
:return: 同音词列表
"""
if len(word) == 1:
return []
# 获取词的拼音
word_pinyin = get_word_pinyin(word)
word_pinyin_str = ''.join(word_pinyin)
# 创建词语频率字典
word_freq = defaultdict(float)
# 遍历所有可能的同音字组合
candidates = []
for py in word_pinyin:
chars = pinyin_dict.get(py, [])
if not chars:
return []
candidates.append(chars)
# 生成所有可能的组合
import itertools
all_combinations = itertools.product(*candidates)
# 获取jieba词典和词频信息
dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
valid_words = {} # 改用字典存储词语及其频率
with open(dict_path, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 2:
word_text = parts[0]
word_freq = float(parts[1]) # 获取词频
valid_words[word_text] = word_freq
# 获取原词的词频作为参考
original_word_freq = valid_words.get(word, 0)
min_word_freq = original_word_freq * 0.1 # 设置最小词频为原词频的10%
# 过滤和计算频率
homophones = []
for combo in all_combinations:
new_word = ''.join(combo)
if new_word != word and new_word in valid_words:
new_word_freq = valid_words[new_word]
# 只保留词频达到阈值的词
if new_word_freq >= min_word_freq:
# 计算词的平均字频(考虑字频和词频)
char_avg_freq = sum(char_frequency.get(c, 0) for c in new_word) / len(new_word)
# 综合评分:结合词频和字频
combined_score = (new_word_freq * 0.7 + char_avg_freq * 0.3)
if combined_score >= min_freq:
homophones.append((new_word, combined_score))
# 按综合分数排序并限制返回数量
sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
return [word for word, _ in sorted_homophones[:5]] # 限制返回前5个结果
def create_typo_sentence(sentence, pinyin_dict, char_frequency, error_rate=0.5, min_freq=5, tone_error_rate=0.2, word_replace_rate=0.3):
"""
创建包含同音字错误的句子,支持词语级别和字级别的替换
只使用高频的有意义词语进行替换
"""
result = []
typo_info = []
# 分词
words = segment_sentence(sentence)
for word in words:
# 如果是标点符号或空格,直接添加
if all(not is_chinese_char(c) for c in word):
result.append(word)
continue
# 获取词语的拼音
word_pinyin = get_word_pinyin(word)
# 尝试整词替换
if len(word) > 1 and random.random() < word_replace_rate:
word_homophones = get_word_homophones(word, pinyin_dict, char_frequency, min_freq)
if word_homophones:
typo_word = random.choice(word_homophones)
# 计算词的平均频率
orig_freq = sum(char_frequency.get(c, 0) for c in word) / len(word)
typo_freq = sum(char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
# 添加到结果中
result.append(typo_word)
typo_info.append((word, typo_word,
' '.join(word_pinyin),
' '.join(get_word_pinyin(typo_word)),
orig_freq, typo_freq))
continue
# 如果不进行整词替换,则进行单字替换
if len(word) == 1:
char = word
py = word_pinyin[0]
if random.random() < error_rate:
similar_chars = get_similar_frequency_chars(char, py, pinyin_dict, char_frequency,
min_freq=min_freq, tone_error_rate=tone_error_rate)
if similar_chars:
typo_char = random.choice(similar_chars)
typo_freq = char_frequency.get(typo_char, 0)
orig_freq = char_frequency.get(char, 0)
replace_prob = calculate_replacement_probability(orig_freq, typo_freq)
if random.random() < replace_prob:
result.append(typo_char)
typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
continue
result.append(char)
else:
# 处理多字词的单字替换
word_result = []
for i, (char, py) in enumerate(zip(word, word_pinyin)):
# 词中的字替换概率降低
word_error_rate = error_rate * (0.7 ** (len(word) - 1))
if random.random() < word_error_rate:
similar_chars = get_similar_frequency_chars(char, py, pinyin_dict, char_frequency,
min_freq=min_freq, tone_error_rate=tone_error_rate)
if similar_chars:
typo_char = random.choice(similar_chars)
typo_freq = char_frequency.get(typo_char, 0)
orig_freq = char_frequency.get(char, 0)
replace_prob = calculate_replacement_probability(orig_freq, typo_freq)
if random.random() < replace_prob:
word_result.append(typo_char)
typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
continue
word_result.append(char)
result.append(''.join(word_result))
return ''.join(result), typo_info
def format_frequency(freq):
"""
格式化频率显示
"""
return f"{freq:.2f}"
def main():
# 记录开始时间
start_time = time.time()
# 首先创建拼音字典和加载字频统计
print("正在加载汉字数据库,请稍候...")
pinyin_dict = create_pinyin_dict()
char_frequency = load_or_create_char_frequency()
# 获取用户输入
sentence = input("请输入中文句子:")
# 创建包含错别字的句子
typo_sentence, typo_info = create_typo_sentence(sentence, pinyin_dict, char_frequency,
error_rate=0.3, min_freq=5,
tone_error_rate=0.2, word_replace_rate=0.3)
# 打印结果
print("\n原句:", sentence)
print("错字版:", typo_sentence)
if typo_info:
print("\n错别字信息:")
for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
# 判断是否为词语替换
is_word = ' ' in orig_py
if is_word:
error_type = "整词替换"
else:
tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
error_type = "声调错误" if tone_error else "同音字替换"
print(f"原文:{orig}({orig_py}) [频率:{format_frequency(orig_freq)}] -> "
f"替换:{typo}({typo_py}) [频率:{format_frequency(typo_freq)}] [{error_type}]")
# 获取拼音结果
result = get_pinyin(sentence)
# 打印完整拼音
print("\n完整拼音:")
print(" ".join(py for _, py in result))
# 打印词语分析
print("\n词语分析:")
words = segment_sentence(sentence)
for word in words:
if any(is_chinese_char(c) for c in word):
word_pinyin = get_word_pinyin(word)
print(f"词语:{word}")
print(f"拼音:{' '.join(word_pinyin)}")
print("---")
# 计算并打印总耗时
end_time = time.time()
total_time = end_time - start_time
print(f"\n总耗时:{total_time:.2f}")
if __name__ == "__main__":
main()

View File

@@ -1,301 +0,0 @@
from pypinyin import pinyin, Style
from collections import defaultdict
import json
import os
import unicodedata
import jieba
import jieba.posseg as pseg
from pathlib import Path
import random
import math
def load_or_create_char_frequency():
"""
加载或创建汉字频率字典
"""
cache_file = Path("char_frequency.json")
# 如果缓存文件存在,直接加载
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
# 使用内置的词频文件
char_freq = defaultdict(int)
dict_path = os.path.join(os.path.dirname(jieba.__file__), 'dict.txt')
# 读取jieba的词典文件
with open(dict_path, 'r', encoding='utf-8') as f:
for line in f:
word, freq = line.strip().split()[:2]
# 对词中的每个字进行频率累加
for char in word:
if is_chinese_char(char):
char_freq[char] += int(freq)
# 归一化频率值
max_freq = max(char_freq.values())
normalized_freq = {char: freq/max_freq * 1000 for char, freq in char_freq.items()}
# 保存到缓存文件
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(normalized_freq, f, ensure_ascii=False, indent=2)
return normalized_freq
# 创建拼音到汉字的映射字典
def create_pinyin_dict():
"""
创建拼音到汉字的映射字典
"""
# 常用汉字范围
chars = [chr(i) for i in range(0x4e00, 0x9fff)]
pinyin_dict = defaultdict(list)
# 为每个汉字建立拼音映射
for char in chars:
try:
py = pinyin(char, style=Style.TONE3)[0][0]
pinyin_dict[py].append(char)
except Exception:
continue
return pinyin_dict
def is_chinese_char(char):
"""
判断是否为汉字
"""
try:
return '\u4e00' <= char <= '\u9fff'
except:
return False
def get_pinyin(sentence):
"""
将中文句子拆分成单个汉字并获取其拼音
:param sentence: 输入的中文句子
:return: 每个汉字及其拼音的列表
"""
# 将句子拆分成单个字符
characters = list(sentence)
# 获取每个字符的拼音
result = []
for char in characters:
# 跳过空格和非汉字字符
if char.isspace() or not is_chinese_char(char):
continue
# 获取拼音(数字声调)
py = pinyin(char, style=Style.TONE3)[0][0]
result.append((char, py))
return result
def get_homophone(char, py, pinyin_dict, char_frequency, min_freq=5):
"""
获取同音字,按照使用频率排序
"""
homophones = pinyin_dict[py]
# 移除原字并过滤低频字
if char in homophones:
homophones.remove(char)
# 过滤掉低频字
homophones = [h for h in homophones if char_frequency.get(h, 0) >= min_freq]
# 按照字频排序
sorted_homophones = sorted(homophones,
key=lambda x: char_frequency.get(x, 0),
reverse=True)
# 只返回前10个同音字避免输出过多
return sorted_homophones[:10]
def get_similar_tone_pinyin(py):
"""
获取相似声调的拼音
例如:'ni3' 可能返回 'ni2''ni4'
"""
base = py[:-1] # 去掉声调
tone = int(py[-1]) # 获取声调
possible_tones = [1, 2, 3, 4]
possible_tones.remove(tone) # 移除原声调
new_tone = random.choice(possible_tones) # 随机选择一个新声调
return base + str(new_tone)
def calculate_replacement_probability(orig_freq, target_freq, max_freq_diff=200):
"""
根据频率差计算替换概率
频率差越大,概率越低
:param orig_freq: 原字频率
:param target_freq: 目标字频率
:param max_freq_diff: 最大允许的频率差
:return: 0-1之间的概率值
"""
if target_freq > orig_freq:
return 1.0 # 如果替换字频率更高,保持原有概率
freq_diff = orig_freq - target_freq
if freq_diff > max_freq_diff:
return 0.0 # 频率差太大,不替换
# 使用指数衰减函数计算概率
# 频率差为0时概率为1频率差为max_freq_diff时概率接近0
return math.exp(-3 * freq_diff / max_freq_diff)
def get_similar_frequency_chars(char, py, pinyin_dict, char_frequency, num_candidates=5, min_freq=5, tone_error_rate=0.2):
"""
获取与给定字频率相近的同音字,可能包含声调错误
"""
homophones = []
# 有20%的概率使用错误声调
if random.random() < tone_error_rate:
wrong_tone_py = get_similar_tone_pinyin(py)
homophones.extend(pinyin_dict[wrong_tone_py])
# 添加正确声调的同音字
homophones.extend(pinyin_dict[py])
if not homophones:
return None
# 获取原字的频率
orig_freq = char_frequency.get(char, 0)
# 计算所有同音字与原字的频率差,并过滤掉低频字
freq_diff = [(h, char_frequency.get(h, 0))
for h in homophones
if h != char and char_frequency.get(h, 0) >= min_freq]
if not freq_diff:
return None
# 计算每个候选字的替换概率
candidates_with_prob = []
for h, freq in freq_diff:
prob = calculate_replacement_probability(orig_freq, freq)
if prob > 0: # 只保留有效概率的候选字
candidates_with_prob.append((h, prob))
if not candidates_with_prob:
return None
# 根据概率排序
candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
# 返回概率最高的几个字
return [char for char, _ in candidates_with_prob[:num_candidates]]
def create_typo_sentence(sentence, pinyin_dict, char_frequency, error_rate=0.5, min_freq=5, tone_error_rate=0.2):
"""
创建包含同音字错误的句子,保留原文标点符号
"""
result = []
typo_info = []
# 获取每个字的拼音
chars_with_pinyin = get_pinyin(sentence)
# 创建原字到拼音的映射,用于跟踪已处理的字符
processed_chars = {char: py for char, py in chars_with_pinyin}
# 遍历原句中的每个字符
char_index = 0
for i, char in enumerate(sentence):
if char.isspace():
# 保留空格
result.append(char)
elif char in processed_chars:
# 处理汉字
py = processed_chars[char]
# 基础错误率
if random.random() < error_rate:
# 获取频率相近的同音字(可能包含声调错误)
similar_chars = get_similar_frequency_chars(char, py, pinyin_dict, char_frequency,
min_freq=min_freq, tone_error_rate=tone_error_rate)
if similar_chars:
# 随机选择一个替换字
typo_char = random.choice(similar_chars)
# 获取替换字的频率
typo_freq = char_frequency.get(typo_char, 0)
orig_freq = char_frequency.get(char, 0)
# 计算实际替换概率
replace_prob = calculate_replacement_probability(orig_freq, typo_freq)
# 根据频率差进行概率替换
if random.random() < replace_prob:
result.append(typo_char)
# 获取替换字的实际拼音
typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
else:
result.append(char)
else:
result.append(char)
else:
result.append(char)
char_index += 1
else:
# 保留非汉字字符(标点符号等)
result.append(char)
return ''.join(result), typo_info
def format_frequency(freq):
"""
格式化频率显示
"""
return f"{freq:.2f}"
def main():
# 首先创建拼音字典和加载字频统计
print("正在加载汉字数据库,请稍候...")
pinyin_dict = create_pinyin_dict()
char_frequency = load_or_create_char_frequency()
# 获取用户输入
sentence = input("请输入中文句子:")
# 创建包含错别字的句子
typo_sentence, typo_info = create_typo_sentence(sentence, pinyin_dict, char_frequency,
min_freq=5, tone_error_rate=0.2)
# 打印结果
print("\n原句:", sentence)
print("错字版:", typo_sentence)
if typo_info:
print("\n错别字信息:")
for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
error_type = "声调错误" if tone_error else "同音字替换"
print(f"原字:{orig}({orig_py}) [频率:{format_frequency(orig_freq)}] -> "
f"错字:{typo}({typo_py}) [频率:{format_frequency(typo_freq)}] [{error_type}]")
# 获取拼音结果
result = get_pinyin(sentence)
# 打印完整拼音
print("\n完整拼音:")
print(" ".join(py for _, py in result))
# 打印所有可能的同音字
print("\n每个字的所有同音字(按频率排序,仅显示频率>=5的字")
for char, py in result:
homophones = get_homophone(char, py, pinyin_dict, char_frequency, min_freq=5)
char_freq = char_frequency.get(char, 0)
print(f"{char}: {py} [频率:{format_frequency(char_freq)}]")
if homophones:
homophone_info = []
for h in homophones:
h_freq = char_frequency.get(h, 0)
homophone_info.append(f"{h}[{format_frequency(h_freq)}]")
print(f"同音字: {''.join(homophone_info)}")
else:
print("没有找到频率>=5的同音字")
if __name__ == "__main__":
main()