feat: enhance logging configuration and add request logging context
This commit is contained in:
@@ -14,6 +14,8 @@ POSTGRES_PORT=5432
|
||||
# --- backend settings (see backend/.env.example for full list) ---
|
||||
CORS_ORIGINS=http://localhost:3000
|
||||
DB_AUTO_MIGRATE=true
|
||||
LOG_LEVEL=INFO
|
||||
REQUEST_LOG_SLOW_MS=1000
|
||||
|
||||
# --- frontend settings ---
|
||||
# REQUIRED: Public URL used by the browser to reach the API.
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
ENVIRONMENT=dev
|
||||
LOG_LEVEL=INFO
|
||||
LOG_FORMAT=text
|
||||
LOG_USE_UTC=false
|
||||
REQUEST_LOG_SLOW_MS=1000
|
||||
REQUEST_LOG_INCLUDE_HEALTH=false
|
||||
DATABASE_URL=postgresql+psycopg://postgres:postgres@localhost:5432/mission_control
|
||||
CORS_ORIGINS=http://localhost:3000
|
||||
BASE_URL=
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
@@ -18,6 +17,7 @@ from app.api.deps import (
|
||||
require_admin_or_agent,
|
||||
)
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.core.time import utcnow
|
||||
from app.db.session import get_session
|
||||
from app.models.board_onboarding import BoardOnboardingSession
|
||||
@@ -49,7 +49,7 @@ if TYPE_CHECKING:
|
||||
from app.models.boards import Board
|
||||
|
||||
router = APIRouter(prefix="/boards/{board_id}/onboarding", tags=["board-onboarding"])
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_logger(__name__)
|
||||
BOARD_USER_READ_DEP = Depends(get_board_for_user_read)
|
||||
BOARD_USER_WRITE_DEP = Depends(get_board_for_user_write)
|
||||
BOARD_OR_404_DEP = Depends(get_board_or_404)
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
@@ -11,6 +10,7 @@ from fastapi import Depends, Header, HTTPException, Request, status
|
||||
from sqlmodel import col, select
|
||||
|
||||
from app.core.agent_tokens import verify_agent_token
|
||||
from app.core.logging import get_logger
|
||||
from app.core.time import utcnow
|
||||
from app.db.session import get_session
|
||||
from app.models.agents import Agent
|
||||
@@ -18,7 +18,7 @@ from app.models.agents import Agent
|
||||
if TYPE_CHECKING:
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_LAST_SEEN_TOUCH_INTERVAL = timedelta(seconds=30)
|
||||
_SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
@@ -17,6 +16,7 @@ from pydantic import BaseModel, ValidationError
|
||||
from starlette.concurrency import run_in_threadpool
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.db import crud
|
||||
from app.db.session import get_session
|
||||
from app.models.users import User
|
||||
@@ -25,7 +25,7 @@ if TYPE_CHECKING:
|
||||
from clerk_backend_api.models.user import User as ClerkUser
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_logger(__name__)
|
||||
security = HTTPBearer(auto_error=False)
|
||||
SECURITY_DEP = Depends(security)
|
||||
SESSION_DEP = Depends(get_session)
|
||||
@@ -331,9 +331,8 @@ async def _get_or_sync_user(
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"auth.user.sync clerk_user_id=%s updated=%s fetched_profile=%s",
|
||||
"auth.user.sync.noop clerk_user_id=%s fetched_profile=%s",
|
||||
clerk_user_id_log,
|
||||
changed,
|
||||
should_fetch_profile,
|
||||
)
|
||||
if not user.email:
|
||||
|
||||
@@ -42,6 +42,8 @@ class Settings(BaseSettings):
|
||||
log_level: str = "INFO"
|
||||
log_format: str = "text"
|
||||
log_use_utc: bool = False
|
||||
request_log_slow_ms: int = Field(default=1000, ge=0)
|
||||
request_log_include_health: bool = False
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _defaults(self) -> Self:
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Awaitable, Callable
|
||||
from time import perf_counter
|
||||
from typing import TYPE_CHECKING, Any, Final
|
||||
from uuid import uuid4
|
||||
|
||||
@@ -13,12 +13,23 @@ from fastapi.responses import JSONResponse
|
||||
from starlette.exceptions import HTTPException as StarletteHTTPException
|
||||
from starlette.responses import Response
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import (
|
||||
TRACE_LEVEL,
|
||||
get_logger,
|
||||
reset_request_id,
|
||||
reset_request_route_context,
|
||||
set_request_id,
|
||||
set_request_route_context,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from starlette.types import ASGIApp, Message, Receive, Scope, Send
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_logger(__name__)
|
||||
|
||||
REQUEST_ID_HEADER: Final[str] = "X-Request-Id"
|
||||
_HEALTH_CHECK_PATHS: Final[frozenset[str]] = frozenset({"/health", "/healthz", "/readyz"})
|
||||
|
||||
ExceptionHandler = Callable[[Request, Exception], Response | Awaitable[Response]]
|
||||
|
||||
@@ -31,6 +42,8 @@ class RequestIdMiddleware:
|
||||
self._app = app
|
||||
self._header_name = header_name
|
||||
self._header_name_bytes = header_name.lower().encode("latin-1")
|
||||
self._slow_request_ms = settings.request_log_slow_ms
|
||||
self._include_health_logs = settings.request_log_include_health
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
|
||||
"""Inject request-id into request state and response headers."""
|
||||
@@ -38,18 +51,80 @@ class RequestIdMiddleware:
|
||||
await self._app(scope, receive, send)
|
||||
return
|
||||
|
||||
method = str(scope.get("method") or "UNKNOWN").upper()
|
||||
path = str(scope.get("path") or "")
|
||||
client = scope.get("client")
|
||||
client_ip: str | None = None
|
||||
if isinstance(client, tuple) and client and isinstance(client[0], str):
|
||||
client_ip = client[0]
|
||||
should_log = self._include_health_logs or path not in _HEALTH_CHECK_PATHS
|
||||
started_at = perf_counter()
|
||||
status_code: int | None = None
|
||||
|
||||
request_id = self._get_or_create_request_id(scope)
|
||||
context_token = set_request_id(request_id)
|
||||
route_context_tokens = set_request_route_context(method, path)
|
||||
if should_log:
|
||||
logger.log(
|
||||
TRACE_LEVEL,
|
||||
"http.request.start",
|
||||
extra={
|
||||
"method": method,
|
||||
"path": path,
|
||||
"client_ip": client_ip,
|
||||
},
|
||||
)
|
||||
|
||||
async def send_with_request_id(message: Message) -> None:
|
||||
nonlocal status_code
|
||||
if message["type"] == "http.response.start":
|
||||
# Starlette uses `list[tuple[bytes, bytes]]` here.
|
||||
headers: list[tuple[bytes, bytes]] = message.setdefault("headers", [])
|
||||
if not any(key.lower() == self._header_name_bytes for key, _ in headers):
|
||||
request_id_bytes = request_id.encode("latin-1")
|
||||
headers.append((self._header_name_bytes, request_id_bytes))
|
||||
status = message.get("status")
|
||||
status_code = status if isinstance(status, int) else 500
|
||||
if should_log:
|
||||
duration_ms = int((perf_counter() - started_at) * 1000)
|
||||
extra = {
|
||||
"method": method,
|
||||
"path": path,
|
||||
"status_code": status_code,
|
||||
"duration_ms": duration_ms,
|
||||
"client_ip": client_ip,
|
||||
}
|
||||
if status_code >= 500:
|
||||
logger.error("http.request.complete", extra=extra)
|
||||
elif status_code >= 400:
|
||||
logger.warning("http.request.complete", extra=extra)
|
||||
else:
|
||||
logger.debug("http.request.complete", extra=extra)
|
||||
if self._slow_request_ms and duration_ms >= self._slow_request_ms:
|
||||
logger.warning(
|
||||
"http.request.slow",
|
||||
extra={
|
||||
**extra,
|
||||
"slow_threshold_ms": self._slow_request_ms,
|
||||
},
|
||||
)
|
||||
await send(message)
|
||||
|
||||
await self._app(scope, receive, send_with_request_id)
|
||||
try:
|
||||
await self._app(scope, receive, send_with_request_id)
|
||||
finally:
|
||||
if should_log and status_code is None:
|
||||
logger.warning(
|
||||
"http.request.incomplete",
|
||||
extra={
|
||||
"method": method,
|
||||
"path": path,
|
||||
"duration_ms": int((perf_counter() - started_at) * 1000),
|
||||
"client_ip": client_ip,
|
||||
},
|
||||
)
|
||||
reset_request_route_context(route_context_tokens)
|
||||
reset_request_id(context_token)
|
||||
|
||||
def _get_or_create_request_id(self, scope: Scope) -> str:
|
||||
# Accept a client-provided request id if present.
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from contextvars import ContextVar, Token
|
||||
from datetime import UTC, datetime
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
@@ -17,6 +18,9 @@ from app.core.version import APP_NAME, APP_VERSION
|
||||
TRACE_LEVEL = 5
|
||||
EXC_INFO_TUPLE_SIZE = 3
|
||||
logging.addLevelName(TRACE_LEVEL, "TRACE")
|
||||
_REQUEST_ID_CONTEXT: ContextVar[str | None] = ContextVar("request_id", default=None)
|
||||
_REQUEST_METHOD_CONTEXT: ContextVar[str | None] = ContextVar("request_method", default=None)
|
||||
_REQUEST_PATH_CONTEXT: ContextVar[str | None] = ContextVar("request_path", default=None)
|
||||
|
||||
|
||||
def _coerce_exc_info(
|
||||
@@ -75,6 +79,53 @@ def _trace(self: logging.Logger, message: str, *args: object, **kwargs: object)
|
||||
|
||||
logging.Logger.trace = _trace # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def set_request_id(request_id: str | None) -> Token[str | None]:
|
||||
"""Bind request-id to logging context for the current task."""
|
||||
normalized = (request_id or "").strip() or None
|
||||
return _REQUEST_ID_CONTEXT.set(normalized)
|
||||
|
||||
|
||||
def reset_request_id(token: Token[str | None]) -> None:
|
||||
"""Reset request-id context to a previous token value."""
|
||||
_REQUEST_ID_CONTEXT.reset(token)
|
||||
|
||||
|
||||
def get_request_id() -> str | None:
|
||||
"""Return request-id currently bound to logging context."""
|
||||
return _REQUEST_ID_CONTEXT.get()
|
||||
|
||||
|
||||
def set_request_route_context(
|
||||
method: str | None,
|
||||
path: str | None,
|
||||
) -> tuple[Token[str | None], Token[str | None]]:
|
||||
"""Bind request method/path to logging context for the current task."""
|
||||
normalized_method = (method or "").strip().upper() or None
|
||||
normalized_path = (path or "").strip() or None
|
||||
return (
|
||||
_REQUEST_METHOD_CONTEXT.set(normalized_method),
|
||||
_REQUEST_PATH_CONTEXT.set(normalized_path),
|
||||
)
|
||||
|
||||
|
||||
def reset_request_route_context(tokens: tuple[Token[str | None], Token[str | None]]) -> None:
|
||||
"""Reset request method/path context to previously-bound values."""
|
||||
method_token, path_token = tokens
|
||||
_REQUEST_METHOD_CONTEXT.reset(method_token)
|
||||
_REQUEST_PATH_CONTEXT.reset(path_token)
|
||||
|
||||
|
||||
def get_request_method() -> str | None:
|
||||
"""Return request method currently bound to logging context."""
|
||||
return _REQUEST_METHOD_CONTEXT.get()
|
||||
|
||||
|
||||
def get_request_path() -> str | None:
|
||||
"""Return request path currently bound to logging context."""
|
||||
return _REQUEST_PATH_CONTEXT.get()
|
||||
|
||||
|
||||
_STANDARD_LOG_RECORD_ATTRS = {
|
||||
"args",
|
||||
"asctime",
|
||||
@@ -117,6 +168,18 @@ class AppLogFilter(logging.Filter):
|
||||
"""Attach app metadata fields to each emitted record."""
|
||||
record.app = self._app_name
|
||||
record.version = self._version
|
||||
if not getattr(record, "request_id", None):
|
||||
request_id = get_request_id()
|
||||
if request_id:
|
||||
record.request_id = request_id
|
||||
if not getattr(record, "method", None):
|
||||
method = get_request_method()
|
||||
if method:
|
||||
record.method = method
|
||||
if not getattr(record, "path", None):
|
||||
path = get_request_path()
|
||||
if path:
|
||||
record.path = path
|
||||
return True
|
||||
|
||||
|
||||
@@ -227,6 +290,18 @@ class AppLogger:
|
||||
logger = logging.getLogger(name)
|
||||
logger.disabled = True
|
||||
|
||||
logging.getLogger(__name__).info(
|
||||
"logging.configured level=%s format=%s use_utc=%s",
|
||||
level_name,
|
||||
format_name,
|
||||
settings.log_use_utc,
|
||||
)
|
||||
logging.getLogger(__name__).debug(
|
||||
"logging.libraries uvicorn_level=%s sql_enabled=%s",
|
||||
level_name,
|
||||
level_name == "TRACE",
|
||||
)
|
||||
|
||||
cls._configured = True
|
||||
|
||||
@classmethod
|
||||
@@ -240,3 +315,8 @@ class AppLogger:
|
||||
def configure_logging() -> None:
|
||||
"""Configure global application logging once during startup."""
|
||||
AppLogger.configure()
|
||||
|
||||
|
||||
def get_logger(name: str | None = None) -> logging.Logger:
|
||||
"""Return an app logger from the centralized logger configuration."""
|
||||
return AppLogger.get_logger(name)
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@@ -16,6 +15,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from app import models as _models
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import AsyncGenerator
|
||||
@@ -42,7 +42,7 @@ async_session_maker = async_sessionmaker(
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def _alembic_config() -> Config:
|
||||
|
||||
@@ -28,20 +28,30 @@ from app.api.tasks import router as tasks_router
|
||||
from app.api.users import router as users_router
|
||||
from app.core.config import settings
|
||||
from app.core.error_handling import install_error_handling
|
||||
from app.core.logging import configure_logging
|
||||
from app.core.logging import configure_logging, get_logger
|
||||
from app.db.session import init_db
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
configure_logging()
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
||||
"""Initialize application resources before serving requests."""
|
||||
logger.info(
|
||||
"app.lifecycle.starting environment=%s db_auto_migrate=%s",
|
||||
settings.environment,
|
||||
settings.db_auto_migrate,
|
||||
)
|
||||
await init_db()
|
||||
yield
|
||||
logger.info("app.lifecycle.started")
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
logger.info("app.lifecycle.stopped")
|
||||
|
||||
|
||||
app = FastAPI(title="Mission Control API", version="0.1.0", lifespan=lifespan)
|
||||
@@ -55,6 +65,9 @@ if origins:
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
logger.info("app.cors.enabled origins_count=%s", len(origins))
|
||||
else:
|
||||
logger.info("app.cors.disabled")
|
||||
|
||||
install_error_handling(app)
|
||||
|
||||
@@ -98,3 +111,4 @@ api_v1.include_router(users_router)
|
||||
app.include_router(api_v1)
|
||||
|
||||
add_pagination(app)
|
||||
logger.debug("app.routes.registered count=%s", len(app.routes))
|
||||
|
||||
@@ -10,6 +10,7 @@ from fastapi import HTTPException, status
|
||||
from sqlmodel import col
|
||||
|
||||
from app.core.auth import AuthContext
|
||||
from app.core.logging import TRACE_LEVEL
|
||||
from app.core.time import utcnow
|
||||
from app.db import crud
|
||||
from app.models.activity_events import ActivityEvent
|
||||
@@ -256,7 +257,7 @@ class GatewayAdminLifecycleService(OpenClawDBService):
|
||||
action: str = "provision",
|
||||
) -> Agent:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.main_agent.ensure.start gateway_id=%s action=%s",
|
||||
gateway.id,
|
||||
action,
|
||||
@@ -331,7 +332,7 @@ class GatewayAdminLifecycleService(OpenClawDBService):
|
||||
auth: AuthContext,
|
||||
) -> GatewayTemplatesSyncResult:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.templates.sync.start gateway_id=%s include_main=%s",
|
||||
gateway.id,
|
||||
query.include_main,
|
||||
|
||||
@@ -12,6 +12,7 @@ from fastapi import HTTPException, status
|
||||
from sqlmodel import col, select
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import TRACE_LEVEL
|
||||
from app.core.time import utcnow
|
||||
from app.models.agents import Agent
|
||||
from app.models.boards import Board
|
||||
@@ -172,7 +173,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
) -> None:
|
||||
trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.nudge")
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.nudge.start trace_id=%s board_id=%s actor_agent_id=%s "
|
||||
"target_agent_id=%s",
|
||||
trace_id,
|
||||
@@ -252,7 +253,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
) -> str:
|
||||
trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.soul.read")
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.soul_read.start trace_id=%s board_id=%s target_agent_id=%s",
|
||||
trace_id,
|
||||
board.id,
|
||||
@@ -322,7 +323,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
correlation_id, prefix="coord.soul.write"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.soul_write.start trace_id=%s board_id=%s target_agent_id=%s "
|
||||
"actor_agent_id=%s",
|
||||
trace_id,
|
||||
@@ -418,7 +419,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
payload.correlation_id, prefix="coord.ask_user"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.ask_user.start trace_id=%s board_id=%s actor_agent_id=%s",
|
||||
trace_id,
|
||||
board.id,
|
||||
@@ -563,7 +564,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
payload.correlation_id, prefix="coord.lead_message"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.lead_message.start trace_id=%s board_id=%s actor_agent_id=%s",
|
||||
trace_id,
|
||||
board_id,
|
||||
@@ -652,7 +653,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
||||
payload.correlation_id, prefix="coord.lead_broadcast"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.coordination.lead_broadcast.start trace_id=%s actor_agent_id=%s",
|
||||
trace_id,
|
||||
actor_agent.id,
|
||||
|
||||
@@ -6,9 +6,11 @@ OpenClaw services without adding new architectural layers.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from logging import Logger
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app.core.logging import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -19,7 +21,7 @@ class OpenClawDBService:
|
||||
def __init__(self, session: AsyncSession) -> None:
|
||||
self._session = session
|
||||
# Use the concrete subclass module for logger naming.
|
||||
self._logger = logging.getLogger(self.__class__.__module__)
|
||||
self._logger = get_logger(self.__class__.__module__)
|
||||
|
||||
@property
|
||||
def session(self) -> AsyncSession:
|
||||
@@ -30,11 +32,11 @@ class OpenClawDBService:
|
||||
self._session = value
|
||||
|
||||
@property
|
||||
def logger(self) -> logging.Logger:
|
||||
def logger(self) -> Logger:
|
||||
return self._logger
|
||||
|
||||
@logger.setter
|
||||
def logger(self, value: logging.Logger) -> None:
|
||||
def logger(self, value: Logger) -> None:
|
||||
self._logger = value
|
||||
|
||||
async def add_commit_refresh(self, model: object) -> None:
|
||||
|
||||
@@ -10,6 +10,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from time import perf_counter
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode, urlparse, urlunparse
|
||||
from uuid import uuid4
|
||||
@@ -17,7 +18,10 @@ from uuid import uuid4
|
||||
import websockets
|
||||
from websockets.exceptions import WebSocketException
|
||||
|
||||
from app.core.logging import TRACE_LEVEL, get_logger
|
||||
|
||||
PROTOCOL_VERSION = 3
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# NOTE: These are the base gateway methods from the OpenClaw gateway repo.
|
||||
# The gateway can expose additional methods at runtime via channel plugins.
|
||||
@@ -165,6 +169,11 @@ def _build_gateway_url(config: GatewayConfig) -> str:
|
||||
return str(urlunparse(parsed._replace(query=query)))
|
||||
|
||||
|
||||
def _redacted_url_for_log(raw_url: str) -> str:
|
||||
parsed = urlparse(raw_url)
|
||||
return str(urlunparse(parsed._replace(query="", fragment="")))
|
||||
|
||||
|
||||
async def _await_response(
|
||||
ws: websockets.ClientConnection,
|
||||
request_id: str,
|
||||
@@ -172,6 +181,12 @@ async def _await_response(
|
||||
while True:
|
||||
raw = await ws.recv()
|
||||
data = json.loads(raw)
|
||||
logger.log(
|
||||
TRACE_LEVEL,
|
||||
"gateway.rpc.recv request_id=%s type=%s",
|
||||
request_id,
|
||||
data.get("type"),
|
||||
)
|
||||
|
||||
if data.get("type") == "res" and data.get("id") == request_id:
|
||||
ok = data.get("ok")
|
||||
@@ -199,6 +214,13 @@ async def _send_request(
|
||||
"method": method,
|
||||
"params": params or {},
|
||||
}
|
||||
logger.log(
|
||||
TRACE_LEVEL,
|
||||
"gateway.rpc.send method=%s request_id=%s params_keys=%s",
|
||||
method,
|
||||
request_id,
|
||||
sorted((params or {}).keys()),
|
||||
)
|
||||
await ws.send(json.dumps(message))
|
||||
return await _await_response(ws, request_id)
|
||||
|
||||
@@ -229,7 +251,11 @@ async def _ensure_connected(
|
||||
first_message = first_message.decode("utf-8")
|
||||
data = json.loads(first_message)
|
||||
if data.get("type") != "event" or data.get("event") != "connect.challenge":
|
||||
pass
|
||||
logger.warning(
|
||||
"gateway.rpc.connect.unexpected_first_message type=%s event=%s",
|
||||
data.get("type"),
|
||||
data.get("event"),
|
||||
)
|
||||
connect_id = str(uuid4())
|
||||
response = {
|
||||
"type": "req",
|
||||
@@ -249,6 +275,12 @@ async def openclaw_call(
|
||||
) -> object:
|
||||
"""Call a gateway RPC method and return the result payload."""
|
||||
gateway_url = _build_gateway_url(config)
|
||||
started_at = perf_counter()
|
||||
logger.debug(
|
||||
"gateway.rpc.call.start method=%s gateway_url=%s",
|
||||
method,
|
||||
_redacted_url_for_log(gateway_url),
|
||||
)
|
||||
try:
|
||||
async with websockets.connect(gateway_url, ping_interval=None) as ws:
|
||||
first_message = None
|
||||
@@ -257,8 +289,19 @@ async def openclaw_call(
|
||||
except TimeoutError:
|
||||
first_message = None
|
||||
await _ensure_connected(ws, first_message, config)
|
||||
return await _send_request(ws, method, params)
|
||||
payload = await _send_request(ws, method, params)
|
||||
logger.debug(
|
||||
"gateway.rpc.call.success method=%s duration_ms=%s",
|
||||
method,
|
||||
int((perf_counter() - started_at) * 1000),
|
||||
)
|
||||
return payload
|
||||
except OpenClawGatewayError:
|
||||
logger.warning(
|
||||
"gateway.rpc.call.gateway_error method=%s duration_ms=%s",
|
||||
method,
|
||||
int((perf_counter() - started_at) * 1000),
|
||||
)
|
||||
raise
|
||||
except (
|
||||
TimeoutError,
|
||||
@@ -267,6 +310,12 @@ async def openclaw_call(
|
||||
ValueError,
|
||||
WebSocketException,
|
||||
) as exc: # pragma: no cover - network/protocol errors
|
||||
logger.error(
|
||||
"gateway.rpc.call.transport_error method=%s duration_ms=%s error_type=%s",
|
||||
method,
|
||||
int((perf_counter() - started_at) * 1000),
|
||||
exc.__class__.__name__,
|
||||
)
|
||||
raise OpenClawGatewayError(str(exc)) from exc
|
||||
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from app.core.logging import TRACE_LEVEL
|
||||
from app.models.board_onboarding import BoardOnboardingSession
|
||||
from app.models.boards import Board
|
||||
from app.services.openclaw.coordination_service import AbstractGatewayMessagingService
|
||||
@@ -25,7 +26,7 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
|
||||
correlation_id, prefix="onboarding.start"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.onboarding.start_dispatch.start trace_id=%s board_id=%s",
|
||||
trace_id,
|
||||
board.id,
|
||||
@@ -83,7 +84,7 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
|
||||
correlation_id, prefix="onboarding.answer"
|
||||
)
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.onboarding.answer_dispatch.start trace_id=%s board_id=%s onboarding_id=%s",
|
||||
trace_id,
|
||||
board.id,
|
||||
|
||||
@@ -22,6 +22,7 @@ from sqlmodel import col, select
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
|
||||
from app.core.agent_tokens import verify_agent_token
|
||||
from app.core.logging import TRACE_LEVEL
|
||||
from app.core.time import utcnow
|
||||
from app.db import crud
|
||||
from app.db.pagination import paginate
|
||||
@@ -983,7 +984,7 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
raise_gateway_errors: bool,
|
||||
) -> None:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"agent.provision.start action=%s agent_id=%s target_main=%s",
|
||||
action,
|
||||
agent.id,
|
||||
@@ -1471,7 +1472,7 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
actor: ActorContextLike,
|
||||
) -> AgentRead:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"agent.create.start actor_type=%s board_id=%s",
|
||||
actor.actor_type,
|
||||
payload.board_id,
|
||||
@@ -1523,7 +1524,12 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
payload: AgentUpdate,
|
||||
options: AgentUpdateOptions,
|
||||
) -> AgentRead:
|
||||
self.logger.log(5, "agent.update.start agent_id=%s force=%s", agent_id, options.force)
|
||||
self.logger.log(
|
||||
TRACE_LEVEL,
|
||||
"agent.update.start agent_id=%s force=%s",
|
||||
agent_id,
|
||||
options.force,
|
||||
)
|
||||
agent = await Agent.objects.by_id(agent_id).first(self.session)
|
||||
if agent is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
||||
@@ -1573,7 +1579,10 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
actor: ActorContextLike,
|
||||
) -> AgentRead:
|
||||
self.logger.log(
|
||||
5, "agent.heartbeat.start agent_id=%s actor_type=%s", agent_id, actor.actor_type
|
||||
TRACE_LEVEL,
|
||||
"agent.heartbeat.start agent_id=%s actor_type=%s",
|
||||
agent_id,
|
||||
actor.actor_type,
|
||||
)
|
||||
agent = await Agent.objects.by_id(agent_id).first(self.session)
|
||||
if agent is None:
|
||||
@@ -1599,7 +1608,7 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
actor: ActorContextLike,
|
||||
) -> AgentRead:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"agent.heartbeat_or_create.start actor_type=%s name=%s board_id=%s",
|
||||
actor.actor_type,
|
||||
payload.name,
|
||||
@@ -1644,7 +1653,7 @@ class AgentLifecycleService(OpenClawDBService):
|
||||
agent_id: str,
|
||||
ctx: OrganizationContext,
|
||||
) -> OkResponse:
|
||||
self.logger.log(5, "agent.delete.start agent_id=%s", agent_id)
|
||||
self.logger.log(TRACE_LEVEL, "agent.delete.start agent_id=%s", agent_id)
|
||||
agent = await Agent.objects.by_id(agent_id).first(self.session)
|
||||
if agent is None:
|
||||
return OkResponse()
|
||||
|
||||
@@ -9,6 +9,7 @@ from uuid import UUID
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
from app.core.logging import TRACE_LEVEL
|
||||
from app.models.boards import Board
|
||||
from app.schemas.gateway_api import (
|
||||
GatewayResolveQuery,
|
||||
@@ -88,7 +89,7 @@ class GatewaySessionService(OpenClawDBService):
|
||||
user: User | None = None,
|
||||
) -> tuple[Board | None, GatewayClientConfig, str | None]:
|
||||
self.logger.log(
|
||||
5,
|
||||
TRACE_LEVEL,
|
||||
"gateway.resolve.start board_id=%s gateway_url=%s",
|
||||
params.board_id,
|
||||
params.gateway_url,
|
||||
|
||||
114
backend/tests/test_common_logging_policy.py
Normal file
114
backend/tests/test_common_logging_policy.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from app.core.logging import TRACE_LEVEL, AppLogger, get_logger
|
||||
|
||||
BACKEND_ROOT = Path(__file__).resolve().parents[1]
|
||||
APP_ROOT = BACKEND_ROOT / "app"
|
||||
COMMON_LOGGER_FILE = APP_ROOT / "core" / "logging.py"
|
||||
|
||||
|
||||
def _iter_app_python_files() -> list[Path]:
|
||||
files: list[Path] = []
|
||||
for path in APP_ROOT.rglob("*.py"):
|
||||
if "__pycache__" in path.parts:
|
||||
continue
|
||||
files.append(path)
|
||||
return files
|
||||
|
||||
|
||||
def test_common_logger_supports_trace_to_critical_levels() -> None:
|
||||
AppLogger.configure(force=True)
|
||||
logger = get_logger("tests.common_logging_policy.levels")
|
||||
logger.setLevel(TRACE_LEVEL)
|
||||
|
||||
stream = io.StringIO()
|
||||
handler = logging.StreamHandler(stream)
|
||||
handler.setLevel(TRACE_LEVEL)
|
||||
handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
|
||||
logger.addHandler(handler)
|
||||
|
||||
try:
|
||||
logger.log(TRACE_LEVEL, "trace-level")
|
||||
logger.debug("debug-level")
|
||||
logger.info("info-level")
|
||||
logger.warning("warning-level")
|
||||
logger.error("error-level")
|
||||
logger.critical("critical-level")
|
||||
finally:
|
||||
logger.removeHandler(handler)
|
||||
handler.close()
|
||||
|
||||
lines = [line.strip() for line in stream.getvalue().splitlines() if line.strip()]
|
||||
assert lines == [
|
||||
"TRACE:trace-level",
|
||||
"DEBUG:debug-level",
|
||||
"INFO:info-level",
|
||||
"WARNING:warning-level",
|
||||
"ERROR:error-level",
|
||||
"CRITICAL:critical-level",
|
||||
]
|
||||
|
||||
|
||||
def test_backend_app_uses_common_logger() -> None:
|
||||
offenders: list[str] = []
|
||||
|
||||
for path in _iter_app_python_files():
|
||||
if path == COMMON_LOGGER_FILE:
|
||||
continue
|
||||
|
||||
text = path.read_text(encoding="utf-8")
|
||||
rel = path.relative_to(BACKEND_ROOT).as_posix()
|
||||
|
||||
if re.search(r"^\s*import\s+logging\b", text, flags=re.MULTILINE):
|
||||
offenders.append(f"{rel}: imports logging directly")
|
||||
if "logging.getLogger(" in text:
|
||||
offenders.append(f"{rel}: calls logging.getLogger directly")
|
||||
|
||||
assert not offenders, "\n".join(offenders)
|
||||
|
||||
|
||||
def test_module_level_loggers_bind_via_get_logger() -> None:
|
||||
offenders: list[str] = []
|
||||
assignment_pattern = re.compile(r"^\s*logger\s*=\s*(.+)$", flags=re.MULTILINE)
|
||||
|
||||
for path in _iter_app_python_files():
|
||||
if path == COMMON_LOGGER_FILE:
|
||||
continue
|
||||
text = path.read_text(encoding="utf-8")
|
||||
rel = path.relative_to(BACKEND_ROOT).as_posix()
|
||||
for expression in assignment_pattern.findall(text):
|
||||
normalized = expression.strip()
|
||||
if normalized.startswith("get_logger("):
|
||||
continue
|
||||
offenders.append(f"{rel}: logger assignment `{normalized}` is not get_logger(...)")
|
||||
|
||||
assert not offenders, "\n".join(offenders)
|
||||
|
||||
|
||||
def test_backend_app_has_all_log_levels_in_use() -> None:
|
||||
level_patterns: dict[str, re.Pattern[str]] = {
|
||||
"trace": re.compile(
|
||||
r"\b(?:self\.)?logger\.log\(\s*TRACE_LEVEL\b|\b(?:self\.)?logger\.trace\("
|
||||
),
|
||||
"debug": re.compile(r"\b(?:self\.)?logger\.debug\("),
|
||||
"info": re.compile(r"\b(?:self\.)?logger\.info\("),
|
||||
"warning": re.compile(r"\b(?:self\.)?logger\.warning\("),
|
||||
"error": re.compile(r"\b(?:self\.)?logger\.error\("),
|
||||
"critical": re.compile(r"\b(?:self\.)?logger\.critical\("),
|
||||
}
|
||||
|
||||
merged_source = "\n".join(
|
||||
path.read_text(encoding="utf-8")
|
||||
for path in _iter_app_python_files()
|
||||
if path != COMMON_LOGGER_FILE
|
||||
)
|
||||
|
||||
missing = [
|
||||
name for name, pattern in level_patterns.items() if not pattern.search(merged_source)
|
||||
]
|
||||
assert not missing, f"Missing log levels in backend app code: {', '.join(missing)}"
|
||||
@@ -2,9 +2,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
from app.core import error_handling as error_handling_module
|
||||
from app.core.error_handling import REQUEST_ID_HEADER, RequestIdMiddleware
|
||||
from app.core.logging import TRACE_LEVEL, AppLogFilter, get_logger
|
||||
from app.core.version import APP_NAME, APP_VERSION
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -89,3 +94,136 @@ async def test_request_id_middleware_does_not_duplicate_existing_header() -> Non
|
||||
v for k, v in start_headers if k.lower() == REQUEST_ID_HEADER.lower().encode("latin-1")
|
||||
]
|
||||
assert values == [b"already"]
|
||||
|
||||
|
||||
class _CaptureHandler(logging.Handler):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.records: list[logging.LogRecord] = []
|
||||
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
self.records.append(record)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_id_middleware_logs_trace_start_and_debug_completion() -> None:
|
||||
capture = _CaptureHandler()
|
||||
capture.setLevel(TRACE_LEVEL)
|
||||
logger = error_handling_module.logger
|
||||
logger.setLevel(TRACE_LEVEL)
|
||||
logger.addHandler(capture)
|
||||
|
||||
async def app(scope, receive, send): # type: ignore[no-untyped-def]
|
||||
await send({"type": "http.response.start", "status": 200, "headers": []})
|
||||
await send({"type": "http.response.body", "body": b"ok"})
|
||||
|
||||
middleware = RequestIdMiddleware(app)
|
||||
request_scope = {
|
||||
"type": "http",
|
||||
"method": "GET",
|
||||
"path": "/api/v1/auth/bootstrap",
|
||||
"client": ("127.0.0.1", 5454),
|
||||
"headers": [],
|
||||
}
|
||||
sent_messages: list[dict[str, object]] = []
|
||||
|
||||
async def send(message): # type: ignore[no-untyped-def]
|
||||
sent_messages.append(message)
|
||||
|
||||
try:
|
||||
await middleware(request_scope, lambda: None, send)
|
||||
finally:
|
||||
logger.removeHandler(capture)
|
||||
capture.close()
|
||||
|
||||
start = next(
|
||||
record for record in capture.records if record.getMessage() == "http.request.start"
|
||||
)
|
||||
complete = next(
|
||||
record for record in capture.records if record.getMessage() == "http.request.complete"
|
||||
)
|
||||
|
||||
assert start.levelname == "TRACE"
|
||||
assert getattr(start, "method", None) == "GET"
|
||||
assert getattr(start, "path", None) == "/api/v1/auth/bootstrap"
|
||||
|
||||
assert complete.levelname == "DEBUG"
|
||||
assert getattr(complete, "status_code", None) == 200
|
||||
assert isinstance(getattr(complete, "duration_ms", None), int)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_id_middleware_logs_error_for_5xx_completion() -> None:
|
||||
capture = _CaptureHandler()
|
||||
capture.setLevel(TRACE_LEVEL)
|
||||
logger = error_handling_module.logger
|
||||
logger.setLevel(TRACE_LEVEL)
|
||||
logger.addHandler(capture)
|
||||
|
||||
async def app(scope, receive, send): # type: ignore[no-untyped-def]
|
||||
await send({"type": "http.response.start", "status": 503, "headers": []})
|
||||
await send({"type": "http.response.body", "body": b"unavailable"})
|
||||
|
||||
middleware = RequestIdMiddleware(app)
|
||||
request_scope = {
|
||||
"type": "http",
|
||||
"method": "POST",
|
||||
"path": "/api/v1/tasks",
|
||||
"client": ("127.0.0.1", 5454),
|
||||
"headers": [],
|
||||
}
|
||||
sent_messages: list[dict[str, object]] = []
|
||||
|
||||
async def send(message): # type: ignore[no-untyped-def]
|
||||
sent_messages.append(message)
|
||||
|
||||
try:
|
||||
await middleware(request_scope, lambda: None, send)
|
||||
finally:
|
||||
logger.removeHandler(capture)
|
||||
capture.close()
|
||||
|
||||
complete = next(
|
||||
record for record in capture.records if record.getMessage() == "http.request.complete"
|
||||
)
|
||||
assert complete.levelname == "ERROR"
|
||||
assert getattr(complete, "status_code", None) == 503
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_request_id_middleware_enriches_in_request_logs_with_route_context() -> None:
|
||||
capture = _CaptureHandler()
|
||||
capture.setLevel(TRACE_LEVEL)
|
||||
capture.addFilter(AppLogFilter(APP_NAME, APP_VERSION))
|
||||
|
||||
app_logger = get_logger("tests.request_context.enrichment")
|
||||
app_logger.setLevel(TRACE_LEVEL)
|
||||
app_logger.addHandler(capture)
|
||||
|
||||
async def app(scope, receive, send): # type: ignore[no-untyped-def]
|
||||
app_logger.info("inside.request.handler")
|
||||
await send({"type": "http.response.start", "status": 200, "headers": []})
|
||||
await send({"type": "http.response.body", "body": b"ok"})
|
||||
|
||||
middleware = RequestIdMiddleware(app)
|
||||
request_scope = {
|
||||
"type": "http",
|
||||
"method": "PUT",
|
||||
"path": "/api/v1/boards/abc",
|
||||
"client": ("127.0.0.1", 5454),
|
||||
"headers": [],
|
||||
}
|
||||
|
||||
async def send(_message): # type: ignore[no-untyped-def]
|
||||
return None
|
||||
|
||||
try:
|
||||
await middleware(request_scope, lambda: None, send)
|
||||
finally:
|
||||
app_logger.removeHandler(capture)
|
||||
capture.close()
|
||||
|
||||
record = next(item for item in capture.records if item.getMessage() == "inside.request.handler")
|
||||
assert isinstance(getattr(record, "request_id", None), str) and getattr(record, "request_id")
|
||||
assert getattr(record, "method", None) == "PUT"
|
||||
assert getattr(record, "path", None) == "/api/v1/boards/abc"
|
||||
|
||||
Reference in New Issue
Block a user