minecraft1024a
2025-08-21 12:32:51 +08:00
6 changed files with 1005 additions and 10 deletions


@@ -154,11 +154,33 @@ class HeartFChatting:
请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""",
}
# Proactive thinking configuration - supports both the legacy and the new config format
self.proactive_thinking_chat_scope = global_config.chat.The_scope_that_proactive_thinking_can_trigger
if self.proactive_thinking_chat_scope not in self.VALID_PROACTIVE_SCOPES:
logger.error(f"无效的主动思考范围: '{self.proactive_thinking_chat_scope}'。有效值为: {self.VALID_PROACTIVE_SCOPES}")
raise ValueError(f"配置错误:无效的主动思考范围 '{self.proactive_thinking_chat_scope}'")  # fail fast on invalid config values
# New config fields - separate switches for private and group chats
self.proactive_thinking_in_private = global_config.chat.proactive_thinking_in_private
self.proactive_thinking_in_group = global_config.chat.proactive_thinking_in_group
# ID allow-list - supports both the old and the new field name
self.proactive_thinking_ids = []
if hasattr(global_config.chat, 'enable_ids') and global_config.chat.enable_ids:
self.proactive_thinking_ids = global_config.chat.enable_ids
elif hasattr(global_config.chat, 'proactive_thinking_enable_ids') and global_config.chat.proactive_thinking_enable_ids:
self.proactive_thinking_ids = global_config.chat.proactive_thinking_enable_ids
# Normally distributed interval configuration
self.delta_sigma = getattr(global_config.chat, 'delta_sigma', 120)
# Log the effective proactive thinking configuration
logger.info(f"{self.log_prefix} 主动思考配置: 启用={global_config.chat.enable_proactive_thinking}, "
f"旧范围={self.proactive_thinking_chat_scope}, 私聊={self.proactive_thinking_in_private}, "
f"群聊={self.proactive_thinking_in_group}, ID列表={self.proactive_thinking_ids}, "
f"基础间隔={global_config.chat.proactive_thinking_interval}s, Delta={self.delta_sigma}")
async def start(self):
"""Check whether the main loop needs to start, and start it if it is not yet active."""
@@ -288,21 +310,24 @@ class HeartFChatting:
async def _proactive_thinking_loop(self):
"""Proactive thinking loop; only takes effect in focus mode"""
while self.running:
await asyncio.sleep(30) # check every 30 seconds
await asyncio.sleep(15) # check every 15 seconds
# Only think proactively while in focus mode
if self.loop_mode != ChatMode.FOCUS:
continue
if self.proactive_thinking_chat_scope == "group" and self.chat_stream.group_info is None:
continue
if self.proactive_thinking_chat_scope == "private" and self.chat_stream.group_info is not None:
# Check whether proactive thinking is enabled for the current chat type
if not self._should_enable_proactive_thinking():
continue
current_time = time.time()
silence_duration = current_time - self.last_message_time
# Compute the dynamic interval from a normal distribution
target_interval = self._get_dynamic_thinking_interval()
# Check whether the proactive thinking interval has elapsed
if silence_duration >= global_config.chat.proactive_thinking_interval:
if silence_duration >= target_interval:
try:
await self._execute_proactive_thinking(silence_duration)
# Reset the timer to avoid triggering too often
@@ -311,6 +336,125 @@ class HeartFChatting:
logger.error(f"{self.log_prefix} 主动思考执行出错: {e}")
logger.error(traceback.format_exc())
def _should_enable_proactive_thinking(self) -> bool:
"""Check whether proactive thinking should be enabled for the current chat"""
# Get the current chat ID
chat_id = None
if hasattr(self.chat_stream, 'chat_id'):
chat_id = int(self.chat_stream.chat_id)
# If an ID allow-list is configured, only chats on the list are enabled
if self.proactive_thinking_ids:
if chat_id is None or chat_id not in self.proactive_thinking_ids:
return False
# Check the chat-type (private/group) switches
is_group_chat = self.chat_stream.group_info is not None
if is_group_chat:
# Group chat: honor the group switch
if not self.proactive_thinking_in_group:
return False
else:
# Private chat: honor the private switch
if not self.proactive_thinking_in_private:
return False
# Backward compatibility with the old scope config
if self.proactive_thinking_chat_scope == "group" and not is_group_chat:
return False
if self.proactive_thinking_chat_scope == "private" and is_group_chat:
return False
return True
def _get_dynamic_thinking_interval(self) -> float:
"""Get the dynamic proactive thinking interval, using a normal distribution with the 3-sigma rule"""
try:
from src.utils.timing_utils import get_normal_distributed_interval
base_interval = global_config.chat.proactive_thinking_interval
# 🚨 Safety net: handle negative config values
if base_interval < 0:
logger.warning(f"{self.log_prefix} proactive_thinking_interval设置为{base_interval}为负数,使用绝对值{abs(base_interval)}")
base_interval = abs(base_interval)
if self.delta_sigma < 0:
logger.warning(f"{self.log_prefix} delta_sigma设置为{self.delta_sigma}为负数,使用绝对值{abs(self.delta_sigma)}")
delta_sigma = abs(self.delta_sigma)
else:
delta_sigma = self.delta_sigma
# 🚨 Special cases
if base_interval == 0 and delta_sigma == 0:
logger.warning(f"{self.log_prefix} 基础间隔和Delta都为0强制使用300秒安全间隔")
return 300
elif base_interval == 0:
# Base interval is 0 but delta_sigma is set: generate a random interval from delta_sigma
logger.info(f"{self.log_prefix} 基础间隔为0使用纯随机模式基于delta_sigma={delta_sigma}")
sigma_percentage = delta_sigma / 1000 # 1000 seconds serves as a virtual reference base
result = get_normal_distributed_interval(0, sigma_percentage, 1, 86400, use_3sigma_rule=True)
logger.debug(f"{self.log_prefix} 纯随机模式生成间隔: {result}")
return result
elif delta_sigma == 0:
# Normal distribution disabled; use the fixed interval
logger.debug(f"{self.log_prefix} delta_sigma=0禁用正态分布使用固定间隔{base_interval}")
return base_interval
# Normal case: normal distribution with the 3-sigma rule
sigma_percentage = delta_sigma / base_interval
# 3-sigma boundary calculation
sigma = delta_sigma
three_sigma_range = 3 * sigma
theoretical_min = max(1, base_interval - three_sigma_range)
theoretical_max = base_interval + three_sigma_range
logger.debug(f"{self.log_prefix} 3-sigma分布: 基础={base_interval}s, σ={sigma}s, "
f"理论范围=[{theoretical_min:.0f}, {theoretical_max:.0f}]s")
# Give users maximum freedom: apply the 3-sigma rule without clamping hard
result = get_normal_distributed_interval(
base_interval,
sigma_percentage,
1, # minimum 1 second
86400, # maximum 24 hours
use_3sigma_rule=True
)
return result
except ImportError:
# timing_utils unavailable; fall back to a fixed interval
logger.warning(f"{self.log_prefix} timing_utils不可用使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
except Exception as e:
# Calculation failed; fall back to a fixed interval
logger.error(f"{self.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
def _generate_random_interval_from_sigma(self, sigma: float) -> float:
"""Generate a purely random interval from a sigma value (used when the base interval is 0)"""
try:
import numpy as np
# Normal distribution with mean 0 and sigma as the standard deviation
interval = abs(np.random.normal(loc=0, scale=sigma))
# Enforce a lower bound
interval = max(interval, 30) # at least 30 seconds
# Cap the upper bound to avoid extreme values
interval = min(interval, 86400) # at most 24 hours
logger.debug(f"{self.log_prefix} 纯随机模式生成间隔: {int(interval)}")
return int(interval)
except Exception as e:
logger.error(f"{self.log_prefix} 纯随机间隔生成失败: {e}")
return 300 # fall back to 5 minutes
def _format_duration(self, seconds: float) -> str:
"""Format a duration as a human-readable string"""
hours = int(seconds // 3600)
@@ -333,7 +477,11 @@ class HeartFChatting:
logger.info(f"{self.log_prefix} 触发主动思考,已沉默{formatted_time}")
try:
# Prefer the prompt template from the config file; fall back to the built-in templates
if hasattr(global_config.chat, 'proactive_thinking_prompt_template') and global_config.chat.proactive_thinking_prompt_template.strip():
proactive_prompt = global_config.chat.proactive_thinking_prompt_template.format(time=formatted_time)
else:
# Fall back to the built-in prompt template, chosen by chat type
chat_type = "group" if self.chat_stream.group_info else "private"
prompt_template = self.proactive_thinking_prompts.get(chat_type, self.proactive_thinking_prompts["group"])
proactive_prompt = prompt_template.format(time=formatted_time)
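
The interval logic above is standard normal-distribution sampling, so its behavior is easy to sanity-check outside the bot. A minimal sketch using the defaults from this diff (base interval 1500s, delta_sigma 120s); numpy only, no project imports:

```python
import numpy as np

# Illustrative only: mirrors the normal case of _get_dynamic_thinking_interval.
base_interval = 1500  # proactive_thinking_interval, in seconds
delta_sigma = 120     # standard deviation, in seconds

# 3-sigma bounds: ~99.7% of samples fall inside [1140, 1860] seconds
low, high = base_interval - 3 * delta_sigma, base_interval + 3 * delta_sigma

samples = np.random.normal(loc=base_interval, scale=delta_sigma, size=10_000)
inside = np.mean((samples >= low) & (samples <= high))
print(f"range=[{low}, {high}]s, fraction inside 3-sigma: {inside:.4f}")  # ~0.997
```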


@@ -82,6 +82,17 @@ class ChatConfig(ValidatedConfigBase):
enable_proactive_thinking: bool = Field(default=False, description="启用主动思考")
proactive_thinking_interval: int = Field(default=1500, description="主动思考间隔")
The_scope_that_proactive_thinking_can_trigger: str = Field(default="all", description="主动思考可以触发的范围")
proactive_thinking_in_private: bool = Field(default=True, description="主动思考可以在私聊里面启用")
proactive_thinking_in_group: bool = Field(default=True, description="主动思考可以在群聊里面启用")
proactive_thinking_enable_ids: List[int] = Field(default_factory=list, description="启用主动思考的范围,不区分群聊和私聊,为空则不限制")
delta_sigma: int = Field(default=120, description="采用正态分布随机时间间隔")
enable_ids: List[int] = Field(default_factory=list, description="启用主动思考的范围,不区分群聊和私聊,为空则不限制")
proactive_thinking_prompt_template: str = Field(default="""现在群里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择:
1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时)
2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时)
请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""", description="主动思考提示模板")
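
The two ID fields above coexist for backward compatibility, and the resolution order in HeartFChatting.__init__ prefers enable_ids over the legacy proactive_thinking_enable_ids. A minimal sketch of that precedence, using a stand-in namespace object rather than the real ChatConfig:

```python
from types import SimpleNamespace

def resolve_proactive_ids(chat_cfg) -> list:
    # Mirrors the fallback order in HeartFChatting.__init__: the new field
    # enable_ids wins when non-empty; otherwise the legacy field is used;
    # an empty result means "no restriction".
    if getattr(chat_cfg, "enable_ids", None):
        return chat_cfg.enable_ids
    if getattr(chat_cfg, "proactive_thinking_enable_ids", None):
        return chat_cfg.proactive_thinking_enable_ids
    return []

cfg = SimpleNamespace(enable_ids=[], proactive_thinking_enable_ids=[123456, 234567])
print(resolve_proactive_ids(cfg))  # [123456, 234567] - legacy field used as fallback
```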
def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
"""


@@ -0,0 +1,603 @@
import asyncio
import io
import base64
from typing import Callable, AsyncIterator, Optional, Coroutine, Any, List, Dict, Union
import google.generativeai as genai
from google.generativeai.types import (
GenerateContentResponse,
HarmCategory,
HarmBlockThreshold,
)
try:
# Try importing from the newer API
from google.generativeai import configure
from google.generativeai.types import SafetySetting, GenerationConfig
except ImportError:
# Fall back to plain dict types
SafetySetting = Dict
GenerationConfig = Dict
# Compatibility type aliases
ContentDict = Dict
PartDict = Dict
ToolDict = Dict
FunctionDeclaration = Dict
Tool = Dict
ContentListUnion = List[Dict]
ContentUnion = Dict
Content = Dict
Part = Dict
ThinkingConfig = Dict
GenerateContentConfig = Dict
EmbedContentConfig = Dict
EmbedContentResponse = Dict
# Exception types (compatibility placeholders)
class ClientError(Exception):
pass
class ServerError(Exception):
pass
class UnknownFunctionCallArgumentError(Exception):
pass
class UnsupportedFunctionError(Exception):
pass
class FunctionInvocationError(Exception):
pass
from src.config.api_ada_configs import ModelInfo, APIProvider
from src.common.logger import get_logger
from .base_client import APIResponse, UsageRecord, BaseClient, client_registry
from ..exceptions import (
RespParseException,
NetworkConnectionError,
RespNotOkException,
ReqAbortException,
)
from ..payload_content.message import Message, RoleType
from ..payload_content.resp_format import RespFormat, RespFormatType
from ..payload_content.tool_option import ToolOption, ToolParam, ToolCall
logger = get_logger("Gemini客户端")
SAFETY_SETTINGS = [
{"category": HarmCategory.HARM_CATEGORY_HATE_SPEECH, "threshold": HarmBlockThreshold.BLOCK_NONE},
{"category": HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, "threshold": HarmBlockThreshold.BLOCK_NONE},
{"category": HarmCategory.HARM_CATEGORY_HARASSMENT, "threshold": HarmBlockThreshold.BLOCK_NONE},
{"category": HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, "threshold": HarmBlockThreshold.BLOCK_NONE},
]
def _convert_messages(
messages: list[Message],
) -> tuple[List[Dict], list[str] | None]:
"""
Convert messages into the format required by the Gemini API
:param messages: list of messages
:return: converted message list (plus any system messages)
"""
def _get_correct_mime_type(image_format: str) -> str:
"""
Get the correct MIME type, fixing the jpg-to-jpeg mapping issue
:param image_format: image format
:return: correct MIME type
"""
# Normalize format names to resolve jpg/jpeg compatibility
format_mapping = {
"jpg": "jpeg",
"jpeg": "jpeg",
"png": "png",
"webp": "webp",
"heic": "heic",
"heif": "heif",
"gif": "gif"
}
normalized_format = format_mapping.get(image_format.lower(), image_format.lower())
return f"image/{normalized_format}"
def _convert_message_item(message: Message) -> Dict:
"""
Convert a single message (all types except system and tool messages)
:param message: message object
:return: converted message dict
"""
# Map OpenAI-style roles onto Gemini roles
if message.role == RoleType.Assistant:
role = "model"
elif message.role == RoleType.User:
role = "user"
else:
raise RuntimeError("无法触及的代码请使用MessageBuilder类构建消息对象")
# Build the content parts
if isinstance(message.content, str):
content = [{"text": message.content}]
elif isinstance(message.content, list):
content = []
for item in message.content:
if isinstance(item, tuple):
content.append({
"inline_data": {
"mime_type": _get_correct_mime_type(item[0]),
"data": item[1]
}
})
elif isinstance(item, str):
content.append({"text": item})
else:
raise RuntimeError("无法触及的代码请使用MessageBuilder类构建消息对象")
return {"role": role, "parts": content}
temp_list: List[Dict] = []
system_instructions: list[str] = []
for message in messages:
if message.role == RoleType.System:
if isinstance(message.content, str):
system_instructions.append(message.content)
else:
raise ValueError("system消息仅支持纯文本不支持图片等多模态内容")
elif message.role == RoleType.Tool:
if not message.tool_call_id:
raise ValueError("无法触及的代码请使用MessageBuilder类构建消息对象")
else:
temp_list.append(_convert_message_item(message))
if system_instructions:
# If there are system messages, return them alongside the converted list
ret: tuple = (temp_list, system_instructions)
else:
# No system messages; return the converted list only
ret: tuple = (temp_list, None)
return ret
def _convert_tool_options(tool_options: list[ToolOption]) -> list[FunctionDeclaration]:
"""
Convert tool options into the format required by the Gemini API
:param tool_options: list of tool options
:return: list of converted tool objects
"""
def _convert_tool_param(tool_option_param: ToolParam) -> dict:
"""
Convert a single tool parameter
:param tool_option_param: tool parameter object
:return: converted tool parameter dict
"""
return_dict: dict[str, Any] = {
"type": tool_option_param.param_type.value,
"description": tool_option_param.description,
}
if tool_option_param.enum_values:
return_dict["enum"] = tool_option_param.enum_values
return return_dict
def _convert_tool_option_item(tool_option: ToolOption) -> FunctionDeclaration:
"""
Convert a single tool option
:param tool_option: tool option object
:return: converted Gemini tool option object
"""
ret: dict[str, Any] = {
"name": tool_option.name,
"description": tool_option.description,
}
if tool_option.params:
ret["parameters"] = {
"type": "object",
"properties": {param.name: _convert_tool_param(param) for param in tool_option.params},
"required": [param.name for param in tool_option.params if param.required],
}
return FunctionDeclaration(**ret)
return [_convert_tool_option_item(tool_option) for tool_option in tool_options]
def _process_delta(
delta: GenerateContentResponse,
fc_delta_buffer: io.StringIO,
tool_calls_buffer: list[tuple[str, str, dict[str, Any]]],
):
if not hasattr(delta, "candidates") or not delta.candidates:
raise RespParseException(delta, "响应解析失败缺失candidates字段")
if delta.text:
fc_delta_buffer.write(delta.text)
if delta.function_calls: # no hasattr check needed: this attribute always exists, even when empty
for call in delta.function_calls:
try:
if not isinstance(call.args, dict): # Gemini already returns function-call args as a dict
raise RespParseException(delta, "响应解析失败,工具调用参数无法解析为字典类型")
if not call.id or not call.name:
raise RespParseException(delta, "响应解析失败工具调用缺失id或name字段")
tool_calls_buffer.append(
(
call.id,
call.name,
call.args or {}, # convert None args to an empty dict
)
)
except Exception as e:
raise RespParseException(delta, "响应解析失败,无法解析工具调用参数") from e
def _build_stream_api_resp(
_fc_delta_buffer: io.StringIO,
_tool_calls_buffer: list[tuple[str, str, dict]],
) -> APIResponse:
# sourcery skip: simplify-len-comparison, use-assigned-variable
resp = APIResponse()
if _fc_delta_buffer.tell() > 0:
# If the content buffer is non-empty, write it into the APIResponse
resp.content = _fc_delta_buffer.getvalue()
_fc_delta_buffer.close()
if len(_tool_calls_buffer) > 0:
# If the tool-call buffer is non-empty, parse it into a list of ToolCall objects
resp.tool_calls = []
for call_id, function_name, arguments_buffer in _tool_calls_buffer:
if arguments_buffer is not None:
arguments = arguments_buffer
if not isinstance(arguments, dict):
raise RespParseException(
None,
f"响应解析失败,工具调用参数无法解析为字典类型。工具调用参数原始响应:\n{arguments_buffer}",
)
else:
arguments = None
resp.tool_calls.append(ToolCall(call_id, function_name, arguments))
return resp
async def _default_stream_response_handler(
resp_stream: AsyncIterator[GenerateContentResponse],
interrupt_flag: asyncio.Event | None,
) -> tuple[APIResponse, Optional[tuple[int, int, int]]]:
"""
Streaming response handler - processes a streaming response from the Gemini API
:param resp_stream: streaming response object; an async iterator that is exhausted after a single pass
:return: APIResponse object
"""
_fc_delta_buffer = io.StringIO() # buffer for the main content received so far
_tool_calls_buffer: list[tuple[str, str, dict]] = [] # buffer for received tool calls
_usage_record = None # usage record
def _ensure_buffer_closed():
if _fc_delta_buffer and not _fc_delta_buffer.closed:
_fc_delta_buffer.close()
async for chunk in resp_stream:
# Check the interrupt flag
if interrupt_flag and interrupt_flag.is_set():
# If the interrupt flag is set, raise ReqAbortException
raise ReqAbortException("请求被外部信号中断")
_process_delta(
chunk,
_fc_delta_buffer,
_tool_calls_buffer,
)
if chunk.usage_metadata:
# Store the usage info so it can be attached to the APIResponse later
_usage_record = (
chunk.usage_metadata.prompt_token_count or 0,
(chunk.usage_metadata.candidates_token_count or 0) + (chunk.usage_metadata.thoughts_token_count or 0),
chunk.usage_metadata.total_token_count or 0,
)
try:
return _build_stream_api_resp(
_fc_delta_buffer,
_tool_calls_buffer,
), _usage_record
except Exception:
# Make sure the buffer gets closed
_ensure_buffer_closed()
raise
def _default_normal_response_parser(
resp: GenerateContentResponse,
) -> tuple[APIResponse, Optional[tuple[int, int, int]]]:
"""
Parse a chat-completion response from the Gemini API into an APIResponse object
:param resp: response object
:return: APIResponse object
"""
api_response = APIResponse()
if not hasattr(resp, "candidates") or not resp.candidates:
raise RespParseException(resp, "响应解析失败缺失candidates字段")
try:
if resp.candidates[0].content and resp.candidates[0].content.parts:
for part in resp.candidates[0].content.parts:
if not part.text:
continue
if part.thought:
api_response.reasoning_content = (
api_response.reasoning_content + part.text if api_response.reasoning_content else part.text
)
except Exception as e:
logger.warning(f"解析思考内容时发生错误: {e},跳过解析")
if resp.text:
api_response.content = resp.text
if resp.function_calls:
api_response.tool_calls = []
for call in resp.function_calls:
try:
if not isinstance(call.args, dict):
raise RespParseException(resp, "响应解析失败,工具调用参数无法解析为字典类型")
if not call.name:
raise RespParseException(resp, "响应解析失败工具调用缺失name字段")
api_response.tool_calls.append(ToolCall(call.id or "gemini-tool_call", call.name, call.args or {}))
except Exception as e:
raise RespParseException(resp, "响应解析失败,无法解析工具调用参数") from e
if resp.usage_metadata:
_usage_record = (
resp.usage_metadata.prompt_token_count or 0,
(resp.usage_metadata.candidates_token_count or 0) + (resp.usage_metadata.thoughts_token_count or 0),
resp.usage_metadata.total_token_count or 0,
)
else:
_usage_record = None
api_response.raw_data = resp
return api_response, _usage_record
@client_registry.register_client_class("gemini")
class GeminiClient(BaseClient):
def __init__(self, api_provider: APIProvider):
super().__init__(api_provider)
# Configure Google Generative AI
genai.configure(api_key=api_provider.api_key)
async def get_response(
self,
model_info: ModelInfo,
message_list: list[Message],
tool_options: list[ToolOption] | None = None,
max_tokens: int = 1024,
temperature: float = 0.4,
response_format: RespFormat | None = None,
stream_response_handler: Optional[
Callable[
[AsyncIterator[GenerateContentResponse], asyncio.Event | None],
Coroutine[Any, Any, tuple[APIResponse, Optional[tuple[int, int, int]]]],
]
] = None,
async_response_parser: Optional[
Callable[[GenerateContentResponse], tuple[APIResponse, Optional[tuple[int, int, int]]]]
] = None,
interrupt_flag: asyncio.Event | None = None,
extra_params: dict[str, Any] | None = None,
) -> APIResponse:
"""
Get a chat response
Args:
model_info: model info
message_list: conversation messages
tool_options: tool options (optional, defaults to None)
max_tokens: maximum number of tokens (optional, defaults to 1024)
temperature: sampling temperature (optional, defaults to 0.4)
response_format: response format, defaults to text/plain; a JSON Schema input must follow the OpenAPI 3.0 format (the same as OpenAI); other response formats are not supported yet
stream_response_handler: streaming response handler (optional, defaults to _default_stream_response_handler)
async_response_parser: response parser (optional, defaults to _default_normal_response_parser)
interrupt_flag: interrupt event (optional, defaults to None)
Returns:
APIResponse object containing the content, reasoning content, tool calls, and so on
"""
if stream_response_handler is None:
stream_response_handler = _default_stream_response_handler
if async_response_parser is None:
async_response_parser = _default_normal_response_parser
# Convert messages into the format required by the Gemini API
messages = _convert_messages(message_list)
# Convert tool_options into the format required by the Gemini API
tools = _convert_tool_options(tool_options) if tool_options else None
# Build the generation config. In the google-generativeai SDK, tools, safety
# settings and system instructions are separate arguments rather than
# generation_config keys, so they are attached to the model below.
# NOTE: thinking budgets (extra_params["thinking_budget"]) are only available
# in the newer google-genai SDK and are therefore not applied here.
generation_config = {
"max_output_tokens": max_tokens,
"temperature": temperature,
}
if response_format and response_format.format_type == RespFormatType.TEXT:
generation_config["response_mime_type"] = "text/plain"
elif response_format and response_format.format_type in (RespFormatType.JSON_OBJ, RespFormatType.JSON_SCHEMA):
generation_config["response_mime_type"] = "application/json"
generation_config["response_schema"] = response_format.to_dict()
try:
# Create the model instance; the relaxed safety settings help avoid empty
# replies, and system instructions (if any) are attached at creation time
model = genai.GenerativeModel(
model_info.model_identifier,
safety_settings=SAFETY_SETTINGS,
system_instruction="\n".join(messages[1]) if messages[1] else None,
tools={"function_declarations": tools} if tools else None,
)
if model_info.force_stream_mode:
req_task = asyncio.create_task(
model.generate_content_async(
contents=messages[0],
generation_config=generation_config,
stream=True
)
)
while not req_task.done():
if interrupt_flag and interrupt_flag.is_set():
# If an interrupt flag exists and is set, cancel the task and raise
req_task.cancel()
raise ReqAbortException("请求被外部信号中断")
await asyncio.sleep(0.1) # wait 0.1s before re-checking the task and the interrupt flag
resp, usage_record = await stream_response_handler(req_task.result(), interrupt_flag)
else:
req_task = asyncio.create_task(
model.generate_content_async(
contents=messages[0],
generation_config=generation_config
)
)
while not req_task.done():
if interrupt_flag and interrupt_flag.is_set():
# If an interrupt flag exists and is set, cancel the task and raise
req_task.cancel()
raise ReqAbortException("请求被外部信号中断")
await asyncio.sleep(0.5) # wait 0.5s before re-checking the task and the interrupt flag
resp, usage_record = async_response_parser(req_task.result())
except Exception as e:
# Map Google Generative AI exceptions to the project's exception types
if "rate limit" in str(e).lower():
raise RespNotOkException(429, "请求频率过高,请稍后再试") from None
elif "quota" in str(e).lower():
raise RespNotOkException(429, "配额已用完") from None
elif "invalid" in str(e).lower() or "bad request" in str(e).lower():
raise RespNotOkException(400, f"请求无效:{str(e)}") from None
elif "permission" in str(e).lower() or "forbidden" in str(e).lower():
raise RespNotOkException(403, "权限不足") from None
else:
raise NetworkConnectionError() from e
if usage_record:
resp.usage = UsageRecord(
model_name=model_info.name,
provider_name=model_info.api_provider,
prompt_tokens=usage_record[0],
completion_tokens=usage_record[1],
total_tokens=usage_record[2],
)
return resp
async def get_embedding(
self,
model_info: ModelInfo,
embedding_input: str,
extra_params: dict[str, Any] | None = None,
) -> APIResponse:
"""
Get a text embedding
:param model_info: model info
:param embedding_input: input text to embed
:return: embedding response
"""
try:
# Use the module-level helper of the google-generativeai SDK
# (the client-object API used previously belongs to the newer google-genai SDK)
raw_response = await genai.embed_content_async(
model=model_info.model_identifier,
content=embedding_input,
task_type="SEMANTIC_SIMILARITY",
)
except (ClientError, ServerError) as e:
# Re-wrap ClientError and ServerError as RespNotOkException
raise RespNotOkException(e.code) from None
except Exception as e:
raise NetworkConnectionError() from e
response = APIResponse()
# Parse the embedding from the response
if isinstance(raw_response, dict) and raw_response.get("embedding"):
response.embedding = raw_response["embedding"]
else:
raise RespParseException(raw_response, "响应解析失败缺失embedding字段")
response.usage = UsageRecord(
model_name=model_info.name,
provider_name=model_info.api_provider,
prompt_tokens=len(embedding_input),
completion_tokens=0,
total_tokens=len(embedding_input),
)
return response
def get_audio_transcriptions(
self, model_info: ModelInfo, audio_base64: str, extra_params: dict[str, Any] | None = None
) -> APIResponse:
"""
Get an audio transcription
:param model_info: model info
:param audio_base64: Base64-encoded audio file
:param extra_params: extra parameters (optional)
:return: transcription response
"""
# Build the generation config; thinking budgets are only supported by the
# newer google-genai SDK and are therefore not applied here
generation_config = {
"max_output_tokens": 2048,
}
prompt = "Generate a transcript of the speech. The language of the transcript should **match the language of the speech**."
try:
# Pass the audio as inline data alongside the prompt
model = genai.GenerativeModel(model_info.model_identifier, safety_settings=SAFETY_SETTINGS)
raw_response: GenerateContentResponse = model.generate_content(
contents=[
{
"role": "user",
"parts": [
{"text": prompt},
{"inline_data": {"mime_type": "audio/wav", "data": base64.b64decode(audio_base64)}},
],
}
],
generation_config=generation_config,
)
resp, usage_record = _default_normal_response_parser(raw_response)
except (ClientError, ServerError) as e:
# Re-wrap ClientError and ServerError as RespNotOkException
raise RespNotOkException(e.code) from None
except Exception as e:
raise NetworkConnectionError() from e
if usage_record:
resp.usage = UsageRecord(
model_name=model_info.name,
provider_name=model_info.api_provider,
prompt_tokens=usage_record[0],
completion_tokens=usage_record[1],
total_tokens=usage_record[2],
)
return resp
def get_support_image_formats(self) -> list[str]:
"""
Get the supported image formats
:return: list of supported image formats
"""
return ["png", "jpg", "jpeg", "webp", "heic", "heif"]

src/utils/__init__.py (new file, +3 lines)

@@ -0,0 +1,3 @@
"""
Utility modules
"""

src/utils/timing_utils.py (new file, +206 lines)

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
Time-interval utility functions
Normal-distribution interval calculation for the proactive thinking feature, with 3-sigma-rule support
"""
import numpy as np
from typing import Optional
def get_normal_distributed_interval(
base_interval: int,
sigma_percentage: float = 0.1,
min_interval: Optional[int] = None,
max_interval: Optional[int] = None,
use_3sigma_rule: bool = True
) -> int:
"""
Get a normally distributed time interval (based on the 3-sigma rule)
Args:
base_interval: base interval in seconds, used as the mean μ of the distribution
sigma_percentage: standard deviation as a fraction of the base interval (default 10%)
min_interval: minimum interval in seconds, to prevent overly short intervals
max_interval: maximum interval in seconds, to prevent overly long intervals
use_3sigma_rule: whether to bound the distribution with the 3-sigma rule (default True)
Returns:
int: normally distributed time interval in seconds
Example:
>>> # Base interval 1500s (25 minutes), standard deviation 150s (10%)
>>> interval = get_normal_distributed_interval(1500, 0.1)
>>> # 99.7% of values fall within μ±3σ: 1500±450 = [1050, 1950]
"""
# 🚨 Basic input protection: handle negative values
if base_interval < 0:
base_interval = abs(base_interval)
if sigma_percentage < 0:
sigma_percentage = abs(sigma_percentage)
# Special case: base interval of 0 switches to pure-random mode
if base_interval == 0:
if sigma_percentage == 0:
return 1 # both zero: return 1 second
return _generate_pure_random_interval(sigma_percentage, min_interval, max_interval, use_3sigma_rule)
# Special case: sigma of 0 returns the fixed interval
if sigma_percentage == 0:
return base_interval
# Compute the standard deviation
sigma = base_interval * sigma_percentage
# 📊 3-sigma rule: 99.7% of the data falls within μ±3σ
if use_3sigma_rule:
three_sigma_min = base_interval - 3 * sigma
three_sigma_max = base_interval + 3 * sigma
# Keep the 3-sigma bounds sane
three_sigma_min = max(1, three_sigma_min) # at least 1 second
three_sigma_max = max(three_sigma_min + 1, three_sigma_max) # ensure max > min
# Apply user-provided bounds (when stricter)
if min_interval is not None:
three_sigma_min = max(three_sigma_min, min_interval)
if max_interval is not None:
three_sigma_max = min(three_sigma_max, max_interval)
effective_min = int(three_sigma_min)
effective_max = int(three_sigma_max)
else:
# Without the 3-sigma rule, use looser bounds
effective_min = max(1, min_interval or 1)
effective_max = max(effective_min + 1, max_interval or int(base_interval * 50))
# 🎲 Draw normally distributed random values
max_attempts = 50 # under the 3-sigma rule the success rate is ~99.7%, so 50 attempts suffice
for attempt in range(max_attempts):
# Draw a normally distributed value
value = np.random.normal(loc=base_interval, scale=sigma)
# 💡 Key step: take the absolute value of negatives to preserve the distribution shape
if value < 0:
value = abs(value)
# Round to an integer
interval = int(round(value))
# Check whether the value lies in the effective range
if effective_min <= interval <= effective_max:
return interval
# If all 50 attempts fail, fall back to a uniform value within the 3-sigma range
return int(np.random.uniform(effective_min, effective_max))
def _generate_pure_random_interval(
sigma_percentage: float,
min_interval: Optional[int] = None,
max_interval: Optional[int] = None,
use_3sigma_rule: bool = True
) -> int:
"""
Pure-random mode for base_interval=0 (based on the 3-sigma rule)
Args:
sigma_percentage: standard deviation percentage, converted to an actual time value
min_interval: minimum interval
max_interval: maximum interval
use_3sigma_rule: whether to use the 3-sigma rule
Returns:
int: randomly generated time interval in seconds
"""
# Convert the percentage into an actual time value (1000 seconds as the reference base)
# e.g. sigma_percentage=0.3 -> sigma=300 seconds
base_reference = 1000 # reference duration
sigma = abs(sigma_percentage) * base_reference
# Use sigma as the mean and sigma/3 as the standard deviation,
# so the 3σ range is roughly [0, 2*sigma]
mean = sigma
std = sigma / 3
if use_3sigma_rule:
# 3-sigma bounds: μ±3σ = sigma±3*(sigma/3) = sigma±sigma = [0, 2*sigma]
three_sigma_min = max(1, mean - 3 * std) # theoretically ~0, but at least 1 second
three_sigma_max = mean + 3 * std # roughly 2*sigma
# Apply user-provided bounds
if min_interval is not None:
three_sigma_min = max(three_sigma_min, min_interval)
if max_interval is not None:
three_sigma_max = min(three_sigma_max, max_interval)
effective_min = int(three_sigma_min)
effective_max = int(three_sigma_max)
else:
# Without the 3-sigma rule
effective_min = max(1, min_interval or 1)
effective_max = max(effective_min + 1, max_interval or int(mean * 10))
# Draw random values
for _ in range(50):
value = np.random.normal(loc=mean, scale=std)
# Take the absolute value of negatives
if value < 0:
value = abs(value)
interval = int(round(value))
if effective_min <= interval <= effective_max:
return interval
# Fallback
return int(np.random.uniform(effective_min, effective_max))
def format_time_duration(seconds: int) -> str:
"""
Format a number of seconds as a human-readable duration
Args:
seconds: number of seconds
Returns:
str: formatted duration string, e.g. "2小时30分15秒"
"""
if seconds < 60:
return f"{seconds}秒"
minutes = seconds // 60
remaining_seconds = seconds % 60
if minutes < 60:
if remaining_seconds > 0:
return f"{minutes}分{remaining_seconds}秒"
else:
return f"{minutes}分"
hours = minutes // 60
remaining_minutes = minutes % 60
if hours < 24:
if remaining_minutes > 0 and remaining_seconds > 0:
return f"{hours}小时{remaining_minutes}分{remaining_seconds}秒"
elif remaining_minutes > 0:
return f"{hours}小时{remaining_minutes}分"
else:
return f"{hours}小时"
days = hours // 24
remaining_hours = hours % 24
if remaining_hours > 0:
return f"{days}天{remaining_hours}小时"
else:
return f"{days}天"


@@ -120,9 +120,33 @@ talk_frequency_adjust = [
# [["", "8:00,1", "12:00,2", "18:00,1.5", "00:00,0.5"]]
# Proactive thinking configuration (only takes effect in focus mode)
enable_proactive_thinking = false # whether to enable proactive thinking
proactive_thinking_interval = 1500 # trigger interval for proactive thinking, default 1500 seconds (25 minutes)
The_scope_that_proactive_thinking_can_trigger = "all" # scope in which proactive thinking can trigger (all = everywhere; private = private chats; group = group chats)
# TIPS:
# Creative usage: set the interval to 0 to generate purely random intervals based on delta_sigma
# Negative-value safety: negative values are automatically replaced with their absolute value
proactive_thinking_in_private = true # allow proactive thinking in private chats
proactive_thinking_in_group = true # allow proactive thinking in group chats
proactive_thinking_enable_ids = [123456, 234567] # chats where proactive thinking is enabled, group and private alike; empty means no restriction
delta_sigma = 120 # standard deviation of the normal distribution; controls how random the interval is
# Special usage:
# - 0: disables the normal distribution and uses a fixed interval
# - a very large value (e.g. 6000): produces highly random intervals, and works even with a base interval of 0
# - negative values are converted to positive automatically, so misconfiguration and extreme edge cases are safe
# Experiment suggestion: try proactive_thinking_interval=0 with a very large delta_sigma for pure-random mode!
# Guarantee: generated intervals are always positive (negatives take their absolute value), at least 1 second and at most 24 hours
enable_ids = [] # chats where proactive thinking is enabled, group and private alike; empty means no restriction
# Proactive thinking prompt template; {time} is replaced with the actual silence duration (e.g. "2小时30分15秒")
proactive_thinking_prompt_template = """现在当前的聊天里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择:
1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时)
2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时)
请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。"""
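
To preview what the experiment suggested above actually produces, here is a small sketch that samples the pure-random mode; it reuses the internal mapping sigma_percentage = delta_sigma / 1000 that _get_dynamic_thinking_interval applies when the base interval is 0:

```python
from src.utils.timing_utils import format_time_duration, get_normal_distributed_interval

# proactive_thinking_interval = 0 with a large delta_sigma (e.g. 6000):
# internally sigma_percentage = delta_sigma / 1000 -> 6.0, i.e. sigma = 6000s
delta_sigma = 6000
for _ in range(5):
    interval = get_normal_distributed_interval(
        0, sigma_percentage=delta_sigma / 1000, min_interval=1, max_interval=86400
    )
    print(format_time_duration(interval))  # samples land in roughly [1, 12000] seconds
```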
# Example per-chat-stream configuration: