From dc2d0ddaf66fb3dc34d74a6c3f0f97a4f395a71e Mon Sep 17 00:00:00 2001 From: aymimkay <174902388+aymimkay@users.noreply.github.com> Date: Sat, 25 Apr 2026 13:01:31 +0330 Subject: [PATCH 1/2] googlevideo fast path --- src/domain_fronter.py | 324 +++++++++++++++++++++++++++++++++++++++++- src/proxy_server.py | 17 ++- 2 files changed, 337 insertions(+), 4 deletions(-) diff --git a/src/domain_fronter.py b/src/domain_fronter.py index c2581f0..b73b2be 100644 --- a/src/domain_fronter.py +++ b/src/domain_fronter.py @@ -1247,8 +1247,330 @@ async def stream_parallel_download(self, url: str, headers: dict, path (for example no range support or the file is too small). Returns True once this method has taken ownership of the client response, even if the stream later aborts. + + YouTube/googlevideo fast path + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + googlevideo URLs carry ``clen=`` so the file size is known + before we send a single request. This lets us: + + * Skip the Content-Range probe and go straight to chunking. + * Use 4 MiB chunks (vs 512 KiB default) because each Apps Script relay + call costs ~2 s regardless of payload size. + * Cap parallelism at 4 concurrent chunks (16 MiB in-flight) to avoid + saturating the Apps Script daily quota. + * Use a robust 5-attempt / 15 s-per-attempt probe so we never race + yt-dlp's 60 s socket timeout with a single slow probe. + * Honour yt-dlp's ``Range: bytes=N-`` resume header transparently. 
""" - first_resp = await self._range_probe(url, headers, 0, chunk_size - 1) + # ── YouTube / googlevideo fast path ─────────────────────────────── + is_yt = "googlevideo.com" in url + if is_yt: + from urllib.parse import parse_qs as _pqs, urlparse as _upl + _qs = _pqs(_upl(url).query) + _clen = _qs.get("clen", [None])[0] + if _clen: + chunk_size = 4 * 1024 * 1024 # 4 MiB per relay call + max_parallel = 4 # 16 MiB in-flight max + total_size = int(_clen) + else: + is_yt = False + + # Honour yt-dlp resume: Range: bytes=N- means "start from N" + start_offset = 0 + if is_yt and headers: + for _k, _v in (headers.items() if isinstance(headers, dict) + else headers): + if _k.lower() == "range": + import re as _re + _m = _re.match(r"bytes=(\d+)-", str(_v)) + if _m: + start_offset = int(_m.group(1)) + break + # ───────────────────────────────────────────────────────────────── + + if is_yt: + # Robust probe: 5 attempts × 15 s each so we never lose a race + # with yt-dlp's 60 s socket timeout (old single _range_probe + # could take 3 × 25 s = 75 s in the worst case). 
+ _first_end = min(start_offset + chunk_size - 1, total_size - 1) + _ph = dict(headers) if isinstance(headers, dict) else dict(headers) + _ph["Range"] = f"bytes={start_offset}-{_first_end}" + _pp = self._build_payload("GET", url, _ph, b"") + first_resp = b"" + _probe_ok = False + for _att in range(5): + try: + first_resp = await asyncio.wait_for( + self._relay_payload_h1(_pp), timeout=15.0 + ) + _ps, _, _ = self._split_raw_response(first_resp) + if _ps == 206 or _ps < 500: + _probe_ok = True + break + log.warning("YT stream probe %d/5: status %d, retrying", + _att + 1, _ps) + except Exception as _pe: + log.warning("YT stream probe attempt %d/5 failed: %r, retrying", + _att + 1, _pe) + await asyncio.sleep(1.0 * (_att + 1)) + if not _probe_ok: + log.error("YT stream probe failed after 5 attempts for %s", + url[:80]) + return False + else: + first_resp = await self._range_probe(url, headers, 0, chunk_size - 1) + + status, resp_hdrs, resp_body = self._split_raw_response(first_resp) + if status != 206: + log.info( + "Streaming download fallback: initial probe returned %s for %s", + status, url[:80], + ) + return False + + if is_yt: + # We already know total_size from clen — skip Content-Range parsing. 
+ first_start = start_offset + first_err = None if resp_body else "empty probe body" + else: + parsed_range = self._parse_content_range(resp_hdrs.get("content-range", "")) + if not parsed_range: + log.info( + "Streaming download fallback: missing/invalid Content-Range for %s", + url[:80], + ) + return False + first_start, first_end, total_size = parsed_range + first_err = self._validate_range_response( + status, resp_hdrs, resp_body, first_start, first_end, total_size, + ) + if first_start != 0 or first_err: + log.info( + "Streaming download fallback: invalid first range (%s) for %s", + first_err or f"start={first_start}", + url[:80], + ) + return False + + if not is_yt: + if min_size > 0 and total_size < min_size: + log.info( + "Streaming download fallback: file too small (%d < %d) for %s", + total_size, min_size, url[:80], + ) + return False + if max_chunks > 0: + required_chunk_size = max( + chunk_size, + (total_size + max_chunks - 1) // max_chunks, + ) + if required_chunk_size != chunk_size: + log.info( + "Parallel download tuning: chunk size raised from %d KB to %d KB " + "to keep request count under %d", + chunk_size // 1024, + required_chunk_size // 1024, + max_chunks, + ) + chunk_size = required_chunk_size + + remaining = total_size - start_offset + + if remaining <= chunk_size or len(resp_body) >= remaining: + if is_yt and start_offset > 0: + _hdr = ( + f"HTTP/1.1 206 Partial Content\r\n" + f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n" + f"Content-Length: {remaining}\r\n\r\n" + ) + writer.write(_hdr.encode() + resp_body) + else: + writer.write(self._render_streaming_headers(resp_hdrs, total_size)) + writer.write(resp_body) + await writer.drain() + return True + + ranges = [] + start = start_offset + len(resp_body) + while start < total_size: + end = min(start + chunk_size - 1, total_size - 1) + ranges.append((start, end)) + start = end + 1 + + if is_yt: + log.info( + "YT stream: %s from offset %s, %d chunks of 4 MiB", + 
self._format_bytes_human(remaining), + self._format_bytes_human(start_offset), + len(ranges) + 1, + ) + else: + log.info("Parallel streaming download: %d bytes, %d chunks of %d KB", + total_size, len(ranges) + 1, chunk_size // 1024) + + temp_file = tempfile.TemporaryFile(prefix="mhrvpn_dl_") + file_lock = asyncio.Lock() + sem = asyncio.Semaphore(max_parallel) + cancel_event = asyncio.Event() + tasks: list[asyncio.Task] = [] + ready = [asyncio.Event() for _ in ranges] + errors: list[Exception | None] = [None for _ in ranges] + delivered_chunks = 1 + delivered_bytes = len(resp_body) + total_chunks = len(ranges) + 1 + last_progress_log = time.perf_counter() + t0 = time.perf_counter() + + async def _write_progress(force: bool = False) -> None: + nonlocal last_progress_log + now = time.perf_counter() + if not force and (now - last_progress_log) < 5.0: + return + elapsed = max(0.001, now - t0) + speed_bps = delivered_bytes / elapsed + log.info( + "Parallel download progress: %s [%d/%d chunks]", + self._progress_line( + elapsed=elapsed, + done=delivered_bytes, + total=remaining, + speed_bytes_per_sec=speed_bps, + ), + delivered_chunks, total_chunks, + ) + last_progress_log = now + + async def fetch_range(index: int, start_off: int, end_off: int, + max_tries: int = 3) -> None: + async with sem: + base_headers = dict(headers) if isinstance(headers, dict) \ + else dict(headers) + base_headers["Range"] = f"bytes={start_off}-{end_off}" + payload = self._build_payload("GET", url, base_headers, b"") + expected = end_off - start_off + 1 + last_err = "unknown" + try: + for attempt in range(max_tries): + if cancel_event.is_set(): + return + try: + raw = await self._relay_payload_h1(payload) + chunk_status, chunk_headers, chunk_body = \ + self._split_raw_response(raw) + if is_yt: + if chunk_status != 206: + last_err = f"unexpected status {chunk_status}" + elif len(chunk_body) == expected: + async with file_lock: + await asyncio.to_thread( + self._spool_write, temp_file, + start_off, 
chunk_body, + ) + ready[index].set() + return + else: + last_err = (f"short chunk " + f"{len(chunk_body)}/{expected} B") + else: + err = self._validate_range_response( + chunk_status, chunk_headers, chunk_body, + start_off, end_off, total_size, + ) + if err is None: + async with file_lock: + await asyncio.to_thread( + self._spool_write, temp_file, + start_off, chunk_body, + ) + ready[index].set() + return + last_err = err + except Exception as exc: + last_err = repr(exc) + if cancel_event.is_set(): + return + log.warning("Range %d-%d retry %d/%d: %s", + start_off, end_off, attempt + 1, + max_tries, last_err) + await asyncio.sleep(0.3 * (attempt + 1)) + errors[index] = RuntimeError( + f"chunk {start_off}-{end_off} failed after " + f"{max_tries} tries: {last_err}" + ) + ready[index].set() + except asyncio.CancelledError: + raise + + try: + # Send headers + first chunk immediately so the client sees data + # within ~8 s and doesn't hit its socket timeout. + if is_yt and start_offset > 0: + _hdr = ( + f"HTTP/1.1 206 Partial Content\r\n" + f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n" + f"Content-Length: {remaining}\r\n\r\n" + ) + writer.write(_hdr.encode()) + else: + writer.write(self._render_streaming_headers(resp_hdrs, total_size)) + writer.write(resp_body) + await writer.drain() + + for index, (start_off, end_off) in enumerate(ranges): + tasks.append(asyncio.create_task(fetch_range(index, start_off, end_off))) + + for index, (start_off, end_off) in enumerate(ranges): + await ready[index].wait() + if errors[index] is not None: + raise errors[index] + expected = end_off - start_off + 1 + async with file_lock: + chunk = await asyncio.to_thread( + self._spool_read, temp_file, start_off, expected, + ) + if len(chunk) != expected: + raise RuntimeError( + f"spooled chunk {start_off}-{end_off} was truncated " + f"({len(chunk)}/{expected} B)" + ) + writer.write(chunk) + await writer.drain() + delivered_chunks += 1 + delivered_bytes += len(chunk) + 
await _write_progress(force=(index == len(ranges) - 1)) + + elapsed = max(0.001, time.perf_counter() - t0) + log.info( + "Parallel streaming download complete: %s", + self._progress_line( + elapsed=elapsed, + done=remaining, + total=remaining, + speed_bytes_per_sec=remaining / elapsed, + ), + ) + return True + except (ConnectionError, BrokenPipeError, TimeoutError) as exc: + log.info("Parallel download cancelled by client: %s", exc) + cancel_event.set() + return True + except Exception as exc: + self._mark_stream_download_failure(url, str(exc)) + log.error("Parallel streaming download failed (%s): %s", url[:60], exc) + cancel_event.set() + try: + if not writer.is_closing(): + writer.close() + except Exception: + pass + return True + finally: + cancel_event.set() + for task in tasks: + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + temp_file.close() status, resp_hdrs, resp_body = self._split_raw_response(first_resp) if status != 206: diff --git a/src/proxy_server.py b/src/proxy_server.py index ff83b47..3fb2677 100644 --- a/src/proxy_server.py +++ b/src/proxy_server.py @@ -1359,15 +1359,21 @@ async def _relay_smart(self, method, url, headers, body): challenge pages. """ if method == "GET" and not body: - # Respect client's own Range header verbatim. + # Respect client's own Range header verbatim — + # EXCEPT for YouTube: stream_parallel_download handles chunking + # internally using clen, so yt-dlp's own Range header is ignored. if headers: for k in headers: if k.lower() == "range": + if "googlevideo.com" in url and "clen=" in url: + break # fall through to relay_parallel below return await self.fronter.relay( method, url, headers, body ) # Only probe with Range when the URL looks like a big file. 
- if self._is_likely_download(url, headers): + if self._is_likely_download(url, headers) or ( + "googlevideo.com" in url and "clen=" in url + ): return await self.fronter.relay_parallel( method, url, @@ -1401,9 +1407,14 @@ async def _maybe_stream_download(self, method: str, url: str, if headers: for key in headers: if key.lower() == "range": + # YouTube fast path: yt-dlp sends Range for resume, but + # stream_parallel_download handles chunking from clen. + if "googlevideo.com" in url and "clen=" in url: + break return False effective_headers = headers or {} - if not self._is_likely_download(url, effective_headers): + is_yt = "googlevideo.com" in url and "clen=" in url + if not is_yt and not self._is_likely_download(url, effective_headers): return False if not self.fronter.stream_download_allowed(url): return False From 26a24d338235335d725c58bd5f7da33fc6ee01c0 Mon Sep 17 00:00:00 2001 From: aymimkay <174902388+aymimkay@users.noreply.github.com> Date: Sat, 25 Apr 2026 23:53:37 +0330 Subject: [PATCH 2/2] googlevideo fast path: cleanup --- config.example.json | 2 + src/domain_fronter.py | 317 +++++++++--------------------------------- src/proxy_server.py | 16 ++- 3 files changed, 78 insertions(+), 257 deletions(-) diff --git a/config.example.json b/config.example.json index 0ec2456..9ef0036 100644 --- a/config.example.json +++ b/config.example.json @@ -53,6 +53,8 @@ "chunked_download_chunk_size": 524288, "chunked_download_max_parallel": 8, "chunked_download_max_chunks": 256, + "yt_chunk_size": 4194304, + "yt_max_parallel": 4, "block_hosts": [], "bypass_hosts": [ "localhost", diff --git a/src/domain_fronter.py b/src/domain_fronter.py index b73b2be..29db0fd 100644 --- a/src/domain_fronter.py +++ b/src/domain_fronter.py @@ -19,7 +19,7 @@ import tempfile import time from dataclasses import dataclass -from urllib.parse import urlparse +from urllib.parse import parse_qs, urlparse try: import certifi @@ -565,6 +565,42 @@ async def _relay_payload_h1(self, payload: dict) -> 
bytes: else: raise + async def _probe_with_retry(self, url: str, headers: dict, + start_off: int, end_off: int, + *, + max_tries: int = 5, + timeout: float = 15.0) -> bytes: + """Range probe with configurable per-attempt timeout and retry count. + + Bounds every attempt with ``timeout`` so that one hung relay call + cannot stall the caller. The worst case is still ``max_tries`` × + ``timeout`` plus back-off sleeps (about 90 s with the defaults). + Retries on exceptions and 5xx statuses; returns the last raw response (``b""`` if every attempt raised). + """ + probe_headers = dict(headers) if headers else {} + probe_headers["Range"] = f"bytes={start_off}-{end_off}" + payload = self._build_payload("GET", url, probe_headers, b"") + last_raw = b"" + for attempt in range(max_tries): + try: + last_raw = await asyncio.wait_for( + self._relay_payload_h1(payload), timeout=timeout + ) + status, _, _ = self._split_raw_response(last_raw) + if status == 206 or status < 500: + return last_raw + log.warning( + "Range probe %d-%d attempt %d/%d: status %d, retrying", + start_off, end_off, attempt + 1, max_tries, status, + ) + except Exception as exc: + log.warning( + "Range probe %d-%d attempt %d/%d failed: %r, retrying", + start_off, end_off, attempt + 1, max_tries, exc, + ) + await asyncio.sleep(1.0 * (attempt + 1)) + return last_raw + async def _range_probe(self, url: str, headers: dict, start_off: int, end_off: int, *, max_tries: int = 3) -> bytes: probe_headers = dict(headers) if headers else {} @@ -1240,7 +1276,9 @@ async def stream_parallel_download(self, url: str, headers: dict, chunk_size: int = 512 * 1024, max_parallel: int = 8, max_chunks: int = 256, - min_size: int = 0) -> bool: + min_size: int = 0, + yt_chunk_size: int = 4 * 1024 * 1024, + yt_max_parallel: int = 4) -> bool: """Stream a large range-capable download to the client incrementally.
Returns False when the target should fall back to the normal relay @@ -1251,72 +1289,43 @@ async def stream_parallel_download(self, url: str, headers: dict, YouTube/googlevideo fast path ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ googlevideo URLs carry ``clen=`` so the file size is known - before we send a single request. This lets us: - - * Skip the Content-Range probe and go straight to chunking. - * Use 4 MiB chunks (vs 512 KiB default) because each Apps Script relay - call costs ~2 s regardless of payload size. - * Cap parallelism at 4 concurrent chunks (16 MiB in-flight) to avoid - saturating the Apps Script daily quota. - * Use a robust 5-attempt / 15 s-per-attempt probe so we never race - yt-dlp's 60 s socket timeout with a single slow probe. - * Honour yt-dlp's ``Range: bytes=N-`` resume header transparently. + upfront. This lets us skip Content-Range parsing on the first response, use larger chunks + (``yt_chunk_size``, default 4 MiB) and lower parallelism + (``yt_max_parallel``, default 4) to stay within Apps Script quotas. + The ``_probe_with_retry`` helper is used instead of ``_range_probe`` so + each probe attempt is time-bounded and a hung relay call fails fast. + Resume via ``Range: bytes=N-`` (e.g. from yt-dlp) is honoured.
""" # ── YouTube / googlevideo fast path ─────────────────────────────── is_yt = "googlevideo.com" in url if is_yt: - from urllib.parse import parse_qs as _pqs, urlparse as _upl - _qs = _pqs(_upl(url).query) + _qs = parse_qs(urlparse(url).query) _clen = _qs.get("clen", [None])[0] if _clen: - chunk_size = 4 * 1024 * 1024 # 4 MiB per relay call - max_parallel = 4 # 16 MiB in-flight max + chunk_size = yt_chunk_size + max_parallel = yt_max_parallel total_size = int(_clen) else: is_yt = False - # Honour yt-dlp resume: Range: bytes=N- means "start from N" + # Honour yt-dlp resume: Range: bytes=N- means "start from byte N" start_offset = 0 if is_yt and headers: - for _k, _v in (headers.items() if isinstance(headers, dict) - else headers): + for _k, _v in headers.items(): if _k.lower() == "range": - import re as _re - _m = _re.match(r"bytes=(\d+)-", str(_v)) + _m = re.match(r"bytes=(\d+)-$", str(_v)) if _m: start_offset = int(_m.group(1)) break # ───────────────────────────────────────────────────────────────── if is_yt: - # Robust probe: 5 attempts × 15 s each so we never lose a race - # with yt-dlp's 60 s socket timeout (old single _range_probe - # could take 3 × 25 s = 75 s in the worst case). 
- _first_end = min(start_offset + chunk_size - 1, total_size - 1) - _ph = dict(headers) if isinstance(headers, dict) else dict(headers) - _ph["Range"] = f"bytes={start_offset}-{_first_end}" - _pp = self._build_payload("GET", url, _ph, b"") - first_resp = b"" - _probe_ok = False - for _att in range(5): - try: - first_resp = await asyncio.wait_for( - self._relay_payload_h1(_pp), timeout=15.0 - ) - _ps, _, _ = self._split_raw_response(first_resp) - if _ps == 206 or _ps < 500: - _probe_ok = True - break - log.warning("YT stream probe %d/5: status %d, retrying", - _att + 1, _ps) - except Exception as _pe: - log.warning("YT stream probe attempt %d/5 failed: %r, retrying", - _att + 1, _pe) - await asyncio.sleep(1.0 * (_att + 1)) - if not _probe_ok: - log.error("YT stream probe failed after 5 attempts for %s", - url[:80]) - return False + # Use _probe_with_retry: every attempt is capped at 15 s, so one hung relay call + # fails fast. Worst case is still 5 × 15 s plus back-off, which can exceed a 60 s client timeout. + _end = min(start_offset + chunk_size - 1, total_size - 1) + first_resp = await self._probe_with_retry( + url, headers, start_offset, _end, + ) else: first_resp = await self._range_probe(url, headers, 0, chunk_size - 1) @@ -1329,9 +1338,8 @@ async def stream_parallel_download(self, url: str, headers: dict, return False if is_yt: - # We already know total_size from clen — skip Content-Range parsing.
+ first_err = None if resp_body else "empty probe body" else: parsed_range = self._parse_content_range(resp_hdrs.get("content-range", "")) if not parsed_range: @@ -1399,10 +1407,11 @@ async def stream_parallel_download(self, url: str, headers: dict, if is_yt: log.info( - "YT stream: %s from offset %s, %d chunks of 4 MiB", + "YT stream: %s from offset %s, %d chunks of %s", self._format_bytes_human(remaining), self._format_bytes_human(start_offset), len(ranges) + 1, + self._format_bytes_human(chunk_size), ) else: log.info("Parallel streaming download: %d bytes, %d chunks of %d KB", @@ -1443,8 +1452,7 @@ async def _write_progress(force: bool = False) -> None: async def fetch_range(index: int, start_off: int, end_off: int, max_tries: int = 3) -> None: async with sem: - base_headers = dict(headers) if isinstance(headers, dict) \ - else dict(headers) + base_headers = dict(headers) if headers else {} base_headers["Range"] = f"bytes={start_off}-{end_off}" payload = self._build_payload("GET", url, base_headers, b"") expected = end_off - start_off + 1 @@ -1503,7 +1511,7 @@ async def fetch_range(index: int, start_off: int, end_off: int, try: # Send headers + first chunk immediately so the client sees data - # within ~8 s and doesn't hit its socket timeout. + # within ~chunk_size/bandwidth seconds and doesn't hit its timeout. 
if is_yt and start_offset > 0: _hdr = ( f"HTTP/1.1 206 Partial Content\r\n" @@ -1572,203 +1580,6 @@ async def fetch_range(index: int, start_off: int, end_off: int, await asyncio.gather(*tasks, return_exceptions=True) temp_file.close() - status, resp_hdrs, resp_body = self._split_raw_response(first_resp) - if status != 206: - log.info( - "Streaming download fallback: initial probe returned %s for %s", - status, url[:80], - ) - return False - - parsed_range = self._parse_content_range(resp_hdrs.get("content-range", "")) - if not parsed_range: - log.info( - "Streaming download fallback: missing/invalid Content-Range for %s", - url[:80], - ) - return False - first_start, first_end, total_size = parsed_range - first_err = self._validate_range_response( - status, resp_hdrs, resp_body, first_start, first_end, total_size, - ) - if first_start != 0 or first_err: - log.info( - "Streaming download fallback: invalid first range (%s) for %s", - first_err or f"start={first_start}", - url[:80], - ) - return False - if min_size > 0 and total_size < min_size: - log.info( - "Streaming download fallback: file too small (%d < %d) for %s", - total_size, min_size, url[:80], - ) - return False - if max_chunks > 0: - required_chunk_size = max( - chunk_size, - (total_size + max_chunks - 1) // max_chunks, - ) - if required_chunk_size != chunk_size: - log.info( - "Parallel download tuning: chunk size raised from %d KB to %d KB " - "to keep request count under %d", - chunk_size // 1024, - required_chunk_size // 1024, - max_chunks, - ) - chunk_size = required_chunk_size - - if total_size <= chunk_size or len(resp_body) >= total_size: - writer.write(self._render_streaming_headers(resp_hdrs, total_size)) - writer.write(resp_body) - await writer.drain() - return True - - ranges = [] - start = len(resp_body) - while start < total_size: - end = min(start + chunk_size - 1, total_size - 1) - ranges.append((start, end)) - start = end + 1 - - log.info("Parallel streaming download: %d bytes, %d chunks 
of %d KB", - total_size, len(ranges) + 1, chunk_size // 1024) - - temp_file = tempfile.TemporaryFile(prefix="mhrvpn_dl_") - file_lock = asyncio.Lock() - sem = asyncio.Semaphore(max_parallel) - cancel_event = asyncio.Event() - tasks: list[asyncio.Task] = [] - ready = [asyncio.Event() for _ in ranges] - errors: list[Exception | None] = [None for _ in ranges] - delivered_chunks = 1 - delivered_bytes = len(resp_body) - total_chunks = len(ranges) + 1 - last_progress_log = time.perf_counter() - t0 = time.perf_counter() - - async def _write_progress(force: bool = False) -> None: - nonlocal last_progress_log - now = time.perf_counter() - if not force and (now - last_progress_log) < 5.0: - return - elapsed = max(0.001, now - t0) - speed_bps = delivered_bytes / elapsed - log.info( - "Parallel download progress: %s [%d/%d chunks]", - self._progress_line( - elapsed=elapsed, - done=delivered_bytes, - total=total_size, - speed_bytes_per_sec=speed_bps, - ), - delivered_chunks, total_chunks, - ) - last_progress_log = now - - async def fetch_range(index: int, start_off: int, end_off: int, - max_tries: int = 3) -> None: - async with sem: - base_headers = dict(headers) if headers else {} - base_headers["Range"] = f"bytes={start_off}-{end_off}" - payload = self._build_payload("GET", url, base_headers, b"") - expected = end_off - start_off + 1 - last_err = "unknown" - try: - for attempt in range(max_tries): - if cancel_event.is_set(): - return - try: - raw = await self._relay_payload_h1(payload) - chunk_status, chunk_headers, chunk_body = self._split_raw_response(raw) - err = self._validate_range_response( - chunk_status, chunk_headers, chunk_body, - start_off, end_off, total_size, - ) - if err is None: - async with file_lock: - await asyncio.to_thread( - self._spool_write, temp_file, start_off, chunk_body, - ) - ready[index].set() - return - last_err = err - except Exception as exc: - last_err = repr(exc) - if cancel_event.is_set(): - return - log.warning("Range %d-%d retry %d/%d: 
%s", - start_off, end_off, attempt + 1, max_tries, last_err) - await asyncio.sleep(0.3 * (attempt + 1)) - errors[index] = RuntimeError( - f"chunk {start_off}-{end_off} failed after {max_tries} tries: {last_err}" - ) - ready[index].set() - except asyncio.CancelledError: - raise - - try: - writer.write(self._render_streaming_headers(resp_hdrs, total_size)) - writer.write(resp_body) - await writer.drain() - - for index, (start_off, end_off) in enumerate(ranges): - tasks.append(asyncio.create_task(fetch_range(index, start_off, end_off))) - - for index, (start_off, end_off) in enumerate(ranges): - await ready[index].wait() - if errors[index] is not None: - raise errors[index] - expected = end_off - start_off + 1 - async with file_lock: - chunk = await asyncio.to_thread( - self._spool_read, temp_file, start_off, expected, - ) - if len(chunk) != expected: - raise RuntimeError( - f"spooled chunk {start_off}-{end_off} was truncated " - f"({len(chunk)}/{expected} B)" - ) - writer.write(chunk) - await writer.drain() - delivered_chunks += 1 - delivered_bytes += len(chunk) - await _write_progress(force=(index == len(ranges) - 1)) - - elapsed = max(0.001, time.perf_counter() - t0) - log.info( - "Parallel streaming download complete: %s", - self._progress_line( - elapsed=elapsed, - done=total_size, - total=total_size, - speed_bytes_per_sec=total_size / elapsed, - ), - ) - return True - except (ConnectionError, BrokenPipeError, TimeoutError) as exc: - log.info("Parallel download cancelled by client: %s", exc) - cancel_event.set() - return True - except Exception as exc: - self._mark_stream_download_failure(url, str(exc)) - log.error("Parallel streaming download failed (%s): %s", url[:60], exc) - cancel_event.set() - try: - if not writer.is_closing(): - writer.close() - except Exception: - pass - return True - finally: - cancel_event.set() - for task in tasks: - task.cancel() - if tasks: - await asyncio.gather(*tasks, return_exceptions=True) - temp_file.close() - @staticmethod def 
_rewrite_206_to_200(raw: bytes) -> bytes: """Rewrite a 206 Partial Content response to 200 OK. diff --git a/src/proxy_server.py b/src/proxy_server.py index 3fb2677..3929b3a 100644 --- a/src/proxy_server.py +++ b/src/proxy_server.py @@ -212,6 +212,12 @@ def __init__(self, config: dict): self._download_max_chunks = self._cfg_int( config, "chunked_download_max_chunks", 256, minimum=1, ) + self._yt_chunk_size = self._cfg_int( + config, "yt_chunk_size", 4 * 1024 * 1024, minimum=64 * 1024, + ) + self._yt_max_parallel = self._cfg_int( + config, "yt_max_parallel", 4, minimum=1, + ) self._download_extensions, self._download_any_extension = ( self._normalize_download_extensions( config.get( @@ -1404,16 +1410,16 @@ async def _maybe_stream_download(self, method: str, url: str, writer) -> bool: if method.upper() != "GET" or body: return False + is_yt = "googlevideo.com" in url and "clen=" in url if headers: for key in headers: if key.lower() == "range": - # YouTube fast path: yt-dlp sends Range for resume, but - # stream_parallel_download handles chunking from clen. - if "googlevideo.com" in url and "clen=" in url: + # YouTube: yt-dlp sends Range for resume but we handle + # chunking internally — let it through to streaming path. + if is_yt: break return False effective_headers = headers or {} - is_yt = "googlevideo.com" in url and "clen=" in url if not is_yt and not self._is_likely_download(url, effective_headers): return False if not self.fronter.stream_download_allowed(url): @@ -1426,6 +1432,8 @@ async def _maybe_stream_download(self, method: str, url: str, max_parallel=self._download_max_parallel, max_chunks=self._download_max_chunks, min_size=self._download_min_size, + yt_chunk_size=self._yt_chunk_size, + yt_max_parallel=self._yt_max_parallel, ) # ── Plain HTTP forwarding ─────────────────────────────────────