diff --git a/src/manager/schedule_manager.py b/src/manager/schedule_manager.py index 3df3d119c..a9b20eccc 100644 --- a/src/manager/schedule_manager.py +++ b/src/manager/schedule_manager.py @@ -120,10 +120,11 @@ class ScheduleManager: def __init__(self): self.today_schedule: Optional[List[Dict[str, Any]]] = None self.llm = LLMRequest(model_set=model_config.model_task_config.schedule_generator, request_type="schedule") - self.max_retries = 3 # 最大重试次数 + self.max_retries = -1 # 无限重试,直到成功生成标准日程表 self.daily_task_started = False self.last_sleep_log_time = 0 self.sleep_log_interval = 35 # 日志记录间隔,单位秒 + self.schedule_generation_running = False # 防止重复生成任务 async def start_daily_schedule_generation(self): """启动每日零点自动生成新日程的任务""" @@ -161,57 +162,73 @@ class ScheduleManager: schedule_str += f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" logger.info(schedule_str) else: - logger.warning("数据库中的日程数据格式无效,将重新生成日程") + logger.warning("数据库中的日程数据格式无效,将异步重新生成日程") await self.generate_and_save_schedule() except json.JSONDecodeError as e: - logger.error(f"日程数据JSON解析失败: {e},将重新生成日程") + logger.error(f"日程数据JSON解析失败: {e},将异步重新生成日程") await self.generate_and_save_schedule() else: - logger.info(f"数据库中未找到今天的日程 ({today_str}),将调用 LLM 生成。") + logger.info(f"数据库中未找到今天的日程 ({today_str}),将异步调用 LLM 生成。") await self.generate_and_save_schedule() except Exception as e: logger.error(f"加载或生成日程时出错: {e}") + # 出错时也尝试异步生成 + logger.info("尝试异步生成日程作为备用方案...") + await self.generate_and_save_schedule() async def generate_and_save_schedule(self): - now = datetime.now() - today_str = now.strftime("%Y-%m-%d") - current_month_str = now.strftime("%Y-%m") - weekday = now.strftime("%A") - - # 新增:获取节日信息 - lunar = Lunar.fromDate(now) - festivals = lunar.getFestivals() - other_festivals = lunar.getOtherFestivals() - all_festivals = festivals + other_festivals + """启动异步日程生成任务,避免阻塞主程序""" + if self.schedule_generation_running: + logger.info("日程生成任务已在运行中,跳过重复启动") + return + + # 创建异步任务进行日程生成,不阻塞主程序 + asyncio.create_task(self._async_generate_and_save_schedule()) + logger.info("已启动异步日程生成任务") - festival_block = "" - if all_festivals: - festival_text = "、".join(all_festivals) - festival_block = f"**今天也是一个特殊的日子: {festival_text}!请在日程中考虑和庆祝这个节日。**" + async def _async_generate_and_save_schedule(self): + """异步生成并保存日程的内部方法""" + self.schedule_generation_running = True + + try: + now = datetime.now() + today_str = now.strftime("%Y-%m-%d") + current_month_str = now.strftime("%Y-%m") + weekday = now.strftime("%A") + # 新增:获取节日信息 + lunar = Lunar.fromDate(now) + festivals = lunar.getFestivals() + other_festivals = lunar.getOtherFestivals() + all_festivals = festivals + other_festivals + + festival_block = "" + if all_festivals: + festival_text = "、".join(all_festivals) + festival_block = f"**今天也是一个特殊的日子: {festival_text}!请在日程中考虑和庆祝这个节日。**" - # 获取月度计划作为额外参考 - monthly_plans_block = "" - used_plan_ids = [] - if global_config.monthly_plan_system and global_config.monthly_plan_system.enable: - active_plans = get_active_plans_for_month(current_month_str) - if active_plans: - # 随机抽取最多3个计划 - num_to_sample = min(len(active_plans), 3) - sampled_plans = random.sample(active_plans, num_to_sample) - used_plan_ids = [p.id for p in sampled_plans] # type: ignore - - plan_texts = "\n".join([f"- {p.plan_text}" for p in sampled_plans]) - monthly_plans_block = f""" + # 获取月度计划作为额外参考 + monthly_plans_block = "" + used_plan_ids = [] + if global_config.monthly_plan_system and global_config.monthly_plan_system.enable: + active_plans = get_active_plans_for_month(current_month_str) + if active_plans: + # 随机抽取最多3个计划 + num_to_sample = min(len(active_plans), 3) + sampled_plans = random.sample(active_plans, num_to_sample) + used_plan_ids = [p.id for p in sampled_plans] # type: ignore + + plan_texts = "\n".join([f"- {p.plan_text}" for p in sampled_plans]) + monthly_plans_block = f""" **我这个月的一些小目标/计划 (请在今天的日程中适当体现)**: {plan_texts} """ - guidelines = global_config.schedule.guidelines or DEFAULT_SCHEDULE_GUIDELINES - personality = global_config.personality.personality_core - personality_side = global_config.personality.personality_side + guidelines = global_config.schedule.guidelines or DEFAULT_SCHEDULE_GUIDELINES + personality = global_config.personality.personality_core + personality_side = global_config.personality.personality_side - prompt = f""" + base_prompt = f""" 我,{global_config.bot.nickname},需要为自己规划一份今天({today_str},星期{weekday})的详细日程安排。 {festival_block} **关于我**: @@ -229,70 +246,107 @@ class ScheduleManager: 4. time_range格式必须为 "HH:MM-HH:MM" (24小时制) 5. 相邻的时间段必须连续,不能有间隙 6. 不要包含任何JSON以外的解释性文字或代码块标记 +7. **重要:必须在JSON数组的最后一个对象后面加上结束标记 "###SCHEDULE_END###"** **示例**: [ {{"time_range": "00:00-07:00", "activity": "进入梦乡,处理数据"}}, {{"time_range": "07:00-08:00", "activity": "起床伸个懒腰,看看今天有什么新闻"}}, - {{"time_range": "08:00-09:00", "activity": "享用早餐,规划今天的任务"}} -] + {{"time_range": "08:00-09:00", "activity": "享用早餐,规划今天的任务"}}, + {{"time_range": "09:00-23:30", "activity": "其他活动"}}, + {{"time_range": "23:30-00:00", "activity": "准备休眠"}} +]###SCHEDULE_END### -请你扮演我,以我的身份和口吻,为我生成一份完整的24小时日程表。 +请你扮演我,以我的身份和口吻,为我生成一份完整的24小时日程表。记住最后一定要加上结束标记。 """ - - # 尝试生成并验证日程,最多重试max_retries次 - for attempt in range(self.max_retries): - try: - logger.info(f"正在生成日程 (尝试 {attempt + 1}/{self.max_retries})") - response, _ = await self.llm.generate_response_async(prompt) - schedule_data = json.loads(repair_json(response)) - - # 使用Pydantic验证生成的日程数据 - if self._validate_schedule_with_pydantic(schedule_data): - # 验证通过,保存到数据库 - with get_db_session() as session: - # 检查是否已存在今天的日程 - existing_schedule = session.query(Schedule).filter(Schedule.date == today_str).first() - if existing_schedule: - # 更新现有日程 - session.query(Schedule).filter(Schedule.date == today_str).update({ - Schedule.schedule_data: json.dumps(schedule_data), - Schedule.updated_at: datetime.now() - }) - else: - # 创建新日程 - new_schedule = Schedule( - date=today_str, - schedule_data=json.dumps(schedule_data) - ) - session.add(new_schedule) - session.commit() + + # 无限重试直到生成成功的标准日程表 + attempt = 0 + while True: + attempt += 1 + try: + logger.info(f"正在生成日程 (第 {attempt} 次尝试)") - # 美化输出 - schedule_str = f"已成功生成并保存今天的日程 ({today_str}):\n" - for item in schedule_data: - schedule_str += f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" - logger.info(schedule_str) + # 构建当前尝试的prompt,增加压力提示 + prompt = base_prompt + if attempt > 1: + failure_hint = f""" + +**重要提醒 (第{attempt}次尝试)**: +- 前面{attempt-1}次生成都失败了,请务必严格按照要求生成完整的24小时日程 +- 确保JSON格式正确,所有时间段连续覆盖24小时 +- 时间格式必须为HH:MM-HH:MM,不能有时间间隙或重叠 +- 不要输出任何解释文字,只输出纯JSON数组 +- **必须在最后加上结束标记 "###SCHEDULE_END###",这很重要!** +""" + prompt += failure_hint - self.today_schedule = schedule_data + response, _ = await self.llm.generate_response_async(prompt) - # 成功生成日程后,根据概率软删除使用过的月度计划 - if used_plan_ids and global_config.monthly_plan_system: - if random.random() < global_config.monthly_plan_system.deletion_probability_on_use: - logger.info(f"根据概率,将使用过的月度计划 {used_plan_ids} 标记为已完成。") - soft_delete_plans(used_plan_ids) - - return - else: - logger.warning(f"第 {attempt + 1} 次生成的日程验证失败,正在重试...") - if attempt < self.max_retries - 1: - # 在重试时添加更详细的错误提示 - prompt += "\n\n**上次生成失败,请特别注意**:\n- 确保所有时间段连续覆盖24小时\n- 时间格式必须为HH:MM-HH:MM\n- 不能有时间间隙或重叠" + # 首先检查结束标记,确保消息没有被截断 + if not self._check_response_completeness(response): + logger.warning(f"第 {attempt} 次生成被截断(缺少结束标记),继续重试...") + await asyncio.sleep(2) + continue + + # 清理响应内容,移除结束标记 + cleaned_response = self._clean_response(response) + + # 尝试解析和验证JSON + schedule_data = json.loads(repair_json(cleaned_response)) + + # 使用Pydantic验证生成的日程数据 + if self._validate_schedule_with_pydantic(schedule_data): + # 验证通过,保存到数据库 + with get_db_session() as session: + # 检查是否已存在今天的日程 + existing_schedule = session.query(Schedule).filter(Schedule.date == today_str).first() + if existing_schedule: + # 更新现有日程 + session.query(Schedule).filter(Schedule.date == today_str).update({ + Schedule.schedule_data: json.dumps(schedule_data), + Schedule.updated_at: datetime.now() + }) + else: + # 创建新日程 + new_schedule = Schedule( + date=today_str, + schedule_data=json.dumps(schedule_data) + ) + session.add(new_schedule) + session.commit() - except Exception as e: - logger.error(f"第 {attempt + 1} 次生成日程失败: {e}") - if attempt == self.max_retries - 1: - logger.error(f"经过 {self.max_retries} 次尝试,仍无法生成有效日程") + # 美化输出 + schedule_str = f"✅ 经过 {attempt} 次尝试,成功生成并保存今天的日程 ({today_str}):\n" + for item in schedule_data: + schedule_str += f" - {item.get('time_range', '未知时间')}: {item.get('activity', '未知活动')}\n" + logger.info(schedule_str) + + self.today_schedule = schedule_data + + # 成功生成日程后,根据概率软删除使用过的月度计划 + if used_plan_ids and global_config.monthly_plan_system: + if random.random() < global_config.monthly_plan_system.deletion_probability_on_use: + logger.info(f"根据概率,将使用过的月度计划 {used_plan_ids} 标记为已完成。") + soft_delete_plans(used_plan_ids) + + # 成功生成,退出无限循环 + break + + else: + logger.warning(f"第 {attempt} 次生成的日程验证失败,继续重试...") + # 添加短暂延迟,避免过于频繁的请求 + await asyncio.sleep(2) + + except Exception as e: + logger.error(f"第 {attempt} 次生成日程失败: {e}") + logger.info("继续重试...") + # 添加短暂延迟,避免过于频繁的请求 + await asyncio.sleep(3) + + finally: + self.schedule_generation_running = False + logger.info("日程生成任务结束") def get_current_activity(self) -> Optional[str]: # 检查是否启用日程管理功能 @@ -428,6 +482,43 @@ class ScheduleManager: return True + def _check_response_completeness(self, response: str) -> bool: + """检查响应是否完整(包含结束标记)""" + if not response or not response.strip(): + logger.warning("响应为空") + return False + + # 检查是否包含结束标记 + end_marker = "###SCHEDULE_END###" + if end_marker not in response: + logger.warning("响应缺少结束标记,可能被截断") + return False + + logger.debug("响应包含结束标记,消息完整") + return True + + def _clean_response(self, response: str) -> str: + """清理响应内容,移除结束标记和多余内容""" + if not response: + return response + + # 移除结束标记 + end_marker = "###SCHEDULE_END###" + if end_marker in response: + # 取结束标记之前的内容 + response = response.split(end_marker)[0] + + # 清理可能的多余空白 + response = response.strip() + + # 如果存在markdown代码块标记,清理掉 + if response.startswith("```json"): + response = response[7:] + if response.endswith("```"): + response = response[:-3] + + return response.strip() + class DailyScheduleGenerationTask(AsyncTask): """每日零点自动生成新日程的任务""" @@ -450,8 +541,8 @@ class DailyScheduleGenerationTask(AsyncTask): # 2. 等待直到零点 await asyncio.sleep(sleep_seconds) - # 3. 执行日程生成 - logger.info("到达每日零点,开始为新的一天生成日程...") + # 3. 执行异步日程生成 + logger.info("到达每日零点,开始异步生成新的一天日程...") await self.schedule_manager.generate_and_save_schedule() except asyncio.CancelledError: