Merge pull request #1074 from tcmofashi/dev
feat: 增加适用于单线程场景的模式priority,由ada触发
This commit is contained in:
@@ -47,6 +47,16 @@ class ChatMessageContext:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_priority_mode(self) -> str:
|
||||
"""获取优先级模式"""
|
||||
return self.message.priority_mode
|
||||
|
||||
def get_priority_info(self) -> Optional[dict]:
|
||||
"""获取优先级信息"""
|
||||
if hasattr(self.message, "priority_info") and self.message.priority_info:
|
||||
return self.message.priority_info
|
||||
return None
|
||||
|
||||
|
||||
class ChatStream:
|
||||
"""聊天流对象,存储一个完整的聊天上下文"""
|
||||
|
||||
@@ -108,6 +108,9 @@ class MessageRecv(Message):
|
||||
self.detailed_plain_text = message_dict.get("detailed_plain_text", "")
|
||||
self.is_emoji = False
|
||||
self.is_picid = False
|
||||
self.is_mentioned = 0.0
|
||||
self.priority_mode = "interest"
|
||||
self.priority_info = None
|
||||
|
||||
def update_chat_stream(self, chat_stream: "ChatStream"):
|
||||
self.chat_stream = chat_stream
|
||||
@@ -146,8 +149,27 @@ class MessageRecv(Message):
|
||||
if isinstance(segment.data, str):
|
||||
return await get_image_manager().get_emoji_description(segment.data)
|
||||
return "[发了一个表情包,网卡了加载不出来]"
|
||||
elif segment.type == "mention_bot":
|
||||
self.is_mentioned = float(segment.data)
|
||||
return ""
|
||||
elif segment.type == "set_priority_mode":
|
||||
# 处理设置优先级模式的消息段
|
||||
if isinstance(segment.data, str):
|
||||
self.priority_mode = segment.data
|
||||
return ""
|
||||
elif segment.type == "priority_info":
|
||||
if isinstance(segment.data, dict):
|
||||
# 处理优先级信息
|
||||
self.priority_info = segment.data
|
||||
"""
|
||||
{
|
||||
'message_type': 'vip', # vip or normal
|
||||
'message_priority': 1.0, # 优先级,大为优先,float
|
||||
}
|
||||
"""
|
||||
return ""
|
||||
else:
|
||||
return f"[{segment.type}:{str(segment.data)}]"
|
||||
return ""
|
||||
except Exception as e:
|
||||
logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}")
|
||||
return f"[处理失败的{segment.type}消息]"
|
||||
|
||||
@@ -1,28 +1,21 @@
|
||||
import asyncio
|
||||
import time
|
||||
import traceback
|
||||
from random import random
|
||||
from typing import List, Optional, Dict # 导入类型提示
|
||||
from typing import List, Dict, Optional
|
||||
import os
|
||||
import pickle
|
||||
from maim_message import UserInfo, Seg
|
||||
from src.common.logger import get_logger
|
||||
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
|
||||
from src.manager.mood_manager import mood_manager
|
||||
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
|
||||
from src.chat.utils.timer_calculator import Timer
|
||||
|
||||
from src.chat.utils.prompt_builder import global_prompt_manager
|
||||
from .normal_chat_generator import NormalChatGenerator
|
||||
from ..message_receive.message import MessageSending, MessageRecv, MessageThinking, MessageSet
|
||||
from src.chat.message_receive.message_sender import message_manager
|
||||
from src.chat.normal_chat.willing.willing_manager import get_willing_manager
|
||||
from src.chat.normal_chat.normal_chat_utils import get_recent_message_stats
|
||||
from src.config.config import global_config
|
||||
from src.chat.focus_chat.planners.action_manager import ActionManager
|
||||
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
|
||||
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
|
||||
from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor
|
||||
from src.chat.replyer.default_generator import DefaultReplyer
|
||||
from src.person_info.person_info import PersonInfoManager
|
||||
from src.person_info.relationship_manager import get_relationship_manager
|
||||
from src.chat.utils.chat_message_builder import (
|
||||
@@ -31,6 +24,17 @@ from src.chat.utils.chat_message_builder import (
|
||||
get_raw_msg_before_timestamp_with_chat,
|
||||
num_new_messages_since,
|
||||
)
|
||||
from .priority_manager import PriorityManager
|
||||
import traceback
|
||||
|
||||
from .normal_chat_generator import NormalChatGenerator
|
||||
from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor
|
||||
from src.chat.replyer.default_generator import DefaultReplyer
|
||||
from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner
|
||||
from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier
|
||||
|
||||
from src.chat.heart_flow.utils_chat import get_chat_type_and_target_info
|
||||
from src.manager.mood_manager import mood_manager
|
||||
|
||||
willing_manager = get_willing_manager()
|
||||
|
||||
@@ -46,11 +50,21 @@ SEGMENT_CLEANUP_CONFIG = {
|
||||
|
||||
|
||||
class NormalChat:
|
||||
def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None):
|
||||
"""初始化 NormalChat 实例。只进行同步操作。"""
|
||||
"""
|
||||
普通聊天处理类,负责处理非核心对话的聊天逻辑。
|
||||
每个聊天(私聊或群聊)都会有一个独立的NormalChat实例。
|
||||
"""
|
||||
|
||||
def __init__(self, chat_stream: ChatStream, interest_dict: dict = None, on_switch_to_focus_callback=None):
|
||||
"""
|
||||
初始化NormalChat实例。
|
||||
|
||||
Args:
|
||||
chat_stream (ChatStream): 聊天流对象,包含与特定聊天相关的所有信息。
|
||||
"""
|
||||
self.chat_stream = chat_stream
|
||||
self.stream_id = chat_stream.stream_id
|
||||
|
||||
self.stream_name = get_chat_manager().get_stream_name(self.stream_id) or self.stream_id
|
||||
|
||||
# 初始化Normal Chat专用表达器
|
||||
@@ -69,7 +83,7 @@ class NormalChat:
|
||||
self.gpt = NormalChatGenerator()
|
||||
self.mood_manager = mood_manager
|
||||
self.start_time = time.time()
|
||||
self._chat_task: Optional[asyncio.Task] = None
|
||||
|
||||
self._initialized = False # Track initialization status
|
||||
|
||||
# Planner相关初始化
|
||||
@@ -105,6 +119,35 @@ class NormalChat:
|
||||
|
||||
logger.debug(f"[{self.stream_name}] NormalChat 初始化完成 (异步部分)。")
|
||||
|
||||
self.action_type: Optional[str] = None # 当前动作类型
|
||||
self.is_parallel_action: bool = False # 是否是可并行动作
|
||||
|
||||
# 任务管理
|
||||
self._chat_task: Optional[asyncio.Task] = None
|
||||
self._disabled = False # 停用标志
|
||||
|
||||
self.on_switch_to_focus_callback = on_switch_to_focus_callback
|
||||
|
||||
# 新增:回复模式和优先级管理器
|
||||
self.reply_mode = self.chat_stream.context.get_priority_mode()
|
||||
if self.reply_mode == "priority":
|
||||
interest_dict = interest_dict or {}
|
||||
self.priority_manager = PriorityManager(
|
||||
interest_dict=interest_dict,
|
||||
normal_queue_max_size=5,
|
||||
)
|
||||
else:
|
||||
self.priority_manager = None
|
||||
|
||||
async def disable(self):
|
||||
"""停用 NormalChat 实例,停止所有后台任务"""
|
||||
self._disabled = True
|
||||
if self._chat_task and not self._chat_task.done():
|
||||
self._chat_task.cancel()
|
||||
if self.reply_mode == "priority" and self._priority_chat_task and not self._priority_chat_task.done():
|
||||
self._priority_chat_task.cancel()
|
||||
logger.info(f"[{self.stream_name}] NormalChat 已停用。")
|
||||
|
||||
# ================================
|
||||
# 缓存管理模块
|
||||
# 负责持久化存储、状态管理、缓存读写
|
||||
@@ -405,6 +448,65 @@ class NormalChat:
|
||||
f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg_time))}"
|
||||
)
|
||||
|
||||
async def _priority_chat_loop_add_message(self):
|
||||
while not self._disabled:
|
||||
try:
|
||||
ids = list(self.interest_dict.keys())
|
||||
for msg_id in ids:
|
||||
message, interest_value, _ = self.interest_dict[msg_id]
|
||||
if not self._disabled:
|
||||
# 更新消息段信息
|
||||
self._update_user_message_segments(message)
|
||||
|
||||
# 添加消息到优先级管理器
|
||||
if self.priority_manager:
|
||||
self.priority_manager.add_message(message, interest_value)
|
||||
self.interest_dict.pop(msg_id, None)
|
||||
except Exception:
|
||||
logger.error(
|
||||
f"[{self.stream_name}] 优先级聊天循环添加消息时出现错误: {traceback.format_exc()}", exc_info=True
|
||||
)
|
||||
print(traceback.format_exc())
|
||||
# 出现错误时,等待一段时间再重试
|
||||
raise
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
async def _priority_chat_loop(self):
|
||||
"""
|
||||
使用优先级队列的消息处理循环。
|
||||
"""
|
||||
while not self._disabled:
|
||||
try:
|
||||
if not self.priority_manager.is_empty():
|
||||
# 获取最高优先级的消息
|
||||
message = self.priority_manager.get_highest_priority_message()
|
||||
|
||||
if message:
|
||||
logger.info(
|
||||
f"[{self.stream_name}] 从队列中取出消息进行处理: User {message.message_info.user_info.user_id}, Time: {time.strftime('%H:%M:%S', time.localtime(message.message_info.time))}"
|
||||
)
|
||||
# 执行定期清理
|
||||
self._cleanup_old_segments()
|
||||
|
||||
# 更新消息段信息
|
||||
self._update_user_message_segments(message)
|
||||
|
||||
# 检查是否有用户满足关系构建条件
|
||||
asyncio.create_task(self._check_relation_building_conditions())
|
||||
|
||||
await self.reply_one_message(message)
|
||||
|
||||
# 等待一段时间再检查队列
|
||||
await asyncio.sleep(1)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"[{self.stream_name}] 优先级聊天循环被取消。")
|
||||
break
|
||||
except Exception:
|
||||
logger.error(f"[{self.stream_name}] 优先级聊天循环出现错误: {traceback.format_exc()}", exc_info=True)
|
||||
# 出现错误时,等待更长时间避免频繁报错
|
||||
await asyncio.sleep(10)
|
||||
|
||||
# 改为实例方法
|
||||
async def _create_thinking_message(self, message: MessageRecv, timestamp: Optional[float] = None) -> str:
|
||||
"""创建思考消息"""
|
||||
@@ -602,15 +704,33 @@ class NormalChat:
|
||||
|
||||
# 改为实例方法, 移除 chat 参数
|
||||
async def normal_response(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
|
||||
# 新增:如果已停用,直接返回
|
||||
"""
|
||||
处理接收到的消息。
|
||||
根据回复模式,决定是立即处理还是放入优先级队列。
|
||||
"""
|
||||
if self._disabled:
|
||||
return
|
||||
|
||||
# 根据回复模式决定行为
|
||||
if self.reply_mode == "priority":
|
||||
# 优先模式下,所有消息都进入管理器
|
||||
if self.priority_manager:
|
||||
self.priority_manager.add_message(message)
|
||||
return
|
||||
|
||||
# --- 以下为原有的 "兴趣" 模式逻辑 ---
|
||||
await self._process_message(message, is_mentioned, interested_rate)
|
||||
|
||||
async def _process_message(self, message: MessageRecv, is_mentioned: bool, interested_rate: float) -> None:
|
||||
"""
|
||||
实际处理单条消息的逻辑,包括意愿判断、回复生成、动作执行等。
|
||||
"""
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
|
||||
return
|
||||
|
||||
# 新增:在auto模式下检查是否需要直接切换到focus模式
|
||||
if global_config.chat.chat_mode == "auto":
|
||||
should_switch = await self._check_should_switch_to_focus()
|
||||
if should_switch:
|
||||
if await self._check_should_switch_to_focus():
|
||||
logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,直接执行切换")
|
||||
if self.on_switch_to_focus_callback:
|
||||
await self.on_switch_to_focus_callback()
|
||||
@@ -660,174 +780,10 @@ class NormalChat:
|
||||
do_reply = False
|
||||
response_set = None # 初始化 response_set
|
||||
if random() < reply_probability:
|
||||
do_reply = True
|
||||
|
||||
# 回复前处理
|
||||
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
|
||||
|
||||
thinking_id = await self._create_thinking_message(message)
|
||||
|
||||
# 如果启用planner,预先修改可用actions(避免在并行任务中重复调用)
|
||||
available_actions = None
|
||||
if self.enable_planner:
|
||||
try:
|
||||
await self.action_modifier.modify_actions_for_normal_chat(
|
||||
self.chat_stream, self.recent_replies, message.processed_plain_text
|
||||
)
|
||||
available_actions = self.action_manager.get_using_actions_for_mode("normal")
|
||||
except Exception as e:
|
||||
logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}")
|
||||
available_actions = None
|
||||
|
||||
# 定义并行执行的任务
|
||||
async def generate_normal_response():
|
||||
"""生成普通回复"""
|
||||
try:
|
||||
return await self.gpt.generate_response(
|
||||
message=message,
|
||||
available_actions=available_actions,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
async def plan_and_execute_actions():
|
||||
"""规划和执行额外动作"""
|
||||
if not self.enable_planner:
|
||||
logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划")
|
||||
return None
|
||||
|
||||
try:
|
||||
# 获取发送者名称(动作修改已在并行执行前完成)
|
||||
sender_name = self._get_sender_name(message)
|
||||
|
||||
no_action = {
|
||||
"action_result": {
|
||||
"action_type": "no_action",
|
||||
"action_data": {},
|
||||
"reasoning": "规划器初始化默认",
|
||||
"is_parallel": True,
|
||||
},
|
||||
"chat_context": "",
|
||||
"action_prompt": "",
|
||||
}
|
||||
|
||||
# 检查是否应该跳过规划
|
||||
if self.action_modifier.should_skip_planning():
|
||||
logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划")
|
||||
self.action_type = "no_action"
|
||||
return no_action
|
||||
|
||||
# 执行规划
|
||||
plan_result = await self.planner.plan(message, sender_name)
|
||||
action_type = plan_result["action_result"]["action_type"]
|
||||
action_data = plan_result["action_result"]["action_data"]
|
||||
reasoning = plan_result["action_result"]["reasoning"]
|
||||
is_parallel = plan_result["action_result"].get("is_parallel", False)
|
||||
|
||||
logger.info(
|
||||
f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}"
|
||||
)
|
||||
self.action_type = action_type # 更新实例属性
|
||||
self.is_parallel_action = is_parallel # 新增:保存并行执行标志
|
||||
|
||||
# 如果规划器决定不执行任何动作
|
||||
if action_type == "no_action":
|
||||
logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作")
|
||||
return no_action
|
||||
|
||||
# 执行额外的动作(不影响回复生成)
|
||||
action_result = await self._execute_action(action_type, action_data, message, thinking_id)
|
||||
if action_result is not None:
|
||||
logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成")
|
||||
else:
|
||||
logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败")
|
||||
|
||||
return {
|
||||
"action_type": action_type,
|
||||
"action_data": action_data,
|
||||
"reasoning": reasoning,
|
||||
"is_parallel": is_parallel,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] Planner执行失败: {e}")
|
||||
return no_action
|
||||
|
||||
# 并行执行回复生成和动作规划
|
||||
self.action_type = None # 初始化动作类型
|
||||
self.is_parallel_action = False # 初始化并行动作标志
|
||||
with Timer("并行生成回复和规划", timing_results):
|
||||
response_set, plan_result = await asyncio.gather(
|
||||
generate_normal_response(), plan_and_execute_actions(), return_exceptions=True
|
||||
)
|
||||
|
||||
# 处理生成回复的结果
|
||||
if isinstance(response_set, Exception):
|
||||
logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}")
|
||||
response_set = None
|
||||
|
||||
# 处理规划结果(可选,不影响回复)
|
||||
if isinstance(plan_result, Exception):
|
||||
logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}")
|
||||
elif plan_result:
|
||||
logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}")
|
||||
|
||||
if not response_set or (
|
||||
self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action
|
||||
):
|
||||
if not response_set:
|
||||
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
|
||||
elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action:
|
||||
logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)")
|
||||
# 如果模型未生成回复,移除思考消息
|
||||
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
|
||||
for msg in container.messages[:]:
|
||||
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
|
||||
container.messages.remove(msg)
|
||||
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
|
||||
break
|
||||
# 需要在此处也调用 not_reply_handle 和 delete 吗?
|
||||
# 如果是因为模型没回复,也算是一种 "未回复"
|
||||
await willing_manager.not_reply_handle(message.message_info.message_id)
|
||||
willing_manager.delete(message.message_info.message_id)
|
||||
return # 不执行后续步骤
|
||||
|
||||
# logger.info(f"[{self.stream_name}] 回复内容: {response_set}")
|
||||
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
|
||||
return
|
||||
|
||||
# 发送回复 (不再需要传入 chat)
|
||||
with Timer("消息发送", timing_results):
|
||||
first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id)
|
||||
|
||||
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
|
||||
if first_bot_msg:
|
||||
# 消息段已在接收消息时更新,这里不需要额外处理
|
||||
|
||||
# 记录回复信息到最近回复列表中
|
||||
reply_info = {
|
||||
"time": time.time(),
|
||||
"user_message": message.processed_plain_text,
|
||||
"user_info": {
|
||||
"user_id": message.message_info.user_info.user_id,
|
||||
"user_nickname": message.message_info.user_info.user_nickname,
|
||||
},
|
||||
"response": response_set,
|
||||
"is_mentioned": is_mentioned,
|
||||
"is_reference_reply": message.reply is not None, # 判断是否为引用回复
|
||||
"timing": {k: round(v, 2) for k, v in timing_results.items()},
|
||||
}
|
||||
self.recent_replies.append(reply_info)
|
||||
# 保持最近回复历史在限定数量内
|
||||
if len(self.recent_replies) > self.max_replies_history:
|
||||
self.recent_replies = self.recent_replies[-self.max_replies_history :]
|
||||
|
||||
# 回复后处理
|
||||
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
|
||||
|
||||
with Timer("获取回复", timing_results):
|
||||
await willing_manager.before_generate_reply_handle(message.message_info.message_id)
|
||||
do_reply = await self.reply_one_message(message)
|
||||
response_set = do_reply if do_reply else None
|
||||
# 输出性能计时结果
|
||||
if do_reply and response_set: # 确保 response_set 不是 None
|
||||
timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()])
|
||||
@@ -836,6 +792,7 @@ class NormalChat:
|
||||
logger.info(
|
||||
f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}"
|
||||
)
|
||||
await willing_manager.after_generate_reply_handle(message.message_info.message_id)
|
||||
elif not do_reply:
|
||||
# 不回复处理
|
||||
await willing_manager.not_reply_handle(message.message_info.message_id)
|
||||
@@ -843,6 +800,168 @@ class NormalChat:
|
||||
# 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除)
|
||||
willing_manager.delete(message.message_info.message_id)
|
||||
|
||||
async def reply_one_message(self, message: MessageRecv) -> None:
|
||||
# 回复前处理
|
||||
thinking_id = await self._create_thinking_message(message)
|
||||
|
||||
|
||||
# 如果启用planner,预先修改可用actions(避免在并行任务中重复调用)
|
||||
available_actions = None
|
||||
if self.enable_planner:
|
||||
try:
|
||||
await self.action_modifier.modify_actions_for_normal_chat(
|
||||
self.chat_stream, self.recent_replies, message.processed_plain_text
|
||||
)
|
||||
available_actions = self.action_manager.get_using_actions_for_mode("normal")
|
||||
except Exception as e:
|
||||
logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}")
|
||||
available_actions = None
|
||||
|
||||
# 定义并行执行的任务
|
||||
async def generate_normal_response():
|
||||
"""生成普通回复"""
|
||||
try:
|
||||
return await self.gpt.generate_response(
|
||||
message=message,
|
||||
available_actions=available_actions,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
async def plan_and_execute_actions():
|
||||
"""规划和执行额外动作"""
|
||||
if not self.enable_planner:
|
||||
logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划")
|
||||
return None
|
||||
|
||||
try:
|
||||
# 获取发送者名称(动作修改已在并行执行前完成)
|
||||
sender_name = self._get_sender_name(message)
|
||||
|
||||
no_action = {
|
||||
"action_result": {
|
||||
"action_type": "no_action",
|
||||
"action_data": {},
|
||||
"reasoning": "规划器初始化默认",
|
||||
"is_parallel": True,
|
||||
},
|
||||
"chat_context": "",
|
||||
"action_prompt": "",
|
||||
}
|
||||
|
||||
# 检查是否应该跳过规划
|
||||
if self.action_modifier.should_skip_planning():
|
||||
logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划")
|
||||
self.action_type = "no_action"
|
||||
return no_action
|
||||
|
||||
# 执行规划
|
||||
plan_result = await self.planner.plan(message, sender_name)
|
||||
action_type = plan_result["action_result"]["action_type"]
|
||||
action_data = plan_result["action_result"]["action_data"]
|
||||
reasoning = plan_result["action_result"]["reasoning"]
|
||||
is_parallel = plan_result["action_result"].get("is_parallel", False)
|
||||
|
||||
logger.info(
|
||||
f"[{self.stream_name}] Planner决策: {action_type}, 理由: {reasoning}, 并行执行: {is_parallel}"
|
||||
)
|
||||
self.action_type = action_type # 更新实例属性
|
||||
self.is_parallel_action = is_parallel # 新增:保存并行执行标志
|
||||
|
||||
# 如果规划器决定不执行任何动作
|
||||
if action_type == "no_action":
|
||||
logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作")
|
||||
return no_action
|
||||
|
||||
# 执行额外的动作(不影响回复生成)
|
||||
action_result = await self._execute_action(action_type, action_data, message, thinking_id)
|
||||
if action_result is not None:
|
||||
logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成")
|
||||
else:
|
||||
logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败")
|
||||
|
||||
return {
|
||||
"action_type": action_type,
|
||||
"action_data": action_data,
|
||||
"reasoning": reasoning,
|
||||
"is_parallel": is_parallel,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[{self.stream_name}] Planner执行失败: {e}")
|
||||
return no_action
|
||||
|
||||
# 并行执行回复生成和动作规划
|
||||
self.action_type = None # 初始化动作类型
|
||||
self.is_parallel_action = False # 初始化并行动作标志
|
||||
response_set, plan_result = await asyncio.gather(
|
||||
generate_normal_response(), plan_and_execute_actions(), return_exceptions=True
|
||||
)
|
||||
|
||||
# 处理生成回复的结果
|
||||
if isinstance(response_set, Exception):
|
||||
logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}")
|
||||
response_set = None
|
||||
|
||||
# 处理规划结果(可选,不影响回复)
|
||||
if isinstance(plan_result, Exception):
|
||||
logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}")
|
||||
elif plan_result:
|
||||
logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}")
|
||||
|
||||
if not response_set or (
|
||||
self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action
|
||||
):
|
||||
if not response_set:
|
||||
logger.info(f"[{self.stream_name}] 模型未生成回复内容")
|
||||
elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action:
|
||||
logger.info(f"[{self.stream_name}] 模型选择其他动作(非并行动作)")
|
||||
# 如果模型未生成回复,移除思考消息
|
||||
container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id
|
||||
for msg in container.messages[:]:
|
||||
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
|
||||
container.messages.remove(msg)
|
||||
logger.debug(f"[{self.stream_name}] 已移除未产生回复的思考消息 {thinking_id}")
|
||||
break
|
||||
# 需要在此处也调用 not_reply_handle 和 delete 吗?
|
||||
# 如果是因为模型没回复,也算是一种 "未回复"
|
||||
return False
|
||||
|
||||
# logger.info(f"[{self.stream_name}] 回复内容: {response_set}")
|
||||
|
||||
if self._disabled:
|
||||
logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。")
|
||||
return False
|
||||
|
||||
# 发送回复 (不再需要传入 chat)
|
||||
first_bot_msg = await self._add_messages_to_manager(message, response_set, thinking_id)
|
||||
|
||||
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
|
||||
if first_bot_msg:
|
||||
# 消息段已在接收消息时更新,这里不需要额外处理
|
||||
|
||||
# 记录回复信息到最近回复列表中
|
||||
reply_info = {
|
||||
"time": time.time(),
|
||||
"user_message": message.processed_plain_text,
|
||||
"user_info": {
|
||||
"user_id": message.message_info.user_info.user_id,
|
||||
"user_nickname": message.message_info.user_info.user_nickname,
|
||||
},
|
||||
"response": response_set,
|
||||
# "is_mentioned": is_mentioned,
|
||||
"is_reference_reply": message.reply is not None, # 判断是否为引用回复
|
||||
# "timing": {k: round(v, 2) for k, v in timing_results.items()},
|
||||
}
|
||||
self.recent_replies.append(reply_info)
|
||||
# 保持最近回复历史在限定数量内
|
||||
if len(self.recent_replies) > self.max_replies_history:
|
||||
self.recent_replies = self.recent_replies[-self.max_replies_history :]
|
||||
return response_set if response_set else False
|
||||
|
||||
# 改为实例方法, 移除 chat 参数
|
||||
|
||||
async def start_chat(self):
|
||||
@@ -862,8 +981,16 @@ class NormalChat:
|
||||
self._chat_task = None
|
||||
|
||||
try:
|
||||
logger.debug(f"[{self.stream_name}] 创建新的聊天轮询任务")
|
||||
polling_task = asyncio.create_task(self._reply_interested_message())
|
||||
logger.info(f"[{self.stream_name}] 创建新的聊天轮询任务,模式: {self.reply_mode}")
|
||||
if self.reply_mode == "priority":
|
||||
polling_task_send = asyncio.create_task(self._priority_chat_loop())
|
||||
polling_task_recv = asyncio.create_task(self._priority_chat_loop_add_message())
|
||||
print("555")
|
||||
polling_task = asyncio.gather(polling_task_send, polling_task_recv)
|
||||
print("666")
|
||||
|
||||
else: # 默认或 "interest" 模式
|
||||
polling_task = asyncio.create_task(self._reply_interested_message())
|
||||
|
||||
# 设置回调
|
||||
polling_task.add_done_callback(lambda t: self._handle_task_completion(t))
|
||||
@@ -902,7 +1029,7 @@ class NormalChat:
|
||||
# 尝试获取异常,但不抛出
|
||||
exc = task.exception()
|
||||
if exc:
|
||||
logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}")
|
||||
logger.error(f"[{self.stream_name}] 任务异常: {type(exc).__name__}: {exc}", exc_info=exc)
|
||||
else:
|
||||
logger.debug(f"[{self.stream_name}] 任务正常完成")
|
||||
except Exception as e:
|
||||
|
||||
108
src/chat/normal_chat/priority_manager.py
Normal file
108
src/chat/normal_chat/priority_manager.py
Normal file
@@ -0,0 +1,108 @@
|
||||
import time
|
||||
import heapq
|
||||
import math
|
||||
from typing import List, Dict, Optional
|
||||
from ..message_receive.message import MessageRecv
|
||||
from src.common.logger import get_logger
|
||||
|
||||
logger = get_logger("normal_chat")
|
||||
|
||||
|
||||
class PrioritizedMessage:
|
||||
"""带有优先级的消息对象"""
|
||||
|
||||
def __init__(self, message: MessageRecv, interest_scores: List[float], is_vip: bool = False):
|
||||
self.message = message
|
||||
self.arrival_time = time.time()
|
||||
self.interest_scores = interest_scores
|
||||
self.is_vip = is_vip
|
||||
self.priority = self.calculate_priority()
|
||||
|
||||
def calculate_priority(self, decay_rate: float = 0.01) -> float:
|
||||
"""
|
||||
计算优先级分数。
|
||||
优先级 = 兴趣分 * exp(-衰减率 * 消息年龄)
|
||||
"""
|
||||
age = time.time() - self.arrival_time
|
||||
decay_factor = math.exp(-decay_rate * age)
|
||||
priority = sum(self.interest_scores) + decay_factor
|
||||
return priority
|
||||
|
||||
def __lt__(self, other: "PrioritizedMessage") -> bool:
|
||||
"""用于堆排序的比较函数,我们想要一个最大堆,所以用 >"""
|
||||
return self.priority > other.priority
|
||||
|
||||
|
||||
class PriorityManager:
|
||||
"""
|
||||
管理消息队列,根据优先级选择消息进行处理。
|
||||
"""
|
||||
|
||||
def __init__(self, interest_dict: Dict[str, float], normal_queue_max_size: int = 5):
|
||||
self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆)
|
||||
self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆)
|
||||
self.interest_dict = interest_dict if interest_dict is not None else {}
|
||||
self.normal_queue_max_size = normal_queue_max_size
|
||||
|
||||
def _get_interest_score(self, user_id: str) -> float:
|
||||
"""获取用户的兴趣分,默认为1.0"""
|
||||
return self.interest_dict.get("interests", {}).get(user_id, 1.0)
|
||||
|
||||
def add_message(self, message: MessageRecv, interest_score: Optional[float] = None):
|
||||
"""
|
||||
添加新消息到合适的队列中。
|
||||
"""
|
||||
user_id = message.message_info.user_info.user_id
|
||||
is_vip = message.priority_info.get("message_type") == "vip" if message.priority_info else False
|
||||
message_priority = message.priority_info.get("message_priority", 0.0) if message.priority_info else 0.0
|
||||
|
||||
p_message = PrioritizedMessage(message, [interest_score, message_priority], is_vip)
|
||||
|
||||
if is_vip:
|
||||
heapq.heappush(self.vip_queue, p_message)
|
||||
logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}")
|
||||
else:
|
||||
if len(self.normal_queue) >= self.normal_queue_max_size:
|
||||
# 如果队列已满,只在消息优先级高于最低优先级消息时才添加
|
||||
if p_message.priority > self.normal_queue[0].priority:
|
||||
heapq.heapreplace(self.normal_queue, p_message)
|
||||
logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}")
|
||||
else:
|
||||
logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}")
|
||||
else:
|
||||
heapq.heappush(self.normal_queue, p_message)
|
||||
logger.debug(
|
||||
f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}"
|
||||
)
|
||||
|
||||
def get_highest_priority_message(self) -> Optional[MessageRecv]:
|
||||
"""
|
||||
从VIP和普通队列中获取当前最高优先级的消息。
|
||||
"""
|
||||
# 更新所有消息的优先级
|
||||
for p_msg in self.vip_queue:
|
||||
p_msg.priority = p_msg.calculate_priority()
|
||||
for p_msg in self.normal_queue:
|
||||
p_msg.priority = p_msg.calculate_priority()
|
||||
|
||||
# 重建堆
|
||||
heapq.heapify(self.vip_queue)
|
||||
heapq.heapify(self.normal_queue)
|
||||
|
||||
vip_msg = self.vip_queue[0] if self.vip_queue else None
|
||||
normal_msg = self.normal_queue[0] if self.normal_queue else None
|
||||
|
||||
if vip_msg:
|
||||
return heapq.heappop(self.vip_queue).message
|
||||
elif normal_msg:
|
||||
return heapq.heappop(self.normal_queue).message
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
"""检查所有队列是否为空"""
|
||||
return not self.vip_queue and not self.normal_queue
|
||||
|
||||
def get_queue_status(self) -> str:
|
||||
"""获取队列状态信息"""
|
||||
return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"
|
||||
Reference in New Issue
Block a user