From 68e3886904198ad7338da5483419053f577bf575 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Tue, 28 Apr 2026 17:30:17 +0800 Subject: [PATCH 1/2] [server] Skip bucket recovery for non-existent partitions --- .../org/apache/fluss/server/log/LogManager.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index f7f35f4618..d8844e5ea3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.LogStorageException; +import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; @@ -338,6 +339,15 @@ private LogTablet loadLog( PhysicalTablePath physicalTablePath = pathAndBucket.f0; TablePath tablePath = physicalTablePath.getTablePath(); TableInfo tableInfo = getTableInfo(zkClient, tablePath); + + // to check the partition existence + Long partitionId = tableBucket.getPartitionId(); + if (partitionId != null) { + if (!zkClient.getPartitionIdAndNames(tablePath).keySet().contains(partitionId)) { + throw new PartitionNotExistException("Partition not exist"); + } + } + LogTablet logTablet = LogTablet.create( physicalTablePath, @@ -487,9 +497,10 @@ public void run() { loadLog(tabletDir, cleanShutdown, recoveryPoints, conf, clock); } catch (Exception e) { LOG.error("Fail to loadLog from {}", tabletDir, e); - if (e instanceof SchemaNotExistException) { + if (e instanceof SchemaNotExistException + || e instanceof PartitionNotExistException) { LOG.error( - "schema not exist, table for {} has already been dropped, the residual data will be removed.", + "table bucket for {} has already been dropped, the residual data will be removed.", tabletDir, e); FileUtils.deleteDirectoryQuietly(tabletDir); From 61aee288e0279f3588b2e2b1c18970a957eed961 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Tue, 28 Apr 2026 17:39:44 +0800 Subject: [PATCH 2/2] add test --- .../fluss/server/log/LogManagerTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index 981685a3d1..beab0ec51a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -29,6 +29,8 @@ import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.BucketAssignment; +import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.utils.clock.SystemClock; @@ -47,6 +49,7 @@ import java.io.File; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -300,6 +303,49 @@ void testDeleteLog(String partitionName) throws Exception { assertThat(logManager.getLog(log1.getTableBucket()).isPresent()).isTrue(); } + /** + * When a partitioned bucket's on-disk data remains but the partition was removed from metadata, + * startup should skip normal recovery and delete the residual log directory. + */ + @Test + void testLogRecoverySkipWhenPartitionDropped() throws Exception { + String partitionName = "2024"; + long partitionId = 11L; + PartitionAssignment partitionAssignment = + new PartitionAssignment( + DATA1_TABLE_ID, Collections.singletonMap(1, BucketAssignment.of(1, 2))); + zkClient.registerPartitionAssignmentAndMetadata( + partitionId, + partitionName, + partitionAssignment, + DEFAULT_REMOTE_DATA_DIR, + tablePath1, + DATA1_TABLE_ID); + + initTableBuckets(partitionName); + LogTablet log1 = getOrCreateLog(tablePath1, partitionName, tableBucket1); + File logDir = log1.getLogDir(); + assertThat(logDir).exists(); + + logManager.shutdown(); + logManager = null; + + zkClient.deletePartition(tablePath1, partitionName); + + LogManager newLogManager = + LogManager.create( + conf, + zkClient, + new FlussScheduler(1), + SystemClock.getInstance(), + TestingMetricGroups.TABLET_SERVER_METRICS); + newLogManager.startup(); + logManager = newLogManager; + + assertThat(logDir).doesNotExist(); + assertThat(logManager.getLog(tableBucket1).isPresent()).isFalse(); + } + private LogTablet getOrCreateLog( TablePath tablePath, String partitionName, TableBucket tableBucket) throws Exception { return logManager.getOrCreateLog(