-
Notifications
You must be signed in to change notification settings - Fork 84
Rust::com Rust Stream API Implementation #480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6021b70
f0a3b5c
f5cb982
0b85790
c8a5cbb
95cd73f
38fc999
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -244,6 +244,7 @@ fn main() { | |
| #[cfg(test)] | ||
| mod test { | ||
| use super::*; | ||
| use futures::stream::StreamExt; | ||
| use std::sync::OnceLock; | ||
| use std::thread; | ||
| use std::time::Duration; | ||
|
|
@@ -364,7 +365,7 @@ mod test { | |
| async fn async_data_sender_fn<R: Runtime>( | ||
| offered_producer: VehicleOfferedProducer<R>, | ||
| ) -> VehicleOfferedProducer<R> { | ||
| for i in 0..10 { | ||
| for i in 0..6 { | ||
| let uninit_sample = match offered_producer.left_tire.allocate() { | ||
| Ok(sample) => sample, | ||
| Err(e) => { | ||
|
|
@@ -393,7 +394,7 @@ mod test { | |
| ) { | ||
| println!("[RECEIVER] Async data processor started"); | ||
| let mut buffer = SampleContainer::new(5); | ||
| for _ in 0..5 { | ||
| for _ in 0..3 { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is 3 better than 5? Consider turning these numbers into constants and explain why they have the limit they have.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we are testing developed API using this example app test, it takes some time to run because of that iteration is reduced but we will optimize this example app from test based to proper API function-based testing using user input from CLI , ticket for same #489 |
||
| let (returned_buf, result) = if is_timeout { | ||
| let timeout = tokio::time::sleep(Duration::from_millis(1000)); | ||
| subscribed.cancellable_receive(buffer, 2, 3, timeout).await | ||
|
|
@@ -412,6 +413,30 @@ mod test { | |
| } | ||
| } | ||
|
|
||
| async fn stream_processor_fn<R: Runtime>(mut subscribed: impl Subscription<Tire, R>) { | ||
| let mut stream = subscribed.to_stream(); | ||
| let mut cnt = 5usize; | ||
| println!("[RECEIVER] Stream processor started"); | ||
| while cnt > 0 { | ||
| // Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription | ||
| match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await { | ||
| Ok(Some(Ok(sample))) => { | ||
| println!( | ||
| "[RECEIVER] Stream received sample: {:.2} psi", | ||
| sample.pressure | ||
| ) | ||
| } | ||
| Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e), | ||
| Ok(None) => break, | ||
| Err(_) => { | ||
| eprintln!("[RECEIVER] Timeout while waiting for stream sample"); | ||
| break; | ||
| } | ||
| } | ||
| cnt -= 1; | ||
| } | ||
| } | ||
|
|
||
| // Test case: Async subscription and sending on multi-threaded runtime | ||
| // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads | ||
| #[tokio::test(flavor = "multi_thread")] | ||
|
|
@@ -451,6 +476,7 @@ mod test { | |
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| #[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"] | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test cases ignored in example app because of it required separate service instance. |
||
| async fn receive_with_timeout_and_send_using_multi_thread() { | ||
| println!("Starting async subscription test with Lola runtime"); | ||
| //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. | ||
|
|
@@ -477,6 +503,45 @@ mod test { | |
| processor_join_handle | ||
| .await | ||
| .expect("Error returned from task"); | ||
|
|
||
| let producer = sender_join_handle.await.expect("Error returned from task"); | ||
|
|
||
| match producer.unoffer() { | ||
| Ok(_) => println!("Successfully unoffered the service"), | ||
| Err(e) => eprintln!("Failed to unoffer: {:?}", e), | ||
| } | ||
|
|
||
| println!("=== Async subscription test with Lola runtime completed ===\n"); | ||
| } | ||
|
|
||
| // Test case: Async subscription and sending on multi-threaded runtime | ||
| // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads | ||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn stream_and_send_using_multi_thread() { | ||
| println!("Starting async subscription test with Lola runtime"); | ||
| let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") | ||
| .expect("Failed to create InstanceSpecifier"); | ||
| let service_id_clone = service_id.clone(); | ||
| //consumer create | ||
| let consumer_runtime = get_test_runtime(); | ||
| //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result | ||
| let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); | ||
| //simulate some delay before producer offer service, so that consumer is waiting for discovery | ||
| tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; | ||
| //Producer create | ||
| let producer_runtime = get_test_runtime(); | ||
| let producer = create_producer(producer_runtime, service_id_clone); | ||
| // Spawn async data sender | ||
| let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); | ||
| // Await consumer creation and subscribe to events | ||
| let consumer = consumer.await.expect("Failed to create consumer"); | ||
| // Subscribe to one event | ||
| let subscribed = consumer.left_tire.subscribe(5).unwrap(); | ||
| let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed)); | ||
| stream_processor_join_handle | ||
| .await | ||
| .expect("Error returned from stream processor task"); | ||
|
|
||
| let producer = sender_join_handle.await.expect("Error returned from task"); | ||
|
|
||
| match producer.unoffer() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ use core::mem::ManuallyDrop; | |
| use core::ops::{Deref, DerefMut}; | ||
| use core::panic; | ||
| use core::ptr::NonNull; | ||
| use futures::stream::Stream; | ||
| use futures::task::{AtomicWaker, Context, Poll}; | ||
| use std::cmp::Ordering; | ||
| use std::pin::Pin; | ||
|
|
@@ -304,6 +305,9 @@ impl<T: CommData + Debug> Subscriber<T, LolaRuntimeImpl> for SubscribableImpl<T> | |
| }) | ||
| } | ||
| fn subscribe(self, max_num_samples: usize) -> Result<Self::Subscription> { | ||
| if max_num_samples == 0 { | ||
| return Err(Error::EventError(EventFailedReason::InvalidMaxSamples)); | ||
| } | ||
| let instance_info = self.instance_info.clone(); | ||
| let event_instance = NativeProxyEventBase::new( | ||
| &self.proxy_instance.0.proxy, | ||
|
|
@@ -609,6 +613,25 @@ where | |
| .await | ||
| } | ||
| } | ||
|
|
||
| fn to_stream<'a>(&'a mut self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a { | ||
| // Get the event guard to ensure no concurrent receive calls | ||
| // on the same subscriber instance. This guard is held for the lifetime of the stream. | ||
| let mut proxy_event_guard = self.event.get_proxy_event(); | ||
| // Initialize the async receive callback only once when the first receive call is made | ||
| // We are using std::sync::Once to ensure that the callback is set only once. | ||
| self.async_init_status.call_once(|| { | ||
| self.init_async_receive(&mut proxy_event_guard) | ||
| .expect("Failed to initialize async receive callback"); | ||
| }); | ||
| // Return stream that yields samples one at a time, fetching new batches as needed. | ||
| // The guard is moved into the stream and held for its lifetime to prevent concurrent receives. | ||
| SampleStream { | ||
| subscriber: self, | ||
| sample_container: SampleContainer::new(self.max_num_samples), | ||
| event_guard: proxy_event_guard, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples | ||
|
|
@@ -703,6 +726,55 @@ impl<'a, T: CommData + Debug, F: Future<Output = ()>> Future for ReceiveFuture<' | |
| } | ||
| } | ||
|
|
||
| /// A `Stream` that continuously delivers one sample at a time from the subscription. | ||
| /// It maintains an internal `SampleContainer` to buffer samples received from the FFI callback. | ||
| /// It also holds an exclusive `ProxyEventManagerGuard` for the lifetime of the stream to prevent | ||
| /// concurrent receives on the same subscriber instance. | ||
| /// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback. | ||
| struct SampleStream<'a, T: CommData + Debug> { | ||
| subscriber: &'a SubscriberImpl<T>, | ||
| sample_container: SampleContainer<Sample<T>>, | ||
| event_guard: ProxyEventManagerGuard<'a>, | ||
| } | ||
|
|
||
| impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> { | ||
| type Item = Result<Sample<T>>; | ||
|
|
||
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| // Yield any buffered samples from a previous batch fetch first. | ||
| if let Some(sample) = self.sample_container.pop_front() { | ||
| return Poll::Ready(Some(Ok(sample))); | ||
| } | ||
|
|
||
| // Register the waker before attempting to receive so that a concurrent FFI | ||
| // callback cannot fire between the receive attempt and registering the waker, | ||
| // which would cause a missed wake-up. | ||
| self.subscriber.waker_storage.register(cx.waker()); | ||
| let max_num_samples = self.subscriber.max_num_samples; | ||
|
|
||
| // get a mutable reference of pinned self, with this | ||
| // we can avoid borrow checker issue for self in try_receive_samples function call | ||
| let this = self.as_mut().get_mut(); | ||
|
|
||
| let samples_received = try_receive_samples::<T>( | ||
| this.event_guard.deref_mut(), | ||
| &mut this.sample_container, | ||
| max_num_samples, | ||
| max_num_samples, | ||
| ); | ||
|
|
||
| match samples_received { | ||
| Ok(_count) => { | ||
| match self.sample_container.pop_front() { | ||
| Some(sample) => Poll::Ready(Some(Ok(sample))), | ||
| None => Poll::Pending, | ||
| } | ||
| } | ||
| Err(e) => Poll::Ready(Some(Err(e))), | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stream need to returns Poll::Ready(None) for dropping the stream. Should library close the stream after error occurred or user can handle from its side to drop the stream when error occurred. we can explicitly mention in API documentation a bout user needs to drop the stream if error occurred and implement the drop function if required in library for stream type. library side Stream close -
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think combination of both the approach will make better solution, we will ask user to explicitly call the drop on error but if it does not call then we can terminate when next iteration received to library by user. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Holds the shared state populated asynchronously by the C++ service discovery callback. | ||
| /// | ||
| /// `handles` is wrapped in `Option` because: | ||
|
|
@@ -1054,11 +1126,14 @@ pub fn create_sample_callback<'a, T: CommData + Debug>( | |
| data: ManuallyDrop::new(sample_ptr), | ||
| }, | ||
| }; | ||
|
|
||
| // try_receive / receive path: drop the oldest sample to make room | ||
| // so the buffer always contains the newest samples (sliding window). | ||
| // But for stream container already empty the buffer before receiving new samples, | ||
| // so it will not drop old samples when new samples arrive. | ||
| while scratch.sample_count() >= max_samples { | ||
| scratch.pop_front(); | ||
| } | ||
| // After pop from SampleContainer to make room, | ||
| // push should always succeed, otherwise we lose the data | ||
| assert!( | ||
| scratch.push_back(wrapped_sample).is_ok(), | ||
| "Failed to push sample after making room in buffer" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ use core::marker::PhantomData; | |
| use core::mem::MaybeUninit; | ||
| use core::ops::{Deref, DerefMut}; | ||
| use core::sync::atomic::AtomicUsize; | ||
| use futures::stream::{self, Stream}; | ||
| use std::collections::VecDeque; | ||
| use std::path::Path; | ||
|
|
||
|
|
@@ -337,6 +338,11 @@ where | |
| ) -> impl Future<Output = (SampleContainer<Self::Sample<'a>>, Result<usize>)> + 'a { | ||
| async { todo!() } | ||
| } | ||
|
|
||
| #[allow(clippy::manual_async_fn)] | ||
| fn to_stream<'a>(&'a mut self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a { | ||
| stream::empty() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a ticket to extend this at some point?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created ticket for same - #490 |
||
| } | ||
| } | ||
|
|
||
| pub struct Publisher<T> { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| @startuml com_api_stream_api_lola_runtime | ||
|
|
||
| ' Copyright (c) 2026 Contributors to the Eclipse Foundation | ||
| ' | ||
| ' See the NOTICE file(s) distributed with this work for additional | ||
| ' information regarding copyright ownership. | ||
| ' | ||
| ' This program and the accompanying materials are made available under the | ||
| ' terms of the Apache License Version 2.0 which is available at | ||
| ' <https://www.apache.org/licenses/LICENSE-2.0> | ||
| ' | ||
| ' SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| title Async Stream API in Lola Runtime | ||
|
|
||
| actor User | ||
| participant "Subscription" as Subscription | ||
| participant "SampleContainer" as SampleContainer | ||
| participant "Lola" as Lola | ||
|
|
||
| User -> Subscription: Subscription::to_stream() | ||
| Subscription --> SampleContainer: Create internal buffer (fixed capacity) | ||
|
|
||
| Note over SampleContainer: No background push, samples are fetched during poll and stored in the buffer | ||
| loop User polls the stream | ||
| User -> Subscription: poll_next().await | ||
| activate Subscription | ||
| Subscription -> SampleContainer: Check for >= 1 sample | ||
|
|
||
| alt Sample available in buffer | ||
| Subscription --> User: Ready(Sample) | ||
| else No samples available | ||
| Subscription --> User: Pending | ||
| end | ||
| deactivate Subscription | ||
| end | ||
|
|
||
| @enduml |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is 6 better than 10?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was reduced because we have now more test and sending 10 sample is taking time because each test is invoking producer but as already i have mention in above comment , we will optimize this example app from test based to proper API function based testing using user input from CLI , ticket for same #489