Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.13.0",
"apache-airflow-providers-common-compat>=1.13.0", # use next version
"apache-airflow-providers-common-sql>=1.32.0",
"apache-airflow-providers-http",
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
)
from airflow.providers.amazon.aws.utils import validate_execute_complete_event
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.providers.common.compat.openlineage.utils.spark import (
inject_parent_job_information_into_glue_arguments,
inject_transport_information_into_glue_arguments,
)
from airflow.providers.common.compat.sdk import AirflowException, conf

if TYPE_CHECKING:
Expand Down Expand Up @@ -78,6 +82,12 @@ class GlueJobOperator(AwsBaseOperator[GlueJobHook]):
It is recommended to set this parameter to 10 when you are using concurrency=1.
For more information see:
https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA
:param openlineage_inject_parent_job_info: If True, injects OpenLineage parent job information into the
Glue job's ``--conf`` argument so the Glue Spark job emits a ``parentRunFacet`` linking back to the
Airflow task. Defaults to the ``openlineage.spark_inject_parent_job_info`` config value.
:param openlineage_inject_transport_info: If True, injects OpenLineage transport configuration into the
Glue job's ``--conf`` argument so the Glue Spark job sends OL events to the same backend as Airflow.
Defaults to the ``openlineage.spark_inject_transport_info`` config value.
:param waiter_delay: Time in seconds to wait between status checks. (default: 60)
:param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20)
:param aws_conn_id: The Airflow connection used for AWS credentials.
Expand Down Expand Up @@ -140,6 +150,12 @@ def __init__(
waiter_delay: int = 60,
waiter_max_attempts: int = 75,
resume_glue_job_on_retry: bool = False,
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
**kwargs,
):
super().__init__(**kwargs)
Expand Down Expand Up @@ -170,6 +186,8 @@ def __init__(
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.resume_glue_job_on_retry = resume_glue_job_on_retry
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self.openlineage_inject_transport_info = openlineage_inject_transport_info

@property
def _hook_parameters(self):
Expand Down Expand Up @@ -220,8 +238,10 @@ def _get_task_uuid(self, context: Context) -> str:
map_index = -1
return f"{ti.dag_id}:{ti.task_id}:{ti.run_id}:{map_index}"

def _prepare_script_args_with_task_uuid(self, context: Context) -> tuple[dict, str]:
script_args = dict(self.script_args or {})
def _prepare_script_args_with_task_uuid(
self, context: Context, base_args: dict | None = None
) -> tuple[dict, str]:
script_args = dict(base_args if base_args is not None else (self.script_args or {}))
if self.TASK_UUID_ARG in script_args:
task_uuid = str(script_args[self.TASK_UUID_ARG])
else:
Expand Down Expand Up @@ -254,11 +274,19 @@ def execute(self, context: Context):
:return: the current Glue job ID.
"""
previous_job_run_id = None
script_args = self.script_args
script_args = dict(self.script_args)
task_uuid = None

if self.openlineage_inject_parent_job_info:
self.log.info("Injecting OpenLineage parent job information into Glue job arguments.")
script_args = inject_parent_job_information_into_glue_arguments(script_args, context)
if self.openlineage_inject_transport_info:
self.log.info("Injecting OpenLineage transport information into Glue job arguments.")
script_args = inject_transport_information_into_glue_arguments(script_args, context)

if self.resume_glue_job_on_retry:
ti = context["ti"]
script_args, task_uuid = self._prepare_script_args_with_task_uuid(context)
script_args, task_uuid = self._prepare_script_args_with_task_uuid(context, base_args=script_args)
previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
if previous_job_run_id:
try:
Expand Down
189 changes: 189 additions & 0 deletions providers/amazon/tests/unit/amazon/aws/operators/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,195 @@ def test_resume_glue_job_on_retry_find_job_run_by_task_uuid(self, mock_initializ
assert xcom_calls[0][1]["value"] == "existing_run_123"


class TestGlueJobOperatorOpenLineageInjection:
    """Tests for OpenLineage parent job info and transport info injection in GlueJobOperator."""

    # Patch targets: the injection helpers as imported into the glue operator module.
    PARENT_PATCH = "airflow.providers.amazon.aws.operators.glue.inject_parent_job_information_into_glue_arguments"
    TRANSPORT_PATCH = (
        "airflow.providers.amazon.aws.operators.glue.inject_transport_information_into_glue_arguments"
    )

    @staticmethod
    def _running_job_run():
        """Return a fresh initialize_job result describing a RUNNING job run."""
        return {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID}

    @staticmethod
    def _operator(**overrides):
        """Build a GlueJobOperator with the common kwargs shared by every test in this class."""
        params = dict(
            task_id=TASK_ID,
            job_name=JOB_NAME,
            script_location="s3://folder/file",
            iam_role_name="my_test_role",
            wait_for_completion=False,
        )
        params.update(overrides)
        return GlueJobOperator(**params)

    @staticmethod
    def _add_parent_conf(args, ctx):
        """Fake parent-info injection: set a recognizable --conf value."""
        updated = dict(args)
        updated["--conf"] = "spark.openlineage.parentJobNamespace=ns"
        return updated

    @staticmethod
    def _add_transport_conf(args, ctx):
        """Fake transport-info injection: set a recognizable --conf value."""
        updated = dict(args)
        updated["--conf"] = "spark.openlineage.transport.type=http"
        return updated

    def test_inject_parent_job_info_called_when_enabled(self):
        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.PARENT_PATCH, side_effect=self._add_parent_conf) as mock_inject_parent,
        ):
            mock_initialize_job.return_value = self._running_job_run()

            op = self._operator(openlineage_inject_parent_job_info=True)
            op.execute(mock.MagicMock())

            mock_inject_parent.assert_called_once()
            passed_args = mock_initialize_job.call_args[0][0]
            assert "--conf" in passed_args
            assert "spark.openlineage.parentJobNamespace=ns" in passed_args["--conf"]

    def test_inject_parent_job_info_not_called_when_disabled(self):
        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.PARENT_PATCH) as mock_inject_parent,
        ):
            mock_initialize_job.return_value = self._running_job_run()

            self._operator(openlineage_inject_parent_job_info=False).execute(mock.MagicMock())

            mock_inject_parent.assert_not_called()

    def test_inject_transport_info_called_when_enabled(self):
        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.TRANSPORT_PATCH, side_effect=self._add_transport_conf) as mock_inject_transport,
        ):
            mock_initialize_job.return_value = self._running_job_run()

            op = self._operator(openlineage_inject_transport_info=True)
            op.execute(mock.MagicMock())

            mock_inject_transport.assert_called_once()
            passed_args = mock_initialize_job.call_args[0][0]
            assert "--conf" in passed_args
            assert "spark.openlineage.transport.type=http" in passed_args["--conf"]

    def test_inject_both_parent_and_transport_info(self):
        def append_transport_conf(args, ctx):
            # Mimic real behavior of extending an existing --conf value rather than replacing it.
            existing = args.get("--conf", "")
            return {**args, "--conf": existing + " --conf spark.openlineage.transport.type=http"}

        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.PARENT_PATCH, side_effect=self._add_parent_conf) as mock_inject_parent,
            mock.patch(self.TRANSPORT_PATCH, side_effect=append_transport_conf) as mock_inject_transport,
        ):
            mock_initialize_job.return_value = self._running_job_run()

            op = self._operator(
                openlineage_inject_parent_job_info=True,
                openlineage_inject_transport_info=True,
            )
            op.execute(mock.MagicMock())

            mock_inject_parent.assert_called_once()
            mock_inject_transport.assert_called_once()
            passed_args = mock_initialize_job.call_args[0][0]
            assert "spark.openlineage.parentJobNamespace=ns" in passed_args["--conf"]
            assert "spark.openlineage.transport.type=http" in passed_args["--conf"]

    def test_inject_parent_job_info_preserves_existing_script_args(self):
        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.PARENT_PATCH, side_effect=self._add_parent_conf),
        ):
            mock_initialize_job.return_value = self._running_job_run()

            op = self._operator(
                openlineage_inject_parent_job_info=True,
                script_args={"--my-arg": "my-value"},
            )
            op.execute(mock.MagicMock())

            passed_args = mock_initialize_job.call_args[0][0]
            assert passed_args["--my-arg"] == "my-value"
            assert "--conf" in passed_args

    def test_inject_parent_job_info_with_resume_on_retry(self):
        """OL injection is applied before task UUID is added; both end up in the args passed to initialize_job."""
        with (
            mock.patch.object(GlueJobHook, "get_conn"),
            mock.patch.object(GlueJobHook, "initialize_job") as mock_initialize_job,
            mock.patch(self.PARENT_PATCH, side_effect=self._add_parent_conf) as mock_inject_parent,
        ):
            mock_initialize_job.return_value = self._running_job_run()

            op = self._operator(
                openlineage_inject_parent_job_info=True,
                resume_glue_job_on_retry=True,
            )

            mock_ti = mock.MagicMock()
            mock_ti.xcom_pull.return_value = None  # no previous run
            op.execute({"ti": mock_ti})

            mock_inject_parent.assert_called_once()
            # The injected OL arg and the task UUID arg should both be present
            passed_args = mock_initialize_job.call_args[0][0]
            assert "--conf" in passed_args
            assert GlueJobOperator.TASK_UUID_ARG in passed_args


class TestGlueDataQualityOperator:
RULE_SET_NAME = "TestRuleSet"
RULE_SET = 'Rules=[ColumnLength "review_id" = 15]'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@

if TYPE_CHECKING:
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_glue_arguments,
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_glue_arguments,
inject_transport_information_into_spark_properties,
)
from airflow.sdk import Context
try:
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_glue_arguments,
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_glue_arguments,
inject_transport_information_into_spark_properties,
)
except ImportError:
Expand All @@ -49,8 +53,24 @@ def inject_transport_information_into_spark_properties(properties: dict, context
)
return properties

def inject_parent_job_information_into_glue_arguments(script_args: dict, context: Context) -> dict:
    """No-op fallback used when the OpenLineage provider is unavailable.

    Logs a warning and returns ``script_args`` unchanged, so callers can
    invoke the injection unconditionally.

    :param script_args: Glue job script arguments (returned as-is).
    :param context: Airflow task context (unused by the fallback).
    :return: The unmodified ``script_args``.
    """
    # Fix: the two concatenated literals previously rendered without a
    # separating space ("…macros`.Skipping…").
    log.warning(
        "Could not import `airflow.providers.openlineage.plugins.macros`. "
        "Skipping the injection of OpenLineage parent job information into Glue job arguments."
    )
    return script_args

def inject_transport_information_into_glue_arguments(script_args: dict, context: Context) -> dict:
    """No-op fallback used when the OpenLineage provider is unavailable.

    Logs a warning and returns ``script_args`` unchanged, so callers can
    invoke the injection unconditionally.

    :param script_args: Glue job script arguments (returned as-is).
    :param context: Airflow task context (unused by the fallback).
    :return: The unmodified ``script_args``.
    """
    # Fix: the two concatenated literals previously rendered without a
    # separating space ("…listener`.Skipping…").
    log.warning(
        "Could not import `airflow.providers.openlineage.plugins.listener`. "
        "Skipping the injection of OpenLineage transport information into Glue job arguments."
    )
    return script_args


# Public API of this compat shim: either the real implementations imported from
# the OpenLineage provider, or the no-op fallbacks defined above.
__all__ = [
    "inject_parent_job_information_into_glue_arguments",
    "inject_parent_job_information_into_spark_properties",
    "inject_transport_information_into_glue_arguments",
    "inject_transport_information_into_spark_properties",
]
2 changes: 1 addition & 1 deletion providers/openlineage/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-sql>=1.32.0",
"apache-airflow-providers-common-compat>=1.14.0",
"apache-airflow-providers-common-compat>=1.14.0", # use next version
"attrs>=22.2",
"openlineage-integration-common>=1.41.0",
"openlineage-python>=1.41.0",
Expand Down
Loading
Loading