feat: 增加适用于直播等场景的新回复策略,在ada发送特定消息段的情况下可以按照优先度同一时间只回复一人

This commit is contained in:
tcmofashi
2025-07-01 10:26:29 +08:00
parent c7fc6e57ff
commit 97ab4a242e
4 changed files with 323 additions and 261 deletions

View File

@@ -47,6 +47,16 @@ class ChatMessageContext:
return False
return True
def get_priority_mode(self) -> str:
"""获取优先级模式"""
return self.message.priority_mode
def get_priority_info(self) -> Optional[dict]:
"""获取优先级信息"""
if hasattr(self.message, "priority_info") and self.message.priority_info:
return self.message.priority_info
return None
class ChatStream:
"""聊天流对象,存储一个完整的聊天上下文"""

View File

@@ -108,6 +108,9 @@ class MessageRecv(Message):
self.detailed_plain_text = message_dict.get("detailed_plain_text", "")
self.is_emoji = False
self.is_picid = False
self.is_mentioned = 0.0
self.priority_mode = "interest"
self.priority_info = None
def update_chat_stream(self, chat_stream: "ChatStream"):
self.chat_stream = chat_stream
@@ -146,8 +149,27 @@ class MessageRecv(Message):
if isinstance(segment.data, str):
return await get_image_manager().get_emoji_description(segment.data)
return "[发了一个表情包,网卡了加载不出来]"
elif segment.type == "mention_bot":
self.is_mentioned = float(segment.data)
return ""
elif segment.type == "set_priority_mode":
# 处理设置优先级模式的消息段
if isinstance(segment.data, str):
self.priority_mode = segment.data
return ""
elif segment.type == "priority_info":
if isinstance(segment.data, dict):
# 处理优先级信息
self.priority_info = segment.data
"""
{
'message_type': 'vip', # vip or normal
'message_priority': 1.0, # 优先级大为优先float
}
"""
return ""
else:
return f"[{segment.type}:{str(segment.data)}]"
return ""
except Exception as e:
logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}")
return f"[处理失败的{segment.type}消息]"

View File

