diff --git a/src/main.py b/src/main.py index e3bbf38d1..932fbfcfe 100644 --- a/src/main.py +++ b/src/main.py @@ -56,8 +56,9 @@ class MainSystem: self.mood_manager.start_mood_update(update_interval=global_config.mood_update_interval) logger.success("情绪管理器启动成功") - # 检查并清除person_info冗余字段 + # 检查并清除person_info冗余字段,启动个人习惯推断 await person_info_manager.del_all_undefined_field() + # asyncio.create_task(person_info_manager.personal_habit_deduction()) # 启动愿望管理器 await willing_manager.ensure_started() diff --git a/src/plugins/chat/message_buffer.py b/src/plugins/chat/message_buffer.py index 4e9aa5582..9919e6cf7 100644 --- a/src/plugins/chat/message_buffer.py +++ b/src/plugins/chat/message_buffer.py @@ -3,11 +3,13 @@ from src.common.logger import get_module_logger import asyncio from dataclasses import dataclass from .message import MessageRecv +from ..message.message_base import BaseMessageInfo import hashlib from typing import Dict from dataclasses import dataclass, field from collections import OrderedDict import random +import time logger = get_module_logger("message_buffer") @@ -40,17 +42,15 @@ class MassageBuffer: self.buffer_pool[person_id_] = OrderedDict() # 查找最近的处理成功消息(T) - last_T_msg = None recent_F_count = 0 for msg_id in reversed(self.buffer_pool[person_id_]): msg = self.buffer_pool[person_id_][msg_id] if msg.result == "T": - last_T_msg = msg break elif msg.result == "F": recent_F_count += 1 - # 判断条件:最近T之后有超过3条F + # 判断条件:最近T之后有超过3-5条F if (recent_F_count >= random.randint(3, 5)): new_msg = CacheMessages(message=message, result="T") new_msg.cache_determination.set() @@ -63,7 +63,7 @@ class MassageBuffer: if cache_msg.result == "U": cache_msg.result = "F" cache_msg.cache_determination.set() - logger.debug(f"被新消息覆盖信息id: {message.message_info.message_id}") + logger.debug(f"被新消息覆盖信息id: {cache_msg.message.message_info.message_id}") # 添加新消息 self.buffer_pool[person_id_][message.message_info.message_id] = CacheMessages(message=message) @@ -71,6 +71,7 @@ class MassageBuffer: # 启动3秒缓冲计时器 person_id = person_info_manager.get_person_id(message.message_info.user_info.platform, message.message_info.user_info.user_id) + asyncio.create_task(self.save_message_interval(person_id, message.message_info)) asyncio.create_task(self._debounce_processor(person_id_, message.message_info.message_id, person_id)) @@ -121,22 +122,26 @@ class MassageBuffer: keep_msgs = OrderedDict() combined_text = [] found = False + is_text = False for msg_id, msg in self.buffer_pool[person_id_].items(): if msg_id == message.message_info.message_id: found = True + is_text = msg.message.message_segment.type == "text" combined_text.append(msg.message.processed_plain_text) continue if found: keep_msgs[msg_id] = msg elif msg.result == "F": # 收集F消息的文本内容 - if hasattr(msg.message, 'processed_plain_text') and msg.message.processed_plain_text: + if (hasattr(msg.message, 'processed_plain_text') + and msg.message.message_segment.type == "text" + and msg.message.processed_plain_text): combined_text.append(msg.message.processed_plain_text) elif msg.result == "U": logger.debug(f"异常未处理信息id: {msg.message.message_info.message_id}") # 更新当前消息的processed_plain_text - if combined_text and combined_text[0] != message.processed_plain_text: + if combined_text and combined_text[0] != message.processed_plain_text and is_text: message.processed_plain_text = "".join(combined_text) logger.debug(f"整合了{len(combined_text)-1}条F消息的内容到当前消息") @@ -145,6 +150,22 @@ class MassageBuffer: except asyncio.TimeoutError: logger.debug(f"查询超时消息id: {message.message_info.message_id}") return False + + async def save_message_interval(self, person_id:str, message:BaseMessageInfo): + message_interval_list = await person_info_manager.get_value(person_id, "msg_interval_list") + now_time_ms = int(round(time.time() * 1000)) + if len(message_interval_list) < 1000: + message_interval_list.append(now_time_ms) + else: + message_interval_list = message_interval_list.pop(0) + message_interval_list.append(now_time_ms) + data = { + "platform" : message.platform, + "user_id" : message.user_info.user_id, + "nickname" : message.user_info.user_nickname, + "konw_time" : int(time.time()) + } + await person_info_manager.update_one_field(person_id, "msg_interval_list", message_interval_list, data) message_buffer = MassageBuffer() \ No newline at end of file diff --git a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py index de034e25b..8f5322e22 100644 --- a/src/plugins/chat_module/think_flow_chat/think_flow_chat.py +++ b/src/plugins/chat_module/think_flow_chat/think_flow_chat.py @@ -198,7 +198,12 @@ class ThinkFlowChat: # 查询缓冲器结果,会整合前面跳过的消息,改变processed_plain_text buffer_result = await message_buffer.query_buffer_result(message) if not buffer_result: - logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}") + if message.message_segment.type == "text": + logger.info(f"触发缓冲,已炸飞消息:{message.processed_plain_text}") + elif message.message_segment.type == "image": + logger.info(f"触发缓冲,已炸飞表情包/图片") + elif message.message_segment.type == "seglist": + logger.info(f"触发缓冲,已炸飞消息列") return is_mentioned = is_mentioned_bot_in_message(message) diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py index 3373366a0..20ab2db8b 100644 --- a/src/plugins/person_info/person_info.py +++ b/src/plugins/person_info/person_info.py @@ -4,6 +4,7 @@ import copy import hashlib from typing import Any, Callable, Dict, TypeVar T = TypeVar('T') # 泛型类型 +import datetime """ PersonInfoManager 类方法功能摘要: @@ -15,6 +16,7 @@ PersonInfoManager 类方法功能摘要: 6. get_values - 批量获取字段值(任一字段无效则返回空字典) 7. del_all_undefined_field - 清理全集合中未定义的字段 8. get_specific_value_list - 根据指定条件,返回person_id,value字典 +9. personal_habit_deduction - 定时推断个人习惯 """ logger = get_module_logger("person_info") @@ -30,11 +32,13 @@ person_info_default = { # "impression" : None, # "gender" : Unkown, "konw_time" : 0, - "msg_interval": 3000 + "msg_interval": 3000, + "msg_interval_list": [] } # 个人信息的各项与默认值在此定义,以下处理会自动创建/补全每一项 class PersonInfoManager: def __init__(self): + self.start_time = datetime.datetime.now() if "person_info" not in db.list_collection_names(): db.create_collection("person_info") db.person_info.create_index("person_id", unique=True) @@ -109,8 +113,9 @@ class PersonInfoManager: if document and field_name in document: return document[field_name] else: - logger.debug(f"获取{person_id}的{field_name}失败,已返回默认值{person_info_default[field_name]}") - return person_info_default[field_name] + default_value = copy.deepcopy(person_info_default[field_name]) + logger.debug(f"获取{person_id}的{field_name}失败,已返回默认值{default_value}") + return default_value async def get_values(self, person_id: str, field_names: list) -> dict: """获取指定person_id文档的多个字段值,若不存在该字段,则返回该字段的全局默认值""" @@ -134,7 +139,10 @@ class PersonInfoManager: result = {} for field in field_names: - result[field] = document.get(field, person_info_default[field]) if document else person_info_default[field] + result[field] = copy.deepcopy( + document.get(field, person_info_default[field]) + if document else person_info_default[field] + ) return result @@ -210,5 +218,36 @@ class PersonInfoManager: except Exception as e: logger.error(f"数据库查询失败: {str(e)}", exc_info=True) return {} + + async def personal_habit_deduction(self): + """启动个人信息推断,每天根据一定条件推断一次""" + try: + logger.info(f"个人信息推断启动: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + # # 初始化日程 + # await self.check_and_create_today_schedule() + # self.print_schedule() + + # while True: + # # print(self.get_current_num_task(1, True)) + + # current_time = datetime.datetime.now() + + # # 检查是否需要重新生成日程(日期变化) + # if current_time.date() != self.start_time.date(): + # logger.info("检测到日期变化,重新生成日程") + # self.start_time = current_time + # await self.check_and_create_today_schedule() + # self.print_schedule() + + # # 执行当前活动 + # # mind_thinking = heartflow.current_state.current_mind + + # await self.move_doing() + + # await asyncio.sleep(self.schedule_doing_update_interval) + + except Exception as e: + logger.error(f"个人信息推断运行时出错: {str(e)}") + logger.exception("详细错误信息:") person_info_manager = PersonInfoManager() \ No newline at end of file