from __future__ import annotations import json import re from pathlib import Path from typing import Any from uuid import uuid4 from jinja2 import Environment, FileSystemLoader, StrictUndefined, select_autoescape from app.core.config import settings from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, openclaw_call from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway from app.models.users import User DEFAULT_HEARTBEAT_CONFIG = {"every": "10m", "target": "none"} DEFAULT_GATEWAY_FILES = frozenset( { "AGENTS.md", "SOUL.md", "TOOLS.md", "IDENTITY.md", "USER.md", "HEARTBEAT.md", "BOOTSTRAP.md", "MEMORY.md", } ) def _repo_root() -> Path: return Path(__file__).resolve().parents[3] def _templates_root() -> Path: return _repo_root() / "templates" def _slugify(value: str) -> str: slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") return slug or uuid4().hex def _agent_key(agent: Agent) -> str: session_key = agent.openclaw_session_id or "" if session_key.startswith("agent:"): parts = session_key.split(":") if len(parts) >= 2 and parts[1]: return parts[1] return _slugify(agent.name) def _heartbeat_config(agent: Agent) -> dict[str, Any]: if agent.heartbeat_config: return agent.heartbeat_config return DEFAULT_HEARTBEAT_CONFIG.copy() def _template_env() -> Environment: return Environment( loader=FileSystemLoader(_templates_root()), autoescape=select_autoescape(default=True), undefined=StrictUndefined, keep_trailing_newline=True, ) def _workspace_path(agent_name: str, workspace_root: str) -> str: if not workspace_root: raise ValueError("gateway_workspace_root is required") root = workspace_root root = root.rstrip("/") return f"{root}/workspace-{_slugify(agent_name)}" def _build_context( agent: Agent, board: Board, gateway: Gateway, auth_token: str, user: User | None, ) -> dict[str, str]: if not gateway.workspace_root: raise ValueError("gateway_workspace_root is required") if not gateway.main_session_key: raise ValueError("gateway_main_session_key is required") agent_id = str(agent.id) workspace_root = gateway.workspace_root workspace_path = _workspace_path(agent.name, workspace_root) session_key = agent.openclaw_session_id or "" base_url = settings.base_url or "REPLACE_WITH_BASE_URL" main_session_key = gateway.main_session_key return { "agent_name": agent.name, "agent_id": agent_id, "board_id": str(board.id), "session_key": session_key, "workspace_path": workspace_path, "base_url": base_url, "auth_token": auth_token, "main_session_key": main_session_key, "workspace_root": workspace_root, "user_name": (user.name or "") if user else "", "user_preferred_name": (user.preferred_name or "") if user else "", "user_pronouns": (user.pronouns or "") if user else "", "user_timezone": (user.timezone or "") if user else "", "user_notes": (user.notes or "") if user else "", "user_context": (user.context or "") if user else "", } def _session_key(agent: Agent) -> str: if agent.openclaw_session_id: return agent.openclaw_session_id return f"agent:{_agent_key(agent)}:main" async def _supported_gateway_files(config: GatewayClientConfig) -> set[str]: try: agents_payload = await openclaw_call("agents.list", config=config) agents = [] default_id = None if isinstance(agents_payload, dict): agents = list(agents_payload.get("agents") or []) default_id = agents_payload.get("defaultId") or agents_payload.get("default_id") agent_id = default_id or (agents[0].get("id") if agents else None) if not agent_id: return set(DEFAULT_GATEWAY_FILES) files_payload = await openclaw_call( "agents.files.list", {"agentId": agent_id}, config=config ) if isinstance(files_payload, dict): files = files_payload.get("files") or [] supported = {item.get("name") for item in files if isinstance(item, dict)} return supported or set(DEFAULT_GATEWAY_FILES) except OpenClawGatewayError: pass return set(DEFAULT_GATEWAY_FILES) async def _gateway_agent_files_index( agent_id: str, config: GatewayClientConfig ) -> dict[str, dict[str, Any]]: try: payload = await openclaw_call("agents.files.list", {"agentId": agent_id}, config=config) if isinstance(payload, dict): files = payload.get("files") or [] return { item.get("name"): item for item in files if isinstance(item, dict) and item.get("name") } except OpenClawGatewayError: pass return {} def _render_agent_files( context: dict[str, str], agent: Agent, file_names: set[str], *, include_bootstrap: bool, ) -> dict[str, str]: env = _template_env() overrides: dict[str, str] = {} if agent.identity_template: overrides["IDENTITY.md"] = agent.identity_template if agent.soul_template: overrides["SOUL.md"] = agent.soul_template rendered: dict[str, str] = {} for name in sorted(file_names): if name == "BOOTSTRAP.md" and not include_bootstrap: continue if name == "MEMORY.md": rendered[name] = "# MEMORY\n\nBootstrap pending.\n" continue override = overrides.get(name) if override: rendered[name] = env.from_string(override).render(**context).strip() continue path = _templates_root() / name if path.exists(): rendered[name] = env.get_template(name).render(**context).strip() continue rendered[name] = "" return rendered async def _patch_gateway_agent_list( agent_id: str, workspace_path: str, heartbeat: dict[str, Any], config: GatewayClientConfig, ) -> None: cfg = await openclaw_call("config.get", config=config) if not isinstance(cfg, dict): raise OpenClawGatewayError("config.get returned invalid payload") base_hash = cfg.get("hash") data = cfg.get("config") or cfg.get("parsed") or {} if not isinstance(data, dict): raise OpenClawGatewayError("config.get returned invalid config") agents = data.get("agents") or {} lst = agents.get("list") or [] if not isinstance(lst, list): raise OpenClawGatewayError("config agents.list is not a list") updated = False new_list: list[dict[str, Any]] = [] for entry in lst: if isinstance(entry, dict) and entry.get("id") == agent_id: new_entry = dict(entry) new_entry["workspace"] = workspace_path new_entry["heartbeat"] = heartbeat new_list.append(new_entry) updated = True else: new_list.append(entry) if not updated: new_list.append({"id": agent_id, "workspace": workspace_path, "heartbeat": heartbeat}) patch = {"agents": {"list": new_list}} params = {"raw": json.dumps(patch)} if base_hash: params["baseHash"] = base_hash await openclaw_call("config.patch", params, config=config) async def _remove_gateway_agent_list( agent_id: str, config: GatewayClientConfig, ) -> None: cfg = await openclaw_call("config.get", config=config) if not isinstance(cfg, dict): raise OpenClawGatewayError("config.get returned invalid payload") base_hash = cfg.get("hash") data = cfg.get("config") or cfg.get("parsed") or {} if not isinstance(data, dict): raise OpenClawGatewayError("config.get returned invalid config") agents = data.get("agents") or {} lst = agents.get("list") or [] if not isinstance(lst, list): raise OpenClawGatewayError("config agents.list is not a list") new_list = [entry for entry in lst if not (isinstance(entry, dict) and entry.get("id") == agent_id)] if len(new_list) == len(lst): return patch = {"agents": {"list": new_list}} params = {"raw": json.dumps(patch)} if base_hash: params["baseHash"] = base_hash await openclaw_call("config.patch", params, config=config) async def _get_gateway_agent_entry( agent_id: str, config: GatewayClientConfig, ) -> dict[str, Any] | None: cfg = await openclaw_call("config.get", config=config) if not isinstance(cfg, dict): return None data = cfg.get("config") or cfg.get("parsed") or {} if not isinstance(data, dict): return None agents = data.get("agents") or {} lst = agents.get("list") or [] if not isinstance(lst, list): return None for entry in lst: if isinstance(entry, dict) and entry.get("id") == agent_id: return entry return None async def provision_agent( agent: Agent, board: Board, gateway: Gateway, auth_token: str, user: User | None, *, action: str = "provision", ) -> None: if not gateway.url: return if not gateway.workspace_root: raise ValueError("gateway_workspace_root is required") client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) session_key = _session_key(agent) await ensure_session(session_key, config=client_config, label=agent.name) agent_id = _agent_key(agent) workspace_path = _workspace_path(agent.name, gateway.workspace_root) heartbeat = _heartbeat_config(agent) await _patch_gateway_agent_list(agent_id, workspace_path, heartbeat, client_config) context = _build_context(agent, board, gateway, auth_token, user) supported = await _supported_gateway_files(client_config) existing_files = await _gateway_agent_files_index(agent_id, client_config) include_bootstrap = True if action == "update": if not existing_files: include_bootstrap = False else: entry = existing_files.get("BOOTSTRAP.md") if entry and entry.get("missing") is True: include_bootstrap = False rendered = _render_agent_files( context, agent, supported, include_bootstrap=include_bootstrap, ) for name, content in rendered.items(): if content == "": continue await openclaw_call( "agents.files.set", {"agentId": agent_id, "name": name, "content": content}, config=client_config, ) async def cleanup_agent( agent: Agent, gateway: Gateway, ) -> str | None: if not gateway.url: return if not gateway.workspace_root: raise ValueError("gateway_workspace_root is required") client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) agent_id = _agent_key(agent) entry = await _get_gateway_agent_entry(agent_id, client_config) await _remove_gateway_agent_list(agent_id, client_config) session_key = _session_key(agent) await openclaw_call("sessions.delete", {"key": session_key}, config=client_config) workspace_path = entry.get("workspace") if entry else None if not workspace_path: workspace_path = _workspace_path(agent.name, gateway.workspace_root) return workspace_path