diff --git a/README.md b/README.md index c7ff8da0..0192ebfd 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,31 @@ run = data_contract.test() if not run.has_passed(): print("Data quality validation failed.") # Abort pipeline, alert, or take corrective actions... + +# run quality checks with DQX engine (Databricks server type only) +# requires: pip install datacontract-cli[dqx] +data_contract_dqx = DataContract( + data_contract_file="odcs.yaml", + server="production", + test_engine="dqx", +) +run_dqx = data_contract_dqx.test() + +# access all executed DQX rule checks +all_checks = run_dqx.checks + +# access failed/error/warning DQX rule checks +failed_checks = [c for c in run_dqx.checks if c.result in ("failed", "error", "warning")] + +for check in failed_checks: + print(f"[{check.result}] {check.name} ({check.model})") + print(f"reason: {check.reason}") + +# full structured output +print(run_dqx.pretty()) + +if not run_dqx.has_passed(): + print("DQX data quality validation failed.") ``` ## How to @@ -236,6 +261,7 @@ A list of available extras: | Avro Support | `pip install datacontract-cli[avro]` | | Google BigQuery | `pip install datacontract-cli[bigquery]` | | Databricks Integration | `pip install datacontract-cli[databricks]` | +| DQX (Databricks quality checks) | `pip install datacontract-cli[dqx]` | | DuckDB (local/S3/GCS/Azure file testing) | `pip install datacontract-cli[duckdb]` | | Iceberg | `pip install datacontract-cli[iceberg]` | | Kafka Integration | `pip install datacontract-cli[kafka]` | diff --git a/datacontract/api.py b/datacontract/api.py index 09a94073..425868d7 100644 --- a/datacontract/api.py +++ b/datacontract/api.py @@ -323,11 +323,18 @@ async def test( examples=["https://api.datamesh-manager.com/api/test-results"], ), ] = None, + test_engine: Annotated[ + str, + Query( + description="The engine used for quality checks. Supported values: soda (default), dqx (Databricks only).", + examples=["soda", "dqx"], + ), + ] = "soda", ) -> Run: check_api_key(api_key) logging.info("Testing data contract...") logging.info(body) - return DataContract(data_contract_str=body, server=server, publish_url=publish_url).test() + return DataContract(data_contract_str=body, server=server, publish_url=publish_url, test_engine=test_engine).test() @app.post( diff --git a/datacontract/cli.py b/datacontract/cli.py index ec839386..7b6c1572 100644 --- a/datacontract/cli.py +++ b/datacontract/cli.py @@ -161,6 +161,12 @@ def test( typer.Option(help="SSL verification when publishing the data contract."), ] = True, debug: debug_option = None, + test_engine: Annotated[ + str, + typer.Option( + help="The engine used for quality checks. Supported values: `soda` (default), `dqx` (Databricks only)." + ), + ] = "soda", ): """ Run schema and quality tests on configured servers. @@ -177,6 +183,7 @@ def test( publish_url=publish, server=server, ssl_verification=ssl_verification, + test_engine=test_engine, ).test() if logs: _print_logs(run) diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py index b7da9dd1..45614b2a 100644 --- a/datacontract/data_contract.py +++ b/datacontract/data_contract.py @@ -32,6 +32,7 @@ def __init__( inline_definitions: bool = True, ssl_verification: bool = True, publish_test_results: bool = False, + test_engine: str = "soda", ): self._data_contract_file = data_contract_file self._data_contract_str = data_contract_str @@ -44,6 +45,7 @@ def __init__( self._duckdb_connection = duckdb_connection self._inline_definitions = inline_definitions self._ssl_verification = ssl_verification + self._test_engine = test_engine @classmethod def init(cls, template: typing.Optional[str], schema: typing.Optional[str] = None) -> OpenDataContractStandard: @@ -103,7 +105,14 @@ def test(self) -> Run: inline_definitions=self._inline_definitions, ) - execute_data_contract_test(data_contract, run, self._server, self._spark, self._duckdb_connection) + execute_data_contract_test( + data_contract, + run, + self._server, + self._spark, + self._duckdb_connection, + self._test_engine, + ) except DataContractException as e: run.checks.append( diff --git a/datacontract/engines/data_contract_test.py b/datacontract/engines/data_contract_test.py index c3594955..4d3414ca 100644 --- a/datacontract/engines/data_contract_test.py +++ b/datacontract/engines/data_contract_test.py @@ -15,6 +15,7 @@ from datacontract.engines.datacontract.check_that_datacontract_contains_valid_servers_configuration import ( check_that_datacontract_contains_valid_server_configuration, ) +from datacontract.engines.dqx.check_dqx_execute import check_dqx_execute from datacontract.engines.fastjsonschema.check_jsonschema import check_jsonschema from datacontract.engines.soda.check_soda_execute import check_soda_execute from datacontract.model.exceptions import DataContractException @@ -27,6 +28,7 @@ def execute_data_contract_test( server_name: str = None, spark: "SparkSession" = None, duckdb_connection: "DuckDBPyConnection" = None, + test_engine: str = "soda", ): if data_contract.schema_ is None or len(data_contract.schema_) == 0: raise DataContractException( @@ -53,13 +55,28 @@ def execute_data_contract_test( if server.type == "api": server = process_api_response(run, server) - run.checks.extend(create_checks(data_contract, server)) + normalized_test_engine = test_engine.lower() if test_engine is not None else "soda" + + if normalized_test_engine not in ["soda", "dqx"]: + raise DataContractException( + type="test", + name="Check that test engine is supported", + result=ResultEnum.error, + reason=f"Unsupported test engine '{test_engine}'. Supported values are: soda, dqx.", + engine="datacontract", + ) + + if normalized_test_engine == "soda": + run.checks.extend(create_checks(data_contract, server)) # TODO check server is supported type for nicer error messages # TODO check server credentials are complete for nicer error messages - if server.format == "json" and server.type != "kafka": - check_jsonschema(run, data_contract, server) - check_soda_execute(run, data_contract, server, spark, duckdb_connection) + if normalized_test_engine == "dqx": + check_dqx_execute(run, data_contract, server, spark) + else: + if server.format == "json" and server.type != "kafka": + check_jsonschema(run, data_contract, server) + check_soda_execute(run, data_contract, server, spark, duckdb_connection) def get_server(data_contract: OpenDataContractStandard, server_name: str = None) -> Server | None: diff --git a/datacontract/engines/dqx/__init__.py b/datacontract/engines/dqx/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/datacontract/engines/dqx/check_dqx_execute.py b/datacontract/engines/dqx/check_dqx_execute.py new file mode 100644 index 00000000..f19376b2 --- /dev/null +++ b/datacontract/engines/dqx/check_dqx_execute.py @@ -0,0 +1,201 @@ +import typing +import uuid + +import yaml +from open_data_contract_standard.model import OpenDataContractStandard, Server + +if typing.TYPE_CHECKING: + from pyspark.sql import SparkSession + +from datacontract.engines.data_contract_checks import to_schema_name +from datacontract.export.dqx_exporter import extract_quality_rules +from datacontract.model.run import Check, ResultEnum, Run + + +def _get_rule_check_name(dqx_rule: dict, index: int) -> str: + check_metadata = dqx_rule.get("check") or {} + function_name = check_metadata.get("function") or "dqx_rule" + return dqx_rule.get("name") or f"{function_name}_{index + 1}" + + +def check_dqx_execute( + run: Run, + data_contract: OpenDataContractStandard, + server: Server, + spark: "SparkSession" = None, +): + if data_contract is None: + run.log_warn("Cannot run engine dqx, as data contract is invalid") + return + + if server.type != "databricks": + run.log_info( + f"DQX execution is only available for server type 'databricks'. " + f"Configured server type is '{server.type}'. Skipping DQX checks." + ) + return + + rules_by_schema: list[tuple[str, list[dict]]] = [] + for schema_obj in data_contract.schema_ or []: + schema_name = to_schema_name(schema_obj, server.type) + dqx_rules = extract_quality_rules(schema_obj) + if dqx_rules: + rules_by_schema.append((schema_name, dqx_rules)) + + try: + from databricks.labs.dqx.engine import DQEngine + from databricks.sdk import WorkspaceClient + from pyspark.sql import SparkSession + except ImportError: + run.log_warn( + "Cannot run engine dqx, dependencies are missing. " + "Install datacontract-cli[dqx] to enable DQX execution." + ) + for schema_name, dqx_rules in rules_by_schema: + for index, dqx_rule in enumerate(dqx_rules): + check_name = _get_rule_check_name(dqx_rule, index) + run.checks.append( + Check( + id=str(uuid.uuid4()), + key=f"{schema_name}__{check_name}__dqx", + category="quality", + type="custom", + name=check_name, + model=schema_name, + engine="dqx", + language="python", + implementation=yaml.dump(dqx_rule, sort_keys=False), + result=ResultEnum.error, + reason="DQX dependencies are missing. Install datacontract-cli[dqx] to enable DQX execution.", + ) + ) + return + + # Resolve or create a Spark session. + # Priority: + # 1. Explicitly provided Spark session (programmatic API). + # 2. Existing active session (e.g. running on Databricks cluster). + # 3. Databricks Connect session (if databricks-connect is installed). + # 4. Fallback SparkSession.builder.getOrCreate() (e.g. local Spark). + spark_session = spark or SparkSession.getActiveSession() + + if spark_session is None: + # Try Databricks Connect first (optional dependency). + try: + from databricks.connect import DatabricksSession # type: ignore[import-not-found] + + run.log_info("Creating Spark session via Databricks Connect (DatabricksSession).") + spark_session = DatabricksSession.builder.getOrCreate() + except Exception: + spark_session = None + + if spark_session is None: + try: + run.log_info("Creating Spark session via SparkSession.builder.getOrCreate().") + spark_session = SparkSession.builder.getOrCreate() + except Exception: + spark_session = None + + if spark_session is None: + run.log_warn("Cannot run engine dqx, as no active Spark session is available.") + for schema_name, dqx_rules in rules_by_schema: + for index, dqx_rule in enumerate(dqx_rules): + check_name = _get_rule_check_name(dqx_rule, index) + run.checks.append( + Check( + id=str(uuid.uuid4()), + key=f"{schema_name}__{check_name}__dqx", + category="quality", + type="custom", + name=check_name, + model=schema_name, + engine="dqx", + language="python", + implementation=yaml.dump(dqx_rule, sort_keys=False), + result=ResultEnum.error, + reason="No active Spark session is available to execute DQX checks.", + ) + ) + return + + run.log_info("Running engine dqx") + dq_engine = DQEngine(workspace_client=WorkspaceClient(), spark=spark_session) + + for schema_name, dqx_rules in rules_by_schema: + + run.log_info(f"Running {len(dqx_rules)} DQX checks for model {schema_name}") + + try: + model_df = spark_session.read.table(schema_name) + except Exception as exc: + run.log_error(str(exc)) + for index, dqx_rule in enumerate(dqx_rules): + check_name = _get_rule_check_name(dqx_rule, index) + run.checks.append( + Check( + id=str(uuid.uuid4()), + key=f"{schema_name}__{check_name}__dqx", + category="quality", + type="custom", + name=check_name, + model=schema_name, + engine="dqx", + language="python", + implementation=yaml.dump(dqx_rule, sort_keys=False), + result=ResultEnum.error, + reason=str(exc), + ) + ) + continue + + for index, dqx_rule in enumerate(dqx_rules): + check_name = _get_rule_check_name(dqx_rule, index) + check_key = f"{schema_name}__{check_name}__dqx" + + try: + passed_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(model_df, [dqx_rule]) + violations = invalid_df.count() + + criticality = str(dqx_rule.get("criticality", "error")).lower() + if violations == 0: + result = ResultEnum.passed + reason = f"all {passed_df.count()} row(s) passed the DQX rule" + elif criticality == "warn": + result = ResultEnum.warning + reason = f"{violations} row(s) violated the warning DQX rule" + else: + result = ResultEnum.failed + reason = f"{violations} row(s) violated the DQX rule" + + run.checks.append( + Check( + id=str(uuid.uuid4()), + key=check_key, + category="quality", + type="custom", + name=check_name, + model=schema_name, + engine="dqx", + language="python", + implementation=yaml.dump(dqx_rule, sort_keys=False), + result=result, + reason=reason, + ) + ) + except Exception as exc: + run.log_error(str(exc)) + run.checks.append( + Check( + id=str(uuid.uuid4()), + key=check_key, + category="quality", + type="custom", + name=check_name, + model=schema_name, + engine="dqx", + language="python", + implementation=yaml.dump(dqx_rule, sort_keys=False), + result=ResultEnum.error, + reason=str(exc), + ) + ) diff --git a/datacontract/export/dqx_exporter.py b/datacontract/export/dqx_exporter.py index 7f24f607..adc82e53 100644 --- a/datacontract/export/dqx_exporter.py +++ b/datacontract/export/dqx_exporter.py @@ -59,6 +59,13 @@ def process_quality_rule(rule: DataQuality, column_name: str) -> Dict[str, Any]: implementation = rule.implementation check = implementation[DqxKeys.CHECK] + # Ensure each rule has a stable name so that DQX doesn't + # try to infer it from the Spark column expression (which can + # trigger issues with certain Spark Connect representations). + if "name" not in implementation: + function_name = check.get(DqxKeys.FUNCTION, "dqx_rule") + implementation["name"] = f"{column_name}__{function_name}" if column_name else function_name + if column_name: arguments = check.setdefault(DqxKeys.ARGUMENTS, {}) diff --git a/pyproject.toml b/pyproject.toml index 095d69d7..722faa9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,10 @@ databricks = [ "numpy>=1.26.4,<2.0.0", # pyspark incompatible with numpy 2.0 ] +dqx = [ + "databricks-labs-dqx[datacontract]>=0.13.0,<1.0.0", +] + iceberg = [ "pyiceberg==0.10.0" ] @@ -134,7 +138,7 @@ protobuf = [ ] all = [ - "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,sqlserver,s3,athena,trino,dbt,dbml,duckdb,iceberg,parquet,rdf,api,protobuf,oracle]" + "datacontract-cli[kafka,bigquery,csv,excel,snowflake,postgres,databricks,dqx,sqlserver,s3,athena,trino,dbt,dbml,duckdb,iceberg,parquet,rdf,api,protobuf,oracle]" ] # for development, we pin all libraries to an exact version diff --git a/tests/test_cli.py b/tests/test_cli.py index 9c6f40dc..48d80760 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -16,3 +16,9 @@ def test_file_does_not_exist(): result = runner.invoke(app, ["test", "unknown.yaml"]) assert result.exit_code == 1 assert "The file 'unknown.yaml' does not \nexist." in result.stdout + + +def test_file_does_not_exist_with_dqx_test_engine(): + result = runner.invoke(app, ["test", "unknown.yaml", "--test-engine", "dqx"]) + assert result.exit_code == 1 + assert "The file 'unknown.yaml' does not \nexist." in result.stdout diff --git a/tests/test_dqx_execution.py b/tests/test_dqx_execution.py new file mode 100644 index 00000000..06a88895 --- /dev/null +++ b/tests/test_dqx_execution.py @@ -0,0 +1,53 @@ +from types import SimpleNamespace + +from datacontract.engines.data_contract_test import execute_data_contract_test +from datacontract.engines.dqx.check_dqx_execute import check_dqx_execute +from datacontract.model.run import Run + + +class _DummyServer: + def __init__(self, server: str = "production", server_type: str = "databricks", fmt: str = "delta"): + self.server = server + self.type = server_type + self.format = fmt + + +def _dummy_contract(server: _DummyServer): + return SimpleNamespace( + id="urn:datacontract:test", + version="1.0.0", + dataProduct="test-product", + schema_=[SimpleNamespace(name="orders")], + servers=[server], + ) + + +def test_execute_data_contract_test_routes_to_dqx(monkeypatch): + server = _DummyServer() + contract = _dummy_contract(server) + run = Run.create_run() + + calls = {"dqx": 0, "soda": 0, "jsonschema": 0} + + monkeypatch.setattr("datacontract.engines.data_contract_test.get_server", lambda *_: server) + monkeypatch.setattr("datacontract.engines.data_contract_test.create_checks", lambda *_: []) + monkeypatch.setattr("datacontract.engines.data_contract_test.check_jsonschema", lambda *_: calls.__setitem__("jsonschema", 1)) + monkeypatch.setattr("datacontract.engines.data_contract_test.check_dqx_execute", lambda *_: calls.__setitem__("dqx", 1)) + monkeypatch.setattr("datacontract.engines.data_contract_test.check_soda_execute", lambda *_: calls.__setitem__("soda", 1)) + + execute_data_contract_test(contract, run, test_engine="dqx") + + assert calls["dqx"] == 1 + assert calls["soda"] == 0 + assert calls["jsonschema"] == 0 + + +def test_check_dqx_execute_skips_for_non_databricks_server(): + run = Run.create_run() + contract = SimpleNamespace(schema_=[SimpleNamespace(name="orders")]) + server = _DummyServer(server_type="postgres", fmt="table") + + check_dqx_execute(run, contract, server) + + assert len(run.checks) == 0 + assert any("DQX execution is only available for server type 'databricks'" in log.message for log in run.logs) diff --git a/tests/test_export_dqx.py b/tests/test_export_dqx.py index 59cc886d..71672dbc 100644 --- a/tests/test_export_dqx.py +++ b/tests/test_export_dqx.py @@ -17,21 +17,35 @@ def test_to_dqx(): actual = DataContract(data_contract_file="fixtures/dqx/datacontract.odcs.yaml").export("dqx") # Expected quality rules (based on the data contract) expected_rules = [ - {"check": {"arguments": {"column": "interaction_id"}, "function": "is_not_null"}, "criticality": "error"}, - {"check": {"arguments": {"column": "user_id"}, "function": "is_not_null"}, "criticality": "error"}, + { + "check": {"arguments": {"column": "interaction_id"}, "function": "is_not_null"}, + "name": "interaction_id__is_not_null", + "criticality": "error", + }, + { + "check": {"arguments": {"column": "user_id"}, "function": "is_not_null"}, + "name": "user_id__is_not_null", + "criticality": "error", + }, { "check": { "arguments": {"columns": ["user_id"], "ref_columns": ["id"], "ref_table": "catalog1.schema1.user"}, "function": "foreign_key", }, + "name": "user_id__foreign_key", + "criticality": "error", + }, + { + "check": {"arguments": {"columns": ["user_id"]}, "function": "is_unique"}, + "name": "user_id__is_unique", "criticality": "error", }, - {"check": {"arguments": {"columns": ["user_id"]}, "function": "is_unique"}, "criticality": "error"}, { "check": { "arguments": {"allowed": ["click", "view", "purchase", "like", "share"], "column": "interaction_type"}, "function": "is_in_list", }, + "name": "interaction_type__is_in_list", "criticality": "error", }, { @@ -39,18 +53,25 @@ def test_to_dqx(): "arguments": {"column": "interaction_timestamp", "timestamp_format": "yyyy-MM-dd HH:mm:ss"}, "function": "is_valid_timestamp", }, + "name": "interaction_timestamp__is_valid_timestamp", "criticality": "error", }, { "check": {"arguments": {"column": "interaction_timestamp", "offset": "1h"}, "function": "not_in_future"}, + "name": "interaction_timestamp__not_in_future", "criticality": "warning", }, - {"check": {"arguments": {"column": "item_id"}, "function": "is_not_null"}, "criticality": "minor"}, + { + "check": {"arguments": {"column": "item_id"}, "function": "is_not_null"}, + "name": "item_id__is_not_null", + "criticality": "minor", + }, { "check": { "arguments": {"column": "interaction_value", "max_limit": 1000, "min_limit": 0}, "function": "is_in_range", }, + "name": "interaction_value__is_in_range", "criticality": "minor", }, { @@ -58,6 +79,7 @@ def test_to_dqx(): "arguments": {"column": "location", "regex": "^[A-Za-z]+(?:[\\s-][A-Za-z]+)*$"}, "function": "regex_match", }, + "name": "location__regex_match", "criticality": "minor", }, { @@ -65,6 +87,7 @@ def test_to_dqx(): "arguments": {"allowed": ["mobile", "desktop", "tablet"], "column": "device"}, "function": "is_in_list", }, + "name": "device__is_in_list", "criticality": "minor", }, { @@ -72,6 +95,7 @@ def test_to_dqx(): "arguments": {"column": "interaction_date", "date_format": "yyyy-MM-dd"}, "function": "is_valid_date", }, + "name": "interaction_date__is_valid_date", "criticality": "error", }, { @@ -79,6 +103,7 @@ def test_to_dqx(): "arguments": {"column": "time_since_last_interaction", "days": 30}, "function": "is_older_than_n_days", }, + "name": "time_since_last_interaction__is_older_than_n_days", "criticality": "minor", }, { @@ -86,6 +111,7 @@ def test_to_dqx(): "arguments": {"column": "is_active", "expression": "is_active IN ('true', 'false')"}, "function": "sql_expression", }, + "name": "is_active__sql_expression", "criticality": "minor", }, { @@ -93,6 +119,7 @@ def test_to_dqx(): "arguments": {"column": "user_profile.age", "max_limit": 120, "min_limit": 13}, "function": "is_in_range", }, + "name": "user_profile.age__is_in_range", "criticality": "minor", }, { @@ -100,6 +127,7 @@ def test_to_dqx(): "arguments": {"allowed": ["male", "female", "other"], "column": "user_profile.gender"}, "function": "is_in_list", }, + "name": "user_profile.gender__is_in_list", "criticality": "minor", }, { @@ -107,10 +135,12 @@ def test_to_dqx(): "arguments": {"column": "user_profile.location_details.country", "regex": "^[A-Z]{2}$"}, "function": "regex_match", }, + "name": "user_profile.location_details.country__regex_match", "criticality": "minor", }, { "check": {"arguments": {"column": "related_items"}, "function": "is_not_null_and_not_empty_array"}, + "name": "related_items__is_not_null_and_not_empty_array", "criticality": "minor", }, { @@ -118,6 +148,7 @@ def test_to_dqx(): "arguments": {"column": "interaction_context.page_url", "regex": "^https?://.+$"}, "function": "regex_match", }, + "name": "interaction_context.page_url__regex_match", "criticality": "minor", }, { @@ -128,6 +159,7 @@ def test_to_dqx(): }, "function": "is_in_list", }, + "name": "interaction_context.device_type__is_in_list", "criticality": "minor", }, { @@ -135,6 +167,7 @@ def test_to_dqx(): "for_each_column": ["user_id", "interaction_id", "interaction_type", "interaction_timestamp"], "function": "is_not_null", }, + "name": "is_not_null", "criticality": "error", "filter": "interaction_type IN ('click', 'view', 'purchase')", }, @@ -143,11 +176,13 @@ def test_to_dqx(): "arguments": {"column": "interaction_value", "max_limit": 1000, "min_limit": 0}, "function": "is_in_range", }, + "name": "is_in_range", "criticality": "warning", "filter": "interaction_type = 'purchase'", }, { "check": {"arguments": {"allowed": ["mobile", "tablet"], "column": "device"}, "function": "is_in_list"}, + "name": "is_in_list", "criticality": "minor", "filter": "device = 'mobile'", }, @@ -156,11 +191,13 @@ def test_to_dqx(): "arguments": {"column": "user_profile.age", "max_limit": 120, "min_limit": 13}, "function": "is_in_range", }, + "name": "is_in_range", "criticality": "minor", "filter": "user_profile.age IS NOT NULL", }, { "check": {"arguments": {"columns": ["user_id", "interaction_date"]}, "function": "is_unique"}, + "name": "is_unique", "criticality": "error", }, { @@ -168,6 +205,7 @@ def test_to_dqx(): "arguments": {"aggr_type": "max", "column": "interaction_value", "limit": 1000}, "function": "is_aggr_not_greater_than", }, + "name": "is_aggr_not_greater_than", "criticality": "error", "filter": "interaction_type = 'purchase'", }, @@ -176,6 +214,7 @@ def test_to_dqx(): "arguments": {"aggr_type": "min", "column": "user_profile.age", "group_by": ["user_id"], "limit": 21}, "function": "is_aggr_not_less_than", }, + "name": "is_aggr_not_less_than", "criticality": "error", }, { @@ -183,6 +222,7 @@ def test_to_dqx(): "arguments": {"aggr_type": "count", "column": "interaction_date", "limit": 24}, "function": "is_aggr_equal", }, + "name": "is_aggr_equal", "criticality": "error", }, { @@ -190,6 +230,7 @@ def test_to_dqx(): "arguments": {"columns": ["user_id"], "ref_columns": ["id"], "ref_df_name": "df_user"}, "function": "foreign_key", }, + "name": "foreign_key", "criticality": "error", }, { @@ -197,6 +238,7 @@ def test_to_dqx(): "arguments": {"columns": ["user_id"], "ref_columns": ["id"], "ref_table": "catalog1.schema1.user"}, "function": "foreign_key", }, + "name": "foreign_key", "criticality": "error", }, ]