Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,49 @@ void testAlterTableConfig() throws Exception {
.get();
}

/**
 * Verifies that {@code table.log.ttl} can be altered on an existing table.
 *
 * <p>Each set/reset is issued through {@code admin.alterTable} and then checked against the
 * {@link TableInfo} returned by a fresh {@code admin.getTableInfo} call, i.e. the test only
 * inspects table metadata, not the retention behavior of any server component.
 */
@Test
void testAlterTableLogTtl() throws Exception {
    TablePath tablePath = TablePath.of("test_db", "alter_table_log_ttl");
    admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

    // The freshly created table is expected to report a ttl of 1 day.
    // NOTE(review): presumably this is the value set in DEFAULT_TABLE_DESCRIPTOR, which is
    // declared outside this method - confirm if that descriptor changes.
    TableInfo tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.getTableConfig().getLogTTLMs())
            .isEqualTo(Duration.ofDays(1).toMillis());

    // Alter to 3d and verify the metadata reflects it.
    List<TableChange> tableChanges =
            Collections.singletonList(TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "3d"));
    admin.alterTable(tablePath, tableChanges, false).get();

    tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.getTableConfig().getLogTTLMs())
            .isEqualTo(Duration.ofDays(3).toMillis());

    // Alter a second time (30d) to verify that repeated updates work.
    tableChanges =
            Collections.singletonList(
                    TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "30d"));
    admin.alterTable(tablePath, tableChanges, false).get();

    tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.getTableConfig().getLogTTLMs())
            .isEqualTo(Duration.ofDays(30).toMillis());

    // Reset removes the property, so the effective ttl must fall back to the option's default.
    tableChanges =
            Collections.singletonList(TableChange.reset(ConfigOptions.TABLE_LOG_TTL.key()));
    admin.alterTable(tablePath, tableChanges, false).get();

    tableInfo = admin.getTableInfo(tablePath).get();
    assertThat(tableInfo.toTableDescriptor().getProperties())
            .doesNotContainKey(ConfigOptions.TABLE_LOG_TTL.key());
    // Read the default from the option itself instead of hard-coding it, so this assertion
    // keeps testing the fallback even if the default of table.log.ttl is changed later.
    assertThat(tableInfo.getTableConfig().getLogTTLMs())
            .isEqualTo(ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis());
}

@Test
void testAlterTableColumn() throws Exception {
// create table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class FlussConfigUtils {
Arrays.asList(
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
ConfigOptions.TABLE_LOG_TTL.key(),
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(),
ConfigOptions.TABLE_STATISTICS_COLUMNS.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,20 @@ public RemoteLogTablet remoteLogTablet(TableBucket tableBucket) {
return remoteLog;
}

/**
 * Looks up the {@link RemoteLogTablet} registered for the given {@link TableBucket} without
 * failing when none is registered.
 *
 * <p>An empty result means no tablet is currently stored for the bucket, e.g. because remote
 * logging is disabled or the replica is still in the early stage of construction. Callers that
 * can tolerate this absence, such as {@code Replica#updateLogTtlMs(long)} reacting to
 * alter-table events, should prefer this accessor over {@link #remoteLogTablet(TableBucket)}.
 *
 * @param tableBucket the bucket whose remote log tablet is requested
 * @return the registered tablet, or {@link Optional#empty()} if there is no entry for the
 *     bucket
 */
public Optional<RemoteLogTablet> getRemoteLogTablet(TableBucket tableBucket) {
    RemoteLogTablet registered = remoteLogs.get(tableBucket);
    return Optional.ofNullable(registered);
}

@Override
public void close() throws IOException {
rlmTasks.values().forEach(TaskWithFuture::cancel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class RemoteLogTablet {
/** The lock to protect the remote log segment list. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final long ttlMs;
private volatile long ttlMs;

/** The registered metrics for remote log. */
private volatile MetricGroup remoteLogMetrics;
Expand Down Expand Up @@ -161,7 +161,12 @@ public List<RemoteLogSegment> allRemoteLogSegments() {
*/
public List<RemoteLogSegment> expiredRemoteLogSegments(
long currentTimeMs, Long lakeLogEndOffset) {
if (!logExpireEnable()) {
// Snapshot the volatile ttlMs once so that a concurrent updateTtlMs() cannot change the
// comparison base mid-iteration. Otherwise an in-flight change to a non-positive value
// (which disables expiration) could make currentTimeMs - ts > ttlMs hold for ALL segments
// and result in wrongly deleting every remote segment.
final long ttlSnapshotMs = ttlMs;
if (ttlSnapshotMs <= 0) {
return Collections.emptyList();
}
return inReadLock(
Expand All @@ -171,7 +176,7 @@ public List<RemoteLogSegment> expiredRemoteLogSegments(
for (Map.Entry<Long, Set<UUID>> entry :
timestampToRemoteLogSegmentId.entrySet()) {
long ts = entry.getKey();
if (currentTimeMs - ts > ttlMs) {
if (currentTimeMs - ts > ttlSnapshotMs) {
for (UUID uuid : entry.getValue()) {
RemoteLogSegment segment = idToRemoteLogSegment.get(uuid);
if (lakeLogEndOffset != null) {
Expand Down Expand Up @@ -353,8 +358,24 @@ public void addAndDeleteLogSegments(
});
}

private boolean logExpireEnable() {
return ttlMs > 0;
/**
 * Returns the ttl, in milliseconds, currently applied to remote log segments.
 *
 * <p>The value can change at runtime through {@link #updateTtlMs(long)}; code paths that apply
 * alter-table updates (see {@code Replica#updateLogTtlMs(long)}) use this getter to compare the
 * current value with the requested one.
 *
 * @return the current ttl in milliseconds
 */
public long getTtlMs() {
    return this.ttlMs;
}

/**
 * Replaces the ttl, in milliseconds, used for remote log segments.
 *
 * <p>Called when the user alters the table option {@code table.log.ttl}. The method only stores
 * the value; it is picked up by the next evaluation of expired segments.
 *
 * @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
 */
public void updateTtlMs(long newTtlMs) {
    ttlMs = newTtlMs;
}

private void reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.log.remote.RemoteLogTablet;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
Expand Down Expand Up @@ -637,6 +638,42 @@ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) {
tieredLogLocalSegments);
}

/**
 * Applies a new remote-log ttl (in milliseconds) to this replica's {@link RemoteLogTablet}.
 *
 * <p>Invoked when the table configuration {@code table.log.ttl} is altered, so the new ttl is
 * used by the next round of expired-segment evaluation without a server restart or table
 * re-creation. The call does nothing when no tablet is registered for the bucket or when the
 * tablet already holds the requested value.
 *
 * @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration
 */
public void updateLogTtlMs(long newTtlMs) {
    RemoteLogTablet tablet = remoteLogManager.getRemoteLogTablet(tableBucket).orElse(null);
    if (tablet == null) {
        // No tablet is registered for this bucket. That can happen when remote logging is
        // disabled (the ttl is then irrelevant) or while the replica is still being set up
        // (the tablet is then expected to be built from the persisted TableInfo, which
        // already carries the new ttl). Either way there is nothing to update here.
        LOG.debug(
                "RemoteLogTablet for {} is unavailable; skip applying new logTtlMs={} "
                        + "(remote logging may be disabled or the replica is still initializing).",
                tableBucket,
                newTtlMs);
        return;
    }

    long previousTtlMs = tablet.getTtlMs();
    if (previousTtlMs != newTtlMs) {
        // Only write and log on an actual change, so repeated metadata updates stay quiet.
        tablet.updateTtlMs(newTtlMs);
        LOG.info(
                "Replica for {} logTtlMs changed from {} to {}",
                tableBucket,
                previousTtlMs,
                newTtlMs);
    }
}

private void createKv() {
try {
// create a closeable registry for the closable related to kv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust
private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
Map<Long, Integer> tableIdToTieredLogLocalSegments = new HashMap<>();
Map<Long, Long> tableIdToLogTtlMs = new HashMap<>();

for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
TableInfo tableInfo = tableMetadata.getTableInfo();
Expand All @@ -544,9 +545,15 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
// Collect tiered log local segments configuration
int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments();
tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments);

// Collect log ttl configuration
long logTtlMs = tableInfo.getTableConfig().getLogTTLMs();
tableIdToLogTtlMs.put(tableId, logTtlMs);
}

if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) {
if (tableIdToLakeFlag.isEmpty()
&& tableIdToTieredLogLocalSegments.isEmpty()
&& tableIdToLogTtlMs.isEmpty()) {
return;
}

Expand All @@ -566,6 +573,11 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
replica.updateTieredLogLocalSegments(
tableIdToTieredLogLocalSegments.get(tableId));
}

// Update log ttl configuration
if (tableIdToLogTtlMs.containsKey(tableId)) {
replica.updateLogTtlMs(tableIdToLogTtlMs.get(tableId));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,44 @@ void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Excep
assertThat(logTablet.getSegments()).hasSize(3);
}

/**
 * Checks that {@code Replica#updateLogTtlMs(long)} forwards a new ttl to the bucket's
 * {@link RemoteLogTablet}, for both partitioned and non-partitioned tables.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testAlterTableLogTtl(boolean partitionedTable) throws Exception {
    // Register the table with no custom properties, so table.log.ttl keeps its default.
    long tableId =
            registerTableInZkClient(
                    DATA1_TABLE_PATH,
                    DATA1_SCHEMA,
                    201L,
                    Collections.emptyList(),
                    Collections.emptyMap());
    TableBucket bucket = makeTableBucket(tableId, partitionedTable);
    makeLogTableAsLeader(bucket, partitionedTable);

    Replica leader = replicaManager.getReplicaOrException(bucket);
    RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(bucket);

    // The starting ttl is taken from the option's declared default rather than a literal,
    // so this check survives a change of the default value.
    long expectedInitialTtlMs = ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis();
    assertThat(remoteLogTablet.getTtlMs()).isEqualTo(expectedInitialTtlMs);

    // Push a new ttl through the replica, as metadata propagation would do.
    long oneDayMs = java.time.Duration.ofDays(1).toMillis();
    leader.updateLogTtlMs(oneDayMs);
    assertThat(remoteLogTablet.getTtlMs()).isEqualTo(oneDayMs);

    // Re-applying the same value must leave the tablet unchanged.
    leader.updateLogTtlMs(oneDayMs);
    assertThat(remoteLogTablet.getTtlMs()).isEqualTo(oneDayMs);

    // A non-positive ttl (expiration disabled) is passed through unmodified.
    long disabledTtlMs = -1L;
    leader.updateLogTtlMs(disabledTtlMs);
    assertThat(remoteLogTablet.getTtlMs()).isEqualTo(disabledTtlMs);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCopySegmentPartialFailureCommitsSuccessfulOnes(boolean partitionTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.server.log.remote;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.remote.RemoteLogSegment;
import org.apache.fluss.server.log.LogTablet;

Expand Down Expand Up @@ -181,6 +182,37 @@ void testFindRemoteLogSegmentByTimestamp(boolean partitionTable) throws Exceptio
assertThat(remoteLogTablet.findSegmentByTimestamp(51L)).isNull();
}

/**
 * Checks that {@code RemoteLogTablet#updateTtlMs(long)} changes both the value reported by
 * {@code getTtlMs()} and the outcome of {@code expiredRemoteLogSegments}.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testUpdateTtlMs(boolean partitionTable) throws Exception {
    LogTablet logTablet = makeLogTabletAndAddSegments(partitionTable);
    RemoteLogTablet tablet = buildRemoteLogTablet(logTablet);

    // A new tablet starts with the table.log.ttl value found in the test configuration.
    long configuredTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis();
    assertThat(tablet.getTtlMs()).isEqualTo(configuredTtlMs);

    // Register a single remote segment whose timestamp is 0.
    RemoteLogSegment oldSegment = createLogSegmentWithMaxTimestamp(logTablet, 0L, 0L, 10L);
    tablet.addAndDeleteLogSegments(
            Collections.singletonList(oldSegment), Collections.emptyList());

    // Evaluate at "now" = 1 hour: the segment is 1 hour old, which the initial ttl is
    // expected to exceed (assumes the configured ttl is longer than one hour), so nothing
    // is expired yet.
    long nowMs = java.time.Duration.ofHours(1).toMillis();
    assertThat(tablet.expiredRemoteLogSegments(nowMs, null)).isEmpty();

    // With the ttl shrunk to 1 ms the very same segment becomes expired.
    tablet.updateTtlMs(1L);
    assertThat(tablet.getTtlMs()).isEqualTo(1L);
    assertThat(tablet.expiredRemoteLogSegments(nowMs, null)).containsExactly(oldSegment);

    // A non-positive ttl switches expiration off, so no segment is reported any more.
    tablet.updateTtlMs(-1L);
    assertThat(tablet.getTtlMs()).isEqualTo(-1L);
    assertThat(tablet.expiredRemoteLogSegments(nowMs, null)).isEmpty();
}

RemoteLogSegment createLogSegmentWithMaxTimestamp(
LogTablet logTablet,
long timestamp,
Expand Down