Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Architecture: lazycogs

lazycogs turns a geoparquet STAC item index into a lazy `(band, time, y, x)` xarray DataArray backed by Cloud-Optimized GeoTIFFs. It requires no GDAL. All raster I/O is done through `async-geotiff` (Rust-backed), spatial queries go through DuckDB via `rustac`, and reprojection is pure `pyproj` + numpy.
lazycogs turns a geoparquet STAC item index into a lazy `(band, time, y, x)` xarray DataArray backed by Cloud-Optimized GeoTIFFs. It requires no GDAL. All raster I/O is done through `async-geotiff` (Rust-backed), spatial queries go through DuckDB via `rustac`, and reprojection flows through a small dispatcher in `_warp.py` that routes all public resampling modes to rust-warp.

## Why parquet, not a STAC API URL

Expand Down Expand Up @@ -30,7 +30,7 @@ src/lazycogs/
_executor.py Per-chunk reprojection thread pool configuration. Exposes set_reproject_workers() and get_max_workers(); the actual pool is created per event loop in _backend.py.
_explain.py Dry-run read estimator. Registers the da.lazycogs.explain() xarray accessor.
_grid.py Compute output affine transform and dimensions from bbox + resolution.
_reproject.py Nearest-neighbor reprojection using pyproj Transformer + numpy fancy indexing.
_warp.py Reprojection request types and rust-warp dispatch.
_storage_ext.py STAC Storage Extension metadata parsing (version detection, kwargs extraction for v1 and v2).
_store.py Resolve cloud HREFs into obstore Store instances (or route through a user-supplied store) with a thread-local cache; store_for() factory for constructing stores from parquet STAC files.
_temporal.py Temporal grouping strategies (day, week, month, year, fixed-day-count).
Expand All @@ -42,7 +42,7 @@ src/lazycogs/
`open()` in `_core.py`:

