diff --git a/docs/docs/kafka-connect.md b/docs/docs/kafka-connect.md index 9c7d3d83f2e4..23cd0475601e 100644 --- a/docs/docs/kafka-connect.md +++ b/docs/docs/kafka-connect.md @@ -406,7 +406,9 @@ _(Experimental)_ The `DebeziumTransform` SMT transforms a Debezium formatted message for use by the sink's CDC feature. It will promote the `before` or `after` element fields to top level and add the following metadata fields: -`_cdc.op`, `_cdc.ts`, `_cdc.offset`, `_cdc.source`, `_cdc.target`, and `_cdc.key`. +`_cdc.op`, `_cdc.ts`, `_cdc.source_ts`, `_cdc.offset`, `_cdc.source`, `_cdc.target`, and `_cdc.key`. +`_cdc.ts` is set from the timestamp the connector processed the event (`ts_ms`), while `_cdc.source_ts` +is set from the timestamp the change was made in the source database (`source.ts_ms`). ##### Configuration diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/CdcConstants.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/CdcConstants.java index d34eaee5fce1..c44a88f95606 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/CdcConstants.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/CdcConstants.java @@ -27,6 +27,7 @@ public interface CdcConstants { String COL_CDC = "_cdc"; String COL_OP = "op"; String COL_TS = "ts"; + String COL_SOURCE_TS = "source_ts"; String COL_OFFSET = "offset"; String COL_SOURCE = "source"; String COL_TARGET = "target"; diff --git a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/DebeziumTransform.java b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/DebeziumTransform.java index c15c3c8d5b18..3298e4d20014 100644 --- a/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/DebeziumTransform.java +++ b/kafka-connect/kafka-connect-transforms/src/main/java/org/apache/iceberg/connect/transforms/DebeziumTransform.java @@ -90,13 +90,15 @@ private R applyWithSchema(R record) { // create the CDC metadata Schema cdcSchema = makeCdcSchema(record.keySchema()); + Struct source = value.getStruct("source"); Struct cdcMetadata = new Struct(cdcSchema); cdcMetadata.put(CdcConstants.COL_OP, op); cdcMetadata.put(CdcConstants.COL_TS, new java.util.Date(value.getInt64("ts_ms"))); + cdcMetadata.put(CdcConstants.COL_SOURCE_TS, new java.util.Date(source.getInt64("ts_ms"))); if (record instanceof SinkRecord) { cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset()); } - setTableAndTargetFromSourceStruct(value.getStruct("source"), cdcMetadata); + setTableAndTargetFromSourceStruct(source, cdcMetadata); if (record.keySchema() != null) { cdcMetadata.put(CdcConstants.COL_KEY, record.key()); @@ -143,6 +145,9 @@ private R applySchemaless(R record) { Map cdcMetadata = Maps.newHashMap(); cdcMetadata.put(CdcConstants.COL_OP, op); cdcMetadata.put(CdcConstants.COL_TS, value.get("ts_ms")); + cdcMetadata.put( + CdcConstants.COL_SOURCE_TS, + Requirements.requireMap(value.get("source"), "Debezium transform").get("ts_ms")); if (record instanceof SinkRecord) { cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset()); } @@ -219,6 +224,7 @@ private Schema makeCdcSchema(Schema keySchema) { SchemaBuilder.struct() .field(CdcConstants.COL_OP, Schema.STRING_SCHEMA) .field(CdcConstants.COL_TS, Timestamp.SCHEMA) + .field(CdcConstants.COL_SOURCE_TS, Timestamp.SCHEMA) .field(CdcConstants.COL_OFFSET, Schema.OPTIONAL_INT64_SCHEMA) .field(CdcConstants.COL_SOURCE, Schema.STRING_SCHEMA) .field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA); diff --git a/kafka-connect/kafka-connect-transforms/src/test/java/org/apache/iceberg/connect/transforms/TestDebeziumTransform.java b/kafka-connect/kafka-connect-transforms/src/test/java/org/apache/iceberg/connect/transforms/TestDebeziumTransform.java index aa7eb99628aa..cdd2627a7618 100644 --- a/kafka-connect/kafka-connect-transforms/src/test/java/org/apache/iceberg/connect/transforms/TestDebeziumTransform.java +++ b/kafka-connect/kafka-connect-transforms/src/test/java/org/apache/iceberg/connect/transforms/TestDebeziumTransform.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.time.Instant; +import java.util.Date; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Decimal; @@ -33,6 +34,9 @@ public class TestDebeziumTransform { + private static final long EVENT_TS_MS = 1718000001000L; + private static final long SOURCE_TS_MS = 1718000000000L; + private static final Schema KEY_SCHEMA = SchemaBuilder.struct().field("account_id", Schema.INT64_SCHEMA).build(); @@ -48,6 +52,7 @@ public class TestDebeziumTransform { .field("db", Schema.STRING_SCHEMA) .field("schema", Schema.STRING_SCHEMA) .field("table", Schema.STRING_SCHEMA) + .field("ts_ms", Schema.INT64_SCHEMA) .build(); private static final Schema VALUE_SCHEMA = @@ -86,6 +91,8 @@ public void testDebeziumTransformSchemaless() { Map cdcMetadata = (Map) value.get("_cdc"); assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("ts")).isEqualTo(EVENT_TS_MS); + assertThat(cdcMetadata.get("source_ts")).isEqualTo(SOURCE_TS_MS); assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class); @@ -109,6 +116,9 @@ public void testDebeziumTransformWithSchema() { Struct cdcMetadata = value.getStruct("_cdc"); assertThat(cdcMetadata.get("op")).isEqualTo("U"); + assertThat(cdcMetadata.get("ts")).isEqualTo(Date.from(Instant.ofEpochMilli(EVENT_TS_MS))); + assertThat(cdcMetadata.get("source_ts")) + .isEqualTo(Date.from(Instant.ofEpochMilli(SOURCE_TS_MS))); assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl"); assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x"); assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class); @@ -120,7 +130,8 @@ private Map createDebeziumEventMap(String operation) { ImmutableMap.of( "db", "db", "schema", "schema", - "table", "tbl"); + "table", "tbl", + "ts_ms", SOURCE_TS_MS); Map data = ImmutableMap.of( @@ -130,7 +141,7 @@ private Map createDebeziumEventMap(String operation) { return ImmutableMap.of( "op", operation, - "ts_ms", System.currentTimeMillis(), + "ts_ms", EVENT_TS_MS, "source", source, "before", data, "after", data); @@ -138,7 +149,11 @@ private Map createDebeziumEventMap(String operation) { private Struct createDebeziumEventStruct(String operation) { Struct source = - new Struct(SOURCE_SCHEMA).put("db", "db").put("schema", "schema").put("table", "tbl"); + new Struct(SOURCE_SCHEMA) + .put("db", "db") + .put("schema", "schema") + .put("table", "tbl") + .put("ts_ms", SOURCE_TS_MS); Struct data = new Struct(ROW_SCHEMA) @@ -148,7 +163,7 @@ private Struct createDebeziumEventStruct(String operation) { return new Struct(VALUE_SCHEMA) .put("op", operation) - .put("ts_ms", System.currentTimeMillis()) + .put("ts_ms", EVENT_TS_MS) .put("source", source) .put("before", data) .put("after", data);