diff --git a/src/chat/chat_loop/cycle_processor.py b/src/chat/chat_loop/cycle_processor.py index 179089a7b..4a39ddd0e 100644 --- a/src/chat/chat_loop/cycle_processor.py +++ b/src/chat/chat_loop/cycle_processor.py @@ -52,7 +52,11 @@ class CycleProcessor: actions, ) -> Tuple[Dict[str, Any], str, Dict[str, float]]: with Timer("回复发送", cycle_timers): - reply_text = await self.response_handler.send_response(response_set, loop_start_time, action_message) + reply_text, sent_messages = await self.response_handler.send_response( + response_set, loop_start_time, action_message + ) + if sent_messages: + asyncio.create_task(self.response_handler.handle_typo_correction(sent_messages)) # 存储reply action信息 person_info_manager = get_person_info_manager() @@ -148,7 +152,7 @@ class CycleProcessor: cycle_timers, thinking_id = self.cycle_tracker.start_cycle() logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考") - if ENABLE_S4U: + if ENABLE_S4U and self.context.chat_stream and self.context.chat_stream.user_info: await send_typing(self.context.chat_stream.user_info.user_id) loop_start_time = time.time() @@ -175,7 +179,7 @@ class CycleProcessor: result = await event_manager.trigger_event( EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.chat_stream ) - if not result.all_continue_process(): + if result and not result.all_continue_process(): raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成") with Timer("规划器", cycle_timers): @@ -380,7 +384,7 @@ class CycleProcessor: if fallback_action and fallback_action != action: logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}") action_handler = self.context.action_manager.create_action( - action_name=fallback_action if isinstance(fallback_action, list) else fallback_action, + action_name=str(fallback_action), action_data=action_data, reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}", cycle_timers=cycle_timers, diff --git a/src/chat/chat_loop/response_handler.py b/src/chat/chat_loop/response_handler.py index 123032a78..982b7d54d 100644 --- a/src/chat/chat_loop/response_handler.py +++ b/src/chat/chat_loop/response_handler.py @@ -1,5 +1,6 @@ import time import random +import asyncio from typing import Dict, Any, Tuple from src.common.logger import get_logger @@ -59,7 +60,9 @@ class ResponseHandler: - 构建并返回完整的循环信息 - 用于上级方法的状态跟踪 """ - reply_text = await self.send_response(response_set, loop_start_time, action_message) + reply_text, sent_messages = await self.send_response(response_set, loop_start_time, action_message) + if sent_messages: + asyncio.create_task(self.handle_typo_correction(sent_messages)) person_info_manager = get_person_info_manager() @@ -100,18 +103,17 @@ class ResponseHandler: return loop_info, reply_text, cycle_timers - async def send_response(self, reply_set, thinking_start_time, message_data) -> str: + async def send_response(self, reply_set, thinking_start_time, message_data) -> tuple[str, list[dict[str, str]]]: """ 发送回复内容的具体实现 Args: reply_set: 回复内容集合,包含多个回复段 - reply_to: 回复目标 thinking_start_time: 思考开始时间 message_data: 消息数据 Returns: - str: 完整的回复文本 + tuple[str, list[dict[str, str]]]: (完整的回复文本, 已发送消息列表) 功能说明: - 检查是否有新消息需要回复 @@ -128,19 +130,17 @@ class ResponseHandler: need_reply = new_message_count >= random.randint(2, 4) reply_text = "" + sent_messages = [] is_proactive_thinking = message_data.get("message_type") == "proactive_thinking" first_replied = False for reply_seg in reply_set: - # 调试日志:验证reply_seg的格式 logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: 
{reply_seg}") - # 修正:正确处理元组格式 (格式为: (type, content)) - if isinstance(reply_seg, tuple) and len(reply_seg) >= 2: - _, data = reply_seg + if reply_seg["type"] == "typo": + data = reply_seg["typo"] else: - # 向下兼容:如果已经是字符串,则直接使用 - data = str(reply_seg) + data = reply_seg["content"] reply_text += data @@ -149,7 +149,7 @@ class ResponseHandler: continue if not first_replied: - await send_api.text_to_stream( + sent_message = await send_api.text_to_stream( text=data, stream_id=self.context.stream_id, reply_to_message=message_data, @@ -158,12 +158,32 @@ class ResponseHandler: ) first_replied = True else: - await send_api.text_to_stream( + sent_message = await send_api.text_to_stream( text=data, stream_id=self.context.stream_id, reply_to_message=None, set_reply=False, typing=True, ) + if sent_message and reply_seg["type"] == "typo": + sent_messages.append( + { + "type": "typo", + "message_id": sent_message, + "original_message": message_data, + "correction": reply_seg["correction"], + } + ) - return reply_text + return reply_text, sent_messages + + async def handle_typo_correction(self, sent_messages: list[dict[str, Any]]): + """处理错别字修正""" + for msg in sent_messages: + if msg["type"] == "typo": + await asyncio.sleep(random.uniform(2, 4)) + recalled = await send_api.recall_message(str(msg["message_id"]), self.context.stream_id) + if recalled: + await send_api.text_to_stream( + str(msg["correction"]), self.context.stream_id, reply_to_message=msg["original_message"] + ) diff --git a/src/chat/utils/typo_generator.py b/src/chat/utils/typo_generator.py index 9c3718b2b..c23c4c319 100644 --- a/src/chat/utils/typo_generator.py +++ b/src/chat/utils/typo_generator.py @@ -19,16 +19,22 @@ logger = get_logger("typo_gen") class ChineseTypoGenerator: + """ + 中文错别字生成器。 + 可以根据拼音、字频等信息,为给定的中文句子生成包含错别字的句子。 + 支持单字替换和整词替换。 + """ + def __init__(self, error_rate=0.3, min_freq=5, tone_error_rate=0.2, word_replace_rate=0.3, max_freq_diff=200): """ - 初始化错别字生成器 + 初始化错别字生成器。 - 参数: - error_rate: 单字替换概率 - min_freq: 最小字频阈值 - tone_error_rate: 声调错误概率 - word_replace_rate: 整词替换概率 - max_freq_diff: 最大允许的频率差异 + Args: + error_rate (float): 单个汉字被替换为同音字的概率。 + min_freq (int): 候选替换字的最小词频阈值,低于此阈值的字将被忽略。 + tone_error_rate (float): 在选择同音字时,使用错误声调的概率。 + word_replace_rate (float): 整个词语被替换为同音词的概率。 + max_freq_diff (int): 允许的原始字与替换字之间的最大频率差异。 """ self.error_rate = error_rate self.min_freq = min_freq @@ -36,42 +42,47 @@ class ChineseTypoGenerator: self.word_replace_rate = word_replace_rate self.max_freq_diff = max_freq_diff - # 加载数据 - # print("正在加载汉字数据库,请稍候...") - # logger.info("正在加载汉字数据库,请稍候...") - + # 加载核心数据 + logger.info("正在加载汉字数据库...") self.pinyin_dict = self._create_pinyin_dict() self.char_frequency = self._load_or_create_char_frequency() + logger.info("汉字数据库加载完成。") def _load_or_create_char_frequency(self): """ - 加载或创建汉字频率字典 + 加载或创建汉字频率字典。 + 如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载。 + 否则,通过解析 `jieba` 的词典文件来创建,并保存为缓存。 + + Returns: + dict: 一个将汉字映射到其归一化频率的字典。 """ cache_file = Path("depends-data/char_frequency.json") - # 如果缓存文件存在,直接加载 + # 如果缓存文件存在,则直接从缓存加载,提高效率 if cache_file.exists(): with open(cache_file, "r", encoding="utf-8") as f: return orjson.loads(f.read()) - # 使用内置的词频文件 + # 如果没有缓存,则通过解析jieba词典来创建 char_freq = defaultdict(int) + # 定位jieba内置词典文件的路径 dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt") - # 读取jieba的词典文件 + # 读取jieba词典文件,统计每个汉字的频率 with open(dict_path, "r", encoding="utf-8") as f: for line in f: word, freq = line.strip().split()[:2] - # 对词中的每个字进行频率累加 + # 将词中每个汉字的频率进行累加 for char in word: if 
self._is_chinese_char(char):
                         char_freq[char] += int(freq)
 
-        # 归一化频率值
+        # 对频率值进行归一化处理,使其在0-1000的范围内
         max_freq = max(char_freq.values())
         normalized_freq = {char: freq / max_freq * 1000 for char, freq in char_freq.items()}
 
-        # 保存到缓存文件
+        # 将计算出的频率数据保存到缓存文件,以便下次快速加载
         with open(cache_file, "w", encoding="utf-8") as f:
             f.write(orjson.dumps(normalized_freq, option=orjson.OPT_INDENT_2).decode("utf-8"))
 
@@ -80,18 +91,24 @@ class ChineseTypoGenerator:
     @staticmethod
     def _create_pinyin_dict():
         """
-        创建拼音到汉字的映射字典
+        创建从拼音到汉字的映射字典。
+        遍历常用汉字范围,为每个汉字生成带声调的拼音,并构建映射。
+
+        Returns:
+            defaultdict: 一个将拼音映射到汉字列表的字典。
         """
-        # 常用汉字范围
+        # 定义常用汉字的Unicode范围
         chars = [chr(i) for i in range(0x4E00, 0x9FFF)]
         pinyin_dict = defaultdict(list)
 
-        # 为每个汉字建立拼音映射
+        # 为范围内的每个汉字建立拼音到汉字的映射
         for char in chars:
             try:
-                py = pinyin(char, style=Style.TONE3)[0][0]
+                # 获取带数字声调的拼音 (e.g., 'hao3')
+                py = pinyin(char, style=Style.TONE3)[0][0]
                 pinyin_dict[py].append(char)
             except Exception:
+                # 忽略无法转换拼音的字符
                 continue
 
         return pinyin_dict
@@ -99,49 +116,62 @@ class ChineseTypoGenerator:
     @staticmethod
     def _is_chinese_char(char):
         """
-        判断是否为汉字
+        判断一个字符是否为中文字符。
+
+        Args:
+            char (str): 需要判断的字符。
+
+        Returns:
+            bool: 如果是中文字符,返回 True,否则返回 False。
         """
         try:
+            # 通过Unicode范围判断是否为中文字符
            return "\u4e00" <= char <= "\u9fff"
         except Exception as e:
-            logger.debug(str(e))
+            logger.debug(f"判断字符 '{char}' 时出错: {e}")
             return False
 
     def _get_pinyin(self, sentence):
         """
-        将中文句子拆分成单个汉字并获取其拼音
-        """
-        # 将句子拆分成单个字符
-        characters = list(sentence)
+        获取一个句子中每个汉字的拼音。
 
-        # 获取每个字符的拼音
+        Args:
+            sentence (str): 输入的中文句子。
+
+        Returns:
+            list: 一个元组列表,每个元组包含 (汉字, 拼音)。
+        """
+        characters = list(sentence)
         result = []
         for char in characters:
-            # 跳过空格和非汉字字符
-            if char.isspace() or not self._is_chinese_char(char):
-                continue
-            # 获取拼音(数字声调)
-            py = pinyin(char, style=Style.TONE3)[0][0]
-            result.append((char, py))
-
+            # 忽略所有非中文字符
+            if self._is_chinese_char(char):
+                # 获取带数字声调的拼音
+                py = pinyin(char, style=Style.TONE3)[0][0]
+                result.append((char, py))
         return result
 
     @staticmethod
     def _get_similar_tone_pinyin(py):
         """
-        获取相似声调的拼音
+        为一个给定的拼音生成一个声调错误的相似拼音。
+
+        Args:
+            py (str): 带数字声调的原始拼音 (e.g., 'hao3')。
+
+        Returns:
+            str: 一个声调被随机改变的拼音。
         """
-        # 检查拼音是否为空或无效
+        # 检查拼音是否有效
         if not py or len(py) < 1:
             return py
 
-        # 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
+        # 如果拼音末尾不是数字(如轻声),则默认添加一声
         if not py[-1].isdigit():
-            # 为非数字结尾的拼音添加数字声调1
             return f"{py}1"
 
-        base = py[:-1]  # 去掉声调
-        tone = int(py[-1])  # 获取声调
+        base = py[:-1]  # 拼音的基本部分 (e.g., 'hao')
+        tone = int(py[-1])  # 声调 (e.g., 3)
 
         # 处理轻声(通常用5表示)或无效声调
         if tone not in [1, 2, 3, 4]:
@@ -155,40 +185,56 @@ class ChineseTypoGenerator:
     def _calculate_replacement_probability(self, orig_freq, target_freq):
         """
-        根据频率差计算替换概率
+        根据原始字和目标替换字的频率差异,计算替换概率。
+        频率相近的字有更高的替换概率。
+
+        Args:
+            orig_freq (float): 原始字的频率。
+            target_freq (float): 目标替换字的频率。
+
+        Returns:
+            float: 替换概率,介于 0.0 和 1.0 之间。
         """
+        # 如果目标字更常用,则替换概率为1
         if target_freq > orig_freq:
-            return 1.0  # 如果替换字频率更高,保持原有概率
+            return 1.0
 
         freq_diff = orig_freq - target_freq
+        # 如果频率差异过大,则不进行替换
         if freq_diff > self.max_freq_diff:
-            return 0.0  # 频率差太大,不替换
+            return 0.0
 
-        # 使用指数衰减函数计算概率
-        # 频率差为0时概率为1,频率差为max_freq_diff时概率接近0
+        # 使用指数衰减函数来计算概率,频率差异越大,概率越低
         return math.exp(-3 * freq_diff / self.max_freq_diff)
 
     def _get_similar_frequency_chars(self, char, py, num_candidates=5):
         """
-        获取与给定字频率相近的同音字,可能包含声调错误
+        获取与给定汉字发音相似且频率相近的候选替换字。
+
+        Args:
+            char (str): 原始汉字。
+            py (str): 原始汉字的拼音。
+            num_candidates (int): 返回的候选字数量。
+
+        Returns:
+            list or None: 一个包含候选替换字的列表,如果没有找到则返回 None。
         """
         homophones = []
-        # 有一定概率使用错误声调
+        # 根据设定概率,可能使用声调错误的拼音来寻找候选字
         if random.random() < self.tone_error_rate:
             wrong_tone_py = self._get_similar_tone_pinyin(py)
-            homophones.extend(self.pinyin_dict[wrong_tone_py])
+            homophones.extend(self.pinyin_dict.get(wrong_tone_py, []))
 
-        # 添加正确声调的同音字
-        homophones.extend(self.pinyin_dict[py])
+        # 添加声调正确的同音字
+        homophones.extend(self.pinyin_dict.get(py, []))
 
         if not homophones:
             return None
 
-        # 获取原字的频率
         orig_freq = self.char_frequency.get(char, 0)
 
-        # 计算所有同音字与原字的频率差,并过滤掉低频字
+        # 过滤掉低频字和原始字本身
         freq_diff = [
             (h, self.char_frequency.get(h, 0))
             for h in homophones
@@ -202,222 +248,215 @@ class ChineseTypoGenerator:
         candidates_with_prob = []
         for h, freq in freq_diff:
             prob = self._calculate_replacement_probability(orig_freq, freq)
-            if prob > 0:  # 只保留有效概率的候选字
+            if prob > 0:
                 candidates_with_prob.append((h, prob))
 
         if not candidates_with_prob:
             return None
 
-        # 根据概率排序
-        candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
+        # 根据替换概率从高到低排序
+        candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
 
-        # 返回概率最高的几个字
-        return [char for char, _ in candidates_with_prob[:num_candidates]]
+        # 返回概率最高的几个候选字
+        return [c for c, _ in candidates_with_prob[:num_candidates]]
 
     @staticmethod
     def _get_word_pinyin(word):
         """
-        获取词语的拼音列表
+        获取一个词语中每个汉字的拼音列表。
+
+        Args:
+            word (str): 输入的词语。
+
+        Returns:
+            list: 包含每个汉字拼音的列表。
         """
         return [py[0] for py in pinyin(word, style=Style.TONE3)]
 
     @staticmethod
     def _segment_sentence(sentence):
         """
-        使用jieba分词,返回词语列表
+        使用 jieba 对句子进行分词。
+
+        Args:
+            sentence (str): 输入的句子。
+
+        Returns:
+            list: 分词后的词语列表。
         """
         return list(jieba.cut(sentence))
 
     def _get_word_homophones(self, word):
         """
-        获取整个词的同音词,只返回高频的有意义词语
+        获取一个词语的同音词。
+        只返回在jieba词典中存在且频率较高的有意义词语。
+
+        Args:
+            word (str): 原始词语。
+
+        Returns:
+            list: 一个包含同音词的列表。
         """
-        if len(word) == 1:
+        if len(word) <= 1:
            return []
 
-        # 获取词的拼音
         word_pinyin = self._get_word_pinyin(word)
 
-        # 遍历所有可能的同音字组合
+        # 为词语中的每个字找到所有同音字
         candidates = []
         for py in word_pinyin:
             chars = self.pinyin_dict.get(py, [])
             if not chars:
-                return []
+                return []  # 如果某个字没有同音字,则无法构成同音词
             candidates.append(chars)
 
-        # 生成所有可能的组合
+        # 生成所有可能的同音字组合
         import itertools
 
         all_combinations = itertools.product(*candidates)
 
-        # 获取jieba词典和词频信息
+        # 加载jieba词典以验证组合出的词是否为有效词语
         dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt")
-        valid_words = {}  # 改用字典存储词语及其频率
+        valid_words = {}
 
         with open(dict_path, "r", encoding="utf-8") as f:
             for line in f:
                 parts = line.strip().split()
                 if len(parts) >= 2:
-                    word_text = parts[0]
-                    word_freq = float(parts[1])  # 获取词频
-                    valid_words[word_text] = word_freq
+                    valid_words[parts[0]] = float(parts[1])
 
-        # 获取原词的词频作为参考
         original_word_freq = valid_words.get(word, 0)
-        min_word_freq = original_word_freq * 0.1  # 设置最小词频为原词频的10%
+        # 设置一个最小词频阈值,过滤掉非常生僻的词
+        min_word_freq = original_word_freq * 0.1
 
-        # 过滤和计算频率
         homophones = []
         for combo in all_combinations:
             new_word = "".join(combo)
+            # 检查新词是否为有效词语且与原词不同
             if new_word != word and new_word in valid_words:
                 new_word_freq = valid_words[new_word]
-                # 只保留词频达到阈值的词
                 if new_word_freq >= min_word_freq:
-                    # 计算词的平均字频(考虑字频和词频)
+                    # 计算综合评分,结合词频和平均字频
                     char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word)
-                    # 综合评分:结合词频和字频
                     combined_score = new_word_freq * 0.7 + char_avg_freq * 0.3
                     if combined_score >= self.min_freq:
                         homophones.append((new_word, combined_score))
 
-        # 按综合分数排序并限制返回数量
-        sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
-        return [word for word, _ in sorted_homophones[:5]]  # 限制返回前5个结果
+        # 按综合分数排序并返回前5个结果
+        sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
+        return [w for w, _ in sorted_homophones[:5]]
 
     def create_typo_sentence(self, sentence):
         """
-        创建包含同音字错误的句子,支持词语级别和字级别的替换
+        为输入句子生成一个包含错别字的版本。
+        该方法会先对句子进行分词,然后根据概率进行整词替换或单字替换。
 
-        参数:
-            sentence: 输入的中文句子
+        Args:
+            sentence (str): 原始中文句子。
 
-        返回:
-            typo_sentence: 包含错别字的句子
-            correction_suggestion: 随机选择的一个纠正建议,返回正确的字/词
+        Returns:
+            tuple: 包含三个元素的元组:
+                - original_sentence (str): 原始句子。
+                - typo_sentence (str): 包含错别字的句子。
+                - correction_suggestion (str or None): 一个随机的修正建议(可能是正确的字或词),或 None。
         """
         result = []
-        typo_info = []
-        word_typos = []  # 记录词语错误对(错词,正确词)
-        char_typos = []  # 记录单字错误对(错字,正确字)
-        current_pos = 0
+        typo_info = []  # 用于调试,记录详细的替换信息
+        word_typos = []  # 记录 (错词, 正确词)
+        char_typos = []  # 记录 (错字, 正确字)
 
-        # 分词
+        # 对句子进行分词
         words = self._segment_sentence(sentence)
 
         for word in words:
-            # 如果是标点符号或空格,直接添加
+            # 如果是标点符号或非中文字符,直接保留
             if all(not self._is_chinese_char(c) for c in word):
                 result.append(word)
-                current_pos += len(word)
                 continue
 
-            # 获取词语的拼音
             word_pinyin = self._get_word_pinyin(word)
 
-            # 尝试整词替换
+            # 步骤1: 尝试进行整词替换
             if len(word) > 1 and random.random() < self.word_replace_rate:
                 word_homophones = self._get_word_homophones(word)
                 if word_homophones:
                     typo_word = random.choice(word_homophones)
-                    # 计算词的平均频率
                     orig_freq = sum(self.char_frequency.get(c, 0) for c in word) / len(word)
                     typo_freq = sum(self.char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
 
-                    # 添加到结果中
                     result.append(typo_word)
                     typo_info.append(
                         (
                             word,
                             typo_word,
-                            " ".join(word_pinyin),
+                            " ".join(self._get_word_pinyin(word)),
                             " ".join(self._get_word_pinyin(typo_word)),
                             orig_freq,
                             typo_freq,
                         )
                     )
-                    word_typos.append((typo_word, word))  # 记录(错词,正确词)对
-                    current_pos += len(typo_word)
+                    word_typos.append((typo_word, word))
                     continue
 
-            # 如果不进行整词替换,则进行单字替换
-            if len(word) == 1:
-                char = word
-                py = word_pinyin[0]
-                if random.random() < self.error_rate:
+            # 步骤2: 如果不进行整词替换,则对词中的每个字进行单字替换
+            new_word = []
+            for char, py in zip(word, word_pinyin, strict=False):
+                # 词语越长,其中单个字被替换的概率越低
+                char_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
+                if random.random() < char_error_rate:
                     similar_chars = self._get_similar_frequency_chars(char, py)
                     if similar_chars:
                         typo_char = random.choice(similar_chars)
-                        typo_freq = self.char_frequency.get(typo_char, 0)
                         orig_freq = self.char_frequency.get(char, 0)
-                        replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
-                        if random.random() < replace_prob:
-                            result.append(typo_char)
-                            typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
+                        typo_freq = self.char_frequency.get(typo_char, 0)
+                        # 根据频率计算最终是否替换
+                        if random.random() < self._calculate_replacement_probability(orig_freq, typo_freq):
+                            new_word.append(typo_char)
+                            typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
                             typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
-                            char_typos.append((typo_char, char))  # 记录(错字,正确字)对
-                            current_pos += 1
+                            char_typos.append((typo_char, char))
                             continue
-                    result.append(char)
-                    current_pos += 1
-            else:
-                # 处理多字词的单字替换
-                word_result = []
-                for _, (char, py) in enumerate(zip(word, word_pinyin, strict=False)):
-                    # 词中的字替换概率降低
-                    word_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
+                # 如果不替换,则保留原字
+                new_word.append(char)
 
-                    if random.random() < word_error_rate:
-                        similar_chars = self._get_similar_frequency_chars(char, py)
-                        if similar_chars:
-                            typo_char = random.choice(similar_chars)
-                            typo_freq = self.char_frequency.get(typo_char, 0)
-                            orig_freq = self.char_frequency.get(char, 0)
-                            replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
-                            if random.random() < replace_prob:
-                                
word_result.append(typo_char) - typo_py = pinyin(typo_char, style=Style.TONE3)[0][0] - typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq)) - char_typos.append((typo_char, char)) # 记录(错字,正确字)对 - continue - word_result.append(char) - result.append("".join(word_result)) - current_pos += len(word) + result.append("".join(new_word)) - # 优先从词语错误中选择,如果没有则从单字错误中选择 + # 步骤3: 生成修正建议 correction_suggestion = None - # 50%概率返回纠正建议 + # 有50%的概率提供一个修正建议 if random.random() < 0.5: + # 优先从整词错误中选择 if word_typos: - wrong_word, correct_word = random.choice(word_typos) + _, correct_word = random.choice(word_typos) correction_suggestion = correct_word + # 其次从单字错误中选择 elif char_typos: - wrong_char, correct_char = random.choice(char_typos) + _, correct_char = random.choice(char_typos) correction_suggestion = correct_char - return "".join(result), correction_suggestion + return sentence, "".join(result), correction_suggestion @staticmethod def format_typo_info(typo_info): """ - 格式化错别字信息 + 将错别字生成过程中的详细信息格式化为可读字符串。 - 参数: - typo_info: 错别字信息列表 + Args: + typo_info (list): `create_typo_sentence` 方法生成的详细信息列表。 - 返回: - 格式化后的错别字信息字符串 + Returns: + str: 格式化后的字符串,用于调试和分析。 """ if not typo_info: return "未生成错别字" result = [] for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info: - # 判断是否为词语替换 + # 判断是整词替换还是单字替换 is_word = " " in orig_py if is_word: error_type = "整词替换" else: + # 判断是声调错误还是同音字替换 tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1] error_type = "声调错误" if tone_error else "同音字替换" @@ -430,21 +469,22 @@ class ChineseTypoGenerator: def set_params(self, **kwargs): """ - 设置参数 + 动态设置生成器的参数。 - 可设置参数: - error_rate: 单字替换概率 - min_freq: 最小字频阈值 - tone_error_rate: 声调错误概率 - word_replace_rate: 整词替换概率 - max_freq_diff: 最大允许的频率差异 + Args: + **kwargs: 键值对参数,可设置的参数包括: + - error_rate (float) + - min_freq (int) + - tone_error_rate (float) + - word_replace_rate (float) + - max_freq_diff (int) """ for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) - print(f"参数 {key} 已设置为 {value}") + logger.info(f"参数 {key} 已更新为 {value}") else: - print(f"警告: 参数 {key} 不存在") + logger.warning(f"尝试设置不存在的参数: {key}") def main(): @@ -456,10 +496,10 @@ def main(): # 创建包含错别字的句子 start_time = time.time() - typo_sentence, correction_suggestion = typo_generator.create_typo_sentence(sentence) + original_sentence, typo_sentence, correction_suggestion = typo_generator.create_typo_sentence(sentence) # 打印结果 - print("\n原句:", sentence) + print("\n原句:", original_sentence) print("错字版:", typo_sentence) # 打印纠正建议 diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 501bf382d..dd60ef951 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -293,9 +293,11 @@ def random_remove_punctuation(text: str) -> str: return result -def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese_typo: bool = True) -> list[str]: +def process_llm_response( + text: str, enable_splitter: bool = True, enable_chinese_typo: bool = True +) -> list[dict[str, str]]: if not global_config.response_post_process.enable_response_post_process: - return [text] + return [{"type": "text", "content": text}] # 先保护颜文字 if global_config.response_splitter.enable_kaomoji_protection: @@ -311,7 +313,7 @@ def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese cleaned_text = pattern.sub("", protected_text) if cleaned_text == "": - return ["呃呃"] + return [{"type": "text", "content": "呃呃"}] logger.debug(f"{text}去除括号处理后的文本: {cleaned_text}") @@ -321,7 +323,7 @@ def 
process_llm_response(text: str, enable_splitter: bool = True, enable_chinese
     # 如果基本上是中文,则进行长度过滤
     if get_western_ratio(cleaned_text) < 0.1 and len(cleaned_text) > max_length:
         logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
-        return ["懒得说"]
+        return [{"type": "text", "content": "懒得说"}]
 
     typo_generator = ChineseTypoGenerator(
         error_rate=global_config.chinese_typo.error_rate,
@@ -338,16 +340,24 @@ def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese
     sentences = []
     for sentence in split_sentences:
         if global_config.chinese_typo.enable and enable_chinese_typo:
-            typoed_text, typo_corrections = typo_generator.create_typo_sentence(sentence)
-            sentences.append(typoed_text)
+            original_sentence, typo_sentence, typo_corrections = typo_generator.create_typo_sentence(sentence)
             if typo_corrections:
-                sentences.append(typo_corrections)
+                sentences.append(
+                    {
+                        "type": "typo",
+                        "original": original_sentence,
+                        "typo": typo_sentence,
+                        "correction": typo_corrections,
+                    }
+                )
+            else:
+                sentences.append({"type": "text", "content": sentence})
         else:
-            sentences.append(sentence)
+            sentences.append({"type": "text", "content": sentence})
 
     if len(sentences) > max_sentence_num:
         logger.warning(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
-        return [f"{global_config.bot.nickname}不知道哦"]
+        return [{"type": "text", "content": f"{global_config.bot.nickname}不知道哦"}]
 
     # if extracted_contents:
     #     for content in extracted_contents:
@@ -355,7 +365,20 @@ def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese
 
     # 在所有句子处理完毕后,对包含占位符的列表进行恢复
     if global_config.response_splitter.enable_kaomoji_protection:
-        sentences = recover_kaomoji(sentences, kaomoji_mapping)
+        # sentences中的元素可能是dict,也可能是str,所以要分开处理
+        recovered_sentences = []
+        for s in sentences:
+            if isinstance(s, dict) and s.get("type") == "typo":
+                s["original"] = recover_kaomoji([s["original"]], kaomoji_mapping)[0]
+                s["typo"] = recover_kaomoji([s["typo"]], kaomoji_mapping)[0]
+                s["correction"] = recover_kaomoji([s["correction"]], kaomoji_mapping)[0]
+                recovered_sentences.append(s)
+            elif isinstance(s, dict) and s.get("type") == "text":
+                s["content"] = recover_kaomoji([s["content"]], kaomoji_mapping)[0]
+                recovered_sentences.append(s)
+            else:
+                recovered_sentences.append(recover_kaomoji([s], kaomoji_mapping)[0])
+        sentences = recovered_sentences
 
     return sentences
 diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py
index b20909e77..3ea507cc0 100644
--- a/src/plugin_system/apis/generator_api.py
+++ b/src/plugin_system/apis/generator_api.py
@@ -83,7 +83,7 @@ async def generate_reply(
     return_prompt: bool = False,
     request_type: str = "generator_api",
     from_plugin: bool = True,
-) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
+) -> Tuple[bool, List[Dict[str, Any]], Optional[str]]:
     """生成回复
 
     Args:
@@ -167,7 +167,7 @@ async def rewrite_reply(
     reply_to: str = "",
     return_prompt: bool = False,
     request_type: str = "generator_api",
-) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
+) -> Tuple[bool, List[Dict[str, Any]], Optional[str]]:
     """重写回复
 
     Args:
@@ -225,7 +225,9 @@ async def rewrite_reply(
     return False, [], None
 
 
-def process_human_text(content: str, enable_splitter: bool, enable_chinese_typo: bool) -> List[Tuple[str, Any]]:
+def process_human_text(
+    content: str, enable_splitter: bool, enable_chinese_typo: bool
+) -> List[Dict[str, Any]]:
     """将文本处理为更拟人化的文本
 
     Args:
@@ -239,9 +241,11 @@ def process_human_text(content: str, enable_splitter: bool, enable_chinese_typo:
     processed_response = 
process_llm_response(content, enable_splitter, enable_chinese_typo) reply_set = [] - for text in processed_response: - reply_seg = ("text", text) - reply_set.append(reply_seg) + for item in processed_response: + if item["type"] == "typo": + reply_set.append(item) + else: + reply_set.append({"type": "text", "content": item["content"]}) return reply_set diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 7a4d371a2..9c629fc3b 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -178,7 +178,7 @@ async def _send_to_target( # 构建机器人用户信息 bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, + user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) @@ -188,10 +188,13 @@ async def _send_to_target( if reply_to_message: anchor_message = message_dict_to_message_recv(message_dict=reply_to_message) - anchor_message.update_chat_stream(target_stream) - reply_to_platform_id = ( - f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}" - ) + if anchor_message and anchor_message.message_info and anchor_message.message_info.user_info: + anchor_message.update_chat_stream(target_stream) + reply_to_platform_id = ( + f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}" + ) + else: + reply_to_platform_id = None else: anchor_message = None reply_to_platform_id = None @@ -421,10 +424,10 @@ async def adapter_command_to_stream( # 创建临时的用户信息和聊天流 - temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) + temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform or "qq") temp_chat_stream = ChatStream( - stream_id=stream_id, platform=platform, user_info=temp_user_info, group_info=None + stream_id=stream_id, platform=platform or "qq", user_info=temp_user_info, group_info=None ) target_stream = temp_chat_stream @@ -441,7 +444,7 @@ async def adapter_command_to_stream( # 构建机器人用户信息 bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, + user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) @@ -494,3 +497,21 @@ async def adapter_command_to_stream( logger.error(f"[SendAPI] 发送适配器命令时出错: {e}") traceback.print_exc() return {"status": "error", "message": f"发送适配器命令时出错: {str(e)}"} + + +async def recall_message(message_id: str, stream_id: str) -> bool: + """撤回消息 + + Args: + message_id: 消息ID + stream_id: 聊天流ID + + Returns: + bool: 是否成功 + """ + response = await adapter_command_to_stream( + action="delete_msg", + params={"message_id": message_id}, + stream_id=stream_id, + ) + return response.get("status") == "ok"
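
Usage sketch: a minimal, self-contained illustration of how the dict-based reply segments introduced above are expected to flow through the send-then-correct path. `fake_send`, `fake_recall`, and `demo` are hypothetical stand-ins for this example only; in the actual change the flow goes through `send_api.text_to_stream`, `send_api.recall_message`, and `ResponseHandler.handle_typo_correction`.

import asyncio
import random

# Illustrative stand-ins for send_api.text_to_stream / send_api.recall_message.
async def fake_send(text: str) -> str:
    print(f"send: {text}")
    return f"msg-{random.randint(1000, 9999)}"

async def fake_recall(message_id: str) -> bool:
    print(f"recall: {message_id}")
    return True

async def demo(reply_set: list[dict[str, str]]) -> None:
    sent_typos = []
    for seg in reply_set:
        # "typo" segments are sent with the typo text; plain "text" segments as-is.
        text = seg["typo"] if seg["type"] == "typo" else seg["content"]
        message_id = await fake_send(text)
        if seg["type"] == "typo":
            sent_typos.append({"message_id": message_id, "correction": seg["correction"]})
    # Afterwards, recall each typo message and send the correction, mimicking a human fixing a slip.
    for msg in sent_typos:
        await asyncio.sleep(random.uniform(2, 4))
        if await fake_recall(msg["message_id"]):
            await fake_send(msg["correction"])

asyncio.run(demo([
    {"type": "text", "content": "今天天气不错"},
    {"type": "typo", "original": "我们去公园吧", "typo": "我们去公圆吧", "correction": "公园"},
]))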