diff --git a/providers/amazon/pyproject.toml b/providers/amazon/pyproject.toml index 414cedfeabf47..5b155bd19fc69 100644 --- a/providers/amazon/pyproject.toml +++ b/providers/amazon/pyproject.toml @@ -60,7 +60,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.11.0", - "apache-airflow-providers-common-compat>=1.13.0", + "apache-airflow-providers-common-compat>=1.13.0", # use next version "apache-airflow-providers-common-sql>=1.32.0", "apache-airflow-providers-http", # We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py index ce96a76b3d06f..3d46aecb32e0b 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py @@ -35,6 +35,10 @@ ) from airflow.providers.amazon.aws.utils import validate_execute_complete_event from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.providers.common.compat.openlineage.utils.spark import ( + inject_parent_job_information_into_glue_arguments, + inject_transport_information_into_glue_arguments, +) from airflow.providers.common.compat.sdk import AirflowException, conf if TYPE_CHECKING: @@ -78,6 +82,12 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]): It is recommended to set this parameter to 10 when you are using concurrency=1. For more information see: https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA + :param openlineage_inject_parent_job_info: If True, injects OpenLineage parent job information into the + Glue job's ``--conf`` argument so the Glue Spark job emits a ``parentRunFacet`` linking back to the + Airflow task. 
Defaults to the ``openlineage.spark_inject_parent_job_info`` config value. + :param openlineage_inject_transport_info: If True, injects OpenLineage transport configuration into the + Glue job's ``--conf`` argument so the Glue Spark job sends OL events to the same backend as Airflow. + Defaults to the ``openlineage.spark_inject_transport_info`` config value. :param waiter_delay: Time in seconds to wait between status checks. (default: 60) :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20) :param aws_conn_id: The Airflow connection used for AWS credentials. @@ -140,6 +150,12 @@ def __init__( waiter_delay: int = 60, waiter_max_attempts: int = 75, resume_glue_job_on_retry: bool = False, + openlineage_inject_parent_job_info: bool = conf.getboolean( + "openlineage", "spark_inject_parent_job_info", fallback=False + ), + openlineage_inject_transport_info: bool = conf.getboolean( + "openlineage", "spark_inject_transport_info", fallback=False + ), **kwargs, ): super().__init__(**kwargs) @@ -170,6 +186,8 @@ def __init__( self.waiter_delay = waiter_delay self.waiter_max_attempts = waiter_max_attempts self.resume_glue_job_on_retry = resume_glue_job_on_retry + self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info + self.openlineage_inject_transport_info = openlineage_inject_transport_info @property def _hook_parameters(self): @@ -220,8 +238,10 @@ def _get_task_uuid(self, context: Context) -> str: map_index = -1 return f"{ti.dag_id}:{ti.task_id}:{ti.run_id}:{map_index}" - def _prepare_script_args_with_task_uuid(self, context: Context) -> tuple[dict, str]: - script_args = dict(self.script_args or {}) + def _prepare_script_args_with_task_uuid( + self, context: Context, base_args: dict | None = None + ) -> tuple[dict, str]: + script_args = dict(base_args if base_args is not None else (self.script_args or {})) if self.TASK_UUID_ARG in script_args: task_uuid = str(script_args[self.TASK_UUID_ARG]) else: @@ -254,11 
+274,19 @@ def execute(self, context: Context): :return: the current Glue job ID. """ previous_job_run_id = None - script_args = self.script_args + script_args = dict(self.script_args) task_uuid = None + + if self.openlineage_inject_parent_job_info: + self.log.info("Injecting OpenLineage parent job information into Glue job arguments.") + script_args = inject_parent_job_information_into_glue_arguments(script_args, context) + if self.openlineage_inject_transport_info: + self.log.info("Injecting OpenLineage transport information into Glue job arguments.") + script_args = inject_transport_information_into_glue_arguments(script_args, context) + if self.resume_glue_job_on_retry: ti = context["ti"] - script_args, task_uuid = self._prepare_script_args_with_task_uuid(context) + script_args, task_uuid = self._prepare_script_args_with_task_uuid(context, base_args=script_args) previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id) if previous_job_run_id: try: diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py b/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py index e7d15fbbaa828..230193feeb42b 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_glue.py @@ -587,6 +587,195 @@ def test_resume_glue_job_on_retry_find_job_run_by_task_uuid(self, mock_initializ assert xcom_calls[0][1]["value"] == "existing_run_123" +class TestGlueJobOperatorOpenLineageInjection: + """Tests for OpenLineage parent job info and transport info injection in GlueJobOperator.""" + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments" + ) + def test_inject_parent_job_info_called_when_enabled( + self, mock_inject_parent, mock_initialize_job, mock_get_conn + ): + mock_inject_parent.side_effect = lambda args, ctx: { 
+ **args, + "--conf": "spark.openlineage.parentJobNamespace=ns", + } + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_parent_job_info=True, + ) + context = mock.MagicMock() + glue.execute(context) + + mock_inject_parent.assert_called_once() + call_args = mock_initialize_job.call_args[0][0] + assert "--conf" in call_args + assert "spark.openlineage.parentJobNamespace=ns" in call_args["--conf"] + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments" + ) + def test_inject_parent_job_info_not_called_when_disabled( + self, mock_inject_parent, mock_initialize_job, mock_get_conn + ): + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_parent_job_info=False, + ) + glue.execute(mock.MagicMock()) + + mock_inject_parent.assert_not_called() + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_transport_information_into_glue_arguments" + ) + def test_inject_transport_info_called_when_enabled( + self, mock_inject_transport, mock_initialize_job, mock_get_conn + ): + mock_inject_transport.side_effect = lambda args, ctx: { + **args, + "--conf": "spark.openlineage.transport.type=http", + } + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + 
script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_transport_info=True, + ) + context = mock.MagicMock() + glue.execute(context) + + mock_inject_transport.assert_called_once() + call_args = mock_initialize_job.call_args[0][0] + assert "--conf" in call_args + assert "spark.openlineage.transport.type=http" in call_args["--conf"] + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments" + ) + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_transport_information_into_glue_arguments" + ) + def test_inject_both_parent_and_transport_info( + self, mock_inject_transport, mock_inject_parent, mock_initialize_job, mock_get_conn + ): + mock_inject_parent.side_effect = lambda args, ctx: { + **args, + "--conf": "spark.openlineage.parentJobNamespace=ns", + } + mock_inject_transport.side_effect = lambda args, ctx: { + **args, + "--conf": args.get("--conf", "") + " --conf spark.openlineage.transport.type=http", + } + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_parent_job_info=True, + openlineage_inject_transport_info=True, + ) + glue.execute(mock.MagicMock()) + + mock_inject_parent.assert_called_once() + mock_inject_transport.assert_called_once() + call_args = mock_initialize_job.call_args[0][0] + assert "spark.openlineage.parentJobNamespace=ns" in call_args["--conf"] + assert "spark.openlineage.transport.type=http" in call_args["--conf"] + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + 
"airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments" + ) + def test_inject_parent_job_info_preserves_existing_script_args( + self, mock_inject_parent, mock_initialize_job, mock_get_conn + ): + mock_inject_parent.side_effect = lambda args, ctx: { + **args, + "--conf": "spark.openlineage.parentJobNamespace=ns", + } + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_parent_job_info=True, + script_args={"--my-arg": "my-value"}, + ) + glue.execute(mock.MagicMock()) + + call_args = mock_initialize_job.call_args[0][0] + assert call_args["--my-arg"] == "my-value" + assert "--conf" in call_args + + @mock.patch.object(GlueJobHook, "get_conn") + @mock.patch.object(GlueJobHook, "initialize_job") + @mock.patch( + "airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments" + ) + def test_inject_parent_job_info_with_resume_on_retry( + self, mock_inject_parent, mock_initialize_job, mock_get_conn + ): + """OL injection is applied before task UUID is added; both end up in the args passed to initialize_job.""" + mock_inject_parent.side_effect = lambda args, ctx: { + **args, + "--conf": "spark.openlineage.parentJobNamespace=ns", + } + mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID} + + glue = GlueJobOperator( + task_id=TASK_ID, + job_name=JOB_NAME, + script_location="s3://folder/file", + iam_role_name="my_test_role", + wait_for_completion=False, + openlineage_inject_parent_job_info=True, + resume_glue_job_on_retry=True, + ) + + mock_ti = mock.MagicMock() + mock_ti.xcom_pull.return_value = None # no previous run + context = {"ti": mock_ti} + glue.execute(context) + + mock_inject_parent.assert_called_once() + # The injected OL arg and the task UUID 
arg should both be present + call_args = mock_initialize_job.call_args[0][0] + assert "--conf" in call_args + assert GlueJobOperator.TASK_UUID_ARG in call_args + + class TestGlueDataQualityOperator: RULE_SET_NAME = "TestRuleSet" RULE_SET = 'Rules=[ColumnLength "review_id" = 15]' diff --git a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py index 1028bf3debf2b..d92dad56dad8f 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py +++ b/providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py @@ -24,13 +24,17 @@ if TYPE_CHECKING: from airflow.providers.openlineage.utils.spark import ( + inject_parent_job_information_into_glue_arguments, inject_parent_job_information_into_spark_properties, + inject_transport_information_into_glue_arguments, inject_transport_information_into_spark_properties, ) from airflow.sdk import Context try: from airflow.providers.openlineage.utils.spark import ( + inject_parent_job_information_into_glue_arguments, inject_parent_job_information_into_spark_properties, + inject_transport_information_into_glue_arguments, inject_transport_information_into_spark_properties, ) except ImportError: @@ -49,8 +53,24 @@ def inject_transport_information_into_spark_properties(properties: dict, context ) return properties + def inject_parent_job_information_into_glue_arguments(script_args: dict, context: Context) -> dict: + log.warning( + "Could not import `airflow.providers.openlineage.plugins.macros`. " + "Skipping the injection of OpenLineage parent job information into Glue job arguments." + ) + return script_args + + def inject_transport_information_into_glue_arguments(script_args: dict, context: Context) -> dict: + log.warning( + "Could not import `airflow.providers.openlineage.plugins.listener`. " 
+ "Skipping the injection of OpenLineage transport information into Glue job arguments." + ) + return script_args + __all__ = [ + "inject_parent_job_information_into_glue_arguments", "inject_parent_job_information_into_spark_properties", + "inject_transport_information_into_glue_arguments", "inject_transport_information_into_spark_properties", ] diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml index 5f7870c38f3bb..989d2d2748d5c 100644 --- a/providers/openlineage/pyproject.toml +++ b/providers/openlineage/pyproject.toml @@ -61,7 +61,7 @@ requires-python = ">=3.10" dependencies = [ "apache-airflow>=2.11.0", "apache-airflow-providers-common-sql>=1.32.0", - "apache-airflow-providers-common-compat>=1.14.0", + "apache-airflow-providers-common-compat>=1.14.0", # use next version "attrs>=22.2", "openlineage-integration-common>=1.41.0", "openlineage-python>=1.41.0", diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py index a92ac25eab274..837946fecbefd 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py @@ -195,3 +195,73 @@ def inject_transport_information_into_spark_properties(properties: dict, context return properties return {**properties, **_get_transport_information_as_spark_properties()} + + +def inject_parent_job_information_into_glue_arguments(script_args: dict, context: Context) -> dict: + """ + Inject parent job information into Glue job arguments if not already present. + + Glue jobs pass Spark properties via the ``--conf`` key in the script_args dict. + Multiple Spark conf properties are combined into the ``--conf`` key value with + ``' --conf '`` as separator between each property assignment. + + Args: + script_args: Glue job script arguments dict (maps to boto3 ``Arguments``). 
+ context: The context containing task instance information. + + Returns: + Modified script_args dict with OpenLineage parent job information injected, if applicable. + """ + existing_conf = script_args.get("--conf", "") + + if "spark.openlineage.parent" in existing_conf: + log.info( + "Some OpenLineage properties with parent job information are already present " + "in Glue job arguments. Skipping the injection of OpenLineage " + "parent job information into Glue job arguments." + ) + return script_args + + parent_props = _get_parent_job_information_as_spark_properties(context) + if not parent_props: + return script_args + + new_conf_parts = " --conf ".join(f"{k}={v}" for k, v in parent_props.items()) + + combined_conf = f"{existing_conf} --conf {new_conf_parts}" if existing_conf else new_conf_parts + return {**script_args, "--conf": combined_conf} + + +def inject_transport_information_into_glue_arguments(script_args: dict, context: Context) -> dict: + """ + Inject transport information into Glue job arguments if not already present. + + Glue jobs pass Spark properties via the ``--conf`` key in the script_args dict. + Multiple Spark conf properties are combined into the ``--conf`` key value with + ``' --conf '`` as separator between each property assignment. + + Args: + script_args: Glue job script arguments dict (maps to boto3 ``Arguments``). + context: The context containing task instance information. + + Returns: + Modified script_args dict with OpenLineage transport information injected, if applicable. + """ + existing_conf = script_args.get("--conf", "") + + if "spark.openlineage.transport" in existing_conf: + log.info( + "Some OpenLineage properties with transport information are already present " + "in Glue job arguments. Skipping the injection of OpenLineage " + "transport information into Glue job arguments." 
+ ) + return script_args + + transport_props = _get_transport_information_as_spark_properties() + if not transport_props: + return script_args + + new_conf_parts = " --conf ".join(f"{k}={v}" for k, v in transport_props.items()) + + combined_conf = f"{existing_conf} --conf {new_conf_parts}" if existing_conf else new_conf_parts + return {**script_args, "--conf": combined_conf} diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py index 3dadf68482d13..be835d6a429bc 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py @@ -31,7 +31,9 @@ _get_transport_information_as_spark_properties, _is_parent_job_information_present_in_spark_properties, _is_transport_information_present_in_spark_properties, + inject_parent_job_information_into_glue_arguments, inject_parent_job_information_into_spark_properties, + inject_transport_information_into_glue_arguments, inject_transport_information_into_spark_properties, ) @@ -364,3 +366,99 @@ def test_inject_composite_transport_information_into_spark_properties( result = inject_transport_information_into_spark_properties(properties, EXAMPLE_CONTEXT) expected = {**properties, **EXAMPLE_COMPOSITE_TRANSPORT_SPARK_PROPERTIES} if should_inject else properties assert result == expected + + +# --------------------------------------------------------------------------- +# Glue argument injection tests +# --------------------------------------------------------------------------- + + +@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties") +def test_inject_parent_job_information_into_glue_arguments_empty_args(mock_get_parent): + """With no existing --conf, parent props are joined into a new --conf value.""" + mock_get_parent.return_value = { + "spark.openlineage.parentJobNamespace": "ns", + 
"spark.openlineage.parentJobName": "dag.task", + } + result = inject_parent_job_information_into_glue_arguments({}, EXAMPLE_CONTEXT) + assert "--conf" in result + conf = result["--conf"] + assert "spark.openlineage.parentJobNamespace=ns" in conf + assert "spark.openlineage.parentJobName=dag.task" in conf + # Multiple props joined with ' --conf ' + assert " --conf " in conf + + +@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties") +def test_inject_parent_job_information_into_glue_arguments_appends_to_existing_conf(mock_get_parent): + """Existing --conf value is preserved and OL props are appended.""" + mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": "ns"} + script_args = {"--conf": "spark.some.existing=val", "--other": "arg"} + result = inject_parent_job_information_into_glue_arguments(script_args, EXAMPLE_CONTEXT) + assert result["--other"] == "arg" + conf = result["--conf"] + assert conf.startswith("spark.some.existing=val") + assert "spark.openlineage.parentJobNamespace=ns" in conf + + +@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties") +def test_inject_parent_job_information_into_glue_arguments_skips_if_already_present(mock_get_parent): + """Injection is skipped when parent job info is already in --conf.""" + mock_get_parent.return_value = {"spark.openlineage.parentJobNamespace": "ns"} + existing = "spark.openlineage.parentJobNamespace=already_there" + script_args = {"--conf": existing} + result = inject_parent_job_information_into_glue_arguments(script_args, EXAMPLE_CONTEXT) + assert result["--conf"] == existing + mock_get_parent.assert_not_called() + + +@patch("airflow.providers.openlineage.utils.spark._get_parent_job_information_as_spark_properties") +def test_inject_parent_job_information_into_glue_arguments_does_not_mutate_input(mock_get_parent): + """The original script_args dict is not mutated.""" + mock_get_parent.return_value = 
{"spark.openlineage.parentJobNamespace": "ns"} + original = {"--my-arg": "val"} + original_copy = dict(original) + inject_parent_job_information_into_glue_arguments(original, EXAMPLE_CONTEXT) + assert original == original_copy + + +@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener") +def test_inject_transport_information_into_glue_arguments_empty_args(mock_ol_listener): + """With no existing --conf, transport props are joined into a new --conf value.""" + fake_listener = mock.MagicMock() + mock_ol_listener.return_value = fake_listener + fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport( + HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG) + ) + result = inject_transport_information_into_glue_arguments({}, EXAMPLE_CONTEXT) + assert "--conf" in result + conf = result["--conf"] + assert "spark.openlineage.transport.type=http" in conf + assert "spark.openlineage.transport.url=https://some-custom.url" in conf + + +@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener") +def test_inject_transport_information_into_glue_arguments_appends_to_existing_conf(mock_ol_listener): + """Existing --conf value is preserved and transport props are appended.""" + fake_listener = mock.MagicMock() + mock_ol_listener.return_value = fake_listener + fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport( + HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG) + ) + script_args = {"--conf": "spark.some.existing=val"} + result = inject_transport_information_into_glue_arguments(script_args, EXAMPLE_CONTEXT) + conf = result["--conf"] + assert conf.startswith("spark.some.existing=val") + assert "spark.openlineage.transport.type=http" in conf + + +@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener") +def test_inject_transport_information_into_glue_arguments_skips_if_already_present(mock_ol_listener): + """Injection is skipped when transport info 
is already in --conf.""" + fake_listener = mock.MagicMock() + mock_ol_listener.return_value = fake_listener + existing = "spark.openlineage.transport.type=http" + script_args = {"--conf": existing} + result = inject_transport_information_into_glue_arguments(script_args, EXAMPLE_CONTEXT) + assert result["--conf"] == existing + fake_listener.adapter.get_or_create_openlineage_client.assert_not_called()