docs(geneva): add user guide for profiling stateful UDF memory #248
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,233 @@ | ||
| --- | ||
| title: Profiling Stateful UDF Memory | ||
| sidebarTitle: Profiling Memory | ||
| description: Find memory leaks and runaway peak usage in stateful UDFs with memray, before they cause worker OOMs in production. | ||
| icon: chart-line | ||
| --- | ||
|
|
||
| import { | ||
| PyStatefulUdfClass, | ||
| PyMemrayTrackerUdf, | ||
| PyRayClusterProfile, | ||
| PyLogMemory, | ||
| PyLeakyCache, | ||
| PyBoundedCache, | ||
| PyLeakyAggregator, | ||
| PyLeakyClosure, | ||
| PyTorchInferenceMode, | ||
| PyConfidenceCheck, | ||
| } from '/snippets/geneva_profiling_memory.mdx'; | ||
|
|
||
| Stateful UDFs are the most common source of worker memory pressure in Geneva. Unlike scalar UDFs, a stateful UDF instance lives for the **entire lifetime of a Ray actor**, processing many batches in sequence. Anything your `setup()` allocates is held for the duration of the job, and anything `__call__` retains accumulates batch after batch — sometimes silently, until a worker OOMs partway through a large backfill. | ||
|
|
||
| This page shows you how to profile a stateful UDF locally with [memray](https://github.com/bloomberg/memray), what to look for, and the common patterns that leak. | ||
|
|
||
| <Tip> | ||
| If your worker is being OOM-killed and you don't know why, **profile a single actor locally first**. A 5-minute memray run on your laptop is faster than another 45-minute distributed run that fails the same way. | ||
| </Tip> | ||
|
|
||
| ## Why stateful UDFs leak | ||
|
|
||
| A stateful UDF in Geneva is a class: | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyStatefulUdfClass} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| A few facts make memory behavior easy to get wrong: | ||
|
|
||
| - **One instance per worker, many batches.** Geneva instantiates the class once per Ray actor. The same `self` processes every batch routed to that worker — potentially thousands. | ||
| - **`setup()` runs once.** Whatever you allocate there stays in memory until the actor dies. That's intentional for things like ML models, but it's a footgun for "lazy" caches that grow. | ||
| - **`self.<attr>` survives across calls.** Anything you attach to `self` inside `__call__` is retained for the rest of the actor's life. | ||
| - **Workers don't restart between batches.** Unlike a serverless function, you don't get a fresh process per invocation. Anything a call retains accumulates linearly with batch count until the worker hits its memory cap. | ||
|
|
||
| The result: a leak that looks tiny in unit tests (1 batch, 4 MiB) can blow up an 8-hour backfill (10 000 batches, 40 GiB). | ||
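The arithmetic behind that example can be checked in a few lines. This is only an illustrative sketch: the helper name `projected_retained_bytes` is hypothetical, and it assumes every batch retains the same amount.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def projected_retained_bytes(per_batch_bytes: int, batches: int) -> int:
    """Memory still held after `batches` calls if each call retains `per_batch_bytes`."""
    return per_batch_bytes * batches


# 4 MiB retained per batch, 10 000 batches (the figures used in the text above)
leak = projected_retained_bytes(4 * MIB, 10_000)
print(f"{leak / GIB:.1f} GiB")  # 39.1 GiB, i.e. roughly 40 GiB
```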
|
|
||
| ## When to profile | ||
|
|
||
| Profile your UDF if **any** of these are true: | ||
|
|
||
| - The UDF loads a model, builds an index, or otherwise allocates more than ~100 MiB in `setup()`. | ||
| - The UDF maintains a cache, deduplication table, or running statistic in `self`. | ||
| - A worker is being OOM-killed during backfill (look for `FatalWorkerOOMError`, see [Job troubleshooting](/geneva/jobs/troubleshooting)). | ||
| - Worker RSS grows steadily during a backfill rather than staying flat after `setup()`. | ||
|
|
||
| You usually do **not** need to profile pure functional UDFs (no `self` state) or UDFs that only ever read from `self`: they retain nothing between calls in Python, so any growth would have to come from a library they call (see pattern 4 below). | ||
|
|
||
| ## Profiling a UDF with memray | ||
|
|
||
| memray ships as a `dev` dependency in Geneva, so it's already in your environment if you installed with `uv sync`. | ||
|
|
||
| The trick to profiling under Ray is that **workers run in separate processes**. Wrapping `pytest` with `memray run` only sees the driver, not the actors that actually run your UDF. The cleanest pattern is to have **the UDF instrument itself**, controlled by an environment variable that's only set when you want a profile. | ||
|
|
||
| ### Step 1 — add an opt-in tracker to your UDF | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyMemrayTrackerUdf} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| The tracker is a no-op when `MY_UDF_MEMRAY_OUT_DIR` isn't set, so leaving this code in your UDF is safe for production runs. | ||
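One possible variation, sketched under assumptions rather than taken from Geneva: import memray lazily so that production workers never need it installed, and register a best-effort `atexit` hook so the trace is closed when the interpreter exits normally. The helper name `start_tracker_if_requested` is hypothetical; `memray.Tracker` is used only as a context manager, as in the UDF above.

```python
import atexit
import os
import pathlib
import uuid
from typing import Any, Optional

ENV_VAR = "MY_UDF_MEMRAY_OUT_DIR"  # same variable name the UDF above reads


def start_tracker_if_requested(env_var: str = ENV_VAR) -> Optional[Any]:
    """Start a memray tracker when `env_var` is set; otherwise do nothing."""
    out_dir = os.environ.get(env_var)
    if not out_dir:
        return None  # profiling is off, so memray is never imported

    import memray  # lazy import: only needed when profiling is requested

    path = pathlib.Path(out_dir)
    path.mkdir(parents=True, exist_ok=True)
    bin_path = path / f"memray-{os.getpid()}-{uuid.uuid4().hex}.bin"
    tracker = memray.Tracker(str(bin_path))
    tracker.__enter__()
    # Best effort: close the trace on a normal interpreter exit.
    # A worker that is SIGKILLed (for example by the OOM killer) skips this.
    atexit.register(tracker.__exit__, None, None, None)
    return tracker
```

Calling `self._tracker = start_tracker_if_requested()` at the top of `setup()` would then replace the inline block. Whether the `atexit` hook runs depends on how the worker process is shut down, so treat it as a convenience, not a guarantee.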
|
|
||
| ### Step 2 — propagate the env var to Ray workers | ||
|
|
||
| Ray workers don't inherit driver environment variables by default. When you start a local Ray cluster from Geneva, pass `extra_env` so the variable reaches each worker: | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyRayClusterProfile} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| <Tip> | ||
| Set `concurrency=1` while profiling. One actor processing all batches sequentially produces a single clean trace; the default of 8 produces 8 noisier traces that you'd have to merge mentally. | ||
| </Tip> | ||
|
|
||
| ### Step 3 — read the trace | ||
|
|
||
| When the backfill finishes, you'll have one or more `memray-<pid>-<uuid>.bin` files under `/tmp/my-udf-profile/` (one per worker process). Render and inspect them: | ||
|
|
||
| ```bash | ||
| # Quick summary of what was allocated at the point of peak memory | ||
| uv run -m memray summary /tmp/my-udf-profile/memray-*.bin | ||
|
|
||
| # Interactive flamegraph in your browser | ||
| uv run -m memray flamegraph /tmp/my-udf-profile/memray-*.bin | ||
| open /tmp/my-udf-profile/memray-*.html | ||
|
|
||
| # Call tree of the largest allocations at peak memory | ||
| uv run -m memray tree /tmp/my-udf-profile/memray-*.bin | ||
| ``` | ||
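If you profiled with more than one actor, the glob above matches several traces, and to my knowledge each memray reporter takes a single results file. A small sketch of building one command per trace follows; the helper name `memray_commands` is hypothetical, and it assumes memray is importable as a module in the current interpreter.

```python
import pathlib
import sys


def memray_commands(profile_dir: str, report: str = "summary") -> list[list[str]]:
    """Build one `python -m memray <report> <file>` command per trace file."""
    traces = sorted(pathlib.Path(profile_dir).glob("memray-*.bin"))
    return [[sys.executable, "-m", "memray", report, str(t)] for t in traces]
```

To run them, loop over the result, for example `for cmd in memray_commands("/tmp/my-udf-profile"): subprocess.run(cmd, check=True)`.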
|
|
||
| ## What the numbers mean | ||
|
|
||
| memray reports two values you'll care about most: | ||
|
|
||
| - **Peak heap** (`metadata.peak_memory`): the high-water mark. This is what triggers OOMs. A peak well above your `setup()` allocations has more than one possible cause: a single batch may transiently spike memory before freeing it, or a slow leak may have accumulated over many batches. The leaked-allocation figure helps tell the two apart. | ||
| - **Leaked allocations** (`get_leaked_allocation_records()`): what was still allocated when the tracker ended. **This is not necessarily a bug**: your `setup()` model is "leaked" in this sense because it lives for the actor's lifetime. The signal is *how much more than the expected baseline* is leaked. | ||
|
|
||
| A healthy stateful UDF profile, after processing many batches, looks roughly like: | ||
|
|
||
| ``` | ||
| peak heap ≈ setup() allocations + 1 batch of working memory | ||
| leaked ≈ setup() allocations (i.e. nothing extra retained from __call__) | ||
| ``` | ||
|
|
||
| An **unhealthy** profile looks like: | ||
|
|
||
| ``` | ||
| peak heap ≈ setup() + N × per-call allocation ← grows with batch count | ||
| leaked ≈ setup() + N × per-call allocation ← per-call state never freed | ||
| ``` | ||
|
|
||
| In a flamegraph rendered with `--temporal`, the leak shows up as a thick stack frame anchored in `__call__` that keeps rising as you move forward through time. | ||
|
|
||
| ## Going deeper: RSS vs Arrow allocations | ||
|
|
||
| memray gives you the *Python-side* allocation story. For real diagnosis of "where is the worker's memory actually going?", it pays to watch **process RSS** and **Arrow's own allocator** side-by-side — together they tell you which subsystem owns the bytes, often faster than reading a flamegraph. | ||
|
|
||
| Drop this into your UDF (or anywhere on the worker) to log a snapshot: | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyLogMemory} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| The three numbers answer different questions: | ||
|
|
||
| - **`rss_mb`**: the process's resident set size, i.e. every byte the OS has handed this Python interpreter. Includes Python heap, Arrow, native libraries, and pages the C allocator (`glibc`/`jemalloc`) is holding even though Python freed them. This is what triggers cgroup OOM-kills. Note that the snippet above reads `ru_maxrss`, which is the *peak* RSS so far, so the logged value never decreases; use a current-RSS source if you need to see memory being returned. | ||
| - **`arrow_live_mb`**: bytes currently held by *live PyArrow buffers* (`RecordBatch`, `Array`, `ChunkedArray`, etc.). Goes up when you create Arrow data, down when those references are dropped. | ||
| - **`gap_mb` = rss − arrow_live**: "everything else." This is the Python heap (your own `self.cache`, model weights, dicts, lists), native libraries (PyTorch, ONNX), and allocator retention. | ||
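Because `ru_maxrss` is a high-water mark, a reading that can also go down is useful when you want to see spikes settle. The following is a minimal standard-library sketch, assuming a Linux worker where `/proc/self/statm` is available; on other platforms it falls back to the same peak value the snippet above uses. The function name `current_rss_bytes` is hypothetical.

```python
import os
import resource
import sys


def current_rss_bytes() -> int:
    """Current resident set size on Linux; peak RSS elsewhere as a fallback."""
    try:
        # Second field of /proc/self/statm is the resident size in pages.
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])
        return resident_pages * os.sysconf("SC_PAGE_SIZE")
    except (OSError, IndexError, ValueError):
        # ru_maxrss is bytes on macOS and KiB on Linux.
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        return peak if sys.platform == "darwin" else peak * 1024
```

If `psutil` is already in your environment, `psutil.Process().memory_info().rss` gives the same kind of current reading across platforms.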
|
|
||
| ### Diagnostic patterns | ||
|
|
||
| Log the breakdown every few batches and the *shape of growth over time* tells you which subsystem to fix: | ||
|
|
||
| | Pattern | Diagnosis | First thing to try | | ||
| |---|---|---| | ||
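| | `rss` climbs slowly, `arrow_live` flat near zero, big growing `gap` | Allocator retention: Python freed the memory but `glibc` is keeping the pages | Call `ctypes.CDLL("libc.so.6").malloc_trim(0)` periodically (Linux only); or set the glibc environment variable `MALLOC_TRIM_THRESHOLD_=131072` in the worker's environment (for example via `extra_env`), so the allocator returns freed memory to the OS sooner | ||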
|
Contributor
where would I set MALLOC_TRIM_THRESHOLD_? I guess that's an env var? This is confusing to me - I don't have a sense what's happening here, so I would probably copy these in as magic incantations but feel bad about it :) But this whole section is a little over my head anyway. Good to have it, I think, but I hope I never need it!
Contributor
Author
Yeah that was a new one for me as well, it's just a linux environment variable. So much of the tuning in Linux/MacOS/Windows is different, really makes me wish there was a more standard API/config for this stuff.
||
| | `rss` climbs, `arrow_live` climbs in lockstep | Real Arrow leak — your code is holding `RecordBatch` / `Array` references | Find where you're appending batches to `self`, or where checkpoint / error payloads aren't being released | | ||
| | `rss` spikes hugely on a few calls then settles, eventually one spike OOMs | Peak is too big, not a leak — a single call allocates more than the worker has | Shrink `batch_size`, `blob_read_buffer_size`, or split the work | | ||
| | `rss` flat for hours then sudden cliff upward | One pathological row — usually one huge blob (a 4K-resolution image, a 50 MB PDF) | Find the offending row by ID; add a size check at the top of `__call__` | | ||
| | `rss` rises during `setup()`, then flat for the whole run, `gap` constant | Healthy — that's your model loaded once per actor | Nothing to do | | ||
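For the first row of the table, the `malloc_trim` call can be wrapped so it is harmless on platforms without glibc. This is a sketch under that assumption; the helper name `trim_glibc_heap` is hypothetical, and how often to call it (for example every few hundred batches) is a tuning choice, not something Geneva prescribes.

```python
import ctypes
import sys


def trim_glibc_heap() -> bool:
    """Ask glibc to return free heap pages to the OS. True if any were released."""
    if not sys.platform.startswith("linux"):
        return False  # malloc_trim is a glibc extension
    try:
        libc = ctypes.CDLL("libc.so.6")
        # malloc_trim(0) returns 1 if memory was released, 0 otherwise.
        return bool(libc.malloc_trim(0))
    except (OSError, AttributeError):
        return False  # no glibc here (for example a musl-based image)
```

`MALLOC_TRIM_THRESHOLD_` is the environment-variable alternative. glibc reads it when the process starts, so it has to be present in the worker's environment before the worker launches.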
|
|
||
| <Tip> | ||
| The reference UDFs in Geneva's own integration test (`src/stress_tests/_memray_probe.py`) print exactly this breakdown every 32 calls. The workflow's stdout logs are a working example of the "clean" and "leaky" patterns. The leaky one resembles the second row above, except that `gap` climbs in lockstep with `rss` instead of `arrow_live`, because the leaked objects are `bytearray`s on the Python heap, not Arrow buffers. | ||
| </Tip> | ||
|
|
||
| ## Common leak patterns | ||
|
|
||
| ### 1. The growing cache | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyLeakyCache} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| Looks harmless. Fine on a unit test with 10 inputs. **Catastrophic on a backfill of 10M rows**, where most inputs are unique and the cache grows to fill the worker. | ||
|
|
||
| **Fix:** Use a bounded cache (`functools.lru_cache` with `maxsize`, or a manual size cap), or skip caching when you don't know the cardinality. | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyBoundedCache} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
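The "manual size cap" option mentioned in the fix could look roughly like the sketch below: a small least-recently-used cache built on `OrderedDict`. The class name `BoundedCache` and its `get` method are illustrative, not part of Geneva.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedCache:
    """Least-recently-used cache that never holds more than `max_entries`."""

    def __init__(self, compute: Callable[[Hashable], Any], max_entries: int = 1024):
        self._compute = compute
        self._max_entries = max_entries
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Any:
        if key in self._entries:
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        value = self._compute(key)
        self._entries[key] = value
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the least recently used
        return value

    def __len__(self) -> int:
        return len(self._entries)
```

In a UDF you would build it in `setup()`, for example `self._cache = BoundedCache(self.model.embed, max_entries=1024)`, and call `self._cache.get(text)` from `__call__`. Memory use is then bounded by `max_entries` times the size of one cached value.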
|
|
||
| ### 2. Accumulating per-call buffers | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyLeakyAggregator} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| **Fix:** Don't hold references to inputs past the return of `__call__`. If you need rolling state, summarize into a small aggregate (counts, sums) instead of holding the raw batches. | ||
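As a sketch of "summarize into a small aggregate", the class below keeps a count, a sum, and a maximum instead of the batches themselves. The name `RunningStats` and the choice of statistics are illustrative. The point is that its size stays constant no matter how many batches pass through.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class RunningStats:
    """Constant-size rolling state: nothing here references the input batch."""

    count: int = 0
    total: float = 0.0
    maximum: float = float("-inf")

    def update(self, values: Iterable[float]) -> None:
        for v in values:
            self.count += 1
            self.total += v
            if v > self.maximum:
                self.maximum = v

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0
```

Inside `__call__` you would pass plain Python values, for example `self.stats.update(batch.column(0).to_pylist())`, and let the batch itself go out of scope when the call returns.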
|
|
||
| ### 3. Closures capturing batch arrays | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyLeakyClosure} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| **Fix:** Extract only the small values you actually need into the closure, or execute the work eagerly. | ||
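The difference is easy to demonstrate without Arrow. In this sketch, `Batch` is a hypothetical stand-in for a large array and `expensive` is a placeholder for the deferred work. The closure captures only an integer, so the batch can be freed as soon as the caller drops it.

```python
import gc
import weakref


class Batch:
    """Stand-in for a large Arrow array (hypothetical, for illustration)."""

    def __init__(self, n: int):
        self.values = list(range(n))


def expensive(n_rows: int) -> int:
    return n_rows * 2  # placeholder for the real deferred work


work_queue = []


def enqueue_small(batch: Batch) -> None:
    n_rows = len(batch.values)                    # extract the small value first
    work_queue.append(lambda: expensive(n_rows))  # closure holds an int, not the batch


batch = Batch(1000)
ref = weakref.ref(batch)
enqueue_small(batch)
del batch
gc.collect()
print(ref() is None)  # True: nothing in work_queue keeps the batch alive
```

Had the lambda referenced `batch` directly, the weak reference would still resolve after `del batch`, which is exactly the leak described above.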
|
|
||
| ### 4. ML model state that grows | ||
|
|
||
| Some ML libraries retain per-call state internally (KV caches, gradient buffers, autograd graphs). If you're using PyTorch: | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyTorchInferenceMode} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
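| For Hugging Face pipelines, put the model in `eval()` mode and run inference under `torch.inference_mode()` or `torch.no_grad()`; `eval()` on its own changes layer behavior (dropout, batch norm) but does not stop autograd from recording. For long-running stateful UDFs on GPUs, also consider calling `torch.cuda.empty_cache()` between large batches to return cached, unused GPU memory to the driver. | ||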
|
|
||
| ## A confidence check | ||
|
|
||
| A useful "does my profiling actually work?" sanity check: temporarily introduce a deliberate leak and confirm memray catches it. | ||
|
|
||
| <CodeGroup> | ||
| <CodeBlock filename="Python" language="python" icon="python"> | ||
| {PyConfidenceCheck} | ||
| </CodeBlock> | ||
| </CodeGroup> | ||
|
|
||
| If `memray summary` doesn't show leaked bytes growing roughly with batch count after this change, your tracker isn't actually attached (most often: the env var isn't reaching workers — re-check `extra_env`). | ||
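Before involving Ray at all, you can confirm in a single process that the deliberate leak behaves as expected. The sketch below swaps in the standard library's `tracemalloc` for memray, so it checks the leak itself, not your memray setup. `LeakyUdf` and `retained_after` are hypothetical names, and the scratch buffer is 1 MiB to keep the check fast.

```python
import tracemalloc

MIB = 1024 ** 2


class LeakyUdf:
    """Minimal stand-in for a UDF with the deliberate leak from above."""

    def __init__(self):
        self._scratches = []

    def __call__(self, x):
        scratch = bytearray(MIB)         # 1 MiB per call
        self._scratches.append(scratch)  # deliberate leak
        return x


def retained_after(batches: int) -> int:
    """Bytes still allocated after `batches` calls, as seen by tracemalloc."""
    udf = LeakyUdf()
    tracemalloc.start()
    for i in range(batches):
        udf(i)
    current, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return current
```

`retained_after(20)` should come out at roughly twice `retained_after(10)`. If the same UDF then shows no comparable growth in the memray trace from a worker, the problem is in the tracker wiring and not in the leak.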
|
|
||
| Geneva's own test suite ships a reference implementation of this pattern in `src/stress_tests/_memray_probe.py` and `src/stress_tests/test_memray_stateful_udf.py`, plus a GitHub Actions workflow (`memray-stateful-udf-profile.yml`) that uploads the per-actor `.bin` and rendered flamegraph as a CI artifact. Feel free to copy that scaffolding for your own project's UDFs. | ||
|
Contributor
Can we reference the geneva source? I guess so - if you pay for geneva, you get the source. It kinda feels weird; we basically never do otherwise. But I guess there's not anything actually wrong here. Ok, I guess I don't really have a question here 😆
||
|
|
||
| ## Related | ||
|
|
||
| - [UDFs](/geneva/udfs/udfs) — defining stateful UDFs | ||
| - [Job troubleshooting](/geneva/jobs/troubleshooting) — diagnosing OOMs and other worker errors | ||
| - [Advanced configuration](/geneva/udfs/advanced-configuration) — admission control and resource limits | ||
| - [memray documentation](https://bloomberg.github.io/memray/) — flamegraph, summary, and tree report formats | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| {/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */} | ||
|
|
||
| export const PyBoundedCache = "from functools import lru_cache\n\nclass GoodEmbedding:\n def __init__(self):\n self._embed = None\n\n def setup(self):\n model = load_model()\n self._embed = lru_cache(maxsize=1024)(model.embed)\n\n def __call__(self, text: str) -> list[float]:\n if self._embed is None:\n self.setup()\n return self._embed(text)\n"; | ||
|
|
||
| export const PyConfidenceCheck = "def __call__(self, x):\n scratch = bytearray(8 * 1024 * 1024) # 8 MiB\n self._scratches.append(scratch) # <-- deliberate leak\n return ...\n"; | ||
|
|
||
| export const PyLeakyAggregator = "class BadAggregator:\n def __init__(self):\n self.history = []\n\n def __call__(self, batch: pa.RecordBatch) -> pa.Array:\n self.history.append(batch) # holds every batch ever processed\n ...\n"; | ||
|
|
||
| export const PyLeakyCache = "class BadEmbedding:\n def __init__(self):\n self.cache: dict[str, list[float]] = {}\n\n def __call__(self, text: str) -> list[float]:\n if text not in self.cache:\n self.cache[text] = self.model.embed(text)\n return self.cache[text]\n"; | ||
|
|
||
| export const PyLeakyClosure = "class BadDeferred:\n def __init__(self):\n self.work_queue = []\n\n def __call__(self, x: pa.Array) -> pa.Array:\n # Lambda captures `x` by reference — the whole Array stays alive\n self.work_queue.append(lambda: expensive(x))\n ...\n"; | ||
|
|
||
| export const PyLogMemory = "import resource, pyarrow as pa\n\ndef log_memory(seq: int) -> None:\n rss_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss\n # ru_maxrss is bytes on macOS, KiB on Linux:\n import sys\n if sys.platform != \"darwin\":\n rss_bytes *= 1024\n arrow_live = pa.total_allocated_bytes()\n print(\n f\"seq={seq} \"\n f\"rss_mb={rss_bytes // 1024**2} \"\n f\"arrow_live_mb={arrow_live // 1024**2} \"\n f\"gap_mb={(rss_bytes - arrow_live) // 1024**2}\",\n flush=True,\n )\n"; | ||
|
|
||
| export const PyMemrayTrackerUdf = "import os, pathlib, uuid\nfrom typing import Any\nimport memray\nimport geneva\nimport pyarrow as pa\n\n_MEMRAY_OUT_DIR_ENV = \"MY_UDF_MEMRAY_OUT_DIR\"\n\n\n@geneva.udf(data_type=pa.list_(pa.float32(), 512))\nclass MyEmbedding:\n def __init__(self):\n self.model = None\n self._tracker: Any = None # memray.Tracker, when profiling is on\n\n def setup(self):\n # Open a memray tracker per worker process, if requested. Each\n # worker writes its own .bin file so traces don't collide.\n out_dir = os.environ.get(_MEMRAY_OUT_DIR_ENV)\n if out_dir:\n pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)\n bin_path = pathlib.Path(out_dir) / (\n f\"memray-{os.getpid()}-{uuid.uuid4().hex}.bin\"\n )\n self._tracker = memray.Tracker(\n str(bin_path), native_traces=False, follow_fork=False\n )\n self._tracker.__enter__()\n self.model = load_model()\n\n def __call__(self, text: str) -> list[float]:\n if self.model is None:\n self.setup()\n return self.model.embed(text)\n"; | ||
|
|
||
| export const PyRayClusterProfile = "from geneva.runners.ray._mgr import ray_cluster\n\nwith ray_cluster(\n local=True,\n extra_env={\"MY_UDF_MEMRAY_OUT_DIR\": \"/tmp/my-udf-profile\"},\n):\n table.backfill(\"embedding\", concurrency=1)\n"; | ||
|
|
||
| export const PyStatefulUdfClass = "@geneva.udf(data_type=pa.list_(pa.float32(), 512))\nclass MyEmbedding:\n def __init__(self):\n self.model = None\n\n def setup(self):\n self.model = load_model() # allocated once per actor\n\n def __call__(self, text: str) -> list[float]:\n if self.model is None:\n self.setup()\n return self.model.embed(text)\n"; | ||
|
|
||
| export const PyTorchInferenceMode = "def __call__(self, text: str) -> list[float]:\n with torch.inference_mode(): # <-- prevents autograd graph retention\n return self.model.encode(text)\n"; | ||
|
|
it could mean transient doubling, or a lot of other things, right? like a slow memory leak would still result in high peak_memory, I assume
correct, it would just take longer to surface, which is unfortunately what some of our customers seem to be experiencing.