Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions codex-rs/core/tests/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,54 +237,6 @@ pub fn find_codex_linux_sandbox_exe() -> Result<PathBuf, CargoBinError> {
codex_utils_cargo_bin::cargo_bin("codex-linux-sandbox")
}

/// Builds an SSE stream body from a JSON fixture.
///
/// The fixture must contain an array of objects where each object represents a
/// single SSE event with at least a `type` field matching the `event:` value.
/// Additional fields become the JSON payload for the `data:` line. An object
/// with only a `type` field results in an event with no `data:` section. This
/// makes it trivial to extend the fixtures as OpenAI adds new event kinds or
/// fields.
///
/// # Panics
///
/// Panics if the file cannot be opened, is not a JSON array, or any event
/// object lacks a string `type` field.
pub fn load_sse_fixture(path: impl AsRef<std::path::Path>) -> String {
    let events: Vec<serde_json::Value> =
        serde_json::from_reader(std::fs::File::open(path).expect("read fixture"))
            .expect("parse JSON fixture");
    events
        .into_iter()
        .map(|e| {
            let kind = e
                .get("type")
                .and_then(|v| v.as_str())
                .expect("fixture event missing type");
            // An object containing only `type` carries no payload: emit the
            // `event:` line alone, with no `data:` section.
            if e.as_object().is_some_and(|o| o.len() == 1) {
                format!("event: {kind}\n\n")
            } else {
                format!("event: {kind}\ndata: {e}\n\n")
            }
        })
        .collect()
}

/// Builds an SSE stream body from a raw JSON fixture string, substituting
/// every `__ID__` placeholder with `id` before parsing.
///
/// The string must contain a JSON array of event objects, each with at least a
/// string `type` field (used for the `event:` line); remaining fields become
/// the `data:` payload. An object with only a `type` field produces an event
/// with no `data:` section.
///
/// # Panics
///
/// Panics if the (substituted) string is not a JSON array, or any event
/// object lacks a string `type` field.
pub fn load_sse_fixture_with_id_from_str(raw: &str, id: &str) -> String {
    let replaced = raw.replace("__ID__", id);
    let events: Vec<serde_json::Value> =
        serde_json::from_str(&replaced).expect("parse JSON fixture");
    events
        .into_iter()
        .map(|e| {
            let kind = e
                .get("type")
                .and_then(|v| v.as_str())
                .expect("fixture event missing type");
            // An object containing only `type` carries no payload: emit the
            // `event:` line alone, with no `data:` section.
            if e.as_object().is_some_and(|o| o.len() == 1) {
                format!("event: {kind}\n\n")
            } else {
                format!("event: {kind}\ndata: {e}\n\n")
            }
        })
        .collect()
}

pub async fn wait_for_event<F>(
codex: &CodexThread,
predicate: F,
Expand Down
3 changes: 0 additions & 3 deletions codex-rs/core/tests/fixtures/incomplete_sse.json

This file was deleted.

26 changes: 10 additions & 16 deletions codex-rs/core/tests/suite/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use core_test_support::PathBufExt;
use core_test_support::apps_test_server::AppsTestServer;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_message_item_added;
Expand Down Expand Up @@ -3016,22 +3017,15 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
let server = MockServer::start().await;

// Build a small SSE stream with deltas and a final assistant message.
// We emit the same body for all 3 turns; ids vary but are unused by assertions.
let sse_raw = r##"[
{"type":"response.output_item.added", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":""}]
}},
{"type":"response.output_text.delta", "delta":"Hey "},
{"type":"response.output_text.delta", "delta":"there"},
{"type":"response.output_text.delta", "delta":"!\n"},
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":"Hey there!\n"}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"##;
let sse1 = core_test_support::load_sse_fixture_with_id_from_str(sse_raw, "resp1");
// We emit the same body for all 3 turns.
let sse1 = sse(vec![
ev_message_item_added("msg-1", ""),
ev_output_text_delta("Hey "),
ev_output_text_delta("there"),
ev_output_text_delta("!\n"),
ev_assistant_message("msg-1", "Hey there!\n"),
ev_completed("resp1"),
]);

let request_log = mount_sse_sequence(&server, vec![sse1.clone(), sse1.clone(), sse1]).await;

Expand Down
128 changes: 50 additions & 78 deletions codex-rs/core/tests/suite/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
use core_test_support::PathBufExt;
use core_test_support::load_sse_fixture_with_id_from_str;
use core_test_support::responses;
use core_test_support::responses::ResponseMock;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
Expand Down Expand Up @@ -61,17 +61,11 @@ async fn review_op_emits_lifecycle_and_review_output() {
"overall_confidence_score": 0.8
})
.to_string();
let sse_template = r#"[
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":__REVIEW__}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let review_json_escaped = serde_json::to_string(&review_json).unwrap();
let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped);
let (server, _request_log) =
start_responses_server_with_sse(&sse_raw, /*expected_requests*/ 1).await;
let (server, _request_log) = start_responses_server_with_sse(
assistant_message_sse(&review_json),
/*expected_requests*/ 1,
)
.await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await;

