10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -444,6 +444,16 @@ Flink SQL> SELECT * FROM orders;
Warning: skipping backfill may lead to data inconsistency, because some binlog events that occur during the snapshot phase may be replayed (only at-least-once semantics are guaranteed).
For example, updating a value that was already updated in the snapshot phase, or deleting data that was already deleted in the snapshot phase. These replayed binlog events should be handled specially.
</tr>
<tr>
<td>scan.rate-limit.records-per-second</td>
<td>optional</td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>
Rate limit for source reading (effective in both the snapshot and binlog phases), in records per second.<br>
The default value -1 means no rate limiting.
</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
10 changes: 10 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -341,6 +341,16 @@ pipeline:
Warning: skipping backfill may lead to data inconsistency, because some binlog events that occur during the snapshot phase may be replayed (only at-least-once semantics are guaranteed).
For example, updating a value that was already updated in the snapshot phase, or deleting data that was already deleted in the snapshot phase. These replayed binlog events should be handled specially.
</tr>
<tr>
<td>scan.rate-limit.records-per-second</td>
<td>optional</td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>
Rate limit for source reading (effective in both the snapshot and binlog phases), in records per second.<br>
The default value -1 means no rate limiting.
</td>
</tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
12 changes: 11 additions & 1 deletion docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -464,12 +464,22 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to skip backfill in snapshot reading phase.<br>
If backfill is skipped, changes on captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
WARNING: Skipping backfill might lead to data inconsistency because some change log events that happened within the snapshot phase might be replayed (only at-least-once semantics are promised).
For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.rate-limit.records-per-second</td>
<td>optional</td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>
Rate limit for source reading (both snapshot and binlog phases), in records per second.<br>
A value of -1 (default) means no rate limit is applied.
</td>
</tr>
</tbody>
</table>
</div>
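
For illustration, a minimal sketch of the new option in a `mysql-cdc` table definition, set through the Table API. Connection values are placeholders; only `scan.rate-limit.records-per-second` comes from this change:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RateLimitedMySqlCdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id INT,\n"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'orders',\n"
                        // Throttles both the snapshot and binlog phases to ~1000 records/s.
                        + "  'scan.rate-limit.records-per-second' = '1000'\n"
                        + ")");
        tEnv.executeSql("SELECT * FROM orders").print();
    }
}
```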
12 changes: 11 additions & 1 deletion docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -356,12 +356,22 @@ pipeline:
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to skip backfill in snapshot reading phase.<br>
If backfill is skipped, changes on captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.<br>
WARNING: Skipping backfill might lead to data inconsistency because some change log events that happened within the snapshot phase might be replayed (only at-least-once semantics are promised).
For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.rate-limit.records-per-second</td>
<td>optional</td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>
Rate limit for source reading (both snapshot and binlog phases), in records per second.<br>
A value of -1 (default) means no rate limit is applied.
</td>
</tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
MySqlDataSourceFactory.java
@@ -82,6 +82,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -167,6 +168,7 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
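// -1 (the default) disables rate limiting (see SCAN_RATE_LIMIT_RECORDS_PER_SECOND).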
long rateLimit = config.get(SCAN_RATE_LIMIT_RECORDS_PER_SECOND);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,7 +222,8 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.skipSnapshotBackfill(skipSnapshotBackfill);
.skipSnapshotBackfill(skipSnapshotBackfill)
.rateLimit(rateLimit);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -358,6 +361,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_RATE_LIMIT_RECORDS_PER_SECOND);
return options;
}

MySqlDataSourceOptions.java
@@ -330,4 +330,13 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

@Experimental
public static final ConfigOption<Long> SCAN_RATE_LIMIT_RECORDS_PER_SECOND =
ConfigOptions.key("scan.rate-limit.records-per-second")
.longType()
.defaultValue(-1L)
.withDescription(
"Rate limit for source reading (both snapshot and binlog phases), "
+ "in records per second. A value of -1 (default) means no rate limit is applied.");
}
MySqlSource.java
@@ -24,6 +24,7 @@
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.ratelimit.RateLimitedSourceReaderAdapter;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
@@ -51,7 +52,6 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
@@ -180,12 +180,21 @@ public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContex
readerContext.getIndexOfSubtask(),
mySqlSourceReaderContext,
snapshotHooks);
return new MySqlSourceReader<>(
splitReaderSupplier,
recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig),
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig);
MySqlRecordEmitter<T> recordEmitter =
recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig);
MySqlSourceReader<T> sourceReader =
new MySqlSourceReader<>(
splitReaderSupplier,
recordEmitter,
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig);
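// Wrap the reader with a rate limiter only when a positive limit is configured; the
// current parallelism is passed so the adapter can account for all parallel subtasks.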
long rateLimit = sourceConfig.getRateLimit();
if (rateLimit > 0) {
return RateLimitedSourceReaderAdapter.wrapWithRateLimiter(
sourceReader, (double) rateLimit, readerContext.currentParallelism());
}
return sourceReader;
}

@Override
MySqlSourceBuilder.java
@@ -312,6 +312,15 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
return this;
}

/**
* Sets the rate limit for source reading, in records per second, applied to both the snapshot
* and binlog phases. A value of -1 (the default) disables rate limiting.
*/
public MySqlSourceBuilder<T> rateLimit(long rateLimit) {
this.configFactory.rateLimit(rateLimit);
return this;
}
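
For the DataStream API, a minimal usage sketch of this builder option; host, credentials, and the JSON deserializer are placeholders rather than part of this change:

```java
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class RateLimitedSourceSketch {
    public static MySqlSource<String> build() {
        return MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Caps reading at ~1000 records/s in both the snapshot and
                // binlog phases; -1 (the default) leaves reading unthrottled.
                .rateLimit(1000L)
                .build();
    }
}
```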

/**
* Build the {@link MySqlSource}.
*
MySqlSourceConfig.java
@@ -72,6 +72,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
private final long rateLimit;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -112,7 +113,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
long rateLimit) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -158,6 +160,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.rateLimit = rateLimit;
}

public String getHostname() {
@@ -299,4 +302,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}

public long getRateLimit() {
return rateLimit;
}
}
MySqlSourceConfigFactory.java
@@ -78,6 +78,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
private long rateLimit = MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND.defaultValue();

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -341,6 +342,15 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
return this;
}

/**
* Sets the rate limit for source reading, in records per second, applied to both the snapshot
* and binlog phases. A value of -1 (the default) disables rate limiting.
*/
public MySqlSourceConfigFactory rateLimit(long rateLimit) {
this.rateLimit = rateLimit;
return this;
}

/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@@ -444,6 +454,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
rateLimit);
}
}
MySqlSourceOptions.java
@@ -292,4 +292,13 @@ public class MySqlSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.");

@Experimental
public static final ConfigOption<Long> SCAN_RATE_LIMIT_RECORDS_PER_SECOND =
ConfigOptions.key("scan.rate-limit.records-per-second")
.longType()
.defaultValue(-1L)
.withDescription(
"Rate limit for source reading (both snapshot and binlog phases), "
+ "in records per second. A value of -1 (default) means no rate limit is applied.");
}
MySqlTableSource.java
@@ -104,6 +104,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat

private final boolean appendOnly;

private final long rateLimit;

// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
@@ -144,7 +146,8 @@ public MySqlTableSource(
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst,
boolean appendOnly) {
boolean appendOnly,
long rateLimit) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -178,6 +181,7 @@ public MySqlTableSource(
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.appendOnly = appendOnly;
this.rateLimit = rateLimit;
}

@Override
@@ -241,6 +245,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
.rateLimit(rateLimit)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -330,7 +335,8 @@ public DynamicTableSource copy() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
rateLimit);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -376,6 +382,7 @@ public boolean equals(Object o) {
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
&& useLegacyJsonFormat == that.useLegacyJsonFormat
&& assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
&& rateLimit == that.rateLimit
&& Objects.equals(appendOnly, that.appendOnly);
}

@@ -413,7 +420,8 @@ public int hashCode() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
rateLimit);
}

@Override
MySqlTableSourceFactory.java
@@ -109,6 +109,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

boolean appendOnly =
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
long rateLimit = config.get(MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND);

if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
Expand Down Expand Up @@ -156,7 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
parseOnLineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
appendOnly);
appendOnly,
rateLimit);
}

@Override
@@ -206,6 +208,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
options.add(MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND);
return options;
}
