fix: Update type hints to use newer Python syntax

- Replace Dict, List, Optional with dict, list,  < /dev/null |  None syntax
- Fix abstract method implementation in message.py
- Improve type annotations and function return types
- Remove unreachable code in get_current_task_tool.py
- Refactor HTML elements to use style attributes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
晴猫
2025-05-01 06:55:05 +09:00
parent 3d001da30e
commit 263e8d196a
29 changed files with 125 additions and 143 deletions

View File

@@ -23,6 +23,7 @@ class ChatObserver:
Args:
stream_id: 聊天流ID
private_name: 私聊名称
Returns:
ChatObserver: 观察器实例

View File

@@ -33,6 +33,7 @@ class PFCManager:
Args:
stream_id: 聊天流ID
private_name: 私聊名称
Returns:
Optional[Conversation]: 对话实例创建失败则返回None

View File

@@ -18,6 +18,7 @@ def get_items_from_json(
Args:
content: 包含JSON的文本
private_name: 私聊名称
*items: 要提取的字段名
default_values: 字段的默认值,格式为 {字段名: 默认值}
required_types: 字段的必需类型,格式为 {字段名: 类型}

View File

@@ -29,6 +29,8 @@ class ReplyChecker:
Args:
reply: 生成的回复
goal: 对话目标
chat_history: 对话历史记录
chat_history_text: 对话历史记录文本
retry_count: 当前重试次数
Returns:

View File

@@ -1,6 +1,7 @@
import time
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from typing import Optional, Any
import urllib3
@@ -58,12 +59,37 @@ class Message(MessageBase):
# 回复消息
self.reply = reply
async def _process_message_segments(self, segment: Seg) -> str:
"""递归处理消息段,转换为文字描述
Args:
segment: 要处理的消息段
Returns:
str: 处理后的文本
"""
if segment.type == "seglist":
# 处理消息段列表
segments_text = []
for seg in segment.data:
processed = await self._process_message_segments(seg)
if processed:
segments_text.append(processed)
return " ".join(segments_text)
else:
# 处理单个消息段
return await self._process_single_segment(segment)
@abstractmethod
async def _process_single_segment(self, segment):
pass
@dataclass
class MessageRecv(Message):
"""接收消息类用于处理从MessageCQ序列化的消息"""
def __init__(self, message_dict: Dict):
def __init__(self, message_dict: dict[str, Any]):
"""从MessageCQ的字典初始化
Args:
@@ -90,26 +116,7 @@ class MessageRecv(Message):
self.processed_plain_text = await self._process_message_segments(self.message_segment)
self.detailed_plain_text = self._generate_detailed_text()
async def _process_message_segments(self, segment: Seg) -> str:
"""递归处理消息段,转换为文字描述
Args:
segment: 要处理的消息段
Returns:
str: 处理后的文本
"""
if segment.type == "seglist":
# 处理消息段列表
segments_text = []
for seg in segment.data:
processed = await self._process_message_segments(seg)
if processed:
segments_text.append(processed)
return " ".join(segments_text)
else:
# 处理单个消息段
return await self._process_single_segment(segment)
async def _process_single_segment(self, seg: Seg) -> str:
"""处理单个消息段
@@ -179,28 +186,7 @@ class MessageProcessBase(Message):
self.thinking_time = round(time.time() - self.thinking_start_time, 2)
return self.thinking_time
async def _process_message_segments(self, segment: Seg) -> str:
"""递归处理消息段,转换为文字描述
Args:
segment: 要处理的消息段
Returns:
str: 处理后的文本
"""
if segment.type == "seglist":
# 处理消息段列表
segments_text = []
for seg in segment.data:
processed = await self._process_message_segments(seg)
if processed:
segments_text.append(processed)
return " ".join(segments_text)
else:
# 处理单个消息段
return await self._process_single_segment(segment)
async def _process_single_segment(self, seg: Seg) -> Union[str, None]:
async def _process_single_segment(self, seg: Seg) -> str | None:
"""处理单个消息段
Args:
@@ -278,7 +264,7 @@ class MessageSending(MessageProcessBase):
message_id: str,
chat_stream: ChatStream,
bot_user_info: UserInfo,
sender_info: UserInfo, # 用来记录发送者信息,用于私聊回复
sender_info: UserInfo | None, # 用来记录发送者信息,用于私聊回复
message_segment: Seg,
reply: Optional["MessageRecv"] = None,
is_head: bool = False,
@@ -303,7 +289,7 @@ class MessageSending(MessageProcessBase):
self.is_emoji = is_emoji
self.apply_set_reply_logic = apply_set_reply_logic
def set_reply(self, reply: Optional["MessageRecv"] = None) -> None:
def set_reply(self, reply: Optional["MessageRecv"] = None):
"""设置回复消息"""
if self.message_info.format_info is not None and "reply" in self.message_info.format_info.accept_format:
if reply:
@@ -317,7 +303,6 @@ class MessageSending(MessageProcessBase):
self.message_segment,
],
)
return self
async def process(self) -> None:
"""处理消息内容,生成纯文本和详细文本"""
@@ -342,6 +327,7 @@ class MessageSending(MessageProcessBase):
reply=thinking.reply,
is_head=is_head,
is_emoji=is_emoji,
sender_info=None,
)
def to_dict(self):
@@ -361,7 +347,7 @@ class MessageSet:
def __init__(self, chat_stream: ChatStream, message_id: str):
self.chat_stream = chat_stream
self.message_id = message_id
self.messages: List[MessageSending] = []
self.messages: list[MessageSending] = []
self.time = round(time.time(), 3) # 保留3位小数
def add_message(self, message: MessageSending) -> None:

View File

@@ -1,7 +1,7 @@
# src/plugins/chat/message_sender.py
import asyncio
import time
from typing import Dict, List, Optional, Union
from typing import Union
# from ...common.database import db # 数据库依赖似乎不需要了,注释掉
from ..message.api import global_api
@@ -70,7 +70,7 @@ class MessageContainer:
def __init__(self, chat_id: str, max_size: int = 100):
self.chat_id = chat_id
self.max_size = max_size
self.messages: List[Union[MessageThinking, MessageSending]] = [] # 明确类型
self.messages: list[MessageThinking | MessageSending] = [] # 明确类型
self.last_send_time = 0
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并
@@ -78,7 +78,7 @@ class MessageContainer:
"""计算当前容器中思考消息的数量"""
return sum(1 for msg in self.messages if isinstance(msg, MessageThinking))
def get_timeout_sending_messages(self) -> List[MessageSending]:
def get_timeout_sending_messages(self) -> list[MessageSending]:
"""获取所有超时的MessageSending对象思考时间超过20秒按thinking_start_time排序 - 从旧 sender 合并"""
current_time = time.time()
timeout_messages = []
@@ -94,7 +94,7 @@ class MessageContainer:
timeout_messages.sort(key=lambda x: x.thinking_start_time)
return timeout_messages
def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]:
def get_earliest_message(self):
"""获取thinking_start_time最早的消息对象"""
if not self.messages:
return None
@@ -108,7 +108,7 @@ class MessageContainer:
earliest_message = msg
return earliest_message
def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None:
def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]):
"""添加消息到队列"""
if isinstance(message, MessageSet):
for single_message in message.messages:
@@ -116,7 +116,7 @@ class MessageContainer:
else:
self.messages.append(message)
def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]) -> bool:
def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]):
"""移除指定的消息对象如果消息存在则返回True否则返回False"""
try:
_initial_len = len(self.messages)
@@ -138,7 +138,7 @@ class MessageContainer:
"""检查是否有待发送的消息"""
return bool(self.messages)
def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]:
def get_all_messages(self) -> list[MessageThinking | MessageSending]:
"""获取所有消息"""
return list(self.messages) # 返回副本
@@ -148,7 +148,7 @@ class MessageManager:
def __init__(self):
self._processor_task = None
self.containers: Dict[str, MessageContainer] = {}
self.containers: dict[str, MessageContainer] = {}
self.storage = MessageStorage() # 添加 storage 实例
self._running = True # 处理器运行状态
self._container_lock = asyncio.Lock() # 保护 containers 字典的锁

View File

@@ -2,7 +2,6 @@ import random
import time
import re
from collections import Counter
from typing import Dict, List, Optional
import jieba
import numpy as np
@@ -26,7 +25,7 @@ def is_english_letter(char: str) -> bool:
return "a" <= char.lower() <= "z"
def db_message_to_str(message_dict: Dict) -> str:
def db_message_to_str(message_dict: dict) -> str:
logger.debug(f"message_dict: {message_dict}")
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"]))
try:
@@ -35,7 +34,7 @@ def db_message_to_str(message_dict: Dict) -> str:
message_dict.get("user_nickname", ""),
message_dict.get("user_cardname", ""),
)
except Exception:
except Exception as e:
name = message_dict.get("user_nickname", "") or f"用户{message_dict['user_id']}"
content = message_dict.get("processed_plain_text", "")
result = f"[{time_str}] {name}: {content}\n"
@@ -77,13 +76,13 @@ def is_mentioned_bot_in_message(message: MessageRecv) -> tuple[bool, float]:
if not is_mentioned:
# 判断是否被回复
if re.match(
f"\[回复 [\s\S]*?\({str(global_config.BOT_QQ)}\)[\s\S]*?\],说:", message.processed_plain_text
f"\[回复 [\s\S]*?\({str(global_config.BOT_QQ)}\)[\s\S]*?],说:" , message.processed_plain_text
):
is_mentioned = True
else:
# 判断内容中是否被提及
message_content = re.sub(r"@[\s\S]*?(\d+)", "", message.processed_plain_text)
message_content = re.sub(r"\[回复 [\s\S]*?\(((\d+)|未知id)\)[\s\S]*?\],说:", "", message_content)
message_content = re.sub(r"\[回复 [\s\S]*?\(((\d+)|未知id)\)[\s\S]*?],说:", "", message_content)
for keyword in keywords:
if keyword in message_content:
is_mentioned = True
@@ -223,7 +222,7 @@ def get_recent_group_speaker(chat_stream_id: int, sender, limit: int = 12) -> li
return who_chat_in_group
def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
def split_into_sentences_w_remove_punctuation(text: str) -> list[str]:
"""将文本分割成句子,并根据概率合并
1. 识别分割点(, 。 ; 空格),但如果分割点左右都是英文字母则不分割。
2. 将文本分割成 (内容, 分隔符) 的元组。
@@ -370,7 +369,7 @@ def random_remove_punctuation(text: str) -> str:
return result
def process_llm_response(text: str) -> List[str]:
def process_llm_response(text: str) -> list[str]:
# 先保护颜文字
if global_config.enable_kaomoji_protection:
protected_text, kaomoji_mapping = protect_kaomoji(text)
@@ -379,7 +378,7 @@ def process_llm_response(text: str) -> List[str]:
protected_text = text
kaomoji_mapping = {}
# 提取被 () 或 [] 包裹且包含中文的内容
pattern = re.compile(r"[\(\[\](?=.*[\u4e00-\u9fff]).*?[\)\]\]")
pattern = re.compile(r"[(\[](?=.*[一-鿿]).*?[)\]]")
# _extracted_contents = pattern.findall(text)
_extracted_contents = pattern.findall(protected_text) # 在保护后的文本上查找
# 去除 () 和 [] 及其包裹的内容
@@ -554,7 +553,7 @@ def protect_kaomoji(sentence):
r"[^()\[\]()【】]*?" # 非括号字符(惰性匹配)
r"[^一-龥a-zA-Z0-9\s]" # 非中文、非英文、非数字、非空格字符(必须包含至少一个)
r"[^()\[\]()【】]*?" # 非括号字符(惰性匹配)
r"[\)\])】" # 右括号
r"[)\])】" # 右括号
r"]"
r")"
r"|"
@@ -704,7 +703,7 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) -
return 0, 0
def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> Optional[str]:
def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> str:
"""将时间戳转换为人类可读的时间格式
Args:
@@ -732,10 +731,9 @@ def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal"
return f"{int(diff / 86400)}天前:\n"
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":\n"
elif mode == "lite":
else: # mode = "lite" or unknown
# 只返回时分秒格式,喵~
return time.strftime("%H:%M:%S", time.localtime(timestamp))
return None
def parse_text_timestamps(text: str, mode: str = "normal") -> str:

View File

@@ -511,7 +511,7 @@ class Hippocampus:
"""从文本中提取关键词并获取相关记忆。
Args:
topic (str): 记忆主题
keywords (list): 输入文本
max_memory_num (int, optional): 返回的记忆条目数量上限。默认为3表示最多返回3条与输入文本相关度最高的记忆。
max_memory_length (int, optional): 每个主题最多返回的记忆条目数量。默认为2表示每个主题最多返回2条相似度最高的记忆。
max_depth (int, optional): 记忆检索深度。默认为3。值越大检索范围越广可以获取更多间接相关的记忆但速度会变慢。
@@ -829,7 +829,7 @@ class EntorhinalCortex:
return chat_samples
@staticmethod
def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list:
def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list | None:
"""从数据库中随机获取指定时间戳附近的消息片段 (使用 chat_message_builder)"""
try_count = 0
time_window_seconds = random.randint(300, 1800) # 随机时间窗口5到30分钟

View File

@@ -1,7 +1,6 @@
import datetime
import os
import sys
from typing import Dict
import asyncio
from dateutil import tz
@@ -162,7 +161,7 @@ class ScheduleGenerator:
async def generate_daily_schedule(
self,
target_date: datetime.datetime = None,
) -> Dict[str, str]:
) -> dict[str, str]:
daytime_prompt = self.construct_daytime_prompt(target_date)
daytime_response, _ = await self.llm_scheduler_all.generate_response_async(daytime_prompt)
return daytime_response