
Attempt cancellation of pending futures and cap workers by file count in GCSTimeSpanFileTransformOperator#64511

Open
SameerMesiah97 wants to merge 1 commit into apache:main from SameerMesiah97:GCSTimeSpanFileTransformOperator-Parallel-UL-DL-Fix

@SameerMesiah97
Contributor

@SameerMesiah97 SameerMesiah97 commented Mar 30, 2026

Description

This change is a follow-up to PR #62196, which introduced parallel download and upload support in GCSTimeSpanFileTransformOperator. It adds best-effort cancellation of pending futures when a GoogleCloudError occurs during parallel execution and caps the number of worker threads to the number of files being processed. When *_continue_on_fail=False, the operator still raises on the first failure, but now also calls cancel() on submitted futures to prevent queued tasks from starting, reducing unnecessary work in systemic failure scenarios.
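The cancellation and worker-cap behaviour described above can be sketched roughly as follows. This is an illustrative sketch rather than the operator's actual code: `run_parallel`, `transfer`, and `TransferError` are hypothetical names, and `TransferError` stands in for `GoogleCloudError` so the example runs without any GCP libraries.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class TransferError(Exception):
    """Hypothetical stand-in for GoogleCloudError."""


def run_parallel(files, transfer, max_workers=8, continue_on_fail=False):
    """Apply transfer() to each file in parallel; return the names that succeeded."""
    if not files:
        return []
    # Cap the pool size: threads beyond the number of files would sit idle.
    workers = min(max_workers, len(files))
    done = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(transfer, name): name for name in files}
        for future in as_completed(futures):
            try:
                future.result()
                done.append(futures[future])
            except TransferError:
                if continue_on_fail:
                    continue
                # Best effort: cancel() only stops futures that have not yet
                # started; running or finished futures are left untouched.
                for pending in futures:
                    pending.cancel()
                raise
    return done
```

Because `Future.cancel()` returns `False` for a task that is already running, tasks in flight at the time of the failure still complete before the executor shuts down; only queued tasks are skipped.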

Rationale

Previously, the operator raised on failure but allowed all queued tasks to proceed, and could create more worker threads than necessary for small batches. In high-volume workloads, this could result in unnecessary GCS API calls after a failure was already known, while in low-volume workloads it led to inefficient thread usage. Cancelling pending futures reduces wasted work, and capping workers makes resource utilisation more efficient, while keeping failure semantics unchanged.

Tests

  • Updated existing parallel execution tests to assert that worker count is capped to the number of files and that cancel() is invoked when *_continue_on_fail=False (and not invoked when *_continue_on_fail=True).
  • Extended existing failure-path tests for both download and upload flows using controlled executor and as_completed mocking.
  • System tests are not required as external service interaction remains unchanged from the previous implementation.
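As a rough illustration of the mocking approach in the list above, the sketch below patches `ThreadPoolExecutor` and `as_completed` so that no threads are started. It is not taken from the PR's test suite: `transfer_all` and `check` are hypothetical helpers, and `RuntimeError` stands in for `GoogleCloudError`.

```python
from concurrent import futures
from unittest import mock


def transfer_all(files, transfer, max_workers, continue_on_fail):
    """Hypothetical stand-in for the operator's parallel transfer loop."""
    workers = min(max_workers, len(files))
    with futures.ThreadPoolExecutor(max_workers=workers) as pool:
        submitted = [pool.submit(transfer, name) for name in files]
        for future in futures.as_completed(submitted):
            try:
                future.result()
            except RuntimeError:  # stand-in for GoogleCloudError
                if not continue_on_fail:
                    for other in submitted:
                        other.cancel()
                    raise


def check(continue_on_fail):
    """Run transfer_all against mocked futures; report (workers, cancelled, raised)."""
    failed = mock.Mock()
    failed.result.side_effect = RuntimeError("boom")
    queued = mock.Mock()  # a future that never gets to run
    with mock.patch.object(futures, "ThreadPoolExecutor") as executor_cls:
        with mock.patch.object(futures, "as_completed", return_value=[failed]):
            pool = executor_cls.return_value.__enter__.return_value
            pool.submit.side_effect = [failed, queued]
            try:
                transfer_all(["a", "b"], mock.Mock(), 8, continue_on_fail)
                raised = False
            except RuntimeError:
                raised = True
    workers = executor_cls.call_args.kwargs["max_workers"]
    return workers, queued.cancel.called, raised
```

Calling `check(False)` and `check(True)` covers both branches: the first should report that `cancel()` was invoked and the error propagated, the second that neither happened, and in both cases the requested 8 workers are capped to the 2 files.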

Backwards Compatibility

No change to the operator's interface or task failure semantics; the only differences are the worker cap and the best-effort cancellation described above.

@SameerMesiah97 SameerMesiah97 requested a review from shahar1 as a code owner March 30, 2026 21:22
@boring-cyborg boring-cyborg bot added the area:providers and provider:google (Google (including GCP) related issues) labels Mar 30, 2026
…ror` and cap worker count to number of files in `GCSTimeSpanFileTransformOperator`. This avoids scheduling unnecessary work during failures and prevents over-provisioning threads for small batches. Existing failure semantics are preserved (`*_continue_on_fail` unchanged). Updated tests to validate cancellation behaviour and worker cap.
@SameerMesiah97 SameerMesiah97 force-pushed the GCSTimeSpanFileTransformOperator-Parallel-UL-DL-Fix branch from 76358d3 to 4ded30c Compare March 30, 2026 21:39
@SameerMesiah97 SameerMesiah97 changed the title from "Attempt cancellation of pending futures on failure in GCSTimeSpanFileTransformOperator" to "Attempt cancellation of pending futures and cap workers by file count in GCSTimeSpanFileTransformOperator" Mar 30, 2026
@VladaZakharova
Contributor

hi there, thanks for the PR!
Can you please upload the screenshot of green system tests?


2 participants