feat: add custom-fields

This commit is contained in:
Abhimanyu Saharan
2026-02-13 21:24:36 +05:30
parent b032e94ca1
commit 277bfcb33a
127 changed files with 11305 additions and 6643 deletions

View File

@@ -287,13 +287,16 @@ async def create_task(
"""Create a task as the board lead.
Lead-only endpoint. Supports dependency-aware creation via
`depends_on_task_ids` and optional `tag_ids`.
`depends_on_task_ids`, optional `tag_ids`, and `custom_field_values`.
"""
_guard_board_access(agent_ctx, board)
_require_board_lead(agent_ctx)
data = payload.model_dump(exclude={"depends_on_task_ids", "tag_ids"})
data = payload.model_dump(
exclude={"depends_on_task_ids", "tag_ids", "custom_field_values"},
)
depends_on_task_ids = list(payload.depends_on_task_ids)
tag_ids = list(payload.tag_ids)
custom_field_values = dict(payload.custom_field_values)
task = Task.model_validate(data)
task.board_id = board.id
@@ -343,6 +346,12 @@ async def create_task(
session.add(task)
# Ensure the task exists in the DB before inserting dependency rows.
await session.flush()
await tasks_api._set_task_custom_field_values_for_create(
session,
board_id=board.id,
task_id=task.id,
custom_field_values=custom_field_values,
)
for dep_id in normalized_deps:
session.add(
TaskDependency(

View File

@@ -0,0 +1,343 @@
"""Organization-level task custom field definition management."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlmodel import col, select
from app.api.deps import require_org_admin, require_org_member
from app.core.time import utcnow
from app.db.session import get_session
from app.models.boards import Board
from app.models.task_custom_fields import (
BoardTaskCustomField,
TaskCustomFieldDefinition,
TaskCustomFieldValue,
)
from app.schemas.common import OkResponse
from app.schemas.task_custom_fields import (
TaskCustomFieldDefinitionCreate,
TaskCustomFieldDefinitionRead,
TaskCustomFieldDefinitionUpdate,
validate_custom_field_definition,
)
from app.services.organizations import OrganizationContext
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
router = APIRouter(prefix="/organizations/me/custom-fields", tags=["org-custom-fields"])
SESSION_DEP = Depends(get_session)
ORG_MEMBER_DEP = Depends(require_org_member)
ORG_ADMIN_DEP = Depends(require_org_admin)
def _to_definition_read_payload(
*,
definition: TaskCustomFieldDefinition,
board_ids: list[UUID],
) -> TaskCustomFieldDefinitionRead:
payload = TaskCustomFieldDefinitionRead.model_validate(definition, from_attributes=True)
payload.board_ids = board_ids
return payload
async def _board_ids_by_definition_id(
*,
session: AsyncSession,
definition_ids: list[UUID],
) -> dict[UUID, list[UUID]]:
if not definition_ids:
return {}
rows = (
await session.exec(
select(
col(BoardTaskCustomField.task_custom_field_definition_id),
col(BoardTaskCustomField.board_id),
).where(
col(BoardTaskCustomField.task_custom_field_definition_id).in_(definition_ids),
),
)
).all()
board_ids_by_definition_id: dict[UUID, list[UUID]] = {
definition_id: [] for definition_id in definition_ids
}
for definition_id, board_id in rows:
board_ids_by_definition_id.setdefault(definition_id, []).append(board_id)
for definition_id in board_ids_by_definition_id:
board_ids_by_definition_id[definition_id].sort(key=str)
return board_ids_by_definition_id
async def _validated_board_ids_for_org(
*,
session: AsyncSession,
ctx: OrganizationContext,
board_ids: list[UUID],
) -> list[UUID]:
normalized_board_ids = list(dict.fromkeys(board_ids))
if not normalized_board_ids:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="At least one board must be selected.",
)
valid_board_ids = set(
(
await session.exec(
select(col(Board.id)).where(
col(Board.organization_id) == ctx.organization.id,
col(Board.id).in_(normalized_board_ids),
),
)
).all(),
)
missing_board_ids = sorted(
{board_id for board_id in normalized_board_ids if board_id not in valid_board_ids},
key=str,
)
if missing_board_ids:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"message": "Some selected boards are invalid for this organization.",
"invalid_board_ids": [str(value) for value in missing_board_ids],
},
)
return normalized_board_ids
async def _get_org_definition(
*,
session: AsyncSession,
ctx: OrganizationContext,
definition_id: UUID,
) -> TaskCustomFieldDefinition:
definition = (
await session.exec(
select(TaskCustomFieldDefinition).where(
col(TaskCustomFieldDefinition.id) == definition_id,
col(TaskCustomFieldDefinition.organization_id) == ctx.organization.id,
),
)
).first()
if definition is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return definition
@router.get("", response_model=list[TaskCustomFieldDefinitionRead])
async def list_org_custom_fields(
ctx: OrganizationContext = ORG_MEMBER_DEP,
session: AsyncSession = SESSION_DEP,
) -> list[TaskCustomFieldDefinitionRead]:
"""List task custom field definitions for the authenticated organization."""
definitions = list(
await session.exec(
select(TaskCustomFieldDefinition)
.where(col(TaskCustomFieldDefinition.organization_id) == ctx.organization.id)
.order_by(func.lower(col(TaskCustomFieldDefinition.label)).asc()),
),
)
board_ids_by_definition_id = await _board_ids_by_definition_id(
session=session,
definition_ids=[definition.id for definition in definitions],
)
return [
_to_definition_read_payload(
definition=definition,
board_ids=board_ids_by_definition_id.get(definition.id, []),
)
for definition in definitions
]
@router.post("", response_model=TaskCustomFieldDefinitionRead)
async def create_org_custom_field(
payload: TaskCustomFieldDefinitionCreate,
ctx: OrganizationContext = ORG_ADMIN_DEP,
session: AsyncSession = SESSION_DEP,
) -> TaskCustomFieldDefinitionRead:
"""Create an organization-level task custom field definition."""
board_ids = await _validated_board_ids_for_org(
session=session,
ctx=ctx,
board_ids=payload.board_ids,
)
try:
validate_custom_field_definition(
field_type=payload.field_type,
validation_regex=payload.validation_regex,
default_value=payload.default_value,
)
except ValueError as err:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(err),
) from err
definition = TaskCustomFieldDefinition(
organization_id=ctx.organization.id,
field_key=payload.field_key,
label=payload.label or payload.field_key,
field_type=payload.field_type,
ui_visibility=payload.ui_visibility,
validation_regex=payload.validation_regex,
description=payload.description,
required=payload.required,
default_value=payload.default_value,
)
session.add(definition)
await session.flush()
for board_id in board_ids:
session.add(
BoardTaskCustomField(
board_id=board_id,
task_custom_field_definition_id=definition.id,
),
)
try:
await session.commit()
except IntegrityError as err:
await session.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Field key already exists in this organization.",
) from err
await session.refresh(definition)
return _to_definition_read_payload(definition=definition, board_ids=board_ids)
@router.patch("/{task_custom_field_definition_id}", response_model=TaskCustomFieldDefinitionRead)
async def update_org_custom_field(
task_custom_field_definition_id: UUID,
payload: TaskCustomFieldDefinitionUpdate,
ctx: OrganizationContext = ORG_ADMIN_DEP,
session: AsyncSession = SESSION_DEP,
) -> TaskCustomFieldDefinitionRead:
"""Update an organization-level task custom field definition."""
definition = await _get_org_definition(
session=session,
ctx=ctx,
definition_id=task_custom_field_definition_id,
)
updates = payload.model_dump(exclude_unset=True)
board_ids = updates.pop("board_ids", None)
validated_board_ids: list[UUID] | None = None
if board_ids is not None:
validated_board_ids = await _validated_board_ids_for_org(
session=session,
ctx=ctx,
board_ids=board_ids,
)
next_field_type = updates.get("field_type", definition.field_type)
next_validation_regex = (
updates["validation_regex"]
if "validation_regex" in updates
else definition.validation_regex
)
next_default_value = (
updates["default_value"] if "default_value" in updates else definition.default_value
)
try:
validate_custom_field_definition(
field_type=next_field_type,
validation_regex=next_validation_regex,
default_value=next_default_value,
)
except ValueError as err:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(err),
) from err
for key, value in updates.items():
setattr(definition, key, value)
if validated_board_ids is not None:
bindings = list(
await session.exec(
select(BoardTaskCustomField).where(
col(BoardTaskCustomField.task_custom_field_definition_id) == definition.id,
),
),
)
current_board_ids = {binding.board_id for binding in bindings}
target_board_ids = set(validated_board_ids)
for binding in bindings:
if binding.board_id not in target_board_ids:
await session.delete(binding)
for board_id in validated_board_ids:
if board_id in current_board_ids:
continue
session.add(
BoardTaskCustomField(
board_id=board_id,
task_custom_field_definition_id=definition.id,
),
)
definition.updated_at = utcnow()
session.add(definition)
try:
await session.commit()
except IntegrityError as err:
await session.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Field key already exists in this organization.",
) from err
await session.refresh(definition)
if validated_board_ids is None:
board_ids = (
await _board_ids_by_definition_id(
session=session,
definition_ids=[definition.id],
)
).get(definition.id, [])
else:
board_ids = validated_board_ids
return _to_definition_read_payload(definition=definition, board_ids=board_ids)
@router.delete("/{task_custom_field_definition_id}", response_model=OkResponse)
async def delete_org_custom_field(
task_custom_field_definition_id: UUID,
ctx: OrganizationContext = ORG_ADMIN_DEP,
session: AsyncSession = SESSION_DEP,
) -> OkResponse:
"""Delete an org-level definition when it has no persisted task values."""
definition = await _get_org_definition(
session=session,
ctx=ctx,
definition_id=task_custom_field_definition_id,
)
value_ids = (
await session.exec(
select(col(TaskCustomFieldValue.id)).where(
col(TaskCustomFieldValue.task_custom_field_definition_id) == definition.id,
),
)
).all()
if value_ids:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Cannot delete a custom field definition while task values exist.",
)
bindings = list(
await session.exec(
select(BoardTaskCustomField).where(
col(BoardTaskCustomField.task_custom_field_definition_id) == definition.id,
),
),
)
for binding in bindings:
await session.delete(binding)
await session.delete(definition)
await session.commit()
return OkResponse()

View File

@@ -7,7 +7,7 @@ import json
from collections import deque
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
@@ -33,6 +33,11 @@ from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.tag_assignments import TagAssignment
from app.models.task_custom_fields import (
BoardTaskCustomField,
TaskCustomFieldDefinition,
TaskCustomFieldValue,
)
from app.models.task_dependencies import TaskDependency
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
@@ -40,6 +45,11 @@ from app.schemas.activity_events import ActivityEventRead
from app.schemas.common import OkResponse
from app.schemas.errors import BlockedTaskError
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.task_custom_fields import (
TaskCustomFieldType,
TaskCustomFieldValues,
validate_custom_field_value,
)
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
from app.services.approval_task_links import (
@@ -99,6 +109,16 @@ ADMIN_AUTH_DEP = Depends(require_admin_auth)
TASK_DEP = Depends(get_task_or_404)
@dataclass(frozen=True, slots=True)
class _BoardCustomFieldDefinition:
id: UUID
field_key: str
field_type: TaskCustomFieldType
validation_regex: str | None
required: bool
default_value: object | None
def _comment_validation_error() -> HTTPException:
return HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
@@ -697,6 +717,281 @@ def _status_values(status_filter: str | None) -> list[str]:
return values
async def _organization_custom_field_definitions_for_board(
session: AsyncSession,
*,
board_id: UUID,
) -> dict[str, _BoardCustomFieldDefinition]:
organization_id = (
await session.exec(
select(Board.organization_id).where(col(Board.id) == board_id),
)
).first()
if organization_id is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
definitions = list(
await session.exec(
select(TaskCustomFieldDefinition)
.join(
BoardTaskCustomField,
col(BoardTaskCustomField.task_custom_field_definition_id)
== col(TaskCustomFieldDefinition.id),
)
.where(
col(BoardTaskCustomField.board_id) == board_id,
col(TaskCustomFieldDefinition.organization_id) == organization_id,
),
),
)
return {
definition.field_key: _BoardCustomFieldDefinition(
id=definition.id,
field_key=definition.field_key,
field_type=cast(TaskCustomFieldType, definition.field_type),
validation_regex=definition.validation_regex,
required=definition.required,
default_value=definition.default_value,
)
for definition in definitions
}
def _reject_unknown_custom_field_keys(
*,
custom_field_values: TaskCustomFieldValues,
definitions_by_key: dict[str, _BoardCustomFieldDefinition],
) -> None:
unknown_field_keys = sorted(set(custom_field_values) - set(definitions_by_key))
if not unknown_field_keys:
return
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"message": "Unknown custom field keys for this board.",
"unknown_field_keys": unknown_field_keys,
},
)
def _reject_missing_required_custom_field_keys(
*,
effective_values: TaskCustomFieldValues,
definitions_by_key: dict[str, _BoardCustomFieldDefinition],
) -> None:
missing_field_keys = [
definition.field_key
for definition in definitions_by_key.values()
if definition.required and effective_values.get(definition.field_key) is None
]
if not missing_field_keys:
return
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"message": "Required custom fields must have values.",
"missing_field_keys": sorted(missing_field_keys),
},
)
def _reject_invalid_custom_field_values(
*,
custom_field_values: TaskCustomFieldValues,
definitions_by_key: dict[str, _BoardCustomFieldDefinition],
) -> None:
for field_key, value in custom_field_values.items():
definition = definitions_by_key[field_key]
try:
validate_custom_field_value(
field_type=definition.field_type,
value=value,
validation_regex=definition.validation_regex,
)
except ValueError as err:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail={
"message": "Invalid custom field value.",
"field_key": field_key,
"field_type": definition.field_type,
"reason": str(err),
},
) from err
async def _task_custom_field_rows_by_definition_id(
session: AsyncSession,
*,
task_id: UUID,
definition_ids: list[UUID],
) -> dict[UUID, TaskCustomFieldValue]:
if not definition_ids:
return {}
rows = list(
await session.exec(
select(TaskCustomFieldValue).where(
col(TaskCustomFieldValue.task_id) == task_id,
col(TaskCustomFieldValue.task_custom_field_definition_id).in_(definition_ids),
),
),
)
return {row.task_custom_field_definition_id: row for row in rows}
async def _set_task_custom_field_values_for_create(
session: AsyncSession,
*,
board_id: UUID,
task_id: UUID,
custom_field_values: TaskCustomFieldValues,
) -> None:
definitions_by_key = await _organization_custom_field_definitions_for_board(
session,
board_id=board_id,
)
_reject_unknown_custom_field_keys(
custom_field_values=custom_field_values,
definitions_by_key=definitions_by_key,
)
_reject_invalid_custom_field_values(
custom_field_values=custom_field_values,
definitions_by_key=definitions_by_key,
)
effective_values: TaskCustomFieldValues = {}
for field_key, definition in definitions_by_key.items():
if field_key in custom_field_values:
effective_values[field_key] = custom_field_values[field_key]
else:
effective_values[field_key] = definition.default_value
_reject_missing_required_custom_field_keys(
effective_values=effective_values,
definitions_by_key=definitions_by_key,
)
for field_key, definition in definitions_by_key.items():
value = effective_values.get(field_key)
if value is None:
continue
session.add(
TaskCustomFieldValue(
task_id=task_id,
task_custom_field_definition_id=definition.id,
value=value,
),
)
async def _set_task_custom_field_values_for_update(
session: AsyncSession,
*,
board_id: UUID,
task_id: UUID,
custom_field_values: TaskCustomFieldValues,
) -> None:
definitions_by_key = await _organization_custom_field_definitions_for_board(
session,
board_id=board_id,
)
_reject_unknown_custom_field_keys(
custom_field_values=custom_field_values,
definitions_by_key=definitions_by_key,
)
_reject_invalid_custom_field_values(
custom_field_values=custom_field_values,
definitions_by_key=definitions_by_key,
)
definitions_by_id = {definition.id: definition for definition in definitions_by_key.values()}
rows_by_definition_id = await _task_custom_field_rows_by_definition_id(
session,
task_id=task_id,
definition_ids=list(definitions_by_id),
)
effective_values: TaskCustomFieldValues = {}
for field_key, definition in definitions_by_key.items():
current_row = rows_by_definition_id.get(definition.id)
if field_key in custom_field_values:
effective_values[field_key] = custom_field_values[field_key]
elif current_row is not None:
effective_values[field_key] = current_row.value
else:
effective_values[field_key] = definition.default_value
_reject_missing_required_custom_field_keys(
effective_values=effective_values,
definitions_by_key=definitions_by_key,
)
for field_key, value in custom_field_values.items():
definition = definitions_by_key[field_key]
row = rows_by_definition_id.get(definition.id)
if value is None:
if row is not None:
await session.delete(row)
continue
if row is None:
session.add(
TaskCustomFieldValue(
task_id=task_id,
task_custom_field_definition_id=definition.id,
value=value,
),
)
continue
row.value = value
row.updated_at = utcnow()
session.add(row)
async def _task_custom_field_values_by_task_id(
session: AsyncSession,
*,
board_id: UUID,
task_ids: Sequence[UUID],
) -> dict[UUID, TaskCustomFieldValues]:
unique_task_ids = list({*task_ids})
if not unique_task_ids:
return {}
definitions_by_key = await _organization_custom_field_definitions_for_board(
session,
board_id=board_id,
)
if not definitions_by_key:
return {task_id: {} for task_id in unique_task_ids}
definitions_by_id = {definition.id: definition for definition in definitions_by_key.values()}
default_values = {
field_key: definition.default_value for field_key, definition in definitions_by_key.items()
}
values_by_task_id: dict[UUID, TaskCustomFieldValues] = {
task_id: dict(default_values) for task_id in unique_task_ids
}
rows = (
await session.exec(
select(
col(TaskCustomFieldValue.task_id),
col(TaskCustomFieldValue.task_custom_field_definition_id),
col(TaskCustomFieldValue.value),
).where(
col(TaskCustomFieldValue.task_id).in_(unique_task_ids),
col(TaskCustomFieldValue.task_custom_field_definition_id).in_(
list(definitions_by_id),
),
),
)
).all()
for task_id, definition_id, value in rows:
definition = definitions_by_id.get(definition_id)
if definition is None:
continue
values_by_task_id[task_id][definition.field_key] = value
return values_by_task_id
def _task_list_statement(
*,
board_id: UUID,
@@ -742,6 +1037,11 @@ async def _task_read_page(
board_id=board_id,
dependency_ids=list({*dep_ids}),
)
custom_field_values_by_task_id = await _task_custom_field_values_by_task_id(
session,
board_id=board_id,
task_ids=task_ids,
)
output: list[TaskRead] = []
for task in tasks:
@@ -761,6 +1061,7 @@ async def _task_read_page(
"tags": tag_state.tags,
"blocked_by_task_ids": blocked_by,
"is_blocked": bool(blocked_by),
"custom_field_values": custom_field_values_by_task_id.get(task.id, {}),
},
),
)
@@ -772,12 +1073,17 @@ async def _stream_task_state(
*,
board_id: UUID,
rows: list[tuple[ActivityEvent, Task | None]],
) -> tuple[dict[UUID, list[UUID]], dict[UUID, str], dict[UUID, TagState]]:
) -> tuple[
dict[UUID, list[UUID]],
dict[UUID, str],
dict[UUID, TagState],
dict[UUID, TaskCustomFieldValues],
]:
task_ids = [
task.id for event, task in rows if task is not None and event.event_type != "task.comment"
]
if not task_ids:
return {}, {}, {}
return {}, {}, {}, {}
tag_state_by_task_id = await load_tag_state(
session,
@@ -791,15 +1097,20 @@ async def _stream_task_state(
dep_ids: list[UUID] = []
for value in deps_map.values():
dep_ids.extend(value)
custom_field_values_by_task_id = await _task_custom_field_values_by_task_id(
session,
board_id=board_id,
task_ids=list({*task_ids}),
)
if not dep_ids:
return deps_map, {}, tag_state_by_task_id
return deps_map, {}, tag_state_by_task_id, custom_field_values_by_task_id
dep_status = await dependency_status_by_id(
session,
board_id=board_id,
dependency_ids=list({*dep_ids}),
)
return deps_map, dep_status, tag_state_by_task_id
return deps_map, dep_status, tag_state_by_task_id, custom_field_values_by_task_id
def _task_event_payload(
@@ -809,7 +1120,9 @@ def _task_event_payload(
deps_map: dict[UUID, list[UUID]],
dep_status: dict[UUID, str],
tag_state_by_task_id: dict[UUID, TagState],
custom_field_values_by_task_id: dict[UUID, TaskCustomFieldValues] | None = None,
) -> dict[str, object]:
resolved_custom_field_values_by_task_id = custom_field_values_by_task_id or {}
payload: dict[str, object] = {
"type": event.event_type,
"activity": ActivityEventRead.model_validate(event).model_dump(mode="json"),
@@ -838,6 +1151,10 @@ def _task_event_payload(
"tags": tag_state.tags,
"blocked_by_task_ids": blocked_by,
"is_blocked": bool(blocked_by),
"custom_field_values": resolved_custom_field_values_by_task_id.get(
task.id,
{},
),
},
)
.model_dump(mode="json")
@@ -861,10 +1178,12 @@ async def _task_event_generator(
async with async_session_maker() as session:
rows = await _fetch_task_events(session, board_id, last_seen)
deps_map, dep_status, tag_state_by_task_id = await _stream_task_state(
session,
board_id=board_id,
rows=rows,
deps_map, dep_status, tag_state_by_task_id, custom_field_values_by_task_id = (
await _stream_task_state(
session,
board_id=board_id,
rows=rows,
)
)
for event, task in rows:
@@ -883,6 +1202,7 @@ async def _task_event_generator(
deps_map=deps_map,
dep_status=dep_status,
tag_state_by_task_id=tag_state_by_task_id,
custom_field_values_by_task_id=custom_field_values_by_task_id,
)
yield {"event": "task", "data": json.dumps(payload)}
await asyncio.sleep(2)
@@ -943,9 +1263,10 @@ async def create_task(
auth: AuthContext = ADMIN_AUTH_DEP,
) -> TaskRead:
"""Create a task and initialize dependency rows."""
data = payload.model_dump(exclude={"depends_on_task_ids", "tag_ids"})
data = payload.model_dump(exclude={"depends_on_task_ids", "tag_ids", "custom_field_values"})
depends_on_task_ids = list(payload.depends_on_task_ids)
tag_ids = list(payload.tag_ids)
custom_field_values = dict(payload.custom_field_values)
task = Task.model_validate(data)
task.board_id = board.id
@@ -977,6 +1298,12 @@ async def create_task(
session.add(task)
# Ensure the task exists in the DB before inserting dependency rows.
await session.flush()
await _set_task_custom_field_values_for_create(
session,
board_id=board.id,
task_id=task.id,
custom_field_values=custom_field_values,
)
for dep_id in normalized_deps:
session.add(
TaskDependency(
@@ -1051,9 +1378,14 @@ async def update_task(
payload.depends_on_task_ids if "depends_on_task_ids" in payload.model_fields_set else None
)
tag_ids = payload.tag_ids if "tag_ids" in payload.model_fields_set else None
custom_field_values = (
payload.custom_field_values if "custom_field_values" in payload.model_fields_set else None
)
custom_field_values_set = "custom_field_values" in payload.model_fields_set
updates.pop("comment", None)
updates.pop("depends_on_task_ids", None)
updates.pop("tag_ids", None)
updates.pop("custom_field_values", None)
requested_status = payload.status if "status" in payload.model_fields_set else None
update = _TaskUpdateInput(
task=task,
@@ -1066,6 +1398,8 @@ async def update_task(
comment=comment,
depends_on_task_ids=depends_on_task_ids,
tag_ids=tag_ids,
custom_field_values=custom_field_values or {},
custom_field_values_set=custom_field_values_set,
)
if actor.actor_type == "agent" and actor.agent and actor.agent.is_board_lead:
return await _apply_lead_task_update(session, update=update)
@@ -1142,6 +1476,12 @@ async def delete_task(
col(TagAssignment.task_id) == task.id,
commit=False,
)
await crud.delete_where(
session,
TaskCustomFieldValue,
col(TaskCustomFieldValue.task_id) == task.id,
commit=False,
)
await session.delete(task)
await session.commit()
return OkResponse()
@@ -1306,6 +1646,8 @@ class _TaskUpdateInput:
comment: str | None
depends_on_task_ids: list[UUID] | None
tag_ids: list[UUID] | None
custom_field_values: TaskCustomFieldValues
custom_field_values_set: bool
normalized_tag_ids: list[UUID] | None = None
@@ -1385,6 +1727,11 @@ async def _task_read_response(
board_id=board_id,
dep_ids=dep_ids,
)
custom_field_values_by_task_id = await _task_custom_field_values_by_task_id(
session,
board_id=board_id,
task_ids=[task.id],
)
if task.status == "done":
blocked_ids = []
return TaskRead.model_validate(task, from_attributes=True).model_copy(
@@ -1394,6 +1741,7 @@ async def _task_read_response(
"tags": tag_state.tags,
"blocked_by_task_ids": blocked_ids,
"is_blocked": bool(blocked_ids),
"custom_field_values": custom_field_values_by_task_id.get(task.id, {}),
},
)
@@ -1420,18 +1768,26 @@ def _lead_requested_fields(update: _TaskUpdateInput) -> set[str]:
requested_fields.add("depends_on_task_ids")
if update.tag_ids is not None:
requested_fields.add("tag_ids")
if update.custom_field_values_set:
requested_fields.add("custom_field_values")
return requested_fields
def _validate_lead_update_request(update: _TaskUpdateInput) -> None:
allowed_fields = {"assigned_agent_id", "status", "depends_on_task_ids", "tag_ids"}
allowed_fields = {
"assigned_agent_id",
"status",
"depends_on_task_ids",
"tag_ids",
"custom_field_values",
}
requested_fields = _lead_requested_fields(update)
if update.comment is not None or not requested_fields.issubset(allowed_fields):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=(
"Board leads can only assign/unassign tasks, update "
"dependencies, or resolve review tasks."
"dependencies/custom fields, or resolve review tasks."
),
)
@@ -1622,6 +1978,13 @@ async def _apply_lead_task_update(
task_id=update.task.id,
tag_ids=normalized_tag_ids,
)
if update.custom_field_values_set:
await _set_task_custom_field_values_for_update(
session,
board_id=update.board_id,
task_id=update.task.id,
custom_field_values=update.custom_field_values,
)
update.task.updated_at = utcnow()
session.add(update.task)
@@ -1666,7 +2029,7 @@ async def _apply_non_lead_agent_task_rules(
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# Agents are limited to status/comment updates, and non-inbox status moves
# must pass dependency checks before they can proceed.
allowed_fields = {"status", "comment"}
allowed_fields = {"status", "comment", "custom_field_values"}
if (
update.depends_on_task_ids is not None
or update.tag_ids is not None
@@ -1938,6 +2301,14 @@ async def _finalize_updated_task(
tag_ids=normalized or [],
)
if update.custom_field_values_set:
await _set_task_custom_field_values_for_update(
session,
board_id=update.board_id,
task_id=update.task.id,
custom_field_values=update.custom_field_values,
)
session.add(update.task)
await session.commit()
await session.refresh(update.task)

View File

@@ -26,6 +26,7 @@ from app.api.metrics import router as metrics_router
from app.api.organizations import router as organizations_router
from app.api.souls_directory import router as souls_directory_router
from app.api.tags import router as tags_router
from app.api.task_custom_fields import router as task_custom_fields_router
from app.api.tasks import router as tasks_router
from app.api.users import router as users_router
from app.core.config import settings
@@ -145,6 +146,7 @@ api_v1.include_router(board_webhooks_router)
api_v1.include_router(board_onboarding_router)
api_v1.include_router(approvals_router)
api_v1.include_router(tasks_router)
api_v1.include_router(task_custom_fields_router)
api_v1.include_router(tags_router)
api_v1.include_router(users_router)
app.include_router(api_v1)

View File

@@ -19,6 +19,11 @@ from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.tag_assignments import TagAssignment
from app.models.tags import Tag
from app.models.task_custom_fields import (
BoardTaskCustomField,
TaskCustomFieldDefinition,
TaskCustomFieldValue,
)
from app.models.task_dependencies import TaskDependency
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
@@ -38,6 +43,9 @@ __all__ = [
"Board",
"Gateway",
"Organization",
"BoardTaskCustomField",
"TaskCustomFieldDefinition",
"TaskCustomFieldValue",
"OrganizationMember",
"OrganizationBoardAccess",
"OrganizationInvite",

View File

@@ -0,0 +1,92 @@
"""Task custom field models and board binding helpers."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, CheckConstraint, Column, UniqueConstraint
from sqlmodel import Field
from app.core.time import utcnow
from app.models.tenancy import TenantScoped
RUNTIME_ANNOTATION_TYPES = (datetime,)
class TaskCustomFieldDefinition(TenantScoped, table=True):
"""Reusable custom field definition for task metadata."""
__tablename__ = "task_custom_field_definitions" # pyright: ignore[reportAssignmentType]
__table_args__ = (
UniqueConstraint(
"organization_id",
"field_key",
name="uq_task_custom_field_definitions_org_id_field_key",
),
CheckConstraint(
"field_type IN ('text','text_long','integer','decimal','boolean','date','date_time','url','json')",
name="ck_tcf_def_field_type",
),
CheckConstraint(
"ui_visibility IN ('always','if_set','hidden')",
name="ck_tcf_def_ui_visibility",
),
)
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(foreign_key="organizations.id", index=True)
field_key: str = Field(index=True)
label: str
field_type: str = Field(default="text")
ui_visibility: str = Field(default="always")
validation_regex: str | None = None
description: str | None = None
required: bool = Field(default=False)
default_value: object | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
class BoardTaskCustomField(TenantScoped, table=True):
"""Board-level binding of a custom field definition."""
__tablename__ = "board_task_custom_fields" # pyright: ignore[reportAssignmentType]
__table_args__ = (
UniqueConstraint(
"board_id",
"task_custom_field_definition_id",
name="uq_board_task_custom_fields_board_id_task_custom_field_definition_id",
),
)
id: UUID = Field(default_factory=uuid4, primary_key=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
task_custom_field_definition_id: UUID = Field(
foreign_key="task_custom_field_definitions.id",
index=True,
)
created_at: datetime = Field(default_factory=utcnow)
class TaskCustomFieldValue(TenantScoped, table=True):
"""Stored task-level values for bound custom fields."""
__tablename__ = "task_custom_field_values" # pyright: ignore[reportAssignmentType]
__table_args__ = (
UniqueConstraint(
"task_id",
"task_custom_field_definition_id",
name="uq_task_custom_field_values_task_id_task_custom_field_definition_id",
),
)
id: UUID = Field(default_factory=uuid4, primary_key=True)
task_id: UUID = Field(foreign_key="tasks.id", index=True)
task_custom_field_definition_id: UUID = Field(
foreign_key="task_custom_field_definitions.id",
index=True,
)
value: object | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -0,0 +1,366 @@
"""Schemas for task custom field metadata, board bindings, and payloads."""
from __future__ import annotations
import re
from datetime import date, datetime
from typing import Literal, Self
from urllib.parse import urlparse
from uuid import UUID
from pydantic import Field, field_validator, model_validator
from sqlmodel import SQLModel
from app.schemas.common import NonEmptyStr
RUNTIME_ANNOTATION_TYPES = (datetime, UUID, date)
TaskCustomFieldType = Literal[
"text",
"text_long",
"integer",
"decimal",
"boolean",
"date",
"date_time",
"url",
"json",
]
TaskCustomFieldUiVisibility = Literal["always", "if_set", "hidden"]
STRING_FIELD_TYPES: set[str] = {"text", "text_long", "date", "date_time", "url"}
TASK_CUSTOM_FIELD_TYPE_ALIASES: dict[str, TaskCustomFieldType] = {
"text": "text",
"text_long": "text_long",
"text (long)": "text_long",
"long_text": "text_long",
"integer": "integer",
"decimal": "decimal",
"boolean": "boolean",
"true/false": "boolean",
"date": "date",
"date_time": "date_time",
"date & time": "date_time",
"datetime": "date_time",
"url": "url",
"json": "json",
}
TASK_CUSTOM_FIELD_UI_VISIBILITY_ALIASES: dict[str, TaskCustomFieldUiVisibility] = {
"always": "always",
"if_set": "if_set",
"if set": "if_set",
"hidden": "hidden",
}
# Reusable alias for task payload payloads containing custom-field values.
TaskCustomFieldValues = dict[str, object | None]
class TaskCustomFieldDefinitionBase(SQLModel):
"""Shared custom field definition properties."""
field_key: str
label: str | None = None
field_type: TaskCustomFieldType = "text"
ui_visibility: TaskCustomFieldUiVisibility = "always"
validation_regex: str | None = None
description: str | None = None
required: bool = False
default_value: object | None = None
@field_validator("field_key", mode="before")
@classmethod
def normalize_field_key(cls, value: object) -> object:
"""Normalize field keys to a stable lowercase representation."""
if not isinstance(value, str):
raise ValueError("field_key must be a string")
normalized = value.strip()
if not normalized:
raise ValueError("field_key is required")
return normalized
@field_validator("label", mode="before")
@classmethod
def normalize_label(cls, value: object) -> object:
"""Normalize labels to a trimmed representation when provided."""
if value is None:
return None
if not isinstance(value, str):
raise ValueError("label must be a string")
normalized = value.strip()
if not normalized:
raise ValueError("label is required")
return normalized
@field_validator("field_type", mode="before")
@classmethod
def normalize_field_type(cls, value: object) -> object:
"""Normalize field type aliases."""
if not isinstance(value, str):
raise ValueError("field_type must be a string")
normalized = value.strip().lower()
resolved = TASK_CUSTOM_FIELD_TYPE_ALIASES.get(normalized)
if resolved is None:
raise ValueError(
"field_type must be one of: text, text_long, integer, decimal, "
"boolean, date, date_time, url, json",
)
return resolved
@field_validator("validation_regex", mode="before")
@classmethod
def normalize_validation_regex(cls, value: object) -> object:
"""Normalize and validate regex pattern syntax."""
if value is None:
return None
if not isinstance(value, str):
raise ValueError("validation_regex must be a string")
normalized = value.strip()
if not normalized:
return None
try:
re.compile(normalized)
except re.error as exc:
raise ValueError(f"validation_regex is invalid: {exc}") from exc
return normalized
@field_validator("ui_visibility", mode="before")
@classmethod
def normalize_ui_visibility(cls, value: object) -> object:
"""Normalize UI visibility aliases."""
if not isinstance(value, str):
raise ValueError("ui_visibility must be a string")
normalized = value.strip().lower()
resolved = TASK_CUSTOM_FIELD_UI_VISIBILITY_ALIASES.get(normalized)
if resolved is None:
raise ValueError("ui_visibility must be one of: always, if_set, hidden")
return resolved
class TaskCustomFieldDefinitionCreate(TaskCustomFieldDefinitionBase):
"""Payload for creating a task custom field definition."""
field_key: NonEmptyStr
label: NonEmptyStr | None = None
board_ids: list[UUID] = Field(min_length=1)
@field_validator("board_ids")
@classmethod
def normalize_board_ids(cls, value: list[UUID]) -> list[UUID]:
"""Remove duplicates while preserving user-supplied order."""
deduped = list(dict.fromkeys(value))
if not deduped:
raise ValueError("board_ids must include at least one board")
return deduped
@model_validator(mode="after")
def default_label_to_field_key(self) -> Self:
"""Default labels to field_key when omitted by older clients."""
if self.label is None:
self.label = self.field_key
return self
@model_validator(mode="after")
def validate_regex_field_type_combo(self) -> Self:
"""Restrict regex validation to string-compatible field types."""
if self.validation_regex is not None and self.field_type not in STRING_FIELD_TYPES:
raise ValueError(
"validation_regex is only supported for string field types.",
)
return self
class TaskCustomFieldDefinitionUpdate(SQLModel):
"""Payload for editing an existing task custom field definition."""
label: NonEmptyStr | None = None
field_type: TaskCustomFieldType | None = None
ui_visibility: TaskCustomFieldUiVisibility | None = None
validation_regex: str | None = None
description: str | None = None
required: bool | None = None
default_value: object | None = None
board_ids: list[UUID] | None = None
@field_validator("board_ids")
@classmethod
def normalize_board_ids(cls, value: list[UUID] | None) -> list[UUID] | None:
"""Normalize board bindings when provided in updates."""
if value is None:
return None
deduped = list(dict.fromkeys(value))
if not deduped:
raise ValueError("board_ids must include at least one board")
return deduped
@field_validator("field_type", mode="before")
@classmethod
def normalize_optional_field_type(cls, value: object) -> object:
"""Normalize optional field type aliases."""
if value is None:
return None
return TaskCustomFieldDefinitionBase.normalize_field_type(value)
@field_validator("validation_regex", mode="before")
@classmethod
def normalize_optional_validation_regex(cls, value: object) -> object:
"""Normalize and validate optional regex pattern syntax."""
if value is None:
return None
return TaskCustomFieldDefinitionBase.normalize_validation_regex(value)
@field_validator("ui_visibility", mode="before")
@classmethod
def normalize_optional_ui_visibility(cls, value: object) -> object:
"""Normalize optional UI visibility aliases."""
if value is None:
return None
return TaskCustomFieldDefinitionBase.normalize_ui_visibility(value)
@model_validator(mode="before")
@classmethod
def reject_field_key_update(cls, value: object) -> object:
"""Disallow field_key updates after definition creation."""
if isinstance(value, dict) and "field_key" in value:
raise ValueError("field_key cannot be changed after creation.")
return value
@model_validator(mode="after")
def reject_null_for_non_nullable_fields(self) -> Self:
"""Reject explicit null for non-nullable update fields."""
non_nullable_fields = ("label", "field_type", "ui_visibility", "required")
invalid = [
field_name
for field_name in non_nullable_fields
if field_name in self.model_fields_set and getattr(self, field_name) is None
]
if invalid:
raise ValueError(
f"{', '.join(invalid)} cannot be null; omit the field to leave it unchanged",
)
return self
@model_validator(mode="after")
def require_some_update(self) -> Self:
"""Reject empty updates to avoid no-op requests."""
if not self.model_fields_set:
raise ValueError("At least one field is required")
return self
class TaskCustomFieldDefinitionRead(TaskCustomFieldDefinitionBase):
"""Payload returned for custom field definitions."""
id: UUID
organization_id: UUID
label: str
field_type: TaskCustomFieldType
ui_visibility: TaskCustomFieldUiVisibility
validation_regex: str | None = None
board_ids: list[UUID] = Field(default_factory=list)
created_at: datetime
updated_at: datetime
class BoardTaskCustomFieldCreate(SQLModel):
"""Payload for binding a definition to a board."""
task_custom_field_definition_id: UUID
class BoardTaskCustomFieldRead(SQLModel):
"""Payload returned when listing board-bound custom fields."""
id: UUID
board_id: UUID
task_custom_field_definition_id: UUID
field_key: str
label: str
field_type: TaskCustomFieldType
ui_visibility: TaskCustomFieldUiVisibility
validation_regex: str | None
description: str | None
required: bool
default_value: object | None
created_at: datetime
class TaskCustomFieldValuesPayload(SQLModel):
"""Payload for setting all custom-field values at once."""
custom_field_values: TaskCustomFieldValues = Field(default_factory=dict)
def _parse_iso_datetime(value: str) -> datetime:
normalized = value.strip()
if normalized.endswith("Z"):
normalized = f"{normalized[:-1]}+00:00"
return datetime.fromisoformat(normalized)
def validate_custom_field_value(
*,
field_type: TaskCustomFieldType,
value: object | None,
validation_regex: str | None = None,
) -> None:
"""Validate a custom field value against field type and optional regex."""
if value is None:
return
if field_type in {"text", "text_long"}:
if not isinstance(value, str):
raise ValueError("must be a string")
elif field_type == "integer":
if not isinstance(value, int) or isinstance(value, bool):
raise ValueError("must be an integer")
elif field_type == "decimal":
if (not isinstance(value, (int, float))) or isinstance(value, bool):
raise ValueError("must be a decimal number")
elif field_type == "boolean":
if not isinstance(value, bool):
raise ValueError("must be true or false")
elif field_type == "date":
if not isinstance(value, str):
raise ValueError("must be an ISO date string (YYYY-MM-DD)")
try:
date.fromisoformat(value)
except ValueError as exc:
raise ValueError("must be an ISO date string (YYYY-MM-DD)") from exc
elif field_type == "date_time":
if not isinstance(value, str):
raise ValueError("must be an ISO datetime string")
try:
_parse_iso_datetime(value)
except ValueError as exc:
raise ValueError("must be an ISO datetime string") from exc
elif field_type == "url":
if not isinstance(value, str):
raise ValueError("must be a URL string")
parsed = urlparse(value)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("must be a valid http/https URL")
elif field_type == "json":
if not isinstance(value, (dict, list)):
raise ValueError("must be a JSON object or array")
if validation_regex is not None and field_type in STRING_FIELD_TYPES:
if not isinstance(value, str):
raise ValueError("must be a string for regex validation")
if re.fullmatch(validation_regex, value) is None:
raise ValueError("does not match validation_regex")
def validate_custom_field_definition(
*,
field_type: TaskCustomFieldType,
validation_regex: str | None,
default_value: object | None,
) -> None:
"""Validate field definition constraints and default-value compatibility."""
if validation_regex is not None and field_type not in STRING_FIELD_TYPES:
raise ValueError("validation_regex is only supported for string field types.")
validate_custom_field_value(
field_type=field_type,
value=default_value,
validation_regex=validation_regex,
)

View File

@@ -11,6 +11,7 @@ from sqlmodel import Field, SQLModel
from app.schemas.common import NonEmptyStr
from app.schemas.tags import TagRef
from app.schemas.task_custom_fields import TaskCustomFieldValues
TaskStatus = Literal["inbox", "in_progress", "review", "done"]
STATUS_REQUIRED_ERROR = "status is required"
@@ -36,6 +37,7 @@ class TaskCreate(TaskBase):
"""Payload for creating a task."""
created_by_user_id: UUID | None = None
custom_field_values: TaskCustomFieldValues = Field(default_factory=dict)
class TaskUpdate(SQLModel):
@@ -49,6 +51,7 @@ class TaskUpdate(SQLModel):
assigned_agent_id: UUID | None = None
depends_on_task_ids: list[UUID] | None = None
tag_ids: list[UUID] | None = None
custom_field_values: TaskCustomFieldValues | None = None
comment: NonEmptyStr | None = None
@field_validator("comment", mode="before")
@@ -81,6 +84,7 @@ class TaskRead(TaskBase):
blocked_by_task_ids: list[UUID] = Field(default_factory=list)
is_blocked: bool = False
tags: list[TagRef] = Field(default_factory=list)
custom_field_values: TaskCustomFieldValues | None = None
class TaskCommentCreate(SQLModel):

View File

@@ -0,0 +1,141 @@
"""Add task custom field tables.
Revision ID: b6f4c7d9e1a2
Revises: 1a7b2c3d4e5f
Create Date: 2026-02-13 00:20:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "b6f4c7d9e1a2"
down_revision = "1a7b2c3d4e5f"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Create task custom-field definition, binding, and value tables."""
op.create_table(
"task_custom_field_definitions",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("organization_id", sa.Uuid(), nullable=False),
sa.Column("field_key", sa.String(), nullable=False),
sa.Column("label", sa.String(), nullable=False),
sa.Column(
"field_type",
sa.String(),
nullable=False,
server_default=sa.text("'text'"),
),
sa.Column(
"ui_visibility",
sa.String(),
nullable=False,
server_default=sa.text("'always'"),
),
sa.Column("validation_regex", sa.String(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("required", sa.Boolean(), nullable=False),
sa.Column("default_value", sa.JSON(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"]),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"organization_id",
"field_key",
name="uq_tcf_def_org_key",
),
sa.CheckConstraint(
"field_type IN "
"('text','text_long','integer','decimal','boolean','date','date_time','url','json')",
name="ck_tcf_def_field_type",
),
sa.CheckConstraint(
"ui_visibility IN ('always','if_set','hidden')",
name="ck_tcf_def_ui_visibility",
),
)
op.create_index(
"ix_task_custom_field_definitions_organization_id",
"task_custom_field_definitions",
["organization_id"],
)
op.create_index(
"ix_task_custom_field_definitions_field_key",
"task_custom_field_definitions",
["field_key"],
)
op.create_table(
"board_task_custom_fields",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("task_custom_field_definition_id", sa.Uuid(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.ForeignKeyConstraint(
["task_custom_field_definition_id"],
["task_custom_field_definitions.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"board_id",
"task_custom_field_definition_id",
name="uq_board_tcf_binding",
),
)
op.create_index(
"ix_board_task_custom_fields_board_id",
"board_task_custom_fields",
["board_id"],
)
op.create_index(
"ix_board_task_custom_fields_task_custom_field_definition_id",
"board_task_custom_fields",
["task_custom_field_definition_id"],
)
op.create_table(
"task_custom_field_values",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("task_id", sa.Uuid(), nullable=False),
sa.Column("task_custom_field_definition_id", sa.Uuid(), nullable=False),
sa.Column("value", sa.JSON(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["task_id"], ["tasks.id"]),
sa.ForeignKeyConstraint(
["task_custom_field_definition_id"],
["task_custom_field_definitions.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"task_id",
"task_custom_field_definition_id",
name="uq_tcf_values_task_def",
),
)
op.create_index(
"ix_task_custom_field_values_task_id",
"task_custom_field_values",
["task_id"],
)
op.create_index(
"ix_task_custom_field_values_task_custom_field_definition_id",
"task_custom_field_values",
["task_custom_field_definition_id"],
)
def downgrade() -> None:
"""Drop task custom field tables."""
op.drop_table("task_custom_field_values")
op.drop_table("board_task_custom_fields")
op.drop_table("task_custom_field_definitions")

View File

@@ -43,7 +43,9 @@ def main() -> int:
return 1
if len(heads) > 1 and not allow_multiple_heads:
print("ERROR: multiple Alembic heads detected (set ALLOW_MULTIPLE_HEADS=true only for intentional merge windows)")
print(
"ERROR: multiple Alembic heads detected (set ALLOW_MULTIPLE_HEADS=true only for intentional merge windows)"
)
for h in heads:
print(f" - {h}")
return 1