diff --git a/src/chat/__init__.py b/src/chat/__init__.py index c69d5205e..a569c0226 100644 --- a/src/chat/__init__.py +++ b/src/chat/__init__.py @@ -5,11 +5,9 @@ MaiBot模块系统 from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.emoji_system.emoji_manager import get_emoji_manager -from src.chat.normal_chat.willing.willing_manager import get_willing_manager # 导出主要组件供外部使用 __all__ = [ "get_chat_manager", "get_emoji_manager", - "get_willing_manager", ] diff --git a/src/chat/focus_chat/heartFC_chat.py b/src/chat/focus_chat/heartFC_chat.py index 872a800a8..13361700d 100644 --- a/src/chat/focus_chat/heartFC_chat.py +++ b/src/chat/focus_chat/heartFC_chat.py @@ -1,9 +1,10 @@ import asyncio -import contextlib import time import traceback from collections import deque -from typing import List, Optional, Dict, Any, Deque, Callable, Awaitable +from typing import Optional, Deque, Callable, Awaitable + +from sqlalchemy import False_ from src.chat.message_receive.chat_stream import get_chat_manager from rich.traceback import install from src.chat.utils.prompt_builder import global_prompt_manager @@ -16,6 +17,16 @@ from src.chat.planner_actions.action_manager import ActionManager from src.config.config import global_config from src.person_info.relationship_builder_manager import relationship_builder_manager from src.chat.focus_chat.hfc_utils import CycleDetail +from random import random +from src.chat.focus_chat.hfc_utils import create_thinking_message_from_dict, add_messages_to_manager,get_recent_message_stats,cleanup_thinking_message_by_id +from src.person_info.person_info import get_person_info_manager +from src.plugin_system.apis import generator_api +from ..message_receive.message import MessageThinking +from src.chat.message_receive.normal_message_sender import message_manager +from src.chat.willing.willing_manager import get_willing_manager +from .priority_manager import PriorityManager +from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat_inclusive + ERROR_LOOP_INFO = { @@ -34,6 +45,17 @@ ERROR_LOOP_INFO = { }, } +NO_ACTION = { + "action_result": { + "action_type": "no_action", + "action_data": {}, + "reasoning": "规划器初始化默认", + "is_parallel": True, + }, + "chat_context": "", + "action_prompt": "", +} + install(extra_lines=3) # 注释:原来的动作修改超时常量已移除,因为改为顺序执行 @@ -51,7 +73,6 @@ class HeartFChatting: def __init__( self, chat_id: str, - on_stop_focus_chat: Optional[Callable[[], Awaitable[None]]] = None, ): """ HeartFChatting 初始化函数 @@ -68,6 +89,10 @@ class HeartFChatting: self.relationship_builder = relationship_builder_manager.get_or_create_builder(self.stream_id) + self.loop_mode = "normal" + + self.recent_replies = [] + # 新增:消息计数器和疲惫阈值 self._message_count = 0 # 发送的消息计数 # 基于exit_focus_threshold动态计算疲惫阈值 @@ -90,11 +115,32 @@ class HeartFChatting: self._cycle_history: Deque[CycleDetail] = deque(maxlen=10) # 保留最近10个循环的信息 self._current_cycle_detail: Optional[CycleDetail] = None - # 存储回调函数 - self.on_stop_focus_chat = on_stop_focus_chat - self.reply_timeout_count = 0 self.plan_timeout_count = 0 + + self.last_read_time = time.time()-1 + + + + self.willing_amplifier = 1 + + self.action_type: Optional[str] = None # 当前动作类型 + self.is_parallel_action: bool = False # 是否是可并行动作 + + self._chat_task: Optional[asyncio.Task] = None + self._priority_chat_task: Optional[asyncio.Task] = None # for priority mode consumer + + self.reply_mode = self.chat_stream.context.get_priority_mode() + if self.reply_mode == "priority": + self.priority_manager = PriorityManager( + normal_queue_max_size=5, + ) + else: + self.priority_manager = None + + self.willing_manager = get_willing_manager() + + logger.info( f"{self.log_prefix} HeartFChatting 初始化完成,消息疲惫阈值: {self._message_threshold}条(基于exit_focus_threshold={global_config.chat.exit_focus_threshold}计算,仅在auto模式下生效)" @@ -172,44 +218,169 @@ class HeartFChatting: - async def _focus_mode_loopbody(self): - logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") + async def _loopbody(self): + if self.loop_mode == "focus": + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次观察") + return await self._observe() + elif self.loop_mode == "normal": + now = time.time() + new_messages_data = get_raw_msg_by_timestamp_with_chat_inclusive( + chat_id=self.stream_id, timestamp_start=self.last_read_time, timestamp_end=now, limit_mode="earliest" + ) + + if new_messages_data: + self.last_read_time = now + + for msg_data in new_messages_data: + try: + self.adjust_reply_frequency() + logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次循环") + await self.normal_response(msg_data) + # TODO: 这个地方可能导致阻塞,需要优化 + return True + except Exception as e: + logger.error(f"[{self.log_prefix}] 处理消息时出错: {e} {traceback.format_exc()}") + else: + await asyncio.sleep(0.1) + + return True + + + async def _observe(self,message_data:dict = None): # 创建新的循环信息 cycle_timers, thinking_id = self.start_cycle() + + await create_thinking_message_from_dict(message_data,self.chat_stream,thinking_id) - # 执行规划和处理阶段 - try: - async with global_prompt_manager.async_message_scope( - self.chat_stream.context.get_template_name() - ): + async with global_prompt_manager.async_message_scope( + self.chat_stream.context.get_template_name() + ): - loop_start_time = time.time() - await self.loop_info.observe() - await self.relationship_builder.build_relation() - - # 第一步:动作修改 - with Timer("动作修改", cycle_timers): - try: + loop_start_time = time.time() + await self.loop_info.observe() + await self.relationship_builder.build_relation() + + # 第一步:动作修改 + with Timer("动作修改", cycle_timers): + try: + if self.loop_mode == "focus": await self.action_modifier.modify_actions( loop_info=self.loop_info, mode="focus", ) - except Exception as e: - logger.error(f"{self.log_prefix} 动作修改失败: {e}") + elif self.loop_mode == "normal": + await self.action_modifier.modify_actions(mode="normal") + available_actions = self.action_manager.get_using_actions_for_mode("normal") + except Exception as e: + logger.error(f"{self.log_prefix} 动作修改失败: {e}") + + #如果normal,开始一个回复生成进程,先准备好回复(其实是和planer同时进行的) + if self.loop_mode == "normal": + gen_task = asyncio.create_task(self._generate_normal_response(message_data, available_actions)) + - with Timer("规划器", cycle_timers): - plan_result = await self.action_planner.plan() + with Timer("规划器", cycle_timers): + if self.loop_mode == "focus": + if self.action_modifier.should_skip_planning_for_no_reply(): + logger.info(f"[{self.log_prefix}] 没有可用动作,跳过规划") + action_type = "no_reply" + else: + plan_result = await self.action_planner.plan(mode="focus") + elif self.loop_mode == "normal": + if self.action_modifier.should_skip_planning_for_no_action(): + logger.info(f"[{self.log_prefix}] 没有可用动作,跳过规划") + action_type = "no_action" + else: + plan_result = await self.action_planner.plan(mode="normal") - action_result = plan_result.get("action_result", {}) - action_type, action_data, reasoning = ( - action_result.get("action_type", "error"), - action_result.get("action_data", {}), - action_result.get("reasoning", "未提供理由"), + + + action_result = plan_result.get("action_result", {}) + action_type, action_data, reasoning, is_parallel = ( + action_result.get("action_type", "error"), + action_result.get("action_data", {}), + action_result.get("reasoning", "未提供理由"), + action_result.get("is_parallel", True), + ) + + action_data["loop_start_time"] = loop_start_time + + if self.loop_mode == "normal": + if action_type == "no_action": + logger.info(f"[{self.log_prefix}] {global_config.bot.nickname} 决定进行回复") + elif is_parallel: + logger.info( + f"[{self.log_prefix}] {global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作" + ) + else: + logger.info(f"[{self.log_prefix}] {global_config.bot.nickname} 决定执行{action_type}动作") + + + + if action_type == "no_action": + gather_timeout = global_config.chat.thinking_timeout + results = await asyncio.wait_for( + asyncio.gather(gen_task, return_exceptions=True), + timeout=gather_timeout, ) + response_set = results[0] + + if response_set: + content = " ".join([item[1] for item in response_set if item[0] == "text"]) - action_data["loop_start_time"] = loop_start_time + + if not response_set or ( + action_type not in ["no_action"] and not is_parallel + ): + if not response_set: + logger.warning(f"[{self.log_prefix}] 模型未生成回复内容") + elif action_type not in ["no_action"] and not is_parallel: + logger.info( + f"[{self.log_prefix}] {global_config.bot.nickname} 原本想要回复:{content},但选择执行{self.action_type},不发表回复" + ) + # 如果模型未生成回复,移除思考消息 + await cleanup_thinking_message_by_id(self.chat_stream.stream_id,thinking_id,self.log_prefix) + return False + logger.info(f"[{self.log_prefix}] {global_config.bot.nickname} 决定的回复内容: {content}") + + # 提取回复文本 + reply_texts = [item[1] for item in response_set if item[0] == "text"] + if not reply_texts: + logger.info(f"[{self.log_prefix}] 回复内容中没有文本,不发送消息") + await cleanup_thinking_message_by_id(self.chat_stream.stream_id,thinking_id,self.log_prefix) + return False + + # 发送回复 (不再需要传入 chat) + first_bot_msg = await add_messages_to_manager(message_data, reply_texts, thinking_id,self.chat_stream.stream_id) + + # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) + if first_bot_msg: + # 消息段已在接收消息时更新,这里不需要额外处理 + + # 记录回复信息到最近回复列表中 + reply_info = { + "time": time.time(), + "user_message": message_data.get("processed_plain_text"), + "user_info": { + "user_id": message_data.get("user_id"), + "user_nickname": message_data.get("user_nickname"), + }, + "response": response_set, + "is_reference_reply": message_data.get("reply") is not None, # 判断是否为引用回复 + } + self.recent_replies.append(reply_info) + # 保持最近回复历史在限定数量内 + if len(self.recent_replies) > 10: + self.recent_replies = self.recent_replies[-10 :] + return response_set if response_set else False + + + + + + else: # 动作执行计时 with Timer("动作执行", cycle_timers): success, reply_text, command = await self._handle_action( @@ -233,55 +404,33 @@ class HeartFChatting: return False #停止该聊天模式的循环 - self.end_cycle(loop_info,cycle_timers) - self.print_cycle_info(cycle_timers) + self.end_cycle(loop_info,cycle_timers) + self.print_cycle_info(cycle_timers) - await asyncio.sleep(global_config.focus_chat.think_interval) - - return True - - - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} focus循环任务被取消") - return False - except Exception as e: - logger.error(f"{self.log_prefix} 循环处理时出错: {e}") - logger.error(traceback.format_exc()) - - # 如果_current_cycle_detail存在但未完成,为其设置错误状态 - if self._current_cycle_detail and not hasattr(self._current_cycle_detail, "end_time"): - error_loop_info = ERROR_LOOP_INFO - try: - self._current_cycle_detail.set_loop_info(error_loop_info) - self._current_cycle_detail.complete_cycle() - except Exception as inner_e: - logger.error(f"{self.log_prefix} 设置错误状态时出错: {inner_e}") + if self.loop_mode == "normal": + await self.willing_manager.after_generate_reply_handle(message_data.get("message_id")) - await asyncio.sleep(1) # 出错后等待一秒再继续\ - return False - + return True + + async def _main_chat_loop(self): """主循环,持续进行计划并可能回复消息,直到被外部取消。""" try: - loop_mode = "focus" - loop_mode_loopbody = self._focus_mode_loopbody - - while self.running: # 主循环 - success = await loop_mode_loopbody() + success = await self._loopbody() if not success: break - logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式") - - + logger.info(f"{self.log_prefix} 麦麦已强制离开聊天") except asyncio.CancelledError: # 设置了关闭标志位后被取消是正常流程 - logger.info(f"{self.log_prefix} 麦麦已强制离开 {loop_mode} 聊天模式") - except Exception as e: - logger.error(f"{self.log_prefix} 麦麦 {loop_mode} 聊天模式意外错误: {e}") + logger.info(f"{self.log_prefix} 麦麦已关闭聊天") + except Exception: + logger.error(f"{self.log_prefix} 麦麦聊天意外错误") print(traceback.format_exc()) + # 理论上不能到这里 + logger.error(f"{self.log_prefix} 麦麦聊天意外错误,结束了聊天循环") async def _handle_action( self, @@ -376,8 +525,6 @@ class HeartFChatting: return command return "" - - def _get_current_fatigue_threshold(self) -> int: """动态获取当前的疲惫阈值,基于exit_focus_threshold配置 @@ -427,3 +574,226 @@ class HeartFChatting: logger.info(f"{self.log_prefix} HeartFChatting关闭完成") + + def adjust_reply_frequency(self): + """ + 根据预设规则动态调整回复意愿(willing_amplifier)。 + - 评估周期:10分钟 + - 目标频率:由 global_config.chat.talk_frequency 定义(例如 1条/分钟) + - 调整逻辑: + - 0条回复 -> 5.0x 意愿 + - 达到目标回复数 -> 1.0x 意愿(基准) + - 达到目标2倍回复数 -> 0.2x 意愿 + - 中间值线性变化 + - 增益抑制:如果最近5分钟回复过快,则不增加意愿。 + """ + # --- 1. 定义参数 --- + evaluation_minutes = 10.0 + target_replies_per_min = global_config.chat.get_current_talk_frequency( + self.stream_id + ) # 目标频率:e.g. 1条/分钟 + target_replies_in_window = target_replies_per_min * evaluation_minutes # 10分钟内的目标回复数 + + if target_replies_in_window <= 0: + logger.debug(f"[{self.log_prefix}] 目标回复频率为0或负数,不调整意愿放大器。") + return + + # --- 2. 获取近期统计数据 --- + stats_10_min = get_recent_message_stats(minutes=evaluation_minutes, chat_id=self.stream_id) + bot_reply_count_10_min = stats_10_min["bot_reply_count"] + + # --- 3. 计算新的意愿放大器 (willing_amplifier) --- + # 基于回复数在 [0, target*2] 区间内进行分段线性映射 + if bot_reply_count_10_min <= target_replies_in_window: + # 在 [0, 目标数] 区间,意愿从 5.0 线性下降到 1.0 + new_amplifier = 5.0 + (bot_reply_count_10_min - 0) * (1.0 - 5.0) / (target_replies_in_window - 0) + elif bot_reply_count_10_min <= target_replies_in_window * 2: + # 在 [目标数, 目标数*2] 区间,意愿从 1.0 线性下降到 0.2 + over_target_cap = target_replies_in_window * 2 + new_amplifier = 1.0 + (bot_reply_count_10_min - target_replies_in_window) * (0.2 - 1.0) / ( + over_target_cap - target_replies_in_window + ) + else: + # 超过目标数2倍,直接设为最小值 + new_amplifier = 0.2 + + # --- 4. 检查是否需要抑制增益 --- + # "如果邻近5分钟内,回复数量 > 频率/2,就不再进行增益" + suppress_gain = False + if new_amplifier > self.willing_amplifier: # 仅在计算结果为增益时检查 + suppression_minutes = 5.0 + # 5分钟内目标回复数的一半 + suppression_threshold = (target_replies_per_min / 2) * suppression_minutes # e.g., (1/2)*5 = 2.5 + stats_5_min = get_recent_message_stats(minutes=suppression_minutes, chat_id=self.stream_id) + bot_reply_count_5_min = stats_5_min["bot_reply_count"] + + if bot_reply_count_5_min > suppression_threshold: + suppress_gain = True + + # --- 5. 更新意愿放大器 --- + if suppress_gain: + logger.debug( + f"[{self.log_prefix}] 回复增益被抑制。最近5分钟内回复数 ({bot_reply_count_5_min}) " + f"> 阈值 ({suppression_threshold:.1f})。意愿放大器保持在 {self.willing_amplifier:.2f}" + ) + # 不做任何改动 + else: + # 限制最终值在 [0.2, 5.0] 范围内 + self.willing_amplifier = max(0.2, min(5.0, new_amplifier)) + logger.debug( + f"[{self.log_prefix}] 调整回复意愿。10分钟内回复: {bot_reply_count_10_min} (目标: {target_replies_in_window:.0f}) -> " + f"意愿放大器更新为: {self.willing_amplifier:.2f}" + ) + + + + async def normal_response(self, message_data: dict) -> None: + """ + 处理接收到的消息。 + 在"兴趣"模式下,判断是否回复并生成内容。 + """ + + is_mentioned = message_data.get("is_mentioned", False) + interested_rate = message_data.get("interest_rate", 0.0) * self.willing_amplifier + + reply_probability = ( + 1.0 if is_mentioned and global_config.normal_chat.mentioned_bot_inevitable_reply else 0.0 + ) # 如果被提及,且开启了提及必回复,则基础概率为1,否则需要意愿判断 + + # 意愿管理器:设置当前message信息 + self.willing_manager.setup(message_data, self.chat_stream) + + # 获取回复概率 + # 仅在未被提及或基础概率不为1时查询意愿概率 + if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率 + # is_willing = True + reply_probability = await self.willing_manager.get_reply_probability(message_data.get("message_id")) + + additional_config = message_data.get("additional_config", {}) + if additional_config and "maimcore_reply_probability_gain" in additional_config: + reply_probability += additional_config["maimcore_reply_probability_gain"] + reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间 + + # 处理表情包 + if message_data.get("is_emoji") or message_data.get("is_picid"): + reply_probability = 0 + + # 应用疲劳期回复频率调整 + fatigue_multiplier = self._get_fatigue_reply_multiplier() + original_probability = reply_probability + reply_probability *= fatigue_multiplier + + # 如果应用了疲劳调整,记录日志 + if fatigue_multiplier < 1.0: + logger.info( + f"[{self.log_prefix}] 疲劳期回复频率调整: {original_probability * 100:.1f}% -> {reply_probability * 100:.1f}% (系数: {fatigue_multiplier:.2f})" + ) + + # 打印消息信息 + mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊" + if reply_probability > 0.1: + logger.info( + f"[{mes_name}]" + f"{message_data.get('user_nickname')}:" + f"{message_data.get('processed_plain_text')}[兴趣:{interested_rate:.2f}][回复概率:{reply_probability * 100:.1f}%]" + ) + + if random() < reply_probability: + await self.willing_manager.before_generate_reply_handle(message_data.get("message_id")) + await self._observe(message_data = message_data) + + # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) + self.willing_manager.delete(message_data.get("message_id")) + + return True + + + async def _generate_normal_response( + self, message_data: dict, available_actions: Optional[list] + ) -> Optional[list]: + """生成普通回复""" + try: + person_info_manager = get_person_info_manager() + person_id = person_info_manager.get_person_id( + message_data.get("chat_info_platform"), message_data.get("user_id") + ) + person_name = await person_info_manager.get_value(person_id, "person_name") + reply_to_str = f"{person_name}:{message_data.get('processed_plain_text')}" + + success, reply_set = await generator_api.generate_reply( + chat_stream=self.chat_stream, + reply_to=reply_to_str, + available_actions=available_actions, + enable_tool=global_config.tool.enable_in_normal_chat, + request_type="normal.replyer", + ) + + if not success or not reply_set: + logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败") + return None + + return reply_set + + except Exception as e: + logger.error(f"[{self.log_prefix}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") + return None + + + def _get_fatigue_reply_multiplier(self) -> float: + """获取疲劳期回复频率调整系数 + + Returns: + float: 回复频率调整系数,范围0.5-1.0 + """ + if not self.get_cooldown_progress_callback: + return 1.0 # 没有冷却进度回调,返回正常系数 + + try: + cooldown_progress = self.get_cooldown_progress_callback() + + if cooldown_progress >= 1.0: + return 1.0 # 冷却完成,正常回复频率 + + # 疲劳期间:从0.5逐渐恢复到1.0 + # progress=0时系数为0.5,progress=1时系数为1.0 + multiplier = 0.2 + (0.8 * cooldown_progress) + + return multiplier + except Exception as e: + logger.warning(f"[{self.log_prefix}] 获取疲劳调整系数时出错: {e}") + return 1.0 # 出错时返回正常系数 + + # async def _check_should_switch_to_focus(self) -> bool: + # """ + # 检查是否满足切换到focus模式的条件 + + # Returns: + # bool: 是否应该切换到focus模式 + # """ + # # 检查思考消息堆积情况 + # container = await message_manager.get_container(self.stream_id) + # if container: + # thinking_count = sum(1 for msg in container.messages if isinstance(msg, MessageThinking)) + # if thinking_count >= 4 * global_config.chat.auto_focus_threshold: # 如果堆积超过阈值条思考消息 + # logger.debug(f"[{self.stream_name}] 检测到思考消息堆积({thinking_count}条),切换到focus模式") + # return True + + # if not self.recent_replies: + # return False + + # current_time = time.time() + # time_threshold = 120 / global_config.chat.auto_focus_threshold + # reply_threshold = 6 * global_config.chat.auto_focus_threshold + + # one_minute_ago = current_time - time_threshold + + # # 统计指定时间内的回复数量 + # recent_reply_count = sum(1 for reply in self.recent_replies if reply["time"] > one_minute_ago) + + # should_switch = recent_reply_count > reply_threshold + # if should_switch: + # logger.debug( + # f"[{self.stream_name}] 检测到{time_threshold:.0f}秒内回复数量({recent_reply_count})大于{reply_threshold},满足切换到focus模式条件" + # ) + + # return should_switch \ No newline at end of file diff --git a/src/chat/focus_chat/hfc_utils.py b/src/chat/focus_chat/hfc_utils.py index 5820d8eb4..c36f06a77 100644 --- a/src/chat/focus_chat/hfc_utils.py +++ b/src/chat/focus_chat/hfc_utils.py @@ -1,11 +1,19 @@ import time from typing import Optional -from src.chat.message_receive.message import MessageRecv, BaseMessageInfo from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.message import UserInfo from src.common.logger import get_logger -import json from typing import Dict, Any +from src.config.config import global_config +from src.chat.message_receive.message import MessageThinking +from src.chat.message_receive.normal_message_sender import message_manager +from typing import List +from maim_message import Seg +from src.common.message_repository import count_messages +from ..message_receive.message import MessageSending, MessageSet, message_from_db_dict +from src.chat.message_receive.chat_stream import get_chat_manager + + logger = get_logger(__name__) @@ -113,3 +121,129 @@ def parse_thinking_id_to_timestamp(thinking_id: str) -> float: ts_str = thinking_id[3:] return float(ts_str) + +async def create_thinking_message_from_dict(message_data: dict, chat_stream: ChatStream, thinking_id: str) -> str: + """创建思考消息""" + bot_user_info = UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=message_data.get("chat_info_platform"), + ) + + thinking_message = MessageThinking( + message_id=thinking_id, + chat_stream=chat_stream, + bot_user_info=bot_user_info, + reply=None, + thinking_start_time=time.time(), + timestamp=time.time(), + ) + + await message_manager.add_message(thinking_message) + return thinking_id + +async def cleanup_thinking_message_by_id(chat_id: str, thinking_id: str, log_prefix: str): + """根据ID清理思考消息""" + try: + container = await message_manager.get_container(chat_id) + if container: + for msg in container.messages[:]: + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + container.messages.remove(msg) + logger.info(f"{log_prefix}已清理思考消息 {thinking_id}") + break + except Exception as e: + logger.error(f"{log_prefix} 清理思考消息 {thinking_id} 时出错: {e}") + + + +async def add_messages_to_manager( + message_data: dict, response_set: List[str], thinking_id, chat_id + ) -> Optional[MessageSending]: + """发送回复消息""" + + chat_stream = get_chat_manager().get_stream(chat_id) + + container = await message_manager.get_container(chat_id) # 使用 self.stream_id + thinking_message = None + + for msg in container.messages[:]: + # print(msg) + if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + thinking_message = msg + container.messages.remove(msg) + break + + if not thinking_message: + logger.warning(f"[{chat_id}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") + return None + + thinking_start_time = thinking_message.thinking_start_time + message_set = MessageSet(chat_stream, thinking_id) # 使用 self.chat_stream + + sender_info = UserInfo( + user_id=message_data.get("user_id"), + user_nickname=message_data.get("user_nickname"), + platform=message_data.get("chat_info_platform"), + ) + + reply = message_from_db_dict(message_data) + + + mark_head = False + first_bot_msg = None + for msg in response_set: + if global_config.debug.debug_show_chat_mode: + msg += "ⁿ" + message_segment = Seg(type="text", data=msg) + bot_message = MessageSending( + message_id=thinking_id, + chat_stream=chat_stream, # 使用 self.chat_stream + bot_user_info=UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=message_data.get("chat_info_platform"), + ), + sender_info=sender_info, + message_segment=message_segment, + reply=reply, + is_head=not mark_head, + is_emoji=False, + thinking_start_time=thinking_start_time, + apply_set_reply_logic=True, + ) + if not mark_head: + mark_head = True + first_bot_msg = bot_message + message_set.add_message(bot_message) + + await message_manager.add_message(message_set) + + return first_bot_msg + + +def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict: + """ + Args: + minutes (int): 检索的分钟数,默认30分钟 + chat_id (str, optional): 指定的chat_id,仅统计该chat下的消息。为None时统计全部。 + Returns: + dict: {"bot_reply_count": int, "total_message_count": int} + """ + + now = time.time() + start_time = now - minutes * 60 + bot_id = global_config.bot.qq_account + + filter_base = {"time": {"$gte": start_time}} + if chat_id is not None: + filter_base["chat_id"] = chat_id + + # 总消息数 + total_message_count = count_messages(filter_base) + # bot自身回复数 + bot_filter = filter_base.copy() + bot_filter["user_id"] = bot_id + bot_reply_count = count_messages(bot_filter) + + return {"bot_reply_count": bot_reply_count, "total_message_count": total_message_count} diff --git a/src/chat/normal_chat/priority_manager.py b/src/chat/focus_chat/priority_manager.py similarity index 99% rename from src/chat/normal_chat/priority_manager.py rename to src/chat/focus_chat/priority_manager.py index facbecd23..a3f379651 100644 --- a/src/chat/normal_chat/priority_manager.py +++ b/src/chat/focus_chat/priority_manager.py @@ -2,7 +2,7 @@ import time import heapq import math import json -from typing import List, Dict, Optional +from typing import List, Optional from src.common.logger import get_logger diff --git a/src/chat/heart_flow/chat_state_info.py b/src/chat/heart_flow/chat_state_info.py deleted file mode 100644 index 33936186b..000000000 --- a/src/chat/heart_flow/chat_state_info.py +++ /dev/null @@ -1,13 +0,0 @@ -import enum - - -class ChatState(enum.Enum): - ABSENT = "没在看群" - NORMAL = "随便水群" - FOCUSED = "认真水群" - - -class ChatStateInfo: - def __init__(self): - self.chat_status: ChatState = ChatState.NORMAL - self.current_state_time = 120 diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index 0e4655952..2b55f9fe6 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -1,13 +1,11 @@ import asyncio import time -from typing import Optional, List, Dict, Tuple +from typing import Optional, List, Tuple import traceback from src.common.logger import get_logger from src.chat.message_receive.message import MessageRecv from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.focus_chat.heartFC_chat import HeartFChatting -from src.chat.normal_chat.normal_chat import NormalChat -from src.chat.heart_flow.chat_state_info import ChatState, ChatStateInfo from src.chat.utils.utils import get_chat_type_and_target_info from src.config.config import global_config from rich.traceback import install @@ -31,11 +29,6 @@ class SubHeartflow: self.subheartflow_id = subheartflow_id self.chat_id = subheartflow_id - # 这个聊天流的状态 - self.chat_state: ChatStateInfo = ChatStateInfo() - self.chat_state_changed_time: float = time.time() - self.chat_state_last_time: float = 0 - self.history_chat_state: List[Tuple[ChatState, float]] = [] self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_id) self.log_prefix = get_chat_manager().get_stream_name(self.subheartflow_id) or self.subheartflow_id @@ -47,125 +40,14 @@ class SubHeartflow: # CHAT模式激活 随便水群 FOCUS模式激活 认真水群 self.heart_fc_instance: Optional[HeartFChatting] = HeartFChatting( chat_id=self.subheartflow_id, - on_stop_focus_chat=self._handle_stop_focus_chat_request, ) # 该sub_heartflow的HeartFChatting实例 - self.normal_chat_instance: Optional[NormalChat] = NormalChat( - chat_stream=get_chat_manager().get_stream(self.chat_id), - on_switch_to_focus_callback=self._handle_switch_to_focus_request, - get_cooldown_progress_callback=self.get_cooldown_progress, - ) # 该sub_heartflow的NormalChat实例 async def initialize(self): """异步初始化方法,创建兴趣流并确定聊天类型""" + await self.heart_fc_instance.start() - # 根据配置决定初始状态 - if not self.is_group_chat: - logger.debug(f"{self.log_prefix} 检测到是私聊,将直接尝试进入 FOCUSED 状态。") - await self.change_chat_state(ChatState.FOCUSED) - elif global_config.chat.chat_mode == "focus": - logger.debug(f"{self.log_prefix} 配置为 focus 模式,将直接尝试进入 FOCUSED 状态。") - await self.change_chat_state(ChatState.FOCUSED) - else: # "auto" 或其他模式保持原有逻辑或默认为 NORMAL - logger.debug(f"{self.log_prefix} 配置为 auto 或其他模式,将尝试进入 NORMAL 状态。") - await self.change_chat_state(ChatState.NORMAL) - def update_last_chat_state_time(self): - self.chat_state_last_time = time.time() - self.chat_state_changed_time - async def _stop_normal_chat(self): - """ - 停止 NormalChat 实例 - 切出 CHAT 状态时使用 - """ - if self.normal_chat_instance: - logger.info(f"{self.log_prefix} 离开normal模式") - try: - logger.debug(f"{self.log_prefix} 开始调用 stop_chat()") - # 使用更短的超时时间,强制快速停止 - await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=3.0) - logger.debug(f"{self.log_prefix} stop_chat() 调用完成") - except Exception as e: - logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") - # 出错时也要清理实例,避免状态不一致 - self.normal_chat_instance = None - finally: - # 确保实例被清理 - if self.normal_chat_instance: - logger.warning(f"{self.log_prefix} 强制清理 NormalChat 实例") - self.normal_chat_instance = None - logger.debug(f"{self.log_prefix} _stop_normal_chat 完成") - else: - logger.info(f"{self.log_prefix} 没有normal聊天实例,无需停止normal聊天") - - async def _start_normal_chat(self) -> bool: - """ - 启动 NormalChat 实例,并进行异步初始化。 - 进入 CHAT 状态时使用。 - 确保 HeartFChatting 已停止。 - """ - await self._stop_heart_fc_chat() # 确保 专注聊天已停止 - - try: - # 获取聊天流并创建 NormalChat 实例 (同步部分) - chat_stream = get_chat_manager().get_stream(self.chat_id) - # 在 NormalChat 实例尚未创建时,创建新实例 - if not self.normal_chat_instance: - # 提供回调函数,用于接收需要切换到focus模式的通知 - self.normal_chat_instance = NormalChat( - chat_stream=chat_stream, - on_switch_to_focus_callback=self._handle_switch_to_focus_request, - get_cooldown_progress_callback=self.get_cooldown_progress, - ) - - logger.info(f"[{self.log_prefix}] 开始普通聊天") - await self.normal_chat_instance.start_chat() # start_chat now ensures init is called again if needed - return True - except Exception as e: - logger.error(f"[{self.log_prefix}] 启动 NormalChat 或其初始化时出错: {e}") - logger.error(traceback.format_exc()) - self.normal_chat_instance = None # 启动/初始化失败,清理实例 - return False - - async def _handle_switch_to_focus_request(self) -> bool: - """ - 处理来自NormalChat的切换到focus模式的请求 - - Args: - stream_id: 请求切换的stream_id - Returns: - bool: 切换成功返回True,失败返回False - """ - logger.info(f"{self.log_prefix} 收到NormalChat请求切换到focus模式") - - # 检查是否在focus冷却期内 - if self.is_in_focus_cooldown(): - logger.info(f"{self.log_prefix} 正在focus冷却期内,忽略切换到focus模式的请求") - return False - - # 切换到focus模式 - current_state = self.chat_state.chat_status - if current_state == ChatState.NORMAL: - await self.change_chat_state(ChatState.FOCUSED) - logger.info(f"{self.log_prefix} 已根据NormalChat请求从NORMAL切换到FOCUSED状态") - return True - else: - logger.warning(f"{self.log_prefix} 当前状态为{current_state.value},无法切换到FOCUSED状态") - return False - - async def _handle_stop_focus_chat_request(self) -> None: - """ - 处理来自HeartFChatting的停止focus模式的请求 - 当收到stop_focus_chat命令时被调用 - """ - logger.info(f"{self.log_prefix} 收到HeartFChatting请求停止focus模式") - - # 切换到normal模式 - current_state = self.chat_state.chat_status - if current_state == ChatState.FOCUSED: - await self.change_chat_state(ChatState.NORMAL) - logger.info(f"{self.log_prefix} 已根据HeartFChatting请求从FOCUSED切换到NORMAL状态") - else: - logger.warning(f"{self.log_prefix} 当前状态为{current_state.value},无法切换到NORMAL状态") async def _stop_heart_fc_chat(self): """停止并清理 HeartFChatting 实例""" @@ -204,64 +86,7 @@ class SubHeartflow: logger.error(f"{self.log_prefix} _start_heart_fc_chat 执行时出错: {e}") logger.error(traceback.format_exc()) return False - - async def change_chat_state(self, new_state: ChatState) -> None: - """ - 改变聊天状态。 - 如果转换到CHAT或FOCUSED状态时超过限制,会保持当前状态。 - """ - current_state = self.chat_state.chat_status - state_changed = False - - - if new_state == ChatState.NORMAL: - if self.normal_chat_instance.running: - logger.info(f"{self.log_prefix} 当前状态已经为normal") - return - else: - if await self._start_normal_chat(): - logger.debug(f"{self.log_prefix} 成功进入或保持 NormalChat 状态。") - state_changed = True - else: - logger.error(f"{self.log_prefix} 启动 NormalChat 失败,无法进入 CHAT 状态。") - return - - elif new_state == ChatState.FOCUSED: - if self.heart_fc_instance.running: - logger.info(f"{self.log_prefix} 当前状态已经为focused") - return - if await self._start_heart_fc_chat(): - logger.debug(f"{self.log_prefix} 成功进入或保持 HeartFChatting 状态。") - state_changed = True - else: - logger.error(f"{self.log_prefix} 启动 HeartFChatting 失败,无法进入 FOCUSED 状态。") - # 启动失败时,保持当前状态 - return - - # --- 记录focus模式退出时间 --- - if state_changed and current_state == ChatState.FOCUSED and new_state != ChatState.FOCUSED: - self.last_focus_exit_time = time.time() - logger.debug(f"{self.log_prefix} 记录focus模式退出时间: {self.last_focus_exit_time}") - - # --- 更新状态和最后活动时间 --- - if state_changed: - self.update_last_chat_state_time() - self.history_chat_state.append((current_state, self.chat_state_last_time)) - - self.chat_state.chat_status = new_state - self.chat_state_last_time = 0 - self.chat_state_changed_time = time.time() - else: - logger.debug( - f"{self.log_prefix} 尝试将状态从 {current_state.value} 变为 {new_state.value},但未成功或未执行更改。" - ) - - def add_message_to_normal_chat_cache(self, message: MessageRecv, interest_value: float, is_mentioned: bool): - self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) - # 如果字典长度超过10,删除最旧的消息 - if len(self.interest_dict) > 30: - oldest_key = next(iter(self.interest_dict)) - self.interest_dict.pop(oldest_key) + def is_in_focus_cooldown(self) -> bool: """检查是否在focus模式的冷却期内 diff --git a/src/chat/message_receive/normal_message_sender.py b/src/chat/message_receive/normal_message_sender.py index aa6721db3..c8bf72107 100644 --- a/src/chat/message_receive/normal_message_sender.py +++ b/src/chat/message_receive/normal_message_sender.py @@ -13,6 +13,7 @@ from ..utils.utils import truncate_message, calculate_typing_time, count_message from src.common.logger import get_logger from rich.traceback import install +import traceback install(extra_lines=3) @@ -292,6 +293,7 @@ class MessageManager: await asyncio.gather(*tasks) except Exception as e: logger.error(f"消息处理循环 gather 出错: {e}") + print(traceback.format_exc()) # 等待一小段时间,避免CPU空转 try: diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index 0efcf16d8..1102ab655 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -1,6 +1,6 @@ import asyncio -from typing import Dict, Optional # 重新导入类型 -from src.chat.message_receive.message import MessageSending, MessageThinking +from typing import Dict # 重新导入类型 +from src.chat.message_receive.message import MessageSending from src.common.message.api import get_global_api from src.chat.message_receive.storage import MessageStorage from src.chat.utils.utils import truncate_message @@ -36,42 +36,6 @@ class HeartFCSender: def __init__(self): self.storage = MessageStorage() - # 用于存储活跃的思考消息 - self.thinking_messages: Dict[str, Dict[str, MessageThinking]] = {} - self._thinking_lock = asyncio.Lock() # 保护 thinking_messages 的锁 - - async def register_thinking(self, thinking_message: MessageThinking): - """注册一个思考中的消息。""" - if not thinking_message.chat_stream or not thinking_message.message_info.message_id: - logger.error("无法注册缺少 chat_stream 或 message_id 的思考消息") - return - - chat_id = thinking_message.chat_stream.stream_id - message_id = thinking_message.message_info.message_id - - async with self._thinking_lock: - if chat_id not in self.thinking_messages: - self.thinking_messages[chat_id] = {} - if message_id in self.thinking_messages[chat_id]: - logger.warning(f"[{chat_id}] 尝试注册已存在的思考消息 ID: {message_id}") - self.thinking_messages[chat_id][message_id] = thinking_message - logger.debug(f"[{chat_id}] Registered thinking message: {message_id}") - - async def complete_thinking(self, chat_id: str, message_id: str): - """完成并移除一个思考中的消息记录。""" - async with self._thinking_lock: - if chat_id in self.thinking_messages and message_id in self.thinking_messages[chat_id]: - del self.thinking_messages[chat_id][message_id] - logger.debug(f"[{chat_id}] Completed thinking message: {message_id}") - if not self.thinking_messages[chat_id]: - del self.thinking_messages[chat_id] - logger.debug(f"[{chat_id}] Removed empty thinking message container.") - - async def get_thinking_start_time(self, chat_id: str, message_id: str) -> Optional[float]: - """获取已注册思考消息的开始时间。""" - async with self._thinking_lock: - thinking_message = self.thinking_messages.get(chat_id, {}).get(message_id) - return thinking_message.thinking_start_time if thinking_message else None async def send_message(self, message: MessageSending, typing=False, set_reply=False, storage_message=True): """ @@ -121,5 +85,4 @@ class HeartFCSender: except Exception as e: logger.error(f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}") raise e - finally: - await self.complete_thinking(chat_id, message_id) + diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 32fb24966..5a9293dd8 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -1,29 +1,21 @@ import asyncio import time -from random import random -from typing import List, Optional +from typing import Optional from src.config.config import global_config from src.common.logger import get_logger -from src.person_info.person_info import get_person_info_manager -from src.plugin_system.apis import generator_api -from maim_message import UserInfo, Seg from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.utils.timer_calculator import Timer -from src.common.message_repository import count_messages -from src.chat.utils.prompt_builder import global_prompt_manager -from ..message_receive.message import MessageSending, MessageThinking, MessageSet, MessageRecv,message_from_db_dict +from ..message_receive.message import MessageThinking from src.chat.message_receive.normal_message_sender import message_manager from src.chat.normal_chat.willing.willing_manager import get_willing_manager from src.chat.planner_actions.action_manager import ActionManager from src.person_info.relationship_builder_manager import relationship_builder_manager -from .priority_manager import PriorityManager +from ..focus_chat.priority_manager import PriorityManager import traceback from src.chat.planner_actions.planner import ActionPlanner from src.chat.planner_actions.action_modifier import ActionModifier from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat_inclusive from src.chat.utils.utils import get_chat_type_and_target_info -from src.mood.mood_manager import mood_manager willing_manager = get_willing_manager() @@ -62,7 +54,7 @@ class NormalChat: self.willing_amplifier = 1 self.start_time = time.time() - self.mood_manager = mood_manager + # self.mood_manager = mood_manager self.start_time = time.time() self.running = False @@ -115,40 +107,40 @@ class NormalChat: self._priority_chat_task.cancel() logger.info(f"[{self.stream_name}] NormalChat 已停用。") - async def _interest_mode_loopbody(self): - try: - await asyncio.sleep(LOOP_INTERVAL) + # async def _interest_mode_loopbody(self): + # try: + # await asyncio.sleep(LOOP_INTERVAL) - if self._disabled: - return False + # if self._disabled: + # return False - now = time.time() - new_messages_data = get_raw_msg_by_timestamp_with_chat_inclusive( - chat_id=self.stream_id, timestamp_start=self.last_read_time, timestamp_end=now, limit_mode="earliest" - ) + # now = time.time() + # new_messages_data = get_raw_msg_by_timestamp_with_chat_inclusive( + # chat_id=self.stream_id, timestamp_start=self.last_read_time, timestamp_end=now, limit_mode="earliest" + # ) - if new_messages_data: - self.last_read_time = now + # if new_messages_data: + # self.last_read_time = now - for msg_data in new_messages_data: - try: - self.adjust_reply_frequency() - await self.normal_response( - message_data=msg_data, - is_mentioned=msg_data.get("is_mentioned", False), - interested_rate=msg_data.get("interest_rate", 0.0) * self.willing_amplifier, - ) - return True - except Exception as e: - logger.error(f"[{self.stream_name}] 处理消息时出错: {e} {traceback.format_exc()}") + # for msg_data in new_messages_data: + # try: + # self.adjust_reply_frequency() + # await self.normal_response( + # message_data=msg_data, + # is_mentioned=msg_data.get("is_mentioned", False), + # interested_rate=msg_data.get("interest_rate", 0.0) * self.willing_amplifier, + # ) + # return True + # except Exception as e: + # logger.error(f"[{self.stream_name}] 处理消息时出错: {e} {traceback.format_exc()}") - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 兴趣模式轮询任务被取消") - return False - except Exception: - logger.error(f"[{self.stream_name}] 兴趣模式轮询循环出现错误: {traceback.format_exc()}", exc_info=True) - await asyncio.sleep(10) + # except asyncio.CancelledError: + # logger.info(f"[{self.stream_name}] 兴趣模式轮询任务被取消") + # return False + # except Exception: + # logger.error(f"[{self.stream_name}] 兴趣模式轮询循环出现错误: {traceback.format_exc()}", exc_info=True) + # await asyncio.sleep(10) async def _priority_mode_loopbody(self): try: @@ -181,20 +173,20 @@ class NormalChat: logger.error(f"[{self.stream_name}] 优先级消息生产者循环出现错误: {traceback.format_exc()}", exc_info=True) await asyncio.sleep(10) - async def _interest_message_polling_loop(self): - """ - [Interest Mode] 通过轮询数据库获取新消息并直接处理。 - """ - logger.info(f"[{self.stream_name}] 兴趣模式消息轮询任务开始") - try: - while not self._disabled: - success = await self._interest_mode_loopbody() + # async def _interest_message_polling_loop(self): + # """ + # [Interest Mode] 通过轮询数据库获取新消息并直接处理。 + # """ + # logger.info(f"[{self.stream_name}] 兴趣模式消息轮询任务开始") + # try: + # while not self._disabled: + # success = await self._interest_mode_loopbody() - if not success: - break + # if not success: + # break - except asyncio.CancelledError: - logger.info(f"[{self.stream_name}] 兴趣模式消息轮询任务被优雅地取消了") + # except asyncio.CancelledError: + # logger.info(f"[{self.stream_name}] 兴趣模式消息轮询任务被优雅地取消了") @@ -232,404 +224,403 @@ class NormalChat: await asyncio.sleep(10) # 改为实例方法 - async def _create_thinking_message(self, message_data: dict, timestamp: Optional[float] = None) -> str: - """创建思考消息""" - bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, - user_nickname=global_config.bot.nickname, - platform=message_data.get("chat_info_platform"), - ) + # async def _create_thinking_message(self, message_data: dict, timestamp: Optional[float] = None) -> str: + # """创建思考消息""" + # bot_user_info = UserInfo( + # user_id=global_config.bot.qq_account, + # user_nickname=global_config.bot.nickname, + # platform=message_data.get("chat_info_platform"), + # ) - thinking_time_point = round(time.time(), 2) - thinking_id = "tid" + str(thinking_time_point) - thinking_message = MessageThinking( - message_id=thinking_id, - chat_stream=self.chat_stream, - bot_user_info=bot_user_info, - reply=None, - thinking_start_time=thinking_time_point, - timestamp=timestamp if timestamp is not None else None, - ) + # thinking_time_point = round(time.time(), 2) + # thinking_id = "tid" + str(thinking_time_point) + # thinking_message = MessageThinking( + # message_id=thinking_id, + # chat_stream=self.chat_stream, + # bot_user_info=bot_user_info, + # reply=None, + # thinking_start_time=thinking_time_point, + # timestamp=timestamp if timestamp is not None else None, + # ) - await message_manager.add_message(thinking_message) - return thinking_id + # await message_manager.add_message(thinking_message) + # return thinking_id # 改为实例方法 - async def _add_messages_to_manager( - self, message_data: dict, response_set: List[str], thinking_id - ) -> Optional[MessageSending]: - """发送回复消息""" - container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id - thinking_message = None + # async def _add_messages_to_manager( + # self, message_data: dict, response_set: List[str], thinking_id + # ) -> Optional[MessageSending]: + # """发送回复消息""" + # container = await message_manager.get_container(self.stream_id) # 使用 self.stream_id + # thinking_message = None - for msg in container.messages[:]: - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - thinking_message = msg - container.messages.remove(msg) - break + # for msg in container.messages[:]: + # if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + # thinking_message = msg + # container.messages.remove(msg) + # break - if not thinking_message: - logger.warning(f"[{self.stream_name}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") - return None + # if not thinking_message: + # logger.warning(f"[{self.stream_name}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") + # return None - thinking_start_time = thinking_message.thinking_start_time - message_set = MessageSet(self.chat_stream, thinking_id) # 使用 self.chat_stream + # thinking_start_time = thinking_message.thinking_start_time + # message_set = MessageSet(self.chat_stream, thinking_id) # 使用 self.chat_stream - sender_info = UserInfo( - user_id=message_data.get("user_id"), - user_nickname=message_data.get("user_nickname"), - platform=message_data.get("chat_info_platform"), - ) + # sender_info = UserInfo( + # user_id=message_data.get("user_id"), + # user_nickname=message_data.get("user_nickname"), + # platform=message_data.get("chat_info_platform"), + # ) - reply = message_from_db_dict(message_data) + # reply = message_from_db_dict(message_data) - mark_head = False - first_bot_msg = None - for msg in response_set: - if global_config.debug.debug_show_chat_mode: - msg += "ⁿ" - message_segment = Seg(type="text", data=msg) - bot_message = MessageSending( - message_id=thinking_id, - chat_stream=self.chat_stream, # 使用 self.chat_stream - bot_user_info=UserInfo( - user_id=global_config.bot.qq_account, - user_nickname=global_config.bot.nickname, - platform=message_data.get("chat_info_platform"), - ), - sender_info=sender_info, - message_segment=message_segment, - reply=reply, - is_head=not mark_head, - is_emoji=False, - thinking_start_time=thinking_start_time, - apply_set_reply_logic=True, - ) - if not mark_head: - mark_head = True - first_bot_msg = bot_message - message_set.add_message(bot_message) + # mark_head = False + # first_bot_msg = None + # for msg in response_set: + # if global_config.debug.debug_show_chat_mode: + # msg += "ⁿ" + # message_segment = Seg(type="text", data=msg) + # bot_message = MessageSending( + # message_id=thinking_id, + # chat_stream=self.chat_stream, # 使用 self.chat_stream + # bot_user_info=UserInfo( + # user_id=global_config.bot.qq_account, + # user_nickname=global_config.bot.nickname, + # platform=message_data.get("chat_info_platform"), + # ), + # sender_info=sender_info, + # message_segment=message_segment, + # reply=reply, + # is_head=not mark_head, + # is_emoji=False, + # thinking_start_time=thinking_start_time, + # apply_set_reply_logic=True, + # ) + # if not mark_head: + # mark_head = True + # first_bot_msg = bot_message + # message_set.add_message(bot_message) - await message_manager.add_message(message_set) + # await message_manager.add_message(message_set) - return first_bot_msg + # return first_bot_msg # 改为实例方法, 移除 chat 参数 - async def normal_response(self, message_data: dict, is_mentioned: bool, interested_rate: float) -> None: - """ - 处理接收到的消息。 - 在"兴趣"模式下,判断是否回复并生成内容。 - """ - if self._disabled: - return + # async def normal_response(self, message_data: dict, is_mentioned: bool, interested_rate: float) -> None: + # """ + # 处理接收到的消息。 + # 在"兴趣"模式下,判断是否回复并生成内容。 + # """ + # if self._disabled: + # return - # 新增:在auto模式下检查是否需要直接切换到focus模式 - if global_config.chat.chat_mode == "auto": - if await self._check_should_switch_to_focus(): - logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,尝试执行切换") - if self.on_switch_to_focus_callback: - switched_successfully = await self.on_switch_to_focus_callback() - if switched_successfully: - logger.info(f"[{self.stream_name}] 成功切换到focus模式,中止NormalChat处理") - return - else: - logger.info(f"[{self.stream_name}] 切换到focus模式失败(可能在冷却中),继续NormalChat处理") - else: - logger.warning(f"[{self.stream_name}] 没有设置切换到focus聊天模式的回调函数,无法执行切换") + # # 新增:在auto模式下检查是否需要直接切换到focus模式 + # if global_config.chat.chat_mode == "auto": + # if await self._check_should_switch_to_focus(): + # logger.info(f"[{self.stream_name}] 检测到切换到focus聊天模式的条件,尝试执行切换") + # if self.on_switch_to_focus_callback: + # switched_successfully = await self.on_switch_to_focus_callback() + # if switched_successfully: + # logger.info(f"[{self.stream_name}] 成功切换到focus模式,中止NormalChat处理") + # return + # else: + # logger.info(f"[{self.stream_name}] 切换到focus模式失败(可能在冷却中),继续NormalChat处理") + # else: + # logger.warning(f"[{self.stream_name}] 没有设置切换到focus聊天模式的回调函数,无法执行切换") - # --- 以下为 "兴趣" 模式逻辑 (从 _process_message 合并而来) --- - timing_results = {} - reply_probability = ( - 1.0 if is_mentioned and global_config.normal_chat.mentioned_bot_inevitable_reply else 0.0 - ) # 如果被提及,且开启了提及必回复,则基础概率为1,否则需要意愿判断 + # # --- 以下为 "兴趣" 模式逻辑 (从 _process_message 合并而来) --- + # timing_results = {} + # reply_probability = ( + # 1.0 if is_mentioned and global_config.normal_chat.mentioned_bot_inevitable_reply else 0.0 + # ) # 如果被提及,且开启了提及必回复,则基础概率为1,否则需要意愿判断 - # 意愿管理器:设置当前message信息 - willing_manager.setup(message_data, self.chat_stream) - # TODO: willing_manager 也需要修改以接收字典 + # # 意愿管理器:设置当前message信息 + # willing_manager.setup(message_data, self.chat_stream) - # 获取回复概率 - # is_willing = False - # 仅在未被提及或基础概率不为1时查询意愿概率 - if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率 - # is_willing = True - reply_probability = await willing_manager.get_reply_probability(message_data.get("message_id")) + # # 获取回复概率 + # # is_willing = False + # # 仅在未被提及或基础概率不为1时查询意愿概率 + # if reply_probability < 1: # 简化逻辑,如果未提及 (reply_probability 为 0),则获取意愿概率 + # # is_willing = True + # reply_probability = await willing_manager.get_reply_probability(message_data.get("message_id")) - additional_config = message_data.get("additional_config", {}) - if additional_config and "maimcore_reply_probability_gain" in additional_config: - reply_probability += additional_config["maimcore_reply_probability_gain"] - reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间 + # additional_config = message_data.get("additional_config", {}) + # if additional_config and "maimcore_reply_probability_gain" in additional_config: + # reply_probability += additional_config["maimcore_reply_probability_gain"] + # reply_probability = min(max(reply_probability, 0), 1) # 确保概率在 0-1 之间 - # 处理表情包 - if message_data.get("is_emoji") or message_data.get("is_picid"): - reply_probability = 0 + # # 处理表情包 + # if message_data.get("is_emoji") or message_data.get("is_picid"): + # reply_probability = 0 - # 应用疲劳期回复频率调整 - fatigue_multiplier = self._get_fatigue_reply_multiplier() - original_probability = reply_probability - reply_probability *= fatigue_multiplier + # # 应用疲劳期回复频率调整 + # fatigue_multiplier = self._get_fatigue_reply_multiplier() + # original_probability = reply_probability + # reply_probability *= fatigue_multiplier - # 如果应用了疲劳调整,记录日志 - if fatigue_multiplier < 1.0: - logger.info( - f"[{self.stream_name}] 疲劳期回复频率调整: {original_probability * 100:.1f}% -> {reply_probability * 100:.1f}% (系数: {fatigue_multiplier:.2f})" - ) + # # 如果应用了疲劳调整,记录日志 + # if fatigue_multiplier < 1.0: + # logger.info( + # f"[{self.stream_name}] 疲劳期回复频率调整: {original_probability * 100:.1f}% -> {reply_probability * 100:.1f}% (系数: {fatigue_multiplier:.2f})" + # ) - # 打印消息信息 - mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊" - if reply_probability > 0.1: - logger.info( - f"[{mes_name}]" - f"{message_data.get('user_nickname')}:" - f"{message_data.get('processed_plain_text')}[兴趣:{interested_rate:.2f}][回复概率:{reply_probability * 100:.1f}%]" - ) - do_reply = False - response_set = None # 初始化 response_set - if random() < reply_probability: - with Timer("获取回复", timing_results): - await willing_manager.before_generate_reply_handle(message_data.get("message_id")) - do_reply = await self.reply_one_message(message_data) - response_set = do_reply if do_reply else None + # # 打印消息信息 + # mes_name = self.chat_stream.group_info.group_name if self.chat_stream.group_info else "私聊" + # if reply_probability > 0.1: + # logger.info( + # f"[{mes_name}]" + # f"{message_data.get('user_nickname')}:" + # f"{message_data.get('processed_plain_text')}[兴趣:{interested_rate:.2f}][回复概率:{reply_probability * 100:.1f}%]" + # ) + # do_reply = False + # response_set = None # 初始化 response_set + # if random() < reply_probability: + # with Timer("获取回复", timing_results): + # await willing_manager.before_generate_reply_handle(message_data.get("message_id")) + # do_reply = await self.reply_one_message(message_data) + # response_set = do_reply if do_reply else None - # 输出性能计时结果 - if do_reply and response_set: # 确保 response_set 不是 None - timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) - trigger_msg = message_data.get("processed_plain_text") - response_msg = " ".join([item[1] for item in response_set if item[0] == "text"]) - logger.info( - f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}" - ) - await willing_manager.after_generate_reply_handle(message_data.get("message_id")) - elif not do_reply: - # 不回复处理 - await willing_manager.not_reply_handle(message_data.get("message_id")) + # # 输出性能计时结果 + # if do_reply and response_set: # 确保 response_set 不是 None + # timing_str = " | ".join([f"{step}: {duration:.2f}秒" for step, duration in timing_results.items()]) + # trigger_msg = message_data.get("processed_plain_text") + # response_msg = " ".join([item[1] for item in response_set if item[0] == "text"]) + # logger.info( + # f"[{self.stream_name}]回复消息: {trigger_msg[:30]}... | 回复内容: {response_msg[:30]}... | 计时: {timing_str}" + # ) + # await willing_manager.after_generate_reply_handle(message_data.get("message_id")) + # elif not do_reply: + # # 不回复处理 + # await willing_manager.not_reply_handle(message_data.get("message_id")) - # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) - willing_manager.delete(message_data.get("message_id")) + # # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) + # willing_manager.delete(message_data.get("message_id")) - async def _generate_normal_response( - self, message_data: dict, available_actions: Optional[list] - ) -> Optional[list]: - """生成普通回复""" - try: - person_info_manager = get_person_info_manager() - person_id = person_info_manager.get_person_id( - message_data.get("chat_info_platform"), message_data.get("user_id") - ) - person_name = await person_info_manager.get_value(person_id, "person_name") - reply_to_str = f"{person_name}:{message_data.get('processed_plain_text')}" + # async def _generate_normal_response( + # self, message_data: dict, available_actions: Optional[list] + # ) -> Optional[list]: + # """生成普通回复""" + # try: + # person_info_manager = get_person_info_manager() + # person_id = person_info_manager.get_person_id( + # message_data.get("chat_info_platform"), message_data.get("user_id") + # ) + # person_name = await person_info_manager.get_value(person_id, "person_name") + # reply_to_str = f"{person_name}:{message_data.get('processed_plain_text')}" - success, reply_set = await generator_api.generate_reply( - chat_stream=self.chat_stream, - reply_to=reply_to_str, - available_actions=available_actions, - enable_tool=global_config.tool.enable_in_normal_chat, - request_type="normal.replyer", - ) + # success, reply_set = await generator_api.generate_reply( + # chat_stream=self.chat_stream, + # reply_to=reply_to_str, + # available_actions=available_actions, + # enable_tool=global_config.tool.enable_in_normal_chat, + # request_type="normal.replyer", + # ) - if not success or not reply_set: - logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败") - return None + # if not success or not reply_set: + # logger.info(f"对 {message_data.get('processed_plain_text')} 的回复生成失败") + # return None - return reply_set + # return reply_set - except Exception as e: - logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") - return None + # except Exception as e: + # logger.error(f"[{self.stream_name}] 回复生成出现错误:{str(e)} {traceback.format_exc()}") + # return None - async def _plan_and_execute_actions(self, message_data: dict, thinking_id: str) -> Optional[dict]: - """规划和执行额外动作""" - no_action = { - "action_result": { - "action_type": "no_action", - "action_data": {}, - "reasoning": "规划器初始化默认", - "is_parallel": True, - }, - "chat_context": "", - "action_prompt": "", - } + # async def _plan_and_execute_actions(self, message_data: dict, thinking_id: str) -> Optional[dict]: + # """规划和执行额外动作""" + # no_action = { + # "action_result": { + # "action_type": "no_action", + # "action_data": {}, + # "reasoning": "规划器初始化默认", + # "is_parallel": True, + # }, + # "chat_context": "", + # "action_prompt": "", + # } - if not self.enable_planner: - logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") - return no_action + # if not self.enable_planner: + # logger.debug(f"[{self.stream_name}] Planner未启用,跳过动作规划") + # return no_action - try: - # 检查是否应该跳过规划 - if self.action_modifier.should_skip_planning(): - logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") - self.action_type = "no_action" - return no_action + # try: + # # 检查是否应该跳过规划 + # if self.action_modifier.should_skip_planning(): + # logger.debug(f"[{self.stream_name}] 没有可用动作,跳过规划") + # self.action_type = "no_action" + # return no_action - # 执行规划 - plan_result = await self.planner.plan() - action_type = plan_result["action_result"]["action_type"] - action_data = plan_result["action_result"]["action_data"] - reasoning = plan_result["action_result"]["reasoning"] - is_parallel = plan_result["action_result"].get("is_parallel", False) + # # 执行规划 + # plan_result = await self.planner.plan() + # action_type = plan_result["action_result"]["action_type"] + # action_data = plan_result["action_result"]["action_data"] + # reasoning = plan_result["action_result"]["reasoning"] + # is_parallel = plan_result["action_result"].get("is_parallel", False) - if action_type == "no_action": - logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定进行回复") - elif is_parallel: - logger.info( - f"[{self.stream_name}] {global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作" - ) - else: - logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定执行{action_type}动作") + # if action_type == "no_action": + # logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定进行回复") + # elif is_parallel: + # logger.info( + # f"[{self.stream_name}] {global_config.bot.nickname} 决定进行回复, 同时执行{action_type}动作" + # ) + # else: + # logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定执行{action_type}动作") - self.action_type = action_type # 更新实例属性 - self.is_parallel_action = is_parallel # 新增:保存并行执行标志 + # self.action_type = action_type # 更新实例属性 + # self.is_parallel_action = is_parallel # 新增:保存并行执行标志 - # 如果规划器决定不执行任何动作 - if action_type == "no_action": - logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") - return no_action + # # 如果规划器决定不执行任何动作 + # if action_type == "no_action": + # logger.debug(f"[{self.stream_name}] Planner决定不执行任何额外动作") + # return no_action - # 执行额外的动作(不影响回复生成) - action_result = await self._execute_action(action_type, action_data, message_data, thinking_id) - if action_result is not None: - logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") - else: - logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") + # # 执行额外的动作(不影响回复生成) + # action_result = await self._handle_action(action_type, action_data, message_data, thinking_id) + # if action_result is not None: + # logger.info(f"[{self.stream_name}] 额外动作 {action_type} 执行完成") + # else: + # logger.warning(f"[{self.stream_name}] 额外动作 {action_type} 执行失败") - return { - "action_type": action_type, - "action_data": action_data, - "reasoning": reasoning, - "is_parallel": is_parallel, - } + # return { + # "action_type": action_type, + # "action_data": action_data, + # "reasoning": reasoning, + # "is_parallel": is_parallel, + # } - except Exception as e: - logger.error(f"[{self.stream_name}] Planner执行失败: {e}") - return no_action + # except Exception as e: + # logger.error(f"[{self.stream_name}] Planner执行失败: {e}") + # return no_action - async def reply_one_message(self, message_data: dict) -> None: - # 回复前处理 - await self.relationship_builder.build_relation() + # async def reply_one_message(self, message_data: dict) -> None: + # # 回复前处理 + # await self.relationship_builder.build_relation() - thinking_id = await self._create_thinking_message(message_data) + # thinking_id = await self._create_thinking_message(message_data) - # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) - available_actions = None - if self.enable_planner: - try: - await self.action_modifier.modify_actions(mode="normal", message_content=message_data.get("processed_plain_text")) - available_actions = self.action_manager.get_using_actions_for_mode("normal") - except Exception as e: - logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") - available_actions = None + # # 如果启用planner,预先修改可用actions(避免在并行任务中重复调用) + # available_actions = None + # if self.enable_planner: + # try: + # await self.action_modifier.modify_actions(mode="normal", message_content=message_data.get("processed_plain_text")) + # available_actions = self.action_manager.get_using_actions_for_mode("normal") + # except Exception as e: + # logger.warning(f"[{self.stream_name}] 获取available_actions失败: {e}") + # available_actions = None - # 并行执行回复生成和动作规划 - self.action_type = None # 初始化动作类型 - self.is_parallel_action = False # 初始化并行动作标志 + # # 并行执行回复生成和动作规划 + # self.action_type = None # 初始化动作类型 + # self.is_parallel_action = False # 初始化并行动作标志 - gen_task = asyncio.create_task(self._generate_normal_response(message_data, available_actions)) - plan_task = asyncio.create_task(self._plan_and_execute_actions(message_data, thinking_id)) + # gen_task = asyncio.create_task(self._generate_normal_response(message_data, available_actions)) + # plan_task = asyncio.create_task(self._plan_and_execute_actions(message_data, thinking_id)) - try: - gather_timeout = global_config.chat.thinking_timeout - results = await asyncio.wait_for( - asyncio.gather(gen_task, plan_task, return_exceptions=True), - timeout=gather_timeout, - ) - response_set, plan_result = results - except asyncio.TimeoutError: - gen_timed_out = not gen_task.done() - plan_timed_out = not plan_task.done() + # try: + # gather_timeout = global_config.chat.thinking_timeout + # results = await asyncio.wait_for( + # asyncio.gather(gen_task, plan_task, return_exceptions=True), + # timeout=gather_timeout, + # ) + # response_set, plan_result = results + # except asyncio.TimeoutError: + # gen_timed_out = not gen_task.done() + # plan_timed_out = not plan_task.done() - timeout_details = [] - if gen_timed_out: - timeout_details.append("回复生成(gen)") - if plan_timed_out: - timeout_details.append("动作规划(plan)") + # timeout_details = [] + # if gen_timed_out: + # timeout_details.append("回复生成(gen)") + # if plan_timed_out: + # timeout_details.append("动作规划(plan)") - timeout_source = " 和 ".join(timeout_details) + # timeout_source = " 和 ".join(timeout_details) - logger.warning( - f"[{self.stream_name}] {timeout_source} 任务超时 ({global_config.chat.thinking_timeout}秒),正在取消相关任务..." - ) - # print(f"111{self.timeout_count}") - self.timeout_count += 1 - if self.timeout_count > 5: - logger.warning( - f"[{self.stream_name}] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容,请检查你的api是否速度过慢或配置错误。建议不要使用推理模型,推理模型生成速度过慢。或者尝试拉高thinking_timeout参数,这可能导致回复时间过长。" - ) + # logger.warning( + # f"[{self.stream_name}] {timeout_source} 任务超时 ({global_config.chat.thinking_timeout}秒),正在取消相关任务..." + # ) + # # print(f"111{self.timeout_count}") + # self.timeout_count += 1 + # if self.timeout_count > 5: + # logger.warning( + # f"[{self.stream_name}] 连续回复超时次数过多,{global_config.chat.thinking_timeout}秒 内大模型没有返回有效内容,请检查你的api是否速度过慢或配置错误。建议不要使用推理模型,推理模型生成速度过慢。或者尝试拉高thinking_timeout参数,这可能导致回复时间过长。" + # ) - # 取消未完成的任务 - if not gen_task.done(): - gen_task.cancel() - if not plan_task.done(): - plan_task.cancel() + # # 取消未完成的任务 + # if not gen_task.done(): + # gen_task.cancel() + # if not plan_task.done(): + # plan_task.cancel() - # 清理思考消息 - await self._cleanup_thinking_message_by_id(thinking_id) + # # 清理思考消息 + # await self._cleanup_thinking_message_by_id(thinking_id) - response_set = None - plan_result = None + # response_set = None + # plan_result = None - # 处理生成回复的结果 - if isinstance(response_set, Exception): - logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") - response_set = None + # # 处理生成回复的结果 + # if isinstance(response_set, Exception): + # logger.error(f"[{self.stream_name}] 回复生成异常: {response_set}") + # response_set = None - # 处理规划结果(可选,不影响回复) - if isinstance(plan_result, Exception): - logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") - elif plan_result: - logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") + # # 处理规划结果(可选,不影响回复) + # if isinstance(plan_result, Exception): + # logger.error(f"[{self.stream_name}] 动作规划异常: {plan_result}") + # elif plan_result: + # logger.debug(f"[{self.stream_name}] 额外动作处理完成: {self.action_type}") - if response_set: - content = " ".join([item[1] for item in response_set if item[0] == "text"]) + # if response_set: + # content = " ".join([item[1] for item in response_set if item[0] == "text"]) - if not response_set or ( - self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action - ): - if not response_set: - logger.warning(f"[{self.stream_name}] 模型未生成回复内容") - elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: - logger.info( - f"[{self.stream_name}] {global_config.bot.nickname} 原本想要回复:{content},但选择执行{self.action_type},不发表回复" - ) - # 如果模型未生成回复,移除思考消息 - await self._cleanup_thinking_message_by_id(thinking_id) - return False + # if not response_set or ( + # self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action + # ): + # if not response_set: + # logger.warning(f"[{self.stream_name}] 模型未生成回复内容") + # elif self.enable_planner and self.action_type not in ["no_action"] and not self.is_parallel_action: + # logger.info( + # f"[{self.stream_name}] {global_config.bot.nickname} 原本想要回复:{content},但选择执行{self.action_type},不发表回复" + # ) + # # 如果模型未生成回复,移除思考消息 + # await self._cleanup_thinking_message_by_id(thinking_id) + # return False - logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定的回复内容: {content}") + # logger.info(f"[{self.stream_name}] {global_config.bot.nickname} 决定的回复内容: {content}") - if self._disabled: - logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") - return False + # if self._disabled: + # logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") + # return False - # 提取回复文本 - reply_texts = [item[1] for item in response_set if item[0] == "text"] - if not reply_texts: - logger.info(f"[{self.stream_name}] 回复内容中没有文本,不发送消息") - await self._cleanup_thinking_message_by_id(thinking_id) - return False + # # 提取回复文本 + # reply_texts = [item[1] for item in response_set if item[0] == "text"] + # if not reply_texts: + # logger.info(f"[{self.stream_name}] 回复内容中没有文本,不发送消息") + # await self._cleanup_thinking_message_by_id(thinking_id) + # return False - # 发送回复 (不再需要传入 chat) - first_bot_msg = await self._add_messages_to_manager(message_data, reply_texts, thinking_id) + # # 发送回复 (不再需要传入 chat) + # first_bot_msg = await add_messages_to_manager(message_data, reply_texts, thinking_id) - # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) - if first_bot_msg: - # 消息段已在接收消息时更新,这里不需要额外处理 + # # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) + # if first_bot_msg: + # # 消息段已在接收消息时更新,这里不需要额外处理 - # 记录回复信息到最近回复列表中 - reply_info = { - "time": time.time(), - "user_message": message_data.get("processed_plain_text"), - "user_info": { - "user_id": message_data.get("user_id"), - "user_nickname": message_data.get("user_nickname"), - }, - "response": response_set, - "is_reference_reply": message_data.get("reply") is not None, # 判断是否为引用回复 - } - self.recent_replies.append(reply_info) - # 保持最近回复历史在限定数量内 - if len(self.recent_replies) > self.max_replies_history: - self.recent_replies = self.recent_replies[-self.max_replies_history :] - return response_set if response_set else False + # # 记录回复信息到最近回复列表中 + # reply_info = { + # "time": time.time(), + # "user_message": message_data.get("processed_plain_text"), + # "user_info": { + # "user_id": message_data.get("user_id"), + # "user_nickname": message_data.get("user_nickname"), + # }, + # "response": response_set, + # "is_reference_reply": message_data.get("reply") is not None, # 判断是否为引用回复 + # } + # self.recent_replies.append(reply_info) + # # 保持最近回复历史在限定数量内 + # if len(self.recent_replies) > self.max_replies_history: + # self.recent_replies = self.recent_replies[-self.max_replies_history :] + # return response_set if response_set else False # 改为实例方法, 移除 chat 参数 @@ -677,34 +668,34 @@ class NormalChat: self._priority_chat_task = None raise - def _handle_task_completion(self, task: asyncio.Task, task_name: str = "unknown"): - """任务完成回调处理""" - try: - logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 完成回调被调用") + # def _handle_task_completion(self, task: asyncio.Task, task_name: str = "unknown"): + # """任务完成回调处理""" + # try: + # logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 完成回调被调用") - if task is self._chat_task: - self._chat_task = None - elif task is self._priority_chat_task: - self._priority_chat_task = None - else: - logger.debug(f"[{self.stream_name}] 回调的任务 '{task_name}' 不是当前管理的任务") - return + # if task is self._chat_task: + # self._chat_task = None + # elif task is self._priority_chat_task: + # self._priority_chat_task = None + # else: + # logger.debug(f"[{self.stream_name}] 回调的任务 '{task_name}' 不是当前管理的任务") + # return - logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 引用已清理") + # logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 引用已清理") - if task.cancelled(): - logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 已取消") - elif task.done(): - exc = task.exception() - if exc: - logger.error(f"[{self.stream_name}] 任务 '{task_name}' 异常: {type(exc).__name__}: {exc}", exc_info=exc) - else: - logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 正常完成") + # if task.cancelled(): + # logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 已取消") + # elif task.done(): + # exc = task.exception() + # if exc: + # logger.error(f"[{self.stream_name}] 任务 '{task_name}' 异常: {type(exc).__name__}: {exc}", exc_info=exc) + # else: + # logger.debug(f"[{self.stream_name}] 任务 '{task_name}' 正常完成") - except Exception as e: - logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}") - self._chat_task = None - self._priority_chat_task = None + # except Exception as e: + # logger.error(f"[{self.stream_name}] 任务完成回调处理出错: {e}") + # self._chat_task = None + # self._priority_chat_task = None # 改为实例方法, 移除 stream_id 参数 async def stop_chat(self): @@ -721,156 +712,140 @@ class NormalChat: self._chat_task = None self._priority_chat_task = None - asyncio.create_task(self._cleanup_thinking_messages_async()) - async def _cleanup_thinking_messages_async(self): - """异步清理思考消息,避免阻塞主流程""" - try: - await asyncio.sleep(0.1) + # def adjust_reply_frequency(self): + # """ + # 根据预设规则动态调整回复意愿(willing_amplifier)。 + # - 评估周期:10分钟 + # - 目标频率:由 global_config.chat.talk_frequency 定义(例如 1条/分钟) + # - 调整逻辑: + # - 0条回复 -> 5.0x 意愿 + # - 达到目标回复数 -> 1.0x 意愿(基准) + # - 达到目标2倍回复数 -> 0.2x 意愿 + # - 中间值线性变化 + # - 增益抑制:如果最近5分钟回复过快,则不增加意愿。 + # """ + # # --- 1. 定义参数 --- + # evaluation_minutes = 10.0 + # target_replies_per_min = global_config.chat.get_current_talk_frequency( + # self.stream_id + # ) # 目标频率:e.g. 1条/分钟 + # target_replies_in_window = target_replies_per_min * evaluation_minutes # 10分钟内的目标回复数 - container = await message_manager.get_container(self.stream_id) - if container: - thinking_messages = [msg for msg in container.messages[:] if isinstance(msg, MessageThinking)] - if thinking_messages: - for msg in thinking_messages: - container.messages.remove(msg) - logger.info(f"[{self.stream_name}] 清理了 {len(thinking_messages)} 条未处理的思考消息。") - except Exception as e: - logger.error(f"[{self.stream_name}] 异步清理思考消息时出错: {e}") + # if target_replies_in_window <= 0: + # logger.debug(f"[{self.stream_name}] 目标回复频率为0或负数,不调整意愿放大器。") + # return - def adjust_reply_frequency(self): - """ - 根据预设规则动态调整回复意愿(willing_amplifier)。 - - 评估周期:10分钟 - - 目标频率:由 global_config.chat.talk_frequency 定义(例如 1条/分钟) - - 调整逻辑: - - 0条回复 -> 5.0x 意愿 - - 达到目标回复数 -> 1.0x 意愿(基准) - - 达到目标2倍回复数 -> 0.2x 意愿 - - 中间值线性变化 - - 增益抑制:如果最近5分钟回复过快,则不增加意愿。 - """ - # --- 1. 定义参数 --- - evaluation_minutes = 10.0 - target_replies_per_min = global_config.chat.get_current_talk_frequency( - self.stream_id - ) # 目标频率:e.g. 1条/分钟 - target_replies_in_window = target_replies_per_min * evaluation_minutes # 10分钟内的目标回复数 + # # --- 2. 获取近期统计数据 --- + # stats_10_min = get_recent_message_stats(minutes=evaluation_minutes, chat_id=self.stream_id) + # bot_reply_count_10_min = stats_10_min["bot_reply_count"] - if target_replies_in_window <= 0: - logger.debug(f"[{self.stream_name}] 目标回复频率为0或负数,不调整意愿放大器。") - return + # # --- 3. 计算新的意愿放大器 (willing_amplifier) --- + # # 基于回复数在 [0, target*2] 区间内进行分段线性映射 + # if bot_reply_count_10_min <= target_replies_in_window: + # # 在 [0, 目标数] 区间,意愿从 5.0 线性下降到 1.0 + # new_amplifier = 5.0 + (bot_reply_count_10_min - 0) * (1.0 - 5.0) / (target_replies_in_window - 0) + # elif bot_reply_count_10_min <= target_replies_in_window * 2: + # # 在 [目标数, 目标数*2] 区间,意愿从 1.0 线性下降到 0.2 + # over_target_cap = target_replies_in_window * 2 + # new_amplifier = 1.0 + (bot_reply_count_10_min - target_replies_in_window) * (0.2 - 1.0) / ( + # over_target_cap - target_replies_in_window + # ) + # else: + # # 超过目标数2倍,直接设为最小值 + # new_amplifier = 0.2 - # --- 2. 获取近期统计数据 --- - stats_10_min = get_recent_message_stats(minutes=evaluation_minutes, chat_id=self.stream_id) - bot_reply_count_10_min = stats_10_min["bot_reply_count"] + # # --- 4. 检查是否需要抑制增益 --- + # # "如果邻近5分钟内,回复数量 > 频率/2,就不再进行增益" + # suppress_gain = False + # if new_amplifier > self.willing_amplifier: # 仅在计算结果为增益时检查 + # suppression_minutes = 5.0 + # # 5分钟内目标回复数的一半 + # suppression_threshold = (target_replies_per_min / 2) * suppression_minutes # e.g., (1/2)*5 = 2.5 + # stats_5_min = get_recent_message_stats(minutes=suppression_minutes, chat_id=self.stream_id) + # bot_reply_count_5_min = stats_5_min["bot_reply_count"] - # --- 3. 计算新的意愿放大器 (willing_amplifier) --- - # 基于回复数在 [0, target*2] 区间内进行分段线性映射 - if bot_reply_count_10_min <= target_replies_in_window: - # 在 [0, 目标数] 区间,意愿从 5.0 线性下降到 1.0 - new_amplifier = 5.0 + (bot_reply_count_10_min - 0) * (1.0 - 5.0) / (target_replies_in_window - 0) - elif bot_reply_count_10_min <= target_replies_in_window * 2: - # 在 [目标数, 目标数*2] 区间,意愿从 1.0 线性下降到 0.2 - over_target_cap = target_replies_in_window * 2 - new_amplifier = 1.0 + (bot_reply_count_10_min - target_replies_in_window) * (0.2 - 1.0) / ( - over_target_cap - target_replies_in_window - ) - else: - # 超过目标数2倍,直接设为最小值 - new_amplifier = 0.2 + # if bot_reply_count_5_min > suppression_threshold: + # suppress_gain = True - # --- 4. 检查是否需要抑制增益 --- - # "如果邻近5分钟内,回复数量 > 频率/2,就不再进行增益" - suppress_gain = False - if new_amplifier > self.willing_amplifier: # 仅在计算结果为增益时检查 - suppression_minutes = 5.0 - # 5分钟内目标回复数的一半 - suppression_threshold = (target_replies_per_min / 2) * suppression_minutes # e.g., (1/2)*5 = 2.5 - stats_5_min = get_recent_message_stats(minutes=suppression_minutes, chat_id=self.stream_id) - bot_reply_count_5_min = stats_5_min["bot_reply_count"] + # # --- 5. 更新意愿放大器 --- + # if suppress_gain: + # logger.debug( + # f"[{self.stream_name}] 回复增益被抑制。最近5分钟内回复数 ({bot_reply_count_5_min}) " + # f"> 阈值 ({suppression_threshold:.1f})。意愿放大器保持在 {self.willing_amplifier:.2f}" + # ) + # # 不做任何改动 + # else: + # # 限制最终值在 [0.2, 5.0] 范围内 + # self.willing_amplifier = max(0.2, min(5.0, new_amplifier)) + # logger.debug( + # f"[{self.stream_name}] 调整回复意愿。10分钟内回复: {bot_reply_count_10_min} (目标: {target_replies_in_window:.0f}) -> " + # f"意愿放大器更新为: {self.willing_amplifier:.2f}" + # ) - if bot_reply_count_5_min > suppression_threshold: - suppress_gain = True + # async def _execute_action( + # self, action_type: str, action_data: dict, message_data: dict, thinking_id: str + # ) -> Optional[bool]: + # """执行具体的动作,只返回执行成功与否""" + # try: + # # 创建动作处理器实例 + # action_handler = self.action_manager.create_action( + # action_name=action_type, + # action_data=action_data, + # reasoning=action_data.get("reasoning", ""), + # cycle_timers={}, # normal_chat使用空的cycle_timers + # thinking_id=thinking_id, + # chat_stream=self.chat_stream, + # log_prefix=self.stream_name, + # shutting_down=self._disabled, + # ) - # --- 5. 更新意愿放大器 --- - if suppress_gain: - logger.debug( - f"[{self.stream_name}] 回复增益被抑制。最近5分钟内回复数 ({bot_reply_count_5_min}) " - f"> 阈值 ({suppression_threshold:.1f})。意愿放大器保持在 {self.willing_amplifier:.2f}" - ) - # 不做任何改动 - else: - # 限制最终值在 [0.2, 5.0] 范围内 - self.willing_amplifier = max(0.2, min(5.0, new_amplifier)) - logger.debug( - f"[{self.stream_name}] 调整回复意愿。10分钟内回复: {bot_reply_count_10_min} (目标: {target_replies_in_window:.0f}) -> " - f"意愿放大器更新为: {self.willing_amplifier:.2f}" - ) + # if action_handler: + # # 执行动作 + # result = await action_handler.handle_action() + # success = False - async def _execute_action( - self, action_type: str, action_data: dict, message_data: dict, thinking_id: str - ) -> Optional[bool]: - """执行具体的动作,只返回执行成功与否""" - try: - # 创建动作处理器实例 - action_handler = self.action_manager.create_action( - action_name=action_type, - action_data=action_data, - reasoning=action_data.get("reasoning", ""), - cycle_timers={}, # normal_chat使用空的cycle_timers - thinking_id=thinking_id, - chat_stream=self.chat_stream, - log_prefix=self.stream_name, - shutting_down=self._disabled, - ) + # if result and isinstance(result, tuple) and len(result) >= 2: + # # handle_action返回 (success: bool, message: str) + # success = result[0] + # elif result: + # # 如果返回了其他结果,假设成功 + # success = True - if action_handler: - # 执行动作 - result = await action_handler.handle_action() - success = False + # return success - if result and isinstance(result, tuple) and len(result) >= 2: - # handle_action返回 (success: bool, message: str) - success = result[0] - elif result: - # 如果返回了其他结果,假设成功 - success = True + # except Exception as e: + # logger.error(f"[{self.stream_name}] 执行动作 {action_type} 失败: {e}") - return success + # return False - except Exception as e: - logger.error(f"[{self.stream_name}] 执行动作 {action_type} 失败: {e}") + # def get_action_manager(self) -> ActionManager: + # """获取动作管理器实例""" + # return self.action_manager - return False + # def _get_fatigue_reply_multiplier(self) -> float: + # """获取疲劳期回复频率调整系数 - def get_action_manager(self) -> ActionManager: - """获取动作管理器实例""" - return self.action_manager + # Returns: + # float: 回复频率调整系数,范围0.5-1.0 + # """ + # if not self.get_cooldown_progress_callback: + # return 1.0 # 没有冷却进度回调,返回正常系数 - def _get_fatigue_reply_multiplier(self) -> float: - """获取疲劳期回复频率调整系数 + # try: + # cooldown_progress = self.get_cooldown_progress_callback() - Returns: - float: 回复频率调整系数,范围0.5-1.0 - """ - if not self.get_cooldown_progress_callback: - return 1.0 # 没有冷却进度回调,返回正常系数 + # if cooldown_progress >= 1.0: + # return 1.0 # 冷却完成,正常回复频率 - try: - cooldown_progress = self.get_cooldown_progress_callback() + # # 疲劳期间:从0.5逐渐恢复到1.0 + # # progress=0时系数为0.5,progress=1时系数为1.0 + # multiplier = 0.2 + (0.8 * cooldown_progress) - if cooldown_progress >= 1.0: - return 1.0 # 冷却完成,正常回复频率 - - # 疲劳期间:从0.5逐渐恢复到1.0 - # progress=0时系数为0.5,progress=1时系数为1.0 - multiplier = 0.2 + (0.8 * cooldown_progress) - - return multiplier - except Exception as e: - logger.warning(f"[{self.stream_name}] 获取疲劳调整系数时出错: {e}") - return 1.0 # 出错时返回正常系数 + # return multiplier + # except Exception as e: + # logger.warning(f"[{self.stream_name}] 获取疲劳调整系数时出错: {e}") + # return 1.0 # 出错时返回正常系数 async def _check_should_switch_to_focus(self) -> bool: """ @@ -907,42 +882,42 @@ class NormalChat: return should_switch - async def _cleanup_thinking_message_by_id(self, thinking_id: str): - """根据ID清理思考消息""" - try: - container = await message_manager.get_container(self.stream_id) - if container: - for msg in container.messages[:]: - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - container.messages.remove(msg) - logger.info(f"[{self.stream_name}] 已清理思考消息 {thinking_id}") - break - except Exception as e: - logger.error(f"[{self.stream_name}] 清理思考消息 {thinking_id} 时出错: {e}") + # async def _cleanup_thinking_message_by_id(self, thinking_id: str): + # """根据ID清理思考消息""" + # try: + # container = await message_manager.get_container(self.stream_id) + # if container: + # for msg in container.messages[:]: + # if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + # container.messages.remove(msg) + # logger.info(f"[{self.stream_name}] 已清理思考消息 {thinking_id}") + # break + # except Exception as e: + # logger.error(f"[{self.stream_name}] 清理思考消息 {thinking_id} 时出错: {e}") -def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict: - """ - Args: - minutes (int): 检索的分钟数,默认30分钟 - chat_id (str, optional): 指定的chat_id,仅统计该chat下的消息。为None时统计全部。 - Returns: - dict: {"bot_reply_count": int, "total_message_count": int} - """ +# def get_recent_message_stats(minutes: int = 30, chat_id: str = None) -> dict: +# """ +# Args: +# minutes (int): 检索的分钟数,默认30分钟 +# chat_id (str, optional): 指定的chat_id,仅统计该chat下的消息。为None时统计全部。 +# Returns: +# dict: {"bot_reply_count": int, "total_message_count": int} +# """ - now = time.time() - start_time = now - minutes * 60 - bot_id = global_config.bot.qq_account +# now = time.time() +# start_time = now - minutes * 60 +# bot_id = global_config.bot.qq_account - filter_base = {"time": {"$gte": start_time}} - if chat_id is not None: - filter_base["chat_id"] = chat_id +# filter_base = {"time": {"$gte": start_time}} +# if chat_id is not None: +# filter_base["chat_id"] = chat_id - # 总消息数 - total_message_count = count_messages(filter_base) - # bot自身回复数 - bot_filter = filter_base.copy() - bot_filter["user_id"] = bot_id - bot_reply_count = count_messages(bot_filter) +# # 总消息数 +# total_message_count = count_messages(filter_base) +# # bot自身回复数 +# bot_filter = filter_base.copy() +# bot_filter["user_id"] = bot_id +# bot_reply_count = count_messages(bot_filter) - return {"bot_reply_count": bot_reply_count, "total_message_count": total_message_count} +# return {"bot_reply_count": bot_reply_count, "total_message_count": total_message_count} diff --git a/src/chat/planner_actions/action_modifier.py b/src/chat/planner_actions/action_modifier.py index a2e0066cf..58be641e1 100644 --- a/src/chat/planner_actions/action_modifier.py +++ b/src/chat/planner_actions/action_modifier.py @@ -526,16 +526,24 @@ class ActionModifier: return removals - def get_available_actions_count(self) -> int: + def get_available_actions_count(self,mode:str = "focus") -> int: """获取当前可用动作数量(排除默认的no_action)""" - current_actions = self.action_manager.get_using_actions_for_mode("normal") + current_actions = self.action_manager.get_using_actions_for_mode(mode) # 排除no_action(如果存在) filtered_actions = {k: v for k, v in current_actions.items() if k != "no_action"} return len(filtered_actions) - - def should_skip_planning(self) -> bool: + + def should_skip_planning_for_no_reply(self) -> bool: """判断是否应该跳过规划过程""" - available_count = self.get_available_actions_count() + current_actions = self.action_manager.get_using_actions_for_mode("focus") + # 排除no_action(如果存在) + if len(current_actions) == 1 and "no_reply" in current_actions: + return True + return False + + def should_skip_planning_for_no_action(self) -> bool: + """判断是否应该跳过规划过程""" + available_count = self.action_manager.get_using_actions_for_mode("normal") if available_count == 0: logger.debug(f"{self.log_prefix} 没有可用动作,跳过规划") return True diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 760148d05..c088fd78c 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -64,20 +64,19 @@ def init_prompt(): class ActionPlanner: - def __init__(self, chat_id: str, action_manager: ActionManager, mode: str = "focus"): + def __init__(self, chat_id: str, action_manager: ActionManager): self.chat_id = chat_id self.log_prefix = f"[{get_chat_manager().get_stream_name(chat_id) or chat_id}]" - self.mode = mode self.action_manager = action_manager # LLM规划器配置 self.planner_llm = LLMRequest( model=global_config.model.planner, - request_type=f"{self.mode}.planner", # 用于动作规划 + request_type="planner", # 用于动作规划 ) self.last_obs_time_mark = 0.0 - async def plan(self) -> Dict[str, Any]: + async def plan(self,mode: str = "focus") -> Dict[str, Any]: """ 规划器 (Planner): 使用LLM根据上下文决定做出什么动作。 """ @@ -92,7 +91,7 @@ class ActionPlanner: is_group_chat, chat_target_info = get_chat_type_and_target_info(self.chat_id) logger.debug(f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}") - current_available_actions_dict = self.action_manager.get_using_actions_for_mode(self.mode) + current_available_actions_dict = self.action_manager.get_using_actions_for_mode(mode) # 获取完整的动作信息 all_registered_actions = self.action_manager.get_registered_actions() @@ -122,6 +121,7 @@ class ActionPlanner: is_group_chat=is_group_chat, # <-- Pass HFC state chat_target_info=chat_target_info, # <-- 传递获取到的聊天目标信息 current_available_actions=current_available_actions, # <-- Pass determined actions + mode=mode, ) # --- 调用 LLM (普通文本生成) --- @@ -215,6 +215,7 @@ class ActionPlanner: is_group_chat: bool, # Now passed as argument chat_target_info: Optional[dict], # Now passed as argument current_available_actions, + mode: str = "focus", ) -> str: """构建 Planner LLM 的提示词 (获取模板并填充数据)""" try: @@ -244,7 +245,7 @@ class ActionPlanner: self.last_obs_time_mark = time.time() - if self.mode == "focus": + if mode == "focus": by_what = "聊天内容" no_action_block = "" else: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 846112305..627bcc69f 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -136,33 +136,6 @@ class DefaultReplyer: selected_config = random.choices(population=configs, weights=weights, k=1)[0] return selected_config - async def _create_thinking_message(self, anchor_message: Optional[MessageRecv], thinking_id: str): - """创建思考消息 (尝试锚定到 anchor_message)""" - if not anchor_message or not anchor_message.chat_stream: - logger.error(f"{self.log_prefix} 无法创建思考消息,缺少有效的锚点消息或聊天流。") - return None - - chat = anchor_message.chat_stream - messageinfo = anchor_message.message_info - thinking_time_point = parse_thinking_id_to_timestamp(thinking_id) - bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, - user_nickname=global_config.bot.nickname, - platform=messageinfo.platform, - ) - - thinking_message = MessageThinking( - message_id=thinking_id, - chat_stream=chat, - bot_user_info=bot_user_info, - reply=anchor_message, # 回复的是锚点消息 - thinking_start_time=thinking_time_point, - ) - # logger.debug(f"创建思考消息thinking_message:{thinking_message}") - - await self.heart_fc_sender.register_thinking(thinking_message) - return None - async def generate_reply_with_context( self, reply_data: Dict[str, Any] = None, @@ -812,108 +785,6 @@ class DefaultReplyer: return prompt - async def send_response_messages( - self, - anchor_message: Optional[MessageRecv], - response_set: List[Tuple[str, str]], - thinking_id: str = "", - display_message: str = "", - ) -> Optional[MessageSending]: - """发送回复消息 (尝试锚定到 anchor_message),使用 HeartFCSender""" - chat = self.chat_stream - chat_id = self.chat_stream.stream_id - if chat is None: - logger.error(f"{self.log_prefix} 无法发送回复,chat_stream 为空。") - return None - if not anchor_message: - logger.error(f"{self.log_prefix} 无法发送回复,anchor_message 为空。") - return None - - stream_name = get_chat_manager().get_stream_name(chat_id) or chat_id # 获取流名称用于日志 - - # 检查思考过程是否仍在进行,并获取开始时间 - if thinking_id: - # print(f"thinking_id: {thinking_id}") - thinking_start_time = await self.heart_fc_sender.get_thinking_start_time(chat_id, thinking_id) - else: - print("thinking_id is None") - # thinking_id = "ds" + str(round(time.time(), 2)) - thinking_start_time = time.time() - - if thinking_start_time is None: - logger.error(f"[{stream_name}]replyer思考过程未找到或已结束,无法发送回复。") - return None - - mark_head = False - # first_bot_msg: Optional[MessageSending] = None - reply_message_ids = [] # 记录实际发送的消息ID - - sent_msg_list = [] - - for i, msg_text in enumerate(response_set): - # 为每个消息片段生成唯一ID - type = msg_text[0] - data = msg_text[1] - - if global_config.debug.debug_show_chat_mode and type == "text": - data += "ᶠ" - - part_message_id = f"{thinking_id}_{i}" - message_segment = Seg(type=type, data=data) - - if type == "emoji": - is_emoji = True - else: - is_emoji = False - reply_to = not mark_head - - bot_message: MessageSending = await self._build_single_sending_message( - anchor_message=anchor_message, - message_id=part_message_id, - message_segment=message_segment, - display_message=display_message, - reply_to=reply_to, - is_emoji=is_emoji, - thinking_id=thinking_id, - thinking_start_time=thinking_start_time, - ) - - try: - if ( - bot_message.is_private_message() - or bot_message.reply.processed_plain_text != "[System Trigger Context]" - or mark_head - ): - set_reply = False - else: - set_reply = True - - if not mark_head: - mark_head = True - typing = False - else: - typing = True - - sent_msg = await self.heart_fc_sender.send_message(bot_message, typing=typing, set_reply=set_reply) - - reply_message_ids.append(part_message_id) # 记录我们生成的ID - - sent_msg_list.append((type, sent_msg)) - - except Exception as e: - logger.error(f"{self.log_prefix}发送回复片段 {i} ({part_message_id}) 时失败: {e}") - traceback.print_exc() - # 这里可以选择是继续发送下一个片段还是中止 - - # 在尝试发送完所有片段后,完成原始的 thinking_id 状态 - try: - await self.heart_fc_sender.complete_thinking(chat_id, thinking_id) - - except Exception as e: - logger.error(f"{self.log_prefix}完成思考状态 {thinking_id} 时出错: {e}") - - return sent_msg_list - async def _build_single_sending_message( self, message_id: str, diff --git a/src/chat/normal_chat/willing/mode_classical.py b/src/chat/willing/mode_classical.py similarity index 100% rename from src/chat/normal_chat/willing/mode_classical.py rename to src/chat/willing/mode_classical.py diff --git a/src/chat/normal_chat/willing/mode_custom.py b/src/chat/willing/mode_custom.py similarity index 100% rename from src/chat/normal_chat/willing/mode_custom.py rename to src/chat/willing/mode_custom.py diff --git a/src/chat/normal_chat/willing/mode_mxp.py b/src/chat/willing/mode_mxp.py similarity index 100% rename from src/chat/normal_chat/willing/mode_mxp.py rename to src/chat/willing/mode_mxp.py diff --git a/src/chat/normal_chat/willing/willing_manager.py b/src/chat/willing/willing_manager.py similarity index 100% rename from src/chat/normal_chat/willing/willing_manager.py rename to src/chat/willing/willing_manager.py diff --git a/src/main.py b/src/main.py index bd9005394..ec15f76de 100644 --- a/src/main.py +++ b/src/main.py @@ -7,7 +7,7 @@ from src.common.remote import TelemetryHeartBeatTask from src.manager.async_task_manager import async_task_manager from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.chat.emoji_system.emoji_manager import get_emoji_manager -from src.chat.normal_chat.willing.willing_manager import get_willing_manager +from src.chat.willing.willing_manager import get_willing_manager from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.message_receive.normal_message_sender import message_manager from src.chat.message_receive.storage import MessageStorage diff --git a/src/plugins/built_in/core_actions/no_reply.py b/src/plugins/built_in/core_actions/no_reply.py index fa5a2fafe..063175490 100644 --- a/src/plugins/built_in/core_actions/no_reply.py +++ b/src/plugins/built_in/core_actions/no_reply.py @@ -11,8 +11,6 @@ from src.common.logger import get_logger # 导入API模块 - 标准Python包方式 from src.plugin_system.apis import message_api from src.config.config import global_config -from src.chat.memory_system.Hippocampus import hippocampus_manager -import math logger = get_logger("core_actions")