diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 62d5d9df14..07751fa091 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -99,10 +99,16 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { // check the status of replica after synced assertReplicaStatus(bucketLogEndOffset); + List partitions = new ArrayList<>(); + if (isPartitioned) { + partitions.addAll(waitUntilPartitions(t1).values()); + Collections.sort(partitions); + } + // will read paimon snapshot, won't merge log since it's empty List resultEmptyLog = toSortedRows(batchTEnv.executeSql("select * from " + tableName)); - String expetedResultFromPaimon = buildExpectedResult(isPartitioned, 0, 1); + String expetedResultFromPaimon = buildExpectedResult(isPartitioned, partitions, 0, 1); assertThat(resultEmptyLog.toString().replace("+U", "+I")) .isEqualTo(expetedResultFromPaimon); @@ -379,7 +385,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception { // now, query the result, it must be the union result of lake snapshot and log List result = toSortedRows(batchTEnv.executeSql("select * from " + tableName)); - String expectedResult = buildExpectedResult(isPartitioned, 0, 2); + String expectedResult = buildExpectedResult(isPartitioned, partitions, 0, 2); assertThat(result.toString().replace("+U", "+I")).isEqualTo(expectedResult); // query with project push down @@ -987,7 +993,8 @@ private Map getBucketLogEndOffset( return bucketLogEndOffsets; } - private String buildExpectedResult(boolean isPartitioned, int record1, int record2) { + private String buildExpectedResult( + boolean isPartitioned, List partitions, int record1, int record2) { List records = new ArrayList<>(); records.add( "+I[false, 1, 2, 3, 4, 5.1, 6.0, string, 0.09, 10, " @@ -1014,10 +1021,10 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor if (isPartitioned) { return String.format( "[%s, %s, %s, %s]", - String.format(records.get(record1), "2025"), - String.format(records.get(record1), "2026"), - String.format(records.get(record2), "2025"), - String.format(records.get(record2), "2026")); + String.format(records.get(record1), partitions.get(0)), + String.format(records.get(record1), partitions.get(1)), + String.format(records.get(record2), partitions.get(0)), + String.format(records.get(record2), partitions.get(1))); } else { return String.format( "[%s, %s]",