diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 46822cd5487..662b8757d1a 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -444,6 +444,16 @@ Flink SQL> SELECT * FROM orders; 警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 + + scan.rate-limit.records-per-second + optional + -1 + Long + + 数据读取的限速配置(快照和 binlog 阶段均生效),单位为每秒记录数。
+ 默认值 -1 表示不限速。 + + use.legacy.json.format optional diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 04ec2844551..5d925f2880f 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -341,6 +341,16 @@ pipeline: 警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 + + scan.rate-limit.records-per-second + optional + -1 + Long + + 数据读取的限速配置(快照和 binlog 阶段均生效),单位为每秒记录数。
+ 默认值 -1 表示不限速。 + + metadata.list optional diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index ad641ef0e93..b8ee157b370 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -464,12 +464,22 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will false Boolean - Whether to skip backfill in snapshot reading phase.
+ Whether to skip backfill in the snapshot reading phase.
If backfill is skipped, changes on captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.
WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened within the snapshot phase might be replayed (only at-least-once semantics are guaranteed). For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change log events should be handled specially. + + scan.rate-limit.records-per-second + optional + -1 + Long + + Rate limit for source reading (both snapshot and binlog phases), in records per second.
+ A value of -1 (default) means no rate limit is applied. + + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index e6e61e20720..5da7666a0c1 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -356,12 +356,22 @@ pipeline: false Boolean - Whether to skip backfill in snapshot reading phase.
+ Whether to skip backfill in the snapshot reading phase.
If backfill is skipped, changes on captured tables during the snapshot phase will be consumed later in the change log reading phase instead of being merged into the snapshot.
WARNING: Skipping backfill might lead to data inconsistency, because some change log events that happened within the snapshot phase might be replayed (only at-least-once semantics are guaranteed). For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change log events should be handled specially. + + scan.rate-limit.records-per-second + optional + -1 + Long + + Rate limit for source reading (both snapshot and binlog phases), in records per second.
+ A value of -1 (default) means no rate limit is applied. + + metadata.list optional diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 1b3540da0bb..13cc7976fe3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -82,6 +82,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; @@ -167,6 +168,7 @@ public DataSource createDataSource(Context context) { boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + long rateLimit = config.get(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -220,7 +222,8 @@ public DataSource createDataSource(Context context) { .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) - .skipSnapshotBackfill(skipSnapshotBackfill); + .skipSnapshotBackfill(skipSnapshotBackfill) + .rateLimit(rateLimit); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -358,6 +361,7 @@ public Set> optionalOptions() { options.add(PARSE_ONLINE_SCHEMA_CHANGES); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..f33d0d8a0d2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -330,4 +330,13 @@ public class MySqlDataSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially."); + + @Experimental + public static final ConfigOption SCAN_RATE_LIMIT_RECORDS_PER_SECOND = + ConfigOptions.key("scan.rate-limit.records-per-second") + .longType() + .defaultValue(-1L) + .withDescription( + "Rate limit for source reading (both snapshot and binlog phases), " + + "in records per second. A value of -1 (default) means no rate limit is applied."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 5fcbae1a1e4..9564425bc0b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -24,6 +24,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.ratelimit.RateLimitedSourceReaderAdapter; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.PublicEvolving; @@ -51,7 +52,6 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer; import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; -import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.FlinkRuntimeException; @@ -180,12 +180,21 @@ public SourceReader createReader(SourceReaderContext readerContex readerContext.getIndexOfSubtask(), mySqlSourceReaderContext, snapshotHooks); - return new MySqlSourceReader<>( - splitReaderSupplier, - recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig), - readerContext.getConfiguration(), - mySqlSourceReaderContext, - sourceConfig); + MySqlRecordEmitter recordEmitter = + recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig); + MySqlSourceReader sourceReader = + new MySqlSourceReader<>( + splitReaderSupplier, + recordEmitter, + readerContext.getConfiguration(), + mySqlSourceReaderContext, + sourceConfig); + long rateLimit = sourceConfig.getRateLimit(); + if (rateLimit > 0) { + 
return RateLimitedSourceReaderAdapter.wrapWithRateLimiter( + sourceReader, (double) rateLimit, readerContext.currentParallelism()); + } + return sourceReader; } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 93fa2a0d36a..4d7fb8752cb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -312,6 +312,15 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh return this; } + /** + * The rate limit for source reading in records per second, applied to both snapshot and binlog + * phases. A value of -1 means no rate limit. + */ + public MySqlSourceBuilder rateLimit(long rateLimit) { + this.configFactory.rateLimit(rateLimit); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index cf456fcaed0..2d95c421abe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -72,6 +72,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; + private final long rateLimit; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -112,7 +113,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + long rateLimit) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -158,6 +160,7 @@ public class MySqlSourceConfig implements Serializable { this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.rateLimit = rateLimit; } public String getHostname() { @@ -299,4 +302,8 @@ public boolean isSkipSnapshotBackfill() { public boolean isTreatTinyInt1AsBoolean() { return treatTinyInt1AsBoolean; } + + public long getRateLimit() { + return rateLimit; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 569b62232db..5699532f250 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -78,6 +78,7 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; private boolean assignUnboundedChunkFirst = false; + private long rateLimit = MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND.defaultValue(); public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -341,6 +342,15 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde return this; } + /** + * The rate limit for source reading in records per second, applied to both snapshot and binlog + * phases. A value of -1 means no rate limit. + */ + public MySqlSourceConfigFactory rateLimit(long rateLimit) { + this.rateLimit = rateLimit; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -444,6 +454,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + rateLimit); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a8e143f5fc5..af6b397271e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -292,4 +292,13 @@ public class MySqlSourceOptions { .defaultValue(true) .withDescription( "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk."); + + @Experimental + public static final ConfigOption SCAN_RATE_LIMIT_RECORDS_PER_SECOND = + ConfigOptions.key("scan.rate-limit.records-per-second") + .longType() + .defaultValue(-1L) + .withDescription( + "Rate limit for source reading (both snapshot and binlog phases), " + + "in records per second. 
A value of -1 (default) means no rate limit is applied."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 2a1f0519435..0cc7bf10131 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -104,6 +104,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final boolean appendOnly; + private final long rateLimit; + // -------------------------------------------------------------------------------------------- // Mutable attributes // -------------------------------------------------------------------------------------------- @@ -144,7 +146,8 @@ public MySqlTableSource( boolean parseOnlineSchemaChanges, boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst, - boolean appendOnly) { + boolean appendOnly, + long rateLimit) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -178,6 +181,7 @@ public MySqlTableSource( this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; this.appendOnly = appendOnly; + this.rateLimit = rateLimit; } @Override @@ -241,6 +245,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .parseOnLineSchemaChanges(parseOnlineSchemaChanges) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) + .rateLimit(rateLimit) .build(); return SourceProvider.of(parallelSource); } else { @@ -330,7 +335,8 @@ public DynamicTableSource copy() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + rateLimit); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -376,6 +382,7 @@ public boolean equals(Object o) { && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges && useLegacyJsonFormat == that.useLegacyJsonFormat && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst + && rateLimit == that.rateLimit && Objects.equals(appendOnly, that.appendOnly); } @@ -413,7 +420,8 @@ public int hashCode() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + rateLimit); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 5ea430d94e7..08740659857 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -109,6 +109,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean appendOnly = 
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); + long rateLimit = config.get(MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND); if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); @@ -156,7 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { parseOnLineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + rateLimit); } @Override @@ -206,6 +208,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); + options.add(MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 01c2dff84da..f4a22c3d0b5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -56,6 +56,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST; +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT; @@ -129,7 +130,8 @@ void testCommonProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -179,7 +181,8 @@ void testEnableParallelReadSource() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -225,7 +228,8 @@ void testEnableParallelReadSourceWithSingleServerId() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -269,7 +273,8 @@ void testEnableParallelReadSourceLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -330,7 +335,8 @@ void testOptionalProperties() { 
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource) .isEqualTo(expectedSource) .isInstanceOf(MySqlTableSource.class); @@ -389,7 +395,8 @@ void testStartupFromSpecificOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -431,7 +438,8 @@ void testStartupFromInitial() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -474,7 +482,8 @@ void testStartupFromEarliestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -518,7 +527,8 @@ void testStartupFromSpecificTimestamp() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -560,7 +570,8 @@ void testStartupFromLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -607,7 +618,8 @@ void testMetadataColumns() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + -1L); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -810,7 +822,91 @@ void testEnablingExperimentalOptions() { true, true, true, - false); + false, + -1L); + Assertions.assertThat(actualSource).isEqualTo(expectedSource); + } + + @Test + void testRateLimitOption() { + Map properties = getAllOptions(); + properties.put("scan.rate-limit.records-per-second", "5000"); + + DynamicTableSource actualSource = createTableSource(properties); + MySqlTableSource expectedSource = + new MySqlTableSource( + SCHEMA, + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.systemDefault(), + PROPERTIES, + null, + false, + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(), + CHUNK_META_GROUP_SIZE.defaultValue(), + SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(), + CONNECT_TIMEOUT.defaultValue(), + CONNECT_MAX_RETRIES.defaultValue(), + CONNECTION_POOL_SIZE.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + StartupOptions.initial(), + false, + false, + new Properties(), + HEARTBEAT_INTERVAL.defaultValue(), + null, + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue(), + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + false, + 5000L); + Assertions.assertThat(actualSource).isEqualTo(expectedSource); + } + + @Test + void testRateLimitDefaultIsDisabled() { + Map properties = 
getAllOptions(); + + DynamicTableSource actualSource = createTableSource(properties); + MySqlTableSource expectedSource = + new MySqlTableSource( + SCHEMA, + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.systemDefault(), + PROPERTIES, + null, + false, + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(), + CHUNK_META_GROUP_SIZE.defaultValue(), + SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(), + CONNECT_TIMEOUT.defaultValue(), + CONNECT_MAX_RETRIES.defaultValue(), + CONNECTION_POOL_SIZE.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + StartupOptions.initial(), + false, + false, + new Properties(), + HEARTBEAT_INTERVAL.defaultValue(), + null, + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), + USE_LEGACY_JSON_FORMAT.defaultValue(), + SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + false, + SCAN_RATE_LIMIT_RECORDS_PER_SECOND.defaultValue()); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } diff --git a/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java new file mode 100644 index 00000000000..6adef8e2796 --- /dev/null +++ b/flink-cdc-flink1-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.ratelimit; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceSplit; + +/** + * Compatibility adapter for Flink 1.20. Rate limiting is not supported in Flink 1.x, so this + * adapter simply returns the original source reader without applying any rate limiting. + */ +@Internal +public class RateLimitedSourceReaderAdapter { + + /** + * Wraps the given source reader with rate limiting. In Flink 1.20, rate limiting is not + * supported, so the original reader is returned as-is. 
+ * + * @param sourceReader the source reader to wrap + * @param recordsPerSecond the desired rate limit (ignored in Flink 1.x) + * @param parallelism the current parallelism (ignored in Flink 1.x) + * @return the original source reader without rate limiting + */ + @SuppressWarnings("unchecked") + public static SourceReader wrapWithRateLimiter( + SourceReader sourceReader, double recordsPerSecond, int parallelism) { + return sourceReader; + } +} diff --git a/flink-cdc-flink2-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java new file mode 100644 index 00000000000..95fe34d694c --- /dev/null +++ b/flink-cdc-flink2-compat/src/main/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.ratelimit; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.InputStatus; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Compatibility adapter for Flink 2.2. Wraps a {@link SourceReader} with rate limiting. + * + *
Unlike the built-in {@link RateLimitedSourceReader}, this adapter correctly delegates all + * methods including {@link SourceReader#handleSourceEvents(SourceEvent)}, which is critical for + * connectors (e.g. MySQL CDC) that rely on source events for split coordination. + */ +@Internal +public class RateLimitedSourceReaderAdapter { + + /** + * Wraps the given source reader with rate limiting. + * + * @param sourceReader the source reader to wrap + * @param recordsPerSecond the desired rate limit in records per second + * @param parallelism the current parallelism + * @return a rate-limited source reader that delegates all methods including handleSourceEvents + */ + public static SourceReader wrapWithRateLimiter( + SourceReader sourceReader, double recordsPerSecond, int parallelism) { + RateLimiter rateLimiter = + RateLimiterStrategy.perSecond(recordsPerSecond) + .createRateLimiter(parallelism); + return new DelegatingRateLimitedSourceReader<>(sourceReader, rateLimiter); + } + + /** + * A rate-limited source reader that delegates all methods to the wrapped reader, including + * {@link #handleSourceEvents(SourceEvent)}. + */ + private static class DelegatingRateLimitedSourceReader + implements SourceReader { + + private final SourceReader sourceReader; + private final RateLimiter rateLimiter; + private CompletableFuture availabilityFuture = null; + + DelegatingRateLimitedSourceReader( + SourceReader sourceReader, RateLimiter rateLimiter) { + this.sourceReader = checkNotNull(sourceReader); + this.rateLimiter = checkNotNull(rateLimiter); + } + + @Override + public void start() { + sourceReader.start(); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (availabilityFuture == null) { + return InputStatus.NOTHING_AVAILABLE; + } + availabilityFuture = null; + final InputStatus inputStatus = sourceReader.pollNext(output); + if (inputStatus == InputStatus.MORE_AVAILABLE) { + return InputStatus.NOTHING_AVAILABLE; + } + return inputStatus; + } + + @Override + public CompletableFuture isAvailable() { + if (availabilityFuture == null) { + availabilityFuture = + rateLimiter + .acquire() + .toCompletableFuture() + .thenCombine(sourceReader.isAvailable(), (l, r) -> null); + } + return availabilityFuture; + } + + @Override + public void addSplits(List splits) { + sourceReader.addSplits(splits); + } + + @Override + public void notifyNoMoreSplits() { + sourceReader.notifyNoMoreSplits(); + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + sourceReader.handleSourceEvents(sourceEvent); + } + + @Override + public List snapshotState(long checkpointId) { + return sourceReader.snapshotState(checkpointId); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + rateLimiter.notifyCheckpointComplete(checkpointId); + sourceReader.notifyCheckpointComplete(checkpointId); + } + + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + } + + @Override + public void close() throws Exception { + sourceReader.close(); + } + } +} diff --git a/flink-cdc-flink2-compat/src/test/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapterTest.java b/flink-cdc-flink2-compat/src/test/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapterTest.java new file mode 100644 index 00000000000..e5cb7dc6e77 --- /dev/null +++ 
b/flink-cdc-flink2-compat/src/test/java/org/apache/flink/api/connector/source/ratelimit/RateLimitedSourceReaderAdapterTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.ratelimit; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Tests for {@link RateLimitedSourceReaderAdapter}. */ +class RateLimitedSourceReaderAdapterTest { + + private TrackingSourceReader delegate; + private SourceReader rateLimitedReader; + + @BeforeEach + void setUp() { + delegate = new TrackingSourceReader(); + rateLimitedReader = RateLimitedSourceReaderAdapter.wrapWithRateLimiter(delegate, 1000.0, 1); + } + + @Test + void testWrapWithRateLimiterReturnsDifferentInstance() { + Assertions.assertThat(rateLimitedReader).isNotSameAs(delegate); + } + + @Test + void testStartDelegates() { + rateLimitedReader.start(); + Assertions.assertThat(delegate.startCalled).isTrue(); + } + + @Test + void testAddSplitsDelegates() { + List splits = Arrays.asList(new DummySplit("s1"), new DummySplit("s2")); + rateLimitedReader.addSplits(splits); + Assertions.assertThat(delegate.addedSplits).isEqualTo(splits); + } + + @Test + void testNotifyNoMoreSplitsDelegates() { + rateLimitedReader.notifyNoMoreSplits(); + Assertions.assertThat(delegate.notifyNoMoreSplitsCalled).isTrue(); + } + + @Test + void testHandleSourceEventsDelegates() { + SourceEvent event = new DummySourceEvent(); + rateLimitedReader.handleSourceEvents(event); + Assertions.assertThat(delegate.handledEvents).containsExactly(event); + } + + @Test + void testSnapshotStateDelegates() throws Exception { + delegate.snapshotResult = Arrays.asList(new DummySplit("snap")); + List result = rateLimitedReader.snapshotState(42L); + Assertions.assertThat(result).isEqualTo(delegate.snapshotResult); + Assertions.assertThat(delegate.snapshotCheckpointId).isEqualTo(42L); + } + + @Test + void testNotifyCheckpointCompleteDelegates() throws Exception { + rateLimitedReader.notifyCheckpointComplete(99L); + Assertions.assertThat(delegate.completedCheckpointId).isEqualTo(99L); + } + + @Test + void testPauseOrResumeSplitsDelegates() { + Collection toPause = Arrays.asList("s1"); + Collection toResume = Arrays.asList("s2"); + rateLimitedReader.pauseOrResumeSplits(toPause, 
toResume); + Assertions.assertThat(delegate.pausedSplits).isEqualTo(toPause); + Assertions.assertThat(delegate.resumedSplits).isEqualTo(toResume); + } + + @Test + void testCloseDelegates() throws Exception { + rateLimitedReader.close(); + Assertions.assertThat(delegate.closeCalled).isTrue(); + } + + @Test + void testIsAvailableReturnsFutureAndDoesNotBlock() { + // isAvailable() should return a non-null future (may or may not be done) + CompletableFuture future = rateLimitedReader.isAvailable(); + Assertions.assertThat(future).isNotNull(); + } + + @Test + void testPollNextReturnsNothingAvailableWhenFutureNotCompleted() throws Exception { + // Without calling isAvailable first, pollNext should return NOTHING_AVAILABLE + InputStatus status = rateLimitedReader.pollNext(new DummyReaderOutput<>()); + Assertions.assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE); + } + + // ----------- helpers ----------- + + private static class DummySplit implements SourceSplit { + private final String id; + + DummySplit(String id) { + this.id = id; + } + + @Override + public String splitId() { + return id; + } + } + + private static class DummySourceEvent implements SourceEvent {} + + private static class DummyReaderOutput implements ReaderOutput { + @Override + public void collect(T record) {} + + @Override + public void collect(T record, long timestamp) {} + + @Override + public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {} + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + + @Override + public org.apache.flink.api.connector.source.SourceOutput createOutputForSplit( + String splitId) { + return null; + } + + @Override + public void releaseOutputForSplit(String splitId) {} + } + + private static class TrackingSourceReader implements SourceReader { + boolean startCalled = false; + boolean notifyNoMoreSplitsCalled = false; + boolean closeCalled = false; + long snapshotCheckpointId = -1L; + long completedCheckpointId = -1L; + List addedSplits = new ArrayList<>(); + List handledEvents = new ArrayList<>(); + List snapshotResult = new ArrayList<>(); + Collection pausedSplits = null; + Collection resumedSplits = null; + + @Override + public void start() { + startCalled = true; + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + return InputStatus.END_OF_INPUT; + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List splits) { + addedSplits.addAll(splits); + } + + @Override + public void notifyNoMoreSplits() { + notifyNoMoreSplitsCalled = true; + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + handledEvents.add(sourceEvent); + } + + @Override + public List snapshotState(long checkpointId) { + snapshotCheckpointId = checkpointId; + return snapshotResult; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + completedCheckpointId = checkpointId; + } + + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + pausedSplits = splitsToPause; + resumedSplits = splitsToResume; + } + + @Override + public void close() throws Exception { + closeCalled = true; + } + } +}
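
For reference, a minimal usage sketch of the new option through the DataStream API builder method introduced in this change. The connection details, table names, and deserializer below are illustrative placeholders, not part of this patch. Note that since the reader wraps the limiter via `RateLimiterStrategy.perSecond(recordsPerSecond).createRateLimiter(parallelism)`, the configured rate is divided across subtasks, so `scan.rate-limit.records-per-second` behaves as a job-wide cap rather than a per-subtask one.

```java
// Usage sketch (not part of this patch): enabling the new rate limit via the
// MySqlSourceBuilder#rateLimit method added above. Host, credentials, and
// tables are placeholders.
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

MySqlSource<String> source =
        MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Cap source reading at ~5000 records/s in total, across both
                // the snapshot and binlog phases; -1 (the default) disables it.
                .rateLimit(5000L)
                .build();
```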