From 64ecc2f414b83d6357eabba94fb97e948f1af7c7 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Tue, 2 Jun 2026 14:52:21 -0400 Subject: [PATCH 1/9] feat(noom-mcp-server): add export_query_to_file for bulk result export MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Returning large query results through the MCP serializes them into the model's context window. A real analytical query (the Meristem ups_ex360_med grid) came back as ~68 KB / ~37,000 tokens and overflowed. This is structural, not a one-off: a tool result is tokenized into context by design, so any non-trivial extract — by any engineer, on any project — hits the same wall. It also makes the model a lossy conduit for data that downstream tools (viz, the model app) need exact and whole. This adds a customization-layer tool, export_query_to_file, that has the SERVER write results to a local file and return only a manifest ({path, row_count, bytes, columns, format}). The rows never enter the model context — local tools read the file off disk. Pass the path, not the payload. Mechanism: - Does NOT reuse SQLExecutor.execute(): that path uses the INLINE disposition and reads only result.data_array (the first chunk), silently truncating large results. - Issues the statement with disposition=EXTERNAL_LINKS, format=CSV. Databricks writes CSV chunk files to cloud storage and returns presigned URLs; the server streams each chunk to disk, following next_chunk_index. Memory- bounded, no truncation. The header arrives in the first chunk; an empty result yields a header-only file from the manifest schema. - Reuses the existing governance chokepoints (get_sql_sp_client, get_sql_warehouse_id, get_mcp_user_identity): SP execution, forced warehouse override, and mcp_user: tagging, plus a mcp_tool tag. - Sandboxes writes to DATABRICKS_MCP_EXPORT_DIR (default ~/databricks-mcp-exports); rejects .. traversal and absolute escapes. Registered in run.py before the tool allowlist (Step 2b) so the allowlist stays the single source of truth for what is exposed; the tool name is added to ALLOWED_TOOLS. No upstream files touched, per the fork's governance model. Validation: 14 new unit tests; full suite 51 passed / 1 skipped; lint clean; live-validated against the warehouse (synthetic deterministic result, a real 122-column table slice, and an empty result -> header-only file). The live test caught and fixed a duplicate-header bug (we initially synthesized a header that the CSV chunks already include). Co-Authored-By: Claude Opus 4.8 --- noom-mcp-server/.env.example | 9 +- .../customization/export_query_patch.py | 357 ++++++++++++++++++ .../customization/tool_allowlist_patch.py | 5 + noom-mcp-server/docs/export-tool-design.md | 102 +++++ noom-mcp-server/run.py | 14 + .../tests/test_export_query_patch.py | 163 ++++++++ 6 files changed, 648 insertions(+), 2 deletions(-) create mode 100644 noom-mcp-server/customization/export_query_patch.py create mode 100644 noom-mcp-server/docs/export-tool-design.md create mode 100644 noom-mcp-server/tests/test_export_query_patch.py diff --git a/noom-mcp-server/.env.example b/noom-mcp-server/.env.example index 25c72b5d..0af92417 100644 --- a/noom-mcp-server/.env.example +++ b/noom-mcp-server/.env.example @@ -14,7 +14,12 @@ DATABRICKS_HOST=https://noom-prod.cloud.databricks.com DATABRICKS_MCP_SQL_HOST=https://noom-prod.cloud.databricks.com # SQL warehouse — all queries are forced to run on this warehouse -# Find it in Databricks UI: -# * SQL Warehouses → Pick your warehouse. +# Find it in Databricks UI: +# * SQL Warehouses → Pick your warehouse. # * In the Overview, you can see the ID by the warehouse name DATABRICKS_WAREHOUSE_ID= + +# Export directory (optional) — where export_query_to_file writes result files. +# All writes are confined to this directory; paths that escape it are rejected. +# Defaults to ~/databricks-mcp-exports if unset. +# DATABRICKS_MCP_EXPORT_DIR=/path/to/exports diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py new file mode 100644 index 00000000..fcec27e4 --- /dev/null +++ b/noom-mcp-server/customization/export_query_patch.py @@ -0,0 +1,357 @@ +"""Export-query-to-file tool. + +Adds a customization-layer MCP tool, ``export_query_to_file``, that runs a SQL +query and writes the **full** result set to a local file (CSV today), returning +only a small manifest (``{path, row_count, bytes, columns, format}``) to the +caller. + +Why this exists +--------------- +The upstream ``execute_sql`` tool serializes results back *through the MCP +protocol* into the model's context. That is correct for interactive reads, but +it is the wrong shape for bulk extraction feeding a downstream model or local +tooling (data viz, notebooks): even a modest result set blows up the context +window, and large results trip the protocol's size limit. + +This tool keeps the bytes out of the model entirely. The **server process** +(which has local filesystem access) downloads the result and writes the file; +the model only ever receives a manifest. Local tools then read the file off +disk. Pass the path, not the payload. + +Why not wrap SQLExecutor.execute() +---------------------------------- +``SQLExecutor.execute()`` uses the Statement Execution API with the **INLINE** +disposition and ``_extract_results`` reads only ``result.data_array`` — the +*first chunk*. It never paginates, so it (a) materializes rows as a list of +dicts in server RAM and (b) silently truncates large results to the first +chunk. Both are unacceptable for a model-feeding extraction path. + +Instead this tool issues the statement through the same governed SP client but +with the **EXTERNAL_LINKS** disposition and ``format=CSV``. Databricks writes +the result to cloud storage as CSV chunks and returns presigned URLs; the +server streams those chunks straight to the local file. This is memory-bounded +(one chunk at a time), never truncates, and avoids any lossy reformatting since +Databricks emits the CSV itself. + +Governance +---------- +Execution reuses the existing governance chokepoints from +``sql_executor_patch``: + - ``get_sql_sp_client()`` → Service Principal credentials + - ``get_sql_warehouse_id()`` → forced warehouse override + - ``get_mcp_user_identity()`` → ``mcp_user:`` query tag + +so exports show up in ``system.query.history`` exactly like ``execute_sql`` +calls, plus a ``mcp_tool:export_query_to_file`` tag for filtering. + +Path safety +----------- +For broad distribution the tool MUST NOT write to arbitrary local paths. All +writes are confined to a single base directory: + - ``DATABRICKS_MCP_EXPORT_DIR`` if set, else + - ``~/databricks-mcp-exports`` (created on demand). +``output_path`` is interpreted relative to that base; absolute paths and ``..`` +traversal that escape the base are rejected. +""" + +import csv +import logging +import os +import time +import urllib.request +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Wall-clock ceiling (seconds) for the export tool call. Bulk exports of large +# result sets can legitimately run longer than an interactive query, so this +# matches the raised SQL ceiling rather than the upstream 60s default. +EXPORT_TOOL_TIMEOUT_CEILING = 600 + +# Default per-call statement timeout (seconds) — overridable via the tool's +# ``timeout`` argument, bounded by the ceiling above. +_DEFAULT_STATEMENT_TIMEOUT = 300 + +_SUPPORTED_FORMATS = ("csv",) + + +# --------------------------------------------------------------------------- +# Path sandboxing +# --------------------------------------------------------------------------- + + +def get_export_base_dir() -> Path: + """Return the (created) base directory all exports are confined to. + + ``DATABRICKS_MCP_EXPORT_DIR`` if set, else ``~/databricks-mcp-exports``. + """ + configured = os.environ.get("DATABRICKS_MCP_EXPORT_DIR") + base = Path(configured).expanduser() if configured else Path.home() / "databricks-mcp-exports" + base = base.resolve() + base.mkdir(parents=True, exist_ok=True) + return base + + +def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Path: + """Resolve ``output_path`` against the export base dir, rejecting escapes. + + Args: + output_path: Caller-supplied path, interpreted relative to the base dir. + Absolute paths are allowed only if they fall inside the base dir. + base_dir: Override for the base dir (tests). Defaults to + ``get_export_base_dir()``. + + Returns: + The absolute, sandbox-checked destination path. + + Raises: + ValueError: If the resolved path escapes the base dir, or the filename + is empty / not a recognised extension. + """ + if not output_path or not output_path.strip(): + raise ValueError("output_path must be a non-empty filename.") + + base = (base_dir or get_export_base_dir()).resolve() + + candidate = Path(output_path) + combined = candidate if candidate.is_absolute() else base / candidate + resolved = combined.resolve() + + # Containment check: resolved must be strictly under the base dir. + if base not in resolved.parents: + # Either it escapes the base dir, or it *is* the base dir (no filename). + if resolved == base: + raise ValueError( + "output_path must include a filename, not just the export directory." + ) + raise ValueError( + f"output_path {output_path!r} resolves outside the export directory " + f"({base}). Writes are confined to that directory; use a relative path " + f"or set DATABRICKS_MCP_EXPORT_DIR." + ) + return resolved + + +# --------------------------------------------------------------------------- +# Result streaming +# --------------------------------------------------------------------------- + + +def _collect_external_links(client, statement_id: str, first_result) -> List[Any]: + """Walk the chunk chain, returning all external links in row order.""" + links: List[Any] = [] + result = first_result + while result is not None and result.external_links: + links.extend(result.external_links) + nxt = result.next_chunk_index + if nxt is None: + break + result = client.statement_execution.get_statement_result_chunk_n( + statement_id=statement_id, chunk_index=nxt + ) + # Order defensively by chunk_index (fallback row_offset) so rows are in order. + links.sort(key=lambda link: (link.chunk_index or 0, link.row_offset or 0)) + return links + + +def _download_link(link) -> bytes: + """Download one presigned external link, honouring its http_headers. + + The link is a presigned cloud-storage URL — Databricks workspace auth must + NOT be attached; only the headers the link itself specifies are sent. + """ + req = urllib.request.Request(link.external_link) + for key, value in (link.http_headers or {}).items(): + req.add_header(key, value) + with urllib.request.urlopen(req) as resp: # noqa: S310 - presigned Databricks URL + return resp.read() + + +def _write_csv(dest: Path, columns: List[str], links: List[Any]) -> None: + """Stream each CSV chunk's bytes to ``dest`` in row order. + + Databricks EXTERNAL_LINKS + CSV chunks already include the column header + (in the first chunk), so the chunk bytes are written as-is — synthesizing a + header would duplicate it. + + The one exception is an empty result: Databricks returns no external links, + so there are no bytes to stream. In that case a header-only CSV is written + (derived from the manifest schema) so the file is still valid and carries + the column names for downstream tools. + """ + dest.parent.mkdir(parents=True, exist_ok=True) + with open(dest, "wb") as fh: + if not links: + fh.write(_csv_header_line(columns).encode("utf-8")) + return + for link in links: + fh.write(_download_link(link)) + + +def _csv_header_line(columns: List[str]) -> str: + import io + + buf = io.StringIO() + csv.writer(buf, lineterminator="\n").writerow(columns) + return buf.getvalue() + + +# --------------------------------------------------------------------------- +# Tool implementation +# --------------------------------------------------------------------------- + + +def _run_export( + sql_query: str, + output_path: str, + fmt: str, + catalog: Optional[str], + schema: Optional[str], + overwrite: bool, + timeout: int, +) -> Dict[str, Any]: + """Core logic, separated from the FastMCP wrapper for testability.""" + from databricks.sdk.service.sql import Disposition, Format, QueryTag, StatementState + + # Import governance helpers lazily (and from the sibling patch) so this + # module is importable without the SP environment configured (e.g. tests). + from customization.sql_executor_patch import ( + get_mcp_user_identity, + get_sql_sp_client, + get_sql_warehouse_id, + ) + + fmt = fmt.lower() + if fmt not in _SUPPORTED_FORMATS: + raise ValueError(f"Unsupported format {fmt!r}. Supported: {', '.join(_SUPPORTED_FORMATS)}.") + + dest = resolve_export_path(output_path) + if dest.exists() and not overwrite: + raise ValueError(f"{dest} already exists. Pass overwrite=true to replace it.") + + warehouse_id = get_sql_warehouse_id() + identity = get_mcp_user_identity() # resolve under user creds, before SP client use + client = get_sql_sp_client() + + query_tags = [ + QueryTag(key="mcp_user", value=identity), + QueryTag(key="mcp_tool", value="export_query_to_file"), + ] + + response = client.statement_execution.execute_statement( + statement=sql_query, + warehouse_id=warehouse_id, + disposition=Disposition.EXTERNAL_LINKS, + format=Format.CSV, + wait_timeout="0s", + catalog=catalog, + schema=schema, + query_tags=query_tags, + ) + statement_id = response.statement_id + + poll_interval = 2 + elapsed = 0 + status = response + while True: + state = status.status.state + if state == StatementState.SUCCEEDED: + break + if state in (StatementState.FAILED, StatementState.CANCELED, StatementState.CLOSED): + msg = "" + if status.status and status.status.error and status.status.error.message: + msg = status.status.error.message + raise RuntimeError(f"Export query did not succeed (state={state}). {msg}".strip()) + if elapsed >= timeout: + try: + client.statement_execution.cancel_execution(statement_id=statement_id) + finally: + raise RuntimeError( + f"Export query timed out after {timeout}s and was canceled " + f"(statement_id={statement_id})." + ) + time.sleep(poll_interval) + elapsed += poll_interval + status = client.statement_execution.get_statement(statement_id=statement_id) + + manifest = status.manifest + columns = ( + [col.name for col in manifest.schema.columns] + if manifest and manifest.schema and manifest.schema.columns + else [] + ) + row_count = (manifest.total_row_count if manifest else None) or 0 + + links = _collect_external_links(client, statement_id, status.result) + _write_csv(dest, columns, links) + + return { + "path": str(dest), + "row_count": int(row_count), + "bytes": dest.stat().st_size, + "columns": columns, + "format": fmt, + } + + +def register_export_query_tool(mcp) -> None: + """Register ``export_query_to_file`` on the FastMCP server instance. + + Must be called AFTER importing the upstream server module and BEFORE the + tool allowlist runs (so the allowlist remains the single source of truth + for what is exposed; ``export_query_to_file`` must be in ALLOWED_TOOLS). + + Args: + mcp: The FastMCP server instance (from databricks_mcp_server.server). + """ + + @mcp.tool(timeout=EXPORT_TOOL_TIMEOUT_CEILING) + def export_query_to_file( + sql_query: str, + output_path: str, + format: str = "csv", + catalog: Optional[str] = None, + schema: Optional[str] = None, + overwrite: bool = False, + timeout: int = _DEFAULT_STATEMENT_TIMEOUT, + ) -> Dict[str, Any]: + """Run a SQL query and write the full result set to a local file. + + Use this instead of execute_sql when the result is large or is destined + for a downstream tool/model rather than to be read in the conversation. + The result rows are written to disk by the server and never returned + through the model context — only a small manifest is returned. + + Args: + sql_query: The SQL query to run. + output_path: Destination file, relative to the server's export + directory (DATABRICKS_MCP_EXPORT_DIR, default + ~/databricks-mcp-exports). Paths escaping that directory are + rejected. + format: Output format. Currently only "csv". + catalog: Optional catalog context for the query. + schema: Optional schema context for the query. + overwrite: Overwrite the destination if it already exists. + timeout: Per-call statement timeout in seconds (bounded by the + tool's 600s ceiling). + + Returns: + A manifest: {path, row_count, bytes, columns, format}. The rows + themselves are on disk at `path`, not in this response. + """ + return _run_export( + sql_query=sql_query, + output_path=output_path, + fmt=format, + catalog=catalog, + schema=schema, + overwrite=overwrite, + timeout=timeout, + ) + + logger.info( + "Registered export_query_to_file tool (ceiling=%ss, export dir=%s)", + EXPORT_TOOL_TIMEOUT_CEILING, + get_export_base_dir(), + ) diff --git a/noom-mcp-server/customization/tool_allowlist_patch.py b/noom-mcp-server/customization/tool_allowlist_patch.py index 6a53df3e..2ee8a8c3 100644 --- a/noom-mcp-server/customization/tool_allowlist_patch.py +++ b/noom-mcp-server/customization/tool_allowlist_patch.py @@ -46,6 +46,11 @@ "execute_sql_multi", # Schema and statistics — uses SQLExecutor internally (SP-governed) "get_table_stats_and_schema", + # Bulk result export — streams EXTERNAL_LINKS/CSV chunks to a local + # file (server-side write, manifest-only return). Governed via the same + # SP client + warehouse override + mcp_user tagging. Customization-layer + # tool registered in run.py before this allowlist runs. + "export_query_to_file", # Read-only warehouse listing (list + get_best) — user OAuth, no writes # Upstream README names: list_warehouses + get_best_warehouse (merged in v0.1.12) "manage_warehouse", diff --git a/noom-mcp-server/docs/export-tool-design.md b/noom-mcp-server/docs/export-tool-design.md new file mode 100644 index 00000000..94676546 --- /dev/null +++ b/noom-mcp-server/docs/export-tool-design.md @@ -0,0 +1,102 @@ +# Design note: bulk query export (`export_query_to_file`) + +## TL;DR +Returning large query results through the MCP blows up the model's context +window, and nothing in the existing toolset avoids it. This adds a new +customization-layer tool, `export_query_to_file`, which has the **server +process** write results straight to local disk and return only a small +manifest — so the row data never passes through the model's context at all. + +## The core problem: how MCP tool results reach the model +A tool call's return value becomes part of the conversation — those bytes are +tokenized into the model's context window. That's fine when the model is the +*consumer* of the data (reading/reasoning over a small result). It breaks when +the model is just a *courier* moving bulk data somewhere else. + +Concretely: a real analytical query (the `Meristem ups_ex360_med` experiment +grid) came back as **198 rows × 35 columns ≈ 68 KB ≈ 37,000 tokens**. That +single result exceeded read limits and had to spill to a temp file. Every query +like it does the same. This is not a bug — it's the inherent shape of returning +data through a tool result. + +## Why this keeps happening (it's structural, not a one-off) +- **A tool result is tokenized into context — by design.** There is no "stream" + or "view" mode; whatever a tool returns becomes conversation tokens. +- **Context is a fixed, shared budget.** Result rows compete with instructions, + history, and every other tool call for the same space. +- **It scales the wrong way.** More rows = more tokens = more cost/latency, + until it overflows. Analytical/EDA work trends toward *larger* results. +- **The model is a lossy, expensive conduit.** Downstream tools need the exact + bytes; a model can round, drop, or summarize. Even when a result "fits," + fidelity isn't guaranteed. + +So any non-trivial extract, by anyone, hits the same wall. The durable fix is to +stop routing data through the model at all. + +## Approaches considered (and why rejected) +1. **Work around it with current tools** — No. Every result-returning tool + serializes into context by design; `output_format` only changes formatting, + not destination. The harness overflow temp-file is a side-effect, not an + export; reading it back re-bloats context. +2. **Keep data in Databricks (CTAS + consumer reads the table)** — Good for the + in-Databricks model app, but local tooling (viz, notebooks) needs local + files; can't funnel everything through the model app. +3. **Subagent fetches the data** — Hides the bloat in a child context but + doesn't eliminate it, and an LLM can't losslessly transcribe a large result + (silent corruption). The only correct version runs *code* to move bytes — at + which point the subagent adds nothing. +4. **Bundle a separate connector script with the plugin** — Works, but every + user must install + auth a separate client. The MCP server already holds a + governed connection. + +## The chosen fix: a write-to-disk MCP tool +An MCP server is a local process with filesystem access. A tool may have a side +effect (write a file) and return only a small confirmation. So data flows +**warehouse → server process → local disk**, with only a manifest +(`{path, row_count, bytes, columns, format}`) returning through the protocol +into context. + +### How it works +1. **Execution disposition.** Does *not* reuse `execute_sql` (which uses the + `INLINE` disposition and reads only the first chunk — it truncates large + results). Instead it issues the statement with `disposition=EXTERNAL_LINKS` + and `format=CSV`. Databricks writes the result to cloud storage as CSV chunk + files and returns presigned download URLs. +2. **Server-side streaming.** The server downloads each presigned chunk and + writes the bytes to the local file, following the `next_chunk_index` chain. + Memory-bounded (one chunk at a time), never truncates. The CSV header arrives + in the first chunk; an empty result yields a header-only file from the + manifest schema. +3. **What the model sees.** Only the manifest — not the rows. +4. **Governance preserved.** Reuses the existing chokepoints: Service-Principal + client, forced warehouse override, and `mcp_user:` query tagging, + plus a `mcp_tool:export_query_to_file` tag. +5. **Path safety.** Writes are confined to `DATABRICKS_MCP_EXPORT_DIR` + (default `~/databricks-mcp-exports`); `..` traversal and absolute paths that + escape the base are rejected. + +### Governing principle +The MCP layer is a **control plane, not a data plane.** It orchestrates and +passes *handles* (a table name, or a file path); bulk bytes move +warehouse → storage → disk underneath, never through the model. +**Pass the path, not the payload.** + +## What changed +All inside `noom-mcp-server/` (upstream untouched), same customization-layer +pattern as the SQL-timeout PR (#18): + +| File | Change | +|---|---| +| `customization/export_query_patch.py` | New — tool, chunk streaming, path sandbox | +| `customization/tool_allowlist_patch.py` | Added `export_query_to_file` to `ALLOWED_TOOLS` | +| `run.py` | New Step 2b: registers the tool before the allowlist | +| `tests/test_export_query_patch.py` | New — 14 unit tests | +| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` | + +**Validation:** full unit suite passes, lint clean, and the path is +live-validated against the warehouse (synthetic deterministic result, a real +122-column table slice, and an empty result → header-only file). + +## Follow-ups +- Optional Parquet output (`Format.ARROW_STREAM` + pyarrow) if a consumer + prefers it over CSV. diff --git a/noom-mcp-server/run.py b/noom-mcp-server/run.py index bbe66b78..a7eaa35e 100644 --- a/noom-mcp-server/run.py +++ b/noom-mcp-server/run.py @@ -93,6 +93,20 @@ from databricks_mcp_server.server import mcp # noqa: E402 +# --------------------------------------------------------------------------- +# Step 2b: Register the customization-layer export tool. +# +# Adds export_query_to_file, which streams a query's full result set to a local +# file (server-side write) and returns only a manifest — keeping bulk result +# data out of the model context. Must run before the allowlist (Step 3), which +# is the single source of truth for what is exposed; the tool name is listed in +# ALLOWED_TOOLS. +# --------------------------------------------------------------------------- + +from customization.export_query_patch import register_export_query_tool # noqa: E402 + +register_export_query_tool(mcp) + # --------------------------------------------------------------------------- # Step 3: Apply the tool allowlist. # diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py new file mode 100644 index 00000000..0d447357 --- /dev/null +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -0,0 +1,163 @@ +"""Unit tests for customization.export_query_patch. + +No live workspace, server import, or OAuth is needed: path-sandbox logic is +pure, and the streaming writer is exercised with stand-in link objects and a +monkeypatched downloader. +""" + +import csv + +import pytest + +from customization import export_query_patch as ep + + +# --------------------------------------------------------------------------- +# Path sandboxing +# --------------------------------------------------------------------------- + + +def test_relative_path_resolves_inside_base(tmp_path): + dest = ep.resolve_export_path("sub/dir/out.csv", base_dir=tmp_path) + assert dest == (tmp_path / "sub/dir/out.csv").resolve() + assert tmp_path.resolve() in dest.parents + + +def test_absolute_path_inside_base_is_allowed(tmp_path): + inside = tmp_path / "ok.csv" + dest = ep.resolve_export_path(str(inside), base_dir=tmp_path) + assert dest == inside.resolve() + + +def test_dotdot_traversal_is_rejected(tmp_path): + with pytest.raises(ValueError, match="outside the export directory"): + ep.resolve_export_path("../escape.csv", base_dir=tmp_path) + + +def test_absolute_path_outside_base_is_rejected(tmp_path): + with pytest.raises(ValueError, match="outside the export directory"): + ep.resolve_export_path("/etc/passwd", base_dir=tmp_path) + + +@pytest.mark.parametrize("bad", ["", " ", ".", "./"]) +def test_no_filename_is_rejected(tmp_path, bad): + with pytest.raises(ValueError): + ep.resolve_export_path(bad, base_dir=tmp_path) + + +# --------------------------------------------------------------------------- +# CSV header derivation +# --------------------------------------------------------------------------- + + +def test_csv_header_line_is_rfc_quoted(): + line = ep._csv_header_line(["a", "b,c", 'd"e']) + # csv module quotes the field containing a comma and escapes the quote. + assert line == 'a,"b,c","d""e"\n' + + +# --------------------------------------------------------------------------- +# Chunk collection + streaming write +# --------------------------------------------------------------------------- + + +class _FakeLink: + def __init__(self, chunk_index, payload, next_chunk_index=None, row_offset=0): + self.chunk_index = chunk_index + self.next_chunk_index = next_chunk_index + self.row_offset = row_offset + self.external_link = f"https://example/{chunk_index}" + self.http_headers = {} + self._payload = payload + + +class _FakeResult: + def __init__(self, external_links, next_chunk_index=None): + self.external_links = external_links + self.next_chunk_index = next_chunk_index + + +class _FakeClient: + """Stand-in exposing only get_statement_result_chunk_n for pagination.""" + + def __init__(self, chunks_by_index): + self._chunks = chunks_by_index + + class _SE: + def get_statement_result_chunk_n(_self, statement_id, chunk_index): + return self._chunks[chunk_index] + + self.statement_execution = _SE() + + +def test_collect_external_links_follows_chunk_chain(): + link0 = _FakeLink(0, b"a\n", next_chunk_index=1) + link1 = _FakeLink(1, b"b\n", next_chunk_index=None) + first = _FakeResult([link0], next_chunk_index=1) + second = _FakeResult([link1], next_chunk_index=None) + client = _FakeClient({1: second}) + + links = ep._collect_external_links(client, "stmt-1", first) + assert [link.chunk_index for link in links] == [0, 1] + + +def test_collect_external_links_orders_out_of_order_chunks(): + link_b = _FakeLink(1, b"b\n", row_offset=10) + link_a = _FakeLink(0, b"a\n", row_offset=0) + first = _FakeResult([link_b, link_a], next_chunk_index=None) + links = ep._collect_external_links(_FakeClient({}), "stmt-1", first) + assert [link.chunk_index for link in links] == [0, 1] + + +def test_write_csv_streams_chunk_bytes_verbatim(tmp_path, monkeypatch): + # Databricks CSV chunks already carry the header in the first chunk; the + # writer must stream bytes as-is (no synthesized header) to avoid a dupe. + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + links = [ + _FakeLink(0, b"id,name\n1,alice\n2,bob\n"), + _FakeLink(1, b"3,carol\n"), + ] + ep._write_csv(dest, ["id", "name"], links) + + rows = list(csv.reader(dest.read_text().splitlines())) + assert rows == [ + ["id", "name"], + ["1", "alice"], + ["2", "bob"], + ["3", "carol"], + ] + + +def test_write_csv_empty_result_writes_header_only(tmp_path): + # No external links (empty result) -> header-only CSV from the manifest. + dest = tmp_path / "empty.csv" + ep._write_csv(dest, ["id", "name"], []) + assert dest.read_text() == "id,name\n" + + +def test_download_link_applies_http_headers(monkeypatch): + captured = {} + + class _Resp: + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def read(self): + return b"data" + + def _fake_urlopen(req): + captured["headers"] = dict(req.header_items()) + return _Resp() + + monkeypatch.setattr(ep.urllib.request, "urlopen", _fake_urlopen) + + link = _FakeLink(0, b"") + link.http_headers = {"X-Amz-Token": "abc"} + out = ep._download_link(link) + assert out == b"data" + # urllib title-cases header keys; just confirm the value made it through. + assert "abc" in captured["headers"].values() From d9778ecaf1e5e04af5e81cf58bf1ba90c27eed95 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Tue, 2 Jun 2026 14:55:21 -0400 Subject: [PATCH 2/9] style(noom-mcp-server): ruff-format export_query_patch CI runs `ruff format --check` in addition to `ruff check`; the new module needed reformatting (line wrapping only, no logic change). Co-Authored-By: Claude Opus 4.8 --- noom-mcp-server/customization/export_query_patch.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index fcec27e4..757afbde 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -122,9 +122,7 @@ def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Pa if base not in resolved.parents: # Either it escapes the base dir, or it *is* the base dir (no filename). if resolved == base: - raise ValueError( - "output_path must include a filename, not just the export directory." - ) + raise ValueError("output_path must include a filename, not just the export directory.") raise ValueError( f"output_path {output_path!r} resolves outside the export directory " f"({base}). Writes are confined to that directory; use a relative path " From 35c109801da2d0abe5bc3c4657ebccfdaffdb43b Mon Sep 17 00:00:00 2001 From: michael ruder Date: Tue, 2 Jun 2026 17:24:25 -0400 Subject: [PATCH 3/9] feat(noom-mcp-server): add retention sweep for local exports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review follow-up (PR #19): the only artifact this tool owns with a real lifecycle is the LOCAL CSV — Databricks-side cloud-fetch staging is transient and platform-managed (presigned links expire ~15 min, measured). The local files previously had no retention; since exports can contain prod PII, an unmanaged pile of CSVs is a governance concern. Adds sweep_old_exports(): removes files in the export dir older than DATABRICKS_MCP_EXPORT_RETENTION_DAYS (default 7; 0 disables), pruning emptied subdirectories. Runs at startup and before each export. Sweep errors are logged, never raised, so cleanup can't block an export. Also documents the env var in .env.example and adds a "Data lifecycle" section to the design note distinguishing the transient Databricks staging from the local file. Tests: +4 (old removed / recent kept, disabled at <=0, empty-dir prune, env parsing). Full suite 55 passed / 1 skipped; ruff check + format clean. Co-Authored-By: Claude Opus 4.8 --- noom-mcp-server/.env.example | 5 + .../customization/export_query_patch.py | 94 ++++++++++++++++++- noom-mcp-server/docs/export-tool-design.md | 13 +++ .../tests/test_export_query_patch.py | 53 +++++++++++ 4 files changed, 164 insertions(+), 1 deletion(-) diff --git a/noom-mcp-server/.env.example b/noom-mcp-server/.env.example index 0af92417..346a8689 100644 --- a/noom-mcp-server/.env.example +++ b/noom-mcp-server/.env.example @@ -23,3 +23,8 @@ DATABRICKS_WAREHOUSE_ID= # All writes are confined to this directory; paths that escape it are rejected. # Defaults to ~/databricks-mcp-exports if unset. # DATABRICKS_MCP_EXPORT_DIR=/path/to/exports + +# Export retention (optional) — exported files are deleted after this many days +# (swept on startup and before each export). Exports can contain prod PII, so +# they are not kept indefinitely. Default 7; set to 0 to disable the sweep. +# DATABRICKS_MCP_EXPORT_RETENTION_DAYS=7 diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index 757afbde..56f92256 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -73,6 +73,12 @@ # ``timeout`` argument, bounded by the ceiling above. _DEFAULT_STATEMENT_TIMEOUT = 300 +# Default retention window (days) for files in the export directory. Exported +# files can contain prod PII, so they are not kept indefinitely: a sweep removes +# files older than this on startup and before each export. Override with +# DATABRICKS_MCP_EXPORT_RETENTION_DAYS; set it to 0 (or negative) to disable. +_DEFAULT_RETENTION_DAYS = 7 + _SUPPORTED_FORMATS = ("csv",) @@ -93,6 +99,84 @@ def get_export_base_dir() -> Path: return base +# --------------------------------------------------------------------------- +# Retention +# --------------------------------------------------------------------------- + + +def get_export_retention_days() -> int: + """Return the export-file retention window in days (default 7; 0 disables). + + Read from ``DATABRICKS_MCP_EXPORT_RETENTION_DAYS``. Invalid values fall back + to the default with a warning. + """ + raw = os.environ.get("DATABRICKS_MCP_EXPORT_RETENTION_DAYS") + if raw is None or not raw.strip(): + return _DEFAULT_RETENTION_DAYS + try: + return int(raw) + except ValueError: + logger.warning( + "Invalid DATABRICKS_MCP_EXPORT_RETENTION_DAYS=%r; using default %d.", + raw, + _DEFAULT_RETENTION_DAYS, + ) + return _DEFAULT_RETENTION_DAYS + + +def sweep_old_exports(base_dir: Optional[Path] = None, retention_days: Optional[int] = None) -> int: + """Delete files in the export dir older than the retention window. + + Exported CSVs can contain prod PII, so they are not retained indefinitely. + A retention of <= 0 disables the sweep. Empty subdirectories left behind are + pruned. Errors removing an individual path are logged, not raised — a sweep + failure must never block an export. + + Args: + base_dir: Override for the export base dir (tests). + retention_days: Override for the retention window (tests). + + Returns: + The number of files removed. + """ + days = get_export_retention_days() if retention_days is None else retention_days + if days <= 0: + return 0 + + base = (base_dir or get_export_base_dir()).resolve() + cutoff = time.time() - days * 86400 + removed = 0 + + for path in base.rglob("*"): + try: + if path.is_file() and path.stat().st_mtime < cutoff: + path.unlink() + removed += 1 + except OSError as exc: + logger.warning("export retention sweep: could not remove %s: %s", path, exc) + + # Prune now-empty subdirectories, deepest first (never the base itself). + for path in sorted( + (p for p in base.rglob("*") if p.is_dir()), + key=lambda p: len(p.parts), + reverse=True, + ): + try: + if not any(path.iterdir()): + path.rmdir() + except OSError: + pass + + if removed: + logger.info( + "export retention sweep: removed %d file(s) older than %d day(s) from %s", + removed, + days, + base, + ) + return removed + + def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Path: """Resolve ``output_path`` against the export base dir, rejecting escapes. @@ -224,6 +308,9 @@ def _run_export( if fmt not in _SUPPORTED_FORMATS: raise ValueError(f"Unsupported format {fmt!r}. Supported: {', '.join(_SUPPORTED_FORMATS)}.") + # Clear out files past the retention window before writing a new one. + sweep_old_exports() + dest = resolve_export_path(output_path) if dest.exists() and not overwrite: raise ValueError(f"{dest} already exists. Pass overwrite=true to replace it.") @@ -348,8 +435,13 @@ def export_query_to_file( timeout=timeout, ) + # Sweep stale exports left from previous runs at startup, so retention holds + # even for an engineer who exports rarely. + sweep_old_exports() + logger.info( - "Registered export_query_to_file tool (ceiling=%ss, export dir=%s)", + "Registered export_query_to_file tool (ceiling=%ss, export dir=%s, retention=%d days)", EXPORT_TOOL_TIMEOUT_CEILING, get_export_base_dir(), + get_export_retention_days(), ) diff --git a/noom-mcp-server/docs/export-tool-design.md b/noom-mcp-server/docs/export-tool-design.md index 94676546..8bcdc6ae 100644 --- a/noom-mcp-server/docs/export-tool-design.md +++ b/noom-mcp-server/docs/export-tool-design.md @@ -75,6 +75,19 @@ into context. (default `~/databricks-mcp-exports`); `..` traversal and absolute paths that escape the base are rejected. +### Data lifecycle +There are two distinct "CSV" artifacts; only one is ours to manage. + +| Artifact | Location | Lifecycle | Owner | +|---|---|---|---| +| Cloud-fetch staging | Databricks-managed cloud storage | Transient — presigned links expire in ~15 min (measured); the staged result is held only briefly under Databricks' statement-result retention, then purged | **Databricks** — we don't choose the path, create a table/volume, or extend it | +| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Deleted after `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` (default 7); swept on startup and before each export | **This tool** | + +We persist nothing durable or self-owned in Databricks. The only artifact we +own is the local file, and because exports can contain prod PII it is **not** +retained indefinitely: `sweep_old_exports()` removes files older than the +retention window (set the env var to `0` to disable). + ### Governing principle The MCP layer is a **control plane, not a data plane.** It orchestrates and passes *handles* (a table name, or a file path); bulk bytes move diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 0d447357..996de46f 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -136,6 +136,59 @@ def test_write_csv_empty_result_writes_header_only(tmp_path): assert dest.read_text() == "id,name\n" +# --------------------------------------------------------------------------- +# Retention sweep +# --------------------------------------------------------------------------- + + +def _make_aged_file(path, age_days): + import os + import time + + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("x") + mtime = time.time() - age_days * 86400 + os.utime(path, (mtime, mtime)) + + +def test_sweep_removes_old_keeps_recent(tmp_path): + old = tmp_path / "old.csv" + recent = tmp_path / "recent.csv" + _make_aged_file(old, age_days=10) + _make_aged_file(recent, age_days=1) + + removed = ep.sweep_old_exports(base_dir=tmp_path, retention_days=7) + + assert removed == 1 + assert not old.exists() + assert recent.exists() + + +def test_sweep_disabled_when_retention_not_positive(tmp_path): + old = tmp_path / "old.csv" + _make_aged_file(old, age_days=999) + assert ep.sweep_old_exports(base_dir=tmp_path, retention_days=0) == 0 + assert old.exists() + + +def test_sweep_prunes_empty_subdirs(tmp_path): + nested = tmp_path / "sub" / "deep.csv" + _make_aged_file(nested, age_days=30) + ep.sweep_old_exports(base_dir=tmp_path, retention_days=7) + assert not nested.exists() + assert not (tmp_path / "sub").exists() # emptied dir pruned + assert tmp_path.exists() # base dir never removed + + +def test_get_retention_days_default_and_override(monkeypatch): + monkeypatch.delenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", raising=False) + assert ep.get_export_retention_days() == ep._DEFAULT_RETENTION_DAYS + monkeypatch.setenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", "3") + assert ep.get_export_retention_days() == 3 + monkeypatch.setenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", "garbage") + assert ep.get_export_retention_days() == ep._DEFAULT_RETENTION_DAYS # falls back + + def test_download_link_applies_http_headers(monkeypatch): captured = {} From 266b025748de811266ae19993a7627e36d592ab2 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Tue, 2 Jun 2026 17:36:18 -0400 Subject: [PATCH 4/9] fix(noom-mcp-server): write exports atomically (temp + os.replace) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bugbot (PR #19): _write_csv opened output_path with "wb", truncating it immediately, then streamed chunk downloads. A later chunk failure, an expired presigned link, or the tool's wall-clock ceiling firing mid-stream would leave a partial/empty CSV at the real path — and with overwrite=true, destroy the prior good export. A half-written CSV that looks complete is exactly the silent corruption this tool exists to prevent. Now streams to a temp file in the destination directory and os.replace()s it into place only after the last chunk is written (atomic on the same filesystem). On any BaseException (including timeout/cancellation) the temp file is removed and dest is left untouched. Tests: +2 (mid-stream failure preserves prior file and leaves no .part; failure creates no destination when none existed). Full suite 57 passed / 1 skipped; ruff check + format clean. Co-Authored-By: Claude Opus 4.8 --- .../customization/export_query_patch.py | 31 +++++++++++---- .../tests/test_export_query_patch.py | 38 +++++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index 56f92256..d4593f9b 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -57,6 +57,7 @@ import csv import logging import os +import tempfile import time import urllib.request from pathlib import Path @@ -251,7 +252,7 @@ def _download_link(link) -> bytes: def _write_csv(dest: Path, columns: List[str], links: List[Any]) -> None: - """Stream each CSV chunk's bytes to ``dest`` in row order. + """Stream each CSV chunk's bytes to ``dest`` in row order, atomically. Databricks EXTERNAL_LINKS + CSV chunks already include the column header (in the first chunk), so the chunk bytes are written as-is — synthesizing a @@ -261,14 +262,30 @@ def _write_csv(dest: Path, columns: List[str], links: List[Any]) -> None: so there are no bytes to stream. In that case a header-only CSV is written (derived from the manifest schema) so the file is still valid and carries the column names for downstream tools. + + Atomicity: the data is streamed to a temp file in the destination directory + and only ``os.replace``-d into place after the *last* chunk is written. If a + chunk download fails (or the tool's wall-clock ceiling fires mid-stream), the + temp file is removed and ``dest`` is left untouched — never a partial or + empty file at the real path. A half-written CSV that looks complete is + exactly the silent corruption this tool exists to prevent. """ dest.parent.mkdir(parents=True, exist_ok=True) - with open(dest, "wb") as fh: - if not links: - fh.write(_csv_header_line(columns).encode("utf-8")) - return - for link in links: - fh.write(_download_link(link)) + fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part") + tmp = Path(tmp_name) + try: + with os.fdopen(fd, "wb") as fh: + if not links: + fh.write(_csv_header_line(columns).encode("utf-8")) + else: + for link in links: + fh.write(_download_link(link)) + os.replace(tmp, dest) # atomic on the same filesystem + except BaseException: + # Includes timeouts/cancellation: never leave a partial file behind, + # and never clobber a prior good export at `dest`. + tmp.unlink(missing_ok=True) + raise def _csv_header_line(columns: List[str]) -> str: diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 996de46f..26520637 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -136,6 +136,44 @@ def test_write_csv_empty_result_writes_header_only(tmp_path): assert dest.read_text() == "id,name\n" +def test_write_csv_failure_preserves_prior_file_and_leaves_no_partial(tmp_path, monkeypatch): + # A mid-stream chunk failure must not clobber a prior good export, and must + # leave no partial/temp file behind. (Atomic write: temp + os.replace.) + dest = tmp_path / "out.csv" + dest.write_text("PRIOR GOOD DATA\n") + + calls = {"n": 0} + + def flaky(link): + calls["n"] += 1 + if calls["n"] == 2: + raise RuntimeError("chunk download failed") + return link._payload + + monkeypatch.setattr(ep, "_download_link", flaky) + links = [_FakeLink(0, b"id,name\n1,a\n"), _FakeLink(1, b"2,b\n")] + + with pytest.raises(RuntimeError, match="chunk download failed"): + ep._write_csv(dest, ["id", "name"], links) + + assert dest.read_text() == "PRIOR GOOD DATA\n" # untouched + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] # no .part leftover + + +def test_write_csv_failure_creates_no_destination(tmp_path, monkeypatch): + # If dest didn't exist and the write fails, no file should appear at dest. + dest = tmp_path / "new.csv" + monkeypatch.setattr( + ep, "_download_link", lambda link: (_ for _ in ()).throw(RuntimeError("boom")) + ) + + with pytest.raises(RuntimeError, match="boom"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"x")]) + + assert not dest.exists() + assert list(tmp_path.iterdir()) == [] # temp cleaned up + + # --------------------------------------------------------------------------- # Retention sweep # --------------------------------------------------------------------------- From b488351585beb0cf100a02edcec2c98314213c44 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Tue, 2 Jun 2026 17:43:29 -0400 Subject: [PATCH 5/9] fix(noom-mcp-server): clamp export timeout to ceiling; fetch chunk links lazily MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugbot findings (PR #19): 1. Statement timeout exceeds tool ceiling. _run_export polled on the raw caller timeout, so a value above the 600s FastMCP ceiling let the tool call be aborted before our cancellation fired — leaving the statement running on the warehouse (orphaned query). Added _clamp_statement_timeout(): the per-call timeout is now bounded by EXPORT_TOOL_TIMEOUT_CEILING so the poll loop cancels within the ceiling. (Same orphaned-query concern as #18.) 2. Presigned URLs expire during download. We collected every external link up front, then downloaded — so on a long multi-chunk export the links issued first could expire (~15 min) before the later downloads. Replaced _collect_external_links with _iter_external_links, a lazy generator that the writer drives: each chunk's link is fetched and downloaded just-in-time, so the URL is freshly issued when used. The at-risk window is now one chunk, not the whole export. (The prior atomic-write fix already prevents a truncated CSV from being reported as success — a failed download raises and leaves dest untouched; this makes long exports actually complete rather than fail.) _write_csv now consumes an iterable lazily (downloads as it iterates) and writes a header-only file when the iterable is empty. Tests: +clamp + lazy-iteration (asserts the next chunk isn't fetched until the prior link is consumed). Full suite 59 passed / 1 skipped; ruff clean; re-validated live (synthetic, 122-col real table, clamp). Co-Authored-By: Claude Opus 4.8 --- .../customization/export_query_patch.py | 79 ++++++++++++++----- .../tests/test_export_query_patch.py | 51 +++++++++++- 2 files changed, 108 insertions(+), 22 deletions(-) diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index d4593f9b..5cead987 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -61,7 +61,7 @@ import time import urllib.request from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Iterable, Iterator, List, Optional logger = logging.getLogger(__name__) @@ -221,21 +221,32 @@ def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Pa # --------------------------------------------------------------------------- -def _collect_external_links(client, statement_id: str, first_result) -> List[Any]: - """Walk the chunk chain, returning all external links in row order.""" - links: List[Any] = [] +def _iter_external_links(client, statement_id: str, first_result) -> Iterator[Any]: + """Yield external links in row order, following the chunk chain lazily. + + Laziness is deliberate. The presigned ``external_link`` URLs are short-lived + (Databricks issues them per response and they expire — observed ~15 min). If + we collected every link up front and only then downloaded, a long multi-chunk + export could outlive the links issued first. By yielding each link as its + chunk is fetched — and having the caller download it immediately — every URL + is freshly issued when used, so the at-risk window is one chunk, not the + whole export. + + Within a single response, links are ordered by ``chunk_index`` (the chunk + chain itself is ordered via ``next_chunk_index``). + """ result = first_result while result is not None and result.external_links: - links.extend(result.external_links) + for link in sorted( + result.external_links, key=lambda link: (link.chunk_index or 0, link.row_offset or 0) + ): + yield link nxt = result.next_chunk_index if nxt is None: break result = client.statement_execution.get_statement_result_chunk_n( statement_id=statement_id, chunk_index=nxt ) - # Order defensively by chunk_index (fallback row_offset) so rows are in order. - links.sort(key=lambda link: (link.chunk_index or 0, link.row_offset or 0)) - return links def _download_link(link) -> bytes: @@ -251,17 +262,21 @@ def _download_link(link) -> bytes: return resp.read() -def _write_csv(dest: Path, columns: List[str], links: List[Any]) -> None: +def _write_csv(dest: Path, columns: List[str], links: Iterable[Any]) -> None: """Stream each CSV chunk's bytes to ``dest`` in row order, atomically. Databricks EXTERNAL_LINKS + CSV chunks already include the column header (in the first chunk), so the chunk bytes are written as-is — synthesizing a header would duplicate it. - The one exception is an empty result: Databricks returns no external links, - so there are no bytes to stream. In that case a header-only CSV is written - (derived from the manifest schema) so the file is still valid and carries - the column names for downstream tools. + ``links`` is an iterable, consumed lazily: each link is downloaded as it is + produced, so when it is the lazy chunk-chain generator + (``_iter_external_links``) the presigned URL is freshly issued at download + time rather than minutes earlier. + + The one exception is an empty result: the iterable yields nothing, so a + header-only CSV is written (derived from the manifest schema) so the file is + still valid and carries the column names for downstream tools. Atomicity: the data is streamed to a temp file in the destination directory and only ``os.replace``-d into place after the *last* chunk is written. If a @@ -274,12 +289,13 @@ def _write_csv(dest: Path, columns: List[str], links: List[Any]) -> None: fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part") tmp = Path(tmp_name) try: + wrote_any = False with os.fdopen(fd, "wb") as fh: - if not links: + for link in links: + fh.write(_download_link(link)) + wrote_any = True + if not wrote_any: fh.write(_csv_header_line(columns).encode("utf-8")) - else: - for link in links: - fh.write(_download_link(link)) os.replace(tmp, dest) # atomic on the same filesystem except BaseException: # Includes timeouts/cancellation: never leave a partial file behind, @@ -301,6 +317,24 @@ def _csv_header_line(columns: List[str]) -> str: # --------------------------------------------------------------------------- +def _clamp_statement_timeout(timeout: int) -> int: + """Bound the per-call statement timeout by the tool's wall-clock ceiling. + + A caller-supplied timeout above ``EXPORT_TOOL_TIMEOUT_CEILING`` would let + FastMCP abort the tool call before the poll loop's own cancellation fires, + leaving the statement running on the warehouse. Clamping keeps our cancel + within the ceiling. Logs when a value is reduced. + """ + if timeout > EXPORT_TOOL_TIMEOUT_CEILING: + logger.warning( + "export timeout %ss exceeds the %ss ceiling; clamping.", + timeout, + EXPORT_TOOL_TIMEOUT_CEILING, + ) + return EXPORT_TOOL_TIMEOUT_CEILING + return timeout + + def _run_export( sql_query: str, output_path: str, @@ -325,6 +359,12 @@ def _run_export( if fmt not in _SUPPORTED_FORMATS: raise ValueError(f"Unsupported format {fmt!r}. Supported: {', '.join(_SUPPORTED_FORMATS)}.") + # Clamp the per-call timeout to the tool's wall-clock ceiling (see + # _clamp_statement_timeout): a value above the ceiling would let FastMCP + # abort the tool call before our own cancellation fires, orphaning the + # statement on the warehouse. The poll loop below cancels at this bound. + timeout = _clamp_statement_timeout(timeout) + # Clear out files past the retention window before writing a new one. sweep_old_exports() @@ -385,7 +425,10 @@ def _run_export( ) row_count = (manifest.total_row_count if manifest else None) or 0 - links = _collect_external_links(client, statement_id, status.result) + # Lazy generator: each chunk's presigned link is fetched and downloaded + # just-in-time (see _iter_external_links / _write_csv), so links don't age + # out during a long multi-chunk download. + links = _iter_external_links(client, statement_id, status.result) _write_csv(dest, columns, links) return { diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 26520637..4a2b789b 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -90,25 +90,56 @@ def get_statement_result_chunk_n(_self, statement_id, chunk_index): self.statement_execution = _SE() -def test_collect_external_links_follows_chunk_chain(): +def test_iter_external_links_follows_chunk_chain(): link0 = _FakeLink(0, b"a\n", next_chunk_index=1) link1 = _FakeLink(1, b"b\n", next_chunk_index=None) first = _FakeResult([link0], next_chunk_index=1) second = _FakeResult([link1], next_chunk_index=None) client = _FakeClient({1: second}) - links = ep._collect_external_links(client, "stmt-1", first) + links = list(ep._iter_external_links(client, "stmt-1", first)) assert [link.chunk_index for link in links] == [0, 1] -def test_collect_external_links_orders_out_of_order_chunks(): +def test_iter_external_links_orders_within_result(): link_b = _FakeLink(1, b"b\n", row_offset=10) link_a = _FakeLink(0, b"a\n", row_offset=0) first = _FakeResult([link_b, link_a], next_chunk_index=None) - links = ep._collect_external_links(_FakeClient({}), "stmt-1", first) + links = list(ep._iter_external_links(_FakeClient({}), "stmt-1", first)) assert [link.chunk_index for link in links] == [0, 1] +def test_iter_external_links_is_lazy(): + # The next chunk must not be fetched until the previous one is consumed — + # this is what keeps presigned links fresh during a long download. + import types + + link0 = _FakeLink(0, b"a\n", next_chunk_index=1) + link1 = _FakeLink(1, b"b\n", next_chunk_index=None) + first = _FakeResult([link0], next_chunk_index=1) + + fetched = [] + + class _TrackingClient: + def __init__(self): + outer = self + + class _SE: + def get_statement_result_chunk_n(_self, statement_id, chunk_index): + fetched.append(chunk_index) + return _FakeResult([link1], next_chunk_index=None) + + self.statement_execution = _SE() + _ = outer + + gen = ep._iter_external_links(_TrackingClient(), "stmt-1", first) + assert isinstance(gen, types.GeneratorType) + assert next(gen).chunk_index == 0 + assert fetched == [] # chunk 1 NOT fetched until we ask for the next link + assert next(gen).chunk_index == 1 + assert fetched == [1] + + def test_write_csv_streams_chunk_bytes_verbatim(tmp_path, monkeypatch): # Databricks CSV chunks already carry the header in the first chunk; the # writer must stream bytes as-is (no synthesized header) to avoid a dupe. @@ -174,6 +205,18 @@ def test_write_csv_failure_creates_no_destination(tmp_path, monkeypatch): assert list(tmp_path.iterdir()) == [] # temp cleaned up +# --------------------------------------------------------------------------- +# Timeout clamp +# --------------------------------------------------------------------------- + + +def test_clamp_statement_timeout(): + ceiling = ep.EXPORT_TOOL_TIMEOUT_CEILING + assert ep._clamp_statement_timeout(ceiling + 5000) == ceiling # over → clamped + assert ep._clamp_statement_timeout(120) == 120 # under → unchanged + assert ep._clamp_statement_timeout(ceiling) == ceiling # exactly → unchanged + + # --------------------------------------------------------------------------- # Retention sweep # --------------------------------------------------------------------------- From d26291bf464aa30dd465c00934eee257b2bc56a5 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Wed, 3 Jun 2026 09:54:07 -0400 Subject: [PATCH 6/9] fix(noom-mcp-server): atomic overwrite guard; sandbox-harden retention sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugbot findings (PR #19): 1. Overwrite checked before query (TOCTOU). The existence check runs at the top of _run_export, before the (possibly long) query, but _write_csv ended with an unconditional os.replace — so a file appearing during the query was clobbered despite overwrite=false. The early check stays as a fast-fail; the authoritative guard is now atomic at the move step: overwrite=true uses os.replace (clobber), overwrite=false uses os.link (fails if dest exists). 2. Retention sweep sandbox escape. sweep_old_exports walked with base.rglob("*"). On Python 3.13 (recurse_symlinks=False) this does NOT actually reach files behind a directory symlink — verified empirically — so it was not exploitable on our interpreter. But a delete path shouldn't depend on a version-specific glob default, and the prune pass followed dir symlinks via is_dir(). Rewrote with os.walk(followlinks=False) + explicit symlink skip + a containment check (resolved path must stay under base). Now sandbox-safe regardless of version. Tests: +4 (no-clobber when overwrite=false / clobber when true; dir-symlink not descended; file-symlink target untouched). Full suite 63 passed / 1 skipped; ruff clean; re-validated live (export + overwrite=false refusal). Co-Authored-By: Claude Opus 4.8 --- .../customization/export_query_patch.py | 74 +++++++++++++------ .../tests/test_export_query_patch.py | 56 ++++++++++++++ 2 files changed, 108 insertions(+), 22 deletions(-) diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index 5cead987..27696512 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -148,20 +148,31 @@ def sweep_old_exports(base_dir: Optional[Path] = None, retention_days: Optional[ cutoff = time.time() - days * 86400 removed = 0 - for path in base.rglob("*"): - try: - if path.is_file() and path.stat().st_mtime < cutoff: - path.unlink() - removed += 1 - except OSError as exc: - logger.warning("export retention sweep: could not remove %s: %s", path, exc) - - # Prune now-empty subdirectories, deepest first (never the base itself). - for path in sorted( - (p for p in base.rglob("*") if p.is_dir()), - key=lambda p: len(p.parts), - reverse=True, - ): + # os.walk(followlinks=False) never descends into symlinked directories, so + # the walk cannot reach files outside the export dir. We additionally skip + # symlinks outright and confirm each file's real path stays under base — a + # deletion path must not depend on a glob/walk default to stay sandboxed. + for root, _dirs, files in os.walk(base, followlinks=False): + root_path = Path(root) + for name in files: + path = root_path / name + try: + if path.is_symlink(): + continue + if base not in path.resolve().parents: + continue + if path.stat().st_mtime < cutoff: + path.unlink() + removed += 1 + except OSError as exc: + logger.warning("export retention sweep: could not remove %s: %s", path, exc) + + # Prune now-empty real subdirectories, deepest first (never base, never a + # symlinked directory). + for root, _dirs, _files in os.walk(base, topdown=False, followlinks=False): + path = Path(root) + if path == base or path.is_symlink(): + continue try: if not any(path.iterdir()): path.rmdir() @@ -262,7 +273,9 @@ def _download_link(link) -> bytes: return resp.read() -def _write_csv(dest: Path, columns: List[str], links: Iterable[Any]) -> None: +def _write_csv( + dest: Path, columns: List[str], links: Iterable[Any], overwrite: bool = False +) -> None: """Stream each CSV chunk's bytes to ``dest`` in row order, atomically. Databricks EXTERNAL_LINKS + CSV chunks already include the column header @@ -279,11 +292,17 @@ def _write_csv(dest: Path, columns: List[str], links: Iterable[Any]) -> None: still valid and carries the column names for downstream tools. Atomicity: the data is streamed to a temp file in the destination directory - and only ``os.replace``-d into place after the *last* chunk is written. If a - chunk download fails (or the tool's wall-clock ceiling fires mid-stream), the - temp file is removed and ``dest`` is left untouched — never a partial or - empty file at the real path. A half-written CSV that looks complete is - exactly the silent corruption this tool exists to prevent. + and moved into place only after the *last* chunk is written. If a chunk + download fails (or the tool's wall-clock ceiling fires mid-stream), the temp + file is removed and ``dest`` is left untouched — never a partial or empty + file at the real path. A half-written CSV that looks complete is exactly the + silent corruption this tool exists to prevent. + + Overwrite guard: the existence check in the caller runs before a long query, + so it can't be the authoritative guard (the file may appear while the query + is in flight). When ``overwrite`` is False the move uses ``os.link``, which + fails atomically if ``dest`` already exists — closing that TOCTOU window. + When True it uses ``os.replace`` (atomic clobber). """ dest.parent.mkdir(parents=True, exist_ok=True) fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part") @@ -296,7 +315,18 @@ def _write_csv(dest: Path, columns: List[str], links: Iterable[Any]) -> None: wrote_any = True if not wrote_any: fh.write(_csv_header_line(columns).encode("utf-8")) - os.replace(tmp, dest) # atomic on the same filesystem + if overwrite: + os.replace(tmp, dest) # atomic clobber, same filesystem + else: + # Atomic no-clobber: fails if dest exists, even if it appeared during + # the query (after the caller's early existence check). + try: + os.link(tmp, dest) + except FileExistsError as exc: + raise ValueError( + f"{dest} already exists. Pass overwrite=true to replace it." + ) from exc + tmp.unlink(missing_ok=True) # for the os.link path; no-op after os.replace except BaseException: # Includes timeouts/cancellation: never leave a partial file behind, # and never clobber a prior good export at `dest`. @@ -429,7 +459,7 @@ def _run_export( # just-in-time (see _iter_external_links / _write_csv), so links don't age # out during a long multi-chunk download. links = _iter_external_links(client, statement_id, status.result) - _write_csv(dest, columns, links) + _write_csv(dest, columns, links, overwrite=overwrite) return { "path": str(dest), diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 4a2b789b..57c59095 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -205,6 +205,31 @@ def test_write_csv_failure_creates_no_destination(tmp_path, monkeypatch): assert list(tmp_path.iterdir()) == [] # temp cleaned up +def test_write_csv_refuses_to_clobber_when_overwrite_false(tmp_path, monkeypatch): + # Even if dest appears after the caller's early check (simulated here by it + # already existing), the atomic move must not clobber it when overwrite=False. + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + dest.write_text("ORIGINAL\n") + + with pytest.raises(ValueError, match="already exists"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"id\n1\n")], overwrite=False) + + assert dest.read_text() == "ORIGINAL\n" # untouched + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] # no .part leftover + + +def test_write_csv_overwrites_when_true(tmp_path, monkeypatch): + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + dest = tmp_path / "out.csv" + dest.write_text("OLD\n") + + ep._write_csv(dest, ["id", "name"], [_FakeLink(0, b"id,name\n1,a\n")], overwrite=True) + + assert dest.read_text() == "id,name\n1,a\n" + assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] + + # --------------------------------------------------------------------------- # Timeout clamp # --------------------------------------------------------------------------- @@ -252,6 +277,37 @@ def test_sweep_disabled_when_retention_not_positive(tmp_path): assert old.exists() +def test_sweep_does_not_descend_dir_symlink(tmp_path): + # A directory symlink inside the export dir pointing OUTSIDE must never be + # followed — files behind it must survive the sweep. + base = tmp_path / "base" + base.mkdir() + external = tmp_path / "external" + external.mkdir() + victim = external / "victim.csv" + _make_aged_file(victim, age_days=999) + (base / "link").symlink_to(external, target_is_directory=True) + + ep.sweep_old_exports(base_dir=base, retention_days=7) + + assert victim.exists() # external file must NOT be deleted + + +def test_sweep_skips_file_symlink_to_external(tmp_path): + # A file symlink to an external old file must not cause the target's deletion. + base = tmp_path / "base" + base.mkdir() + external = tmp_path / "ext" + external.mkdir() + target = external / "target.csv" + _make_aged_file(target, age_days=999) + (base / "alias.csv").symlink_to(target) + + ep.sweep_old_exports(base_dir=base, retention_days=7) + + assert target.exists() # symlink target untouched + + def test_sweep_prunes_empty_subdirs(tmp_path): nested = tmp_path / "sub" / "deep.csv" _make_aged_file(nested, age_days=30) From 742dcbb13a732e1fd9af6a2367c92f6deebc3958 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Wed, 3 Jun 2026 10:03:37 -0400 Subject: [PATCH 7/9] fix(noom-mcp-server): re-check sandbox containment at write time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bugbot (PR #19): resolve_export_path validates containment before the (long) query, but _write_csv never re-checked — so if dest.parent became a symlink pointing outside DATABRICKS_MCP_EXPORT_DIR during the query, mkstemp/os.link/ os.replace would follow it and write outside the sandbox while the manifest still reported the intended path. (Low real-world severity — local single-user server, so the actor needs fs write access as the user — but it's the same class we just hardened on the delete path, so worth closing for consistency.) _write_csv now takes base_dir and, immediately before writing, re-resolves dest.parent and re-checks containment, refusing to write if it resolves outside base. Containment-based (a symlink resolving back inside base is still allowed), not a blanket symlink ban. _run_export captures the base once and passes it to both resolve_export_path and _write_csv. Not a full TOCTOU-free solution (openat + O_NOFOLLOW per component) — that's over-engineering for a local dev tool; this narrows the window to near-zero and catches symlink-to-outside. Tests: +2 (parent symlink outside base refused / inside base allowed). Full suite 65 passed / 1 skipped; ruff clean; re-validated live. Co-Authored-By: Claude Opus 4.8 --- .../customization/export_query_patch.py | 27 ++++++++++++-- .../tests/test_export_query_patch.py | 35 +++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index 27696512..20aafff8 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -274,7 +274,11 @@ def _download_link(link) -> bytes: def _write_csv( - dest: Path, columns: List[str], links: Iterable[Any], overwrite: bool = False + dest: Path, + columns: List[str], + links: Iterable[Any], + overwrite: bool = False, + base_dir: Optional[Path] = None, ) -> None: """Stream each CSV chunk's bytes to ``dest`` in row order, atomically. @@ -303,7 +307,23 @@ def _write_csv( is in flight). When ``overwrite`` is False the move uses ``os.link``, which fails atomically if ``dest`` already exists — closing that TOCTOU window. When True it uses ``os.replace`` (atomic clobber). + + Sandbox re-check: ``resolve_export_path`` validated containment before the + query, but ``dest.parent`` could have become a symlink pointing outside the + sandbox in the meantime. When ``base_dir`` is given, the destination's parent + is re-resolved and re-checked for containment immediately before writing, so + the write can't follow such a symlink out of the export dir. """ + if base_dir is not None: + base = base_dir.resolve() + real_parent = dest.parent.resolve() + if real_parent != base and base not in real_parent.parents: + raise ValueError( + f"refusing to write {dest}: its directory resolves outside the export " + f"sandbox ({base}) — a symlink may have been introduced after the path " + f"was validated." + ) + dest.parent.mkdir(parents=True, exist_ok=True) fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=f".{dest.name}.", suffix=".part") tmp = Path(tmp_name) @@ -398,7 +418,8 @@ def _run_export( # Clear out files past the retention window before writing a new one. sweep_old_exports() - dest = resolve_export_path(output_path) + base = get_export_base_dir() + dest = resolve_export_path(output_path, base) if dest.exists() and not overwrite: raise ValueError(f"{dest} already exists. Pass overwrite=true to replace it.") @@ -459,7 +480,7 @@ def _run_export( # just-in-time (see _iter_external_links / _write_csv), so links don't age # out during a long multi-chunk download. links = _iter_external_links(client, statement_id, status.result) - _write_csv(dest, columns, links, overwrite=overwrite) + _write_csv(dest, columns, links, overwrite=overwrite, base_dir=base) return { "path": str(dest), diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 57c59095..0d3c658b 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -230,6 +230,41 @@ def test_write_csv_overwrites_when_true(tmp_path, monkeypatch): assert [p.name for p in tmp_path.iterdir()] == ["out.csv"] +def test_write_csv_refuses_parent_symlink_outside_sandbox(tmp_path, monkeypatch): + # Simulates a symlink introduced under the base after resolve_export_path ran: + # base/sub -> external. The re-check must refuse to write and create nothing + # outside the sandbox. + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + base = tmp_path / "base" + base.mkdir() + external = tmp_path / "external" + external.mkdir() + (base / "sub").symlink_to(external, target_is_directory=True) + dest = base / "sub" / "out.csv" # resolves to external/out.csv + + with pytest.raises(ValueError, match="outside the export sandbox"): + ep._write_csv(dest, ["id"], [_FakeLink(0, b"id\n1\n")], overwrite=True, base_dir=base) + + assert list(external.iterdir()) == [] # nothing written outside + + +def test_write_csv_allows_parent_symlink_inside_sandbox(tmp_path, monkeypatch): + # A symlink that still resolves *inside* the base is fine (containment, not a + # blanket symlink ban). + monkeypatch.setattr(ep, "_download_link", lambda link: link._payload) + base = tmp_path / "base" + base.mkdir() + (base / "real").mkdir() + (base / "alias").symlink_to(base / "real", target_is_directory=True) + dest = base / "alias" / "out.csv" # resolves to base/real/out.csv (inside) + + ep._write_csv( + dest, ["id", "name"], [_FakeLink(0, b"id,name\n1,a\n")], overwrite=True, base_dir=base + ) + + assert (base / "real" / "out.csv").read_text() == "id,name\n1,a\n" + + # --------------------------------------------------------------------------- # Timeout clamp # --------------------------------------------------------------------------- From 4be1cc1913e93a584cfff0fe49083c50d8ecfd89 Mon Sep 17 00:00:00 2001 From: michael ruder Date: Wed, 3 Jun 2026 10:21:31 -0400 Subject: [PATCH 8/9] docs(noom-mcp-server): bring export-tool design note current Updates the design note to the shipped behavior after the bugbot review round: atomic write, no-clobber overwrite guard, lazy just-in-time link fetching, timeout clamp, write-time sandbox re-check, and the retention/PII lifecycle. Adds a correctness/safety property table, corrects the presigned-link "~15 min" wording to an observation (not a documented SLA), and updates file/test counts (6 files, 65 tests). Includes a changelog of what changed since the prior draft. Co-Authored-By: Claude Opus 4.8 --- noom-mcp-server/docs/export-tool-design.md | 109 ++++++++++++++++----- 1 file changed, 87 insertions(+), 22 deletions(-) diff --git a/noom-mcp-server/docs/export-tool-design.md b/noom-mcp-server/docs/export-tool-design.md index 8bcdc6ae..0c7e1e94 100644 --- a/noom-mcp-server/docs/export-tool-design.md +++ b/noom-mcp-server/docs/export-tool-design.md @@ -4,7 +4,7 @@ Returning large query results through the MCP blows up the model's context window, and nothing in the existing toolset avoids it. This adds a new customization-layer tool, `export_query_to_file`, which has the **server -process** write results straight to local disk and return only a small +process** atomically write results to local disk and return only a small manifest — so the row data never passes through the model's context at all. ## The core problem: how MCP tool results reach the model @@ -62,31 +62,62 @@ into context. results). Instead it issues the statement with `disposition=EXTERNAL_LINKS` and `format=CSV`. Databricks writes the result to cloud storage as CSV chunk files and returns presigned download URLs. -2. **Server-side streaming.** The server downloads each presigned chunk and - writes the bytes to the local file, following the `next_chunk_index` chain. - Memory-bounded (one chunk at a time), never truncates. The CSV header arrives - in the first chunk; an empty result yields a header-only file from the - manifest schema. -3. **What the model sees.** Only the manifest — not the rows. -4. **Governance preserved.** Reuses the existing chokepoints: Service-Principal - client, forced warehouse override, and `mcp_user:` query tagging, - plus a `mcp_tool:export_query_to_file` tag. -5. **Path safety.** Writes are confined to `DATABRICKS_MCP_EXPORT_DIR` - (default `~/databricks-mcp-exports`); `..` traversal and absolute paths that - escape the base are rejected. +2. **Lazy, just-in-time streaming.** A generator walks the `next_chunk_index` + chain, and each chunk's presigned link is downloaded *immediately* as it is + produced (not collected up front). This keeps each URL freshly issued when + used, so links don't age out mid-export, and it is memory-bounded (one chunk + at a time) and never truncates. The CSV header arrives in the first chunk; an + empty result yields a header-only file from the manifest schema. +3. **Atomic write.** Chunks stream to a temp file in the destination directory, + which is moved into place only after the *last* chunk is written. If any + download fails — or the wall-clock ceiling fires mid-stream — the temp file + is discarded and the destination is left untouched. There is never a partial + or empty file at the real path (a half-written CSV that looks complete is + exactly the silent corruption this tool exists to prevent). +4. **Overwrite guard.** The early existence check is only a fast-fail; the + authoritative guard is atomic at the move step. `overwrite=false` uses + `os.link` (fails if the destination exists, even if it appeared *during* the + query); `overwrite=true` uses `os.replace`. This closes the time-of-check / + time-of-use gap across a long-running query. +5. **Timeout bounded by the tool ceiling.** The per-call `timeout` is clamped to + the tool's 600s wall-clock ceiling, so the poll loop's own cancellation fires + before FastMCP aborts the call — otherwise an over-large timeout would orphan + the statement on the warehouse. +6. **What the model sees.** Only the manifest — not the rows. +7. **Governance preserved.** Reuses the existing chokepoints: Service-Principal + client (creds from the `dbrix_mcp_secret` scope), forced warehouse override, + and `mcp_user:` query tagging, plus a `mcp_tool:export_query_to_file` + tag. Exports appear in `system.query.history` like any governed query. +8. **Path safety.** Writes are confined to `DATABRICKS_MCP_EXPORT_DIR` (default + `~/databricks-mcp-exports`); `..` traversal and absolute escapes are rejected. + Containment is also **re-checked immediately before writing**, so a symlink + introduced under the base during the query can't redirect the write outside + the sandbox. + +### Correctness & safety properties (and where each is enforced) +| Property | Mechanism | +|---|---| +| No truncation of large results | `EXTERNAL_LINKS` + full `next_chunk_index` pagination (vs. upstream's first-chunk `INLINE`) | +| Memory-bounded | one chunk streamed at a time | +| No presigned-link expiry mid-export | lazy, just-in-time per-chunk download | +| No partial/empty file on failure | temp file + atomic move, discard-on-error | +| No silent clobber of an existing file | `os.link` no-clobber when `overwrite=false` | +| No orphaned warehouse statement | per-call timeout clamped to the 600s ceiling | +| No write outside the sandbox | up-front path validation + write-time containment re-check | +| No data through the model context | server writes the file; only the manifest is returned | ### Data lifecycle There are two distinct "CSV" artifacts; only one is ours to manage. | Artifact | Location | Lifecycle | Owner | |---|---|---|---| -| Cloud-fetch staging | Databricks-managed cloud storage | Transient — presigned links expire in ~15 min (measured); the staged result is held only briefly under Databricks' statement-result retention, then purged | **Databricks** — we don't choose the path, create a table/volume, or extend it | -| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Deleted after `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` (default 7); swept on startup and before each export | **This tool** | +| Cloud-fetch staging | Databricks-managed cloud storage | Transient. Each presigned link carries an explicit `expiration` (the authoritative value; ~15 min observed in testing, **not** a documented constant). The staged result is held only briefly under Databricks' statement-result retention, then purged. | **Databricks** — we don't choose the path, create a table/volume, or extend it | +| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Deleted after `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` (default 7; `0` disables); swept on startup and before each export. The sweep never follows symlinks or deletes outside the base. | **This tool** | We persist nothing durable or self-owned in Databricks. The only artifact we own is the local file, and because exports can contain prod PII it is **not** retained indefinitely: `sweep_old_exports()` removes files older than the -retention window (set the env var to `0` to disable). +retention window. ### Governing principle The MCP layer is a **control plane, not a data plane.** It orchestrates and @@ -100,16 +131,50 @@ pattern as the SQL-timeout PR (#18): | File | Change | |---|---| -| `customization/export_query_patch.py` | New — tool, chunk streaming, path sandbox | +| `customization/export_query_patch.py` | New — tool, lazy chunk streaming, atomic write, overwrite guard, timeout clamp, path sandbox + retention sweep | | `customization/tool_allowlist_patch.py` | Added `export_query_to_file` to `ALLOWED_TOOLS` | | `run.py` | New Step 2b: registers the tool before the allowlist | -| `tests/test_export_query_patch.py` | New — 14 unit tests | -| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` | +| `tests/test_export_query_patch.py` | New — unit tests (sandbox, header quoting, lazy chunk iteration, atomic write, overwrite guard, timeout clamp, retention, symlink hardening) | +| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` and `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` | +| `docs/export-tool-design.md` | This design note | -**Validation:** full unit suite passes, lint clean, and the path is -live-validated against the warehouse (synthetic deterministic result, a real -122-column table slice, and an empty result → header-only file). +**Validation:** 65 unit tests pass, ruff check + format clean. Validated three +ways: the full `run.py` startup lifecycle (tool registered, survives the +allowlist, governed SP path), live warehouse exports (synthetic deterministic, +a real 122-column slice, empty → header-only), and a byte-for-byte identical +result from the local code path and the deployed MCP server on the real +experiment query. ## Follow-ups - Optional Parquet output (`Format.ARROW_STREAM` + pyarrow) if a consumer prefers it over CSV. +- Engineers must restart their MCP client to pick up the tool after it ships. + +--- + +### Changelog — what changed since the previous version of this note +The earlier draft described the v1 design. Since then, code review (Cursor +Bugbot) surfaced several correctness and safety issues that have been fixed; the +note now reflects the shipped behavior: + +- **Atomic write (was: direct write).** The previous note said the server + "writes the bytes straight to the local file." That left a partial/empty file + at the destination if a download failed mid-stream. Now writes go to a temp + file and are atomically moved into place only on full success. +- **Overwrite guard hardened.** The existence check ran once before the query; a + file appearing during the query could still be clobbered. The move now uses + `os.link` (no-clobber) when `overwrite=false`. +- **Lazy link fetching (was: eager collection).** Links are now fetched + just-in-time per chunk so presigned URLs don't expire during long exports. +- **Timeout clamped to the 600s ceiling**, preventing orphaned warehouse + statements when a caller passes an over-large timeout. +- **Write-time sandbox re-check** added, so a symlink introduced during the + query can't redirect the write outside the export dir. +- **Retention / PII lifecycle added.** New `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` + (default 7); files are swept on startup and before each export. The sweep is + symlink-hardened (never follows symlinks or deletes outside the base). +- **Lifecycle wording corrected.** The "~15 min" presigned-link figure is an + *observation*, not a documented Databricks SLA — the per-response `expiration` + field is authoritative. +- **Counts updated.** Six files (added `.env.example`, this doc); 65 unit tests + (was "14 / 51"). From 0e75ec6523d4df38021764af920f1143816be78c Mon Sep 17 00:00:00 2001 From: michael ruder Date: Wed, 3 Jun 2026 10:36:56 -0400 Subject: [PATCH 9/9] revert(noom-mcp-server): remove export retention sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the time-based retention feature (sweep_old_exports, get_export_retention_days, DATABRICKS_MCP_EXPORT_RETENTION_DAYS, and the startup + per-export sweep calls). It was added in response to a review question, justified by "exports can contain prod PII" — but that premise is wrong here: this MCP masks PII at source, which is exactly what makes local downloads and cross-model work acceptable. And auto-deleting prior exports works directly against reproducibility of analyses, which the team values. Exported files now persist until the engineer deletes them. The export dir is the user's to manage. Removing the sweep also moots the sweep-sandbox-escape review finding (no sweep, nothing to harden). .env.example and the design note updated accordingly (Data lifecycle now states no automatic cleanup, with the PII-masking + reproducibility rationale). Tests for the sweep removed. Full suite 59 passed / 1 skipped; ruff check + format clean; live-validated. Co-Authored-By: Claude Opus 4.8 --- noom-mcp-server/.env.example | 6 +- .../customization/export_query_patch.py | 105 +----------------- noom-mcp-server/docs/export-tool-design.md | 30 ++--- .../tests/test_export_query_patch.py | 84 -------------- 4 files changed, 19 insertions(+), 206 deletions(-) diff --git a/noom-mcp-server/.env.example b/noom-mcp-server/.env.example index 346a8689..5b3c1987 100644 --- a/noom-mcp-server/.env.example +++ b/noom-mcp-server/.env.example @@ -21,10 +21,6 @@ DATABRICKS_WAREHOUSE_ID= # Export directory (optional) — where export_query_to_file writes result files. # All writes are confined to this directory; paths that escape it are rejected. +# Files persist until you delete them (no automatic cleanup). # Defaults to ~/databricks-mcp-exports if unset. # DATABRICKS_MCP_EXPORT_DIR=/path/to/exports - -# Export retention (optional) — exported files are deleted after this many days -# (swept on startup and before each export). Exports can contain prod PII, so -# they are not kept indefinitely. Default 7; set to 0 to disable the sweep. -# DATABRICKS_MCP_EXPORT_RETENTION_DAYS=7 diff --git a/noom-mcp-server/customization/export_query_patch.py b/noom-mcp-server/customization/export_query_patch.py index 20aafff8..37d027c2 100644 --- a/noom-mcp-server/customization/export_query_patch.py +++ b/noom-mcp-server/customization/export_query_patch.py @@ -74,12 +74,6 @@ # ``timeout`` argument, bounded by the ceiling above. _DEFAULT_STATEMENT_TIMEOUT = 300 -# Default retention window (days) for files in the export directory. Exported -# files can contain prod PII, so they are not kept indefinitely: a sweep removes -# files older than this on startup and before each export. Override with -# DATABRICKS_MCP_EXPORT_RETENTION_DAYS; set it to 0 (or negative) to disable. -_DEFAULT_RETENTION_DAYS = 7 - _SUPPORTED_FORMATS = ("csv",) @@ -100,95 +94,6 @@ def get_export_base_dir() -> Path: return base -# --------------------------------------------------------------------------- -# Retention -# --------------------------------------------------------------------------- - - -def get_export_retention_days() -> int: - """Return the export-file retention window in days (default 7; 0 disables). - - Read from ``DATABRICKS_MCP_EXPORT_RETENTION_DAYS``. Invalid values fall back - to the default with a warning. - """ - raw = os.environ.get("DATABRICKS_MCP_EXPORT_RETENTION_DAYS") - if raw is None or not raw.strip(): - return _DEFAULT_RETENTION_DAYS - try: - return int(raw) - except ValueError: - logger.warning( - "Invalid DATABRICKS_MCP_EXPORT_RETENTION_DAYS=%r; using default %d.", - raw, - _DEFAULT_RETENTION_DAYS, - ) - return _DEFAULT_RETENTION_DAYS - - -def sweep_old_exports(base_dir: Optional[Path] = None, retention_days: Optional[int] = None) -> int: - """Delete files in the export dir older than the retention window. - - Exported CSVs can contain prod PII, so they are not retained indefinitely. - A retention of <= 0 disables the sweep. Empty subdirectories left behind are - pruned. Errors removing an individual path are logged, not raised — a sweep - failure must never block an export. - - Args: - base_dir: Override for the export base dir (tests). - retention_days: Override for the retention window (tests). - - Returns: - The number of files removed. - """ - days = get_export_retention_days() if retention_days is None else retention_days - if days <= 0: - return 0 - - base = (base_dir or get_export_base_dir()).resolve() - cutoff = time.time() - days * 86400 - removed = 0 - - # os.walk(followlinks=False) never descends into symlinked directories, so - # the walk cannot reach files outside the export dir. We additionally skip - # symlinks outright and confirm each file's real path stays under base — a - # deletion path must not depend on a glob/walk default to stay sandboxed. - for root, _dirs, files in os.walk(base, followlinks=False): - root_path = Path(root) - for name in files: - path = root_path / name - try: - if path.is_symlink(): - continue - if base not in path.resolve().parents: - continue - if path.stat().st_mtime < cutoff: - path.unlink() - removed += 1 - except OSError as exc: - logger.warning("export retention sweep: could not remove %s: %s", path, exc) - - # Prune now-empty real subdirectories, deepest first (never base, never a - # symlinked directory). - for root, _dirs, _files in os.walk(base, topdown=False, followlinks=False): - path = Path(root) - if path == base or path.is_symlink(): - continue - try: - if not any(path.iterdir()): - path.rmdir() - except OSError: - pass - - if removed: - logger.info( - "export retention sweep: removed %d file(s) older than %d day(s) from %s", - removed, - days, - base, - ) - return removed - - def resolve_export_path(output_path: str, base_dir: Optional[Path] = None) -> Path: """Resolve ``output_path`` against the export base dir, rejecting escapes. @@ -415,9 +320,6 @@ def _run_export( # statement on the warehouse. The poll loop below cancels at this bound. timeout = _clamp_statement_timeout(timeout) - # Clear out files past the retention window before writing a new one. - sweep_old_exports() - base = get_export_base_dir() dest = resolve_export_path(output_path, base) if dest.exists() and not overwrite: @@ -546,13 +448,8 @@ def export_query_to_file( timeout=timeout, ) - # Sweep stale exports left from previous runs at startup, so retention holds - # even for an engineer who exports rarely. - sweep_old_exports() - logger.info( - "Registered export_query_to_file tool (ceiling=%ss, export dir=%s, retention=%d days)", + "Registered export_query_to_file tool (ceiling=%ss, export dir=%s)", EXPORT_TOOL_TIMEOUT_CEILING, get_export_base_dir(), - get_export_retention_days(), ) diff --git a/noom-mcp-server/docs/export-tool-design.md b/noom-mcp-server/docs/export-tool-design.md index 0c7e1e94..277c553b 100644 --- a/noom-mcp-server/docs/export-tool-design.md +++ b/noom-mcp-server/docs/export-tool-design.md @@ -112,12 +112,15 @@ There are two distinct "CSV" artifacts; only one is ours to manage. | Artifact | Location | Lifecycle | Owner | |---|---|---|---| | Cloud-fetch staging | Databricks-managed cloud storage | Transient. Each presigned link carries an explicit `expiration` (the authoritative value; ~15 min observed in testing, **not** a documented constant). The staged result is held only briefly under Databricks' statement-result retention, then purged. | **Databricks** — we don't choose the path, create a table/volume, or extend it | -| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Deleted after `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` (default 7; `0` disables); swept on startup and before each export. The sweep never follows symlinks or deletes outside the base. | **This tool** | +| Exported `.csv` | Engineer's local disk (`DATABRICKS_MCP_EXPORT_DIR`) | Persists until the engineer deletes it — **no automatic cleanup**. | **The engineer** | -We persist nothing durable or self-owned in Databricks. The only artifact we -own is the local file, and because exports can contain prod PII it is **not** -retained indefinitely: `sweep_old_exports()` removes files older than the -retention window. +We persist nothing durable or self-owned in Databricks. The exported file is the +engineer's to keep or remove. There is **deliberately no automatic deletion**: +the data is PII-masked at source by the governed MCP (which is what makes local +downloads acceptable in the first place), and **reproducibility of prior +analyses outweighs disk hygiene** — silently removing earlier exports would +undermine re-running a past analysis. A time-based retention sweep was +prototyped during review and removed for these reasons. ### Governing principle The MCP layer is a **control plane, not a data plane.** It orchestrates and @@ -131,14 +134,14 @@ pattern as the SQL-timeout PR (#18): | File | Change | |---|---| -| `customization/export_query_patch.py` | New — tool, lazy chunk streaming, atomic write, overwrite guard, timeout clamp, path sandbox + retention sweep | +| `customization/export_query_patch.py` | New — tool, lazy chunk streaming, atomic write, overwrite guard, timeout clamp, path sandbox | | `customization/tool_allowlist_patch.py` | Added `export_query_to_file` to `ALLOWED_TOOLS` | | `run.py` | New Step 2b: registers the tool before the allowlist | -| `tests/test_export_query_patch.py` | New — unit tests (sandbox, header quoting, lazy chunk iteration, atomic write, overwrite guard, timeout clamp, retention, symlink hardening) | -| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` and `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` | +| `tests/test_export_query_patch.py` | New — unit tests (sandbox, header quoting, lazy chunk iteration, atomic write, overwrite guard, timeout clamp) | +| `.env.example` | Documented `DATABRICKS_MCP_EXPORT_DIR` | | `docs/export-tool-design.md` | This design note | -**Validation:** 65 unit tests pass, ruff check + format clean. Validated three +**Validation:** 59 unit tests pass, ruff check + format clean. Validated three ways: the full `run.py` startup lifecycle (tool registered, survives the allowlist, governed SP path), live warehouse exports (synthetic deterministic, a real 122-column slice, empty → header-only), and a byte-for-byte identical @@ -170,11 +173,12 @@ note now reflects the shipped behavior: statements when a caller passes an over-large timeout. - **Write-time sandbox re-check** added, so a symlink introduced during the query can't redirect the write outside the export dir. -- **Retention / PII lifecycle added.** New `DATABRICKS_MCP_EXPORT_RETENTION_DAYS` - (default 7); files are swept on startup and before each export. The sweep is - symlink-hardened (never follows symlinks or deletes outside the base). +- **No automatic file retention.** A time-based retention sweep was prototyped + during review and then **removed**: the exported data is PII-masked at source, + and auto-deleting prior exports works against reproducibility. Exported files + persist until the engineer removes them. - **Lifecycle wording corrected.** The "~15 min" presigned-link figure is an *observation*, not a documented Databricks SLA — the per-response `expiration` field is authoritative. -- **Counts updated.** Six files (added `.env.example`, this doc); 65 unit tests +- **Counts updated.** Six files (added `.env.example`, this doc); 59 unit tests (was "14 / 51"). diff --git a/noom-mcp-server/tests/test_export_query_patch.py b/noom-mcp-server/tests/test_export_query_patch.py index 0d3c658b..c5d7f652 100644 --- a/noom-mcp-server/tests/test_export_query_patch.py +++ b/noom-mcp-server/tests/test_export_query_patch.py @@ -277,90 +277,6 @@ def test_clamp_statement_timeout(): assert ep._clamp_statement_timeout(ceiling) == ceiling # exactly → unchanged -# --------------------------------------------------------------------------- -# Retention sweep -# --------------------------------------------------------------------------- - - -def _make_aged_file(path, age_days): - import os - import time - - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text("x") - mtime = time.time() - age_days * 86400 - os.utime(path, (mtime, mtime)) - - -def test_sweep_removes_old_keeps_recent(tmp_path): - old = tmp_path / "old.csv" - recent = tmp_path / "recent.csv" - _make_aged_file(old, age_days=10) - _make_aged_file(recent, age_days=1) - - removed = ep.sweep_old_exports(base_dir=tmp_path, retention_days=7) - - assert removed == 1 - assert not old.exists() - assert recent.exists() - - -def test_sweep_disabled_when_retention_not_positive(tmp_path): - old = tmp_path / "old.csv" - _make_aged_file(old, age_days=999) - assert ep.sweep_old_exports(base_dir=tmp_path, retention_days=0) == 0 - assert old.exists() - - -def test_sweep_does_not_descend_dir_symlink(tmp_path): - # A directory symlink inside the export dir pointing OUTSIDE must never be - # followed — files behind it must survive the sweep. - base = tmp_path / "base" - base.mkdir() - external = tmp_path / "external" - external.mkdir() - victim = external / "victim.csv" - _make_aged_file(victim, age_days=999) - (base / "link").symlink_to(external, target_is_directory=True) - - ep.sweep_old_exports(base_dir=base, retention_days=7) - - assert victim.exists() # external file must NOT be deleted - - -def test_sweep_skips_file_symlink_to_external(tmp_path): - # A file symlink to an external old file must not cause the target's deletion. - base = tmp_path / "base" - base.mkdir() - external = tmp_path / "ext" - external.mkdir() - target = external / "target.csv" - _make_aged_file(target, age_days=999) - (base / "alias.csv").symlink_to(target) - - ep.sweep_old_exports(base_dir=base, retention_days=7) - - assert target.exists() # symlink target untouched - - -def test_sweep_prunes_empty_subdirs(tmp_path): - nested = tmp_path / "sub" / "deep.csv" - _make_aged_file(nested, age_days=30) - ep.sweep_old_exports(base_dir=tmp_path, retention_days=7) - assert not nested.exists() - assert not (tmp_path / "sub").exists() # emptied dir pruned - assert tmp_path.exists() # base dir never removed - - -def test_get_retention_days_default_and_override(monkeypatch): - monkeypatch.delenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", raising=False) - assert ep.get_export_retention_days() == ep._DEFAULT_RETENTION_DAYS - monkeypatch.setenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", "3") - assert ep.get_export_retention_days() == 3 - monkeypatch.setenv("DATABRICKS_MCP_EXPORT_RETENTION_DAYS", "garbage") - assert ep.get_export_retention_days() == ep._DEFAULT_RETENTION_DAYS # falls back - - def test_download_link_applies_http_headers(monkeypatch): captured = {}