diff --git a/apps/api/core/substitution.py b/apps/api/core/substitution.py new file mode 100644 index 0000000..70539c9 --- /dev/null +++ b/apps/api/core/substitution.py @@ -0,0 +1,388 @@ +"""core/substitution.py — deterministic substitution / failover engine (v0.9.1). + +Extends the Reliability Proxy from single-hop into a config-driven, multi-hop +failover across an equivalence GROUP of interchangeable providers. The proxy +calls run_with_failover() ONLY on the failure branch — the primary success path +never touches this module (zero-overhead guarantee). + +What it does, per the approved plan: + * classify the primary failure as pre_send vs post_send_ambiguous (idempotency); + * optional retry-first on the SAME provider for a read-timeout before substituting; + * chain through the group (ordered by wri_score when present, else curated + manual_rank — never sorts on nulls, never random), depth-capped; + * bill ONLY the provider that actually served (deduct→run→refund-on-fail→next), + so the surviving ledger row is the served provider + its embedded 1.5% fee; + * gate post-send-ambiguous failover (managed rail only, under a cost cap, with + full instrumentation; x402/on-chain is strict — never fails over post-send); + * log every attempt to substitution_events (future learned-layer training signal). + +ML-ranked selection is a later phase; this is the deterministic layer. +""" +from __future__ import annotations + +import asyncio +import logging +import os +import time as _time +from dataclasses import dataclass, field + +from services.managed import SERVICE_CONFIGS +from services.param_mapper import MANAGED_TO_CATALOG, map_params + +# Helpers reused from the execute path. Imported at module level so tests can +# monkeypatch core.substitution.. +from core.credits import check_and_deduct_credits +from routers.execute import ( + _classify_failure, + _do_refund, + _mk_refund_key, + _try_execute_managed_ex, + _SETTLEMENT_PRE, + _SETTLEMENT_POST, +) + +logger = logging.getLogger("wayforth") + +# Curated seed (mirrors migration 064). Used when the DB table is empty/unreachable +# and by unit tests. DB rows override this. (category -> [(slug, manual_rank), ...]) +_SEED_GROUPS: dict[str, list[tuple[str, int]]] = { + "web-search": [("serper", 1), ("brave", 2), ("tavily", 3), ("perplexity", 4)], + "llm-inference": [("groq", 1), ("together", 2), ("mistral", 3), ("gemini", 4)], +} + +_LLM_SLUGS = frozenset({"groq", "together", "mistral", "gemini", "perplexity"}) + +# Providers that honor a client idempotency key on retry (so a same-provider +# retry after a post-send-ambiguous failure is server-side deduped, not a second +# billable call). EMPTY today — no managed adapter passes one yet. When a provider +# gains support, add its slug here AND pass the key in the retry call below. Until +# then, a post-send retry would CREATE the duplicate cost it is meant to prevent, +# so it is skipped (we go straight to the cost-capped, instrumented substitution). +_IDEMPOTENCY_KEY_PROVIDERS: frozenset[str] = frozenset() + + +def _supports_idempotency_key(slug: str) -> bool: + return slug in _IDEMPOTENCY_KEY_PROVIDERS + +# Per-category in-process cache of the ordered chain (TTL bounded). Keyed by +# category; value = (expiry_monotonic, [ordered_slugs]). +_CHAIN_CACHE: dict[str, tuple[float, list[str]]] = {} +_CACHE_TTL = 300.0 + + +@dataclass +class FailoverPolicy: + max_depth: int = int(os.environ.get("WAYFORTH_FAILOVER_MAX_DEPTH", "3")) + retry_primary_on_transient: bool = ( + os.environ.get("WAYFORTH_FAILOVER_RETRY_PRIMARY", "true").lower() == "true" + ) + # Managed-rail default: DO fail over on post-send-ambiguous (user is always + # refunded). Flip to false to be strict. x402/on-chain is ALWAYS strict + # regardless of this flag (see the rail check in run_with_failover). + failover_post_send: bool = ( + os.environ.get("WAYFORTH_FAILOVER_POST_SEND", "true").lower() == "true" + ) + # Cap the duplicate-upstream risk: only auto-fail-over post-send when the + # candidate's cost is below this (credits). Micro-calls clear it; an expensive + # outlier never gets double-paid. + failover_post_send_max_cost: int = int( + os.environ.get("WAYFORTH_FAILOVER_POST_SEND_MAX_COST", "25") + ) + + +DEFAULT_POLICY = FailoverPolicy() + + +@dataclass +class FailoverOutcome: + served_slug: str | None = None + result: object = None + cost: int = 0 + balance_after: int = 0 + tx_id: object = None + fallback_from: str | None = None + category: str | None = None + original_failure_code: str | None = None + settlement_class: str = _SETTLEMENT_PRE + execution_ms: int = 0 + retried_primary: bool = False + client_error: str | None = None # set when a hop failed with a non-service (bad-param) error + providers_tried: list[tuple[str, str]] = field(default_factory=list) + + +# ── group loader ────────────────────────────────────────────────────────────── + + +def _seed_category_of(slug: str) -> str | None: + for cat, members in _SEED_GROUPS.items(): + if any(m[0] == slug for m in members): + return cat + return None + + +async def get_substitution_chain(db, primary_slug: str) -> tuple[str | None, list[str]]: + """Return (category, ordered substitute slugs excluding the primary). + + Order: COALESCE(wri_score, -1) DESC, manual_rank ASC, slug ASC — wri dominates + WHEN present (post-launch); pre-launch all wri are null so the curated + manual_rank is the deterministic baseline. Cached per category. Falls back to + the in-module seed if the DB is empty/unreachable, so the engine + tests run + without a populated table. + """ + # Resolve the primary's category (DB first, then seed). + category: str | None = None + members: list[tuple[str, int | None]] = [] # (slug, manual_rank) + try: + cat_row = await db.fetchrow( + "SELECT category FROM substitution_groups WHERE service_slug = $1 AND active = TRUE LIMIT 1", + primary_slug, + ) + if cat_row: + category = cat_row["category"] + rows = await db.fetch( + "SELECT service_slug, manual_rank FROM substitution_groups " + "WHERE category = $1 AND active = TRUE", + category, + ) + members = [(r["service_slug"], r["manual_rank"]) for r in rows] + except Exception as exc: # DB unreachable / table missing → seed fallback + logger.warning("substitution group DB lookup failed (%s); using seed", exc) + category = None + + if not members: + category = _seed_category_of(primary_slug) + if not category: + return None, [] + members = [(s, r) for s, r in _SEED_GROUPS[category]] + + now = _time.monotonic() + cached = _CHAIN_CACHE.get(category) + if cached and cached[0] > now: + return category, [s for s in cached[1] if s != primary_slug] + + # WRI per member from the services table (catalog slug via MANAGED_TO_CATALOG). + wri: dict[str, float] = {} + try: + cat_slugs = {s: MANAGED_TO_CATALOG.get(s, s) for s, _ in members} + rows = await db.fetch( + "SELECT slug, wri_score FROM services WHERE slug = ANY($1::text[])", + list(cat_slugs.values()), + ) + by_catalog = {r["slug"]: r["wri_score"] for r in rows} + for s, cslug in cat_slugs.items(): + v = by_catalog.get(cslug) + if v is not None: + wri[s] = float(v) + except Exception: + wri = {} # no rank data yet → ordering falls back to manual_rank + + ordered = sorted( + members, + key=lambda m: (-(wri.get(m[0], -1.0)), (m[1] if m[1] is not None else 999), m[0]), + ) + ordered_slugs = [m[0] for m in ordered] + _CHAIN_CACHE[category] = (now + _CACHE_TTL, ordered_slugs) + return category, [s for s in ordered_slugs if s != primary_slug] + + +# ── category validity checks (empty/malformed body detection) ────────────────── + + +def _validate_result(category: str | None, slug: str, result) -> bool: + """True if the HTTP-200 result is a usable body for its category.""" + if result is None: + return False + if not isinstance(result, dict): + return bool(result) + if category == "web-search": + items = result.get("organic") or result.get("results") or result.get("web", {}).get("results") + return bool(items) + if category == "llm-inference": + return bool(result.get("content") or result.get("choices")) + return True # unknown category → don't reject + + +# ── event log (fire-and-forget) ──────────────────────────────────────────────── + + +async def _record_event(pool, **cols) -> None: + if pool is None: + return + try: + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO substitution_events + (slug, category, primary_provider, failure_reason, substitute_chosen, + latency_ms, success, cost_credits, settlement_class, rail, + duplicate_upstream_cost_possible, second_upstream_cost_credits, + retried_primary) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) + """, + cols.get("slug"), cols.get("category"), cols.get("primary_provider"), + cols.get("failure_reason"), cols.get("substitute_chosen"), + cols.get("latency_ms"), bool(cols.get("success")), cols.get("cost_credits"), + cols.get("settlement_class", _SETTLEMENT_PRE), cols.get("rail", "managed"), + bool(cols.get("duplicate_upstream_cost_possible")), + cols.get("second_upstream_cost_credits"), bool(cols.get("retried_primary")), + ) + except Exception as exc: + logger.warning("substitution_events write failed: %s", exc) + + +def _emit_event(pool, **cols) -> None: + """Schedule an event write without blocking the request.""" + try: + asyncio.create_task(_record_event(pool, **cols)) + except RuntimeError: + pass # no running loop (e.g. unit context) — non-critical + + +# ── the engine ────────────────────────────────────────────────────────────── + + +async def run_with_failover( + db, *, pool, request_id: str, + user_id: str, api_key_id: str | None, agent_id: str | None, + primary_slug: str, user_params: dict, + primary_error: str, primary_settlement: str, + primary_cost: int, primary_balance_after: int, primary_tx_id, + primary_svc_key: str, + rail: str = "managed", + policy: FailoverPolicy | None = None, +) -> FailoverOutcome: + """Classify → (retry-first) → idempotency-gate → chain → bill → log. + + The primary has already been deducted and run (and failed with a service-side + error). On entry the primary deduction still stands; we only refund it once we + commit to substituting (or after a failed retry). Returns a FailoverOutcome the + caller adopts (served provider) or turns into a 502 (served_slug is None). + """ + policy = policy or DEFAULT_POLICY + failure_code = _classify_failure(None, primary_error) + category, chain = await get_substitution_chain(db, primary_slug) + settlement = primary_settlement + retried = False + out = FailoverOutcome(category=category, original_failure_code=failure_code, + balance_after=primary_balance_after, settlement_class=settlement) + out.providers_tried.append((primary_slug, failure_code)) + + def _may_act(s: str) -> bool: + # pre_send is always safe to act on. post_send may double-charge upstream + # / double-settle on-chain → only act on the MANAGED rail with the flag on. + # (Even a same-provider retry repeats the upstream call, so it's gated too.) + return s == _SETTLEMENT_PRE or (rail == "managed" and policy.failover_post_send) + + # 1. Hard idempotency gate on the primary's settlement. x402/on-chain or + # flag-off + post-send → surface immediately (no retry, no substitution). + if not _may_act(settlement): + out.settlement_class = settlement + return out # served_slug None → caller surfaces + + # 2. Retry-first on the SAME provider, BEFORE refunding — the primary charge + # stands, so a successful retry never double-charges and avoids touching a + # substitute (and its duplicate-upstream risk). A pre_send retry is always + # safe (no upstream work happened). A POST-send retry repeats a call the + # upstream may have already run+billed, so it is only safe when the provider + # honors an idempotency key — otherwise we SKIP it (going straight to the + # cost-capped, instrumented substitution) rather than create duplicate cost. + _retry_safe = settlement == _SETTLEMENT_PRE or _supports_idempotency_key(primary_slug) + if policy.retry_primary_on_transient and _retry_safe: + retried = True + _mapped, _miss = map_params(primary_slug, user_params) + r_res, r_err, r_ms, r_settle = await _try_execute_managed_ex(primary_slug, _mapped, primary_svc_key) + if not r_err and _validate_result(category, primary_slug, r_res): + _emit_event(pool, slug=primary_slug, category=category, primary_provider=primary_slug, + failure_reason=failure_code, substitute_chosen=primary_slug, latency_ms=r_ms, + success=True, cost_credits=primary_cost, settlement_class=settlement, + rail=rail, retried_primary=True) + out.served_slug, out.result, out.cost = primary_slug, r_res, primary_cost + out.tx_id, out.execution_ms, out.retried_primary = primary_tx_id, r_ms, True + return out + settlement = r_settle if r_err else _SETTLEMENT_POST # retry also failed + if not _may_act(settlement): # retry turned it ambiguous on a strict rail + new_bal = await _do_refund( + db, user_id, primary_cost, primary_slug, primary_error, "/proxy", + primary_balance_after, _mk_refund_key(request_id, primary_slug, "proxy_primary")) + out.balance_after = new_bal if new_bal is not None else primary_balance_after + out.settlement_class, out.retried_primary = settlement, retried + return out + + # 3. Refund the primary (failed, no successful retry). User never double-charged. + new_bal = await _do_refund( + db, user_id, primary_cost, primary_slug, primary_error, "/proxy", + primary_balance_after, _mk_refund_key(request_id, primary_slug, "proxy_primary"), + ) + out.balance_after = new_bal if new_bal is not None else primary_balance_after + + origin_post_send = settlement == _SETTLEMENT_POST + + # 4. Chain through the group, depth-capped. + for candidate in chain[: policy.max_depth]: + cfg = SERVICE_CONFIGS.get(candidate) + if not cfg: + out.providers_tried.append((candidate, "unknown_service")) + continue + cand_key = os.environ.get(cfg["key_var"], "") + if not cand_key: + out.providers_tried.append((candidate, "no_key")) + continue + cand_cost = cfg["credits"] + # Cost cap only bites when the ORIGIN failure was post-send (duplicate risk). + if origin_post_send and cand_cost >= policy.failover_post_send_max_cost: + out.providers_tried.append((candidate, "over_post_send_cost_cap")) + continue + mapped, missing = map_params(candidate, user_params) + if missing: + out.providers_tried.append((candidate, "missing_param")) + continue + + ok, bal, tx_id = await check_and_deduct_credits( + db, user_id, cand_cost, "/proxy", service_id=candidate, tx_type="execution", + agent_id=agent_id, api_key_id=api_key_id, return_tx_id=True, + ) + if not ok: + out.providers_tried.append((candidate, "insufficient_credits")) + out.balance_after = bal + break + + result, err, ms, hop_settle = await _try_execute_managed_ex(candidate, mapped, cand_key) + valid = (not err) and _validate_result(category, candidate, result) + if not valid and not err: + err, hop_settle = "invalid_body_after_200", _SETTLEMENT_POST + + _emit_event(pool, slug=candidate, category=category, primary_provider=primary_slug, + failure_reason=failure_code, substitute_chosen=candidate, latency_ms=ms, + success=valid, cost_credits=cand_cost, settlement_class=hop_settle, rail=rail, + duplicate_upstream_cost_possible=origin_post_send, + second_upstream_cost_credits=cand_cost if origin_post_send else None, + retried_primary=retried) + + if valid: + out.served_slug, out.result, out.cost = candidate, result, cand_cost + out.balance_after, out.tx_id = bal, tx_id + out.fallback_from, out.execution_ms, out.retried_primary = primary_slug, ms, retried + return out + + # Hop failed → refund it (per-hop idempotency key) and continue. + refunded = await _do_refund( + db, user_id, cand_cost, candidate, err or "failed", "/proxy", bal, + _mk_refund_key(request_id, candidate, f"proxy_fb_{len(out.providers_tried)}"), + ) + out.balance_after = refunded if refunded is not None else bal + hop_code = _classify_failure(None, err) + out.providers_tried.append((candidate, hop_code)) + # A non-service (bad-param/client) error won't be fixed by another provider. + from routers.execute import _classify_error + if _classify_error(err or "") != "service_failure": + out.client_error = err + break + # A post-send hop tightens the gate for subsequent candidates. + if hop_settle == _SETTLEMENT_POST: + origin_post_send = True + if rail != "managed" or not policy.failover_post_send: + break + + # 5. Exhausted (or stopped) — no provider served. + out.retried_primary = retried + return out diff --git a/apps/api/main.py b/apps/api/main.py index 5b131cb..a57837b 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -23,7 +23,7 @@ # ── Version and globals ─────────────────────────────────────────────────────── -VERSION = "0.9.0" +VERSION = "0.9.1" ADMIN_KEY = os.getenv("ADMIN_KEY", "") ENVIRONMENT = os.getenv("ENVIRONMENT", "development") SENTRY_DSN = os.getenv("SENTRY_DSN", "") @@ -1144,6 +1144,58 @@ async def lifespan(app: FastAPI): ON hosted_agents(next_run_at) WHERE trigger_type = 'schedule' AND next_run_at IS NOT NULL """) + # Migration 064: substitution/failover engine — groups + event log. + # Mirrored in infra/migrations/064_substitution_engine.sql. + await _mconn.execute(""" + CREATE TABLE IF NOT EXISTS substitution_groups ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + category TEXT NOT NULL, + service_slug TEXT NOT NULL, + manual_rank INTEGER, + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (category, service_slug) + ) + """) + await _mconn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subst_groups_category + ON substitution_groups (category) WHERE active = TRUE + """) + await _mconn.execute(""" + INSERT INTO substitution_groups (category, service_slug, manual_rank) VALUES + ('web-search','serper',1),('web-search','brave',2), + ('web-search','tavily',3),('web-search','perplexity',4), + ('llm-inference','groq',1),('llm-inference','together',2), + ('llm-inference','mistral',3),('llm-inference','gemini',4) + ON CONFLICT (category, service_slug) DO NOTHING + """) + await _mconn.execute(""" + CREATE TABLE IF NOT EXISTS substitution_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + slug TEXT NOT NULL, + category TEXT, + primary_provider TEXT NOT NULL, + failure_reason TEXT, + substitute_chosen TEXT, + latency_ms INTEGER, + success BOOLEAN NOT NULL DEFAULT FALSE, + cost_credits INTEGER, + settlement_class TEXT NOT NULL, + rail TEXT NOT NULL DEFAULT 'managed', + duplicate_upstream_cost_possible BOOLEAN NOT NULL DEFAULT FALSE, + second_upstream_cost_credits INTEGER, + retried_primary BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """) + await _mconn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subst_events_primary + ON substitution_events (primary_provider, created_at DESC) + """) + await _mconn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subst_events_category + ON substitution_events (category, created_at DESC) + """) except Exception as e: logger.error("STARTUP ERROR: %s: %s", type(e).__name__, e, exc_info=True) logger.critical("DB pool creation or migrations failed: %s — exiting so the orchestrator can restart cleanly", e) diff --git a/apps/api/pytest.ini b/apps/api/pytest.ini index 1398b03..739a8e1 100644 --- a/apps/api/pytest.ini +++ b/apps/api/pytest.ini @@ -11,6 +11,7 @@ python_files = test_suite_v060.py test_suite_v062.py test_suite_v0610.py test_se test_tier3_provider_gating.py test_x402_client.py test_rails_flag.py + test_substitution.py test_uptime_source.py markers = no_api_key: test does not require WAYFORTH_TEST_API_KEY (e.g. probes unauthenticated paths) diff --git a/apps/api/routers/execute.py b/apps/api/routers/execute.py index 36c44f5..f27f519 100644 --- a/apps/api/routers/execute.py +++ b/apps/api/routers/execute.py @@ -316,6 +316,69 @@ async def _try_execute_managed( return result, error_msg, round((_time_mod.time() - t0) * 1000) +# Settlement classes for the substitution/failover engine (core/substitution.py). +# pre_send — the request never reached the upstream or the upstream +# replied with an error before doing billable work +# (connection refused, connect-timeout, HTTP 5xx/429). +# Safe to fail over: no duplicate upstream cost. +# post_send_ambiguous — the request was sent and the response was lost, or a +# 200 came back with an unusable body. The upstream MAY +# have done (and billed) the work → failing over risks a +# duplicate upstream charge / on-chain double-settlement. +_SETTLEMENT_PRE = "pre_send" +_SETTLEMENT_POST = "post_send_ambiguous" + + +async def _try_execute_managed_ex( + slug: str, params: dict, key: str, *, timeout: float = 10.0 +) -> tuple[object, str | None, int, str]: + """Like _try_execute_managed but ALSO returns a settlement_class. + + Distinguishes pre-settlement failures (safe to fail over) from + post-send-ambiguous ones (the upstream may have settled). httpx raises typed + exceptions which we catch BEFORE the asyncio.TimeoutError catch-all so the + distinction survives. No retry here — the engine owns retry/failover policy. + """ + adapter = ADAPTERS[slug] + result = None + error_msg = None + settlement = _SETTLEMENT_PRE + t0 = _time_mod.time() + # Generous outer guard only against a hung adapter; the typed httpx + # exceptions below fire first for the cases we care about. + _guard = 40.0 if slug == "assemblyai" else max(timeout + 5.0, 15.0) + try: + result = await asyncio.wait_for(adapter(params, key), timeout=_guard) + except (httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout): + # No TCP connection was established (ConnectError/ConnectTimeout) or none + # was ever acquired from the pool (PoolTimeout) → the request never left + # the box → zero upstream work → SAFE to fail over. + error_msg = "Connection failed" + settlement = _SETTLEMENT_PRE + except (httpx.ReadTimeout, httpx.WriteError, httpx.WriteTimeout): + # ReadTimeout: the request WAS sent and the response was lost. Write* : a + # partial write may have reached AND been processed by the upstream. Both + # are ambiguous — the upstream may have done (and billed) the work — so + # NEVER blind-failover. This is the core of the #5 idempotency guarantee. + error_msg = "Timeout/write error after send" + settlement = _SETTLEMENT_POST + except (asyncio.TimeoutError, TimeoutError): + # Outer guard fired mid-call — we can't prove the upstream didn't run. + error_msg = "Service timeout" + settlement = _SETTLEMENT_POST + except Exception as _e: + msg = str(_e) + error_msg = msg[:300] + _m = re.search(r"\b([45]\d{2})\b", msg) + if _m and _m.group(1) in ("500", "502", "503", "504", "429"): + settlement = _SETTLEMENT_PRE # upstream errored before/without billable work + elif _m and _m.group(1).startswith("4"): + settlement = _SETTLEMENT_PRE # client error — won't be chained anyway + else: + settlement = _SETTLEMENT_POST # unclassifiable → fail safe + return result, error_msg, round((_time_mod.time() - t0) * 1000), settlement + + # Per-key concurrent SSE stream cap. SSE connections hold a worker open for the # full upstream response duration; an unbounded number per key would let one # caller exhaust connection capacity for everyone else. diff --git a/apps/api/routers/proxy.py b/apps/api/routers/proxy.py index a3addd3..d6e4637 100644 --- a/apps/api/routers/proxy.py +++ b/apps/api/routers/proxy.py @@ -54,9 +54,10 @@ _fetch_wri, _mk_refund_key, _patch_tx_signals, - _try_execute_managed, + _try_execute_managed_ex, _update_search_signal, ) +from core.substitution import run_with_failover logger = logging.getLogger("wayforth") router = APIRouter() @@ -166,69 +167,58 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)): }) # ── Primary execution ───────────────────────────────────────────────────── - result, error_msg, execution_ms = await _try_execute_managed(slug, params, svc_key) + # _ex variant also yields the settlement_class (pre_send vs post_send_ambiguous) + # the failover engine needs for the idempotency gate. + result, error_msg, execution_ms, _primary_settlement = await _try_execute_managed_ex( + slug, params, svc_key + ) - # ── Failover on service-side failures ──────────────────────────────────── + # ── Failover on service-side failures (delegated to the substitution engine) ── + # ZERO-OVERHEAD on the happy path: the engine is only ever invoked here, inside + # the failure branch. A successful primary skips everything below. _proxy_fallback_from: str | None = None _original_failure_code: str | None = None if error_msg and _classify_error(error_msg) == "service_failure": - _original_failure_code = _classify_failure(None, error_msg) - new_bal = await _do_refund( - db, user_id, credit_cost, slug, error_msg, "/proxy", balance_after, - _mk_refund_key(getattr(request.state, "request_id", ""), slug, "proxy_managed"), + from main import app as _app_ref + outcome = await run_with_failover( + db, pool=_app_ref.state.pool, + request_id=getattr(request.state, "request_id", ""), + user_id=str(user_id), api_key_id=str(_api_key_id), agent_id=agent_id, + primary_slug=slug, user_params=_user_params, + primary_error=error_msg, primary_settlement=_primary_settlement, + primary_cost=credit_cost, primary_balance_after=balance_after, + primary_tx_id=_tx_id, primary_svc_key=svc_key, + rail="managed", ) - - _fb_slug = SERVICE_ALTERNATIVES.get(slug) - if _fb_slug and _fb_slug in SERVICE_CONFIGS: - _fb_cfg = SERVICE_CONFIGS[_fb_slug] - _fb_key = os.environ.get(_fb_cfg["key_var"], "") - if _fb_key: - _fb_mapped, _fb_miss = map_params(_fb_slug, _user_params) - if not _fb_miss: - _fb_cost = _fb_cfg["credits"] - _fb_ok, _fb_bal, _fb_tx_id = await check_and_deduct_credits( - db, str(user_id), _fb_cost, "/proxy", - service_id=_fb_slug, tx_type="execution", - agent_id=agent_id, api_key_id=str(_api_key_id), - return_tx_id=True, - ) - if _fb_ok: - result, _fb_err, execution_ms = await _try_execute_managed( - _fb_slug, _fb_mapped, _fb_key, - ) - if _fb_err and _classify_error(_fb_err) == "service_failure": - await _do_refund( - db, user_id, _fb_cost, _fb_slug, _fb_err, "/proxy", _fb_bal, - _mk_refund_key( - getattr(request.state, "request_id", ""), - _fb_slug, "proxy_managed_fb", - ), - ) - result = None - elif _fb_err: - raise HTTPException(status_code=400, detail={ - "error": _fb_err, "refunded": False, "credits_restored": 0, - }) - else: - _proxy_fallback_from = slug - slug = _fb_slug - credit_cost = _fb_cost - balance_after = _fb_bal - _tx_id = _fb_tx_id - - if result is None: - from main import app as _app_ref + if outcome.client_error: + raise HTTPException(status_code=400, detail={ + "error": outcome.client_error, "refunded": True, "credits_restored": credit_cost, + }) + if outcome.served_slug is None: + # Group exhausted (or post-send-ambiguous and not eligible to fail over). asyncio.create_task(_patch_tx_signals( - _app_ref.state.pool, _tx_id, - failure_code=_original_failure_code, + _app_ref.state.pool, _tx_id, failure_code=outcome.original_failure_code, )) - raise HTTPException(status_code=503, detail={ - "error": "Service unavailable", + raise HTTPException(status_code=502, detail={ + "error": "all_providers_failed", + "category": outcome.category, + "providers_tried": [ + {"provider": s, "reason": r} for s, r in outcome.providers_tried + ], "refunded": True, "credits_restored": credit_cost, - "credits_remaining": new_bal, + "credits_remaining": outcome.balance_after, }) + # Adopt the provider that actually served for the rest of the handler. + _proxy_fallback_from = outcome.fallback_from + _original_failure_code = outcome.original_failure_code + slug = outcome.served_slug + credit_cost = outcome.cost + balance_after = outcome.balance_after + _tx_id = outcome.tx_id + result = outcome.result + execution_ms = outcome.execution_ms elif error_msg: raise HTTPException(status_code=400, detail={ @@ -260,6 +250,11 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)): wri = await _fetch_wri(db, slug) headers: dict[str, str] = { "X-Wayforth-Failover": "true" if _proxy_fallback_from else "false", + # Visible self-heal surface: which provider actually served, and whether + # it came via fallback. X-Wayforth-Fallback mirrors -Failover under the + # name the spec calls out; both kept for back-compat. + "X-Wayforth-Served-By": slug, + "X-Wayforth-Fallback": "true" if _proxy_fallback_from else "false", "X-Wayforth-WRI": str(wri) if wri is not None else "unknown", "X-Wayforth-Cost": str(credit_cost), "X-Wayforth-Rail": "managed", @@ -289,6 +284,8 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)): body = { "status": "ok", "service": slug, + "served_by": slug, + "fallback": bool(_proxy_fallback_from), "result": result, "credits_deducted": credit_cost, "execution_ms": execution_ms, diff --git a/apps/api/tests/test_substitution.py b/apps/api/tests/test_substitution.py new file mode 100644 index 0000000..1ce31e5 --- /dev/null +++ b/apps/api/tests/test_substitution.py @@ -0,0 +1,297 @@ +"""test_substitution.py — deterministic substitution/failover engine (v0.9.1). + +Pure-unit: the engine's external helpers (provider execution, deduct, refund, +group loader) are monkeypatched, so no DB/network is touched. Covers ordering, +depth cap, settlement classification + idempotency gate, served-only billing, +the 502-on-exhaustion shape, and the substitution_events emission. +""" +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from core import substitution as sub +from core.substitution import FailoverPolicy, run_with_failover +import routers.execute as _ex +from routers.execute import _SETTLEMENT_PRE, _SETTLEMENT_POST + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +_OK_SEARCH = {"organic": [{"title": "r", "link": "u", "snippet": "s"}]} + + +def _fake_exec(script: dict): + """script: slug -> (result, err, ms, settlement). Default = success.""" + async def fake(slug, params, key, **kw): + return script.get(slug, (_OK_SEARCH, None, 5, _SETTLEMENT_PRE)) + return fake + + +@pytest.fixture +def harness(monkeypatch): + # Every managed provider needs a key present, else the engine skips it as + # "no_key" before it can be attempted. + from services.managed import SERVICE_CONFIGS + for _cfg in SERVICE_CONFIGS.values(): + monkeypatch.setenv(_cfg["key_var"], "TESTKEY") + + deducts: list = [] + refunds: list = [] + events: list = [] + + async def fake_deduct(db, user_id, cost, endpoint, **kw): + deducts.append((kw.get("service_id"), cost)) + return True, 1000 - cost, f"tx_{kw.get('service_id')}" + + async def fake_refund(db, user_id, cost, slug, err, ep, bal, key): + refunds.append((slug, cost, key)) + return (bal or 0) + cost + + monkeypatch.setattr(sub, "check_and_deduct_credits", fake_deduct) + monkeypatch.setattr(sub, "_do_refund", fake_refund) + monkeypatch.setattr(sub, "_emit_event", lambda pool, **cols: events.append(cols)) + + def set_chain(category, chain): + async def fake_chain(db, primary): + return category, [s for s in chain if s != primary] + monkeypatch.setattr(sub, "get_substitution_chain", fake_chain) + + def set_exec(script): + monkeypatch.setattr(sub, "_try_execute_managed_ex", _fake_exec(script)) + + return {"deducts": deducts, "refunds": refunds, "events": events, + "set_chain": set_chain, "set_exec": set_exec} + + +async def _run(primary="serper", primary_err="Service timeout", + primary_settlement=_SETTLEMENT_PRE, rail="managed", policy=None): + # Chain-focused tests run with retry-first OFF so they exercise substitution + # directly; the retry-first test passes its own policy. + if policy is None: + policy = FailoverPolicy(retry_primary_on_transient=False) + return await run_with_failover( + db=None, pool=None, request_id="req1", + user_id="u1", api_key_id="k1", agent_id=None, + primary_slug=primary, user_params={"query": "hello"}, + primary_error=primary_err, primary_settlement=primary_settlement, + primary_cost=3, primary_balance_after=997, primary_tx_id="tx_serper", + primary_svc_key="KEY", rail=rail, policy=policy, + ) + + +# ── 0. classification matrix (the #5 idempotency core) ──────────────────────── + +@pytest.mark.parametrize("exc,expected", [ + (httpx.ConnectError("x"), _SETTLEMENT_PRE), # no TCP connection + (httpx.ConnectTimeout("x"), _SETTLEMENT_PRE), # never connected + (httpx.PoolTimeout("x"), _SETTLEMENT_PRE), # never acquired a connection + (httpx.ReadTimeout("x"), _SETTLEMENT_POST), # sent, response lost + (httpx.WriteError("x"), _SETTLEMENT_POST), # partial write may have landed + (httpx.WriteTimeout("x"), _SETTLEMENT_POST), # partial write may have landed +]) +def test_classification_matrix_httpx(monkeypatch, exc, expected): + async def boom(params, key): + raise exc + monkeypatch.setitem(_ex.ADAPTERS, "serper", boom) + _r, err, _ms, settle = asyncio.run(_ex._try_execute_managed_ex("serper", {"query": "x"}, "K")) + assert err and settle == expected + + +@pytest.mark.parametrize("msg,expected", [ + ("Brave Search error 503: down", _SETTLEMENT_PRE), # 5xx received → fail-over-safe + ("error 500 internal", _SETTLEMENT_PRE), + ("rate limited 429", _SETTLEMENT_PRE), # 429 received → fail-over-safe + ("totally weird no status", _SETTLEMENT_POST), # unclassifiable → fail safe +]) +def test_classification_matrix_http_status(monkeypatch, msg, expected): + async def boom(params, key): + raise Exception(msg) + monkeypatch.setitem(_ex.ADAPTERS, "serper", boom) + _r, err, _ms, settle = asyncio.run(_ex._try_execute_managed_ex("serper", {"query": "x"}, "K")) + assert err and settle == expected + + +# ── 1. group ordering (real loader, seed + wri) ──────────────────────────────── + +class _OrderDB: + """Fake DB routing the loader's queries; provides wri for some members.""" + def __init__(self, wri): + self._wri = wri + async def fetchrow(self, q, *a): + return {"category": "web-search"} # primary belongs to web-search + async def fetch(self, q, *a): + if "substitution_groups" in q: + return [{"service_slug": s, "manual_rank": r} + for s, r in (("serper", 1), ("brave", 2), ("tavily", 3), ("perplexity", 4))] + # services wri query — catalog slugs + return [{"slug": k, "wri_score": v} for k, v in self._wri.items()] + + +def test_ordering_manual_rank_when_no_wri(): + sub._CHAIN_CACHE.clear() + cat, chain = asyncio.run(sub.get_substitution_chain(_OrderDB({}), "serper")) + assert cat == "web-search" + assert chain == ["brave", "tavily", "perplexity"] # curated manual_rank, primary excluded + + +def test_ordering_wri_dominates_when_present(): + sub._CHAIN_CACHE.clear() + # perplexity catalog slug is perplexity_ai; give it the top score. + db = _OrderDB({"perplexity_ai": 95.0, "brave_search": 60.0}) + cat, chain = asyncio.run(sub.get_substitution_chain(db, "serper")) + # wri-having first by wri desc (perplexity, brave), then the rest by manual_rank (tavily) + assert chain[0] == "perplexity" and chain[1] == "brave" + assert "tavily" in chain and "serper" not in chain + + +def test_never_out_of_group(): + sub._CHAIN_CACHE.clear() + _, chain = asyncio.run(sub.get_substitution_chain(_OrderDB({}), "serper")) + assert all(s in {"brave", "tavily", "perplexity"} for s in chain) + assert "groq" not in chain # llm member never leaks into web-search + + +# ── 2. chain + billing: charge served-only ──────────────────────────────────── + +def test_chains_until_success_and_bills_served_only(harness): + harness["set_chain"]("web-search", ["brave", "tavily", "perplexity"]) + harness["set_exec"]({ + "brave": ({}, "Brave Search error 503", 5, _SETTLEMENT_PRE), # fails + "tavily": ({"results": [{"title": "t"}]}, None, 7, _SETTLEMENT_PRE), # serves + }) + out = asyncio.run(_run()) + assert out.served_slug == "tavily" + assert out.fallback_from == "serper" + # billed: brave (refunded) + tavily (served); primary serper refunded. + assert ("brave", 6) in harness["deducts"] and ("tavily", 10) in harness["deducts"] + refunded_slugs = [r[0] for r in harness["refunds"]] + assert "serper" in refunded_slugs and "brave" in refunded_slugs + assert "tavily" not in refunded_slugs # served provider is NOT refunded + # per-hop refund keys are distinct (no collision) + assert len({r[2] for r in harness["refunds"]}) == len(harness["refunds"]) + + +# ── 3. depth cap + 502 on exhaustion ────────────────────────────────────────── + +def test_depth_cap_and_502_shape(harness): + harness["set_chain"]("web-search", ["brave", "tavily", "perplexity"]) + harness["set_exec"]({ # all fail pre_send + "brave": ({}, "err 503", 5, _SETTLEMENT_PRE), + "tavily": ({}, "err 503", 5, _SETTLEMENT_PRE), + "perplexity": ({}, "err 503", 5, _SETTLEMENT_PRE), + }) + out = asyncio.run(_run(policy=FailoverPolicy(max_depth=2, retry_primary_on_transient=False))) + assert out.served_slug is None + tried = [s for s, _ in out.providers_tried] + assert tried[0] == "serper" # primary + assert tried[1:] == ["brave", "tavily"] # exactly max_depth=2 candidates, not perplexity + + +# ── 4. idempotency gate: pre vs post settlement ─────────────────────────────── + +def test_pre_send_fails_over(harness): + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"brave": (_OK_SEARCH, None, 5, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_PRE)) + assert out.served_slug == "brave" + + +def test_post_send_managed_fails_over_under_cap(harness): + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"brave": (_OK_SEARCH, None, 5, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_POST, rail="managed", + policy=FailoverPolicy(retry_primary_on_transient=False))) + assert out.served_slug == "brave" + # the substitute hop is flagged as a possible duplicate-upstream cost + ev = [e for e in harness["events"] if e.get("slug") == "brave"][0] + assert ev["duplicate_upstream_cost_possible"] is True + assert ev["second_upstream_cost_credits"] == 6 + + +def test_post_send_x402_does_not_fail_over(harness): + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"brave": (_OK_SEARCH, None, 5, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_POST, rail="x402")) + assert out.served_slug is None # strict on on-chain — surfaces instead + assert ("brave", 6) not in harness["deducts"] # never even attempted + + +def test_post_send_over_cost_cap_skips_expensive(harness): + # stability (86 cr) is over the default 25-cap → skipped on a post-send origin. + harness["set_chain"]("media", ["stability"]) + harness["set_exec"]({"stability": ({"image_base64": "x"}, None, 5, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary="elevenlabs", primary_settlement=_SETTLEMENT_POST, + policy=FailoverPolicy(retry_primary_on_transient=False))) + assert out.served_slug is None + assert ("stability", 86) not in harness["deducts"] + assert ("stability", "over_post_send_cost_cap") in out.providers_tried + # Refund safety: the primary is STILL refunded even though nothing served — + # the user is never charged-with-no-result-and-no-refund. + assert "elevenlabs" in [r[0] for r in harness["refunds"]] + + +def test_post_send_flag_off_strict(harness): + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"brave": (_OK_SEARCH, None, 5, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_POST, + policy=FailoverPolicy(failover_post_send=False))) + assert out.served_slug is None + + +# ── 5. retry-first on read-timeout ──────────────────────────────────────────── + +def test_retry_first_pre_send_succeeds_keeps_primary_charge(harness): + # pre_send retry is always safe (no upstream work happened on the first try). + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"serper": (_OK_SEARCH, None, 9, _SETTLEMENT_PRE)}) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_PRE, + policy=FailoverPolicy(retry_primary_on_transient=True))) + assert out.served_slug == "serper" and out.retried_primary is True + assert harness["refunds"] == [] # primary charge stands; nothing refunded + assert harness["deducts"] == [] # no substitute deducted + + +def test_post_send_retry_skipped_without_idempotency_key(harness): + # post_send + no provider idempotency key → retry-first is SKIPPED (it would + # create the duplicate cost it's meant to prevent) → substitute instead. The + # served provider is the substitute, NOT a retried primary. + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({ + "serper": (_OK_SEARCH, None, 9, _SETTLEMENT_PRE), # would serve IF retried + "brave": (_OK_SEARCH, None, 5, _SETTLEMENT_PRE), + }) + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_POST, rail="managed", + policy=FailoverPolicy(retry_primary_on_transient=True))) + assert out.served_slug == "brave" # substitute served, not a retried serper + assert ("brave", 6) in harness["deducts"] + assert out.retried_primary is False # retry never ran + + +# ── 6. invalid body after 200 → treated as post-send ────────────────────────── + +def test_invalid_body_after_200_does_not_serve(harness): + harness["set_chain"]("web-search", ["brave"]) + harness["set_exec"]({"brave": ({"organic": []}, None, 5, _SETTLEMENT_PRE)}) # empty → invalid + out = asyncio.run(_run(primary_settlement=_SETTLEMENT_PRE)) + # brave returned 200 but empty → invalid → refunded, not served + assert out.served_slug is None + assert "brave" in [r[0] for r in harness["refunds"]] + + +# ── 7. event row emitted per hop with settlement_class ──────────────────────── + +def test_events_emitted_per_hop(harness): + harness["set_chain"]("web-search", ["brave", "tavily"]) + harness["set_exec"]({ + "brave": ({}, "err 503", 5, _SETTLEMENT_PRE), + "tavily": ({"results": [{"t": 1}]}, None, 6, _SETTLEMENT_PRE), + }) + asyncio.run(_run()) + slugs = [e["slug"] for e in harness["events"]] + assert "brave" in slugs and "tavily" in slugs + for e in harness["events"]: + assert e["settlement_class"] in (_SETTLEMENT_PRE, _SETTLEMENT_POST) + assert e["primary_provider"] == "serper" diff --git a/infra/migrations/064_substitution_engine.sql b/infra/migrations/064_substitution_engine.sql new file mode 100644 index 0000000..2e2e235 --- /dev/null +++ b/infra/migrations/064_substitution_engine.sql @@ -0,0 +1,69 @@ +-- 064_substitution_engine.sql — config-driven substitution groups + failover event log (v0.9.1) +-- +-- Extends the Reliability Proxy (/proxy/{slug}) from single-hop into a full +-- multi-hop substitution/failover engine. Two tables: +-- substitution_groups — equivalence sets of interchangeable providers per +-- Wayforth logical category. A provider is ONLY ever substituted within its +-- group. Ordering at runtime is COALESCE(wri_score,-1) DESC, manual_rank ASC, +-- slug ASC — so pre-launch (all wri_score null) the curated manual_rank is the +-- deterministic baseline, and wri_score takes over once rank data exists. +-- substitution_events — one row per failover ATTEMPT (the future learned-layer +-- training signal). Nothing consumes it yet; written fire-and-forget on the +-- failure path only. +-- +-- Additive + idempotent. Mirrored as CREATE TABLE IF NOT EXISTS in main.py startup. + +BEGIN; + +CREATE TABLE IF NOT EXISTS substitution_groups ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + category TEXT NOT NULL, -- 'web-search' | 'llm-inference' | ... + service_slug TEXT NOT NULL, -- managed slug (SERVICE_CONFIGS key) + manual_rank INTEGER, -- curated priority; lower = preferred + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (category, service_slug) +); +CREATE INDEX IF NOT EXISTS idx_subst_groups_category + ON substitution_groups (category) WHERE active = TRUE; + +-- Seed with categories that have >=2 genuinely interchangeable providers today. +-- Curated manual_rank makes pre-launch ordering deterministic. Weather (only +-- openweather) and geocoding/maps (no providers) are intentionally NOT seeded +-- until peers exist — substitution within a 1-member group is a no-op. +INSERT INTO substitution_groups (category, service_slug, manual_rank) VALUES + ('web-search', 'serper', 1), + ('web-search', 'brave', 2), + ('web-search', 'tavily', 3), + ('web-search', 'perplexity', 4), + ('llm-inference', 'groq', 1), + ('llm-inference', 'together', 2), + ('llm-inference', 'mistral', 3), + ('llm-inference', 'gemini', 4) +ON CONFLICT (category, service_slug) DO NOTHING; + +-- One row per failover attempt. settlement_class + duplicate_upstream_cost_possible +-- + second_upstream_cost_credits make the idempotency leak MEASURED, not assumed. +CREATE TABLE IF NOT EXISTS substitution_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + slug TEXT NOT NULL, -- candidate attempted this hop + category TEXT, + primary_provider TEXT NOT NULL, -- originally-requested slug + failure_reason TEXT, -- failure_code that triggered this hop + substitute_chosen TEXT, -- slug actually attempted (== slug) + latency_ms INTEGER, + success BOOLEAN NOT NULL DEFAULT FALSE, + cost_credits INTEGER, + settlement_class TEXT NOT NULL, -- 'pre_send' | 'post_send_ambiguous' + rail TEXT NOT NULL DEFAULT 'managed', + duplicate_upstream_cost_possible BOOLEAN NOT NULL DEFAULT FALSE, + second_upstream_cost_credits INTEGER, -- measured duplicate-cost leak + retried_primary BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_subst_events_primary + ON substitution_events (primary_provider, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_subst_events_category + ON substitution_events (category, created_at DESC); + +COMMIT;