diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index dd454e86e..c616d3cc6 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -21,6 +21,9 @@ from src.chat.normal_chat.normal_chat_planner import NormalChatPlanner from src.chat.normal_chat.normal_chat_action_modifier import NormalChatActionModifier from src.chat.normal_chat.normal_chat_expressor import NormalChatExpressor from src.chat.focus_chat.replyer.default_replyer import DefaultReplyer +from src.person_info.person_info import PersonInfoManager +from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat +from src.person_info.relationship_manager import get_relationship_manager willing_manager = get_willing_manager() @@ -64,6 +67,9 @@ class NormalChat: self.recent_replies = [] self.max_replies_history = 20 # 最多保存最近20条回复记录 + # 添加engaging_person统计 + self.engaging_persons = {} # person_id -> {first_time, last_time, receive_count, reply_count, relation_built} + # 添加回调函数,用于在满足条件时通知切换到focus_chat模式 self.on_switch_to_focus_callback = on_switch_to_focus_callback @@ -176,6 +182,8 @@ class NormalChat: else: self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) + # print(self.engaging_persons) + await self.normal_response( message=message, is_mentioned=is_mentioned, @@ -219,6 +227,12 @@ class NormalChat: logger.info(f"[{self.stream_name}] 已停用,忽略 normal_response。") return + # 更新engaging_persons统计信息 + self._update_engaging_person_stats(message, is_reply=False) + + # 检查是否有用户满足关系构建条件 + asyncio.create_task(self._check_relation_building_conditions()) + timing_results = {} reply_probability = ( 1.0 if is_mentioned and global_config.normal_chat.mentioned_bot_inevitable_reply else 0.0 @@ -410,6 +424,9 @@ class NormalChat: # 检查 first_bot_msg 是否为 None (例如思考消息已被移除的情况) if first_bot_msg: + # 更新engaging_persons统计信息 - 标记为回复 + self._update_engaging_person_stats(message, is_reply=True) + # 记录回复信息到最近回复列表中 reply_info = { "time": time.time(), @@ -646,3 +663,255 @@ class NormalChat: def get_action_manager(self) -> ActionManager: """获取动作管理器实例""" return self.action_manager + + def _update_engaging_person_stats(self, message: MessageRecv, is_reply: bool): + """更新engaging_persons统计信息""" + # 通过platform和user_id计算person_id + platform = message.message_info.platform + user_id = message.message_info.user_info.user_id + person_id = PersonInfoManager.get_person_id(platform, user_id) + current_time = time.time() + + if person_id not in self.engaging_persons: + self.engaging_persons[person_id] = { + "first_time": current_time, + "last_time": current_time, + "receive_count": 0, + "reply_count": 0, + "relation_built": False + } + + if is_reply: + self.engaging_persons[person_id]["reply_count"] += 1 + logger.debug(f"[{self.stream_name}] 用户 {person_id} 回复次数更新: {self.engaging_persons[person_id]['reply_count']}") + else: + self.engaging_persons[person_id]["receive_count"] += 1 + self.engaging_persons[person_id]["last_time"] = current_time + logger.debug(f"[{self.stream_name}] 用户 {person_id} 消息次数更新: {self.engaging_persons[person_id]['receive_count']}") + + def get_engaging_persons(self) -> dict: + """获取所有engaging_persons统计信息 + + Returns: + dict: person_id -> {first_time, last_time, receive_count, reply_count} + """ + return self.engaging_persons.copy() + + def get_engaging_person_stats(self, person_id: str) -> dict: + """获取特定用户的统计信息 + + Args: + person_id: 用户ID + + Returns: + dict: 用户统计信息,如果用户不存在则返回None + """ + return self.engaging_persons.get(person_id) + + def get_top_engaging_persons(self, limit: int = 10, sort_by: str = "receive_count") -> list: + """获取最活跃的用户列表 + + Args: + limit: 返回的用户数量限制 + sort_by: 排序依据,可选值: "receive_count", "reply_count", "last_time" + + Returns: + list: 按指定条件排序的用户列表 + """ + if sort_by not in ["receive_count", "reply_count", "last_time"]: + sort_by = "receive_count" + + sorted_persons = sorted( + self.engaging_persons.items(), + key=lambda x: x[1][sort_by], + reverse=True + ) + + return sorted_persons[:limit] + + def clear_engaging_persons_stats(self): + """清空engaging_persons统计信息""" + self.engaging_persons.clear() + logger.info(f"[{self.stream_name}] 已清空engaging_persons统计信息") + + def get_relation_building_stats(self) -> dict: + """获取关系构建相关统计信息 + + Returns: + dict: 关系构建统计信息 + """ + total_persons = len(self.engaging_persons) + relation_built_count = sum(1 for stats in self.engaging_persons.values() + if stats.get("relation_built", False)) + pending_persons = [] + + current_time = time.time() + for person_id, stats in self.engaging_persons.items(): + if not stats.get("relation_built", False): + time_elapsed = current_time - stats["first_time"] + total_messages = self._get_total_messages_in_timerange( + stats["first_time"], stats["last_time"] + ) + + # 检查是否接近满足条件 + progress_info = { + "person_id": person_id, + "time_elapsed": time_elapsed, + "total_messages": total_messages, + "receive_count": stats["receive_count"], + "reply_count": stats["reply_count"], + "progress": { + "50_messages": f"{total_messages}/50 ({total_messages/50*100:.1f}%)", + "35_msg_10min": f"{total_messages}/35 + {time_elapsed}/600s", + "25_msg_30min": f"{total_messages}/25 + {time_elapsed}/1800s", + "10_msg_1hour": f"{total_messages}/10 + {time_elapsed}/3600s" + } + } + pending_persons.append(progress_info) + + return { + "total_persons": total_persons, + "relation_built_count": relation_built_count, + "pending_count": len(pending_persons), + "pending_persons": pending_persons + } + + def get_engaging_persons_summary(self) -> dict: + """获取engaging_persons统计摘要 + + Returns: + dict: 包含总用户数、总消息数、总回复数等统计信息 + """ + if not self.engaging_persons: + return { + "total_persons": 0, + "total_messages": 0, + "total_replies": 0, + "most_active_person": None, + "most_replied_person": None + } + + total_messages = sum(stats["receive_count"] for stats in self.engaging_persons.values()) + total_replies = sum(stats["reply_count"] for stats in self.engaging_persons.values()) + + most_active = max(self.engaging_persons.items(), key=lambda x: x[1]["receive_count"]) + most_replied = max(self.engaging_persons.items(), key=lambda x: x[1]["reply_count"]) + + return { + "total_persons": len(self.engaging_persons), + "total_messages": total_messages, + "total_replies": total_replies, + "most_active_person": { + "person_id": most_active[0], + "message_count": most_active[1]["receive_count"] + }, + "most_replied_person": { + "person_id": most_replied[0], + "reply_count": most_replied[1]["reply_count"] + } + } + + async def _check_relation_building_conditions(self): + """检查engaging_persons中是否有满足关系构建条件的用户""" + current_time = time.time() + + for person_id, stats in list(self.engaging_persons.items()): + # 跳过已经进行过关系构建的用户 + if stats.get("relation_built", False): + continue + + # 计算时间差和消息数量 + time_elapsed = current_time - stats["first_time"] + total_messages = self._get_total_messages_in_timerange( + stats["first_time"], stats["last_time"] + ) + + # 检查是否满足关系构建条件 + should_build_relation = ( + total_messages >= 50 # 50条消息必定满足 + or (total_messages >= 35 and time_elapsed >= 600) # 35条且10分钟 + or (total_messages >= 25 and time_elapsed >= 1800) # 25条且30分钟 + or (total_messages >= 10 and time_elapsed >= 3600) # 10条且1小时 + ) + + if should_build_relation: + logger.info( + f"[{self.stream_name}] 用户 {person_id} 满足关系构建条件。" + f"消息数:{total_messages},时长:{time_elapsed:.0f}秒," + f"收到消息:{stats['receive_count']},回复次数:{stats['reply_count']}" + ) + + # 计算构建概率并决定是否构建 + await self._evaluate_and_build_relation(person_id, stats, total_messages) + + + def _get_total_messages_in_timerange(self, start_time: float, end_time: float) -> int: + """获取指定时间范围内的总消息数量""" + try: + messages = get_raw_msg_by_timestamp_with_chat(self.stream_id, start_time, end_time) + return len(messages) if messages else 0 + except Exception as e: + logger.error(f"[{self.stream_name}] 获取时间范围内消息数量失败: {e}") + return 0 + + async def _evaluate_and_build_relation(self, person_id: str, stats: dict, total_messages: int): + """评估并执行关系构建""" + receive_count = stats["receive_count"] + reply_count = stats["reply_count"] + + # 计算回复概率(reply_count在总消息中的比值) + reply_ratio = reply_count / total_messages if total_messages > 0 else 0 + reply_build_probability = reply_ratio # 100%回复则100%构建 + + # 计算接收概率(receive_count的影响) + receive_ratio = receive_count / total_messages if total_messages > 0 else 0 + receive_build_probability = receive_ratio * 0.25 # 100%接收则25%构建 + + # 取最高概率 + final_probability = max(reply_build_probability, receive_build_probability) + + logger.info( + f"[{self.stream_name}] 用户 {person_id} 关系构建概率评估:" + f"回复比例:{reply_ratio:.2f}({reply_build_probability:.2f})" + f",接收比例:{receive_ratio:.2f}({receive_build_probability:.2f})" + f",最终概率:{final_probability:.2f}" + ) + + # 使用随机数决定是否构建关系 + if random() < final_probability: + logger.info(f"[{self.stream_name}] 决定为用户 {person_id} 构建关系") + await self._build_relation_for_person(person_id, stats) + # 标记已构建 + stats["relation_built"] = True + else: + logger.info(f"[{self.stream_name}] 用户 {person_id} 未通过关系构建概率判定") + # 即使未构建,也标记为已处理,避免重复判定 + stats["relation_built"] = True + + async def _build_relation_for_person(self, person_id: str, stats: dict): + """为特定用户构建关系""" + try: + start_time = stats["first_time"] + end_time = stats["last_time"] + + # 获取该时间段的所有消息用于关系构建 + messages = get_raw_msg_by_timestamp_with_chat(self.stream_id, start_time, end_time) + + if messages: + logger.info(f"[{self.stream_name}] 为用户 {person_id} 获取到 {len(messages)} 条消息用于关系构建") + + # 调用关系管理器更新印象 + relationship_manager = get_relationship_manager() + await relationship_manager.update_person_impression( + person_id=person_id, + timestamp=end_time, + bot_engaged_messages=messages + ) + + logger.info(f"[{self.stream_name}] 用户 {person_id} 关系构建完成") + else: + logger.warning(f"[{self.stream_name}] 未找到用户 {person_id} 的消息,关系构建跳过") + + except Exception as e: + logger.error(f"[{self.stream_name}] 为用户 {person_id} 构建关系时出错: {e}") + traceback.print_exc() diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index 6e22ee0fa..e5acaa308 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -242,6 +242,7 @@ class PersonInfo(BaseModel): user_id = TextField(index=True) # 用户ID nickname = TextField() # 用户昵称 impression = TextField(null=True) # 个人印象 + short_impression = TextField(null=True) # 个人印象的简短描述 points = TextField(null=True) # 个人印象的点 forgotten_points = TextField(null=True) # 被遗忘的点 info_list = TextField(null=True) # 与Bot的互动 diff --git a/src/person_info/person_info.py b/src/person_info/person_info.py index a83f6c9b2..68027d383 100644 --- a/src/person_info/person_info.py +++ b/src/person_info/person_info.py @@ -43,6 +43,7 @@ person_info_default = { # "user_cardname": None, # This field is not in Peewee model PersonInfo # "user_avatar": None, # This field is not in Peewee model PersonInfo "impression": None, # Corrected from persion_impression + "short_impression": None, "info_list": None, "points": None, "forgotten_points": None, diff --git a/src/person_info/relationship_manager.py b/src/person_info/relationship_manager.py index 663c71762..06e19c2fd 100644 --- a/src/person_info/relationship_manager.py +++ b/src/person_info/relationship_manager.py @@ -124,31 +124,14 @@ class RelationshipManager: person_name = await person_info_manager.get_value(person_id, "person_name") if not person_name or person_name == "none": return "" - # impression = await person_info_manager.get_value(person_id, "impression") - points = await person_info_manager.get_value(person_id, "points") or [] - - if isinstance(points, str): - try: - points = ast.literal_eval(points) - except (SyntaxError, ValueError): - points = [] - - random_points = random.sample(points, min(5, len(points))) if points else [] + short_impression = await person_info_manager.get_value(person_id, "short_impression") nickname_str = await person_info_manager.get_value(person_id, "nickname") platform = await person_info_manager.get_value(person_id, "platform") relation_prompt = f"'{person_name}' ,ta在{platform}上的昵称是{nickname_str}。" - # if impression: - # relation_prompt += f"你对ta的印象是:{impression}。" - - if random_points: - for point in random_points: - # print(f"point: {point}") - # print(f"point[2]: {point[2]}") - # print(f"point[0]: {point[0]}") - point_str = f"时间:{point[2]}。内容:{point[0]}" - relation_prompt += f"你记得{person_name}最近的点是:{point_str}。" + if short_impression: + relation_prompt += f"你对ta的印象是:{short_impression}。" return relation_prompt @@ -448,6 +431,35 @@ class RelationshipManager: await person_info_manager.update_one_field(person_id, "impression", compressed_summary) + + + + compress_short_prompt = f""" +你的名字是{global_config.bot.nickname},{global_config.bot.nickname}的别名是{alias_str}。 +请不要混淆你自己和{global_config.bot.nickname}和{person_name}。 + +你对{person_name}的了解是: +{compressed_summary} + +请你用一句话概括你对{person_name}的了解。突出: +1.对{person_name}的直观印象 +2.{global_config.bot.nickname}与{person_name}的关系 +3.{person_name}的关键信息 +请输出一段平文本,以陈诉自白的语气,输出你对{person_name}的概括,不要输出任何其他内容。 +""" + compressed_short_summary, _ = await self.relationship_llm.generate_response_async(prompt=compress_short_prompt) + + # current_time = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") + # compressed_short_summary = f"截至{current_time},你对{person_name}的了解:{compressed_short_summary}" + + await person_info_manager.update_one_field(person_id, "short_impression", compressed_short_summary) + + + + + + + forgotten_points = [] # 这句代码的作用是:将更新后的 forgotten_points(遗忘的记忆点)列表,序列化为 JSON 字符串后,写回到数据库中的 forgotten_points 字段