diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 2ef5cbd79..81ee5cacd 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -83,60 +83,164 @@ class ChatConfig(ValidatedConfigBase): proactive_thinking_interval: int = Field(default=1500, description="主动思考间隔") proactive_thinking_prompt_template: str = Field(default="", description="主动思考提示模板") - def get_current_talk_frequency(self, stream_id: str) -> float: + def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float: """ - 根据时间和聊天流ID获取当前的聊天频率 - + 根据当前时间和聊天流获取对应的 talk_frequency + Args: - stream_id: 聊天流ID - + chat_stream_id: 聊天流ID,格式为 "platform:chat_id:type" + Returns: - float: 当前聊天频率 + float: 对应的频率值 """ - import time - from datetime import datetime - - # 获取当前时间 - current_time = datetime.now() - current_hour = current_time.hour - current_minute = current_time.minute - current_time_str = f"{current_hour:02d}:{current_minute:02d}" - - # 查找匹配的聊天频率调整配置 - for config_entry in self.talk_frequency_adjust: - if not config_entry: - continue - - # 第一个元素是聊天流匹配模式 - stream_pattern = config_entry[0] - - # 检查是否匹配当前聊天流 - if stream_pattern == "" or stream_pattern in stream_id: - # 查找当前时间对应的频率 - current_frequency = self.talk_frequency - - # 遍历时间-频率对 - for time_freq_pair in config_entry[1:]: - if "," not in time_freq_pair: - continue - - time_part, freq_part = time_freq_pair.split(",", 1) - try: - config_hour, config_minute = map(int, time_part.split(":")) - config_time_minutes = config_hour * 60 + config_minute - current_time_minutes = current_hour * 60 + current_minute - - # 如果当前时间大于等于配置时间,更新频率 - if current_time_minutes >= config_time_minutes: - current_frequency = float(freq_part) - except (ValueError, IndexError): - continue - - return current_frequency - - # 如果没有找到匹配的配置,返回默认频率 + if not self.talk_frequency_adjust: + return self.talk_frequency + + # 优先检查聊天流特定的配置 + if chat_stream_id: + stream_frequency = self._get_stream_specific_frequency(chat_stream_id) + if stream_frequency is not None: + return stream_frequency + + # 检查全局时段配置(第一个元素为空字符串的配置) + global_frequency = self._get_global_frequency() + if global_frequency is not None: + return global_frequency + + # 如果都没有匹配,返回默认值 return self.talk_frequency + def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]: + """ + 根据时间配置列表获取当前时段的频率 + + Args: + time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...] + + Returns: + float: 频率值,如果没有配置则返回 None + """ + from datetime import datetime + + current_time = datetime.now().strftime("%H:%M") + current_hour, current_minute = map(int, current_time.split(":")) + current_minutes = current_hour * 60 + current_minute + + # 解析时间频率配置 + time_freq_pairs = [] + for time_freq_str in time_freq_list: + try: + time_str, freq_str = time_freq_str.split(",") + hour, minute = map(int, time_str.split(":")) + frequency = float(freq_str) + minutes = hour * 60 + minute + time_freq_pairs.append((minutes, frequency)) + except (ValueError, IndexError): + continue + + if not time_freq_pairs: + return None + + # 按时间排序 + time_freq_pairs.sort(key=lambda x: x[0]) + + # 查找当前时间对应的频率 + current_frequency = None + for minutes, frequency in time_freq_pairs: + if current_minutes >= minutes: + current_frequency = frequency + else: + break + + # 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑) + if current_frequency is None and time_freq_pairs: + current_frequency = time_freq_pairs[-1][1] + + return current_frequency + + def _get_stream_specific_frequency(self, chat_stream_id: str): + """ + 获取特定聊天流在当前时间的频率 + + Args: + chat_stream_id: 聊天流ID(哈希值) + + Returns: + float: 频率值,如果没有配置则返回 None + """ + # 查找匹配的聊天流配置 + for config_item in self.talk_frequency_adjust: + if not config_item or len(config_item) < 2: + continue + + stream_config_str = config_item[0] # 例如 "qq:1026294844:group" + + # 解析配置字符串并生成对应的 chat_id + config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str) + if config_chat_id is None: + continue + + # 比较生成的 chat_id + if config_chat_id != chat_stream_id: + continue + + # 使用通用的时间频率解析方法 + return self._get_time_based_frequency(config_item[1:]) + + return None + + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: + """ + 解析流配置字符串并生成对应的 chat_id + + Args: + stream_config_str: 格式为 "platform:id:type" 的字符串 + + Returns: + str: 生成的 chat_id,如果解析失败则返回 None + """ + try: + parts = stream_config_str.split(":") + if len(parts) != 3: + return None + + platform = parts[0] + id_str = parts[1] + stream_type = parts[2] + + # 判断是否为群聊 + is_group = stream_type == "group" + + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id + import hashlib + + if is_group: + components = [platform, str(id_str)] + else: + components = [platform, str(id_str), "private"] + key = "_".join(components) + return hashlib.md5(key.encode()).hexdigest() + + except (ValueError, IndexError): + return None + + def _get_global_frequency(self) -> Optional[float]: + """ + 获取全局默认频率配置 + + Returns: + float: 频率值,如果没有配置则返回 None + """ + for config_item in self.talk_frequency_adjust: + if not config_item or len(config_item) < 2: + continue + + # 检查是否为全局默认配置(第一个元素为空字符串) + if config_item[0] == "": + return self._get_time_based_frequency(config_item[1:]) + + return None + class MessageReceiveConfig(ValidatedConfigBase): @@ -160,49 +264,131 @@ class ExpressionConfig(ValidatedConfigBase): expression_learning: list[list] = Field(default_factory=lambda: [], description="表达学习") expression_groups: list[list[str]] = Field(default_factory=list, description="表达组") - def get_expression_config_for_chat(self, chat_id: str) -> tuple[bool, bool, float]: + def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]: """ - 获取指定聊天流的表达配置 - + 解析流配置字符串并生成对应的 chat_id + Args: - chat_id: 聊天流ID - + stream_config_str: 格式为 "platform:id:type" 的字符串 + Returns: - tuple[bool, bool, float]: (use_expression, enable_learning, learning_intensity) + str: 生成的 chat_id,如果解析失败则返回 None """ - # 默认值 - use_expression = False - enable_learning = False - learning_intensity = 1.0 - - # 查找匹配的表达学习配置 - for config_entry in self.expression_learning: - if not config_entry or len(config_entry) < 4: + try: + parts = stream_config_str.split(":") + if len(parts) != 3: + return None + + platform = parts[0] + id_str = parts[1] + stream_type = parts[2] + + # 判断是否为群聊 + is_group = stream_type == "group" + + # 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id + import hashlib + + if is_group: + components = [platform, str(id_str)] + else: + components = [platform, str(id_str), "private"] + key = "_".join(components) + return hashlib.md5(key.encode()).hexdigest() + + except (ValueError, IndexError): + return None + + def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]: + """ + 根据聊天流ID获取表达配置 + + Args: + chat_stream_id: 聊天流ID,格式为哈希值 + + Returns: + tuple: (是否使用表达, 是否学习表达, 学习间隔) + """ + if not self.expression_learning: + # 如果没有配置,使用默认值:启用表达,启用学习,300秒间隔 + return True, True, 300 + + # 优先检查聊天流特定的配置 + if chat_stream_id: + specific_config = self._get_stream_specific_config(chat_stream_id) + if specific_config is not None: + return specific_config + + # 检查全局配置(第一个元素为空字符串的配置) + global_config = self._get_global_config() + if global_config is not None: + return global_config + + # 如果都没有匹配,返回默认值 + return True, True, 300 + + def _get_stream_specific_config(self, chat_stream_id: str) -> Optional[tuple[bool, bool, float]]: + """ + 获取特定聊天流的表达配置 + + Args: + chat_stream_id: 聊天流ID(哈希值) + + Returns: + tuple: (是否使用表达, 是否学习表达, 学习间隔),如果没有配置则返回 None + """ + for config_item in self.expression_learning: + if not config_item or len(config_item) < 4: continue - - # 配置格式: [chat_pattern, use_expression, enable_learning, learning_intensity] - chat_pattern = config_entry[0] - - # 检查是否匹配当前聊天流 - if chat_pattern == "" or chat_pattern in chat_id: + + stream_config_str = config_item[0] # 例如 "qq:1026294844:group" + + # 如果是空字符串,跳过(这是全局配置) + if stream_config_str == "": + continue + + # 解析配置字符串并生成对应的 chat_id + config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str) + if config_chat_id is None: + continue + + # 比较生成的 chat_id + if config_chat_id != chat_stream_id: + continue + + # 解析配置 + try: + use_expression = config_item[1].lower() == "enable" + enable_learning = config_item[2].lower() == "enable" + learning_intensity = float(config_item[3]) + return use_expression, enable_learning, learning_intensity + except (ValueError, IndexError): + continue + + return None + + def _get_global_config(self) -> Optional[tuple[bool, bool, float]]: + """ + 获取全局表达配置 + + Returns: + tuple: (是否使用表达, 是否学习表达, 学习间隔),如果没有配置则返回 None + """ + for config_item in self.expression_learning: + if not config_item or len(config_item) < 4: + continue + + # 检查是否为全局配置(第一个元素为空字符串) + if config_item[0] == "": try: - # 解析配置值 - use_expr_str = config_entry[1].lower() if isinstance(config_entry[1], str) else str(config_entry[1]) - enable_learn_str = config_entry[2].lower() if isinstance(config_entry[2], str) else str(config_entry[2]) - - use_expression = use_expr_str in ['enable', 'true', '1'] - enable_learning = enable_learn_str in ['enable', 'true', '1'] - learning_intensity = float(config_entry[3]) - - # 找到匹配的配置后返回 + use_expression = config_item[1].lower() == "enable" + enable_learning = config_item[2].lower() == "enable" + learning_intensity = float(config_item[3]) return use_expression, enable_learning, learning_intensity - - except (ValueError, IndexError, TypeError): - # 如果解析失败,继续查找下一个配置 + except (ValueError, IndexError): continue - - # 如果没有找到匹配的配置,返回默认值 - return use_expression, enable_learning, learning_intensity + + return None