完成消息间隔推断功能
This commit is contained in:
@@ -58,7 +58,7 @@ class MainSystem:
|
|||||||
|
|
||||||
# 检查并清除person_info冗余字段,启动个人习惯推断
|
# 检查并清除person_info冗余字段,启动个人习惯推断
|
||||||
await person_info_manager.del_all_undefined_field()
|
await person_info_manager.del_all_undefined_field()
|
||||||
# asyncio.create_task(person_info_manager.personal_habit_deduction())
|
asyncio.create_task(person_info_manager.personal_habit_deduction())
|
||||||
|
|
||||||
# 启动愿望管理器
|
# 启动愿望管理器
|
||||||
await willing_manager.ensure_started()
|
await willing_manager.ensure_started()
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ class MassageBuffer:
|
|||||||
if len(message_interval_list) < 1000:
|
if len(message_interval_list) < 1000:
|
||||||
message_interval_list.append(now_time_ms)
|
message_interval_list.append(now_time_ms)
|
||||||
else:
|
else:
|
||||||
message_interval_list = message_interval_list.pop(0)
|
message_interval_list.pop(0)
|
||||||
message_interval_list.append(now_time_ms)
|
message_interval_list.append(now_time_ms)
|
||||||
data = {
|
data = {
|
||||||
"platform" : message.platform,
|
"platform" : message.platform,
|
||||||
|
|||||||
@@ -5,6 +5,11 @@ import hashlib
|
|||||||
from typing import Any, Callable, Dict, TypeVar
|
from typing import Any, Callable, Dict, TypeVar
|
||||||
T = TypeVar('T') # 泛型类型
|
T = TypeVar('T') # 泛型类型
|
||||||
import datetime
|
import datetime
|
||||||
|
import asyncio
|
||||||
|
import numpy
|
||||||
|
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
"""
|
"""
|
||||||
PersonInfoManager 类方法功能摘要:
|
PersonInfoManager 类方法功能摘要:
|
||||||
@@ -38,7 +43,6 @@ person_info_default = {
|
|||||||
|
|
||||||
class PersonInfoManager:
|
class PersonInfoManager:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.start_time = datetime.datetime.now()
|
|
||||||
if "person_info" not in db.list_collection_names():
|
if "person_info" not in db.list_collection_names():
|
||||||
db.create_collection("person_info")
|
db.create_collection("person_info")
|
||||||
db.person_info.create_index("person_id", unique=True)
|
db.person_info.create_index("person_id", unique=True)
|
||||||
@@ -222,29 +226,59 @@ class PersonInfoManager:
|
|||||||
async def personal_habit_deduction(self):
|
async def personal_habit_deduction(self):
|
||||||
"""启动个人信息推断,每天根据一定条件推断一次"""
|
"""启动个人信息推断,每天根据一定条件推断一次"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"个人信息推断启动: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
while(1):
|
||||||
# # 初始化日程
|
await asyncio.sleep(60)
|
||||||
# await self.check_and_create_today_schedule()
|
current_time = datetime.datetime.now()
|
||||||
# self.print_schedule()
|
logger.info(f"个人信息推断启动: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
|
||||||
# while True:
|
# "msg_interval"推断
|
||||||
# # print(self.get_current_num_task(1, True))
|
msg_interval_lists = await self.get_specific_value_list(
|
||||||
|
"msg_interval_list",
|
||||||
|
lambda x: isinstance(x, list) and len(x) >= 100
|
||||||
|
)
|
||||||
|
for person_id, msg_interval_list_ in msg_interval_lists.items():
|
||||||
|
try:
|
||||||
|
time_interval = []
|
||||||
|
for t1, t2 in zip(msg_interval_list_, msg_interval_list_[1:]):
|
||||||
|
delta = t2 - t1
|
||||||
|
if delta < 6000: # 小于6秒
|
||||||
|
time_interval.append(delta)
|
||||||
|
|
||||||
# current_time = datetime.datetime.now()
|
if len(time_interval) > 30:
|
||||||
|
time_interval.sort()
|
||||||
|
|
||||||
# # 检查是否需要重新生成日程(日期变化)
|
# 画图(log)
|
||||||
# if current_time.date() != self.start_time.date():
|
log_dir = Path("logs/person_info")
|
||||||
# logger.info("检测到日期变化,重新生成日程")
|
log_dir.mkdir(parents=True, exist_ok=True)
|
||||||
# self.start_time = current_time
|
plt.figure(figsize=(10, 6))
|
||||||
# await self.check_and_create_today_schedule()
|
|
||||||
# self.print_schedule()
|
|
||||||
|
|
||||||
# # 执行当前活动
|
plt.hist(time_interval, bins=30, density=True, alpha=0.5, color='g')
|
||||||
# # mind_thinking = heartflow.current_state.current_mind
|
|
||||||
|
|
||||||
# await self.move_doing()
|
plt.grid(True, alpha=0.3)
|
||||||
|
|
||||||
# await asyncio.sleep(self.schedule_doing_update_interval)
|
plt.title(f"Message Interval Density (User: {person_id[:8]}...)")
|
||||||
|
plt.xlabel("Interval (ms)")
|
||||||
|
plt.ylabel("Density")
|
||||||
|
|
||||||
|
img_path = log_dir / f"interval_density_{person_id[:8]}.png"
|
||||||
|
plt.savefig(img_path)
|
||||||
|
plt.close()
|
||||||
|
logger.info(f"已保存分布图到: {img_path}")
|
||||||
|
# 画图
|
||||||
|
|
||||||
|
filtered_intervals = [t for t in time_interval if t >= 500]
|
||||||
|
if len(filtered_intervals) > 25:
|
||||||
|
msg_interval = numpy.percentile(filtered_intervals, 90)
|
||||||
|
await self.update_one_field(person_id, "msg_interval", int(msg_interval))
|
||||||
|
logger.debug(f"用户{person_id}的msg_interval已经被更新为{msg_interval}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"处理用户{person_id}msg_interval推断时出错: {str(e)}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 其他...
|
||||||
|
|
||||||
|
logger.info(f"个人信息推断结束: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||||
|
await asyncio.sleep(86400)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"个人信息推断运行时出错: {str(e)}")
|
logger.error(f"个人信息推断运行时出错: {str(e)}")
|
||||||
|
|||||||
Reference in New Issue
Block a user