@@ -47,6 +47,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
 
@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
     private final Set<TableId> pureStreamPhaseTables;
+    private final AtomicInteger numberOfRunningTasks;
 
     private volatile ChangeEventQueue<DataChangeEvent> queue;
-    private volatile boolean currentTaskRunning;
     private volatile Throwable readException;
 
     private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
-        this.currentTaskRunning = true;
         this.pureStreamPhaseTables = new HashSet<>();
         this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
         this.supportsSplitKeyOptimization = taskContext.supportsSplitKeyOptimization();
+        this.numberOfRunningTasks = new AtomicInteger(0);
     }
 
     @Override
@@ -90,6 +91,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
         configureFilter();
         taskContext.configure(currentStreamSplit);
         this.queue = taskContext.getQueue();
+        startReadTask();
         executorService.submit(
                 () -> {
                     try {
@@ -107,7 +109,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
 
     @Override
     public boolean isFinished() {
-        return currentStreamSplit == null || !currentTaskRunning;
+        return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
     }
 
     @Nullable
@@ -116,7 +118,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
         // what happens if currentTaskRunning
-        if (currentTaskRunning) {
+        if (numberOfRunningTasks.get() > 0) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
                 if (isEndWatermarkEvent(event.getRecord())) {
@@ -282,8 +284,12 @@ private void configureFilter() {
         this.pureStreamPhaseTables.clear();
     }
 
+    public void startReadTask() {
+        this.numberOfRunningTasks.incrementAndGet();
+    }
+
     public void stopReadTask() throws Exception {
-        this.currentTaskRunning = false;
+        this.numberOfRunningTasks.decrementAndGet();
 
         if (taskContext != null) {
             taskContext.close();
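Why the counter: with the old volatile boolean, the constructor armed the flag once, stopReadTask() cleared it, and nothing re-armed it when the fetcher was reused for the next split, so isFinished() reported true prematurely. The sketch below is a simplified stand-in, not the actual IncrementalSourceStreamFetcher; it only shows how the paired startReadTask()/stopReadTask() calls on an AtomicInteger keep isFinished() accurate across reuse.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal stand-in class illustrating the lifecycle contract of the patch.
class FetcherLifecycleSketch {
    private final AtomicInteger numberOfRunningTasks = new AtomicInteger(0);

    void startReadTask() {              // the patch calls this from submitTask()
        numberOfRunningTasks.incrementAndGet();
    }

    void stopReadTask() {               // called when the current split is done
        numberOfRunningTasks.decrementAndGet();
    }

    boolean isFinished() {
        return numberOfRunningTasks.get() == 0;
    }

    public static void main(String[] args) {
        FetcherLifecycleSketch fetcher = new FetcherLifecycleSketch();
        fetcher.startReadTask();                    // split 1 submitted
        fetcher.stopReadTask();                     // split 1 finished
        fetcher.startReadTask();                    // fetcher reused for split 2
        System.out.println(fetcher.isFinished());   // false -- split 2 still running
    }
}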
@@ -243,6 +243,57 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        customDatabase.createAndInitialize();
+
+        TestTableId tableId = new TestTableId(schemaName, tableName);
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, null, 4, false);
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPreHighWatermarkAction(
+                (postgresSourceConfig, split) -> {
+                    try (PostgresConnection conn = postgresDialect.openJdbcConnection()) {
+                        conn.execute(
+                                "UPDATE "
+                                        + tableId.toSql()
+                                        + " SET address = 'Beijing' WHERE \"Id\" = 103");
+                        conn.commit();
+                    }
+                });
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("Name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        postgresSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        snapshotHooks);
+
+        // Verify the ScanFetcher can successfully process all splits without getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces the backfill phase for each split by making
+        // highWatermark > lowWatermark.
+        assertThat(actual).hasSize(21);
+        assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+    }
+
     @Test
     void testSnapshotFetchSize() throws Exception {
         customDatabase.createAndInitialize();
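For contrast, a hedged sketch of the pre-fix failure mode this test pins down (schematic only; the real reader wiring is in the first file): once the boolean is cleared, no call sets it back, so every split after the first looks finished and its queued records are never polled.

// Schematic of the old boolean-based state; not the actual connector class.
class PreFixSketch {
    private volatile boolean currentTaskRunning = true; // armed once, in the constructor

    void submitTask(Runnable fetchTask) {
        // Pre-fix: nothing re-arms the flag here before the task starts.
        new Thread(fetchTask).start();
    }

    void stopReadTask() {
        currentTaskRunning = false;
    }

    boolean isFinished() {
        return !currentTaskRunning;
    }

    public static void main(String[] args) throws InterruptedException {
        PreFixSketch fetcher = new PreFixSketch();
        fetcher.stopReadTask();                      // split 1 finished
        fetcher.submitTask(() -> { /* split 2 */ }); // fetcher reused
        Thread.sleep(100);
        System.out.println(fetcher.isFinished());    // true -- split 2 wrongly looks done
    }
}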
@@ -202,6 +202,65 @@ void testInsertDataInSnapshotScan() throws Exception {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        String databaseName = "customer";
+        String tableName = "dbo.customers";
+
+        initializeSqlServerTable(databaseName);
+
+        SqlServerSourceConfigFactory sourceConfigFactory =
+                getConfigFactory(databaseName, new String[] {tableName}, 4);
+        SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+        String tableId = databaseName + "." + tableName;
+        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
+        hooks.setPreHighWatermarkAction(
+                (config, split) -> {
+                    executeSql(
+                            (SqlServerSourceConfig) config,
+                            new String[] {
+                                "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"
+                            });
+                    try {
+                        Thread.sleep(10 * 1000);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
+                new SqlServerSourceFetchTaskContext(
+                        sourceConfig,
+                        sqlServerDialect,
+                        createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
+                        createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        hooks);
+
+        // Verify the ScanFetcher can successfully process all splits without getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces the backfill phase for each split by making
+        // highWatermark > lowWatermark.
+        Assertions.assertThat(actual).hasSize(21);
+        Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+    }
+
     @Test
     void testDateTimePrimaryKey() throws Exception {
         String databaseName = "pk";
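A note on how both hooks force the backfill path, as the test comments state: each snapshot split is framed by a low and a high watermark, and a backfill read of the change stream happens only when the two differ, so the UPDATE committed between them guarantees it. Below is a minimal sketch of that decision with stand-in types; the real gating logic lives in the snapshot fetch task, which is not part of this diff.

// Stand-in for the watermark comparison that gates the backfill read.
class BackfillDecisionSketch {
    static boolean needsBackfill(long lowWatermark, long highWatermark) {
        // Changes committed between the two watermark reads (e.g. the hooks'
        // UPDATE on row 103) push the high watermark past the low one.
        return highWatermark > lowWatermark;
    }

    public static void main(String[] args) {
        System.out.println(needsBackfill(100L, 100L)); // false: quiet table, backfill skipped
        System.out.println(needsBackfill(100L, 105L)); // true: hook forces the backfill phase
    }
}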