diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f4e0a6d1cd43..88c51418a93c 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1001,6 +1001,12 @@ String You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'. + +
pk-table-limit-push-down.enabled
+ false + Boolean + Whether to enable limit pushdown for primary key tables. If enabled, paimon will determine whether to push down the limit based on the table's merge engine type, file overlapping status, deletion vectors, and delete rows, which can further reduce the number of splits. +
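As a usage illustration for this new option (not part of the patch itself), the sketch below shows one way it could be switched on from the Java API and combined with a limit. It assumes an already-resolved primary key org.apache.paimon.table.Table handle named "table"; Table#copy(Map) and ReadBuilder#withLimit(int) are existing Paimon APIs used here only for illustration, and the option key is the one documented in the row above.

// A usage sketch only, not code from this patch. "table" is assumed to be a primary key
// table obtained from a catalog; Table#copy and ReadBuilder#withLimit are existing
// Paimon APIs, and the option key is the one documented above.
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

import java.util.Collections;

public class LimitPushdownUsageSketch {

    public static ReadBuilder newLimitedReadBuilder(Table table) {
        Table withPushdown =
                table.copy(
                        Collections.singletonMap("pk-table-limit-push-down.enabled", "true"));
        // The limit itself still comes from the engine; withLimit is what batch planning
        // (DataTableBatchScan#applyPushDownLimit in this patch) eventually consumes.
        return withPushdown.newReadBuilder().withLimit(10);
    }
}

The same effect can be obtained declaratively, as the Flink tests later in this patch do, by creating the table WITH ('pk-table-limit-push-down.enabled' = 'true').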
postpone.batch-write-fixed-bucket
true diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ade2ac9c3741..4e1148361e36 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2102,6 +2102,16 @@ public InlineElement getDescription() { .withDescription( "Whether to try upgrading the data files after overwriting a primary key table."); + public static final ConfigOption PK_TABLE_LIMIT_PUSH_DOWN_ENABLED = + key("pk-table-limit-push-down.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable limit pushdown for primary key tables. " + + "If enabled, paimon will determine whether to push down the limit " + + "based on the table's merge engine type, file overlapping status, " + + "deletion vectors, and delete rows, which can further reduce the number of splits."); + private final Options options; public CoreOptions(Map options) { @@ -3048,6 +3058,10 @@ public boolean commitDiscardDuplicateFiles() { return options.get(COMMIT_DISCARD_DUPLICATE_FILES); } + public boolean pkTableLimitPushDownEnabled() { + return options.get(PK_TABLE_LIMIT_PUSH_DOWN_ENABLED); + } + private Map callbacks( ConfigOption callbacks, ConfigOption callbackParam) { Map result = new HashMap<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 077068df73ff..2ae1a4649de4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -236,7 +236,8 @@ public KeyValueFileStoreScan newScan() { options.deletionVectorsEnabled(), options.mergeEngine(), options.changelogProducer(), - options.fileIndexReadEnabled() && options.deletionVectorsEnabled()); + options.fileIndexReadEnabled() && options.deletionVectorsEnabled(), + options.pkTableLimitPushDownEnabled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 284a2ef19579..64ccb38b0cc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -46,6 +46,9 @@ import org.apache.paimon.utils.Range; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.util.ArrayList; @@ -68,6 +71,8 @@ /** Default implementation of {@link FileStoreScan}. 
*/ public abstract class AbstractFileStoreScan implements FileStoreScan { + private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class); + private final ManifestsReader manifestsReader; private final SnapshotManager snapshotManager; private final ManifestFile.Factory manifestFileFactory; @@ -277,18 +282,24 @@ public Plan plan() { List manifests = manifestsResult.filteredManifests; Iterator iterator = readManifestEntries(manifests, false); - if (supportsLimitPushManifestEntries()) { - iterator = limitPushManifestEntries(iterator); - } List files = ListUtils.toList(iterator); if (postFilterManifestEntriesEnabled()) { files = postFilterManifestEntries(files); } + if (supportsLimitPushManifestEntries()) { + iterator = limitPushManifestEntries(files.iterator()); + files = ListUtils.toList(iterator); + } + List result = files; long scanDuration = (System.nanoTime() - started) / 1_000_000; + LOG.info( + "File store scan plan completed in {} ms. Files size : {}", + scanDuration, + result.size()); if (scanMetrics != null) { long allDataFiles = manifestsResult.allManifests.stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index ef6fd1e52b8e..1ab2ab5e3e60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FilteredManifestEntry; @@ -34,15 +35,20 @@ import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ListUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -55,6 +61,8 @@ /** {@link FileStoreScan} for {@link KeyValueFileStore}. 
*/ public class KeyValueFileStoreScan extends AbstractFileStoreScan { + private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class); + private final SimpleStatsEvolutions fieldKeyStatsConverters; private final SimpleStatsEvolutions fieldValueStatsConverters; private final BucketSelectConverter bucketSelectConverter; @@ -76,6 +84,8 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { // cache evolved filter by schema id private final Map evolvedValueFilterMapping = new ConcurrentHashMap<>(); + private final boolean pkTableLimitPushDownEnabled; + public KeyValueFileStoreScan( ManifestsReader manifestsReader, BucketSelectConverter bucketSelectConverter, @@ -88,7 +98,8 @@ public KeyValueFileStoreScan( boolean deletionVectorsEnabled, MergeEngine mergeEngine, ChangelogProducer changelogProducer, - boolean fileIndexReadEnabled) { + boolean fileIndexReadEnabled, + boolean pkTableLimitPushDownEnabled) { super( manifestsReader, snapshotManager, @@ -110,6 +121,7 @@ public KeyValueFileStoreScan( this.mergeEngine = mergeEngine; this.changelogProducer = changelogProducer; this.fileIndexReadEnabled = fileIndexReadEnabled; + this.pkTableLimitPushDownEnabled = pkTableLimitPushDownEnabled; } public KeyValueFileStoreScan withKeyFilter(Predicate predicate) { @@ -203,6 +215,124 @@ private boolean isValueFilterEnabled() { } } + /** + * Check if limit pushdown is supported for PK tables. + * + *
<p>
Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion + * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks + * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket. + */ + @Override + public boolean supportsLimitPushManifestEntries() { + if (mergeEngine == PARTIAL_UPDATE + || mergeEngine == AGGREGATE + || !pkTableLimitPushDownEnabled) { + return false; + } + + return limit != null && limit > 0 && !deletionVectorsEnabled; + } + + /** + * Apply limit pushdown by grouping files by bucket and accumulating row counts until limit is + * reached. For buckets that can't safely push down limit (overlapping files or delete rows), + * include all files. + */ + @Override + protected Iterator limitPushManifestEntries(Iterator entries) { + long startTime = System.nanoTime(); + List allEntries = ListUtils.toList(entries); + Map, List> buckets = groupByBucket(allEntries); + + List result = new ArrayList<>(); + long accumulatedRowCount = 0; + + for (List bucketEntries : buckets.values()) { + if (accumulatedRowCount >= limit) { + break; + } + + long remainingLimit = limit - accumulatedRowCount; + List processedBucket = + applyLimitPushdownForBucket(bucketEntries, remainingLimit); + if (processedBucket == null) { + result.addAll(bucketEntries); + } else { + result.addAll(processedBucket); + for (ManifestEntry entry : processedBucket) { + long fileRowCount = entry.file().rowCount(); + accumulatedRowCount += fileRowCount; + } + } + } + + long duration = (System.nanoTime() - startTime) / 1_000_000; + LOG.info( + "Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, " + + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}", + duration, + limit, + allEntries.size(), + result.size(), + mergeEngine, + scanMode, + deletionVectorsEnabled); + return result.iterator(); + } + + /** + * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe. + * + *
<p>
Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For + * non-overlapping files with no delete rows, accumulates row counts until limit is reached. + * + * @param bucketEntries files in the same bucket + * @param limit the limit to apply + * @return files to include, or null if we can't safely push down limit + */ + @Nullable + private List applyLimitPushdownForBucket( + List bucketEntries, long limit) { + // Check if this bucket has overlapping files (LSM property) + boolean hasOverlapping = !noOverlapping(bucketEntries); + + if (hasOverlapping) { + // For buckets with overlapping, we can't safely push down limit because files + // need to be merged and we can't accurately calculate the merged row count. + return null; + } + + // For buckets without overlapping and with merge engines that don't require + // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count + // and stop when limit is reached, but only if files have no delete rows. + List result = new ArrayList<>(); + long accumulatedRowCount = 0; + + for (ManifestEntry entry : bucketEntries) { + long fileRowCount = entry.file().rowCount(); + // Check if file has delete rows - if so, we can't accurately calculate + // the merged row count, so we need to stop limit pushdown + boolean hasDeleteRows = + entry.file().deleteRowCount().map(count -> count > 0L).orElse(false); + + if (hasDeleteRows) { + // If file has delete rows, we can't accurately calculate merged row count + // without reading the actual data. Can't safely push down limit. + return null; + } + + // File has no delete rows, no overlapping, and merge engine doesn't require merge. + // Safe to count rows. + result.add(entry); + accumulatedRowCount += fileRowCount; + if (accumulatedRowCount >= limit) { + break; + } + } + + return result; + } + @Override protected boolean postFilterManifestEntriesEnabled() { return valueFilter != null && scanMode == ScanMode.ALL; @@ -214,15 +344,8 @@ protected List postFilterManifestEntries(List file // Why do this: because in primary key table, we can't just filter the value // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`), // but we can do this by filter the whole bucket files - return files.stream() - .collect( - Collectors.groupingBy( - // we use LinkedHashMap to avoid disorder - file -> Pair.of(file.partition(), file.bucket()), - LinkedHashMap::new, - Collectors.toList())) - .values() - .stream() + Map, List> buckets = groupByBucket(files); + return buckets.values().stream() .map(this::doFilterWholeBucketByStats) .flatMap(Collection::stream) .collect(Collectors.toList()); @@ -316,4 +439,19 @@ private static boolean noOverlapping(List entries) { return true; } + + /** + * Group manifest entries by (partition, bucket) while preserving order. This is a common + * operation used by both limitPushManifestEntries and postFilterManifestEntries. 
+ */ + private Map, List> groupByBucket( + List entries) { + return entries.stream() + .collect( + Collectors.groupingBy( + // we use LinkedHashMap to avoid disorder + file -> Pair.of(file.partition(), file.bucket()), + LinkedHashMap::new, + Collectors.toList())); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 4419b3890ef5..41efbe25222a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -31,6 +31,9 @@ import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; import org.apache.paimon.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -40,6 +43,8 @@ /** {@link TableScan} implementation for batch planning. */ public class DataTableBatchScan extends AbstractDataTableScan { + private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class); + private StartingScanner startingScanner; private boolean hasNext; @@ -134,6 +139,7 @@ private Optional applyPushDownLimit() { long scannedRowCount = 0; SnapshotReader.Plan plan = ((ScannedResult) result).plan(); List splits = plan.dataSplits(); + LOG.info("Applying limit pushdown. Original splits count: {}", splits.size()); if (splits.isEmpty()) { return Optional.of(result); } @@ -147,6 +153,11 @@ private Optional applyPushDownLimit() { if (scannedRowCount >= pushDownLimit) { SnapshotReader.Plan newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits); + LOG.info( + "Limit pushdown applied successfully. Original splits: {}, Limited splits: {}, Pushdown limit: {}", + splits.size(), + limitedSplits.size(), + pushDownLimit); return Optional.of(new ScannedResult(newPlan)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index f904cde0819f..ba30fd7d7fdd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -778,6 +778,7 @@ public static class Builder { private final TableSchema tableSchema; private CoreOptions.ChangelogProducer changelogProducer; + private Boolean pkTableLimitPushDownEnabled; public Builder( String format, @@ -800,6 +801,7 @@ public Builder( this.tableSchema = tableSchema; this.changelogProducer = CoreOptions.ChangelogProducer.NONE; + this.pkTableLimitPushDownEnabled = null; // Use default from CoreOptions } public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer) { @@ -807,6 +809,11 @@ public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer return this; } + public Builder pkTableLimitPushDownEnabled(boolean enabled) { + this.pkTableLimitPushDownEnabled = enabled; + return this; + } + public TestFileStore build() { Options conf = tableSchema == null ? 
new Options() : Options.fromMap(tableSchema.options()); @@ -829,6 +836,10 @@ public TestFileStore build() { // disable dynamic-partition-overwrite in FileStoreCommit layer test conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false); + if (pkTableLimitPushDownEnabled != null) { + conf.set(CoreOptions.PK_TABLE_LIMIT_PUSH_DOWN_ENABLED, pkTableLimitPushDownEnabled); + } + return new TestFileStore( root, new CoreOptions(conf), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 4f3d5c1c24dd..66962f00fcee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; import org.apache.paimon.TestFileStore; @@ -30,9 +31,10 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,7 +61,6 @@ public class KeyValueFileStoreScanTest { private TestKeyValueGenerator gen; @TempDir java.nio.file.Path tempDir; private TestFileStore store; - private SnapshotManager snapshotManager; @BeforeEach public void beforeEach() throws Exception { @@ -75,8 +76,8 @@ public void beforeEach() throws Exception { TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, DeduplicateMergeFunction.factory(), null) + .pkTableLimitPushDownEnabled(true) .build(); - snapshotManager = store.snapshotManager(); SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); @@ -271,6 +272,338 @@ public void testDropStatsInPlan() throws Exception { } } + @Test + public void testLimitPushdownWithoutValueFilter() throws Exception { + // Write multiple files to test limit pushdown + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + writeData(data2); + List data3 = generateData(50); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work) + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithValueFilter() throws Exception { + // Write data with different item values + List data1 = generateData(50, 0, 100L); + writeData(data1); + List data2 = generateData(50, 0, 200L); + writeData(data2); + List data3 = generateData(50, 0, 300L); + Snapshot snapshot = writeData(data3); + + 
// Without valueFilter, limit pushdown should work + KeyValueFileStoreScan scanWithoutFilter = store.newScan(); + scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10); + List filesWithoutFilter = scanWithoutFilter.plan().files(); + int totalFilesWithoutFilter = filesWithoutFilter.size(); + assertThat(totalFilesWithoutFilter).isGreaterThan(0); + + // With valueFilter, limit pushdown should still work (postFilterManifestEntries runs first) + // postFilterManifestEntries filters files first, then limitPushManifestEntries operates + // on the filtered list. Using file.rowCount() as a conservative upper bound ensures + // we don't return fewer than limit rows, even if we might read slightly more files. + KeyValueFileStoreScan scanWithFilter = store.newScan(); + scanWithFilter.withSnapshot(snapshot.id()); + scanWithFilter.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE) + .between(4, 100L, 200L)); + scanWithFilter.withLimit(10); + List filesWithFilter = scanWithFilter.plan().files(); + + // Limit pushdown should work with valueFilter + // The number of files should be less than or equal to the total files after filtering + assertThat(filesWithFilter.size()).isGreaterThan(0); + assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter); + } + + @Test + public void testLimitPushdownWithKeyFilter() throws Exception { + // Write data with different shop IDs + List data = generateData(200); + Snapshot snapshot = writeData(data); + + // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit + // pushdown) + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withKeyFilter( + new PredicateBuilder(RowType.of(new IntType(false))) + .equal(0, data.get(0).key().getInt(0))); + scan.withLimit(5); + List files = scan.plan().files(); + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownMultipleBuckets() throws Exception { + // Write data to multiple buckets to test limit pushdown across buckets + List data1 = generateData(30); + writeData(data1); + List data2 = generateData(30); + writeData(data2); + List data3 = generateData(30); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work across buckets) + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(20); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithSmallLimit() throws Exception { + // Test limit pushdown with a very small limit + List data1 = generateData(100); + writeData(data1); + List data2 = generateData(100); + writeData(data2); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(1); + List files = scan.plan().files(); + // Should read at least one file, but fewer than all files + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void 
testLimitPushdownWithLargeLimit() throws Exception { + // Test limit pushdown with a large limit (larger than total rows) + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000); + List filesWithLimit = scanWithLimit.plan().files(); + // With a large limit, should read all files + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownDisabled() throws Exception { + // Test that limit pushdown is disabled when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is false + TestFileStore storeDisabled = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + null) + .pkTableLimitPushDownEnabled(false) + .build(); + + List data1 = generateData(50); + writeData(data1, storeDisabled); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, storeDisabled); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = storeDisabled.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit but pushdown disabled, should read all files (no pushdown) + KeyValueFileStoreScan scanWithLimit = storeDisabled.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10); + assertThat(scanWithLimit.supportsLimitPushManifestEntries()).isFalse(); + List filesWithLimit = scanWithLimit.plan().files(); + // Should read all files since limit pushdown is disabled + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithPartialUpdateMergeEngine() throws Exception { + // Test that limit pushdown is disabled for PARTIAL_UPDATE merge engine + // Create a store with PARTIAL_UPDATE merge engine by setting it in schema options + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.singletonMap( + CoreOptions.MERGE_ENGINE.key(), + CoreOptions.MergeEngine.PARTIAL_UPDATE.name()), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storePartialUpdate = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .pkTableLimitPushDownEnabled(true) + .build(); + + List data1 = generateData(50); + writeData(data1, storePartialUpdate); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, 
storePartialUpdate); + + KeyValueFileStoreScan scan = storePartialUpdate.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(10); + // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + + // Should read all files since limit pushdown is disabled + KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + List filesWithLimit = scan.plan().files(); + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithAggregateMergeEngine() throws Exception { + // Test that limit pushdown is disabled for AGGREGATE merge engine + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.singletonMap( + CoreOptions.MERGE_ENGINE.key(), + CoreOptions.MergeEngine.AGGREGATE.name()), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storeAggregate = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .pkTableLimitPushDownEnabled(true) + .build(); + + List data1 = generateData(50); + writeData(data1, storeAggregate); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, storeAggregate); + + KeyValueFileStoreScan scan = storeAggregate.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(10); + // supportsLimitPushManifestEntries should return false for AGGREGATE + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + + // Should read all files since limit pushdown is disabled + KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + List filesWithLimit = scan.plan().files(); + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithDeletionVectors() throws Exception { + // Test that limit pushdown is disabled when deletion vectors are enabled + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.singletonMap( + CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storeWithDV = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + 
DeduplicateMergeFunction.factory(), + tableSchema) + .pkTableLimitPushDownEnabled(true) + .build(); + + List data1 = generateData(50); + writeData(data1, storeWithDV); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, storeWithDV); + + KeyValueFileStoreScan scan = storeWithDV.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(10); + // supportsLimitPushManifestEntries should return false when deletion vectors are enabled + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + + // Should read all files since limit pushdown is disabled + KeyValueFileStoreScan scanWithoutLimit = storeWithDV.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + List filesWithLimit = scan.plan().files(); + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + private void runTestExactMatch( FileStoreScan scan, Long expectedSnapshotId, Map expected) throws Exception { @@ -307,10 +640,6 @@ private List generateData(int numRecords) { return data; } - private List generateData(int numRecords, int hr) { - return generateData(numRecords, hr, null); - } - private List generateData(int numRecords, int hr, Long itemId) { List data = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -320,7 +649,11 @@ private List generateData(int numRecords, int hr, Long itemId) { } private Snapshot writeData(List kvs) throws Exception { - List snapshots = store.commitData(kvs, gen::getPartition, this::getBucket); + return writeData(kvs, store); + } + + private Snapshot writeData(List kvs, TestFileStore testStore) throws Exception { + List snapshots = testStore.commitData(kvs, gen::getPartition, this::getBucket); return snapshots.get(snapshots.size() - 1); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e1426fc8b56d..e27e16eb609e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -1579,4 +1579,186 @@ private void assertResult(int numProducers) { } } } + + @Test + public void testLimitPushdownWithTimeFilter() throws Exception { + // This test verifies that limit pushdown works correctly when valueFilter + // (e.g., time-based where conditions) is present. postFilterManifestEntries runs first + // to filter files, then limitPushManifestEntries operates on the filtered list. 
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "ts TIMESTAMP(3), " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Insert data with different timestamps + tEnv.executeSql( + "INSERT INTO T VALUES " + + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), " + + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), " + + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), " + + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), " + + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')") + .await(); + + // Without filter, limit pushdown should work + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List allRows = new ArrayList<>(); + iter.forEachRemaining(allRows::add); + assertThat(allRows.size()).isEqualTo(3); + } + + // Test limit pushdown with time filter (4 rows match, LIMIT 3) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 11:00:00' LIMIT 3") + .collect()) { + List filteredRows = new ArrayList<>(); + iter.forEachRemaining(filteredRows::add); + assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3); + assertThat(filteredRows.size()).isLessThanOrEqualTo(4); + for (Row row : filteredRows) { + java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T11:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + + // Test with more restrictive filter (3 rows match, LIMIT 2) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 12:00:00' LIMIT 2") + .collect()) { + List filteredRows2 = new ArrayList<>(); + iter.forEachRemaining(filteredRows2::add); + assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2); + assertThat(filteredRows2.size()).isLessThanOrEqualTo(3); + for (Row row : filteredRows2) { + java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T12:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + } + + @Test + public void testLimitPushdownBasic() throws Exception { + // Test basic limit pushdown + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await(); + + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(5); + } + } + + @Test + public void testLimitPushdownWithDeletionVector() throws Exception { + // Test limit pushdown is disabled when deletion vector is enabled + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + 
+ "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + "'deletion-vectors.enabled' = 'true'" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + + tEnv.executeSql("DELETE FROM T WHERE id = 2").await(); + + // Limit pushdown should be disabled when deletion vector is enabled + // because we can't accurately calculate row count after applying deletion vectors + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(3); + + for (Row row : rows) { + assertThat(row.getField(0)).isNotEqualTo(2); + } + } + } + + @Test + public void testLimitPushdownWithPkTableLimitPushDownEnabled() throws Exception { + // Test limit pushdown when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is explicitly enabled + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + "'pk-table-limit-push-down.enabled' = 'true'" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await(); + + // Limit pushdown should work when explicitly enabled + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + assertThat(rows.size()).isEqualTo(5); + } + } + + @Test + public void testLimitPushdownWithPkTableLimitPushDownDisabled() throws Exception { + // Test limit pushdown is disabled when PK_TABLE_LIMIT_PUSH_DOWN_ENABLED is false + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + "'pk-table-limit-push-down.enabled' = 'false'" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await(); + + // Limit pushdown should be disabled, but query should still work correctly + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + // Should still return correct number of rows, just without pushdown optimization + assertThat(rows.size()).isEqualTo(5); + } + } }
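To make the per-bucket rule added in KeyValueFileStoreScan easier to follow, here is a self-contained sketch (illustrative only, not code from this patch) of the same accumulation logic over plain row-count values: a bucket is left untouched (null) when its files overlap or any file carries delete rows, otherwise files are taken in order until their summed row counts reach the limit. For example, with limit = 10 and non-overlapping files of 6, 7 and 5 rows, only the first two files are kept because 6 + 7 = 13 >= 10.

// Illustrative sketch only; it mirrors the rule implemented by
// KeyValueFileStoreScan#applyLimitPushdownForBucket, but over plain value objects so
// the arithmetic can be followed without the ManifestEntry machinery.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketLimitRuleSketch {

    static final class FileStats {
        final long rowCount;
        final long deleteRowCount;

        FileStats(long rowCount, long deleteRowCount) {
            this.rowCount = rowCount;
            this.deleteRowCount = deleteRowCount;
        }
    }

    /** Returns the files to keep, or null when the limit cannot be pushed down safely. */
    static List<FileStats> applyLimitForBucket(
            List<FileStats> bucket, boolean hasOverlapping, long limit) {
        if (hasOverlapping) {
            // Overlapping files must be merged; the merged row count is unknown.
            return null;
        }
        List<FileStats> kept = new ArrayList<>();
        long accumulated = 0;
        for (FileStats file : bucket) {
            if (file.deleteRowCount > 0) {
                // Delete rows make the visible row count of the file unknown.
                return null;
            }
            kept.add(file);
            accumulated += file.rowCount;
            if (accumulated >= limit) {
                break;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<FileStats> bucket =
                Arrays.asList(new FileStats(6, 0), new FileStats(7, 0), new FileStats(5, 0));
        // Keeps two files: 6 + 7 = 13 >= 10, so the third (5-row) file is skipped.
        System.out.println(applyLimitForBucket(bucket, false, 10).size());
    }
}

Using file row counts as an upper bound is deliberately conservative: the scan may keep slightly more files than strictly necessary, but it never returns fewer rows than the limit, which is the behaviour the comment in testLimitPushdownWithValueFilter relies on.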