Recipes
Practical, copyable patterns for the cryptohopper Python SDK. Every snippet runs as-is — drop into a .py file and execute. They use only the public SDK surface, never internals.
The SDK is synchronous and built on httpx. If you need async, wrap calls in asyncio.to_thread or run them in a thread pool — there's no separate AsyncCryptohopperClient (yet).
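A minimal sketch of the asyncio.to_thread approach. The blocking_user_get function is a hypothetical stand-in for a synchronous SDK call such as ch.user.get(); it is not part of the SDK.

```python
import asyncio
import time


def blocking_user_get() -> dict:
    # Hypothetical stand-in for a synchronous SDK call such as ch.user.get().
    time.sleep(0.05)
    return {"id": 42}


async def fetch_user() -> dict:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free while it waits.
    return await asyncio.to_thread(blocking_user_get)


async def fetch_many(n: int) -> list:
    # gather runs the wrapped calls concurrently rather than one by one.
    return await asyncio.gather(*(fetch_user() for _ in range(n)))
```

From synchronous code, asyncio.run(fetch_many(3)) drives the coroutine. asyncio.to_thread needs Python 3.9 or later.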
- Use the client as a context manager
- Wait for a backtest to finish
- Find every open position across all your hoppers
- Detect new fills since the last poll
- Fail fast on auth errors, retry on transient ones
- Read your remaining backtest quota
- Run multiple SDK calls in parallel from a thread pool
- Bring your own httpx.Client (proxies, mTLS, instrumentation)
- Tighten timeouts for short-lived workers
- Disable the SDK's built-in retry and handle 429 yourself
- Mock the SDK in tests with pytest-httpx
- Debug an auth failure end-to-end
Use the client as a context manager

The client owns an httpx.Client connection pool. A with block closes it cleanly when you're done.

    import os

    from cryptohopper import CryptohopperClient

    with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
        me = ch.user.get()
        print(me["email"])

Outside a with block, call ch.close() explicitly when done; a leaked pool holds file descriptors open.
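When a with block doesn't fit (for example, a client stored on a long-lived object), a try/finally gives the same guarantee. The sketch below uses a hypothetical FakeClient that exposes only close(), so it runs without the SDK installed.

```python
class FakeClient:
    """Hypothetical stand-in exposing only close(), like the real client."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def use_without_with(client: FakeClient) -> None:
    try:
        # ... make calls on the client here ...
        raise RuntimeError("simulated failure mid-work")
    finally:
        # Runs on success and on error, so the pool is always released.
        client.close()


client = FakeClient()
try:
    use_without_with(client)
except RuntimeError:
    pass
print(client.closed)  # True
```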
Wait for a backtest to finish

Backtests run asynchronously on the server. create returns immediately with an ID; you poll get until the status is terminal.

    import time

    from cryptohopper import CryptohopperClient

    def run_backtest(ch: CryptohopperClient, hopper_id: int, from_date: str, to_date: str) -> dict:
        # Submit the job; the response carries the backtest ID.
        submitted = ch.backtest.create({
            "hopper_id": hopper_id,
            "start_date": from_date,
            "end_date": to_date,
        })
        bt_id = submitted["id"]
        # Poll until the server reports a terminal status.
        while True:
            bt = ch.backtest.get(bt_id)
            if bt.get("status") in {"completed", "failed"}:
                return bt
            time.sleep(5)

The backtest rate bucket is separate (1 request per 2 seconds). Polling every 5 seconds stays well clear of it.
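The loop above waits forever if a backtest never reaches a terminal status. One possible variant adds a deadline. This is a sketch: poll_until_terminal and its 600-second default are assumptions, not SDK features, and fetch stands in for something like lambda: ch.backtest.get(bt_id).

```python
import time
from typing import Callable


def poll_until_terminal(
    fetch: Callable[[], dict],
    interval_s: float = 5.0,
    timeout_s: float = 600.0,  # assumed default, pick one that suits your backtests
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    deadline = clock() + timeout_s
    while True:
        result = fetch()
        if result.get("status") in {"completed", "failed"}:
            return result
        # Give up if the next poll would land past the deadline.
        if clock() + interval_s > deadline:
            raise TimeoutError(f"not terminal after {timeout_s}s")
        sleep(interval_s)
```

Passing sleep and clock as parameters keeps the helper testable without real waiting.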
Find every open position across all your hoppers

    import os

    from cryptohopper import CryptohopperClient

    with CryptohopperClient(api_key=os.environ["CRYPTOHOPPER_TOKEN"]) as ch:
        for h in ch.hoppers.list():
            # One positions request per hopper.
            positions = ch.hoppers.positions(h["id"])
            for p in positions:
                print(f'{h.get("name")} (#{h["id"]}): {p.get("amount")} {p.get("coin")} @ {p.get("rate")}')

This is sequential: one request per hopper. With 50+ hoppers, see the thread-pool recipe below for parallelisation.
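Once positions are collected, a per-coin total is often more useful than a flat printout. The sketch below works on plain dicts with the coin and amount keys used above; the sample data is made up, and treating amounts as strings that need float() is an assumption.

```python
from collections import defaultdict


def totals_by_coin(positions_by_hopper: dict) -> dict:
    totals = defaultdict(float)
    for positions in positions_by_hopper.values():
        for p in positions:
            # float() because amounts may arrive as strings (an assumption here).
            totals[p.get("coin", "?")] += float(p.get("amount", 0))
    return dict(totals)


# Made-up sample: hopper ID -> list of position dicts.
sample = {
    1: [{"coin": "BTC", "amount": "0.5"}, {"coin": "ETH", "amount": "2"}],
    2: [{"coin": "BTC", "amount": "0.25"}],
}
print(totals_by_coin(sample))  # {'BTC': 0.75, 'ETH': 2.0}
```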
Detect new fills since the last poll

    import time

    from cryptohopper import CryptohopperClient

    # IDs of fills already reported, so each fill prints once.
    seen: set[int | str] = set()

    def poll_fills(ch: CryptohopperClient, hopper_id: int) -> None:
        for o in ch.hoppers.orders(hopper_id):
            oid = o.get("id")
            if oid is not None and oid not in seen and o.get("status") == "filled":
                seen.add(oid)
                print(f'Fill: {o["market"]} {o["type"]} {o["amount"]} @ {o["price"]}')

    # ch is an open CryptohopperClient (see the context-manager recipe).
    while True:
        poll_fills(ch, hopper_id=42)
        time.sleep(10)

For production-grade fill notifications, configure the webhooks resource; push beats poll for event delivery.
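Separating detection from printing makes the dedupe logic easy to test and reuse. A sketch, assuming orders are plain dicts with the id and status keys used above; new_fills is a hypothetical helper name.

```python
def new_fills(orders: list, seen: set) -> list:
    """Return filled orders not reported before, recording their IDs in seen."""
    fresh = []
    for o in orders:
        oid = o.get("id")
        if oid is None or oid in seen or o.get("status") != "filled":
            continue
        seen.add(oid)
        fresh.append(o)
    return fresh


seen_ids: set = set()
first = new_fills([{"id": 1, "status": "filled"}, {"id": 2, "status": "open"}], seen_ids)
second = new_fills([{"id": 1, "status": "filled"}, {"id": 2, "status": "filled"}], seen_ids)
print([o["id"] for o in first], [o["id"] for o in second])  # [1] [2]
```

Note that seen grows without bound in a long-running process; persisting or trimming it is left to the caller.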
Fail fast on auth errors, retry on transient ones

The SDK auto-retries 429s. For 5xx and network errors you may want a tighter retry; auth errors should never be retried.

    import time

    from cryptohopper import CryptohopperClient, CryptohopperError

    def with_retry(fn, max_attempts: int = 3):
        for attempt in range(max_attempts):
            try:
                return fn()
            except CryptohopperError as e:
                # Non-retryable: the same request would fail the same way again.
                if e.code in {"UNAUTHORIZED", "FORBIDDEN", "NOT_FOUND", "VALIDATION_ERROR"}:
                    raise
                # Out of attempts: surface the last error.
                if attempt == max_attempts - 1:
                    raise
                # Exponential backoff: 0.5s, 1s, 2s, ...
                time.sleep(0.5 * (2 ** attempt))

    # ch is an open CryptohopperClient.
    me = with_retry(lambda: ch.user.get())

Read your remaining backtest quota

    limits = ch.backtest.limits()
    print(f"Backtests remaining: {limits.get('remaining')} of {limits.get('limit')}")

For the normal and order buckets there's no explicit quota endpoint; the only signal is Retry-After on a 429 (read it via error.retry_after_ms).
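A small guard can check the quota before submitting a batch of backtests. This sketch assumes the limits payload is a dict with the remaining key read above; backtests_left and can_submit are hypothetical helper names.

```python
def backtests_left(limits: dict) -> int:
    # Treat a missing or null "remaining" as zero rather than guessing.
    remaining = limits.get("remaining")
    return int(remaining) if remaining is not None else 0


def can_submit(limits: dict, needed: int = 1) -> bool:
    return backtests_left(limits) >= needed


print(can_submit({"remaining": 3, "limit": 10}, needed=2))  # True
print(can_submit({"limit": 10}))  # False
```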
Run multiple SDK calls in parallel from a thread pool

The SDK is synchronous but reentrant, so you can share one client across threads.

    from concurrent.futures import ThreadPoolExecutor

    # ch is an open CryptohopperClient shared by all worker threads.
    def positions_for(hopper_id: int) -> list:
        return ch.hoppers.positions(hopper_id)

    with ThreadPoolExecutor(max_workers=10) as pool:
        hoppers = ch.hoppers.list()
        results = list(pool.map(lambda h: (h["id"], positions_for(h["id"])), hoppers))

    for hopper_id, positions in results:
        print(hopper_id, len(positions))

Each in-flight call counts against the normal bucket (30 req/min). With many concurrent calls, expect 429s; the SDK will retry transparently.
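With pool.map, the first exception surfaces when its result is read and the remaining results are lost. A possible alternative collects successes and failures separately using submit and as_completed. This is a sketch: fetch_all is a hypothetical helper, and fetch stands in for a call such as ch.hoppers.positions.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def fetch_all(ids: list, fetch: Callable[[int], list], max_workers: int = 5):
    results: dict = {}
    errors: dict = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the ID it was submitted for.
        futures = {pool.submit(fetch, i): i for i in ids}
        for fut in as_completed(futures):
            i = futures[fut]
            try:
                results[i] = fut.result()
            except Exception as exc:
                # Keep going; one failed hopper should not hide the rest.
                errors[i] = exc
    return results, errors
```

A smaller max_workers also keeps fewer requests in flight against the rate bucket.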
Bring your own httpx.Client (proxies, mTLS, instrumentation)

    import os

    import httpx

    from cryptohopper import CryptohopperClient

    custom = httpx.Client(
        proxy="http://corp-proxy:8080",
        verify="/etc/ssl/certs/corp-ca.pem",
        # Log every request and response as it passes through.
        event_hooks={
            "request": [lambda req: print(f"-> {req.method} {req.url}")],
            "response": [lambda res: print(f"<- {res.status_code} {res.url}")],
        },
    )

    with CryptohopperClient(
        api_key=os.environ["CRYPTOHOPPER_TOKEN"],
        http_client=custom,
    ) as ch:
        ch.user.get()

When you pass http_client, the SDK uses it as-is. The SDK's timeout and base_url settings still apply because they are set per request, so make sure your custom client doesn't carry a conflicting global timeout.
Tighten timeouts for short-lived workers

The default timeout is 30 seconds. Inside an AWS Lambda with a 15-second limit, or any other short-lived worker, the default outlives your invocation, leading to confusing "function killed" errors instead of clean SDK timeouts.

    import os

    from cryptohopper import CryptohopperClient

    ch = CryptohopperClient(
        api_key=os.environ["CRYPTOHOPPER_TOKEN"],
        timeout=8.0,    # ~half your function budget
        max_retries=1,  # leave headroom for one retry inside the function lifetime
    )

A CryptohopperError with code == "TIMEOUT" is much easier to handle than a process kill.
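The per-attempt timeout can be derived from the function budget rather than guessed. The sketch below assumes the worst case is every attempt running to its full timeout, so total time is roughly timeout × (max_retries + 1) plus any backoff waits; how the SDK actually spaces its retries is not covered here. Under that assumption, timeout=8.0 with one retry could reach about 16 seconds, so a 15-second budget may want a slightly smaller value. per_attempt_timeout is a hypothetical helper.

```python
def per_attempt_timeout(
    budget_s: float,
    max_retries: int,
    backoff_total_s: float = 0.0,  # assumed total wait between attempts
    margin_s: float = 1.0,         # time reserved for your own code
) -> float:
    attempts = max_retries + 1
    usable = budget_s - margin_s - backoff_total_s
    if usable <= 0:
        raise ValueError("budget too small for the margin and backoff")
    return usable / attempts


print(per_attempt_timeout(15.0, max_retries=1))  # 7.0
print(per_attempt_timeout(15.0, max_retries=0))  # 14.0
```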
Disable the SDK's built-in retry and handle 429 yourself

    import os

    from cryptohopper import CryptohopperClient, CryptohopperError

    ch = CryptohopperClient(
        api_key=os.environ["CRYPTOHOPPER_TOKEN"],
        max_retries=0,  # the SDK raises on the first 429 instead of retrying
    )

    try:
        ch.hoppers.list()
    except CryptohopperError as e:
        if e.code == "RATE_LIMITED":
            print(f"Rate limited; server says wait {e.retry_after_ms}ms")
            # your custom queue / circuit breaker / etc.
        else:
            raise

Useful when you have your own queue, want exact backoff control, or are running inside something that already does retries (Celery, RQ, Airflow).
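One way to fill in the custom-handling branch is a loop that honours the server's hint. This sketch uses a hypothetical RateLimited exception as a stand-in for a CryptohopperError whose code is RATE_LIMITED, so it runs without the SDK; the fallback wait for a missing hint is an assumption.

```python
import time
from typing import Callable, Optional


class RateLimited(Exception):
    """Hypothetical stand-in for a CryptohopperError with code RATE_LIMITED."""

    def __init__(self, retry_after_ms: Optional[int]) -> None:
        super().__init__("rate limited")
        self.retry_after_ms = retry_after_ms


def call_with_backoff(
    fn: Callable[[], object],
    max_attempts: int = 4,
    fallback_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
):
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited as e:
            if attempt == max_attempts - 1:
                raise
            # Prefer the server's hint; fall back to a fixed wait if absent.
            if e.retry_after_ms is not None:
                sleep(e.retry_after_ms / 1000)
            else:
                sleep(fallback_s)
```

With the real SDK, the except clause would catch CryptohopperError and re-raise anything whose code is not RATE_LIMITED.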
Mock the SDK in tests with pytest-httpx

The SDK's own test suite uses pytest-httpx, and your tests can do the same.

    import pytest

    from cryptohopper import CryptohopperClient

    @pytest.fixture
    def ch():
        with CryptohopperClient(api_key="test") as client:
            yield client

    def test_user_get(httpx_mock, ch):
        httpx_mock.add_response(
            method="GET",
            url="https://api.cryptohopper.com/v1/user/get",
            json={"data": {"id": 42, "email": "alice@example.com"}},
        )
        me = ch.user.get()
        assert me["id"] == 42

    def test_rate_limit_retry(httpx_mock, ch):
        # First response is a 429; the SDK should retry and get the second one.
        httpx_mock.add_response(status_code=429, headers={"Retry-After": "0"})
        httpx_mock.add_response(json={"data": {"id": 42}})
        me = ch.user.get()
        assert me["id"] == 42

The SDK pulls data out of the envelope automatically: your mock returns {"data": ...}, and your assertion sees the inner value.
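When the code under test only needs the values the client returns, stubbing the client object itself is a lighter alternative that involves no HTTP layer. A sketch using the standard library's unittest.mock; greeting is a made-up function standing in for your own code.

```python
from unittest.mock import MagicMock


def greeting(ch) -> str:
    # Example code under test: it only needs what ch.user.get() returns.
    me = ch.user.get()
    return f"Hello, {me['email']}"


def test_greeting_with_stubbed_client():
    ch = MagicMock()
    # The stub returns the already-unwrapped value, as the SDK would.
    ch.user.get.return_value = {"id": 42, "email": "alice@example.com"}
    assert greeting(ch) == "Hello, alice@example.com"
    ch.user.get.assert_called_once_with()
```

This does not exercise the SDK's retry or envelope handling, so it complements the pytest-httpx tests rather than replacing them.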
Debug an auth failure end-to-end

If the SDK is returning UNAUTHORIZED / FORBIDDEN on every call, this script narrows down why. The most common causes are a token typo, an expired token, an IP not on the OAuth-app allowlist, or the wrong SDK version.

    import os
    import sys

    from cryptohopper import CryptohopperClient, CryptohopperError
    from cryptohopper._version import CURRENT_VERSION

    def debug_auth() -> None:
        token = os.environ.get("CRYPTOHOPPER_TOKEN", "")

        # 1. SDK version sanity check. 0.4.0a1 and earlier sent the wrong auth
        #    header (Authorization: Bearer instead of access-token); 0.4.0a2+ is fixed.
        #    Note: the check below matches 0.4.0a1 exactly, not earlier releases.
        print(f"SDK version: {CURRENT_VERSION}")
        if CURRENT_VERSION == "0.4.0a1":
            print("⚠ You're on 0.4.0a1, which sent the wrong auth header. Upgrade to 0.4.0a2+.")

        # 2. Token shape sanity: Cryptohopper bearer tokens are 40 chars.
        print(f"Token length: {len(token)} (expected 40)")
        if not token:
            print("✗ CRYPTOHOPPER_TOKEN is empty. Set it from your Cryptohopper developer dashboard.")
            sys.exit(1)

        # 3. The minimal authenticated probe.
        with CryptohopperClient(api_key=token, max_retries=0) as ch:
            try:
                me = ch.user.get()
                who = me.get("username") or me.get("email") or me.get("id")
                print(f"✓ Authenticated. User: {who}")
            except CryptohopperError as e:
                print(f"✗ {e.code} (HTTP {e.status}): {e}")
                if e.ip_address:
                    print(f" Cryptohopper saw your IP as: {e.ip_address}")
                    print(" → If your OAuth app has IP allowlisting, add this IP to the allowlist.")
                if e.code == "UNAUTHORIZED":
                    print(" → Token rejected. Either it's expired, revoked, or never existed.")
                    print(" → Re-issue at https://www.cryptohopper.com developer dashboard.")
                elif e.code == "FORBIDDEN":
                    print(" → Token is valid but lacks the required scope, or your IP is blocked.")
                    print(" → Check the OAuth app's scopes in the developer dashboard.")
                elif e.status == 405 and "Missing Authentication Token" in str(e):
                    print(" → AWS API Gateway rejected the request — the SDK is sending the wrong auth header.")
                    print(" → Confirm SDK version is 0.4.0a2 or later.")

    if __name__ == "__main__":
        debug_auth()

Run with python debug_auth.py. The output gives you the next action in plain English. Sample output:

    SDK version: 0.4.0a2
    Token length: 40 (expected 40)
    ✓ Authenticated. User: alice

Or, on IP-allowlist failure:

    SDK version: 0.4.0a2
    Token length: 40 (expected 40)
    ✗ FORBIDDEN (HTTP 403): IP address mismatch
     Cryptohopper saw your IP as: 203.0.113.42
     → If your OAuth app has IP allowlisting, add this IP to the allowlist.

Save it as debug_auth.py once you have a setup that needs to be re-checked periodically.
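Token typos are often stray whitespace picked up when copying the value into an environment variable. A small pre-check can catch that before any request is made. This is a sketch: token_problems is a hypothetical helper, and the 40-character expectation is taken from the script above.

```python
def token_problems(raw: str, expected_len: int = 40) -> list:
    """Return human-readable problems with a token string; empty if none found."""
    if not raw:
        return ["token is empty"]
    problems = []
    stripped = raw.strip()
    if raw != stripped:
        # Common when the value was pasted with a trailing newline or space.
        problems.append("leading or trailing whitespace")
    if len(stripped) != expected_len:
        problems.append(f"length {len(stripped)}, expected {expected_len}")
    return problems


print(token_problems("a" * 40))         # []
print(token_problems("a" * 40 + "\n"))  # ['leading or trailing whitespace']
```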