@@ -3,6 +3,8 @@ import time
import traceback
from random import random
from typing import List , Optional , Dict , Any # 导入类型提示
import os
import pickle
from maim_message import UserInfo , Seg
from src . common . logger import get_logger
from src . chat . heart_flow . utils_chat import get_chat_type_and_target_info
@@ -22,13 +24,21 @@ from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionMod
from src . chat . normal_chat . normal_chat_expressor import NormalChatExpressor
from src . chat . focus_chat . replyer . default_replyer import DefaultReplyer
from src . person_info . person_info import PersonInfoManager
from src . chat . utils . chat_message_builder import get_raw_msg_by_timestamp_with_chat
from src . chat . utils . chat_message_builder import get_raw_msg_by_timestamp_with_chat , get_raw_msg_by_timestamp_with_chat_inclusive , get_raw_msg_before_timestamp_with_chat , num_new_messages_since
from src . person_info . relationship_manager import get_relationship_manager
willing_manager = get_willing_manager ( )
logger = get_logger ( " normal_chat " )
# Retention policy for cached per-user message segments.
SEGMENT_CLEANUP_CONFIG = dict(
    enable_cleanup=True,  # master switch: when False no cleanup is performed
    max_segment_age_days=7,  # segments whose end_time is older than this are dropped
    max_segments_per_user=10,  # upper bound on segments retained per user
    cleanup_interval_hours=1,  # minimum time between two cleanup passes
)
class NormalChat :
def __init__ ( self , chat_stream : ChatStream , interest_dict : dict = None , on_switch_to_focus_callback = None ) :
@@ -67,16 +77,307 @@ class NormalChat:
self . recent_replies = [ ]
self . max_replies_history = 20 # 最多保存最近20条回复记录
# 添加engaging_person统计
self . engaging_persons = { } # person_id -> {first_time, last_time, receive_count, reply_count, relation_built }
# 新的消息段缓存结构:
# { person_id: [{"start_time": float, "end_time": float, "last_msg_time": float, "message_count": int}, ...] }
self . person_engaged_cache : Dict [ str , List [ Dict [ str , any ] ] ] = { }
# 持久化存储文件路径
self . cache_file_path = os . path . join ( " data " , f " relationship_cache_ { self . stream_id } .pkl " )
# 最后处理的消息时间,避免重复处理相同消息
self . last_processed_message_time = 0.0
# 最后清理时间,用于定期清理老消息段
self . last_cleanup_time = 0.0
# 添加回调函数, 用于在满足条件时通知切换到focus_chat模式
self . on_switch_to_focus_callback = on_switch_to_focus_callback
self . _disabled = False # 增加停用标志
# 加载持久化的缓存
self . _load_cache ( )
logger . debug ( f " [ { self . stream_name } ] NormalChat 初始化完成 (异步部分)。 " )
# ================================
# 缓存管理模块
# 负责持久化存储、状态管理、缓存读写
# ================================
def _load_cache(self):
    """Load the persisted relationship cache from ``self.cache_file_path``.

    On success ``person_engaged_cache``, ``last_processed_message_time`` and
    ``last_cleanup_time`` are replaced by the stored values; keys missing from
    the stored payload fall back to ``{}`` / ``0.0``. When the file does not
    exist the attributes are left as they are. When reading or decoding fails
    the error is logged and all three attributes are reset to their empty
    defaults, so a broken file can never leave a half-loaded state behind.
    """
    if not os.path.exists(self.cache_file_path):
        logger.info(f"[{self.stream_name}] 关系缓存文件不存在,使用空缓存")
        return

    try:
        with open(self.cache_file_path, "rb") as f:
            # SECURITY NOTE(review): pickle.load can execute arbitrary code from
            # the file. This is only acceptable while the cache directory is
            # writable by this process alone; consider a JSON format otherwise.
            cache_data = pickle.load(f)

        # Stage everything in locals first; the attributes are only touched
        # once the whole payload (including the log label) has been validated.
        person_engaged_cache = cache_data.get("person_engaged_cache", {})
        last_processed = cache_data.get("last_processed_message_time", 0.0)
        last_cleanup = cache_data.get("last_cleanup_time", 0.0)
        user_count = len(person_engaged_cache)
        if last_processed > 0:
            last_processed_label = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_processed))
        else:
            last_processed_label = "未设置"
    except Exception as e:
        logger.error(f"[{self.stream_name}] 加载关系缓存失败: {e}")
        self.person_engaged_cache = {}
        self.last_processed_message_time = 0.0
        # Previously left untouched on failure; reset it with the other fields.
        self.last_cleanup_time = 0.0
        return

    self.person_engaged_cache = person_engaged_cache
    self.last_processed_message_time = last_processed
    self.last_cleanup_time = last_cleanup
    logger.info(
        f"[{self.stream_name}] 成功加载关系缓存,包含 {user_count} 个用户,最后处理时间: {last_processed_label}"
    )
