diff --git a/core/core/src/docs/internals/accessor.rs b/core/core/src/docs/internals/accessor.rs index 8522ba46aa8d..c1c9bf351a0a 100644 --- a/core/core/src/docs/internals/accessor.rs +++ b/core/core/src/docs/internals/accessor.rs @@ -27,7 +27,7 @@ //! ```ignore //! // <----------Trait Bound--------------> //! pub trait Access: Send + Sync + Debug + Unpin + 'static { -//! type Reader: oio::ReadStream; // --+ +//! type Reader: oio::Read; // --+ //! type Writer: oio::Write; // +--> Associated Type //! type Lister: oio::List; // + //! type Deleter: oio::Delete; // --+ diff --git a/core/core/src/docs/rfcs/7660_move_read_range_to_reader.md b/core/core/src/docs/rfcs/7660_move_read_range_to_reader.md index f1a36338e9ff..623906f608c2 100644 --- a/core/core/src/docs/rfcs/7660_move_read_range_to_reader.md +++ b/core/core/src/docs/rfcs/7660_move_read_range_to_reader.md @@ -220,11 +220,14 @@ they need for wrapping and cloning. - `read(range)` reads one bounded planner range and returns exactly one materialized `Buffer`. -`read` is not a syscall-style partial read. Backends should not return a short -buffer just because one underlying syscall or network read returned fewer bytes -than requested. For a valid bounded planner range, `read` should either return -the complete range content or return an error. The complete layer can still -enforce the final length check. +This is the adapter-level `oio::Read` contract. Native services usually +implement either `StreamRead::open` or `PositionRead::read_at`; `StreamReader` +and `PositionReader` then provide the full `oio::Read` contract. + +`read` is not a syscall-style partial read. `oio::Read` implementations should +not return a short buffer just because one underlying syscall or network read +returned fewer bytes than requested. For a valid bounded planner range, `read` +should either return the complete range content or return an error. The public `Reader::read` can still accept a large user range. The core planner must not blindly pass that large range to raw `read`. It should choose between @@ -313,13 +316,14 @@ pub struct ReadContext { `acc`, `path`, and `args` are still useful for planning, especially when OpenDAL needs `stat` to resolve unbounded ranges for chunked reads or -`AsyncSeek` adapters. Actual range I/O should go through `reader.open`, -`reader.read`, or `reader.fetch`, not through repeated `acc.read` calls. +`AsyncSeek` adapters. Actual raw range I/O should go through `reader.open` or +`reader.read`, not through repeated `acc.read` calls. `Reader::metadata()` remains cache-only. The cache is updated from `RpRead` -returned by `Access::read`, `open`, `read`, or `fetch`. Lazy backends may return -`RpRead::default()` from `Access::read` and fill metadata after the first range -operation. +returned by `Access::read`, `open`, or `read`. Lazy backends may return +`RpRead::default()` from `Access::read` and fill metadata after the first raw +range operation. Public `Reader::fetch` observes metadata from the planned raw +`read` calls it executes. ## Layers @@ -433,8 +437,9 @@ This keeps object-store behavior unchanged. ### Memory and database-like services -Services that already have the data content in memory can store the content or -lookup key in their raw reader. `read(range)` can slice bounded ranges directly. +Services that already have the data content in memory can implement +`StreamRead` by opening a stream over the selected slice. Their +`StreamReader` adapter will provide bounded `read(range)`. ### Handle based services @@ -571,9 +576,9 @@ can be implemented. `object_store` exposes both single-range and multi-range reads through `get_range` and `get_ranges`. OpenDAL's public `Reader::fetch` serves a similar -user-facing purpose. This RFC places the raw batch primitive on the raw reader, -where both object-store requests and file-handle positioned reads can be -implemented. +user-facing purpose. The accepted raw contract does not expose a batch +primitive; public fetch is planned by core and executed through bounded raw +`read(range)` calls. # Unresolved questions @@ -581,8 +586,8 @@ implemented. The initial implementation can preserve current public behavior by batching merged ranges according to `OpReader::concurrent`. We should benchmark whether -the default policy should prefer one large `raw.fetch` call, several batches, or -one task per merged range for different backend classes. +the default policy should prefer fewer larger bounded raw reads or one task per +merged range for different backend classes. ## Native preferred read size diff --git a/core/core/src/raw/accessor.rs b/core/core/src/raw/accessor.rs index c033981dd704..14bcf220793a 100644 --- a/core/core/src/raw/accessor.rs +++ b/core/core/src/raw/accessor.rs @@ -121,15 +121,16 @@ pub trait Access: Send + Sync + Debug + Unpin + 'static { ))) } - /// Invoke the `read` operation on the specified path, returns a - /// [`Reader`][crate::Reader] if operate successful. + /// Invoke the `read` operation on the specified path, returns a raw + /// reader if operate successful. /// /// Require [`Capability::read`] /// /// # Behavior /// /// - Input path MUST be file path, DON'T NEED to check mode. - /// - The returning content length may be smaller than the range specified. + /// - Range I/O is selected by the returned reader's `open` or `read` + /// operation. fn read( &self, path: &str, diff --git a/core/layers/dtrace/src/lib.rs b/core/layers/dtrace/src/lib.rs index 7242c37e9b12..f33238b5fddd 100644 --- a/core/layers/dtrace/src/lib.rs +++ b/core/layers/dtrace/src/lib.rs @@ -58,9 +58,9 @@ use probe::probe_lazy; /// /// ### For Reader /// -/// 1. reader_read_start, arguments: path -/// 2. reader_read_ok, arguments: path, length -/// 3. reader_read_error, arguments: path +/// 1. reader_read_start, arguments: path, range +/// 2. reader_read_ok, arguments: path, range, length +/// 3. reader_read_error, arguments: path, range /// /// ### For Writer /// @@ -249,28 +249,57 @@ impl LayeredAccess for DTraceAccessor { pub struct DtraceLayerWrapper { inner: R, path: String, + range: Option, } impl DtraceLayerWrapper { fn new(inner: R, path: &String) -> Self { + Self::with_range(inner, path, None) + } + + fn with_range(inner: R, path: &String, range: Option) -> Self { Self { inner, path: path.to_string(), + range, } } + + fn range_label(&self) -> String { + self.range + .map(|range| range.to_string()) + .unwrap_or_default() + } } impl oio::ReadStream for DtraceLayerWrapper { async fn read(&mut self) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); - probe_lazy!(opendal, reader_read_start, c_path.as_ptr()); + let c_range = CString::new(self.range_label()).unwrap(); + probe_lazy!( + opendal, + reader_read_start, + c_path.as_ptr(), + c_range.as_ptr() + ); match self.inner.read().await { Ok(bs) => { - probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining()); + probe_lazy!( + opendal, + reader_read_ok, + c_path.as_ptr(), + c_range.as_ptr(), + bs.remaining() + ); Ok(bs) } Err(e) => { - probe_lazy!(opendal, reader_read_error, c_path.as_ptr()); + probe_lazy!( + opendal, + reader_read_error, + c_path.as_ptr(), + c_range.as_ptr() + ); Err(e) } } @@ -280,18 +309,38 @@ impl oio::ReadStream for DtraceLayerWrapper { impl oio::Read for DtraceLayerWrapper { async fn open(&self, range: BytesRange) -> Result<(RpRead, Box)> { let c_path = CString::new(self.path.clone()).unwrap(); - probe_lazy!(opendal, reader_read_start, c_path.as_ptr()); + let c_range = CString::new(range.to_string()).unwrap(); + probe_lazy!( + opendal, + reader_read_start, + c_path.as_ptr(), + c_range.as_ptr() + ); match self.inner.open(range).await { Ok((rp, stream)) => { - probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), 0); + probe_lazy!( + opendal, + reader_read_ok, + c_path.as_ptr(), + c_range.as_ptr(), + 0 + ); Ok(( rp, - Box::new(DtraceLayerWrapper::new(stream, &self.path)) - as Box, + Box::new(DtraceLayerWrapper::with_range( + stream, + &self.path, + Some(range), + )) as Box, )) } Err(e) => { - probe_lazy!(opendal, reader_read_error, c_path.as_ptr()); + probe_lazy!( + opendal, + reader_read_error, + c_path.as_ptr(), + c_range.as_ptr() + ); Err(e) } } @@ -299,14 +348,31 @@ impl oio::Read for DtraceLayerWrapper { async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> { let c_path = CString::new(self.path.clone()).unwrap(); - probe_lazy!(opendal, reader_read_start, c_path.as_ptr()); + let c_range = CString::new(range.to_string()).unwrap(); + probe_lazy!( + opendal, + reader_read_start, + c_path.as_ptr(), + c_range.as_ptr() + ); match self.inner.read(range).await { Ok((rp, buffer)) => { - probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), buffer.len()); + probe_lazy!( + opendal, + reader_read_ok, + c_path.as_ptr(), + c_range.as_ptr(), + buffer.len() + ); Ok((rp, buffer)) } Err(e) => { - probe_lazy!(opendal, reader_read_error, c_path.as_ptr()); + probe_lazy!( + opendal, + reader_read_error, + c_path.as_ptr(), + c_range.as_ptr() + ); Err(e) } } diff --git a/core/layers/logging/src/lib.rs b/core/layers/logging/src/lib.rs index c84285f8486c..797871e68e60 100644 --- a/core/layers/logging/src/lib.rs +++ b/core/layers/logging/src/lib.rs @@ -558,6 +558,7 @@ pub struct LoggingReader { info: Arc, logger: I, path: String, + range: Option, read: u64, inner: R, @@ -565,26 +566,45 @@ pub struct LoggingReader { impl LoggingReader { fn new(info: Arc, logger: I, path: &str, reader: R) -> Self { + Self::with_range(info, logger, path, None, reader) + } + + fn with_range( + info: Arc, + logger: I, + path: &str, + range: Option, + reader: R, + ) -> Self { Self { info, logger, path: path.to_string(), + range, read: 0, inner: reader, } } + + fn range_label(&self) -> String { + self.range + .map(|range| range.to_string()) + .unwrap_or_default() + } } impl oio::ReadStream for LoggingReader { async fn read(&mut self) -> Result { match self.inner.read().await { Ok(bs) if bs.is_empty() => { + let range = self.range_label(); self.logger.log( &self.info, Operation::Read, &[ ("path", &self.path), + ("range", &range), ("read", &self.read.to_string()), ("size", &bs.len().to_string()), ], @@ -598,10 +618,15 @@ impl oio::ReadStream for LoggingReade Ok(bs) } Err(err) => { + let range = self.range_label(); self.logger.log( &self.info, Operation::Read, - &[("path", &self.path), ("read", &self.read.to_string())], + &[ + ("path", &self.path), + ("range", &range), + ("read", &self.read.to_string()), + ], "failed", Some(&err), ); @@ -616,10 +641,11 @@ impl oio::Read for LoggingReader { match self.inner.open(range).await { Ok((rp, stream)) => Ok(( rp, - Box::new(LoggingReader::new( + Box::new(LoggingReader::with_range( self.info.clone(), self.logger.clone(), &self.path, + Some(range), stream, )) as Box, )), diff --git a/core/layers/oteltrace/src/lib.rs b/core/layers/oteltrace/src/lib.rs index e91666142169..ed60e5089419 100644 --- a/core/layers/oteltrace/src/lib.rs +++ b/core/layers/oteltrace/src/lib.rs @@ -28,7 +28,6 @@ use opendal_core::*; use opentelemetry::Context as TraceContext; use opentelemetry::KeyValue; use opentelemetry::global; -use opentelemetry::global::BoxedSpan; use opentelemetry::trace::FutureExt as TraceFutureExt; use opentelemetry::trace::Span; use opentelemetry::trace::TraceContextExt; @@ -109,10 +108,12 @@ impl LayeredAccess for OtelTraceAccessor { let mut span = tracer.start("read"); span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("args", format!("{args:?}"))); + let cx = TraceContext::current_with_span(span); self.inner .read(path, args) + .with_context(cx.clone()) .await - .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r))) + .map(|(rp, r)| (rp, OtelTraceWrapper::new(cx, r))) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -120,10 +121,12 @@ impl LayeredAccess for OtelTraceAccessor { let mut span = tracer.start("write"); span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("args", format!("{args:?}"))); + let cx = TraceContext::current_with_span(span); self.inner .write(path, args) + .with_context(cx.clone()) .await - .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r))) + .map(|(rp, r)| (rp, OtelTraceWrapper::new(cx, r))) } async fn copy( @@ -173,10 +176,12 @@ impl LayeredAccess for OtelTraceAccessor { let mut span = tracer.start("list"); span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("args", format!("{args:?}"))); + let cx = TraceContext::current_with_span(span); self.inner .list(path, args) + .with_context(cx.clone()) .await - .map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s))) + .map(|(rp, s)| (rp, OtelTraceWrapper::new(cx, s))) } async fn presign(&self, path: &str, args: OpPresign) -> Result { @@ -191,36 +196,36 @@ impl LayeredAccess for OtelTraceAccessor { #[doc(hidden)] pub struct OtelTraceWrapper { - span: Arc, + cx: TraceContext, inner: R, } impl OtelTraceWrapper { - fn new(span: BoxedSpan, inner: R) -> Self { - Self { - span: Arc::new(span), - inner, - } + fn new(cx: TraceContext, inner: R) -> Self { + Self { cx, inner } } - fn with_span(span: Arc, inner: R) -> Self { - Self { span, inner } + fn child_context(&self, name: &'static str, range: BytesRange) -> TraceContext { + let tracer = global::tracer("opendal"); + let mut span = tracer.start_with_context(name, &self.cx); + span.set_attribute(KeyValue::new("range", range.to_string())); + self.cx.with_span(span) } } impl oio::ReadStream for OtelTraceWrapper { async fn read(&mut self) -> Result { - self.inner.read().await + self.inner.read().with_context(self.cx.clone()).await } } impl oio::Read for OtelTraceWrapper { async fn open(&self, range: BytesRange) -> Result<(RpRead, Box)> { - let (rp, stream) = self.inner.open(range).await?; + let cx = self.child_context("reader.open", range); + let (rp, stream) = self.inner.open(range).with_context(cx.clone()).await?; Ok(( rp, - Box::new(OtelTraceWrapper::with_span(self.span.clone(), stream)) - as Box, + Box::new(OtelTraceWrapper::new(cx, stream)) as Box, )) } @@ -228,26 +233,27 @@ impl oio::Read for OtelTraceWrapper { &self, range: BytesRange, ) -> impl Future> + MaybeSend { - self.inner.read(range) + let cx = self.child_context("reader.read", range); + self.inner.read(range).with_context(cx) } } impl oio::Write for OtelTraceWrapper { fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { - self.inner.write(bs) + self.inner.write(bs).with_context(self.cx.clone()) } fn abort(&mut self) -> impl Future> + MaybeSend { - self.inner.abort() + self.inner.abort().with_context(self.cx.clone()) } fn close(&mut self) -> impl Future> + MaybeSend { - self.inner.close() + self.inner.close().with_context(self.cx.clone()) } } impl oio::List for OtelTraceWrapper { async fn next(&mut self) -> Result> { - self.inner.next().await + self.inner.next().with_context(self.cx.clone()).await } } diff --git a/core/layers/tracing/src/lib.rs b/core/layers/tracing/src/lib.rs index bb2a152ac714..2b47d34053b4 100644 --- a/core/layers/tracing/src/lib.rs +++ b/core/layers/tracing/src/lib.rs @@ -296,15 +296,17 @@ impl oio::ReadStream for TracingWrapper { impl oio::Read for TracingWrapper { async fn open(&self, range: BytesRange) -> Result<(RpRead, Box)> { - let (rp, stream) = self.inner.open(range).instrument(self.span.clone()).await?; + let span = span!(parent: &self.span, Level::DEBUG, "reader.open", range = %range); + let (rp, stream) = self.inner.open(range).instrument(span.clone()).await?; Ok(( rp, - Box::new(TracingWrapper::new(self.span.clone(), stream)) as Box, + Box::new(TracingWrapper::new(span, stream)) as Box, )) } async fn read(&self, range: BytesRange) -> Result<(RpRead, Buffer)> { - self.inner.read(range).instrument(self.span.clone()).await + let span = span!(parent: &self.span, Level::DEBUG, "reader.read", range = %range); + self.inner.read(range).instrument(span).await } }