From db4fcee91aec52ac77af2fc9b3c7a22b76eb466b Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 18 Apr 2026 11:16:11 -0400 Subject: [PATCH] Add batch_size gradient accumulation; release pandas matrix after init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two memory-scale improvements for Calibration at >1e6 records: 1. Release the pandas DataFrame after __init__. Calibration now builds a single float32 torch copy of the user's estimate_matrix (self.estimate_matrix_tensor) during __init__ and sets self.original_estimate_matrix = None. Downstream code in hyperparameter_tuning, evaluation, exclude_targets, and assess_analytical_solution reads the cached tensor instead of re-materializing from .values. This avoids holding both a float64 pandas (6 GB at 1.5M x 500) and a float32 torch (3 GB) copy simultaneously. 2. Add batch_size to Calibration and reweight(). When set, the chi-squared loss gradient is accumulated over disjoint record batches via a two-pass scheme: - Phase 1: accumulate S_j = sum_i w_i * A_{ij} per target under torch.no_grad() (peak memory O(batch_size * n_targets)). - Compute per-target coefficient c_j = d(loss)/d(S_j) = 2 * ((S_j - t_j + 1) / denom_j^2) / n_targets * normalization_factor_j using the same clamped denominator as metrics.loss(), so batched and full-batch agree on targets near -1. - Phase 2: per batch, compute virtual_loss_b = coef · (exp(weights_log[b]) @ A[b]) and call .backward() to accumulate gradients into weights_log. retain_graph=True on all but the last batch preserves the weights_log dropout computation graph across the inner loop. Single-mask semantics: dropout is applied once to the full weights_log tensor before the phase 2 loop, then sliced per batch, so batched gradient equals full-batch gradient exactly (not approximately) under the same dropout realization. 
Not supported: regularize_with_l0=True with batch_size (the L0 sparse loop uses a different objective and is not yet batched). Raises ValueError if both are set. Tests (TDD): - tests/test_memory.py: original_estimate_matrix is None post-init; estimate_matrix_tensor is present with correct dtype/shape; calibrate() still converges after the release. - tests/test_batch_reweight.py: 9 tests covering (a) full-batch determinism, (b) batch_size=100 matches full-batch within 1e-4 rel err, (c) ragged tail (batch_size=333 with n=1000), (d) batch_size >= n degenerates exactly, (e) batch_size=1 extreme, (f) equivalence under dropout_rate=0.3 (single-mask invariant), (g) equivalence under a non-trivial normalization_factor, (h) equivalence under excluded_targets, (i) batch_size + regularize_with_l0 raises. Reviewer feedback addressed (three subagent reviews, 2026-04-18): - Methodology (accept): math derivation verified. E1 fix — coef uses _safe_denominator(targets) identical to metrics.loss, so targets = -1 no longer diverges between paths. - Reproducibility (minor revisions): changelog fragments added; _full_estimate_matrix_tensor renamed to estimate_matrix_tensor (public-ish, since cross-module); L0+batch_size interaction now raises instead of silently running full-batch; edge-case tests added (dropout, normalization, excluded, batch=1); weakref-based memory test replaced with direct attribute-state assertions. - Code-simplifier: unused `field` import dropped in adapter, DataFrame construction collapsed to dict comprehension, redundant isinstance guard removed, batch_starts list replaces recomputed n_batches. All 49 upstream tests pass under `pytest tests/`. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../batch-calibration-memory-fix.added.md | 1 + .../batch-calibration-memory-fix.changed.md | 1 + src/microcalibrate/calibration.py | 77 +++++---- src/microcalibrate/evaluation.py | 16 +- src/microcalibrate/hyperparameter_tuning.py | 11 +- src/microcalibrate/reweight.py | 100 +++++++++++- tests/test_batch_reweight.py | 152 ++++++++++++++++++ tests/test_memory.py | 74 +++++++++ 8 files changed, 381 insertions(+), 51 deletions(-) create mode 100644 changelog.d/batch-calibration-memory-fix.added.md create mode 100644 changelog.d/batch-calibration-memory-fix.changed.md create mode 100644 tests/test_batch_reweight.py create mode 100644 tests/test_memory.py diff --git a/changelog.d/batch-calibration-memory-fix.added.md b/changelog.d/batch-calibration-memory-fix.added.md new file mode 100644 index 0000000..60246a7 --- /dev/null +++ b/changelog.d/batch-calibration-memory-fix.added.md @@ -0,0 +1 @@ +Added `batch_size` parameter to `Calibration` and `reweight()` for gradient accumulation over record batches. When set, the chi-squared loss is accumulated under `no_grad` in a first pass and the backward pass is split into per-batch virtual-loss calls with pre-computed per-target coefficients. Peak autograd activation memory drops from O(n_records × n_targets) to O(batch_size × n_targets). The full-batch path is unchanged when `batch_size` is `None` (default) or greater than or equal to `n_records`. Not supported in combination with `regularize_with_l0=True` (raises `ValueError`). 
diff --git a/changelog.d/batch-calibration-memory-fix.changed.md b/changelog.d/batch-calibration-memory-fix.changed.md new file mode 100644 index 0000000..9455d37 --- /dev/null +++ b/changelog.d/batch-calibration-memory-fix.changed.md @@ -0,0 +1 @@ +`Calibration` now converts the user-provided `estimate_matrix` DataFrame to a cached `float32` torch tensor on `estimate_matrix_tensor` during `__init__` and releases the pandas DataFrame reference by setting `original_estimate_matrix` to `None`. Downstream code (`hyperparameter_tuning`, `evaluation`, `assess_analytical_solution`) reads the cached tensor rather than re-materializing from `DataFrame.values`. This substantially reduces peak RSS during `calibrate()` at large record counts. External readers of `Calibration.original_estimate_matrix` will now see `None` after construction; the tensor equivalent is available on `Calibration.estimate_matrix_tensor`. diff --git a/src/microcalibrate/calibration.py b/src/microcalibrate/calibration.py index 87962ed..f3f6c6d 100644 --- a/src/microcalibrate/calibration.py +++ b/src/microcalibrate/calibration.py @@ -42,6 +42,7 @@ def __init__( sparse_learning_rate: Optional[float] = 0.2, regularize_with_l0: Optional[bool] = False, seed: Optional[int] = 42, + batch_size: Optional[int] = None, ): """Initialize the Calibration class. @@ -64,6 +65,7 @@ def __init__( temperature (float): Temperature parameter for L0 regularization, controlling the sparsity of the model. Defaults to 0.5. sparse_learning_rate (float): Learning rate for the regularizing optimizer. Defaults to 0.2. regularize_with_l0 (Optional[bool]): Whether to apply L0 regularization. Defaults to False. + batch_size (Optional[int]): If set, the per-epoch gradient is accumulated over disjoint record batches of this size (two-pass: accumulate the chi-squared estimate under no_grad, then per-batch backward with pre-computed target-coefficient). 
This keeps peak activation memory O(batch_size * n_targets) instead of O(n_records * n_targets), at the cost of modest fp32 rounding during accumulation. None (default) = full-batch, matching prior behavior exactly. """ # Resolve the torch device exactly once. The fallback chain # (cuda -> mps -> cpu) runs when ``device`` is None so callers @@ -99,6 +101,13 @@ def __init__( self.sparse_learning_rate = sparse_learning_rate self.regularize_with_l0 = regularize_with_l0 self.seed = seed + self.batch_size = batch_size + + # Authoritative float32 copy of the estimate matrix; the + # pandas DataFrame is released after __init__ so its storage is + # garbage-collectable and peak RSS during calibrate() is cut + # substantially at v7 scale (>1e6 rows). + self.estimate_matrix_tensor: Optional[torch.Tensor] = None # Seed torch on every path, and CUDA as well when we actually # resolved to a CUDA device, so stochastic CUDA kernels are @@ -121,17 +130,26 @@ def __init__( self.original_estimate_matrix.columns.to_numpy() ) + # Build a single float32 torch copy of the full estimate matrix + # and release the caller's pandas DataFrame. The tensor is the + # authoritative matrix from here on; downstream code (including + # exclude_targets and hyperparameter tuning) reads it instead of + # re-materializing from .values. 
+ if self.original_estimate_matrix is not None: + self.estimate_matrix_tensor = torch.tensor( + self.original_estimate_matrix.values, + dtype=torch.float32, + device=self.device, + ) + self.original_estimate_matrix = None + if self.excluded_targets is not None: self.exclude_targets() else: self.targets = self.original_targets self.target_names = self.original_target_names - if self.original_estimate_matrix is not None: - self.estimate_matrix = torch.tensor( - self.original_estimate_matrix.values, - dtype=torch.float32, - device=self.device, - ) + if self.estimate_matrix_tensor is not None: + self.estimate_matrix = self.estimate_matrix_tensor else: self.estimate_matrix = None @@ -182,6 +200,8 @@ def calibrate(self) -> None: regularize_with_l0=self.regularize_with_l0, logger=self.logger, seed=self.seed, + batch_size=self.batch_size, + estimate_matrix=self.estimate_matrix, ) self.weights = new_weights @@ -242,29 +262,25 @@ def exclude_targets( .cpu() .numpy() ) - elif self.original_estimate_matrix is not None: - # Get initial estimates using the original full matrix - original_estimate_matrix_tensor = torch.tensor( - self.original_estimate_matrix.values, - dtype=torch.float32, - device=self.device, - ) + elif self.estimate_matrix_tensor is not None: + # Get initial estimates using the full matrix tensor initial_estimates_all = ( - (initial_weights_tensor @ original_estimate_matrix_tensor) + (initial_weights_tensor @ self.estimate_matrix_tensor) .detach() .cpu() .numpy() ) - # Filter estimate matrix for calibration - filtered_estimate_matrix = self.original_estimate_matrix.iloc[ - :, calibration_mask - ] - self.estimate_matrix = torch.tensor( - filtered_estimate_matrix.values, - dtype=torch.float32, + # Filter estimate matrix for calibration via torch column + # indexing — no pandas round-trip, no extra materialized copy. 
+ keep_idx = torch.as_tensor( + np.flatnonzero(calibration_mask), + dtype=torch.long, device=self.device, ) + self.estimate_matrix = ( + self.estimate_matrix_tensor.index_select(1, keep_idx) + ) self.estimate_function = ( lambda weights: weights @ self.estimate_matrix @@ -284,12 +300,8 @@ def exclude_targets( ) else: - if self.original_estimate_matrix is not None: - self.estimate_matrix = torch.tensor( - self.original_estimate_matrix.values, - dtype=torch.float32, - device=self.device, - ) + if self.estimate_matrix_tensor is not None: + self.estimate_matrix = self.estimate_matrix_tensor if self.original_estimate_function is None: self.estimate_function = ( lambda weights: weights @ self.estimate_matrix @@ -451,14 +463,19 @@ def _get_linear_loss(metrics_matrix, target_vector, sparse=False): return np.mean(((y - y_hat) ** 2) * normalization_factor) - X = self.original_estimate_matrix.values + if self.estimate_matrix_tensor is None: + raise ValueError( + "analytical_solution requires a dense estimate matrix; " + "Calibration was constructed without one." 
+ ) + X = self.estimate_matrix_tensor.cpu().numpy() y = self.targets results = [] slices = [] idx_dict = { - self.original_estimate_matrix.columns.to_list()[i]: i - for i in range(len(self.original_estimate_matrix.columns)) + self.original_target_names[i]: i + for i in range(len(self.original_target_names)) } self.logger.info( diff --git a/src/microcalibrate/evaluation.py b/src/microcalibrate/evaluation.py index 4b71361..1253495 100644 --- a/src/microcalibrate/evaluation.py +++ b/src/microcalibrate/evaluation.py @@ -175,15 +175,15 @@ def _evaluate_single_holdout_robustness( final_weights, dtype=torch.float32, device=calibration.device ) - # Get estimates for all targets using original estimate function/matrix - if calibration.original_estimate_matrix is not None: - original_matrix_tensor = torch.tensor( - calibration.original_estimate_matrix.values, - dtype=torch.float32, - device=calibration.device, - ) + # Get estimates for all targets using the cached full matrix + # tensor (built once in Calibration.__init__). Falls back to + # the user-supplied estimate_function for callers that passed + # an opaque function rather than a dense matrix. 
+ if calibration.estimate_matrix_tensor is not None: all_estimates = ( - (weights_tensor @ original_matrix_tensor).cpu().numpy() + (weights_tensor @ calibration.estimate_matrix_tensor) + .cpu() + .numpy() ) else: all_estimates = ( diff --git a/src/microcalibrate/hyperparameter_tuning.py b/src/microcalibrate/hyperparameter_tuning.py index 428bbf8..d7cc287 100644 --- a/src/microcalibrate/hyperparameter_tuning.py +++ b/src/microcalibrate/hyperparameter_tuning.py @@ -62,14 +62,11 @@ def _evaluate_single_holdout( sparse_weights, dtype=torch.float32, device=calibration.device ) - if calibration.original_estimate_matrix is not None: - original_matrix_tensor = torch.tensor( - calibration.original_estimate_matrix.values, - dtype=torch.float32, - device=calibration.device, - ) + if calibration.estimate_matrix_tensor is not None: all_estimates = ( - (weights_tensor @ original_matrix_tensor).cpu().numpy() + (weights_tensor @ calibration.estimate_matrix_tensor) + .cpu() + .numpy() ) else: all_estimates = ( diff --git a/src/microcalibrate/reweight.py b/src/microcalibrate/reweight.py index 6143d42..3b69c21 100644 --- a/src/microcalibrate/reweight.py +++ b/src/microcalibrate/reweight.py @@ -10,7 +10,7 @@ from tqdm import tqdm from .utils.log_performance import log_performance_over_epochs -from .utils.metrics import loss, pct_close +from .utils.metrics import _safe_denominator, loss, pct_close def dropout_weights(weights: torch.Tensor, p: float) -> torch.Tensor: @@ -70,6 +70,8 @@ def reweight( device: Optional[str] = None, logger: Optional[logging.Logger] = None, seed: Optional[int] = None, + batch_size: Optional[int] = None, + estimate_matrix: Optional[torch.Tensor] = None, ) -> tuple[np.ndarray, Union[np.ndarray, None], pd.DataFrame]: """Reweight the original weights based on the loss matrix and targets. @@ -97,6 +99,20 @@ def reweight( draws the initial weight noise and torch's generator. When None, a non-deterministic draw is used (preserving the historical behaviour). 
+ batch_size (Optional[int]): If set, the per-epoch gradient is + accumulated over disjoint record batches of this size. This + keeps the autograd activation O(batch_size * n_targets) + instead of O(n_records * n_targets) — critical at v7 scale + (n_records > 1e6). Requires ``estimate_matrix`` to be + provided; not supported for arbitrary ``estimate_function``. + None (default) preserves the existing full-batch path bit- + for-bit. batch_size >= n_records degenerates to full-batch. + estimate_matrix (Optional[torch.Tensor]): The float32 estimate + matrix of shape (n_records, n_targets) backing the + ``estimate_function``. Required when ``batch_size`` is set; + ignored otherwise. Callers passing a custom + ``estimate_function`` that does not correspond to a dense + matrix must use full-batch mode. Returns: np.ndarray: Reweighted weights. @@ -149,6 +165,23 @@ def reweight( optimizer = torch.optim.Adam([weights], lr=learning_rate) + n_records = original_weights.shape[0] + use_batched = batch_size is not None and batch_size < n_records + if use_batched and estimate_matrix is None: + raise ValueError( + "batch_size requires `estimate_matrix` to be provided so the " + "reweight loop can index per-batch rows. Pass the torch " + "estimate tensor explicitly, or leave batch_size=None to use " + "the full-batch path with an arbitrary estimate_function." + ) + if use_batched and regularize_with_l0: + raise ValueError( + "batch_size is not yet supported with regularize_with_l0=True. " + "The L0 sparse-reweighting loop uses a different objective and " + "is not yet batched. Choose one: disable L0 for the dense " + "calibration, or leave batch_size=None." 
+ ) + iterator = tqdm(range(epochs), desc="Reweighting progress", unit="epoch") tracking_n = max(1, epochs // 10) if epochs > 10 else 1 progress_update_interval = 10 @@ -161,9 +194,61 @@ def reweight( for i in iterator: optimizer.zero_grad() weights_ = dropout_weights(weights, dropout_rate) - estimate = estimate_function(torch.exp(weights_)) - l = loss(estimate, targets, normalization_factor) - close = pct_close(estimate, targets) + + if use_batched: + # Two-pass batched gradient accumulation. + # + # The chi-squared loss is separable across record batches + # given the per-target coefficient c_j = d(loss)/d(S_j), + # because S_j (the weighted sum of estimate_matrix column j) + # is itself a sum over records. Phase 1 accumulates S under + # no_grad; Phase 2 computes, per batch, + # virtual_loss_batch = c · (exp(w_log[batch]) @ A[batch]) + # and calls .backward() to accumulate gradients into weights. + # The sum of virtual_loss_batch over batches has exactly the + # same gradient as the full-batch loss; peak autograd + # activation is O(batch_size * n_targets). + n_targets = targets.shape[0] + with torch.no_grad(): + exp_w_ = torch.exp(weights_) + S = torch.zeros(n_targets, dtype=torch.float32, device=device) + for start in range(0, n_records, batch_size): + end = min(start + batch_size, n_records) + S += exp_w_[start:end] @ estimate_matrix[start:end] + # Coefficient c_j = d(loss)/d(S_j). Using the same + # clamped denominator as the reference loss so batched + # and full-batch paths agree on targets near -1. 
+ # loss = mean((((S-t)+1) / _safe_denominator(t))^2 * normalization_factor) + # => d(loss)/d(S_j) = 2 * ((S_j - t_j + 1) / denom_j^2) / n_targets * normalization_factor_j + denominator = _safe_denominator(targets) + rel_error_unrooted = ((S - targets) + 1) / denominator + coef = 2.0 * rel_error_unrooted / denominator / n_targets + if normalization_factor is not None: + coef = coef * normalization_factor + + # Phase 2: per-batch backward with retain_graph until the + # final batch, so weights_ → weights graph persists across + # the multiple .backward() calls within this epoch. + batch_starts = list(range(0, n_records, batch_size)) + for batch_idx, start in enumerate(batch_starts): + end = min(start + batch_size, n_records) + batch_estimate = ( + torch.exp(weights_[start:end]) @ estimate_matrix[start:end] + ) + virtual_loss = (coef * batch_estimate).sum() + retain = batch_idx < len(batch_starts) - 1 + virtual_loss.backward(retain_graph=retain) + + # For logging only: full-batch-equivalent loss value, + # computed from S (no additional activation memory). + with torch.no_grad(): + estimate = S + l = loss(estimate, targets, normalization_factor) + close = pct_close(estimate, targets) + else: + estimate = estimate_function(torch.exp(weights_)) + l = loss(estimate, targets, normalization_factor) + close = pct_close(estimate, targets) if i % progress_update_interval == 0: iterator.set_postfix( @@ -197,8 +282,11 @@ # Step every epoch. The returned final_weights reflect the state # after the last step; the final logged row above reflects the - # pre-step state of the same (last) epoch. - l.backward() + # pre-step state of the same (last) epoch. In the batched path + # gradients were already accumulated above, so we only call + # l.backward() on the full-batch path. 
+ if not use_batched: + l.backward() optimizer.step() tracker_dict = { diff --git a/tests/test_batch_reweight.py b/tests/test_batch_reweight.py new file mode 100644 index 0000000..90bfba4 --- /dev/null +++ b/tests/test_batch_reweight.py @@ -0,0 +1,152 @@ +"""Gradient-accumulation batch mode must produce the same weights as full-batch. + +The chi-squared loss is separable across record batches *given* the +pre-computed per-target coefficient c_j = 2*(S_j - t_j + 1) / (t_j + 1)^2, +because the estimate S_j is a sum over records. Implementing this as +two passes (accumulate S under no_grad, then per-batch backward) keeps +peak memory O(B * k) instead of O(N * k) — critical at v7 scale where +N ≈ 1.5M and k ≈ 500. + +These tests verify that: + +1. `batch_size=None` (default) matches the existing full-batch behavior bit + for bit. +2. `batch_size < N` produces final weights within tight numerical tolerance + of the full-batch run (relative error < 1e-4). +3. `batch_size` that does not evenly divide N still processes every row. +4. `batch_size >= N` degenerates to full-batch and matches exactly. +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import torch + +from microcalibrate import Calibration + + +def _problem(n_rows: int = 1_000, n_cols: int = 5, seed: int = 42): + rng = np.random.default_rng(seed) + weights = rng.uniform(2.0, 10.0, size=n_rows).astype(np.float64) + matrix = rng.uniform(0.0, 1.0, size=(n_rows, n_cols)).astype(np.float32) + # Targets that require weight adjustments of ~5-20%. 
+ targets = matrix.sum(axis=0) * rng.uniform(0.85, 1.15, size=n_cols) + target_names = np.array([f"c{i}" for i in range(n_cols)]) + df = pd.DataFrame(matrix, columns=target_names) + return weights, targets, target_names, df + + +def _calibrate( + batch_size, + epochs=50, + seed=42, + dropout_rate=0.0, + normalization_factor=None, + excluded_targets=None, + regularize_with_l0=False, +): + torch.manual_seed(seed) + np.random.seed(seed) + weights, targets, target_names, df = _problem() + calibration = Calibration( + weights=weights.copy(), + targets=targets, + target_names=target_names, + estimate_matrix=df, + epochs=epochs, + learning_rate=0.05, + noise_level=0.0, + dropout_rate=dropout_rate, + normalization_factor=normalization_factor, + excluded_targets=excluded_targets, + regularize_with_l0=regularize_with_l0, + seed=seed, + batch_size=batch_size, + ) + calibration.calibrate() + return calibration.weights + + +class TestBatchEquivalence: + def test_full_batch_default_matches_none(self) -> None: + """batch_size=None is the existing full-batch path; reproducible.""" + w1 = _calibrate(batch_size=None) + w2 = _calibrate(batch_size=None) + np.testing.assert_allclose(w1, w2, rtol=1e-6, atol=0.0) + + def test_batched_matches_full_batch(self) -> None: + """Smaller batch size must produce the same final weights.""" + full = _calibrate(batch_size=None) + batched = _calibrate(batch_size=100) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + # 1e-4 is tight enough to catch implementation bugs but loose + # enough for fp32 rounding in matmul over N/B partial sums. 
+ assert rel_err.max() < 1e-4, ( + f"batched vs full max rel error = {rel_err.max():.6e}; " + f"full[:5]={full[:5]}, batched[:5]={batched[:5]}" + ) + + def test_batch_size_not_evenly_dividing(self) -> None: + """batch_size=333 with N=1000 must still cover every row exactly once per epoch.""" + full = _calibrate(batch_size=None) + batched = _calibrate(batch_size=333) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + assert rel_err.max() < 1e-4, rel_err.max() + + def test_batch_size_at_or_above_n(self) -> None: + """batch_size >= n_rows is a no-op; must match full-batch exactly.""" + full = _calibrate(batch_size=None) + batched = _calibrate(batch_size=10_000) # > n_rows=1000 + np.testing.assert_allclose(batched, full, rtol=1e-6, atol=0.0) + + def test_batch_size_one(self) -> None: + """Extreme case: one record per batch — N backward calls per epoch.""" + full = _calibrate(batch_size=None, epochs=5) + batched = _calibrate(batch_size=1, epochs=5) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + assert rel_err.max() < 1e-4, rel_err.max() + + +class TestBatchInteractionsWithOtherFeatures: + """Equivalence must hold when combined with dropout and normalization.""" + + def test_equivalence_with_nonzero_dropout(self) -> None: + """Single per-epoch dropout mask is shared across batches; matches full.""" + full = _calibrate(batch_size=None, dropout_rate=0.3) + batched = _calibrate(batch_size=250, dropout_rate=0.3) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + assert rel_err.max() < 1e-4, rel_err.max() + + def test_equivalence_with_normalization_factor(self) -> None: + """Per-target normalization_factor multiplies the coefficient identically.""" + n_cols = 5 + normalization_factor = torch.tensor( + [0.5, 1.0, 2.0, 1.5, 0.8], dtype=torch.float32 + ) + full = _calibrate( + batch_size=None, normalization_factor=normalization_factor + ) + batched = _calibrate( + batch_size=100, 
normalization_factor=normalization_factor + ) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + assert rel_err.max() < 1e-4, rel_err.max() + + def test_equivalence_with_excluded_targets(self) -> None: + """exclude_targets() filters `estimate_matrix` via torch indexing; batched still agrees.""" + full = _calibrate(batch_size=None, excluded_targets=["c4"]) + batched = _calibrate(batch_size=100, excluded_targets=["c4"]) + rel_err = np.abs(batched - full) / np.maximum(np.abs(full), 1e-6) + assert rel_err.max() < 1e-4, rel_err.max() + + +class TestBatchGuardrails: + """Configurations that aren't supported must fail loudly, not silently no-op.""" + + def test_batch_size_with_l0_raises(self) -> None: + """L0 sparse loop is not batched; the combination must raise.""" + import pytest + + with pytest.raises(ValueError, match="regularize_with_l0"): + _calibrate(batch_size=100, regularize_with_l0=True) diff --git a/tests/test_memory.py b/tests/test_memory.py new file mode 100644 index 0000000..57df075 --- /dev/null +++ b/tests/test_memory.py @@ -0,0 +1,74 @@ +"""Memory-footprint regression tests for the Calibration class. + +At the 1.5M-row scale used by microplex-us's v7 pipeline, holding both +the user-provided pandas DataFrame *and* an independent torch.float32 +copy of the same matrix roughly doubles peak RSS during calibrate(). + +This test pins the fix: after Calibration builds the torch tensor, +self.original_estimate_matrix must be released so its storage is +garbage-collectable. 
+""" + +from __future__ import annotations + +import numpy as np +import pandas as pd + +from microcalibrate import Calibration + + +def _small_problem(n_rows: int = 200, n_cols: int = 4, seed: int = 0): + rng = np.random.default_rng(seed) + weights = rng.uniform(1.0, 5.0, size=n_rows) + matrix = rng.uniform(0.0, 1.0, size=(n_rows, n_cols)).astype(np.float64) + targets = matrix.sum(axis=0) * 1.1 + target_names = np.array([f"c{i}" for i in range(n_cols)]) + estimate_matrix = pd.DataFrame(matrix, columns=target_names) + return weights, targets, target_names, estimate_matrix + + +class TestOriginalEstimateMatrixReleased: + """After __init__, the user-provided DataFrame must be releasable.""" + + def test_original_estimate_matrix_released_after_init(self) -> None: + weights, targets, target_names, estimate_matrix = _small_problem() + calibration = Calibration( + weights=weights, + targets=targets, + target_names=target_names, + estimate_matrix=estimate_matrix, + epochs=4, + noise_level=0.0, + ) + assert calibration.original_estimate_matrix is None, ( + "Calibration retained original_estimate_matrix; at v7 scale " + "(1.5M rows x 500 cols float64) this is a 6 GB leak." + ) + # The authoritative matrix is the cached float32 torch tensor; + # downstream code (hyperparameter tuning, evaluation) reads this. 
+ assert calibration.estimate_matrix_tensor is not None + assert calibration.estimate_matrix_tensor.dtype.is_floating_point + assert calibration.estimate_matrix_tensor.shape == ( + len(weights), + len(target_names), + ) + + def test_calibrate_still_works_after_release(self) -> None: + """Convergence behavior must be preserved after the matrix is freed.""" + weights, targets, target_names, estimate_matrix = _small_problem() + calibration = Calibration( + weights=weights, + targets=targets, + target_names=target_names, + estimate_matrix=estimate_matrix, + epochs=200, + learning_rate=0.05, + noise_level=0.0, + ) + performance = calibration.calibrate() + # Loss is strictly decreasing on a well-posed small problem. + losses = performance["loss"].tolist() if "loss" in performance else [] + assert len(losses) >= 2, performance + assert ( + losses[-1] < losses[0] + ), f"Calibration did not improve loss: {losses[:5]}...{losses[-5:]}"