typing和防炸

This commit is contained in:
UnCLAS-Prommer
2025-08-11 11:35:14 +08:00
parent 43190b12d2
commit 4cb57278b1
3 changed files with 88 additions and 106 deletions

View File

@@ -11,7 +11,6 @@ from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.utils.prompt_builder import global_prompt_manager
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat
from src.chat.planner_actions.planner import ActionPlanner
from src.chat.planner_actions.action_modifier import ActionModifier
from src.chat.planner_actions.action_manager import ActionManager
@@ -25,6 +24,7 @@ from src.plugin_system.apis import generator_api, send_api, message_api, databas
from src.chat.willing.willing_manager import get_willing_manager
from src.mais4u.mai_think import mai_thinking_manager
from src.mais4u.constant_s4u import ENABLE_S4U
# no_reply逻辑已集成到heartFC_chat.py中不再需要导入
from src.chat.chat_loop.hfc_utils import send_typing, stop_typing
@@ -90,7 +90,6 @@ class HeartFChatting:
self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id)
self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
@@ -116,7 +115,7 @@ class HeartFChatting:
logger.info(f"{self.log_prefix} HeartFChatting 初始化完成")
self.energy_value = 5
self.focus_energy = 1
self.no_reply_consecutive = 0
# 最近三次no_reply的新消息兴趣度记录
@@ -194,28 +193,27 @@ class HeartFChatting:
# 获取动作类型,兼容新旧格式
action_type = "未知动作"
if hasattr(self, '_current_cycle_detail') and self._current_cycle_detail:
if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail:
loop_plan_info = self._current_cycle_detail.loop_plan_info
if isinstance(loop_plan_info, dict):
action_result = loop_plan_info.get('action_result', {})
action_result = loop_plan_info.get("action_result", {})
if isinstance(action_result, dict):
# 旧格式action_result是字典
action_type = action_result.get('action_type', '未知动作')
action_type = action_result.get("action_type", "未知动作")
elif isinstance(action_result, list) and action_result:
# 新格式action_result是actions列表
action_type = action_result[0].get('action_type', '未知动作')
action_type = action_result[0].get("action_type", "未知动作")
elif isinstance(loop_plan_info, list) and loop_plan_info:
# 直接是actions列表的情况
action_type = loop_plan_info[0].get('action_type', '未知动作')
action_type = loop_plan_info[0].get("action_type", "未知动作")
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " # type: ignore
f"选择动作: {action_type}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
def _determine_form_type(self) -> str:
def _determine_form_type(self):
"""判断使用哪种形式的no_reply"""
# 如果连续no_reply次数少于3次使用waiting形式
if self.no_reply_consecutive <= 3:
@@ -223,71 +221,73 @@ class HeartFChatting:
else:
# 计算最近三次记录的兴趣度总和
total_recent_interest = sum(self.recent_interest_records)
# 计算调整后的阈值
adjusted_threshold = 3 / global_config.chat.get_current_talk_frequency(self.stream_id)
logger.info(f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}")
logger.info(
f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
)
# 如果兴趣度总和小于阈值进入breaking形式
if total_recent_interest < adjusted_threshold:
logger.info(f"{self.log_prefix} 兴趣度不足进入breaking形式")
self.focus_energy = random.randint(3, 6)
else:
logger.info(f"{self.log_prefix} 兴趣度充足")
self.focus_energy = 1
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool,float]:
self.focus_energy = 1
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]:
"""
判断是否应该处理消息
Args:
new_message: 新消息列表
mode: 当前聊天模式
Returns:
bool: 是否应该处理消息
"""
new_message_count = len(new_message)
# talk_frequency = global_config.chat.get_current_talk_frequency(self.stream_id)
modified_exit_count_threshold = self.focus_energy / global_config.chat.focus_value
total_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
total_interest += interest_value
if new_message_count >= modified_exit_count_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
logger.info(
f"{self.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待"
)
logger.info(self.last_read_time)
logger.info(new_message)
return True,total_interest/new_message_count
logger.info(str(self.last_read_time))
logger.info(str(new_message))
return True, total_interest / new_message_count
# 检查累计兴趣值
if new_message_count > 0:
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
logger.info(f"{self.log_prefix} breaking形式当前累计兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}")
logger.info(
f"{self.log_prefix} breaking形式当前累计兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}"
)
self._last_accumulated_interest = total_interest
if total_interest >= 3 / global_config.chat.focus_value:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
logger.info(
f"{self.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{3 / global_config.chat.focus_value}),结束等待"
)
return True,total_interest/new_message_count
return True, total_interest / new_message_count
# 每10秒输出一次等待状态
if int(time.time() - self.last_read_time) > 0 and int(time.time() - self.last_read_time) % 10 == 0:
@@ -295,29 +295,28 @@ class HeartFChatting:
f"{self.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,累计兴趣{total_interest:.1f},继续等待..."
)
await asyncio.sleep(0.5)
return False,0.0
return False, 0.0
async def _loopbody(self):
recent_messages_dict = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
start_time=self.last_read_time,
end_time=time.time(),
limit = 10,
limit=10,
limit_mode="latest",
filter_mai=True,
filter_command=True,
)
)
# 统一的消息处理逻辑
should_process,interest_value = await self._should_process_messages(recent_messages_dict)
should_process, interest_value = await self._should_process_messages(recent_messages_dict)
if should_process:
# earliest_message_data = recent_messages_dict[0]
# self.last_read_time = earliest_message_data.get("time")
self.last_read_time = time.time()
await self._observe(interest_value = interest_value)
await self._observe(interest_value=interest_value)
else:
# Normal模式消息数量不足等待
@@ -328,12 +327,12 @@ class HeartFChatting:
async def build_reply_to_str(self, message_data: dict):
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = message_data.get("chat_info_platform")
if platform is None:
platform = getattr(self.chat_stream, "platform", "unknown")
person_id = person_info_manager.get_person_id(
platform, # type: ignore
message_data.get("user_id"), # type: ignore
@@ -356,12 +355,12 @@ class HeartFChatting:
# 存储reply action信息
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = action_message.get("chat_info_platform")
if platform is None:
platform = getattr(self.chat_stream, "platform", "unknown")
person_id = person_info_manager.get_person_id(
platform,
action_message.get("user_id", ""),
@@ -394,17 +393,15 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers
async def _observe(self,interest_value:float = 0.0) -> bool:
async def _observe(self, interest_value: float = 0.0) -> bool:
action_type = "no_action"
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
reply_to_str = "" # 初始化reply_to_str变量
# 根据interest_value计算概率决定使用哪种planner模式
# interest_value越高越倾向于使用Normal模式
import random
import math
# 使用sigmoid函数将interest_value转换为概率
# 当interest_value为0时概率接近0使用Focus模式
# 当interest_value很高时概率接近1使用Normal模式
@@ -417,16 +414,22 @@ class HeartFChatting:
k = 2.0 # 控制曲线陡峭程度
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
normal_mode_probability = calculate_normal_mode_probability(interest_value) / global_config.chat.get_current_talk_frequency(self.stream_id)
normal_mode_probability = calculate_normal_mode_probability(
interest_value
) / global_config.chat.get_current_talk_frequency(self.stream_id)
# 根据概率决定使用哪种模式
if random.random() < normal_mode_probability:
mode = ChatMode.NORMAL
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式")
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式"
)
else:
mode = ChatMode.FOCUS
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式")
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式"
)
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
@@ -463,7 +466,7 @@ class HeartFChatting:
):
return False
with Timer("规划器", cycle_timers):
actions, _= await self.action_planner.plan(
actions, _ = await self.action_planner.plan(
mode=mode,
loop_start_time=loop_start_time,
available_actions=available_actions,
@@ -477,7 +480,6 @@ class HeartFChatting:
# action_result.get("is_parallel", True),
# )
# 3. 并行执行所有动作
async def execute_action(action_info):
"""执行单个动作的通用函数"""
@@ -486,7 +488,7 @@ class HeartFChatting:
# 直接处理no_reply逻辑不再通过动作系统
reason = action_info.get("reasoning", "选择不回复")
logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
# 存储no_reply信息到数据库
await database_api.store_action_info(
chat_stream=self.chat_stream,
@@ -497,13 +499,8 @@ class HeartFChatting:
action_data={"reason": reason},
action_name="no_reply",
)
return {
"action_type": "no_reply",
"success": True,
"reply_text": "",
"command": ""
}
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_info["action_type"] != "reply":
# 执行普通动作
with Timer("动作执行", cycle_timers):
@@ -513,19 +510,18 @@ class HeartFChatting:
action_info["action_data"],
cycle_timers,
thinking_id,
action_info["action_message"]
action_info["action_message"],
)
return {
"action_type": action_info["action_type"],
"success": success,
"reply_text": reply_text,
"command": command
"command": command,
}
else:
# 执行回复动作
reply_to_str = await self.build_reply_to_str(action_info["action_message"])
# 生成回复
gather_timeout = global_config.chat.thinking_timeout
try:
@@ -536,35 +532,20 @@ class HeartFChatting:
reply_to=reply_to_str,
request_type="chat.replyer",
),
timeout=gather_timeout
timeout=gather_timeout,
)
except asyncio.TimeoutError:
logger.warning(
f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s已跳过"
)
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None
}
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None
}
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
if not response_set:
logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None
}
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
response_set,
@@ -579,7 +560,7 @@ class HeartFChatting:
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info
"loop_info": loop_info,
}
except Exception as e:
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
@@ -589,30 +570,29 @@ class HeartFChatting:
"success": False,
"reply_text": "",
"loop_info": None,
"error": str(e)
"error": str(e),
}
# 创建所有动作的后台任务
# print(actions)
action_tasks = [asyncio.create_task(execute_action(action)) for action in actions]
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
for i, result in enumerate(results):
for result in results:
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
action_info = actions[i]
if result["action_type"] != "reply":
action_success = result["success"]
action_reply_text = result["reply_text"]
@@ -651,7 +631,6 @@ class HeartFChatting:
},
}
reply_text = action_reply_text
if ENABLE_S4U:
await stop_typing()
@@ -663,7 +642,7 @@ class HeartFChatting:
# await self.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
action_type = actions[0]["action_type"] if actions else "no_action"
# 管理no_reply计数器当执行了非no_reply动作时重置计数器
if action_type != "no_reply":
# no_reply逻辑已集成到heartFC_chat.py中直接重置计数器
@@ -671,7 +650,7 @@ class HeartFChatting:
self.no_reply_consecutive = 0
logger.debug(f"{self.log_prefix} 执行了{action_type}动作重置no_reply计数器")
return True
if action_type == "no_reply":
self.no_reply_consecutive += 1
self._determine_form_type()

