typing and ruff fix

This commit is contained in:
UnCLAS-Prommer
2025-07-27 12:43:59 +08:00
parent 03098b2c13
commit c4c0983947
2 changed files with 134 additions and 112 deletions

View File

@@ -2,7 +2,7 @@ import asyncio
import time
import traceback
import random
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Tuple
from rich.traceback import install
from src.config.config import global_config
@@ -217,9 +217,11 @@ class HeartFChatting:
filter_bot=True,
)
if global_config.chat.focus_value != 0:
if len(new_messages_data) > 3 / pow(global_config.chat.focus_value,0.5):
if len(new_messages_data) > 3 / pow(global_config.chat.focus_value, 0.5):
self.loop_mode = ChatMode.FOCUS
self.energy_value = 10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value,0.5))) * 10
self.energy_value = (
10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value, 0.5))) * 10
)
return True
if self.energy_value >= 30:
@@ -254,20 +256,21 @@ class HeartFChatting:
)
person_name = await person_info_manager.get_value(person_id, "person_name")
return f"{person_name}:{message_data.get('processed_plain_text')}"
async def _send_and_store_reply(
self,
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers,
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers: Dict[str, float],
thinking_id,
plan_result):
plan_result,
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
with Timer("回复发送", cycle_timers):
reply_text = await self._send_response(response_set, reply_to_str, loop_start_time, action_message)
# 存储reply action信息
# 存储reply action信息
person_info_manager = get_person_info_manager()
person_id = person_info_manager.get_person_id(
action_message.get("chat_info_platform", ""),
@@ -275,7 +278,7 @@ class HeartFChatting:
)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
@@ -285,10 +288,9 @@ class HeartFChatting:
action_data={"reply_text": reply_text, "reply_to": reply_to_str},
action_name="reply",
)
# 构建循环信息
loop_info = {
loop_info: Dict[str, Any] = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
},
@@ -299,8 +301,8 @@ class HeartFChatting:
"taken_time": time.time(),
},
}
return loop_info, reply_text,cycle_timers
return loop_info, reply_text, cycle_timers
async def _observe(self, message_data: Optional[Dict[str, Any]] = None):
# sourcery skip: hoist-statement-from-if, merge-comparisons, reintroduce-else
@@ -310,12 +312,12 @@ class HeartFChatting:
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
gen_task = None # 初始化gen_task变量避免UnboundLocalError
reply_to_str = "" # 初始化reply_to_str变量
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考[模式:{self.loop_mode}]")
if ENABLE_S4U:
await send_typing()
@@ -337,19 +339,20 @@ class HeartFChatting:
skip_planner = False
if self.loop_mode == ChatMode.NORMAL:
# 过滤掉reply相关的动作检查是否还有其他动作
non_reply_actions = {k: v for k, v in available_actions.items()
if k not in ['reply', 'no_reply', 'no_action']}
non_reply_actions = {
k: v for k, v in available_actions.items() if k not in ["reply", "no_reply", "no_action"]
}
if not non_reply_actions:
skip_planner = True
logger.info(f"{self.log_prefix} Normal模式下没有可用动作直接回复")
# 直接设置为reply动作
action_type = "reply"
reasoning = ""
action_data = {"loop_start_time": loop_start_time}
is_parallel = False
# 构建plan_result用于后续处理
plan_result = {
"action_result": {
@@ -361,22 +364,25 @@ class HeartFChatting:
},
"action_prompt": "",
}
target_message = message_data
target_message = message_data
# 如果normal模式且不跳过规划器开始一个回复生成进程先准备好回复其实是和planer同时进行的
if not skip_planner:
reply_to_str = await self.build_reply_to_str(message_data)
gen_task = asyncio.create_task(self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal"))
gen_task = asyncio.create_task(
self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
if not skip_planner:
with Timer("规划器", cycle_timers):
plan_result, target_message = await self.action_planner.plan(mode=self.loop_mode)
action_result: dict = plan_result.get("action_result", {}) # type: ignore
action_result: Dict[str, Any] = plan_result.get("action_result", {}) # type: ignore
action_type, action_data, reasoning, is_parallel = (
action_result.get("action_type", "error"),
action_result.get("action_data", {}),
@@ -386,29 +392,27 @@ class HeartFChatting:
action_data["loop_start_time"] = loop_start_time
if action_type == "reply":
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复")
elif is_parallel:
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作"
)
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作")
else:
# 只有在gen_task存在时才进行相关操作
if gen_task is not None:
if gen_task:
if not gen_task.done():
gen_task.cancel()
logger.debug(f"{self.log_prefix} 已取消预生成的回复任务")
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复,但选择执行{action_type},不发表回复"
)
else:
content = " ".join([item[1] for item in gen_task.result() if item[0] == "text"])
elif generation_result := gen_task.result():
content = " ".join([item[1] for item in generation_result if item[0] == "text"])
logger.debug(f"{self.log_prefix} 预生成的回复任务已完成")
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复:{content},但选择执行{action_type},不发表回复"
)
else:
logger.warning(f"{self.log_prefix} 预生成的回复任务未生成有效内容")
action_message: Dict[str, Any] = message_data or target_message # type: ignore
if action_type == "reply":
@@ -417,11 +421,14 @@ class HeartFChatting:
# 只有在gen_task存在时才等待
if not gen_task:
reply_to_str = await self.build_reply_to_str(message_data)
gen_task = asyncio.create_task(self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal"))
gen_task = asyncio.create_task(
self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
gather_timeout = global_config.chat.thinking_timeout
try:
@@ -436,56 +443,71 @@ class HeartFChatting:
return False
else:
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复 (focus模式)")
# 构建reply_to字符串
reply_to_str = await self.build_reply_to_str(action_message)
# 生成回复
with Timer("回复生成", cycle_timers):
response_set = await self._generate_response(
message_data=action_message,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus")
message_data=action_message,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus",
)
if not response_set:
logger.warning(f"{self.log_prefix}模型未生成回复内容")
return False
loop_info, reply_text,cycle_timers = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result)
loop_info, reply_text, cycle_timers = await self._send_and_store_reply(
response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
return True
else:
# 并行执行:同时进行回复发送和动作执行
tasks = []
# 先置空防止未定义错误
background_reply_task = None
background_action_task = None
# 如果是并行执行且在normal模式下需要等待预生成的回复任务完成并发送回复
if self.loop_mode == ChatMode.NORMAL and is_parallel and gen_task:
async def handle_reply_task():
async def handle_reply_task() -> Tuple[Optional[Dict[str, Any]], str, Dict[str, float]]:
# 等待预生成的回复任务完成
gather_timeout = global_config.chat.thinking_timeout
try:
response_set = await asyncio.wait_for(gen_task, timeout=gather_timeout)
except asyncio.TimeoutError:
logger.warning(f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s已跳过")
logger.warning(
f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s已跳过"
)
return None, "", {}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return None, "", {}
if not response_set:
logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空")
return None, "", {}
reply_to_str = await self.build_reply_to_str(action_message)
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result)
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers,
thinking_id,
plan_result,
)
return loop_info, reply_text, cycle_timers_reply
# 添加回复任务到并行任务列表
tasks.append(asyncio.create_task(handle_reply_task()))
# 执行回复任务并赋值到变量
background_reply_task = asyncio.create_task(handle_reply_task())
# 动作执行任务
async def handle_action_task():
with Timer("动作执行", cycle_timers):
@@ -493,52 +515,55 @@ class HeartFChatting:
action_type, reasoning, action_data, cycle_timers, thinking_id, action_message
)
return success, reply_text, command
# 添加动作执行任务到并行任务列表
tasks.append(asyncio.create_task(handle_action_task()))
# 并行执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
# 执行动作任务并赋值到变量
background_action_task = asyncio.create_task(handle_action_task())
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
if len(tasks) == 2: # 有回复任务和动作任务
# 并行执行所有任务
if background_reply_task:
results = await asyncio.gather(
background_reply_task, background_action_task, return_exceptions=True
)
# 处理回复任务结果
reply_result = results[0]
if isinstance(reply_result, Exception):
if isinstance(reply_result, BaseException):
logger.error(f"{self.log_prefix} 回复任务执行异常: {reply_result}")
elif reply_result and reply_result[0] is not None:
reply_loop_info, reply_text_from_reply, _ = reply_result
# 处理动作任务结果
action_result = results[1]
if isinstance(action_result, Exception):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}")
action_task_result = results[1]
if isinstance(action_task_result, BaseException):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}")
else:
action_success, action_reply_text, action_command = action_result
else: # 只有动作任务
action_result = results[0]
if isinstance(action_result, Exception):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}")
action_success, action_reply_text, action_command = action_task_result
else:
results = await asyncio.gather(background_action_task, return_exceptions=True)
# 只有动作任务
action_task_result = results[0]
if isinstance(action_task_result, BaseException):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}")
else:
action_success, action_reply_text, action_command = action_result
action_success, action_reply_text, action_command = action_task_result
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update({
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
})
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
@@ -555,11 +580,10 @@ class HeartFChatting:
}
reply_text = action_reply_text
if ENABLE_S4U:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
@@ -603,7 +627,7 @@ class HeartFChatting:
action: str,
reasoning: str,
action_data: dict,
cycle_timers: dict,
cycle_timers: Dict[str, float],
thinking_id: str,
action_message: dict,
) -> tuple[bool, str, str]:
@@ -712,7 +736,11 @@ class HeartFChatting:
return False
async def _generate_response(
self, message_data: dict, available_actions: Optional[Dict[str, ActionInfo]], reply_to: str, request_type: str = "chat.replyer.normal"
self,
message_data: dict,
available_actions: Optional[Dict[str, ActionInfo]],
reply_to: str,
request_type: str = "chat.replyer.normal",
) -> Optional[list]:
"""生成普通回复"""
try:
@@ -734,7 +762,7 @@ class HeartFChatting:
logger.error(f"{self.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None
async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data):
async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data) -> str:
current_time = time.time()
new_message_count = message_api.count_new_messages(
chat_id=self.chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time
@@ -746,13 +774,9 @@ class HeartFChatting:
need_reply = new_message_count >= random.randint(2, 4)
if need_reply:
logger.info(
f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复"
)
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复")
else:
logger.info(
f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复"
)
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复")
reply_text = ""
first_replied = False

View File

@@ -211,9 +211,8 @@ class ActionPlanner:
reasoning = f"Planner 内部处理错误: {outer_e}"
is_parallel = False
if mode == ChatMode.NORMAL:
if action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
if mode == ChatMode.NORMAL and action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
action_result = {
"action_type": action,
@@ -256,7 +255,7 @@ class ActionPlanner:
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time()-3600,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
@@ -264,7 +263,7 @@ class ActionPlanner:
actions_before_now_block = build_readable_actions(
actions=actions_before_now,
)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
self.last_obs_time_mark = time.time()
@@ -276,7 +275,6 @@ class ActionPlanner:
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
by_what = "聊天内容"
target_prompt = '\n "target_message_id":"触发action的消息id"'
no_action_block = f"""重要说明: