refactor: update internal helpers and improve slugify function usage

This commit is contained in:
Abhimanyu Saharan
2026-02-11 00:27:44 +05:30
parent b038d0df4c
commit f4161494d9
6 changed files with 83 additions and 198 deletions

View File

@@ -8,12 +8,10 @@ DB-backed workflows (template sync, lead-agent record creation) live in
from __future__ import annotations
import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
from uuid import uuid4
from jinja2 import Environment, FileSystemLoader, StrictUndefined, select_autoescape
@@ -40,7 +38,8 @@ from app.services.openclaw.gateway_rpc import (
openclaw_call,
send_message,
)
from app.services.openclaw.internal import agent_key as _agent_key
from app.services.openclaw.internal.agent_key import agent_key as _agent_key
from app.services.openclaw.internal.agent_key import slugify
from app.services.openclaw.shared import GatewayAgentIdentity
if TYPE_CHECKING:
@@ -78,11 +77,6 @@ def _templates_root() -> Path:
return _repo_root() / "templates"
def _slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
return slug or uuid4().hex
def _heartbeat_config(agent: Agent) -> dict[str, Any]:
merged = DEFAULT_HEARTBEAT_CONFIG.copy()
if isinstance(agent.heartbeat_config, dict):
@@ -134,7 +128,58 @@ def _workspace_path(agent: Agent, workspace_root: str) -> str:
# lead agents (session key includes board id) even if multiple boards share the same
# display name (e.g. "Lead Agent").
key = _agent_key(agent)
return f"{root}/workspace-{_slugify(key)}"
return f"{root}/workspace-{slugify(key)}"
def _preferred_name(user: User | None) -> str:
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
return preferred_name
def _user_context(user: User | None) -> dict[str, str]:
return {
"user_name": (user.name or "") if user else "",
"user_preferred_name": _preferred_name(user),
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
"user_context": (user.context or "") if user else "",
}
def _normalized_identity_profile(agent: Agent) -> dict[str, str]:
identity_profile: dict[str, Any] = {}
if isinstance(agent.identity_profile, dict):
identity_profile = agent.identity_profile
normalized_identity: dict[str, str] = {}
for key, value in identity_profile.items():
if value is None:
continue
if isinstance(value, list):
parts = [str(item).strip() for item in value if str(item).strip()]
if not parts:
continue
normalized_identity[key] = ", ".join(parts)
continue
text = str(value).strip()
if text:
normalized_identity[key] = text
return normalized_identity
def _identity_context(agent: Agent) -> dict[str, str]:
normalized_identity = _normalized_identity_profile(agent)
identity_context = {
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
extra_identity_context = {
context_key: normalized_identity.get(field, "")
for field, context_key in EXTRA_IDENTITY_PROFILE_FIELDS.items()
}
return {**identity_context, **extra_identity_context}
def _build_context(
@@ -153,33 +198,8 @@ def _build_context(
session_key = agent.openclaw_session_id or ""
base_url = settings.base_url or "REPLACE_WITH_BASE_URL"
main_session_key = GatewayAgentIdentity.session_key(gateway)
identity_profile: dict[str, Any] = {}
if isinstance(agent.identity_profile, dict):
identity_profile = agent.identity_profile
normalized_identity: dict[str, str] = {}
for key, value in identity_profile.items():
if value is None:
continue
if isinstance(value, list):
parts = [str(item).strip() for item in value if str(item).strip()]
if not parts:
continue
normalized_identity[key] = ", ".join(parts)
continue
text = str(value).strip()
if text:
normalized_identity[key] = text
identity_context = {
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
extra_identity_context = {
context_key: normalized_identity.get(field, "")
for field, context_key in EXTRA_IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
identity_context = _identity_context(agent)
user_context = _user_context(user)
return {
"agent_name": agent.name,
"agent_id": agent_id,
@@ -197,14 +217,8 @@ def _build_context(
"auth_token": auth_token,
"main_session_key": main_session_key,
"workspace_root": workspace_root,
"user_name": (user.name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
"user_context": (user.context or "") if user else "",
**user_context,
**identity_context,
**extra_identity_context,
}
@@ -215,33 +229,8 @@ def _build_main_context(
user: User | None,
) -> dict[str, str]:
base_url = settings.base_url or "REPLACE_WITH_BASE_URL"
identity_profile: dict[str, Any] = {}
if isinstance(agent.identity_profile, dict):
identity_profile = agent.identity_profile
normalized_identity: dict[str, str] = {}
for key, value in identity_profile.items():
if value is None:
continue
if isinstance(value, list):
parts = [str(item).strip() for item in value if str(item).strip()]
if not parts:
continue
normalized_identity[key] = ", ".join(parts)
continue
text = str(value).strip()
if text:
normalized_identity[key] = text
identity_context = {
context_key: normalized_identity.get(field, DEFAULT_IDENTITY_PROFILE[field])
for field, context_key in IDENTITY_PROFILE_FIELDS.items()
}
extra_identity_context = {
context_key: normalized_identity.get(field, "")
for field, context_key in EXTRA_IDENTITY_PROFILE_FIELDS.items()
}
preferred_name = (user.preferred_name or "") if user else ""
if preferred_name:
preferred_name = preferred_name.strip().split()[0]
identity_context = _identity_context(agent)
user_context = _user_context(user)
return {
"agent_name": agent.name,
"agent_id": str(agent.id),
@@ -250,14 +239,8 @@ def _build_main_context(
"auth_token": auth_token,
"main_session_key": GatewayAgentIdentity.session_key(gateway),
"workspace_root": gateway.workspace_root or "",
"user_name": (user.name or "") if user else "",
"user_preferred_name": preferred_name,
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
"user_context": (user.context or "") if user else "",
**user_context,
**identity_context,
**extra_identity_context,
}