Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/docs/kafka-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ _(Experimental)_

The `DebeziumTransform` SMT transforms a Debezium formatted message for use by the sink's CDC feature.
It will promote the `before` or `after` element fields to top level and add the following metadata fields:
`_cdc.op`, `_cdc.ts`, `_cdc.offset`, `_cdc.source`, `_cdc.target`, and `_cdc.key`.
`_cdc.op`, `_cdc.ts`, `_cdc.source_ts`, `_cdc.offset`, `_cdc.source`, `_cdc.target`, and `_cdc.key`.
`_cdc.ts` is set from the timestamp the connector processed the event (`ts_ms`), while `_cdc.source_ts`
is set from the timestamp the change was made in the source database (`source.ts_ms`).

##### Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface CdcConstants {
String COL_CDC = "_cdc";
String COL_OP = "op";
String COL_TS = "ts";
String COL_SOURCE_TS = "source_ts";
String COL_OFFSET = "offset";
String COL_SOURCE = "source";
String COL_TARGET = "target";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ private R applyWithSchema(R record) {

// create the CDC metadata
Schema cdcSchema = makeCdcSchema(record.keySchema());
Struct source = value.getStruct("source");
Struct cdcMetadata = new Struct(cdcSchema);
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, new java.util.Date(value.getInt64("ts_ms")));
cdcMetadata.put(CdcConstants.COL_SOURCE_TS, new java.util.Date(source.getInt64("ts_ms")));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
}
setTableAndTargetFromSourceStruct(value.getStruct("source"), cdcMetadata);
setTableAndTargetFromSourceStruct(source, cdcMetadata);

if (record.keySchema() != null) {
cdcMetadata.put(CdcConstants.COL_KEY, record.key());
Expand Down Expand Up @@ -143,6 +145,9 @@ private R applySchemaless(R record) {
Map<String, Object> cdcMetadata = Maps.newHashMap();
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, value.get("ts_ms"));
cdcMetadata.put(
CdcConstants.COL_SOURCE_TS,
Requirements.requireMap(value.get("source"), "Debezium transform").get("ts_ms"));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
}
Expand Down Expand Up @@ -219,6 +224,7 @@ private Schema makeCdcSchema(Schema keySchema) {
SchemaBuilder.struct()
.field(CdcConstants.COL_OP, Schema.STRING_SCHEMA)
.field(CdcConstants.COL_TS, Timestamp.SCHEMA)
.field(CdcConstants.COL_SOURCE_TS, Timestamp.SCHEMA)
.field(CdcConstants.COL_OFFSET, Schema.OPTIONAL_INT64_SCHEMA)
.field(CdcConstants.COL_SOURCE, Schema.STRING_SCHEMA)
.field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.math.BigDecimal;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Decimal;
Expand All @@ -33,6 +34,9 @@

public class TestDebeziumTransform {

private static final long EVENT_TS_MS = 1718000001000L;
private static final long SOURCE_TS_MS = 1718000000000L;

private static final Schema KEY_SCHEMA =
SchemaBuilder.struct().field("account_id", Schema.INT64_SCHEMA).build();

Expand All @@ -48,6 +52,7 @@ public class TestDebeziumTransform {
.field("db", Schema.STRING_SCHEMA)
.field("schema", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)
.field("ts_ms", Schema.INT64_SCHEMA)
.build();

private static final Schema VALUE_SCHEMA =
Expand Down Expand Up @@ -86,6 +91,8 @@ public void testDebeziumTransformSchemaless() {

Map<String, Object> cdcMetadata = (Map<String, Object>) value.get("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("ts")).isEqualTo(EVENT_TS_MS);
assertThat(cdcMetadata.get("source_ts")).isEqualTo(SOURCE_TS_MS);
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class);
Expand All @@ -109,6 +116,9 @@ public void testDebeziumTransformWithSchema() {

Struct cdcMetadata = value.getStruct("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("ts")).isEqualTo(Date.from(Instant.ofEpochMilli(EVENT_TS_MS)));
assertThat(cdcMetadata.get("source_ts"))
.isEqualTo(Date.from(Instant.ofEpochMilli(SOURCE_TS_MS)));
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class);
Expand All @@ -120,7 +130,8 @@ private Map<String, Object> createDebeziumEventMap(String operation) {
ImmutableMap.of(
"db", "db",
"schema", "schema",
"table", "tbl");
"table", "tbl",
"ts_ms", SOURCE_TS_MS);

Map<String, Object> data =
ImmutableMap.of(
Expand All @@ -130,15 +141,19 @@ private Map<String, Object> createDebeziumEventMap(String operation) {

return ImmutableMap.of(
"op", operation,
"ts_ms", System.currentTimeMillis(),
"ts_ms", EVENT_TS_MS,
"source", source,
"before", data,
"after", data);
}

private Struct createDebeziumEventStruct(String operation) {
Struct source =
new Struct(SOURCE_SCHEMA).put("db", "db").put("schema", "schema").put("table", "tbl");
new Struct(SOURCE_SCHEMA)
.put("db", "db")
.put("schema", "schema")
.put("table", "tbl")
.put("ts_ms", SOURCE_TS_MS);

Struct data =
new Struct(ROW_SCHEMA)
Expand All @@ -148,7 +163,7 @@ private Struct createDebeziumEventStruct(String operation) {

return new Struct(VALUE_SCHEMA)
.put("op", operation)
.put("ts_ms", System.currentTimeMillis())
.put("ts_ms", EVENT_TS_MS)
.put("source", source)
.put("before", data)
.put("after", data);
Expand Down