diff --git a/src/chat/heart_flow/observation/chatting_observation.py b/src/chat/heart_flow/observation/chatting_observation.py index 415e4b100..9bd10e511 100644 --- a/src/chat/heart_flow/observation/chatting_observation.py +++ b/src/chat/heart_flow/observation/chatting_observation.py @@ -140,22 +140,22 @@ class ChattingObservation(Observation): return None # logger.debug(f"找到的锚定消息:find_msg: {find_msg}") - + # 创建所需的user_info字段 user_info = { "platform": find_msg.get("user_platform", ""), "user_id": find_msg.get("user_id", ""), "user_nickname": find_msg.get("user_nickname", ""), - "user_cardname": find_msg.get("user_cardname", "") + "user_cardname": find_msg.get("user_cardname", ""), } - + # 创建所需的group_info字段,如果是群聊的话 group_info = {} if find_msg.get("chat_info_group_id"): group_info = { "platform": find_msg.get("chat_info_group_platform", ""), "group_id": find_msg.get("chat_info_group_id", ""), - "group_name": find_msg.get("chat_info_group_name", "") + "group_name": find_msg.get("chat_info_group_name", ""), } content_format = "" @@ -196,7 +196,7 @@ class ChattingObservation(Observation): limit=self.max_now_obs_len, limit_mode="latest", ) - + # print(f"new_messages_list: {new_messages_list}") last_obs_time_mark = self.last_observe_time diff --git a/src/chat/memory_system/Hippocampus.py b/src/chat/memory_system/Hippocampus.py index 23a296c8d..1695a3948 100644 --- a/src/chat/memory_system/Hippocampus.py +++ b/src/chat/memory_system/Hippocampus.py @@ -861,9 +861,7 @@ class EntorhinalCortex: # 确保在更新前获取最新的 memorized_times current_memorized_times = message.get("memorized_times", 0) # 使用 Peewee 更新记录 - Messages.update( - memorized_times=current_memorized_times + 1 - ).where( + Messages.update(memorized_times=current_memorized_times + 1).where( Messages.message_id == message["message_id"] ).execute() return messages # 直接返回原始的消息列表 @@ -983,9 +981,7 @@ class EntorhinalCortex: if not node.last_modified: update_data["last_modified"] = current_time - GraphNodes.update( - **update_data - ).where(GraphNodes.concept == concept).execute() + GraphNodes.update(**update_data).where(GraphNodes.concept == concept).execute() logger.info(f"[时间更新] 节点 {concept} 添加缺失的时间字段") # 获取时间信息(如果不存在则使用当前时间) @@ -1014,9 +1010,7 @@ class EntorhinalCortex: if not edge.last_modified: update_data["last_modified"] = current_time - GraphEdges.update( - **update_data - ).where( + GraphEdges.update(**update_data).where( (GraphEdges.source == source) & (GraphEdges.target == target) ).execute() logger.info(f"[时间更新] 边 {source} - {target} 添加缺失的时间字段") diff --git a/src/chat/utils/chat_message_builder.py b/src/chat/utils/chat_message_builder.py index f81603e13..d662d8c0a 100644 --- a/src/chat/utils/chat_message_builder.py +++ b/src/chat/utils/chat_message_builder.py @@ -175,15 +175,15 @@ async def _build_readable_messages_internal( # 1 & 2: 获取发送者信息并提取消息组件 for msg in messages: # 检查并修复缺少的user_info字段 - if 'user_info' not in msg: + if "user_info" not in msg: # 创建user_info字段 - msg['user_info'] = { - 'platform': msg.get('user_platform', ''), - 'user_id': msg.get('user_id', ''), - 'user_nickname': msg.get('user_nickname', ''), - 'user_cardname': msg.get('user_cardname', '') + msg["user_info"] = { + "platform": msg.get("user_platform", ""), + "user_id": msg.get("user_id", ""), + "user_nickname": msg.get("user_nickname", ""), + "user_cardname": msg.get("user_cardname", ""), } - + user_info = msg.get("user_info", {}) platform = user_info.get("platform") user_id = user_info.get("user_id") diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index bf192ca6a..3544a8be0 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -279,6 +279,7 @@ class GraphNodes(BaseModel): """ 用于存储记忆图节点的模型 """ + concept = TextField(unique=True, index=True) # 节点概念 memory_items = TextField() # JSON格式存储的记忆列表 hash = TextField() # 节点哈希值 @@ -293,6 +294,7 @@ class GraphEdges(BaseModel): """ 用于存储记忆图边的模型 """ + source = TextField(index=True) # 源节点 target = TextField(index=True) # 目标节点 strength = IntegerField() # 连接强度 diff --git a/tests/common/test_message_repository.py b/tests/common/test_message_repository.py index 43d629761..798fa16b1 100644 --- a/tests/common/test_message_repository.py +++ b/tests/common/test_message_repository.py @@ -5,7 +5,7 @@ import sys import os # 添加项目根目录到Python路径 -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) from peewee import SqliteDatabase from src.common.database.database_model import Messages, BaseModel @@ -15,160 +15,158 @@ from src.common.message_repository import find_messages class TestMessageRepository(unittest.TestCase): def setUp(self): # 创建内存中的SQLite数据库用于测试 - self.test_db = SqliteDatabase(':memory:') - + self.test_db = SqliteDatabase(":memory:") + # 覆盖原有数据库连接 BaseModel._meta.database = self.test_db Messages._meta.database = self.test_db - + # 创建表 self.test_db.create_tables([Messages]) - + # 添加测试数据 current_time = datetime.datetime.now().timestamp() self.test_messages = [ { - 'message_id': 'msg1', - 'time': current_time - 3600, # 1小时前 - 'chat_id': '5ed68437e28644da51f314f37df68d18', - 'chat_info_stream_id': 'stream1', - 'chat_info_platform': 'qq', - 'chat_info_user_platform': 'qq', - 'chat_info_user_id': 'user1', - 'chat_info_user_nickname': '用户1', - 'chat_info_user_cardname': '卡片名1', - 'chat_info_group_platform': 'qq', - 'chat_info_group_id': 'group1', - 'chat_info_group_name': '群组1', - 'chat_info_create_time': current_time - 7200, # 2小时前 - 'chat_info_last_active_time': current_time - 1800, # 30分钟前 - 'user_platform': 'qq', - 'user_id': 'user1', - 'user_nickname': '用户1', - 'user_cardname': '卡片名1', - 'processed_plain_text': '你好', - 'detailed_plain_text': '你好', - 'memorized_times': 1 + "message_id": "msg1", + "time": current_time - 3600, # 1小时前 + "chat_id": "5ed68437e28644da51f314f37df68d18", + "chat_info_stream_id": "stream1", + "chat_info_platform": "qq", + "chat_info_user_platform": "qq", + "chat_info_user_id": "user1", + "chat_info_user_nickname": "用户1", + "chat_info_user_cardname": "卡片名1", + "chat_info_group_platform": "qq", + "chat_info_group_id": "group1", + "chat_info_group_name": "群组1", + "chat_info_create_time": current_time - 7200, # 2小时前 + "chat_info_last_active_time": current_time - 1800, # 30分钟前 + "user_platform": "qq", + "user_id": "user1", + "user_nickname": "用户1", + "user_cardname": "卡片名1", + "processed_plain_text": "你好", + "detailed_plain_text": "你好", + "memorized_times": 1, }, { - 'message_id': 'msg2', - 'time': current_time - 1800, # 30分钟前 - 'chat_id': 'chat1', - 'chat_info_stream_id': 'stream1', - 'chat_info_platform': 'qq', - 'chat_info_user_platform': 'qq', - 'chat_info_user_id': 'user1', - 'chat_info_user_nickname': '用户1', - 'chat_info_user_cardname': '卡片名1', - 'chat_info_group_platform': 'qq', - 'chat_info_group_id': 'group1', - 'chat_info_group_name': '群组1', - 'chat_info_create_time': current_time - 7200, - 'chat_info_last_active_time': current_time - 900, # 15分钟前 - 'user_platform': 'qq', - 'user_id': 'user1', - 'user_nickname': '用户1', - 'user_cardname': '卡片名1', - 'processed_plain_text': '世界', - 'detailed_plain_text': '世界', - 'memorized_times': 2 + "message_id": "msg2", + "time": current_time - 1800, # 30分钟前 + "chat_id": "chat1", + "chat_info_stream_id": "stream1", + "chat_info_platform": "qq", + "chat_info_user_platform": "qq", + "chat_info_user_id": "user1", + "chat_info_user_nickname": "用户1", + "chat_info_user_cardname": "卡片名1", + "chat_info_group_platform": "qq", + "chat_info_group_id": "group1", + "chat_info_group_name": "群组1", + "chat_info_create_time": current_time - 7200, + "chat_info_last_active_time": current_time - 900, # 15分钟前 + "user_platform": "qq", + "user_id": "user1", + "user_nickname": "用户1", + "user_cardname": "卡片名1", + "processed_plain_text": "世界", + "detailed_plain_text": "世界", + "memorized_times": 2, }, { - 'message_id': 'msg3', - 'time': current_time - 900, # 15分钟前 - 'chat_id': 'chat2', - 'chat_info_stream_id': 'stream2', - 'chat_info_platform': 'wechat', - 'chat_info_user_platform': 'wechat', - 'chat_info_user_id': 'user2', - 'chat_info_user_nickname': '用户2', - 'chat_info_user_cardname': '卡片名2', - 'chat_info_group_platform': 'wechat', - 'chat_info_group_id': 'group2', - 'chat_info_group_name': '群组2', - 'chat_info_create_time': current_time - 3600, - 'chat_info_last_active_time': current_time - 600, # 10分钟前 - 'user_platform': 'wechat', - 'user_id': 'user2', - 'user_nickname': '用户2', - 'user_cardname': '卡片名2', - 'processed_plain_text': '测试', - 'detailed_plain_text': '测试', - 'memorized_times': 0 - } + "message_id": "msg3", + "time": current_time - 900, # 15分钟前 + "chat_id": "chat2", + "chat_info_stream_id": "stream2", + "chat_info_platform": "wechat", + "chat_info_user_platform": "wechat", + "chat_info_user_id": "user2", + "chat_info_user_nickname": "用户2", + "chat_info_user_cardname": "卡片名2", + "chat_info_group_platform": "wechat", + "chat_info_group_id": "group2", + "chat_info_group_name": "群组2", + "chat_info_create_time": current_time - 3600, + "chat_info_last_active_time": current_time - 600, # 10分钟前 + "user_platform": "wechat", + "user_id": "user2", + "user_nickname": "用户2", + "user_cardname": "卡片名2", + "processed_plain_text": "测试", + "detailed_plain_text": "测试", + "memorized_times": 0, + }, ] - + for msg_data in self.test_messages: Messages.create(**msg_data) - + def tearDown(self): # 关闭测试数据库连接 self.test_db.close() - + def test_find_messages_no_filter(self): """测试不带过滤器的查询""" results = find_messages({}) self.assertEqual(len(results), 3) # 验证结果是否按时间升序排列 - self.assertEqual(results[0]['message_id'], 'msg1') - self.assertEqual(results[1]['message_id'], 'msg2') - self.assertEqual(results[2]['message_id'], 'msg3') - + self.assertEqual(results[0]["message_id"], "msg1") + self.assertEqual(results[1]["message_id"], "msg2") + self.assertEqual(results[2]["message_id"], "msg3") + def test_find_messages_with_filter(self): """测试带过滤器的查询""" - results = find_messages({'chat_id': 'chat1'}) + results = find_messages({"chat_id": "chat1"}) self.assertEqual(len(results), 2) - self.assertEqual(results[0]['message_id'], 'msg1') - self.assertEqual(results[1]['message_id'], 'msg2') - - results = find_messages({'user_id': 'user2'}) + self.assertEqual(results[0]["message_id"], "msg1") + self.assertEqual(results[1]["message_id"], "msg2") + + results = find_messages({"user_id": "user2"}) self.assertEqual(len(results), 1) - self.assertEqual(results[0]['message_id'], 'msg3') - + self.assertEqual(results[0]["message_id"], "msg3") + def test_find_messages_with_operators(self): """测试带操作符的查询""" - results = find_messages({'memorized_times': {'$gt': 0}}) + results = find_messages({"memorized_times": {"$gt": 0}}) self.assertEqual(len(results), 2) - self.assertEqual(results[0]['message_id'], 'msg1') - self.assertEqual(results[1]['message_id'], 'msg2') - - results = find_messages({'memorized_times': {'$gte': 2}}) + self.assertEqual(results[0]["message_id"], "msg1") + self.assertEqual(results[1]["message_id"], "msg2") + + results = find_messages({"memorized_times": {"$gte": 2}}) self.assertEqual(len(results), 1) - self.assertEqual(results[0]['message_id'], 'msg2') - + self.assertEqual(results[0]["message_id"], "msg2") + def test_find_messages_with_sort(self): """测试带排序的查询""" - results = find_messages({}, sort=[('memorized_times', -1)]) + results = find_messages({}, sort=[("memorized_times", -1)]) self.assertEqual(len(results), 3) # 验证结果是否按memorized_times降序排列 - self.assertEqual(results[0]['message_id'], 'msg2') # memorized_times = 2 - self.assertEqual(results[1]['message_id'], 'msg1') # memorized_times = 1 - self.assertEqual(results[2]['message_id'], 'msg3') # memorized_times = 0 - + self.assertEqual(results[0]["message_id"], "msg2") # memorized_times = 2 + self.assertEqual(results[1]["message_id"], "msg1") # memorized_times = 1 + self.assertEqual(results[2]["message_id"], "msg3") # memorized_times = 0 + def test_find_messages_with_limit(self): """测试带限制的查询""" # 默认limit_mode为latest,应返回最新的2条记录 results = find_messages({}, limit=2) self.assertEqual(len(results), 2) - self.assertEqual(results[0]['message_id'], 'msg2') - self.assertEqual(results[1]['message_id'], 'msg3') - + self.assertEqual(results[0]["message_id"], "msg2") + self.assertEqual(results[1]["message_id"], "msg3") + # 使用earliest模式,应返回最早的2条记录 - results = find_messages({}, limit=2, limit_mode='earliest') + results = find_messages({}, limit=2, limit_mode="earliest") self.assertEqual(len(results), 2) - self.assertEqual(results[0]['message_id'], 'msg1') - self.assertEqual(results[1]['message_id'], 'msg2') - + self.assertEqual(results[0]["message_id"], "msg1") + self.assertEqual(results[1]["message_id"], "msg2") + def test_find_messages_with_combined_criteria(self): """测试组合查询条件""" results = find_messages( - {'chat_info_platform': 'qq', 'memorized_times': {'$gt': 0}}, - sort=[('time', 1)], - limit=1 + {"chat_info_platform": "qq", "memorized_times": {"$gt": 0}}, sort=[("time", 1)], limit=1 ) self.assertEqual(len(results), 1) - self.assertEqual(results[0]['message_id'], 'msg2') + self.assertEqual(results[0]["message_id"], "msg2") -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_build_readable_messages.py b/tests/test_build_readable_messages.py index 76caffb75..71d91a46d 100644 --- a/tests/test_build_readable_messages.py +++ b/tests/test_build_readable_messages.py @@ -9,7 +9,7 @@ import json import copy # 添加项目根目录到Python路径 -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat, build_readable_messages from src.common.logger import get_module_logger @@ -17,13 +17,14 @@ from src.common.logger import get_module_logger # 创建测试日志记录器 logger = get_module_logger("test_readable_msg") + class TestBuildReadableMessages(unittest.TestCase): def setUp(self): # 准备测试数据:从真实数据库获取消息 - self.chat_id = '5ed68437e28644da51f314f37df68d18' + self.chat_id = "5ed68437e28644da51f314f37df68d18" self.current_time = time.time() self.thirty_days_ago = self.current_time - (30 * 24 * 60 * 60) # 30天前的时间戳 - + # 获取最新的10条消息 try: self.messages = get_raw_msg_by_timestamp_with_chat( @@ -31,10 +32,10 @@ class TestBuildReadableMessages(unittest.TestCase): timestamp_start=self.thirty_days_ago, timestamp_end=self.current_time, limit=10, - limit_mode="latest" + limit_mode="latest", ) logger.info(f"已获取 {len(self.messages)} 条测试消息") - + # 打印消息样例 if self.messages: sample_msg = self.messages[0] @@ -44,128 +45,129 @@ class TestBuildReadableMessages(unittest.TestCase): logger.error(f"获取消息失败: {e}") logger.error(traceback.format_exc()) self.messages = [] - + def test_manual_fix_messages(self): """创建一个手动修复版本的消息进行测试""" if not self.messages: self.skipTest("没有测试消息,跳过测试") return - + logger.info("开始手动修复消息...") - + # 创建修复版本的消息列表 fixed_messages = [] - + for msg in self.messages: # 深拷贝以避免修改原始数据 fixed_msg = copy.deepcopy(msg) - + # 构建 user_info 对象 - if 'user_info' not in fixed_msg: + if "user_info" not in fixed_msg: user_info = { - 'platform': fixed_msg.get('user_platform', 'qq'), - 'user_id': fixed_msg.get('user_id', '10000'), - 'user_nickname': fixed_msg.get('user_nickname', '测试用户'), - 'user_cardname': fixed_msg.get('user_cardname', '') + "platform": fixed_msg.get("user_platform", "qq"), + "user_id": fixed_msg.get("user_id", "10000"), + "user_nickname": fixed_msg.get("user_nickname", "测试用户"), + "user_cardname": fixed_msg.get("user_cardname", ""), } - fixed_msg['user_info'] = user_info + fixed_msg["user_info"] = user_info logger.info(f"为消息 {fixed_msg.get('message_id')} 添加了 user_info") - + fixed_messages.append(fixed_msg) - + logger.info(f"已修复 {len(fixed_messages)} 条消息") - + try: # 使用修复后的消息尝试格式化 - formatted_text = asyncio.run(build_readable_messages( - messages=fixed_messages, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="absolute", - read_mark=0.0, - truncate=False - )) - + formatted_text = asyncio.run( + build_readable_messages( + messages=fixed_messages, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="absolute", + read_mark=0.0, + truncate=False, + ) + ) + logger.info("使用修复后的消息格式化完成") logger.info(f"格式化结果长度: {len(formatted_text)}") if formatted_text: logger.info(f"格式化结果预览: {formatted_text[:200]}...") else: logger.warning("格式化结果为空") - + # 断言 self.assertNotEqual(formatted_text, "", "有消息时不应返回空字符串") except Exception as e: logger.error(f"使用修复后的消息格式化失败: {e}") logger.error(traceback.format_exc()) raise - + def test_debug_build_messages_internal(self): """调试_build_readable_messages_internal函数""" if not self.messages: self.skipTest("没有测试消息,跳过测试") return - + logger.info("开始调试内部构建函数...") - + try: # 直接导入内部函数进行测试 from src.chat.utils.chat_message_builder import _build_readable_messages_internal - + # 手动创建一个简单的测试消息列表 test_msg = self.messages[0].copy() # 使用第一条消息作为模板 - + # 检查消息结构 logger.info(f"测试消息keys: {list(test_msg.keys())}") logger.info(f"user_info存在: {'user_info' in test_msg}") - + # 修复缺少的user_info字段 - if 'user_info' not in test_msg: + if "user_info" not in test_msg: logger.warning("消息中缺少user_info字段,添加模拟数据") - test_msg['user_info'] = { - 'platform': test_msg.get('user_platform', 'qq'), - 'user_id': test_msg.get('user_id', '10000'), - 'user_nickname': test_msg.get('user_nickname', '测试用户'), - 'user_cardname': test_msg.get('user_cardname', '') + test_msg["user_info"] = { + "platform": test_msg.get("user_platform", "qq"), + "user_id": test_msg.get("user_id", "10000"), + "user_nickname": test_msg.get("user_nickname", "测试用户"), + "user_cardname": test_msg.get("user_cardname", ""), } logger.info(f"添加的user_info: {test_msg['user_info']}") - + simple_msgs = [test_msg] - + # 运行内部函数 - result_text, result_details = asyncio.run(_build_readable_messages_internal( - simple_msgs, - replace_bot_name=True, - merge_messages=False, - timestamp_mode="absolute", - truncate=False - )) - + result_text, result_details = asyncio.run( + _build_readable_messages_internal( + simple_msgs, replace_bot_name=True, merge_messages=False, timestamp_mode="absolute", truncate=False + ) + ) + logger.info(f"内部函数返回结果: {result_text[:200] if result_text else '空'}") logger.info(f"详情列表长度: {len(result_details)}") - + # 显示处理过程中的变量 if not result_text and len(simple_msgs) > 0: logger.warning("消息处理可能有问题,检查关键步骤") msg = simple_msgs[0] - + # 打印关键变量的值 user_info = msg.get("user_info", {}) platform = user_info.get("platform") user_id = user_info.get("user_id") timestamp = msg.get("time") content = msg.get("processed_plain_text", "") - + logger.warning(f"平台: {platform}, 用户ID: {user_id}, 时间戳: {timestamp}") logger.warning(f"内容: {content[:50]}...") - + # 检查必要信息是否完整 logger.warning(f"必要信息完整性检查: {all([platform, user_id, timestamp is not None])}") - + except Exception as e: logger.error(f"调试内部函数失败: {e}") logger.error(traceback.format_exc()) raise -if __name__ == '__main__': - unittest.main() \ No newline at end of file + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_extract_messages.py b/tests/test_extract_messages.py index d32e644b6..95ddb523f 100644 --- a/tests/test_extract_messages.py +++ b/tests/test_extract_messages.py @@ -5,13 +5,14 @@ import datetime import time # 添加项目根目录到Python路径 -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.common.message_repository import find_messages from src.chat.utils.chat_message_builder import get_raw_msg_by_timestamp_with_chat from peewee import SqliteDatabase from src.common.database.database import db # 导入实际的数据库连接 + class TestExtractMessages(unittest.TestCase): def setUp(self): # 这个测试使用真实的数据库,所以不需要创建测试数据 @@ -19,70 +20,64 @@ class TestExtractMessages(unittest.TestCase): def test_extract_latest_messages_direct(self): """测试直接使用message_repository.find_messages函数""" - chat_id = '5ed68437e28644da51f314f37df68d18' - + chat_id = "5ed68437e28644da51f314f37df68d18" + # 提取最新的10条消息 - results = find_messages( - {'chat_id': chat_id}, - limit=10 - ) - + results = find_messages({"chat_id": chat_id}, limit=10) + # 打印结果数量 print(f"\n直接使用find_messages,找到 {len(results)} 条消息") - + # 如果有结果,打印一些信息 if results: print("\n消息时间顺序:") for idx, msg in enumerate(results): - msg_time = datetime.datetime.fromtimestamp(msg['time']).strftime('%Y-%m-%d %H:%M:%S') - print(f"{idx+1}. ID: {msg['message_id']}, 时间: {msg_time}") + msg_time = datetime.datetime.fromtimestamp(msg["time"]).strftime("%Y-%m-%d %H:%M:%S") + print(f"{idx + 1}. ID: {msg['message_id']}, 时间: {msg_time}") print(f" 文本: {msg.get('processed_plain_text', '无文本内容')[:50]}...") - + # 验证结果按时间排序 - times = [msg['time'] for msg in results] + times = [msg["time"] for msg in results] self.assertEqual(times, sorted(times), "消息应该按时间升序排列") else: print(f"未找到chat_id为 {chat_id} 的消息") - - # 最基本的断言,确保测试有效 - self.assertIsInstance(results, list, "结果应该是一个列表") - - def test_extract_latest_messages_via_builder(self): - """使用chat_message_builder中的函数测试从真实数据库提取消息""" - chat_id = '5ed68437e28644da51f314f37df68d18' - - # 设置时间范围为过去30天到现在 - current_time = time.time() - thirty_days_ago = current_time - (30 * 24 * 60 * 60) # 30天前的时间戳 - - # 使用chat_message_builder中的函数 - results = get_raw_msg_by_timestamp_with_chat( - chat_id=chat_id, - timestamp_start=thirty_days_ago, - timestamp_end=current_time, - limit=10, - limit_mode="latest" - ) - - # 打印结果数量 - print(f"\n使用get_raw_msg_by_timestamp_with_chat,找到 {len(results)} 条消息") - - # 如果有结果,打印一些信息 - if results: - print("\n消息时间顺序:") - for idx, msg in enumerate(results): - msg_time = datetime.datetime.fromtimestamp(msg['time']).strftime('%Y-%m-%d %H:%M:%S') - print(f"{idx+1}. ID: {msg['message_id']}, 时间: {msg_time}") - print(f" 文本: {msg.get('processed_plain_text', '无文本内容')[:50]}...") - - # 验证结果按时间排序 - times = [msg['time'] for msg in results] - self.assertEqual(times, sorted(times), "消息应该按时间升序排列") - else: - print(f"未找到chat_id为 {chat_id} 的消息") - + # 最基本的断言,确保测试有效 self.assertIsInstance(results, list, "结果应该是一个列表") -if __name__ == '__main__': - unittest.main() \ No newline at end of file + def test_extract_latest_messages_via_builder(self): + """使用chat_message_builder中的函数测试从真实数据库提取消息""" + chat_id = "5ed68437e28644da51f314f37df68d18" + + # 设置时间范围为过去30天到现在 + current_time = time.time() + thirty_days_ago = current_time - (30 * 24 * 60 * 60) # 30天前的时间戳 + + # 使用chat_message_builder中的函数 + results = get_raw_msg_by_timestamp_with_chat( + chat_id=chat_id, timestamp_start=thirty_days_ago, timestamp_end=current_time, limit=10, limit_mode="latest" + ) + + # 打印结果数量 + print(f"\n使用get_raw_msg_by_timestamp_with_chat,找到 {len(results)} 条消息") + + # 如果有结果,打印一些信息 + if results: + print("\n消息时间顺序:") + for idx, msg in enumerate(results): + msg_time = datetime.datetime.fromtimestamp(msg["time"]).strftime("%Y-%m-%d %H:%M:%S") + print(f"{idx + 1}. ID: {msg['message_id']}, 时间: {msg_time}") + print(f" 文本: {msg.get('processed_plain_text', '无文本内容')[:50]}...") + + # 验证结果按时间排序 + times = [msg["time"] for msg in results] + self.assertEqual(times, sorted(times), "消息应该按时间升序排列") + else: + print(f"未找到chat_id为 {chat_id} 的消息") + + # 最基本的断言,确保测试有效 + self.assertIsInstance(results, list, "结果应该是一个列表") + + +if __name__ == "__main__": + unittest.main()