feat(agent): Refactor agent provisioning and cleanup logic for improved functionality
This commit is contained in:
@@ -11,7 +11,6 @@ from sqlmodel import Session, col, select
|
||||
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
|
||||
from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token
|
||||
from app.core.auth import AuthContext
|
||||
from app.core.config import settings
|
||||
from app.db.session import get_session
|
||||
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
|
||||
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
|
||||
@@ -31,8 +30,8 @@ from app.schemas.agents import (
|
||||
from app.services.activity_log import record_activity
|
||||
from app.services.agent_provisioning import (
|
||||
DEFAULT_HEARTBEAT_CONFIG,
|
||||
send_provisioning_message,
|
||||
send_update_message,
|
||||
cleanup_agent_direct,
|
||||
provision_agent,
|
||||
)
|
||||
|
||||
router = APIRouter(prefix="/agents", tags=["agents"])
|
||||
@@ -194,8 +193,6 @@ async def create_agent(
|
||||
agent.agent_token_hash = hash_agent_token(raw_token)
|
||||
if agent.heartbeat_config is None:
|
||||
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
|
||||
provision_token = generate_agent_token()
|
||||
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
|
||||
agent.provision_requested_at = datetime.utcnow()
|
||||
agent.provision_action = "provision"
|
||||
session_key, session_error = await _ensure_gateway_session(agent.name, client_config)
|
||||
@@ -219,15 +216,27 @@ async def create_agent(
|
||||
)
|
||||
session.commit()
|
||||
try:
|
||||
await send_provisioning_message(
|
||||
agent, board, gateway, raw_token, provision_token, auth.user
|
||||
await provision_agent(agent, board, gateway, raw_token, auth.user, action="provision")
|
||||
await _send_wakeup_message(agent, client_config, verb="provisioned")
|
||||
agent.provision_confirm_token_hash = None
|
||||
agent.provision_requested_at = None
|
||||
agent.provision_action = None
|
||||
agent.updated_at = datetime.utcnow()
|
||||
session.add(agent)
|
||||
session.commit()
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision",
|
||||
message=f"Provisioned directly for {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision.requested",
|
||||
message=f"Provisioning requested for {agent.name}.",
|
||||
event_type="agent.wakeup.sent",
|
||||
message=f"Wakeup message sent to {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
session.commit()
|
||||
except OpenClawGatewayError as exc:
|
||||
_record_instruction_failure(session, agent, str(exc), "provision")
|
||||
session.commit()
|
||||
@@ -295,9 +304,7 @@ async def update_agent(
|
||||
_record_instruction_failure(session, agent, str(exc), "update")
|
||||
session.commit()
|
||||
raw_token = generate_agent_token()
|
||||
provision_token = generate_agent_token()
|
||||
agent.agent_token_hash = hash_agent_token(raw_token)
|
||||
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
|
||||
agent.provision_requested_at = datetime.utcnow()
|
||||
agent.provision_action = "update"
|
||||
agent.status = "updating"
|
||||
@@ -305,11 +312,25 @@ async def update_agent(
|
||||
session.commit()
|
||||
session.refresh(agent)
|
||||
try:
|
||||
await send_update_message(agent, board, gateway, raw_token, provision_token, auth.user)
|
||||
await provision_agent(agent, board, gateway, raw_token, auth.user, action="update")
|
||||
await _send_wakeup_message(agent, client_config, verb="updated")
|
||||
agent.provision_confirm_token_hash = None
|
||||
agent.provision_requested_at = None
|
||||
agent.provision_action = None
|
||||
agent.status = "online"
|
||||
agent.updated_at = datetime.utcnow()
|
||||
session.add(agent)
|
||||
session.commit()
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.update.requested",
|
||||
message=f"Update requested for {agent.name}.",
|
||||
event_type="agent.update.direct",
|
||||
message=f"Updated directly for {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.wakeup.sent",
|
||||
message=f"Wakeup message sent to {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
session.commit()
|
||||
@@ -367,8 +388,6 @@ async def heartbeat_or_create_agent(
|
||||
)
|
||||
raw_token = generate_agent_token()
|
||||
agent.agent_token_hash = hash_agent_token(raw_token)
|
||||
provision_token = generate_agent_token()
|
||||
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
|
||||
agent.provision_requested_at = datetime.utcnow()
|
||||
agent.provision_action = "provision"
|
||||
session_key, session_error = await _ensure_gateway_session(agent.name, client_config)
|
||||
@@ -392,15 +411,27 @@ async def heartbeat_or_create_agent(
|
||||
)
|
||||
session.commit()
|
||||
try:
|
||||
await send_provisioning_message(
|
||||
agent, board, gateway, raw_token, provision_token, actor.user
|
||||
await provision_agent(agent, board, gateway, raw_token, actor.user, action="provision")
|
||||
await _send_wakeup_message(agent, client_config, verb="provisioned")
|
||||
agent.provision_confirm_token_hash = None
|
||||
agent.provision_requested_at = None
|
||||
agent.provision_action = None
|
||||
agent.updated_at = datetime.utcnow()
|
||||
session.add(agent)
|
||||
session.commit()
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision",
|
||||
message=f"Provisioned directly for {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision.requested",
|
||||
message=f"Provisioning requested for {agent.name}.",
|
||||
event_type="agent.wakeup.sent",
|
||||
message=f"Wakeup message sent to {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
session.commit()
|
||||
except OpenClawGatewayError as exc:
|
||||
_record_instruction_failure(session, agent, str(exc), "provision")
|
||||
session.commit()
|
||||
@@ -414,8 +445,6 @@ async def heartbeat_or_create_agent(
|
||||
agent.agent_token_hash = hash_agent_token(raw_token)
|
||||
if agent.heartbeat_config is None:
|
||||
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
|
||||
provision_token = generate_agent_token()
|
||||
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
|
||||
agent.provision_requested_at = datetime.utcnow()
|
||||
agent.provision_action = "provision"
|
||||
session.add(agent)
|
||||
@@ -424,15 +453,27 @@ async def heartbeat_or_create_agent(
|
||||
try:
|
||||
board = _require_board(session, str(agent.board_id) if agent.board_id else None)
|
||||
gateway, client_config = _require_gateway(session, board)
|
||||
await send_provisioning_message(
|
||||
agent, board, gateway, raw_token, provision_token, actor.user
|
||||
await provision_agent(agent, board, gateway, raw_token, actor.user, action="provision")
|
||||
await _send_wakeup_message(agent, client_config, verb="provisioned")
|
||||
agent.provision_confirm_token_hash = None
|
||||
agent.provision_requested_at = None
|
||||
agent.provision_action = None
|
||||
agent.updated_at = datetime.utcnow()
|
||||
session.add(agent)
|
||||
session.commit()
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision",
|
||||
message=f"Provisioned directly for {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.provision.requested",
|
||||
message=f"Provisioning requested for {agent.name}.",
|
||||
event_type="agent.wakeup.sent",
|
||||
message=f"Wakeup message sent to {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
session.commit()
|
||||
except OpenClawGatewayError as exc:
|
||||
_record_instruction_failure(session, agent, str(exc), "provision")
|
||||
session.commit()
|
||||
@@ -485,61 +526,39 @@ def delete_agent(
|
||||
return {"ok": True}
|
||||
|
||||
board = _require_board(session, str(agent.board_id) if agent.board_id else None)
|
||||
gateway, client_config = _require_gateway(session, board)
|
||||
raw_token = generate_agent_token()
|
||||
agent.delete_confirm_token_hash = hash_agent_token(raw_token)
|
||||
agent.delete_requested_at = datetime.utcnow()
|
||||
agent.status = "deleting"
|
||||
agent.updated_at = datetime.utcnow()
|
||||
session.add(agent)
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.delete.requested",
|
||||
message=f"Delete requested for {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
)
|
||||
session.commit()
|
||||
|
||||
async def _gateway_cleanup_request() -> None:
|
||||
main_session = gateway.main_session_key
|
||||
if not main_session:
|
||||
raise OpenClawGatewayError("Gateway main_session_key is required")
|
||||
workspace_path = _workspace_path(agent.name, gateway.workspace_root)
|
||||
base_url = settings.base_url or "REPLACE_WITH_BASE_URL"
|
||||
cleanup_message = (
|
||||
"Cleanup request for deleted agent.\n\n"
|
||||
f"Agent name: {agent.name}\n"
|
||||
f"Agent id: {agent.id}\n"
|
||||
f"Session key: {agent.openclaw_session_id or _build_session_key(agent.name)}\n"
|
||||
f"Workspace path: {workspace_path}\n\n"
|
||||
"Actions:\n"
|
||||
"1) Remove the workspace directory.\n"
|
||||
"2) Delete the agent session from the gateway.\n"
|
||||
"3) Confirm deletion by calling:\n"
|
||||
f" POST {base_url}/api/v1/agents/{agent.id}/delete/confirm\n"
|
||||
' Body: {"token": "' + raw_token + '"}\n'
|
||||
"Reply NO_REPLY."
|
||||
)
|
||||
await ensure_session(main_session, config=client_config, label="Main Agent")
|
||||
await send_message(
|
||||
cleanup_message,
|
||||
session_key=main_session,
|
||||
config=client_config,
|
||||
deliver=False,
|
||||
)
|
||||
|
||||
gateway, _ = _require_gateway(session, board)
|
||||
try:
|
||||
import asyncio
|
||||
|
||||
asyncio.run(_gateway_cleanup_request())
|
||||
asyncio.run(cleanup_agent_direct(agent, gateway, delete_workspace=True))
|
||||
except OpenClawGatewayError as exc:
|
||||
_record_instruction_failure(session, agent, str(exc), "delete")
|
||||
session.commit()
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"Gateway cleanup request failed: {exc}",
|
||||
detail=f"Gateway cleanup failed: {exc}",
|
||||
) from exc
|
||||
except Exception as exc: # pragma: no cover - unexpected cleanup errors
|
||||
_record_instruction_failure(session, agent, str(exc), "delete")
|
||||
session.commit()
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Workspace cleanup failed: {exc}",
|
||||
) from exc
|
||||
|
||||
record_activity(
|
||||
session,
|
||||
event_type="agent.delete.direct",
|
||||
message=f"Deleted agent {agent.name}.",
|
||||
agent_id=None,
|
||||
)
|
||||
session.execute(
|
||||
update(ActivityEvent)
|
||||
.where(col(ActivityEvent.agent_id) == agent.id)
|
||||
.values(agent_id=None)
|
||||
)
|
||||
session.delete(agent)
|
||||
session.commit()
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user