Skip to content

[MaterializedView] Harden partition-state engine#18642

Open
hongkunxu wants to merge 1 commit into
apache:masterfrom
hongkunxu:feat/sse_mv_hardening
Open

[MaterializedView] Harden partition-state engine#18642
hongkunxu wants to merge 1 commit into
apache:masterfrom
hongkunxu:feat/sse_mv_hardening

Conversation

@hongkunxu
Copy link
Copy Markdown
Contributor

@hongkunxu hongkunxu commented Jun 1, 2026

Description

This PR hardens the materialized-view partition-state engine in three areas: it fixes two correctness bugs that let stale data look fresh and let backfilled buckets silently fail to re-sync; it consolidates every per-partition mutation behind a single CAS engine so future state transitions cannot drift across the executor, scheduler, and consistency manager; and it centralizes the window-fingerprint algorithm that those three components use to agree on "what the source looked like at task time".

Design reference

The full partition-state machine — state definitions, operations, allowed transitions, and how the consistency manager keeps the partition map in sync with base-table segment changes — is documented here, with the core tables inlined below for reviewer convenience:

MV partition state engine — design notes (Google Doc)

Partition state definitions

State Description
VACANT No PartitionInfo entry exists for this bucket key in the partition map. The bucket has not been recorded by the MV's per-partition state machine.
VALID A PartitionInfo entry exists in the partition map with state = VALID and a PartitionFingerprint (segment count + farm-hash CRC, or PartitionFingerprint.EMPTY).
STALE A PartitionInfo entry exists in the partition map with state = STALE. The MV's recorded fingerprint is known (or assumed) to no longer match the current base-table source for this bucket — typically because ConsistencyMgr observed a base-table change.

Operation definitions

Trigger Operation Partition effect Notes
MinionTaskCycle MinionTaskCycle_APPEND VACANT → VALID Runs at the frontier (>= watermarkMs); advances watermarkMs; fingerprint is computed over overlapping base-table segments (or PartitionFingerprint.EMPTY if the source window has no overlapping segments).
MinionTaskCycle_OVERWRITE STALE → VALID In-coverage STALE bucket whose source still has data; re-materializes MV segments via segment-lineage replace; watermarkMs unchanged; fingerprint = recomputed value.
MinionTaskCycle_DELETE STALE → VALID In-coverage STALE bucket whose source has been retention-deleted (segmentCount == 0); drops MV segments via segment-lineage replace (segmentsTo = []); watermarkMs unchanged; fingerprint = PartitionFingerprint.EMPTY. The entry is intentionally not removed from the map — keeping it as VALID-empty eliminates VACANT as a runtime state for processed windows so a later backfill flips through the standard VALID → STALE → OVERWRITE cycle.
ConsistencyMgr ConsistencyMgr_MARK_STALE * → STALE Sole side effect of ConsistencyMgr. A base-table segment add / update / delete event is run through a filter pipeline (pre-coverage check, post-coverage / watermark cap, existence + state check); surviving events drive this op. After the bug fix it covers both VALID → STALE and in-coverage VACANT → STALE (synthesizing a fresh STALE entry).
Manual Manual_MARK_STALE * → STALE Shared op for the future REFRESH MV family of admin commands: REFRESH_MV (whole MV), REFRESH_MV_RANGE (time range, with optional coverage extension below min(partKey)), and REFRESH_MV_PARTITION (single bucket). All three variants have identical per-partition semantics — they differ only in selection scope (which buckets are targeted and whether VACANT buckets are synthesized).
Manual_DROP_MV * → VACANT DDL DROP MATERIALIZED VIEW. Atomically deletes the runtime znode (and the table config / schema). Bypasses the per-partition state machine — partitions are not first marked STALE; the entire entry tree disappears in a single ZooKeeper delete.

State transition table

From \ To VACANT VALID STALE
VACANT MinionTaskCycle_APPEND ConsistencyMgr_MARK_STALE, Manual_MARK_STALE
VALID Manual_DROP_MV ConsistencyMgr_MARK_STALE, Manual_MARK_STALE
STALE Manual_DROP_MV MinionTaskCycle_OVERWRITE, MinionTaskCycle_DELETE

