Merge branch 'dev' into dev

This commit is contained in:
UnCLAS-Prommer
2025-07-28 21:59:48 +08:00
committed by GitHub
35 changed files with 1734 additions and 3146 deletions

View File

@@ -2,7 +2,7 @@ import asyncio
import time
import traceback
import random
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Tuple
from rich.traceback import install
from src.config.config import global_config
@@ -217,9 +217,11 @@ class HeartFChatting:
filter_bot=True,
)
if global_config.chat.focus_value != 0:
if len(new_messages_data) > 3 / pow(global_config.chat.focus_value,0.5):
if len(new_messages_data) > 3 / pow(global_config.chat.focus_value, 0.5):
self.loop_mode = ChatMode.FOCUS
self.energy_value = 10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value,0.5))) * 10
self.energy_value = (
10 + (len(new_messages_data) / (3 / pow(global_config.chat.focus_value, 0.5))) * 10
)
return True
if self.energy_value >= 30:
@@ -254,20 +256,21 @@ class HeartFChatting:
)
person_name = await person_info_manager.get_value(person_id, "person_name")
return f"{person_name}:{message_data.get('processed_plain_text')}"
async def _send_and_store_reply(
self,
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers,
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers: Dict[str, float],
thinking_id,
plan_result):
plan_result,
) -> Tuple[Dict[str, Any], str, Dict[str, float]]:
with Timer("回复发送", cycle_timers):
reply_text = await self._send_response(response_set, reply_to_str, loop_start_time, action_message)
# 存储reply action信息
# 存储reply action信息
person_info_manager = get_person_info_manager()
person_id = person_info_manager.get_person_id(
action_message.get("chat_info_platform", ""),
@@ -275,7 +278,7 @@ class HeartFChatting:
)
person_name = await person_info_manager.get_value(person_id, "person_name")
action_prompt_display = f"你对{person_name}进行了回复:{reply_text}"
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
@@ -285,10 +288,9 @@ class HeartFChatting:
action_data={"reply_text": reply_text, "reply_to": reply_to_str},
action_name="reply",
)
# 构建循环信息
loop_info = {
loop_info: Dict[str, Any] = {
"loop_plan_info": {
"action_result": plan_result.get("action_result", {}),
},
@@ -299,8 +301,8 @@ class HeartFChatting:
"taken_time": time.time(),
},
}
return loop_info, reply_text,cycle_timers
return loop_info, reply_text, cycle_timers
async def _observe(self, message_data: Optional[Dict[str, Any]] = None):
# sourcery skip: hoist-statement-from-if, merge-comparisons, reintroduce-else
@@ -310,12 +312,12 @@ class HeartFChatting:
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
gen_task = None # 初始化gen_task变量避免UnboundLocalError
reply_to_str = "" # 初始化reply_to_str变量
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考[模式:{self.loop_mode}]")
if ENABLE_S4U:
await send_typing()
@@ -337,19 +339,20 @@ class HeartFChatting:
skip_planner = False
if self.loop_mode == ChatMode.NORMAL:
# 过滤掉reply相关的动作检查是否还有其他动作
non_reply_actions = {k: v for k, v in available_actions.items()
if k not in ['reply', 'no_reply', 'no_action']}
non_reply_actions = {
k: v for k, v in available_actions.items() if k not in ["reply", "no_reply", "no_action"]
}
if not non_reply_actions:
skip_planner = True
logger.info(f"{self.log_prefix} Normal模式下没有可用动作直接回复")
# 直接设置为reply动作
action_type = "reply"
reasoning = ""
action_data = {"loop_start_time": loop_start_time}
is_parallel = False
# 构建plan_result用于后续处理
plan_result = {
"action_result": {
@@ -361,22 +364,25 @@ class HeartFChatting:
},
"action_prompt": "",
}
target_message = message_data
target_message = message_data
# 如果normal模式且不跳过规划器开始一个回复生成进程先准备好回复其实是和planer同时进行的
if not skip_planner:
reply_to_str = await self.build_reply_to_str(message_data)
gen_task = asyncio.create_task(self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal"))
gen_task = asyncio.create_task(
self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
if not skip_planner:
with Timer("规划器", cycle_timers):
plan_result, target_message = await self.action_planner.plan(mode=self.loop_mode)
action_result: dict = plan_result.get("action_result", {}) # type: ignore
action_result: Dict[str, Any] = plan_result.get("action_result", {}) # type: ignore
action_type, action_data, reasoning, is_parallel = (
action_result.get("action_type", "error"),
action_result.get("action_data", {}),
@@ -386,29 +392,27 @@ class HeartFChatting:
action_data["loop_start_time"] = loop_start_time
if action_type == "reply":
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复")
elif is_parallel:
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作"
)
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作")
else:
# 只有在gen_task存在时才进行相关操作
if gen_task is not None:
if gen_task:
if not gen_task.done():
gen_task.cancel()
logger.debug(f"{self.log_prefix} 已取消预生成的回复任务")
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复,但选择执行{action_type},不发表回复"
)
else:
content = " ".join([item[1] for item in gen_task.result() if item[0] == "text"])
elif generation_result := gen_task.result():
content = " ".join([item[1] for item in generation_result if item[0] == "text"])
logger.debug(f"{self.log_prefix} 预生成的回复任务已完成")
logger.info(
f"{self.log_prefix}{global_config.bot.nickname} 原本想要回复:{content},但选择执行{action_type},不发表回复"
)
else:
logger.warning(f"{self.log_prefix} 预生成的回复任务未生成有效内容")
action_message: Dict[str, Any] = message_data or target_message # type: ignore
if action_type == "reply":
@@ -417,11 +421,14 @@ class HeartFChatting:
# 只有在gen_task存在时才等待
if not gen_task:
reply_to_str = await self.build_reply_to_str(message_data)
gen_task = asyncio.create_task(self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal"))
gen_task = asyncio.create_task(
self._generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
gather_timeout = global_config.chat.thinking_timeout
try:
@@ -436,56 +443,71 @@ class HeartFChatting:
return False
else:
logger.info(f"{self.log_prefix}{global_config.bot.nickname} 决定进行回复 (focus模式)")
# 构建reply_to字符串
reply_to_str = await self.build_reply_to_str(action_message)
# 生成回复
with Timer("回复生成", cycle_timers):
response_set = await self._generate_response(
message_data=action_message,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus")
message_data=action_message,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus",
)
if not response_set:
logger.warning(f"{self.log_prefix}模型未生成回复内容")
return False
loop_info, reply_text,cycle_timers = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result)
loop_info, reply_text, cycle_timers = await self._send_and_store_reply(
response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
return True
else:
# 并行执行:同时进行回复发送和动作执行
tasks = []
# 先置空防止未定义错误
background_reply_task = None
background_action_task = None
# 如果是并行执行且在normal模式下需要等待预生成的回复任务完成并发送回复
if self.loop_mode == ChatMode.NORMAL and is_parallel and gen_task:
async def handle_reply_task():
async def handle_reply_task() -> Tuple[Optional[Dict[str, Any]], str, Dict[str, float]]:
# 等待预生成的回复任务完成
gather_timeout = global_config.chat.thinking_timeout
try:
response_set = await asyncio.wait_for(gen_task, timeout=gather_timeout)
except asyncio.TimeoutError:
logger.warning(f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s已跳过")
logger.warning(
f"{self.log_prefix} 并行执行:回复生成超时>{global_config.chat.thinking_timeout}s已跳过"
)
return None, "", {}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return None, "", {}
if not response_set:
logger.warning(f"{self.log_prefix} 模型超时或生成回复内容为空")
return None, "", {}
reply_to_str = await self.build_reply_to_str(action_message)
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result)
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
response_set,
reply_to_str,
loop_start_time,
action_message,
cycle_timers,
thinking_id,
plan_result,
)
return loop_info, reply_text, cycle_timers_reply
# 添加回复任务到并行任务列表
tasks.append(asyncio.create_task(handle_reply_task()))
# 执行回复任务并赋值到变量
background_reply_task = asyncio.create_task(handle_reply_task())
# 动作执行任务
async def handle_action_task():
with Timer("动作执行", cycle_timers):
@@ -493,52 +515,55 @@ class HeartFChatting:
action_type, reasoning, action_data, cycle_timers, thinking_id, action_message
)
return success, reply_text, command
# 添加动作执行任务到并行任务列表
tasks.append(asyncio.create_task(handle_action_task()))
# 并行执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
# 执行动作任务并赋值到变量
background_action_task = asyncio.create_task(handle_action_task())
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
if len(tasks) == 2: # 有回复任务和动作任务
# 并行执行所有任务
if background_reply_task:
results = await asyncio.gather(
background_reply_task, background_action_task, return_exceptions=True
)
# 处理回复任务结果
reply_result = results[0]
if isinstance(reply_result, Exception):
if isinstance(reply_result, BaseException):
logger.error(f"{self.log_prefix} 回复任务执行异常: {reply_result}")
elif reply_result and reply_result[0] is not None:
reply_loop_info, reply_text_from_reply, _ = reply_result
# 处理动作任务结果
action_result = results[1]
if isinstance(action_result, Exception):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}")
action_task_result = results[1]
if isinstance(action_task_result, BaseException):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}")
else:
action_success, action_reply_text, action_command = action_result
else: # 只有动作任务
action_result = results[0]
if isinstance(action_result, Exception):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_result}")
action_success, action_reply_text, action_command = action_task_result
else:
results = await asyncio.gather(background_action_task, return_exceptions=True)
# 只有动作任务
action_task_result = results[0]
if isinstance(action_task_result, BaseException):
logger.error(f"{self.log_prefix} 动作任务执行异常: {action_task_result}")
else:
action_success, action_reply_text, action_command = action_result
action_success, action_reply_text, action_command = action_task_result
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update({
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
})
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
@@ -555,11 +580,10 @@ class HeartFChatting:
}
reply_text = action_reply_text
if ENABLE_S4U:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
@@ -603,7 +627,7 @@ class HeartFChatting:
action: str,
reasoning: str,
action_data: dict,
cycle_timers: dict,
cycle_timers: Dict[str, float],
thinking_id: str,
action_message: dict,
) -> tuple[bool, str, str]:
@@ -712,7 +736,11 @@ class HeartFChatting:
return False
async def _generate_response(
self, message_data: dict, available_actions: Optional[Dict[str, ActionInfo]], reply_to: str, request_type: str = "chat.replyer.normal"
self,
message_data: dict,
available_actions: Optional[Dict[str, ActionInfo]],
reply_to: str,
request_type: str = "chat.replyer.normal",
) -> Optional[list]:
"""生成普通回复"""
try:
@@ -734,7 +762,7 @@ class HeartFChatting:
logger.error(f"{self.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None
async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data):
async def _send_response(self, reply_set, reply_to, thinking_start_time, message_data) -> str:
current_time = time.time()
new_message_count = message_api.count_new_messages(
chat_id=self.chat_stream.stream_id, start_time=thinking_start_time, end_time=current_time
@@ -746,13 +774,9 @@ class HeartFChatting:
need_reply = new_message_count >= random.randint(2, 4)
if need_reply:
logger.info(
f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复"
)
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,使用引用回复")
else:
logger.info(
f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复"
)
logger.info(f"{self.log_prefix} 从思考到回复,共有{new_message_count}条新消息,不使用引用回复")
reply_text = ""
first_replied = False

View File

@@ -444,7 +444,7 @@ class MessageSending(MessageProcessBase):
is_emoji: bool = False,
thinking_start_time: float = 0,
apply_set_reply_logic: bool = False,
reply_to: str = None, # type: ignore
reply_to: Optional[str] = None,
):
# 调用父类初始化
super().__init__(

View File

@@ -211,9 +211,8 @@ class ActionPlanner:
reasoning = f"Planner 内部处理错误: {outer_e}"
is_parallel = False
if mode == ChatMode.NORMAL:
if action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
if mode == ChatMode.NORMAL and action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
action_result = {
"action_type": action,
@@ -256,7 +255,7 @@ class ActionPlanner:
actions_before_now = get_actions_by_timestamp_with_chat(
chat_id=self.chat_id,
timestamp_start=time.time()-3600,
timestamp_start=time.time() - 3600,
timestamp_end=time.time(),
limit=5,
)
@@ -264,7 +263,7 @@ class ActionPlanner:
actions_before_now_block = build_readable_actions(
actions=actions_before_now,
)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
self.last_obs_time_mark = time.time()
@@ -276,7 +275,6 @@ class ActionPlanner:
if global_config.chat.at_bot_inevitable_reply:
mentioned_bonus = "\n- 有人提到你或者at你"
by_what = "聊天内容"
target_prompt = '\n "target_message_id":"触发action的消息id"'
no_action_block = f"""重要说明:

View File

@@ -39,7 +39,7 @@ def init_prompt():
Prompt("你正在和{sender_name}聊天,这是你们之前聊的内容:", "chat_target_private1")
Prompt("在群里聊天", "chat_target_group2")
Prompt("{sender_name}聊天", "chat_target_private2")
Prompt(
"""
{expression_habits_block}
@@ -156,10 +156,18 @@ class DefaultReplyer:
extra_info: str = "",
available_actions: Optional[Dict[str, ActionInfo]] = None,
enable_tool: bool = True,
enable_timeout: bool = False,
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
回复器 (Replier): 核心逻辑,负责生成回复文本。
回复器 (Replier): 负责生成回复文本的核心逻辑
Args:
reply_to: 回复对象,格式为 "发送者:消息内容"
extra_info: 额外信息,用于补充上下文
available_actions: 可用的动作信息字典
enable_tool: 是否启用工具调用
Returns:
Tuple[bool, Optional[str], Optional[str]]: (是否成功, 生成的回复内容, 使用的prompt)
"""
prompt = None
if available_actions is None:
@@ -168,43 +176,25 @@ class DefaultReplyer:
# 3. 构建 Prompt
with Timer("构建Prompt", {}): # 内部计时器,可选保留
prompt = await self.build_prompt_reply_context(
reply_to = reply_to,
reply_to=reply_to,
extra_info=extra_info,
available_actions=available_actions,
enable_timeout=enable_timeout,
enable_tool=enable_tool,
)
if not prompt:
logger.warning("构建prompt失败跳过回复生成")
return False, None, None
# 4. 调用 LLM 生成回复
content = None
reasoning_content = None
model_name = "unknown_model"
# TODO: 复活这里
# reasoning_content = None
# model_name = "unknown_model"
try:
with Timer("LLM生成", {}): # 内部计时器,可选保留
# 加权随机选择一个模型配置
selected_model_config = self._select_weighted_model_config()
logger.info(
f"使用模型生成回复: {selected_model_config.get('name', 'N/A')} (选中概率: {selected_model_config.get('weight', 1.0)})"
)
express_model = LLMRequest(
model=selected_model_config,
request_type=self.request_type,
)
if global_config.debug.show_prompt:
logger.info(f"\n{prompt}\n")
else:
logger.debug(f"\n{prompt}\n")
content, (reasoning_content, model_name) = await express_model.generate_response_async(prompt)
logger.debug(f"replyer生成内容: {content}")
content = await self.llm_generate_content(prompt)
logger.debug(f"replyer生成内容: {content}")
except Exception as llm_e:
# 精简报错信息
@@ -220,62 +210,54 @@ class DefaultReplyer:
async def rewrite_reply_with_context(
self,
reply_data: Dict[str, Any],
raw_reply: str = "",
reason: str = "",
reply_to: str = "",
relation_info: str = "",
) -> Tuple[bool, Optional[str]]:
return_prompt: bool = False,
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
表达器 (Expressor): 核心逻辑,负责生成回复文本。
表达器 (Expressor): 负责重写和优化回复文本。
Args:
raw_reply: 原始回复内容
reason: 回复原因
reply_to: 回复对象,格式为 "发送者:消息内容"
relation_info: 关系信息
Returns:
Tuple[bool, Optional[str]]: (是否成功, 重写后的回复内容)
"""
try:
if not reply_data:
reply_data = {
"reply_to": reply_to,
"relation_info": relation_info,
}
with Timer("构建Prompt", {}): # 内部计时器,可选保留
prompt = await self.build_prompt_rewrite_context(
reply_data=reply_data,
raw_reply=raw_reply,
reason=reason,
reply_to=reply_to,
)
content = None
reasoning_content = None
model_name = "unknown_model"
# TODO: 复活这里
# reasoning_content = None
# model_name = "unknown_model"
if not prompt:
logger.error("Prompt 构建失败,无法生成回复。")
return False, None
return False, None, None
try:
with Timer("LLM生成", {}): # 内部计时器,可选保留
# 加权随机选择一个模型配置
selected_model_config = self._select_weighted_model_config()
logger.info(
f"使用模型重写回复: {selected_model_config.get('name', 'N/A')} (选中概率: {selected_model_config.get('weight', 1.0)})"
)
express_model = LLMRequest(
model=selected_model_config,
request_type=self.request_type,
)
content, (reasoning_content, model_name) = await express_model.generate_response_async(prompt)
logger.info(f"想要表达:{raw_reply}||理由:{reason}||生成回复: {content}\n")
content = await self.llm_generate_content(prompt)
logger.info(f"想要表达:{raw_reply}||理由:{reason}||生成回复: {content}\n")
except Exception as llm_e:
# 精简报错信息
logger.error(f"LLM 生成失败: {llm_e}")
return False, None # LLM 调用失败则无法生成回复
return False, None, prompt if return_prompt else None # LLM 调用失败则无法生成回复
return True, content
return True, content, prompt if return_prompt else None
except Exception as e:
logger.error(f"回复生成意外失败: {e}")
traceback.print_exc()
return False, None
return False, None, prompt if return_prompt else None
async def build_relation_info(self, reply_to: str = ""):
if not global_config.relationship.enable_relationship:
@@ -297,7 +279,16 @@ class DefaultReplyer:
return await relationship_fetcher.build_relation_info(person_id, points_num=5)
async def build_expression_habits(self, chat_history, target):
async def build_expression_habits(self, chat_history: str, target: str) -> str:
"""构建表达习惯块
Args:
chat_history: 聊天历史记录
target: 目标消息内容
Returns:
str: 表达习惯信息字符串
"""
if not global_config.expression.enable_expression:
return ""
@@ -343,11 +334,18 @@ class DefaultReplyer:
if style_habits_str.strip() and grammar_habits_str.strip():
expression_habits_title = "你可以参考以下的语言习惯和句法,如果情景合适就使用,不要盲目使用,不要生硬使用,以合理的方式结合到你的回复中:"
expression_habits_block = f"{expression_habits_title}\n{expression_habits_block}"
return f"{expression_habits_title}\n{expression_habits_block}"
return expression_habits_block
async def build_memory_block(self, chat_history: str, target: str) -> str:
"""构建记忆块
async def build_memory_block(self, chat_history, target):
Args:
chat_history: 聊天历史记录
target: 目标消息内容
Returns:
str: 记忆信息字符串
"""
if not global_config.memory.enable_memory:
return ""
@@ -375,12 +373,13 @@ class DefaultReplyer:
return memory_str
async def build_tool_info(self, chat_history, reply_to: str = "", enable_tool: bool = True):
async def build_tool_info(self, chat_history: str, reply_to: str = "", enable_tool: bool = True) -> str:
"""构建工具信息块
Args:
reply_data: 回复数据,包含要回复的消息内容
chat_history: 聊天历史
chat_history: 聊天历史记录
reply_to: 回复对象,格式为 "发送者:消息内容"
enable_tool: 是否启用工具调用
Returns:
str: 工具信息字符串
@@ -424,7 +423,15 @@ class DefaultReplyer:
logger.error(f"工具信息获取失败: {e}")
return ""
def _parse_reply_target(self, target_message: str) -> tuple:
def _parse_reply_target(self, target_message: str) -> Tuple[str, str]:
"""解析回复目标消息
Args:
target_message: 目标消息,格式为 "发送者:消息内容""发送者:消息内容"
Returns:
Tuple[str, str]: (发送者名称, 消息内容)
"""
sender = ""
target = ""
# 添加None检查防止NoneType错误
@@ -438,7 +445,15 @@ class DefaultReplyer:
target = parts[1].strip()
return sender, target
async def build_keywords_reaction_prompt(self, target):
async def build_keywords_reaction_prompt(self, target: Optional[str]) -> str:
"""构建关键词反应提示
Args:
target: 目标消息内容
Returns:
str: 关键词反应提示字符串
"""
# 关键词检测与反应
keywords_reaction_prompt = ""
try:
@@ -472,15 +487,25 @@ class DefaultReplyer:
return keywords_reaction_prompt
async def _time_and_run_task(self, coroutine, name: str):
"""一个简单的帮助函数,用于计时运行异步任务,返回任务名、结果和耗时"""
async def _time_and_run_task(self, coroutine, name: str) -> Tuple[str, Any, float]:
"""计时运行异步任务的辅助函数
Args:
coroutine: 要执行的协程
name: 任务名称
Returns:
Tuple[str, Any, float]: (任务名称, 任务结果, 执行耗时)
"""
start_time = time.time()
result = await coroutine
end_time = time.time()
duration = end_time - start_time
return name, result, duration
def build_s4u_chat_history_prompts(self, message_list_before_now: list, target_user_id: str) -> tuple[str, str]:
def build_s4u_chat_history_prompts(
self, message_list_before_now: List[Dict[str, Any]], target_user_id: str
) -> Tuple[str, str]:
"""
构建 s4u 风格的分离对话 prompt
@@ -489,7 +514,7 @@ class DefaultReplyer:
target_user_id: 目标用户ID当前对话对象
Returns:
tuple: (核心对话prompt, 背景对话prompt)
Tuple[str, str]: (核心对话prompt, 背景对话prompt)
"""
core_dialogue_list = []
background_dialogue_list = []
@@ -508,7 +533,7 @@ class DefaultReplyer:
# 其他用户的对话
background_dialogue_list.append(msg_dict)
except Exception as e:
logger.error(f"![1753364551656](image/default_generator/1753364551656.png)记录: {msg_dict}, 错误: {e}")
logger.error(f"处理消息记录时出错: {msg_dict}, 错误: {e}")
# 构建背景对话 prompt
background_dialogue_prompt = ""
@@ -553,8 +578,25 @@ class DefaultReplyer:
sender: str,
target: str,
chat_info: str,
):
"""构建 mai_think 上下文信息"""
) -> Any:
"""构建 mai_think 上下文信息
Args:
chat_id: 聊天ID
memory_block: 记忆块内容
relation_info: 关系信息
time_block: 时间块内容
chat_target_1: 聊天目标1
chat_target_2: 聊天目标2
mood_prompt: 情绪提示
identity_block: 身份块内容
sender: 发送者名称
target: 目标消息内容
chat_info: 聊天信息
Returns:
Any: mai_think 实例
"""
mai_think = mai_thinking_manager.get_mai_think(chat_id)
mai_think.memory_block = memory_block
mai_think.relation_info_block = relation_info
@@ -573,19 +615,17 @@ class DefaultReplyer:
reply_to: str,
extra_info: str = "",
available_actions: Optional[Dict[str, ActionInfo]] = None,
enable_timeout: bool = False,
enable_tool: bool = True,
) -> str: # sourcery skip: merge-else-if-into-elif, remove-redundant-if
"""
构建回复器上下文
Args:
reply_data: 回复数据
replay_data 包含以下字段:
structured_info: 结构化信息,一般是工具调用获得的信息
reply_to: 回复对象
extra_info/extra_info_block: 额外信息
reply_to: 回复对象,格式为 "发送者:消息内容"
extra_info: 额外信息,用于补充上下文
available_actions: 可用动作
enable_timeout: 是否启用超时处理
enable_tool: 是否启用工具调用
Returns:
str: 构建好的上下文
@@ -800,15 +840,14 @@ class DefaultReplyer:
async def build_prompt_rewrite_context(
self,
reply_data: Dict[str, Any],
raw_reply: str,
reason: str,
reply_to: str,
) -> str:
chat_stream = self.chat_stream
chat_id = chat_stream.stream_id
is_group_chat = bool(chat_stream.group_info)
reply_to = reply_data.get("reply_to", "none")
raw_reply = reply_data.get("raw_reply", "")
reason = reply_data.get("reason", "")
sender, target = self._parse_reply_target(reply_to)
# 添加情绪状态获取
@@ -835,7 +874,7 @@ class DefaultReplyer:
# 并行执行2个构建任务
expression_habits_block, relation_info = await asyncio.gather(
self.build_expression_habits(chat_talking_prompt_half, target),
self.build_relation_info(reply_data),
self.build_relation_info(reply_to),
)
keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target)
@@ -938,6 +977,30 @@ class DefaultReplyer:
display_message=display_message,
)
async def llm_generate_content(self, prompt: str) -> str:
with Timer("LLM生成", {}): # 内部计时器,可选保留
# 加权随机选择一个模型配置
selected_model_config = self._select_weighted_model_config()
logger.info(
f"使用模型生成回复: {selected_model_config.get('name', 'N/A')} (选中概率: {selected_model_config.get('weight', 1.0)})"
)
express_model = LLMRequest(
model=selected_model_config,
request_type=self.request_type,
)
if global_config.debug.show_prompt:
logger.info(f"\n{prompt}\n")
else:
logger.debug(f"\n{prompt}\n")
# TODO: 这里的_应该做出替换
content, _ = await express_model.generate_response_async(prompt)
logger.debug(f"replyer生成内容: {content}")
return content
def weighted_sample_no_replacement(items, weights, k) -> list:
"""
@@ -996,9 +1059,7 @@ async def get_prompt_info(message: str, threshold: float):
logger.debug(f"获取知识库内容耗时: {(end_time - start_time):.3f}")
logger.debug(f"获取知识库内容,相关信息:{related_info[:100]}...,信息长度: {len(related_info)}")
# 格式化知识信息
formatted_prompt_info = f"你有以下这些**知识**\n{related_info}\n请你**记住上面的知识**,之后可能会用到。\n"
return formatted_prompt_info
return f"你有以下这些**知识**\n{related_info}\n请你**记住上面的知识**,之后可能会用到。\n"
else:
logger.debug("从LPMM知识库获取知识失败可能是从未导入过知识返回空知识...")
return ""