Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
for _, rwp := range rows {
qr := sqltypes.Proto3ToResult(rwp.Result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
Fields: fields,
}
sqlResult.Rows = append(sqlResult.Rows, row)
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps, tc.Position, resultCount)
resultCount += p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, s.PrimaryKeys, &ps, tc.Position, resultCount+1)
}
}
}
Expand Down Expand Up @@ -546,16 +545,21 @@ func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps
func (p PlanetScaleEdgeDatabase) printQueryResult(
qr *sqltypes.Result,
tableNamespace, tableName string,
primaryKeys [][]string,
ps *PlanetScaleSource,
position string,
resultCounter int,
) {
) int {
data := QueryResultToRecords(qr, ps)
recordsQueued := 0

for _, record := range data {
if record == nil {
continue
}
if recordHasOnlyNullOrMissingPrimaryKeys(record, primaryKeys) {
continue
}

if ps.IncludeMetadata {
// Ensure there's a _metadata field (map[string]interface{})
Expand All @@ -569,12 +573,48 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(
// Attach the extraction timestamp inside _metadata
metadata["extracted_at"] = time.Now().UnixNano()
// Attach a per sync sequence number inside _metadata
metadata["sequence_number"] = resultCounter
metadata["sequence_number"] = resultCounter + recordsQueued
record["_planetscale_metadata"] = metadata
}

p.Logger.Record(tableNamespace, tableName, record)
recordsQueued += 1
}

return recordsQueued
}

// recordHasOnlyNullOrMissingPrimaryKeys reports whether every configured
// primary key path in primaryKeys resolves to nothing usable in record.
//
// A path counts as unusable when it cannot be resolved by recordValueForPath,
// when it resolves to nil, or when it resolves to a sqltypes.Value whose IsNull
// method reports true. Empty paths are ignored. If no non-empty path is
// configured at all, the function returns false.
func recordHasOnlyNullOrMissingPrimaryKeys(record map[string]interface{}, primaryKeys [][]string) bool {
	configured := 0
	for _, keyPath := range primaryKeys {
		if len(keyPath) == 0 {
			continue
		}
		configured++

		field, found := recordValueForPath(record, keyPath)
		if !found || field == nil {
			// Missing or nil: this key contributes nothing, keep checking the rest.
			continue
		}

		// Any non-nil value that is not a NULL sqltypes.Value is a real key value.
		sqlValue, isSQLValue := field.(sqltypes.Value)
		if !isSQLValue || !sqlValue.IsNull() {
			return false
		}
	}

	return configured > 0
}

// recordValueForPath walks record along path, treating every component except
// the last as the key of a nested map[string]interface{}.
//
// It returns the value stored under the final component and true when every
// component is present and every intermediate value is a
// map[string]interface{}; otherwise it returns (nil, false). A stored nil
// value is returned as (nil, true). An empty path yields record itself.
func recordValueForPath(record map[string]interface{}, path []string) (interface{}, bool) {
	if len(path) == 0 {
		return record, true
	}

	current := record
	last := len(path) - 1

	// Descend through the intermediate components; each must be a nested map.
	for i := 0; i < last; i++ {
		child, present := current[path[i]]
		if !present {
			return nil, false
		}
		nested, isMap := child.(map[string]interface{})
		if !isMap {
			return nil, false
		}
		current = nested
	}

	leaf, present := current[path[last]]
	if !present {
		return nil, false
	}
	return leaf, true
}

func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string, lastKnownPk *query.QueryResult) *vtgate.VStreamRequest {
Expand Down
263 changes: 263 additions & 0 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,269 @@ func TestRead_IncrementalSync_CanStopIfNoRows(t *testing.T) {
assert.Equal(t, 0, len(tal.records["connect-test.products"]))
}

func TestRead_IncrementalSync_SkipsAllNullPrimaryKeyRowAfterDDLBoundary(t *testing.T) {
tma := getTestMysqlAccess()
tal := testAirbyteLogger{}
ped := PlanetScaleEdgeDatabase{
Logger: &tal,
Mysql: tma,
}

keyspace := "connect-test"
shard := "-"
table := "receipts"
startVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-2"
stopVGtid := "MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-3"

startCursor := &psdbconnect.TableCursor{
Shard: shard,
Position: startVGtid,
Keyspace: keyspace,
}
expectedCursor := &psdbconnect.TableCursor{
Shard: shard,
Position: stopVGtid,
Keyspace: keyspace,
}

getCurrentVGtidClient := &vtgateVStreamClientMock{
vstreamResponses: []*vstreamResponse{
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_VGTID,
Vgtid: &binlogdata.VGtid{
ShardGtids: []*binlogdata.ShardGtid{
{
Shard: shard,
Gtid: stopVGtid,
Keyspace: keyspace,
},
},
},
},
},
},
},
},
}