Sync with base-table change

The full filter pipeline (pre-coverage, post-coverage / watermark cap, existence + state check) that ConsistencyMgr applies to base-segment add / update / delete events — including how it decides which buckets to enumerate and which transitions to emit — lives in the linked design doc.

This PR is the first implementation pass that fully matches that document; the post-merge state machine has a single owner (MaterializedViewPartitionManager) and one test surface that exercises every transition in the table above.

Why introduce MaterializedViewPartitionManager

Before this PR the per-partition mutation logic was spread across three sites — the minion executor, the controller-side task scheduler, and the consistency manager — each shipping its own ~50-line CAS-write retry loop with its own retry budget, its own jittered backoff, and its own hand-rolled Map<Long, PartitionInfo> copy/edit code:

Site Trigger Bespoke CAS loop
Executor APPEND / OVERWRITE / DELETE task commit DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS
Scheduler False-positive STALE revert MAX_PARTITION_STATE_PERSIST_RETRIES (8, fixed)
Consistency mgr Segment-change flush MAX_MARK_RETRIES + ThreadLocalRandom backoff

Concrete problems this caused:

  • Three retry/backoff policies that could (and did) drift apart whenever any one was tuned. Operators had no single knob to control "MV runtime znode contention pressure".
  • Watermark advancement on APPEND was a separate ZK write from the bucket insert in some refactor proposals; under concurrent writers this opens a window where the partition map and watermarkMs disagree, which in turn poisons the broker's now - watermarkMs <= stalenessThresholdMs check.
  • Adding a new state to the state machine required touching ~3 files with ~3 different idioms. Every new transition was an opportunity to forget a precondition check (e.g. existing.getState() == STALE before OVERWRITE).
  • Cross-package callers could synthesize arbitrary PartitionInfo objects directly, bypassing every invariant the manager would otherwise enforce.