View File

@@ -24,7 +24,9 @@ class QAManager:
self.kg_manager = kg_manager
self.qa_model = LLMRequest(model_set=model_config.model_task_config.lpmm_qa, request_type="lpmm.qa")
async def process_query(self, question: str) -> Optional[Tuple[List[Tuple[str, float, float]], Optional[Dict[str, float]]]]:
async def process_query(
self, question: str
) -> Optional[Tuple[List[Tuple[str, float, float]], Optional[Dict[str, float]]]]:
"""处理查询"""
# 生成问题的Embedding
@@ -56,7 +58,8 @@ class QAManager:
logger.debug(f"关系检索用时:{part_end_time - part_start_time:.5f}s")
for res in relation_search_res:
rel_str = self.embed_manager.relation_embedding_store.store.get(res[0]).str
if store_item := self.embed_manager.relation_embedding_store.store.get(res[0]):
rel_str = store_item.str
print(f"找到相关关系,相似度:{(res[1] * 100):.2f}% - {rel_str}")
# TODO: 使用LLM过滤三元组结果
@@ -105,7 +108,7 @@ class QAManager:
if not query_res:
logger.debug("知识库查询结果为空,可能是知识库中没有相关内容")
return None
knowledge = [
(
self.embed_manager.paragraphs_embedding_store.store[res[0]].str,

View File

@@ -70,8 +70,8 @@ def get_key_comment(toml_table, key):
return item.trivia.comment
if hasattr(toml_table, "keys"):
for k in toml_table.keys():
if isinstance(k, KeyType) and k.key == key:
return k.trivia.comment
if isinstance(k, KeyType) and k.key == key: # type: ignore
return k.trivia.comment # type: ignore
return None