fix: log and optionally raise on record-level failures in SalesforceBulkOperator #64519
nagasrisai wants to merge 7 commits into apache:main from
Conversation
…e_on_failures option

Salesforce's Bulk API does not raise on record failures; it signals them via `success=False` in the result list. Previously the operator returned this list as XCom without ever inspecting it, so tasks would show green even when every record had failed. This commit:

- Adds `_check_result_for_failures()`, which iterates the result list and logs a warning (with per-record status codes and messages) whenever any record carries `success=False`.
- Adds a `raise_on_failures=False` constructor parameter. When set to `True`, an `AirflowException` is raised after logging, so operators can opt into hard failures without breaking existing pipelines.
- Converts the lazy iterable returned by simple-salesforce into a list before inspection, which is consistent with how callers already consume it via XCom.

Fixes apache#63930
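As a rough, dependency-free illustration of the check the commit describes (the standalone function name, the sample records, and `RuntimeError` standing in for `AirflowException` are all illustrative, not the actual operator code):

```python
import logging

log = logging.getLogger("salesforce_bulk")

def check_result_for_failures(result, raise_on_failures=False):
    # simple-salesforce returns one dict per record; failures carry
    # success=False plus an "errors" list with details.
    failed = [r for r in result if not r.get("success")]
    for record in failed:
        for error in record.get("errors", []):
            log.warning(
                "Record failed: status=%s message=%s fields=%s",
                error.get("statusCode"),
                error.get("message"),
                error.get("fields"),
            )
    if failed and raise_on_failures:
        # The operator would raise AirflowException here; RuntimeError
        # keeps this sketch dependency-free.
        raise RuntimeError(f"{len(failed)}/{len(result)} record(s) failed")
    return len(failed)

sample = [
    {"success": True, "errors": []},
    {"success": False, "errors": [{"statusCode": "INVALID_FIELD",
                                   "message": "No such column", "fields": ["Foo__c"]}]},
]
n_failed = check_result_for_failures(sample)
```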
    :param raise_on_failures: If True, raise an :class:`~airflow.exceptions.AirflowException` when
        any record in the result set reports ``success=False``. Defaults to ``False`` for backward
        compatibility — failures are always logged as warnings regardless of this setting.
This opens the door to so many customizations that we won't be able to accommodate them all.
For example, your use case is yes/no: even a single error will raise. What if for someone else the desired logic is a threshold, say X failures out of the total? There can be so many customized logics.
I tend to say that this feature should be kept in your local deployment rather than be served from the project code.
@eladkal That is a fair point and I agree with you. The `raise_on_failures` parameter was probably overreach on my part, since you could reasonably want threshold logic, partial-failure tolerance, or all sorts of custom handling once you open that door.
I am happy to drop that parameter entirely. The part I would like to keep is the logging: right now, if every record comes back with `success=False`, the task shows green and produces no output other than the raw XCom value. Just logging a warning that says how many records failed and what the Salesforce error codes were seems clearly useful without adding any policy footprint. Would you be OK with a version that removes `raise_on_failures` and only adds the failure logging?
I am OK with logging.
I suggest you check the source code of
BigQueryInsertJobOperator. There is an error-handling mechanism there that resubmits the job in case of error. Maybe, if that makes sense, you can implement a similar approach? While it may not be exactly what you asked for, it may reduce the possibility of error rows, and this is a feature that benefits everyone.
@eladkal Thanks for pointing me there. I had a look at BigQueryInsertJobOperator and the key difference I noticed is that it retries at the job level, specifically resubmitting the entire job when it hits a rate-limit error from the API. Salesforce Bulk errors are different in that they live at the record level inside a successful API response, so the retry target would be a subset of the original payload rather than the whole request.
That said, the idea makes sense for certain Salesforce error codes that are genuinely transient, things like UNABLE_TO_LOCK_ROW or API_TEMPORARILY_UNAVAILABLE. For those, re-submitting just the failed records after a short delay has a reasonable chance of succeeding. Errors like INVALID_FIELD or REQUIRED_FIELD_MISSING are obviously not worth retrying.
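A hypothetical helper for that record-level retry targeting, bucketing failed results into retryable and permanent by status code (the helper name and payload shape are assumptions, not code from this PR; the default codes are the transient ones mentioned above):

```python
# Transient codes as discussed above; everything else here is hypothetical.
TRANSIENT_CODES = frozenset({"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"})

def split_retryable(payload, results, transient_codes=TRANSIENT_CODES):
    """Pair each submitted record with its result and bucket the failures."""
    retryable, permanent = [], []
    for record, result in zip(payload, results):
        if result.get("success"):
            continue
        codes = {e.get("statusCode") for e in result.get("errors", [])}
        # Retry only if at least one error on the record is transient.
        (retryable if codes & transient_codes else permanent).append(record)
    return retryable, permanent

payload = [{"Name": "a"}, {"Name": "b"}, {"Name": "c"}]
results = [
    {"success": True, "errors": []},
    {"success": False, "errors": [{"statusCode": "UNABLE_TO_LOCK_ROW"}]},
    {"success": False, "errors": [{"statusCode": "INVALID_FIELD"}]},
]
retryable, permanent = split_retryable(payload, results)
```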
Would you want me to add that to this PR, or keep this one focused on the logging and open a follow-up for the retry behaviour? I am happy either way, just want to make sure I do not make this PR harder to review by expanding its scope without checking first.
If you wish to keep the logging part separate, you can keep this PR to the logging code and set the feature change for another PR.
@eladkal Sounds good, I will raise a separate PR for the retry mechanism. The current PR already has only the logging part after the earlier update, so no further changes needed here.
Whenever you get a chance, would really appreciate a review on this one. No rush at all, take your time.
Per maintainer feedback, `raise_on_failures` adds a policy knob that opens the door to threshold logic and other customisations that are better handled in user code. Removed it. The failure *logging* is retained: when any record comes back with `success=False` the operator now emits a structured warning with the Salesforce status code, error message, and affected field names, which were previously completely invisible unless the caller inspected XCom.
The follow-up PR for the retry mechanism is now up: #64575. It adds `max_retries`, `retry_delay`, and `transient_error_codes` params. When `max_retries > 0`, records that fail with a transient Salesforce error (`UNABLE_TO_LOCK_ROW` or `API_TEMPORARILY_UNAVAILABLE` by default) are re-submitted after `retry_delay` seconds, up to `max_retries` times. Only the failed records are re-submitted, not the entire payload. Related to apache#64519
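The re-submit loop described there could be sketched roughly as follows (a standalone sketch with a fake `submit` callable standing in for the Bulk API call; none of this is the actual follow-up PR code):

```python
import time

def run_with_retries(submit, payload, max_retries=0, retry_delay=1.0,
                     transient_codes=frozenset({"UNABLE_TO_LOCK_ROW",
                                                "API_TEMPORARILY_UNAVAILABLE"})):
    # submit(records) -> list of result dicts, one per record, in order.
    results = list(submit(payload))
    # Indices of records whose failure is transient and worth retrying.
    pending = [i for i, r in enumerate(results)
               if not r.get("success")
               and {e.get("statusCode") for e in r.get("errors", [])} & transient_codes]
    for _ in range(max_retries):
        if not pending:
            break
        time.sleep(retry_delay)
        # Re-submit only the failed records, not the entire payload.
        retried = list(submit([payload[i] for i in pending]))
        still_pending = []
        for i, r in zip(pending, retried):
            results[i] = r
            if not r.get("success") and \
                    {e.get("statusCode") for e in r.get("errors", [])} & transient_codes:
                still_pending.append(i)
        pending = still_pending
    return results

# Fake Bulk endpoint: one record fails transiently on the first call only.
calls = {"n": 0}
def submit(records):
    calls["n"] += 1
    if calls["n"] == 1:
        return [{"success": True, "errors": []},
                {"success": False, "errors": [{"statusCode": "UNABLE_TO_LOCK_ROW"}]}]
    return [{"success": True, "errors": []} for _ in records]

out = run_with_retries(submit, [{"Name": "a"}, {"Name": "b"}],
                       max_retries=2, retry_delay=0)
```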
@nagasrisai This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection — it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush — take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.
Pull request overview
Add visibility into per-record Salesforce Bulk API failures by logging failed results after each bulk operation, and adjust execute() to materialize results for single-pass iteration / optional XCom return.
Changes:
- Added `_log_result_failures()` to warn on `success=False` records (and currently info-log on full success).
- Materialized Bulk API result iterables into a list before logging / returning via XCom.
- Added unit tests around logging behavior and XCom return behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py | Logs record-level failures and materializes results before optional XCom return. |
| providers/salesforce/tests/unit/salesforce/operators/test_bulk.py | Adds tests intended to validate warnings/no-warnings and XCom return semantics. |
    with pytest.raises(Exception):
        pass

    result = operator.execute(context={})
    assert result is None
The test names/docstrings claim warnings are emitted (or not emitted), but the tests currently don’t assert on logs at all, and with pytest.raises(Exception): pass is a no-op that doesn’t validate behavior. Use caplog (or assertLogs) to assert that warnings are produced for failed records (including statusCode/message) and that no warnings are produced on full success; remove the no-op pytest.raises block.
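Outside pytest, the assertion pattern `caplog` provides can be sketched with a plain logging handler (the function and logger names here are illustrative, not the operator's code):

```python
import logging

def emit_failure_warnings(results, logger):
    # Stand-in for the operator's failure logging: one warning per error.
    for r in results:
        if not r.get("success"):
            for e in r.get("errors", []):
                logger.warning("statusCode=%s message=%s",
                               e.get("statusCode"), e.get("message"))

class ListHandler(logging.Handler):
    """Collects emitted records so tests can assert on them."""
    def __init__(self):
        super().__init__()
        self.records = []
    def emit(self, record):
        self.records.append(record)

logger = logging.getLogger("test_bulk")
handler = ListHandler()
logger.addHandler(handler)

# Failed record -> a warning mentioning the status code is captured.
emit_failure_warnings(
    [{"success": False, "errors": [{"statusCode": "INVALID_FIELD", "message": "bad"}]}],
    logger,
)
captured = [h.getMessage() for h in handler.records]

# Full success -> nothing is captured.
handler.records.clear()
emit_failure_warnings([{"success": True}], logger)
clean = list(handler.records)
```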
        failed = [r for r in result if not r.get("success")]
        total = len(result)

        if failed:
            self.log.warning(
                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
                self.operation,
                self.object_name,
                len(failed),
                total,
            )
            for idx, record in enumerate(failed):
                for error in record.get("errors", []):
                    self.log.warning(
                        "Record failure %d — status: %s | message: %s | fields: %s",
The logged idx is the index within the filtered failed list, not the original record index in the result/payload. This can mislead debugging (e.g., the first failure logs as 0 even if it was record 57). Iterate with enumerate(result) and log the original index for each failure (or include both original index and a failure counter).
Suggested change:

    total = len(result)
    failed_indices = [idx for idx, r in enumerate(result) if not r.get("success")]
    if failed_indices:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            len(failed_indices),
            total,
        )
        for idx in failed_indices:
            record = result[idx]
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure at result index %d — status: %s | message: %s | fields: %s",
        else:
            self.log.info(
                "Salesforce Bulk API %s on %s completed: %d record(s) processed successfully.",
                self.operation,
                self.object_name,
                total,
            )
PR description says the change is limited to making failures visible via warnings (no new policy/behavior beyond that). This adds an info log line on full success, which is additional behavior and may increase log volume. Either update the PR description to reflect this, or change this branch to be silent / debug-level to align with the stated scope.
        failed = [r for r in result if not r.get("success")]
        total = len(result)

        if failed:
            self.log.warning(
                "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
                self.operation,
                self.object_name,
                len(failed),
                total,
            )
            for idx, record in enumerate(failed):
                for error in record.get("errors", []):
                    self.log.warning(
                        "Record failure %d — status: %s | message: %s | fields: %s",
                        idx,
                        error.get("statusCode"),
                        error.get("message"),
                        error.get("fields"),
                    )
This creates an additional list (failed) that can be sizable (default batch sizes can be large), effectively duplicating references and increasing memory churn. Consider iterating result once, counting failures and logging per failure as you go (or collecting only minimal data needed for the summary) to avoid allocating a second list.
Suggested change:

    total = len(result)
    failed_count = 0
    for record in result:
        if not record.get("success"):
            for error in record.get("errors", []):
                self.log.warning(
                    "Record failure %d — status: %s | message: %s | fields: %s",
                    failed_count,
                    error.get("statusCode"),
                    error.get("message"),
                    error.get("fields"),
                )
            failed_count += 1
    if failed_count:
        self.log.warning(
            "Salesforce Bulk API %s on %s: %d/%d record(s) failed.",
            self.operation,
            self.object_name,
            failed_count,
            total,
        )
I ran into this while going through #63930. The Salesforce Bulk API does not throw exceptions when individual records fail. Instead it returns a list of results where failed records have `success=False` and an `errors` field with details. The operator was never looking at that list, so a task could process a thousand records, have every single one rejected by Salesforce, and still show green in Airflow. You would only find out something went wrong by digging into the XCom value yourself.

This adds `_log_result_failures()`, which runs after every operation and emits a structured warning for each failed record with the Salesforce status code, error message, and field names. That is the full extent of the change — no new parameters, no policy decisions, just making failures visible in the task log where they belong.

The result iterable from simple-salesforce is also materialised into a list before the check so we only iterate it once, which is consistent with how callers already consume it via XCom.

Closes #63930
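For reference, the per-record result shape that the new method inspects looks roughly like this (the ID and error values are illustrative):

```python
# One entry per submitted record; failures carry success=False plus details.
result = [
    {"success": True, "created": True, "id": "001XXXXXXXXXXXXXXX", "errors": []},
    {
        "success": False,
        "created": False,
        "id": None,
        "errors": [
            {
                "statusCode": "REQUIRED_FIELD_MISSING",
                "message": "Required fields are missing: [Name]",
                "fields": ["Name"],
            }
        ],
    },
]

# The logging pass simply filters on the success flag.
failed = [r for r in result if not r["success"]]
```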