vstreamSyncClient := &vtgateVStreamClientMock{
vstreamResponses: []*vstreamResponse{
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_VGTID,
Vgtid: &binlogdata.VGtid{
ShardGtids: []*binlogdata.ShardGtid{
{
Shard: shard,
Gtid: startVGtid,
Keyspace: keyspace,
},
},
},
},
},
},
},
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_FIELD,
FieldEvent: &binlogdata.FieldEvent{
TableName: table,
Fields: []*query.Field{
{
Name: "id",
Type: query.Type_INT64,
Table: table,
OrgTable: table,
Database: keyspace,
ColumnLength: 20,
Charset: 63,
ColumnType: "bigint",
},
{
Name: "status",
Type: query.Type_VARCHAR,
Table: table,
OrgTable: table,
Database: keyspace,
ColumnLength: 255,
Charset: 255,
ColumnType: "varchar(255)",
},
},
},
},
},
},
},
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_DDL,
Keyspace: keyspace,
Shard: shard,
Statement: "ALTER TABLE receipts ADD COLUMN settlement_currency varchar(3)",
},
},
},
},
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_ROW,
RowEvent: &binlogdata.RowEvent{
TableName: table,
Keyspace: keyspace,
Shard: shard,
RowChanges: []*binlogdata.RowChange{
{
After: &query.Row{
Lengths: []int64{-1, 4},
Values: []byte("paid"),
},
},
},
},
},
},
},
},
{
response: &vtgate.VStreamResponse{
Events: []*binlogdata.VEvent{
{
Type: binlogdata.VEventType_VGTID,
Vgtid: &binlogdata.VGtid{
ShardGtids: []*binlogdata.ShardGtid{
{
Shard: shard,
Gtid: stopVGtid,
Keyspace: keyspace,
},
},
},
},
},
},
},
},
}

vsc := vstreamClientMock{
vstreamFn: func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) {
assert.Equal(t, topodata.TabletType_PRIMARY, in.TabletType)
if in.Vgtid.ShardGtids[0].Gtid == "current" {
return getCurrentVGtidClient, nil
}
return vstreamSyncClient, nil
},
}

ped.vtgateClientFn = func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error) {
return &vsc, nil
}

ps := PlanetScaleSource{
Database: "connect-test",
}
cs := ConfiguredStream{
SyncMode: SYNC_MODE_INCREMENTAL,
Stream: Stream{
Name: table,
Namespace: keyspace,
PrimaryKeys: [][]string{{"id"}},
},
}

sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, startCursor)
assert.NoError(t, err)
assert.NotNil(t, sc)
records := tal.records["connect-test.receipts"]
for _, record := range records {
id, ok := record["id"]
if !ok || id == nil {
assert.Failf(t, "Read emitted a record with a null or missing configured primary key", "record: %#v", record)
}
}
assert.Equal(t, 0, len(records), "DDL-boundary null-primary-key rows should be skipped")

esc, err := TableCursorToSerializedCursor(expectedCursor)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
assert.Equal(t, 2, vsc.vstreamFnInvokedCount)
}

// TestRecordHasOnlyNullOrMissingPrimaryKeys is a table-driven check of
// recordHasOnlyNullOrMissingPrimaryKeys covering: no configured keys, a nil
// key, a missing key, a populated key, a composite key with one populated
// component, a nested key path, and a sqltypes.NULL key value.
func TestRecordHasOnlyNullOrMissingPrimaryKeys(t *testing.T) {
	type scenario struct {
		description string
		input       map[string]interface{}
		keys        [][]string
		expected    bool
	}

	scenarios := []scenario{
		{
			description: "no configured primary keys keeps record",
			input:       map[string]interface{}{"id": nil},
			keys:        nil,
			expected:    false,
		},
		{
			description: "single primary key is nil",
			input:       map[string]interface{}{"id": nil},
			keys:        [][]string{{"id"}},
			expected:    true,
		},
		{
			description: "single primary key is missing",
			input:       map[string]interface{}{"status": "paid"},
			keys:        [][]string{{"id"}},
			expected:    true,
		},
		{
			description: "single primary key has value",
			input:       map[string]interface{}{"id": sqltypes.NewInt64(1)},
			keys:        [][]string{{"id"}},
			expected:    false,
		},
		{
			description: "composite primary key has one value",
			input:       map[string]interface{}{"id": nil, "tenant_id": sqltypes.NewInt64(7)},
			keys:        [][]string{{"id"}, {"tenant_id"}},
			expected:    false,
		},
		{
			description: "nested primary key has value",
			input: map[string]interface{}{
				"metadata": map[string]interface{}{
					"id": "receipt_1",
				},
			},
			keys:     [][]string{{"metadata", "id"}},
			expected: false,
		},
		{
			description: "sqltypes null is treated as nil",
			input:       map[string]interface{}{"id": sqltypes.NULL},
			keys:        [][]string{{"id"}},
			expected:    true,
		},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.description, func(t *testing.T) {
			got := recordHasOnlyNullOrMissingPrimaryKeys(sc.input, sc.keys)
			assert.Equal(t, sc.expected, got)
		})
	}
}

func getTestMysqlAccess() *mysqlAccessMock {
tma := mysqlAccessMock{
PingContextFn: func(ctx context.Context, source PlanetScaleSource) error {
Expand Down
Loading