Revert: 回退并发消息处理系统的相关提交

由于并发消息处理的实现在测试中暴露出消息重复和目标ID丢失的问题,暂时回退至该功能合并前的稳定状态,以便进一步排查问题。
This commit is contained in:
tt-P607
2025-09-26 01:52:50 +08:00
parent c5a55df5ec
commit 8d725911da
7 changed files with 430 additions and 288 deletions

View File

@@ -79,9 +79,7 @@ class ChatterManager:
del self.instances[stream_id]
logger.info(f"清理不活跃聊天流实例: {stream_id}")
async def process_stream_context(
self, stream_id: str, context: StreamContext, unread_messages: Optional[List[Any]] = None
) -> dict:
async def process_stream_context(self, stream_id: str, context: StreamContext) -> dict:
"""处理流上下文"""
chat_type = context.chat_type
logger.debug(f"处理流 {stream_id},聊天类型: {chat_type.value}")
@@ -106,14 +104,9 @@ class ChatterManager:
self.stats["streams_processed"] += 1
try:
# 如果提供了 unread_messages则传递给 execute 方法
if unread_messages:
result = await self.instances[stream_id].execute(context, unread_messages)
else:
result = await self.instances[stream_id].execute(context)
result = await self.instances[stream_id].execute(context)
self.stats["successful_executions"] += 1
# 记录处理结果
success = result.get("success", False)
actions_count = result.get("actions_count", 0)

View File

@@ -7,7 +7,7 @@ import asyncio
import random
import time
import traceback
from typing import Dict, Optional, Any, TYPE_CHECKING, List
from typing import Dict, Optional, Any, TYPE_CHECKING
from src.common.logger import get_logger
from src.common.data_models.database_data_model import DatabaseMessages
@@ -30,13 +30,10 @@ class MessageManager:
def __init__(self, check_interval: float = 5.0):
self.stream_contexts: Dict[str, StreamContext] = {}
self.check_interval = check_interval
self.check_interval = check_interval # 检查间隔(秒)
self.is_running = False
self.manager_task: Optional[asyncio.Task] = None
# 并发控制信号量
self.concurrent_semaphore: Optional[asyncio.Semaphore] = None
# 统计信息
self.stats = MessageManagerStats()
@@ -56,10 +53,6 @@ class MessageManager:
self.is_running = True
self.manager_task = asyncio.create_task(self._manager_loop())
if global_config.chat.concurrent_message_processing:
limit = global_config.chat.concurrent_per_user_limit
self.concurrent_semaphore = asyncio.Semaphore(limit)
logger.info(f"并发处理已启用,全局并发限制: {limit}")
await self.wakeup_manager.start()
logger.info("消息管理器已启动")
@@ -72,12 +65,8 @@ class MessageManager:
# 停止所有流处理任务
for context in self.stream_contexts.values():
if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done():
if context.processing_task and not context.processing_task.done():
context.processing_task.cancel()
if hasattr(context, 'user_processing_tasks'):
for task in context.user_processing_tasks.values():
if task and not task.done():
task.cancel()
# 停止管理器任务
if self.manager_task and not self.manager_task.done():
@@ -91,14 +80,9 @@ class MessageManager:
"""添加消息到指定聊天流"""
# 获取或创建流上下文
if stream_id not in self.stream_contexts:
context = StreamContext(stream_id=stream_id)
# 为并发处理添加队列和锁
if global_config.chat.concurrent_message_processing:
context.send_lock = asyncio.Lock()
context.user_processing_tasks = {}
self.stream_contexts[stream_id] = context
self.stream_contexts[stream_id] = StreamContext(stream_id=stream_id)
self.stats.total_streams += 1
context = self.stream_contexts[stream_id]
context.set_chat_mode(ChatMode.FOCUS)
context.add_message(message)
@@ -116,9 +100,10 @@ class MessageManager:
await self._check_streams_with_individual_intervals()
# 计算下次检查时间(使用最小间隔或固定间隔)
next_check_delay = self.check_interval
if global_config.chat.dynamic_distribution_enabled:
next_check_delay = self._calculate_next_manager_delay()
else:
next_check_delay = self.check_interval
await asyncio.sleep(next_check_delay)
except asyncio.CancelledError:
@@ -127,63 +112,107 @@ class MessageManager:
logger.error(f"消息管理器循环出错: {e}")
traceback.print_exc()
async def _process_stream_messages(self, stream_id: str, unread_messages_override: List[DatabaseMessages]):
"""
处理指定聊天流的消息 (非并发模式专用)
"""
async def _check_all_streams(self):
"""检查所有聊天流"""
active_streams = 0
total_unread = 0
for stream_id, context in self.stream_contexts.items():
if not context.is_active:
continue
active_streams += 1
# 检查是否有未读消息
unread_messages = context.get_unread_messages()
if unread_messages:
total_unread += len(unread_messages)
# 如果没有处理任务,创建一个
if not context.processing_task or context.processing_task.done():
context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
# 更新统计
self.stats.active_streams = active_streams
self.stats.total_unread_messages = total_unread
async def _process_stream_messages(self, stream_id: str):
"""处理指定聊天流的消息"""
if stream_id not in self.stream_contexts:
return
context = self.stream_contexts[stream_id]
context.processing_task = asyncio.current_task()
user_id = unread_messages_override[0].user_info.user_id if unread_messages_override and hasattr(unread_messages_override[0], 'user_info') else None
try:
await self._check_and_handle_interruption(context, stream_id, unread_messages_override, user_id)
# 获取未读消息
unread_messages = context.get_unread_messages()
if not unread_messages:
return
# 检查是否需要打断现有处理
await self._check_and_handle_interruption(context, stream_id)
# --- 睡眠状态检查 ---
if self.sleep_manager.is_sleeping():
logger.info(f"Bot正在睡觉检查聊天流 {stream_id} 是否有唤醒触发器。")
was_woken_up = False
is_private = context.is_private_chat()
for message in unread_messages_override:
for message in unread_messages:
is_mentioned = message.is_mentioned or False
if is_private or is_mentioned:
if self.wakeup_manager.add_wakeup_value(is_private, is_mentioned):
was_woken_up = True
break
break # 一旦被吵醒,就跳出循环并处理消息
if not was_woken_up:
logger.debug(f"聊天流 {stream_id} 中没有唤醒触发器,保持消息未读状态。")
self._clear_specific_unread_messages(context, unread_messages_override)
return
logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。")
return # 退出,不处理消息
logger.debug(f"开始处理聊天流 {stream_id} {len(unread_messages_override)} 条未读消息")
results = await self.chatter_manager.process_stream_context(stream_id, context, unread_messages_override)
if results.get("success", False):
logger.debug(f"聊天流 {stream_id} 处理成功")
else:
logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}")
self._clear_specific_unread_messages(context, unread_messages_override)
logger.info(f"Bot被聊天流 {stream_id} 中的消息吵醒,继续处理。")
# --- 睡眠状态检查结束 ---
logger.debug(f"开始处理聊天流 {stream_id}{len(unread_messages)} 条未读消息")
# 直接使用StreamContext对象进行处理
if unread_messages:
try:
# 记录当前chat type用于调试
logger.debug(f"聊天流 {stream_id} 检测到的chat type: {context.chat_type.value}")
# 发送到chatter manager传递StreamContext对象
results = await self.chatter_manager.process_stream_context(stream_id, context)
# 处理结果,标记消息为已读
if results.get("success", False):
self._clear_all_unread_messages(context)
logger.debug(f"聊天流 {stream_id} 处理成功,清除了 {len(unread_messages)} 条未读消息")
else:
logger.warning(f"聊天流 {stream_id} 处理失败: {results.get('error_message', '未知错误')}")
except Exception as e:
logger.error(f"处理聊天流 {stream_id} 时发生异常,将清除所有未读消息: {e}")
# 出现异常时也清除未读消息,避免重复处理
self._clear_all_unread_messages(context)
raise
logger.debug(f"聊天流 {stream_id} 消息处理完成")
except asyncio.CancelledError:
logger.info(f"聊天流 {stream_id} 的处理任务被取消")
self._clear_specific_unread_messages(context, unread_messages_override)
raise
except Exception as e:
logger.error(f"处理聊天流 {stream_id} 时发生异常: {e}")
logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}")
traceback.print_exc()
self._clear_specific_unread_messages(context, unread_messages_override)
finally:
context.processing_task = None
def deactivate_stream(self, stream_id: str):
"""停用聊天流"""
if stream_id in self.stream_contexts:
context = self.stream_contexts[stream_id]
context.is_active = False
if hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done():
# 取消处理任务
if context.processing_task and not context.processing_task.done():
context.processing_task.cancel()
logger.info(f"停用聊天流: {stream_id}")
@@ -206,7 +235,7 @@ class MessageManager:
unread_count=len(context.get_unread_messages()),
history_count=len(context.history_messages),
last_check_time=context.last_check_time,
has_active_task=bool(hasattr(context, 'processing_task') and context.processing_task and not context.processing_task.done()),
has_active_task=bool(context.processing_task and not context.processing_task.done()),
)
def get_manager_stats(self) -> Dict[str, Any]:
@@ -235,49 +264,30 @@ class MessageManager:
del self.stream_contexts[stream_id]
logger.info(f"清理不活跃聊天流: {stream_id}")
async def _check_and_handle_interruption(
self, context: StreamContext, stream_id: str, unread_messages: List[DatabaseMessages], user_id: Optional[str] = None
):
async def _check_and_handle_interruption(self, context: StreamContext, stream_id: str):
"""检查并处理消息打断"""
if not global_config.chat.interruption_enabled:
return
if context.interruption_count >= global_config.chat.interruption_max_limit:
logger.debug(f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},本次不进行打断")
return
task_to_check = None
if global_config.chat.concurrent_message_processing and global_config.chat.process_by_user_id and user_id:
task_to_check = context.user_processing_tasks.get(user_id)
else:
task_to_check = context.processing_task
if task_to_check and not task_to_check.done():
# 检查是否有正在进行的处理任务
if context.processing_task and not context.processing_task.done():
# 计算打断概率
interruption_probability = context.calculate_interruption_probability(
global_config.chat.interruption_max_limit, global_config.chat.interruption_probability_factor
)
# 根据概率决定是否打断
if random.random() < interruption_probability:
user_nickname = ""
if user_id and unread_messages:
for msg in unread_messages:
if hasattr(msg, "user_info") and msg.user_info and msg.user_info.user_id == user_id:
user_nickname = msg.user_info.user_nickname
break
if user_nickname:
log_target = f"用户'{user_nickname}({user_id})'在聊天流 {stream_id}"
else:
log_target = f"用户 {user_id} 在聊天流 {stream_id}" if user_id else f"聊天流 {stream_id}"
logger.info(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}")
logger.info(f"{log_target} 触发消息打断,打断概率: {interruption_probability:.2f}")
task_to_check.cancel()
# 取消现有任务
context.processing_task.cancel()
try:
await task_to_check
await context.processing_task
except asyncio.CancelledError:
pass
# 增加打断计数并应用afc阈值降低
context.increment_interruption_count()
context.apply_interruption_afc_reduction(global_config.chat.interruption_afc_reduction)
logger.info(
@@ -286,47 +296,145 @@ class MessageManager:
else:
logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}")
def _calculate_dynamic_distribution_interval(self, context: StreamContext) -> float:
"""计算单个聊天流的分发周期 - 基于阈值感知的focus_energy"""
def _calculate_dynamic_distribution_interval(self) -> float:
"""根据所有活跃聊天流的focus_energy动态计算分发周期"""
if not global_config.chat.dynamic_distribution_enabled:
return self.check_interval # 使用固定间隔
if not self.stream_contexts:
return self.check_interval # 默认间隔
# 计算活跃流的平均focus_energy
active_streams = [ctx for ctx in self.stream_contexts.values() if ctx.is_active]
if not active_streams:
return self.check_interval
focus_energy = 0.5
avg_message_interest = 0.5
total_focus_energy = 0.0
max_focus_energy = 0.0
stream_count = 0
if hasattr(context, 'chat_stream') and context.chat_stream:
focus_energy = context.chat_stream.focus_energy
if context.chat_stream.message_count > 0:
avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count
for context in active_streams:
if hasattr(context, 'chat_stream') and context.chat_stream:
focus_energy = context.chat_stream.focus_energy
total_focus_energy += focus_energy
max_focus_energy = max(max_focus_energy, focus_energy)
stream_count += 1
reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4)
non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2)
high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8)
if stream_count == 0:
return self.check_interval
avg_focus_energy = total_focus_energy / stream_count
# 使用配置参数
base_interval = global_config.chat.dynamic_distribution_base_interval
min_interval = global_config.chat.dynamic_distribution_min_interval
max_interval = global_config.chat.dynamic_distribution_max_interval
jitter_factor = global_config.chat.dynamic_distribution_jitter_factor
if avg_message_interest >= high_match_threshold:
interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0
elif avg_message_interest >= reply_threshold:
gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold)
interval_multiplier = 0.6 + gap_from_reply * 0.4
elif avg_message_interest >= non_reply_threshold:
gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold)
interval_multiplier = 1.2 + gap_from_non_reply * 1.8
# 根据平均兴趣度调整间隔
# 高兴趣度 -> 更频繁检查 (间隔更短)
# 低兴趣度 -> 较少检查 (间隔更长)
if avg_focus_energy >= 0.7:
# 高兴趣度1-5秒
interval = base_interval * (1.0 - (avg_focus_energy - 0.7) * 2.0)
elif avg_focus_energy >= 0.4:
# 中等兴趣度5-15秒
interval = base_interval * (1.0 + (avg_focus_energy - 0.4) * 3.33)
else:
gap_ratio = max(0, avg_message_interest / non_reply_threshold)
interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0
energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5
interval = base_interval * interval_multiplier * energy_adjustment
# 低兴趣度15-30秒
interval = base_interval * (3.0 + (0.4 - avg_focus_energy) * 5.0)
# 添加随机扰动避免同步
import random
jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor)
final_interval = interval * jitter
# 限制在合理范围内
final_interval = max(min_interval, min(max_interval, final_interval))
logger.debug(
f"动态分发周期: {final_interval:.2f}s | "
f"平均兴趣度: {avg_focus_energy:.3f} | "
f"活跃流数: {stream_count}"
)
return final_interval
def _calculate_stream_distribution_interval(self, context: StreamContext) -> float:
"""计算单个聊天流的分发周期 - 基于阈值感知的focus_energy"""
if not global_config.chat.dynamic_distribution_enabled:
return self.check_interval # 使用固定间隔
# 获取该流的focus_energy新的阈值感知版本
focus_energy = 0.5 # 默认值
avg_message_interest = 0.5 # 默认平均兴趣度
if hasattr(context, 'chat_stream') and context.chat_stream:
focus_energy = context.chat_stream.focus_energy
# 获取平均消息兴趣度用于更精确的计算
if context.chat_stream.message_count > 0:
avg_message_interest = context.chat_stream.message_interest_total / context.chat_stream.message_count
# 获取AFC阈值用于参考添加None值检查
reply_threshold = getattr(global_config.affinity_flow, 'reply_action_interest_threshold', 0.4)
non_reply_threshold = getattr(global_config.affinity_flow, 'non_reply_action_interest_threshold', 0.2)
high_match_threshold = getattr(global_config.affinity_flow, 'high_match_interest_threshold', 0.8)
# 使用配置参数
base_interval = global_config.chat.dynamic_distribution_base_interval
min_interval = global_config.chat.dynamic_distribution_min_interval
max_interval = global_config.chat.dynamic_distribution_max_interval
jitter_factor = global_config.chat.dynamic_distribution_jitter_factor
# 基于阈值感知的智能分发周期计算
if avg_message_interest >= high_match_threshold:
# 超高兴趣度:极快响应 (1-2秒)
interval_multiplier = 0.3 + (focus_energy - 0.7) * 2.0
elif avg_message_interest >= reply_threshold:
# 高兴趣度:快速响应 (2-6秒)
gap_from_reply = (avg_message_interest - reply_threshold) / (high_match_threshold - reply_threshold)
interval_multiplier = 0.6 + gap_from_reply * 0.4
elif avg_message_interest >= non_reply_threshold:
# 中等兴趣度:正常响应 (6-15秒)
gap_from_non_reply = (avg_message_interest - non_reply_threshold) / (reply_threshold - non_reply_threshold)
interval_multiplier = 1.2 + gap_from_non_reply * 1.8
else:
# 低兴趣度:缓慢响应 (15-30秒)
gap_ratio = max(0, avg_message_interest / non_reply_threshold)
interval_multiplier = 3.0 + (1.0 - gap_ratio) * 3.0
# 应用focus_energy微调
energy_adjustment = 1.0 + (focus_energy - 0.5) * 0.5
interval = base_interval * interval_multiplier * energy_adjustment
# 添加随机扰动避免同步
import random
jitter = random.uniform(1.0 - jitter_factor, 1.0 + jitter_factor)
final_interval = interval * jitter
# 限制在合理范围内
final_interval = max(min_interval, min(max_interval, final_interval))
# 根据兴趣度级别调整日志级别
if avg_message_interest >= high_match_threshold:
log_level = "info"
elif avg_message_interest >= reply_threshold:
log_level = "info"
else:
log_level = "debug"
log_msg = (
f"{context.stream_id} 分发周期: {final_interval:.2f}s | "
f"focus_energy: {focus_energy:.3f} | "
f"avg_interest: {avg_message_interest:.3f} | "
f"阈值参考: {non_reply_threshold:.2f}/{reply_threshold:.2f}/{high_match_threshold:.2f}"
)
if log_level == "info":
logger.info(log_msg)
else:
logger.debug(log_msg)
return final_interval
def _calculate_next_manager_delay(self) -> float:
@@ -334,6 +442,7 @@ class MessageManager:
current_time = time.time()
min_delay = float('inf')
# 找到最近需要检查的流
for context in self.stream_contexts.values():
if not context.is_active:
continue
@@ -342,11 +451,14 @@ class MessageManager:
if time_until_check > 0:
min_delay = min(min_delay, time_until_check)
else:
return 0.1
min_delay = 0.1 # 立即检查
break
# 如果没有活跃流,使用默认间隔
if min_delay == float('inf'):
return self.check_interval
# 确保最小延迟
return max(0.1, min(min_delay, self.check_interval))
async def _check_streams_with_individual_intervals(self):
@@ -358,106 +470,156 @@ class MessageManager:
if not context.is_active:
continue
# 检查是否达到检查时间
if current_time >= context.next_check_time:
# 更新检查时间
context.last_check_time = current_time
# 计算下次检查时间和分发周期
if global_config.chat.dynamic_distribution_enabled:
context.distribution_interval = self._calculate_stream_distribution_interval(context)
else:
context.distribution_interval = self.check_interval
# 设置下次检查时间
context.next_check_time = current_time + context.distribution_interval
# 检查未读消息
unread_messages = context.get_unread_messages()
if not unread_messages:
continue
if unread_messages:
processed_streams += 1
self.stats.total_unread_messages = len(unread_messages)
processed_streams += 1
if global_config.chat.concurrent_message_processing:
if global_config.chat.process_by_user_id:
user_messages = {}
for msg in unread_messages:
user_id = msg.user_info.user_id if hasattr(msg, 'user_info') and msg.user_info else 'unknown_user'
if user_id not in user_messages:
user_messages[user_id] = []
user_messages[user_id].append(msg)
for user_id, messages in user_messages.items():
await self._check_and_handle_interruption(context, stream_id, messages, user_id)
if not context.user_processing_tasks.get(user_id) or context.user_processing_tasks[user_id].done():
task = asyncio.create_task(self._process_and_send_reply(context, messages))
context.user_processing_tasks[user_id] = task
else:
# Fix: Ensure unread_messages is available in this branch
all_unread_messages = context.get_unread_messages()
if all_unread_messages:
if not global_config.chat.concurrent_message_processing:
await self._check_and_handle_interruption(context, stream_id, all_unread_messages)
if not context.processing_task or context.processing_task.done():
context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id, all_unread_messages))
# 如果没有处理任务,创建一个
if not context.processing_task or context.processing_task.done():
focus_energy = context.chat_stream.focus_energy if hasattr(context, 'chat_stream') and context.chat_stream else 0.5
# 根据优先级记录日志
if focus_energy >= 0.7:
logger.info(
f"高优先级流 {stream_id} 开始处理 | "
f"focus_energy: {focus_energy:.3f} | "
f"分发周期: {context.distribution_interval:.2f}s | "
f"未读消息: {len(unread_messages)}"
)
else:
await self._check_and_handle_interruption(context, stream_id, all_unread_messages)
if not context.processing_task or context.processing_task.done():
task = asyncio.create_task(self._process_and_send_reply(context, all_unread_messages))
context.processing_task = task
# The original 'else' block for the 'if current_time >= context.next_check_time:' check
# was problematic. It seems it tried to process messages even when it wasn't time.
# Removing it should fix the UnboundLocalError and align with the logic of checking the time first.
async def _process_and_send_reply(self, context: StreamContext, unread_messages: list):
"""在后台处理单批消息并加锁发送 (并发模式专用)"""
if not self.concurrent_semaphore:
logger.error("并发信号量未初始化")
logger.debug(
f"{stream_id} 开始处理 | "
f"focus_energy: {focus_energy:.3f} | "
f"分发周期: {context.distribution_interval:.2f}s"
)
context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
# 更新活跃流计数
active_count = sum(1 for ctx in self.stream_contexts.values() if ctx.is_active)
self.stats.active_streams = active_count
if processed_streams > 0:
logger.debug(
f"本次循环处理了 {processed_streams} 个流 | "
f"活跃流总数: {active_count}"
)
async def _check_all_streams_with_priority(self):
"""按优先级检查所有聊天流高focus_energy的流优先处理"""
if not self.stream_contexts:
return
user_id = unread_messages[0].user_info.user_id if global_config.chat.process_by_user_id and unread_messages and hasattr(unread_messages[0], 'user_info') else None
# 获取活跃的聊天流并按focus_energy排序
active_streams = []
for stream_id, context in self.stream_contexts.items():
if not context.is_active:
continue
async with self.concurrent_semaphore:
# 获取focus_energy如果不存在则使用默认值
focus_energy = 0.5
if hasattr(context, 'chat_stream') and context.chat_stream:
focus_energy = context.chat_stream.focus_energy
# 计算流优先级分数
priority_score = self._calculate_stream_priority(context, focus_energy)
active_streams.append((priority_score, stream_id, context))
# 按优先级降序排序
active_streams.sort(reverse=True, key=lambda x: x[0])
# 处理排序后的流
active_stream_count = 0
total_unread = 0
for priority_score, stream_id, context in active_streams:
active_stream_count += 1
# 检查是否有未读消息
unread_messages = context.get_unread_messages()
if unread_messages:
total_unread += len(unread_messages)
# 如果没有处理任务,创建一个
if not context.processing_task or context.processing_task.done():
context.processing_task = asyncio.create_task(self._process_stream_messages(stream_id))
# 高优先级流的额外日志
if priority_score > 0.7:
logger.info(
f"高优先级流 {stream_id} 开始处理 | "
f"优先级: {priority_score:.3f} | "
f"未读消息: {len(unread_messages)}"
)
# 更新统计
self.stats.active_streams = active_stream_count
self.stats.total_unread_messages = total_unread
def _calculate_stream_priority(self, context: StreamContext, focus_energy: float) -> float:
"""计算聊天流的优先级分数"""
# 基础优先级focus_energy
base_priority = focus_energy
# 未读消息数量加权
unread_count = len(context.get_unread_messages())
message_count_bonus = min(unread_count * 0.1, 0.3) # 最多30%加成
# 时间加权:最近活跃的流优先级更高
current_time = time.time()
time_since_active = current_time - context.last_check_time
time_penalty = max(0, 1.0 - time_since_active / 3600.0) # 1小时内无惩罚
# 连续无回复惩罚
if hasattr(context, 'chat_stream') and context.chat_stream:
consecutive_no_reply = context.chat_stream.consecutive_no_reply
no_reply_penalty = max(0, 1.0 - consecutive_no_reply * 0.05) # 每次无回复降低5%
else:
no_reply_penalty = 1.0
# 综合优先级计算
final_priority = (
base_priority * 0.6 + # 基础兴趣度权重60%
message_count_bonus * 0.2 + # 消息数量权重20%
time_penalty * 0.1 + # 时间权重10%
no_reply_penalty * 0.1 # 回复状态权重10%
)
return max(0.0, min(1.0, final_priority))
def _clear_all_unread_messages(self, context: StreamContext):
"""清除指定上下文中的所有未读消息,防止意外情况导致消息一直未读"""
unread_messages = context.get_unread_messages()
if not unread_messages:
return
logger.warning(f"正在清除 {len(unread_messages)} 条未读消息")
# 将所有未读消息标记为已读并移动到历史记录
for msg in unread_messages[:]: # 使用切片复制避免迭代时修改列表
try:
# 思考和发送都在锁内,确保单次回复的原子性
async with context.send_lock:
logger.debug(f"发送任务锁定聊天流 {context.stream_id}准备处理和回复")
results = await self.chatter_manager.process_stream_context(context.stream_id, context, unread_messages)
if results.get("success", False):
self._clear_specific_unread_messages(context, unread_messages)
logger.debug(f"聊天流 {context.stream_id} 并发处理成功,清除了 {len(unread_messages)} 条未读消息")
else:
logger.warning(f"聊天流 {context.stream_id} 并发处理失败: {results.get('error_message', '未知错误')}")
reply_delay = random.uniform(1.5, 3.0)
await asyncio.sleep(reply_delay)
logger.debug(f"发送任务解锁聊天流 {context.stream_id}")
except asyncio.CancelledError:
logger.info(f"用户 {user_id} 的任务被取消")
self._clear_specific_unread_messages(context, unread_messages) # 取消时也清除消息
raise
context.mark_message_as_read(msg.message_id)
self.stats.total_processed_messages += 1
logger.debug(f"强制清除消息 {msg.message_id}标记为已读")
except Exception as e:
logger.error(f"后台回复处理任务出错: {e}")
traceback.print_exc()
self._clear_specific_unread_messages(context, unread_messages)
finally:
if user_id and user_id in context.user_processing_tasks:
if context.user_processing_tasks[user_id] is asyncio.current_task():
del context.user_processing_tasks[user_id]
logger.error(f"清除消息 {msg.message_id}出错: {e}")
def _clear_specific_unread_messages(self, context: StreamContext, messages_to_clear: list):
"""清除指定上下文中的特定未读消息"""
if not messages_to_clear:
return
message_ids_to_clear = {msg.message_id for msg in messages_to_clear}
context.unread_messages = [msg for msg in context.unread_messages if msg.message_id not in message_ids_to_clear]
for msg in messages_to_clear:
context.history_messages.append(msg)
if len(context.history_messages) > 100:
context.history_messages = context.history_messages[-100:]
# 创建全局消息管理器实例
message_manager = MessageManager()

