🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-06-16 05:48:17 +00:00
parent d554be3bd7
commit 40c5763864
9 changed files with 84 additions and 87 deletions

View File

@@ -896,7 +896,7 @@ class LogViewer:
if current_monitored_file != self.current_log_file:
current_monitored_file = self.current_log_file
last_position = 0 # 重置位置
if current_monitored_file.exists():
try:
# 使用共享读取模式,避免文件锁定
@@ -1113,7 +1113,7 @@ class LogViewer:
filename = filedialog.askopenfilename(
title="选择日志文件",
filetypes=[("JSONL日志文件", "*.jsonl"), ("所有文件", "*.*")],
initialdir="logs" if Path("logs").exists() else "."
initialdir="logs" if Path("logs").exists() else ".",
)
if filename:
new_file = Path(filename)
@@ -1133,14 +1133,14 @@ class LogViewer:
self.modules.clear()
self.selected_modules.clear()
self.log_text.delete(1.0, tk.END)
# 清空日志队列
while not self.log_queue.empty():
try:
self.log_queue.get_nowait()
except queue.Empty:
break
# 重新读取整个文件
if self.current_log_file.exists():
try:
@@ -1149,23 +1149,23 @@ class LogViewer:
try:
log_entry = json.loads(line)
self.log_cache.append(log_entry)
# 收集模块信息
if "logger_name" in log_entry:
self.modules.add(log_entry["logger_name"])
except json.JSONDecodeError:
continue
except Exception as e:
messagebox.showerror("错误", f"读取日志文件失败: {e}")
return
# 更新模块列表UI
self.update_module_list()
# 过滤并显示日志
self.filter_logs()
# 更新窗口标题
self.update_window_title()

View File

