oplogPopulator: route oplog key through TransformObjectKey SMT when available#2741
oplogPopulator: route oplog key through TransformObjectKey SMT when available#2741delthas wants to merge 1 commit into
Conversation
Hello delthas,My role is to assist you with the merge of this Available options
Available commands
Status report is not available. |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files
... and 7 files with indirect coverage changes
@@ Coverage Diff @@
## development/9.5 #2741 +/- ##
===================================================
- Coverage 74.73% 74.66% -0.07%
===================================================
Files 199 199
Lines 13650 13659 +9
===================================================
- Hits 10201 10199 -2
- Misses 3439 3450 +11
Partials 10 10
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
650dd4e to
396d23b
Compare
396d23b to
55af2e9
Compare
Request integration branchesWaiting for integration branch creation to be requested by the user. To request integration branches, please comment on this pull request with the following command: Alternatively, the |
55af2e9 to
3f61602
Compare
francoisferrand
left a comment
There was a problem hiding this comment.
- Do we need to make this dynamic, or should we just let ZKOP enable the plugin depending on backbeat/zenko version (could be a "feature flag" in zenkoversion cr, so it can be enabled when Zenko has both the the plugin, the supporting backbeat version, and we decide it is time) ?
- How critical/dangerous is this whole fix (SMT...) ? Should it land in a patch release, or should we be more cautious?
3f61602 to
aa29999
Compare
…a config flag Adds an oplogPopulator 'transformObjectKey' config flag (default false). When enabled, oplog source connectors are configured with the com.scality.kafka.connect.transforms.TransformObjectKey SMT (shipped by Zenko via ZENKO-5274), keying messages by the raw S3 object key derived from documentKey._id with the arsenal master/version encoding stripped. Master and all versions of the same S3 object then hash to the same partition, regardless of op type — fixing BB-768, where update/delete events collapse onto one partition because fullDocument (today's key source) is null on those op types. The flag is owned by the operator (ZKOP / zenkoversion CR) and flipped on once the Kafka Connect image ships the TransformObjectKey plugin and the supporting Backbeat version is deployed — rather than Backbeat probing the Connect plugin path at runtime. This keeps Backbeat free of the auto-detection / runtime-flip machinery and gives a controlled rollout. * extensions/oplogPopulator/constants: defaultConnectorConfig (legacy key schema) + smtKeyConfig (documentKey._id projection + transforms + key.converter), applied on top when the flag is set. * ConnectorsManager: _getDefaultConnectorConfiguration picks the SMT key config from the flag; _processOldConnectors scrubs SMT-only keys from a stale oldConfig when the flag is off (so a connector that previously ran with the SMT doesn't keep referencing a missing class). * OplogPopulatorConfigValidator: validate the new boolean flag. Issue: BB-768
aa29999 to
5bbc166
Compare
Waiting for approvalThe following approvals are needed before I can proceed with the merge:
|
|
|
Pivoted per your first point: dropped the runtime Per your second point: retargeted to Follow-up: the zenkoversion CR → Backbeat config is tracked in ZKOP-553. |
| // SMT so the oplog message key is the raw S3 object key. Enabled via | ||
| // the oplogPopulator 'transformObjectKey' config flag, set by the | ||
| // operator once Kafka Connect ships the TransformObjectKey plugin. | ||
| this._transformObjectKey = params.transformObjectKey || false; |
There was a problem hiding this comment.
defaulting is not needed here, already handling by joi ?
| 'value.converter': 'org.apache.kafka.connect.storage.StringConverter', | ||
| // Kafka message key config (legacy). | ||
| // The key schema projects {ns.coll, fullDocument.value.key}. fullDocument | ||
| // is null on update/delete events (BB-355 removed updateLookup), so for |
There was a problem hiding this comment.
c.f. https://scality.atlassian.net/browse/BB-768?focusedCommentId=477005 :
- we always pass the full document (AFAIK) on
update, so the key should be available (but maybe not in "fullDocument") - the
deleteevent is not used AFAIK (we generate an update with the previous document right before it), and may be dropped instead
→ is there a simpler (as in "less maitenance") fix?
|
Replaced by #2744 |
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
Summary
oplogPopulator.transformObjectKeyconfig flag (defaultfalse). When enabled, oplog source connectors are configured with thecom.scality.kafka.connect.transforms.TransformObjectKeySMT (shipped by Zenko via ZENKO-5274), keying messages by the raw S3 object key.extensions/oplogPopulator/constants.js:defaultConnectorConfig(legacy{ns, fullDocument.value.key}key schema) +smtKeyConfig(projectsdocumentKey._id, adds the SMT +key.converter=StringConverter), applied on top when the flag is set.ConnectorsManager._getDefaultConnectorConfigurationpicks the SMT key config from the flag._processOldConnectorsscrubs SMT-only keys from a stale connector config when the flag is off, so a connector that previously ran with the SMT doesn't keep referencing a class that isn't there.OplogPopulatorConfigValidator: validates the new boolean flag.Context
BB-768: the oplog Kafka topic keys messages from
fullDocument.value.key, which isnullonupdate/deleteevents (BB-355 removedchange.stream.full.document=updateLookup). Those op types collapse ontohash({ns, null})while inserts spread across partitions, breaking per-object ordering across op types.The fix lives in the Zenko-side SMT (ZENKO-5274): it derives the key from
documentKey._id(always populated) and strips the arsenal master/version encoding, so master and all versions of an S3 object hash to the same partition. This PR is the Backbeat side that configures the connector to use it.Enablement model
Rather than Backbeat probing the Kafka Connect plugin path at runtime, the SMT is gated by an explicit config flag owned by the operator (ZKOP / zenkoversion CR). The flag is flipped on once the Connect image ships the
TransformObjectKeyplugin and the supporting Backbeat version is deployed. This keeps Backbeat free of auto-detection/runtime-flip machinery and gives a controlled rollout. A config change goes through the existing reconciliation: the connector is updated in place viaPUT /connectors/{name}/config(no recreate, no resume-token loss — the change touches only the key schema + transforms, not the pipeline match stage).Follow-up
transformObjectKeyinto the generated Backbeat config (tracked separately).Issue: BB-768