1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@ __pycache__
.databricks
.claude
.cursor
.codex
.bundle
/.vscode/
node_modules/
49 changes: 49 additions & 0 deletions AGENTS.md
@@ -633,6 +633,55 @@ Use full redeploy instead if:
- Ensure it's plumbed through all layers
- Check `databricks.yml` parameters, stage parameter parsing, and implementation usage

#### 5a. `--var catalog` vs `--params CATALOG` drift
- **Why fragile**: Two separate dials use a catalog name and they can disagree.
- `bundle deploy --var catalog=X` → resolves `${var.catalog}` at deploy
time. Bakes `X` into every DABs-managed resource name (e.g. the `all`
target's `caspers_ops_warehouse` becomes `X-ops-warehouse`), every
AI/BI dashboard name, every dashboard `dataset_catalog`, and every job
parameter `default: ${var.catalog}...` (incl. `CATALOG`,
`REFUND_AGENT_APP_NAME`, `COMPLAINT_AGENT_APP_NAME`,
`OPS_WAREHOUSE_NAME`, `SUPERVISOR_ENDPOINT_NAME`).
- `bundle run caspers --params "CATALOG=Y"` → overrides ONLY the
run-time `CATALOG` widget value inside stage notebooks. Cannot rename
anything DABs already created.
- **Symptom when they disagree**: stages that reconstruct a DABs-managed
resource name from the run-time `CATALOG` widget fail to find it.
Example: deploying with the default and then running
`--params CATALOG=mycatalog` against the `all` target makes
`stages/operational_app.ipynb` fail with
`RuntimeError: Warehouse 'mycatalog-ops-warehouse' not found` because
DABs created `caspersdev-ops-warehouse`.
- **Best practice**:
- When in doubt, pass the same catalog to both:
`bundle deploy -t <target> --var catalog=X` then
`bundle run caspers --params "CATALOG=X"`.
- When adding a stage that needs the name of a DABs-managed resource (or
of an agent App the agent stages deployed), do NOT reconstruct it from
the `CATALOG` widget. Add a dedicated job parameter with a
`${var.catalog}-...` default in `databricks.yml` (this is how
`OPS_WAREHOUSE_NAME`, `REFUND_AGENT_APP_NAME` and
`COMPLAINT_AGENT_APP_NAME` are wired), then read that parameter via
`dbutils.widgets.get(...)` in the stage. The deploy-time value rides
through the job parameter into the run-time widget, so the two dials
physically cannot disagree.
- The agent App names additionally pass through
`utils/agent_app_client.resolve_agent_app_name(...)`, which prefers the
baked param and re-sanitises to the Databricks Apps name rules; the
`all`-target param block in `databricks.yml` (with its
deploy-time-vs-run-time comment) is the canonical example of this pattern.
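
The dedicated-parameter pattern described above can be sketched as follows. This is a minimal illustration, not the repository's stage code: `get_widget` is a hypothetical stand-in for `dbutils.widgets.get` so the snippet runs outside a notebook, and both function names are invented for the example. The `<catalog>-ops-warehouse` naming and the `OPS_WAREHOUSE_NAME` parameter come from the text.

```python
from typing import Callable


def ops_warehouse_name(get_widget: Callable[[str], str]) -> str:
    """Read the warehouse name that was baked in at deploy time.

    `get_widget` stands in for `dbutils.widgets.get` (assumption for this
    sketch) so the function can be exercised without a Databricks runtime.
    """
    name = get_widget("OPS_WAREHOUSE_NAME").strip()
    if not name:
        raise ValueError("OPS_WAREHOUSE_NAME job parameter is empty")
    return name


def fragile_warehouse_name(get_widget: Callable[[str], str]) -> str:
    """Anti-pattern: rebuild the name from the run-time CATALOG widget."""
    return f"{get_widget('CATALOG')}-ops-warehouse"


# Simulated widget values: deployed with the default catalog, then run with
# --params "CATALOG=mycatalog". Only CATALOG changes at run time.
widgets = {
    "CATALOG": "mycatalog",
    "OPS_WAREHOUSE_NAME": "caspersdev-ops-warehouse",
}

robust = ops_warehouse_name(widgets.__getitem__)
fragile = fragile_warehouse_name(widgets.__getitem__)
```

Here `robust` still names the warehouse DABs created, while `fragile` points at a warehouse that does not exist, which is the failure described under "Symptom when they disagree".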

#### 5b. Unity AI Gateway endpoint name (`AI_GATEWAY_ENDPOINT_NAME`)
- **Distinct from `LLM_MODEL`**: `LLM_MODEL` is the foundation model the
generators / support agent call directly. `AI_GATEWAY_ENDPOINT_NAME` is the
governed Unity AI Gateway endpoint the Refund + Complaint **App** agents send
every LLM call through (gateway-always-on, no model-serving fallback). It is
sent verbatim as the request `model` to `<host>/ai-gateway/mlflow/v1`.
- **Manual setup**: the v2 Beta gateway is UI-created only and has no
permissions API, so `CAN_QUERY` must be granted to **each agent App's service
principal** by hand (the App SP is not in `account users`). See step 5 of
`demos/dais2026-runbooks/SETUP.ipynb`.

### 6. Resource Dependencies
- **Why fragile**: Stages create resources that others depend on (endpoints, tables, etc.)
- **When touching**: Creation/deletion order, stage dependencies
27 changes: 26 additions & 1 deletion README.md
@@ -36,12 +36,37 @@ Available targets:
| `free` | Data generation, Lakeflow pipeline (Free Edition compatible) |
| `all` | Everything end-to-end: refund + complaints + Operational Dashboard (3 Genies + 6 Knowledge Assistants + Multi-Agent Supervisor + Lakebase-backed FastAPI app) |

Optionally specify a catalog (default: `caspersdev`):
Optionally specify a catalog (default: `caspersdev`). There are **two** dials
that take a catalog name and they must agree:

| Dial | When | What it controls |
|---|---|---|
| `bundle deploy --var catalog=<name>` | deploy time | the catalog baked into every DABs-managed resource — the `all` target's `caspers_ops_warehouse` SQL warehouse, AI/BI dashboard names, dashboard `dataset_catalog`, and the *default* value of every job parameter that uses `${var.catalog}` (including `CATALOG`, `REFUND_AGENT_APP_NAME`, `COMPLAINT_AGENT_APP_NAME`, `OPS_WAREHOUSE_NAME`, etc.) |
| `bundle run caspers --params "CATALOG=<name>"` | run time | only the value of the `CATALOG` widget inside stage notebooks. Cannot rename anything DABs already created. |

If they disagree (e.g. `bundle deploy -t all` with the default + `bundle run
--params CATALOG=mycatalog`), the `all` target will fail at the
`Operational_App` stage because the warehouse DABs created (`caspersdev-ops-warehouse`)
is not what the stage looks up (`mycatalog-ops-warehouse`). The fix is to
pass the same catalog to both:

```bash
databricks bundle deploy -t all --var catalog=mycatalog
databricks bundle run caspers --params "CATALOG=mycatalog"
```

For targets other than `all` (no DABs-owned warehouse/dashboards),
`--params CATALOG=mycatalog` alone usually works, but passing both keeps the
deploy-time and run-time catalogs in sync and is the safer habit.
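
One way to make "pass both" a habit is to derive both command lines from a single catalog value. The sketch below is a hypothetical helper (the name `bundle_commands` is not part of the repository); it only builds the argument lists for the two commands shown above and does not invoke the CLI.

```python
from typing import List, Tuple


def bundle_commands(target: str, catalog: str) -> Tuple[List[str], List[str]]:
    """Build deploy and run argument lists that share one catalog value."""
    if not catalog:
        raise ValueError("catalog must be a non-empty string")
    deploy = [
        "databricks", "bundle", "deploy",
        "-t", target,
        "--var", f"catalog={catalog}",
    ]
    run = [
        "databricks", "bundle", "run", "caspers",
        "--params", f"CATALOG={catalog}",
    ]
    return deploy, run


deploy_cmd, run_cmd = bundle_commands("all", "mycatalog")
```

The two lists could be handed to `subprocess.run(...)` in order; because both are built from the same argument, the deploy-time and run-time catalogs cannot drift apart.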

> **Agents on the `all` target run as Databricks Apps through Unity AI
> Gateway.** The Refund and Complaint agents are deployed as Apps
> (`apps/refund-agent`, `apps/complaint-agent`) and route every LLM call
> through a UI-created Unity AI Gateway endpoint (`AI_GATEWAY_ENDPOINT_NAME`).
> The gateway and its `CAN_QUERY` grant to each agent App's service principal
> are manual one-time setup — see step 5 of
> `demos/dais2026-runbooks/SETUP.ipynb`.

## Clean Up

```bash
10 changes: 7 additions & 3 deletions apps/caspers-ops-dashboard/app.yaml
@@ -47,10 +47,14 @@ env:
# #page= fragment and the user sees the dashboard's default landing page.
- name: OPS_DASHBOARD_PAGE
value: ''
# Custom agent endpoints — populated by operational_app stage from CATALOG
- name: REFUND_AGENT_ENDPOINT
# Custom agent Apps — populated by operational_app stage from CATALOG
- name: REFUND_AGENT_APP_NAME
value: ''
- name: COMPLAINT_AGENT_ENDPOINT
- name: REFUND_AGENT_APP_URL
value: ''
- name: COMPLAINT_AGENT_APP_NAME
value: ''
- name: COMPLAINT_AGENT_APP_URL
value: ''
- name: REFUND_MANAGER_APP_URL
value: ''
141 changes: 75 additions & 66 deletions apps/caspers-ops-dashboard/app/main.py
@@ -50,8 +50,10 @@
SUPERVISOR_ENDPOINT = os.environ.get("SUPERVISOR_ENDPOINT", "")
SUPERVISOR_TILE_ID = os.environ.get("SUPERVISOR_TILE_ID", "") # written by operational_lakebase stage
SUPERVISOR_MLFLOW_EXP_ID = os.environ.get("SUPERVISOR_MLFLOW_EXPERIMENT_ID", "") # written by operational_lakebase stage
REFUND_AGENT_ENDPOINT = os.environ.get("REFUND_AGENT_ENDPOINT", "")
COMPLAINT_AGENT_ENDPOINT = os.environ.get("COMPLAINT_AGENT_ENDPOINT", "")
REFUND_AGENT_APP_NAME = os.environ.get("REFUND_AGENT_APP_NAME", "")
REFUND_AGENT_APP_URL = os.environ.get("REFUND_AGENT_APP_URL", "")
COMPLAINT_AGENT_APP_NAME = os.environ.get("COMPLAINT_AGENT_APP_NAME", "")
COMPLAINT_AGENT_APP_URL = os.environ.get("COMPLAINT_AGENT_APP_URL", "")
REFUND_MANAGER_APP_URL = os.environ.get("REFUND_MANAGER_APP_URL", "")
SUPPORT_CONSOLE_APP_URL = os.environ.get("SUPPORT_CONSOLE_APP_URL", "")
LAKEBASE_INSTANCE = os.environ.get("LAKEBASE_ENDPOINT_PATH", "") # non-empty = DB enabled
@@ -315,8 +317,10 @@ def _embed_url(d_id: str) -> str:
"warehouse_id": WAREHOUSE_ID,
"db_enabled": bool(LAKEBASE_INSTANCE),
"supervisor_enabled": bool(SUPERVISOR_ENDPOINT),
"refund_agent_endpoint": REFUND_AGENT_ENDPOINT,
"complaint_agent_endpoint": COMPLAINT_AGENT_ENDPOINT,
"refund_agent_app_name": REFUND_AGENT_APP_NAME,
"refund_agent_app_url": REFUND_AGENT_APP_URL,
"complaint_agent_app_name": COMPLAINT_AGENT_APP_NAME,
"complaint_agent_app_url": COMPLAINT_AGENT_APP_URL,
"refund_manager_app_url": REFUND_MANAGER_APP_URL,
"support_console_app_url": SUPPORT_CONSOLE_APP_URL,
"mlflow_experiment_id": mlflow_experiment_id,
@@ -1047,16 +1051,38 @@ class ComplaintRequest(BaseModel):
order_id: str = ""


def _call_agent_endpoint(endpoint_name: str, payload: dict) -> dict:
"""Call a model serving endpoint (ChatAgent or ResponsesAgent) and return the parsed response body."""
url = f"{(_sdk_config.host or '').rstrip('/')}/serving-endpoints/{endpoint_name}/invocations"
def _agent_app_url(app_name: str, configured_url: str) -> str:
if configured_url:
return configured_url.rstrip("/")
if app_name:
app_info = _ws.apps.get(app_name)
url = getattr(app_info, "url", "") or ""
if url:
return url.rstrip("/")
return ""


def _call_agent_app(app_name: str, configured_url: str, payload: dict) -> dict:
"""Call a DAIS custom agent Databricks App via MLflow AgentServer /responses."""
base_url = _agent_app_url(app_name, configured_url)
if not base_url:
raise HTTPException(status_code=503, detail=f"Agent app {app_name or '(unknown)'} not configured.")
url = f"{base_url}/responses"
headers = {"Content-Type": "application/json"}
headers.update(_sdk_config.authenticate())
resp = httpx.post(url, headers=headers, json=payload, timeout=120.0)
resp.raise_for_status()
return resp.json()


def _extract_agent_output_text(data: dict) -> str:
try:
text = data["output"][0]["content"][0]["text"]
except (KeyError, IndexError, TypeError):
return ""
return text if isinstance(text, str) else ""
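
The behaviour of the two new helpers can be exercised in isolation. The sketch below mirrors their logic under one assumption: the workspace lookup (`_ws.apps.get(...)` in the diff) is replaced by a hypothetical `lookup` callable so the code runs without a Databricks client. The function names and example URLs are illustrative only.

```python
from typing import Callable, Optional


def resolve_app_url(
    app_name: str,
    configured_url: str,
    lookup: Callable[[str], Optional[str]],
) -> str:
    """Prefer the configured URL, else look the app up by name, else ''."""
    if configured_url:
        return configured_url.rstrip("/")
    if app_name:
        url = lookup(app_name) or ""
        if url:
            return url.rstrip("/")
    return ""


def extract_output_text(data: dict) -> str:
    """Return the first text part of the first output item, or ''."""
    try:
        text = data["output"][0]["content"][0]["text"]
    except (KeyError, IndexError, TypeError):
        return ""
    return text if isinstance(text, str) else ""


# Hypothetical registry standing in for the workspace Apps API.
known_apps = {"refund-agent": "https://refund.example.com/"}

from_config = resolve_app_url("refund-agent", "https://override.example.com/", known_apps.get)
from_lookup = resolve_app_url("refund-agent", "", known_apps.get)
unresolved = resolve_app_url("missing-agent", "", known_apps.get)

ok_text = extract_output_text({"output": [{"content": [{"text": "approved"}]}]})
no_text = extract_output_text({"output": []})
```

Note that, as written in the diff, the extractor reads only the first content part of the first output item, so any text in later parts is ignored.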


def _build_refund_user_message(req: "RefundRequest") -> str:
"""Compose the user message sent to the refund agent.

@@ -1084,52 +1110,42 @@ def _build_refund_user_message(req: "RefundRequest") -> str:
@app.post("/api/refund")
def refund(req: RefundRequest):
"""Call the refund agent for a given order_id and return a structured decision."""
if not REFUND_AGENT_ENDPOINT:
raise HTTPException(status_code=503, detail="Refund agent endpoint not configured.")
if not (REFUND_AGENT_APP_NAME or REFUND_AGENT_APP_URL):
raise HTTPException(status_code=503, detail="Refund agent app not configured.")
try:
user_msg = _build_refund_user_message(req)
data = _call_agent_endpoint(
REFUND_AGENT_ENDPOINT,
{"messages": [{"role": "user", "content": user_msg}]},
data = _call_agent_app(
REFUND_AGENT_APP_NAME,
REFUND_AGENT_APP_URL,
{"input": [{"role": "user", "content": user_msg}]},
)
# Extract the last assistant message from the ChatAgent response
messages = data.get("messages") or []
for msg in reversed(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role == "assistant" and content:
# Robustly extract a JSON object from the assistant message:
# the agent's prompt asks for raw JSON, but LLMs often wrap it in
# ```json … ``` fences or sprinkle commentary around it. Try a
# bare json.loads first, then fall back to the first {...} match.
cleaned = content.strip()
if cleaned.startswith("```"):
# strip markdown code fence (```json … ``` or ``` … ```)
cleaned = cleaned.strip("`")
if cleaned.lower().startswith("json"):
cleaned = cleaned[4:]
cleaned = cleaned.strip()
decision = None
content = _extract_agent_output_text(data)
if not content:
raise HTTPException(status_code=502, detail="No output in refund agent response.")
cleaned = content.strip()
if cleaned.startswith("```"):
cleaned = cleaned.strip("`")
if cleaned.lower().startswith("json"):
cleaned = cleaned[4:]
cleaned = cleaned.strip()
decision = None
try:
decision = json.loads(cleaned)
except Exception:
m = re.search(r"\{[\s\S]*\}", cleaned)
if m:
try:
decision = json.loads(cleaned)
decision = json.loads(m.group(0))
except Exception:
import re as _re
m = _re.search(r"\{[\s\S]*\}", cleaned)
if m:
try:
decision = json.loads(m.group(0))
except Exception:
decision = None
if decision is not None:
return {
"order_id": req.order_id,
"refund_usd": float(decision.get("refund_usd", 0)),
"refund_class": decision.get("refund_class", "none"),
"reason": decision.get("reason", ""),
}
# Last-resort fallback: surface raw text so the UI can show something.
return {"order_id": req.order_id, "raw": content}
raise HTTPException(status_code=502, detail="No assistant message in refund agent response.")
decision = None
if decision is not None:
return {
"order_id": req.order_id,
"refund_usd": float(decision.get("refund_usd", 0)),
"refund_class": decision.get("refund_class", "none"),
"reason": decision.get("reason", ""),
}
return {"order_id": req.order_id, "raw": content}
except HTTPException:
raise
except Exception as e:
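
The JSON-recovery steps in `refund` (strip a markdown fence, try a bare `json.loads`, then fall back to the first `{...}` match) could be factored into a standalone function along these lines. This is a sketch, not code from the PR: `parse_decision` is a hypothetical name, and unlike the handler it returns `None` for non-object JSON rather than letting `.get` fail later.

```python
import json
import re
from typing import Optional

FENCE = "`" * 3  # markdown code-fence marker


def parse_decision(content: str) -> Optional[dict]:
    """Recover a JSON object from agent text that may be fenced or chatty."""
    cleaned = content.strip()
    if cleaned.startswith(FENCE):
        # Drop the surrounding backticks and an optional "json" language tag.
        cleaned = cleaned.strip("`")
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:]
        cleaned = cleaned.strip()
    try:
        decision = json.loads(cleaned)
    except json.JSONDecodeError:
        # Fall back to the widest {...} span embedded in the text.
        match = re.search(r"\{[\s\S]*\}", cleaned)
        if not match:
            return None
        try:
            decision = json.loads(match.group(0))
        except json.JSONDecodeError:
            return None
    return decision if isinstance(decision, dict) else None


bare = parse_decision('{"refund_usd": 5.0, "refund_class": "partial"}')
fenced = parse_decision(FENCE + 'json\n{"refund_usd": 0, "refund_class": "none"}\n' + FENCE)
chatty = parse_decision('Here is the decision: {"refund_usd": 12.5} as requested.')
garbage = parse_decision("I could not decide.")
```

A caller would then build the structured response when the result is a dict and fall back to returning the raw text when it is `None`, matching the last-resort branch in the handler.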
@@ -1140,25 +1156,18 @@ def refund(req: RefundRequest):
@app.post("/api/complaint")
def complaint(req: ComplaintRequest):
"""Call the complaint agent for a raw complaint text and return a structured classification."""
if not COMPLAINT_AGENT_ENDPOINT:
raise HTTPException(status_code=503, detail="Complaint agent endpoint not configured.")
if not (COMPLAINT_AGENT_APP_NAME or COMPLAINT_AGENT_APP_URL):
raise HTTPException(status_code=503, detail="Complaint agent app not configured.")
content = req.complaint_text
if req.order_id:
content = f"{content} (Order ID: {req.order_id})"
try:
data = _call_agent_endpoint(
COMPLAINT_AGENT_ENDPOINT,
data = _call_agent_app(
COMPLAINT_AGENT_APP_NAME,
COMPLAINT_AGENT_APP_URL,
{"input": [{"role": "user", "content": content}]},
)
# ResponsesAgent returns output list or choices
output_text = ""
for out in data.get("output", []):
for part in (out.get("content") or []):
output_text += part.get("text", "")
if not output_text:
choices = data.get("choices") or []
if choices:
output_text = (choices[0].get("message") or {}).get("content", "")
output_text = _extract_agent_output_text(data)
if output_text:
try:
result = json.loads(output_text)
@@ -1276,14 +1285,14 @@ def list_agents():
"url": url, "id": tile_id})

_CUSTOM_AGENTS = [
{"name": "Refund Agent", "icon": "💳", "endpoint": REFUND_AGENT_ENDPOINT},
{"name": "Complaint Agent", "icon": "📬", "endpoint": COMPLAINT_AGENT_ENDPOINT},
{"name": "Refund Agent", "icon": "💳", "app_name": REFUND_AGENT_APP_NAME, "app_url": REFUND_AGENT_APP_URL},
{"name": "Complaint Agent", "icon": "📬", "app_name": COMPLAINT_AGENT_APP_NAME, "app_url": COMPLAINT_AGENT_APP_URL},
]
for ca in _CUSTOM_AGENTS:
ep = ca["endpoint"]
url = f"{host}/ml/endpoints/{ep}" if ep and host else ""
app_name = ca["app_name"]
url = ca["app_url"] or (f"{host}/apps/{app_name}" if app_name and host else "")
agents.append({"name": ca["name"], "icon": ca["icon"], "type": "agent",
"url": url, "id": ep})
"url": url, "id": app_name})

return agents
