[Refactor] Introduce a build-report lifecycle handle for hash-join partitions #22622

@kosiew

Background and Motivation

#21666 addressed correctness around scheduled-vs-delivered reporting and drop-time cancellation. Lifecycle logic is still distributed across multiple components, which makes code review and future edits harder.

Coordination remains spread across:

  • HashJoinStream transition logic
  • lazy OnceFut execution/polling
  • Drop cancellation behavior
  • shared bounds/accumulator terminal state

This distribution increases cognitive overhead and regression risk during maintenance.

Problem Statement

The stream-level state machine still carries too much lifecycle responsibility. Transition ownership is documented, but it is spread across several types.

For architectural clarity, a single component should own the lifecycle transitions and their invariants.

Goals

  • Centralize build-report lifecycle decisions in one abstraction.
  • Preserve exactly one terminal outcome per partition report:
    • Delivered
    • Canceled
    • Finalized/no-op
  • Make drop-time behavior deterministic and self-documenting.
  • Reduce lifecycle-related branching in HashJoinStream.
  • Improve local reasoning for future contributors.

Non-Goals

  • Re-opening already-fixed correctness bugs as part of this issue.
  • Redesigning hash-join dynamic filtering behavior.
  • Broad changes to join planner/optimizer behavior.
  • Performance tuning outside lifecycle maintainability.

Proposed Design

Introduce a dedicated lifecycle type, for example:

  • BuildReportHandle
  • BuildReportLifecycle
  • PartitionReportToken

Responsibilities:

  1. Track explicit state (NotReported, Scheduled, Delivered, Canceled, Finalized).
  2. Own transitions (schedule, mark_delivered, cancel_if_pending, finalize).
  3. Ensure the terminal transition happens exactly once.
  4. Provide drop-safe default behavior (cancel_if_pending) when dropped without delivery.
  5. Expose a minimal API to the stream state machine so callers cannot bypass invariants.
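
To make the responsibilities above concrete, here is a minimal sketch of what such a handle could look like. It is not the existing DataFusion code: the `ReportState` enum, the `cancellations` counter (a stand-in for the shared bounds/accumulator bookkeeping), and the exact source state of each transition are assumptions for illustration. In particular, this sketch assumes `finalize` applies only when nothing was ever scheduled.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Lifecycle states named in the responsibilities list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportState {
    NotReported,
    Scheduled,
    Delivered,
    Canceled,
    Finalized,
}

/// Hypothetical handle. `cancellations` is an assumed stand-in for the
/// shared bounds/accumulator state, which is not modeled here.
pub struct BuildReportHandle {
    state: ReportState,
    cancellations: Arc<AtomicUsize>,
}

impl BuildReportHandle {
    pub fn new(cancellations: Arc<AtomicUsize>) -> Self {
        Self { state: ReportState::NotReported, cancellations }
    }

    pub fn state(&self) -> ReportState {
        self.state
    }

    /// NotReported -> Scheduled. Returns false if the transition does not apply.
    pub fn schedule(&mut self) -> bool {
        self.transition(ReportState::NotReported, ReportState::Scheduled)
    }

    /// Scheduled -> Delivered. Callers invoke this only after the waiter succeeds.
    pub fn mark_delivered(&mut self) -> bool {
        self.transition(ReportState::Scheduled, ReportState::Delivered)
    }

    /// Scheduled -> Canceled. A no-op in every other state.
    pub fn cancel_if_pending(&mut self) -> bool {
        let changed = self.transition(ReportState::Scheduled, ReportState::Canceled);
        if changed {
            // Side effect happens once, only on the real transition.
            self.cancellations.fetch_add(1, Ordering::SeqCst);
        }
        changed
    }

    /// NotReported -> Finalized (assumption: nothing was ever scheduled).
    pub fn finalize(&mut self) -> bool {
        self.transition(ReportState::NotReported, ReportState::Finalized)
    }

    /// Single choke point: a transition fires only from its expected source
    /// state, so a terminal state can never be overwritten.
    fn transition(&mut self, from: ReportState, to: ReportState) -> bool {
        if self.state == from {
            self.state = to;
            true
        } else {
            false
        }
    }
}

impl Drop for BuildReportHandle {
    fn drop(&mut self) {
        // Drop-safe default: cancel a report that was scheduled but never delivered.
        self.cancel_if_pending();
    }
}
```

Because every public method funnels through `transition`, repeated terminal calls return `false` and leave both the state and the counter untouched.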

Potential integration pattern:

  • transition_after_build_collected() obtains or updates the handle's state.
  • wait_for_partition_bounds_report() marks Delivered only after successful completion.
  • The Drop path delegates to the handle's cancel_if_pending.
  • Shared bounds/accumulator interactions occur through handle entry points rather than direct bool checks.

This refactor must preserve current behavior and test outcomes.

API and Invariants

Enforce with assertions/tests:

  • A report cannot be both Delivered and Canceled.
  • Delivered may only occur after successful waiter completion.
  • Drop on Scheduled must attempt cancellation.
  • Repeated terminal operations are idempotent and side-effect free.
  • Transition intent is encoded in API shape, not scattered call-site conventions.
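
One way to encode these invariants in the API shape is to make delivery consume the handle, so the compiler rules out a later cancel. The sketch below is an illustration under assumptions, not a proposal for the final signatures: the `Outcome` enum, the `complete` method, and the `Rc<Cell<...>>` outcome sink are all hypothetical names.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Terminal outcomes observed by the rest of the system (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Delivered,
    Canceled,
}

/// Token for a report that is scheduled but not yet resolved.
pub struct PartitionReportToken {
    // Assumed sink that records the single terminal outcome.
    outcome: Rc<Cell<Option<Outcome>>>,
}

impl PartitionReportToken {
    /// Creating the token is the "schedule" step.
    pub fn schedule(outcome: Rc<Cell<Option<Outcome>>>) -> Self {
        Self { outcome }
    }

    /// Consumes the token. Delivered is recorded only when the waiter
    /// succeeded; on error the token is dropped unresolved and Drop cancels.
    pub fn complete<E>(self, waiter_result: Result<(), E>) -> Result<(), E> {
        if waiter_result.is_ok() {
            self.outcome.set(Some(Outcome::Delivered));
        }
        waiter_result
        // `self` is dropped here; Drop is a no-op if an outcome is already set.
    }
}

impl Drop for PartitionReportToken {
    fn drop(&mut self) {
        // Drop on a still-pending report attempts cancellation exactly once.
        if self.outcome.get().is_none() {
            self.outcome.set(Some(Outcome::Canceled));
        }
    }
}
```

Since `complete` takes `self` by value, a call site cannot deliver and then cancel the same report, and a report that is dropped early or whose waiter fails ends up `Canceled` without any call-site convention.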

References

  • Related area:
    • datafusion/physical-plan/src/joins/hash_join/stream.rs
    • datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
