feat: add is_board_lead property to agent and board types
This commit is contained in:
@@ -3,13 +3,16 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
from fastapi import Depends, Header, HTTPException, status
|
||||
from fastapi import Depends, Header, HTTPException, Request, status
|
||||
import logging
|
||||
from sqlmodel import Session, col, select
|
||||
|
||||
from app.core.agent_tokens import verify_agent_token
|
||||
from app.db.session import get_session
|
||||
from app.models.agents import Agent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentAuthContext:
|
||||
@@ -25,25 +28,78 @@ def _find_agent_for_token(session: Session, token: str) -> Agent | None:
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_agent_token(
|
||||
agent_token: str | None,
|
||||
authorization: str | None,
|
||||
*,
|
||||
accept_authorization: bool = True,
|
||||
) -> str | None:
|
||||
if agent_token:
|
||||
return agent_token
|
||||
if not accept_authorization:
|
||||
return None
|
||||
if not authorization:
|
||||
return None
|
||||
value = authorization.strip()
|
||||
if not value:
|
||||
return None
|
||||
if value.lower().startswith("bearer "):
|
||||
return value.split(" ", 1)[1].strip() or None
|
||||
return None
|
||||
|
||||
|
||||
def get_agent_auth_context(
|
||||
request: Request,
|
||||
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
|
||||
authorization: str | None = Header(default=None, alias="Authorization"),
|
||||
session: Session = Depends(get_session),
|
||||
) -> AgentAuthContext:
|
||||
if not agent_token:
|
||||
resolved = _resolve_agent_token(agent_token, authorization, accept_authorization=True)
|
||||
if not resolved:
|
||||
logger.warning(
|
||||
"agent auth missing token path=%s x_agent=%s authorization=%s",
|
||||
request.url.path,
|
||||
bool(agent_token),
|
||||
bool(authorization),
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
||||
agent = _find_agent_for_token(session, agent_token)
|
||||
agent = _find_agent_for_token(session, resolved)
|
||||
if agent is None:
|
||||
logger.warning(
|
||||
"agent auth invalid token path=%s token_prefix=%s",
|
||||
request.url.path,
|
||||
resolved[:6],
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
||||
return AgentAuthContext(actor_type="agent", agent=agent)
|
||||
|
||||
|
||||
def get_agent_auth_context_optional(
|
||||
request: Request,
|
||||
agent_token: str | None = Header(default=None, alias="X-Agent-Token"),
|
||||
authorization: str | None = Header(default=None, alias="Authorization"),
|
||||
session: Session = Depends(get_session),
|
||||
) -> AgentAuthContext | None:
|
||||
if not agent_token:
|
||||
resolved = _resolve_agent_token(
|
||||
agent_token,
|
||||
authorization,
|
||||
accept_authorization=False,
|
||||
)
|
||||
if not resolved:
|
||||
if agent_token:
|
||||
logger.warning(
|
||||
"agent auth optional missing token path=%s x_agent=%s authorization=%s",
|
||||
request.url.path,
|
||||
bool(agent_token),
|
||||
bool(authorization),
|
||||
)
|
||||
return None
|
||||
agent = _find_agent_for_token(session, agent_token)
|
||||
agent = _find_agent_for_token(session, resolved)
|
||||
if agent is None:
|
||||
logger.warning(
|
||||
"agent auth optional invalid token path=%s token_prefix=%s",
|
||||
request.url.path,
|
||||
resolved[:6],
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
||||
return AgentAuthContext(actor_type="agent", agent=agent)
|
||||
|
||||
Reference in New Issue
Block a user