Expand Down Expand Up @@ -186,15 +180,11 @@ async fn review_op_emits_lifecycle_and_review_output() {
async fn review_op_with_plain_text_emits_review_fallback() {
skip_if_no_network!();

let sse_raw = r#"[
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":"just plain text"}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, _request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
let (server, _request_log) = start_responses_server_with_sse(
assistant_message_sse("just plain text"),
/*expected_requests*/ 1,
)
.await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await;

Expand Down Expand Up @@ -240,22 +230,17 @@ async fn review_op_with_plain_text_emits_review_fallback() {
async fn review_filters_agent_message_related_events() {
skip_if_no_network!();

// Stream simulating a typing assistant message with deltas and finalization.
let sse_raw = r#"[
{"type":"response.output_item.added", "item":{
"type":"message", "role":"assistant", "id":"msg-1",
"content":[{"type":"output_text","text":""}]
}},
{"type":"response.output_text.delta", "delta":"Hi"},
{"type":"response.output_text.delta", "delta":" there"},
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant", "id":"msg-1",
"content":[{"type":"output_text","text":"Hi there"}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, _request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
let (server, _request_log) = start_responses_server_with_sse(
vec![
responses::ev_message_item_added("msg-1", ""),
responses::ev_output_text_delta("Hi"),
responses::ev_output_text_delta(" there"),
responses::ev_assistant_message("msg-1", "Hi there"),
responses::ev_completed("resp-1"),
],
/*expected_requests*/ 1,
)
.await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await;

Expand Down Expand Up @@ -325,17 +310,11 @@ async fn review_does_not_emit_agent_message_on_structured_output() {
"overall_confidence_score": 0.5
})
.to_string();
let sse_template = r#"[
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":__REVIEW__}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let review_json_escaped = serde_json::to_string(&review_json).unwrap();
let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped);
let (server, _request_log) =
start_responses_server_with_sse(&sse_raw, /*expected_requests*/ 1).await;
let (server, _request_log) = start_responses_server_with_sse(
assistant_message_sse(&review_json),
/*expected_requests*/ 1,
)
.await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await;

Expand Down Expand Up @@ -386,12 +365,8 @@ async fn review_does_not_emit_agent_message_on_structured_output() {
async fn review_uses_custom_review_model_from_config() {
skip_if_no_network!();

// Minimal stream: just a completed event
let sse_raw = r#"[
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await;
let codex_home = Arc::new(TempDir::new().unwrap());
// Choose a review model different from the main model; ensure it is used.
let codex = new_conversation_for_server(&server, codex_home.clone(), |cfg| {
Expand Down Expand Up @@ -441,12 +416,8 @@ async fn review_uses_custom_review_model_from_config() {
async fn review_uses_session_model_when_review_model_unset() {
skip_if_no_network!();

// Minimal stream: just a completed event
let sse_raw = r#"[
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |cfg| {
cfg.model = Some("gpt-4.1".to_string());
Expand Down Expand Up @@ -496,12 +467,8 @@ async fn review_uses_session_model_when_review_model_unset() {
async fn review_input_isolated_from_parent_history() {
skip_if_no_network!();

// Mock server for the single review request
let sse_raw = r#"[
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await;

// Seed a parent session history via resume file with both user + assistant items.
let codex_home = Arc::new(TempDir::new().unwrap());
Expand Down Expand Up @@ -674,16 +641,11 @@ async fn review_input_isolated_from_parent_history() {
async fn review_history_surfaces_in_parent_session() {
skip_if_no_network!();

// Respond to both the review request and the subsequent parent request.
let sse_raw = r#"[
{"type":"response.output_item.done", "item":{
"type":"message", "role":"assistant",
"content":[{"type":"output_text","text":"review assistant output"}]
}},
{"type":"response.completed", "response": {"id": "__ID__"}}
]"#;
let (server, request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 2).await;
let (server, request_log) = start_responses_server_with_sse(
assistant_message_sse("review assistant output"),
/*expected_requests*/ 2,
)
.await;
let codex_home = Arc::new(TempDir::new().unwrap());
let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await;

Expand Down Expand Up @@ -776,9 +738,8 @@ async fn review_history_surfaces_in_parent_session() {
async fn review_uses_overridden_cwd_for_base_branch_merge_base() {
skip_if_no_network!();

let sse_raw = r#"[{"type":"response.completed", "response": {"id": "__ID__"}}]"#;
let (server, request_log) =
start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await;
start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await;

let initial_cwd = TempDir::new().unwrap();

Expand Down Expand Up @@ -881,13 +842,24 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() {
server.verify().await;
}

/// Start a mock Responses API server and mount the given SSE stream body.
/// SSE events for a single final assistant message followed by a
/// `response.completed` event.
fn assistant_message_sse(text: &str) -> Vec<serde_json::Value> {
    let message = responses::ev_assistant_message("msg-1", text);
    let completed = responses::ev_completed("resp-1");
    vec![message, completed]
}

/// Minimal SSE stream: only a `response.completed` event.
fn completed_sse() -> Vec<serde_json::Value> {
    Vec::from([responses::ev_completed("resp-1")])
}

/// Start a mock Responses API server and mount the given SSE events.
async fn start_responses_server_with_sse(
sse_raw: &str,
events: Vec<serde_json::Value>,
expected_requests: usize,
) -> (MockServer, ResponseMock) {
let server = start_mock_server().await;
let sse = load_sse_fixture_with_id_from_str(sse_raw, &Uuid::new_v4().to_string());
let sse = responses::sse(events);
let responses = vec![sse; expected_requests];
let request_log = mount_sse_sequence(&server, responses).await;
(server, request_log)
Expand Down
8 changes: 3 additions & 5 deletions codex-rs/core/tests/suite/stream_no_completed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use codex_model_provider_info::WireApi;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use codex_utils_cargo_bin::find_resource;
use core_test_support::load_sse_fixture;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
Expand All @@ -17,9 +15,9 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;

/// SSE body that never emits `response.completed`: a single
/// `response.output_item.done` event with no item payload, exercising the
/// incomplete-stream path.
///
/// NOTE(review): the diff rendering interleaved the removed fixture-file body
/// (`find_resource!` + `load_sse_fixture`, both deleted in this change) with
/// the new inline-events body; this is the post-merge implementation.
fn sse_incomplete() -> String {
    responses::sse(vec![serde_json::json!({
        "type": "response.output_item.done",
    })])
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
Loading