Add SMT plugin for Kafka Connect S3 object key partitioning#2410
Closed
delthas wants to merge 1 commit into
Closed
Add SMT plugin for Kafka Connect S3 object key partitioning#2410delthas wants to merge 1 commit into
delthas wants to merge 1 commit into
Conversation
Contributor
Hello delthas,My role is to assist you with the merge of this Available options
Available commands
Status report is not available. |
Contributor
Request integration branchesWaiting for integration branch creation to be requested by the user. To request integration branches, please comment on this pull request with the following command: Alternatively, the |
cbf607d to
4b57f42
Compare
4b57f42 to
0b0120c
Compare
Introduces a Kafka Connect Single Message Transform (TransformObjectKey) that rewrites the oplog topic message key from the MongoDB change-stream's documentKey._id, stripping the Scality master/version encoding produced by arsenal. All events for the same logical S3 object then hash to the same Kafka partition, regardless of op type. This enables the Backbeat-side fix in BB-768, where update and delete events currently land on the wrong partition because fullDocument (today's key source) is null on those op types since BB-355 removed change.stream.full.document=updateLookup. Wiring: * New Maven module under solution/kafka-connect/smt/ with 16 unit tests covering strip logic and SMT shape handling. * solution/kafka-connect/Dockerfile gains an smt-build multi-stage that compiles the JAR with mvn and a final-stage COPY into /usr/local/share/kafka/plugins/. * kafka.version is threaded from solution/deps.yaml via kafka_build_vars.sh -> env.kafka_version -> build-arg KAFKA_VERSION -> mvn -Dkafka.version=, so the SMT compiles against the same Kafka release the kafka image ships. Issue: ZENKO-5274
0b0120c to
9f310a7
Compare
Contributor
Waiting for approvalThe following approvals are needed before I can proceed with the merge:
|
Contributor
Author
|
Replaced by scality/backbeat#2744 |
delthas
added a commit
to scality/backbeat
that referenced
this pull request
May 29, 2026
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
delthas
added a commit
to scality/backbeat
that referenced
this pull request
May 29, 2026
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
delthas
added a commit
to scality/backbeat
that referenced
this pull request
May 29, 2026
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
delthas
added a commit
to scality/backbeat
that referenced
this pull request
May 29, 2026
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
delthas
added a commit
to scality/backbeat
that referenced
this pull request
May 29, 2026
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
solution/kafka-connect/smt/providingcom.scality.kafka.connect.transforms.TransformObjectKey, a Kafka Connect Single Message Transform that rewrites the oplog topic message key from the MongoDB change-stream'sdocumentKey._id, stripping the Scality master/version encoding produced by arsenal.solution/kafka-connect/Dockerfilegains ansmt-buildmulti-stage that compiles the SMT JAR and copies it into/usr/local/share/kafka/plugins/next to the existing MongoDB source connector.solution/deps.yaml(kafka.tag: 2.13-3.1.2) viakafka_build_vars.sh→env.kafka_version→ build-argKAFKA_VERSION→mvn -Dkafka.version=, so the SMT compiles against the same Kafka release the kafka image ships.Context
Backbeat BB-768 fixes a long-standing partitioning bug on the oplog Kafka topic: today the message key is derived from
fullDocument.value.key, butfullDocumentisnullonupdateanddeleteevents (since BB-355 removedchange.stream.full.document=updateLookupto avoid per-update round-trips). The result is that updates/deletes for an S3 object collapse onto a single partition while inserts spread across the topic — breaking per-object ordering across op types.The Backbeat-side fix is to project
documentKey._id(always populated, regardless of op type) and let an SMT strip the master/version encoding so master + all versions of the same logical S3 object land on the same partition. That SMT must live in Connect's plugin path before the Backbeat PR can ship — which is what this PR delivers.Validated end-to-end on a local stack (mongo 6.0 replica set + kafka 7.1.7 + Connect with this image): inserts, updates, and deletes for 4 distinct S3 objects each routed to a single partition per object, matching the BB-768 acceptance criterion.
Issue: ZENKO-5274