From e7450464ddc4ebae4b0481aea03ff069de60e850 Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Mon, 16 Feb 2026 11:21:56 -0800 Subject: [PATCH 1/8] Support early request body buffering before upstream peer selection Add opt-in request body buffering via request_body_buffer_limit() trait method. When implemented, the full request body is read and filtered before request_filter runs, making it available for auth signature verification and content-based routing decisions. Resolves #780 --- pingora-proxy/src/lib.rs | 326 ++++++++++++++++++++++++++++++ pingora-proxy/src/proxy_common.rs | 7 +- pingora-proxy/src/proxy_h1.rs | 42 +++- pingora-proxy/src/proxy_h2.rs | 40 +++- pingora-proxy/src/proxy_trait.rs | 24 +++ 5 files changed, 417 insertions(+), 22 deletions(-) diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 4ddcb1dc..6baa4e46 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -446,6 +446,13 @@ pub struct Session { upstream_write_pending_time: Duration, /// Flag that is set when the shutdown process has begun. shutdown_flag: Arc, + /// Request body buffered early (before upstream connection) for auth/routing decisions. + /// When set, body forwarding will use this instead of re-reading from downstream. + /// Use accessor methods: `get_buffered_body()`, `take_buffered_body()`, `set_buffered_body()`. + buffered_request_body: Option, + /// Whether body has been fully consumed for buffering. + /// Use accessor: `is_body_buffered()`. + body_buffered: bool, } impl Session { @@ -467,6 +474,8 @@ impl Session { upstream_body_bytes_received: 0, upstream_write_pending_time: Duration::ZERO, shutdown_flag, + buffered_request_body: None, + body_buffered: false, } } @@ -653,6 +662,73 @@ impl Session { self.shutdown_flag.load(Ordering::Acquire) } + /// Returns a reference to the buffered request body, if any. + /// + /// The body is buffered by `buffer_request_body_early()` when the trait method + /// `request_body_buffer_limit()` returns `Some(max_size)`. + pub fn get_buffered_body(&self) -> Option<&Bytes> { + self.buffered_request_body.as_ref() + } + + /// Takes ownership of the buffered request body, leaving `None` in its place. + /// + /// Use this when forwarding the body to upstream - takes the body once for sending. + pub fn take_buffered_body(&mut self) -> Option { + self.buffered_request_body.take() + } + + /// Sets the buffered request body. + /// + /// This is called by `buffer_request_body_early()` after reading the full body. + /// Also useful for app code that wants to replace the body (e.g., decompression). + pub fn set_buffered_body(&mut self, body: Option) { + self.body_buffered = body.is_some() || self.body_buffered; + self.buffered_request_body = body; + } + + /// Returns whether a body has been buffered (or confirmed empty). + /// + /// When `true`, the body has been fully read and is available via `get_buffered_body()`, + /// or the request has no body. Body forwarding will skip re-reading from downstream. + pub fn is_body_buffered(&self) -> bool { + self.body_buffered + } + + /// Marks the body as buffered without setting a body. + /// + /// Used when the request has no body (no Content-Length or Transfer-Encoding), + /// to prevent `buffer_request_body_early()` from attempting to read. + pub fn mark_body_buffered(&mut self) { + self.body_buffered = true; + } + + /// Creates a Session from an H1 HttpSession (for testing only). + #[cfg(test)] + pub fn new_h1_with_http_session( + http_session: pingora_core::protocols::http::v1::server::HttpSession, + ) -> Self { + use pingora_cache::HttpCache; + use pingora_core::protocols::http::compression::ResponseCompressionCtx; + use pingora_core::protocols::http::ServerSession; + + let shutdown_flag = Arc::new(AtomicBool::new(false)); + Session { + downstream_session: Box::new(ServerSession::H1(http_session)), + cache: HttpCache::new(), + upstream_compression: ResponseCompressionCtx::new(0, false, false), + ignore_downstream_range: false, + upstream_headers_mutated_for_cache: false, + subrequest_ctx: None, + subrequest_spawner: None, + downstream_modules_ctx: HttpModuleCtx::empty(), + upstream_body_bytes_received: 0, + upstream_write_pending_time: Duration::ZERO, + shutdown_flag, + buffered_request_body: None, + body_buffered: false, + } + } + pub fn downstream_custom_message( &mut self, ) -> Result< @@ -735,6 +811,19 @@ where .await; } + // Early body buffering: read full request body BEFORE request_filter. + // This allows request_filter to access the body for auth signature verification + // and other content-based decisions. The body is buffered based on the size limit + // returned by request_body_buffer_limit() (set in early_request_filter via route matching). + // See: https://github.com/cloudflare/pingora/issues/780 + if !session.is_body_buffered() { + if let Err(e) = self.buffer_request_body_early(&mut session, &mut ctx).await { + return self + .handle_error(session, &mut ctx, e, "Failed to buffer request body:") + .await; + } + } + if self.inner.allow_spawning_subrequest(&session, &ctx) { session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone())); } @@ -961,6 +1050,167 @@ where None } } + + /// Buffer the entire request body before connecting to upstream. + /// + /// This enables request_body_filter to run BEFORE upstream_peer selection, + /// allowing auth signature verification and content-based routing. + /// + /// Buffering is controlled by the trait method `request_body_buffer_limit()`: + /// - Returns `None`: Skip buffering, stream body to upstream (default) + /// - Returns `Some(max_size)`: Buffer body with size limit enforcement + /// + /// Size limit enforcement: + /// - Content-Length checked first (fail fast before reading) + /// - Accumulated size checked during reading (streaming protection) + /// - Returns HTTP 413 (Payload Too Large) if exceeded + async fn buffer_request_body_early( + &self, + session: &mut Session, + ctx: &mut ::CTX, + ) -> Result<()> + where + SV: ProxyHttp + Send + Sync, + ::CTX: Send + Sync, + { + // Check trait method for opt-in and size limit + let Some(max_size) = self.inner.request_body_buffer_limit(session, ctx) else { + return Ok(()); // No buffering requested + }; + + // Skip if already buffered (e.g., by request_filter) + if session.is_body_buffered() { + return Ok(()); + } + + // Get Content-Length if present (for early size check) + let content_length = session + .downstream_session + .req_header() + .headers + .get(header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()); + + // Fail fast: check Content-Length before reading + if let Some(cl) = content_length { + if cl > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body too large: Content-Length {} exceeds limit {} bytes", + cl, max_size + ), + ); + } + } + + // Check if there's a body to read (Content-Length > 0 or Transfer-Encoding) + let has_body = content_length.is_some_and(|len| len > 0) + || session + .downstream_session + .req_header() + .headers + .get(header::TRANSFER_ENCODING) + .is_some(); + + if !has_body { + // No body to buffer, mark as done + session.mark_body_buffered(); + return Ok(()); + } + + let mut body_parts: Vec = Vec::new(); + let mut total_size: usize = 0; + + // Read body chunks until end of stream + loop { + // read_body_or_idle(false) means we expect a body (not done yet) + let body_chunk: Option = + session.downstream_session.read_body_or_idle(false).await?; + + match body_chunk { + Some(data) => { + let is_body_done = session.downstream_session.is_body_done(); + + // Call request_body_filter for each chunk + let mut filter_data: Option = Some(data); + session + .downstream_modules_ctx + .request_body_filter(&mut filter_data, is_body_done) + .await?; + self.inner + .request_body_filter(session, &mut filter_data, is_body_done, ctx) + .await?; + + // Accumulate the (possibly filtered) data + if let Some(filtered) = filter_data { + total_size += filtered.len(); + + // Check size limit during accumulation (streaming protection) + if total_size > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body exceeded limit: {} > {} bytes", + total_size, max_size + ), + ); + } + + body_parts.push(filtered); + } + + if is_body_done { + break; + } + } + None => { + // End of body, call filter with end_of_stream=true + let mut filter_data: Option = None; + session + .downstream_modules_ctx + .request_body_filter(&mut filter_data, true) + .await?; + self.inner + .request_body_filter(session, &mut filter_data, true, ctx) + .await?; + + // Collect any final data from the filter + if let Some(filtered) = filter_data { + total_size += filtered.len(); + + // Final size check + if total_size > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body exceeded limit: {} > {} bytes", + total_size, max_size + ), + ); + } + + body_parts.push(filtered); + } + break; + } + } + } + + // Combine all chunks into a single Bytes buffer + if total_size > 0 { + let mut combined = bytes::BytesMut::with_capacity(total_size); + for part in body_parts { + combined.extend_from_slice(&part); + } + session.set_buffered_body(Some(combined.freeze())); + } else { + session.mark_body_buffered(); + } + + Ok(()) + } } /* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649 @@ -1255,3 +1505,79 @@ where Service::new(name.to_string(), proxy) } + +#[cfg(test)] +mod tests { + use super::*; + + // Test Session body buffering accessors + mod body_buffer { + use super::*; + use pingora_core::protocols::http::v1::server::HttpSession; + use tokio_test::io::Builder; + + fn create_test_session() -> Session { + // Create a minimal mock stream for testing using tokio-test Builder. + // Use empty mock since we only test accessor methods, not HTTP parsing. + let mock_io = Builder::new().build(); + let http_session = HttpSession::new(Box::new(mock_io)); + Session::new_h1_with_http_session(http_session) + } + + #[test] + fn test_initial_state() { + let session = create_test_session(); + assert!(!session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_and_get_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + + assert!(session.is_body_buffered()); + assert_eq!(session.get_buffered_body(), Some(&body)); + } + + #[test] + fn test_take_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + let taken = session.take_buffered_body(); + + assert_eq!(taken, Some(body)); + assert!(session.get_buffered_body().is_none()); + // is_body_buffered should remain true after take + assert!(session.is_body_buffered()); + } + + #[test] + fn test_mark_body_buffered() { + let mut session = create_test_session(); + + assert!(!session.is_body_buffered()); + session.mark_body_buffered(); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_none_preserves_buffered_flag() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body)); + assert!(session.is_body_buffered()); + + // Setting None should preserve the buffered flag + session.set_buffered_body(None); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + } +} diff --git a/pingora-proxy/src/proxy_common.rs b/pingora-proxy/src/proxy_common.rs index e1d36f69..19dbd65b 100644 --- a/pingora-proxy/src/proxy_common.rs +++ b/pingora-proxy/src/proxy_common.rs @@ -5,6 +5,8 @@ pub(crate) enum DownstreamStateMachine { Reading, /// no more data to read ReadingFinished, + /// body was pre-buffered before upstream connection, skip all downstream polling + PreBuffered, /// downstream is already errored or closed Errored, } @@ -19,9 +21,10 @@ impl DownstreamStateMachine { } } - // Can call read() to read more data or wait on closing + // Can call read() to read more data or wait on closing. + // PreBuffered skips polling since we already have the complete body. pub fn can_poll(&self) -> bool { - !matches!(self, Self::Errored) + !matches!(self, Self::Errored | Self::PreBuffered) } pub fn is_reading(&self) -> bool { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 9f04289c..65ec5d82 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -302,9 +302,25 @@ where .await?; } - let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + // Check if body was pre-buffered (before upstream connection) + // If so, use buffered body and skip downstream reading + let pre_buffered_body = session.take_buffered_body(); + let body_was_buffered = session.is_body_buffered() && pre_buffered_body.is_some(); + + // If body was pre-buffered, use PreBuffered state to skip all downstream polling. + // This prevents "Sent data after end of body" errors from Content-Length mismatches. + let mut downstream_state = if body_was_buffered { + DownstreamStateMachine::PreBuffered + } else { + DownstreamStateMachine::new(session.as_mut().is_body_done()) + }; - let buffer = session.as_ref().get_retry_buffer(); + // Use pre-buffered body if available, otherwise check for retry buffer + let buffer = if body_was_buffered { + pre_buffered_body + } else { + session.as_ref().get_retry_buffer() + }; // retry, send buffer if it exists or body empty if buffer.is_some() || session.as_mut().is_body_empty() { @@ -773,16 +789,20 @@ where // affected by the request_body_filter let end_of_body = end_of_body || data.is_none(); - session - .downstream_modules_ctx - .request_body_filter(&mut data, end_of_body) - .await?; + // Skip request_body_filter if body was already pre-buffered and filtered + // (before upstream connection, in buffer_request_body_early) + if !session.is_body_buffered() { + session + .downstream_modules_ctx + .request_body_filter(&mut data, end_of_body) + .await?; - // TODO: request body filter to have info about upgraded status? - // (can also check session.was_upgraded()) - self.inner - .request_body_filter(session, &mut data, end_of_body, ctx) - .await?; + // TODO: request body filter to have info about upgraded status? + // (can also check session.was_upgraded()) + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; + } // the flag to signal to upstream let upstream_end_of_body = end_of_body || data.is_none(); diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 808da5bc..0fccb523 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -297,10 +297,28 @@ where .await?; } - let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + // Check if body was pre-buffered (before upstream connection) + // If so, use buffered body and skip downstream reading + let pre_buffered_body = session.take_buffered_body(); + let body_was_buffered = session.is_body_buffered() && pre_buffered_body.is_some(); + + // If body was pre-buffered, use PreBuffered state to skip all downstream polling. + // This prevents "Sent data after end of body" errors from Content-Length mismatches. + let mut downstream_state = if body_was_buffered { + DownstreamStateMachine::PreBuffered + } else { + DownstreamStateMachine::new(session.as_mut().is_body_done()) + }; + + // Use pre-buffered body if available, otherwise check for retry buffer + let buffer = if body_was_buffered { + pre_buffered_body + } else { + session.as_mut().get_retry_buffer() + }; // retry, send buffer if it exists - if let Some(buffer) = session.as_mut().get_retry_buffer() { + if let Some(buffer) = buffer { self.send_body_to2( session, Some(buffer), @@ -720,14 +738,18 @@ where SV: ProxyHttp + Send + Sync, SV::CTX: Send + Sync, { - session - .downstream_modules_ctx - .request_body_filter(&mut data, end_of_body) - .await?; + // Skip request_body_filter if body was already pre-buffered and filtered + // (before upstream connection, in buffer_request_body_early) + if !session.is_body_buffered() { + session + .downstream_modules_ctx + .request_body_filter(&mut data, end_of_body) + .await?; - self.inner - .request_body_filter(session, &mut data, end_of_body, ctx) - .await?; + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; + } /* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter. * Although there is no harm writing empty byte to h2, unlike h1, we ignore it diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index f243a8bf..0522b61e 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -196,6 +196,30 @@ pub trait ProxyHttp { Ok(true) } + /// Determine whether to buffer the entire request body before connecting to upstream. + /// + /// This is called after [`Self::early_request_filter()`] but before [`Self::request_filter()`] + /// and [`Self::upstream_peer()`]. The body is buffered in `Session::buffered_request_body` + /// and can be accessed via [`Session::get_buffered_body()`]. + /// + /// # Returns + /// - `None`: Don't buffer, stream body to upstream (default) + /// - `Some(max_size)`: Buffer body with size limit, return 413 error if exceeded + /// + /// # Use Cases + /// - Auth signature verification (need full body before auth decision) + /// - Content-based routing decisions + /// - Body transformation before upstream selection + /// + /// # Size Limit Enforcement + /// When returning `Some(max_size)`: + /// - Content-Length header is checked first (fail fast before reading) + /// - Body size is checked during accumulation (streaming protection) + /// - If exceeded, returns HTTP 413 (Payload Too Large) + fn request_body_buffer_limit(&self, _session: &Session, _ctx: &Self::CTX) -> Option { + None // Default: stream body to upstream + } + /// Decide if the response is cacheable fn response_cache_filter( &self, From b50ad3f6a30bb73e1ef711e1780918a07d9d6fe0 Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Mon, 16 Feb 2026 11:41:15 -0800 Subject: [PATCH 2/8] Add body_routing example for early request body buffering --- pingora-proxy/examples/body_routing.rs | 118 +++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 pingora-proxy/examples/body_routing.rs diff --git a/pingora-proxy/examples/body_routing.rs b/pingora-proxy/examples/body_routing.rs new file mode 100644 index 00000000..54f62cbc --- /dev/null +++ b/pingora-proxy/examples/body_routing.rs @@ -0,0 +1,118 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example: early request body buffering for routing and body mutation. +//! +//! Demonstrates two patterns enabled by `request_body_buffer_limit()`: +//! +//! 1. **Peek**: read the buffered body with `get_buffered_body()` to make +//! routing decisions without modifying the body. +//! +//! 2. **Mutate**: read the buffered body, transform it, and replace it with +//! `set_buffered_body()` so the upstream receives the modified version. +//! +//! Uses httpbin.org as the upstream — its `/post` endpoint echoes back the +//! request body, so you can verify mutations in the response. + +use async_trait::async_trait; +use bytes::Bytes; +use log::info; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_proxy::{ProxyHttp, Session}; + +pub struct MyProxy; + +pub struct MyCtx { + route_beta: bool, +} + +#[async_trait] +impl ProxyHttp for MyProxy { + type CTX = MyCtx; + fn new_ctx(&self) -> Self::CTX { + MyCtx { route_beta: false } + } + + /// Opt in to body buffering for POST requests up to 4KB. + fn request_body_buffer_limit(&self, session: &Session, _ctx: &Self::CTX) -> Option { + if session.req_header().method == http::Method::POST { + Some(4096) + } else { + None + } + } + + async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result { + // By the time request_filter runs, the body is already buffered. + if let Some(body) = session.get_buffered_body() { + let text = std::str::from_utf8(body).unwrap_or(""); + + // Peek: inspect body content for routing decisions. + ctx.route_beta = text.contains("beta"); + info!("peek: route_beta={}", ctx.route_beta); + + // Mutate: wrap the original body in an envelope before forwarding. + let wrapped = format!(r#"{{"envelope":true,"original":{text}}}"#); + info!("mutate: {wrapped}"); + session.set_buffered_body(Some(Bytes::from(wrapped.clone()))); + + // Update Content-Length to match the new body size. + session + .req_header_mut() + .insert_header(http::header::CONTENT_LENGTH, wrapped.len().to_string())?; + } + Ok(false) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + // httpbin.org echoes the request body back in its JSON response, + // so we can verify the mutation in the curl output. + let peer = Box::new(HttpPeer::new( + ("httpbin.org", 443), + true, + "httpbin.org".to_string(), + )); + Ok(peer) + } +} + +// RUST_LOG=INFO cargo run --features openssl --example body_routing +// +// Peek + mutate — body is inspected for routing then wrapped in an envelope: +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "beta"}' +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "default"}' +// +// No buffering — GET requests pass through unchanged: +// curl 127.0.0.1:6193/get -H "Host: httpbin.org" +fn main() { + env_logger::init(); + + let opt = Opt::parse_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy); + my_proxy.add_tcp("0.0.0.0:6193"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} From 42e909d1eabfa489fdc537025e66017835522f0f Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Tue, 17 Feb 2026 16:07:52 -0800 Subject: [PATCH 3/8] Mark body read errors as downstream in buffer_request_body_early --- pingora-proxy/src/lib.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 6baa4e46..fdc4f92b 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -1125,9 +1125,19 @@ where // Read body chunks until end of stream loop { - // read_body_or_idle(false) means we expect a body (not done yet) + // read_body_or_idle(false) means we expect a body (not done yet). + // Only known client-side read failures are explicitly marked downstream. + // Leave other errors unchanged so we do not mask internal failures. let body_chunk: Option = - session.downstream_session.read_body_or_idle(false).await?; + match session.downstream_session.read_body_or_idle(false).await { + Ok(chunk) => chunk, + Err(mut e) => { + if matches!(e.etype(), ConnectionClosed | ReadTimedout) { + e.as_down(); + } + return Err(e); + } + }; match body_chunk { Some(data) => { From 56f906516ec3cb7fe2e925f46fa10774ceddb0eb Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Wed, 18 Feb 2026 17:12:12 -0800 Subject: [PATCH 4/8] Simplify early body buffering read errors to use into_down() --- pingora-proxy/src/lib.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index fdc4f92b..9767da91 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -1126,17 +1126,10 @@ where // Read body chunks until end of stream loop { // read_body_or_idle(false) means we expect a body (not done yet). - // Only known client-side read failures are explicitly marked downstream. - // Leave other errors unchanged so we do not mask internal failures. let body_chunk: Option = match session.downstream_session.read_body_or_idle(false).await { Ok(chunk) => chunk, - Err(mut e) => { - if matches!(e.etype(), ConnectionClosed | ReadTimedout) { - e.as_down(); - } - return Err(e); - } + Err(e) => return Err(e.into_down()), }; match body_chunk { @@ -1451,7 +1444,7 @@ use pingora_core::services::listening::Service; /// // In your custom accept loop: /// loop { /// let (stream, addr) = listener.accept().await?; -/// +/// /// // Peek SNI, decide routing... /// if should_terminate_tls { /// let tls_stream = my_acceptor.accept(stream).await?; From 678d3d49132f6c8f32eb828db8d7199868200794 Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Mon, 16 Mar 2026 23:15:52 -0700 Subject: [PATCH 5/8] fix: H2 body detection + dedup read loop in buffer_request_body_early - Fix HTTP/2 body detection: replace has_body heuristic (Content-Length or Transfer-Encoding) with explicit Content-Length == 0 skip. H2 POST without Content-Length was incorrectly treated as bodyless. - Collapse two near-identical match arms (Some/None) into a unified flow using end_of_body flag, removing ~40 lines of duplication. Addresses review comments 1 and 3 from @PiotrSikora. --- pingora-proxy/src/lib.rs | 108 ++++++++++++--------------------------- 1 file changed, 34 insertions(+), 74 deletions(-) diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index b8f739ce..7e0289fb 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -1127,17 +1127,10 @@ where } } - // Check if there's a body to read (Content-Length > 0 or Transfer-Encoding) - let has_body = content_length.is_some_and(|len| len > 0) - || session - .downstream_session - .req_header() - .headers - .get(header::TRANSFER_ENCODING) - .is_some(); - - if !has_body { - // No body to buffer, mark as done + // Content-Length: 0 explicitly means no body. For all other cases + // (no Content-Length, Transfer-Encoding, HTTP/2 without Content-Length), + // attempt to read — read_body_or_idle returns None immediately if empty. + if content_length == Some(0) { session.mark_body_buffered(); return Ok(()); } @@ -1147,79 +1140,46 @@ where // Read body chunks until end of stream loop { - // read_body_or_idle(false) means we expect a body (not done yet). let body_chunk: Option = match session.downstream_session.read_body_or_idle(false).await { Ok(chunk) => chunk, Err(e) => return Err(e.into_down()), }; - match body_chunk { - Some(data) => { - let is_body_done = session.downstream_session.is_body_done(); - - // Call request_body_filter for each chunk - let mut filter_data: Option = Some(data); - session - .downstream_modules_ctx - .request_body_filter(&mut filter_data, is_body_done) - .await?; - self.inner - .request_body_filter(session, &mut filter_data, is_body_done, ctx) - .await?; - - // Accumulate the (possibly filtered) data - if let Some(filtered) = filter_data { - total_size += filtered.len(); - - // Check size limit during accumulation (streaming protection) - if total_size > max_size { - return Error::e_explain( - HTTPStatus(413), - format!( - "Request body exceeded limit: {} > {} bytes", - total_size, max_size - ), - ); - } - - body_parts.push(filtered); - } + // Determine end-of-stream: None means no more data, or downstream + // reports done after delivering the final chunk. + let end_of_body = body_chunk.is_none() || session.downstream_session.is_body_done(); - if is_body_done { - break; - } + // Run body filters on the chunk (or None for end-of-stream signal) + let mut filter_data = body_chunk; + session + .downstream_modules_ctx + .request_body_filter(&mut filter_data, end_of_body) + .await?; + self.inner + .request_body_filter(session, &mut filter_data, end_of_body, ctx) + .await?; + + // Accumulate the (possibly filtered) data + if let Some(filtered) = filter_data { + total_size += filtered.len(); + + // Check size limit during accumulation (streaming protection) + if total_size > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body exceeded limit: {} > {} bytes", + total_size, max_size + ), + ); } - None => { - // End of body, call filter with end_of_stream=true - let mut filter_data: Option = None; - session - .downstream_modules_ctx - .request_body_filter(&mut filter_data, true) - .await?; - self.inner - .request_body_filter(session, &mut filter_data, true, ctx) - .await?; - // Collect any final data from the filter - if let Some(filtered) = filter_data { - total_size += filtered.len(); - - // Final size check - if total_size > max_size { - return Error::e_explain( - HTTPStatus(413), - format!( - "Request body exceeded limit: {} > {} bytes", - total_size, max_size - ), - ); - } + body_parts.push(filtered); + } - body_parts.push(filtered); - } - break; - } + if end_of_body { + break; } } From 0bfe603b5e8e2af36f3786acf2d9a86b7d09319e Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Mon, 16 Mar 2026 23:54:18 -0700 Subject: [PATCH 6/8] Add early_request_body_filter to fix callback ordering violation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New trait method runs per-chunk during buffer_request_body_early(), before request_header_filter — avoids calling request_body_filter out of phase order. - Remove is_body_buffered() skip guards from proxy_h1/h2 — normal request_body_filter runs unguarded during upstream forwarding. - Update body_routing example to demonstrate the streaming callback. - Add early_request_body_filter to phase docs and mermaid charts. Addresses review comment 2 from @PiotrSikora. --- docs/user_guide/phase.md | 12 ++++-- docs/user_guide/phase_chart.md | 3 +- pingora-proxy/examples/body_routing.rs | 55 +++++++++++++++++++++++--- pingora-proxy/src/lib.rs | 11 ++---- pingora-proxy/src/proxy_h1.rs | 22 +++++------ pingora-proxy/src/proxy_h2.rs | 18 ++++----- pingora-proxy/src/proxy_trait.rs | 23 +++++++++++ 7 files changed, 103 insertions(+), 41 deletions(-) diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index 3c80f913..3a610122 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -17,7 +17,8 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request. ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; @@ -59,11 +60,16 @@ This is the first phase of every request. This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. +### `early_request_body_filter()` +This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. + +Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation. + ### `request_filter()` -This phase is usually for validating request inputs, rate limiting, and initializing context. +This phase is usually for validating request inputs, rate limiting, and initializing context. When early body buffering is enabled, the full body is already available via `session.get_buffered_body()`. ### `request_body_filter()` -This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. +This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. This runs during the upstream forwarding phase, after `upstream_peer()` and connection establishment. ### `proxy_upstream_filter()` This phase determines if we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented. diff --git a/docs/user_guide/phase_chart.md b/docs/user_guide/phase_chart.md index 94988724..73adae14 100644 --- a/docs/user_guide/phase_chart.md +++ b/docs/user_guide/phase_chart.md @@ -2,7 +2,8 @@ Pingora proxy phases without caching ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; diff --git a/pingora-proxy/examples/body_routing.rs b/pingora-proxy/examples/body_routing.rs index 54f62cbc..f4a5295a 100644 --- a/pingora-proxy/examples/body_routing.rs +++ b/pingora-proxy/examples/body_routing.rs @@ -14,13 +14,18 @@ //! Example: early request body buffering for routing and body mutation. //! -//! Demonstrates two patterns enabled by `request_body_buffer_limit()`: +//! Demonstrates three patterns enabled by `request_body_buffer_limit()` and +//! `early_request_body_filter()`: //! -//! 1. **Peek**: read the buffered body with `get_buffered_body()` to make -//! routing decisions without modifying the body. +//! 1. **Stream**: process each body chunk as it arrives in +//! `early_request_body_filter()` — before any header-phase filters run. +//! The example logs each chunk's byte count to show the streaming nature. //! -//! 2. **Mutate**: read the buffered body, transform it, and replace it with -//! `set_buffered_body()` so the upstream receives the modified version. +//! 2. **Peek**: read the assembled buffered body with `get_buffered_body()` +//! in `request_filter()` to make routing decisions. +//! +//! 3. **Mutate**: replace the buffered body with `set_buffered_body()` so +//! the upstream receives the modified version. //! //! Uses httpbin.org as the upstream — its `/post` endpoint echoes back the //! request body, so you can verify mutations in the response. @@ -39,13 +44,19 @@ pub struct MyProxy; pub struct MyCtx { route_beta: bool, + chunks_received: usize, + bytes_received: usize, } #[async_trait] impl ProxyHttp for MyProxy { type CTX = MyCtx; fn new_ctx(&self) -> Self::CTX { - MyCtx { route_beta: false } + MyCtx { + route_beta: false, + chunks_received: 0, + bytes_received: 0, + } } /// Opt in to body buffering for POST requests up to 4KB. @@ -57,6 +68,35 @@ impl ProxyHttp for MyProxy { } } + /// Stream: process each body chunk as it arrives during early buffering. + /// + /// This fires per-chunk, BEFORE request_filter sees the assembled body. + async fn early_request_body_filter( + &self, + _session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result<()> { + if let Some(data) = body { + ctx.chunks_received += 1; + ctx.bytes_received += data.len(); + info!( + "early_request_body_filter: chunk {} ({} bytes, {} total)", + ctx.chunks_received, + data.len(), + ctx.bytes_received + ); + } + if end_of_stream { + info!( + "early_request_body_filter: done — {} chunks, {} bytes total", + ctx.chunks_received, ctx.bytes_received + ); + } + Ok(()) + } + async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result { // By the time request_filter runs, the body is already buffered. if let Some(body) = session.get_buffered_body() { @@ -101,6 +141,9 @@ impl ProxyHttp for MyProxy { // curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "beta"}' // curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "default"}' // +// Multi-chunk — use chunked transfer encoding to see early_request_body_filter fire per-chunk: +// printf 'POST /post HTTP/1.1\r\nHost: httpbin.org\r\nTransfer-Encoding: chunked\r\n\r\na\r\n{"part":1}\r\na\r\n{"part":2}\r\n0\r\n\r\n' | nc 127.0.0.1 6193 +// // No buffering — GET requests pass through unchanged: // curl 127.0.0.1:6193/get -H "Host: httpbin.org" fn main() { diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 7e0289fb..a3611f53 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -1075,7 +1075,7 @@ where /// Buffer the entire request body before connecting to upstream. /// - /// This enables request_body_filter to run BEFORE upstream_peer selection, + /// This enables early_request_body_filter to run BEFORE upstream_peer selection, /// allowing auth signature verification and content-based routing. /// /// Buffering is controlled by the trait method `request_body_buffer_limit()`: @@ -1150,14 +1150,11 @@ where // reports done after delivering the final chunk. let end_of_body = body_chunk.is_none() || session.downstream_session.is_body_done(); - // Run body filters on the chunk (or None for end-of-stream signal) + // Run early body filter (not module filters — modules haven't + // had their header filter phase yet). let mut filter_data = body_chunk; - session - .downstream_modules_ctx - .request_body_filter(&mut filter_data, end_of_body) - .await?; self.inner - .request_body_filter(session, &mut filter_data, end_of_body, ctx) + .early_request_body_filter(session, &mut filter_data, end_of_body, ctx) .await?; // Accumulate the (possibly filtered) data diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 65ec5d82..b2d2907b 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -789,20 +789,16 @@ where // affected by the request_body_filter let end_of_body = end_of_body || data.is_none(); - // Skip request_body_filter if body was already pre-buffered and filtered - // (before upstream connection, in buffer_request_body_early) - if !session.is_body_buffered() { - session - .downstream_modules_ctx - .request_body_filter(&mut data, end_of_body) - .await?; + session + .downstream_modules_ctx + .request_body_filter(&mut data, end_of_body) + .await?; - // TODO: request body filter to have info about upgraded status? - // (can also check session.was_upgraded()) - self.inner - .request_body_filter(session, &mut data, end_of_body, ctx) - .await?; - } + // TODO: request body filter to have info about upgraded status? + // (can also check session.was_upgraded()) + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; // the flag to signal to upstream let upstream_end_of_body = end_of_body || data.is_none(); diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 0fccb523..400499b8 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -738,18 +738,14 @@ where SV: ProxyHttp + Send + Sync, SV::CTX: Send + Sync, { - // Skip request_body_filter if body was already pre-buffered and filtered - // (before upstream connection, in buffer_request_body_early) - if !session.is_body_buffered() { - session - .downstream_modules_ctx - .request_body_filter(&mut data, end_of_body) - .await?; + session + .downstream_modules_ctx + .request_body_filter(&mut data, end_of_body) + .await?; - self.inner - .request_body_filter(session, &mut data, end_of_body, ctx) - .await?; - } + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; /* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter. * Although there is no harm writing empty byte to h2, unlike h1, we ignore it diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 93979cc3..f6609c73 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -230,6 +230,29 @@ pub trait ProxyHttp { None // Default: stream body to upstream } + /// Handle each chunk of request body during early buffering. + /// + /// This is called during [`buffer_request_body_early()`] for each body chunk, **before** + /// [`Self::request_filter()`] and [`Self::upstream_peer()`]. Use this for processing + /// that must happen before header filters run (e.g., streaming decompression). + /// + /// Unlike [`Self::request_body_filter()`], this callback explicitly runs before any + /// header-phase filters, so it should not depend on state set by [`Self::request_filter()`]. + /// + /// The normal [`Self::request_body_filter()`] still runs during upstream body forwarding. + async fn early_request_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + Ok(()) + } + /// Decide if the response is cacheable fn response_cache_filter( &self, From 2a75cf7a9f9cd8f29197ff6cfec7dac772ccf370 Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Tue, 17 Mar 2026 10:41:46 -0700 Subject: [PATCH 7/8] Rename request_body_buffer_limit, tuple return in h1/h2, comment style MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename request_body_buffer_limit → early_request_body_buffer_limit for consistency with early_request_body_filter (comment 6) - Collapse downstream_state + buffer into tuple return in proxy_h1 and proxy_h2 (comment 5) - Align inline comments with existing Cloudflare style - Update body_routing example and phase docs Addresses review comments 5 and 6 from @PiotrSikora. --- docs/user_guide/phase.md | 2 +- pingora-proxy/examples/body_routing.rs | 8 +++-- pingora-proxy/src/lib.rs | 42 +++++++++++--------------- pingora-proxy/src/proxy_h1.rs | 28 +++++++---------- pingora-proxy/src/proxy_h2.rs | 28 +++++++---------- pingora-proxy/src/proxy_trait.rs | 6 +++- 6 files changed, 52 insertions(+), 62 deletions(-) diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index 3a610122..2e357211 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -61,7 +61,7 @@ This is the first phase of every request. This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. ### `early_request_body_filter()` -This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. +This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation. diff --git a/pingora-proxy/examples/body_routing.rs b/pingora-proxy/examples/body_routing.rs index f4a5295a..85baf1b9 100644 --- a/pingora-proxy/examples/body_routing.rs +++ b/pingora-proxy/examples/body_routing.rs @@ -14,7 +14,7 @@ //! Example: early request body buffering for routing and body mutation. //! -//! Demonstrates three patterns enabled by `request_body_buffer_limit()` and +//! Demonstrates three patterns enabled by `early_request_body_buffer_limit()` and //! `early_request_body_filter()`: //! //! 1. **Stream**: process each body chunk as it arrives in @@ -60,7 +60,11 @@ impl ProxyHttp for MyProxy { } /// Opt in to body buffering for POST requests up to 4KB. - fn request_body_buffer_limit(&self, session: &Session, _ctx: &Self::CTX) -> Option { + fn early_request_body_buffer_limit( + &self, + session: &Session, + _ctx: &Self::CTX, + ) -> Option { if session.req_header().method == http::Method::POST { Some(4096) } else { diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index a3611f53..4199931b 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -687,7 +687,7 @@ impl Session { /// Returns a reference to the buffered request body, if any. /// /// The body is buffered by `buffer_request_body_early()` when the trait method - /// `request_body_buffer_limit()` returns `Some(max_size)`. + /// `early_request_body_buffer_limit()` returns `Some(max_size)`. pub fn get_buffered_body(&self) -> Option<&Bytes> { self.buffered_request_body.as_ref() } @@ -833,11 +833,8 @@ where .await; } - // Early body buffering: read full request body BEFORE request_filter. - // This allows request_filter to access the body for auth signature verification - // and other content-based decisions. The body is buffered based on the size limit - // returned by request_body_buffer_limit() (set in early_request_filter via route matching). - // See: https://github.com/cloudflare/pingora/issues/780 + // early body buffering: read full request body before request_filter + // see https://github.com/cloudflare/pingora/issues/780 if !session.is_body_buffered() { if let Err(e) = self.buffer_request_body_early(&mut session, &mut ctx).await { return self @@ -1078,7 +1075,7 @@ where /// This enables early_request_body_filter to run BEFORE upstream_peer selection, /// allowing auth signature verification and content-based routing. /// - /// Buffering is controlled by the trait method `request_body_buffer_limit()`: + /// Buffering is controlled by the trait method `early_request_body_buffer_limit()`: /// - Returns `None`: Skip buffering, stream body to upstream (default) /// - Returns `Some(max_size)`: Buffer body with size limit enforcement /// @@ -1095,17 +1092,16 @@ where SV: ProxyHttp + Send + Sync, ::CTX: Send + Sync, { - // Check trait method for opt-in and size limit - let Some(max_size) = self.inner.request_body_buffer_limit(session, ctx) else { - return Ok(()); // No buffering requested + // check trait method for opt-in and size limit + let Some(max_size) = self.inner.early_request_body_buffer_limit(session, ctx) else { + return Ok(()); }; - // Skip if already buffered (e.g., by request_filter) + // skip if already buffered if session.is_body_buffered() { return Ok(()); } - // Get Content-Length if present (for early size check) let content_length = session .downstream_session .req_header() @@ -1114,7 +1110,7 @@ where .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); - // Fail fast: check Content-Length before reading + // fail fast: reject before reading if Content-Length exceeds limit if let Some(cl) = content_length { if cl > max_size { return Error::e_explain( @@ -1127,9 +1123,9 @@ where } } - // Content-Length: 0 explicitly means no body. For all other cases - // (no Content-Length, Transfer-Encoding, HTTP/2 without Content-Length), - // attempt to read — read_body_or_idle returns None immediately if empty. + // Content-Length: 0 means no body; for all other cases (no Content-Length, + // Transfer-Encoding, HTTP/2) attempt to read. read_body_or_idle returns + // None immediately if there's nothing. if content_length == Some(0) { session.mark_body_buffered(); return Ok(()); @@ -1138,7 +1134,7 @@ where let mut body_parts: Vec = Vec::new(); let mut total_size: usize = 0; - // Read body chunks until end of stream + // read body chunks until end of stream loop { let body_chunk: Option = match session.downstream_session.read_body_or_idle(false).await { @@ -1146,22 +1142,20 @@ where Err(e) => return Err(e.into_down()), }; - // Determine end-of-stream: None means no more data, or downstream - // reports done after delivering the final chunk. + // end of stream: None means no more data, or downstream reports done let end_of_body = body_chunk.is_none() || session.downstream_session.is_body_done(); - // Run early body filter (not module filters — modules haven't - // had their header filter phase yet). + // run early body filter (not module filters, they haven't run header filter yet) let mut filter_data = body_chunk; self.inner .early_request_body_filter(session, &mut filter_data, end_of_body, ctx) .await?; - // Accumulate the (possibly filtered) data + // accumulate the (possibly filtered) data if let Some(filtered) = filter_data { total_size += filtered.len(); - // Check size limit during accumulation (streaming protection) + // check size limit during accumulation if total_size > max_size { return Error::e_explain( HTTPStatus(413), @@ -1180,7 +1174,7 @@ where } } - // Combine all chunks into a single Bytes buffer + // combine all chunks into a single Bytes buffer if total_size > 0 { let mut combined = bytes::BytesMut::with_capacity(total_size); for part in body_parts { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index b2d2907b..05bef547 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -302,24 +302,18 @@ where .await?; } - // Check if body was pre-buffered (before upstream connection) - // If so, use buffered body and skip downstream reading - let pre_buffered_body = session.take_buffered_body(); - let body_was_buffered = session.is_body_buffered() && pre_buffered_body.is_some(); - - // If body was pre-buffered, use PreBuffered state to skip all downstream polling. - // This prevents "Sent data after end of body" errors from Content-Length mismatches. - let mut downstream_state = if body_was_buffered { - DownstreamStateMachine::PreBuffered - } else { - DownstreamStateMachine::new(session.as_mut().is_body_done()) - }; - - // Use pre-buffered body if available, otherwise check for retry buffer - let buffer = if body_was_buffered { - pre_buffered_body + // determine initial downstream state and body buffer + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) } else { - session.as_ref().get_retry_buffer() + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_ref().get_retry_buffer(), + ) }; // retry, send buffer if it exists or body empty diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 400499b8..b40ac53f 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -297,24 +297,18 @@ where .await?; } - // Check if body was pre-buffered (before upstream connection) - // If so, use buffered body and skip downstream reading - let pre_buffered_body = session.take_buffered_body(); - let body_was_buffered = session.is_body_buffered() && pre_buffered_body.is_some(); - - // If body was pre-buffered, use PreBuffered state to skip all downstream polling. - // This prevents "Sent data after end of body" errors from Content-Length mismatches. - let mut downstream_state = if body_was_buffered { - DownstreamStateMachine::PreBuffered - } else { - DownstreamStateMachine::new(session.as_mut().is_body_done()) - }; - - // Use pre-buffered body if available, otherwise check for retry buffer - let buffer = if body_was_buffered { - pre_buffered_body + // determine initial downstream state and body buffer + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) } else { - session.as_mut().get_retry_buffer() + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_mut().get_retry_buffer(), + ) }; // retry, send buffer if it exists diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index f6609c73..61625cd4 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -226,7 +226,11 @@ pub trait ProxyHttp { /// - Content-Length header is checked first (fail fast before reading) /// - Body size is checked during accumulation (streaming protection) /// - If exceeded, returns HTTP 413 (Payload Too Large) - fn request_body_buffer_limit(&self, _session: &Session, _ctx: &Self::CTX) -> Option { + fn early_request_body_buffer_limit( + &self, + _session: &Session, + _ctx: &Self::CTX, + ) -> Option { None // Default: stream body to upstream } From b2524b9272b1d83206293b0f8f85d776656c876c Mon Sep 17 00:00:00 2001 From: Cody Carlsen Date: Mon, 4 May 2026 10:11:40 -0700 Subject: [PATCH 8/8] Gate early_body_buffer behind cargo feature --- docs/user_guide/phase.md | 2 +- pingora-proxy/Cargo.toml | 5 +++++ pingora-proxy/src/lib.rs | 17 +++++++++++++++-- pingora-proxy/src/proxy_common.rs | 13 ++++++++++--- pingora-proxy/src/proxy_h1.rs | 8 +++++++- pingora-proxy/src/proxy_h2.rs | 8 +++++++- pingora-proxy/src/proxy_trait.rs | 6 ++++++ pingora/Cargo.toml | 7 ++++++- 8 files changed, 57 insertions(+), 9 deletions(-) diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index 322ce788..982305c1 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -62,7 +62,7 @@ This is the first phase of every request. This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. ### `early_request_body_filter()` -This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. +This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. Requires the `early_body_buffer` cargo feature. Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation. diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index e1cc1cbb..0aa349b7 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -76,11 +76,16 @@ sentry = ["pingora-core/sentry"] upstream_modules = [] connection_filter = ["pingora-core/connection_filter"] trace = ["pingora-cache/trace"] +early_body_buffer = [] [[example]] name = "connection_filter" required-features = ["connection_filter"] +[[example]] +name = "body_routing" +required-features = ["early_body_buffer"] + # or locally cargo doc --config "build.rustdocflags='--cfg doc_async_trait'" [package.metadata.docs.rs] rustdoc-args = ["--cfg", "doc_async_trait"] diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 5601c490..ea6ebc6c 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -497,9 +497,11 @@ pub struct Session { /// Request body buffered early (before upstream connection) for auth/routing decisions. /// When set, body forwarding will use this instead of re-reading from downstream. /// Use accessor methods: `get_buffered_body()`, `take_buffered_body()`, `set_buffered_body()`. + #[cfg(feature = "early_body_buffer")] buffered_request_body: Option, /// Whether body has been fully consumed for buffering. /// Use accessor: `is_body_buffered()`. + #[cfg(feature = "early_body_buffer")] body_buffered: bool, } @@ -525,7 +527,9 @@ impl Session { upstream_body_bytes_received: 0, upstream_write_pending_time: Duration::ZERO, shutdown_flag, + #[cfg(feature = "early_body_buffer")] buffered_request_body: None, + #[cfg(feature = "early_body_buffer")] body_buffered: false, } } @@ -814,6 +818,7 @@ impl Session { /// /// The body is buffered by `buffer_request_body_early()` when the trait method /// `early_request_body_buffer_limit()` returns `Some(max_size)`. + #[cfg(feature = "early_body_buffer")] pub fn get_buffered_body(&self) -> Option<&Bytes> { self.buffered_request_body.as_ref() } @@ -821,6 +826,7 @@ impl Session { /// Takes ownership of the buffered request body, leaving `None` in its place. /// /// Use this when forwarding the body to upstream - takes the body once for sending. + #[cfg(feature = "early_body_buffer")] pub fn take_buffered_body(&mut self) -> Option { self.buffered_request_body.take() } @@ -829,6 +835,7 @@ impl Session { /// /// This is called by `buffer_request_body_early()` after reading the full body. /// Also useful for app code that wants to replace the body (e.g., decompression). + #[cfg(feature = "early_body_buffer")] pub fn set_buffered_body(&mut self, body: Option) { self.body_buffered = body.is_some() || self.body_buffered; self.buffered_request_body = body; @@ -838,6 +845,7 @@ impl Session { /// /// When `true`, the body has been fully read and is available via `get_buffered_body()`, /// or the request has no body. Body forwarding will skip re-reading from downstream. + #[cfg(feature = "early_body_buffer")] pub fn is_body_buffered(&self) -> bool { self.body_buffered } @@ -846,12 +854,13 @@ impl Session { /// /// Used when the request has no body (no Content-Length or Transfer-Encoding), /// to prevent `buffer_request_body_early()` from attempting to read. + #[cfg(feature = "early_body_buffer")] pub fn mark_body_buffered(&mut self) { self.body_buffered = true; } /// Creates a Session from an H1 HttpSession (for testing only). - #[cfg(test)] + #[cfg(all(test, feature = "early_body_buffer"))] pub fn new_h1_with_http_session( http_session: pingora_core::protocols::http::v1::server::HttpSession, ) -> Self { @@ -869,6 +878,8 @@ impl Session { subrequest_ctx: None, subrequest_spawner: None, downstream_modules_ctx: HttpModuleCtx::empty(), + #[cfg(feature = "upstream_modules")] + upstream_modules_ctx: HttpModuleCtx::empty(), upstream_body_bytes_received: 0, upstream_write_pending_time: Duration::ZERO, shutdown_flag, @@ -961,6 +972,7 @@ where // early body buffering: read full request body before request_filter // see https://github.com/cloudflare/pingora/issues/780 + #[cfg(feature = "early_body_buffer")] if !session.is_body_buffered() { if let Err(e) = self.buffer_request_body_early(&mut session, &mut ctx).await { return self @@ -1215,6 +1227,7 @@ where /// - Content-Length checked first (fail fast before reading) /// - Accumulated size checked during reading (streaming protection) /// - Returns HTTP 413 (Payload Too Large) if exceeded + #[cfg(feature = "early_body_buffer")] async fn buffer_request_body_early( &self, session: &mut Session, @@ -1746,7 +1759,7 @@ where } } -#[cfg(test)] +#[cfg(all(test, feature = "early_body_buffer"))] mod tests { use super::*; diff --git a/pingora-proxy/src/proxy_common.rs b/pingora-proxy/src/proxy_common.rs index 61990d94..a5f808ed 100644 --- a/pingora-proxy/src/proxy_common.rs +++ b/pingora-proxy/src/proxy_common.rs @@ -20,6 +20,7 @@ pub(crate) enum DownstreamStateMachine { /// no more data to read ReadingFinished, /// body was pre-buffered before upstream connection, skip all downstream polling + #[cfg(feature = "early_body_buffer")] PreBuffered, /// downstream is already errored or closed Errored, @@ -35,10 +36,16 @@ impl DownstreamStateMachine { } } - // Can call read() to read more data or wait on closing. - // PreBuffered skips polling since we already have the complete body. + // Can call read() to read more data or wait on closing pub fn can_poll(&self) -> bool { - !matches!(self, Self::Errored | Self::PreBuffered) + #[cfg(feature = "early_body_buffer")] + { + !matches!(self, Self::Errored | Self::PreBuffered) + } + #[cfg(not(feature = "early_body_buffer"))] + { + !matches!(self, Self::Errored) + } } pub fn is_reading(&self) -> bool { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 9edc2bea..2e0de50c 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -379,8 +379,14 @@ where .await?; } - // determine initial downstream state and body buffer + #[cfg(not(feature = "early_body_buffer"))] + let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + + #[cfg(not(feature = "early_body_buffer"))] + let buffer = session.as_ref().get_retry_buffer(); + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + #[cfg(feature = "early_body_buffer")] let (mut downstream_state, buffer) = if let Some(body) = session .take_buffered_body() .filter(|_| session.is_body_buffered()) diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 71682243..98ca5e12 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -383,8 +383,14 @@ where .await?; } - // determine initial downstream state and body buffer + #[cfg(not(feature = "early_body_buffer"))] + let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + + #[cfg(not(feature = "early_body_buffer"))] + let buffer = session.as_mut().get_retry_buffer(); + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + #[cfg(feature = "early_body_buffer")] let (mut downstream_state, buffer) = if let Some(body) = session .take_buffered_body() .filter(|_| session.is_body_buffered()) diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 258490aa..11bb0e25 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -244,6 +244,9 @@ pub trait ProxyHttp { /// - Content-Length header is checked first (fail fast before reading) /// - Body size is checked during accumulation (streaming protection) /// - If exceeded, returns HTTP 413 (Payload Too Large) + /// + /// Requires the `early_body_buffer` feature. + #[cfg(feature = "early_body_buffer")] fn early_request_body_buffer_limit( &self, _session: &Session, @@ -262,6 +265,9 @@ pub trait ProxyHttp { /// header-phase filters, so it should not depend on state set by [`Self::request_filter()`]. /// /// The normal [`Self::request_body_filter()`] still runs during upstream body forwarding. + /// + /// Requires the `early_body_buffer` feature. + #[cfg(feature = "early_body_buffer")] async fn early_request_body_filter( &self, _session: &mut Session, diff --git a/pingora/Cargo.toml b/pingora/Cargo.toml index 8e30130a..7bc37a55 100644 --- a/pingora/Cargo.toml +++ b/pingora/Cargo.toml @@ -143,6 +143,10 @@ connection_filter = [ "pingora-proxy?/connection_filter", ] +## Enable buffering the full request body before `upstream_peer`, via the +## `early_request_body_buffer_limit` and `early_request_body_filter` trait hooks. +early_body_buffer = ["pingora-proxy?/early_body_buffer"] + # These features are intentionally not documented openssl_derived = ["any_tls"] @@ -155,6 +159,7 @@ document-features = [ "cache", "time", "sentry", - "connection_filter" + "connection_filter", + "early_body_buffer" ] trace = ["pingora-cache?/trace", "pingora-proxy?/trace"]