Skip to content

Commit ce2c612

Browse files
committed
Fixed wait method for callbacks.
1 parent 74ecf0f commit ce2c612

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

python/tests/test_callback_subscription.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async def test_callback_wait_method(nats: Nats) -> None:
5252
limit = 10
5353

5454
async def callback(msg: Message) -> None:
55+
await asyncio.sleep(1)
5556
received.append(msg.payload)
5657

5758
async with nats.subscribe(subject=subj, callback=callback) as sub:

src/subscriptions/callback.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,18 @@ async fn start_py_sub(
4747
mut unsub_receiver: tokio::sync::mpsc::Receiver<UnsubscribeCommand>,
4848
end_event: AsyncEvent,
4949
) {
50+
// Required to wait for completion of processing tasks.
51+
//
52+
// This will ensure that end_event is only set after
53+
// processing of all messages is finished.
54+
let mut tasks = tokio::task::JoinSet::new();
5055
loop {
5156
tokio::select! {
5257
msg = sub.next() => {
5358
match msg {
5459
Some(message) => {
5560
let py_cb = py_callback.clone();
56-
tokio::spawn(pyo3_async_runtimes::tokio::scope(
61+
tasks.spawn(pyo3_async_runtimes::tokio::scope(
5762
locals.clone(),
5863
process_message(message, py_cb),
5964
));
@@ -77,6 +82,7 @@ async fn start_py_sub(
7782
}
7883
}
7984
}
85+
tasks.join_all().await;
8086
end_event.set();
8187
}
8288

0 commit comments

Comments
 (0)