Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand Down Expand Up @@ -81,6 +82,15 @@ default Offset displayCommittedOffset(C sourceConfig) {
/** The task context used for fetch task to fetch data from external systems. */
FetchTask.Context createFetchTaskContext(C sourceConfig);

/**
 * Creates the fetcher that reads records of a stream split. The default implementation
 * returns the generic {@link IncrementalSourceStreamFetcher}; dialects may override this
 * to plug in connector-specific filtering or routing behavior.
 *
 * @param taskContext the fetch-task context used to access the external system
 * @param subtaskId the id of the subtask that owns the fetcher
 * @return a stream fetcher bound to the given context and subtask
 */
default IncrementalSourceStreamFetcher createStreamFetcher(
        FetchTask.Context taskContext, int subtaskId) {
    IncrementalSourceStreamFetcher defaultFetcher =
            new IncrementalSourceStreamFetcher(taskContext, subtaskId);
    return defaultFetcher;
}

/**
* We may need the offset corresponding to the checkpointId. For example, we should commit LSN
* of checkpoint to postgres's slot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected TableChanges getTableChangeRecord(SourceRecord element) throws IOExcep
return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
}

private void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) {
protected void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asStreamSplitState().setStartingOffset(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private IncrementalSourceScanFetcher getScanFetcher() {
private IncrementalSourceStreamFetcher getStreamFetcher() {
if (reusedStreamFetcher == null) {
reusedStreamFetcher =
new IncrementalSourceStreamFetcher(
dataSourceDialect.createStreamFetcher(
dataSourceDialect.createFetchTaskContext(sourceConfig), subtaskId);
}
return reusedStreamFetcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void close() {
* only the change event belong to [1024, 2048) and offset is after highWatermark1 should send.
* </pre>
*/
private boolean shouldEmit(SourceRecord sourceRecord) {
protected boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
TableId tableId = taskContext.getTableId(sourceRecord);
Offset position = taskContext.getStreamOffset(sourceRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceStreamFetcher;
import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -239,6 +241,13 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
public IncrementalSourceStreamFetcher createStreamFetcher(
        FetchTask.Context taskContext, int subtaskId) {
    // Use the Postgres-specific fetcher so that logical decoding messages
    // (pg_logical_emit_message) can bypass table-based watermark filtering
    // when the corresponding option is enabled.
    final boolean emitLogicalMessages = sourceConfig.isLogicalMessageEnabled();
    return new PostgresSourceStreamFetcher(taskContext, subtaskId, emitLogicalMessages);
}

@Override
public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
if (streamFetchTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabase
return this;
}

/**
 * Configures whether Postgres logical decoding messages are forwarded to the deserializer.
 *
 * @param includeLogicalMessages true to emit logical messages downstream, false to drop them
 * @return this builder, for call chaining
 */
public PostgresSourceBuilder<T> includeLogicalMessages(boolean includeLogicalMessages) {
    configFactory.setIncludeLogicalMessages(includeLogicalMessages);
    return this;
}

/** Whether to infer schema change event on relation message. */
public PostgresSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) {
this.configFactory.includeSchemaChanges(includeSchemaChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;
private final boolean includeDatabaseInTableId;
private final boolean includeLogicalMessages;

public PostgresSourceConfig(
int subtaskId,
Expand Down Expand Up @@ -71,7 +72,8 @@ public PostgresSourceConfig(
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
boolean includePartitionedTables,
boolean includeDatabaseInTableId) {
boolean includeDatabaseInTableId,
boolean includeLogicalMessages) {
super(
startupOptions,
databaseList,
Expand Down Expand Up @@ -103,6 +105,7 @@ public PostgresSourceConfig(
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.includeLogicalMessages = includeLogicalMessages;
}

/**
Expand Down Expand Up @@ -156,4 +159,9 @@ public PostgresConnectorConfig getDbzConnectorConfig() {
public boolean isIncludeDatabaseInTableId() {
return includeDatabaseInTableId;
}

/**
 * Whether Postgres logical decoding messages should be emitted to the deserializer.
 *
 * @return true if logical messages are forwarded downstream
 */
public boolean isLogicalMessageEnabled() {
    return this.includeLogicalMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private boolean includeDatabaseInTableId =
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();

private boolean includeLogicalMessages =
PostgresSourceOptions.SCAN_LOGICAL_MESSAGE_ENABLED.defaultValue();

/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
Expand Down Expand Up @@ -140,7 +143,8 @@ public PostgresSourceConfig create(int subtaskId) {
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
includePartitionedTables,
includeDatabaseInTableId);
includeDatabaseInTableId,
includeLogicalMessages);
}

/**
Expand Down Expand Up @@ -198,4 +202,9 @@ public void setIncludePartitionedTables(boolean includePartitionedTables) {
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
this.includeDatabaseInTableId = includeDatabaseInTableId;
}

/**
 * Configures whether Postgres logical decoding messages are emitted to the deserializer.
 *
 * @param includeLogicalMessages true to emit logical messages downstream, false to drop them
 */
public void setIncludeLogicalMessages(boolean includeLogicalMessages) {
    this.includeLogicalMessages = includeLogicalMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");

/**
 * Option controlling whether records produced by {@code pg_logical_emit_message()} are
 * forwarded to the deserializer. Defaults to {@code false}: logical messages carry no
 * table identity and would otherwise be dropped by table-based watermark filtering.
 */
public static final ConfigOption<Boolean> SCAN_LOGICAL_MESSAGE_ENABLED =
        ConfigOptions.key("scan.logical-message.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to emit Postgres logical messages produced by "
                                + "pg_logical_emit_message() to the deserializer. "
                                + "Disabled by default; logical messages are not bound to "
                                + "any table and are dropped by table-based watermark filtering.");

public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
ConfigOptions.key("table-id.include-database")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.flink.cdc.connectors.postgres.source.reader;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
Expand All @@ -43,6 +45,18 @@ public PostgresSourceRecordEmitter(
offsetFactory);
}

@Override
protected void processElement(
        SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
        throws Exception {
    // Ordinary records take the generic per-table emission path.
    if (!PostgresSourceStreamFetcher.isLogicalMessage(element)) {
        super.processElement(element, output, splitState);
        return;
    }
    // Logical decoding messages are not bound to a table: advance the stream
    // split offset and emit the record directly.
    updateStreamSplitState(splitState, element);
    emitElement(element, output);
}

@Override
protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
if (element instanceof PostgresSchemaRecord) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.postgres.source.reader;

import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;

import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Postgres-specific stream fetcher. When {@code includeLogicalMessages} is enabled, records
 * produced by {@code pg_logical_emit_message} (op = "m") are always emitted, bypassing the
 * table-based watermark filtering of the base fetcher, since logical messages carry no table
 * identity.
 */
public class PostgresSourceStreamFetcher extends IncrementalSourceStreamFetcher {

    /** Whether logical decoding messages should bypass the base table-based filtering. */
    private final boolean includeLogicalMessages;

    public PostgresSourceStreamFetcher(
            FetchTask.Context taskContext, int subtaskId, boolean includeLogicalMessages) {
        super(taskContext, subtaskId);
        this.includeLogicalMessages = includeLogicalMessages;
    }

    @Override
    protected boolean shouldEmit(SourceRecord sourceRecord) {
        // Logical messages pass through unconditionally when enabled; every other
        // record goes through the base watermark/table filtering.
        return (includeLogicalMessages && isLogicalMessage(sourceRecord))
                || super.shouldEmit(sourceRecord);
    }

    /** Returns true if the record is a Debezium logical decoding message (op = "m"). */
    static boolean isLogicalMessage(SourceRecord record) {
        Object value = record.value();
        if (!(value instanceof Struct)) {
            return false;
        }
        Struct envelope = (Struct) value;
        if (envelope.schema().field(Envelope.FieldName.OPERATION) == null) {
            return false;
        }
        return "m".equals(envelope.getString(Envelope.FieldName.OPERATION));
    }
}
Loading
Loading