添加日程管理相关模型Schedule和MaiZoneScheduleStatus,并在调度管理器中实现日程处理状态的检查与更新逻辑,优化了基于日程的发送功能。

This commit is contained in:
minecraft1024a
2025-08-14 15:19:35 +08:00
committed by Windpicker-owo
parent 80eab4cf1a
commit 3af3904070
3 changed files with 97 additions and 15 deletions

View File

@@ -15,7 +15,7 @@ from src.common.logger import get_logger
from src.common.database.sqlalchemy_models import (
Base, get_db_session, Messages, ActionRecords, PersonInfo, ChatStreams,
LLMUsage, Emoji, Images, ImageDescriptions, OnlineTime, Memory,
Expression, ThinkingLog, GraphNodes, GraphEdges,get_session
Expression, ThinkingLog, GraphNodes, GraphEdges, Schedule, MaiZoneScheduleStatus, get_session
)
logger = get_logger("sqlalchemy_database_api")
@@ -36,6 +36,8 @@ MODEL_MAPPING = {
'ThinkingLog': ThinkingLog,
'GraphNodes': GraphNodes,
'GraphEdges': GraphEdges,
'Schedule': Schedule,
'MaiZoneScheduleStatus': MaiZoneScheduleStatus,
}

View File

@@ -454,6 +454,26 @@ class Schedule(Base):
)
class MaiZoneScheduleStatus(Base):
"""麦麦空间日程处理状态模型"""
__tablename__ = 'maizone_schedule_status'
id = Column(Integer, primary_key=True, autoincrement=True)
datetime_hour = Column(get_string_field(13), nullable=False, unique=True, index=True) # YYYY-MM-DD HH格式精确到小时
activity = Column(Text, nullable=False) # 该小时的活动内容
is_processed = Column(Boolean, nullable=False, default=False) # 是否已处理
processed_at = Column(DateTime, nullable=True) # 处理时间
story_content = Column(Text, nullable=True) # 生成的说说内容
send_success = Column(Boolean, nullable=False, default=False) # 是否发送成功
created_at = Column(DateTime, nullable=False, default=datetime.datetime.now)
updated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, onupdate=datetime.datetime.now)
__table_args__ = (
Index('idx_maizone_datetime_hour', 'datetime_hour'),
Index('idx_maizone_is_processed', 'is_processed'),
)
# 数据库引擎和会话管理
_engine = None
_SessionLocal = None

View File

@@ -8,6 +8,9 @@ from typing import Dict, Any
from src.common.logger import get_logger
from src.plugin_system.apis import llm_api, config_api
from src.manager.schedule_manager import schedule_manager
from src.common.database.sqlalchemy_database_api import get_db_session
from src.common.database.sqlalchemy_models import MaiZoneScheduleStatus
from sqlalchemy import select
# 导入工具模块
import sys
@@ -71,17 +74,20 @@ class ScheduleManager:
current_activity = schedule_manager.get_current_activity()
if current_activity:
# 生成当前活动的哈希以避免重复发送
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
activity_hash = f"{current_time}_{current_activity}"
# 获取当前小时的时间戳格式 YYYY-MM-DD HH
current_datetime_hour = datetime.datetime.now().strftime("%Y-%m-%d %H")
# 检查是否已经为这个活动发送过说说
if activity_hash != self.last_activity_hash:
logger.info(f"检测到新的日程活动: {current_activity}")
await self._execute_schedule_based_send(current_activity)
self.last_activity_hash = activity_hash
# 检查数据库中是否已经处理过这个小时的日程
is_already_processed = await self._check_if_already_processed(current_datetime_hour, current_activity)
if not is_already_processed:
logger.info(f"检测到新的日程活动: {current_activity} (时间: {current_datetime_hour})")
success, story_content = await self._execute_schedule_based_send(current_activity)
# 更新处理状态到数据库
await self._update_processing_status(current_datetime_hour, current_activity, success, story_content)
else:
logger.debug(f"当前活动已处理过: {current_activity}")
logger.debug(f"当前小时的日程活动已处理过: {current_activity} (时间: {current_datetime_hour})")
else:
logger.debug("当前时间没有日程活动")
@@ -97,8 +103,62 @@ class ScheduleManager:
# 出错后等待5分钟再重试
await asyncio.sleep(300)
async def _execute_schedule_based_send(self, activity: str) -> bool:
"""根据日程活动执行发送任务"""
async def _check_if_already_processed(self, datetime_hour: str, activity: str) -> bool:
"""检查数据库中是否已经处理过这个小时的日程"""
try:
with get_db_session() as session:
# 查询是否存在已处理的记录
query = session.query(MaiZoneScheduleStatus).filter(
MaiZoneScheduleStatus.datetime_hour == datetime_hour,
MaiZoneScheduleStatus.activity == activity,
MaiZoneScheduleStatus.is_processed == True
).first()
return query is not None
except Exception as e:
logger.error(f"检查日程处理状态时出错: {str(e)}")
# 如果查询出错为了安全起见返回False允许重新处理
return False
async def _update_processing_status(self, datetime_hour: str, activity: str, success: bool, story_content: str = ""):
"""更新日程处理状态到数据库"""
try:
with get_db_session() as session:
# 先查询是否已存在记录
existing_record = session.query(MaiZoneScheduleStatus).filter(
MaiZoneScheduleStatus.datetime_hour == datetime_hour,
MaiZoneScheduleStatus.activity == activity
).first()
if existing_record:
# 更新现有记录
existing_record.is_processed = True
existing_record.processed_at = datetime.datetime.now()
existing_record.send_success = success
if story_content:
existing_record.story_content = story_content
existing_record.updated_at = datetime.datetime.now()
else:
# 创建新记录
new_record = MaiZoneScheduleStatus(
datetime_hour=datetime_hour,
activity=activity,
is_processed=True,
processed_at=datetime.datetime.now(),
story_content=story_content or "",
send_success=success
)
session.add(new_record)
session.commit()
logger.info(f"已更新日程处理状态: {datetime_hour} - {activity} - 成功: {success}")
except Exception as e:
logger.error(f"更新日程处理状态时出错: {str(e)}")
async def _execute_schedule_based_send(self, activity: str) -> tuple[bool, str]:
"""根据日程活动执行发送任务,返回(成功状态, 故事内容)"""
try:
logger.info(f"根据日程活动生成说说: {activity}")
@@ -106,7 +166,7 @@ class ScheduleManager:
story = await self._generate_activity_story(activity)
if not story:
logger.error("生成活动相关说说内容失败")
return False
return False, ""
logger.info(f"基于日程活动生成说说内容: '{story}'")
@@ -121,11 +181,11 @@ class ScheduleManager:
else:
logger.error(f"基于日程活动的说说发送失败: {activity}")
return success
return success, story
except Exception as e:
logger.error(f"执行基于日程的发送任务失败: {str(e)}")
return False
return False, ""
async def _generate_activity_story(self, activity: str) -> str:
"""根据日程活动生成说说内容"""