Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jobs:
KAFKA_IMAGE=${{ env.KAFKA_IMAGE }}
KAFKA_TAG=${{ env.KAFKA_TAG }}-${{ env.BUILD_TREE_HASH }}
MONGODB_CONNECTOR_TAG=${{ env.MONGODB_CONNECTOR_TAG }}
KAFKA_VERSION=${{ env.kafka_version }}
tags: "${{ env.KAFKA_CONNECT_IMAGE }}:${{ env.KAFKA_CONNECT_TAG }}-${{ env.BUILD_TREE_HASH }}"
cache-from: |
type=registry,ref=${{ env.KAFKA_CONNECT_IMAGE }}:${{ env.KAFKA_CONNECT_TAG }}-${{ env.BUILD_TREE_HASH }}
Expand Down
11 changes: 11 additions & 0 deletions solution/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ WORKDIR /tmp
RUN curl -LO https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/${MONGODB_CONNECTOR_TAG}/mongo-kafka-connect-${MONGODB_CONNECTOR_TAG}-all.jar && \
mv /tmp/mongo-kafka-connect-${MONGODB_CONNECTOR_TAG}-all.jar /tmp/mongo-kafka-connect.jar

# Build Scality SMT plugin
# Compiles the Maven project copied from smt/ in a dedicated stage so that only
# the resulting jar needs to be copied into the final image.
FROM maven:3.9-eclipse-temurin-17 AS smt-build
# Optional: Kafka version to compile against. No default is declared here, so
# the value is empty unless the caller passes --build-arg KAFKA_VERSION=...
ARG KAFKA_VERSION
WORKDIR /build
COPY smt/ ./
# Only override kafka.version when the build arg is non-empty. An empty
# "-Dkafka.version=" would replace the default declared in pom.xml with a
# blank version and break dependency resolution.
RUN mvn -B ${KAFKA_VERSION:+-Dkafka.version=${KAFKA_VERSION}} package

# Use Kafka base image
FROM ${KAFKA_IMAGE}:${KAFKA_TAG}

Expand All @@ -30,3 +37,7 @@ ENV KAFKA_OPTS=-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exp

# mongodb ingestor
COPY --from=mongodb-connector /tmp/mongo-kafka-connect.jar /usr/local/share/kafka/plugins/

# scality smt plugin
COPY --from=smt-build /build/target/scality-kafka-connect-transforms-*.jar \
/usr/local/share/kafka/plugins/
1 change: 1 addition & 0 deletions solution/kafka-connect/smt/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
52 changes: 52 additions & 0 deletions solution/kafka-connect/smt/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Maven build descriptor for the Scality Kafka Connect transforms jar.
Produces scality-kafka-connect-transforms-<version>.jar under target/.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Artifact coordinates; artifactId and version determine the jar file name. -->
<groupId>com.scality.kafka.connect</groupId>
<artifactId>scality-kafka-connect-transforms</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>

<properties>
<!-- Compile for Java 11 source and bytecode level. -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Default Kafka version to compile against; can be overridden with -Dkafka.version=... -->
<kafka.version>3.1.2</kafka.version>
<junit.version>5.10.2</junit.version>
</properties>

<dependencies>
<!-- Kafka Connect API. "provided" scope: needed to compile, not packaged with this artifact. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

<!-- Logging facade. Also "provided", so it is not packaged with this artifact. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
<scope>provided</scope>
</dependency>

