Skip to content

Add SMT plugin for Kafka Connect S3 object key partitioning#2410

Closed
delthas wants to merge 1 commit into
development/2.15from
improvement/ZENKO-5274/smt-plugin-kafka-connect
Closed

Add SMT plugin for Kafka Connect S3 object key partitioning#2410
delthas wants to merge 1 commit into
development/2.15from
improvement/ZENKO-5274/smt-plugin-kafka-connect

Conversation

@delthas
Copy link
Copy Markdown
Contributor

@delthas delthas commented May 11, 2026

Summary

  • New Maven module under solution/kafka-connect/smt/ providing com.scality.kafka.connect.transforms.TransformObjectKey, a Kafka Connect Single Message Transform that rewrites the oplog topic message key from the MongoDB change-stream's documentKey._id, stripping the Scality master/version encoding produced by arsenal.
  • solution/kafka-connect/Dockerfile gains an smt-build multi-stage that compiles the SMT JAR and copies it into /usr/local/share/kafka/plugins/ next to the existing MongoDB source connector.
  • The Kafka version targeted by the SMT is threaded from solution/deps.yaml (kafka.tag: 2.13-3.1.2) via kafka_build_vars.shenv.kafka_version → build-arg KAFKA_VERSIONmvn -Dkafka.version=, so the SMT compiles against the same Kafka release the kafka image ships.

Context

Backbeat BB-768 fixes a long-standing partitioning bug on the oplog Kafka topic: today the message key is derived from fullDocument.value.key, but fullDocument is null on update and delete events (since BB-355 removed change.stream.full.document=updateLookup to avoid per-update round-trips). The result is that updates/deletes for an S3 object collapse onto a single partition while inserts spread across the topic — breaking per-object ordering across op types.

The Backbeat-side fix is to project documentKey._id (always populated, regardless of op type) and let an SMT strip the master/version encoding so master + all versions of the same logical S3 object land on the same partition. That SMT must live in Connect's plugin path before the Backbeat PR can ship — which is what this PR delivers.

Validated end-to-end on a local stack (mongo 6.0 replica set + kafka 7.1.7 + Connect with this image): inserts, updates, and deletes for 4 distinct S3 objects each routed to a single partition per object, matching the BB-768 acceptance criterion.

Issue: ZENKO-5274

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 11, 2026

Hello delthas,

My role is to assist you with the merge of this
pull request. Please type @bert-e help to get information
on this process, or consult the user documentation.

Available options
name description privileged authored
/after_pull_request Wait for the given pull request id to be merged before continuing with the current one.
/bypass_author_approval Bypass the pull request author's approval
/bypass_build_status Bypass the build and test status
/bypass_commit_size Bypass the check on the size of the changeset TBA
/bypass_incompatible_branch Bypass the check on the source branch prefix
/bypass_jira_check Bypass the Jira issue check
/bypass_peer_approval Bypass the pull request peers' approval
/bypass_leader_approval Bypass the pull request leaders' approval
/approve Instruct Bert-E that the author has approved the pull request. ✍️
/create_pull_requests Allow the creation of integration pull requests.
/create_integration_branches Allow the creation of integration branches.
/no_octopus Prevent Wall-E from doing any octopus merge and use multiple consecutive merge instead
/unanimity Change review acceptance criteria from one reviewer at least to all reviewers
/wait Instruct Bert-E not to run until further notice.
Available commands
name description privileged
/help Print Bert-E's manual in the pull request.
/status Print Bert-E's current status in the pull request TBA
/clear Remove all comments from Bert-E from the history TBA
/retry Re-start a fresh build TBA
/build Re-start a fresh build TBA
/force_reset Delete integration branches & pull requests, and restart merge process from the beginning.
/reset Try to remove integration branches unless there are commits on them which do not appear on the source branch.

Status report is not available.

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 11, 2026

Request integration branches

Waiting for integration branch creation to be requested by the user.

To request integration branches, please comment on this pull request with the following command:

/create_integration_branches

Alternatively, the /approve and /create_pull_requests commands will automatically
create the integration branches.

@delthas delthas force-pushed the improvement/ZENKO-5274/smt-plugin-kafka-connect branch from cbf607d to 4b57f42 Compare May 11, 2026 14:12
Comment thread solution/kafka-connect/Dockerfile Outdated
@delthas delthas force-pushed the improvement/ZENKO-5274/smt-plugin-kafka-connect branch from 4b57f42 to 0b0120c Compare May 11, 2026 14:16
@delthas delthas marked this pull request as draft May 11, 2026 15:02
Introduces a Kafka Connect Single Message Transform
(TransformObjectKey) that rewrites the oplog topic message key from the
MongoDB change-stream's documentKey._id, stripping the Scality
master/version encoding produced by arsenal. All events for the same
logical S3 object then hash to the same Kafka partition, regardless of
op type.

This enables the Backbeat-side fix in BB-768, where update and delete
events currently land on the wrong partition because fullDocument
(today's key source) is null on those op types since BB-355 removed
change.stream.full.document=updateLookup.

Wiring:

* New Maven module under solution/kafka-connect/smt/ with 16 unit
  tests covering strip logic and SMT shape handling.
* solution/kafka-connect/Dockerfile gains an smt-build multi-stage
  that compiles the JAR with mvn and a final-stage COPY into
  /usr/local/share/kafka/plugins/.
* kafka.version is threaded from solution/deps.yaml via
  kafka_build_vars.sh -> env.kafka_version -> build-arg KAFKA_VERSION
  -> mvn -Dkafka.version=, so the SMT compiles against the same Kafka
  release the kafka image ships.

Issue: ZENKO-5274
@delthas delthas force-pushed the improvement/ZENKO-5274/smt-plugin-kafka-connect branch from 0b0120c to 9f310a7 Compare May 18, 2026 07:51
@delthas delthas changed the base branch from development/2.14 to development/2.15 May 18, 2026 07:51
@scality scality deleted a comment from bert-e May 18, 2026
@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 18, 2026

Waiting for approval

The following approvals are needed before I can proceed with the merge:

  • the author

  • 2 peers

@delthas delthas marked this pull request as ready for review May 21, 2026 14:02
@delthas delthas requested a review from francoisferrand May 22, 2026 15:40
@delthas
Copy link
Copy Markdown
Contributor Author

delthas commented May 29, 2026

Replaced by scality/backbeat#2744

@delthas delthas closed this May 29, 2026
delthas added a commit to scality/backbeat that referenced this pull request May 29, 2026
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
delthas added a commit to scality/backbeat that referenced this pull request May 29, 2026
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
delthas added a commit to scality/backbeat that referenced this pull request May 29, 2026
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
delthas added a commit to scality/backbeat that referenced this pull request May 29, 2026
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
delthas added a commit to scality/backbeat that referenced this pull request May 29, 2026
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants