def _log_result_failures(self, result: list) -> None:
    """
    Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.

    Salesforce's Bulk API does not raise when individual records are rejected — it signals them
    via ``success=False`` in the result list. Without this, those failures are completely
    invisible unless the caller manually inspects the XCom value.

    One warning is emitted per entry in a failed record's ``errors`` list. A failed record whose
    ``errors`` value is missing, ``None`` or empty still produces a single warning, so the index
    of the rejected record is always reported. When at least one record failed, a final summary
    warning states the failed/total counts together with the operation and object name.

    :param result: List of per-record result dicts; each is read through ``.get("success")``
        and ``.get("errors")``.
    """
    failed_count = 0

    for index, record in enumerate(result):
        if record.get("success"):
            continue
        failed_count += 1

        # ``errors`` can be absent *or* explicitly null; ``.get("errors", [])`` only covers the
        # first case and would make the loop below raise TypeError on ``None``.
        errors = record.get("errors") or []
        if not errors:
            # Nothing to itemise, but the failure itself must still be traceable to an index.
            self.log.warning(
                "Record failure at result index %d — no error details were returned.",
                index,
            )
            continue

        for error in errors:
            self.log.warning(
                "Record failure at result index %d — status: %s | message: %s | fields: %s",
                index,
                error.get("statusCode"),
                error.get("message"),
                error.get("fields"),
            )

    if failed_count:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            failed_count,
            len(result),
        )
@@ -120,7 +152,10 @@ def execute(self, context: Context): data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial ) - if self.do_xcom_push and result: - return result + result_list = list(result) if result else [] + self._log_result_failures(result_list) + + if self.do_xcom_push and result_list: + return result_list return None diff --git a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py index 96589ff1c93a3..02697a0f91f2d 100644 --- a/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py +++ b/providers/salesforce/tests/unit/salesforce/operators/test_bulk.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import logging from unittest.mock import Mock, patch import pytest @@ -119,7 +120,7 @@ def test_execute_salesforce_bulk_insert(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_insert", operation=operation, @@ -152,7 +153,7 @@ def test_execute_salesforce_bulk_update(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_update", operation=operation, @@ -186,7 +187,7 @@ def test_execute_salesforce_bulk_upsert(self, mock_get_conn): batch_size = 10000 use_serial = True - mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock() + mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock(return_value=[]) operator = SalesforceBulkOperator( task_id="bulk_upsert", operation=operation, @@ -221,7 +222,7 @@ def test_execute_salesforce_bulk_delete(self, mock_get_conn): 
@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_log_result_failures_warns_on_failed_records(self, mock_get_conn, caplog):
    """
    A bulk result containing a ``success=False`` record must show up in the captured log text:
    the Salesforce status code, the error message and the failed/total summary are all expected.
    """
    accepted = {"success": True, "created": True, "id": "001xx0000001AAA", "errors": []}
    rejected = {
        "success": False,
        "created": False,
        "id": None,
        "errors": [{"statusCode": "INVALID_FIELD", "message": "No such column 'Bad_Field'", "fields": []}],
    }
    account_bulk = mock_get_conn.return_value.bulk.__getattr__("Account")
    account_bulk.insert = Mock(return_value=[accepted, rejected])

    operator = SalesforceBulkOperator(
        task_id="bulk_insert_with_failure",
        operation="insert",
        object_name="Account",
        payload=[{"Name": "OK"}, {"Bad_Field": "x"}],
        do_xcom_push=False,
    )

    with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"):
        operator.execute(context={})

    captured = caplog.text
    for fragment in ("1/2 record(s) failed", "INVALID_FIELD", "No such column 'Bad_Field'"):
        assert fragment in captured


@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_log_result_failures_no_warning_on_full_success(self, mock_get_conn, caplog):
    """
    With every record reporting ``success=True`` the failure summary must be absent from the log.
    """
    lead_bulk = mock_get_conn.return_value.bulk.__getattr__("Lead")
    lead_bulk.insert = Mock(
        return_value=[{"success": True, "created": True, "id": "001xx0000001BBB", "errors": []}]
    )

    operator = SalesforceBulkOperator(
        task_id="bulk_insert_all_success",
        operation="insert",
        object_name="Lead",
        payload=[{"LastName": "Test"}],
        do_xcom_push=False,
    )

    with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"):
        operator.execute(context={})

    assert "record(s) failed" not in caplog.text


@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_execute_returns_result_list_when_xcom_push(self, mock_get_conn):
    """
    With ``do_xcom_push=True``, ``execute()`` hands back a list equal to what the bulk call returned.
    """
    bulk_response = [{"success": True, "created": True, "id": "001xx0000001CCC", "errors": []}]
    account_bulk = mock_get_conn.return_value.bulk.__getattr__("Account")
    account_bulk.insert = Mock(return_value=bulk_response)

    operator = SalesforceBulkOperator(
        task_id="bulk_insert_xcom",
        operation="insert",
        object_name="Account",
        payload=[{"Name": "Test Account"}],
        do_xcom_push=True,
    )

    returned = operator.execute(context={})
    assert returned == bulk_response