@@ -6,7 +6,7 @@ import os
import pickle
from maim_message import UserInfo, Seg
from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.prompt_builder import global_prompt_manager
@@ -27,6 +27,15 @@ from src.chat.utils.chat_message_builder import (
from .priority_manager import PriorityManager
import traceback
from .normal_chat_generator import NormalChatGenerator
from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor
from src.chat.replyer.default_generator import DefaultReplyer
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.manager.mood_manager import mood_manager
willing_manager = get_willing_manager()
logger = get_logger("normal_chat")
@@ -46,7 +55,7 @@ class NormalChat:
每个聊天私聊或群聊都会有一个独立的NormalChat实例。
"""
def __init__(self, chat_stream: ChatStream):
def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None):
"""
初始化NormalChat实例。
@@ -55,10 +64,61 @@ class NormalChat:
"""
self.chat_stream = chat_stream
self.stream_id = chat_stream.stream_id
self.stream_name = chat_stream.get_name()
self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整
self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner
self.action_manager = ActionManager(chat_stream) # 初始化动作管理
self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id
# 初始化Normal Chat专用表达
self.expressor = NormalChatExpressor(self.chat_stream)
self.replyer = DefaultReplyer(self.chat_stream)
# Interest dict
self.interest_dict = interest_dict
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id)
self.willing_amplifier = 1
self.start_time = time.time()
# Other sync initializations
self.gpt = NormalChatGenerator()
self.mood_manager = mood_manager
self.start_time = time.time()
self._initialized = False # Track initialization status
# Planner相关初始化
self.action_manager = ActionManager()
self.planner = NormalChatPlanner(self.stream_name, self.action_manager)
self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name)
self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner
# 记录最近的回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply}
self.recent_replies = []
self.max_replies_history = 20 # 最多保存最近20条回复记录
# 新的消息段缓存结构:
# {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]}
self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {}
# 持久化存储文件路径
self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl")
# 最后处理的消息时间,避免重复处理相同消息
self.last_processed_message_time = 0.0
# 最后清理时间,用于定期清理老消息段
self.last_cleanup_time = 0.0
# 添加回调函数用于在满足条件时通知切换到focus_chat模式
self.on_switch_to_focus_callback = on_switch_to_focus_callback
self._disabled = False # 增加停用标志
# 加载持久化的缓存
self._load_cache()
logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。")
self.action_type: Optional[str] = None # 当前动作类型
self.is_parallel_action: bool = False # 是否是可并行动作
@@ -66,20 +126,15 @@ class NormalChat:
self._chat_task: Optional[asyncio.Task] = None
self._disabled = False # 停用标志
# 消息段缓存,用于关系构建
self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {}
self.last_cleanup_time = time.time()
# 最近回复记录
self.recent_replies: List[Dict[str, Any]] = []
self.on_switch_to_focus_callback = on_switch_to_focus_callback
# 新增:回复模式和优先级管理器
self.reply_mode = global_config.chat.get_reply_mode(self.stream_id)
self.reply_mode = self.chat_stream.context.get_priority_mode()
if self.reply_mode == "priority":
interest_dict = self.chat_stream.interest_dict or {}
interest_dict = interest_dict or {}
self.priority_manager = PriorityManager(
interest_dict=interest_dict,
normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5),
normal_queue_max_size=5,
)
else:
self.priority_manager = None
@@ -393,6 +448,29 @@ class NormalChat:
f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}"
)
async def _priority_chat_loop_add_message(self):
while not self._disabled:
try:
ids = list(self.interest_dict.keys())
for msg_id in ids:
message, interest_value, _ = self.interest_dict[msg_id]
if not self._disabled:
# 更新消息段信息
self._update_user_message_segments(message)
# 添加消息到优先级管理器
if self.priority_manager:
self.priority_manager.add_message(message, interest_value)
self.interest_dict.pop(msg_id, None)
except Exception as e:
logger.error(
f"[{self.stream_name}] 优先级聊天循环添加消息时出现错误: {traceback.format_exc()}", exc_info=True
)
print(traceback.format_exc())
# 出现错误时,等待一段时间再重试
raise
await asyncio.sleep(0.1)
async def _priority_chat_loop(self):
"""
使用优先级队列的消息处理循环。
@@ -401,15 +479,22 @@ class NormalChat:
try:
if not self.priority_manager.is_empty():
# 获取最高优先级的消息
message_to_process = self.priority_manager.get_highest_priority_message()
message = self.priority_manager.get_highest_priority_message()
if message_to_process:
if message:
logger.info(
f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}"
f"[{self.stream_name}] 从队列中取出消息进行处理: User {message.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message.message_info.time))}"
)
# 检查是否应该回复
async with self.chat_stream.get_process_lock():
await self._process_chat_message(message_to_process)
# 执行定期清理
self._cleanup_old_segments()
# 更新消息段信息
self._update_user_message_segments(message)
# 检查是否有用户满足关系构建条件
asyncio.create_task(self._check_relation_building_conditions())
await self.reply_one_message(message)
# 等待一段时间再检查队列
await asyncio.sleep(1)
@@ -418,7 +503,7 @@ class NormalChat:
logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。")
break
except Exception as e:
logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True)
logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {traceback.format_exc()}", exc_info=True)
# 出现错误时,等待更长时间避免频繁报错
await asyncio.sleep(10)
@@ -645,7 +730,7 @@ class NormalChat:
# 新增在auto模式下检查是否需要直接切换到focus模式
if global_config.chat.chat_mode == "auto":
if await self._should_switch_to_focus(message, is_mentioned, interested_rate):
if await self._check_should_switch_to_focus():
logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件直接执行切换")
if self.on_switch_to_focus_callback:
await self.on_switch_to_focus_callback()
@@ -695,176 +780,10 @@ class NormalChat:
do_reply = False
response_set = None # 初始化 response_set
if random() < reply_probability:
do_reply = True
# 回复前处理
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
thinking_id = await self._create_thinking_message(message)
# 如果启用planner预先修改可用actions避免在并行任务中重复调用
available_actions = None
if self.enable_planner:
try:
await self.action_modifier.modify_actions_for_normal_chat(
self.chat_stream, self.recent_replies, message.processed_plain_text
)
available_actions = self.action_manager.get_using_actions_for_mode("normal")
except Exception as e:
logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}")
available_actions = None
# 定义并行执行的任务
async def generate_normal_response():
"""生成普通回复"""
try:
return await self.gpt.generate_response(
message=message,
thinking_id=thinking_id,
enable_planner=self.enable_planner,
available_actions=available_actions,
)
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None
async def plan_and_execute_actions():
"""规划和执行额外动作"""
if not self.enable_planner:
logger.debug(f"[{self.stream_name}] Planner未启用跳过动作规划")
return None
try:
# 获取发送者名称(动作修改已在并行执行前完成)
sender_name = self._get_sender_name(message)
no_action = {
"action_result": {
"action_type": "no_action",
"action_data": {},
"reasoning": "规划器初始化默认",
"is_parallel": True,
},
"chat_context": "",
"action_prompt": "",
}
# 检查是否应该跳过规划
if self.action_modifier.should_skip_planning():
logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划")
self.action_type = "no_action"
return no_action
# 执行规划
plan_result = await self.planner.plan(message, sender_name)
action_type = plan_result["action_result"]["action_type"]
action_data = plan_result["action_result"]["action_data"]
reasoning = plan_result["action_result"]["reasoning"]
is_parallel = plan_result["action_result"].get("is_parallel", False)
logger.info(
f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}"
)
self.action_type = action_type # 更新实例属性
self.is_parallel_action = is_parallel # 新增:保存并行执行标志
# 如果规划器决定不执行任何动作
if action_type == "no_action":
logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作")
return no_action
# 执行额外的动作(不影响回复生成)
action_result = await self._execute_action(action_type, action_data, message, thinking_id)
if action_result is not None:
logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成")
else:
logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败")
return {
"action_type": action_type,
"action_data": action_data,
"reasoning": reasoning,
"is_parallel": is_parallel,
}
except Exception as e:
logger.error(f"[{self.stream_name}] Planner执行失败: {e}")
return no_action
# 并行执行回复生成和动作规划
self.action_type = None # 初始化动作类型
self.is_parallel_action = False # 初始化并行动作标志
with Timer("并行生成回复和规划", timing_results):
response_set, plan_result = await asyncio.gather(
generate_normal_response(), plan_and_execute_actions(), return_exceptions=True
)
# 处理生成回复的结果
if isinstance(response_set, Exception):
logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}")
response_set = None
# 处理规划结果(可选,不影响回复)
if isinstance(plan_result, Exception):
logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}")
elif plan_result:
logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}")
if not response_set or (
self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action
):
if not response_set:
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action:
logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)")
# 如果模型未生成回复,移除思考消息
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
for msg in container.messages[:]:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
container.messages.remove(msg)
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
break
# 需要在此处也调用 not_reply_handle 和 delete 吗?
# 如果是因为模型没回复,也算是一种 "未回复"
await willing_manager.not_reply_handle(message.message_info.message_id)
willing_manager.delete(message.message_info.message_id)
return # 不执行后续步骤
# logger.info(f"[{self.stream_name}] 回复内容: {response_set}")
if self._disabled:
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
return
# 发送回复 (不再需要传入 chat)
with Timer("消息发送", timing_results):
first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id)
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg:
# 消息段已在接收消息时更新,这里不需要额外处理
# 记录回复信息到最近回复列表中
reply_info = {
"time": time.time(),
"user_message": message.processed_plain_text,
"user_info": {
"user_id": message.message_info.user_info.user_id,
"user_nickname": message.message_info.user_info.user_nickname,
},
"response": response_set,
"is_mentioned": is_mentioned,
"is_reference_reply": message.reply is not None, # 判断是否为引用回复
"timing": {k: round(v, 2) for k, v in timing_results.items()},
}
self.recent_replies.append(reply_info)
# 保持最近回复历史在限定数量内
if len(self.recent_replies) > self.max_replies_history:
self.recent_replies = self.recent_replies[-self.max_replies_history :]
# 回复后处理
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
with Timer("获取回复", timing_results):
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
do_reply = await self.reply_one_message(message)
response_set = do_reply if do_reply else None
# 输出性能计时结果
if do_reply and response_set: # 确保 response_set 不是 None
timing_str = " | ".join([f"{step}: {duration:.2f}" for step, duration in timing_results.items()])
@@ -873,6 +792,7 @@ class NormalChat:
logger.info(
f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}"
)
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
elif not do_reply:
# 不回复处理
await willing_manager.not_reply_handle(message.message_info.message_id)
@@ -880,6 +800,167 @@ class NormalChat:
# 意愿管理器注销当前message信息 (无论是否回复,只要处理过就删除)
willing_manager.delete(message.message_info.message_id)
async def reply_one_message(self, message: MessageRecv) -> None:
# 回复前处理
thinking_id = await self._create_thinking_message(message)
# 如果启用planner预先修改可用actions避免在并行任务中重复调用
available_actions = None
if self.enable_planner:
try:
await self.action_modifier.modify_actions_for_normal_chat(
self.chat_stream, self.recent_replies, message.processed_plain_text
)
available_actions = self.action_manager.get_using_actions_for_mode("normal")
except Exception as e:
logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}")
available_actions = None
# 定义并行执行的任务
async def generate_normal_response():
"""生成普通回复"""
try:
return await self.gpt.generate_response(
message=message,
thinking_id=thinking_id,
enable_planner=self.enable_planner,
available_actions=available_actions,
)
except Exception as e:
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None
async def plan_and_execute_actions():
"""规划和执行额外动作"""
if not self.enable_planner:
logger.debug(f"[{self.stream_name}] Planner未启用跳过动作规划")
return None
try:
# 获取发送者名称(动作修改已在并行执行前完成)
sender_name = self._get_sender_name(message)
no_action = {
"action_result": {
"action_type": "no_action",
"action_data": {},
"reasoning": "规划器初始化默认",
"is_parallel": True,
},
"chat_context": "",
"action_prompt": "",
}
# 检查是否应该跳过规划
if self.action_modifier.should_skip_planning():
logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划")
self.action_type = "no_action"
return no_action
# 执行规划
plan_result = await self.planner.plan(message, sender_name)
action_type = plan_result["action_result"]["action_type"]
action_data = plan_result["action_result"]["action_data"]
reasoning = plan_result["action_result"]["reasoning"]
is_parallel = plan_result["action_result"].get("is_parallel", False)
logger.info(
f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}"
)
self.action_type = action_type # 更新实例属性
self.is_parallel_action = is_parallel # 新增:保存并行执行标志
# 如果规划器决定不执行任何动作
if action_type == "no_action":
logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作")
return no_action
# 执行额外的动作(不影响回复生成)
action_result = await self._execute_action(action_type, action_data, message, thinking_id)
if action_result is not None:
logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成")
else:
logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败")
return {
"action_type": action_type,
"action_data": action_data,
"reasoning": reasoning,
"is_parallel": is_parallel,
}
except Exception as e:
logger.error(f"[{self.stream_name}] Planner执行失败: {e}")
return no_action
# 并行执行回复生成和动作规划
self.action_type = None # 初始化动作类型
self.is_parallel_action = False # 初始化并行动作标志
response_set, plan_result = await asyncio.gather(
generate_normal_response(), plan_and_execute_actions(), return_exceptions=True
)
# 处理生成回复的结果
if isinstance(response_set, Exception):
logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}")
response_set = None
# 处理规划结果(可选,不影响回复)
if isinstance(plan_result, Exception):
logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}")
elif plan_result:
logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}")
if not response_set or (
self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action
):
if not response_set:
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action:
logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)")
# 如果模型未生成回复,移除思考消息
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
for msg in container.messages[:]:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
container.messages.remove(msg)
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
break
# 需要在此处也调用 not_reply_handle 和 delete 吗?
# 如果是因为模型没回复,也算是一种 "未回复"
return False
# logger.info(f"[{self.stream_name}] 回复内容: {response_set}")
if self._disabled:
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
return False
# 发送回复 (不再需要传入 chat)
first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id)
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg:
# 消息段已在接收消息时更新,这里不需要额外处理
# 记录回复信息到最近回复列表中
reply_info = {
"time": time.time(),
"user_message": message.processed_plain_text,
"user_info": {
"user_id": message.message_info.user_info.user_id,
"user_nickname": message.message_info.user_info.user_nickname,
},
"response": response_set,
# "is_mentioned": is_mentioned,
"is_reference_reply": message.reply is not None, # 判断是否为引用回复
# "timing": {k: round(v, 2) for k, v in timing_results.items()},
}
self.recent_replies.append(reply_info)
# 保持最近回复历史在限定数量内
if len(self.recent_replies) > self.max_replies_history:
self.recent_replies = self.recent_replies[-self.max_replies_history :]
return response_set if response_set else False
# 改为实例方法, 移除 chat 参数
async def start_chat(self):
@@ -899,9 +980,14 @@ class NormalChat:
self._chat_task = None
try:
logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}")
logger.info(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}")
if self.reply_mode == "priority":
polling_task = asyncio.create_task(self._priority_reply_loop())
polling_task_send = asyncio.create_task(self._priority_chat_loop())
polling_task_recv = asyncio.create_task(self._priority_chat_loop_add_message())
print("555")
polling_task = asyncio.gather(polling_task_send, polling_task_recv)
print("666")
else: # 默认或 "interest" 模式
polling_task = asyncio.create_task(self._reply_interested_message())
@@ -942,7 +1028,7 @@ class NormalChat:
# 尝试获取异常,但不抛出
exc = task.exception()
if exc:
logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}")
logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}", exc_info=exc)
else:
logger.debug(f"[{self.stream_name}] 任务正常完成")
except Exception as e:
@@ -1024,52 +1110,6 @@ class NormalChat:
# 返回最近的limit条记录按时间倒序排列
return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True)
async def _priority_reply_loop(self) -> None:
"""
[优先级模式] 循环获取并处理最高优先级的消息。
"""
logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。")
try:
while not self._disabled:
if self.priority_manager is None:
logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。")
await asyncio.sleep(5)
continue
# 动态调整回复频率
self.adjust_reply_frequency()
# 从优先级队列中获取消息
highest_priority_message = self.priority_manager.get_highest_priority_message()
if highest_priority_message:
message = highest_priority_message
logger.debug(
f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..."
)
# 复用现有的消息处理逻辑
# 需要计算 is_mentioned 和 interested_rate
is_mentioned = message.is_mentioned
# 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的
# 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分
# 这里我们先用一个较高的固定值,或者从消息本身获取
interested_rate = 1.0 # 简化处理,或者可以传递更精确的值
await self._process_message(message, is_mentioned, interested_rate)
# 处理完一条消息后可以稍微等待,避免过于频繁地连续回复
await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0))
else:
# 如果队列为空,等待一段时间
await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5))
except asyncio.CancelledError:
logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。")
raise # 重新抛出异常
except Exception as e:
logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True)
def adjust_reply_frequency(self):
"""
根据预设规则动态调整回复意愿willing_amplifier

View File

@@ -11,10 +11,10 @@ logger = get_logger("normal_chat")
class PrioritizedMessage:
"""带有优先级的消息对象"""
def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False):
def __init__(self, message: MessageRecv, interest_scores: List[float], is_vip: bool = False):
self.message = message
self.arrival_time = time.time()
self.interest_score = interest_score
self.interest_scores = interest_scores
self.is_vip = is_vip
self.priority = self.calculate_priority()
@@ -25,7 +25,7 @@ class PrioritizedMessage:
"""
age = time.time() - self.arrival_time
decay_factor = math.exp(-decay_rate * age)
priority = self.interest_score * decay_factor
priority = sum(self.interest_scores) + decay_factor
return priority
def __lt__(self, other: "PrioritizedMessage") -> bool:
@@ -43,25 +43,20 @@ class PriorityManager:
self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆)
self.interest_dict = interest_dict if interest_dict is not None else {}
self.normal_queue_max_size = normal_queue_max_size
self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定
def _get_interest_score(self, user_id: str) -> float:
"""获取用户的兴趣分默认为1.0"""
return self.interest_dict.get("interests", {}).get(user_id, 1.0)
def _is_vip(self, user_id: str) -> bool:
"""检查用户是否为VIP"""
return user_id in self.vip_users
def add_message(self, message: MessageRecv):
def add_message(self, message: MessageRecv, interest_score: Optional[float] = None):
"""
添加新消息到合适的队列中。
"""
user_id = message.message_info.user_info.user_id
is_vip = self._is_vip(user_id)
interest_score = self._get_interest_score(user_id)
is_vip = message.priority_info.get("message_type") == "vip" if message.priority_info else False
message_priority = message.priority_info.get("message_priority", 0.0) if message.priority_info else 0.0
p_message = PrioritizedMessage(message, interest_score, is_vip)
p_message = PrioritizedMessage(message, [interest_score, message_priority], is_vip)
if is_vip:
heapq.heappush(self.vip_queue, p_message)
@@ -97,12 +92,7 @@ class PriorityManager:
vip_msg = self.vip_queue[0] if self.vip_queue else None
normal_msg = self.normal_queue[0] if self.normal_queue else None
if vip_msg and normal_msg:
if vip_msg.priority >= normal_msg.priority:
return heapq.heappop(self.vip_queue).message
else:
return heapq.heappop(self.normal_queue).message
elif vip_msg:
if vip_msg:
return heapq.heappop(self.vip_queue).message
elif normal_msg:
return heapq.heappop(self.normal_queue).message