From d7ca0255febba87868242d13ca0ac34ce3b01a3f Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Fri, 25 Apr 2025 18:12:11 +0800
Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E8=BF=9B=E4=B8=80=E6=AD=A5?=
=?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=8C=96=EF=BC=8C=E4=BF=AE=E5=A4=8D=E8=A7=82?=
=?UTF-8?q?=E5=AF=9F=E9=94=99=E4=BD=8D=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/common/logger.py | 10 +-
src/heart_flow/mai_state_manager.py | 18 +-
src/heart_flow/observation.py | 3 +
src/heart_flow/sub_heartflow.py | 129 +-
src/heart_flow/sub_mind.py | 9 +-
src/heart_flow/subheartflow_manager.py | 144 +--
src/main.py | 2 +-
src/plugins/chat/utils_image.py | 2 +-
src/plugins/emoji_system/emoji_manager.py | 14 +-
src/plugins/heartFC_chat/heartFC_chat.py | 1078 +++++++++++------
src/plugins/heartFC_chat/heartFC_generator.py | 3 -
src/plugins/heartFC_chat/heartFC_readme.md | 159 +++
.../heartFC_chat/heartflow_processor.py | 282 ++---
.../heartFC_chat/heartflow_prompt_builder.py | 28 +-
src/plugins/moods/moods.py | 2 +-
src/plugins/utils/chat_message_builder.py | 2 +-
16 files changed, 1217 insertions(+), 668 deletions(-)
create mode 100644 src/plugins/heartFC_chat/heartFC_readme.md
diff --git a/src/common/logger.py b/src/common/logger.py
index 19463c0fc..4ed69f320 100644
--- a/src/common/logger.py
+++ b/src/common/logger.py
@@ -163,13 +163,13 @@ MOOD_STYLE_CONFIG = {
"console_format": (
"{time:YYYY-MM-DD HH:mm:ss} | "
"{level: <8} | "
- "心情 | "
+ "心情 | "
"{message}"
),
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 心情 | {message}",
},
"simple": {
- "console_format": "{time:MM-DD HH:mm} | 心情 | {message}",
+ "console_format": "{time:MM-DD HH:mm} | 心情 | {message} ",
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 心情 | {message}",
},
}
@@ -315,14 +315,14 @@ CHAT_STYLE_CONFIG = {
"console_format": (
"{time:YYYY-MM-DD HH:mm:ss} | "
"{level: <8} | "
- "见闻 | "
+ "见闻 | "
"{message}"
),
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 见闻 | {message}",
},
"simple": {
"console_format": (
- "{time:MM-DD HH:mm} | 见闻 | {message}"
+ "{time:MM-DD HH:mm} | 见闻 | {message}"
), # noqa: E501
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 见闻 | {message}",
},
@@ -387,7 +387,7 @@ SUBHEARTFLOW_MANAGER_STYLE_CONFIG = {
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦水群[管理] | {message}",
},
"simple": {
- "console_format": ("{time:MM-DD HH:mm} | 麦麦水群[管理] | {message}"), # noqa: E501
+ "console_format": ("{time:MM-DD HH:mm} | 麦麦水群[管理] | {message}"), # noqa: E501
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 麦麦水群[管理] | {message}",
},
}
diff --git a/src/heart_flow/mai_state_manager.py b/src/heart_flow/mai_state_manager.py
index 6f645f670..f8d4341eb 100644
--- a/src/heart_flow/mai_state_manager.py
+++ b/src/heart_flow/mai_state_manager.py
@@ -13,8 +13,8 @@ mai_state_config = LogConfig(
logger = get_module_logger("mai_state_manager", config=mai_state_config)
-# enable_unlimited_hfc_chat = True
-enable_unlimited_hfc_chat = False
+enable_unlimited_hfc_chat = True
+# enable_unlimited_hfc_chat = False
class MaiState(enum.Enum):
@@ -22,14 +22,14 @@ class MaiState(enum.Enum):
聊天状态:
OFFLINE: 不在线:回复概率极低,不会进行任何聊天
PEEKING: 看一眼手机:回复概率较低,会进行一些普通聊天
- NORMAL_CHAT: 正常聊天:回复概率较高,会进行一些普通聊天和少量的专注聊天
+ NORMAL_CHAT: 正常看手机:回复概率较高,会进行一些普通聊天和少量的专注聊天
FOCUSED_CHAT: 专注聊天:回复概率极高,会进行专注聊天和少量的普通聊天
"""
OFFLINE = "不在线"
- PEEKING = "看一眼"
- NORMAL_CHAT = "正常聊天"
- FOCUSED_CHAT = "专心聊天"
+ PEEKING = "看一眼手机"
+ NORMAL_CHAT = "正常看手机"
+ FOCUSED_CHAT = "专心看手机"
def get_normal_chat_max_num(self):
# 调试用
@@ -137,11 +137,11 @@ class MaiStateManager:
if current_status == MaiState.OFFLINE:
logger.info("当前[离线],没看手机,思考要不要上线看看......")
elif current_status == MaiState.PEEKING:
- logger.info("当前[看一眼],思考要不要继续聊下去......")
+ logger.info("当前[看一眼手机],思考要不要继续聊下去......")
elif current_status == MaiState.NORMAL_CHAT:
- logger.info("当前在[正常聊天]思考要不要继续聊下去......")
+ logger.info("当前在[正常看手机]思考要不要继续聊下去......")
elif current_status == MaiState.FOCUSED_CHAT:
- logger.info("当前在[专心聊天]思考要不要继续聊下去......")
+ logger.info("当前在[专心看手机]思考要不要继续聊下去......")
# 1. 麦麦每分钟都有概率离线
if time_since_last_min_check >= 60:
diff --git a/src/heart_flow/observation.py b/src/heart_flow/observation.py
index 9391a660a..790c21805 100644
--- a/src/heart_flow/observation.py
+++ b/src/heart_flow/observation.py
@@ -22,6 +22,9 @@ class Observation:
self.observe_type = observe_type
self.observe_id = observe_id
self.last_observe_time = datetime.now().timestamp() # 初始化为当前时间
+
+ async def observe(self):
+ pass
# 聊天观察
diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py
index 7397a37f3..91ddc2cd7 100644
--- a/src/heart_flow/sub_heartflow.py
+++ b/src/heart_flow/sub_heartflow.py
@@ -43,6 +43,7 @@ class InterestChatting:
max_probability=max_reply_probability,
state_change_callback: Optional[Callable[[ChatState], None]] = None,
):
+ # 基础属性初始化
self.interest_level: float = 0.0
self.last_update_time: float = time.time()
self.decay_rate_per_second: float = decay_rate
@@ -56,16 +57,26 @@ class InterestChatting:
self.max_reply_probability: float = max_probability
self.current_reply_probability: float = 0.0
self.is_above_threshold: bool = False
+
+ # 任务相关属性初始化
self.update_task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
+ self._task_lock = asyncio.Lock()
+ self._is_running = False
self.interest_dict: Dict[str, tuple[MessageRecv, float, bool]] = {}
self.update_interval = 1.0
- self.start_updates(self.update_interval) # 初始化时启动后台更新任务
self.above_threshold = False
self.start_hfc_probability = 0.0
+ @classmethod
+ async def create(cls, *args, **kwargs):
+ """异步工厂方法,用于创建并初始化 InterestChatting 实例"""
+ instance = cls(*args, **kwargs)
+ await instance.start_updates(instance.update_interval)
+ return instance
+
def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool):
self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned)
self.last_interaction_time = time.time()
@@ -141,59 +152,74 @@ class InterestChatting:
# --- 新增后台更新任务相关方法 ---
async def _run_update_loop(self, update_interval: float = 1.0):
"""后台循环,定期更新兴趣和回复概率。"""
- while not self._stop_event.is_set():
- try:
- if self.interest_level != 0:
- await self._calculate_decay()
+ try:
+ while not self._stop_event.is_set():
+ try:
+ if self.interest_level != 0:
+ await self._calculate_decay()
- await self._update_reply_probability()
+ await self._update_reply_probability()
- # 等待下一个周期或停止事件
- await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
- except asyncio.TimeoutError:
- # 正常超时,继续循环
- continue
- except asyncio.CancelledError:
- logger.info("InterestChatting 更新循环被取消。")
- break
- except Exception as e:
- logger.error(f"InterestChatting 更新循环出错: {e}")
- logger.error(traceback.format_exc())
- # 防止错误导致CPU飙升,稍作等待
- await asyncio.sleep(5)
- logger.info("InterestChatting 更新循环已停止。")
+ # 等待下一个周期或停止事件
+ await asyncio.wait_for(self._stop_event.wait(), timeout=update_interval)
+ except asyncio.TimeoutError:
+ # 正常超时,继续循环
+ continue
+ except Exception as e:
+ logger.error(f"InterestChatting 更新循环出错: {e}")
+ logger.error(traceback.format_exc())
+ # 防止错误导致CPU飙升,稍作等待
+ await asyncio.sleep(5)
+ except asyncio.CancelledError:
+ logger.info("InterestChatting 更新循环被取消。")
+ finally:
+ self._is_running = False
+ logger.info("InterestChatting 更新循环已停止。")
- def start_updates(self, update_interval: float = 1.0):
- """启动后台更新任务"""
- if self.update_task is None or self.update_task.done():
- self._stop_event.clear()
- self.update_task = asyncio.create_task(self._run_update_loop(update_interval))
- logger.debug("后台兴趣更新任务已创建并启动。")
- else:
- logger.debug("后台兴趣更新任务已在运行中。")
+ async def start_updates(self, update_interval: float = 1.0):
+ """启动后台更新任务,使用锁确保并发安全"""
+ async with self._task_lock:
+ if self._is_running:
+ logger.debug("后台兴趣更新任务已在运行中。")
+ return
+
+ # 清理已完成或已取消的任务
+ if self.update_task and (self.update_task.done() or self.update_task.cancelled()):
+ self.update_task = None
+
+ if not self.update_task:
+ self._stop_event.clear()
+ self._is_running = True
+ self.update_task = asyncio.create_task(self._run_update_loop(update_interval))
+ logger.debug("后台兴趣更新任务已创建并启动。")
async def stop_updates(self):
- """停止后台更新任务"""
- if self.update_task and not self.update_task.done():
+ """停止后台更新任务,使用锁确保并发安全"""
+ async with self._task_lock:
+ if not self._is_running:
+ logger.debug("后台兴趣更新任务未运行。")
+ return
+
logger.info("正在停止 InterestChatting 后台更新任务...")
- self._stop_event.set() # 发送停止信号
- try:
- # 等待任务结束,设置超时
- await asyncio.wait_for(self.update_task, timeout=5.0)
- logger.info("InterestChatting 后台更新任务已成功停止。")
- except asyncio.TimeoutError:
- logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
- self.update_task.cancel()
+ self._stop_event.set()
+
+ if self.update_task and not self.update_task.done():
try:
- await self.update_task # 等待取消完成
- except asyncio.CancelledError:
- logger.info("InterestChatting 后台更新任务已被取消。")
- except Exception as e:
- logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
- finally:
- self.update_task = None
- else:
- logger.debug("InterestChatting 后台更新任务未运行或已完成。")
+ # 等待任务结束,设置超时
+ await asyncio.wait_for(self.update_task, timeout=5.0)
+ logger.info("InterestChatting 后台更新任务已成功停止。")
+ except asyncio.TimeoutError:
+ logger.warning("停止 InterestChatting 后台任务超时,尝试取消...")
+ self.update_task.cancel()
+ try:
+ await self.update_task # 等待取消完成
+ except asyncio.CancelledError:
+ logger.info("InterestChatting 后台更新任务已被取消。")
+ except Exception as e:
+ logger.error(f"停止 InterestChatting 后台任务时发生异常: {e}")
+ finally:
+ self.update_task = None
+ self._is_running = False
# --- 结束 新增方法 ---
@@ -214,7 +240,7 @@ class SubHeartflow:
# 聊天状态管理
self.chat_state: ChatStateInfo = ChatStateInfo() # 该sub_heartflow的聊天状态信息
- self.interest_chatting = InterestChatting(state_change_callback=self.set_chat_state)
+ self.interest_chatting = None # 将在 initialize 中创建
# 活动状态管理
self.last_active_time = time.time() # 最后活跃时间
@@ -234,6 +260,11 @@ class SubHeartflow:
self.log_prefix = chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id
+ async def initialize(self):
+ """异步初始化方法"""
+ self.interest_chatting = await InterestChatting.create(state_change_callback=self.set_chat_state)
+ logger.debug(f"{self.log_prefix} InterestChatting 实例已创建并初始化。")
+
async def add_time_current_state(self, add_time: float):
self.current_state_time += add_time
@@ -412,7 +443,7 @@ class SubHeartflow:
- 负责子心流的主要后台循环
- 每30秒检查一次停止标志
"""
- logger.info(f"{self.log_prefix} 子心流开始工作...")
+ logger.trace(f"{self.log_prefix} 子心流开始工作...")
while not self.should_stop:
await asyncio.sleep(30) # 30秒检查一次停止标志
diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py
index 92f0a9606..111c2cf5c 100644
--- a/src/heart_flow/sub_mind.py
+++ b/src/heart_flow/sub_mind.py
@@ -10,7 +10,7 @@ from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager
from src.do_tool.tool_use import ToolUser
from src.plugins.utils.json_utils import safe_json_dumps, normalize_llm_response, process_llm_tool_calls
from src.heart_flow.chat_state_info import ChatStateInfo
-
+from src.plugins.chat.chat_stream import chat_manager
subheartflow_config = LogConfig(
console_format=SUB_HEARTFLOW_STYLE_CONFIG["console_format"],
@@ -30,6 +30,8 @@ def init_prompt():
prompt += "现在请你生成你的内心想法,要求思考群里正在进行的话题,之前大家聊过的话题,群里成员的关系。"
prompt += "请你思考,要不要对群里的话题进行回复,以及如何对群聊内容进行回复\n"
prompt += "回复的要求是:不要总是重复自己提到过的话题,如果你要回复,最好只回复一个人的一个话题\n"
+ prompt += "如果最后一条消息是你自己发的,观察到的内容只有你自己的发言,并且之后没有人回复你,不要回复。"
+ prompt += "如果聊天记录中最新的消息是你自己发送的,并且你还想继续回复,你应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等。"
prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要回复自己的发言\n"
prompt += "现在请你先输出想法,{hf_do_next},不要分点输出,文字不要浮夸"
prompt += "在输出完想法后,请你思考应该使用什么工具。工具可以帮你取得一些你不知道的信息,或者进行一些操作。"
@@ -138,7 +140,7 @@ class SubMind:
hf_do_next=hf_do_next,
)
- logger.debug(f"[{self.subheartflow_id}] 心流思考提示词构建完成")
+ # logger.debug(f"[{self.subheartflow_id}] 心流思考提示词构建完成")
# ---------- 5. 执行LLM请求并处理响应 ----------
content = "" # 初始化内容变量
@@ -190,7 +192,8 @@ class SubMind:
content = "思考过程中出现错误"
# 记录最终思考结果
- logger.debug(f"[{self.subheartflow_id}] 心流思考结果:\n{content}\n")
+ name = chat_manager.get_stream_name(self.subheartflow_id)
+ logger.debug(f"[{name}] \nPrompt:\n{prompt}\n\n心流思考结果:\n{content}\n")
# 处理空响应情况
if not content:
diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py
index b9703e53b..dcc455917 100644
--- a/src/heart_flow/subheartflow_manager.py
+++ b/src/heart_flow/subheartflow_manager.py
@@ -75,18 +75,22 @@ class SubHeartflowManager:
return subflow
# 创建新的子心流实例
- logger.info(f"子心流 {subheartflow_id} 不存在,正在创建...")
+ # logger.info(f"子心流 {subheartflow_id} 不存在,正在创建...")
try:
# 初始化子心流
new_subflow = SubHeartflow(subheartflow_id, mai_states)
+ # 异步初始化
+ await new_subflow.initialize()
+
# 添加聊天观察者
observation = ChattingObservation(chat_id=subheartflow_id)
new_subflow.add_observation(observation)
# 注册子心流
self.subheartflows[subheartflow_id] = new_subflow
- logger.info(f"子心流 {subheartflow_id} 创建成功")
+ heartflow_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id
+ logger.info(f"[{heartflow_name}] 开始看消息")
# 启动后台任务
asyncio.create_task(new_subflow.subheartflow_start_working())
@@ -264,104 +268,70 @@ class SubHeartflowManager:
async def evaluate_interest_and_promote(self, current_mai_state: MaiStateInfo):
"""评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)"""
- log_prefix_manager = "[子心流管理器-兴趣评估]"
- logger.debug(f"{log_prefix_manager} 开始周期... 当前状态: {current_mai_state.get_current_state().value}")
-
- # 获取 FOCUSED 状态的数量上限
- current_state_enum = current_mai_state.get_current_state()
- focused_limit = current_state_enum.get_focused_chat_max_num()
+ log_prefix = "[兴趣评估]"
+ current_state = current_mai_state.get_current_state()
+ focused_limit = current_state.get_focused_chat_max_num()
+
+
+ if int(time.time()) % 20 == 0: # 每20秒输出一次
+ logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天")
+
if focused_limit <= 0:
- logger.debug(
- f"{log_prefix_manager} 当前状态 ({current_state_enum.value}) 不允许 FOCUSED 子心流, 跳过提升检查。"
- )
+ # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流")
return
- # 获取当前 FOCUSED 状态的数量 (初始值)
current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED)
- logger.debug(f"{log_prefix_manager} 专注上限: {focused_limit}, 当前专注数: {current_focused_count}")
+ if current_focused_count >= focused_limit:
+ logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})")
+ return
- # 使用快照安全遍历
- subflows_snapshot = list(self.subheartflows.values())
- promoted_count = 0 # 记录本次提升的数量
- try:
- for sub_hf in subflows_snapshot:
- flow_id = sub_hf.subheartflow_id
- stream_name = chat_manager.get_stream_name(flow_id) or flow_id
- log_prefix_flow = f"[{stream_name}]"
+ states_num = (
+ self.count_subflows_by_state(ChatState.ABSENT),
+ self.count_subflows_by_state(ChatState.CHAT),
+ current_focused_count
+ )
- # 只处理 CHAT 状态的子心流
- # The code snippet is checking if the `chat_status` attribute of `sub_hf.chat_state` is not equal to
- # `ChatState.CHAT`. If the condition is met, the code will continue to the next iteration of the loop
- # or block of code where this snippet is located.
- # if sub_hf.chat_state.chat_status != ChatState.CHAT:
- # continue
-
- # 检查是否满足提升概率
- should_hfc = random.random() < sub_hf.interest_chatting.start_hfc_probability
- if not should_hfc:
+ for sub_hf in list(self.subheartflows.values()):
+ flow_id = sub_hf.subheartflow_id
+ stream_name = chat_manager.get_stream_name(flow_id) or flow_id
+
+ # 跳过非CHAT状态或已经是FOCUSED状态的子心流
+ if sub_hf.chat_state.chat_status == ChatState.FOCUSED:
+ continue
+
+ from .mai_state_manager import enable_unlimited_hfc_chat
+ if not enable_unlimited_hfc_chat:
+ if sub_hf.chat_state.chat_status != ChatState.CHAT:
continue
+
+ # 检查是否满足提升概率
+ if random.random() >= sub_hf.interest_chatting.start_hfc_probability:
+ continue
- # --- 关键检查:检查 FOCUSED 数量是否已达上限 ---
- # 注意:在循环内部再次获取当前数量,因为之前的提升可能已经改变了计数
- # 使用已经记录并在循环中更新的 current_focused_count
- if current_focused_count >= focused_limit:
- logger.debug(
- f"{log_prefix_manager} {log_prefix_flow} 达到专注上限 ({current_focused_count}/{focused_limit}), 无法提升。概率={sub_hf.interest_chatting.start_hfc_probability:.2f}"
- )
- continue # 跳过这个子心流,继续检查下一个
+ # 再次检查是否达到上限
+ if current_focused_count >= focused_limit:
+ logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限")
+ break
- # --- 执行提升 ---
- # 获取当前实例以检查最新状态 (防御性编程)
- current_subflow = self.subheartflows.get(flow_id)
- if not current_subflow:
- logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试提升时状态已改变或实例消失,跳过。")
- continue
+ # 获取最新状态并执行提升
+ current_subflow = self.subheartflows.get(flow_id)
+ if not current_subflow:
+ continue
- logger.info(
- f"{log_prefix_manager} {log_prefix_flow} 兴趣评估触发升级 (prob={sub_hf.interest_chatting.start_hfc_probability:.2f}, 上限:{focused_limit}, 当前:{current_focused_count}) -> FOCUSED"
- )
+ logger.info(
+ f"{log_prefix} [{stream_name}] 触发 激情水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})"
+ )
- states_num = (
- self.count_subflows_by_state(ChatState.ABSENT),
- self.count_subflows_by_state(ChatState.CHAT), # 这个值在提升前计算
- current_focused_count, # 这个值在提升前计算
- )
+ # 执行状态提升
+ await current_subflow.set_chat_state(ChatState.FOCUSED, states_num)
+
+ # 验证提升结果
+ if (final_subflow := self.subheartflows.get(flow_id)) and \
+ final_subflow.chat_state.chat_status == ChatState.FOCUSED:
+ current_focused_count += 1
- # --- 状态设置 ---
- original_state = current_subflow.chat_state.chat_status # 记录原始状态
- await current_subflow.set_chat_state(ChatState.FOCUSED, states_num)
- # --- 状态验证 ---
- final_subflow = self.subheartflows.get(flow_id)
- if final_subflow:
- final_state = final_subflow.chat_state.chat_status
- if final_state == ChatState.FOCUSED:
- logger.debug(
- f"{log_prefix_manager} {log_prefix_flow} 成功从 {original_state.value} 升级到 FOCUSED 状态"
- )
- promoted_count += 1
- # 提升成功后,更新当前专注计数,以便后续检查能使用最新值
- current_focused_count += 1
- elif final_state == original_state: # 状态未变
- logger.warning(
- f"{log_prefix_manager} {log_prefix_flow} 尝试从 {original_state.value} 升级 FOCUSED 失败,状态仍为: {final_state.value} (可能被内部逻辑阻止)"
- )
- else: # 状态变成其他了?
- logger.warning(
- f"{log_prefix_manager} {log_prefix_flow} 尝试从 {original_state.value} 升级 FOCUSED 后状态变为 {final_state.value}"
- )
- else: # 子心流消失了?
- logger.warning(f"{log_prefix_manager} {log_prefix_flow} 升级后验证时子心流 {flow_id} 消失")
-
- except Exception as e:
- logger.error(f"{log_prefix_manager} 兴趣评估周期出错: {e}", exc_info=True)
-
- if promoted_count > 0:
- logger.info(f"{log_prefix_manager} 评估周期结束, 成功提升 {promoted_count} 个子心流到 FOCUSED。")
- else:
- logger.debug(f"{log_prefix_manager} 评估周期结束, 未提升任何子心流。")
-
- async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.3):
+ async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.1):
"""以一定概率将 FOCUSED 或 CHAT 状态的子心流回退到 ABSENT 状态。"""
log_prefix_manager = "[子心流管理器-随机停用]"
logger.debug(f"{log_prefix_manager} 开始随机停用检查... (概率: {deactivation_probability:.0%})")
diff --git a/src/main.py b/src/main.py
index 3ef1ed229..a2d8fc512 100644
--- a/src/main.py
+++ b/src/main.py
@@ -154,7 +154,7 @@ class MainSystem:
"""打印情绪状态"""
while True:
self.mood_manager.print_mood_status()
- await asyncio.sleep(30)
+ await asyncio.sleep(60)
@staticmethod
async def remove_recalled_message_task():
diff --git a/src/plugins/chat/utils_image.py b/src/plugins/chat/utils_image.py
index fb8522b94..f6b9231ad 100644
--- a/src/plugins/chat/utils_image.py
+++ b/src/plugins/chat/utils_image.py
@@ -152,7 +152,7 @@ class ImageManager:
"timestamp": timestamp,
}
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
- logger.success(f"保存表情包: {file_path}")
+ logger.trace(f"保存表情包: {file_path}")
except Exception as e:
logger.error(f"保存表情包文件失败: {str(e)}")
diff --git a/src/plugins/emoji_system/emoji_manager.py b/src/plugins/emoji_system/emoji_manager.py
index 7222fd3f2..1c73ec780 100644
--- a/src/plugins/emoji_system/emoji_manager.py
+++ b/src/plugins/emoji_system/emoji_manager.py
@@ -28,6 +28,11 @@ EMOJI_DIR = os.path.join("data", "emoji") # 表情包存储目录
EMOJI_REGISTED_DIR = os.path.join("data", "emoji_registed") # 已注册的表情包注册目录
+'''
+还没经过测试,有些地方数据库和内存数据同步可能不完全
+
+'''
+
class MaiEmoji:
"""定义一个表情包"""
@@ -247,10 +252,12 @@ class EmojiManager:
def record_usage(self, hash: str):
"""记录表情使用次数"""
try:
+ db.emoji.update_one({"hash": hash}, {"$inc": {"usage_count": 1}})
for emoji in self.emoji_objects:
if emoji.hash == hash:
emoji.usage_count += 1
break
+
except Exception as e:
logger.error(f"记录表情使用失败: {str(e)}")
@@ -304,12 +311,11 @@ class EmojiManager:
selected_emoji, similarity = random.choice(top_5_emojis)
# 更新使用次数
- db.emoji.update_one({"hash": selected_emoji.hash}, {"$inc": {"usage_count": 1}})
-
- logger.info(f"[匹配] 找到表情包: {selected_emoji.description} (相似度: {similarity:.4f})")
+ self.record_usage(selected_emoji.hash)
time_end = time.time()
- logger.info(f"[匹配] 搜索表情包用时: {time_end - time_start:.2f} 秒")
+
+ logger.info(f"找到[{text_emotion}]表情包,用时:{time_end - time_start:.2f}秒: {selected_emoji.description} (相似度: {similarity:.4f})")
return selected_emoji.path, f"[ {selected_emoji.description} ]"
except Exception as e:
diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py
index ab80beaab..47cb52eb1 100644
--- a/src/plugins/heartFC_chat/heartFC_chat.py
+++ b/src/plugins/heartFC_chat/heartFC_chat.py
@@ -1,7 +1,8 @@
import asyncio
import time
import traceback
-from typing import List, Optional, Dict, Any
+from typing import List, Optional, Dict, Any, Set, Deque
+from collections import deque
from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending
from src.plugins.chat.message import MessageSet, Seg # Local import needed after move
from src.plugins.chat.chat_stream import ChatStream
@@ -20,6 +21,8 @@ from src.plugins.utils.json_utils import process_llm_tool_response # 导入新
from src.heart_flow.sub_mind import SubMind
from src.heart_flow.observation import Observation
from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_manager
+import contextlib
+from src.plugins.utils.chat_message_builder import num_new_messages_since
# --- End import ---
@@ -34,31 +37,175 @@ interest_log_config = LogConfig(
logger = get_module_logger("HeartFCLoop", config=interest_log_config) # Logger Name Changed
-PLANNER_TOOL_DEFINITION = [
- {
- "type": "function",
- "function": {
- "name": "decide_reply_action",
- "description": "根据当前聊天内容和上下文,决定机器人是否应该回复以及如何回复。",
- "parameters": {
- "type": "object",
- "properties": {
- "action": {
- "type": "string",
- "enum": ["no_reply", "text_reply", "emoji_reply"],
- "description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复, 可选附带表情) 或 'emoji_reply'(仅表情回复)。",
- },
- "reasoning": {"type": "string", "description": "做出此决定的简要理由。"},
- "emoji_query": {
- "type": "string",
- "description": "如果行动是'emoji_reply',指定表情的主题或概念。如果行动是'text_reply'且希望在文本后追加表情,也在此指定表情主题。",
+# 默认动作定义
+DEFAULT_ACTIONS = {
+ "no_reply": "不回复",
+ "text_reply": "文本回复, 可选附带表情",
+ "emoji_reply": "仅表情回复"
+}
+
+class ActionManager:
+ """动作管理器:控制每次决策可以使用的动作"""
+
+ def __init__(self):
+ # 初始化为默认动作集
+ self._available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy()
+
+ def get_available_actions(self) -> Dict[str, str]:
+ """获取当前可用的动作集"""
+ return self._available_actions
+
+ def add_action(self, action_name: str, description: str) -> bool:
+ """
+ 添加新的动作
+
+ 参数:
+ action_name: 动作名称
+ description: 动作描述
+
+ 返回:
+ bool: 是否添加成功
+ """
+ if action_name in self._available_actions:
+ return False
+ self._available_actions[action_name] = description
+ return True
+
+ def remove_action(self, action_name: str) -> bool:
+ """
+ 移除指定动作
+
+ 参数:
+ action_name: 动作名称
+
+ 返回:
+ bool: 是否移除成功
+ """
+ if action_name not in self._available_actions:
+ return False
+ del self._available_actions[action_name]
+ return True
+
+ def clear_actions(self):
+ """清空所有动作"""
+ self._available_actions.clear()
+
+ def reset_to_default(self):
+ """重置为默认动作集"""
+ self._available_actions = DEFAULT_ACTIONS.copy()
+
+ def get_planner_tool_definition(self) -> List[Dict[str, Any]]:
+ """获取当前动作集对应的规划器工具定义"""
+ return [{
+ "type": "function",
+ "function": {
+ "name": "decide_reply_action",
+ "description": "根据当前聊天内容和上下文,决定机器人是否应该回复以及如何回复。",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {
+ "type": "string",
+ "enum": list(self._available_actions.keys()),
+ "description": "决定采取的行动:" +
+ ", ".join([f"'{k}'({v})" for k, v in self._available_actions.items()]),
+ },
+ "reasoning": {"type": "string", "description": "做出此决定的简要理由。"},
+ "emoji_query": {
+ "type": "string",
+ "description": "如果行动是'emoji_reply',指定表情的主题或概念。如果行动是'text_reply'且希望在文本后追加表情,也在此指定表情主题。",
+ },
},
+ "required": ["action", "reasoning"],
},
- "required": ["action", "reasoning"],
},
- },
- }
-]
+ }]
+
+
+# 在文件开头添加自定义异常类
+class HeartFCError(Exception):
+ """麦麦聊天系统基础异常类"""
+ pass
+
+class PlannerError(HeartFCError):
+ """规划器异常"""
+ pass
+
+class ReplierError(HeartFCError):
+ """回复器异常"""
+ pass
+
+class SenderError(HeartFCError):
+ """发送器异常"""
+ pass
+
+
+class CycleInfo:
+ """循环信息记录类"""
+ def __init__(self, cycle_id: int):
+ self.cycle_id = cycle_id
+ self.start_time = time.time()
+ self.end_time: Optional[float] = None
+ self.action_taken = False
+ self.action_type = "unknown"
+ self.reasoning = ""
+ self.timers: Dict[str, float] = {}
+ self.thinking_id = ""
+
+ # 添加响应信息相关字段
+ self.response_info: Dict[str, Any] = {
+ "response_text": [], # 回复的文本列表
+ "emoji_info": "", # 表情信息
+ "anchor_message_id": "", # 锚点消息ID
+ "reply_message_ids": [], # 回复消息ID列表
+ "sub_mind_thinking": "", # 子思维思考内容
+ }
+
+ def to_dict(self) -> Dict[str, Any]:
+ """将循环信息转换为字典格式"""
+ return {
+ "cycle_id": self.cycle_id,
+ "start_time": self.start_time,
+ "end_time": self.end_time,
+ "action_taken": self.action_taken,
+ "action_type": self.action_type,
+ "reasoning": self.reasoning,
+ "timers": self.timers,
+ "thinking_id": self.thinking_id,
+ "response_info": self.response_info
+ }
+
+ def complete_cycle(self):
+ """完成循环,记录结束时间"""
+ self.end_time = time.time()
+
+ def set_action_info(self, action_type: str, reasoning: str, action_taken: bool):
+ """设置动作信息"""
+ self.action_type = action_type
+ self.reasoning = reasoning
+ self.action_taken = action_taken
+
+ def set_thinking_id(self, thinking_id: str):
+ """设置思考消息ID"""
+ self.thinking_id = thinking_id
+
+ def set_response_info(self,
+ response_text: Optional[List[str]] = None,
+ emoji_info: Optional[str] = None,
+ anchor_message_id: Optional[str] = None,
+ reply_message_ids: Optional[List[str]] = None,
+ sub_mind_thinking: Optional[str] = None):
+ """设置响应信息"""
+ if response_text is not None:
+ self.response_info["response_text"] = response_text
+ if emoji_info is not None:
+ self.response_info["emoji_info"] = emoji_info
+ if anchor_message_id is not None:
+ self.response_info["anchor_message_id"] = anchor_message_id
+ if reply_message_ids is not None:
+ self.response_info["reply_message_ids"] = reply_message_ids
+ if sub_mind_thinking is not None:
+ self.response_info["sub_mind_thinking"] = sub_mind_thinking
class HeartFChatting:
@@ -79,7 +226,13 @@ class HeartFChatting:
self.stream_id: str = chat_id # 聊天流ID
self.chat_stream: Optional[ChatStream] = None # 关联的聊天流
self.sub_mind: SubMind = sub_mind # 关联的子思维
- self.observations: Observation = observations # 关联的观察
+ self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态
+
+ # 日志前缀
+ self.log_prefix: str = f"[{chat_manager.get_stream_name(chat_id) or chat_id}]"
+
+ # 动作管理器
+ self.action_manager = ActionManager()
# 初始化状态控制
self._initialized = False # 是否已初始化标志
@@ -101,331 +254,487 @@ class HeartFChatting:
self._loop_active: bool = False # 循环是否正在运行
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
- def _get_log_prefix(self) -> str:
- """获取日志前缀,包含可读的流名称"""
- stream_name = chat_manager.get_stream_name(self.stream_id) or self.stream_id
- return f"[{stream_name}]"
+ # 添加循环信息管理相关的属性
+ self._cycle_counter = 0
+ self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息
+ self._current_cycle: Optional[CycleInfo] = None
async def _initialize(self) -> bool:
"""
- 懒初始化以使用提供的标识符解析chat_stream和sub_hf。
+ 懒初始化以使用提供的标识符解析chat_stream。
确保实例已准备好处理触发器。
"""
if self._initialized:
return True
- log_prefix = self._get_log_prefix() # 获取前缀
- try:
- self.chat_stream = chat_manager.get_stream(self.stream_id)
- if not self.chat_stream:
- logger.error(f"{log_prefix} 获取ChatStream失败。")
- return False
- self._initialized = True
- logger.info(f"麦麦感觉到了,激发了HeartFChatting{log_prefix} 初始化成功。")
- return True
- except Exception as e:
- logger.error(f"{log_prefix} 初始化失败: {e}")
- logger.error(traceback.format_exc())
+ self.chat_stream = chat_manager.get_stream(self.stream_id)
+ if not self.chat_stream:
+ logger.error(f"{self.log_prefix} 获取ChatStream失败。")
return False
+ # 更新日志前缀(以防流名称发生变化)
+ self.log_prefix = f"[{chat_manager.get_stream_name(self.stream_id) or self.stream_id}]"
+
+ self._initialized = True
+ logger.info(f"麦麦感觉到了,可以开始激情水群{self.log_prefix} ")
+ return True
+
async def start(self):
"""
- 显式尝试启动 HeartFChatting 的主循环。
- 如果循环未激活,则启动循环。
+ 启动 HeartFChatting 的主循环。
+ 注意:调用此方法前必须确保已经成功初始化。
"""
- log_prefix = self._get_log_prefix()
- if not self._initialized:
- if not await self._initialize():
- logger.error(f"{log_prefix} 无法启动循环: 初始化失败。")
- return
- logger.info(f"{log_prefix} 尝试显式启动循环...")
+ logger.info(f"{self.log_prefix} 开始激情水群(HFC)...")
await self._start_loop_if_needed()
async def _start_loop_if_needed(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
- log_prefix = self._get_log_prefix()
- should_start_loop = False
- # 直接检查是否激活,无需检查计时器
- if not self._loop_active:
- should_start_loop = True
- self._loop_active = True # 标记为活动,防止重复启动
+ # 如果循环已经激活,直接返回
+ if self._loop_active:
+ return
- if should_start_loop:
- # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False)
- if self._loop_task and not self._loop_task.done():
- logger.warning(f"{log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。")
- self._loop_task.cancel()
- try:
- # 等待旧任务确实被取消
- await asyncio.wait_for(self._loop_task, timeout=0.5)
- except (asyncio.CancelledError, asyncio.TimeoutError):
- pass # 忽略取消或超时错误
- self._loop_task = None # 清理旧任务引用
+ # 标记为活动状态,防止重复启动
+ self._loop_active = True
- logger.info(f"{log_prefix} 循环未激活,启动主循环...")
- # 创建新的循环任务
- self._loop_task = asyncio.create_task(self._run_pf_loop())
- # 添加完成回调
- self._loop_task.add_done_callback(self._handle_loop_completion)
- # else:
- # logger.trace(f"{log_prefix} 不需要启动循环(已激活)") # 可以取消注释以进行调试
+ # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False)
+ if self._loop_task and not self._loop_task.done():
+ logger.warning(f"{self.log_prefix} 发现之前的循环任务仍在运行(不符合预期)。取消旧任务。")
+ self._loop_task.cancel()
+ try:
+ # 等待旧任务确实被取消
+ await asyncio.wait_for(self._loop_task, timeout=0.5)
+ except (asyncio.CancelledError, asyncio.TimeoutError):
+ pass # 忽略取消或超时错误
+ self._loop_task = None # 清理旧任务引用
+
+ logger.info(f"{self.log_prefix} 启动激情水群(HFC)主循环...")
+ # 创建新的循环任务
+ self._loop_task = asyncio.create_task(self._hfc_loop())
+ # 添加完成回调
+ self._loop_task.add_done_callback(self._handle_loop_completion)
def _handle_loop_completion(self, task: asyncio.Task):
- """当 _run_pf_loop 任务完成时执行的回调。"""
- log_prefix = self._get_log_prefix()
+ """当 _hfc_loop 任务完成时执行的回调。"""
try:
exception = task.exception()
if exception:
- logger.error(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}")
+ logger.error(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}")
logger.error(traceback.format_exc()) # Log full traceback for exceptions
else:
# Loop completing normally now means it was cancelled/shutdown externally
- logger.info(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天 (外部停止)")
+ logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天 (外部停止)")
except asyncio.CancelledError:
- logger.info(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)")
+ logger.info(f"{self.log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)")
finally:
self._loop_active = False
self._loop_task = None
if self._processing_lock.locked():
- logger.warning(f"{log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
+ logger.warning(f"{self.log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。")
self._processing_lock.release()
- async def _run_pf_loop(self):
- """
- 主循环,持续进行计划并可能回复消息,直到被外部取消。
- 管理每个循环周期的处理锁。
- """
- log_prefix = self._get_log_prefix()
- logger.info(f"{log_prefix} HeartFChatting: 麦麦打算好好聊聊 (进入专注模式)")
+ async def _hfc_loop(self):
+ """主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:
- thinking_id = ""
- while True: # Loop indefinitely until cancelled
- cycle_timers = {} # <--- Initialize timers dict for this cycle
-
- # Access MessageManager directly
- if message_manager.check_if_sending_message_exist(self.stream_id, thinking_id):
- # logger.info(f"{log_prefix} HeartFChatting: 麦麦还在发消息,等会再规划")
- await asyncio.sleep(1)
- continue
- else:
- # logger.info(f"{log_prefix} HeartFChatting: 麦麦不发消息了,开始规划")
- pass
-
- # 记录循环周期开始时间,用于计时和休眠计算
+ while True: # 主循环
+ # 创建新的循环信息
+ self._cycle_counter += 1
+ self._current_cycle = CycleInfo(self._cycle_counter)
+
+ # 初始化周期状态
+ cycle_timers = {}
loop_cycle_start_time = time.monotonic()
- action_taken_this_cycle = False
- acquired_lock = False
- planner_start_db_time = 0.0 # 初始化
-
- try:
- with Timer("Total Cycle", cycle_timers) as _total_timer: # <--- Start total cycle timer
- # Use try_acquire pattern or timeout?
- await self._processing_lock.acquire()
- acquired_lock = True
- # logger.debug(f"{log_prefix} HeartFChatting: 循环获取到处理锁")
-
- # 在规划前记录数据库时间戳
+
+ with Timer("Total Cycle", cycle_timers):
+ # 执行规划和处理阶段
+ async with self._get_cycle_context() as acquired_lock:
+ if not acquired_lock:
+ continue
+
+ # 记录规划开始时间点
planner_start_db_time = time.time()
-
- # --- Planner --- #
- planner_result = {}
- with Timer("Planner", cycle_timers): # <--- Start Planner timer
- planner_result = await self._planner()
- action = planner_result.get("action", "error")
- reasoning = planner_result.get("reasoning", "Planner did not provide reasoning.")
- emoji_query = planner_result.get("emoji_query", "")
- llm_error = planner_result.get("llm_error", False)
-
- if llm_error:
- logger.error(f"{log_prefix} Planner LLM 失败,跳过本周期回复尝试。理由: {reasoning}")
- # Optionally add a longer sleep?
- action_taken_this_cycle = False # Ensure no action is counted
- # Continue to sleep logic
-
- elif action == "text_reply":
- logger.debug(f"{log_prefix} HeartFChatting: 麦麦决定回复文本. 理由: {reasoning}")
- action_taken_this_cycle = True
- anchor_message = await self._get_anchor_message()
- if not anchor_message:
- logger.error(f"{log_prefix} 循环: 无法获取锚点消息用于回复. 跳过周期.")
- else:
- # --- Create Thinking Message (Moved) ---
- thinking_id = await self._create_thinking_message(anchor_message)
- if not thinking_id:
- logger.error(f"{log_prefix} 循环: 无法创建思考ID. 跳过周期.")
- else:
- replier_result = None
- try:
- # --- Replier Work --- #
- with Timer("Replier", cycle_timers): # <--- Start Replier timer
- replier_result = await self._replier_work(
- anchor_message=anchor_message,
- thinking_id=thinking_id,
- reason=reasoning,
- )
- except Exception as e_replier:
- logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}")
- # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
-
- if replier_result:
- # --- Sender Work --- #
- try:
- with Timer("Sender", cycle_timers): # <--- Start Sender timer
- await self._sender(
- thinking_id=thinking_id,
- anchor_message=anchor_message,
- response_set=replier_result,
- send_emoji=emoji_query,
- )
- # logger.info(f"{log_prefix} 循环: 发送器完成成功.")
- except Exception as e_sender:
- logger.error(f"{log_prefix} 循环: 发送器失败: {e_sender}")
- # _sender should handle cleanup, but double check
- # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
- else:
- logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.")
- # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call
- elif action == "emoji_reply":
- logger.info(
- f"{log_prefix} HeartFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}"
+
+ # 执行规划阶段
+ with Timer("Planning Phase", cycle_timers):
+ action_taken, thinking_id = await self._think_plan_execute(
+ cycle_timers, planner_start_db_time
)
- action_taken_this_cycle = True
- anchor = await self._get_anchor_message()
- if anchor:
- try:
- # --- Handle Emoji (Moved) --- #
- with Timer("Emoji Handler", cycle_timers): # <--- Start Emoji timer
- await self._handle_emoji(anchor, [], emoji_query)
- except Exception as e_emoji:
- logger.error(f"{log_prefix} 循环: 发送表情失败: {e_emoji}")
- else:
- logger.warning(f"{log_prefix} 循环: 无法发送表情, 无法获取锚点.")
- action_taken_this_cycle = True # 即使发送失败,Planner 也决策了动作
+
+ # 更新循环信息
+ self._current_cycle.set_thinking_id(thinking_id)
+ self._current_cycle.timers = cycle_timers
- elif action == "no_reply":
- logger.info(f"{log_prefix} HeartFChatting: 麦麦决定不回复. 原因: {reasoning}")
- action_taken_this_cycle = False # 标记为未执行动作
- # --- 新增:等待新消息 ---
- logger.debug(f"{log_prefix} HeartFChatting: 开始等待新消息 (自 {planner_start_db_time})...")
- observation = None
-
- observation = self.observations[0]
-
- if observation:
- with Timer("Wait New Msg", cycle_timers): # <--- Start Wait timer
- wait_start_time = time.monotonic()
- while True:
- # 检查是否有新消息
- has_new = await observation.has_new_messages_since(planner_start_db_time)
- if has_new:
- logger.info(f"{log_prefix} HeartFChatting: 检测到新消息,结束等待。")
- break # 收到新消息,退出等待
-
- # 检查等待是否超时(例如,防止无限等待)
- if time.monotonic() - wait_start_time > 60: # 等待60秒示例
- logger.warning(f"{log_prefix} HeartFChatting: 等待新消息超时(60秒)。")
- break # 超时退出
-
- # 等待一段时间再检查
- try:
- await asyncio.sleep(1.5) # 检查间隔
- except asyncio.CancelledError:
- logger.info(f"{log_prefix} 等待新消息的 sleep 被中断。")
- raise # 重新抛出取消错误,以便外层循环处理
- else:
- logger.warning(
- f"{log_prefix} HeartFChatting: 无法获取 Observation 实例,无法等待新消息。"
- )
- # --- 等待结束 ---
-
- elif action == "error": # Action specifically set to error by planner
- logger.error(f"{log_prefix} HeartFChatting: Planner返回错误状态. 原因: {reasoning}")
- action_taken_this_cycle = False
-
- else: # Unknown action from planner
- logger.warning(
- f"{log_prefix} HeartFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}"
- )
- action_taken_this_cycle = False
-
- # --- Print Timer Results --- #
- if cycle_timers: # 先检查cycle_timers是否非空
- timer_strings = []
- for name, elapsed in cycle_timers.items():
- # 直接格式化存储在字典中的浮点数 elapsed
- formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
- timer_strings.append(f"{name}: {formatted_time}")
-
- if timer_strings: # 如果有有效计时器数据才打印
- logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
-
- # --- Timer Decrement Removed --- #
- cycle_duration = time.monotonic() - loop_cycle_start_time
-
- except Exception as e_cycle:
- logger.error(f"{log_prefix} 循环周期执行时发生错误: {e_cycle}")
- logger.error(traceback.format_exc())
- if acquired_lock and self._processing_lock.locked():
- self._processing_lock.release()
- acquired_lock = False
- logger.warning(f"{log_prefix} 由于循环周期中的错误释放了处理锁.")
-
- finally:
- if acquired_lock:
- self._processing_lock.release()
- # logger.trace(f"{log_prefix} 循环释放了处理锁.") # Reduce noise
-
- if cycle_duration > 0.1:
- logger.debug(f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s.")
-
- # --- Delay --- #
- try:
- sleep_duration = 0.0
- if not action_taken_this_cycle and cycle_duration < 1.5:
- sleep_duration = 1.5 - cycle_duration
- elif cycle_duration < 0.2: # Keep minimal sleep even after action
- sleep_duration = 0.2
-
- if sleep_duration > 0:
- # logger.debug(f"{log_prefix} Sleeping for {sleep_duration:.2f}s")
- await asyncio.sleep(sleep_duration)
-
- except asyncio.CancelledError:
- logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.")
- break # Exit loop immediately on cancellation
+ # 防止循环过快消耗资源
+ with Timer("Cycle Delay", cycle_timers):
+ await self._handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix)
+
+ # 等待直到所有消息都发送完成
+ with Timer("Wait Messages Complete", cycle_timers):
+ while await self._should_skip_cycle(thinking_id):
+ await asyncio.sleep(0.2)
+
+ # 完成当前循环并保存历史
+ self._current_cycle.complete_cycle()
+ self._cycle_history.append(self._current_cycle)
+
+ # 记录循环信息和计时器结果
+ timer_strings = []
+ for name, elapsed in cycle_timers.items():
+ formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
+ timer_strings.append(f"{name}: {formatted_time}")
+
+ logger.debug(
+ f"{self.log_prefix} 循环 #{self._current_cycle.cycle_id} 完成, "
+ f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, "
+ f"动作: {self._current_cycle.action_type}"
+ + (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "")
+ )
except asyncio.CancelledError:
- logger.info(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环被取消了")
- except Exception as e_loop_outer:
- logger.error(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环意外出错: {e_loop_outer}")
+ logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的激情水群(HFC)被取消了")
+ except Exception as e:
+ logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}")
logger.error(traceback.format_exc())
+
+ @contextlib.asynccontextmanager
+ async def _get_cycle_context(self):
+ """
+ 循环周期的上下文管理器
+
+ 用于确保资源的正确获取和释放:
+ 1. 获取处理锁
+ 2. 执行操作
+ 3. 释放锁
+ """
+ acquired = False
+ try:
+ await self._processing_lock.acquire()
+ acquired = True
+ yield acquired
finally:
- # State reset is primarily handled by _handle_loop_completion callback
- logger.info(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环结束。")
+ if acquired and self._processing_lock.locked():
+ self._processing_lock.release()
- async def _planner(self) -> Dict[str, Any]:
+ async def _check_new_messages(self, start_time: float) -> bool:
"""
- 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。
+ 检查从指定时间点后是否有新消息
+
+ 参数:
+ start_time: 开始检查的时间点
+
+ 返回:
+ bool: 是否有新消息
"""
- log_prefix = self._get_log_prefix()
- observed_messages: List[dict] = []
+ try:
+ new_msg_count = num_new_messages_since(self.stream_id, start_time)
+ if new_msg_count > 0:
+ logger.info(f"{self.log_prefix} 检测到{new_msg_count}条新消息")
+ return True
+ return False
+ except Exception as e:
+ logger.error(f"{self.log_prefix} 检查新消息时出错: {e}")
+ return False
- current_mind: Optional[str] = None
- llm_error = False
+ async def _think_plan_execute(
+ self, cycle_timers: dict, planner_start_db_time: float
+ ) -> tuple[bool, str]:
+ """执行规划阶段"""
+ try:
+ # 获取子思维思考结果
+ current_mind = ""
+ with Timer("SubMind Thinking", cycle_timers):
+ current_mind = await self._get_submind_thinking()
+ # 记录子思维思考内容
+ if self._current_cycle:
+ self._current_cycle.set_response_info(sub_mind_thinking=current_mind)
+
+ # 执行规划
+ with Timer("Planner", cycle_timers):
+ planner_result = await self._planner(current_mind)
+
+ # 在获取规划结果后检查新消息
+ if await self._check_new_messages(planner_start_db_time):
+ # 更新循环信息
+ logger.info(f"{self.log_prefix} 思考到一半,检测到新消息,重新思考")
+ self._current_cycle.set_action_info("new_messages", "检测到新消息", False)
+ return False, "new_messages"
+
+ # 解析规划结果
+ action = planner_result.get("action", "error")
+ reasoning = planner_result.get("reasoning", "未提供理由")
+
+ # 更新循环信息
+ self._current_cycle.set_action_info(action, reasoning, True)
+
+ # 处理LLM错误
+ if planner_result.get("llm_error"):
+ logger.error(f"{self.log_prefix} LLM失败: {reasoning}")
+ return False, ""
+
+ # 根据动作类型执行对应处理
+ return await self._handle_action(action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time)
+
+ except PlannerError as e:
+ logger.error(f"{self.log_prefix} 规划错误: {e}")
+ # 更新循环信息
+ self._current_cycle.set_action_info("error", str(e), False)
+ return False, ""
+ async def _handle_action(
+ self,
+ action: str,
+ reasoning: str,
+ emoji_query: str,
+ cycle_timers: dict,
+ planner_start_db_time: float
+ ) -> tuple[bool, str]:
+ """
+ 处理规划动作
+
+ 参数:
+ action: 动作类型
+ reasoning: 决策理由
+ emoji_query: 表情查询
+ cycle_timers: 计时器字典
+ planner_start_db_time: 规划开始时间
+
+ 返回:
+ tuple[bool, str]: (是否执行了动作, 思考消息ID)
+ """
+ action_handlers = {
+ "text_reply": self._handle_text_reply,
+ "emoji_reply": self._handle_emoji_reply,
+ "no_reply": self._handle_no_reply
+ }
+
+ handler = action_handlers.get(action)
+ if not handler:
+ logger.warning(f"{self.log_prefix} 未知动作: {action}, 原因: {reasoning}")
+ return False, ""
+
+ try:
+ if action == "text_reply":
+ return await handler(reasoning, emoji_query, cycle_timers)
+ elif action == "emoji_reply":
+ return await handler(reasoning, emoji_query), ""
+ else: # no_reply
+ return await handler(reasoning, planner_start_db_time, cycle_timers), ""
+ except HeartFCError as e:
+ logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
+ return False, ""
+
+ async def _handle_text_reply(
+ self, reasoning: str, emoji_query: str, cycle_timers: dict
+ ) -> tuple[bool, str]:
+ """
+ 处理文本回复
+
+ 工作流程:
+ 1. 获取锚点消息
+ 2. 创建思考消息
+ 3. 生成回复
+ 4. 发送消息
+
+ 参数:
+ reasoning: 回复原因
+ emoji_query: 表情查询
+ cycle_timers: 计时器字典
+
+ 返回:
+ tuple[bool, str]: (是否回复成功, 思考消息ID)
+ """
+
+ # 获取锚点消息
+ anchor_message = await self._get_anchor_message()
+ if not anchor_message:
+ raise PlannerError("无法获取锚点消息")
+
+ # 创建思考消息
+ thinking_id = await self._create_thinking_message(anchor_message)
+ if not thinking_id:
+ raise PlannerError("无法创建思考消息")
+
+ try:
+ # 生成回复
+ with Timer("Replier", cycle_timers):
+ reply = await self._replier_work(
+ anchor_message=anchor_message,
+ thinking_id=thinking_id,
+ reason=reasoning,
+ )
+
+ if not reply:
+ raise ReplierError("回复生成失败")
+
+ # 发送消息
+ with Timer("Sender", cycle_timers):
+ await self._sender(
+ thinking_id=thinking_id,
+ anchor_message=anchor_message,
+ response_set=reply,
+ send_emoji=emoji_query,
+ )
+
+ return True, thinking_id
+
+ except (ReplierError, SenderError) as e:
+ logger.error(f"{self.log_prefix} 回复失败: {e}")
+ return True, thinking_id # 仍然返回thinking_id以便跟踪
+
+ async def _handle_emoji_reply(self, reasoning: str, emoji_query: str) -> bool:
+ """
+ 处理表情回复
+
+ 工作流程:
+ 1. 获取锚点消息
+ 2. 发送表情
+
+ 参数:
+ reasoning: 回复原因
+ emoji_query: 表情查询
+
+ 返回:
+ bool: 是否发送成功
+ """
+ logger.info(f"{self.log_prefix} 决定回复表情({emoji_query}): {reasoning}")
+
+ try:
+ anchor = await self._get_anchor_message()
+ if not anchor:
+ raise PlannerError("无法获取锚点消息")
+
+ await self._handle_emoji(anchor, [], emoji_query)
+ return True
+
+ except Exception as e:
+ logger.error(f"{self.log_prefix} 表情发送失败: {e}")
+ return False
+
+ async def _handle_no_reply(
+ self, reasoning: str, planner_start_db_time: float, cycle_timers: dict
+ ) -> bool:
+ """
+ 处理不回复的情况
+
+ 工作流程:
+ 1. 等待新消息
+ 2. 超时或收到新消息时返回
+
+ 参数:
+ reasoning: 不回复的原因
+ planner_start_db_time: 规划开始时间
+ cycle_timers: 计时器字典
+
+ 返回:
+ bool: 是否成功处理
+ """
+ logger.info(f"{self.log_prefix} 决定不回复: {reasoning}")
+
+ observation = self.observations[0] if self.observations else None
+
+ try:
+ with Timer("Wait New Msg", cycle_timers):
+ return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
+ except asyncio.CancelledError:
+ logger.info(f"{self.log_prefix} 等待被中断")
+ raise
+
+ async def _wait_for_new_message(
+ self, observation, planner_start_db_time: float, log_prefix: str
+ ) -> bool:
+ """
+ 等待新消息
+
+ 参数:
+ observation: 观察实例
+ planner_start_db_time: 开始等待的时间
+ log_prefix: 日志前缀
+
+ 返回:
+ bool: 是否检测到新消息
+ """
+ wait_start_time = time.monotonic()
+ while True:
+ if await observation.has_new_messages_since(planner_start_db_time):
+ logger.info(f"{log_prefix} 检测到新消息")
+ return True
+
+ if time.monotonic() - wait_start_time > 60:
+ logger.warning(f"{log_prefix} 等待超时(60秒)")
+ return False
+
+ await asyncio.sleep(1.5)
+
+ async def _should_skip_cycle(self, thinking_id: str) -> bool:
+ """检查是否应该跳过当前循环周期"""
+ return message_manager.check_if_sending_message_exist(self.stream_id, thinking_id)
+
+ async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str):
+ """记录循环周期的计时器结果"""
+ if cycle_timers:
+ timer_strings = []
+ for name, elapsed in cycle_timers.items():
+ formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒"
+ timer_strings.append(f"{name}: {formatted_time}")
+
+ if timer_strings:
+ logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}")
+
+ async def _handle_cycle_delay(
+ self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str
+ ):
+ """处理循环延迟"""
+ cycle_duration = time.monotonic() - cycle_start_time
+ if cycle_duration > 0.1:
+ logger.debug(f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s.")
+
+ try:
+ sleep_duration = 0.0
+ if not action_taken_this_cycle and cycle_duration < 1:
+ sleep_duration = 1 - cycle_duration
+ elif cycle_duration < 0.2:
+ sleep_duration = 0.2
+
+ if sleep_duration > 0:
+ await asyncio.sleep(sleep_duration)
+
+ except asyncio.CancelledError:
+ logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.")
+ raise
+
+ async def _get_submind_thinking(self) -> str:
+ """
+ 获取子思维的思考结果
+
+ 返回:
+ str: 思考结果,如果思考失败则返回错误信息
+ """
try:
observation = self.observations[0]
await observation.observe()
+ current_mind, _past_mind = await self.sub_mind.do_thinking_before_reply()
+ return current_mind
+ except Exception as e:
+ logger.error(f"{self.log_prefix}[SubMind] 思考失败: {e}")
+ logger.error(traceback.format_exc())
+ return "[思考时出错]"
+
+ async def _planner(self, current_mind: str) -> Dict[str, Any]:
+ """
+ 规划器 (Planner): 使用LLM根据上下文决定是否和如何回复。
+
+ 参数:
+ current_mind: 子思维的当前思考结果
+ """
+ logger.info(f"{self.log_prefix}[Planner] 开始执行规划器")
+
+ planner_timers = {} # 用于存储各阶段计时结果
+
+ # 获取观察信息
+ with Timer("获取观察信息", planner_timers):
+ observation = self.observations[0]
+ # await observation.observe()
observed_messages = observation.talking_message
observed_messages_str = observation.talking_message_str
- except Exception as e:
- logger.error(f"{log_prefix}[Planner] 获取观察信息时出错: {e}")
-
- try:
- current_mind, _past_mind = await self.sub_mind.do_thinking_before_reply()
- except Exception as e_subhf:
- logger.error(f"{log_prefix}[Planner] SubHeartflow 思考失败: {e_subhf}")
- current_mind = "[思考时出错]"
# --- 使用 LLM 进行决策 --- #
action = "no_reply" # 默认动作
@@ -434,54 +743,65 @@ class HeartFChatting:
llm_error = False # LLM错误标志
try:
- prompt = await self._build_planner_prompt(
- observed_messages_str, current_mind, self.sub_mind.structured_info
- )
- payload = {
- "model": self.planner_llm.model_name,
- "messages": [{"role": "user", "content": prompt}],
- "tools": PLANNER_TOOL_DEFINITION,
- "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}},
- }
-
- # 执行LLM请求
- try:
- response = await self.planner_llm._execute_request(
- endpoint="/chat/completions", payload=payload, prompt=prompt
+ # 构建提示词
+ with Timer("构建提示词", planner_timers):
+ prompt = await self._build_planner_prompt(
+ observed_messages_str, current_mind, self.sub_mind.structured_info
)
- except Exception as req_e:
- logger.error(f"{log_prefix}[Planner] LLM请求执行失败: {req_e}")
- return {
- "action": "error",
- "reasoning": f"LLM请求执行失败: {req_e}",
- "emoji_query": "",
- "current_mind": current_mind,
- "observed_messages": observed_messages,
- "llm_error": True,
+ payload = {
+ "model": self.planner_llm.model_name,
+ "messages": [{"role": "user", "content": prompt}],
+ "tools": self.action_manager.get_planner_tool_definition(),
+ "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}},
}
- # 使用辅助函数处理工具调用响应
- success, arguments, error_msg = process_llm_tool_response(
- response, expected_tool_name="decide_reply_action", log_prefix=f"{log_prefix}[Planner] "
- )
+ # 执行LLM请求
+ with Timer("LLM请求", planner_timers):
+ try:
+ response = await self.planner_llm._execute_request(
+ endpoint="/chat/completions", payload=payload, prompt=prompt
+ )
+ except Exception as req_e:
+ logger.error(f"{self.log_prefix}[Planner] LLM请求执行失败: {req_e}")
+ return {
+ "action": "error",
+ "reasoning": f"LLM请求执行失败: {req_e}",
+ "emoji_query": "",
+ "current_mind": current_mind,
+ "observed_messages": observed_messages,
+ "llm_error": True,
+ }
- if success:
- # 提取决策参数
- action = arguments.get("action", "no_reply")
- reasoning = arguments.get("reasoning", "未提供理由")
- emoji_query = arguments.get("emoji_query", "")
+ # 处理LLM响应
+ with Timer("处理LLM响应", planner_timers):
+ # 使用辅助函数处理工具调用响应
+ success, arguments, error_msg = process_llm_tool_response(
+ response, expected_tool_name="decide_reply_action", log_prefix=f"{self.log_prefix}[Planner] "
+ )
- # 记录决策结果
- logger.debug(f"{log_prefix}[Planner] 决策结果: {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'")
- else:
- # 处理工具调用失败
- logger.warning(f"{log_prefix}[Planner] {error_msg}")
- action = "error"
- reasoning = error_msg
- llm_error = True
+ if success:
+ # 提取决策参数
+ action = arguments.get("action", "no_reply")
+ # 验证动作是否在可用动作集中
+ if action not in self.action_manager.get_available_actions():
+ logger.warning(f"{self.log_prefix}[Planner] LLM返回了未授权的动作: {action},使用默认动作no_reply")
+ action = "no_reply"
+ reasoning = f"LLM返回了未授权的动作: {action}"
+ else:
+ reasoning = arguments.get("reasoning", "未提供理由")
+ emoji_query = arguments.get("emoji_query", "")
+
+ # 记录决策结果
+ logger.debug(f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果: {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'")
+ else:
+ # 处理工具调用失败
+ logger.warning(f"{self.log_prefix}[Planner] {error_msg}")
+ action = "error"
+ reasoning = error_msg
+ llm_error = True
except Exception as llm_e:
- logger.error(f"{log_prefix}[Planner] Planner LLM处理过程中出错: {llm_e}")
+ logger.error(f"{self.log_prefix}[Planner] Planner LLM处理过程中出错: {llm_e}")
logger.error(traceback.format_exc()) # 记录完整堆栈以便调试
action = "error"
reasoning = f"LLM处理失败: {llm_e}"
@@ -524,12 +844,12 @@ class HeartFChatting:
anchor_message = MessageRecv(placeholder_msg_dict)
anchor_message.update_chat_stream(self.chat_stream)
logger.info(
- f"{self._get_log_prefix()} Created placeholder anchor message: ID={anchor_message.message_info.message_id}"
+ f"{self.log_prefix} Created placeholder anchor message: ID={anchor_message.message_info.message_id}"
)
return anchor_message
except Exception as e:
- logger.error(f"{self._get_log_prefix()} Error getting/creating anchor message: {e}")
+ logger.error(f"{self.log_prefix} Error getting/creating anchor message: {e}")
logger.error(traceback.format_exc())
return None
@@ -545,7 +865,7 @@ class HeartFChatting:
发送器 (Sender): 使用本类的方法发送生成的回复。
处理相关的操作,如发送表情和更新关系。
"""
- log_prefix = self._get_log_prefix()
+ logger.info(f"{self.log_prefix}开始发送回复")
first_bot_msg: Optional[MessageSending] = None
# 尝试发送回复消息
@@ -553,43 +873,42 @@ class HeartFChatting:
if first_bot_msg:
# --- 处理关联表情(如果指定) --- #
if send_emoji:
- logger.info(f"{log_prefix}[Sender-{thinking_id}] 正在发送关联表情: '{send_emoji}'")
+ logger.info(f"{self.log_prefix}正在发送关联表情: '{send_emoji}'")
# 优先使用first_bot_msg作为锚点,否则回退到原始锚点
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
await self._handle_emoji(emoji_anchor, response_set, send_emoji)
else:
- # logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。")
+ # logger.warning(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。")
# 无需清理,因为_send_response_messages返回None意味着已处理/已删除
raise RuntimeError("发送回复失败,_send_response_messages返回None")
async def shutdown(self):
"""优雅关闭HeartFChatting实例,取消活动循环任务"""
- log_prefix = self._get_log_prefix()
- logger.info(f"{log_prefix} 正在关闭HeartFChatting...")
+ logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...")
# 取消循环任务
if self._loop_task and not self._loop_task.done():
- logger.info(f"{log_prefix} 正在取消HeartFChatting循环任务")
+ logger.info(f"{self.log_prefix} 正在取消HeartFChatting循环任务")
self._loop_task.cancel()
try:
await asyncio.wait_for(self._loop_task, timeout=1.0)
- logger.info(f"{log_prefix} HeartFChatting循环任务已取消")
+ logger.info(f"{self.log_prefix} HeartFChatting循环任务已取消")
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
except Exception as e:
- logger.error(f"{log_prefix} 取消循环任务出错: {e}")
+ logger.error(f"{self.log_prefix} 取消循环任务出错: {e}")
else:
- logger.info(f"{log_prefix} 没有活动的HeartFChatting循环任务")
+ logger.info(f"{self.log_prefix} 没有活动的HeartFChatting循环任务")
# 清理状态
self._loop_active = False
self._loop_task = None
if self._processing_lock.locked():
self._processing_lock.release()
- logger.warning(f"{log_prefix} 已释放处理锁")
+ logger.warning(f"{self.log_prefix} 已释放处理锁")
- logger.info(f"{log_prefix} HeartFChatting关闭完成")
+ logger.info(f"{self.log_prefix} HeartFChatting关闭完成")
async def _build_planner_prompt(
self, observed_messages_str: str, current_mind: Optional[str], structured_info: Dict[str, Any]
@@ -637,7 +956,6 @@ class HeartFChatting:
"""
回复器 (Replier): 核心逻辑用于生成回复。
"""
- log_prefix = self._get_log_prefix()
response_set: Optional[List[str]] = None
try:
response_set = await self.gpt_instance.generate_response(
@@ -647,15 +965,18 @@ class HeartFChatting:
message=anchor_message, # Pass anchor_message positionally (matches 'message' parameter)
thinking_id=thinking_id, # Pass thinking_id positionally
)
+
+
+
if not response_set:
- logger.warning(f"{log_prefix}[Replier-{thinking_id}] LLM生成了一个空回复集。")
+ logger.warning(f"{self.log_prefix}[Replier-{thinking_id}] LLM生成了一个空回复集。")
return None
return response_set
except Exception as e:
- logger.error(f"{log_prefix}[Replier-{thinking_id}] Unexpected error in replier_work: {e}")
+ logger.error(f"{self.log_prefix}[Replier-{thinking_id}] Unexpected error in replier_work: {e}")
logger.error(traceback.format_exc())
return None
@@ -663,7 +984,7 @@ class HeartFChatting:
async def _create_thinking_message(self, anchor_message: Optional[MessageRecv]) -> Optional[str]:
"""创建思考消息 (尝试锚定到 anchor_message)"""
if not anchor_message or not anchor_message.chat_stream:
- logger.error(f"{self._get_log_prefix()} 无法创建思考消息,缺少有效的锚点消息或聊天流。")
+ logger.error(f"{self.log_prefix} 无法创建思考消息,缺少有效的锚点消息或聊天流。")
return None
chat = anchor_message.chat_stream
@@ -692,9 +1013,16 @@ class HeartFChatting:
) -> Optional[MessageSending]:
"""发送回复消息 (尝试锚定到 anchor_message)"""
if not anchor_message or not anchor_message.chat_stream:
- logger.error(f"{self._get_log_prefix()} 无法发送回复,缺少有效的锚点消息或聊天流。")
+ logger.error(f"{self.log_prefix} 无法发送回复,缺少有效的锚点消息或聊天流。")
return None
+ # 记录锚点消息ID
+ if self._current_cycle and anchor_message:
+ self._current_cycle.set_response_info(
+ response_text=response_set,
+ anchor_message_id=anchor_message.message_info.message_id
+ )
+
chat = anchor_message.chat_stream
container = await message_manager.get_container(chat.stream_id)
thinking_message = None
@@ -704,7 +1032,7 @@ class HeartFChatting:
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
thinking_message = msg
container.messages.remove(msg) # Remove the message directly here
- logger.debug(f"{self._get_log_prefix()} Removed thinking message {thinking_id} via iteration.")
+ # logger.debug(f"{self.log_prefix} Removed thinking message {thinking_id} via iteration.")
break
if not thinking_message:
@@ -716,6 +1044,7 @@ class HeartFChatting:
message_set = MessageSet(chat, thinking_id)
mark_head = False
first_bot_msg = None
+ reply_message_ids = [] # 用于记录所有回复消息的ID
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
@@ -738,6 +1067,11 @@ class HeartFChatting:
mark_head = True
first_bot_msg = bot_message
message_set.add_message(bot_message)
+ reply_message_ids.append(bot_message.message_info.message_id)
+
+ # 记录回复消息ID列表
+ if self._current_cycle:
+ self._current_cycle.set_response_info(reply_message_ids=reply_message_ids)
# Access MessageManager directly
await message_manager.add_message(message_set)
@@ -745,9 +1079,8 @@ class HeartFChatting:
async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""):
"""处理表情包 (尝试锚定到 anchor_message)"""
-
if not anchor_message or not anchor_message.chat_stream:
- logger.error(f"{self._get_log_prefix()} 无法处理表情包,缺少有效的锚点消息或聊天流。")
+ logger.error(f"{self.log_prefix} 无法处理表情包,缺少有效的锚点消息或聊天流。")
return
chat = anchor_message.chat_stream
@@ -759,7 +1092,13 @@ class HeartFChatting:
emoji_raw = await emoji_manager.get_emoji_for_text(emoji_text_source)
if emoji_raw:
- emoji_path, _description = emoji_raw
+ emoji_path, description = emoji_raw
+ # 记录表情信息
+ if self._current_cycle:
+ self._current_cycle.set_response_info(
+ emoji_info=f"表情: {description}, 路径: {emoji_path}"
+ )
+
emoji_cq = image_path_to_base64(emoji_path)
thinking_time_point = round(time.time(), 2)
message_segment = Seg(type="emoji", data=emoji_cq)
@@ -780,3 +1119,24 @@ class HeartFChatting:
)
# Access MessageManager directly
await message_manager.add_message(bot_message)
+
+ def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]:
+ """获取循环历史记录
+
+ 参数:
+ last_n: 获取最近n个循环的信息,如果为None则获取所有历史记录
+
+ 返回:
+ List[Dict[str, Any]]: 循环历史记录列表
+ """
+ history = list(self._cycle_history)
+ if last_n is not None:
+ history = history[-last_n:]
+ return [cycle.to_dict() for cycle in history]
+
+ def get_last_cycle_info(self) -> Optional[Dict[str, Any]]:
+ """获取最近一个循环的信息"""
+ if self._cycle_history:
+ return self._cycle_history[-1].to_dict()
+ return None
+
diff --git a/src/plugins/heartFC_chat/heartFC_generator.py b/src/plugins/heartFC_chat/heartFC_generator.py
index da43c334f..c489e012c 100644
--- a/src/plugins/heartFC_chat/heartFC_generator.py
+++ b/src/plugins/heartFC_chat/heartFC_generator.py
@@ -57,9 +57,6 @@ class HeartFCGenerator:
)
if model_response:
- logger.info(
- f"{global_config.BOT_NICKNAME}的回复是:{model_response},生成回复时间: {t_generate_response.human_readable}"
- )
model_processed_response = await self._process_response(model_response)
return model_processed_response
diff --git a/src/plugins/heartFC_chat/heartFC_readme.md b/src/plugins/heartFC_chat/heartFC_readme.md
new file mode 100644
index 000000000..07bc4c63c
--- /dev/null
+++ b/src/plugins/heartFC_chat/heartFC_readme.md
@@ -0,0 +1,159 @@
+# HeartFC_chat 工作原理文档
+
+HeartFC_chat 是一个基于心流理论的聊天系统,通过模拟人类的思维过程和情感变化来实现自然的对话交互。系统采用Plan-Replier-Sender循环机制,实现了智能化的对话决策和生成。
+
+## 核心工作流程
+
+### 1. 消息处理与存储 (HeartFCProcessor)
+[代码位置: src/plugins/heartFC_chat/heartflow_processor.py]
+
+消息处理器负责接收和预处理消息,主要完成以下工作:
+```mermaid
+graph TD
+ A[接收原始消息] --> B[解析为MessageRecv对象]
+ B --> C[消息缓冲处理]
+ C --> D[过滤检查]
+ D --> E[存储到数据库]
+```
+
+核心实现:
+- 消息处理入口:`process_message()` [行号: 38-215]
+ - 消息解析和缓冲:`message_buffer.start_caching_messages()` [行号: 63]
+ - 过滤检查:`_check_ban_words()`, `_check_ban_regex()` [行号: 196-215]
+ - 消息存储:`storage.store_message()` [行号: 108]
+
+### 2. 对话管理循环 (HeartFChatting)
+[代码位置: src/plugins/heartFC_chat/heartFC_chat.py]
+
+HeartFChatting是系统的核心组件,实现了完整的对话管理循环:
+
+```mermaid
+graph TD
+ A[Plan阶段] -->|决策是否回复| B[Replier阶段]
+ B -->|生成回复内容| C[Sender阶段]
+ C -->|发送消息| D[等待新消息]
+ D --> A
+```
+
+#### Plan阶段 [行号: 282-386]
+- 主要函数:`_planner()`
+- 功能实现:
+ * 获取观察信息:`observation.observe()` [行号: 297]
+ * 思维处理:`sub_mind.do_thinking_before_reply()` [行号: 301]
+ * LLM决策:使用`PLANNER_TOOL_DEFINITION`进行动作规划 [行号: 13-42]
+
+#### Replier阶段 [行号: 388-416]
+- 主要函数:`_replier_work()`
+- 调用生成器:`gpt_instance.generate_response()` [行号: 394]
+- 处理生成结果和错误情况
+
+#### Sender阶段 [行号: 418-450]
+- 主要函数:`_sender()`
+- 发送实现:
+ * 创建消息:`_create_thinking_message()` [行号: 452-477]
+ * 发送回复:`_send_response_messages()` [行号: 479-525]
+ * 处理表情:`_handle_emoji()` [行号: 527-567]
+
+### 3. 回复生成机制 (HeartFCGenerator)
+[代码位置: src/plugins/heartFC_chat/heartFC_generator.py]
+
+回复生成器负责产生高质量的回复内容:
+
+```mermaid
+graph TD
+ A[获取上下文信息] --> B[构建提示词]
+ B --> C[调用LLM生成]
+ C --> D[后处理优化]
+ D --> E[返回回复集]
+```
+
+核心实现:
+- 生成入口:`generate_response()` [行号: 39-67]
+ * 情感调节:`arousal_multiplier = MoodManager.get_instance().get_arousal_multiplier()` [行号: 47]
+ * 模型生成:`_generate_response_with_model()` [行号: 69-95]
+ * 响应处理:`_process_response()` [行号: 97-106]
+
+### 4. 提示词构建系统 (HeartFlowPromptBuilder)
+[代码位置: src/plugins/heartFC_chat/heartflow_prompt_builder.py]
+
+提示词构建器支持两种工作模式,HeartFC_chat专门使用Focus模式,而Normal模式是为normal_chat设计的:
+
+#### 专注模式 (Focus Mode) - HeartFC_chat专用
+- 实现函数:`_build_prompt_focus()` [行号: 116-141]
+- 特点:
+ * 专注于当前对话状态和思维
+ * 更强的目标导向性
+ * 用于HeartFC_chat的Plan-Replier-Sender循环
+ * 简化的上下文处理,专注于决策
+
+#### 普通模式 (Normal Mode) - Normal_chat专用
+- 实现函数:`_build_prompt_normal()` [行号: 143-215]
+- 特点:
+ * 用于normal_chat的常规对话
+ * 完整的个性化处理
+ * 关系系统集成
+ * 知识库检索:`get_prompt_info()` [行号: 217-591]
+
+HeartFC_chat的Focus模式工作流程:
+```mermaid
+graph TD
+ A[获取结构化信息] --> B[获取当前思维状态]
+ B --> C[构建专注模式提示词]
+ C --> D[用于Plan阶段决策]
+ D --> E[用于Replier阶段生成]
+```
+
+## 智能特性
+
+### 1. 对话决策机制
+- LLM决策工具定义:`PLANNER_TOOL_DEFINITION` [heartFC_chat.py 行号: 13-42]
+- 决策执行:`_planner()` [heartFC_chat.py 行号: 282-386]
+- 考虑因素:
+ * 上下文相关性
+ * 情感状态
+ * 兴趣程度
+ * 对话时机
+
+### 2. 状态管理
+[代码位置: src/plugins/heartFC_chat/heartFC_chat.py]
+- 状态机实现:`HeartFChatting`类 [行号: 44-567]
+- 核心功能:
+ * 初始化:`_initialize()` [行号: 89-112]
+ * 循环控制:`_run_pf_loop()` [行号: 192-281]
+ * 状态转换:`_handle_loop_completion()` [行号: 166-190]
+
+### 3. 回复生成策略
+[代码位置: src/plugins/heartFC_chat/heartFC_generator.py]
+- 温度调节:`current_model.temperature = global_config.llm_normal["temp"] * arousal_multiplier` [行号: 48]
+- 生成控制:`_generate_response_with_model()` [行号: 69-95]
+- 响应处理:`_process_response()` [行号: 97-106]
+
+## 系统配置
+
+### 关键参数
+- LLM配置:`model_normal` [heartFC_generator.py 行号: 32-37]
+- 过滤规则:`_check_ban_words()`, `_check_ban_regex()` [heartflow_processor.py 行号: 196-215]
+- 状态控制:`INITIAL_DURATION = 60.0` [heartFC_chat.py 行号: 11]
+
+### 优化建议
+1. 调整LLM参数:`temperature`和`max_tokens`
+2. 优化提示词模板:`init_prompt()` [heartflow_prompt_builder.py 行号: 8-115]
+3. 配置状态转换条件
+4. 维护过滤规则
+
+## 注意事项
+
+1. 系统稳定性
+- 异常处理:各主要函数都包含try-except块
+- 状态检查:`_processing_lock`确保并发安全
+- 循环控制:`_loop_active`和`_loop_task`管理
+
+2. 性能优化
+- 缓存使用:`message_buffer`系统
+- LLM调用优化:批量处理和复用
+- 异步处理:使用`asyncio`
+
+3. 质量控制
+- 日志记录:使用`get_module_logger()`
+- 错误追踪:详细的异常记录
+- 响应监控:完整的状态跟踪
diff --git a/src/plugins/heartFC_chat/heartflow_processor.py b/src/plugins/heartFC_chat/heartflow_processor.py
index f7c3a64fd..27c88a983 100644
--- a/src/plugins/heartFC_chat/heartflow_processor.py
+++ b/src/plugins/heartFC_chat/heartflow_processor.py
@@ -12,6 +12,7 @@ from ..chat.chat_stream import chat_manager
from ..chat.message_buffer import message_buffer
from ..utils.timer_calculater import Timer
from src.plugins.person_info.relationship_manager import relationship_manager
+from typing import Optional, Tuple
# 定义日志配置
processor_config = LogConfig(
@@ -22,193 +23,204 @@ logger = get_module_logger("heartflow_processor", config=processor_config)
class HeartFCProcessor:
+ """心流处理器,负责处理接收到的消息并计算兴趣度"""
+
def __init__(self):
+ """初始化心流处理器,创建消息存储实例"""
self.storage = MessageStorage()
- async def process_message(self, message_data: str) -> None:
- """处理接收到的原始消息数据,完成消息解析、缓冲、过滤、存储、兴趣度计算与更新等核心流程。
-
- 此函数是消息处理的核心入口,负责接收原始字符串格式的消息数据,并将其转化为结构化的 `MessageRecv` 对象。
- 主要执行步骤包括:
- 1. 解析 `message_data` 为 `MessageRecv` 对象,提取用户信息、群组信息等。
- 2. 将消息加入 `message_buffer` 进行缓冲处理,以应对消息轰炸或者某些人一条消息分几次发等情况。
- 3. 获取或创建对应的 `chat_stream` 和 `subheartflow` 实例,用于管理会话状态和心流。
- 4. 对消息内容进行初步处理(如提取纯文本)。
- 5. 应用全局配置中的过滤词和正则表达式,过滤不符合规则的消息。
- 6. 查询消息缓冲结果,如果消息被缓冲器拦截(例如,判断为消息轰炸的一部分),则中止后续处理。
- 7. 对于通过缓冲的消息,将其存储到 `MessageStorage` 中。
-
- 8. 调用海马体(`HippocampusManager`)计算消息内容的记忆激活率。(这部分算法后续会进行优化)
- 9. 根据是否被提及(@)和记忆激活率,计算最终的兴趣度增量。(提及的额外兴趣增幅)
- 10. 使用计算出的增量更新 `InterestManager` 中对应会话的兴趣度。
- 11. 记录处理后的消息信息及当前的兴趣度到日志。
-
- 注意:此函数本身不负责生成和发送回复。回复的决策和生成逻辑被移至 `HeartFC_Chat` 类中的监控任务,
- 该任务会根据 `InterestManager` 中的兴趣度变化来决定何时触发回复。
-
+ async def _handle_error(self, error: Exception, context: str, message: Optional[MessageRecv] = None) -> None:
+ """统一的错误处理函数
+
Args:
- message_data: str: 从消息源接收到的原始消息字符串。
+ error: 捕获到的异常
+ context: 错误发生的上下文描述
+ message: 可选的消息对象,用于记录相关消息内容
+ """
+ logger.error(f"{context}: {error}")
+ logger.error(traceback.format_exc())
+ if message and hasattr(message, 'raw_message'):
+ logger.error(f"相关消息原始内容: {message.raw_message}")
+
+ async def _process_relationship(self, message: MessageRecv) -> None:
+ """处理用户关系逻辑
+
+ Args:
+ message: 消息对象,包含用户信息
+ """
+ platform = message.message_info.platform
+ user_id = message.message_info.user_info.user_id
+ nickname = message.message_info.user_info.user_nickname
+ cardname = message.message_info.user_info.user_cardname or nickname
+
+ is_known = await relationship_manager.is_known_some_one(platform, user_id)
+
+ if not is_known:
+ logger.info(f"首次认识用户: {nickname}")
+ await relationship_manager.first_knowing_some_one(
+ platform, user_id, nickname, cardname, ""
+ )
+ elif not await relationship_manager.is_qved_name(platform, user_id):
+ logger.info(f"给用户({nickname},{cardname})取名: {nickname}")
+ await relationship_manager.first_knowing_some_one(
+ platform, user_id, nickname, cardname, ""
+ )
+
+ async def _calculate_interest(self, message: MessageRecv) -> Tuple[float, bool]:
+ """计算消息的兴趣度
+
+ Args:
+ message: 待处理的消息对象
+
+ Returns:
+ Tuple[float, bool]: (兴趣度, 是否被提及)
+ """
+ is_mentioned, _ = is_mentioned_bot_in_message(message)
+ interested_rate = 0.0
+
+ with Timer("记忆激活"):
+ interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
+ message.processed_plain_text,
+ fast_retrieval=True,
+ )
+ logger.trace(f"记忆激活率: {interested_rate:.2f}")
+
+ if is_mentioned:
+ interest_increase_on_mention = 1
+ interested_rate += interest_increase_on_mention
+
+ return interested_rate, is_mentioned
+
+ def _get_message_type(self, message: MessageRecv) -> str:
+ """获取消息类型
+
+ Args:
+ message: 消息对象
+
+ Returns:
+ str: 消息类型
+ """
+ if message.message_segment.type != "seglist":
+ return message.message_segment.type
+
+ if (isinstance(message.message_segment.data, list)
+ and all(isinstance(x, Seg) for x in message.message_segment.data)
+ and len(message.message_segment.data) == 1):
+ return message.message_segment.data[0].type
+
+ return "seglist"
+
+ async def process_message(self, message_data: str) -> None:
+ """处理接收到的原始消息数据
+
+ 主要流程:
+ 1. 消息解析与初始化
+ 2. 消息缓冲处理
+ 3. 过滤检查
+ 4. 兴趣度计算
+ 5. 关系处理
+
+ Args:
+ message_data: 原始消息字符串
"""
- timing_results = {} # 初始化 timing_results
message = None
try:
+ # 1. 消息解析与初始化
message = MessageRecv(message_data)
groupinfo = message.message_info.group_info
userinfo = message.message_info.user_info
messageinfo = message.message_info
- # 消息加入缓冲池
+ # 2. 消息缓冲与流程序化
await message_buffer.start_caching_messages(message)
-
- # 创建聊天流
+
chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
-
+
subheartflow = await heartflow.create_subheartflow(chat.stream_id)
-
message.update_chat_stream(chat)
-
- await heartflow.create_subheartflow(chat.stream_id)
-
await message.process()
- logger.trace(f"消息处理成功: {message.processed_plain_text}")
-
- # 过滤词/正则表达式过滤
- if self._check_ban_words(message.processed_plain_text, chat, userinfo) or self._check_ban_regex(
- message.raw_message, chat, userinfo
- ):
+
+ # 3. 过滤检查
+ if self._check_ban_words(message.processed_plain_text, chat, userinfo) or \
+ self._check_ban_regex(message.raw_message, chat, userinfo):
return
- # 查询缓冲器结果
+ # 4. 缓冲检查
buffer_result = await message_buffer.query_buffer_result(message)
-
- # 处理缓冲器结果 (Bombing logic)
if not buffer_result:
- f_type = "seglist"
- if message.message_segment.type != "seglist":
- f_type = message.message_segment.type
- else:
- if (
- isinstance(message.message_segment.data, list)
- and all(isinstance(x, Seg) for x in message.message_segment.data)
- and len(message.message_segment.data) == 1
- ):
- f_type = message.message_segment.data[0].type
- if f_type == "text":
- logger.debug(f"触发缓冲,消息:{message.processed_plain_text}")
- elif f_type == "image":
- logger.debug("触发缓冲,表情包/图片等待中")
- elif f_type == "seglist":
- logger.debug("触发缓冲,消息列表等待中")
- return # 被缓冲器拦截,不生成回复
-
- # ---- 只有通过缓冲的消息才进行存储和后续处理 ----
-
- # 存储消息 (使用可能被缓冲器更新过的 message)
- try:
- await self.storage.store_message(message, chat)
- logger.trace(f"存储成功 (通过缓冲后): {message.processed_plain_text}")
- except Exception as e:
- logger.error(f"存储消息失败: {e}")
- logger.error(traceback.format_exc())
- # 存储失败可能仍需考虑是否继续,暂时返回
+ msg_type = self._get_message_type(message)
+ type_messages = {
+ "text": f"触发缓冲,消息:{message.processed_plain_text}",
+ "image": "触发缓冲,表情包/图片等待中",
+ "seglist": "触发缓冲,消息列表等待中"
+ }
+ logger.debug(type_messages.get(msg_type, "触发未知类型缓冲"))
return
- # 激活度计算 (使用可能被缓冲器更新过的 message.processed_plain_text)
- is_mentioned, _ = is_mentioned_bot_in_message(message)
- interested_rate = 0.0 # 默认值
- try:
- with Timer("记忆激活", timing_results):
- interested_rate = await HippocampusManager.get_instance().get_activate_from_text(
- message.processed_plain_text,
- fast_retrieval=True, # 使用更新后的文本
- )
- logger.trace(f"记忆激活率 (通过缓冲后): {interested_rate:.2f}")
- except Exception as e:
- logger.error(f"计算记忆激活率失败: {e}")
- logger.error(traceback.format_exc())
+ # 5. 消息存储
+ await self.storage.store_message(message, chat)
+ logger.trace(f"存储成功: {message.processed_plain_text}")
- # --- 修改:兴趣度更新逻辑 --- #
- if is_mentioned:
- interest_increase_on_mention = 1
- mentioned_boost = interest_increase_on_mention # 从配置获取提及增加值
- interested_rate += mentioned_boost
-
- # 更新兴趣度 (调用 SubHeartflow 的方法)
+ # 6. 兴趣度计算与更新
+ interested_rate, is_mentioned = await self._calculate_interest(message)
current_time = time.time()
await subheartflow.interest_chatting.increase_interest(current_time, value=interested_rate)
-
- # 添加到 SubHeartflow 的 interest_dict,给normal_chat处理
await subheartflow.add_interest_dict_entry(message, interested_rate, is_mentioned)
- # 打印消息接收和处理信息
+ # 7. 日志记录
mes_name = chat.group_info.group_name if chat.group_info else "私聊"
- current_time = time.strftime("%H:%M:%S", time.localtime(message.message_info.time))
+ current_time = time.strftime("%H点%M分%S秒", time.localtime(message.message_info.time))
logger.info(
f"[{current_time}][{mes_name}]"
- f"{message.message_info.user_info.user_nickname}:"
+ f"{userinfo.user_nickname}:"
f"{message.processed_plain_text}"
f"[兴趣度: {interested_rate:.2f}]"
)
- try:
- is_known = await relationship_manager.is_known_some_one(
- message.message_info.platform, message.message_info.user_info.user_id
- )
- if not is_known:
- logger.info(f"首次认识用户: {message.message_info.user_info.user_nickname}")
- await relationship_manager.first_knowing_some_one(
- message.message_info.platform,
- message.message_info.user_info.user_id,
- message.message_info.user_info.user_nickname,
- message.message_info.user_info.user_cardname or message.message_info.user_info.user_nickname,
- "",
- )
- else:
- # logger.debug(f"已认识用户: {message.message_info.user_info.user_nickname}")
- if not await relationship_manager.is_qved_name(
- message.message_info.platform, message.message_info.user_info.user_id
- ):
- logger.info(f"更新已认识但未取名的用户: {message.message_info.user_info.user_nickname}")
- await relationship_manager.first_knowing_some_one(
- message.message_info.platform,
- message.message_info.user_info.user_id,
- message.message_info.user_info.user_nickname,
- message.message_info.user_info.user_cardname
- or message.message_info.user_info.user_nickname,
- "",
- )
- except Exception as e:
- logger.error(f"处理认识关系失败: {e}")
- logger.error(traceback.format_exc())
+ # 8. 关系处理
+ await self._process_relationship(message)
except Exception as e:
- logger.error(f"消息处理失败 (process_message V3): {e}")
- logger.error(traceback.format_exc())
- if message: # 记录失败的消息内容
- logger.error(f"失败消息原始内容: {message.raw_message}")
+ await self._handle_error(e, "消息处理失败", message)
def _check_ban_words(self, text: str, chat, userinfo) -> bool:
- """检查消息中是否包含过滤词"""
+ """检查消息是否包含过滤词
+
+ Args:
+ text: 待检查的文本
+ chat: 聊天对象
+ userinfo: 用户信息
+
+ Returns:
+ bool: 是否包含过滤词
+ """
for word in global_config.ban_words:
if word in text:
- logger.info(
- f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
- )
+ chat_name = chat.group_info.group_name if chat.group_info else "私聊"
+ logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[过滤词识别]消息中含有{word},filtered")
return True
return False
def _check_ban_regex(self, text: str, chat, userinfo) -> bool:
- """检查消息是否匹配过滤正则表达式"""
+ """检查消息是否匹配过滤正则表达式
+
+ Args:
+ text: 待检查的文本
+ chat: 聊天对象
+ userinfo: 用户信息
+
+ Returns:
+ bool: 是否匹配过滤正则
+ """
for pattern in global_config.ban_msgs_regex:
if pattern.search(text):
- logger.info(
- f"[{chat.group_info.group_name if chat.group_info else '私聊'}]{userinfo.user_nickname}:{text}"
- )
+ chat_name = chat.group_info.group_name if chat.group_info else "私聊"
+ logger.info(f"[{chat_name}]{userinfo.user_nickname}:{text}")
logger.info(f"[正则表达式过滤]消息匹配到{pattern},filtered")
return True
return False
diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py
index 102aef52b..146a5307f 100644
--- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py
+++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py
@@ -21,8 +21,7 @@ logger = get_module_logger("prompt")
def init_prompt():
Prompt(
"""
-你有以下信息可供参考:
-{structured_info}
+{info_from_tools}
{chat_target}
{chat_talking_prompt}
现在你想要在群里发言或者回复。\n
@@ -38,6 +37,12 @@ def init_prompt():
{moderation_prompt}。注意:回复不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""",
"heart_flow_prompt",
)
+
+ Prompt("""
+你有以下信息可供参考:
+{structured_info}
+以上的消息是你获取到的消息,或许可以帮助你更好地回复。
+""", "info_from_tools")
# Planner提示词
Prompt(
@@ -47,13 +52,9 @@ def init_prompt():
看了以上内容,你产生的内心想法是:
{current_mind_block}
请结合你的内心想法和观察到的聊天内容,分析情况并使用 'decide_reply_action' 工具来决定你的最终行动。
-决策依据:
+注意你必须参考以下决策依据来选择工具:
1. 如果聊天内容无聊、与你无关、或者你的内心想法认为不适合回复(例如在讨论你不懂或不感兴趣的话题),选择 'no_reply'。
-2. 如果聊天内容值得回应,且适合用文字表达(参考你的内心想法),选择 'text_reply'。如果你有情绪想表达,想在文字后追加一个表达情绪的表情,请同时提供 'emoji_query' (每个标签用一个词组表示,格式如下:
- 幽默的讽刺
- 悲伤的无奈
- 愤怒的抗议
- 愤怒的讽刺)。
+2. 如果聊天内容值得回应,且适合用文字表达(参考你的内心想法),选择 'text_reply'。如果你有情绪想表达,想在文字后追加一个表达情绪的表情,请同时提供 'emoji_query' (每个标签用一个词组表示,格式例如:幽默的讽刺,单纯的开心,愤怒的抗议)。
3. 如果聊天内容或你的内心想法适合用一个表情来回应,选择 'emoji_reply' 并提供表情主题 'emoji_query'。
4. 如果最后一条消息是你自己发的,观察到的内容只有你自己的发言,并且之后没有人回复你,通常选择 'no_reply',除非有特殊原因需要追问。
5. 如果聊天记录中最新的消息是你自己发送的,并且你还想继续回复,你应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等;。
@@ -152,7 +153,7 @@ class PromptBuilder:
message_list_before_now,
replace_bot_name=True,
merge_messages=False,
- timestamp_mode="relative",
+ timestamp_mode="normal",
read_mark=0.0,
)
@@ -162,12 +163,19 @@ class PromptBuilder:
prompt_ger += "你喜欢用倒装句"
if random.random() < 0.02:
prompt_ger += "你喜欢用反问句"
+
+ if structured_info:
+ structured_info_prompt = await global_prompt_manager.format_prompt(
+ "info_from_tools",
+ structured_info = structured_info)
+ else:
+ structured_info_prompt = ""
logger.debug("开始构建prompt")
prompt = await global_prompt_manager.format_prompt(
"heart_flow_prompt",
- structured_info=structured_info,
+ info_from_tools=structured_info_prompt,
chat_target=await global_prompt_manager.get_prompt_async("chat_target_group1")
if chat_in_group
else await global_prompt_manager.get_prompt_async("chat_target_private1"),
diff --git a/src/plugins/moods/moods.py b/src/plugins/moods/moods.py
index e3fb377c6..eea2177ff 100644
--- a/src/plugins/moods/moods.py
+++ b/src/plugins/moods/moods.py
@@ -256,7 +256,7 @@ class MoodManager:
def print_mood_status(self) -> None:
"""打印当前情绪状态"""
logger.info(
- f"[情绪状态]愉悦度: {self.current_mood.valence:.2f}, "
+ f"愉悦度: {self.current_mood.valence:.2f}, "
f"唤醒度: {self.current_mood.arousal:.2f}, "
f"心情: {self.current_mood.text}"
)
diff --git a/src/plugins/utils/chat_message_builder.py b/src/plugins/utils/chat_message_builder.py
index 6ae6ccc32..edd60c05a 100644
--- a/src/plugins/utils/chat_message_builder.py
+++ b/src/plugins/utils/chat_message_builder.py
@@ -304,7 +304,7 @@ async def build_readable_messages(
readable_read_mark = translate_timestamp_to_human_readable(read_mark, mode=timestamp_mode)
read_mark_line = (
- f"\n\n--- 以上消息已读 (标记时间: {readable_read_mark}) ---\n--- 请关注你上次思考之后以下的新消息---\n"
+ f"\n\n--- 以上消息已读 (标记时间: {readable_read_mark}) ---\n--- 以下新消息未读---\n"
)
# 组合结果,确保空部分不引入多余的标记或换行