feat(tags): add tag management interfaces and update related schemas

This commit is contained in:
Abhimanyu Saharan
2026-02-12 18:35:48 +05:30
parent 2ebdead95b
commit 8d7d9da4e9
146 changed files with 6956 additions and 8645 deletions

View File

@@ -20,8 +20,8 @@ from app.db.pagination import paginate
from app.db.session import get_session
from app.models.agents import Agent
from app.models.boards import Board
from app.models.tags import Tag
from app.models.task_dependencies import TaskDependency
from app.models.task_tags import TaskTag
from app.models.tasks import Task
from app.schemas.agents import (
AgentCreate,
@@ -44,18 +44,18 @@ from app.schemas.gateway_coordination import (
GatewayMainAskUserResponse,
)
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.task_tags import TaskTagRef
from app.schemas.tags import TagRef
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
from app.services.openclaw.coordination_service import GatewayCoordinationService
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning_db import AgentLifecycleService
from app.services.tags import replace_tags, validate_tag_ids
from app.services.task_dependencies import (
blocked_by_dependency_ids,
dependency_status_by_id,
validate_dependency_update,
)
from app.services.task_tags import replace_task_tags, validate_task_tag_ids
if TYPE_CHECKING:
from collections.abc import Sequence
@@ -214,23 +214,23 @@ async def list_tasks(
)
@router.get("/boards/{board_id}/tags", response_model=list[TaskTagRef])
async def list_task_tags(
@router.get("/boards/{board_id}/tags", response_model=list[TagRef])
async def list_tags(
board: Board = BOARD_DEP,
session: AsyncSession = SESSION_DEP,
agent_ctx: AgentAuthContext = AGENT_CTX_DEP,
) -> list[TaskTagRef]:
"""List task tags available to the board's organization."""
) -> list[TagRef]:
"""List tags available to the board's organization."""
_guard_board_access(agent_ctx, board)
tags = (
await session.exec(
select(TaskTag)
.where(col(TaskTag.organization_id) == board.organization_id)
.order_by(func.lower(col(TaskTag.name)).asc(), col(TaskTag.created_at).asc()),
select(Tag)
.where(col(Tag.organization_id) == board.organization_id)
.order_by(func.lower(col(Tag.name)).asc(), col(Tag.created_at).asc()),
)
).all()
return [
TaskTagRef(
TagRef(
id=tag.id,
name=tag.name,
slug=tag.slug,
@@ -265,7 +265,7 @@ async def create_task(
task_id=task.id,
depends_on_task_ids=depends_on_task_ids,
)
normalized_tag_ids = await validate_task_tag_ids(
normalized_tag_ids = await validate_tag_ids(
session,
organization_id=board.organization_id,
tag_ids=tag_ids,
@@ -310,7 +310,7 @@ async def create_task(
depends_on_task_id=dep_id,
),
)
await replace_task_tags(
await replace_tags(
session,
task_id=task.id,
tag_ids=normalized_tag_ids,

View File

@@ -1,4 +1,4 @@
"""Task-tag CRUD endpoints for organization-scoped task categorization."""
"""Tag CRUD endpoints for organization-scoped task categorization."""
from __future__ import annotations
@@ -14,13 +14,13 @@ from app.core.time import utcnow
from app.db import crud
from app.db.pagination import paginate
from app.db.session import get_session
from app.models.task_tag_assignments import TaskTagAssignment
from app.models.task_tags import TaskTag
from app.models.tag_assignments import TagAssignment
from app.models.tags import Tag
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.task_tags import TaskTagCreate, TaskTagRead, TaskTagUpdate
from app.schemas.tags import TagCreate, TagRead, TagUpdate
from app.services.organizations import OrganizationContext
from app.services.task_tags import slugify_task_tag, task_counts_for_tags
from app.services.tags import slugify_tag, task_counts_for_tags
if TYPE_CHECKING:
from collections.abc import Sequence
@@ -36,16 +36,16 @@ ORG_ADMIN_DEP = Depends(require_org_admin)
def _normalize_slug(slug: str | None, *, fallback_name: str) -> str:
source = (slug or "").strip() or fallback_name
return slugify_task_tag(source)
return slugify_tag(source)
async def _require_org_task_tag(
async def _require_org_tag(
session: AsyncSession,
*,
tag_id: UUID,
ctx: OrganizationContext,
) -> TaskTag:
tag = await TaskTag.objects.by_id(tag_id).first(session)
) -> Tag:
tag = await Tag.objects.by_id(tag_id).first(session)
if tag is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if tag.organization_id != ctx.organization.id:
@@ -60,7 +60,7 @@ async def _ensure_slug_available(
slug: str,
exclude_tag_id: UUID | None = None,
) -> None:
existing = await TaskTag.objects.filter_by(organization_id=organization_id, slug=slug).first(
existing = await Tag.objects.filter_by(organization_id=organization_id, slug=slug).first(
session
)
if existing is None:
@@ -69,15 +69,15 @@ async def _ensure_slug_available(
return
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Task tag slug already exists in this organization.",
detail="Tag slug already exists in this organization.",
)
async def _tag_read_page(
*,
session: AsyncSession,
items: Sequence[TaskTag],
) -> list[TaskTagRead]:
items: Sequence[Tag],
) -> list[TagRead]:
if not items:
return []
counts = await task_counts_for_tags(
@@ -85,30 +85,30 @@ async def _tag_read_page(
tag_ids=[item.id for item in items],
)
return [
TaskTagRead.model_validate(item, from_attributes=True).model_copy(
TagRead.model_validate(item, from_attributes=True).model_copy(
update={"task_count": counts.get(item.id, 0)},
)
for item in items
]
@router.get("", response_model=DefaultLimitOffsetPage[TaskTagRead])
async def list_task_tags(
@router.get("", response_model=DefaultLimitOffsetPage[TagRead])
async def list_tags(
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> LimitOffsetPage[TaskTagRead]:
"""List task tags for the active organization."""
) -> LimitOffsetPage[TagRead]:
"""List tags for the active organization."""
statement = (
select(TaskTag)
.where(col(TaskTag.organization_id) == ctx.organization.id)
.order_by(func.lower(col(TaskTag.name)).asc(), col(TaskTag.created_at).asc())
select(Tag)
.where(col(Tag.organization_id) == ctx.organization.id)
.order_by(func.lower(col(Tag.name)).asc(), col(Tag.created_at).asc())
)
async def _transform(items: Sequence[object]) -> Sequence[object]:
tags: list[TaskTag] = []
tags: list[Tag] = []
for item in items:
if not isinstance(item, TaskTag):
msg = "Expected TaskTag items from paginated query"
if not isinstance(item, Tag):
msg = "Expected Tag items from paginated query"
raise TypeError(msg)
tags.append(item)
return await _tag_read_page(session=session, items=tags)
@@ -116,13 +116,13 @@ async def list_task_tags(
return await paginate(session, statement, transformer=_transform)
@router.post("", response_model=TaskTagRead)
async def create_task_tag(
payload: TaskTagCreate,
@router.post("", response_model=TagRead)
async def create_tag(
payload: TagCreate,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_ADMIN_DEP,
) -> TaskTagRead:
"""Create a task tag within the active organization."""
) -> TagRead:
"""Create a tag within the active organization."""
slug = _normalize_slug(payload.slug, fallback_name=payload.name)
await _ensure_slug_available(
session,
@@ -131,49 +131,49 @@ async def create_task_tag(
)
tag = await crud.create(
session,
TaskTag,
Tag,
organization_id=ctx.organization.id,
name=payload.name,
slug=slug,
color=payload.color,
description=payload.description,
)
return TaskTagRead.model_validate(tag, from_attributes=True)
return TagRead.model_validate(tag, from_attributes=True)
@router.get("/{tag_id}", response_model=TaskTagRead)
async def get_task_tag(
@router.get("/{tag_id}", response_model=TagRead)
async def get_tag(
tag_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> TaskTagRead:
"""Get a single task tag in the active organization."""
tag = await _require_org_task_tag(
) -> TagRead:
"""Get a single tag in the active organization."""
tag = await _require_org_tag(
session,
tag_id=tag_id,
ctx=ctx,
)
count = (
await session.exec(
select(func.count(col(TaskTagAssignment.task_id))).where(
col(TaskTagAssignment.tag_id) == tag.id,
select(func.count(col(TagAssignment.task_id))).where(
col(TagAssignment.tag_id) == tag.id,
),
)
).one()
return TaskTagRead.model_validate(tag, from_attributes=True).model_copy(
return TagRead.model_validate(tag, from_attributes=True).model_copy(
update={"task_count": int(count or 0)},
)
@router.patch("/{tag_id}", response_model=TaskTagRead)
async def update_task_tag(
@router.patch("/{tag_id}", response_model=TagRead)
async def update_tag(
tag_id: UUID,
payload: TaskTagUpdate,
payload: TagUpdate,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_ADMIN_DEP,
) -> TaskTagRead:
"""Update a task tag in the active organization."""
tag = await _require_org_task_tag(
) -> TagRead:
"""Update a tag in the active organization."""
tag = await _require_org_tag(
session,
tag_id=tag_id,
ctx=ctx,
@@ -194,25 +194,25 @@ async def update_task_tag(
)
updates["updated_at"] = utcnow()
updated = await crud.patch(session, tag, updates)
return TaskTagRead.model_validate(updated, from_attributes=True)
return TagRead.model_validate(updated, from_attributes=True)
@router.delete("/{tag_id}", response_model=OkResponse)
async def delete_task_tag(
async def delete_tag(
tag_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_ADMIN_DEP,
) -> OkResponse:
"""Delete a task tag and remove all associated task-tag links."""
tag = await _require_org_task_tag(
"""Delete a tag and remove all associated tag links."""
tag = await _require_org_tag(
session,
tag_id=tag_id,
ctx=ctx,
)
await crud.delete_where(
session,
TaskTagAssignment,
col(TaskTagAssignment.tag_id) == tag.id,
TagAssignment,
col(TagAssignment.tag_id) == tag.id,
commit=False,
)
await session.delete(tag)

View File

@@ -32,9 +32,9 @@ from app.models.agents import Agent
from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.tag_assignments import TagAssignment
from app.models.task_dependencies import TaskDependency
from app.models.task_fingerprints import TaskFingerprint
from app.models.task_tag_assignments import TaskTagAssignment
from app.models.tasks import Task
from app.schemas.activity_events import ActivityEventRead
from app.schemas.common import OkResponse
@@ -48,6 +48,12 @@ from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.organizations import require_board_access
from app.services.tags import (
TagState,
load_tag_state,
replace_tags,
validate_tag_ids,
)
from app.services.task_dependencies import (
blocked_by_dependency_ids,
dependency_ids_by_task_id,
@@ -56,12 +62,6 @@ from app.services.task_dependencies import (
replace_task_dependencies,
validate_dependency_update,
)
from app.services.task_tags import (
TaskTagState,
load_task_tag_state,
replace_task_tags,
validate_task_tag_ids,
)
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Sequence
@@ -583,7 +583,7 @@ async def _task_read_page(
return []
task_ids = [task.id for task in tasks]
tag_state_by_task_id = await load_task_tag_state(
tag_state_by_task_id = await load_tag_state(
session,
task_ids=task_ids,
)
@@ -603,7 +603,7 @@ async def _task_read_page(
output: list[TaskRead] = []
for task in tasks:
tag_state = tag_state_by_task_id.get(task.id, TaskTagState())
tag_state = tag_state_by_task_id.get(task.id, TagState())
dep_list = deps_map.get(task.id, [])
blocked_by = blocked_by_dependency_ids(
dependency_ids=dep_list,
@@ -630,14 +630,14 @@ async def _stream_task_state(
*,
board_id: UUID,
rows: list[tuple[ActivityEvent, Task | None]],
) -> tuple[dict[UUID, list[UUID]], dict[UUID, str], dict[UUID, TaskTagState]]:
) -> tuple[dict[UUID, list[UUID]], dict[UUID, str], dict[UUID, TagState]]:
task_ids = [
task.id for event, task in rows if task is not None and event.event_type != "task.comment"
]
if not task_ids:
return {}, {}, {}
tag_state_by_task_id = await load_task_tag_state(
tag_state_by_task_id = await load_tag_state(
session,
task_ids=list({*task_ids}),
)
@@ -666,7 +666,7 @@ def _task_event_payload(
*,
deps_map: dict[UUID, list[UUID]],
dep_status: dict[UUID, str],
tag_state_by_task_id: dict[UUID, TaskTagState],
tag_state_by_task_id: dict[UUID, TagState],
) -> dict[str, object]:
payload: dict[str, object] = {
"type": event.event_type,
@@ -679,7 +679,7 @@ def _task_event_payload(
payload["task"] = None
return payload
tag_state = tag_state_by_task_id.get(task.id, TaskTagState())
tag_state = tag_state_by_task_id.get(task.id, TagState())
dep_list = deps_map.get(task.id, [])
blocked_by = blocked_by_dependency_ids(
dependency_ids=dep_list,
@@ -816,7 +816,7 @@ async def create_task(
task_id=task.id,
depends_on_task_ids=depends_on_task_ids,
)
normalized_tag_ids = await validate_task_tag_ids(
normalized_tag_ids = await validate_tag_ids(
session,
organization_id=board.organization_id,
tag_ids=tag_ids,
@@ -843,7 +843,7 @@ async def create_task(
depends_on_task_id=dep_id,
),
)
await replace_task_tags(
await replace_tags(
session,
task_id=task.id,
tag_ids=normalized_tag_ids,
@@ -994,8 +994,8 @@ async def delete_task(
)
await crud.delete_where(
session,
TaskTagAssignment,
col(TaskTagAssignment.task_id) == task.id,
TagAssignment,
col(TagAssignment.task_id) == task.id,
commit=False,
)
await session.delete(task)
@@ -1231,9 +1231,9 @@ async def _task_read_response(
board_id: UUID,
) -> TaskRead:
dep_ids = await _task_dep_ids(session, board_id=board_id, task_id=task.id)
tag_state = (await load_task_tag_state(session, task_ids=[task.id])).get(
tag_state = (await load_tag_state(session, task_ids=[task.id])).get(
task.id,
TaskTagState(),
TagState(),
)
blocked_ids = await _task_blocked_ids(
session,
@@ -1337,7 +1337,7 @@ async def _normalized_update_tag_ids(
session,
board_id=update.board_id,
)
return await validate_task_tag_ids(
return await validate_tag_ids(
session,
organization_id=organization_id,
tag_ids=update.tag_ids,
@@ -1449,7 +1449,7 @@ async def _apply_lead_task_update(
_lead_apply_status(update)
if normalized_tag_ids is not None:
await replace_task_tags(
await replace_tags(
session,
task_id=update.task.id,
tag_ids=normalized_tag_ids,
@@ -1723,7 +1723,7 @@ async def _finalize_updated_task(
update=update,
)
)
await replace_task_tags(
await replace_tags(
session,
task_id=update.task.id,
tag_ids=normalized or [],