修改了一些bug,包括缓冲器对于消息类型的处理,personinfo的拷贝问题,添加用户信息间隔的收集

This commit is contained in:
meng_xi_pan
2025-04-04 13:26:46 +08:00
parent 78b5ee81d6
commit 255d4ea575
4 changed files with 78 additions and 12 deletions

View File

@@ -3,11 +3,13 @@ from src.common.logger import get_module_logger
import asyncio
from dataclasses import dataclass
from .message import MessageRecv
from ..message.message_base import BaseMessageInfo
import hashlib
from typing import Dict
from dataclasses import dataclass, field
from collections import OrderedDict
import random
import time
logger = get_module_logger("message_buffer")
@@ -40,17 +42,15 @@ class MassageBuffer:
self.buffer_pool[person_id_] = OrderedDict()
# 查找最近的处理成功消息(T)
last_T_msg = None
recent_F_count = 0
for msg_id in reversed(self.buffer_pool[person_id_]):
msg = self.buffer_pool[person_id_][msg_id]
if msg.result == "T":
last_T_msg = msg
break
elif msg.result == "F":
recent_F_count += 1
# 判断条件最近T之后有超过3条F
# 判断条件最近T之后有超过3-5条F
if (recent_F_count >= random.randint(3, 5)):
new_msg = CacheMessages(message=message, result="T")
new_msg.cache_determination.set()
@@ -63,7 +63,7 @@ class MassageBuffer:
if cache_msg.result == "U":
cache_msg.result = "F"
cache_msg.cache_determination.set()
logger.debug(f"被新消息覆盖信息id: {message.message_info.message_id}")
logger.debug(f"被新消息覆盖信息id: {cache_msg.message.message_info.message_id}")
# 添加新消息
self.buffer_pool[person_id_][message.message_info.message_id] = CacheMessages(message=message)
@@ -71,6 +71,7 @@ class MassageBuffer:
# 启动3秒缓冲计时器
person_id = person_info_manager.get_person_id(message.message_info.user_info.platform,
message.message_info.user_info.user_id)
asyncio.create_task(self.save_message_interval(person_id, message.message_info))
asyncio.create_task(self._debounce_processor(person_id_,
message.message_info.message_id,
person_id))
@@ -121,22 +122,26 @@ class MassageBuffer:
keep_msgs = OrderedDict()
combined_text = []
found = False
is_text = False
for msg_id, msg in self.buffer_pool[person_id_].items():
if msg_id == message.message_info.message_id:
found = True
is_text = msg.message.message_segment.type == "text"
combined_text.append(msg.message.processed_plain_text)
continue
if found:
keep_msgs[msg_id] = msg
elif msg.result == "F":
# 收集F消息的文本内容
if hasattr(msg.message, 'processed_plain_text') and msg.message.processed_plain_text:
if (hasattr(msg.message, 'processed_plain_text')
and msg.message.message_segment.type == "text"
and msg.message.processed_plain_text):
combined_text.append(msg.message.processed_plain_text)
elif msg.result == "U":
logger.debug(f"异常未处理信息id {msg.message.message_info.message_id}")
# 更新当前消息的processed_plain_text
if combined_text and combined_text[0] != message.processed_plain_text:
if combined_text and combined_text[0] != message.processed_plain_text and is_text:
message.processed_plain_text = "".join(combined_text)
logger.debug(f"整合了{len(combined_text)-1}条F消息的内容到当前消息")
@@ -145,6 +150,22 @@ class MassageBuffer:
except asyncio.TimeoutError:
logger.debug(f"查询超时消息id {message.message_info.message_id}")
return False
async def save_message_interval(self, person_id:str, message:BaseMessageInfo):
message_interval_list = await person_info_manager.get_value(person_id, "msg_interval_list")
now_time_ms = int(round(time.time() * 1000))
if len(message_interval_list) < 1000:
message_interval_list.append(now_time_ms)
else:
message_interval_list = message_interval_list.pop(0)
message_interval_list.append(now_time_ms)
data = {
"platform" : message.platform,
"user_id" : message.user_info.user_id,
"nickname" : message.user_info.user_nickname,
"konw_time" : int(time.time())
}
await person_info_manager.update_one_field(person_id, "msg_interval_list", message_interval_list, data)
message_buffer = MassageBuffer()