diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 8753db41d..98b12d694 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -590,7 +590,7 @@ class MessageProcessBase(Message): if self.reply and hasattr(self.reply, "processed_plain_text"): # print(f"self.reply.processed_plain_text: {self.reply.processed_plain_text}") # print(f"reply: {self.reply}") - return f"[回复<{self.reply.message_info.user_info.user_nickname}> 的消息:{self.reply.processed_plain_text}]" # type: ignore + return f"[回复<{self.reply.message_info.user_info.user_nickname}({self.reply.message_info.user_info.user_id})> 的消息:{self.reply.processed_plain_text}]" # type: ignore return None else: return f"[{seg.type}:{seg.data!s}]" diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index a04804544..325fa1e15 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -951,41 +951,43 @@ class DefaultReplyer: # 构建已读历史消息 prompt read_history_prompt = "" - if read_messages: + # 总是从数据库加载历史记录,并与会话历史合并 + logger.info("正在从数据库加载上下文并与会话历史合并...") + db_messages_raw = await get_raw_msg_before_timestamp_with_chat( + chat_id=chat_id, + timestamp=time.time(), + limit=global_config.chat.max_context_size, + ) + + # 合并和去重 + combined_messages = {} + # 首先添加数据库消息 + for msg in db_messages_raw: + if msg.get("message_id"): + combined_messages[msg["message_id"]] = msg + + # 然后用会话消息覆盖/添加,以确保它们是最新的 + for msg_obj in read_messages: + msg_dict = msg_obj.flatten() + if msg_dict.get("message_id"): + combined_messages[msg_dict["message_id"]] = msg_dict + + # 按时间排序 + sorted_messages = sorted(combined_messages.values(), key=lambda x: x.get("time", 0)) + + read_history_prompt = "" + if sorted_messages: + # 限制最终用于prompt的历史消息数量 + final_history = sorted_messages[-50:] read_content = await build_readable_messages( - [msg.flatten() for msg in read_messages[-50:]], # 限制数量 + final_history, replace_bot_name=True, timestamp_mode="normal_no_YMD", truncate=True, ) read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}" else: - # 如果没有已读消息,则从数据库加载最近的上下文 - logger.info("暂无已读历史消息,正在从数据库加载上下文...") - fallback_messages = await get_raw_msg_before_timestamp_with_chat( - chat_id=chat_id, - timestamp=time.time(), - limit=global_config.chat.max_context_size, - ) - if fallback_messages: - # 从 unread_messages 获取 message_id 列表,用于去重 - unread_message_ids = {msg.message_id for msg in unread_messages} - filtered_fallback_messages = [ - msg for msg in fallback_messages if msg.get("message_id") not in unread_message_ids - ] - - if filtered_fallback_messages: - read_content = await build_readable_messages( - filtered_fallback_messages, - replace_bot_name=True, - timestamp_mode="normal_no_YMD", - truncate=True, - ) - read_history_prompt = f"这是已读历史消息,仅作为当前聊天情景的参考:\n{read_content}" - else: - read_history_prompt = "暂无已读历史消息" - else: - read_history_prompt = "暂无已读历史消息" + read_history_prompt = "暂无已读历史消息" # 构建未读历史消息 prompt unread_history_prompt = "" @@ -1817,6 +1819,19 @@ class DefaultReplyer: prompt ) + if content: + # 循环移除,防止模型生成多个回复头 + cleaned_content = content + while True: + new_content = re.sub(r"^\s*\[回复<[^>]+>\s*的消息:[^\]]+\]\s*", "", cleaned_content).lstrip() + if new_content == cleaned_content: + break + cleaned_content = new_content + + if cleaned_content != content: + logger.debug(f"移除了模型自行生成的回复头,原始内容: '{content}', 清理后: '{cleaned_content}'") + content = cleaned_content + logger.debug(f"replyer生成内容: {content}") return content, reasoning_content, model_name, tool_calls diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 81e8a7764..eb29b3302 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -7,7 +7,7 @@ import asyncio import time from dataclasses import dataclass, field from enum import Enum -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional from src.common.logger import get_logger from src.plugin_system.base.component_types import ChatMode, ChatType @@ -28,6 +28,14 @@ class MessageStatus(Enum): PROCESSING = "processing" # 处理中 +@dataclass +class DecisionRecord(BaseDataModel): + """决策记录""" + + thought: str + action: str + + @dataclass class StreamContext(BaseDataModel): """聊天流上下文信息""" @@ -56,6 +64,7 @@ class StreamContext(BaseDataModel): triggering_user_id: str | None = None # 触发当前聊天流的用户ID is_replying: bool = False # 是否正在生成回复 processing_message_id: str | None = None # 当前正在规划/处理的目标消息ID,用于防止重复回复 + decision_history: List["DecisionRecord"] = field(default_factory=list) # 决策历史 def add_action_to_message(self, message_id: str, action: str): """ diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 3d1529420..3f7622115 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -143,6 +143,10 @@ class ChatConfig(ValidatedConfigBase): dynamic_distribution_max_interval: float = Field(default=30.0, ge=5.0, le=300.0, description="最大分发间隔(秒)") dynamic_distribution_jitter_factor: float = Field(default=0.2, ge=0.0, le=0.5, description="分发间隔随机扰动因子") max_concurrent_distributions: int = Field(default=10, ge=1, le=100, description="最大并发处理的消息流数量") + enable_decision_history: bool = Field(default=True, description="是否启用决策历史功能") + decision_history_length: int = Field( + default=3, ge=1, le=10, description="决策历史记录的长度,用于增强语言模型的上下文连续性" + ) class MessageReceiveConfig(ValidatedConfigBase): diff --git a/src/llm_models/model_client/base_client.py b/src/llm_models/model_client/base_client.py index 246b0618b..baab2897b 100644 --- a/src/llm_models/model_client/base_client.py +++ b/src/llm_models/model_client/base_client.py @@ -26,13 +26,13 @@ class UsageRecord: provider_name: str """提供商名称""" - prompt_tokens: int + prompt_tokens: int = 0 """提示token数""" - completion_tokens: int + completion_tokens: int = 0 """完成token数""" - total_tokens: int + total_tokens: int = 0 """总token数""" diff --git a/src/llm_models/model_client/openai_client.py b/src/llm_models/model_client/openai_client.py index 06d6b50b2..c42fa2b67 100644 --- a/src/llm_models/model_client/openai_client.py +++ b/src/llm_models/model_client/openai_client.py @@ -290,9 +290,9 @@ async def _default_stream_response_handler( if event.usage: # 如果有使用情况,则将其存储在APIResponse对象中 _usage_record = ( - event.usage.prompt_tokens or 0, - event.usage.completion_tokens or 0, - event.usage.total_tokens or 0, + getattr(event.usage, "prompt_tokens", 0) or 0, + getattr(event.usage, "completion_tokens", 0) or 0, + getattr(event.usage, "total_tokens", 0) or 0, ) try: @@ -360,9 +360,9 @@ def _default_normal_response_parser( # 提取Usage信息 if resp.usage: _usage_record = ( - resp.usage.prompt_tokens or 0, - resp.usage.completion_tokens or 0, - resp.usage.total_tokens or 0, + getattr(resp.usage, "prompt_tokens", 0) or 0, + getattr(resp.usage, "completion_tokens", 0) or 0, + getattr(resp.usage, "total_tokens", 0) or 0, ) else: _usage_record = None @@ -591,7 +591,7 @@ class OpenaiClient(BaseClient): model_name=model_info.name, provider_name=model_info.api_provider, prompt_tokens=raw_response.usage.prompt_tokens or 0, - completion_tokens=raw_response.usage.completion_tokens or 0, # type: ignore + completion_tokens=getattr(raw_response.usage, "completion_tokens", 0) or 0, total_tokens=raw_response.usage.total_tokens or 0, ) diff --git a/src/llm_models/utils.py b/src/llm_models/utils.py index b2c7e57b0..9855b2446 100644 --- a/src/llm_models/utils.py +++ b/src/llm_models/utils.py @@ -155,8 +155,12 @@ class LLMUsageRecorder: endpoint: str, time_cost: float = 0.0, ): - input_cost = (model_usage.prompt_tokens / 1000000) * model_info.price_in - output_cost = (model_usage.completion_tokens / 1000000) * model_info.price_out + prompt_tokens = getattr(model_usage, "prompt_tokens", 0) + completion_tokens = getattr(model_usage, "completion_tokens", 0) + total_tokens = getattr(model_usage, "total_tokens", 0) + + input_cost = (prompt_tokens / 1000000) * model_info.price_in + output_cost = (completion_tokens / 1000000) * model_info.price_out round(input_cost + output_cost, 6) session = None @@ -170,9 +174,9 @@ class LLMUsageRecorder: user_id=user_id, request_type=request_type, endpoint=endpoint, - prompt_tokens=model_usage.prompt_tokens or 0, - completion_tokens=model_usage.completion_tokens or 0, - total_tokens=model_usage.total_tokens or 0, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, cost=1.0, time_cost=round(time_cost or 0.0, 3), status="success", @@ -185,8 +189,8 @@ class LLMUsageRecorder: logger.debug( f"Token使用情况 - 模型: {model_usage.model_name}, " f"用户: {user_id}, 类型: {request_type}, " - f"提示词: {model_usage.prompt_tokens}, 完成: {model_usage.completion_tokens}, " - f"总计: {model_usage.total_tokens}" + f"提示词: {prompt_tokens}, 完成: {completion_tokens}, " + f"总计: {total_tokens}" ) except Exception as e: logger.error(f"记录token使用情况失败: {e!s}") diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py index 49c83c135..25349ecc1 100644 --- a/src/llm_models/utils_model.py +++ b/src/llm_models/utils_model.py @@ -1009,12 +1009,15 @@ class LLMRequest: # 步骤1: 更新内存中的统计数据,用于负载均衡 stats = self.model_usage[model_info.name] + # 安全地获取 token 使用量, embedding 模型可能不返回 completion_tokens + total_tokens = getattr(usage, "total_tokens", 0) + # 计算新的平均延迟 new_request_count = stats.request_count + 1 new_avg_latency = (stats.avg_latency * stats.request_count + time_cost) / new_request_count self.model_usage[model_info.name] = stats._replace( - total_tokens=stats.total_tokens + usage.total_tokens, + total_tokens=stats.total_tokens + total_tokens, avg_latency=new_avg_latency, request_count=new_request_count, ) diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 191bbc16d..96f0e4b09 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -271,12 +271,34 @@ async def _send_to_target( message_segment = Seg(type=message_type, data=content) # type: ignore # 处理回复消息 - if reply_to_message: + if reply_to: + # 优先使用 reply_to 字符串构建 anchor_message + # 解析 "发送者(ID)" 格式 + import re + match = re.match(r"(.+)\((\d+)\)", reply_to) + if match: + sender_name, sender_id = match.groups() + temp_message_dict = { + "user_nickname": sender_name, + "user_id": sender_id, + "chat_info_platform": target_stream.platform, + "message_id": "temp_reply_id", # 临时ID + "time": time.time() + } + anchor_message = message_dict_to_message_recv(message_dict=temp_message_dict) + else: + anchor_message = None + reply_to_platform_id = f"{target_stream.platform}:{sender_id}" if anchor_message else None + + elif reply_to_message: anchor_message = message_dict_to_message_recv(message_dict=reply_to_message) - anchor_message.update_chat_stream(target_stream) - reply_to_platform_id = ( - f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}" - ) + if anchor_message: + anchor_message.update_chat_stream(target_stream) + reply_to_platform_id = ( + f"{anchor_message.message_info.platform}:{anchor_message.message_info.user_info.user_id}" + ) + else: + reply_to_platform_id = None else: anchor_message = None reply_to_platform_id = None diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py index 3005b4e40..3013afaa4 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_filter.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_filter.py @@ -104,6 +104,23 @@ class ChatterPlanFilter: # 预解析 action_type 来进行判断 thinking = item.get("thinking", "未提供思考过程") actions_obj = item.get("actions", {}) + + # 记录决策历史 + if hasattr(global_config.chat, "enable_decision_history") and global_config.chat.enable_decision_history: + action_types_to_log = [] + actions_to_process_for_log = [] + if isinstance(actions_obj, dict): + actions_to_process_for_log.append(actions_obj) + elif isinstance(actions_obj, list): + actions_to_process_for_log.extend(actions_obj) + + for single_action in actions_to_process_for_log: + if isinstance(single_action, dict): + action_types_to_log.append(single_action.get("action_type", "no_action")) + + if thinking != "未提供思考过程" and action_types_to_log: + await self._add_decision_to_history(plan, thinking, ", ".join(action_types_to_log)) + # 处理actions字段可能是字典或列表的情况 if isinstance(actions_obj, dict): @@ -141,6 +158,65 @@ class ChatterPlanFilter: return plan + async def _add_decision_to_history(self, plan: Plan, thought: str, action: str): + """添加决策记录到历史中""" + try: + from src.chat.message_receive.chat_stream import get_chat_manager + from src.common.data_models.message_manager_data_model import DecisionRecord + + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(plan.chat_id) + if not chat_stream: + return + + if not thought or not action: + logger.debug("尝试添加空的决策历史,已跳过") + return + + context = chat_stream.context_manager.context + new_record = DecisionRecord(thought=thought, action=action) + + # 添加新记录 + context.decision_history.append(new_record) + + # 获取历史长度限制 + max_history_length = getattr(global_config.chat, "decision_history_length", 3) + + # 如果历史记录超过长度,则移除最旧的记录 + if len(context.decision_history) > max_history_length: + context.decision_history.pop(0) + + logger.debug(f"已添加决策历史,当前长度: {len(context.decision_history)}") + + except Exception as e: + logger.warning(f"记录决策历史失败: {e}") + + async def _build_decision_history_block(self, plan: Plan) -> str: + """构建决策历史块""" + if not hasattr(global_config.chat, "enable_decision_history") or not global_config.chat.enable_decision_history: + return "" + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(plan.chat_id) + if not chat_stream: + return "" + + context = chat_stream.context_manager.context + if not context.decision_history: + return "" + + history_records = [] + for i, record in enumerate(context.decision_history): + history_records.append(f"- 思考: {record.thought}\n - 动作: {record.action}") + + history_str = "\n".join(history_records) + return f"{history_str}" + except Exception as e: + logger.warning(f"构建决策历史块失败: {e}") + return "" + async def _build_prompt(self, plan: Plan) -> tuple[str, list]: """ 根据 Plan 对象构建提示词。 @@ -166,6 +242,9 @@ class ChatterPlanFilter: chat_mood = mood_manager.get_mood_by_chat_id(plan.chat_id) mood_block = f"你现在的心情是:{chat_mood.mood_state}" + # 构建决策历史 + decision_history_block = await self._build_decision_history_block(plan) + # 构建已读/未读历史消息 read_history_block, unread_history_block, message_id_list = await self._build_read_unread_history_blocks( plan @@ -239,6 +318,7 @@ class ChatterPlanFilter: mood_block=mood_block, time_block=time_block, chat_context_description=chat_context_description, + decision_history_block=decision_history_block, read_history_block=read_history_block, unread_history_block=unread_history_block, actions_before_now_block=actions_before_now_block, diff --git a/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py b/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py index 6f2bcc6f3..0e760fc15 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner_prompts.py @@ -30,6 +30,9 @@ def init_prompts(): {actions_before_now_block} +## 🤔 最近的决策历史 (回顾你之前的思考与动作,可以帮助你避免重复,并做出更有趣的连贯回应) +{decision_history_block} + ## 📜 已读历史(仅供理解,不可作为动作对象) {read_history_block} diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py index 6992dbc8b..45c5f4cf1 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py @@ -719,12 +719,16 @@ class MessageHandler: reply_message = [Seg(type="text", data="(获取发言内容失败)")] sender_info: dict = message_detail.get("sender") sender_nickname: str = sender_info.get("nickname") + sender_id = sender_info.get("user_id") seg_message: List[Seg] = [] if not sender_nickname: logger.warning("无法获取被引用的人的昵称,返回默认值") seg_message.append(Seg(type="text", data="[回复 未知用户:")) else: - seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}>:")) + if sender_id: + seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}({sender_id})>:")) + else: + seg_message.append(Seg(type="text", data=f"[回复<{sender_nickname}>:")) seg_message += reply_message seg_message.append(Seg(type="text", data="],说:")) return seg_message diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index ddfdada13..e824467d2 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -142,7 +142,8 @@ dynamic_distribution_min_interval = 1.0 # 最小分发间隔(秒) dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒) dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子 max_concurrent_distributions = 10 # 最大并发处理的消息流数量,可以根据API性能和服务器负载调整 - +enable_decision_history = true # 是否启用决策历史功能 +decision_history_length = 3 # 决策历史记录的长度,用于增强语言模型的上下文连续性 [message_receive]