This commit is contained in:
Windpicker-owo
2025-09-06 00:11:31 +08:00
44 changed files with 965 additions and 1257 deletions

View File

@@ -5,7 +5,6 @@ import math
import random
from typing import Optional, Dict, Any, Tuple
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.timer_calculator import Timer
from src.common.logger import get_logger
from src.config.config import global_config
@@ -32,7 +31,7 @@ class CycleProcessor:
context: HFC聊天上下文对象包含聊天流、能量值等信息
response_handler: 响应处理器,负责生成和发送回复
cycle_tracker: 循环跟踪器,负责记录和管理每次思考循环的信息
"""
"""
self.context = context
self.response_handler = response_handler
self.cycle_tracker = cycle_tracker
@@ -57,12 +56,12 @@ class CycleProcessor:
# 存储reply action信息
person_info_manager = get_person_info_manager()
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
platform = action_message.get("chat_info_platform")
if platform is None:
platform = getattr(self.context.chat_stream, "platform", "unknown")
person_id = person_info_manager.get_person_id(
platform,
action_message.get("user_id", ""),
@@ -94,8 +93,8 @@ class CycleProcessor:
}
return loop_info, reply_text, cycle_timers
async def observe(self,interest_value:float = 0.0) -> bool:
async def observe(self, interest_value: float = 0.0) -> str:
"""
观察和处理单次思考循环的核心方法
@@ -114,7 +113,7 @@ class CycleProcessor:
"""
action_type = "no_action"
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# 使用sigmoid函数将interest_value转换为概率
# 当interest_value为0时概率接近0使用Focus模式
# 当interest_value很高时概率接近1使用Normal模式
@@ -127,16 +126,24 @@ class CycleProcessor:
k = 2.0 # 控制曲线陡峭程度
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
normal_mode_probability = calculate_normal_mode_probability(interest_value) * 0.5 / global_config.chat.get_current_talk_frequency(self.context.stream_id)
normal_mode_probability = (
calculate_normal_mode_probability(interest_value)
* 0.5
/ global_config.chat.get_current_talk_frequency(self.context.stream_id)
)
# 根据概率决定使用哪种模式
if random.random() < normal_mode_probability:
mode = ChatMode.NORMAL
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式")
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Normal planner模式"
)
else:
mode = ChatMode.FOCUS
logger.info(f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式")
logger.info(
f"{self.log_prefix} 基于兴趣值 {interest_value:.2f},概率 {normal_mode_probability:.2f}选择Focus planner模式"
)
cycle_timers, thinking_id = self.cycle_tracker.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self.context.cycle_counter}次思考")
@@ -165,12 +172,14 @@ class CycleProcessor:
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system import EventType
result = await event_manager.trigger_event(EventType.ON_PLAN,plugin_name="SYSTEM", stream_id=self.context.chat_stream)
result = await event_manager.trigger_event(
EventType.ON_PLAN, plugin_name="SYSTEM", stream_id=self.context.chat_stream
)
if not result.all_continue_process():
raise UserWarning(f"插件{result.get_summary().get('stopped_handlers', '')}于规划前中断了内容生成")
with Timer("规划器", cycle_timers):
actions, _= await self.action_planner.plan(
actions, _ = await self.action_planner.plan(
mode=mode,
loop_start_time=loop_start_time,
available_actions=available_actions,
@@ -183,7 +192,7 @@ class CycleProcessor:
# 直接处理no_reply逻辑不再通过动作系统
reason = action_info.get("reasoning", "选择不回复")
logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
# 存储no_reply信息到数据库
await database_api.store_action_info(
chat_stream=self.context.chat_stream,
@@ -194,13 +203,8 @@ class CycleProcessor:
action_data={"reason": reason},
action_name="no_reply",
)
return {
"action_type": "no_reply",
"success": True,
"reply_text": "",
"command": ""
}
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_info["action_type"] != "reply":
# 执行普通动作
with Timer("动作执行", cycle_timers):
@@ -210,40 +214,32 @@ class CycleProcessor:
action_info["action_data"],
cycle_timers,
thinking_id,
action_info["action_message"]
action_info["action_message"],
)
return {
"action_type": action_info["action_type"],
"success": success,
"reply_text": reply_text,
"command": command
"command": command,
}
else:
try:
success, response_set, _ = await generator_api.generate_reply(
chat_stream=self.context.chat_stream,
reply_message = action_info["action_message"],
reply_message=action_info["action_message"],
available_actions=available_actions,
enable_tool=global_config.tool.enable_tool,
request_type="chat.replyer",
from_plugin=False,
)
)
if not success or not response_set:
logger.info(f"{action_info['action_message'].get('processed_plain_text')} 的回复生成失败")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None
}
logger.info(
f"{action_info['action_message'].get('processed_plain_text')} 的回复生成失败"
)
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None
}
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
loop_info, reply_text, cycle_timers_reply = await self._send_and_store_reply(
response_set,
@@ -253,12 +249,7 @@ class CycleProcessor:
thinking_id,
actions,
)
return {
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info
}
return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info}
except Exception as e:
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}")
@@ -267,9 +258,9 @@ class CycleProcessor:
"success": False,
"reply_text": "",
"loop_info": None,
"error": str(e)
"error": str(e),
}
# 创建所有动作的后台任务
action_tasks = [asyncio.create_task(execute_action(action)) for action in actions]
@@ -282,12 +273,12 @@ class CycleProcessor:
action_success = False
action_reply_text = ""
action_command = ""
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
action_info = actions[i]
if result["action_type"] != "reply":
action_success = result["success"]
@@ -327,7 +318,7 @@ class CycleProcessor:
},
}
reply_text = action_reply_text
if ENABLE_S4U:
await stop_typing()
@@ -335,226 +326,7 @@ class CycleProcessor:
self.context.chat_instance.cycle_tracker.print_cycle_info(cycle_timers)
action_type = actions[0]["action_type"] if actions else "no_action"
# 管理no_reply计数器当执行了非no_reply动作时重置计数器
if action_type != "no_reply":
# no_reply逻辑已集成到heartFC_chat.py中直接重置计数器
self.context.chat_instance.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.log_prefix} 执行了{action_type}动作重置no_reply计数器")
return True
if action_type == "no_reply":
self.context.no_reply_consecutive += 1
self.context.chat_instance._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
return True
async def execute_plan(self, action_result: Dict[str, Any], target_message: Optional[Dict[str, Any]]):
"""
执行一个已经制定好的计划
"""
action_type = action_result.get("action_type", "error")
# 这里我们需要为执行计划创建一个新的循环追踪
cycle_timers, thinking_id = self.cycle_tracker.start_cycle(is_proactive=True)
loop_start_time = time.time()
if action_type == "reply":
# 主动思考不应该直接触发简单回复但为了逻辑完整性我们假设它会调用response_handler
# 注意:这里的 available_actions 和 plan_result 是缺失的,需要根据实际情况处理
await self._handle_reply_action(
target_message, {}, None, loop_start_time, cycle_timers, thinking_id, {"action_result": action_result}
)
else:
await self._handle_other_actions(
action_type,
action_result.get("reasoning", ""),
action_result.get("action_data", {}),
action_result.get("is_parallel", False),
None,
target_message,
cycle_timers,
thinking_id,
{"action_result": action_result},
loop_start_time,
)
async def _handle_reply_action(
self, message_data, available_actions, gen_task, loop_start_time, cycle_timers, thinking_id, plan_result
):
"""
处理回复类型的动作
Args:
message_data: 消息数据
available_actions: 可用动作列表
gen_task: 预先创建的生成任务可能为None
loop_start_time: 循环开始时间
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
功能说明:
- 根据聊天模式决定是否使用预生成的回复或实时生成
- 在NORMAL模式下使用异步生成提高效率
- 在FOCUS模式下同步生成确保及时响应
- 发送生成的回复并结束循环
"""
# 初始化reply_to_str以避免UnboundLocalError
reply_to_str = None
if self.context.loop_mode == ChatMode.NORMAL:
if not gen_task:
reply_to_str = await self._build_reply_to_str(message_data)
gen_task = asyncio.create_task(
self.response_handler.generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.normal",
)
)
else:
# 如果gen_task已存在但reply_to_str还未构建需要构建它
if reply_to_str is None:
reply_to_str = await self._build_reply_to_str(message_data)
try:
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
except asyncio.TimeoutError:
response_set = None
else:
reply_to_str = await self._build_reply_to_str(message_data)
response_set = await self.response_handler.generate_response(
message_data=message_data,
available_actions=available_actions,
reply_to=reply_to_str,
request_type="chat.replyer.focus",
)
if response_set:
loop_info, _, _ = await self.response_handler.generate_and_send_reply(
response_set, reply_to_str, loop_start_time, message_data, cycle_timers, thinking_id, plan_result
)
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
async def _handle_other_actions(
self,
action_type,
reasoning,
action_data,
is_parallel,
gen_task,
action_message,
cycle_timers,
thinking_id,
plan_result,
loop_start_time,
):
"""
处理非回复类型的动作如no_reply、自定义动作等
Args:
action_type: 动作类型
reasoning: 动作理由
action_data: 动作数据
is_parallel: 是否并行执行
gen_task: 生成任务
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
loop_start_time: 循环开始时间
功能说明:
- 在NORMAL模式下可能并行执行回复生成和动作处理
- 等待所有异步任务完成
- 整合回复和动作的执行结果
- 构建最终循环信息并结束循环
"""
background_reply_task = None
if self.context.loop_mode == ChatMode.NORMAL and is_parallel and gen_task:
background_reply_task = asyncio.create_task(
self._handle_parallel_reply(
gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
)
background_action_task = asyncio.create_task(
self._handle_action(action_type, reasoning, action_data, cycle_timers, thinking_id, action_message)
)
reply_loop_info, action_success, action_reply_text, action_command = None, False, "", ""
if background_reply_task:
results = await asyncio.gather(background_reply_task, background_action_task, return_exceptions=True)
reply_result, action_result_val = results
if not isinstance(reply_result, BaseException) and reply_result is not None:
reply_loop_info, _, _ = reply_result
else:
reply_loop_info = None
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
action_success, action_reply_text, action_command = action_result_val
else:
action_success, action_reply_text, action_command = False, "", ""
else:
results = await asyncio.gather(background_action_task, return_exceptions=True)
if results and len(results) > 0:
action_result_val = results[0] # Get the actual result from the tuple
else:
action_result_val = (False, "", "")
if not isinstance(action_result_val, BaseException) and action_result_val is not None:
action_success, action_reply_text, action_command = action_result_val
else:
action_success, action_reply_text, action_command = False, "", ""
loop_info = self._build_final_loop_info(
reply_loop_info, action_success, action_reply_text, action_command, plan_result
)
self.cycle_tracker.end_cycle(loop_info, cycle_timers)
async def _handle_parallel_reply(
self, gen_task, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
):
"""
处理并行回复生成
Args:
gen_task: 回复生成任务
loop_start_time: 循环开始时间
action_message: 动作消息
cycle_timers: 循环计时器
thinking_id: 思考ID
plan_result: 规划结果
Returns:
tuple: (循环信息, 回复文本, 计时器信息) 或 None
功能说明:
- 等待并行回复生成任务完成(带超时)
- 构建回复目标字符串
- 发送生成的回复
- 返回循环信息供上级方法使用
"""
try:
response_set = await asyncio.wait_for(gen_task, timeout=global_config.chat.thinking_timeout)
except asyncio.TimeoutError:
return None, "", {}
if not response_set:
return None, "", {}
reply_to_str = await self._build_reply_to_str(action_message)
return await self.response_handler.generate_and_send_reply(
response_set, reply_to_str, loop_start_time, action_message, cycle_timers, thinking_id, plan_result
)
return action_type
async def _handle_action(
self, action, reasoning, action_data, cycle_timers, thinking_id, action_message
@@ -603,12 +375,12 @@ class CycleProcessor:
if "reply" in available_actions:
fallback_action = "reply"
elif available_actions:
fallback_action = list(available_actions.keys())[0]
fallback_action = list(available_actions.keys())
if fallback_action and fallback_action != action:
logger.info(f"{self.context.log_prefix} 使用回退动作: {fallback_action}")
action_handler = self.context.action_manager.create_action(
action_name=fallback_action,
action_name=fallback_action if isinstance(fallback_action, list) else fallback_action,
action_data=action_data,
reasoning=f"原动作'{action}'不可用,自动回退。{reasoning}",
cycle_timers=cycle_timers,
@@ -628,43 +400,3 @@ class CycleProcessor:
logger.error(f"{self.context.log_prefix} 处理{action}时出错: {e}")
traceback.print_exc()
return False, "", ""
def _build_final_loop_info(self, reply_loop_info, action_success, action_reply_text, action_command, plan_result):
"""
构建最终的循环信息
Args:
reply_loop_info: 回复循环信息可能为None
action_success: 动作执行是否成功
action_reply_text: 动作回复文本
action_command: 动作命令
plan_result: 规划结果
Returns:
dict: 完整的循环信息,包含规划信息和动作信息
功能说明:
- 如果有回复循环信息,则在其基础上添加动作信息
- 如果没有回复信息,则创建新的循环信息结构
- 整合所有执行结果供循环跟踪器记录
"""
if reply_loop_info:
loop_info = reply_loop_info
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
else:
loop_info = {
"loop_plan_info": {"action_result": plan_result.get("action_result", {})},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
}
return loop_info

View File

@@ -91,25 +91,24 @@ class CycleTracker:
# 获取动作类型,兼容新旧格式
action_type = "未知动作"
if hasattr(self, '_current_cycle_detail') and self._current_cycle_detail:
if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail:
loop_plan_info = self._current_cycle_detail.loop_plan_info
if isinstance(loop_plan_info, dict):
action_result = loop_plan_info.get('action_result', {})
action_result = loop_plan_info.get("action_result", {})
if isinstance(action_result, dict):
# 旧格式action_result是字典
action_type = action_result.get('action_type', '未知动作')
action_type = action_result.get("action_type", "未知动作")
elif isinstance(action_result, list) and action_result:
# 新格式action_result是actions列表
action_type = action_result[0].get('action_type', '未知动作')
action_type = action_result[0].get("action_type", "未知动作")
elif isinstance(loop_plan_info, list) and loop_plan_info:
# 直接是actions列表的情况
action_type = loop_plan_info[0].get('action_type', '未知动作')
action_type = loop_plan_info[0].get("action_type", "未知动作")
if self.context.current_cycle_detail.end_time and self.context.current_cycle_detail.start_time:
duration = self.context.current_cycle_detail.end_time - self.context.current_cycle_detail.start_time
logger.info(
f"{self.context.log_prefix}{self.context.current_cycle_detail.cycle_id}次思考,"
f"耗时: {duration:.1f}秒, "
f"选择动作: {action_type}"
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)

View File

@@ -3,7 +3,6 @@ import time
from typing import Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.base.component_types import ChatMode
from .hfc_context import HfcContext
from src.schedule.schedule_manager import schedule_manager

View File

@@ -2,24 +2,24 @@ import asyncio
import time
import traceback
import random
from typing import Optional, List, Dict, Any, Tuple
from typing import Optional, List, Dict, Any
from collections import deque
from src.common.logger import get_logger
from src.config.config import global_config
from src.person_info.relationship_builder_manager import relationship_builder_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.plugin_system.base.component_types import ChatMode
from src.schedule.schedule_manager import schedule_manager, SleepState
from src.plugin_system.apis import message_api
from .hfc_context import HfcContext
from .energy_manager import EnergyManager
from .proactive_thinker import ProactiveThinker
from .proactive.proactive_thinker import ProactiveThinker
from .cycle_processor import CycleProcessor
from .response_handler import ResponseHandler
from .cycle_tracker import CycleTracker
from .wakeup_manager import WakeUpManager
from .sleep_manager.wakeup_manager import WakeUpManager
from .proactive.events import ProactiveTriggerEvent
logger = get_logger("hfc")
@@ -54,7 +54,8 @@ class HeartFChatting:
self.context.chat_instance = self
self._loop_task: Optional[asyncio.Task] = None
self._proactive_monitor_task: Optional[asyncio.Task] = None
# 记录最近3次的兴趣度
self.recent_interest_records: deque = deque(maxlen=3)
self._initialize_chat_mode()
@@ -93,8 +94,12 @@ class HeartFChatting:
self.context.relationship_builder = relationship_builder_manager.get_or_create_builder(self.context.stream_id)
self.context.expression_learner = expression_learner_manager.get_expression_learner(self.context.stream_id)
#await self.energy_manager.start()
await self.proactive_thinker.start()
# 启动主动思考监视器
if global_config.chat.enable_proactive_thinking:
self._proactive_monitor_task = asyncio.create_task(self._proactive_monitor_loop())
self._proactive_monitor_task.add_done_callback(self._handle_proactive_monitor_completion)
logger.info(f"{self.context.log_prefix} 主动思考监视器已启动")
await self.wakeup_manager.start()
self._loop_task = asyncio.create_task(self._main_chat_loop())
@@ -116,8 +121,12 @@ class HeartFChatting:
return
self.context.running = False
#await self.energy_manager.stop()
await self.proactive_thinker.stop()
# 停止主动思考监视器
if self._proactive_monitor_task and not self._proactive_monitor_task.done():
self._proactive_monitor_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 主动思考监视器已停止")
await self.wakeup_manager.stop()
if self._loop_task and not self._loop_task.done():
@@ -147,6 +156,151 @@ class HeartFChatting:
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} HeartFChatting: 结束了聊天")
def _handle_proactive_monitor_completion(self, task: asyncio.Task):
"""
处理主动思考监视器任务完成
Args:
task: 完成的异步任务对象
功能说明:
- 处理任务异常完成的情况
- 记录任务正常结束或被取消的日志
"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} 主动思考监视器异常: {exception}")
else:
logger.info(f"{self.context.log_prefix} 主动思考监视器正常结束")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 主动思考监视器被取消")
async def _proactive_monitor_loop(self):
"""
主动思考监视器循环
功能说明:
- 定期检查是否需要进行主动思考
- 计算聊天沉默时间,并与动态思考间隔比较
- 当沉默时间超过阈值时,触发主动思考
- 处理思考过程中的异常
"""
while self.context.running:
await asyncio.sleep(15)
if not self._should_enable_proactive_thinking():
continue
current_time = time.time()
silence_duration = current_time - self.context.last_message_time
target_interval = self._get_dynamic_thinking_interval()
if silence_duration >= target_interval:
try:
formatted_time = self._format_duration(silence_duration)
event = ProactiveTriggerEvent(
source="silence_monitor",
reason=f"聊天已沉默 {formatted_time}",
metadata={"silence_duration": silence_duration},
)
await self.proactive_thinker.think(event)
self.context.last_message_time = current_time
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考触发执行出错: {e}")
logger.error(traceback.format_exc())
def _should_enable_proactive_thinking(self) -> bool:
"""
判断是否应启用主动思考
Returns:
bool: 如果应启用主动思考则返回True否则返回False
功能说明:
- 检查全局配置和特定聊天设置
- 支持按群聊和私聊分别配置
- 支持白名单模式,只在特定聊天中启用
"""
if not self.context.chat_stream:
return False
is_group_chat = self.context.chat_stream.group_info is not None
if is_group_chat and not global_config.chat.proactive_thinking_in_group:
return False
if not is_group_chat and not global_config.chat.proactive_thinking_in_private:
return False
stream_parts = self.context.stream_id.split(":")
current_chat_identifier = f"{stream_parts}:{stream_parts}" if len(stream_parts) >= 2 else self.context.stream_id
enable_list = getattr(
global_config.chat,
"proactive_thinking_enable_in_groups" if is_group_chat else "proactive_thinking_enable_in_private",
[],
)
return not enable_list or current_chat_identifier in enable_list
def _get_dynamic_thinking_interval(self) -> float:
"""
获取动态思考间隔时间
Returns:
float: 思考间隔秒数
功能说明:
- 尝试从timing_utils导入正态分布间隔函数
- 根据配置计算动态间隔,增加随机性
- 在无法导入或计算出错时,回退到固定的间隔
"""
try:
from src.utils.timing_utils import get_normal_distributed_interval
base_interval = global_config.chat.proactive_thinking_interval
delta_sigma = getattr(global_config.chat, "delta_sigma", 120)
if base_interval <= 0:
base_interval = abs(base_interval)
if delta_sigma < 0:
delta_sigma = abs(delta_sigma)
if base_interval == 0 and delta_sigma == 0:
return 300
if delta_sigma == 0:
return base_interval
sigma_percentage = delta_sigma / base_interval if base_interval > 0 else delta_sigma / 1000
return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True)
except ImportError:
logger.warning(f"{self.context.log_prefix} timing_utils不可用使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
except Exception as e:
logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
def _format_duration(self, seconds: float) -> str:
"""
格式化时长为可读字符串
Args:
seconds: 时长秒数
Returns:
str: 格式化后的字符串 (例如 "1小时2分3秒")
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if hours > 0:
parts.append(f"{hours}小时")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def _main_chat_loop(self):
"""
主聊天循环
@@ -238,30 +392,36 @@ class HeartFChatting:
logger.info(f"{self.context.log_prefix} 从睡眠中被唤醒,将处理积压的消息。")
# 根据聊天模式处理新消息
# 统一使用 _should_process_messages 判断是否应该处理
should_process,interest_value = await self._should_process_messages(recent_messages if has_new_messages else None)
if should_process:
self.context.last_read_time = time.time()
await self.cycle_processor.observe(interest_value = interest_value)
else:
# Normal模式消息数量不足等待
should_process, interest_value = await self._should_process_messages(recent_messages)
if not should_process:
# 消息数量不足或兴趣不够,等待
await asyncio.sleep(0.5)
return True
if not await self._should_process_messages(recent_messages if has_new_messages else None):
return has_new_messages
# 处理新消息
for message in recent_messages:
await self.cycle_processor.observe(interest_value = interest_value)
return True # Skip rest of the logic for this iteration
# Messages should be processed
action_type = await self.cycle_processor.observe(interest_value=interest_value)
# 管理no_reply计数器
if action_type != "no_reply":
self.recent_interest_records.clear()
self.context.no_reply_consecutive = 0
logger.debug(f"{self.context.log_prefix} 执行了{action_type}动作重置no_reply计数器")
else: # action_type == "no_reply"
self.context.no_reply_consecutive += 1
self._determine_form_type()
# 在一轮动作执行完毕后,增加睡眠压力
if self.context.energy_manager and global_config.sleep_system.enable_insomnia_system:
if action_type not in ["no_reply", "no_action"]:
self.context.energy_manager.increase_sleep_pressure()
# 如果成功观察,增加能量值并重置累积兴趣值
if has_new_messages:
self.context.energy_value += 1 / global_config.chat.focus_value
# 重置累积兴趣值,因为消息已经被成功处理
self.context.breaking_accumulated_interest = 0.0
logger.info(f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值")
self.context.energy_value += 1 / global_config.chat.focus_value
# 重置累积兴趣值,因为消息已经被成功处理
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 能量值增加,当前能量值:{self.context.energy_value:.1f},重置累积兴趣值"
)
# 更新上一帧的睡眠状态
self.context.was_sleeping = is_sleeping
@@ -282,7 +442,6 @@ class HeartFChatting:
return has_new_messages
def _handle_wakeup_messages(self, messages):
"""
处理休眠状态下的消息,累积唤醒度
@@ -321,73 +480,83 @@ class HeartFChatting:
def _determine_form_type(self) -> str:
"""判断使用哪种形式的no_reply"""
# 检查是否启用breaking模式
if not global_config.chat.enable_breaking_mode:
if not getattr(global_config.chat, "enable_breaking_mode", False):
logger.info(f"{self.context.log_prefix} breaking模式已禁用使用waiting形式")
self.context.focus_energy = 1
return
return "waiting"
# 如果连续no_reply次数少于3次使用waiting形式
if self.context.no_reply_consecutive <= 3:
self.context.focus_energy = 1
return "waiting"
else:
# 使用累积兴趣值而不是最近3次的记录
total_interest = self.context.breaking_accumulated_interest
# 计算调整后的阈值
adjusted_threshold = 1 / global_config.chat.get_current_talk_frequency(self.context.stream_id)
logger.info(f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}")
logger.info(
f"{self.context.log_prefix} 累积兴趣值: {total_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
)
# 如果累积兴趣值小于阈值进入breaking形式
if total_interest < adjusted_threshold:
logger.info(f"{self.context.log_prefix} 累积兴趣度不足进入breaking形式")
self.context.focus_energy = random.randint(3, 6)
return "breaking"
else:
logger.info(f"{self.context.log_prefix} 累积兴趣度充足使用waiting形式")
self.context.focus_energy = 1
return "waiting"
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool,float]:
async def _should_process_messages(self, new_message: List[Dict[str, Any]]) -> tuple[bool, float]:
"""
统一判断是否应该处理消息的函数
根据当前循环模式和消息内容决定是否继续处理
"""
if not new_message:
return False, 0.0
new_message_count = len(new_message)
talk_frequency = global_config.chat.get_current_talk_frequency(self.context.chat_stream.stream_id)
talk_frequency = global_config.chat.get_current_talk_frequency(self.context.stream_id)
modified_exit_count_threshold = self.context.focus_energy * 0.5 / talk_frequency
modified_exit_interest_threshold = 1.5 / talk_frequency
# 计算当前批次消息的兴趣值
batch_interest = 0.0
for msg_dict in new_message:
interest_value = msg_dict.get("interest_value", 0.0)
if msg_dict.get("processed_plain_text", ""):
batch_interest += interest_value
# 在breaking形式下累积所有消息的兴趣值
if new_message_count > 0:
self.context.breaking_accumulated_interest += batch_interest
total_interest = self.context.breaking_accumulated_interest
else:
total_interest = self.context.breaking_accumulated_interest
if new_message_count >= modified_exit_count_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
# 重置累积兴趣值,因为已经达到了消息数量阈值
self.context.breaking_accumulated_interest = 0.0
logger.info(
f"{self.context.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待,累积兴趣值: {total_interest:.2f}"
)
return True,total_interest/new_message_count
return True, total_interest / new_message_count
# 检查累计兴趣值
if new_message_count > 0:
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
logger.info(f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}")
logger.info(
f"{self.context.log_prefix} breaking形式当前累积兴趣值: {total_interest:.2f}, 专注度: {global_config.chat.focus_value:.1f}"
)
self._last_accumulated_interest = total_interest
if total_interest >= modified_exit_interest_threshold:
# 记录兴趣度到列表
@@ -397,13 +566,16 @@ class HeartFChatting:
logger.info(
f"{self.context.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待"
)
return True,total_interest/new_message_count
return True, total_interest / new_message_count
# 每10秒输出一次等待状态
if int(time.time() - self.context.last_read_time) > 0 and int(time.time() - self.context.last_read_time) % 10 == 0:
if (
int(time.time() - self.context.last_read_time) > 0
and int(time.time() - self.context.last_read_time) % 10 == 0
):
logger.info(
f"{self.context.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..."
f"{self.context.log_prefix} 已等待{time.time() - self.context.last_read_time:.0f}秒,累计{new_message_count}条消息,累积兴趣{total_interest:.1f},继续等待..."
)
await asyncio.sleep(0.5)
return False,0.0
return False, 0.0

View File

@@ -1,16 +1,15 @@
from typing import List, Optional, TYPE_CHECKING
import time
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.common.logger import get_logger
from src.person_info.relationship_builder_manager import RelationshipBuilder
from src.chat.express.expression_learner import ExpressionLearner
from src.plugin_system.base.component_types import ChatMode
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.chat_loop.hfc_utils import CycleDetail
if TYPE_CHECKING:
from .wakeup_manager import WakeUpManager
from .sleep_manager.wakeup_manager import WakeUpManager
from .energy_manager import EnergyManager
from .heartFC_chat import HeartFChatting
class HfcContext:
@@ -43,13 +42,13 @@ class HfcContext:
self.energy_value = self.chat_stream.energy_value
self.sleep_pressure = self.chat_stream.sleep_pressure
self.was_sleeping = False # 用于检测睡眠状态的切换
self.was_sleeping = False # 用于检测睡眠状态的切换
self.last_message_time = time.time()
self.last_read_time = time.time() - 10
# 从聊天流恢复breaking累积兴趣值
self.breaking_accumulated_interest = getattr(self.chat_stream, 'breaking_accumulated_interest', 0.0)
self.breaking_accumulated_interest = getattr(self.chat_stream, "breaking_accumulated_interest", 0.0)
self.action_manager = ActionManager()
@@ -69,7 +68,7 @@ class HfcContext:
# breaking形式下的累积兴趣值
self.breaking_accumulated_interest = 0.0
# 引用HeartFChatting实例以便其他组件可以调用其方法
self.chat_instance = None
self.chat_instance: Optional["HeartFChatting"] = None
def save_context_state(self):
"""将当前状态保存到聊天流"""
@@ -78,4 +77,4 @@ class HfcContext:
self.chat_stream.sleep_pressure = self.sleep_pressure
self.chat_stream.focus_energy = self.focus_energy
self.chat_stream.no_reply_consecutive = self.no_reply_consecutive
self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest
self.chat_stream.breaking_accumulated_interest = self.breaking_accumulated_interest

View File

@@ -1,13 +1,11 @@
import time
from typing import Optional, Dict, Any, Union
from src.config.config import global_config
from src.common.logger import get_logger
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.apis import send_api
from maim_message.message_base import GroupInfo
from src.common.message_repository import count_messages
logger = get_logger("hfc")
@@ -122,6 +120,7 @@ class CycleDetail:
self.loop_plan_info = loop_info["loop_plan_info"]
self.loop_action_info = loop_info["loop_action_info"]
async def send_typing():
"""
发送打字状态指示

View File

@@ -0,0 +1,13 @@
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
@dataclass
class ProactiveTriggerEvent:
"""
主动思考触发事件的数据类
"""
source: str # 触发源的标识,例如 "silence_monitor", "insomnia_manager"
reason: str # 触发的具体原因,例如 "聊天已沉默10分钟", "深夜emo"
metadata: Optional[Dict[str, Any]] = field(default_factory=dict) # 可选的元数据,用于传递额外信息

View File

@@ -0,0 +1,125 @@
import time
import traceback
from typing import TYPE_CHECKING
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ChatMode
from ..hfc_context import HfcContext
from .events import ProactiveTriggerEvent
from src.plugin_system.apis import generator_api
if TYPE_CHECKING:
from ..cycle_processor import CycleProcessor
logger = get_logger("hfc")
class ProactiveThinker:
def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"):
"""
初始化主动思考器
Args:
context: HFC聊天上下文对象
cycle_processor: 循环处理器,用于执行主动思考的结果
功能说明:
- 接收主动思考事件并执行思考流程
- 根据事件类型执行不同的前置操作(如修改情绪)
- 调用planner进行决策并由cycle_processor执行
"""
self.context = context
self.cycle_processor = cycle_processor
async def think(self, trigger_event: ProactiveTriggerEvent):
"""
统一的API入口用于触发主动思考
Args:
trigger_event: 描述触发上下文的事件对象
"""
logger.info(
f"{self.context.log_prefix} 接收到主动思考事件: "
f"来源='{trigger_event.source}', 原因='{trigger_event.reason}'"
)
try:
# 1. 根据事件类型执行前置操作
await self._prepare_for_thinking(trigger_event)
# 2. 执行核心思考逻辑
await self._execute_proactive_thinking(trigger_event)
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考 think 方法执行异常: {e}")
logger.error(traceback.format_exc())
async def _prepare_for_thinking(self, trigger_event: ProactiveTriggerEvent):
"""
根据事件类型,执行思考前的准备工作,例如修改情绪
Args:
trigger_event: 触发事件
"""
if trigger_event.source != "insomnia_manager":
return
try:
from src.mood.mood_manager import mood_manager
mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id)
new_mood = None
if trigger_event.reason == "low_pressure":
new_mood = "精力过剩,毫无睡意"
elif trigger_event.reason == "random":
new_mood = "深夜emo胡思乱想"
elif trigger_event.reason == "goodnight":
new_mood = "有点困了,准备睡觉了"
if new_mood:
mood_obj.mood_state = new_mood
mood_obj.last_change_time = time.time()
logger.info(
f"{self.context.log_prefix}'{trigger_event.reason}'"
f"情绪状态被强制更新为: {mood_obj.mood_state}"
)
except Exception as e:
logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}")
async def _execute_proactive_thinking(self, trigger_event: ProactiveTriggerEvent):
"""
执行主动思考的核心逻辑
Args:
trigger_event: 触发事件
"""
try:
# 直接调用 planner 的 PROACTIVE 模式
actions, target_message = await self.cycle_processor.action_planner.plan(mode=ChatMode.PROACTIVE)
# 获取第一个规划出的动作作为主要决策
action_result = actions[0] if actions else {}
# 如果决策不是 do_nothing则执行
if action_result and action_result.get("action_type") != "do_nothing":
if action_result.get("action_type") == "reply":
success, response_set, _ = await generator_api.generate_reply(
chat_stream=self.context.chat_stream,
reply_message=action_result["action_message"],
available_actions={},
enable_tool=False,
request_type="chat.replyer.proactive",
from_plugin=False,
)
if success and response_set:
await self.cycle_processor.response_handler.send_response(
response_set, time.time(), action_result["action_message"]
)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())

View File

@@ -1,353 +0,0 @@
import asyncio
import time
import traceback
from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from src.plugin_system.base.component_types import ChatMode
from .hfc_context import HfcContext
if TYPE_CHECKING:
from .cycle_processor import CycleProcessor
logger = get_logger("hfc")
class ProactiveThinker:
def __init__(self, context: HfcContext, cycle_processor: "CycleProcessor"):
"""
初始化主动思考器
Args:
context: HFC聊天上下文对象
cycle_processor: 循环处理器,用于执行主动思考的结果
功能说明:
- 管理机器人的主动发言功能
- 根据沉默时间和配置触发主动思考
- 提供私聊和群聊不同的思考提示模板
- 使用3-sigma规则计算动态思考间隔
"""
self.context = context
self.cycle_processor = cycle_processor
self._proactive_thinking_task: Optional[asyncio.Task] = None
self.proactive_thinking_prompts = {
"private": """现在你和你朋友的私聊里面已经隔了{time}没有发送消息了,请你结合上下文以及你和你朋友之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择:
1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时)
2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时)
请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""",
"group": """现在群里面已经隔了{time}没有人发送消息了,请你结合上下文以及群聊里面之前聊过的话题和你的人设来决定要不要主动发送消息,你可以选择:
1. 继续保持沉默(当{time}以前已经结束了一个话题并且你不想挑起新话题时)
2. 选择回复(当{time}以前你发送了一条消息且没有人回复你时、你想主动挑起一个话题时)
请根据当前情况做出选择。如果选择回复,请直接发送你想说的内容;如果选择保持沉默,请只回复"沉默"(注意:这个词不会被发送到群聊中)。""",
}
async def start(self):
"""
启动主动思考器
功能说明:
- 检查运行状态和配置,避免重复启动
- 只有在启用主动思考功能时才启动
- 创建主动思考循环异步任务
- 设置任务完成回调处理
- 记录启动日志
"""
if self.context.running and not self._proactive_thinking_task and global_config.chat.enable_proactive_thinking:
self._proactive_thinking_task = asyncio.create_task(self._proactive_thinking_loop())
self._proactive_thinking_task.add_done_callback(self._handle_proactive_thinking_completion)
logger.info(f"{self.context.log_prefix} 主动思考器已启动")
async def stop(self):
"""
停止主动思考器
功能说明:
- 取消正在运行的主动思考任务
- 等待任务完全停止
- 记录停止日志
"""
if self._proactive_thinking_task and not self._proactive_thinking_task.done():
self._proactive_thinking_task.cancel()
await asyncio.sleep(0)
logger.info(f"{self.context.log_prefix} 主动思考器已停止")
def _handle_proactive_thinking_completion(self, task: asyncio.Task):
"""
处理主动思考任务完成
Args:
task: 完成的异步任务对象
功能说明:
- 处理任务正常完成或异常情况
- 记录相应的日志信息
- 区分取消和异常终止的情况
"""
try:
if exception := task.exception():
logger.error(f"{self.context.log_prefix} 主动思考循环异常: {exception}")
else:
logger.info(f"{self.context.log_prefix} 主动思考循环正常结束")
except asyncio.CancelledError:
logger.info(f"{self.context.log_prefix} 主动思考循环被取消")
async def _proactive_thinking_loop(self):
"""
主动思考的主循环
功能说明:
- 每15秒检查一次是否需要主动思考
- 只在FOCUS模式下进行主动思考
- 检查是否启用主动思考功能
- 计算沉默时间并与动态间隔比较
- 达到条件时执行主动思考并更新最后消息时间
- 处理执行过程中的异常
"""
while self.context.running:
await asyncio.sleep(15)
if self.context.loop_mode != ChatMode.FOCUS:
continue
if not self._should_enable_proactive_thinking():
continue
current_time = time.time()
silence_duration = current_time - self.context.last_message_time
target_interval = self._get_dynamic_thinking_interval()
if silence_duration >= target_interval:
try:
await self._execute_proactive_thinking(silence_duration)
self.context.last_message_time = current_time
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行出错: {e}")
logger.error(traceback.format_exc())
def _should_enable_proactive_thinking(self) -> bool:
"""
检查是否应该启用主动思考
Returns:
bool: 如果应该启用主动思考则返回True
功能说明:
- 检查聊天流是否存在
- 检查当前聊天是否在启用列表中(按平台和类型分别检查)
- 根据聊天类型(群聊/私聊)和配置决定是否启用
- 群聊需要proactive_thinking_in_group为True
- 私聊需要proactive_thinking_in_private为True
"""
if not self.context.chat_stream:
return False
is_group_chat = self.context.chat_stream.group_info is not None
# 检查基础开关
if is_group_chat and not global_config.chat.proactive_thinking_in_group:
return False
if not is_group_chat and not global_config.chat.proactive_thinking_in_private:
return False
# 获取当前聊天的完整标识 (platform:chat_id)
stream_parts = self.context.stream_id.split(":")
if len(stream_parts) >= 2:
platform = stream_parts[0]
chat_id = stream_parts[1]
current_chat_identifier = f"{platform}:{chat_id}"
else:
# 如果无法解析则使用原始stream_id
current_chat_identifier = self.context.stream_id
# 检查是否在启用列表中
if is_group_chat:
# 群聊检查
enable_list = getattr(global_config.chat, "proactive_thinking_enable_in_groups", [])
if enable_list and current_chat_identifier not in enable_list:
return False
else:
# 私聊检查
enable_list = getattr(global_config.chat, "proactive_thinking_enable_in_private", [])
if enable_list and current_chat_identifier not in enable_list:
return False
return True
def _get_dynamic_thinking_interval(self) -> float:
"""
获取动态思考间隔
Returns:
float: 计算得出的思考间隔时间(秒)
功能说明:
- 使用3-sigma规则计算正态分布的思考间隔
- 基于base_interval和delta_sigma配置计算
- 处理特殊情况为0或负数的配置
- 如果timing_utils不可用则使用固定间隔
- 间隔范围被限制在1秒到86400秒1天之间
"""
try:
from src.utils.timing_utils import get_normal_distributed_interval
base_interval = global_config.chat.proactive_thinking_interval
delta_sigma = getattr(global_config.chat, "delta_sigma", 120)
if base_interval < 0:
base_interval = abs(base_interval)
if delta_sigma < 0:
delta_sigma = abs(delta_sigma)
if base_interval == 0 and delta_sigma == 0:
return 300
elif base_interval == 0:
sigma_percentage = delta_sigma / 1000
return get_normal_distributed_interval(0, sigma_percentage, 1, 86400, use_3sigma_rule=True)
elif delta_sigma == 0:
return base_interval
sigma_percentage = delta_sigma / base_interval
return get_normal_distributed_interval(base_interval, sigma_percentage, 1, 86400, use_3sigma_rule=True)
except ImportError:
logger.warning(f"{self.context.log_prefix} timing_utils不可用使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
except Exception as e:
logger.error(f"{self.context.log_prefix} 动态间隔计算出错: {e},使用固定间隔")
return max(300, abs(global_config.chat.proactive_thinking_interval))
def _format_duration(self, seconds: float) -> str:
"""
格式化持续时间为中文描述
Args:
seconds: 持续时间(秒)
Returns:
str: 格式化后的时间字符串,如"1小时30分45秒"
功能说明:
- 将秒数转换为小时、分钟、秒的组合
- 只显示非零的时间单位
- 如果所有单位都为0则显示"0秒"
- 用于主动思考日志的时间显示
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if hours > 0:
parts.append(f"{hours}小时")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def _execute_proactive_thinking(self, silence_duration: float):
"""
执行主动思考
Args:
silence_duration: 沉默持续时间(秒)
"""
formatted_time = self._format_duration(silence_duration)
logger.info(f"{self.context.log_prefix} 触发主动思考,已沉默{formatted_time}")
try:
# 直接调用 planner 的 PROACTIVE 模式
action_result_tuple, target_message = await self.cycle_processor.action_planner.plan(
mode=ChatMode.PROACTIVE
)
action_result = action_result_tuple.get("action_result")
# 如果决策不是 do_nothing则执行
if action_result and action_result.get("action_type") != "do_nothing":
logger.info(f"{self.context.log_prefix} 主动思考决策: {action_result.get('action_type')}, 原因: {action_result.get('reasoning')}")
# 在主动思考时,如果 target_message 为 None则默认选取最新 message 作为 target_message
if target_message is None and self.context.chat_stream and self.context.chat_stream.context:
from src.chat.message_receive.message import MessageRecv
latest_message = self.context.chat_stream.context.get_last_message()
if isinstance(latest_message, MessageRecv):
user_info = latest_message.message_info.user_info
target_message = {
"chat_info_platform": latest_message.message_info.platform,
"user_platform": user_info.platform if user_info else None,
"user_id": user_info.user_id if user_info else None,
"processed_plain_text": latest_message.processed_plain_text,
"is_mentioned": latest_message.is_mentioned,
}
# 将决策结果交给 cycle_processor 的后续流程处理
await self.cycle_processor.execute_plan(action_result, target_message)
else:
logger.info(f"{self.context.log_prefix} 主动思考决策: 保持沉默")
except Exception as e:
logger.error(f"{self.context.log_prefix} 主动思考执行异常: {e}")
logger.error(traceback.format_exc())
async def trigger_insomnia_thinking(self, reason: str):
"""
由外部事件(如失眠)触发的一次性主动思考
Args:
reason: 触发的原因 (e.g., "low_pressure", "random")
"""
logger.info(f"{self.context.log_prefix} 因“{reason}”触发失眠,开始深夜思考...")
# 1. 根据原因修改情绪
try:
from src.mood.mood_manager import mood_manager
mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id)
if reason == "low_pressure":
mood_obj.mood_state = "精力过剩,毫无睡意"
elif reason == "random":
mood_obj.mood_state = "深夜emo胡思乱想"
mood_obj.last_change_time = time.time() # 更新时间戳以允许后续的情绪回归
logger.info(f"{self.context.log_prefix} 因失眠,情绪状态被强制更新为: {mood_obj.mood_state}")
except Exception as e:
logger.error(f"{self.context.log_prefix} 设置失眠情绪时出错: {e}")
# 2. 直接执行主动思考逻辑
try:
# 传入一个象征性的silence_duration因为它在这里不重要
await self._execute_proactive_thinking(silence_duration=1)
except Exception as e:
logger.error(f"{self.context.log_prefix} 失眠思考执行出错: {e}")
logger.error(traceback.format_exc())
async def trigger_goodnight_thinking(self):
"""
在失眠状态结束后,触发一次准备睡觉的主动思考
"""
logger.info(f"{self.context.log_prefix} 失眠状态结束,准备睡觉,触发告别思考...")
# 1. 设置一个准备睡觉的特定情绪
try:
from src.mood.mood_manager import mood_manager
mood_obj = mood_manager.get_mood_by_chat_id(self.context.stream_id)
mood_obj.mood_state = "有点困了,准备睡觉了"
mood_obj.last_change_time = time.time()
logger.info(f"{self.context.log_prefix} 情绪状态更新为: {mood_obj.mood_state}")
except Exception as e:
logger.error(f"{self.context.log_prefix} 设置睡前情绪时出错: {e}")
# 2. 直接执行主动思考逻辑
try:
await self._execute_proactive_thinking(silence_duration=1)
except Exception as e:
logger.error(f"{self.context.log_prefix} 睡前告别思考执行出错: {e}")
logger.error(traceback.format_exc())

View File

@@ -64,7 +64,7 @@ class ResponseHandler:
- 构建并返回完整的循环信息
- 用于上级方法的状态跟踪
"""
reply_text = await self.send_response(response_set, reply_to_str, loop_start_time, action_message)
reply_text = await self.send_response(response_set, loop_start_time, action_message)
person_info_manager = get_person_info_manager()
@@ -157,7 +157,7 @@ class ResponseHandler:
await send_api.text_to_stream(
text=data,
stream_id=self.context.stream_id,
reply_to_message = message_data,
reply_to_message=message_data,
set_reply=need_reply,
typing=False,
)
@@ -166,107 +166,9 @@ class ResponseHandler:
await send_api.text_to_stream(
text=data,
stream_id=self.context.stream_id,
reply_to_message = message_data,
set_reply=need_reply,
reply_to_message=None,
set_reply=False,
typing=True,
)
return reply_text
# TODO: 已废弃
async def generate_response(
self,
message_data: dict,
available_actions: Optional[Dict[str, Any]],
reply_to: str,
request_type: str = "chat.replyer.normal",
) -> Optional[list]:
"""
生成回复内容
Args:
message_data: 消息数据
available_actions: 可用动作列表
reply_to: 回复目标
request_type: 请求类型,默认为普通回复
Returns:
list: 生成的回复内容列表失败时返回None
功能说明:
- 在生成回复前进行反注入检测(提高效率)
- 调用生成器API生成回复
- 根据配置启用或禁用工具功能
- 处理生成失败的情况
- 记录生成过程中的错误和异常
"""
try:
# === 反注入检测(仅在需要生成回复时) ===
# 执行反注入检测(直接使用字典格式)
anti_injector = get_anti_injector()
result, modified_content, reason = await anti_injector.process_message(
message_data, self.context.chat_stream
)
# 根据反注入结果处理消息数据
await anti_injector.handle_message_storage(result, modified_content, reason, message_data)
if result == ProcessResult.BLOCKED_BAN:
# 用户被封禁 - 直接阻止回复生成
anti_injector_logger.warning(f"用户被反注入系统封禁,阻止回复生成: {reason}")
return None
elif result == ProcessResult.BLOCKED_INJECTION:
# 消息被阻止(危险内容等) - 直接阻止回复生成
anti_injector_logger.warning(f"消息被反注入系统阻止,阻止回复生成: {reason}")
return None
elif result == ProcessResult.COUNTER_ATTACK:
# 反击模式:生成反击消息作为回复
anti_injector_logger.info(f"反击模式启动,生成反击回复: {reason}")
if modified_content:
# 返回反击消息作为回复内容
return [("text", modified_content)]
else:
# 没有反击内容时阻止回复生成
return None
# 检查是否需要加盾处理
safety_prompt = None
if result == ProcessResult.SHIELDED:
# 获取安全系统提示词并注入
shield = anti_injector.shield
safety_prompt = shield.get_safety_system_prompt()
await Prompt.create_async(safety_prompt, "anti_injection_safety_prompt")
anti_injector_logger.info(f"消息已被反注入系统加盾处理,已注入安全提示词: {reason}")
# 处理被修改的消息内容(用于生成回复)
modified_reply_to = reply_to
if modified_content:
# 更新消息内容用于生成回复
anti_injector_logger.info(f"消息内容已被反注入系统修改,使用修改后内容生成回复: {reason}")
# 解析原始reply_to格式"发送者:消息内容"
if ":" in reply_to:
sender_part, _ = reply_to.split(":", 1)
modified_reply_to = f"{sender_part}:{modified_content}"
else:
# 如果格式不标准,直接使用修改后的内容
modified_reply_to = modified_content
# === 正常的回复生成流程 ===
success, reply_set, _ = await generator_api.generate_reply(
chat_stream=self.context.chat_stream,
reply_to=modified_reply_to, # 使用可能被修改的内容
available_actions=available_actions,
enable_tool=global_config.tool.enable_tool,
request_type=request_type,
from_plugin=False,
)
if not success or not reply_set:
logger.info(f"{message_data.get('processed_plain_text')} 的回复生成失败")
return None
return reply_set
except Exception as e:
logger.error(f"{self.context.log_prefix}回复生成出现错误:{str(e)} {traceback.format_exc()}")
return None

View File

@@ -0,0 +1,352 @@
import asyncio
import random
from datetime import datetime, timedelta, date, time
from enum import Enum, auto
from typing import Optional, TYPE_CHECKING
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.local_store_manager import local_storage
from src.plugin_system.apis import send_api, generator_api
if TYPE_CHECKING:
from mmc.src.chat.chat_loop.sleep_manager.wakeup_manager import WakeUpManager
logger = get_logger("sleep_manager")
class SleepState(Enum):
"""睡眠状态枚举"""
AWAKE = auto() # 完全清醒
INSOMNIA = auto() # 失眠(在理论睡眠时间内保持清醒)
PREPARING_SLEEP = auto() # 准备入睡(缓冲期)
SLEEPING = auto() # 正在休眠
WOKEN_UP = auto() # 被吵醒
class SleepManager:
def __init__(self, schedule_manager):
self.schedule_manager = schedule_manager
self.last_sleep_log_time = 0
self.sleep_log_interval = 35 # 日志记录间隔,单位秒
# --- 统一睡眠状态管理 ---
self._current_state: SleepState = SleepState.AWAKE
self._sleep_buffer_end_time: Optional[datetime] = None
self._total_delayed_minutes_today: int = 0
self._last_sleep_check_date: Optional[date] = None
self._last_fully_slept_log_time: float = 0
self._re_sleep_attempt_time: Optional[datetime] = None # 新增:重新入睡的尝试时间
self._load_sleep_state()
def get_current_sleep_state(self) -> SleepState:
"""获取当前的睡眠状态"""
return self._current_state
def is_sleeping(self) -> bool:
"""检查当前是否处于正式休眠状态"""
return self._current_state == SleepState.SLEEPING
async def update_sleep_state(self, wakeup_manager: Optional["WakeUpManager"] = None):
"""
核心状态机:根据当前情况更新睡眠状态
"""
# --- 基础检查 ---
if not global_config.sleep_system.enable or not self.schedule_manager.today_schedule:
if self._current_state != SleepState.AWAKE:
logger.debug("睡眠系统禁用或无日程,强制设为 AWAKE")
self._current_state = SleepState.AWAKE
return
now = datetime.now()
today = now.date()
# --- 每日状态重置 ---
if self._last_sleep_check_date != today:
logger.info(f"新的一天 ({today}),重置睡眠状态为 AWAKE。")
self._total_delayed_minutes_today = 0
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._last_sleep_check_date = today
self._save_sleep_state()
# --- 判断当前是否为理论上的睡眠时间 ---
is_in_theoretical_sleep, activity = self._is_in_theoretical_sleep_time(now.time())
# ===================================
# 状态机核心逻辑
# ===================================
# 状态:清醒 (AWAKE)
if self._current_state == SleepState.AWAKE:
if is_in_theoretical_sleep:
logger.info(f"进入理论休眠时间 '{activity}',开始进行睡眠决策...")
# --- 合并后的失眠与弹性睡眠决策逻辑 ---
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
# 决策1因睡眠压力低而延迟入睡原弹性睡眠
if (
sleep_pressure < pressure_threshold
and self._total_delayed_minutes_today < global_config.sleep_system.max_sleep_delay_minutes
):
delay_minutes = 15
self._total_delayed_minutes_today += delay_minutes
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self._current_state = SleepState.INSOMNIA
logger.info(
f"睡眠压力 ({sleep_pressure:.1f}) 低于阈值 ({pressure_threshold}),进入失眠状态,延迟入睡 {delay_minutes} 分钟。"
)
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
# 决策2进入正常的入睡准备流程
else:
buffer_seconds = random.randint(5 * 60, 10 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
logger.info(
f"睡眠压力正常或已达今日最大延迟,进入准备入睡状态,将在 {buffer_seconds / 60:.1f} 分钟内入睡。"
)
# 发送睡前通知
if global_config.sleep_system.enable_pre_sleep_notification:
asyncio.create_task(self._send_pre_sleep_notification())
self._save_sleep_state()
# 状态:失眠 (INSOMNIA)
elif self._current_state == SleepState.INSOMNIA:
if not is_in_theoretical_sleep:
logger.info("已离开理论休眠时间,失眠结束,恢复清醒。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("失眠状态下的延迟时间已过,重新评估是否入睡...")
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
if (
sleep_pressure >= pressure_threshold
or self._total_delayed_minutes_today >= global_config.sleep_system.max_sleep_delay_minutes
):
logger.info("睡眠压力足够或已达最大延迟,从失眠状态转换到准备入睡。")
buffer_seconds = random.randint(5 * 60, 10 * 60)
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
else:
logger.info(f"睡眠压力({sleep_pressure:.1f})仍然较低再延迟15分钟。")
delay_minutes = 15
self._total_delayed_minutes_today += delay_minutes
self._sleep_buffer_end_time = now + timedelta(minutes=delay_minutes)
self._save_sleep_state()
# 状态:准备入睡 (PREPARING_SLEEP)
elif self._current_state == SleepState.PREPARING_SLEEP:
if not is_in_theoretical_sleep:
logger.info("准备入睡期间离开理论休眠时间,取消入睡,恢复清醒。")
self._current_state = SleepState.AWAKE
self._sleep_buffer_end_time = None
self._save_sleep_state()
elif self._sleep_buffer_end_time and now >= self._sleep_buffer_end_time:
logger.info("睡眠缓冲期结束,正式进入休眠状态。")
self._current_state = SleepState.SLEEPING
self._last_fully_slept_log_time = now.timestamp()
self._save_sleep_state()
# 状态:休眠中 (SLEEPING)
elif self._current_state == SleepState.SLEEPING:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,自然醒来。")
self._current_state = SleepState.AWAKE
self._save_sleep_state()
else:
# 记录日志
current_timestamp = now.timestamp()
if current_timestamp - self.last_sleep_log_time > self.sleep_log_interval:
logger.info(f"当前处于休眠活动 '{activity}' 中。")
self.last_sleep_log_time = current_timestamp
# 状态:被吵醒 (WOKEN_UP)
elif self._current_state == SleepState.WOKEN_UP:
if not is_in_theoretical_sleep:
logger.info("理论休眠时间结束,被吵醒的状态自动结束。")
self._current_state = SleepState.AWAKE
self._re_sleep_attempt_time = None
self._save_sleep_state()
elif self._re_sleep_attempt_time and now >= self._re_sleep_attempt_time:
logger.info("被吵醒后经过一段时间,尝试重新入睡...")
sleep_pressure = wakeup_manager.context.sleep_pressure if wakeup_manager else 999
pressure_threshold = global_config.sleep_system.flexible_sleep_pressure_threshold
if sleep_pressure >= pressure_threshold:
logger.info("睡眠压力足够,从被吵醒状态转换到准备入睡。")
buffer_seconds = random.randint(3 * 60, 8 * 60) # 重新入睡的缓冲期可以短一些
self._sleep_buffer_end_time = now + timedelta(seconds=buffer_seconds)
self._current_state = SleepState.PREPARING_SLEEP
self._re_sleep_attempt_time = None
else:
delay_minutes = 15
self._re_sleep_attempt_time = now + timedelta(minutes=delay_minutes)
logger.info(
f"睡眠压力({sleep_pressure:.1f})仍然较低,暂时保持清醒,在 {delay_minutes} 分钟后再次尝试。"
)
self._save_sleep_state()
def reset_sleep_state_after_wakeup(self):
"""被唤醒后,将状态切换到 WOKEN_UP"""
if self._current_state in [SleepState.PREPARING_SLEEP, SleepState.SLEEPING, SleepState.INSOMNIA]:
logger.info("被唤醒,进入 WOKEN_UP 状态!")
self._current_state = SleepState.WOKEN_UP
self._sleep_buffer_end_time = None
# 设置一个延迟,之后再尝试重新入睡
re_sleep_delay_minutes = getattr(global_config.sleep_system, "re_sleep_delay_minutes", 10)
self._re_sleep_attempt_time = datetime.now() + timedelta(minutes=re_sleep_delay_minutes)
logger.info(f"将在 {re_sleep_delay_minutes} 分钟后尝试重新入睡。")
self._save_sleep_state()
def _is_in_theoretical_sleep_time(self, now_time: time) -> tuple[bool, Optional[str]]:
"""检查当前时间是否落在日程表的任何一个睡眠活动中"""
sleep_keywords = ["休眠", "睡觉", "梦乡"]
if self.schedule_manager.today_schedule:
for event in self.schedule_manager.today_schedule:
try:
activity = event.get("activity", "").strip()
time_range = event.get("time_range")
if not activity or not time_range:
continue
if any(keyword in activity for keyword in sleep_keywords):
start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if start_time <= end_time: # 同一天
if start_time <= now_time < end_time:
return True, activity
else: # 跨天
if now_time >= start_time or now_time < end_time:
return True, activity
except (ValueError, KeyError, AttributeError) as e:
logger.warning(f"解析日程事件时出错: {event}, 错误: {e}")
continue
return False, None
async def _send_pre_sleep_notification(self):
"""异步生成并发送睡前通知"""
try:
groups = global_config.sleep_system.pre_sleep_notification_groups
prompt = global_config.sleep_system.pre_sleep_prompt
if not groups:
logger.info("未配置睡前通知的群组,跳过发送。")
return
if not prompt:
logger.warning("睡前通知的prompt为空跳过发送。")
return
# 为防止消息风暴,稍微延迟一下
await asyncio.sleep(random.uniform(5, 15))
for group_id_str in groups:
try:
# 格式 "platform:group_id"
parts = group_id_str.split(":")
if len(parts) != 2:
logger.warning(f"无效的群组ID格式: {group_id_str}")
continue
platform, group_id = parts
# 使用与 ChatStream.get_stream_id 相同的逻辑生成 stream_id
import hashlib
key = "_".join([platform, group_id])
stream_id = hashlib.md5(key.encode()).hexdigest()
logger.info(f"正在为群组 {group_id_str} (Stream ID: {stream_id}) 生成睡前消息...")
# 调用 generator_api 生成回复
success, reply_set, _ = await generator_api.generate_reply(
chat_id=stream_id, extra_info=prompt, request_type="schedule.pre_sleep_notification"
)
if success and reply_set:
# 提取文本内容并发送
reply_text = "".join([content for msg_type, content in reply_set if msg_type == "text"])
if reply_text:
logger.info(f"向群组 {group_id_str} 发送睡前消息: {reply_text}")
await send_api.text_to_stream(text=reply_text, stream_id=stream_id)
else:
logger.warning(f"为群组 {group_id_str} 生成的回复内容为空。")
else:
logger.error(f"为群组 {group_id_str} 生成睡前消息失败。")
await asyncio.sleep(random.uniform(2, 5)) # 避免发送过快
except Exception as e:
logger.error(f"向群组 {group_id_str} 发送睡前消息失败: {e}")
except Exception as e:
logger.error(f"发送睡前通知任务失败: {e}")
def _save_sleep_state(self):
"""将当前睡眠状态保存到本地存储"""
try:
state = {
"current_state": self._current_state.name,
"sleep_buffer_end_time_ts": self._sleep_buffer_end_time.timestamp()
if self._sleep_buffer_end_time
else None,
"total_delayed_minutes_today": self._total_delayed_minutes_today,
"last_sleep_check_date_str": self._last_sleep_check_date.isoformat()
if self._last_sleep_check_date
else None,
"re_sleep_attempt_time_ts": self._re_sleep_attempt_time.timestamp()
if self._re_sleep_attempt_time
else None,
}
local_storage["schedule_sleep_state"] = state
logger.debug(f"已保存睡眠状态: {state}")
except Exception as e:
logger.error(f"保存睡眠状态失败: {e}")
def _load_sleep_state(self):
"""从本地存储加载睡眠状态"""
try:
state = local_storage["schedule_sleep_state"]
if state and isinstance(state, dict):
state_name = state.get("current_state")
if state_name and hasattr(SleepState, state_name):
self._current_state = SleepState[state_name]
end_time_ts = state.get("sleep_buffer_end_time_ts")
if end_time_ts:
self._sleep_buffer_end_time = datetime.fromtimestamp(end_time_ts)
re_sleep_ts = state.get("re_sleep_attempt_time_ts")
if re_sleep_ts:
self._re_sleep_attempt_time = datetime.fromtimestamp(re_sleep_ts)
self._total_delayed_minutes_today = state.get("total_delayed_minutes_today", 0)
date_str = state.get("last_sleep_check_date_str")
if date_str:
self._last_sleep_check_date = datetime.fromisoformat(date_str).date()
logger.info(f"成功从本地存储加载睡眠状态: {state}")
except Exception as e:
logger.warning(f"加载睡眠状态失败,将使用默认值: {e}")

View File

@@ -4,7 +4,7 @@ from typing import Optional
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.local_store_manager import local_storage
from .hfc_context import HfcContext
from ..hfc_context import HfcContext
logger = get_logger("wakeup")
@@ -139,7 +139,7 @@ class WakeUpManager:
# 只有在休眠且非失眠状态下才累积唤醒度
from src.schedule.schedule_manager import schedule_manager
from src.schedule.sleep_manager import SleepState
from mmc.src.chat.chat_loop.sleep_manager.sleep_manager import SleepState
current_sleep_state = schedule_manager.get_current_sleep_state()
if current_sleep_state != SleepState.SLEEPING:

View File

@@ -724,7 +724,7 @@ class EmojiManager:
if not emoji.is_deleted and emoji.hash == emoji_hash:
return emoji
return None # 如果循环结束还没找到,则返回 None
async def get_emoji_tag_by_hash(self, emoji_hash: str) -> Optional[str]:
"""根据哈希值获取已注册表情包的描述
@@ -755,7 +755,7 @@ class EmojiManager:
except Exception as e:
logger.error(f"获取表情包描述失败 (Hash: {emoji_hash}): {str(e)}")
return None
async def get_emoji_description_by_hash(self, emoji_hash: str) -> Optional[str]:
"""根据哈希值获取已注册表情包的描述

View File

@@ -85,6 +85,7 @@ class ChatStream:
self.context: ChatMessageContext = None # type: ignore # 用于存储该聊天的上下文信息
self.focus_energy = 1
self.no_reply_consecutive = 0
self.breaking_accumulated_interest = 0.0
def to_dict(self) -> dict:
"""转换为字典格式"""
@@ -97,6 +98,7 @@ class ChatStream:
"last_active_time": self.last_active_time,
"energy_value": self.energy_value,
"sleep_pressure": self.sleep_pressure,
"breaking_accumulated_interest": self.breaking_accumulated_interest,
}
@classmethod
@@ -257,7 +259,7 @@ class ChatManager:
"user_cardname": model_instance.user_cardname or "",
}
group_info_data = None
if model_instance.group_id:
if model_instance and getattr(model_instance, "group_id", None):
group_info_data = {
"platform": model_instance.group_platform,
"group_id": model_instance.group_id,
@@ -403,7 +405,7 @@ class ChatManager:
"user_cardname": model_instance.user_cardname or "",
}
group_info_data = None
if model_instance.group_id:
if model_instance and getattr(model_instance, "group_id", None):
group_info_data = {
"platform": model_instance.group_platform,
"group_id": model_instance.group_id,

View File

@@ -120,7 +120,7 @@ class MessageRecv(Message):
self.priority_mode = "interest"
self.priority_info = None
self.interest_value: float = 0.0
self.key_words = []
self.key_words_lite = []

View File

@@ -20,7 +20,7 @@ class MessageStorage:
if isinstance(keywords, list):
return orjson.dumps(keywords).decode("utf-8")
return "[]"
@staticmethod
def _deserialize_keywords(keywords_str: str) -> list:
"""将JSON字符串反序列化为关键词列表"""

View File

@@ -161,10 +161,8 @@ class ActionModifier:
available_actions = list(self.action_manager.get_using_actions().keys())
available_actions_text = "".join(available_actions) if available_actions else ""
logger.info(
f"{self.log_prefix} 当前可用动作: {available_actions_text}||移除: {removals_summary}"
)
logger.info(f"{self.log_prefix} 当前可用动作: {available_actions_text}||移除: {removals_summary}")
def _check_action_associated_types(self, all_actions: Dict[str, ActionInfo], chat_context: ChatMessageContext):
type_mismatched_actions: List[Tuple[str, str]] = []

View File

@@ -188,15 +188,12 @@ class ActionPlanner:
param_text = ""
if action_info.action_parameters:
param_text = "\n" + "\n".join(
f' "{p_name}":"{p_desc}"'
for p_name, p_desc in action_info.action_parameters.items()
f' "{p_name}":"{p_desc}"' for p_name, p_desc in action_info.action_parameters.items()
)
require_text = "\n".join(f"- {req}" for req in action_info.action_require)
using_action_prompt = await global_prompt_manager.get_prompt_async(
"action_prompt"
)
using_action_prompt = await global_prompt_manager.get_prompt_async("action_prompt")
action_options_block += using_action_prompt.format(
action_name=action_name,
action_description=action_info.description,
@@ -205,9 +202,7 @@ class ActionPlanner:
)
return action_options_block
def find_message_by_id(
self, message_id: str, message_id_list: list
) -> Optional[Dict[str, Any]]:
def find_message_by_id(self, message_id: str, message_id_list: list) -> Optional[Dict[str, Any]]:
# sourcery skip: use-next
"""
根据message_id从message_id_list中查找对应的原始消息
@@ -245,7 +240,7 @@ class ActionPlanner:
async def plan(
self,
mode: ChatMode = ChatMode.FOCUS,
loop_start_time:float = 0.0,
loop_start_time: float = 0.0,
available_actions: Optional[Dict[str, ActionInfo]] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""
@@ -323,11 +318,15 @@ class ActionPlanner:
# 如果获取的target_message为None输出warning并重新plan
if target_message is None:
self.plan_retry_count += 1
logger.warning(f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}")
logger.warning(
f"{self.log_prefix}无法找到target_message_id '{target_message_id}' 对应的消息,重试次数: {self.plan_retry_count}/{self.max_plan_retries}"
)
# 如果连续三次plan均为None输出error并选取最新消息
if self.plan_retry_count >= self.max_plan_retries:
logger.error(f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败选择最新消息作为target_message")
logger.error(
f"{self.log_prefix}连续{self.max_plan_retries}次plan获取target_message失败选择最新消息作为target_message"
)
target_message = self.get_latest_message(message_id_list)
self.plan_retry_count = 0 # 重置计数器
else:
@@ -338,8 +337,7 @@ class ActionPlanner:
self.plan_retry_count = 0
else:
logger.warning(f"{self.log_prefix}动作'{action}'缺少target_message_id")
if action != "no_reply" and action != "reply" and action not in current_available_actions:
logger.warning(
f"{self.log_prefix}LLM 返回了当前不可用或无效的动作: '{action}' (可用: {list(current_available_actions.keys())}),将强制使用 'no_reply'"
@@ -362,36 +360,35 @@ class ActionPlanner:
is_parallel = False
if mode == ChatMode.NORMAL and action in current_available_actions:
is_parallel = current_available_actions[action].parallel_action
action_data["loop_start_time"] = loop_start_time
actions = []
# 1. 添加Planner取得的动作
actions.append({
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions # 添加这个字段
})
if action != "reply" and is_parallel:
actions.append({
"action_type": "reply",
actions.append(
{
"action_type": action,
"reasoning": reasoning,
"action_data": action_data,
"action_message": target_message,
"available_actions": available_actions
})
return actions,target_message
"available_actions": available_actions, # 添加这个字段
}
)
if action != "reply" and is_parallel:
actions.append(
{"action_type": "reply", "action_message": target_message, "available_actions": available_actions}
)
return actions, target_message
async def build_planner_prompt(
self,
is_group_chat: bool, # Now passed as argument
chat_target_info: Optional[dict], # Now passed as argument
current_available_actions: Dict[str, ActionInfo],
refresh_time :bool = False,
refresh_time: bool = False,
mode: ChatMode = ChatMode.FOCUS,
) -> tuple[str, list]: # sourcery skip: use-join
"""构建 Planner LLM 的提示词 (获取模板并填充数据)"""
@@ -400,21 +397,15 @@ class ActionPlanner:
time_block = f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
bot_name = global_config.bot.nickname
bot_nickname = (
f",也有人叫你{','.join(global_config.bot.alias_names)}"
if global_config.bot.alias_names
else ""
f",也有人叫你{','.join(global_config.bot.alias_names)}" if global_config.bot.alias_names else ""
)
bot_core_personality = global_config.personality.personality_core
identity_block = (
f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
)
identity_block = f"你的名字是{bot_name}{bot_nickname},你{bot_core_personality}"
schedule_block = ""
if global_config.schedule.enable:
if current_activity := schedule_manager.get_current_activity():
schedule_block = (
f"你当前正在:{current_activity},但注意它与群聊的聊天无关。"
)
schedule_block = f"你当前正在:{current_activity},但注意它与群聊的聊天无关。"
mood_block = ""
if global_config.mood.enable_mood:
@@ -424,13 +415,9 @@ class ActionPlanner:
# --- 根据模式构建不同的Prompt ---
if mode == ChatMode.PROACTIVE:
long_term_memory_block = await self._get_long_term_memory_context()
action_options_text = await self._build_action_options(
current_available_actions, mode
)
action_options_text = await self._build_action_options(current_available_actions, mode)
prompt_template = await global_prompt_manager.get_prompt_async(
"proactive_planner_prompt"
)
prompt_template = await global_prompt_manager.get_prompt_async("proactive_planner_prompt")
prompt = prompt_template.format(
time_block=time_block,
identity_block=identity_block,
@@ -463,12 +450,8 @@ class ActionPlanner:
limit=5,
)
actions_before_now_block = build_readable_actions(
actions=actions_before_now
)
actions_before_now_block = (
f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
)
actions_before_now_block = build_readable_actions(actions=actions_before_now)
actions_before_now_block = f"你刚刚选择并执行过的action是\n{actions_before_now_block}"
if refresh_time:
self.last_obs_time_mark = time.time()
@@ -504,30 +487,22 @@ class ActionPlanner:
}}"""
chat_context_description = "你现在正在一个群聊中"
chat_target_name = None
chat_target_name = None
if not is_group_chat and chat_target_info:
chat_target_name = (
chat_target_info.get("person_name")
or chat_target_info.get("user_nickname")
or "对方"
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or "对方"
)
chat_context_description = f"你正在和 {chat_target_name} 私聊"
action_options_block = await self._build_action_options(
current_available_actions, mode
)
action_options_block = await self._build_action_options(current_available_actions, mode)
moderation_prompt_block = "请不要输出违法违规内容,不要输出色情,暴力,政治相关内容,如有敏感内容,请规避。"
custom_prompt_block = ""
if global_config.custom_prompt.planner_custom_prompt_content:
custom_prompt_block = (
global_config.custom_prompt.planner_custom_prompt_content
)
custom_prompt_block = global_config.custom_prompt.planner_custom_prompt_content
planner_prompt_template = await global_prompt_manager.get_prompt_async(
"planner_prompt"
)
planner_prompt_template = await global_prompt_manager.get_prompt_async("planner_prompt")
prompt = planner_prompt_template.format(
schedule_block=schedule_block,
mood_block=mood_block,
@@ -555,9 +530,7 @@ class ActionPlanner:
"""
is_group_chat = True
is_group_chat, chat_target_info = get_chat_type_and_target_info(self.chat_id)
logger.debug(
f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}"
)
logger.debug(f"{self.log_prefix}获取到聊天信息 - 群聊: {is_group_chat}, 目标信息: {chat_target_info}")
current_available_actions_dict = self.action_manager.get_using_actions()
@@ -568,13 +541,9 @@ class ActionPlanner:
current_available_actions = {}
for action_name in current_available_actions_dict:
if action_name in all_registered_actions:
current_available_actions[action_name] = all_registered_actions[
action_name
]
current_available_actions[action_name] = all_registered_actions[action_name]
else:
logger.warning(
f"{self.log_prefix}使用中的动作 {action_name} 未在已注册动作中找到"
)
logger.warning(f"{self.log_prefix}使用中的动作 {action_name} 未在已注册动作中找到")
# 将no_reply作为系统级特殊动作添加到可用动作中
# no_reply虽然是系统级决策但需要让规划器认为它是可用的

