oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744
oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744delthas wants to merge 1 commit into
Conversation
Hello delthas,My role is to assist you with the merge of this Available options
Available commands
Status report is not available. |
Waiting for approvalThe following approvals are needed before I can proceed with the merge:
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files
... and 10 files with indirect coverage changes
@@ Coverage Diff @@
## development/9.5 #2744 +/- ##
===================================================
- Coverage 74.73% 74.50% -0.23%
===================================================
Files 199 199
Lines 13650 13650
===================================================
- Hits 10201 10170 -31
- Misses 3439 3470 +31
Partials 10 10
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
1193adb to
20249e5
Compare
4edd41b to
e24ee0a
Compare
e24ee0a to
37fb5a1
Compare
francoisferrand
left a comment
There was a problem hiding this comment.
will the recently updated logic correctly update the pipeline in all cases?
or do we have some situations were we may still stay with an old pipeline?
…line The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: append a $addFields stage to the MongoDB connector's change-stream pipeline that synthesises a top-level s3Key field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key is replaced with a single-field Avro record {s3Key: [string, null]} so the Kafka message key is just that field. All events for the same logical S3 object — insert, master/version updates, replication-status updates, delete-marker updates — yield the same s3Key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + pipeline only, not the pipeline match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): #2741 (SMT-track Backbeat PR) scality/Zenko#2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768
37fb5a1 to
e0ce22c
Compare
Yes, on the next Backbeat restart, no existing connector stays on the old pipeline:
The one path that doesn't flip in-process: an already-running Backbeat with old code in memory — the in-process Also addressed in
|
|
LGTM. Clean, well-scoped change. The $addFields pipeline stage correctly synthesises the key via $ifNull coalescing, the Avro key schema simplification is consistent with the new pipeline field, and the test coverage (unit + functional against real MongoDB) is thorough — covering insert, update, both-present, and neither-present paths. One minor nit: the PR description refers to the Avro field as Review by Claude Code |
Summary
extensions/oplogPopulator/pipeline/PipelineFactory.js: insert a$addFieldsstage into the connector's change-stream pipeline (right after$match, before the conditional location-strip$set) that synthesises a top-levelkeyfield via$ifNullcoalescing$fullDocument.value.keyand$updateDescription.updatedFields.value.key.extensions/oplogPopulator/constants.js: replace the brokenfullDocumentnested record inoutput.schema.keywith a top-levelkey: [string, null]field. The existingns: {coll}projection is preserved, so the Kafka message key remains{ns: {coll: <bucket>}, key: <object-key>}— bucket-level isolation is unchanged.connectorConfigfixture used byConnectorsManagertests.Pure Backbeat change. No SMT, no Zenko image change, no operator flag. Always on.
Context
BB-768: the current key schema projects
fullDocument.value.key, which isnullonupdateevents (since BB-355 removedchange.stream.full.document=updateLookup). Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition — both an ordering problem (insert → update on different partitions for the same object) and a throughput problem (a hot bucket's update traffic can't scale with partition count, blocking BB-756).After ticket discussion (Jira comment 477122) we picked the pipeline-
$addFieldsroute over a Kafka Connect SMT: it's a single-repo fix, and the measured server-side cost is ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s — not a throughput concern.For the same logical S3 object — insert (key in
fullDocument.value.key), master PUT update (key inupdatedFields.value.key), replication-status update, delete-marker update — all yield the samekeyvalue → same partition. Master and version documents both store the samevalue.key, so master/version events also collapse to the same partition without prefix-stripping.Coupling caveat
The
$ifNullvariant relies on metadata always writing the wholevaluesubdocument (full$set). Confirmed today against arsenal — there's no partial dotted-$setpath for object MD. A future partial update ($set: {"value.x": …}) would not populateupdatedFields.value.keyand the resulting event would mis-partition. Accepted risk per the ticket discussion.Migration
The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/configreconciliation (no recreate, no resume-token loss — the change touches the key schema + the added$addFieldsstage only, not the$matchstage). Downstream oplog consumers don't read the Kafka message key, so the new key shape is consumer-transparent. See this comment for the full migration trace.Issue: BB-768