diff --git a/README.md b/README.md index 7d3dd43..a66d3b6 100644 --- a/README.md +++ b/README.md @@ -201,3 +201,115 @@ except EmailError as e: - The `from_email` must use a configured MailFrom address from your ACS resource - Set `email_sending_disabled=True` during development to prevent actual emails from being sent - Attachments must be base64-encoded before adding to the `EmailAttachment` object + +--- + +### `sql` + +Includes utility functions for working with SQL databases via pyodbc, with AAD token-based authentication. Provides helpers for connecting to Synapse serverless SQL and Fabric SQL Analytics endpoints, managing views over Delta Lake tables, and executing DDL statements. + +**⚠️ Note: This module requires ODBC Driver 18 for SQL Server or later.** + +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL or Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql import get_pyodbc_connection | +| get_pyodbc_connection_with_token | Function | Opens a pyodbc connection using a pre-acquired AAD token. Useful when running inside Synapse notebooks. | from corvus_python.sql import get_pyodbc_connection_with_token | +| execute_ddl | Function | Executes a DDL statement (e.g. CREATE VIEW) using a pyodbc connection. | from corvus_python.sql import execute_ddl | +| create_or_alter_view_over_delta_table | Function | Creates or alters a SQL view over a Delta Lake table, with support for inferred or explicit column types. 
| from corvus_python.sql import create_or_alter_view_over_delta_table | +| drop_views_in_schema | Function | Drops all views in a given schema. | from corvus_python.sql import drop_views_in_schema | +| SelectColumn | Class | Dataclass representing a column to select, with an optional display title. | from corvus_python.sql import SelectColumn | +| WithColumn | Class | Dataclass representing a column with an explicit type for use in OPENROWSET WITH clauses. | from corvus_python.sql import WithColumn | + +#### `sql.synapse` + +Convenience wrappers specific to Synapse serverless SQL endpoints. + +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_synapse_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using AAD tokens. Builds the server URL from the workspace name. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection | +| get_synapse_sql_pyodbc_connection_with_token | Function | Opens a pyodbc connection to a Synapse serverless SQL endpoint using a pre-acquired AAD token. Ideal for Synapse notebooks. | from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token | +| create_database_if_not_exists | Function | Creates a database if it doesn't already exist. Requires a connection to the master database. | from corvus_python.sql.synapse import create_database_if_not_exists | + +#### `sql.fabric` + +Convenience wrappers specific to Fabric SQL Analytics endpoints. 
+ +| Component Name | Object Type | Description | Import syntax | +|--------------------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| get_fabric_sql_pyodbc_connection | Function | Opens a pyodbc connection to a Fabric SQL Analytics endpoint using AAD tokens. | from corvus_python.sql.fabric import get_fabric_sql_pyodbc_connection | + +#### Usage Example + +```python +from corvus_python.sql import ( + get_pyodbc_connection, + create_or_alter_view_over_delta_table, + drop_views_in_schema, + SelectColumn, + WithColumn, +) + +# Connect using DefaultAzureCredential +conn = get_pyodbc_connection( + server="myworkspace-ondemand.sql.azuresynapse.net", + database="my_database", + use_managed_identity=False, +) + +# Create a view with inferred types +create_or_alter_view_over_delta_table( + conn=conn, + schema_name="dbo", + view_name="my_view", + delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table", + infer_types=True, + select_columns=[ + SelectColumn(name="id"), + SelectColumn(name="full_name", title="Name"), + ], +) + +# Create a view with explicit types +create_or_alter_view_over_delta_table( + conn=conn, + schema_name="dbo", + view_name="my_typed_view", + delta_table_path="abfss://container@storageaccount.dfs.core.windows.net/path/to/table", + infer_types=False, + with_columns=[ + WithColumn(name="id", type="INT"), + WithColumn(name="full_name", type="VARCHAR(200)", title="Name"), + ], +) + +# Drop all views in a schema +drop_views_in_schema(conn, schema_name="dbo") +``` + +Or use the Synapse/Fabric-specific helpers: + +```python +from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection + +conn = get_synapse_sql_pyodbc_connection( + workspace_name="myworkspace", + database="my_database", 
+ use_managed_identity=False, +) +``` + +#### Usage from a Synapse Notebook + +When running inside a Synapse notebook, you can use `mssparkutils` to acquire a token and pass it directly: + +```python +from corvus_python.sql.synapse import get_synapse_sql_pyodbc_connection_with_token + +token = mssparkutils.credentials.getToken("DW") + +conn = get_synapse_sql_pyodbc_connection_with_token( + workspace_name="myworkspace", + database="my_database", + token=token, +) +``` diff --git a/src/corvus_python/pyspark/synapse/sync_tables_locally.py b/src/corvus_python/pyspark/synapse/sync_tables_locally.py index df8ffd5..b985e0f 100644 --- a/src/corvus_python/pyspark/synapse/sync_tables_locally.py +++ b/src/corvus_python/pyspark/synapse/sync_tables_locally.py @@ -2,8 +2,6 @@ from dataclasses import dataclass from typing import List, Tuple -import pyodbc -import struct from pyspark.sql import SparkSession from corvus_python.pyspark.utilities import LocalSparkSessionConfig, get_or_create_spark_session @@ -18,41 +16,11 @@ class ObjectSyncDetails: database_name (str): Name of the database. tables (List[str]): List of tables in the database. """ + database_name: str tables: List[str] -def _get_pyodbc_connection(workspace_name: str) -> pyodbc.Connection: - """Gets an ODBC connection to the SQL Serverless endpoint of a Synapse workspace. - - Args: - workspace_name (str): Name of the workspace. - - Returns: - pyodbc.Connection: ODBC connection to the SQL Serverless endpoint. - - Raises: - RuntimeError: If user is not logged into the Azure CLI. - - Note: - The connection object returned can be used in a pandas read_sql query to pull data from Synapse. 
E.g.: - df = pd.read_sql(f'SELECT * FROM db_name.dbo.table_name', conn) - """ - server = f'{workspace_name}-ondemand.sql.azuresynapse.net' - database = 'master' - driver = '{ODBC Driver 17 for SQL Server}' - connection_string = f'Driver={driver};Server=tcp:{server},1433;\ - Database={database};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;' - - token = get_az_cli_token('https://database.windows.net/.default') - - token_bytes = token.encode("UTF-16-LE") - token_struct = struct.pack(f' Tuple[str, dict]: """Gets the Spark JDBC connection properties for a Synapse SQL Serverless endpoint. @@ -65,28 +33,25 @@ def _get_jdbc_connection_properties(workspace_name: str) -> Tuple[str, dict]: Raises: RuntimeError: If user is not logged into the Azure CLI. """ - server = f'{workspace_name}-ondemand.sql.azuresynapse.net' - database = 'master' + server = f"{workspace_name}-ondemand.sql.azuresynapse.net" + database = "master" jdbc_url = f"jdbc:sqlserver://{server}:1433;database={database};encrypt=true;\ trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30" - token = get_az_cli_token('https://database.windows.net/.default') + token = get_az_cli_token("https://database.windows.net/.default") - connection_properties = { - "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", - "accessToken": token - } + connection_properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "accessToken": token} return jdbc_url, connection_properties def sync_synapse_tables_to_local_spark( - workspace_name: str, - object_sync_details: List[ObjectSyncDetails], - overwrite: bool = False, - spark: SparkSession = None - ): + workspace_name: str, + object_sync_details: List[ObjectSyncDetails], + overwrite: bool = False, + spark: SparkSession = None, +): """Syncs tables from a Synapse workspace to a local Spark metastore. 
Args: @@ -111,20 +76,17 @@ def sync_synapse_tables_to_local_spark( table_exists = spark.catalog.tableExists(table, osd.database_name) if table_exists and not overwrite: - print('\033[93m' + f"Table '{table}' in database '{osd.database_name}' already exists and \ -overwrite is set to False. Skipping table sync." + '\033[0m') + print( + "\033[93m" + + f"Table '{table}' in database '{osd.database_name}' already exists and \ +overwrite is set to False. Skipping table sync." + + "\033[0m" + ) continue else: spark.read.jdbc( - url=jdbc_url, - table=f"{osd.database_name}.dbo.{table}", - properties=connection_properties - ) \ - .coalesce(1) \ - .write \ - .format("delta") \ - .mode("overwrite") \ - .saveAsTable(f"{osd.database_name}.{table}") + url=jdbc_url, table=f"{osd.database_name}.dbo.{table}", properties=connection_properties + ).coalesce(1).write.format("delta").mode("overwrite").saveAsTable(f"{osd.database_name}.{table}") if not existing_spark_session: spark.stop() diff --git a/src/corvus_python/sql/__init__.py b/src/corvus_python/sql/__init__.py new file mode 100644 index 0000000..5d2dc29 --- /dev/null +++ b/src/corvus_python/sql/__init__.py @@ -0,0 +1,19 @@ +from .sql_utils import ( + execute_ddl, + get_pyodbc_connection, + get_pyodbc_connection_with_token, + create_or_alter_view_over_delta_table, + drop_views_in_schema, + SelectColumn, + WithColumn, +) + +__all__ = [ + "execute_ddl", + "get_pyodbc_connection", + "get_pyodbc_connection_with_token", + "create_or_alter_view_over_delta_table", + "drop_views_in_schema", + "SelectColumn", + "WithColumn", +] diff --git a/src/corvus_python/sql/fabric/__init__.py b/src/corvus_python/sql/fabric/__init__.py new file mode 100644 index 0000000..c92cfa9 --- /dev/null +++ b/src/corvus_python/sql/fabric/__init__.py @@ -0,0 +1,5 @@ +from .fabric_sql_utils import get_fabric_sql_pyodbc_connection + +__all__ = [ + "get_fabric_sql_pyodbc_connection", +] diff --git a/src/corvus_python/sql/fabric/fabric_sql_utils.py 
def get_fabric_sql_pyodbc_connection(
    sql_analytics_endpoint: str,
    database: str,
    use_managed_identity: bool = True,
    client_id: str | None = None,
) -> pyodbc.Connection:
    """Connect to a Fabric SQL Analytics endpoint with an AAD token via pyodbc.

    This is a Fabric-named alias for ``get_pyodbc_connection``: the endpoint
    is handed over unchanged as the server name.

    Requires ODBC Driver 18 for SQL Server or later.

    Parameters
    ----------
    sql_analytics_endpoint:
        Host name of the Fabric SQL Analytics endpoint, i.e. a name ending in
        ".datawarehouse.fabric.microsoft.com".
    database:
        The database name.
    use_managed_identity:
        If True, a ManagedIdentityCredential is used (for Container Apps / ACI).
        If False, a DefaultAzureCredential is used (for local dev - picks up
        az login, VS Code, environment variables, etc.).
    client_id:
        Optional user-assigned managed identity client ID. Only used when
        use_managed_identity is True.

    Returns
    -------
    pyodbc.Connection
        The connection returned by ``get_pyodbc_connection``.
    """
    return get_pyodbc_connection(
        server=sql_analytics_endpoint,
        database=database,
        use_managed_identity=use_managed_identity,
        client_id=client_id,
    )
def get_pyodbc_connection(
    server: str,
    database: str,
    use_managed_identity: bool = True,
    client_id: str | None = None,
) -> pyodbc.Connection:
    """Acquire an AAD token and open a pyodbc connection with it.

    Works against a Synapse serverless SQL endpoint or a Fabric SQL Analytics
    endpoint. Requires ODBC Driver 18 for SQL Server or later.

    Parameters
    ----------
    server:
        Host name of the endpoint, e.g.
        "myworkspace-ondemand.sql.azuresynapse.net" for Synapse serverless, or
        a name ending in ".datawarehouse.fabric.microsoft.com" for Fabric.
    database:
        The database name.
    use_managed_identity:
        If True, a ManagedIdentityCredential is used (for Container Apps / ACI).
        If False, a DefaultAzureCredential is used (for local dev - picks up
        az login, VS Code, environment variables, etc.).
    client_id:
        Optional user-assigned managed identity client ID. Only used when
        use_managed_identity is True.

    Returns
    -------
    pyodbc.Connection
        The connection produced by ``get_pyodbc_connection_with_token``.
    """
    # Scope identifying the Azure SQL resource the token is requested for.
    sql_scope = "https://database.windows.net/.default"

    # client_id is deliberately ignored on the DefaultAzureCredential path.
    credential = (
        ManagedIdentityCredential(client_id=client_id)
        if use_managed_identity
        else DefaultAzureCredential()
    )

    access_token = credential.get_token(sql_scope)
    return get_pyodbc_connection_with_token(server, database, access_token.token)
+ """ + + # Pack the token into the bytes format pyodbc expects for + # SQL_COPT_SS_ACCESS_TOKEN (driver-level AAD token injection). + token_bytes = token.encode("utf-16-le") + token_struct = struct.pack(f" str: + """ + Generate the DDL statement to create or alter a SQL view over a Delta Lake table. + + + Parameters + ---------- + schema_name: + The schema for the view, e.g. "dbo". + view_name: + The name of the view to create or alter. + delta_table_path: + The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table". + infer_types: + If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types + from the data. + If False, the view will be created with "SELECT * FROM OPENROWSET(...) WITH ([col] type, ...)" and the column + types will be determined by the caller and passed in via with_columns. + select_columns: + If infer_types is True, the list of columns to select from the Delta table, along with optional titles for the + view. If a title is not provided for a column, the original column name will be used in the view. + with_columns: + If infer_types is False, the list of columns and their types to define in the WITH clause of the OPENROWSET + statement. The column names should match the columns in the Delta table, and the types should be valid + Synapse SQL types (e.g. "INT", "VARCHAR(100)", "DATETIME2", etc.). 
+ """ + + if infer_types: + if not select_columns: + raise ValueError("select_columns must be provided when infer_types is True") + + col_def_list: list[str] = [] + for column in select_columns: + if column.title: + col_def_list.append(f"[{column.name}] AS [{column.title}]") + else: + col_def_list.append(f"[{column.name}]") + + select_clause = ",\n ".join(col_def_list) + with_clause = "" + else: + select_clause = "*" + with_column_list: list[str] = [] + if not with_columns: + raise ValueError("with_columns must be provided when infer_types is False") + for column in with_columns: + with_column_list.append(f"[{column.title or column.name}] {column.type} '$.{column.name}'") + with_clause = ",\n ".join(with_column_list) + with_clause = f"\n WITH (\n {with_clause}\n )" + + bulk_clause = f"BULK '{delta_table_path}',\n" f"FORMAT = 'DELTA'" + + ddl = f"""\ +CREATE OR ALTER VIEW [{schema_name}].[{view_name}] +AS +SELECT + {select_clause} +FROM OPENROWSET( + {bulk_clause} + ){with_clause} AS [result]""" + + return ddl + + +def create_or_alter_view_over_delta_table( + conn: pyodbc.Connection, + schema_name: str, + view_name: str, + delta_table_path: str, + infer_types: bool = True, + select_columns: List[SelectColumn] | None = None, + with_columns: List[WithColumn] | None = None, +) -> None: + """ + Create or alter a SQL view over a Delta Lake table. + + + Parameters + ---------- + conn: + An open pyodbc connection to the Synapse workspace. + schema_name: + The schema for the view, e.g. "dbo". + view_name: + The name of the view to create or alter. + delta_table_path: + The path to the Delta Lake table, e.g. "abfss://container@storageaccount.dfs.core.windows.net/path/to/table". + infer_types: + If True, the view will be created with "SELECT [col] AS [title],..." and Synapse will infer the column types + from the data. + If False, the view will be created with "SELECT * FROM OPENROWSET(...) 
def drop_views_in_schema(
    conn: pyodbc.Connection,
    schema_name: str = "dbo",
) -> None:
    """Drop all views in a given schema.

    The view names are read from INFORMATION_SCHEMA.VIEWS and each view is
    then dropped with its own ``DROP VIEW`` statement via ``execute_ddl``.

    Parameters
    ----------
    conn:
        An open pyodbc connection to the database that contains the views.
    schema_name:
        The schema from which to drop all views, e.g. "dbo".
    """
    cursor = conn.cursor()
    try:
        # Bind the schema name as a query parameter instead of splicing it
        # into the SQL text, so quotes in the name cannot alter the statement.
        cursor.execute(
            "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA = ?",
            schema_name,
        )
        # Materialise the names before issuing any DROP so the lookup cursor
        # can be released first.
        view_names = [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()

    # Inside a [bracket-delimited] identifier a literal "]" is written "]]".
    quoted_schema = schema_name.replace("]", "]]")
    for view_name in view_names:
        quoted_view = view_name.replace("]", "]]")
        execute_ddl(conn, f"DROP VIEW [{quoted_schema}].[{quoted_view}]")
def create_database_if_not_exists(
    conn: pyodbc.Connection,
    database_name: str,
) -> None:
    """Create a database if it doesn't already exist.

    Parameters
    ----------
    conn:
        An open pyodbc connection whose current database is "master".
    database_name:
        Name of the database to create.

    Raises
    ------
    ValueError
        If the connection's current database is not "master".
    """
    if conn.getinfo(pyodbc.SQL_DATABASE_NAME) != "master":
        raise ValueError("Connection must be to master database to create a new database.")

    # The name appears twice in the batch, in two different quoting contexts:
    # as a string literal (a "'" is doubled) and as a bracket-delimited
    # identifier (a "]" is doubled). Escaping both keeps an unusual name from
    # breaking out of its quotes.
    name_as_literal = database_name.replace("'", "''")
    name_as_identifier = database_name.replace("]", "]]")

    ddl = f"""
    IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = N'{name_as_literal}')
    BEGIN
        CREATE DATABASE [{name_as_identifier}]
    END
    """

    # Run the batch in autocommit mode and always restore the caller's
    # setting afterwards.
    # NOTE(review): presumably needed because CREATE DATABASE cannot run
    # inside an open transaction - confirm against the target endpoint.
    prev_autocommit = conn.autocommit
    conn.autocommit = True
    try:
        with conn.cursor() as cursor:
            cursor.execute(ddl)
    finally:
        conn.autocommit = prev_autocommit
generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[SelectColumn(name="col1")], + ) + + expected = ( + "CREATE OR ALTER VIEW [dbo].[vw_test]\n" + "AS\n" + "SELECT\n" + " [col1]\n" + "FROM OPENROWSET(\n" + f" BULK '{DELTA_PATH}',\n" + "FORMAT = 'DELTA'\n" + " ) AS [result]" + ) + assert result == expected + + def test_infer_types_single_column_with_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[SelectColumn(name="col1", title="Column One")], + ) + + assert "[col1] AS [Column One]" in result + + def test_infer_types_multiple_columns_mixed_titles(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="myschema", + view_name="vw_multi", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[ + SelectColumn(name="id"), + SelectColumn(name="first_name", title="First Name"), + SelectColumn(name="last_name"), + ], + ) + + assert "CREATE OR ALTER VIEW [myschema].[vw_multi]" in result + assert "[id]" in result + assert "[first_name] AS [First Name]" in result + assert "[last_name]" in result + # Columns should be comma-separated + assert ",\n " in result + + def test_infer_types_raises_when_select_columns_missing(self): + with pytest.raises(ValueError, match="select_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=None, + ) + + def test_infer_types_raises_when_select_columns_empty(self): + with pytest.raises(ValueError, match="select_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=True, + select_columns=[], + ) + + def 
test_no_infer_types_single_with_column_no_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[WithColumn(name="col1", type="INT")], + ) + + expected = ( + "CREATE OR ALTER VIEW [dbo].[vw_typed]\n" + "AS\n" + "SELECT\n" + " *\n" + "FROM OPENROWSET(\n" + f" BULK '{DELTA_PATH}',\n" + "FORMAT = 'DELTA'\n" + " )\n" + " WITH (\n" + " [col1] INT '$.col1'\n" + " ) AS [result]" + ) + assert result == expected + + def test_no_infer_types_with_column_with_title(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[WithColumn(name="col1", type="VARCHAR(100)", title="Column One")], + ) + + assert "[Column One] VARCHAR(100) '$.col1'" in result + + def test_no_infer_types_multiple_with_columns(self): + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_typed", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[ + WithColumn(name="id", type="INT"), + WithColumn(name="name", type="VARCHAR(200)", title="Full Name"), + WithColumn(name="created_at", type="DATETIME2"), + ], + ) + + assert "SELECT\n *" in result + assert "[id] INT '$.id'" in result + assert "[Full Name] VARCHAR(200) '$.name'" in result + assert "[created_at] DATETIME2 '$.created_at'" in result + assert "WITH (" in result + + def test_no_infer_types_raises_when_with_columns_missing(self): + with pytest.raises(ValueError, match="with_columns must be provided"): + generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=None, + ) + + def test_no_infer_types_raises_when_with_columns_empty(self): + with pytest.raises(ValueError, match="with_columns must be provided"): + 
generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=DELTA_PATH, + infer_types=False, + with_columns=[], + ) + + def test_delta_table_path_appears_in_bulk_clause(self): + custom_path = "abfss://mycontainer@myaccount.dfs.core.windows.net/my/table" + result = generate_create_or_alter_view_over_delta_table_ddl( + schema_name="dbo", + view_name="vw_test", + delta_table_path=custom_path, + infer_types=True, + select_columns=[SelectColumn(name="x")], + ) + + assert f"BULK '{custom_path}'" in result + assert "FORMAT = 'DELTA'" in result