From 2d5f12c98c4226db8f8d44b127efb4091ae0a097 Mon Sep 17 00:00:00 2001 From: wattt3 Date: Sat, 25 Apr 2026 10:51:52 +0900 Subject: [PATCH 1/2] [server] Support configurable time partition format for auto-partitioned tables --- .../apache/fluss/config/ConfigOptions.java | 13 +++ .../fluss/utils/AutoPartitionStrategy.java | 11 +++ .../apache/fluss/utils/PartitionUtils.java | 79 +++++++++-------- .../fluss/utils/PartitionUtilsTest.java | 29 ++++++- .../coordinator/AutoPartitionManager.java | 29 ++++--- .../utils/TableDescriptorValidation.java | 22 +++++ .../coordinator/TableManagerITCase.java | 2 +- .../utils/TableDescriptorValidationTest.java | 86 +++++++++++++++++++ website/docs/engine-flink/options.md | 1 + .../data-distribution/partitioning.md | 3 +- 10 files changed, 227 insertions(+), 48 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..4920f06b0a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1450,6 +1450,19 @@ public class ConfigOptions { + "If the value is `YEAR`, the partition format for " + "auto created is yyyy."); + public static final ConfigOption TABLE_AUTO_PARTITION_TIME_FORMAT = + key("table.auto-partition.time-format") + .stringType() + .noDefaultValue() + .withDescription( + "The time format used for auto-created partition values. " + + "If not set, the format is derived from `table.auto-partition.time-unit` " + + "(e.g. `yyyyMMdd` for DAY, `yyyyMMddHH` for HOUR). " + + "When set, this value overrides the format derived from the time unit, " + + "while the partition granularity still follows `table.auto-partition.time-unit`. " + + "A custom format must use zero-padded fields covering at least the unit's precision, " + + "so partition values sort by time as strings (e.g. `yyyy-MM-dd` for DAY)."); + public static final ConfigOption TABLE_AUTO_PARTITION_TIMEZONE = key("table.auto-partition.time-zone") .stringType() diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/AutoPartitionStrategy.java b/fluss-common/src/main/java/org/apache/fluss/utils/AutoPartitionStrategy.java index 4da27b2b6a..d97624636d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/AutoPartitionStrategy.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/AutoPartitionStrategy.java @@ -21,6 +21,8 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import javax.annotation.Nullable; + import java.util.Map; import java.util.TimeZone; @@ -30,6 +32,7 @@ public class AutoPartitionStrategy { private final boolean autoPartitionEnable; private final String key; private final AutoPartitionTimeUnit timeUnit; + @Nullable private final String timeFormat; private final int numPreCreate; private final int numToRetain; private final TimeZone timeZone; @@ -38,12 +41,14 @@ private AutoPartitionStrategy( boolean autoPartitionEnable, String key, AutoPartitionTimeUnit autoPartitionTimeUnit, + @Nullable String timeFormat, int numPreCreate, int numToRetain, TimeZone timeZone) { this.autoPartitionEnable = autoPartitionEnable; this.key = key; this.timeUnit = autoPartitionTimeUnit; + this.timeFormat = timeFormat; this.numPreCreate = numPreCreate; this.numToRetain = numToRetain; this.timeZone = timeZone; @@ -58,6 +63,7 @@ public static AutoPartitionStrategy from(Configuration conf) { conf.getBoolean(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED), conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_KEY), conf.get(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT), + conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT), conf.getInt(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE), conf.getInt(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION), TimeZone.getTimeZone(conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_TIMEZONE))); @@ -75,6 +81,11 @@ public AutoPartitionTimeUnit timeUnit() { return timeUnit; } + @Nullable + public String timeFormat() { + return timeFormat; + } + public int numPreCreate() { return numPreCreate; } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java index 6474d2ce9f..aa29ed4d9f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java @@ -28,6 +28,8 @@ import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.types.DataTypeRoot; +import javax.annotation.Nullable; + import java.time.Instant; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; @@ -35,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.fluss.metadata.TablePath.detectInvalidName; @@ -129,19 +132,24 @@ public static void validateAutoPartitionTime( : partitionKeys.get(0); String partitionTime = partitionSpec.getSpecMap().get(autoPartitionKey); AutoPartitionTimeUnit timeUnit = autoPartitionStrategy.timeUnit(); - if (partitionTime == null || !isValidPartitionTime(partitionTime, timeUnit)) { + String timeFormat = autoPartitionStrategy.timeFormat(); + String resolvedFormat = resolveTimeFormat(timeUnit, timeFormat); + if (partitionTime == null || !isValidPartitionTime(partitionTime, resolvedFormat)) { throw new InvalidPartitionException( String.format( "Partition value '%s' does not match the expected format '%s' " + "for auto-partition time unit '%s'.", - partitionTime, getPartitionTimeFormat(timeUnit), timeUnit)); + partitionTime, resolvedFormat, timeUnit)); } ZonedDateTime currentZonedDateTime = ZonedDateTime.ofInstant(Instant.now(), autoPartitionStrategy.timeZone().toZoneId()); // Get the earliest partition time that needs to be retained. String lastRetainPartitionTime = generateAutoPartitionTime( - currentZonedDateTime, -autoPartitionStrategy.numToRetain(), timeUnit); + currentZonedDateTime, + -autoPartitionStrategy.numToRetain(), + timeUnit, + timeFormat); if (lastRetainPartitionTime.compareTo(partitionTime) > 0) { throw new InvalidPartitionException( String.format( @@ -152,55 +160,60 @@ public static void validateAutoPartitionTime( } /** - * Generate {@link ResolvedPartitionSpec} for auto partition in server. When we auto creating a - * partition, we need to first generate a {@link ResolvedPartitionSpec}. + * Generates a {@link ResolvedPartitionSpec} for an auto-created partition. * - *

