-
Notifications
You must be signed in to change notification settings - Fork 85
Utilize memory allocator in ReadProperties.GetStream #547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
43589bd
c926904
eec120a
8edb901
c7bee5e
1dde56d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,9 @@ import ( | |
| "github.com/apache/arrow-go/v18/arrow/memory" | ||
| "github.com/apache/arrow-go/v18/internal/utils" | ||
| "github.com/apache/arrow-go/v18/parquet" | ||
| "github.com/apache/arrow-go/v18/parquet/compress" | ||
| "github.com/apache/arrow-go/v18/parquet/file" | ||
| "github.com/apache/arrow-go/v18/parquet/internal/encryption" | ||
| "github.com/apache/arrow-go/v18/parquet/internal/testutils" | ||
| "github.com/apache/arrow-go/v18/parquet/pqarrow" | ||
| "github.com/apache/arrow-go/v18/parquet/schema" | ||
|
|
@@ -42,6 +44,17 @@ import ( | |
| "github.com/stretchr/testify/suite" | ||
| ) | ||
|
|
||
| const ( | ||
| FooterEncryptionKey = "0123456789012345" | ||
| ColumnEncryptionKey1 = "1234567890123450" | ||
| ColumnEncryptionKey2 = "1234567890123451" | ||
| ColumnEncryptionKey3 = "1234567890123452" | ||
| FooterEncryptionKeyID = "kf" | ||
| ColumnEncryptionKey1ID = "kc1" | ||
| ColumnEncryptionKey2ID = "kc2" | ||
| ColumnEncryptionKey3ID = "kc3" | ||
| ) | ||
|
|
||
| func initValues(values reflect.Value) { | ||
| if values.Kind() != reflect.Slice { | ||
| panic("must init values with slice") | ||
|
|
@@ -813,6 +826,145 @@ func TestFullSeekRow(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| func checkDecryptedValues(t *testing.T, writerProps *parquet.WriterProperties, readProps *parquet.ReaderProperties) { | ||
| sc := arrow.NewSchema([]arrow.Field{ | ||
| {Name: "c0", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, | ||
| {Name: "c1", Type: arrow.BinaryTypes.String, Nullable: true}, | ||
| {Name: "c2", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64), Nullable: true}, | ||
| }, nil) | ||
|
|
||
| tbl, err := array.TableFromJSON(mem, sc, []string{`[ | ||
| {"c0": 1, "c1": "a", "c2": [1]}, | ||
| {"c0": 2, "c1": "b", "c2": [1, 2]}, | ||
| {"c0": 3, "c1": "c", "c2": [null]}, | ||
| {"c0": null, "c1": "d", "c2": []}, | ||
| {"c0": 5, "c1": null, "c2": [3, 3, 3]}, | ||
| {"c0": 6, "c1": "f", "c2": null} | ||
| ]`}) | ||
| require.NoError(t, err) | ||
| defer tbl.Release() | ||
|
|
||
| schema := tbl.Schema() | ||
| arrWriterProps := pqarrow.NewArrowWriterProperties() | ||
|
|
||
| var buf bytes.Buffer | ||
| wr, err := pqarrow.NewFileWriter(schema, &buf, writerProps, arrWriterProps) | ||
| require.NoError(t, err) | ||
|
|
||
| require.NoError(t, wr.WriteTable(tbl, tbl.NumRows())) | ||
| require.NoError(t, wr.Close()) | ||
|
|
||
| rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(readProps)) | ||
| require.NoError(t, err) | ||
| defer rdr.Close() | ||
|
|
||
| rgr := rdr.RowGroup(0) | ||
| col0, err := rgr.Column(0) | ||
| require.NoError(t, err) | ||
|
|
||
| icr := col0.(*file.Int64ColumnChunkReader) | ||
| // require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If I uncomment this line, the test panics.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That shouldn't be panicking, as SeekToRow worked correctly in my last tests. I'll see if I can debug this. |
||
|
|
||
| vals := make([]int64, 6) | ||
| defLvls := make([]int16, 6) | ||
| repLvls := make([]int16, 6) | ||
|
|
||
| totalLvls, read, err := icr.ReadBatch(6, vals, defLvls, repLvls) | ||
| require.NoError(t, err) | ||
| assert.EqualValues(t, 6, totalLvls) | ||
| assert.EqualValues(t, 5, read) | ||
| assert.Equal(t, []int64{1, 2, 3, 5, 6}, vals[:read]) | ||
| assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLvls[:totalLvls]) | ||
| assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, repLvls[:totalLvls]) | ||
|
|
||
| col1, err := rgr.Column(1) | ||
| require.NoError(t, err) | ||
|
|
||
| scr := col1.(*file.ByteArrayColumnChunkReader) | ||
|
|
||
| bavals := make([]parquet.ByteArray, 6) | ||
| badefLvls := make([]int16, 6) | ||
| barepLvls := make([]int16, 6) | ||
|
|
||
| totalLvls, read, err = scr.ReadBatch(6, bavals, badefLvls, barepLvls) | ||
| require.NoError(t, err) | ||
| assert.EqualValues(t, 6, totalLvls) | ||
| assert.EqualValues(t, 5, read) | ||
| expectedBAs := []parquet.ByteArray{ | ||
| []byte("a"), | ||
| []byte("b"), | ||
| []byte("c"), | ||
| []byte("d"), | ||
| []byte("f"), | ||
| } | ||
| assert.Equal(t, expectedBAs, bavals[:read]) | ||
| assert.Equal(t, []int16{1, 1, 1, 1, 0, 1}, badefLvls[:totalLvls]) | ||
| assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, barepLvls[:totalLvls]) | ||
|
|
||
| col2, err := rgr.Column(2) | ||
| require.NoError(t, err) | ||
|
|
||
| lcr := col2.(*file.Int64ColumnChunkReader) | ||
| vals = make([]int64, 10) | ||
| defLvls = make([]int16, 10) | ||
| repLvls = make([]int16, 10) | ||
| totalLvls, read, err = lcr.ReadBatch(6, vals, defLvls, repLvls) | ||
| require.NoError(t, err) | ||
|
|
||
| assert.EqualValues(t, 6, totalLvls) | ||
| assert.EqualValues(t, 4, read) | ||
|
|
||
| assert.Equal(t, []int64{1, 1, 2, 3}, vals[:read]) | ||
| assert.Equal(t, []int16{3, 3, 3, 2, 1, 3}, defLvls[:totalLvls]) | ||
| assert.Equal(t, []int16{0, 0, 1, 0, 0, 0}, repLvls[:totalLvls]) | ||
| } | ||
|
|
||
| func TestDecryptColumns(t *testing.T) { | ||
| encryptCols := make(parquet.ColumnPathToEncryptionPropsMap) | ||
| encryptCols["c0"] = parquet.NewColumnEncryptionProperties("c0", parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID)) | ||
| encryptCols["c1"] = parquet.NewColumnEncryptionProperties("c1", parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID(ColumnEncryptionKey2ID)) | ||
| encryptCols["c2.list.element"] = parquet.NewColumnEncryptionProperties("c2.list.element", parquet.WithKey(ColumnEncryptionKey3), parquet.WithKeyID(ColumnEncryptionKey3ID)) | ||
| encryptProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata(FooterEncryptionKeyID), | ||
| parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr)) | ||
|
|
||
| stringKr1 := make(encryption.StringKeyIDRetriever) | ||
| stringKr1.PutKey(FooterEncryptionKeyID, FooterEncryptionKey) | ||
| stringKr1.PutKey(ColumnEncryptionKey1ID, ColumnEncryptionKey1) | ||
| stringKr1.PutKey(ColumnEncryptionKey2ID, ColumnEncryptionKey2) | ||
| stringKr1.PutKey(ColumnEncryptionKey3ID, ColumnEncryptionKey3) | ||
| decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1)) | ||
|
|
||
| tests := []struct { | ||
| name string | ||
| dataPageVersion parquet.DataPageVersion | ||
| bufferedStream bool | ||
| compression compress.Compression | ||
| }{ | ||
| {"DataPageV2_BufferedRead", parquet.DataPageV2, true, compress.Codecs.Uncompressed}, | ||
| {"DataPageV2_DirectRead", parquet.DataPageV2, false, compress.Codecs.Uncompressed}, | ||
| {"DataPageV2_BufferedRead_Compressed", parquet.DataPageV2, true, compress.Codecs.Snappy}, | ||
| {"DataPageV2_DirectRead_Compressed", parquet.DataPageV2, false, compress.Codecs.Snappy}, | ||
| // {"DataPageV1_BufferedRead", parquet.DataPageV1, true, compress.Codecs.Uncompressed}, | ||
| // {"DataPageV1_DirectRead", parquet.DataPageV1, false, compress.Codecs.Uncompressed}, | ||
| // {"DataPageV1_BufferedRead_Compressed", parquet.DataPageV1, true, compress.Codecs.Snappy}, | ||
| // {"DataPageV1_DirectRead_Compressed", parquet.DataPageV1, false, compress.Codecs.Snappy}, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| writerProps := parquet.NewWriterProperties( | ||
| parquet.WithDataPageVersion(tt.dataPageVersion), | ||
| parquet.WithEncryptionProperties(encryptProps.Clone("")), | ||
| parquet.WithCompression(tt.compression), | ||
| ) | ||
| readProps := parquet.NewReaderProperties(nil) | ||
| readProps.FileDecryptProps = decryptProps.Clone("") | ||
| readProps.BufferedStreamEnabled = tt.bufferedStream | ||
| checkDecryptedValues(t, writerProps, readProps) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkReadInt32Column(b *testing.B) { | ||
| // generate parquet with RLE-dictionary encoded int32 column | ||
| tempdir := b.TempDir() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zeroshade I think there is a bug in the pqarrow writer. For a data page v2 buffer, it first writes the levels (definition and repetition) and then the values. Only the values are compressed. However, the whole buffer is then encrypted. Unless ChatGPT is hallucinating, only the compressed values should be encrypted; the levels should stay both unencrypted and uncompressed.
I'll check with an encrypted Parquet file created in a different way on Monday and see what happens.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zeroshade OK, I should have some time this week to finish this. In fact, I think the memory allocation is done, but the decryption needs fixing before the tests pass.
I've been trying to use PyArrow to encrypt/decrypt files, but there seems to be a discrepancy between the implementations: I cannot get files encrypted by PyArrow to decrypt using arrow-go, or vice versa. I'll open an issue for the encryption/decryption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please link the issue here when you file it. PyArrow and arrow-go should agree on encryption and decryption in both directions: PyArrow binds to Arrow C++, and the Parquet C++ and arrow-go encryption tests should be the same tests, which are passing. I'd be interested in reproducing the failure and debugging it.