This commit is contained in:
Windpicker-owo
2025-10-04 01:38:47 +08:00
7 changed files with 439 additions and 171 deletions

1
src/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
# This file makes src/api a Python package.

48
src/api/message_router.py Normal file
View File

@@ -0,0 +1,48 @@
import time
from typing import Literal
from fastapi import APIRouter, HTTPException, Query
from src.config.config import global_config
from src.plugin_system.apis import message_api
router = APIRouter()
@router.get("/messages/recent")
async def get_message_stats(
days: int = Query(1, ge=1, description="指定查询过去多少天的数据"),
message_type: Literal["all", "sent", "received"] = Query("all", description="筛选消息类型: 'sent' (BOT发送的), 'received' (BOT接收的), or 'all' (全部)")
):
"""
获取BOT在指定天数内的消息统计数据。
"""
try:
end_time = time.time()
start_time = end_time - (days * 24 * 3600)
messages = await message_api.get_messages_by_time(start_time, end_time)
sent_count = 0
received_count = 0
bot_qq = str(global_config.bot.qq_account)
for msg in messages:
if msg.get("user_id") == bot_qq:
sent_count += 1
else:
received_count += 1
if message_type == "sent":
return {"days": days, "message_type": message_type, "count": sent_count}
elif message_type == "received":
return {"days": days, "message_type": message_type, "count": received_count}
else:
return {
"days": days,
"message_type": message_type,
"sent_count": sent_count,
"received_count": received_count,
"total_count": len(messages)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -511,11 +511,11 @@ class ExpressionLearnerManager:
os.path.join(base_dir, "learnt_grammar"),
]
for directory in directories_to_create:
try:
try:
for directory in directories_to_create:
os.makedirs(directory, exist_ok=True)
logger.debug(f"确保目录存在: {directory}")
except Exception as e:
logger.debug(f"确保目录存在: {directory}")
except Exception as e:
logger.error(f"创建目录失败 {directory}: {e}")
@staticmethod

View File

@@ -245,6 +245,16 @@ MoFox_Bot(第三方修改版)
# start_api_server()
# logger.info("API服务器启动成功")
# 注册API路由
try:
from src.api.message_router import router as message_router
self.server.register_router(message_router, prefix="/api")
logger.info("API路由注册成功")
except ImportError as e:
logger.error(f"导入API路由失败: {e}")
except Exception as e:
logger.error(f"注册API路由时发生错误: {e}")
# 加载所有actions包括默认的和插件的
plugin_manager.load_all_plugins()

View File

@@ -12,7 +12,7 @@
"host_application": {
"min_version": "0.10.0"
},
"keywords": ["emoji", "reaction", "like", "表情", "回应", "点赞"],
"keywords": ["主动思考","自己发消息"],
"categories": ["Chat", "Integration"],
"default_locale": "zh-CN",

View File

