Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ public class FlinkConnectorOptions {
+ "as a small value would cause frequent requests and increase server load. In the future, "
+ "once list partitions is optimized, the default value of this parameter can be reduced.");

/**
 * Hint-level switch for event-time watermark generation. Defaults to {@code true}; set to
 * {@code false} via SQL hints to suppress watermarks on tables that already declare a
 * table-level WATERMARK.
 */
public static final ConfigOption<Boolean> SCAN_WATERMARK_ENABLED =
ConfigOptions.key("scan.watermark.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to enable watermark generation. "
+ "Set to false via SQL hints to disable watermark for tables "
+ "that already have a table-level WATERMARK definition.");

/**
 * Column used for event-time watermark generation. No default; when supplied via SQL hints it
 * overrides the table-level WATERMARK definition. Must be TIMESTAMP, TIMESTAMP_LTZ, or BIGINT
 * (epoch millis).
 */
public static final ConfigOption<String> SCAN_WATERMARK_COLUMN =
ConfigOptions.key("scan.watermark.column")
.stringType()
.noDefaultValue()
.withDescription(
"The column name used for event-time watermark generation. "
+ "When specified via SQL hints, it overrides the table-level WATERMARK definition. "
+ "The column must be of type TIMESTAMP, TIMESTAMP_LTZ, or BIGINT (epoch millis).");

/**
 * Maximum out-of-orderness tolerated when generating event-time watermarks. Only meaningful
 * together with {@code scan.watermark.column}. Defaults to {@link Duration#ZERO} (no delay).
 */
public static final ConfigOption<Duration> SCAN_WATERMARK_DELAY =
ConfigOptions.key("scan.watermark.delay")
.durationType()
.defaultValue(Duration.ZERO)
.withDescription(
"The maximum out-of-orderness allowed for event-time watermark generation. "
+ "Used together with 'scan.watermark.column'. Default is 0 (no delay).");

public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
ConfigOptions.key("sink.ignore-delete")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.flink.source.BinlogFlinkTableSource;
import org.apache.fluss.flink.source.ChangelogFlinkTableSource;
import org.apache.fluss.flink.source.FlinkTableSource;
import org.apache.fluss.flink.source.WatermarkContext;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
import org.apache.fluss.metadata.MergeEngineType;
Expand All @@ -37,6 +38,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
Expand All @@ -49,8 +51,11 @@
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -148,6 +153,22 @@ public DynamicTableSource createDynamicTableSource(Context context) {
.toMillis();

LeaseContext leaseContext = LeaseContext.fromConf(tableOptions);

// Build watermark context from hint configuration
boolean watermarkEnabled = tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_ENABLED);
String watermarkColumn =
tableOptions.getOptional(FlinkConnectorOptions.SCAN_WATERMARK_COLUMN).orElse(null);
if (watermarkColumn != null) {
validateWatermarkColumn(watermarkColumn, tableOutputType);
}
Duration watermarkDelay = tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_DELAY);
Duration sourceIdleTimeout =
context.getConfiguration()
.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
WatermarkContext watermarkContext =
new WatermarkContext(
watermarkEnabled, watermarkColumn, watermarkDelay, sourceIdleTimeout);

return new FlinkTableSource(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(
Expand All @@ -166,7 +187,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
context.getCatalogTable().getOptions(),
leaseContext);
leaseContext,
watermarkContext);
}

@Override
Expand Down Expand Up @@ -234,6 +256,9 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_MODE,
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.SCAN_WATERMARK_ENABLED,
FlinkConnectorOptions.SCAN_WATERMARK_COLUMN,
FlinkConnectorOptions.SCAN_WATERMARK_DELAY,
FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID,
FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION,
FlinkConnectorOptions.LOOKUP_ASYNC,
Expand Down Expand Up @@ -318,6 +343,33 @@ private static void validateSourceOptions(ReadableConfig tableOptions) {
FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
}

