Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions score/mw/com/example/com-api-example/basic-consumer-producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ fn main() {
#[cfg(test)]
mod test {
use super::*;
use futures::stream::StreamExt;
use std::sync::OnceLock;
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -364,7 +365,7 @@ mod test {
async fn async_data_sender_fn<R: Runtime>(
offered_producer: VehicleOfferedProducer<R>,
) -> VehicleOfferedProducer<R> {
for i in 0..10 {
for i in 0..6 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is 6 better than 10?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was reduced because we have now more test and sending 10 sample is taking time because each test is invoking producer but as already i have mention in above comment , we will optimize this example app from test based to proper API function based testing using user input from CLI , ticket for same #489

let uninit_sample = match offered_producer.left_tire.allocate() {
Ok(sample) => sample,
Err(e) => {
Expand Down Expand Up @@ -393,7 +394,7 @@ mod test {
) {
println!("[RECEIVER] Async data processor started");
let mut buffer = SampleContainer::new(5);
for _ in 0..5 {
for _ in 0..3 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is 3 better than 5? Consider turning these numbers into constants and explain why they have the limit they have.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we are testing developed API using this example app test, it takes some time to run because of that iteration is reduced but we will optimize this example app from test based to proper API function-based testing using user input from CLI , ticket for same #489

let (returned_buf, result) = if is_timeout {
let timeout = tokio::time::sleep(Duration::from_millis(1000));
subscribed.cancellable_receive(buffer, 2, 3, timeout).await
Expand All @@ -412,6 +413,30 @@ mod test {
}
}

async fn stream_processor_fn<R: Runtime>(mut subscribed: impl Subscription<Tire, R>) {
let mut stream = subscribed.to_stream();
let mut cnt = 5usize;
println!("[RECEIVER] Stream processor started");
while cnt > 0 {
// Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription
match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await {
Ok(Some(Ok(sample))) => {
println!(
"[RECEIVER] Stream received sample: {:.2} psi",
sample.pressure
)
}
Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e),
Ok(None) => break,
Err(_) => {
eprintln!("[RECEIVER] Timeout while waiting for stream sample");
break;
}
}
cnt -= 1;
}
}

// Test case: Async subscription and sending on multi-threaded runtime
// Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads
#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -451,6 +476,7 @@ mod test {
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test cases ignored in example app because of it required separate service instance.
We already have plan to optimize this example app which can run as an app and can select which API want to test using user input - #489

async fn receive_with_timeout_and_send_using_multi_thread() {
println!("Starting async subscription test with Lola runtime");
//Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here.
Expand All @@ -477,6 +503,45 @@ mod test {
processor_join_handle
.await
.expect("Error returned from task");

let producer = sender_join_handle.await.expect("Error returned from task");

match producer.unoffer() {
Ok(_) => println!("Successfully unoffered the service"),
Err(e) => eprintln!("Failed to unoffer: {:?}", e),
}

println!("=== Async subscription test with Lola runtime completed ===\n");
}

// Test case: Async subscription and sending on multi-threaded runtime
// Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads
#[tokio::test(flavor = "multi_thread")]
async fn stream_and_send_using_multi_thread() {
println!("Starting async subscription test with Lola runtime");
let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance")
.expect("Failed to create InstanceSpecifier");
let service_id_clone = service_id.clone();
//consumer create
let consumer_runtime = get_test_runtime();
//starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result
let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id));
//simulate some delay before producer offer service, so that consumer is waiting for discovery
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
//Producer create
let producer_runtime = get_test_runtime();
let producer = create_producer(producer_runtime, service_id_clone);
// Spawn async data sender
let sender_join_handle = tokio::spawn(async_data_sender_fn(producer));
// Await consumer creation and subscribe to events
let consumer = consumer.await.expect("Failed to create consumer");
// Subscribe to one event
let subscribed = consumer.left_tire.subscribe(5).unwrap();
let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed));
stream_processor_join_handle
.await
.expect("Error returned from stream processor task");

let producer = sender_join_handle.await.expect("Error returned from task");

match producer.unoffer() {
Expand Down
1 change: 1 addition & 0 deletions score/mw/com/impl/rust/com-api/com-api-concept/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rust_library(
],
deps = [
"@score_baselibs_rust//src/containers",
"@score_communication_crate_index//:futures",
"@score_communication_crate_index//:thiserror",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use containers::fixed_capacity::FixedCapacityQueue;
use core::fmt::Debug;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use futures::stream::Stream;
use std::path::Path;

/// Result type alias with `std::result::Result` using `com_api::Error` as error type
Expand Down Expand Up @@ -911,6 +912,39 @@ pub trait Subscription<T: CommData + Debug, R: Runtime + ?Sized> {
max_samples: usize,
cancellation: impl Future<Output = ()> + Send + 'static,
) -> impl Future<Output = (SampleContainer<Self::Sample<'a>>, Result<usize>)> + 'a;

/// This method returns a stream that yields samples from the subscription as they become available.
///
/// The stream creates and maintains an internal `SampleContainer` buffer during its lifetime,
/// and the buffer is created when the stream is created and persists until the stream is dropped.
/// The internal buffer has a maximum capacity of subscribed slots size (max_num_sample).
/// It does not fetch the samples in background,
/// samples only get fetched when the stream is polled and the internal buffer is empty,
/// otherwise the stream will yield samples from the internal buffer.
///
/// **Polling Behavior:**
/// On each `poll_next()`:
/// - First, any buffered samples from a previous batch fetch are yielded (one sample at a time).
/// - Otherwise, new samples are fetched from the communication buffer. If at least one sample
/// is available, the first one is yielded. If no samples are available, `Poll::Pending` is
/// returned and the waker is registered to be notified when new samples arrive.
///
/// The lifetime `'a` ties all yielded `Sample<'a>` to the subscription borrow, ensuring
/// samples cannot outlive the subscription that manages the underlying proxy connection.
///
/// Why we need Unpin bound? -
/// We need to add the Unpin bound because the `next()` method of `Stream` requires the stream
/// to be polled, but the stream returned with `impl Trait` does not return as `Unpin` by default,
/// even though the actual stream implementation might be `Unpin`. The compiler cannot guarantee
/// that the returned stream is `Unpin` without the explicit bound, so we add it to ensure that
/// the returned stream can be safely polled without additional pinning at the user side.
///
/// # Returns
/// A stream that yields individual `Sample<'a>` items one at a time.
///
/// # Errors
/// Returns an error if a problem occurs during sample reception.
fn to_stream<'a>(&'a mut self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a;
}

/// A trait for types that can be default-constructed in place, skipping intermediate moves.
Expand Down
4 changes: 4 additions & 0 deletions score/mw/com/impl/rust/com-api/com-api-concept/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub enum ReceiveFailedReason {
Cancelled,
#[error("Input value out of bounds, maximum sample {max}, but new sample is {requested}")]
InputValueOutOfBounds { max: usize, requested: usize },
#[error("Stream buffer overflow: capacity is {max}, excess samples were discarded")]
BufferOverflow { max: usize },
}

/// Comprehensive error reasons for event-related failures
Expand All @@ -90,6 +92,8 @@ pub enum EventFailedReason {
SendingDataFailed,
#[error("Event not available for subscription, possibly due to missing event type or incompatible service")]
EventNotAvailable,
#[error("Failed to subscribe to event, due to the max_samples parameter being invalid (e.g., zero or exceeding allowed limits)")]
InvalidMaxSamples,
}

/// Error enumeration for different failure cases in the Consumer/Producer/Runtime APIs.
Expand Down
79 changes: 77 additions & 2 deletions score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use core::mem::ManuallyDrop;
use core::ops::{Deref, DerefMut};
use core::panic;
use core::ptr::NonNull;
use futures::stream::Stream;
use futures::task::{AtomicWaker, Context, Poll};
use std::cmp::Ordering;
use std::pin::Pin;
Expand Down Expand Up @@ -304,6 +305,9 @@ impl<T: CommData + Debug> Subscriber<T, LolaRuntimeImpl> for SubscribableImpl<T>
})
}
fn subscribe(self, max_num_samples: usize) -> Result<Self::Subscription> {
if max_num_samples == 0 {
return Err(Error::EventError(EventFailedReason::InvalidMaxSamples));
}
let instance_info = self.instance_info.clone();
let event_instance = NativeProxyEventBase::new(
&self.proxy_instance.0.proxy,
Expand Down Expand Up @@ -609,6 +613,25 @@ where
.await
}
}

fn to_stream<'a>(&'a mut self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a {
// Get the event guard to ensure no concurrent receive calls
// on the same subscriber instance. This guard is held for the lifetime of the stream.
let mut proxy_event_guard = self.event.get_proxy_event();
// Initialize the async receive callback only once when the first receive call is made
// We are using std::sync::Once to ensure that the callback is set only once.
self.async_init_status.call_once(|| {
self.init_async_receive(&mut proxy_event_guard)
.expect("Failed to initialize async receive callback");
});
// Return stream that yields samples one at a time, fetching new batches as needed.
// The guard is moved into the stream and held for its lifetime to prevent concurrent receives.
SampleStream {
subscriber: self,
sample_container: SampleContainer::new(self.max_num_samples),
event_guard: proxy_event_guard,
}
}
}

// The ReceiveFuture struct encapsulates the state and logic for asynchronously receiving samples
Expand Down Expand Up @@ -703,6 +726,55 @@ impl<'a, T: CommData + Debug, F: Future<Output = ()>> Future for ReceiveFuture<'
}
}

/// A `Stream` that continuously delivers one sample at a time from the subscription.
/// It maintains an internal `SampleContainer` to buffer samples received from the FFI callback.
/// It also holds an exclusive `ProxyEventManagerGuard` for the lifetime of the stream to prevent
/// concurrent receives on the same subscriber instance.
/// On each poll, it first yields any buffered samples before attempting to receive more from the FFI callback.
struct SampleStream<'a, T: CommData + Debug> {
subscriber: &'a SubscriberImpl<T>,
sample_container: SampleContainer<Sample<T>>,
event_guard: ProxyEventManagerGuard<'a>,
}

impl<'a, T: CommData + Debug> Stream for SampleStream<'a, T> {
type Item = Result<Sample<T>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Yield any buffered samples from a previous batch fetch first.
if let Some(sample) = self.sample_container.pop_front() {
return Poll::Ready(Some(Ok(sample)));
}

// Register the waker before attempting to receive so that a concurrent FFI
// callback cannot fire between the receive attempt and registering the waker,
// which would cause a missed wake-up.
self.subscriber.waker_storage.register(cx.waker());
let max_num_samples = self.subscriber.max_num_samples;

// get a mutable reference of pinned self, with this
// we can avoid borrow checker issue for self in try_receive_samples function call
let this = self.as_mut().get_mut();

let samples_received = try_receive_samples::<T>(
this.event_guard.deref_mut(),
&mut this.sample_container,
max_num_samples,
max_num_samples,
);

match samples_received {
Ok(_count) => {
match self.sample_container.pop_front() {
Some(sample) => Poll::Ready(Some(Ok(sample))),
None => Poll::Pending,
}
}
Err(e) => Poll::Ready(Some(Err(e))),
Copy link
Copy Markdown
Contributor Author

@bharatGoswami8 bharatGoswami8 Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream need to returns Poll::Ready(None) for dropping the stream.

Should library close the stream after error occurred or user can handle from its side to drop the stream when error occurred.

while let Some(item) = stream.next().await {
    match item {
        Ok(v) => {}
        Err(e) => break, // stop consuming
    }
}
drop(stream);

we can explicitly mention in API documentation a bout user needs to drop the stream if error occurred and implement the drop function if required in library for stream type.

library side Stream close -
We need to introduce one more flag to track the error and if error occurred , will return the error and in iteration of next call will return the None.

// Close stream immediately in next iteration after error
        if self.errored {
            return Poll::Ready(None);
        }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think combination of both the approach will make better solution, we will ask user to explicitly call the drop on error but if it does not call then we can terminate when next iteration received to library by user.

}
}
}

/// Holds the shared state populated asynchronously by the C++ service discovery callback.
///
/// `handles` is wrapped in `Option` because:
Expand Down Expand Up @@ -1054,11 +1126,14 @@ pub fn create_sample_callback<'a, T: CommData + Debug>(
data: ManuallyDrop::new(sample_ptr),
},
};

// try_receive / receive path: drop the oldest sample to make room
// so the buffer always contains the newest samples (sliding window).
// But for stream container already empty the buffer before receiving new samples,
// so it will not drop old samples when new samples arrive.
while scratch.sample_count() >= max_samples {
scratch.pop_front();
}
// After pop from SampleContainer to make room,
// push should always succeed, otherwise we lose the data
assert!(
scratch.push_back(wrapped_sample).is_ok(),
"Failed to push sample after making room in buffer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::AtomicUsize;
use futures::stream::{self, Stream};
use std::collections::VecDeque;
use std::path::Path;

Expand Down Expand Up @@ -337,6 +338,11 @@ where
) -> impl Future<Output = (SampleContainer<Self::Sample<'a>>, Result<usize>)> + 'a {
async { todo!() }
}

#[allow(clippy::manual_async_fn)]
fn to_stream<'a>(&'a mut self) -> impl Stream<Item = Result<Self::Sample<'a>>> + Unpin + 'a {
stream::empty()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a ticket to extend this at some point?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created ticket for same - #490

}
}

pub struct Publisher<T> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
@startuml com_api_stream_api_lola_runtime

' Copyright (c) 2026 Contributors to the Eclipse Foundation
'
' See the NOTICE file(s) distributed with this work for additional
' information regarding copyright ownership.
'
' This program and the accompanying materials are made available under the
' terms of the Apache License Version 2.0 which is available at
' <https://www.apache.org/licenses/LICENSE-2.0>
'
' SPDX-License-Identifier: Apache-2.0

title Async Stream API in Lola Runtime

actor User
participant "Subscription" as Subscription
participant "SampleContainer" as SampleContainer
participant "Lola" as Lola

User -> Subscription: Subscription::to_stream()
Subscription --> SampleContainer: Create internal buffer (fixed capacity)

Note over SampleContainer: No background push, samples are fetched during poll and stored in the buffer
loop User polls the stream
User -> Subscription: poll_next().await
activate Subscription
Subscription -> SampleContainer: Check for >= 1 sample

alt Sample available in buffer
Subscription --> User: Ready(Sample)
else No samples available
Subscription --> User: Pending
end
deactivate Subscription
end

@enduml
Loading