From 2bcd078c53b68562e1d03c3efaeddb94067e5cca Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Mon, 2 Feb 2026 16:05:18 +0530 Subject: [PATCH] Mission Control: notify OpenClaw via tools/invoke on task events --- ...d52471_employee_openclaw_routing_fields.py | 31 ++++ backend/app/api/org.py | 27 ++-- backend/app/api/work.py | 27 +++- backend/app/integrations/notify.py | 135 ++++++++++++++++++ backend/app/integrations/openclaw.py | 34 +++++ backend/app/models/org.py | 4 + backend/app/schemas/org.py | 8 ++ 7 files changed, 254 insertions(+), 12 deletions(-) create mode 100644 backend/alembic/versions/b8810dd52471_employee_openclaw_routing_fields.py create mode 100644 backend/app/integrations/notify.py create mode 100644 backend/app/integrations/openclaw.py diff --git a/backend/alembic/versions/b8810dd52471_employee_openclaw_routing_fields.py b/backend/alembic/versions/b8810dd52471_employee_openclaw_routing_fields.py new file mode 100644 index 00000000..48cb6164 --- /dev/null +++ b/backend/alembic/versions/b8810dd52471_employee_openclaw_routing_fields.py @@ -0,0 +1,31 @@ +"""employee_openclaw_routing_fields + +Revision ID: b8810dd52471 +Revises: 2b8d1e2c0d01 +Create Date: 2026-02-02 16:03:56.528787 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "b8810dd52471" +down_revision: Union[str, Sequence[str], None] = "2b8d1e2c0d01" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.add_column("employees", sa.Column("openclaw_session_key", sa.String(), nullable=True)) + op.add_column("employees", sa.Column("notify_enabled", sa.Boolean(), nullable=False, server_default=sa.true())) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_column("employees", "notify_enabled") + op.drop_column("employees", "openclaw_session_key") diff --git a/backend/app/api/org.py b/backend/app/api/org.py index 1395f681..092e44b9 100644 --- a/backend/app/api/org.py +++ b/backend/app/api/org.py @@ -79,11 +79,17 @@ def list_employees(session: Session = Depends(get_session)): def create_employee(payload: EmployeeCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): emp = Employee(**payload.model_dump()) session.add(emp) - session.commit() + + try: + session.flush() + log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="created", payload={"name": emp.name, "type": emp.employee_type}) + session.commit() + except IntegrityError: + session.rollback() + raise HTTPException(status_code=409, detail="Employee create violates constraints") + session.refresh(emp) - log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="created", payload={"name": emp.name, "type": emp.employee_type}) - session.commit() - return emp + return Employee.model_validate(emp) @router.patch("/employees/{employee_id}", response_model=Employee) @@ -97,8 +103,13 @@ def update_employee(employee_id: int, payload: EmployeeUpdate, session: Session setattr(emp, k, v) session.add(emp) - session.commit() + try: + session.flush() + log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="updated", payload=data) + session.commit() + except IntegrityError: + session.rollback() + raise HTTPException(status_code=409, detail="Employee update violates constraints") + session.refresh(emp) - log_activity(session, actor_employee_id=actor_employee_id, entity_type="employee", entity_id=emp.id, verb="updated", payload=data) - session.commit() - return emp + return Employee.model_validate(emp) diff --git a/backend/app/api/work.py b/backend/app/api/work.py index f2f32d6c..8892d620 100644 --- a/backend/app/api/work.py +++ b/backend/app/api/work.py @@ -2,7 +2,7 @@ from __future__ import annotations from datetime import datetime -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from sqlmodel import Session, select from sqlalchemy.exc import IntegrityError @@ -10,6 +10,7 @@ from app.api.utils import log_activity, get_actor_employee_id from app.db.session import get_session from app.models.work import Task, TaskComment from app.schemas.work import TaskCommentCreate, TaskCreate, TaskUpdate +from app.integrations.notify import NotifyContext, notify_openclaw router = APIRouter(tags=["work"]) @@ -25,7 +26,7 @@ def list_tasks(project_id: int | None = None, session: Session = Depends(get_ses @router.post("/tasks", response_model=Task) -def create_task(payload: TaskCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): +def create_task(payload: TaskCreate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): if payload.created_by_employee_id is None: payload = TaskCreate(**{**payload.model_dump(), "created_by_employee_id": actor_employee_id}) @@ -51,16 +52,19 @@ def create_task(payload: TaskCreate, session: Session = Depends(get_session), ac raise HTTPException(status_code=409, detail="Task create violates constraints") session.refresh(task) + background.add_task(notify_openclaw, session, NotifyContext(event="task.created", actor_employee_id=actor_employee_id, task=task)) # Explicitly return a serializable payload (guards against empty {} responses) return Task.model_validate(task) @router.patch("/tasks/{task_id}", response_model=Task) -def update_task(task_id: int, payload: TaskUpdate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): +def update_task(task_id: int, payload: TaskUpdate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): task = session.get(Task, task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") + before = {"assignee_employee_id": task.assignee_employee_id, "reviewer_employee_id": task.reviewer_employee_id, "status": task.status} + data = payload.model_dump(exclude_unset=True) if "status" in data and data["status"] not in ALLOWED_STATUSES: raise HTTPException(status_code=400, detail="Invalid status") @@ -79,6 +83,18 @@ def update_task(task_id: int, payload: TaskUpdate, session: Session = Depends(ge raise HTTPException(status_code=409, detail="Task update violates constraints") session.refresh(task) + + # notify based on meaningful changes + changed = {} + if before.get("assignee_employee_id") != task.assignee_employee_id: + changed["assignee_employee_id"] = {"from": before.get("assignee_employee_id"), "to": task.assignee_employee_id} + background.add_task(notify_openclaw, session, NotifyContext(event="task.assigned", actor_employee_id=actor_employee_id, task=task, changed_fields=changed)) + if before.get("status") != task.status: + changed["status"] = {"from": before.get("status"), "to": task.status} + background.add_task(notify_openclaw, session, NotifyContext(event="status.changed", actor_employee_id=actor_employee_id, task=task, changed_fields=changed)) + if not changed and data: + background.add_task(notify_openclaw, session, NotifyContext(event="task.updated", actor_employee_id=actor_employee_id, task=task, changed_fields=data)) + return Task.model_validate(task) @@ -106,7 +122,7 @@ def list_task_comments(task_id: int, session: Session = Depends(get_session)): @router.post("/task-comments", response_model=TaskComment) -def create_task_comment(payload: TaskCommentCreate, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): +def create_task_comment(payload: TaskCommentCreate, background: BackgroundTasks, session: Session = Depends(get_session), actor_employee_id: int = Depends(get_actor_employee_id)): if payload.author_employee_id is None: payload = TaskCommentCreate(**{**payload.model_dump(), "author_employee_id": actor_employee_id}) @@ -122,4 +138,7 @@ def create_task_comment(payload: TaskCommentCreate, session: Session = Depends(g raise HTTPException(status_code=409, detail="Comment create violates constraints") session.refresh(c) + task = session.get(Task, c.task_id) + if task is not None: + background.add_task(notify_openclaw, session, NotifyContext(event="comment.created", actor_employee_id=actor_employee_id, task=task, comment=c)) return TaskComment.model_validate(c) diff --git a/backend/app/integrations/notify.py b/backend/app/integrations/notify.py new file mode 100644 index 00000000..712d1f63 --- /dev/null +++ b/backend/app/integrations/notify.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable + +from sqlmodel import Session, select + +from app.integrations.openclaw import OpenClawClient +from app.models.org import Employee +from app.models.projects import ProjectMember +from app.models.work import Task, TaskComment + + +@dataclass(frozen=True) +class NotifyContext: + event: str # task.created | task.updated | task.assigned | comment.created | status.changed + actor_employee_id: int + task: Task + comment: TaskComment | None = None + changed_fields: dict | None = None + + +def _employee_session_keys(session: Session, employee_ids: Iterable[int]) -> list[str]: + ids = sorted({i for i in employee_ids if i is not None}) + if not ids: + return [] + + emps = session.exec(select(Employee).where(Employee.id.in_(ids))).all() + keys: list[str] = [] + for e in emps: + if not getattr(e, "notify_enabled", True): + continue + sk = getattr(e, "openclaw_session_key", None) + if sk: + keys.append(sk) + return sorted(set(keys)) + + +def _project_pm_employee_ids(session: Session, project_id: int) -> set[int]: + # Generic, data-driven: PMs are determined by project_members.role. + pms = session.exec(select(ProjectMember).where(ProjectMember.project_id == project_id)).all() + pm_ids: set[int] = set() + for m in pms: + role = (m.role or "").lower() + if role in {"pm", "product", "product_manager", "manager"}: + pm_ids.add(m.employee_id) + return pm_ids + + +def resolve_recipients(session: Session, ctx: NotifyContext) -> set[int]: + t = ctx.task + recipients: set[int] = set() + + if ctx.event == "task.created": + # notify assignee + PMs + if t.assignee_employee_id: + recipients.add(t.assignee_employee_id) + recipients |= _project_pm_employee_ids(session, t.project_id) + + elif ctx.event == "task.assigned": + if t.assignee_employee_id: + recipients.add(t.assignee_employee_id) + recipients |= _project_pm_employee_ids(session, t.project_id) + + elif ctx.event == "comment.created": + # notify assignee + reviewer + PMs, excluding author + if t.assignee_employee_id: + recipients.add(t.assignee_employee_id) + if t.reviewer_employee_id: + recipients.add(t.reviewer_employee_id) + recipients |= _project_pm_employee_ids(session, t.project_id) + if ctx.comment and ctx.comment.author_employee_id: + recipients.discard(ctx.comment.author_employee_id) + + elif ctx.event == "status.changed": + new_status = (getattr(t, "status", None) or "").lower() + if new_status in {"review", "ready_for_review"} and t.reviewer_employee_id: + recipients.add(t.reviewer_employee_id) + recipients |= _project_pm_employee_ids(session, t.project_id) + + elif ctx.event == "task.updated": + # conservative: PMs only + recipients |= _project_pm_employee_ids(session, t.project_id) + + recipients.discard(ctx.actor_employee_id) + return recipients + + +def build_message(ctx: NotifyContext) -> str: + t = ctx.task + base = f"Task #{t.id}: {t.title}" if t.id is not None else f"Task: {t.title}" + + if ctx.event == "task.assigned": + return f"Assigned: {base}.\nWork ONE task only; update Mission Control with a comment when you make progress." + + if ctx.event == "comment.created": + snippet = "" + if ctx.comment and ctx.comment.body: + snippet = ctx.comment.body.strip().replace("\n", " ") + if len(snippet) > 180: + snippet = snippet[:177] + "..." + snippet = f"\nComment: {snippet}" + return f"New comment on {base}.{snippet}\nWork ONE task only; reply/update in Mission Control." + + if ctx.event == "status.changed": + return f"Status changed on {base} → {t.status}.\nWork ONE task only; update Mission Control with next step." + + if ctx.event == "task.created": + return f"New task created: {base}.\nWork ONE task only; add acceptance criteria / next step in Mission Control." + + return f"Update on {base}.\nWork ONE task only; update Mission Control." + + +def notify_openclaw(session: Session, ctx: NotifyContext) -> None: + client = OpenClawClient.from_env() + if client is None: + return + + recipient_ids = resolve_recipients(session, ctx) + session_keys = _employee_session_keys(session, recipient_ids) + if not session_keys: + return + + message = build_message(ctx) + + for sk in session_keys: + try: + client.tools_invoke( + "sessions_send", + {"sessionKey": sk, "message": message}, + timeout_s=3.0, + ) + except Exception: + # best-effort; never break Mission Control writes + continue diff --git a/backend/app/integrations/openclaw.py b/backend/app/integrations/openclaw.py new file mode 100644 index 00000000..26fba7dd --- /dev/null +++ b/backend/app/integrations/openclaw.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import os +from typing import Any + +import requests + + +class OpenClawClient: + def __init__(self, base_url: str, token: str): + self.base_url = base_url.rstrip("/") + self.token = token + + @classmethod + def from_env(cls) -> "OpenClawClient | None": + url = os.environ.get("OPENCLAW_GATEWAY_URL") + token = os.environ.get("OPENCLAW_GATEWAY_TOKEN") + if not url or not token: + return None + return cls(url, token) + + def tools_invoke(self, tool: str, args: dict[str, Any], *, session_key: str | None = None, timeout_s: float = 5.0) -> dict[str, Any]: + payload: dict[str, Any] = {"tool": tool, "args": args} + if session_key is not None: + payload["sessionKey"] = session_key + + r = requests.post( + f"{self.base_url}/tools/invoke", + headers={"Authorization": f"Bearer {self.token}", "Content-Type": "application/json"}, + json=payload, + timeout=timeout_s, + ) + r.raise_for_status() + return r.json() diff --git a/backend/app/models/org.py b/backend/app/models/org.py index 9beb459a..f0d2caac 100644 --- a/backend/app/models/org.py +++ b/backend/app/models/org.py @@ -25,3 +25,7 @@ class Employee(SQLModel, table=True): title: str | None = None status: str = Field(default="active") + + # OpenClaw integration + openclaw_session_key: str | None = None + notify_enabled: bool = Field(default=True) diff --git a/backend/app/schemas/org.py b/backend/app/schemas/org.py index 0079b282..102872a0 100644 --- a/backend/app/schemas/org.py +++ b/backend/app/schemas/org.py @@ -21,6 +21,10 @@ class EmployeeCreate(SQLModel): title: str | None = None status: str = "active" + # OpenClaw integration + openclaw_session_key: str | None = None + notify_enabled: bool = True + class EmployeeUpdate(SQLModel): name: str | None = None @@ -29,3 +33,7 @@ class EmployeeUpdate(SQLModel): manager_id: int | None = None title: str | None = None status: str | None = None + + # OpenClaw integration + openclaw_session_key: str | None = None + notify_enabled: bool | None = None