From 063a78e6fcf8b6fcd66edc428f3efbd4ddf91b9b Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Thu, 14 May 2026 12:51:14 -0700 Subject: [PATCH] Remove SSE fixture loaders --- codex-rs/core/tests/common/lib.rs | 48 ------- .../core/tests/fixtures/incomplete_sse.json | 3 - codex-rs/core/tests/suite/client.rs | 26 ++-- codex-rs/core/tests/suite/review.rs | 128 +++++++----------- .../core/tests/suite/stream_no_completed.rs | 8 +- 5 files changed, 63 insertions(+), 150 deletions(-) delete mode 100644 codex-rs/core/tests/fixtures/incomplete_sse.json diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 70e1a3f0e441..ad7858f8046f 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -237,54 +237,6 @@ pub fn find_codex_linux_sandbox_exe() -> Result { codex_utils_cargo_bin::cargo_bin("codex-linux-sandbox") } -/// Builds an SSE stream body from a JSON fixture. -/// -/// The fixture must contain an array of objects where each object represents a -/// single SSE event with at least a `type` field matching the `event:` value. -/// Additional fields become the JSON payload for the `data:` line. An object -/// with only a `type` field results in an event with no `data:` section. This -/// makes it trivial to extend the fixtures as OpenAI adds new event kinds or -/// fields. -pub fn load_sse_fixture(path: impl AsRef) -> String { - let events: Vec = - serde_json::from_reader(std::fs::File::open(path).expect("read fixture")) - .expect("parse JSON fixture"); - events - .into_iter() - .map(|e| { - let kind = e - .get("type") - .and_then(|v| v.as_str()) - .expect("fixture event missing type"); - if e.as_object().map(|o| o.len() == 1).unwrap_or(false) { - format!("event: {kind}\n\n") - } else { - format!("event: {kind}\ndata: {e}\n\n") - } - }) - .collect() -} - -pub fn load_sse_fixture_with_id_from_str(raw: &str, id: &str) -> String { - let replaced = raw.replace("__ID__", id); - let events: Vec = - serde_json::from_str(&replaced).expect("parse JSON fixture"); - events - .into_iter() - .map(|e| { - let kind = e - .get("type") - .and_then(|v| v.as_str()) - .expect("fixture event missing type"); - if e.as_object().map(|o| o.len() == 1).unwrap_or(false) { - format!("event: {kind}\n\n") - } else { - format!("event: {kind}\ndata: {e}\n\n") - } - }) - .collect() -} - pub async fn wait_for_event( codex: &CodexThread, predicate: F, diff --git a/codex-rs/core/tests/fixtures/incomplete_sse.json b/codex-rs/core/tests/fixtures/incomplete_sse.json deleted file mode 100644 index 2876bbfd2970..000000000000 --- a/codex-rs/core/tests/fixtures/incomplete_sse.json +++ /dev/null @@ -1,3 +0,0 @@ -[ - {"type": "response.output_item.done"} -] diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 8a461edc63d1..42bacb9bdc5f 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -52,6 +52,7 @@ use core_test_support::PathBufExt; use core_test_support::apps_test_server::AppsTestServer; use core_test_support::load_default_config_for_test; use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_completed_with_tokens; use core_test_support::responses::ev_message_item_added; @@ -3016,22 +3017,15 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { let server = MockServer::start().await; // Build a small SSE stream with deltas and a final assistant message. - // We emit the same body for all 3 turns; ids vary but are unused by assertions. - let sse_raw = r##"[ - {"type":"response.output_item.added", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":""}] - }}, - {"type":"response.output_text.delta", "delta":"Hey "}, - {"type":"response.output_text.delta", "delta":"there"}, - {"type":"response.output_text.delta", "delta":"!\n"}, - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":"Hey there!\n"}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"##; - let sse1 = core_test_support::load_sse_fixture_with_id_from_str(sse_raw, "resp1"); + // We emit the same body for all 3 turns. + let sse1 = sse(vec![ + ev_message_item_added("msg-1", ""), + ev_output_text_delta("Hey "), + ev_output_text_delta("there"), + ev_output_text_delta("!\n"), + ev_assistant_message("msg-1", "Hey there!\n"), + ev_completed("resp1"), + ]); let request_log = mount_sse_sequence(&server, vec![sse1.clone(), sse1.clone(), sse1]).await; diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index f371cdacad63..3611df6a6aaf 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -18,7 +18,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::user_input::UserInput; use core_test_support::PathBufExt; -use core_test_support::load_sse_fixture_with_id_from_str; +use core_test_support::responses; use core_test_support::responses::ResponseMock; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::start_mock_server; @@ -61,17 +61,11 @@ async fn review_op_emits_lifecycle_and_review_output() { "overall_confidence_score": 0.8 }) .to_string(); - let sse_template = r#"[ - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":__REVIEW__}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; - let review_json_escaped = serde_json::to_string(&review_json).unwrap(); - let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped); - let (server, _request_log) = - start_responses_server_with_sse(&sse_raw, /*expected_requests*/ 1).await; + let (server, _request_log) = start_responses_server_with_sse( + assistant_message_sse(&review_json), + /*expected_requests*/ 1, + ) + .await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await; @@ -186,15 +180,11 @@ async fn review_op_emits_lifecycle_and_review_output() { async fn review_op_with_plain_text_emits_review_fallback() { skip_if_no_network!(); - let sse_raw = r#"[ - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":"just plain text"}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; - let (server, _request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + let (server, _request_log) = start_responses_server_with_sse( + assistant_message_sse("just plain text"), + /*expected_requests*/ 1, + ) + .await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await; @@ -240,22 +230,17 @@ async fn review_op_with_plain_text_emits_review_fallback() { async fn review_filters_agent_message_related_events() { skip_if_no_network!(); - // Stream simulating a typing assistant message with deltas and finalization. - let sse_raw = r#"[ - {"type":"response.output_item.added", "item":{ - "type":"message", "role":"assistant", "id":"msg-1", - "content":[{"type":"output_text","text":""}] - }}, - {"type":"response.output_text.delta", "delta":"Hi"}, - {"type":"response.output_text.delta", "delta":" there"}, - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", "id":"msg-1", - "content":[{"type":"output_text","text":"Hi there"}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; - let (server, _request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + let (server, _request_log) = start_responses_server_with_sse( + vec![ + responses::ev_message_item_added("msg-1", ""), + responses::ev_output_text_delta("Hi"), + responses::ev_output_text_delta(" there"), + responses::ev_assistant_message("msg-1", "Hi there"), + responses::ev_completed("resp-1"), + ], + /*expected_requests*/ 1, + ) + .await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await; @@ -325,17 +310,11 @@ async fn review_does_not_emit_agent_message_on_structured_output() { "overall_confidence_score": 0.5 }) .to_string(); - let sse_template = r#"[ - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":__REVIEW__}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; - let review_json_escaped = serde_json::to_string(&review_json).unwrap(); - let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped); - let (server, _request_log) = - start_responses_server_with_sse(&sse_raw, /*expected_requests*/ 1).await; + let (server, _request_log) = start_responses_server_with_sse( + assistant_message_sse(&review_json), + /*expected_requests*/ 1, + ) + .await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await; @@ -386,12 +365,8 @@ async fn review_does_not_emit_agent_message_on_structured_output() { async fn review_uses_custom_review_model_from_config() { skip_if_no_network!(); - // Minimal stream: just a completed event - let sse_raw = r#"[ - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; let (server, request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await; let codex_home = Arc::new(TempDir::new().unwrap()); // Choose a review model different from the main model; ensure it is used. let codex = new_conversation_for_server(&server, codex_home.clone(), |cfg| { @@ -441,12 +416,8 @@ async fn review_uses_custom_review_model_from_config() { async fn review_uses_session_model_when_review_model_unset() { skip_if_no_network!(); - // Minimal stream: just a completed event - let sse_raw = r#"[ - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; let (server, request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |cfg| { cfg.model = Some("gpt-4.1".to_string()); @@ -496,12 +467,8 @@ async fn review_uses_session_model_when_review_model_unset() { async fn review_input_isolated_from_parent_history() { skip_if_no_network!(); - // Mock server for the single review request - let sse_raw = r#"[ - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; let (server, request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await; // Seed a parent session history via resume file with both user + assistant items. let codex_home = Arc::new(TempDir::new().unwrap()); @@ -674,16 +641,11 @@ async fn review_input_isolated_from_parent_history() { async fn review_history_surfaces_in_parent_session() { skip_if_no_network!(); - // Respond to both the review request and the subsequent parent request. - let sse_raw = r#"[ - {"type":"response.output_item.done", "item":{ - "type":"message", "role":"assistant", - "content":[{"type":"output_text","text":"review assistant output"}] - }}, - {"type":"response.completed", "response": {"id": "__ID__"}} - ]"#; - let (server, request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 2).await; + let (server, request_log) = start_responses_server_with_sse( + assistant_message_sse("review assistant output"), + /*expected_requests*/ 2, + ) + .await; let codex_home = Arc::new(TempDir::new().unwrap()); let codex = new_conversation_for_server(&server, codex_home.clone(), |_| {}).await; @@ -776,9 +738,8 @@ async fn review_history_surfaces_in_parent_session() { async fn review_uses_overridden_cwd_for_base_branch_merge_base() { skip_if_no_network!(); - let sse_raw = r#"[{"type":"response.completed", "response": {"id": "__ID__"}}]"#; let (server, request_log) = - start_responses_server_with_sse(sse_raw, /*expected_requests*/ 1).await; + start_responses_server_with_sse(completed_sse(), /*expected_requests*/ 1).await; let initial_cwd = TempDir::new().unwrap(); @@ -881,13 +842,24 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() { server.verify().await; } -/// Start a mock Responses API server and mount the given SSE stream body. +fn assistant_message_sse(text: &str) -> Vec { + vec![ + responses::ev_assistant_message("msg-1", text), + responses::ev_completed("resp-1"), + ] +} + +fn completed_sse() -> Vec { + vec![responses::ev_completed("resp-1")] +} + +/// Start a mock Responses API server and mount the given SSE events. async fn start_responses_server_with_sse( - sse_raw: &str, + events: Vec, expected_requests: usize, ) -> (MockServer, ResponseMock) { let server = start_mock_server().await; - let sse = load_sse_fixture_with_id_from_str(sse_raw, &Uuid::new_v4().to_string()); + let sse = responses::sse(events); let responses = vec![sse; expected_requests]; let request_log = mount_sse_sequence(&server, responses).await; (server, request_log) diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index 984220a0862b..30574718f20e 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -6,8 +6,6 @@ use codex_model_provider_info::WireApi; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::user_input::UserInput; -use codex_utils_cargo_bin::find_resource; -use core_test_support::load_sse_fixture; use core_test_support::responses; use core_test_support::skip_if_no_network; use core_test_support::streaming_sse::StreamingSseChunk; @@ -17,9 +15,9 @@ use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; fn sse_incomplete() -> String { - let fixture = find_resource!("tests/fixtures/incomplete_sse.json") - .unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}")); - load_sse_fixture(fixture) + responses::sse(vec![serde_json::json!({ + "type": "response.output_item.done", + })]) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)]