This commit is contained in:
SengokuCola
2025-04-13 21:16:32 +08:00
10 changed files with 294 additions and 143 deletions

View File

@@ -9,6 +9,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
with:
fetch-depth: 0
ref: ${{ github.head_ref || github.ref_name }}
- uses: astral-sh/ruff-action@v3 - uses: astral-sh/ruff-action@v3
- run: ruff check --fix - run: ruff check --fix
- run: ruff format - run: ruff format

4
.gitignore vendored
View File

@@ -28,6 +28,9 @@ config/bot_config.toml
config/bot_config.toml.bak config/bot_config.toml.bak
src/plugins/remote/client_uuid.json src/plugins/remote/client_uuid.json
run_none.bat run_none.bat
(测试版)麦麦生成人格.bat
(临时版)麦麦开始学习.bat
src/plugins/utils/statistic.py
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
@@ -237,3 +240,4 @@ logs
/config/* /config/*
run_none.bat run_none.bat
config/old/bot_config_20250405_212257.toml config/old/bot_config_20250405_212257.toml

View File

@@ -19,6 +19,7 @@ from ...chat.chat_stream import chat_manager
from ...person_info.relationship_manager import relationship_manager from ...person_info.relationship_manager import relationship_manager
from ...chat.message_buffer import message_buffer from ...chat.message_buffer import message_buffer
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from ...utils.timer_calculater import Timer
# 定义日志配置 # 定义日志配置
chat_config = LogConfig( chat_config = LogConfig(
@@ -173,12 +174,10 @@ class ReasoningChat:
await self.storage.store_message(message, chat) await self.storage.store_message(message, chat)
# 记忆激活 # 记忆激活
timer1 = time.time() with Timer("记忆激活", timing_results):
interested_rate = await HippocampusManager.get_instance().get_activate_from_text( interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
message.processed_plain_text, fast_retrieval=True message.processed_plain_text, fast_retrieval=True
) )
timer2 = time.time()
timing_results["记忆激活"] = timer2 - timer1
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text # 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
buffer_result = await message_buffer.query_buffer_result(message) buffer_result = await message_buffer.query_buffer_result(message)
@@ -228,10 +227,8 @@ class ReasoningChat:
await willing_manager.before_generate_reply_handle(message.message_info.message_id) await willing_manager.before_generate_reply_handle(message.message_info.message_id)
# 创建思考消息 # 创建思考消息
timer1 = time.time() with Timer("创建思考消息", timing_results):
thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo)
timer2 = time.time()
timing_results["创建思考消息"] = timer2 - timer1
logger.debug(f"创建捕捉器thinking_id:{thinking_id}") logger.debug(f"创建捕捉器thinking_id:{thinking_id}")
@@ -239,11 +236,9 @@ class ReasoningChat:
info_catcher.catch_decide_to_response(message) info_catcher.catch_decide_to_response(message)
# 生成回复 # 生成回复
timer1 = time.time()
try: try:
with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(message, thinking_id) response_set = await self.gpt.generate_response(message, thinking_id)
timer2 = time.time()
timing_results["生成回复"] = timer2 - timer1
info_catcher.catch_after_generate_response(timing_results["生成回复"]) info_catcher.catch_after_generate_response(timing_results["生成回复"])
except Exception as e: except Exception as e:
@@ -255,26 +250,20 @@ class ReasoningChat:
return return
# 发送消息 # 发送消息
timer1 = time.time() with Timer("发送消息", timing_results):
first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id) first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)
timer2 = time.time()
timing_results["发送消息"] = timer2 - timer1
info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg) info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg)
info_catcher.done_catch() info_catcher.done_catch()
# 处理表情包 # 处理表情包
timer1 = time.time() with Timer("处理表情包", timing_results):
await self._handle_emoji(message, chat, response_set) await self._handle_emoji(message, chat, response_set)
timer2 = time.time()
timing_results["处理表情包"] = timer2 - timer1
# 更新关系情绪 # 更新关系情绪
timer1 = time.time() with Timer("更新关系情绪", timing_results):
await self._update_relationship(message, response_set) await self._update_relationship(message, response_set)
timer2 = time.time()
timing_results["更新关系情绪"] = timer2 - timer1
# 回复后处理 # 回复后处理
await willing_manager.after_generate_reply_handle(message.message_info.message_id) await willing_manager.after_generate_reply_handle(message.message_info.message_id)

View File

@@ -1,4 +1,3 @@
import time
from typing import List, Optional, Tuple, Union from typing import List, Optional, Tuple, Union
import random import random
@@ -7,6 +6,7 @@ from ...config.config import global_config
from ...chat.message import MessageThinking from ...chat.message import MessageThinking
from .reasoning_prompt_builder import prompt_builder from .reasoning_prompt_builder import prompt_builder
from ...chat.utils import process_llm_response from ...chat.utils import process_llm_response
from ...utils.timer_calculater import Timer
from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
@@ -82,15 +82,14 @@ class ResponseGenerator:
logger.debug("开始使用生成回复-2") logger.debug("开始使用生成回复-2")
# 构建prompt # 构建prompt
timer1 = time.time() with Timer() as t_build_prompt:
prompt = await prompt_builder._build_prompt( prompt = await prompt_builder._build_prompt(
message.chat_stream, message.chat_stream,
message_txt=message.processed_plain_text, message_txt=message.processed_plain_text,
sender_name=sender_name, sender_name=sender_name,
stream_id=message.chat_stream.stream_id, stream_id=message.chat_stream.stream_id,
) )
timer2 = time.time() logger.info(f"构建prompt时间: {t_build_prompt.human_readable}")
logger.info(f"构建prompt时间: {timer2 - timer1}")
try: try:
content, reasoning_content, self.current_model_name = await model.generate_response(prompt) content, reasoning_content, self.current_model_name = await model.generate_response(prompt)

View File

@@ -175,7 +175,7 @@ class PromptBuilder:
# moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。
# 涉及政治敏感以及违法违规的内容请规避。""" # 涉及政治敏感以及违法违规的内容请规避。"""
logger.info("开始构建prompt") logger.debug("开始构建prompt")
# prompt = f""" # prompt = f"""
# {relation_prompt_all} # {relation_prompt_all}

View File

@@ -20,6 +20,7 @@ from ...chat.chat_stream import chat_manager
from ...person_info.relationship_manager import relationship_manager from ...person_info.relationship_manager import relationship_manager
from ...chat.message_buffer import message_buffer from ...chat.message_buffer import message_buffer
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from ...utils.timer_calculater import Timer
# 定义日志配置 # 定义日志配置
chat_config = LogConfig( chat_config = LogConfig(
@@ -196,12 +197,10 @@ class ThinkFlowChat:
logger.debug(f"存储成功{message.processed_plain_text}") logger.debug(f"存储成功{message.processed_plain_text}")
# 记忆激活 # 记忆激活
timer1 = time.time() with Timer("记忆激活", timing_results):
interested_rate = await HippocampusManager.get_instance().get_activate_from_text( interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
message.processed_plain_text, fast_retrieval=True message.processed_plain_text, fast_retrieval=True
) )
timer2 = time.time()
timing_results["记忆激活"] = timer2 - timer1
logger.debug(f"记忆激活: {interested_rate}") logger.debug(f"记忆激活: {interested_rate}")
# 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text # 查询缓冲器结果会整合前面跳过的消息改变processed_plain_text
@@ -262,10 +261,8 @@ class ThinkFlowChat:
# 创建思考消息 # 创建思考消息
try: try:
timer1 = time.time() with Timer("创建思考消息", timing_results):
thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo) thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo)
timer2 = time.time()
timing_results["创建思考消息"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流创建思考消息失败: {e}") logger.error(f"心流创建思考消息失败: {e}")
@@ -276,10 +273,8 @@ class ThinkFlowChat:
try: try:
# 观察 # 观察
timer1 = time.time() with Timer("观察", timing_results):
await heartflow.get_subheartflow(chat.stream_id).do_observe() await heartflow.get_subheartflow(chat.stream_id).do_observe()
timer2 = time.time()
timing_results["观察"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流观察失败: {e}") logger.error(f"心流观察失败: {e}")
@@ -287,24 +282,22 @@ class ThinkFlowChat:
# 思考前脑内状态 # 思考前脑内状态
try: try:
timer1 = time.time() with Timer("思考前脑内状态", timing_results):
current_mind, past_mind = await heartflow.get_subheartflow(chat.stream_id).do_thinking_before_reply( current_mind, past_mind = await heartflow.get_subheartflow(
chat.stream_id
).do_thinking_before_reply(
message_txt=message.processed_plain_text, message_txt=message.processed_plain_text,
sender_name=message.message_info.user_info.user_nickname, sender_name=message.message_info.user_info.user_nickname,
chat_stream=chat, chat_stream=chat,
) )
timer2 = time.time()
timing_results["思考前脑内状态"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流思考前脑内状态失败: {e}") logger.error(f"心流思考前脑内状态失败: {e}")
info_catcher.catch_afer_shf_step(timing_results["思考前脑内状态"], past_mind, current_mind) info_catcher.catch_afer_shf_step(timing_results["思考前脑内状态"], past_mind, current_mind)
# 生成回复 # 生成回复
timer1 = time.time() with Timer("生成回复", timing_results):
response_set = await self.gpt.generate_response(message, thinking_id) response_set = await self.gpt.generate_response(message, thinking_id)
timer2 = time.time()
timing_results["生成回复"] = timer2 - timer1
info_catcher.catch_after_generate_response(timing_results["生成回复"]) info_catcher.catch_after_generate_response(timing_results["生成回复"])
@@ -314,10 +307,8 @@ class ThinkFlowChat:
# 发送消息 # 发送消息
try: try:
timer1 = time.time() with Timer("发送消息", timing_results):
first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id) first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)
timer2 = time.time()
timing_results["发送消息"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流发送消息失败: {e}") logger.error(f"心流发送消息失败: {e}")
@@ -327,28 +318,22 @@ class ThinkFlowChat:
# 处理表情包 # 处理表情包
try: try:
timer1 = time.time() with Timer("处理表情包", timing_results):
await self._handle_emoji(message, chat, response_set) await self._handle_emoji(message, chat, response_set)
timer2 = time.time()
timing_results["处理表情包"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流处理表情包失败: {e}") logger.error(f"心流处理表情包失败: {e}")
# 更新心流 # 更新心流
try: try:
timer1 = time.time() with Timer("更新心流", timing_results):
await self._update_using_response(message, response_set) await self._update_using_response(message, response_set)
timer2 = time.time()
timing_results["更新心流"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流更新失败: {e}") logger.error(f"心流更新失败: {e}")
# 更新关系情绪 # 更新关系情绪
try: try:
timer1 = time.time() with Timer("更新关系情绪", timing_results):
await self._update_relationship(message, response_set) await self._update_relationship(message, response_set)
timer2 = time.time()
timing_results["更新关系情绪"] = timer2 - timer1
except Exception as e: except Exception as e:
logger.error(f"心流更新关系情绪失败: {e}") logger.error(f"心流更新关系情绪失败: {e}")

View File

@@ -1,4 +1,3 @@
import time
from typing import List, Optional from typing import List, Optional
import random import random
@@ -10,6 +9,7 @@ from .think_flow_prompt_builder import prompt_builder
from ...chat.utils import process_llm_response from ...chat.utils import process_llm_response
from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG
from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager
from ...utils.timer_calculater import Timer
from src.plugins.moods.moods import MoodManager from src.plugins.moods.moods import MoodManager
@@ -44,8 +44,7 @@ class ResponseGenerator:
arousal_multiplier = MoodManager.get_instance().get_arousal_multiplier() arousal_multiplier = MoodManager.get_instance().get_arousal_multiplier()
time1 = time.time() with Timer() as t_generate_response:
checked = False checked = False
if random.random() > 0: if random.random() > 0:
checked = False checked = False
@@ -70,15 +69,15 @@ class ResponseGenerator:
message, model_response, current_model, thinking_id message, model_response, current_model, thinking_id
) )
time2 = time.time()
if model_response: if model_response:
if checked: if checked:
logger.info( logger.info(
f"{global_config.BOT_NICKNAME}的回复是:{model_response},思忖后,回复是:{model_checked_response},生成回复时间: {time2 - time1}" f"{global_config.BOT_NICKNAME}的回复是:{model_response},思忖后,回复是:{model_checked_response},生成回复时间: {t_generate_response.human_readable}"
) )
else: else:
logger.info(f"{global_config.BOT_NICKNAME}的回复是:{model_response},生成回复时间: {time2 - time1}") logger.info(
f"{global_config.BOT_NICKNAME}的回复是:{model_response},生成回复时间: {t_generate_response.human_readable}"
)
model_processed_response = await self._process_response(model_checked_response) model_processed_response = await self._process_response(model_checked_response)
@@ -105,7 +104,7 @@ class ResponseGenerator:
sender_name = f"用户({message.chat_stream.user_info.user_id})" sender_name = f"用户({message.chat_stream.user_info.user_id})"
# 构建prompt # 构建prompt
timer1 = time.time() with Timer() as t_build_prompt:
if mode == "normal": if mode == "normal":
prompt = await prompt_builder._build_prompt( prompt = await prompt_builder._build_prompt(
message.chat_stream, message.chat_stream,
@@ -120,8 +119,7 @@ class ResponseGenerator:
sender_name=sender_name, sender_name=sender_name,
stream_id=message.chat_stream.stream_id, stream_id=message.chat_stream.stream_id,
) )
timer2 = time.time() logger.info(f"构建{mode}prompt时间: {t_build_prompt.human_readable}")
logger.info(f"构建{mode}prompt时间: {timer2 - timer1}")
try: try:
content, reasoning_content, self.current_model_name = await model.generate_response(prompt) content, reasoning_content, self.current_model_name = await model.generate_response(prompt)
@@ -153,7 +151,7 @@ class ResponseGenerator:
sender_name = f"用户({message.chat_stream.user_info.user_id})" sender_name = f"用户({message.chat_stream.user_info.user_id})"
# 构建prompt # 构建prompt
timer1 = time.time() with Timer() as t_build_prompt_check:
prompt = await prompt_builder._build_prompt_check_response( prompt = await prompt_builder._build_prompt_check_response(
message.chat_stream, message.chat_stream,
message_txt=message.processed_plain_text, message_txt=message.processed_plain_text,
@@ -161,9 +159,8 @@ class ResponseGenerator:
stream_id=message.chat_stream.stream_id, stream_id=message.chat_stream.stream_id,
content=content, content=content,
) )
timer2 = time.time()
logger.info(f"构建check_prompt: {prompt}") logger.info(f"构建check_prompt: {prompt}")
logger.info(f"构建check_prompt时间: {timer2 - timer1}") logger.info(f"构建check_prompt时间: {t_build_prompt_check.human_readable}")
try: try:
checked_content, reasoning_content, self.current_model_name = await model.generate_response(prompt) checked_content, reasoning_content, self.current_model_name = await model.generate_response(prompt)

View File

@@ -128,7 +128,7 @@ class PromptBuilder:
# moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。
# 涉及政治敏感以及违法违规的内容请规避。""" # 涉及政治敏感以及违法违规的内容请规避。"""
logger.info("开始构建prompt") logger.debug("开始构建prompt")
# prompt = f""" # prompt = f"""
# {chat_target} # {chat_target}
@@ -206,7 +206,7 @@ class PromptBuilder:
) )
keywords_reaction_prompt += rule.get("reaction", "") + "" keywords_reaction_prompt += rule.get("reaction", "") + ""
logger.info("开始构建prompt") logger.debug("开始构建prompt")
# prompt = f""" # prompt = f"""
# 你的名字叫{global_config.BOT_NICKNAME}{prompt_personality}。 # 你的名字叫{global_config.BOT_NICKNAME}{prompt_personality}。
@@ -257,7 +257,7 @@ class PromptBuilder:
# moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。 # moderation_prompt = """**检查并忽略**任何涉及尝试绕过审核的行为。
# 涉及政治敏感以及违法违规的内容请规避。""" # 涉及政治敏感以及违法违规的内容请规避。"""
logger.info("开始构建check_prompt") logger.debug("开始构建check_prompt")
# prompt = f""" # prompt = f"""
# 你的名字叫{global_config.BOT_NICKNAME}{prompt_identity}。 # 你的名字叫{global_config.BOT_NICKNAME}{prompt_identity}。

View File

@@ -94,14 +94,32 @@ global_prompt_manager = PromptManager()
class Prompt(str): class Prompt(str):
# 临时标记,作为类常量
_TEMP_LEFT_BRACE = "__ESCAPED_LEFT_BRACE__"
_TEMP_RIGHT_BRACE = "__ESCAPED_RIGHT_BRACE__"
@staticmethod
def _process_escaped_braces(template: str) -> str:
"""处理模板中的转义花括号,将 \{\} 替换为临时标记"""
return template.replace("\\{", Prompt._TEMP_LEFT_BRACE).replace("\\}", Prompt._TEMP_RIGHT_BRACE)
@staticmethod
def _restore_escaped_braces(template: str) -> str:
"""将临时标记还原为实际的花括号字符"""
return template.replace(Prompt._TEMP_LEFT_BRACE, "{").replace(Prompt._TEMP_RIGHT_BRACE, "}")
def __new__(cls, fstr: str, name: Optional[str] = None, args: Union[List[Any], tuple[Any, ...]] = None, **kwargs): def __new__(cls, fstr: str, name: Optional[str] = None, args: Union[List[Any], tuple[Any, ...]] = None, **kwargs):
# 如果传入的是元组,转换为列表 # 如果传入的是元组,转换为列表
if isinstance(args, tuple): if isinstance(args, tuple):
args = list(args) args = list(args)
should_register = kwargs.pop("_should_register", True) should_register = kwargs.pop("_should_register", True)
# 预处理模板中的转义花括号
processed_fstr = cls._process_escaped_braces(fstr)
# 解析模板 # 解析模板
template_args = [] template_args = []
result = re.findall(r"\{(.*?)\}", fstr) result = re.findall(r"\{(.*?)\}", processed_fstr)
for expr in result: for expr in result:
if expr and expr not in template_args: if expr and expr not in template_args:
template_args.append(expr) template_args.append(expr)
@@ -142,8 +160,11 @@ class Prompt(str):
@classmethod @classmethod
def _format_template(cls, template: str, args: List[Any] = None, kwargs: Dict[str, Any] = None) -> str: def _format_template(cls, template: str, args: List[Any] = None, kwargs: Dict[str, Any] = None) -> str:
# 预处理模板中的转义花括号
processed_template = cls._process_escaped_braces(template)
template_args = [] template_args = []
result = re.findall(r"\{(.*?)\}", template) result = re.findall(r"\{(.*?)\}", processed_template)
for expr in result: for expr in result:
if expr and expr not in template_args: if expr and expr not in template_args:
template_args.append(expr) template_args.append(expr)
@@ -177,13 +198,15 @@ class Prompt(str):
try: try:
# 先用位置参数格式化 # 先用位置参数格式化
if args: if args:
template = template.format(**formatted_args) processed_template = processed_template.format(**formatted_args)
# 再用关键字参数格式化 # 再用关键字参数格式化
if kwargs: if kwargs:
template = template.format(**formatted_kwargs) processed_template = processed_template.format(**formatted_kwargs)
return template
# 将临时标记还原为实际的花括号
result = cls._restore_escaped_braces(processed_template)
return result
except (IndexError, KeyError) as e: except (IndexError, KeyError) as e:
raise ValueError( raise ValueError(
f"格式化模板失败: {template}, args={formatted_args}, kwargs={formatted_kwargs} {str(e)}" f"格式化模板失败: {template}, args={formatted_args}, kwargs={formatted_kwargs} {str(e)}"

View File

@@ -0,0 +1,151 @@
from time import perf_counter
from functools import wraps
from typing import Optional, Dict, Callable
import asyncio
"""
# 更好的计时器
使用形式:
- 上下文
- 装饰器
- 直接实例化
使用场景:
- 使用Timer在需要测量代码执行时间时如性能测试、计时器工具Timer类是更可靠、高精度的选择。
- 使用time.time()的场景:当需要记录实际时间点(如日志、时间戳)时使用,但避免用它测量时间间隔。
使用方式:
【装饰器】
time_dict = {}
@Timer("计数", time_dict)
def func():
pass
print(time_dict)
【上下文_1】
def func():
with Timer() as t:
pass
print(t)
print(t.human_readable)
【上下文_2】
def func():
time_dict = {}
with Timer("计数", time_dict):
pass
print(time_dict)
【直接实例化】
a = Timer()
print(a) # 直接输出当前 perf_counter 值
参数:
- name计时器的名字默认为 None
- storage计时器结果存储字典默认为 None
- auto_unit自动选择单位毫秒或秒默认为 True自动根据时间切换毫秒或秒
- do_type_check是否进行类型检查默认为 False不进行类型检查
属性human_readable
自定义错误TimerTypeError
"""
class TimerTypeError(TypeError):
"""自定义类型错误"""
__slots__ = ()
def __init__(self, param, expected_type, actual_type):
super().__init__(f"参数 '{param}' 类型错误,期望 {expected_type},实际得到 {actual_type.__name__}")
class Timer:
"""
Timer 支持三种模式:
1. 装饰器模式:用于测量函数/协程运行时间
2. 上下文管理器模式:用于 with 语句块内部计时
3. 直接实例化:如果不调用 __enter__打印对象时将显示当前 perf_counter 的值
"""
__slots__ = ("name", "storage", "elapsed", "auto_unit", "start")
def __init__(
self,
name: Optional[str] = None,
storage: Optional[Dict[str, float]] = None,
auto_unit: bool = True,
do_type_check: bool = False,
):
if do_type_check:
self._validate_types(name, storage)
self.name = name
self.storage = storage
self.elapsed = None
self.auto_unit = auto_unit
self.start = None
def _validate_types(self, name, storage):
"""类型检查"""
if name is not None and not isinstance(name, str):
raise TimerTypeError("name", "Optional[str]", type(name))
if storage is not None and not isinstance(storage, dict):
raise TimerTypeError("storage", "Optional[dict]", type(storage))
def __call__(self, func: Optional[Callable] = None) -> Callable:
"""装饰器模式"""
if func is None:
return lambda f: Timer(name=self.name or f.__name__, storage=self.storage, auto_unit=self.auto_unit)(f)
@wraps(func)
async def async_wrapper(*args, **kwargs):
with self:
return await func(*args, **kwargs)
@wraps(func)
def sync_wrapper(*args, **kwargs):
with self:
return func(*args, **kwargs)
wrapper = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
wrapper.__timer__ = self # 保留计时器引用
return wrapper
def __enter__(self):
"""上下文管理器入口"""
self.start = perf_counter()
return self
def __exit__(self, *args):
self.elapsed = perf_counter() - self.start
self._record_time()
return False
def _record_time(self):
"""记录时间"""
if self.storage is not None and self.name:
self.storage[self.name] = self.elapsed
@property
def human_readable(self) -> str:
"""人类可读时间格式"""
if self.elapsed is None:
return "未计时"
if self.auto_unit:
return f"{self.elapsed * 1000:.2f}毫秒" if self.elapsed < 1 else f"{self.elapsed:.2f}"
return f"{self.elapsed:.4f}"
def __str__(self):
if self.start is not None:
if self.elapsed is None:
current_elapsed = perf_counter() - self.start
return f"<Timer {self.name or '匿名'} [计时中: {current_elapsed:.4f}秒]>"
return f"<Timer {self.name or '匿名'} [{self.human_readable}]>"
return f"{perf_counter()}"