From 27a34dd57a0e8bee5c0df69b8ce3a15e79d06ee4 Mon Sep 17 00:00:00 2001 From: hrodmn Date: Sat, 16 May 2026 06:10:51 -0500 Subject: [PATCH] chore: replace ObjectStore type requirement with ObspecInput resolves #28 --- ARCHITECTURE.md | 10 ++-- README.md | 9 +++- docs/guides/cloud-storage.md | 12 +++-- pyproject.toml | 1 + src/lazycogs/_backend.py | 16 ++++--- src/lazycogs/_chunk_reader.py | 19 +++++--- src/lazycogs/_core.py | 46 ++++++++++-------- src/lazycogs/_explain.py | 7 +-- src/lazycogs/_store.py | 19 ++++---- tests/test_core.py | 88 +++++++++++++++++++++++++++++++---- tests/test_store.py | 17 +++++++ uv.lock | 2 + 12 files changed, 183 insertions(+), 63 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 6209445..0a9367c 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -45,7 +45,7 @@ src/lazycogs/ 2. Parses `time_period` into a `_TemporalGrouper` (see `_temporal.py`). 3. Converts `bbox` from the target CRS to EPSG:4326 using `pyproj.Transformer`. 4. Calls `_discover_bands()`: queries the parquet source via `duckdb_client.search(..., max_items=1)` to find asset keys. Assets with role `"data"` or media type `"image/tiff"` are returned first. -5. Calls `_smoketest_store()`: fetches one sample item from the parquet, resolves the object store for a representative data asset HREF, and calls `head()` to confirm access. Raises `RuntimeError` immediately with a clear message if the store cannot reach the asset, so misconfiguration is surfaced at `open()` time rather than deferred to the first chunk read. +5. Calls `_smoketest_store()`: fetches one sample item from the parquet, resolves the store for a representative data asset HREF, and calls `GeoTIFF.open(path, store=...)` to confirm that the configured store satisfies the same read contract the real chunk reader uses. Raises `RuntimeError` immediately with a clear message if the store cannot reach the asset, so misconfiguration is surfaced at `open()` time rather than deferred to the first chunk read. 6. Calls `_build_time_steps()`: queries the parquet source via `duckdb_client.search_to_arrow(...)` to obtain an Arrow table containing only the `datetime` and `start_datetime` columns (plus any filter/sort fields). Extracts timestamps from the Arrow columns without Python-level dict walking, buckets them with the `_TemporalGrouper`, deduplicates, and returns sorted `(filter_strings, time_coords)` pairs. Only groups with at least one item produce a time step. 7. Calls `compute_output_grid()` to get the output affine transform and dimensions (width, height). No eager coordinate arrays are produced. 8. Creates a single `MultiBandStacBackendArray` (a dataclass) with shape `(band, time, y, x)` holding all the parameters needed to materialise any chunk later, then wraps it in one `xarray.core.indexing.LazilyIndexedArray`. This avoids `xr.concat` (used internally by `ds.to_array()`), which would eagerly load `LazilyIndexedArray`-backed objects. @@ -248,11 +248,13 @@ mean that requires all weeks to be present before reducing). ## Store caching and the `store` parameter -`resolve()` in `_store.py` defers to `obstore.store.from_url` for scheme detection — including the special-case HTTPS routing for `amazonaws.com`, `r2.cloudflarestorage.com`, and Azure hosts — rather than maintaining its own list of known object-store domains. The constructed store is cached per thread in a `dict[str, ObjectStore]` keyed by root URL (`scheme://netloc`). 
Because dask tasks run in threads, this avoids repeated connection setup within a single task while remaining safe across concurrent tasks. +`lazycogs.open(..., store=...)` accepts the same obspec-compatible async range-read contract that `async-geotiff` consumes. In practice, any object that satisfies `async_tiff.ObspecInput` can be passed through and will be forwarded unchanged to `GeoTIFF.open(...)`. -No credential defaults are applied; stores are constructed with obstore's own environment-based credential discovery (environment variables, instance metadata, config files, etc.). For public buckets that do not require signed requests, callers pass `skip_signature=True` explicitly. For authenticated access or any non-default configuration, the caller is expected to construct an `ObjectStore` and pass it via the `store=` parameter to `open()`; `resolve()` then returns it unchanged and only extracts the object path from each HREF. No introspection is done on a user-supplied store — the caller is responsible for ensuring it is rooted at the same `scheme://netloc` the HREFs point to. +`resolve()` in `_store.py` remains the default convenience layer. When `store=None`, it defers to `obstore.store.from_url` for scheme detection — including the special-case HTTPS routing for `amazonaws.com`, `r2.cloudflarestorage.com`, and Azure hosts — rather than maintaining its own list of known object-store domains. The constructed obstore-backed store is cached per thread in a `dict[str, ObjectStore]` keyed by root URL (`scheme://netloc`). Because dask tasks run in threads, this avoids repeated connection setup within a single task while remaining safe across concurrent tasks. -`store_for(href, *, asset=None, **kwargs)` is a public convenience factory that automates this construction. It reads one sample item from the geoparquet file, extracts a data asset HREF, and calls `from_url` using obstore's environment-based credential discovery. If the item carries STAC Storage Extension metadata (v1.0.0 flat fields or v2.0.0 `storage:schemes`/`storage:refs`), `region` and `requester_pays` are also extracted and forwarded. Caller `kwargs` override all inferred values (e.g. `skip_signature=True` for public buckets). The returned store is not cached — the caller owns its lifetime and passes it to `open()` via `store=`. +No credential defaults are applied; auto-resolved stores are constructed with obstore's own environment-based credential discovery (environment variables, instance metadata, config files, etc.). For public buckets that do not require signed requests, callers pass `skip_signature=True` explicitly. For authenticated access or any non-default configuration, callers may still construct an obstore `ObjectStore` and pass it via `store=`; `resolve()` then returns it unchanged and only extracts the object path from each HREF. No introspection is done on a user-supplied store — the caller is responsible for ensuring it satisfies the `GeoTIFF.open` read contract and is rooted at the same `scheme://netloc` the HREFs point to. + +`store_for(href, *, asset=None, **kwargs)` is a public convenience factory that automates the default obstore path. It reads one sample item from the geoparquet file, extracts a data asset HREF, and calls `from_url` using obstore's environment-based credential discovery. If the item carries STAC Storage Extension metadata (v1.0.0 flat fields or v2.0.0 `storage:schemes`/`storage:refs`), `region` and `requester_pays` are also extracted and forwarded. 
Caller `kwargs` override all inferred values (e.g. `skip_signature=True` for public buckets). The returned store is not cached — the caller owns its lifetime and passes it to `open()` via `store=`.

When the store root does not align with the URL structure of the asset HREFs — for example, an Azure Blob Storage store rooted at a container while the HREFs include the container name in the path — the caller can provide a `path_from_href` callable to `open()`. The callable takes the full HREF string and returns the object path to use with the store. When supplied, it replaces the default `urlparse`-based extraction in `resolve()`.

diff --git a/README.md b/README.md
index b406af3..a2edd0f 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Open a lazy `(band, time, y, x)` xarray DataArray from thousands of cloud-optimi

[stackstac](https://stackstac.readthedocs.io) and [odc-stac](https://odc-stac.readthedocs.io) established the pattern that lazycogs builds on: take a STAC item collection and expose it as a spatially-aligned xarray DataArray ready for dask-parallel computation. Both are excellent tools that cover most satellite imagery workflows well. They rely on the trusty combination of rasterio and GDAL for data i/o and warping operations.

-lazycogs takes the same approach but replaces GDAL and rasterio with a Rust-native stack: [rustac](https://stac-utils.github.io/rustac-py) for STAC queries over stac-geoparquet files, [async-geotiff](https://developmentseed/async-geotiff) for COG i/o, and [obstore](https://developmentseed.org/obstore) for cloud storage access.
+lazycogs takes the same approach but replaces GDAL and rasterio with a Rust-native stack: [rustac](https://stac-utils.github.io/rustac-py) for STAC queries over stac-geoparquet files, [async-geotiff](https://developmentseed.org/async-geotiff) for COG i/o, and [obstore](https://developmentseed.org/obstore) as the default cloud storage integration.

The result is a tool that can instantly expose a lazy xarray DataArray view of massive STAC item archives in any CRS and resolution. Each array operation triggers a targeted spatial query on the stac-geoparquet file to find only the assets needed for that specific chunk — no upfront scan of every item required.

@@ -23,7 +23,7 @@ One constraint worth naming: lazycogs only reads Cloud Optimized GeoTIFFs. If yo
|---|---|
| STAC search + spatial indexing | `rustac` (DuckDB + geoparquet) |
| COG I/O | `async-geotiff` (Rust, no GDAL) |
-| Cloud storage | `obstore` |
+| Cloud storage | `obstore` by default; any `async-geotiff`/obspec-compatible store when passed via `store=` |
| Reprojection | `pyproj` + numpy |
| Lazy dataset construction | xarray `BackendEntrypoint` + `LazilyIndexedArray` |

@@ -97,6 +97,11 @@ event loop. Multiple concurrent chunk reads overlap naturally, so the async
path can be faster than the synchronous `da.compute()` when reading many
chunks inside an already-running loop.

+## Custom stores
+
+`lazycogs.open(..., store=...)` accepts any store object that satisfies the async range-read contract consumed by `async-geotiff`.
+For most users, the recommended path is still obstore: leave `store=None` to auto-resolve per-asset stores, or call `lazycogs.store_for()` to build one explicitly.
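+
+If you do bring your own store, a rough sketch of the shape is below. The `TracingStore` wrapper is hypothetical, and the exact method set required is defined by the obspec protocols that `async-geotiff` consumes, so treat the signatures as an assumption rather than the definitive contract:
+
+```python
+import lazycogs
+import obstore
+from obstore.store import S3Store
+
+
+class TracingStore:
+    """Hypothetical obspec-style store: logs each read, then delegates to obstore."""
+
+    def __init__(self, inner):
+        self._inner = inner  # any obstore ObjectStore
+
+    async def get_range_async(self, path, *, start, end=None, length=None):
+        print(f"range read: {path} start={start} end={end} length={length}")
+        return await obstore.get_range_async(
+            self._inner, path, start=start, end=end, length=length
+        )
+
+    async def get_ranges_async(self, path, *, starts, ends=None, lengths=None):
+        print(f"vectored read: {path} ({len(starts)} ranges)")
+        return await obstore.get_ranges_async(
+            self._inner, path, starts=starts, ends=ends, lengths=lengths
+        )
+
+
+# bbox/CRS/resolution values are illustrative, borrowed from this repo's tests
+da = lazycogs.open(
+    "items.parquet",
+    bbox=(0.0, 0.0, 100.0, 100.0),
+    crs="EPSG:32632",
+    resolution=10.0,
+    store=TracingStore(S3Store("my-bucket")),
+)
+```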
+ ## Documentation - [Home](https://developmentseed.github.io/lazycogs/) — quickstart and full usage guide diff --git a/docs/guides/cloud-storage.md b/docs/guides/cloud-storage.md index 9126a11..cab40be 100644 --- a/docs/guides/cloud-storage.md +++ b/docs/guides/cloud-storage.md @@ -1,15 +1,21 @@ # Cloud storage -lazycogs uses [obstore](https://developmentseed.org/obstore/latest/) to read COG assets from cloud object storage. This guide covers how to configure object stores for different storage backends and authentication scenarios. +lazycogs uses [obstore](https://developmentseed.org/obstore/latest/) as its default way to read COG assets from cloud object storage. It can also accept any custom store object that satisfies the async range-read contract consumed by `async-geotiff`. This guide covers the default obstore path plus the custom-store contract. ## Default behavior By default, `lazycogs.open()` parses each asset HREF into an `ObjectStore` using [`obstore.store.from_url`](https://developmentseed.org/obstore/latest/api/store/from_url/). No credential defaults are applied; the store uses obstore's own environment-based credential discovery (environment variables, instance metadata, config files, etc.). -`lazycogs.open()` runs a lightweight storage smoketest on startup: it resolves the object store for a sample data asset and calls `head()` to confirm access. If the store cannot reach the asset, a `RuntimeError` is raised immediately with a clear message rather than deferring the failure to the first chunk read. +`lazycogs.open()` runs a lightweight storage smoketest on startup: it resolves the store for a sample data asset and calls `GeoTIFF.open(..., store=...)` to confirm access through the same contract used by the real reader. If the store cannot reach the asset, a `RuntimeError` is raised immediately with a clear message rather than deferring the failure to the first chunk read. For public buckets that do not require signed requests, pass `skip_signature=True` when constructing the store. For authenticated buckets, provide credentials via environment variables or a pre-configured store. +## Custom store contract + +When you pass `store=` explicitly, lazycogs forwards that object to `async-geotiff`. The object does not need to be an obstore `ObjectStore`; it only needs to satisfy the obspec-compatible async range-read contract accepted by `GeoTIFF.open()`. + +For most users, obstore is still the recommended path because `store=None` auto-resolves it for each asset HREF and `lazycogs.store_for()` constructs it for you. + ## Constructing a store from your data `lazycogs.store_for()` inspects a geoparquet file and builds a matching `ObjectStore` automatically. It reads one sample item, derives the store root from a data asset HREF, and infers `region` and `requester_pays` from [STAC Storage Extension](https://github.com/stac-extensions/storage) metadata when present. 
@@ -29,7 +35,7 @@ store = lazycogs.store_for("items.parquet", asset="B04") ## Constructing stores manually -For authenticated buckets, requester-pays buckets, custom endpoints, or non-standard authentication, construct the store yourself: +For authenticated buckets, requester-pays buckets, custom endpoints, or non-standard authentication, construct an obstore-backed store yourself: ```python from obstore.store import S3Store, GCSStore, HTTPStore diff --git a/pyproject.toml b/pyproject.toml index 97ae53c..bb7bbd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "affine>=2.4.0", "arro3-core>=0.8.0", "async-geotiff>=0.4.0", + "async-tiff>=0.7.1", "cql2>=0.5.5", "numpy>=2.4.4", "obstore>=0.9.2", diff --git a/src/lazycogs/_backend.py b/src/lazycogs/_backend.py index e9ec514..b17e8c5 100644 --- a/src/lazycogs/_backend.py +++ b/src/lazycogs/_backend.py @@ -27,6 +27,7 @@ if TYPE_CHECKING: from collections.abc import Callable + from async_tiff import ObspecInput from rustac import DuckdbClient from lazycogs._mosaic_methods import MosaicMethodBase @@ -61,7 +62,8 @@ class _ChunkReadPlan: chunk_height: Chunk height in pixels. nodata: No-data fill value, or ``None``. mosaic_method_cls: Mosaic method class, or ``None`` for the default. - store: Pre-configured obstore ``ObjectStore`` instance, or ``None``. + store: Pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``, or ``None``. max_concurrent_reads: Maximum concurrent COG reads per chunk. warp_cache: Shared warp map cache across time steps. path_fn: Optional callable extracting an object path from an asset HREF. @@ -83,7 +85,7 @@ class _ChunkReadPlan: chunk_height: int nodata: float | None mosaic_method_cls: type[MosaicMethodBase] | None - store: Any | None + store: ObspecInput | None max_concurrent_reads: int warp_cache: dict path_fn: Callable[[str], str] | None @@ -335,10 +337,10 @@ class MultiBandStacBackendArray(BackendArray): mosaic_method_cls: Mosaic method class instantiated per chunk, or ``None`` to use the default :class:`~lazycogs._mosaic_methods.FirstMethod`. - store: Pre-configured obstore ``ObjectStore`` instance shared across - all chunk reads. When ``None``, each asset HREF is resolved to a - store via the thread-local cache in - :func:`~lazycogs._store.resolve`. + store: Pre-configured obspec-compatible store accepted by + ``GeoTIFF.open`` and shared across all chunk reads. When ``None``, + each asset HREF is resolved to an obstore-backed store via the + thread-local cache in :func:`~lazycogs._store.resolve`. max_concurrent_reads: Maximum number of COG reads to run concurrently per chunk. Limits peak in-flight memory when a chunk overlaps many items. Defaults to 32. @@ -369,7 +371,7 @@ class MultiBandStacBackendArray(BackendArray): dtype: np.dtype nodata: float | None mosaic_method_cls: type[MosaicMethodBase] | None = field(default=None) - store: Any | None = field(default=None) + store: ObspecInput | None = field(default=None) max_concurrent_reads: int = field(default=32) path_from_href: Callable[[str], str] | None = field(default=None) shape: tuple[int, ...] 
= field(init=False) diff --git a/src/lazycogs/_chunk_reader.py b/src/lazycogs/_chunk_reader.py index b3b7439..05385b2 100644 --- a/src/lazycogs/_chunk_reader.py +++ b/src/lazycogs/_chunk_reader.py @@ -26,7 +26,7 @@ from collections.abc import Callable from affine import Affine - from obstore.store import ObjectStore + from async_tiff import ObspecInput from pyproj import CRS, Transformer logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ class _ChunkContext: chunk_width: int chunk_height: int nodata: float | None - store: ObjectStore | None + store: ObspecInput | None path_fn: Callable[[str], str] | None warp_cache: dict[tuple[tuple[float, ...], CRS], WarpMap] | None @@ -375,7 +375,10 @@ async def _read_item_band( return None # Open all COGs concurrently for metadata. - async def _open_band(band: str, href: str) -> tuple[str, GeoTIFF, ObjectStore]: + async def _open_band( + band: str, + href: str, + ) -> tuple[str, GeoTIFF, ObspecInput]: band_store, path = _resolve_store(href, ctx.store, ctx.path_fn) geotiff = await GeoTIFF.open(path, store=band_store) return band, geotiff, band_store @@ -520,7 +523,7 @@ async def read_chunk_async( chunk_height: int, nodata: float | None = None, mosaic_method_cls: type[MosaicMethodBase] | None = None, - store: ObjectStore | None = None, + store: ObspecInput | None = None, max_concurrent_reads: int = 32, warp_cache: dict | None = None, path_fn: Callable[[str], str] | None = None, @@ -544,7 +547,8 @@ async def read_chunk_async( nodata: No-data fill value. mosaic_method_cls: Mosaic method class instantiated once per band. Defaults to :class:`~lazycogs._mosaic_methods.FirstMethod`. - store: Optional pre-configured obstore ``ObjectStore`` instance. + store: Optional pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``. max_concurrent_reads: Maximum number of COG reads to run concurrently. warp_cache: Optional cache shared across calls for reusing warp maps from earlier time steps. @@ -626,7 +630,7 @@ def read_chunk( chunk_height: int, nodata: float | None = None, mosaic_method_cls: type[MosaicMethodBase] | None = None, - store: ObjectStore | None = None, + store: ObspecInput | None = None, max_concurrent_reads: int = 32, warp_cache: dict | None = None, path_fn: Callable[[str], str] | None = None, @@ -645,7 +649,8 @@ def read_chunk( nodata: No-data fill value. mosaic_method_cls: Mosaic method class instantiated once per band. Defaults to :class:`~lazycogs._mosaic_methods.FirstMethod`. - store: Optional pre-configured obstore ``ObjectStore`` instance. + store: Optional pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``. max_concurrent_reads: Maximum number of COG reads to run concurrently. warp_cache: Optional cache shared across calls for reusing warp maps from earlier time steps. 
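Reviewer note: the `_resolve_store` call sites above rely on the `(store, path)` dispatch implemented in `_store.resolve()`, whose body is not shown in this diff. A minimal sketch of that dispatch, reconstructed from the docstrings in this patch and from ARCHITECTURE.md (the per-thread cache shape and `urlparse`-based path extraction are documented there; everything else is assumed):

```python
import threading
from urllib.parse import urlparse

from obstore.store import from_url

_local = threading.local()  # stands in for the per-thread _cache() in _store.py


def resolve_sketch(href, store=None, path_fn=None):
    """Sketch of resolve(): map an asset HREF to a (store, path) pair."""
    path = path_fn(href) if path_fn is not None else urlparse(href).path.lstrip("/")
    if store is not None:
        # User-supplied store: returned unchanged, no introspection performed.
        return store, path
    parsed = urlparse(href)
    root = f"{parsed.scheme}://{parsed.netloc}"
    cache = getattr(_local, "stores", None)
    if cache is None:
        cache = _local.stores = {}
    if root not in cache:
        # obstore handles scheme detection and env-based credential discovery.
        cache[root] = from_url(root)
    return cache[root], path
```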
diff --git a/src/lazycogs/_core.py b/src/lazycogs/_core.py index 758ded9..eb96933 100644 --- a/src/lazycogs/_core.py +++ b/src/lazycogs/_core.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Self import numpy as np +from async_geotiff import GeoTIFF from pyproj import CRS, Transformer from rasterix import RasterIndex from rustac import DuckdbClient @@ -15,6 +16,7 @@ from lazycogs._backend import MultiBandStacBackendArray from lazycogs._cql2 import _extract_filter_fields, _sortby_fields +from lazycogs._executor import _run_coroutine from lazycogs._grid import compute_output_grid from lazycogs._mosaic_methods import FirstMethod, MosaicMethodBase from lazycogs._store import resolve @@ -24,7 +26,7 @@ from collections.abc import Callable from arro3.core import Table - from obstore.store import ObjectStore + from async_tiff import ObspecInput logger = logging.getLogger(__name__) @@ -105,6 +107,11 @@ def _discover_bands( return data_bands or other_bands or list(assets) +async def _open_store_sample(path: str, *, store: ObspecInput) -> None: + """Open one representative asset through ``GeoTIFF.open`` for validation.""" + await GeoTIFF.open(path, store=store) + + def _smoketest_store( parquet_path: str, *, @@ -114,15 +121,15 @@ def _smoketest_store( filter: str | dict[str, Any] | None = None, ids: list[str] | None = None, bands: list[str] | None = None, - store: ObjectStore | None = None, + store: ObspecInput | None = None, path_from_href: Callable[[str], str] | None = None, ) -> None: - """Verify the object store can access a sample asset from the parquet. + """Verify the configured store can open a sample asset from the parquet. - Fetches one item, resolves the object store for a representative data asset - HREF, and calls ``head()`` to confirm access. Raises ``RuntimeError`` if - the store cannot reach the asset so misconfiguration is caught at - :func:`open` time rather than deferred to the first chunk read. + Fetches one item, resolves the store for a representative data asset HREF, + and validates it by calling ``GeoTIFF.open`` on the resolved path. Raises + ``RuntimeError`` if the store cannot reach the asset so misconfiguration is + caught at :func:`open` time rather than deferred to the first chunk read. """ items = duckdb_client.search( parquet_path, @@ -161,10 +168,10 @@ def _smoketest_store( resolved_store, path = resolve(href, store=store, path_fn=path_from_href) try: - resolved_store.head(path) + _run_coroutine(_open_store_sample(path, store=resolved_store)) except Exception as e: raise RuntimeError( - f"Object store cannot access {href!r}: {e}. " + f"Store cannot open {href!r} through GeoTIFF.open: {e}. " "Pass a configured store= argument to lazycogs.open() to authenticate. " "See the cloud storage guide for examples.", ) from e @@ -299,7 +306,7 @@ def _build_dataarray( out_dtype: np.dtype, method_cls: type[MosaicMethodBase], chunks: dict[str, int] | None, - store: ObjectStore | None = None, + store: ObspecInput | None = None, max_concurrent_reads: int = 32, path_from_href: Callable[[str], str] | None = None, ) -> DataArray: @@ -329,9 +336,9 @@ def _build_dataarray( out_dtype: Output array dtype. method_cls: Mosaic method class. chunks: Passed to ``DataArray.chunk()`` if not ``None``. - store: Pre-configured obstore ``ObjectStore`` instance. When - provided, it is used directly for all asset reads instead of - resolving a store from each HREF. + store: Pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``. 
When provided, it is used directly for all asset + reads instead of resolving an obstore-backed store from each HREF. max_concurrent_reads: Maximum number of COG reads to run concurrently per chunk. path_from_href: Optional callable ``(href: str) -> str`` passed to @@ -470,7 +477,7 @@ def open( # noqa: A001 dtype: str | np.dtype | None = None, mosaic_method: type[MosaicMethodBase] | None = None, time_period: str = "P1D", - store: ObjectStore | None = None, + store: ObspecInput | None = None, max_concurrent_reads: int = 32, path_from_href: Callable[[str], str] | None = None, duckdb_client: DuckdbClient | None = None, @@ -512,11 +519,12 @@ def open( # noqa: A001 (calendar year). Defaults to ``"P1D"`` (one step per calendar day), which preserves the previous behaviour. Multi-day windows such as ``"P16D"`` are aligned to an epoch of 2000-01-01. - store: Pre-configured obstore ``ObjectStore`` instance to use for all - asset reads. Useful when credentials, custom endpoints, or - non-default options are needed without relying on automatic store - resolution from each HREF. When ``None`` (default), each asset - URL is parsed to create or reuse a per-thread cached store. + store: Pre-configured obspec-compatible store accepted by + ``GeoTIFF.open`` to use for all asset reads. Useful when + credentials, custom endpoints, or non-default options are needed + without relying on automatic store resolution from each HREF. When + ``None`` (default), each asset URL is parsed to create or reuse a + per-thread cached obstore-backed store. max_concurrent_reads: Maximum number of COG reads to run concurrently per chunk. Items are processed in batches of this size, which bounds peak in-flight memory when a chunk overlaps many files. diff --git a/src/lazycogs/_explain.py b/src/lazycogs/_explain.py index 217a300..bc4b0bb 100644 --- a/src/lazycogs/_explain.py +++ b/src/lazycogs/_explain.py @@ -20,7 +20,7 @@ if TYPE_CHECKING: from collections.abc import Iterator - from obstore.store import ObjectStore + from async_tiff import ObspecInput from lazycogs._backend import MultiBandStacBackendArray @@ -447,7 +447,7 @@ async def _inspect_item_async( dst_crs: CRS, chunk_width: int, chunk_height: int, - store: ObjectStore | None = None, + store: ObspecInput | None = None, ) -> CogRead | None: """Open a COG header and compute the overview level and read window. @@ -460,7 +460,8 @@ async def _inspect_item_async( dst_crs: CRS of the destination chunk. chunk_width: Chunk width in pixels. chunk_height: Chunk height in pixels. - store: Optional pre-configured obstore ``ObjectStore``. + store: Optional pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``. 
Returns: A :class:`CogRead` with all header fields populated, or ``None`` if diff --git a/src/lazycogs/_store.py b/src/lazycogs/_store.py index 77bcbae..2a594eb 100644 --- a/src/lazycogs/_store.py +++ b/src/lazycogs/_store.py @@ -1,4 +1,4 @@ -"""Resolve cloud storage HREFs into obstore ``ObjectStore`` instances.""" +"""Resolve cloud storage HREFs into store/path pairs and obstore stores.""" from __future__ import annotations @@ -15,6 +15,7 @@ if TYPE_CHECKING: from collections.abc import Callable + from async_tiff import ObspecInput from obstore.store import ObjectStore logger = logging.getLogger(__name__) @@ -31,15 +32,16 @@ def _cache() -> dict[str, ObjectStore]: def resolve( href: str, - store: ObjectStore | None = None, + store: ObspecInput | None = None, path_fn: Callable[[str], str] | None = None, -) -> tuple[ObjectStore, str]: - """Resolve an HREF into an ``(ObjectStore, path)`` pair. +) -> tuple[ObspecInput, str]: + """Resolve an HREF into a ``(store, path)`` pair. When ``store`` is supplied, it is returned unchanged and only the object path is extracted from the HREF. The caller is responsible for ensuring - the store is rooted at the same ``scheme://netloc`` the HREF points to; - no introspection is performed on the provided store. + the store satisfies the obspec read contract accepted by + ``GeoTIFF.open`` and is rooted at the same ``scheme://netloc`` the HREF + points to; no introspection is performed on the provided store. When ``store`` is ``None``, a store is auto-constructed via :func:`obstore.store.from_url` using only the ``scheme://netloc`` portion @@ -53,7 +55,8 @@ def resolve( href: A storage URL supported by :func:`obstore.store.from_url` (``s3``, ``s3a``, ``gs``, Azure variants, ``http``, ``https``, ``file``, ``memory``). - store: Optional pre-configured ``ObjectStore`` to use directly. + store: Optional pre-configured obspec-compatible store accepted by + ``GeoTIFF.open``. path_fn: Optional callable that takes the full HREF and returns the object path to use with the store. When provided, it replaces the default ``urlparse``-based path extraction. Only meaningful when @@ -92,7 +95,7 @@ def store_for( duckdb_client: DuckdbClient | None = None, **kwargs: object, ) -> ObjectStore: - """Construct an ``ObjectStore`` by inspecting a geoparquet STAC items file. + """Construct an ``ObjectStore`` by inspecting a stac-geoparquet sample asset. 
Reads one sample item from *href*, derives the store root URL from a data asset HREF, and constructs an ``ObjectStore`` with obstore's own diff --git a/tests/test_core.py b/tests/test_core.py index f6ad72a..8de53f8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -312,9 +312,13 @@ def opened_dataarray(tmp_path): table = _items_to_arrow([{"properties": {"datetime": "2023-01-15T10:00:00Z"}}]) + async def fake_open(path: str, *, store): + return object() + with ( patch("rustac.DuckdbClient.search", return_value=[_fake_open_item()]), patch("rustac.DuckdbClient.search_to_arrow", return_value=table), + patch("lazycogs._core.GeoTIFF.open", side_effect=fake_open), ): return lazycogs.open( str(parquet), @@ -392,6 +396,17 @@ def test_chunked_spatial_selection_full_compute_succeeds(opened_dataarray): # _smoketest_store # --------------------------------------------------------------------------- + +class _ProtocolStore: + """Minimal non-obstore store that satisfies the async_tiff protocol shape.""" + + async def get_range_async(self, _start: int, _end: int): + return b"" + + async def get_ranges_async(self, starts_ends): + return [b"" for _ in starts_ends] + + _SMOKETEST_ITEM = { "id": "smoke-item", "stac_extensions": [], @@ -406,13 +421,22 @@ def test_chunked_spatial_selection_full_compute_succeeds(opened_dataarray): } -def test_smoketest_passes_when_head_succeeds(): - """_smoketest_store does not raise when head() succeeds.""" +def test_smoketest_passes_when_geotiff_open_succeeds(): + """_smoketest_store does not raise when GeoTIFF.open succeeds.""" store = MemoryStore() store.put("B04.tif", b"dummy") - with patch("rustac.DuckdbClient.search", return_value=[_SMOKETEST_ITEM]): + async def fake_open(path: str, *, store): + assert path == "B04.tif" + assert store is store_obj + return object() + + store_obj = store + with ( + patch("rustac.DuckdbClient.search", return_value=[_SMOKETEST_ITEM]), + patch("lazycogs._core.GeoTIFF.open", side_effect=fake_open), + ): _smoketest_store( "items.parquet", duckdb_client=DuckdbClient(), @@ -421,14 +445,18 @@ def test_smoketest_passes_when_head_succeeds(): ) -def test_smoketest_raises_runtime_error_on_head_failure(): - """_smoketest_store raises RuntimeError when the store cannot access the asset.""" +def test_smoketest_raises_runtime_error_on_geotiff_open_failure(): + """_smoketest_store raises RuntimeError when GeoTIFF.open fails.""" - store = MemoryStore() # empty — head() will raise + store = MemoryStore() + + async def fake_open(path: str, *, store): + raise FileNotFoundError(path) with ( patch("rustac.DuckdbClient.search", return_value=[_SMOKETEST_ITEM]), - pytest.raises(RuntimeError, match="cannot access"), + patch("lazycogs._core.GeoTIFF.open", side_effect=fake_open), + pytest.raises(RuntimeError, match=r"GeoTIFF\.open"), ): _smoketest_store( "items.parquet", @@ -438,6 +466,37 @@ def test_smoketest_raises_runtime_error_on_head_failure(): ) +def test_smoketest_accepts_protocol_store_without_head(tmp_path): + """A custom protocol store without head() still passes startup validation.""" + + parquet = tmp_path / "items.parquet" + parquet.write_bytes(b"") + store = _ProtocolStore() + table = _items_to_arrow([{"properties": {"datetime": "2023-01-15T10:00:00Z"}}]) + + async def fake_open(path: str, *, store): + assert path == "B04.tif" + assert store is protocol_store + return object() + + protocol_store = store + with ( + patch("rustac.DuckdbClient.search", return_value=[_fake_open_item()]), + patch("rustac.DuckdbClient.search_to_arrow", 
return_value=table), + patch("lazycogs._core.GeoTIFF.open", side_effect=fake_open), + ): + da = lazycogs.open( + str(parquet), + bbox=(0.0, 0.0, 100.0, 100.0), + crs="EPSG:32632", + resolution=10.0, + store=store, + path_from_href=lambda href: href.split("/", 3)[-1], + ) + + assert da.attrs["_stac_backend"].store is store + + def test_smoketest_no_op_when_no_items(): """_smoketest_store does nothing when the query returns no items.""" with patch("rustac.DuckdbClient.search", return_value=[]): @@ -448,7 +507,6 @@ def test_smoketest_prefers_specified_band(): """_smoketest_store uses the first specified band when bands= is given.""" store = MemoryStore() - store.put("B08.tif", b"dummy") item = { "id": "multi-band-item", @@ -460,8 +518,16 @@ def test_smoketest_prefers_specified_band(): }, } - with patch("rustac.DuckdbClient.search", return_value=[item]): - # B08 exists in the MemoryStore; B04 does not — smoketest must pick B08 + seen: list[str] = [] + + async def fake_open(path: str, *, store): + seen.append(path) + return object() + + with ( + patch("rustac.DuckdbClient.search", return_value=[item]), + patch("lazycogs._core.GeoTIFF.open", side_effect=fake_open), + ): _smoketest_store( "items.parquet", duckdb_client=DuckdbClient(), @@ -469,3 +535,5 @@ def test_smoketest_prefers_specified_band(): store=store, path_from_href=lambda href: href.split("/", 3)[-1], ) + + assert seen == ["B08.tif"] diff --git a/tests/test_store.py b/tests/test_store.py index 4221193..26c69f6 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -15,6 +15,15 @@ ) from lazycogs._store import resolve, store_for + +class _ProtocolStore: + async def get_range_async(self, _start: int, _end: int): + return b"" + + async def get_ranges_async(self, starts_ends): + return [b"" for _ in starts_ends] + + # --------------------------------------------------------------------------- # resolve # --------------------------------------------------------------------------- @@ -87,6 +96,14 @@ def test_user_supplied_store_is_returned_unchanged(): assert path == "some/key.tif" +def test_user_supplied_protocol_store_is_returned_unchanged(): + """resolve() accepts a non-obstore store that satisfies the reader contract.""" + user_store = _ProtocolStore() + store, path = resolve("s3://bucket/some/key.tif", store=user_store) + assert store is user_store + assert path == "some/key.tif" + + def test_user_supplied_store_bypasses_cache(): """Passing a store should never consult or populate the auto-cache.""" user_store = MemoryStore() diff --git a/uv.lock b/uv.lock index 28b3b74..a23dbcd 100644 --- a/uv.lock +++ b/uv.lock @@ -1290,6 +1290,7 @@ dependencies = [ { name = "affine" }, { name = "arro3-core" }, { name = "async-geotiff" }, + { name = "async-tiff" }, { name = "cql2" }, { name = "numpy" }, { name = "obstore" }, @@ -1328,6 +1329,7 @@ requires-dist = [ { name = "affine", specifier = ">=2.4.0" }, { name = "arro3-core", specifier = ">=0.8.0" }, { name = "async-geotiff", specifier = ">=0.4.0" }, + { name = "async-tiff", specifier = ">=0.7.1" }, { name = "cql2", specifier = ">=0.5.5" }, { name = "dask", marker = "extra == 'dask'", specifier = ">=2026.3.0" }, { name = "numpy", specifier = ">=2.4.4" },
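For review context, a short end-to-end sketch of the surface this patch settles on. The bbox, CRS, and resolution values are lifted from the tests above; the parquet path is a placeholder, and `skip_signature=True` assumes a public bucket:

```python
import lazycogs

# Default path: store=None, so each asset HREF auto-resolves to a per-thread
# cached obstore-backed store keyed by scheme://netloc.
da = lazycogs.open(
    "items.parquet",
    bbox=(0.0, 0.0, 100.0, 100.0),
    crs="EPSG:32632",
    resolution=10.0,
)

# Explicit path: build one store up front (store_for infers region and
# requester_pays from STAC Storage Extension metadata when present) and
# forward it unchanged to every GeoTIFF.open call.
store = lazycogs.store_for("items.parquet", skip_signature=True)
da = lazycogs.open(
    "items.parquet",
    bbox=(0.0, 0.0, 100.0, 100.0),
    crs="EPSG:32632",
    resolution=10.0,
    store=store,
)
```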