[MaterializedView] Harden partition-state engine #18642
…partition-state centralization

This change consolidates two materialized-view bug fixes and two refactors that together close the gap between the broker's freshness contract and the actual MV ingestion state, and remove duplicated CAS / fingerprint logic that had drifted across the executor, scheduler, and consistency manager.

1. bugfix: Prevent watermarkMs from advancing past latest source data

The MV scheduler previously kept advancing watermarkMs toward `now - bufferMs` even when the base table had stopped receiving new data, which caused the rewrite engine to treat stale MV data as fresh. watermarkMs is now capped at the latest endTime present in the source segments, so the staleness check `now - watermarkMs <= stalenessThresholdMs` reflects real data freshness instead of wall-clock drift.

2. bugfix: Persist VALID-empty on DELETE so backfilled buckets re-sync

When a STALE bucket's source data was retention-deleted, the executor's DELETE branch used to remove the runtime PartitionInfo entry entirely. That left the bucket in the `absent` state, which the consistency manager's STALE-marking pass deliberately skips, so any subsequent base-table backfill into that window would never propagate to the MV.

Fix: the DELETE branch now writes `VALID + PartitionFingerprint.EMPTY` instead of removing the entry. `absent` is now reserved for cold-start only; once any task has run for a bucket, an entry exists. A backfill into a previously-empty bucket flows through the standard `VALID -> STALE -> OVERWRITE` cycle the consistency manager already drives.

PartitionFingerprint.EMPTY is byte-identical to what the scheduler's and executor's `computeWindowFingerprint` already produce when the overlapping segment list is empty (both feed an empty input through farmHashFingerprint64). This keeps empty-by-DELETE and empty-by-APPEND on a single representation, so existing ZK records remain comparable across rolling upgrades.

3. refactor: Centralize computeWindowFingerprint in MaterializedViewTaskUtils

The scheduler and the minion executor each carried a verbatim copy of the filter+sort+farmHash64 algorithm used to fingerprint the source segments overlapping a partition window. Any drift between the two copies (different hasher, encoding, or sort key) would silently break the executor's commit-time fingerprint validation, producing one of the hardest classes of MV bugs to diagnose: tasks that fail validation on the way in, retry forever, and never advance the watermark.

The algorithm now lives in MaterializedViewTaskUtils#computeWindowFingerprint as the single source of truth. Both the scheduler call sites and the executor call site invoke the utility directly with `_context.getSegmentsZKMetadata` inlined at the call site - no per-module wrapper survives, so there is nowhere for a future contributor to silently re-implement it.

4. refactor: Centralize all partition-state mutations through MaterializedViewPartitionManager

Adds a state-change DSL backed by a single CAS engine that consolidates every per-partition mutation on the materialized-view runtime znode. The public methods (appendValid / refreshValid / clearValid / revertValid / markStale / deletePartition) map one-to-one to the per-partition state machine; private CRUD primitives enforce structural invariants on the in-memory partition map; applyMutation centralizes version-checked CAS retries with two retry profiles (critical, cluster-tunable; revert, fixed budget).

The executor (APPEND -> appendValid, OVERWRITE -> refreshValid, DELETE -> clearValid), scheduler (false-positive STALE revert -> revertValid), and consistency manager (segment-change flush -> markStale) are switched onto the manager. Watermark advancement on APPEND now happens inside the manager's mutator under the same atomic write that adds the bucket entry, so map state and watermark cannot diverge under concurrent writers.

The PartitionInfo constructor is locked to package access to encode the contract at compile time: production code outside the metadata package physically cannot synthesize a PartitionInfo without going through the manager. Cross-package tests use a single named factory `PartitionInfo.forTesting(...)` annotated `@VisibleForTesting`.

Three bespoke CAS retry loops (DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS in the executor, MAX_PARTITION_STATE_PERSIST_RETRIES in the scheduler, MAX_MARK_RETRIES + jittered backoff in the consistency manager) and their associated ThreadLocalRandom imports are removed; retry budget is now governed by a single CLUSTER_CONFIG_KEY_MAX_RUNTIME_UPDATE_ATTEMPTS cap.

Co-authored-by: Cursor <cursoragent@cursor.com>
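The version-checked CAS retry that item 4 attributes to `applyMutation` can be pictured with a small in-memory sketch. Everything here is a hypothetical stand-in rather than code from this change: `CasSketch`, `Snapshot`, `Store`, and `writeIfVersion` simulate the versioned runtime znode, the `maxAttempts` parameter stands in for the cluster-tunable retry budget, and the rule that the watermark moves to the end of the appended bucket is an assumption made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of a version-checked CAS retry loop; not the PR's code. */
final class CasSketch {

  /** Immutable view of the runtime record: partition map, watermark, and version. */
  static final class Snapshot {
    final Map<Long, String> partitions; // bucket start (ms) -> "VALID" or "STALE"
    final long watermarkMs;
    final int version;

    Snapshot(Map<Long, String> partitions, long watermarkMs, int version) {
      this.partitions = Map.copyOf(partitions);
      this.watermarkMs = watermarkMs;
      this.version = version;
    }
  }

  /** In-memory stand-in for the versioned runtime znode. */
  static final class Store {
    private Snapshot current = new Snapshot(Map.of(), 0L, 0);

    synchronized Snapshot read() {
      return current;
    }

    /** Writes only if the stored version still equals expectedVersion. */
    synchronized boolean writeIfVersion(int expectedVersion, Snapshot next) {
      if (current.version != expectedVersion) {
        return false; // another writer got there first
      }
      current = new Snapshot(next.partitions, next.watermarkMs, expectedVersion + 1);
      return true;
    }
  }

  /** Read, mutate a copy, conditionally write; retry on conflict up to maxAttempts. */
  static boolean applyMutation(Store store, int maxAttempts, UnaryOperator<Snapshot> mutator) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Snapshot before = store.read();
      Snapshot after = mutator.apply(before);
      if (store.writeIfVersion(before.version, after)) {
        return true;
      }
    }
    return false; // retry budget exhausted
  }

  /** APPEND: insert the bucket as VALID and advance the watermark in the same write. */
  static boolean appendValid(Store store, long bucketStartMs, long bucketEndMs, int maxAttempts) {
    return applyMutation(store, maxAttempts, snap -> {
      Map<Long, String> copy = new TreeMap<>(snap.partitions);
      copy.put(bucketStartMs, "VALID");
      // Assumption: the watermark moves to the end of the appended bucket and never regresses.
      return new Snapshot(copy, Math.max(snap.watermarkMs, bucketEndMs), snap.version);
    });
  }

  public static void main(String[] args) {
    Store store = new Store();
    boolean ok = appendValid(store, 0L, 3_600_000L, 3);
    Snapshot s = store.read();
    System.out.println(ok + " " + s.partitions + " watermarkMs=" + s.watermarkMs + " v=" + s.version);
  }
}
```

Because the bucket insert and the watermark update travel in one `Snapshot`, a conflicting writer forces both to be recomputed together, which is the property the commit message relies on.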
Codecov Report
@@ Coverage Diff @@
## master #18642 +/- ##
============================================
+ Coverage 56.79% 64.44% +7.65%
- Complexity 7 1277 +1270
============================================
Files 2579 3363 +784
Lines 149557 207955 +58398
Branches 24165 32470 +8305
============================================
+ Hits 84939 134021 +49082
- Misses 57424 63156 +5732
- Partials 7194 10778 +3584
xiangfu0
left a comment
Found one high-signal correctness issue; see inline comment.
// see `MaterializedViewPartitionManager#clearValid` for the design rationale (no
// fingerprint validation needed; the source contained zero overlapping segments by
// construction at scheduler dispatch time).
partitionManager.clearValid(tableName, windowStartMs);
This removes the commit-time source check from the DELETE path. If a backfill lands after the scheduler emitted DELETE but before postProcess() runs, the create-segment event is ignored because the partition is already STALE; then clearValid() flips it to VALID + EMPTY and the broker will rewrite queries to an empty MV bucket even though source rows now exist. The old behavior of removing the entry still fell back to base-table routing, so this is a silent wrong-results regression. DELETE needs a commit-time fingerprint/emptiness validation (or another way to preserve STALE on mid-flight backfills) before writing VALID + EMPTY.
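One way to picture the guard being asked for: re-check the source at commit time and write `VALID + EMPTY` only if the window is still empty, otherwise leave the bucket `STALE` so a later cycle can run an `OVERWRITE`. The sketch below is hypothetical throughout; `DeleteCommitGuard`, `commitDelete`, and the `currentOverlappingSegmentCount` input are illustrative names, not code from this PR, and the real check might compare fingerprints instead of a count.

```java
/** Hypothetical commit-time guard for the DELETE path; names are illustrative only. */
final class DeleteCommitGuard {

  enum State { VALID, STALE }

  /**
   * Decides the state to persist when a DELETE task commits.
   * currentOverlappingSegmentCount is assumed to be re-read from the base table
   * at commit time, not taken from the scheduler's dispatch-time view.
   */
  static State commitDelete(State current, int currentOverlappingSegmentCount) {
    if (current != State.STALE) {
      throw new IllegalStateException("DELETE expects a STALE bucket, found " + current);
    }
    // Source still empty: safe to record VALID with the empty fingerprint.
    // Source has rows again (mid-flight backfill): keep STALE so the bucket is rebuilt.
    return currentOverlappingSegmentCount == 0 ? State.VALID : State.STALE;
  }

  public static void main(String[] args) {
    System.out.println(commitDelete(State.STALE, 0)); // prints VALID
    System.out.println(commitDelete(State.STALE, 2)); // prints STALE
  }
}
```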
Description
This PR hardens the materialized-view partition-state engine in three areas: it fixes two correctness bugs that let stale data look fresh and let backfilled buckets silently fail to re-sync; it consolidates every per-partition mutation behind a single CAS engine so future state transitions cannot drift across the executor, scheduler, and consistency manager; and it centralizes the window-fingerprint algorithm that those three components use to agree on "what the source looked like at task time".
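The "stale data looks fresh" bug is easiest to see with numbers. The sketch below assumes the capped watermark is simply the smaller of `now - bufferMs` and the latest source `endTime`; the class and method names (`WatermarkSketch`, `cappedWatermark`, `isFresh`) are hypothetical, and the real scheduler may additionally align values to bucket boundaries.

```java
/** Hypothetical illustration of the watermark cap and the broker's staleness check. */
final class WatermarkSketch {

  /** Watermark candidate, capped at the latest endTime seen in the source segments. */
  static long cappedWatermark(long nowMs, long bufferMs, long latestSourceEndTimeMs) {
    return Math.min(nowMs - bufferMs, latestSourceEndTimeMs);
  }

  /** The freshness check quoted in this PR: now - watermarkMs <= stalenessThresholdMs. */
  static boolean isFresh(long nowMs, long watermarkMs, long stalenessThresholdMs) {
    return nowMs - watermarkMs <= stalenessThresholdMs;
  }

  public static void main(String[] args) {
    long now = 10_000_000L;
    long buffer = 60_000L;
    long threshold = 300_000L;
    long latestSourceEnd = 9_000_000L; // base table stopped receiving data 1,000,000 ms ago

    long uncapped = now - buffer;                                // 9,940,000
    long capped = cappedWatermark(now, buffer, latestSourceEnd); // 9,000,000

    System.out.println(isFresh(now, uncapped, threshold)); // true: 60,000 <= 300,000
    System.out.println(isFresh(now, capped, threshold));   // false: 1,000,000 > 300,000
  }
}
```

With the uncapped value the check passes even though no source data has arrived for far longer than the threshold; with the cap the same check fails.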
Design reference
The full partition-state machine — state definitions, operations, allowed transitions, and how the consistency manager keeps the partition map in sync with base-table segment changes — is documented here, with the core tables inlined below for reviewer convenience:
Partition state definitions

| State | Definition |
| --- | --- |
| `VACANT` | No `PartitionInfo` entry exists for this bucket key in the partition map. The bucket has not been recorded by the MV's per-partition state machine. |
| `VALID` | `PartitionInfo` entry exists in the partition map with `state = VALID` and a `PartitionFingerprint` (segment count + farm-hash CRC, or `PartitionFingerprint.EMPTY`). |
| `STALE` | `PartitionInfo` entry exists in the partition map with `state = STALE`. The MV's recorded fingerprint is known (or assumed) to no longer match the current base-table source for this bucket, typically because `ConsistencyMgr` observed a base-table change. |

Operation definitions
| Source | Operation | Transition | Description |
| --- | --- | --- | --- |
| MinionTaskCycle | `MinionTaskCycle_APPEND` | `VACANT → VALID` | … `>= watermarkMs`); advances `watermarkMs`; fingerprint is computed over overlapping base-table segments (or `PartitionFingerprint.EMPTY` if the source window has no overlapping segments). |
| | `MinionTaskCycle_OVERWRITE` | `STALE → VALID` | `STALE` bucket whose source still has data; re-materializes MV segments via segment-lineage replace; `watermarkMs` unchanged; fingerprint = recomputed value. |
| | `MinionTaskCycle_DELETE` | `STALE → VALID` | `STALE` bucket whose source has been retention-deleted (`segmentCount == 0`); drops MV segments via segment-lineage replace (`segmentsTo = []`); `watermarkMs` unchanged; fingerprint = `PartitionFingerprint.EMPTY`. The entry is intentionally not removed from the map: keeping it as `VALID-empty` eliminates `VACANT` as a runtime state for processed windows so a later backfill flips through the standard `VALID → STALE → OVERWRITE` cycle. |
| ConsistencyMgr | `ConsistencyMgr_MARK_STALE` | `* → STALE` | `ConsistencyMgr`. A base-table segment add / update / delete event is run through a filter pipeline (pre-coverage check, post-coverage / watermark cap, existence + state check); surviving events drive this op. After the bug fix it covers both `VALID → STALE` and in-coverage `VACANT → STALE` (synthesizing a fresh `STALE` entry). |
| Manual | `Manual_MARK_STALE` | `* → STALE` | `REFRESH MV` family of admin commands: `REFRESH_MV` (whole MV), `REFRESH_MV_RANGE` (time range, with optional coverage extension below `min(partKey)`), and `REFRESH_MV_PARTITION` (single bucket). All three variants have identical per-partition semantics; they differ only in selection scope (which buckets are targeted and whether `VACANT` buckets are synthesized). |
| | `Manual_DROP_MV` | `* → VACANT` | `DROP MATERIALIZED VIEW`. Atomically deletes the runtime znode (and the table config / schema). Bypasses the per-partition state machine: partitions are not first marked `STALE`; the entire entry tree disappears in a single ZooKeeper delete. |

State transition table
| From / To | `VACANT` | `VALID` | `STALE` |
| --- | --- | --- | --- |
| `VACANT` | | `MinionTaskCycle_APPEND` | `ConsistencyMgr_MARK_STALE`, `Manual_MARK_STALE` |
| `VALID` | `Manual_DROP_MV` | | `ConsistencyMgr_MARK_STALE`, `Manual_MARK_STALE` |
| `STALE` | `Manual_DROP_MV` | `MinionTaskCycle_OVERWRITE`, `MinionTaskCycle_DELETE` | |

Sync with base-table change
The full filter pipeline (pre-coverage, post-coverage / watermark cap, existence + state check) that `ConsistencyMgr` applies to base-segment add / update / delete events, including how it decides which buckets to enumerate and which transitions to emit, lives in the linked design doc.

This PR is the first implementation pass that fully matches that document; the post-merge state machine has a single owner (`MaterializedViewPartitionManager`) and one test surface that exercises every transition in the table above.

Why introduce `MaterializedViewPartitionManager`

Before this PR the per-partition mutation logic was spread across three sites (the minion executor, the controller-side task scheduler, and the consistency manager), each shipping its own ~50-line CAS-write retry loop with its own retry budget, its own jittered backoff, and its own hand-rolled `Map<Long, PartitionInfo>` copy/edit code:

- Minion executor: `APPEND` / `OVERWRITE` / `DELETE` task commit, `DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS`
- Task scheduler: `STALE` revert, `MAX_PARTITION_STATE_PERSIST_RETRIES` (8, fixed)
- Consistency manager: `MAX_MARK_RETRIES` + `ThreadLocalRandom` backoff

Concrete problems this caused:
- Watermark advancement on `APPEND` was a separate ZK write from the bucket insert in some refactor proposals; under concurrent writers this opens a window where the partition map and `watermarkMs` disagree, which in turn poisons the broker's `now - watermarkMs <= stalenessThresholdMs` check.
- … `existing.getState() == STALE` before `OVERWRITE`).
- … `PartitionInfo` objects directly, bypassing every invariant the manager would otherwise enforce.

The new design replaces all three loops with one CAS engine (`MaterializedViewPartitionManager#applyMutation`), exposes one public method per documented operation (`appendValid` / `refreshValid` / `clearValid` / `revertValid` / `markStale` / `deletePartition`), and bundles watermark advancement into the same atomic write that mutates the bucket. The `PartitionInfo` constructor is locked to package access so production code outside the metadata package physically cannot bypass the manager; tests opt in via a single named factory `PartitionInfo.forTesting(...)` annotated `@VisibleForTesting`.

Net effect: one retry budget, one backoff implementation, one set of preconditions, one place to add the next state. Future transitions (`VACANT → STALE` synthesize for in-coverage backfill, an explicit `VALID-empty` state, etc.) become a one-method addition with predictable behavior, not a three-site coordination exercise.

Bug fixes that landed alongside the refactor
Two separate correctness bugs were fixed as part of this pass — both touch the same partition-state machinery the manager now owns, and both now have regression tests in the manager's unit-test surface:
1. Watermark over-advance past latest source data. The MV scheduler kept advancing `watermarkMs` toward `now - bufferMs` even when the base table had stopped receiving new data, which made the broker's freshness check `now - watermarkMs <= stalenessThresholdMs` return `true` for stale MV data. `watermarkMs` is now capped at the latest `endTime` present in the source segments, so the staleness check reflects real data freshness instead of wall-clock drift.

2. Backfill into a previously-deleted bucket silently dropped. When a `STALE` bucket's source data was retention-deleted, the executor's `DELETE` branch removed the runtime `PartitionInfo` entry entirely. That left the bucket in the `absent` (`VACANT`) state, which the consistency manager's `markStale` pass deliberately skips, so any subsequent base-table backfill into that window never propagated to the MV. The `DELETE` branch now writes `VALID + PartitionFingerprint.EMPTY` instead, so the bucket follows the standard `VALID → STALE → OVERWRITE` cycle on backfill. `PartitionFingerprint.EMPTY` is byte-identical to what `computeWindowFingerprint` already produces when the overlapping segment list is empty (both feed an empty input through `farmHashFingerprint64`), so existing ZK records remain comparable across rolling upgrades and no migration is required.

Other clean-ups bundled in
Two verbatim copies of the filter + sort + `farmHashFingerprint64` window-fingerprint algorithm (one in the scheduler, one in the executor) are collapsed into `MaterializedViewTaskUtils#computeWindowFingerprint`. Drift between the two copies would silently break the executor's commit-time fingerprint validation and produce tasks that retry forever without advancing the watermark, one of the worst classes of MV bugs to triage.

Backward compatibility / rolling upgrade
- … `PartitionFingerprint.EMPTY` being byte-identical to the empty-overlap fingerprint, so znodes written by the old executor remain comparable to znodes written by the new one.
- `PartitionInfo` constructor is now package-private; the only production callers were already inside the `metadata` package. Tests use the new `PartitionInfo.forTesting(...)` factory.

Tests
- `MaterializedViewPartitionManagerTest` exercises every public method, including CAS retry / version-conflict paths and precondition violations.
- … `PartitionInfo.forTesting` factory.
- … `airlineStats_mv_hardening_smoke` with `1m` refresh and exercised the segment-delete → STALE → executor-DELETE → backfill path; verified that the `VALID-empty` write keeps the bucket re-syncing on backfill, and that the watermark stops advancing once base-table ingestion stalls.

Suggested labels
bugfix, refactor