<!-- JUnit 5 (Jupiter), available to test code only. -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Surefire runs the unit tests during the build; the version is pinned explicitly. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.scality.kafka.connect.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Kafka Connect SMT that rewrites the message key to the raw S3 object key,
* stripping the Scality master/version encoding used in the MongoDB metadata
* collections' _id field.
*
* Encoded _id forms produced by arsenal:
* - V1 master: "\x7FM" + rawKey
* - V1 version: "\x7FV" + rawKey + "\x00" + versionId
* - V0 legacy: rawKey (no prefix)
*
* This SMT collapses master and all versions of the same logical S3 object
* onto the same Kafka partition.
*
* Expects the connector's output.schema.key to project documentKey._id, so
* record.key() is a Struct with a nested documentKey.{_id}. Falls through
* unchanged if the key is null or of an unexpected shape; unexpected shapes
* are logged at DEBUG so configuration mismatches can be diagnosed.
*/
public class TransformObjectKey<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger log = LoggerFactory.getLogger(TransformObjectKey.class);

    /** This transformation takes no configuration, so the definition is empty. */
    private static final ConfigDef CONFIG_DEF = new ConfigDef();

    /** First character of every encoded (non-legacy) identifier. */
    private static final char SCALITY_PREFIX_BYTE = '\u007F';
    /** Second character of an encoded master identifier. */
    private static final char MASTER_TAG = 'M';
    /** Second character of an encoded version identifier. */
    private static final char VERSION_TAG = 'V';
    /** Character separating the raw key from the version id in a version identifier. */
    private static final char VERSION_SEPARATOR = '\u0000';

    /**
     * Decodes an identifier into the raw object key.
     *
     * <ul>
     *   <li>prefix + {@code 'M'}: everything after the two-character header;</li>
     *   <li>prefix + {@code 'V'}: everything after the header, up to (not including)
     *       the first separator character, or the whole remainder when there is none;</li>
     *   <li>anything else ({@code null}, shorter than two characters, no prefix,
     *       or an unknown tag): returned as-is.</li>
     * </ul>
     *
     * @param id the identifier to decode, may be {@code null}
     * @return the raw key, or {@code id} itself when it is not in an encoded form
     */
    static String stripObjectKey(String id) {
        boolean encoded = id != null
                && id.length() >= 2
                && id.charAt(0) == SCALITY_PREFIX_BYTE;
        if (!encoded) {
            return id;
        }
        switch (id.charAt(1)) {
            case MASTER_TAG:
                return id.substring(2);
            case VERSION_TAG: {
                // Search only past the two-character header.
                int separatorAt = id.indexOf(VERSION_SEPARATOR, 2);
                return separatorAt < 0 ? id.substring(2) : id.substring(2, separatorAt);
            }
            default:
                return id;
        }
    }

    /**
     * Replaces the record key with the decoded raw object key as a plain string.
     *
     * <p>Records from which no string identifier can be extracted are returned
     * untouched (same instance).
     *
     * @param record the incoming record
     * @return a new record keyed by the raw object key, or {@code record} itself
     */
    @Override
    public R apply(R record) {
        final String encodedId = extractDocumentKeyId(record.key());
        if (encodedId != null) {
            // Deliberately leave the partition unset so that it gets recomputed from
            // the rewritten key; forwarding record.kafkaPartition() would keep the
            // partition chosen upstream and make this transformation useless for routing.
            // NOTE(review): a null partition may not be accepted by sink-side record
            // implementations - confirm before using this SMT on a sink connector.
            final Integer unpinnedPartition = null;
            return record.newRecord(
                    record.topic(),
                    unpinnedPartition,
                    Schema.STRING_SCHEMA,
                    stripObjectKey(encodedId),
                    record.valueSchema(),
                    record.value(),
                    record.timestamp(),
                    record.headers());
        }
        return record;
    }

    /**
     * Pulls the string identifier out of a record key.
     *
     * <p>A {@code String} key is used directly. A {@link Struct} key is expected to
     * hold a nested {@code documentKey} struct with a string {@code _id} field.
     * Every other shape yields {@code null}; all of them except a {@code null} key
     * are reported at DEBUG level.
     *
     * @param key the record key, may be {@code null}
     * @return the identifier, or {@code null} when the key has an unsupported shape
     */
    private static String extractDocumentKeyId(Object key) {
        if (key instanceof String) {
            return (String) key;
        }
        if (key == null) {
            return null;
        }
        if (!(key instanceof Struct)) {
            log.debug("Unsupported key type {}; passing through unchanged",
                    key.getClass().getName());
            return null;
        }

        final Struct keyStruct = (Struct) key;
        // Check the schema first: the field is only read once it is known to exist.
        if (keyStruct.schema().field("documentKey") == null) {
            log.debug("Key Struct has no documentKey field; passing through unchanged");
            return null;
        }
        final Object documentKey = keyStruct.get("documentKey");
        if (!(documentKey instanceof Struct)) {
            log.debug("documentKey is not a Struct ({}); passing through unchanged",
                    typeNameOf(documentKey));
            return null;
        }

        final Struct documentKeyStruct = (Struct) documentKey;
        if (documentKeyStruct.schema().field("_id") == null) {
            log.debug("documentKey Struct has no _id field; passing through unchanged");
            return null;
        }
        final Object rawId = documentKeyStruct.get("_id");
        if (rawId instanceof String) {
            return (String) rawId;
        }
        log.debug("documentKey._id is not a String ({}); passing through unchanged",
                typeNameOf(rawId));
        return null;
    }

    /** Returns the class name of {@code value}, or the literal "null" for a null reference. */
    private static String typeNameOf(Object value) {
        if (value == null) {
            return "null";
        }
        return value.getClass().getName();
    }

    /** Returns the (empty) configuration definition. */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No settings are read; the supplied map is ignored. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.scality.kafka.connect.transforms;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;

/**
 * Unit tests for {@link TransformObjectKey}: the static identifier decoding
 * ({@code stripObjectKey}) and the record-level behaviour of {@code apply}.
 */
class TransformObjectKeyTest {

    private static final String VID = "98765432101234567890ABCD";
    /** Two-character header of an encoded master identifier. */
    private static final String M = "\u007FM";
    /** Two-character header of an encoded version identifier. */
    private static final String V = "\u007FV";
    /** Separator between raw key and version id. */
    private static final String NUL = "\u0000";

    @Test
    void stripsV1Master() {
        assertEquals("my/object", TransformObjectKey.stripObjectKey(M + "my/object"));
    }

    @Test
    void stripsV1Version() {
        assertEquals("my/object",
                TransformObjectKey.stripObjectKey(V + "my/object" + NUL + VID));
    }

    @Test
    void passesThroughV0Legacy() {
        assertEquals("legacy-key-no-prefix",
                TransformObjectKey.stripObjectKey("legacy-key-no-prefix"));
    }

    @Test
    void keepsNullInsideMasterRawKey() {
        assertEquals("foo" + NUL + "bar",
                TransformObjectKey.stripObjectKey(M + "foo" + NUL + "bar"));
    }

    @Test
    void versionWithEmptyKey() {
        assertEquals("", TransformObjectKey.stripObjectKey(V + NUL + "vidonly"));
    }

    @Test
    void masterWithEmptyKey() {
        assertEquals("", TransformObjectKey.stripObjectKey(M));
    }

    @Test
    void versionWithoutSeparator() {
        assertEquals("orphan", TransformObjectKey.stripObjectKey(V + "orphan"));
    }

    @Test
    void unrecognizedPrefixPassesThrough() {
        // The input must start with the 0x7F prefix followed by a tag that is
        // neither 'M' nor 'V'; without the prefix this would only repeat the
        // legacy (no-prefix) case and never reach the unknown-tag branch.
        String unknownTag = "\u007FX" + "NotMV-passthrough";
        assertEquals(unknownTag, TransformObjectKey.stripObjectKey(unknownTag));
    }

    @Test
    void lonePrefixPassesThrough() {
        // A single prefix character is too short to carry a tag.
        assertEquals("\u007F", TransformObjectKey.stripObjectKey("\u007F"));
    }

    @Test
    void unicodeRawKey() {
        assertEquals("ünîçødé/path",
                TransformObjectKey.stripObjectKey(M + "ünîçødé/path"));
    }

    @Test
    void nullId() {
        assertNull(TransformObjectKey.stripObjectKey(null));
    }

    @Test
    void applyRewritesStructKey() {
        Schema docKeySchema = SchemaBuilder.struct()
                .field("_id", Schema.STRING_SCHEMA).build();
        Schema keySchema = SchemaBuilder.struct()
                .field("documentKey", docKeySchema).build();
        Struct docKey = new Struct(docKeySchema).put("_id", M + "bucket/obj");
        Struct key = new Struct(keySchema).put("documentKey", docKey);

        SourceRecord in = sourceRecord(keySchema, key);

        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            smt.configure(Collections.emptyMap());
            SourceRecord out = smt.apply(in);
            assertEquals("bucket/obj", out.key());
            assertEquals(Schema.STRING_SCHEMA, out.keySchema());
            assertSame(in.value(), out.value());
            // The input record is pinned to partition 0; the rewritten record
            // must carry no partition so it can be re-routed on the new key.
            assertNull(out.kafkaPartition());
        }
    }

    @Test
    void applyPassesThroughOnNullKey() {
        SourceRecord in = sourceRecord(null, null);
        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            assertSame(in, smt.apply(in));
        }
    }

    @Test
    void applyPassesThroughOnStructWithoutDocumentKey() {
        Schema keySchema = SchemaBuilder.struct()
                .field("other", Schema.STRING_SCHEMA).build();
        Struct key = new Struct(keySchema).put("other", "irrelevant");
        SourceRecord in = sourceRecord(keySchema, key);
        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            assertSame(in, smt.apply(in));
        }
    }

    @Test
    void applyPassesThroughOnDocumentKeyWithoutId() {
        Schema docKeySchema = SchemaBuilder.struct()
                .field("other", Schema.STRING_SCHEMA).build();
        Schema keySchema = SchemaBuilder.struct()
                .field("documentKey", docKeySchema).build();
        Struct docKey = new Struct(docKeySchema).put("other", "x");
        Struct key = new Struct(keySchema).put("documentKey", docKey);
        SourceRecord in = sourceRecord(keySchema, key);
        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            assertSame(in, smt.apply(in));
        }
    }

    @Test
    void applyPassesThroughOnNonStringId() {
        Schema docKeySchema = SchemaBuilder.struct()
                .field("_id", Schema.INT64_SCHEMA).build();
        Schema keySchema = SchemaBuilder.struct()
                .field("documentKey", docKeySchema).build();
        Struct docKey = new Struct(docKeySchema).put("_id", 42L);
        Struct key = new Struct(keySchema).put("documentKey", docKey);
        SourceRecord in = sourceRecord(keySchema, key);
        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            assertSame(in, smt.apply(in));
        }
    }

    @Test
    void applyTreatsRawStringKeyAsId() {
        SourceRecord in = sourceRecord(Schema.STRING_SCHEMA, M + "bucket/obj");
        try (TransformObjectKey<SourceRecord> smt = new TransformObjectKey<>()) {
            SourceRecord out = smt.apply(in);
            assertEquals("bucket/obj", out.key());
            assertEquals(Schema.STRING_SCHEMA, out.keySchema());
        }
    }

    /**
     * Builds a source record on topic "topic", partition 0, with the given key
     * and a fixed string value.
     */
    private static SourceRecord sourceRecord(Schema keySchema, Object key) {
        return new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(),
                "topic", 0,
                keySchema, key,
                Schema.STRING_SCHEMA, "value");
    }
}
Loading