Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions weightslab/data/h5_dataframe_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,38 @@ def upsert(self, origin: str, df: pd.DataFrame) -> int:
key = self._key(origin)
self._ensure_parent()

# DELTA / append-only fast path (gated by WL_H5_APPEND_ONLY): write ONLY the
# changed rows instead of read-all -> merge -> rewrite-all. O(delta) per flush
# vs O(total). Appends when the table schema matches; on first write or schema
# change, falls through to the rewrite path. Duplicate indices accumulate and
# are resolved keep-last on read (serving/UI dedup is a follow-up).
if os.environ.get("WL_H5_APPEND_ONLY", "0").lower() in ("1", "true", "yes", "on"):
try:
with self._local_lock:
with _InterProcessFileLock(self._lock_path, timeout=self._lock_timeout, poll_interval=self._poll_interval):
with pd.HDFStore(str(self._path), mode="a") as store:
if key in store:
head = store.select(key, start=0, stop=0)
ecols = list(head.columns)
if set(ecols) == set(df_norm.columns):
df2 = df_norm[ecols].copy()
for col in ecols:
edt = head[col].dtype
if str(edt) == "category":
df2[col] = pd.Categorical(df2[col], categories=head[col].cat.categories)
else:
try:
df2[col] = df2[col].astype(edt)
except Exception:
pass
store.append(key, df2, format="table", data_columns=True)
store.flush()
return len(df_norm)
except Exception as exc:
import sys as _sys; print(f"[DELTA] fell back: {type(exc).__name__}: {exc}", file=_sys.stderr, flush=True)
logger.warning(f"[H5DataFrameStore] append-only fast path fell back: {exc}")
# else fall through to the read-merge-rewrite path below

# Create backup BEFORE any writes
backup_path = self._create_backup()

Expand Down
7 changes: 7 additions & 0 deletions weightslab/integrations/ultralytics/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,16 @@ def overlay_p(batch):

# ─── top-level API (back-compat with the existing trainer.py calls) ────

def _unwrap_ddp(model):
    """Return the module inside a DistributedDataParallel wrapper, if any.

    Under UL native DDP the trainer's model is a DistributedDataParallel
    wrapper, while the signal hooks need the underlying module
    (criterion/init_criterion/args/modules). Any object that is not a
    DistributedDataParallel instance is returned unchanged.
    """
    ddp_cls = th.nn.parallel.DistributedDataParallel
    if not isinstance(model, ddp_cls):
        return model
    return model.module


def install_per_sample_signals(model, signals_cfg: "dict | None" = None):
    """Install the default train pipeline on ``model``.

    Equivalent to:
        install_train_pipeline(model, default_train_signals(model))

    Args:
        model: The model to instrument. A DistributedDataParallel wrapper is
            unwrapped first so the pipeline is installed on the underlying
            module.
        signals_cfg: Optional configuration forwarded to
            ``default_train_signals``. When omitted (or ``None``), a fresh
            empty dict is used for each call.
    """
    # A literal ``{}`` default would be one dict object shared by every call;
    # build a new one per call so no state can leak between invocations.
    cfg = {} if signals_cfg is None else signals_cfg
    model = _unwrap_ddp(model)
    install_train_pipeline(model, default_train_signals(model, signals_cfg=cfg))


Expand Down
13 changes: 12 additions & 1 deletion weightslab/src.py
Original file line number Diff line number Diff line change
Expand Up @@ -1699,7 +1699,7 @@ def normalize(x):
if x is None:
return None
if isinstance(x, list) and isinstance(x[0], list):
return [np.max(np.array([to_numpy(t) for t in row]), axis=0) for row in x]
return [ (np.max(np.array([to_numpy(t) for t in row]), axis=0) if len(row) else np.zeros((0,), dtype=np.uint16)) for row in x]
elif isinstance(x, list):
return [to_numpy(t) for t in x]
if isinstance(x, th.Tensor):
Expand Down Expand Up @@ -1946,6 +1946,17 @@ def _coerce_sid(x):
if not losses_data:
return

# Move per-instance targets OFF the GPU at enqueue time (gated by WL_INSTANCE_TARGETS_CPU):
# otherwise raw target tensors (e.g. seg [H,W] masks) sit in the pending-records buffer
# on-GPU until flush, so VRAM grows with flush_max (the disproportionate per-rank VRAM).
if targets is not None and os.environ.get("WL_INSTANCE_TARGETS_CPU", "0").lower() in ("1","true","yes","on"):
def _to_cpu(t):
return t.detach().cpu() if hasattr(t, "detach") else t
if isinstance(targets, (list, tuple)):
targets = [[_to_cpu(t) for t in s] if isinstance(s, (list, tuple)) else _to_cpu(s) for s in targets]
else:
targets = _to_cpu(targets)

# origin is intentionally NOT forwarded: instance rows (annotation_id >= 1) don't
# carry an origin; the flush derives it from the sample row (annotation_id 0).
DATAFRAME_M.enqueue_instance_batch(
Expand Down
Loading