From e7ae471505220bc5fcf860750ff0d4a38d86d214 Mon Sep 17 00:00:00 2001 From: Mike Evans-Larah Date: Tue, 31 Mar 2026 10:41:52 +0000 Subject: [PATCH 1/4] Implement SQL utilities for Fabric and Synapse, including connection handling and view management --- .../pyspark/synapse/sync_tables_locally.py | 74 ++---- src/corvus_python/sql/__init__.py | 17 ++ src/corvus_python/sql/fabric/__init__.py | 5 + .../sql/fabric/fabric_sql_utils.py | 29 +++ src/corvus_python/sql/sql_utils.py | 227 ++++++++++++++++++ src/corvus_python/sql/synapse/__init__.py | 6 + .../sql/synapse/synapse_sql_utils.py | 58 +++++ tests/unit/test_sql_utils.py | 170 +++++++++++++ 8 files changed, 530 insertions(+), 56 deletions(-) create mode 100644 src/corvus_python/sql/__init__.py create mode 100644 src/corvus_python/sql/fabric/__init__.py create mode 100644 src/corvus_python/sql/fabric/fabric_sql_utils.py create mode 100644 src/corvus_python/sql/sql_utils.py create mode 100644 src/corvus_python/sql/synapse/__init__.py create mode 100644 src/corvus_python/sql/synapse/synapse_sql_utils.py create mode 100644 tests/unit/test_sql_utils.py diff --git a/src/corvus_python/pyspark/synapse/sync_tables_locally.py b/src/corvus_python/pyspark/synapse/sync_tables_locally.py index df8ffd5..b985e0f 100644 --- a/src/corvus_python/pyspark/synapse/sync_tables_locally.py +++ b/src/corvus_python/pyspark/synapse/sync_tables_locally.py @@ -2,8 +2,6 @@ from dataclasses import dataclass from typing import List, Tuple -import pyodbc -import struct from pyspark.sql import SparkSession from corvus_python.pyspark.utilities import LocalSparkSessionConfig, get_or_create_spark_session @@ -18,41 +16,11 @@ class ObjectSyncDetails: database_name (str): Name of the database. tables (List[str]): List of tables in the database. """ + database_name: str tables: List[str] -def _get_pyodbc_connection(workspace_name: str) -> pyodbc.Connection: - """Gets an ODBC connection to the SQL Serverless endpoint of a Synapse workspace. - - Args: - workspace_name (str): Name of the workspace. - - Returns: - pyodbc.Connection: ODBC connection to the SQL Serverless endpoint. - - Raises: - RuntimeError: If user is not logged into the Azure CLI. - - Note: - The connection object returned can be used in a pandas read_sql query to pull data from Synapse. E.g.: - df = pd.read_sql(f'SELECT * FROM db_name.dbo.table_name', conn) - """ - server = f'{workspace_name}-ondemand.sql.azuresynapse.net' - database = 'master' - driver = '{ODBC Driver 17 for SQL Server}' - connection_string = f'Driver={driver};Server=tcp:{server},1433;\ - Database={database};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' - - token = get_az_cli_token('https://database.windows.net/.default') - - token_bytes = token.encode("UTF-16-LE") - token_struct = struct.pack(f' Tuple[str, dict]: """Gets the Spark JDBC connection properties for a Synapse SQL Serverless endpoint. @@ -65,28 +33,25 @@ def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]: Raises: RuntimeError: If user is not logged into the Azure CLI. """ - server = f'{workspace_name}-ondemand.sql.azuresynapse.net' - database = 'master' + server = f"{workspace_name}-ondemand.sql.azuresynapse.net" + database = "master" jdbc_url = f"jdbc:sqlserver://{server}:1433;database={database};encrypt=true;\ trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30" - token = get_az_cli_token('https://database.windows.net/.default') + token = get_az_cli_token("https://database.windows.net/.default") - connection_properties = { - "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", - "accessToken": token - } + connection_properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "accessToken": token} return jdbc_url, connection_properties def sync_synapse_tables_to_local_spark( - workspace_name: str, - object_sync_details: List[ObjectSyncDetails], - overwrite: bool = False, - spark: SparkSession = None - ): + workspace_name: str, + object_sync_details: List[ObjectSyncDetails], + overwrite: bool = False, + spark: SparkSession = None, +): """Syncs tables from a Synapse workspace to a local Spark metastore. Args: @@ -111,20 +76,17 @@ def sync_synapse_tables_to_local_spark( table_exists = spark.catalog.tableExists(table, osd.database_name) if table_exists and not overwrite: - print('\033[93m' + f"Table '{table}' in database '{osd.database_name}' already exists and \ -overwrite is set to False. Skipping table sync." + '\033[0m') + print( + "\033[93m" + + f"Table '{table}' in database '{osd.database_name}' already exists and \ +overwrite is set to False. Skipping table sync." + + "\033[0m" + ) continue else: spark.read.jdbc( - url=jdbc_url, - table=f"{osd.database_name}.dbo.{table}", - properties=connection_properties - ) \ - .coalesce(1) \ - .write \ - .format("delta") \ - .mode("overwrite") \ - .saveAsTable(f"{osd.database_name}.{table}") + url=jdbc_url, table=f"{osd.database_name}.dbo.{table}", properties=connection_properties + ).coalesce(1).write.format("delta").mode("overwrite").saveAsTable(f"{osd.database_name}.{table}") if not existing_spark_session: spark.stop() diff --git a/src/corvus_python/sql/__init__.py b/src/corvus_python/sql/__init__.py new file mode 100644 index 0000000..db88150 --- /dev/null +++ b/src/corvus_python/sql/__init__.py @@ -0,0 +1,17 @@ +from .sql_utils import ( + execute_ddl, + get_pyodbc_connection, + create_or_alter_view_over_delta_table, + drop_views_in_schema, + SelectColumn, + WithColumn, +) + +__all__ = [ + "execute_ddl", + "get_pyodbc_connection", + "create_or_alter_view_over_delta_table", + "drop_views_in_schema", + "SelectColumn", + "WithColumn", +] diff --git a/src/corvus_python/sql/fabric/__init__.py b/src/corvus_python/sql/fabric/__init__.py new file mode 100644 index 0000000..c92cfa9 --- /dev/null +++ b/src/corvus_python/sql/fabric/__init__.py @@ -0,0 +1,5 @@ +from .fabric_sql_utils import get_fabric_sql_pyodbc_connection + +__all__ = [ + "get_fabric_sql_pyodbc_connection", +] diff --git a/src/corvus_python/sql/fabric/fabric_sql_utils.py b/src/corvus_python/sql/fabric/fabric_sql_utils.py new file mode 100644 index 0000000..334e7f3 --- /dev/null +++ b/src/corvus_python/sql/fabric/fabric_sql_utils.py @@ -0,0 +1,29 @@ +import pyodbc +from ..sql_utils import get_pyodbc_connection + + +def get_fabric_sql_pyodbc_connection( + sql_analytics_endpoint: str, + database: str, + use_managed_identity: bool = True, + client_id: str | None = None, +) -> pyodbc.Connection: + """Open a pyodbc connection to Fabric SQL Analytics endpoint using AAD tokens. + + Requires ODBC Driver 18 for SQL Server or later. + + Parameters + ---------- + sql_analytics_endpoint: + The Fabric SQL Analytics endpoint, e.g. ".datawarehouse.fabric.microsoft.com". + database: + The database name. + use_managed_identity: + If True, uses ManagedIdentityCredential (for Container Apps / ACI). + If False, uses DefaultAzureCredential (for local dev — picks up + az login, VS Code, environment variables, etc.). + client_id: + Optional user-assigned managed identity client ID. Only used when + use_managed_identity is True. + """ + return get_pyodbc_connection(sql_analytics_endpoint, database, use_managed_identity, client_id) diff --git a/src/corvus_python/sql/sql_utils.py b/src/corvus_python/sql/sql_utils.py new file mode 100644 index 0000000..57b99df --- /dev/null +++ b/src/corvus_python/sql/sql_utils.py @@ -0,0 +1,227 @@ +from dataclasses import dataclass +from typing import List + +import pyodbc +import struct +from azure.identity import DefaultAzureCredential, ManagedIdentityCredential + + +def execute_ddl( + conn: pyodbc.Connection, + ddl: str, +) -> None: + """Execute a DDL statement (e.g. CREATE VIEW) using a pyodbc connection.""" + with conn.cursor() as cursor: + cursor.execute(ddl) + conn.commit() + + +def get_pyodbc_connection( + server: str, + database: str, + use_managed_identity: bool = True, + client_id: str | None = None, +) -> pyodbc.Connection: + """Open a pyodbc connection to Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. + + Requires ODBC Driver 18 for SQL Server or later. + + Parameters + ---------- + server: + The Synapse serverless endpoint or Fabric SQL Analytics endpoint, + e.g. "myworkspace-ondemand.sql.azuresynapse.net" + or ".datawarehouse.fabric.microsoft.com". + database: + The database name. + use_managed_identity: + If True, uses ManagedIdentityCredential (for Container Apps / ACI). + If False, uses DefaultAzureCredential (for local dev — picks up + az login, VS Code, environment variables, etc.). + client_id: + Optional user-assigned managed identity client ID. Only used when + use_managed_identity is True. + """ + # 1. Acquire an AAD token for the Azure SQL resource + resource = "https://database.windows.net/.default" + + if use_managed_identity: + credential = ManagedIdentityCredential(client_id=client_id) + else: + credential = DefaultAzureCredential() + + token = credential.get_token(resource) + + # 2. Pack the token into the bytes format pyodbc expects for + # SQL_COPT_SS_ACCESS_TOKEN (driver-level AAD token injection). + token_bytes = token.token.encode("utf-16-le") + token_struct = struct.pack(f" str: + """ + Generate the DDL statement to create or alter a serverless SQL view over a Delta Lake table. + + + Parameters + ---------- + schema_name: + The schema for the view, e.g. "dbo". + view_name: + The name of the view to create or alter. + delta_table_path: + The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table". + infer_types: + If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types + from the data. + If False, the view will be created with "SELECT * FROM OPENROWSET(...) WITH ([col] type, ...)" and the column + types will be determined by the caller and passed in via with_columns. + select_columns: + If infer_types is True, the list of columns to select from the Delta table, along with optional titles for the + view. If a title is not provided for a column, the original column name will be used in the view. + with_columns: + If infer_types is False, the list of columns and their types to define in the WITH clause of the OPENROWSET + statement. The column names should match the columns in the Delta table, and the types should be valid + Synapse SQL types (e.g. "INT", "VARCHAR(100)", "DATETIME2", etc.). + """ + + if infer_types: + if not select_columns: + raise ValueError("select_columns must be provided when infer_types is True") + + col_def_list: list[str] = [] + for column in select_columns: + if column.title: + col_def_list.append(f"[{column.name}] AS [{column.title}]") + else: + col_def_list.append(f"[{column.name}]") + + select_clause = ",\n ".join(col_def_list) + with_clause = "" + else: + select_clause = "*" + with_column_list: list[str] = [] + if not with_columns: + raise ValueError("with_columns must be provided when infer_types is False") + for column in with_columns: + with_column_list.append(f"[{column.title or column.name}] {column.type} '$.{column.name}'") + with_clause = ",\n ".join(with_column_list) + with_clause = f"\n WITH (\n {with_clause}\n )" + + bulk_clause = f"BULK '{delta_table_path}',\n" f"FORMAT = 'DELTA'" + + ddl = f"""\ +CREATE OR ALTER VIEW [{schema_name}].[{view_name}] +AS +SELECT + {select_clause} +FROM OPENROWSET( + {bulk_clause} + ){with_clause} AS [result]""" + + return ddl + + +def create_or_alter_view_over_delta_table( + conn: pyodbc.Connection, + schema_name: str, + view_name: str, + delta_table_path: str, + infer_types: bool = True, + select_columns: List[SelectColumn] | None = None, + with_columns: List[WithColumn] | None = None, +) -> None: + """ + Create or alter a serverless SQL view over a Delta Lake table. + + + Parameters + ---------- + conn: + An open pyodbc connection to the Synapse workspace. + schema_name: + The schema for the view, e.g. "dbo". + view_name: + The name of the view to create or alter. + delta_table_path: + The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table". + infer_types: + If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types + from the data. + If False, the view will be created with "SELECT * FROM OPENROWSET(...) WITH ([col] type, ...)" and the column + types will be determined by the caller and passed in via with_columns. + select_columns: + If infer_types is True, the list of columns to select from the Delta table, along with optional titles for the + view. If a title is not provided for a column, the original column name will be used in the view. + with_columns: + If infer_types is False, the list of columns and their types to define in the WITH clause of the OPENROWSET + statement. The column names should match the columns in the Delta table, and the types should be valid + Synapse SQL types (e.g. "INT", "VARCHAR(100)", "DATETIME2", etc.). + """ + + ddl = generate_create_or_alter_view_over_delta_table_ddl( + schema_name, view_name, delta_table_path, infer_types, select_columns, with_columns + ) + + execute_ddl(conn, ddl) + + +def drop_views_in_schema( + conn: pyodbc.Connection, + schema_name: str = "dbo", +) -> None: + """ + Drop all views in a given schema. + + Parameters + ---------- + conn: + An open pyodbc connection to the Synapse workspace. + schema_name: + The schema from which to drop all views, e.g. "dbo". + """ + cursor = conn.cursor() + cursor.execute( + f""" + SELECT TABLE_NAME + FROM INFORMATION_SCHEMA.VIEWS + WHERE TABLE_SCHEMA = '{schema_name}' + """ + ) + views = cursor.fetchall() + for view in views: + view_name = view[0] + drop_ddl = f"DROP VIEW [{schema_name}].[{view_name}]" + execute_ddl(conn, drop_ddl) diff --git a/src/corvus_python/sql/synapse/__init__.py b/src/corvus_python/sql/synapse/__init__.py new file mode 100644 index 0000000..b86dc61 --- /dev/null +++ b/src/corvus_python/sql/synapse/__init__.py @@ -0,0 +1,6 @@ +from .synapse_sql_utils import get_synapse_sql_pyodbc_connection, create_database_if_not_exists + +__all__ = [ + "get_synapse_sql_pyodbc_connection", + "create_database_if_not_exists", +] diff --git a/src/corvus_python/sql/synapse/synapse_sql_utils.py b/src/corvus_python/sql/synapse/synapse_sql_utils.py new file mode 100644 index 0000000..e68deb5 --- /dev/null +++ b/src/corvus_python/sql/synapse/synapse_sql_utils.py @@ -0,0 +1,58 @@ +import pyodbc +from ..sql_utils import ( + get_pyodbc_connection, +) + + +def get_synapse_sql_pyodbc_connection( + workspace_name: str, + database: str, + use_managed_identity: bool = True, + client_id: str | None = None, +) -> pyodbc.Connection: + """Open a pyodbc connection to Synapse serverless SQL endpoint using AAD tokens. + + Requires ODBC Driver 18 for SQL Server or later. + + Parameters + ---------- + workspace_name: + The Synapse workspace name, e.g. "myworkspace". + database: + The database name. + use_managed_identity: + If True, uses ManagedIdentityCredential (for Container Apps / ACI). + If False, uses DefaultAzureCredential (for local dev — picks up + az login, VS Code, environment variables, etc.). + client_id: + Optional user-assigned managed identity client ID. Only used when + use_managed_identity is True. + """ + server = f"{workspace_name}-ondemand.sql.azuresynapse.net" + return get_pyodbc_connection(server, database, use_managed_identity, client_id) + + +def create_database_if_not_exists( + conn: pyodbc.Connection, + database_name: str, +) -> None: + """Create a database if it doesn't already exist.""" + + if conn.getinfo(pyodbc.SQL_DATABASE_NAME) != "master": + raise ValueError("Connection must be to master database to create a new database.") + + prev_autocommit = conn.autocommit + conn.autocommit = True + + ddl = f""" + IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '{database_name}') + BEGIN + CREATE DATABASE [{database_name}] + END + """ + + try: + with conn.cursor() as cursor: + cursor.execute(ddl) + finally: + conn.autocommit = prev_autocommit diff --git a/tests/unit/test_sql_utils.py b/tests/unit/test_sql_utils.py new file mode 100644 index 0000000..61dd752 --- /dev/null +++ b/tests/unit/test_sql_utils.py @@ -0,0 +1,170 @@ +import pytest + +from corvus_python.sql.sql_utils import ( + SelectColumn, + WithColumn, + generate_create_or_alter_view_over_delta_table_ddl, +) + +DELTA_PATH = "abfss://container@storageaccount.dfs.core.windows.net/path/to/table" + + +class TestGenerateCreateOrAlterViewOverDeltaTableDdl: + def test_infer_types_single_column_no_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[SelectColumn(name="col1")], + ) + + expected = ( + "CREATE OR ALTER VIEW [dbo].[vw_test]\n" + "AS\n" + "SELECT\n" + " [col1]\n" + "FROM OPENROWSET(\n" + f" BULK '{DELTA_PATH}',\n" + "FORMAT = 'DELTA'\n" + " ) AS [result]" + ) + assert result == expected + + def test_infer_types_single_column_with_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[SelectColumn(name="col1", title="Column One")], + ) + + assert "[col1] AS [Column One]" in result + + def test_infer_types_multiple_columns_mixed_titles(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="myschema", + view_name="vw_multi", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[ + SelectColumn(name="id"), + SelectColumn(name="first_name", title="First Name"), + SelectColumn(name="last_name"), + ], + ) + + assert "CREATE OR ALTER VIEW [myschema].[vw_multi]" in result + assert "[id]" in result + assert "[first_name] AS [First Name]" in result + assert "[last_name]" in result + # Columns should be comma-separated + assert ",\n " in result + + def test_infer_types_raises_when_select_columns_missing(self): + with pytest.raises(ValueError, match="select_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=None, + ) + + def test_infer_types_raises_when_select_columns_empty(self): + with pytest.raises(ValueError, match="select_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[], + ) + + def test_no_infer_types_single_with_column_no_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[WithColumn(name="col1", type="INT")], + ) + + expected = ( + "CREATE OR ALTER VIEW [dbo].[vw_typed]\n" + "AS\n" + "SELECT\n" + " *\n" + "FROM OPENROWSET(\n" + f" BULK '{DELTA_PATH}',\n" + "FORMAT = 'DELTA'\n" + " )\n" + " WITH (\n" + " [col1] INT '$.col1'\n" + " ) AS [result]" + ) + assert result == expected + + def test_no_infer_types_with_column_with_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[WithColumn(name="col1", type="VARCHAR(100)", title="Column One")], + ) + + assert "[Column One] VARCHAR(100) '$.col1'" in result + + def test_no_infer_types_multiple_with_columns(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[ + WithColumn(name="id", type="INT"), + WithColumn(name="name", type="VARCHAR(200)", title="Full Name"), + WithColumn(name="created_at", type="DATETIME2"), + ], + ) + + assert "SELECT\n *" in result + assert "[id] INT '$.id'" in result + assert "[Full Name] VARCHAR(200) '$.name'" in result + assert "[created_at] DATETIME2 '$.created_at'" in result + assert "WITH (" in result + + def test_no_infer_types_raises_when_with_columns_missing(self): + with pytest.raises(ValueError, match="with_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=None, + ) + + def test_no_infer_types_raises_when_with_columns_empty(self): + with pytest.raises(ValueError, match="with_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[], + ) + + def test_delta_table_path_appears_in_bulk_clause(self): + custom_path = "abfss://mycontainer@myaccount.dfs.core.windows.net/my/table" + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=custom_path, + infer_types=True, + select_columns=[SelectColumn(name="x")], + ) + + assert f"BULK '{custom_path}'" in result + assert "FORMAT = 'DELTA'" in result From 2c5b6b78e1af34dd43b5a0a8f871fb8f7652a60c Mon Sep 17 00:00:00 2001 From: Mike Evans-Larah Date: Tue, 31 Mar 2026 10:46:22 +0000 Subject: [PATCH 2/4] Update readme --- README.md | 94 ++++++++++++++++++++++++++++++ src/corvus_python/sql/sql_utils.py | 4 +- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7d3dd43..8ab3b4e 100644 --- a/README.md +++ b/README.md @@ -201,3 +201,97 @@ except EmailError as e: - The `from_email` must use a configured MailFrom address from your ACS resource - Set `email_sending_disabled=True` during development to prevent actual emails from being sent - Attachments must be base64-encoded before adding to the `EmailAttachment` object + +--- + +### `sql` + +Includes utility functions for working with SQL databases via pyodbc, with AAD token-based authentication. Provides helpers for connecting to Synapse serverless SQL and Fabric SQL Analytics endpoints, managing views over Delta Lake tables, and executing DDL statements. + +**⚠️ Note: This module requires ODBC Driver 18 for SQL Server or later.** + +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql import get_pyodbc_connection | +| execute_ddl | Function | Executes a DDL statement (e.g. CREATE VIEW) using a pyodbc connection. | from corvus_python.sql import execute_ddl | +| create_or_alter_view_over_delta_table | Function | Creates or alters a SQL view over a Delta Lake table, with support for inferred or explicit column types. | from corvus_python.sql import create_or_alter_view_over_delta_table | +| drop_views_in_schema | Function | Drops all views in a given schema. | from corvus_python.sql import drop_views_in_schema | +| SelectColumn | Class | Dataclass representing a column to select, with an optional display title. | from corvus_python.sql import SelectColumn | +| WithColumn | Class | Dataclass representing a column with an explicit type for use in OPENROWSET WITH clauses. | from corvus_python.sql import WithColumn | + +#### `sql.synapse` + +Convenience wrappers specific to Synapse serverless SQL endpoints. + +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_synapse_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using AAD tokens. Builds the server URL from the workspace name. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection | +| create_database_if_not_exists | Function | Creates a database if it doesn't already exist. Requires a connection to the master database. | from corvus_python.sql.synapse import create_database_if_not_exists | + +#### `sql.fabric` + +Convenience wrappers specific to Fabric SQL Analytics endpoints. + +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_fabric_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql.fabric import get_fabric_sql_pyodbc_connection | + +#### Usage Example + +```python +from corvus_python.sql import ( + get_pyodbc_connection, + create_or_alter_view_over_delta_table, + drop_views_in_schema, + SelectColumn, + WithColumn, +) + +# Connect using DefaultAzureCredential +conn = get_pyodbc_connection( + server="myworkspace-ondemand.sql.azuresynapse.net", + database="my_database", + use_managed_identity=False, +) + +# Create a view with inferred types +create_or_alter_view_over_delta_table( + conn=conn, + schema_name="dbo", + view_name="my_view", + delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table", + infer_types=True, + select_columns=[ + SelectColumn(name="id"), + SelectColumn(name="full_name", title="Name"), + ], +) + +# Create a view with explicit types +create_or_alter_view_over_delta_table( + conn=conn, + schema_name="dbo", + view_name="my_typed_view", + delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table", + infer_types=False, + with_columns=[ + WithColumn(name="id", type="INT"), + WithColumn(name="full_name", type="VARCHAR(200)", title="Name"), + ], +) + +# Drop all views in a schema +drop_views_in_schema(conn, schema_name="dbo") +``` + +Or use the Synapse/Fabric-specific helpers: + +```python +from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection + +conn = get_synapse_sql_pyodbc_connection( + workspace_name="myworkspace", + database="my_database", + use_managed_identity=False, +) +``` diff --git a/src/corvus_python/sql/sql_utils.py b/src/corvus_python/sql/sql_utils.py index 57b99df..1f7df40 100644 --- a/src/corvus_python/sql/sql_utils.py +++ b/src/corvus_python/sql/sql_utils.py @@ -92,7 +92,7 @@ def generate_create_or_alter_view_over_delta_table_ddl( with_columns: List[WithColumn] | None = None, ) -> str: """ - Generate the DDL statement to create or alter a serverless SQL view over a Delta Lake table. + Generate the DDL statement to create or alter a SQL view over a Delta Lake table. Parameters @@ -164,7 +164,7 @@ def create_or_alter_view_over_delta_table( with_columns: List[WithColumn] | None = None, ) -> None: """ - Create or alter a serverless SQL view over a Delta Lake table. + Create or alter a SQL view over a Delta Lake table. Parameters From b6b04e22f0793b1ee9f52a890505d9eeed7a29ff Mon Sep 17 00:00:00 2001 From: Mike Evans-Larah Date: Tue, 31 Mar 2026 15:43:17 +0000 Subject: [PATCH 3/4] Add connection methods that take the token --- src/corvus_python/sql/__init__.py | 2 ++ src/corvus_python/sql/sql_utils.py | 34 +++++++++++++++---- src/corvus_python/sql/synapse/__init__.py | 7 +++- .../sql/synapse/synapse_sql_utils.py | 23 +++++++++++++ 4 files changed, 59 insertions(+), 7 deletions(-) diff --git a/src/corvus_python/sql/__init__.py b/src/corvus_python/sql/__init__.py index db88150..5d2dc29 100644 --- a/src/corvus_python/sql/__init__.py +++ b/src/corvus_python/sql/__init__.py @@ -1,6 +1,7 @@ from .sql_utils import ( execute_ddl, get_pyodbc_connection, + get_pyodbc_connection_with_token, create_or_alter_view_over_delta_table, drop_views_in_schema, SelectColumn, @@ -10,6 +11,7 @@ __all__ = [ "execute_ddl", "get_pyodbc_connection", + "get_pyodbc_connection_with_token", "create_or_alter_view_over_delta_table", "drop_views_in_schema", "SelectColumn", diff --git a/src/corvus_python/sql/sql_utils.py b/src/corvus_python/sql/sql_utils.py index 1f7df40..fa4b0df 100644 --- a/src/corvus_python/sql/sql_utils.py +++ b/src/corvus_python/sql/sql_utils.py @@ -42,7 +42,7 @@ def get_pyodbc_connection( Optional user-assigned managed identity client ID. Only used when use_managed_identity is True. """ - # 1. Acquire an AAD token for the Azure SQL resource + # Acquire an AAD token for the Azure SQL resource resource = "https://database.windows.net/.default" if use_managed_identity: @@ -50,14 +50,36 @@ def get_pyodbc_connection( else: credential = DefaultAzureCredential() - token = credential.get_token(resource) + token = credential.get_token(resource).token - # 2. Pack the token into the bytes format pyodbc expects for - # SQL_COPT_SS_ACCESS_TOKEN (driver-level AAD token injection). - token_bytes = token.token.encode("utf-16-le") + return get_pyodbc_connection_with_token(server, database, token) + + +def get_pyodbc_connection_with_token( + server: str, + database: str, + token: str, +) -> pyodbc.Connection: + """ + Open a pyodbc connection to Synapse serverless SQL or Fabric SQL Analytics endpoint using a pre-acquired AAD token. + Parameters + ---------- + server: + The Synapse serverless endpoint or Fabric SQL Analytics endpoint, + e.g. "myworkspace-ondemand.sql.azuresynapse.net" + or ".datawarehouse.fabric.microsoft.com". + database: + The database name. + token: + An AAD access token with the appropriate scopes/permissions for Azure SQL. + """ + + # Pack the token into the bytes format pyodbc expects for + # SQL_COPT_SS_ACCESS_TOKEN (driver-level AAD token injection). + token_bytes = token.encode("utf-16-le") token_struct = struct.pack(f" pyodbc.Connection: + """Open a pyodbc connection to Synapse serverless SQL endpoint using a pre-acquired AAD token. + + Requires ODBC Driver 18 for SQL Server or later. + + Parameters + ---------- + workspace_name: + The Synapse workspace name, e.g. "myworkspace". + database: + The database name. + token: + A pre-acquired AAD token with audience "https://database.windows.net/.default". + """ + server = f"{workspace_name}-ondemand.sql.azuresynapse.net" + return get_pyodbc_connection_with_token(server, database, token) + + def create_database_if_not_exists( conn: pyodbc.Connection, database_name: str, From 95c0ae4f88f9fe652e5e84bb0a9e1e1d09ab6afa Mon Sep 17 00:00:00 2001 From: Mike Evans-Larah Date: Tue, 31 Mar 2026 15:46:59 +0000 Subject: [PATCH 4/4] Update readme --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 8ab3b4e..a66d3b6 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,7 @@ Includes utility functions for working with SQL databases via pyodbc, with AAD t | Component Name | Object Type | Description | Import syntax | |--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| | get_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql import get_pyodbc_connection | +| get_pyodbc_connection_with_token | Function | Opens a pyodbc connection using a pre-acquired AAD token. Useful when running inside Synapse notebooks. | from corvus_python.sql import get_pyodbc_connection_with_token | | execute_ddl | Function | Executes a DDL statement (e.g. CREATE VIEW) using a pyodbc connection. | from corvus_python.sql import execute_ddl | | create_or_alter_view_over_delta_table | Function | Creates or alters a SQL view over a Delta Lake table, with support for inferred or explicit column types. | from corvus_python.sql import create_or_alter_view_over_delta_table | | drop_views_in_schema | Function | Drops all views in a given schema. | from corvus_python.sql import drop_views_in_schema | @@ -226,6 +227,7 @@ Convenience wrappers specific to Synapse serverless SQL endpoints. | Component Name | Object Type | Description | Import syntax | |--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| | get_synapse_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using AAD tokens. Builds the server URL from the workspace name. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection | +| get_synapse_sql_pyodbc_connection_with_token | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using a pre-acquired AAD token. Ideal for Synapse notebooks. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token | | create_database_if_not_exists | Function | Creates a database if it doesn't already exist. Requires a connection to the master database. | from corvus_python.sql.synapse import create_database_if_not_exists | #### `sql.fabric` @@ -295,3 +297,19 @@ conn = get_synapse_sql_pyodbc_connection( use_managed_identity=False, ) ``` + +#### Usage from a Synapse Notebook + +When running inside a Synapse notebook, you can use `mssparkutils` to acquire a token and pass it directly: + +```python +from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token + +token = mssparkutils.credentials.getToken("DW") + +conn = get_synapse_sql_pyodbc_connection_with_token( + workspace_name="myworkspace", + database="my_database", + token=token, +) +```