better 日程系统2.0

This commit is contained in:
SengokuCola
2025-03-26 21:31:32 +08:00
parent 2ac4db5f4e
commit ee4a2f6e72
3 changed files with 64 additions and 72 deletions

View File

@@ -130,7 +130,7 @@ MaiMBot是一个开源项目我们非常欢迎你的参与。你的贡献
### 💬交流群
- [五群](https://qm.qq.com/q/JxvHZnxyec) 1022489779开发和建议相关讨论不一定有空回复会优先写文档和代码
- [一群](https://qm.qq.com/q/VQ3XZrWgMs) 766798517 【已满】(开发和建议相关讨论)不一定有空回复,会优先写文档和代码
- [二群](https://qm.qq.com/q/RzmCiRtHEW) 571780722 【已满】(开发和建议相关讨论)不一定有空回复,会优先写文档和代码
- [二群](https://qm.qq.com/q/RzmCiRtHEW) 571780722开发和建议相关讨论不一定有空回复会优先写文档和代码
- [三群](https://qm.qq.com/q/wlH5eT8OmQ) 1035228475【已满】开发和建议相关讨论不一定有空回复会优先写文档和代码
- [四群](https://qm.qq.com/q/wlH5eT8OmQ) 729957033【已满】开发和建议相关讨论不一定有空回复会优先写文档和代码

View File

@@ -22,57 +22,7 @@ class LLMModel:
logger.info(f"API URL: {self.base_url}") # 使用 logger 记录 base_url
def generate_response(self, prompt: str) -> Union[str, Tuple[str, str]]:
"""根据输入的提示生成模型的响应"""
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
# 构建请求体
data = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
**self.params,
}
# 发送请求到完整的 chat/completions 端点
api_url = f"{self.base_url.rstrip('/')}/chat/completions"
logger.info(f"Request URL: {api_url}") # 记录请求的 URL
max_retries = 3
base_wait_time = 15 # 基础等待时间(秒)
for retry in range(max_retries):
try:
response = requests.post(api_url, headers=headers, json=data)
if response.status_code == 429:
wait_time = base_wait_time * (2**retry) # 指数退避
logger.warning(f"遇到请求限制(429),等待{wait_time}秒后重试...")
time.sleep(wait_time)
continue
response.raise_for_status() # 检查其他响应状态
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
reasoning_content = result["choices"][0]["message"].get("reasoning_content", "")
return content, reasoning_content
return "没有返回结果", ""
except Exception as e:
if retry < max_retries - 1: # 如果还有重试机会
wait_time = base_wait_time * (2**retry)
logger.error(f"[回复]请求失败,等待{wait_time}秒后重试... 错误: {str(e)}")
time.sleep(wait_time)
else:
logger.error(f"请求失败: {str(e)}")
return f"请求失败: {str(e)}", ""
logger.error("达到最大重试次数,请求仍然失败")
return "达到最大重试次数,请求仍然失败", ""
async def generate_response_async(self, prompt: str) -> Union[str, Tuple[str, str]]:
async def generate_response_async(self, prompt: str) -> str:
"""异步方式根据输入的提示生成模型的响应"""
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

View File

@@ -20,7 +20,7 @@ class ScheduleGenerator:
def __init__(self, name: str = "bot_name", personality: str = "你是一个爱国爱党的新时代青年", behavior: str = "你非常外向,喜欢尝试新事物和人交流"):
# 使用离线LLM模型
self.llm_scheduler = LLMModel(model_name="Pro/deepseek-ai/DeepSeek-V3", temperature=0.9)
self.llm_scheduler = LLMModel(model_name="Qwen/Qwen2.5-32B-Instruct", temperature=0.9)
self.today_schedule_text = ""
self.today_done_list = []
@@ -34,6 +34,8 @@ class ScheduleGenerator:
self.start_time = datetime.datetime.now()
self.schedule_doing_update_interval = 60 #最好大于60
async def mai_schedule_start(self):
"""启动日程系统每5分钟执行一次move_doing并在日期变化时重新检查日程"""
try:
@@ -43,6 +45,8 @@ class ScheduleGenerator:
self.print_schedule()
while True:
print(self.get_current_num_task(1, True))
current_time = datetime.datetime.now()
# 检查是否需要重新生成日程(日期变化)
@@ -56,8 +60,7 @@ class ScheduleGenerator:
current_activity = await self.move_doing()
logger.info(f"当前活动: {current_activity}")
# 等待5分钟
await asyncio.sleep(300) # 300秒 = 5分钟
await asyncio.sleep(self.schedule_doing_update_interval)
except Exception as e:
logger.error(f"日程系统运行时出错: {str(e)}")
@@ -79,6 +82,8 @@ class ScheduleGenerator:
# 检查今天的日程
self.today_schedule_text, self.today_done_list = self.load_schedule_from_db(today)
if not self.today_done_list:
self.today_done_list = []
if not self.today_schedule_text:
logger.info(f"{today.strftime('%Y-%m-%d')}的日程不存在,准备生成新的日程")
self.today_schedule_text = await self.generate_daily_schedule(target_date=today)
@@ -98,10 +103,16 @@ class ScheduleGenerator:
def construct_doing_prompt(self,time: datetime.datetime):
now_time = time.strftime("%H:%M")
previous_doing = self.today_done_list[-20:] if len(self.today_done_list) > 20 else self.today_done_list
if self.today_done_list:
previous_doing = self.get_current_num_task(10, True)
print(previous_doing)
else:
previous_doing = "我没做什么事情"
prompt = f"我是{self.name}{self.personality}{self.behavior}"
prompt += f"我今天的日程是:{self.today_schedule_text}\n"
prompt += f"我之前做了的事情是:{previous_doing}\n"
prompt += f"我之前做了的事情是:{previous_doing},从之前到现在已经过去了{self.schedule_doing_update_interval/60}分钟了\n"
prompt += f"现在是{now_time},结合我的个人特点和行为习惯,"
prompt += "推测我现在做什么,具体一些,详细一些\n"
prompt += "直接返回我在做的事情,不要输出其他内容:"
@@ -110,7 +121,7 @@ class ScheduleGenerator:
async def generate_daily_schedule(
self, target_date: datetime.datetime = None,) -> Dict[str, str]:
daytime_prompt = self.construct_daytime_prompt(target_date)
daytime_response, _ = await self.llm_scheduler.generate_response(daytime_prompt)
daytime_response,_ = await self.llm_scheduler.generate_response_async(daytime_prompt)
return daytime_response
def _time_diff(self, time1: str, time2: str) -> int:
@@ -160,15 +171,54 @@ class ScheduleGenerator:
current_time = datetime.datetime.now()
time_str = current_time.strftime("%H:%M")
doing_prompt = self.construct_doing_prompt(current_time)
doing_response, _ = await self.llm_scheduler.generate_response(doing_prompt)
self.today_done_list.append(current_time,time_str + "" + doing_response)
doing_response,_ = await self.llm_scheduler.generate_response_async(doing_prompt)
self.today_done_list.append((current_time, time_str + "时," + doing_response))
await self.update_today_done_list()
return doing_response
async def get_task_from_time_to_time(self, start_time: str, end_time: str):
"""获取指定时间范围内的任务列表
Args:
start_time (str): 开始时间,格式为"HH:MM"
end_time (str): 结束时间,格式为"HH:MM"
Returns:
list: 时间范围内的任务列表
"""
result = []
for task in self.today_done_list:
task_time = task[0] # 获取任务的时间戳
task_time_str = task_time.strftime("%H:%M")
# 检查任务时间是否在指定范围内
if self._time_diff(start_time, task_time_str) >= 0 and self._time_diff(task_time_str, end_time) >= 0:
result.append(task)
return result
def get_current_num_task(self, num=1, time_info = False):
"""获取最新加入的指定数量的日程
Args:
num (int): 需要获取的日程数量默认为1
Returns:
list: 最新加入的日程列表
"""
if not self.today_done_list:
return []
# 确保num不超过列表长度
num = min(num, len(self.today_done_list))
pre_doing = ""
for doing in self.today_done_list[-num:]:
pre_doing += doing[1]
# 返回最新的num条日程
return pre_doing
def save_today_schedule_to_db(self):
"""保存日程到数据库,同时初始化 today_done_list"""
@@ -200,18 +250,10 @@ class ScheduleGenerator:
async def main():
# 使用示例
scheduler = ScheduleGenerator(name="麦麦", personality="你叫麦麦你19岁是一个大二的女大学生你有一头黑色短发你会刷贴吧你现在在学习心理学", behavior="你比较内向")
await scheduler.check_and_create_today_schedule()
scheduler.print_schedule()
print("\n当前任务:")
print(await scheduler.get_current_task())
scheduler = ScheduleGenerator(name="麦麦", personality="你叫麦麦你19岁是一个大二的女大学生你有一头黑色短发你会刷贴吧你现在在学习心理学", behavior="你比较内向一般熬夜比较晚然后第二天早上10点起床吃早午饭")
await scheduler.mai_schedule_start()
print("昨天日程:")
print(scheduler.yesterday_schedule)
print("今天日程:")
print(scheduler.today_schedule)
print("明天日程:")
print(scheduler.tomorrow_schedule)
# 当作为组件导入时使用的实例
bot_schedule = ScheduleGenerator()