feat:移除watching,增加回复生成缓冲,添加头部动作

This commit is contained in:
SengokuCola
2025-07-17 22:25:33 +08:00
parent 8768b5d31b
commit 9be97acb00
14 changed files with 497 additions and 512 deletions

View File

@@ -1,5 +1,5 @@
[inner]
version = "1.0.1"
version = "1.1.0"
#----以下是S4U聊天系统配置文件----
# S4U (Smart 4 U) 聊天系统是MaiBot的核心对话模块
@@ -13,8 +13,8 @@ version = "1.0.1"
[s4u]
# 消息管理配置
message_timeout_seconds = 120 # 普通消息存活时间(秒),超过此时间的消息将被丢弃
recent_message_keep_count = 6 # 保留最近N条消息超出范围的普通消息将被移除
message_timeout_seconds = 80 # 普通消息存活时间(秒),超过此时间的消息将被丢弃
recent_message_keep_count = 8 # 保留最近N条消息超出范围的普通消息将被移除
# 优先级系统配置
at_bot_priority_bonus = 100.0 # @机器人时的优先级加成分数
@@ -34,7 +34,99 @@ max_typing_delay = 2.0 # 最大打字延迟(秒)
enable_old_message_cleanup = true # 是否自动清理过旧的普通消息
enable_loading_indicator = true # 是否显示加载提示
enable_streaming_output = true # 是否启用流式输出false时全部生成后一次性发送
enable_streaming_output = false # 是否启用流式输出false时全部生成后一次性发送
max_context_message_length = 20
max_core_message_length = 30
max_context_message_length = 30
max_core_message_length = 20
# 模型配置
[models]
# 主要对话模型配置
[models.chat]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
enable_thinking = false
# 规划模型配置
[models.motion]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
enable_thinking = false
# 情感分析模型配置
[models.emotion]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 记忆模型配置
[models.memory]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 工具使用模型配置
[models.tool_use]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 嵌入模型配置
[models.embedding]
name = "text-embedding-v1"
provider = "OPENAI"
dimension = 1024
# 视觉语言模型配置
[models.vlm]
name = "qwen-vl-plus"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 知识库模型配置
[models.knowledge]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 实体提取模型配置
[models.entity_extract]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 问答模型配置
[models.qa]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
# 兼容性配置已废弃请使用models.motion
[model_motion] # 在麦麦的一些组件中使用的小模型,消耗量较大,建议使用速度较快的小模型
# 强烈建议使用免费的小模型
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
enable_thinking = false # 是否启用思考

View File

@@ -1,5 +1,5 @@
[inner]
version = "1.0.1"
version = "1.1.0"
#----以下是S4U聊天系统配置文件----
# S4U (Smart 4 U) 聊天系统是MaiBot的核心对话模块
@@ -32,9 +32,36 @@ max_typing_delay = 2.0 # 最大打字延迟(秒)
# 系统功能开关
enable_old_message_cleanup = true # 是否自动清理过旧的普通消息
enable_loading_indicator = true # 是否显示加载提示
enable_streaming_output = true # 是否启用流式输出false时全部生成后一次性发送
max_context_message_length = 20
max_core_message_length = 30
# 模型配置
[models]
# 主要对话模型配置
[models.chat]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
enable_thinking = false
# 规划模型配置
[models.motion]
name = "qwen3-32b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7
enable_thinking = false
# 情感分析模型配置
[models.emotion]
name = "qwen3-8b"
provider = "BAILIAN"
pri_in = 0.5
pri_out = 2
temp = 0.7

View File

@@ -1,134 +0,0 @@
# SuperChat管理器使用说明
## 概述
SuperChat管理器是用于管理和跟踪超级弹幕消息的核心组件。它能够根据SuperChat的金额自动设置不同的存活时间并提供多种格式的字符串构建功能。
## 主要功能
### 1. 自动记录SuperChat
当收到SuperChat消息时管理器会自动记录以下信息
- 用户ID和昵称
- 平台信息
- 聊天ID
- SuperChat金额和消息内容
- 时间戳和过期时间
- 群组名称(如果适用)
### 2. 基于金额的存活时间
SuperChat的存活时间根据金额阶梯设置
| 金额范围 | 存活时间 |
|---------|---------|
| ≥500元 | 4小时 |
| 200-499元 | 2小时 |
| 100-199元 | 1小时 |
| 50-99元 | 30分钟 |
| 20-49元 | 15分钟 |
| 10-19元 | 10分钟 |
| <10元 | 5分钟 |
### 3. 自动清理
管理器每30秒自动检查并清理过期的SuperChat记录保持内存使用的高效性
## 使用方法
### 基本用法
```python
from src.mais4u.mais4u_chat.super_chat_manager import get_super_chat_manager
# 获取全局管理器实例
super_chat_manager = get_super_chat_manager()
# 添加SuperChat通常在消息处理时自动调用
await super_chat_manager.add_superchat(message)
# 获取指定聊天的SuperChat显示字符串
display_string = super_chat_manager.build_superchat_display_string(chat_id, max_count=10)
# 获取摘要信息
summary = super_chat_manager.build_superchat_summary_string(chat_id)
# 获取统计信息
stats = super_chat_manager.get_superchat_statistics(chat_id)
```
### 结合S4UChat使用
```python
from src.mais4u.mais4u_chat.s4u_chat import get_s4u_chat_manager
# 获取S4UChat实例
s4u_manager = get_s4u_chat_manager()
s4u_chat = s4u_manager.get_or_create_chat(chat_stream)
# 便捷方法获取SuperChat信息
display_string = s4u_chat.get_superchat_display_string(max_count=10)
summary = s4u_chat.get_superchat_summary_string()
stats = s4u_chat.get_superchat_statistics()
```
## API 参考
### SuperChatManager类
#### 主要方法
- `add_superchat(message: MessageRecvS4U)`: 添加SuperChat记录
- `get_superchats_by_chat(chat_id: str)`: 获取指定聊天的有效SuperChat列表
- `build_superchat_display_string(chat_id: str, max_count: int = 10)`: 构建显示字符串
- `build_superchat_summary_string(chat_id: str)`: 构建摘要字符串
- `get_superchat_statistics(chat_id: str)`: 获取统计信息
#### 输出格式示例
**显示字符串格式:**
```
📢 当前有效超级弹幕:
1. 【100元】用户名: 消息内容 (剩余25分30秒)
2. 【50元】用户名: 消息内容 (剩余10分15秒)
... 还有3条SuperChat
```
**摘要字符串格式:**
```
当前有5条超级弹幕总金额350元最高单笔100元
```
**统计信息格式:**
```python
{
"count": 5,
"total_amount": 350.0,
"average_amount": 70.0,
"highest_amount": 100.0,
"lowest_amount": 20.0
}
```
### S4UChat扩展方法
- `get_superchat_display_string(max_count: int = 10)`: 获取当前聊天的SuperChat显示字符串
- `get_superchat_summary_string()`: 获取当前聊天的SuperChat摘要字符串
- `get_superchat_statistics()`: 获取当前聊天的SuperChat统计信息
## 集成说明
SuperChat管理器已经集成到S4U聊天系统中
1. **自动处理**: 当S4UChat收到SuperChat消息时会自动调用管理器记录
2. **内存管理**: 管理器会自动清理过期的SuperChat无需手动管理
3. **全局单例**: 使用全局单例模式确保所有聊天共享同一个管理器实例
## 注意事项
1. SuperChat管理器是全局单例在应用程序整个生命周期中保持运行
2. 过期时间基于消息金额自动计算无需手动设置
3. 管理器会自动处理异常情况如无效的价格格式等
4. 清理任务在后台异步运行不会阻塞主要功能
## 示例文件
参考 `superchat_example.py` 文件查看完整的使用示例

View File

@@ -1,6 +1,6 @@
import json
import time
import random
from src.chat.message_receive.message import MessageRecv
from src.llm_models.utils_model import LLMRequest
from src.common.logger import get_logger
@@ -8,10 +8,35 @@ from src.chat.utils.chat_message_builder import build_readable_messages, get_raw
from src.config.config import global_config
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.plugin_system.apis import send_api
from json_repair import repair_json
from src.mais4u.s4u_config import s4u_config
logger = get_logger("action")
HEAD_CODE = {
"看向上方": "(0,0.5,0)",
"看向下方": "(0,-0.5,0)",
"看向左边": "(-1,0,0)",
"看向右边": "(1,0,0)",
"随意朝向": "random",
"看向摄像机": "camera",
"注视对方": "(0,0,0)",
"看向正前方": "(0,0,0)",
}
BODY_CODE = {
"双手背后向前弯腰": "010_0070",
"歪头双手合十": "010_0100",
"标准文静站立": "010_0101",
"双手交叠腹部站立": "010_0150",
"帅气的姿势": "010_0190",
"另一个帅气的姿势": "010_0191",
"手掌朝前可爱": "010_0210",
"平静,双手后放":"平静,双手后放",
"思考": "思考"
}
def init_prompt():
Prompt(
@@ -21,16 +46,15 @@ def init_prompt():
{indentify_block}
你现在的动作状态是:
- 手部:{hand_action}
- 上半身:{upper_body_action}
- 头部:{head_action}
- 身体动作:{body_action}
现在,因为你发送了消息,或者群里其他人发送了消息,引起了你的注意,你对其进行了阅读和思考,请你更新你的动作状态。
请只按照以下json格式输出描述你新的动作状态每个动作一到三个中文词确保每个字段都存在
身体动作可选
{all_actions}
请只按照以下json格式输出描述你新的动作状态确保每个字段都存在
{{
"hand_action": "...",
"upper_body_action": "...",
"head_action": "..."
"body_action": "..."
}}
""",
"change_action_prompt",
@@ -41,17 +65,16 @@ def init_prompt():
以上是群里最近的聊天记录
{indentify_block}
你之前的动作状态是
- 手部:{hand_action}
- 上半身:{upper_body_action}
- 头部:{head_action}
你之前的动作状态是
- 身体动作:{body_action}
身体动作可选:
{all_actions}
距离你上次关注群里消息已经过去了一段时间,你冷静了下来,你的动作会趋于平缓或静止,请你输出你现在新的动作状态,用中文。
请只按照以下json格式输出描述你新的动作状态每个动作一到三个词,确保每个字段都存在:
请只按照以下json格式输出描述你新的动作状态确保每个字段都存在
{{
"hand_action": "...",
"upper_body_action": "...",
"head_action": "..."
"body_action": "..."
}}
""",
"regress_action_prompt",
@@ -62,19 +85,38 @@ class ChatAction:
def __init__(self, chat_id: str):
self.chat_id: str = chat_id
self.hand_action: str = "双手放在桌面"
self.upper_body_action: str = "坐着"
self.body_action: str = "坐着"
self.head_action: str = "注视摄像机"
self.regression_count: int = 0
# 新增body_action冷却池key为动作名value为剩余冷却次数
self.body_action_cooldown: dict[str, int] = {}
print(s4u_config.models.motion)
print(global_config.model.emotion)
self.action_model = LLMRequest(
model=global_config.model.emotion,
temperature=0.7,
request_type="action",
request_type="motion",
)
self.last_change_time = 0
async def send_action_update(self):
"""发送动作更新到前端"""
body_code = BODY_CODE.get(self.body_action, "")
await send_api.custom_to_stream(
message_type="body_action",
content=body_code,
stream_id=self.chat_id,
storage_message=False,
show_log=True,
)
async def update_action_by_message(self, message: MessageRecv):
self.regression_count = 0
@@ -105,28 +147,42 @@ class ChatAction:
prompt_personality = global_config.personality.personality_core
indentify_block = f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}"
prompt = await global_prompt_manager.format_prompt(
"change_action_prompt",
chat_talking_prompt=chat_talking_prompt,
indentify_block=indentify_block,
hand_action=self.hand_action,
upper_body_action=self.upper_body_action,
head_action=self.head_action,
)
try:
# 冷却池处理:过滤掉冷却中的动作
self._update_body_action_cooldown()
available_actions = [k for k in BODY_CODE.keys() if k not in self.body_action_cooldown]
all_actions = "\n".join(available_actions)
logger.info(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await self.action_model.generate_response_async(prompt=prompt)
logger.info(f"response: {response}")
logger.info(f"reasoning_content: {reasoning_content}")
prompt = await global_prompt_manager.format_prompt(
"change_action_prompt",
chat_talking_prompt=chat_talking_prompt,
indentify_block=indentify_block,
body_action=self.body_action,
all_actions=all_actions,
)
action_data = json.loads(repair_json(response))
logger.info(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await self.action_model.generate_response_async(prompt=prompt)
logger.info(f"response: {response}")
logger.info(f"reasoning_content: {reasoning_content}")
if action_data:
self.hand_action = action_data.get("hand_action", self.hand_action)
self.upper_body_action = action_data.get("upper_body_action", self.upper_body_action)
self.head_action = action_data.get("head_action", self.head_action)
action_data = json.loads(repair_json(response))
self.last_change_time = message_time
if action_data:
# 记录原动作,切换后进入冷却
prev_body_action = self.body_action
new_body_action = action_data.get("body_action", self.body_action)
if new_body_action != prev_body_action:
if prev_body_action:
self.body_action_cooldown[prev_body_action] = 3
self.body_action = new_body_action
self.head_action = action_data.get("head_action", self.head_action)
# 发送动作更新
await self.send_action_update()
self.last_change_time = message_time
except Exception as e:
logger.error(f"update_action_by_message error: {e}")
async def regress_action(self):
message_time = time.time()
@@ -134,7 +190,7 @@ class ChatAction:
chat_id=self.chat_id,
timestamp_start=self.last_change_time,
timestamp_end=message_time,
limit=15,
limit=10,
limit_mode="last",
)
chat_talking_prompt = build_readable_messages(
@@ -155,33 +211,56 @@ class ChatAction:
prompt_personality = global_config.personality.personality_core
indentify_block = f"你的名字是{bot_name}{bot_nickname},你{prompt_personality}"
try:
prompt = await global_prompt_manager.format_prompt(
"regress_action_prompt",
chat_talking_prompt=chat_talking_prompt,
indentify_block=indentify_block,
hand_action=self.hand_action,
upper_body_action=self.upper_body_action,
head_action=self.head_action,
)
# 冷却池处理:过滤掉冷却中的动作
self._update_body_action_cooldown()
available_actions = [k for k in BODY_CODE.keys() if k not in self.body_action_cooldown]
all_actions = "\n".join(available_actions)
logger.info(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await self.action_model.generate_response_async(prompt=prompt)
logger.info(f"response: {response}")
logger.info(f"reasoning_content: {reasoning_content}")
prompt = await global_prompt_manager.format_prompt(
"regress_action_prompt",
chat_talking_prompt=chat_talking_prompt,
indentify_block=indentify_block,
body_action=self.body_action,
all_actions=all_actions,
)
action_data = json.loads(repair_json(response))
if action_data:
self.hand_action = action_data.get("hand_action", self.hand_action)
self.upper_body_action = action_data.get("upper_body_action", self.upper_body_action)
self.head_action = action_data.get("head_action", self.head_action)
logger.info(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await self.action_model.generate_response_async(prompt=prompt)
logger.info(f"response: {response}")
logger.info(f"reasoning_content: {reasoning_content}")
self.regression_count += 1
action_data = json.loads(repair_json(response))
if action_data:
prev_body_action = self.body_action
new_body_action = action_data.get("body_action", self.body_action)
if new_body_action != prev_body_action:
if prev_body_action:
self.body_action_cooldown[prev_body_action] = 6
self.body_action = new_body_action
# 发送动作更新
await self.send_action_update()
self.regression_count += 1
self.last_change_time = message_time
except Exception as e:
logger.error(f"regress_action error: {e}")
# 新增:冷却池维护方法
def _update_body_action_cooldown(self):
remove_keys = []
for k in self.body_action_cooldown:
self.body_action_cooldown[k] -= 1
if self.body_action_cooldown[k] <= 0:
remove_keys.append(k)
for k in remove_keys:
del self.body_action_cooldown[k]
class ActionRegressionTask(AsyncTask):
def __init__(self, action_manager: "ActionManager"):
super().__init__(task_name="ActionRegressionTask", run_interval=30)
super().__init__(task_name="ActionRegressionTask", run_interval=3)
self.action_manager = action_manager
async def run(self):
@@ -191,7 +270,7 @@ class ActionRegressionTask(AsyncTask):
if action_state.last_change_time == 0:
continue
if now - action_state.last_change_time > 180:
if now - action_state.last_change_time > 10:
if action_state.regression_count >= 3:
continue
@@ -225,15 +304,8 @@ class ActionManager:
self.action_state_list.append(new_action_state)
return new_action_state
def reset_action_state_by_chat_id(self, chat_id: str):
for action_state in self.action_state_list:
if action_state.chat_id == chat_id:
action_state.hand_action = "双手放在桌面"
action_state.upper_body_action = "坐着"
action_state.head_action = "注视摄像机"
action_state.regression_count = 0
return
self.action_state_list.append(ChatAction(chat_id))
init_prompt()

View File

@@ -1,22 +0,0 @@
from src.plugin_system.apis import send_api
async def send_loading(chat_id: str, content: str):
await send_api.custom_to_stream(
message_type="loading",
content=content,
stream_id=chat_id,
storage_message=False,
show_log=True,
)
async def send_unloading(chat_id: str):
await send_api.custom_to_stream(
message_type="loading",
content="",
stream_id=chat_id,
storage_message=False,
show_log=True,
)

View File

@@ -1,110 +0,0 @@
import time
import heapq
import math
import json
from typing import List, Optional
from src.common.logger import get_logger
logger = get_logger("normal_chat")
class PrioritizedMessage:
"""带有优先级的消息对象"""
def __init__(self, message_data: dict, interest_scores: List[float], is_vip: bool = False):
self.message_data = message_data
self.arrival_time = time.time()
self.interest_scores = interest_scores
self.is_vip = is_vip
self.priority = self.calculate_priority()
def calculate_priority(self, decay_rate: float = 0.01) -> float:
"""
计算优先级分数。
优先级 = 兴趣分 * exp(-衰减率 * 消息年龄)
"""
age = time.time() - self.arrival_time
decay_factor = math.exp(-decay_rate * age)
return sum(self.interest_scores) + decay_factor
def __lt__(self, other: "PrioritizedMessage") -> bool:
"""用于堆排序的比较函数,我们想要一个最大堆,所以用 >"""
return self.priority > other.priority
class PriorityManager:
"""
管理消息队列,根据优先级选择消息进行处理。
"""
def __init__(self, normal_queue_max_size: int = 5):
self.vip_queue: List[PrioritizedMessage] = [] # VIP 消息队列 (最大堆)
self.normal_queue: List[PrioritizedMessage] = [] # 普通消息队列 (最大堆)
self.normal_queue_max_size = normal_queue_max_size
def add_message(self, message_data: dict, interest_score: float = 0):
"""
添加新消息到合适的队列中。
"""
user_id = message_data.get("user_id")
priority_info_raw = message_data.get("priority_info")
priority_info = {}
if isinstance(priority_info_raw, str):
priority_info = json.loads(priority_info_raw)
elif isinstance(priority_info_raw, dict):
priority_info = priority_info_raw
is_vip = priority_info.get("message_type") == "vip"
message_priority = priority_info.get("message_priority", 0.0)
p_message = PrioritizedMessage(message_data, [interest_score, message_priority], is_vip)
if is_vip:
heapq.heappush(self.vip_queue, p_message)
logger.debug(f"消息来自VIP用户 {user_id}, 已添加到VIP队列. 当前VIP队列长度: {len(self.vip_queue)}")
else:
if len(self.normal_queue) >= self.normal_queue_max_size:
# 如果队列已满,只在消息优先级高于最低优先级消息时才添加
if p_message.priority > self.normal_queue[0].priority:
heapq.heapreplace(self.normal_queue, p_message)
logger.debug(f"普通队列已满,但新消息优先级更高,已替换. 用户: {user_id}")
else:
logger.debug(f"普通队列已满且新消息优先级较低,已忽略. 用户: {user_id}")
else:
heapq.heappush(self.normal_queue, p_message)
logger.debug(
f"消息来自普通用户 {user_id}, 已添加到普通队列. 当前普通队列长度: {len(self.normal_queue)}"
)
def get_highest_priority_message(self) -> Optional[dict]:
"""
从VIP和普通队列中获取当前最高优先级的消息。
"""
# 更新所有消息的优先级
for p_msg in self.vip_queue:
p_msg.priority = p_msg.calculate_priority()
for p_msg in self.normal_queue:
p_msg.priority = p_msg.calculate_priority()
# 重建堆
heapq.heapify(self.vip_queue)
heapq.heapify(self.normal_queue)
vip_msg = self.vip_queue[0] if self.vip_queue else None
normal_msg = self.normal_queue[0] if self.normal_queue else None
if vip_msg:
return heapq.heappop(self.vip_queue).message_data
elif normal_msg:
return heapq.heappop(self.normal_queue).message_data
else:
return None
def is_empty(self) -> bool:
"""检查所有队列是否为空"""
return not self.vip_queue and not self.normal_queue
def get_queue_status(self) -> str:
"""获取队列状态信息"""
return f"VIP队列: {len(self.vip_queue)}, 普通队列: {len(self.normal_queue)}"

View File

@@ -13,11 +13,12 @@ from src.common.message.api import get_global_api
from src.chat.message_receive.storage import MessageStorage
from .s4u_watching_manager import watching_manager
import json
from .s4u_mood_manager import mood_manager
from src.person_info.relationship_builder_manager import relationship_builder_manager
from .loading import send_loading, send_unloading
from src.mais4u.s4u_config import s4u_config
from src.person_info.person_info import PersonInfoManager
from .super_chat_manager import get_super_chat_manager
from .yes_or_no import yes_or_no_head
logger = get_logger("S4U_chat")
@@ -36,6 +37,8 @@ class MessageSenderContainer:
self.msg_id = ""
self.last_msg_id = ""
self.voice_done = ""
@@ -220,7 +223,7 @@ class S4UChat:
return self.interest_dict.get(user_id, 1.0)
def go_processing(self):
if self.voice_done == self.msg_id:
if self.voice_done == self.last_msg_id:
return True
return False
@@ -432,15 +435,12 @@ class S4UChat:
logger.error(f"[{self.stream_name}] Message processor main loop error: {e}", exc_info=True)
await asyncio.sleep(1)
async def delay_change_watching_state(self):
random_delay = random.randint(1, 3)
await asyncio.sleep(random_delay)
chat_watching = watching_manager.get_watching_by_chat_id(self.stream_id)
await chat_watching.on_message_received()
def get_processing_message_id(self):
self.last_msg_id = self.msg_id
self.msg_id = f"{time.time()}_{random.randint(1000, 9999)}"
async def _generate_and_send(self, message: MessageRecv):
"""为单个消息生成文本回复。整个过程可以被中断。"""
self._is_replying = True
@@ -448,20 +448,21 @@ class S4UChat:
self.get_processing_message_id()
if s4u_config.enable_loading_indicator:
await send_loading(self.stream_id, ".........")
# 视线管理:开始生成回复时切换视线状态
chat_watching = watching_manager.get_watching_by_chat_id(self.stream_id)
asyncio.create_task(self.delay_change_watching_state())
await chat_watching.on_reply_start()
sender_container = MessageSenderContainer(self.chat_stream, message)
sender_container.start()
try:
async def generate_and_send_inner():
nonlocal total_chars_sent
logger.info(f"[S4U] 开始为消息生成文本和音频流: '{message.processed_plain_text[:30]}...'")
if s4u_config.enable_streaming_output:
logger.info(f"[S4U] 开始流式输出")
# 流式输出,边生成边发送
gen = self.gpt.generate_response(message, "")
async for chunk in gen:
@@ -469,6 +470,7 @@ class S4UChat:
await sender_container.add_message(chunk)
total_chars_sent += len(chunk)
else:
logger.info(f"[S4U] 开始一次性输出")
# 一次性输出先收集所有chunk
all_chunks = []
gen = self.gpt.generate_response(message, "")
@@ -479,17 +481,36 @@ class S4UChat:
sender_container.msg_id = self.msg_id
await sender_container.add_message("".join(all_chunks))
try:
try:
await asyncio.wait_for(generate_and_send_inner(), timeout=10)
except asyncio.TimeoutError:
logger.warning(f"[{self.stream_name}] 回复生成超时,发送默认回复。")
sender_container.msg_id = self.msg_id
await sender_container.add_message("麦麦不知道哦")
total_chars_sent = len("麦麦不知道哦")
mood = mood_manager.get_mood_by_chat_id(self.stream_id)
await yes_or_no_head(text = total_chars_sent,emotion = mood.mood_state,chat_history=message.processed_plain_text,chat_id=self.stream_id)
# 等待所有文本消息发送完成
await sender_container.close()
await sender_container.join()
await chat_watching.on_thinking_finished()
start_time = time.time()
logged = False
while not self.go_processing():
if time.time() - start_time > 60:
logger.warning(f"[{self.stream_name}] 等待消息发送超时60秒强制跳出循环。")
break
logger.info(f"[{self.stream_name}] 等待消息发送完成...")
await asyncio.sleep(0.3)
if not logged:
logger.info(f"[{self.stream_name}] 等待消息发送完成...")
logged = True
await asyncio.sleep(0.2)
logger.info(f"[{self.stream_name}] 所有文本块处理完毕。")
@@ -503,9 +524,6 @@ class S4UChat:
finally:
self._is_replying = False
if s4u_config.enable_loading_indicator:
await send_unloading(self.stream_id)
# 视线管理:回复结束时切换视线状态
chat_watching = watching_manager.get_watching_by_chat_id(self.stream_id)
await chat_watching.on_reply_finished()
@@ -534,7 +552,3 @@ class S4UChat:
except asyncio.CancelledError:
logger.info(f"处理任务已成功取消: {self.stream_name}")
# 注意SuperChat管理器是全局的不需要在单个S4UChat关闭时关闭
# 如果需要关闭SuperChat管理器应该在应用程序关闭时调用
# super_chat_manager = get_super_chat_manager()
# await super_chat_manager.shutdown()

View File

@@ -168,7 +168,7 @@ class ChatMood:
chat_id=self.chat_id,
timestamp_start=self.last_change_time,
timestamp_end=message_time,
limit=15,
limit=10,
limit_mode="last",
)
chat_talking_prompt = build_readable_messages(
@@ -245,7 +245,7 @@ class ChatMood:
chat_id=self.chat_id,
timestamp_start=self.last_change_time,
timestamp_end=message_time,
limit=15,
limit=5,
limit_mode="last",
)
chat_talking_prompt = build_readable_messages(

View File

@@ -126,7 +126,7 @@ class S4UMessageProcessor:
asyncio.create_task(chat_action.update_action_by_message(message))
# 视线管理:收到消息时切换视线状态
chat_watching = watching_manager.get_watching_by_chat_id(chat.stream_id)
asyncio.create_task(chat_watching.on_message_received())
await chat_watching.on_message_received()
# 上下文网页管理启动独立task处理消息上下文
asyncio.create_task(self._handle_context_web_update(chat.stream_id, message))
@@ -200,6 +200,8 @@ class S4UMessageProcessor:
await context_manager.start_server()
# 添加消息到上下文并更新网页
await asyncio.sleep(1.5)
await context_manager.add_message(chat_id, message)
logger.debug(f"✅ 上下文网页更新完成: {message.message_info.user_info.user_nickname}")

View File

@@ -15,6 +15,7 @@ from src.chat.message_receive.chat_stream import ChatStream
from src.mais4u.mais4u_chat.super_chat_manager import get_super_chat_manager
from src.mais4u.mais4u_chat.screen_manager import screen_manager
from src.chat.express.expression_selector import expression_selector
from .s4u_mood_manager import mood_manager
logger = get_logger("prompt")
@@ -28,7 +29,7 @@ def init_prompt():
"""
你的名字是麦麦, 是千石可乐开发的程序可以在QQ微信等平台发言你现在正在哔哩哔哩作为虚拟主播进行直播
虽然是个AI但你在努力扮演真人, 人设的配色是橙色和紫色和绿色
你有一头法式橙色卷发,你很可爱,穿戴英式侦探风格学院风裙子和帽子,你正在一个教室场景里进行虚拟线上直播。
你有一头法式橙色卷发,你很可爱,穿戴英式侦探风格学院风裙子和帽子,你正在一个教室场景里进行虚拟线上直播。不过不要刻意提及这些内容,自然回复
你可以看见用户发送的弹幕礼物和superchat
你可以看见面前的屏幕,目前屏幕的内容是:
{screen_info}
@@ -49,8 +50,8 @@ def init_prompt():
对方最新发送的内容:{message_txt}
{gift_info}
回复可以简短一些可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些
表现的有个性,不要随意服从他人要求,积极互动。
回复简短一些,平淡一些,可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞。
表现的有个性,不要随意服从他人要求,积极互动。你现在的心情是:{mood_state}
不要输出多余内容(包括前后缀,冒号和引号,括号()表情包at或 @等 )。只输出回复内容,现在{sender_name}正在等待你的回复。
你的回复风格不要浮夸,有逻辑和条理,请你继续回复{sender_name}
你的发言:
@@ -144,7 +145,7 @@ class PromptBuilder:
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=chat_stream.stream_id,
timestamp=time.time(),
limit=200,
limit=300,
)
talk_type = message.message_info.platform + ":" + str(message.chat_stream.user_info.user_id)
@@ -254,6 +255,8 @@ class PromptBuilder:
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
mood = mood_manager.get_mood_by_chat_id(chat_stream.stream_id)
template_name = "s4u_prompt"
prompt = await global_prompt_manager.format_prompt(
@@ -269,6 +272,7 @@ class PromptBuilder:
core_dialogue_prompt=core_dialogue_prompt,
background_dialogue_prompt=background_dialogue_prompt,
message_txt=message_txt,
mood_state=mood.mood_state,
)
print(prompt)

View File

@@ -72,12 +72,12 @@ class S4UStreamGenerator:
# 构建prompt
if previous_reply_context:
message_txt = f"""
你正在回复用户的消息,但中途被打断了。这是已有的对话上下文:
[你已经对上一条消息说的话]: {previous_reply_context}
---
[这是用户发来的新消息, 你需要结合上下文,对此进行回复]:
{message.processed_plain_text}
"""
你正在回复用户的消息,但中途被打断了。这是已有的对话上下文:
[你已经对上一条消息说的话]: {previous_reply_context}
---
[这是用户发来的新消息, 你需要结合上下文,对此进行回复]:
{message.processed_plain_text}
"""
else:
message_txt = message.processed_plain_text

View File

@@ -40,102 +40,44 @@ from src.plugin_system.apis import send_api
logger = get_logger("watching")
class WatchingState(Enum):
"""视线状态枚举"""
WANDERING = "wandering" # 随意看
DANMU = "danmu" # 看弹幕
LENS = "lens" # 看镜头
HEAD_CODE = {
"看向上方": "(0,0.5,0)",
"看向下方": "(0,-0.5,0)",
"看向左边": "(-1,0,0)",
"看向右边": "(1,0,0)",
"随意朝向": "random",
"看向摄像机": "camera",
"注视对方": "(0,0,0)",
"看向正前方": "(0,0,0)",
}
class ChatWatching:
def __init__(self, chat_id: str):
self.chat_id: str = chat_id
self.current_state: WatchingState = WatchingState.LENS # 默认看镜头
self.last_sent_state: Optional[WatchingState] = None # 上次发送的状态
self.state_needs_update: bool = True # 是否需要更新状态
# 状态切换相关
self.is_replying: bool = False # 是否正在生成回复
self.reply_finished_time: Optional[float] = None # 回复完成时间
self.danmu_viewing_duration: float = 1.0 # 看弹幕持续时间(秒)
logger.info(f"[{self.chat_id}] 视线管理器初始化,默认状态: {self.current_state.value}")
async def _change_state(self, new_state: WatchingState, reason: str = ""):
"""内部状态切换方法"""
if self.current_state != new_state:
old_state = self.current_state
self.current_state = new_state
self.state_needs_update = True
logger.info(f"[{self.chat_id}] 视线状态切换: {old_state.value}{new_state.value} ({reason})")
# 立即发送视线状态更新
await self._send_watching_update()
else:
logger.debug(f"[{self.chat_id}] 状态无变化,保持: {new_state.value} ({reason})")
async def on_message_received(self):
"""收到消息时调用"""
if not self.is_replying: # 只有在非回复状态下才切换到看弹幕
await self._change_state(WatchingState.DANMU, "收到消息")
else:
logger.debug(f"[{self.chat_id}] 正在生成回复中,暂不切换到弹幕状态")
async def on_reply_start(self, look_at_lens: bool = True):
async def on_reply_start(self):
"""开始生成回复时调用"""
self.is_replying = True
self.reply_finished_time = None
if look_at_lens:
await self._change_state(WatchingState.LENS, "开始生成回复-看镜头")
else:
await self._change_state(WatchingState.WANDERING, "开始生成回复-随意看")
await send_api.custom_to_stream(
message_type="state", content="start_thinking", stream_id=self.chat_id, storage_message=False
)
async def on_reply_finished(self):
"""生成回复完毕时调用"""
self.is_replying = False
self.reply_finished_time = time.time()
# 先看弹幕1秒
await self._change_state(WatchingState.DANMU, "回复完毕-看弹幕")
logger.info(f"[{self.chat_id}] 回复完毕,将看弹幕{self.danmu_viewing_duration}秒后转为看镜头")
# 设置定时器1秒后自动切换到看镜头
asyncio.create_task(self._auto_switch_to_lens())
async def _auto_switch_to_lens(self):
"""自动切换到看镜头(延迟执行)"""
await asyncio.sleep(self.danmu_viewing_duration)
# 检查是否仍需要切换(可能状态已经被其他事件改变)
if self.reply_finished_time is not None and self.current_state == WatchingState.DANMU and not self.is_replying:
await self._change_state(WatchingState.LENS, "看弹幕时间结束")
self.reply_finished_time = None # 重置完成时间
async def _send_watching_update(self):
"""立即发送视线状态更新"""
await send_api.custom_to_stream(
message_type="watching", content=self.current_state.value, stream_id=self.chat_id, storage_message=False
message_type="state", content="finish_reply", stream_id=self.chat_id, storage_message=False
)
logger.info(f"[{self.chat_id}] 发送视线状态更新: {self.current_state.value}")
self.last_sent_state = self.current_state
self.state_needs_update = False
async def on_thinking_finished(self):
"""思考完毕时调用"""
await send_api.custom_to_stream(
message_type="state", content="finish_thinking", stream_id=self.chat_id, storage_message=False
)
def get_current_state(self) -> WatchingState:
"""获取当前视线状态"""
return self.current_state
def get_state_info(self) -> dict:
"""获取状态信息(用于调试)"""
return {
"current_state": self.current_state.value,
"is_replying": self.is_replying,
"reply_finished_time": self.reply_finished_time,
"state_needs_update": self.state_needs_update,
}
async def on_message_received(self):
"""收到消息时调用"""
await send_api.custom_to_stream(
message_type="state", content="start_viewing", stream_id=self.chat_id, storage_message=False
)
class WatchingManager:
@@ -144,16 +86,6 @@ class WatchingManager:
"""当前视线状态列表"""
self.task_started: bool = False
async def start(self):
"""启动视线管理系统"""
if self.task_started:
return
logger.info("启动视线管理系统...")
self.task_started = True
logger.info("视线管理系统已启动(状态变化时立即发送)")
def get_watching_by_chat_id(self, chat_id: str) -> ChatWatching:
"""获取或创建聊天对应的视线管理器"""
for watching in self.watching_list:
@@ -164,39 +96,8 @@ class WatchingManager:
self.watching_list.append(new_watching)
logger.info(f"为chat {chat_id}创建新的视线管理器")
# 发送初始状态
asyncio.create_task(new_watching._send_watching_update())
return new_watching
def reset_watching_by_chat_id(self, chat_id: str):
"""重置聊天的视线状态"""
for watching in self.watching_list:
if watching.chat_id == chat_id:
watching.current_state = WatchingState.LENS
watching.last_sent_state = None
watching.state_needs_update = True
watching.is_replying = False
watching.reply_finished_time = None
logger.info(f"[{chat_id}] 视线状态已重置为默认状态")
# 发送重置后的状态
asyncio.create_task(watching._send_watching_update())
return
# 如果没有找到现有的watching创建新的
new_watching = ChatWatching(chat_id)
self.watching_list.append(new_watching)
logger.info(f"为chat {chat_id}创建并重置视线管理器")
# 发送初始状态
asyncio.create_task(new_watching._send_watching_update())
def get_all_watching_info(self) -> dict:
"""获取所有聊天的视线状态信息(用于调试)"""
return {watching.chat_id: watching.get_state_info() for watching in self.watching_list}
# 全局视线管理器实例
watching_manager = WatchingManager()
"""全局视线管理器"""

View File

@@ -0,0 +1,74 @@
import json
import time
import random
from src.chat.message_receive.message import MessageRecv
from src.llm_models.utils_model import LLMRequest
from src.common.logger import get_logger
from src.chat.utils.chat_message_builder import build_readable_messages, get_raw_msg_by_timestamp_with_chat_inclusive
from src.config.config import global_config
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.manager.async_task_manager import AsyncTask, async_task_manager
from src.plugin_system.apis import send_api
from json_repair import repair_json
from src.mais4u.s4u_config import s4u_config
from src.plugin_system.apis import send_api
logger = get_logger(__name__)
head_actions_list = [
"不做额外动作",
"点头一次",
"点头两次",
"摇头",
"歪脑袋",
"低头望向一边"
]
async def yes_or_no_head(text: str,emotion: str = "",chat_history: str = "",chat_id: str = ""):
prompt = f"""
{chat_history}
以上是对方的发言:
对这个发言,你的心情是:{emotion}
对上面的发言,你的回复是:{text}
请判断时是否要伴随回复做头部动作,你可以选择:
不做额外动作
点头一次
点头两次
摇头
歪脑袋
低头望向一边
请从上面的动作中选择一个,并输出,请只输出你选择的动作就好,不要输出其他内容。"""
model = LLMRequest(
model=global_config.model.emotion,
temperature=0.7,
request_type="motion",
)
try:
logger.info(f"prompt: {prompt}")
response, (reasoning_content, model_name) = await model.generate_response_async(prompt=prompt)
logger.info(f"response: {response}")
logger.info(f"reasoning_content: {reasoning_content}")
if response in head_actions_list:
head_action = response
else:
head_action = "不做额外动作"
await send_api.custom_to_stream(
message_type="head_action",
content=head_action,
stream_id=chat_id,
storage_message=False,
show_log=True,
)
except Exception as e:
logger.error(f"yes_or_no_head error: {e}")
return "不做额外动作"

View File

@@ -4,13 +4,28 @@ import shutil
from datetime import datetime
from tomlkit import TOMLDocument
from tomlkit.items import Table
from dataclasses import dataclass, fields, MISSING
from dataclasses import dataclass, fields, MISSING, field
from typing import TypeVar, Type, Any, get_origin, get_args, Literal
from src.common.logger import get_logger
logger = get_logger("s4u_config")
# 新增兼容dict和tomlkit Table
def is_dict_like(obj):
return isinstance(obj, (dict, Table))
# 新增递归将Table转为dict
def table_to_dict(obj):
if isinstance(obj, Table):
return {k: table_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, dict):
return {k: table_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [table_to_dict(i) for i in obj]
else:
return obj
# 获取mais4u模块目录
MAIS4U_ROOT = os.path.dirname(__file__)
CONFIG_DIR = os.path.join(MAIS4U_ROOT, "config")
@@ -18,7 +33,7 @@ TEMPLATE_PATH = os.path.join(CONFIG_DIR, "s4u_config_template.toml")
CONFIG_PATH = os.path.join(CONFIG_DIR, "s4u_config.toml")
# S4U配置版本
S4U_VERSION = "1.0.0"
S4U_VERSION = "1.1.0"
T = TypeVar("T", bound="S4UConfigBase")
@@ -30,7 +45,8 @@ class S4UConfigBase:
@classmethod
def from_dict(cls: Type[T], data: dict[str, Any]) -> T:
"""从字典加载配置字段"""
if not isinstance(data, dict):
data = table_to_dict(data) # 递归转dict兼容tomlkit Table
if not is_dict_like(data):
raise TypeError(f"Expected a dictionary, got {type(data).__name__}")
init_args: dict[str, Any] = {}
@@ -66,7 +82,7 @@ class S4UConfigBase:
"""转换字段值为指定类型"""
# 如果是嵌套的 dataclass递归调用 from_dict 方法
if isinstance(field_type, type) and issubclass(field_type, S4UConfigBase):
if not isinstance(value, dict):
if not is_dict_like(value):
raise TypeError(f"Expected a dictionary for {field_type.__name__}, got {type(value).__name__}")
return field_type.from_dict(value)
@@ -96,7 +112,7 @@ class S4UConfigBase:
return tuple(cls._convert_field(item, arg) for item, arg in zip(value, field_type_args, strict=False))
if field_origin_type is dict:
if not isinstance(value, dict):
if not is_dict_like(value):
raise TypeError(f"Expected a dictionary for {field_type.__name__}, got {type(value).__name__}")
if len(field_type_args) != 2:
@@ -127,6 +143,51 @@ class S4UConfigBase:
raise TypeError(f"Cannot convert {type(value).__name__} to {field_type.__name__}") from e
@dataclass
class S4UModelConfig(S4UConfigBase):
"""S4U模型配置类"""
# 主要对话模型配置
chat: dict[str, Any] = field(default_factory=lambda: {})
"""主要对话模型配置"""
# 规划模型配置原model_motion
motion: dict[str, Any] = field(default_factory=lambda: {})
"""规划模型配置"""
# 情感分析模型配置
emotion: dict[str, Any] = field(default_factory=lambda: {})
"""情感分析模型配置"""
# 记忆模型配置
memory: dict[str, Any] = field(default_factory=lambda: {})
"""记忆模型配置"""
# 工具使用模型配置
tool_use: dict[str, Any] = field(default_factory=lambda: {})
"""工具使用模型配置"""
# 嵌入模型配置
embedding: dict[str, Any] = field(default_factory=lambda: {})
"""嵌入模型配置"""
# 视觉语言模型配置
vlm: dict[str, Any] = field(default_factory=lambda: {})
"""视觉语言模型配置"""
# 知识库模型配置
knowledge: dict[str, Any] = field(default_factory=lambda: {})
"""知识库模型配置"""
# 实体提取模型配置
entity_extract: dict[str, Any] = field(default_factory=lambda: {})
"""实体提取模型配置"""
# 问答模型配置
qa: dict[str, Any] = field(default_factory=lambda: {})
"""问答模型配置"""
@dataclass
class S4UConfig(S4UConfigBase):
"""S4U聊天系统配置类"""
@@ -164,9 +225,6 @@ class S4UConfig(S4UConfigBase):
enable_old_message_cleanup: bool = True
"""是否自动清理过旧的普通消息"""
enable_loading_indicator: bool = True
"""是否显示加载提示"""
enable_streaming_output: bool = True
"""是否启用流式输出false时全部生成后一次性发送"""
@@ -176,6 +234,13 @@ class S4UConfig(S4UConfigBase):
max_core_message_length: int = 30
"""核心消息最大长度"""
# 模型配置
models: S4UModelConfig = field(default_factory=S4UModelConfig)
"""S4U模型配置"""
# 兼容性字段,保持向后兼容
@dataclass
class S4UGlobalConfig(S4UConfigBase):