View File

@@ -15,7 +15,6 @@ from src.chat.utils.prompt_utils import PromptUtils
from src.mais4u.mai_think import mai_thinking_manager
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.config.api_ada_configs import TaskConfig
from src.individuality.individuality import get_individuality
from src.llm_models.utils_model import LLMRequest
from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending
@@ -706,16 +705,16 @@ class DefaultReplyer:
# 检查最新五条消息中是否包含bot自己说的消息
latest_5_messages = core_dialogue_list[-5:] if len(core_dialogue_list) >= 5 else core_dialogue_list
has_bot_message = any(str(msg.get("user_id")) == bot_id for msg in latest_5_messages)
# logger.info(f"最新五条消息:{latest_5_messages}")
# logger.info(f"最新五条消息中是否包含bot自己说的消息{has_bot_message}")
# 如果最新五条消息中不包含bot的消息则返回空字符串
if not has_bot_message:
core_dialogue_prompt = ""
else:
core_dialogue_list = core_dialogue_list[-int(global_config.chat.max_context_size * 2) :] # 限制消息数量
core_dialogue_prompt_str = build_readable_messages(
core_dialogue_list,
replace_bot_name=True,
@@ -819,7 +818,7 @@ class DefaultReplyer:
mood_prompt = ""
if reply_to:
#兼容旧的reply_to
# 兼容旧的reply_to
sender, target = self._parse_reply_target(reply_to)
else:
# 获取 platform如果不存在则从 chat_stream 获取,如果还是 None 则使用默认值
@@ -830,7 +829,7 @@ class DefaultReplyer:
)
person_name = await person_info_manager.get_value(person_id, "person_name")
sender = person_name
target = reply_message.get('processed_plain_text')
target = reply_message.get("processed_plain_text")
person_info_manager = get_person_info_manager()
person_id = person_info_manager.get_person_id_by_person_name(sender)
@@ -1021,7 +1020,7 @@ class DefaultReplyer:
chat_stream = self.chat_stream
chat_id = chat_stream.stream_id
is_group_chat = bool(chat_stream.group_info)
if reply_message:
sender = reply_message.get("sender")
target = reply_message.get("target")
@@ -1178,7 +1177,9 @@ class DefaultReplyer:
else:
logger.debug(f"\n{prompt}\n")
content, (reasoning_content, model_name, tool_calls) = await self.express_model.generate_response_async(prompt)
content, (reasoning_content, model_name, tool_calls) = await self.express_model.generate_response_async(
prompt
)
logger.debug(f"replyer生成内容: {content}")
return content, reasoning_content, model_name, tool_calls

