diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 4b1dc28ac7..f205396104 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -422,6 +422,49 @@ void testAlterTableConfig() throws Exception { .get(); } + @Test + void testAlterTableLogTtl() throws Exception { + // Verify that altering 'table.log.ttl' is supported: the new value should be persisted in + // TableInfo so that subsequent reads observe the updated retention. + TablePath tablePath = TablePath.of("test_db", "alter_table_log_ttl"); + admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get(); + + // verify initial value matches the property set in DEFAULT_TABLE_DESCRIPTOR (1 day) + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getLogTTLMs()) + .isEqualTo(Duration.ofDays(1).toMillis()); + + // alter to 3d and verify metadata + List tableChanges = + Collections.singletonList(TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "3d")); + admin.alterTable(tablePath, tableChanges, false).get(); + + tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getLogTTLMs()) + .isEqualTo(Duration.ofDays(3).toMillis()); + + // alter to another value (30d) to verify multiple updates work. + tableChanges = + Collections.singletonList( + TableChange.set(ConfigOptions.TABLE_LOG_TTL.key(), "30d")); + admin.alterTable(tablePath, tableChanges, false).get(); + + tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.getTableConfig().getLogTTLMs()) + .isEqualTo(Duration.ofDays(30).toMillis()); + + // reset to remove the property; value should fall back to the default (7 days). 
+ tableChanges = + Collections.singletonList(TableChange.reset(ConfigOptions.TABLE_LOG_TTL.key())); + admin.alterTable(tablePath, tableChanges, false).get(); + + tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.toTableDescriptor().getProperties()) + .doesNotContainKey(ConfigOptions.TABLE_LOG_TTL.key()); + assertThat(tableInfo.getTableConfig().getLogTTLMs()) + .isEqualTo(Duration.ofDays(7).toMillis()); + } + @Test void testAlterTableColumn() throws Exception { // create table diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..78bcc0ed8d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -48,6 +48,7 @@ public class FlussConfigUtils { Arrays.asList( ConfigOptions.TABLE_DATALAKE_ENABLED.key(), ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), + ConfigOptions.TABLE_LOG_TTL.key(), ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(), ConfigOptions.TABLE_STATISTICS_COLUMNS.key()); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index 282ae8d9ee..6eae358749 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -328,6 +328,20 @@ public RemoteLogTablet remoteLogTablet(TableBucket tableBucket) { return remoteLog; } + /** + * Returns the {@link RemoteLogTablet} for the given {@link TableBucket}, or {@link + * Optional#empty()} if it has not been registered yet (e.g. remote logging is disabled, or the + * replica is still in the early stage of construction). + * + *

<p>This is the production-safe accessor for callers that legitimately tolerate the absence of + * the tablet, such as {@code Replica#updateLogTtlMs(long)} reacting to alter-table events. + * Unlike {@link #remoteLogTablet(TableBucket)}, it does not throw and is not annotated as + * {@code @VisibleForTesting}. + */ + public Optional getRemoteLogTablet(TableBucket tableBucket) { + return Optional.ofNullable(remoteLogs.get(tableBucket)); + } + @Override public void close() throws IOException { rlmTasks.values().forEach(TaskWithFuture::cancel); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java index 953fb7e17f..0207d9b77c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java @@ -78,7 +78,7 @@ public class RemoteLogTablet { /** The lock to protect the remote log segment list. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final long ttlMs; + private volatile long ttlMs; /** The registered metrics for remote log. */ private volatile MetricGroup remoteLogMetrics; @@ -161,7 +161,12 @@ public List allRemoteLogSegments() { */ public List expiredRemoteLogSegments( long currentTimeMs, Long lakeLogEndOffset) { - if (!logExpireEnable()) { + // Snapshot the volatile ttlMs once so that a concurrent updateTtlMs() cannot change the + // comparison base mid-iteration. Otherwise an in-flight change to a non-positive value + // (which disables expiration) could make currentTimeMs - ts > ttlMs hold for ALL segments + // and result in wrongly deleting every remote segment. 
+ final long ttlSnapshotMs = ttlMs; + if (ttlSnapshotMs <= 0) { return Collections.emptyList(); } return inReadLock( @@ -171,7 +176,7 @@ public List expiredRemoteLogSegments( for (Map.Entry> entry : timestampToRemoteLogSegmentId.entrySet()) { long ts = entry.getKey(); - if (currentTimeMs - ts > ttlMs) { + if (currentTimeMs - ts > ttlSnapshotMs) { for (UUID uuid : entry.getValue()) { RemoteLogSegment segment = idToRemoteLogSegment.get(uuid); if (lakeLogEndOffset != null) { @@ -353,8 +358,24 @@ public void addAndDeleteLogSegments( }); } - private boolean logExpireEnable() { - return ttlMs > 0; + /** + * Returns the current ttl in milliseconds for remote log segments. The expiration check reads + * the volatile field directly on every round; this getter serves code paths that apply + * alter-table updates (see {@code Replica#updateLogTtlMs(long)}). + */ + public long getTtlMs() { + return ttlMs; + } + + /** + * Update the ttl in milliseconds for remote log segments. This is invoked when the user alters + * the table option {@code table.log.ttl}. The new value takes effect on the next round of + * expired-segment evaluation. 
+ * + * @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration + */ + public void updateTtlMs(long newTtlMs) { + this.ttlMs = newTtlMs; } private void reset() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 7a48813237..f0be617bad 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -84,6 +84,7 @@ import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.log.remote.RemoteLogManager; +import org.apache.fluss.server.log.remote.RemoteLogTablet; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metrics.group.BucketMetricGroup; @@ -637,6 +638,42 @@ public void updateTieredLogLocalSegments(int tieredLogLocalSegments) { tieredLogLocalSegments); } + /** + * Update the ttl in milliseconds for remote log segments. This method is called when the table + * configuration {@code table.log.ttl} is altered, so that the new ttl is picked up by the next + * round of expired-segment evaluation, without requiring a server restart or table + * re-creation. It is a no-op if no RemoteLogTablet is registered or the value is unchanged. + * + * @param newTtlMs the new ttl in milliseconds; a non-positive value disables expiration + */ + public void updateLogTtlMs(long newTtlMs) { + Optional remoteLogTabletOpt = + remoteLogManager.getRemoteLogTablet(tableBucket); + if (!remoteLogTabletOpt.isPresent()) { + // RemoteLogTablet may be unavailable when remote logging is disabled, or during the + // early stage of replica creation before the tablet is registered. 
In both cases, the + // alter is a no-op here: if remote logging is disabled the value is irrelevant; if the + // tablet is not yet registered, the Replica will read the up-to-date ttl from the + // persisted TableInfo when constructing the RemoteLogTablet. + LOG.debug( + "RemoteLogTablet for {} is unavailable; skip applying new logTtlMs={} " + + "(remote logging may be disabled or the replica is still initializing).", + tableBucket, + newTtlMs); + return; + } + + RemoteLogTablet remoteLogTablet = remoteLogTabletOpt.get(); + long oldValue = remoteLogTablet.getTtlMs(); + if (oldValue == newTtlMs) { + return; + } + + remoteLogTablet.updateTtlMs(newTtlMs); + + LOG.info("Replica for {} logTtlMs changed from {} to {}", tableBucket, oldValue, newTtlMs); + } + private void createKv() { try { // create a closeable registry for the closable related to kv diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 3b847266eb..8c46e5e157 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -530,6 +530,7 @@ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clust private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { Map tableIdToLakeFlag = new HashMap<>(); Map tableIdToTieredLogLocalSegments = new HashMap<>(); + Map tableIdToLogTtlMs = new HashMap<>(); for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { TableInfo tableInfo = tableMetadata.getTableInfo(); @@ -544,9 +545,15 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { // Collect tiered log local segments configuration int tieredLogLocalSegments = tableInfo.getTableConfig().getTieredLogLocalSegments(); tableIdToTieredLogLocalSegments.put(tableId, tieredLogLocalSegments); + + // Collect log 
ttl configuration + long logTtlMs = tableInfo.getTableConfig().getLogTTLMs(); + tableIdToLogTtlMs.put(tableId, logTtlMs); } - if (tableIdToLakeFlag.isEmpty() && tableIdToTieredLogLocalSegments.isEmpty()) { + if (tableIdToLakeFlag.isEmpty() + && tableIdToTieredLogLocalSegments.isEmpty() + && tableIdToLogTtlMs.isEmpty()) { return; } @@ -566,6 +573,11 @@ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) { replica.updateTieredLogLocalSegments( tableIdToTieredLogLocalSegments.get(tableId)); } + + // Update log ttl configuration + if (tableIdToLogTtlMs.containsKey(tableId)) { + replica.updateLogTtlMs(tableIdToLogTtlMs.get(tableId)); + } } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java index 2ac4e20967..8422b9366f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java @@ -664,6 +664,44 @@ void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Excep assertThat(logTablet.getSegments()).hasSize(3); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAlterTableLogTtl(boolean partitionedTable) throws Exception { + // 1. Create table with default table.log.ttl (= 7 days) + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 201L, + Collections.emptyList(), + Collections.emptyMap()); + TableBucket tb = makeTableBucket(tableId, partitionedTable); + makeLogTableAsLeader(tb, partitionedTable); + + Replica replica = replicaManager.getReplicaOrException(tb); + RemoteLogTablet remoteLog = remoteLogManager.remoteLogTablet(tb); + + // Verify initial ttl matches the configured default table.log.ttl, so the test stays + // robust if the default value of ConfigOptions.TABLE_LOG_TTL ever changes. 
+ long defaultTtlMs = ConfigOptions.TABLE_LOG_TTL.defaultValue().toMillis(); + assertThat(remoteLog.getTtlMs()).isEqualTo(defaultTtlMs); + + // 2. Update ttl via Replica.updateLogTtlMs (simulating metadata propagation) + long newTtlMs = java.time.Duration.ofDays(1).toMillis(); + replica.updateLogTtlMs(newTtlMs); + + // Verify RemoteLogTablet internal state has been updated + assertThat(remoteLog.getTtlMs()).isEqualTo(newTtlMs); + + // 3. Update with the same value should be a no-op + replica.updateLogTtlMs(newTtlMs); + assertThat(remoteLog.getTtlMs()).isEqualTo(newTtlMs); + + // 4. Disabling expiration via a non-positive ttl should propagate as-is. + replica.updateLogTtlMs(-1L); + assertThat(remoteLog.getTtlMs()).isEqualTo(-1L); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testCopySegmentPartialFailureCommitsSuccessfulOnes(boolean partitionTable) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java index 0d571e1ea5..75d940bf28 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.log.remote; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.server.log.LogTablet; @@ -181,6 +182,37 @@ void testFindRemoteLogSegmentByTimestamp(boolean partitionTable) throws Exceptio assertThat(remoteLogTablet.findSegmentByTimestamp(51L)).isNull(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testUpdateTtlMs(boolean partitionTable) throws Exception { + LogTablet logTablet = makeLogTabletAndAddSegments(partitionTable); + RemoteLogTablet remoteLogTablet = buildRemoteLogTablet(logTablet); + + // initial ttl follows the default in 
ConfigOptions.TABLE_LOG_TTL (7 days) + long defaultTtlMs = conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis(); + assertThat(remoteLogTablet.getTtlMs()).isEqualTo(defaultTtlMs); + + // add 1 segment with maxTimestamp = 0 + RemoteLogSegment segment = createLogSegmentWithMaxTimestamp(logTablet, 0L, 0L, 10L); + remoteLogTablet.addAndDeleteLogSegments( + Collections.singletonList(segment), Collections.emptyList()); + + // currentTime = 1 hour. (1h - 0) < 7d, so the segment is NOT expired. + long oneHourMs = java.time.Duration.ofHours(1).toMillis(); + assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null)).isEmpty(); + + // shrink ttl to 1 ms via updateTtlMs, the same segment should now be expired. + remoteLogTablet.updateTtlMs(1L); + assertThat(remoteLogTablet.getTtlMs()).isEqualTo(1L); + assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null)) + .containsExactly(segment); + + // disable expiration by setting ttl to a non-positive value. + remoteLogTablet.updateTtlMs(-1L); + assertThat(remoteLogTablet.getTtlMs()).isEqualTo(-1L); + assertThat(remoteLogTablet.expiredRemoteLogSegments(oneHourMs, null)).isEmpty(); + } + RemoteLogSegment createLogSegmentWithMaxTimestamp( LogTablet logTablet, long timestamp,