From 301b77c9113e22e32999de946a3ea4b548c75a92 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 17:06:39 +0800 Subject: [PATCH 01/10] fix(proxy,rtsp): pause upstream reads on client backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slow HTTP clients downloading large segments via /http/ proxy were dropped after only ~12% of content. When connection_queue_zerocopy() returned -1 (packet-drop semantics) on a saturated client send queue, http_proxy.c treated it as fatal and tore down the connection. RTSP TCP-interleaved suffered a milder variant: silent packet drops creating gaps in the RTP stream. Apply real TCP flow control: when the client zc_queue reaches the high watermark (75%), remove POLLER_IN from the upstream socket so data backs up in the upstream TCP receive buffer and the upstream server's send window naturally slows down. When connection_handle_write drains the queue below the low watermark (50%), stream_on_client_drain re-arms POLLER_IN. A connection-level any_upstream_paused flag keeps the per-write notify cheap for the common case where no upstream is paused. For RTSP TCP, the same socket carries control responses; rtsp_session_tick enforces a 30s max-pause guard to avoid outlasting the server's session timeout (typically 60s). UDP paths (RTSP UDP, FCC, multicast) are unchanged — kernel-level UDP RX buffer overflow already drops packets, so application-layer pause is moot. 
Co-Authored-By: Claude Opus 4.7 --- src/connection.c | 25 +++++++++++++ src/connection.h | 27 ++++++++++++++ src/http_proxy.c | 42 ++++++++++++++++++++++ src/http_proxy.h | 10 ++++++ src/rtsp.c | 91 ++++++++++++++++++++++++++++++++++++++++++++++++ src/rtsp.h | 15 ++++++-- src/stream.c | 21 +++++++---- src/stream.h | 11 ++++++ 8 files changed, 234 insertions(+), 8 deletions(-) diff --git a/src/connection.c b/src/connection.c index 48feddc..8b7c94d 100644 --- a/src/connection.c +++ b/src/connection.c @@ -334,6 +334,22 @@ static void connection_report_queue(connection_t *c) { c->queue_bytes_highwater, c->queue_buffers_highwater, c->dropped_packets, c->dropped_bytes, c->backpressure_events, c->slow_active); } +int connection_should_pause_upstream(const connection_t *c) { + if (!c || c->queue_limit_bytes == 0) + return 0; + size_t queued = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t hwm = (c->queue_limit_bytes * CONN_FLOW_CONTROL_HWM_NUM) / CONN_FLOW_CONTROL_HWM_DEN; + return queued >= hwm; +} + +int connection_can_resume_upstream(const connection_t *c) { + if (!c || c->queue_limit_bytes == 0) + return 0; + size_t queued = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t lwm = (c->queue_limit_bytes * CONN_FLOW_CONTROL_LWM_NUM) / CONN_FLOW_CONTROL_LWM_DEN; + return queued <= lwm; +} + int connection_set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags < 0) @@ -532,11 +548,14 @@ connection_write_status_t connection_handle_write(connection_t *c) { return CONNECTION_WRITE_IDLE; } + size_t total_sent = 0; + /* Loop to drain all writable data for edge-triggered pollers where * EPOLLOUT / EV_CLEAR fires only once when the socket becomes writable. 
*/ for (;;) { size_t bytes_sent = 0; int ret = zerocopy_send(c->fd, &c->zc_queue, &bytes_sent); + total_sent += bytes_sent; if (ret < 0 && ret != -2) { c->state = CONN_CLOSING; @@ -548,6 +567,8 @@ connection_write_status_t connection_handle_write(connection_t *c) { if (ret == -2) { /* EAGAIN - socket send buffer full, wait for next writable event */ connection_report_queue(c); + if (total_sent > 0) + stream_on_client_drain(&c->stream); return CONNECTION_WRITE_BLOCKED; } @@ -557,6 +578,8 @@ connection_write_status_t connection_handle_write(connection_t *c) { connection_report_queue(c); if (c->state == CONN_CLOSING && !c->zc_queue.pending_head) return CONNECTION_WRITE_CLOSED; + if (total_sent > 0) + stream_on_client_drain(&c->stream); return CONNECTION_WRITE_IDLE; } @@ -567,6 +590,8 @@ connection_write_status_t connection_handle_write(connection_t *c) { /* Queue still has data but we couldn't make progress */ connection_report_queue(c); + if (total_sent > 0) + stream_on_client_drain(&c->stream); return CONNECTION_WRITE_PENDING; } diff --git a/src/connection.h b/src/connection.h index 0f8a0e3..b9f536c 100644 --- a/src/connection.h +++ b/src/connection.h @@ -67,6 +67,10 @@ typedef struct connection_s { double queue_avg_bytes; int slow_active; int64_t slow_candidate_since; + /* Set when any TCP upstream session attached to this connection has paused + * its reads due to client-side backpressure. Lets the per-write notify + * fast-path skip cheaply when no upstream is paused (the common case). */ + int any_upstream_paused; /* r2h-token Set-Cookie flag: set cookie when token was provided via URL query */ int should_set_r2h_cookie; @@ -177,4 +181,27 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref); */ int connection_queue_file(connection_t *c, int file_fd, off_t file_offset, size_t file_size); +/* Backpressure watermarks for TCP-to-TCP relay flow control. 
Upstream + * modules (HTTP proxy, RTSP TCP) pause reads when the client send queue + * exceeds HWM and resume when it falls back below LWM. The 25% hysteresis + * band prevents thrash. */ +#define CONN_FLOW_CONTROL_HWM_NUM 3 +#define CONN_FLOW_CONTROL_HWM_DEN 4 /* HWM = 75% of queue_limit_bytes */ +#define CONN_FLOW_CONTROL_LWM_NUM 1 +#define CONN_FLOW_CONTROL_LWM_DEN 2 /* LWM = 50% of queue_limit_bytes */ + +/** + * Returns true when the client send queue has reached the HWM and upstream + * reads should be paused. Returns false if queue_limit_bytes is unset + * (e.g. before the first zerocopy enqueue). + */ +int connection_should_pause_upstream(const connection_t *c); + +/** + * Returns true when the client send queue has fallen back below the LWM and + * paused upstream reads should be resumed. Returns false if queue_limit_bytes + * is unset. + */ +int connection_can_resume_upstream(const connection_t *c); + #endif /* CONNECTION_H */ diff --git a/src/http_proxy.c b/src/http_proxy.c index 84e78b4..b9885a5 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -35,6 +35,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session); static int http_proxy_parse_response_headers(http_proxy_session_t *session); static int http_proxy_append_raw_headers(http_proxy_session_t *session, char **dest, size_t *remaining, int *total_len, int filter_user_agent); +static void http_proxy_pause_upstream(http_proxy_session_t *session); static const char *http_proxy_get_override_user_agent(void) { if (config.http_proxy_user_agent && config.http_proxy_user_agent[0] != '\0') { @@ -75,9 +76,41 @@ void http_proxy_session_init(http_proxy_session_t *session) { session->bytes_received = 0; session->headers_received = 0; session->headers_forwarded = 0; + session->upstream_paused = 0; session->cleanup_done = 0; } +static void http_proxy_pause_upstream(http_proxy_session_t *session) { + if (!session || session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) 
+ return; + if (poller_mod(session->epoll_fd, session->socket, POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { + logger(LOG_DEBUG, "HTTP Proxy: poller_mod (pause) failed: %s", strerror(errno)); + return; + } + session->upstream_paused = 1; + if (session->conn) + session->conn->any_upstream_paused = 1; + logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", + session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, + session->conn ? session->conn->queue_limit_bytes : 0); +} + +void http_proxy_resume_upstream(http_proxy_session_t *session) { + if (!session || !session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) + return; + if (poller_mod(session->epoll_fd, session->socket, POLLER_IN | POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { + logger(LOG_DEBUG, "HTTP Proxy: poller_mod (resume) failed: %s", strerror(errno)); + return; + } + session->upstream_paused = 0; + if (session->conn && (!session->conn->stream.rtsp.initialized || !session->conn->stream.rtsp.upstream_paused)) + session->conn->any_upstream_paused = 0; + logger(LOG_DEBUG, "HTTP Proxy: Resumed upstream reads"); + /* Edge-triggered pollers may not fire EPOLLIN on EPOLL_CTL_MOD if data + * arrived while paused. Drain proactively to guarantee progress. */ + http_proxy_try_receive_response(session); +} + int http_proxy_parse_url(http_proxy_session_t *session, const char *url) { const char *p; char *colon; @@ -762,6 +795,15 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } /* Phase 2: Zero-copy streaming - recv directly to buffer pool */ + + /* Pause upstream BEFORE recv when client queue is near limit. Dropping + * bytes mid-stream would corrupt the response body, so we instead push + * backpressure into the upstream TCP receive buffer. 
*/ + if (connection_should_pause_upstream(session->conn)) { + http_proxy_pause_upstream(session); + return 0; + } + buffer_ref_t *buf = buffer_pool_alloc(); if (!buf) { logger(LOG_ERROR, "HTTP Proxy: Buffer pool exhausted"); diff --git a/src/http_proxy.h b/src/http_proxy.h index 58f9d66..b4b2de1 100644 --- a/src/http_proxy.h +++ b/src/http_proxy.h @@ -107,6 +107,9 @@ typedef struct { /* Per-service upstream interface override (resolved at init, non-owning) */ const char *upstream_ifname; + /* Flow control state */ + int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ + /* Cleanup state */ int cleanup_done; /* Flag: cleanup has been completed */ } http_proxy_session_t; @@ -196,6 +199,13 @@ int http_proxy_session_cleanup(http_proxy_session_t *session); */ int http_proxy_session_tick(http_proxy_session_t *session, int64_t now); +/** + * Resume reading from upstream after client send queue has drained. + * Called from stream_on_client_drain when zc_queue falls below LWM. + * @param session HTTP proxy session + */ +void http_proxy_resume_upstream(http_proxy_session_t *session); + /** * Build HTTP proxy URL for transformed M3U * Converts http://host:port/path to {BASE_URL}http/host:port/path diff --git a/src/rtsp.c b/src/rtsp.c index e1f3568..67df06a 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -312,6 +312,78 @@ void rtsp_session_init(rtsp_session_t *session) { session->teardown_requested = 0; session->teardown_reconnect_done = 0; session->state_before_teardown = RTSP_STATE_INIT; + + /* Initialize flow control state (TCP transport only) */ + session->upstream_paused = 0; + session->pause_started_ms = 0; +} + +/* Maximum time the upstream socket may stay paused before we force-disconnect. + * Must be safely below typical RTSP session timeouts (60s on most servers) so + * the server does not silently reclaim the session, leaving us with a half-dead + * connection. 
*/ +#define RTSP_MAX_PAUSE_MS 30000 + +/** + * Compute the desired event mask for the RTSP TCP socket based on current + * session state. POLLER_IN is included only when not paused; POLLER_OUT is + * included whenever there's outbound work pending (request headers not fully + * sent or a keepalive queued). + */ +static uint32_t rtsp_compute_socket_events(const rtsp_session_t *s) { + uint32_t mask = POLLER_HUP | POLLER_ERR | POLLER_RDHUP; + if (!s->upstream_paused) + mask |= POLLER_IN; + if (s->pending_request_sent < s->pending_request_len || s->keepalive_pending) + mask |= POLLER_OUT; + return mask; +} + +/* The TCP socket carries both control-plane responses (PLAY/keepalive replies, + * TEARDOWN) and `$`-prefixed media frames. Pausing recv() delays control + * responses too — acceptable because keepalives are sent FROM us and the + * server's reply just sits in its TCP send buffer until we resume. + * rtsp_session_tick() enforces RTSP_MAX_PAUSE_MS so we don't outlast the + * server's session timeout. */ +static void rtsp_pause_upstream(rtsp_session_t *session) { + if (!session || session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) + return; + if (session->transport_mode != RTSP_TRANSPORT_TCP) + return; + session->upstream_paused = 1; + if (poller_mod(session->epoll_fd, session->socket, rtsp_compute_socket_events(session)) < 0) { + logger(LOG_DEBUG, "RTSP TCP: poller_mod (pause) failed: %s", strerror(errno)); + session->upstream_paused = 0; + return; + } + session->pause_started_ms = get_time_ms(); + if (session->conn) + session->conn->any_upstream_paused = 1; + logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", + session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, + session->conn ? 
session->conn->queue_limit_bytes : 0); +} + +void rtsp_resume_upstream(rtsp_session_t *session) { + if (!session || !session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) + return; + /* Tentatively clear the paused flag so rtsp_compute_socket_events includes + * POLLER_IN; revert on poller_mod failure. */ + session->upstream_paused = 0; + if (poller_mod(session->epoll_fd, session->socket, rtsp_compute_socket_events(session)) < 0) { + logger(LOG_DEBUG, "RTSP TCP: poller_mod (resume) failed: %s", strerror(errno)); + session->upstream_paused = 1; + return; + } + session->pause_started_ms = 0; + if (session->conn && + (!session->conn->stream.http_proxy.initialized || !session->conn->stream.http_proxy.upstream_paused)) + session->conn->any_upstream_paused = 0; + logger(LOG_DEBUG, "RTSP TCP: Resumed upstream reads"); + /* Edge-triggered pollers may not fire EPOLLIN on EPOLL_CTL_MOD if data + * arrived while paused. Drain proactively to guarantee progress. */ + if (session->conn) + rtsp_handle_tcp_interleaved_data(session, session->conn); } /** @@ -1595,6 +1667,17 @@ int rtsp_session_tick(rtsp_session_t *session, int64_t now) { } } + /* Max-pause guard for TCP transport: if the client has been too slow for + * too long, abort rather than risk the upstream server timing out our + * session (would leave a half-dead connection). 
*/ + if (session->upstream_paused && session->pause_started_ms > 0 && + now - session->pause_started_ms > RTSP_MAX_PAUSE_MS) { + logger(LOG_WARN, "RTSP TCP: Client too slow, upstream paused for %lld ms — aborting session", + (long long)(now - session->pause_started_ms)); + rtsp_session_set_state(session, RTSP_STATE_ERROR); + return -1; + } + /* Send periodic keepalive to prevent server session timeout */ if (session->state == RTSP_STATE_PLAYING && session->keepalive_interval_ms > 0 && session->session_id[0] != '\0') { if (session->last_keepalive_ms == 0) { @@ -1751,6 +1834,14 @@ int rtsp_handle_tcp_interleaved_data(rtsp_session_t *session, connection_t *conn * per data arrival. The inner recv loop may fill the small response buffer * before hitting EAGAIN; after processing we must loop back to recv more. */ for (;;) { + /* Pause upstream BEFORE recv when client queue is near limit, so data + * accumulates in the upstream TCP receive buffer rather than being dropped + * at the application layer (which would tear holes in the RTP stream). 
*/ + if (connection_should_pause_upstream(conn)) { + rtsp_pause_upstream(session); + return total_forwarded; + } + int hit_eagain = 0; /* Fill response buffer from socket */ diff --git a/src/rtsp.h b/src/rtsp.h index ac978d2..11c5b20 100644 --- a/src/rtsp.h +++ b/src/rtsp.h @@ -196,6 +196,10 @@ typedef struct { uint8_t response_buffer[RTSP_RESPONSE_BUFFER_SIZE]; /* Buffer for RTSP responses (control plane, not media) */ + + /* Flow control state (TCP interleaved transport only) */ + int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ + int64_t pause_started_ms; /* When pause began; used by tick to enforce max-pause guard */ } rtsp_session_t; /* Function prototypes */ @@ -287,11 +291,18 @@ int rtsp_send_keepalive(rtsp_session_t *session); int rtsp_state_machine_advance(rtsp_session_t *session); /** - * Periodic tick for RTSP session (STUN timeout, keepalive) + * Periodic tick for RTSP session (STUN timeout, keepalive, max-pause guard) * @param session RTSP session * @param now Current timestamp in milliseconds - * @return 0 on success + * @return 0 on success, -1 if session must be closed (e.g. paused too long) */ int rtsp_session_tick(rtsp_session_t *session, int64_t now); +/** + * Resume reading from upstream RTSP TCP socket after client send queue has + * drained. No-op for UDP transport mode. Called from stream_on_client_drain. 
+ * @param session RTSP session + */ +void rtsp_resume_upstream(rtsp_session_t *session); + #endif /* __RTSP_H__ */ diff --git a/src/stream.c b/src/stream.c index 5e54e85..d8c81d0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1,6 +1,7 @@ #include "stream.h" #include "connection.h" #include "fcc.h" +#include "http_proxy.h" #include "multicast.h" #include "rtp.h" #include "rtp_fec.h" @@ -19,12 +20,20 @@ #include #include -/* - * Process RTP payload with reordering - either forward to client (streaming) - * or capture I-frame (snapshot) - * Returns: bytes forwarded (>= 0), 1 if I-frame captured for snapshot, -1 on - * error - */ +void stream_on_client_drain(stream_context_t *ctx) { + /* Hot path: every successful client write hits this. Bail out cheaply when + * no upstream is paused (vast majority of streams). */ + if (!ctx || !ctx->conn || !ctx->conn->any_upstream_paused) + return; + if (!connection_can_resume_upstream(ctx->conn)) + return; + /* Resume functions are no-ops if not paused; no need to re-check here. */ + if (ctx->http_proxy.initialized) + http_proxy_resume_upstream(&ctx->http_proxy); + if (ctx->rtsp.initialized) + rtsp_resume_upstream(&ctx->rtsp); +} + int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref) { uint8_t *data_ptr = (uint8_t *)buf_ref->data + buf_ref->data_offset; uint8_t *payload; diff --git a/src/stream.h b/src/stream.h index b23d26b..3243aa6 100644 --- a/src/stream.h +++ b/src/stream.h @@ -106,4 +106,15 @@ int stream_context_cleanup(stream_context_t *ctx); */ int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref); +/** + * Notify that the client send queue has just been drained (some buffers + * completed sending). If any TCP-based upstream session attached to this + * connection is currently paused due to backpressure, this resumes it when + * the queue has fallen below the low watermark. + * + * Called from connection_handle_write after a successful zerocopy_send. 
+ * @param ctx Stream context (may be uninitialized — function tolerates that) + */ +void stream_on_client_drain(stream_context_t *ctx); + #endif /* __STREAM_H__ */ From aa28452fe9c3af10cad512dfea237a5c0ee0090f Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 18:14:01 +0800 Subject: [PATCH 02/10] fix(connection): preserve POLLER_OUT when drain hook re-queues data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When connection_handle_write fully drains the zc_queue and falls into the IDLE branch, it used to disarm POLLER_OUT first, then call stream_on_client_drain. If the drain hook resumed a paused upstream session and queued new buffers in the same call frame, those buffers stayed in the queue with POLLER_OUT disarmed — so the worker never re-entered the writer until something else woke it up, manifesting as stalls under backpressure. Reorder so the drain hook fires before we recompute the poller mask, then include POLLER_OUT in the mask iff the queue is non-empty after the hook. Add an e2e regression test (test_flow_control.py) that reproduces the original ~12 % truncation by running a slow client against a 1 MiB HTTP proxy response with a constrained buffer pool (-b 128 → ~96 KiB queue limit per connection). Co-Authored-By: Claude Opus 4.7 --- e2e/test_flow_control.py | 146 +++++++++++++++++++++++++++++++++++++++ src/connection.c | 16 +++-- 2 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 e2e/test_flow_control.py diff --git a/e2e/test_flow_control.py b/e2e/test_flow_control.py new file mode 100644 index 0000000..7f234e3 --- /dev/null +++ b/e2e/test_flow_control.py @@ -0,0 +1,146 @@ +""" +E2E coverage for the upstream flow-control fix. + +A slow downstream client used to be aborted partway through a large HTTP +proxy response because the per-connection zerocopy queue would saturate and +``connection_queue_zerocopy()`` would return -1 (packet-drop semantics +inherited from RTP/UDP). 
The fix pauses upstream reads when the client send +queue exceeds the high watermark and resumes them once it drops back below +the low watermark. + +We can't deterministically observe individual pause/resume transitions +(they depend on kernel buffer sizes), so the test asserts the only thing +that matters end-to-end: byte-for-byte completeness of the proxied response. + +The same flow-control machinery is wired into RTSP TCP interleaved transport +in src/rtsp.c, but a deterministic e2e test for it would require a more +elaborate mock that drives backpressure without closing right after sending. +The HTTP test exercises the shared infrastructure (stream_on_client_drain, +the HWM/LWM helpers, the IDLE-path POLLER_OUT preservation). +""" + +import socket +import time + +import pytest + +from helpers import ( + MockHTTPUpstream, + R2HProcess, + find_free_port, +) + +# These tests intentionally throttle the client to provoke backpressure. +pytestmark = pytest.mark.slow + + +@pytest.fixture(scope="module") +def shared_r2h(r2h_binary): + # ``-b 128`` shrinks the global buffer pool cap to 128 buffers + # (~192 KiB), which forces the per-connection zerocopy queue limit + # down to ~96 KiB. With the default cap (16384 buffers / ~24 MiB) a + # short test body would be absorbed entirely without ever crossing + # the HWM, defeating the whole purpose of these tests. + port = find_free_port() + r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100", "-b", "128"]) + r2h.start() + yield r2h + r2h.stop() + + +def _slow_drain_until_eof( + host: str, + port: int, + path: str, + chunk_size: int, + sleep_per_chunk: float, + overall_timeout: float, +) -> tuple[int, dict, bytes]: + """HTTP/1.0 GET that reads `chunk_size` bytes then sleeps, repeating + until EOF or `overall_timeout` expires. + + Returns ``(status, headers_dict, body_bytes)``. 
Connection-level errors + return ``(0, {}, partial_body)`` — useful for asserting that the OLD + (un-fixed) code drops the connection mid-transfer. + """ + sock = socket.create_connection((host, port), timeout=overall_timeout) + body = b"" + try: + sock.sendall(("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host)).encode()) + + deadline = time.monotonic() + overall_timeout + buf = b"" + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + sock.settimeout(min(remaining, 2.0)) + try: + piece = sock.recv(chunk_size) + except socket.timeout: + continue + except OSError: + break + if not piece: + break + buf += piece + if sleep_per_chunk > 0: + time.sleep(sleep_per_chunk) + + header_end = buf.find(b"\r\n\r\n") + if header_end < 0: + return 0, {}, buf + + header_text = buf[:header_end].decode(errors="replace") + body = buf[header_end + 4 :] + parts = header_text.split("\r\n") + status_code = int(parts[0].split()[1]) + hdrs = {} + for line in parts[1:]: + if ":" in line: + k, v = line.split(":", 1) + hdrs[k.strip().lower()] = v.strip() + return status_code, hdrs, body + finally: + sock.close() + + +# --------------------------------------------------------------------------- +# HTTP proxy +# --------------------------------------------------------------------------- + + +@pytest.mark.http_proxy +class TestHTTPProxyBackpressure: + """A slow HTTP client must receive the full proxied body.""" + + def test_slow_client_receives_full_body(self, shared_r2h): + # 1 MiB body comfortably exceeds the ~96 KiB zerocopy queue limit + # imposed by the ``-b 128`` shared_r2h fixture, so the slow client + # forces multiple pause/resume cycles before EOF. 
+ body_size = 1024 * 1024 + payload = bytes((i & 0xFF for i in range(body_size))) + + upstream = MockHTTPUpstream( + routes={"/big.ts": {"status": 200, "body": payload, "headers": {"Content-Type": "video/mp2t"}}} + ) + upstream.start() + try: + status, _, received = _slow_drain_until_eof( + "127.0.0.1", + shared_r2h.port, + "/http/127.0.0.1:%d/big.ts" % upstream.port, + chunk_size=8 * 1024, + sleep_per_chunk=0.02, # ~400 KB/s ceiling, well below pool refill rate + overall_timeout=20.0, + ) + assert status == 200, "Slow client should still receive a 200 response" + assert len(received) == body_size, ( + "Slow client received %d/%d bytes — flow control regression?" + % (len(received), body_size) + ) + assert received == payload, "Body content mismatch — corruption in proxy path" + finally: + upstream.stop() + + diff --git a/src/connection.c b/src/connection.c index 8b7c94d..f338134 100644 --- a/src/connection.c +++ b/src/connection.c @@ -573,13 +573,21 @@ connection_write_status_t connection_handle_write(connection_t *c) { } if (!c->zc_queue.head) { - /* All data sent - remove POLLER_OUT */ - connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); - connection_report_queue(c); - if (c->state == CONN_CLOSING && !c->zc_queue.pending_head) + if (c->state == CONN_CLOSING && !c->zc_queue.pending_head) { + connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); + connection_report_queue(c); return CONNECTION_WRITE_CLOSED; + } + /* Notify upstream BEFORE arming the poller mask: resume() may queue + * new buffers in this same call frame, in which case POLLER_OUT must + * stay armed so the worker re-enters this function to drain them. 
*/ if (total_sent > 0) stream_on_client_drain(&c->stream); + uint32_t mask = POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR; + if (c->zc_queue.head) + mask |= POLLER_OUT; + connection_epoll_update_events(c->epfd, c->fd, mask); + connection_report_queue(c); return CONNECTION_WRITE_IDLE; } From d4f4828156ef0b0f78ce8ae776959e7f3d6beef4 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 18:20:07 +0800 Subject: [PATCH 03/10] fix(flow-control): drop poller_mod from pause/resume, drain to EAGAIN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-1 review feedback (Copilot + Codex): 1. **kqueue safety**: poller_mod with a mask that omits POLLER_IN/POLLER_OUT on kqueue (poller_kqueue.c) deletes BOTH read and write filters, which silently disables HUP/EOF detection on the upstream socket while paused. Switch pause/resume to flag-only — the existing HWM check at the top of the recv path is the actual gate. Edge-triggered pollers still wake the worker on new data; the recv handler bails out via the flag when the queue is saturated. This also removes the pause-failure stall vector. 2. **Drain to EAGAIN on resume (HTTP proxy)**: The previous resume did a single recv, but ET semantics require draining until EAGAIN. Loop the try_receive call so multi-chunk buffered data is fully delivered. 3. **EOF cleanup on RTSP resume**: rtsp_resume_upstream now propagates the -1 return from rtsp_handle_tcp_interleaved_data (recv-returns-0 / error), triggering the same teardown the IN-event handler would have run. Without this, an upstream FIN that arrives while paused leaves the session orphaned until the keepalive timer expires. 4. **Doc nit (stream.h)**: clarify that stream_on_client_drain requires the struct to be at least zero-initialized; uninit stack memory is unsafe. 
Co-Authored-By: Claude Opus 4.7 --- src/http_proxy.c | 45 ++++++++++++++++++++++---------- src/rtsp.c | 67 ++++++++++++++++++++---------------------------- src/stream.h | 4 ++- 3 files changed, 63 insertions(+), 53 deletions(-) diff --git a/src/http_proxy.c b/src/http_proxy.c index b9885a5..3f1297b 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -80,13 +80,17 @@ void http_proxy_session_init(http_proxy_session_t *session) { session->cleanup_done = 0; } +/* Pause/resume are flag-only. We deliberately avoid `poller_mod` because on + * kqueue (src/poller_kqueue.c) a mask without POLLER_IN/POLLER_OUT deletes + * the read AND write filters, silently losing HUP/EOF detection while + * paused. Edge-triggered pollers will keep waking the worker for new data + * arrivals; the recv handler bails out via the HWM check when the queue is + * saturated. This keeps the kernel TCP recv buffer the natural backstop: + * once it fills, the upstream sender's window closes naturally. + */ static void http_proxy_pause_upstream(http_proxy_session_t *session) { - if (!session || session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) - return; - if (poller_mod(session->epoll_fd, session->socket, POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { - logger(LOG_DEBUG, "HTTP Proxy: poller_mod (pause) failed: %s", strerror(errno)); + if (!session || session->upstream_paused) return; - } session->upstream_paused = 1; if (session->conn) session->conn->any_upstream_paused = 1; @@ -96,19 +100,34 @@ static void http_proxy_pause_upstream(http_proxy_session_t *session) { } void http_proxy_resume_upstream(http_proxy_session_t *session) { - if (!session || !session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) - return; - if (poller_mod(session->epoll_fd, session->socket, POLLER_IN | POLLER_HUP | POLLER_ERR | POLLER_RDHUP) < 0) { - logger(LOG_DEBUG, "HTTP Proxy: poller_mod (resume) failed: %s", strerror(errno)); + if (!session || !session->upstream_paused) 
return; - } session->upstream_paused = 0; if (session->conn && (!session->conn->stream.rtsp.initialized || !session->conn->stream.rtsp.upstream_paused)) session->conn->any_upstream_paused = 0; logger(LOG_DEBUG, "HTTP Proxy: Resumed upstream reads"); - /* Edge-triggered pollers may not fire EPOLLIN on EPOLL_CTL_MOD if data - * arrived while paused. Drain proactively to guarantee progress. */ - http_proxy_try_receive_response(session); + /* Drain everything we can in this call frame. Loop until EAGAIN, + * HWM re-pause, EOF, or error so we don't leave bytes buffered in the + * kernel waiting for an edge that never comes. */ + while (session->state == HTTP_PROXY_STATE_STREAMING && !session->upstream_paused) { + int ret = http_proxy_try_receive_response(session); + if (ret <= 0) + break; + } + /* If draining detected EOF, mirror the cleanup that + * http_proxy_handle_socket_event would do on STATE_COMPLETE so the session + * is not left orphaned waiting for an event that won't come. */ + if (session->state == HTTP_PROXY_STATE_COMPLETE) { + if (session->socket >= 0) { + worker_cleanup_socket_from_epoll(session->epoll_fd, session->socket); + session->socket = -1; + } + if (session->conn && session->conn->state != CONN_CLOSING) { + session->conn->state = CONN_CLOSING; + connection_epoll_update_events(session->conn->epfd, session->conn->fd, + POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); + } + } } int http_proxy_parse_url(http_proxy_session_t *session, const char *url) { diff --git a/src/rtsp.c b/src/rtsp.c index 67df06a..b7a0a43 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -324,38 +324,22 @@ void rtsp_session_init(rtsp_session_t *session) { * connection. */ #define RTSP_MAX_PAUSE_MS 30000 -/** - * Compute the desired event mask for the RTSP TCP socket based on current - * session state. POLLER_IN is included only when not paused; POLLER_OUT is - * included whenever there's outbound work pending (request headers not fully - * sent or a keepalive queued). 
- */ -static uint32_t rtsp_compute_socket_events(const rtsp_session_t *s) { - uint32_t mask = POLLER_HUP | POLLER_ERR | POLLER_RDHUP; - if (!s->upstream_paused) - mask |= POLLER_IN; - if (s->pending_request_sent < s->pending_request_len || s->keepalive_pending) - mask |= POLLER_OUT; - return mask; -} - -/* The TCP socket carries both control-plane responses (PLAY/keepalive replies, - * TEARDOWN) and `$`-prefixed media frames. Pausing recv() delays control - * responses too — acceptable because keepalives are sent FROM us and the - * server's reply just sits in its TCP send buffer until we resume. - * rtsp_session_tick() enforces RTSP_MAX_PAUSE_MS so we don't outlast the - * server's session timeout. */ +/* Pause/resume are flag-only. We deliberately avoid `poller_mod` because on + * kqueue (src/poller_kqueue.c) a mask without POLLER_IN/POLLER_OUT deletes + * the read AND write filters, silently losing HUP/EOF detection while + * paused. The TCP socket also carries control-plane responses + * (PLAY/keepalive replies, TEARDOWN), and outbound work (state machine + * requests, keepalives) updates POLLER_OUT independently — keeping the read + * filter in place lets all of that continue to flow. rtsp_handle_tcp_- + * interleaved_data() bails out via the HWM check when the queue is + * saturated; rtsp_session_tick() enforces RTSP_MAX_PAUSE_MS so we don't + * outlast the server's session timeout. 
*/ static void rtsp_pause_upstream(rtsp_session_t *session) { - if (!session || session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) + if (!session || session->upstream_paused) return; if (session->transport_mode != RTSP_TRANSPORT_TCP) return; session->upstream_paused = 1; - if (poller_mod(session->epoll_fd, session->socket, rtsp_compute_socket_events(session)) < 0) { - logger(LOG_DEBUG, "RTSP TCP: poller_mod (pause) failed: %s", strerror(errno)); - session->upstream_paused = 0; - return; - } session->pause_started_ms = get_time_ms(); if (session->conn) session->conn->any_upstream_paused = 1; @@ -365,25 +349,30 @@ static void rtsp_pause_upstream(rtsp_session_t *session) { } void rtsp_resume_upstream(rtsp_session_t *session) { - if (!session || !session->upstream_paused || session->socket < 0 || session->epoll_fd < 0) + if (!session || !session->upstream_paused) return; - /* Tentatively clear the paused flag so rtsp_compute_socket_events includes - * POLLER_IN; revert on poller_mod failure. */ session->upstream_paused = 0; - if (poller_mod(session->epoll_fd, session->socket, rtsp_compute_socket_events(session)) < 0) { - logger(LOG_DEBUG, "RTSP TCP: poller_mod (resume) failed: %s", strerror(errno)); - session->upstream_paused = 1; - return; - } session->pause_started_ms = 0; if (session->conn && (!session->conn->stream.http_proxy.initialized || !session->conn->stream.http_proxy.upstream_paused)) session->conn->any_upstream_paused = 0; logger(LOG_DEBUG, "RTSP TCP: Resumed upstream reads"); - /* Edge-triggered pollers may not fire EPOLLIN on EPOLL_CTL_MOD if data - * arrived while paused. Drain proactively to guarantee progress. */ - if (session->conn) - rtsp_handle_tcp_interleaved_data(session, session->conn); + /* Drain everything we can in this call frame. The function loops + * internally until EAGAIN, HWM re-pause, EOF, or buffer-full. 
If it + * returns -1 (EOF/recv error), mirror the cleanup the IN-event handler + * would have done so the session is not orphaned. */ + if (!session->conn) + return; + int result = rtsp_handle_tcp_interleaved_data(session, session->conn); + if (result < 0) { + logger(LOG_DEBUG, "RTSP TCP: Upstream EOF during resume drain, closing"); + rtsp_force_cleanup(session); + if (session->conn->state != CONN_CLOSING) { + session->conn->state = CONN_CLOSING; + connection_epoll_update_events(session->conn->epfd, session->conn->fd, + POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); + } + } } /** diff --git a/src/stream.h b/src/stream.h index 3243aa6..50c0c4d 100644 --- a/src/stream.h +++ b/src/stream.h @@ -113,7 +113,9 @@ int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref); * the queue has fallen below the low watermark. * * Called from connection_handle_write after a successful zerocopy_send. - * @param ctx Stream context (may be uninitialized — function tolerates that) + * The struct must be at least zero-initialized (the embedded stream context + * in connection_t is via calloc); passing uninitialized stack memory is + * unsafe — `conn` and the `*.initialized` flags are dereferenced. */ void stream_on_client_drain(stream_context_t *ctx); From 8efbc4d23be0fa7994054b0fbe3e7b008634f3df Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 18:22:40 +0800 Subject: [PATCH 04/10] test(flow-control): guard helper against malformed status line Round-2 review (Copilot): if the slow-drain helper observes a truncated or corrupted HTTP response (a plausible failure mode for the regression this test guards against), parsing the status line would raise IndexError / ValueError instead of letting the assertion report a clean test failure. Wrap the parse and return (0, {}, buf) on failure so the test message points at the actual problem. 
Co-Authored-By: Claude Opus 4.7 --- e2e/test_flow_control.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/e2e/test_flow_control.py b/e2e/test_flow_control.py index 7f234e3..6e21b0e 100644 --- a/e2e/test_flow_control.py +++ b/e2e/test_flow_control.py @@ -94,7 +94,12 @@ def _slow_drain_until_eof( header_text = buf[:header_end].decode(errors="replace") body = buf[header_end + 4 :] parts = header_text.split("\r\n") - status_code = int(parts[0].split()[1]) + # Guard against malformed status lines (truncated/corrupted response) + # so the test fails on the assertion, not on a parse exception. + try: + status_code = int(parts[0].split()[1]) + except (IndexError, ValueError): + return 0, {}, buf hdrs = {} for line in parts[1:]: if ":" in line: From 684e6c79713ac222d9130614830b7233a6b91ecf Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 20:03:05 +0800 Subject: [PATCH 05/10] refactor(connection): extract shared upstream-flow-control helpers Simplify pass after the round-1/round-2 review fixes. - Extract `connection_recompute_any_upstream_paused()` so HTTP proxy and RTSP no longer reach into each other's session structs to decide when to clear the connection-level fast-path bit. Adding a third upstream type (or removing one) is now a one-line change in the helper. - Extract `connection_begin_drain_close()` for the `state = CONN_CLOSING + epoll_update_events(POLLER_*)` incantation that appeared inline in both resume EOF-cleanup paths. Net: -31 / +48 lines, but the deltas in http_proxy.c and rtsp.c shrink (-7 each), and the new helpers can absorb the equivalent inline pattern elsewhere in rtsp.c on a future sweep. 
Co-Authored-By: Claude Opus 4.7 --- src/connection.c | 15 +++++++++++++++ src/connection.h | 16 ++++++++++++++++ src/http_proxy.c | 24 +++++++++--------------- src/rtsp.c | 24 ++++++++---------------- 4 files changed, 48 insertions(+), 31 deletions(-) diff --git a/src/connection.c b/src/connection.c index f338134..6163721 100644 --- a/src/connection.c +++ b/src/connection.c @@ -350,6 +350,21 @@ int connection_can_resume_upstream(const connection_t *c) { return queued <= lwm; } +void connection_recompute_any_upstream_paused(connection_t *c) { + if (!c) + return; + c->any_upstream_paused = + (c->stream.http_proxy.initialized && c->stream.http_proxy.upstream_paused) || + (c->stream.rtsp.initialized && c->stream.rtsp.upstream_paused); +} + +void connection_begin_drain_close(connection_t *c) { + if (!c || c->state == CONN_CLOSING) + return; + c->state = CONN_CLOSING; + connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); +} + int connection_set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags < 0) diff --git a/src/connection.h b/src/connection.h index b9f536c..f36c938 100644 --- a/src/connection.h +++ b/src/connection.h @@ -204,4 +204,20 @@ int connection_should_pause_upstream(const connection_t *c); */ int connection_can_resume_upstream(const connection_t *c); +/** + * Recompute the connection-level `any_upstream_paused` bit by inspecting all + * attached upstream sessions. Call after any individual upstream session + * toggles its own `upstream_paused` flag so the cheap per-write fast path in + * stream_on_client_drain() stays accurate. + */ +void connection_recompute_any_upstream_paused(connection_t *c); + +/** + * Mark the connection for orderly shutdown after upstream EOF/error: switch + * to CONN_CLOSING and re-arm the full event mask so the worker keeps draining + * any queued bytes to the client before tearing down. No-op if the + * connection is already CONN_CLOSING. 
+ */ +void connection_begin_drain_close(connection_t *c); + #endif /* CONNECTION_H */ diff --git a/src/http_proxy.c b/src/http_proxy.c index 3f1297b..cefb02d 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -92,8 +92,7 @@ static void http_proxy_pause_upstream(http_proxy_session_t *session) { if (!session || session->upstream_paused) return; session->upstream_paused = 1; - if (session->conn) - session->conn->any_upstream_paused = 1; + connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, session->conn ? session->conn->queue_limit_bytes : 0); @@ -103,30 +102,25 @@ void http_proxy_resume_upstream(http_proxy_session_t *session) { if (!session || !session->upstream_paused) return; session->upstream_paused = 0; - if (session->conn && (!session->conn->stream.rtsp.initialized || !session->conn->stream.rtsp.upstream_paused)) - session->conn->any_upstream_paused = 0; + connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "HTTP Proxy: Resumed upstream reads"); - /* Drain everything we can in this call frame. Loop until EAGAIN, - * HWM re-pause, EOF, or error so we don't leave bytes buffered in the - * kernel waiting for an edge that never comes. */ + /* Drain synchronously here: edge-triggered pollers won't deliver another + * EPOLLIN edge for bytes that arrived while paused, so anything still + * buffered in the kernel must come out in this call frame. Loop exits + * on EAGAIN, HWM re-pause, EOF, or error. */ while (session->state == HTTP_PROXY_STATE_STREAMING && !session->upstream_paused) { int ret = http_proxy_try_receive_response(session); if (ret <= 0) break; } - /* If draining detected EOF, mirror the cleanup that - * http_proxy_handle_socket_event would do on STATE_COMPLETE so the session - * is not left orphaned waiting for an event that won't come. 
*/ + /* EOF discovered mid-drain: no further upstream events will arrive, so + * close the upstream socket here and start the client drain. */ if (session->state == HTTP_PROXY_STATE_COMPLETE) { if (session->socket >= 0) { worker_cleanup_socket_from_epoll(session->epoll_fd, session->socket); session->socket = -1; } - if (session->conn && session->conn->state != CONN_CLOSING) { - session->conn->state = CONN_CLOSING; - connection_epoll_update_events(session->conn->epfd, session->conn->fd, - POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); - } + connection_begin_drain_close(session->conn); } } diff --git a/src/rtsp.c b/src/rtsp.c index b7a0a43..de5535b 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -341,8 +341,7 @@ static void rtsp_pause_upstream(rtsp_session_t *session) { return; session->upstream_paused = 1; session->pause_started_ms = get_time_ms(); - if (session->conn) - session->conn->any_upstream_paused = 1; + connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, session->conn ? session->conn->queue_limit_bytes : 0); @@ -353,25 +352,18 @@ void rtsp_resume_upstream(rtsp_session_t *session) { return; session->upstream_paused = 0; session->pause_started_ms = 0; - if (session->conn && - (!session->conn->stream.http_proxy.initialized || !session->conn->stream.http_proxy.upstream_paused)) - session->conn->any_upstream_paused = 0; + connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Resumed upstream reads"); - /* Drain everything we can in this call frame. The function loops - * internally until EAGAIN, HWM re-pause, EOF, or buffer-full. If it - * returns -1 (EOF/recv error), mirror the cleanup the IN-event handler - * would have done so the session is not orphaned. 
*/ + /* Drain synchronously here: edge-triggered pollers won't deliver another + * EPOLLIN edge for bytes that arrived while paused. The drain function + * loops internally and returns -1 on EOF/recv error — close the upstream + * here in that case so the session isn't orphaned waiting for an event. */ if (!session->conn) return; - int result = rtsp_handle_tcp_interleaved_data(session, session->conn); - if (result < 0) { + if (rtsp_handle_tcp_interleaved_data(session, session->conn) < 0) { logger(LOG_DEBUG, "RTSP TCP: Upstream EOF during resume drain, closing"); rtsp_force_cleanup(session); - if (session->conn->state != CONN_CLOSING) { - session->conn->state = CONN_CLOSING; - connection_epoll_update_events(session->conn->epfd, session->conn->fd, - POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); - } + connection_begin_drain_close(session->conn); } } From 297ac05cc521c32b2db9ac77b6250bcf615eea49 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 20:28:04 +0800 Subject: [PATCH 06/10] chore: apply formatter fixes (biome/ruff/clang-format) Co-Authored-By: Claude Opus 4.7 --- e2e/test_flow_control.py | 10 ++++------ src/connection.c | 5 ++--- src/rtsp.h | 4 ++-- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/e2e/test_flow_control.py b/e2e/test_flow_control.py index 6e21b0e..67d6eb6 100644 --- a/e2e/test_flow_control.py +++ b/e2e/test_flow_control.py @@ -98,7 +98,7 @@ def _slow_drain_until_eof( # so the test fails on the assertion, not on a parse exception. try: status_code = int(parts[0].split()[1]) - except (IndexError, ValueError): + except (IndexError, ValueError): return 0, {}, buf hdrs = {} for line in parts[1:]: @@ -140,12 +140,10 @@ def test_slow_client_receives_full_body(self, shared_r2h): overall_timeout=20.0, ) assert status == 200, "Slow client should still receive a 200 response" - assert len(received) == body_size, ( - "Slow client received %d/%d bytes — flow control regression?"
- % (len(received), body_size) + assert len(received) == body_size, "Slow client received %d/%d bytes — flow control regression?" % ( + len(received), + body_size, ) assert received == payload, "Body content mismatch — corruption in proxy path" finally: upstream.stop() - - diff --git a/src/connection.c b/src/connection.c index 6163721..5358f88 100644 --- a/src/connection.c +++ b/src/connection.c @@ -353,9 +353,8 @@ int connection_can_resume_upstream(const connection_t *c) { void connection_recompute_any_upstream_paused(connection_t *c) { if (!c) return; - c->any_upstream_paused = - (c->stream.http_proxy.initialized && c->stream.http_proxy.upstream_paused) || - (c->stream.rtsp.initialized && c->stream.rtsp.upstream_paused); + c->any_upstream_paused = (c->stream.http_proxy.initialized && c->stream.http_proxy.upstream_paused) || + (c->stream.rtsp.initialized && c->stream.rtsp.upstream_paused); } void connection_begin_drain_close(connection_t *c) { diff --git a/src/rtsp.h b/src/rtsp.h index 11c5b20..47a4f94 100644 --- a/src/rtsp.h +++ b/src/rtsp.h @@ -198,8 +198,8 @@ typedef struct { plane, not media) */ /* Flow control state (TCP interleaved transport only) */ - int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ - int64_t pause_started_ms; /* When pause began; used by tick to enforce max-pause guard */ + int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ + int64_t pause_started_ms; /* When pause began; used by tick to enforce max-pause guard */ } rtsp_session_t; /* Function prototypes */ From 9f41b711f805979bf36ef289cda4f7dfe33d8e0c Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 21:18:29 +0800 Subject: [PATCH 07/10] refactor(connection): fast-path HWM/LWM checks, dedupe queue helpers Hot-path check for upstream pause/resume now exits in 3 instructions when the queue is below the absolute-minimum HWM/LWM (derived from the floor of the dynamic limit), and 
refreshes c->queue_limit_bytes via a side-effect-free compute path only above that threshold. This fixes the stale-limit bug Copilot flagged without paying recompute cost on every packet. - Split connection_calculate_queue_limit into compute (pure, hot-path safe) and update (EWMA + slow-state machine, called per enqueue). - Extract queue_limit_inputs_t + connection_prepare_queue_limit_inputs + connection_apply_slow_clamp so both compute and update share input prep without duplicating utilization math. - Add connection_queue_bytes() inline helper, replace 6 call sites of num_queued * BUFFER_POOL_BUFFER_SIZE across connection/http_proxy/rtsp. - Add CONN_HWM/CONN_LWM macros, move the HWM/LWM NUM/DEN constants from the header to connection.c (no external callers). Co-Authored-By: Claude Opus 4.7 --- src/connection.c | 118 ++++++++++++++++++++++++++++++++++------------- src/connection.h | 27 ++++++----- src/http_proxy.c | 2 +- src/rtsp.c | 2 +- 4 files changed, 102 insertions(+), 47 deletions(-) diff --git a/src/connection.c b/src/connection.c index 5358f88..489e561 100644 --- a/src/connection.c +++ b/src/connection.c @@ -245,18 +245,26 @@ static size_t connection_compute_limit_bytes(buffer_pool_t *pool, size_t fair_by return limit_bytes; } -static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) { +/* Side-effect-free inputs into the queue-limit calculation. */ +typedef struct { + buffer_pool_t *pool; + size_t fair_bytes; + double burst_factor; /* before slow_active clamp */ +} queue_limit_inputs_t; + +static void connection_prepare_queue_limit_inputs(queue_limit_inputs_t *out) { buffer_pool_t *pool = &zerocopy_state.pool; - size_t active = zerocopy_active_streams(); + out->pool = pool; + size_t active = zerocopy_active_streams(); if (active == 0) active = 1; size_t total_buffers = pool->num_buffers ? 
pool->num_buffers : BUFFER_POOL_INITIAL_SIZE; - size_t share_buffers = total_buffers / active; if (share_buffers < CONN_QUEUE_MIN_BUFFERS) share_buffers = CONN_QUEUE_MIN_BUFFERS; + out->fair_bytes = share_buffers * BUFFER_POOL_BUFFER_SIZE; double utilization = 0.0; if (pool->max_buffers > 0) { @@ -264,29 +272,48 @@ static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) utilization = (double)used_buffers / (double)pool->max_buffers; } - double burst_factor = CONN_QUEUE_BURST_FACTOR; + out->burst_factor = CONN_QUEUE_BURST_FACTOR; if (pool->num_buffers >= pool->max_buffers || utilization >= CONN_QUEUE_HIGH_UTIL_THRESHOLD) - burst_factor = CONN_QUEUE_BURST_FACTOR_CONGESTED; + out->burst_factor = CONN_QUEUE_BURST_FACTOR_CONGESTED; if (pool->num_free < pool->low_watermark / 2 || utilization >= CONN_QUEUE_DRAIN_UTIL_THRESHOLD) - burst_factor = CONN_QUEUE_BURST_FACTOR_DRAIN; + out->burst_factor = CONN_QUEUE_BURST_FACTOR_DRAIN; +} - size_t fair_bytes = share_buffers * BUFFER_POOL_BUFFER_SIZE; - double queue_mem_bytes = (double)c->zc_queue.num_queued * (double)BUFFER_POOL_BUFFER_SIZE; +static inline double connection_apply_slow_clamp(double burst_factor, int slow_active) { + return (slow_active && burst_factor > CONN_QUEUE_SLOW_CLAMP_FACTOR) ? CONN_QUEUE_SLOW_CLAMP_FACTOR : burst_factor; +} + +/* Pure (no side effect) limit computation. Reads current pool state and the + * connection's existing slow_active flag, but does NOT update the EWMA or the + * slow-state debounce machine. Safe to call from hot-path checks where + * sampling EWMA at the wrong cadence would distort slow detection. 
*/ +static inline size_t connection_compute_queue_limit(const connection_t *c) { + queue_limit_inputs_t in; + connection_prepare_queue_limit_inputs(&in); + double burst_factor = connection_apply_slow_clamp(in.burst_factor, c->slow_active); + return connection_compute_limit_bytes(in.pool, in.fair_bytes, burst_factor); +} + +static size_t connection_update_queue_limit(connection_t *c, int64_t now_ms) { + queue_limit_inputs_t in; + connection_prepare_queue_limit_inputs(&in); + double queue_mem_bytes = (double)c->zc_queue.num_queued * (double)BUFFER_POOL_BUFFER_SIZE; if (c->queue_avg_bytes <= 0.0) c->queue_avg_bytes = queue_mem_bytes; else c->queue_avg_bytes = (1.0 - CONN_QUEUE_EWMA_ALPHA) * c->queue_avg_bytes + CONN_QUEUE_EWMA_ALPHA * queue_mem_bytes; - size_t bursted_bytes = connection_compute_limit_bytes(pool, fair_bytes, burst_factor); - - double slow_threshold = (double)fair_bytes * CONN_QUEUE_SLOW_FACTOR; + /* Use unclamped burst_factor for slow thresholds — the "ideal" reference + * the slow-state machine compares the EWMA against. 
*/ + size_t bursted_bytes = connection_compute_limit_bytes(in.pool, in.fair_bytes, in.burst_factor); + double slow_threshold = (double)in.fair_bytes * CONN_QUEUE_SLOW_FACTOR; double limit_based_threshold = (double)bursted_bytes * CONN_QUEUE_SLOW_LIMIT_RATIO; if (slow_threshold > limit_based_threshold) slow_threshold = limit_based_threshold; - double slow_exit_threshold = (double)fair_bytes * CONN_QUEUE_SLOW_EXIT_FACTOR; + double slow_exit_threshold = (double)in.fair_bytes * CONN_QUEUE_SLOW_EXIT_FACTOR; double limit_exit_threshold = (double)bursted_bytes * CONN_QUEUE_SLOW_EXIT_LIMIT_RATIO; if (slow_exit_threshold > limit_exit_threshold) slow_exit_threshold = limit_exit_threshold; @@ -309,12 +336,8 @@ static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) c->slow_candidate_since = 0; } - if (c->slow_active && burst_factor > CONN_QUEUE_SLOW_CLAMP_FACTOR) - burst_factor = CONN_QUEUE_SLOW_CLAMP_FACTOR; - - size_t limit_bytes = connection_compute_limit_bytes(pool, fair_bytes, burst_factor); - - return limit_bytes; + double burst_factor = connection_apply_slow_clamp(in.burst_factor, c->slow_active); + return connection_compute_limit_bytes(in.pool, in.fair_bytes, burst_factor); } static inline void connection_record_drop(connection_t *c, size_t len) { @@ -328,26 +351,59 @@ static void connection_report_queue(connection_t *c) { return; size_t queue_buffers = c->zc_queue.num_queued; - size_t queue_bytes = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t queue_bytes = connection_queue_bytes(c); status_update_client_queue(c->status_index, queue_bytes, queue_buffers, c->queue_limit_bytes, c->queue_bytes_highwater, c->queue_buffers_highwater, c->dropped_packets, c->dropped_bytes, c->backpressure_events, c->slow_active); } -int connection_should_pause_upstream(const connection_t *c) { - if (!c || c->queue_limit_bytes == 0) + +/* Backpressure watermarks for TCP-to-TCP relay flow control. 
Upstream modules + * (HTTP proxy, RTSP TCP) pause reads when the client send queue exceeds HWM + * and resume when it falls back below LWM. The 25% hysteresis band prevents + * thrash. */ +#define CONN_FLOW_CONTROL_HWM_NUM 3 +#define CONN_FLOW_CONTROL_HWM_DEN 4 /* HWM = 75% of queue_limit_bytes */ +#define CONN_FLOW_CONTROL_LWM_NUM 1 +#define CONN_FLOW_CONTROL_LWM_DEN 2 /* LWM = 50% of queue_limit_bytes */ + +#define CONN_HWM(limit) (((limit) * CONN_FLOW_CONTROL_HWM_NUM) / CONN_FLOW_CONTROL_HWM_DEN) +#define CONN_LWM(limit) (((limit) * CONN_FLOW_CONTROL_LWM_NUM) / CONN_FLOW_CONTROL_LWM_DEN) + +/* Lower bound on the dynamic queue limit (in slot-equivalent bytes). Derived + * from the CONN_QUEUE_MIN_BUFFERS clamp on share_buffers and the DRAIN burst + * factor floor of 1.0 — the dynamic limit can never drop below this value + * regardless of active_streams or pool utilization shifts. Used to gate a + * fast-path early-out in the HWM/LWM hot-path checks. */ +#define CONN_FLOW_CONTROL_MIN_LIMIT_BYTES ((size_t)CONN_QUEUE_MIN_BUFFERS * (size_t)BUFFER_POOL_BUFFER_SIZE) +#define CONN_FLOW_CONTROL_MIN_HWM_BYTES CONN_HWM(CONN_FLOW_CONTROL_MIN_LIMIT_BYTES) +#define CONN_FLOW_CONTROL_MIN_LWM_BYTES CONN_LWM(CONN_FLOW_CONTROL_MIN_LIMIT_BYTES) + +int connection_should_pause_upstream(connection_t *c) { + if (!c) return 0; - size_t queued = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; - size_t hwm = (c->queue_limit_bytes * CONN_FLOW_CONTROL_HWM_NUM) / CONN_FLOW_CONTROL_HWM_DEN; - return queued >= hwm; + size_t queued = connection_queue_bytes(c); + /* Fast path: queue is below the HWM of the absolute-minimum dynamic limit. + * No pause possible regardless of how active_streams / pool utilization / + * slow_active have shifted since the last enqueue. */ + if (queued < CONN_FLOW_CONTROL_MIN_HWM_BYTES) + return 0; + /* Slow path: refresh limit so the decision uses current inputs. 
*/ + c->queue_limit_bytes = connection_compute_queue_limit(c); + return queued >= CONN_HWM(c->queue_limit_bytes); } -int connection_can_resume_upstream(const connection_t *c) { - if (!c || c->queue_limit_bytes == 0) +int connection_can_resume_upstream(connection_t *c) { + if (!c) return 0; - size_t queued = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; - size_t lwm = (c->queue_limit_bytes * CONN_FLOW_CONTROL_LWM_NUM) / CONN_FLOW_CONTROL_LWM_DEN; - return queued <= lwm; + size_t queued = connection_queue_bytes(c); + /* Fast path: queue is below the LWM of the absolute-minimum dynamic limit. + * Always resumable regardless of input shifts. */ + if (queued <= CONN_FLOW_CONTROL_MIN_LWM_BYTES) + return 1; + /* Slow path: refresh limit so the decision uses current inputs. */ + c->queue_limit_bytes = connection_compute_queue_limit(c); + return queued <= CONN_LWM(c->queue_limit_bytes); } void connection_recompute_any_upstream_paused(connection_t *c) { @@ -1059,8 +1115,8 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref) { return 0; int64_t now_ms = get_time_ms(); - size_t limit_bytes = connection_calculate_queue_limit(c, now_ms); - size_t queued_bytes = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t limit_bytes = connection_update_queue_limit(c, now_ms); + size_t queued_bytes = connection_queue_bytes(c); size_t projected_bytes = queued_bytes + buf_ref->data_size; c->queue_limit_bytes = limit_bytes; diff --git a/src/connection.h b/src/connection.h index f36c938..9ec45e1 100644 --- a/src/connection.h +++ b/src/connection.h @@ -181,28 +181,27 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref); */ int connection_queue_file(connection_t *c, int file_fd, off_t file_offset, size_t file_size); -/* Backpressure watermarks for TCP-to-TCP relay flow control. Upstream - * modules (HTTP proxy, RTSP TCP) pause reads when the client send queue - * exceeds HWM and resume when it falls back below LWM. 
The 25% hysteresis - * band prevents thrash. */ -#define CONN_FLOW_CONTROL_HWM_NUM 3 -#define CONN_FLOW_CONTROL_HWM_DEN 4 /* HWM = 75% of queue_limit_bytes */ -#define CONN_FLOW_CONTROL_LWM_NUM 1 -#define CONN_FLOW_CONTROL_LWM_DEN 2 /* LWM = 50% of queue_limit_bytes */ +/* Slot-equivalent bytes currently queued (each pending buffer counts as a full + * BUFFER_POOL_BUFFER_SIZE slot, matching the unit used by queue_limit_bytes). */ +static inline size_t connection_queue_bytes(const connection_t *c) { + return c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; +} /** * Returns true when the client send queue has reached the HWM and upstream - * reads should be paused. Returns false if queue_limit_bytes is unset - * (e.g. before the first zerocopy enqueue). + * reads should be paused. Refreshes c->queue_limit_bytes from current pool + * state when the queue rises above the absolute-minimum-limit fast-path + * threshold, so the decision uses fresh inputs (active stream count, pool + * utilization) rather than whatever was cached at the last enqueue. */ -int connection_should_pause_upstream(const connection_t *c); +int connection_should_pause_upstream(connection_t *c); /** * Returns true when the client send queue has fallen back below the LWM and - * paused upstream reads should be resumed. Returns false if queue_limit_bytes - * is unset. + * paused upstream reads should be resumed. Refreshes c->queue_limit_bytes + * before deciding for the same reason as the pause helper. 
*/ -int connection_can_resume_upstream(const connection_t *c); +int connection_can_resume_upstream(connection_t *c); /** * Recompute the connection-level `any_upstream_paused` bit by inspecting all diff --git a/src/http_proxy.c b/src/http_proxy.c index cefb02d..21efbd8 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -94,7 +94,7 @@ static void http_proxy_pause_upstream(http_proxy_session_t *session) { session->upstream_paused = 1; connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", - session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, + session->conn ? connection_queue_bytes(session->conn) : 0, session->conn ? session->conn->queue_limit_bytes : 0); } diff --git a/src/rtsp.c b/src/rtsp.c index de5535b..b838028 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -343,7 +343,7 @@ static void rtsp_pause_upstream(rtsp_session_t *session) { session->pause_started_ms = get_time_ms(); connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", - session->conn ? session->conn->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE : 0, + session->conn ? connection_queue_bytes(session->conn) : 0, session->conn ? session->conn->queue_limit_bytes : 0); } From 92f67a3e25b7740884f9ac7f683fdf062bb6efc6 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 21:29:25 +0800 Subject: [PATCH 08/10] refactor(stats): repurpose backpressure_events to count upstream pause events Previously backpressure_events was a 1:1 alias of dropped_packets, incremented inside connection_record_drop. With the new HWM-pause flow control, HTTP-proxy and RTSP-TCP almost never drop, so the field was always 0 on TCP paths and just duplicated dropped_packets on UDP. 
Redefine the field to count upstream-pause occurrences: - connection_record_drop no longer touches backpressure_events; dropped_packets remains the sole drop counter (UDP paths). - http_proxy_pause_upstream and rtsp_pause_upstream increment it once per pause edge (when upstream_paused flips 0 -> 1). - The drop-log throttle in connection_queue_zerocopy now keys off dropped_packets so it survives the dropped increment. Net effect: dropped_packets and backpressure_events are now orthogonal - "how many packets we dropped" vs. "how many times we throttled the upstream", letting the status page distinguish congestion from throttling. Co-Authored-By: Claude Opus 4.7 --- src/connection.c | 3 +-- src/connection.h | 4 ++++ src/http_proxy.c | 2 ++ src/rtsp.c | 2 ++ src/status.h | 2 +- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/connection.c b/src/connection.c index 489e561..8ff5938 100644 --- a/src/connection.c +++ b/src/connection.c @@ -343,7 +343,6 @@ static size_t connection_update_queue_limit(connection_t *c, int64_t now_ms) { static inline void connection_record_drop(connection_t *c, size_t len) { c->dropped_packets++; c->dropped_bytes += len; - c->backpressure_events++; } static void connection_report_queue(connection_t *c) { @@ -1124,7 +1123,7 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref) { if (projected_bytes > limit_bytes) { connection_record_drop(c, buf_ref->data_size); - if (c->backpressure_events == 1 || (c->backpressure_events % 200) == 0) { + if (c->dropped_packets == 1 || (c->dropped_packets % 200) == 0) { logger(LOG_DEBUG, "Backpressure: dropping %zu bytes for client fd=%d (queued=%zu " "limit=%zu drops=%llu)", diff --git a/src/connection.h b/src/connection.h index 9ec45e1..5e9afa6 100644 --- a/src/connection.h +++ b/src/connection.h @@ -62,6 +62,10 @@ typedef struct connection_s { size_t queue_buffers_highwater; uint64_t dropped_packets; uint64_t dropped_bytes; + /* Number of times an upstream TCP session 
attached to this connection paused + * its reads due to client-side backpressure (HWM crossed). Pure pause + * counter — has no relation to dropped_packets. UDP paths never increment + * this since they don't pause; their congestion shows up in dropped_packets. */ uint32_t backpressure_events; int stream_registered; double queue_avg_bytes; diff --git a/src/http_proxy.c b/src/http_proxy.c index 21efbd8..685e136 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -92,6 +92,8 @@ static void http_proxy_pause_upstream(http_proxy_session_t *session) { if (!session || session->upstream_paused) return; session->upstream_paused = 1; + if (session->conn) + session->conn->backpressure_events++; connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? connection_queue_bytes(session->conn) : 0, diff --git a/src/rtsp.c b/src/rtsp.c index b838028..33c3fad 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -341,6 +341,8 @@ static void rtsp_pause_upstream(rtsp_session_t *session) { return; session->upstream_paused = 1; session->pause_started_ms = get_time_ms(); + if (session->conn) + session->conn->backpressure_events++; connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? 
connection_queue_bytes(session->conn) : 0, diff --git a/src/status.h b/src/status.h index f284a49..3e20221 100644 --- a/src/status.h +++ b/src/status.h @@ -80,7 +80,7 @@ typedef struct { uint32_t queue_buffers_highwater; /* Peak queued buffers */ uint64_t dropped_packets; /* Total dropped packets */ uint64_t dropped_bytes; /* Total dropped bytes */ - uint32_t backpressure_events; /* Times backpressure triggered */ + uint32_t backpressure_events; /* Times upstream reads were paused due to client backpressure (TCP only) */ int slow_active; } client_stats_t; From 5fc045deb98ecab8f115ae22bcba0148a9635bec Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 21:49:00 +0800 Subject: [PATCH 09/10] refactor(rtsp): drop max-pause guard, rely on keepalives during pause MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 30s pause guard was originally added on the assumption that the upstream RTSP server would reclaim our session if we stayed paused near its session timeout (~60s). In practice this never materialises: - Pause only blocks the recv path (HWM check in rtsp_handle_tcp_interleaved_data); the send side is untouched. - rtsp_session_tick() keeps emitting keepalives on the same socket, which the server treats as activity and resets its session timer. - Even if the server did reclaim us, we'd see EOF in the kernel buffer on next resume — rtsp_resume_upstream already handles that path cleanly via rtsp_force_cleanup + connection_begin_drain_close. So the only thing the 30s guard achieved was killing slow-but-recovering clients early. Drop the macro, the pause_started_ms field, and the guard block in rtsp_session_tick. 
Co-Authored-By: Claude Opus 4.7 --- src/rtsp.c | 28 ++++------------------------ src/rtsp.h | 3 +-- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/src/rtsp.c b/src/rtsp.c index 33c3fad..4111c25 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -315,32 +315,24 @@ void rtsp_session_init(rtsp_session_t *session) { /* Initialize flow control state (TCP transport only) */ session->upstream_paused = 0; - session->pause_started_ms = 0; } -/* Maximum time the upstream socket may stay paused before we force-disconnect. - * Must be safely below typical RTSP session timeouts (60s on most servers) so - * the server does not silently reclaim the session, leaving us with a half-dead - * connection. */ -#define RTSP_MAX_PAUSE_MS 30000 - /* Pause/resume are flag-only. We deliberately avoid `poller_mod` because on * kqueue (src/poller_kqueue.c) a mask without POLLER_IN/POLLER_OUT deletes * the read AND write filters, silently losing HUP/EOF detection while * paused. The TCP socket also carries control-plane responses * (PLAY/keepalive replies, TEARDOWN), and outbound work (state machine * requests, keepalives) updates POLLER_OUT independently — keeping the read - * filter in place lets all of that continue to flow. rtsp_handle_tcp_- - * interleaved_data() bails out via the HWM check when the queue is - * saturated; rtsp_session_tick() enforces RTSP_MAX_PAUSE_MS so we don't - * outlast the server's session timeout. */ + * filter in place lets all of that continue to flow during pause, so the + * server's session timer keeps getting reset and never reclaims us. + * rtsp_handle_tcp_interleaved_data() bails out via the HWM check when the + * queue is saturated. 
*/ static void rtsp_pause_upstream(rtsp_session_t *session) { if (!session || session->upstream_paused) return; if (session->transport_mode != RTSP_TRANSPORT_TCP) return; session->upstream_paused = 1; - session->pause_started_ms = get_time_ms(); if (session->conn) session->conn->backpressure_events++; connection_recompute_any_upstream_paused(session->conn); @@ -353,7 +345,6 @@ void rtsp_resume_upstream(rtsp_session_t *session) { if (!session || !session->upstream_paused) return; session->upstream_paused = 0; - session->pause_started_ms = 0; connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Resumed upstream reads"); /* Drain synchronously here: edge-triggered pollers won't deliver another @@ -1650,17 +1641,6 @@ int rtsp_session_tick(rtsp_session_t *session, int64_t now) { } } - /* Max-pause guard for TCP transport: if the client has been too slow for - * too long, abort rather than risk the upstream server timing out our - * session (would leave a half-dead connection). 
*/ - if (session->upstream_paused && session->pause_started_ms > 0 && - now - session->pause_started_ms > RTSP_MAX_PAUSE_MS) { - logger(LOG_WARN, "RTSP TCP: Client too slow, upstream paused for %lld ms — aborting session", - (long long)(now - session->pause_started_ms)); - rtsp_session_set_state(session, RTSP_STATE_ERROR); - return -1; - } - /* Send periodic keepalive to prevent server session timeout */ if (session->state == RTSP_STATE_PLAYING && session->keepalive_interval_ms > 0 && session->session_id[0] != '\0') { if (session->last_keepalive_ms == 0) { diff --git a/src/rtsp.h b/src/rtsp.h index 47a4f94..402a9d8 100644 --- a/src/rtsp.h +++ b/src/rtsp.h @@ -198,8 +198,7 @@ typedef struct { plane, not media) */ /* Flow control state (TCP interleaved transport only) */ - int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ - int64_t pause_started_ms; /* When pause began; used by tick to enforce max-pause guard */ + int upstream_paused; /* 1 = upstream recv currently suspended due to client backpressure */ } rtsp_session_t; /* Function prototypes */ From 27d8159ee8587d58d189fc3542c65880849ea17f Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Tue, 5 May 2026 21:51:22 +0800 Subject: [PATCH 10/10] refactor(connection): extract connection_record_pause helper Both http_proxy_pause_upstream and rtsp_pause_upstream open-coded the same null-guarded backpressure_events bump. Mirror connection_record_- drop with a connection_record_pause inline helper so the counter's ownership stays with the connection module. 
Co-Authored-By: Claude Opus 4.7 --- src/connection.h | 7 +++++++ src/http_proxy.c | 3 +-- src/rtsp.c | 3 +-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/connection.h b/src/connection.h index 5e9afa6..60dbed2 100644 --- a/src/connection.h +++ b/src/connection.h @@ -191,6 +191,13 @@ static inline size_t connection_queue_bytes(const connection_t *c) { return c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; } +/* Record one upstream-pause edge. Called by per-transport pause helpers + * (http_proxy_pause_upstream, rtsp_pause_upstream) on the 0->1 transition. */ +static inline void connection_record_pause(connection_t *c) { + if (c) + c->backpressure_events++; +} + /** * Returns true when the client send queue has reached the HWM and upstream * reads should be paused. Refreshes c->queue_limit_bytes from current pool diff --git a/src/http_proxy.c b/src/http_proxy.c index 685e136..52f85bc 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -92,8 +92,7 @@ static void http_proxy_pause_upstream(http_proxy_session_t *session) { if (!session || session->upstream_paused) return; session->upstream_paused = 1; - if (session->conn) - session->conn->backpressure_events++; + connection_record_pause(session->conn); connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? connection_queue_bytes(session->conn) : 0, diff --git a/src/rtsp.c b/src/rtsp.c index 4111c25..604fed0 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -333,8 +333,7 @@ static void rtsp_pause_upstream(rtsp_session_t *session) { if (session->transport_mode != RTSP_TRANSPORT_TCP) return; session->upstream_paused = 1; - if (session->conn) - session->conn->backpressure_events++; + connection_record_pause(session->conn); connection_recompute_any_upstream_paused(session->conn); logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", session->conn ? connection_queue_bytes(session->conn) : 0,