Merge remote-tracking branch 'upstream/dev' into dev

This commit is contained in:
meng_xi_pan
2025-04-09 16:05:15 +08:00
25 changed files with 1950 additions and 958 deletions

160
README.md
View File

@@ -1,24 +1,66 @@
# 麦麦MaiCore-MaiMBot (编辑中) # 麦麦MaiCore-MaiMBot (编辑中)
<br />
<div align="center">
![Python Version](https://img.shields.io/badge/Python-3.9+-blue)
![License](https://img.shields.io/github/license/SengokuCola/MaiMBot?label=协议)
![Status](https://img.shields.io/badge/状态-开发中-yellow)
![Contributors](https://img.shields.io/github/contributors/MaiM-with-u/MaiBot.svg?style=flat&label=贡献者)
![forks](https://img.shields.io/github/forks/MaiM-with-u/MaiBot.svg?style=flat&label=分支数)
![stars](https://img.shields.io/github/stars/MaiM-with-u/MaiBot?style=flat&label=星标数)
![issues](https://img.shields.io/github/issues/MaiM-with-u/MaiBot)
</div>
<p align="center">
<a href="https://github.com/MaiM-with-u/MaiBot/">
<img src="depends-data/maimai.png" alt="Logo" width="200">
</a>
<br />
<a href="https://space.bilibili.com/1344099355">
画师略nd
</a>
<h3 align="center">MaiBot(麦麦)</h3>
<p align="center">
一款专注于<strong> 群组聊天 </strong>的赛博网友
<br />
<a href="https://docs.mai-mai.org"><strong>探索本项目的文档 »</strong></a>
<br />
<br />
<!-- <a href="https://github.com/shaojintian/Best_README_template">查看Demo</a>
· -->
<a href="https://github.com/MaiM-with-u/MaiBot/issues">报告Bug</a>
·
<a href="https://github.com/MaiM-with-u/MaiBot/issues">提出新特性</a>
</p>
</p>
## 新版0.6.0部署前先阅读https://docs.mai-mai.org/manual/usage/mmc_q_a ## 新版0.6.0部署前先阅读https://docs.mai-mai.org/manual/usage/mmc_q_a
<div align="center">
![Python Version](https://img.shields.io/badge/Python-3.9+-blue)
![License](https://img.shields.io/github/license/SengokuCola/MaiMBot)
![Status](https://img.shields.io/badge/状态-开发中-yellow)
</div>
## 📝 项目简介 ## 📝 项目简介
**🍔MaiCore是一个基于大语言模型的可交互智能体** **🍔MaiCore是一个基于大语言模型的可交互智能体**
- LLM 提供对话能力
- 动态Prompt构建器 - 💭 **智能对话系统**基于LLM的自然语言交互
- 实时思维系统 - 🤔 **实时思维系统**:模拟人类思考过程
- MongoDB 提供数据持久化支持 - 💝 **情感表达系统**:丰富的表情包和情绪表达
- 可扩展,可支持多种平台和多种功能 - 🧠 **持久记忆系统**基于MongoDB的长期记忆存储
- 🔄 **动态人格系统**:自适应的性格特征
<div align="center">
<a href="https://www.bilibili.com/video/BV1amAneGE3P" target="_blank">
<img src="depends-data/video.png" width="200" alt="麦麦演示视频">
<br>
👆 点击观看麦麦演示视频 👆
</a>
</div>
### 📢 版本信息
**最新版本: v0.6.0** ([查看更新日志](changelogs/changelog.md)) **最新版本: v0.6.0** ([查看更新日志](changelogs/changelog.md))
> [!WARNING] > [!WARNING]
@@ -28,19 +70,12 @@
> 次版本MaiBot将基于MaiCore运行不再依赖于nonebot相关组件运行。 > 次版本MaiBot将基于MaiCore运行不再依赖于nonebot相关组件运行。
> MaiBot将通过nonebot的插件与nonebot建立联系然后nonebot与QQ建立联系实现MaiBot与QQ的交互 > MaiBot将通过nonebot的插件与nonebot建立联系然后nonebot与QQ建立联系实现MaiBot与QQ的交互
**分支介绍:** **分支说明:**
- main 稳定版本 - `main`: 稳定发布版本
- dev 开发版(不知道什么意思就别下) - `dev`: 开发测试版本(不知道什么意思就别下)
- classical 0.6.0前的版本 - `classical`: 0.6.0前的版本
<div align="center">
<a href="https://www.bilibili.com/video/BV1amAneGE3P" target="_blank">
<img src="docs/pic/video.png" width="300" alt="麦麦演示视频">
<br>
👆 点击观看麦麦演示视频 👆
</a>
</div>
> [!WARNING] > [!WARNING]
> - 项目处于活跃开发阶段,代码可能随时更改 > - 项目处于活跃开发阶段,代码可能随时更改
@@ -49,6 +84,12 @@
> - 由于持续迭代可能存在一些已知或未知的bug > - 由于持续迭代可能存在一些已知或未知的bug
> - 由于开发中可能消耗较多token > - 由于开发中可能消耗较多token
### ⚠️ 重要提示
- 升级到v0.6.0版本前请务必阅读:[升级指南](https://docs.mai-mai.org/manual/usage/mmc_q_a)
- 本版本基于MaiCore重构通过nonebot插件与QQ平台交互
- 项目处于活跃开发阶段功能和API可能随时调整
### 💬交流群(开发和建议相关讨论)不一定有空回复,会优先写文档和代码 ### 💬交流群(开发和建议相关讨论)不一定有空回复,会优先写文档和代码
- [五群](https://qm.qq.com/q/JxvHZnxyec) 1022489779 - [五群](https://qm.qq.com/q/JxvHZnxyec) 1022489779
- [一群](https://qm.qq.com/q/VQ3XZrWgMs) 766798517 【已满】 - [一群](https://qm.qq.com/q/VQ3XZrWgMs) 766798517 【已满】
@@ -72,55 +113,35 @@
## 🎯 功能介绍 ## 🎯 功能介绍
### 💬 聊天功能 | 模块 | 主要功能 | 特点 |
- 提供思维流(心流)聊天和推理聊天两种对话逻辑 |------|---------|------|
- 支持关键词检索主动发言对消息的话题topic进行识别如果检测到麦麦存储过的话题就会主动进行发言 | 💬 聊天系统 | • 思维流/推理聊天<br>关键词主动发言<br>• 多模型支持<br>• 动态prompt构建<br>• 私聊功能(PFC) | 拟人化交互 |
- 支持bot名字呼唤发言检测到"麦麦"会主动发言,可配置 | 🧠 思维流系统 | • 实时思考生成<br>• 自动启停机制<br>• 日程系统联动 | 智能化决策 |
- 支持多模型,多厂商自定义配置 | 🧠 记忆系统 2.0 | • 优化记忆抽取<br>• 海马体记忆机制<br>• 聊天记录概括 | 持久化记忆 |
- 动态的prompt构建器更拟人 | 😊 表情包系统 | • 情绪匹配发送<br>• GIF支持<br>• 自动收集与审查 | 丰富表达 |
- 支持图片,转发消息,回复消息的识别 | 📅 日程系统 | • 动态日程生成<br>• 自定义想象力<br>• 思维流联动 | 智能规划 |
- 支持私聊功能可使用PFC模式的有目的多轮对话实验性 | 👥 关系系统 2.0 | • 关系管理优化<br>• 丰富接口支持<br>• 个性化交互 | 深度社交 |
| 📊 统计系统 | • 使用数据统计<br>• LLM调用记录<br>• 实时控制台显示 | 数据可视 |
| 🔧 系统功能 | • 优雅关闭机制<br>• 自动数据保存<br>• 异常处理完善 | 稳定可靠 |
### 🧠 思维流系统 ## 📐 项目架构
- 思维流能够在回复前后进行思考,生成实时想法
- 思维流自动启停机制,提升资源利用效率
- 思维流与日程系统联动,实现动态日程生成
### 🧠 记忆系统 2.0 ```mermaid
- 优化记忆抽取策略和prompt结构 graph TD
- 改进海马体记忆提取机制,提升自然度 A[MaiCore] --> B[对话系统]
- 对聊天记录进行概括存储,在需要时调用 A --> C[思维流系统]
A --> D[记忆系统]
A --> E[情感系统]
B --> F[多模型支持]
B --> G[动态Prompt]
C --> H[实时思考]
C --> I[日程联动]
D --> J[记忆存储]
D --> K[记忆检索]
E --> L[表情管理]
E --> M[情绪识别]
```
### 😊 表情包系统
- 支持根据发言内容发送对应情绪的表情包
- 支持识别和处理gif表情包
- 会自动偷群友的表情包
- 表情包审查功能
- 表情包文件完整性自动检查
- 自动清理缓存图片
### 📅 日程系统
- 动态更新的日程生成
- 可自定义想象力程度
- 与聊天情况交互(思维流模式下)
### 👥 关系系统 2.0
- 优化关系管理系统,适用于新版本
- 提供更丰富的关系接口
- 针对每个用户创建"关系",实现个性化回复
### 📊 统计系统
- 详细的使用数据统计
- LLM调用统计
- 在控制台显示统计信息
### 🔧 系统功能
- 支持优雅的shutdown机制
- 自动保存功能,定期保存聊天记录和关系数据
- 完善的异常处理机制
- 可自定义时区设置
- 优化的日志输出格式
- 配置自动更新功能
## 开发计划TODOLIST ## 开发计划TODOLIST
@@ -157,7 +178,6 @@ MaiCore是一个开源项目我们非常欢迎你的参与。你的贡献
## 致谢 ## 致谢
- [nonebot2](https://github.com/nonebot/nonebot2): 跨平台 Python 异步聊天机器人框架
- [NapCat](https://github.com/NapNeko/NapCatQQ): 现代化的基于 NTQQ 的 Bot 协议端实现 - [NapCat](https://github.com/NapNeko/NapCatQQ): 现代化的基于 NTQQ 的 Bot 协议端实现
### 贡献者 ### 贡献者

4
bot.py
View File

@@ -8,6 +8,7 @@ import time
import platform import platform
from dotenv import load_dotenv from dotenv import load_dotenv
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from src.common.crash_logger import install_crash_handler
from src.main import MainSystem from src.main import MainSystem
logger = get_module_logger("main_bot") logger = get_module_logger("main_bot")
@@ -193,6 +194,9 @@ def raw_main():
if platform.system().lower() != "windows": if platform.system().lower() != "windows":
time.tzset() time.tzset()
# 安装崩溃日志处理器
install_crash_handler()
check_eula() check_eula()
print("检查EULA和隐私条款完成") print("检查EULA和隐私条款完成")
easter_egg() easter_egg()

BIN
depends-data/maimai.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 455 KiB

BIN
depends-data/video.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

View File

@@ -0,0 +1,72 @@
import sys
import traceback
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler
def setup_crash_logger():
"""设置崩溃日志记录器"""
# 创建logs/crash目录如果不存在
crash_log_dir = Path("logs/crash")
crash_log_dir.mkdir(parents=True, exist_ok=True)
# 创建日志记录器
crash_logger = logging.getLogger('crash_logger')
crash_logger.setLevel(logging.ERROR)
# 设置日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s\n'
'异常类型: %(exc_info)s\n'
'详细信息:\n%(message)s\n'
'-------------------\n'
)
# 创建按大小轮转的文件处理器最大10MB保留5个备份
log_file = crash_log_dir / "crash.log"
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setFormatter(formatter)
crash_logger.addHandler(file_handler)
return crash_logger
def log_crash(exc_type, exc_value, exc_traceback):
"""记录崩溃信息到日志文件"""
if exc_type is None:
return
# 获取崩溃日志记录器
crash_logger = logging.getLogger('crash_logger')
# 获取完整的异常堆栈信息
stack_trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
# 记录崩溃信息
crash_logger.error(
stack_trace,
exc_info=(exc_type, exc_value, exc_traceback)
)
def install_crash_handler():
"""安装全局异常处理器"""
# 设置崩溃日志记录器
setup_crash_logger()
# 保存原始的异常处理器
original_hook = sys.excepthook
def exception_handler(exc_type, exc_value, exc_traceback):
"""全局异常处理器"""
# 记录崩溃信息
log_crash(exc_type, exc_value, exc_traceback)
# 调用原始的异常处理器
original_hook(exc_type, exc_value, exc_traceback)
# 设置全局异常处理器
sys.excepthook = exception_handler

View File

@@ -0,0 +1,139 @@
from typing import Tuple
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..config.config import global_config
from .chat_observer import ChatObserver
from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
logger = get_module_logger("action_planner")
class ActionPlannerInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []
class ActionPlanner:
"""行动规划器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal,
temperature=0.7,
max_tokens=1000,
request_type="action_planning"
)
self.personality_info = Individuality.get_instance().get_prompt(type = "personality", x_person = 2, level = 2)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
async def plan(
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo
) -> Tuple[str, str]:
"""规划下一步行动
Args:
observation_info: 决策信息
conversation_info: 对话信息
Returns:
Tuple[str, str]: (行动类型, 行动原因)
"""
# 构建提示词
logger.debug(f"开始规划行动:当前目标: {conversation_info.goal_list}")
#构建对话目标
if conversation_info.goal_list:
goal, reasoning = conversation_info.goal_list[-1]
else:
goal = "目前没有明确对话目标"
reasoning = "目前没有明确对话目标,最好思考一个对话目标"
# 获取聊天历史记录
chat_history_list = observation_info.chat_history
chat_history_text = ""
for msg in chat_history_list:
chat_history_text += f"{msg}\n"
if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg}\n"
observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_list = conversation_info.done_action
action_history_text = "你之前做的事情是:"
for action in action_history_list:
action_history_text += f"{action}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
当前对话目标:{goal}
产生该对话目标的原因:{reasoning}
{action_history_text}
最近的对话记录:
{chat_history_text}
请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言:
行动类型:
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择
wait: 当你做出了发言,对方尚未回复时等待对方的回复
listening: 倾听对方发言,当你认为对方发言尚未结束时采用
direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言
rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标
请以JSON格式输出包含以下字段
1. action: 行动类型,注意你之前的行为
2. reason: 选择该行动的原因,注意你之前的行为(简要解释)
注意请严格按照JSON格式输出不要包含任何其他内容。"""
logger.debug(f"发送到LLM的提示词: {prompt}")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content,
"action", "reason",
default_values={"action": "direct_reply", "reason": "没有明确原因"}
)
if not success:
return "direct_reply", "JSON解析失败选择直接回复"
action = result["action"]
reason = result["reason"]
# 验证action类型
if action not in ["direct_reply", "fetch_knowledge", "wait", "listening", "rethink_goal"]:
logger.warning(f"未知的行动类型: {action}默认使用listening")
action = "listening"
logger.info(f"规划的行动: {action}")
logger.info(f"行动原因: {reason}")
return action, reason
except Exception as e:
logger.error(f"规划行动时出错: {str(e)}")
return "direct_reply", "发生错误,选择直接回复"

View File

@@ -2,9 +2,10 @@ import time
import asyncio import asyncio
from typing import Optional, Dict, Any, List, Tuple from typing import Optional, Dict, Any, List, Tuple
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from src.common.database import db
from ..message.message_base import UserInfo from ..message.message_base import UserInfo
from ..config.config import global_config from ..config.config import global_config
from .chat_states import NotificationManager, create_new_message_notification, create_cold_chat_notification
from .message_storage import MessageStorage, MongoDBMessageStorage
logger = get_module_logger("chat_observer") logger = get_module_logger("chat_observer")
@@ -16,36 +17,40 @@ class ChatObserver:
_instances: Dict[str, "ChatObserver"] = {} _instances: Dict[str, "ChatObserver"] = {}
@classmethod @classmethod
def get_instance(cls, stream_id: str) -> "ChatObserver": def get_instance(cls, stream_id: str, message_storage: Optional[MessageStorage] = None) -> 'ChatObserver':
"""获取或创建观察器实例 """获取或创建观察器实例
Args: Args:
stream_id: 聊天流ID stream_id: 聊天流ID
message_storage: 消息存储实现如果为None则使用MongoDB实现
Returns: Returns:
ChatObserver: 观察器实例 ChatObserver: 观察器实例
""" """
if stream_id not in cls._instances: if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id) cls._instances[stream_id] = cls(stream_id, message_storage)
return cls._instances[stream_id] return cls._instances[stream_id]
def __init__(self, stream_id: str): def __init__(self, stream_id: str, message_storage: Optional[MessageStorage] = None):
"""初始化观察器 """初始化观察器
Args: Args:
stream_id: 聊天流ID stream_id: 聊天流ID
message_storage: 消息存储实现如果为None则使用MongoDB实现
""" """
if stream_id in self._instances: if stream_id in self._instances:
raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.") raise RuntimeError(f"ChatObserver for {stream_id} already exists. Use get_instance() instead.")
self.stream_id = stream_id self.stream_id = stream_id
self.last_user_speak_time: Optional[float] = None # 对方上次发言时间 self.message_storage = message_storage or MongoDBMessageStorage()
self.last_bot_speak_time: Optional[float] = None # 机器人上次发言时间
self.last_check_time: float = time.time() # 上次查看聊天记录时间
self.last_message_read: Optional[str] = None # 最后读取的消息ID
self.last_message_time: Optional[float] = None # 最后一条消息的时间戳
self.waiting_start_time: Optional[float] = None # 等待开始时间 self.last_user_speak_time: Optional[float] = None # 对方上次发言时间
self.last_bot_speak_time: Optional[float] = None # 机器人上次发言时间
self.last_check_time: float = time.time() # 上次查看聊天记录时间
self.last_message_read: Optional[str] = None # 最后读取的消息ID
self.last_message_time: Optional[float] = None # 最后一条消息的时间戳
self.waiting_start_time: float = time.time() # 等待开始时间,初始化为当前时间
# 消息历史记录 # 消息历史记录
self.message_history: List[Dict[str, Any]] = [] # 所有消息历史 self.message_history: List[Dict[str, Any]] = [] # 所有消息历史
@@ -58,7 +63,20 @@ class ChatObserver:
self._update_event = asyncio.Event() # 触发更新的事件 self._update_event = asyncio.Event() # 触发更新的事件
self._update_complete = asyncio.Event() # 更新完成的事件 self._update_complete = asyncio.Event() # 更新完成的事件
def check(self) -> bool: # 通知管理器
self.notification_manager = NotificationManager()
# 冷场检查配置
self.cold_chat_threshold: float = 60.0 # 60秒无消息判定为冷场
self.last_cold_chat_check: float = time.time()
self.is_cold_chat_state: bool = False
self.update_event = asyncio.Event()
self.update_interval = 5 # 更新间隔(秒)
self.message_cache = []
self.update_running = False
async def check(self) -> bool:
"""检查距离上一次观察之后是否有了新消息 """检查距离上一次观察之后是否有了新消息
Returns: Returns:
@@ -66,10 +84,10 @@ class ChatObserver:
""" """
logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}") logger.debug(f"检查距离上一次观察之后是否有了新消息: {self.last_check_time}")
query = {"chat_id": self.stream_id, "time": {"$gt": self.last_check_time}} new_message_exists = await self.message_storage.has_new_messages(
self.stream_id,
# 只需要查询是否存在,不需要获取具体消息 self.last_check_time
new_message_exists = db.messages.find_one(query) is not None )
if new_message_exists: if new_message_exists:
logger.debug("发现新消息") logger.debug("发现新消息")
@@ -77,27 +95,8 @@ class ChatObserver:
return new_message_exists return new_message_exists
def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: async def _add_message_to_history(self, message: Dict[str, Any]):
"""获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象""" """添加消息到历史记录并发送通知
messages = self.get_message_history(self.last_check_time)
for message in messages:
self._add_message_to_history(message)
return messages, self.message_history
def new_message_after(self, time_point: float) -> bool:
"""判断是否在指定时间点后有新消息
Args:
time_point: 时间戳
Returns:
bool: 是否有新消息
"""
logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point}")
return self.last_message_time is None or self.last_message_time > time_point
def _add_message_to_history(self, message: Dict[str, Any]):
"""添加消息到历史记录
Args: Args:
message: 消息数据 message: 消息数据
@@ -114,6 +113,75 @@ class ChatObserver:
else: else:
self.last_user_speak_time = message["time"] self.last_user_speak_time = message["time"]
# 发送新消息通知
notification = create_new_message_notification(
sender="chat_observer",
target="pfc",
message=message
)
await self.notification_manager.send_notification(notification)
# 检查并更新冷场状态
await self._check_cold_chat()
async def _check_cold_chat(self):
"""检查是否处于冷场状态并发送通知"""
current_time = time.time()
# 每10秒检查一次冷场状态
if current_time - self.last_cold_chat_check < 10:
return
self.last_cold_chat_check = current_time
# 判断是否冷场
is_cold = False
if self.last_message_time is None:
is_cold = True
else:
is_cold = (current_time - self.last_message_time) > self.cold_chat_threshold
# 如果冷场状态发生变化,发送通知
if is_cold != self.is_cold_chat_state:
self.is_cold_chat_state = is_cold
notification = create_cold_chat_notification(
sender="chat_observer",
target="pfc",
is_cold=is_cold
)
await self.notification_manager.send_notification(notification)
async def get_new_message(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""获取上一次观察的时间点后的新消息,插入到历史记录中,并返回新消息和历史记录两个对象"""
messages = await self.message_storage.get_messages_after(
self.stream_id,
self.last_message_read
)
for message in messages:
await self._add_message_to_history(message)
return messages, self.message_history
def new_message_after(self, time_point: float) -> bool:
"""判断是否在指定时间点后有新消息
Args:
time_point: 时间戳
Returns:
bool: 是否有新消息
"""
if time_point is None:
logger.warning("time_point 为 None返回 False")
return False
if self.last_message_time is None:
logger.debug("没有最后消息时间,返回 False")
return False
has_new = self.last_message_time > time_point
logger.debug(f"判断是否在指定时间点后有新消息: {self.last_message_time} > {time_point} = {has_new}")
return has_new
def get_message_history( def get_message_history(
self, self,
start_time: Optional[float] = None, start_time: Optional[float] = None,
@@ -156,14 +224,10 @@ class ChatObserver:
Returns: Returns:
List[Dict[str, Any]]: 新消息列表 List[Dict[str, Any]]: 新消息列表
""" """
query = {"chat_id": self.stream_id} new_messages = await self.message_storage.get_messages_after(
if self.last_message_read: self.stream_id,
# 获取ID大于last_message_read的消息 self.last_message_read
last_message = db.messages.find_one({"message_id": self.last_message_read}) )
if last_message:
query["time"] = {"$gt": last_message["time"]}
new_messages = list(db.messages.find(query).sort("time", 1))
if new_messages: if new_messages:
self.last_message_read = new_messages[-1]["message_id"] self.last_message_read = new_messages[-1]["message_id"]
@@ -179,27 +243,24 @@ class ChatObserver:
Returns: Returns:
List[Dict[str, Any]]: 最多5条消息 List[Dict[str, Any]]: 最多5条消息
""" """
query = {"chat_id": self.stream_id, "time": {"$lt": time_point}} new_messages = await self.message_storage.get_messages_before(
self.stream_id,
new_messages = list( time_point
db.messages.find(query).sort("time", -1).limit(5) # 倒序获取5条
) )
# 将消息按时间正序排列
new_messages.reverse()
if new_messages: if new_messages:
self.last_message_read = new_messages[-1]["message_id"] self.last_message_read = new_messages[-1]["message_id"]
return new_messages return new_messages
'''主要观察循环'''
async def _update_loop(self): async def _update_loop(self):
"""更新循环""" """更新循环"""
try: try:
start_time = time.time() start_time = time.time()
messages = await self._fetch_new_messages_before(start_time) messages = await self._fetch_new_messages_before(start_time)
for message in messages: for message in messages:
self._add_message_to_history(message) await self._add_message_to_history(message)
except Exception as e: except Exception as e:
logger.error(f"缓冲消息出错: {e}") logger.error(f"缓冲消息出错: {e}")
@@ -220,7 +281,7 @@ class ChatObserver:
if new_messages: if new_messages:
# 处理新消息 # 处理新消息
for message in new_messages: for message in new_messages:
self._add_message_to_history(message) await self._add_message_to_history(message)
# 设置完成事件 # 设置完成事件
self._update_complete.set() self._update_complete.set()
@@ -312,3 +373,71 @@ class ChatObserver:
time_info += f"\n距离对方上次发言已经过去了{int(user_speak_ago)}" time_info += f"\n距离对方上次发言已经过去了{int(user_speak_ago)}"
return time_info return time_info
def start_periodic_update(self):
"""启动观察器的定期更新"""
if not self.update_running:
self.update_running = True
asyncio.create_task(self._periodic_update())
async def _periodic_update(self):
"""定期更新消息历史"""
try:
while self.update_running:
await self._update_message_history()
await asyncio.sleep(self.update_interval)
except Exception as e:
logger.error(f"定期更新消息历史时出错: {str(e)}")
async def _update_message_history(self) -> bool:
"""更新消息历史
Returns:
bool: 是否有新消息
"""
try:
messages = await self.message_storage.get_messages_for_stream(
self.stream_id,
limit=50
)
if not messages:
return False
# 检查是否有新消息
has_new_messages = False
if messages and (not self.message_cache or messages[0]["message_id"] != self.message_cache[0]["message_id"]):
has_new_messages = True
self.message_cache = messages
if has_new_messages:
self.update_event.set()
self.update_event.clear()
return True
return False
except Exception as e:
logger.error(f"更新消息历史时出错: {str(e)}")
return False
def get_cached_messages(self, limit: int = 50) -> List[Dict[str, Any]]:
"""获取缓存的消息历史
Args:
limit: 获取的最大消息数量默认50
Returns:
List[Dict[str, Any]]: 缓存的消息历史列表
"""
return self.message_cache[:limit]
def get_last_message(self) -> Optional[Dict[str, Any]]:
"""获取最后一条消息
Returns:
Optional[Dict[str, Any]]: 最后一条消息如果没有则返回None
"""
if not self.message_cache:
return None
return self.message_cache[0]

View File

@@ -0,0 +1,267 @@
from enum import Enum, auto
from typing import Optional, Dict, Any, List, Set
from dataclasses import dataclass
from datetime import datetime
from abc import ABC, abstractmethod
class ChatState(Enum):
"""聊天状态枚举"""
NORMAL = auto() # 正常状态
NEW_MESSAGE = auto() # 有新消息
COLD_CHAT = auto() # 冷场状态
ACTIVE_CHAT = auto() # 活跃状态
BOT_SPEAKING = auto() # 机器人正在说话
USER_SPEAKING = auto() # 用户正在说话
SILENT = auto() # 沉默状态
ERROR = auto() # 错误状态
class NotificationType(Enum):
"""通知类型枚举"""
NEW_MESSAGE = auto() # 新消息通知
COLD_CHAT = auto() # 冷场通知
ACTIVE_CHAT = auto() # 活跃通知
BOT_SPEAKING = auto() # 机器人说话通知
USER_SPEAKING = auto() # 用户说话通知
MESSAGE_DELETED = auto() # 消息删除通知
USER_JOINED = auto() # 用户加入通知
USER_LEFT = auto() # 用户离开通知
ERROR = auto() # 错误通知
@dataclass
class ChatStateInfo:
"""聊天状态信息"""
state: ChatState
last_message_time: Optional[float] = None
last_message_content: Optional[str] = None
last_speaker: Optional[str] = None
message_count: int = 0
cold_duration: float = 0.0 # 冷场持续时间(秒)
active_duration: float = 0.0 # 活跃持续时间(秒)
@dataclass
class Notification:
"""通知基类"""
type: NotificationType
timestamp: float
sender: str # 发送者标识
target: str # 接收者标识
data: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"type": self.type.name,
"timestamp": self.timestamp,
"data": self.data
}
@dataclass
class StateNotification(Notification):
"""持续状态通知"""
is_active: bool = True
def to_dict(self) -> Dict[str, Any]:
base_dict = super().to_dict()
base_dict["is_active"] = self.is_active
return base_dict
class NotificationHandler(ABC):
"""通知处理器接口"""
@abstractmethod
async def handle_notification(self, notification: Notification):
"""处理通知"""
pass
class NotificationManager:
"""通知管理器"""
def __init__(self):
# 按接收者和通知类型存储处理器
self._handlers: Dict[str, Dict[NotificationType, List[NotificationHandler]]] = {}
self._active_states: Set[NotificationType] = set()
self._notification_history: List[Notification] = []
def register_handler(self, target: str, notification_type: NotificationType, handler: NotificationHandler):
"""注册通知处理器
Args:
target: 接收者标识(例如:"pfc"
notification_type: 要处理的通知类型
handler: 处理器实例
"""
if target not in self._handlers:
self._handlers[target] = {}
if notification_type not in self._handlers[target]:
self._handlers[target][notification_type] = []
self._handlers[target][notification_type].append(handler)
def unregister_handler(self, target: str, notification_type: NotificationType, handler: NotificationHandler):
"""注销通知处理器
Args:
target: 接收者标识
notification_type: 通知类型
handler: 要注销的处理器实例
"""
if target in self._handlers and notification_type in self._handlers[target]:
handlers = self._handlers[target][notification_type]
if handler in handlers:
handlers.remove(handler)
# 如果该类型的处理器列表为空,删除该类型
if not handlers:
del self._handlers[target][notification_type]
# 如果该目标没有任何处理器,删除该目标
if not self._handlers[target]:
del self._handlers[target]
async def send_notification(self, notification: Notification):
"""发送通知"""
self._notification_history.append(notification)
# 如果是状态通知,更新活跃状态
if isinstance(notification, StateNotification):
if notification.is_active:
self._active_states.add(notification.type)
else:
self._active_states.discard(notification.type)
# 调用目标接收者的处理器
target = notification.target
if target in self._handlers:
handlers = self._handlers[target].get(notification.type, [])
for handler in handlers:
await handler.handle_notification(notification)
def get_active_states(self) -> Set[NotificationType]:
"""获取当前活跃的状态"""
return self._active_states.copy()
def is_state_active(self, state_type: NotificationType) -> bool:
"""检查特定状态是否活跃"""
return state_type in self._active_states
def get_notification_history(self,
sender: Optional[str] = None,
target: Optional[str] = None,
limit: Optional[int] = None) -> List[Notification]:
"""获取通知历史
Args:
sender: 过滤特定发送者的通知
target: 过滤特定接收者的通知
limit: 限制返回数量
"""
history = self._notification_history
if sender:
history = [n for n in history if n.sender == sender]
if target:
history = [n for n in history if n.target == target]
if limit is not None:
history = history[-limit:]
return history
# 一些常用的通知创建函数
def create_new_message_notification(sender: str, target: str, message: Dict[str, Any]) -> Notification:
"""创建新消息通知"""
return Notification(
type=NotificationType.NEW_MESSAGE,
timestamp=datetime.now().timestamp(),
sender=sender,
target=target,
data={
"message_id": message.get("message_id"),
"content": message.get("content"),
"sender": message.get("sender"),
"time": message.get("time")
}
)
def create_cold_chat_notification(sender: str, target: str, is_cold: bool) -> StateNotification:
"""创建冷场状态通知"""
return StateNotification(
type=NotificationType.COLD_CHAT,
timestamp=datetime.now().timestamp(),
sender=sender,
target=target,
data={"is_cold": is_cold},
is_active=is_cold
)
def create_active_chat_notification(sender: str, target: str, is_active: bool) -> StateNotification:
"""创建活跃状态通知"""
return StateNotification(
type=NotificationType.ACTIVE_CHAT,
timestamp=datetime.now().timestamp(),
sender=sender,
target=target,
data={"is_active": is_active},
is_active=is_active
)
class ChatStateManager:
"""聊天状态管理器"""
def __init__(self):
self.current_state = ChatState.NORMAL
self.state_info = ChatStateInfo(state=ChatState.NORMAL)
self.state_history: list[ChatStateInfo] = []
def update_state(self, new_state: ChatState, **kwargs):
"""更新聊天状态
Args:
new_state: 新的状态
**kwargs: 其他状态信息
"""
self.current_state = new_state
self.state_info.state = new_state
# 更新其他状态信息
for key, value in kwargs.items():
if hasattr(self.state_info, key):
setattr(self.state_info, key, value)
# 记录状态历史
self.state_history.append(self.state_info)
def get_current_state_info(self) -> ChatStateInfo:
"""获取当前状态信息"""
return self.state_info
def get_state_history(self) -> list[ChatStateInfo]:
"""获取状态历史"""
return self.state_history
def is_cold_chat(self, threshold: float = 60.0) -> bool:
"""判断是否处于冷场状态
Args:
threshold: 冷场阈值(秒)
Returns:
bool: 是否冷场
"""
if not self.state_info.last_message_time:
return True
current_time = datetime.now().timestamp()
return (current_time - self.state_info.last_message_time) > threshold
def is_active_chat(self, threshold: float = 5.0) -> bool:
"""判断是否处于活跃状态
Args:
threshold: 活跃阈值(秒)
Returns:
bool: 是否活跃
"""
if not self.state_info.last_message_time:
return False
current_time = datetime.now().timestamp()
return (current_time - self.state_info.last_message_time) <= threshold

View File

@@ -0,0 +1,245 @@
import asyncio
import datetime
from typing import Dict, Any
from ..chat.message import Message
from .pfc_types import ConversationState
from .pfc import ChatObserver, GoalAnalyzer, Waiter, DirectMessageSender
from src.common.logger import get_module_logger
from .action_planner import ActionPlanner
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
from .reply_generator import ReplyGenerator
from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo
from src.plugins.chat.chat_stream import chat_manager
from .pfc_KnowledgeFetcher import KnowledgeFetcher
import traceback
logger = get_module_logger("pfc_conversation")
class Conversation:
"""对话类,负责管理单个对话的状态和行为"""
def __init__(self, stream_id: str):
"""初始化对话实例
Args:
stream_id: 聊天流ID
"""
self.stream_id = stream_id
self.state = ConversationState.INIT
self.should_continue = False
# 回复相关
self.generated_reply = ""
async def _initialize(self):
"""初始化实例,注册所有组件"""
try:
self.action_planner = ActionPlanner(self.stream_id)
self.goal_analyzer = GoalAnalyzer(self.stream_id)
self.reply_generator = ReplyGenerator(self.stream_id)
self.knowledge_fetcher = KnowledgeFetcher()
self.waiter = Waiter(self.stream_id)
self.direct_sender = DirectMessageSender()
# 获取聊天流信息
self.chat_stream = chat_manager.get_stream(self.stream_id)
self.stop_action_planner = False
except Exception as e:
logger.error(f"初始化对话实例:注册运行组件失败: {e}")
logger.error(traceback.format_exc())
raise
try:
#决策所需要的信息,包括自身自信和观察信息两部分
#注册观察器和观测信息
self.chat_observer = ChatObserver.get_instance(self.stream_id)
self.chat_observer.start()
self.observation_info = ObservationInfo()
self.observation_info.bind_to_chat_observer(self.stream_id)
#对话信息
self.conversation_info = ConversationInfo()
except Exception as e:
logger.error(f"初始化对话实例:注册信息组件失败: {e}")
logger.error(traceback.format_exc())
raise
# 组件准备完成,启动该论对话
self.should_continue = True
asyncio.create_task(self.start())
async def start(self):
"""开始对话流程"""
try:
logger.info("对话系统启动中...")
asyncio.create_task(self._plan_and_action_loop())
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
raise
async def _plan_and_action_loop(self):
"""思考步PFC核心循环模块"""
# 获取最近的消息历史
while self.should_continue:
# 使用决策信息来辅助行动规划
action, reason = await self.action_planner.plan(
self.observation_info,
self.conversation_info
)
if self._check_new_messages_after_planning():
continue
# 执行行动
await self._handle_action(action, reason, self.observation_info, self.conversation_info)
def _check_new_messages_after_planning(self):
"""检查在规划后是否有新消息"""
if self.observation_info.new_messages_count > 0:
logger.info(f"发现{self.observation_info.new_messages_count}条新消息,可能需要重新考虑行动")
# 如果需要,可以在这里添加逻辑来根据新消息重新决定行动
return True
return False
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
chat_info = msg_dict.get("chat_info", {})
chat_stream = ChatStream.from_dict(chat_info)
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict["message_id"],
chat_stream=chat_stream,
time=msg_dict["time"],
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", "")
)
except Exception as e:
logger.warning(f"转换消息时出错: {e}")
raise
async def _handle_action(self, action: str, reason: str, observation_info: ObservationInfo, conversation_info: ConversationInfo):
"""处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史先设置为stop完成后再设置为done
conversation_info.done_action.append({
"action": action,
"reason": reason,
"status": "start",
"time": datetime.datetime.now().strftime("%H:%M:%S")
})
if action == "direct_reply":
self.state = ConversationState.GENERATING
self.generated_reply = await self.reply_generator.generate(
observation_info,
conversation_info
)
# # 检查回复是否合适
# is_suitable, reason, need_replan = await self.reply_generator.check_reply(
# self.generated_reply,
# self.current_goal
# )
if self._check_new_messages_after_planning():
return None
await self._send_reply()
conversation_info.done_action.append({
"action": action,
"reason": reason,
"status": "done",
"time": datetime.datetime.now().strftime("%H:%M:%S")
})
elif action == "fetch_knowledge":
self.state = ConversationState.FETCHING
knowledge = "TODO:知识"
topic = "TODO:关键词"
logger.info(f"假装获取到知识{knowledge},关键词是: {topic}")
if knowledge:
if topic not in self.conversation_info.knowledge_list:
self.conversation_info.knowledge_list.append({
"topic": topic,
"knowledge": knowledge
})
else:
self.conversation_info.knowledge_list[topic] += knowledge
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
await self.goal_analyzer.analyze_goal(conversation_info, observation_info)
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info("倾听对方发言...")
if await self.waiter.wait(): # 如果返回True表示超时
await self._send_timeout_message()
await self._stop_conversation()
else: # wait
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
if await self.waiter.wait(): # 如果返回True表示超时
await self._send_timeout_message()
await self._stop_conversation()
async def _send_timeout_message(self):
"""发送超时结束消息"""
try:
messages = self.chat_observer.get_cached_messages(limit=1)
if not messages:
return
latest_message = self._convert_to_message(messages[0])
await self.direct_sender.send_message(
chat_stream=self.chat_stream,
content="TODO:超时消息",
reply_to_message=latest_message
)
except Exception as e:
logger.error(f"发送超时消息失败: {str(e)}")
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning("没有生成回复")
return
messages = self.chat_observer.get_cached_messages(limit=1)
if not messages:
logger.warning("没有最近的消息可以回复")
return
latest_message = self._convert_to_message(messages[0])
try:
await self.direct_sender.send_message(
chat_stream=self.chat_stream,
content=self.generated_reply,
reply_to_message=latest_message
)
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
self.state = ConversationState.ANALYZING
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
self.state = ConversationState.ANALYZING

View File

@@ -0,0 +1,8 @@
class ConversationInfo:
def __init__(self):
self.done_action = []
self.goal_list = []
self.knowledge_list = []
self.memory_list = []

View File

@@ -0,0 +1,49 @@
from typing import Optional
from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream
from ..chat.message import Message
from ..message.message_base import Seg
from src.plugins.chat.message import MessageSending
logger = get_module_logger("message_sender")
class DirectMessageSender:
"""直接消息发送器"""
def __init__(self):
pass
async def send_message(
self,
chat_stream: ChatStream,
content: str,
reply_to_message: Optional[Message] = None,
) -> None:
"""发送消息到聊天流
Args:
chat_stream: 聊天流
content: 消息内容
reply_to_message: 要回复的消息(可选)
"""
try:
# 创建消息内容
segments = [Seg(type="text", data={"text": content})]
# 检查是否需要引用回复
if reply_to_message:
reply_id = reply_to_message.message_id
message_sending = MessageSending(
segments=segments,
reply_to_id=reply_id
)
else:
message_sending = MessageSending(segments=segments)
# 发送消息
await chat_stream.send_message(message_sending)
logger.info(f"消息已发送: {content}")
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
raise

View File

@@ -0,0 +1,134 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from src.common.database import db
class MessageStorage(ABC):
"""消息存储接口"""
@abstractmethod
async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取指定消息ID之后的所有消息
Args:
chat_id: 聊天ID
message_id: 消息ID如果为None则获取所有消息
Returns:
List[Dict[str, Any]]: 消息列表
"""
pass
@abstractmethod
async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]:
"""获取指定时间点之前的消息
Args:
chat_id: 聊天ID
time_point: 时间戳
limit: 最大消息数量
Returns:
List[Dict[str, Any]]: 消息列表
"""
pass
@abstractmethod
async def has_new_messages(self, chat_id: str, after_time: float) -> bool:
"""检查是否有新消息
Args:
chat_id: 聊天ID
after_time: 时间戳
Returns:
bool: 是否有新消息
"""
pass
class MongoDBMessageStorage(MessageStorage):
"""MongoDB消息存储实现"""
def __init__(self):
self.db = db
async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]:
query = {"chat_id": chat_id}
if message_id:
# 获取ID大于message_id的消息
last_message = self.db.messages.find_one({"message_id": message_id})
if last_message:
query["time"] = {"$gt": last_message["time"]}
return list(
self.db.messages.find(query).sort("time", 1)
)
async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]:
query = {
"chat_id": chat_id,
"time": {"$lt": time_point}
}
messages = list(
self.db.messages.find(query).sort("time", -1).limit(limit)
)
# 将消息按时间正序排列
messages.reverse()
return messages
async def has_new_messages(self, chat_id: str, after_time: float) -> bool:
query = {
"chat_id": chat_id,
"time": {"$gt": after_time}
}
return self.db.messages.find_one(query) is not None
# # 创建一个内存消息存储实现,用于测试
# class InMemoryMessageStorage(MessageStorage):
# """内存消息存储实现,主要用于测试"""
# def __init__(self):
# self.messages: Dict[str, List[Dict[str, Any]]] = {}
# async def get_messages_after(self, chat_id: str, message_id: Optional[str] = None) -> List[Dict[str, Any]]:
# if chat_id not in self.messages:
# return []
# messages = self.messages[chat_id]
# if not message_id:
# return messages
# # 找到message_id的索引
# try:
# index = next(i for i, m in enumerate(messages) if m["message_id"] == message_id)
# return messages[index + 1:]
# except StopIteration:
# return []
# async def get_messages_before(self, chat_id: str, time_point: float, limit: int = 5) -> List[Dict[str, Any]]:
# if chat_id not in self.messages:
# return []
# messages = [
# m for m in self.messages[chat_id]
# if m["time"] < time_point
# ]
# return messages[-limit:]
# async def has_new_messages(self, chat_id: str, after_time: float) -> bool:
# if chat_id not in self.messages:
# return False
# return any(m["time"] > after_time for m in self.messages[chat_id])
# # 测试辅助方法
# def add_message(self, chat_id: str, message: Dict[str, Any]):
# """添加测试消息"""
# if chat_id not in self.messages:
# self.messages[chat_id] = []
# self.messages[chat_id].append(message)
# self.messages[chat_id].sort(key=lambda m: m["time"])

View File

@@ -0,0 +1,71 @@
from typing import TYPE_CHECKING
from src.common.logger import get_module_logger
from .chat_states import NotificationHandler, Notification, NotificationType
if TYPE_CHECKING:
from .conversation import Conversation
logger = get_module_logger("notification_handler")
class PFCNotificationHandler(NotificationHandler):
"""PFC通知处理器"""
def __init__(self, conversation: 'Conversation'):
"""初始化PFC通知处理器
Args:
conversation: 对话实例
"""
self.conversation = conversation
async def handle_notification(self, notification: Notification):
"""处理通知
Args:
notification: 通知对象
"""
logger.debug(f"收到通知: {notification.type.name}, 数据: {notification.data}")
# 根据通知类型执行不同的处理
if notification.type == NotificationType.NEW_MESSAGE:
# 新消息通知
await self._handle_new_message(notification)
elif notification.type == NotificationType.COLD_CHAT:
# 冷聊天通知
await self._handle_cold_chat(notification)
elif notification.type == NotificationType.COMMAND:
# 命令通知
await self._handle_command(notification)
else:
logger.warning(f"未知的通知类型: {notification.type.name}")
async def _handle_new_message(self, notification: Notification):
"""处理新消息通知
Args:
notification: 通知对象
"""
# 更新决策信息
observation_info = self.conversation.observation_info
observation_info.last_message_time = notification.data.get("time", 0)
observation_info.add_unprocessed_message(notification.data)
# 手动触发观察器更新
self.conversation.chat_observer.trigger_update()
async def _handle_cold_chat(self, notification: Notification):
"""处理冷聊天通知
Args:
notification: 通知对象
"""
# 获取冷聊天信息
cold_duration = notification.data.get("duration", 0)
# 更新决策信息
observation_info = self.conversation.observation_info
observation_info.conversation_cold_duration = cold_duration
logger.info(f"对话已冷: {cold_duration}")

View File

@@ -0,0 +1,246 @@
#Programmable Friendly Conversationalist
#Prefrontal cortex
from typing import List, Optional, Dict, Any, Set
from ..message.message_base import UserInfo
import time
from dataclasses import dataclass, field
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
from .chat_states import NotificationHandler
logger = get_module_logger("observation_info")
class ObservationInfoHandler(NotificationHandler):
"""ObservationInfo的通知处理器"""
def __init__(self, observation_info: 'ObservationInfo'):
"""初始化处理器
Args:
observation_info: 要更新的ObservationInfo实例
"""
self.observation_info = observation_info
async def handle_notification(self, notification: Dict[str, Any]):
"""处理通知
Args:
notification: 通知数据
"""
notification_type = notification.get("type")
data = notification.get("data", {})
if notification_type == "NEW_MESSAGE":
# 处理新消息通知
logger.debug(f"收到新消息通知data: {data}")
message = data.get("message", {})
self.observation_info.update_from_message(message)
# self.observation_info.has_unread_messages = True
# self.observation_info.new_unread_message.append(message.get("processed_plain_text", ""))
elif notification_type == "COLD_CHAT":
# 处理冷场通知
is_cold = data.get("is_cold", False)
self.observation_info.update_cold_chat_status(is_cold, time.time())
elif notification_type == "ACTIVE_CHAT":
# 处理活跃通知
is_active = data.get("is_active", False)
self.observation_info.is_cold = not is_active
elif notification_type == "BOT_SPEAKING":
# 处理机器人说话通知
self.observation_info.is_typing = False
self.observation_info.last_bot_speak_time = time.time()
elif notification_type == "USER_SPEAKING":
# 处理用户说话通知
self.observation_info.is_typing = False
self.observation_info.last_user_speak_time = time.time()
elif notification_type == "MESSAGE_DELETED":
# 处理消息删除通知
message_id = data.get("message_id")
self.observation_info.unprocessed_messages = [
msg for msg in self.observation_info.unprocessed_messages
if msg.get("message_id") != message_id
]
elif notification_type == "USER_JOINED":
# 处理用户加入通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.add(user_id)
elif notification_type == "USER_LEFT":
# 处理用户离开通知
user_id = data.get("user_id")
if user_id:
self.observation_info.active_users.discard(user_id)
elif notification_type == "ERROR":
# 处理错误通知
error_msg = data.get("error", "")
logger.error(f"收到错误通知: {error_msg}")
@dataclass
class ObservationInfo:
"""决策信息类用于收集和管理来自chat_observer的通知信息"""
#data_list
chat_history: List[str] = field(default_factory=list)
unprocessed_messages: List[Dict[str, Any]] = field(default_factory=list)
active_users: Set[str] = field(default_factory=set)
#data
last_bot_speak_time: Optional[float] = None
last_user_speak_time: Optional[float] = None
last_message_time: Optional[float] = None
last_message_content: str = ""
last_message_sender: Optional[str] = None
bot_id: Optional[str] = None
new_messages_count: int = 0
cold_chat_duration: float = 0.0
#state
is_typing: bool = False
has_unread_messages: bool = False
is_cold_chat: bool = False
changed: bool = False
# #spec
# meta_plan_trigger: bool = False
def __post_init__(self):
"""初始化后创建handler"""
self.chat_observer = None
self.handler = ObservationInfoHandler(self)
def bind_to_chat_observer(self, stream_id: str):
"""绑定到指定的chat_observer
Args:
stream_id: 聊天流ID
"""
self.chat_observer = ChatObserver.get_instance(stream_id)
self.chat_observer.notification_manager.register_handler(
target="observation_info",
notification_type="NEW_MESSAGE",
handler=self.handler
)
self.chat_observer.notification_manager.register_handler(
target="observation_info",
notification_type="COLD_CHAT",
handler=self.handler
)
def unbind_from_chat_observer(self):
"""解除与chat_observer的绑定"""
if self.chat_observer:
self.chat_observer.notification_manager.unregister_handler(
target="observation_info",
notification_type="NEW_MESSAGE",
handler=self.handler
)
self.chat_observer.notification_manager.unregister_handler(
target="observation_info",
notification_type="COLD_CHAT",
handler=self.handler
)
self.chat_observer = None
def update_from_message(self, message: Dict[str, Any]):
"""从消息更新信息
Args:
message: 消息数据
"""
logger.debug(f"更新信息from_message: {message}")
self.last_message_time = message["time"]
self.last_message_content = message.get("processed_plain_text", "")
user_info = UserInfo.from_dict(message.get("user_info", {}))
self.last_message_sender = user_info.user_id
if user_info.user_id == self.bot_id:
self.last_bot_speak_time = message["time"]
else:
self.last_user_speak_time = message["time"]
self.active_users.add(user_info.user_id)
self.new_messages_count += 1
self.unprocessed_messages.append(message)
self.update_changed()
def update_changed(self):
"""更新changed状态"""
self.changed = True
# self.meta_plan_trigger = True
def update_cold_chat_status(self, is_cold: bool, current_time: float):
"""更新冷场状态
Args:
is_cold: 是否冷场
current_time: 当前时间
"""
self.is_cold_chat = is_cold
if is_cold and self.last_message_time:
self.cold_chat_duration = current_time - self.last_message_time
def get_active_duration(self) -> float:
"""获取当前活跃时长
Returns:
float: 最后一条消息到现在的时长(秒)
"""
if not self.last_message_time:
return 0.0
return time.time() - self.last_message_time
def get_user_response_time(self) -> Optional[float]:
"""获取用户响应时间
Returns:
Optional[float]: 用户最后发言到现在的时长如果没有用户发言则返回None
"""
if not self.last_user_speak_time:
return None
return time.time() - self.last_user_speak_time
def get_bot_response_time(self) -> Optional[float]:
"""获取机器人响应时间
Returns:
Optional[float]: 机器人最后发言到现在的时长如果没有机器人发言则返回None
"""
if not self.last_bot_speak_time:
return None
return time.time() - self.last_bot_speak_time
def clear_unprocessed_messages(self):
"""清空未处理消息列表"""
# 将未处理消息添加到历史记录中
for message in self.unprocessed_messages:
if "processed_plain_text" in message:
self.chat_history.append(message["processed_plain_text"])
# 清空未处理消息列表
self.has_unread_messages = False
self.unprocessed_messages.clear()
self.new_messages_count = 0
def add_unprocessed_message(self, message: Dict[str, Any]):
"""添加未处理的消息
Args:
message: 消息数据
"""
# 防止重复添加同一消息
message_id = message.get("message_id")
if message_id and not any(m.get("message_id") == message_id for m in self.unprocessed_messages):
self.unprocessed_messages.append(message)
self.new_messages_count += 1
# 同时更新其他消息相关信息
self.update_from_message(message)

View File

@@ -2,8 +2,7 @@
# Prefrontal cortex # Prefrontal cortex
import datetime import datetime
import asyncio import asyncio
from typing import List, Optional, Dict, Any, Tuple, Literal from typing import List, Optional, Tuple, TYPE_CHECKING
from enum import Enum
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from ..chat.chat_stream import ChatStream from ..chat.chat_stream import ChatStream
from ..message.message_base import UserInfo, Seg from ..message.message_base import UserInfo, Seg
@@ -11,153 +10,21 @@ from ..chat.message import Message
from ..models.utils_model import LLM_request from ..models.utils_model import LLM_request
from ..config.config import global_config from ..config.config import global_config
from src.plugins.chat.message import MessageSending from src.plugins.chat.message import MessageSending
from src.plugins.chat.chat_stream import chat_manager
from ..message.api import global_api from ..message.api import global_api
from ..storage.storage import MessageStorage from ..storage.storage import MessageStorage
from .chat_observer import ChatObserver from .chat_observer import ChatObserver
from .pfc_KnowledgeFetcher import KnowledgeFetcher
from .reply_checker import ReplyChecker
from .pfc_utils import get_items_from_json from .pfc_utils import get_items_from_json
from src.individuality.individuality import Individuality from src.individuality.individuality import Individuality
from .conversation_info import ConversationInfo
from .observation_info import ObservationInfo
import time import time
if TYPE_CHECKING:
pass
logger = get_module_logger("pfc") logger = get_module_logger("pfc")
class ConversationState(Enum):
"""对话状态"""
INIT = "初始化"
RETHINKING = "重新思考"
ANALYZING = "分析历史"
PLANNING = "规划目标"
GENERATING = "生成回复"
CHECKING = "检查回复"
SENDING = "发送消息"
WAITING = "等待"
LISTENING = "倾听"
ENDED = "结束"
JUDGING = "判断"
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]
class ActionPlanner:
"""行动规划器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal, temperature=0.7, max_tokens=1000, request_type="action_planning"
)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
async def plan(
self,
goal: str,
method: str,
reasoning: str,
action_history: List[Dict[str, str]] = None,
chat_observer: Optional[ChatObserver] = None, # 添加chat_observer参数
) -> Tuple[str, str]:
"""规划下一步行动
Args:
goal: 对话目标
reasoning: 目标原因
action_history: 行动历史记录
Returns:
Tuple[str, str]: (行动类型, 行动原因)
"""
# 构建提示词
# 获取最近20条消息
self.chat_observer.waiting_start_time = time.time()
messages = self.chat_observer.get_message_history(limit=20)
chat_history_text = ""
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_text = ""
if action_history:
if action_history[-1]["action"] == "direct_reply":
action_history_text = "你刚刚发言回复了对方"
# 获取时间信息
time_info = self.chat_observer.get_time_info()
prompt = f"""现在你在参与一场QQ聊天请分析以下内容根据信息决定下一步行动
{personality_text}
当前对话目标:{goal}
实现该对话目标的方式:{method}
产生该对话目标的原因:{reasoning}
{time_info}
最近的对话记录:
{chat_history_text}
{action_history_text}
请你接下去想想要你要做什么,可以发言,可以等待,可以倾听,可以调取知识。注意不同行动类型的要求,不要重复发言:
行动类型:
fetch_knowledge: 需要调取知识,当需要专业知识或特定信息时选择
wait: 当你做出了发言,对方尚未回复时等待对方的回复
listening: 倾听对方发言,当你认为对方发言尚未结束时采用
direct_reply: 不符合上述情况,回复对方,注意不要过多或者重复发言
rethink_goal: 重新思考对话目标,当发现对话目标不合适时选择,会重新思考对话目标
judge_conversation: 判断对话是否结束,当发现对话目标已经达到或者希望停止对话时选择,会判断对话是否结束
请以JSON格式输出包含以下字段
1. action: 行动类型,注意你之前的行为
2. reason: 选择该行动的原因,注意你之前的行为(简要解释)
注意请严格按照JSON格式输出不要包含任何其他内容。"""
logger.debug(f"发送到LLM的提示词: {prompt}")
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容
success, result = get_items_from_json(
content, "action", "reason", default_values={"action": "direct_reply", "reason": "默认原因"}
)
if not success:
return "direct_reply", "JSON解析失败选择直接回复"
action = result["action"]
reason = result["reason"]
# 验证action类型
if action not in [
"direct_reply",
"fetch_knowledge",
"wait",
"listening",
"rethink_goal",
"judge_conversation",
]:
logger.warning(f"未知的行动类型: {action}默认使用listening")
action = "listening"
logger.info(f"规划的行动: {action}")
logger.info(f"行动原因: {reason}")
return action, reason
except Exception as e:
logger.error(f"规划行动时出错: {str(e)}")
return "direct_reply", "发生错误,选择直接回复"
class GoalAnalyzer: class GoalAnalyzer:
"""对话目标分析器""" """对话目标分析器"""
@@ -176,42 +43,55 @@ class GoalAnalyzer:
self.max_goals = 3 # 同时保持的最大目标数量 self.max_goals = 3 # 同时保持的最大目标数量
self.current_goal_and_reason = None self.current_goal_and_reason = None
async def analyze_goal(self) -> Tuple[str, str, str]: async def analyze_goal(self, conversation_info: ConversationInfo, observation_info: ObservationInfo):
"""分析对话历史并设定目标 """分析对话历史并设定目标
Args: Args:
chat_history: 聊天历史记录列表 conversation_info: 对话信息
observation_info: 观察信息
Returns: Returns:
Tuple[str, str, str]: (目标, 方法, 原因) Tuple[str, str, str]: (目标, 方法, 原因)
""" """
max_retries = 3 #构建对话目标
for retry in range(max_retries): goal_list = conversation_info.goal_list
try: goal_text = ""
# 构建提示词 for goal, reason in goal_list:
messages = self.chat_observer.get_message_history(limit=20) goal_text += f"目标:{goal};"
chat_history_text = "" goal_text += f"原因:{reason}\n"
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建当前已有目标的文本 # 获取聊天历史记录
existing_goals_text = "" chat_history_list = observation_info.chat_history
if self.goals: chat_history_text = ""
existing_goals_text = "当前已有的对话目标:\n" for msg in chat_history_list:
for i, (goal, _, reason) in enumerate(self.goals): chat_history_text += f"{msg}\n"
existing_goals_text += f"{i + 1}. 目标: {goal}, 原因: {reason}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。 if observation_info.new_messages_count > 0:
new_messages_list = observation_info.unprocessed_messages
chat_history_text += f"{observation_info.new_messages_count}条新消息:\n"
for msg in new_messages_list:
chat_history_text += f"{msg}\n"
observation_info.clear_unprocessed_messages()
personality_text = f"你的名字是{self.name}{self.personality_info}"
# 构建action历史文本
action_history_list = conversation_info.done_action
action_history_text = "你之前做的事情是:"
for action in action_history_list:
action_history_text += f"{action}\n"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请分析以下聊天记录并根据你的性格特征确定多个明确的对话目标。
这些目标应该反映出对话的不同方面和意图。 这些目标应该反映出对话的不同方面和意图。
{existing_goals_text} {action_history_text}
当前对话目标:
{goal_text}
聊天记录: 聊天记录:
{chat_history_text} {chat_history_text}
@@ -222,53 +102,36 @@ class GoalAnalyzer:
3. 添加新目标 3. 添加新目标
4. 删除不再相关的目标 4. 删除不再相关的目标
请以JSON格式输出一个当前最主要的对话目标,包含以下字段: 请以JSON格式输出当前的所有对话目标,包含以下字段:
1. goal: 对话目标(简短的一句话) 1. goal: 对话目标(简短的一句话)
2. reasoning: 对话原因,为什么设定这个目标(简要解释) 2. reasoning: 对话原因,为什么设定这个目标(简要解释)
输出格式示例: 输出格式示例:
{{ {{
"goal": "回答用户关于Python编程的具体问题", "goal": "回答用户关于Python编程的具体问题",
"reasoning": "用户提出了关于Python的技术问题需要专业且准确的解答" "reasoning": "用户提出了关于Python的技术问题需要专业且准确的解答"
}},
{{
"goal": "回答用户关于python安装的具体问题",
"reasoning": "用户提出了关于Python的技术问题需要专业且准确的解答"
}}""" }}"""
logger.debug(f"发送到LLM的提示词: {prompt}") logger.debug(f"发送到LLM的提示词: {prompt}")
content, _ = await self.llm.generate_response_async(prompt) content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}") logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容 # 使用简化函数提取JSON内容
success, result = get_items_from_json( success, result = get_items_from_json(
content, "goal", "reasoning", required_types={"goal": str, "reasoning": str} content,
) "goal", "reasoning",
required_types={"goal": str, "reasoning": str}
)
#TODO
if not success:
logger.error(f"无法解析JSON重试第{retry + 1}")
continue
goal = result["goal"] conversation_info.goal_list.append(result)
reasoning = result["reasoning"]
# 使用默认的方法
method = "以友好的态度回应"
# 更新目标列表
await self._update_goals(goal, method, reasoning)
# 返回当前最主要的目标
if self.goals:
current_goal, current_method, current_reasoning = self.goals[0]
return current_goal, current_method, current_reasoning
else:
return goal, method, reasoning
except Exception as e:
logger.error(f"分析对话目标时出错: {str(e)},重试第{retry + 1}")
if retry == max_retries - 1:
return "保持友好的对话", "以友好的态度回应", "确保对话顺利进行"
continue
# 所有重试都失败后的默认返回
return "保持友好的对话", "以友好的态度回应", "确保对话顺利进行"
async def _update_goals(self, new_goal: str, method: str, reasoning: str): async def _update_goals(self, new_goal: str, method: str, reasoning: str):
"""更新目标列表 """更新目标列表
@@ -332,7 +195,7 @@ class GoalAnalyzer:
return self.goals[1:].copy() return self.goals[1:].copy()
async def analyze_conversation(self, goal, reasoning): async def analyze_conversation(self, goal, reasoning):
messages = self.chat_observer.get_message_history() messages = self.chat_observer.get_cached_messages()
chat_history_text = "" chat_history_text = ""
for msg in messages: for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
@@ -360,40 +223,33 @@ class GoalAnalyzer:
{{ {{
"goal_achieved": true, "goal_achieved": true,
"stop_conversation": false, "stop_conversation": false,
"reason": "用户已经得到了满意的回答,但我仍希望继续聊天" "reason": "虽然目标已达成,但对话仍然有继续的价值"
}}""" }}"""
logger.debug(f"发送到LLM的提示词: {prompt}")
try: try:
content, _ = await self.llm.generate_response_async(prompt) content, _ = await self.llm.generate_response_async(prompt)
logger.debug(f"LLM原始返回内容: {content}") logger.debug(f"LLM原始返回内容: {content}")
# 使用简化函数提取JSON内容 # 尝试解析JSON
success, result = get_items_from_json( success, result = get_items_from_json(
content, content,
"goal_achieved", "goal_achieved", "stop_conversation", "reason",
"stop_conversation", required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str}
"reason",
required_types={"goal_achieved": bool, "stop_conversation": bool, "reason": str},
) )
if not success: if not success:
return False, False, "确保对话顺利进行" logger.error("无法解析对话分析结果JSON")
return False, False, "解析结果失败"
# 如果当前目标达成,从目标列表中移除 goal_achieved = result["goal_achieved"]
if result["goal_achieved"] and not result["stop_conversation"]: stop_conversation = result["stop_conversation"]
for i, (g, _, _) in enumerate(self.goals): reason = result["reason"]
if g == goal:
self.goals.pop(i)
# 如果还有其他目标,不停止对话
if self.goals:
result["stop_conversation"] = False
break
return result["goal_achieved"], result["stop_conversation"], result["reason"] return goal_achieved, stop_conversation, reason
except Exception as e: except Exception as e:
logger.error(f"分析对话目标时出错: {str(e)}") logger.error(f"分析对话状态时出错: {str(e)}")
return False, False, "确保对话顺利进行" return False, False, f"分析出错: {str(e)}"
class Waiter: class Waiter:
@@ -410,563 +266,24 @@ class Waiter:
Returns: Returns:
bool: 是否超时True表示超时 bool: 是否超时True表示超时
""" """
wait_start_time = self.chat_observer.waiting_start_time # 使用当前时间作为等待开始时间
while not self.chat_observer.new_message_after(wait_start_time): wait_start_time = time.time()
await asyncio.sleep(1) self.chat_observer.waiting_start_time = wait_start_time # 设置等待开始时间
logger.info("等待中...")
# 检查是否超过60秒 while True:
# 检查是否有新消息
if self.chat_observer.new_message_after(wait_start_time):
logger.info("等待结束,收到新消息")
return False
# 检查是否超时
if time.time() - wait_start_time > 300: if time.time() - wait_start_time > 300:
logger.info("等待超过300秒结束对话") logger.info("等待超过300秒结束对话")
return True return True
logger.info("等待结束")
return False
class ReplyGenerator:
"""回复生成器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal, temperature=0.7, max_tokens=300, request_type="reply_generation"
)
self.personality_info = Individuality.get_instance().get_prompt(type="personality", x_person=2, level=2)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
self.reply_checker = ReplyChecker(stream_id)
async def generate(
self,
goal: str,
chat_history: List[Message],
knowledge_cache: Dict[str, str],
previous_reply: Optional[str] = None,
retry_count: int = 0,
) -> str:
"""生成回复
Args:
goal: 对话目标
chat_history: 聊天历史
knowledge_cache: 知识缓存
previous_reply: 上一次生成的回复(如果有)
retry_count: 当前重试次数
Returns:
str: 生成的回复
"""
# 构建提示词
logger.debug(f"开始生成回复:当前目标: {goal}")
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
messages = self.chat_observer.get_message_history(limit=20)
chat_history_text = ""
for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")
user_info = UserInfo.from_dict(msg.get("user_info", {}))
sender = user_info.user_nickname or f"用户{user_info.user_id}"
if sender == self.name:
sender = "你说"
chat_history_text += f"{time_str},{sender}:{msg.get('processed_plain_text', '')}\n"
# 整理知识缓存
knowledge_text = ""
if knowledge_cache:
knowledge_text = "\n相关知识:"
if isinstance(knowledge_cache, dict):
for _source, content in knowledge_cache.items():
knowledge_text += f"\n{content}"
elif isinstance(knowledge_cache, list):
for item in knowledge_cache:
knowledge_text += f"\n{item}"
# 添加上一次生成的回复信息
previous_reply_text = ""
if previous_reply:
previous_reply_text = f"\n上一次生成的回复(需要改进):\n{previous_reply}"
personality_text = f"你的名字是{self.name}{self.personality_info}"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请根据以下信息生成回复
当前对话目标:{goal}
{knowledge_text}
{previous_reply_text}
最近的聊天记录:
{chat_history_text}
请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该:
1. 符合对话目标,以""的角度发言
2. 体现你的性格特征
3. 自然流畅,像正常聊天一样,简短
4. 适当利用相关知识,但不要生硬引用
{"5. 改进上一次回复中的问题" if previous_reply else ""}
请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。
请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话
请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。
请直接输出回复内容,不需要任何额外格式。"""
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.info(f"生成的回复: {content}")
is_new = self.chat_observer.check()
logger.debug(f"再看一眼聊天记录,{'' if is_new else '没有'}新消息")
# 如果有新消息,重新生成回复
if is_new:
logger.info("检测到新消息,重新生成回复")
return await self.generate(goal, chat_history, knowledge_cache, None, retry_count)
return content
except Exception as e:
logger.error(f"生成回复时出错: {e}")
return "抱歉,我现在有点混乱,让我重新思考一下..."
async def check_reply(self, reply: str, goal: str, retry_count: int = 0) -> Tuple[bool, str, bool]:
"""检查回复是否合适
Args:
reply: 生成的回复
goal: 对话目标
retry_count: 当前重试次数
Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
"""
return await self.reply_checker.check(reply, goal, retry_count)
class Conversation:
# 类级别的实例管理
_instances: Dict[str, "Conversation"] = {}
_instance_lock = asyncio.Lock() # 类级别的全局锁
_init_events: Dict[str, asyncio.Event] = {} # 初始化完成事件
_initializing: Dict[str, bool] = {} # 标记是否正在初始化
@classmethod
async def get_instance(cls, stream_id: str) -> Optional["Conversation"]:
"""获取或创建对话实例
Args:
stream_id: 聊天流ID
Returns:
Optional[Conversation]: 对话实例如果创建或等待失败则返回None
"""
try:
# 使用全局锁来确保线程安全
async with cls._instance_lock:
# 如果已经在初始化中,等待初始化完成
if stream_id in cls._initializing and cls._initializing[stream_id]:
# 释放锁等待初始化
cls._instance_lock.release()
try:
await asyncio.wait_for(cls._init_events[stream_id].wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.error(f"等待实例 {stream_id} 初始化超时")
return None
finally:
await cls._instance_lock.acquire()
# 如果实例不存在,创建新实例
if stream_id not in cls._instances:
cls._instances[stream_id] = cls(stream_id)
cls._init_events[stream_id] = asyncio.Event()
cls._initializing[stream_id] = True
logger.info(f"创建新的对话实例: {stream_id}")
return cls._instances[stream_id]
except Exception as e:
logger.error(f"获取对话实例失败: {e}")
return None
@classmethod
async def remove_instance(cls, stream_id: str):
"""删除对话实例
Args:
stream_id: 聊天流ID
"""
async with cls._instance_lock:
if stream_id in cls._instances:
# 停止相关组件
instance = cls._instances[stream_id]
instance.chat_observer.stop()
# 删除实例
del cls._instances[stream_id]
if stream_id in cls._init_events:
del cls._init_events[stream_id]
if stream_id in cls._initializing:
del cls._initializing[stream_id]
logger.info(f"已删除对话实例 {stream_id}")
def __init__(self, stream_id: str):
"""初始化对话系统"""
self.stream_id = stream_id
self.state = ConversationState.INIT
self.current_goal: Optional[str] = None
self.current_method: Optional[str] = None
self.goal_reasoning: Optional[str] = None
self.generated_reply: Optional[str] = None
self.should_continue = True
# 初始化聊天观察器
self.chat_observer = ChatObserver.get_instance(stream_id)
# 添加action历史记录
self.action_history: List[Dict[str, str]] = []
# 知识缓存
self.knowledge_cache: Dict[str, str] = {} # 确保初始化为字典
# 初始化各个组件
self.goal_analyzer = GoalAnalyzer(self.stream_id)
self.action_planner = ActionPlanner(self.stream_id)
self.reply_generator = ReplyGenerator(self.stream_id)
self.knowledge_fetcher = KnowledgeFetcher()
self.direct_sender = DirectMessageSender()
self.waiter = Waiter(self.stream_id)
# 创建聊天流
self.chat_stream = chat_manager.get_stream(self.stream_id)
def _clear_knowledge_cache(self):
"""清空知识缓存"""
self.knowledge_cache.clear() # 使用clear方法清空字典
async def start(self):
"""开始对话流程"""
try:
logger.info("对话系统启动")
self.should_continue = True
self.chat_observer.start() # 启动观察器
await asyncio.sleep(1) await asyncio.sleep(1)
# 启动对话循环 logger.info("等待中...")
await self._conversation_loop()
except Exception as e:
logger.error(f"启动对话系统失败: {e}")
raise
finally:
# 标记初始化完成
self._init_events[self.stream_id].set()
self._initializing[self.stream_id] = False
async def _conversation_loop(self):
"""对话循环"""
# 获取最近的消息历史
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
while self.should_continue:
# 执行行动
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
action, reason = await self.action_planner.plan(
self.current_goal,
self.current_method,
self.goal_reasoning,
self.action_history, # 传入action历史
self.chat_observer, # 传入chat_observer
)
# 执行行动
await self._handle_action(action, reason)
def _convert_to_message(self, msg_dict: Dict[str, Any]) -> Message:
"""将消息字典转换为Message对象"""
try:
chat_info = msg_dict.get("chat_info", {})
chat_stream = ChatStream.from_dict(chat_info)
user_info = UserInfo.from_dict(msg_dict.get("user_info", {}))
return Message(
message_id=msg_dict["message_id"],
chat_stream=chat_stream,
time=msg_dict["time"],
user_info=user_info,
processed_plain_text=msg_dict.get("processed_plain_text", ""),
detailed_plain_text=msg_dict.get("detailed_plain_text", ""),
)
except Exception as e:
logger.warning(f"转换消息时出错: {e}")
raise
async def _handle_action(self, action: str, reason: str):
"""处理规划的行动"""
logger.info(f"执行行动: {action}, 原因: {reason}")
# 记录action历史
self.action_history.append(
{"action": action, "reason": reason, "time": datetime.datetime.now().strftime("%H:%M:%S")}
)
# 只保留最近的10条记录
if len(self.action_history) > 10:
self.action_history = self.action_history[-10:]
if action == "direct_reply":
self.state = ConversationState.GENERATING
messages = self.chat_observer.get_message_history(limit=30)
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
)
# 检查回复是否合适
is_suitable, reason, need_replan = await self.reply_generator.check_reply(
self.generated_reply, self.current_goal
)
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
)
# 检查使用新目标生成的回复是否合适
is_suitable, reason, _ = await self.reply_generator.check_reply(
self.generated_reply, self.current_goal
)
if is_suitable:
# 如果新目标的回复合适,调整目标优先级
await self.goal_analyzer._update_goals(
self.current_goal, self.current_method, self.goal_reasoning
)
else:
# 如果新目标还是不合适,重新思考目标
self.state = ConversationState.RETHINKING
(
self.current_goal,
self.current_method,
self.goal_reasoning,
) = await self.goal_analyzer.analyze_goal()
return
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
(
self.current_goal,
self.current_method,
self.goal_reasoning,
) = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply, # 将不合适的回复作为previous_reply传入
)
while self.chat_observer.check():
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
)
is_suitable = True # 假设使用新目标后回复是合适的
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
(
self.current_goal,
self.current_method,
self.goal_reasoning,
) = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply, # 将不合适的回复作为previous_reply传入
)
await self._send_reply()
elif action == "fetch_knowledge":
self.state = ConversationState.GENERATING
messages = self.chat_observer.get_message_history(limit=30)
knowledge, sources = await self.knowledge_fetcher.fetch(
self.current_goal, [self._convert_to_message(msg) for msg in messages]
)
logger.info(f"获取到知识,来源: {sources}")
if knowledge != "未找到相关知识":
self.knowledge_cache[sources] = knowledge
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
)
# 检查回复是否合适
is_suitable, reason, need_replan = await self.reply_generator.check_reply(
self.generated_reply, self.current_goal
)
if not is_suitable:
logger.warning(f"生成的回复不合适,原因: {reason}")
if need_replan:
# 尝试切换到其他备选目标
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 有备选目标,尝试使用
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"切换到备选目标: {self.current_goal}")
# 使用新目标获取知识并生成回复
knowledge, sources = await self.knowledge_fetcher.fetch(
self.current_goal, [self._convert_to_message(msg) for msg in messages]
)
if knowledge != "未找到相关知识":
self.knowledge_cache[sources] = knowledge
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
)
else:
# 没有备选目标,重新分析
self.state = ConversationState.RETHINKING
(
self.current_goal,
self.current_method,
self.goal_reasoning,
) = await self.goal_analyzer.analyze_goal()
return
else:
# 重新生成回复
self.generated_reply = await self.reply_generator.generate(
self.current_goal,
self.current_method,
[self._convert_to_message(msg) for msg in messages],
self.knowledge_cache,
self.generated_reply, # 将不合适的回复作为previous_reply传入
)
await self._send_reply()
elif action == "rethink_goal":
self.state = ConversationState.RETHINKING
self.current_goal, self.current_method, self.goal_reasoning = await self.goal_analyzer.analyze_goal()
elif action == "judge_conversation":
self.state = ConversationState.JUDGING
self.goal_achieved, self.stop_conversation, self.reason = await self.goal_analyzer.analyze_conversation(
self.current_goal, self.goal_reasoning
)
# 如果当前目标达成但还有其他目标
if self.goal_achieved and not self.stop_conversation:
alternative_goals = await self.goal_analyzer.get_alternative_goals()
if alternative_goals:
# 切换到下一个目标
self.current_goal, self.current_method, self.goal_reasoning = alternative_goals[0]
logger.info(f"当前目标已达成,切换到新目标: {self.current_goal}")
return
if self.stop_conversation:
await self._stop_conversation()
elif action == "listening":
self.state = ConversationState.LISTENING
logger.info("倾听对方发言...")
if await self.waiter.wait(): # 如果返回True表示超时
await self._send_timeout_message()
await self._stop_conversation()
else: # wait
self.state = ConversationState.WAITING
logger.info("等待更多信息...")
if await self.waiter.wait(): # 如果返回True表示超时
await self._send_timeout_message()
await self._stop_conversation()
async def _stop_conversation(self):
"""完全停止对话"""
logger.info("停止对话")
self.should_continue = False
self.state = ConversationState.ENDED
# 删除实例这会同时停止chat_observer
await self.remove_instance(self.stream_id)
async def _send_timeout_message(self):
"""发送超时结束消息"""
try:
messages = self.chat_observer.get_message_history(limit=1)
if not messages:
return
latest_message = self._convert_to_message(messages[0])
await self.direct_sender.send_message(
chat_stream=self.chat_stream,
content="抱歉,由于等待时间过长,我需要先去忙别的了。下次再聊吧~",
reply_to_message=latest_message,
)
except Exception as e:
logger.error(f"发送超时消息失败: {str(e)}")
async def _send_reply(self):
"""发送回复"""
if not self.generated_reply:
logger.warning("没有生成回复")
return
messages = self.chat_observer.get_message_history(limit=1)
if not messages:
logger.warning("没有最近的消息可以回复")
return
latest_message = self._convert_to_message(messages[0])
try:
await self.direct_sender.send_message(
chat_stream=self.chat_stream, content=self.generated_reply, reply_to_message=latest_message
)
self.chat_observer.trigger_update() # 触发立即更新
if not await self.chat_observer.wait_for_update():
logger.warning("等待消息更新超时")
self.state = ConversationState.ANALYZING
except Exception as e:
logger.error(f"发送消息失败: {str(e)}")
self.state = ConversationState.ANALYZING
class DirectMessageSender: class DirectMessageSender:

View File

@@ -0,0 +1,97 @@
from typing import Dict, Optional
from src.common.logger import get_module_logger
from .conversation import Conversation
import traceback
logger = get_module_logger("pfc_manager")
class PFCManager:
"""PFC对话管理器负责管理所有对话实例"""
# 单例模式
_instance = None
# 会话实例管理
_instances: Dict[str, Conversation] = {}
_initializing: Dict[str, bool] = {}
@classmethod
def get_instance(cls) -> 'PFCManager':
"""获取管理器单例
Returns:
PFCManager: 管理器实例
"""
if cls._instance is None:
cls._instance = PFCManager()
return cls._instance
async def get_or_create_conversation(self, stream_id: str) -> Optional[Conversation]:
"""获取或创建对话实例
Args:
stream_id: 聊天流ID
Returns:
Optional[Conversation]: 对话实例创建失败则返回None
"""
# 检查是否已经有实例
if stream_id in self._initializing and self._initializing[stream_id]:
logger.debug(f"会话实例正在初始化中: {stream_id}")
return None
if stream_id in self._instances:
logger.debug(f"使用现有会话实例: {stream_id}")
return self._instances[stream_id]
try:
# 创建新实例
logger.info(f"创建新的对话实例: {stream_id}")
self._initializing[stream_id] = True
# 创建实例
conversation_instance = Conversation(stream_id)
self._instances[stream_id] = conversation_instance
# 启动实例初始化
await self._initialize_conversation(conversation_instance)
except Exception as e:
logger.error(f"创建会话实例失败: {stream_id}, 错误: {e}")
return None
return conversation_instance
async def _initialize_conversation(self, conversation: Conversation):
"""初始化会话实例
Args:
conversation: 要初始化的会话实例
"""
stream_id = conversation.stream_id
try:
logger.info(f"开始初始化会话实例: {stream_id}")
# 启动初始化流程
await conversation._initialize()
# 标记初始化完成
self._initializing[stream_id] = False
logger.info(f"会话实例 {stream_id} 初始化完成")
except Exception as e:
logger.error(f"管理器初始化会话实例失败: {stream_id}, 错误: {e}")
logger.error(traceback.format_exc())
# 清理失败的初始化
async def get_conversation(self, stream_id: str) -> Optional[Conversation]:
"""获取已存在的会话实例
Args:
stream_id: 聊天流ID
Returns:
Optional[Conversation]: 会话实例不存在则返回None
"""
return self._instances.get(stream_id)

View File

@@ -0,0 +1,21 @@
from enum import Enum
from typing import Literal
class ConversationState(Enum):
"""对话状态"""
INIT = "初始化"
RETHINKING = "重新思考"
ANALYZING = "分析历史"
PLANNING = "规划目标"
GENERATING = "生成回复"
CHECKING = "检查回复"
SENDING = "发送消息"
FETCHING = "获取知识"
WAITING = "等待"
LISTENING = "倾听"
ENDED = "结束"
JUDGING = "判断"
ActionType = Literal["direct_reply", "fetch_knowledge", "wait"]

View File

@@ -33,7 +33,7 @@ class ReplyChecker:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划) Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
""" """
# 获取最新的消息记录 # 获取最新的消息记录
messages = self.chat_observer.get_message_history(limit=5) messages = self.chat_observer.get_cached_messages(limit=5)
chat_history_text = "" chat_history_text = ""
for msg in messages: for msg in messages:
time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S") time_str = datetime.datetime.fromtimestamp(msg["time"]).strftime("%H:%M:%S")

View File

@@ -0,0 +1,126 @@
from typing import Tuple
from src.common.logger import get_module_logger
from ..models.utils_model import LLM_request
from ..config.config import global_config
from .chat_observer import ChatObserver
from .reply_checker import ReplyChecker
from src.individuality.individuality import Individuality
from .observation_info import ObservationInfo
from .conversation_info import ConversationInfo
logger = get_module_logger("reply_generator")
class ReplyGenerator:
"""回复生成器"""
def __init__(self, stream_id: str):
self.llm = LLM_request(
model=global_config.llm_normal,
temperature=0.7,
max_tokens=300,
request_type="reply_generation"
)
self.personality_info = Individuality.get_instance().get_prompt(type = "personality", x_person = 2, level = 2)
self.name = global_config.BOT_NICKNAME
self.chat_observer = ChatObserver.get_instance(stream_id)
self.reply_checker = ReplyChecker(stream_id)
async def generate(
self,
observation_info: ObservationInfo,
conversation_info: ConversationInfo
) -> str:
"""生成回复
Args:
goal: 对话目标
chat_history: 聊天历史
knowledge_cache: 知识缓存
previous_reply: 上一次生成的回复(如果有)
retry_count: 当前重试次数
Returns:
str: 生成的回复
"""
# 构建提示词
logger.debug(f"开始生成回复:当前目标: {conversation_info.goal_list}")
goal_list = conversation_info.goal_list
goal_text = ""
for goal, reason in goal_list:
goal_text += f"目标:{goal};"
goal_text += f"原因:{reason}\n"
# 获取聊天历史记录
chat_history_list = observation_info.chat_history
chat_history_text = ""
for msg in chat_history_list:
chat_history_text += f"{msg}\n"
# 整理知识缓存
knowledge_text = ""
knowledge_list = conversation_info.knowledge_list
for knowledge in knowledge_list:
knowledge_text += f"知识:{knowledge}\n"
personality_text = f"你的名字是{self.name}{self.personality_info}"
prompt = f"""{personality_text}。现在你在参与一场QQ聊天请根据以下信息生成回复
当前对话目标:{goal_text}
{knowledge_text}
最近的聊天记录:
{chat_history_text}
请根据上述信息,以你的性格特征生成一个自然、得体的回复。回复应该:
1. 符合对话目标,以""的角度发言
2. 体现你的性格特征
3. 自然流畅,像正常聊天一样,简短
4. 适当利用相关知识,但不要生硬引用
请注意把握聊天内容,不要回复的太有条理,可以有个性。请分清""和对方说的话,不要把""说的话当做对方说的话,这是你自己说的话。
请你回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,尽量不要说你说过的话
请你注意不要输出多余内容(包括前后缀,冒号和引号,括号,表情等),只输出回复内容。
不要输出多余内容(包括前后缀冒号和引号括号表情包at或 @等 )。
请直接输出回复内容,不需要任何额外格式。"""
try:
content, _ = await self.llm.generate_response_async(prompt)
logger.info(f"生成的回复: {content}")
# is_new = self.chat_observer.check()
# logger.debug(f"再看一眼聊天记录,{'有' if is_new else '没有'}新消息")
# 如果有新消息,重新生成回复
# if is_new:
# logger.info("检测到新消息,重新生成回复")
# return await self.generate(
# goal, chat_history, knowledge_cache,
# None, retry_count
# )
return content
except Exception as e:
logger.error(f"生成回复时出错: {e}")
return "抱歉,我现在有点混乱,让我重新思考一下..."
async def check_reply(
self,
reply: str,
goal: str,
retry_count: int = 0
) -> Tuple[bool, str, bool]:
"""检查回复是否合适
Args:
reply: 生成的回复
goal: 对话目标
retry_count: 当前重试次数
Returns:
Tuple[bool, str, bool]: (是否合适, 原因, 是否需要重新规划)
"""
return await self.reply_checker.check(reply, goal, retry_count)

45
src/plugins/PFC/waiter.py Normal file
View File

@@ -0,0 +1,45 @@
from src.common.logger import get_module_logger
from .chat_observer import ChatObserver
logger = get_module_logger("waiter")
class Waiter:
"""等待器,用于等待对话流中的事件"""
def __init__(self, stream_id: str):
self.stream_id = stream_id
self.chat_observer = ChatObserver.get_instance(stream_id)
async def wait(self, timeout: float = 20.0) -> bool:
"""等待用户回复或超时
Args:
timeout: 超时时间(秒)
Returns:
bool: 如果因为超时返回则为True否则为False
"""
try:
message_before = self.chat_observer.get_last_message()
# 等待新消息
logger.debug(f"等待新消息,超时时间: {timeout}")
is_timeout = await self.chat_observer.wait_for_update(timeout=timeout)
if is_timeout:
logger.debug("等待超时,没有收到新消息")
return True
# 检查是否是新消息
message_after = self.chat_observer.get_last_message()
if message_before and message_after and message_before.get("message_id") == message_after.get("message_id"):
# 如果消息ID相同说明没有新消息
logger.debug("没有收到新消息")
return True
logger.debug("收到新消息")
return False
except Exception as e:
logger.error(f"等待时出错: {str(e)}")
return True

View File

@@ -1,14 +1,13 @@
from ..moods.moods import MoodManager # 导入情绪管理器 from ..moods.moods import MoodManager # 导入情绪管理器
from ..config.config import global_config from ..config.config import global_config
from .message import MessageRecv from .message import MessageRecv
from ..PFC.pfc import Conversation, ConversationState from ..PFC.pfc_manager import PFCManager
from .chat_stream import chat_manager from .chat_stream import chat_manager
from ..chat_module.only_process.only_message_process import MessageProcessor from ..chat_module.only_process.only_message_process import MessageProcessor
from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig
from ..chat_module.think_flow_chat.think_flow_chat import ThinkFlowChat from ..chat_module.think_flow_chat.think_flow_chat import ThinkFlowChat
from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat
import asyncio
import traceback import traceback
# 定义日志配置 # 定义日志配置
@@ -32,9 +31,14 @@ class ChatBot:
self.reasoning_chat = ReasoningChat() self.reasoning_chat = ReasoningChat()
self.only_process_chat = MessageProcessor() self.only_process_chat = MessageProcessor()
# 创建初始化PFC管理器的任务会在_ensure_started时执行
self.pfc_manager = PFCManager.get_instance()
async def _ensure_started(self): async def _ensure_started(self):
"""确保所有任务已启动""" """确保所有任务已启动"""
if not self._started: if not self._started:
logger.info("确保ChatBot所有任务已启动")
self._started = True self._started = True
async def _create_PFC_chat(self, message: MessageRecv): async def _create_PFC_chat(self, message: MessageRecv):
@@ -42,27 +46,11 @@ class ChatBot:
chat_id = str(message.chat_stream.stream_id) chat_id = str(message.chat_stream.stream_id)
if global_config.enable_pfc_chatting: if global_config.enable_pfc_chatting:
# 获取或创建对话实例
conversation = await Conversation.get_instance(chat_id)
if conversation is None:
logger.error(f"创建或获取对话实例失败: {chat_id}")
return
# 如果是新创建的实例,启动对话系统 await self.pfc_manager.get_or_create_conversation(chat_id)
if conversation.state == ConversationState.INIT:
asyncio.create_task(conversation.start())
logger.info(f"为聊天 {chat_id} 创建新的对话实例")
elif conversation.state == ConversationState.ENDED:
# 如果实例已经结束,重新创建
await Conversation.remove_instance(chat_id)
conversation = await Conversation.get_instance(chat_id)
if conversation is None:
logger.error(f"重新创建对话实例失败: {chat_id}")
return
asyncio.create_task(conversation.start())
logger.info(f"为聊天 {chat_id} 重新创建对话实例")
except Exception as e: except Exception as e:
logger.error(f"创建PFC聊天失败: {e}") logger.error(f"创建PFC聊天失败: {e}")
async def message_process(self, message_data: str) -> None: async def message_process(self, message_data: str) -> None:
"""处理转化后的统一格式消息 """处理转化后的统一格式消息
@@ -90,6 +78,9 @@ class ChatBot:
- 性能计时 - 性能计时
""" """
try: try:
# 确保所有任务已启动
await self._ensure_started()
message = MessageRecv(message_data) message = MessageRecv(message_data)
groupinfo = message.message_info.group_info groupinfo = message.message_info.group_info
userinfo = message.message_info.user_info userinfo = message.message_info.user_info

View File

@@ -24,10 +24,11 @@ config_config = LogConfig(
# 配置主程序日志格式 # 配置主程序日志格式
logger = get_module_logger("config", config=config_config) logger = get_module_logger("config", config=config_config)
# 考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 #考虑到实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码
is_test = False is_test = True
mai_version_main = "0.6.1" mai_version_main = "0.6.2"
mai_version_fix = "" mai_version_fix = "snapshot-1"
if mai_version_fix: if mai_version_fix:
if is_test: if is_test:
mai_version = f"test-{mai_version_main}-{mai_version_fix}" mai_version = f"test-{mai_version_main}-{mai_version_fix}"
@@ -454,6 +455,7 @@ class BotConfig:
config.emoji_response_penalty = willing_config.get( config.emoji_response_penalty = willing_config.get(
"emoji_response_penalty", config.emoji_response_penalty "emoji_response_penalty", config.emoji_response_penalty
) )
if config.INNER_VERSION in SpecifierSet(">=1.2.5"):
config.mentioned_bot_inevitable_reply = willing_config.get( config.mentioned_bot_inevitable_reply = willing_config.get(
"mentioned_bot_inevitable_reply", config.mentioned_bot_inevitable_reply "mentioned_bot_inevitable_reply", config.mentioned_bot_inevitable_reply
) )

View File

@@ -2,7 +2,7 @@ import threading
import time import time
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict from typing import Any, Dict, List
from src.common.logger import get_module_logger from src.common.logger import get_module_logger
from ...common.database import db from ...common.database import db
@@ -22,6 +22,7 @@ class LLMStatistics:
self.stats_thread = None self.stats_thread = None
self.console_thread = None self.console_thread = None
self._init_database() self._init_database()
self.name_dict: Dict[List] = {}
def _init_database(self): def _init_database(self):
"""初始化数据库集合""" """初始化数据库集合"""
@@ -137,16 +138,24 @@ class LLMStatistics:
# user_id = str(doc.get("user_info", {}).get("user_id", "unknown")) # user_id = str(doc.get("user_info", {}).get("user_id", "unknown"))
chat_info = doc.get("chat_info", {}) chat_info = doc.get("chat_info", {})
user_info = doc.get("user_info", {}) user_info = doc.get("user_info", {})
message_time = doc.get("time", 0)
group_info = chat_info.get("group_info") if chat_info else {} group_info = chat_info.get("group_info") if chat_info else {}
# print(f"group_info: {group_info}") # print(f"group_info: {group_info}")
group_name = None group_name = None
if group_info: if group_info:
group_id = f"g{group_info.get('group_id')}"
group_name = group_info.get("group_name", f"{group_info.get('group_id')}") group_name = group_info.get("group_name", f"{group_info.get('group_id')}")
if user_info and not group_name: if user_info and not group_name:
group_id = f"u{user_info['user_id']}"
group_name = user_info["user_nickname"] group_name = user_info["user_nickname"]
if self.name_dict.get(group_id):
if message_time > self.name_dict.get(group_id)[1]:
self.name_dict[group_id] = [group_name, message_time]
else:
self.name_dict[group_id] = [group_name, message_time]
# print(f"group_name: {group_name}") # print(f"group_name: {group_name}")
stats["messages_by_user"][user_id] += 1 stats["messages_by_user"][user_id] += 1
stats["messages_by_chat"][group_name] += 1 stats["messages_by_chat"][group_id] += 1
return stats return stats
@@ -187,7 +196,7 @@ class LLMStatistics:
tokens = stats["tokens_by_model"][model_name] tokens = stats["tokens_by_model"][model_name]
cost = stats["costs_by_model"][model_name] cost = stats["costs_by_model"][model_name]
output.append( output.append(
data_fmt.format(model_name[:32] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) data_fmt.format(model_name[:30] + ".." if len(model_name) > 32 else model_name, count, tokens, cost)
) )
output.append("") output.append("")
@@ -221,8 +230,8 @@ class LLMStatistics:
# 添加聊天统计 # 添加聊天统计
output.append("群组统计:") output.append("群组统计:")
output.append(("群组名称 消息数量")) output.append(("群组名称 消息数量"))
for group_name, count in sorted(stats["messages_by_chat"].items()): for group_id, count in sorted(stats["messages_by_chat"].items()):
output.append(f"{group_name[:32]:<32} {count:>10}") output.append(f"{self.name_dict[group_id][0][:32]:<32} {count:>10}")
return "\n".join(output) return "\n".join(output)
@@ -250,7 +259,7 @@ class LLMStatistics:
tokens = stats["tokens_by_model"][model_name] tokens = stats["tokens_by_model"][model_name]
cost = stats["costs_by_model"][model_name] cost = stats["costs_by_model"][model_name]
output.append( output.append(
data_fmt.format(model_name[:32] + ".." if len(model_name) > 32 else model_name, count, tokens, cost) data_fmt.format(model_name[:30] + ".." if len(model_name) > 32 else model_name, count, tokens, cost)
) )
output.append("") output.append("")
@@ -284,8 +293,8 @@ class LLMStatistics:
# 添加聊天统计 # 添加聊天统计
output.append("群组统计:") output.append("群组统计:")
output.append(("群组名称 消息数量")) output.append(("群组名称 消息数量"))
for group_name, count in sorted(stats["messages_by_chat"].items()): for group_id, count in sorted(stats["messages_by_chat"].items()):
output.append(f"{group_name[:32]:<32} {count:>10}") output.append(f"{self.name_dict[group_id][0][:32]:<32} {count:>10}")
return "\n".join(output) return "\n".join(output)

View File

@@ -158,12 +158,12 @@ class WillingManager:
logger.debug(f"被提及, 当前意愿: {current_willing}") logger.debug(f"被提及, 当前意愿: {current_willing}")
if is_emoji: if is_emoji:
current_willing *= 0.1 current_willing = global_config.emoji_response_penalty * 0.1
logger.debug(f"表情包, 当前意愿: {current_willing}") logger.debug(f"表情包, 当前意愿: {current_willing}")
# 根据话题兴趣度适当调整 # 根据话题兴趣度适当调整
if interested_rate > 0.5: if interested_rate > 0.5:
current_willing += (interested_rate - 0.5) * 0.5 current_willing += (interested_rate - 0.5) * 0.5 * global_config.response_interested_rate_amplifier
# 根据当前模式计算回复概率 # 根据当前模式计算回复概率
base_probability = 0.0 base_probability = 0.0
@@ -180,7 +180,7 @@ class WillingManager:
base_probability = 0.30 if msg_count >= 15 else 0.03 * min(msg_count, 10) base_probability = 0.30 if msg_count >= 15 else 0.03 * min(msg_count, 10)
# 考虑回复意愿的影响 # 考虑回复意愿的影响
reply_probability = base_probability * current_willing reply_probability = base_probability * current_willing * global_config.response_willing_amplifier
# 检查群组权限(如果是群聊) # 检查群组权限(如果是群聊)
if chat_stream.group_info and config: if chat_stream.group_info and config:

View File

@@ -1,5 +1,5 @@
[inner] [inner]
version = "1.2.4" version = "1.2.5"
#以下是给开发人员阅读的,一般用户不需要阅读 #以下是给开发人员阅读的,一般用户不需要阅读