Files
openclaw-mission-control/backend/app/services/openclaw/device_identity.py

168 lines
5.2 KiB
Python

"""OpenClaw-compatible device identity and connect-signature helpers."""
from __future__ import annotations
import hashlib
import json
import os
from dataclasses import dataclass
from pathlib import Path
from time import time
from typing import Any, cast
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
DEFAULT_DEVICE_IDENTITY_PATH = Path.home() / ".openclaw" / "identity" / "device.json"
@dataclass(frozen=True)
class DeviceIdentity:
"""Persisted gateway device identity used for connect signatures."""
device_id: str
public_key_pem: str
private_key_pem: str
def _identity_path() -> Path:
raw = os.getenv("OPENCLAW_GATEWAY_DEVICE_IDENTITY_PATH", "").strip()
if raw:
return Path(raw).expanduser().resolve()
return DEFAULT_DEVICE_IDENTITY_PATH
def _base64url_encode(raw: bytes) -> str:
import base64
return base64.urlsafe_b64encode(raw).decode("utf-8").rstrip("=")
def _derive_public_key_raw(public_key_pem: str) -> bytes:
loaded = serialization.load_pem_public_key(public_key_pem.encode("utf-8"))
if not isinstance(loaded, Ed25519PublicKey):
msg = "device identity public key is not Ed25519"
raise ValueError(msg)
return loaded.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
def _derive_device_id(public_key_pem: str) -> str:
return hashlib.sha256(_derive_public_key_raw(public_key_pem)).hexdigest()
def _write_identity(path: Path, identity: DeviceIdentity) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"version": 1,
"deviceId": identity.device_id,
"publicKeyPem": identity.public_key_pem,
"privateKeyPem": identity.private_key_pem,
"createdAtMs": int(time() * 1000),
}
path.write_text(f"{json.dumps(payload, indent=2)}\n", encoding="utf-8")
try:
path.chmod(0o600)
except OSError:
# Best effort on platforms/filesystems that ignore chmod.
pass
def _generate_identity() -> DeviceIdentity:
private_key = Ed25519PrivateKey.generate()
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
public_key_pem = (
private_key.public_key()
.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
.decode("utf-8")
)
device_id = _derive_device_id(public_key_pem)
return DeviceIdentity(
device_id=device_id,
public_key_pem=public_key_pem,
private_key_pem=private_key_pem,
)
def load_or_create_device_identity() -> DeviceIdentity:
"""Load persisted device identity or create a new one when missing/invalid."""
path = _identity_path()
try:
if path.exists():
payload = cast(dict[str, Any], json.loads(path.read_text(encoding="utf-8")))
device_id = str(payload.get("deviceId") or "").strip()
public_key_pem = str(payload.get("publicKeyPem") or "").strip()
private_key_pem = str(payload.get("privateKeyPem") or "").strip()
if device_id and public_key_pem and private_key_pem:
derived_id = _derive_device_id(public_key_pem)
identity = DeviceIdentity(
device_id=derived_id,
public_key_pem=public_key_pem,
private_key_pem=private_key_pem,
)
if derived_id != device_id:
_write_identity(path, identity)
return identity
except (OSError, ValueError, json.JSONDecodeError):
# Fall through to regenerate.
pass
identity = _generate_identity()
_write_identity(path, identity)
return identity
def public_key_raw_base64url_from_pem(public_key_pem: str) -> str:
"""Return raw Ed25519 public key in base64url form expected by OpenClaw."""
return _base64url_encode(_derive_public_key_raw(public_key_pem))
def sign_device_payload(private_key_pem: str, payload: str) -> str:
"""Sign a device payload with Ed25519 and return base64url signature."""
loaded = serialization.load_pem_private_key(private_key_pem.encode("utf-8"), password=None)
if not isinstance(loaded, Ed25519PrivateKey):
msg = "device identity private key is not Ed25519"
raise ValueError(msg)
signature = loaded.sign(payload.encode("utf-8"))
return _base64url_encode(signature)
def build_device_auth_payload(
*,
device_id: str,
client_id: str,
client_mode: str,
role: str,
scopes: list[str],
signed_at_ms: int,
token: str | None,
nonce: str | None,
) -> str:
"""Build the OpenClaw canonical payload string for device signatures."""
version = "v2" if nonce else "v1"
parts = [
version,
device_id,
client_id,
client_mode,
role,
",".join(scopes),
str(signed_at_ms),
token or "",
]
if version == "v2":
parts.append(nonce or "")
return "|".join(parts)