Mission Control: notify OpenClaw via tools/invoke on task events

This commit is contained in:
Abhimanyu Saharan
2026-02-02 16:05:18 +05:30
parent 63419a26f1
commit 2bcd078c53
7 changed files with 254 additions and 12 deletions

View File

@@ -0,0 +1,31 @@
"""employee_openclaw_routing_fields
Revision ID: b8810dd52471
Revises: 2b8d1e2c0d01
Create Date: 2026-02-02 16:03:56.528787
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b8810dd52471"
down_revision: Union[str, Sequence[str], None] = "2b8d1e2c0d01"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.add_column("employees", sa.Column("openclaw_session_key", sa.String(), nullable=True))
op.add_column("employees", sa.Column("notify_enabled", sa.Boolean(), nullable=False, server_default=sa.true()))
def downgrade() -> None:
"""Downgrade schema."""
op.drop_column("employees", "notify_enabled")
op.drop_column("employees", "openclaw_session_key")

View File

@@ -79,11 +79,17 @@ def list_employees(session: Session = Depends(get_session)):
def create_employee(payload: EmployeeCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
emp = Employee(**payload.model_dump())
session.add(emp)
session.commit()
session.refresh(emp)
try:
session.flush()
log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="created", payload={"name": emp.name, "type": emp.employee_type})
session.commit()
return emp
except IntegrityError:
session.rollback()
raise HTTPException(status_code=409, detail="Employee create violates constraints")
session.refresh(emp)
return Employee.model_validate(emp)
@router.patch("/employees/{employee_id}", response_model=Employee)
@@ -97,8 +103,13 @@ def update_employee(employee_id: int, payload: EmployeeUpdate, session: Session
setattr(emp, k, v)
session.add(emp)
session.commit()
session.refresh(emp)
try:
session.flush()
log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="updated", payload=data)
session.commit()
return emp
except IntegrityError:
session.rollback()
raise HTTPException(status_code=409, detail="Employee update violates constraints")
session.refresh(emp)
return Employee.model_validate(emp)

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlmodel import Session, select
from sqlalchemy.exc import IntegrityError
@@ -10,6 +10,7 @@ from app.api.utils import log_activity, get_actor_employee_id
from app.db.session import get_session
from app.models.work import Task, TaskComment
from app.schemas.work import TaskCommentCreate, TaskCreate, TaskUpdate
from app.integrations.notify import NotifyContext, notify_openclaw
router = APIRouter(tags=["work"])
@@ -25,7 +26,7 @@ def list_tasks(project_id: int | None = None, session: Session = Depends(get_ses
@router.post("/tasks", response_model=Task)
def create_task(payload: TaskCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
def create_task(payload: TaskCreate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
if payload.created_by_employee_id is None:
payload = TaskCreate(**{**payload.model_dump(), "created_by_employee_id": actor_employee_id})
@@ -51,16 +52,19 @@ def create_task(payload: TaskCreate, session: Session = Depends(get_session), ac
raise HTTPException(status_code=409, detail="Task create violates constraints")
session.refresh(task)
background.add_task(notify_openclaw, session, NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task=task))
# Explicitly return a serializable payload (guards against empty {} responses)
return Task.model_validate(task)
@router.patch("/tasks/{task_id}", response_model=Task)
def update_task(task_id: int, payload: TaskUpdate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
def update_task(task_id: int, payload: TaskUpdate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
before = {"assignee_employee_id": task.assignee_employee_id, "reviewer_employee_id": task.reviewer_employee_id, "status": task.status}
data = payload.model_dump(exclude_unset=True)
if "status" in data and data["status"] not in ALLOWED_STATUSES:
raise HTTPException(status_code=400, detail="Invalid status")
@@ -79,6 +83,18 @@ def update_task(task_id: int, payload: TaskUpdate, session: Session = Depends(ge
raise HTTPException(status_code=409, detail="Task update violates constraints")
session.refresh(task)
# notify based on meaningful changes
changed = {}
if before.get("assignee_employee_id") != task.assignee_employee_id:
changed["assignee_employee_id"] = {"from": before.get("assignee_employee_id"), "to": task.assignee_employee_id}
background.add_task(notify_openclaw, session, NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task=task, changed_fields=changed))
if before.get("status") != task.status:
changed["status"] = {"from": before.get("status"), "to": task.status}
background.add_task(notify_openclaw, session, NotifyContext(event="status.changed", actor_employee_id=actor_employee_id, task=task, changed_fields=changed))
if not changed and data:
background.add_task(notify_openclaw, session, NotifyContext(event="task.updated", actor_employee_id=actor_employee_id, task=task, changed_fields=data))
return Task.model_validate(task)
@@ -106,7 +122,7 @@ def list_task_comments(task_id: int, session: Session = Depends(get_session)):
@router.post("/task-comments", response_model=TaskComment)
def create_task_comment(payload: TaskCommentCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
def create_task_comment(payload: TaskCommentCreate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)):
if payload.author_employee_id is None:
payload = TaskCommentCreate(**{**payload.model_dump(), "author_employee_id": actor_employee_id})
@@ -122,4 +138,7 @@ def create_task_comment(payload: TaskCommentCreate, session: Session = Depends(g
raise HTTPException(status_code=409, detail="Comment create violates constraints")
session.refresh(c)
task = session.get(Task, c.task_id)
if task is not None:
background.add_task(notify_openclaw, session, NotifyContext(event="comment.created", actor_employee_id=actor_employee_id, task=task, comment=c))
return TaskComment.model_validate(c)

View File

@@ -0,0 +1,135 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable
from sqlmodel import Session, select
from app.integrations.openclaw import OpenClawClient
from app.models.org import Employee
from app.models.projects import ProjectMember
from app.models.work import Task, TaskComment
@dataclass(frozen=True)
class NotifyContext:
event: str # task.created | task.updated | task.assigned | comment.created | status.changed
actor_employee_id: int
task: Task
comment: TaskComment | None = None
changed_fields: dict | None = None
def _employee_session_keys(session: Session, employee_ids: Iterable[int]) -> list[str]:
ids = sorted({i for i in employee_ids if i is not None})
if not ids:
return []
emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all()
keys: list[str] = []
for e in emps:
if not getattr(e, "notify_enabled", True):
continue
sk = getattr(e, "openclaw_session_key", None)
if sk:
keys.append(sk)
return sorted(set(keys))
def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]:
# Generic, data-driven: PMs are determined by project_members.role.
pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all()
pm_ids: set[int] = set()
for m in pms:
role = (m.role or "").lower()
if role in {"pm", "product", "product_manager", "manager"}:
pm_ids.add(m.employee_id)
return pm_ids
def resolve_recipients(session: Session, ctx: NotifyContext) -> set[int]:
t = ctx.task
recipients: set[int] = set()
if ctx.event == "task.created":
# notify assignee + PMs
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "task.assigned":
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "comment.created":
# notify assignee + reviewer + PMs, excluding author
if t.assignee_employee_id:
recipients.add(t.assignee_employee_id)
if t.reviewer_employee_id:
recipients.add(t.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
if ctx.comment and ctx.comment.author_employee_id:
recipients.discard(ctx.comment.author_employee_id)
elif ctx.event == "status.changed":
new_status = (getattr(t, "status", None) or "").lower()
if new_status in {"review", "ready_for_review"} and t.reviewer_employee_id:
recipients.add(t.reviewer_employee_id)
recipients |= _project_pm_employee_ids(session, t.project_id)
elif ctx.event == "task.updated":
# conservative: PMs only
recipients |= _project_pm_employee_ids(session, t.project_id)
recipients.discard(ctx.actor_employee_id)
return recipients
def build_message(ctx: NotifyContext) -> str:
t = ctx.task
base = f"Task #{t.id}: {t.title}" if t.id is not None else f"Task: {t.title}"
if ctx.event == "task.assigned":
return f"Assigned: {base}.\nWork ONE task only; update Mission Control with a comment when you make progress."
if ctx.event == "comment.created":
snippet = ""
if ctx.comment and ctx.comment.body:
snippet = ctx.comment.body.strip().replace("\n", " ")
if len(snippet) > 180:
snippet = snippet[:177] + "..."
snippet = f"\nComment: {snippet}"
return f"New comment on {base}.{snippet}\nWork ONE task only; reply/update in Mission Control."
if ctx.event == "status.changed":
return f"Status changed on {base}{t.status}.\nWork ONE task only; update Mission Control with next step."
if ctx.event == "task.created":
return f"New task created: {base}.\nWork ONE task only; add acceptance criteria / next step in Mission Control."
return f"Update on {base}.\nWork ONE task only; update Mission Control."
def notify_openclaw(session: Session, ctx: NotifyContext) -> None:
client = OpenClawClient.from_env()
if client is None:
return
recipient_ids = resolve_recipients(session, ctx)
session_keys = _employee_session_keys(session, recipient_ids)
if not session_keys:
return
message = build_message(ctx)
for sk in session_keys:
try:
client.tools_invoke(
"sessions_send",
{"sessionKey": sk, "message": message},
timeout_s=3.0,
)
except Exception:
# best-effort; never break Mission Control writes
continue

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
import os
from typing import Any
import requests
class OpenClawClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.token = token
@classmethod
def from_env(cls) -> "OpenClawClient | None":
url = os.environ.get("OPENCLAW_GATEWAY_URL")
token = os.environ.get("OPENCLAW_GATEWAY_TOKEN")
if not url or not token:
return None
return cls(url, token)
def tools_invoke(self, tool: str, args: dict[str, Any], *, session_key: str | None = None, timeout_s: float = 5.0) -> dict[str, Any]:
payload: dict[str, Any] = {"tool": tool, "args": args}
if session_key is not None:
payload["sessionKey"] = session_key
r = requests.post(
f"{self.base_url}/tools/invoke",
headers={"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"},
json=payload,
timeout=timeout_s,
)
r.raise_for_status()
return r.json()

View File

@@ -25,3 +25,7 @@ class Employee(SQLModel, table=True):
title: str | None = None
status: str = Field(default="active")
# OpenClaw integration
openclaw_session_key: str | None = None
notify_enabled: bool = Field(default=True)

View File

@@ -21,6 +21,10 @@ class EmployeeCreate(SQLModel):
title: str | None = None
status: str = "active"
# OpenClaw integration
openclaw_session_key: str | None = None
notify_enabled: bool = True
class EmployeeUpdate(SQLModel):
name: str | None = None
@@ -29,3 +33,7 @@ class EmployeeUpdate(SQLModel):
manager_id: int | None = None
title: str | None = None
status: str | None = None
# OpenClaw integration
openclaw_session_key: str | None = None
notify_enabled: bool | None = None