diff --git a/dandi/consts.py b/dandi/consts.py index f59ae7e7b..56abcff8d 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -199,6 +199,9 @@ def urls(self) -> Iterator[str]: #: Maximum number of Zarr directory entries to delete at once ZARR_DELETE_BATCH_SIZE = 100 +#: Zarr chunks above this size (bytes) are uploaded via multipart upload +ZARR_LARGE_CHUNK_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GB + BIDS_DATASET_DESCRIPTION = "dataset_description.json" BIDS_IGNORE_FILE = ".bidsignore" diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py index 61566425b..f2d0cd49d 100644 --- a/dandi/dandiapi.py +++ b/dandi/dandiapi.py @@ -19,6 +19,7 @@ from datetime import datetime from enum import Enum from fnmatch import fnmatchcase +from functools import cached_property import json import os.path from pathlib import Path, PurePosixPath @@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]: def _instance_id(self) -> str: return self.dandi_instance.name.upper() + @cached_property + def supports_zarr_multipart_upload(self) -> bool: + """ + Whether the server exposes the zarr multipart upload endpoints + introduced in dandi-archive#2784 (``/uploads/zarr/initialize/`` and + friends). + + Probed once per client by POSTing an empty body to + ``/uploads/zarr/initialize/``: if the route does not exist the server + returns 404; any other status (400 for the bad payload, 401/403 for + auth, etc.) means the route is present and the server supports + multipart zarr uploads. + """ + try: + self.post("/uploads/zarr/initialize/", json={}) + except HTTP404Error: + return False + except requests.HTTPError: + return True + return True + def get_dandiset( self, dandiset_id: str, version_id: str | None = None, lazy: bool = True ) -> RemoteDandiset: diff --git a/dandi/files/bases.py b/dandi/files/bases.py index de52027f7..22bfc6f77 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections import deque -from collections.abc import Iterator +from collections.abc import Generator, Iterator from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime @@ -341,118 +341,28 @@ def iter_upload( ``"done"`` and an ``"asset"`` key containing the resulting `RemoteAsset`. 
""" - # Avoid heavy import by importing within function: - from dandi.support.digests import get_dandietag - asset_path = metadata.setdefault("path", self.path) client = dandiset.client - yield {"status": "calculating etag"} - etagger = get_dandietag(self.filepath) - filetag = etagger.as_str() - lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath) - digest = metadata.get("digest", {}) - if "dandi:dandi-etag" in digest: - if digest["dandi:dandi-etag"] != filetag: - raise RuntimeError( - f"{self.filepath}: File etag changed; was originally" - f" {digest['dandi:dandi-etag']} but is now {filetag}" - ) - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", asset_path) - total_size = pre_upload_size_check(self.filepath) + expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag") try: - resp = client.post( - "/uploads/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "dandiset": dandiset.identifier, - }, + validate_resp = yield from _multipart_upload( + client=client, + filepath=self.filepath, + asset_path=asset_path, + upload_prefix="/uploads", + extra_init_fields={"dandiset": dandiset.identifier}, + expected_etag=expected_etag, + jobs=jobs, ) + if validate_resp is None: + raise RuntimeError("Expected upload response of type `dict` but received `None`") + blob_id = validate_resp["blob_id"] except requests.HTTPError as e: if e.response is not None and e.response.status_code == 409: lgr.debug("%s: Blob already exists on server", asset_path) blob_id = e.response.headers["Location"] else: raise - else: - try: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with self.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=asset_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - lgr.debug("%s: Completing upload", asset_path) - resp = client.post( - f"/uploads/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - asset_path, - resp["complete_url"], - ) - r = storage.post( - resp["complete_url"], data=resp["body"], json_resp=False - ) - lgr.debug( - "%s: Upload completed. Response content: %s", - asset_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of" - f" uploaded file; server says {final_etag}," - f" client says {filetag}" - ) - # else: Error? Warning? 
- resp = client.post(f"/uploads/{upload_id}/validate/") - blob_id = resp["blob_id"] - except Exception: - post_upload_size_check(self.filepath, total_size, True) - raise - else: - post_upload_size_check(self.filepath, total_size, False) lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) yield {"status": "producing asset"} if replacing is not None: @@ -696,6 +606,120 @@ def _upload_blob_part( } +def _multipart_upload( + client: RESTFullAPIClient, + filepath: Path, + asset_path: str, + upload_prefix: str, + extra_init_fields: dict, + expected_etag: str | None = None, + jobs: int | None = None, +) -> Generator[dict, None, dict | None]: + """Perform multipart upload: etag calculation, initialization, part upload, and validation. + + Yields progress dicts and returns the validate response dict. If + ``expected_etag`` is provided and does not match the computed etag, raises + ``RuntimeError``. A 409 HTTPError from the initialize call propagates + unchanged so the caller can handle the "blob already exists" case. + """ + # Avoid heavy import by importing within function: + from dandi.support.digests import get_dandietag + + yield {"status": "calculating etag"} + etagger = get_dandietag(filepath) + filetag = etagger.as_str() + lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath) + if expected_etag is not None and filetag != expected_etag: + raise RuntimeError( + f"{filepath}: File etag changed; was originally" + f" {expected_etag} but is now {filetag}" + ) + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", asset_path) + total_size = pre_upload_size_check(filepath) + resp = client.post( + f"{upload_prefix}/initialize/", + json={ + "contentSize": total_size, + "digest": {"algorithm": "dandi:dandi-etag", "value": filetag}, + **extra_init_fields, + }, + ) + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=asset_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + lgr.debug("%s: Completing upload", asset_path) + resp = client.post( + f"{upload_prefix}/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + asset_path, + resp["complete_url"], + ) + r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False) + lgr.debug( + "%s: Upload completed. 
Response content: %s", + asset_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" + ) + # else: Error? Warning? + validate_resp: dict | None = client.post(f"{upload_prefix}/{upload_id}/validate/") + except Exception: + post_upload_size_check(filepath, total_size, True) + raise + else: + post_upload_size_check(filepath, total_size, False) + + return validate_resp + + def _check_required_fields( d: dict, required: list[str], file_path: str ) -> list[ValidationResult]: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index a2f5a8945..2a056b8f9 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -25,6 +25,7 @@ from dandi.consts import ( MAX_ZARR_DEPTH, ZARR_DELETE_BATCH_SIZE, + ZARR_LARGE_CHUNK_THRESHOLD, ZARR_MIME_TYPE, ZARR_UPLOAD_BATCH_SIZE, ) @@ -46,7 +47,7 @@ pre_upload_size_check, ) -from .bases import LocalDirectoryAsset +from .bases import LocalDirectoryAsset, _multipart_upload from ..validate._types import ( ORIGIN_VALIDATION_DANDI_ZARR, Origin, @@ -744,16 +745,74 @@ def mkzarr() -> str: ): # Items to upload in this batch (may be retried e.g. due to # 403 errors because of timed-out upload URLs) - items_to_upload = list(items) + all_items = list(items) + multipart_items = [ + it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD + ] + singlepart_items = [ + it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD + ] + # TODO: remove once all servers are > 0.23.0 (i.e. ship + # dandi-archive#2784) and the multipart zarr upload + # endpoints are universally available; the capability + # check would then be unnecessary. + if multipart_items and not client.supports_zarr_multipart_upload: + largest = max(multipart_items, key=lambda it: it.size) + total_large_size = sum(it.size for it in multipart_items) + raise UploadError( + f"{asset_path}:" + f" {pluralize(len(multipart_items), 'Zarr chunk')}" + f" totaling {total_large_size / 1024**3:.2f} GiB" + f" exceed the S3 single-part upload limit of" + f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB" + f" (largest: {largest.entry_path}," + f" {largest.size / 1024**3:.2f} GiB);" + f" the server does not support multipart zarr" + f" uploads (dandi-archive#2784)." + ) max_retries = 5 retry_count = 0 # Add all items to checksum tree (only done once) - for it in items_to_upload: + for it in all_items: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) - while items_to_upload and retry_count <= max_retries: + # Upload chunks above 5GB individually via multipart upload. + # ``_multipart_upload`` reports ``current`` as bytes within + # the single chunk being uploaded; translate it to bytes + # uploaded across the whole zarr so progress reporting + # stays monotonic for downstream consumers. 
+ for it in multipart_items: + cumulative_before = bytes_uploaded + for status in _multipart_upload( + client=client, + filepath=it.filepath, + asset_path=it.entry_path, + upload_prefix="/uploads/zarr", + extra_init_fields={ + "zarr_id": zarr_id, + "chunk_key": it.entry_path, + }, + jobs=jobs, + ): + if ( + status.get("status") == "uploading" + and "current" in status + ): + cumulative = cumulative_before + status["current"] + yield { + "status": "uploading", + "progress": 100 * cumulative / to_upload.total_size, + "current": cumulative, + } + else: + yield status + changed = True + bytes_uploaded += it.size + + # Upload the remaining files using single part upload + while singlepart_items and retry_count <= max_retries: # Prepare upload requests for current items - uploading = [it.upload_request() for it in items_to_upload] + uploading = [it.upload_request() for it in singlepart_items] if retry_count == 0: lgr.debug( @@ -785,7 +844,7 @@ def mkzarr() -> str: upload_url=signed_url, item=it, ) - for (signed_url, it) in zip(r, items_to_upload) + for (signed_url, it) in zip(r, singlepart_items) ] changed = True @@ -817,20 +876,20 @@ def mkzarr() -> str: ) # Prepare for next iteration with retry items - if items_to_upload := retry_items: + if singlepart_items := retry_items: retry_count += 1 if retry_count <= max_retries: lgr.info( "%s: %s got 403 errors, requesting new URLs", asset_path, - pluralize(len(items_to_upload), "file"), + pluralize(len(singlepart_items), "file"), ) # Small delay before retry sleep(1 * retry_count) # Check if we exhausted retries - if items_to_upload: - nfiles_str = pluralize(len(items_to_upload), "file") + if singlepart_items: + nfiles_str = pluralize(len(singlepart_items), "file") raise UploadError( f"{asset_path}: failed to upload {nfiles_str} " f"after {max_retries} retries due to repeated 403 errors" diff --git a/dandi/support/digests.py b/dandi/support/digests.py index 7a69a1629..d57486a6e 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -26,6 +26,7 @@ from fscacher import PersistentCache from zarr_checksum.checksum import ZarrChecksum, ZarrChecksumManifest from zarr_checksum.tree import ZarrChecksumTree +from dandi.consts import ZARR_LARGE_CHUNK_THRESHOLD from .threaded_walk import threaded_walk from ..utils import Hasher, exclude_from_zarr @@ -133,7 +134,17 @@ def md5file_nocache(filepath: str | Path) -> str: Compute the MD5 digest of a file without caching with fscacher, which has been shown to slow things down for the large numbers of files typically present in Zarrs + + If the file is larger than `ZARR_LARGE_CHUNK_THRESHOLD`, the computed checksum is not a + traditional md5 checksum, but is instead an S3 multipart ETag. """ + if isinstance(filepath, str): + filepath = Path(filepath) + + if filepath.stat().st_size > ZARR_LARGE_CHUNK_THRESHOLD: + # For some reason the type checker treats this return as an Any type + return get_dandietag(filepath).as_str() # type: ignore [no-any-return] + return Digester(["md5"])(filepath)["md5"] diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 6f3dff19e..6c9683c7b 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -4,7 +4,7 @@ import os from pathlib import Path import subprocess -from unittest.mock import ANY +from unittest.mock import ANY, patch from dandischema.models import get_schema_version import numpy as np @@ -13,7 +13,7 @@ from .fixtures import SampleDandiset from .. 
import get_logger -from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file +from ..consts import ZARR_LARGE_CHUNK_THRESHOLD, ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError from ..files import ( @@ -29,6 +29,7 @@ dandi_file, find_dandi_files, ) +from ..files.bases import _multipart_upload as real_multipart_upload lgr = get_logger() @@ -536,6 +537,53 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): assert r.headers["Content-Type"] == "application/json" +@pytest.mark.ai_generated +def test_upload_zarr_large_chunks(new_dandiset, tmp_path): + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" + if not new_dandiset.client.supports_zarr_multipart_upload: + pytest.skip( + "Server does not expose the zarr multipart upload endpoints" + " (dandi-archive#2784)" + ) + filepath = tmp_path / "example.zarr" + zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) + + # Add a "large" array whose single chunk is a sparse file just over the threshold. + # Sparse files appear large to stat() without consuming real disk space. + store = zarr.open_group(str(filepath), mode="a") + large_arr = store.create_dataset( + "large", shape=(1,), chunks=(1,), dtype=np.uint8, compressor=None + ) + large_arr[:] = np.zeros(1, dtype=np.uint8) + large_chunk_path = filepath / "large" / "0" + large_chunk_path.unlink() + with open(large_chunk_path, "wb") as f: + f.seek(ZARR_LARGE_CHUNK_THRESHOLD + 1) + f.write(b"\x00") + + zf = dandi_file(filepath) + assert isinstance(zf, ZarrAsset) + + called_paths: list[str] = [] + + def spy_multipart_upload(**kwargs): + called_paths.append(kwargs["asset_path"]) + yield from real_multipart_upload(**kwargs) + + with patch("dandi.files.zarr._multipart_upload", spy_multipart_upload): + asset = zf.upload(new_dandiset.dandiset, {}) + + assert isinstance(asset, RemoteZarrAsset) + remote_entries = {str(e) for e in asset.iterfiles()} + + # Only the large chunk should have been routed through multipart upload + assert set(called_paths) == {"large/0"} + + # At least one chunk must have gone each path so the test is meaningful + assert len(called_paths) > 0 + assert len(remote_entries) - len(called_paths) > 0 + + def test_validate_deep_zarr(tmp_path: Path) -> None: zarr_path = tmp_path / "foo.zarr" zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1))
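Note on the helper's calling convention (not part of the diff): ``iter_upload`` now consumes ``_multipart_upload`` with ``yield from``, which both forwards the progress dicts to its own caller and captures the generator's return value (the validate response), while the zarr path drives the same generator with a plain ``for`` loop and discards the return value. A minimal, self-contained sketch of that generator-return pattern is below; ``fake_upload``, ``caller``, and the placeholder blob id are illustrative names only and do not appear in the diff.

from collections.abc import Generator


def fake_upload(total: int) -> Generator[dict, None, dict]:
    """Yield progress dicts, then return a final response dict.

    Mirrors the contract of ``_multipart_upload``: consumers observe progress
    via ``yield``, and the final value travels back through ``yield from``.
    """
    sent = 0
    while sent < total:
        sent += min(1024, total - sent)
        yield {"status": "uploading", "current": sent, "progress": 100 * sent / total}
    return {"blob_id": "hypothetical-blob-id"}  # stands in for the validate response


def caller(total: int) -> Generator[dict, None, None]:
    # As in ``iter_upload``: forward each progress dict and capture the return value.
    resp = yield from fake_upload(total)
    yield {"status": "done", "asset": resp["blob_id"]}


if __name__ == "__main__":
    for status in caller(4096):
        print(status)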