fix(agent): refine agent update retry logic to handle creation race conditions #193
This commit is contained in:
@@ -601,7 +601,8 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
|
|||||||
if agent_just_created:
|
if agent_just_created:
|
||||||
await asyncio.sleep(0.75)
|
await asyncio.sleep(0.75)
|
||||||
|
|
||||||
# Retry agents.update a few times to handle gateway hot-reload race.
|
# Retry agents.update only when this call just created the agent.
|
||||||
|
# If create reported "already exists", "not found" should fail fast.
|
||||||
_update_retries = 5
|
_update_retries = 5
|
||||||
_update_delay = 0.5
|
_update_delay = 0.5
|
||||||
for _attempt in range(_update_retries):
|
for _attempt in range(_update_retries):
|
||||||
@@ -617,7 +618,12 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
|
|||||||
)
|
)
|
||||||
break
|
break
|
||||||
except OpenClawGatewayError as exc:
|
except OpenClawGatewayError as exc:
|
||||||
if _is_missing_agent_error(exc) and _attempt < _update_retries - 1:
|
should_retry = (
|
||||||
|
agent_just_created
|
||||||
|
and _is_missing_agent_error(exc)
|
||||||
|
and _attempt < _update_retries - 1
|
||||||
|
)
|
||||||
|
if should_retry:
|
||||||
await asyncio.sleep(_update_delay)
|
await asyncio.sleep(_update_delay)
|
||||||
_update_delay = min(_update_delay * 2, 4.0)
|
_update_delay = min(_update_delay * 2, 4.0)
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -530,6 +530,89 @@ async def test_control_plane_upsert_agent_handles_already_exists(monkeypatch):
|
|||||||
assert calls[1][0] == "agents.update"
|
assert calls[1][0] == "agents.update"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_control_plane_upsert_agent_retries_update_after_create_race(monkeypatch):
    """Check that ``agents.update`` is retried when the agent was just created.

    The stubbed gateway accepts ``agents.create`` but answers the first two
    ``agents.update`` calls with an 'agent not found' error; the third update
    succeeds. Sleeps are recorded instead of awaited so the test can assert
    the exact delays requested.
    """
    recorded: list[tuple[str, dict[str, object] | None]] = []
    slept: list[float] = []
    update_count = 0

    async def _record_sleep(seconds: float) -> None:
        # Stand-in for asyncio.sleep: remember the delay, never actually wait.
        slept.append(seconds)

    async def _gateway_stub(method, params=None, config=None):
        nonlocal update_count
        del config  # accepted only to mirror the real call signature
        recorded.append((method, params))
        if method == "agents.update":
            update_count += 1
            # The first two update attempts report the agent as missing.
            if update_count <= 2:
                raise agent_provisioning.OpenClawGatewayError('agent "board-agent-a" not found')
            return {"ok": True}
        if method in ("agents.create", "config.patch"):
            return {"ok": True}
        if method == "config.get":
            return {"hash": None, "config": {"agents": {"list": []}}}
        raise AssertionError(f"Unexpected method: {method}")

    monkeypatch.setattr(agent_provisioning, "openclaw_call", _gateway_stub)
    monkeypatch.setattr(agent_provisioning.asyncio, "sleep", _record_sleep)

    client_config = agent_provisioning.GatewayClientConfig(
        url="ws://gateway.example/ws",
        token=None,
    )
    cp = agent_provisioning.OpenClawGatewayControlPlane(client_config)
    registration = agent_provisioning.GatewayAgentRegistration(
        agent_id="board-agent-a",
        name="Board Agent A",
        workspace_path="/tmp/workspace-board-agent-a",
        heartbeat={"every": "10m", "target": "last", "includeReasoning": False},
    )
    await cp.upsert_agent(registration)

    update_total = sum(1 for name, _ in recorded if name == "agents.update")
    assert update_total == 3
    # One pause after create, then one delay per failed update attempt.
    assert slept == [0.75, 0.5, 1.0]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_control_plane_upsert_agent_missing_after_already_exists_fails_fast(monkeypatch):
    """Check that a missing agent is not retried when create said 'already exists'.

    The stubbed gateway rejects ``agents.create`` with 'already exists' and
    ``agents.update`` with an 'agent not found' error. The upsert is expected
    to raise after a single update attempt without requesting any sleep.
    """
    recorded: list[tuple[str, dict[str, object] | None]] = []
    slept: list[float] = []

    # Error message the stub raises for each gateway method it knows about.
    failures = {
        "agents.create": "already exists",
        "agents.update": 'agent "board-agent-a" not found',
    }

    async def _record_sleep(seconds: float) -> None:
        # Stand-in for asyncio.sleep: remember the delay, never actually wait.
        slept.append(seconds)

    async def _gateway_stub(method, params=None, config=None):
        del config  # accepted only to mirror the real call signature
        recorded.append((method, params))
        if method in failures:
            raise agent_provisioning.OpenClawGatewayError(failures[method])
        raise AssertionError(f"Unexpected method: {method}")

    monkeypatch.setattr(agent_provisioning, "openclaw_call", _gateway_stub)
    monkeypatch.setattr(agent_provisioning.asyncio, "sleep", _record_sleep)

    client_config = agent_provisioning.GatewayClientConfig(
        url="ws://gateway.example/ws",
        token=None,
    )
    cp = agent_provisioning.OpenClawGatewayControlPlane(client_config)

    with pytest.raises(agent_provisioning.OpenClawGatewayError):
        await cp.upsert_agent(
            agent_provisioning.GatewayAgentRegistration(
                agent_id="board-agent-a",
                name="Board Agent A",
                workspace_path="/tmp/workspace-board-agent-a",
                heartbeat={"every": "10m", "target": "last", "includeReasoning": False},
            ),
        )

    update_total = sum(1 for name, _ in recorded if name == "agents.update")
    assert update_total == 1
    assert slept == []
|
||||||
|
|
||||||
|
|
||||||
def test_is_missing_agent_error_matches_gateway_agent_not_found() -> None:
|
def test_is_missing_agent_error_matches_gateway_agent_not_found() -> None:
|
||||||
assert agent_provisioning._is_missing_agent_error(
|
assert agent_provisioning._is_missing_agent_error(
|
||||||
agent_provisioning.OpenClawGatewayError('agent "mc-abc" not found'),
|
agent_provisioning.OpenClawGatewayError('agent "mc-abc" not found'),
|
||||||
|
|||||||
Reference in New Issue
Block a user