1. Resolves `duckdb_client`: if not provided, creates a plain `DuckdbClient()`. Validates that `href` ends in `.parquet`/`.geoparquet` when no client is supplied (a directory path is accepted when a custom client is passed).
2. Parses `time_period` into a `_TemporalGrouper` (see `_temporal.py`).
2. Validates `time_period` and `resampling` up front in `_core.py`, so unsupported public options fail before any storage or DuckDB I/O.
3. Converts `bbox` from the target CRS to EPSG:4326 using `pyproj.Transformer`.
4. Calls `_discover_bands()`: queries the parquet source via `duckdb_client.search(..., max_items=1)` to find asset keys. Assets with role `"data"` or media type `"image/tiff"` are returned first.
5. Calls `_smoketest_store()`: fetches one sample item from the parquet, resolves the object store for a representative data asset HREF, and calls `head()` to confirm access. Raises `RuntimeError` immediately with a clear message if the store cannot reach the asset, so misconfiguration is surfaced at `open()` time rather than deferred to the first chunk read.
Expand Down Expand Up @@ -125,33 +125,27 @@ If the chunk bbox falls entirely outside the source image after clamping, `_nati

`await reader.read(window=window)` fetches the windowed pixel data from the selected overview level (or full-res). The result is a `(bands, window_h, window_w)` array in the source CRS/grid.

### 4. Nearest-neighbor reprojection
### 4. Reprojection dispatch and current backend state

`reproject_array()` in `_reproject.py` warps the source tile onto the destination chunk grid without GDAL:
`_chunk_reader.py` builds a `ReprojectRequest` and calls `reproject_tile()` in `_warp.py`. The dispatcher always short-circuits exact same-grid reads. After that, all public reprojection requests route through rust-warp for `nearest`, `bilinear`, and `cubic`.

1. Build a meshgrid of destination pixel-centre coordinates.
2. Transform all coordinates from `dst_crs` to `src_crs` in one vectorised `Transformer.transform()` call.
3. Apply the inverse source affine transform to get fractional source pixel indices.
4. `np.floor` rounds to the nearest-neighbor sample; numpy fancy indexing populates the output array.
5. Out-of-bounds pixels get the nodata fill value.

Nearest-neighbor is the only supported resampling method.
Public resampling is validated at `open()` time and must be passed as a `ResamplingMethod` enum. Same-grid reads bypass reprojection regardless of the selected resampling mode.

## Concurrency model

There are four nested layers of concurrency in a chunk read.

**Dask (chunk level).** When a dask-backed DataArray is computed, dask dispatches each chunk task to a worker thread. Each worker thread calls `_sync_getitem()` in `_backend.py`, which calls `_run_coroutine(_async_getitem(...))` to drive all time steps from the persistent per-thread background loop. Worker threads are independent — they share no state except the thread-local store cache in `_store.py` and the thread-local DuckDB clients in `_backend.py`.

**asyncio (time + item level).** A single event loop call (via `_run_coroutine`) handles the entire chunk. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per time step, so all time steps are in flight concurrently within the same event loop. DuckDB queries run on `_DUCKDB_EXECUTOR` (a dedicated two-thread pool) via `run_in_executor`, yielding the event loop during each query. DuckDB's internal mutex serialises actual DB access, so queries are safe but not parallel on a single `DuckdbClient`. Once a query returns, its mosaic coroutine proceeds immediately and COG reads for all time steps overlap in the event loop's I/O layer. Because all time steps share a single event loop and therefore a single bounded reprojection executor, the reprojection thread count stays at `get_max_workers()` regardless of how many time steps are in the chunk (no thread explosion). The `warp_cache` is shared across coroutines: `compute_warp_map` is deterministic, so concurrent writes are safe.
**asyncio (time + item level).** A single event loop call (via `_run_coroutine`) handles the entire chunk. Inside `_async_getitem`, `asyncio.gather` fans out one `_run_one_date` coroutine per time step, so all time steps are in flight concurrently within the same event loop. DuckDB queries run on `_DUCKDB_EXECUTOR` (a dedicated two-thread pool) via `run_in_executor`, yielding the event loop during each query. DuckDB's internal mutex serialises actual DB access, so queries are safe but not parallel on a single `DuckdbClient`. Once a query returns, its mosaic coroutine proceeds immediately and COG reads for all time steps overlap in the event loop's I/O layer. Because all time steps share a single event loop and therefore a single bounded reprojection executor, the reprojection thread count stays at `get_max_workers()` regardless of how many time steps are in the chunk (no thread explosion).

**asyncio (item level).** Inside a time step's event loop, `read_chunk_async` launches one `_read_item_band()` task per overlapping item up front, with an `asyncio.Semaphore(max_concurrent_reads)` (configurable via `open()`, default 32) capping how many reads run concurrently. Tasks complete in I/O arrival order, but results are buffered by their original list index and drained into the mosaic in source-list order. This preserves the sort contract for `FirstMethod` — items are fed strictly in the order returned by DuckDB (i.e. the caller's `sortby` order) regardless of which COGs arrive first over the network, while all concurrent I/O remains in flight. COG header reads and tile fetches from `async-geotiff` are all awaitable, so the event loop multiplexes them without blocking. Early exit is preserved: once the mosaic method signals completion, remaining tasks are cancelled in a `finally` block, and items still waiting on the semaphore never start.

**Thread pool (CPU work per item).** `_apply_bands_with_warp_cache` is synchronous CPU-bound work that processes all bands for one item together. `_read_item_band` dispatches it via `loop.run_in_executor(None, ...)` — one executor call per item — so the event loop stays free to process other items' tile reads while reprojections run on threads. Because the call is coarse-grained (all bands per item) and GIL-releasing (`pyproj` and numpy both release during heavy inner loops), offloading to the thread pool gives real CPU parallelism without excessive submission overhead.
**Thread pool (CPU work per item).** `_reproject_bands` is synchronous CPU-bound work that processes all bands for one item together. `_read_item_band` dispatches it via `loop.run_in_executor(None, ...)` — one executor call per item — so the event loop stays free to process other items' tile reads while reprojections run on threads. Because the call is coarse-grained and rust-warp does the heavy inner loops off the event loop, offloading to the thread pool gives real CPU parallelism without excessive submission overhead.

**Why threads, not a process pool.** `pyproj.Transformer.transform()` and numpy's fancy-indexing both release the GIL during their heavy inner loops. Threads therefore give real CPU parallelism here — not just interleaving — without the overhead of process spawning and array pickling that a `ProcessPoolExecutor` would require.

**Why reprojection is memory-bandwidth-bound, not compute-bound.** `compute_warp_map` builds two meshgrids the size of the output chunk, transforms all coordinates in one vectorised call, and produces large index arrays. `apply_warp_map` samples the source array with random-access fancy indexing (`out[:, valid] = data[:, row_idx[valid], col_idx[valid]]`), which produces near-constant cache misses. Both phases are dominated by memory latency and bandwidth rather than arithmetic. In practice this means CPU utilisation is low (threads stall waiting for memory), and adding more than 4 concurrent reprojection threads provides no throughput benefit — they saturate the memory bus instead.
**Why reprojection is still memory-bandwidth-sensitive.** Even with rust-warp handling the reprojection kernel, chunk reprojection moves large raster windows through memory and writes full destination tiles. In practice this means throughput still stops improving after a small number of concurrent reprojection threads, so the default executor cap remains conservative.

**Bounded per-loop executor.** Rather than using Python's default `min(32, cpu_count + 4)` thread count, `_get_or_create_background_loop()` installs a bounded `ThreadPoolExecutor` (default `min(os.cpu_count(), 4)`) as the default executor on each background loop it creates, before any coroutines run. This caps thread count per loop while preserving per-loop isolation: each dask task has its own independent pool and does not queue behind other tasks. The executor is shut down when the background loop thread exits. Call `lazycogs.set_reproject_workers(n)` to change the per-loop bound (see `_executor.py`).

Expand Down Expand Up @@ -221,7 +215,7 @@ These are copied from `rio-tiler` (MIT licence, zero GDAL imports) to avoid pull
Combining `time_period` with a mosaic method is the idiomatic way to produce
temporal composites. Setting `time_period="P1W"` groups every STAC item within
the same ISO calendar week into a single time step. When a chunk is read,
`async_mosaic_chunk` feeds all items for that week to the mosaic method in
`read_chunk_async()` feeds all items for that week to the mosaic method in
order. With `FirstMethod` (the default), reading stops as soon as every output
pixel has a valid (non-nodata) value — the remaining items in the week are
never fetched.
Expand Down Expand Up @@ -264,7 +258,8 @@ When the store root does not align with the URL structure of the asset HREFs —
| `arro3-core` | Zero-copy Arrow table output from DuckDB queries (installed via `rustac[arrow]`) |
| `async-geotiff` | Async COG header reads and windowed tile reads (Rust, no GDAL) |
| `obstore` | Cloud object store abstraction layer for async-geotiff |
| `pyproj` | CRS transforms: bbox reprojection, warp map generation |
| `rust-warp` | Reprojection backend, currently sourced from GitHub |
| `pyproj` | CRS transforms: bbox reprojection and target-resolution estimation |
| `xarray` | DataArray / Dataset assembly, `BackendArray` / `LazilyIndexedArray` protocol |
| `rasterix` | CRS-aware `RasterIndex` for lazy spatial coordinates |
| `xproj` | CRS accessor and alignment for xarray Flexible Indexes |
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Benchmarks live in `tests/benchmarks/` and are excluded from the default test ru
uv run python scripts/prepare_benchmark_data.py
```