def _save_cache(self):
    """Persist the relationship cache to ``self.cache_file_path``.

    The payload is a dict holding ``person_engaged_cache``,
    ``last_processed_message_time`` and ``last_cleanup_time``. It is first
    written to a sibling ``.tmp`` file and then moved over the real file with
    ``os.replace``, so an interrupted write cannot leave a truncated cache
    behind. Failures are logged and swallowed (best-effort persistence).
    """
    temp_path = f"{self.cache_file_path}.tmp"
    try:
        cache_dir = os.path.dirname(self.cache_file_path)
        if cache_dir:  # os.makedirs("") raises, so skip it for bare file names
            os.makedirs(cache_dir, exist_ok=True)

        cache_data = {
            "person_engaged_cache": self.person_engaged_cache,
            "last_processed_message_time": self.last_processed_message_time,
            "last_cleanup_time": self.last_cleanup_time,
        }
        with open(temp_path, "wb") as f:
            pickle.dump(cache_data, f)
        # Swap the finished file into place in one step.
        os.replace(temp_path, self.cache_file_path)
        logger.debug(f"[{self.stream_name}] 成功保存关系缓存")
    except Exception as e:
        logger.error(f"[{self.stream_name}] 保存关系缓存失败: {e}")
        # Do not leave a partial temp file lying around.
        try:
            os.remove(temp_path)
        except OSError:
            pass
# ================================
# 消息段管理模块
# 负责跟踪用户消息活动、管理消息段、清理过期数据
# ================================
def _update_message_segments(self, person_id: str, message_time: float):
    """Record a message from ``person_id`` in that user's segment list.

    A segment is a dict with ``start_time``, ``end_time``, ``last_msg_time``
    and ``message_count``. The newest segment is extended when at most 10
    messages lie between its ``last_msg_time`` and ``message_time``; otherwise
    it is closed and a new segment is appended. The cache is saved afterwards.

    Args:
        person_id: Identifier of the user who sent the message.
        message_time: Timestamp of the message.
    """
    segments = self.person_engaged_cache.setdefault(person_id, [])
    current_time = time.time()

    # The oldest of the (up to) 5 messages preceding this one is the candidate
    # start of a new segment; with no earlier messages the segment starts here.
    before_messages = get_raw_msg_before_timestamp_with_chat(self.stream_id, message_time, limit=5)
    potential_start_time = before_messages[0]["time"] if before_messages else message_time

    # First segment for this user.
    if not segments:
        initial_count = self._count_messages_in_timerange(potential_start_time, message_time)
        new_segment = {
            "start_time": potential_start_time,
            "end_time": message_time,
            "last_msg_time": message_time,
            "message_count": initial_count,
        }
        segments.append(new_segment)
        start_label = time.strftime("%H:%M:%S", time.localtime(potential_start_time))
        end_label = time.strftime("%H:%M:%S", time.localtime(message_time))
        logger.info(
            f"[{self.stream_name}] 为用户 {person_id} 创建新消息段: 时间范围 {start_label} - {end_label}, 消息数: {new_segment['message_count']}"
        )
        self._save_cache()
        return

    last_segment = segments[-1]
    # Number of messages between the user's previous message and this one.
    messages_between = self._count_messages_between(last_segment["last_msg_time"], message_time)

    if messages_between > 10:
        # Gap too large: close the previous segment at the 5th message that
        # followed its last message (when that many exist), then start anew.
        after_messages = get_raw_msg_by_timestamp_with_chat(
            self.stream_id, last_segment["last_msg_time"], current_time, limit=5, limit_mode="earliest"
        )
        if after_messages and len(after_messages) >= 5:
            last_segment["end_time"] = after_messages[4]["time"]
        # Otherwise the previous end_time is kept unchanged.

        last_segment["message_count"] = self._count_messages_in_timerange(
            last_segment["start_time"], last_segment["end_time"]
        )

        fresh_count = self._count_messages_in_timerange(potential_start_time, message_time)
        new_segment = {
            "start_time": potential_start_time,
            "end_time": message_time,
            "last_msg_time": message_time,
            "message_count": fresh_count,
        }
        segments.append(new_segment)
        logger.info(f"[{self.stream_name}] 为用户 {person_id} 创建新消息段(超过10条消息间隔): {new_segment}")
    else:
        # Within 10 messages: stretch the current segment up to this message
        # and recount everything it now covers.
        last_segment["end_time"] = message_time
        last_segment["last_msg_time"] = message_time
        last_segment["message_count"] = self._count_messages_in_timerange(
            last_segment["start_time"], last_segment["end_time"]
        )
        logger.debug(f"[{self.stream_name}] 延伸用户 {person_id} 的消息段: {last_segment}")

    self._save_cache()
