diff --git a/src/common/database/sqlalchemy_database_api.py b/src/common/database/sqlalchemy_database_api.py index d85cf1c0b..0ed797795 100644 --- a/src/common/database/sqlalchemy_database_api.py +++ b/src/common/database/sqlalchemy_database_api.py @@ -15,7 +15,7 @@ from src.common.logger import get_logger from src.common.database.sqlalchemy_models import ( Base, get_db_session, Messages, ActionRecords, PersonInfo, ChatStreams, LLMUsage, Emoji, Images, ImageDescriptions, OnlineTime, Memory, - Expression, ThinkingLog, GraphNodes, GraphEdges,get_session + Expression, ThinkingLog, GraphNodes, GraphEdges, Schedule, MaiZoneScheduleStatus, get_session ) logger = get_logger("sqlalchemy_database_api") @@ -36,6 +36,8 @@ MODEL_MAPPING = { 'ThinkingLog': ThinkingLog, 'GraphNodes': GraphNodes, 'GraphEdges': GraphEdges, + 'Schedule': Schedule, + 'MaiZoneScheduleStatus': MaiZoneScheduleStatus, } diff --git a/src/common/database/sqlalchemy_models.py b/src/common/database/sqlalchemy_models.py index 97de907b7..b62ff5cb6 100644 --- a/src/common/database/sqlalchemy_models.py +++ b/src/common/database/sqlalchemy_models.py @@ -454,6 +454,26 @@ class Schedule(Base): ) +class MaiZoneScheduleStatus(Base): + """麦麦空间日程处理状态模型""" + __tablename__ = 'maizone_schedule_status' + + id = Column(Integer, primary_key=True, autoincrement=True) + datetime_hour = Column(get_string_field(13), nullable=False, unique=True, index=True) # YYYY-MM-DD HH格式,精确到小时 + activity = Column(Text, nullable=False) # 该小时的活动内容 + is_processed = Column(Boolean, nullable=False, default=False) # 是否已处理 + processed_at = Column(DateTime, nullable=True) # 处理时间 + story_content = Column(Text, nullable=True) # 生成的说说内容 + send_success = Column(Boolean, nullable=False, default=False) # 是否发送成功 + created_at = Column(DateTime, nullable=False, default=datetime.datetime.now) + updated_at = Column(DateTime, nullable=False, default=datetime.datetime.now, onupdate=datetime.datetime.now) + + __table_args__ = ( + Index('idx_maizone_datetime_hour', 'datetime_hour'), + Index('idx_maizone_is_processed', 'is_processed'), + ) + + # 数据库引擎和会话管理 _engine = None _SessionLocal = None diff --git a/src/plugins/built_in/maizone/scheduler.py b/src/plugins/built_in/maizone/scheduler.py index b2c65a028..5761258a7 100644 --- a/src/plugins/built_in/maizone/scheduler.py +++ b/src/plugins/built_in/maizone/scheduler.py @@ -8,6 +8,9 @@ from typing import Dict, Any from src.common.logger import get_logger from src.plugin_system.apis import llm_api, config_api from src.manager.schedule_manager import schedule_manager +from src.common.database.sqlalchemy_database_api import get_db_session +from src.common.database.sqlalchemy_models import MaiZoneScheduleStatus +from sqlalchemy import select # 导入工具模块 import sys @@ -71,17 +74,20 @@ class ScheduleManager: current_activity = schedule_manager.get_current_activity() if current_activity: - # 生成当前活动的哈希以避免重复发送 - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") - activity_hash = f"{current_time}_{current_activity}" + # 获取当前小时的时间戳格式 YYYY-MM-DD HH + current_datetime_hour = datetime.datetime.now().strftime("%Y-%m-%d %H") - # 检查是否已经为这个活动发送过说说 - if activity_hash != self.last_activity_hash: - logger.info(f"检测到新的日程活动: {current_activity}") - await self._execute_schedule_based_send(current_activity) - self.last_activity_hash = activity_hash + # 检查数据库中是否已经处理过这个小时的日程 + is_already_processed = await self._check_if_already_processed(current_datetime_hour, current_activity) + + if not is_already_processed: + logger.info(f"检测到新的日程活动: {current_activity} (时间: {current_datetime_hour})") + success, story_content = await self._execute_schedule_based_send(current_activity) + + # 更新处理状态到数据库 + await self._update_processing_status(current_datetime_hour, current_activity, success, story_content) else: - logger.debug(f"当前活动已处理过: {current_activity}") + logger.debug(f"当前小时的日程活动已处理过: {current_activity} (时间: {current_datetime_hour})") else: logger.debug("当前时间没有日程活动") @@ -97,8 +103,62 @@ class ScheduleManager: # 出错后等待5分钟再重试 await asyncio.sleep(300) - async def _execute_schedule_based_send(self, activity: str) -> bool: - """根据日程活动执行发送任务""" + async def _check_if_already_processed(self, datetime_hour: str, activity: str) -> bool: + """检查数据库中是否已经处理过这个小时的日程""" + try: + with get_db_session() as session: + # 查询是否存在已处理的记录 + query = session.query(MaiZoneScheduleStatus).filter( + MaiZoneScheduleStatus.datetime_hour == datetime_hour, + MaiZoneScheduleStatus.activity == activity, + MaiZoneScheduleStatus.is_processed == True + ).first() + + return query is not None + + except Exception as e: + logger.error(f"检查日程处理状态时出错: {str(e)}") + # 如果查询出错,为了安全起见返回False,允许重新处理 + return False + + async def _update_processing_status(self, datetime_hour: str, activity: str, success: bool, story_content: str = ""): + """更新日程处理状态到数据库""" + try: + with get_db_session() as session: + # 先查询是否已存在记录 + existing_record = session.query(MaiZoneScheduleStatus).filter( + MaiZoneScheduleStatus.datetime_hour == datetime_hour, + MaiZoneScheduleStatus.activity == activity + ).first() + + if existing_record: + # 更新现有记录 + existing_record.is_processed = True + existing_record.processed_at = datetime.datetime.now() + existing_record.send_success = success + if story_content: + existing_record.story_content = story_content + existing_record.updated_at = datetime.datetime.now() + else: + # 创建新记录 + new_record = MaiZoneScheduleStatus( + datetime_hour=datetime_hour, + activity=activity, + is_processed=True, + processed_at=datetime.datetime.now(), + story_content=story_content or "", + send_success=success + ) + session.add(new_record) + + session.commit() + logger.info(f"已更新日程处理状态: {datetime_hour} - {activity} - 成功: {success}") + + except Exception as e: + logger.error(f"更新日程处理状态时出错: {str(e)}") + + async def _execute_schedule_based_send(self, activity: str) -> tuple[bool, str]: + """根据日程活动执行发送任务,返回(成功状态, 故事内容)""" try: logger.info(f"根据日程活动生成说说: {activity}") @@ -106,7 +166,7 @@ class ScheduleManager: story = await self._generate_activity_story(activity) if not story: logger.error("生成活动相关说说内容失败") - return False + return False, "" logger.info(f"基于日程活动生成说说内容: '{story}'") @@ -121,11 +181,11 @@ class ScheduleManager: else: logger.error(f"基于日程活动的说说发送失败: {activity}") - return success + return success, story except Exception as e: logger.error(f"执行基于日程的发送任务失败: {str(e)}") - return False + return False, "" async def _generate_activity_story(self, activity: str) -> str: """根据日程活动生成说说内容"""