This queries the Element84 Earth Search STAC API, downloads a small set of COG assets to `.benchmark_data/`, and writes local parquet index files. Pass `--overwrite` to re-download if needed.
This queries the Element84 Earth Search STAC API, downloads a small set of COG assets to `.benchmark_data/`, and writes local parquet index files. The parquet files are always refreshed to point at the current checkout's local `.benchmark_data/cogs/` paths, so rerunning the script fixes stale `file://` HREFs after moving the repo. Pass `--overwrite` to force re-downloading the raw query and COG files.

Once the data is in place:

Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ One constraint worth naming: lazycogs only reads Cloud Optimized GeoTIFFs. If yo
| STAC search + spatial indexing | `rustac` (DuckDB + geoparquet) |
| COG I/O | `async-geotiff` (Rust, no GDAL) |
| Cloud storage | `obstore` |
| Reprojection | `pyproj` + numpy |
| Reprojection | `rust-warp` via `lazycogs.ResamplingMethod` |
| Lazy dataset construction | xarray `BackendEntrypoint` + `LazilyIndexedArray` |

## Installation
Expand All @@ -33,6 +33,8 @@ One constraint worth naming: lazycogs only reads Cloud Optimized GeoTIFFs. If yo
pip install lazycogs
```

`rust-warp` is currently pinned from GitHub via uv pending a stable release flow for this dependency.

## Coordinate convention

`lazycogs.open()` returns a DataArray whose `y` coordinates follow the standard
Expand Down Expand Up @@ -73,7 +75,10 @@ da = lazycogs.open(
bbox=dst_bbox,
crs=dst_crs,
resolution=10.0,
resampling=lazycogs.ResamplingMethod.NEAREST,
)

# Other supported modes: ResamplingMethod.BILINEAR and ResamplingMethod.CUBIC
```

### Async loading
Expand Down
Loading
Loading