feat: two-stage builder API for async Avro reader#9462
feat: two-stage builder API for async Avro reader#9462mzabaluev wants to merge 1 commit intoapache:mainfrom
Conversation
Expose the read_header method in reader::async_reader::ReaderBuilder, returning another builder typestate that exposes the writer schema as it was read from the file header.
|
|
||
| impl<R: AsyncFileReader> ReaderBuilder<R> { | ||
| async fn read_header(&mut self) -> Result<(Header, u64), AvroError> { | ||
| impl<R> ReaderBuilder<R> |
There was a problem hiding this comment.
I wonder, do we want to allow maybe a with_header function as well? that will accept a user's header directly?
There was a problem hiding this comment.
Not the typical usecase, but makes it more flexible
There was a problem hiding this comment.
What would be the behavior with this method? Skip reading the header from the file, and start decoding from...?
There was a problem hiding this comment.
Presumable the range?
The behaviour would be the exact same, since the header ends with the magic I believe? and we start the actual decoding from the first magic we encounter
There was a problem hiding this comment.
So the use case would be, the application parses the header once (or just supplies their own), and then passes it to read ranges in the file on the object store, assuming the header stays the same?
There was a problem hiding this comment.
So the use case would be, the application parses the header once (or just supplies their own), and then passes it to read ranges in the file on the object store, assuming the header stays the same?
At the worst case (range is 0-something), we scan the header bytes very fast until we find the magic, no decoding needed, then we start scanning normally.
Best case is range is middleOfFile-something, and we don't need to do the first call to read the header at all since the user provided it. we just scan until the first magic and party on
There was a problem hiding this comment.
Headeris not currently public, but this could be just an oversight. Its interface looks public-ready.
I also think so, but maybe it's better to do this in a separate PR, making this public has a tendency to bite back 😅
There was a problem hiding this comment.
Presumable the range?
What if the range is not given?
The behaviour would be the exact same, since the header ends with the magic I believe?
The current behavior uses the discovered length of the header as it was parsed from the file.
If the application supplies its own, the with_header method should also give the length, i.e. the offset past the header to start parsing the data from. Alternatively, we could just scan for the magic from the start of the file (unless the range option directs otherwise), but I'm not sure this is bulletproof.
There was a problem hiding this comment.
if range is not given it is 0..EOF
in which case, as I said - we scan the bytes quickly for the magic(which was provided in the header by the user), no decoding happens, then we start decoding normally.
There was a problem hiding this comment.
I wonder, do we want to allow maybe a with_header function as well? that will accept a user's header directly?
I’d be a bit hesitant about with_header. For OCF, the file header is the source of truth as it carries the required avro.schema metadata, and this reader also relies on the parsed header length to know where block decoding should begin. If callers can inject a header, we’d be operating on out-of-band metadata rather than the file’s actual header bytes, and we’d also need to define how header length / start offset are supplied and validated. That feels like a separate optimization for cached/ranged reads, rather than part of this PR’s goal of exposing the discovered writer schema imo.
Header is not currently public, but this could be just an oversight. Its interface looks public-ready.
We had tightened the crate's public API prior to initial release. That way shipping potential fixes with minor releases would be simpler. However Header, HeaderDecoder, and read_header being publicly exposed makes complete sense now that there's a good use-case for it imo.
|
Thinking about the long-term public surface a bit more, I’m a little wary of adding a second public builder stage / intermediate builder type in the async API. The reusable thing seems to be the parsed OCF header plus the discovered Because of that, I’d lean toward exposing a small reusable #[derive(Clone)]
pub struct HeaderInfo(std::sync::Arc<HeaderInfoInner>);
struct HeaderInfoInner {
header: Header,
header_len: u64,
}
impl HeaderInfo {
/// load_async would get the logic currently in async `ReaderBuilder::read_header()`
pub async fn load_async<R: AsyncFileReader>(
in: &mut R,
file_size: u64,
size_hint: Option<u64>,
) -> Result<Self, AvroError>;
/// load could replace the `read_header()` used by the sync `ReaderBuilder`
pub fn load<R: Reader>(in: &mut R) -> Result<Self, AvroError>;
pub fn writer_schema(&self) -> Result<AvroSchema, AvroError>;
pub fn compression(&self) -> Result<Option<CompressionCodec>, AvroError>;
pub fn header_len(&self) -> u64 { self.0.header_len };
pub fn sync(&self) -> [u8; 16];
}
impl<R: AsyncFileReader> ReaderBuilder<R> {
pub fn with_header_info(self, header_info: HeaderInfo) -> Self;
}and then usage along these lines: let header_info = HeaderInfo::load_async(&mut input, file_size, None).await?;
// derive reader_schema from writer_schema
let reader_schema = make_reader_from_writer(header_info.writer_schema()?)?;
let reader = ReaderBuilder::new(input, file_size, batch_size)
.with_reader_schema(reader_schema)
.with_header_info(header_info) // Optional to prevent re-reading the `Header`
.with_projection(vec![0, 2])
.try_build()
.await?;That feels closer to the Parquet It also seems like a narrower semver surface with the added benefit of centrally organizing the crate's // Assuming this header is 100% compatible with both `input_a` and `input_b`, caller is responsible for verifying
let header_info = HeaderInfo::load_async(&mut input_a, size_a, None).await?;
let a = ReaderBuilder::new(input_a, size_a, batch_size)
.with_header_info(header_info.clone())
.try_build()
.await?;
let b = ReaderBuilder::new(input_b, size_b, batch_size)
.with_header_info(header_info)
.try_build()
.await?;Lastly, I'd consider implementing this behavior in the sync Anyway, I'm definitely curious about what you all think? |
Wouldn't this have the same problem of allowing an outside source of truth on the writer schema, header length, etc.? |
Oh 100%. After thinking about it some more I began to align more with what @EmilyMatt mentioned. My opinion is the responsibility for providing a correct I think the overall advantage of the larger approach though stems from centralizing the OCF Header logic in a re-usable manner while loosely coupling it to the readers. I can foresee other future use-cases such as a caller only wanting to check |
Which issue does this PR close?
What changes are included in this PR?
Expose the
read_headermethod inreader::async_reader::ReaderBuilder, returning another builder typestate that exposes the writer schema as it was read from the file header.Are these changes tested?
Tests and doc tests to be added for the new API, showing possible use.
Are there any user-facing changes?
The new API augments the existing ReaderBuilder in a backward-compatible way.