Attempt cancellation of pending futures and cap workers by file count in GCSTimeSpanFileTransformOperator#64511
Open
SameerMesiah97 wants to merge 1 commit intoapache:mainfrom
Conversation
…ror` and cap worker count to number of files in `GCSTimeSpanFileTransformOperator`. This avoids scheduling unnecessary work during failures and prevents over-provisioning threads for small batches. Existing failure semantics are preserved (`*_continue_on_fail` unchanged). Updated tests to validate cancellation behaviour and worker cap.
76358d3 to
4ded30c
Compare
Contributor
|
hi there, thanks for the PR! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This change is a follow-up to PR #62196, which introduced parallel download and upload support in
GCSTimeSpanFileTransformOperator. It adds best-effort cancellation of pending futures when aGoogleCloudErroroccurs during parallel execution and caps the number of worker threads to the number of files being processed. When*_continue_on_fail=False, the operator still raises on the first failure, but now also callscancel()on submitted futures to prevent queued tasks from starting, reducing unnecessary work in systemic failure scenarios.Rationale
Previously, the operator raised on failure but allowed all queued tasks to proceed, and could create more worker threads than necessary for small batches. In high-volume workloads, this could result in unnecessary GCS API calls after a failure was already known, while in low-volume workloads it led to inefficient thread usage. Cancelling pending futures reduces wasted work and capping workers ensures more efficient resource utilisation, while preserving existing behaviour and keeping failure semantics unchanged.
Tests
cancel()is invoked when*_continue_on_fail=False(and not invoked when*_continue_on_fail=True).as_completedmocking; system tests are not required as external service interaction remains unchanged.Backwards Compatibility
No change in behaviour; task failure semantics remain unchanged.