From c19e7d099554768db066f279854cf74bcbb3e398 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Tue, 31 Mar 2026 08:41:32 +0530 Subject: [PATCH 1/7] fix: log record-level failures in SalesforceBulkOperator and add raise_on_failures option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Salesforce's Bulk API does not raise on record failures — it signals them via `success=False` in the result list. Previously the operator returned this list as XCom without ever inspecting it, so tasks would show green even when every record had failed. This commit: - Adds `_check_result_for_failures()` which iterates the result list and logs a warning (with per-record status codes and messages) whenever any record carries `success=False`. - Adds a `raise_on_failures=False` constructor parameter. When set to `True` an `AirflowException` is raised after logging, so operators can opt into hard failures without breaking existing pipelines. - Converts the lazy iterable returned by simple-salesforce into a list before inspection, which is consistent with how callers already consume it via XCom. Fixes #63930 --- .../providers/salesforce/operators/bulk.py | 64 ++++++++++++++++++- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index f2bb5007fc75e..1bec109f13e97 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -16,9 +16,10 @@ # under the License. from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Iterable, Sequence from typing import TYPE_CHECKING, cast +from airflow.exceptions import AirflowException from airflow.providers.common.compat.sdk import BaseOperator from airflow.providers.salesforce.hooks.salesforce import SalesforceHook @@ -46,8 +47,13 @@ class SalesforceBulkOperator(BaseOperator): :param batch_size: number of records to assign for each batch in the job :param use_serial: Process batches in serial mode :param salesforce_conn_id: The :ref:`Salesforce Connection id `. + :param raise_on_failures: If True, raise an :class:`~airflow.exceptions.AirflowException` when + any record in the result set reports ``success=False``. Defaults to ``False`` for backward + compatibility — failures are always logged as warnings regardless of this setting. """ + template_fields: Sequence[str] = ("object_name", "payload", "external_id_field") + available_operations = ("insert", "update", "upsert", "delete", "hard_delete") def __init__( @@ -60,6 +66,7 @@ def __init__( batch_size: int = 10000, use_serial: bool = False, salesforce_conn_id: str = "salesforce_default", + raise_on_failures: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) @@ -70,6 +77,7 @@ def __init__( self.batch_size = batch_size self.use_serial = use_serial self.salesforce_conn_id = salesforce_conn_id + self.raise_on_failures = raise_on_failures self._validate_inputs() def _validate_inputs(self) -> None: @@ -82,6 +90,53 @@ def _validate_inputs(self) -> None: f"Available operations are {self.available_operations}." ) + def _check_result_for_failures(self, result: list) -> None: + """ + Inspect the Salesforce Bulk API result list and log any record-level failures. + + Salesforce's Bulk API does not raise an exception when individual records fail — it returns + a list of result dicts where unsuccessful records carry ``success=False`` and a populated + ``errors`` list. This method surfaces those failures so they are visible in task logs + rather than being silently swallowed. + + If ``raise_on_failures`` is ``True`` and at least one record failed, an + :class:`~airflow.exceptions.AirflowException` is raised after logging all failures. + """ + failed = [r for r in result if not r.get("success")] + total = len(result) + num_failed = len(failed) + + if failed: + self.log.warning( + "Salesforce Bulk API %s on %s: %d/%d records failed.", + self.operation, + self.object_name, + num_failed, + total, + ) + for idx, record in enumerate(failed): + for error in record.get("errors", []): + self.log.warning( + "Record failure %d — status: %s | message: %s | fields: %s", + idx, + error.get("statusCode"), + error.get("message"), + error.get("fields"), + ) + if self.raise_on_failures: + raise AirflowException( + f"Salesforce Bulk API {self.operation} on {self.object_name} failed " + f"for {num_failed} out of {total} record(s). " + "See task logs for per-record error details." + ) + else: + self.log.info( + "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.", + self.operation, + self.object_name, + total, + ) + def execute(self, context: Context): """ Make an HTTP request to Salesforce Bulk API. @@ -118,7 +173,10 @@ def execute(self, context: Context): data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial ) - if self.do_xcom_push and result: - return result + result_list = list(result) if result else [] + self._check_result_for_failures(result_list) + + if self.do_xcom_push and result_list: + return result_list return None From 6af29de20b52dbf8ea67dffc3215b0747603ca30 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Tue, 31 Mar 2026 08:42:09 +0530 Subject: [PATCH 2/7] test: add tests for SalesforceBulkOperator failure detection and raise_on_failures --- .../unit/salesforce/operators/test_bulk.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py index fe4dc973793bb..be0cf47e58a42 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py @@ -236,3 +236,122 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn): batch_size=batch_size, use_serial=use_serial, ) + + + @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") + def test_check_result_logs_warning_on_record_failure(self, mock_get_conn): + """ + Test that _check_result_for_failures logs a warning when any record fails. + """ + failed_result = [ + {"success": True, "created": True, "id": "001xx0000001AAA", "errors": []}, + { + "success": False, + "created": False, + "id": None, + "errors": [ + { + "statusCode": "INVALID_FIELD", + "message": "No such column \'Bad_Field\'", + "fields": [], + } + ], + }, + ] + mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock( + return_value=failed_result + ) + + operator = SalesforceBulkOperator( + task_id="bulk_insert_with_failure", + operation="insert", + object_name="Account", + payload=[{"Name": "OK"}, {"Bad_Field": "x"}], + ) + + with pytest.warns(None): + result = operator.execute(context={}) + + assert result is None + + @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") + def test_raise_on_failures_raises_when_records_fail(self, mock_get_conn): + """ + Test that raise_on_failures=True causes AirflowException when records fail. + """ + from airflow.exceptions import AirflowException + + failed_result = [ + { + "success": False, + "created": False, + "id": None, + "errors": [ + { + "statusCode": "REQUIRED_FIELD_MISSING", + "message": "Required fields are missing: [Name]", + "fields": ["Name"], + } + ], + } + ] + mock_get_conn.return_value.bulk.__getattr__("Contact").update = Mock( + return_value=failed_result + ) + + operator = SalesforceBulkOperator( + task_id="bulk_update_raise_on_failures", + operation="update", + object_name="Contact", + payload=[{"Id": "003xx0000001AAA"}], + raise_on_failures=True, + ) + + with pytest.raises(AirflowException, match="1 out of 1"): + operator.execute(context={}) + + @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") + def test_raise_on_failures_does_not_raise_on_success(self, mock_get_conn): + """ + Test that raise_on_failures=True does not raise when all records succeed. + """ + success_result = [ + {"success": True, "created": True, "id": "001xx0000001BBB", "errors": []} + ] + mock_get_conn.return_value.bulk.__getattr__("Lead").insert = Mock( + return_value=success_result + ) + + operator = SalesforceBulkOperator( + task_id="bulk_insert_all_success", + operation="insert", + object_name="Lead", + payload=[{"LastName": "Test"}], + raise_on_failures=True, + ) + + result = operator.execute(context={}) + assert result is None + + @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") + def test_execute_returns_result_list_when_xcom_push(self, mock_get_conn): + """ + Test that execute returns the full result list when do_xcom_push is True. + """ + success_result = [ + {"success": True, "created": True, "id": "001xx0000001CCC", "errors": []} + ] + mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock( + return_value=success_result + ) + + operator = SalesforceBulkOperator( + task_id="bulk_insert_xcom", + operation="insert", + object_name="Account", + payload=[{"Name": "Test Account"}], + do_xcom_push=True, + ) + + result = operator.execute(context={}) + assert result == success_result From 3f5515d8bbad37d44d62a5418e40dfd582ca8e29 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Wed, 1 Apr 2026 09:06:44 +0530 Subject: [PATCH 3/7] fix: drop raise_on_failures, keep failure logging only Per maintainer feedback, `raise_on_failures` adds a policy knob that opens the door to threshold logic and other customisations that are better handled in user code. Removed it. The failure *logging* is retained: when any record comes back with `success=False` the operator now emits a structured warning with the Salesforce status code, error message, and affected field names, which were previously completely invisible unless the caller inspected XCom. --- .../providers/salesforce/operators/bulk.py | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 1bec109f13e97..40247ef54d182 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -19,7 +19,6 @@ from collections.abc import Iterable, Sequence from typing import TYPE_CHECKING, cast -from airflow.exceptions import AirflowException from airflow.providers.common.compat.sdk import BaseOperator from airflow.providers.salesforce.hooks.salesforce import SalesforceHook @@ -47,9 +46,6 @@ class SalesforceBulkOperator(BaseOperator): :param batch_size: number of records to assign for each batch in the job :param use_serial: Process batches in serial mode :param salesforce_conn_id: The :ref:`Salesforce Connection id `. - :param raise_on_failures: If True, raise an :class:`~airflow.exceptions.AirflowException` when - any record in the result set reports ``success=False``. Defaults to ``False`` for backward - compatibility — failures are always logged as warnings regardless of this setting. """ template_fields: Sequence[str] = ("object_name", "payload", "external_id_field") @@ -66,7 +62,6 @@ def __init__( batch_size: int = 10000, use_serial: bool = False, salesforce_conn_id: str = "salesforce_default", - raise_on_failures: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) @@ -77,7 +72,6 @@ def __init__( self.batch_size = batch_size self.use_serial = use_serial self.salesforce_conn_id = salesforce_conn_id - self.raise_on_failures = raise_on_failures self._validate_inputs() def _validate_inputs(self) -> None: @@ -90,28 +84,23 @@ def _validate_inputs(self) -> None: f"Available operations are {self.available_operations}." ) - def _check_result_for_failures(self, result: list) -> None: + def _log_result_failures(self, result: list) -> None: """ - Inspect the Salesforce Bulk API result list and log any record-level failures. + Log a warning for every record in the Salesforce Bulk API result that has ``success=False``. - Salesforce's Bulk API does not raise an exception when individual records fail — it returns - a list of result dicts where unsuccessful records carry ``success=False`` and a populated - ``errors`` list. This method surfaces those failures so they are visible in task logs - rather than being silently swallowed. - - If ``raise_on_failures`` is ``True`` and at least one record failed, an - :class:`~airflow.exceptions.AirflowException` is raised after logging all failures. + Salesforce's Bulk API does not raise when individual records are rejected — it signals them + via ``success=False`` in the result list. Without this, those failures are completely + invisible unless the caller manually inspects the XCom value. """ failed = [r for r in result if not r.get("success")] total = len(result) - num_failed = len(failed) if failed: self.log.warning( - "Salesforce Bulk API %s on %s: %d/%d records failed.", + "Salesforce Bulk API %s on %s: %d/%d record(s) failed.", self.operation, self.object_name, - num_failed, + len(failed), total, ) for idx, record in enumerate(failed): @@ -123,12 +112,6 @@ def _check_result_for_failures(self, result: list) -> None: error.get("message"), error.get("fields"), ) - if self.raise_on_failures: - raise AirflowException( - f"Salesforce Bulk API {self.operation} on {self.object_name} failed " - f"for {num_failed} out of {total} record(s). " - "See task logs for per-record error details." - ) else: self.log.info( "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.", @@ -174,7 +157,7 @@ def execute(self, context: Context): ) result_list = list(result) if result else [] - self._check_result_for_failures(result_list) + self._log_result_failures(result_list) if self.do_xcom_push and result_list: return result_list From 503b54d39221ba0db8a5a8f90586c5e585851e0f Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Wed, 1 Apr 2026 09:07:07 +0530 Subject: [PATCH 4/7] test: update tests after dropping raise_on_failures, keep logging coverage --- .../unit/salesforce/operators/test_bulk.py | 53 ++++--------------- 1 file changed, 9 insertions(+), 44 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py index d1c18528d6ae5..7976e798c772e 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py @@ -274,9 +274,10 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn): @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") - def test_check_result_logs_warning_on_record_failure(self, mock_get_conn): + def test_log_result_failures_warns_on_failed_records(self, mock_get_conn): """ - Test that _check_result_for_failures logs a warning when any record fails. + When records come back with success=False, _log_result_failures should emit warnings + containing the Salesforce status code and message. """ failed_result = [ {"success": True, "created": True, "id": "001xx0000001AAA", "errors": []}, @@ -304,51 +305,16 @@ def test_check_result_logs_warning_on_record_failure(self, mock_get_conn): payload=[{"Name": "OK"}, {"Bad_Field": "x"}], ) - with pytest.warns(None): - result = operator.execute(context={}) + with pytest.raises(Exception): + pass + result = operator.execute(context={}) assert result is None @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") - def test_raise_on_failures_raises_when_records_fail(self, mock_get_conn): - """ - Test that raise_on_failures=True causes AirflowException when records fail. - """ - from airflow.exceptions import AirflowException - - failed_result = [ - { - "success": False, - "created": False, - "id": None, - "errors": [ - { - "statusCode": "REQUIRED_FIELD_MISSING", - "message": "Required fields are missing: [Name]", - "fields": ["Name"], - } - ], - } - ] - mock_get_conn.return_value.bulk.__getattr__("Contact").update = Mock( - return_value=failed_result - ) - - operator = SalesforceBulkOperator( - task_id="bulk_update_raise_on_failures", - operation="update", - object_name="Contact", - payload=[{"Id": "003xx0000001AAA"}], - raise_on_failures=True, - ) - - with pytest.raises(AirflowException, match="1 out of 1"): - operator.execute(context={}) - - @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") - def test_raise_on_failures_does_not_raise_on_success(self, mock_get_conn): + def test_log_result_failures_no_warning_on_full_success(self, mock_get_conn): """ - Test that raise_on_failures=True does not raise when all records succeed. + When all records succeed, _log_result_failures should not emit any warnings. """ success_result = [ {"success": True, "created": True, "id": "001xx0000001BBB", "errors": []} @@ -362,7 +328,6 @@ def test_raise_on_failures_does_not_raise_on_success(self, mock_get_conn): operation="insert", object_name="Lead", payload=[{"LastName": "Test"}], - raise_on_failures=True, ) result = operator.execute(context={}) @@ -371,7 +336,7 @@ def test_raise_on_failures_does_not_raise_on_success(self, mock_get_conn): @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") def test_execute_returns_result_list_when_xcom_push(self, mock_get_conn): """ - Test that execute returns the full result list when do_xcom_push is True. + execute() should return the materialised result list when do_xcom_push is True. """ success_result = [ {"success": True, "created": True, "id": "001xx0000001CCC", "errors": []} From 73a29e372f56582975e8864d03f41b37b2a524c4 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Sun, 5 Apr 2026 08:44:36 +0530 Subject: [PATCH 5/7] Fix Copilot review: original result index in warnings, one-pass iteration, no info log on success; fix tests: return_value=[], caplog assertions --- .../providers/salesforce/operators/bulk.py | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py index 40247ef54d182..154f92ee70dcc 100644 --- a/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py +++ b/providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py @@ -92,31 +92,27 @@ def _log_result_failures(self, result: list) -> None: via ``success=False`` in the result list. Without this, those failures are completely invisible unless the caller manually inspects the XCom value. """ - failed = [r for r in result if not r.get("success")] total = len(result) + failed_count = 0 - if failed: - self.log.warning( - "Salesforce Bulk API %s on %s: %d/%d record(s) failed.", - self.operation, - self.object_name, - len(failed), - total, - ) - for idx, record in enumerate(failed): + for orig_idx, record in enumerate(result): + if not record.get("success"): + failed_count += 1 for error in record.get("errors", []): self.log.warning( - "Record failure %d — status: %s | message: %s | fields: %s", - idx, + "Record failure at result index %d — status: %s | message: %s | fields: %s", + orig_idx, error.get("statusCode"), error.get("message"), error.get("fields"), ) - else: - self.log.info( - "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.", + + if failed_count: + self.log.warning( + "Salesforce Bulk API %s on %s: %d/%d record(s) failed.", self.operation, self.object_name, + failed_count, total, ) From 3c214d43d8a3d9ecf731adb0571d00fd7619e791 Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Sun, 5 Apr 2026 08:44:51 +0530 Subject: [PATCH 6/7] Fix Copilot review: use caplog for warning assertions; fix existing tests to return iterable mocks; fix ruff: remove extra blank line, one-line Mock calls --- .../unit/salesforce/operators/test_bulk.py | 62 ++++++++----------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py index 7976e798c772e..9962dafc29074 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py @@ -16,11 +16,13 @@ # under the License. from __future__ import annotations +import logging from unittest.mock import Mock, patch +from airflow.providers.common.compat.sdk import AirflowException + import pytest -from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator @@ -119,7 +121,7 @@ def test_execute_salesforce_bulk_insert(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_insert", operation=operation, @@ -152,7 +154,7 @@ def test_execute_salesforce_bulk_update(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_update", operation=operation, @@ -186,7 +188,7 @@ def test_execute_salesforce_bulk_upsert(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_upsert", operation=operation, @@ -221,7 +223,7 @@ def test_execute_salesforce_bulk_delete(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).delete = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).delete = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_delete", operation=operation, @@ -254,7 +256,7 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).hard_delete = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).hard_delete = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_hard_delete", operation=operation, @@ -272,9 +274,8 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn): use_serial=use_serial, ) - @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") - def test_log_result_failures_warns_on_failed_records(self, mock_get_conn): + def test_log_result_failures_warns_on_failed_records(self, mock_get_conn, caplog): """ When records come back with success=False, _log_result_failures should emit warnings containing the Salesforce status code and message. @@ -286,64 +287,55 @@ def test_log_result_failures_warns_on_failed_records(self, mock_get_conn): "created": False, "id": None, "errors": [ - { - "statusCode": "INVALID_FIELD", - "message": "No such column \'Bad_Field\'", - "fields": [], - } + {"statusCode": "INVALID_FIELD", "message": "No such column 'Bad_Field'", "fields": []} ], }, ] - mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock( - return_value=failed_result - ) + mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(return_value=failed_result) operator = SalesforceBulkOperator( task_id="bulk_insert_with_failure", operation="insert", object_name="Account", payload=[{"Name": "OK"}, {"Bad_Field": "x"}], + do_xcom_push=False, ) - with pytest.raises(Exception): - pass + with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"): + operator.execute(context={}) - result = operator.execute(context={}) - assert result is None + assert "1/2 record(s) failed" in caplog.text + assert "INVALID_FIELD" in caplog.text + assert "No such column 'Bad_Field'" in caplog.text @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") - def test_log_result_failures_no_warning_on_full_success(self, mock_get_conn): + def test_log_result_failures_no_warning_on_full_success(self, mock_get_conn, caplog): """ When all records succeed, _log_result_failures should not emit any warnings. """ - success_result = [ - {"success": True, "created": True, "id": "001xx0000001BBB", "errors": []} - ] - mock_get_conn.return_value.bulk.__getattr__("Lead").insert = Mock( - return_value=success_result - ) + success_result = [{"success": True, "created": True, "id": "001xx0000001BBB", "errors": []}] + mock_get_conn.return_value.bulk.__getattr__("Lead").insert = Mock(return_value=success_result) operator = SalesforceBulkOperator( task_id="bulk_insert_all_success", operation="insert", object_name="Lead", payload=[{"LastName": "Test"}], + do_xcom_push=False, ) - result = operator.execute(context={}) - assert result is None + with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"): + operator.execute(context={}) + + assert "record(s) failed" not in caplog.text @patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn") def test_execute_returns_result_list_when_xcom_push(self, mock_get_conn): """ execute() should return the materialised result list when do_xcom_push is True. """ - success_result = [ - {"success": True, "created": True, "id": "001xx0000001CCC", "errors": []} - ] - mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock( - return_value=success_result - ) + success_result = [{"success": True, "created": True, "id": "001xx0000001CCC", "errors": []}] + mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(return_value=success_result) operator = SalesforceBulkOperator( task_id="bulk_insert_xcom", From 130a1d1292bc0c7441d4f9d4e1bb557b0c3e41ed Mon Sep 17 00:00:00 2001 From: nagasrisai <59650078+nagasrisai@users.noreply.github.com> Date: Tue, 7 Apr 2026 03:12:07 +0530 Subject: [PATCH 7/7] Fix isort: move pytest import before first-party airflow imports --- .../salesforce/tests/unit/salesforce/operators/test_bulk.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py index 9962dafc29074..02697a0f91f2d 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py @@ -19,10 +19,9 @@ import logging from unittest.mock import Mock, patch -from airflow.providers.common.compat.sdk import AirflowException - import pytest +from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator