evolvedValueFilterMapping = new ConcurrentHashMap<>();
+ private final boolean pkTableLimitPushDownEnabled;
+
public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -88,7 +98,8 @@ public KeyValueFileStoreScan(
boolean deletionVectorsEnabled,
MergeEngine mergeEngine,
ChangelogProducer changelogProducer,
- boolean fileIndexReadEnabled) {
+ boolean fileIndexReadEnabled,
+ boolean pkTableLimitPushDownEnabled) {
super(
manifestsReader,
snapshotManager,
@@ -110,6 +121,7 @@ public KeyValueFileStoreScan(
this.mergeEngine = mergeEngine;
this.changelogProducer = changelogProducer;
this.fileIndexReadEnabled = fileIndexReadEnabled;
+ this.pkTableLimitPushDownEnabled = pkTableLimitPushDownEnabled;
}
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -203,6 +215,124 @@ private boolean isValueFilterEnabled() {
}
}
+ /**
+ * Checks whether limit pushdown is supported for primary key tables.
+ *
+ * <p>Not supported when the merge engine is PARTIAL_UPDATE or AGGREGATE (records still need to
+ * be merged) or when deletion vectors are enabled (deleted rows can't be counted). For
+ * DEDUPLICATE and FIRST_ROW, the per-bucket checks (no overlapping files, no delete rows) are
+ * performed in applyLimitPushdownForBucket.
+ */
+ @Override
+ public boolean supportsLimitPushManifestEntries() {
+ if (mergeEngine == PARTIAL_UPDATE
+ || mergeEngine == AGGREGATE
+ || !pkTableLimitPushDownEnabled) {
+ return false;
+ }
+
+ return limit != null && limit > 0 && !deletionVectorsEnabled;
+ }
+
+ /**
+ * Applies limit pushdown by grouping files by bucket and accumulating row counts until the
+ * limit is reached. For buckets where the limit can't safely be pushed down (overlapping files
+ * or delete rows), all files are included.
+ */
+ @Override
+ protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
+ long startTime = System.nanoTime();
+ List<ManifestEntry> allEntries = ListUtils.toList(entries);
+ Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(allEntries);
+
+ List<ManifestEntry> result = new ArrayList<>();
+ long accumulatedRowCount = 0;
+
+ for (List<ManifestEntry> bucketEntries : buckets.values()) {
+ if (accumulatedRowCount >= limit) {
+ break;
+ }
+
+ long remainingLimit = limit - accumulatedRowCount;
+ List<ManifestEntry> processedBucket =
+ applyLimitPushdownForBucket(bucketEntries, remainingLimit);
+ if (processedBucket == null) {
+ result.addAll(bucketEntries);
+ } else {
+ result.addAll(processedBucket);
+ for (ManifestEntry entry : processedBucket) {
+ long fileRowCount = entry.file().rowCount();
+ accumulatedRowCount += fileRowCount;
+ }
+ }
+ }
+
+ long duration = (System.nanoTime() - startTime) / 1_000_000;
+ LOG.info(
+ "Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
+ + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
+ duration,
+ limit,
+ allEntries.size(),
+ result.size(),
+ mergeEngine,
+ scanMode,
+ deletionVectorsEnabled);
+ return result.iterator();
+ }
+
+ /**
+ * Applies limit pushdown for a single bucket. Returns the files to include, or null if the
+ * pushdown is unsafe.
+ *
+ * <p>Returns null if the files overlap (LSM level 0 or different levels) or have delete rows.
+ * For non-overlapping files with no delete rows, accumulates row counts until the limit is
+ * reached.
+ *
+ * @param bucketEntries files in the same bucket
+ * @param limit the limit to apply
+ * @return files to include, or null if the limit can't safely be pushed down
+ */
+ @Nullable
+ private List<ManifestEntry> applyLimitPushdownForBucket(
+ List<ManifestEntry> bucketEntries, long limit) {
+ // Check if this bucket has overlapping files (LSM property)
+ boolean hasOverlapping = !noOverlapping(bucketEntries);
+
+ if (hasOverlapping) {
+ // For buckets with overlapping files, we can't safely push down the limit because the
+ // files need to be merged and the merged row count can't be calculated accurately.
+ return null;
+ }
+
+ // For buckets without overlapping files and with merge engines that don't require
+ // merging (DEDUPLICATE or FIRST_ROW), we can safely accumulate row counts and stop
+ // once the limit is reached, but only if the files have no delete rows.
+ List<ManifestEntry> result = new ArrayList<>();
+ long accumulatedRowCount = 0;
+
+ for (ManifestEntry entry : bucketEntries) {
+ long fileRowCount = entry.file().rowCount();
+ // Files with delete rows make the merged row count inaccurate, so check that first.
+ boolean hasDeleteRows =
+ entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+
+ if (hasDeleteRows) {
+ // The merged row count can't be calculated without reading the actual data,
+ // so the limit can't safely be pushed down for this bucket.
+ return null;
+ }
+
+ // The file has no delete rows, there is no overlapping, and the merge engine doesn't
+ // require merging, so it is safe to count its rows.
+ result.add(entry);
+ accumulatedRowCount += fileRowCount;
+ if (accumulatedRowCount >= limit) {
+ break;
+ }
+ }
+
+ return result;
+ }
+
@Override
protected boolean postFilterManifestEntriesEnabled() {
return valueFilter != null && scanMode == ScanMode.ALL;
@@ -214,15 +344,8 @@ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> file
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
- return files.stream()
- .collect(
- Collectors.groupingBy(
- // we use LinkedHashMap to avoid disorder
- file -> Pair.of(file.partition(), file.bucket()),
- LinkedHashMap::new,
- Collectors.toList()))
- .values()
- .stream()
+ Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
+ return buckets.values().stream()
.map(this::doFilterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -316,4 +439,19 @@ private static boolean noOverlapping(List<ManifestEntry> entries) {
return true;
}
+
+ /**
+ * Group manifest entries by (partition, bucket) while preserving order. This is a common
+ * operation used by both limitPushManifestEntries and postFilterManifestEntries.
+ */
+ private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
+ List<ManifestEntry> entries) {
+ return entries.stream()
+ .collect(
+ Collectors.groupingBy(
+ // we use LinkedHashMap to avoid disorder
+ file -> Pair.of(file.partition(), file.bucket()),
+ LinkedHashMap::new,
+ Collectors.toList()));
+ }
}
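For reference, the per-bucket selection added above is a greedy prefix scan: files of a safe bucket are kept in order, their row counts are summed, and the scan stops as soon as the running total reaches the limit; an unsafe bucket (overlapping files or delete rows) falls back to keeping every file. The standalone sketch below illustrates that behavior with plain row counts; BucketLimitSketch and filesNeeded are hypothetical names used only for illustration and are not part of this patch or of the Paimon API.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the greedy accumulation in applyLimitPushdownForBucket,
// using plain row counts instead of ManifestEntry objects.
public class BucketLimitSketch {

    // Returns how many leading files of the bucket are needed to cover the limit,
    // or -1 if the bucket is unsafe (overlapping files or files with delete rows).
    static int filesNeeded(
            List<Long> fileRowCounts, boolean overlapping, boolean hasDeleteRows, long limit) {
        if (overlapping || hasDeleteRows) {
            return -1; // fall back: the caller keeps every file in the bucket
        }
        long accumulated = 0;
        int kept = 0;
        for (long rowCount : fileRowCounts) {
            kept++;
            accumulated += rowCount;
            if (accumulated >= limit) {
                break; // enough rows to satisfy the limit
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> counts = Arrays.asList(50L, 50L, 50L);
        System.out.println(filesNeeded(counts, false, false, 10L));  // 1  (50 >= 10)
        System.out.println(filesNeeded(counts, false, false, 120L)); // 3  (50 + 50 < 120)
        System.out.println(filesNeeded(counts, true, false, 10L));   // -1 (unsafe bucket)
    }
}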
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 4419b3890ef5..41efbe25222a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -31,6 +31,9 @@
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -40,6 +43,8 @@
/** {@link TableScan} implementation for batch planning. */
public class DataTableBatchScan extends AbstractDataTableScan {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class);
+
private StartingScanner startingScanner;
private boolean hasNext;
@@ -134,6 +139,7 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
long scannedRowCount = 0;
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
List<DataSplit> splits = plan.dataSplits();
+ LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
if (splits.isEmpty()) {
return Optional.of(result);
}
@@ -147,6 +153,11 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
if (scannedRowCount >= pushDownLimit) {
SnapshotReader.Plan newPlan =
new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
+ LOG.info(
+ "Limit pushdown applied successfully. Original splits: {}, Limited splits: {}, Pushdown limit: {}",
+ splits.size(),
+ limitedSplits.size(),
+ pushDownLimit);
return Optional.of(new ScannedResult(newPlan));
}
}
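The logging added here sits around the existing split-trimming loop in applyPushDownLimit: splits are taken in order, their row counts are summed, and a reduced plan is emitted once the running total reaches the push-down limit; presumably the original plan is kept if the limit is never reached. A minimal sketch of that trimming under the same assumptions, with SplitTrimSketch and limitSplits as hypothetical names and plain row counts standing in for DataSplit objects:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the split trimming: keep the smallest prefix of splits whose
// summed row counts reach the push-down limit, otherwise keep all splits.
public class SplitTrimSketch {

    static List<Long> limitSplits(List<Long> splitRowCounts, long pushDownLimit) {
        List<Long> limited = new ArrayList<>();
        long scannedRowCount = 0;
        for (long rowCount : splitRowCounts) {
            limited.add(rowCount);
            scannedRowCount += rowCount;
            if (scannedRowCount >= pushDownLimit) {
                return limited; // a reduced plan already satisfies the limit
            }
        }
        return splitRowCounts; // limit not reached: keep the full plan
    }

    public static void main(String[] args) {
        List<Long> splits = Arrays.asList(3L, 3L, 3L);
        System.out.println(limitSplits(splits, 5L));   // [3, 3]
        System.out.println(limitSplits(splits, 100L)); // [3, 3, 3]
    }
}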
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index f904cde0819f..ba30fd7d7fdd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -778,6 +778,7 @@ public static class Builder {
private final TableSchema tableSchema;
private CoreOptions.ChangelogProducer changelogProducer;
+ private Boolean pkTableLimitPushDownEnabled;
public Builder(
String format,
@@ -800,6 +801,7 @@ public Builder(
this.tableSchema = tableSchema;
this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
+ this.pkTableLimitPushDownEnabled = null; // Use default from CoreOptions
}
public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer) {
@@ -807,6 +809,11 @@ public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer
return this;
}
+ public Builder pkTableLimitPushDownEnabled(boolean enabled) {
+ this.pkTableLimitPushDownEnabled = enabled;
+ return this;
+ }
+
public TestFileStore build() {
Options conf =
tableSchema == null ? new Options() : Options.fromMap(tableSchema.options());
@@ -829,6 +836,10 @@ public TestFileStore build() {
// disable dynamic-partition-overwrite in FileStoreCommit layer test
conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
+ if (pkTableLimitPushDownEnabled != null) {
+ conf.set(CoreOptions.PK_TABLE_LIMIT_PUSH_DOWN_ENABLED, pkTableLimitPushDownEnabled);
+ }
+
return new TestFileStore(
root,
new CoreOptions(conf),
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 4f3d5c1c24dd..66962f00fcee 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
@@ -30,9 +31,10 @@
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,7 +61,6 @@ public class KeyValueFileStoreScanTest {
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
private TestFileStore store;
- private SnapshotManager snapshotManager;
@BeforeEach
public void beforeEach() throws Exception {
@@ -75,8 +76,8 @@ public void beforeEach() throws Exception {
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
DeduplicateMergeFunction.factory(),
null)
+ .pkTableLimitPushDownEnabled(true)
.build();
- snapshotManager = store.snapshotManager();
SchemaManager schemaManager =
new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
@@ -271,6 +272,338 @@ public void testDropStatsInPlan() throws Exception {
}
}
+ @Test
+ public void testLimitPushdownWithoutValueFilter() throws Exception {
+ // Write multiple files to test limit pushdown
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(50);
+ Snapshot snapshot = writeData(data3);
+
+ // Without limit, should read all files
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+ assertThat(totalFiles).isGreaterThan(0);
+
+ // With limit, should read fewer files (limit pushdown should work)
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(10);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // Limit pushdown should reduce the number of files read
+ assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+ assertThat(filesWithLimit.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithValueFilter() throws Exception {
+ // Write data with different item values
+ List<KeyValue> data1 = generateData(50, 0, 100L);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50, 0, 200L);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(50, 0, 300L);
+ Snapshot snapshot = writeData(data3);
+
+ // Without valueFilter, limit pushdown should work
+ KeyValueFileStoreScan scanWithoutFilter = store.newScan();
+ scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10);
+ List<ManifestEntry> filesWithoutFilter = scanWithoutFilter.plan().files();
+ int totalFilesWithoutFilter = filesWithoutFilter.size();
+ assertThat(totalFilesWithoutFilter).isGreaterThan(0);
+
+ // With valueFilter, limit pushdown should still work: postFilterManifestEntries filters
+ // the files first, then limitPushManifestEntries operates on the filtered list. Using
+ // file.rowCount() as a conservative upper bound ensures we don't return fewer rows than
+ // the limit, even if we might read slightly more files.
+ KeyValueFileStoreScan scanWithFilter = store.newScan();
+ scanWithFilter.withSnapshot(snapshot.id());
+ scanWithFilter.withValueFilter(
+ new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
+ .between(4, 100L, 200L));
+ scanWithFilter.withLimit(10);
+ List<ManifestEntry> filesWithFilter = scanWithFilter.plan().files();
+
+ // Limit pushdown should work with valueFilter
+ // The number of files should be less than or equal to the total files after filtering
+ assertThat(filesWithFilter.size()).isGreaterThan(0);
+ assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter);
+ }
+
+ @Test
+ public void testLimitPushdownWithKeyFilter() throws Exception {
+ // Write data with different shop IDs
+ List<KeyValue> data = generateData(200);
+ Snapshot snapshot = writeData(data);
+
+ // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit
+ // pushdown)
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withKeyFilter(
+ new PredicateBuilder(RowType.of(new IntType(false)))
+ .equal(0, data.get(0).key().getInt(0)));
+ scan.withLimit(5);
+ List<ManifestEntry> files = scan.plan().files();
+ assertThat(files.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownMultipleBuckets() throws Exception {
+ // Write data to multiple buckets to test limit pushdown across buckets
+ List<KeyValue> data1 = generateData(30);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(30);
+ writeData(data2);
+ List<KeyValue> data3 = generateData(30);
+ Snapshot snapshot = writeData(data3);
+
+ // Without limit, should read all files
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+ assertThat(totalFiles).isGreaterThan(0);
+
+ // With limit, should read fewer files (limit pushdown should work across buckets)
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(20);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // Limit pushdown should reduce the number of files read
+ assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles);
+ assertThat(filesWithLimit.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithSmallLimit() throws Exception {
+ // Test limit pushdown with a very small limit
+ List<KeyValue> data1 = generateData(100);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(100);
+ writeData(data2);
+ Snapshot snapshot = writeData(data2);
+
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(1);
+ List<ManifestEntry> files = scan.plan().files();
+ // Should read at least one file, but fewer than all files
+ assertThat(files.size()).isGreaterThan(0);
+ }
+
+ @Test
+ public void testLimitPushdownWithLargeLimit() throws Exception {
+ // Test limit pushdown with a large limit (larger than total rows)
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2);
+
+ KeyValueFileStoreScan scanWithoutLimit = store.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ KeyValueFileStoreScan scanWithLimit = store.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000);
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // With a large limit, should read all files
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownDisabled() throws Exception {
+ // Test that limit pushdown is disabled when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is false
+ TestFileStore storeDisabled =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ null)
+ .pkTableLimitPushDownEnabled(false)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storeDisabled);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storeDisabled);
+
+ // Without limit, should read all files
+ KeyValueFileStoreScan scanWithoutLimit = storeDisabled.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+ assertThat(totalFiles).isGreaterThan(0);
+
+ // With limit but pushdown disabled, should read all files (no pushdown)
+ KeyValueFileStoreScan scanWithLimit = storeDisabled.newScan();
+ scanWithLimit.withSnapshot(snapshot.id()).withLimit(10);
+ assertThat(scanWithLimit.supportsLimitPushManifestEntries()).isFalse();
+ List<ManifestEntry> filesWithLimit = scanWithLimit.plan().files();
+ // Should read all files since limit pushdown is disabled
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithPartialUpdateMergeEngine() throws Exception {
+ // Test that limit pushdown is disabled for PARTIAL_UPDATE merge engine
+ // Create a store with PARTIAL_UPDATE merge engine by setting it in schema options
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(
+ CoreOptions.MERGE_ENGINE.key(),
+ CoreOptions.MergeEngine.PARTIAL_UPDATE.name()),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storePartialUpdate =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .pkTableLimitPushDownEnabled(true)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storePartialUpdate);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storePartialUpdate);
+
+ KeyValueFileStoreScan scan = storePartialUpdate.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(10);
+ // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+ // Should read all files since limit pushdown is disabled
+ KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ List<ManifestEntry> filesWithLimit = scan.plan().files();
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithAggregateMergeEngine() throws Exception {
+ // Test that limit pushdown is disabled for AGGREGATE merge engine
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(
+ CoreOptions.MERGE_ENGINE.key(),
+ CoreOptions.MergeEngine.AGGREGATE.name()),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storeAggregate =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .pkTableLimitPushDownEnabled(true)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storeAggregate);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storeAggregate);
+
+ KeyValueFileStoreScan scan = storeAggregate.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(10);
+ // supportsLimitPushManifestEntries should return false for AGGREGATE
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+ // Should read all files since limit pushdown is disabled
+ KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ List<ManifestEntry> filesWithLimit = scan.plan().files();
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
+ @Test
+ public void testLimitPushdownWithDeletionVectors() throws Exception {
+ // Test that limit pushdown is disabled when deletion vectors are enabled
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Schema schema =
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.singletonMap(
+ CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"),
+ null);
+ TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+
+ TestFileStore storeWithDV =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .pkTableLimitPushDownEnabled(true)
+ .build();
+
+ List<KeyValue> data1 = generateData(50);
+ writeData(data1, storeWithDV);
+ List<KeyValue> data2 = generateData(50);
+ Snapshot snapshot = writeData(data2, storeWithDV);
+
+ KeyValueFileStoreScan scan = storeWithDV.newScan();
+ scan.withSnapshot(snapshot.id()).withLimit(10);
+ // supportsLimitPushManifestEntries should return false when deletion vectors are enabled
+ assertThat(scan.supportsLimitPushManifestEntries()).isFalse();
+
+ // Should read all files since limit pushdown is disabled
+ KeyValueFileStoreScan scanWithoutLimit = storeWithDV.newScan();
+ scanWithoutLimit.withSnapshot(snapshot.id());
+ List<ManifestEntry> filesWithoutLimit = scanWithoutLimit.plan().files();
+ int totalFiles = filesWithoutLimit.size();
+
+ List<ManifestEntry> filesWithLimit = scan.plan().files();
+ assertThat(filesWithLimit.size()).isEqualTo(totalFiles);
+ }
+
private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
throws Exception {
@@ -307,10 +640,6 @@ private List<KeyValue> generateData(int numRecords) {
return data;
}
- private List<KeyValue> generateData(int numRecords, int hr) {
- return generateData(numRecords, hr, null);
- }
-
private List<KeyValue> generateData(int numRecords, int hr, Long itemId) {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
@@ -320,7 +649,11 @@ private List<KeyValue> generateData(int numRecords, int hr, Long itemId) {
}
private Snapshot writeData(List<KeyValue> kvs) throws Exception {
- List<Snapshot> snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
+ return writeData(kvs, store);
+ }
+
+ private Snapshot writeData(List<KeyValue> kvs, TestFileStore testStore) throws Exception {
+ List<Snapshot> snapshots = testStore.commitData(kvs, gen::getPartition, this::getBucket);
return snapshots.get(snapshots.size() - 1);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index e1426fc8b56d..e27e16eb609e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -1579,4 +1579,186 @@ private void assertResult(int numProducers) {
}
}
}
+
+ @Test
+ public void testLimitPushdownWithTimeFilter() throws Exception {
+ // This test verifies that limit pushdown works correctly when valueFilter
+ // (e.g., time-based where conditions) is present. postFilterManifestEntries runs first
+ // to filter files, then limitPushManifestEntries operates on the filtered list.
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "ts TIMESTAMP(3), "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ")");
+
+ // Insert data with different timestamps
+ tEnv.executeSql(
+ "INSERT INTO T VALUES "
+ + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), "
+ + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), "
+ + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), "
+ + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), "
+ + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')")
+ .await();
+
+ // Without filter, limit pushdown should work
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) {
+ List<Row> allRows = new ArrayList<>();
+ iter.forEachRemaining(allRows::add);
+ assertThat(allRows.size()).isEqualTo(3);
+ }
+
+ // Test limit pushdown with time filter (4 rows match, LIMIT 3)
+ try (CloseableIterator<Row> iter =
+ tEnv.executeSql(
+ "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 11:00:00' LIMIT 3")
+ .collect()) {
+ List<Row> filteredRows = new ArrayList<>();
+ iter.forEachRemaining(filteredRows::add);
+ assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3);
+ assertThat(filteredRows.size()).isLessThanOrEqualTo(4);
+ for (Row row : filteredRows) {
+ java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+ java.time.LocalDateTime filterTime =
+ java.time.LocalDateTime.parse("2024-01-01T11:00:00");
+ assertThat(ts).isAfterOrEqualTo(filterTime);
+ }
+ }
+
+ // Test with more restrictive filter (3 rows match, LIMIT 2)
+ try (CloseableIterator<Row> iter =
+ tEnv.executeSql(
+ "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 12:00:00' LIMIT 2")
+ .collect()) {
+ List<Row> filteredRows2 = new ArrayList<>();
+ iter.forEachRemaining(filteredRows2::add);
+ assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2);
+ assertThat(filteredRows2.size()).isLessThanOrEqualTo(3);
+ for (Row row : filteredRows2) {
+ java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2);
+ java.time.LocalDateTime filterTime =
+ java.time.LocalDateTime.parse("2024-01-01T12:00:00");
+ assertThat(ts).isAfterOrEqualTo(filterTime);
+ }
+ }
+ }
+
+ @Test
+ public void testLimitPushdownBasic() throws Exception {
+ // Test basic limit pushdown
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await();
+
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+
+ assertThat(rows.size()).isEqualTo(5);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownWithDeletionVector() throws Exception {
+ // Test limit pushdown is disabled when deletion vector is enabled
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + "'deletion-vectors.enabled' = 'true'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await();
+
+ tEnv.executeSql("DELETE FROM T WHERE id = 2").await();
+
+ // Limit pushdown should be disabled when deletion vector is enabled
+ // because we can't accurately calculate row count after applying deletion vectors
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+
+ assertThat(rows.size()).isEqualTo(3);
+
+ for (Row row : rows) {
+ assertThat(row.getField(0)).isNotEqualTo(2);
+ }
+ }
+ }
+
+ @Test
+ public void testLimitPushdownWithPkTableLimitPushDownEnabled() throws Exception {
+ // Test limit pushdown when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is explicitly enabled
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + "'pk-table-limit-push-down.enabled' = 'true'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await();
+
+ // Limit pushdown should work when explicitly enabled
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+ assertThat(rows.size()).isEqualTo(5);
+ }
+ }
+
+ @Test
+ public void testLimitPushdownWithPkTableLimitPushDownDisabled() throws Exception {
+ // Test limit pushdown is disabled when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is false
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ tEnv.executeSql("USE CATALOG testCatalog");
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + "id INT, "
+ + "name STRING, "
+ + "PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + "'pk-table-limit-push-down.enabled' = 'false'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await();
+ tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await();
+
+ // Limit pushdown should be disabled, but query should still work correctly
+ try (CloseableIterator<Row> iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) {
+ List<Row> rows = new ArrayList<>();
+ iter.forEachRemaining(rows::add);
+ // Should still return correct number of rows, just without pushdown optimization
+ assertThat(rows.size()).isEqualTo(5);
+ }
+ }
}