feat(arrow-avro): HeaderInfo to expose OCF header#9548
feat(arrow-avro): HeaderInfo to expose OCF header#9548mzabaluev wants to merge 3 commits intoapache:mainfrom
HeaderInfo to expose OCF header#9548Conversation
Add HeaderInfo to expose OCF header information such as the writer schema and sync marker. Add read_header_info function to the reader module, and its async counterpart to the reader::async_reader module, to read the header from the file reader and return HeaderInfo. Add build_with_header method to async reader builder to enable reuse of the header with multiple readers.
HeaderInfo to expose OCF header
|
I have not added a method to the sync reader builder yet, because I have some doubts about the API. The implementation uses the sequential When some seeking capabilities are added to the sync reader to work with ranges, this may need to be revisited. |
|
@jecsand838 can you help review this PR? |
jecsand838
left a comment
There was a problem hiding this comment.
@mzabaluev LMGTM!
Just left a few comments regarding the doc comments and some duplicated code.
|
|
||
| /// Header information for an Avro OCF file. | ||
| /// | ||
| /// The header can be parsed once and shared used to construct multiple readers |
There was a problem hiding this comment.
| /// The header can be parsed once and shared used to construct multiple readers | |
| /// The header can be parsed once and shared to construct multiple readers |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Asyncronous implementation of Avro file reader. |
There was a problem hiding this comment.
| //! Asyncronous implementation of Avro file reader. | |
| //! Asynchronous implementation of Avro file reader. |
| let (header, header_len) = | ||
| read_header(&mut self.reader, self.file_size, self.header_size_hint).await?; | ||
| self.build_internal(&header, header_len) | ||
| } | ||
|
|
||
| /// Build the asynchronous Avro reader with the provided header. | ||
| /// | ||
| /// This allows initializing the reader with pre-parsed header information. | ||
| /// Note that this method is not async because it does not need to perform any I/O operations. | ||
| pub fn build_with_header( | ||
| self, | ||
| header_info: HeaderInfo, | ||
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| self.build_internal(header_info.header(), header_info.header_len()) | ||
| } | ||
|
|
||
| fn build_internal( | ||
| self, | ||
| header: &Header, | ||
| header_len: u64, | ||
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| let writer_schema = { | ||
| let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { | ||
| AvroError::ParseError("No Avro schema present in file header".to_string()) |
There was a problem hiding this comment.
You maybe able to get rid of the duplicated writer_schema code by doing something like this:
| let (header, header_len) = | |
| read_header(&mut self.reader, self.file_size, self.header_size_hint).await?; | |
| self.build_internal(&header, header_len) | |
| } | |
| /// Build the asynchronous Avro reader with the provided header. | |
| /// | |
| /// This allows initializing the reader with pre-parsed header information. | |
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| pub fn build_with_header( | |
| self, | |
| header_info: HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(header_info.header(), header_info.header_len()) | |
| } | |
| fn build_internal( | |
| self, | |
| header: &Header, | |
| header_len: u64, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| let writer_schema = { | |
| let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { | |
| AvroError::ParseError("No Avro schema present in file header".to_string()) | |
| let header_info = | |
| read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?; | |
| self.build_internal(&header_info) | |
| } | |
| /// Build the asynchronous Avro reader with the provided header. | |
| /// | |
| /// This allows initializing the reader with pre-parsed header information. | |
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| pub fn build_with_header( | |
| self, | |
| header_info: HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(&header_info) | |
| } | |
| fn build_internal( | |
| self, | |
| header: &HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| let writer_schema = header.writer_schema()?; |
I know it adds the Arc allocation in HeaderInfo::new when the caller doesn't need to share the header, but that seems like a negligible cost for a once-per-file operation. Otherwise you could abstract the writer_schema() logic onto Header for re-use.
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| self.build_internal(header_info.header(), header_info.header_len()) |
There was a problem hiding this comment.
Also, do you think we should add the empty file check here as well?
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(header_info.header(), header_info.header_len()) | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| if self.file_size == 0 { | |
| return Err(AvroError::InvalidArgument("File size cannot be 0".into())); | |
| } | |
| self.build_internal(header_info.header(), header_info.header_len()) |
| /// Note that this method is not async because it does not need to perform any I/O operations. | ||
| pub fn build_with_header( |
There was a problem hiding this comment.
Also might be worth adding a comment calling out the header_size_hint size behavior in the build_with_header method.
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| pub fn build_with_header( | |
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| /// | |
| /// Note: Any `header_size_hint` set via [`Self::with_header_size_hint`] is not used | |
| /// when building with a pre-parsed header, since no header fetching occurs. | |
| pub fn build_with_header( |
| @@ -1273,7 +1275,7 @@ impl ReaderBuilder { | |||
| /// the discovered writer (and optional reader) schema, and prepares to iterate blocks, | |||
| /// decompressing if necessary. | |||
| pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> { | |||
There was a problem hiding this comment.
I do think there's value in adding the HeaderInfo to the sync Reader, but that can be added in the future.
Which issue does this PR close?
Rationale for this change
Rework of #9462 along the lines proposed in #9462 (comment).
What changes are included in this PR?
Add
HeaderInfoas a cheaply cloneable value to expose header information parsed from an Avro OCF file.Add
read_header_infofunction to thereadermodule, and its async counterpart to thereader::async_readermodule, to read the header from the file reader and returnHeaderInfo.Add
build_with_headermethod to async reader builder to enable reuse of the header with multiple readers.Are these changes tested?
Added a test for the async reader.
Are there any user-facing changes?
New API in arrow-avro:
reader::HeaderInforeader::read_header_infoandreader::async_reader::read_header_infobuild_with_headermethod ofAvroAsyncFileReader's builder.