def _count_messages_in_timerange(self, start_time: float, end_time: float) -> int:
    """Return how many messages of this chat fall in ``[start_time, end_time]``.

    Delegates to ``get_raw_msg_by_timestamp_with_chat_inclusive`` for
    ``self.stream_id`` and returns the length of its result.
    """
    return len(get_raw_msg_by_timestamp_with_chat_inclusive(self.stream_id, start_time, end_time))
def _count_messages_between(self, start_time: float, end_time: float) -> int:
    """Return the number of messages between two timestamps, for gap checks.

    Thin wrapper around ``num_new_messages_since`` for ``self.stream_id``.
    NOTE(review): the original docstring states both boundaries are excluded;
    that depends on ``num_new_messages_since`` - confirm against its definition.
    """
    gap_count = num_new_messages_since(self.stream_id, start_time, end_time)
    return gap_count
def _get_total_message_count(self, person_id: str) -> int:
    """Sum ``message_count`` over every cached segment of ``person_id``.

    Returns 0 when the user has no entry in ``person_engaged_cache``.
    """
    user_segments = self.person_engaged_cache.get(person_id, ())
    return sum(segment["message_count"] for segment in user_segments)
def _cleanup_old_segments(self) -> bool:
    """Drop expired and surplus message segments from ``person_engaged_cache``.

    Does nothing when cleanup is disabled in ``SEGMENT_CLEANUP_CONFIG`` or when
    less than ``cleanup_interval_hours`` has passed since the last pass. For
    each user, segments whose ``end_time`` is older than
    ``max_segment_age_days`` are removed, then only the newest
    ``max_segments_per_user`` segments are kept. Users left without segments
    are removed from the cache. The cache is saved only if something changed.

    Returns:
        bool: True if at least one segment or user was removed.
    """
    if not SEGMENT_CLEANUP_CONFIG["enable_cleanup"]:
        return False

    current_time = time.time()

    # Rate-limit cleanup passes.
    cleanup_interval_seconds = SEGMENT_CLEANUP_CONFIG["cleanup_interval_hours"] * 3600
    if current_time - self.last_cleanup_time < cleanup_interval_seconds:
        return False

    logger.info(f"[{self.stream_name}] 开始执行老消息段清理...")

    max_age_seconds = SEGMENT_CLEANUP_CONFIG["max_segment_age_days"] * 24 * 3600
    max_segments_per_user = SEGMENT_CLEANUP_CONFIG["max_segments_per_user"]

    users_cleaned = 0
    segments_removed = 0
    total_segments_before = 0
    total_segments_after = 0
    users_to_remove = []

    # Iterate over a snapshot; entries are reassigned inside the loop.
    for person_id, segments in list(self.person_engaged_cache.items()):
        original_segment_count = len(segments)
        total_segments_before += original_segment_count

        # 1. Age-based cleanup: keep only segments that have not expired.
        kept_segments = []
        for segment in segments:
            if current_time - segment["end_time"] <= max_age_seconds:
                kept_segments.append(segment)
                continue
            segments_removed += 1
            start_label = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(segment["start_time"]))
            end_label = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(segment["end_time"]))
            logger.debug(f"[{self.stream_name}] 移除用户 {person_id} 的过期消息段: {start_label} - {end_label}")

        # 2. Count-based cleanup: keep the newest segments.
        if len(kept_segments) > max_segments_per_user:
            excess = len(kept_segments) - max_segments_per_user
            # Sort oldest-first and drop the head so the stored list stays in
            # chronological order. Other methods treat segments[-1] as the most
            # recent segment; the previous newest-first ordering broke that.
            kept_segments.sort(key=lambda seg: seg["end_time"])
            kept_segments = kept_segments[excess:]
            segments_removed += excess
            logger.debug(f"[{self.stream_name}] 用户 {person_id} 消息段数量过多,移除 {excess} 个最老的消息段")

        if not kept_segments:
            # Nothing left for this user: remove the entry after the loop.
            users_to_remove.append(person_id)
            continue

        self.person_engaged_cache[person_id] = kept_segments
        total_segments_after += len(kept_segments)
        if original_segment_count != len(kept_segments):
            users_cleaned += 1

    for person_id in users_to_remove:
        del self.person_engaged_cache[person_id]
        logger.debug(f"[{self.stream_name}] 移除用户 {person_id}:没有剩余消息段")

    self.last_cleanup_time = current_time

    changed = segments_removed > 0 or len(users_to_remove) > 0
    if changed:
        self._save_cache()
        logger.info(
            f"[{self.stream_name}] 清理完成 - 影响用户: {users_cleaned}, 移除消息段: {segments_removed}, 移除用户: {len(users_to_remove)}"
        )
        logger.info(
            f"[{self.stream_name}] 消息段统计 - 清理前: {total_segments_before}, 清理后: {total_segments_after}"
        )
    else:
        logger.debug(f"[{self.stream_name}] 清理完成 - 无需清理任何内容")

    return changed
