59 changes: 53 additions & 6 deletions internal/utils/buf_reader.go
@@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"io"

"github.com/apache/arrow-go/v18/arrow/memory"
)

type Reader interface {
@@ -38,6 +40,7 @@ type byteReader struct {

// NewByteReader creates a new ByteReader instance from the given byte slice.
// It wraps the bytes.NewReader function to implement BufferedReader interface.
// The reader does not own the underlying byte slice, so its Free method is a no-op.
func NewByteReader(buf []byte) *byteReader {
r := bytes.NewReader(buf)
return &byteReader{
@@ -108,10 +111,48 @@ func (r *byteReader) Reset(Reader) {}

func (r *byteReader) BufferSize() int { return len(r.buf) }

func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }

func (r *byteReader) Free() {}

// bytesBufferReader is a byte slice with a bytes reader wrapped around it.
// It uses an allocator to allocate and free the underlying byte slice.
type bytesBufferReader struct {
alloc memory.Allocator
byteReader
}

// NewBytesBufferReader creates a new bytesBufferReader with the given size and allocator.
func NewBytesBufferReader(size int, alloc memory.Allocator) *bytesBufferReader {
if alloc == nil {
alloc = memory.DefaultAllocator
}
buf := alloc.Allocate(size)
return &bytesBufferReader{
alloc: alloc,
byteReader: byteReader{
bytes.NewReader(buf),
buf,
0,
},
}
}

// Buffer returns the underlying byte slice.
func (r *bytesBufferReader) Buffer() []byte {
return r.buf
}

// Free releases the underlying byte slice back to the allocator.
func (r *bytesBufferReader) Free() {
r.alloc.Free(r.buf)
}

// bufferedReader is similar to bufio.Reader except
// it will expand the buffer if necessary when asked to Peek
// more bytes than are in the buffer
type bufferedReader struct {
alloc memory.Allocator // allocator used to allocate the buffer
bufferSz int
buf []byte
r, w int
@@ -122,9 +163,13 @@ type bufferedReader struct {
// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
// except Peek will expand the internal buffer if needed rather than return
// an error.
func NewBufferedReader(rd Reader, sz int) *bufferedReader {
func NewBufferedReader(rd Reader, sz int, alloc memory.Allocator) *bufferedReader {
if alloc == nil {
alloc = memory.DefaultAllocator
}
r := &bufferedReader{
rd: rd,
alloc: alloc,
rd: rd,
}
r.resizeBuffer(sz)
return r
@@ -140,11 +185,9 @@ func (b *bufferedReader) Reset(rd Reader) {

func (b *bufferedReader) resetBuffer() {
if b.buf == nil {
b.buf = make([]byte, b.bufferSz)
b.buf = b.alloc.Allocate(b.bufferSz)
} else if b.bufferSz > cap(b.buf) {
buf := b.buf
b.buf = make([]byte, b.bufferSz)
copy(b.buf, buf)
b.buf = b.alloc.Reallocate(b.bufferSz, b.buf)
} else {
b.buf = b.buf[:b.bufferSz]
}
@@ -298,3 +341,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) {
b.r += n
return n, nil
}

func (b *bufferedReader) Free() {
b.alloc.Free(b.buf)
}
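
As a usage note (not part of the diff), here is a minimal sketch of how the allocator-aware readers above fit together. It assumes it runs inside the arrow-go module, since internal/utils is not importable from outside, and that *bytesBufferReader satisfies the package's Reader interface the way *byteReader does.

package utils_test

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/internal/utils"
)

func demoAllocatorBackedReaders() {
	// A tracking allocator; tests can later call AssertSize to verify
	// that every allocation was released.
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)

	// bytesBufferReader owns its buffer: the constructor allocates it and
	// Free returns it to the allocator. Contrast with NewByteReader, whose
	// Free is a no-op because the caller owns the slice.
	br := utils.NewBytesBufferReader(4, alloc)
	copy(br.Buffer(), "abcd")

	// bufferedReader now routes its internal buffer through the same
	// allocator; passing nil falls back to memory.DefaultAllocator.
	rd := utils.NewBufferedReader(br, 2, alloc)

	out := make([]byte, 4)
	n, _ := rd.Read(out)
	fmt.Println(string(out[:n]))

	// Release allocator-owned memory once the readers are done.
	rd.Free()
	br.Free()
}

The Free calls mirror the ownership rules in the doc comments above: only a reader that allocated its buffer through the allocator releases anything.
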
152 changes: 152 additions & 0 deletions parquet/file/column_reader_test.go
@@ -33,7 +33,9 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/compress"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
"github.com/apache/arrow-go/v18/parquet/internal/testutils"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/arrow-go/v18/parquet/schema"
@@ -42,6 +44,17 @@ import (
"github.com/stretchr/testify/suite"
)

const (
FooterEncryptionKey = "0123456789012345"
ColumnEncryptionKey1 = "1234567890123450"
ColumnEncryptionKey2 = "1234567890123451"
ColumnEncryptionKey3 = "1234567890123452"
FooterEncryptionKeyID = "kf"
ColumnEncryptionKey1ID = "kc1"
ColumnEncryptionKey2ID = "kc2"
ColumnEncryptionKey3ID = "kc3"
)

func initValues(values reflect.Value) {
if values.Kind() != reflect.Slice {
panic("must init values with slice")
@@ -813,6 +826,145 @@ func TestFullSeekRow(t *testing.T) {
}
}

func checkDecryptedValues(t *testing.T, writerProps *parquet.WriterProperties, readProps *parquet.ReaderProperties) {
sc := arrow.NewSchema([]arrow.Field{
{Name: "c0", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
{Name: "c1", Type: arrow.BinaryTypes.String, Nullable: true},
{Name: "c2", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64), Nullable: true},
}, nil)

tbl, err := array.TableFromJSON(mem, sc, []string{`[
{"c0": 1, "c1": "a", "c2": [1]},
{"c0": 2, "c1": "b", "c2": [1, 2]},
{"c0": 3, "c1": "c", "c2": [null]},
{"c0": null, "c1": "d", "c2": []},
{"c0": 5, "c1": null, "c2": [3, 3, 3]},
{"c0": 6, "c1": "f", "c2": null}
]`})
require.NoError(t, err)
defer tbl.Release()

schema := tbl.Schema()
arrWriterProps := pqarrow.NewArrowWriterProperties()

var buf bytes.Buffer
wr, err := pqarrow.NewFileWriter(schema, &buf, writerProps, arrWriterProps)
Contributor Author:

@zeroshade I think there is a bug in the pqarrow writer. For the data page v2 buffer, it first writes the levels (definition and repetition) and then the values, and only the values are compressed. However, this whole buffer is then encrypted. Unless ChatGPT is hallucinating on me, only the compressed values should be encrypted as well; the levels should stay unencrypted and uncompressed.
I'll check with some encrypted parquet created in a different way on Monday and see what happens.
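
For reference, a hypothetical sketch (the helper name and signature are invented, not pqarrow writer code) of the DataPageV2 body layout described above: levels first and uncompressed, then the values as the only section that passes through the codec. The open question in this thread is which part of that buffer the encryption step should cover.

// assembleV2PageBody is a hypothetical illustration, not arrow-go API.
// A DataPageV2 body is laid out as [rep levels][def levels][values]:
// the levels are always stored uncompressed, and only the values are
// run through the compression codec.
func assembleV2PageBody(repLevels, defLevels, values []byte, compressFn func([]byte) []byte) []byte {
	body := make([]byte, 0, len(repLevels)+len(defLevels)+len(values))
	body = append(body, repLevels...) // repetition levels: uncompressed
	body = append(body, defLevels...) // definition levels: uncompressed
	// Only this values section is compressed; the comment above asks
	// whether encryption should likewise cover only this section rather
	// than the whole assembled body.
	return append(body, compressFn(values)...)
}
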

daniel-adam-tfs (Contributor Author), Nov 10, 2025:

@zeroshade OK, I should have some time this week to finish this. In fact, I think the memory allocation is done, but the decryption needs fixing before the tests pass.

I've been trying to use PyArrow to encrypt/decrypt files, but there seems to be some discrepancy between the implementations: I cannot get files encrypted by PyArrow to decrypt using arrow-go, or vice versa. I'll open an issue for the encryption/decryption.

Member:

Please link the issue here when you file it. PyArrow and arrow-go should agree on encrypting and decrypting in both directions, since PyArrow binds to Arrow C++ and the tests for Parquet C++ and arrow-go should be the same tests that are passing. I'd be interested in reproducing the failure and debugging it.
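
A rough sketch of the kind of cross-implementation check discussed here, assuming a fixture file named pyarrow_encrypted.parquet written by PyArrow with the same footer key; the fixture name is invented, and the snippet reuses this test file's imports plus os. Note that encryption.StringKeyIDRetriever lives in an internal package, so this only compiles inside the arrow-go module.

// Hypothetical interop probe: open a file encrypted by another
// implementation using arrow-go's decryption properties. Opening the
// reader decrypts the footer, so a mismatch surfaces immediately.
func tryDecryptForeignFile(t *testing.T) {
	data, err := os.ReadFile("pyarrow_encrypted.parquet") // invented fixture name
	require.NoError(t, err)

	kr := make(encryption.StringKeyIDRetriever)
	kr.PutKey(FooterEncryptionKeyID, FooterEncryptionKey)

	readProps := parquet.NewReaderProperties(nil)
	readProps.FileDecryptProps = parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(kr))

	rdr, err := file.NewParquetReader(bytes.NewReader(data), file.WithReadProps(readProps))
	require.NoError(t, err)
	defer rdr.Close()
}
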

require.NoError(t, err)

require.NoError(t, wr.WriteTable(tbl, tbl.NumRows()))
require.NoError(t, wr.Close())

rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(readProps))
require.NoError(t, err)
defer rdr.Close()

rgr := rdr.RowGroup(0)
col0, err := rgr.Column(0)
require.NoError(t, err)

icr := col0.(*file.Int64ColumnChunkReader)
// require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently
Contributor Author:

If I uncomment this, it causes a panic.

Member:

That shouldn't be panicking, as SeekToRow works correctly based on my last tests... I'll see if I can debug this.


vals := make([]int64, 6)
defLvls := make([]int16, 6)
repLvls := make([]int16, 6)

totalLvls, read, err := icr.ReadBatch(6, vals, defLvls, repLvls)
require.NoError(t, err)
assert.EqualValues(t, 6, totalLvls)
assert.EqualValues(t, 5, read)
assert.Equal(t, []int64{1, 2, 3, 5, 6}, vals[:read])
assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLvls[:totalLvls])
assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, repLvls[:totalLvls])

col1, err := rgr.Column(1)
require.NoError(t, err)

scr := col1.(*file.ByteArrayColumnChunkReader)

bavals := make([]parquet.ByteArray, 6)
badefLvls := make([]int16, 6)
barepLvls := make([]int16, 6)

totalLvls, read, err = scr.ReadBatch(6, bavals, badefLvls, barepLvls)
require.NoError(t, err)
assert.EqualValues(t, 6, totalLvls)
assert.EqualValues(t, 5, read)
expectedBAs := []parquet.ByteArray{
[]byte("a"),
[]byte("b"),
[]byte("c"),
[]byte("d"),
[]byte("f"),
}
assert.Equal(t, expectedBAs, bavals[:read])
assert.Equal(t, []int16{1, 1, 1, 1, 0, 1}, badefLvls[:totalLvls])
assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, barepLvls[:totalLvls])

col2, err := rgr.Column(2)
require.NoError(t, err)

lcr := col2.(*file.Int64ColumnChunkReader)
vals = make([]int64, 10)
defLvls = make([]int16, 10)
repLvls = make([]int16, 10)
totalLvls, read, err = lcr.ReadBatch(6, vals, defLvls, repLvls)
require.NoError(t, err)

assert.EqualValues(t, 6, totalLvls)
assert.EqualValues(t, 4, read)

assert.Equal(t, []int64{1, 1, 2, 3}, vals[:read])
assert.Equal(t, []int16{3, 3, 3, 2, 1, 3}, defLvls[:totalLvls])
assert.Equal(t, []int16{0, 0, 1, 0, 0, 0}, repLvls[:totalLvls])
}

func TestDecryptColumns(t *testing.T) {
encryptCols := make(parquet.ColumnPathToEncryptionPropsMap)
encryptCols["c0"] = parquet.NewColumnEncryptionProperties("c0", parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID))
encryptCols["c1"] = parquet.NewColumnEncryptionProperties("c1", parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID(ColumnEncryptionKey2ID))
encryptCols["c2.list.element"] = parquet.NewColumnEncryptionProperties("c2.list.element", parquet.WithKey(ColumnEncryptionKey3), parquet.WithKeyID(ColumnEncryptionKey3ID))
encryptProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata(FooterEncryptionKeyID),
parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr))

stringKr1 := make(encryption.StringKeyIDRetriever)
stringKr1.PutKey(FooterEncryptionKeyID, FooterEncryptionKey)
stringKr1.PutKey(ColumnEncryptionKey1ID, ColumnEncryptionKey1)
stringKr1.PutKey(ColumnEncryptionKey2ID, ColumnEncryptionKey2)
stringKr1.PutKey(ColumnEncryptionKey3ID, ColumnEncryptionKey3)
decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1))

tests := []struct {
name string
dataPageVersion parquet.DataPageVersion
bufferedStream bool
compression compress.Compression
}{
{"DataPageV2_BufferedRead", parquet.DataPageV2, true, compress.Codecs.Uncompressed},
{"DataPageV2_DirectRead", parquet.DataPageV2, false, compress.Codecs.Uncompressed},
{"DataPageV2_BufferedRead_Compressed", parquet.DataPageV2, true, compress.Codecs.Snappy},
{"DataPageV2_DirectRead_Compressed", parquet.DataPageV2, false, compress.Codecs.Snappy},
// {"DataPageV1_BufferedRead", parquet.DataPageV1, true, compress.Codecs.Uncompressed},
// {"DataPageV1_DirectRead", parquet.DataPageV1, false, compress.Codecs.Uncompressed},
// {"DataPageV1_BufferedRead_Compressed", parquet.DataPageV1, true, compress.Codecs.Snappy},
// {"DataPageV1_DirectRead_Compressed", parquet.DataPageV1, false, compress.Codecs.Snappy},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
writerProps := parquet.NewWriterProperties(
parquet.WithDataPageVersion(tt.dataPageVersion),
parquet.WithEncryptionProperties(encryptProps.Clone("")),
parquet.WithCompression(tt.compression),
)
readProps := parquet.NewReaderProperties(nil)
readProps.FileDecryptProps = decryptProps.Clone("")
readProps.BufferedStreamEnabled = tt.bufferedStream
checkDecryptedValues(t, writerProps, readProps)
})
}
}

func BenchmarkReadInt32Column(b *testing.B) {
// generate parquet with RLE-dictionary encoded int32 column
tempdir := b.TempDir()