fix(proxy,rtsp): pause upstream reads on client backpressure #465

Merged
stackia merged 10 commits into main from fix/upstream-flow-control
May 5, 2026

Conversation

@stackia
Owner

@stackia stackia commented May 5, 2026

Summary

Slow HTTP clients downloading large segments via /http/ proxy were getting dropped after only ~12% of the content was transferred. RTSP TCP-interleaved suffered a milder version of the same issue (silent packet loss creating gaps in the RTP stream).

Root cause — When the client's send queue saturated, connection_queue_zerocopy() returned -1 (packet-drop semantics inherited from RTP/UDP). http_proxy.c treated this as fatal and tore down the connection (RTSP just dropped the buffer silently). For TCP byte streams that's wrong: dropping a single byte corrupts the response.

Fix — Real TCP flow control:

  • When the client zc_queue reaches HWM (75%), the upstream module sets upstream_paused = 1 and stops reading from the upstream socket. Data accumulates in the upstream TCP receive buffer; the upstream server's TCP send window naturally throttles the producer.
  • When connection_handle_write drains the queue below LWM (50%), stream_on_client_drain resumes upstream reads.
  • A connection-level any_upstream_paused bit keeps the per-write notification cheap for the 99% of streams that have no paused upstream.
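In outline, the watermark checks reduce to a pair of threshold comparisons. A minimal sketch (simplified stand-in types; the 75%/50% ratios come from the list above, everything else about the real connection_t is assumed):

```c
/* Sketch only: simplified stand-in for the real connection_t. */
#include <stddef.h>

typedef struct connection
{
  size_t queued_bytes;      /* bytes currently sitting in the client zc_queue */
  size_t queue_limit_bytes; /* dynamic per-connection queue limit */
  int any_upstream_paused;  /* connection-level fast-path bit */
} connection_t;

/* HWM (75% of the limit): consulted before each upstream recv(). */
static int connection_should_pause_upstream(const connection_t *c)
{
  return c->queued_bytes >= c->queue_limit_bytes * 3 / 4;
}

/* LWM (50%): consulted from connection_handle_write after a drain. */
static int connection_can_resume_upstream(const connection_t *c)
{
  return c->queued_bytes <= c->queue_limit_bytes / 2;
}
```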

Scope

  • HTTP proxy (/http/...) — ✅ Pause/resume implemented
  • RTSP TCP interleaved — ✅ Pause/resume implemented (keepalives flow during pause, so the server never times us out)
  • RTSP UDP / FCC / Multicast — ⏭️ Unchanged: kernel-level UDP RX buffer overflow already drops packets, so application-layer pause is meaningless for UDP

How it works

Upstream-paused state machine

```mermaid
stateDiagram-v2
    [*] --> Active : connection established
    Active --> Paused : queued ≥ HWM (75%)<br/>checked at recv-loop entry
    Paused --> Active : queued ≤ LWM (50%)<br/>checked from connection_handle_write<br/>via stream_on_client_drain()
    Active --> [*] : EOF / error / client close
```

End-to-end pause/resume protocol

```mermaid
sequenceDiagram
    autonumber
    participant US as Upstream<br/>(HTTP / RTSP TCP server)
    participant Sock as Upstream socket<br/>(rtp2httpd side)
    participant Mod as HTTP proxy /<br/>RTSP TCP module
    participant Q as connection_t<br/>(zc_queue + flags)
    participant Cli as Slow client

    rect rgb(232, 245, 233)
    Note over US,Cli: 1. Steady state — queued < HWM
    US->>Sock: TCP data
    Sock-->>Mod: POLLER_IN
    Mod->>Mod: recv() into buffer pool
    Mod->>Q: connection_queue_zerocopy(buf)
    Q->>Cli: zerocopy_send()
    end

    rect rgb(255, 243, 224)
    Note over US,Cli: 2. Client slows → queued reaches HWM (75%)
    US->>Sock: more TCP data
    Sock-->>Mod: POLLER_IN
    Mod->>Q: connection_should_pause_upstream()?
    Q-->>Mod: YES
    Mod->>Mod: upstream_paused = 1<br/>(skips recv on subsequent POLLER_IN events)
    Mod->>Q: connection_record_pause()<br/>connection_recompute_any_upstream_paused()<br/>→ any_upstream_paused = 1
    Note over Sock,US: Kernel TCP recv buffer fills →<br/>zero-window ACKs → server's send blocks
    end

    rect rgb(227, 242, 253)
    Note over US,Cli: 3. Client drains → queued ≤ LWM (50%)
    Q->>Cli: zerocopy_send() (writes complete)
    Note over Q: connection_handle_write
    Q->>Mod: stream_on_client_drain()<br/>(fast-path early-out when any_upstream_paused == 0)
    Mod->>Q: connection_can_resume_upstream()?
    Q-->>Mod: YES
    Mod->>Mod: upstream_paused = 0
    Mod->>Sock: drain loop: recv until EAGAIN, HWM, or EOF<br/>(needed because edge-triggered POLLER_IN<br/>did not fire while paused)
    Sock->>US: TCP window opens → server resumes sending
    end
```

Why flag-only instead of poller_mod

The first iteration of this PR removed POLLER_IN from the upstream socket on pause and re-added it on resume. That broke macOS/kqueue: poller_mod with a mask lacking both POLLER_IN and POLLER_OUT deletes both kqueue filters, losing EV_EOF / HUP detection, and the next pause/resume cycle leaves the socket without any registered filters. The flag-only approach gates reads at the recv handler instead — same effect (no recv() while paused), portable across epoll/kqueue, and the resume-time drain loop reliably picks up any data that arrived during the pause regardless of edge-triggered semantics.
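Concretely, the gate is one flag test at the top of the recv handler, with no poller registration change. A sketch with stand-in types (the real handlers live in http_proxy.c and rtsp.c; signatures are assumed):

```c
/* Stand-ins for the real structs/helpers named in this PR. */
typedef struct connection connection_t;
typedef struct
{
  connection_t *conn;
  int upstream_paused;
} proxy_session_t;

int connection_should_pause_upstream(connection_t *c);
void connection_record_pause(connection_t *c);
void connection_recompute_any_upstream_paused(connection_t *c);

static void on_upstream_readable(proxy_session_t *s)
{
  if (s->upstream_paused)
    return; /* data stays in the kernel rx buffer; TCP throttles the server */

  if (connection_should_pause_upstream(s->conn))
  {
    s->upstream_paused = 1; /* 0 -> 1 pause edge */
    connection_record_pause(s->conn);
    connection_recompute_any_upstream_paused(s->conn);
    return; /* POLLER_IN stays registered, so EOF/HUP detection survives */
  }

  /* ... normal path: recv() into the buffer pool and enqueue ... */
}
```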

Why no max-pause timeout for RTSP

An earlier iteration aborted the session after 30s of continuous pause, on the theory that the upstream RTSP server would otherwise time out our session (~60s) and leave a half-dead connection. In practice that never happens: pause only suspends the recv path, while rtsp_session_tick keeps emitting keepalives on the same socket. The server treats those as activity and resets its session timer. Even in the pathological case where the server did reclaim us, we'd see EOF on the next resume drain — rtsp_resume_upstream already cleans that up via rtsp_force_cleanup + connection_begin_drain_close. The guard was killing slow-but-recovering clients for no benefit, so it's been dropped.
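The reason an indefinite pause is safe is that the tick path never consults the pause flag. A hypothetical sketch of that separation (the keepalive method, interval, and field names are assumptions, not code from rtsp.c):

```c
/* Sketch: pause gates only the recv path; the periodic session tick
 * keeps writing keepalives on the same TCP socket regardless. */
typedef struct
{
  long long last_keepalive_ms;
  int upstream_paused; /* never checked below: the send side is untouched */
} rtsp_session_t;

#define KEEPALIVE_INTERVAL_MS 15000 /* illustrative value */

void send_keepalive_request(rtsp_session_t *s); /* e.g. OPTIONS / GET_PARAMETER */

static void rtsp_session_tick(rtsp_session_t *s, long long now_ms)
{
  if (now_ms - s->last_keepalive_ms >= KEEPALIVE_INTERVAL_MS)
  {
    send_keepalive_request(s); /* server sees activity, resets its timer */
    s->last_keepalive_ms = now_ms;
  }
}
```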

Hot-path performance

connection_should_pause_upstream / connection_can_resume_upstream need fresh inputs (active stream count, pool utilization), but recomputing the dynamic limit on every packet would be wasteful. Resolved with a fast-path early-out using compile-time MIN_HWM_BYTES / MIN_LWM_BYTES constants (derived from the floor of the dynamic limit): when the queue is below those thresholds — the steady-state common case — the check is a load + const compare + branch. Above the floor, the helpers refresh c->queue_limit_bytes via a side-effect-free pure-compute path so the decision uses current inputs rather than whatever was cached at the last enqueue.
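Roughly, the refined check looks like this (a sketch; the floor value and the compute-path name are assumptions):

```c
#include <stddef.h>

typedef struct connection
{
  size_t queue_limit_bytes;
} connection_t;

size_t connection_queue_bytes(const connection_t *c);         /* helper from this PR */
size_t connection_compute_queue_limit(const connection_t *c); /* pure, no EWMA update */

#define MIN_HWM_BYTES (64 * 1024) /* floor of the dynamic HWM; value assumed */
#define CONN_HWM(limit) ((limit) * 3 / 4)

static int connection_should_pause_upstream(connection_t *c)
{
  size_t queued = connection_queue_bytes(c);
  if (queued < MIN_HWM_BYTES)
    return 0; /* steady state: load + const compare + branch */

  /* Only above the floor: refresh the limit via the side-effect-free
   * compute path so the decision uses current inputs. */
  c->queue_limit_bytes = connection_compute_queue_limit(c);
  return queued >= CONN_HWM(c->queue_limit_bytes);
}
```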

Status semantics

backpressure_events was previously a 1:1 alias of dropped_packets (incremented inside connection_record_drop). It's been repurposed to count upstream-pause edges (each pause_upstream() 0→1 transition). The two counters are now orthogonal:

  • dropped_packets — packets actually dropped (UDP paths only, since TCP paths now pause instead of drop)
  • backpressure_events — number of times an upstream TCP session was throttled (HTTP proxy + RTSP TCP only)

This lets the status page distinguish congestion from throttling.
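The only subtlety is edge counting: the increment happens on the 0→1 flip of upstream_paused, not once per poll event. A sketch (the counter's exact location is an assumption):

```c
typedef struct connection
{
  unsigned long backpressure_events; /* actual field location may differ */
} connection_t;

/* Mirrors connection_record_drop: the connection module owns the counter. */
static inline void connection_record_pause(connection_t *c)
{
  if (c)
    c->backpressure_events++;
}

/* Caller side, counting once per pause edge:
 *
 *   if (!session->upstream_paused) {
 *     session->upstream_paused = 1;
 *     connection_record_pause(session->conn);
 *   }
 */
```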

Files changed

  • src/connection.{c,h} — any_upstream_paused flag; connection_should_pause_upstream() / connection_can_resume_upstream() with fast-path + dynamic-limit refresh; connection_record_pause() helper; connection_queue_bytes() helper; connection_recompute_any_upstream_paused(); connection_begin_drain_close(); stream_on_client_drain() invocation in connection_handle_write
  • src/http_proxy.{c,h} — pause/resume primitives; HWM check before recv() in STREAMING phase; resume-time drain loop with EOF cleanup
  • src/rtsp.{c,h} — pause/resume primitives (TCP transport only); HWM check in rtsp_handle_tcp_interleaved_data; resume-time drain with EOF cleanup
  • src/stream.{c,h} — stream_on_client_drain() dispatch with cheap fast-path early-out
  • src/status.h — backpressure_events doc updated to reflect new semantics
  • e2e/test_flow_control.py — regression test: 1 MiB body via /http/ proxy with -b 128 (forces ~96 KiB queue limit) and slow client (8 KiB chunks @ 20 ms) — asserts byte-for-byte completeness

Test plan

  • Build clean (no new warnings) on Linux/epoll
  • Build on macOS/kqueue (flag-only approach is portable)
  • E2E: e2e/test_flow_control.py::TestHTTPProxyBackpressure::test_slow_client_receives_full_body passes
  • Full e2e suite (./scripts/run-e2e.sh) regression — 425 tests pass
  • Lint clean (pnpm lint: biome / ruff / clang-format)
  • Manual: curl --limit-rate 62500 against catchup HTTP proxy → full byte-for-byte content delivered, no abort
  • Manual: slow RTSP TCP client → continuous RTP sequence numbers, server logs show pause/resume

🤖 Generated with Claude Code

Slow HTTP clients downloading large segments via /http/ proxy were dropped
after only ~12% of content. When connection_queue_zerocopy() returned -1
(packet-drop semantics) on a saturated client send queue, http_proxy.c
treated it as fatal and tore down the connection. RTSP TCP-interleaved
suffered a milder variant: silent packet drops creating gaps in the RTP
stream.

Apply real TCP flow control: when the client zc_queue reaches the high
watermark (75%), remove POLLER_IN from the upstream socket so data backs
up in the upstream TCP receive buffer and the upstream server's send
window naturally slows down. When connection_handle_write drains the
queue below the low watermark (50%), stream_on_client_drain re-arms
POLLER_IN. A connection-level any_upstream_paused flag keeps the per-write
notify cheap for the common case where no upstream is paused.

For RTSP TCP, the same socket carries control responses; rtsp_session_tick
enforces a 30s max-pause guard to avoid outlasting the server's session
timeout (typically 60s).

UDP paths (RTSP UDP, FCC, multicast) are unchanged — kernel-level UDP RX
buffer overflow already drops packets, so application-layer pause is moot.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 5, 2026 09:07
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Azure Static Web Apps: Your stage site is ready! Visit it here: https://thankful-water-0a297bf00-465.eastasia.1.azurestaticapps.net


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 301b77c911


Comment thread src/http_proxy.c Outdated
Comment thread src/rtsp.c Outdated
Contributor

Copilot AI left a comment


Pull request overview

This PR introduces TCP flow-control for /http/ proxy and RTSP TCP-interleaved streaming by pausing upstream reads when the client’s zerocopy send queue is near saturation, then resuming reads once the queue drains below a low watermark. This prevents TCP byte-stream corruption (HTTP) and silent media gaps (RTSP interleaved) under slow-client backpressure.

Changes:

  • Add connection-level pause/resume watermark helpers and a cheap any_upstream_paused fast-path bit.
  • Implement upstream pause/resume in HTTP proxy and RTSP TCP-interleaved paths, including an RTSP max-pause guard.
  • Notify pause-resume opportunities from the client write-drain path (connection_handle_write → stream_on_client_drain).

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.

Summary per file:
  • src/connection.h — Adds flow-control watermark macros, pause/resume helper prototypes, and any_upstream_paused flag.
  • src/connection.c — Implements watermark checks and calls stream_on_client_drain() after client queue drains.
  • src/stream.h — Declares stream_on_client_drain() API and documents intended usage.
  • src/stream.c — Implements drain notification dispatch to HTTP proxy / RTSP resume helpers.
  • src/http_proxy.h — Adds upstream_paused state and http_proxy_resume_upstream() prototype.
  • src/http_proxy.c — Adds upstream pause/resume logic and HWM check before streaming recv().
  • src/rtsp.h — Adds flow-control fields and rtsp_resume_upstream() prototype; updates tick contract.
  • src/rtsp.c — Adds RTSP pause/resume implementation, socket event computation helper, HWM check, and max-pause abort guard.


Comment thread src/http_proxy.c Outdated
Comment thread src/http_proxy.c
Comment thread src/rtsp.c
Comment thread src/stream.h Outdated
Comment thread src/rtsp.c Outdated
Comment thread src/rtsp.c Outdated
When connection_handle_write fully drains the zc_queue and falls into the
IDLE branch, it used to disarm POLLER_OUT first, then call
stream_on_client_drain.  If the drain hook resumed a paused upstream
session and queued new buffers in the same call frame, those buffers stayed
in the queue with POLLER_OUT disarmed — so the worker never re-entered
the writer until something else woke it up, manifesting as stalls under
backpressure.

Reorder so the drain hook fires before we recompute the poller mask, then
include POLLER_OUT in the mask iff the queue is non-empty after the hook.
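A sketch of the reordered IDLE branch (poller constants and signatures are stand-ins; the ordering is the point):

```c
#include <stddef.h>

/* Stand-ins; real definitions live in the poller/connection modules. */
#define POLLER_IN 0x1
#define POLLER_OUT 0x4
typedef struct connection { int fd; } connection_t;
void stream_on_client_drain(connection_t *c);
size_t connection_queue_bytes(const connection_t *c);
int poller_mod(int fd, unsigned int mask);

/* After the fix: notify first, then recompute the poller mask. */
static void connection_idle_after_drain(connection_t *c)
{
  stream_on_client_drain(c); /* may resume an upstream and enqueue buffers */

  unsigned int mask = POLLER_IN;
  if (connection_queue_bytes(c) > 0)
    mask |= POLLER_OUT; /* iff the hook refilled the queue */
  poller_mod(c->fd, mask);
}
```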

Add an e2e regression test (test_flow_control.py) that reproduces the
original ~12 % truncation by running a slow client against a 1 MiB HTTP
proxy response with a constrained buffer pool (-b 128 → ~96 KiB queue
limit per connection).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Round-1 review feedback (Copilot + Codex):

1. **kqueue safety**: poller_mod with a mask that omits POLLER_IN/POLLER_OUT
   on kqueue (poller_kqueue.c) deletes BOTH read and write filters, which
   silently disables HUP/EOF detection on the upstream socket while paused.
   Switch pause/resume to flag-only — the existing HWM check at the top of
   the recv path is the actual gate.  Edge-triggered pollers still wake the
   worker on new data; the recv handler bails out via the flag when the
   queue is saturated.  This also removes the pause-failure stall vector.

2. **Drain to EAGAIN on resume (HTTP proxy)**: The previous resume did a
   single recv, but ET semantics require draining until EAGAIN.  Loop the
   try_receive call so multi-chunk buffered data is fully delivered.

3. **EOF cleanup on RTSP resume**: rtsp_resume_upstream now propagates the
   -1 return from rtsp_handle_tcp_interleaved_data (recv-returns-0 / error),
   triggering the same teardown the IN-event handler would have run.
   Without this, an upstream FIN that arrives while paused leaves the
   session orphaned until the keepalive timer expires.

4. **Doc nit (stream.h)**: clarify that stream_on_client_drain requires the
   struct to be at least zero-initialized; uninit stack memory is unsafe.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
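Item 2's drain loop, sketched (try_receive's return convention of bytes forwarded, 0 on EOF, -1 with errno on error is an assumption):

```c
#include <errno.h>

typedef struct connection connection_t;
typedef struct
{
  connection_t *conn;
  int upstream_paused;
} proxy_session_t;

int connection_should_pause_upstream(connection_t *c);
void connection_record_pause(connection_t *c);
int try_receive(proxy_session_t *s); /* one recv() + enqueue */

/* Returns 0 on success (possibly re-paused), -1 if the caller must tear
 * the session down (error, or an EOF that arrived while we were paused). */
static int resume_and_drain(proxy_session_t *s)
{
  s->upstream_paused = 0;
  for (;;)
  {
    if (connection_should_pause_upstream(s->conn))
    {
      s->upstream_paused = 1; /* queue refilled to HWM: pause again */
      connection_record_pause(s->conn);
      return 0;
    }
    int n = try_receive(s);
    if (n < 0)
      return errno == EAGAIN ? 0 : -1; /* EAGAIN: kernel buffer drained */
    if (n == 0)
      return -1; /* upstream FIN while paused */
  }
}
```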
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

src/http_proxy.c:805

  • When backpressure hits, this branch returns early even if http_proxy_pause_upstream() fails (e.g., poller_mod error). In edge-triggered mode that can deadlock the proxy: the upstream fd stays readable but we neither read nor successfully un-arm POLLER_IN, so no further events are delivered. Consider only returning early if the pause actually took effect (e.g., check session->upstream_paused after calling pause, or have pause return a success/failure flag and continue recv() on failure).
```c
        }
        session->rewrite_body_buffer = new_buf;
        session->rewrite_body_buffer_size = new_alloc;
      }

      memcpy(session->rewrite_body_buffer + session->rewrite_body_buffer_used, temp_buf, (size_t)received);
      session->rewrite_body_buffer_used = new_size;
```

src/rtsp.c:1843

  • If rtsp_pause_upstream() fails (poller_mod error), this code still returns early. With edge-triggered pollers that can stall the session: unread data remains in the socket buffer, but EPOLLIN won’t retrigger, and we also didn’t successfully pause reads to push backpressure upstream. Suggest only returning when the pause is actually in effect (e.g., call rtsp_pause_upstream(); if (session->upstream_paused) return total_forwarded; otherwise continue draining).
```c
    while (session->response_buffer_pos < RTSP_RESPONSE_BUFFER_SIZE) {
      int bytes_received = recv(session->socket, session->response_buffer + session->response_buffer_pos,
                                RTSP_RESPONSE_BUFFER_SIZE - session->response_buffer_pos, 0);
      if (bytes_received < 0) {
        if (errno == EAGAIN) {
          hit_eagain = 1;
          break; /* No more data available */
```


Comment thread e2e/test_flow_control.py

Round-2 review (Copilot): if the slow-drain helper observes a truncated or
corrupted HTTP response (a plausible failure mode for the regression this
test guards against), parsing the status line would raise IndexError /
ValueError instead of letting the assertion report a clean test failure.
Wrap the parse and return (0, {}, buf) on failure so the test message
points at the actual problem.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Simplify pass after the round-1/round-2 review fixes.

- Extract `connection_recompute_any_upstream_paused()` so HTTP proxy and
  RTSP no longer reach into each other's session structs to decide when
  to clear the connection-level fast-path bit.  Adding a third upstream
  type (or removing one) is now a one-line change in the helper.
- Extract `connection_begin_drain_close()` for the
  `state = CONN_CLOSING + epoll_update_events(POLLER_*)` incantation that
  appeared inline in both resume EOF-cleanup paths.

Net: -31 / +48 lines, but the deltas in http_proxy.c and rtsp.c shrink
(-7 each), and the new helpers can absorb the equivalent inline pattern
elsewhere in rtsp.c on a future sweep.
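A sketch of the first helper (session pointers and field layout assumed):

```c
/* The connection owns the fast-path bit; each module only reports its
 * own paused flag. Adding or removing an upstream type is a one-line edit. */
typedef struct http_proxy_session { int upstream_paused; } http_proxy_session_t;
typedef struct rtsp_session { int upstream_paused; } rtsp_session_t;
typedef struct connection
{
  http_proxy_session_t *http_proxy; /* NULL when not an HTTP proxy stream */
  rtsp_session_t *rtsp;             /* NULL when not an RTSP stream */
  int any_upstream_paused;
} connection_t;

static void connection_recompute_any_upstream_paused(connection_t *c)
{
  int paused = 0;
  if (c->http_proxy)
    paused |= c->http_proxy->upstream_paused;
  if (c->rtsp)
    paused |= c->rtsp->upstream_paused;
  c->any_upstream_paused = paused;
}
```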

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.



Comment thread src/connection.c Outdated
Comment thread src/stream.c
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Hot-path check for upstream pause/resume now exits in 3 instructions
when the queue is below the absolute-minimum HWM/LWM (derived from the
floor of the dynamic limit), and refreshes c->queue_limit_bytes via a
side-effect-free compute path only above that threshold. This fixes the
stale-limit bug Copilot flagged without paying recompute cost on every
packet.

- Split connection_calculate_queue_limit into compute (pure, hot-path
  safe) and update (EWMA + slow-state machine, called per enqueue).
- Extract queue_limit_inputs_t + connection_prepare_queue_limit_inputs
  + connection_apply_slow_clamp so both compute and update share input
  prep without duplicating utilization math.
- Add connection_queue_bytes() inline helper, replace 6 call sites of
  num_queued * BUFFER_POOL_BUFFER_SIZE across connection/http_proxy/rtsp.
- Add CONN_HWM/CONN_LWM macros, move the HWM/LWM NUM/DEN constants from
  the header to connection.c (no external callers).
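The two smallest extractions, sketched (struct layout and the buffer size are stand-ins; the num_queued * BUFFER_POOL_BUFFER_SIZE expression is the one replaced above):

```c
#include <stddef.h>

#define BUFFER_POOL_BUFFER_SIZE 1536 /* illustrative size */

typedef struct connection
{
  struct { size_t num_queued; } zc_queue;
} connection_t;

/* Ratios from the PR description; now private to connection.c. */
#define CONN_HWM(limit) ((limit) * 3 / 4) /* 75% */
#define CONN_LWM(limit) ((limit) / 2)     /* 50% */

static inline size_t connection_queue_bytes(const connection_t *c)
{
  return c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE;
}
```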

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

…e events

Previously backpressure_events was a 1:1 alias of dropped_packets,
incremented inside connection_record_drop. With the new HWM-pause
flow control, HTTP-proxy and RTSP-TCP almost never drop, so the field
was always 0 on TCP paths and just duplicated dropped_packets on UDP.

Redefine the field to count upstream-pause occurrences:

- connection_record_drop no longer touches backpressure_events;
  dropped_packets remains the sole drop counter (UDP paths).
- http_proxy_pause_upstream and rtsp_pause_upstream increment it once
  per pause edge (when upstream_paused flips 0 -> 1).
- The drop-log throttle in connection_queue_zerocopy now keys off
  dropped_packets so it survives the dropped increment.

Net effect: dropped_packets and backpressure_events are now orthogonal —
"how many packets we dropped" vs. "how many times we throttled the
upstream" — letting the status page distinguish congestion from
throttling.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

The 30s pause guard was originally added on the assumption that the
upstream RTSP server would reclaim our session if we stayed paused
near its session timeout (~60s).  In practice this never materialises:

- Pause only blocks the recv path (HWM check in
  rtsp_handle_tcp_interleaved_data); the send side is untouched.
- rtsp_session_tick() keeps emitting keepalives on the same socket,
  which the server treats as activity and resets its session timer.
- Even if the server did reclaim us, we'd see EOF in the kernel buffer
  on next resume — rtsp_resume_upstream already handles that path
  cleanly via rtsp_force_cleanup + connection_begin_drain_close.

So the only thing the 30s guard achieved was killing slow-but-recovering
clients early.  Drop the macro, the pause_started_ms field, and the
guard block in rtsp_session_tick.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Both http_proxy_pause_upstream and rtsp_pause_upstream open-coded the
same null-guarded backpressure_events bump. Mirror connection_record_drop
with a connection_record_pause inline helper so the counter's ownership
stays with the connection module.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@stackia stackia merged commit c2ee86c into main May 5, 2026
5 checks passed
@stackia stackia deleted the fix/upstream-flow-control branch May 5, 2026 13:53