Files
openclaw-mission-control/backend/app/core/agent_auth.py
2026-02-05 19:06:32 +05:30

106 lines
3.3 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from fastapi import Depends, Header, HTTPException, Request, status
import logging
from sqlmodel import Session, col, select
from app.core.agent_tokens import verify_agent_token
from app.db.session import get_session
from app.models.agents import Agent
logger = logging.getLogger(__name__)
@dataclass
class AgentAuthContext:
actor_type: Literal["agent"]
agent: Agent
def _find_agent_for_token(session: Session, token: str) -> Agent | None:
agents = list(session.exec(select(Agent).where(col(Agent.agent_token_hash).is_not(None))))
for agent in agents:
if agent.agent_token_hash and verify_agent_token(token, agent.agent_token_hash):
return agent
return None
def _resolve_agent_token(
agent_token: str | None,
authorization: str | None,
*,
accept_authorization: bool = True,
) -> str | None:
if agent_token:
return agent_token
if not accept_authorization:
return None
if not authorization:
return None
value = authorization.strip()
if not value:
return None
if value.lower().startswith("bearer "):
return value.split(" ", 1)[1].strip() or None
return None
def get_agent_auth_context(
request: Request,
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
authorization: str | None = Header(default=None, alias="Authorization"),
session: Session = Depends(get_session),
) -> AgentAuthContext:
resolved = _resolve_agent_token(agent_token, authorization, accept_authorization=True)
if not resolved:
logger.warning(
"agent auth missing token path=%s x_agent=%s authorization=%s",
request.url.path,
bool(agent_token),
bool(authorization),
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
agent = _find_agent_for_token(session, resolved)
if agent is None:
logger.warning(
"agent auth invalid token path=%s token_prefix=%s",
request.url.path,
resolved[:6],
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)
def get_agent_auth_context_optional(
request: Request,
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
authorization: str | None = Header(default=None, alias="Authorization"),
session: Session = Depends(get_session),
) -> AgentAuthContext | None:
resolved = _resolve_agent_token(
agent_token,
authorization,
accept_authorization=False,
)
if not resolved:
if agent_token:
logger.warning(
"agent auth optional missing token path=%s x_agent=%s authorization=%s",
request.url.path,
bool(agent_token),
bool(authorization),
)
return None
agent = _find_agent_for_token(session, resolved)
if agent is None:
logger.warning(
"agent auth optional invalid token path=%s token_prefix=%s",
request.url.path,
resolved[:6],
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)