diff --git a/e2e/test_flow_control.py b/e2e/test_flow_control.py new file mode 100644 index 0000000..67d6eb6 --- /dev/null +++ b/e2e/test_flow_control.py @@ -0,0 +1,149 @@ +""" +E2E coverage for the upstream flow-control fix. + +A slow downstream client used to be aborted partway through a large HTTP +proxy response because the per-connection zerocopy queue would saturate and +``connection_queue_zerocopy()`` would return -1 (packet-drop semantics +inherited from RTP/UDP). The fix pauses upstream reads when the client send +queue exceeds the high watermark and resumes them once it drops back below +the low watermark. + +We can't deterministically observe individual pause/resume transitions +(they depend on kernel buffer sizes), so the test asserts the only thing +that matters end-to-end: byte-for-byte completeness of the proxied response. + +The same flow-control machinery is wired into RTSP TCP interleaved transport +in src/rtsp.c, but a deterministic e2e test for it would require a more +elaborate mock that drives backpressure without closing right after sending. +The HTTP test exercises the shared infrastructure (stream_on_client_drain, +the HWM/LWM helpers, the IDLE-path POLLER_OUT preservation). +""" + +import socket +import time + +import pytest + +from helpers import ( + MockHTTPUpstream, + R2HProcess, + find_free_port, +) + +# These tests intentionally throttle the client to provoke backpressure. +pytestmark = pytest.mark.slow + + +@pytest.fixture(scope="module") +def shared_r2h(r2h_binary): + # ``-b 128`` shrinks the global buffer pool cap to 128 buffers + # (~192 KiB), which forces the per-connection zerocopy queue limit + # down to ~96 KiB. With the default cap (16384 buffers / ~24 MiB) a + # short test body would be absorbed entirely without ever crossing + # the HWM, defeating the whole purpose of these tests. + port = find_free_port() + r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-m", "100", "-b", "128"]) + r2h.start() + yield r2h + r2h.stop() + + +def _slow_drain_until_eof( + host: str, + port: int, + path: str, + chunk_size: int, + sleep_per_chunk: float, + overall_timeout: float, +) -> tuple[int, dict, bytes]: + """HTTP/1.0 GET that reads `chunk_size` bytes then sleeps, repeating + until EOF or `overall_timeout` expires. + + Returns ``(status, headers_dict, body_bytes)``. Connection-level errors + return ``(0, {}, partial_body)`` — useful for asserting that the OLD + (un-fixed) code drops the connection mid-transfer. + """ + sock = socket.create_connection((host, port), timeout=overall_timeout) + body = b"" + try: + sock.sendall(("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (path, host)).encode()) + + deadline = time.monotonic() + overall_timeout + buf = b"" + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + sock.settimeout(min(remaining, 2.0)) + try: + piece = sock.recv(chunk_size) + except socket.timeout: + continue + except OSError: + break + if not piece: + break + buf += piece + if sleep_per_chunk > 0: + time.sleep(sleep_per_chunk) + + header_end = buf.find(b"\r\n\r\n") + if header_end < 0: + return 0, {}, buf + + header_text = buf[:header_end].decode(errors="replace") + body = buf[header_end + 4 :] + parts = header_text.split("\r\n") + # Guard against malformed status lines (truncated/corrupted response) + # so the test fails on the assertion, not on a parse exception. + try: + status_code = int(parts[0].split()[1]) + except IndexError, ValueError: + return 0, {}, buf + hdrs = {} + for line in parts[1:]: + if ":" in line: + k, v = line.split(":", 1) + hdrs[k.strip().lower()] = v.strip() + return status_code, hdrs, body + finally: + sock.close() + + +# --------------------------------------------------------------------------- +# HTTP proxy +# --------------------------------------------------------------------------- + + +@pytest.mark.http_proxy +class TestHTTPProxyBackpressure: + """A slow HTTP client must receive the full proxied body.""" + + def test_slow_client_receives_full_body(self, shared_r2h): + # 1 MiB body comfortably exceeds the ~96 KiB zerocopy queue limit + # imposed by the ``-b 128`` shared_r2h fixture, so the slow client + # forces multiple pause/resume cycles before EOF. + body_size = 1024 * 1024 + payload = bytes((i & 0xFF for i in range(body_size))) + + upstream = MockHTTPUpstream( + routes={"/big.ts": {"status": 200, "body": payload, "headers": {"Content-Type": "video/mp2t"}}} + ) + upstream.start() + try: + status, _, received = _slow_drain_until_eof( + "127.0.0.1", + shared_r2h.port, + "/http/127.0.0.1:%d/big.ts" % upstream.port, + chunk_size=8 * 1024, + sleep_per_chunk=0.02, # ~400 KB/s ceiling, well below pool refill rate + overall_timeout=20.0, + ) + assert status == 200, "Slow client should still receive a 200 response" + assert len(received) == body_size, "Slow client received %d/%d bytes — flow control regression?" % ( + len(received), + body_size, + ) + assert received == payload, "Body content mismatch — corruption in proxy path" + finally: + upstream.stop() diff --git a/src/connection.c b/src/connection.c index 48feddc..8ff5938 100644 --- a/src/connection.c +++ b/src/connection.c @@ -245,18 +245,26 @@ static size_t connection_compute_limit_bytes(buffer_pool_t *pool, size_t fair_by return limit_bytes; } -static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) { +/* Side-effect-free inputs into the queue-limit calculation. */ +typedef struct { + buffer_pool_t *pool; + size_t fair_bytes; + double burst_factor; /* before slow_active clamp */ +} queue_limit_inputs_t; + +static void connection_prepare_queue_limit_inputs(queue_limit_inputs_t *out) { buffer_pool_t *pool = &zerocopy_state.pool; - size_t active = zerocopy_active_streams(); + out->pool = pool; + size_t active = zerocopy_active_streams(); if (active == 0) active = 1; size_t total_buffers = pool->num_buffers ? pool->num_buffers : BUFFER_POOL_INITIAL_SIZE; - size_t share_buffers = total_buffers / active; if (share_buffers < CONN_QUEUE_MIN_BUFFERS) share_buffers = CONN_QUEUE_MIN_BUFFERS; + out->fair_bytes = share_buffers * BUFFER_POOL_BUFFER_SIZE; double utilization = 0.0; if (pool->max_buffers > 0) { @@ -264,29 +272,48 @@ static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) utilization = (double)used_buffers / (double)pool->max_buffers; } - double burst_factor = CONN_QUEUE_BURST_FACTOR; + out->burst_factor = CONN_QUEUE_BURST_FACTOR; if (pool->num_buffers >= pool->max_buffers || utilization >= CONN_QUEUE_HIGH_UTIL_THRESHOLD) - burst_factor = CONN_QUEUE_BURST_FACTOR_CONGESTED; + out->burst_factor = CONN_QUEUE_BURST_FACTOR_CONGESTED; if (pool->num_free < pool->low_watermark / 2 || utilization >= CONN_QUEUE_DRAIN_UTIL_THRESHOLD) - burst_factor = CONN_QUEUE_BURST_FACTOR_DRAIN; + out->burst_factor = CONN_QUEUE_BURST_FACTOR_DRAIN; +} - size_t fair_bytes = share_buffers * BUFFER_POOL_BUFFER_SIZE; - double queue_mem_bytes = (double)c->zc_queue.num_queued * (double)BUFFER_POOL_BUFFER_SIZE; +static inline double connection_apply_slow_clamp(double burst_factor, int slow_active) { + return (slow_active && burst_factor > CONN_QUEUE_SLOW_CLAMP_FACTOR) ? CONN_QUEUE_SLOW_CLAMP_FACTOR : burst_factor; +} + +/* Pure (no side effect) limit computation. Reads current pool state and the + * connection's existing slow_active flag, but does NOT update the EWMA or the + * slow-state debounce machine. Safe to call from hot-path checks where + * sampling EWMA at the wrong cadence would distort slow detection. */ +static inline size_t connection_compute_queue_limit(const connection_t *c) { + queue_limit_inputs_t in; + connection_prepare_queue_limit_inputs(&in); + double burst_factor = connection_apply_slow_clamp(in.burst_factor, c->slow_active); + return connection_compute_limit_bytes(in.pool, in.fair_bytes, burst_factor); +} +static size_t connection_update_queue_limit(connection_t *c, int64_t now_ms) { + queue_limit_inputs_t in; + connection_prepare_queue_limit_inputs(&in); + + double queue_mem_bytes = (double)c->zc_queue.num_queued * (double)BUFFER_POOL_BUFFER_SIZE; if (c->queue_avg_bytes <= 0.0) c->queue_avg_bytes = queue_mem_bytes; else c->queue_avg_bytes = (1.0 - CONN_QUEUE_EWMA_ALPHA) * c->queue_avg_bytes + CONN_QUEUE_EWMA_ALPHA * queue_mem_bytes; - size_t bursted_bytes = connection_compute_limit_bytes(pool, fair_bytes, burst_factor); - - double slow_threshold = (double)fair_bytes * CONN_QUEUE_SLOW_FACTOR; + /* Use unclamped burst_factor for slow thresholds — the "ideal" reference + * the slow-state machine compares the EWMA against. */ + size_t bursted_bytes = connection_compute_limit_bytes(in.pool, in.fair_bytes, in.burst_factor); + double slow_threshold = (double)in.fair_bytes * CONN_QUEUE_SLOW_FACTOR; double limit_based_threshold = (double)bursted_bytes * CONN_QUEUE_SLOW_LIMIT_RATIO; if (slow_threshold > limit_based_threshold) slow_threshold = limit_based_threshold; - double slow_exit_threshold = (double)fair_bytes * CONN_QUEUE_SLOW_EXIT_FACTOR; + double slow_exit_threshold = (double)in.fair_bytes * CONN_QUEUE_SLOW_EXIT_FACTOR; double limit_exit_threshold = (double)bursted_bytes * CONN_QUEUE_SLOW_EXIT_LIMIT_RATIO; if (slow_exit_threshold > limit_exit_threshold) slow_exit_threshold = limit_exit_threshold; @@ -309,18 +336,13 @@ static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms) c->slow_candidate_since = 0; } - if (c->slow_active && burst_factor > CONN_QUEUE_SLOW_CLAMP_FACTOR) - burst_factor = CONN_QUEUE_SLOW_CLAMP_FACTOR; - - size_t limit_bytes = connection_compute_limit_bytes(pool, fair_bytes, burst_factor); - - return limit_bytes; + double burst_factor = connection_apply_slow_clamp(in.burst_factor, c->slow_active); + return connection_compute_limit_bytes(in.pool, in.fair_bytes, burst_factor); } static inline void connection_record_drop(connection_t *c, size_t len) { c->dropped_packets++; c->dropped_bytes += len; - c->backpressure_events++; } static void connection_report_queue(connection_t *c) { @@ -328,12 +350,75 @@ static void connection_report_queue(connection_t *c) { return; size_t queue_buffers = c->zc_queue.num_queued; - size_t queue_bytes = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t queue_bytes = connection_queue_bytes(c); status_update_client_queue(c->status_index, queue_bytes, queue_buffers, c->queue_limit_bytes, c->queue_bytes_highwater, c->queue_buffers_highwater, c->dropped_packets, c->dropped_bytes, c->backpressure_events, c->slow_active); } + +/* Backpressure watermarks for TCP-to-TCP relay flow control. Upstream modules + * (HTTP proxy, RTSP TCP) pause reads when the client send queue exceeds HWM + * and resume when it falls back below LWM. The 25% hysteresis band prevents + * thrash. */ +#define CONN_FLOW_CONTROL_HWM_NUM 3 +#define CONN_FLOW_CONTROL_HWM_DEN 4 /* HWM = 75% of queue_limit_bytes */ +#define CONN_FLOW_CONTROL_LWM_NUM 1 +#define CONN_FLOW_CONTROL_LWM_DEN 2 /* LWM = 50% of queue_limit_bytes */ + +#define CONN_HWM(limit) (((limit) * CONN_FLOW_CONTROL_HWM_NUM) / CONN_FLOW_CONTROL_HWM_DEN) +#define CONN_LWM(limit) (((limit) * CONN_FLOW_CONTROL_LWM_NUM) / CONN_FLOW_CONTROL_LWM_DEN) + +/* Lower bound on the dynamic queue limit (in slot-equivalent bytes). Derived + * from the CONN_QUEUE_MIN_BUFFERS clamp on share_buffers and the DRAIN burst + * factor floor of 1.0 — the dynamic limit can never drop below this value + * regardless of active_streams or pool utilization shifts. Used to gate a + * fast-path early-out in the HWM/LWM hot-path checks. */ +#define CONN_FLOW_CONTROL_MIN_LIMIT_BYTES ((size_t)CONN_QUEUE_MIN_BUFFERS * (size_t)BUFFER_POOL_BUFFER_SIZE) +#define CONN_FLOW_CONTROL_MIN_HWM_BYTES CONN_HWM(CONN_FLOW_CONTROL_MIN_LIMIT_BYTES) +#define CONN_FLOW_CONTROL_MIN_LWM_BYTES CONN_LWM(CONN_FLOW_CONTROL_MIN_LIMIT_BYTES) + +int connection_should_pause_upstream(connection_t *c) { + if (!c) + return 0; + size_t queued = connection_queue_bytes(c); + /* Fast path: queue is below the HWM of the absolute-minimum dynamic limit. + * No pause possible regardless of how active_streams / pool utilization / + * slow_active have shifted since the last enqueue. */ + if (queued < CONN_FLOW_CONTROL_MIN_HWM_BYTES) + return 0; + /* Slow path: refresh limit so the decision uses current inputs. */ + c->queue_limit_bytes = connection_compute_queue_limit(c); + return queued >= CONN_HWM(c->queue_limit_bytes); +} + +int connection_can_resume_upstream(connection_t *c) { + if (!c) + return 0; + size_t queued = connection_queue_bytes(c); + /* Fast path: queue is below the LWM of the absolute-minimum dynamic limit. + * Always resumable regardless of input shifts. */ + if (queued <= CONN_FLOW_CONTROL_MIN_LWM_BYTES) + return 1; + /* Slow path: refresh limit so the decision uses current inputs. */ + c->queue_limit_bytes = connection_compute_queue_limit(c); + return queued <= CONN_LWM(c->queue_limit_bytes); +} + +void connection_recompute_any_upstream_paused(connection_t *c) { + if (!c) + return; + c->any_upstream_paused = (c->stream.http_proxy.initialized && c->stream.http_proxy.upstream_paused) || + (c->stream.rtsp.initialized && c->stream.rtsp.upstream_paused); +} + +void connection_begin_drain_close(connection_t *c) { + if (!c || c->state == CONN_CLOSING) + return; + c->state = CONN_CLOSING; + connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_OUT | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); +} + int connection_set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags < 0) @@ -532,11 +617,14 @@ connection_write_status_t connection_handle_write(connection_t *c) { return CONNECTION_WRITE_IDLE; } + size_t total_sent = 0; + /* Loop to drain all writable data for edge-triggered pollers where * EPOLLOUT / EV_CLEAR fires only once when the socket becomes writable. */ for (;;) { size_t bytes_sent = 0; int ret = zerocopy_send(c->fd, &c->zc_queue, &bytes_sent); + total_sent += bytes_sent; if (ret < 0 && ret != -2) { c->state = CONN_CLOSING; @@ -548,15 +636,27 @@ connection_write_status_t connection_handle_write(connection_t *c) { if (ret == -2) { /* EAGAIN - socket send buffer full, wait for next writable event */ connection_report_queue(c); + if (total_sent > 0) + stream_on_client_drain(&c->stream); return CONNECTION_WRITE_BLOCKED; } if (!c->zc_queue.head) { - /* All data sent - remove POLLER_OUT */ - connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); - connection_report_queue(c); - if (c->state == CONN_CLOSING && !c->zc_queue.pending_head) + if (c->state == CONN_CLOSING && !c->zc_queue.pending_head) { + connection_epoll_update_events(c->epfd, c->fd, POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR); + connection_report_queue(c); return CONNECTION_WRITE_CLOSED; + } + /* Notify upstream BEFORE arming the poller mask: resume() may queue + * new buffers in this same call frame, in which case POLLER_OUT must + * stay armed so the worker re-enters this function to drain them. */ + if (total_sent > 0) + stream_on_client_drain(&c->stream); + uint32_t mask = POLLER_IN | POLLER_RDHUP | POLLER_HUP | POLLER_ERR; + if (c->zc_queue.head) + mask |= POLLER_OUT; + connection_epoll_update_events(c->epfd, c->fd, mask); + connection_report_queue(c); return CONNECTION_WRITE_IDLE; } @@ -567,6 +667,8 @@ connection_write_status_t connection_handle_write(connection_t *c) { /* Queue still has data but we couldn't make progress */ connection_report_queue(c); + if (total_sent > 0) + stream_on_client_drain(&c->stream); return CONNECTION_WRITE_PENDING; } @@ -1012,8 +1114,8 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref) { return 0; int64_t now_ms = get_time_ms(); - size_t limit_bytes = connection_calculate_queue_limit(c, now_ms); - size_t queued_bytes = c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; + size_t limit_bytes = connection_update_queue_limit(c, now_ms); + size_t queued_bytes = connection_queue_bytes(c); size_t projected_bytes = queued_bytes + buf_ref->data_size; c->queue_limit_bytes = limit_bytes; @@ -1021,7 +1123,7 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref) { if (projected_bytes > limit_bytes) { connection_record_drop(c, buf_ref->data_size); - if (c->backpressure_events == 1 || (c->backpressure_events % 200) == 0) { + if (c->dropped_packets == 1 || (c->dropped_packets % 200) == 0) { logger(LOG_DEBUG, "Backpressure: dropping %zu bytes for client fd=%d (queued=%zu " "limit=%zu drops=%llu)", diff --git a/src/connection.h b/src/connection.h index 0f8a0e3..60dbed2 100644 --- a/src/connection.h +++ b/src/connection.h @@ -62,11 +62,19 @@ typedef struct connection_s { size_t queue_buffers_highwater; uint64_t dropped_packets; uint64_t dropped_bytes; + /* Number of times an upstream TCP session attached to this connection paused + * its reads due to client-side backpressure (HWM crossed). Pure pause + * counter — has no relation to dropped_packets. UDP paths never increment + * this since they don't pause; their congestion shows up in dropped_packets. */ uint32_t backpressure_events; int stream_registered; double queue_avg_bytes; int slow_active; int64_t slow_candidate_since; + /* Set when any TCP upstream session attached to this connection has paused + * its reads due to client-side backpressure. Lets the per-write notify + * fast-path skip cheaply when no upstream is paused (the common case). */ + int any_upstream_paused; /* r2h-token Set-Cookie flag: set cookie when token was provided via URL query */ int should_set_r2h_cookie; @@ -177,4 +185,49 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref); */ int connection_queue_file(connection_t *c, int file_fd, off_t file_offset, size_t file_size); +/* Slot-equivalent bytes currently queued (each pending buffer counts as a full + * BUFFER_POOL_BUFFER_SIZE slot, matching the unit used by queue_limit_bytes). */ +static inline size_t connection_queue_bytes(const connection_t *c) { + return c->zc_queue.num_queued * BUFFER_POOL_BUFFER_SIZE; +} + +/* Record one upstream-pause edge. Called by per-transport pause helpers + * (http_proxy_pause_upstream, rtsp_pause_upstream) on the 0->1 transition. */ +static inline void connection_record_pause(connection_t *c) { + if (c) + c->backpressure_events++; +} + +/** + * Returns true when the client send queue has reached the HWM and upstream + * reads should be paused. Refreshes c->queue_limit_bytes from current pool + * state when the queue rises above the absolute-minimum-limit fast-path + * threshold, so the decision uses fresh inputs (active stream count, pool + * utilization) rather than whatever was cached at the last enqueue. + */ +int connection_should_pause_upstream(connection_t *c); + +/** + * Returns true when the client send queue has fallen back below the LWM and + * paused upstream reads should be resumed. Refreshes c->queue_limit_bytes + * before deciding for the same reason as the pause helper. + */ +int connection_can_resume_upstream(connection_t *c); + +/** + * Recompute the connection-level `any_upstream_paused` bit by inspecting all + * attached upstream sessions. Call after any individual upstream session + * toggles its own `upstream_paused` flag so the cheap per-write fast path in + * stream_on_client_drain() stays accurate. + */ +void connection_recompute_any_upstream_paused(connection_t *c); + +/** + * Mark the connection for orderly shutdown after upstream EOF/error: switch + * to CONN_CLOSING and re-arm the full event mask so the worker keeps draining + * any queued bytes to the client before tearing down. No-op if the + * connection is already CONN_CLOSING. + */ +void connection_begin_drain_close(connection_t *c); + #endif /* CONNECTION_H */ diff --git a/src/http_proxy.c b/src/http_proxy.c index 84e78b4..52f85bc 100644 --- a/src/http_proxy.c +++ b/src/http_proxy.c @@ -35,6 +35,7 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session); static int http_proxy_parse_response_headers(http_proxy_session_t *session); static int http_proxy_append_raw_headers(http_proxy_session_t *session, char **dest, size_t *remaining, int *total_len, int filter_user_agent); +static void http_proxy_pause_upstream(http_proxy_session_t *session); static const char *http_proxy_get_override_user_agent(void) { if (config.http_proxy_user_agent && config.http_proxy_user_agent[0] != '\0') { @@ -75,9 +76,55 @@ void http_proxy_session_init(http_proxy_session_t *session) { session->bytes_received = 0; session->headers_received = 0; session->headers_forwarded = 0; + session->upstream_paused = 0; session->cleanup_done = 0; } +/* Pause/resume are flag-only. We deliberately avoid `poller_mod` because on + * kqueue (src/poller_kqueue.c) a mask without POLLER_IN/POLLER_OUT deletes + * the read AND write filters, silently losing HUP/EOF detection while + * paused. Edge-triggered pollers will keep waking the worker for new data + * arrivals; the recv handler bails out via the HWM check when the queue is + * saturated. This keeps the kernel TCP recv buffer the natural backstop: + * once it fills, the upstream sender's window closes naturally. + */ +static void http_proxy_pause_upstream(http_proxy_session_t *session) { + if (!session || session->upstream_paused) + return; + session->upstream_paused = 1; + connection_record_pause(session->conn); + connection_recompute_any_upstream_paused(session->conn); + logger(LOG_DEBUG, "HTTP Proxy: Paused upstream reads (queued=%zu limit=%zu)", + session->conn ? connection_queue_bytes(session->conn) : 0, + session->conn ? session->conn->queue_limit_bytes : 0); +} + +void http_proxy_resume_upstream(http_proxy_session_t *session) { + if (!session || !session->upstream_paused) + return; + session->upstream_paused = 0; + connection_recompute_any_upstream_paused(session->conn); + logger(LOG_DEBUG, "HTTP Proxy: Resumed upstream reads"); + /* Drain synchronously here: edge-triggered pollers won't deliver another + * EPOLLIN edge for bytes that arrived while paused, so anything still + * buffered in the kernel must come out in this call frame. Loop exits + * on EAGAIN, HWM re-pause, EOF, or error. */ + while (session->state == HTTP_PROXY_STATE_STREAMING && !session->upstream_paused) { + int ret = http_proxy_try_receive_response(session); + if (ret <= 0) + break; + } + /* EOF discovered mid-drain: no further upstream events will arrive, so + * close the upstream socket here and start the client drain. */ + if (session->state == HTTP_PROXY_STATE_COMPLETE) { + if (session->socket >= 0) { + worker_cleanup_socket_from_epoll(session->epoll_fd, session->socket); + session->socket = -1; + } + connection_begin_drain_close(session->conn); + } +} + int http_proxy_parse_url(http_proxy_session_t *session, const char *url) { const char *p; char *colon; @@ -762,6 +809,15 @@ static int http_proxy_try_receive_response(http_proxy_session_t *session) { } /* Phase 2: Zero-copy streaming - recv directly to buffer pool */ + + /* Pause upstream BEFORE recv when client queue is near limit. Dropping + * bytes mid-stream would corrupt the response body, so we instead push + * backpressure into the upstream TCP receive buffer. */ + if (connection_should_pause_upstream(session->conn)) { + http_proxy_pause_upstream(session); + return 0; + } + buffer_ref_t *buf = buffer_pool_alloc(); if (!buf) { logger(LOG_ERROR, "HTTP Proxy: Buffer pool exhausted"); diff --git a/src/http_proxy.h b/src/http_proxy.h index 58f9d66..b4b2de1 100644 --- a/src/http_proxy.h +++ b/src/http_proxy.h @@ -107,6 +107,9 @@ typedef struct { /* Per-service upstream interface override (resolved at init, non-owning) */ const char *upstream_ifname; + /* Flow control state */ + int upstream_paused; /* 1 = upstream POLLER_IN currently un-armed due to client backpressure */ + /* Cleanup state */ int cleanup_done; /* Flag: cleanup has been completed */ } http_proxy_session_t; @@ -196,6 +199,13 @@ int http_proxy_session_cleanup(http_proxy_session_t *session); */ int http_proxy_session_tick(http_proxy_session_t *session, int64_t now); +/** + * Resume reading from upstream after client send queue has drained. + * Called from stream_on_client_drain when zc_queue falls below LWM. + * @param session HTTP proxy session + */ +void http_proxy_resume_upstream(http_proxy_session_t *session); + /** * Build HTTP proxy URL for transformed M3U * Converts http://host:port/path to {BASE_URL}http/host:port/path diff --git a/src/rtsp.c b/src/rtsp.c index e1f3568..604fed0 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -312,6 +312,51 @@ void rtsp_session_init(rtsp_session_t *session) { session->teardown_requested = 0; session->teardown_reconnect_done = 0; session->state_before_teardown = RTSP_STATE_INIT; + + /* Initialize flow control state (TCP transport only) */ + session->upstream_paused = 0; +} + +/* Pause/resume are flag-only. We deliberately avoid `poller_mod` because on + * kqueue (src/poller_kqueue.c) a mask without POLLER_IN/POLLER_OUT deletes + * the read AND write filters, silently losing HUP/EOF detection while + * paused. The TCP socket also carries control-plane responses + * (PLAY/keepalive replies, TEARDOWN), and outbound work (state machine + * requests, keepalives) updates POLLER_OUT independently — keeping the read + * filter in place lets all of that continue to flow during pause, so the + * server's session timer keeps getting reset and never reclaims us. + * rtsp_handle_tcp_interleaved_data() bails out via the HWM check when the + * queue is saturated. */ +static void rtsp_pause_upstream(rtsp_session_t *session) { + if (!session || session->upstream_paused) + return; + if (session->transport_mode != RTSP_TRANSPORT_TCP) + return; + session->upstream_paused = 1; + connection_record_pause(session->conn); + connection_recompute_any_upstream_paused(session->conn); + logger(LOG_DEBUG, "RTSP TCP: Paused upstream reads (queued=%zu limit=%zu)", + session->conn ? connection_queue_bytes(session->conn) : 0, + session->conn ? session->conn->queue_limit_bytes : 0); +} + +void rtsp_resume_upstream(rtsp_session_t *session) { + if (!session || !session->upstream_paused) + return; + session->upstream_paused = 0; + connection_recompute_any_upstream_paused(session->conn); + logger(LOG_DEBUG, "RTSP TCP: Resumed upstream reads"); + /* Drain synchronously here: edge-triggered pollers won't deliver another + * EPOLLIN edge for bytes that arrived while paused. The drain function + * loops internally and returns -1 on EOF/recv error — close the upstream + * here in that case so the session isn't orphaned waiting for an event. */ + if (!session->conn) + return; + if (rtsp_handle_tcp_interleaved_data(session, session->conn) < 0) { + logger(LOG_DEBUG, "RTSP TCP: Upstream EOF during resume drain, closing"); + rtsp_force_cleanup(session); + connection_begin_drain_close(session->conn); + } } /** @@ -1751,6 +1796,14 @@ int rtsp_handle_tcp_interleaved_data(rtsp_session_t *session, connection_t *conn * per data arrival. The inner recv loop may fill the small response buffer * before hitting EAGAIN; after processing we must loop back to recv more. */ for (;;) { + /* Pause upstream BEFORE recv when client queue is near limit, so data + * accumulates in the upstream TCP receive buffer rather than being dropped + * at the application layer (which would tear holes in the RTP stream). */ + if (connection_should_pause_upstream(conn)) { + rtsp_pause_upstream(session); + return total_forwarded; + } + int hit_eagain = 0; /* Fill response buffer from socket */ diff --git a/src/rtsp.h b/src/rtsp.h index ac978d2..402a9d8 100644 --- a/src/rtsp.h +++ b/src/rtsp.h @@ -196,6 +196,9 @@ typedef struct { uint8_t response_buffer[RTSP_RESPONSE_BUFFER_SIZE]; /* Buffer for RTSP responses (control plane, not media) */ + + /* Flow control state (TCP interleaved transport only) */ + int upstream_paused; /* 1 = upstream recv currently suspended due to client backpressure */ } rtsp_session_t; /* Function prototypes */ @@ -287,11 +290,18 @@ int rtsp_send_keepalive(rtsp_session_t *session); int rtsp_state_machine_advance(rtsp_session_t *session); /** - * Periodic tick for RTSP session (STUN timeout, keepalive) + * Periodic tick for RTSP session (STUN timeout, keepalive, max-pause guard) * @param session RTSP session * @param now Current timestamp in milliseconds - * @return 0 on success + * @return 0 on success, -1 if session must be closed (e.g. paused too long) */ int rtsp_session_tick(rtsp_session_t *session, int64_t now); +/** + * Resume reading from upstream RTSP TCP socket after client send queue has + * drained. No-op for UDP transport mode. Called from stream_on_client_drain. + * @param session RTSP session + */ +void rtsp_resume_upstream(rtsp_session_t *session); + #endif /* __RTSP_H__ */ diff --git a/src/status.h b/src/status.h index f284a49..3e20221 100644 --- a/src/status.h +++ b/src/status.h @@ -80,7 +80,7 @@ typedef struct { uint32_t queue_buffers_highwater; /* Peak queued buffers */ uint64_t dropped_packets; /* Total dropped packets */ uint64_t dropped_bytes; /* Total dropped bytes */ - uint32_t backpressure_events; /* Times backpressure triggered */ + uint32_t backpressure_events; /* Times upstream reads were paused due to client backpressure (TCP only) */ int slow_active; } client_stats_t; diff --git a/src/stream.c b/src/stream.c index 5e54e85..d8c81d0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1,6 +1,7 @@ #include "stream.h" #include "connection.h" #include "fcc.h" +#include "http_proxy.h" #include "multicast.h" #include "rtp.h" #include "rtp_fec.h" @@ -19,12 +20,20 @@ #include #include -/* - * Process RTP payload with reordering - either forward to client (streaming) - * or capture I-frame (snapshot) - * Returns: bytes forwarded (>= 0), 1 if I-frame captured for snapshot, -1 on - * error - */ +void stream_on_client_drain(stream_context_t *ctx) { + /* Hot path: every successful client write hits this. Bail out cheaply when + * no upstream is paused (vast majority of streams). */ + if (!ctx || !ctx->conn || !ctx->conn->any_upstream_paused) + return; + if (!connection_can_resume_upstream(ctx->conn)) + return; + /* Resume functions are no-ops if not paused; no need to re-check here. */ + if (ctx->http_proxy.initialized) + http_proxy_resume_upstream(&ctx->http_proxy); + if (ctx->rtsp.initialized) + rtsp_resume_upstream(&ctx->rtsp); +} + int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref) { uint8_t *data_ptr = (uint8_t *)buf_ref->data + buf_ref->data_offset; uint8_t *payload; diff --git a/src/stream.h b/src/stream.h index b23d26b..50c0c4d 100644 --- a/src/stream.h +++ b/src/stream.h @@ -106,4 +106,17 @@ int stream_context_cleanup(stream_context_t *ctx); */ int stream_process_rtp_payload(stream_context_t *ctx, buffer_ref_t *buf_ref); +/** + * Notify that the client send queue has just been drained (some buffers + * completed sending). If any TCP-based upstream session attached to this + * connection is currently paused due to backpressure, this resumes it when + * the queue has fallen below the low watermark. + * + * Called from connection_handle_write after a successful zerocopy_send. + * The struct must be at least zero-initialized (the embedded stream context + * in connection_t is via calloc); passing uninitialized stack memory is + * unsafe — `conn` and the `*.initialized` flags are dereferenced. + */ +void stream_on_client_drain(stream_context_t *ctx); + #endif /* __STREAM_H__ */