
Commit 0d24d0d

sanityclaude and Claude Opus 4.5 authored
fix: drain pending messages on all error exit paths (#2266)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2b31f08 commit 0d24d0d

2 files changed: +228 / -1 lines

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ jobs:
 
   six_peer_regression:
     name: six-peer-regression
-    needs: test_all
+    needs: test_all # Run after Test to avoid resource contention on self-hosted runner
     runs-on: self-hosted
     timeout-minutes: 30
crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 227 additions & 0 deletions

@@ -2674,13 +2674,51 @@ async fn notify_transport_closed(
     }
 }
 
+/// Drains and sends all pending outbound messages before shutting down.
+///
+/// This is critical for preventing message loss: when we detect an error condition
+/// (transport error, channel closed, etc.), there may be messages that were queued
+/// to the channel AFTER we started waiting in select! but BEFORE we detected the error.
+/// Without this drain, those messages would be silently lost.
+async fn drain_pending_before_shutdown(
+    rx: &mut PeerConnChannelRecv,
+    conn: &mut PeerConnection,
+    remote_addr: SocketAddr,
+) -> usize {
+    let mut drained = 0;
+    while let Ok(msg) = rx.try_recv() {
+        // Best-effort send - connection may already be degraded
+        if let Err(e) = handle_peer_channel_message(conn, msg).await {
+            tracing::debug!(
+                to = %remote_addr,
+                ?e,
+                drained,
+                "[CONN_LIFECYCLE] Error during shutdown drain (expected if connection closed)"
+            );
+            break;
+        }
+        drained += 1;
+    }
+    if drained > 0 {
+        tracing::info!(
+            to = %remote_addr,
+            drained,
+            "[CONN_LIFECYCLE] Drained pending messages before shutdown"
+        );
+    }
+    drained
+}
+
 /// Listens for messages on a peer connection using drain-then-select pattern.
 ///
 /// On each iteration, drains all pending outbound messages via `try_recv()` before
 /// waiting for either new outbound or inbound messages. This approach:
 /// 1. Ensures queued outbound messages are sent promptly
 /// 2. Avoids starving inbound (which would happen with biased select)
 /// 3. No messages are lost due to poll ordering
+///
+/// IMPORTANT: Before exiting on any error path, we drain pending messages to prevent
+/// loss of messages that arrived during the select! wait.
 async fn peer_connection_listener(
     mut rx: PeerConnChannelRecv,
     mut conn: PeerConnection,
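Note on the pattern: the drain-then-select shape described in the doc comment above, plus the drain-before-exit step this commit adds, can be sketched in isolation. The following is a minimal, hypothetical example only, not code from this crate: Outbound, send_outbound, fake_inbound, and drain_before_exit are illustrative stand-ins for the real PeerConnChannelRecv / PeerConnection plumbing (it assumes a tokio runtime with the rt, time, sync, and macros features).

// Minimal sketch only (not freenet code): drain queued outbound work with
// try_recv() before each unbiased select, and repeat the drain on every
// error exit so nothing queued during the wait is silently dropped.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

type Outbound = String;

// Stand-in for writing a message to the peer connection (hypothetical helper).
async fn send_outbound(msg: Outbound) -> Result<(), &'static str> {
    println!("sent: {msg}");
    Ok(())
}

// Stand-in for conn.recv(): yields one inbound message after a short delay.
async fn fake_inbound() -> Option<Outbound> {
    sleep(Duration::from_millis(20)).await;
    Some("inbound".to_string())
}

// Drain whatever is still queued; called on every shutdown path.
async fn drain_before_exit(rx: &mut mpsc::Receiver<Outbound>) {
    while let Ok(pending) = rx.try_recv() {
        let _ = send_outbound(pending).await; // best effort
    }
}

async fn listener(mut rx: mpsc::Receiver<Outbound>) {
    loop {
        // Phase 1: drain everything already queued so outbound stays prompt.
        while let Ok(msg) = rx.try_recv() {
            if send_outbound(msg).await.is_err() {
                drain_before_exit(&mut rx).await; // error exit: drain first
                return;
            }
        }
        // Phase 2: unbiased select between new outbound work and inbound traffic.
        tokio::select! {
            maybe_out = rx.recv() => match maybe_out {
                Some(msg) => { let _ = send_outbound(msg).await; }
                None => return, // channel closed and empty: nothing more can arrive
            },
            maybe_in = fake_inbound() => {
                if maybe_in.is_none() {
                    drain_before_exit(&mut rx).await; // inbound error: drain first
                    return;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let handle = tokio::spawn(listener(rx));
    tx.send("hello".to_string()).await.unwrap();
    drop(tx); // closing the channel lets the listener exit cleanly
    let _ = handle.await;
}

Draining with try_recv() ahead of an unbiased select keeps outbound sends prompt without starving inbound, and repeating the drain on every error exit is the guarantee this commit adds to the real listener.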
@@ -2707,6 +2745,9 @@ async fn peer_connection_listener(
                 ?error,
                 "[CONN_LIFECYCLE] Connection closed after channel command"
             );
+            // Drain any messages that arrived after our try_recv() but before
+            // handle_peer_channel_message returned an error
+            drain_pending_before_shutdown(&mut rx, &mut conn, remote_addr).await;
             notify_transport_closed(&conn_events, remote_addr, error).await;
             return;
         }
@@ -2717,6 +2758,7 @@ async fn peer_connection_listener(
                 to = %remote_addr,
                 "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
             );
+            // Channel disconnected means no more messages can arrive
             notify_transport_closed(
                 &conn_events,
                 remote_addr,
@@ -2739,6 +2781,8 @@ async fn peer_connection_listener(
                 ?error,
                 "[CONN_LIFECYCLE] Connection closed after channel command"
             );
+            // Drain any messages that arrived while we were processing this one
+            drain_pending_before_shutdown(&mut rx, &mut conn, remote_addr).await;
             notify_transport_closed(&conn_events, remote_addr, error).await;
             return;
         }
@@ -2748,6 +2792,7 @@ async fn peer_connection_listener(
                 to = %remote_addr,
                 "[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
             );
+            // Channel closed means no more messages can arrive
             notify_transport_closed(
                 &conn_events,
                 remote_addr,
@@ -2783,6 +2828,9 @@ async fn peer_connection_listener(
                 from = %remote_addr,
                 "[CONN_LIFECYCLE] conn_events receiver dropped; stopping listener"
             );
+            // Drain pending messages - they may still be sendable even if
+            // the conn_events channel is closed
+            drain_pending_before_shutdown(&mut rx, &mut conn, remote_addr).await;
             return;
         }
     }
@@ -2792,6 +2840,8 @@ async fn peer_connection_listener(
                 ?error,
                 "[CONN_LIFECYCLE] Failed to deserialize inbound message; closing connection"
             );
+            // Drain pending outbound messages before closing - they may still succeed
+            drain_pending_before_shutdown(&mut rx, &mut conn, remote_addr).await;
             let transport_error = TransportError::Other(anyhow!(
                 "Failed to deserialize inbound message from {remote_addr}: {error:?}"
             ));
@@ -2805,6 +2855,10 @@ async fn peer_connection_listener(
                 ?error,
                 "[CONN_LIFECYCLE] peer_connection_listener terminating after recv error"
             );
+            // CRITICAL: Drain pending outbound messages before exiting.
+            // Messages may have been queued to the channel while we were
+            // waiting in select!, and they would be lost without this drain.
+            drain_pending_before_shutdown(&mut rx, &mut conn, remote_addr).await;
             notify_transport_closed(&conn_events, remote_addr, error).await;
             return;
         }
@@ -2870,3 +2924,176 @@ fn extract_sender_from_message_mut(msg: &mut NetMessage) -> Option<&mut PeerKeyL
 }
 
 // TODO: add testing for the network loop, now it should be possible to do since we don't depend upon having real connections
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
+    use tokio::sync::mpsc;
+    use tokio::time::{sleep, timeout, Duration};
+
+    /// Regression test for message loss during shutdown.
+    ///
+    /// This test validates the drain-before-shutdown pattern that prevents message loss
+    /// when an error occurs during select! wait. The bug scenario:
+    ///
+    /// 1. Listener enters select! waiting for inbound/outbound
+    /// 2. Message is queued to the channel while select! is waiting
+    /// 3. Error occurs (e.g., connection timeout, deserialize failure)
+    /// 4. select! returns with the error
+    /// 5. BUG: Without drain, we return immediately and the queued message is lost
+    /// 6. FIX: Drain pending messages before returning
+    ///
+    /// This test simulates the pattern by:
+    /// - Having a "listener" that waits on select! then encounters an error
+    /// - Sending messages during the wait period
+    /// - Verifying all messages are processed (drained) before exit
+    #[tokio::test]
+    async fn test_drain_before_shutdown_prevents_message_loss() {
+        let (tx, mut rx) = mpsc::channel::<String>(10);
+        let processed = Arc::new(AtomicUsize::new(0));
+        let processed_clone = processed.clone();
+
+        // Simulate peer_connection_listener pattern
+        let listener = tokio::spawn(async move {
+            // Drain-then-select loop (simplified)
+            loop {
+                // Phase 1: Drain pending messages (like the loop at start of peer_connection_listener)
+                loop {
+                    match rx.try_recv() {
+                        Ok(msg) => {
+                            tracing::debug!("Drained message: {}", msg);
+                            processed_clone.fetch_add(1, Ordering::SeqCst);
+                        }
+                        Err(mpsc::error::TryRecvError::Empty) => break,
+                        Err(mpsc::error::TryRecvError::Disconnected) => return,
+                    }
+                }
+
+                // Phase 2: Wait for new messages or "error" (simulated by timeout)
+                tokio::select! {
+                    msg = rx.recv() => {
+                        match msg {
+                            Some(m) => {
+                                tracing::debug!("Received via select: {}", m);
+                                processed_clone.fetch_add(1, Ordering::SeqCst);
+                            }
+                            None => return, // Channel closed
+                        }
+                    }
+                    // Simulate error condition (e.g., connection timeout)
+                    _ = sleep(Duration::from_millis(50)) => {
+                        tracing::debug!("Simulated error occurred");
+
+                        // CRITICAL: This is the fix - drain before shutdown
+                        // Without this, messages queued during the 50ms wait would be lost
+                        let mut drained = 0;
+                        while let Ok(msg) = rx.try_recv() {
+                            tracing::debug!("Drain before shutdown: {}", msg);
+                            processed_clone.fetch_add(1, Ordering::SeqCst);
+                            drained += 1;
+                        }
+                        tracing::debug!("Drained {} messages before shutdown", drained);
+                        return;
+                    }
+                }
+            }
+        });
+
+        // Send messages with timing that causes them to arrive DURING the select! wait
+        // This is the race condition that causes message loss without the drain
+        tokio::spawn(async move {
+            // Wait for listener to enter select!
+            sleep(Duration::from_millis(10)).await;
+
+            // Send messages while listener is in select! waiting
+            for i in 0..5 {
+                tx.send(format!("msg{}", i)).await.unwrap();
+                sleep(Duration::from_millis(5)).await;
+            }
+            // Don't close the channel - let the timeout trigger the "error"
+        });
+
+        // Wait for listener to complete
+        let result = timeout(Duration::from_millis(200), listener).await;
+        assert!(result.is_ok(), "Listener should complete");
+
+        // All 5 messages should have been processed
+        let count = processed.load(Ordering::SeqCst);
+        assert_eq!(
+            count, 5,
+            "All messages should be processed (drained before shutdown). Got {} instead of 5. \
+             If this fails, messages are being lost during the error-exit drain.",
+            count
+        );
+    }
+
+    /// Test that demonstrates what happens WITHOUT the drain (the bug we're preventing).
+    /// This test intentionally shows the failure mode when drain is missing.
+    ///
+    /// The key is to ensure messages are queued AFTER select! starts but the error
+    /// fires immediately, so select! returns with error before processing the messages.
+    #[tokio::test]
+    async fn test_message_loss_without_drain() {
+        let (tx, mut rx) = mpsc::channel::<String>(10);
+        let processed = Arc::new(AtomicUsize::new(0));
+        let processed_clone = processed.clone();
+
+        // Signal when listener is ready for messages
+        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
+
+        // Buggy listener - NO drain before shutdown (demonstrates the bug)
+        let listener = tokio::spawn(async move {
+            // Drain at loop start (the original PR #2255 fix)
+            loop {
+                match rx.try_recv() {
+                    Ok(msg) => {
+                        tracing::debug!("Drained message: {}", msg);
+                        processed_clone.fetch_add(1, Ordering::SeqCst);
+                    }
+                    Err(mpsc::error::TryRecvError::Empty) => break,
+                    Err(mpsc::error::TryRecvError::Disconnected) => return,
+                }
+            }
+
+            // Signal that we're about to enter select! and then immediately timeout
+            let _ = ready_tx.send(());
+
+            // Immediate timeout to simulate error - messages sent after this starts are lost
+            tokio::select! {
+                biased; // Ensure timeout wins if both are ready
+                _ = sleep(Duration::from_millis(1)) => {
+                    tracing::debug!("Error occurred - BUG: returning without drain!");
+                    // BUG: No drain here! Messages queued during this 1ms window are lost.
+                }
+                msg = rx.recv() => {
+                    if let Some(m) = msg {
+                        tracing::debug!("Received via select: {}", m);
+                        processed_clone.fetch_add(1, Ordering::SeqCst);
+                    }
+                }
+            }
+        });
+
+        // Wait for listener to be ready, then immediately send messages
+        let _ = ready_rx.await;
+
+        // Send messages - these arrive while select! is running with a very short timeout
+        // Some or all will be lost because we return without draining
+        for i in 0..5 {
+            let _ = tx.send(format!("msg{}", i)).await;
+        }
+
+        let _ = timeout(Duration::from_millis(100), listener).await;
+
+        // Without the drain fix, messages are lost
+        let count = processed.load(Ordering::SeqCst);
+        // The listener exits immediately due to timeout, so no messages should be processed
+        assert!(
+            count < 5,
+            "Without drain, messages should be lost. Got {} (expected < 5). \
+             This test validates the bug exists when drain is missing.",
+            count
+        );
+    }
+}
