fix(proxy,rtsp): pause upstream reads on client backpressure#465
Conversation
Slow HTTP clients downloading large segments via /http/ proxy were dropped after only ~12% of content. When connection_queue_zerocopy() returned -1 (packet-drop semantics) on a saturated client send queue, http_proxy.c treated it as fatal and tore down the connection. RTSP TCP-interleaved suffered a milder variant: silent packet drops creating gaps in the RTP stream.

Apply real TCP flow control: when the client zc_queue reaches the high watermark (75%), remove POLLER_IN from the upstream socket so data backs up in the upstream TCP receive buffer and the upstream server's send window naturally slows down. When connection_handle_write drains the queue below the low watermark (50%), stream_on_client_drain re-arms POLLER_IN. A connection-level any_upstream_paused flag keeps the per-write notify cheap for the common case where no upstream is paused.

For RTSP TCP, the same socket carries control responses; rtsp_session_tick enforces a 30s max-pause guard to avoid outlasting the server's session timeout (typically 60s). UDP paths (RTSP UDP, FCC, multicast) are unchanged — kernel-level UDP RX buffer overflow already drops packets, so application-layer pause is moot.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
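For readers following along, a minimal sketch of the 75%/50% watermark decision this commit describes — the struct fields, ratio macros, and helper signatures below are illustrative stand-ins, not the exact definitions in connection.h:

```c
/* Sketch of the HWM/LWM check; names and layout are illustrative. */
#include <stddef.h>

#define QUEUE_HWM_NUM 3 /* pause at >= 75% of the per-connection queue limit */
#define QUEUE_HWM_DEN 4
#define QUEUE_LWM_NUM 1 /* resume at <= 50% of the per-connection queue limit */
#define QUEUE_LWM_DEN 2

struct connection {
    size_t queued_bytes;      /* bytes currently held in the zc_queue */
    size_t queue_limit_bytes; /* per-connection queue budget */
    int any_upstream_paused;  /* fast-path bit consulted on every completed write */
};

static int connection_should_pause_upstream(const struct connection *c)
{
    return c->queued_bytes >= c->queue_limit_bytes * QUEUE_HWM_NUM / QUEUE_HWM_DEN;
}

static int connection_can_resume_upstream(const struct connection *c)
{
    return c->queued_bytes <= c->queue_limit_bytes * QUEUE_LWM_NUM / QUEUE_LWM_DEN;
}
```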
Pull request overview
This PR introduces TCP flow-control for /http/ proxy and RTSP TCP-interleaved streaming by pausing upstream reads when the client’s zerocopy send queue is near saturation, then resuming reads once the queue drains below a low watermark. This prevents TCP byte-stream corruption (HTTP) and silent media gaps (RTSP interleaved) under slow-client backpressure.
Changes:
- Add connection-level pause/resume watermark helpers and a cheap `any_upstream_paused` fast-path bit.
- Implement upstream pause/resume in HTTP proxy and RTSP TCP-interleaved paths, including an RTSP max-pause guard.
- Notify pause-resume opportunities from the client write-drain path (`connection_handle_write` → `stream_on_client_drain`).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/connection.h | Adds flow-control watermark macros, pause/resume helper prototypes, and any_upstream_paused flag. |
| src/connection.c | Implements watermark checks and calls stream_on_client_drain() after client queue drains. |
| src/stream.h | Declares stream_on_client_drain() API and documents intended usage. |
| src/stream.c | Implements drain notification dispatch to HTTP proxy / RTSP resume helpers. |
| src/http_proxy.h | Adds upstream_paused state and http_proxy_resume_upstream() prototype. |
| src/http_proxy.c | Adds upstream pause/resume logic and HWM check before streaming recv(). |
| src/rtsp.h | Adds flow-control fields and rtsp_resume_upstream() prototype; updates tick contract. |
| src/rtsp.c | Adds RTSP pause/resume implementation, socket event computation helper, HWM check, and max-pause abort guard. |
When connection_handle_write fully drains the zc_queue and falls into the IDLE branch, it used to disarm POLLER_OUT first, then call stream_on_client_drain. If the drain hook resumed a paused upstream session and queued new buffers in the same call frame, those buffers stayed in the queue with POLLER_OUT disarmed — so the worker never re-entered the writer until something else woke it up, manifesting as stalls under backpressure.

Reorder so the drain hook fires before we recompute the poller mask, then include POLLER_OUT in the mask iff the queue is non-empty after the hook.

Add an e2e regression test (test_flow_control.py) that reproduces the original ~12% truncation by running a slow client against a 1 MiB HTTP proxy response with a constrained buffer pool (-b 128 → ~96 KiB queue limit per connection).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
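A sketch of the reordering this commit describes, assuming a simplified tail of connection_handle_write; `zc_queue_is_empty` and the `epoll_update_events` signature here are stand-ins rather than the real project API:

```c
/* Minimal stand-ins so the sketch is self-contained; real project types differ. */
struct connection;
void stream_on_client_drain(struct connection *c);
int  zc_queue_is_empty(const struct connection *c);         /* hypothetical helper */
void epoll_update_events(struct connection *c, int events); /* simplified signature */

#define POLLER_IN  0x1
#define POLLER_OUT 0x2

/* Drained-queue branch, reordered: fire the drain hook first, then compute the
 * poller mask from the post-hook queue state. */
static void connection_writer_drained(struct connection *c)
{
    stream_on_client_drain(c);     /* may resume a paused upstream and enqueue data */

    int events = POLLER_IN;
    if (!zc_queue_is_empty(c))     /* hook refilled the queue in this call frame */
        events |= POLLER_OUT;      /* keep the writer armed so it flushes without another wakeup */
    epoll_update_events(c, events);
}
```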
Round-1 review feedback (Copilot + Codex):

1. **kqueue safety**: poller_mod with a mask that omits POLLER_IN/POLLER_OUT on kqueue (poller_kqueue.c) deletes BOTH read and write filters, which silently disables HUP/EOF detection on the upstream socket while paused. Switch pause/resume to flag-only — the existing HWM check at the top of the recv path is the actual gate. Edge-triggered pollers still wake the worker on new data; the recv handler bails out via the flag when the queue is saturated. This also removes the pause-failure stall vector.
2. **Drain to EAGAIN on resume (HTTP proxy)**: The previous resume did a single recv, but ET semantics require draining until EAGAIN. Loop the try_receive call so multi-chunk buffered data is fully delivered (see the sketch after this list).
3. **EOF cleanup on RTSP resume**: rtsp_resume_upstream now propagates the -1 return from rtsp_handle_tcp_interleaved_data (recv-returns-0 / error), triggering the same teardown the IN-event handler would have run. Without this, an upstream FIN that arrives while paused leaves the session orphaned until the keepalive timer expires.
4. **Doc nit (stream.h)**: clarify that stream_on_client_drain requires the struct to be at least zero-initialized; uninitialized stack memory is unsafe.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
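A sketch of what points 2–3 describe for the HTTP-proxy resume path; the session type and the `http_proxy_try_receive` / `http_proxy_teardown` / `connection_should_pause_upstream_for` helpers are hypothetical stand-ins for the real code:

```c
#include <errno.h>

/* Stand-ins so the sketch compiles on its own; the real types and helpers differ. */
struct http_proxy_session { int upstream_paused; };
int  http_proxy_try_receive(struct http_proxy_session *s);               /* >0 forwarded, 0 EOF, -1 with errno */
int  connection_should_pause_upstream_for(struct http_proxy_session *s); /* hypothetical HWM check */
void http_proxy_teardown(struct http_proxy_session *s);                  /* hypothetical cleanup */

/* Resume after the client queue drained below LWM: with edge-triggered pollers the
 * readiness edge fired while we were paused, so drain until EAGAIN (or until the
 * queue hits the HWM again); an EOF seen here must trigger the same teardown the
 * normal IN-event handler would run. */
static int http_proxy_resume_upstream_sketch(struct http_proxy_session *s)
{
    s->upstream_paused = 0;
    for (;;) {
        if (connection_should_pause_upstream_for(s)) { /* client fell behind again */
            s->upstream_paused = 1;
            return 0;
        }
        int n = http_proxy_try_receive(s);
        if (n > 0)
            continue;                                  /* more buffered data may remain */
        if (n < 0 && errno == EAGAIN)
            return 0;                                  /* kernel buffer fully drained */
        http_proxy_teardown(s);                        /* EOF or hard error while paused */
        return -1;
    }
}
```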
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
src/http_proxy.c:805
- When backpressure hits, this branch returns early even if http_proxy_pause_upstream() fails (e.g., poller_mod error). In edge-triggered mode that can deadlock the proxy: the upstream fd stays readable but we neither read nor successfully un-arm POLLER_IN, so no further events are delivered. Consider only returning early if the pause actually took effect (e.g., check session->upstream_paused after calling pause, or have pause return a success/failure flag and continue recv() on failure).
```c
    }
    session->rewrite_body_buffer = new_buf;
    session->rewrite_body_buffer_size = new_alloc;
}
memcpy(session->rewrite_body_buffer + session->rewrite_body_buffer_used, temp_buf, (size_t)received);
session->rewrite_body_buffer_used = new_size;
```
src/rtsp.c:1843
- If rtsp_pause_upstream() fails (poller_mod error), this code still returns early. With edge-triggered pollers that can stall the session: unread data remains in the socket buffer, but EPOLLIN won’t retrigger, and we also didn’t successfully pause reads to push backpressure upstream. Suggest only returning when the pause is actually in effect (e.g., call rtsp_pause_upstream(); if (session->upstream_paused) return total_forwarded; otherwise continue draining).
```c
while (session->response_buffer_pos < RTSP_RESPONSE_BUFFER_SIZE) {
    int bytes_received = recv(session->socket, session->response_buffer + session->response_buffer_pos,
                              RTSP_RESPONSE_BUFFER_SIZE - session->response_buffer_pos, 0);
    if (bytes_received < 0) {
        if (errno == EAGAIN) {
            hit_eagain = 1;
            break; /* No more data available */
```
Round-2 review (Copilot): if the slow-drain helper observes a truncated or
corrupted HTTP response (a plausible failure mode for the regression this
test guards against), parsing the status line would raise IndexError /
ValueError instead of letting the assertion report a clean test failure.
Wrap the parse and return (0, {}, buf) on failure so the test message
points at the actual problem.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Simplify pass after the round-1/round-2 review fixes.

- Extract `connection_recompute_any_upstream_paused()` so HTTP proxy and RTSP no longer reach into each other's session structs to decide when to clear the connection-level fast-path bit. Adding a third upstream type (or removing one) is now a one-line change in the helper (see the sketch after this list).
- Extract `connection_begin_drain_close()` for the `state = CONN_CLOSING` + `epoll_update_events(POLLER_*)` incantation that appeared inline in both resume EOF-cleanup paths.

Net: -31 / +48 lines, but the deltas in http_proxy.c and rtsp.c shrink (-7 each), and the new helpers can absorb the equivalent inline pattern elsewhere in rtsp.c on a future sweep.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
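A sketch of the centralised recompute this commit extracts. The connection owns the `any_upstream_paused` bit and the per-protocol sessions only report their own state; the struct layout and pointer names below are illustrative:

```c
/* Illustrative session/connection layout; the real structs differ. */
struct http_proxy_session { int upstream_paused; };
struct rtsp_session       { int upstream_paused; };

struct connection {
    int any_upstream_paused;
    struct http_proxy_session *http_proxy; /* NULL unless this is an /http/ connection */
    struct rtsp_session       *rtsp;       /* NULL unless this is an RTSP connection */
};

void connection_recompute_any_upstream_paused(struct connection *c)
{
    int paused = 0;
    if (c->http_proxy && c->http_proxy->upstream_paused)
        paused = 1;
    if (c->rtsp && c->rtsp->upstream_paused)
        paused = 1;
    /* Adding a third upstream type is a one-line change here, nowhere else. */
    c->any_upstream_paused = paused;
}
```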
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Hot-path check for upstream pause/resume now exits in 3 instructions when the queue is below the absolute-minimum HWM/LWM (derived from the floor of the dynamic limit), and refreshes c->queue_limit_bytes via a side-effect-free compute path only above that threshold. This fixes the stale-limit bug Copilot flagged without paying recompute cost on every packet.

- Split connection_calculate_queue_limit into compute (pure, hot-path safe) and update (EWMA + slow-state machine, called per enqueue).
- Extract queue_limit_inputs_t + connection_prepare_queue_limit_inputs + connection_apply_slow_clamp so both compute and update share input prep without duplicating utilization math.
- Add connection_queue_bytes() inline helper, replace 6 call sites of num_queued * BUFFER_POOL_BUFFER_SIZE across connection/http_proxy/rtsp.
- Add CONN_HWM/CONN_LWM macros, move the HWM/LWM NUM/DEN constants from the header to connection.c (no external callers).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
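A sketch of the fast-path early-out described above; the MIN_HWM_BYTES value and the pure-compute helper's name/signature are illustrative assumptions, not copied from connection.c:

```c
#include <stddef.h>

/* Illustrative value: the real MIN_HWM_BYTES is derived from the floor of the
 * dynamic queue limit; 48 KiB here is just a placeholder for the sketch. */
#define MIN_HWM_BYTES (48 * 1024)

struct connection {
    size_t queued_bytes;      /* == num_queued * BUFFER_POOL_BUFFER_SIZE */
    size_t queue_limit_bytes; /* cached dynamic limit, refreshed on enqueue */
};

/* Pure compute half of the split: no EWMA update, no slow-state machine, so it is
 * safe on the hot path. (Stand-in prototype; the real helper's name and inputs differ.) */
size_t connection_compute_queue_limit(const struct connection *c);

/* Below the compile-time floor the answer is always "don't pause", so the common
 * case costs a load, a constant compare, and a branch. Only above the floor do we
 * refresh the cached limit before applying the 75% HWM ratio. */
static int connection_should_pause_upstream(struct connection *c)
{
    if (c->queued_bytes < MIN_HWM_BYTES)
        return 0; /* steady-state fast path: no recompute */

    c->queue_limit_bytes = connection_compute_queue_limit(c); /* side-effect-free refresh */
    return c->queued_bytes >= c->queue_limit_bytes * 3 / 4;   /* CONN_HWM(limit) equivalent */
}
```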
…e events

Previously backpressure_events was a 1:1 alias of dropped_packets, incremented inside connection_record_drop. With the new HWM-pause flow control, HTTP-proxy and RTSP-TCP almost never drop, so the field was always 0 on TCP paths and just duplicated dropped_packets on UDP.

Redefine the field to count upstream-pause occurrences:

- connection_record_drop no longer touches backpressure_events; dropped_packets remains the sole drop counter (UDP paths).
- http_proxy_pause_upstream and rtsp_pause_upstream increment it once per pause edge (when upstream_paused flips 0 -> 1).
- The drop-log throttle in connection_queue_zerocopy now keys off dropped_packets so it survives the dropped increment.

Net effect: dropped_packets and backpressure_events are now orthogonal — "how many packets we dropped" vs. "how many times we throttled the upstream" — letting the status page distinguish congestion from throttling.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The 30s pause guard was originally added on the assumption that the upstream RTSP server would reclaim our session if we stayed paused near its session timeout (~60s). In practice this never materialises:

- Pause only blocks the recv path (HWM check in rtsp_handle_tcp_interleaved_data); the send side is untouched.
- rtsp_session_tick() keeps emitting keepalives on the same socket, which the server treats as activity and resets its session timer.
- Even if the server did reclaim us, we'd see EOF in the kernel buffer on next resume — rtsp_resume_upstream already handles that path cleanly via rtsp_force_cleanup + connection_begin_drain_close.

So the only thing the 30s guard achieved was killing slow-but-recovering clients early. Drop the macro, the pause_started_ms field, and the guard block in rtsp_session_tick.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Both http_proxy_pause_upstream and rtsp_pause_upstream open-coded the same null-guarded backpressure_events bump. Mirror connection_record_drop with a connection_record_pause inline helper so the counter's ownership stays with the connection module.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
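A sketch of the inline helper described above, mirroring connection_record_drop; the field name and null-guard are taken from the commit messages, the exact definition is illustrative:

```c
#include <stdint.h>

struct connection {
    uint64_t backpressure_events; /* upstream-pause edges (0 -> 1 transitions) */
};

static inline void connection_record_pause(struct connection *c)
{
    if (c) /* null-guarded, like connection_record_drop */
        c->backpressure_events++;
}
```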
Summary
Slow HTTP clients downloading large segments via the `/http/` proxy were getting dropped after only ~12% of the content was transferred. RTSP TCP-interleaved suffered a milder version of the same issue (silent packet loss creating gaps in the RTP stream).

**Root cause** — When the client's send queue saturated, `connection_queue_zerocopy()` returned `-1` (packet-drop semantics inherited from RTP/UDP). `http_proxy.c` treated this as fatal and tore down the connection (RTSP just dropped the buffer silently). For TCP byte streams that's wrong: dropping a single byte corrupts the response.

**Fix** — Real TCP flow control:

- When the client's `zc_queue` reaches HWM (75%), the upstream module sets `upstream_paused = 1` and stops reading from the upstream socket. Data accumulates in the upstream TCP receive buffer; the upstream server's TCP send window naturally throttles the producer.
- When `connection_handle_write` drains the queue below LWM (50%), `stream_on_client_drain` resumes upstream reads.
- A connection-level `any_upstream_paused` bit keeps the per-write notification cheap for the 99% of streams that have no paused upstream.

Scope
- HTTP proxy (`/http/...`) and RTSP TCP-interleaved paths gain pause/resume flow control.
- UDP paths (RTSP UDP, FCC, multicast) are unchanged — kernel-level UDP RX buffer overflow already drops packets there, so application-layer pause adds nothing.

How it works
Upstream-paused state machine
```mermaid
stateDiagram-v2
    [*] --> Active : connection established
    Active --> Paused : queued ≥ HWM (75%)<br/>checked at recv-loop entry
    Paused --> Active : queued ≤ LWM (50%)<br/>checked from connection_handle_write<br/>via stream_on_client_drain()
    Active --> [*] : EOF / error / client close
```

End-to-end pause/resume protocol
```mermaid
sequenceDiagram
    autonumber
    participant US as Upstream<br/>(HTTP / RTSP TCP server)
    participant Sock as Upstream socket<br/>(rtp2httpd side)
    participant Mod as HTTP proxy /<br/>RTSP TCP module
    participant Q as connection_t<br/>(zc_queue + flags)
    participant Cli as Slow client
    rect rgb(232, 245, 233)
        Note over US,Cli: 1. Steady state — queued < HWM
        US->>Sock: TCP data
        Sock-->>Mod: POLLER_IN
        Mod->>Mod: recv() into buffer pool
        Mod->>Q: connection_queue_zerocopy(buf)
        Q->>Cli: zerocopy_send()
    end
    rect rgb(255, 243, 224)
        Note over US,Cli: 2. Client slows → queued reaches HWM (75%)
        US->>Sock: more TCP data
        Sock-->>Mod: POLLER_IN
        Mod->>Q: connection_should_pause_upstream()?
        Q-->>Mod: YES
        Mod->>Mod: upstream_paused = 1<br/>(skips recv on subsequent POLLER_IN events)
        Mod->>Q: connection_record_pause()<br/>connection_recompute_any_upstream_paused()<br/>→ any_upstream_paused = 1
        Note over Sock,US: Kernel TCP recv buffer fills →<br/>zero-window ACKs → server's send blocks
    end
    rect rgb(227, 242, 253)
        Note over US,Cli: 3. Client drains → queued ≤ LWM (50%)
        Q->>Cli: zerocopy_send() (writes complete)
        Note over Q: connection_handle_write
        Q->>Mod: stream_on_client_drain()<br/>(fast-path early-out when any_upstream_paused == 0)
        Mod->>Q: connection_can_resume_upstream()?
        Q-->>Mod: YES
        Mod->>Mod: upstream_paused = 0
        Mod->>Sock: drain loop: recv until EAGAIN, HWM, or EOF<br/>(needed because edge-triggered POLLER_IN<br/>did not fire while paused)
        Sock->>US: TCP window opens → server resumes sending
    end
```

Why flag-only instead of `poller_mod`

The first iteration of this PR removed `POLLER_IN` from the upstream socket on pause and re-added it on resume. That broke macOS/kqueue: `poller_mod` with a mask lacking both `POLLER_IN` and `POLLER_OUT` deletes both kqueue filters, losing `EV_EOF`/HUP detection, and the next pause/resume cycle leaves the socket without any registered filters. The flag-only approach gates reads at the recv handler instead — same effect (no `recv()` while paused), portable across epoll/kqueue, and the resume-time drain loop reliably picks up any data that arrived during the pause regardless of edge-triggered semantics.
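As a concrete illustration of the flag-only gate, a sketch of what the top of a streaming recv handler could look like under this design; the session type, `upstream_forward_data`, and the exact helper signatures are assumptions, not the project's real code:

```c
/* Stand-in declarations; the real connection/session types and helpers differ. */
struct connection;
struct upstream_session { int upstream_paused; struct connection *conn; };

int  connection_should_pause_upstream(struct connection *c);
void connection_record_pause(struct connection *c);
void connection_recompute_any_upstream_paused(struct connection *c);
int  upstream_forward_data(struct upstream_session *s); /* normal recv-and-queue loop */

/* Flag-only gate: while paused we simply decline to recv(), leaving data in the
 * kernel buffer so TCP flow control pushes back on the upstream. No poller_mod. */
static int upstream_handle_readable(struct upstream_session *s)
{
    if (s->upstream_paused)
        return 0; /* edge-triggered wakeups are ignored while paused */

    if (connection_should_pause_upstream(s->conn)) {
        s->upstream_paused = 1; /* pause edge: flip the flag only */
        connection_record_pause(s->conn);
        connection_recompute_any_upstream_paused(s->conn);
        return 0;
    }
    return upstream_forward_data(s);
}
```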
Why no max-pause timeout for RTSP

An earlier iteration aborted the session after 30s of continuous pause, on the theory that the upstream RTSP server would otherwise time out our session (~60s) and leave a half-dead connection. In practice that never happens: pause only suspends the recv path, while `rtsp_session_tick` keeps emitting keepalives on the same socket. The server treats those as activity and resets its session timer. Even in the pathological case where the server did reclaim us, we'd see EOF on the next resume drain — `rtsp_resume_upstream` already cleans that up via `rtsp_force_cleanup` + `connection_begin_drain_close`. The guard was killing slow-but-recovering clients for no benefit, so it's been dropped.
Hot-path performance

`connection_should_pause_upstream` / `connection_can_resume_upstream` need fresh inputs (active stream count, pool utilization), but recomputing the dynamic limit on every packet would be wasteful. Resolved with a fast-path early-out using compile-time `MIN_HWM_BYTES` / `MIN_LWM_BYTES` constants (derived from the floor of the dynamic limit): when the queue is below those thresholds — the steady-state common case — the check is a load + const compare + branch. Above the floor, the helpers refresh `c->queue_limit_bytes` via a side-effect-free pure-compute path so the decision uses current inputs rather than whatever was cached at the last enqueue.
Status semantics

`backpressure_events` was previously a 1:1 alias of `dropped_packets` (incremented inside `connection_record_drop`). It's been repurposed to count upstream-pause edges (each `pause_upstream()` 0→1 transition). The two counters are now orthogonal:

- `dropped_packets` — packets actually dropped (UDP paths only, since TCP paths now pause instead of drop)
- `backpressure_events` — number of times an upstream TCP session was throttled (HTTP proxy + RTSP TCP only)

This lets the status page distinguish congestion from throttling.
Files changed
- `src/connection.{c,h}` — `any_upstream_paused` flag; `connection_should_pause_upstream()` / `connection_can_resume_upstream()` with fast-path + dynamic-limit refresh; `connection_record_pause()` helper; `connection_queue_bytes()` helper; `connection_recompute_any_upstream_paused()`; `connection_begin_drain_close()`; `stream_on_client_drain()` invocation in `connection_handle_write`
- `src/http_proxy.{c,h}` — pause/resume primitives; HWM check before `recv()` in STREAMING phase; resume-time drain loop with EOF cleanup
- `src/rtsp.{c,h}` — pause/resume primitives (TCP transport only); HWM check in `rtsp_handle_tcp_interleaved_data`; resume-time drain with EOF cleanup
- `src/stream.{c,h}` — `stream_on_client_drain()` dispatch with cheap fast-path early-out
- `src/status.h` — `backpressure_events` doc updated to reflect new semantics
- `e2e/test_flow_control.py` — regression test: 1 MiB body via the `/http/` proxy with `-b 128` (forces ~96 KiB queue limit) and a slow client (8 KiB chunks @ 20 ms) — asserts byte-for-byte completeness
- `e2e/test_flow_control.py::TestHTTPProxyBackpressure::test_slow_client_receives_full_body` passes
- Full e2e suite (`./scripts/run-e2e.sh`) regression — 425 tests pass
- Lint clean (`pnpm lint`: biome / ruff / clang-format)
- Manual: `curl --limit-rate 62500` against catchup HTTP proxy → full byte-for-byte content delivered, no abort

🤖 Generated with Claude Code