2 changes: 1 addition & 1 deletion core/core/src/docs/internals/accessor.rs
@@ -27,7 +27,7 @@
//! ```ignore
//! // <----------Trait Bound-------------->
//! pub trait Access: Send + Sync + Debug + Unpin + 'static {
//! type Reader: oio::ReadStream; // --+
//! type Reader: oio::Read; // --+
//! type Writer: oio::Write; // +--> Associated Type
//! type Lister: oio::List; // +
//! type Deleter: oio::Delete; // --+
39 changes: 22 additions & 17 deletions core/core/src/docs/rfcs/7660_move_read_range_to_reader.md
@@ -220,11 +220,14 @@ they need for wrapping and cloning.
- `read(range)` reads one bounded planner range and returns exactly one
materialized `Buffer`.

`read` is not a syscall-style partial read. Backends should not return a short
buffer just because one underlying syscall or network read returned fewer bytes
than requested. For a valid bounded planner range, `read` should either return
the complete range content or return an error. The complete layer can still
enforce the final length check.
This is the adapter-level `oio::Read` contract. Native services usually
implement either `StreamRead::open` or `PositionRead::read_at`; `StreamReader`
and `PositionReader` then provide the full `oio::Read` contract.

`read` is not a syscall-style partial read. `oio::Read` implementations should
not return a short buffer just because one underlying syscall or network read
returned fewer bytes than requested. For a valid bounded planner range, `read`
should either return the complete range content or return an error.
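
The "complete range or error" rule above can be sketched as a fill loop. This is a minimal illustration, not OpenDAL code: `std::io::Read` stands in for whatever a backend reads from, and `ShortReads` and `read_full_range` are hypothetical names introduced only for this example.

```rust
use std::io::{self, Read};

/// Hypothetical stand-in for a backend whose single read call may return
/// fewer bytes than requested (assumption for illustration only).
struct ShortReads<'a> {
    data: &'a [u8],
    max_per_call: usize,
}

impl Read for ShortReads<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Hand out at most `max_per_call` bytes per call.
        let n = buf.len().min(self.max_per_call).min(self.data.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data = &self.data[n..];
        Ok(n)
    }
}

/// Keep reading until exactly `len` bytes are collected. A premature end of
/// data becomes an error instead of a short buffer.
fn read_full_range<R: Read>(src: &mut R, len: usize) -> io::Result<Vec<u8>> {
    let mut out = vec![0u8; len];
    let mut filled = 0;
    while filled < len {
        match src.read(&mut out[filled..])? {
            0 => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    format!("range truncated: got {filled} of {len} bytes"),
                ));
            }
            n => filled += n,
        }
    }
    Ok(out)
}
```

A caller therefore never has to retry on a short result: it gets either the full range or an error.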

The public `Reader::read` can still accept a large user range. The core planner
must not blindly pass that large range to raw `read`. It should choose between
@@ -313,13 +316,14 @@ pub struct ReadContext {

`acc`, `path`, and `args` are still useful for planning, especially when
OpenDAL needs `stat` to resolve unbounded ranges for chunked reads or
`AsyncSeek` adapters. Actual range I/O should go through `reader.open`,
`reader.read`, or `reader.fetch`, not through repeated `acc.read` calls.
`AsyncSeek` adapters. Actual raw range I/O should go through `reader.open` or
`reader.read`, not through repeated `acc.read` calls.

`Reader::metadata()` remains cache-only. The cache is updated from `RpRead`
returned by `Access::read`, `open`, `read`, or `fetch`. Lazy backends may return
`RpRead::default()` from `Access::read` and fill metadata after the first range
operation.
returned by `Access::read`, `open`, or `read`. Lazy backends may return
`RpRead::default()` from `Access::read` and fill metadata after the first raw
range operation. Public `Reader::fetch` observes metadata from the planned raw
`read` calls it executes.

## Layers

@@ -433,8 +437,9 @@ This keeps object-store behavior unchanged.

### Memory and database-like services

Services that already have the data content in memory can store the content or
lookup key in their raw reader. `read(range)` can slice bounded ranges directly.
Services that already have the data content in memory can implement
`StreamRead` by opening a stream over the selected slice. Their
`StreamReader` adapter will provide bounded `read(range)`.
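
Selecting the slice for an in-memory service might look like the sketch below. `select_slice` is a hypothetical helper, and the `(offset, size)` pair is an assumed simplification of a byte range in which `size == None` means "to the end". Rejecting out-of-bounds ranges rather than clamping them is also an assumption, chosen to match the "complete range or error" rule.

```rust
/// Hypothetical helper: return the bytes of `content` covered by an
/// `(offset, size)` range, where `size == None` means "to the end".
fn select_slice(content: &[u8], offset: u64, size: Option<u64>) -> Result<&[u8], String> {
    let len = content.len() as u64;
    if offset > len {
        return Err(format!("offset {offset} is past the end ({len} bytes)"));
    }
    let end = match size {
        // Checked arithmetic so that a huge `size` cannot overflow.
        Some(size) => offset
            .checked_add(size)
            .filter(|end| *end <= len)
            .ok_or_else(|| format!("range {offset}+{size} exceeds {len} bytes"))?,
        None => len,
    };
    // Both bounds are <= content.len(), so the casts cannot truncate.
    Ok(&content[offset as usize..end as usize])
}
```

A stream opened over the returned slice would yield it as a single chunk and then report the end of the stream.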

### Handle based services

@@ -571,18 +576,18 @@ can be implemented.

`object_store` exposes both single-range and multi-range reads through
`get_range` and `get_ranges`. OpenDAL's public `Reader::fetch` serves a similar
user-facing purpose. This RFC places the raw batch primitive on the raw reader,
where both object-store requests and file-handle positioned reads can be
implemented.
user-facing purpose. The accepted raw contract does not expose a batch
primitive; public fetch is planned by core and executed through bounded raw
`read(range)` calls.
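
One way core could plan a public fetch is to coalesce the requested ranges first and then issue one bounded raw `read(range)` per merged range. The sketch below shows only the coalescing step. `merge_ranges` and its `max_gap` parameter are hypothetical names for this illustration, not part of the RFC.

```rust
use std::ops::Range;

/// Hypothetical planner step: sort the requested ranges and merge those that
/// overlap or sit within `max_gap` bytes of each other.
fn merge_ranges(mut ranges: Vec<Range<u64>>, max_gap: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // planning a separate read.
            if r.start <= last.end.saturating_add(max_gap) {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}
```

Each merged range then maps to one bounded raw read, and the planner slices the returned buffers back into the ranges the caller asked for.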

# Unresolved questions

## Fetch batching policy

The initial implementation can preserve current public behavior by batching
merged ranges according to `OpReader::concurrent`. We should benchmark whether
the default policy should prefer one large `raw.fetch` call, several batches, or
one task per merged range for different backend classes.
the default policy should prefer fewer larger bounded raw reads or one task per
merged range for different backend classes.

## Native preferred read size

7 changes: 4 additions & 3 deletions core/core/src/raw/accessor.rs
@@ -121,15 +121,16 @@ pub trait Access: Send + Sync + Debug + Unpin + 'static {
)))
}

/// Invoke the `read` operation on the specified path, returns a
/// [`Reader`][crate::Reader] if operate successful.
/// Invoke the `read` operation on the specified path and return a raw
/// reader if the operation succeeds.
///
/// Require [`Capability::read`]
///
/// # Behavior
///
/// - Input path MUST be file path, DON'T NEED to check mode.
/// - The returning content length may be smaller than the range specified.
/// - Range I/O is selected by the returned reader's `open` or `read`
/// operation.
fn read(
&self,
path: &str,
94 changes: 80 additions & 14 deletions core/layers/dtrace/src/lib.rs
@@ -58,9 +58,9 @@ use probe::probe_lazy;
///
/// ### For Reader
///
/// 1. reader_read_start, arguments: path
/// 2. reader_read_ok, arguments: path, length
/// 3. reader_read_error, arguments: path
/// 1. reader_read_start, arguments: path, range
/// 2. reader_read_ok, arguments: path, range, length
/// 3. reader_read_error, arguments: path, range
///
/// ### For Writer
///
@@ -249,28 +249,57 @@ impl<A: Access> LayeredAccess for DTraceAccessor<A> {
pub struct DtraceLayerWrapper<R> {
inner: R,
path: String,
range: Option<BytesRange>,
}

impl<R> DtraceLayerWrapper<R> {
fn new(inner: R, path: &String) -> Self {
Self::with_range(inner, path, None)
}

fn with_range(inner: R, path: &String, range: Option<BytesRange>) -> Self {
Self {
inner,
path: path.to_string(),
range,
}
}

fn range_label(&self) -> String {
self.range
.map(|range| range.to_string())
.unwrap_or_default()
}
}

impl<R: oio::ReadStream> oio::ReadStream for DtraceLayerWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
let c_range = CString::new(self.range_label()).unwrap();
probe_lazy!(
opendal,
reader_read_start,
c_path.as_ptr(),
c_range.as_ptr()
);
match self.inner.read().await {
Ok(bs) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining());
probe_lazy!(
opendal,
reader_read_ok,
c_path.as_ptr(),
c_range.as_ptr(),
bs.remaining()
);
Ok(bs)
}
Err(e) => {
probe_lazy!(opendal, reader_read_error, c_path.as_ptr());
probe_lazy!(
opendal,
reader_read_error,
c_path.as_ptr(),
c_range.as_ptr()
);
Err(e)
}
}
@@ -280,33 +309,70 @@ impl<R: oio::ReadStream> oio::ReadStream for DtraceLayerWrapper<R> {
impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn open(&self, range: BytesRange) -> Result<(RpRead, Box<dyn oio::ReadStreamDyn>)> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
let c_range = CString::new(range.to_string()).unwrap();
probe_lazy!(
opendal,
reader_read_start,
c_path.as_ptr(),
c_range.as_ptr()
);
match self.inner.open(range).await {
Ok((rp, stream)) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), 0);
probe_lazy!(
opendal,
reader_read_ok,
c_path.as_ptr(),
c_range.as_ptr(),
0
);
Ok((
rp,
Box::new(DtraceLayerWrapper::new(stream, &self.path))
as Box<dyn oio::ReadStreamDyn>,
Box::new(DtraceLayerWrapper::with_range(
stream,
&self.path,
Some(range),
)) as Box<dyn oio::ReadStreamDyn>,
))
}
Err(e) => {
probe_lazy!(opendal, reader_read_error, c_path.as_ptr());
probe_lazy!(
opendal,
reader_read_error,
c_path.as_ptr(),
c_range.as_ptr()
);
Err(e)
}
}
}

async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
let c_range = CString::new(range.to_string()).unwrap();
probe_lazy!(
opendal,
reader_read_start,
c_path.as_ptr(),
c_range.as_ptr()
);
match self.inner.read(range).await {
Ok((rp, buffer)) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), buffer.len());
probe_lazy!(
opendal,
reader_read_ok,
c_path.as_ptr(),
c_range.as_ptr(),
buffer.len()
);
Ok((rp, buffer))
}
Err(e) => {
probe_lazy!(opendal, reader_read_error, c_path.as_ptr());
probe_lazy!(
opendal,
reader_read_error,
c_path.as_ptr(),
c_range.as_ptr()
);
Err(e)
}
}
30 changes: 28 additions & 2 deletions core/layers/logging/src/lib.rs
@@ -558,33 +558,53 @@ pub struct LoggingReader<R, I: LoggingInterceptor> {
info: Arc<AccessorInfo>,
logger: I,
path: String,
range: Option<BytesRange>,

read: u64,
inner: R,
}

impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
Self::with_range(info, logger, path, None, reader)
}

fn with_range(
info: Arc<AccessorInfo>,
logger: I,
path: &str,
range: Option<BytesRange>,
reader: R,
) -> Self {
Self {
info,
logger,
path: path.to_string(),
range,

read: 0,
inner: reader,
}
}

fn range_label(&self) -> String {
self.range
.map(|range| range.to_string())
.unwrap_or_default()
}
}

impl<R: oio::ReadStream, I: LoggingInterceptor> oio::ReadStream for LoggingReader<R, I> {
async fn read(&mut self) -> Result<Buffer> {
match self.inner.read().await {
Ok(bs) if bs.is_empty() => {
let range = self.range_label();
self.logger.log(
&self.info,
Operation::Read,
&[
("path", &self.path),
("range", &range),
("read", &self.read.to_string()),
("size", &bs.len().to_string()),
],
@@ -598,10 +618,15 @@ impl<R: oio::ReadStream, I: LoggingInterceptor> oio::ReadStream for LoggingReade
Ok(bs)
}
Err(err) => {
let range = self.range_label();
self.logger.log(
&self.info,
Operation::Read,
&[("path", &self.path), ("read", &self.read.to_string())],
&[
("path", &self.path),
("range", &range),
("read", &self.read.to_string()),
],
"failed",
Some(&err),
);
@@ -616,10 +641,11 @@ impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
match self.inner.open(range).await {
Ok((rp, stream)) => Ok((
rp,
Box::new(LoggingReader::new(
Box::new(LoggingReader::with_range(
self.info.clone(),
self.logger.clone(),
&self.path,
Some(range),
stream,
)) as Box<dyn oio::ReadStreamDyn>,
)),