Files
Mofox-Core/src/plugin_system/apis/schedule_api.py
minecraft1024a 3c4a3b0428 chore: 统一代码风格并进行现代化改造
本次提交主要包含以下内容:
- **代码风格统一**:对多个文件进行了格式化,包括移除多余的空行、调整导入顺序、统一字符串引号等,以提高代码一致性和可读性。
- **类型提示现代化**:在多个文件中将旧的 `typing` 模块类型提示(如 `Optional[T]`、`List[T]`、`Union[T, U]`)更新为现代 Python 语法(`T | None`、`list[T]`、`T | U`)。
- **f-string 格式化**:在 `scripts/convert_manifest.py` 中,将 `.format()` 调用更新为更现代和易读的 f-string `!r` 表示法。
- **文件末尾换行符**:为多个文件添加或修正了文件末尾的换行符,遵循 POSIX 标准。
2025-10-25 13:31:22 +08:00

331 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
日程表与月度计划查询API模块
本模块提供了一系列用于查询日程和月度计划的只读接口。
所有对外接口均为异步函数,专为插件开发者设计,以便在异步环境中无缝集成。
核心功能:
- 查询指定日期的日程安排。
- 获取当前正在进行的活动。
- 筛选特定时间范围内的活动。
- 查询月度计划,支持随机抽样和计数。
- 所有查询接口均提供格式化输出选项。
使用方式:
import asyncio
from src.plugin_system.apis import schedule_api
async def main():
# 获取今天的日程(原始数据)
today_schedule = await schedule_api.get_schedule()
if today_schedule:
print("今天的日程:", today_schedule)
# 获取昨天的日程,并格式化为字符串
from datetime import datetime, timedelta
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
formatted_schedule = await schedule_api.get_schedule(date=yesterday, formatted=True)
if formatted_schedule:
print(f"\\n昨天的日程 (格式化):\\n{formatted_schedule}")
# 获取当前活动
current_activity = await schedule_api.get_current_activity()
if current_activity:
print(f"\\n当前活动: {current_activity.get('activity')}")
# 获取本月月度计划总数
plan_count = await schedule_api.count_monthly_plans()
print(f"\\n本月月度计划总数: {plan_count}")
# 随机获取本月的2个计划
random_plans = await schedule_api.get_monthly_plans(random_count=2)
if random_plans:
print("\\n随机的2个计划:", [p.plan_text for p in random_plans])
asyncio.run(main())
"""
import random
from datetime import datetime
from typing import Any
import orjson
from sqlalchemy import func, select
from src.common.database.sqlalchemy_models import MonthlyPlan, Schedule, get_db_session
from src.common.logger import get_logger
from src.schedule.database import get_active_plans_for_month
logger = get_logger("schedule_api")
# --- 内部辅助函数 ---
def _format_schedule_list(
items: list[dict[str, Any]] | list[MonthlyPlan],
template: str,
item_type: str,
) -> str:
"""将日程或计划列表格式化为字符串"""
if not items:
return ""
lines = []
for item in items:
if item_type == "schedule" and isinstance(item, dict):
lines.append(template.format(time_range=item.get("time_range", ""), activity=item.get("activity", "")))
elif item_type == "plan" and isinstance(item, MonthlyPlan):
lines.append(template.format(plan_text=item.plan_text))
return "\\n".join(lines)
async def _get_schedule_from_db(date_str: str) -> list[dict[str, Any]] | None:
"""从数据库中获取并解析指定日期的日程"""
async with get_db_session() as session:
result = await session.execute(select(Schedule).filter(Schedule.date == date_str))
schedule_record = result.scalars().first()
if schedule_record and schedule_record.schedule_data:
try:
return orjson.loads(str(schedule_record.schedule_data))
except orjson.JSONDecodeError:
logger.warning(f"无法解析数据库中的日程数据 (日期: {date_str})")
return None
# --- API实现 ---
class ScheduleAPI:
"""日程表与月度计划查询API"""
@staticmethod
async def get_schedule(
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""
(异步) 获取指定日期的日程安排。
Args:
date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None则使用当前日期。
formatted (bool): 如果为True返回格式化的字符串否则返回原始数据列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Union[List[Dict[str, Any]], str, None]: 日程数据或None。
"""
target_date = date or datetime.now().strftime("%Y-%m-%d")
try:
logger.debug(f"[ScheduleAPI] 正在获取 {target_date} 的日程安排...")
schedule_data = await _get_schedule_from_db(target_date)
if schedule_data is None:
return None
if formatted:
return _format_schedule_list(schedule_data, format_template, "schedule")
return schedule_data
except Exception as e:
logger.error(f"[ScheduleAPI] 获取 {target_date} 日程失败: {e}")
return None
@staticmethod
async def get_current_activity(
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> dict[str, Any] | str | None:
"""
(异步) 获取当前正在进行的活动。
Args:
formatted (bool): 如果为True返回格式化的字符串否则返回活动字典。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Union[Dict[str, Any], str, None]: 当前活动数据或None。
"""
try:
logger.debug("[ScheduleAPI] 正在获取当前活动...")
today_schedule = await _get_schedule_from_db(datetime.now().strftime("%Y-%m-%d"))
if not today_schedule:
return None
now = datetime.now().time()
for event in today_schedule:
time_range = event.get("time_range")
if not time_range:
continue
try:
start_str, end_str = time_range.split("-")
start_time = datetime.strptime(start_str.strip(), "%H:%M").time()
end_time = datetime.strptime(end_str.strip(), "%H:%M").time()
if (start_time <= now < end_time) or \
(end_time < start_time and (now >= start_time or now < end_time)):
if formatted:
return _format_schedule_list([event], format_template, "schedule")
return event
except (ValueError, KeyError):
continue
return None
except Exception as e:
logger.error(f"[ScheduleAPI] 获取当前活动失败: {e}")
return None
@staticmethod
async def get_activities_between(
start_time: str,
end_time: str,
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""
(异步) 获取指定日期和时间范围内的所有活动。
Args:
start_time (str): 开始时间,格式 "HH:MM"
end_time (str): 结束时间,格式 "HH:MM"
date (Optional[str]): 目标日期,格式 "YYYY-MM-DD"。如果为None则使用当前日期。
formatted (bool): 如果为True返回格式化的字符串否则返回活动列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Union[List[Dict[str, Any]], str, None]: 在时间范围内的活动列表或None。
"""
target_date = date or datetime.now().strftime("%Y-%m-%d")
try:
logger.debug(f"[ScheduleAPI] 正在获取 {target_date}{start_time}{end_time} 的活动...")
schedule_data = await _get_schedule_from_db(target_date)
if not schedule_data:
return None
start = datetime.strptime(start_time, "%H:%M").time()
end = datetime.strptime(end_time, "%H:%M").time()
activities_in_range = []
for event in schedule_data:
time_range = event.get("time_range")
if not time_range:
continue
try:
event_start_str, event_end_str = time_range.split("-")
event_start = datetime.strptime(event_start_str.strip(), "%H:%M").time()
if start <= event_start < end:
activities_in_range.append(event)
except (ValueError, KeyError):
continue
if formatted:
return _format_schedule_list(activities_in_range, format_template, "schedule")
return activities_in_range
except Exception as e:
logger.error(f"[ScheduleAPI] 获取时间段内活动失败: {e}")
return None
@staticmethod
async def get_monthly_plans(
target_month: str | None = None,
random_count: int | None = None,
formatted: bool = False,
format_template: str = "- {plan_text}",
) -> list[MonthlyPlan] | str | None:
"""
(异步) 获取指定月份的有效月度计划。
Args:
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
random_count (Optional[int]): 如果设置,将随机返回指定数量的计划。
formatted (bool): 如果为True返回格式化的字符串否则返回对象列表。
format_template (str): 当 formatted=True 时使用的格式化模板。
Returns:
Union[List[MonthlyPlan], str, None]: 月度计划列表、格式化字符串或None。
"""
month = target_month or datetime.now().strftime("%Y-%m")
try:
logger.debug(f"[ScheduleAPI] 正在获取 {month} 的月度计划...")
plans = await get_active_plans_for_month(month)
if not plans:
return None
if random_count is not None and random_count > 0 and len(plans) > random_count:
plans = random.sample(plans, random_count)
if formatted:
return _format_schedule_list(plans, format_template, "plan")
return plans
except Exception as e:
logger.error(f"[ScheduleAPI] 获取 {month} 月度计划失败: {e}")
return None
@staticmethod
async def count_monthly_plans(target_month: str | None = None) -> int:
"""
(异步) 获取指定月份的有效月度计划总数。
Args:
target_month (Optional[str]): 目标月份,格式 "YYYY-MM"。如果为None则使用当前月份。
Returns:
int: 有效月度计划的数量。
"""
month = target_month or datetime.now().strftime("%Y-%m")
try:
logger.debug(f"[ScheduleAPI] 正在统计 {month} 的月度计划数量...")
async with get_db_session() as session:
result = await session.execute(
select(func.count(MonthlyPlan.id)).where(
MonthlyPlan.target_month == month, MonthlyPlan.status == "active"
)
)
return result.scalar_one() or 0
except Exception as e:
logger.error(f"[ScheduleAPI] 统计 {month} 月度计划数量失败: {e}")
return 0
# =============================================================================
# 模块级别的便捷函数 (全部为异步)
# =============================================================================
async def get_schedule(
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""(异步) 获取指定日期的日程安排的便捷函数。"""
return await ScheduleAPI.get_schedule(date, formatted, format_template)
async def get_current_activity(
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> dict[str, Any] | str | None:
"""(异步) 获取当前正在进行的活动的便捷函数。"""
return await ScheduleAPI.get_current_activity(formatted, format_template)
async def get_activities_between(
start_time: str,
end_time: str,
date: str | None = None,
formatted: bool = False,
format_template: str = "{time_range}: {activity}",
) -> list[dict[str, Any]] | str | None:
"""(异步) 获取指定时间范围内活动的便捷函数。"""
return await ScheduleAPI.get_activities_between(start_time, end_time, date, formatted, format_template)
async def get_monthly_plans(
target_month: str | None = None,
random_count: int | None = None,
formatted: bool = False,
format_template: str = "- {plan_text}",
) -> list[MonthlyPlan] | str | None:
"""(异步) 获取月度计划的便捷函数。"""
return await ScheduleAPI.get_monthly_plans(target_month, random_count, formatted, format_template)
async def count_monthly_plans(target_month: str | None = None) -> int:
"""(异步) 获取月度计划总数的便捷函数。"""
return await ScheduleAPI.count_monthly_plans(target_month)