Merge branch 'master' of https://github.com/MaiBot-Plus/MaiMbot-Pro-Max
This commit is contained in:
@@ -83,60 +83,164 @@ class ChatConfig(ValidatedConfigBase):
|
||||
proactive_thinking_interval: int = Field(default=1500, description="主动思考间隔")
|
||||
proactive_thinking_prompt_template: str = Field(default="", description="主动思考提示模板")
|
||||
|
||||
def get_current_talk_frequency(self, stream_id: str) -> float:
|
||||
def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
|
||||
"""
|
||||
根据时间和聊天流ID获取当前的聊天频率
|
||||
|
||||
根据当前时间和聊天流获取对应的 talk_frequency
|
||||
|
||||
Args:
|
||||
stream_id: 聊天流ID
|
||||
|
||||
chat_stream_id: 聊天流ID,格式为 "platform:chat_id:type"
|
||||
|
||||
Returns:
|
||||
float: 当前聊天频率
|
||||
float: 对应的频率值
|
||||
"""
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
# 获取当前时间
|
||||
current_time = datetime.now()
|
||||
current_hour = current_time.hour
|
||||
current_minute = current_time.minute
|
||||
current_time_str = f"{current_hour:02d}:{current_minute:02d}"
|
||||
|
||||
# 查找匹配的聊天频率调整配置
|
||||
for config_entry in self.talk_frequency_adjust:
|
||||
if not config_entry:
|
||||
continue
|
||||
|
||||
# 第一个元素是聊天流匹配模式
|
||||
stream_pattern = config_entry[0]
|
||||
|
||||
# 检查是否匹配当前聊天流
|
||||
if stream_pattern == "" or stream_pattern in stream_id:
|
||||
# 查找当前时间对应的频率
|
||||
current_frequency = self.talk_frequency
|
||||
|
||||
# 遍历时间-频率对
|
||||
for time_freq_pair in config_entry[1:]:
|
||||
if "," not in time_freq_pair:
|
||||
continue
|
||||
|
||||
time_part, freq_part = time_freq_pair.split(",", 1)
|
||||
try:
|
||||
config_hour, config_minute = map(int, time_part.split(":"))
|
||||
config_time_minutes = config_hour * 60 + config_minute
|
||||
current_time_minutes = current_hour * 60 + current_minute
|
||||
|
||||
# 如果当前时间大于等于配置时间,更新频率
|
||||
if current_time_minutes >= config_time_minutes:
|
||||
current_frequency = float(freq_part)
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
|
||||
return current_frequency
|
||||
|
||||
# 如果没有找到匹配的配置,返回默认频率
|
||||
if not self.talk_frequency_adjust:
|
||||
return self.talk_frequency
|
||||
|
||||
# 优先检查聊天流特定的配置
|
||||
if chat_stream_id:
|
||||
stream_frequency = self._get_stream_specific_frequency(chat_stream_id)
|
||||
if stream_frequency is not None:
|
||||
return stream_frequency
|
||||
|
||||
# 检查全局时段配置(第一个元素为空字符串的配置)
|
||||
global_frequency = self._get_global_frequency()
|
||||
if global_frequency is not None:
|
||||
return global_frequency
|
||||
|
||||
# 如果都没有匹配,返回默认值
|
||||
return self.talk_frequency
|
||||
|
||||
def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]:
|
||||
"""
|
||||
根据时间配置列表获取当前时段的频率
|
||||
|
||||
Args:
|
||||
time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...]
|
||||
|
||||
Returns:
|
||||
float: 频率值,如果没有配置则返回 None
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
current_time = datetime.now().strftime("%H:%M")
|
||||
current_hour, current_minute = map(int, current_time.split(":"))
|
||||
current_minutes = current_hour * 60 + current_minute
|
||||
|
||||
# 解析时间频率配置
|
||||
time_freq_pairs = []
|
||||
for time_freq_str in time_freq_list:
|
||||
try:
|
||||
time_str, freq_str = time_freq_str.split(",")
|
||||
hour, minute = map(int, time_str.split(":"))
|
||||
frequency = float(freq_str)
|
||||
minutes = hour * 60 + minute
|
||||
time_freq_pairs.append((minutes, frequency))
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
|
||||
if not time_freq_pairs:
|
||||
return None
|
||||
|
||||
# 按时间排序
|
||||
time_freq_pairs.sort(key=lambda x: x[0])
|
||||
|
||||
# 查找当前时间对应的频率
|
||||
current_frequency = None
|
||||
for minutes, frequency in time_freq_pairs:
|
||||
if current_minutes >= minutes:
|
||||
current_frequency = frequency
|
||||
else:
|
||||
break
|
||||
|
||||
# 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑)
|
||||
if current_frequency is None and time_freq_pairs:
|
||||
current_frequency = time_freq_pairs[-1][1]
|
||||
|
||||
return current_frequency
|
||||
|
||||
def _get_stream_specific_frequency(self, chat_stream_id: str):
|
||||
"""
|
||||
获取特定聊天流在当前时间的频率
|
||||
|
||||
Args:
|
||||
chat_stream_id: 聊天流ID(哈希值)
|
||||
|
||||
Returns:
|
||||
float: 频率值,如果没有配置则返回 None
|
||||
"""
|
||||
# 查找匹配的聊天流配置
|
||||
for config_item in self.talk_frequency_adjust:
|
||||
if not config_item or len(config_item) < 2:
|
||||
continue
|
||||
|
||||
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
|
||||
|
||||
# 解析配置字符串并生成对应的 chat_id
|
||||
config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str)
|
||||
if config_chat_id is None:
|
||||
continue
|
||||
|
||||
# 比较生成的 chat_id
|
||||
if config_chat_id != chat_stream_id:
|
||||
continue
|
||||
|
||||
# 使用通用的时间频率解析方法
|
||||
return self._get_time_based_frequency(config_item[1:])
|
||||
|
||||
return None
|
||||
|
||||
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
|
||||
"""
|
||||
解析流配置字符串并生成对应的 chat_id
|
||||
|
||||
Args:
|
||||
stream_config_str: 格式为 "platform:id:type" 的字符串
|
||||
|
||||
Returns:
|
||||
str: 生成的 chat_id,如果解析失败则返回 None
|
||||
"""
|
||||
try:
|
||||
parts = stream_config_str.split(":")
|
||||
if len(parts) != 3:
|
||||
return None
|
||||
|
||||
platform = parts[0]
|
||||
id_str = parts[1]
|
||||
stream_type = parts[2]
|
||||
|
||||
# 判断是否为群聊
|
||||
is_group = stream_type == "group"
|
||||
|
||||
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
|
||||
import hashlib
|
||||
|
||||
if is_group:
|
||||
components = [platform, str(id_str)]
|
||||
else:
|
||||
components = [platform, str(id_str), "private"]
|
||||
key = "_".join(components)
|
||||
return hashlib.md5(key.encode()).hexdigest()
|
||||
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
def _get_global_frequency(self) -> Optional[float]:
|
||||
"""
|
||||
获取全局默认频率配置
|
||||
|
||||
Returns:
|
||||
float: 频率值,如果没有配置则返回 None
|
||||
"""
|
||||
for config_item in self.talk_frequency_adjust:
|
||||
if not config_item or len(config_item) < 2:
|
||||
continue
|
||||
|
||||
# 检查是否为全局默认配置(第一个元素为空字符串)
|
||||
if config_item[0] == "":
|
||||
return self._get_time_based_frequency(config_item[1:])
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
class MessageReceiveConfig(ValidatedConfigBase):
|
||||
@@ -160,49 +264,131 @@ class ExpressionConfig(ValidatedConfigBase):
|
||||
expression_learning: list[list] = Field(default_factory=lambda: [], description="表达学习")
|
||||
expression_groups: list[list[str]] = Field(default_factory=list, description="表达组")
|
||||
|
||||
def get_expression_config_for_chat(self, chat_id: str) -> tuple[bool, bool, float]:
|
||||
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
|
||||
"""
|
||||
获取指定聊天流的表达配置
|
||||
|
||||
解析流配置字符串并生成对应的 chat_id
|
||||
|
||||
Args:
|
||||
chat_id: 聊天流ID
|
||||
|
||||
stream_config_str: 格式为 "platform:id:type" 的字符串
|
||||
|
||||
Returns:
|
||||
tuple[bool, bool, float]: (use_expression, enable_learning, learning_intensity)
|
||||
str: 生成的 chat_id,如果解析失败则返回 None
|
||||
"""
|
||||
# 默认值
|
||||
use_expression = False
|
||||
enable_learning = False
|
||||
learning_intensity = 1.0
|
||||
|
||||
# 查找匹配的表达学习配置
|
||||
for config_entry in self.expression_learning:
|
||||
if not config_entry or len(config_entry) < 4:
|
||||
try:
|
||||
parts = stream_config_str.split(":")
|
||||
if len(parts) != 3:
|
||||
return None
|
||||
|
||||
platform = parts[0]
|
||||
id_str = parts[1]
|
||||
stream_type = parts[2]
|
||||
|
||||
# 判断是否为群聊
|
||||
is_group = stream_type == "group"
|
||||
|
||||
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
|
||||
import hashlib
|
||||
|
||||
if is_group:
|
||||
components = [platform, str(id_str)]
|
||||
else:
|
||||
components = [platform, str(id_str), "private"]
|
||||
key = "_".join(components)
|
||||
return hashlib.md5(key.encode()).hexdigest()
|
||||
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]:
|
||||
"""
|
||||
根据聊天流ID获取表达配置
|
||||
|
||||
Args:
|
||||
chat_stream_id: 聊天流ID,格式为哈希值
|
||||
|
||||
Returns:
|
||||
tuple: (是否使用表达, 是否学习表达, 学习间隔)
|
||||
"""
|
||||
if not self.expression_learning:
|
||||
# 如果没有配置,使用默认值:启用表达,启用学习,300秒间隔
|
||||
return True, True, 300
|
||||
|
||||
# 优先检查聊天流特定的配置
|
||||
if chat_stream_id:
|
||||
specific_config = self._get_stream_specific_config(chat_stream_id)
|
||||
if specific_config is not None:
|
||||
return specific_config
|
||||
|
||||
# 检查全局配置(第一个元素为空字符串的配置)
|
||||
global_config = self._get_global_config()
|
||||
if global_config is not None:
|
||||
return global_config
|
||||
|
||||
# 如果都没有匹配,返回默认值
|
||||
return True, True, 300
|
||||
|
||||
def _get_stream_specific_config(self, chat_stream_id: str) -> Optional[tuple[bool, bool, float]]:
|
||||
"""
|
||||
获取特定聊天流的表达配置
|
||||
|
||||
Args:
|
||||
chat_stream_id: 聊天流ID(哈希值)
|
||||
|
||||
Returns:
|
||||
tuple: (是否使用表达, 是否学习表达, 学习间隔),如果没有配置则返回 None
|
||||
"""
|
||||
for config_item in self.expression_learning:
|
||||
if not config_item or len(config_item) < 4:
|
||||
continue
|
||||
|
||||
# 配置格式: [chat_pattern, use_expression, enable_learning, learning_intensity]
|
||||
chat_pattern = config_entry[0]
|
||||
|
||||
# 检查是否匹配当前聊天流
|
||||
if chat_pattern == "" or chat_pattern in chat_id:
|
||||
|
||||
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
|
||||
|
||||
# 如果是空字符串,跳过(这是全局配置)
|
||||
if stream_config_str == "":
|
||||
continue
|
||||
|
||||
# 解析配置字符串并生成对应的 chat_id
|
||||
config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str)
|
||||
if config_chat_id is None:
|
||||
continue
|
||||
|
||||
# 比较生成的 chat_id
|
||||
if config_chat_id != chat_stream_id:
|
||||
continue
|
||||
|
||||
# 解析配置
|
||||
try:
|
||||
use_expression = config_item[1].lower() == "enable"
|
||||
enable_learning = config_item[2].lower() == "enable"
|
||||
learning_intensity = float(config_item[3])
|
||||
return use_expression, enable_learning, learning_intensity
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
def _get_global_config(self) -> Optional[tuple[bool, bool, float]]:
|
||||
"""
|
||||
获取全局表达配置
|
||||
|
||||
Returns:
|
||||
tuple: (是否使用表达, 是否学习表达, 学习间隔),如果没有配置则返回 None
|
||||
"""
|
||||
for config_item in self.expression_learning:
|
||||
if not config_item or len(config_item) < 4:
|
||||
continue
|
||||
|
||||
# 检查是否为全局配置(第一个元素为空字符串)
|
||||
if config_item[0] == "":
|
||||
try:
|
||||
# 解析配置值
|
||||
use_expr_str = config_entry[1].lower() if isinstance(config_entry[1], str) else str(config_entry[1])
|
||||
enable_learn_str = config_entry[2].lower() if isinstance(config_entry[2], str) else str(config_entry[2])
|
||||
|
||||
use_expression = use_expr_str in ['enable', 'true', '1']
|
||||
enable_learning = enable_learn_str in ['enable', 'true', '1']
|
||||
learning_intensity = float(config_entry[3])
|
||||
|
||||
# 找到匹配的配置后返回
|
||||
use_expression = config_item[1].lower() == "enable"
|
||||
enable_learning = config_item[2].lower() == "enable"
|
||||
learning_intensity = float(config_item[3])
|
||||
return use_expression, enable_learning, learning_intensity
|
||||
|
||||
except (ValueError, IndexError, TypeError):
|
||||
# 如果解析失败,继续查找下一个配置
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
|
||||
# 如果没有找到匹配的配置,返回默认值
|
||||
return use_expression, enable_learning, learning_intensity
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user