From 1723ee2171468ac4fc5f2a63ce635ec3470592d0 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Thu, 14 May 2026 16:20:45 +0100 Subject: [PATCH] [core] No interruptions when acking is enabled. When we are using websockets and acking we also use CoT chunking so the next response message is never far off. By not allowing interruptions we can make stronger assumptions about the order of events in the stream, which greatly simplifies (one of) the backend implementations. --- codex-rs/core/src/session/mod.rs | 11 ++++ codex-rs/core/src/session/tests.rs | 40 +++++++++++++ codex-rs/core/src/session/turn.rs | 10 +++- codex-rs/core/tests/suite/pending_input.rs | 69 ++++++++++++++++++++++ 4 files changed, 128 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 0a29e9788364..d03cdfacad40 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3187,6 +3187,17 @@ impl Session { } pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { + // Ack-enabled websocket responses cannot be interrupted mid-stream. If mailbox mail was + // already queued before the final answer boundary, keep it eligible for the immediate + // post-ack follow-up instead of silently pushing it to a later user turn. 
+ if self + .features + .enabled(Feature::ResponsesWebsocketResponseProcessed) + && self.has_pending_mailbox_items().await + { + return; + } + let turn_state = self.turn_state_for_sub_id(sub_id).await; let Some(turn_state) = turn_state else { return; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 176ea9413f80..bd396665de7c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -8797,6 +8797,46 @@ async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() { ); } +#[tokio::test] +async fn queued_mailbox_mail_stays_in_current_turn_after_answer_boundary_when_ack_enabled() { + let (sess, tc, _rx) = make_session_and_context_with_auth_and_config_and_rx( + CodexAuth::from_api_key("Test API Key"), + Vec::new(), + |config| { + config + .features + .enable(Feature::ResponsesWebsocketResponseProcessed) + .expect("response processed websocket mode should be enableable in tests"); + }, + ) + .await; + let communication = InterAgentCommunication::new( + AgentPath::try_from("/root/worker").expect("worker path should parse"), + AgentPath::root(), + Vec::new(), + "queued child update".to_string(), + /*trigger_turn*/ false, + ); + sess.spawn_task( + Arc::clone(&tc), + Vec::new(), + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: true, + }, + ) + .await; + + sess.enqueue_mailbox_communication(communication.clone()); + sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + + assert!(sess.has_pending_input().await); + assert_eq!( + sess.get_pending_input().await, + vec![communication.to_response_input_item()], + ); +} + #[tokio::test] async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 86710e487139..f713c21b5199 100644 --- a/codex-rs/core/src/session/turn.rs +++ 
b/codex-rs/core/src/session/turn.rs @@ -2021,8 +2021,14 @@ async fn try_run_sampling_request( last_agent_message = Some(agent_message); } needs_follow_up |= output_result.needs_follow_up; - // todo: remove before stabilizing multi-agent v2 - if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() { + // Ack-enabled websocket flows must observe `response.completed` so they can + // acknowledge that exact response before issuing any mailbox follow-up request. + if !sess + .features + .enabled(Feature::ResponsesWebsocketResponseProcessed) + && preempt_for_mailbox_mail + && sess.mailbox_rx.lock().await.has_pending() + { break Ok(SamplingRequestResult { needs_follow_up: true, last_agent_message, diff --git a/codex-rs/core/tests/suite/pending_input.rs b/codex-rs/core/tests/suite/pending_input.rs index 62851c515d59..ddbb1dc71584 100644 --- a/codex-rs/core/tests/suite/pending_input.rs +++ b/codex-rs/core/tests/suite/pending_input.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use codex_core::CodexThread; +use codex_features::Feature; use codex_protocol::AgentPath; use codex_protocol::items::TurnItem; use codex_protocol::models::PermissionProfile; @@ -373,6 +374,74 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn queued_inter_agent_mail_waits_for_response_completion_when_ack_enabled() { + let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel(); + + let first_chunks = vec![ + chunk(ev_response_created("resp-1")), + chunk(ev_reasoning_item_added("reason-1", &["thinking"])), + gated_chunk( + gate_reasoning_done_rx, + vec![ + ev_reasoning_item("reason-1", &["thinking"], &[]), + ev_message_item_added("msg-1", ""), + ev_output_text_delta("first answer"), + ev_message_item_done("msg-1", "first answer"), + ev_completed("resp-1"), + ], + ), + ]; + + let (server, _completions) = + start_streaming_sse_server(vec![first_chunks, 
response_completed_chunks("resp-2")]).await; + + let codex = test_codex() + .with_model("gpt-5.4") + .with_config(|config| { + config + .features + .enable(Feature::ResponsesWebsocketResponseProcessed) + .expect("test config should allow feature update"); + }) + .build_with_streaming_server(&server) + .await + .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}")) + .codex; + + submit_user_input(&codex, "first prompt").await; + + wait_for_reasoning_item_started(&codex).await; + submit_queue_only_agent_mail(&codex, "queued child update").await; + + let _ = gate_reasoning_done_tx.send(()); + + wait_for_agent_message(&codex, "first answer").await; + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_eq!(requests.len(), 2); + + let second_body: Value = from_slice(&requests[1]).expect("parse second request"); + let has_queued_mail = second_body + .get("input") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter(|item| item.get("role").and_then(Value::as_str) == Some("assistant")) + .filter_map(|item| item.get("content").and_then(Value::as_array)) + .flatten() + .filter(|span| span.get("type").and_then(Value::as_str) == Some("output_text")) + .filter_map(|span| span.get("text").and_then(Value::as_str)) + .any(|text| text.contains("\"content\":\"queued child update\"")); + assert!( + has_queued_mail, + "queued child update should be injected only after the first response completes" + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() { let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel();