diff --git a/src/chat/focus_chat/info_processors/relationship_processor.py b/src/chat/focus_chat/info_processors/relationship_processor.py index 95628d4b3..32c3e5747 100644 --- a/src/chat/focus_chat/info_processors/relationship_processor.py +++ b/src/chat/focus_chat/info_processors/relationship_processor.py @@ -123,9 +123,6 @@ class RelationshipProcessor(BaseProcessor): # 获取聊天内容 chat_observe_info = observation.get_observe_info() person_list = observation.person_list - if isinstance(observation, HFCloopObservation): - # hfcloop_observe_info = observation.get_observe_info() - pass nickname_str = "" for nicknames in global_config.bot.alias_names: diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 3ceebc9e2..47b629c63 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -623,15 +623,15 @@ def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal" diff = now - timestamp if diff < 20: - return "刚刚:\n" + return "刚刚" elif diff < 60: - return f"{int(diff)}秒前:\n" + return f"{int(diff)}秒前" elif diff < 3600: - return f"{int(diff / 60)}分钟前:\n" + return f"{int(diff / 60)}分钟前" elif diff < 86400: - return f"{int(diff / 3600)}小时前:\n" + return f"{int(diff / 3600)}小时前" elif diff < 86400 * 2: - return f"{int(diff / 86400)}天前:\n" + return f"{int(diff / 86400)}天前" else: return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":\n" else: # mode = "lite" or unknown diff --git a/src/main.py b/src/main.py index afb42a8a9..564684feb 100644 --- a/src/main.py +++ b/src/main.py @@ -21,6 +21,7 @@ from .common.server import global_server, Server from rich.traceback import install from .chat.focus_chat.expressors.exprssion_learner import expression_learner from .api.main import start_api_server +from .person_info.impression_update_task import impression_update_task install(extra_lines=3) @@ -60,6 +61,9 @@ class MainSystem: # 添加遥测心跳任务 await async_task_manager.add_task(TelemetryHeartBeatTask()) + # 添加印象更新任务 + await async_task_manager.add_task(impression_update_task) + # 启动API服务器 start_api_server() logger.success("API服务器启动成功") diff --git a/src/person_info/impression_update_task.py b/src/person_info/impression_update_task.py new file mode 100644 index 000000000..53f50e303 --- /dev/null +++ b/src/person_info/impression_update_task.py @@ -0,0 +1,147 @@ +from src.manager.async_task_manager import AsyncTask +from src.common.logger_manager import get_logger +from src.person_info.relationship_manager import relationship_manager +from src.chat.message_receive.chat_stream import ChatStream +from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat_users +from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp +from src.config.config import global_config +from src.person_info.person_info import person_info_manager +from src.chat.message_receive.chat_stream import chat_manager +import time +import random +from collections import defaultdict + +logger = get_logger("impression_update_task") + +class ImpressionUpdateTask(AsyncTask): + def __init__(self): + super().__init__( + task_name="impression_update", + wait_before_start=10, # 启动后等待10秒 + run_interval=60 # 每1分钟运行一次 + ) + + async def run(self): + try: + logger.info("开始执行印象更新任务") + + # 获取最近10分钟的消息 + current_time = int(time.time()) + start_time = current_time - 3600 # 10分钟前 + logger.debug(f"获取时间范围: {start_time} -> {current_time}") + + # 获取所有消息 + messages = get_raw_msg_by_timestamp( + timestamp_start=start_time, + timestamp_end=current_time, + limit=200 + ) + + if not messages: + logger.info("没有找到需要处理的消息") + return + + logger.info(f"获取到 {len(messages)} 条消息") + + # 按chat_id分组消息 + chat_messages = defaultdict(list) + for msg in messages: + chat_messages[msg["chat_id"]].append(msg) + + logger.info(f"消息按聊天分组: {len(chat_messages)} 个聊天组") + + # 处理每个聊天组 + for chat_id, msgs in chat_messages.items(): + logger.info(f"处理聊天组 {chat_id}, 消息数: {len(msgs)}") + + # 获取chat_stream + chat_stream = chat_manager.get_stream(chat_id) + if not chat_stream: + logger.warning(f"未找到聊天组 {chat_id} 的chat_stream,跳过处理") + continue + + # 统计用户发言权重 + user_weights = defaultdict(lambda: {"weight": 0, "messages": [], "middle_time": 0}) + + # 找到bot的消息 + bot_messages = [msg for msg in msgs if msg["user_nickname"] == global_config.bot.nickname] + logger.debug(f"找到 {len(bot_messages)} 条bot消息") + + for bot_msg in bot_messages: + # 获取bot消息前后的消息 + bot_time = bot_msg["time"] + context_messages = [msg for msg in msgs if abs(msg["time"] - bot_time) <= 600] # 前后10分钟 + logger.debug(f"Bot消息 {bot_time} 的上下文消息数: {len(context_messages)}") + + # 计算权重 + for msg in context_messages: + if msg["user_nickname"] == global_config.bot.nickname: + continue + + person_id = person_info_manager.get_person_id(msg["chat_info_platform"], msg["user_id"]) + if not person_id: + logger.warning(f"未找到用户 {msg['user_nickname']} 的person_id") + continue + + # 在bot消息附近的发言权重加倍 + if abs(msg["time"] - bot_time) <= 300: # 前后5分钟 + user_weights[person_id]["weight"] += 2 + logger.debug(f"用户 {msg['user_nickname']} 在bot消息附近发言,权重+2") + else: + user_weights[person_id]["weight"] += 1 + logger.debug(f"用户 {msg['user_nickname']} 发言,权重+1") + + user_weights[person_id]["messages"].append(msg) + + # 计算每个用户的中间时间 + for person_id, data in user_weights.items(): + if data["messages"]: + sorted_messages = sorted(data["messages"], key=lambda x: x["time"]) + middle_index = len(sorted_messages) // 2 + data["middle_time"] = sorted_messages[middle_index]["time"] + logger.debug(f"用户 {sorted_messages[0]['user_nickname']} 中间时间: {data['middle_time']}") + + # 按权重排序 + sorted_users = sorted( + user_weights.items(), + key=lambda x: x[1]["weight"], + reverse=True + ) + + logger.info(f"用户权重排序: {[(msg[1]['messages'][0]['user_nickname'], msg[1]['weight']) for msg in sorted_users]}") + + # 随机选择三个用户 + selected_users = [] + if len(sorted_users) > 3: + # 使用权重作为概率进行随机选择 + weights = [user[1]["weight"] for user in sorted_users] + selected_indices = random.choices( + range(len(sorted_users)), + weights=weights, + k=3 + ) + selected_users = [sorted_users[i] for i in selected_indices] + logger.info(f"随机选择用户: {[msg[1]['messages'][0]['user_nickname'] for msg in selected_users]}") + else: + selected_users = sorted_users + logger.info(f"用户数不足3个,全部选择: {[msg[1]['messages'][0]['user_nickname'] for msg in selected_users]}") + + # 更新选中用户的印象 + for person_id, data in selected_users: + user_nickname = data["messages"][0]["user_nickname"] + logger.info(f"开始更新用户 {user_nickname} 的印象") + await relationship_manager.update_person_impression( + person_id=person_id, + chat_id=chat_id, + reason="", + timestamp=data["middle_time"] + ) + logger.info(f"用户 {user_nickname} 的印象更新完成") + + logger.info("印象更新任务执行完成") + + except Exception as e: + logger.exception(f"更新印象任务失败: {str(e)}") + +# 创建任务实例 +impression_update_task = ImpressionUpdateTask() \ No newline at end of file diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index 84442a2d5..0f212f417 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -32,6 +32,7 @@ logger = get_logger("person_info") person_info_default = { "person_id": None, "person_name": None, # 模型中已设为 null=True,此默认值OK + "person_name_reason": None, "name_reason": None, "platform": "unknown", # 提供非None的默认值 "user_id": "unknown", # 提供非None的默认值 diff --git a/src/person_info/relationship_manager.py b/src/person_info/relationship_manager.py index 6e9a4cb91..81c29d32a 100644 --- a/src/person_info/relationship_manager.py +++ b/src/person_info/relationship_manager.py @@ -1,26 +1,29 @@ from src.common.logger_manager import get_logger from src.chat.message_receive.chat_stream import ChatStream import math -from bson.decimal128 import Decimal128 from src.person_info.person_info import person_info_manager import time import random -from maim_message import UserInfo - +from src.llm_models.utils_model import LLMRequest +from src.config.config import global_config +from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat_users +from src.chat.utils.chat_message_builder import build_readable_messages from src.manager.mood_manager import mood_manager -# import re -# import traceback - logger = get_logger("relation") - class RelationshipManager: def __init__(self): self.positive_feedback_value = 0 # 正反馈系统 self.gain_coefficient = [1.0, 1.0, 1.1, 1.2, 1.4, 1.7, 1.9, 2.0] self._mood_manager = None + + self.relationship_llm = LLMRequest( + model=global_config.model.normal_chat_1, + max_tokens=1000, + request_type="relationship", # 用于动作规划 + ) @property def mood_manager(self): @@ -112,91 +115,6 @@ class RelationshipManager: person_id=person_id, user_nickname=user_nickname, user_cardname=user_cardname, user_avatar=user_avatar ) - async def calculate_update_relationship_value(self, user_info: UserInfo, platform: str, label: str, stance: str): - """计算并变更关系值 - 新的关系值变更计算方式: - 将关系值限定在-1000到1000 - 对于关系值的变更,期望: - 1.向两端逼近时会逐渐减缓 - 2.关系越差,改善越难,关系越好,恶化越容易 - 3.人维护关系的精力往往有限,所以当高关系值用户越多,对于中高关系值用户增长越慢 - 4.连续正面或负面情感会正反馈 - - 返回: - 用户昵称,变更值,变更后关系等级 - - """ - stancedict = { - "支持": 0, - "中立": 1, - "反对": 2, - } - - valuedict = { - "开心": 1.5, - "愤怒": -2.0, - "悲伤": -0.5, - "惊讶": 0.6, - "害羞": 2.0, - "平静": 0.3, - "恐惧": -1.5, - "厌恶": -1.0, - "困惑": 0.5, - } - - person_id = person_info_manager.get_person_id(platform, user_info.user_id) - data = { - "platform": platform, - "user_id": user_info.user_id, - "nickname": user_info.user_nickname, - "konw_time": int(time.time()), - } - old_value = await person_info_manager.get_value(person_id, "relationship_value") - old_value = self.ensure_float(old_value, person_id) - - if old_value > 1000: - old_value = 1000 - elif old_value < -1000: - old_value = -1000 - - value = valuedict[label] - if old_value >= 0: - if valuedict[label] >= 0 and stancedict[stance] != 2: - value = value * math.cos(math.pi * old_value / 2000) - if old_value > 500: - rdict = await person_info_manager.get_specific_value_list("relationship_value", lambda x: x > 700) - high_value_count = len(rdict) - if old_value > 700: - value *= 3 / (high_value_count + 2) # 排除自己 - else: - value *= 3 / (high_value_count + 3) - elif valuedict[label] < 0 and stancedict[stance] != 0: - value = value * math.exp(old_value / 2000) - else: - value = 0 - elif old_value < 0: - if valuedict[label] >= 0 and stancedict[stance] != 2: - value = value * math.exp(old_value / 2000) - elif valuedict[label] < 0 and stancedict[stance] != 0: - value = value * math.cos(math.pi * old_value / 2000) - else: - value = 0 - - self.positive_feedback_sys(label, stance) - value = self.mood_feedback(value) - - level_num = self.calculate_level_num(old_value + value) - relationship_level = ["厌恶", "冷漠", "一般", "友好", "喜欢", "暧昧"] - logger.info( - f"用户: {user_info.user_nickname}" - f"当前关系: {relationship_level[level_num]}, " - f"关系值: {old_value:.2f}, " - f"当前立场情感: {stance}-{label}, " - f"变更: {value:+.5f}" - ) - - await person_info_manager.update_one_field(person_id, "relationship_value", old_value + value, data) - async def calculate_update_relationship_value_with_reason( self, chat_stream: ChatStream, label: str, stance: str, reason: str ) -> tuple: @@ -337,6 +255,14 @@ class RelationshipManager: relation_prompt = f"{relation_value_prompt},ta在{platform}上的昵称是{nickname_str}。\n" else: relation_prompt = "" + + person_name_reason = await person_info_manager.get_value(person_id, "person_name_reason") + if person_name_reason: + relation_prompt += f"ta的昵称{person_name}的由来是:{person_name_reason}。\n" + + person_impression = await person_info_manager.get_value(person_id, "person_impression") + if person_impression: + relation_prompt += f"ta的印象是:{person_impression}。\n" return relation_prompt @@ -358,17 +284,95 @@ class RelationshipManager: else: level_num = 5 if relationship_value > 1000 else 0 return level_num + + + async def update_person_impression(self, person_id, chat_id, reason, timestamp): + """更新用户印象 + + Args: + person_id: 用户ID + chat_id: 聊天ID + reason: 更新原因 + timestamp: 时间戳 + """ + # 获取现有印象和用户信息 + person_name = await person_info_manager.get_value(person_id, "person_name") + nickname = await person_info_manager.get_value(person_id, "nickname") + old_impression = await person_info_manager.get_value(person_id, "person_impression") + user_id = await person_info_manager.get_value(person_id, "user_id") + + logger.debug(f"更新印象的person_id: {person_id}, chat_id: {chat_id}, reason: {reason}, timestamp: {timestamp}, user_id: {user_id}") - @staticmethod - def ensure_float(value, person_id): - """确保返回浮点数,转换失败返回0.0""" - if isinstance(value, float): - return value - try: - return float(value.to_decimal() if isinstance(value, Decimal128) else value) - except (ValueError, TypeError, AttributeError): - logger.warning(f"[关系管理] {person_id}值转换失败(原始值:{value}),已重置为0") - return 0.0 + # 获取时间戳前后的消息 + messages_before = get_raw_msg_by_timestamp_with_chat_users( + chat_id=chat_id, + timestamp_start=timestamp - 3600, # 前1小时 + timestamp_end=timestamp, + person_ids=[user_id], + limit=30, + limit_mode="latest" + ) + + messages_after = get_raw_msg_by_timestamp_with_chat_users( + chat_id=chat_id, + timestamp_start=timestamp, + timestamp_end=timestamp + 3600, # 后1小时 + person_ids=[user_id], + limit=30, + limit_mode="earliest" + ) + + # 合并消息并按时间排序 + user_messages = messages_before + messages_after + user_messages.sort(key=lambda x: x["time"]) + + # print(f"user_messages: {user_messages}") + + # 构建可读消息 + + readable_messages = await build_readable_messages( + messages=user_messages, + replace_bot_name=True, + timestamp_mode="relative", + truncate=False + ) + + # 使用LLM总结印象 + prompt = f"""基于以下信息,总结对{person_name}(昵称:{nickname})的印象: +原因:{reason} +历史印象:{old_impression if old_impression else '无'} +最近发言: +{readable_messages} + +请用简洁的语言总结对这个人的印象,不超过200字。""" + + new_impression, _ = await self.relationship_llm.generate_response_async(prompt=prompt) + + + logger.debug(f"新印象prompt:{prompt}") + logger.info(f"新印象:{new_impression}") + + + # 合并新旧印象 + if old_impression: + merge_prompt = f"""请将以下两段印象合并,形成一段连贯的描述: +旧印象:{old_impression} +新印象:{new_impression} + +请用简洁的语言合并这两段印象,不超过200字。""" + final_impression, _ = await self.relationship_llm.generate_response_async(prompt=merge_prompt) + + logger.debug(f"合并印象prompt:{merge_prompt}") + logger.info(f"合并印象:{final_impression}") + + else: + final_impression = new_impression + + + # 更新到数据库 + await person_info_manager.update_one_field(person_id, "person_impression", final_impression) + + return final_impression relationship_manager = RelationshipManager()