Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 388 additions & 0 deletions apps/api/core/substitution.py

Large diffs are not rendered by default.

54 changes: 53 additions & 1 deletion apps/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# ── Version and globals ───────────────────────────────────────────────────────

VERSION = "0.9.0"
VERSION = "0.9.1"
ADMIN_KEY = os.getenv("ADMIN_KEY", "")
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
SENTRY_DSN = os.getenv("SENTRY_DSN", "")
Expand Down Expand Up @@ -1144,6 +1144,58 @@ async def lifespan(app: FastAPI):
ON hosted_agents(next_run_at)
WHERE trigger_type = 'schedule' AND next_run_at IS NOT NULL
""")
# Migration 064: substitution/failover engine — groups + event log.
# Mirrored in infra/migrations/064_substitution_engine.sql.
await _mconn.execute("""
CREATE TABLE IF NOT EXISTS substitution_groups (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
category TEXT NOT NULL,
service_slug TEXT NOT NULL,
manual_rank INTEGER,
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (category, service_slug)
)
""")
await _mconn.execute("""
CREATE INDEX IF NOT EXISTS idx_subst_groups_category
ON substitution_groups (category) WHERE active = TRUE
""")
await _mconn.execute("""
INSERT INTO substitution_groups (category, service_slug, manual_rank) VALUES
('web-search','serper',1),('web-search','brave',2),
('web-search','tavily',3),('web-search','perplexity',4),
('llm-inference','groq',1),('llm-inference','together',2),
('llm-inference','mistral',3),('llm-inference','gemini',4)
ON CONFLICT (category, service_slug) DO NOTHING
""")
await _mconn.execute("""
CREATE TABLE IF NOT EXISTS substitution_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
slug TEXT NOT NULL,
category TEXT,
primary_provider TEXT NOT NULL,
failure_reason TEXT,
substitute_chosen TEXT,
latency_ms INTEGER,
success BOOLEAN NOT NULL DEFAULT FALSE,
cost_credits INTEGER,
settlement_class TEXT NOT NULL,
rail TEXT NOT NULL DEFAULT 'managed',
duplicate_upstream_cost_possible BOOLEAN NOT NULL DEFAULT FALSE,
second_upstream_cost_credits INTEGER,
retried_primary BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""")
await _mconn.execute("""
CREATE INDEX IF NOT EXISTS idx_subst_events_primary
ON substitution_events (primary_provider, created_at DESC)
""")
await _mconn.execute("""
CREATE INDEX IF NOT EXISTS idx_subst_events_category
ON substitution_events (category, created_at DESC)
""")
except Exception as e:
logger.error("STARTUP ERROR: %s: %s", type(e).__name__, e, exc_info=True)
logger.critical("DB pool creation or migrations failed: %s — exiting so the orchestrator can restart cleanly", e)
Expand Down
1 change: 1 addition & 0 deletions apps/api/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ python_files = test_suite_v060.py test_suite_v062.py test_suite_v0610.py test_se
test_tier3_provider_gating.py
test_x402_client.py
test_rails_flag.py
test_substitution.py
test_uptime_source.py
markers =
no_api_key: test does not require WAYFORTH_TEST_API_KEY (e.g. probes unauthenticated paths)
63 changes: 63 additions & 0 deletions apps/api/routers/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,69 @@ async def _try_execute_managed(
return result, error_msg, round((_time_mod.time() - t0) * 1000)


# Settlement classes for the substitution/failover engine (core/substitution.py).
# pre_send — the request never reached the upstream or the upstream
# replied with an error before doing billable work
# (connection refused, connect-timeout, HTTP 5xx/429).
# Safe to fail over: no duplicate upstream cost.
# post_send_ambiguous — the request was sent and the response was lost, or a
# 200 came back with an unusable body. The upstream MAY
# have done (and billed) the work → failing over risks a
# duplicate upstream charge / on-chain double-settlement.
_SETTLEMENT_PRE = "pre_send"
_SETTLEMENT_POST = "post_send_ambiguous"


async def _try_execute_managed_ex(
slug: str, params: dict, key: str, *, timeout: float = 10.0
) -> tuple[object, str | None, int, str]:
"""Like _try_execute_managed but ALSO returns a settlement_class.

