diff --git a/src/chat/chat_loop/heartFC_chat.py b/src/chat/chat_loop/heartFC_chat.py
index 35b5ec725..bfbde2ccc 100644
--- a/src/chat/chat_loop/heartFC_chat.py
+++ b/src/chat/chat_loop/heartFC_chat.py
@@ -154,10 +154,32 @@ class HeartFChatting:
 请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""",
         }
+
+        # 主动思考配置 - 支持新旧配置格式
         self.proactive_thinking_chat_scope = global_config.chat.The_scope_that_proactive_thinking_can_trigger
         if self.proactive_thinking_chat_scope not in self.VALID_PROACTIVE_SCOPES:
             logger.error(f"无效的主动思考范围: '{self.proactive_thinking_chat_scope}'。有效值为: {self.VALID_PROACTIVE_SCOPES}")
             raise ValueError(f"配置错误:无效的主动思考范围 '{self.proactive_thinking_chat_scope}'") #乱填参数是吧,我跟你爆了
+
+        # 新的配置项 - 分离的私聊/群聊控制
+        self.proactive_thinking_in_private = global_config.chat.proactive_thinking_in_private
+        self.proactive_thinking_in_group = global_config.chat.proactive_thinking_in_group
+
+        # ID列表控制(支持新旧两个字段)
+        self.proactive_thinking_ids = []
+        if hasattr(global_config.chat, 'enable_ids') and global_config.chat.enable_ids:
+            self.proactive_thinking_ids = global_config.chat.enable_ids
+        elif hasattr(global_config.chat, 'proactive_thinking_enable_ids') and global_config.chat.proactive_thinking_enable_ids:
+            self.proactive_thinking_ids = global_config.chat.proactive_thinking_enable_ids
+
+        # 正态分布时间间隔配置
+        self.delta_sigma = getattr(global_config.chat, 'delta_sigma', 120)
+
+        # 打印主动思考配置信息
+        logger.info(f"{self.log_prefix} 主动思考配置: 启用={global_config.chat.enable_proactive_thinking}, "
+                    f"旧范围={self.proactive_thinking_chat_scope}, 私聊={self.proactive_thinking_in_private}, "
+                    f"群聊={self.proactive_thinking_in_group}, ID列表={self.proactive_thinking_ids}, "
+                    f"基础间隔={global_config.chat.proactive_thinking_interval}s, Delta={self.delta_sigma}")
 
     async def start(self):
         """检查是否需要启动主循环,如果未激活则启动。"""
@@ -288,21 +310,24 @@ class HeartFChatting:
     async def _proactive_thinking_loop(self):
         """主动思考循环,仅在focus模式下生效"""
         while self.running:
-            await asyncio.sleep(30)  # 每30秒检查一次
+            await asyncio.sleep(15)  # 每15秒检查一次
 
             # 只在focus模式下进行主动思考
             if self.loop_mode != ChatMode.FOCUS:
                 continue
-            if self.proactive_thinking_chat_scope == "group" and self.chat_stream.group_info is None:
-                continue
-            if self.proactive_thinking_chat_scope == "private" and self.chat_stream.group_info is not None:
+
+            # 检查是否应该在当前聊天类型中启用主动思考
+            if not self._should_enable_proactive_thinking():
                 continue
 
             current_time = time.time()
             silence_duration = current_time - self.last_message_time
 
+            # 使用正态分布计算动态间隔时间
+            target_interval = self._get_dynamic_thinking_interval()
+
             # 检查是否达到主动思考的时间间隔
-            if silence_duration >= global_config.chat.proactive_thinking_interval:
+            if silence_duration >= target_interval:
                 try:
                     await self._execute_proactive_thinking(silence_duration)
                     # 重置计时器,避免频繁触发
@@ -310,6 +335,125 @@ class HeartFChatting:
                 except Exception as e:
                     logger.error(f"{self.log_prefix} 主动思考执行出错: {e}")
                     logger.error(traceback.format_exc())
+
+    def _should_enable_proactive_thinking(self) -> bool:
+        """检查是否应该在当前聊天中启用主动思考"""
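+        # 判定优先级:ID白名单(enable_ids / proactive_thinking_enable_ids)> 私聊/群聊开关 > 旧的scope范围配置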
+        # 获取当前聊天ID
+        chat_id = None
+        if hasattr(self.chat_stream, 'chat_id'):
+            chat_id = int(self.chat_stream.chat_id)
+
+        # 如果指定了ID列表,只在列表中的聊天启用
+        if self.proactive_thinking_ids:
+            if chat_id is None or chat_id not in self.proactive_thinking_ids:
+                return False
+
+        # 检查聊天类型(私聊/群聊)控制
+        is_group_chat = self.chat_stream.group_info is not None
+
+        if is_group_chat:
+            # 群聊:检查群聊启用开关
+            if not self.proactive_thinking_in_group:
+                return False
+        else:
+            # 私聊:检查私聊启用开关
+            if not self.proactive_thinking_in_private:
+                return False
+
+        # 兼容旧的范围配置
+        if self.proactive_thinking_chat_scope == "group" and not is_group_chat:
+            return False
+        if self.proactive_thinking_chat_scope == "private" and is_group_chat:
+            return False
+
+        return True
+
+    def _get_dynamic_thinking_interval(self) -> float:
+        """获取动态的主动思考间隔时间(使用正态分布和3-sigma规则)"""
+        try:
+            from src.utils.timing_utils import get_normal_distributed_interval
+
+            base_interval = global_config.chat.proactive_thinking_interval
+
+            # 🚨 保险机制:处理负数配置
+            if base_interval < 0:
+                logger.warning(f"{self.log_prefix} proactive_thinking_interval被配置为负数{base_interval},已使用绝对值{abs(base_interval)}")
+                base_interval = abs(base_interval)
+
+            if self.delta_sigma < 0:
+                logger.warning(f"{self.log_prefix} delta_sigma被配置为负数{self.delta_sigma},已使用绝对值{abs(self.delta_sigma)}")
+                delta_sigma = abs(self.delta_sigma)
+            else:
+                delta_sigma = self.delta_sigma
+
+            # 🚨 特殊情况处理
+            if base_interval == 0 and delta_sigma == 0:
+                logger.warning(f"{self.log_prefix} 基础间隔和Delta都为0,强制使用300秒安全间隔")
+                return 300
+            elif base_interval == 0:
+                # 基础间隔为0,但有delta_sigma,基于delta_sigma生成随机间隔
+                logger.info(f"{self.log_prefix} 基础间隔为0,使用纯随机模式,基于delta_sigma={delta_sigma}")
+                sigma_percentage = delta_sigma / 1000  # 假设1000秒作为虚拟基准
+                result = get_normal_distributed_interval(0, sigma_percentage, 1, 86400, use_3sigma_rule=True)
+                logger.debug(f"{self.log_prefix} 纯随机模式生成间隔: {result}秒")
+                return result
+            elif delta_sigma == 0:
+                # 禁用正态分布,使用固定间隔
+                logger.debug(f"{self.log_prefix} delta_sigma=0,禁用正态分布,使用固定间隔{base_interval}秒")
+                return base_interval
+
+            # 正常情况:使用3-sigma规则的正态分布
+            sigma_percentage = delta_sigma / base_interval
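+            # 例:默认 base_interval=1500、delta_sigma=120 时,σ=120,
+            # 99.7%的间隔会落在 1500±360,即 [1140, 1860] 秒内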
+
+            # 3-sigma边界计算
+            sigma = delta_sigma
+            three_sigma_range = 3 * sigma
+            theoretical_min = max(1, base_interval - three_sigma_range)
+            theoretical_max = base_interval + three_sigma_range
+
+            logger.debug(f"{self.log_prefix} 3-sigma分布: 基础={base_interval}s, σ={sigma}s, "
+                         f"理论范围=[{theoretical_min:.0f}, {theoretical_max:.0f}]s")
+
+            # 给用户最大自由度:使用3-sigma规则但不强制限制范围
+            result = get_normal_distributed_interval(
+                base_interval,
+                sigma_percentage,
+                1,      # 最小1秒
+                86400,  # 最大24小时
+                use_3sigma_rule=True
+            )
+
+            return result
+
+        except ImportError:
+            # 如果timing_utils不可用,回退到固定间隔
+            logger.warning(f"{self.log_prefix} timing_utils不可用,使用固定间隔")
+            return max(300, abs(global_config.chat.proactive_thinking_interval))
+        except Exception as e:
+            # 如果计算出错,回退到固定间隔
+            logger.error(f"{self.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
+            return max(300, abs(global_config.chat.proactive_thinking_interval))
+
+    def _generate_random_interval_from_sigma(self, sigma: float) -> float:
+        """基于sigma值生成纯随机间隔(当基础间隔为0时使用;备用实现,当前未被主流程调用)"""
+        try:
+            import numpy as np
+
+            # 使用sigma作为标准差,0作为均值生成正态分布
+            interval = abs(np.random.normal(loc=0, scale=sigma))
+
+            # 确保最小值
+            interval = max(interval, 30)  # 最小30秒
+
+            # 限制最大值防止过度极端
+            interval = min(interval, 86400)  # 最大24小时
+
+            logger.debug(f"{self.log_prefix} 纯随机模式生成间隔: {int(interval)}秒")
+            return int(interval)
+
+        except Exception as e:
+            logger.error(f"{self.log_prefix} 纯随机间隔生成失败: {e}")
+            return 300  # 回退到5分钟
 
     def _format_duration(self, seconds: float) -> str:
         """格式化时间间隔为易读格式"""
@@ -333,10 +477,14 @@ class HeartFChatting:
         logger.info(f"{self.log_prefix} 触发主动思考,已沉默{formatted_time}")
 
         try:
-            # 根据聊天类型选择prompt
-            chat_type = "group" if self.chat_stream.group_info else "private"
-            prompt_template = self.proactive_thinking_prompts.get(chat_type, self.proactive_thinking_prompts["group"])
-            proactive_prompt = prompt_template.format(time=formatted_time)
+            # 优先使用配置文件中的prompt模板,如果没有则使用内置模板
+            if hasattr(global_config.chat, 'proactive_thinking_prompt_template') and global_config.chat.proactive_thinking_prompt_template.strip():
+                proactive_prompt = global_config.chat.proactive_thinking_prompt_template.format(time=formatted_time)
+            else:
+                # 回退到内置的prompt模板
+                chat_type = "group" if self.chat_stream.group_info else "private"
+                prompt_template = self.proactive_thinking_prompts.get(chat_type, self.proactive_thinking_prompts["group"])
+                proactive_prompt = prompt_template.format(time=formatted_time)
 
             # 创建一个虚拟的消息数据用于主动思考
             thinking_message = {
diff --git a/src/config/official_configs.py b/src/config/official_configs.py
index e734bb08d..2530e40db 100644
--- a/src/config/official_configs.py
+++ b/src/config/official_configs.py
@@ -82,6 +82,17 @@ class ChatConfig(ValidatedConfigBase):
     enable_proactive_thinking: bool = Field(default=False, description="启用主动思考")
     proactive_thinking_interval: int = Field(default=1500, description="主动思考间隔")
     The_scope_that_proactive_thinking_can_trigger: str = Field(default="all", description="主动思考可以触发的范围")
+    proactive_thinking_in_private: bool = Field(default=True, description="主动思考可以在私聊里面启用")
+    proactive_thinking_in_group: bool = Field(default=True, description="主动思考可以在群聊里面启用")
+    proactive_thinking_enable_ids: List[int] = Field(default_factory=list, description="启用主动思考的范围,不区分群聊和私聊,为空则不限制")
+    delta_sigma: int = Field(default=120, description="正态分布时间间隔的标准差(秒),为0时禁用随机化")
+    enable_ids: List[int] = Field(default_factory=list, description="启用主动思考的ID列表,与proactive_thinking_enable_ids等价且优先生效,为空则不限制")
+    proactive_thinking_prompt_template: str = Field(default="""现在群里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择:
+
+1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时)
+2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时)
+
+请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""", description="主动思考提示模板")
 
     def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
         """
diff --git a/src/llm_models/model_client/gemini_client.py b/src/llm_models/model_client/gemini_client.py
new file mode 100644
index 000000000..9bda858ef
--- /dev/null
+++ b/src/llm_models/model_client/gemini_client.py
@@ -0,0 +1,603 @@
+import asyncio
+import io
+import base64
+from typing import Callable, AsyncIterator, Optional, Coroutine, Any, List, Dict
+
+import google.generativeai as genai
+from google.generativeai.types import (
+    GenerateContentResponse,
+    HarmCategory,
+    HarmBlockThreshold,
+)
+
+try:
+    # 尝试从较新的API导入
+    from google.generativeai import configure
+    from google.generativeai.types import SafetySetting, GenerationConfig
+except ImportError:
+    # 回退到基本类型(使用内置dict,typing.Dict不能被实例化)
+    SafetySetting = dict
+    GenerationConfig = dict
+
+# 定义兼容性类型(同样使用可实例化的内置dict)
+ContentDict = dict
+PartDict = dict
+ToolDict = dict
+FunctionDeclaration = dict
+Tool = dict
+ContentListUnion = List[Dict]
+ContentUnion = dict
+Content = dict
+Part = dict
+ThinkingConfig = dict
+GenerateContentConfig = dict
+EmbedContentConfig = dict
+EmbedContentResponse = dict
+
+# 定义异常类型(带status code,供下文统一转换为RespNotOkException)
+class ClientError(Exception):
+    def __init__(self, code: int = 400, *args: Any):
+        super().__init__(*args)
+        self.code = code
+
+class ServerError(Exception):
+    def __init__(self, code: int = 500, *args: Any):
+        super().__init__(*args)
+        self.code = code
+
+class UnknownFunctionCallArgumentError(Exception):
+    pass
+
+class UnsupportedFunctionError(Exception):
+    pass
+
+class FunctionInvocationError(Exception):
+    pass
+
+from src.config.api_ada_configs import ModelInfo, APIProvider
+from src.common.logger import get_logger
+
+from .base_client import APIResponse, UsageRecord, BaseClient, client_registry
+from ..exceptions import (
+    RespParseException,
+    NetworkConnectionError,
+    RespNotOkException,
+    ReqAbortException,
+)
+from ..payload_content.message import Message, RoleType
+from ..payload_content.resp_format import RespFormat, RespFormatType
+from ..payload_content.tool_option import ToolOption, ToolParam, ToolCall
+
+logger = get_logger("Gemini客户端")
+
+SAFETY_SETTINGS = [
+    {"category": HarmCategory.HARM_CATEGORY_HATE_SPEECH, "threshold": HarmBlockThreshold.BLOCK_NONE},
+    {"category": HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, "threshold": HarmBlockThreshold.BLOCK_NONE},
+    {"category": HarmCategory.HARM_CATEGORY_HARASSMENT, "threshold": HarmBlockThreshold.BLOCK_NONE},
+    {"category": HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, "threshold": HarmBlockThreshold.BLOCK_NONE},
+]
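+# 注:上述安全设置将各危害类别的拦截阈值设为BLOCK_NONE,
+# 用于避免正常聊天内容被安全过滤误拦截而产生空回复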
+
+
+def _convert_messages(
+    messages: list[Message],
+) -> tuple[List[Dict], list[str] | None]:
+    """
+    转换消息格式 - 将消息转换为Gemini API所需的格式
+    :param messages: 消息列表
+    :return: 转换后的消息列表(和可能存在的system消息)
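+    示例:[Message(role=RoleType.User, content="你好")] -> ([{"role": "user", "parts": [{"text": "你好"}]}], None)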
"type": "object", + "properties": {param.name: _convert_tool_param(param) for param in tool_option.params}, + "required": [param.name for param in tool_option.params if param.required], + } + ret1 = FunctionDeclaration(**ret) + return ret1 + + return [_convert_tool_option_item(tool_option) for tool_option in tool_options] + + +def _process_delta( + delta: GenerateContentResponse, + fc_delta_buffer: io.StringIO, + tool_calls_buffer: list[tuple[str, str, dict[str, Any]]], +): + if not hasattr(delta, "candidates") or not delta.candidates: + raise RespParseException(delta, "响应解析失败,缺失candidates字段") + + if delta.text: + fc_delta_buffer.write(delta.text) + + if delta.function_calls: # 为什么不用hasattr呢,是因为这个属性一定有,即使是个空的 + for call in delta.function_calls: + try: + if not isinstance(call.args, dict): # gemini返回的function call参数就是dict格式的了 + raise RespParseException(delta, "响应解析失败,工具调用参数无法解析为字典类型") + if not call.id or not call.name: + raise RespParseException(delta, "响应解析失败,工具调用缺失id或name字段") + tool_calls_buffer.append( + ( + call.id, + call.name, + call.args or {}, # 如果args是None,则转换为一个空字典 + ) + ) + except Exception as e: + raise RespParseException(delta, "响应解析失败,无法解析工具调用参数") from e + + +def _build_stream_api_resp( + _fc_delta_buffer: io.StringIO, + _tool_calls_buffer: list[tuple[str, str, dict]], +) -> APIResponse: + # sourcery skip: simplify-len-comparison, use-assigned-variable + resp = APIResponse() + + if _fc_delta_buffer.tell() > 0: + # 如果正式内容缓冲区不为空,则将其写入APIResponse对象 + resp.content = _fc_delta_buffer.getvalue() + _fc_delta_buffer.close() + if len(_tool_calls_buffer) > 0: + # 如果工具调用缓冲区不为空,则将其解析为ToolCall对象列表 + resp.tool_calls = [] + for call_id, function_name, arguments_buffer in _tool_calls_buffer: + if arguments_buffer is not None: + arguments = arguments_buffer + if not isinstance(arguments, dict): + raise RespParseException( + None, + f"响应解析失败,工具调用参数无法解析为字典类型。工具调用参数原始响应:\n{arguments_buffer}", + ) + else: + arguments = None + + resp.tool_calls.append(ToolCall(call_id, function_name, arguments)) + + return resp + + +async def _default_stream_response_handler( + resp_stream: AsyncIterator[GenerateContentResponse], + interrupt_flag: asyncio.Event | None, +) -> tuple[APIResponse, Optional[tuple[int, int, int]]]: + """ + 流式响应处理函数 - 处理Gemini API的流式响应 + :param resp_stream: 流式响应对象,是一个神秘的iterator,我完全不知道这个玩意能不能跑,不过遍历一遍之后它就空了,如果跑不了一点的话可以考虑改成别的东西 + :return: APIResponse对象 + """ + _fc_delta_buffer = io.StringIO() # 正式内容缓冲区,用于存储接收到的正式内容 + _tool_calls_buffer: list[tuple[str, str, dict]] = [] # 工具调用缓冲区,用于存储接收到的工具调用 + _usage_record = None # 使用情况记录 + + def _insure_buffer_closed(): + if _fc_delta_buffer and not _fc_delta_buffer.closed: + _fc_delta_buffer.close() + + async for chunk in resp_stream: + # 检查是否有中断量 + if interrupt_flag and interrupt_flag.is_set(): + # 如果中断量被设置,则抛出ReqAbortException + raise ReqAbortException("请求被外部信号中断") + + _process_delta( + chunk, + _fc_delta_buffer, + _tool_calls_buffer, + ) + + if chunk.usage_metadata: + # 如果有使用情况,则将其存储在APIResponse对象中 + _usage_record = ( + chunk.usage_metadata.prompt_token_count or 0, + (chunk.usage_metadata.candidates_token_count or 0) + (chunk.usage_metadata.thoughts_token_count or 0), + chunk.usage_metadata.total_token_count or 0, + ) + try: + return _build_stream_api_resp( + _fc_delta_buffer, + _tool_calls_buffer, + ), _usage_record + except Exception: + # 确保缓冲区被关闭 + _insure_buffer_closed() + raise + + +def _default_normal_response_parser( + resp: GenerateContentResponse, +) -> tuple[APIResponse, Optional[tuple[int, int, int]]]: + """ + 解析对话补全响应 - 将Gemini 
+    if not hasattr(delta, "candidates") or not delta.candidates:
+        raise RespParseException(delta, "响应解析失败,缺失candidates字段")
+
+    if delta.text:
+        fc_delta_buffer.write(delta.text)
+
+    if delta.function_calls:  # 为什么不用hasattr呢,是因为这个属性一定有,即使是个空的
+        for call in delta.function_calls:
+            try:
+                if not isinstance(call.args, dict):  # gemini返回的function call参数就是dict格式的了
+                    raise RespParseException(delta, "响应解析失败,工具调用参数无法解析为字典类型")
+                if not call.id or not call.name:
+                    raise RespParseException(delta, "响应解析失败,工具调用缺失id或name字段")
+                tool_calls_buffer.append(
+                    (
+                        call.id,
+                        call.name,
+                        call.args or {},  # 如果args是None,则转换为一个空字典
+                    )
+                )
+            except Exception as e:
+                raise RespParseException(delta, "响应解析失败,无法解析工具调用参数") from e
+
+
+def _build_stream_api_resp(
+    _fc_delta_buffer: io.StringIO,
+    _tool_calls_buffer: list[tuple[str, str, dict]],
+) -> APIResponse:
+    # sourcery skip: simplify-len-comparison, use-assigned-variable
+    resp = APIResponse()
+
+    if _fc_delta_buffer.tell() > 0:
+        # 如果正式内容缓冲区不为空,则将其写入APIResponse对象
+        resp.content = _fc_delta_buffer.getvalue()
+        _fc_delta_buffer.close()
+    if len(_tool_calls_buffer) > 0:
+        # 如果工具调用缓冲区不为空,则将其解析为ToolCall对象列表
+        resp.tool_calls = []
+        for call_id, function_name, arguments_buffer in _tool_calls_buffer:
+            if arguments_buffer is not None:
+                arguments = arguments_buffer
+                if not isinstance(arguments, dict):
+                    raise RespParseException(
+                        None,
+                        f"响应解析失败,工具调用参数无法解析为字典类型。工具调用参数原始响应:\n{arguments_buffer}",
+                    )
+            else:
+                arguments = None
+
+            resp.tool_calls.append(ToolCall(call_id, function_name, arguments))
+
+    return resp
+
+
+async def _default_stream_response_handler(
+    resp_stream: AsyncIterator[GenerateContentResponse],
+    interrupt_flag: asyncio.Event | None,
+) -> tuple[APIResponse, Optional[tuple[int, int, int]]]:
+    """
+    流式响应处理函数 - 处理Gemini API的流式响应
+    :param resp_stream: 流式响应异步迭代器(注意:只能被完整遍历一次)
+    :return: APIResponse对象
+    """
+    _fc_delta_buffer = io.StringIO()  # 正式内容缓冲区,用于存储接收到的正式内容
+    _tool_calls_buffer: list[tuple[str, str, dict]] = []  # 工具调用缓冲区,用于存储接收到的工具调用
+    _usage_record = None  # 使用情况记录
+
+    def _ensure_buffer_closed():
+        if _fc_delta_buffer and not _fc_delta_buffer.closed:
+            _fc_delta_buffer.close()
+
+    async for chunk in resp_stream:
+        # 检查是否有中断量
+        if interrupt_flag and interrupt_flag.is_set():
+            # 如果中断量被设置,则抛出ReqAbortException
+            raise ReqAbortException("请求被外部信号中断")
+
+        _process_delta(
+            chunk,
+            _fc_delta_buffer,
+            _tool_calls_buffer,
+        )
+
+        if chunk.usage_metadata:
+            # 如果有使用情况,则将其存储在APIResponse对象中
+            _usage_record = (
+                chunk.usage_metadata.prompt_token_count or 0,
+                (chunk.usage_metadata.candidates_token_count or 0) + (chunk.usage_metadata.thoughts_token_count or 0),
+                chunk.usage_metadata.total_token_count or 0,
+            )
+    try:
+        return _build_stream_api_resp(
+            _fc_delta_buffer,
+            _tool_calls_buffer,
+        ), _usage_record
+    except Exception:
+        # 确保缓冲区被关闭
+        _ensure_buffer_closed()
+        raise
+
+
+def _default_normal_response_parser(
+    resp: GenerateContentResponse,
+) -> tuple[APIResponse, Optional[tuple[int, int, int]]]:
+    """
+    解析对话补全响应 - 将Gemini API响应解析为APIResponse对象
+    :param resp: 响应对象
+    :return: APIResponse对象
+    """
+    api_response = APIResponse()
+
+    if not hasattr(resp, "candidates") or not resp.candidates:
+        raise RespParseException(resp, "响应解析失败,缺失candidates字段")
+    try:
+        if resp.candidates[0].content and resp.candidates[0].content.parts:
+            for part in resp.candidates[0].content.parts:
+                if not part.text:
+                    continue
+                if part.thought:
+                    api_response.reasoning_content = (
+                        api_response.reasoning_content + part.text if api_response.reasoning_content else part.text
+                    )
+    except Exception as e:
+        logger.warning(f"解析思考内容时发生错误: {e},跳过解析")
+
+    if resp.text:
+        api_response.content = resp.text
+
+    if resp.function_calls:
+        api_response.tool_calls = []
+        for call in resp.function_calls:
+            try:
+                if not isinstance(call.args, dict):
+                    raise RespParseException(resp, "响应解析失败,工具调用参数无法解析为字典类型")
+                if not call.name:
+                    raise RespParseException(resp, "响应解析失败,工具调用缺失name字段")
+                api_response.tool_calls.append(ToolCall(call.id or "gemini-tool_call", call.name, call.args or {}))
+            except Exception as e:
+                raise RespParseException(resp, "响应解析失败,无法解析工具调用参数") from e
+
+    if resp.usage_metadata:
+        _usage_record = (
+            resp.usage_metadata.prompt_token_count or 0,
+            (resp.usage_metadata.candidates_token_count or 0) + (resp.usage_metadata.thoughts_token_count or 0),
+            resp.usage_metadata.total_token_count or 0,
+        )
+    else:
+        _usage_record = None
+
+    api_response.raw_data = resp
+
+    return api_response, _usage_record
+
+
+@client_registry.register_client_class("gemini")
+class GeminiClient(BaseClient):
+    def __init__(self, api_provider: APIProvider):
+        super().__init__(api_provider)
+        # 配置 Google Generative AI
+        genai.configure(api_key=api_provider.api_key)
+
+    async def get_response(
+        self,
+        model_info: ModelInfo,
+        message_list: list[Message],
+        tool_options: list[ToolOption] | None = None,
+        max_tokens: int = 1024,
+        temperature: float = 0.4,
+        response_format: RespFormat | None = None,
+        stream_response_handler: Optional[
+            Callable[
+                [AsyncIterator[GenerateContentResponse], asyncio.Event | None],
+                Coroutine[Any, Any, tuple[APIResponse, Optional[tuple[int, int, int]]]],
+            ]
+        ] = None,
+        async_response_parser: Optional[
+            Callable[[GenerateContentResponse], tuple[APIResponse, Optional[tuple[int, int, int]]]]
+        ] = None,
+        interrupt_flag: asyncio.Event | None = None,
+        extra_params: dict[str, Any] | None = None,
+    ) -> APIResponse:
+        """
+        获取对话响应
+        Args:
+            model_info: 模型信息
+            message_list: 对话体
+            tool_options: 工具选项(可选,默认为None)
+            max_tokens: 最大token数(可选,默认为1024)
+            temperature: 温度(可选,默认为0.4)
+            response_format: 响应格式(默认为text/plain,如果是输入的JSON Schema则必须遵守OpenAPI3.0格式,理论上和openai是一样的,暂不支持其它响应格式输入)
+            stream_response_handler: 流式响应处理函数(可选,默认为default_stream_response_handler)
+            async_response_parser: 响应解析函数(可选,默认为default_response_parser)
+            interrupt_flag: 中断信号量(可选,默认为None)
+        Returns:
+            APIResponse对象,包含响应内容、推理内容、工具调用等信息
+        """
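+        # 用法示意(假设调用方已通过MessageBuilder构建好message_list):
+        #     client = GeminiClient(api_provider)
+        #     resp = await client.get_response(model_info, message_list, max_tokens=2048)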
+        if stream_response_handler is None:
+            stream_response_handler = _default_stream_response_handler
+
+        if async_response_parser is None:
+            async_response_parser = _default_normal_response_parser
+
+        # 将messages构造为Gemini API所需的格式
+        messages = _convert_messages(message_list)
+        # 将tool_options转换为Gemini API所需的格式
+        tools = _convert_tool_options(tool_options) if tool_options else None
+        # 将response_format转换为Gemini API所需的格式
+        generation_config_dict = {
+            "max_output_tokens": max_tokens,
+            "temperature": temperature,
+            "response_modalities": ["TEXT"],
+            "thinking_config": {
+                "include_thoughts": True,
+                "thinking_budget": (
+                    extra_params["thinking_budget"]
+                    if extra_params and "thinking_budget" in extra_params
+                    else int(max_tokens / 2)  # 默认思考预算为最大token数的一半,防止空回复
+                ),
+            },
+            "safety_settings": SAFETY_SETTINGS,  # 防止空回复问题
+        }
+        if tools:
+            generation_config_dict["tools"] = {"function_declarations": tools}
+        if messages[1]:
+            # 如果有system消息,则将其添加到配置中
+            generation_config_dict["system_instructions"] = messages[1]
+        if response_format and response_format.format_type == RespFormatType.TEXT:
+            generation_config_dict["response_mime_type"] = "text/plain"
+        elif response_format and response_format.format_type in (RespFormatType.JSON_OBJ, RespFormatType.JSON_SCHEMA):
+            generation_config_dict["response_mime_type"] = "application/json"
+            generation_config_dict["response_schema"] = response_format.to_dict()
+
+        generation_config = generation_config_dict
+
+        try:
+            # 创建模型实例
+            model = genai.GenerativeModel(model_info.model_identifier)
+
+            if model_info.force_stream_mode:
+                req_task = asyncio.create_task(
+                    model.generate_content_async(
+                        contents=messages[0],
+                        generation_config=generation_config,
+                        stream=True
+                    )
+                )
+                while not req_task.done():
+                    if interrupt_flag and interrupt_flag.is_set():
+                        # 如果中断量存在且被设置,则取消任务并抛出异常
+                        req_task.cancel()
+                        raise ReqAbortException("请求被外部信号中断")
+                    await asyncio.sleep(0.1)  # 等待0.1秒后再次检查任务&中断信号量状态
+                resp, usage_record = await stream_response_handler(req_task.result(), interrupt_flag)
+            else:
+                req_task = asyncio.create_task(
+                    model.generate_content_async(
+                        contents=messages[0],
+                        generation_config=generation_config
+                    )
+                )
+                while not req_task.done():
+                    if interrupt_flag and interrupt_flag.is_set():
+                        # 如果中断量存在且被设置,则取消任务并抛出异常
+                        req_task.cancel()
+                        raise ReqAbortException("请求被外部信号中断")
+                    await asyncio.sleep(0.5)  # 等待0.5秒后再次检查任务&中断信号量状态
+
+                resp, usage_record = async_response_parser(req_task.result())
+        except Exception as e:
+            # 处理Google Generative AI异常
+            if "rate limit" in str(e).lower():
+                raise RespNotOkException(429, "请求频率过高,请稍后再试") from None
+            elif "quota" in str(e).lower():
+                raise RespNotOkException(429, "配额已用完") from None
+            elif "invalid" in str(e).lower() or "bad request" in str(e).lower():
+                raise RespNotOkException(400, f"请求无效:{str(e)}") from None
+            elif "permission" in str(e).lower() or "forbidden" in str(e).lower():
+                raise RespNotOkException(403, "权限不足") from None
+            else:
+                raise NetworkConnectionError() from e
+
+        if usage_record:
+            resp.usage = UsageRecord(
+                model_name=model_info.name,
+                provider_name=model_info.api_provider,
+                prompt_tokens=usage_record[0],
+                completion_tokens=usage_record[1],
+                total_tokens=usage_record[2],
+            )
+
+        return resp
+
+    async def get_embedding(
+        self,
+        model_info: ModelInfo,
+        embedding_input: str,
+        extra_params: dict[str, Any] | None = None,
+    ) -> APIResponse:
+        """
+        获取文本嵌入
+        :param model_info: 模型信息
+        :param embedding_input: 嵌入输入文本
+        :return: 嵌入响应
+        """
+        try:
+            # 使用google.generativeai的模块级异步接口获取嵌入
+            raw_response = await genai.embed_content_async(
+                model=model_info.model_identifier,
+                content=embedding_input,
+                task_type="SEMANTIC_SIMILARITY",
+            )
+        except (ClientError, ServerError) as e:
+            # 重封装ClientError和ServerError为RespNotOkException
+            raise RespNotOkException(e.code) from None
+        except Exception as e:
+            raise NetworkConnectionError() from e
+
+        response = APIResponse()
+
+        # 解析嵌入响应和使用情况(embed_content返回包含"embedding"键的字典)
+        if isinstance(raw_response, dict) and raw_response.get("embedding"):
+            response.embedding = raw_response["embedding"]
+        else:
+            raise RespParseException(raw_response, "响应解析失败,缺失embedding字段")
+
+        response.usage = UsageRecord(
+            model_name=model_info.name,
+            provider_name=model_info.api_provider,
+            prompt_tokens=len(embedding_input),
+            completion_tokens=0,
+            total_tokens=len(embedding_input),
+        )
+
+        return response
+
+    def get_audio_transcriptions(
+        self, model_info: ModelInfo, audio_base64: str, extra_params: dict[str, Any] | None = None
+    ) -> APIResponse:
+        """
+        获取音频转录
+        :param model_info: 模型信息
+        :param audio_base64: 音频文件的Base64编码字符串
+        :param extra_params: 额外参数(可选)
+        :return: 转录响应
+        """
+        generation_config_dict = {
+            "max_output_tokens": 2048,
+        }
+        prompt = "Generate a transcript of the speech. The language of the transcript should **match the language of the speech**."
+        try:
+            # 以文本提示+内联音频数据的形式请求转录
+            model = genai.GenerativeModel(model_info.model_identifier)
+            raw_response: GenerateContentResponse = model.generate_content(
+                contents=[
+                    prompt,
+                    {"mime_type": "audio/wav", "data": base64.b64decode(audio_base64)},
+                ],
+                generation_config=generation_config_dict,
+                safety_settings=SAFETY_SETTINGS,
+            )
+            resp, usage_record = _default_normal_response_parser(raw_response)
+        except (ClientError, ServerError) as e:
+            # 重封装ClientError和ServerError为RespNotOkException
+            raise RespNotOkException(e.code) from None
+        except Exception as e:
+            raise NetworkConnectionError() from e
+
+        if usage_record:
+            resp.usage = UsageRecord(
+                model_name=model_info.name,
+                provider_name=model_info.api_provider,
+                prompt_tokens=usage_record[0],
+                completion_tokens=usage_record[1],
+                total_tokens=usage_record[2],
+            )
+
+        return resp
+
+    def get_support_image_formats(self) -> list[str]:
+        """
+        获取支持的图片格式
+        :return: 支持的图片格式列表
+        """
+        return ["png", "jpg", "jpeg", "webp", "heic", "heif"]
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 000000000..548e274fa
--- /dev/null
+++ b/src/utils/__init__.py
@@ -0,0 +1,3 @@
+"""
+工具模块
+"""
diff --git a/src/utils/timing_utils.py b/src/utils/timing_utils.py
new file mode 100644
index 000000000..c23e0d89e
--- /dev/null
+++ b/src/utils/timing_utils.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python3
+"""
+时间间隔工具函数
+用于主动思考功能的正态分布时间计算,支持3-sigma规则
+"""
+
+import numpy as np
+from typing import Optional
+
+
+def get_normal_distributed_interval(
+    base_interval: int,
+    sigma_percentage: float = 0.1,
+    min_interval: Optional[int] = None,
+    max_interval: Optional[int] = None,
+    use_3sigma_rule: bool = True
+) -> int:
+    """
+    获取符合正态分布的时间间隔,基于3-sigma规则
+
+    Args:
+        base_interval: 基础时间间隔(秒),作为正态分布的均值μ
+        sigma_percentage: 标准差占基础间隔的百分比,默认10%
+        min_interval: 最小间隔时间(秒),防止间隔过短
+        max_interval: 最大间隔时间(秒),防止间隔过长
+        use_3sigma_rule: 是否使用3-sigma规则限制分布范围,默认True
+
+    Returns:
+        int: 符合正态分布的时间间隔(秒)
+
+    Example:
+        >>> # 基础间隔1500秒(25分钟),标准差为150秒(10%)
+        >>> interval = get_normal_distributed_interval(1500, 0.1)
+        >>> # 99.7%的值会在μ±3σ范围内:1500±450 = [1050,1950]
+    """
+    # 🚨 基本输入保护:处理负数
+    if base_interval < 0:
+        base_interval = abs(base_interval)
+
+    if sigma_percentage < 0:
+        sigma_percentage = abs(sigma_percentage)
+
+    # 特殊情况:基础间隔为0,使用纯随机模式
+    if base_interval == 0:
+        if sigma_percentage == 0:
+            return 1  # 都为0时返回1秒
+        return _generate_pure_random_interval(sigma_percentage, min_interval, max_interval, use_3sigma_rule)
+
+    # 特殊情况:sigma为0,返回固定间隔
+    if sigma_percentage == 0:
+        return base_interval
+
+    # 计算标准差
+    sigma = base_interval * sigma_percentage
+
+    # 📊 3-sigma规则:99.7%的数据落在μ±3σ范围内
+    if use_3sigma_rule:
+        three_sigma_min = base_interval - 3 * sigma
+        three_sigma_max = base_interval + 3 * sigma
+
+        # 确保3-sigma边界合理
+        three_sigma_min = max(1, three_sigma_min)  # 最小1秒
+        three_sigma_max = max(three_sigma_min + 1, three_sigma_max)  # 确保max > min
+
+        # 应用用户设定的边界(如果更严格的话)
+        if min_interval is not None:
+            three_sigma_min = max(three_sigma_min, min_interval)
+        if max_interval is not None:
+            three_sigma_max = min(three_sigma_max, max_interval)
+
+        effective_min = int(three_sigma_min)
+        effective_max = int(three_sigma_max)
+    else:
+        # 不使用3-sigma规则,使用更宽松的边界
+        effective_min = max(1, min_interval or 1)
+        effective_max = max(effective_min + 1, max_interval or int(base_interval * 50))
+
+    # 🎲 生成正态分布随机数
+    max_attempts = 50  # 3-sigma规则下成功率约99.7%,50次足够了
+
+    for attempt in range(max_attempts):
+        # 生成正态分布值
+        value = np.random.normal(loc=base_interval, scale=sigma)
+
+        # 💡 关键:对负数取绝对值,保持分布特性
+        if value < 0:
+            value = abs(value)
+
+        # 转换为整数
+        interval = int(round(value))
+
+        # 检查是否在有效范围内
+        if effective_min <= interval <= effective_max:
+            return interval
+
+    # 如果50次都没成功,返回3-sigma范围内的随机值
+    return int(np.random.uniform(effective_min, effective_max))
+
+
+def _generate_pure_random_interval(
+    sigma_percentage: float,
+    min_interval: Optional[int] = None,
+    max_interval: Optional[int] = None,
+    use_3sigma_rule: bool = True
+) -> int:
+    """
+    当base_interval=0时的纯随机模式,基于3-sigma规则
+
+    Args:
+        sigma_percentage: 标准差百分比,将被转换为实际时间值
+        min_interval: 最小间隔
+        max_interval: 最大间隔
+        use_3sigma_rule: 是否使用3-sigma规则
+
+    Returns:
+        int: 随机生成的时间间隔(秒)
+    """
+    # 将百分比转换为实际时间值(假设1000秒作为基准)
+    # sigma_percentage=0.3 -> sigma=300秒
+    base_reference = 1000  # 基准时间
+    sigma = abs(sigma_percentage) * base_reference
+
+    # 使用sigma作为均值,sigma/3作为标准差
+    # 这样3σ范围约为[0, 2*sigma]
+    mean = sigma
+    std = sigma / 3
+
+    if use_3sigma_rule:
+        # 3-sigma边界:μ±3σ = sigma±3*(sigma/3) = sigma±sigma = [0, 2*sigma]
+        three_sigma_min = max(1, mean - 3 * std)  # 理论上约为0,但最小1秒
+        three_sigma_max = mean + 3 * std  # 约为2*sigma
+
+        # 应用用户边界
+        if min_interval is not None:
+            three_sigma_min = max(three_sigma_min, min_interval)
+        if max_interval is not None:
+            three_sigma_max = min(three_sigma_max, max_interval)
+
+        effective_min = int(three_sigma_min)
+        effective_max = int(three_sigma_max)
+    else:
+        # 不使用3-sigma规则
+        effective_min = max(1, min_interval or 1)
+        effective_max = max(effective_min + 1, max_interval or int(mean * 10))
+
+    # 生成随机值
+    for _ in range(50):
+        value = np.random.normal(loc=mean, scale=std)
+
+        # 对负数取绝对值
+        if value < 0:
+            value = abs(value)
+
+        interval = int(round(value))
+
+        if effective_min <= interval <= effective_max:
+            return interval
+
+    # 备用方案
+    return int(np.random.uniform(effective_min, effective_max))
+
+
+def format_time_duration(seconds: int) -> str:
+    """
+    将秒数格式化为易读的时间格式
+
+    Args:
+        seconds: 秒数
+
+    Returns:
+        str: 格式化的时间字符串,如"2小时30分15秒"
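+
+    Example:
+        >>> format_time_duration(9015)
+        '2小时30分15秒'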
f"{hours}小时{remaining_minutes}分" + else: + return f"{hours}小时" + + days = hours // 24 + remaining_hours = hours % 24 + + if remaining_hours > 0: + return f"{days}天{remaining_hours}小时" + else: + return f"{days}天" \ No newline at end of file diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 8251761f9..2cc080101 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -120,9 +120,33 @@ talk_frequency_adjust = [ # [["", "8:00,1", "12:00,2", "18:00,1.5", "00:00,0.5"]] # 主动思考功能配置(仅在focus模式下生效) + enable_proactive_thinking = false # 是否启用主动思考功能 proactive_thinking_interval = 1500 # 主动思考触发间隔时间(秒),默认1500秒(25分钟) -The_scope_that_proactive_thinking_can_trigger = "all" #主动思考可以触发的范围(all - 所有,private - 私聊,group - 群聊) +# TIPS: +# 创意玩法:可以设置为0!设置为0时将基于delta_sigma生成纯随机间隔 +# 负数保险:如果设置为负数,会自动使用绝对值 + +proactive_thinking_in_private = true # 主动思考可以在私聊里面启用 +proactive_thinking_in_group = true # 主动思考可以在群聊里面启用 +proactive_thinking_enable_ids = [123456, 234567] # 启用主动思考的范围,不区分群聊和私聊,为空则不限制 +delta_sigma = 120 # 正态分布的标准差,控制时间间隔的随机程度 + +# 特殊用法: +# - 设置为0:禁用正态分布,使用固定间隔 +# - 设置得很大(如6000):产生高度随机的间隔,即使基础间隔为0也能工作 +# - 负数会自动转换为正数,不用担心配置错误以及极端边界情况 +# 实验建议:试试 proactive_thinking_interval=0 + delta_sigma 非常大 的纯随机模式! +# 结果保证:生成的间隔永远为正数(负数会取绝对值),最小1秒,最大24小时 + +enable_ids = [] # 启用主动思考的范围,不区分群聊和私聊,为空则不限制 +# 主动思考prompt模板,{time}会被替换为实际的沉默时间(如"2小时30分15秒") +proactive_thinking_prompt_template = """现在当前的聊天里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择: + +1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时) +2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时) + +请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""" # 特定聊天流配置示例: