diff --git a/README.md b/README.md
index 7d3dd43..a66d3b6 100644
--- a/README.md
+++ b/README.md
@@ -201,3 +201,115 @@ except EmailError as e:
- The `from_email` must use a configured MailFrom address from your ACS resource
- Set `email_sending_disabled=True` during development to prevent actual emails from being sent
- Attachments must be base64-encoded before adding to the `EmailAttachment` object
+
+---
+
+### `sql`
+
+Includes utility functions for working with SQL databases via pyodbc, with AAD token-based authentication. Provides helpers for connecting to Synapse serverless SQL and Fabric SQL Analytics endpoints, managing views over Delta Lake tables, and executing DDL statements.
+
+**⚠️ Note: This module requires ODBC Driver 18 for SQL Server or later.**
+
+| Component Name | Object Type | Description | Import syntax |
+|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| get_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql import get_pyodbc_connection |
+| get_pyodbc_connection_with_token | Function | Opens a pyodbc connection using a pre-acquired AAD token. Useful when running inside Synapse notebooks. | from corvus_python.sql import get_pyodbc_connection_with_token |
+| execute_ddl | Function | Executes a DDL statement (e.g. CREATE VIEW) using a pyodbc connection. | from corvus_python.sql import execute_ddl |
+| create_or_alter_view_over_delta_table | Function | Creates or alters a SQL view over a Delta Lake table, with support for inferred or explicit column types. | from corvus_python.sql import create_or_alter_view_over_delta_table |
+| drop_views_in_schema | Function | Drops all views in a given schema. | from corvus_python.sql import drop_views_in_schema |
+| SelectColumn | Class | Dataclass representing a column to select, with an optional display title. | from corvus_python.sql import SelectColumn |
+| WithColumn | Class | Dataclass representing a column with an explicit type for use in OPENROWSET WITH clauses. | from corvus_python.sql import WithColumn |
+
+#### `sql.synapse`
+
+Convenience wrappers specific to Synapse serverless SQL endpoints.
+
+| Component Name | Object Type | Description | Import syntax |
+|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| get_synapse_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using AAD tokens. Builds the server URL from the workspace name. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection |
+| get_synapse_sql_pyodbc_connection_with_token | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using a pre-acquired AAD token. Ideal for Synapse notebooks. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token |
+| create_database_if_not_exists | Function | Creates a database if it doesn't already exist. Requires a connection to the master database. | from corvus_python.sql.synapse import create_database_if_not_exists |
+
+#### `sql.fabric`
+
+Convenience wrappers specific to Fabric SQL Analytics endpoints.
+
+| Component Name | Object Type | Description | Import syntax |
+|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| get_fabric_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql.fabric import get_fabric_sql_pyodbc_connection |
+
+#### Usage Example
+
+```python
+from corvus_python.sql import (
+ get_pyodbc_connection,
+ create_or_alter_view_over_delta_table,
+ drop_views_in_schema,
+ SelectColumn,
+ WithColumn,
+)
+
+# Connect using DefaultAzureCredential
+conn = get_pyodbc_connection(
+ server="myworkspace-ondemand.sql.azuresynapse.net",
+ database="my_database",
+ use_managed_identity=False,
+)
+
+# Create a view with inferred types
+create_or_alter_view_over_delta_table(
+ conn=conn,
+ schema_name="dbo",
+ view_name="my_view",
+ delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table",
+ infer_types=True,
+ select_columns=[
+ SelectColumn(name="id"),
+ SelectColumn(name="full_name", title="Name"),
+ ],
+)
+
+# Create a view with explicit types
+create_or_alter_view_over_delta_table(
+ conn=conn,
+ schema_name="dbo",
+ view_name="my_typed_view",
+ delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table",
+ infer_types=False,
+ with_columns=[
+ WithColumn(name="id", type="INT"),
+ WithColumn(name="full_name", type="VARCHAR(200)", title="Name"),
+ ],
+)
+
+# Drop all views in a schema
+drop_views_in_schema(conn, schema_name="dbo")
+```
+
+Or use the Synapse/Fabric-specific helpers:
+
+```python
+from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection
+
+conn = get_synapse_sql_pyodbc_connection(
+ workspace_name="myworkspace",
+ database="my_database",
+ use_managed_identity=False,
+)
+```
+
+#### Usage from a Synapse Notebook
+
+When running inside a Synapse notebook, you can use `mssparkutils` to acquire a token and pass it directly:
+
+```python
+from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token
+
+token = mssparkutils.credentials.getToken("DW")
+
+conn = get_synapse_sql_pyodbc_connection_with_token(
+ workspace_name="myworkspace",
+ database="my_database",
+ token=token,
+)
+```
diff --git a/src/corvus_python/pyspark/synapse/sync_tables_locally.py b/src/corvus_python/pyspark/synapse/sync_tables_locally.py
index df8ffd5..b985e0f 100644
--- a/src/corvus_python/pyspark/synapse/sync_tables_locally.py
+++ b/src/corvus_python/pyspark/synapse/sync_tables_locally.py
@@ -2,8 +2,6 @@
from dataclasses import dataclass
from typing import List, Tuple
-import pyodbc
-import struct
from pyspark.sql import SparkSession
from corvus_python.pyspark.utilities import LocalSparkSessionConfig, get_or_create_spark_session
@@ -18,41 +16,11 @@ class ObjectSyncDetails:
database_name (str): Name of the database.
tables (List[str]): List of tables in the database.
"""
+
database_name: str
tables: List[str]
-def _get_pyodbc_connection(workspace_name: str) -> pyodbc.Connection:
- """Gets an ODBC connection to the SQL Serverless endpoint of a Synapse workspace.
-
- Args:
- workspace_name (str): Name of the workspace.
-
- Returns:
- pyodbc.Connection: ODBC connection to the SQL Serverless endpoint.
-
- Raises:
- RuntimeError: If user is not logged into the Azure CLI.
-
- Note:
- The connection object returned can be used in a pandas read_sql query to pull data from Synapse. E.g.:
- df = pd.read_sql(f'SELECT * FROM db_name.dbo.table_name', conn)
- """
- server = f'{workspace_name}-ondemand.sql.azuresynapse.net'
- database = 'master'
- driver = '{ODBC Driver 17 for SQL Server}'
- connection_string = f'Driver={driver};Server=tcp:{server},1433;\
- Database={database};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'
-
- token = get_az_cli_token('https://database.windows.net/.default')
-
- token_bytes = token.encode("UTF-16-LE")
-    token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
-
-    return pyodbc.connect(connection_string, attrs_before={1256: token_struct})
-
-
 def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]:
"""Gets the Spark JDBC connection properties for a Synapse SQL Serverless endpoint.
@@ -65,28 +33,25 @@ def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]:
Raises:
RuntimeError: If user is not logged into the Azure CLI.
"""
- server = f'{workspace_name}-ondemand.sql.azuresynapse.net'
- database = 'master'
+ server = f"{workspace_name}-ondemand.sql.azuresynapse.net"
+ database = "master"
jdbc_url = f"jdbc:sqlserver://{server}:1433;database={database};encrypt=true;\
trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30"
- token = get_az_cli_token('https://database.windows.net/.default')
+ token = get_az_cli_token("https://database.windows.net/.default")
- connection_properties = {
- "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
- "accessToken": token
- }
+ connection_properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "accessToken": token}
return jdbc_url, connection_properties
def sync_synapse_tables_to_local_spark(
- workspace_name: str,
- object_sync_details: List[ObjectSyncDetails],
- overwrite: bool = False,
- spark: SparkSession = None
- ):
+ workspace_name: str,
+ object_sync_details: List[ObjectSyncDetails],
+ overwrite: bool = False,
+ spark: SparkSession = None,
+):
"""Syncs tables from a Synapse workspace to a local Spark metastore.
Args:
@@ -111,20 +76,17 @@ def sync_synapse_tables_to_local_spark(
table_exists = spark.catalog.tableExists(table, osd.database_name)
if table_exists and not overwrite:
- print('\033[93m' + f"Table '{table}' in database '{osd.database_name}' already exists and \
-overwrite is set to False. Skipping table sync." + '\033[0m')
+ print(
+ "\033[93m"
+ + f"Table '{table}' in database '{osd.database_name}' already exists and \
+overwrite is set to False. Skipping table sync."
+ + "\033[0m"
+ )
continue
else:
spark.read.jdbc(
- url=jdbc_url,
- table=f"{osd.database_name}.dbo.{table}",
- properties=connection_properties
- ) \
- .coalesce(1) \
- .write \
- .format("delta") \
- .mode("overwrite") \
- .saveAsTable(f"{osd.database_name}.{table}")
+ url=jdbc_url, table=f"{osd.database_name}.dbo.{table}", properties=connection_properties
+ ).coalesce(1).write.format("delta").mode("overwrite").saveAsTable(f"{osd.database_name}.{table}")
if not existing_spark_session:
spark.stop()
diff --git a/src/corvus_python/sql/__init__.py b/src/corvus_python/sql/__init__.py
new file mode 100644
index 0000000..5d2dc29
--- /dev/null
+++ b/src/corvus_python/sql/__init__.py
@@ -0,0 +1,19 @@
+from .sql_utils import (
+ execute_ddl,
+ get_pyodbc_connection,
+ get_pyodbc_connection_with_token,
+ create_or_alter_view_over_delta_table,
+ drop_views_in_schema,
+ SelectColumn,
+ WithColumn,
+)
+
+__all__ = [
+ "execute_ddl",
+ "get_pyodbc_connection",
+ "get_pyodbc_connection_with_token",
+ "create_or_alter_view_over_delta_table",
+ "drop_views_in_schema",
+ "SelectColumn",
+ "WithColumn",
+]
diff --git a/src/corvus_python/sql/fabric/__init__.py b/src/corvus_python/sql/fabric/__init__.py
new file mode 100644
index 0000000..c92cfa9
--- /dev/null
+++ b/src/corvus_python/sql/fabric/__init__.py
@@ -0,0 +1,5 @@
+from .fabric_sql_utils import get_fabric_sql_pyodbc_connection
+
+__all__ = [
+ "get_fabric_sql_pyodbc_connection",
+]
diff --git a/src/corvus_python/sql/fabric/fabric_sql_utils.py b/src/corvus_python/sql/fabric/fabric_sql_utils.py
new file mode 100644
index 0000000..334e7f3
--- /dev/null
+++ b/src/corvus_python/sql/fabric/fabric_sql_utils.py
@@ -0,0 +1,29 @@
+import pyodbc
+from ..sql_utils import get_pyodbc_connection
+
+
+def get_fabric_sql_pyodbc_connection(
+ sql_analytics_endpoint: str,
+ database: str,
+ use_managed_identity: bool = True,
+ client_id: str | None = None,
+) -> pyodbc.Connection:
+ """Open a pyodbc connection to Fabric SQL Analytics endpoint using AAD tokens.
+
+ Requires ODBC Driver 18 for SQL Server or later.
+
+ Parameters
+ ----------
+ sql_analytics_endpoint:
+        The Fabric SQL Analytics endpoint, e.g. "<endpoint>.datawarehouse.fabric.microsoft.com".
+ database:
+ The database name.
+ use_managed_identity:
+ If True, uses ManagedIdentityCredential (for Container Apps / ACI).
+ If False, uses DefaultAzureCredential (for local dev — picks up
+ az login, VS Code, environment variables, etc.).
+ client_id:
+ Optional user-assigned managed identity client ID. Only used when
+ use_managed_identity is True.
+ """
+ return get_pyodbc_connection(sql_analytics_endpoint, database, use_managed_identity, client_id)
diff --git a/src/corvus_python/sql/sql_utils.py b/src/corvus_python/sql/sql_utils.py
new file mode 100644
index 0000000..fa4b0df
--- /dev/null
+++ b/src/corvus_python/sql/sql_utils.py
@@ -0,0 +1,249 @@
+from dataclasses import dataclass
+from typing import List
+
+import pyodbc
+import struct
+from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
+
+
+def execute_ddl(
+ conn: pyodbc.Connection,
+ ddl: str,
+) -> None:
+ """Execute a DDL statement (e.g. CREATE VIEW) using a pyodbc connection."""
+ with conn.cursor() as cursor:
+ cursor.execute(ddl)
+ conn.commit()
+
+
+def get_pyodbc_connection(
+ server: str,
+ database: str,
+ use_managed_identity: bool = True,
+ client_id: str | None = None,
+) -> pyodbc.Connection:
+ """Open a pyodbc connection to Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens.
+
+ Requires ODBC Driver 18 for SQL Server or later.
+
+ Parameters
+ ----------
+ server:
+ The Synapse serverless endpoint or Fabric SQL Analytics endpoint,
+ e.g. "myworkspace-ondemand.sql.azuresynapse.net"
+        or "<endpoint>.datawarehouse.fabric.microsoft.com".
+ database:
+ The database name.
+ use_managed_identity:
+ If True, uses ManagedIdentityCredential (for Container Apps / ACI).
+ If False, uses DefaultAzureCredential (for local dev — picks up
+ az login, VS Code, environment variables, etc.).
+ client_id:
+ Optional user-assigned managed identity client ID. Only used when
+ use_managed_identity is True.
+ """
+ # Acquire an AAD token for the Azure SQL resource
+ resource = "https://database.windows.net/.default"
+
+ if use_managed_identity:
+ credential = ManagedIdentityCredential(client_id=client_id)
+ else:
+ credential = DefaultAzureCredential()
+
+ token = credential.get_token(resource).token
+
+ return get_pyodbc_connection_with_token(server, database, token)
+
+
+def get_pyodbc_connection_with_token(
+ server: str,
+ database: str,
+ token: str,
+) -> pyodbc.Connection:
+ """
+ Open a pyodbc connection to Synapse serverless SQL or Fabric SQL Analytics endpoint using a pre-acquired AAD token.
+ Parameters
+ ----------
+ server:
+ The Synapse serverless endpoint or Fabric SQL Analytics endpoint,
+ e.g. "myworkspace-ondemand.sql.azuresynapse.net"
+        or "<endpoint>.datawarehouse.fabric.microsoft.com".
+ database:
+ The database name.
+ token:
+ An AAD access token with the appropriate scopes/permissions for Azure SQL.
+ """
+
+ # Pack the token into the bytes format pyodbc expects for
+ # SQL_COPT_SS_ACCESS_TOKEN (driver-level AAD token injection).
+ token_bytes = token.encode("utf-16-le")
+    token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)
+
+    # pyodbc pre-connection attribute for injecting an AAD access token
+    SQL_COPT_SS_ACCESS_TOKEN = 1256
+
+    driver = "{ODBC Driver 18 for SQL Server}"
+    connection_string = (
+        f"Driver={driver};Server=tcp:{server},1433;"
+        f"Database={database};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
+    )
+
+    return pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
+
+
+@dataclass
+class SelectColumn:
+    """A column to select, with an optional display title for the view."""
+
+    name: str
+    title: str | None = None
+
+
+@dataclass
+class WithColumn:
+    """A column with an explicit type for use in OPENROWSET WITH clauses."""
+
+    name: str
+    type: str
+    title: str | None = None
+
+
+def generate_create_or_alter_view_over_delta_table_ddl(
+    schema_name: str,
+    view_name: str,
+    delta_table_path: str,
+    infer_types: bool = True,
+    select_columns: List[SelectColumn] | None = None,
+    with_columns: List[WithColumn] | None = None,
+) -> str:
+ """
+ Generate the DDL statement to create or alter a SQL view over a Delta Lake table.
+
+
+ Parameters
+ ----------
+ schema_name:
+ The schema for the view, e.g. "dbo".
+ view_name:
+ The name of the view to create or alter.
+ delta_table_path:
+ The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table".
+ infer_types:
+ If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types
+ from the data.
+ If False, the view will be created with "SELECT * FROM OPENROWSET(...) WITH ([col] type, ...)" and the column
+ types will be determined by the caller and passed in via with_columns.
+ select_columns:
+ If infer_types is True, the list of columns to select from the Delta table, along with optional titles for the
+ view. If a title is not provided for a column, the original column name will be used in the view.
+ with_columns:
+ If infer_types is False, the list of columns and their types to define in the WITH clause of the OPENROWSET
+ statement. The column names should match the columns in the Delta table, and the types should be valid
+ Synapse SQL types (e.g. "INT", "VARCHAR(100)", "DATETIME2", etc.).
+ """
+
+ if infer_types:
+ if not select_columns:
+ raise ValueError("select_columns must be provided when infer_types is True")
+
+ col_def_list: list[str] = []
+ for column in select_columns:
+ if column.title:
+ col_def_list.append(f"[{column.name}] AS [{column.title}]")
+ else:
+ col_def_list.append(f"[{column.name}]")
+
+ select_clause = ",\n ".join(col_def_list)
+ with_clause = ""
+ else:
+ select_clause = "*"
+ with_column_list: list[str] = []
+ if not with_columns:
+ raise ValueError("with_columns must be provided when infer_types is False")
+ for column in with_columns:
+ with_column_list.append(f"[{column.title or column.name}] {column.type} '$.{column.name}'")
+ with_clause = ",\n ".join(with_column_list)
+ with_clause = f"\n WITH (\n {with_clause}\n )"
+
+ bulk_clause = f"BULK '{delta_table_path}',\n" f"FORMAT = 'DELTA'"
+
+ ddl = f"""\
+CREATE OR ALTER VIEW [{schema_name}].[{view_name}]
+AS
+SELECT
+ {select_clause}
+FROM OPENROWSET(
+ {bulk_clause}
+ ){with_clause} AS [result]"""
+
+ return ddl
+
+
+def create_or_alter_view_over_delta_table(
+ conn: pyodbc.Connection,
+ schema_name: str,
+ view_name: str,
+ delta_table_path: str,
+ infer_types: bool = True,
+ select_columns: List[SelectColumn] | None = None,
+ with_columns: List[WithColumn] | None = None,
+) -> None:
+ """
+ Create or alter a SQL view over a Delta Lake table.
+
+
+ Parameters
+ ----------
+ conn:
+ An open pyodbc connection to the Synapse workspace.
+ schema_name:
+ The schema for the view, e.g. "dbo".
+ view_name:
+ The name of the view to create or alter.
+ delta_table_path:
+ The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table".
+ infer_types:
+ If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types
+ from the data.
+ If False, the view will be created with "SELECT * FROM OPENROWSET(...) WITH ([col] type, ...)" and the column
+ types will be determined by the caller and passed in via with_columns.
+ select_columns:
+ If infer_types is True, the list of columns to select from the Delta table, along with optional titles for the
+ view. If a title is not provided for a column, the original column name will be used in the view.
+ with_columns:
+ If infer_types is False, the list of columns and their types to define in the WITH clause of the OPENROWSET
+ statement. The column names should match the columns in the Delta table, and the types should be valid
+ Synapse SQL types (e.g. "INT", "VARCHAR(100)", "DATETIME2", etc.).
+ """
+
+ ddl = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name, view_name, delta_table_path, infer_types, select_columns, with_columns
+ )
+
+ execute_ddl(conn, ddl)
+
+
+def drop_views_in_schema(
+ conn: pyodbc.Connection,
+ schema_name: str = "dbo",
+) -> None:
+ """
+ Drop all views in a given schema.
+
+ Parameters
+ ----------
+ conn:
+ An open pyodbc connection to the Synapse workspace.
+ schema_name:
+ The schema from which to drop all views, e.g. "dbo".
+ """
+ cursor = conn.cursor()
+ cursor.execute(
+ f"""
+ SELECT TABLE_NAME
+ FROM INFORMATION_SCHEMA.VIEWS
+ WHERE TABLE_SCHEMA = '{schema_name}'
+ """
+ )
+ views = cursor.fetchall()
+ for view in views:
+ view_name = view[0]
+ drop_ddl = f"DROP VIEW [{schema_name}].[{view_name}]"
+ execute_ddl(conn, drop_ddl)
diff --git a/src/corvus_python/sql/synapse/__init__.py b/src/corvus_python/sql/synapse/__init__.py
new file mode 100644
index 0000000..148029f
--- /dev/null
+++ b/src/corvus_python/sql/synapse/__init__.py
@@ -0,0 +1,11 @@
+from .synapse_sql_utils import (
+ get_synapse_sql_pyodbc_connection,
+ get_synapse_sql_pyodbc_connection_with_token,
+ create_database_if_not_exists,
+)
+
+__all__ = [
+ "get_synapse_sql_pyodbc_connection",
+ "get_synapse_sql_pyodbc_connection_with_token",
+ "create_database_if_not_exists",
+]
diff --git a/src/corvus_python/sql/synapse/synapse_sql_utils.py b/src/corvus_python/sql/synapse/synapse_sql_utils.py
new file mode 100644
index 0000000..448b79d
--- /dev/null
+++ b/src/corvus_python/sql/synapse/synapse_sql_utils.py
@@ -0,0 +1,81 @@
+import pyodbc
+from ..sql_utils import (
+ get_pyodbc_connection,
+ get_pyodbc_connection_with_token,
+)
+
+
+def get_synapse_sql_pyodbc_connection(
+ workspace_name: str,
+ database: str,
+ use_managed_identity: bool = True,
+ client_id: str | None = None,
+) -> pyodbc.Connection:
+ """Open a pyodbc connection to Synapse serverless SQL endpoint using AAD tokens.
+
+ Requires ODBC Driver 18 for SQL Server or later.
+
+ Parameters
+ ----------
+ workspace_name:
+ The Synapse workspace name, e.g. "myworkspace".
+ database:
+ The database name.
+ use_managed_identity:
+ If True, uses ManagedIdentityCredential (for Container Apps / ACI).
+ If False, uses DefaultAzureCredential (for local dev — picks up
+ az login, VS Code, environment variables, etc.).
+ client_id:
+ Optional user-assigned managed identity client ID. Only used when
+ use_managed_identity is True.
+ """
+ server = f"{workspace_name}-ondemand.sql.azuresynapse.net"
+ return get_pyodbc_connection(server, database, use_managed_identity, client_id)
+
+
+def get_synapse_sql_pyodbc_connection_with_token(
+ workspace_name: str,
+ database: str,
+ token: str,
+) -> pyodbc.Connection:
+ """Open a pyodbc connection to Synapse serverless SQL endpoint using a pre-acquired AAD token.
+
+ Requires ODBC Driver 18 for SQL Server or later.
+
+ Parameters
+ ----------
+ workspace_name:
+ The Synapse workspace name, e.g. "myworkspace".
+ database:
+ The database name.
+ token:
+        A pre-acquired AAD token for the scope "https://database.windows.net/.default".
+ """
+ server = f"{workspace_name}-ondemand.sql.azuresynapse.net"
+ return get_pyodbc_connection_with_token(server, database, token)
+
+
+def create_database_if_not_exists(
+ conn: pyodbc.Connection,
+ database_name: str,
+) -> None:
+ """Create a database if it doesn't already exist."""
+
+ if conn.getinfo(pyodbc.SQL_DATABASE_NAME) != "master":
+ raise ValueError("Connection must be to master database to create a new database.")
+
+ prev_autocommit = conn.autocommit
+ conn.autocommit = True
+
+ ddl = f"""
+ IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '{database_name}')
+ BEGIN
+ CREATE DATABASE [{database_name}]
+ END
+ """
+
+ try:
+ with conn.cursor() as cursor:
+ cursor.execute(ddl)
+ finally:
+ conn.autocommit = prev_autocommit
diff --git a/tests/unit/test_sql_utils.py b/tests/unit/test_sql_utils.py
new file mode 100644
index 0000000..61dd752
--- /dev/null
+++ b/tests/unit/test_sql_utils.py
@@ -0,0 +1,170 @@
+import pytest
+
+from corvus_python.sql.sql_utils import (
+ SelectColumn,
+ WithColumn,
+ generate_create_or_alter_view_over_delta_table_ddl,
+)
+
+DELTA_PATH = "abfss://container@storageaccount.dfs.core.windows.net/path/to/table"
+
+
+class TestGenerateCreateOrAlterViewOverDeltaTableDdl:
+ def test_infer_types_single_column_no_title(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=True,
+ select_columns=[SelectColumn(name="col1")],
+ )
+
+ expected = (
+ "CREATE OR ALTER VIEW [dbo].[vw_test]\n"
+ "AS\n"
+ "SELECT\n"
+ " [col1]\n"
+ "FROM OPENROWSET(\n"
+ f" BULK '{DELTA_PATH}',\n"
+ "FORMAT = 'DELTA'\n"
+ " ) AS [result]"
+ )
+ assert result == expected
+
+ def test_infer_types_single_column_with_title(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=True,
+ select_columns=[SelectColumn(name="col1", title="Column One")],
+ )
+
+ assert "[col1] AS [Column One]" in result
+
+ def test_infer_types_multiple_columns_mixed_titles(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="myschema",
+ view_name="vw_multi",
+ delta_table_path=DELTA_PATH,
+ infer_types=True,
+ select_columns=[
+ SelectColumn(name="id"),
+ SelectColumn(name="first_name", title="First Name"),
+ SelectColumn(name="last_name"),
+ ],
+ )
+
+ assert "CREATE OR ALTER VIEW [myschema].[vw_multi]" in result
+ assert "[id]" in result
+ assert "[first_name] AS [First Name]" in result
+ assert "[last_name]" in result
+ # Columns should be comma-separated
+ assert ",\n " in result
+
+ def test_infer_types_raises_when_select_columns_missing(self):
+ with pytest.raises(ValueError, match="select_columns must be provided"):
+ generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=True,
+ select_columns=None,
+ )
+
+ def test_infer_types_raises_when_select_columns_empty(self):
+ with pytest.raises(ValueError, match="select_columns must be provided"):
+ generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=True,
+ select_columns=[],
+ )
+
+ def test_no_infer_types_single_with_column_no_title(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_typed",
+ delta_table_path=DELTA_PATH,
+ infer_types=False,
+ with_columns=[WithColumn(name="col1", type="INT")],
+ )
+
+ expected = (
+ "CREATE OR ALTER VIEW [dbo].[vw_typed]\n"
+ "AS\n"
+ "SELECT\n"
+ " *\n"
+ "FROM OPENROWSET(\n"
+ f" BULK '{DELTA_PATH}',\n"
+ "FORMAT = 'DELTA'\n"
+ " )\n"
+ " WITH (\n"
+ " [col1] INT '$.col1'\n"
+ " ) AS [result]"
+ )
+ assert result == expected
+
+ def test_no_infer_types_with_column_with_title(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_typed",
+ delta_table_path=DELTA_PATH,
+ infer_types=False,
+ with_columns=[WithColumn(name="col1", type="VARCHAR(100)", title="Column One")],
+ )
+
+ assert "[Column One] VARCHAR(100) '$.col1'" in result
+
+ def test_no_infer_types_multiple_with_columns(self):
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_typed",
+ delta_table_path=DELTA_PATH,
+ infer_types=False,
+ with_columns=[
+ WithColumn(name="id", type="INT"),
+ WithColumn(name="name", type="VARCHAR(200)", title="Full Name"),
+ WithColumn(name="created_at", type="DATETIME2"),
+ ],
+ )
+
+ assert "SELECT\n *" in result
+ assert "[id] INT '$.id'" in result
+ assert "[Full Name] VARCHAR(200) '$.name'" in result
+ assert "[created_at] DATETIME2 '$.created_at'" in result
+ assert "WITH (" in result
+
+ def test_no_infer_types_raises_when_with_columns_missing(self):
+ with pytest.raises(ValueError, match="with_columns must be provided"):
+ generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=False,
+ with_columns=None,
+ )
+
+ def test_no_infer_types_raises_when_with_columns_empty(self):
+ with pytest.raises(ValueError, match="with_columns must be provided"):
+ generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=DELTA_PATH,
+ infer_types=False,
+ with_columns=[],
+ )
+
+ def test_delta_table_path_appears_in_bulk_clause(self):
+ custom_path = "abfss://mycontainer@myaccount.dfs.core.windows.net/my/table"
+ result = generate_create_or_alter_view_over_delta_table_ddl(
+ schema_name="dbo",
+ view_name="vw_test",
+ delta_table_path=custom_path,
+ infer_types=True,
+ select_columns=[SelectColumn(name="x")],
+ )
+
+ assert f"BULK '{custom_path}'" in result
+ assert "FORMAT = 'DELTA'" in result