Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,38 @@ def _validate_inputs(self) -> None:
f"Available operations are {self.available_operations}."
)

def _log_result_failures(self, result: list) -> None:
"""
Log a warning for every record in the Salesforce Bulk API result that has ``success=False``.

Salesforce's Bulk API does not raise when individual records are rejected — it signals them
via ``success=False`` in the result list. Without this, those failures are completely
invisible unless the caller manually inspects the XCom value.
"""
total = len(result)
failed_count = 0

for orig_idx, record in enumerate(result):
if not record.get("success"):
failed_count += 1
for error in record.get("errors", []):
self.log.warning(
"Record failure at result index %d — status: %s | message: %s | fields: %s",
orig_idx,
error.get("statusCode"),
error.get("message"),
error.get("fields"),
)

if failed_count:
self.log.warning(
"Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
self.operation,
self.object_name,
failed_count,
total,
)

def execute(self, context: Context):
"""
Make an HTTP request to Salesforce Bulk API.
Expand Down Expand Up @@ -120,7 +152,10 @@ def execute(self, context: Context):
data=self.payload, batch_size=self.batch_size, use_serial=self.use_serial
)

if self.do_xcom_push and result:
return result
result_list = list(result) if result else []
self._log_result_failures(result_list)

if self.do_xcom_push and result_list:
return result_list

return None
85 changes: 80 additions & 5 deletions providers/salesforce/tests/unit/salesforce/operators/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import logging
from unittest.mock import Mock, patch

import pytest
Expand Down Expand Up @@ -119,7 +120,7 @@ def test_execute_salesforce_bulk_insert(self, mock_get_conn):
batch_size = 10000
use_serial = True

mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock()
mock_get_conn.return_value.bulk.__getattr__(object_name).insert = Mock(return_value=[])
operator = SalesforceBulkOperator(
task_id="bulk_insert",
operation=operation,
Expand Down Expand Up @@ -152,7 +153,7 @@ def test_execute_salesforce_bulk_update(self, mock_get_conn):
batch_size = 10000
use_serial = True

mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock()
mock_get_conn.return_value.bulk.__getattr__(object_name).update = Mock(return_value=[])
operator = SalesforceBulkOperator(
task_id="bulk_update",
operation=operation,
Expand Down Expand Up @@ -186,7 +187,7 @@ def test_execute_salesforce_bulk_upsert(self, mock_get_conn):
batch_size = 10000
use_serial = True

mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock()
mock_get_conn.return_value.bulk.__getattr__(object_name).upsert = Mock(return_value=[])
operator = SalesforceBulkOperator(
task_id="bulk_upsert",
operation=operation,
Expand Down Expand Up @@ -221,7 +222,7 @@ def test_execute_salesforce_bulk_delete(self, mock_get_conn):
batch_size = 10000
use_serial = True

mock_get_conn.return_value.bulk.__getattr__(object_name).delete = Mock()
mock_get_conn.return_value.bulk.__getattr__(object_name).delete = Mock(return_value=[])
operator = SalesforceBulkOperator(
task_id="bulk_delete",
operation=operation,
Expand Down Expand Up @@ -254,7 +255,7 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn):
batch_size = 10000
use_serial = True

mock_get_conn.return_value.bulk.__getattr__(object_name).hard_delete = Mock()
mock_get_conn.return_value.bulk.__getattr__(object_name).hard_delete = Mock(return_value=[])
operator = SalesforceBulkOperator(
task_id="bulk_hard_delete",
operation=operation,
Expand All @@ -271,3 +272,77 @@ def test_execute_salesforce_bulk_hard_delete(self, mock_get_conn):
batch_size=batch_size,
use_serial=use_serial,
)

@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_log_result_failures_warns_on_failed_records(self, mock_get_conn, caplog):
"""
When records come back with success=False, _log_result_failures should emit warnings
containing the Salesforce status code and message.
"""
failed_result = [
{"success": True, "created": True, "id": "001xx0000001AAA", "errors": []},
{
"success": False,
"created": False,
"id": None,
"errors": [
{"statusCode": "INVALID_FIELD", "message": "No such column 'Bad_Field'", "fields": []}
],
},
]
mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(return_value=failed_result)

operator = SalesforceBulkOperator(
task_id="bulk_insert_with_failure",
operation="insert",
object_name="Account",
payload=[{"Name": "OK"}, {"Bad_Field": "x"}],
do_xcom_push=False,
)

with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"):
operator.execute(context={})

assert "1/2 record(s) failed" in caplog.text
assert "INVALID_FIELD" in caplog.text
assert "No such column 'Bad_Field'" in caplog.text

@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_log_result_failures_no_warning_on_full_success(self, mock_get_conn, caplog):
"""
When all records succeed, _log_result_failures should not emit any warnings.
"""
success_result = [{"success": True, "created": True, "id": "001xx0000001BBB", "errors": []}]
mock_get_conn.return_value.bulk.__getattr__("Lead").insert = Mock(return_value=success_result)

operator = SalesforceBulkOperator(
task_id="bulk_insert_all_success",
operation="insert",
object_name="Lead",
payload=[{"LastName": "Test"}],
do_xcom_push=False,
)

with caplog.at_level(logging.WARNING, logger="airflow.providers.salesforce.operators.bulk"):
operator.execute(context={})

assert "record(s) failed" not in caplog.text

@patch("airflow.providers.salesforce.operators.bulk.SalesforceHook.get_conn")
def test_execute_returns_result_list_when_xcom_push(self, mock_get_conn):
"""
execute() should return the materialised result list when do_xcom_push is True.
"""
success_result = [{"success": True, "created": True, "id": "001xx0000001CCC", "errors": []}]
mock_get_conn.return_value.bulk.__getattr__("Account").insert = Mock(return_value=success_result)

operator = SalesforceBulkOperator(
task_id="bulk_insert_xcom",
operation="insert",
object_name="Account",
payload=[{"Name": "Test Account"}],
do_xcom_push=True,
)

result = operator.execute(context={})
assert result == success_result
Loading