feat: 增加priority模式

This commit is contained in:
tcmofashi
2025-06-28 18:41:44 +08:00
parent 1c57e68f13
commit e3480e989e
2 changed files with 267 additions and 56 deletions

View File

@@ -1,7 +1,7 @@
import asyncio
import time
import traceback
from random import random
from typing import List, Dict, Optional, Any
from typing import List, Optional, Dict # 导入类型提示
import os
import pickle
@@ -11,6 +11,8 @@ from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
from src.manager.mood_manager import mood_manager
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.utils.timer_calculator import Timer
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.utils.prompt_builder import global_prompt_manager
from .normal_chat_generator import NormalChatGenerator
from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
@@ -31,6 +33,8 @@ from src.chat.utils.chat_message_builder import (
get_raw_msg_before_timestamp_with_chat,
num_new_messages_since,
)
from .priority_manager import PriorityManager
import traceback
willing_manager = get_willing_manager()
@@ -46,64 +50,57 @@ SEGMENT_CLEANUP_CONFIG = {
class NormalChat:
def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None):
"""初始化 NormalChat 实例。只进行同步操作。"""
"""
普通聊天处理类,负责处理非核心对话的聊天逻辑。
每个聊天私聊或群聊都会有一个独立的NormalChat实例。
"""
def __init__(self, chat_stream: ChatStream):
"""
初始化NormalChat实例。
Args:
chat_stream (ChatStream): 聊天流对象,包含与特定聊天相关的所有信息。
"""
self.chat_stream = chat_stream
self.stream_id = chat_stream.stream_id
self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id
self.stream_name = chat_stream.get_name()
self.willing_amplifier = 1.0 # 回复意愿放大器,动态调整
self.enable_planner = global_config.normal_chat.get("enable_planner", False) # 是否启用planner
self.action_manager = ActionManager(chat_stream) # 初始化动作管理器
self.action_type: Optional[str] = None # 当前动作类型
self.is_parallel_action: bool = False # 是否是可并行动作
# 初始化Normal Chat专用表达器
self.expressor = NormalChatExpressor(self.chat_stream)
self.replyer = DefaultReplyer(self.chat_stream)
# Interest dict
self.interest_dict = interest_dict
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.stream_id)
self.willing_amplifier = 1
self.start_time = time.time()
# Other sync initializations
self.gpt = NormalChatGenerator()
self.mood_manager = mood_manager
self.start_time = time.time()
# 任务管理
self._chat_task: Optional[asyncio.Task] = None
self._initialized = False # Track initialization status
self._disabled = False # 停用标志
# Planner相关初始化
self.action_manager = ActionManager()
self.planner = NormalChatPlanner(self.stream_name, self.action_manager)
self.action_modifier = NormalChatActionModifier(self.action_manager, self.stream_id, self.stream_name)
self.enable_planner = global_config.normal_chat.enable_planner # 从配置中读取是否启用planner
# 消息段缓存,用于关系构建
self.person_engaged_cache: Dict[str, List[Dict[str, Any]]] = {}
self.last_cleanup_time = time.time()
# 记录最近回复内容,每项包含: {time, user_message, response, is_mentioned, is_reference_reply}
self.recent_replies = []
self.max_replies_history = 20 # 最多保存最近20条回复记录
# 最近回复记录
self.recent_replies: List[Dict[str, Any]] = []
# 新的消息段缓存结构:
# {person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...]}
self.person_engaged_cache: Dict[str, List[Dict[str, any]]] = {}
# 新增:回复模式和优先级管理器
self.reply_mode = global_config.chat.get_reply_mode(self.stream_id)
if self.reply_mode == "priority":
interest_dict = self.chat_stream.interest_dict or {}
self.priority_manager = PriorityManager(
interest_dict=interest_dict,
normal_queue_max_size=global_config.chat.get("priority_queue_max_size", 5),
)
else:
self.priority_manager = None
# 持久化存储文件路径
self.cache_file_path = os.path.join("data", "relationship", f"relationship_cache_{self.stream_id}.pkl")
# 最后处理的消息时间,避免重复处理相同消息
self.last_processed_message_time = 0.0
# 最后清理时间,用于定期清理老消息段
self.last_cleanup_time = 0.0
# 添加回调函数用于在满足条件时通知切换到focus_chat模式
self.on_switch_to_focus_callback = on_switch_to_focus_callback
self._disabled = False # 增加停用标志
# 加载持久化的缓存
self._load_cache()
logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。")
async def disable(self):
"""停用 NormalChat 实例,停止所有后台任务"""
self._disabled = True
if self._chat_task and not self._chat_task.done():
self._chat_task.cancel()
if self.reply_mode == "priority" and self._priority_chat_task and not self._priority_chat_task.done():
self._priority_chat_task.cancel()
logger.info(f"[{self.stream_name}] NormalChat 已停用。")
# ================================
# 缓存管理模块
@@ -405,6 +402,35 @@ class NormalChat:
f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}"
)
async def _priority_chat_loop(self):
"""
使用优先级队列的消息处理循环。
"""
while not self._disabled:
try:
if not self.priority_manager.is_empty():
# 获取最高优先级的消息
message_to_process = self.priority_manager.get_highest_priority_message()
if message_to_process:
logger.info(
f"[{self.stream_name}] 从队列中取出消息进行处理: User {message_to_process.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message_to_process.message_info.time))}"
)
# 检查是否应该回复
async with self.chat_stream.get_process_lock():
await self._process_chat_message(message_to_process)
# 等待一段时间再检查队列
await asyncio.sleep(1)
except asyncio.CancelledError:
logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。")
break
except Exception as e:
logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {e}", exc_info=True)
# 出现错误时,等待更长时间避免频繁报错
await asyncio.sleep(10)
# 改为实例方法
async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str:
"""创建思考消息"""
@@ -602,15 +628,33 @@ class NormalChat:
# 改为实例方法, 移除 chat 参数
async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
# 新增:如果已停用,直接返回
"""
处理接收到的消息。
根据回复模式,决定是立即处理还是放入优先级队列。
"""
if self._disabled:
return
# 根据回复模式决定行为
if self.reply_mode == "priority":
# 优先模式下,所有消息都进入管理器
if self.priority_manager:
self.priority_manager.add_message(message)
return
# --- 以下为原有的 "兴趣" 模式逻辑 ---
await self._process_message(message, is_mentioned, interested_rate)
async def _process_message(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
"""
实际处理单条消息的逻辑,包括意愿判断、回复生成、动作执行等。
"""
if self._disabled:
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
return
# 新增在auto模式下检查是否需要直接切换到focus模式
if global_config.chat.chat_mode == "auto":
should_switch = await self._check_should_switch_to_focus()
if should_switch:
if await self._should_switch_to_focus(message, is_mentioned, interested_rate):
logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件直接执行切换")
if self.on_switch_to_focus_callback:
await self.on_switch_to_focus_callback()
@@ -864,8 +908,11 @@ class NormalChat:
self._chat_task = None
try:
logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务")
polling_task = asyncio.create_task(self._reply_interested_message())
logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}")
if self.reply_mode == "priority":
polling_task = asyncio.create_task(self._priority_reply_loop())
else: # 默认或 "interest" 模式
polling_task = asyncio.create_task(self._reply_interested_message())
# 设置回调
polling_task.add_done_callback(lambda t: self._handle_task_completion(t))
@@ -986,6 +1033,52 @@ class NormalChat:
# 返回最近的limit条记录按时间倒序排列
return sorted(self.recent_replies[-limit:], key=lambda x: x["time"], reverse=True)
async def _priority_reply_loop(self) -> None:
"""
[优先级模式] 循环获取并处理最高优先级的消息。
"""
logger.info(f"[{self.stream_name}] 已启动优先级回复模式循环。")
try:
while not self._disabled:
if self.priority_manager is None:
logger.error(f"[{self.stream_name}] 处于优先级模式,但 priority_manager 未初始化。")
await asyncio.sleep(5)
continue
# 动态调整回复频率
self.adjust_reply_frequency()
# 从优先级队列中获取消息
highest_priority_message = self.priority_manager.get_highest_priority_message()
if highest_priority_message:
message = highest_priority_message
logger.debug(
f"[{self.stream_name}] 从优先级队列中取出消息进行处理: {message.processed_plain_text[:30]}..."
)
# 复用现有的消息处理逻辑
# 需要计算 is_mentioned 和 interested_rate
is_mentioned = message.is_mentioned
# 对于优先级模式,我们可以认为取出的消息就是我们感兴趣的
# 或者我们可以从 priority_manager 的 PrioritizedMessage 中获取原始兴趣分
# 这里我们先用一个较高的固定值,或者从消息本身获取
interested_rate = 1.0 # 简化处理,或者可以传递更精确的值
await self._process_message(message, is_mentioned, interested_rate)
# 处理完一条消息后可以稍微等待,避免过于频繁地连续回复
await asyncio.sleep(global_config.chat.get("priority_post_reply_delay", 1.0))
else:
# 如果队列为空,等待一段时间
await asyncio.sleep(global_config.chat.get("priority_empty_queue_delay", 0.5))
except asyncio.CancelledError:
logger.debug(f"[{self.stream_name}] 优先级回复任务被取消。")
raise # 重新抛出异常
except Exception as e:
logger.error(f"[{self.stream_name}] 优先级回复循环异常: {e}", exc_info=True)
def adjust_reply_frequency(self):
"""
根据预设规则动态调整回复意愿willing_amplifier

View File

@@ -0,0 +1,118 @@
import time
import heapq
import math
from typing import List, Tuple, Dict, Any, Optional
from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
from src.common.logger import get_logger
logger = get_logger("normal_chat")
class PrioritizedMessage:
"""带有优先级的消息对象"""
def __init__(self, message: MessageRecv, interest_score: float, is_vip: bool = False):
self.message = message
self.arrival_time = time.time()
self.interest_score = interest_score
self.is_vip = is_vip
self.priority = self.calculate_priority()
def calculate_priority(self, decay_rate: float = 0.01) -> float:
"""
计算优先级分数。
优先级 = 兴趣分 * exp(-衰减率 * 消息年龄)
"""
age = time.time() - self.arrival_time
decay_factor = math.exp(-decay_rate * age)
priority = self.interest_score * decay_factor
return priority
def __lt__(self, other: "PrioritizedMessage") -> bool:
"""用于堆排序的比较函数,我们想要一个最大堆,所以用 >"""
return self.priority > other.priority
class PriorityManager:
"""
管理消息队列,根据优先级选择消息进行处理。
"""
def __init__(self, interest_dict: Dict[str, float], normal_queue_max_size: int = 5):
self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆)
self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆)
self.interest_dict = interest_dict if interest_dict is not None else {}
self.normal_queue_max_size = normal_queue_max_size
self.vip_users = self.interest_dict.get("vip_users", []) # 假设vip用户在interest_dict中指定
def _get_interest_score(self, user_id: str) -> float:
"""获取用户的兴趣分默认为1.0"""
return self.interest_dict.get("interests", {}).get(user_id, 1.0)
def _is_vip(self, user_id: str) -> bool:
"""检查用户是否为VIP"""
return user_id in self.vip_users
def add_message(self, message: MessageRecv):
"""
添加新消息到合适的队列中。
"""
user_id = message.message_info.user_info.user_id
is_vip = self._is_vip(user_id)
interest_score = self._get_interest_score(user_id)
p_message = PrioritizedMessage(message, interest_score, is_vip)
if is_vip:
heapq.heappush(self.vip_queue, p_message)
logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}")
else:
if len(self.normal_queue) >= self.normal_queue_max_size:
# 如果队列已满,只在消息优先级高于最低优先级消息时才添加
if p_message.priority > self.normal_queue[0].priority:
heapq.heapreplace(self.normal_queue, p_message)
logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}")
else:
logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}")
else:
heapq.heappush(self.normal_queue, p_message)
logger.debug(
f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}"
)
def get_highest_priority_message(self) -> Optional[MessageRecv]:
"""
从VIP和普通队列中获取当前最高优先级的消息。
"""
# 更新所有消息的优先级
for p_msg in self.vip_queue:
p_msg.priority = p_msg.calculate_priority()
for p_msg in self.normal_queue:
p_msg.priority = p_msg.calculate_priority()
# 重建堆
heapq.heapify(self.vip_queue)
heapq.heapify(self.normal_queue)
vip_msg = self.vip_queue[0] if self.vip_queue else None
normal_msg = self.normal_queue[0] if self.normal_queue else None
if vip_msg and normal_msg:
if vip_msg.priority >= normal_msg.priority:
return heapq.heappop(self.vip_queue).message
else:
return heapq.heappop(self.normal_queue).message
elif vip_msg:
return heapq.heappop(self.vip_queue).message
elif normal_msg:
return heapq.heappop(self.normal_queue).message
else:
return None
def is_empty(self) -> bool:
"""检查所有队列是否为空"""
return not self.vip_queue and not self.normal_queue
def get_queue_status(self) -> str:
"""获取队列状态信息"""
return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"