From 99fca94728f42a54aadd6b94bc970025d83dbcb4 Mon Sep 17 00:00:00 2001 From: Alexandru Rotaru Date: Wed, 24 Jun 2026 15:06:41 +0200 Subject: [PATCH 1/2] overhead opts: WL_INSTANCE_TARGETS_CPU (VRAM) + empty-mask guard + WL_H5_APPEND_ONLY delta flush Captured from the box's in-place pip-1.2.6 patches (sdk_overhead_patches/), ported onto ddp-support-wip. The two WL_* options are env-gated (off by default); the empty-mask guard is unconditional: - WL_INSTANCE_TARGETS_CPU: detach per-instance targets to CPU at enqueue (seg-mask VRAM fix). - empty-mask guard: return an empty uint16 array for 0-instance rows instead of calling np.max on an empty array (which raises). - WL_H5_APPEND_ONLY: O(delta) append-only H5 flush vs read-all->merge->rewrite-all. --- weightslab/data/h5_dataframe_store.py | 32 +++++++++++++++++++++++++++ weightslab/src.py | 13 ++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/weightslab/data/h5_dataframe_store.py b/weightslab/data/h5_dataframe_store.py index 6a797a96..43393edf 100644 --- a/weightslab/data/h5_dataframe_store.py +++ b/weightslab/data/h5_dataframe_store.py @@ -660,6 +660,38 @@ def upsert(self, origin: str, df: pd.DataFrame) -> int: key = self._key(origin) self._ensure_parent() + # DELTA / append-only fast path (gated by WL_H5_APPEND_ONLY): write ONLY the + # changed rows instead of read-all -> merge -> rewrite-all. O(delta) per flush + # vs O(total). Appends when the table schema matches; on first write or schema + # change, falls through to the rewrite path. Duplicate indices accumulate and + # are resolved keep-last on read (serving/UI dedup is a follow-up). 
+ if os.environ.get("WL_H5_APPEND_ONLY", "0").lower() in ("1", "true", "yes", "on"): + try: + with self._local_lock: + with _InterProcessFileLock(self._lock_path, timeout=self._lock_timeout, poll_interval=self._poll_interval): + with pd.HDFStore(str(self._path), mode="a") as store: + if key in store: + head = store.select(key, start=0, stop=0) + ecols = list(head.columns) + if set(ecols) == set(df_norm.columns): + df2 = df_norm[ecols].copy() + for col in ecols: + edt = head[col].dtype + if str(edt) == "category": + df2[col] = pd.Categorical(df2[col], categories=head[col].cat.categories) + else: + try: + df2[col] = df2[col].astype(edt) + except Exception: + pass + store.append(key, df2, format="table", data_columns=True) + store.flush() + return len(df_norm) + except Exception as exc: + import sys as _sys; print(f"[DELTA] fell back: {type(exc).__name__}: {exc}", file=_sys.stderr, flush=True) + logger.warning(f"[H5DataFrameStore] append-only fast path fell back: {exc}") + # else fall through to the read-merge-rewrite path below + # Create backup BEFORE any writes backup_path = self._create_backup() diff --git a/weightslab/src.py b/weightslab/src.py index 9cf477ce..1587f459 100644 --- a/weightslab/src.py +++ b/weightslab/src.py @@ -1699,7 +1699,7 @@ def normalize(x): if x is None: return None if isinstance(x, list) and isinstance(x[0], list): - return [np.max(np.array([to_numpy(t) for t in row]), axis=0) for row in x] + return [ (np.max(np.array([to_numpy(t) for t in row]), axis=0) if len(row) else np.zeros((0,), dtype=np.uint16)) for row in x] elif isinstance(x, list): return [to_numpy(t) for t in x] if isinstance(x, th.Tensor): @@ -1946,6 +1946,17 @@ def _coerce_sid(x): if not losses_data: return + # Move per-instance targets OFF the GPU at enqueue time (gated by WL_INSTANCE_TARGETS_CPU): + # otherwise raw target tensors (e.g. seg [H,W] masks) sit in the pending-records buffer + # on-GPU until flush, so VRAM grows with flush_max (the disproportionate per-rank VRAM). 
+ if targets is not None and os.environ.get("WL_INSTANCE_TARGETS_CPU", "0").lower() in ("1","true","yes","on"): + def _to_cpu(t): + return t.detach().cpu() if hasattr(t, "detach") else t + if isinstance(targets, (list, tuple)): + targets = [[_to_cpu(t) for t in s] if isinstance(s, (list, tuple)) else _to_cpu(s) for s in targets] + else: + targets = _to_cpu(targets) + # origin is intentionally NOT forwarded: instance rows (annotation_id >= 1) don't # carry an origin; the flush derives it from the sample row (annotation_id 0). DATAFRAME_M.enqueue_instance_batch( From 1ba3d5c8c1180092a9407efdabb868c7bfa6f969 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 25 Jun 2026 12:47:02 +0000 Subject: [PATCH 2/2] ultralytics: unwrap DistributedDataParallel before installing per-sample signals Under UL native DDP the trainer model is a DDP wrapper, so signal hooks that read model.criterion/init_criterion/args/modules raised AttributeError. Unwrap to model.module in install_per_sample_signals (single entry point feeding both default_train_signals + install_train_pipeline). Unblocks UL-YOLO under DDP. Found via re-exec spike: WLAwareTrainer survives DDP re-exec + per-rank init works; this was the first real blocker. 
--- weightslab/integrations/ultralytics/signals.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/weightslab/integrations/ultralytics/signals.py b/weightslab/integrations/ultralytics/signals.py index e6a733f7..c472f944 100644 --- a/weightslab/integrations/ultralytics/signals.py +++ b/weightslab/integrations/ultralytics/signals.py @@ -369,9 +369,16 @@ def overlay_p(batch): # ─── top-level API (back-compat with the existing trainer.py calls) ──── +def _unwrap_ddp(model): + """Under UL native DDP the model is a DistributedDataParallel wrapper; signal + hooks need the underlying module (criterion/init_criterion/args/modules).""" + return model.module if isinstance(model, th.nn.parallel.DistributedDataParallel) else model + + def install_per_sample_signals(model, signals_cfg: dict = {}): """Default train pipeline. Equivalent to: install_train_pipeline(model, default_train_signals(model))""" + model = _unwrap_ddp(model) install_train_pipeline(model, default_train_signals(model, signals_cfg=signals_cfg))