/**
 * Validates a hint-supplied watermark column against the table schema: the column must exist
 * and be of a supported type (TIMESTAMP, TIMESTAMP_LTZ, or BIGINT as epoch millis).
 *
 * @param watermarkColumn column name supplied via the SQL hint
 * @param tableOutputType full row type of the table
 * @throws IllegalArgumentException if the column is missing or has an unsupported type
 */
private static void validateWatermarkColumn(String watermarkColumn, RowType tableOutputType) {
    final int fieldIndex = tableOutputType.getFieldIndex(watermarkColumn);
    if (fieldIndex < 0) {
        throw new IllegalArgumentException(
                String.format(
                        "Watermark column '%s' specified in hint does not exist in the table. "
                                + "Available columns: %s",
                        watermarkColumn, tableOutputType.getFieldNames()));
    }

    final LogicalType fieldType = tableOutputType.getTypeAt(fieldIndex);
    switch (fieldType.getTypeRoot()) {
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        case BIGINT:
            // Supported event-time carrier types.
            return;
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Watermark column '%s' has unsupported type '%s'. "
                                    + "Only TIMESTAMP, TIMESTAMP_LTZ, and BIGINT (epoch millis) are supported.",
                            watermarkColumn, fieldType.asSummaryString()));
    }
}

/** Creates a ChangelogFlinkTableSource for $changelog virtual tables. */
private DynamicTableSource createChangelogTableSource(
Context context, ObjectIdentifier tableIdentifier, String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -176,6 +177,9 @@ public class FlinkTableSource
/** Watermark strategy that is pushed down by the Flink optimizer. */
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;

/** Watermark context from SQL hints and Flink execution config. */
private final WatermarkContext watermarkContext;

public FlinkTableSource(
TablePath tablePath,
Configuration flussConfig,
Expand All @@ -193,7 +197,8 @@ public FlinkTableSource(
boolean isDataLakeEnabled,
@Nullable MergeEngineType mergeEngineType,
Map<String, String> tableOptions,
LeaseContext leaseContext) {
LeaseContext leaseContext,
WatermarkContext watermarkContext) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableOutputType = tableOutputType;
Expand All @@ -220,6 +225,7 @@ public FlinkTableSource(
"LakeSource must not be null if enable datalake");
}
this.tableConfig = checkNotNull(tableConfig, "tableConfig must not be null");
this.watermarkContext = checkNotNull(watermarkContext, "watermarkContext must not be null");

// Pre-compute available statistics columns to avoid repeated calculation
RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
Expand Down Expand Up @@ -405,10 +411,7 @@ public boolean isBounded() {
@Override
public DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
WatermarkStrategy<RowData> strategy =
watermarkStrategy != null
? watermarkStrategy
: WatermarkStrategy.noWatermarks();
WatermarkStrategy<RowData> strategy = resolveWatermarkStrategy();
return execEnv.fromSource(source, strategy, "FlussSource-" + tablePath);
}

Expand Down Expand Up @@ -481,7 +484,8 @@ public DynamicTableSource copy() {
isDataLakeEnabled,
mergeEngineType,
tableOptions,
leaseContext);
leaseContext,
watermarkContext);
source.producedDataType = producedDataType;
source.projectedFields = projectedFields;
source.singleRowFilter = singleRowFilter;
Expand Down Expand Up @@ -518,6 +522,117 @@ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}

/**
 * Determines the watermark strategy actually applied to the source, in priority order:
 * (1) watermark disabled via hint, (2) hint-supplied watermark column, (3) table-level
 * strategy pushed down by the Flink optimizer, (4) no watermarks.
 */
private WatermarkStrategy<RowData> resolveWatermarkStrategy() {
    if (watermarkContext.isEnabled()) {
        if (watermarkContext.getColumn() != null) {
            // A column supplied via hint overrides the pushed-down table-level strategy.
            return buildHintWatermarkStrategy();
        }
        if (watermarkStrategy != null) {
            // Fall back to the strategy pushed down from the table schema.
            return watermarkStrategy;
        }
    }
    // Watermarks disabled via hint, or no strategy configured anywhere.
    return WatermarkStrategy.noWatermarks();
}

