diff --git a/backend/app/api/agents.py b/backend/app/api/agents.py index 002019d4..a0788be0 100644 --- a/backend/app/api/agents.py +++ b/backend/app/api/agents.py @@ -192,9 +192,10 @@ async def _ensure_gateway_session( session_key = _build_session_key(agent_name) try: await ensure_session(session_key, config=config, label=agent_name) - return session_key, None except OpenClawGatewayError as exc: return session_key, str(exc) + else: + return session_key, None def _with_computed_status(agent: Agent) -> Agent: @@ -356,21 +357,20 @@ async def stream_agents( while True: if await request.is_disconnected(): break - async with async_session_maker() as session: + async with async_session_maker() as stream_session: if board_id is not None: - agents = await _fetch_agent_events(session, board_id, last_seen) + agents = await _fetch_agent_events(stream_session, board_id, last_seen) elif allowed_ids: - agents = await _fetch_agent_events(session, None, last_seen) + agents = await _fetch_agent_events(stream_session, None, last_seen) agents = [agent for agent in agents if agent.board_id in allowed_ids] else: agents = [] main_session_keys = ( - await _get_gateway_main_session_keys(session) if agents else set() + await _get_gateway_main_session_keys(stream_session) if agents else set() ) for agent in agents: updated_at = agent.updated_at or agent.last_seen_at or utcnow() - if updated_at > last_seen: - last_seen = updated_at + last_seen = max(updated_at, last_seen) payload = {"agent": _serialize_agent(agent, main_session_keys)} yield {"event": "agent", "data": json.dumps(payload)} await asyncio.sleep(2) @@ -841,60 +841,59 @@ async def heartbeat_or_create_agent( except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover _record_instruction_failure(session, agent, str(exc), "provision") await session.commit() - else: - if actor.actor_type == "user": - ctx = await _require_user_context(session, actor.user) - await _require_agent_access(session, agent=agent, ctx=ctx, write=True) + elif actor.actor_type == "user": + ctx = await _require_user_context(session, actor.user) + await _require_agent_access(session, agent=agent, ctx=ctx, write=True) - if agent.agent_token_hash is None: - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - agent.provision_requested_at = utcnow() - agent.provision_action = "provision" + if agent.agent_token_hash is None: + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + if agent.heartbeat_config is None: + agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() + agent.provision_requested_at = utcnow() + agent.provision_action = "provision" + session.add(agent) + await session.commit() + await session.refresh(agent) + try: + board = await _require_board( + session, + str(agent.board_id) if agent.board_id else None, + user=actor.user, + write=True, + ) + gateway, client_config = await _require_gateway(session, board) + await provision_agent( + agent, board, gateway, raw_token, actor.user, action="provision" + ) + await _send_wakeup_message(agent, client_config, verb="provisioned") + agent.provision_confirm_token_hash = None + agent.provision_requested_at = None + agent.provision_action = None + agent.updated_at = utcnow() session.add(agent) await session.commit() - await session.refresh(agent) - try: - board = await _require_board( - session, - str(agent.board_id) if agent.board_id else None, - user=actor.user, - write=True, - ) - gateway, client_config = await _require_gateway(session, board) - await provision_agent( - agent, board, gateway, raw_token, actor.user, action="provision" - ) - await _send_wakeup_message(agent, client_config, verb="provisioned") - agent.provision_confirm_token_hash = None - agent.provision_requested_at = None - agent.provision_action = None - agent.updated_at = utcnow() - session.add(agent) - await session.commit() - record_activity( - session, - event_type="agent.provision", - message=f"Provisioned directly for {agent.name}.", - agent_id=agent.id, - ) - record_activity( - session, - event_type="agent.wakeup.sent", - message=f"Wakeup message sent to {agent.name}.", - agent_id=agent.id, - ) - await session.commit() - except OpenClawGatewayError as exc: - _record_instruction_failure(session, agent, str(exc), "provision") - await session.commit() - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - _record_instruction_failure(session, agent, str(exc), "provision") - await session.commit() - elif actor.actor_type == "agent" and actor.agent and actor.agent.id != agent.id: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + record_activity( + session, + event_type="agent.provision", + message=f"Provisioned directly for {agent.name}.", + agent_id=agent.id, + ) + record_activity( + session, + event_type="agent.wakeup.sent", + message=f"Wakeup message sent to {agent.name}.", + agent_id=agent.id, + ) + await session.commit() + except OpenClawGatewayError as exc: + _record_instruction_failure(session, agent, str(exc), "provision") + await session.commit() + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + _record_instruction_failure(session, agent, str(exc), "provision") + await session.commit() + elif actor.actor_type == "agent" and actor.agent and actor.agent.id != agent.id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if not agent.openclaw_session_id: board = await _require_board(