feat: add disable_device_pairing option to gateway configuration
This commit is contained in:
67
backend/tests/test_gateway_device_identity.py
Normal file
67
backend/tests/test_gateway_device_identity.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
|
||||
import pytest
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
|
||||
|
||||
from app.services.openclaw.device_identity import (
|
||||
build_device_auth_payload,
|
||||
load_or_create_device_identity,
|
||||
sign_device_payload,
|
||||
)
|
||||
|
||||
|
||||
def _base64url_decode(value: str) -> bytes:
|
||||
padding = "=" * ((4 - len(value) % 4) % 4)
|
||||
return base64.urlsafe_b64decode(f"{value}{padding}")
|
||||
|
||||
|
||||
def test_load_or_create_device_identity_persists_same_identity(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
tmp_path,
|
||||
) -> None:
|
||||
identity_path = tmp_path / "identity" / "device.json"
|
||||
monkeypatch.setenv("OPENCLAW_GATEWAY_DEVICE_IDENTITY_PATH", str(identity_path))
|
||||
|
||||
first = load_or_create_device_identity()
|
||||
second = load_or_create_device_identity()
|
||||
|
||||
assert identity_path.exists()
|
||||
assert first.device_id == second.device_id
|
||||
assert first.public_key_pem.strip() == second.public_key_pem.strip()
|
||||
assert first.private_key_pem.strip() == second.private_key_pem.strip()
|
||||
|
||||
|
||||
def test_build_device_auth_payload_uses_nonce_for_v2() -> None:
|
||||
payload = build_device_auth_payload(
|
||||
device_id="dev",
|
||||
client_id="gateway-client",
|
||||
client_mode="backend",
|
||||
role="operator",
|
||||
scopes=["operator.read", "operator.admin"],
|
||||
signed_at_ms=123,
|
||||
token="token",
|
||||
nonce="nonce-xyz",
|
||||
)
|
||||
|
||||
assert payload == (
|
||||
"v2|dev|gateway-client|backend|operator|operator.read,operator.admin|123|token|nonce-xyz"
|
||||
)
|
||||
|
||||
|
||||
def test_sign_device_payload_produces_valid_ed25519_signature(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
tmp_path,
|
||||
) -> None:
|
||||
identity_path = tmp_path / "identity" / "device.json"
|
||||
monkeypatch.setenv("OPENCLAW_GATEWAY_DEVICE_IDENTITY_PATH", str(identity_path))
|
||||
identity = load_or_create_device_identity()
|
||||
|
||||
payload = "v1|device|client|backend|operator|operator.read|1|token"
|
||||
signature = sign_device_payload(identity.private_key_pem, payload)
|
||||
|
||||
loaded = serialization.load_pem_public_key(identity.public_key_pem.encode("utf-8"))
|
||||
assert isinstance(loaded, Ed25519PublicKey)
|
||||
loaded.verify(_base64url_decode(signature), payload.encode("utf-8"))
|
||||
64
backend/tests/test_gateway_resolver.py
Normal file
64
backend/tests/test_gateway_resolver.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# ruff: noqa: S101
|
||||
from __future__ import annotations
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from app.models.gateways import Gateway
|
||||
from app.services.openclaw.gateway_resolver import (
|
||||
gateway_client_config,
|
||||
optional_gateway_client_config,
|
||||
)
|
||||
from app.services.openclaw.session_service import GatewaySessionService
|
||||
|
||||
|
||||
def _gateway(
|
||||
*,
|
||||
disable_device_pairing: bool,
|
||||
url: str = "ws://gateway.example:18789/ws",
|
||||
token: str | None = " secret-token ",
|
||||
) -> Gateway:
|
||||
return Gateway(
|
||||
id=uuid4(),
|
||||
organization_id=uuid4(),
|
||||
name="Primary gateway",
|
||||
url=url,
|
||||
token=token,
|
||||
workspace_root="~/.openclaw",
|
||||
disable_device_pairing=disable_device_pairing,
|
||||
)
|
||||
|
||||
|
||||
def test_gateway_client_config_maps_disable_device_pairing() -> None:
|
||||
config = gateway_client_config(_gateway(disable_device_pairing=True))
|
||||
|
||||
assert config.url == "ws://gateway.example:18789/ws"
|
||||
assert config.token == "secret-token"
|
||||
assert config.disable_device_pairing is True
|
||||
|
||||
|
||||
def test_optional_gateway_client_config_maps_disable_device_pairing() -> None:
|
||||
config = optional_gateway_client_config(_gateway(disable_device_pairing=False))
|
||||
|
||||
assert config is not None
|
||||
assert config.disable_device_pairing is False
|
||||
|
||||
|
||||
def test_optional_gateway_client_config_returns_none_for_missing_or_blank_url() -> None:
|
||||
assert optional_gateway_client_config(None) is None
|
||||
assert (
|
||||
optional_gateway_client_config(
|
||||
_gateway(disable_device_pairing=False, url=" "),
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
def test_to_resolve_query_keeps_gateway_disable_device_pairing_value() -> None:
|
||||
resolved = GatewaySessionService.to_resolve_query(
|
||||
board_id=None,
|
||||
gateway_url="ws://gateway.example:18789/ws",
|
||||
gateway_token="secret-token",
|
||||
gateway_disable_device_pairing=True,
|
||||
)
|
||||
|
||||
assert resolved.gateway_disable_device_pairing is True
|
||||
@@ -1,24 +1,194 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
import app.services.openclaw.gateway_rpc as gateway_rpc
|
||||
from app.services.openclaw.gateway_rpc import (
|
||||
CONTROL_UI_CLIENT_ID,
|
||||
CONTROL_UI_CLIENT_MODE,
|
||||
DEFAULT_GATEWAY_CLIENT_ID,
|
||||
DEFAULT_GATEWAY_CLIENT_MODE,
|
||||
GATEWAY_OPERATOR_SCOPES,
|
||||
GatewayConfig,
|
||||
OpenClawGatewayError,
|
||||
_build_connect_params,
|
||||
_build_control_ui_origin,
|
||||
openclaw_call,
|
||||
)
|
||||
|
||||
|
||||
def test_build_connect_params_sets_explicit_operator_role_and_scopes() -> None:
|
||||
def test_build_connect_params_defaults_to_device_pairing(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
captured: dict[str, object] = {}
|
||||
expected_device_payload = {
|
||||
"id": "device-id",
|
||||
"publicKey": "public-key",
|
||||
"signature": "signature",
|
||||
"signedAt": 1,
|
||||
}
|
||||
|
||||
def _fake_build_device_connect_payload(
|
||||
*,
|
||||
client_id: str,
|
||||
client_mode: str,
|
||||
role: str,
|
||||
scopes: list[str],
|
||||
auth_token: str | None,
|
||||
connect_nonce: str | None,
|
||||
) -> dict[str, object]:
|
||||
captured["client_id"] = client_id
|
||||
captured["client_mode"] = client_mode
|
||||
captured["role"] = role
|
||||
captured["scopes"] = scopes
|
||||
captured["auth_token"] = auth_token
|
||||
captured["connect_nonce"] = connect_nonce
|
||||
return expected_device_payload
|
||||
|
||||
monkeypatch.setattr(
|
||||
gateway_rpc,
|
||||
"_build_device_connect_payload",
|
||||
_fake_build_device_connect_payload,
|
||||
)
|
||||
|
||||
params = _build_connect_params(GatewayConfig(url="ws://gateway.example/ws"))
|
||||
|
||||
assert params["role"] == "operator"
|
||||
assert params["scopes"] == list(GATEWAY_OPERATOR_SCOPES)
|
||||
assert params["client"]["id"] == DEFAULT_GATEWAY_CLIENT_ID
|
||||
assert params["client"]["mode"] == DEFAULT_GATEWAY_CLIENT_MODE
|
||||
assert params["device"] == expected_device_payload
|
||||
assert "auth" not in params
|
||||
assert captured["client_id"] == DEFAULT_GATEWAY_CLIENT_ID
|
||||
assert captured["client_mode"] == DEFAULT_GATEWAY_CLIENT_MODE
|
||||
assert captured["role"] == "operator"
|
||||
assert captured["scopes"] == list(GATEWAY_OPERATOR_SCOPES)
|
||||
assert captured["auth_token"] is None
|
||||
assert captured["connect_nonce"] is None
|
||||
|
||||
|
||||
def test_build_connect_params_includes_auth_token_when_provided() -> None:
|
||||
def test_build_connect_params_uses_control_ui_when_pairing_disabled() -> None:
|
||||
params = _build_connect_params(
|
||||
GatewayConfig(url="ws://gateway.example/ws", token="secret-token"),
|
||||
GatewayConfig(
|
||||
url="ws://gateway.example/ws",
|
||||
token="secret-token",
|
||||
disable_device_pairing=True,
|
||||
),
|
||||
)
|
||||
|
||||
assert params["auth"] == {"token": "secret-token"}
|
||||
assert params["scopes"] == list(GATEWAY_OPERATOR_SCOPES)
|
||||
assert params["client"]["id"] == CONTROL_UI_CLIENT_ID
|
||||
assert params["client"]["mode"] == CONTROL_UI_CLIENT_MODE
|
||||
assert "device" not in params
|
||||
|
||||
|
||||
def test_build_connect_params_passes_nonce_to_device_payload(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def _fake_build_device_connect_payload(
|
||||
*,
|
||||
client_id: str,
|
||||
client_mode: str,
|
||||
role: str,
|
||||
scopes: list[str],
|
||||
auth_token: str | None,
|
||||
connect_nonce: str | None,
|
||||
) -> dict[str, object]:
|
||||
captured["client_id"] = client_id
|
||||
captured["client_mode"] = client_mode
|
||||
captured["role"] = role
|
||||
captured["scopes"] = scopes
|
||||
captured["auth_token"] = auth_token
|
||||
captured["connect_nonce"] = connect_nonce
|
||||
return {"id": "device-id", "nonce": connect_nonce}
|
||||
|
||||
monkeypatch.setattr(
|
||||
gateway_rpc,
|
||||
"_build_device_connect_payload",
|
||||
_fake_build_device_connect_payload,
|
||||
)
|
||||
|
||||
params = _build_connect_params(
|
||||
GatewayConfig(url="ws://gateway.example/ws", token="secret-token"),
|
||||
connect_nonce="nonce-xyz",
|
||||
)
|
||||
|
||||
assert params["auth"] == {"token": "secret-token"}
|
||||
assert params["client"]["id"] == DEFAULT_GATEWAY_CLIENT_ID
|
||||
assert params["client"]["mode"] == DEFAULT_GATEWAY_CLIENT_MODE
|
||||
assert params["device"] == {"id": "device-id", "nonce": "nonce-xyz"}
|
||||
assert captured["client_id"] == DEFAULT_GATEWAY_CLIENT_ID
|
||||
assert captured["client_mode"] == DEFAULT_GATEWAY_CLIENT_MODE
|
||||
assert captured["role"] == "operator"
|
||||
assert captured["scopes"] == list(GATEWAY_OPERATOR_SCOPES)
|
||||
assert captured["auth_token"] == "secret-token"
|
||||
assert captured["connect_nonce"] == "nonce-xyz"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("gateway_url", "expected_origin"),
|
||||
[
|
||||
("ws://gateway.example/ws", "http://gateway.example"),
|
||||
("wss://gateway.example/ws", "https://gateway.example"),
|
||||
("ws://gateway.example:8080/ws", "http://gateway.example:8080"),
|
||||
("wss://gateway.example:8443/ws", "https://gateway.example:8443"),
|
||||
("ws://[::1]:8000/ws", "http://[::1]:8000"),
|
||||
],
|
||||
)
|
||||
def test_build_control_ui_origin(gateway_url: str, expected_origin: str) -> None:
|
||||
assert _build_control_ui_origin(gateway_url) == expected_origin
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_openclaw_call_uses_single_connect_attempt(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
call_count = 0
|
||||
|
||||
async def _fake_call_once(
|
||||
method: str,
|
||||
params: dict[str, object] | None,
|
||||
*,
|
||||
config: GatewayConfig,
|
||||
gateway_url: str,
|
||||
) -> object:
|
||||
nonlocal call_count
|
||||
del method, params, config, gateway_url
|
||||
call_count += 1
|
||||
return {"ok": True}
|
||||
|
||||
monkeypatch.setattr(gateway_rpc, "_openclaw_call_once", _fake_call_once)
|
||||
|
||||
payload = await openclaw_call(
|
||||
"status",
|
||||
config=GatewayConfig(url="ws://gateway.example/ws"),
|
||||
)
|
||||
|
||||
assert payload == {"ok": True}
|
||||
assert call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_openclaw_call_surfaces_scope_error_without_device_fallback(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
async def _fake_call_once(
|
||||
method: str,
|
||||
params: dict[str, object] | None,
|
||||
*,
|
||||
config: GatewayConfig,
|
||||
gateway_url: str,
|
||||
) -> object:
|
||||
del method, params, config, gateway_url
|
||||
raise OpenClawGatewayError("missing scope: operator.read")
|
||||
|
||||
monkeypatch.setattr(gateway_rpc, "_openclaw_call_once", _fake_call_once)
|
||||
|
||||
with pytest.raises(OpenClawGatewayError, match="missing scope: operator.read"):
|
||||
await openclaw_call(
|
||||
"status",
|
||||
config=GatewayConfig(url="ws://gateway.example/ws", token="secret-token"),
|
||||
)
|
||||
|
||||
@@ -200,6 +200,24 @@ async def test_admin_service_maps_gateway_transport_errors(
|
||||
assert "compatibility check failed" in str(exc_info.value.detail).lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_service_maps_gateway_scope_errors_with_guidance(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
async def _fake_check(config: GatewayConfig, *, minimum_version: str | None = None) -> object:
|
||||
_ = (config, minimum_version)
|
||||
raise OpenClawGatewayError("missing scope: operator.read")
|
||||
|
||||
monkeypatch.setattr(admin_service, "check_gateway_runtime_compatibility", _fake_check)
|
||||
|
||||
service = GatewayAdminLifecycleService(session=object()) # type: ignore[arg-type]
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await service.assert_gateway_runtime_compatible(url="ws://gateway.example/ws", token=None)
|
||||
|
||||
assert exc_info.value.status_code == 502
|
||||
assert "missing required scope `operator.read`" in str(exc_info.value.detail)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_status_reports_incompatible_version(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
@@ -226,6 +244,28 @@ async def test_gateway_status_reports_incompatible_version(
|
||||
assert response.error == "Gateway version 2026.1.0 is not supported."
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_status_surfaces_scope_error_guidance(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
async def _fake_check(config: GatewayConfig, *, minimum_version: str | None = None) -> object:
|
||||
_ = (config, minimum_version)
|
||||
raise OpenClawGatewayError("missing scope: operator.read")
|
||||
|
||||
monkeypatch.setattr(session_service, "check_gateway_runtime_compatibility", _fake_check)
|
||||
|
||||
service = GatewaySessionService(session=object()) # type: ignore[arg-type]
|
||||
response = await service.get_status(
|
||||
params=GatewayResolveQuery(gateway_url="ws://gateway.example/ws"),
|
||||
organization_id=uuid4(),
|
||||
user=None,
|
||||
)
|
||||
|
||||
assert response.connected is False
|
||||
assert response.error is not None
|
||||
assert "missing required scope `operator.read`" in response.error
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_status_returns_sessions_when_version_compatible(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
|
||||
Reference in New Issue
Block a user