diff --git a/noom-mcp-server/.env.example b/noom-mcp-server/.env.example index 3a33aafc..c9f853ff 100644 --- a/noom-mcp-server/.env.example +++ b/noom-mcp-server/.env.example @@ -16,8 +16,20 @@ DATABRICKS_HOST=https://noom-prod.cloud.databricks.com # SQL execution workspace — must match DATABRICKS_HOST DATABRICKS_MCP_SQL_HOST=https://noom-prod.cloud.databricks.com +# SQL warehouse — all queries are forced to run on this warehouse +# Find it in Databricks UI: +# * SQL Warehouses → Pick your warehouse. +# * In the Overview, you can see the ID by the warehouse name +DATABRICKS_WAREHOUSE_ID= + # SQL warehouse — all queries are forced to run on this warehouse. # The default below is the shared Noom production warehouse. Override if # you need a different warehouse (find its ID in Databricks UI under # SQL Warehouses → your warehouse → Overview). DATABRICKS_WAREHOUSE_ID=575c0a43969584a4 + +# Export directory (optional) — where export_query_to_file writes result files. +# All writes are confined to this directory; paths that escape it are rejected. +# Files persist until you delete them (no automatic cleanup). +# Defaults to ~/databricks-mcp-exports if unset. +# DATABRICKS_MCP_EXPORT_DIR=/path/to/exports diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py new file mode 100644 index 00000000..37d027c2 --- /dev/null +++ b/noom-mcp-server/customization/export_query_patch.py @@ -0,0 +1,455 @@ +"""Export-query-to-file tool. + +Adds a customization-layer MCP tool, ``export_query_to_file``, that runs a SQL +query and writes the **full** result set to a local file (CSV today), returning +only a small manifest (``{path, row_count, bytes, columns, format}``) to the +caller. + +Why this exists +--------------- +The upstream ``execute_sql`` tool serializes results back *through the MCP +protocol* into the model's context. 
That is correct for interactive reads, but +it is the wrong shape for bulk extraction feeding a downstream model or local +tooling (data viz, notebooks): even a modest result set blows up the context +window, and large results trip the protocol's size limit. + +This tool keeps the bytes out of the model entirely. The **server process** +(which has local filesystem access) downloads the result and writes the file; +the model only ever receives a manifest. Local tools then read the file off +disk. Pass the path, not the payload. + +Why not wrap SQLExecutor.execute() +---------------------------------- +``SQLExecutor.execute()`` uses the Statement Execution API with the **INLINE** +disposition and ``_extract_results`` reads only ``result.data_array`` — the +*first chunk*. It never paginates, so it (a) materializes rows as a list of +dicts in server RAM and (b) silently truncates large results to the first +chunk. Both are unacceptable for a model-feeding extraction path. + +Instead this tool issues the statement through the same governed SP client but +with the **EXTERNAL_LINKS** disposition and ``format=CSV``. Databricks writes +the result to cloud storage as CSV chunks and returns presigned URLs; the +server streams those chunks straight to the local file. This is memory-bounded +(one chunk at a time), never truncates, and avoids any lossy reformatting since +Databricks emits the CSV itself. + +Governance +---------- +Execution reuses the existing governance chokepoints from +``sql_executor_patch``: + - ``get_sql_sp_client()`` → Service Principal credentials + - ``get_sql_warehouse_id()`` → forced warehouse override + - ``get_mcp_user_identity()`` → ``mcp_user:`` query tag + +so exports show up in ``system.query.history`` exactly like ``execute_sql`` +calls, plus a ``mcp_tool:export_query_to_file`` tag for filtering. + +Path safety +----------- +For broad distribution the tool MUST NOT write to arbitrary local paths. 
All +writes are confined to a single base directory: + - ``DATABRICKS_MCP_EXPORT_DIR`` if set, else + - ``~/databricks-mcp-exports`` (created on demand). +``output_path`` is interpreted relative to that base; absolute paths and ``..`` +traversal that escape the base are rejected. +""" + +import csv +import logging +import os +import tempfile +import time +import urllib.request +from pathlib import Path +from typing import Any, Dict, Iterable, Iterator, List, Optional + +logger = logging.getLogger(__name__) + +# Wall-clock ceiling (seconds) for the export tool call. Bulk exports of large +# result sets can legitimately run longer than an interactive query, so this +# matches the raised SQL ceiling rather than the upstream 60s default. +EXPORT_TOOL_TIMEOUT_CEILING = 600 + +# Default per-call statement timeout (seconds) — overridable via the tool's +# ``timeout`` argument, bounded by the ceiling above. +_DEFAULT_STATEMENT_TIMEOUT = 300 + +_SUPPORTED_FORMATS = ("csv",) + + +# --------------------------------------------------------------------------- +# Path sandboxing +# --------------------------------------------------------------------------- + + +def get_export_base_dir() -> Path: + """Return the (created) base directory all exports are confined to. + + ``DATABRICKS_MCP_EXPORT_DIR`` if set, else ``~/databricks-mcp-exports``. + """ + configured = os.environ.get("DATABRICKS_MCP_EXPORT_DIR") + base = Path(configured).expanduser() if configured else Path.home() / "databricks-mcp-exports" + base = base.resolve() + base.mkdir(parents=True, exist_ok=True) + return base + + +def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Path: + """Resolve ``output_path`` against the export base dir, rejecting escapes. + + Args: + output_path: Caller-supplied path, interpreted relative to the base dir. + Absolute paths are allowed only if they fall inside the base dir. + base_dir: Override for the base dir (tests). Defaults to + ``get_export_base_dir()``. 
+ + Returns: + The absolute, sandbox-checked destination path. + + Raises: + ValueError: If ``output_path`` is empty, names the base dir itself (no + filename), or resolves outside the base dir. + """ + if not output_path or not output_path.strip(): + raise ValueError("output_path must be a non-empty filename.") + + base = (base_dir or get_export_base_dir()).resolve() + + candidate = Path(output_path) + combined = candidate if candidate.is_absolute() else base / candidate + resolved = combined.resolve() + + # Containment check: resolved must be strictly under the base dir. + if base not in resolved.parents: + # Either it escapes the base dir, or it *is* the base dir (no filename). + if resolved == base: + raise ValueError("output_path must include a filename, not just the export directory.") + raise ValueError( + f"output_path {output_path!r} resolves outside the export directory " + f"({base}). Writes are confined to that directory; use a relative path " + f"or set DATABRICKS_MCP_EXPORT_DIR." + ) + return resolved + + +# --------------------------------------------------------------------------- +# Result streaming +# --------------------------------------------------------------------------- + + +def _iter_external_links(client, statement_id: str, first_result) -> Iterator[Any]: + """Yield external links in row order, following the chunk chain lazily. + + Laziness is deliberate. The presigned ``external_link`` URLs are short-lived + (Databricks issues them per response and they expire — observed ~15 min). If + we collected every link up front and only then downloaded, a long multi-chunk + export could outlive the links issued first. By yielding each link as its + chunk is fetched — and having the caller download it immediately — every URL + is freshly issued when used, so the at-risk window is one chunk, not the + whole export. + + Within a single response, links are ordered by ``chunk_index`` (the chunk + chain itself is ordered via ``next_chunk_index``). 
+ """ + result = first_result + while result is not None and result.external_links: + for link in sorted( + result.external_links, key=lambda link: (link.chunk_index or 0, link.row_offset or 0) + ): + yield link + nxt = result.next_chunk_index + if nxt is None: + break + result = client.statement_execution.get_statement_result_chunk_n( + statement_id=statement_id, chunk_index=nxt + ) + + +def _download_link(link) -> bytes: + """Download one presigned external link, honouring its http_headers. + + The link is a presigned cloud-storage URL — Databricks workspace auth must + NOT be attached; only the headers the link itself specifies are sent. + """ + req = urllib.request.Request(link.external_link) + for key, value in (link.http_headers or {}).items(): + req.add_header(key, value) + with urllib.request.urlopen(req) as resp: # noqa: S310 - presigned Databricks URL + return resp.read() + + +def _write_csv( + dest: Path, + columns: List[str], + links: Iterable[Any], + overwrite: bool = False, + base_dir: Optional[Path] = None, +) -> None: + """Stream each CSV chunk's bytes to ``dest`` in row order, atomically. + + Databricks EXTERNAL_LINKS + CSV chunks already include the column header + (in the first chunk), so the chunk bytes are written as-is — synthesizing a + header would duplicate it. + + ``links`` is an iterable, consumed lazily: each link is downloaded as it is + produced, so when it is the lazy chunk-chain generator + (``_iter_external_links``) the presigned URL is freshly issued at download + time rather than minutes earlier. + + The one exception is an empty result: the iterable yields nothing, so a + header-only CSV is written (derived from the manifest schema) so the file is + still valid and carries the column names for downstream tools. + + Atomicity: the data is streamed to a temp file in the destination directory + and moved into place only after the *last* chunk is written. 
If a chunk + download fails (or the tool's wall-clock ceiling fires mid-stream), the temp + file is removed and ``dest`` is left untouched — never a partial or empty + file at the real path. A half-written CSV that looks complete is exactly the + silent corruption this tool exists to prevent. + + Overwrite guard: the existence check in the caller runs before a long query, + so it can't be the authoritative guard (the file may appear while the query + is in flight). When ``overwrite`` is False the move uses ``os.link``, which + fails atomically if ``dest`` already exists — closing that TOCTOU window. + When True it uses ``os.replace`` (atomic clobber). + + Sandbox re-check: ``resolve_export_path`` validated containment before the + query, but ``dest.parent`` could have become a symlink pointing outside the + sandbox in the meantime. When ``base_dir`` is given, the destination's parent + is re-resolved and re-checked for containment immediately before writing, so + the write can't follow such a symlink out of the export dir. + """ + if base_dir is not None: + base = base_dir.resolve() + real_parent = dest.parent.resolve() + if real_parent != base and base not in real_parent.parents: + raise ValueError( + f"refusing to write {dest}: its directory resolves outside the export " + f"sandbox ({base}) — a symlink may have been introduced after the path " + f"was validated." + ) + + dest.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part") + tmp = Path(tmp_name) + try: + wrote_any = False + with os.fdopen(fd, "wb") as fh: + for link in links: + fh.write(_download_link(link)) + wrote_any = True + if not wrote_any: + fh.write(_csv_header_line(columns).encode("utf-8")) + if overwrite: + os.replace(tmp, dest) # atomic clobber, same filesystem + else: + # Atomic no-clobber: fails if dest exists, even if it appeared during + # the query (after the caller's early existence check). 
+ try: + os.link(tmp, dest) + except FileExistsError as exc: + raise ValueError( + f"{dest} already exists. Pass overwrite=true to replace it." + ) from exc + tmp.unlink(missing_ok=True) # for the os.link path; no-op after os.replace + except BaseException: + # Includes timeouts/cancellation: never leave a partial file behind, + # and never clobber a prior good export at `dest`. + tmp.unlink(missing_ok=True) + raise + + +def _csv_header_line(columns: List[str]) -> str: + import io + + buf = io.StringIO() + csv.writer(buf, lineterminator="\n").writerow(columns) + return buf.getvalue() + + +# --------------------------------------------------------------------------- +# Tool implementation +# --------------------------------------------------------------------------- + + +def _clamp_statement_timeout(timeout: int) -> int: + """Bound the per-call statement timeout by the tool's wall-clock ceiling. + + A caller-supplied timeout above ``EXPORT_TOOL_TIMEOUT_CEILING`` would let + FastMCP abort the tool call before the poll loop's own cancellation fires, + leaving the statement running on the warehouse. Clamping keeps our cancel + within the ceiling. Logs when a value is reduced. + """ + if timeout > EXPORT_TOOL_TIMEOUT_CEILING: + logger.warning( + "export timeout %ss exceeds the %ss ceiling; clamping.", + timeout, + EXPORT_TOOL_TIMEOUT_CEILING, + ) + return EXPORT_TOOL_TIMEOUT_CEILING + return timeout + + +def _run_export( + sql_query: str, + output_path: str, + fmt: str, + catalog: Optional[str], + schema: Optional[str], + overwrite: bool, + timeout: int, +) -> Dict[str, Any]: + """Core logic, separated from the FastMCP wrapper for testability.""" + from databricks.sdk.service.sql import Disposition, Format, QueryTag, StatementState + + # Import governance helpers lazily (and from the sibling patch) so this + # module is importable without the SP environment configured (e.g. tests). 
+ from customization.sql_executor_patch import ( + get_mcp_user_identity, + get_sql_sp_client, + get_sql_warehouse_id, + ) + + fmt = fmt.lower() + if fmt not in _SUPPORTED_FORMATS: + raise ValueError(f"Unsupported format {fmt!r}. Supported: {', '.join(_SUPPORTED_FORMATS)}.") + + # Clamp the per-call timeout to the tool's wall-clock ceiling (see + # _clamp_statement_timeout): a value above the ceiling would let FastMCP + # abort the tool call before our own cancellation fires, orphaning the + # statement on the warehouse. The poll loop below cancels at this bound. + timeout = _clamp_statement_timeout(timeout) + + base = get_export_base_dir() + dest = resolve_export_path(output_path, base) + if dest.exists() and not overwrite: + raise ValueError(f"{dest} already exists. Pass overwrite=true to replace it.") + + warehouse_id = get_sql_warehouse_id() + identity = get_mcp_user_identity() # resolve under user creds, before SP client use + client = get_sql_sp_client() + + query_tags = [ + QueryTag(key="mcp_user", value=identity), + QueryTag(key="mcp_tool", value="export_query_to_file"), + ] + + response = client.statement_execution.execute_statement( + statement=sql_query, + warehouse_id=warehouse_id, + disposition=Disposition.EXTERNAL_LINKS, + format=Format.CSV, + wait_timeout="0s", + catalog=catalog, + schema=schema, + query_tags=query_tags, + ) + statement_id = response.statement_id + + poll_interval = 2 + elapsed = 0 + status = response + while True: + state = status.status.state + if state == StatementState.SUCCEEDED: + break + if state in (StatementState.FAILED, StatementState.CANCELED, StatementState.CLOSED): + msg = "" + if status.status and status.status.error and status.status.error.message: + msg = status.status.error.message + raise RuntimeError(f"Export query did not succeed (state={state}). 
{msg}".strip()) + if elapsed >= timeout: + try: + client.statement_execution.cancel_execution(statement_id=statement_id) + finally: + raise RuntimeError( + f"Export query timed out after {timeout}s and was canceled " + f"(statement_id={statement_id})." + ) + time.sleep(poll_interval) + elapsed += poll_interval + status = client.statement_execution.get_statement(statement_id=statement_id) + + manifest = status.manifest + columns = ( + [col.name for col in manifest.schema.columns] + if manifest and manifest.schema and manifest.schema.columns + else [] + ) + row_count = (manifest.total_row_count if manifest else None) or 0 + + # Lazy generator: each chunk's presigned link is fetched and downloaded + # just-in-time (see _iter_external_links / _write_csv), so links don't age + # out during a long multi-chunk download. + links = _iter_external_links(client, statement_id, status.result) + _write_csv(dest, columns, links, overwrite=overwrite, base_dir=base) + + return { + "path": str(dest), + "row_count": int(row_count), + "bytes": dest.stat().st_size, + "columns": columns, + "format": fmt, + } + + +def register_export_query_tool(mcp) -> None: + """Register ``export_query_to_file`` on the FastMCP server instance. + + Must be called AFTER importing the upstream server module and BEFORE the + tool allowlist runs (so the allowlist remains the single source of truth + for what is exposed; ``export_query_to_file`` must be in ALLOWED_TOOLS). + + Args: + mcp: The FastMCP server instance (from databricks_mcp_server.server). + """ + + @mcp.tool(timeout=EXPORT_TOOL_TIMEOUT_CEILING) + def export_query_to_file( + sql_query: str, + output_path: str, + format: str = "csv", + catalog: Optional[str] = None, + schema: Optional[str] = None, + overwrite: bool = False, + timeout: int = _DEFAULT_STATEMENT_TIMEOUT, + ) -> Dict[str, Any]: + """Run a SQL query and write the full result set to a local file. 
+ + Use this instead of execute_sql when the result is large or is destined + for a downstream tool/model rather than to be read in the conversation. + The result rows are written to disk by the server and never returned + through the model context — only a small manifest is returned. + + Args: + sql_query: The SQL query to run. + output_path: Destination file, relative to the server's export + directory (DATABRICKS_MCP_EXPORT_DIR, default + ~/databricks-mcp-exports). Paths escaping that directory are + rejected. + format: Output format. Currently only "csv". + catalog: Optional catalog context for the query. + schema: Optional schema context for the query. + overwrite: Overwrite the destination if it already exists. + timeout: Per-call statement timeout in seconds (bounded by the + tool's 600s ceiling). + + Returns: + A manifest: {path, row_count, bytes, columns, format}. The rows + themselves are on disk at `path`, not in this response. + """ + return _run_export( + sql_query=sql_query, + output_path=output_path, + fmt=format, + catalog=catalog, + schema=schema, + overwrite=overwrite, + timeout=timeout, + ) + + logger.info( + "Registered export_query_to_file tool (ceiling=%ss, export dir=%s)", + EXPORT_TOOL_TIMEOUT_CEILING, + get_export_base_dir(), + ) diff --git a/noom-mcp-server/customization/tool_allowlist_patch.py b/noom-mcp-server/customization/tool_allowlist_patch.py index 6a53df3e..2ee8a8c3 100644 --- a/noom-mcp-server/customization/tool_allowlist_patch.py +++ b/noom-mcp-server/customization/tool_allowlist_patch.py @@ -46,6 +46,11 @@ "execute_sql_multi", # Schema and statistics — uses SQLExecutor internally (SP-governed) "get_table_stats_and_schema", + # Bulk result export — streams EXTERNAL_LINKS/CSV chunks to a local + # file (server-side write, manifest-only return). Governed via the same + # SP client + warehouse override + mcp_user tagging. Customization-layer + # tool registered in run.py before this allowlist runs. 
+ "export_query_to_file", # Read-only warehouse listing (list + get_best) — user OAuth, no writes # Upstream README names: list_warehouses + get_best_warehouse (merged in v0.1.12) "manage_warehouse", diff --git a/noom-mcp-server/docs/export-tool-design.md b/noom-mcp-server/docs/export-tool-design.md new file mode 100644 index 00000000..277c553b --- /dev/null +++ b/noom-mcp-server/docs/export-tool-design.md @@ -0,0 +1,184 @@ +# Design note: bulk query export (`export_query_to_file`) + +## TL;DR +Returning large query results through the MCP blows up the model's context +window, and nothing in the existing toolset avoids it. This adds a new +customization-layer tool, `export_query_to_file`, which has the **server +process** atomically write results to local disk and return only a small +manifest — so the row data never passes through the model's context at all. + +## The core problem: how MCP tool results reach the model +A tool call's return value becomes part of the conversation — those bytes are +tokenized into the model's context window. That's fine when the model is the +*consumer* of the data (reading/reasoning over a small result). It breaks when +the model is just a *courier* moving bulk data somewhere else. + +Concretely: a real analytical query (the `Meristem ups_ex360_med` experiment +grid) came back as **198 rows × 35 columns ≈ 68 KB ≈ 37,000 tokens**. That +single result exceeded read limits and had to spill to a temp file. Every query +like it does the same. This is not a bug — it's the inherent shape of returning +data through a tool result. + +## Why this keeps happening (it's structural, not a one-off) +- **A tool result is tokenized into context — by design.** There is no "stream" + or "view" mode; whatever a tool returns becomes conversation tokens. +- **Context is a fixed, shared budget.** Result rows compete with instructions, + history, and every other tool call for the same space. 
+- **It scales the wrong way.** More rows = more tokens = more cost/latency, + until it overflows. Analytical/EDA work trends toward *larger* results. +- **The model is a lossy, expensive conduit.** Downstream tools need the exact + bytes; a model can round, drop, or summarize. Even when a result "fits," + fidelity isn't guaranteed. + +So any non-trivial extract, by anyone, hits the same wall. The durable fix is to +stop routing data through the model at all. + +## Approaches considered (and why rejected) +1. **Work around it with current tools** — No. Every result-returning tool + serializes into context by design; `output_format` only changes formatting, + not destination. The harness overflow temp-file is a side-effect, not an + export; reading it back re-bloats context. +2. **Keep data in Databricks (CTAS + consumer reads the table)** — Good for the + in-Databricks model app, but local tooling (viz, notebooks) needs local + files; can't funnel everything through the model app. +3. **Subagent fetches the data** — Hides the bloat in a child context but + doesn't eliminate it, and an LLM can't losslessly transcribe a large result + (silent corruption). The only correct version runs *code* to move bytes — at + which point the subagent adds nothing. +4. **Bundle a separate connector script with the plugin** — Works, but every + user must install + auth a separate client. The MCP server already holds a + governed connection. + +## The chosen fix: a write-to-disk MCP tool +An MCP server is a local process with filesystem access. A tool may have a side +effect (write a file) and return only a small confirmation. So data flows +**warehouse → server process → local disk**, with only a manifest +(`{path, row_count, bytes, columns, format}`) returning through the protocol +into context. + +### How it works +1. **Execution disposition.** Does *not* reuse `execute_sql` (which uses the + `INLINE` disposition and reads only the first chunk — it truncates large + results). 
Instead it issues the statement with `disposition=EXTERNAL_LINKS` + and `format=CSV`. Databricks writes the result to cloud storage as CSV chunk + files and returns presigned download URLs. +2. **Lazy, just-in-time streaming.** A generator walks the `next_chunk_index` + chain, and each chunk's presigned link is downloaded *immediately* as it is + produced (not collected up front). This keeps each URL freshly issued when + used, so links don't age out mid-export, and it is memory-bounded (one chunk + at a time) and never truncates. The CSV header arrives in the first chunk; an + empty result yields a header-only file from the manifest schema. +3. **Atomic write.** Chunks stream to a temp file in the destination directory, + which is moved into place only after the *last* chunk is written. If any + download fails — or the wall-clock ceiling fires mid-stream — the temp file + is discarded and the destination is left untouched. There is never a partial + or empty file at the real path (a half-written CSV that looks complete is + exactly the silent corruption this tool exists to prevent). +4. **Overwrite guard.** The early existence check is only a fast-fail; the + authoritative guard is atomic at the move step. `overwrite=false` uses + `os.link` (fails if the destination exists, even if it appeared *during* the + query); `overwrite=true` uses `os.replace`. This closes the time-of-check / + time-of-use gap across a long-running query. +5. **Timeout bounded by the tool ceiling.** The per-call `timeout` is clamped to + the tool's 600s wall-clock ceiling, so the poll loop's own cancellation fires + before FastMCP aborts the call — otherwise an over-large timeout would orphan + the statement on the warehouse. +6. **What the model sees.** Only the manifest — not the rows. +7. 
**Governance preserved.** Reuses the existing chokepoints: Service-Principal + client (creds from the `dbrix_mcp_secret` scope), forced warehouse override, + and `mcp_user:` query tagging, plus a `mcp_tool:export_query_to_file` + tag. Exports appear in `system.query.history` like any governed query. +8. **Path safety.** Writes are confined to `DATABRICKS_MCP_EXPORT_DIR` (default + `~/databricks-mcp-exports`); `..` traversal and absolute escapes are rejected. + Containment is also **re-checked immediately before writing**, so a symlink + introduced under the base during the query can't redirect the write outside + the sandbox. + +### Correctness & safety properties (and where each is enforced) +| Property | Mechanism | +|---|---| +| No truncation of large results | `EXTERNAL_LINKS` + full `next_chunk_index` pagination (vs. upstream's first-chunk `INLINE`) | +| Memory-bounded | one chunk streamed at a time | +| No presigned-link expiry mid-export | lazy, just-in-time per-chunk download | +| No partial/empty file on failure | temp file + atomic move, discard-on-error | +| No silent clobber of an existing file | `os.link` no-clobber when `overwrite=false` | +| No orphaned warehouse statement | per-call timeout clamped to the 600s ceiling | +| No write outside the sandbox | up-front path validation + write-time containment re-check | +| No data through the model context | server writes the file; only the manifest is returned | + +### Data lifecycle +There are two distinct "CSV" artifacts; only one is ours to manage. + +| Artifact | Location | Lifecycle | Owner | +|---|---|---|---| +| Cloud-fetch staging | Databricks-managed cloud storage | Transient. Each presigned link carries an explicit `expiration` (the authoritative value; ~15 min observed in testing, **not** a documented constant). The staged result is held only briefly under Databricks' statement-result retention, then purged. 
| **Databricks** — we don't choose the path, create a table/volume, or extend it | +| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Persists until the engineer deletes it — **no automatic cleanup**. | **The engineer** | + +We persist nothing durable or self-owned in Databricks. The exported file is the +engineer's to keep or remove. There is **deliberately no automatic deletion**: +the data is PII-masked at source by the governed MCP (which is what makes local +downloads acceptable in the first place), and **reproducibility of prior +analyses outweighs disk hygiene** — silently removing earlier exports would +undermine re-running a past analysis. A time-based retention sweep was +prototyped during review and removed for these reasons. + +### Governing principle +The MCP layer is a **control plane, not a data plane.** It orchestrates and +passes *handles* (a table name, or a file path); bulk bytes move +warehouse → storage → disk underneath, never through the model. +**Pass the path, not the payload.** + +## What changed +All inside `noom-mcp-server/` (upstream untouched), same customization-layer +pattern as the SQL-timeout PR (#18): + +| File | Change | +|---|---| +| `customization/export_query_patch.py` | New — tool, lazy chunk streaming, atomic write, overwrite guard, timeout clamp, path sandbox | +| `customization/tool_allowlist_patch.py` | Added `export_query_to_file` to `ALLOWED_TOOLS` | +| `run.py` | New Step 2b: registers the tool before the allowlist | +| `tests/test_export_query_patch.py` | New — unit tests (sandbox, header quoting, lazy chunk iteration, atomic write, overwrite guard, timeout clamp) | +| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` | +| `docs/export-tool-design.md` | This design note | + +**Validation:** 59 unit tests pass, ruff check + format clean. 
Validated three +ways: the full `run.py` startup lifecycle (tool registered, survives the +allowlist, governed SP path), live warehouse exports (synthetic deterministic, +a real 122-column slice, empty → header-only), and a byte-for-byte identical +result from the local code path and the deployed MCP server on the real +experiment query. + +## Follow-ups +- Optional Parquet output (`Format.ARROW_STREAM` + pyarrow) if a consumer + prefers it over CSV. +- Engineers must restart their MCP client to pick up the tool after it ships. + +--- + +### Changelog — what changed since the previous version of this note +The earlier draft described the v1 design. Since then, code review (Cursor +Bugbot) surfaced several correctness and safety issues that have been fixed; the +note now reflects the shipped behavior: + +- **Atomic write (was: direct write).** The previous note said the server + "writes the bytes straight to the local file." That left a partial/empty file + at the destination if a download failed mid-stream. Now writes go to a temp + file and are atomically moved into place only on full success. +- **Overwrite guard hardened.** The existence check ran once before the query; a + file appearing during the query could still be clobbered. The move now uses + `os.link` (no-clobber) when `overwrite=false`. +- **Lazy link fetching (was: eager collection).** Links are now fetched + just-in-time per chunk so presigned URLs don't expire during long exports. +- **Timeout clamped to the 600s ceiling**, preventing orphaned warehouse + statements when a caller passes an over-large timeout. +- **Write-time sandbox re-check** added, so a symlink introduced during the + query can't redirect the write outside the export dir. +- **No automatic file retention.** A time-based retention sweep was prototyped + during review and then **removed**: the exported data is PII-masked at source, + and auto-deleting prior exports works against reproducibility. 
Exported files + persist until the engineer removes them. +- **Lifecycle wording corrected.** The "~15 min" presigned-link figure is an + *observation*, not a documented Databricks SLA — the per-response `expiration` + field is authoritative. +- **Counts updated.** Six files (added `.env.example`, this doc); 59 unit tests + (was "14 / 51"). diff --git a/noom-mcp-server/run.py b/noom-mcp-server/run.py index bbe66b78..a7eaa35e 100644 --- a/noom-mcp-server/run.py +++ b/noom-mcp-server/run.py @@ -93,6 +93,20 @@ from databricks_mcp_server.server import mcp # noqa: E402 +# --------------------------------------------------------------------------- +# Step 2b: Register the customization-layer export tool. +# +# Adds export_query_to_file, which streams a query's full result set to a local +# file (server-side write) and returns only a manifest — keeping bulk result +# data out of the model context. Must run before the allowlist (Step 3), which +# is the single source of truth for what is exposed; the tool name is listed in +# ALLOWED_TOOLS. +# --------------------------------------------------------------------------- + +from customization.export_query_patch import register_export_query_tool # noqa: E402 + +register_export_query_tool(mcp) + # --------------------------------------------------------------------------- # Step 3: Apply the tool allowlist. # diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py new file mode 100644 index 00000000..c5d7f652 --- /dev/null +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -0,0 +1,304 @@ +"""Unit tests for customization.export_query_patch. + +No live workspace, server import, or OAuth is needed: path-sandbox logic is +pure, and the streaming writer is exercised with stand-in link objects and a +monkeypatched downloader. 
+""" + +import csv + +import pytest + +from customization import export_query_patch as ep + + +# --------------------------------------------------------------------------- +# Path sandboxing +# --------------------------------------------------------------------------- + + +def test_relative_path_resolves_inside_base(tmp_path): + dest = ep.resolve_export_path("sub/dir/out.csv", base_dir=tmp_path) + assert dest == (tmp_path / "sub/dir/out.csv").resolve() + assert tmp_path.resolve() in dest.parents + + +def test_absolute_path_inside_base_is_allowed(tmp_path): + inside = tmp_path / "ok.csv" + dest = ep.resolve_export_path(str(inside), base_dir=tmp_path) + assert dest == inside.resolve() + + +def test_dotdot_traversal_is_rejected(tmp_path): + with pytest.raises(ValueError, match="outside the export directory"): + ep.resolve_export_path("../escape.csv", base_dir=tmp_path) + + +def test_absolute_path_outside_base_is_rejected(tmp_path): + with pytest.raises(ValueError, match="outside the export directory"): + ep.resolve_export_path("/etc/passwd", base_dir=tmp_path) + + +@pytest.mark.parametrize("bad", ["", " ", ".", "./"]) +def test_no_filename_is_rejected(tmp_path, bad): + with pytest.raises(ValueError): + ep.resolve_export_path(bad, base_dir=tmp_path) + + +# --------------------------------------------------------------------------- +# CSV header derivation +# --------------------------------------------------------------------------- + + +def test_csv_header_line_is_rfc_quoted(): + line = ep._csv_header_line(["a", "b,c", 'd"e']) + # csv module quotes the field containing a comma and escapes the quote. 
+ assert line == 'a,"b,c","d""e"\n' + + +# --------------------------------------------------------------------------- +# Lazy chunk iteration + streaming write +# --------------------------------------------------------------------------- + + +class _FakeLink: + def __init__(self, chunk_index, payload, next_chunk_index=None, row_offset=0): + self.chunk_index = chunk_index + self.next_chunk_index = next_chunk_index + self.row_offset = row_offset + self.external_link = f"https://example/{chunk_index}" + self.http_headers = {} + self._payload = payload + + +class _FakeResult: + def __init__(self, external_links, next_chunk_index=None): + self.external_links = external_links + self.next_chunk_index = next_chunk_index + + +class _FakeClient: + """Stand-in exposing only get_statement_result_chunk_n for pagination.""" + + def __init__(self, chunks_by_index): + self._chunks = chunks_by_index + + class _SE: + def get_statement_result_chunk_n(_self, statement_id, chunk_index): + return self._chunks[chunk_index] + + self.statement_execution = _SE() + + +def test_iter_external_links_follows_chunk_chain(): + link0 = _FakeLink(0, b"a\n", next_chunk_index=1) + link1 = _FakeLink(1, b"b\n", next_chunk_index=None) + first = _FakeResult([link0], next_chunk_index=1) + second = _FakeResult([link1], next_chunk_index=None) + client = _FakeClient({1: second}) + + links = list(ep._iter_external_links(client, "stmt-1", first)) + assert [link.chunk_index for link in links] == [0, 1] + + +def test_iter_external_links_orders_within_result(): + link_b = _FakeLink(1, b"b\n", row_offset=10) + link_a = _FakeLink(0, b"a\n", row_offset=0) + first = _FakeResult([link_b, link_a], next_chunk_index=None) + links = list(ep._iter_external_links(_FakeClient({}), "stmt-1", first)) + assert [link.chunk_index for link in links] == [0, 1] + + +def test_iter_external_links_is_lazy(): + # The next chunk must not be fetched until the previous one is consumed — + # this is what keeps presigned links fresh during a
long download. + import types + + link0 = _FakeLink(0, b"a\n", next_chunk_index=1) + link1 = _FakeLink(1, b"b\n", next_chunk_index=None) + first = _FakeResult([link0], next_chunk_index=1) + + fetched = [] + + class _TrackingClient: + def __init__(self): + outer = self + + class _SE: + def get_statement_result_chunk_n(_self, statement_id, chunk_index): + fetched.append(chunk_index) + return _FakeResult([link1], next_chunk_index=None) + + self.statement_execution = _SE() + _ = outer + + gen = ep._iter_external_links(_TrackingClient(), "stmt-1", first) + assert isinstance(gen, types.GeneratorType) + assert next(gen).chunk_index == 0 + assert fetched == [] # chunk 1 NOT fetched until we ask for the next link + assert next(gen).chunk_index == 1 + assert fetched == [1] + + +def test_write_csv_streams_chunk_bytes_verbatim(tmp_path, monkeypatch): + # Databricks CSV chunks already carry the header in the first chunk; the + # writer must stream bytes as-is (no synthesized header) to avoid a dupe. + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + links = [ + _FakeLink(0, b"id,name\n1,alice\n2,bob\n"), + _FakeLink(1, b"3,carol\n"), + ] + ep._write_csv(dest, ["id", "name"], links) + + rows = list(csv.reader(dest.read_text().splitlines())) + assert rows == [ + ["id", "name"], + ["1", "alice"], + ["2", "bob"], + ["3", "carol"], + ] + + +def test_write_csv_empty_result_writes_header_only(tmp_path): + # No external links (empty result) -> header-only CSV from the manifest. + dest = tmp_path / "empty.csv" + ep._write_csv(dest, ["id", "name"], []) + assert dest.read_text() == "id,name\n" + + +def test_write_csv_failure_preserves_prior_file_and_leaves_no_partial(tmp_path, monkeypatch): + # A mid-stream chunk failure must not clobber a prior good export, and must + # leave no partial/temp file behind. (Atomic write: temp + os.replace.) 
+ dest = tmp_path / "out.csv" + dest.write_text("PRIOR GOOD DATA\n") + + calls = {"n": 0} + + def flaky(link): + calls["n"] += 1 + if calls["n"] == 2: + raise RuntimeError("chunk download failed") + return link._payload + + monkeypatch.setattr(ep, "_download_link", flaky) + links = [_FakeLink(0, b"id,name\n1,a\n"), _FakeLink(1, b"2,b\n")] + + with pytest.raises(RuntimeError, match="chunk download failed"): + ep._write_csv(dest, ["id", "name"], links) + + assert dest.read_text() == "PRIOR GOOD DATA\n" # untouched + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] # no .part leftover + + +def test_write_csv_failure_creates_no_destination(tmp_path, monkeypatch): + # If dest didn't exist and the write fails, no file should appear at dest. + dest = tmp_path / "new.csv" + monkeypatch.setattr( + ep, "_download_link", lambda link: (_ for _ in ()).throw(RuntimeError("boom")) + ) + + with pytest.raises(RuntimeError, match="boom"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"x")]) + + assert not dest.exists() + assert list(tmp_path.iterdir()) == [] # temp cleaned up + + +def test_write_csv_refuses_to_clobber_when_overwrite_false(tmp_path, monkeypatch): + # Even if dest appears after the caller's early check (simulated here by it + # already existing), the atomic move must not clobber it when overwrite=False. 
+ monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + dest.write_text("ORIGINAL\n") + + with pytest.raises(ValueError, match="already exists"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"id\n1\n")], overwrite=False) + + assert dest.read_text() == "ORIGINAL\n" # untouched + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] # no .part leftover + + +def test_write_csv_overwrites_when_true(tmp_path, monkeypatch): + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + dest.write_text("OLD\n") + + ep._write_csv(dest, ["id", "name"], [_FakeLink(0, b"id,name\n1,a\n")], overwrite=True) + + assert dest.read_text() == "id,name\n1,a\n" + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] + + +def test_write_csv_refuses_parent_symlink_outside_sandbox(tmp_path, monkeypatch): + # Simulates a symlink introduced under the base after resolve_export_path ran: + # base/sub -> external. The re-check must refuse to write and create nothing + # outside the sandbox. + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + base = tmp_path / "base" + base.mkdir() + external = tmp_path / "external" + external.mkdir() + (base / "sub").symlink_to(external, target_is_directory=True) + dest = base / "sub" / "out.csv" # resolves to external/out.csv + + with pytest.raises(ValueError, match="outside the export sandbox"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"id\n1\n")], overwrite=True, base_dir=base) + + assert list(external.iterdir()) == [] # nothing written outside + + +def test_write_csv_allows_parent_symlink_inside_sandbox(tmp_path, monkeypatch): + # A symlink that still resolves *inside* the base is fine (containment, not a + # blanket symlink ban). 
+ monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + base = tmp_path / "base" + base.mkdir() + (base / "real").mkdir() + (base / "alias").symlink_to(base / "real", target_is_directory=True) + dest = base / "alias" / "out.csv" # resolves to base/real/out.csv (inside) + + ep._write_csv( + dest, ["id", "name"], [_FakeLink(0, b"id,name\n1,a\n")], overwrite=True, base_dir=base + ) + + assert (base / "real" / "out.csv").read_text() == "id,name\n1,a\n" + + +# --------------------------------------------------------------------------- +# Timeout clamp + link download +# --------------------------------------------------------------------------- + + +def test_clamp_statement_timeout(): + ceiling = ep.EXPORT_TOOL_TIMEOUT_CEILING + assert ep._clamp_statement_timeout(ceiling + 5000) == ceiling # over → clamped + assert ep._clamp_statement_timeout(120) == 120 # under → unchanged + assert ep._clamp_statement_timeout(ceiling) == ceiling # exactly → unchanged + + +def test_download_link_applies_http_headers(monkeypatch): + captured = {} + + class _Resp: + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def read(self): + return b"data" + + def _fake_urlopen(req): + captured["headers"] = dict(req.header_items()) + return _Resp() + + monkeypatch.setattr(ep.urllib.request, "urlopen", _fake_urlopen) + + link = _FakeLink(0, b"") + link.http_headers = {"X-Amz-Token": "abc"} + out = ep._download_link(link) + assert out == b"data" + # urllib re-cases header keys via str.capitalize() (X-amz-token); just confirm the value made it through. + assert "abc" in captured["headers"].values()