View File

@@ -74,9 +74,20 @@ class ChatConfig(ValidatedConfigBase):
"""聊天配置类"""
max_context_size: int = Field(default=18, description="最大上下文大小")
replyer_random_probability: float = Field(default=0.5, description="回复者随机概率")
thinking_timeout: int = Field(default=40, description="思考超时时间")
talk_frequency: float = Field(default=1.0, description="聊天频率")
mentioned_bot_inevitable_reply: bool = Field(default=False, description="提到机器人的必然回复")
at_bot_inevitable_reply: bool = Field(default=False, description="@机器人的必然回复")
allow_reply_self: bool = Field(default=False, description="是否允许回复自己说的话")
talk_frequency_adjust: list[list[str]] = Field(default_factory=lambda: [], description="聊天频率调整")
focus_value: float = Field(default=1.0, description="专注值")
focus_mode_quiet_groups: List[str] = Field(
default_factory=list,
description='专注模式下需要保持安静的群组列表, 格式: ["platform:group_id1", "platform:group_id2"]',
)
force_reply_private: bool = Field(default=False, description="强制回复私聊")
group_chat_mode: Literal["auto", "normal", "focus"] = Field(default="auto", description="群聊模式")
timestamp_display_mode: Literal["normal", "normal_no_YMD", "relative"] = Field(
default="normal_no_YMD", description="时间戳显示模式"
)
@@ -117,57 +128,46 @@ class ChatConfig(ValidatedConfigBase):
dynamic_distribution_jitter_factor: float = Field(
default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子"
)
# 并发消息处理
concurrent_message_processing: bool = Field(
default=False, description="是否启用并发消息处理,在同一聊天流中并行处理多个消息"
)
concurrent_per_user_limit: int = Field(
default=3, description="在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数"
)
process_by_user_id: bool = Field(
default=True, description="在并发模式下是否按用户ID进行独立串行处理"
)
def get_current_talk_frequency(self, chat_stream_id: Optional[str] = None) -> float:
"""
根据当前时间和聊天流获取对应的 talk_frequency
Args:
chat_stream_id: 聊天流ID格式为 "platform:chat_id:type"
Returns:
float: 对应的频率值
"""
if not self.talk_frequency_adjust:
return 1.0
return self.talk_frequency
# 优先检查聊天流特定的配置
if chat_stream_id:
stream_frequency = self._get_stream_specific_frequency(chat_stream_id)
if stream_frequency is not None:
return stream_frequency
# 检查全局时段配置(第一个元素为空字符串的配置)
global_frequency = self._get_global_frequency()
return 1.0 if global_frequency is None else global_frequency
return self.talk_frequency if global_frequency is None else global_frequency
def _get_time_based_frequency(self, time_freq_list: list[str]) -> Optional[float]:
"""
根据时间配置列表获取当前时段的频率
Args:
time_freq_list: 时间频率配置列表,格式为 ["HH:MM,frequency", ...]
Returns:
float: 频率值,如果没有配置则返回 None
"""
from datetime import datetime
current_time = datetime.now().strftime("%H:%M")
current_hour, current_minute = map(int, current_time.split(":"))
current_minutes = current_hour * 60 + current_minute
# 解析时间频率配置
time_freq_pairs = []
for time_freq_str in time_freq_list:
@@ -179,13 +179,13 @@ class ChatConfig(ValidatedConfigBase):
time_freq_pairs.append((minutes, frequency))
except (ValueError, IndexError):
continue
if not time_freq_pairs:
return None
# 按时间排序
time_freq_pairs.sort(key=lambda x: x[0])
# 查找当前时间对应的频率
current_frequency = None
for minutes, frequency in time_freq_pairs:
@@ -193,20 +193,20 @@ class ChatConfig(ValidatedConfigBase):
current_frequency = frequency
else:
break
# 如果当前时间在所有配置时间之前,使用最后一个时间段的频率(跨天逻辑)
if current_frequency is None and time_freq_pairs:
current_frequency = time_freq_pairs[-1][1]
return current_frequency
def _get_stream_specific_frequency(self, chat_stream_id: str):
"""
获取特定聊天流在当前时间的频率
Args:
chat_stream_id: 聊天流ID哈希值
Returns:
float: 频率值,如果没有配置则返回 None
"""
@@ -214,30 +214,30 @@ class ChatConfig(ValidatedConfigBase):
for config_item in self.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
stream_config_str = config_item[0] # 例如 "qq:1026294844:group"
# 解析配置字符串并生成对应的 chat_id
config_chat_id = self._parse_stream_config_to_chat_id(stream_config_str)
if config_chat_id is None:
continue
# 比较生成的 chat_id
if config_chat_id != chat_stream_id:
continue
# 使用通用的时间频率解析方法
return self._get_time_based_frequency(config_item[1:])
return None
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
"""
解析流配置字符串并生成对应的 chat_id
Args:
stream_config_str: 格式为 "platform:id:type" 的字符串
Returns:
str: 生成的 chat_id如果解析失败则返回 None
"""
@@ -245,42 +245,42 @@ class ChatConfig(ValidatedConfigBase):
parts = stream_config_str.split(":")
if len(parts) != 3:
return None
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
# 判断是否为群聊
is_group = stream_type == "group"
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
import hashlib
if is_group:
components = [platform, str(id_str)]
else:
components = [platform, str(id_str), "private"]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
except (ValueError, IndexError):
return None
def _get_global_frequency(self) -> Optional[float]:
"""
获取全局默认频率配置
Returns:
float: 频率值,如果没有配置则返回 None
"""
for config_item in self.talk_frequency_adjust:
if not config_item or len(config_item) < 2:
continue
# 检查是否为全局默认配置(第一个元素为空字符串)
if config_item[0] == "":
return self._get_time_based_frequency(config_item[1:])
return None
@@ -313,10 +313,10 @@ class ExpressionConfig(ValidatedConfigBase):
def _parse_stream_config_to_chat_id(self, stream_config_str: str) -> Optional[str]:
"""
解析流配置字符串并生成对应的 chat_id
Args:
stream_config_str: 格式为 "platform:id:type" 的字符串
Returns:
str: 生成的 chat_id如果解析失败则返回 None
"""
@@ -324,52 +324,52 @@ class ExpressionConfig(ValidatedConfigBase):
parts = stream_config_str.split(":")
if len(parts) != 3:
return None
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
# 判断是否为群聊
is_group = stream_type == "group"
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 chat_id
import hashlib
if is_group:
components = [platform, str(id_str)]
else:
components = [platform, str(id_str), "private"]
key = "_".join(components)
return hashlib.md5(key.encode()).hexdigest()
except (ValueError, IndexError):
return None
def get_expression_config_for_chat(self, chat_stream_id: Optional[str] = None) -> tuple[bool, bool, float]:
"""
根据聊天流ID获取表达配置
Args:
chat_stream_id: 聊天流ID格式为哈希值
Returns:
tuple: (是否使用表达, 是否学习表达, 学习间隔)
"""
if not self.rules:
# 如果没有配置使用默认值启用表达启用学习强度1.0
return True, True, 1.0
# 优先检查聊天流特定的配置
if chat_stream_id:
for rule in self.rules:
if rule.chat_stream_id and self._parse_stream_config_to_chat_id(rule.chat_stream_id) == chat_stream_id:
return rule.use_expression, rule.learn_expression, rule.learning_strength
# 检查全局配置chat_stream_id为空字符串的配置
for rule in self.rules:
if rule.chat_stream_id == "":
return rule.use_expression, rule.learn_expression, rule.learning_strength
# 如果都没有匹配,返回默认值
return True, True, 1.0
@@ -443,7 +443,7 @@ class KeywordRuleConfig(ValidatedConfigBase):
def __post_init__(self):
import re
if not self.keywords and not self.regex:
raise ValueError("关键词规则必须至少包含keywords或regex中的一个")
if not self.reaction:
@@ -466,6 +466,7 @@ class CustomPromptConfig(ValidatedConfigBase):
"""自定义提示词配置类"""
image_prompt: str = Field(default="", description="图片提示词")
planner_custom_prompt_enable: bool = Field(default=False, description="启用规划器自定义提示词")
planner_custom_prompt_content: str = Field(default="", description="规划器自定义提示词内容")

