feat:基于不同心流单独的发送器实例,反正能跑,但我也不知道能不能跑

睡觉
This commit is contained in:
SengokuCola
2025-04-23 00:41:46 +08:00
parent bcf295905e
commit 2eecd746af
14 changed files with 568 additions and 672 deletions

View File

@@ -7,7 +7,6 @@ from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinki
from src.plugins.chat.message import MessageSet, Seg # Local import needed after move
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import UserInfo
from src.heart_flow.heartflow import heartflow, SubHeartflow
from src.plugins.chat.chat_stream import chat_manager
from src.common.logger import get_module_logger, LogConfig, PFC_STYLE_CONFIG # 引入 DEFAULT_CONFIG
from src.plugins.models.utils_model import LLMRequest
@@ -18,7 +17,7 @@ from src.plugins.utils.timer_calculater import Timer # <--- Import Timer
from .heartFC_generator import ResponseGenerator # Assuming this is the type for gpt
from src.do_tool.tool_use import ToolUser
from src.plugins.chat.emoji_manager import EmojiManager # Assuming this is the type
from .heartflow_message_sender import MessageManager # Assuming this is the type
from ..chat.message_sender import message_manager # <-- Import the global manager
# --- End import ---
@@ -37,7 +36,8 @@ logger = get_module_logger("HeartFCLoop", config=interest_log_config) # Logger
if TYPE_CHECKING:
# Keep this if HeartFCController methods are still needed elsewhere,
# but the instance variable will be removed from HeartFChatting
from .heartFC_controler import HeartFCController
# from .heartFC_controler import HeartFCController
from src.heart_flow.heartflow import SubHeartflow, heartflow # <-- 同时导入 heartflow 实例用于类型检查
PLANNER_TOOL_DEFINITION = [
{
@@ -76,54 +76,51 @@ class HeartFChatting:
def __init__(self,
chat_id: str,
# --- Explicit Dependencies ---
gpt_instance: ResponseGenerator,
tool_user_instance: ToolUser,
emoji_manager_instance: EmojiManager,
message_manager_instance: MessageManager
# --- End Explicit Dependencies ---
# 显式依赖注入
gpt_instance: ResponseGenerator, # 文本回复生成器
tool_user_instance: ToolUser, # 工具使用实例
emoji_manager_instance: EmojiManager, # 表情管理实例
):
"""
初始化HeartFChatting实例。
Args:
chat_id: The identifier for the chat stream (e.g., stream_id).
gpt_instance: The ResponseGenerator instance for generating text replies.
tool_user_instance: The ToolUser instance for using tools.
emoji_manager_instance: The EmojiManager instance for handling emojis.
message_manager_instance: The MessageManager instance for sending/managing messages.
HeartFChatting 初始化函数
参数:
chat_id: 聊天流唯一标识符(如stream_id)
gpt_instance: 文本回复生成器实例
tool_user_instance: 工具使用实例
emoji_manager_instance: 表情管理实例
"""
self.stream_id: str = chat_id
self.chat_stream: Optional[ChatStream] = None
self.sub_hf: Optional[SubHeartflow] = None
self._initialized = False
self._init_lock = asyncio.Lock() # Ensure initialization happens only once
self._processing_lock = asyncio.Lock() # 确保只有一个 Plan-Replier-Sender 周期在运行
self._timer_lock = asyncio.Lock() # 用于安全更新计时器
# 基础属性
self.stream_id: str = chat_id # 聊天流ID
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.sub_hf: Optional[SubHeartflow] = None # 关联的子心流
# 初始化状态控制
self._initialized = False # 是否已初始化标志
self._init_lock = asyncio.Lock() # 初始化锁(确保只初始化一次)
self._processing_lock = asyncio.Lock() # 处理锁(确保单次Plan-Replier-Sender周期)
self._timer_lock = asyncio.Lock() # 计时器锁(安全更新计时器)
# --- Store Dependencies ---
self.gpt_instance = gpt_instance
self.tool_user = tool_user_instance
self.emoji_manager = emoji_manager_instance
self.message_manager = message_manager_instance
# --- End Store Dependencies ---
# 依赖注入存储
self.gpt_instance = gpt_instance # 文本回复生成器
self.tool_user = tool_user_instance # 工具使用实例
self.emoji_manager = emoji_manager_instance # 表情管理实例
# Access LLM config through global_config or pass if needed
# LLM规划器配置
self.planner_llm = LLMRequest(
model=global_config.llm_normal,
temperature=global_config.llm_normal["temp"],
max_tokens=1000,
request_type="action_planning",
request_type="action_planning", # 用于动作规划
)
# Internal state for loop control
self._loop_timer: float = 0.0 # Remaining time for the loop in seconds
self._loop_active: bool = False # Is the loop currently running?
self._loop_task: Optional[asyncio.Task] = None # Stores the main loop task
self._trigger_count_this_activation: int = 0 # Counts triggers within an active period
# 循环控制内部状态
self._loop_timer: float = 0.0 # 循环剩余时间(秒)
self._loop_active: bool = False # 循环是否正在运行
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
self._trigger_count_this_activation: int = 0 # 当前激活周期内的触发计数
self._initial_duration: float = INITIAL_DURATION # 首次触发增加的时间
self._last_added_duration: float = self._initial_duration # <--- 新增:存储上次增加的时间
self._last_added_duration: float = self._initial_duration # 上次增加的时间
def _get_log_prefix(self) -> str:
"""获取日志前缀,包含可读的流名称"""
@@ -146,6 +143,8 @@ class HeartFChatting:
logger.error(f"{log_prefix} 获取ChatStream失败。")
return False
# <-- 在这里导入 heartflow 实例
from src.heart_flow.heartflow import heartflow
self.sub_hf = heartflow.get_subheartflow(self.stream_id)
if not self.sub_hf:
logger.warning(f"{log_prefix} 获取SubHeartflow失败。一些功能可能受限。")
@@ -245,7 +244,7 @@ class HeartFChatting:
cycle_timers = {} # <--- Initialize timers dict for this cycle
# Access MessageManager directly
if self.message_manager.check_if_sending_message_exist(self.stream_id, thinking_id):
if message_manager.check_if_sending_message_exist(self.stream_id, thinking_id):
# logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦还在发消息等会再规划")
await asyncio.sleep(1)
continue
@@ -318,7 +317,7 @@ class HeartFChatting:
)
except Exception as e_replier:
logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}")
self._cleanup_thinking_message(thinking_id)
# self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
if replier_result:
# --- Sender Work --- #
@@ -334,10 +333,10 @@ class HeartFChatting:
except Exception as e_sender:
logger.error(f"{log_prefix} 循环: 发送器失败: {e_sender}")
# _sender should handle cleanup, but double check
# self._cleanup_thinking_message(thinking_id)
# self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
else:
logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.")
self._cleanup_thinking_message(thinking_id)
# self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
elif action == "emoji_reply":
logger.info(
f"{log_prefix} HeartFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}"
@@ -652,16 +651,25 @@ class HeartFChatting:
logger.error(traceback.format_exc())
return None
def _cleanup_thinking_message(self, thinking_id: str):
"""Safely removes the thinking message."""
log_prefix = self._get_log_prefix()
try:
# Access MessageManager directly
container = self.message_manager.get_container(self.stream_id)
container.remove_message(thinking_id, msg_type=MessageThinking)
logger.debug(f"{log_prefix} Cleaned up thinking message {thinking_id}.")
except Exception as e:
logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}")
# def _cleanup_thinking_message(self, thinking_id: str):
# """Safely removes the thinking message."""
# log_prefix = self._get_log_prefix()
# try:
# # Access MessageManager directly
# container = await message_manager.get_container(self.stream_id)
# # container.remove_message(thinking_id, msg_type=MessageThinking) # Need to find the message object first
# found_msg = None
# for msg in container.get_all_messages():
# if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
# found_msg = msg
# break
# if found_msg:
# container.remove_message(found_msg)
# logger.debug(f"{log_prefix} Cleaned up thinking message {thinking_id}.")
# else:
# logger.warning(f"{log_prefix} Could not find thinking message {thinking_id} to cleanup.")
# except Exception as e:
# logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}")
# --- 发送器 (Sender) --- #
async def _sender(
@@ -774,10 +782,10 @@ class HeartFChatting:
# Ensure generate_response has access to current_mind if it's crucial context
# Access gpt_instance directly
response_set = await self.gpt_instance.generate_response(
self.sub_hf,
reason,
anchor_message, # Pass anchor_message positionally (matches 'message' parameter)
thinking_id, # Pass thinking_id positionally
current_mind_info=self.sub_hf.current_mind,
reason=reason,
message=anchor_message, # Pass anchor_message positionally (matches 'message' parameter)
thinking_id=thinking_id, # Pass thinking_id positionally
)
if not response_set:
@@ -818,7 +826,7 @@ class HeartFChatting:
thinking_start_time=thinking_time_point,
)
# Access MessageManager directly
self.message_manager.add_message(thinking_message)
await message_manager.add_message(thinking_message)
return thinking_id
async def _send_response_messages(
@@ -831,7 +839,7 @@ class HeartFChatting:
chat = anchor_message.chat_stream
# Access MessageManager directly
container = self.message_manager.get_container(chat.stream_id)
container = await message_manager.get_container(chat.stream_id)
thinking_message = None
# 移除思考消息
@@ -875,7 +883,7 @@ class HeartFChatting:
message_set.add_message(bot_message)
# Access MessageManager directly
self.message_manager.add_message(message_set)
await message_manager.add_message(message_set)
return first_bot_msg
async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""):
@@ -917,4 +925,4 @@ class HeartFChatting:
is_emoji=True,
)
# Access MessageManager directly
self.message_manager.add_message(bot_message)
await message_manager.add_message(bot_message)