@@ -76,7 +76,7 @@ class MaiEmoji:
logger.debug(f"[初始化] 正在解码Base64并计算哈希: {self.filename}")
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
self.hash = hashlib.md5(image_bytes).hexdigest()
logger.debug(f"[初始化] 哈希计算成功: {self.hash}")
@@ -846,7 +846,7 @@ class EmojiManager:
# 解码图片并获取格式
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Dict, Any
from typing import List, Dict
from .info_base import InfoBase
@@ -49,17 +49,17 @@ class ExpressionSelectionInfo(InfoBase):
expressions = self.get_selected_expressions()
if not expressions:
return ""
# 格式化表达方式为可读文本
formatted_expressions = []
for expr in expressions:
situation = expr.get("situation", "")
style = expr.get("style", "")
expr_type = expr.get("type", "")
expr.get("type", "")
if situation and style:
formatted_expressions.append(f"{situation}时,使用 {style}")
return "\n".join(formatted_expressions)
def get_expressions_for_action_data(self) -> List[Dict[str, str]]:
@@ -68,4 +68,4 @@ class ExpressionSelectionInfo(InfoBase):
Returns:
List[Dict[str, str]]: 格式化后的表达方式数据
"""
return self.get_selected_expressions()
return self.get_selected_expressions()

View File

@@ -21,27 +21,27 @@ logger = get_logger("processor")
def weighted_sample_no_replacement(items, weights, k) -> list:
"""
加权随机抽样,不允许重复
Args:
items: 待抽样的项目列表
weights: 对应项目的权重列表
k: 抽样数量
Returns:
抽样结果列表
"""
if not items or k <= 0:
return []
k = min(k, len(items))
selected = []
remaining_items = list(items)
remaining_weights = list(weights)
for _ in range(k):
if not remaining_items:
break
# 计算累积权重
total_weight = sum(remaining_weights)
if total_weight <= 0:
@@ -57,13 +57,13 @@ def weighted_sample_no_replacement(items, weights, k) -> list:
if rand_val <= cumulative_weight:
selected_index = i
break
# 添加选中的项目
selected.append(remaining_items[selected_index])
# 移除已选中的项目
remaining_items.pop(selected_index)
remaining_weights.pop(selected_index)
return selected
@@ -98,20 +98,20 @@ class ExpressionSelectorProcessor(BaseProcessor):
def __init__(self, subheartflow_id: str):
super().__init__()
self.subheartflow_id = subheartflow_id
self.last_selection_time = 0
self.selection_interval = 60 # 1分钟间隔
self.cached_expressions = [] # 缓存上一次选择的表达方式
# 表达方式选择模式
self.selection_mode = getattr(global_config.expression, "selection_mode", "llm") # "llm" 或 "random"
self.llm_model = LLMRequest(
model=global_config.model.utils_small,
request_type="focus.processor.expression_selector",
)
name = get_chat_manager().get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] 表达选择器"
@@ -125,7 +125,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
List[InfoBase]: 处理后的表达选择信息列表
"""
current_time = time.time()
# 检查频率限制
if current_time - self.last_selection_time < self.selection_interval:
logger.debug(f"{self.log_prefix} 距离上次选择不足{self.selection_interval}秒,使用缓存的表达方式")
@@ -133,17 +133,17 @@ class ExpressionSelectorProcessor(BaseProcessor):
if self.cached_expressions:
# 从缓存的15个中随机选5个
final_expressions = random.sample(self.cached_expressions, min(5, len(self.cached_expressions)))
# 创建表达选择信息
expression_info = ExpressionSelectionInfo()
expression_info.set_selected_expressions(final_expressions)
logger.info(f"{self.log_prefix} 使用缓存选择了{len(final_expressions)}个表达方式")
return [expression_info]
else:
logger.debug(f"{self.log_prefix} 没有缓存的表达方式,跳过选择")
return []
# 获取聊天内容
chat_info = ""
if observations:
@@ -151,11 +151,11 @@ class ExpressionSelectorProcessor(BaseProcessor):
if isinstance(observation, ChattingObservation):
chat_info = observation.get_observe_info()
break
if not chat_info:
logger.debug(f"{self.log_prefix} 没有聊天内容,跳过表达方式选择")
return []
try:
# 根据模式选择表达方式
if self.selection_mode == "llm":
@@ -168,26 +168,26 @@ class ExpressionSelectorProcessor(BaseProcessor):
selected_expressions = await self._select_suitable_expressions_random(chat_info)
cache_size = len(selected_expressions) if selected_expressions else 0
mode_desc = f"随机模式(已缓存{cache_size}个)"
if selected_expressions:
# 缓存选择的表达方式
self.cached_expressions = selected_expressions
# 更新最后选择时间
self.last_selection_time = current_time
# 从选择的表达方式中随机选5个
final_expressions = random.sample(selected_expressions, min(4, len(selected_expressions)))
# 创建表达选择信息
expression_info = ExpressionSelectionInfo()
expression_info.set_selected_expressions(final_expressions)
logger.info(f"{self.log_prefix} 为当前聊天选择了{len(final_expressions)}个表达方式({mode_desc}")
return [expression_info]
else:
logger.debug(f"{self.log_prefix} 未选择任何表达方式")
return []
except Exception as e:
logger.error(f"{self.log_prefix} 处理表达方式选择时出错: {e}")
return []
@@ -195,31 +195,31 @@ class ExpressionSelectorProcessor(BaseProcessor):
async def _get_random_expressions(self) -> tuple[List[Dict], List[Dict], List[Dict]]:
"""随机获取表达方式20个style20个grammar20个personality"""
expression_learner = get_expression_learner()
# 获取所有表达方式
(
learnt_style_expressions,
learnt_grammar_expressions,
personality_expressions,
) = await expression_learner.get_expression_by_chat_id(self.subheartflow_id)
# 随机选择
selected_style = random.sample(learnt_style_expressions, min(15, len(learnt_style_expressions)))
selected_grammar = random.sample(learnt_grammar_expressions, min(15, len(learnt_grammar_expressions)))
selected_personality = random.sample(personality_expressions, min(5, len(personality_expressions)))
return selected_style, selected_grammar, selected_personality
async def _select_suitable_expressions_llm(self, chat_info: str) -> List[Dict[str, str]]:
"""使用LLM选择适合的表达方式"""
# 1. 获取35个随机表达方式
style_exprs, grammar_exprs, personality_exprs = await self._get_random_expressions()
# 2. 构建所有表达方式的索引和情境列表
all_expressions = []
all_situations = []
# 添加style表达方式
for expr in style_exprs:
if isinstance(expr, dict) and "situation" in expr and "style" in expr:
@@ -227,7 +227,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_with_type["type"] = "style"
all_expressions.append(expr_with_type)
all_situations.append(f"{len(all_expressions)}. [语言风格] {expr['situation']}")
# 添加grammar表达方式
for expr in grammar_exprs:
if isinstance(expr, dict) and "situation" in expr and "style" in expr:
@@ -235,7 +235,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_with_type["type"] = "grammar"
all_expressions.append(expr_with_type)
all_situations.append(f"{len(all_expressions)}. [句法语法] {expr['situation']}")
# 添加personality表达方式
for expr in personality_exprs:
if isinstance(expr, dict) and "situation" in expr and "style" in expr:
@@ -243,57 +243,57 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_with_type["type"] = "personality"
all_expressions.append(expr_with_type)
all_situations.append(f"{len(all_expressions)}. [个性表达] {expr['situation']}")
if not all_expressions:
logger.warning(f"{self.log_prefix} 没有找到可用的表达方式")
return []
all_situations_str = "\n".join(all_situations)
# 3. 构建prompt只包含情境不包含完整的表达方式
prompt = (await global_prompt_manager.get_prompt_async("expression_evaluation_prompt")).format(
bot_name=global_config.bot.nickname,
chat_observe_info=chat_info,
all_situations=all_situations_str,
)
# 4. 调用LLM
try:
content, _ = await self.llm_model.generate_response_async(prompt=prompt)
logger.info(f"{self.log_prefix} LLM返回结果: {content}")
if not content:
logger.warning(f"{self.log_prefix} LLM返回空结果")
return []
# 5. 解析结果
result = repair_json(content)
if isinstance(result, str):
result = json.loads(result)
if not isinstance(result, dict) or "selected_situations" not in result:
logger.error(f"{self.log_prefix} LLM返回格式错误")
return []
selected_indices = result["selected_situations"]
# 根据索引获取完整的表达方式
valid_expressions = []
for idx in selected_indices:
if isinstance(idx, int) and 1 <= idx <= len(all_expressions):
valid_expressions.append(all_expressions[idx - 1]) # 索引从1开始
logger.info(f"{self.log_prefix} LLM从{len(all_expressions)}个情境中选择了{len(valid_expressions)}")
return valid_expressions
except Exception as e:
logger.error(f"{self.log_prefix} LLM处理表达方式选择时出错: {e}")
return []
async def _select_suitable_expressions_random(self, chat_info: str) -> List[Dict[str, str]]:
"""随机选择表达方式原replyer逻辑"""
# 获取所有表达方式
expression_learner = get_expression_learner()
(
@@ -301,9 +301,9 @@ class ExpressionSelectorProcessor(BaseProcessor):
learnt_grammar_expressions,
personality_expressions,
) = await expression_learner.get_expression_by_chat_id(self.subheartflow_id)
selected_expressions = []
# 1. learnt_style_expressions相似度匹配选择3条
if learnt_style_expressions:
similar_exprs = self._find_similar_expressions(chat_info, learnt_style_expressions, 3)
@@ -312,7 +312,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_copy = expr.copy()
expr_copy["type"] = "style"
selected_expressions.append(expr_copy)
# 2. learnt_grammar_expressions加权随机选2条
if learnt_grammar_expressions:
weights = [expr.get("count", 1) for expr in learnt_grammar_expressions]
@@ -322,7 +322,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_copy = expr.copy()
expr_copy["type"] = "grammar"
selected_expressions.append(expr_copy)
# 3. personality_expressions随机选1条
if personality_expressions:
expr = random.choice(personality_expressions)
@@ -330,7 +330,7 @@ class ExpressionSelectorProcessor(BaseProcessor):
expr_copy = expr.copy()
expr_copy["type"] = "personality"
selected_expressions.append(expr_copy)
logger.info(f"{self.log_prefix} 随机模式选择了{len(selected_expressions)}个表达方式")
return selected_expressions
@@ -338,28 +338,28 @@ class ExpressionSelectorProcessor(BaseProcessor):
"""使用简单的文本匹配找出相似的表达方式简化版避免依赖sklearn"""
if not expressions or not input_text:
return random.sample(expressions, min(top_k, len(expressions))) if expressions else []
# 简单的关键词匹配
scored_expressions = []
input_words = set(input_text.lower().split())
for expr in expressions:
situation = expr.get("situation", "").lower()
situation_words = set(situation.split())
# 计算交集大小作为相似度
similarity = len(input_words & situation_words)
scored_expressions.append((similarity, expr))
# 按相似度排序
scored_expressions.sort(key=lambda x: x[0], reverse=True)
# 如果没有匹配的,随机选择
if all(score == 0 for score, _ in scored_expressions):
return random.sample(expressions, min(top_k, len(expressions)))
# 返回top_k个最相似的
return [expr for _, expr in scored_expressions[:top_k]]
init_prompt()
init_prompt()

