feat(core): 实现消息异步处理并引入LLM驱动的智能表情回应
本次更新对系统核心处理流程和插件功能进行了重要升级,主要包含以下两方面:
1. **消息处理异步化**:
- 在 `main.py` 中引入了 `asyncio.create_task` 机制,将每条消息的处理过程包装成一个独立的后台任务。
- 这解决了长时间运行的AI或插件操作可能阻塞主事件循环的问题,显著提升了机器人的响应速度和系统稳定性。
- 为后台任务添加了完成回调,现在可以详细地记录每个消息处理任务的成功、失败或取消状态及其耗时,便于监控和调试。
2. **`set_emoji_like` 插件智能化**:
- 为 `set_emoji_like` 插件增加了LLM驱动的表情选择功能。当动作指令未指定具体表情时,插件会自动构建包含聊天上下文、情绪和人设的提示,请求LLM选择一个最合适的表情进行回应。
- 为支持此功能,对AFC规划器的提示词进行了优化,为LLM提供了更清晰的参数示例和规则,提高了动作生成的准确性。
此外,为了统一日志规范,将 `[所见]` 消息接收日志集中到 `bot.py` 中,确保在任何过滤逻辑执行前记录所有收到的消息,并移除了插件中重复的日志。
This commit is contained in:
@@ -13,11 +13,14 @@ from src.common.logger import get_logger
|
||||
from src.plugin_system.apis import send_api
|
||||
from .qq_emoji_list import qq_face
|
||||
from src.plugin_system.base.component_types import ChatType
|
||||
from src.plugin_system.apis import llm_api
|
||||
from src.config.config import model_config, global_config
|
||||
from src.chat.utils.chat_message_builder import build_readable_messages
|
||||
|
||||
logger = get_logger("set_emoji_like_plugin")
|
||||
|
||||
|
||||
def get_emoji_id(emoji_input: str) -> str | None:
|
||||
async def get_emoji_id(emoji_input: str) -> str | None:
|
||||
"""根据输入获取表情ID"""
|
||||
# 如果输入本身就是数字ID,直接返回
|
||||
if emoji_input.isdigit() or (isinstance(emoji_input, str) and emoji_input.startswith("😊")):
|
||||
@@ -99,11 +102,19 @@ class SetEmojiLikeAction(BaseAction):
|
||||
set_like = self.action_data.get("set", True)
|
||||
|
||||
if not emoji_input:
|
||||
logger.error("未提供表情")
|
||||
return False, "未提供表情"
|
||||
logger.info("未提供表情,将由LLM决定")
|
||||
try:
|
||||
emoji_input = await self.ask_llm_for_emoji()
|
||||
if not emoji_input:
|
||||
logger.error("LLM未能选择表情")
|
||||
return False, "LLM未能选择表情"
|
||||
except Exception as e:
|
||||
logger.error(f"请求LLM选择表情时出错: {e}")
|
||||
return False, f"请求LLM选择表情时出错: {e}"
|
||||
|
||||
logger.info(f"设置表情回应: {emoji_input}, 是否设置: {set_like}")
|
||||
|
||||
emoji_id = get_emoji_id(emoji_input)
|
||||
emoji_id = await get_emoji_id(emoji_input)
|
||||
if not emoji_id:
|
||||
logger.error(f"找不到表情: '{emoji_input}'。请从可用列表中选择。")
|
||||
await self.store_action_info(
|
||||
@@ -160,6 +171,71 @@ class SetEmojiLikeAction(BaseAction):
|
||||
)
|
||||
return False, f"设置表情回应失败: {e}"
|
||||
|
||||
async def ask_llm_for_emoji(self) -> str | None:
|
||||
"""构建Prompt并请求LLM选择一个表情"""
|
||||
from src.mood.mood_manager import mood_manager
|
||||
from src.individuality.individuality import get_individuality
|
||||
from src.chat.message_manager.message_manager import message_manager
|
||||
|
||||
# 1. 获取上下文信息
|
||||
stream_context = message_manager.stream_contexts.get(self.chat_stream.stream_id)
|
||||
if not stream_context:
|
||||
logger.error(f"无法为 stream_id '{self.chat_stream.stream_id}' 找到 StreamContext")
|
||||
return None
|
||||
|
||||
history_messages = stream_context.get_latest_messages(20)
|
||||
chat_context = build_readable_messages(
|
||||
[msg.flatten() for msg in history_messages],
|
||||
replace_bot_name=True,
|
||||
timestamp_mode="normal_no_YMD",
|
||||
truncate=True,
|
||||
)
|
||||
|
||||
target_message_content = self.action_message.get("processed_plain_text", "")
|
||||
mood = mood_manager.get_mood_by_chat_id(self.chat_stream.stream_id).mood_state
|
||||
identity = await get_individuality().get_personality_block()
|
||||
|
||||
# 2. 构建Prompt
|
||||
emoji_options_str = ", ".join(self.emoji_options)
|
||||
bot_name = global_config.bot.nickname or "爱莉希雅"
|
||||
prompt = f"""
|
||||
# 指令:选择一个最合适的表情来回应消息
|
||||
|
||||
## 场景描述
|
||||
你的名字是“{bot_name}”。
|
||||
{identity}
|
||||
你现在的心情是:{mood}
|
||||
|
||||
## 聊天上下文
|
||||
下面是最近的聊天记录:
|
||||
{chat_context}
|
||||
|
||||
## 你的任务
|
||||
你需要针对下面的这条消息,选择一个最合适的表情来“贴”在上面,以表达你的心情和回应。
|
||||
目标消息:"{target_message_content}"
|
||||
|
||||
## 表情选项
|
||||
请从以下表情中,选择一个最能代表你此刻心情的表情。你只能选择一个,并直接返回它的【名称】。
|
||||
{emoji_options_str}
|
||||
|
||||
## 输出要求
|
||||
直接输出你选择的表情【名称】,不要添加任何多余的文字、解释或标点符号。
|
||||
|
||||
你选择的表情名称是:
|
||||
"""
|
||||
|
||||
# 3. 调用LLM
|
||||
success, response, _, _ = await llm_api.generate_with_model(
|
||||
prompt, model_config.model_task_config.tool_executor
|
||||
)
|
||||
|
||||
if success and response:
|
||||
# 清理LLM返回的可能存在的额外字符
|
||||
cleaned_response = re.sub(r"[\[\]\'\"]", "", response).strip()
|
||||
logger.info(f"LLM选择了表情: '{cleaned_response}'")
|
||||
return cleaned_response
|
||||
|
||||
return None
|
||||
|
||||
# ===== 插件注册 =====
|
||||
@register_plugin
|
||||
|
||||
@@ -431,6 +431,9 @@ class ChatBot:
|
||||
# 处理消息内容,生成纯文本
|
||||
await message.process()
|
||||
|
||||
# 在这里打印[所见]日志,确保在所有处理和过滤之前记录
|
||||
logger.info(f"\u001b[38;5;118m{message.message_info.user_info.user_nickname}:{message.processed_plain_text}\u001b[0m")
|
||||
|
||||
# 过滤检查
|
||||
if _check_ban_words(message.processed_plain_text, chat, user_info) or _check_ban_regex( # type: ignore
|
||||
message.raw_message, # type: ignore
|
||||
|
||||
33
src/main.py
33
src/main.py
@@ -3,6 +3,9 @@ import asyncio
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
from functools import partial
|
||||
from typing import Dict, Any
|
||||
|
||||
from maim_message import MessageServer
|
||||
|
||||
from src.common.remote import TelemetryHeartBeatTask
|
||||
@@ -86,6 +89,20 @@ install(extra_lines=3)
|
||||
logger = get_logger("main")
|
||||
|
||||
|
||||
def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float):
|
||||
"""后台任务完成时的回调函数"""
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
try:
|
||||
task.result() # 如果任务有异常,这里会重新抛出
|
||||
logger.info(f"消息 {message_id} 的后台任务 (ID: {id(task)}) 已成功完成, 耗时: {duration:.2f}s")
|
||||
except asyncio.CancelledError:
|
||||
logger.warning(f"消息 {message_id} 的后台任务 (ID: {id(task)}) 被取消, 耗时: {duration:.2f}s")
|
||||
except Exception:
|
||||
logger.error(f"处理消息 {message_id} 的后台任务 (ID: {id(task)}) 出现未捕获的异常, 耗时: {duration:.2f}s:")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
|
||||
class MainSystem:
|
||||
def __init__(self):
|
||||
self.hippocampus_manager = hippocampus_manager
|
||||
@@ -165,6 +182,20 @@ class MainSystem:
|
||||
except Exception as e:
|
||||
logger.error(f"停止记忆管理器时出错: {e}")
|
||||
|
||||
async def _message_process_wrapper(self, message_data: Dict[str, Any]):
|
||||
"""并行处理消息的包装器"""
|
||||
try:
|
||||
start_time = time.time()
|
||||
message_id = message_data.get("message_info", {}).get("message_id", "UNKNOWN")
|
||||
# 创建后台任务
|
||||
task = asyncio.create_task(chat_bot.message_process(message_data))
|
||||
logger.info(f"已为消息 {message_id} 创建后台处理任务 (ID: {id(task)})")
|
||||
# 添加一个回调函数,当任务完成时,它会被调用
|
||||
task.add_done_callback(partial(_task_done_callback, message_id=message_id, start_time=start_time))
|
||||
except Exception:
|
||||
logger.error("在创建消息处理任务时发生严重错误:")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
async def initialize(self):
|
||||
"""初始化系统组件"""
|
||||
logger.info(f"正在唤醒{global_config.bot.nickname}......")
|
||||
@@ -294,7 +325,7 @@ MoFox_Bot(第三方修改版)
|
||||
# await asyncio.sleep(0.5) #防止logger输出飞了
|
||||
|
||||
# 将bot.py中的chat_bot.message_process消息处理函数注册到api.py的消息处理基类中
|
||||
self.app.register_message_handler(chat_bot.message_process)
|
||||
self.app.register_message_handler(self._message_process_wrapper)
|
||||
|
||||
# 启动消息重组器的清理任务
|
||||
from src.utils.message_chunker import reassembler
|
||||
|
||||
@@ -64,10 +64,6 @@ class AffinityChatter(BaseChatter):
|
||||
try:
|
||||
unread_messages = context.get_unread_messages()
|
||||
|
||||
# 像hfc一样,打印收到的消息
|
||||
for msg in unread_messages:
|
||||
logger.info(f"{SOFT_GREEN}[所见] {msg.user_info.user_nickname}:{msg.processed_plain_text}{RESET_COLOR}")
|
||||
|
||||
# 使用增强版规划器处理消息
|
||||
actions, target_message = await self.planner.plan(context=context)
|
||||
self.stats["plans_created"] += 1
|
||||
|
||||
@@ -5,6 +5,7 @@ PlanFilter: 接收 Plan 对象,根据不同模式的逻辑进行筛选,决
|
||||
import orjson
|
||||
import time
|
||||
import traceback
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
@@ -430,8 +431,12 @@ class ChatterPlanFilter:
|
||||
continue
|
||||
|
||||
action = single_action_obj.get("action_type", "no_action")
|
||||
reasoning = single_action_obj.get("reason", "未提供原因")
|
||||
action_data = {k: v for k, v in single_action_obj.items() if k not in ["action_type", "reason"]}
|
||||
reasoning = single_action_obj.get("reasoning", "未提供原因") # 兼容旧的reason字段
|
||||
action_data = single_action_obj.get("action_data", {})
|
||||
|
||||
# 为了向后兼容,如果action_data不存在,则从顶层字段获取
|
||||
if not action_data:
|
||||
action_data = {k: v for k, v in single_action_obj.items() if k not in ["action_type", "reason", "reasoning", "thinking"]}
|
||||
|
||||
# 保留原始的thinking字段(如果有)
|
||||
thinking = action_json.get("thinking", "")
|
||||
@@ -536,6 +541,12 @@ class ChatterPlanFilter:
|
||||
if action_info.action_parameters:
|
||||
for p_name, p_desc in action_info.action_parameters.items():
|
||||
# 为参数描述添加一个通用示例值
|
||||
if action_name == "set_emoji_like" and p_name == "emoji":
|
||||
# 特殊处理set_emoji_like的emoji参数
|
||||
from plugins.set_emoji_like.qq_emoji_list import qq_face
|
||||
emoji_options = [re.search(r"\[表情:(.+?)\]", name).group(1) for name in qq_face.values() if re.search(r"\[表情:(.+?)\]", name)]
|
||||
example_value = f"<从'{', '.join(emoji_options[:10])}...'中选择一个>"
|
||||
else:
|
||||
example_value = f"<{p_desc}>"
|
||||
params_json_list.append(f' "{p_name}": "{example_value}"')
|
||||
|
||||
|
||||
@@ -88,6 +88,7 @@ def init_prompts():
|
||||
|
||||
**强制规则**:
|
||||
- 对于每一个需要目标消息的动作(如`reply`, `poke_user`, `set_emoji_like`),你 **必须** 在`action_data`中提供准确的`target_message_id`,这个ID来源于`## 未读历史消息`中消息前的`<m...>`标签。
|
||||
- 当你选择的动作需要参数时(例如 `set_emoji_like` 需要 `emoji` 参数),你 **必须** 在 `action_data` 中提供所有必需的参数及其对应的值。
|
||||
|
||||
如果没有合适的回复对象或不需要回复,输出空的 actions 数组:
|
||||
```json
|
||||
|
||||
Reference in New Issue
Block a user