From 47a7771b734f39ef47068d487d6ba9a43fb28d88 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Tue, 28 Apr 2026 16:28:54 +0800 Subject: [PATCH 1/2] [server] Respect the option of 'table.log.ttl' for local segments --- .../apache/fluss/server/log/LogTablet.java | 24 ++++-- .../log/remote/TieredLocalSegmentTtlTest.java | 84 +++++++++++++++++++ 2 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 59cf99bb17..344cc7189e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -103,6 +103,7 @@ public final class LogTablet { private volatile int tieredLogLocalSegments; private final Clock clock; private final boolean isChangeLog; + private final long logTtlMs; @GuardedBy("lock") private volatile LogOffsetMetadata highWatermarkMetadata; @@ -147,6 +148,7 @@ private LogTablet( (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis(); this.writerStateManager = writerStateManager; this.highWatermarkMetadata = new LogOffsetMetadata(0L); + this.logTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis(); this.scheduler = scheduler; // scheduler the writer expiration interval check. @@ -1172,7 +1174,7 @@ private void deleteOldSegments(long endOffset, SegmentDeletionReason reason) } /** Returns the segments that can be deleted by checking log end offset. */ - private List deletableSegments(long endOffset) { + private List deletableSegments(long endOffset) throws IOException { if (localLog.getSegments().isEmpty()) { return Collections.emptyList(); } @@ -1181,17 +1183,27 @@ private List deletableSegments(long endOffset) { // readers is in progress. List deletableSegments = new ArrayList<>(); List logSegments = localLog.getSegments().values(); - // ignore the segments configured to be retained - for (int i = 0; i < logSegments.size() - tieredLogLocalSegments; i++) { - if (logSegments.get(i + 1).getBaseOffset() <= endOffset) { - deletableSegments.add(logSegments.get(i)); - } else { + int tierProtectedStartIndex = logSegments.size() - tieredLogLocalSegments; + long now = clock.milliseconds(); + + for (int i = 0; i < logSegments.size() - 1; i++) { + if (logSegments.get(i + 1).getBaseOffset() > endOffset) { break; } + if (i < tierProtectedStartIndex || isSegmentExpired(now, logSegments.get(i))) { + deletableSegments.add(logSegments.get(i)); + } } return deletableSegments; } + private boolean isSegmentExpired(long now, LogSegment segment) throws IOException { + if (logTtlMs <= 0L) { + return false; + } + return now - segment.maxTimestampSoFar() > logTtlMs; + } + private void deleteSegments(List deletableSegments, SegmentDeletionReason reason) throws IOException { localLog.checkIfMemoryMappedBufferClosed(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java new file mode 100644 index 0000000000..3c409569dd --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.log.remote; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.log.LogTablet; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; + +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; +import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies that among the last {@code table.log.tiered.local-segments} local segments, the inactive + * ones can still be removed once {@link ConfigOptions#TABLE_LOG_TTL} has passed (while the default + * long TTL would keep them until offset-based cleanup allows it). + */ +final class TieredLocalSegmentTtlTest extends RemoteLogTestBase { + + @Override + public Configuration getServerConf() { + Configuration conf = super.getServerConf(); + conf.set(ConfigOptions.TABLE_LOG_TTL, Duration.ofMillis(50)); + return conf; + } + + @BeforeEach + public void setup() throws Exception { + super.setup(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testInactiveTieredLocalSegmentRemovedAfterTtl(boolean partitionTable) throws Exception { + TableBucket tb = + partitionTable + ? new TableBucket(DATA1_TABLE_ID, 0L, 0) + : new TableBucket(DATA1_TABLE_ID, 0); + + makeKvTableAsLeader(tb, DATA1_TABLE_PATH_PK, INITIAL_LEADER_EPOCH, partitionTable); + LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet(); + + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + logTablet.updateRemoteLogEndOffset(40L); + assertThat(logTablet.getSegments()).hasSize(5); + + logTablet.updateMinRetainOffset(33L); + assertThat(logTablet.getSegments()).hasSize(2); + + // Below the inactive tier-retained segment's TTL: cleanup runs but segment stays. + logTablet.updateMinRetainOffset(34L); + assertThat(logTablet.getSegments()).hasSize(2); + + manualClock.advanceTime(Duration.ofMillis(200)); + + logTablet.updateMinRetainOffset(35L); + assertThat(logTablet.getSegments()).hasSize(1); + } +} From e9c812c09f544846fe75186d5d4ff2ff5d836411 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Wed, 29 Apr 2026 11:50:46 +0800 Subject: [PATCH 2/2] optimize --- .../apache/fluss/server/log/LogTablet.java | 26 ++++++ .../server/log/remote/LogTieringTask.java | 1 + .../server/log/remote/RemoteLogTTLTest.java | 67 +++++++++++++-- .../log/remote/TieredLocalSegmentTtlTest.java | 84 ------------------- 4 files changed, 85 insertions(+), 93 deletions(-) delete mode 100644 fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 344cc7189e..c0c8378a00 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -917,6 +917,32 @@ public void roll(Optional expectedNextOffset) throws Exception { } } + /** + * Rolls the active segment if it is non-empty and all of its records have passed the log TTL. + * + *

This keeps an empty active segment available for future appends while making the expired + * segment eligible for remote tiering and local cleanup. + */ + public void rollActiveSegmentIfExpired() throws Exception { + if (logTtlMs <= 0L) { + return; + } + + synchronized (lock) { + LogSegment activeSegment = localLog.getSegments().activeSegment(); + if (activeSegment.getSizeInBytes() == 0 + || !isSegmentExpired(clock.milliseconds(), activeSegment)) { + return; + } + + LOG.info( + "Rolling expired active log segment {} for bucket {}.", + activeSegment, + getTableBucket()); + roll(Optional.empty()); + } + } + /** Truncate this log so that it ends with the greatest offset < targetOffset. */ boolean truncateTo(long targetOffset) throws LogStorageException { if (targetOffset < 0) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java index 8c7d0d8832..06e531cf6d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java @@ -127,6 +127,7 @@ private void runOnce() throws InterruptedException { TableMetricGroup metricGroup = replica.tableMetrics(); maybeUpdateCopiedOffset(logTablet); + logTablet.rollActiveSegmentIfExpired(); // Get these candidate log segments to copy and these expired remote log segments to // clean up. List candidateToCopySegments = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java index d23667431f..216c59d883 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.server.log.LogTablet; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -33,7 +34,9 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; +import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId; import static org.assertj.core.api.Assertions.assertThat; /** Test for remote log ttl in {@link RemoteLogManager}. */ @@ -72,11 +75,13 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception { // advance time past TTL (7 days) manualClock.advanceTime(Duration.ofDays(7).plusHours(1)); - // since data lake is enabled and no data has been tiered to data lake, - // the expired segments should not be deleted. + // Since data lake is enabled and no data has been tiered to data lake, + // the expired segments should not be deleted. The expired active segment is rolled + // and uploaded in this task run. remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); - assertThat(remoteLog.allRemoteLogSegments()).hasSize(4); + assertThat(remoteLog.allRemoteLogSegments()).hasSize(5); assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L); + assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L); // set lake log end offset to 20, meaning only the first 2 segments // ([0,10) and [10,20)) have been tiered to lake @@ -86,10 +91,10 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception { remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); // only segments with remoteLogEndOffset <= 20 should be deleted (first 2 segments) - // remaining segments: [20,30) and [30,40) - assertThat(remoteLog.allRemoteLogSegments()).hasSize(2); + // remaining segments: [20,30), [30,40) and [40,50) + assertThat(remoteLog.allRemoteLogSegments()).hasSize(3); assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(20L); - assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(40L); + assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L); // verify remaining segments have the expected offsets assertThat(remoteLog.allRemoteLogSegments()) .allSatisfy( @@ -99,7 +104,14 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception { // now advance lake log end offset to include all remaining segments logTablet.updateLakeLogEndOffset(40L); - // trigger again, remaining expired segments should now be deleted + // trigger again, segments whose end offset is <= 40 should now be deleted + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + assertThat(remoteLog.allRemoteLogSegments()).hasSize(1); + assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(40L); + assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L); + + logTablet.updateLakeLogEndOffset(50L); + // trigger again, all remaining expired segments should now be deleted remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); assertThat(remoteLog.allRemoteLogSegments()).isEmpty(); assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); @@ -107,8 +119,8 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception { // Fetch records from remote. // mock to update remote log end offset and remote log start offset as // NotifyRemoteLogOffsetsRequest do. - logTablet.updateRemoteLogStartOffset(40L); - logTablet.updateRemoteLogEndOffset(40L); + logTablet.updateRemoteLogStartOffset(50L); + logTablet.updateRemoteLogEndOffset(50L); CompletableFuture> future = new CompletableFuture<>(); replicaManager.fetchLogRecords( @@ -122,4 +134,41 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception { assertThat(resultForBucket.getErrorCode()) .isEqualTo(Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION.code()); } + + @Test + void testExpiredActiveSegmentRolledUploadedAndLocallyDeleted() throws Exception { + TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); + makeLogTableAsLeader(tb, false); + LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet(); + + addMultiSegmentsToLogTablet(logTablet, 1); + assertThat(logTablet.getSegments()).hasSize(1); + assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0); + + manualClock.advanceTime(Duration.ofDays(7).plusHours(1)); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb); + assertThat(remoteLog.allRemoteLogSegments()).hasSize(1); + assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L); + assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(10L); + assertThat(logTablet.getSegments()).hasSize(1); + assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L); + assertThat(logTablet.activeLogSegment().getSizeInBytes()).isZero(); + + logTablet.appendAsLeader( + genMemoryLogRecordsWithWriterId( + Collections.singletonList(DATA1.get(0)), + manualClock.milliseconds(), + 0, + 10L)); + + assertThat(logTablet.localLogEndOffset()).isEqualTo(11L); + assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L); + assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0); + + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + assertThat(remoteLog.allRemoteLogSegments()).isEmpty(); + assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java deleted file mode 100644 index 3c409569dd..0000000000 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/TieredLocalSegmentTtlTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.server.log.remote; - -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.server.log.LogTablet; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.time.Duration; - -import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; -import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; -import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Verifies that among the last {@code table.log.tiered.local-segments} local segments, the inactive - * ones can still be removed once {@link ConfigOptions#TABLE_LOG_TTL} has passed (while the default - * long TTL would keep them until offset-based cleanup allows it). - */ -final class TieredLocalSegmentTtlTest extends RemoteLogTestBase { - - @Override - public Configuration getServerConf() { - Configuration conf = super.getServerConf(); - conf.set(ConfigOptions.TABLE_LOG_TTL, Duration.ofMillis(50)); - return conf; - } - - @BeforeEach - public void setup() throws Exception { - super.setup(); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testInactiveTieredLocalSegmentRemovedAfterTtl(boolean partitionTable) throws Exception { - TableBucket tb = - partitionTable - ? new TableBucket(DATA1_TABLE_ID, 0L, 0) - : new TableBucket(DATA1_TABLE_ID, 0); - - makeKvTableAsLeader(tb, DATA1_TABLE_PATH_PK, INITIAL_LEADER_EPOCH, partitionTable); - LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet(); - - addMultiSegmentsToLogTablet(logTablet, 5); - remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); - - logTablet.updateRemoteLogEndOffset(40L); - assertThat(logTablet.getSegments()).hasSize(5); - - logTablet.updateMinRetainOffset(33L); - assertThat(logTablet.getSegments()).hasSize(2); - - // Below the inactive tier-retained segment's TTL: cleanup runs but segment stays. - logTablet.updateMinRetainOffset(34L); - assertThat(logTablet.getSegments()).hasSize(2); - - manualClock.advanceTime(Duration.ofMillis(200)); - - logTablet.updateMinRetainOffset(35L); - assertThat(logTablet.getSegments()).hasSize(1); - } -}