From a600fa43f36a844e7511ded8e55ef9b1f865f072 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Wed, 1 Apr 2026 18:02:40 +0530 Subject: [PATCH 01/14] Add tests for SalesforceBulkOperator transient-error retry --- .../salesforce/operators/test_bulk_retry.py | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py new file mode 100644 index 0000000000000..df6e8128cd12b --- /dev/null +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -0,0 +1,187 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator + + +def _make_op(**kwargs): + defaults = dict( + task_id="test_task", + operation="insert", + object_name="Contact", + payload=[{"FirstName": "Ada"}, {"FirstName": "Grace"}], + ) + defaults.update(kwargs) + return SalesforceBulkOperator(**defaults) + + +def _transient_failure(status_code="UNABLE_TO_LOCK_ROW"): + return {"success": False, "errors": [{"statusCode": status_code, "message": "locked", "fields": []}]} + + +def _permanent_failure(): + return {"success": False, "errors": [{"statusCode": "REQUIRED_FIELD_MISSING", "message": "missing", "fields": ["Name"]}]} + + +def _success(): + return {"success": True, "errors": []} + + +class TestSalesforceBulkOperatorRetry: + def test_no_retry_when_max_retries_zero(self): + op = _make_op(max_retries=0) + assert op.max_retries == 0 + + bulk_mock = mock.MagicMock() + bulk_mock.__getattr__("Contact").insert.return_value = [_success(), _success()] + + with mock.patch("airflow.providers.salesforce.operators.bulk.SalesforceHook") as hook_cls: + hook_cls.return_value.get_conn.return_value.__getattr__.return_value = bulk_mock + result = op.execute(context={}) + + assert result == [_success(), _success()] + assert bulk_mock.__getattr__("Contact").insert.call_count == 1 + + def test_transient_failure_is_retried(self): + op = _make_op(max_retries=2, retry_delay=0) + + first_result = [_transient_failure(), _success()] + second_result = [_success()] + + run_mock = mock.MagicMock(side_effect=[first_result, second_result]) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep"): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}, {"FirstName": "Grace"}], + result=first_result, + ) + + assert final[0] == _success() + assert final[1] == _success() + assert run_mock.call_count == 2 + _, retry_call = run_mock.call_args_list + assert retry_call == mock.call(mock.ANY, [{"FirstName": "Ada"}]) + + def test_permanent_failure_is_not_retried(self): + op = _make_op(max_retries=3, retry_delay=0) + result = [_permanent_failure(), _success()] + + run_mock = mock.MagicMock() + + with mock.patch.object(op, "_run_operation", run_mock): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}, {"FirstName": "Grace"}], + result=result, + ) + + run_mock.assert_not_called() + assert final[0] == _permanent_failure() + + def test_retries_stop_after_max_retries(self): + op = _make_op(max_retries=2, retry_delay=0) + + always_transient = [_transient_failure()] + run_mock = mock.MagicMock(return_value=always_transient) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep"): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}], + result=always_transient, + ) + + assert run_mock.call_count == 2 + assert final[0]["success"] is False + + def test_retry_delay_is_respected(self): + op = _make_op(max_retries=1, retry_delay=30.0) + + run_mock = mock.MagicMock(return_value=[_success()]) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep") as sleep_mock: + op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}], + result=[_transient_failure()], + ) + + sleep_mock.assert_called_once_with(30.0) + + def test_custom_transient_error_codes(self): + op = _make_op(max_retries=1, retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) + assert op.transient_error_codes == frozenset({"MY_CUSTOM_ERROR"}) + + custom_failure = {"success": False, "errors": [{"statusCode": "MY_CUSTOM_ERROR", "message": "custom"}]} + run_mock = mock.MagicMock(return_value=[_success()]) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep"): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}], + result=[custom_failure], + ) + + run_mock.assert_called_once() + assert final[0] == _success() + + def test_api_temporarily_unavailable_is_retried(self): + op = _make_op(max_retries=1, retry_delay=0) + failure = _transient_failure("API_TEMPORARILY_UNAVAILABLE") + run_mock = mock.MagicMock(return_value=[_success()]) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep"): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=[{"FirstName": "Ada"}], + result=[failure], + ) + + run_mock.assert_called_once() + assert final[0] == _success() + + def test_mixed_failures_only_retries_transient(self): + op = _make_op(max_retries=1, retry_delay=0) + payload = [{"FirstName": "A"}, {"FirstName": "B"}, {"FirstName": "C"}] + initial = [_transient_failure(), _permanent_failure(), _success()] + + run_mock = mock.MagicMock(return_value=[_success()]) + + with mock.patch.object(op, "_run_operation", run_mock): + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep"): + final = op._retry_transient_failures( + bulk=mock.MagicMock(), + payload=payload, + result=initial, + ) + + _, retry_call = run_mock.call_args_list[0], run_mock.call_args_list[-1] + run_mock.assert_called_once_with(mock.ANY, [{"FirstName": "A"}]) + assert final[0] == _success() + assert final[1] == _permanent_failure() + assert final[2] == _success() From cd741535454713f0948558c65fd46d3ecd06c87b Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Wed, 1 Apr 2026 18:02:55 +0530 Subject: [PATCH 02/14] Add transient-error retry to SalesforceBulkOperator Introduces max_retries, retry_delay, and transient_error_codes params. When max_retries > 0, records that fail with a transient Salesforce error (UNABLE_TO_LOCK_ROW or API_TEMPORARILY_UNAVAILABLE by default) are re-submitted after retry_delay seconds, up to max_retries times. Only the failed records are re-submitted, not the entire payload. Related to #64519 --- .../providers/salesforce/operators/bulk.py | 109 ++++++++++++++---- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 7b5d21030db02..c08b03b54032b 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import time from collections.abc import Iterable, Sequence from typing import TYPE_CHECKING, cast @@ -29,6 +30,13 @@ from airflow.providers.common.compat.sdk import Context +# Salesforce error statusCode values that indicate a transient server-side +# condition rather than a permanent data problem. Records that fail with one of +# these codes can reasonably be re-submitted after a short delay. +_DEFAULT_TRANSIENT_ERROR_CODES: frozenset[str] = frozenset( + {"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"} +) + class SalesforceBulkOperator(BaseOperator): """ @@ -46,6 +54,14 @@ class SalesforceBulkOperator(BaseOperator): :param batch_size: number of records to assign for each batch in the job :param use_serial: Process batches in serial mode :param salesforce_conn_id: The :ref:`Salesforce Connection id `. + :param max_retries: Number of times to re-submit records that failed with a + transient error code such as ``UNABLE_TO_LOCK_ROW`` or + ``API_TEMPORARILY_UNAVAILABLE``. Set to ``0`` (the default) to disable + automatic retries. + :param retry_delay: Seconds to wait before each retry attempt. Defaults to ``5``. + :param transient_error_codes: Collection of Salesforce error ``statusCode`` + values that should trigger a retry. Defaults to + ``{"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"}``. """ template_fields: Sequence[str] = ("object_name", "payload", "external_id_field") @@ -62,6 +78,9 @@ def __init__( batch_size: int = 10000, use_serial: bool = False, salesforce_conn_id: str = "salesforce_default", + max_retries: int = 0, + retry_delay: float = 5.0, + transient_error_codes: Iterable[str] = _DEFAULT_TRANSIENT_ERROR_CODES, **kwargs, ) -> None: super().__init__(**kwargs) @@ -72,6 +91,9 @@ def __init__( self.batch_size = batch_size self.use_serial = use_serial self.salesforce_conn_id = salesforce_conn_id + self.max_retries = max_retries + self.retry_delay = retry_delay + self.transient_error_codes = frozenset(transient_error_codes) self._validate_inputs() def _validate_inputs(self) -> None: @@ -84,6 +106,65 @@ def _validate_inputs(self) -> None: f"Available operations are {self.available_operations}." ) + def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: + """Submit *payload* through the configured Bulk API operation and return the result list.""" + obj = bulk.__getattr__(self.object_name) + if self.operation == "upsert": + return list( + obj.upsert( + data=payload, + external_id_field=self.external_id_field, + batch_size=self.batch_size, + use_serial=self.use_serial, + ) + ) + return list( + getattr(obj, self.operation)( + data=payload, + batch_size=self.batch_size, + use_serial=self.use_serial, + ) + ) + + def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: list) -> list: + """Re-submit records that failed with a transient error, up to *max_retries* times. + + Salesforce Bulk API results are ordered identically to the input payload, so + failed records are located by index and their retry results are written back + into the same positions. + """ + final = list(result) + + for attempt in range(1, self.max_retries + 1): + retry_indices = [ + i + for i, r in enumerate(final) + if not r.get("success") + and {e.get("statusCode") for e in r.get("errors", [])} & self.transient_error_codes + ] + + if not retry_indices: + break + + self.log.warning( + "Salesforce Bulk API %s on %s: retrying %d record(s) with transient errors " + "(attempt %d/%d, waiting %.1f second(s)).", + self.operation, + self.object_name, + len(retry_indices), + attempt, + self.max_retries, + self.retry_delay, + ) + time.sleep(self.retry_delay) + + retry_result = self._run_operation(bulk, [payload[i] for i in retry_indices]) + + for list_pos, original_idx in enumerate(retry_indices): + final[original_idx] = retry_result[list_pos] + + return final + def execute(self, context: Context): """ Make an HTTP request to Salesforce Bulk API. @@ -95,30 +176,10 @@ def execute(self, context: Context): conn = sf_hook.get_conn() bulk: SFBulkHandler = cast("SFBulkHandler", conn.__getattr__("bulk")) - result: Iterable = [] - if self.operation == "insert": - result = bulk.__getattr__(self.object_name).insert( - data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial - ) - elif self.operation == "update": - result = bulk.__getattr__(self.object_name).update( - data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial - ) - elif self.operation == "upsert": - result = bulk.__getattr__(self.object_name).upsert( - data=self.payload, - external_id_field=self.external_id_field, - batch_size=self.batch_size, - use_serial=self.use_serial, - ) - elif self.operation == "delete": - result = bulk.__getattr__(self.object_name).delete( - data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial - ) - elif self.operation == "hard_delete": - result = bulk.__getattr__(self.object_name).hard_delete( - data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial - ) + result = self._run_operation(bulk, self.payload) + + if self.max_retries > 0: + result = self._retry_transient_failures(bulk, self.payload, result) if self.do_xcom_push and result: return result From 3250d1a38bad2b282767a30379754bf80be1a071 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:23:00 +0530 Subject: [PATCH 03/14] Fix lint: remove unused pytest import and dead variable assignments --- .../tests/unit/salesforce/operators/test_bulk_retry.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py index df6e8128cd12b..ae6f95dceb5c5 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -18,8 +18,6 @@ from unittest import mock -import pytest - from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator @@ -80,7 +78,7 @@ def test_transient_failure_is_retried(self): assert final[0] == _success() assert final[1] == _success() assert run_mock.call_count == 2 - _, retry_call = run_mock.call_args_list + retry_call = run_mock.call_args_list[1] assert retry_call == mock.call(mock.ANY, [{"FirstName": "Ada"}]) def test_permanent_failure_is_not_retried(self): @@ -180,7 +178,6 @@ def test_mixed_failures_only_retries_transient(self): result=initial, ) - _, retry_call = run_mock.call_args_list[0], run_mock.call_args_list[-1] run_mock.assert_called_once_with(mock.ANY, [{"FirstName": "A"}]) assert final[0] == _success() assert final[1] == _permanent_failure() From cd7b6b05dbacdbae9ca8565234652db2514cf966 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:44:57 +0530 Subject: [PATCH 04/14] Add input validation for max_retries, retry_delay, and transient_error_codes --- .../airflow/providers/salesforce/operators/bulk.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index c08b03b54032b..7882c20b19613 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -93,11 +93,22 @@ def __init__( self.salesforce_conn_id = salesforce_conn_id self.max_retries = max_retries self.retry_delay = retry_delay + if isinstance(transient_error_codes, str): + raise ValueError( + "'transient_error_codes' must be a non-string iterable of strings, " + f"got {transient_error_codes!r}. Wrap it in a list: [{transient_error_codes!r}]" + ) self.transient_error_codes = frozenset(transient_error_codes) self._validate_inputs() def _validate_inputs(self) -> None: - if not self.object_name: + if self.max_retries < 0: + raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") + + if self.retry_delay < 0: + raise ValueError(f"'retry_delay' must be a non-negative number, got {self.retry_delay!r}.") + + if not self.object_name: raise ValueError("The required parameter 'object_name' cannot have an empty value.") if self.operation not in self.available_operations: From 9ba6b413ceba2753cfe4fd79d13dc251c29846b3 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:52:43 +0530 Subject: [PATCH 05/14] Fix IndentationError in _validate_inputs: use consistent 8-space indent --- .../airflow/providers/salesforce/operators/bulk.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 7882c20b19613..1be8e0aab2fd3 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -102,13 +102,13 @@ def __init__( self._validate_inputs() def _validate_inputs(self) -> None: - if self.max_retries < 0: - raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") + if self.max_retries < 0: + raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") - if self.retry_delay < 0: - raise ValueError(f"'retry_delay' must be a non-negative number, got {self.retry_delay!r}.") + if self.retry_delay < 0: + raise ValueError(f"'retry_delay' must be a non-negative number, got {self.retry_delay!r}.") - if not self.object_name: + if not self.object_name: raise ValueError("The required parameter 'object_name' cannot have an empty value.") if self.operation not in self.available_operations: @@ -116,7 +116,6 @@ def _validate_inputs(self) -> None: f"Operation {self.operation!r} not found! " f"Available operations are {self.available_operations}." ) - def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: """Submit *payload* through the configured Bulk API operation and return the result list.""" obj = bulk.__getattr__(self.object_name) From 419c9164f3fc4e6099d5c38f4628f76dd9e80420 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 12:23:58 +0530 Subject: [PATCH 06/14] =?UTF-8?q?Rename=20retry=5Fdelay=20=E2=86=92=20bulk?= =?UTF-8?q?=5Fretry=5Fdelay=20to=20avoid=20collision=20with=20BaseOperator?= =?UTF-8?q?.retry=5Fdelay=20(timedelta)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../airflow/providers/salesforce/operators/bulk.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 1be8e0aab2fd3..de1ff96a7208b 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -58,7 +58,7 @@ class SalesforceBulkOperator(BaseOperator): transient error code such as ``UNABLE_TO_LOCK_ROW`` or ``API_TEMPORARILY_UNAVAILABLE``. Set to ``0`` (the default) to disable automatic retries. - :param retry_delay: Seconds to wait before each retry attempt. Defaults to ``5``. + :param bulk_retry_delay: Seconds to wait before each retry attempt within the Bulk API retry loop. Defaults to ``5``. :param transient_error_codes: Collection of Salesforce error ``statusCode`` values that should trigger a retry. Defaults to ``{"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"}``. @@ -79,7 +79,7 @@ def __init__( use_serial: bool = False, salesforce_conn_id: str = "salesforce_default", max_retries: int = 0, - retry_delay: float = 5.0, + bulk_retry_delay: float = 5.0, transient_error_codes: Iterable[str] = _DEFAULT_TRANSIENT_ERROR_CODES, **kwargs, ) -> None: @@ -92,7 +92,7 @@ def __init__( self.use_serial = use_serial self.salesforce_conn_id = salesforce_conn_id self.max_retries = max_retries - self.retry_delay = retry_delay + self.bulk_retry_delay = bulk_retry_delay if isinstance(transient_error_codes, str): raise ValueError( "'transient_error_codes' must be a non-string iterable of strings, " @@ -105,8 +105,8 @@ def _validate_inputs(self) -> None: if self.max_retries < 0: raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") - if self.retry_delay < 0: - raise ValueError(f"'retry_delay' must be a non-negative number, got {self.retry_delay!r}.") + if self.bulk_retry_delay < 0: + raise ValueError(f"'bulk_retry_delay' must be a non-negative number, got {self.bulk_retry_delay!r}.") if not self.object_name: raise ValueError("The required parameter 'object_name' cannot have an empty value.") @@ -164,9 +164,9 @@ def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: len(retry_indices), attempt, self.max_retries, - self.retry_delay, + self.bulk_retry_delay, ) - time.sleep(self.retry_delay) + time.sleep(self.bulk_retry_delay) retry_result = self._run_operation(bulk, [payload[i] for i in retry_indices]) From 774c3e064d586ca3adfb95c84d67f419ddb3f7b6 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 12:23:59 +0530 Subject: [PATCH 07/14] =?UTF-8?q?Update=20tests:=20retry=5Fdelay=20?= =?UTF-8?q?=E2=86=92=20bulk=5Fretry=5Fdelay?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../unit/salesforce/operators/test_bulk_retry.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py index ae6f95dceb5c5..1b92cbd92002f 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -60,7 +60,7 @@ def test_no_retry_when_max_retries_zero(self): assert bulk_mock.__getattr__("Contact").insert.call_count == 1 def test_transient_failure_is_retried(self): - op = _make_op(max_retries=2, retry_delay=0) + op = _make_op(max_retries=2, bulk_retry_delay=0) first_result = [_transient_failure(), _success()] second_result = [_success()] @@ -82,7 +82,7 @@ def test_transient_failure_is_retried(self): assert retry_call == mock.call(mock.ANY, [{"FirstName": "Ada"}]) def test_permanent_failure_is_not_retried(self): - op = _make_op(max_retries=3, retry_delay=0) + op = _make_op(max_retries=3, bulk_retry_delay=0) result = [_permanent_failure(), _success()] run_mock = mock.MagicMock() @@ -98,7 +98,7 @@ def test_permanent_failure_is_not_retried(self): assert final[0] == _permanent_failure() def test_retries_stop_after_max_retries(self): - op = _make_op(max_retries=2, retry_delay=0) + op = _make_op(max_retries=2, bulk_retry_delay=0) always_transient = [_transient_failure()] run_mock = mock.MagicMock(return_value=always_transient) @@ -115,7 +115,7 @@ def test_retries_stop_after_max_retries(self): assert final[0]["success"] is False def test_retry_delay_is_respected(self): - op = _make_op(max_retries=1, retry_delay=30.0) + op = _make_op(max_retries=1, bulk_retry_delay=30.0) run_mock = mock.MagicMock(return_value=[_success()]) @@ -130,7 +130,7 @@ def test_retry_delay_is_respected(self): sleep_mock.assert_called_once_with(30.0) def test_custom_transient_error_codes(self): - op = _make_op(max_retries=1, retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) + op = _make_op(max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) assert op.transient_error_codes == frozenset({"MY_CUSTOM_ERROR"}) custom_failure = {"success": False, "errors": [{"statusCode": "MY_CUSTOM_ERROR", "message": "custom"}]} @@ -148,7 +148,7 @@ def test_custom_transient_error_codes(self): assert final[0] == _success() def test_api_temporarily_unavailable_is_retried(self): - op = _make_op(max_retries=1, retry_delay=0) + op = _make_op(max_retries=1, bulk_retry_delay=0) failure = _transient_failure("API_TEMPORARILY_UNAVAILABLE") run_mock = mock.MagicMock(return_value=[_success()]) @@ -164,7 +164,7 @@ def test_api_temporarily_unavailable_is_retried(self): assert final[0] == _success() def test_mixed_failures_only_retries_transient(self): - op = _make_op(max_retries=1, retry_delay=0) + op = _make_op(max_retries=1, bulk_retry_delay=0) payload = [{"FirstName": "A"}, {"FirstName": "B"}, {"FirstName": "C"}] initial = [_transient_failure(), _permanent_failure(), _success()] From 38c9dccf652285782575bf28939d4bd99a527158 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:13:28 +0530 Subject: [PATCH 08/14] Fix: correct mock chain for hook conn.bulk; ruff format long dicts --- .../unit/salesforce/operators/test_bulk_retry.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py index 1b92cbd92002f..1960894a1ed79 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -37,7 +37,10 @@ def _transient_failure(status_code="UNABLE_TO_LOCK_ROW"): def _permanent_failure(): - return {"success": False, "errors": [{"statusCode": "REQUIRED_FIELD_MISSING", "message": "missing", "fields": ["Name"]}]} + return { + "success": False, + "errors": [{"statusCode": "REQUIRED_FIELD_MISSING", "message": "missing", "fields": ["Name"]}], + } def _success(): @@ -53,7 +56,7 @@ def test_no_retry_when_max_retries_zero(self): bulk_mock.__getattr__("Contact").insert.return_value = [_success(), _success()] with mock.patch("airflow.providers.salesforce.operators.bulk.SalesforceHook") as hook_cls: - hook_cls.return_value.get_conn.return_value.__getattr__.return_value = bulk_mock + hook_cls.return_value.get_conn.return_value.bulk = bulk_mock result = op.execute(context={}) assert result == [_success(), _success()] @@ -133,7 +136,10 @@ def test_custom_transient_error_codes(self): op = _make_op(max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) assert op.transient_error_codes == frozenset({"MY_CUSTOM_ERROR"}) - custom_failure = {"success": False, "errors": [{"statusCode": "MY_CUSTOM_ERROR", "message": "custom"}]} + custom_failure = { + "success": False, + "errors": [{"statusCode": "MY_CUSTOM_ERROR", "message": "custom"}], + } run_mock = mock.MagicMock(return_value=[_success()]) with mock.patch.object(op, "_run_operation", run_mock): From 2ba32a3fd72ebdcf71f752d9e4f3dbd3096c71c8 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:13:44 +0530 Subject: [PATCH 09/14] Fix: remove list() from _run_operation, add to retry call; fix ruff format --- .../providers/salesforce/operators/bulk.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index de1ff96a7208b..d1633a2fee778 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -106,7 +106,9 @@ def _validate_inputs(self) -> None: raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") if self.bulk_retry_delay < 0: - raise ValueError(f"'bulk_retry_delay' must be a non-negative number, got {self.bulk_retry_delay!r}.") + raise ValueError( + f"'bulk_retry_delay' must be a non-negative number, got {self.bulk_retry_delay!r}." + ) if not self.object_name: raise ValueError("The required parameter 'object_name' cannot have an empty value.") @@ -116,28 +118,26 @@ def _validate_inputs(self) -> None: f"Operation {self.operation!r} not found! " f"Available operations are {self.available_operations}." ) + def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: """Submit *payload* through the configured Bulk API operation and return the result list.""" obj = bulk.__getattr__(self.object_name) if self.operation == "upsert": - return list( - obj.upsert( - data=payload, - external_id_field=self.external_id_field, - batch_size=self.batch_size, - use_serial=self.use_serial, - ) - ) - return list( - getattr(obj, self.operation)( + return obj.upsert( data=payload, + external_id_field=self.external_id_field, batch_size=self.batch_size, use_serial=self.use_serial, ) + return getattr(obj, self.operation)( + data=payload, + batch_size=self.batch_size, + use_serial=self.use_serial, ) def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: list) -> list: - """Re-submit records that failed with a transient error, up to *max_retries* times. + """ + Re-submit records that failed with a transient error, up to *max_retries* times. Salesforce Bulk API results are ordered identically to the input payload, so failed records are located by index and their retry results are written back @@ -168,7 +168,7 @@ def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: ) time.sleep(self.bulk_retry_delay) - retry_result = self._run_operation(bulk, [payload[i] for i in retry_indices]) + retry_result = list(self._run_operation(bulk, [payload[i] for i in retry_indices])) for list_pos, original_idx in enumerate(retry_indices): final[original_idx] = retry_result[list_pos] From 6f3d72d6ba7d7c0f5186ea3d30e2448219d5e7c3 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:17:50 +0530 Subject: [PATCH 10/14] Apply ruff format: split long lines, wrap method signature and retry call --- .../providers/salesforce/operators/bulk.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index d1633a2fee778..88b7ae80b7e89 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -103,15 +103,19 @@ def __init__( def _validate_inputs(self) -> None: if self.max_retries < 0: - raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") + raise ValueError( + f"'max_retries' must be a non-negative integer, got {self.max_retries!r}." + ) if self.bulk_retry_delay < 0: raise ValueError( - f"'bulk_retry_delay' must be a non-negative number, got {self.bulk_retry_delay!r}." - ) + f"'bulk_retry_delay' must be a non-negative number, got {self.bulk_retry_delay!r}." + ) if not self.object_name: - raise ValueError("The required parameter 'object_name' cannot have an empty value.") + raise ValueError( + "The required parameter 'object_name' cannot have an empty value." + ) if self.operation not in self.available_operations: raise ValueError( @@ -135,7 +139,9 @@ def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: use_serial=self.use_serial, ) - def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: list) -> list: + def _retry_transient_failures( + self, bulk: SFBulkHandler, payload: list, result: list + ) -> list: """ Re-submit records that failed with a transient error, up to *max_retries* times. @@ -150,7 +156,8 @@ def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: i for i, r in enumerate(final) if not r.get("success") - and {e.get("statusCode") for e in r.get("errors", [])} & self.transient_error_codes + and {e.get("statusCode") for e in r.get("errors", [])} + & self.transient_error_codes ] if not retry_indices: @@ -168,7 +175,9 @@ def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: ) time.sleep(self.bulk_retry_delay) - retry_result = list(self._run_operation(bulk, [payload[i] for i in retry_indices])) + retry_result = list( + self._run_operation(bulk, [payload[i] for i in retry_indices]) + ) for list_pos, original_idx in enumerate(retry_indices): final[original_idx] = retry_result[list_pos] From 351fea50738e000c31d5601add3eadfc328e5550 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:18:02 +0530 Subject: [PATCH 11/14] Apply ruff format: split long dicts in test helpers --- .../salesforce/operators/test_bulk_retry.py | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py index 1960894a1ed79..5de203c2df152 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -33,13 +33,22 @@ def _make_op(**kwargs): def _transient_failure(status_code="UNABLE_TO_LOCK_ROW"): - return {"success": False, "errors": [{"statusCode": status_code, "message": "locked", "fields": []}]} + return { + "success": False, + "errors": [{"statusCode": status_code, "message": "locked", "fields": []}], + } def _permanent_failure(): return { "success": False, - "errors": [{"statusCode": "REQUIRED_FIELD_MISSING", "message": "missing", "fields": ["Name"]}], + "errors": [ + { + "statusCode": "REQUIRED_FIELD_MISSING", + "message": "missing", + "fields": ["Name"], + } + ], } @@ -55,7 +64,9 @@ def test_no_retry_when_max_retries_zero(self): bulk_mock = mock.MagicMock() bulk_mock.__getattr__("Contact").insert.return_value = [_success(), _success()] - with mock.patch("airflow.providers.salesforce.operators.bulk.SalesforceHook") as hook_cls: + with mock.patch( + "airflow.providers.salesforce.operators.bulk.SalesforceHook" + ) as hook_cls: hook_cls.return_value.get_conn.return_value.bulk = bulk_mock result = op.execute(context={}) @@ -123,7 +134,9 @@ def test_retry_delay_is_respected(self): run_mock = mock.MagicMock(return_value=[_success()]) with mock.patch.object(op, "_run_operation", run_mock): - with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep") as sleep_mock: + with mock.patch( + "airflow.providers.salesforce.operators.bulk.time.sleep" + ) as sleep_mock: op._retry_transient_failures( bulk=mock.MagicMock(), payload=[{"FirstName": "Ada"}], @@ -133,7 +146,9 @@ def test_retry_delay_is_respected(self): sleep_mock.assert_called_once_with(30.0) def test_custom_transient_error_codes(self): - op = _make_op(max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) + op = _make_op( + max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"] + ) assert op.transient_error_codes == frozenset({"MY_CUSTOM_ERROR"}) custom_failure = { From a448191823408d6b384218f739de4c19036667a4 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:10:58 +0530 Subject: [PATCH 12/14] Fix ruff: reformat with line-length=110 (Airflow project standard) --- .../unit/salesforce/operators/test_bulk_retry.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py index 5de203c2df152..d373c208a3990 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py @@ -64,9 +64,7 @@ def test_no_retry_when_max_retries_zero(self): bulk_mock = mock.MagicMock() bulk_mock.__getattr__("Contact").insert.return_value = [_success(), _success()] - with mock.patch( - "airflow.providers.salesforce.operators.bulk.SalesforceHook" - ) as hook_cls: + with mock.patch("airflow.providers.salesforce.operators.bulk.SalesforceHook") as hook_cls: hook_cls.return_value.get_conn.return_value.bulk = bulk_mock result = op.execute(context={}) @@ -134,9 +132,7 @@ def test_retry_delay_is_respected(self): run_mock = mock.MagicMock(return_value=[_success()]) with mock.patch.object(op, "_run_operation", run_mock): - with mock.patch( - "airflow.providers.salesforce.operators.bulk.time.sleep" - ) as sleep_mock: + with mock.patch("airflow.providers.salesforce.operators.bulk.time.sleep") as sleep_mock: op._retry_transient_failures( bulk=mock.MagicMock(), payload=[{"FirstName": "Ada"}], @@ -146,9 +142,7 @@ def test_retry_delay_is_respected(self): sleep_mock.assert_called_once_with(30.0) def test_custom_transient_error_codes(self): - op = _make_op( - max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"] - ) + op = _make_op(max_retries=1, bulk_retry_delay=0, transient_error_codes=["MY_CUSTOM_ERROR"]) assert op.transient_error_codes == frozenset({"MY_CUSTOM_ERROR"}) custom_failure = { From 49c75ce595f20a4e27a87add154ae4e18a10eb41 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:11:13 +0530 Subject: [PATCH 13/14] Fix mypy: cast(list,...) in _run_operation; fix ruff: use line-length=110 --- .../providers/salesforce/operators/bulk.py | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 88b7ae80b7e89..ce351ad855b51 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -103,9 +103,7 @@ def __init__( def _validate_inputs(self) -> None: if self.max_retries < 0: - raise ValueError( - f"'max_retries' must be a non-negative integer, got {self.max_retries!r}." - ) + raise ValueError(f"'max_retries' must be a non-negative integer, got {self.max_retries!r}.") if self.bulk_retry_delay < 0: raise ValueError( @@ -113,9 +111,7 @@ def _validate_inputs(self) -> None: ) if not self.object_name: - raise ValueError( - "The required parameter 'object_name' cannot have an empty value." - ) + raise ValueError("The required parameter 'object_name' cannot have an empty value.") if self.operation not in self.available_operations: raise ValueError( @@ -127,21 +123,25 @@ def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: """Submit *payload* through the configured Bulk API operation and return the result list.""" obj = bulk.__getattr__(self.object_name) if self.operation == "upsert": - return obj.upsert( + return cast( + list, + obj.upsert( + data=payload, + external_id_field=self.external_id_field, + batch_size=self.batch_size, + use_serial=self.use_serial, + ), + ) + return cast( + list, + getattr(obj, self.operation)( data=payload, - external_id_field=self.external_id_field, batch_size=self.batch_size, use_serial=self.use_serial, - ) - return getattr(obj, self.operation)( - data=payload, - batch_size=self.batch_size, - use_serial=self.use_serial, + ), ) - def _retry_transient_failures( - self, bulk: SFBulkHandler, payload: list, result: list - ) -> list: + def _retry_transient_failures(self, bulk: SFBulkHandler, payload: list, result: list) -> list: """ Re-submit records that failed with a transient error, up to *max_retries* times. @@ -156,8 +156,7 @@ def _retry_transient_failures( i for i, r in enumerate(final) if not r.get("success") - and {e.get("statusCode") for e in r.get("errors", [])} - & self.transient_error_codes + and {e.get("statusCode") for e in r.get("errors", [])} & self.transient_error_codes ] if not retry_indices: @@ -175,9 +174,7 @@ def _retry_transient_failures( ) time.sleep(self.bulk_retry_delay) - retry_result = list( - self._run_operation(bulk, [payload[i] for i in retry_indices]) - ) + retry_result = list(self._run_operation(bulk, [payload[i] for i in retry_indices])) for list_pos, original_idx in enumerate(retry_indices): final[original_idx] = retry_result[list_pos] From 4896b4fb1756ff9c622ebcd1e1e35858d28e7452 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Thu, 2 Apr 2026 16:20:45 +0530 Subject: [PATCH 14/14] Fix ruff: use cast("list",...) string-quoted form (TC rule) --- .../src/airflow/providers/salesforce/operators/bulk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index ce351ad855b51..720a3c6ad9d69 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -124,7 +124,7 @@ def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: obj = bulk.__getattr__(self.object_name) if self.operation == "upsert": return cast( - list, + "list", obj.upsert( data=payload, external_id_field=self.external_id_field, @@ -133,7 +133,7 @@ def _run_operation(self, bulk: SFBulkHandler, payload: list) -> list: ), ) return cast( - list, + "list", getattr(obj, self.operation)( data=payload, batch_size=self.batch_size,