// Patch overview (codex-rs/core): mailbox handling when
// `Feature::ResponsesWebsocketResponseProcessed` is enabled.
//
// * session/mod.rs: `Session::defer_mailbox_delivery_to_next_turn` gains a guard at the top.
//   When the feature is enabled and `has_pending_mailbox_items()` reports true, the function
//   returns before `turn_state_for_sub_id` is consulted, so nothing below the guard runs.
// * session/turn.rs: inside `try_run_sampling_request`, the branch that breaks out with
//   `SamplingRequestResult { needs_follow_up: true, .. }` while mailbox mail is pending now
//   additionally requires the feature to be disabled.
// * session/tests.rs: adds a unit test that enables the feature, spawns a `NeverEndingTask`,
//   enqueues one `InterAgentCommunication` with `trigger_turn` false, calls
//   `defer_mailbox_delivery_to_next_turn`, and asserts the communication is still returned by
//   `get_pending_input`.
// * tests/suite/pending_input.rs: adds an integration test that queues mail while the first
//   response is gated after its reasoning item, releases the gate, waits for the
//   "first answer" agent message and turn completion, then asserts exactly two requests were
//   sent and that the second request's `input` carries the queued text in an assistant
//   `output_text` span.
//
// NOTE(review): the turn.rs hunk removes the comment
// `todo: remove before stabilizing multi-agent v2`, yet the preemption branch it annotated is
// still present for the feature-disabled path. Confirm whether the TODO should be kept.
// NOTE(review): the new early return in mod.rs skips the rest of the function entirely;
// presumably mail that arrives later in the same turn is then not deferred either. Confirm
// that this is the intended scope of the change.
diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 0a29e9788364..d03cdfacad40 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -3187,6 +3187,17 @@ impl Session { } pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) { + // Ack-enabled websocket responses cannot be interrupted mid-stream. If mailbox mail was + // already queued before the final answer boundary, keep it eligible for the immediate + // post-ack follow-up instead of silently pushing it to a later user turn. + if self + .features + .enabled(Feature::ResponsesWebsocketResponseProcessed) + && self.has_pending_mailbox_items().await + { + return; + } + let turn_state = self.turn_state_for_sub_id(sub_id).await; let Some(turn_state) = turn_state else { return; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 176ea9413f80..bd396665de7c 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -8797,6 +8797,46 @@ async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() { ); } +#[tokio::test] +async fn queued_mailbox_mail_stays_in_current_turn_after_answer_boundary_when_ack_enabled() { + let (sess, tc, _rx) = make_session_and_context_with_auth_and_config_and_rx( + CodexAuth::from_api_key("Test API Key"), + Vec::new(), + |config| { + config + .features + .enable(Feature::ResponsesWebsocketResponseProcessed) + .expect("response processed websocket mode should be enableable in tests"); + }, + ) + .await; + let communication = InterAgentCommunication::new( + AgentPath::try_from("/root/worker").expect("worker path should parse"), + AgentPath::root(), + Vec::new(), + "queued child update".to_string(), + /*trigger_turn*/ false, + ); + sess.spawn_task( + Arc::clone(&tc), + Vec::new(), + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: true, + }, + ) + .await; + + 
sess.enqueue_mailbox_communication(communication.clone()); + sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await; + + assert!(sess.has_pending_input().await); + assert_eq!( + sess.get_pending_input().await, + vec![communication.to_response_input_item()], + ); +} + #[tokio::test] async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 86710e487139..f713c21b5199 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -2021,8 +2021,14 @@ async fn try_run_sampling_request( last_agent_message = Some(agent_message); } needs_follow_up |= output_result.needs_follow_up; - // todo: remove before stabilizing multi-agent v2 - if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() { + // Ack-enabled websocket flows must observe `response.completed` so they can + // acknowledge that exact response before issuing any mailbox follow-up request. 
+ if !sess + .features + .enabled(Feature::ResponsesWebsocketResponseProcessed) + && preempt_for_mailbox_mail + && sess.mailbox_rx.lock().await.has_pending() + { break Ok(SamplingRequestResult { needs_follow_up: true, last_agent_message, diff --git a/codex-rs/core/tests/suite/pending_input.rs b/codex-rs/core/tests/suite/pending_input.rs index 62851c515d59..ddbb1dc71584 100644 --- a/codex-rs/core/tests/suite/pending_input.rs +++ b/codex-rs/core/tests/suite/pending_input.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use codex_core::CodexThread; +use codex_features::Feature; use codex_protocol::AgentPath; use codex_protocol::items::TurnItem; use codex_protocol::models::PermissionProfile; @@ -373,6 +374,74 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn queued_inter_agent_mail_waits_for_response_completion_when_ack_enabled() { + let (gate_reasoning_done_tx, gate_reasoning_done_rx) = oneshot::channel(); + + let first_chunks = vec![ + chunk(ev_response_created("resp-1")), + chunk(ev_reasoning_item_added("reason-1", &["thinking"])), + gated_chunk( + gate_reasoning_done_rx, + vec![ + ev_reasoning_item("reason-1", &["thinking"], &[]), + ev_message_item_added("msg-1", ""), + ev_output_text_delta("first answer"), + ev_message_item_done("msg-1", "first answer"), + ev_completed("resp-1"), + ], + ), + ]; + + let (server, _completions) = + start_streaming_sse_server(vec![first_chunks, response_completed_chunks("resp-2")]).await; + + let codex = test_codex() + .with_model("gpt-5.4") + .with_config(|config| { + config + .features + .enable(Feature::ResponsesWebsocketResponseProcessed) + .expect("test config should allow feature update"); + }) + .build_with_streaming_server(&server) + .await + .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}")) + .codex; + + submit_user_input(&codex, "first prompt").await; + 
wait_for_reasoning_item_started(&codex).await; + submit_queue_only_agent_mail(&codex, "queued child update").await; + + let _ = gate_reasoning_done_tx.send(()); + + wait_for_agent_message(&codex, "first answer").await; + wait_for_turn_complete(&codex).await; + + let requests = server.requests().await; + assert_eq!(requests.len(), 2); + + let second_body: Value = from_slice(&requests[1]).expect("parse second request"); + let has_queued_mail = second_body + .get("input") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter(|item| item.get("role").and_then(Value::as_str) == Some("assistant")) + .filter_map(|item| item.get("content").and_then(Value::as_array)) + .flatten() + .filter(|span| span.get("type").and_then(Value::as_str) == Some("output_text")) + .filter_map(|span| span.get("text").and_then(Value::as_str)) + .any(|text| text.contains("\"content\":\"queued child update\"")); + assert!( + has_queued_mail, + "queued child update should be injected only after the first response completes" + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() { let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel();