From 7bd26f4cb72d0a95d8e5248c5e039bfb8e6e515d Mon Sep 17 00:00:00 2001 From: Jan van Lindt Date: Thu, 9 Apr 2026 14:14:33 +0200 Subject: [PATCH 1/2] fix(proxy): invoke response_body_filter for bodyless upstream responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the upstream returns 204/304, the H1 client emits a single HttpTask::Header(end=true) and no HttpTask::Body; the H2 path produces a header frame with eos=true and no DATA. The existing call to response_body_filter lives in the HttpTask::Body arm, so the filter is unreachable — making the canonical pattern (mutate status to 200 in response_filter, inject body in response_body_filter) impossible. After the upstream task loop in proxy_h1/h2.rs, detect the bodyless case and invoke response_body_filter once with body=None, end_of_stream=true. If the filter returns non-empty bytes, clear the header's end flag, rewrite Content-Length, and append a HttpTask::Body. Behavior is unchanged for responses with a body. --- pingora-proxy/src/proxy_h1.rs | 70 +++++++++++++++++++++++++++++++++++ pingora-proxy/src/proxy_h2.rs | 7 ++++ 2 files changed, 77 insertions(+) diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 9f498aa0..1a07fdb7 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -488,6 +488,13 @@ where continue; } + // Give body filter a chance to inject for bodyless responses (204/304) + self.maybe_synthesize_body_filter_call( + session, + &mut filtered_tasks, + ctx, + ).await?; + // set to downstream let upgraded = session.was_upgraded(); let response_done = session.write_response_tasks(filtered_tasks).await?; @@ -761,6 +768,69 @@ where res } + /// Invoke `response_body_filter` once with `body=None, end_of_stream=true` + /// for bodyless upstream responses (204/304), so users can inject a + /// synthesized body — typically after mutating the status in + /// `response_filter`. Without this call, the body filter is unreachable + /// for these responses because the upstream client emits no `HttpTask::Body`. + /// + /// If the filter returns non-empty bytes, the header's framing is updated + /// and a `HttpTask::Body` is appended. + pub(crate) async fn maybe_synthesize_body_filter_call( + &self, + session: &mut Session, + filtered_tasks: &mut Vec, + ctx: &mut SV::CTX, + ) -> Result<()> + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + // Fire only when the batch is a Header(end=true) with no Body task. + // Anything with a Body goes through the existing body filter call. + let mut has_header_end = false; + let mut has_body = false; + for t in filtered_tasks.iter() { + match t { + HttpTask::Header(_, true) => has_header_end = true, + HttpTask::Body(_, _) => has_body = true, + _ => {} + } + } + if !has_header_end || has_body { + return Ok(()); + } + + let mut synthetic_body: Option = None; + if let Some(duration) = + self.inner + .response_body_filter(session, &mut synthetic_body, true, ctx)? + { + trace!("delaying downstream response for {:?}", duration); + time::sleep(duration).await; + } + + let Some(injected) = synthetic_body else { + return Ok(()); + }; + if injected.is_empty() { + return Ok(()); + } + + // Rewrite framing — body filter is the source of truth for length. + for t in filtered_tasks.iter_mut() { + if let HttpTask::Header(header, end) = t { + *end = false; + header.remove_header(&header::TRANSFER_ENCODING); + header.remove_header(&header::CONTENT_LENGTH); + header.insert_header(header::CONTENT_LENGTH, injected.len().to_string())?; + break; + } + } + filtered_tasks.push(HttpTask::Body(Some(injected), true)); + Ok(()) + } + // TODO:: use this function to replace send_body_to2 async fn send_body_to_pipe( &self, diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index acf61f07..03f43d23 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -444,6 +444,13 @@ where continue; } + // Give body filter a chance to inject for bodyless responses (204/304) + self.maybe_synthesize_body_filter_call( + session, + &mut filtered_tasks, + ctx, + ).await?; + let response_done = session.write_response_tasks(filtered_tasks).await?; if session.was_upgraded() { // it is very weird if the downstream session decides to upgrade From dd6d8e63fbab1e1780fe55330b45c110b0527820 Mon Sep 17 00:00:00 2001 From: Jan van Lindt Date: Thu, 9 Apr 2026 19:42:47 +0200 Subject: [PATCH 2/2] test(proxy): regression test for body filter on bodyless upstream responses --- .../tests/test_body_filter_bodyless.rs | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 pingora-proxy/tests/test_body_filter_bodyless.rs diff --git a/pingora-proxy/tests/test_body_filter_bodyless.rs b/pingora-proxy/tests/test_body_filter_bodyless.rs new file mode 100644 index 00000000..f05b4e26 --- /dev/null +++ b/pingora-proxy/tests/test_body_filter_bodyless.rs @@ -0,0 +1,188 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Regression test for the bodyless-response body filter fix. +//! +//! Self-contained: spawns a pure-Rust HTTP/1 mock upstream and a pingora +//! proxy whose `response_filter` mutates 204 -> 200 and whose +//! `response_body_filter` injects a synthesized body. Verifies that the +//! downstream client receives the synthesized body, not an empty response. +//! +//! Does NOT depend on the openresty-based test fixture, so it runs +//! standalone (`cargo test -p pingora-proxy --test test_body_filter_bodyless`). + +use async_trait::async_trait; +use bytes::Bytes; +use once_cell::sync::Lazy; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_http::ResponseHeader; +use pingora_proxy::{ProxyHttp, Session}; + +const SYNTHESIZED_BODY: &[u8] = b""; +const PROXY_ADDR: &str = "127.0.0.1:6180"; +const UPSTREAM_ADDR: &str = "127.0.0.1:6181"; + +struct MockUpstream { + _handle: thread::JoinHandle<()>, +} + +impl MockUpstream { + fn start() -> Self { + let (tx, rx) = mpsc::channel(); + let handle = thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(Self::run(tx)); + }); + rx.recv_timeout(Duration::from_secs(5)) + .expect("mock upstream failed to bind within 5s"); + MockUpstream { _handle: handle } + } + + async fn run(ready: mpsc::Sender<()>) { + let listener = TcpListener::bind(UPSTREAM_ADDR).await.unwrap(); + let _ = ready.send(()); + loop { + let (mut sock, _) = listener.accept().await.unwrap(); + tokio::spawn(async move { + // Read request (best effort, tiny test). + let mut buf = [0u8; 1024]; + let _ = sock.read(&mut buf).await; + let req = String::from_utf8_lossy(&buf); + let resp: &[u8] = if req.starts_with("GET /no-body") { + b"HTTP/1.1 204 No Content\r\n\r\n" + } else { + b"HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello World!\n" + }; + let _ = sock.write_all(resp).await; + }); + } + } +} + +struct TestProxy; + +struct TestCtx { + inject: bool, +} + +#[async_trait] +impl ProxyHttp for TestProxy { + type CTX = TestCtx; + fn new_ctx(&self) -> Self::CTX { + TestCtx { inject: false } + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + Ok(Box::new(HttpPeer::new(UPSTREAM_ADDR, false, String::new()))) + } + + async fn response_filter( + &self, + _session: &mut Session, + upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) -> Result<()> { + if upstream_response.status.as_u16() == 204 { + upstream_response.set_status(200)?; + ctx.inject = true; + } + Ok(()) + } + + fn response_body_filter( + &self, + _session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result> { + if end_of_stream && ctx.inject { + *body = Some(Bytes::from_static(SYNTHESIZED_BODY)); + } + Ok(None) + } +} + +struct ProxyServer { + _handle: thread::JoinHandle<()>, +} + +impl ProxyServer { + fn start() -> Self { + let handle = thread::spawn(|| { + let opt = Opt { + upgrade: false, + daemon: false, + nocapture: false, + test: false, + conf: None, + }; + let mut server = Server::new(Some(opt)).unwrap(); + server.bootstrap(); + let mut svc = pingora_proxy::http_proxy_service(&server.configuration, TestProxy); + svc.add_tcp(PROXY_ADDR); + server.add_service(svc); + server.run_forever(); + }); + ProxyServer { _handle: handle } + } +} + +static UPSTREAM: Lazy = Lazy::new(MockUpstream::start); +static PROXY: Lazy = Lazy::new(ProxyServer::start); + +fn init() { + let _ = &*UPSTREAM; + let _ = &*PROXY; + // Give the pingora server a moment to bind. + thread::sleep(Duration::from_millis(300)); +} + +#[tokio::test] +async fn test_body_filter_reaches_204_upstream() { + init(); + let res = reqwest::get(format!("http://{PROXY_ADDR}/no-body")) + .await + .unwrap(); + assert_eq!(res.status(), reqwest::StatusCode::OK); + let body = res.bytes().await.unwrap(); + assert_eq!( + body.as_ref(), + SYNTHESIZED_BODY, + "expected synthesized body, got {:?}", + String::from_utf8_lossy(&body) + ); +} + +#[tokio::test] +async fn test_body_filter_passthrough_on_200_upstream() { + init(); + let res = reqwest::get(format!("http://{PROXY_ADDR}/")).await.unwrap(); + assert_eq!(res.status(), reqwest::StatusCode::OK); + let body = res.text().await.unwrap(); + assert_eq!(body, "Hello World!\n"); +}