The new design replaces all three loops with one CAS engine (MaterializedViewPartitionManager#applyMutation), exposes one public method per documented operation (appendValid / refreshValid / clearValid / revertValid / markStale / deletePartition), and bundles watermark advancement into the same atomic write that mutates the bucket. The PartitionInfo constructor is locked to package access so production code outside the metadata package physically cannot bypass the manager — tests opt in via a single named factory PartitionInfo.forTesting(...) annotated @VisibleForTesting.

Net effect: one retry budget, one backoff implementation, one set of preconditions, one place to add the next state. Future transitions (VACANT → STALE synthesize for in-coverage backfill, an explicit VALID-empty state, etc.) become a one-method addition with predictable behavior, not a three-site coordination exercise.

Bug fixes that landed alongside the refactor

Two separate correctness bugs were fixed as part of this pass — both touch the same partition-state machinery the manager now owns, and both now have regression tests in the manager's unit-test surface:

  1. Watermark over-advance past latest source data. The MV scheduler kept advancing watermarkMs toward now - bufferMs even when the base table had stopped receiving new data, which made the broker's freshness check now - watermarkMs <= stalenessThresholdMs return true for stale MV data. watermarkMs is now capped at the latest endTime present in the source segments, so the staleness check reflects real data freshness instead of wall-clock drift.

  2. Backfill into a previously-deleted bucket silently dropped. When a STALE bucket's source data was retention-deleted, the executor's DELETE branch removed the runtime PartitionInfo entry entirely. That left the bucket in the absent state, which the consistency manager's markStale pass deliberately skips, so any subsequent base-table backfill into that window never propagated to the MV. The DELETE branch now writes VALID + PartitionFingerprint.EMPTY instead, so the bucket follows the standard VALID → STALE → OVERWRITE cycle on backfill. PartitionFingerprint.EMPTY is byte-identical to what computeWindowFingerprint already produces when the overlapping segment list is empty (both feed an empty input through farmHashFingerprint64), so existing ZK records remain comparable across rolling upgrades — no migration required.

Other clean-ups bundled in

Two verbatim copies of the filter + sort + farmHashFingerprint64 window-fingerprint algorithm (one in the scheduler, one in the executor) are collapsed into MaterializedViewTaskUtils#computeWindowFingerprint. Drift between the two copies would silently break the executor's commit-time fingerprint validation and produce tasks that retry forever without advancing the watermark — one of the worst classes of MV bugs to triage.

Backward compatibility / rolling upgrade

  • ZK format unchanged. The DELETE-branch fix relies on PartitionFingerprint.EMPTY being byte-identical to the empty-overlap fingerprint, so znodes written by the old executor remain comparable to znodes written by the new one.
  • No public API or REST surface changes.
  • PartitionInfo constructor is now package-private; the only production callers were already inside the metadata package. Tests use the new PartitionInfo.forTesting(...) factory.

Tests

  • New: MaterializedViewPartitionManagerTest exercises every public method, including CAS retry / version-conflict paths and precondition violations.
  • Updated: scheduler / executor / consistency-manager / rewrite-engine tests rebased onto the manager and the PartitionInfo.forTesting factory.
  • Manual end-to-end validation against a local Pinot cluster: created airlineStats_mv_hardening_smoke with 1m refresh and exercised the segment-delete → STALE → executor-DELETE → backfill path; verified that the VALID-empty write keeps the bucket re-syncing on backfill, and that the watermark stops advancing once base-table ingestion stalls.

Suggested labels

bugfix, refactor

…partition-state centralization

This change consolidates two materialized-view bug fixes and two refactors
that together close the gap between the broker's freshness contract and
the actual MV ingestion state, and remove duplicated CAS / fingerprint
logic that had drifted across the executor, scheduler, and consistency
manager.

1. bugfix: Prevent watermarkMs from advancing past latest source data

   The MV scheduler previously kept advancing watermarkMs toward
   `now - bufferMs` even when the base table had stopped receiving new
   data, which caused the rewrite engine to treat stale MV data as
   fresh.  watermarkMs is now capped at the latest endTime present in
   the source segments, so the staleness check
   `now - watermarkMs <= stalenessThresholdMs` reflects real data
   freshness instead of wall-clock drift.

2. bugfix: Persist VALID-empty on DELETE so backfilled buckets re-sync

   When a STALE bucket's source data was retention-deleted, the
   executor's DELETE branch used to remove the runtime PartitionInfo
   entry entirely.  That left the bucket in the `absent` state, which
   the consistency manager's STALE-marking pass deliberately skips, so
   any subsequent base-table backfill into that window would never
   propagate to the MV.

   Fix: the DELETE branch now writes `VALID + PartitionFingerprint.EMPTY`
   instead of removing the entry.  `absent` is now reserved for
   cold-start only; once any task has run for a bucket, an entry exists.
   A backfill into a previously-empty bucket flows through the standard
   `VALID -> STALE -> OVERWRITE` cycle the consistency manager already
   drives.

   PartitionFingerprint.EMPTY is byte-identical to what the scheduler's
   and executor's `computeWindowFingerprint` already produce when the
   overlapping segment list is empty (both feed an empty input through
   farmHashFingerprint64).  This keeps empty-by-DELETE and
   empty-by-APPEND on a single representation, so existing ZK records
   remain comparable across rolling upgrades.

3. refactor: Centralize computeWindowFingerprint in MaterializedViewTaskUtils

   The scheduler and the minion executor each carried a verbatim copy
   of the filter+sort+farmHash64 algorithm used to fingerprint the
   source segments overlapping a partition window.  Any drift between
   the two copies (different hasher, encoding, or sort key) would
   silently break the executor's commit-time fingerprint validation,
   producing one of the hardest classes of MV bugs to diagnose: tasks
   that fail validation on the way in, retry forever, and never advance
   the watermark.

   The algorithm now lives in
   MaterializedViewTaskUtils#computeWindowFingerprint as the single
   source of truth.  Both the scheduler call sites and the executor
   call site invoke the utility directly with
   `_context.getSegmentsZKMetadata` inlined at the call site - no
   per-module wrapper survives, so there is nowhere for a future
   contributor to silently re-implement it.

4. refactor: Centralize all partition-state mutations through
   MaterializedViewPartitionManager

   Adds a state-change DSL backed by a single CAS engine that
   consolidates every per-partition mutation on the materialized-view
   runtime znode.  The public methods
   (appendValid / refreshValid / clearValid / revertValid / markStale /
   deletePartition) map one-to-one to the per-partition state machine;
   private CRUD primitives enforce structural invariants on the
   in-memory partition map; applyMutation centralizes version-checked
   CAS retries with two retry profiles (critical, cluster-tunable;
   revert, fixed budget).

   The executor (APPEND -> appendValid, OVERWRITE -> refreshValid,
   DELETE -> clearValid), scheduler (false-positive STALE revert ->
   revertValid), and consistency manager (segment-change flush ->
   markStale) are switched onto the manager.  Watermark advancement on
   APPEND now happens inside the manager's mutator under the same
   atomic write that adds the bucket entry, so map state and watermark
   cannot diverge under concurrent writers.

   The PartitionInfo constructor is locked to package access to encode
   the contract at compile time: production code outside the metadata
   package physically cannot synthesize a PartitionInfo without going
   through the manager.  Cross-package tests use a single named factory
   `PartitionInfo.forTesting(...)` annotated `@VisibleForTesting`.

   Three bespoke CAS retry loops (DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS in
   the executor, MAX_PARTITION_STATE_PERSIST_RETRIES in the scheduler,
   MAX_MARK_RETRIES + jittered backoff in the consistency manager) and
   their associated ThreadLocalRandom imports are removed; retry budget
   is now governed by a single CLUSTER_CONFIG_KEY_MAX_RUNTIME_UPDATE_ATTEMPTS
   cap.

Co-authored-by: Cursor <cursoragent@cursor.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 1, 2026

Codecov Report

❌ Patch coverage is 73.33333% with 64 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.44%. Comparing base (eddf4c0) to head (af88fd6).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
...iew/metadata/MaterializedViewPartitionManager.java 88.81% 7 Missing and 10 partials ⚠️
...onsistency/MaterializedViewConsistencyManager.java 50.00% 12 Missing and 4 partials ⚠️
...dview/scheduler/MaterializedViewTaskScheduler.java 27.27% 16 Missing ⚠️
...materializedview/MaterializedViewTaskExecutor.java 0.00% 14 Missing ⚠️
...lizedview/scheduler/MaterializedViewTaskUtils.java 93.75% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18642      +/-   ##
============================================
+ Coverage     56.79%   64.44%   +7.65%     
- Complexity        7     1277    +1270     
============================================
  Files          2579     3363     +784     
  Lines        149557   207955   +58398     
  Branches      24165    32470    +8305     
============================================
+ Hits          84939   134021   +49082     
- Misses        57424    63156    +5732     
- Partials       7194    10778    +3584     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.44% <73.33%> (+7.65%) ⬆️
temurin 64.44% <73.33%> (+7.65%) ⬆️
unittests 64.44% <73.33%> (+7.65%) ⬆️
unittests1 56.80% <ø> (+<0.01%) ⬆️
unittests2 37.19% <73.33%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal correctness issue; see inline comment.

// see `MaterializedViewPartitionManager#clearValid` for the design rationale (no
// fingerprint validation needed; the source contained zero overlapping segments by
// construction at scheduler dispatch time).
partitionManager.clearValid(tableName, windowStartMs);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes the commit-time source check from the DELETE path. If a backfill lands after the scheduler emitted DELETE but before postProcess() runs, the create-segment event is ignored because the partition is already STALE; then clearValid() flips it to VALID + EMPTY and the broker will rewrite queries to an empty MV bucket even though source rows now exist. The old behavior of removing the entry still fell back to base-table routing, so this is a silent wrong-results regression. DELETE needs a commit-time fingerprint/emptiness validation (or another way to preserve STALE on mid-flight backfills) before writing VALID + EMPTY.

@xiangfu0 xiangfu0 added bug Something is not working as expected refactor Code restructuring without changing behavior materialized-view labels Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something is not working as expected materialized-view refactor Code restructuring without changing behavior

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants