diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index a7db46cf3..90079e61f 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -244,6 +244,7 @@ fn main() { #[cfg(test)] mod test { use super::*; + use futures::stream::StreamExt; use std::sync::OnceLock; use std::thread; use std::time::Duration; @@ -364,7 +365,7 @@ mod test { async fn async_data_sender_fn( offered_producer: VehicleOfferedProducer, ) -> VehicleOfferedProducer { - for i in 0..10 { + for i in 0..6 { let uninit_sample = match offered_producer.left_tire.allocate() { Ok(sample) => sample, Err(e) => { @@ -393,7 +394,7 @@ mod test { ) { println!("[RECEIVER] Async data processor started"); let mut buffer = SampleContainer::new(5); - for _ in 0..5 { + for _ in 0..3 { let (returned_buf, result) = if is_timeout { let timeout = tokio::time::sleep(Duration::from_millis(1000)); subscribed.cancellable_receive(buffer, 2, 3, timeout).await @@ -412,6 +413,30 @@ mod test { } } + async fn stream_processor_fn(mut subscribed: impl Subscription) { + let mut stream = subscribed.to_stream(); + let mut cnt = 5usize; + println!("[RECEIVER] Stream processor started"); + while cnt > 0 { + // Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription + match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await { + Ok(Some(Ok(sample))) => { + println!( + "[RECEIVER] Stream received sample: {:.2} psi", + sample.pressure + ) + } + Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e), + Ok(None) => break, + Err(_) => { + eprintln!("[RECEIVER] Timeout while waiting for stream sample"); + break; + } + } + cnt -= 1; + } + } + // Test case: Async subscription and sending on multi-threaded runtime // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads #[tokio::test(flavor = "multi_thread")] @@ -451,6 +476,7 @@ mod test { } #[tokio::test(flavor = "multi_thread")] + #[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"] async fn receive_with_timeout_and_send_using_multi_thread() { println!("Starting async subscription test with Lola runtime"); //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. @@ -477,6 +503,45 @@ mod test { processor_join_handle .await .expect("Error returned from task"); + + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + // Test case: Async subscription and sending on multi-threaded runtime + // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads + #[tokio::test(flavor = "multi_thread")] + async fn stream_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed)); + stream_processor_join_handle + .await + .expect("Error returned from stream processor task"); + let producer = sender_join_handle.await.expect("Error returned from task"); match producer.unoffer() { diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD index 71be54086..1e8e01851 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/BUILD +++ b/score/mw/com/impl/rust/com-api/com-api-concept/BUILD @@ -32,6 +32,7 @@ rust_library( ], deps = [ "@score_baselibs_rust//src/containers", + "@score_communication_crate_index//:futures", "@score_communication_crate_index//:thiserror", ], ) diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs index 39f5d7780..79697a168 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/com_api_concept.rs @@ -55,6 +55,7 @@ use containers::fixed_capacity::FixedCapacityQueue; use core::fmt::Debug; use core::future::Future; use core::ops::{Deref, DerefMut}; +use futures::stream::Stream; use std::path::Path; /// Result type alias with `std::result::Result` using `com_api::Error` as error type @@ -911,6 +912,42 @@ pub trait Subscription { max_samples: usize, cancellation: impl Future + Send + 'static, ) -> impl Future>, Result)> + 'a; + + /// This method returns a stream that yields samples from the subscription as they become available. + /// + /// The stream creates and maintains an internal `SampleContainer` buffer during its lifetime, + /// and the buffer is created when the stream is created and persists until the stream is dropped. + /// The internal buffer has a maximum capacity of subscribed slots size (max_num_sample). + /// It does not fetch the samples in background, + /// samples only get fetched when the stream is polled and the internal buffer is empty, + /// otherwise the stream will yield samples from the internal buffer. + /// + /// **Polling Behavior:** + /// On each `poll_next()`: + /// - First, any buffered samples from a previous batch fetch are yielded (one sample at a time). + /// - Otherwise, new samples are fetched from the communication buffer. If at least one sample + /// is available, the first one is yielded. If no samples are available, `Poll::Pending` is + /// returned and the waker is registered to be notified when new samples arrive. + /// + /// The lifetime `'a` ties all yielded `Sample<'a>` to the subscription borrow, ensuring + /// samples cannot outlive the subscription that manages the underlying proxy connection. + /// + /// Why we need Unpin bound? - + /// We need to add the Unpin bound because the `next()` method of `Stream` requires the stream + /// to be polled, but the stream returned with `impl Trait` does not return as `Unpin` by default, + /// even though the actual stream implementation might be `Unpin`. The compiler cannot guarantee + /// that the returned stream is `Unpin` without the explicit bound, so we add it to ensure that + /// the returned stream can be safely polled without additional pinning at the user side. + /// + /// # Returns + /// A stream that yields individual `Sample<'a>` items one at a time. + /// + /// # Errors + /// Returns an error if a problem occurs during sample reception but the stream is still processing further samples. + /// If the stream encounters an error, it will yield `Err(Error)` for that sample, but will continue to yield subsequent samples as they become available. + /// The stream only terminates when the subscription is unsubscribed or dropped or if the stream is explicitly dropped by the user. + /// With this design, users can handle errors on it side and take appropriate actions. + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a; } /// A trait for types that can be default-constructed in place, skipping intermediate moves. diff --git a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs index 95d680626..b9e8a6717 100644 --- a/score/mw/com/impl/rust/com-api/com-api-concept/error.rs +++ b/score/mw/com/impl/rust/com-api/com-api-concept/error.rs @@ -77,6 +77,8 @@ pub enum ReceiveFailedReason { Cancelled, #[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")] InputValueOutOfBounds { max: usize, requested: usize }, + #[error("Stream buffer overflow: capacity is {max}, excess samples were discarded")] + BufferOverflow { max: usize }, } /// Comprehensive error reasons for event-related failures @@ -90,6 +92,8 @@ pub enum EventFailedReason { SendingDataFailed, #[error("Event not available for subscription, possibly due to missing event type or incompatible service")] EventNotAvailable, + #[error("Failed to subscribe to event, due to the max_samples parameter being invalid (e.g., zero or exceeding allowed limits)")] + InvalidMaxSamples, } /// Error enumeration for different failure cases in the Consumer/Producer/Runtime APIs. diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index 8404a5702..65e2b7887 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::panic; use core::ptr::NonNull; +use futures::stream::Stream; use futures::task::{AtomicWaker, Context, Poll}; use std::cmp::Ordering; use std::pin::Pin; @@ -304,6 +305,9 @@ impl Subscriber for SubscribableImpl }) } fn subscribe(self, max_num_samples: usize) -> Result { + if max_num_samples == 0 { + return Err(Error::EventError(EventFailedReason::InvalidMaxSamples)); + } let instance_info = self.instance_info.clone(); let event_instance = NativeProxyEventBase::new( &self.proxy_instance.0.proxy, @@ -609,6 +613,21 @@ where .await } } + + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { + // Initialize the async receive callback only once when the first receive call is made + // We are using std::sync::Once to ensure that the callback is set only once. + self.async_init_status.call_once(|| { + self.init_async_receive(&mut self.event.get_proxy_event()) + .expect("Failed to initialize async receive callback"); + }); + // Return stream that yields samples one at a time, fetching new batches as needed. + // The guard is moved into the stream and held for its lifetime to prevent concurrent receives. + SampleStream { + subscriber: self, + sample_container: SampleContainer::new(self.max_num_samples), + } + } } // The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples @@ -703,6 +722,57 @@ impl<'a, T: CommData + Debug, F: Future> Future for ReceiveFuture<' } } +/// A `Stream` that continuously delivers one sample at a time from the subscription. +/// It maintains an internal `SampleContainer` to buffer samples received from the FFI callback. +/// It also holds an exclusive `ProxyEventManagerGuard` for the lifetime of the stream to prevent +/// concurrent receives on the same subscriber instance. +/// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback. +struct SampleStream<'a, T: CommData + Debug> { + subscriber: &'a SubscriberImpl, + sample_container: SampleContainer>, +} + +impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Yield any buffered samples from a previous batch fetch first. + if let Some(sample) = self.sample_container.pop_front() { + return Poll::Ready(Some(Ok(sample))); + } + + // Register the waker before attempting to receive so that a concurrent FFI + // callback cannot fire between the receive attempt and registering the waker, + // which would cause a missed wake-up. + self.subscriber.waker_storage.register(cx.waker()); + let max_num_samples = self.subscriber.max_num_samples; + + // get a mutable reference of pinned self, with this + // we can avoid borrow checker issue for self in try_receive_samples function call + let this = self.as_mut().get_mut(); + + let samples_received = try_receive_samples::( + this.subscriber + .event + .get_proxy_event() + .deref_mut(), // Get mutable reference to the proxy event for FFI call + &mut this.sample_container, + max_num_samples, + max_num_samples, + ); + + match samples_received { + Ok(_count) => { + match this.sample_container.pop_front() { + Some(sample) => Poll::Ready(Some(Ok(sample))), + None => Poll::Pending, + } + } + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + /// Holds the shared state populated asynchronously by the C++ service discovery callback. /// /// `handles` is wrapped in `Option` because: @@ -1054,11 +1124,14 @@ pub fn create_sample_callback<'a, T: CommData + Debug>( data: ManuallyDrop::new(sample_ptr), }, }; + + // try_receive / receive path: drop the oldest sample to make room + // so the buffer always contains the newest samples (sliding window). + // But for stream container already empty the buffer before receiving new samples, + // so it will not drop old samples when new samples arrive. while scratch.sample_count() >= max_samples { scratch.pop_front(); } - // After pop from SampleContainer to make room, - // push should always succeed, otherwise we lose the data assert!( scratch.push_back(wrapped_sample).is_ok(), "Failed to push sample after making room in buffer" diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs index 8f0a4dc03..d026b68a0 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-mock/runtime.rs @@ -31,6 +31,7 @@ use core::marker::PhantomData; use core::mem::MaybeUninit; use core::ops::{Deref, DerefMut}; use core::sync::atomic::AtomicUsize; +use futures::stream::{self, Stream}; use std::collections::VecDeque; use std::path::Path; @@ -337,6 +338,11 @@ where ) -> impl Future>, Result)> + 'a { async { todo!() } } + + #[allow(clippy::manual_async_fn)] + fn to_stream<'a>(&'a mut self) -> impl Stream>> + Unpin + 'a { + stream::empty() + } } pub struct Publisher { diff --git a/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml new file mode 100644 index 000000000..8705c2341 --- /dev/null +++ b/score/mw/com/impl/rust/doc/sequence_diagram/com_api_async_stream_api_lola_runtime.puml @@ -0,0 +1,38 @@ +@startuml com_api_stream_api_lola_runtime + +' Copyright (c) 2026 Contributors to the Eclipse Foundation +' +' See the NOTICE file(s) distributed with this work for additional +' information regarding copyright ownership. +' +' This program and the accompanying materials are made available under the +' terms of the Apache License Version 2.0 which is available at +' +' +' SPDX-License-Identifier: Apache-2.0 + +title Async Stream API in Lola Runtime + +actor User +participant "Subscription" as Subscription +participant "SampleContainer" as SampleContainer +participant "Lola" as Lola + +User -> Subscription: Subscription::to_stream() +Subscription --> SampleContainer: Create internal buffer (fixed capacity) + +Note over SampleContainer: No background push, samples are fetched during poll and stored in the buffer +loop User polls the stream + User -> Subscription: poll_next().await + activate Subscription + Subscription -> SampleContainer: Check for >= 1 sample + + alt Sample available in buffer + Subscription --> User: Ready(Sample) + else No samples available + Subscription --> User: Pending + end + deactivate Subscription +end + +@enduml