View File

@@ -53,13 +53,12 @@ class AffinityChatter(BaseChatter):
}
self.last_activity_time = time.time()
async def execute(self, context: StreamContext, unread_messages: list | None = None) -> dict:
async def execute(self, context: StreamContext) -> dict:
"""
处理StreamContext对象
Args:
context: StreamContext对象包含聊天流的所有消息信息
unread_messages: (可选) 指定要处理的未读消息列表,用于并发处理
Returns:
处理结果字典
@@ -69,12 +68,10 @@ class AffinityChatter(BaseChatter):
learner = expression_learner_manager.get_expression_learner(self.stream_id)
asyncio.create_task(learner.trigger_learning_for_chat())
# 如果没有提供未读消息列表,则从上下文中获取
if unread_messages is None:
unread_messages = context.get_unread_messages()
unread_messages = context.get_unread_messages()
# 使用增强版规划器处理消息
actions, target_message = await self.planner.plan(context=context, unread_messages=unread_messages)
actions, target_message = await self.planner.plan(context=context)
self.stats["plans_created"] += 1
# 执行动作(如果规划器返回了动作)

View File

@@ -218,6 +218,10 @@ class ChatterPlanFilter:
self.last_obs_time_mark = time.time()
mentioned_bonus = ""
if global_config.chat.mentioned_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你"
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
if plan.mode == ChatMode.FOCUS:
no_action_block = """