/**
 * Builds a {@link WatermarkStrategy} from the hint-based configuration. Supports TIMESTAMP,
 * TIMESTAMP_LTZ, and BIGINT (epoch millis) watermark columns.
 *
 * @return a bounded-out-of-orderness strategy extracting event time from the hint column,
 *     with idleness applied when a positive source idle timeout is configured
 * @throws IllegalStateException if the produced data type is not a RowType
 * @throws IllegalArgumentException if the column is absent from the output schema or has an
 *     unsupported type
 */
private WatermarkStrategy<RowData> buildHintWatermarkStrategy() {
    String columnName = watermarkContext.getColumn();
    // Resolve against the produced (projected) type: the column must survive projection
    // push-down for the per-record timestamp extraction below to work.
    // NOTE: Flink's RowType is fully qualified to avoid clashing with the Fluss RowType
    // already imported in this file.
    org.apache.flink.table.types.logical.RowType outputRowType;
    if (producedDataType instanceof org.apache.flink.table.types.logical.RowType) {
        outputRowType = (org.apache.flink.table.types.logical.RowType) producedDataType;
    } else {
        throw new IllegalStateException(
                "Cannot apply hint watermark: produced data type is not a RowType.");
    }

    // Use getFieldIndex (consistent with validateWatermarkColumn) instead of a manual scan
    // that rebuilt the field-name list on every iteration.
    int columnIndex = outputRowType.getFieldIndex(columnName);
    if (columnIndex < 0) {
        throw new IllegalArgumentException(
                String.format(
                        "Watermark column '%s' specified in hint is not found in the output schema. "
                                + "Available columns: %s. "
                                + "Make sure the column is included in your SELECT clause.",
                        columnName, outputRowType.getFieldNames()));
    }

    LogicalType columnType = outputRowType.getTypeAt(columnIndex);

    final int idx = columnIndex;
    final boolean isTimestamp;
    final int precision;

    // Hoist the type dispatch out of the per-record timestamp assigner: decide once here
    // whether the column is a timestamp or an epoch-millis BIGINT.
    switch (columnType.getTypeRoot()) {
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            isTimestamp = true;
            precision =
                    ((org.apache.flink.table.types.logical.TimestampType) columnType)
                            .getPrecision();
            break;
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            isTimestamp = true;
            precision =
                    ((org.apache.flink.table.types.logical.LocalZonedTimestampType) columnType)
                            .getPrecision();
            break;
        case BIGINT:
            isTimestamp = false;
            precision = -1; // not used for BIGINT
            break;
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Watermark column '%s' has unsupported type '%s'. "
                                    + "Only TIMESTAMP, TIMESTAMP_LTZ, and BIGINT are supported.",
                            columnName, columnType));
    }

    Duration delay =
            watermarkContext.getDelay() != null ? watermarkContext.getDelay() : Duration.ZERO;
    WatermarkStrategy<RowData> strategy =
            WatermarkStrategy.<RowData>forBoundedOutOfOrderness(delay)
                    .withTimestampAssigner(
                            (rowData, ts) -> {
                                // Rows with a null event-time value must not hold back the
                                // watermark; Long.MIN_VALUE makes them effectively ignored.
                                if (rowData.isNullAt(idx)) {
                                    return Long.MIN_VALUE;
                                }
                                return isTimestamp
                                        ? rowData.getTimestamp(idx, precision).getMillisecond()
                                        : rowData.getLong(idx);
                            });

    // Apply idle timeout to handle idle splits/buckets that would otherwise block watermark
    // advancement. Skip if the timeout is not positive (0 or negative means disabled).
    Duration sourceIdleTimeout = watermarkContext.getSourceIdleTimeout();
    if (sourceIdleTimeout != null && sourceIdleTimeout.toMillis() > 0) {
        strategy = strategy.withIdleness(sourceIdleTimeout);
    }

    return strategy;
}

@Override
public Result applyFilters(List<ResolvedExpression> filters) {

Expand Down
Loading