diff --git a/config.example.json b/config.example.json index 0ec2456..9ef0036 100644 --- a/config.example.json +++ b/config.example.json @@ -53,6 +53,8 @@ "chunked_download_chunk_size": 524288, "chunked_download_max_parallel": 8, "chunked_download_max_chunks": 256, + "yt_chunk_size": 4194304, + "yt_max_parallel": 4, "block_hosts": [], "bypass_hosts": [ "localhost", diff --git a/src/domain_fronter.py b/src/domain_fronter.py index c2581f0..29db0fd 100644 --- a/src/domain_fronter.py +++ b/src/domain_fronter.py @@ -19,7 +19,7 @@ import tempfile import time from dataclasses import dataclass -from urllib.parse import urlparse +from urllib.parse import parse_qs, urlparse try: import certifi @@ -565,6 +565,42 @@ async def _relay_payload_h1(self, payload: dict) -> bytes: else: raise + async def _probe_with_retry(self, url: str, headers: dict, + start_off: int, end_off: int, + *, + max_tries: int = 5, + timeout: float = 15.0) -> bytes: + """Range probe with configurable per-attempt timeout and retry count. + + Intended for cases where the default ``_range_probe`` budget + (3 × 25 s = 75 s worst-case) is too slow — for example when the + caller must respond before a client-side socket timeout fires. + Retries on both exceptions and 5xx status codes. + """ + probe_headers = dict(headers) if headers else {} + probe_headers["Range"] = f"bytes={start_off}-{end_off}" + payload = self._build_payload("GET", url, probe_headers, b"") + last_raw = b"" + for attempt in range(max_tries): + try: + last_raw = await asyncio.wait_for( + self._relay_payload_h1(payload), timeout=timeout + ) + status, _, _ = self._split_raw_response(last_raw) + if status == 206 or status < 500: + return last_raw + log.warning( + "Range probe %d-%d attempt %d/%d: status %d, retrying", + start_off, end_off, attempt + 1, max_tries, status, + ) + except Exception as exc: + log.warning( + "Range probe %d-%d attempt %d/%d failed: %r, retrying", + start_off, end_off, attempt + 1, max_tries, exc, + ) + await asyncio.sleep(1.0 * (attempt + 1)) + return last_raw + async def _range_probe(self, url: str, headers: dict, start_off: int, end_off: int, *, max_tries: int = 3) -> bytes: probe_headers = dict(headers) if headers else {} @@ -1240,15 +1276,58 @@ async def stream_parallel_download(self, url: str, headers: dict, chunk_size: int = 512 * 1024, max_parallel: int = 8, max_chunks: int = 256, - min_size: int = 0) -> bool: + min_size: int = 0, + yt_chunk_size: int = 4 * 1024 * 1024, + yt_max_parallel: int = 4) -> bool: """Stream a large range-capable download to the client incrementally. Returns False when the target should fall back to the normal relay path (for example no range support or the file is too small). Returns True once this method has taken ownership of the client response, even if the stream later aborts. + + YouTube/googlevideo fast path + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + googlevideo URLs carry ``clen=`` so the file size is known + upfront. This lets us skip the Content-Range probe, use larger chunks + (``yt_chunk_size``, default 4 MiB) and lower parallelism + (``yt_max_parallel``, default 4) to stay within Apps Script quotas. + The robust ``_probe_with_retry`` helper is used instead of + ``_range_probe`` to avoid racing the client's socket timeout. + Resume via ``Range: bytes=N-`` (e.g. from yt-dlp) is honoured. """ - first_resp = await self._range_probe(url, headers, 0, chunk_size - 1) + # ── YouTube / googlevideo fast path ─────────────────────────────── + is_yt = "googlevideo.com" in url + if is_yt: + _qs = parse_qs(urlparse(url).query) + _clen = _qs.get("clen", [None])[0] + if _clen: + chunk_size = yt_chunk_size + max_parallel = yt_max_parallel + total_size = int(_clen) + else: + is_yt = False + + # Honour yt-dlp resume: Range: bytes=N- means "start from byte N" + start_offset = 0 + if is_yt and headers: + for _k, _v in headers.items(): + if _k.lower() == "range": + _m = re.match(r"bytes=(\d+)-$", str(_v)) + if _m: + start_offset = int(_m.group(1)) + break + # ───────────────────────────────────────────────────────────────── + + if is_yt: + # Use _probe_with_retry: 5 × 15 s budget is well within yt-dlp's + # default 60 s socket timeout; _range_probe's 3 × 25 s could race. + _end = min(start_offset + chunk_size - 1, total_size - 1) + first_resp = await self._probe_with_retry( + url, headers, start_offset, _end, + ) + else: + first_resp = await self._range_probe(url, headers, 0, chunk_size - 1) status, resp_hdrs, resp_body = self._split_raw_response(first_resp) if status != 206: @@ -1258,60 +1337,85 @@ async def stream_parallel_download(self, url: str, headers: dict, ) return False - parsed_range = self._parse_content_range(resp_hdrs.get("content-range", "")) - if not parsed_range: - log.info( - "Streaming download fallback: missing/invalid Content-Range for %s", - url[:80], - ) - return False - first_start, first_end, total_size = parsed_range - first_err = self._validate_range_response( - status, resp_hdrs, resp_body, first_start, first_end, total_size, - ) - if first_start != 0 or first_err: - log.info( - "Streaming download fallback: invalid first range (%s) for %s", - first_err or f"start={first_start}", - url[:80], - ) - return False - if min_size > 0 and total_size < min_size: - log.info( - "Streaming download fallback: file too small (%d < %d) for %s", - total_size, min_size, url[:80], - ) - return False - if max_chunks > 0: - required_chunk_size = max( - chunk_size, - (total_size + max_chunks - 1) // max_chunks, + if is_yt: + # File size already known from clen — skip Content-Range parsing. + first_err = None if resp_body else "empty probe body" + else: + parsed_range = self._parse_content_range(resp_hdrs.get("content-range", "")) + if not parsed_range: + log.info( + "Streaming download fallback: missing/invalid Content-Range for %s", + url[:80], + ) + return False + first_start, first_end, total_size = parsed_range + first_err = self._validate_range_response( + status, resp_hdrs, resp_body, first_start, first_end, total_size, ) - if required_chunk_size != chunk_size: + if first_start != 0 or first_err: log.info( - "Parallel download tuning: chunk size raised from %d KB to %d KB " - "to keep request count under %d", - chunk_size // 1024, - required_chunk_size // 1024, - max_chunks, + "Streaming download fallback: invalid first range (%s) for %s", + first_err or f"start={first_start}", + url[:80], ) - chunk_size = required_chunk_size + return False - if total_size <= chunk_size or len(resp_body) >= total_size: - writer.write(self._render_streaming_headers(resp_hdrs, total_size)) - writer.write(resp_body) + if not is_yt: + if min_size > 0 and total_size < min_size: + log.info( + "Streaming download fallback: file too small (%d < %d) for %s", + total_size, min_size, url[:80], + ) + return False + if max_chunks > 0: + required_chunk_size = max( + chunk_size, + (total_size + max_chunks - 1) // max_chunks, + ) + if required_chunk_size != chunk_size: + log.info( + "Parallel download tuning: chunk size raised from %d KB to %d KB " + "to keep request count under %d", + chunk_size // 1024, + required_chunk_size // 1024, + max_chunks, + ) + chunk_size = required_chunk_size + + remaining = total_size - start_offset + + if remaining <= chunk_size or len(resp_body) >= remaining: + if is_yt and start_offset > 0: + _hdr = ( + f"HTTP/1.1 206 Partial Content\r\n" + f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n" + f"Content-Length: {remaining}\r\n\r\n" + ) + writer.write(_hdr.encode() + resp_body) + else: + writer.write(self._render_streaming_headers(resp_hdrs, total_size)) + writer.write(resp_body) await writer.drain() return True ranges = [] - start = len(resp_body) + start = start_offset + len(resp_body) while start < total_size: end = min(start + chunk_size - 1, total_size - 1) ranges.append((start, end)) start = end + 1 - log.info("Parallel streaming download: %d bytes, %d chunks of %d KB", - total_size, len(ranges) + 1, chunk_size // 1024) + if is_yt: + log.info( + "YT stream: %s from offset %s, %d chunks of %s", + self._format_bytes_human(remaining), + self._format_bytes_human(start_offset), + len(ranges) + 1, + self._format_bytes_human(chunk_size), + ) + else: + log.info("Parallel streaming download: %d bytes, %d chunks of %d KB", + total_size, len(ranges) + 1, chunk_size // 1024) temp_file = tempfile.TemporaryFile(prefix="mhrvpn_dl_") file_lock = asyncio.Lock() @@ -1338,7 +1442,7 @@ async def _write_progress(force: bool = False) -> None: self._progress_line( elapsed=elapsed, done=delivered_bytes, - total=total_size, + total=remaining, speed_bytes_per_sec=speed_bps, ), delivered_chunks, total_chunks, @@ -1359,35 +1463,64 @@ async def fetch_range(index: int, start_off: int, end_off: int, return try: raw = await self._relay_payload_h1(payload) - chunk_status, chunk_headers, chunk_body = self._split_raw_response(raw) - err = self._validate_range_response( - chunk_status, chunk_headers, chunk_body, - start_off, end_off, total_size, - ) - if err is None: - async with file_lock: - await asyncio.to_thread( - self._spool_write, temp_file, start_off, chunk_body, - ) - ready[index].set() - return - last_err = err + chunk_status, chunk_headers, chunk_body = \ + self._split_raw_response(raw) + if is_yt: + if chunk_status != 206: + last_err = f"unexpected status {chunk_status}" + elif len(chunk_body) == expected: + async with file_lock: + await asyncio.to_thread( + self._spool_write, temp_file, + start_off, chunk_body, + ) + ready[index].set() + return + else: + last_err = (f"short chunk " + f"{len(chunk_body)}/{expected} B") + else: + err = self._validate_range_response( + chunk_status, chunk_headers, chunk_body, + start_off, end_off, total_size, + ) + if err is None: + async with file_lock: + await asyncio.to_thread( + self._spool_write, temp_file, + start_off, chunk_body, + ) + ready[index].set() + return + last_err = err except Exception as exc: last_err = repr(exc) if cancel_event.is_set(): return log.warning("Range %d-%d retry %d/%d: %s", - start_off, end_off, attempt + 1, max_tries, last_err) + start_off, end_off, attempt + 1, + max_tries, last_err) await asyncio.sleep(0.3 * (attempt + 1)) errors[index] = RuntimeError( - f"chunk {start_off}-{end_off} failed after {max_tries} tries: {last_err}" + f"chunk {start_off}-{end_off} failed after " + f"{max_tries} tries: {last_err}" ) ready[index].set() except asyncio.CancelledError: raise try: - writer.write(self._render_streaming_headers(resp_hdrs, total_size)) + # Send headers + first chunk immediately so the client sees data + # within ~chunk_size/bandwidth seconds and doesn't hit its timeout. + if is_yt and start_offset > 0: + _hdr = ( + f"HTTP/1.1 206 Partial Content\r\n" + f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n" + f"Content-Length: {remaining}\r\n\r\n" + ) + writer.write(_hdr.encode()) + else: + writer.write(self._render_streaming_headers(resp_hdrs, total_size)) writer.write(resp_body) await writer.drain() @@ -1419,9 +1552,9 @@ async def fetch_range(index: int, start_off: int, end_off: int, "Parallel streaming download complete: %s", self._progress_line( elapsed=elapsed, - done=total_size, - total=total_size, - speed_bytes_per_sec=total_size / elapsed, + done=remaining, + total=remaining, + speed_bytes_per_sec=remaining / elapsed, ), ) return True diff --git a/src/proxy_server.py b/src/proxy_server.py index ff83b47..3929b3a 100644 --- a/src/proxy_server.py +++ b/src/proxy_server.py @@ -212,6 +212,12 @@ def __init__(self, config: dict): self._download_max_chunks = self._cfg_int( config, "chunked_download_max_chunks", 256, minimum=1, ) + self._yt_chunk_size = self._cfg_int( + config, "yt_chunk_size", 4 * 1024 * 1024, minimum=64 * 1024, + ) + self._yt_max_parallel = self._cfg_int( + config, "yt_max_parallel", 4, minimum=1, + ) self._download_extensions, self._download_any_extension = ( self._normalize_download_extensions( config.get( @@ -1359,15 +1365,21 @@ async def _relay_smart(self, method, url, headers, body): challenge pages. """ if method == "GET" and not body: - # Respect client's own Range header verbatim. + # Respect client's own Range header verbatim — + # EXCEPT for YouTube: stream_parallel_download handles chunking + # internally using clen, so yt-dlp's own Range header is ignored. if headers: for k in headers: if k.lower() == "range": + if "googlevideo.com" in url and "clen=" in url: + break # fall through to relay_parallel below return await self.fronter.relay( method, url, headers, body ) # Only probe with Range when the URL looks like a big file. - if self._is_likely_download(url, headers): + if self._is_likely_download(url, headers) or ( + "googlevideo.com" in url and "clen=" in url + ): return await self.fronter.relay_parallel( method, url, @@ -1398,12 +1410,17 @@ async def _maybe_stream_download(self, method: str, url: str, writer) -> bool: if method.upper() != "GET" or body: return False + is_yt = "googlevideo.com" in url and "clen=" in url if headers: for key in headers: if key.lower() == "range": + # YouTube: yt-dlp sends Range for resume but we handle + # chunking internally — let it through to streaming path. + if is_yt: + break return False effective_headers = headers or {} - if not self._is_likely_download(url, effective_headers): + if not is_yt and not self._is_likely_download(url, effective_headers): return False if not self.fronter.stream_download_allowed(url): return False @@ -1415,6 +1432,8 @@ async def _maybe_stream_download(self, method: str, url: str, max_parallel=self._download_max_parallel, max_chunks=self._download_max_chunks, min_size=self._download_min_size, + yt_chunk_size=self._yt_chunk_size, + yt_max_parallel=self._yt_max_parallel, ) # ── Plain HTTP forwarding ─────────────────────────────────────