feat: enhance agent management with session handling and UI improvements

This commit is contained in:
Abhimanyu Saharan
2026-02-04 14:58:20 +05:30
parent a33c539860
commit b0e3208fa3
3 changed files with 136 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from fastapi import Depends, Header, HTTPException, status
from sqlmodel import Session, col, select
from app.core.agent_tokens import verify_agent_token
from app.db.session import get_session
from app.models.agents import Agent
@dataclass
class AgentAuthContext:
actor_type: Literal["agent"]
agent: Agent
def _find_agent_for_token(session: Session, token: str) -> Agent | None:
agents = list(
session.exec(select(Agent).where(col(Agent.agent_token_hash).is_not(None)))
)
for agent in agents:
if agent.agent_token_hash and verify_agent_token(token, agent.agent_token_hash):
return agent
return None
def get_agent_auth_context(
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
session: Session = Depends(get_session),
) -> AgentAuthContext:
if not agent_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
agent = _find_agent_for_token(session, agent_token)
if agent is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)
def get_agent_auth_context_optional(
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
session: Session = Depends(get_session),
) -> AgentAuthContext | None:
if not agent_token:
return None
agent = _find_agent_for_token(session, agent_token)
if agent is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return AgentAuthContext(actor_type="agent", agent=agent)

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
import base64
import hashlib
import hmac
import secrets
ITERATIONS = 200_000
SALT_BYTES = 16
def generate_agent_token() -> str:
return secrets.token_urlsafe(32)
def _b64encode(value: bytes) -> str:
return base64.urlsafe_b64encode(value).decode("utf-8").rstrip("=")
def _b64decode(value: str) -> bytes:
padding = "=" * (-len(value) % 4)
return base64.urlsafe_b64decode(value + padding)
def hash_agent_token(token: str) -> str:
salt = secrets.token_bytes(SALT_BYTES)
digest = hashlib.pbkdf2_hmac("sha256", token.encode("utf-8"), salt, ITERATIONS)
return f"pbkdf2_sha256${ITERATIONS}${_b64encode(salt)}${_b64encode(digest)}"
def verify_agent_token(token: str, stored_hash: str) -> bool:
try:
algorithm, iterations, salt_b64, digest_b64 = stored_hash.split("$")
except ValueError:
return False
if algorithm != "pbkdf2_sha256":
return False
try:
iterations_int = int(iterations)
except ValueError:
return False
salt = _b64decode(salt_b64)
expected_digest = _b64decode(digest_b64)
candidate = hashlib.pbkdf2_hmac(
"sha256", token.encode("utf-8"), salt, iterations_int
)
return hmac.compare_digest(candidate, expected_digest)