Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -298,6 +298,7 @@ local*.*
!local_spark_session.py
!local_file_system_storage_configuration.py
!local_cred_utils.py
!src/corvus_python/storage/local_file_storage.py
test-reports/
pytest-test-results.xml
behave-test-results.xml
125 changes: 125 additions & 0 deletions README.md
@@ -204,6 +204,131 @@ except EmailError as e:

---

### `storage`

Provides storage configuration abstractions for data lake operations, with implementations for local and Azure Data Lake Gen2 storage.

| Component Name | Object Type | Description | Import syntax |
|---|---|---|---|
| <code>DataLakeLayer</code> | Enum | Enumeration of data lake layers: `BRONZE`, `SILVER`, `GOLD`. | <code>from corvus_python.storage import DataLakeLayer</code> |
| <code>StorageConfiguration</code> | Class (abstract) | Base class for storage configurations. Provides a `get_full_path()` method and `storage_options` dict. | <code>from corvus_python.storage import StorageConfiguration</code> |
| <code>LocalFileSystemStorageConfiguration</code> | Class | Storage configuration backed by the local file system. Useful for local development. | <code>from corvus_python.storage import LocalFileSystemStorageConfiguration</code> |
| <code>AzureDataLakeFileSystemPerLayerConfiguration</code> | Class | ADLS Gen2 configuration where each data lake layer maps to a separate file system (`bronze`, `silver`, `gold`). | <code>from corvus_python.storage import AzureDataLakeFileSystemPerLayerConfiguration</code> |
| <code>AzureDataLakeSingleFileSystemConfiguration</code> | Class | ADLS Gen2 configuration using a single file system with top-level folders for each layer. | <code>from corvus_python.storage import AzureDataLakeSingleFileSystemConfiguration</code> |

#### Usage Example

```python
from corvus_python.storage import (
DataLakeLayer,
LocalFileSystemStorageConfiguration,
AzureDataLakeFileSystemPerLayerConfiguration,
AzureDataLakeSingleFileSystemConfiguration,
)

# Local filesystem (for development)
local_config = LocalFileSystemStorageConfiguration(base_path="./data")
path = local_config.get_full_path(DataLakeLayer.BRONZE, "my_database/my_table")
# -> ./data/bronze/my_database/my_table

# Azure Data Lake - separate file system per layer
adls_per_layer = AzureDataLakeFileSystemPerLayerConfiguration(
storage_account_name="mystorageaccount",
storage_options={"account_key": "..."},
)
path = adls_per_layer.get_full_path(DataLakeLayer.SILVER, "my_database/my_table")
# -> abfss://silver@mystorageaccount.dfs.core.windows.net/my_database/my_table

# Azure Data Lake - single file system
adls_single = AzureDataLakeSingleFileSystemConfiguration(
storage_account_name="mystorageaccount",
file_system_name="datalake",
)
path = adls_single.get_full_path(DataLakeLayer.GOLD, "my_database/my_table")
# -> abfss://datalake@mystorageaccount.dfs.core.windows.net/gold/my_database/my_table
```

---

### `repositories`

Provides repository classes for reading and writing structured data across various storage backends, built on [Polars](https://pola.rs/). All repositories accept a `StorageConfiguration` to abstract over local and cloud storage.

#### Supporting Data Classes

| Component Name | Object Type | Description | Import syntax |
|---|---|---|---|
| <code>DatabaseDefinition</code> | Dataclass | Defines a logical database by name and a list of `TableDefinition` instances. | <code>from corvus_python.repositories import DatabaseDefinition</code> |
| <code>TableDefinition</code> | Dataclass | Defines a table by name, Pandera schema, optional `title`, and optional `db_schema` (the SQL schema name, e.g. `dbo`, used when creating SQL views over the table in Synapse or Fabric). | <code>from corvus_python.repositories import TableDefinition</code> |

#### `PolarsDeltaTableRepository`

Manages Delta Lake tables within a specified data lake layer. Handles schema validation using Pandera and integrates with OpenTelemetry for tracing.

```python
from corvus_python.repositories import PolarsDeltaTableRepository, DatabaseDefinition, TableDefinition
from corvus_python.storage import LocalFileSystemStorageConfiguration, DataLakeLayer
```

| Method | Description |
|---|---|
| `read_data(table_name)` | Reads a Delta table into a `LazyFrame`. Returns `None` if the table is empty. |
| `overwrite_table(table_name, data, overwrite_schema=False)` | Overwrites the table after eagerly validating the full dataset against its Pandera schema. |
| `overwrite_table_lazy(table_name, data, overwrite_schema=False)` | Overwrites the table using streaming execution. Performs schema-level validation only. |
| `overwrite_table_with_condition(table_name, data, predicate, overwrite_schema=False)` | Overwrites only rows matching the given predicate (e.g. for partition-level updates). |
| `append_to_table(table_name, data)` | Appends data to an existing Delta table. |

#### `PolarsCsvDataRepository`

Reads CSV files from a hive-partitioned path (`snapshot_time=<timestamp>/<name>.csv`).

```python
from corvus_python.repositories import PolarsCsvDataRepository
```

| Method | Description |
|---|---|
| `load_csv(object_name, snapshot_timestamp, include_file_paths=None)` | Loads a CSV file into a `DataFrame`. Strips `.csv` suffix from `object_name` if present. |

#### `PolarsExcelDataRepository`

Reads Excel workbooks from a hive-partitioned path, returning all sheets as a `dict[str, DataFrame]`.

```python
from corvus_python.repositories import PolarsExcelDataRepository
```

| Method | Description |
|---|---|
| `load_excel(snapshot_timestamp, workbook_name, relative_path=None)` | Loads all sheets from an `.xlsx` workbook into a dict keyed by sheet name. |

#### `PolarsNdJsonDataRepository`

Reads Newline Delimited JSON (NDJSON) files from a partitioned path (`<load_type>/snapshot_time=<timestamp>/<name>.json`).

```python
from corvus_python.repositories import PolarsNdJsonDataRepository
```

| Method | Description |
|---|---|
| `load_ndjson(object_name, load_type, snapshot_timestamp, include_file_paths=None, schema_overrides=None, schema=None)` | Loads an NDJSON file into a `DataFrame`. Supports schema overrides and full schema specification. |

#### `PolarsAzureTableRepository`

Queries Azure Table Storage, returning results as Polars DataFrames. Authenticates using `DefaultAzureCredential`.

```python
from corvus_python.repositories import PolarsAzureTableRepository
```

| Method | Description |
|---|---|
| `query(table_name, query_filter, parameters, schema=None)` | Queries an Azure Table with an OData filter and named parameters. |
| `get_entities_partition_key_starts_with(table_name, partition_key_prefix, schema=None)` | Retrieves all entities whose `PartitionKey` starts with the given prefix. |

---

### `sql`

Includes utility functions for working with SQL databases via pyodbc, with AAD token-based authentication. Provides helpers for connecting to Synapse serverless SQL and Fabric SQL Analytics endpoints, managing views over Delta Lake tables, and executing DDL statements.