From 0894a0644bc47feca043fadb2ab4aec83fe5889b Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Thu, 23 Apr 2026 17:09:53 -0400 Subject: [PATCH 01/15] Use multi-part upload on files larger than 5GB --- dandi/consts.py | 3 + dandi/files/zarr.py | 146 +++++++++++++++++++++++++++++++++++++- dandi/tests/test_files.py | 43 +++++++++-- 3 files changed, 184 insertions(+), 8 deletions(-) diff --git a/dandi/consts.py b/dandi/consts.py index 933c59aba..da4fbd7d3 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -199,6 +199,9 @@ def urls(self) -> Iterator[str]: #: Maximum number of Zarr directory entries to delete at once ZARR_DELETE_BATCH_SIZE = 100 +#: Zarr chunks above this size (bytes) are uploaded via multipart upload +ZARR_LARGE_CHUNK_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GB + BIDS_DATASET_DESCRIPTION = "dataset_description.json" BIDS_IGNORE_FILE = ".bidsignore" diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 67a9ddc86..e6ec75f4d 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -10,10 +10,13 @@ import json import os import os.path +import re from pathlib import Path +from threading import Lock from time import sleep from typing import Any, Optional import urllib.parse +from xml.etree.ElementTree import fromstring from dandischema.models import BareAsset, DigestType from pydantic import BaseModel, ConfigDict, ValidationError @@ -25,6 +28,7 @@ from dandi.consts import ( MAX_ZARR_DEPTH, ZARR_DELETE_BATCH_SIZE, + ZARR_LARGE_CHUNK_THRESHOLD, ZARR_MIME_TYPE, ZARR_UPLOAD_BATCH_SIZE, ) @@ -46,7 +50,7 @@ pre_upload_size_check, ) -from .bases import LocalDirectoryAsset +from .bases import LocalDirectoryAsset, _upload_blob_part from ..validate._types import ( ORIGIN_VALIDATION_DANDI_ZARR, Origin, @@ -741,13 +745,40 @@ def mkzarr() -> str: ): # Items to upload in this batch (may be retried e.g. 
due to # 403 errors because of timed-out upload URLs) - items_to_upload = list(items) + all_items = list(items) + large_items = [ + it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD + ] + items_to_upload = [ + it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD + ] max_retries = 5 retry_count = 0 # Add all items to checksum tree (only done once) - for it in items_to_upload: + for it in all_items: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) + # Upload chunks above 5GB individually via multipart upload + for it in large_items: + for status in upload_zarr_file_multipart( + item=it, + zarr_id=zarr_id, + dandiset=dandiset, + jobs=jobs, + ): + if status.get("status") == "done": + changed = True + bytes_uploaded += it.size + yield { + "status": "uploading", + "progress": 100 + * bytes_uploaded + / to_upload.total_size, + "current": bytes_uploaded, + } + else: + yield status + while items_to_upload and retry_count <= max_retries: # Prepare upload requests for current items uploading = [it.upload_request() for it in items_to_upload] @@ -902,6 +933,115 @@ def _handle_failed_items_and_raise( raise failed_items[0][1] +def upload_zarr_file_multipart( + item: UploadItem, + zarr_id: str, + dandiset: RemoteDandiset, + jobs: int | None = None, +): + # Avoid heavy import by importing within function: + from dandi.support.digests import get_dandietag + + client = dandiset.client + + yield {"status": "calculating etag"} + etagger = get_dandietag(item.filepath) + filetag = etagger.as_str() + + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", item.filepath) + total_size = pre_upload_size_check(item.filepath) + + resp = client.post( + "/uploads/zarr/initialize/", + json={ + "contentSize": total_size, + "digest": { + "algorithm": "dandi:dandi-etag", + "value": filetag, + }, + "zarr": { + "chunk_key": item.entry_path, + "zarr_id": zarr_id, + }, + }, + ) + + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", item.filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with item.filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=item.entry_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + + lgr.debug("%s: Completing upload", item.entry_path) + resp = client.post( + f"/uploads/zarr/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + item.entry_path, + resp["complete_url"], + ) + r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False) + lgr.debug( + "%s: Upload completed. 
Response content: %s", + item.entry_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" + ) + # else: Error? Warning? + resp = client.post(f"/uploads/zarr/{upload_id}/validate/") + yield {"status": "done"} + except Exception: + post_upload_size_check(item.filepath, total_size, True) + raise + else: + post_upload_size_check(item.filepath, total_size, False) + + def _upload_zarr_file( storage_session: RESTFullAPIClient, dandiset: RemoteDandiset, diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 6f3dff19e..5977f2c67 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -1,17 +1,16 @@ from __future__ import annotations -from operator import attrgetter import os -from pathlib import Path import subprocess -from unittest.mock import ANY +from operator import attrgetter +from pathlib import Path +from unittest.mock import ANY, patch -from dandischema.models import get_schema_version import numpy as np import pytest import zarr +from dandischema.models import get_schema_version -from .fixtures import SampleDandiset from .. import get_logger from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset @@ -29,6 +28,7 @@ dandi_file, find_dandi_files, ) +from .fixtures import SampleDandiset lgr = get_logger() @@ -536,6 +536,39 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): assert r.headers["Content-Type"] == "application/json" +@pytest.mark.ai_generated +def test_upload_zarr_large_chunks(new_dandiset, tmp_path): + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via upload_zarr_file_multipart.""" + filepath = tmp_path / "example.zarr" + zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) + zf = dandi_file(filepath) + assert isinstance(zf, ZarrAsset) + + from ..files.zarr import upload_zarr_file_multipart + + real_upload_zarr_file_multipart = upload_zarr_file_multipart + called_paths: list[str] = [] + + def spy_upload_zarr_file_multipart(item, *args, **kwargs): + called_paths.append(item.entry_path) + yield from real_upload_zarr_file_multipart(item, *args, **kwargs) + + # Set threshold to 0 so every chunk is treated as "large" + with ( + patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), + patch( + "dandi.files.zarr.upload_zarr_file_multipart", + spy_upload_zarr_file_multipart, + ), + ): + asset = zf.upload(new_dandiset.dandiset, {}) + + assert isinstance(asset, RemoteZarrAsset) + # Every chunk file in the zarr should have been routed through upload_zarr_file_multipart + remote_entries = {str(e) for e in asset.iterfiles()} + assert remote_entries == set(called_paths) + + def test_validate_deep_zarr(tmp_path: Path) -> None: zarr_path = tmp_path / "foo.zarr" zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1)) From b3bf063a87d8a2f511aca01f7e7dc5317ce8bd63 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Fri, 24 Apr 2026 12:17:20 -0400 Subject: [PATCH 02/15] Unify multipart upload logic between zarrs and blobs --- dandi/files/bases.py | 228 +++++++++++++++++++++----------------- dandi/files/zarr.py | 151 ++++--------------------- dandi/tests/test_files.py | 25 ++--- 3 files changed, 158 insertions(+), 246 deletions(-) 
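The unified helper relies on the ``yield from`` return-value mechanics of PEP 380: ``_multipart_upload`` yields progress dicts and *returns* the final validate response, which callers capture as ``validate_resp = yield from _multipart_upload(...)``. A minimal standalone sketch of that pattern (toy names only, not the dandi API):

    from collections.abc import Generator

    def worker() -> Generator[dict, None, dict]:
        yield {"status": "uploading", "current": 1}
        yield {"status": "uploading", "current": 2}
        return {"blob_id": "abc123"}  # becomes the value of the ``yield from``

    def caller() -> Generator[dict, None, None]:
        result = yield from worker()  # re-yields progress, then captures the return
        yield {"status": "done", "asset": result["blob_id"]}

    assert [s["status"] for s in caller()] == ["uploading", "uploading", "done"]
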
diff --git a/dandi/files/bases.py b/dandi/files/bases.py index de52027f7..42af8d51e 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections import deque -from collections.abc import Iterator +from collections.abc import Generator, Iterator from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime @@ -341,118 +341,26 @@ def iter_upload( ``"done"`` and an ``"asset"`` key containing the resulting `RemoteAsset`. """ - # Avoid heavy import by importing within function: - from dandi.support.digests import get_dandietag - asset_path = metadata.setdefault("path", self.path) client = dandiset.client - yield {"status": "calculating etag"} - etagger = get_dandietag(self.filepath) - filetag = etagger.as_str() - lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath) - digest = metadata.get("digest", {}) - if "dandi:dandi-etag" in digest: - if digest["dandi:dandi-etag"] != filetag: - raise RuntimeError( - f"{self.filepath}: File etag changed; was originally" - f" {digest['dandi:dandi-etag']} but is now {filetag}" - ) - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", asset_path) - total_size = pre_upload_size_check(self.filepath) + expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag") try: - resp = client.post( - "/uploads/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "dandiset": dandiset.identifier, - }, + validate_resp = yield from _multipart_upload( + client=client, + filepath=self.filepath, + asset_path=asset_path, + upload_prefix="/uploads", + extra_init_fields={"dandiset": dandiset.identifier}, + expected_etag=expected_etag, + jobs=jobs, ) + blob_id = validate_resp["blob_id"] except requests.HTTPError as e: if e.response is not None and e.response.status_code == 409: lgr.debug("%s: Blob already exists on server", asset_path) blob_id = e.response.headers["Location"] else: raise - else: - try: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with self.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=asset_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - lgr.debug("%s: Completing upload", asset_path) - resp = client.post( - f"/uploads/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - asset_path, - resp["complete_url"], - ) - r = storage.post( - resp["complete_url"], data=resp["body"], json_resp=False - ) - lgr.debug( - "%s: Upload completed. 
Response content: %s", - asset_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of" - f" uploaded file; server says {final_etag}," - f" client says {filetag}" - ) - # else: Error? Warning? - resp = client.post(f"/uploads/{upload_id}/validate/") - blob_id = resp["blob_id"] - except Exception: - post_upload_size_check(self.filepath, total_size, True) - raise - else: - post_upload_size_check(self.filepath, total_size, False) lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) yield {"status": "producing asset"} if replacing is not None: @@ -696,6 +604,120 @@ def _upload_blob_part( } +def _multipart_upload( + client: RESTFullAPIClient, + filepath: Path, + asset_path: str, + upload_prefix: str, + extra_init_fields: dict, + expected_etag: str | None = None, + jobs: int | None = None, +) -> Generator[dict, None, dict]: + """Perform a full multipart upload: etag calculation, initialization, part upload, and validation. + + Yields progress dicts and returns the validate response dict. If + ``expected_etag`` is provided and does not match the computed etag, raises + ``RuntimeError``. A 409 HTTPError from the initialize call propagates + unchanged so the caller can handle the "blob already exists" case. + """ + # Avoid heavy import by importing within function: + from dandi.support.digests import get_dandietag + + yield {"status": "calculating etag"} + etagger = get_dandietag(filepath) + filetag = etagger.as_str() + lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath) + if expected_etag is not None and filetag != expected_etag: + raise RuntimeError( + f"{filepath}: File etag changed; was originally" + f" {expected_etag} but is now {filetag}" + ) + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", asset_path) + total_size = pre_upload_size_check(filepath) + resp = client.post( + f"{upload_prefix}/initialize/", + json={ + "contentSize": total_size, + "digest": {"algorithm": "dandi:dandi-etag", "value": filetag}, + **extra_init_fields, + }, + ) + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=asset_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + lgr.debug("%s: Completing upload", asset_path) + resp = client.post( + f"{upload_prefix}/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + asset_path, + resp["complete_url"], + ) + r = storage.post(resp["complete_url"], data=resp["body"], 
json_resp=False) + lgr.debug( + "%s: Upload completed. Response content: %s", + asset_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" + ) + # else: Error? Warning? + validate_resp = client.post(f"{upload_prefix}/{upload_id}/validate/") + except Exception: + post_upload_size_check(filepath, total_size, True) + raise + else: + post_upload_size_check(filepath, total_size, False) + + return validate_resp + + def _check_required_fields( d: dict, required: list[str], file_path: str ) -> list[ValidationResult]: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e6ec75f4d..de0c29c93 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -10,13 +10,10 @@ import json import os import os.path -import re from pathlib import Path -from threading import Lock from time import sleep from typing import Any, Optional import urllib.parse -from xml.etree.ElementTree import fromstring from dandischema.models import BareAsset, DigestType from pydantic import BaseModel, ConfigDict, ValidationError @@ -50,7 +47,7 @@ pre_upload_size_check, ) -from .bases import LocalDirectoryAsset, _upload_blob_part +from .bases import LocalDirectoryAsset, _multipart_upload from ..validate._types import ( ORIGIN_VALIDATION_DANDI_ZARR, Origin, @@ -760,25 +757,28 @@ def mkzarr() -> str: # Upload chunks above 5GB individually via multipart upload for it in large_items: - for status in upload_zarr_file_multipart( - item=it, - zarr_id=zarr_id, - dandiset=dandiset, + # Yield uploading status + yield from _multipart_upload( + client=client, + filepath=it.filepath, + asset_path=it.entry_path, + upload_prefix="/uploads/zarr", + extra_init_fields={ + "zarr": {"chunk_key": it.entry_path, "zarr_id": zarr_id} + }, jobs=jobs, - ): - if status.get("status") == "done": - changed = True - bytes_uploaded += it.size - yield { - "status": "uploading", - "progress": 100 - * bytes_uploaded - / to_upload.total_size, - "current": bytes_uploaded, - } - else: - yield status + ) + + # Part is finished uploading, yield final progress + changed = True + bytes_uploaded += it.size + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / to_upload.total_size, + "current": bytes_uploaded, + } + # Upload the remaining files using single part upload while items_to_upload and retry_count <= max_retries: # Prepare upload requests for current items uploading = [it.upload_request() for it in items_to_upload] @@ -933,115 +933,6 @@ def _handle_failed_items_and_raise( raise failed_items[0][1] -def upload_zarr_file_multipart( - item: UploadItem, - zarr_id: str, - dandiset: RemoteDandiset, - jobs: int | None = None, -): - # Avoid heavy import by importing within function: - from dandi.support.digests import get_dandietag - - client = dandiset.client - - yield {"status": "calculating etag"} - etagger = get_dandietag(item.filepath) - filetag = etagger.as_str() - - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", item.filepath) - total_size = pre_upload_size_check(item.filepath) - - resp = client.post( - "/uploads/zarr/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "zarr": { - "chunk_key": 
item.entry_path, - "zarr_id": zarr_id, - }, - }, - ) - - try: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", item.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with item.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=item.entry_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - - lgr.debug("%s: Completing upload", item.entry_path) - resp = client.post( - f"/uploads/zarr/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - item.entry_path, - resp["complete_url"], - ) - r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False) - lgr.debug( - "%s: Upload completed. Response content: %s", - item.entry_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of" - f" uploaded file; server says {final_etag}," - f" client says {filetag}" - ) - # else: Error? Warning? - resp = client.post(f"/uploads/zarr/{upload_id}/validate/") - yield {"status": "done"} - except Exception: - post_upload_size_check(item.filepath, total_size, True) - raise - else: - post_upload_size_check(item.filepath, total_size, False) - - def _upload_zarr_file( storage_session: RESTFullAPIClient, dandiset: RemoteDandiset, diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 5977f2c67..cccf53d74 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -1,16 +1,17 @@ from __future__ import annotations -import os -import subprocess from operator import attrgetter +import os from pathlib import Path +import subprocess from unittest.mock import ANY, patch +from dandischema.models import get_schema_version import numpy as np import pytest import zarr -from dandischema.models import get_schema_version +from .fixtures import SampleDandiset from .. 
import get_logger from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset @@ -28,7 +29,6 @@ dandi_file, find_dandi_files, ) -from .fixtures import SampleDandiset lgr = get_logger() @@ -538,33 +538,32 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): @pytest.mark.ai_generated def test_upload_zarr_large_chunks(new_dandiset, tmp_path): - """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via upload_zarr_file_multipart.""" + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" filepath = tmp_path / "example.zarr" zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) zf = dandi_file(filepath) assert isinstance(zf, ZarrAsset) - from ..files.zarr import upload_zarr_file_multipart + from ..files.bases import _multipart_upload as real_multipart_upload - real_upload_zarr_file_multipart = upload_zarr_file_multipart called_paths: list[str] = [] - def spy_upload_zarr_file_multipart(item, *args, **kwargs): - called_paths.append(item.entry_path) - yield from real_upload_zarr_file_multipart(item, *args, **kwargs) + def spy_multipart_upload(**kwargs): + called_paths.append(kwargs["asset_path"]) + yield from real_multipart_upload(**kwargs) # Set threshold to 0 so every chunk is treated as "large" with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), patch( - "dandi.files.zarr.upload_zarr_file_multipart", - spy_upload_zarr_file_multipart, + "dandi.files.zarr._multipart_upload", + spy_multipart_upload, ), ): asset = zf.upload(new_dandiset.dandiset, {}) assert isinstance(asset, RemoteZarrAsset) - # Every chunk file in the zarr should have been routed through upload_zarr_file_multipart + # Every chunk file in the zarr should have been routed through multipart upload remote_entries = {str(e) for e in asset.iterfiles()} assert remote_entries == set(called_paths) From 4490bbb79ed1229a6b0315290e2927425c7d098e Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:15:08 -0400 Subject: [PATCH 03/15] Fix zarr params to multi-part upload function --- dandi/files/zarr.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index de0c29c93..e77a8270c 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -764,7 +764,8 @@ def mkzarr() -> str: asset_path=it.entry_path, upload_prefix="/uploads/zarr", extra_init_fields={ - "zarr": {"chunk_key": it.entry_path, "zarr_id": zarr_id} + "zarr_id": zarr_id, + "chunk_key": it.entry_path, }, jobs=jobs, ) From 3772cc40773130af0b68b99523c61c4865d3629d Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:17:46 -0400 Subject: [PATCH 04/15] Fix linting --- dandi/files/bases.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dandi/files/bases.py b/dandi/files/bases.py index 42af8d51e..c984478bf 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -613,7 +613,7 @@ def _multipart_upload( expected_etag: str | None = None, jobs: int | None = None, ) -> Generator[dict, None, dict]: - """Perform a full multipart upload: etag calculation, initialization, part upload, and validation. + """Perform multipart upload: etag calculation, initialization, part upload, and validation. Yields progress dicts and returns the validate response dict. 
If ``expected_etag`` is provided and does not match the computed etag, raises From 0ded7f99acfe2b931d5cca1dfbef7e56daf2c3f3 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:29:11 -0400 Subject: [PATCH 05/15] Fix typing error --- dandi/files/bases.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dandi/files/bases.py b/dandi/files/bases.py index c984478bf..22bfc6f77 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -354,6 +354,8 @@ def iter_upload( expected_etag=expected_etag, jobs=jobs, ) + if validate_resp is None: + raise RuntimeError("Expected upload response of type `dict` but received `None`") blob_id = validate_resp["blob_id"] except requests.HTTPError as e: if e.response is not None and e.response.status_code == 409: @@ -612,7 +614,7 @@ def _multipart_upload( extra_init_fields: dict, expected_etag: str | None = None, jobs: int | None = None, -) -> Generator[dict, None, dict]: +) -> Generator[dict, None, dict | None]: """Perform multipart upload: etag calculation, initialization, part upload, and validation. Yields progress dicts and returns the validate response dict. If @@ -708,7 +710,7 @@ def _multipart_upload( f" client says {filetag}" ) # else: Error? Warning? - validate_resp = client.post(f"{upload_prefix}/{upload_id}/validate/") + validate_resp: dict | None = client.post(f"{upload_prefix}/{upload_id}/validate/") except Exception: post_upload_size_check(filepath, total_size, True) raise From 6e9064b97a4f50e678995e330ad7672bc8bb7e32 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 17:18:53 -0400 Subject: [PATCH 06/15] Move test import to top of file --- dandi/tests/test_files.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index cccf53d74..2fb67e082 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -16,6 +16,7 @@ from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError +from ..files.bases import _multipart_upload as real_multipart_upload from ..files import ( BIDSDatasetDescriptionAsset, DandisetMetadataFile, @@ -544,8 +545,6 @@ def test_upload_zarr_large_chunks(new_dandiset, tmp_path): zf = dandi_file(filepath) assert isinstance(zf, ZarrAsset) - from ..files.bases import _multipart_upload as real_multipart_upload - called_paths: list[str] = [] def spy_multipart_upload(**kwargs): From 57b353f8d14accf0ef69b52babc5487ba5837489 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 13:00:58 -0400 Subject: [PATCH 07/15] Use multi-part checksums for multi-part zarr uploads --- dandi/files/zarr.py | 10 ++++++++-- dandi/support/digests.py | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e77a8270c..0bbcc63fb 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1037,9 +1037,15 @@ def register(self, e: LocalZarrEntry, digest: str | None = None) -> None: @staticmethod def _mkitem(e: LocalZarrEntry) -> UploadItem: # Avoid heavy import by importing within function: - from dandi.support.digests import md5file_nocache + from dandi.support.digests import md5file_nocache, multipart_md5file_nocache + + file_size = e.filepath.stat().st_size + digest = ( + md5file_nocache(e.filepath) + if file_size <= ZARR_LARGE_CHUNK_THRESHOLD + else multipart_md5file_nocache(e.filepath) + ) - digest = md5file_nocache(e.filepath) 
return UploadItem.from_entry(e, digest) def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]: diff --git a/dandi/support/digests.py b/dandi/support/digests.py index 7a69a1629..018626f89 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -137,6 +137,30 @@ def md5file_nocache(filepath: str | Path) -> str: return Digester(["md5"])(filepath)["md5"] +def multipart_md5file_nocache(filepath: str | Path) -> str: + """ + Compute the S3 multipart ETag for a file. + + Splits the file into parts of ``part_size`` bytes, hashes each part with + MD5, then returns ``MD5(concat(part_md5s))-{num_parts}``, matching what S3 + stores as the ETag for a multipart upload. + """ + if isinstance(filepath, str): + filepath = Path(filepath) + + part_size = DandiETag(filepath.stat().st_size)._part_gen.initial_part_size + part_md5s = b"" + num_parts = 0 + with open(filepath, "rb") as f: + while True: + chunk = f.read(part_size) + if not chunk: + break + part_md5s += hashlib.md5(chunk).digest() + num_parts += 1 + return f"{hashlib.md5(part_md5s).hexdigest()}-{num_parts}" + + def checksum_zarr_dir( files: dict[str, tuple[str, int]], directories: dict[str, tuple[str, int]] ) -> str: From 9bc29766b34341799b47a8e2d40ea3baca542ea1 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 13:01:33 -0400 Subject: [PATCH 08/15] Add test for multi-part zarr upload --- dandi/tests/test_files.py | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 2fb67e082..9875d2087 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -567,6 +567,48 @@ def spy_multipart_upload(**kwargs): assert remote_entries == set(called_paths) +@pytest.mark.ai_generated +def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" + filepath = tmp_path / "mixed.zarr" + store = zarr.open_group(str(filepath), mode="w") + # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) + store.create_dataset("small", data=np.arange(10, dtype=np.int64), chunks=(10,)) + # large array: 200 int64 elements, produces a ~329-byte chunk (compressed) + store.create_dataset("large", data=np.arange(200, dtype=np.int64), chunks=(200,)) + + zf = dandi_file(filepath) + assert isinstance(zf, ZarrAsset) + + multipart_paths: list[str] = [] + + def spy_multipart_upload(**kwargs): + multipart_paths.append(kwargs["asset_path"]) + yield from real_multipart_upload(**kwargs) + + # Threshold sits between the two chunk sizes so only the large chunk goes multipart + mixed_threshold = 200 + with ( + patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), + patch("dandi.files.zarr._multipart_upload", spy_multipart_upload), + ): + asset = zf.upload(new_dandiset.dandiset, {}) + + assert isinstance(asset, RemoteZarrAsset) + + remote_entries = {str(e) for e in asset.iterfiles()} + # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded + large_chunks = { + p + for p in remote_entries + if (filepath / p).stat().st_size > mixed_threshold + } + assert set(multipart_paths) == large_chunks + # At least one chunk must have gone each path so the test is meaningful + assert len(multipart_paths) > 0 + assert len(remote_entries) - len(multipart_paths) > 0 + + def test_validate_deep_zarr(tmp_path: Path) -> None: zarr_path = tmp_path / "foo.zarr" zarr.save(zarr_path, np.arange(1000), 
np.arange(1000, 0, -1)) From 6d996707ff027353bf39e21531ff397d3da6bbe7 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 16:28:26 -0400 Subject: [PATCH 09/15] Merge multipart behavior into md5file_nocache func --- dandi/files/zarr.py | 10 ++-------- dandi/support/digests.py | 28 +++++++--------------------- dandi/tests/test_files.py | 2 ++ 3 files changed, 11 insertions(+), 29 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 0bbcc63fb..e77a8270c 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1037,15 +1037,9 @@ def register(self, e: LocalZarrEntry, digest: str | None = None) -> None: @staticmethod def _mkitem(e: LocalZarrEntry) -> UploadItem: # Avoid heavy import by importing within function: - from dandi.support.digests import md5file_nocache, multipart_md5file_nocache - - file_size = e.filepath.stat().st_size - digest = ( - md5file_nocache(e.filepath) - if file_size <= ZARR_LARGE_CHUNK_THRESHOLD - else multipart_md5file_nocache(e.filepath) - ) + from dandi.support.digests import md5file_nocache + digest = md5file_nocache(e.filepath) return UploadItem.from_entry(e, digest) def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]: diff --git a/dandi/support/digests.py b/dandi/support/digests.py index 018626f89..c35522984 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -26,6 +26,7 @@ from fscacher import PersistentCache from zarr_checksum.checksum import ZarrChecksum, ZarrChecksumManifest from zarr_checksum.tree import ZarrChecksumTree +from dandi.consts import ZARR_LARGE_CHUNK_THRESHOLD from .threaded_walk import threaded_walk from ..utils import Hasher, exclude_from_zarr @@ -133,32 +134,17 @@ def md5file_nocache(filepath: str | Path) -> str: Compute the MD5 digest of a file without caching with fscacher, which has been shown to slow things down for the large numbers of files typically present in Zarrs - """ - return Digester(["md5"])(filepath)["md5"] - -def multipart_md5file_nocache(filepath: str | Path) -> str: - """ - Compute the S3 multipart ETag for a file. - - Splits the file into parts of ``part_size`` bytes, hashes each part with - MD5, then returns ``MD5(concat(part_md5s))-{num_parts}``, matching what S3 - stores as the ETag for a multipart upload. + If the file is larger than `ZARR_LARGE_CHUNK_THRESHOLD`, the computed checksum is not a + traditional md5 checksum, but is instead an S3 multipart ETag. 
""" if isinstance(filepath, str): filepath = Path(filepath) - part_size = DandiETag(filepath.stat().st_size)._part_gen.initial_part_size - part_md5s = b"" - num_parts = 0 - with open(filepath, "rb") as f: - while True: - chunk = f.read(part_size) - if not chunk: - break - part_md5s += hashlib.md5(chunk).digest() - num_parts += 1 - return f"{hashlib.md5(part_md5s).hexdigest()}-{num_parts}" + if filepath.stat().st_size > ZARR_LARGE_CHUNK_THRESHOLD: + return get_dandietag(filepath).as_str() + + return Digester(["md5"])(filepath)["md5"] def checksum_zarr_dir( diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 9875d2087..03ec6a213 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -554,6 +554,7 @@ def spy_multipart_upload(**kwargs): # Set threshold to 0 so every chunk is treated as "large" with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), + patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", 0), patch( "dandi.files.zarr._multipart_upload", spy_multipart_upload, @@ -590,6 +591,7 @@ def spy_multipart_upload(**kwargs): mixed_threshold = 200 with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), + patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), patch("dandi.files.zarr._multipart_upload", spy_multipart_upload), ): asset = zf.upload(new_dandiset.dandiset, {}) From 871294b74d5b5672dd83eefc12a0f8f23da3bb96 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Wed, 29 Apr 2026 13:24:36 -0400 Subject: [PATCH 10/15] Ignore incorrect type checking error --- dandi/support/digests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dandi/support/digests.py b/dandi/support/digests.py index c35522984..d57486a6e 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -142,7 +142,8 @@ def md5file_nocache(filepath: str | Path) -> str: filepath = Path(filepath) if filepath.stat().st_size > ZARR_LARGE_CHUNK_THRESHOLD: - return get_dandietag(filepath).as_str() + # For some reason the type checker treats this return as an Any type + return get_dandietag(filepath).as_str() # type: ignore [no-any-return] return Digester(["md5"])(filepath)["md5"] From a5bfdd79fc73471c38fa14e4f2d81562e09ee08a Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 19:51:28 -0400 Subject: [PATCH 11/15] Rename zarr upload batch vars: large_items->multipart_items, items_to_upload->singlepart_items The split of a batch into "items above the 5GB threshold" and "items at or below" is really a split between items routed through the multipart upload endpoint and items uploaded with a single signed PUT. Naming them after the upload mechanism rather than the size makes the subsequent capability/gating logic read more naturally. Pure rename, no behavior change. Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/files/zarr.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e77a8270c..3df279b50 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -743,10 +743,10 @@ def mkzarr() -> str: # Items to upload in this batch (may be retried e.g. 
due to # 403 errors because of timed-out upload URLs) all_items = list(items) - large_items = [ + multipart_items = [ it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD ] - items_to_upload = [ + singlepart_items = [ it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD ] max_retries = 5 @@ -756,7 +756,7 @@ def mkzarr() -> str: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) # Upload chunks above 5GB individually via multipart upload - for it in large_items: + for it in multipart_items: # Yield uploading status yield from _multipart_upload( client=client, @@ -780,9 +780,9 @@ def mkzarr() -> str: } # Upload the remaining files using single part upload - while items_to_upload and retry_count <= max_retries: + while singlepart_items and retry_count <= max_retries: # Prepare upload requests for current items - uploading = [it.upload_request() for it in items_to_upload] + uploading = [it.upload_request() for it in singlepart_items] if retry_count == 0: lgr.debug( @@ -814,7 +814,7 @@ def mkzarr() -> str: upload_url=signed_url, item=it, ) - for (signed_url, it) in zip(r, items_to_upload) + for (signed_url, it) in zip(r, singlepart_items) ] changed = True @@ -846,20 +846,20 @@ def mkzarr() -> str: ) # Prepare for next iteration with retry items - if items_to_upload := retry_items: + if singlepart_items := retry_items: retry_count += 1 if retry_count <= max_retries: lgr.info( "%s: %s got 403 errors, requesting new URLs", asset_path, - pluralize(len(items_to_upload), "file"), + pluralize(len(singlepart_items), "file"), ) # Small delay before retry sleep(1 * retry_count) # Check if we exhausted retries - if items_to_upload: - nfiles_str = pluralize(len(items_to_upload), "file") + if singlepart_items: + nfiles_str = pluralize(len(singlepart_items), "file") raise UploadError( f"{asset_path}: failed to upload {nfiles_str} " f"after {max_retries} retries due to repeated 403 errors" From 4b8134051ccebc260f0c3ee1d0a13135ebf224f5 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 20:53:37 -0400 Subject: [PATCH 12/15] Gate zarr multipart upload on server capability dandi-archive#2784 (still open) introduces the ``/uploads/zarr/initialize/`` endpoint required to upload zarr chunks larger than 5 GiB. Until that lands and is deployed, posting to the endpoint returns 404 and the multi-part code path here would surface that as an opaque ``HTTP404Error`` from inside ``_multipart_upload``. Add a lazy ``DandiAPIClient.supports_zarr_multipart_upload`` cached property that probes the endpoint once with an empty POST body: 404 means the server lacks the feature, anything else (400 for the bogus payload, 401/403 for auth) means it is present. In ``ZarrAsset.iter_upload``, right after the batch is split into ``multipart_items`` and ``singlepart_items``, raise an ``UploadError`` when there are multipart items and the server does not support them. The error reports the count and aggregate size of the oversized chunks plus the path of the largest one, mirroring the wording proposed in dandi/dandi-cli#1827. Marked with a TODO: the check (and the cached property) can be removed once all supported servers ship dandi-archive#2784 (> 0.23.0). 
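Downstream code can consult the property to fail fast before walking a large zarr. A minimal sketch, assuming the usual ``DandiAPIClient.for_dandi_instance`` entry point (the instance name here is illustrative):

    from dandi.dandiapi import DandiAPIClient

    with DandiAPIClient.for_dandi_instance("dandi") as client:
        if not client.supports_zarr_multipart_upload:
            # Server predates dandi-archive#2784; zarr chunks > 5 GiB cannot be uploaded.
            print("multipart zarr upload not available on this server")
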
Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/dandiapi.py | 22 ++++++++++++++++++++++ dandi/files/zarr.py | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py index 61566425b..f2d0cd49d 100644 --- a/dandi/dandiapi.py +++ b/dandi/dandiapi.py @@ -19,6 +19,7 @@ from datetime import datetime from enum import Enum from fnmatch import fnmatchcase +from functools import cached_property import json import os.path from pathlib import Path, PurePosixPath @@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]: def _instance_id(self) -> str: return self.dandi_instance.name.upper() + @cached_property + def supports_zarr_multipart_upload(self) -> bool: + """ + Whether the server exposes the zarr multipart upload endpoints + introduced in dandi-archive#2784 (``/uploads/zarr/initialize/`` and + friends). + + Probed once per client by POSTing an empty body to + ``/uploads/zarr/initialize/``: if the route does not exist the server + returns 404; any other status (400 for the bad payload, 401/403 for + auth, etc.) means the route is present and the server supports + multipart zarr uploads. + """ + try: + self.post("/uploads/zarr/initialize/", json={}) + except HTTP404Error: + return False + except requests.HTTPError: + return True + return True + def get_dandiset( self, dandiset_id: str, version_id: str | None = None, lazy: bool = True ) -> RemoteDandiset: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 3df279b50..6f1a15bc2 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -749,6 +749,24 @@ def mkzarr() -> str: singlepart_items = [ it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD ] + # TODO: remove once all servers are > 0.23.0 (i.e. ship + # dandi-archive#2784) and the multipart zarr upload + # endpoints are universally available; the capability + # check would then be unnecessary. + if multipart_items and not client.supports_zarr_multipart_upload: + largest = max(multipart_items, key=lambda it: it.size) + total_large_size = sum(it.size for it in multipart_items) + raise UploadError( + f"{asset_path}:" + f" {pluralize(len(multipart_items), 'Zarr chunk')}" + f" totaling {total_large_size / 1024**3:.2f} GiB" + f" exceed the S3 single-part upload limit of" + f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB" + f" (largest: {largest.entry_path}," + f" {largest.size / 1024**3:.2f} GiB);" + f" the server does not support multipart zarr" + f" uploads (dandi-archive#2784)." + ) max_retries = 5 retry_count = 0 # Add all items to checksum tree (only done once) From 39cc742d44a89c02d3be663ca345132fd97f7824 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 20:54:11 -0400 Subject: [PATCH 13/15] Translate per-file ``current`` from _multipart_upload to whole-zarr cumulative MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``_multipart_upload`` (used here to upload zarr chunks larger than 5 GiB) yields progress dicts whose ``current`` is the number of bytes uploaded *within the chunk currently being transferred*. The surrounding ``ZarrAsset.iter_upload`` loop reports ``current`` as the number of bytes uploaded *across the whole zarr*. Yielded as-is, the inner per-file values caused ``current`` to jump backwards each time a new multipart chunk started, breaking progress bars driven from that field. 
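With illustrative numbers (not taken from any real run): if 7 GiB of the zarr has already been uploaded and the 6 GiB chunk in flight has transferred 2 GiB, the inner generator reports ``current`` as 2 GiB while the outer loop should report 9 GiB:

    GiB = 1024**3
    cumulative_before = 7 * GiB      # whole-zarr bytes uploaded before this chunk
    inner_current = 2 * GiB          # ``current`` as reported by _multipart_upload
    outer_current = cumulative_before + inner_current  # 9 GiB; stays monotonic
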
Wrap the inner generator: when an "uploading" status carries a ``current`` field, translate it by the cumulative bytes uploaded so far before re-yielding; pass other status dicts through unchanged. The trailing post-loop yield is dropped — the last per-part translated yield already reports the full cumulative size for that chunk. Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/files/zarr.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 6f1a15bc2..91dbdda84 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -773,10 +773,14 @@ def mkzarr() -> str: for it in all_items: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) - # Upload chunks above 5GB individually via multipart upload + # Upload chunks above 5GB individually via multipart upload. + # ``_multipart_upload`` reports ``current`` as bytes within + # the single chunk being uploaded; translate it to bytes + # uploaded across the whole zarr so progress reporting + # stays monotonic for downstream consumers. for it in multipart_items: - # Yield uploading status - yield from _multipart_upload( + cumulative_before = bytes_uploaded + for status in _multipart_upload( client=client, filepath=it.filepath, asset_path=it.entry_path, @@ -786,16 +790,21 @@ def mkzarr() -> str: "chunk_key": it.entry_path, }, jobs=jobs, - ) - - # Part is finished uploading, yield final progress + ): + if ( + status.get("status") == "uploading" + and "current" in status + ): + cumulative = cumulative_before + status["current"] + yield { + "status": "uploading", + "progress": 100 * cumulative / to_upload.total_size, + "current": cumulative, + } + else: + yield status changed = True bytes_uploaded += it.size - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / to_upload.total_size, - "current": bytes_uploaded, - } # Upload the remaining files using single part upload while singlepart_items and retry_count <= max_retries: From 684da5ccae0f2371fab2e8dec451e9595561785d Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 21:15:40 -0400 Subject: [PATCH 14/15] Skip zarr multipart-upload tests when server lacks the endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two ``ai_generated`` tests added in 9bc29766 patch ``ZARR_LARGE_CHUNK_THRESHOLD`` down so chunks are routed through ``_multipart_upload`` against whatever server backs ``new_dandiset``. The local docker dandi-archive image does not yet ship dandi-archive#2784, so the new ``supports_zarr_multipart_upload`` capability check raises ``UploadError`` before ``_multipart_upload`` is ever called and the tests fail without exercising what they claim to. Probe the live client for capability and ``pytest.skip`` only when the endpoint is missing — keeps the tests effective on setups whose server already carries dandi-archive#2784. 
Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/tests/test_files.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 03ec6a213..95e927960 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -16,7 +16,6 @@ from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError -from ..files.bases import _multipart_upload as real_multipart_upload from ..files import ( BIDSDatasetDescriptionAsset, DandisetMetadataFile, @@ -30,6 +29,7 @@ dandi_file, find_dandi_files, ) +from ..files.bases import _multipart_upload as real_multipart_upload lgr = get_logger() @@ -540,6 +540,11 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): @pytest.mark.ai_generated def test_upload_zarr_large_chunks(new_dandiset, tmp_path): """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" + if not new_dandiset.client.supports_zarr_multipart_upload: + pytest.skip( + "Server does not expose the zarr multipart upload endpoints" + " (dandi-archive#2784)" + ) filepath = tmp_path / "example.zarr" zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) zf = dandi_file(filepath) @@ -571,6 +576,11 @@ def spy_multipart_upload(**kwargs): @pytest.mark.ai_generated def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" + if not new_dandiset.client.supports_zarr_multipart_upload: + pytest.skip( + "Server does not expose the zarr multipart upload endpoints" + " (dandi-archive#2784)" + ) filepath = tmp_path / "mixed.zarr" store = zarr.open_group(str(filepath), mode="w") # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) @@ -601,9 +611,7 @@ def spy_multipart_upload(**kwargs): remote_entries = {str(e) for e in asset.iterfiles()} # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded large_chunks = { - p - for p in remote_entries - if (filepath / p).stat().st_size > mixed_threshold + p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold } assert set(multipart_paths) == large_chunks # At least one chunk must have gone each path so the test is meaningful From db6bcf5e35959fa352f23cc260b386e900927ef5 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Thu, 30 Apr 2026 14:21:15 -0400 Subject: [PATCH 15/15] Fix test_upload_zarr_large_chunks --- dandi/tests/test_files.py | 75 +++++++++++---------------------------- 1 file changed, 20 insertions(+), 55 deletions(-) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 95e927960..6c9683c7b 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -13,7 +13,7 @@ from .fixtures import SampleDandiset from .. import get_logger -from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file +from ..consts import ZARR_LARGE_CHUNK_THRESHOLD, ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError from ..files import ( @@ -547,6 +547,20 @@ def test_upload_zarr_large_chunks(new_dandiset, tmp_path): ) filepath = tmp_path / "example.zarr" zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) + + # Add a "large" array whose single chunk is a sparse file just over the threshold. + # Sparse files appear large to stat() without consuming real disk space. 
+ store = zarr.open_group(str(filepath), mode="a") + large_arr = store.create_dataset( + "large", shape=(1,), chunks=(1,), dtype=np.uint8, compressor=None + ) + large_arr[:] = np.zeros(1, dtype=np.uint8) + large_chunk_path = filepath / "large" / "0" + large_chunk_path.unlink() + with open(large_chunk_path, "wb") as f: + f.seek(ZARR_LARGE_CHUNK_THRESHOLD + 1) + f.write(b"\x00") + zf = dandi_file(filepath) assert isinstance(zf, ZarrAsset) @@ -556,67 +570,18 @@ def spy_multipart_upload(**kwargs): called_paths.append(kwargs["asset_path"]) yield from real_multipart_upload(**kwargs) - # Set threshold to 0 so every chunk is treated as "large" - with ( - patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), - patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", 0), - patch( - "dandi.files.zarr._multipart_upload", - spy_multipart_upload, - ), - ): + with patch("dandi.files.zarr._multipart_upload", spy_multipart_upload): asset = zf.upload(new_dandiset.dandiset, {}) assert isinstance(asset, RemoteZarrAsset) - # Every chunk file in the zarr should have been routed through multipart upload remote_entries = {str(e) for e in asset.iterfiles()} - assert remote_entries == set(called_paths) - -@pytest.mark.ai_generated -def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): - """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" - if not new_dandiset.client.supports_zarr_multipart_upload: - pytest.skip( - "Server does not expose the zarr multipart upload endpoints" - " (dandi-archive#2784)" - ) - filepath = tmp_path / "mixed.zarr" - store = zarr.open_group(str(filepath), mode="w") - # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) - store.create_dataset("small", data=np.arange(10, dtype=np.int64), chunks=(10,)) - # large array: 200 int64 elements, produces a ~329-byte chunk (compressed) - store.create_dataset("large", data=np.arange(200, dtype=np.int64), chunks=(200,)) + # Only the large chunk should have been routed through multipart upload + assert set(called_paths) == {"large/0"} - zf = dandi_file(filepath) - assert isinstance(zf, ZarrAsset) - - multipart_paths: list[str] = [] - - def spy_multipart_upload(**kwargs): - multipart_paths.append(kwargs["asset_path"]) - yield from real_multipart_upload(**kwargs) - - # Threshold sits between the two chunk sizes so only the large chunk goes multipart - mixed_threshold = 200 - with ( - patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), - patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), - patch("dandi.files.zarr._multipart_upload", spy_multipart_upload), - ): - asset = zf.upload(new_dandiset.dandiset, {}) - - assert isinstance(asset, RemoteZarrAsset) - - remote_entries = {str(e) for e in asset.iterfiles()} - # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded - large_chunks = { - p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold - } - assert set(multipart_paths) == large_chunks # At least one chunk must have gone each path so the test is meaningful - assert len(multipart_paths) > 0 - assert len(remote_entries) - len(multipart_paths) > 0 + assert len(called_paths) > 0 + assert len(remote_entries) - len(called_paths) > 0 def test_validate_deep_zarr(tmp_path: Path) -> None: