From 7494fff1ed89ea54df864d3d08b2e3db4c503151 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Fri, 28 Feb 2025 22:56:05 +0800 Subject: [PATCH] v0.2.2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 优化了CQ码获取逻辑和图片下载逻辑 去除硬编码的“麦麦”() --- README.md | 4 +- src/plugins/chat/__init__.py | 6 +- src/plugins/chat/bot.py | 12 +- src/plugins/chat/bot_config_toml | 4 + src/plugins/chat/config.py | 16 +- src/plugins/chat/cq_code.py | 237 +++++++++++---------- src/plugins/chat/llm_generator.py | 4 +- src/plugins/chat/message.py | 99 ++++----- src/plugins/chat/message_send_control.py | 7 +- src/plugins/chat/prompt_builder.py | 7 +- src/plugins/chat/utils.py | 8 +- src/plugins/chat/utils_cq.py | 72 +++++++ src/plugins/schedule/schedule_generator.py | 2 +- 13 files changed, 271 insertions(+), 207 deletions(-) create mode 100644 src/plugins/chat/utils_cq.py diff --git a/README.md b/README.md index e04bfc5e4..c0c31c749 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ ![Python Version](https://img.shields.io/badge/Python-3.8-blue) -![License](https://img.shields.io/badge/license-GNL-green) +![License](https://img.shields.io/github/license/SengokuCola/MaiMBot) ![Status](https://img.shields.io/badge/状态-开发中-yellow) @@ -20,7 +20,7 @@ > ⚠️ **警告**:请自行了解qqbot的风险,麦麦有时候一天被腾讯肘七八次 > ⚠️ **警告**:由于麦麦一直在迭代,所以可能存在一些bug,请自行测试,包括胡言乱语( -关于麦麦的开发和部署相关的讨论群(不建议发布无关消息) +关于麦麦的开发和部署相关的讨论群(不建议发布无关消息)这里不会有麦麦发言!
diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py index bafcaa238..446b4cb0c 100644 --- a/src/plugins/chat/__init__.py +++ b/src/plugins/chat/__init__.py @@ -32,7 +32,7 @@ from .relationship_manager import relationship_manager # 初始化表情管理器 emoji_manager.initialize() -print("\033[1;32m正在唤醒麦麦......\033[0m") +print(f"\033[1;32m正在唤醒{global_config.BOT_NICKNAME}......\033[0m") # 创建机器人实例 chat_bot = ChatBot(global_config) @@ -54,11 +54,11 @@ async def start_background_tasks(): @driver.on_bot_connect async def _(bot: Bot): """Bot连接成功时的处理""" - print("\033[1;38;5;208m-----------麦麦成功连接!-----------\033[0m") + print(f"\033[1;38;5;208m-----------{global_config.BOT_NICKNAME}成功连接!-----------\033[0m") message_sender.set_bot(bot) asyncio.create_task(message_sender.start_processor(bot)) await willing_manager.ensure_started() - print("\033[1;38;5;208m-----------麦麦消息发送器已启动!-----------\033[0m") + print("\033[1;38;5;208m-----------消息发送器已启动!-----------\033[0m") asyncio.create_task(emoji_manager._periodic_scan(interval_MINS=global_config.EMOJI_REGISTER_INTERVAL)) print("\033[1;38;5;208m-----------开始偷表情包!-----------\033[0m") diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 856757c80..efa8e1014 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -33,14 +33,6 @@ class ChatBot: if not self._started: # 只保留必要的任务 self._started = True - - def is_mentioned_bot(self, message: Message) -> bool: - """检查消息是否提到了机器人""" - keywords = ['麦麦'] - for keyword in keywords: - if keyword in message.processed_plain_text: - return True - return False async def handle_message(self, event: GroupMessageEvent, bot: Bot) -> None: @@ -159,7 +151,7 @@ class ChatBot: raw_message=msg, plain_text=msg, processed_plain_text=msg, - user_nickname="麦麦", + user_nickname=global_config.BOT_NICKNAME, group_name=message.group_name, time=timepoint ) @@ -187,7 +179,7 @@ class ChatBot: raw_message=emoji_cq, plain_text=emoji_cq, processed_plain_text=emoji_cq, - user_nickname="麦麦", + user_nickname=global_config.BOT_NICKNAME, group_name=message.group_name, time=bot_response_time, is_emoji=True diff --git a/src/plugins/chat/bot_config_toml b/src/plugins/chat/bot_config_toml index 95ecb5244..ad61f7edb 100644 --- a/src/plugins/chat/bot_config_toml +++ b/src/plugins/chat/bot_config_toml @@ -16,6 +16,10 @@ emoji_chance = 0.2 check_interval = 120 register_interval = 10 +[cq_code] +enable_pic_translate = true + + [response] api_using = "siliconflow" model_r1_probability = 0.8 diff --git a/src/plugins/chat/config.py b/src/plugins/chat/config.py index f3867bc59..8c5d480e7 100644 --- a/src/plugins/chat/config.py +++ b/src/plugins/chat/config.py @@ -10,11 +10,11 @@ import tomli # 添加这行导入 # logger.remove() # # 只禁用 INFO 级别的日志输出到控制台 -logging.getLogger('nonebot').handlers.clear() -console_handler = logging.StreamHandler() -console_handler.setLevel(logging.WARNING) # 只输出 WARNING 及以上级别 -logging.getLogger('nonebot').addHandler(console_handler) -logging.getLogger('nonebot').setLevel(logging.WARNING) +# logging.getLogger('nonebot').handlers.clear() +# console_handler = logging.StreamHandler() +# console_handler.setLevel(logging.WARNING) # 只输出 WARNING 及以上级别 +# logging.getLogger('nonebot').addHandler(console_handler) +# logging.getLogger('nonebot').setLevel(logging.WARNING) @dataclass class BotConfig: @@ -33,6 +33,8 @@ class BotConfig: MAX_CONTEXT_SIZE: int = 15 # 上下文最大消息数 emoji_chance: float = 0.2 # 发送表情包的基础概率 + ENABLE_PIC_TRANSLATE: bool = True # 是否启用图片翻译 + talk_allowed_groups = set() talk_frequency_down_groups = set() ban_user_id = set() @@ -65,6 +67,10 @@ class BotConfig: config.EMOJI_CHECK_INTERVAL = emoji_config.get("check_interval", config.EMOJI_CHECK_INTERVAL) config.EMOJI_REGISTER_INTERVAL = emoji_config.get("register_interval", config.EMOJI_REGISTER_INTERVAL) + if "cq_code" in toml_dict: + cq_code_config = toml_dict["cq_code"] + config.ENABLE_PIC_TRANSLATE = cq_code_config.get("enable_pic_translate", config.ENABLE_PIC_TRANSLATE) + # 机器人基础配置 if "bot" in toml_dict: bot_config = toml_dict["bot"] diff --git a/src/plugins/chat/cq_code.py b/src/plugins/chat/cq_code.py index 41e0a31a5..55b4d82d7 100644 --- a/src/plugins/chat/cq_code.py +++ b/src/plugins/chat/cq_code.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Dict, Optional +from typing import Dict, Optional, List, Union import html import requests import base64 @@ -12,6 +12,13 @@ from nonebot.adapters.onebot.v11 import Bot from .config import global_config, llm_config import time import asyncio + +#解析各种CQ码 +#包含CQ码类 + + + + @dataclass class CQCode: """ @@ -25,13 +32,14 @@ class CQCode: """ type: str params: Dict[str, str] - raw_code: str + # raw_code: str group_id: int user_id: int group_name: str = "" user_nickname: str = "" translated_plain_text: Optional[str] = None reply_message: Dict = None # 存储回复消息 + image_base64: Optional[str] = None @classmethod def from_cq_code(cls, cq_code: str, reply: Dict = None) -> 'CQCode': @@ -39,6 +47,9 @@ class CQCode: 从CQ码字符串创建CQCode对象 例如:[CQ:image,file=1.jpg,url=http://example.com/1.jpg] """ + if not cq_code.startswith('[CQ:'): + return cls('text', {'text': cq_code}, cq_code, group_id=0, user_id=0) + # 移除前后的[] content = cq_code[1:-1] # 分离类型和参数部分 @@ -69,7 +80,10 @@ class CQCode: if self.type == 'text': self.translated_plain_text = self.params.get('text', '') elif self.type == 'image': - self.translated_plain_text = self.translate_image() + if self.params.get('sub_type') == '0': + self.translated_plain_text = self.translate_image() + else: + self.translated_plain_text = self.translate_emoji() elif self.type == 'at': from .message import Message message_obj = Message( @@ -87,16 +101,8 @@ class CQCode: else: self.translated_plain_text = f"[{self.type}]" - def translate_image(self) -> str: - """处理图片类型的CQ码,区分普通图片和表情包""" - if 'url' not in self.params: - return '[图片]' - - # 获取子类型,默认为普通图片(0) - sub_type = int(self.params.get('sub_type', '0')) - is_emoji = (sub_type == 1) - - # 添加更多请求头 + def get_img(self): + ''' headers = { 'User-Agent': 'QQ/8.9.68.11565 CFNetwork/1220.1 Darwin/20.3.0', 'Accept': 'image/*;q=0.8', @@ -105,64 +111,71 @@ class CQCode: 'Cache-Control': 'no-cache', 'Pragma': 'no-cache' } + ''' - # 处理URL编码问题 + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36', + 'Accept': 'text/html, application/xhtml xml, */*', + 'Accept-Encoding': 'gbk, GB2312', + 'Accept-Language': 'zh-cn', + 'Content-Type': 'application/x-www-form-urlencoded', + 'Cache-Control': 'no-cache' + } url = html.unescape(self.params['url']) - if not url.startswith(('http://', 'https://')): - return '[图片]' # 直接返回而不是抛出异常 + return None # 直接返回而不是抛出异常 - try: - # 下载图片,增加重试机制 - max_retries = 3 - for retry in range(max_retries): - try: - response = requests.get(url, headers=headers, timeout=10, verify=False) - if response.status_code == 200: - break - elif response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url: - # 对于腾讯多媒体服务器的链接,直接返回图片描述 - if sub_type == 1: - return '[QQ表情]' - return '[图片]' - time.sleep(1) # 重试前等待1秒 - except requests.RequestException: - if retry == max_retries - 1: - raise - time.sleep(1) - - if response.status_code != 200: - print(f"\033[1;31m[警告]\033[0m 图片下载失败: HTTP {response.status_code}, URL: {url}") - return '[图片]' # 直接返回而不是抛出异常 - - # 检查响应内容类型 - content_type = response.headers.get('content-type', '') - if not content_type.startswith('image/'): - print(f"\033[1;31m[警告]\033[0m 非图片类型响应: {content_type}") - return '[图片]' # 直接返回而不是抛出异常 - - content = response.content - image_base64 = base64.b64encode(content).decode('utf-8') - - # 根据子类型选择不同的处理方式 - if sub_type == 1: # 表情包 - try: - return self.get_emoji_description(image_base64) - except Exception as e: - print(f"\033[1;31m[警告]\033[0m 表情描述生成失败: {str(e)}") - return '[QQ表情]' - elif sub_type == 0: # 普通图片 - try: - return self.get_image_description(image_base64) - except Exception as e: - print(f"\033[1;31m[警告]\033[0m 图片描述生成失败: {str(e)}") - return '[图片]' - else: # 其他类型都按普通图片处理 - return '[图片]' - - except Exception as e: - print(f"\033[1;31m[警告]\033[0m 图片处理失败: {str(e)}") - return '[图片]' # 出现任何错误都返回默认文本而不是抛出异常 + max_retries = 3 + for retry in range(max_retries): + try: + response = requests.get(url, headers=headers, timeout=10, verify=False) + if response.status_code == 200: + break + elif response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url: + # 对于腾讯多媒体服务器的链接,直接返回图片描述 + return None + time.sleep(1) # 重试前等待1秒 + except requests.RequestException: + if retry == max_retries - 1: + raise + time.sleep(1) + if response.status_code != 200: + print(f"\033[1;31m[警告]\033[0m 图片下载失败: HTTP {response.status_code}, URL: {url}") + return None # 直接返回而不是抛出异常 + #检查是否为图片 + content_type = response.headers.get('content-type', '') + if not content_type.startswith('image/'): + print(f"\033[1;31m[警告]\033[0m 非图片类型响应: {content_type}") + return None # 直接返回而不是抛出异常 + content = response.content + image_base64 = base64.b64encode(content).decode('utf-8') + if image_base64: + self.image_base64 = image_base64 + return image_base64 + else: + return None + + def translate_emoji(self) -> str: + """处理表情包类型的CQ码""" + if 'url' not in self.params: + return '[表情包]' + base64 = self.get_img() + if base64: + return self.get_image_description(base64) + else: + return '[表情包]' + + + def translate_image(self) -> str: + """处理图片类型的CQ码,区分普通图片和表情包""" + #没有url,直接返回默认文本 + if 'url' not in self.params: + return '[图片]' + base64 = self.get_img() + if base64: + return self.get_image_description(base64) + else: + return '[图片]' def get_emoji_description(self, image_base64: str) -> str: """调用AI接口获取表情包描述""" @@ -254,53 +267,6 @@ class CQCode: raise ValueError(f"AI接口调用失败: {response.text}") - - def get_image_description_is_setu(self, image_base64: str) -> str: - """调用AI接口获取普通图片描述""" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {llm_config.SILICONFLOW_API_KEY}" - } - - payload = { - "model": "deepseek-ai/deepseek-vl2", - "messages": [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": "请回答我这张图片是否涉及涩情、情色、裸露或性暗示,请严格判断,有任何涩情迹象就回答是,请用是或否回答" - }, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{image_base64}" - } - } - ] - } - ], - "max_tokens": 300, - "temperature": 0.6 - } - - response = requests.post( - f"{llm_config.SILICONFLOW_BASE_URL}chat/completions", - headers=headers, - json=payload, - timeout=30 - ) - - if response.status_code == 200: - result_json = response.json() - if "choices" in result_json and len(result_json["choices"]) > 0: - description = result_json["choices"][0]["message"]["content"] - # 如果描述中包含"否",返回否,其他情况返回是 - return "否" if "否" in description else "是" - - raise ValueError(f"AI接口调用失败: {response.text}") - def translate_forward(self) -> str: """处理转发消息""" try: @@ -391,7 +357,7 @@ class CQCode: group_id=self.group_id ) if message_obj.user_id == global_config.BOT_QQ: - return f"[回复 麦麦 的消息: {message_obj.processed_plain_text}]" + return f"[回复 {global_config.BOT_NICKNAME} 的消息: {message_obj.processed_plain_text}]" else: return f"[回复 {self.reply_message.sender.nickname} 的消息: {message_obj.processed_plain_text}]" @@ -424,7 +390,41 @@ class CQCode: .replace(',', ',') # 生成CQ码,设置sub_type=1表示这是表情包 return f"[CQ:image,file=file:///{escaped_path},sub_type=1]" - + +class CQCode_tool: + @staticmethod + def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode: + """ + 将CQ码字典转换为CQCode对象 + + Args: + cq_code: CQ码字典 + reply: 回复消息的字典(可选) + + Returns: + CQCode对象 + """ + # 处理字典形式的CQ码 + # 从cq_code字典中获取type字段的值,如果不存在则默认为'text' + cq_type = cq_code.get('type', 'text') + params = {} + if cq_type == 'text': + params['text'] = cq_code.get('data', {}).get('text', '') + else: + params = cq_code.get('data', {}) + + instance = CQCode( + type=cq_type, + params=params, + group_id=0, + user_id=0, + reply_message=reply + ) + + # 进行翻译处理 + instance.translate() + return instance + @staticmethod def create_reply_cq(message_id: int) -> str: """ @@ -434,4 +434,7 @@ class CQCode: Returns: 回复CQ码字符串 """ - return f"[CQ:reply,id={message_id}]" \ No newline at end of file + return f"[CQ:reply,id={message_id}]" + + +cq_code_tool = CQCode_tool() diff --git a/src/plugins/chat/llm_generator.py b/src/plugins/chat/llm_generator.py index 16cbb42e7..98494d795 100644 --- a/src/plugins/chat/llm_generator.py +++ b/src/plugins/chat/llm_generator.py @@ -58,7 +58,7 @@ class LLMResponseGenerator: else: self.current_model_type = 'r1_distill' # 默认使用 R1-Distill - print(f"+++++++++++++++++麦麦{self.current_model_type}思考中+++++++++++++++++") + print(f"+++++++++++++++++{global_config.BOT_NICKNAME}{self.current_model_type}思考中+++++++++++++++++") if self.current_model_type == 'r1': model_response = await self._generate_r1_response(message) elif self.current_model_type == 'v3': @@ -67,7 +67,7 @@ class LLMResponseGenerator: model_response = await self._generate_r1_distill_response(message) # 打印情感标签 - print(f'麦麦的回复是:{model_response}') + print(f'{global_config.BOT_NICKNAME}的回复是:{model_response}') model_response, emotion = await self._process_response(model_response) if model_response: diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py index a13a309db..c9d6e8391 100644 --- a/src/plugins/chat/message.py +++ b/src/plugins/chat/message.py @@ -8,8 +8,9 @@ from ...common.database import Database from PIL import Image from .config import BotConfig, global_config import urllib3 -from .cq_code import CQCode from .utils_user import get_user_nickname +from .utils_cq import parse_cq_code +from .cq_code import cq_code_tool,CQCode Message = ForwardRef('Message') # 添加这行 @@ -63,12 +64,10 @@ class Message: self.group_name = self.get_groupname(self.group_id) if not self.processed_plain_text: - # 解析消息片段 if self.raw_message: - # print(f"\033[1;34m[调试信息]\033[0m 原始消息: {self.raw_message}") self.message_segments = self.parse_message_segments(str(self.raw_message)) self.processed_plain_text = ' '.join( - seg['translated_text'] + seg.translated_plain_text for seg in self.message_segments ) @@ -87,96 +86,80 @@ class Message: else: return f"群{group_id}" - def parse_message_segments(self, message: str) -> List[Dict]: + def parse_message_segments(self, message: str) -> List[CQCode]: """ 将消息解析为片段列表,包括纯文本和CQ码 返回的列表中每个元素都是字典,包含: - - type: 'text' 或 CQ码类型 - - data: 对于text类型是文本内容,对于CQ码是参数字典 - - translated_text: 经过处理后的文本 + - cq_code_list:分割出的聊天对象,包括文本和CQ码 + - trans_list:翻译后的对象列表 """ - segments = [] - start = 0 + cq_code_dict_list = [] + trans_list = [] + start = 0 + print(f"\033[1;34m[调试信息]\033[0m 原始消息: {message}") while True: # 查找下一个CQ码的开始位置 cq_start = message.find('[CQ:', start) + #如果没有cq码,直接返回文本内容 if cq_start == -1: # 如果没有找到更多CQ码,添加剩余文本 if start < len(message): text = message[start:].strip() if text: # 只添加非空文本 - segments.append({ - 'type': 'text', - 'data': {'text': text}, - 'translated_text': text - }) + cq_code_dict_list.append(parse_cq_code(text)) break - # 添加CQ码前的文本 if cq_start > start: text = message[start:cq_start].strip() if text: # 只添加非空文本 - segments.append({ - 'type': 'text', - 'data': {'text': text}, - 'translated_text': text - }) - + cq_code_dict_list.append(parse_cq_code(text)) # 查找CQ码的结束位置 cq_end = message.find(']', cq_start) if cq_end == -1: # CQ码未闭合,作为普通文本处理 text = message[cq_start:].strip() if text: - segments.append({ - 'type': 'text', - 'data': {'text': text}, - 'translated_text': text - }) + cq_code_dict_list.append(parse_cq_code(text)) break - - # 提取完整的CQ码并创建CQCode对象 cq_code = message[cq_start:cq_end + 1] - try: - cq_obj = CQCode.from_cq_code(cq_code,reply = self.reply_message) - # 设置必要的属性 - segments.append({ - 'type': cq_obj.type, - 'data': cq_obj.params, - 'translated_text': cq_obj.translated_plain_text - }) - except Exception as e: - import traceback - print(f"\033[1;31m[错误]\033[0m 处理CQ码失败: {str(e)}") - print(f"CQ码内容: {cq_code}") - print(f"当前消息属性:") - print(f"- group_id: {self.group_id}") - print(f"- user_id: {self.user_id}") - print(f"- user_nickname: {self.user_nickname}") - print(f"- group_name: {self.group_name}") - print("详细错误信息:") - print(traceback.format_exc()) - # 处理失败时,将CQ码作为普通文本处理 - segments.append({ - 'type': 'text', - 'data': {'text': cq_code}, - 'translated_text': cq_code - }) + #将cq_code解析成字典 + cq_code_dict_list.append(parse_cq_code(cq_code)) + # 更新start位置到当前CQ码之后 start = cq_end + 1 + + print(f"\033[1;34m[调试信息]\033[0m 提取的消息对象:列表: {cq_code_dict_list}") - - if len(segments) == 1 and segments[0]['type'] == 'image': + #判定是否是表情包消息,以及是否含有表情包 + if len(cq_code_dict_list) == 1 and cq_code_dict_list[0]['type'] == 'image': self.is_emoji = True self.has_emoji_emoji = True else: - for segment in segments: + for segment in cq_code_dict_list: if segment['type'] == 'image' and segment['data'].get('sub_type') == '1': self.has_emoji_emoji = True break + + + #翻译作为字典的CQ码 + for _code_item in cq_code_dict_list: + #一个一个CQ码处理 + message_obj = cq_code_tool.cq_from_dict_to_class(_code_item,reply = self.reply_message) + trans_list.append(message_obj) + # except Exception as e: + # import traceback + # print(f"\033[1;31m[错误]\033[0m 处理CQ码失败: {str(e)}") + # print(f"CQ码内容: {cq_code}") + # print(f"当前消息属性:") + # print(f"- group_id: {self.group_id}") + # print(f"- user_id: {self.user_id}") + # print(f"- user_nickname: {self.user_nickname}") + # print(f"- group_name: {self.group_name}") + # print("详细错误信息:") + # print(traceback.format_exc()) - return segments + return trans_list class Message_Thinking: """消息思考类""" diff --git a/src/plugins/chat/message_send_control.py b/src/plugins/chat/message_send_control.py index 61169d069..cee27bf1f 100644 --- a/src/plugins/chat/message_send_control.py +++ b/src/plugins/chat/message_send_control.py @@ -9,8 +9,11 @@ from collections import deque import time from .storage import MessageStorage from .config import global_config +from .cq_code import cq_code_tool + if os.name == "nt": from .message_visualizer import message_visualizer + class SendTemp: @@ -194,7 +197,7 @@ class MessageSendControl: print(f"- 群组: {group_id} - 内容: {message.processed_plain_text}") cost_time = round(time.time(), 2) - message.time if cost_time > 40: - message.processed_plain_text = CQCode.create_reply_cq(message.message_based_id) + message.processed_plain_text + message.processed_plain_text = cq_code_tool.create_reply_cq(message.message_based_id) + message.processed_plain_text cur_time = time.time() await self._current_bot.send_group_msg( group_id=group_id, @@ -204,7 +207,7 @@ class MessageSendControl: cost_time = round(time.time(), 2) - cur_time print(f"\033[1;34m[调试]\033[0m 消息发送时间: {cost_time}秒") current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time)) - print(f"\033[1;32m群 {group_id} 消息, 用户 麦麦, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}") + print(f"\033[1;32m群 {group_id} 消息, 用户 {global_config.BOT_NICKNAME}, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}") await self.storage.store_message(message, None) queue.update_send_time() if queue.has_messages(): diff --git a/src/plugins/chat/prompt_builder.py b/src/plugins/chat/prompt_builder.py index 9d8cf4a62..8d8bf89a4 100644 --- a/src/plugins/chat/prompt_builder.py +++ b/src/plugins/chat/prompt_builder.py @@ -5,6 +5,7 @@ from ..schedule.schedule_generator import bot_schedule import os from .utils import get_embedding, combine_messages, get_recent_group_messages from ...common.database import Database +from .config import global_config # 获取当前文件的绝对路径 current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -67,7 +68,7 @@ class PromptBuilder: chat_talking_prompt = '' if group_id: # 从数据库获取最近消息 - message_objects = get_recent_group_messages(self.db, group_id, limit=16) + message_objects = get_recent_group_messages(self.db, group_id, limit=global_config.MAX_CONTEXT_SIZE) if message_objects: chat_talking_prompt = combine_messages(message_objects) # print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的最近{len(message_objects)}条消息记录") @@ -88,12 +89,12 @@ class PromptBuilder: prompt_personality = '' personality_choice = random.random() if personality_choice < 4/6: # 第一种人格 - prompt_personality = f'''你的网名叫麦麦,是一个学习地质的女大学生,喜欢摄影,你会刷贴吧,你正在浏览qq群,{promt_info_prompt}, + prompt_personality = f'''你的网名叫{global_config.BOT_NICKNAME},是一个学习地质的女大学生,喜欢摄影,你会刷贴吧,你正在浏览qq群,{promt_info_prompt}, {activate_prompt} 现在请你给出日常且口语化的回复,平淡一些,尽量简短一些。{is_bot_prompt} 请注意把握群里的聊天内容,不要回复的太有条理,可以有个性。''' elif personality_choice < 1: # 第二种人格 - prompt_personality = f'''你的网名叫麦麦,是一个女大学生,你有黑色头发,你会刷小红书, 你正在浏览qq群,{promt_info_prompt}, + prompt_personality = f'''你的网名叫{global_config.BOT_NICKNAME},是一个女大学生,你有黑色头发,你会刷小红书, 你正在浏览qq群,{promt_info_prompt}, {activate_prompt} 现在请你给出日常且口语化的回复,请表现你自己的见解,不要一昧迎合,尽量简短一些。{is_bot_prompt} 请你表达自己的见解和观点。可以有个性。''' diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index 438475a8f..ff4d7680d 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -4,7 +4,7 @@ from typing import List from .message import Message import requests import numpy as np -from .config import llm_config +from .config import llm_config, global_config import re @@ -29,7 +29,7 @@ def combine_messages(messages: List[Message]) -> str: def is_mentioned_bot_in_message(message: Message) -> bool: """检查消息是否提到了机器人""" - keywords = ['麦麦', '麦哲伦'] + keywords = [global_config.BOT_NICKNAME] for keyword in keywords: if keyword in message.processed_plain_text: return True @@ -37,7 +37,7 @@ def is_mentioned_bot_in_message(message: Message) -> bool: def is_mentioned_bot_in_txt(message: str) -> bool: """检查消息是否提到了机器人""" - keywords = ['麦麦', '麦哲伦'] + keywords = [global_config.BOT_NICKNAME] for keyword in keywords: if keyword in message: return True @@ -315,7 +315,7 @@ def process_llm_response(text: str) -> List[str]: # 检查分割后的消息数量是否过多(超过3条) if len(sentences) > 3: print(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复") - return ['麦麦不知道哦'] + return [f'{global_config.BOT_NICKNAME}不知道哦'] return sentences diff --git a/src/plugins/chat/utils_cq.py b/src/plugins/chat/utils_cq.py new file mode 100644 index 000000000..7826e6f92 --- /dev/null +++ b/src/plugins/chat/utils_cq.py @@ -0,0 +1,72 @@ +def parse_cq_code(cq_code: str) -> dict: + """ + 将CQ码解析为字典对象 + + Args: + cq_code (str): CQ码字符串,如 [CQ:image,file=xxx.jpg,url=http://xxx] + + Returns: + dict: 包含type和参数的字典,如 {'type': 'image', 'data': {'file': 'xxx.jpg', 'url': 'http://xxx'}} + """ + # 检查是否是有效的CQ码 + if not (cq_code.startswith('[CQ:') and cq_code.endswith(']')): + return {'type': 'text', 'data': {'text': cq_code}} + + # 移除前后的 [CQ: 和 ] + content = cq_code[4:-1] + + # 分离类型和参数 + parts = content.split(',') + if len(parts) < 1: + return {'type': 'text', 'data': {'text': cq_code}} + + cq_type = parts[0] + params = {} + + # 处理参数部分 + if len(parts) > 1: + # 遍历所有参数 + for part in parts[1:]: + if '=' in part: + key, value = part.split('=', 1) + params[key.strip()] = value.strip() + + return { + 'type': cq_type, + 'data': params + } + +if __name__ == "__main__": + # 测试用例列表 + test_cases = [ + # 测试图片CQ码 + '[CQ:image,summary=,file={6E392FD2-AAA1-5192-F52A-F724A8EC7998}.gif,sub_type=1,url=https://gchat.qpic.cn/gchatpic_new/0/0-0-6E392FD2AAA15192F52AF724A8EC7998/0,file_size=861609]', + + # 测试at CQ码 + '[CQ:at,qq=123456]', + + # 测试普通文本 + 'Hello World', + + # 测试face表情CQ码 + '[CQ:face,id=123]', + + # 测试含有多个逗号的URL + '[CQ:image,url=https://example.com/image,with,commas.jpg]', + + # 测试空参数 + '[CQ:image,summary=]', + + # 测试非法CQ码 + '[CQ:]', + '[CQ:invalid' + ] + + # 测试每个用例 + for i, test_case in enumerate(test_cases, 1): + print(f"\n测试用例 {i}:") + print(f"输入: {test_case}") + result = parse_cq_code(test_case) + print(f"输出: {result}") + print("-" * 50) + diff --git a/src/plugins/schedule/schedule_generator.py b/src/plugins/schedule/schedule_generator.py index 415e278c0..9164b0ffb 100644 --- a/src/plugins/schedule/schedule_generator.py +++ b/src/plugins/schedule/schedule_generator.py @@ -49,7 +49,7 @@ class ScheduleGenerator: elif read_only == False: print(f"{date_str}的日程不存在,准备生成新的日程。") - prompt = f"""我是麦麦,一个地质学大二女大学生,喜欢刷qq,贴吧,知乎和小红书,请为我生成{date_str}({weekday})的日程安排,包括: + prompt = f"""我是{global_config.BOT_NICKNAME},一个地质学大二女大学生,喜欢刷qq,贴吧,知乎和小红书,请为我生成{date_str}({weekday})的日程安排,包括: 1. 早上的学习和工作安排 2. 下午的活动和任务 3. 晚上的计划和休息时间