diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 59cf99bb17..c0c8378a00 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -103,6 +103,7 @@ public final class LogTablet {
     private volatile int tieredLogLocalSegments;
     private final Clock clock;
     private final boolean isChangeLog;
+    private final long logTtlMs;

     @GuardedBy("lock")
     private volatile LogOffsetMetadata highWatermarkMetadata;
@@ -147,6 +148,7 @@ private LogTablet(
                 (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis();
         this.writerStateManager = writerStateManager;
         this.highWatermarkMetadata = new LogOffsetMetadata(0L);
+        this.logTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis();
         this.scheduler = scheduler;

         // scheduler the writer expiration interval check.
@@ -915,6 +917,32 @@ public void roll(Optional<Long> expectedNextOffset) throws Exception {
         }
     }

+    /**
+     * Rolls the active segment if it is non-empty and all of its records have passed the log TTL.
+     *
+     * <p>This keeps an empty active segment available for future appends while making the expired
+     * segment eligible for remote tiering and local cleanup.
+     */
+    public void rollActiveSegmentIfExpired() throws Exception {
+        if (logTtlMs <= 0L) {
+            return;
+        }
+
+        synchronized (lock) {
+            LogSegment activeSegment = localLog.getSegments().activeSegment();
+            if (activeSegment.getSizeInBytes() == 0
+                    || !isSegmentExpired(clock.milliseconds(), activeSegment)) {
+                return;
+            }
+
+            LOG.info(
+                    "Rolling expired active log segment {} for bucket {}.",
+                    activeSegment,
+                    getTableBucket());
+            roll(Optional.empty());
+        }
+    }
+
     /** Truncate this log so that it ends with the greatest offset < targetOffset. */
     boolean truncateTo(long targetOffset) throws LogStorageException {
         if (targetOffset < 0) {
@@ -1172,7 +1200,7 @@ private void deleteOldSegments(long endOffset, SegmentDeletionReason reason)
     }

     /** Returns the segments that can be deleted by checking log end offset. */
-    private List<LogSegment> deletableSegments(long endOffset) {
+    private List<LogSegment> deletableSegments(long endOffset) throws IOException {
         if (localLog.getSegments().isEmpty()) {
             return Collections.emptyList();
         }
@@ -1181,17 +1209,27 @@ private List<LogSegment> deletableSegments(long endOffset) {
         // readers is in progress.
         List<LogSegment> deletableSegments = new ArrayList<>();
         List<LogSegment> logSegments = localLog.getSegments().values();
-        // ignore the segments configured to be retained
-        for (int i = 0; i < logSegments.size() - tieredLogLocalSegments; i++) {
-            if (logSegments.get(i + 1).getBaseOffset() <= endOffset) {
-                deletableSegments.add(logSegments.get(i));
-            } else {
+        int tierProtectedStartIndex = logSegments.size() - tieredLogLocalSegments;
+        long now = clock.milliseconds();
+
+        for (int i = 0; i < logSegments.size() - 1; i++) {
+            if (logSegments.get(i + 1).getBaseOffset() > endOffset) {
                 break;
             }
+            if (i < tierProtectedStartIndex || isSegmentExpired(now, logSegments.get(i))) {
+                deletableSegments.add(logSegments.get(i));
+            }
         }
         return deletableSegments;
     }

+    private boolean isSegmentExpired(long now, LogSegment segment) throws IOException {
+        if (logTtlMs <= 0L) {
+            return false;
+        }
+        return now - segment.maxTimestampSoFar() > logTtlMs;
+    }
+
     private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionReason reason)
             throws IOException {
         localLog.checkIfMemoryMappedBufferClosed();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 8c7d0d8832..06e531cf6d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -127,6 +127,7 @@ private void runOnce() throws InterruptedException {
         TableMetricGroup metricGroup = replica.tableMetrics();

         maybeUpdateCopiedOffset(logTablet);
+        logTablet.rollActiveSegmentIfExpired();
         // Get these candidate log segments to copy and these expired remote log segments to
         // clean up.
         List candidateToCopySegments =
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
index d23667431f..216c59d883 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTTLTest.java
@@ -25,6 +25,7 @@
 import org.apache.fluss.server.log.LogTablet;

 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;

@@ -33,7 +34,9 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;

+import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
 import static org.assertj.core.api.Assertions.assertThat;

 /** Test for remote log ttl in {@link RemoteLogManager}. */
@@ -72,11 +75,13 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {

         // advance time past TTL (7 days)
         manualClock.advanceTime(Duration.ofDays(7).plusHours(1));
-        // since data lake is enabled and no data has been tiered to data lake,
-        // the expired segments should not be deleted.
+        // Since data lake is enabled and no data has been tiered to data lake,
+        // the expired segments should not be deleted. The expired active segment is rolled
+        // and uploaded in this task run.
         remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
-        assertThat(remoteLog.allRemoteLogSegments()).hasSize(4);
+        assertThat(remoteLog.allRemoteLogSegments()).hasSize(5);
         assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);

         // set lake log end offset to 20, meaning only the first 2 segments
         // ([0,10) and [10,20)) have been tiered to lake
@@ -86,10 +91,10 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {

         remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
         // only segments with remoteLogEndOffset <= 20 should be deleted (first 2 segments)
-        // remaining segments: [20,30) and [30,40)
-        assertThat(remoteLog.allRemoteLogSegments()).hasSize(2);
+        // remaining segments: [20,30), [30,40) and [40,50)
+        assertThat(remoteLog.allRemoteLogSegments()).hasSize(3);
         assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(20L);
-        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(40L);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);
         // verify remaining segments have the expected offsets
         assertThat(remoteLog.allRemoteLogSegments())
                 .allSatisfy(
@@ -99,7 +104,14 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {

         // now advance lake log end offset to include all remaining segments
         logTablet.updateLakeLogEndOffset(40L);
-        // trigger again, remaining expired segments should now be deleted
+        // trigger again, segments whose end offset is <= 40 should now be deleted
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        assertThat(remoteLog.allRemoteLogSegments()).hasSize(1);
+        assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(40L);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);
+
+        logTablet.updateLakeLogEndOffset(50L);
+        // trigger again, all remaining expired segments should now be deleted
         remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
         assertThat(remoteLog.allRemoteLogSegments()).isEmpty();
         assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
@@ -107,8 +119,8 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
         // Fetch records from remote.
         // mock to update remote log end offset and remote log start offset as
         // NotifyRemoteLogOffsetsRequest do.
-        logTablet.updateRemoteLogStartOffset(40L);
-        logTablet.updateRemoteLogEndOffset(40L);
+        logTablet.updateRemoteLogStartOffset(50L);
+        logTablet.updateRemoteLogEndOffset(50L);
         CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
                 new CompletableFuture<>();
         replicaManager.fetchLogRecords(
@@ -122,4 +134,41 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
         assertThat(resultForBucket.getErrorCode())
                 .isEqualTo(Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION.code());
     }
+
+    @Test
+    void testExpiredActiveSegmentRolledUploadedAndLocallyDeleted() throws Exception {
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+        makeLogTableAsLeader(tb, false);
+        LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
+
+        addMultiSegmentsToLogTablet(logTablet, 1);
+        assertThat(logTablet.getSegments()).hasSize(1);
+        assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0);
+
+        manualClock.advanceTime(Duration.ofDays(7).plusHours(1));
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+
+        RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
+        assertThat(remoteLog.allRemoteLogSegments()).hasSize(1);
+        assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
+        assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(10L);
+        assertThat(logTablet.getSegments()).hasSize(1);
+        assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L);
+        assertThat(logTablet.activeLogSegment().getSizeInBytes()).isZero();
+
+        logTablet.appendAsLeader(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(DATA1.get(0)),
+                        manualClock.milliseconds(),
+                        0,
+                        10L));
+
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(11L);
+        assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L);
+        assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0);
+
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        assertThat(remoteLog.allRemoteLogSegments()).isEmpty();
+        assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+    }
 }