fix(chatter): 防止 Chatter 和 ProactiveThinker 之间的竞争条件
可能会发生竞争条件,即 ProactiveThinker 在用户发送消息的同时选择一个会话进行主动操作。这将导致 Chatter 和 ProactiveThinker 同时处理该会话,从而产生重复或冲突的响应。 该提交引入了两部分修复: 1. Chatter 现在在接收到消息后会立即更新会话的 `last_activity_at`。这相当于一个软锁,表明该会话正在被主动处理。 2. ProactiveThinker 会在执行操作前对会话状态进行最终检查。如果会话不再处于 `WAITING` 状态,它会中止操作,让位给 Chatter。
This commit is contained in:
@@ -125,6 +125,9 @@ class KokoroFlowChatter(BaseChatter):
|
||||
# 3. 获取或创建 Session
|
||||
session = await self.session_manager.get_session(user_id, self.stream_id)
|
||||
|
||||
# 3.5 **立即**更新活动时间,阻止 ProactiveThinker 并发处理
|
||||
session.last_activity_at = time.time()
|
||||
|
||||
# 4. 确定 situation_type(根据之前的等待状态)
|
||||
situation_type = self._determine_situation_type(session)
|
||||
|
||||
|
||||
@@ -460,6 +460,13 @@ class ProactiveThinker:
|
||||
action.params["situation_type"] = "timeout"
|
||||
action.params["extra_context"] = extra_context
|
||||
|
||||
# ★ 在执行动作前最后一次检查状态,防止与 Chatter 并发
|
||||
if session.status != SessionStatus.WAITING:
|
||||
logger.info(
|
||||
f"[ProactiveThinker] Session {session.user_id} 已被 Chatter 处理,取消执行动作"
|
||||
)
|
||||
return
|
||||
|
||||
# 执行动作(回复生成在 Action.execute() 中完成)
|
||||
for action in plan_response.actions:
|
||||
await action_manager.execute_action(
|
||||
|
||||
@@ -199,6 +199,8 @@ class KokoroSession:
|
||||
"""结束等待"""
|
||||
self.status = SessionStatus.IDLE
|
||||
self.waiting_config.reset()
|
||||
# 更新活动时间,防止 ProactiveThinker 并发处理
|
||||
self.last_activity_at = time.time()
|
||||
|
||||
def get_recent_entries(self, limit: int = 20) -> list[MentalLogEntry]:
|
||||
"""获取最近的心理活动日志"""
|
||||
|
||||
Reference in New Issue
Block a user