🤖 自动格式化代码 [skip ci]

This commit is contained in:
github-actions[bot]
2025-04-18 03:37:20 +00:00
parent c0dcd578c9
commit dfe788c65c
12 changed files with 610 additions and 487 deletions

View File

@@ -39,7 +39,7 @@ class ChattingObservation(Observation):
self.mid_memory_info = ""
self.now_message_info = ""
self._observe_lock = asyncio.Lock() # 添加锁
self._observe_lock = asyncio.Lock() # 添加锁
self.llm_summary = LLMRequest(
model=global_config.llm_observation, temperature=0.7, max_tokens=300, request_type="chat_observation"
@@ -73,7 +73,7 @@ class ChattingObservation(Observation):
return self.now_message_info
async def observe(self):
async with self._observe_lock: # 获取锁
async with self._observe_lock: # 获取锁
# 查找新消息,最多获取 self.max_now_obs_len 条
new_messages_cursor = (
db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}})
@@ -87,30 +87,38 @@ class ChattingObservation(Observation):
# 如果没有获取到限制数量内的较新消息,可能仍然有更早的消息,但我们只关注最近的
# 检查是否有任何新消息(即使超出限制),以决定是否更新 last_observe_time
# 注意:这里的查询也可能与其他并发 observe 冲突,但锁保护了状态更新
any_new_message = db.messages.find_one({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}})
any_new_message = db.messages.find_one(
{"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}
)
if not any_new_message:
return # 确实没有新消息
return # 确实没有新消息
# 如果有超过限制的更早的新消息,仍然需要更新时间戳,防止重复获取旧消息
# 但不将它们加入 talking_message
latest_message_time_cursor = db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}}).sort("time", -1).limit(1)
latest_message_time_cursor = (
db.messages.find({"chat_id": self.chat_id, "time": {"$gt": self.last_observe_time}})
.sort("time", -1)
.limit(1)
)
latest_time_doc = next(latest_message_time_cursor, None)
if latest_time_doc:
# 确保只在严格大于时更新,避免因并发查询导致时间戳回退
if latest_time_doc["time"] > self.last_observe_time:
self.last_observe_time = latest_time_doc["time"]
return # 返回,因为我们只关心限制内的最新消息
return # 返回,因为我们只关心限制内的最新消息
# 在持有锁的情况下,再次过滤,确保只处理真正新的消息
# 防止处理在等待锁期间已被其他协程处理的消息
truly_new_messages = [msg for msg in new_messages if msg["time"] > self.last_observe_time]
if not truly_new_messages:
logger.debug(f"Chat {self.chat_id}: Fetched messages, but already processed by another concurrent observe call.")
return # 所有获取的消息都已被处理
logger.debug(
f"Chat {self.chat_id}: Fetched messages, but already processed by another concurrent observe call."
)
return # 所有获取的消息都已被处理
# 如果获取到了 truly_new_messages (在限制内且时间戳大于上次记录)
self.last_observe_time = truly_new_messages[-1]["time"] # 更新时间戳为获取到的最新消息的时间
self.last_observe_time = truly_new_messages[-1]["time"] # 更新时间戳为获取到的最新消息的时间
self.talking_message.extend(truly_new_messages)
@@ -124,21 +132,23 @@ class ChattingObservation(Observation):
# 锁保证了这部分逻辑的原子性
if len(self.talking_message) > self.max_now_obs_len:
try: # 使用 try...finally 仅用于可能的LLM调用错误处理
try: # 使用 try...finally 仅用于可能的LLM调用错误处理
# 计算需要移除的消息数量,保留最新的 max_now_obs_len 条
messages_to_remove_count = len(self.talking_message) - self.max_now_obs_len
oldest_messages = self.talking_message[:messages_to_remove_count]
self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的
oldest_messages_str = "\n".join([msg["detailed_plain_text"] for msg in oldest_messages if "detailed_plain_text" in msg]) # 增加检查
self.talking_message = self.talking_message[messages_to_remove_count:] # 保留后半部分,即最新的
oldest_messages_str = "\n".join(
[msg["detailed_plain_text"] for msg in oldest_messages if "detailed_plain_text" in msg]
) # 增加检查
oldest_timestamps = [msg["time"] for msg in oldest_messages]
# 调用 LLM 总结主题
prompt = f"请总结以下聊天记录的主题:\n{oldest_messages_str}\n主题,用一句话概括包括人物事件和主要信息,不要分点:"
summary = "无法总结主题" # 默认值
summary = "无法总结主题" # 默认值
try:
summary_result, _ = await self.llm_summary.generate_response_async(prompt)
if summary_result: # 确保结果不为空
summary = summary_result
if summary_result: # 确保结果不为空
summary = summary_result
except Exception as e:
logger.error(f"总结主题失败 for chat {self.chat_id}: {e}")
# 保留默认总结 "无法总结主题"
@@ -146,7 +156,7 @@ class ChattingObservation(Observation):
mid_memory = {
"id": str(int(datetime.now().timestamp())),
"theme": summary,
"messages": oldest_messages, # 存储原始消息对象
"messages": oldest_messages, # 存储原始消息对象
"timestamps": oldest_timestamps,
"chat_id": self.chat_id,
"created_at": datetime.now().timestamp(),
@@ -155,16 +165,16 @@ class ChattingObservation(Observation):
# 存入内存中的 mid_memorys
self.mid_memorys.append(mid_memory)
if len(self.mid_memorys) > self.max_mid_memory_len:
self.mid_memorys.pop(0) # 移除最旧的
self.mid_memorys.pop(0) # 移除最旧的
mid_memory_str = "之前聊天的内容概括是:\n"
for mid_memory_item in self.mid_memorys: # 重命名循环变量以示区分
for mid_memory_item in self.mid_memorys: # 重命名循环变量以示区分
time_diff = int((datetime.now().timestamp() - mid_memory_item["created_at"]) / 60)
mid_memory_str += f"距离现在{time_diff}分钟前(聊天记录id:{mid_memory_item['id']}){mid_memory_item['theme']}\n"
self.mid_memory_info = mid_memory_str
except Exception as e: # 将异常处理移至此处以覆盖整个总结过程
except Exception as e: # 将异常处理移至此处以覆盖整个总结过程
logger.error(f"处理和总结旧消息时出错 for chat {self.chat_id}: {e}")
traceback.print_exc() # 记录详细堆栈
traceback.print_exc() # 记录详细堆栈
# print(f"处理后self.talking_message{self.talking_message}")
@@ -173,7 +183,9 @@ class ChattingObservation(Observation):
now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message)
self.now_message_info = now_message_str
logger.debug(f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}")
logger.debug(
f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}"
)
# 锁在退出 async with 块时自动释放
async def update_talking_summary(self, new_messages_str):