def get_cache_status(self) -> str:
    """Build a multi-line, human-readable report of the relationship cache.

    Returns a single line saying the cache is empty when
    ``person_engaged_cache`` holds no users. Otherwise the report lists the
    last processed / last cleanup times, the user count, the cleanup
    configuration and, per user, the total message count and every segment.
    """
    if not self.person_engaged_cache:
        return f"[{self.stream_name}] 关系缓存为空"

    def format_ts(timestamp: float) -> str:
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))

    if self.last_processed_message_time > 0:
        processed_label = format_ts(self.last_processed_message_time)
    else:
        processed_label = "未设置"
    if self.last_cleanup_time > 0:
        cleanup_label = format_ts(self.last_cleanup_time)
    else:
        cleanup_label = "未执行"
    cleanup_state = "启用" if SEGMENT_CLEANUP_CONFIG["enable_cleanup"] else "禁用"

    report = [
        f"[{self.stream_name}] 关系缓存状态:",
        f"最后处理消息时间: {processed_label}",
        f"最后清理时间: {cleanup_label}",
        f"总用户数: {len(self.person_engaged_cache)}",
        f"清理配置: {cleanup_state} (最大保存 {SEGMENT_CLEANUP_CONFIG['max_segment_age_days']} 天, 每用户最多 {SEGMENT_CLEANUP_CONFIG['max_segments_per_user']} 段)",
        "",
    ]

    for person_id, segments in self.person_engaged_cache.items():
        total_count = self._get_total_message_count(person_id)
        report.append(f"用户 {person_id}:")
        report.append(f"总消息数: {total_count} ({total_count}/45)")
        report.append(f"消息段数: {len(segments)}")

        for index, segment in enumerate(segments, start=1):
            start_str = format_ts(segment["start_time"])
            end_str = format_ts(segment["end_time"])
            last_str = format_ts(segment["last_msg_time"])
            report.append(
                f"段 {index}: {start_str} -> {end_str} (最后消息: {last_str}, 消息数: {segment['message_count']})"
            )
        # Blank line separates users.
        report.append("")

    return "\n".join(report)
def _update_user_message_segments(self, message: MessageRecv):
    """Feed one received message into the per-user segment cache.

    Messages sent by the bot itself and messages whose timestamp is not newer
    than ``last_processed_message_time`` are ignored. Otherwise the sender's
    segments are updated and ``last_processed_message_time`` is advanced.

    Args:
        message: The received message; its ``message_info`` supplies the
            platform, the sender's user id and the message time.
    """
    user_id = message.message_info.user_info.user_id
    platform = message.message_info.platform
    msg_time = message.message_info.time

    # Skip the bot's own messages.
    # NOTE(review): plain equality - assumes user_id and qq_account share a type; confirm.
    if user_id == global_config.bot.qq_account:
        return

    # Only handle messages newer than the last one processed (avoids duplicates).
    if msg_time <= self.last_processed_message_time:
        return

    person_id = PersonInfoManager.get_person_id(platform, user_id)
    self._update_message_segments(person_id, msg_time)

    # NOTE(review): the cache is saved inside _update_message_segments, i.e.
    # before this assignment, so the persisted timestamp lags by one message.
    self.last_processed_message_time = max(self.last_processed_message_time, msg_time)
    msg_time_label = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg_time))
    logger.debug(f"[{self.stream_name}] 更新用户 {person_id} 的消息段,消息时间: {msg_time_label}")