View File

@@ -1,7 +1,6 @@
from typing import Dict, Optional, List, Tuple
from typing import Dict, Optional
from src.common.logger import get_logger
from src.config.api_ada_configs import TaskConfig
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.replyer.default_generator import DefaultReplyer

View File

@@ -1250,7 +1250,7 @@ async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]:
# 检查必要信息是否存在 且 不是机器人自己
if not all([platform, user_id]) or user_id == global_config.bot.qq_account:
continue
# 添加空值检查,防止 platform 为 None 时出错
if platform is None:
platform = "unknown"

View File

@@ -4,17 +4,13 @@
"""
import re
import time
from typing import Dict, Any, Optional, Tuple
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.utils.chat_message_builder import (
get_raw_msg_before_timestamp_with_chat,
build_readable_messages_with_id,
)
from src.chat.message_receive.chat_stream import get_chat_manager
from src.person_info.person_info import get_person_info_manager
from src.plugin_system.apis import cross_context_api
logger = get_logger("prompt_utils")
@@ -88,108 +84,24 @@ class PromptUtils:
) -> str:
"""
构建跨群聊上下文 - 统一实现完全继承DefaultReplyer功能
Args:
chat_id: 当前聊天ID
target_user_info: 目标用户信息
current_prompt_mode: 当前提示模式
Returns:
str: 跨群上下文块
"""
if not global_config.cross_context.enable:
return ""
# 找到当前群聊所在的共享组
target_group = None
current_stream = get_chat_manager().get_stream(chat_id)
if not current_stream or not current_stream.group_info:
other_chat_raw_ids = cross_context_api.get_context_groups(chat_id)
if not other_chat_raw_ids:
return ""
try:
current_chat_raw_id = current_stream.group_info.group_id
except Exception as e:
logger.error(f"获取群聊ID失败: {e}")
chat_stream = get_chat_manager().get_stream(chat_id)
if not chat_stream:
return ""
for group in global_config.cross_context.groups:
if str(current_chat_raw_id) in group.chat_ids:
target_group = group
break
if not target_group:
return ""
# 根据prompt_mode选择策略
other_chat_raw_ids = [chat_id for chat_id in target_group.chat_ids if chat_id != str(current_chat_raw_id)]
cross_context_messages = []
if current_prompt_mode == "normal":
# normal模式获取其他群聊的最近N条消息
for chat_raw_id in other_chat_raw_ids:
stream_id = get_chat_manager().get_stream_id(current_stream.platform, chat_raw_id, is_group=True)
if not stream_id:
continue
try:
messages = get_raw_msg_before_timestamp_with_chat(
chat_id=stream_id,
timestamp=time.time(),
limit=5, # 可配置
)
if messages:
chat_name = get_chat_manager().get_stream_name(stream_id) or stream_id
formatted_messages, _ = build_readable_messages_with_id(messages, timestamp_mode="relative")
cross_context_messages.append(f'[以下是来自"{chat_name}"的近期消息]\n{formatted_messages}')
except Exception as e:
logger.error(f"获取群聊{chat_raw_id}的消息失败: {e}")
continue
return await cross_context_api.build_cross_context_normal(chat_stream, other_chat_raw_ids)
elif current_prompt_mode == "s4u":
# s4u模式获取当前发言用户在其他群聊的消息
if target_user_info:
user_id = target_user_info.get("user_id")
return await cross_context_api.build_cross_context_s4u(chat_stream, other_chat_raw_ids, target_user_info)
if user_id:
for chat_raw_id in other_chat_raw_ids:
stream_id = get_chat_manager().get_stream_id(
current_stream.platform, chat_raw_id, is_group=True
)
if not stream_id:
continue
try:
messages = get_raw_msg_before_timestamp_with_chat(
chat_id=stream_id,
timestamp=time.time(),
limit=20, # 获取更多消息以供筛选
)
user_messages = [msg for msg in messages if msg.get("user_id") == user_id][
-5:
] # 筛选并取最近5条
if user_messages:
chat_name = get_chat_manager().get_stream_name(stream_id) or stream_id
user_name = (
target_user_info.get("person_name")
or target_user_info.get("user_nickname")
or user_id
)
formatted_messages, _ = build_readable_messages_with_id(
user_messages, timestamp_mode="relative"
)
cross_context_messages.append(
f'[以下是"{user_name}""{chat_name}"的近期发言]\n{formatted_messages}'
)
except Exception as e:
logger.error(f"获取用户{user_id}在群聊{chat_raw_id}的消息失败: {e}")
continue
if not cross_context_messages:
return ""
return "# 跨群上下文参考\n" + "\n\n".join(cross_context_messages) + "\n"
return ""
@staticmethod
def parse_reply_target_id(reply_to: str) -> str:

View File

@@ -4,7 +4,6 @@ import time
import hashlib
import uuid
import io
import asyncio
import numpy as np
from typing import Optional, Tuple, Dict, Any
@@ -27,16 +26,15 @@ logger = get_logger("chat_image")
def is_image_message(message: Dict[str, Any]) -> bool:
"""
判断消息是否为图片消息
Args:
message: 消息字典
Returns:
bool: 是否为图片消息
"""
return message.get("type") == "image" or (
isinstance(message.get("content"), dict) and
message["content"].get("type") == "image"
isinstance(message.get("content"), dict) and message["content"].get("type") == "image"
)
@@ -596,7 +594,6 @@ class ImageManager:
return "", "[图片]"
# 创建全局单例
image_manager = None