Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public final class LogTablet {
private volatile int tieredLogLocalSegments;
private final Clock clock;
private final boolean isChangeLog;
private final long logTtlMs;

@GuardedBy("lock")
private volatile LogOffsetMetadata highWatermarkMetadata;
Expand Down Expand Up @@ -147,6 +148,7 @@ private LogTablet(
(int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis();
this.writerStateManager = writerStateManager;
this.highWatermarkMetadata = new LogOffsetMetadata(0L);
this.logTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis();

this.scheduler = scheduler;
// scheduler the writer expiration interval check.
Expand Down Expand Up @@ -915,6 +917,32 @@ public void roll(Optional<Long> expectedNextOffset) throws Exception {
}
}

/**
 * Rolls the active segment onto a fresh one when every record in it has outlived the log TTL.
 *
 * <p>After the roll, new appends land in an empty active segment, while the rolled (expired)
 * segment becomes eligible for remote tiering and local cleanup.
 */
public void rollActiveSegmentIfExpired() throws Exception {
    // A non-positive TTL disables time-based expiration entirely; nothing to do.
    if (logTtlMs <= 0L) {
        return;
    }

    synchronized (lock) {
        LogSegment current = localLog.getSegments().activeSegment();
        boolean hasRecords = current.getSizeInBytes() > 0;
        if (hasRecords && isSegmentExpired(clock.milliseconds(), current)) {
            LOG.info(
                    "Rolling expired active log segment {} for bucket {}.",
                    current,
                    getTableBucket());
            roll(Optional.empty());
        }
    }
}

/** Truncate this log so that it ends with the greatest offset < targetOffset. */
boolean truncateTo(long targetOffset) throws LogStorageException {
if (targetOffset < 0) {
Expand Down Expand Up @@ -1172,7 +1200,7 @@ private void deleteOldSegments(long endOffset, SegmentDeletionReason reason)
}

/** Returns the segments that can be deleted by checking log end offset. */
private List<LogSegment> deletableSegments(long endOffset) {
private List<LogSegment> deletableSegments(long endOffset) throws IOException {
if (localLog.getSegments().isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -1181,17 +1209,27 @@ private List<LogSegment> deletableSegments(long endOffset) {
// readers is in progress.
List<LogSegment> deletableSegments = new ArrayList<>();
List<LogSegment> logSegments = localLog.getSegments().values();
// ignore the segments configured to be retained
for (int i = 0; i < logSegments.size() - tieredLogLocalSegments; i++) {
if (logSegments.get(i + 1).getBaseOffset() <= endOffset) {
deletableSegments.add(logSegments.get(i));
} else {
int tierProtectedStartIndex = logSegments.size() - tieredLogLocalSegments;
long now = clock.milliseconds();

for (int i = 0; i < logSegments.size() - 1; i++) {
if (logSegments.get(i + 1).getBaseOffset() > endOffset) {
break;
}
if (i < tierProtectedStartIndex || isSegmentExpired(now, logSegments.get(i))) {
deletableSegments.add(logSegments.get(i));
}
Comment on lines +1212 to +1221
}
return deletableSegments;
}

/**
 * Tells whether the given segment has outlived the configured log TTL.
 *
 * @param now the current time in milliseconds
 * @param segment the log segment to check
 * @return {@code true} when TTL is enabled and the segment's newest record is older than it
 * @throws IOException if reading the segment's max timestamp fails
 */
private boolean isSegmentExpired(long now, LogSegment segment) throws IOException {
    // A non-positive TTL means time-based expiration is disabled.
    boolean ttlEnabled = logTtlMs > 0L;
    return ttlEnabled && (now - segment.maxTimestampSoFar() > logTtlMs);
}

private void deleteSegments(List<LogSegment> deletableSegments, SegmentDeletionReason reason)
throws IOException {
localLog.checkIfMemoryMappedBufferClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private void runOnce() throws InterruptedException {
TableMetricGroup metricGroup = replica.tableMetrics();
maybeUpdateCopiedOffset(logTablet);

logTablet.rollActiveSegmentIfExpired();
// Get these candidate log segments to copy and these expired remote log segments to
// clean up.
List<EnrichedLogSegment> candidateToCopySegments =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.fluss.server.log.LogTablet;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -33,7 +34,9 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for remote log ttl in {@link RemoteLogManager}. */
Expand Down Expand Up @@ -72,11 +75,13 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
// advance time past TTL (7 days)
manualClock.advanceTime(Duration.ofDays(7).plusHours(1));

// since data lake is enabled and no data has been tiered to data lake,
// the expired segments should not be deleted.
// Since data lake is enabled and no data has been tiered to data lake,
// the expired segments should not be deleted. The expired active segment is rolled
// and uploaded in this task run.
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
assertThat(remoteLog.allRemoteLogSegments()).hasSize(4);
assertThat(remoteLog.allRemoteLogSegments()).hasSize(5);
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);

// set lake log end offset to 20, meaning only the first 2 segments
// ([0,10) and [10,20)) have been tiered to lake
Expand All @@ -86,10 +91,10 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();

// only segments with remoteLogEndOffset <= 20 should be deleted (first 2 segments)
// remaining segments: [20,30) and [30,40)
assertThat(remoteLog.allRemoteLogSegments()).hasSize(2);
// remaining segments: [20,30), [30,40) and [40,50)
assertThat(remoteLog.allRemoteLogSegments()).hasSize(3);
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(20L);
assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(40L);
assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);
// verify remaining segments have the expected offsets
assertThat(remoteLog.allRemoteLogSegments())
.allSatisfy(
Expand All @@ -99,16 +104,23 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {

// now advance lake log end offset to include all remaining segments
logTablet.updateLakeLogEndOffset(40L);
// trigger again, remaining expired segments should now be deleted
// trigger again, segments whose end offset is <= 40 should now be deleted
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
assertThat(remoteLog.allRemoteLogSegments()).hasSize(1);
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(40L);
assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(50L);

logTablet.updateLakeLogEndOffset(50L);
// trigger again, all remaining expired segments should now be deleted
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
assertThat(remoteLog.allRemoteLogSegments()).isEmpty();
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);

// Fetch records from remote.
// mock to update remote log end offset and remote log start offset as
// NotifyRemoteLogOffsetsRequest do.
logTablet.updateRemoteLogStartOffset(40L);
logTablet.updateRemoteLogEndOffset(40L);
logTablet.updateRemoteLogStartOffset(50L);
logTablet.updateRemoteLogEndOffset(50L);
CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future =
new CompletableFuture<>();
replicaManager.fetchLogRecords(
Expand All @@ -122,4 +134,41 @@ void testRemoteLogTTL(boolean partitionTable) throws Exception {
assertThat(resultForBucket.getErrorCode())
.isEqualTo(Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION.code());
}

/**
 * Verifies that an active segment whose records have all passed the log TTL is rolled,
 * uploaded to remote storage, and deleted locally — and that appends keep working on the
 * fresh active segment afterwards.
 */
@Test
void testExpiredActiveSegmentRolledUploadedAndLocallyDeleted() throws Exception {
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
makeLogTableAsLeader(tb, false);
LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();

// Start with exactly one non-empty segment, which is also the active segment.
addMultiSegmentsToLogTablet(logTablet, 1);
assertThat(logTablet.getSegments()).hasSize(1);
assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0);

// Advance past the log TTL (presumably configured as 7 days — see the companion TTL test)
// and run the tiering task: the expired active segment should be rolled and uploaded.
manualClock.advanceTime(Duration.ofDays(7).plusHours(1));
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();

// Remote now holds the rolled segment covering offsets [0, 10).
RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb);
assertThat(remoteLog.allRemoteLogSegments()).hasSize(1);
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(0L);
assertThat(remoteLog.getRemoteLogEndOffset()).hasValue(10L);
// Locally only a fresh, empty active segment remains, starting at the rolled end offset.
assertThat(logTablet.getSegments()).hasSize(1);
assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L);
assertThat(logTablet.activeLogSegment().getSizeInBytes()).isZero();

// Appending after the roll must land in the new active segment.
logTablet.appendAsLeader(
genMemoryLogRecordsWithWriterId(
Collections.singletonList(DATA1.get(0)),
manualClock.milliseconds(),
0,
10L));

assertThat(logTablet.localLogEndOffset()).isEqualTo(11L);
assertThat(logTablet.activeLogSegment().getBaseOffset()).isEqualTo(10L);
assertThat(logTablet.activeLogSegment().getSizeInBytes()).isGreaterThan(0);

// A second run leaves remote storage empty: the previously uploaded segment has itself
// passed the TTL and is cleaned up remotely, while the new (fresh-timestamped) active
// segment is not expired and so is not rolled or uploaded.
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
assertThat(remoteLog.allRemoteLogSegments()).isEmpty();
assertThat(remoteLog.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
}
}
Loading