Distinguishes pre-settlement failures (safe to fail over) from
post-send-ambiguous ones (the upstream may have settled). httpx raises typed
exceptions which we catch BEFORE the asyncio.TimeoutError catch-all so the
distinction survives. No retry here — the engine owns retry/failover policy.
"""
adapter = ADAPTERS[slug]
result = None
error_msg = None
settlement = _SETTLEMENT_PRE
t0 = _time_mod.time()
# Generous outer guard only against a hung adapter; the typed httpx
# exceptions below fire first for the cases we care about.
_guard = 40.0 if slug == "assemblyai" else max(timeout + 5.0, 15.0)
try:
result = await asyncio.wait_for(adapter(params, key), timeout=_guard)
except (httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout):
# No TCP connection was established (ConnectError/ConnectTimeout) or none
# was ever acquired from the pool (PoolTimeout) → the request never left
# the box → zero upstream work → SAFE to fail over.
error_msg = "Connection failed"
settlement = _SETTLEMENT_PRE
except (httpx.ReadTimeout, httpx.WriteError, httpx.WriteTimeout):
# ReadTimeout: the request WAS sent and the response was lost. Write* : a
# partial write may have reached AND been processed by the upstream. Both
# are ambiguous — the upstream may have done (and billed) the work — so
# NEVER blind-failover. This is the core of the #5 idempotency guarantee.
error_msg = "Timeout/write error after send"
settlement = _SETTLEMENT_POST
except (asyncio.TimeoutError, TimeoutError):
# Outer guard fired mid-call — we can't prove the upstream didn't run.
error_msg = "Service timeout"
settlement = _SETTLEMENT_POST
except Exception as _e:
msg = str(_e)
error_msg = msg[:300]
_m = re.search(r"\b([45]\d{2})\b", msg)
if _m and _m.group(1) in ("500", "502", "503", "504", "429"):
settlement = _SETTLEMENT_PRE # upstream errored before/without billable work
elif _m and _m.group(1).startswith("4"):
settlement = _SETTLEMENT_PRE # client error — won't be chained anyway
else:
settlement = _SETTLEMENT_POST # unclassifiable → fail safe
return result, error_msg, round((_time_mod.time() - t0) * 1000), settlement


# Per-key concurrent SSE stream cap. SSE connections hold a worker open for the
# full upstream response duration; an unbounded number per key would let one
# caller exhaust connection capacity for everyone else.
Expand Down
103 changes: 50 additions & 53 deletions apps/api/routers/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@
_fetch_wri,
_mk_refund_key,
_patch_tx_signals,
_try_execute_managed,
_try_execute_managed_ex,
_update_search_signal,
)
from core.substitution import run_with_failover

logger = logging.getLogger("wayforth")
router = APIRouter()
Expand Down Expand Up @@ -166,69 +167,58 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)):
})

# ── Primary execution ─────────────────────────────────────────────────────
result, error_msg, execution_ms = await _try_execute_managed(slug, params, svc_key)
# _ex variant also yields the settlement_class (pre_send vs post_send_ambiguous)
# the failover engine needs for the idempotency gate.
result, error_msg, execution_ms, _primary_settlement = await _try_execute_managed_ex(
slug, params, svc_key
)

# ── Failover on service-side failures ────────────────────────────────────
# ── Failover on service-side failures (delegated to the substitution engine) ──
# ZERO-OVERHEAD on the happy path: the engine is only ever invoked here, inside
# the failure branch. A successful primary skips everything below.
_proxy_fallback_from: str | None = None
_original_failure_code: str | None = None

if error_msg and _classify_error(error_msg) == "service_failure":
_original_failure_code = _classify_failure(None, error_msg)
new_bal = await _do_refund(
db, user_id, credit_cost, slug, error_msg, "/proxy", balance_after,
_mk_refund_key(getattr(request.state, "request_id", ""), slug, "proxy_managed"),
from main import app as _app_ref
outcome = await run_with_failover(
db, pool=_app_ref.state.pool,
request_id=getattr(request.state, "request_id", ""),
user_id=str(user_id), api_key_id=str(_api_key_id), agent_id=agent_id,
primary_slug=slug, user_params=_user_params,
primary_error=error_msg, primary_settlement=_primary_settlement,
primary_cost=credit_cost, primary_balance_after=balance_after,
primary_tx_id=_tx_id, primary_svc_key=svc_key,
rail="managed",
)

_fb_slug = SERVICE_ALTERNATIVES.get(slug)
if _fb_slug and _fb_slug in SERVICE_CONFIGS:
_fb_cfg = SERVICE_CONFIGS[_fb_slug]
_fb_key = os.environ.get(_fb_cfg["key_var"], "")
if _fb_key:
_fb_mapped, _fb_miss = map_params(_fb_slug, _user_params)
if not _fb_miss:
_fb_cost = _fb_cfg["credits"]
_fb_ok, _fb_bal, _fb_tx_id = await check_and_deduct_credits(
db, str(user_id), _fb_cost, "/proxy",
service_id=_fb_slug, tx_type="execution",
agent_id=agent_id, api_key_id=str(_api_key_id),
return_tx_id=True,
)
if _fb_ok:
result, _fb_err, execution_ms = await _try_execute_managed(
_fb_slug, _fb_mapped, _fb_key,
)
if _fb_err and _classify_error(_fb_err) == "service_failure":
await _do_refund(
db, user_id, _fb_cost, _fb_slug, _fb_err, "/proxy", _fb_bal,
_mk_refund_key(
getattr(request.state, "request_id", ""),
_fb_slug, "proxy_managed_fb",
),
)
result = None
elif _fb_err:
raise HTTPException(status_code=400, detail={
"error": _fb_err, "refunded": False, "credits_restored": 0,
})
else:
_proxy_fallback_from = slug
slug = _fb_slug
credit_cost = _fb_cost
balance_after = _fb_bal
_tx_id = _fb_tx_id

if result is None:
from main import app as _app_ref
if outcome.client_error:
raise HTTPException(status_code=400, detail={
"error": outcome.client_error, "refunded": True, "credits_restored": credit_cost,
})
if outcome.served_slug is None:
# Group exhausted (or post-send-ambiguous and not eligible to fail over).
asyncio.create_task(_patch_tx_signals(
_app_ref.state.pool, _tx_id,
failure_code=_original_failure_code,
_app_ref.state.pool, _tx_id, failure_code=outcome.original_failure_code,
))
raise HTTPException(status_code=503, detail={
"error": "Service unavailable",
raise HTTPException(status_code=502, detail={
"error": "all_providers_failed",
"category": outcome.category,
"providers_tried": [
{"provider": s, "reason": r} for s, r in outcome.providers_tried
],
"refunded": True,
"credits_restored": credit_cost,
"credits_remaining": new_bal,
"credits_remaining": outcome.balance_after,
})
# Adopt the provider that actually served for the rest of the handler.
_proxy_fallback_from = outcome.fallback_from
_original_failure_code = outcome.original_failure_code
slug = outcome.served_slug
credit_cost = outcome.cost
balance_after = outcome.balance_after
_tx_id = outcome.tx_id
result = outcome.result
execution_ms = outcome.execution_ms

elif error_msg:
raise HTTPException(status_code=400, detail={
Expand Down Expand Up @@ -260,6 +250,11 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)):
wri = await _fetch_wri(db, slug)
headers: dict[str, str] = {
"X-Wayforth-Failover": "true" if _proxy_fallback_from else "false",
# Visible self-heal surface: which provider actually served, and whether
# it came via fallback. X-Wayforth-Fallback mirrors -Failover under the
# name the spec calls out; both kept for back-compat.
"X-Wayforth-Served-By": slug,
"X-Wayforth-Fallback": "true" if _proxy_fallback_from else "false",
"X-Wayforth-WRI": str(wri) if wri is not None else "unknown",
"X-Wayforth-Cost": str(credit_cost),
"X-Wayforth-Rail": "managed",
Expand Down Expand Up @@ -289,6 +284,8 @@ async def proxy_call(request: Request, slug: str, db=Depends(get_db)):
body = {
"status": "ok",
"service": slug,
"served_by": slug,
"fallback": bool(_proxy_fallback_from),
"result": result,
"credits_deducted": credit_cost,
"execution_ms": execution_ms,
Expand Down
Loading
Loading