2026-02-09 15:49:50 +05:30
|
|
|
"""Lightweight immutable query-set wrapper for SQLModel statements."""
|
|
|
|
|
|
2026-02-09 00:51:26 +05:30
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from dataclasses import dataclass, replace
|
2026-02-09 20:40:17 +05:30
|
|
|
from typing import TYPE_CHECKING, Any, Generic, TypeVar
|
2026-02-09 00:51:26 +05:30
|
|
|
|
|
|
|
|
from sqlmodel import select
|
2026-02-09 15:49:50 +05:30
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
2026-02-09 20:40:17 +05:30
|
|
|
from sqlalchemy.orm import Mapped
|
|
|
|
|
from sqlalchemy.sql.elements import ColumnElement
|
2026-02-09 15:49:50 +05:30
|
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
|
from sqlmodel.sql.expression import SelectOfScalar
|
2026-02-09 00:51:26 +05:30
|
|
|
|
|
|
|
|
# Type variable for the model class a queryset is parameterised over.
ModelT = TypeVar("ModelT")


@dataclass(frozen=True)
class QuerySet(Generic[ModelT]):
    """Immutable, chainable holder for a SQLModel scalar select statement.

    Builder methods never modify the instance: each derives a new statement
    from ``self.statement`` and returns a copy of the queryset carrying it.
    The async methods pass the held statement to ``session.exec``.
    """

    # The wrapped select statement; the only state a queryset carries.
    statement: SelectOfScalar[ModelT]

    def filter(
        self,
        *criteria: ColumnElement[bool] | bool,
    ) -> QuerySet[ModelT]:
        """Return a copy whose statement has ``criteria`` given to ``where``."""
        return replace(self, statement=self.statement.where(*criteria))

    def where(
        self,
        *criteria: ColumnElement[bool] | bool,
    ) -> QuerySet[ModelT]:
        """Same as `filter`; offered under the SQLAlchemy-style name."""
        narrowed = self.filter(*criteria)
        return narrowed

    def filter_by(self, **kwargs: object) -> QuerySet[ModelT]:
        """Return a copy with ``kwargs`` given to the statement's ``filter_by``."""
        return replace(self, statement=self.statement.filter_by(**kwargs))

    def order_by(
        self,
        *ordering: Mapped[Any] | ColumnElement[Any] | str,
    ) -> QuerySet[ModelT]:
        """Return a copy with ``ordering`` given to the statement's ``order_by``."""
        return replace(self, statement=self.statement.order_by(*ordering))

    def limit(self, value: int) -> QuerySet[ModelT]:
        """Return a copy whose statement has ``limit(value)`` applied."""
        limited = self.statement.limit(value)
        return replace(self, statement=limited)

    def offset(self, value: int) -> QuerySet[ModelT]:
        """Return a copy whose statement has ``offset(value)`` applied."""
        shifted = self.statement.offset(value)
        return replace(self, statement=shifted)

    async def all(self, session: AsyncSession) -> list[ModelT]:
        """Run the statement on ``session`` and collect every row in a list."""
        result = await session.exec(self.statement)
        return list(result)

    async def first(self, session: AsyncSession) -> ModelT | None:
        """Run the statement on ``session`` and return ``result.first()``."""
        result = await session.exec(self.statement)
        return result.first()

    async def one_or_none(self, session: AsyncSession) -> ModelT | None:
        """Run the statement and return ``result.one_or_none()``."""
        result = await session.exec(self.statement)
        return result.one_or_none()

    async def exists(self, session: AsyncSession) -> bool:
        """Report whether the queryset, limited to one row, returns a row."""
        row = await self.limit(1).first(session)
        return row is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def qs(model: type[ModelT]) -> QuerySet[ModelT]:
    """Build the starting queryset for ``model`` from ``select(model)``."""
    base_statement = select(model)
    return QuerySet(statement=base_statement)
|