diff --git a/README.md b/README.md index 6b3c6d2e..0b0a6f2a 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,7 @@ maturin build --release ## Benchmark -Outperforms `requests`, `httpx`, `aiohttp`, and `curl_cffi`, and you can see the [benchmark](https://github.com/0x676e67/wreq-python/tree/main/bench) for details — benchmark data is for reference only and actual performance may vary based on your environment and use case. +Outperforms `requests`, `httpx`, `aiohttp`, and `curl_cffi`, and you can see the [benchmark](https://github.com/0x676e67/wreq-python/tree/main/bench) for details. The benchmark suite is driven by `pyperf`, and results are still for reference only because network and system conditions affect outcomes. ## Services diff --git a/bench/README.md b/bench/README.md index a6fc1aa9..3eea87e6 100644 --- a/bench/README.md +++ b/bench/README.md @@ -2,37 +2,6 @@ benchmark between wreq and other python http clients - -Machine ------- - -```log - ..' MacBook - ,xNMM. ---------------- - .OMMMMo OS: macOS Sequoia 15.7.1 arm64 - lMM" Host: MacBook Pro (16-inch, Nov 2023, Three Thunderbolt 4) - .;loddo:. .olloddol;. Kernel: Darwin 24.6.0 - cKMMMMMMMMMMNWMMMMMMMMMM0: Uptime: 300 days, 18 hours, 5 mins - .KMMMMMMMMMMMMMMMMMMMMMMMWd. Packages: 117 (brew), 11 (brew-cask) - XMMMMMMMMMMMMMMMMMMMMMMMX. Shell: zsh 5.9 -;MMMMMMMMMMMMMMMMMMMMMMMM: Display (Color LCD): 3456x2234 @ 120 Hz (as 1728x1117) in] -:MMMMMMMMMMMMMMMMMMMMMMMM: DE: Aqua -.MMMMMMMMMMMMMMMMMMMMMMMMX. WM: Quartz Compositor - kMMMMMMMMMMMMMMMMMMMMMMMMWd. WM Theme: Blue (Dark) - 'XMMMMMMMMMMMMMMMMMMMMMMMMMMk Font: .AppleSystemUIFont [System], Helvetica [User] - 'XMMMMMMMMMMMMMMMMMMMMMMMMK. Cursor: Fill - Black, Outline - White (32px) - kMMMMMMMMMMMMMMMMMMMMMMd Terminal: iTerm 3.6.4 - ;KMMMMMMMWXXWMMMMMMMk. 
Terminal Font: MesloLGS-NF-Regular (14pt) - "cooc*" "*coo'" CPU: Apple M3 Max (16) @ 4.06 GHz - GPU: Apple M3 Max (40) @ 1.38 GHz [Integrated] - Memory: 41.12 GiB / 128.00 GiB (32%) - Swap: Disabled - Local IP (en0): 192.168.1.172/24 - Battery: 75% [AC connected] - Power Adapter: 140W USB-C Power Adapter - Locale: en_US.UTF-8 -``` - Sync clients ------ @@ -40,7 +9,6 @@ Sync clients - requests - niquests - pycurl -- [python-tls-client](https://github.com/FlorianREGAZ/Python-Tls-Client.git) - httpx - wreq - ry @@ -64,12 +32,12 @@ All the clients run with session/client enabled. ## Run benchmark ```bash -# Install dependencies -pip install -r requirements.txt +# Install project + benchmark dependencies +pip install -e .[bench] # Start server python server.py -# Start benchmark +# Run benchmark suite python benchmark.py ``` diff --git a/bench/benchmark.py b/bench/benchmark.py index 5aba525c..e4c91478 100644 --- a/bench/benchmark.py +++ b/bench/benchmark.py @@ -1,48 +1,79 @@ -""" -HTTP Client Benchmark Tool +"""pyperf-based HTTP client benchmark suite. + +Each benchmark measures *concurrent* HTTP throughput: the reported value is +wall-clock time per request (after normalising by --http-requests), so lower +is better. req/s ≈ 1 / mean_seconds. + +Usage:: -This module provides comprehensive benchmarking for various HTTP client libraries. -Each client has dedicated test methods to eliminate runtime overhead from dynamic dispatch. 
+ python benchmark.py # 3 processes, 3 values, 1 warmup + python benchmark.py --fast # quick estimate (~1 process) + python benchmark.py --rigorous # more accurate (6 processes) + python benchmark.py -o results.json # save JSON for later comparison + python benchmark.py --http-requests=200 --http-workers=16 + python benchmark.py -v # verbose: show each value as it lands """ -import argparse +from __future__ import annotations + import asyncio -import time +import inspect +import sys +import threading +import urllib.error +import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed -from importlib.metadata import version +from dataclasses import dataclass +from importlib.metadata import PackageNotFoundError, version from io import BytesIO -from typing import Any, Dict, List, Tuple +from typing import Any, Awaitable, Callable -import pandas as pd -import pycurl +import pyperf # Import all HTTP clients +import pycurl import aiohttp import httpx import niquests import requests -import tls_client import curl_cffi import curl_cffi.requests import wreq import wreq.blocking -import uvloop import ry -from chart import plot_benchmark_multi +try: + import uvloop # type: ignore +except ImportError: + uvloop = None +else: + uvloop.install() -# Install uvloop for better async performance -uvloop.install() +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +PAYLOAD_SIZES = ("20k", "50k", "200k") -# ============================================================================= -# Helper Classes -# ============================================================================= +# --------------------------------------------------------------------------- +# Dataclasses +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class BenchmarkCase: + id: str + runner: Callable[..., None] 
-class PycurlSession: - """Wrapper for pycurl to match session interface""" +@dataclass(frozen=True) +class AsyncBenchmarkCase: + id: str + runner: Callable[[str, int], Awaitable[None]] + + +class PycurlSession: def __init__(self): self.c = pycurl.Curl() self.content = None @@ -62,626 +93,497 @@ def get(self, url): return self @property - def text(self): + def text(self) -> bytes | None: return self.content -# ============================================================================= -# Utility Functions -# ============================================================================= +# --------------------------------------------------------------------------- +# Utility helpers +# --------------------------------------------------------------------------- -def add_package_version(packages: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]: - """Add version information to package names""" - return [(f"{name} {version(name)}", cls) for name, cls in packages] +def add_package_version(packages: list[tuple[str, Any]]) -> list[tuple[str, Any]]: + results = [] + for name, target in packages: + try: + label = f"{name} {version(name)}" + except PackageNotFoundError: + label = name + results.append((label, target)) + return results -def record_test_result( - name: str, - session_type: str, - url: str, - start_time: float, - cpu_start: float, - threads: int | None = None, -) -> Dict[str, Any]: - """Record benchmark test results""" - dur = round((time.perf_counter() - start_time) * 1000, 2) # Convert to milliseconds - cpu_dur = round( - (time.process_time() - cpu_start) * 1000, 2 - ) # Convert to milliseconds +def maybe_close(resource: Any) -> None: + close = getattr(resource, "close", None) + if callable(close): + close() - result = { - "name": name, - "session": session_type, - "size": url.split("/")[-1], - "time": dur, - "cpu_time": cpu_dur, - } - if threads is not None: - result["threads"] = threads +async def maybe_aclose(resource: Any) -> None: + close = 
getattr(resource, "close", None) + if not callable(close): + return + result = close() + if inspect.isawaitable(result): + await result - return result +# --------------------------------------------------------------------------- +# Concurrency helpers +# --------------------------------------------------------------------------- -# ============================================================================= -# Sync Client Implementations - Session Mode -# ============================================================================= +def _run_concurrent_requests( + fetch_fn: Callable[[], None], count: int, workers: int +) -> None: + """Run *fetch_fn* concurrently *count* times using at most *workers* threads.""" + with ThreadPoolExecutor(max_workers=min(workers, count)) as executor: + futures = [executor.submit(fetch_fn) for _ in range(count)] + for future in as_completed(futures): + future.result() -def requests_session_test(url: str, count: int): - """Benchmark requests.Session""" - s = requests.Session() - try: - for _ in range(count): - s.get(url).content - finally: - s.close() +def run_parallel_non_session_case( + runner_fn: Callable[[str], None], url: str, count: int, workers: int +) -> None: + """Parallel non-session: each worker creates its own client.""" + _run_concurrent_requests(lambda: runner_fn(url), count, workers) -def httpx_session_test(url: str, count: int): - """Benchmark httpx.Client""" - s = httpx.Client() - try: - for _ in range(count): - s.get(url).content - finally: - s.close() +# --------------------------------------------------------------------------- +# Sync session benchmarks (shared client, concurrent requests) +# --------------------------------------------------------------------------- -def niquests_session_test(url: str, count: int): - """Benchmark niquests.Session""" - s = niquests.Session() - try: - for _ in range(count): - s.get(url).content - finally: + +def requests_session_test(url: str, count: int, workers: int) -> None: + 
with requests.Session() as s: + _run_concurrent_requests(lambda: s.get(url).content, count, workers) s.close() -def tls_client_session_test(url: str, count: int): - """Benchmark tls_client.Session""" - s = tls_client.Session() - try: - for _ in range(count): - s.get(url).content - finally: +def httpx_session_test(url: str, count: int, workers: int) -> None: + with httpx.Client() as s: + _run_concurrent_requests(lambda: s.get(url).content, count, workers) s.close() -def curl_cffi_session_test(url: str, count: int): - """Benchmark curl_cffi.requests.Session""" - s = curl_cffi.requests.Session() - try: - for _ in range(count): - s.get(url).content - finally: +def niquests_session_test(url: str, count: int, workers: int) -> None: + with niquests.Session() as s: + _run_concurrent_requests(lambda: s.get(url).content, count, workers) s.close() -def wreq_blocking_session_test(url: str, count: int): - """Benchmark wreq.blocking.Client""" - s = wreq.blocking.Client() - for _ in range(count): - s.get(url).bytes() +def curl_cffi_session_test(url: str, count: int, workers: int) -> None: + with curl_cffi.requests.Session() as s: + _run_concurrent_requests(lambda: s.get(url).content, count, workers) + s.close() -def pycurl_session_test(url: str, count: int): - """Benchmark PycurlSession""" - s = PycurlSession() - try: - for _ in range(count): - s.get(url).content - finally: +def wreq_blocking_session_test(url: str, count: int, workers: int) -> None: + with wreq.blocking.Client() as s: + _run_concurrent_requests(lambda: s.get(url).bytes(), count, workers) s.close() -def ry_blocking_session_test(url: str, count: int): - """Benchmark ry blocking Client""" - s = ry.BlockingClient() - for _ in range(count): - s.get(url).bytes() +def pycurl_session_test(url: str, count: int, workers: int) -> None: + # pycurl Curl handles are not thread-safe; use thread-local storage so each + # worker thread gets its own handle that persists across its assigned requests. 
+ _local = threading.local() + def _fetch() -> None: + if not hasattr(_local, "curl"): + _local.curl = pycurl.Curl() + buf = BytesIO() + _local.curl.setopt(pycurl.URL, url) + _local.curl.setopt(pycurl.WRITEDATA, buf) + _local.curl.perform() -# ============================================================================= -# Sync Client Implementations - Non-Session Mode -# ============================================================================= + _run_concurrent_requests(_fetch, count, workers) -def requests_non_session_test(url: str, count: int): - """Benchmark requests without session""" - for _ in range(count): - s = requests.Session() - try: - s.get(url).content - finally: - s.close() +def ry_blocking_session_test(url: str, count: int, workers: int) -> None: + s = ry.BlockingClient() + try: + _run_concurrent_requests(lambda: s.get(url).bytes(), count, workers) + finally: + maybe_close(s) -def httpx_non_session_test(url: str, count: int): - """Benchmark httpx without session""" - for _ in range(count): - s = httpx.Client() - try: - s.get(url).content - finally: - s.close() +# --------------------------------------------------------------------------- +# Sync non-session benchmarks (one request per call; parallelism is external) +# --------------------------------------------------------------------------- -def niquests_non_session_test(url: str, count: int): - """Benchmark niquests without session""" - for _ in range(count): - s = niquests.Session() - try: - s.get(url).content - finally: - s.close() +def requests_non_session_test(url: str) -> None: + with requests.get(url) as resp: + resp.content + resp.close() -def tls_client_non_session_test(url: str, count: int): - """Benchmark tls_client without session""" - for _ in range(count): - s = tls_client.Session() - try: - s.get(url).content - finally: - s.close() +def httpx_non_session_test(url: str) -> None: + resp = httpx.get(url) + _ = resp.content + resp.close() -def curl_cffi_non_session_test(url: str, 
count: int): - """Benchmark curl_cffi without session""" - for _ in range(count): - s = curl_cffi.requests.Session() - try: - s.get(url).content - finally: - s.close() +def niquests_non_session_test(url: str) -> None: + with niquests.get(url) as resp: + _ = resp.content + resp.close() -def wreq_blocking_non_session_test(url: str, count: int): - """Benchmark wreq.blocking without session""" - for _ in range(count): - s = wreq.blocking.Client() - s.get(url).bytes() +def curl_cffi_non_session_test(url: str) -> None: + resp = curl_cffi.requests.get(url) + _ = resp.content + resp.close() -def pycurl_non_session_test(url: str, count: int): - """Benchmark pycurl without session""" - for _ in range(count): - s = PycurlSession() - try: - s.get(url).content - finally: - s.close() +def wreq_blocking_non_session_test(url: str) -> None: + with wreq.blocking.get(url) as resp: + resp.bytes() + resp.close() -def ry_blocking_non_session_test(url: str, count: int): - """Benchmark ry blocking without session""" - for _ in range(count): - s = ry.BlockingClient() - s.get(url).bytes() +def pycurl_non_session_test(url: str) -> None: + s = PycurlSession() + try: + s.get(url).content + finally: + maybe_close(s) -# ============================================================================= -# Async Client Implementations - Session Mode -# ============================================================================= +def ry_blocking_non_session_test(url: str) -> None: + s = ry.BlockingClient() + try: + s.get(url).bytes() + finally: + maybe_close(s) -async def httpx_async_session_test(url: str, count: int): - """Benchmark httpx.AsyncClient with session""" - async with httpx.AsyncClient() as s: +# --------------------------------------------------------------------------- +# Async session benchmarks (shared client, asyncio.gather) +# --------------------------------------------------------------------------- - async def _fetch(): - resp = await s.get(url) - return resp.content - tasks = 
[_fetch() for _ in range(count)] - await asyncio.gather(*tasks) +async def httpx_async_session_test(url: str, count: int) -> None: + async with httpx.AsyncClient() as s: + await asyncio.gather(*[s.get(url) for _ in range(count)]) -async def aiohttp_async_session_test(url: str, count: int): - """Benchmark aiohttp.ClientSession""" +async def aiohttp_async_session_test(url: str, count: int) -> None: async with aiohttp.ClientSession() as s: - async def _fetch(): + async def _fetch() -> None: async with await s.get(url) as resp: - return await resp.read() + await resp.read() - tasks = [_fetch() for _ in range(count)] - await asyncio.gather(*tasks) + await asyncio.gather(*[_fetch() for _ in range(count)]) -async def niquests_async_session_test(url: str, count: int): - """Benchmark niquests.AsyncSession""" +async def niquests_async_session_test(url: str, count: int) -> None: s = niquests.AsyncSession() + try: - async def _fetch(): - resp = await s.get(url) - return resp.content + async def _fetch() -> None: + resp = await s.get(url) + _ = resp.content - tasks = [_fetch() for _ in range(count)] - await asyncio.gather(*tasks) + await asyncio.gather(*[_fetch() for _ in range(count)]) + finally: + await maybe_aclose(s) -async def wreq_async_session_test(url: str, count: int): - """Benchmark wreq.Client with session""" +async def wreq_async_session_test(url: str, count: int) -> None: s = wreq.Client() + try: - async def _fetch(): - resp = await s.get(url) - return await resp.bytes() + async def _fetch() -> None: + resp = await s.get(url) + await resp.bytes() - tasks = [_fetch() for _ in range(count)] - await asyncio.gather(*tasks) + await asyncio.gather(*[_fetch() for _ in range(count)]) + finally: + await maybe_aclose(s) -async def curl_cffi_async_session_test(url: str, count: int): - """Benchmark curl_cffi.requests.AsyncSession""" +async def curl_cffi_async_session_test(url: str, count: int) -> None: s = curl_cffi.requests.AsyncSession() try: - async def _fetch(): + async 
def _fetch() -> None: resp = await s.get(url) - return resp.text + _ = resp.text - tasks = [_fetch() for _ in range(count)] - await asyncio.gather(*tasks) + await asyncio.gather(*[_fetch() for _ in range(count)]) finally: await s.close() -async def ry_async_session_test(url: str, count: int): - """Benchmark ry.HttpClient""" +async def ry_async_session_test(url: str, count: int) -> None: s = ry.HttpClient() + try: - async def _fetch(): - resp = await s.get(url) - return await resp.bytes() + async def _fetch(): + resp = await s.get(url) + return await resp.bytes() - tasks = [_fetch() for _ in range(count)] - await asyncio.gather(*tasks) + await asyncio.gather(*[_fetch() for _ in range(count)]) + finally: + await maybe_aclose(s) -# ============================================================================= -# Async Client Implementations - Non-Session Mode -# ============================================================================= +# --------------------------------------------------------------------------- +# Async non-session benchmarks (one request per call; parallelism is external) +# --------------------------------------------------------------------------- -async def httpx_async_non_session_test(url: str, count: int): - """Benchmark httpx.AsyncClient without session""" - for _ in range(count): +async def httpx_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: async with httpx.AsyncClient() as s: - resp = await s.get(url) - resp.text + await s.get(url) + await asyncio.gather(*[_fetch() for _ in range(count)]) -async def aiohttp_async_non_session_test(url: str, count: int): - """Benchmark aiohttp without session""" - for _ in range(count): + +async def aiohttp_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: async with aiohttp.ClientSession() as s: async with await s.get(url) as resp: await resp.read() + await asyncio.gather(*[_fetch() for _ in range(count)]) -async def 
niquests_async_non_session_test(url: str, count: int): - """Benchmark niquests without session""" - for _ in range(count): - s = niquests.AsyncSession() - try: - resp = await s.get(url) - resp.content - finally: - await s.close() +async def niquests_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: + async with niquests.AsyncSession() as s: + resp = await s.get(url) + _ = resp.content -async def wreq_async_non_session_test(url: str, count: int): - """Benchmark wreq without session""" - for _ in range(count): - s = wreq.Client() - resp = await s.get(url) - await resp.bytes() + await asyncio.gather(*[_fetch() for _ in range(count)]) -async def curl_cffi_async_non_session_test(url: str, count: int): - """Benchmark curl_cffi without session""" - for _ in range(count): - s = curl_cffi.requests.AsyncSession() - try: +async def wreq_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: + async with wreq.Client() as s: resp = await s.get(url) - resp.text - finally: - await s.close() + await resp.bytes() + await asyncio.gather(*[_fetch() for _ in range(count)]) -async def ry_async_non_session_test(url: str, count: int): - """Benchmark ry without session""" - for _ in range(count): - s = ry.HttpClient() - resp = await s.get(url) - await resp.bytes() - - -# ============================================================================= -# Test Mappings -# ============================================================================= - -# Mapping of sync client classes to their dedicated test functions -SYNC_SESSION_TESTS = { - requests.Session: requests_session_test, - httpx.Client: httpx_session_test, - niquests.Session: niquests_session_test, - tls_client.Session: tls_client_session_test, - curl_cffi.requests.Session: curl_cffi_session_test, - wreq.blocking.Client: wreq_blocking_session_test, - PycurlSession: pycurl_session_test, - ry.BlockingClient: ry_blocking_session_test, -} - -SYNC_NON_SESSION_TESTS = { - 
requests.Session: requests_non_session_test, - httpx.Client: httpx_non_session_test, - niquests.Session: niquests_non_session_test, - tls_client.Session: tls_client_non_session_test, - curl_cffi.requests.Session: curl_cffi_non_session_test, - wreq.blocking.Client: wreq_blocking_non_session_test, - PycurlSession: pycurl_non_session_test, - ry.BlockingClient: ry_blocking_non_session_test, -} - -# Mapping of async client classes to their dedicated test functions -ASYNC_SESSION_TESTS = { - httpx.AsyncClient: httpx_async_session_test, - aiohttp.ClientSession: aiohttp_async_session_test, - niquests.AsyncSession: niquests_async_session_test, - wreq.Client: wreq_async_session_test, - curl_cffi.requests.AsyncSession: curl_cffi_async_session_test, - ry.HttpClient: ry_async_session_test, -} - -ASYNC_NON_SESSION_TESTS = { - httpx.AsyncClient: httpx_async_non_session_test, - aiohttp.ClientSession: aiohttp_async_non_session_test, - niquests.AsyncSession: niquests_async_non_session_test, - wreq.Client: wreq_async_non_session_test, - curl_cffi.requests.AsyncSession: curl_cffi_async_non_session_test, - ry.HttpClient: ry_async_non_session_test, -} - - -# ============================================================================= -# Test Runners -# ============================================================================= - - -def run_sync_tests( - packages: List[Tuple[str, Any]], url: str, requests_number: int -) -> List[Dict[str, Any]]: - """Run synchronous benchmark tests""" - results = [] - for name, session_class in packages: - # Test with session - if session_class in SYNC_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - SYNC_SESSION_TESTS[session_class](url, requests_number) - results.append( - record_test_result(name, "Sync-Session", url, start, cpu_start) - ) +async def curl_cffi_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: + async with curl_cffi.requests.AsyncSession() as s: + resp = await 
s.get(url) + _ = resp.text - # Test without session - if session_class in SYNC_NON_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - SYNC_NON_SESSION_TESTS[session_class](url, requests_number) - results.append( - record_test_result(name, "Sync-NonSession", url, start, cpu_start) - ) + await asyncio.gather(*[_fetch() for _ in range(count)]) - return results +async def ry_async_non_session_test(url: str, count: int) -> None: + async def _fetch() -> None: + s = ry.HttpClient() + try: + resp = await s.get(url) + await resp.bytes() + finally: + await maybe_aclose(s) + + await asyncio.gather(*[_fetch() for _ in range(count)]) + + +# --------------------------------------------------------------------------- +# Benchmark case builders +# --------------------------------------------------------------------------- + + +def build_sync_session_cases() -> list[BenchmarkCase]: + cases = [] + if httpx is not None: + cases.append(("httpx", httpx_session_test)) + if requests is not None: + cases.append(("requests", requests_session_test)) + if niquests is not None: + cases.append(("niquests", niquests_session_test)) + if curl_cffi.requests is not None: + cases.append(("curl_cffi", curl_cffi_session_test)) + if pycurl is not None: + cases.append(("pycurl", pycurl_session_test)) + if ry is not None: + cases.append(("ry", ry_blocking_session_test)) + if wreq.blocking is not None: + cases.append(("wreq", wreq_blocking_session_test)) + return [BenchmarkCase(name, runner) for name, runner in add_package_version(cases)] + + +def build_sync_non_session_cases() -> list[BenchmarkCase]: + cases = [] + if httpx is not None: + cases.append(("httpx", httpx_non_session_test)) + if requests is not None: + cases.append(("requests", requests_non_session_test)) + if niquests is not None: + cases.append(("niquests", niquests_non_session_test)) + if curl_cffi.requests is not None: + cases.append(("curl_cffi", curl_cffi_non_session_test)) + if pycurl is not None: + 
cases.append(("pycurl", pycurl_non_session_test)) + if ry is not None: + cases.append(("ry", ry_blocking_non_session_test)) + if wreq.blocking is not None: + cases.append(("wreq", wreq_blocking_non_session_test)) + return [BenchmarkCase(name, runner) for name, runner in add_package_version(cases)] + + +def build_async_session_cases() -> list[AsyncBenchmarkCase]: + cases = [] + if httpx is not None: + cases.append(("httpx", httpx_async_session_test)) + if niquests is not None: + cases.append(("niquests", niquests_async_session_test)) + if curl_cffi.requests is not None: + cases.append(("curl_cffi", curl_cffi_async_session_test)) + if aiohttp is not None: + cases.append(("aiohttp", aiohttp_async_session_test)) + if ry is not None: + cases.append(("ry", ry_async_session_test)) + if wreq is not None: + cases.append(("wreq", wreq_async_session_test)) + return [ + AsyncBenchmarkCase(name, runner) for name, runner in add_package_version(cases) + ] -def run_threaded_tests( - packages: List[Tuple[str, Any]], url: str, requests_number: int, threads: int -) -> List[Dict[str, Any]]: - """Run multi-threaded benchmark tests""" - results = [] - for name, session_class in packages: - # Test with session - using ThreadPoolExecutor - if session_class in SYNC_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - with ThreadPoolExecutor(threads) as executor: - futures = [ - executor.submit( - SYNC_SESSION_TESTS[session_class], - url, - requests_number // threads, - ) - for _ in range(threads) - ] - for f in as_completed(futures): - f.result() - results.append( - record_test_result( - name, "Threaded-Session", url, start, cpu_start, threads - ) - ) +def build_async_non_session_cases() -> list[AsyncBenchmarkCase]: + cases = [] + if httpx is not None: + cases.append(("httpx", httpx_async_non_session_test)) + if niquests is not None: + cases.append(("niquests", niquests_async_non_session_test)) + if curl_cffi.requests is not None: + cases.append(("curl_cffi", 
curl_cffi_async_non_session_test)) + if aiohttp is not None: + cases.append(("aiohttp", aiohttp_async_non_session_test)) + if ry is not None: + cases.append(("ry", ry_async_non_session_test)) + if wreq is not None: + cases.append(("wreq", wreq_async_non_session_test)) + return [ + AsyncBenchmarkCase(name, runner) for name, runner in add_package_version(cases) + ] - # Test without session - using ThreadPoolExecutor - if session_class in SYNC_NON_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - with ThreadPoolExecutor(threads) as executor: - futures = [ - executor.submit( - SYNC_NON_SESSION_TESTS[session_class], - url, - requests_number // threads, - ) - for _ in range(threads) - ] - for f in as_completed(futures): - f.result() - results.append( - record_test_result( - name, "Threaded-NonSession", url, start, cpu_start, threads - ) - ) - return results +SYNC_SESSION_CASES = build_sync_session_cases() +SYNC_NON_SESSION_CASES = build_sync_non_session_cases() +ASYNC_SESSION_CASES = build_async_session_cases() +ASYNC_NON_SESSION_CASES = build_async_non_session_cases() -def run_async_tests( - async_packages: List[Tuple[str, Any]], url: str, requests_number: int -) -> List[Dict[str, Any]]: - """Run asynchronous benchmark tests""" - results = [] +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- - for name, session_class in async_packages: - # Test with session - if session_class in ASYNC_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - asyncio.run(ASYNC_SESSION_TESTS[session_class](url, requests_number)) - results.append( - record_test_result(name, "Async-Session", url, start, cpu_start) - ) - # Test without session - if session_class in ASYNC_NON_SESSION_TESTS: - start = time.perf_counter() - cpu_start = time.process_time() - asyncio.run(ASYNC_NON_SESSION_TESTS[session_class](url, 
requests_number)) - results.append( - record_test_result(name, "Async-NonSession", url, start, cpu_start) +def main() -> int: + # Quick server probe before spawning any subprocesses. + if "--worker" not in sys.argv: + base_url_probe = "http://127.0.0.1:8000" + for arg in sys.argv[1:]: + if arg.startswith("--http-base-url="): + base_url_probe = arg.split("=", 1)[1] + break + probe_url = f"{base_url_probe.rstrip('/')}/20k" + try: + with urllib.request.urlopen(probe_url, timeout=2) as resp: + resp.read(1) + except (OSError, urllib.error.URLError) as exc: + print( + f"ERROR: benchmark server unavailable at {probe_url}: {exc}", + file=sys.stderr, ) - - return results - - -# ============================================================================= -# Main Execution -# ============================================================================= - - -def parse_arguments(): - """Parse command line arguments""" - parser = argparse.ArgumentParser( - description="HTTP Client Benchmark Tool", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - - parser.add_argument( - "--requests", "-r", type=int, default=400, help="Number of requests per test" + print("Start bench/server.py before running benchmarks.", file=sys.stderr) + return 1 + + # 3 processes × 3 values × 1 warmup = 9 measurements per benchmark. + # Pass --fast for a quick estimate or --rigorous for higher confidence. + runner = pyperf.Runner(processes=3, values=3, warmups=1) + runner.argparser.add_argument( + "--http-base-url", + default="http://127.0.0.1:8000", + metavar="URL", + help="Base URL for the benchmark server. 
(default: %(default)s)", ) - - parser.add_argument( - "--threads", - "-t", + runner.argparser.add_argument( + "--http-requests", type=int, - nargs="+", - default=[1, 4, 8, 16], - help="Thread counts to test (e.g., --threads 1 2 4 8)", - ) - - parser.add_argument( - "--output", - "-o", - type=str, - default="benchmark_results.csv", - help="Output CSV file name", + default=100, + metavar="N", + help="Concurrent HTTP requests per benchmark call. (default: %(default)s)", ) - - parser.add_argument( - "--chart", - "-c", - type=str, - default="benchmark_multi.png", - help="Output chart file name", - ) - - parser.add_argument( - "--base-url", - type=str, - default="http://127.0.0.1:8000", - help="Base URL for the benchmark server", + runner.argparser.add_argument( + "--http-workers", + type=int, + default=32, + metavar="N", + help="Thread pool size for sync benchmarks. (default: %(default)s)", ) - return parser.parse_args() - - -def main(): - """Main benchmark execution""" - args = parse_arguments() - - # Use command line arguments - requests_number = args.requests - thread_counts = args.threads - - print("Starting benchmark with:") - print(f" Requests per test: {requests_number}") - print(f" Thread counts: {thread_counts}") - print(f" Base URL: {args.base_url}") - print() - - # Define sync packages - sync_packages = [ - ("tls_client", tls_client.Session), - ("httpx", httpx.Client), - ("requests", requests.Session), - ("niquests", niquests.Session), - ("curl_cffi", curl_cffi.requests.Session), - ("pycurl", PycurlSession), - ("ry", ry.BlockingClient), - ("wreq", wreq.blocking.Client), - ] - - # Define async packages - async_packages = [ - ("httpx", httpx.AsyncClient), - ("niquests", niquests.AsyncSession), - ("curl_cffi", curl_cffi.requests.AsyncSession), - ("aiohttp", aiohttp.ClientSession), - ("ry", ry.HttpClient), - ("wreq", wreq.Client), - ] - - # Add version information - sync_packages = add_package_version(sync_packages) - async_packages = 
add_package_version(async_packages) - - all_results = [] - - # Run tests for different payload sizes - for size in ["20k", "50k", "200k"]: - url = f"{args.base_url}/{size}" - print(f"Testing with {size} payload...") - - # Run sync tests - all_results += run_sync_tests(sync_packages, url, requests_number) - - # Run async tests - all_results += run_async_tests(async_packages, url, requests_number) + args = runner.parse_args() + base_url: str = args.http_base_url.rstrip("/") + http_requests: int = args.http_requests + workers: int = args.http_workers + + # Register all benchmarks. pyperf assigns each a task ID in order; + # worker subprocesses run only the specific task they are assigned. + # inner_loops=http_requests normalises the reported time to per-request, + # so the reported mean is time/request and req/s = http_requests / mean. + + for size in PAYLOAD_SIZES: + for case in SYNC_NON_SESSION_CASES: + runner.bench_func( + f"sync-non-session/{size}/{case.id}", + run_parallel_non_session_case, + case.runner, + f"{base_url}/{size}", + http_requests, + workers, + inner_loops=http_requests, + ) - # Run threaded tests - for threads in thread_counts: - all_results += run_threaded_tests( - sync_packages, url, requests_number, threads + for size in PAYLOAD_SIZES: + for case in SYNC_SESSION_CASES: + runner.bench_func( + f"sync-session/{size}/{case.id}", + case.runner, + f"{base_url}/{size}", + http_requests, + workers, + inner_loops=http_requests, ) - # Save results - print(f"Saving results to {args.output}...") - df = pd.DataFrame(all_results) - df.to_csv(args.output, index=False) + for size in PAYLOAD_SIZES: + for case in ASYNC_NON_SESSION_CASES: + runner.bench_async_func( + f"async-non-session/{size}/{case.id}", + case.runner, + f"{base_url}/{size}", + http_requests, + inner_loops=http_requests, + ) - # Generate chart - print(f"Generating chart {args.chart}...") - plot_benchmark_multi(df, args.chart) + for size in PAYLOAD_SIZES: + for case in ASYNC_SESSION_CASES: + 
runner.bench_async_func( + f"async-session/{size}/{case.id}", + case.runner, + f"{base_url}/{size}", + http_requests, + inner_loops=http_requests, + ) - print("Benchmark completed!") + return 0 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/bench/chart.py b/bench/chart.py deleted file mode 100644 index 013bdc2b..00000000 --- a/bench/chart.py +++ /dev/null @@ -1,160 +0,0 @@ -import matplotlib.pyplot as plt -import numpy as np - - -def _plot_main_sessions(df, main_sessions, sizes, stat_types, filename): - """Plot main sessions (sync and async)""" - num_sessions = len(main_sessions) - # Allocate more height for each subplot to ensure sufficient spacing - subplot_height = 8 # Fixed height for each subplot - total_height = subplot_height * num_sessions + 2 # Extra 2 inches for spacing - - fig, axes = plt.subplots( - num_sessions, - 1, - figsize=(20, total_height), - constrained_layout=False, # Disable constrained_layout, use manual layout - ) - - if num_sessions == 1: - axes = [axes] - - for idx, session in enumerate(main_sessions): - ax = axes[idx] - subdf = df[df["session"] == session] - names = subdf["name"].unique() - x = np.arange(len(names)) - width = 0.12 - - max_height = 0 - - for i, size in enumerate(sizes): - for j, stat in enumerate(stat_types): - vals = [] - for name in names: - v = subdf[(subdf["name"] == name) & (subdf["size"] == size)][stat] - vals.append(v.values[0] if not v.empty else 0) - offset = (i * len(stat_types) + j) * width - rects = ax.bar(x + offset, vals, width, label=f"{stat} {size}") - ax.bar_label(rects, padding=2, fontsize=7, rotation=90) - if vals: - max_height = max(max_height, max(vals)) - - ax.set_xticks(x + (len(sizes) * len(stat_types) * width) / 2 - width / 2) - ax.set_xticklabels(names, rotation=0, ha="center", fontsize=8) - ax.set_ylabel("Time (s)") - ax.set_title(f"Benchmark | {session}", fontsize=12, fontweight="bold") - ax.legend(loc="upper left", ncol=3, prop={"size": 7}) - ax.tick_params(axis="x", 
labelsize=8) - ax.grid(True, alpha=0.3) - - if max_height > 0: - ax.set_ylim(0, max_height * 1.35) - - plt.subplots_adjust(hspace=0.5, top=0.95, bottom=0.1, left=0.08, right=0.98) - # Set explicit margins for all sides - plt.savefig(filename, format="png", dpi=150, bbox_inches="tight") - plt.close(fig) - - -def _plot_threaded_sessions(df, threaded_sessions, sizes, stat_types, filename): - """Plot threaded sessions separately""" - threaded_df = df[df["session"].str.startswith("Threaded")].copy() - thread_counts = sorted(threaded_df["threads"].unique()) - - fig2, axes2 = plt.subplots( - len(thread_counts), - 1, - figsize=(20, 10 * len(thread_counts)), - constrained_layout=False, # Disable constrained_layout, use manual layout - ) - - if len(thread_counts) == 1: - axes2 = [axes2] - - for idx, thread_count in enumerate(thread_counts): - ax = axes2[idx] - thread_df = threaded_df[threaded_df["threads"] == thread_count] - - # Get all unique session types for this thread count - thread_session_types = thread_df["session"].unique() - - names = thread_df["name"].unique() - x = np.arange(len(names)) - width = 0.08 - max_height = 0 - bar_index = 0 - - # Plot each session type - for session_type in thread_session_types: - session_df = thread_df[thread_df["session"] == session_type] - session_label = session_type.replace("Threaded-", "") - - for i, size in enumerate(sizes): - for j, stat in enumerate(stat_types): - vals = [] - for name in names: - v = session_df[ - (session_df["name"] == name) & (session_df["size"] == size) - ][stat] - vals.append(v.values[0] if not v.empty else 0) - offset = bar_index * width - rects = ax.bar( - x + offset, - vals, - width, - label=f"{session_label} {stat} {size}", - ) - ax.bar_label(rects, padding=2, fontsize=6, rotation=90) - if vals: - max_height = max(max_height, max(vals)) - bar_index += 1 - - ax.set_xticks(x + (bar_index * width) / 2 - width / 2) - ax.set_xticklabels(names, rotation=0, ha="center", fontsize=8) - ax.set_ylabel("Time (s)") 
- ax.set_title( - f"Benchmark | Threaded ({thread_count} threads)", - fontsize=12, - fontweight="bold", - ) - ax.legend(loc="upper left", ncol=4, prop={"size": 6}) - ax.tick_params(axis="x", labelsize=8) - ax.grid(True, alpha=0.3) - - if max_height > 0: - ax.set_ylim(0, max_height * 1.35) - - threaded_filename = filename.replace(".png", "_threaded.png") - plt.subplots_adjust(hspace=0.5, top=0.95, bottom=0.1, left=0.08, right=0.98) - # Set explicit margins for all sides - plt.savefig(threaded_filename, format="png", dpi=150, bbox_inches="tight") - plt.close(fig2) - - -def plot_benchmark_multi(df, filename): - """ - Draw multi-subplot, multi-group, multi-metric bar charts for time/cpu_time/different payload sizes. - Generate separate plots for sync/async and session/non-session combinations. - """ - # Keep only necessary columns - df = df[["name", "session", "threads", "size", "time", "cpu_time"]].copy() - df["threads"] = df["threads"].fillna(1).astype(int) - - # Get unique session types - existing_session_types = df["session"].unique() - - sizes = sorted(df["size"].unique(), key=lambda x: int(x.replace("k", ""))) - stat_types = ["time", "cpu_time"] - - # Separate main sessions (non-threaded) and threaded sessions - main_sessions = [s for s in existing_session_types if not s.startswith("Threaded")] - threaded_sessions = [s for s in existing_session_types if s.startswith("Threaded")] - - # Plot main sessions (sync and async) - if main_sessions: - _plot_main_sessions(df, main_sessions, sizes, stat_types, filename) - - # Plot threaded sessions separately - if threaded_sessions: - _plot_threaded_sessions(df, threaded_sessions, sizes, stat_types, filename) diff --git a/pyproject.toml b/pyproject.toml index c1042461..f33531b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,8 +48,10 @@ docs = [ "mike>=2.0.0", ] bench = [ + "pytest", + "pyperf", + "starlette", "niquests", - "tls_client", "httpx", "requests", "wreq", @@ -58,14 +60,14 @@ bench = [ "pycurl", "uvicorn", 
"starlette", - "matplotlib", - "pandas", - "seaborn", "typing_extensions", - "uvloop", + "uvloop; platform_system != 'Windows'", "ry", ] [tool.ruff] [tool.ruff.lint] ignore = ["F403", "F405"] + +[tool.pyright] +reportArgumentType = false diff --git a/python/wreq/http2.py b/python/wreq/http2.py index ed1398c4..dddba050 100644 --- a/python/wreq/http2.py +++ b/python/wreq/http2.py @@ -238,7 +238,7 @@ def __str__(self) -> str: Return a string representation of the type. """ ... - + @final class SettingsOrder: @@ -263,7 +263,7 @@ def __str__(self) -> str: Return a string representation of the type. """ ... - + class Params(TypedDict): """ @@ -400,4 +400,4 @@ def __str__(self) -> str: """ Return a string representation of the type. """ - ... \ No newline at end of file + ...