diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
index 3136bb97d25..0e8c8ab53cc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -27,6 +27,7 @@
 
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.relational.TableId;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
@@ -260,13 +261,23 @@ private void assertLowWatermark(SourceRecord lowWatermark) {
                         lowWatermark));
     }
 
-    private boolean isChangeRecordInChunkRange(SourceRecord record) {
-        if (taskContext.isDataChangeRecord(record)) {
-            return taskContext.isRecordBetween(
-                    record,
-                    currentSnapshotSplit.getSplitStart(),
-                    currentSnapshotSplit.getSplitEnd());
+    @VisibleForTesting
+    boolean isChangeRecordInChunkRange(SourceRecord record) {
+        if (!taskContext.isDataChangeRecord(record)) {
+            return false;
+        }
+        // Skip records of other captured tables; their schema may not be loaded yet
+        // and their PKs do not align with this chunk's bounds.
+        TableId recordTableId = taskContext.getTableId(record);
+        if (recordTableId == null || !recordTableId.equals(currentSnapshotSplit.getTableId())) {
+            return false;
         }
-        return false;
+        return taskContext.isRecordBetween(
+                record, currentSnapshotSplit.getSplitStart(), currentSnapshotSplit.getSplitEnd());
+    }
+
+    @VisibleForTesting
+    void setCurrentSnapshotSplit(SnapshotSplit currentSnapshotSplit) {
+        this.currentSnapshotSplit = currentSnapshotSplit;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcherTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcherTest.java
new file mode 100644
index 00000000000..56bcd11aea2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcherTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.source.reader.external;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link IncrementalSourceScanFetcher}. */
+class IncrementalSourceScanFetcherTest {
+
+    private static final TableId CURRENT_SPLIT_TABLE = TableId.parse("test_db.table_a");
+    private static final TableId OTHER_TABLE = TableId.parse("test_db.table_b");
+
+    /**
+     * Reproduces the NPE seen in PostgreSQL backfill when the WAL stream carries change records for
+     * a captured table other than the one currently being snapshotted. Before the fix, the fetcher
+     * passed the foreign-table record to {@code isRecordBetween}, which dereferenced a null
+     * Debezium {@code Table} from the schema cache and threw NPE. After the fix, the foreign-table
+     * record is filtered out by tableId and {@code isRecordBetween} is never invoked.
+     */
+    @Test
+    void testIsChangeRecordInChunkRangeFiltersOutForeignTableRecord() {
+        FetchTask.Context taskContext = mock(FetchTask.Context.class);
+        SourceRecord foreignTableRecord = mock(SourceRecord.class);
+
+        when(taskContext.isDataChangeRecord(foreignTableRecord)).thenReturn(true);
+        when(taskContext.getTableId(foreignTableRecord)).thenReturn(OTHER_TABLE);
+        // Mirrors the production failure mode: the record is for a table whose schema is not in
+        // the cache, so any downstream lookup explodes with NPE. The test asserts we never get
+        // here.
+        when(taskContext.isRecordBetween(any(), any(), any()))
+                .thenThrow(
+                        new NullPointerException(
+                                "Cannot invoke \"io.debezium.relational.Table.primaryKeyColumns()\" because \"table\" is null"));
+
+        IncrementalSourceScanFetcher fetcher = new IncrementalSourceScanFetcher(taskContext, 0);
+        fetcher.setCurrentSnapshotSplit(newSnapshotSplit(CURRENT_SPLIT_TABLE));
+
+        boolean result = fetcher.isChangeRecordInChunkRange(foreignTableRecord);
+        Assertions.assertThat(result).isFalse();
+        verify(taskContext, never()).isRecordBetween(any(), any(), any());
+    }
+
+    @Test
+    void testIsChangeRecordInChunkRangeDelegatesForCurrentTableRecord() {
+        FetchTask.Context taskContext = mock(FetchTask.Context.class);
+        SourceRecord currentTableRecord = mock(SourceRecord.class);
+
+        when(taskContext.isDataChangeRecord(currentTableRecord)).thenReturn(true);
+        when(taskContext.getTableId(currentTableRecord)).thenReturn(CURRENT_SPLIT_TABLE);
+        when(taskContext.isRecordBetween(any(), any(), any())).thenReturn(true);
+
+        IncrementalSourceScanFetcher fetcher = new IncrementalSourceScanFetcher(taskContext, 0);
+        fetcher.setCurrentSnapshotSplit(newSnapshotSplit(CURRENT_SPLIT_TABLE));
+
+        Assertions.assertThat(fetcher.isChangeRecordInChunkRange(currentTableRecord)).isTrue();
+        verify(taskContext).isRecordBetween(any(), any(), any());
+    }
+
+    @Test
+    void testIsChangeRecordInChunkRangeIgnoresNonDataChangeRecord() {
+        FetchTask.Context taskContext = mock(FetchTask.Context.class);
+        SourceRecord watermarkRecord = mock(SourceRecord.class);
+
+        when(taskContext.isDataChangeRecord(watermarkRecord)).thenReturn(false);
+
+        IncrementalSourceScanFetcher fetcher = new IncrementalSourceScanFetcher(taskContext, 0);
+        fetcher.setCurrentSnapshotSplit(newSnapshotSplit(CURRENT_SPLIT_TABLE));
+
+        Assertions.assertThat(fetcher.isChangeRecordInChunkRange(watermarkRecord)).isFalse();
+        verify(taskContext, never()).getTableId(any());
+        verify(taskContext, never()).isRecordBetween(any(), any(), any());
+    }
+
+    private static SnapshotSplit newSnapshotSplit(TableId tableId) {
+        RowType splitKeyType =
+                (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
+        return new SnapshotSplit(
+                tableId,
+                0,
+                splitKeyType,
+                new Object[] {0L},
+                new Object[] {1024L},
+                null,
+                Collections.emptyMap());
+    }
+}