@@ -1,6 +1,7 @@
import asyncio
import random
import time
import traceback
from datetime import datetime
from maim_message import UserInfo
@@ -20,77 +21,69 @@ logger = get_logger(__name__)
class ColdStartTask(AsyncTask):
"""
冷启动任务,专门用于处理那些在白名单里,但从未与机器人发生过交互的用户
它的核心职责是“破冰”,主动创建聊天流并发起第一次问候
冷启动任务,在机器人启动时执行一次
它的核心职责是“唤醒”那些因重启而“沉睡”的聊天流,确保它们能够接收主动思考
对于在白名单中但从未有过记录的全新用户,它也会发起第一次“破冰”问候。
"""
def __init__(self):
def __init__(self, bot_start_time: float):
super().__init__(task_name="ColdStartTask")
self.chat_manager = get_chat_manager()
self.executor = ProactiveThinkerExecutor()
self.bot_start_time = bot_start_time
async def run(self):
"""任务主循环,周期性地检查是否有需要“破冰”的新用户"""
logger.info("冷启动任务已启动,将周期性检查白名单中的新朋友。")
# 初始等待一段时间,确保其他服务(如数据库)完全启动
await asyncio.sleep(100)
"""任务主逻辑,在启动后执行一次白名单扫描"""
logger.info("冷启动任务已启动,将在短暂延迟后开始唤醒沉睡的聊天流...")
await asyncio.sleep(30) # 延迟以确保所有服务和聊天流已从数据库加载完毕
while True:
try:
logger.info("【冷启动】开始扫描白名单,寻找从未聊过的用户...")
try:
logger.info("【冷启动】开始扫描白名单,唤醒沉睡的聊天流...")
# 从全局配置中获取私聊白名单
enabled_private_chats = global_config.proactive_thinking.enabled_private_chats
if not enabled_private_chats:
logger.debug("【冷启动】私聊白名单为空,任务暂停一小时。")
await asyncio.sleep(3600) # 白名单为空时,没必要频繁检查
continue
enabled_private_chats = global_config.proactive_thinking.enabled_private_chats
if not enabled_private_chats:
logger.debug("【冷启动】私聊白名单为空,任务结束。")
return
# 遍历白名单中的每一个用户
for chat_id in enabled_private_chats:
try:
platform, user_id_str = chat_id.split(":")
user_id = int(user_id_str)
for chat_id in enabled_private_chats:
try:
platform, user_id_str = chat_id.split(":")
user_id = int(user_id_str)
# 【核心逻辑】使用 chat_api 检查该用户是否已经存在聊天流ChatStream
# 如果返回了 ChatStream 对象,说明已经聊过天了,不是本次任务的目标
if chat_api.get_stream_by_user_id(user_id_str, platform):
continue # 跳过已存在的用户
should_wake_up = False
stream = chat_api.get_stream_by_user_id(user_id_str, platform)
logger.info(f"【冷启动】发现白名单新用户 {chat_id},准备发起第一次问候。")
if not stream:
should_wake_up = True
logger.info(f"【冷启动】发现全新用户 {chat_id},准备发起第一次问候。")
elif stream.last_active_time < self.bot_start_time:
should_wake_up = True
logger.info(f"【冷启动】发现沉睡的聊天流 {chat_id} (最后活跃于 {datetime.fromtimestamp(stream.last_active_time)}),准备唤醒。")
# 【增强体验】尝试从关系数据库中获取该用户的昵称
# 这样打招呼时可以更亲切而不是只知道一个冷冰冰的ID
if should_wake_up:
person_id = person_api.get_person_id(platform, user_id)
nickname = await person_api.get_person_value(person_id, "nickname")
# 如果数据库里有昵称,就用数据库里的;如果没有,就用 "用户+ID" 作为备用
user_nickname = nickname or f"用户{user_id}"
# 创建 UserInfo 对象,这是创建聊天流的必要信息
user_info = UserInfo(platform=platform, user_id=str(user_id), user_nickname=user_nickname)
# 【关键步骤】主动创建聊天流。
# 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管
# 使用 get_or_create_stream 来安全地获取或创建流
stream = await self.chat_manager.get_or_create_stream(platform, user_info)
await self.executor.execute(stream_id=stream.stream_id, start_mode="cold_start")
logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。")
formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="cold_start")
logger.info(f"【冷启动】已为用户 {chat_id} (昵称: {user_nickname}) 发送唤醒/问候消息。")
except ValueError:
logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效已跳过: {chat_id}")
except Exception as e:
logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
except ValueError:
logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效已跳过: {chat_id}")
except Exception as e:
logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
# 完成一轮检查后,进入长时休眠
await asyncio.sleep(3600)
except asyncio.CancelledError:
logger.info("冷启动任务被正常取消。")
break
except Exception as e:
logger.error(f"【冷启动】任务出现严重错误将在5分钟后重试: {e}", exc_info=True)
await asyncio.sleep(300)
except asyncio.CancelledError:
logger.info("冷启动任务被正常取消。")
except Exception as e:
logger.error(f"【冷启动】任务出现严重错误: {e}", exc_info=True)
finally:
logger.info("【冷启动】任务执行完毕。")
class ProactiveThinkingTask(AsyncTask):
@@ -156,49 +149,56 @@ class ProactiveThinkingTask(AsyncTask):
enabled_private = set(global_config.proactive_thinking.enabled_private_chats)
enabled_groups = set(global_config.proactive_thinking.enabled_group_chats)
# 获取当前所有聊天流的快照
all_streams = list(self.chat_manager.streams.values())
for stream in all_streams:
# 1. 检查该聊天是否在白名单内(或白名单为空时默认允许)
is_whitelisted = False
if stream.group_info: # 群聊
if not enabled_groups or f"qq:{stream.group_info.group_id}" in enabled_groups:
is_whitelisted = True
else: # 私聊
if not enabled_private or f"qq:{stream.user_info.user_id}" in enabled_private:
is_whitelisted = True
if not is_whitelisted:
continue # 不在白名单内,跳过
# 2. 【核心逻辑】检查聊天冷却时间是否足够长
time_since_last_active = time.time() - stream.last_active_time
if time_since_last_active > next_interval:
logger.info(
f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。"
)
# 构建符合 executor 期望的 stream_id 格式
if stream.group_info and stream.group_info.group_id:
formatted_stream_id = f"{stream.user_info.platform}:{stream.group_info.group_id}:group"
elif stream.user_info and stream.user_info.user_id:
formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private"
else:
logger.warning(f"【日常唤醒】跳过 stream {stream.stream_id},因为它缺少有效的用户信息或群组信息。")
# 分别处理私聊和群聊
# 1. 处理私聊:直接遍历白名单,确保能覆盖到所有(包括本次运行尚未活跃的)用户
for chat_id in enabled_private:
try:
platform, user_id_str = chat_id.split(":")
# 【核心逻辑】检查聊天流是否存在。不存在则跳过交由ColdStartTask处理。
stream = chat_api.get_stream_by_user_id(user_id_str, platform)
if not stream:
continue
await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up")
# 检查冷却时间
time_since_last_active = time.time() - stream.last_active_time
if time_since_last_active > next_interval:
logger.info(
f"【日常唤醒-私聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。"
)
formatted_stream_id = f"{stream.user_info.platform}:{stream.user_info.user_id}:private"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up")
stream.update_active_time()
await self.chat_manager._save_stream(stream)
# 【关键步骤】在触发后,立刻更新活跃时间并保存。
# 这可以防止在同一个检查周期内,对同一个目标因为意外的延迟而发送多条消息。
stream.update_active_time()
await self.chat_manager._save_stream(stream)
except ValueError:
logger.warning(f"【日常唤醒】私聊白名单条目格式错误,已跳过: {chat_id}")
except Exception as e:
logger.error(f"【日常唤醒】处理私聊用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
# 2. 处理群聊:遍历内存中的活跃流(群聊不存在冷启动问题)
all_streams = list(self.chat_manager.streams.values())
for stream in all_streams:
if not stream.group_info:
continue # 只处理群聊
# 检查群聊是否在白名单内
if not enabled_groups or f"qq:{stream.group_info.group_id}" in enabled_groups:
# 检查冷却时间
time_since_last_active = time.time() - stream.last_active_time
if time_since_last_active > next_interval:
logger.info(
f"【日常唤醒-群聊】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。"
)
formatted_stream_id = f"{stream.user_info.platform}:{stream.group_info.group_id}:group"
await self.executor.execute(stream_id=formatted_stream_id, start_mode="wake_up")
stream.update_active_time()
await self.chat_manager._save_stream(stream)
except asyncio.CancelledError:
logger.info("日常唤醒任务被正常取消。")
break
except Exception as e:
traceback.print_exc() # 打印完整的堆栈跟踪
logger.error(f"【日常唤醒】任务出现错误将在60秒后重试: {e}", exc_info=True)
await asyncio.sleep(60)
@@ -215,13 +215,15 @@ class ProactiveThinkerEventHandler(BaseEventHandler):
logger.info("检测到插件启动事件,正在初始化【主动思考】")
# 检查总开关
if global_config.proactive_thinking.enable:
bot_start_time = time.time() # 记录“诞生时刻”
# 启动负责“日常唤醒”的核心任务
proactive_task = ProactiveThinkingTask()
await async_task_manager.add_task(proactive_task)
# 检查“冷启动”功能的独立开关
if global_config.proactive_thinking.enable_cold_start:
cold_start_task = ColdStartTask()
cold_start_task = ColdStartTask(bot_start_time)
await async_task_manager.add_task(cold_start_task)
else:

View File

@@ -5,6 +5,7 @@ import orjson
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.mood.mood_manager import mood_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import (
chat_api,
@@ -29,7 +30,10 @@ class ProactiveThinkerExecutor:
"""
def __init__(self):
# 可以在此初始化所需模块例如LLM请求器等
"""
初始化 ProactiveThinkerExecutor 实例。
目前无需初始化操作。
"""
pass
async def execute(self, stream_id: str, start_mode: str = "wake_up"):
@@ -76,7 +80,7 @@ class ProactiveThinkerExecutor:
plan_prompt = self._build_plan_prompt(context, start_mode, topic, reason)
is_success, response, _, _ = await llm_api.generate_with_model(
prompt=plan_prompt, model_config=model_config.model_task_config.utils
prompt=plan_prompt, model_config=model_config.model_task_config.replyer
)
if is_success and response:
@@ -91,77 +95,80 @@ class ProactiveThinkerExecutor:
logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流")
def _get_stream_from_id(self, stream_id: str):
"""根据stream_id解析并获取stream对象"""
"""
根据 stream_id 解析并获取对应的聊天流对象。
Args:
stream_id: 聊天流的唯一标识符,格式为 "platform:chat_id:stream_type"
Returns:
对应的 ChatStream 对象,如果解析失败或找不到则返回 None。
"""
try:
platform, chat_id, stream_type = stream_id.split(":")
if stream_type == "private":
return chat_api.ChatManager.get_private_stream_by_user_id(platform=platform, user_id=chat_id)
elif stream_type == "group":
return chat_api.ChatManager.get_group_stream_by_group_id(platform=platform,group_id=chat_id)
return chat_api.ChatManager.get_group_stream_by_group_id(platform=platform, group_id=chat_id)
except Exception as e:
logger.error(f"解析 stream_id ({stream_id}) 或获取 stream 失败: {e}")
return None
logger.error(f"获取 stream_id ({stream_id}) 失败: {e}")
return None
async def _gather_context(self, stream_id: str) -> dict[str, Any] | None:
"""
收集构建提示词所需的所有上下文信息
收集构建决策和规划提示词所需的所有上下文信息
此函数会根据聊天流是私聊还是群聊,收集不同的信息,
包括但不限于日程、聊天历史、人设、关系信息等。
Args:
stream_id: 聊天流ID。
Returns:
一个包含所有上下文信息的字典,如果找不到聊天流则返回 None。
"""
stream = self._get_stream_from_id(stream_id)
if not stream:
logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流")
return None
user_info = stream.user_info
if not user_info or not user_info.platform or not user_info.user_id:
logger.warning(f"Stream {stream_id} 的 user_info 不完整")
return None
person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id))
person_info_manager = get_person_info_manager()
# 获取日程
# 1. 收集通用信息 (日程, 聊天历史, 动作历史)
schedules = await schedule_api.ScheduleAPI.get_today_schedule()
schedule_context = (
"\n".join(
[
f"- {s.get('time_range', '未知时间')}: {s.get('activity', '未知活动')}"
for s in schedules
]
)
"\n".join([f"- {s.get('time_range', '未知时间')}: {s.get('activity', '未知活动')}" for s in schedules])
if schedules
else "今天没有日程安排。"
)
# 获取关系信息
short_impression = await person_info_manager.get_value(person_id, "short_impression") or ""
impression = await person_info_manager.get_value(person_id, "impression") or ""
attitude = await person_info_manager.get_value(person_id, "attitude") or 50
# 获取最近聊天记录
recent_messages = await message_api.get_recent_messages(stream_id, limit=10)
recent_messages = await message_api.get_recent_messages(stream.stream_id)
recent_chat_history = (
await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else ""
)
# 获取最近的动作历史
action_history = await database_api.db_query(
action_history_list = await database_api.db_query(
database_api.MODEL_MAPPING["ActionRecords"],
filters={"chat_id": stream_id, "action_name": "proactive_decision"},
limit=3,
order_by=["-time"],
)
action_history_context = ""
if isinstance(action_history, list):
action_history_context = (
"\n".join([f"- {a['action_data']}" for a in action_history if isinstance(a, dict)]) or ""
)
action_history_context = (
"\n".join([f"- {a['action_data']}" for a in action_history_list if isinstance(a, dict)])
if isinstance(action_history_list, list)
else ""
)
return {
"person_id": person_id,
"user_info": user_info,
# 2. 构建基础上下文
mood_state = "暂时没有"
if global_config.mood.enable_mood:
try:
mood_state = mood_manager.get_mood_by_chat_id(stream.stream_id).mood_state
except Exception as e:
logger.error(f"获取情绪失败,原因:{e}")
base_context = {
"schedule_context": schedule_context,
"recent_chat_history": recent_chat_history,
"action_history_context": action_history_context,
"mood_state": mood_state,
"persona": {
"core": global_config.personality.personality_core,
"side": global_config.personality.personality_side,
@@ -170,14 +177,61 @@ class ProactiveThinkerExecutor:
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None:
"""
决策模块:判断是否应该主动发起对话,以及聊什么话题
"""
persona = context["persona"]
user_info = context["user_info"]
relationship = context["relationship"]
# 3. 根据聊天类型补充特定上下文
if stream.group_info: # 群聊场景
base_context.update(
{
"chat_type": "group",
"group_info": {"group_name": stream.group_info.group_name, "group_id": stream.group_info.group_id},
}
)
return base_context
elif stream.user_info: # 私聊场景
user_info = stream.user_info
if not user_info.platform or not user_info.user_id:
logger.warning(f"Stream {stream_id} 的 user_info 不完整")
return None
person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id))
person_info_manager = get_person_info_manager()
# 获取关系信息
short_impression = await person_info_manager.get_value(person_id, "short_impression") or ""
impression = await person_info_manager.get_value(person_id, "impression") or ""
attitude = await person_info_manager.get_value(person_id, "attitude") or 50
base_context.update(
{
"chat_type": "private",
"person_id": person_id,
"user_info": user_info,
"relationship": {
"short_impression": short_impression,
"impression": impression,
"attitude": attitude,
},
}
)
return base_context
else:
logger.warning(f"Stream {stream_id} 既没有 group_info 也没有 user_info")
return None
def _build_decision_prompt(self, context: dict[str, Any], start_mode: str) -> str:
"""
根据收集到的上下文信息,构建用于决策的提示词。
Args:
context: 包含所有上下文信息的字典。
start_mode: 启动模式 ('cold_start''wake_up')。
Returns:
构建完成的决策提示词字符串。
"""
chat_type = context["chat_type"]
persona = context["persona"]
# 构建通用头部
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
@@ -185,6 +239,16 @@ class ProactiveThinkerExecutor:
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
你的当前情绪状态是: {context["mood_state"]}
# 你最近的相关决策历史 (供参考)
{context["action_history_context"]}
"""
# 根据聊天类型构建任务和情境
if chat_type == "private":
user_info = context["user_info"]
relationship = context["relationship"]
prompt += f"""
# 任务
现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。
@@ -198,33 +262,93 @@ class ProactiveThinkerExecutor:
- 好感度: {relationship["attitude"]}/100
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
elif chat_type == "group":
group_info = context["group_info"]
prompt += f"""
# 任务
现在是 {context["current_time"]},你需要根据当前的情境,决定是否要主动向群聊 '{group_info["group_name"]}' 发起对话。
# 情境分析
1. **启动模式**: {start_mode} ({"首次加入/很久未发言" if start_mode == "cold_start" else "日常唤醒"})
2. **你的日程**:
{context["schedule_context"]}
3. **群聊信息**:
- 群名称: {group_info["group_name"]}
4. **最近的聊天摘要**:
{context["recent_chat_history"]}
"""
# 构建通用尾部
prompt += """
# 决策指令
请综合以上所有信息做出决策。你的决策需要以JSON格式输出包含以下字段
请综合以上所有信息,以稳定、真实、拟人的方式做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题(例如:问候一下今天的日程、关心一下昨天的某件事、分享一个你自己的趣事等)
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题
- `reason`: str, 做出此决策的简要理由。
# 决策原则
- **谨慎对待未回复的对话**: 在发起新话题前,请检查【最近的聊天摘要】。如果最后一条消息是你自己发送的,请仔细评估等待的时间和上下文,判断再次主动发起对话是否礼貌和自然。如果等待时间很短(例如几分钟或半小时内),通常应该选择“不回复”。
- **优先利用上下文**: 优先从【情境分析】中已有的信息如最近的聊天摘要、你的日程、你对Ta的关系印象寻找自然的话题切入点。
- **简单问候作为备选**: 如果上下文中没有合适的话题,可以生成一个简单、真诚的日常问候(例如“在忙吗?”,“下午好呀~”)。
- **避免抽象**: 避免创造过于复杂、抽象或需要对方思考很久才能明白的话题。目标是轻松、自然地开启对话。
- **避免过于频繁**: 如果你最近(尤其是在最近的几次决策中)已经主动发起过对话,请倾向于选择“不回复”,除非有非常重要和紧急的事情。
- **如果上下文中只有你的消息而没有别人的消息**:选择不回复,以防刷屏或者打扰到别人
---
示例1 (应该回复):
示例1 (基于上下文):
{{
"should_reply": true,
"topic": "提醒Ta今天下午有'项目会议'的日程",
"reason": "现在是上午Ta下午有个重要会议我觉得应该主动提醒一下这会显得我很贴心"
"topic": "关心一下Ta昨天提到的那个项目进展如何了",
"reason": "用户昨天在聊天中提到了一个重要的项目,现在主动关心一下进展,会显得很体贴,也能自然地开启对话"
}}
示例2 (不应回复):
示例2 (简单问候):
{{
"should_reply": true,
"topic": "打个招呼问问Ta现在在忙些什么",
"reason": "最近没有聊天记录,日程也很常规,没有特别的切入点。一个简单的日常问候是最安全和自然的方式来重新连接。"
}}
示例3 (不应回复 - 过于频繁):
{{
"should_reply": false,
"topic": null,
"reason": "虽然我们的关系不错,但现在是深夜,而且Ta今天的日程都已经完成了没有合适的理由去打扰Ta"
"reason": "虽然群里很活跃,但现在是深夜,而且最近的聊天话题我也不熟悉,没有合适的理由去打扰大家"
}}
示例4 (不应回复 - 等待回应):
{{
"should_reply": false,
"topic": null,
"reason": "我注意到上一条消息是我几分钟前主动发送的,对方可能正在忙。为了表现出耐心和体贴,我现在最好保持安静,等待对方的回应。"
}}
---
请输出你的决策:
"""
return prompt
async def _make_decision(self, context: dict[str, Any], start_mode: str) -> dict[str, Any] | None:
"""
调用 LLM 进行决策,判断是否应该主动发起对话,以及聊什么话题。
Args:
context: 包含所有上下文信息的字典。
start_mode: 启动模式。
Returns:
一个包含决策结果的字典 (例如: {"should_reply": bool, "topic": str, "reason": str})
如果决策过程失败则返回 None 或包含错误信息的字典。
"""
if context["chat_type"] not in ["private", "group"]:
return {"should_reply": False, "reason": "未知的聊天类型"}
prompt = self._build_decision_prompt(context, start_mode)
if global_config.debug.show_prompt:
logger.info(f"主动思考规划器原始提示词:{prompt}")
logger.info(f"主动思考决策器原始提示词:{prompt}")
is_success, response, _, _ = await llm_api.generate_with_model(
prompt=prompt, model_config=model_config.model_task_config.utils
)
@@ -233,55 +357,56 @@ class ProactiveThinkerExecutor:
return {"should_reply": False, "reason": "决策模型生成失败"}
try:
# 假设LLM返回JSON格式的决策结果
if global_config.debug.show_prompt:
logger.info(f"主动思考规划器响应:{response}")
logger.info(f"主动思考决策器响应:{response}")
decision = orjson.loads(response)
return decision
except orjson.JSONDecodeError:
logger.error(f"决策LLM返回的JSON格式无效: {response}")
return {"should_reply": False, "reason": "决策模型返回格式错误"}
def _build_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
def _build_private_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
"""
根据启动模式和决策话题,构建最终的规划提示词
为私聊场景构建生成对话内容的规划提示词
Args:
context: 上下文信息字典。
start_mode: 启动模式。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
构建完成的私聊规划提示词字符串。
"""
persona = context["persona"]
user_info = context["user_info"]
relationship = context["relationship"]
if start_mode == "cold_start":
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona["core"]}
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
return f"""
# 任务
你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。
# 决策上下文
- **决策理由**: {reason}
- **你和Ta的关系**:
# 情境分析
1. **你的日程**:
{context["schedule_context"]}
2. **你和Ta的关系**:
- 简短印象: {relationship["short_impression"]}
- 详细印象: {relationship["impression"]}
- 好感度: {relationship["attitude"]}/100
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
# 对话指引
- 你的目标是“破冰”,让对话自然地开始。
- 你应该围绕这个话题展开: {topic}
- 你的语气应该符合你的人设,友好且真诚。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
- 你的语气应该符合你的人设和你当前的心情({context["mood_state"]}),友好且真诚。
"""
else: # wake_up
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona["core"]}
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
return f"""
# 任务
现在是 {context["current_time"]},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。
@@ -301,10 +426,92 @@ class ProactiveThinkerExecutor:
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- **重要**: 在开始你的话题前,必须先用一句通用的、礼貌的开场白进行问候(例如:“在吗?”、“上午好!”、“晚上好呀~”),然后再自然地衔接你的话题,确保整个回复在一条消息内流畅、自然、像人类的说话方式。
- 请结合以上所有情境信息,自然地开启对话。
- 你的语气应该符合你的人设以及你对Ta的好感度。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
- 你的语气应该符合你的人设({context["mood_state"]})以及你对Ta的好感度。
"""
def _build_group_plan_prompt(self, context: dict[str, Any], topic: str, reason: str) -> str:
"""
为群聊场景构建生成对话内容的规划提示词。
Args:
context: 上下文信息字典。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
构建完成的群聊规划提示词字符串。
"""
group_info = context["group_info"]
return f"""
# 任务
现在是 {context["current_time"]},你需要主动向群聊 '{group_info["group_name"]}' 发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
1. **你的日程**:
你当前的心情({context["mood_state"]}
{context["schedule_context"]}
2. **群聊信息**:
- 群名称: {group_info["group_name"]}
3. **最近的聊天摘要**:
{context["recent_chat_history"]}
4. **你最近的相关动作**:
{context["action_history_context"]}
# 对话指引
- 你决定和大家聊聊关于“{topic}”的话题。
- **重要**: 在开始你的话题前,必须先用一句通用的、礼貌的开场白进行问候(例如:“哈喽,大家好呀~”、“下午好!”),然后再自然地衔接你的话题,确保整个回复在一条消息内流畅、自然、像人类的说话方式。
- 你的语气应该更活泼、更具包容性,以吸引更多群成员参与讨论。你的语气应该符合你的人设)。
- 请结合以上所有情境信息,自然地开启对话。
- 可以分享你的看法、提出相关问题,或者开个合适的玩笑。
"""
def _build_plan_prompt(self, context: dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
"""
根据聊天类型、启动模式和决策结果,构建最终生成对话内容的规划提示词。
Args:
context: 上下文信息字典。
start_mode: 启动模式。
topic: 决策模块决定的话题。
reason: 决策模块给出的理由。
Returns:
最终的规划提示词字符串。
"""
persona = context["persona"]
chat_type = context["chat_type"]
# 1. 构建通用角色头部
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona["core"]}
- 侧面人设: {persona["side"]}
- 身份: {persona["identity"]}
"""
# 2. 根据聊天类型构建特定内容
if chat_type == "private":
prompt += self._build_private_plan_prompt(context, start_mode, topic, reason)
elif chat_type == "group":
prompt += self._build_group_plan_prompt(context, topic, reason)
# 3. 添加通用结尾
final_instructions = """
# 输出要求
- **简洁**: 不要输出任何多余内容如前缀、后缀、冒号、引号、at/@等)。
- **原创**: 不要重复之前的内容,即使意思相近也不行。
- **直接**: 只输出最终的回复文本本身。
- **风格**: 回复需简短、完整且口语化。
现在,你说:"""
prompt += final_instructions
if global_config.debug.show_prompt:
logger.info(f"主动思考回复器原始提示词:{prompt}")
return prompt