# 改为实例方法
async def _create_thinking_message ( self , message : MessageRecv , timestamp : Optional [ float ] = None ) - > str :
""" 创建思考消息 """
@@ -282,8 +583,11 @@ class NormalChat:
logger . info ( f " [ { self . stream_name } ] 已停用,忽略 normal_response。 " )
return
# 更新engaging_persons统计信息
self . _update_engaging_person_stats ( message , is_reply = False )
# 执行定期清理
self . _cleanup_old_segments ( )
# 更新消息段信息
self . _update_user_message_segments ( message )
# 检查是否有用户满足关系构建条件
asyncio . create_task ( self . _check_relation_building_conditions ( ) )
@@ -477,8 +781,7 @@ class NormalChat:
# 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况)
if first_bot_msg :
# 更新engaging_persons统计信息 - 标记为回复
self . _update_engaging_person_stats ( message , is_reply = True )
# 消息段已在接收消息时更新,这里不需要额外处理
# 记录回复信息到最近回复列表中
reply_info = {
@@ -769,200 +1072,91 @@ class NormalChat:
""" 获取动作管理器实例 """
return self . action_manager
def _update_engaging_person_stats(self, message: MessageRecv, is_reply: bool):
    """Update the ``engaging_persons`` counters for the sender of ``message``.

    A record is created on first sight of the person. When ``is_reply`` is
    true only ``reply_count`` is incremented; otherwise ``receive_count`` is
    incremented and ``last_time`` is set to the current time.

    Args:
        message: Message whose platform and user id identify the person.
        is_reply: True when the bot replied to this message, False when the
            message was merely received.
    """
    info = message.message_info
    # person_id is derived from the platform and the platform-specific user id.
    person_id = PersonInfoManager.get_person_id(info.platform, info.user_info.user_id)
    now = time.time()

    stats = self.engaging_persons.setdefault(
        person_id,
        {
            "first_time": now,
            "last_time": now,
            "receive_count": 0,
            "reply_count": 0,
            "relation_built": False,
        },
    )

    if not is_reply:
        stats["receive_count"] += 1
        stats["last_time"] = now
        logger.debug(f"[{self.stream_name}] 用户 {person_id} 消息次数更新: {stats['receive_count']}")
        return

    stats["reply_count"] += 1
    logger.debug(f"[{self.stream_name}] 用户 {person_id} 回复次数更新: {stats['reply_count']}")
async def _check_relation_building_conditions ( self ) :
""" 检查engaging_persons 中是否有满足关系构建条件的用户 """
current_time = time . time ( )
""" 检查person_engaged_cache 中是否有满足关系构建条件的用户 """
users_to_build_relationship = [ ]
for person_id , sta ts in list ( self . engaging_persons . items ( ) ) :
# 计算时间差和消息数量
time_elapsed = current_time - stats [ " first_time " ]
total_messages = self . _get_total_messages_in_timerange ( stats [ " first_time " ] , stats [ " last_time " ] )
# print(f"person_id: {person_id}, total_messages: {total_messages}, time_elapsed: {time_elapsed}")
# 检查是否满足关系构建条件
should_build_relation = (
total_messages > = 30 # 30条消息必定满足
or ( total_messages > = 15 and time_elapsed > = 600 ) # 15条且10分钟
or ( total_messages > = 10 and time_elapsed > = 900 ) # 10条且15分钟
or ( total_messages > = 5 and time_elapsed > = 1800 ) # 5条且30
)
if should_build_relation :
for person_id , segmen ts in list ( self . person_engaged_cache . items ( ) ) :
total_message_count = self . _get_total_message_count ( person_id )
if total_message_count > = 45 :
users_to_build_relationship . append ( person_id )
logger . info (
f " [ { self . stream_name } ] 用户 { person_id } 满足关系构建条件。 "
f " 消息数: { total_messages } ,时长: { time_elapsed : .0f } 秒, "
f " 收到消息: { stats [ ' receive_count ' ] } ,回复次数: { stats [ ' reply_count ' ] } "
f " [ { self . stream_name } ] 用户 { person_id } 满足关系构建条件,总消息数: { total_message_count } ,消息段数: { len ( segments ) } "
)
elif total_message_count > 0 :
# 记录进度信息
logger . debug (
f " [ { self . stream_name } ] 用户 { person_id } 进度: { total_message_count } /45 条消息, { len ( segments ) } 个消息段 "
)
# 计算构建概率并决定是否构建
await self . _evaluate_and_build_relation ( person_id , stats , total_messages )
# 为满足条件的用户构建关系
for person_id in users_to_build_relationship :
segments = self . person_engaged_cache [ person_id ]
# 异步执行关系构建
asyncio . create_task (
self . _build_relation_for_person_segments ( person_id , segments )
)
# 移除已处理的用户缓存
del self . person_engaged_cache [ person_id ]
self . _save_cache ( )
logger . info ( f " [ { self . stream_name } ] 用户 { person_id } 关系构建已启动,缓存已清理 " )
# 评估完成后移除该用户,重新开始统计
del self . engaging_persons [ person_id ]
logger . info ( f " [ { self . stream_name } ] 用户 { person_id } 评估完成,已移除记录,将重新开始统计 " )
def _get_total_messages_in_timerange ( self , start_time : float , end_time : float ) - > int :
""" 获取指定时间范围内的总消息数量 """
async def _build_relation_for_person_segments ( self , person_id : str , segments : List [ Dict [ str , any ] ] ) :
""" 基于消息段为特定用户构建关系 """
logger . info ( f " [ { self . stream_name } ] 开始为 { person_id } 基于 { len ( segments ) } 个消息段更新印象 " )
try :
messages = get_raw_msg_by_timestamp_with_chat ( self . stream_id , start_time , end_time )
return len ( messages ) if messages else 0
except Exception as e :
logger . error ( f " [ { self . stream_name } ] 获取时间范围内消息数量失败: { e } " )
return 0
processed_messages = [ ]
async def _evaluate_and_build_relation ( self , person_id : str , stats : dict , total_messages : i nt) :
""" 评估并执行关系构建 """
import math
for i , segment in enumerate ( segme nts ) :
start_time = segment [ " start_time " ]
end_time = segment [ " end_time " ]
message_count = segment [ " message_count " ]
start_date = time . strftime ( ' % Y- % m- %d % H: % M ' , time . localtime ( start_time ) )
receive_count = stats [ " receive_count " ]
reply_count = stats [ " reply_count " ]
# 获取该段的消息(包含边界)
segment_messages = get_raw_msg_by_timestamp_with_chat_inclusive ( self . stream_id , start_time , end_time )
logger . info ( f " [ { self . stream_name } ] 消息段 { i + 1 } : { start_date } - { time . strftime ( ' % Y- % m- %d % H: % M ' , time . localtime ( end_time ) ) } , 消息数: { len ( segment_messages ) } " )
# 计算回复概率( reply_count在总消息中的比值)
reply_ratio = reply_count / total_messages if total_messages > 0 else 0
# 使用对数函数让低比率时概率上升更快: log(1 + ratio * k) / log(1 + k) + base
# k=7时, 0.05比率对应约0.4概率, 0.1比率对应约0.6概率, 0.2比率对应约0.8概率
k_reply = 10 * global_config . relationship . relation_frequency
base_reply_prob = 0.1 # 基础概率10%
reply_build_probability = (
( math . log ( 1 + reply_ratio * k_reply ) / math . log ( 1 + k_reply ) ) * 0.9 + base_reply_prob
if reply_ratio > 0
else base_reply_prob
)
if segment_messages :
# 如果不是第一个消息段,在消息列表前添加间隔标识
if i > 0 :
# 创建一个特殊的间隔消息
gap_message = {
" time " : start_time - 0.1 , # 稍微早于段开始时间
" user_id " : " system " ,
" user_platform " : " system " ,
" user_nickname " : " 系统 " ,
" user_cardname " : " " ,
" display_message " : f " ...(中间省略一些消息) { start_date } 之后的消息如下... " ,
" is_action_record " : True ,
" chat_info_platform " : segment_messages [ 0 ] . get ( " chat_info_platform " , " " ) ,
" chat_id " : self . stream_id
}
processed_messages . append ( gap_message )
# 计算接收概率( receive_count的影响)
receive_ratio = receive_count / total_messages if total_messages > 0 else 0
# 接收概率使用更温和的对数曲线, 最大0.5, 基础0.08
k_receive = 10 * global_config . relationship . relation_frequency
base_receive_prob = 0.08 # 基础概率8%
receive_build_probability = (
( math . log ( 1 + receive_ratio * k_receive ) / math . log ( 1 + k_receive ) ) * 0.42 + base_receive_prob
if receive_ratio > 0
else base_receive_prob
)
# 添加该段的所有消息
processed_messages . extend ( segment_messages )
# 取最高概率
final_probability = max ( reply_build_probability , receive_build_probability )
if processed_messages :
# 按时间排序所有消息(包括间隔标识)
processed_messages . sort ( key = lambda x : x [ ' time ' ] )
logger . info (
f " [ { self . stream_name } ] 用户 { person_id } 关系构建概率评估: "
f " 回复比例: { reply_ratio : .2f } (对数概率: { reply_build_probability : .2f } ) "
f " ,接收比例: { receive_ratio : .2f } (对数概率: { receive_build_probability : .2f } ) "
f " ,最终概率: { final_probability : .2f } "
)
# 使用随机数决定是否构建关系
if random ( ) < final_probability :
logger . info ( f " [ { self . stream_name } ] 决定为用户 { person_id } 构建关系 " )
await self . _build_relation_for_person ( person_id , stats )
else :
logger . info ( f " [ { self . stream_name } ] 用户 { person_id } 未通过关系构建概率判定 " )
async def _build_relation_for_person ( self , person_id : str , stats : dict ) :
""" 为特定用户构建关系 """
try :
start_time = stats [ " first_time " ]
end_time = stats [ " last_time " ]
# 获取该时间段的所有消息用于关系构建
main_messages = get_raw_msg_by_timestamp_with_chat ( self . stream_id , start_time , end_time )
if not main_messages :
logger . warning ( f " [ { self . stream_name } ] 未找到用户 { person_id } 的消息,关系构建跳过 " )
return
# 获取第一条消息的时间戳, 然后获取之前的5条消息
first_message_time = main_messages [ 0 ] [ " time " ]
before_messages = self . _get_messages_before_timestamp ( first_message_time , 5 )
# 获取最后一条消息的时间戳, 然后获取之后的5条消息
last_message_time = main_messages [ - 1 ] [ " time " ]
after_messages = self . _get_messages_after_timestamp ( last_message_time , 5 )
# 合并所有消息并去重
all_messages = before_messages + main_messages + after_messages
# 根据消息ID去重并按时间排序
seen_ids = set ( )
unique_messages = [ ]
for msg in all_messages :
msg_id = msg [ " message_id " ]
if msg_id not in seen_ids :
seen_ids . add ( msg_id )
unique_messages . append ( msg )
# 按时间排序
unique_messages . sort ( key = lambda x : x [ " time " ] )
logger . info (
f " [ { self . stream_name } ] 为用户 { person_id } 获取到消息用于关系构建: "
f " 原时间段内 { len ( main_messages ) } 条,之前 { len ( before_messages ) } 条, "
f " 之后 { len ( after_messages ) } 条,去重后总计 { len ( unique_messages ) } 条 "
)
# 调用关系管理器更新印象
logger . info ( f " [ { self . stream_name } ] 为 { person_id } 获取到总共 { len ( processed_messages ) } 条消息(包含间隔标识)用于印象更新 " )
relationship_manager = get_relationship_manager ( )
# 调用原有的更新方法
await relationship_manager . update_person_impression (
person_id = person_id , timestamp = end_time , bot_engaged_messages = unique_messages
person_id = person_id ,
timestamp = time . time ( ) ,
bot_engaged_messages = processed_messages
)
logger . info ( f " [ { self . stream_name } ] 用户 { person_id } 关系构建完成 " )
else :
logger . warning ( f " [ { self . stream_name } ] 没有找到 { person_id } 的消息段对应的消息,不更新印象 " )
except Exception as e :
logger . error ( f " [ { self . stream_name } ] 为用户 { person_id } 构建关系时出错 : { e } " )
traceback . prin t_exc( )
def _get_messages_before_timestamp(self, timestamp: float, limit: int = 5) -> List[Dict[str, Any]]:
    """Return up to ``limit`` messages of this chat sent before ``timestamp``.

    The query sorts by time descending so the messages closest to
    ``timestamp`` are selected; the result is returned in ascending time
    order. Any failure is logged and yields an empty list.
    """
    try:
        from src.common.message_repository import find_messages

        newest_first = find_messages(
            message_filter={"chat_id": self.stream_id, "time": {"$lt": timestamp}},
            sort=[("time", -1)],  # descending: take the most recent ones
            limit=limit,
        )
        # Hand the messages back oldest-first.
        chronological = list(newest_first)
        chronological.sort(key=lambda record: record["time"])
        return chronological
    except Exception as e:
        logger.error(f"[{self.stream_name}] 获取时间戳之前的消息失败: {e}")
        return []
def _get_messages_after_timestamp(self, timestamp: float, limit: int = 5) -> List[Dict[str, Any]]:
    """Return up to ``limit`` messages of this chat sent after ``timestamp``.

    The query sorts by time ascending, so the earliest following messages are
    returned as delivered by ``find_messages``. Any failure is logged and
    yields an empty list.
    """
    try:
        from src.common.message_repository import find_messages

        later_than = {"chat_id": self.stream_id, "time": {"$gt": timestamp}}
        ascending = [("time", 1)]  # ascending: take the earliest ones
        following = find_messages(message_filter=later_than, sort=ascending, limit=limit)
    except Exception as e:
        logger.error(f"[{self.stream_name}] 获取时间戳之后的消息失败: {e}")
        return []
    return following
logger . error ( f " [ { self . stream_name } ] 为 { person_id } 更新印象时发生错误 : { e } " )
logger . error ( traceback. forma t_exc( ) )