From 86cd728b523467700efdc9dea02dd0b804989d93 Mon Sep 17 00:00:00 2001 From: Ehsan Pourtorab Date: Thu, 7 May 2026 19:13:14 +0000 Subject: [PATCH] fix: skip records with null primary keys Filter Airbyte records whose configured primary key paths are all null or missing before they are emitted. This lets VStream continue through DDL boundaries and advance to the next VGTID instead of surfacing invalid records to Airbyte. Add a fake VStream regression test for the DDL boundary case and helper coverage for primary-key path filtering. --- cmd/internal/planetscale_edge_database.go | 48 +++- .../planetscale_edge_database_test.go | 263 ++++++++++++++++++ 2 files changed, 307 insertions(+), 4 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 1336bc9..2fa6deb 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -438,13 +438,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * for _, rwp := range rows { qr := sqltypes.Proto3ToResult(rwp.Result) for _, row := range qr.Rows { - resultCount += 1 sqlResult := &sqltypes.Result{ Fields: fields, } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount) + resultCount += p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, s.PrimaryKeys, &ps, tc.Position, resultCount+1) } } } @@ -546,16 +545,21 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps func (p PlanetScaleEdgeDatabase) printQueryResult( qr *sqltypes.Result, tableNamespace, tableName string, + primaryKeys [][]string, ps *PlanetScaleSource, position string, resultCounter int, -) { +) int { data := QueryResultToRecords(qr, ps) + recordsQueued := 0 for _, record := range data { if record == nil { continue } + if recordHasOnlyNullOrMissingPrimaryKeys(record, primaryKeys) { + continue + } if ps.IncludeMetadata { // Ensure there's a _metadata field (map[string]interface{}) @@ -569,12 +573,48 @@ func (p PlanetScaleEdgeDatabase) printQueryResult( // Attach the extraction timestamp inside _metadata metadata["extracted_at"] = time.Now().UnixNano() // Attach a per sync sequence number inside _metadata - metadata["sequence_number"] = resultCounter + metadata["sequence_number"] = resultCounter + recordsQueued record["_planetscale_metadata"] = metadata } p.Logger.Record(tableNamespace, tableName, record) + recordsQueued += 1 + } + + return recordsQueued +} + +func recordHasOnlyNullOrMissingPrimaryKeys(record map[string]interface{}, primaryKeys [][]string) bool { + hasConfiguredPrimaryKey := false + for _, primaryKey := range primaryKeys { + if len(primaryKey) == 0 { + continue + } + hasConfiguredPrimaryKey = true + value, ok := recordValueForPath(record, primaryKey) + if ok && value != nil { + if sqlValue, ok := value.(sqltypes.Value); !ok || !sqlValue.IsNull() { + return false + } + } + } + + return hasConfiguredPrimaryKey +} + +func recordValueForPath(record map[string]interface{}, path []string) (interface{}, bool) { + var value interface{} = record + for _, component := range path { + nestedRecord, ok := value.(map[string]interface{}) + if !ok { + return nil, false + } + value, ok = nestedRecord[component] + if !ok { + return nil, false + } } + return value, true } func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string, lastKnownPk *query.QueryResult) *vtgate.VStreamRequest { diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 7d6eedd..0d5eb89 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -1672,6 +1672,269 @@ func TestRead_IncrementalSync_CanStopIfNoRows(t *testing.T) { assert.Equal(t, 0, len(tal.records["connect-test.products"])) } +func TestRead_IncrementalSync_SkipsAllNullPrimaryKeyRowAfterDDLBoundary(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "receipts" + startVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-2" + stopVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-3" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + expectedCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: stopVGtid, + Keyspace: keyspace, + } + + getCurrentVGtidClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: []*query.Field{ + { + Name: "id", + Type: query.Type_INT64, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 20, + Charset: 63, + ColumnType: "bigint", + }, + { + Name: "status", + Type: query.Type_VARCHAR, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 255, + Charset: 255, + ColumnType: "varchar(255)", + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_DDL, + Keyspace: keyspace, + Shard: shard, + Statement: "ALTER TABLE receipts ADD COLUMN settlement_currency varchar(3)", + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{-1, 4}, + Values: []byte("paid"), + }, + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType) + if in.Vgtid.ShardGtids[0].Gtid == "current" { + return getCurrentVGtidClient, nil + } + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + } + cs := ConfiguredStream{ + SyncMode: SYNC_MODE_INCREMENTAL, + Stream: Stream{ + Name: table, + Namespace: keyspace, + PrimaryKeys: [][]string{{"id"}}, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + records := tal.records["connect-test.receipts"] + for _, record := range records { + id, ok := record["id"] + if !ok || id == nil { + assert.Failf(t, "Read emitted a record with a null or missing configured primary key", "record: %#v", record) + } + } + assert.Equal(t, 0, len(records), "DDL-boundary null-primary-key rows should be skipped") + + esc, err := TableCursorToSerializedCursor(expectedCursor) + assert.NoError(t, err) + assert.Equal(t, esc, sc) + assert.Equal(t, 2, vsc.vstreamFnInvokedCount) +} + +func TestRecordHasOnlyNullOrMissingPrimaryKeys(t *testing.T) { + tests := []struct { + name string + record map[string]interface{} + primaryKeys [][]string + want bool + }{ + { + name: "no configured primary keys keeps record", + record: map[string]interface{}{"id": nil}, + primaryKeys: nil, + want: false, + }, + { + name: "single primary key is nil", + record: map[string]interface{}{"id": nil}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + { + name: "single primary key is missing", + record: map[string]interface{}{"status": "paid"}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + { + name: "single primary key has value", + record: map[string]interface{}{"id": sqltypes.NewInt64(1)}, + primaryKeys: [][]string{{"id"}}, + want: false, + }, + { + name: "composite primary key has one value", + record: map[string]interface{}{"id": nil, "tenant_id": sqltypes.NewInt64(7)}, + primaryKeys: [][]string{{"id"}, {"tenant_id"}}, + want: false, + }, + { + name: "nested primary key has value", + record: map[string]interface{}{ + "metadata": map[string]interface{}{ + "id": "receipt_1", + }, + }, + primaryKeys: [][]string{{"metadata", "id"}}, + want: false, + }, + { + name: "sqltypes null is treated as nil", + record: map[string]interface{}{"id": sqltypes.NULL}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, recordHasOnlyNullOrMissingPrimaryKeys(tt.record, tt.primaryKeys)) + }) + } +} + func getTestMysqlAccess() *mysqlAccessMock { tma := mysqlAccessMock{ PingContextFn: func(ctx context.Context, source PlanetScaleSource) error {