From 6021b708e405e11308d3b9642dbbab7862327379 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Wed, 15 Apr 2026 10:14:00 +0200 Subject: [PATCH 1/8] Rust::com Async Stream Receive API proposal * Proposal for async stream receive api with basic sequence diagram --- .../impl/rust/com-api/com-api-concept/BUILD | 1 + .../com-api-concept/com_api_concept.rs | 34 +++++++++++++++ .../com-api/com-api-runtime-lola/consumer.rs | 10 +++++ .../com-api/com-api-runtime-mock/runtime.rs | 9 ++++ ...com_api_async_stream_api_lola_runtime.puml | 41 +++++++++++++++++++ 5 files changed, 95 insertions(+) create mode 100644 score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD index 71be54086..1e8e01851 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD +++ b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD @@ -32,6 +32,7 @@ rust_library( ], deps = [ "@score_baselibs_rust//src/containers", + "@score_communication_crate_index//:futures", "@score_communication_crate_index//:thiserror", ], ) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 39f5d7780..9ddbccbe3 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -55,6 +55,7 @@ use containers::fixed_capacity::FixedCapacityQueue; use core::fmt::Debug; use core::future::Future; use core::ops::{Deref, DerefMut}; +use futures::stream::Stream; use std::path::Path; /// Result type alias with `std::result::Result` using `com_api::Error` as error type @@ -911,6 +912,39 @@ pub trait Subscription { max_samples: usize, cancellation: impl Future + Send + 'static, ) -> impl Future>, Result)> + 'a; + + /// Returns a stream that continuously yields `SampleContainer` whenever at least `new_samples` are available. + /// + /// The stream creates and maintains an internal `SampleContainer` buffer during its lifetime, + /// automatically collecting samples from the communication buffer. The buffer is created when + /// the stream is created and persists until the subscription is dropped. + /// + /// **Polling Behavior:** + /// When user poll the stream, it checks if the internal buffer contains at least `1` sample. + /// If yes, it yields those samples to user. If not, it returns pending and waits for samples. + /// + /// **Buffer Management:** + /// The internal buffer has a maximum capacity of `max_samples`. When the buffer is full and + /// new samples arrive, the oldest samples are automatically dropped to make room. This allows + /// continuous sample flow without requiring repeated `receive()` calls. + /// + /// The stream manages its own internal `SampleContainer`, so it does not take one as a parameter. + /// Instead, it yields the updated container each time you poll for new samples. + /// + /// # Parameters + /// * `max_samples` - Maximum capacity of the internal buffer + /// + /// # Returns + /// A stream that yields `Sample` with at least `1` event each poll + /// + /// # Errors + /// Returns an error if a problem occurs during sample reception + fn to_stream<'a>( + //TODO: We may take self by value and consume it in the stream and can be recover before dropping the stream. + //Based on implementation complexity, we can decide whether to take self by value or by reference. + &'a self, + max_samples: usize, + ) -> impl Stream>> + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 8404a5702..de2376b07 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::panic; use core::ptr::NonNull; +use futures::stream::{self, Stream}; use futures::task::{AtomicWaker, Context, Poll}; use std::cmp::Ordering; use std::pin::Pin; @@ -609,6 +610,15 @@ where .await } } + + #[allow(clippy::manual_async_fn)] + fn to_stream<'a>( + &'a self, + _max_samples: usize, + ) -> impl Stream>> + 'a { + //TODO: Implementation of to_stream for LoLa will be be done once API approved. + stream::empty() + } } // The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 8f0a4dc03..ded24e9ed 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -31,6 +31,7 @@ use core::marker::PhantomData; use core::mem::MaybeUninit; use core::ops::{Deref, DerefMut}; use core::sync::atomic::AtomicUsize; +use futures::stream::{self, Stream}; use std::collections::VecDeque; use std::path::Path; @@ -337,6 +338,14 @@ where ) -> impl Future>, Result)> + 'a { async { todo!() } } + + #[allow(clippy::manual_async_fn)] + fn to_stream<'a>( + &'a self, + _max_samples: usize, + ) -> impl Stream>> + 'a { + stream::empty() + } } pub struct Publisher { diff --git a/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml new file mode 100644 index 000000000..c3f84134a --- /dev/null +++ b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml @@ -0,0 +1,41 @@ +@startuml com_api_stream_api_lola_runtime + +' Copyright (c) 2026 Contributors to the Eclipse Foundation +' +' See the NOTICE file(s) distributed with this work for additional +' information regarding copyright ownership. +' +' This program and the accompanying materials are made available under the +' terms of the Apache License Version 2.0 which is available at +' +' +' SPDX-License-Identifier: Apache-2.0 + +title Async Stream API in Lola Runtime + +actor User +participant "Subscription" as Subscription +participant "SampleContainer" as SampleContainer +participant "Lola" as Lola + +User -> Subscription: stream(new_samples, max_samples) +Subscription --> SampleContainer: Create with max_samples capacity + +Note over Lola: Backend asynchronously\npushes samples in the SampleContainer + +loop User polls the stream + User -> Subscription: poll_next().await + activate Subscription + Subscription -> SampleContainer: Check samples >= 1 + + alt Samples available + Subscription --> User: Ready(Samples) + else Waiting for Samples + Subscription --> User: Pending + end + deactivate Subscription +end + +Note over SampleContainer: Oldest samples dropped\nwhen buffer full + +@enduml From f0a3b5c600a37310c9f67a0af7e9b38ff93a09cf Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Wed, 13 May 2026 07:14:39 +0200 Subject: [PATCH 2/8] Rust::com Implementation for Stream API * Implemented poll_next function for stream api in consumer crate --- .../com-api-concept/com_api_concept.rs | 10 +- .../rust/com-api/com-api-concept/error.rs | 2 + .../com-api/com-api-runtime-lola/consumer.rs | 162 ++++++++++++++++-- 3 files changed, 159 insertions(+), 15 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 9ddbccbe3..88f24c9f6 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -734,6 +734,14 @@ impl SampleContainer { { self.inner.front().map(::deref) } + + /// Returns the maximum capacity of the container. + /// + /// # Returns + /// The maximum number of samples that the container can hold. + pub fn capacity(&self) -> usize { + self.inner.capacity() + } } /// Active event subscription with polling and async receive capabilities. @@ -940,8 +948,6 @@ pub trait Subscription { /// # Errors /// Returns an error if a problem occurs during sample reception fn to_stream<'a>( - //TODO: We may take self by value and consume it in the stream and can be recover before dropping the stream. - //Based on implementation complexity, we can decide whether to take self by value or by reference. &'a self, max_samples: usize, ) -> impl Stream>> + 'a; diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index 95d680626..a9eed00b3 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -77,6 +77,8 @@ pub enum ReceiveFailedReason { Cancelled, #[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")] InputValueOutOfBounds { max: usize, requested: usize }, + #[error("Stream buffer overflow: capacity is {max}, excess samples were discarded")] + BufferOverflow { max: usize }, } /// Comprehensive error reasons for event-related failures diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index de2376b07..fc18e7074 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::panic; use core::ptr::NonNull; +use futures::future::Either; use futures::stream::{self, Stream}; use futures::task::{AtomicWaker, Context, Poll}; use std::cmp::Ordering; @@ -567,6 +568,7 @@ where scratch, self.max_num_samples, max_samples, + false, ) } @@ -611,13 +613,40 @@ where } } - #[allow(clippy::manual_async_fn)] fn to_stream<'a>( &'a self, - _max_samples: usize, + max_samples: usize, ) -> impl Stream>> + 'a { - //TODO: Implementation of to_stream for LoLa will be be done once API approved. - stream::empty() + // Validate max_samples up front + if max_samples > self.max_num_samples { + // This is a single error stream, so it yields only once and populated the error then ends the stream. + return Either::Left(stream::once(async move { + Err(Error::ReceiveError( + ReceiveFailedReason::SampleCountOutOfBounds { + max: self.max_num_samples, + requested: max_samples, + }, + )) + })); + } + + // Get the event guard to ensure no concurrent receive calls + // on the same subscriber instance. + let mut event_guard = self.event.get_proxy_event(); + // Initialize the async receive callback only once when the first receive call is made + // We are using std::sync::Once to ensure that the callback is set only once. + self.async_init_status.call_once(|| { + self.init_async_receive(&mut event_guard) + .expect("Failed to initialize async receive callback"); + }); + drop(event_guard); + // Return stream that yields samples one at a time, fetching new batches as needed. + Either::Right(ToStreamFuture { + subscriber: self, + max_samples, + sample_container: SampleContainer::new(max_samples), + overflow_occurred: false, + }) } } @@ -671,6 +700,7 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' &mut scratch, max_num_samples, max_samples - total_received, + false, ); self.scratch = Some(scratch); result @@ -713,6 +743,93 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' } } +/// A `Stream` that continuously delivers one sample at a time from the subscription. +/// +/// Internally maintains a `SampleContainer` whose lifetime equals the stream's lifetime. +/// On each `poll_next`: +/// - Buffered samples (leftover from a previous batch fetch) are yielded first. +/// - Once the buffer is drained, if a previous fetch detected overflow (more samples arrived +/// than `max_samples`), `Err(BufferOverflow)` is returned once, then the stream resumes. +/// - Otherwise the waker is registered, new samples are fetched from the FFI layer, and the +/// first sample is returned. `Pending` is returned when no samples are currently available. +/// +/// The lifetime `'a` ties all yielded `Sample<'a>` to the subscription borrow, ensuring +/// samples cannot outlive the subscription that manages the underlying C++ proxy. +struct ToStreamFuture<'a, T: CommData + Debug> { + subscriber: &'a SubscriberImpl, + max_samples: usize, + sample_container: SampleContainer>, + overflow_occurred: bool, +} + +impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { + type Item = Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + //extarcting self from pin to access the fields and modify the fields in the function + let this = self.get_mut(); + + // Yield any buffered samples from a previous batch fetch first. + if let Some(sample) = this.sample_container.pop_front() { + return Poll::Ready(Some(Ok(sample))); + } + + // This is condition is rare because already we are yeilding samples and once the buffer is + // empty then only we are fetching again but if C++ side we get more sample then the current + // buffer can hold then we are setting the overflow_occurred flag and once the avaiable + // samples are yeilded then we are populating error to user and reset the flag. + // Why we are not ruturnig error immediately or next call because we have already buffered + // sample which is valid so retuning those sample first and then returning error + // to user and then resuming the stream for next incoming sample. + if this.overflow_occurred { + this.overflow_occurred = false; + return Poll::Ready(Some(Err(Error::ReceiveError( + ReceiveFailedReason::BufferOverflow { + max: this.max_samples, + }, + )))); + } + + // Register the waker before attempting to receive so that a concurrent FFI + // callback cannot fire between the receive attempt and registering the waker, + // which would cause a missed wake-up. + this.subscriber.waker_storage.register(cx.waker()); + let max_samples = this.max_samples; + let max_num_samples = this.subscriber.max_num_samples; + + // Acquire exclusive access to the proxy event for this poll. + let mut event_guard = this.subscriber.event.get_proxy_event(); + // Fetch new samples into the container. Excess samples (beyond max_samples) are + // skipped by the callback, the raw FFI count lets us detect the overflow. + let result = try_receive_samples::( + event_guard.deref_mut(), + &mut this.sample_container, + max_num_samples, + max_samples, + true, + ); + + // Release exclusive event access before yielding control. + drop(event_guard); + + match result { + Ok(count) => { + // FFI delivered more samples than the buffer can hold — overflow. + // Remaining buffered samples are yielded first (top of this function); + // the error is surfaced once the buffer is fully drained. + if count > max_samples { + this.overflow_occurred = true; + } + match this.sample_container.pop_front() { + Some(sample) => Poll::Ready(Some(Ok(sample))), + None => Poll::Pending, + } + } + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + /// Holds the shared state populated asynchronously by the C++ service discovery callback. /// /// `handles` is wrapped in `Option` because: @@ -983,11 +1100,15 @@ impl ConsumerDescriptor for LolaConsumerBuilder( event: &mut ProxyEventBase, scratch: &mut SampleContainer>, max_num_samples: usize, max_samples: usize, + buffer_overflow_check: bool, ) -> Result { if max_samples == 0 { return Err(Error::ReceiveError( @@ -1006,7 +1127,7 @@ fn try_receive_samples( )); } // Create a callback that will be called by the C++ side for each new sample arrival - let mut callback = create_sample_callback(scratch, max_samples); + let mut callback = create_sample_callback(scratch, max_samples, buffer_overflow_check); // Convert closure to FatPtr for C++ callback let dyn_callback: &mut dyn FnMut(*mut sample_ptr_rs::SamplePtr) = &mut callback; // SAFETY: it is safe to transmute the closure reference to a FatPtr because @@ -1046,9 +1167,13 @@ fn try_receive_samples( /// # Parameters /// * `scratch` - Mutable reference to the sample container /// * `max_samples` - Maximum number of samples to maintain in the container +/// * `buffer_overflow_check` - When `true`, incoming samples are **skipped** when the buffer is +/// full, so the caller can detect overflow from the FFI count. When `false`, the oldest +/// sample is **dropped** to make room for the new one (sliding-window behaviour). pub fn create_sample_callback<'a, T: CommData + Debug>( scratch: &'a mut SampleContainer>, max_samples: usize, + buffer_overflow_check: bool, ) -> impl FnMut(*mut sample_ptr_rs::SamplePtr) + 'a { move |raw_sample: *mut sample_ptr_rs::SamplePtr| { if !raw_sample.is_null() { @@ -1064,15 +1189,26 @@ pub fn create_sample_callback<'a, T: CommData + Debug>( data: ManuallyDrop::new(sample_ptr), }, }; - while scratch.sample_count() >= max_samples { - scratch.pop_front(); + if buffer_overflow_check { + // Stream path: skip the incoming sample when full so the caller + // can detect overflow from the FFI count. + if scratch.sample_count() < scratch.capacity() { + assert!( + scratch.push_back(wrapped_sample).is_ok(), + "Failed to push sample into buffer" + ); + } + } else { + // try_receive / receive path: drop the oldest sample to make room + // so the buffer always contains the newest samples (sliding window). + while scratch.sample_count() >= max_samples { + scratch.pop_front(); + } + assert!( + scratch.push_back(wrapped_sample).is_ok(), + "Failed to push sample after making room in buffer" + ); } - // After pop from SampleContainer to make room, - // push should always succeed, otherwise we lose the data - assert!( - scratch.push_back(wrapped_sample).is_ok(), - "Failed to push sample after making room in buffer" - ); } } } From f5cb98247eb445269fd67a0b3cf8713fdb635ec3 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Mon, 18 May 2026 15:09:30 +0530 Subject: [PATCH 3/8] Rust::com Added the unpin bound to user to return API * Unpin bound required with impl stream return --- .../com-api-concept/com_api_concept.rs | 12 +++++- .../com-api/com-api-runtime-lola/consumer.rs | 37 +++++++++++-------- .../com-api/com-api-runtime-mock/runtime.rs | 2 +- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 88f24c9f6..baf9b16a2 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -736,7 +736,7 @@ impl SampleContainer { } /// Returns the maximum capacity of the container. - /// + /// /// # Returns /// The maximum number of samples that the container can hold. pub fn capacity(&self) -> usize { @@ -939,6 +939,14 @@ pub trait Subscription { /// The stream manages its own internal `SampleContainer`, so it does not take one as a parameter. /// Instead, it yields the updated container each time you poll for new samples. /// + /// Why we need Unpin bound? - + /// We need to add the Unpin bound because the next method of stream required unpin stream to be polled, + /// but the stream return with impl trait does not return as Unpin by default, + /// even though the actual stream implementation might be Unpin. + /// Compiler cannot guarantee that the returned stream is Unpint without the explicit Unpin bound, + /// so we need to add the Unpin bound to ensure that the returned stream can be safely polled without + /// additinal pinning at user side. + /// /// # Parameters /// * `max_samples` - Maximum capacity of the internal buffer /// @@ -950,7 +958,7 @@ pub trait Subscription { fn to_stream<'a>( &'a self, max_samples: usize, - ) -> impl Stream>> + 'a; + ) -> impl Stream>> + Unpin + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index fc18e7074..54201ff05 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -36,8 +36,7 @@ use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::panic; use core::ptr::NonNull; -use futures::future::Either; -use futures::stream::{self, Stream}; +use futures::stream::Stream; use futures::task::{AtomicWaker, Context, Poll}; use std::cmp::Ordering; use std::pin::Pin; @@ -616,19 +615,18 @@ where fn to_stream<'a>( &'a self, max_samples: usize, - ) -> impl Stream>> + 'a { + ) -> impl Stream>> + Unpin + 'a { // Validate max_samples up front - if max_samples > self.max_num_samples { - // This is a single error stream, so it yields only once and populated the error then ends the stream. - return Either::Left(stream::once(async move { - Err(Error::ReceiveError( - ReceiveFailedReason::SampleCountOutOfBounds { - max: self.max_num_samples, - requested: max_samples, - }, - )) - })); - } + let initial_error = if max_samples > self.max_num_samples { + Some(Error::ReceiveError( + ReceiveFailedReason::SampleCountOutOfBounds { + max: self.max_num_samples, + requested: max_samples, + }, + )) + } else { + None + }; // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. @@ -641,12 +639,13 @@ where }); drop(event_guard); // Return stream that yields samples one at a time, fetching new batches as needed. - Either::Right(ToStreamFuture { + ToStreamFuture { subscriber: self, max_samples, sample_container: SampleContainer::new(max_samples), overflow_occurred: false, - }) + initial_error, + } } } @@ -760,6 +759,7 @@ struct ToStreamFuture<'a, T: CommData + Debug> { max_samples: usize, sample_container: SampleContainer>, overflow_occurred: bool, + initial_error: Option, } impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { @@ -769,6 +769,11 @@ impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { //extarcting self from pin to access the fields and modify the fields in the function let this = self.get_mut(); + // Return validation error first if present, then end the stream + if let Some(error) = this.initial_error.take() { + return Poll::Ready(Some(Err(error))); + } + // Yield any buffered samples from a previous batch fetch first. if let Some(sample) = this.sample_container.pop_front() { return Poll::Ready(Some(Ok(sample))); diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index ded24e9ed..d5e247716 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -343,7 +343,7 @@ where fn to_stream<'a>( &'a self, _max_samples: usize, - ) -> impl Stream>> + 'a { + ) -> impl Stream>> + Unpin + 'a { stream::empty() } } From 0b857906fff4fff7cc0ec4983fb37eb1599542d3 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Mon, 18 May 2026 15:13:14 +0530 Subject: [PATCH 4/8] Rust::com Example update for stream api usage * Added Test case for to_stream api usage --- .../basic-consumer-producer.rs | 64 ++++++++++++++++++- .../com-api-concept/com_api_concept.rs | 2 +- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index a7db46cf3..5c40a23ed 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -244,6 +244,7 @@ fn main() { #[cfg(test)] mod test { use super::*; + use futures::stream::StreamExt; use std::sync::OnceLock; use std::thread; use std::time::Duration; @@ -364,7 +365,7 @@ mod test { async fn async_data_sender_fn( offered_producer: VehicleOfferedProducer, ) -> VehicleOfferedProducer { - for i in 0..10 { + for i in 0..6 { let uninit_sample = match offered_producer.left_tire.allocate() { Ok(sample) => sample, Err(e) => { @@ -393,7 +394,7 @@ mod test { ) { println!("[RECEIVER] Async data processor started"); let mut buffer = SampleContainer::new(5); - for _ in 0..5 { + for _ in 0..3 { let (returned_buf, result) = if is_timeout { let timeout = tokio::time::sleep(Duration::from_millis(1000)); subscribed.cancellable_receive(buffer, 2, 3, timeout).await @@ -412,6 +413,25 @@ mod test { } } + async fn stream_processor_fn(subscribed: impl Subscription) { + let mut stream = subscribed.to_stream(5); + let mut cnt = 5; + println!("[RECEIVER] Stream processor started"); + while let Some(sample_result) = stream.next().await { + match sample_result { + Ok(sample) => println!( + "[RECEIVER] Stream received sample: {:.2} psi", + sample.pressure + ), + Err(e) => eprintln!("[RECEIVER] Stream error: {:?}", e), + } + cnt -= 1; + if cnt == 0 { + break; + } + } + } + // Test case: Async subscription and sending on multi-threaded runtime // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads #[tokio::test(flavor = "multi_thread")] @@ -451,6 +471,7 @@ mod test { } #[tokio::test(flavor = "multi_thread")] + #[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"] async fn receive_with_timeout_and_send_using_multi_thread() { println!("Starting async subscription test with Lola runtime"); //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. @@ -477,6 +498,45 @@ mod test { processor_join_handle .await .expect("Error returned from task"); + + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + // Test case: Async subscription and sending on multi-threaded runtime + // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads + #[tokio::test(flavor = "multi_thread")] + async fn stream_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed)); + stream_processor_join_handle + .await + .expect("Error returned from stream processor task"); + let producer = sender_join_handle.await.expect("Error returned from task"); match producer.unoffer() { diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index baf9b16a2..ddf6fb5c7 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -943,7 +943,7 @@ pub trait Subscription { /// We need to add the Unpin bound because the next method of stream required unpin stream to be polled, /// but the stream return with impl trait does not return as Unpin by default, /// even though the actual stream implementation might be Unpin. - /// Compiler cannot guarantee that the returned stream is Unpint without the explicit Unpin bound, + /// Compiler cannot guarantee that the returned stream is Unpin without the explicit Unpin bound, /// so we need to add the Unpin bound to ensure that the returned stream can be safely polled without /// additinal pinning at user side. /// From c8a5cbb65a5e9089170e36af9571079de2618e5e Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Mon, 25 May 2026 09:47:41 +0530 Subject: [PATCH 5/8] Rust::com Update the Stream API Implementation * Removed the max sample paramter from API * Removed the overflow parameter --- .../basic-consumer-producer.rs | 29 ++-- .../com-api-concept/com_api_concept.rs | 48 +++--- .../com-api/com-api-runtime-lola/consumer.rs | 139 ++++-------------- .../com-api/com-api-runtime-mock/runtime.rs | 5 +- ...com_api_async_stream_api_lola_runtime.puml | 17 +-- 5 files changed, 74 insertions(+), 164 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 5c40a23ed..4d0186bad 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -414,21 +414,26 @@ mod test { } async fn stream_processor_fn(subscribed: impl Subscription) { - let mut stream = subscribed.to_stream(5); - let mut cnt = 5; + let mut stream = subscribed.to_stream(); + let mut cnt = 5usize; println!("[RECEIVER] Stream processor started"); - while let Some(sample_result) = stream.next().await { - match sample_result { - Ok(sample) => println!( - "[RECEIVER] Stream received sample: {:.2} psi", - sample.pressure - ), - Err(e) => eprintln!("[RECEIVER] Stream error: {:?}", e), + while cnt > 0 { + // Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription + match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await { + Ok(Some(Ok(sample))) => { + println!( + "[RECEIVER] Stream received sample: {:.2} psi", + sample.pressure + ) + } + Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e), + Ok(None) => break, + Err(_) => { + eprintln!("[RECEIVER] Timeout while waiting for stream sample"); + break; + } } cnt -= 1; - if cnt == 0 { - break; - } } } diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index ddf6fb5c7..d081024b7 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -921,44 +921,38 @@ pub trait Subscription { cancellation: impl Future + Send + 'static, ) -> impl Future>, Result)> + 'a; - /// Returns a stream that continuously yields `SampleContainer` whenever at least `new_samples` are available. + /// This method returns a stream that yields samples from the subscription as they become available. /// /// The stream creates and maintains an internal `SampleContainer` buffer during its lifetime, - /// automatically collecting samples from the communication buffer. The buffer is created when - /// the stream is created and persists until the subscription is dropped. + /// and the buffer is created when the stream is created and persists until the stream is dropped. + /// The internal buffer has a maximum capacity of subscribed slots size (max_num_sample). + /// It does not fetch the samples in background, + /// samples only get fetched when the stream is polled and the internal buffer is empty, + /// otherwise the stream will yield samples from the internal buffer. /// /// **Polling Behavior:** - /// When user poll the stream, it checks if the internal buffer contains at least `1` sample. - /// If yes, it yields those samples to user. If not, it returns pending and waits for samples. + /// On each `poll_next()`: + /// - First, any buffered samples from a previous batch fetch are yielded (one sample at a time). + /// - Otherwise, new samples are fetched from the communication buffer. If at least one sample + /// is available, the first one is yielded. If no samples are available, `Poll::Pending` is + /// returned and the waker is registered to be notified when new samples arrive. /// - /// **Buffer Management:** - /// The internal buffer has a maximum capacity of `max_samples`. When the buffer is full and - /// new samples arrive, the oldest samples are automatically dropped to make room. This allows - /// continuous sample flow without requiring repeated `receive()` calls. - /// - /// The stream manages its own internal `SampleContainer`, so it does not take one as a parameter. - /// Instead, it yields the updated container each time you poll for new samples. + /// The lifetime `'a` ties all yielded `Sample<'a>` to the subscription borrow, ensuring + /// samples cannot outlive the subscription that manages the underlying proxy connection. /// /// Why we need Unpin bound? - - /// We need to add the Unpin bound because the next method of stream required unpin stream to be polled, - /// but the stream return with impl trait does not return as Unpin by default, - /// even though the actual stream implementation might be Unpin. - /// Compiler cannot guarantee that the returned stream is Unpin without the explicit Unpin bound, - /// so we need to add the Unpin bound to ensure that the returned stream can be safely polled without - /// additinal pinning at user side. - /// - /// # Parameters - /// * `max_samples` - Maximum capacity of the internal buffer + /// We need to add the Unpin bound because the `next()` method of `Stream` requires the stream + /// to be polled, but the stream returned with `impl Trait` does not return as `Unpin` by default, + /// even though the actual stream implementation might be `Unpin`. The compiler cannot guarantee + /// that the returned stream is `Unpin` without the explicit bound, so we add it to ensure that + /// the returned stream can be safely polled without additional pinning at the user side. /// /// # Returns - /// A stream that yields `Sample` with at least `1` event each poll + /// A stream that yields individual `Sample<'a>` items one at a time. /// /// # Errors - /// Returns an error if a problem occurs during sample reception - fn to_stream<'a>( - &'a self, - max_samples: usize, - ) -> impl Stream>> + Unpin + 'a; + /// Returns an error if a problem occurs during sample reception. + fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 54201ff05..1212aa3ce 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -567,7 +567,6 @@ where scratch, self.max_num_samples, max_samples, - false, ) } @@ -612,22 +611,7 @@ where } } - fn to_stream<'a>( - &'a self, - max_samples: usize, - ) -> impl Stream>> + Unpin + 'a { - // Validate max_samples up front - let initial_error = if max_samples > self.max_num_samples { - Some(Error::ReceiveError( - ReceiveFailedReason::SampleCountOutOfBounds { - max: self.max_num_samples, - requested: max_samples, - }, - )) - } else { - None - }; - + fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a { // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. let mut event_guard = self.event.get_proxy_event(); @@ -641,10 +625,7 @@ where // Return stream that yields samples one at a time, fetching new batches as needed. ToStreamFuture { subscriber: self, - max_samples, - sample_container: SampleContainer::new(max_samples), - overflow_occurred: false, - initial_error, + sample_container: SampleContainer::new(self.max_num_samples), } } } @@ -699,7 +680,6 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' &mut scratch, max_num_samples, max_samples - total_received, - false, ); self.scratch = Some(scratch); result @@ -743,93 +723,46 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' } /// A `Stream` that continuously delivers one sample at a time from the subscription. -/// -/// Internally maintains a `SampleContainer` whose lifetime equals the stream's lifetime. -/// On each `poll_next`: -/// - Buffered samples (leftover from a previous batch fetch) are yielded first. -/// - Once the buffer is drained, if a previous fetch detected overflow (more samples arrived -/// than `max_samples`), `Err(BufferOverflow)` is returned once, then the stream resumes. -/// - Otherwise the waker is registered, new samples are fetched from the FFI layer, and the -/// first sample is returned. `Pending` is returned when no samples are currently available. -/// -/// The lifetime `'a` ties all yielded `Sample<'a>` to the subscription borrow, ensuring -/// samples cannot outlive the subscription that manages the underlying C++ proxy. +/// It maintains an internal `SampleContainer` to buffer samples received from the FFI callback. +/// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback. struct ToStreamFuture<'a, T: CommData + Debug> { subscriber: &'a SubscriberImpl, - max_samples: usize, sample_container: SampleContainer>, - overflow_occurred: bool, - initial_error: Option, } impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { type Item = Result>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - //extarcting self from pin to access the fields and modify the fields in the function - let this = self.get_mut(); - - // Return validation error first if present, then end the stream - if let Some(error) = this.initial_error.take() { - return Poll::Ready(Some(Err(error))); - } - + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Yield any buffered samples from a previous batch fetch first. - if let Some(sample) = this.sample_container.pop_front() { + if let Some(sample) = self.sample_container.pop_front() { return Poll::Ready(Some(Ok(sample))); } - // This is condition is rare because already we are yeilding samples and once the buffer is - // empty then only we are fetching again but if C++ side we get more sample then the current - // buffer can hold then we are setting the overflow_occurred flag and once the avaiable - // samples are yeilded then we are populating error to user and reset the flag. - // Why we are not ruturnig error immediately or next call because we have already buffered - // sample which is valid so retuning those sample first and then returning error - // to user and then resuming the stream for next incoming sample. - if this.overflow_occurred { - this.overflow_occurred = false; - return Poll::Ready(Some(Err(Error::ReceiveError( - ReceiveFailedReason::BufferOverflow { - max: this.max_samples, - }, - )))); - } - // Register the waker before attempting to receive so that a concurrent FFI // callback cannot fire between the receive attempt and registering the waker, // which would cause a missed wake-up. - this.subscriber.waker_storage.register(cx.waker()); - let max_samples = this.max_samples; - let max_num_samples = this.subscriber.max_num_samples; + self.subscriber.waker_storage.register(cx.waker()); + let max_num_samples = self.subscriber.max_num_samples; // Acquire exclusive access to the proxy event for this poll. - let mut event_guard = this.subscriber.event.get_proxy_event(); - // Fetch new samples into the container. Excess samples (beyond max_samples) are - // skipped by the callback, the raw FFI count lets us detect the overflow. + let mut event_guard = self.subscriber.event.get_proxy_event(); + // Fetch new samples into the container let result = try_receive_samples::( event_guard.deref_mut(), - &mut this.sample_container, + &mut self.sample_container, + max_num_samples, max_num_samples, - max_samples, - true, ); // Release exclusive event access before yielding control. drop(event_guard); match result { - Ok(count) => { - // FFI delivered more samples than the buffer can hold — overflow. - // Remaining buffered samples are yielded first (top of this function); - // the error is surfaced once the buffer is fully drained. - if count > max_samples { - this.overflow_occurred = true; - } - match this.sample_container.pop_front() { - Some(sample) => Poll::Ready(Some(Ok(sample))), - None => Poll::Pending, - } - } + Ok(_count) => match self.sample_container.pop_front() { + Some(sample) => Poll::Ready(Some(Ok(sample))), + None => Poll::Pending, + }, Err(e) => Poll::Ready(Some(Err(e))), } } @@ -1105,15 +1038,11 @@ impl ConsumerDescriptor for LolaConsumerBuilder( event: &mut ProxyEventBase, scratch: &mut SampleContainer>, max_num_samples: usize, max_samples: usize, - buffer_overflow_check: bool, ) -> Result { if max_samples == 0 { return Err(Error::ReceiveError( @@ -1132,7 +1061,7 @@ fn try_receive_samples( )); } // Create a callback that will be called by the C++ side for each new sample arrival - let mut callback = create_sample_callback(scratch, max_samples, buffer_overflow_check); + let mut callback = create_sample_callback(scratch, max_samples); // Convert closure to FatPtr for C++ callback let dyn_callback: &mut dyn FnMut(*mut sample_ptr_rs::SamplePtr) = &mut callback; // SAFETY: it is safe to transmute the closure reference to a FatPtr because @@ -1172,13 +1101,9 @@ fn try_receive_samples( /// # Parameters /// * `scratch` - Mutable reference to the sample container /// * `max_samples` - Maximum number of samples to maintain in the container -/// * `buffer_overflow_check` - When `true`, incoming samples are **skipped** when the buffer is -/// full, so the caller can detect overflow from the FFI count. When `false`, the oldest -/// sample is **dropped** to make room for the new one (sliding-window behaviour). pub fn create_sample_callback<'a, T: CommData + Debug>( scratch: &'a mut SampleContainer>, max_samples: usize, - buffer_overflow_check: bool, ) -> impl FnMut(*mut sample_ptr_rs::SamplePtr) + 'a { move |raw_sample: *mut sample_ptr_rs::SamplePtr| { if !raw_sample.is_null() { @@ -1194,26 +1119,18 @@ pub fn create_sample_callback<'a, T: CommData + Debug>( data: ManuallyDrop::new(sample_ptr), }, }; - if buffer_overflow_check { - // Stream path: skip the incoming sample when full so the caller - // can detect overflow from the FFI count. - if scratch.sample_count() < scratch.capacity() { - assert!( - scratch.push_back(wrapped_sample).is_ok(), - "Failed to push sample into buffer" - ); - } - } else { - // try_receive / receive path: drop the oldest sample to make room - // so the buffer always contains the newest samples (sliding window). - while scratch.sample_count() >= max_samples { - scratch.pop_front(); - } - assert!( - scratch.push_back(wrapped_sample).is_ok(), - "Failed to push sample after making room in buffer" - ); + + // try_receive / receive path: drop the oldest sample to make room + // so the buffer always contains the newest samples (sliding window). + // But for stream container already empty the buffer before receiving new samples, + // so it will not drop old samples when new samples arrive. + while scratch.sample_count() >= max_samples { + scratch.pop_front(); } + assert!( + scratch.push_back(wrapped_sample).is_ok(), + "Failed to push sample after making room in buffer" + ); } } } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index d5e247716..0325533d3 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -340,10 +340,7 @@ where } #[allow(clippy::manual_async_fn)] - fn to_stream<'a>( - &'a self, - _max_samples: usize, - ) -> impl Stream>> + Unpin + 'a { + fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a { stream::empty() } } diff --git a/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml index c3f84134a..8705c2341 100644 --- a/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml +++ b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml @@ -18,24 +18,21 @@ participant "Subscription" as Subscription participant "SampleContainer" as SampleContainer participant "Lola" as Lola -User -> Subscription: stream(new_samples, max_samples) -Subscription --> SampleContainer: Create with max_samples capacity - -Note over Lola: Backend asynchronously\npushes samples in the SampleContainer +User -> Subscription: Subscription::to_stream() +Subscription --> SampleContainer: Create internal buffer (fixed capacity) +Note over SampleContainer: No background push, samples are fetched during poll and stored in the buffer loop User polls the stream User -> Subscription: poll_next().await activate Subscription - Subscription -> SampleContainer: Check samples >= 1 + Subscription -> SampleContainer: Check for >= 1 sample - alt Samples available - Subscription --> User: Ready(Samples) - else Waiting for Samples + alt Sample available in buffer + Subscription --> User: Ready(Sample) + else No samples available Subscription --> User: Pending end deactivate Subscription end -Note over SampleContainer: Oldest samples dropped\nwhen buffer full - @enduml From 95cd73f38b160fee2c1f93ab49d7be3ad1c75488 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Mon, 25 May 2026 11:57:55 +0530 Subject: [PATCH 6/8] Rust::com Stream implementaion update * Stream taking ProxyEventManagerGuard as value * Updated field type to Option --- .../com-api-concept/com_api_concept.rs | 8 -- .../rust/com-api/com-api-concept/error.rs | 2 + .../com-api/com-api-runtime-lola/consumer.rs | 86 +++++++++++++------ 3 files changed, 60 insertions(+), 36 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index d081024b7..409ac7e77 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -734,14 +734,6 @@ impl SampleContainer { { self.inner.front().map(::deref) } - - /// Returns the maximum capacity of the container. - /// - /// # Returns - /// The maximum number of samples that the container can hold. - pub fn capacity(&self) -> usize { - self.inner.capacity() - } } /// Active event subscription with polling and async receive capabilities. diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index a9eed00b3..b9e8a6717 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -92,6 +92,8 @@ pub enum EventFailedReason { SendingDataFailed, #[error("Event not available for subscription, possibly due to missing event type or incompatible service")] EventNotAvailable, + #[error("Failed to subscribe to event, due to the max_samples parameter being invalid (e.g., zero or exceeding allowed limits)")] + InvalidMaxSamples, } /// Error enumeration for different failure cases in the Consumer/Producer/Runtime APIs. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 1212aa3ce..a6d3387f7 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -305,6 +305,9 @@ impl Subscriber for SubscribableImpl }) } fn subscribe(self, max_num_samples: usize) -> Result { + if max_num_samples == 0 { + return Err(Error::EventError(EventFailedReason::InvalidMaxSamples)); + } let instance_info = self.instance_info.clone(); let event_instance = NativeProxyEventBase::new( &self.proxy_instance.0.proxy, @@ -613,19 +616,20 @@ where fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a { // Get the event guard to ensure no concurrent receive calls - // on the same subscriber instance. - let mut event_guard = self.event.get_proxy_event(); + // on the same subscriber instance. This guard is held for the lifetime of the stream. + let mut proxy_event_guard = self.event.get_proxy_event(); // Initialize the async receive callback only once when the first receive call is made // We are using std::sync::Once to ensure that the callback is set only once. self.async_init_status.call_once(|| { - self.init_async_receive(&mut event_guard) + self.init_async_receive(&mut proxy_event_guard) .expect("Failed to initialize async receive callback"); }); - drop(event_guard); // Return stream that yields samples one at a time, fetching new batches as needed. - ToStreamFuture { + // The guard is moved into the stream and held for its lifetime to prevent concurrent receives. + SampleStream { subscriber: self, - sample_container: SampleContainer::new(self.max_num_samples), + sample_container: Some(SampleContainer::new(self.max_num_samples)), + event_guard: Some(proxy_event_guard), } } } @@ -724,19 +728,26 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' /// A `Stream` that continuously delivers one sample at a time from the subscription. /// It maintains an internal `SampleContainer` to buffer samples received from the FFI callback. +/// It also holds an exclusive `ProxyEventManagerGuard` for the lifetime of the stream to prevent +/// concurrent receives on the same subscriber instance. /// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback. -struct ToStreamFuture<'a, T: CommData + Debug> { +struct SampleStream<'a, T: CommData + Debug> { subscriber: &'a SubscriberImpl, - sample_container: SampleContainer>, + sample_container: Option>>, + event_guard: Option>, } -impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { +impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Yield any buffered samples from a previous batch fetch first. - if let Some(sample) = self.sample_container.pop_front() { - return Poll::Ready(Some(Ok(sample))); + if let Some(mut container) = self.sample_container.take() { + if let Some(sample) = container.pop_front() { + self.sample_container = Some(container); + return Poll::Ready(Some(Ok(sample))); + } + self.sample_container = Some(container); } // Register the waker before attempting to receive so that a concurrent FFI @@ -745,24 +756,43 @@ impl<'a, T: CommData + Debug> Stream for ToStreamFuture<'a, T> { self.subscriber.waker_storage.register(cx.waker()); let max_num_samples = self.subscriber.max_num_samples; - // Acquire exclusive access to the proxy event for this poll. - let mut event_guard = self.subscriber.event.get_proxy_event(); - // Fetch new samples into the container - let result = try_receive_samples::( - event_guard.deref_mut(), - &mut self.sample_container, - max_num_samples, - max_num_samples, - ); - - // Release exclusive event access before yielding control. - drop(event_guard); + let samples_received = { + // Temporarily take ownership of scratch to avoid borrow issues + if let Some(mut scratch) = self.sample_container.take() { + if let Some(event_guard) = self.event_guard.as_mut() { + let result = try_receive_samples::( + event_guard.deref_mut(), + &mut scratch, + max_num_samples, + max_num_samples, + ); + self.sample_container = Some(scratch); + result + } else { + Err(Error::ReceiveError(ReceiveFailedReason::ReceiveError)) + } + } else { + Err(Error::ReceiveError(ReceiveFailedReason::BufferUnavailable)) + } + }; - match result { - Ok(_count) => match self.sample_container.pop_front() { - Some(sample) => Poll::Ready(Some(Ok(sample))), - None => Poll::Pending, - }, + match samples_received { + Ok(_count) => { + if let Some(mut container) = self.sample_container.take() { + match container.pop_front() { + Some(sample) => { + self.sample_container = Some(container); + Poll::Ready(Some(Ok(sample))) + } + None => { + self.sample_container = Some(container); + Poll::Pending + } + } + } else { + Poll::Pending + } + } Err(e) => Poll::Ready(Some(Err(e))), } } From 38fc999ea620820a7560fbf0d51c1cb85bd812e6 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Sun, 31 May 2026 08:55:26 +0530 Subject: [PATCH 7/8] Rust::com Stream API Update * API is taking mut self * Removed Option for internal field --- .../basic-consumer-producer.rs | 2 +- .../com-api-concept/com_api_concept.rs | 2 +- .../com-api/com-api-runtime-lola/consumer.rs | 63 ++++++------------- .../com-api/com-api-runtime-mock/runtime.rs | 2 +- 4 files changed, 23 insertions(+), 46 deletions(-) diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 4d0186bad..90079e61f 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -413,7 +413,7 @@ mod test { } } - async fn stream_processor_fn(subscribed: impl Subscription) { + async fn stream_processor_fn(mut subscribed: impl Subscription) { let mut stream = subscribed.to_stream(); let mut cnt = 5usize; println!("[RECEIVER] Stream processor started"); diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 409ac7e77..20e09491c 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -944,7 +944,7 @@ pub trait Subscription { /// /// # Errors /// Returns an error if a problem occurs during sample reception. - fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a; + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index a6d3387f7..41b96d721 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -614,7 +614,7 @@ where } } - fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a { + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { // Get the event guard to ensure no concurrent receive calls // on the same subscriber instance. This guard is held for the lifetime of the stream. let mut proxy_event_guard = self.event.get_proxy_event(); @@ -628,8 +628,8 @@ where // The guard is moved into the stream and held for its lifetime to prevent concurrent receives. SampleStream { subscriber: self, - sample_container: Some(SampleContainer::new(self.max_num_samples)), - event_guard: Some(proxy_event_guard), + sample_container: SampleContainer::new(self.max_num_samples), + event_guard: proxy_event_guard, } } } @@ -733,8 +733,8 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' /// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback. struct SampleStream<'a, T: CommData + Debug> { subscriber: &'a SubscriberImpl, - sample_container: Option>>, - event_guard: Option>, + sample_container: SampleContainer>, + event_guard: ProxyEventManagerGuard<'a>, } impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { @@ -742,12 +742,8 @@ impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Yield any buffered samples from a previous batch fetch first. - if let Some(mut container) = self.sample_container.take() { - if let Some(sample) = container.pop_front() { - self.sample_container = Some(container); - return Poll::Ready(Some(Ok(sample))); - } - self.sample_container = Some(container); + if let Some(sample) = self.sample_container.pop_front() { + return Poll::Ready(Some(Ok(sample))); } // Register the waker before attempting to receive so that a concurrent FFI @@ -756,41 +752,22 @@ impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { self.subscriber.waker_storage.register(cx.waker()); let max_num_samples = self.subscriber.max_num_samples; - let samples_received = { - // Temporarily take ownership of scratch to avoid borrow issues - if let Some(mut scratch) = self.sample_container.take() { - if let Some(event_guard) = self.event_guard.as_mut() { - let result = try_receive_samples::( - event_guard.deref_mut(), - &mut scratch, - max_num_samples, - max_num_samples, - ); - self.sample_container = Some(scratch); - result - } else { - Err(Error::ReceiveError(ReceiveFailedReason::ReceiveError)) - } - } else { - Err(Error::ReceiveError(ReceiveFailedReason::BufferUnavailable)) - } - }; + // get a mutable reference of pinned self, with this + // we can avoid borrow checker issue for self in try_receive_samples function call + let this = self.as_mut().get_mut(); + + let samples_received = try_receive_samples::( + this.event_guard.deref_mut(), + &mut this.sample_container, + max_num_samples, + max_num_samples, + ); match samples_received { Ok(_count) => { - if let Some(mut container) = self.sample_container.take() { - match container.pop_front() { - Some(sample) => { - self.sample_container = Some(container); - Poll::Ready(Some(Ok(sample))) - } - None => { - self.sample_container = Some(container); - Poll::Pending - } - } - } else { - Poll::Pending + match self.sample_container.pop_front() { + Some(sample) => Poll::Ready(Some(Ok(sample))), + None => Poll::Pending, } } Err(e) => Poll::Ready(Some(Err(e))), diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 0325533d3..d026b68a0 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -340,7 +340,7 @@ where } #[allow(clippy::manual_async_fn)] - fn to_stream<'a>(&'a self) -> impl Stream>> + Unpin + 'a { + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { stream::empty() } } From a0afb0ba61ecbef33aa02b4ce2e406494f5ba450 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Fri, 5 Jun 2026 16:26:32 +0530 Subject: [PATCH 8/8] Rust::com Removed the ProxyEventManagerGuard from SampleStream * Stream already taking mut reference so concurrent guard check is not required --- .../com-api/com-api-concept/com_api_concept.rs | 5 ++++- .../com-api/com-api-runtime-lola/consumer.rs | 16 +++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 20e09491c..79697a168 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -943,7 +943,10 @@ pub trait Subscription { /// A stream that yields individual `Sample<'a>` items one at a time. /// /// # Errors - /// Returns an error if a problem occurs during sample reception. + /// Returns an error if a problem occurs during sample reception but the stream is still processing further samples. + /// If the stream encounters an error, it will yield `Err(Error)` for that sample, but will continue to yield subsequent samples as they become available. + /// The stream only terminates when the subscription is unsubscribed or dropped or if the stream is explicitly dropped by the user. + /// With this design, users can handle errors on it side and take appropriate actions. fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a; } diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 41b96d721..65e2b7887 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -614,14 +614,11 @@ where } } - fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { - // Get the event guard to ensure no concurrent receive calls - // on the same subscriber instance. This guard is held for the lifetime of the stream. - let mut proxy_event_guard = self.event.get_proxy_event(); + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { // Initialize the async receive callback only once when the first receive call is made // We are using std::sync::Once to ensure that the callback is set only once. self.async_init_status.call_once(|| { - self.init_async_receive(&mut proxy_event_guard) + self.init_async_receive(&mut self.event.get_proxy_event()) .expect("Failed to initialize async receive callback"); }); // Return stream that yields samples one at a time, fetching new batches as needed. @@ -629,7 +626,6 @@ where SampleStream { subscriber: self, sample_container: SampleContainer::new(self.max_num_samples), - event_guard: proxy_event_guard, } } } @@ -734,7 +730,6 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' struct SampleStream<'a, T: CommData + Debug> { subscriber: &'a SubscriberImpl, sample_container: SampleContainer>, - event_guard: ProxyEventManagerGuard<'a>, } impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { @@ -757,7 +752,10 @@ impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { let this = self.as_mut().get_mut(); let samples_received = try_receive_samples::( - this.event_guard.deref_mut(), + this.subscriber + .event + .get_proxy_event() + .deref_mut(), // Get mutable reference to the proxy event for FFI call &mut this.sample_container, max_num_samples, max_num_samples, @@ -765,7 +763,7 @@ impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { match samples_received { Ok(_count) => { - match self.sample_container.pop_front() { + match this.sample_container.pop_front() { Some(sample) => Poll::Ready(Some(Ok(sample))), None => Poll::Pending, }