Skip to content

Recipes

Pim Feltkamp edited this page Apr 27, 2026 · 2 revisions

Recipes

Practical, copyable patterns for the cryptohopper Python SDK. Every snippet runs as-is — drop into a .py file and execute. They use only the public SDK surface, never internals.

The SDK is synchronous and built on httpx. If you need async, wrap calls in asyncio.to_thread or run them in a thread pool — there's no separate AsyncCryptohopperClient (yet).

Contents

Use the client as a context manager

The client owns an httpx.Client connection pool. with blocks close it cleanly when you're done.

import os
from cryptohopper import CryptohopperClient

with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
    me = ch.user.get()
    print(me["email"])

Outside a with block, call ch.close() explicitly when done — leaking pools holds open file descriptors.

Wait for a backtest to finish

Backtests run async on the server. create returns immediately with an ID; you poll get until status is terminal.

import time
from cryptohopper import CryptohopperClient

def run_backtest(ch: CryptohopperClient, hopper_id: int, from_date: str, to_date: str) -> dict:
    submitted = ch.backtest.create({
        "hopper_id": hopper_id,
        "start_date": from_date,
        "end_date": to_date,
    })
    bt_id = submitted["id"]

    while True:
        bt = ch.backtest.get(bt_id)
        if bt.get("status") in {"completed", "failed"}:
            return bt
        time.sleep(5)

The backtest rate bucket is separate (1 request per 2 seconds). 5-second polling stays well clear.

Find every open position across all your hoppers

with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
    for h in ch.hoppers.list():
        positions = ch.hoppers.positions(h["id"])
        for p in positions:
            print(f'{h.get("name")} (#{h["id"]}): {p.get("amount")} {p.get("coin")} @ {p.get("rate")}')

This is sequential — one request per hopper. With 50+ hoppers, see the thread-pool recipe below for parallelisation.

Detect new fills since the last poll

import time

seen: set[int | str] = set()

def poll_fills(ch: CryptohopperClient, hopper_id: int) -> None:
    for o in ch.hoppers.orders(hopper_id):
        oid = o.get("id")
        if oid is not None and oid not in seen and o.get("status") == "filled":
            seen.add(oid)
            print(f'Fill: {o["market"]} {o["type"]} {o["amount"]} @ {o["price"]}')

while True:
    poll_fills(ch, hopper_id=42)
    time.sleep(10)

For production-grade fill notifications, configure the webhooks resource — push beats poll for event delivery.

Fail fast on auth errors, retry on transient ones

The SDK auto-retries 429s. For 5xx and network errors you may want a tighter retry; auth errors should never be retried.

import time
from cryptohopper import CryptohopperClient, CryptohopperError

def with_retry(fn, max_attempts: int = 3):
    for attempt in range(max_attempts):
        try:
            return fn()
        except CryptohopperError as e:
            if e.code in {"UNAUTHORIZED", "FORBIDDEN", "NOT_FOUND", "VALIDATION_ERROR"}:
                raise
            if attempt == max_attempts - 1:
                raise
            time.sleep(0.5 * (2 ** attempt))

me = with_retry(lambda: ch.user.get())

Read your remaining backtest quota

limits = ch.backtest.limits()
print(f"Backtests remaining: {limits.get('remaining')} of {limits.get('limit')}")

For the normal and order buckets there's no explicit quota endpoint — the only signal is Retry-After on a 429 (read it via error.retry_after_ms).

Run multiple SDK calls in parallel from a thread pool

The SDK is sync but reentrant — you can share one client across threads.

from concurrent.futures import ThreadPoolExecutor

def positions_for(hopper_id: int) -> list:
    return ch.hoppers.positions(hopper_id)

with ThreadPoolExecutor(max_workers=10) as pool:
    hoppers = ch.hoppers.list()
    results = list(pool.map(lambda h: (h["id"], positions_for(h["id"])), hoppers))

for hopper_id, positions in results:
    print(hopper_id, len(positions))

Each in-flight call counts against the normal bucket (30 req/min). With many concurrent calls, expect 429s — the SDK will retry transparently.

Bring your own httpx.Client (proxies, mTLS, instrumentation)

import httpx
from cryptohopper import CryptohopperClient

custom = httpx.Client(
    proxy="http://corp-proxy:8080",
    verify="/etc/ssl/certs/corp-ca.pem",
    event_hooks={
        "request": [lambda req: print(f"-> {req.method} {req.url}")],
        "response": [lambda res: print(f"<- {res.status_code} {res.url}")],
    },
)

with CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    http_client=custom,
) as ch:
    ch.user.get()

When you pass http_client, the SDK uses it as-is — timeout and base_url settings on the SDK still apply (they're set on a per-request basis). Make sure your custom client doesn't have a conflicting global timeout.

Tighten timeouts for short-lived workers

Default timeout is 30 seconds. Inside an AWS Lambda (15s) or other short-lived worker, the default outlives your invocation, leading to confusing "function killed" errors instead of clean SDK timeouts.

ch = CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    timeout=8.0,        # ~half your function budget
    max_retries=1,      # leave headroom for one retry inside the function lifetime
)

