
fix: log and optionally raise on record-level failures in SalesforceBulkOperator#64519

Draft
nagasrisai wants to merge 7 commits into apache:main from nagasrisai:fix/salesforce-bulk-operator-silent-failures

Conversation

Contributor

@nagasrisai nagasrisai commented Mar 31, 2026

I ran into this while going through #63930. The Salesforce Bulk API does not throw exceptions when individual records fail. Instead it returns a list of results where failed records have success=False and an errors field with details. The operator was never looking at that list, so a task could process a thousand records, have every single one rejected by Salesforce, and still show green in Airflow. You would only find out something went wrong by digging into the XCom value yourself.

This adds _log_result_failures() which runs after every operation and emits a structured warning for each failed record with the Salesforce status code, error message, and field names. That is the full extent of the change — no new parameters, no policy decisions, just making failures visible in the task log where they belong.

The result iterable from simple-salesforce is also materialised into a list before the check so we only iterate it once, which is consistent with how callers already consume it via XCom.
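For illustration, a Bulk API result list shaped like what simple-salesforce returns for a bulk call might look like this (the record values here are made up):

```python
# Illustrative shape of a Salesforce Bulk API result list as returned by
# simple-salesforce: the HTTP call itself succeeds, and each record carries
# its own success flag plus error details. Values below are invented.
results = [
    {"success": True, "created": True, "id": "0015g00000XxYyZzAA1", "errors": []},
    {
        "success": False,
        "created": False,
        "id": None,
        "errors": [
            {
                "statusCode": "REQUIRED_FIELD_MISSING",
                "message": "Required fields are missing: [LastName]",
                "fields": ["LastName"],
            }
        ],
    },
]

# Nothing raises here; the only failure signal is the per-record flag.
failed = [r for r in results if not r.get("success")]
print(len(failed))  # → 1
```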

Closes #63930

…e_on_failures option

Salesforce's Bulk API does not raise on record failures — it signals them
via `success=False` in the result list. Previously the operator returned
this list as XCom without ever inspecting it, so tasks would show green
even when every record had failed.

This commit:
- Adds `_check_result_for_failures()` which iterates the result list and
  logs a warning (with per-record status codes and messages) whenever any
  record carries `success=False`.
- Adds a `raise_on_failures=False` constructor parameter. When set to
  `True` an `AirflowException` is raised after logging, so operators can
  opt into hard failures without breaking existing pipelines.
- Converts the lazy iterable returned by simple-salesforce into a list
  before inspection, which is consistent with how callers already consume
  it via XCom.

Fixes apache#63930
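Sketched as a standalone helper, the flow this commit message describes looks roughly like the following (hypothetical names; a plain RuntimeError stands in for AirflowException so the sketch stays self-contained):

```python
# Hypothetical sketch of the log-then-optionally-raise flow, not the
# operator's actual code. RuntimeError stands in for AirflowException.
def check_result_for_failures(result, raise_on_failures=False):
    failed = [r for r in result if not r.get("success")]
    for record in failed:
        for error in record.get("errors", []):
            # In the operator this would go through self.log.warning(...).
            print(f"WARNING: {error.get('statusCode')}: {error.get('message')}")
    if failed and raise_on_failures:
        raise RuntimeError(f"{len(failed)} record(s) failed in Salesforce bulk operation")
    return failed
```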
Comment on lines +50 to +52
:param raise_on_failures: If True, raise an :class:`~airflow.exceptions.AirflowException` when
any record in the result set reports ``success=False``. Defaults to ``False`` for backward
compatibility — failures are always logged as warnings regardless of this setting.
Contributor


This opens the door to so many customizations that we won't be able to accommodate them all.
For example, your use case is yes/no: even a single error raises. What if someone else wants threshold logic instead, say failing only when X out of the total fail? There can be so many customized logics.

I tend to say that this feature should be kept in your local deployment rather than be served from the project code.

Contributor Author

@nagasrisai nagasrisai Apr 1, 2026


@eladkal That is a fair point and I agree with you. The raise_on_failures parameter was probably overreach on my part since you could reasonably want threshold logic, partial-failure tolerance, or all sorts of custom handling once you open that door.

I am happy to drop that parameter entirely. The part I would like to keep is the logging, right now if every record comes back with success=False the task shows green and produces no output other than the raw XCom value. Just logging a warning that says how many records failed and what the Salesforce error codes were seems clearly useful without adding any policy footprint. Would you be OK with a version that removes raise_on_failures and only adds the failure logging?

Contributor


I am OK with logging.
I suggest you check the source code of BigQueryInsertJobOperator: there is an error-handling mechanism there that submits a retry in case of error. Maybe, if that makes sense, you can implement a similar approach? While it may not be exactly what you asked for, it may reduce the possibility of error rows, and this is a feature that benefits everyone.

Contributor Author

@nagasrisai nagasrisai Apr 1, 2026


@eladkal Thanks for pointing me there. I had a look at BigQueryInsertJobOperator and the key difference I noticed is that it retries at the job level, specifically resubmitting the entire job when it hits a rate-limit error from the API. Salesforce Bulk errors are different in that they live at the record level inside a successful API response, so the retry target would be a subset of the original payload rather than the whole request.

That said, the idea makes sense for certain Salesforce error codes that are genuinely transient, things like UNABLE_TO_LOCK_ROW or API_TEMPORARILY_UNAVAILABLE. For those, re-submitting just the failed records after a short delay has a reasonable chance of succeeding. Errors like INVALID_FIELD or REQUIRED_FIELD_MISSING are obviously not worth retrying.

Would you want me to add that to this PR, or keep this one focused on the logging and open a follow-up for the retry behaviour? I am happy either way, just want to make sure I do not make this PR harder to review by expanding its scope without checking first.
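The record-level retry idea discussed above can be sketched as follows. All names here (`submit_batch`, `submit_with_retries`, the parameters) are hypothetical placeholders, not the operator's actual API:

```python
import time

# Hypothetical sketch of record-level retries for transient Salesforce Bulk
# errors. `submit_batch` stands in for whatever call performs the bulk
# operation; it takes a list of records and returns a parallel list of
# result dicts shaped like the Bulk API results.
TRANSIENT_ERROR_CODES = {"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"}

def is_transient_failure(result):
    return not result.get("success") and any(
        error.get("statusCode") in TRANSIENT_ERROR_CODES
        for error in result.get("errors", [])
    )

def submit_with_retries(payload, submit_batch, max_retries=2, retry_delay=0):
    results = submit_batch(payload)
    for _ in range(max_retries):
        # Positions of records whose failure looks transient.
        retry_idx = [i for i, r in enumerate(results) if is_transient_failure(r)]
        if not retry_idx:
            break
        time.sleep(retry_delay)
        # Re-submit only the failed subset, not the entire payload.
        retry_results = submit_batch([payload[i] for i in retry_idx])
        # Fold retried outcomes back into their original result positions.
        for i, new_result in zip(retry_idx, retry_results):
            results[i] = new_result
    return results
```

Errors like INVALID_FIELD or REQUIRED_FIELD_MISSING never match the transient set, so they fail fast on the first attempt.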

Contributor


If you wish to keep the logging part separate, you can keep the logging code in this PR and leave the feature change for another PR.

Contributor Author

@nagasrisai nagasrisai Apr 1, 2026


@eladkal Sounds good, I will raise a separate PR for the retry mechanism. The current PR already has only the logging part after the earlier update, so no further changes needed here.

Whenever you get a chance, would really appreciate a review on this one. No rush at all, take your time.

Per maintainer feedback, `raise_on_failures` adds a policy knob
that opens the door to threshold logic and other customisations
that are better handled in user code. Removed it.

The failure *logging* is retained: when any record comes back with
`success=False` the operator now emits a structured warning with the
Salesforce status code, error message, and affected field names, which
were previously completely invisible unless the caller inspected XCom.
@nagasrisai nagasrisai requested a review from eladkal April 1, 2026 06:47
nagasrisai added a commit to nagasrisai/airflow that referenced this pull request Apr 1, 2026
Introduces max_retries, retry_delay, and transient_error_codes parameters.
When max_retries > 0, records that fail with a transient Salesforce error
(UNABLE_TO_LOCK_ROW, API_TEMPORARILY_UNAVAILABLE by default) are
re-submitted after retry_delay seconds, up to max_retries times.
Only the failed records are re-submitted, not the entire payload.

Related to apache#64519
Contributor Author

nagasrisai commented Apr 1, 2026

The follow-up PR for the retry mechanism is now up: #64575. It adds max_retries, retry_delay, and transient_error_codes parameters following the pattern you suggested from BigQueryInsertJobOperator. Defaults to disabled (max_retries=0) so existing behaviour is unchanged.

Member

potiuk commented Apr 1, 2026

@nagasrisai This PR has been converted to draft because it does not yet meet our Pull Request quality criteria.

Issues found:

  • Pre-commit / static checks: Failing: CI image checks / Static checks. Run prek run --from-ref main locally to find and fix issues. See Pre-commit / static checks docs.
  • Provider tests: Failing: provider distributions tests / Compat 2.11.1:P3.10:, provider distributions tests / Compat 3.0.6:P3.10:, provider distributions tests / Compat 3.1.8:P3.10:, Non-DB tests: providers / Non-DB-prov::3.10:amazon...google, Low dep tests: providers / All-prov:LowestDeps:14:3.10:amazon...salesforce. Run provider tests with breeze run pytest <provider-test-path> -xvs. See Provider tests docs.
  • ⚠️ Unresolved review comments: This PR has 1 unresolved review thread from maintainers: @eladkal (MEMBER): 1 unresolved thread. Please review and resolve all inline review comments before requesting another review. You can resolve a conversation by clicking 'Resolve conversation' on each thread after addressing the feedback. See pull request guidelines.

What to do next:

  • The comment informs you what you need to do.
  • Fix each issue, then mark the PR as "Ready for review" in the GitHub UI - but only after making sure that all the issues are fixed.
  • There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates.
  • Maintainers will then proceed with a normal review.

Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. If you have questions, feel free to ask on the Airflow Slack.

Contributor

Copilot AI left a comment


Pull request overview

Add visibility into per-record Salesforce Bulk API failures by logging failed results after each bulk operation, and adjust execute() to materialize results for single-pass iteration / optional XCom return.

Changes:

  • Added _log_result_failures() to warn on success=False records (and currently info-log on full success).
  • Materialized Bulk API result iterables into a list before logging / returning via XCom.
  • Added unit tests around logging behavior and XCom return behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File | Description
providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py | Logs record-level failures and materializes results before optional XCom return.
providers/salesforce/tests/unit/salesforce/operators/test_bulk.py | Adds tests intended to validate warnings/no-warnings and XCom return semantics.

Comment on lines +308 to +312
    with pytest.raises(Exception):
        pass

    result = operator.execute(context={})
    assert result is None

Copilot AI Apr 2, 2026


The test names/docstrings claim warnings are emitted (or not emitted), but the tests currently don’t assert on logs at all, and with pytest.raises(Exception): pass is a no-op that doesn’t validate behavior. Use caplog (or assertLogs) to assert that warnings are produced for failed records (including statusCode/message) and that no warnings are produced on full success; remove the no-op pytest.raises block.

Comment on lines +95 to +109
    failed = [r for r in result if not r.get("success")]
    total = len(result)

    if failed:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed),
            total,
        )
        for idx, record in enumerate(failed):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",

Copilot AI Apr 2, 2026


The logged idx is the index within the filtered failed list, not the original record index in the result/payload. This can mislead debugging (e.g., the first failure logs as 0 even if it was record 57). Iterate with enumerate(result) and log the original index for each failure (or include both original index and a failure counter).

Suggested change. Before:

    failed = [r for r in result if not r.get("success")]
    total = len(result)
    if failed:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed),
            total,
        )
        for idx, record in enumerate(failed):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",

After:

    total = len(result)
    failed_indices = [idx for idx, r in enumerate(result) if not r.get("success")]
    if failed_indices:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed_indices),
            total,
        )
        for idx in failed_indices:
            record = result[idx]
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure at result index %d — status: %s | message: %s | fields: %s",

Comment on lines +115 to +121
    else:
        self.log.info(
            "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.",
            self.operation,
            self.object_name,
            total,
        )

Copilot AI Apr 2, 2026


PR description says the change is limited to making failures visible via warnings (no new policy/behavior beyond that). This adds an info log line on full success, which is additional behavior and may increase log volume. Either update the PR description to reflect this, or change this branch to be silent / debug-level to align with the stated scope.

Comment on lines +95 to +114
    failed = [r for r in result if not r.get("success")]
    total = len(result)

    if failed:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed),
            total,
        )
        for idx, record in enumerate(failed):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",
                    idx,
                    error.get("statusCode"),
                    error.get("message"),
                    error.get("fields"),
                )

Copilot AI Apr 2, 2026


This creates an additional list (failed) that can be sizable (default batch sizes can be large), effectively duplicating references and increasing memory churn. Consider iterating result once, counting failures and logging per failure as you go (or collecting only minimal data needed for the summary) to avoid allocating a second list.

Suggested change. Before:

    failed = [r for r in result if not r.get("success")]
    total = len(result)
    if failed:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed),
            total,
        )
        for idx, record in enumerate(failed):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",
                    idx,
                    error.get("statusCode"),
                    error.get("message"),
                    error.get("fields"),
                )

After:

    total = len(result)
    failed_count = 0
    failure_index = 0
    for record in result:
        if not record.get("success"):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",
                    failure_index,
                    error.get("statusCode"),
                    error.get("message"),
                    error.get("fields"),
                )
            failed_count += 1
            failure_index += 1
    if failed_count:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            failed_count,
            total,
        )



Development

Successfully merging this pull request may close these issues.

Upsert and update operations on SalesforceBulkOperator fail silently

4 participants