better:让表情包和文字统一发送

This commit is contained in:
SengokuCola
2025-05-13 22:13:00 +08:00
parent 238165d1f9
commit d2f7f093e3
11 changed files with 243 additions and 241 deletions

View File

@@ -30,7 +30,7 @@ from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservati
from src.chat.heart_flow.observation.working_observation import WorkingObservation
from src.chat.focus_chat.info_processors.tool_processor import ToolProcessor
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.focus_chat.hfc_utils import _create_empty_anchor_message
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message, parse_thinking_id_to_timestamp
from src.chat.focus_chat.memory_activator import MemoryActivator
install(extra_lines=3)
@@ -203,9 +203,9 @@ class HeartFChatting:
self._cycle_counter = 0
self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息
self._current_cycle: Optional[CycleDetail] = None
self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器
self.total_no_reply_count: int = 0 # <--- 新增:连续不回复计数器
self._shutting_down: bool = False # <--- 新增:关闭标志位
self._lian_xu_deng_dai_shi_jian: float = 0.0 # <--- 新增:累计等待时间
self.total_waiting_time: float = 0.0 # <--- 新增:累计等待时间
async def _initialize(self) -> bool:
"""
@@ -325,11 +325,12 @@ class HeartFChatting:
await asyncio.sleep(0.1) # 短暂等待避免空转
continue
# 记录规划开始时间点
planner_start_db_time = time.time()
# thinking_id 是思考过程的ID用于标记每一轮思考
thinking_id = "tid" + str(round(time.time(), 2))
# 主循环:思考->决策->执行
action_taken, thinking_id = await self._think_plan_execute_loop(cycle_timers, planner_start_db_time)
action_taken = await self._think_plan_execute_loop(cycle_timers, thinking_id)
# 更新循环信息
self._current_cycle.set_thinking_id(thinking_id)
@@ -391,9 +392,8 @@ class HeartFChatting:
if acquired and self._processing_lock.locked():
self._processing_lock.release()
async def _think_plan_execute_loop(self, cycle_timers: dict, planner_start_db_time: float) -> tuple[bool, str]:
async def _think_plan_execute_loop(self, cycle_timers: dict, thinking_id: str) -> tuple[bool, str]:
try:
await asyncio.sleep(1)
with Timer("观察", cycle_timers):
await self.observations[0].observe()
await self.memory_observation.observe()
@@ -515,7 +515,7 @@ class HeartFChatting:
self.hfcloop_observation.add_loop_info(self._current_cycle)
return await self._handle_action(action, reasoning, action_data, cycle_timers, planner_start_db_time)
return await self._handle_action(action, reasoning, action_data, cycle_timers, thinking_id)
except Exception as e:
logger.error(f"{self.log_prefix} 并行+串行处理失败: {e}")
@@ -523,7 +523,12 @@ class HeartFChatting:
return False, ""
async def _handle_action(
self, action: str, reasoning: str, action_data: dict, cycle_timers: dict, planner_start_db_time: float
self,
action: str,
reasoning: str,
action_data: dict,
cycle_timers: dict,
thinking_id: str,
) -> tuple[bool, str]:
"""
处理规划动作
@@ -550,17 +555,17 @@ class HeartFChatting:
try:
if action == "reply":
return await handler(reasoning, action_data, cycle_timers)
return await handler(reasoning, action_data, cycle_timers, thinking_id)
else: # no_reply
return await handler(reasoning, planner_start_db_time, cycle_timers), ""
return await handler(reasoning, cycle_timers, thinking_id)
except Exception as e:
logger.error(f"{self.log_prefix} 处理{action}时出错: {e}")
# 出错时也重置计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
self._lian_xu_deng_dai_shi_jian = 0.0
self.total_no_reply_count = 0
self.total_waiting_time = 0.0
return False, ""
async def _handle_no_reply(self, reasoning: str, planner_start_db_time: float, cycle_timers: dict) -> bool:
async def _handle_no_reply(self, reasoning: str, cycle_timers: dict, thinking_id: str) -> bool:
"""
处理不回复的情况
@@ -584,53 +589,50 @@ class HeartFChatting:
try:
with Timer("等待新消息", cycle_timers):
# 等待新消息、超时或关闭信号,并获取结果
await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix)
await self._wait_for_new_message(observation, thinking_id, self.log_prefix)
# 从计时器获取实际等待时间
current_waiting = cycle_timers.get("等待新消息", 0.0)
if not self._shutting_down:
self._lian_xu_bu_hui_fu_ci_shu += 1
self._lian_xu_deng_dai_shi_jian += current_waiting # 累加等待时间
self.total_no_reply_count += 1
self.total_waiting_time += current_waiting # 累加等待时间
logger.debug(
f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{CONSECUTIVE_NO_REPLY_THRESHOLD}, "
f"本次等待: {current_waiting:.2f}秒, 累计等待: {self._lian_xu_deng_dai_shi_jian:.2f}"
f"{self.log_prefix} 连续不回复计数增加: {self.total_no_reply_count}/{CONSECUTIVE_NO_REPLY_THRESHOLD}, "
f"本次等待: {current_waiting:.2f}秒, 累计等待: {self.total_waiting_time:.2f}"
)
# 检查是否同时达到次数和时间阈值
time_threshold = 0.66 * WAITING_TIME_THRESHOLD * CONSECUTIVE_NO_REPLY_THRESHOLD
if (
self._lian_xu_bu_hui_fu_ci_shu >= CONSECUTIVE_NO_REPLY_THRESHOLD
and self._lian_xu_deng_dai_shi_jian >= time_threshold
self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD
and self.total_waiting_time >= time_threshold
):
logger.info(
f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次) "
f"且累计等待时间达到 {self._lian_xu_deng_dai_shi_jian:.2f}秒 (阈值 {time_threshold}秒)"
f"{self.log_prefix} 连续不回复达到阈值 ({self.total_no_reply_count}次) "
f"且累计等待时间达到 {self.total_waiting_time:.2f}秒 (阈值 {time_threshold}秒)"
f"调用回调请求状态转换"
)
# 调用回调。注意:这里不重置计数器和时间,依赖回调函数成功改变状态来隐式重置上下文。
await self.on_consecutive_no_reply_callback()
elif self._lian_xu_bu_hui_fu_ci_shu >= CONSECUTIVE_NO_REPLY_THRESHOLD:
elif self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD:
# 仅次数达到阈值,但时间未达到
logger.debug(
f"{self.log_prefix} 连续不回复次数达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次) "
f"但累计等待时间 {self._lian_xu_deng_dai_shi_jian:.2f}秒 未达到时间阈值 ({time_threshold}秒),暂不调用回调"
f"{self.log_prefix} 连续不回复次数达到阈值 ({self.total_no_reply_count}次) "
f"但累计等待时间 {self.total_waiting_time:.2f}秒 未达到时间阈值 ({time_threshold}秒),暂不调用回调"
)
# else: 次数和时间都未达到阈值,不做处理
return True
return True, thinking_id
except asyncio.CancelledError:
# 如果在等待过程中任务被取消(可能是因为 shutdown
logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)")
# 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管
raise
except Exception as e: # 捕获调用管理器或其他地方可能发生的错误
logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}")
logger.error(traceback.format_exc())
# 发生意外错误时,可以选择是否重置计数器,这里选择不重置
return False # 表示动作未成功
return False, thinking_id
async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool:
async def _wait_for_new_message(self, observation: ChattingObservation, thinking_id: str, log_prefix: str) -> bool:
"""
等待新消息 或 检测到关闭信号
@@ -650,8 +652,10 @@ class HeartFChatting:
return False # 表示因为关闭而退出
# -----------------------------------
thinking_id_timestamp = parse_thinking_id_to_timestamp(thinking_id)
# 检查新消息
if await observation.has_new_messages_since(planner_start_db_time):
if await observation.has_new_messages_since(thinking_id_timestamp):
logger.info(f"{log_prefix} 检测到新消息")
return True
@@ -925,38 +929,42 @@ class HeartFChatting:
"llm_error": llm_error, # 返回错误状态
}
async def _handle_reply(self, reasoning: str, reply_data: dict, cycle_timers: dict) -> tuple[bool, str]:
async def _handle_reply(
self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
) -> tuple[bool, str]:
"""
处理统一的回复动作 - 可包含文本和表情,顺序任意
reply_data格式:
{
"text": ["你好啊", "今天天气真不错"], # 文本内容列表(可选)
"emojis": ["微笑", "阳光"] # 表情关键词列表(可选)
"text": "你好啊" # 文本内容列表(可选)
"target": "锚定消息", # 锚定消息的文本内容
"emojis": "微笑" # 表情关键词列表(可选)
}
"""
# 重置连续不回复计数器
self._lian_xu_bu_hui_fu_ci_shu = 0
self._lian_xu_deng_dai_shi_jian = 0.0
self.total_no_reply_count = 0
self.total_waiting_time = 0.0
# 获取锚定消息
# 从聊天观察获取锚定消息
observations: ChattingObservation = self.observations[0]
anchor_message = observations.serch_message_by_text(reply_data["target"])
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await _create_empty_anchor_message(
anchor_message = await create_empty_anchor_message(
self.chat_stream.platform, self.chat_stream.group_info, self.chat_stream
)
if not anchor_message:
logger.error(f"{self.log_prefix} 创建占位符失败,无法继续处理回复")
return False, ""
else:
anchor_message.update_chat_stream(self.chat_stream)
success, reply_set = await self.expressor.deal_reply(
cycle_timers=cycle_timers, action_data=reply_data, anchor_message=anchor_message, reasoning=reasoning
cycle_timers=cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=reasoning,
thinking_id=thinking_id,
)
reply_text = ""