A CryptohopperError with code == "TIMEOUT" is much easier to handle than a process kill.

Disable the SDK's built-in retry and handle 429 yourself

from cryptohopper import CryptohopperClient, CryptohopperError

ch = CryptohopperClient(
    api_key=os.environ["CRYPTOHOPPER_TOKEN"],
    max_retries=0,
)

try:
    ch.hoppers.list()
except CryptohopperError as e:
    if e.code == "RATE_LIMITED":
        print(f"Rate limited; server says wait {e.retry_after_ms}ms")
        # your custom queue / circuit breaker / etc.
    else:
        raise

Useful when you have your own queue, want exact backoff control, or are running inside something that already does retries (Celery, RQ, Airflow).

Mock the SDK in tests with pytest-httpx

The test suite uses pytest-httpx — your tests can do the same.

import os
import pytest
from cryptohopper import CryptohopperClient

@pytest.fixture
def ch():
    with CryptohopperClient(api_key="test") as client:
        yield client

def test_user_get(httpx_mock, ch):
    httpx_mock.add_response(
        method="GET",
        url="https://api.cryptohopper.com/v1/user/get",
        json={"data": {"id": 42, "email": "alice@example.com"}},
    )
    me = ch.user.get()
    assert me["id"] == 42

def test_rate_limit_retry(httpx_mock, ch):
    httpx_mock.add_response(status_code=429, headers={"Retry-After": "0"})
    httpx_mock.add_response(json={"data": {"id": 42}})
    me = ch.user.get()
    assert me["id"] == 42

The SDK pulls data out of the envelope automatically — your mock returns {"data": ...}, your assertion sees the inner value.

Debug an auth failure end-to-end

If the SDK is returning UNAUTHORIZED / FORBIDDEN on every call, this script narrows down why — most commonly: token typo, expired token, IP not on the OAuth-app allowlist, or wrong SDK version.

import os
import sys
from cryptohopper import CryptohopperClient, CryptohopperError
from cryptohopper._version import CURRENT_VERSION


def debug_auth() -> None:
    token = os.environ.get("CRYPTOHOPPER_TOKEN", "")

    # 1. SDK version sanity check. 0.4.0a1 and earlier sent the wrong auth
    #    header (Authorization: Bearer instead of access-token); 0.4.0a2+ is fixed.
    print(f"SDK version: {CURRENT_VERSION}")
    if CURRENT_VERSION == "0.4.0a1":
        print("⚠ You're on 0.4.0a1, which sent the wrong auth header. Upgrade to 0.4.0a2+.")

    # 2. Token shape sanity — Cryptohopper bearer tokens are 40 chars.
    print(f"Token length: {len(token)} (expected 40)")
    if not token:
        print("✗ CRYPTOHOPPER_TOKEN is empty. Set it from your Cryptohopper developer dashboard.")
        sys.exit(1)

    # 3. The minimal authenticated probe.
    with CryptohopperClient(api_key=token, max_retries=0) as ch:
        try:
            me = ch.user.get()
            who = me.get("username") or me.get("email") or me.get("id")
            print(f"✓ Authenticated. User: {who}")
        except CryptohopperError as e:
            print(f"✗ {e.code} (HTTP {e.status}): {e}")

            if e.ip_address:
                print(f"  Cryptohopper saw your IP as: {e.ip_address}")
                print(f"  → If your OAuth app has IP allowlisting, add this IP to the allowlist.")

            if e.code == "UNAUTHORIZED":
                print("  → Token rejected. Either it's expired, revoked, or never existed.")
                print("  → Re-issue at https://www.cryptohopper.com developer dashboard.")
            elif e.code == "FORBIDDEN":
                print("  → Token is valid but lacks the required scope, or your IP is blocked.")
                print("  → Check the OAuth app's scopes in the developer dashboard.")
            elif e.status == 405 and "Missing Authentication Token" in str(e):
                print("  → AWS API Gateway rejected the request — the SDK is sending the wrong auth header.")
                print("  → Confirm SDK version is 0.4.0a2 or later.")


if __name__ == "__main__":
    debug_auth()

Run with python debug_auth.py. The output gives you the next action in plain English. Sample output:

SDK version: 0.4.0a2
Token length: 40 (expected 40)
✓ Authenticated. User: alice

Or, on IP-allowlist failure:

SDK version: 0.4.0a2
Token length: 40 (expected 40)
✗ FORBIDDEN (HTTP 403): IP address mismatch
  Cryptohopper saw your IP as: 203.0.113.42
  → If your OAuth app has IP allowlisting, add this IP to the allowlist.

Save it as debug_auth.py once you have a setup that needs to be re-checked periodically.

See also