3 changes: 3 additions & 0 deletions dandi/consts.py
@@ -199,6 +199,9 @@ def urls(self) -> Iterator[str]:
#: Maximum number of Zarr directory entries to delete at once
ZARR_DELETE_BATCH_SIZE = 100

#: Zarr chunks above this size (bytes) are uploaded via multipart upload
ZARR_LARGE_CHUNK_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GB

BIDS_DATASET_DESCRIPTION = "dataset_description.json"

BIDS_IGNORE_FILE = ".bidsignore"
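The new constant gates which upload path a zarr chunk takes. As a rough usage sketch (not part of this diff; `choose_upload_route` is a hypothetical helper):

    import os

    from dandi.consts import ZARR_LARGE_CHUNK_THRESHOLD

    def choose_upload_route(chunk_path: str) -> str:
        # Chunks at or below 5 GB can use the pre-existing single-request
        # zarr upload; anything larger must go through multipart upload.
        if os.path.getsize(chunk_path) > ZARR_LARGE_CHUNK_THRESHOLD:
            return "multipart"
        return "single"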
22 changes: 22 additions & 0 deletions dandi/dandiapi.py
@@ -19,6 +19,7 @@
from datetime import datetime
from enum import Enum
from fnmatch import fnmatchcase
from functools import cached_property
import json
import os.path
from pathlib import Path, PurePosixPath
@@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]:
def _instance_id(self) -> str:
return self.dandi_instance.name.upper()

@cached_property
def supports_zarr_multipart_upload(self) -> bool:
"""
Whether the server exposes the zarr multipart upload endpoints
introduced in dandi-archive#2784 (``/uploads/zarr/initialize/`` and
friends).

Probed once per client by POSTing an empty body to
``/uploads/zarr/initialize/``: if the route does not exist the server
returns 404; any other status (400 for the bad payload, 401/403 for
auth, etc.) means the route is present and the server supports
multipart zarr uploads.
"""
try:
self.post("/uploads/zarr/initialize/", json={})
except HTTP404Error:
return False
except requests.HTTPError:
return True
return True

def get_dandiset(
self, dandiset_id: str, version_id: str | None = None, lazy: bool = True
) -> RemoteDandiset:
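Since `supports_zarr_multipart_upload` is a `cached_property`, the probe costs one request per client instance. A minimal sketch of how calling code might branch on it (assuming a configured `DandiAPIClient`; the strategy strings are illustrative):

    from dandi.dandiapi import DandiAPIClient

    with DandiAPIClient.for_dandi_instance("dandi") as client:
        if client.supports_zarr_multipart_upload:
            # Server exposes the dandi-archive#2784 endpoints.
            strategy = "multipart"
        else:
            # Older server: keep using the single-request zarr chunk upload.
            strategy = "single"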
230 changes: 127 additions & 103 deletions dandi/files/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Iterator
from collections.abc import Generator, Iterator
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
@@ -341,118 +341,28 @@ def iter_upload(
``"done"`` and an ``"asset"`` key containing the resulting
`RemoteAsset`.
"""
        """
        asset_path = metadata.setdefault("path", self.path)
        client = dandiset.client
        expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag")
        try:
            validate_resp = yield from _multipart_upload(
                client=client,
                filepath=self.filepath,
                asset_path=asset_path,
                upload_prefix="/uploads",
                extra_init_fields={"dandiset": dandiset.identifier},
                expected_etag=expected_etag,
                jobs=jobs,
            )
            if validate_resp is None:
                raise RuntimeError("Expected upload response of type `dict` but received `None`")
            blob_id = validate_resp["blob_id"]
        except requests.HTTPError as e:
            if e.response is not None and e.response.status_code == 409:
                lgr.debug("%s: Blob already exists on server", asset_path)
                blob_id = e.response.headers["Location"]
            else:
                raise
        lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path)
        yield {"status": "producing asset"}
        if replacing is not None:
@@ -696,6 +606,120 @@ def _upload_blob_part(
}


def _multipart_upload(
client: RESTFullAPIClient,
filepath: Path,
asset_path: str,
upload_prefix: str,
extra_init_fields: dict,
expected_etag: str | None = None,
jobs: int | None = None,
) -> Generator[dict, None, dict | None]:
"""Perform multipart upload: etag calculation, initialization, part upload, and validation.

Yields progress dicts and returns the validate response dict. If
``expected_etag`` is provided and does not match the computed etag, raises
``RuntimeError``. A 409 HTTPError from the initialize call propagates
unchanged so the caller can handle the "blob already exists" case.
"""
# Avoid heavy import by importing within function:
from dandi.support.digests import get_dandietag

yield {"status": "calculating etag"}
etagger = get_dandietag(filepath)
filetag = etagger.as_str()
lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath)
if expected_etag is not None and filetag != expected_etag:
raise RuntimeError(
f"{filepath}: File etag changed; was originally"
f" {expected_etag} but is now {filetag}"
)
yield {"status": "initiating upload"}
lgr.debug("%s: Beginning upload", asset_path)
total_size = pre_upload_size_check(filepath)
resp = client.post(
f"{upload_prefix}/initialize/",
json={
"contentSize": total_size,
"digest": {"algorithm": "dandi:dandi-etag", "value": filetag},
**extra_init_fields,
},
)
try:
upload_id = resp["upload_id"]
parts = resp["parts"]
if len(parts) != etagger.part_qty:
raise RuntimeError(
f"Server and client disagree on number of parts for upload;"
f" server says {len(parts)}, client says {etagger.part_qty}"
)
parts_out = []
bytes_uploaded = 0
lgr.debug("Uploading %s in %d parts", filepath, len(parts))
with RESTFullAPIClient("http://nil.nil") as storage:
with filepath.open("rb") as fp:
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
lock = Lock()
futures = [
executor.submit(
_upload_blob_part,
storage_session=storage,
fp=fp,
lock=lock,
etagger=etagger,
asset_path=asset_path,
part=part,
)
for part in parts
]
for fut in as_completed(futures):
out_part = fut.result()
bytes_uploaded += out_part["size"]
yield {
"status": "uploading",
"progress": 100 * bytes_uploaded / total_size,
"current": bytes_uploaded,
}
parts_out.append(out_part)
lgr.debug("%s: Completing upload", asset_path)
resp = client.post(
f"{upload_prefix}/{upload_id}/complete/",
json={"parts": parts_out},
)
lgr.debug(
"%s: Announcing completion to %s",
asset_path,
resp["complete_url"],
)
r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False)
lgr.debug(
"%s: Upload completed. Response content: %s",
asset_path,
r.content,
)
rxml = fromstring(r.text)
m = re.match(r"\{.+?\}", rxml.tag)
ns = m.group(0) if m else ""
final_etag = rxml.findtext(f"{ns}ETag")
if final_etag is not None:
final_etag = final_etag.strip('"')
if final_etag != filetag:
raise RuntimeError(
"Server and client disagree on final ETag of"
f" uploaded file; server says {final_etag},"
f" client says {filetag}"
)
# else: Error? Warning?
validate_resp: dict | None = client.post(f"{upload_prefix}/{upload_id}/validate/")
except Exception:
post_upload_size_check(filepath, total_size, True)
raise
else:
post_upload_size_check(filepath, total_size, False)

return validate_resp


def _check_required_fields(
d: dict, required: list[str], file_path: str
) -> list[ValidationResult]:
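`_multipart_upload` follows the generator-with-return-value pattern: callers either delegate with `yield from` (as `iter_upload` now does) or drive it by hand and pick the return value off `StopIteration`. A small sketch of manual consumption (`gen` stands for an already-constructed `_multipart_upload(...)` generator):

    def drive_upload(gen):
        # Each next() yields a progress dict ({"status": ..., "progress": ...});
        # the generator's return value is delivered via StopIteration.value.
        try:
            while True:
                status = next(gen)
                print(status["status"], status.get("progress", ""))
        except StopIteration as e:
            return e.value  # the validate response dict, or None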