The value is the formatted time with the specified time unit. + *

The partition value is rendered with the default format for {@code timeUnit}; when {@code + * timeFormat} is non-null, it overrides that format. * * @param partitionKeys the partition keys - * @param current the current time - * @param offset the offset - * @param timeUnit the time unit + * @param current the base time + * @param offset the number of {@code timeUnit}s to advance from {@code current} + * @param timeUnit the time unit driving the partition granularity + * @param timeFormat the user-configured time format, or {@code null} to use the unit's default * @return the resolved partition spec */ public static ResolvedPartitionSpec generateAutoPartition( List partitionKeys, ZonedDateTime current, int offset, - AutoPartitionTimeUnit timeUnit) { - String autoPartitionFieldSpec = generateAutoPartitionTime(current, offset, timeUnit); - + AutoPartitionTimeUnit timeUnit, + @Nullable String timeFormat) { + String autoPartitionFieldSpec = + generateAutoPartitionTime(current, offset, timeUnit, timeFormat); return ResolvedPartitionSpec.fromPartitionName(partitionKeys, autoPartitionFieldSpec); } public static String generateAutoPartitionTime( - ZonedDateTime current, int offset, AutoPartitionTimeUnit timeUnit) { - String autoPartitionFieldSpec; + ZonedDateTime current, + int offset, + AutoPartitionTimeUnit timeUnit, + @Nullable String timeFormat) { + String format = resolveTimeFormat(timeUnit, timeFormat); switch (timeUnit) { case YEAR: - autoPartitionFieldSpec = getFormattedTime(current.plusYears(offset), YEAR_FORMAT); - break; + return getFormattedTime(current.plusYears(offset), format); case QUARTER: - autoPartitionFieldSpec = - getFormattedTime(current.plusMonths(offset * 3L), QUARTER_FORMAT); - break; + return getFormattedTime(current.plusMonths(offset * 3L), format); case MONTH: - autoPartitionFieldSpec = getFormattedTime(current.plusMonths(offset), MONTH_FORMAT); - break; + return getFormattedTime(current.plusMonths(offset), format); case DAY: - autoPartitionFieldSpec = getFormattedTime(current.plusDays(offset), DAY_FORMAT); - break; + return getFormattedTime(current.plusDays(offset), format); case HOUR: - autoPartitionFieldSpec = getFormattedTime(current.plusHours(offset), HOUR_FORMAT); - break; + return getFormattedTime(current.plusHours(offset), format); default: throw new IllegalArgumentException("Unsupported time unit: " + timeUnit); } - return autoPartitionFieldSpec; } - /** Returns the time string format pattern for the given time unit. */ - private static String getPartitionTimeFormat(AutoPartitionTimeUnit timeUnit) { + /** + * Returns given {@code timeFormat} when non-null, otherwise the time format for the given + * {@code timeUnit}. + */ + private static String resolveTimeFormat( + AutoPartitionTimeUnit timeUnit, @Nullable String timeFormat) { + if (timeFormat != null) { + return timeFormat; + } switch (timeUnit) { case YEAR: return YEAR_FORMAT; @@ -217,12 +230,10 @@ private static String getPartitionTimeFormat(AutoPartitionTimeUnit timeUnit) { } } - /** - * Returns true if the given time string matches the format expected for the given time unit. - */ - private static boolean isValidPartitionTime(String time, AutoPartitionTimeUnit timeUnit) { + /** Returns true if {@code time} can be parsed under the given date-time {@code format}. */ + private static boolean isValidPartitionTime(String time, String format) { try { - DateTimeFormatter.ofPattern(getPartitionTimeFormat(timeUnit)).parse(time); + DateTimeFormatter.ofPattern(format, Locale.ROOT).parse(time); return true; } catch (DateTimeParseException e) { return false; @@ -230,7 +241,7 @@ private static boolean isValidPartitionTime(String time, AutoPartitionTimeUnit t } private static String getFormattedTime(ZonedDateTime zonedDateTime, String format) { - return DateTimeFormatter.ofPattern(format).format(zonedDateTime); + return DateTimeFormatter.ofPattern(format, Locale.ROOT).format(zonedDateTime); } /** diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java index aeb2c1d7c3..921018a978 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java @@ -164,11 +164,38 @@ void testGenerateAutoPartitionName( Collections.singletonList("dt"), zonedDateTime, offsets[i], - autoPartitionTimeUnit); + autoPartitionTimeUnit, + null); assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo(expected[i]); } } + @Test + void testGenerateAutoPartitionNameWithCustomTimeFormat() { + ZonedDateTime zonedDateTime = + ZonedDateTime.of(LocalDateTime.of(2024, 11, 11, 11, 0), ZoneId.of("UTC-8")); + + assertThat( + generateAutoPartition( + Collections.singletonList("dt"), + zonedDateTime, + 0, + AutoPartitionTimeUnit.DAY, + "yyyy-MM-dd") + .getPartitionName()) + .isEqualTo("2024-11-11"); + + assertThat( + generateAutoPartition( + Collections.singletonList("dt"), + zonedDateTime, + 1, + AutoPartitionTimeUnit.HOUR, + "yyyy-MM-dd-HH") + .getPartitionName()) + .isEqualTo("2024-11-11-12"); + } + @Test void testString() { Object value = BinaryString.fromString("Fluss"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java index 695c770d45..58b65d9903 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java @@ -17,6 +17,12 @@ package org.apache.fluss.server.coordinator; +import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; +import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition; +import static org.apache.fluss.utils.PartitionUtils.generateAutoPartitionTime; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; + import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.TabletServerInfo; import org.apache.fluss.config.AutoPartitionTimeUnit; @@ -37,13 +43,9 @@ import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; - import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -65,11 +67,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; -import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition; -import static org.apache.fluss.utils.PartitionUtils.generateAutoPartitionTime; -import static org.apache.fluss.utils.Preconditions.checkNotNull; -import static org.apache.fluss.utils.concurrent.LockUtils.inLock; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * An auto partition manager which will trigger auto partition for the tables in cluster @@ -419,11 +418,16 @@ private List partitionNamesToPreCreate( currentInstant, autoPartitionStrategy.timeZone().toZoneId()); int partitionToPreCreate = autoPartitionStrategy.numPreCreate(); + String timeFormat = autoPartitionStrategy.timeFormat(); List partitionsToCreate = new ArrayList<>(); for (int idx = 0; idx < partitionToPreCreate; idx++) { ResolvedPartitionSpec partition = generateAutoPartition( - partitionKeys, currentZonedDateTime, idx, autoPartitionTimeUnit); + partitionKeys, + currentZonedDateTime, + idx, + autoPartitionTimeUnit, + timeFormat); // if the partition already exists, we don't need to create it, otherwise, create it if (!currentPartitions.containsKey(partition.getPartitionName())) { partitionsToCreate.add(partition); @@ -451,7 +455,10 @@ private void dropPartitions( // Get the earliest one partition time that need to retain. String lastRetainPartitionTime = generateAutoPartitionTime( - currentZonedDateTime, -numToRetain, autoPartitionStrategy.timeUnit()); + currentZonedDateTime, + -numToRetain, + autoPartitionStrategy.timeUnit(), + autoPartitionStrategy.timeFormat()); // For partition table with a single partition key, for example dt(yyyyMMdd) // assuming now is 20250508, and table.auto-partition.num-retention=2 then partition diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index fcd5f1688a..573ce42c87 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -44,11 +44,13 @@ import javax.annotation.Nullable; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -478,6 +480,26 @@ private static void checkPartition( + "partition is enabled, please set table property '%s'.", ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key())); } + + String timeFormat = autoPartition.timeFormat(); + if (timeFormat != null) { + if (timeFormat.trim().isEmpty()) { + throw new InvalidConfigException( + String.format( + "'%s' must not be empty.", + ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key())); + } + try { + DateTimeFormatter.ofPattern(timeFormat, Locale.ROOT); + } catch (IllegalArgumentException e) { + throw new InvalidConfigException( + String.format( + "Invalid time format '%s' for '%s': %s", + timeFormat, + ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key(), + e.getMessage())); + } + } } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index b2abac8307..aeec378201 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -785,7 +785,7 @@ public static List getExpectAddedPartitions( List partitions = new ArrayList<>(); for (int i = 0; i < newPartitions; i++) { partitions.add( - generateAutoPartition(partitionKeys, addDateTime, i, timeUnit) + generateAutoPartition(partitionKeys, addDateTime, i, timeUnit, null) .getPartitionName()); } return partitions; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java new file mode 100644 index 0000000000..5200b00341 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.AutoPartitionTimeUnit; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class TableDescriptorValidationTest { + + @Test + void validTimeFormatIsAccepted() { + TableDescriptor descriptor = + dayPartitionedBuilder() + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key(), "yyyy-MM-dd") + .build(); + assertThatCode(() -> validate(descriptor)).doesNotThrowAnyException(); + } + + @Test + void blankTimeFormatIsRejected() { + TableDescriptor descriptor = + dayPartitionedBuilder() + .property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key(), " ") + .build(); + assertThatThrownBy(() -> validate(descriptor)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining(ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key()) + .hasMessageContaining("must not be empty"); + } + + @Test + void malformedTimeFormatIsRejected() { + TableDescriptor descriptor = + dayPartitionedBuilder() + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key(), "yyyy-MM-dd'") + .build(); + assertThatThrownBy(() -> validate(descriptor)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining("Invalid time format"); + } + + private static TableDescriptor.Builder dayPartitionedBuilder() { + return TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("dt", DataTypes.STRING()) + .build()) + .distributedBy(1) + .partitionedBy("dt") + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 1) + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.DAY) + .property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 0); + } + + private static void validate(TableDescriptor descriptor) { + TableDescriptorValidation.validateTableDescriptor(descriptor, Integer.MAX_VALUE, null); + } +} diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index af9f97a696..02f340500c 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -70,6 +70,7 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | table.auto-partition.enabled | Boolean | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. | | table.auto-partition.key | String | (None) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | | table.auto-partition.time-unit | ENUM | DAY | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. | +| table.auto-partition.time-format | String | (derived from unit) | The time format used for auto-created partition values. If not set, the format is derived from `table.auto-partition.time-unit` (e.g. `yyyyMMdd` for DAY). When set, this value overrides the format derived from the time unit, while the partition granularity still follows `table.auto-partition.time-unit`. A custom format must use zero-padded numeric fields covering at least the unit's precision so that partition values sort by time as strings (e.g. `yyyy-MM-dd` for DAY). | | table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow. For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | | table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7, which means that 7 partitions will be retained. | | table.auto-partition.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. | diff --git a/website/docs/table-design/data-distribution/partitioning.md b/website/docs/table-design/data-distribution/partitioning.md index ffeef84f6e..7621d608c0 100644 --- a/website/docs/table-design/data-distribution/partitioning.md +++ b/website/docs/table-design/data-distribution/partitioning.md @@ -60,6 +60,7 @@ In this case, when automatic partitioning occurs (Fluss will periodically operat | table.auto-partition.enabled | Boolean | no | false | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. | | table.auto-partition.key | String | no | (none) | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions. And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | | table.auto-partition.time-unit | ENUM | no | DAY | The time granularity for auto created partitions. The default value is 'DAY'. Valid values are 'HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR'. If the value is 'HOUR', the partition format for auto created is yyyyMMddHH. If the value is 'DAY', the partition format for auto created is yyyyMMdd. If the value is 'MONTH', the partition format for auto created is yyyyMM. If the value is 'QUARTER', the partition format for auto created is yyyyQ. If the value is 'YEAR', the partition format for auto created is yyyy. | +| table.auto-partition.time-format | String | no | (derived from unit) | The time format used for auto-created partition values. If not set, the format is derived from `table.auto-partition.time-unit` (e.g. `yyyyMMdd` for DAY). When set, this value overrides the format derived from the time unit, while the partition granularity still follows `table.auto-partition.time-unit`. A custom format must use zero-padded numeric fields covering at least the unit's precision so that partition values sort by time as strings (e.g. `yyyy-MM-dd` for DAY). | | table.auto-partition.num-precreate | Integer | no | 2 | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the 'table.auto-partition.time-unit' is 'DAY'(default), one precreated partition is for today and another one is for tomorrow. For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | | table.auto-partition.num-retention | Integer | no | 7 | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7. | | table.auto-partition.time-zone | String | no | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. | @@ -74,7 +75,7 @@ The time unit for the automatic partition table `auto-partition.time-unit` can t | MONTH | yyyyMM | 202409 | | QUARTER | yyyyQ | 20241 | | YEAR | yyyy | 2024 | - + ### Fluss Cluster Configuration Below are the configuration items related to Fluss cluster and automatic partitioning. From dd9ee1ace6625f1cba22f6aac767c8a0e2eb2f15 Mon Sep 17 00:00:00 2001 From: wattt3 Date: Thu, 30 Apr 2026 13:34:23 +0900 Subject: [PATCH 2/2] Apply spotless format --- .../coordinator/AutoPartitionManager.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java index 58b65d9903..8d7b22d80a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java @@ -17,12 +17,6 @@ package org.apache.fluss.server.coordinator; -import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; -import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition; -import static org.apache.fluss.utils.PartitionUtils.generateAutoPartitionTime; -import static org.apache.fluss.utils.Preconditions.checkNotNull; -import static org.apache.fluss.utils.concurrent.LockUtils.inLock; - import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.TabletServerInfo; import org.apache.fluss.config.AutoPartitionTimeUnit; @@ -43,9 +37,13 @@ import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; @@ -67,8 +65,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; +import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition; +import static org.apache.fluss.utils.PartitionUtils.generateAutoPartitionTime; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inLock; /** * An auto partition manager which will trigger auto partition for the tables in cluster