diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
index d2fa4269b8..2e4431a256 100644
--- a/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/source/TestingLakeSource.java
@@ -57,7 +57,7 @@ public void withLimit(int limit) {}
 
     @Override
     public FilterPushDownResult withFilters(List<Predicate> predicates) {
-        return null;
+        return FilterPushDownResult.of(predicates, new ArrayList<>());
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index f26ebddfdd..5a25ed6e12 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -55,6 +55,7 @@ public class LakeSplitGenerator {
     private final TableInfo tableInfo;
     private final Admin flussAdmin;
     private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+    private final OffsetsInitializer startingOffsetInitializer;
     private final OffsetsInitializer stoppingOffsetInitializer;
     private final int bucketCount;
     private final Supplier<Set<PartitionInfo>> listPartitionSupplier;
@@ -66,6 +67,7 @@ public LakeSplitGenerator(
             Admin flussAdmin,
             LakeSource<LakeSplit> lakeSource,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer startingOffsetInitializer,
             OffsetsInitializer stoppingOffsetInitializer,
             int bucketCount,
             Supplier<Set<PartitionInfo>> listPartitionSupplier) {
@@ -73,6 +75,7 @@ public LakeSplitGenerator(
         this.flussAdmin = flussAdmin;
         this.lakeSource = lakeSource;
         this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.startingOffsetInitializer = startingOffsetInitializer;
        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
         this.bucketCount = bucketCount;
         this.listPartitionSupplier = listPartitionSupplier;
@@ -229,32 +232,28 @@ private List<SourceSplitBase> generateSplit(
             if (lakeSplits != null) {
                 splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
             }
+            Map<Integer, Long> bucketStartOffset =
+                    startingOffsetInitializer.getBucketOffsets(
+                            partitionName,
+                            IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+                            bucketOffsetsRetriever);
             for (int bucket = 0; bucket < bucketCount; bucket++) {
                 TableBucket tableBucket =
                         new TableBucket(tableInfo.getTableId(), partitionId, bucket);
                 Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
+                Long startingOffset = bucketStartOffset.get(bucket);
                 Long stoppingOffset = bucketEndOffset.get(bucket);
-                if (snapshotLogOffset == null) {
-                    // no data committed to lake for this bucket, scan from fluss log
-                    if (stoppingOffset == NO_STOPPING_OFFSET || stoppingOffset > 0) {
-                        splits.add(
-                                new LogSplit(
-                                        tableBucket,
-                                        partitionName,
-                                        EARLIEST_OFFSET,
-                                        stoppingOffset));
-                    }
-                } else {
-                    // need to read remain fluss log
-                    if (stoppingOffset == NO_STOPPING_OFFSET
-                            || snapshotLogOffset < stoppingOffset) {
-                        splits.add(
-                                new LogSplit(
-                                        tableBucket,
-                                        partitionName,
-                                        snapshotLogOffset,
-                                        stoppingOffset));
-                    }
+                long splitStartingOffset =
+                        snapshotLogOffset == null
+                                ? startingOffset
+                                : Math.max(snapshotLogOffset, startingOffset);
+                if (stoppingOffset == NO_STOPPING_OFFSET || splitStartingOffset < stoppingOffset) {
+                    splits.add(
+                            new LogSplit(
+                                    tableBucket,
+                                    partitionName,
+                                    splitStartingOffset,
+                                    stoppingOffset));
                 }
             }
         } else {
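The core behavioral change above is how the log portion of a hybrid lake/fluss split picks its starting offset: the new `startingOffsetInitializer` is resolved per bucket, and for tiered buckets the larger of the tiered end offset and the resolved starting offset wins. A minimal sketch of that rule (the helper name is illustrative, not Fluss API):

```java
// Sketch of the offset rule in the generateSplit() hunk above; not Fluss API.
static long resolveLogStartOffset(Long snapshotLogOffset, long startingOffset) {
    if (snapshotLogOffset == null) {
        // Bucket has no data in the lake yet: start from the offset the starting
        // initializer resolved (for timestamp startup, the offset at that time),
        // instead of always scanning from EARLIEST_OFFSET as before.
        return startingOffset;
    }
    // Bucket is tiered: never re-read log below the lake snapshot's end offset,
    // but skip ahead when the resolved starting offset is already beyond it.
    return Math.max(snapshotLogOffset, startingOffset);
}
```

A `LogSplit` is then emitted only when the stopping offset is open-ended (`NO_STOPPING_OFFSET`) or strictly greater than this start, which subsumes the old `stoppingOffset > 0` special case for untiered buckets.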
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 77fd1e6a8c..cecfda57d8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -44,6 +44,8 @@
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.RowType;
 
@@ -102,6 +104,9 @@
 import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** Flink table source to scan Fluss data. */
@@ -169,6 +174,9 @@ public class FlinkTableSource
     @Nullable private LakeSource<LakeSplit> lakeSource;
     @Nullable private Predicate logRecordBatchFilter;
 
+    // to avoid duplicate lake filter
+    private List<Predicate> acceptedLakePredicates = Collections.emptyList();
+
     public FlinkTableSource(
             TablePath tablePath,
             Configuration flussConfig,
@@ -346,6 +354,11 @@ public boolean isBounded() {
             case TIMESTAMP:
                 offsetsInitializer =
                         OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                // todo: startup timestamp filter for PK table
+                enableLakeSource = lakeSource != null && !hasPrimaryKey();
+                if (enableLakeSource) {
+                    applyTimestampStartupLakeFilter(startupOptions.startupTimestampMs);
+                }
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -647,15 +660,57 @@ private void pushdownLakeFilters(
 
         LakeSource.FilterPushDownResult filterPushDownResult =
                 checkNotNull(lakeSource).withFilters(lakePredicates);
-        Set<Predicate> acceptedLakePredicates =
+        Set<Predicate> lakeAcceptedPredicateSet =
                 Collections.newSetFromMap(new IdentityHashMap<>());
-        acceptedLakePredicates.addAll(filterPushDownResult.acceptedPredicates());
+        lakeAcceptedPredicateSet.addAll(filterPushDownResult.acceptedPredicates());
+        List<Predicate> acceptedPredicates = new ArrayList<>();
         for (int i = 0; i < lakePredicates.size(); i++) {
-            if (acceptedLakePredicates.contains(lakePredicates.get(i))
-                    && !acceptedFilters.contains(convertedFilters.get(i))) {
-                acceptedFilters.add(convertedFilters.get(i));
+            if (lakeAcceptedPredicateSet.contains(lakePredicates.get(i))) {
+                acceptedPredicates.add(lakePredicates.get(i));
+                if (!acceptedFilters.contains(convertedFilters.get(i))) {
+                    acceptedFilters.add(convertedFilters.get(i));
+                }
             }
         }
+        this.acceptedLakePredicates = acceptedPredicates;
+    }
+
+    private void applyTimestampStartupLakeFilter(long startupTimestampMs) {
+        Predicate timestampPredicate = createLakeTimestampPredicate(startupTimestampMs);
+        List<Predicate> lakePredicates = new ArrayList<>(acceptedLakePredicates);
+        lakePredicates.add(timestampPredicate);
+
+        LakeSource.FilterPushDownResult filterPushDownResult =
+                checkNotNull(lakeSource).withFilters(lakePredicates);
+        if (!filterPushDownResult.acceptedPredicates().containsAll(acceptedLakePredicates)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Lake source for table %s does not preserve accepted user filters when applying timestamp startup filter %s.",
+                            tablePath, timestampPredicate));
+        }
+        if (!filterPushDownResult.acceptedPredicates().contains(timestampPredicate)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Lake source for table %s does not support timestamp startup filter %s.",
+                            tablePath, timestampPredicate));
+        }
+        acceptedLakePredicates = filterPushDownResult.acceptedPredicates();
+    }
+
+    private Predicate createLakeTimestampPredicate(long startupTimestampMs) {
+        RowType rowType = FlinkConversions.toFlussRowType(tableOutputType);
+        List<DataField> lakeFields = new ArrayList<>(rowType.getFields());
+        lakeFields.add(new DataField(BUCKET_COLUMN_NAME, org.apache.fluss.types.DataTypes.INT()));
+        lakeFields.add(
+                new DataField(OFFSET_COLUMN_NAME, org.apache.fluss.types.DataTypes.BIGINT()));
+        lakeFields.add(
+                new DataField(
+                        TIMESTAMP_COLUMN_NAME, org.apache.fluss.types.DataTypes.TIMESTAMP_LTZ(3)));
+        RowType lakeRowType = new RowType(rowType.isNullable(), lakeFields);
+        PredicateBuilder predicateBuilder = new PredicateBuilder(lakeRowType);
+        return predicateBuilder.greaterOrEqual(
+                predicateBuilder.indexOf(TIMESTAMP_COLUMN_NAME),
+                TimestampLtz.fromEpochMillis(startupTimestampMs));
     }
 
     /**
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 3ad234aa01..a193002e49 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -750,6 +750,7 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
                         flussAdmin,
                         lakeSource,
                         bucketOffsetsRetriever,
+                        startingOffsetsInitializer,
                         stoppingOffsetsInitializer,
                         tableInfo.getNumBuckets(),
                         this::listPartitions);
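Note that `createLakeTimestampPredicate` builds the filter against the lake row type, i.e. the user schema with the `__bucket`, `__offset`, and `__timestamp` system columns appended. That layout is why the startup test below expects the pushed predicate at index 4 for a two-column table. A tiny illustrative sketch (the helper is hypothetical, not part of the patch):

```java
// Hypothetical helper showing the system-column layout of the lake row type:
// user fields first, then __bucket, __offset, __timestamp.
static int timestampFieldIndex(int userFieldCount) {
    int bucketIndex = userFieldCount;   // __bucket directly follows user fields
    int offsetIndex = bucketIndex + 1;  // __offset
    return offsetIndex + 1;             // __timestamp; 2 user fields -> index 4
}
```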
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitGeneratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitGeneratorTest.java
new file mode 100644
index 0000000000..ce8d2d55c3
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitGeneratorTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.lake;
+
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
+import org.apache.fluss.flink.source.split.LogSplit;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Test for {@link LakeSplitGenerator}. */
+class LakeSplitGeneratorTest {
+
+    private static final TablePath TABLE_PATH = TablePath.of("fluss", "test_table");
+    private static final long TABLE_ID = 1L;
+
+    @Test
+    void testLogTableHybridSplitsStartFromMaxSnapshotAndTimestampOffset() throws Exception {
+        Admin admin = mock(Admin.class);
+        when(admin.getReadableLakeSnapshot(TABLE_PATH))
+                .thenReturn(
+                        CompletableFuture.completedFuture(
+                                new LakeSnapshot(
+                                        1L,
+                                        Collections.singletonMap(
+                                                new TableBucket(TABLE_ID, 0), 50L))));
+
+        LakeSource<LakeSplit> lakeSource =
+                new TestingLakeSource(
+                        2,
+                        Collections.singletonList(
+                                new PartitionInfo(
+                                        -1L,
+                                        new ResolvedPartitionSpec(
+                                                Collections.emptyList(), Collections.emptyList()),
+                                        DEFAULT_REMOTE_DATA_DIR)));
+        LakeSplitGenerator generator =
+                new LakeSplitGenerator(
+                        createLogTableInfo(),
+                        admin,
+                        lakeSource,
+                        new TestingBucketOffsetsRetriever(),
+                        OffsetsInitializer.timestamp(1_000L),
+                        OffsetsInitializer.latest(),
+                        2,
+                        Collections::emptySet);
+
+        List<SourceSplitBase> splits = generator.generateHybridLakeFlussSplits();
+
+        assertThat(splits).filteredOn(split -> split instanceof LakeSnapshotSplit).hasSize(2);
+        assertThat(
+                        splits.stream()
+                                .filter(split -> split instanceof LogSplit)
+                                .map(split -> (LogSplit) split)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        new LogSplit(new TableBucket(TABLE_ID, 0), null, 70L, 100L),
+                        new LogSplit(new TableBucket(TABLE_ID, 1), null, 30L, 100L));
+    }
+
+    private static TableInfo createLogTableInfo() {
+        return new TableInfo(
+                TABLE_PATH,
+                TABLE_ID,
+                1,
+                Schema.newBuilder().column("id", DataTypes.INT()).build(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                2,
+                new Configuration(),
+                new Configuration(),
+                null,
+                null,
+                0L,
+                0L);
+    }
+
+    private static class TestingBucketOffsetsRetriever
+            implements OffsetsInitializer.BucketOffsetsRetriever {
+
+        @Override
+        public Map<Integer, Long> latestOffsets(String partitionName, Collection<Integer> buckets) {
+            return offsets(buckets, 100L);
+        }
+
+        @Override
+        public Map<Integer, Long> earliestOffsets(
+                String partitionName, Collection<Integer> buckets) {
+            return offsets(buckets, 0L);
+        }
+
+        @Override
+        public Map<Integer, Long> offsetsFromTimestamp(
+                String partitionName, Collection<Integer> buckets, long timestamp) {
+            Map<Integer, Long> offsets = new HashMap<>();
+            offsets.put(0, 70L);
+            offsets.put(1, 30L);
+            return offsets;
+        }
+
+        private Map<Integer, Long> offsets(Collection<Integer> buckets, long offset) {
+            Map<Integer, Long> offsets = new HashMap<>();
+            for (Integer bucket : buckets) {
+                offsets.put(bucket, offset);
+            }
+            return offsets;
+        }
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceLakeStartupTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceLakeStartupTest.java
new file mode 100644
index 0000000000..e8e9ce712b
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceLakeStartupTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.row.TimestampLtz;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Maps;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
+import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/** Tests for lake source startup behavior in {@link FlinkTableSource}. */
+class FlinkTableSourceLakeStartupTest {
+
+    @Test
+    void testTimestampFilter() throws Exception {
+        long startupTimestampMs = 12_345L;
+        RecordingLakeSource lakeSource = new RecordingLakeSource(true);
+        FlinkTableSource tableSource =
+                createTableSource(new int[0], timestampStartup(startupTimestampMs));
+        setField(tableSource, "lakeSource", lakeSource);
+
+        Source<RowData, ?, ?> source = createSource(tableSource);
+
+        assertThat(getField(source, "lakeSource")).isSameAs(lakeSource);
+        assertThat(lakeSource.withFiltersCalls).isOne();
+        assertThat(lakeSource.pushedPredicates).hasSize(1);
+        LeafPredicate timestampPredicate = (LeafPredicate) lakeSource.pushedPredicates.get(0);
+        assertThat(timestampPredicate.fieldName()).isEqualTo("__timestamp");
+        assertThat(timestampPredicate.index()).isEqualTo(4);
+        assertThat(timestampPredicate.literals())
+                .containsExactly(TimestampLtz.fromEpochMillis(startupTimestampMs));
+    }
+
+    @Test
+    void testPkTableWithoutTimestampFilter() throws Exception {
+        RecordingLakeSource lakeSource = new RecordingLakeSource(true);
+        FlinkTableSource tableSource = createTableSource(new int[] {0}, timestampStartup(1_000L));
+        setField(tableSource, "lakeSource", lakeSource);
+
+        Source<RowData, ?, ?> source = createSource(tableSource);
+
+        assertThat(getField(source, "lakeSource")).isNull();
+        assertThat(lakeSource.withFiltersCalls).isZero();
+    }
+
+    private static FlinkTableSource createTableSource(
+            int[] primaryKeyIndexes, FlinkConnectorOptionsUtils.StartupOptions startupOptions) {
+        RowType tableOutputType =
+                (RowType)
+                        DataTypes.ROW(
+                                        DataTypes.FIELD("id", DataTypes.INT()),
+                                        DataTypes.FIELD("name", DataTypes.STRING()))
+                                .getLogicalType();
+        Configuration flussConfig = new Configuration();
+        flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092");
+        return new FlinkTableSource(
+                TablePath.of("test_db", "test_table"),
+                flussConfig,
+                new TableConfig(new Configuration()),
+                tableOutputType,
+                primaryKeyIndexes,
+                new int[0],
+                new int[0],
+                true,
+                startupOptions,
+                false,
+                false,
+                null,
+                1_000L,
+                false,
+                null,
+                Maps.newHashMap(),
+                null);
+    }
+
+    private static FlinkConnectorOptionsUtils.StartupOptions timestampStartup(
+            long startupTimestampMs) {
+        FlinkConnectorOptionsUtils.StartupOptions startupOptions =
+                new FlinkConnectorOptionsUtils.StartupOptions();
+        startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.TIMESTAMP;
+        startupOptions.startupTimestampMs = startupTimestampMs;
+        return startupOptions;
+    }
+
+    private static Source<RowData, ?, ?> createSource(FlinkTableSource tableSource) {
+        ScanRuntimeProvider provider = tableSource.getScanRuntimeProvider(mock(ScanContext.class));
+        return ((SourceProvider) provider).createSource();
+    }
+
+    private static Object getField(Object target, String fieldName) throws Exception {
+        Field field = target.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(target);
+    }
+
+    private static void setField(Object target, String fieldName, Object value) throws Exception {
+        Field field = target.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+
+    private static class RecordingLakeSource extends TestingLakeSource {
+        private final boolean acceptFilters;
+        private int withFiltersCalls;
+        private List<Predicate> pushedPredicates = Collections.emptyList();
+
+        private RecordingLakeSource(boolean acceptFilters) {
+            this.acceptFilters = acceptFilters;
+        }
+
+        @Override
+        public LakeSource.FilterPushDownResult withFilters(List<Predicate> predicates) {
+            withFiltersCalls++;
+            pushedPredicates = new ArrayList<>(predicates);
+            if (acceptFilters) {
+                return LakeSource.FilterPushDownResult.of(predicates, Collections.emptyList());
+            } else {
+                return LakeSource.FilterPushDownResult.of(
+                        Collections.emptyList(), new ArrayList<>(predicates));
+            }
+        }
+    }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index b75ce8ad44..20476a1c07 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -39,6 +39,10 @@
 
 import javax.annotation.Nullable;
 
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -100,9 +104,25 @@ public static Object toIcebergLiteral(Types.NestedField field, Object flussLiter
                         Types.StructType.of(field),
                         RowType.of(convertIcebergTypeToFlussType(field.type())),
                         flussRow);
+        if (field.type() instanceof Types.TimestampType) {
+            return toIcebergTimestampLiteral(flussRowAsIcebergRecord.get(0));
+        }
         return flussRowAsIcebergRecord.get(0, field.type().typeId().javaClass());
     }
 
+    private static long toIcebergTimestampLiteral(Object timestamp) {
+        if (timestamp instanceof OffsetDateTime) {
+            OffsetDateTime offsetDateTime = (OffsetDateTime) timestamp;
+            return offsetDateTime.toEpochSecond() * 1_000_000L + offsetDateTime.getNano() / 1_000L;
+        } else if (timestamp instanceof LocalDateTime) {
+            return ChronoUnit.MICROS.between(
+                    LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC), (LocalDateTime) timestamp);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported Iceberg timestamp literal: " + timestamp);
+        }
+    }
+
     /** Converts Iceberg data types to Fluss data types. */
     private static DataType convertIcebergTypeToFlussType(Type icebergType) {
         if (icebergType instanceof Types.BooleanType) {
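Iceberg timestamp literals are epoch microseconds, so `toIcebergTimestampLiteral` above widens Fluss millisecond values. A self-contained check of the `OffsetDateTime` branch, using only `java.time` (class name hypothetical):

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Standalone check of the seconds->micros + nanos->micros arithmetic.
class IcebergMicrosCheck {
    public static void main(String[] args) {
        OffsetDateTime ts =
                OffsetDateTime.ofInstant(Instant.ofEpochMilli(1_234L), ZoneOffset.UTC);
        long micros = ts.toEpochSecond() * 1_000_000L + ts.getNano() / 1_000L;
        System.out.println(micros); // prints 1234000, i.e. 1_234 ms in micros
    }
}
```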
"partitioned" : "non_partitioned"); + + TablePath t1 = TablePath.of(DEFAULT_DB, tableName); + List writtenRows = new ArrayList<>(); + // insert with 30 rows + long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows); + // wait until records has been synced + waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned); + + // now, start to read the log table, which will read iceberg + // may read fluss or not, depends on the log offset of iceberg snapshot + List actual = + CollectionUtil.iteratorToList( + batchTEnv.executeSql("select * from " + tableName).collect()); + assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows); + + long startupTimestampMillis = System.currentTimeMillis(); + List afterTimestampRows = new ArrayList<>(); + + // write some log data again, that will be tiered into the lake + afterTimestampRows.addAll(writeRows(t1, 3, isPartitioned)); + + // cancel the tiering job + jobClient.cancel().get(); + + // write some logs again, but will only be reserved in the fluss log tablet + afterTimestampRows.addAll(writeRows(t1, 3, isPartitioned)); + + String timestampStartupOptions = + String.format( + "/*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%d') */", + startupTimestampMillis); + actual = + CollectionUtil.iteratorToList( + batchTEnv + .executeSql( + String.format( + "select * from %s %s", + tableName, timestampStartupOptions)) + .collect()); + assertThat(actual).containsExactlyInAnyOrderElementsOf(afterTimestampRows); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception { diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java index d69def5357..ae14c7e128 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.predicate.PredicateBuilder; import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.IntType; import org.apache.fluss.types.RowType; @@ -89,7 +90,8 @@ void testWithFilters() throws Exception { "name" + i, 0, (long) i, - OffsetDateTime.now(ZoneOffset.UTC))); + OffsetDateTime.ofInstant( + java.time.Instant.ofEpochMilli(i * 1000L), ZoneOffset.UTC))); } writeRecord(table, rows, null, 0); @@ -104,7 +106,8 @@ void testWithFilters() throws Exception { "name" + i, 0, (long) i, - OffsetDateTime.now(ZoneOffset.UTC))); + OffsetDateTime.ofInstant( + java.time.Instant.ofEpochMilli(i * 1000L), ZoneOffset.UTC))); } writeRecord(table, rows, null, 0); table.refresh(); @@ -140,6 +143,36 @@ void testWithFilters() throws Exception { } assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3, name3]]"); + // test __timestamp filter + Predicate timestampFilter = + new PredicateBuilder( + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.TIMESTAMP_LTZ(3))) + .greaterOrEqual(4, TimestampLtz.fromEpochMillis(14_000L)); + lakeSource = lakeStorage.createLakeSource(tablePath); + filterPushDownResult = 
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
index d69def5357..ae14c7e128 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
@@ -25,6 +25,7 @@
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.RowType;
@@ -89,7 +90,8 @@ void testWithFilters() throws Exception {
                             "name" + i,
                             0,
                             (long) i,
-                            OffsetDateTime.now(ZoneOffset.UTC)));
+                            OffsetDateTime.ofInstant(
+                                    java.time.Instant.ofEpochMilli(i * 1000L), ZoneOffset.UTC)));
         }
 
         writeRecord(table, rows, null, 0);
@@ -104,7 +106,8 @@ void testWithFilters() throws Exception {
                             "name" + i,
                             0,
                             (long) i,
-                            OffsetDateTime.now(ZoneOffset.UTC)));
+                            OffsetDateTime.ofInstant(
+                                    java.time.Instant.ofEpochMilli(i * 1000L), ZoneOffset.UTC)));
         }
         writeRecord(table, rows, null, 0);
         table.refresh();
@@ -140,6 +143,36 @@ void testWithFilters() throws Exception {
         }
         assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3, name3]]");
+        // test __timestamp filter
+        Predicate timestampFilter =
+                new PredicateBuilder(
+                                RowType.of(
+                                        DataTypes.INT(),
+                                        DataTypes.STRING(),
+                                        DataTypes.INT(),
+                                        DataTypes.BIGINT(),
+                                        DataTypes.TIMESTAMP_LTZ(3)))
+                        .greaterOrEqual(4, TimestampLtz.fromEpochMillis(14_000L));
+        lakeSource = lakeStorage.createLakeSource(tablePath);
+        filterPushDownResult =
+                lakeSource.withFilters(Collections.singletonList(timestampFilter));
+        assertThat(filterPushDownResult.acceptedPredicates())
+                .isEqualTo(Collections.singletonList(timestampFilter));
+        assertThat(filterPushDownResult.remainingPredicates()).isEmpty();
+
+        icebergSplits = lakeSource.createPlanner(() -> table.currentSnapshot().snapshotId()).plan();
+        actual = new ArrayList<>();
+        for (IcebergSplit split : icebergSplits) {
+            recordReader = lakeSource.createRecordReader(() -> split);
+            try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+                actual.addAll(
+                        convertToFlinkRow(
+                                fieldGetters,
+                                TransformingCloseableIterator.transform(
+                                        iterator, LogRecord::getRow)));
+            }
+        }
+        assertThat(actual.toString()).isEqualTo("[+I[14, name14], +I[15, name15], +I[16, name16]]");
 
         // test mix one unaccepted filter
         Predicate nonConvertibleFilter = FLUSS_BUILDER.endsWith(1, BinaryString.fromString("name"));
         allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
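For the expected rows in this test and the Paimon variant below: every row `i` is now written with `__timestamp = i * 1000L` ms instead of `System.currentTimeMillis()`, which makes the pushed filter deterministic (helper name hypothetical):

```java
// Row i survives a greaterOrEqual(__timestamp, t) lake filter iff its timestamp
// reaches the threshold; e.g. t = 14_000 ms keeps ids {14, 15, 16} above, and
// t = 10_000 ms keeps ids {10..14} in the Paimon test below.
static boolean passesTimestampFilter(int rowId, long thresholdMillis) {
    return rowId * 1000L >= thresholdMillis;
}
```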
"partitioned" : "non_partitioned"); + + TablePath t1 = TablePath.of(DEFAULT_DB, tableName); + List writtenRows = new ArrayList<>(); + // insert with 30 rows + long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows); + // wait until records has been synced + waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned); + + // now, start to read the log table, which will read iceberg + // may read fluss or not, depends on the log offset of iceberg snapshot + List actual = + CollectionUtil.iteratorToList( + batchTEnv.executeSql("select * from " + tableName).collect()); + assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows); + + long startupTimestampMillis = System.currentTimeMillis(); + List afterTimestampRows = new ArrayList<>(); + + // write some log data again, that will be tiered into the lake + afterTimestampRows.addAll(writeRows(t1, 3, isPartitioned)); + + // cancel the tiering job + jobClient.cancel().get(); + + // write some logs again, but will only be reserved in the fluss log tablet + afterTimestampRows.addAll(writeRows(t1, 3, isPartitioned)); + + String timestampStartupOptions = + String.format( + "/*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%d') */", + startupTimestampMillis); + actual = + CollectionUtil.iteratorToList( + batchTEnv + .executeSql( + String.format( + "select * from %s %s", + tableName, timestampStartupOptions)) + .collect()); + assertThat(actual).containsExactlyInAnyOrderElementsOf(afterTimestampRows); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java index d1c8d04905..56271bb56b 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java @@ -28,6 +28,7 @@ import org.apache.fluss.predicate.Predicate; import org.apache.fluss.predicate.PredicateBuilder; import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.IntType; @@ -62,7 +63,7 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase { .column("name", org.apache.paimon.types.DataTypes.STRING()) .column("__bucket", org.apache.paimon.types.DataTypes.INT()) .column("__offset", org.apache.paimon.types.DataTypes.BIGINT()) - .column("__timestamp", org.apache.paimon.types.DataTypes.TIMESTAMP(6)) + .column("__timestamp", org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()) .primaryKey("id") .option(CoreOptions.BUCKET.key(), "1") .build(); @@ -89,7 +90,7 @@ void testWithFilters() throws Exception { BinaryString.fromString("name" + i), 0, (long) i, - Timestamp.fromEpochMillis(System.currentTimeMillis()))); + Timestamp.fromEpochMillis(i * 1000L))); } writeRecord(tablePath, rows); @@ -102,7 +103,7 @@ void testWithFilters() throws Exception { BinaryString.fromString("name" + i), 0, (long) i, - Timestamp.fromEpochMillis(System.currentTimeMillis()))); + Timestamp.fromEpochMillis(i * 1000L))); } writeRecord(tablePath, rows); @@ -140,6 +141,37 @@ void testWithFilters() throws Exception { } 
assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3, name3]]"); + Predicate timestampFilter = + new PredicateBuilder( + RowType.of( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.TIMESTAMP_LTZ(3))) + .greaterOrEqual(4, TimestampLtz.fromEpochMillis(10_000L)); + lakeSource = lakeStorage.createLakeSource(tablePath); + filterPushDownResult = lakeSource.withFilters(Collections.singletonList(timestampFilter)); + assertThat(filterPushDownResult.acceptedPredicates()) + .isEqualTo(Collections.singletonList(timestampFilter)); + assertThat(filterPushDownResult.remainingPredicates()).isEmpty(); + + paimonSplits = lakeSource.createPlanner(() -> 2).plan(); + actual = new ArrayList<>(); + for (PaimonSplit split : paimonSplits) { + recordReader = lakeSource.createRecordReader(() -> split); + try (CloseableIterator iterator = recordReader.read()) { + actual.addAll( + convertToFlinkRow( + fieldGetters, + TransformingCloseableIterator.transform( + iterator, LogRecord::getRow))); + } + } + assertThat(actual.toString()) + .isEqualTo( + "[+I[10, name10], +I[11, name11], +I[12, name12], +I[13, name13], +I[14, name14]]"); + // test mix one unaccepted filter Predicate nonConvertibleFilter = new LeafPredicate(