112 changes: 112 additions & 0 deletions README.md
@@ -201,3 +201,115 @@ except EmailError as e:
- The `from_email` must use a configured MailFrom address from your ACS resource
- Set `email_sending_disabled=True` during development to prevent actual emails from being sent
- Attachments must be base64-encoded before adding to the `EmailAttachment` object

---

### `sql`

Includes utility functions for working with SQL databases via pyodbc, with AAD token-based authentication. Provides helpers for connecting to Synapse serverless SQL and Fabric SQL Analytics endpoints, managing views over Delta Lake tables, and executing DDL statements.

**⚠️ Note: This module requires ODBC Driver 18 for SQL Server or later.**
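If you're unsure whether a suitable driver is installed, `pyodbc.drivers()` lists the ODBC driver names registered on the machine (a quick local sanity check; not part of this package):

```python
import pyodbc

# Registered ODBC driver names; look for "ODBC Driver 18 for SQL Server"
sql_server_drivers = [d for d in pyodbc.drivers() if "SQL Server" in d]
print(sql_server_drivers)
```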

| Component Name | Object Type | Description | Import syntax |
|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
| <code>get_pyodbc_connection</code> | Function | Opens a pyodbc connection to a Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. | <code>from corvus_python.sql import get_pyodbc_connection</code> |
| <code>get_pyodbc_connection_with_token</code> | Function | Opens a pyodbc connection using a pre-acquired AAD token. Useful when running inside Synapse notebooks. | <code>from corvus_python.sql import get_pyodbc_connection_with_token</code> |
| <code>execute_ddl</code> | Function | Executes a DDL statement (e.g. CREATE VIEW) using a pyodbc connection. | <code>from corvus_python.sql import execute_ddl</code> |
| <code>create_or_alter_view_over_delta_table</code> | Function | Creates or alters a SQL view over a Delta Lake table, with support for inferred or explicit column types. | <code>from corvus_python.sql import create_or_alter_view_over_delta_table</code> |
| <code>drop_views_in_schema</code> | Function | Drops all views in a given schema. | <code>from corvus_python.sql import drop_views_in_schema</code> |
| <code>SelectColumn</code> | Class | Dataclass representing a column to select, with an optional display title. | <code>from corvus_python.sql import SelectColumn</code> |
| <code>WithColumn</code> | Class | Dataclass representing a column with an explicit type for use in OPENROWSET WITH clauses. | <code>from corvus_python.sql import WithColumn</code> |

#### `sql.synapse`

Convenience wrappers specific to Synapse serverless SQL endpoints.

| Component Name | Object Type | Description | Import syntax |
|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
| <code>get_synapse_sql_pyodbc_connection</code> | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using AAD tokens. Builds the server URL from the workspace name. | <code>from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection</code> |
| <code>get_synapse_sql_pyodbc_connection_with_token</code> | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using a pre-acquired AAD token. Ideal for Synapse notebooks. | <code>from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token</code> |
| <code>create_database_if_not_exists</code> | Function | Creates a database if it doesn't already exist. Requires a connection to the master database. | <code>from corvus_python.sql.synapse import create_database_if_not_exists</code> |

#### `sql.fabric`

Convenience wrappers specific to Fabric SQL Analytics endpoints.

| Component Name | Object Type | Description | Import syntax |
|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
| <code>get_fabric_sql_pyodbc_connection</code> | Function | Opens a pyodbc connection to a Fabric SQL Analytics endpoint using AAD tokens. | <code>from corvus_python.sql.fabric import get_fabric_sql_pyodbc_connection</code> |

#### Usage Example

```python
from corvus_python.sql import (
    get_pyodbc_connection,
    create_or_alter_view_over_delta_table,
    drop_views_in_schema,
    SelectColumn,
    WithColumn,
)

# Connect using DefaultAzureCredential
conn = get_pyodbc_connection(
    server="myworkspace-ondemand.sql.azuresynapse.net",
    database="my_database",
    use_managed_identity=False,
)

# Create a view with inferred types
create_or_alter_view_over_delta_table(
    conn=conn,
    schema_name="dbo",
    view_name="my_view",
    delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table",
    infer_types=True,
    select_columns=[
        SelectColumn(name="id"),
        SelectColumn(name="full_name", title="Name"),
    ],
)

# Create a view with explicit types
create_or_alter_view_over_delta_table(
    conn=conn,
    schema_name="dbo",
    view_name="my_typed_view",
    delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table",
    infer_types=False,
    with_columns=[
        WithColumn(name="id", type="INT"),
        WithColumn(name="full_name", type="VARCHAR(200)", title="Name"),
    ],
)

# Drop all views in a schema
drop_views_in_schema(conn, schema_name="dbo")
```

Or use the Synapse/Fabric-specific helpers:

```python
from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection

conn = get_synapse_sql_pyodbc_connection(
    workspace_name="myworkspace",
    database="my_database",
    use_managed_identity=False,
)
```

#### Usage from a Synapse Notebook

When running inside a Synapse notebook, you can use `mssparkutils` to acquire a token and pass it directly:

```python
from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token

# mssparkutils is available by default inside Synapse notebooks
token = mssparkutils.credentials.getToken("DW")

conn = get_synapse_sql_pyodbc_connection_with_token(
    workspace_name="myworkspace",
    database="my_database",
    token=token,
)
```
74 changes: 18 additions & 56 deletions src/corvus_python/pyspark/synapse/sync_tables_locally.py
@@ -2,8 +2,6 @@

from dataclasses import dataclass
from typing import List, Tuple
import pyodbc
import struct
from pyspark.sql import SparkSession

from corvus_python.pyspark.utilities import LocalSparkSessionConfig, get_or_create_spark_session
@@ -18,41 +16,11 @@ class ObjectSyncDetails:
        database_name (str): Name of the database.
        tables (List[str]): List of tables in the database.
    """

    database_name: str
    tables: List[str]


def _get_pyodbc_connection(workspace_name: str) -> pyodbc.Connection:
    """Gets an ODBC connection to the SQL Serverless endpoint of a Synapse workspace.

    Args:
        workspace_name (str): Name of the workspace.

    Returns:
        pyodbc.Connection: ODBC connection to the SQL Serverless endpoint.

    Raises:
        RuntimeError: If user is not logged into the Azure CLI.

    Note:
        The connection object returned can be used in a pandas read_sql query to pull data from Synapse. E.g.:
        df = pd.read_sql(f'SELECT * FROM db_name.dbo.table_name', conn)
    """
    server = f'{workspace_name}-ondemand.sql.azuresynapse.net'
    database = 'master'
    driver = '{ODBC Driver 17 for SQL Server}'
    connection_string = f'Driver={driver};Server=tcp:{server},1433;\
Database={database};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'

    token = get_az_cli_token('https://database.windows.net/.default')

    token_bytes = token.encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)

    conn = pyodbc.connect(connection_string, attrs_before={1256: token_struct})
    return conn


def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]:
"""Gets the Spark JDBC connection properties for a Synapse SQL Serverless endpoint.

@@ -65,28 +33,25 @@ def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]:
    Raises:
        RuntimeError: If user is not logged into the Azure CLI.
    """
    server = f'{workspace_name}-ondemand.sql.azuresynapse.net'
    database = 'master'
    server = f"{workspace_name}-ondemand.sql.azuresynapse.net"
    database = "master"

    jdbc_url = f"jdbc:sqlserver://{server}:1433;database={database};encrypt=true;\
trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30"

    token = get_az_cli_token('https://database.windows.net/.default')
    token = get_az_cli_token("https://database.windows.net/.default")

    connection_properties = {
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "accessToken": token
    }
    connection_properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "accessToken": token}

    return jdbc_url, connection_properties


def sync_synapse_tables_to_local_spark(
        workspace_name: str,
        object_sync_details: List[ObjectSyncDetails],
        overwrite: bool = False,
        spark: SparkSession = None
):
    workspace_name: str,
    object_sync_details: List[ObjectSyncDetails],
    overwrite: bool = False,
    spark: SparkSession = None,
):
    """Syncs tables from a Synapse workspace to a local Spark metastore.

    Args:
@@ -111,20 +76,17 @@
            table_exists = spark.catalog.tableExists(table, osd.database_name)

            if table_exists and not overwrite:
                print('\033[93m' + f"Table '{table}' in database '{osd.database_name}' already exists and \
overwrite is set to False. Skipping table sync." + '\033[0m')
                print(
                    "\033[93m"
                    + f"Table '{table}' in database '{osd.database_name}' already exists and \
overwrite is set to False. Skipping table sync."
                    + "\033[0m"
                )
                continue
            else:
                spark.read.jdbc(
                    url=jdbc_url,
                    table=f"{osd.database_name}.dbo.{table}",
                    properties=connection_properties
                ) \
                    .coalesce(1) \
                    .write \
                    .format("delta") \
                    .mode("overwrite") \
                    .saveAsTable(f"{osd.database_name}.{table}")
                spark.read.jdbc(
                    url=jdbc_url, table=f"{osd.database_name}.dbo.{table}", properties=connection_properties
                ).coalesce(1).write.format("delta").mode("overwrite").saveAsTable(f"{osd.database_name}.{table}")

    if not existing_spark_session:
        spark.stop()
19 changes: 19 additions & 0 deletions src/corvus_python/sql/__init__.py
@@ -0,0 +1,19 @@
from .sql_utils import (
    execute_ddl,
    get_pyodbc_connection,
    get_pyodbc_connection_with_token,
    create_or_alter_view_over_delta_table,
    drop_views_in_schema,
    SelectColumn,
    WithColumn,
)

__all__ = [
    "execute_ddl",
    "get_pyodbc_connection",
    "get_pyodbc_connection_with_token",
    "create_or_alter_view_over_delta_table",
    "drop_views_in_schema",
    "SelectColumn",
    "WithColumn",
]
5 changes: 5 additions & 0 deletions src/corvus_python/sql/fabric/__init__.py
@@ -0,0 +1,5 @@
from .fabric_sql_utils import get_fabric_sql_pyodbc_connection

__all__ = [
    "get_fabric_sql_pyodbc_connection",
]
29 changes: 29 additions & 0 deletions src/corvus_python/sql/fabric/fabric_sql_utils.py
@@ -0,0 +1,29 @@
import pyodbc
from ..sql_utils import get_pyodbc_connection


def get_fabric_sql_pyodbc_connection(
    sql_analytics_endpoint: str,
    database: str,
    use_managed_identity: bool = True,
    client_id: str | None = None,
) -> pyodbc.Connection:
    """Open a pyodbc connection to Fabric SQL Analytics endpoint using AAD tokens.

    Requires ODBC Driver 18 for SQL Server or later.

    Parameters
    ----------
    sql_analytics_endpoint:
        The Fabric SQL Analytics endpoint, e.g. "<id>.datawarehouse.fabric.microsoft.com".
    database:
        The database name.
    use_managed_identity:
        If True, uses ManagedIdentityCredential (for Container Apps / ACI).
        If False, uses DefaultAzureCredential (for local dev — picks up
        az login, VS Code, environment variables, etc.).
    client_id:
        Optional user-assigned managed identity client ID. Only used when
        use_managed_identity is True.
    """
    return get_pyodbc_connection(sql_analytics_endpoint, database, use_managed_identity, client_id)