diff --git a/README.md b/README.md index e04bfc5e4..c0c31c749 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@  - +  @@ -20,7 +20,7 @@ > ⚠️ **警告**:请自行了解qqbot的风险,麦麦有时候一天被腾讯肘七八次 > ⚠️ **警告**:由于麦麦一直在迭代,所以可能存在一些bug,请自行测试,包括胡言乱语( -关于麦麦的开发和部署相关的讨论群(不建议发布无关消息) +关于麦麦的开发和部署相关的讨论群(不建议发布无关消息)这里不会有麦麦发言!
diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py
index bafcaa238..446b4cb0c 100644
--- a/src/plugins/chat/__init__.py
+++ b/src/plugins/chat/__init__.py
@@ -32,7 +32,7 @@ from .relationship_manager import relationship_manager
# 初始化表情管理器
emoji_manager.initialize()
-print("\033[1;32m正在唤醒麦麦......\033[0m")
+print(f"\033[1;32m正在唤醒{global_config.BOT_NICKNAME}......\033[0m")
# 创建机器人实例
chat_bot = ChatBot(global_config)
@@ -54,11 +54,11 @@ async def start_background_tasks():
@driver.on_bot_connect
async def _(bot: Bot):
"""Bot连接成功时的处理"""
- print("\033[1;38;5;208m-----------麦麦成功连接!-----------\033[0m")
+ print(f"\033[1;38;5;208m-----------{global_config.BOT_NICKNAME}成功连接!-----------\033[0m")
message_sender.set_bot(bot)
asyncio.create_task(message_sender.start_processor(bot))
await willing_manager.ensure_started()
- print("\033[1;38;5;208m-----------麦麦消息发送器已启动!-----------\033[0m")
+ print("\033[1;38;5;208m-----------消息发送器已启动!-----------\033[0m")
asyncio.create_task(emoji_manager._periodic_scan(interval_MINS=global_config.EMOJI_REGISTER_INTERVAL))
print("\033[1;38;5;208m-----------开始偷表情包!-----------\033[0m")
diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py
index 856757c80..efa8e1014 100644
--- a/src/plugins/chat/bot.py
+++ b/src/plugins/chat/bot.py
@@ -33,14 +33,6 @@ class ChatBot:
if not self._started:
# 只保留必要的任务
self._started = True
-
- def is_mentioned_bot(self, message: Message) -> bool:
- """检查消息是否提到了机器人"""
- keywords = ['麦麦']
- for keyword in keywords:
- if keyword in message.processed_plain_text:
- return True
- return False
async def handle_message(self, event: GroupMessageEvent, bot: Bot) -> None:
@@ -159,7 +151,7 @@ class ChatBot:
raw_message=msg,
plain_text=msg,
processed_plain_text=msg,
- user_nickname="麦麦",
+ user_nickname=global_config.BOT_NICKNAME,
group_name=message.group_name,
time=timepoint
)
@@ -187,7 +179,7 @@ class ChatBot:
raw_message=emoji_cq,
plain_text=emoji_cq,
processed_plain_text=emoji_cq,
- user_nickname="麦麦",
+ user_nickname=global_config.BOT_NICKNAME,
group_name=message.group_name,
time=bot_response_time,
is_emoji=True
diff --git a/src/plugins/chat/bot_config_toml b/src/plugins/chat/bot_config_toml
index 95ecb5244..ad61f7edb 100644
--- a/src/plugins/chat/bot_config_toml
+++ b/src/plugins/chat/bot_config_toml
@@ -16,6 +16,10 @@ emoji_chance = 0.2
check_interval = 120
register_interval = 10
+[cq_code]
+enable_pic_translate = true
+
+
[response]
api_using = "siliconflow"
model_r1_probability = 0.8
diff --git a/src/plugins/chat/config.py b/src/plugins/chat/config.py
index f3867bc59..8c5d480e7 100644
--- a/src/plugins/chat/config.py
+++ b/src/plugins/chat/config.py
@@ -10,11 +10,11 @@ import tomli # 添加这行导入
# logger.remove()
# # 只禁用 INFO 级别的日志输出到控制台
-logging.getLogger('nonebot').handlers.clear()
-console_handler = logging.StreamHandler()
-console_handler.setLevel(logging.WARNING) # 只输出 WARNING 及以上级别
-logging.getLogger('nonebot').addHandler(console_handler)
-logging.getLogger('nonebot').setLevel(logging.WARNING)
+# logging.getLogger('nonebot').handlers.clear()
+# console_handler = logging.StreamHandler()
+# console_handler.setLevel(logging.WARNING) # 只输出 WARNING 及以上级别
+# logging.getLogger('nonebot').addHandler(console_handler)
+# logging.getLogger('nonebot').setLevel(logging.WARNING)
@dataclass
class BotConfig:
@@ -33,6 +33,8 @@ class BotConfig:
MAX_CONTEXT_SIZE: int = 15 # 上下文最大消息数
emoji_chance: float = 0.2 # 发送表情包的基础概率
+ ENABLE_PIC_TRANSLATE: bool = True # 是否启用图片翻译
+
talk_allowed_groups = set()
talk_frequency_down_groups = set()
ban_user_id = set()
@@ -65,6 +67,10 @@ class BotConfig:
config.EMOJI_CHECK_INTERVAL = emoji_config.get("check_interval", config.EMOJI_CHECK_INTERVAL)
config.EMOJI_REGISTER_INTERVAL = emoji_config.get("register_interval", config.EMOJI_REGISTER_INTERVAL)
+ if "cq_code" in toml_dict:
+ cq_code_config = toml_dict["cq_code"]
+ config.ENABLE_PIC_TRANSLATE = cq_code_config.get("enable_pic_translate", config.ENABLE_PIC_TRANSLATE)
+
# 机器人基础配置
if "bot" in toml_dict:
bot_config = toml_dict["bot"]
diff --git a/src/plugins/chat/cq_code.py b/src/plugins/chat/cq_code.py
index 41e0a31a5..55b4d82d7 100644
--- a/src/plugins/chat/cq_code.py
+++ b/src/plugins/chat/cq_code.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass
-from typing import Dict, Optional
+from typing import Dict, Optional, List, Union
import html
import requests
import base64
@@ -12,6 +12,13 @@ from nonebot.adapters.onebot.v11 import Bot
from .config import global_config, llm_config
import time
import asyncio
+
+#解析各种CQ码
+#包含CQ码类
+
+
+
+
@dataclass
class CQCode:
"""
@@ -25,13 +32,14 @@ class CQCode:
"""
type: str
params: Dict[str, str]
- raw_code: str
+ # raw_code: str
group_id: int
user_id: int
group_name: str = ""
user_nickname: str = ""
translated_plain_text: Optional[str] = None
reply_message: Dict = None # 存储回复消息
+ image_base64: Optional[str] = None
@classmethod
def from_cq_code(cls, cq_code: str, reply: Dict = None) -> 'CQCode':
@@ -39,6 +47,9 @@ class CQCode:
从CQ码字符串创建CQCode对象
例如:[CQ:image,file=1.jpg,url=http://example.com/1.jpg]
"""
+ if not cq_code.startswith('[CQ:'):
+ return cls('text', {'text': cq_code}, cq_code, group_id=0, user_id=0)
+
# 移除前后的[]
content = cq_code[1:-1]
# 分离类型和参数部分
@@ -69,7 +80,10 @@ class CQCode:
if self.type == 'text':
self.translated_plain_text = self.params.get('text', '')
elif self.type == 'image':
- self.translated_plain_text = self.translate_image()
+ if self.params.get('sub_type') == '0':
+ self.translated_plain_text = self.translate_image()
+ else:
+ self.translated_plain_text = self.translate_emoji()
elif self.type == 'at':
from .message import Message
message_obj = Message(
@@ -87,16 +101,8 @@ class CQCode:
else:
self.translated_plain_text = f"[{self.type}]"
- def translate_image(self) -> str:
- """处理图片类型的CQ码,区分普通图片和表情包"""
- if 'url' not in self.params:
- return '[图片]'
-
- # 获取子类型,默认为普通图片(0)
- sub_type = int(self.params.get('sub_type', '0'))
- is_emoji = (sub_type == 1)
-
- # 添加更多请求头
+ def get_img(self):
+ '''
headers = {
'User-Agent': 'QQ/8.9.68.11565 CFNetwork/1220.1 Darwin/20.3.0',
'Accept': 'image/*;q=0.8',
@@ -105,64 +111,71 @@ class CQCode:
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
}
+ '''
- # 处理URL编码问题
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36',
+ 'Accept': 'text/html, application/xhtml xml, */*',
+ 'Accept-Encoding': 'gbk, GB2312',
+ 'Accept-Language': 'zh-cn',
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ 'Cache-Control': 'no-cache'
+ }
url = html.unescape(self.params['url'])
-
if not url.startswith(('http://', 'https://')):
- return '[图片]' # 直接返回而不是抛出异常
+ return None # 直接返回而不是抛出异常
- try:
- # 下载图片,增加重试机制
- max_retries = 3
- for retry in range(max_retries):
- try:
- response = requests.get(url, headers=headers, timeout=10, verify=False)
- if response.status_code == 200:
- break
- elif response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
- # 对于腾讯多媒体服务器的链接,直接返回图片描述
- if sub_type == 1:
- return '[QQ表情]'
- return '[图片]'
- time.sleep(1) # 重试前等待1秒
- except requests.RequestException:
- if retry == max_retries - 1:
- raise
- time.sleep(1)
-
- if response.status_code != 200:
- print(f"\033[1;31m[警告]\033[0m 图片下载失败: HTTP {response.status_code}, URL: {url}")
- return '[图片]' # 直接返回而不是抛出异常
-
- # 检查响应内容类型
- content_type = response.headers.get('content-type', '')
- if not content_type.startswith('image/'):
- print(f"\033[1;31m[警告]\033[0m 非图片类型响应: {content_type}")
- return '[图片]' # 直接返回而不是抛出异常
-
- content = response.content
- image_base64 = base64.b64encode(content).decode('utf-8')
-
- # 根据子类型选择不同的处理方式
- if sub_type == 1: # 表情包
- try:
- return self.get_emoji_description(image_base64)
- except Exception as e:
- print(f"\033[1;31m[警告]\033[0m 表情描述生成失败: {str(e)}")
- return '[QQ表情]'
- elif sub_type == 0: # 普通图片
- try:
- return self.get_image_description(image_base64)
- except Exception as e:
- print(f"\033[1;31m[警告]\033[0m 图片描述生成失败: {str(e)}")
- return '[图片]'
- else: # 其他类型都按普通图片处理
- return '[图片]'
-
- except Exception as e:
- print(f"\033[1;31m[警告]\033[0m 图片处理失败: {str(e)}")
- return '[图片]' # 出现任何错误都返回默认文本而不是抛出异常
+ max_retries = 3
+ for retry in range(max_retries):
+ try:
+ response = requests.get(url, headers=headers, timeout=10, verify=False)
+ if response.status_code == 200:
+ break
+ elif response.status_code == 400 and 'multimedia.nt.qq.com.cn' in url:
+ # 对于腾讯多媒体服务器的链接,直接返回图片描述
+ return None
+ time.sleep(1) # 重试前等待1秒
+ except requests.RequestException:
+ if retry == max_retries - 1:
+ raise
+ time.sleep(1)
+ if response.status_code != 200:
+ print(f"\033[1;31m[警告]\033[0m 图片下载失败: HTTP {response.status_code}, URL: {url}")
+ return None # 直接返回而不是抛出异常
+ #检查是否为图片
+ content_type = response.headers.get('content-type', '')
+ if not content_type.startswith('image/'):
+ print(f"\033[1;31m[警告]\033[0m 非图片类型响应: {content_type}")
+ return None # 直接返回而不是抛出异常
+ content = response.content
+ image_base64 = base64.b64encode(content).decode('utf-8')
+ if image_base64:
+ self.image_base64 = image_base64
+ return image_base64
+ else:
+ return None
+
+ def translate_emoji(self) -> str:
+ """处理表情包类型的CQ码"""
+ if 'url' not in self.params:
+ return '[表情包]'
+ base64 = self.get_img()
+ if base64:
+ return self.get_image_description(base64)
+ else:
+ return '[表情包]'
+
+
+ def translate_image(self) -> str:
+ """处理图片类型的CQ码,区分普通图片和表情包"""
+ #没有url,直接返回默认文本
+ if 'url' not in self.params:
+ return '[图片]'
+ base64 = self.get_img()
+ if base64:
+ return self.get_image_description(base64)
+ else:
+ return '[图片]'
def get_emoji_description(self, image_base64: str) -> str:
"""调用AI接口获取表情包描述"""
@@ -254,53 +267,6 @@ class CQCode:
raise ValueError(f"AI接口调用失败: {response.text}")
-
- def get_image_description_is_setu(self, image_base64: str) -> str:
- """调用AI接口获取普通图片描述"""
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {llm_config.SILICONFLOW_API_KEY}"
- }
-
- payload = {
- "model": "deepseek-ai/deepseek-vl2",
- "messages": [
- {
- "role": "user",
- "content": [
- {
- "type": "text",
- "text": "请回答我这张图片是否涉及涩情、情色、裸露或性暗示,请严格判断,有任何涩情迹象就回答是,请用是或否回答"
- },
- {
- "type": "image_url",
- "image_url": {
- "url": f"data:image/jpeg;base64,{image_base64}"
- }
- }
- ]
- }
- ],
- "max_tokens": 300,
- "temperature": 0.6
- }
-
- response = requests.post(
- f"{llm_config.SILICONFLOW_BASE_URL}chat/completions",
- headers=headers,
- json=payload,
- timeout=30
- )
-
- if response.status_code == 200:
- result_json = response.json()
- if "choices" in result_json and len(result_json["choices"]) > 0:
- description = result_json["choices"][0]["message"]["content"]
- # 如果描述中包含"否",返回否,其他情况返回是
- return "否" if "否" in description else "是"
-
- raise ValueError(f"AI接口调用失败: {response.text}")
-
def translate_forward(self) -> str:
"""处理转发消息"""
try:
@@ -391,7 +357,7 @@ class CQCode:
group_id=self.group_id
)
if message_obj.user_id == global_config.BOT_QQ:
- return f"[回复 麦麦 的消息: {message_obj.processed_plain_text}]"
+ return f"[回复 {global_config.BOT_NICKNAME} 的消息: {message_obj.processed_plain_text}]"
else:
return f"[回复 {self.reply_message.sender.nickname} 的消息: {message_obj.processed_plain_text}]"
@@ -424,7 +390,41 @@ class CQCode:
.replace(',', ',')
# 生成CQ码,设置sub_type=1表示这是表情包
return f"[CQ:image,file=file:///{escaped_path},sub_type=1]"
-
+
+class CQCode_tool:
+ @staticmethod
+ def cq_from_dict_to_class(cq_code: Dict, reply: Optional[Dict] = None) -> CQCode:
+ """
+ 将CQ码字典转换为CQCode对象
+
+ Args:
+ cq_code: CQ码字典
+ reply: 回复消息的字典(可选)
+
+ Returns:
+ CQCode对象
+ """
+ # 处理字典形式的CQ码
+ # 从cq_code字典中获取type字段的值,如果不存在则默认为'text'
+ cq_type = cq_code.get('type', 'text')
+ params = {}
+ if cq_type == 'text':
+ params['text'] = cq_code.get('data', {}).get('text', '')
+ else:
+ params = cq_code.get('data', {})
+
+ instance = CQCode(
+ type=cq_type,
+ params=params,
+ group_id=0,
+ user_id=0,
+ reply_message=reply
+ )
+
+ # 进行翻译处理
+ instance.translate()
+ return instance
+
@staticmethod
def create_reply_cq(message_id: int) -> str:
"""
@@ -434,4 +434,7 @@ class CQCode:
Returns:
回复CQ码字符串
"""
- return f"[CQ:reply,id={message_id}]"
\ No newline at end of file
+ return f"[CQ:reply,id={message_id}]"
+
+
+cq_code_tool = CQCode_tool()
diff --git a/src/plugins/chat/llm_generator.py b/src/plugins/chat/llm_generator.py
index 16cbb42e7..98494d795 100644
--- a/src/plugins/chat/llm_generator.py
+++ b/src/plugins/chat/llm_generator.py
@@ -58,7 +58,7 @@ class LLMResponseGenerator:
else:
self.current_model_type = 'r1_distill' # 默认使用 R1-Distill
- print(f"+++++++++++++++++麦麦{self.current_model_type}思考中+++++++++++++++++")
+ print(f"+++++++++++++++++{global_config.BOT_NICKNAME}{self.current_model_type}思考中+++++++++++++++++")
if self.current_model_type == 'r1':
model_response = await self._generate_r1_response(message)
elif self.current_model_type == 'v3':
@@ -67,7 +67,7 @@ class LLMResponseGenerator:
model_response = await self._generate_r1_distill_response(message)
# 打印情感标签
- print(f'麦麦的回复是:{model_response}')
+ print(f'{global_config.BOT_NICKNAME}的回复是:{model_response}')
model_response, emotion = await self._process_response(model_response)
if model_response:
diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py
index a13a309db..c9d6e8391 100644
--- a/src/plugins/chat/message.py
+++ b/src/plugins/chat/message.py
@@ -8,8 +8,9 @@ from ...common.database import Database
from PIL import Image
from .config import BotConfig, global_config
import urllib3
-from .cq_code import CQCode
from .utils_user import get_user_nickname
+from .utils_cq import parse_cq_code
+from .cq_code import cq_code_tool,CQCode
Message = ForwardRef('Message') # 添加这行
@@ -63,12 +64,10 @@ class Message:
self.group_name = self.get_groupname(self.group_id)
if not self.processed_plain_text:
- # 解析消息片段
if self.raw_message:
- # print(f"\033[1;34m[调试信息]\033[0m 原始消息: {self.raw_message}")
self.message_segments = self.parse_message_segments(str(self.raw_message))
self.processed_plain_text = ' '.join(
- seg['translated_text']
+ seg.translated_plain_text
for seg in self.message_segments
)
@@ -87,96 +86,80 @@ class Message:
else:
return f"群{group_id}"
- def parse_message_segments(self, message: str) -> List[Dict]:
+ def parse_message_segments(self, message: str) -> List[CQCode]:
"""
将消息解析为片段列表,包括纯文本和CQ码
返回的列表中每个元素都是字典,包含:
- - type: 'text' 或 CQ码类型
- - data: 对于text类型是文本内容,对于CQ码是参数字典
- - translated_text: 经过处理后的文本
+ - cq_code_list:分割出的聊天对象,包括文本和CQ码
+ - trans_list:翻译后的对象列表
"""
- segments = []
- start = 0
+ cq_code_dict_list = []
+ trans_list = []
+ start = 0
+ print(f"\033[1;34m[调试信息]\033[0m 原始消息: {message}")
while True:
# 查找下一个CQ码的开始位置
cq_start = message.find('[CQ:', start)
+ #如果没有cq码,直接返回文本内容
if cq_start == -1:
# 如果没有找到更多CQ码,添加剩余文本
if start < len(message):
text = message[start:].strip()
if text: # 只添加非空文本
- segments.append({
- 'type': 'text',
- 'data': {'text': text},
- 'translated_text': text
- })
+ cq_code_dict_list.append(parse_cq_code(text))
break
-
# 添加CQ码前的文本
if cq_start > start:
text = message[start:cq_start].strip()
if text: # 只添加非空文本
- segments.append({
- 'type': 'text',
- 'data': {'text': text},
- 'translated_text': text
- })
-
+ cq_code_dict_list.append(parse_cq_code(text))
# 查找CQ码的结束位置
cq_end = message.find(']', cq_start)
if cq_end == -1:
# CQ码未闭合,作为普通文本处理
text = message[cq_start:].strip()
if text:
- segments.append({
- 'type': 'text',
- 'data': {'text': text},
- 'translated_text': text
- })
+ cq_code_dict_list.append(parse_cq_code(text))
break
-
- # 提取完整的CQ码并创建CQCode对象
cq_code = message[cq_start:cq_end + 1]
- try:
- cq_obj = CQCode.from_cq_code(cq_code,reply = self.reply_message)
- # 设置必要的属性
- segments.append({
- 'type': cq_obj.type,
- 'data': cq_obj.params,
- 'translated_text': cq_obj.translated_plain_text
- })
- except Exception as e:
- import traceback
- print(f"\033[1;31m[错误]\033[0m 处理CQ码失败: {str(e)}")
- print(f"CQ码内容: {cq_code}")
- print(f"当前消息属性:")
- print(f"- group_id: {self.group_id}")
- print(f"- user_id: {self.user_id}")
- print(f"- user_nickname: {self.user_nickname}")
- print(f"- group_name: {self.group_name}")
- print("详细错误信息:")
- print(traceback.format_exc())
- # 处理失败时,将CQ码作为普通文本处理
- segments.append({
- 'type': 'text',
- 'data': {'text': cq_code},
- 'translated_text': cq_code
- })
+ #将cq_code解析成字典
+ cq_code_dict_list.append(parse_cq_code(cq_code))
+ # 更新start位置到当前CQ码之后
start = cq_end + 1
+
+ print(f"\033[1;34m[调试信息]\033[0m 提取的消息对象:列表: {cq_code_dict_list}")
-
- if len(segments) == 1 and segments[0]['type'] == 'image':
+ #判定是否是表情包消息,以及是否含有表情包
+ if len(cq_code_dict_list) == 1 and cq_code_dict_list[0]['type'] == 'image':
self.is_emoji = True
self.has_emoji_emoji = True
else:
- for segment in segments:
+ for segment in cq_code_dict_list:
if segment['type'] == 'image' and segment['data'].get('sub_type') == '1':
self.has_emoji_emoji = True
break
+
+
+ #翻译作为字典的CQ码
+ for _code_item in cq_code_dict_list:
+ #一个一个CQ码处理
+ message_obj = cq_code_tool.cq_from_dict_to_class(_code_item,reply = self.reply_message)
+ trans_list.append(message_obj)
+ # except Exception as e:
+ # import traceback
+ # print(f"\033[1;31m[错误]\033[0m 处理CQ码失败: {str(e)}")
+ # print(f"CQ码内容: {cq_code}")
+ # print(f"当前消息属性:")
+ # print(f"- group_id: {self.group_id}")
+ # print(f"- user_id: {self.user_id}")
+ # print(f"- user_nickname: {self.user_nickname}")
+ # print(f"- group_name: {self.group_name}")
+ # print("详细错误信息:")
+ # print(traceback.format_exc())
- return segments
+ return trans_list
class Message_Thinking:
"""消息思考类"""
diff --git a/src/plugins/chat/message_send_control.py b/src/plugins/chat/message_send_control.py
index 61169d069..cee27bf1f 100644
--- a/src/plugins/chat/message_send_control.py
+++ b/src/plugins/chat/message_send_control.py
@@ -9,8 +9,11 @@ from collections import deque
import time
from .storage import MessageStorage
from .config import global_config
+from .cq_code import cq_code_tool
+
if os.name == "nt":
from .message_visualizer import message_visualizer
+
class SendTemp:
@@ -194,7 +197,7 @@ class MessageSendControl:
print(f"- 群组: {group_id} - 内容: {message.processed_plain_text}")
cost_time = round(time.time(), 2) - message.time
if cost_time > 40:
- message.processed_plain_text = CQCode.create_reply_cq(message.message_based_id) + message.processed_plain_text
+ message.processed_plain_text = cq_code_tool.create_reply_cq(message.message_based_id) + message.processed_plain_text
cur_time = time.time()
await self._current_bot.send_group_msg(
group_id=group_id,
@@ -204,7 +207,7 @@ class MessageSendControl:
cost_time = round(time.time(), 2) - cur_time
print(f"\033[1;34m[调试]\033[0m 消息发送时间: {cost_time}秒")
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(message.time))
- print(f"\033[1;32m群 {group_id} 消息, 用户 麦麦, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}")
+ print(f"\033[1;32m群 {group_id} 消息, 用户 {global_config.BOT_NICKNAME}, 时间: {current_time}:\033[0m {str(message.processed_plain_text)}")
await self.storage.store_message(message, None)
queue.update_send_time()
if queue.has_messages():
diff --git a/src/plugins/chat/prompt_builder.py b/src/plugins/chat/prompt_builder.py
index 9d8cf4a62..8d8bf89a4 100644
--- a/src/plugins/chat/prompt_builder.py
+++ b/src/plugins/chat/prompt_builder.py
@@ -5,6 +5,7 @@ from ..schedule.schedule_generator import bot_schedule
import os
from .utils import get_embedding, combine_messages, get_recent_group_messages
from ...common.database import Database
+from .config import global_config
# 获取当前文件的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -67,7 +68,7 @@ class PromptBuilder:
chat_talking_prompt = ''
if group_id:
# 从数据库获取最近消息
- message_objects = get_recent_group_messages(self.db, group_id, limit=16)
+ message_objects = get_recent_group_messages(self.db, group_id, limit=global_config.MAX_CONTEXT_SIZE)
if message_objects:
chat_talking_prompt = combine_messages(message_objects)
# print(f"\033[1;34m[调试]\033[0m 已从数据库获取群 {group_id} 的最近{len(message_objects)}条消息记录")
@@ -88,12 +89,12 @@ class PromptBuilder:
prompt_personality = ''
personality_choice = random.random()
if personality_choice < 4/6: # 第一种人格
- prompt_personality = f'''你的网名叫麦麦,是一个学习地质的女大学生,喜欢摄影,你会刷贴吧,你正在浏览qq群,{promt_info_prompt},
+ prompt_personality = f'''你的网名叫{global_config.BOT_NICKNAME},是一个学习地质的女大学生,喜欢摄影,你会刷贴吧,你正在浏览qq群,{promt_info_prompt},
{activate_prompt}
现在请你给出日常且口语化的回复,平淡一些,尽量简短一些。{is_bot_prompt}
请注意把握群里的聊天内容,不要回复的太有条理,可以有个性。'''
elif personality_choice < 1: # 第二种人格
- prompt_personality = f'''你的网名叫麦麦,是一个女大学生,你有黑色头发,你会刷小红书, 你正在浏览qq群,{promt_info_prompt},
+ prompt_personality = f'''你的网名叫{global_config.BOT_NICKNAME},是一个女大学生,你有黑色头发,你会刷小红书, 你正在浏览qq群,{promt_info_prompt},
{activate_prompt}
现在请你给出日常且口语化的回复,请表现你自己的见解,不要一昧迎合,尽量简短一些。{is_bot_prompt}
请你表达自己的见解和观点。可以有个性。'''
diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py
index 438475a8f..ff4d7680d 100644
--- a/src/plugins/chat/utils.py
+++ b/src/plugins/chat/utils.py
@@ -4,7 +4,7 @@ from typing import List
from .message import Message
import requests
import numpy as np
-from .config import llm_config
+from .config import llm_config, global_config
import re
@@ -29,7 +29,7 @@ def combine_messages(messages: List[Message]) -> str:
def is_mentioned_bot_in_message(message: Message) -> bool:
"""检查消息是否提到了机器人"""
- keywords = ['麦麦', '麦哲伦']
+ keywords = [global_config.BOT_NICKNAME]
for keyword in keywords:
if keyword in message.processed_plain_text:
return True
@@ -37,7 +37,7 @@ def is_mentioned_bot_in_message(message: Message) -> bool:
def is_mentioned_bot_in_txt(message: str) -> bool:
"""检查消息是否提到了机器人"""
- keywords = ['麦麦', '麦哲伦']
+ keywords = [global_config.BOT_NICKNAME]
for keyword in keywords:
if keyword in message:
return True
@@ -315,7 +315,7 @@ def process_llm_response(text: str) -> List[str]:
# 检查分割后的消息数量是否过多(超过3条)
if len(sentences) > 3:
print(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
- return ['麦麦不知道哦']
+ return [f'{global_config.BOT_NICKNAME}不知道哦']
return sentences
diff --git a/src/plugins/chat/utils_cq.py b/src/plugins/chat/utils_cq.py
new file mode 100644
index 000000000..7826e6f92
--- /dev/null
+++ b/src/plugins/chat/utils_cq.py
@@ -0,0 +1,72 @@
+def parse_cq_code(cq_code: str) -> dict:
+ """
+ 将CQ码解析为字典对象
+
+ Args:
+ cq_code (str): CQ码字符串,如 [CQ:image,file=xxx.jpg,url=http://xxx]
+
+ Returns:
+ dict: 包含type和参数的字典,如 {'type': 'image', 'data': {'file': 'xxx.jpg', 'url': 'http://xxx'}}
+ """
+ # 检查是否是有效的CQ码
+ if not (cq_code.startswith('[CQ:') and cq_code.endswith(']')):
+ return {'type': 'text', 'data': {'text': cq_code}}
+
+ # 移除前后的 [CQ: 和 ]
+ content = cq_code[4:-1]
+
+ # 分离类型和参数
+ parts = content.split(',')
+ if len(parts) < 1:
+ return {'type': 'text', 'data': {'text': cq_code}}
+
+ cq_type = parts[0]
+ params = {}
+
+ # 处理参数部分
+ if len(parts) > 1:
+ # 遍历所有参数
+ for part in parts[1:]:
+ if '=' in part:
+ key, value = part.split('=', 1)
+ params[key.strip()] = value.strip()
+
+ return {
+ 'type': cq_type,
+ 'data': params
+ }
+
+if __name__ == "__main__":
+ # 测试用例列表
+ test_cases = [
+ # 测试图片CQ码
+ '[CQ:image,summary=,file={6E392FD2-AAA1-5192-F52A-F724A8EC7998}.gif,sub_type=1,url=https://gchat.qpic.cn/gchatpic_new/0/0-0-6E392FD2AAA15192F52AF724A8EC7998/0,file_size=861609]',
+
+ # 测试at CQ码
+ '[CQ:at,qq=123456]',
+
+ # 测试普通文本
+ 'Hello World',
+
+ # 测试face表情CQ码
+ '[CQ:face,id=123]',
+
+ # 测试含有多个逗号的URL
+ '[CQ:image,url=https://example.com/image,with,commas.jpg]',
+
+ # 测试空参数
+ '[CQ:image,summary=]',
+
+ # 测试非法CQ码
+ '[CQ:]',
+ '[CQ:invalid'
+ ]
+
+ # 测试每个用例
+ for i, test_case in enumerate(test_cases, 1):
+ print(f"\n测试用例 {i}:")
+ print(f"输入: {test_case}")
+ result = parse_cq_code(test_case)
+ print(f"输出: {result}")
+ print("-" * 50)
+
diff --git a/src/plugins/schedule/schedule_generator.py b/src/plugins/schedule/schedule_generator.py
index 415e278c0..9164b0ffb 100644
--- a/src/plugins/schedule/schedule_generator.py
+++ b/src/plugins/schedule/schedule_generator.py
@@ -49,7 +49,7 @@ class ScheduleGenerator:
elif read_only == False:
print(f"{date_str}的日程不存在,准备生成新的日程。")
- prompt = f"""我是麦麦,一个地质学大二女大学生,喜欢刷qq,贴吧,知乎和小红书,请为我生成{date_str}({weekday})的日程安排,包括:
+ prompt = f"""我是{global_config.BOT_NICKNAME},一个地质学大二女大学生,喜欢刷qq,贴吧,知乎和小红书,请为我生成{date_str}({weekday})的日程安排,包括:
1. 早上的学习和工作安排
2. 下午的活动和任务
3. 晚上的计划和休息时间