feat(table): Support Dynamic Partition Overwrite #482
Conversation
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
table/transaction.go
Outdated
	// Check that all partition fields use identity transforms
	currentSpec := t.meta.CurrentSpec()
	for field := range currentSpec.Fields() {
		if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
			return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s",
				ErrInvalidOperation, field.Name)
		}
	}
is this defined in the spec? Or is this just a NotYetImplemented thing?
	if tbl.NumRows() == 0 {
		return nil
	}
shouldn't this overwrite the partition with an empty partition?
Correct me if I'm wrong, but the Spark writer handles this quite similarly: https://github.com/apache/iceberg/blob/0651b8913d27c3b1c9aca4a9609bec521905fb36/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L297-L305
wdyt 🤔 ?
	var allDataFiles []iceberg.DataFile
	for df, err := range dataFiles {
		if err != nil {
			return err
		}
		allDataFiles = append(allDataFiles, df)
	}

	partitionsToOverwrite := make(map[string]struct{})
	for _, df := range allDataFiles {
		partitionKey := fmt.Sprintf("%v", df.Partition())
		partitionsToOverwrite[partitionKey] = struct{}{}
	}
you can probably merge these loops
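To illustrate the suggestion, here is a minimal self-contained sketch of the merged single pass. The `dataFile` type and `collect` helper are stand-ins invented for this example (the real code iterates an `iceberg.DataFile` sequence and would also propagate the per-item error):

```go
package main

import "fmt"

// dataFile is a hypothetical stand-in for iceberg.DataFile; only the
// partition representation matters for this sketch.
type dataFile struct {
	partition string
}

// collect gathers the data files and the set of partition keys to
// overwrite in one pass, instead of one loop to collect files and a
// second loop to build the partition set.
func collect(dataFiles []dataFile) ([]dataFile, map[string]struct{}) {
	allDataFiles := make([]dataFile, 0, len(dataFiles))
	partitionsToOverwrite := make(map[string]struct{})
	for _, df := range dataFiles {
		allDataFiles = append(allDataFiles, df)
		partitionsToOverwrite[fmt.Sprintf("%v", df.partition)] = struct{}{}
	}
	return allDataFiles, partitionsToOverwrite
}

func main() {
	files := []dataFile{{"dt=2024-01-01"}, {"dt=2024-01-02"}, {"dt=2024-01-01"}}
	all, parts := collect(files)
	fmt.Println(len(all), len(parts)) // 3 files, 2 distinct partitions
}
```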
table/transaction.go
Outdated
		return err
	}

	deleteProducer := t.updateSnapshot(fs, snapshotProps).mergeOverwrite(nil)
shouldn't this use the commitUUID?
table/transaction.go
Outdated
	partitionExpr := partitionExprs[0]
	for _, expr := range partitionExprs[1:] {
		partitionExpr = iceberg.NewAnd(partitionExpr, expr)
	}
this is already handled via NewAnd. You can do: partitionExpr := iceberg.NewAnd(partitionExprs[0], partitionExprs[1], partitionExprs[2:]...)
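A minimal sketch of the variadic fold that the reviewer is describing, using toy expression types invented for this example (`booleanExpression`, `andExpr`, `pred`, and `newAnd` are stand-ins for the real iceberg types; the point is that a variadic signature absorbs the manual accumulation loop):

```go
package main

import "fmt"

// Toy stand-ins so the sketch is self-contained.
type booleanExpression interface{ String() string }

type andExpr struct{ left, right booleanExpression }

func (a andExpr) String() string {
	return fmt.Sprintf("(%s AND %s)", a.left, a.right)
}

type pred string

func (p pred) String() string { return string(p) }

// newAnd mirrors a variadic NewAnd: it folds any number of extra
// expressions into a left-deep AND tree, so the caller passes the
// whole slice in one call instead of looping.
func newAnd(left, right booleanExpression, rest ...booleanExpression) booleanExpression {
	folded := booleanExpression(andExpr{left, right})
	for _, e := range rest {
		folded = andExpr{folded, e}
	}
	return folded
}

func main() {
	exprs := []booleanExpression{pred("a=1"), pred("b=2"), pred("c=3")}
	// The call shape suggested in the review (requires len(exprs) >= 2):
	combined := newAnd(exprs[0], exprs[1], exprs[2:]...)
	fmt.Println(combined) // ((a=1 AND b=2) AND c=3)
}
```

The same shape applies to the OR fold discussed below.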
table/transaction.go
Outdated
	result := expressions[0]
	for _, expr := range expressions[1:] {
		result = iceberg.NewOr(result, expr)
	}
same comment as above, iceberg.NewOr already handles an arbitrary number of arguments so you don't have to do this loop manually
table/transaction.go
Outdated
	func parsePartitionKey(partitionKey string, fieldNames []string) []interface{} {
		// Simple parsing for demonstration - assumes a format like "field1=value1/field2=value2"
		parts := strings.Split(partitionKey, "/")
		values := make([]interface{}, len(fieldNames))
we have the schema, so we can use the field names to look up each field's type and know what type to parse the strings into
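A self-contained sketch of schema-driven parsing, assuming the partition key format `"field1=value1/field2=value2"` from the code above. The `fieldType` enum and the field-name-to-type map are hypothetical stand-ins for a lookup on the real `iceberg.Schema`:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fieldType is a hypothetical stand-in for the schema's field types.
type fieldType int

const (
	typeInt64 fieldType = iota
	typeString
	typeBool
)

// parsePartitionKey converts "field1=value1/field2=value2" into typed
// values, using the schema-derived type of each field instead of
// leaving everything as a string.
func parsePartitionKey(partitionKey string, fieldTypes map[string]fieldType) (map[string]any, error) {
	values := make(map[string]any)
	for _, part := range strings.Split(partitionKey, "/") {
		name, raw, ok := strings.Cut(part, "=")
		if !ok {
			return nil, fmt.Errorf("malformed partition segment: %q", part)
		}
		typ, ok := fieldTypes[name]
		if !ok {
			return nil, fmt.Errorf("unknown partition field: %q", name)
		}
		switch typ {
		case typeInt64:
			v, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return nil, err
			}
			values[name] = v
		case typeBool:
			v, err := strconv.ParseBool(raw)
			if err != nil {
				return nil, err
			}
			values[name] = v
		default:
			values[name] = raw
		}
	}
	return values, nil
}

func main() {
	types := map[string]fieldType{"id": typeInt64, "region": typeString}
	vals, err := parsePartitionKey("id=42/region=eu", types)
	fmt.Println(vals["id"], vals["region"], err) // 42 eu <nil>
}
```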
table/transaction.go
Outdated
	switch t := typ.(type) {
	case iceberg.PrimitiveType:
		switch t {
		case iceberg.PrimitiveTypes.Int32:
			if v, ok := value.(int32); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Int64:
			if v, ok := value.(int64); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Float32:
			if v, ok := value.(float32); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Float64:
			if v, ok := value.(float64); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.String:
			if v, ok := value.(string); ok {
				return iceberg.EqualTo(term, v)
			}
		case iceberg.PrimitiveTypes.Bool:
			if v, ok := value.(bool); ok {
				return iceberg.EqualTo(term, v)
			}
		}
	}
the types and casting should be handled for you once the expression is bound, so you shouldn't need the iceberg.Type; just do a switch on value.(type) and call iceberg.EqualTo(term, v)
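A minimal sketch of the simplification being suggested: switch on the value's dynamic Go type and build the predicate, leaving type checking to expression binding. `equalTo` and `buildEqual` are hypothetical stand-ins invented for this example (the real generic `iceberg.EqualTo` returns an expression, not a string):

```go
package main

import "fmt"

// equalTo is a hypothetical stand-in for a generic EqualTo; here it
// just renders the predicate as text.
func equalTo[T any](term string, v T) string {
	return fmt.Sprintf("%s = %v", term, v)
}

// buildEqual switches on the literal's dynamic Go type directly, with
// no nested switch on the iceberg.Type: binding the resulting
// expression against the schema handles type checks and casts.
func buildEqual(term string, value any) (string, error) {
	switch v := value.(type) {
	case int32:
		return equalTo(term, v), nil
	case int64:
		return equalTo(term, v), nil
	case float32:
		return equalTo(term, v), nil
	case float64:
		return equalTo(term, v), nil
	case string:
		return equalTo(term, v), nil
	case bool:
		return equalTo(term, v), nil
	default:
		return "", fmt.Errorf("unsupported literal type %T", value)
	}
}

func main() {
	expr, err := buildEqual("id", int64(7))
	fmt.Println(expr, err) // id = 7 <nil>
}
```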
table/transaction.go
Outdated
	}

	// deleteFileByFilter performs a delete operation with the given filter and snapshot properties.
	func (t *Transaction) deleteFileByFilter(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
I'm also working on a complete delete API (CoW) in #518 that can delete at the row level and the file level based on a predicate.
Hopefully we won't need this method once the full delete API is supported.
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
No description provided.