diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 1336bc9..2fa6deb 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -438,13 +438,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * for _, rwp := range rows { qr := sqltypes.Proto3ToResult(rwp.Result) for _, row := range qr.Rows { - resultCount += 1 sqlResult := &sqltypes.Result{ Fields: fields, } sqlResult.Rows = append(sqlResult.Rows, row) // Results queued to Airbyte here, and flushed at the end of sync() - p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount) + resultCount += p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, s.PrimaryKeys, &ps, tc.Position, resultCount+1) } } } @@ -546,16 +545,21 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps func (p PlanetScaleEdgeDatabase) printQueryResult( qr *sqltypes.Result, tableNamespace, tableName string, + primaryKeys [][]string, ps *PlanetScaleSource, position string, resultCounter int, -) { +) int { data := QueryResultToRecords(qr, ps) + recordsQueued := 0 for _, record := range data { if record == nil { continue } + if recordHasOnlyNullOrMissingPrimaryKeys(record, primaryKeys) { + continue + } if ps.IncludeMetadata { // Ensure there's a _metadata field (map[string]interface{}) @@ -569,12 +573,48 @@ func (p PlanetScaleEdgeDatabase) printQueryResult( // Attach the extraction timestamp inside _metadata metadata["extracted_at"] = time.Now().UnixNano() // Attach a per sync sequence number inside _metadata - metadata["sequence_number"] = resultCounter + metadata["sequence_number"] = resultCounter + recordsQueued record["_planetscale_metadata"] = metadata } p.Logger.Record(tableNamespace, tableName, record) + recordsQueued += 1 + } + + return recordsQueued +} + +func recordHasOnlyNullOrMissingPrimaryKeys(record map[string]interface{}, primaryKeys [][]string) bool { + hasConfiguredPrimaryKey := false + for _, primaryKey := range primaryKeys { + if len(primaryKey) == 0 { + continue + } + hasConfiguredPrimaryKey = true + value, ok := recordValueForPath(record, primaryKey) + if ok && value != nil { + if sqlValue, ok := value.(sqltypes.Value); !ok || !sqlValue.IsNull() { + return false + } + } + } + + return hasConfiguredPrimaryKey +} + +func recordValueForPath(record map[string]interface{}, path []string) (interface{}, bool) { + var value interface{} = record + for _, component := range path { + nestedRecord, ok := value.(map[string]interface{}) + if !ok { + return nil, false + } + value, ok = nestedRecord[component] + if !ok { + return nil, false + } } + return value, true } func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string, lastKnownPk *query.QueryResult) *vtgate.VStreamRequest { diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 7d6eedd..0d5eb89 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -1672,6 +1672,269 @@ func TestRead_IncrementalSync_CanStopIfNoRows(t *testing.T) { assert.Equal(t, 0, len(tal.records["connect-test.products"])) } +func TestRead_IncrementalSync_SkipsAllNullPrimaryKeyRowAfterDDLBoundary(t *testing.T) { + tma := getTestMysqlAccess() + tal := testAirbyteLogger{} + ped := PlanetScaleEdgeDatabase{ + Logger: &tal, + Mysql: tma, + } + + keyspace := "connect-test" + shard := "-" + table := "receipts" + startVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-2" + stopVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-3" + + startCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: startVGtid, + Keyspace: keyspace, + } + expectedCursor := &psdbconnect.TableCursor{ + Shard: shard, + Position: stopVGtid, + Keyspace: keyspace, + } + + getCurrentVGtidClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vstreamSyncClient := &vtgateVStreamClientMock{ + vstreamResponses: []*vstreamResponse{ + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: startVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_FIELD, + FieldEvent: &binlogdata.FieldEvent{ + TableName: table, + Fields: []*query.Field{ + { + Name: "id", + Type: query.Type_INT64, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 20, + Charset: 63, + ColumnType: "bigint", + }, + { + Name: "status", + Type: query.Type_VARCHAR, + Table: table, + OrgTable: table, + Database: keyspace, + ColumnLength: 255, + Charset: 255, + ColumnType: "varchar(255)", + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_DDL, + Keyspace: keyspace, + Shard: shard, + Statement: "ALTER TABLE receipts ADD COLUMN settlement_currency varchar(3)", + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_ROW, + RowEvent: &binlogdata.RowEvent{ + TableName: table, + Keyspace: keyspace, + Shard: shard, + RowChanges: []*binlogdata.RowChange{ + { + After: &query.Row{ + Lengths: []int64{-1, 4}, + Values: []byte("paid"), + }, + }, + }, + }, + }, + }, + }, + }, + { + response: &vtgate.VStreamResponse{ + Events: []*binlogdata.VEvent{ + { + Type: binlogdata.VEventType_VGTID, + Vgtid: &binlogdata.VGtid{ + ShardGtids: []*binlogdata.ShardGtid{ + { + Shard: shard, + Gtid: stopVGtid, + Keyspace: keyspace, + }, + }, + }, + }, + }, + }, + }, + }, + } + + vsc := vstreamClientMock{ + vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) { + assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType) + if in.Vgtid.ShardGtids[0].Gtid == "current" { + return getCurrentVGtidClient, nil + } + return vstreamSyncClient, nil + }, + } + + ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) { + return &vsc, nil + } + + ps := PlanetScaleSource{ + Database: "connect-test", + } + cs := ConfiguredStream{ + SyncMode: SYNC_MODE_INCREMENTAL, + Stream: Stream{ + Name: table, + Namespace: keyspace, + PrimaryKeys: [][]string{{"id"}}, + }, + } + + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor) + assert.NoError(t, err) + assert.NotNil(t, sc) + records := tal.records["connect-test.receipts"] + for _, record := range records { + id, ok := record["id"] + if !ok || id == nil { + assert.Failf(t, "Read emitted a record with a null or missing configured primary key", "record: %#v", record) + } + } + assert.Equal(t, 0, len(records), "DDL-boundary null-primary-key rows should be skipped") + + esc, err := TableCursorToSerializedCursor(expectedCursor) + assert.NoError(t, err) + assert.Equal(t, esc, sc) + assert.Equal(t, 2, vsc.vstreamFnInvokedCount) +} + +func TestRecordHasOnlyNullOrMissingPrimaryKeys(t *testing.T) { + tests := []struct { + name string + record map[string]interface{} + primaryKeys [][]string + want bool + }{ + { + name: "no configured primary keys keeps record", + record: map[string]interface{}{"id": nil}, + primaryKeys: nil, + want: false, + }, + { + name: "single primary key is nil", + record: map[string]interface{}{"id": nil}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + { + name: "single primary key is missing", + record: map[string]interface{}{"status": "paid"}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + { + name: "single primary key has value", + record: map[string]interface{}{"id": sqltypes.NewInt64(1)}, + primaryKeys: [][]string{{"id"}}, + want: false, + }, + { + name: "composite primary key has one value", + record: map[string]interface{}{"id": nil, "tenant_id": sqltypes.NewInt64(7)}, + primaryKeys: [][]string{{"id"}, {"tenant_id"}}, + want: false, + }, + { + name: "nested primary key has value", + record: map[string]interface{}{ + "metadata": map[string]interface{}{ + "id": "receipt_1", + }, + }, + primaryKeys: [][]string{{"metadata", "id"}}, + want: false, + }, + { + name: "sqltypes null is treated as nil", + record: map[string]interface{}{"id": sqltypes.NULL}, + primaryKeys: [][]string{{"id"}}, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, recordHasOnlyNullOrMissingPrimaryKeys(tt.record, tt.primaryKeys)) + }) + } +} + func getTestMysqlAccess() *mysqlAccessMock { tma := mysqlAccessMock{ PingContextFn: func(ctx context.Context, source PlanetScaleSource) error {