diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index 5f3a891a..982305c1 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -17,7 +17,8 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request. ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; @@ -60,11 +61,16 @@ This is the first phase of every request. This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. +### `early_request_body_filter()` +This phase runs during early body buffering, **before** `request_filter()` and `upstream_peer()`. It is only called when `early_request_body_buffer_limit()` returns `Some(max_size)`, which opts in to reading and buffering the full request body before upstream peer selection. Requires the `early_body_buffer` cargo feature. + +Use this for processing that must happen before header filters run, such as streaming decompression. The buffered body is then available via `session.get_buffered_body()` in `request_filter()` for routing decisions, auth signature verification, or body mutation. + ### `request_filter()` -This phase is usually for validating request inputs, rate limiting, and initializing context. +This phase is usually for validating request inputs, rate limiting, and initializing context. When early body buffering is enabled, the full body is already available via `session.get_buffered_body()`. ### `request_body_filter()` -This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. +This phase is triggered after a request body is ready to send to upstream. It will be called every time a piece of request body is received. This runs during the upstream forwarding phase, after `upstream_peer()` and connection establishment. ### `proxy_upstream_filter()` This phase determines if we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented. diff --git a/docs/user_guide/phase_chart.md b/docs/user_guide/phase_chart.md index b915f950..7fa208d2 100644 --- a/docs/user_guide/phase_chart.md +++ b/docs/user_guide/phase_chart.md @@ -2,7 +2,8 @@ Pingora proxy phases without caching ```mermaid graph TD; start("new request")-->early_request_filter; - early_request_filter-->request_filter; + early_request_filter-->early_request_body_filter; + early_request_body_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index e1cc1cbb..0aa349b7 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -76,11 +76,16 @@ sentry = ["pingora-core/sentry"] upstream_modules = [] connection_filter = ["pingora-core/connection_filter"] trace = ["pingora-cache/trace"] +early_body_buffer = [] [[example]] name = "connection_filter" required-features = ["connection_filter"] +[[example]] +name = "body_routing" +required-features = ["early_body_buffer"] + # or locally cargo doc --config "build.rustdocflags='--cfg doc_async_trait'" [package.metadata.docs.rs] rustdoc-args = ["--cfg", "doc_async_trait"] diff --git a/pingora-proxy/examples/body_routing.rs b/pingora-proxy/examples/body_routing.rs new file mode 100644 index 00000000..85baf1b9 --- /dev/null +++ b/pingora-proxy/examples/body_routing.rs @@ -0,0 +1,165 @@ +// Copyright 2026 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example: early request body buffering for routing and body mutation. +//! +//! Demonstrates three patterns enabled by `early_request_body_buffer_limit()` and +//! `early_request_body_filter()`: +//! +//! 1. **Stream**: process each body chunk as it arrives in +//! `early_request_body_filter()` — before any header-phase filters run. +//! The example logs each chunk's byte count to show the streaming nature. +//! +//! 2. **Peek**: read the assembled buffered body with `get_buffered_body()` +//! in `request_filter()` to make routing decisions. +//! +//! 3. **Mutate**: replace the buffered body with `set_buffered_body()` so +//! the upstream receives the modified version. +//! +//! Uses httpbin.org as the upstream — its `/post` endpoint echoes back the +//! request body, so you can verify mutations in the response. + +use async_trait::async_trait; +use bytes::Bytes; +use log::info; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_proxy::{ProxyHttp, Session}; + +pub struct MyProxy; + +pub struct MyCtx { + route_beta: bool, + chunks_received: usize, + bytes_received: usize, +} + +#[async_trait] +impl ProxyHttp for MyProxy { + type CTX = MyCtx; + fn new_ctx(&self) -> Self::CTX { + MyCtx { + route_beta: false, + chunks_received: 0, + bytes_received: 0, + } + } + + /// Opt in to body buffering for POST requests up to 4KB. + fn early_request_body_buffer_limit( + &self, + session: &Session, + _ctx: &Self::CTX, + ) -> Option { + if session.req_header().method == http::Method::POST { + Some(4096) + } else { + None + } + } + + /// Stream: process each body chunk as it arrives during early buffering. + /// + /// This fires per-chunk, BEFORE request_filter sees the assembled body. + async fn early_request_body_filter( + &self, + _session: &mut Session, + body: &mut Option, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result<()> { + if let Some(data) = body { + ctx.chunks_received += 1; + ctx.bytes_received += data.len(); + info!( + "early_request_body_filter: chunk {} ({} bytes, {} total)", + ctx.chunks_received, + data.len(), + ctx.bytes_received + ); + } + if end_of_stream { + info!( + "early_request_body_filter: done — {} chunks, {} bytes total", + ctx.chunks_received, ctx.bytes_received + ); + } + Ok(()) + } + + async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result { + // By the time request_filter runs, the body is already buffered. + if let Some(body) = session.get_buffered_body() { + let text = std::str::from_utf8(body).unwrap_or(""); + + // Peek: inspect body content for routing decisions. + ctx.route_beta = text.contains("beta"); + info!("peek: route_beta={}", ctx.route_beta); + + // Mutate: wrap the original body in an envelope before forwarding. + let wrapped = format!(r#"{{"envelope":true,"original":{text}}}"#); + info!("mutate: {wrapped}"); + session.set_buffered_body(Some(Bytes::from(wrapped.clone()))); + + // Update Content-Length to match the new body size. + session + .req_header_mut() + .insert_header(http::header::CONTENT_LENGTH, wrapped.len().to_string())?; + } + Ok(false) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result> { + // httpbin.org echoes the request body back in its JSON response, + // so we can verify the mutation in the curl output. + let peer = Box::new(HttpPeer::new( + ("httpbin.org", 443), + true, + "httpbin.org".to_string(), + )); + Ok(peer) + } +} + +// RUST_LOG=INFO cargo run --features openssl --example body_routing +// +// Peek + mutate — body is inspected for routing then wrapped in an envelope: +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "beta"}' +// curl -X POST 127.0.0.1:6193/post -H "Host: httpbin.org" -H "Content-Type: application/json" -d '{"route": "default"}' +// +// Multi-chunk — use chunked transfer encoding to see early_request_body_filter fire per-chunk: +// printf 'POST /post HTTP/1.1\r\nHost: httpbin.org\r\nTransfer-Encoding: chunked\r\n\r\na\r\n{"part":1}\r\na\r\n{"part":2}\r\n0\r\n\r\n' | nc 127.0.0.1 6193 +// +// No buffering — GET requests pass through unchanged: +// curl 127.0.0.1:6193/get -H "Host: httpbin.org" +fn main() { + env_logger::init(); + + let opt = Opt::parse_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy); + my_proxy.add_tcp("0.0.0.0:6193"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 4ce9e5e5..ea6ebc6c 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -494,6 +494,15 @@ pub struct Session { upstream_write_pending_time: Duration, /// Flag that is set when the shutdown process has begun. shutdown_flag: Arc, + /// Request body buffered early (before upstream connection) for auth/routing decisions. + /// When set, body forwarding will use this instead of re-reading from downstream. + /// Use accessor methods: `get_buffered_body()`, `take_buffered_body()`, `set_buffered_body()`. + #[cfg(feature = "early_body_buffer")] + buffered_request_body: Option, + /// Whether body has been fully consumed for buffering. + /// Use accessor: `is_body_buffered()`. + #[cfg(feature = "early_body_buffer")] + body_buffered: bool, } impl Session { @@ -518,6 +527,10 @@ impl Session { upstream_body_bytes_received: 0, upstream_write_pending_time: Duration::ZERO, shutdown_flag, + #[cfg(feature = "early_body_buffer")] + buffered_request_body: None, + #[cfg(feature = "early_body_buffer")] + body_buffered: false, } } @@ -801,6 +814,80 @@ impl Session { self.shutdown_flag.load(Ordering::Acquire) } + /// Returns a reference to the buffered request body, if any. + /// + /// The body is buffered by `buffer_request_body_early()` when the trait method + /// `early_request_body_buffer_limit()` returns `Some(max_size)`. + #[cfg(feature = "early_body_buffer")] + pub fn get_buffered_body(&self) -> Option<&Bytes> { + self.buffered_request_body.as_ref() + } + + /// Takes ownership of the buffered request body, leaving `None` in its place. + /// + /// Use this when forwarding the body to upstream - takes the body once for sending. + #[cfg(feature = "early_body_buffer")] + pub fn take_buffered_body(&mut self) -> Option { + self.buffered_request_body.take() + } + + /// Sets the buffered request body. + /// + /// This is called by `buffer_request_body_early()` after reading the full body. + /// Also useful for app code that wants to replace the body (e.g., decompression). + #[cfg(feature = "early_body_buffer")] + pub fn set_buffered_body(&mut self, body: Option) { + self.body_buffered = body.is_some() || self.body_buffered; + self.buffered_request_body = body; + } + + /// Returns whether a body has been buffered (or confirmed empty). + /// + /// When `true`, the body has been fully read and is available via `get_buffered_body()`, + /// or the request has no body. Body forwarding will skip re-reading from downstream. + #[cfg(feature = "early_body_buffer")] + pub fn is_body_buffered(&self) -> bool { + self.body_buffered + } + + /// Marks the body as buffered without setting a body. + /// + /// Used when the request has no body (no Content-Length or Transfer-Encoding), + /// to prevent `buffer_request_body_early()` from attempting to read. + #[cfg(feature = "early_body_buffer")] + pub fn mark_body_buffered(&mut self) { + self.body_buffered = true; + } + + /// Creates a Session from an H1 HttpSession (for testing only). + #[cfg(all(test, feature = "early_body_buffer"))] + pub fn new_h1_with_http_session( + http_session: pingora_core::protocols::http::v1::server::HttpSession, + ) -> Self { + use pingora_cache::HttpCache; + use pingora_core::protocols::http::compression::ResponseCompressionCtx; + use pingora_core::protocols::http::ServerSession; + + let shutdown_flag = Arc::new(AtomicBool::new(false)); + Session { + downstream_session: Box::new(ServerSession::H1(http_session)), + cache: HttpCache::new(), + upstream_compression: ResponseCompressionCtx::new(0, false, false), + ignore_downstream_range: false, + upstream_headers_mutated_for_cache: false, + subrequest_ctx: None, + subrequest_spawner: None, + downstream_modules_ctx: HttpModuleCtx::empty(), + #[cfg(feature = "upstream_modules")] + upstream_modules_ctx: HttpModuleCtx::empty(), + upstream_body_bytes_received: 0, + upstream_write_pending_time: Duration::ZERO, + shutdown_flag, + buffered_request_body: None, + body_buffered: false, + } + } + pub fn downstream_custom_message( &mut self, ) -> Result< @@ -883,6 +970,17 @@ where .await; } + // early body buffering: read full request body before request_filter + // see https://github.com/cloudflare/pingora/issues/780 + #[cfg(feature = "early_body_buffer")] + if !session.is_body_buffered() { + if let Err(e) = self.buffer_request_body_early(&mut session, &mut ctx).await { + return self + .handle_error(session, &mut ctx, e, "Failed to buffer request body:") + .await; + } + } + if self.inner.allow_spawning_subrequest(&session, &ctx) { session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone())); } @@ -1115,6 +1213,125 @@ where None } } + + /// Buffer the entire request body before connecting to upstream. + /// + /// This enables early_request_body_filter to run BEFORE upstream_peer selection, + /// allowing auth signature verification and content-based routing. + /// + /// Buffering is controlled by the trait method `early_request_body_buffer_limit()`: + /// - Returns `None`: Skip buffering, stream body to upstream (default) + /// - Returns `Some(max_size)`: Buffer body with size limit enforcement + /// + /// Size limit enforcement: + /// - Content-Length checked first (fail fast before reading) + /// - Accumulated size checked during reading (streaming protection) + /// - Returns HTTP 413 (Payload Too Large) if exceeded + #[cfg(feature = "early_body_buffer")] + async fn buffer_request_body_early( + &self, + session: &mut Session, + ctx: &mut ::CTX, + ) -> Result<()> + where + SV: ProxyHttp + Send + Sync, + ::CTX: Send + Sync, + { + // check trait method for opt-in and size limit + let Some(max_size) = self.inner.early_request_body_buffer_limit(session, ctx) else { + return Ok(()); + }; + + // skip if already buffered + if session.is_body_buffered() { + return Ok(()); + } + + let content_length = session + .downstream_session + .req_header() + .headers + .get(header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()); + + // fail fast: reject before reading if Content-Length exceeds limit + if let Some(cl) = content_length { + if cl > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body too large: Content-Length {} exceeds limit {} bytes", + cl, max_size + ), + ); + } + } + + // Content-Length: 0 means no body; for all other cases (no Content-Length, + // Transfer-Encoding, HTTP/2) attempt to read. read_body_or_idle returns + // None immediately if there's nothing. + if content_length == Some(0) { + session.mark_body_buffered(); + return Ok(()); + } + + let mut body_parts: Vec = Vec::new(); + let mut total_size: usize = 0; + + // read body chunks until end of stream + loop { + let body_chunk: Option = + match session.downstream_session.read_body_or_idle(false).await { + Ok(chunk) => chunk, + Err(e) => return Err(e.into_down()), + }; + + // end of stream: None means no more data, or downstream reports done + let end_of_body = body_chunk.is_none() || session.downstream_session.is_body_done(); + + // run early body filter (not module filters, they haven't run header filter yet) + let mut filter_data = body_chunk; + self.inner + .early_request_body_filter(session, &mut filter_data, end_of_body, ctx) + .await?; + + // accumulate the (possibly filtered) data + if let Some(filtered) = filter_data { + total_size += filtered.len(); + + // check size limit during accumulation + if total_size > max_size { + return Error::e_explain( + HTTPStatus(413), + format!( + "Request body exceeded limit: {} > {} bytes", + total_size, max_size + ), + ); + } + + body_parts.push(filtered); + } + + if end_of_body { + break; + } + } + + // combine all chunks into a single Bytes buffer + if total_size > 0 { + let mut combined = bytes::BytesMut::with_capacity(total_size); + for part in body_parts { + combined.extend_from_slice(&part); + } + session.set_buffered_body(Some(combined.freeze())); + } else { + session.mark_body_buffered(); + } + + Ok(()) + } } /* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649 @@ -1541,3 +1758,79 @@ where Service::new(name, proxy) } } + +#[cfg(all(test, feature = "early_body_buffer"))] +mod tests { + use super::*; + + // Test Session body buffering accessors + mod body_buffer { + use super::*; + use pingora_core::protocols::http::v1::server::HttpSession; + use tokio_test::io::Builder; + + fn create_test_session() -> Session { + // Create a minimal mock stream for testing using tokio-test Builder. + // Use empty mock since we only test accessor methods, not HTTP parsing. + let mock_io = Builder::new().build(); + let http_session = HttpSession::new(Box::new(mock_io)); + Session::new_h1_with_http_session(http_session) + } + + #[test] + fn test_initial_state() { + let session = create_test_session(); + assert!(!session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_and_get_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + + assert!(session.is_body_buffered()); + assert_eq!(session.get_buffered_body(), Some(&body)); + } + + #[test] + fn test_take_buffered_body() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body.clone())); + let taken = session.take_buffered_body(); + + assert_eq!(taken, Some(body)); + assert!(session.get_buffered_body().is_none()); + // is_body_buffered should remain true after take + assert!(session.is_body_buffered()); + } + + #[test] + fn test_mark_body_buffered() { + let mut session = create_test_session(); + + assert!(!session.is_body_buffered()); + session.mark_body_buffered(); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + + #[test] + fn test_set_none_preserves_buffered_flag() { + let mut session = create_test_session(); + let body = Bytes::from("test body"); + + session.set_buffered_body(Some(body)); + assert!(session.is_body_buffered()); + + // Setting None should preserve the buffered flag + session.set_buffered_body(None); + assert!(session.is_body_buffered()); + assert!(session.get_buffered_body().is_none()); + } + } +} diff --git a/pingora-proxy/src/proxy_common.rs b/pingora-proxy/src/proxy_common.rs index 6c40760c..a5f808ed 100644 --- a/pingora-proxy/src/proxy_common.rs +++ b/pingora-proxy/src/proxy_common.rs @@ -19,6 +19,9 @@ pub(crate) enum DownstreamStateMachine { Reading, /// no more data to read ReadingFinished, + /// body was pre-buffered before upstream connection, skip all downstream polling + #[cfg(feature = "early_body_buffer")] + PreBuffered, /// downstream is already errored or closed Errored, } @@ -35,7 +38,14 @@ impl DownstreamStateMachine { // Can call read() to read more data or wait on closing pub fn can_poll(&self) -> bool { - !matches!(self, Self::Errored) + #[cfg(feature = "early_body_buffer")] + { + !matches!(self, Self::Errored | Self::PreBuffered) + } + #[cfg(not(feature = "early_body_buffer"))] + { + !matches!(self, Self::Errored) + } } pub fn is_reading(&self) -> bool { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index e74309eb..2e0de50c 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -379,10 +379,26 @@ where .await?; } + #[cfg(not(feature = "early_body_buffer"))] let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + #[cfg(not(feature = "early_body_buffer"))] let buffer = session.as_ref().get_retry_buffer(); + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + #[cfg(feature = "early_body_buffer")] + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) + } else { + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_ref().get_retry_buffer(), + ) + }; + // retry, send buffer if it exists or body empty if buffer.is_some() || session.as_mut().is_body_empty() { let send_permit = tx diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index e5030819..98ca5e12 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -383,10 +383,28 @@ where .await?; } + #[cfg(not(feature = "early_body_buffer"))] let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done()); + #[cfg(not(feature = "early_body_buffer"))] + let buffer = session.as_mut().get_retry_buffer(); + + // pre-buffered body (from buffer_request_body_early) takes precedence over retry buffer + #[cfg(feature = "early_body_buffer")] + let (mut downstream_state, buffer) = if let Some(body) = session + .take_buffered_body() + .filter(|_| session.is_body_buffered()) + { + (DownstreamStateMachine::PreBuffered, Some(body)) + } else { + ( + DownstreamStateMachine::new(session.as_mut().is_body_done()), + session.as_mut().get_retry_buffer(), + ) + }; + // retry, send buffer if it exists - if let Some(buffer) = session.as_mut().get_retry_buffer() { + if let Some(buffer) = buffer { self.send_body_to2( session, Some(buffer), diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 2411092d..11bb0e25 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -224,6 +224,63 @@ pub trait ProxyHttp { Ok(true) } + /// Determine whether to buffer the entire request body before connecting to upstream. + /// + /// This is called after [`Self::early_request_filter()`] but before [`Self::request_filter()`] + /// and [`Self::upstream_peer()`]. The body is buffered in `Session::buffered_request_body` + /// and can be accessed via [`Session::get_buffered_body()`]. + /// + /// # Returns + /// - `None`: Don't buffer, stream body to upstream (default) + /// - `Some(max_size)`: Buffer body with size limit, return 413 error if exceeded + /// + /// # Use Cases + /// - Auth signature verification (need full body before auth decision) + /// - Content-based routing decisions + /// - Body transformation before upstream selection + /// + /// # Size Limit Enforcement + /// When returning `Some(max_size)`: + /// - Content-Length header is checked first (fail fast before reading) + /// - Body size is checked during accumulation (streaming protection) + /// - If exceeded, returns HTTP 413 (Payload Too Large) + /// + /// Requires the `early_body_buffer` feature. + #[cfg(feature = "early_body_buffer")] + fn early_request_body_buffer_limit( + &self, + _session: &Session, + _ctx: &Self::CTX, + ) -> Option { + None // Default: stream body to upstream + } + + /// Handle each chunk of request body during early buffering. + /// + /// This is called during [`buffer_request_body_early()`] for each body chunk, **before** + /// [`Self::request_filter()`] and [`Self::upstream_peer()`]. Use this for processing + /// that must happen before header filters run (e.g., streaming decompression). + /// + /// Unlike [`Self::request_body_filter()`], this callback explicitly runs before any + /// header-phase filters, so it should not depend on state set by [`Self::request_filter()`]. + /// + /// The normal [`Self::request_body_filter()`] still runs during upstream body forwarding. + /// + /// Requires the `early_body_buffer` feature. + #[cfg(feature = "early_body_buffer")] + async fn early_request_body_filter( + &self, + _session: &mut Session, + _body: &mut Option, + _end_of_stream: bool, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + Ok(()) + } + /// Decide if the response is cacheable fn response_cache_filter( &self, diff --git a/pingora/Cargo.toml b/pingora/Cargo.toml index 8e30130a..7bc37a55 100644 --- a/pingora/Cargo.toml +++ b/pingora/Cargo.toml @@ -143,6 +143,10 @@ connection_filter = [ "pingora-proxy?/connection_filter", ] +## Enable buffering the full request body before `upstream_peer`, via the +## `early_request_body_buffer_limit` and `early_request_body_filter` trait hooks. +early_body_buffer = ["pingora-proxy?/early_body_buffer"] + # These features are intentionally not documented openssl_derived = ["any_tls"] @@ -155,6 +159,7 @@ document-features = [ "cache", "time", "sentry", - "connection_filter" + "connection_filter", + "early_body_buffer" ] trace = ["pingora-cache?/trace", "pingora-proxy?/trace"]