Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
"chunked_download_chunk_size": 524288,
"chunked_download_max_parallel": 8,
"chunked_download_max_chunks": 256,
"yt_chunk_size": 4194304,
"yt_max_parallel": 4,
"block_hosts": [],
"bypass_hosts": [
"localhost",
Expand Down
261 changes: 197 additions & 64 deletions src/domain_fronter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import tempfile
import time
from dataclasses import dataclass
from urllib.parse import urlparse
from urllib.parse import parse_qs, urlparse

try:
import certifi
Expand Down Expand Up @@ -565,6 +565,42 @@ async def _relay_payload_h1(self, payload: dict) -> bytes:
else:
raise

async def _probe_with_retry(self, url: str, headers: dict,
                            start_off: int, end_off: int,
                            *,
                            max_tries: int = 5,
                            timeout: float = 15.0) -> bytes:
    """Range probe with configurable per-attempt timeout and retry count.

    Intended for cases where the default ``_range_probe`` budget
    (3 × 25 s = 75 s worst-case) is too slow — for example when the
    caller must respond before a client-side socket timeout fires.
    Retries on both exceptions and 5xx status codes; any non-5xx
    status (including 4xx) is returned immediately without retrying.

    Args:
        url: Target URL to probe.
        headers: Base request headers; copied, never mutated.
        start_off: First byte of the requested range (inclusive).
        end_off: Last byte of the requested range (inclusive).
        max_tries: Total number of attempts before giving up.
        timeout: Per-attempt budget in seconds for the relayed request.

    Returns:
        The raw HTTP response bytes of the first successful attempt, or
        the last raw response seen (possibly ``b""``) after exhausting
        all attempts.
    """
    probe_headers = dict(headers) if headers else {}
    probe_headers["Range"] = f"bytes={start_off}-{end_off}"
    payload = self._build_payload("GET", url, probe_headers, b"")
    last_raw = b""
    for attempt in range(max_tries):
        try:
            last_raw = await asyncio.wait_for(
                self._relay_payload_h1(payload), timeout=timeout
            )
            status, _, _ = self._split_raw_response(last_raw)
            # Anything below 500 (2xx success, 3xx, 4xx client error) is
            # final — only server errors are worth retrying.
            if status < 500:
                return last_raw
            log.warning(
                "Range probe %d-%d attempt %d/%d: status %d, retrying",
                start_off, end_off, attempt + 1, max_tries, status,
            )
        except Exception as exc:
            log.warning(
                "Range probe %d-%d attempt %d/%d failed: %r, retrying",
                start_off, end_off, attempt + 1, max_tries, exc,
            )
        # Linear back-off between attempts; skip the sleep after the
        # final attempt so the caller gets the failure without an extra
        # multi-second delay.
        if attempt + 1 < max_tries:
            await asyncio.sleep(1.0 * (attempt + 1))
    return last_raw

async def _range_probe(self, url: str, headers: dict, start_off: int,
end_off: int, *, max_tries: int = 3) -> bytes:
probe_headers = dict(headers) if headers else {}
Expand Down Expand Up @@ -1240,15 +1276,58 @@ async def stream_parallel_download(self, url: str, headers: dict,
chunk_size: int = 512 * 1024,
max_parallel: int = 8,
max_chunks: int = 256,
min_size: int = 0) -> bool:
min_size: int = 0,
yt_chunk_size: int = 4 * 1024 * 1024,
yt_max_parallel: int = 4) -> bool:
"""Stream a large range-capable download to the client incrementally.

Returns False when the target should fall back to the normal relay
path (for example no range support or the file is too small).
Returns True once this method has taken ownership of the client
response, even if the stream later aborts.

YouTube/googlevideo fast path
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
googlevideo URLs carry ``clen=<total_bytes>`` so the file size is known
upfront. This lets us skip the Content-Range probe, use larger chunks
(``yt_chunk_size``, default 4 MiB) and lower parallelism
(``yt_max_parallel``, default 4) to stay within Apps Script quotas.
The robust ``_probe_with_retry`` helper is used instead of
``_range_probe`` to avoid racing the client's socket timeout.
Resume via ``Range: bytes=N-`` (e.g. from yt-dlp) is honoured.
"""
first_resp = await self._range_probe(url, headers, 0, chunk_size - 1)
# ── YouTube / googlevideo fast path ───────────────────────────────
is_yt = "googlevideo.com" in url
if is_yt:
_qs = parse_qs(urlparse(url).query)
_clen = _qs.get("clen", [None])[0]
if _clen:
chunk_size = yt_chunk_size
max_parallel = yt_max_parallel
total_size = int(_clen)
else:
is_yt = False

# Honour yt-dlp resume: Range: bytes=N- means "start from byte N"
start_offset = 0
if is_yt and headers:
for _k, _v in headers.items():
if _k.lower() == "range":
_m = re.match(r"bytes=(\d+)-$", str(_v))
if _m:
start_offset = int(_m.group(1))
break
# ─────────────────────────────────────────────────────────────────

if is_yt:
# Use _probe_with_retry: 5 × 15 s budget is well within yt-dlp's
# default 60 s socket timeout; _range_probe's 3 × 25 s could race.
_end = min(start_offset + chunk_size - 1, total_size - 1)
first_resp = await self._probe_with_retry(
url, headers, start_offset, _end,
)
else:
first_resp = await self._range_probe(url, headers, 0, chunk_size - 1)

status, resp_hdrs, resp_body = self._split_raw_response(first_resp)
if status != 206:
Expand All @@ -1258,60 +1337,85 @@ async def stream_parallel_download(self, url: str, headers: dict,
)
return False

parsed_range = self._parse_content_range(resp_hdrs.get("content-range", ""))
if not parsed_range:
log.info(
"Streaming download fallback: missing/invalid Content-Range for %s",
url[:80],
)
return False
first_start, first_end, total_size = parsed_range
first_err = self._validate_range_response(
status, resp_hdrs, resp_body, first_start, first_end, total_size,
)
if first_start != 0 or first_err:
log.info(
"Streaming download fallback: invalid first range (%s) for %s",
first_err or f"start={first_start}",
url[:80],
)
return False
if min_size > 0 and total_size < min_size:
log.info(
"Streaming download fallback: file too small (%d < %d) for %s",
total_size, min_size, url[:80],
)
return False
if max_chunks > 0:
required_chunk_size = max(
chunk_size,
(total_size + max_chunks - 1) // max_chunks,
if is_yt:
# File size already known from clen — skip Content-Range parsing.
first_err = None if resp_body else "empty probe body"
else:
parsed_range = self._parse_content_range(resp_hdrs.get("content-range", ""))
if not parsed_range:
log.info(
"Streaming download fallback: missing/invalid Content-Range for %s",
url[:80],
)
return False
first_start, first_end, total_size = parsed_range
first_err = self._validate_range_response(
status, resp_hdrs, resp_body, first_start, first_end, total_size,
)
if required_chunk_size != chunk_size:
if first_start != 0 or first_err:
log.info(
"Parallel download tuning: chunk size raised from %d KB to %d KB "
"to keep request count under %d",
chunk_size // 1024,
required_chunk_size // 1024,
max_chunks,
"Streaming download fallback: invalid first range (%s) for %s",
first_err or f"start={first_start}",
url[:80],
)
chunk_size = required_chunk_size
return False

if total_size <= chunk_size or len(resp_body) >= total_size:
writer.write(self._render_streaming_headers(resp_hdrs, total_size))
writer.write(resp_body)
if not is_yt:
if min_size > 0 and total_size < min_size:
log.info(
"Streaming download fallback: file too small (%d < %d) for %s",
total_size, min_size, url[:80],
)
return False
if max_chunks > 0:
required_chunk_size = max(
chunk_size,
(total_size + max_chunks - 1) // max_chunks,
)
if required_chunk_size != chunk_size:
log.info(
"Parallel download tuning: chunk size raised from %d KB to %d KB "
"to keep request count under %d",
chunk_size // 1024,
required_chunk_size // 1024,
max_chunks,
)
chunk_size = required_chunk_size

remaining = total_size - start_offset

if remaining <= chunk_size or len(resp_body) >= remaining:
if is_yt and start_offset > 0:
_hdr = (
f"HTTP/1.1 206 Partial Content\r\n"
f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n"
f"Content-Length: {remaining}\r\n\r\n"
)
writer.write(_hdr.encode() + resp_body)
else:
writer.write(self._render_streaming_headers(resp_hdrs, total_size))
writer.write(resp_body)
await writer.drain()
return True

ranges = []
start = len(resp_body)
start = start_offset + len(resp_body)
while start < total_size:
end = min(start + chunk_size - 1, total_size - 1)
ranges.append((start, end))
start = end + 1

log.info("Parallel streaming download: %d bytes, %d chunks of %d KB",
total_size, len(ranges) + 1, chunk_size // 1024)
if is_yt:
log.info(
"YT stream: %s from offset %s, %d chunks of %s",
self._format_bytes_human(remaining),
self._format_bytes_human(start_offset),
len(ranges) + 1,
self._format_bytes_human(chunk_size),
)
else:
log.info("Parallel streaming download: %d bytes, %d chunks of %d KB",
total_size, len(ranges) + 1, chunk_size // 1024)

temp_file = tempfile.TemporaryFile(prefix="mhrvpn_dl_")
file_lock = asyncio.Lock()
Expand All @@ -1338,7 +1442,7 @@ async def _write_progress(force: bool = False) -> None:
self._progress_line(
elapsed=elapsed,
done=delivered_bytes,
total=total_size,
total=remaining,
speed_bytes_per_sec=speed_bps,
),
delivered_chunks, total_chunks,
Expand All @@ -1359,35 +1463,64 @@ async def fetch_range(index: int, start_off: int, end_off: int,
return
try:
raw = await self._relay_payload_h1(payload)
chunk_status, chunk_headers, chunk_body = self._split_raw_response(raw)
err = self._validate_range_response(
chunk_status, chunk_headers, chunk_body,
start_off, end_off, total_size,
)
if err is None:
async with file_lock:
await asyncio.to_thread(
self._spool_write, temp_file, start_off, chunk_body,
)
ready[index].set()
return
last_err = err
chunk_status, chunk_headers, chunk_body = \
self._split_raw_response(raw)
if is_yt:
if chunk_status != 206:
last_err = f"unexpected status {chunk_status}"
elif len(chunk_body) == expected:
async with file_lock:
await asyncio.to_thread(
self._spool_write, temp_file,
start_off, chunk_body,
)
ready[index].set()
return
else:
last_err = (f"short chunk "
f"{len(chunk_body)}/{expected} B")
else:
err = self._validate_range_response(
chunk_status, chunk_headers, chunk_body,
start_off, end_off, total_size,
)
if err is None:
async with file_lock:
await asyncio.to_thread(
self._spool_write, temp_file,
start_off, chunk_body,
)
ready[index].set()
return
last_err = err
except Exception as exc:
last_err = repr(exc)
if cancel_event.is_set():
return
log.warning("Range %d-%d retry %d/%d: %s",
start_off, end_off, attempt + 1, max_tries, last_err)
start_off, end_off, attempt + 1,
max_tries, last_err)
await asyncio.sleep(0.3 * (attempt + 1))
errors[index] = RuntimeError(
f"chunk {start_off}-{end_off} failed after {max_tries} tries: {last_err}"
f"chunk {start_off}-{end_off} failed after "
f"{max_tries} tries: {last_err}"
)
ready[index].set()
except asyncio.CancelledError:
raise

try:
writer.write(self._render_streaming_headers(resp_hdrs, total_size))
# Send headers + first chunk immediately so the client sees data
# within ~chunk_size/bandwidth seconds and doesn't hit its timeout.
if is_yt and start_offset > 0:
_hdr = (
f"HTTP/1.1 206 Partial Content\r\n"
f"Content-Range: bytes {start_offset}-{total_size - 1}/{total_size}\r\n"
f"Content-Length: {remaining}\r\n\r\n"
)
writer.write(_hdr.encode())
else:
writer.write(self._render_streaming_headers(resp_hdrs, total_size))
writer.write(resp_body)
await writer.drain()

Expand Down Expand Up @@ -1419,9 +1552,9 @@ async def fetch_range(index: int, start_off: int, end_off: int,
"Parallel streaming download complete: %s",
self._progress_line(
elapsed=elapsed,
done=total_size,
total=total_size,
speed_bytes_per_sec=total_size / elapsed,
done=remaining,
total=remaining,
speed_bytes_per_sec=remaining / elapsed,
),
)
return True
Expand Down
Loading