Add transient-error retry to SalesforceBulkOperator #64575
nagasrisai wants to merge 19 commits into apache:main
Introduces max_retries, retry_delay, and transient_error_codes params. When max_retries > 0, records that fail with a transient Salesforce error (UNABLE_TO_LOCK_ROW or API_TEMPORARILY_UNAVAILABLE by default) are re-submitted after retry_delay seconds, up to max_retries times. Only the failed records are re-submitted, not the entire payload. Related to apache#64519
@nagasrisai This PR has been converted to draft because it does not yet meet our Pull Request quality criteria. Issues found:
What to do next:
Converting a PR to draft is not a rejection; it is an invitation to bring the PR up to the project's standards so that maintainer review time is spent productively. There is no rush: take your time and work at your own pace. We appreciate your contribution and are happy to wait for updates. If you have questions, feel free to ask on the Airflow Slack.
Pull request overview
Adds built-in, record-level retry handling to SalesforceBulkOperator for transient Salesforce Bulk API failures (e.g., lock/contention errors) so DAG authors don’t need to manually re-submit failed records from XCom.
Changes:
- Introduces `max_retries`, `retry_delay`, and `transient_error_codes` parameters to control transient-error retries.
- Refactors bulk submission into `_run_operation()` and adds `_retry_transient_failures()` to re-submit only transient-failing records while preserving result ordering.
- Adds unit tests covering retry/no-retry behavior, delay handling, and custom transient error codes.
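To make the notion of a "transient-failing record" concrete, here is a minimal sketch of how per-record results might be classified. It is not the code from this PR: the helper names `is_transient_failure` and `transient_indices` are hypothetical, and the result layout (a `success` flag plus an `errors` list whose entries carry a `statusCode`) is an assumption about what the Bulk API client returns. Treating a record as transient when at least one of its errors matches is also an assumption.

```python
from typing import Any

# Default codes named in this PR's description.
DEFAULT_TRANSIENT_CODES = frozenset({"UNABLE_TO_LOCK_ROW", "API_TEMPORARILY_UNAVAILABLE"})


def is_transient_failure(result: dict[str, Any], transient_codes=DEFAULT_TRANSIENT_CODES) -> bool:
    """Return True if a failed record carries at least one transient status code.

    Assumed result layout: {"success": bool, "errors": [{"statusCode": str, ...}]}.
    """
    if result.get("success"):
        return False
    codes = {err.get("statusCode") for err in result.get("errors", [])}
    return bool(codes & set(transient_codes))


def transient_indices(results: list[dict[str, Any]], transient_codes=DEFAULT_TRANSIENT_CODES) -> list[int]:
    """Positions of the records that qualify for a retry, in input order."""
    return [i for i, r in enumerate(results) if is_transient_failure(r, transient_codes)]


# Illustrative results: one success, one lock error, one permanent error.
results = [
    {"success": True, "id": "001A", "errors": []},
    {"success": False, "id": None, "errors": [{"statusCode": "UNABLE_TO_LOCK_ROW", "message": "locked"}]},
    {"success": False, "id": None, "errors": [{"statusCode": "INVALID_FIELD", "message": "bad field"}]},
]
```

Keeping the indices rather than the records themselves is what lets retried results be written back to their original positions later.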
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| providers/salesforce/src/airflow/providers/salesforce/operators/bulk.py | Adds retry configuration, refactors operation execution, and implements transient record-level retries. |
| providers/salesforce/tests/unit/salesforce/operators/test_bulk_retry.py | Adds unit tests validating transient retry behavior and result placement. |
…rator.retry_delay (timedelta)
Follow-up to #64519, as suggested by eladkal, adding retry support for transient Salesforce Bulk errors.
The issue is that Salesforce Bulk API errors come back at the record level, not as exceptions. So when Salesforce is under concurrent write load and returns `UNABLE_TO_LOCK_ROW` on a few records, the operator just returns those as `success=False` entries with no way to recover automatically. Your only option today is to pull the XCom, filter it yourself, build a retry payload, and call the operator again from the DAG.
I added three optional parameters to handle this inside the operator: `max_retries` controls how many times to attempt a retry (defaults to 0, so nothing changes for existing users), `retry_delay` is how long to wait between attempts, and `transient_error_codes` is the set of Salesforce status codes that qualify for retry, defaulting to `UNABLE_TO_LOCK_ROW` and `API_TEMPORARILY_UNAVAILABLE`.
On each retry pass only the failing records are re-submitted, not the whole payload. The results slot back into their original positions since Salesforce guarantees the response order matches the input order. Permanent errors like `INVALID_FIELD` are not in the default set and will never be retried.
Closes #64519
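The retry pass described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the operator's actual implementation: `retry_transient_failures`, the `submit` callable, and the injectable `sleep` are hypothetical names, `retry_delay` is taken as plain seconds, and results are assumed to be dicts with a `success` flag and an `errors` list containing `statusCode` entries.

```python
import time
from typing import Any, Callable

Record = dict[str, Any]
Result = dict[str, Any]


def retry_transient_failures(
    submit: Callable[[list[Record]], list[Result]],  # hypothetical: sends records, returns results in input order
    payload: list[Record],
    results: list[Result],
    max_retries: int,
    retry_delay: float,
    transient_error_codes: frozenset[str],
    sleep: Callable[[float], None] = time.sleep,  # injectable so tests need not wait
) -> list[Result]:
    """Re-submit only transient failures and slot their new results back by index."""
    final = list(results)  # copy so the caller's first-pass results are untouched
    for _ in range(max_retries):
        # Indices of records that failed with at least one transient status code.
        pending = [
            i
            for i, r in enumerate(final)
            if not r.get("success")
            and any(e.get("statusCode") in transient_error_codes for e in r.get("errors", []))
        ]
        if not pending:
            break  # nothing left that is worth retrying
        sleep(retry_delay)
        retried = submit([payload[i] for i in pending])
        # Relies on the response order matching the submitted order.
        for i, new_result in zip(pending, retried):
            final[i] = new_result
    return final
```

With `max_retries=0` the loop body never runs and the first-pass results come back unchanged, which matches the stated default of no behavior change for existing users.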