View File

@@ -241,7 +241,7 @@ class ActionPlanner(BasePlanner):
if relation_info:
action_data["relation_info_block"] = relation_info
# 将选中的表达方式传递给action_data
if selected_expressions:
action_data["selected_expressions"] = selected_expressions

View File

@@ -1,7 +1,6 @@
import traceback
from typing import List, Optional, Dict, Any, Tuple
from src.chat.focus_chat.expressors.exprssion_learner import get_expression_learner
from src.chat.message_receive.message import MessageRecv, MessageThinking, MessageSending
from src.chat.message_receive.message import Seg # Local import needed after move
from src.chat.message_receive.message import UserInfo
@@ -350,7 +349,7 @@ class DefaultReplyer:
# 使用从处理器传来的选中表达方式
selected_expressions = action_data.get("selected_expressions", []) if action_data else []
if selected_expressions:
logger.info(f"{self.log_prefix} 使用处理器选中的{len(selected_expressions)}个表达方式")
for expr in selected_expressions:

View File

@@ -100,7 +100,7 @@ class ImageManager:
# 计算图片哈希
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
image_hash = hashlib.md5(image_bytes).hexdigest()
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
@@ -180,7 +180,7 @@ class ImageManager:
# 计算图片哈希
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
image_hash = hashlib.md5(image_bytes).hexdigest()
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
@@ -263,7 +263,7 @@ class ImageManager:
try:
# 确保base64字符串只包含ASCII字符
if isinstance(gif_base64, str):
gif_base64 = gif_base64.encode('ascii', errors='ignore').decode('ascii')
gif_base64 = gif_base64.encode("ascii", errors="ignore").decode("ascii")
# 解码base64
gif_data = base64.b64decode(gif_base64)
gif = Image.open(io.BytesIO(gif_data))
@@ -385,7 +385,7 @@ class ImageManager:
# 计算图片哈希
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
image_hash = hashlib.md5(image_bytes).hexdigest()
@@ -458,7 +458,7 @@ class ImageManager:
# 计算图片哈希
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode('ascii', errors='ignore').decode('ascii')
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
image_hash = hashlib.md5(image_bytes).hexdigest()

View File

@@ -61,9 +61,7 @@ def get_console_handler():
class TimestampedFileHandler(logging.Handler):
"""基于时间戳的文件处理器,简单的轮转份数限制"""
def __init__(
self, log_dir, max_bytes=2 * 1024 * 1024, backup_count=30, encoding="utf-8"
):
def __init__(self, log_dir, max_bytes=2 * 1024 * 1024, backup_count=30, encoding="utf-8"):
super().__init__()
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=True)
@@ -100,8 +98,6 @@ class TimestampedFileHandler(logging.Handler):
# 创建新文件
self._init_current_file()
def _cleanup_old_files(self):
"""清理旧的日志文件,保留指定数量"""
try:
@@ -922,7 +918,9 @@ def force_initialize_logging():
logger = get_logger("logger")
console_level = LOG_CONFIG.get("console_log_level", LOG_CONFIG.get("log_level", "INFO"))
file_level = LOG_CONFIG.get("file_log_level", LOG_CONFIG.get("log_level", "INFO"))
logger.info(f"日志系统已强制重新初始化,控制台级别: {console_level},文件级别: {file_level},轮转份数: 30个文件所有logger格式已统一")
logger.info(
f"日志系统已强制重新初始化,控制台级别: {console_level},文件级别: {file_level},轮转份数: 30个文件所有logger格式已统一"
)
def show_module_colors():

View File

@@ -838,7 +838,7 @@ def compress_base64_image_by_scale(base64_data: str, target_size: int = 0.8 * 10
# 将base64转换为字节数据
# 确保base64字符串只包含ASCII字符
if isinstance(base64_data, str):
base64_data = base64_data.encode('ascii', errors='ignore').decode('ascii')
base64_data = base64_data.encode("ascii", errors="ignore").decode("ascii")
image_data = base64.b64decode(base64_data)
# 如果已经小于目标大小,直接返回原图