Fix/handle null partition for pg/mysql minmax and pg NTILE #3983
dtunikov wants to merge 12 commits into PeerDB-io:main from
| name string | ||
| config *protos.QRepConfig | ||
| last *protos.QRepPartition | ||
| want []*protos.QRepPartition |
This wasn't used by any of the test cases, so I removed it.
| case *protos.PartitionRange_ObjectIdRange: | ||
| rangeStart = x.ObjectIdRange.Start | ||
| rangeEnd = x.ObjectIdRange.End | ||
| case *protos.PartitionRange_NullRange: |
Otherwise this monitoring insertion would fail: rangeStart and rangeEnd are non-nullable fields.
The other option would be to make them nullable with a migration, but I thought that would be too much for this issue.
DB migration is preferable here, they're reliable and easy to do. Want to do it?
| } | ||
|
|
||
| var minmaxQuery string | ||
| minmaxQuery := fmt.Sprintf("SELECT MIN(`%[2]s`),MAX(`%[2]s`) FROM %[1]s", |
Simplified the if/else statements here a bit, since the "WHERE %[2]s > " condition has been moved below.
❌ 2 Tests Failed
Pull request overview
This PR fixes a critical data loss issue in QRep (Query Replication) partitioning where rows with NULL values in the watermark column were silently skipped during initial snapshots. The fix adds a dedicated "null partition" to capture these rows for both MySQL (MinMax) and Postgres (NTILE + MinMax) partitioning strategies.
Changes:
- Added `NullPartitionRange` proto message and `base_query` field to `QRepConfig` for clean null partition query construction
- Implemented null partition support in MySQL and Postgres connectors, only enabled for `InitialCopyOnly` mode to avoid duplicating rows in ongoing replication
- Updated tests to include NULL watermark values and verify partition counts
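The data loss described in the overview can be reproduced in a small simulation. This is a minimal sketch, not the PeerDB code: `row`, `partition`, `minMaxPartitions`, and `countCovered` are hypothetical names, and a nil pointer stands in for a SQL NULL watermark.

```go
package main

import "fmt"

// row models a source row; a nil watermark stands in for SQL NULL.
type row struct {
	id        int
	watermark *int
}

// partition is a hypothetical stand-in for a QRep partition: either an
// inclusive watermark range or the dedicated null partition.
type partition struct {
	start, end int
	isNull     bool
}

// matches mirrors SQL semantics: range predicates never match NULL,
// and IS NULL matches nothing else.
func (p partition) matches(r row) bool {
	if r.watermark == nil {
		return p.isNull
	}
	return !p.isNull && *r.watermark >= p.start && *r.watermark <= p.end
}

// minMaxPartitions splits [MIN, MAX] of the non-NULL watermarks into at most
// n inclusive ranges, optionally appending a null partition at the end.
func minMaxPartitions(rows []row, n int, addNull bool) []partition {
	var parts []partition
	found := false
	lo, hi := 0, 0
	for _, r := range rows {
		if r.watermark == nil {
			continue // MIN()/MAX() ignore NULLs
		}
		v := *r.watermark
		if !found || v < lo {
			lo = v
		}
		if !found || v > hi {
			hi = v
		}
		found = true
	}
	if found && n > 0 {
		step := (hi - lo + n) / n // ceil((hi-lo+1)/n)
		for start := lo; start <= hi; start += step {
			end := start + step - 1
			if end > hi {
				end = hi
			}
			parts = append(parts, partition{start: start, end: end})
		}
	}
	if addNull {
		parts = append(parts, partition{isNull: true})
	}
	return parts
}

// countCovered counts the rows matched by at least one partition.
func countCovered(rows []row, parts []partition) int {
	covered := 0
	for _, r := range rows {
		for _, p := range parts {
			if p.matches(r) {
				covered++
				break
			}
		}
	}
	return covered
}

// sampleRows returns six rows with watermarks 1..6 and two NULL rows.
func sampleRows() []row {
	rows := make([]row, 0, 8)
	for i := 1; i <= 6; i++ {
		v := i
		rows = append(rows, row{id: i, watermark: &v})
	}
	return append(rows, row{id: 7}, row{id: 8})
}

func main() {
	rows := sampleRows()
	fmt.Println("without null partition:", countCovered(rows, minMaxPartitions(rows, 3, false)))
	fmt.Println("with null partition:", countCovered(rows, minMaxPartitions(rows, 3, true)))
}
```

With the sample data, the range partitions alone cover 6 of the 8 rows; appending the null partition covers all 8.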
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| protos/flow.proto | Added NullPartitionRange message type and base_query field |
| flow/connectors/utils/partition.go | Added AddNullPartition() helper method |
| flow/connectors/utils/monitoring/monitoring.go | Added NullRange handling for monitoring |
| flow/connectors/mysql/qrep.go | Added null partition creation and NullRange query handling |
| flow/connectors/postgres/qrep_partition.go | Added null partition to NTILE and MinMax strategies |
| flow/connectors/postgres/qrep.go | Added NullRange handling in record pulling |
| flow/connectors/postgres/qrep_query_build_test.go | Updated BuildQuery test signature |
| flow/workflows/snapshot_flow.go | Extracted base_query and populated BaseQuery field |
| flow/connectors/postgres/qrep_partition_test.go | Added NULL value test data and updated partition count logic |
| docs/architecture.md | Added architecture documentation (unrelated to PR) |
| .gitignore | Added .claude/ directory (unrelated to PR) |
| case *protos.PartitionRange_NullRange: | ||
| query = fmt.Sprintf("%s WHERE `%s` IS NULL", config.BaseQuery, config.WatermarkColumn) | ||
| default: | ||
| return 0, 0, fmt.Errorf("unknown range type: %v", x) | ||
| } | ||
|
|
||
| // Build the query to pull records within the range from the source table | ||
| // Be sure to order the results by the watermark column to ensure consistency across pulls | ||
| query, err := BuildQuery(c.logger, config.Query, rangeStart, rangeEnd) | ||
| query, err := BuildQuery(c.logger, query, rangeStart, rangeEnd) | ||
| if err != nil { | ||
| return 0, 0, err | ||
| } |
When handling NullRange, the code rebuilds the query without template variables but still calls BuildQuery with uninitialized rangeStart and rangeEnd (empty strings). This is inconsistent and could lead to issues if the template execution encounters problems with missing variables.
The code should either:
- Skip calling BuildQuery for NullRange (since the query is already fully built), or
- Set rangeStart and rangeEnd to empty strings explicitly before calling BuildQuery to make the intent clear
For consistency with the Postgres implementation, consider skipping BuildQuery when the query has no template variables to expand.
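The first option (skipping the template step for the null range) could look roughly like the sketch below. It assumes the templated query uses `{{.start}}` and `{{.end}}` placeholders; `partitionRange`, `expandTemplate`, and `queryForPartition` are hypothetical names for illustration, not the connector's actual functions.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// partitionRange is a hypothetical, simplified stand-in for the proto range.
type partitionRange struct {
	isNull     bool
	start, end string
}

// expandTemplate fills the {{.start}} and {{.end}} placeholders of a query.
func expandTemplate(query, start, end string) (string, error) {
	tmpl, err := template.New("query").Parse(query)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, map[string]string{"start": start, "end": end}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// queryForPartition returns the final query for one partition. The null
// range is built from the base query and never goes through the template.
func queryForPartition(templated, baseQuery, watermarkColumn string, r partitionRange) (string, error) {
	if r.isNull {
		if baseQuery == "" {
			return "", fmt.Errorf("base query must be set for the null range on column %q", watermarkColumn)
		}
		return fmt.Sprintf("%s WHERE `%s` IS NULL", baseQuery, watermarkColumn), nil
	}
	return expandTemplate(templated, r.start, r.end)
}

func main() {
	templated := "SELECT * FROM t WHERE id BETWEEN {{.start}} AND {{.end}}"
	for _, r := range []partitionRange{{start: "1", end: "10"}, {isNull: true}} {
		q, err := queryForPartition(templated, "SELECT * FROM t", "id", r)
		fmt.Println(q, err)
	}
}
```

Returning early for the null range keeps the empty `start`/`end` values away from the template entirely, so there is nothing left to be inconsistent about.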
| // handling it just in case to keep the query correct, but this case should never happen because we only add null partition for InitialCopyOnly replication | ||
| minVal = "" |
When a NullRange is encountered in the last partition (which should never happen according to the comment), the code silently sets minVal to empty string and skips adding the WHERE clause. This is inconsistent with the Postgres implementation which returns an error for unknown range types.
For defensive programming and consistency, consider either:
- Adding an error return if NullRange is encountered (similar to Postgres): `return nil, fmt.Errorf("unexpected null range in last partition")`
- Adding a comment explaining why it's safe to continue without a WHERE clause in this impossible scenario
| // handling it just in case to keep the query correct, but this case should never happen because we only add null partition for InitialCopyOnly replication | |
| minVal = "" | |
| // this case should never happen because we only add null partition for InitialCopyOnly replication; | |
| // fail fast for defensive programming and consistency with the Postgres implementation | |
| return nil, fmt.Errorf("unexpected null range in last partition") |
| if pp.addNullPartition { | ||
| partitionHelper.AddNullPartition() | ||
| } |
When addNullPartition is true, the NTILE query (defined earlier at line 36-41) may include rows with NULL watermark values, resulting in buckets with NULL MIN/MAX ranges. These NULL-ranged partitions won't match any rows when queried (since WHERE col >= NULL AND col < NULL returns no rows). While the explicit null partition added here captures all NULLs, the NULL-ranged partitions from NTILE become no-ops, creating unnecessary overhead.
Consider filtering out NULLs from the NTILE query when addNullPartition is true to make partition boundaries cleaner and avoid creating empty partitions.
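One way to express that filter is a query builder that takes a flag. The sketch below is an assumption about the general shape of an NTILE bounds query, not the query in `qrep_partition.go`; `ntileBoundsQuery` and the `range_start`/`range_end` aliases are hypothetical, and identifier quoting is omitted for brevity.

```go
package main

import "fmt"

// ntileBoundsQuery builds a query returning MIN/MAX of the watermark column
// per NTILE bucket. When excludeNulls is true, NULL watermarks are filtered
// out before bucketing so no bucket ends up with NULL bounds.
func ntileBoundsQuery(table, column string, numPartitions int, excludeNulls bool) string {
	filter := ""
	if excludeNulls {
		filter = fmt.Sprintf(" WHERE %s IS NOT NULL", column)
	}
	return fmt.Sprintf(
		"SELECT bucket, MIN(%[2]s) AS range_start, MAX(%[2]s) AS range_end "+
			"FROM (SELECT NTILE(%[3]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s FROM %[1]s%[4]s) sub "+
			"GROUP BY bucket ORDER BY range_start",
		table, column, numPartitions, filter)
}

func main() {
	fmt.Println(ntileBoundsQuery("events", "updated_at", 4, false))
	fmt.Println(ntileBoundsQuery("events", "updated_at", 4, true))
}
```

Tying `excludeNulls` to `addNullPartition` would mean NULL rows are only ever claimed by the dedicated null partition.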
| // add null values partition to the end, if nulls aren't present it will be an empty partition that gets skipped during replication, but if nulls are present it ensures they get replicated | ||
| if pp.addNullPartition { | ||
| partitionHelper.AddNullPartition() | ||
| } |
When addNullPartition is true, the MIN/MAX query (defined earlier at line 87-94) may include rows with NULL watermark values. Since SQL MIN/MAX functions ignore NULLs, this works correctly for non-NULL values. However, to be more explicit and avoid potential confusion, consider filtering out NULLs from the MIN/MAX query when addNullPartition is true, since those rows will be handled by the dedicated null partition added here.
| OffsetNumber: uint16(x.TidRange.End.OffsetNumber), | ||
| Valid: true, | ||
| } | ||
| case *protos.PartitionRange_NullRange: |
When handling NullRange, the code uses config.BaseQuery without validating that it's non-empty. If BaseQuery is not set (which is possible since it's only set in snapshot_flow.go), this will generate an invalid query like " WHERE col IS NULL".
Consider adding validation to ensure BaseQuery is set when InitialCopyOnly is true, or add a runtime check here to return a more descriptive error if BaseQuery is empty when handling NullRange.
| case *protos.PartitionRange_NullRange: | |
| case *protos.PartitionRange_NullRange: | |
| if strings.TrimSpace(config.BaseQuery) == "" { | |
| return 0, 0, fmt.Errorf("BaseQuery must be set when handling NullRange for flow %s", config.FlowJobName) | |
| } |
| case *protos.PartitionRange_TimestampRange: | ||
| rangeStart = "'" + x.TimestampRange.Start.AsTime().Format("2006-01-02 15:04:05.999999") + "'" | ||
| rangeEnd = "'" + x.TimestampRange.End.AsTime().Format("2006-01-02 15:04:05.999999") + "'" | ||
| case *protos.PartitionRange_NullRange: |
When handling NullRange, the code uses config.BaseQuery without validating that it's non-empty. If BaseQuery is not set (which is possible since it's only set in snapshot_flow.go), this will generate an invalid query like " WHERE col IS NULL".
Consider adding validation to ensure BaseQuery is set when InitialCopyOnly is true, or add a runtime check here to return a more descriptive error if BaseQuery is empty when handling NullRange.
| case *protos.PartitionRange_NullRange: | |
| case *protos.PartitionRange_NullRange: | |
| if config.BaseQuery == "" { | |
| return 0, 0, fmt.Errorf("BaseQuery must be set when handling NullRange for watermark column %q", config.WatermarkColumn) | |
| } |
| 2. **Network Bandwidth**: Between source → worker → destination | ||
| 3. **Destination Write Speed**: Bulk insert performance varies by destination | ||
| 4. **Normalization Lag**: If destination is slow to normalize, sync blocks via backpressure | ||
| 5. **Memory**: Large batches can cause OOM if not streaming properly (channel-based streaming prevents this) |
This file addition appears unrelated to the PR's stated purpose of fixing null partition handling. The architecture documentation, while potentially useful, should be added in a separate PR focused on documentation improvements. Including unrelated changes makes the PR harder to review and understand.
| 5. **Memory**: Large batches can cause OOM if not streaming properly (channel-based streaming prevents this) | |
| 5. **Memory**: Large batches and skewed/null partitions can cause OOM if not streaming properly (the null partition handling fix ensures channel-based streaming prevents this) |
ilidemi left a comment
Good start! I'll need another pass across QRep to see if anything could be simplified, here are some comments in the meantime
| minVal = "'" + time.Format("2006-01-02 15:04:05.999999") + "'" | ||
| case *protos.PartitionRange_NullRange: | ||
| // handling it just in case to keep the query correct, but this case should never happen because we only add null partition for InitialCopyOnly replication | ||
| minVal = "" |
It's better to error out here to be explicit that this situation is not supposed to happen. As-is this branch is a no-op, as minVal is already an empty string and the switch is non-exhaustive.
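A simplified illustration of the difference: in a type switch, a branch that only reassigns the zero value changes nothing, while an explicit error makes the invariant visible. The types below (`partitionRange`, `intRange`, `nullRange`) and `lastRangeEnd` are hypothetical stand-ins, not the proto-generated ones.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// partitionRange is a hypothetical sealed interface standing in for the
// proto oneof.
type partitionRange interface{ isPartitionRange() }

type intRange struct{ start, end int64 }
type nullRange struct{}

func (intRange) isPartitionRange()  {}
func (nullRange) isPartitionRange() {}

// lastRangeEnd returns the upper bound of the last partition as a SQL
// literal. A null range is rejected explicitly instead of silently
// yielding an empty bound.
func lastRangeEnd(r partitionRange) (string, error) {
	switch x := r.(type) {
	case intRange:
		return strconv.FormatInt(x.end, 10), nil
	case nullRange:
		return "", errors.New("unexpected null range in last partition")
	default:
		return "", fmt.Errorf("unknown range type: %T", x)
	}
}

func main() {
	fmt.Println(lastRangeEnd(intRange{start: 1, end: 42}))
	fmt.Println(lastRangeEnd(nullRange{}))
}
```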
| DestinationName: s.config.DestinationName, | ||
| Query: query, | ||
| BaseQuery: baseQuery, | ||
| WatermarkColumn: mapping.PartitionKey, |
We have the information on whether the watermark column is nullable here. Let's pass it too and be precise
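As a rough sketch of that suggestion, the null partition could be gated on both the replication mode and the column's nullability. `columnInfo`, `partitionParams`, and `newPartitionParams` are hypothetical names; how nullability is actually carried through the config is left open here.

```go
package main

import "fmt"

// columnInfo is a hypothetical carrier for schema details known at
// snapshot time.
type columnInfo struct {
	name     string
	nullable bool
}

// partitionParams is a simplified stand-in for the partitioning parameters.
type partitionParams struct {
	watermarkColumn  string
	addNullPartition bool
}

// newPartitionParams requests a null partition only for initial-copy-only
// replication and only when the watermark column can actually hold NULLs.
func newPartitionParams(col columnInfo, initialCopyOnly bool) partitionParams {
	return partitionParams{
		watermarkColumn:  col.name,
		addNullPartition: initialCopyOnly && col.nullable,
	}
}

func main() {
	fmt.Println(newPartitionParams(columnInfo{name: "updated_at", nullable: true}, true))
	fmt.Println(newPartitionParams(columnInfo{name: "id", nullable: false}, true))
}
```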
| OffsetNumber: uint16(r.TidRange.End.OffsetNumber), | ||
| } | ||
| case *protos.PartitionRange_NullRange: | ||
| // null partition is always last, no need to track prev start/end |
If this code is never called with null, can return an explicit error here
A welcome addition. Let's submit this as a separate PR.
Summary
The PR implements this issue - #3951
- Adds `NullPartitionRange` proto type and `base_query` field to `QRepConfig` for building the `WHERE col IS NULL` query cleanly
Context
When QRep partitioning uses MIN/MAX and range queries (`col BETWEEN start AND end`) to split tables into chunks, rows with NULL in the watermark column are silently lost:
- `MIN()`/`MAX()` ignore NULLs
- Range predicates (`col >= X AND col < Y`) exclude NULLs
This only matters when a custom partitioning key is set (not the default `ctid` on Postgres, which scans physical blocks and includes all rows).
Changes
- `NullPartitionRange` message, `null_range` in `PartitionRange` oneof, `base_query` in `QRepConfig`
- `AddNullPartition()` to shared `PartitionHelper`
- `GetQRepPartitions` when `InitialCopyOnly`, handle `NullRange` in `PullQRepRecords`, refactored MIN/MAX query building
- `addNullPartition` to `PartitionParams`, append null partition in both NTILE and MinMax strategies
- `addNullPartition` from `InitialCopyOnly`, handle `NullRange` in `PullQRepRecords`
- `baseQuery` and set `BaseQuery` in QRepConfig
Design decisions
I decided that it would be easier to always add the null partition (regardless of column nullability); it will be a no-op later during ingestion anyway.
We could also add a check to the partition creation code and make it optional depending on the watermark column type.
- `InitialCopyOnly=true`: Continuous QRep would re-replicate NULL rows every iteration since there's no way to track progress on them
- `BaseQuery` proto field: Avoids fragile string manipulation on the query template to build the `WHERE col IS NULL` query
- `ObjectIdPartitionRange` and MongoDB uses `_id` (never null); adding null partitions generically would break other connectors