Windpicker-owo
2025-10-30 17:35:29 +08:00
13 changed files with 200 additions and 55 deletions

View File

@@ -590,7 +590,7 @@ class MessageProcessBase(Message):
        if self.reply and hasattr(self.reply, "processed_plain_text"):
            # print(f"self.reply.processed_plain_text: {self.reply.processed_plain_text}")
            # print(f"reply: {self.reply}")
-           return f"[回复<{self.reply.message_info.user_info.user_nickname}> 的消息:{self.reply.processed_plain_text}]"  # type: ignore
+           return f"[回复<{self.reply.message_info.user_info.user_nickname}({self.reply.message_info.user_info.user_id})> 的消息:{self.reply.processed_plain_text}]"  # type: ignore
        return None
    else:
        return f"[{seg.type}:{seg.data!s}]"

View File

@@ -951,41 +951,43 @@ class DefaultReplyer:
        # Build the read-history prompt
        read_history_prompt = ""
        if read_messages:
+           # Always load history from the database and merge it with the session history
+           logger.info("正在从数据库加载上下文并与会话历史合并...")
+           db_messages_raw = await get_raw_msg_before_timestamp_with_chat(
+               chat_id=chat_id,
+               timestamp=time.time(),
+               limit=global_config.chat.max_context_size,
+           )
+           # Merge and deduplicate by message_id
+           combined_messages = {}
+           # Database messages go in first
+           for msg in db_messages_raw:
+               if msg.get("message_id"):
+                   combined_messages[msg["message_id"]] = msg
+           # Then session messages overwrite/extend them, so the freshest copies win
+           for msg_obj in read_messages:
+               msg_dict = msg_obj.flatten()
+               if msg_dict.get("message_id"):
+                   combined_messages[msg_dict["message_id"]] = msg_dict
+           # Sort by timestamp
+           sorted_messages = sorted(combined_messages.values(), key=lambda x: x.get("time", 0))
+           read_history_prompt = ""
+           if sorted_messages:
+               # Cap how many history messages end up in the final prompt
+               final_history = sorted_messages[-50:]
                read_content = await build_readable_messages(
-                   [msg.flatten() for msg in read_messages[-50:]],  # cap the count
+                   final_history,
                    replace_bot_name=True,
                    timestamp_mode="normal_no_YMD",
                    truncate=True,
                )
                read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}"
        else:
-           # With no read messages, load the most recent context from the database instead
-           logger.info("暂无已读历史消息,正在从数据库加载上下文...")
-           fallback_messages = await get_raw_msg_before_timestamp_with_chat(
-               chat_id=chat_id,
-               timestamp=time.time(),
-               limit=global_config.chat.max_context_size,
-           )
-           if fallback_messages:
-               # Collect the message_ids of unread_messages for deduplication
-               unread_message_ids = {msg.message_id for msg in unread_messages}
-               filtered_fallback_messages = [
-                   msg for msg in fallback_messages if msg.get("message_id") not in unread_message_ids
-               ]
-               if filtered_fallback_messages:
-                   read_content = await build_readable_messages(
-                       filtered_fallback_messages,
-                       replace_bot_name=True,
-                       timestamp_mode="normal_no_YMD",
-                       truncate=True,
-                   )
-                   read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}"
-               else:
-                   read_history_prompt = "暂无已读历史消息"
-           else:
-               read_history_prompt = "暂无已读历史消息"
+           read_history_prompt = "暂无已读历史消息"
        # Build the unread-history prompt
        unread_history_prompt = ""
@@ -1817,6 +1819,19 @@ class DefaultReplyer:
                prompt
            )
            if content:
+               # Strip in a loop, in case the model generated several reply headers of its own
+               cleaned_content = content
+               while True:
+                   new_content = re.sub(r"^\s*\[回复<[^>]+>\s*的消息:[^\]]+\]\s*", "", cleaned_content).lstrip()
+                   if new_content == cleaned_content:
+                       break
+                   cleaned_content = new_content
+               if cleaned_content != content:
+                   logger.debug(f"移除了模型自行生成的回复头,原始内容: '{content}', 清理后: '{cleaned_content}'")
+                   content = cleaned_content
                logger.debug(f"replyer生成内容: {content}")
            return content, reasoning_content, model_name, tool_calls
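The cleanup loop keeps applying the regex until the output stops changing, so any number of stacked headers is removed. A quick demonstration of the same pattern, assuming re is already imported at module level as the hunk implies:

import re

HEADER = r"^\s*\[回复<[^>]+>\s*的消息:[^\]]+\]\s*"

content = "[回复<Alice(123)> 的消息:hi][回复<Bob(456)> 的消息:yo] 好的,收到"
while True:
    stripped = re.sub(HEADER, "", content).lstrip()
    if stripped == content:
        break
    content = stripped
print(content)  # 好的,收到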

View File

@@ -7,7 +7,7 @@ import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, List, Optional
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode, ChatType
@@ -28,6 +28,14 @@ class MessageStatus(Enum):
PROCESSING = "processing" # 处理中
@dataclass
class DecisionRecord(BaseDataModel):
"""决策记录"""
thought: str
action: str
@dataclass
class StreamContext(BaseDataModel):
"""聊天流上下文信息"""
@@ -56,6 +64,7 @@ class StreamContext(BaseDataModel):
    triggering_user_id: str | None = None  # ID of the user who triggered the current chat stream
    is_replying: bool = False  # whether a reply is currently being generated
    processing_message_id: str | None = None  # ID of the message currently being planned/processed; guards against duplicate replies
+   decision_history: List["DecisionRecord"] = field(default_factory=list)  # decision history

    def add_action_to_message(self, message_id: str, action: str):
        """

View File

@@ -143,6 +143,10 @@ class ChatConfig(ValidatedConfigBase):
    dynamic_distribution_max_interval: float = Field(default=30.0, ge=5.0, le=300.0, description="最大分发间隔(秒)")
    dynamic_distribution_jitter_factor: float = Field(default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子")
    max_concurrent_distributions: int = Field(default=10, ge=1, le=100, description="最大并发处理的消息流数量")
+   enable_decision_history: bool = Field(default=True, description="是否启用决策历史功能")
+   decision_history_length: int = Field(
+       default=3, ge=1, le=10, description="决策历史记录的长度,用于增强语言模型的上下文连续性"
+   )
class MessageReceiveConfig(ValidatedConfigBase):

View File

@@ -26,13 +26,13 @@ class UsageRecord:
    provider_name: str
    """Provider name"""
-   prompt_tokens: int
+   prompt_tokens: int = 0
    """Prompt token count"""
-   completion_tokens: int
+   completion_tokens: int = 0
    """Completion token count"""
-   total_tokens: int
+   total_tokens: int = 0
    """Total token count"""

View File

@@ -290,9 +290,9 @@ async def _default_stream_response_handler(
            if event.usage:
                # If usage info is present, store it on the APIResponse object
                _usage_record = (
-                   event.usage.prompt_tokens or 0,
-                   event.usage.completion_tokens or 0,
-                   event.usage.total_tokens or 0,
+                   getattr(event.usage, "prompt_tokens", 0) or 0,
+                   getattr(event.usage, "completion_tokens", 0) or 0,
+                   getattr(event.usage, "total_tokens", 0) or 0,
                )
            try:
@@ -360,9 +360,9 @@ def _default_normal_response_parser(
    # Extract the usage info
    if resp.usage:
        _usage_record = (
-           resp.usage.prompt_tokens or 0,
-           resp.usage.completion_tokens or 0,
-           resp.usage.total_tokens or 0,
+           getattr(resp.usage, "prompt_tokens", 0) or 0,
+           getattr(resp.usage, "completion_tokens", 0) or 0,
+           getattr(resp.usage, "total_tokens", 0) or 0,
        )
    else:
        _usage_record = None
@@ -591,7 +591,7 @@ class OpenaiClient(BaseClient):
            model_name=model_info.name,
            provider_name=model_info.api_provider,
            prompt_tokens=raw_response.usage.prompt_tokens or 0,
-           completion_tokens=raw_response.usage.completion_tokens or 0,  # type: ignore
+           completion_tokens=getattr(raw_response.usage, "completion_tokens", 0) or 0,
            total_tokens=raw_response.usage.total_tokens or 0,
        )
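Together with the new `= 0` defaults on UsageRecord, the `getattr(..., 0) or 0` guards tolerate usage payloads that omit a field entirely (embedding endpoints often send no completion_tokens) or report it as None. A small sketch of the pattern; the SimpleNamespace payloads are made-up stand-ins for provider responses:

from types import SimpleNamespace

# Hypothetical usage payloads: one complete, one missing completion_tokens,
# one reporting None. All three are handled the same way.
payloads = [
    SimpleNamespace(prompt_tokens=10, completion_tokens=5, total_tokens=15),
    SimpleNamespace(prompt_tokens=7, total_tokens=7),                       # field absent
    SimpleNamespace(prompt_tokens=3, completion_tokens=None, total_tokens=3),
]

for usage in payloads:
    # getattr covers the missing attribute; `or 0` covers an explicit None
    completion = getattr(usage, "completion_tokens", 0) or 0
    print(completion)
# 5, 0, 0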

View File

@@ -155,8 +155,12 @@ class LLMUsageRecorder:
        endpoint: str,
        time_cost: float = 0.0,
    ):
-       input_cost = (model_usage.prompt_tokens / 1000000) * model_info.price_in
-       output_cost = (model_usage.completion_tokens / 1000000) * model_info.price_out
+       prompt_tokens = getattr(model_usage, "prompt_tokens", 0)
+       completion_tokens = getattr(model_usage, "completion_tokens", 0)
+       total_tokens = getattr(model_usage, "total_tokens", 0)
+       input_cost = (prompt_tokens / 1000000) * model_info.price_in
+       output_cost = (completion_tokens / 1000000) * model_info.price_out
        round(input_cost + output_cost, 6)
        session = None
@@ -170,9 +174,9 @@ class LLMUsageRecorder:
                user_id=user_id,
                request_type=request_type,
                endpoint=endpoint,
-               prompt_tokens=model_usage.prompt_tokens or 0,
-               completion_tokens=model_usage.completion_tokens or 0,
-               total_tokens=model_usage.total_tokens or 0,
+               prompt_tokens=prompt_tokens,
+               completion_tokens=completion_tokens,
+               total_tokens=total_tokens,
                cost=1.0,
                time_cost=round(time_cost or 0.0, 3),
                status="success",
@@ -185,8 +189,8 @@ class LLMUsageRecorder:
            logger.debug(
                f"Token使用情况 - 模型: {model_usage.model_name}, "
                f"用户: {user_id}, 类型: {request_type}, "
-               f"提示词: {model_usage.prompt_tokens}, 完成: {model_usage.completion_tokens}, "
-               f"总计: {model_usage.total_tokens}"
+               f"提示词: {prompt_tokens}, 完成: {completion_tokens}, "
+               f"总计: {total_tokens}"
            )
        except Exception as e:
            logger.error(f"记录token使用情况失败: {e!s}")

View File

@@ -1009,12 +1009,15 @@ class LLMRequest:
        # Step 1: update the in-memory statistics used for load balancing
        stats = self.model_usage[model_info.name]
+       # Fetch the token usage defensively; embedding models may not return completion_tokens
+       total_tokens = getattr(usage, "total_tokens", 0)
        # Compute the new average latency
        new_request_count = stats.request_count + 1
        new_avg_latency = (stats.avg_latency * stats.request_count + time_cost) / new_request_count
        self.model_usage[model_info.name] = stats._replace(
-           total_tokens=stats.total_tokens + usage.total_tokens,
+           total_tokens=stats.total_tokens + total_tokens,
            avg_latency=new_avg_latency,
            request_count=new_request_count,
        )
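The latency update is an incremental mean, new_avg = (old_avg * n + latest) / (n + 1), so no per-request samples need to be stored. A quick check of the update rule against the direct average:

# Incremental (running) mean: no need to keep every latency sample.
samples = [0.8, 1.2, 1.0, 2.0]

avg, count = 0.0, 0
for latency in samples:
    avg = (avg * count + latency) / (count + 1)
    count += 1

print(round(avg, 3), sum(samples) / len(samples))  # 1.25 1.25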

View File

@@ -271,12 +271,34 @@ async def _send_to_target(
        message_segment = Seg(type=message_type, data=content)  # type: ignore
        # Handle the quoted-reply message
-       if reply_to_message:
+       if reply_to:
+           # Prefer building the anchor_message from the reply_to string
+           # Parse the "发送者(ID)" (sender(ID)) format
+           import re
+           match = re.match(r"(.+)\((\d+)\)", reply_to)
+           if match:
+               sender_name, sender_id = match.groups()
+               temp_message_dict = {
+                   "user_nickname": sender_name,
+                   "user_id": sender_id,
+                   "chat_info_platform": target_stream.platform,
+                   "message_id": "temp_reply_id",  # temporary ID
+                   "time": time.time(),
+               }
+               anchor_message = message_dict_to_message_recv(message_dict=temp_message_dict)
+           else:
+               anchor_message = None
+           reply_to_platform_id = f"{target_stream.platform}:{sender_id}" if anchor_message else None
+       elif reply_to_message:
            anchor_message = message_dict_to_message_recv(message_dict=reply_to_message)
-           anchor_message.update_chat_stream(target_stream)
-           reply_to_platform_id = (
-               f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}"
-           )
+           if anchor_message:
+               anchor_message.update_chat_stream(target_stream)
+               reply_to_platform_id = (
+                   f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}"
+               )
+           else:
+               reply_to_platform_id = None
        else:
            anchor_message = None
            reply_to_platform_id = None
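reply_to is expected to look like 发送者(ID), i.e. the same nickname(user_id) shape the reply headers elsewhere in this commit now emit. A quick check of the parse; PATTERN is just a local name for this sketch, and the greedy `(.+)` tolerates parentheses inside the nickname itself:

import re

PATTERN = r"(.+)\((\d+)\)"

for reply_to in ["Alice(12345)", "Bob(test)(678)"]:
    match = re.match(PATTERN, reply_to)
    if match:
        sender_name, sender_id = match.groups()
        print(sender_name, sender_id)
# Alice 12345
# Bob(test) 678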

View File

@@ -104,6 +104,23 @@ class ChatterPlanFilter:
            # Pre-parse the action_type for the checks below
            thinking = item.get("thinking", "未提供思考过程")
            actions_obj = item.get("actions", {})
+           # Record the decision history
+           if hasattr(global_config.chat, "enable_decision_history") and global_config.chat.enable_decision_history:
+               action_types_to_log = []
+               actions_to_process_for_log = []
+               if isinstance(actions_obj, dict):
+                   actions_to_process_for_log.append(actions_obj)
+               elif isinstance(actions_obj, list):
+                   actions_to_process_for_log.extend(actions_obj)
+               for single_action in actions_to_process_for_log:
+                   if isinstance(single_action, dict):
+                       action_types_to_log.append(single_action.get("action_type", "no_action"))
+               if thinking != "未提供思考过程" and action_types_to_log:
+                   await self._add_decision_to_history(plan, thinking, ", ".join(action_types_to_log))
            # Handle the actions field being either a dict or a list
            if isinstance(actions_obj, dict):
@@ -141,6 +158,65 @@ class ChatterPlanFilter:
        return plan

+   async def _add_decision_to_history(self, plan: Plan, thought: str, action: str):
+       """Append a decision record to the history"""
+       try:
+           from src.chat.message_receive.chat_stream import get_chat_manager
+           from src.common.data_models.message_manager_data_model import DecisionRecord
+
+           chat_manager = get_chat_manager()
+           chat_stream = await chat_manager.get_stream(plan.chat_id)
+           if not chat_stream:
+               return
+           if not thought or not action:
+               logger.debug("尝试添加空的决策历史,已跳过")
+               return
+
+           context = chat_stream.context_manager.context
+           new_record = DecisionRecord(thought=thought, action=action)
+           # Append the new record
+           context.decision_history.append(new_record)
+           # Fetch the history length limit
+           max_history_length = getattr(global_config.chat, "decision_history_length", 3)
+           # Drop the oldest record once the history exceeds the limit
+           if len(context.decision_history) > max_history_length:
+               context.decision_history.pop(0)
+           logger.debug(f"已添加决策历史,当前长度: {len(context.decision_history)}")
+       except Exception as e:
+           logger.warning(f"记录决策历史失败: {e}")
+
+   async def _build_decision_history_block(self, plan: Plan) -> str:
+       """Build the decision history prompt block"""
+       if not hasattr(global_config.chat, "enable_decision_history") or not global_config.chat.enable_decision_history:
+           return ""
+       try:
+           from src.chat.message_receive.chat_stream import get_chat_manager
+
+           chat_manager = get_chat_manager()
+           chat_stream = await chat_manager.get_stream(plan.chat_id)
+           if not chat_stream:
+               return ""
+           context = chat_stream.context_manager.context
+           if not context.decision_history:
+               return ""
+
+           history_records = []
+           for record in context.decision_history:
+               history_records.append(f"- 思考: {record.thought}\n  - 动作: {record.action}")
+           return "\n".join(history_records)
+       except Exception as e:
+           logger.warning(f"构建决策历史块失败: {e}")
+           return ""
    async def _build_prompt(self, plan: Plan) -> tuple[str, list]:
        """
        Build the prompt from a Plan object.
@@ -166,6 +242,9 @@ class ChatterPlanFilter:
        chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id)
        mood_block = f"你现在的心情是:{chat_mood.mood_state}"
+       # Build the decision history block
+       decision_history_block = await self._build_decision_history_block(plan)
        # Build the read/unread history message blocks
        read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks(
            plan
@@ -239,6 +318,7 @@ class ChatterPlanFilter:
            mood_block=mood_block,
            time_block=time_block,
            chat_context_description=chat_context_description,
+           decision_history_block=decision_history_block,
            read_history_block=read_history_block,
            unread_history_block=unread_history_block,
            actions_before_now_block=actions_before_now_block,
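The append-then-pop(0) pair keeps decision_history at no more than decision_history_length entries. An equivalent alternative, offered only as a sketch and not what the commit does, is collections.deque with maxlen, which evicts the oldest entry automatically; the plain list used here is arguably easier to serialize through BaseDataModel:

from collections import deque

# deque(maxlen=N) drops the oldest entry on append, so no manual pop(0) is needed.
history = deque(maxlen=3)
for decision in ["reply", "no_action", "reply", "emoji"]:
    history.append(decision)

print(list(history))  # ['no_action', 'reply', 'emoji'], oldest entry evicted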

View File

@@ -30,6 +30,9 @@ def init_prompts():
{actions_before_now_block}
+## 🤔 最近的决策历史 (回顾你之前的思考与动作,可以帮助你避免重复,并做出更有趣的连贯回应)
+{decision_history_block}
## 📜 已读历史(仅供理解,不可作为动作对象)
{read_history_block}

View File

@@ -719,12 +719,16 @@ class MessageHandler:
            reply_message = [Seg(type="text", data="(获取发言内容失败)")]
        sender_info: dict = message_detail.get("sender")
        sender_nickname: str = sender_info.get("nickname")
+       sender_id = sender_info.get("user_id")
        seg_message: List[Seg] = []
        if not sender_nickname:
            logger.warning("无法获取被引用的人的昵称,返回默认值")
            seg_message.append(Seg(type="text", data="[回复 未知用户:"))
        else:
-           seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}>"))
+           if sender_id:
+               seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}({sender_id})>"))
+           else:
+               seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}>"))
            seg_message += reply_message
            seg_message.append(Seg(type="text", data="],说:"))
        return seg_message

View File

dynamic_distribution_min_interval = 1.0 # Minimum dispatch interval (seconds)
dynamic_distribution_max_interval = 30.0 # Maximum dispatch interval (seconds)
dynamic_distribution_jitter_factor = 0.2 # Random jitter factor applied to the dispatch interval
max_concurrent_distributions = 10 # Maximum number of message streams processed concurrently; tune to API performance and server load
+enable_decision_history = true # Whether to enable the decision-history feature
+decision_history_length = 3 # Length of the decision history, used to strengthen the language model's context continuity
[message_receive]