Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.materializedview.metadata;

import com.google.common.hash.Hashing;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,29 @@
public class PartitionFingerprint {
private static final char SEPARATOR = ',';

/// Canonical fingerprint for an "empty" partition window — no base segments overlap.
///
/// The `crcChecksum` field is initialised by running the same `farmHashFingerprint64`
/// hasher with no input bytes, which is byte-identical to what
/// `MaterializedViewTaskUtils#computeWindowFingerprint` (the single source of truth used
/// by both the scheduler and the minion executor) produces when the overlapping segment
/// list is empty. Two consequences:
///
/// - Existing ZK records written by the APPEND-empty path (carrying
/// `(0, farmHashFingerprint64(""))`) are byte-equal to this constant, so `equals`
/// comparisons against [#EMPTY] continue to behave correctly across rolling upgrades.
///
/// - The DELETE task executor uses [#EMPTY] when it persists a `VALID + empty`
/// PartitionInfo after retention-deleting the source data. Reusing the same value the
/// APPEND-empty path naturally produces avoids introducing a second representation of
/// "empty fingerprint" that would silently fail equality checks.
///
/// `farmHashFingerprint64("")` is a deterministic non-zero constant; never use
/// `new PartitionFingerprint(0, 0L)` as a stand-in for "empty" — the two values do NOT
/// compare equal.
public static final PartitionFingerprint EMPTY =
new PartitionFingerprint(0, Hashing.farmHashFingerprint64().newHasher().hash().asLong());

/// Number of base table segments whose time range overlaps this partition window.
private final int _segmentCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.materializedview.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -40,6 +41,12 @@
/// unlike the prior packed `"V,10,5000,1700006400000"` string format.
///
/// Thread-safety: instances are immutable after construction.
///
/// Construction discipline: production callers MUST go through
/// [org.apache.pinot.materializedview.metadata.MaterializedViewPartitionManager] —
/// the constructor is package-private so every state transition is funnelled through
/// the manager's CAS engine. Tests in other packages use [#forTesting] for fixture
/// setup; the dedicated name keeps any production-side misuse obvious in code review.
public class PartitionInfo {
private static final String STATE_KEY = "state";
private static final String SEGMENT_COUNT_KEY = "segmentCount";
Expand All @@ -50,12 +57,21 @@ public class PartitionInfo {
private final PartitionFingerprint _fingerprint;
private final long _lastRefreshTime;

public PartitionInfo(PartitionState state, PartitionFingerprint fingerprint, long lastRefreshTime) {
PartitionInfo(PartitionState state, PartitionFingerprint fingerprint, long lastRefreshTime) {
_state = state;
_fingerprint = fingerprint;
_lastRefreshTime = lastRefreshTime;
}

/// Builds a [PartitionInfo] directly from its three components, for use as a test
/// fixture by tests that live outside this package.
///
/// This simply forwards to the constructor; no validation or state-transition logic is
/// applied. It is marked `@VisibleForTesting` and is not meant for production callers.
///
/// @param state lifecycle state to store on the fixture
/// @param fingerprint fingerprint to store on the fixture
/// @param lastRefreshTime last-refresh timestamp to store on the fixture
/// @return a new instance holding exactly the supplied values
@VisibleForTesting
public static PartitionInfo forTesting(PartitionState state, PartitionFingerprint fingerprint,
    long lastRefreshTime) {
  final PartitionInfo fixture = new PartitionInfo(state, fingerprint, lastRefreshTime);
  return fixture;
}

/// Returns the [PartitionState] this instance was constructed with.
public PartitionState getState() {
  return this._state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@

/// State of a materialized partition in the MV lifecycle.
///
/// - `VALID` – partition is up-to-date with base table data.
/// - `STALE` – base table data has changed since last materialization; partition
/// needs OVERWRITE.
/// - `VALID` – partition is up-to-date with base table data. When the base table window
/// is empty (no overlapping segments), the entry carries
/// [PartitionFingerprint#EMPTY] and is still `VALID`; the broker treats those buckets
/// as "covered by the MV with no rows".
/// - `STALE` – base table data has changed since the partition was last materialized;
/// the scheduler will dispatch a recompute (OVERWRITE), or DELETE if the source
/// window is now empty.
///
/// Partition expiration is modeled by **absence** from the runtime metadata's
/// partition map, not as a separate state. The DELETE task path removes the
/// map entry; the broker then treats that bucket as "not covered by the MV"
/// and routes those queries to the base table.
/// `absent` (no entry in the partition map) is reserved for cold-start, before the first
/// APPEND has run for that bucket. After the first run of any task type — APPEND with
/// data, APPEND with empty source, OVERWRITE, or DELETE — the bucket carries an entry
/// (`VALID` with a real fingerprint, or `VALID` with [PartitionFingerprint#EMPTY]).
/// The DELETE task explicitly does NOT remove the entry; it rewrites it to
/// `VALID + PartitionFingerprint.EMPTY` so subsequent backfills into the now-empty
/// window flow through the standard `VALID → STALE → OVERWRITE` cycle.
///
/// Encoded as a single character (`"V"` / `"S"`) for compact ZK storage.
public enum PartitionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
Expand All @@ -45,6 +40,7 @@
import org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadata;
import org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadata.MaterializedViewSplitSpec;
import org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadataUtils;
import org.apache.pinot.materializedview.metadata.MaterializedViewPartitionManager;
import org.apache.pinot.materializedview.metadata.MaterializedViewRuntimeMetadata;
import org.apache.pinot.materializedview.metadata.MaterializedViewRuntimeMetadataUtils;
import org.apache.pinot.materializedview.metadata.PartitionFingerprint;
Expand Down Expand Up @@ -262,7 +258,31 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
// bucketTimePeriod (operator changed config mid-flight); the partition map is keyed by
// aligned starts, so a misaligned nextWindowStartMs would miss VALID-skip lookups.
long nextWindowStartMs = Math.floorDiv(maxInFlightAppendWindowEndMs, bucketMs) * bucketMs;
long cutoffMs = System.currentTimeMillis() - bufferMs;

// Resolve the APPEND cutoff in two stages so steady-state cycles do not pay a ZK list
// call. Stage 1: rough cutoff = `now - bufferMs`. If the next window already lies past
// it, no scheduling work is possible regardless of source state, so we fall through to
// the loop where the first iteration breaks immediately — and the source-segment
// fetch below is skipped. Stage 2: when stage 1 admits at least one candidate bucket,
// fetch source segments once and tighten the cutoff by `max(source segment endTimeMs)`.
// The tighter cap matters because without it the scheduler would walk past the actual
// data tail (each empty bucket completes with zero rows, executor persists a VALID-empty
// partition + advances watermark), so `watermarkMs` would drift toward `now - bufferMs`
// even when source ingestion has stalled — defeating the staleness-threshold contract
// the broker uses in `now - watermarkMs <= stalenessThresholdMs`. See
// `MaterializedViewQueryRewriteEngine#isEligible`.
long roughCutoffMs = System.currentTimeMillis() - bufferMs;
long cutoffMs;
if (nextWindowStartMs + bucketMs > roughCutoffMs) {
// No bucket can fit before the rough cutoff. Skip the ZK fetch — the loop below
// will break on its first iteration and the caught-up log will fire.
cutoffMs = roughCutoffMs;
} else {
long maxSourceEndMs =
computeMaxSourceEndTimeMs(_context.getSegmentsZKMetadata(sourceTableWithType));
cutoffMs =
(maxSourceEndMs == Long.MIN_VALUE) ? roughCutoffMs : Math.min(roughCutoffMs, maxSourceEndMs);
}
int scheduled = 0;

// Hard cap on iterations to defend against pathological partition maps. The loop
Expand Down Expand Up @@ -343,7 +363,8 @@ private PinotTaskConfig tryHandleStalePartition(String viewTableName, String sou
long windowEndMs = windowStartMs + bucketMs;
PartitionInfo staleInfo = partitionInfos.get(earliestStaleMs);

PartitionFingerprint currentFp = computeWindowFingerprint(sourceTableWithType, windowStartMs, windowEndMs);
PartitionFingerprint currentFp = MaterializedViewTaskUtils.computeWindowFingerprint(
_context.getSegmentsZKMetadata(sourceTableWithType), windowStartMs, windowEndMs);

if (currentFp.getSegmentCount() == 0) {
LOGGER.info("STALE partition [{}, {}) base data deleted for table: {}. Generating DELETE task.",
Expand All @@ -360,7 +381,18 @@ private PinotTaskConfig tryHandleStalePartition(String viewTableName, String sou
if (currentFp.equals(staleInfo.getFingerprint())) {
LOGGER.info("STALE partition [{}, {}) fingerprint matches for table: {}. "
+ "Reverting to VALID (false positive).", windowStartMs, windowEndMs, viewTableName);
persistPartitionStateChangeWithRetry(viewTableName, earliestStaleMs, PartitionState.VALID);
// Best-effort revert: if it fails (CAS budget exhausted, transient ZK error), the
// partition stays STALE and the next scheduling cycle either reverts again (if the
// fingerprint still matches) or generates an OVERWRITE task — either way the system
// self-heals. Spending the executor's critical-write retry budget on this avoidable
// optimization would be wasteful, so the manager uses its smaller `revert` profile.
try {
new MaterializedViewPartitionManager(_context.getPropertyStore(),
_context::getClusterConfig).revertValid(viewTableName, earliestStaleMs);
} catch (RuntimeException e) {
LOGGER.warn("Failed to revert STALE partition {} to VALID for MV table: {}; will retry "
+ "on the next scheduling cycle", earliestStaleMs, viewTableName, e);
}
return null;
}

Expand All @@ -371,82 +403,6 @@ private PinotTaskConfig tryHandleStalePartition(String viewTableName, String sou
effectiveLimit, userDeclaredLimit);
}

/// CAS-retry budget for STALE -> VALID transitions written by the scheduler.
/// The runtime znode is concurrently mutated by the executor (after each task completion) and by
/// the consistency manager (on base table changes). A bounded retry loop converges in practice.
private static final int MAX_PARTITION_STATE_PERSIST_RETRIES = 8;

/// Persists a STALE -> VALID transition (false-positive recovery) under a CAS retry loop.
/// On each attempt the latest runtime znode is re-fetched, the target partition's state is
/// re-evaluated, and the change is rewritten on top of the current version. This preserves
/// concurrent updates from the executor (watermark advance) and the consistency manager
/// (other partitions' STALE markings).
///
/// If the partition is no longer STALE on a retry (executor or consistency manager
/// already changed it), the method exits successfully — the desired transition is either
/// already done or no longer applicable to a stale view of the world.
///
/// If the budget is exhausted, logs ERROR. The next scheduling cycle will retry.
///
/// Other early exits, all logged and none rethrown: the runtime metadata is missing, the
/// persist call raises `IllegalStateException`, or the thread is interrupted during the
/// backoff sleep. Exceptions raised by the fetch itself are not caught here.
///
/// @param viewTableName MV table name; it is passed through
///     `TableNameBuilder.OFFLINE.tableNameWithType` before the runtime metadata lookup
/// @param partitionStartMs key of the target entry in the runtime metadata's partition map
/// @param newState state written onto the target partition, but only while that partition
///     is still `STALE` at fetch time
private void persistPartitionStateChangeWithRetry(String viewTableName, long partitionStartMs,
PartitionState newState) {
String viewTableWithType = TableNameBuilder.OFFLINE.tableNameWithType(viewTableName);
// Most recent persist failure; only used for the final ERROR log.
Exception lastException = null;
for (int attempt = 0; attempt < MAX_PARTITION_STATE_PERSIST_RETRIES; attempt++) {
// A fresh Stat per attempt: it is handed to the fetch and its version is then passed to
// persist, so each write is conditioned on the version read in the same iteration.
Stat stat = new Stat();
MaterializedViewRuntimeMetadata current = MaterializedViewRuntimeMetadataUtils.fetchWithVersion(
_context.getPropertyStore(), viewTableWithType, stat);
if (current == null) {
LOGGER.warn("Runtime metadata missing for MV table: {} during partition state persist; aborting",
viewTableName);
return;
}
Map<Long, PartitionInfo> currentInfos = current.getPartitions();
PartitionInfo info = currentInfos.get(partitionStartMs);
// Only a partition that is present AND still STALE is rewritten; anything else means
// another writer got there first (or the entry is gone), so there is nothing to do.
if (info == null || info.getState() != PartitionState.STALE) {
LOGGER.info("Partition {} for MV table: {} is no longer STALE on attempt {}; skipping persist",
partitionStartMs, viewTableName, attempt + 1);
return;
}
// Work on a copy so the map returned by getPartitions() is not modified.
Map<Long, PartitionInfo> updatedInfos = new HashMap<>(currentInfos);
updatedInfos.put(partitionStartMs, info.withState(newState));
// Table name and watermark are carried over from the freshly fetched record; only the
// partition map differs.
MaterializedViewRuntimeMetadata updated = new MaterializedViewRuntimeMetadata(
current.getMaterializedViewTableNameWithType(),
current.getWatermarkMs(),
updatedInfos);
try {
MaterializedViewRuntimeMetadataUtils.persist(_context.getPropertyStore(), updated, stat.getVersion());
LOGGER.info("Persisted partition {} state {} -> {} for MV table: {} on attempt {}",
partitionStartMs, PartitionState.STALE, newState, viewTableName, attempt + 1);
return;
} catch (IllegalStateException e) {
// Writer-side invariant violation surfaced by `validateForPersist` on the freshly-fetched
// runtime. Retrying will not change the underlying state — fail fast so the caller can
// surface the bug instead of burning the retry budget.
LOGGER.error("Aborting CAS retry for MV table {}: writer-side invariant violation "
+ "({}). Generator will not retry until the underlying runtime znode is fixed.",
viewTableName, e.getMessage());
return;
} catch (Exception e) {
// Every other failure is treated as retryable (logged as a CAS conflict at DEBUG).
lastException = e;
LOGGER.debug("CAS conflict on attempt {} persisting partition {} state for MV table: {}",
attempt + 1, partitionStartMs, viewTableName, e);
}
// Small jittered backoff so a tight CAS race doesn't burn the budget in microseconds
// and starve the competing writers — also gives transient ZK errors a chance to resolve.
// The sleep is 5 ms plus a random 0–19 ms, i.e. 5–24 ms per failed attempt.
try {
Thread.sleep(5L + ThreadLocalRandom.current().nextInt(20));
} catch (InterruptedException ie) {
// Restore the interrupt flag for the caller and stop retrying.
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted while persisting partition {} state for MV table: {}",
partitionStartMs, viewTableName);
return;
}
}
// Reached only when every attempt ended in the generic catch branch above.
LOGGER.error("Failed to persist partition {} state {} for MV table: {} after {} retries. "
+ "Generator will retry on next scheduling cycle. Last exception:",
partitionStartMs, newState, viewTableName, MAX_PARTITION_STATE_PERSIST_RETRIES, lastException);
}

/// Builds a complete [PinotTaskConfig] for either APPEND or OVERWRITE mode.
///
/// @param effectiveLimit pre-resolved LIMIT (user-declared, or
Expand All @@ -461,8 +417,8 @@ private PinotTaskConfig buildTaskConfig(String viewTableName, String sourceTable
boolean userDeclaredLimit) {
String taskType = MaterializedViewTask.TASK_TYPE;

PartitionFingerprint windowFingerprint =
computeWindowFingerprint(sourceTableWithType, windowStartMs, windowEndMs);
PartitionFingerprint windowFingerprint = MaterializedViewTaskUtils.computeWindowFingerprint(
_context.getSegmentsZKMetadata(sourceTableWithType), windowStartMs, windowEndMs);

// The source time column may use any DateTimeFieldSpec format (TIMESTAMP, INT-days, etc.).
// Convert the window boundaries to the source's native unit so the appended WHERE filter
Expand Down Expand Up @@ -989,43 +945,28 @@ private String resolveSourceTableNameWithType(String rawSourceTableName) {
return sourceTableWithType;
}

/// Convenience overload: looks up the source table's segment ZK metadata through
/// `_context` and delegates to the list-based `computeWindowFingerprint` overload.
///
/// @param sourceTableWithType source table name passed to `_context.getSegmentsZKMetadata`
/// @param windowStartMs window start, forwarded unchanged to the list-based overload
/// @param windowEndMs window end, forwarded unchanged to the list-based overload
/// @return the fingerprint produced by the list-based overload for the fetched segments
private PartitionFingerprint computeWindowFingerprint(String sourceTableWithType,
    long windowStartMs, long windowEndMs) {
  List<SegmentZKMetadata> sourceSegments = _context.getSegmentsZKMetadata(sourceTableWithType);
  return computeWindowFingerprint(sourceSegments, windowStartMs, windowEndMs);
}

/// Computes a [PartitionFingerprint] for the given time window from pre-fetched
/// segment metadata.
/// Returns the maximum valid `endTimeMs` across all source segments, or [Long#MIN_VALUE]
/// when none of the segments carry a usable end time (empty list, or every segment has a
/// negative end time — the legacy "unset" marker `SegmentZKMetadata` returns for old
/// znodes without `TIME_UNIT`). Used by [#generateTasks] to cap the APPEND cutoff at the
/// latest known source data: without this cap, the scheduler would walk past the actual
/// data tail one bucket at a time (each empty bucket completes with zero rows, the
/// executor persists a VALID-empty partition + advances watermark), so `watermarkMs`
/// would drift toward `now - bufferMs` even when source ingestion has stalled — defeating
/// the staleness-threshold contract in the rewrite engine, where
/// `now - watermarkMs <= stalenessThresholdMs` is meant to reflect real data freshness.
///
/// The fingerprint is `Hashing.farmHashFingerprint64` over the sorted concatenation of
/// `<segmentName>\0<crc>\n` lines. Sorting makes the hash insensitive to listing order;
/// FarmHash64 is non-cryptographic but collision-resistant for non-adversarial inputs.
/// Replaces a previous XOR-CRC scheme that exhibited cancellation collisions (swap two
/// segments with the same combined contribution → identical fingerprint).
private PartitionFingerprint computeWindowFingerprint(List<SegmentZKMetadata> allSegments,
long windowStartMs, long windowEndMs) {
List<SegmentZKMetadata> overlapping = new ArrayList<>();
for (SegmentZKMetadata seg : allSegments) {
long segStartMs = seg.getStartTimeMs();
long segEndMs = seg.getEndTimeMs();
if (segStartMs < windowEndMs && segEndMs >= windowStartMs) {
overlapping.add(seg);
/// The `endMs >= 0` filter mirrors the cold-start scan in [#getWatermarkMs], which
/// already excludes negative `startTimeMs` for the same reason.
@VisibleForTesting
static long computeMaxSourceEndTimeMs(List<SegmentZKMetadata> segments) {
long maxEndMs = Long.MIN_VALUE;
for (SegmentZKMetadata seg : segments) {
long endMs = seg.getEndTimeMs();
if (endMs >= 0 && endMs > maxEndMs) {
maxEndMs = endMs;
}
}
overlapping.sort(Comparator.comparing(SegmentZKMetadata::getSegmentName));

Hasher hasher = Hashing.farmHashFingerprint64().newHasher();
for (SegmentZKMetadata seg : overlapping) {
hasher.putString(seg.getSegmentName(), StandardCharsets.UTF_8);
hasher.putByte((byte) 0);
hasher.putLong(seg.getCrc());
hasher.putByte((byte) '\n');
}
long crcFingerprint = hasher.hash().asLong();
LOGGER.info("Computed partition fingerprint for window [{}, {}): segmentCount={}, crcFingerprint={}",
windowStartMs, windowEndMs, overlapping.size(), crcFingerprint);
return new PartitionFingerprint(overlapping.size(), crcFingerprint);
return maxEndMs;
}
}
Loading
Loading