Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3187,6 +3187,17 @@ impl Session {
}

pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) {
// Ack-enabled websocket responses cannot be interrupted mid-stream. If mailbox mail was
// already queued before the final answer boundary, keep it eligible for the immediate
// post-ack follow-up instead of silently pushing it to a later user turn.
if self
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
&& self.has_pending_mailbox_items().await
{
return;
}

let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
Expand Down
40 changes: 40 additions & 0 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8797,6 +8797,46 @@ async fn queue_only_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
);
}

// Session-level check for the `ResponsesWebsocketResponseProcessed` feature: mailbox mail that is
// enqueued before `defer_mailbox_delivery_to_next_turn` is called must still be reported as
// pending input afterwards.
#[tokio::test]
async fn queued_mailbox_mail_stays_in_current_turn_after_answer_boundary_when_ack_enabled() {
    // Build a session whose config has the response-processed websocket feature switched on.
    let (sess, tc, _rx) = make_session_and_context_with_auth_and_config_and_rx(
        CodexAuth::from_api_key("Test API Key"),
        Vec::new(),
        |config| {
            config
                .features
                .enable(Feature::ResponsesWebsocketResponseProcessed)
                .expect("response processed websocket mode should be enableable in tests");
        },
    )
    .await;

    // Mail from a worker agent addressed to the root agent; it does not request a new turn.
    let worker_path = AgentPath::try_from("/root/worker").expect("worker path should parse");
    let queued_mail = InterAgentCommunication::new(
        worker_path,
        AgentPath::root(),
        Vec::new(),
        "queued child update".to_string(),
        /*trigger_turn*/ false,
    );

    // Keep a task running for `tc` while the mailbox is exercised.
    let idle_task = NeverEndingTask {
        kind: TaskKind::Regular,
        listen_to_cancellation_token: true,
    };
    sess.spawn_task(Arc::clone(&tc), Vec::new(), idle_task).await;

    // Queue the mail first, then request the deferral for the running turn.
    sess.enqueue_mailbox_communication(queued_mail.clone());
    sess.defer_mailbox_delivery_to_next_turn(&tc.sub_id).await;

    // The mail must still be visible as pending input, and it must be exactly the queued item.
    let still_pending = sess.has_pending_input().await;
    assert!(still_pending);

    let pending_items = sess.get_pending_input().await;
    let expected_items = vec![queued_mail.to_response_input_item()];
    assert_eq!(pending_items, expected_items);
}

#[tokio::test]
async fn trigger_turn_mailbox_mail_waits_for_next_turn_after_answer_boundary() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
Expand Down
10 changes: 8 additions & 2 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2021,8 +2021,14 @@ async fn try_run_sampling_request(
last_agent_message = Some(agent_message);
}
needs_follow_up |= output_result.needs_follow_up;
// todo: remove before stabilizing multi-agent v2
if preempt_for_mailbox_mail && sess.mailbox_rx.lock().await.has_pending() {
// Ack-enabled websocket flows must observe `response.completed` so they can
// acknowledge that exact response before issuing any mailbox follow-up request.
if !sess
.features
.enabled(Feature::ResponsesWebsocketResponseProcessed)
&& preempt_for_mailbox_mail
&& sess.mailbox_rx.lock().await.has_pending()
{
break Ok(SamplingRequestResult {
needs_follow_up: true,
last_agent_message,
Expand Down
69 changes: 69 additions & 0 deletions codex-rs/core/tests/suite/pending_input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use codex_core::CodexThread;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::items::TurnItem;
use codex_protocol::models::PermissionProfile;
Expand Down Expand Up @@ -373,6 +374,74 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
server.shutdown().await;
}

// End-to-end check for the `ResponsesWebsocketResponseProcessed` feature: mail queued while the
// first response is still streaming must show up in the second request, and the turn must issue
// exactly two requests in total.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_waits_for_response_completion_when_ack_enabled() {
    // The tail of the first response is held back until the sender half fires.
    let (release_tail_tx, release_tail_rx) = oneshot::channel();

    let gated_tail = vec![
        ev_reasoning_item("reason-1", &["thinking"], &[]),
        ev_message_item_added("msg-1", ""),
        ev_output_text_delta("first answer"),
        ev_message_item_done("msg-1", "first answer"),
        ev_completed("resp-1"),
    ];
    let first_response = vec![
        chunk(ev_response_created("resp-1")),
        chunk(ev_reasoning_item_added("reason-1", &["thinking"])),
        gated_chunk(release_tail_rx, gated_tail),
    ];
    let second_response = response_completed_chunks("resp-2");

    let (server, _completions) =
        start_streaming_sse_server(vec![first_response, second_response]).await;

    let codex = test_codex()
        .with_model("gpt-5.4")
        .with_config(|config| {
            config
                .features
                .enable(Feature::ResponsesWebsocketResponseProcessed)
                .expect("test config should allow feature update");
        })
        .build_with_streaming_server(&server)
        .await
        .unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"))
        .codex;

    submit_user_input(&codex, "first prompt").await;

    // Queue the mail only once the reasoning item has started, i.e. while the response is
    // still in flight.
    wait_for_reasoning_item_started(&codex).await;
    submit_queue_only_agent_mail(&codex, "queued child update").await;

    // Let the remainder of the first response (through its completed event) stream out.
    let _ = release_tail_tx.send(());

    wait_for_agent_message(&codex, "first answer").await;
    wait_for_turn_complete(&codex).await;

    let recorded_requests = server.requests().await;
    assert_eq!(recorded_requests.len(), 2);

    // Scan the second request's `input` for an assistant `output_text` span that carries the
    // queued mail's content.
    let follow_up: Value = from_slice(&recorded_requests[1]).expect("parse second request");
    let mut has_queued_mail = false;
    if let Some(input_items) = follow_up.get("input").and_then(Value::as_array) {
        'scan: for item in input_items {
            if item.get("role").and_then(Value::as_str) != Some("assistant") {
                continue;
            }
            let Some(spans) = item.get("content").and_then(Value::as_array) else {
                continue;
            };
            for span in spans {
                if span.get("type").and_then(Value::as_str) != Some("output_text") {
                    continue;
                }
                let Some(text) = span.get("text").and_then(Value::as_str) else {
                    continue;
                };
                if text.contains("\"content\":\"queued child update\"") {
                    has_queued_mail = true;
                    break 'scan;
                }
            }
        }
    }
    assert!(
        has_queued_mail,
        "queued child update should be injected only after the first response completes"
    );

    server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_item() {
let (gate_message_done_tx, gate_message_done_rx) = oneshot::channel();
Expand Down
Loading