View File

@@ -70,15 +70,12 @@ class ChatterActionPlanner:
"other_actions_executed": 0,
}
async def plan(
self, context: "StreamContext" = None, unread_messages: Optional[List[Dict]] = None
) -> Tuple[List[Dict], Optional[Dict]]:
async def plan(self, context: "StreamContext" = None) -> Tuple[List[Dict], Optional[Dict]]:
"""
执行完整的增强版规划流程。
Args:
context (StreamContext): 包含聊天流消息的上下文对象。
unread_messages (Optional[List[Dict]]): (可选) 指定要处理的未读消息列表,用于并发处理
Returns:
Tuple[List[Dict], Optional[Dict]]: 一个元组,包含:
@@ -88,16 +85,14 @@ class ChatterActionPlanner:
try:
self.planner_stats["total_plans"] += 1
return await self._enhanced_plan_flow(context, unread_messages)
return await self._enhanced_plan_flow(context)
except Exception as e:
logger.error(f"规划流程出错: {e}")
self.planner_stats["failed_plans"] += 1
return [], None
async def _enhanced_plan_flow(
self, context: "StreamContext", unread_messages: Optional[List[Dict]] = None
) -> Tuple[List[Dict], Optional[Dict]]:
async def _enhanced_plan_flow(self, context: "StreamContext") -> Tuple[List[Dict], Optional[Dict]]:
"""执行增强版规划流程"""
try:
# 在规划前,先进行动作修改
@@ -111,10 +106,7 @@ class ChatterActionPlanner:
# 确保Plan中包含所有当前可用的动作
initial_plan.available_actions = self.action_manager.get_using_actions()
# 如果没有提供未读消息列表,则从上下文中获取
if unread_messages is None:
unread_messages = context.get_unread_messages() if context else []
unread_messages = context.get_unread_messages() if context else []
# 2. 兴趣度评分 - 只对未读消息进行评分
if unread_messages:
bot_nickname = global_config.bot.nickname

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.0.4"
version = "7.0.2"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -132,13 +132,6 @@ dynamic_distribution_min_interval = 1.0 # 最小分发间隔(秒)
dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒)
dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子
# 是否启用并发消息处理,在同一聊天流中并行处理多个消息
concurrent_message_processing = false
# 在并发模式下,每个聊天流(群/私聊)同时处理的最大用户数
concurrent_per_user_limit = 3
# 在并发模式下是否按用户ID进行独立串行处理
process_by_user_id = true
talk_frequency_adjust = [
["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"],
["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"],