diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 72c62d600d3..5c37e3eb566 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
 
 /** Fetcher to fetch data from table split, the split is the stream split {@link StreamSplit}. */
@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
     private final Set<TableId> pureStreamPhaseTables;
+    private final AtomicInteger numberOfRunningTasks;
 
     private volatile ChangeEventQueue<DataChangeEvent> queue;
-    private volatile boolean currentTaskRunning;
     private volatile Throwable readException;
 
     private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
-        this.currentTaskRunning = true;
         this.pureStreamPhaseTables = new HashSet<>();
         this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
         this.supportsSplitKeyOptimization = taskContext.supportsSplitKeyOptimization();
+        this.numberOfRunningTasks = new AtomicInteger(0);
     }
 
     @Override
@@ -90,6 +91,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
         configureFilter();
         taskContext.configure(currentStreamSplit);
         this.queue = taskContext.getQueue();
+        startReadTask();
         executorService.submit(
                 () -> {
                     try {
@@ -107,7 +109,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
 
     @Override
     public boolean isFinished() {
-        return currentStreamSplit == null || !currentTaskRunning;
+        return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
     }
 
     @Nullable
@@ -116,7 +118,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
-        // what happens if currentTaskRunning
-        if (currentTaskRunning) {
+        // Only poll the queue while at least one read task is still running.
+        if (numberOfRunningTasks.get() > 0) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
                 if (isEndWatermarkEvent(event.getRecord())) {
@@ -282,8 +284,12 @@ private void configureFilter() {
         this.pureStreamPhaseTables.clear();
     }
 
+    public void startReadTask() {
+        this.numberOfRunningTasks.incrementAndGet();
+    }
+
     public void stopReadTask() throws Exception {
-        this.currentTaskRunning = false;
+        this.numberOfRunningTasks.decrementAndGet();
 
         if (taskContext != null) {
             taskContext.close();
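
Why the counter fixes the bug: as the removal above shows, the old `currentTaskRunning` flag was only ever set to `true` in the constructor, so once `stopReadTask()` flipped it to `false`, a reused fetcher reported `isFinished() == true` for every later split and `pollSplitRecords()` stopped draining the queue. Incrementing on each `submitTask()` and decrementing on each `stopReadTask()` keeps the two calls balanced per split. A minimal standalone sketch of that lifecycle (illustrative class, not connector API):

import java.util.concurrent.atomic.AtomicInteger;

// Sketch: the start/stop lifecycle the fetcher now tracks with a counter.
public class RunningTaskCounterSketch {
    private final AtomicInteger numberOfRunningTasks = new AtomicInteger(0);

    void submitTask() {
        numberOfRunningTasks.incrementAndGet(); // what startReadTask() does
    }

    void stopReadTask() {
        numberOfRunningTasks.decrementAndGet();
    }

    boolean isFinished() {
        return numberOfRunningTasks.get() == 0;
    }

    public static void main(String[] args) {
        RunningTaskCounterSketch fetcher = new RunningTaskCounterSketch();
        fetcher.submitTask();   // split 1 starts
        fetcher.stopReadTask(); // split 1 stops
        fetcher.submitTask();   // fetcher is reused for split 2
        // A boolean initialized once in the constructor would still read
        // "stopped" here; the counter correctly reports one running task.
        System.out.println(fetcher.isFinished()); // prints: false
    }
}
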
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index a40f70296a1..8406b98bbba 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -243,6 +243,57 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        customDatabase.createAndInitialize();
+
+        TestTableId tableId = new TestTableId(schemaName, tableName);
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, null, 4, false);
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPreHighWatermarkAction(
+                (postgresSourceConfig, split) -> {
+                    try (PostgresConnection conn = postgresDialect.openJdbcConnection()) {
+                        conn.execute(
+                                "UPDATE "
+                                        + tableId.toSql()
+                                        + " SET address = 'Beijing' WHERE \"Id\" = 103");
+                        conn.commit();
+                    }
+                });
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("Name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        postgresSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        snapshotHooks);
+
+        // Verify the ScanFetcher can successfully process all splits without getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces a backfill phase for each split by making
+        // highWatermark > lowWatermark.
+        assertThat(actual).hasSize(21);
+        assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+    }
+
     @Test
     void testSnapshotFetchSize() throws Exception {
         customDatabase.createAndInitialize();
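
Both regression tests force the failure mode the same way: the pre-high-watermark hook issues an UPDATE between the low-watermark and high-watermark reads, so every split sees highWatermark > lowWatermark and must run a backfill read task, exercising one start/stop cycle per split. A simplified sketch of that decision, with change-log offsets reduced to a plain counter (all names illustrative, not connector API):

import java.util.concurrent.atomic.AtomicLong;

// Sketch: why a write between the two watermark reads forces a backfill phase.
public class BackfillDecisionSketch {
    private final AtomicLong changeLogOffset = new AtomicLong();

    long currentOffset() {
        return changeLogOffset.get();
    }

    void applyUpdate() {
        changeLogOffset.incrementAndGet(); // any DML advances the change log
    }

    public static void main(String[] args) {
        BackfillDecisionSketch log = new BackfillDecisionSketch();

        long lowWatermark = log.currentOffset();  // read before the snapshot query
        log.applyUpdate();                        // the hook's UPDATE ... WHERE id = 103
        long highWatermark = log.currentOffset(); // read after the snapshot query

        // The snapshot alone is no longer consistent: the split must replay the
        // change log from lowWatermark to highWatermark, i.e. run a backfill task.
        System.out.println("backfill needed: " + (highWatermark > lowWatermark));
    }
}
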
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
index 09008a3d03f..f541cfe439a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
@@ -202,6 +202,65 @@ void testInsertDataInSnapshotScan() throws Exception {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        String databaseName = "customer";
+        String tableName = "dbo.customers";
+
+        initializeSqlServerTable(databaseName);
+
+        SqlServerSourceConfigFactory sourceConfigFactory =
+                getConfigFactory(databaseName, new String[] {tableName}, 4);
+        SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+        String tableId = databaseName + "." + tableName;
+        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
+        hooks.setPreHighWatermarkAction(
+                (config, split) -> {
+                    executeSql(
+                            (SqlServerSourceConfig) config,
+                            new String[] {
+                                "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"
+                            });
+                    try {
+                        Thread.sleep(10 * 1000);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
+                new SqlServerSourceFetchTaskContext(
+                        sourceConfig,
+                        sqlServerDialect,
+                        createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
+                        createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        hooks);
+
+        // Verify the ScanFetcher can successfully process all splits without getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces a backfill phase for each split by making
+        // highWatermark > lowWatermark.
+        Assertions.assertThat(actual).hasSize(21);
+        Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+    }
+
     @Test
     void testDateTimePrimaryKey() throws Exception {
         String databaseName = "pk";
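
One SQL Server-specific detail: unlike Postgres, SQL Server's CDC capture job copies changes out of the transaction log asynchronously, which is presumably what the fixed Thread.sleep(10 * 1000) in the hook waits for. If that sleep ever proves flaky, a bounded poll on the change becoming visible is a common alternative; a standalone sketch with an illustrative visibility check (not connector API):

import java.time.Duration;
import java.util.function.BooleanSupplier;

// Sketch: wait for a condition with a deadline instead of a fixed sleep.
public class AwaitCaptureSketch {
    static void awaitTrue(BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("condition not met within " + timeout);
            }
            Thread.sleep(200); // re-check period
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        // Stand-in for "the UPDATE is visible to the capture job", e.g. a
        // SELECT against the cdc.<capture_instance>_CT change table.
        BooleanSupplier changeVisible =
                () -> System.nanoTime() - start > Duration.ofSeconds(1).toNanos();
        awaitTrue(changeVisible, Duration.ofSeconds(10));
        System.out.println("change captured; high watermark can be taken safely");
    }
}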