committed by
Windpicker-owo
parent
086a7322b3
commit
796daf8ddc
@@ -71,12 +71,7 @@ class CycleProcessor:
|
|||||||
"""
|
"""
|
||||||
# 发送回复
|
# 发送回复
|
||||||
with Timer("回复发送", cycle_timers):
|
with Timer("回复发送", cycle_timers):
|
||||||
reply_text, sent_messages = await self.response_handler.send_response(
|
reply_text = await self.response_handler.send_response(response_set, loop_start_time, action_message)
|
||||||
response_set, loop_start_time, action_message
|
|
||||||
)
|
|
||||||
if sent_messages:
|
|
||||||
# 异步处理错别字修正
|
|
||||||
asyncio.create_task(self.response_handler.handle_typo_correction(sent_messages))
|
|
||||||
|
|
||||||
# 存储reply action信息
|
# 存储reply action信息
|
||||||
person_info_manager = get_person_info_manager()
|
person_info_manager = get_person_info_manager()
|
||||||
@@ -185,8 +180,7 @@ class CycleProcessor:
|
|||||||
cycle_timers, thinking_id = self.cycle_tracker.start_cycle()
|
cycle_timers, thinking_id = self.cycle_tracker.start_cycle()
|
||||||
logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考")
|
logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考")
|
||||||
|
|
||||||
# 发送正在输入状态
|
if ENABLE_S4U:
|
||||||
if ENABLE_S4U and self.context.chat_stream and self.context.chat_stream.user_info:
|
|
||||||
await send_typing(self.context.chat_stream.user_info.user_id)
|
await send_typing(self.context.chat_stream.user_info.user_id)
|
||||||
|
|
||||||
loop_start_time = time.time()
|
loop_start_time = time.time()
|
||||||
@@ -214,7 +208,7 @@ class CycleProcessor:
|
|||||||
result = await event_manager.trigger_event(
|
result = await event_manager.trigger_event(
|
||||||
EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.chat_stream
|
EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.chat_stream
|
||||||
)
|
)
|
||||||
if result and not result.all_continue_process():
|
if not result.all_continue_process():
|
||||||
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成")
|
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成")
|
||||||
|
|
||||||
# 规划动作
|
# 规划动作
|
||||||
@@ -427,7 +421,7 @@ class CycleProcessor:
|
|||||||
if fallback_action and fallback_action != action:
|
if fallback_action and fallback_action != action:
|
||||||
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
|
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
|
||||||
action_handler = self.context.action_manager.create_action(
|
action_handler = self.context.action_manager.create_action(
|
||||||
action_name=str(fallback_action),
|
action_name=fallback_action if isinstance(fallback_action, list) else fallback_action,
|
||||||
action_data=action_data,
|
action_data=action_data,
|
||||||
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
|
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
|
||||||
cycle_timers=cycle_timers,
|
cycle_timers=cycle_timers,
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
import asyncio
|
|
||||||
from typing import Dict, Any, Tuple
|
from typing import Dict, Any, Tuple
|
||||||
|
|
||||||
from src.common.logger import get_logger
|
from src.common.logger import get_logger
|
||||||
@@ -64,11 +63,7 @@ class ResponseHandler:
|
|||||||
- 构建并返回完整的循环信息
|
- 构建并返回完整的循环信息
|
||||||
- 用于上级方法的状态跟踪
|
- 用于上级方法的状态跟踪
|
||||||
"""
|
"""
|
||||||
# 发送回复
|
reply_text = await self.send_response(response_set, loop_start_time, action_message)
|
||||||
reply_text, sent_messages = await self.send_response(response_set, loop_start_time, action_message)
|
|
||||||
if sent_messages:
|
|
||||||
# 异步处理错别字修正
|
|
||||||
asyncio.create_task(self.handle_typo_correction(sent_messages))
|
|
||||||
|
|
||||||
person_info_manager = get_person_info_manager()
|
person_info_manager = get_person_info_manager()
|
||||||
|
|
||||||
@@ -113,17 +108,18 @@ class ResponseHandler:
|
|||||||
|
|
||||||
return loop_info, reply_text, cycle_timers
|
return loop_info, reply_text, cycle_timers
|
||||||
|
|
||||||
async def send_response(self, reply_set, thinking_start_time, message_data) -> tuple[str, list[dict[str, str]]]:
|
async def send_response(self, reply_set, thinking_start_time, message_data) -> str:
|
||||||
"""
|
"""
|
||||||
发送回复内容的具体实现
|
发送回复内容的具体实现
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
reply_set: 回复内容集合,包含多个回复段
|
reply_set: 回复内容集合,包含多个回复段
|
||||||
|
reply_to: 回复目标
|
||||||
thinking_start_time: 思考开始时间
|
thinking_start_time: 思考开始时间
|
||||||
message_data: 消息数据
|
message_data: 消息数据
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
tuple[str, list[dict[str, str]]]: (完整的回复文本, 已发送消息列表)
|
str: 完整的回复文本
|
||||||
|
|
||||||
功能说明:
|
功能说明:
|
||||||
- 检查是否有新消息需要回复
|
- 检查是否有新消息需要回复
|
||||||
@@ -142,18 +138,19 @@ class ResponseHandler:
|
|||||||
need_reply = new_message_count >= random.randint(2, 4)
|
need_reply = new_message_count >= random.randint(2, 4)
|
||||||
|
|
||||||
reply_text = ""
|
reply_text = ""
|
||||||
sent_messages = []
|
|
||||||
is_proactive_thinking = message_data.get("message_type") == "proactive_thinking"
|
is_proactive_thinking = message_data.get("message_type") == "proactive_thinking"
|
||||||
|
|
||||||
first_replied = False
|
first_replied = False
|
||||||
for reply_seg in reply_set:
|
for reply_seg in reply_set:
|
||||||
|
# 调试日志:验证reply_seg的格式
|
||||||
logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}")
|
logger.debug(f"Processing reply_seg type: {type(reply_seg)}, content: {reply_seg}")
|
||||||
|
|
||||||
# 提取回复内容
|
# 修正:正确处理元组格式 (格式为: (type, content))
|
||||||
if reply_seg["type"] == "typo":
|
if isinstance(reply_seg, tuple) and len(reply_seg) >= 2:
|
||||||
data = reply_seg["typo"]
|
_, data = reply_seg
|
||||||
else:
|
else:
|
||||||
data = reply_seg["content"]
|
# 向下兼容:如果已经是字符串,则直接使用
|
||||||
|
data = str(reply_seg)
|
||||||
|
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
data = "".join(map(str, data))
|
data = "".join(map(str, data))
|
||||||
@@ -166,7 +163,7 @@ class ResponseHandler:
|
|||||||
|
|
||||||
# 发送第一段回复
|
# 发送第一段回复
|
||||||
if not first_replied:
|
if not first_replied:
|
||||||
sent_message = await send_api.text_to_stream(
|
await send_api.text_to_stream(
|
||||||
text=data,
|
text=data,
|
||||||
stream_id=self.context.stream_id,
|
stream_id=self.context.stream_id,
|
||||||
reply_to_message=message_data,
|
reply_to_message=message_data,
|
||||||
@@ -183,29 +180,5 @@ class ResponseHandler:
|
|||||||
set_reply=False,
|
set_reply=False,
|
||||||
typing=True,
|
typing=True,
|
||||||
)
|
)
|
||||||
# 记录已发送的错别字消息
|
|
||||||
if sent_message and reply_seg["type"] == "typo":
|
|
||||||
sent_messages.append(
|
|
||||||
{
|
|
||||||
"type": "typo",
|
|
||||||
"message_id": sent_message,
|
|
||||||
"original_message": message_data,
|
|
||||||
"correction": reply_seg["correction"],
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return reply_text, sent_messages
|
return reply_text
|
||||||
|
|
||||||
async def handle_typo_correction(self, sent_messages: list[dict[str, Any]]):
|
|
||||||
"""处理错别字修正"""
|
|
||||||
for msg in sent_messages:
|
|
||||||
if msg["type"] == "typo":
|
|
||||||
# 随机等待一段时间
|
|
||||||
await asyncio.sleep(random.uniform(2, 4))
|
|
||||||
# 撤回消息
|
|
||||||
recalled = await send_api.recall_message(str(msg["message_id"]), self.context.stream_id)
|
|
||||||
if recalled:
|
|
||||||
# 发送修正后的消息
|
|
||||||
await send_api.text_to_stream(
|
|
||||||
str(msg["correction"]), self.context.stream_id, reply_to_message=msg["original_message"]
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -19,22 +19,16 @@ logger = get_logger("typo_gen")
|
|||||||
|
|
||||||
|
|
||||||
class ChineseTypoGenerator:
|
class ChineseTypoGenerator:
|
||||||
"""
|
|
||||||
中文错别字生成器。
|
|
||||||
可以根据拼音、字频等信息,为给定的中文句子生成包含错别字的句子。
|
|
||||||
支持单字替换和整词替换。
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, error_rate=0.3, min_freq=5, tone_error_rate=0.2, word_replace_rate=0.3, max_freq_diff=200):
|
def __init__(self, error_rate=0.3, min_freq=5, tone_error_rate=0.2, word_replace_rate=0.3, max_freq_diff=200):
|
||||||
"""
|
"""
|
||||||
初始化错别字生成器。
|
初始化错别字生成器
|
||||||
|
|
||||||
Args:
|
参数:
|
||||||
error_rate (float): 单个汉字被替换为同音字的概率。
|
error_rate: 单字替换概率
|
||||||
min_freq (int): 候选替换字的最小词频阈值,低于此阈值的字将被忽略。
|
min_freq: 最小字频阈值
|
||||||
tone_error_rate (float): 在选择同音字时,使用错误声调的概率。
|
tone_error_rate: 声调错误概率
|
||||||
word_replace_rate (float): 整个词语被替换为同音词的概率。
|
word_replace_rate: 整词替换概率
|
||||||
max_freq_diff (int): 允许的原始字与替换字之间的最大频率差异。
|
max_freq_diff: 最大允许的频率差异
|
||||||
"""
|
"""
|
||||||
self.error_rate = error_rate
|
self.error_rate = error_rate
|
||||||
self.min_freq = min_freq
|
self.min_freq = min_freq
|
||||||
@@ -42,47 +36,42 @@ class ChineseTypoGenerator:
|
|||||||
self.word_replace_rate = word_replace_rate
|
self.word_replace_rate = word_replace_rate
|
||||||
self.max_freq_diff = max_freq_diff
|
self.max_freq_diff = max_freq_diff
|
||||||
|
|
||||||
# 加载核心数据
|
# 加载数据
|
||||||
logger.info("正在加载汉字数据库...")
|
# print("正在加载汉字数据库,请稍候...")
|
||||||
|
# logger.info("正在加载汉字数据库,请稍候...")
|
||||||
|
|
||||||
self.pinyin_dict = self._create_pinyin_dict()
|
self.pinyin_dict = self._create_pinyin_dict()
|
||||||
self.char_frequency = self._load_or_create_char_frequency()
|
self.char_frequency = self._load_or_create_char_frequency()
|
||||||
logger.info("汉字数据库加载完成。")
|
|
||||||
|
|
||||||
def _load_or_create_char_frequency(self):
|
def _load_or_create_char_frequency(self):
|
||||||
"""
|
"""
|
||||||
加载或创建汉字频率字典。
|
加载或创建汉字频率字典
|
||||||
如果存在缓存文件 `depends-data/char_frequency.json`,则直接加载。
|
|
||||||
否则,通过解析 `jieba` 的词典文件来创建,并保存为缓存。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: 一个将汉字映射到其归一化频率的字典。
|
|
||||||
"""
|
"""
|
||||||
cache_file = Path("depends-data/char_frequency.json")
|
cache_file = Path("depends-data/char_frequency.json")
|
||||||
|
|
||||||
# 如果缓存文件存在,则直接从缓存加载,提高效率
|
# 如果缓存文件存在,直接加载
|
||||||
if cache_file.exists():
|
if cache_file.exists():
|
||||||
with open(cache_file, "r", encoding="utf-8") as f:
|
with open(cache_file, "r", encoding="utf-8") as f:
|
||||||
return orjson.loads(f.read())
|
return orjson.loads(f.read())
|
||||||
|
|
||||||
# 如果没有缓存,则通过解析jieba词典来创建
|
# 使用内置的词频文件
|
||||||
char_freq = defaultdict(int)
|
char_freq = defaultdict(int)
|
||||||
# 定位jieba内置词典文件的路径
|
|
||||||
dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt")
|
dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt")
|
||||||
|
|
||||||
# 读取jieba词典文件,统计每个汉字的频率
|
# 读取jieba的词典文件
|
||||||
with open(dict_path, "r", encoding="utf-8") as f:
|
with open(dict_path, "r", encoding="utf-8") as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
word, freq = line.strip().split()[:2]
|
word, freq = line.strip().split()[:2]
|
||||||
# 将词中每个汉字的频率进行累加
|
# 对词中的每个字进行频率累加
|
||||||
for char in word:
|
for char in word:
|
||||||
if self._is_chinese_char(char):
|
if self._is_chinese_char(char):
|
||||||
char_freq[char] += int(freq)
|
char_freq[char] += int(freq)
|
||||||
|
|
||||||
# 对频率值进行归一化处理,使其在0-1000的范围内
|
# 归一化频率值
|
||||||
max_freq = max(char_freq.values())
|
max_freq = max(char_freq.values())
|
||||||
normalized_freq = {char: freq / max_freq * 1000 for char, freq in char_freq.items()}
|
normalized_freq = {char: freq / max_freq * 1000 for char, freq in char_freq.items()}
|
||||||
|
|
||||||
# 将计算出的频率数据保存到缓存文件,以便下次快速加载
|
# 保存到缓存文件
|
||||||
with open(cache_file, "w", encoding="utf-8") as f:
|
with open(cache_file, "w", encoding="utf-8") as f:
|
||||||
f.write(orjson.dumps(normalized_freq, option=orjson.OPT_INDENT_2).decode("utf-8"))
|
f.write(orjson.dumps(normalized_freq, option=orjson.OPT_INDENT_2).decode("utf-8"))
|
||||||
|
|
||||||
@@ -91,24 +80,18 @@ class ChineseTypoGenerator:
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _create_pinyin_dict():
|
def _create_pinyin_dict():
|
||||||
"""
|
"""
|
||||||
创建从拼音到汉字的映射字典。
|
创建拼音到汉字的映射字典
|
||||||
遍历常用汉字范围,为每个汉字生成带声调的拼音,并构建映射。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
defaultdict: 一个将拼音映射到汉字列表的字典。
|
|
||||||
"""
|
"""
|
||||||
# 定义常用汉字的Unicode范围
|
# 常用汉字范围
|
||||||
chars = [chr(i) for i in range(0x4E00, 0x9FFF)]
|
chars = [chr(i) for i in range(0x4E00, 0x9FFF)]
|
||||||
pinyin_dict = defaultdict(list)
|
pinyin_dict = defaultdict(list)
|
||||||
|
|
||||||
# 为范围内的每个汉字建立拼音到汉字的映射
|
# 为每个汉字建立拼音映射
|
||||||
for char in chars:
|
for char in chars:
|
||||||
try:
|
try:
|
||||||
# 获取带数字声调的拼音 (e.g., 'hao3')
|
py = pinyin(char, style=Style.TONE3)[0][0]
|
||||||
py = pinyin(char, style=Style.TONE3)
|
|
||||||
pinyin_dict[py].append(char)
|
pinyin_dict[py].append(char)
|
||||||
except Exception:
|
except Exception:
|
||||||
# 忽略无法转换拼音的字符
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return pinyin_dict
|
return pinyin_dict
|
||||||
@@ -116,62 +99,49 @@ class ChineseTypoGenerator:
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _is_chinese_char(char):
|
def _is_chinese_char(char):
|
||||||
"""
|
"""
|
||||||
判断一个字符是否为中文字符。
|
判断是否为汉字
|
||||||
|
|
||||||
Args:
|
|
||||||
char (str): 需要判断的字符。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: 如果是中文字符,返回 True,否则返回 False。
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# 通过Unicode范围判断是否为中文字符
|
|
||||||
return "\u4e00" <= char <= "\u9fff"
|
return "\u4e00" <= char <= "\u9fff"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(f"判断字符 '{char}' 时出错: {e}")
|
logger.debug(str(e))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_pinyin(self, sentence):
|
def _get_pinyin(self, sentence):
|
||||||
"""
|
"""
|
||||||
获取一个句子中每个汉字的拼音。
|
将中文句子拆分成单个汉字并获取其拼音
|
||||||
|
|
||||||
Args:
|
|
||||||
sentence (str): 输入的中文句子。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: 一个元组列表,每个元组包含 (汉字, 拼音)。
|
|
||||||
"""
|
"""
|
||||||
|
# 将句子拆分成单个字符
|
||||||
characters = list(sentence)
|
characters = list(sentence)
|
||||||
|
|
||||||
|
# 获取每个字符的拼音
|
||||||
result = []
|
result = []
|
||||||
for char in characters:
|
for char in characters:
|
||||||
# 忽略所有非中文字符
|
# 跳过空格和非汉字字符
|
||||||
if self._is_chinese_char(char):
|
if char.isspace() or not self._is_chinese_char(char):
|
||||||
# 获取带数字声调的拼音
|
continue
|
||||||
py = pinyin(char, style=Style.TONE3)
|
# 获取拼音(数字声调)
|
||||||
|
py = pinyin(char, style=Style.TONE3)[0][0]
|
||||||
result.append((char, py))
|
result.append((char, py))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_similar_tone_pinyin(py):
|
def _get_similar_tone_pinyin(py):
|
||||||
"""
|
"""
|
||||||
为一个给定的拼音生成一个声调错误的相似拼音。
|
获取相似声调的拼音
|
||||||
|
|
||||||
Args:
|
|
||||||
py (str): 带数字声调的原始拼音 (e.g., 'hao3')。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: 一个声调被随机改变的拼音。
|
|
||||||
"""
|
"""
|
||||||
# 检查拼音是否有效
|
# 检查拼音是否为空或无效
|
||||||
if not py or len(py) < 1:
|
if not py or len(py) < 1:
|
||||||
return py
|
return py
|
||||||
|
|
||||||
# 如果拼音末尾不是数字(如轻声),则默认添加一声
|
# 如果最后一个字符不是数字,说明可能是轻声或其他特殊情况
|
||||||
if not py[-1].isdigit():
|
if not py[-1].isdigit():
|
||||||
|
# 为非数字结尾的拼音添加数字声调1
|
||||||
return f"{py}1"
|
return f"{py}1"
|
||||||
|
|
||||||
base = py[:-1] # 拼音的基本部分 (e.g., 'hao')
|
base = py[:-1] # 去掉声调
|
||||||
tone = int(py[-1]) # 声调 (e.g., 3)
|
tone = int(py[-1]) # 获取声调
|
||||||
|
|
||||||
# 处理轻声(通常用5表示)或无效声调
|
# 处理轻声(通常用5表示)或无效声调
|
||||||
if tone not in [1, 2, 3, 4]:
|
if tone not in [1, 2, 3, 4]:
|
||||||
@@ -185,56 +155,40 @@ class ChineseTypoGenerator:
|
|||||||
|
|
||||||
def _calculate_replacement_probability(self, orig_freq, target_freq):
|
def _calculate_replacement_probability(self, orig_freq, target_freq):
|
||||||
"""
|
"""
|
||||||
根据原始字和目标替换字的频率差异,计算替换概率。
|
根据频率差计算替换概率
|
||||||
频率相近的字有更高的替换概率。
|
|
||||||
|
|
||||||
Args:
|
|
||||||
orig_freq (float): 原始字的频率。
|
|
||||||
target_freq (float): 目标替换字的频率。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
float: 替换概率,介于 0.0 和 1.0 之间。
|
|
||||||
"""
|
"""
|
||||||
# 如果目标字更常用,则替换概率为1
|
|
||||||
if target_freq > orig_freq:
|
if target_freq > orig_freq:
|
||||||
return 1.0
|
return 1.0 # 如果替换字频率更高,保持原有概率
|
||||||
|
|
||||||
freq_diff = orig_freq - target_freq
|
freq_diff = orig_freq - target_freq
|
||||||
# 如果频率差异过大,则不进行替换
|
|
||||||
if freq_diff > self.max_freq_diff:
|
if freq_diff > self.max_freq_diff:
|
||||||
return 0.0
|
return 0.0 # 频率差太大,不替换
|
||||||
|
|
||||||
# 使用指数衰减函数来计算概率,频率差异越大,概率越低
|
# 使用指数衰减函数计算概率
|
||||||
|
# 频率差为0时概率为1,频率差为max_freq_diff时概率接近0
|
||||||
return math.exp(-3 * freq_diff / self.max_freq_diff)
|
return math.exp(-3 * freq_diff / self.max_freq_diff)
|
||||||
|
|
||||||
def _get_similar_frequency_chars(self, char, py, num_candidates=5):
|
def _get_similar_frequency_chars(self, char, py, num_candidates=5):
|
||||||
"""
|
"""
|
||||||
获取与给定汉字发音相似且频率相近的候选替换字。
|
获取与给定字频率相近的同音字,可能包含声调错误
|
||||||
|
|
||||||
Args:
|
|
||||||
char (str): 原始汉字。
|
|
||||||
py (str): 原始汉字的拼音。
|
|
||||||
num_candidates (int): 返回的候选字数量。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list or None: 一个包含候选替换字的列表,如果没有找到则返回 None。
|
|
||||||
"""
|
"""
|
||||||
homophones = []
|
homophones = []
|
||||||
|
|
||||||
# 根据设定概率,可能使用声调错误的拼音来寻找候选字
|
# 有一定概率使用错误声调
|
||||||
if random.random() < self.tone_error_rate:
|
if random.random() < self.tone_error_rate:
|
||||||
wrong_tone_py = self._get_similar_tone_pinyin(py)
|
wrong_tone_py = self._get_similar_tone_pinyin(py)
|
||||||
homophones.extend(self.pinyin_dict.get(wrong_tone_py, []))
|
homophones.extend(self.pinyin_dict[wrong_tone_py])
|
||||||
|
|
||||||
# 添加声调正确的同音字
|
# 添加正确声调的同音字
|
||||||
homophones.extend(self.pinyin_dict.get(py, []))
|
homophones.extend(self.pinyin_dict[py])
|
||||||
|
|
||||||
if not homophones:
|
if not homophones:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# 获取原字的频率
|
||||||
orig_freq = self.char_frequency.get(char, 0)
|
orig_freq = self.char_frequency.get(char, 0)
|
||||||
|
|
||||||
# 过滤掉低频字和原始字本身
|
# 计算所有同音字与原字的频率差,并过滤掉低频字
|
||||||
freq_diff = [
|
freq_diff = [
|
||||||
(h, self.char_frequency.get(h, 0))
|
(h, self.char_frequency.get(h, 0))
|
||||||
for h in homophones
|
for h in homophones
|
||||||
@@ -248,215 +202,222 @@ class ChineseTypoGenerator:
|
|||||||
candidates_with_prob = []
|
candidates_with_prob = []
|
||||||
for h, freq in freq_diff:
|
for h, freq in freq_diff:
|
||||||
prob = self._calculate_replacement_probability(orig_freq, freq)
|
prob = self._calculate_replacement_probability(orig_freq, freq)
|
||||||
if prob > 0:
|
if prob > 0: # 只保留有效概率的候选字
|
||||||
candidates_with_prob.append((h, prob))
|
candidates_with_prob.append((h, prob))
|
||||||
|
|
||||||
if not candidates_with_prob:
|
if not candidates_with_prob:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# 根据替换概率从高到低排序
|
# 根据概率排序
|
||||||
candidates_with_prob.sort(key=lambda x: x, reverse=True)
|
candidates_with_prob.sort(key=lambda x: x[1], reverse=True)
|
||||||
|
|
||||||
# 返回概率最高的几个候选字
|
# 返回概率最高的几个字
|
||||||
return [c for c, _ in candidates_with_prob[:num_candidates]]
|
return [char for char, _ in candidates_with_prob[:num_candidates]]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_word_pinyin(word):
|
def _get_word_pinyin(word):
|
||||||
"""
|
"""
|
||||||
获取一个词语中每个汉字的拼音列表。
|
获取词语的拼音列表
|
||||||
|
|
||||||
Args:
|
|
||||||
word (str): 输入的词语。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: 包含每个汉字拼音的列表。
|
|
||||||
"""
|
"""
|
||||||
return [py for py in pinyin(word, style=Style.TONE3)]
|
return [py[0] for py in pinyin(word, style=Style.TONE3)]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _segment_sentence(sentence):
|
def _segment_sentence(sentence):
|
||||||
"""
|
"""
|
||||||
使用 jieba 对句子进行分词。
|
使用jieba分词,返回词语列表
|
||||||
|
|
||||||
Args:
|
|
||||||
sentence (str): 输入的句子。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: 分词后的词语列表。
|
|
||||||
"""
|
"""
|
||||||
return list(jieba.cut(sentence))
|
return list(jieba.cut(sentence))
|
||||||
|
|
||||||
def _get_word_homophones(self, word):
|
def _get_word_homophones(self, word):
|
||||||
"""
|
"""
|
||||||
获取一个词语的同音词。
|
获取整个词的同音词,只返回高频的有意义词语
|
||||||
只返回在jieba词典中存在且频率较高的有意义词语。
|
|
||||||
|
|
||||||
Args:
|
|
||||||
word (str): 原始词语。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: 一个包含同音词的列表。
|
|
||||||
"""
|
"""
|
||||||
if len(word) <= 1:
|
if len(word) == 1:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
# 获取词的拼音
|
||||||
word_pinyin = self._get_word_pinyin(word)
|
word_pinyin = self._get_word_pinyin(word)
|
||||||
|
|
||||||
# 为词语中的每个字找到所有同音字
|
# 遍历所有可能的同音字组合
|
||||||
candidates = []
|
candidates = []
|
||||||
for py in word_pinyin:
|
for py in word_pinyin:
|
||||||
chars = self.pinyin_dict.get(py, [])
|
chars = self.pinyin_dict.get(py, [])
|
||||||
if not chars:
|
if not chars:
|
||||||
return [] # 如果某个字没有同音字,则无法构成同音词
|
return []
|
||||||
candidates.append(chars)
|
candidates.append(chars)
|
||||||
|
|
||||||
# 生成所有可能的同音字组合
|
# 生成所有可能的组合
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
all_combinations = itertools.product(*candidates)
|
all_combinations = itertools.product(*candidates)
|
||||||
|
|
||||||
# 加载jieba词典以验证组合出的词是否为有效词语
|
# 获取jieba词典和词频信息
|
||||||
dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt")
|
dict_path = os.path.join(os.path.dirname(jieba.__file__), "dict.txt")
|
||||||
valid_words = {}
|
valid_words = {} # 改用字典存储词语及其频率
|
||||||
with open(dict_path, "r", encoding="utf-8") as f:
|
with open(dict_path, "r", encoding="utf-8") as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
parts = line.strip().split()
|
parts = line.strip().split()
|
||||||
if len(parts) >= 2:
|
if len(parts) >= 2:
|
||||||
valid_words[parts] = float(parts[0][1])
|
word_text = parts[0]
|
||||||
|
word_freq = float(parts[1]) # 获取词频
|
||||||
|
valid_words[word_text] = word_freq
|
||||||
|
|
||||||
|
# 获取原词的词频作为参考
|
||||||
original_word_freq = valid_words.get(word, 0)
|
original_word_freq = valid_words.get(word, 0)
|
||||||
# 设置一个最小词频阈值,过滤掉非常生僻的词
|
min_word_freq = original_word_freq * 0.1 # 设置最小词频为原词频的10%
|
||||||
min_word_freq = original_word_freq * 0.1
|
|
||||||
|
|
||||||
|
# 过滤和计算频率
|
||||||
homophones = []
|
homophones = []
|
||||||
for combo in all_combinations:
|
for combo in all_combinations:
|
||||||
new_word = "".join(combo)
|
new_word = "".join(combo)
|
||||||
# 检查新词是否为有效词语且与原词不同
|
|
||||||
if new_word != word and new_word in valid_words:
|
if new_word != word and new_word in valid_words:
|
||||||
new_word_freq = valid_words[new_word]
|
new_word_freq = valid_words[new_word]
|
||||||
|
# 只保留词频达到阈值的词
|
||||||
if new_word_freq >= min_word_freq:
|
if new_word_freq >= min_word_freq:
|
||||||
# 计算综合评分,结合词频和平均字频
|
# 计算词的平均字频(考虑字频和词频)
|
||||||
char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word)
|
char_avg_freq = sum(self.char_frequency.get(c, 0) for c in new_word) / len(new_word)
|
||||||
|
# 综合评分:结合词频和字频
|
||||||
combined_score = new_word_freq * 0.7 + char_avg_freq * 0.3
|
combined_score = new_word_freq * 0.7 + char_avg_freq * 0.3
|
||||||
if combined_score >= self.min_freq:
|
if combined_score >= self.min_freq:
|
||||||
homophones.append((new_word, combined_score))
|
homophones.append((new_word, combined_score))
|
||||||
|
|
||||||
# 按综合分数排序并返回前5个结果
|
# 按综合分数排序并限制返回数量
|
||||||
sorted_homophones = sorted(homophones, key=lambda x: x, reverse=True)
|
sorted_homophones = sorted(homophones, key=lambda x: x[1], reverse=True)
|
||||||
return [w for w, _ in sorted_homophones[:5]]
|
return [word for word, _ in sorted_homophones[:5]] # 限制返回前5个结果
|
||||||
|
|
||||||
def create_typo_sentence(self, sentence):
|
def create_typo_sentence(self, sentence):
|
||||||
"""
|
"""
|
||||||
为输入句子生成一个包含错别字的版本。
|
创建包含同音字错误的句子,支持词语级别和字级别的替换
|
||||||
该方法会先对句子进行分词,然后根据概率进行整词替换或单字替换。
|
|
||||||
|
|
||||||
Args:
|
参数:
|
||||||
sentence (str): 原始中文句子。
|
sentence: 输入的中文句子
|
||||||
|
|
||||||
Returns:
|
返回:
|
||||||
tuple: 包含三个元素的元组:
|
typo_sentence: 包含错别字的句子
|
||||||
- original_sentence (str): 原始句子。
|
correction_suggestion: 随机选择的一个纠正建议,返回正确的字/词
|
||||||
- typo_sentence (str): 包含错别字的句子。
|
|
||||||
- correction_suggestion (str or None): 一个随机的修正建议(可能是正确的字或词),或 None。
|
|
||||||
"""
|
"""
|
||||||
result = []
|
result = []
|
||||||
typo_info = [] # 用于调试,记录详细的替换信息
|
typo_info = []
|
||||||
word_typos = [] # 记录 (错词, 正确词)
|
word_typos = [] # 记录词语错误对(错词,正确词)
|
||||||
char_typos = [] # 记录 (错字, 正确字)
|
char_typos = [] # 记录单字错误对(错字,正确字)
|
||||||
|
current_pos = 0
|
||||||
|
|
||||||
# 对句子进行分词
|
# 分词
|
||||||
words = self._segment_sentence(sentence)
|
words = self._segment_sentence(sentence)
|
||||||
|
|
||||||
for word in words:
|
for word in words:
|
||||||
# 如果是标点符号或非中文字符,直接保留
|
# 如果是标点符号或空格,直接添加
|
||||||
if all(not self._is_chinese_char(c) for c in word):
|
if all(not self._is_chinese_char(c) for c in word):
|
||||||
result.append(word)
|
result.append(word)
|
||||||
|
current_pos += len(word)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# 获取词语的拼音
|
||||||
word_pinyin = self._get_word_pinyin(word)
|
word_pinyin = self._get_word_pinyin(word)
|
||||||
|
|
||||||
# 步骤1: 尝试进行整词替换
|
# 尝试整词替换
|
||||||
if len(word) > 1 and random.random() < self.word_replace_rate:
|
if len(word) > 1 and random.random() < self.word_replace_rate:
|
||||||
word_homophones = self._get_word_homophones(word)
|
word_homophones = self._get_word_homophones(word)
|
||||||
if word_homophones:
|
if word_homophones:
|
||||||
typo_word = random.choice(word_homophones)
|
typo_word = random.choice(word_homophones)
|
||||||
|
# 计算词的平均频率
|
||||||
orig_freq = sum(self.char_frequency.get(c, 0) for c in word) / len(word)
|
orig_freq = sum(self.char_frequency.get(c, 0) for c in word) / len(word)
|
||||||
typo_freq = sum(self.char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
|
typo_freq = sum(self.char_frequency.get(c, 0) for c in typo_word) / len(typo_word)
|
||||||
|
|
||||||
|
# 添加到结果中
|
||||||
result.append(typo_word)
|
result.append(typo_word)
|
||||||
typo_info.append(
|
typo_info.append(
|
||||||
(
|
(
|
||||||
word,
|
word,
|
||||||
typo_word,
|
typo_word,
|
||||||
" ".join(self._get_word_pinyin(word)),
|
" ".join(word_pinyin),
|
||||||
" ".join(self._get_word_pinyin(typo_word)),
|
" ".join(self._get_word_pinyin(typo_word)),
|
||||||
orig_freq,
|
orig_freq,
|
||||||
typo_freq,
|
typo_freq,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
word_typos.append((typo_word, word))
|
word_typos.append((typo_word, word)) # 记录(错词,正确词)对
|
||||||
|
current_pos += len(typo_word)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 步骤2: 如果不进行整词替换,则对词中的每个字进行单字替换
|
# 如果不进行整词替换,则进行单字替换
|
||||||
new_word = []
|
if len(word) == 1:
|
||||||
for char, py in zip(word, word_pinyin, strict=False):
|
char = word
|
||||||
# 词语越长,其中单个字被替换的概率越低
|
py = word_pinyin[0]
|
||||||
char_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
|
if random.random() < self.error_rate:
|
||||||
if random.random() < char_error_rate:
|
|
||||||
similar_chars = self._get_similar_frequency_chars(char, py)
|
similar_chars = self._get_similar_frequency_chars(char, py)
|
||||||
if similar_chars:
|
if similar_chars:
|
||||||
typo_char = random.choice(similar_chars)
|
typo_char = random.choice(similar_chars)
|
||||||
orig_freq = self.char_frequency.get(char, 0)
|
|
||||||
typo_freq = self.char_frequency.get(typo_char, 0)
|
typo_freq = self.char_frequency.get(typo_char, 0)
|
||||||
# 根据频率计算最终是否替换
|
orig_freq = self.char_frequency.get(char, 0)
|
||||||
if random.random() < self._calculate_replacement_probability(orig_freq, typo_freq):
|
replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
|
||||||
new_word.append(typo_char)
|
if random.random() < replace_prob:
|
||||||
typo_py = pinyin(typo_char, style=Style.TONE3)
|
result.append(typo_char)
|
||||||
|
typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
|
||||||
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
|
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
|
||||||
char_typos.append((typo_char, char))
|
char_typos.append((typo_char, char)) # 记录(错字,正确字)对
|
||||||
|
current_pos += 1
|
||||||
continue
|
continue
|
||||||
# 如果不替换,则保留原字
|
result.append(char)
|
||||||
new_word.append(char)
|
current_pos += 1
|
||||||
|
else:
|
||||||
|
# 处理多字词的单字替换
|
||||||
|
word_result = []
|
||||||
|
for _, (char, py) in enumerate(zip(word, word_pinyin, strict=False)):
|
||||||
|
# 词中的字替换概率降低
|
||||||
|
word_error_rate = self.error_rate * (0.7 ** (len(word) - 1))
|
||||||
|
|
||||||
result.append("".join(new_word))
|
if random.random() < word_error_rate:
|
||||||
|
similar_chars = self._get_similar_frequency_chars(char, py)
|
||||||
|
if similar_chars:
|
||||||
|
typo_char = random.choice(similar_chars)
|
||||||
|
typo_freq = self.char_frequency.get(typo_char, 0)
|
||||||
|
orig_freq = self.char_frequency.get(char, 0)
|
||||||
|
replace_prob = self._calculate_replacement_probability(orig_freq, typo_freq)
|
||||||
|
if random.random() < replace_prob:
|
||||||
|
word_result.append(typo_char)
|
||||||
|
typo_py = pinyin(typo_char, style=Style.TONE3)[0][0]
|
||||||
|
typo_info.append((char, typo_char, py, typo_py, orig_freq, typo_freq))
|
||||||
|
char_typos.append((typo_char, char)) # 记录(错字,正确字)对
|
||||||
|
continue
|
||||||
|
word_result.append(char)
|
||||||
|
result.append("".join(word_result))
|
||||||
|
current_pos += len(word)
|
||||||
|
|
||||||
# 步骤3: 生成修正建议
|
# 优先从词语错误中选择,如果没有则从单字错误中选择
|
||||||
correction_suggestion = None
|
correction_suggestion = None
|
||||||
# 有50%的概率提供一个修正建议
|
# 50%概率返回纠正建议
|
||||||
if random.random() < 0.5:
|
if random.random() < 0.5:
|
||||||
# 优先从整词错误中选择
|
|
||||||
if word_typos:
|
if word_typos:
|
||||||
_, correct_word = random.choice(word_typos)
|
wrong_word, correct_word = random.choice(word_typos)
|
||||||
correction_suggestion = correct_word
|
correction_suggestion = correct_word
|
||||||
# 其次从单字错误中选择
|
|
||||||
elif char_typos:
|
elif char_typos:
|
||||||
_, correct_char = random.choice(char_typos)
|
wrong_char, correct_char = random.choice(char_typos)
|
||||||
correction_suggestion = correct_char
|
correction_suggestion = correct_char
|
||||||
|
|
||||||
return sentence, "".join(result), correction_suggestion
|
return "".join(result), correction_suggestion
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format_typo_info(typo_info):
|
def format_typo_info(typo_info):
|
||||||
"""
|
"""
|
||||||
将错别字生成过程中的详细信息格式化为可读字符串。
|
格式化错别字信息
|
||||||
|
|
||||||
Args:
|
参数:
|
||||||
typo_info (list): `create_typo_sentence` 方法生成的详细信息列表。
|
typo_info: 错别字信息列表
|
||||||
|
|
||||||
Returns:
|
返回:
|
||||||
str: 格式化后的字符串,用于调试和分析。
|
格式化后的错别字信息字符串
|
||||||
"""
|
"""
|
||||||
if not typo_info:
|
if not typo_info:
|
||||||
return "未生成错别字"
|
return "未生成错别字"
|
||||||
|
|
||||||
result = []
|
result = []
|
||||||
for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
|
for orig, typo, orig_py, typo_py, orig_freq, typo_freq in typo_info:
|
||||||
# 判断是整词替换还是单字替换
|
# 判断是否为词语替换
|
||||||
is_word = " " in orig_py
|
is_word = " " in orig_py
|
||||||
if is_word:
|
if is_word:
|
||||||
error_type = "整词替换"
|
error_type = "整词替换"
|
||||||
else:
|
else:
|
||||||
# 判断是声调错误还是同音字替换
|
|
||||||
tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
|
tone_error = orig_py[:-1] == typo_py[:-1] and orig_py[-1] != typo_py[-1]
|
||||||
error_type = "声调错误" if tone_error else "同音字替换"
|
error_type = "声调错误" if tone_error else "同音字替换"
|
||||||
|
|
||||||
@@ -469,22 +430,21 @@ class ChineseTypoGenerator:
|
|||||||
|
|
||||||
def set_params(self, **kwargs):
|
def set_params(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
动态设置生成器的参数。
|
设置参数
|
||||||
|
|
||||||
Args:
|
可设置参数:
|
||||||
**kwargs: 键值对参数,可设置的参数包括:
|
error_rate: 单字替换概率
|
||||||
- error_rate (float)
|
min_freq: 最小字频阈值
|
||||||
- min_freq (int)
|
tone_error_rate: 声调错误概率
|
||||||
- tone_error_rate (float)
|
word_replace_rate: 整词替换概率
|
||||||
- word_replace_rate (float)
|
max_freq_diff: 最大允许的频率差异
|
||||||
- max_freq_diff (int)
|
|
||||||
"""
|
"""
|
||||||
for key, value in kwargs.items():
|
for key, value in kwargs.items():
|
||||||
if hasattr(self, key):
|
if hasattr(self, key):
|
||||||
setattr(self, key, value)
|
setattr(self, key, value)
|
||||||
logger.info(f"参数 {key} 已更新为 {value}")
|
print(f"参数 {key} 已设置为 {value}")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"尝试设置不存在的参数: {key}")
|
print(f"警告: 参数 {key} 不存在")
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -496,10 +456,10 @@ def main():
|
|||||||
|
|
||||||
# 创建包含错别字的句子
|
# 创建包含错别字的句子
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
original_sentence, typo_sentence, correction_suggestion = typo_generator.create_typo_sentence(sentence)
|
typo_sentence, correction_suggestion = typo_generator.create_typo_sentence(sentence)
|
||||||
|
|
||||||
# 打印结果
|
# 打印结果
|
||||||
print("\n原句:", original_sentence)
|
print("\n原句:", sentence)
|
||||||
print("错字版:", typo_sentence)
|
print("错字版:", typo_sentence)
|
||||||
|
|
||||||
# 打印纠正建议
|
# 打印纠正建议
|
||||||
|
|||||||
@@ -293,11 +293,9 @@ def random_remove_punctuation(text: str) -> str:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def process_llm_response(
|
def process_llm_response(text: str, enable_splitter: bool = True, enable_chinese_typo: bool = True) -> list[str]:
|
||||||
text: str, enable_splitter: bool = True, enable_chinese_typo: bool = True
|
|
||||||
) -> list[dict[str, str]]:
|
|
||||||
if not global_config.response_post_process.enable_response_post_process:
|
if not global_config.response_post_process.enable_response_post_process:
|
||||||
return [{"type": "text", "content": text}]
|
return [text]
|
||||||
|
|
||||||
# 先保护颜文字
|
# 先保护颜文字
|
||||||
if global_config.response_splitter.enable_kaomoji_protection:
|
if global_config.response_splitter.enable_kaomoji_protection:
|
||||||
@@ -313,7 +311,7 @@ def process_llm_response(
|
|||||||
cleaned_text = pattern.sub("", protected_text)
|
cleaned_text = pattern.sub("", protected_text)
|
||||||
|
|
||||||
if cleaned_text == "":
|
if cleaned_text == "":
|
||||||
return [{"type": "text", "content": "呃呃"}]
|
return ["呃呃"]
|
||||||
|
|
||||||
logger.debug(f"{text}去除括号处理后的文本: {cleaned_text}")
|
logger.debug(f"{text}去除括号处理后的文本: {cleaned_text}")
|
||||||
|
|
||||||
@@ -323,7 +321,7 @@ def process_llm_response(
|
|||||||
# 如果基本上是中文,则进行长度过滤
|
# 如果基本上是中文,则进行长度过滤
|
||||||
if get_western_ratio(cleaned_text) < 0.1 and len(cleaned_text) > max_length:
|
if get_western_ratio(cleaned_text) < 0.1 and len(cleaned_text) > max_length:
|
||||||
logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
|
logger.warning(f"回复过长 ({len(cleaned_text)} 字符),返回默认回复")
|
||||||
return [{"type": "text", "content": "懒得说"}]
|
return ["懒得说"]
|
||||||
|
|
||||||
typo_generator = ChineseTypoGenerator(
|
typo_generator = ChineseTypoGenerator(
|
||||||
error_rate=global_config.chinese_typo.error_rate,
|
error_rate=global_config.chinese_typo.error_rate,
|
||||||
@@ -340,24 +338,16 @@ def process_llm_response(
|
|||||||
sentences = []
|
sentences = []
|
||||||
for sentence in split_sentences:
|
for sentence in split_sentences:
|
||||||
if global_config.chinese_typo.enable and enable_chinese_typo:
|
if global_config.chinese_typo.enable and enable_chinese_typo:
|
||||||
original_sentence, typo_sentence, typo_corrections = typo_generator.create_typo_sentence(sentence)
|
typoed_text, typo_corrections = typo_generator.create_typo_sentence(sentence)
|
||||||
|
sentences.append(typoed_text)
|
||||||
if typo_corrections:
|
if typo_corrections:
|
||||||
sentences.append(
|
sentences.append(typo_corrections)
|
||||||
{
|
|
||||||
"type": "typo",
|
|
||||||
"original": original_sentence,
|
|
||||||
"typo": typo_sentence,
|
|
||||||
"correction": typo_corrections,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
sentences.append({"type": "text", "content": sentence})
|
sentences.append(sentence)
|
||||||
else:
|
|
||||||
sentences.append({"type": "text", "content": sentence})
|
|
||||||
|
|
||||||
if len(sentences) > max_sentence_num:
|
if len(sentences) > max_sentence_num:
|
||||||
logger.warning(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
|
logger.warning(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
|
||||||
return [{"type": "text", "content": f"{global_config.bot.nickname}不知道哦"}]
|
return [f"{global_config.bot.nickname}不知道哦"]
|
||||||
|
|
||||||
# if extracted_contents:
|
# if extracted_contents:
|
||||||
# for content in extracted_contents:
|
# for content in extracted_contents:
|
||||||
@@ -365,20 +355,7 @@ def process_llm_response(
|
|||||||
|
|
||||||
# 在所有句子处理完毕后,对包含占位符的列表进行恢复
|
# 在所有句子处理完毕后,对包含占位符的列表进行恢复
|
||||||
if global_config.response_splitter.enable_kaomoji_protection:
|
if global_config.response_splitter.enable_kaomoji_protection:
|
||||||
# sentences中的元素可能是dict,也可能是str,所以要分开处理
|
sentences = recover_kaomoji(sentences, kaomoji_mapping)
|
||||||
recovered_sentences = []
|
|
||||||
for s in sentences:
|
|
||||||
if isinstance(s, dict) and s.get("type") == "typo":
|
|
||||||
s["original"] = recover_kaomoji(s["original"], kaomoji_mapping)
|
|
||||||
s["typo"] = recover_kaomoji(s["typo"], kaomoji_mapping)
|
|
||||||
s["correction"] = recover_kaomoji(s["correction"], kaomoji_mapping)
|
|
||||||
recovered_sentences.append(s)
|
|
||||||
elif isinstance(s, dict) and s.get("type") == "text":
|
|
||||||
s["content"] = recover_kaomoji(s["content"], kaomoji_mapping)
|
|
||||||
recovered_sentences.append(s)
|
|
||||||
else:
|
|
||||||
recovered_sentences.append(recover_kaomoji([s], kaomoji_mapping))
|
|
||||||
sentences = recovered_sentences
|
|
||||||
|
|
||||||
return sentences
|
return sentences
|
||||||
|
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ async def generate_reply(
|
|||||||
return_prompt: bool = False,
|
return_prompt: bool = False,
|
||||||
request_type: str = "generator_api",
|
request_type: str = "generator_api",
|
||||||
from_plugin: bool = True,
|
from_plugin: bool = True,
|
||||||
) -> Tuple[bool, List[Dict[str, Any]], Optional[str]]:
|
) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
|
||||||
"""生成回复
|
"""生成回复
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -185,7 +185,7 @@ async def rewrite_reply(
|
|||||||
reply_to: str = "",
|
reply_to: str = "",
|
||||||
return_prompt: bool = False,
|
return_prompt: bool = False,
|
||||||
request_type: str = "generator_api",
|
request_type: str = "generator_api",
|
||||||
) -> Tuple[bool, List[Dict[str, Any]], Optional[str]]:
|
) -> Tuple[bool, List[Tuple[str, Any]], Optional[str]]:
|
||||||
"""重写回复
|
"""重写回复
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -244,9 +244,7 @@ async def rewrite_reply(
|
|||||||
return False, [], None
|
return False, [], None
|
||||||
|
|
||||||
|
|
||||||
def process_human_text(
|
def process_human_text(content: str, enable_splitter: bool, enable_chinese_typo: bool) -> List[Tuple[str, Any]]:
|
||||||
content: str, enable_splitter: bool, enable_chinese_typo: bool
|
|
||||||
) -> List[Dict[str, Any]]:
|
|
||||||
"""将文本处理为更拟人化的文本
|
"""将文本处理为更拟人化的文本
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -263,11 +261,9 @@ def process_human_text(
|
|||||||
processed_response = process_llm_response(content, enable_splitter, enable_chinese_typo)
|
processed_response = process_llm_response(content, enable_splitter, enable_chinese_typo)
|
||||||
|
|
||||||
reply_set = []
|
reply_set = []
|
||||||
for item in processed_response:
|
for text in processed_response:
|
||||||
if item["type"] == "typo":
|
reply_seg = ("text", text)
|
||||||
reply_set.append(item)
|
reply_set.append(reply_seg)
|
||||||
else:
|
|
||||||
reply_set.append({"type": "text", "content": item["content"]})
|
|
||||||
|
|
||||||
return reply_set
|
return reply_set
|
||||||
|
|
||||||
|
|||||||
@@ -180,7 +180,7 @@ async def _send_to_target(
|
|||||||
|
|
||||||
# 构建机器人用户信息
|
# 构建机器人用户信息
|
||||||
bot_user_info = UserInfo(
|
bot_user_info = UserInfo(
|
||||||
user_id=str(global_config.bot.qq_account),
|
user_id=global_config.bot.qq_account,
|
||||||
user_nickname=global_config.bot.nickname,
|
user_nickname=global_config.bot.nickname,
|
||||||
platform=target_stream.platform,
|
platform=target_stream.platform,
|
||||||
)
|
)
|
||||||
@@ -191,13 +191,10 @@ async def _send_to_target(
|
|||||||
# 处理回复消息
|
# 处理回复消息
|
||||||
if reply_to_message:
|
if reply_to_message:
|
||||||
anchor_message = message_dict_to_message_recv(message_dict=reply_to_message)
|
anchor_message = message_dict_to_message_recv(message_dict=reply_to_message)
|
||||||
if anchor_message and anchor_message.message_info and anchor_message.message_info.user_info:
|
|
||||||
anchor_message.update_chat_stream(target_stream)
|
anchor_message.update_chat_stream(target_stream)
|
||||||
reply_to_platform_id = (
|
reply_to_platform_id = (
|
||||||
f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}"
|
f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}"
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
reply_to_platform_id = None
|
|
||||||
else:
|
else:
|
||||||
anchor_message = None
|
anchor_message = None
|
||||||
reply_to_platform_id = None
|
reply_to_platform_id = None
|
||||||
@@ -430,10 +427,10 @@ async def adapter_command_to_stream(
|
|||||||
|
|
||||||
# 创建临时的用户信息和聊天流
|
# 创建临时的用户信息和聊天流
|
||||||
|
|
||||||
temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform or "qq")
|
temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
|
||||||
|
|
||||||
temp_chat_stream = ChatStream(
|
temp_chat_stream = ChatStream(
|
||||||
stream_id=stream_id, platform=platform or "qq", user_info=temp_user_info, group_info=None
|
stream_id=stream_id, platform=platform, user_info=temp_user_info, group_info=None
|
||||||
)
|
)
|
||||||
|
|
||||||
target_stream = temp_chat_stream
|
target_stream = temp_chat_stream
|
||||||
@@ -450,7 +447,7 @@ async def adapter_command_to_stream(
|
|||||||
|
|
||||||
# 构建机器人用户信息
|
# 构建机器人用户信息
|
||||||
bot_user_info = UserInfo(
|
bot_user_info = UserInfo(
|
||||||
user_id=str(global_config.bot.qq_account),
|
user_id=global_config.bot.qq_account,
|
||||||
user_nickname=global_config.bot.nickname,
|
user_nickname=global_config.bot.nickname,
|
||||||
platform=target_stream.platform,
|
platform=target_stream.platform,
|
||||||
)
|
)
|
||||||
@@ -503,23 +500,3 @@ async def adapter_command_to_stream(
|
|||||||
logger.error(f"[SendAPI] 发送适配器命令时出错: {e}")
|
logger.error(f"[SendAPI] 发送适配器命令时出错: {e}")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return {"status": "error", "message": f"发送适配器命令时出错: {str(e)}"}
|
return {"status": "error", "message": f"发送适配器命令时出错: {str(e)}"}
|
||||||
|
|
||||||
|
|
||||||
async def recall_message(message_id: str, stream_id: str) -> bool:
|
|
||||||
"""撤回消息
|
|
||||||
|
|
||||||
Args:
|
|
||||||
message_id: 消息ID
|
|
||||||
stream_id: 聊天流ID
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: 是否成功
|
|
||||||
"""
|
|
||||||
command_data = {"name": "delete_msg", "args": message_id}
|
|
||||||
|
|
||||||
success = await command_to_stream(
|
|
||||||
command=command_data,
|
|
||||||
stream_id=stream_id,
|
|
||||||
storage_message=True,
|
|
||||||
)
|
|
||||||
return success
|
|
||||||
Reference in New Issue
Block a user