Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions circuitbreaker/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
// and the request was rejected without calling the handler.
var ErrCircuitOpen = xerrors.New("circuit breaker is open")

// DefaultIsFailure returns true for standard HTTP status codes that typically
// indicate upstream overload.
// DefaultIsFailure returns true for standard HTTP status codes that
// typically indicate upstream overload.
//
// Note: 429 (Too Many Requests) is intentionally excluded. Rate
// limits are key-specific and handled by automatic key failover.
func DefaultIsFailure(statusCode int) bool {
switch statusCode {
case http.StatusTooManyRequests, // 429
http.StatusServiceUnavailable, // 503
http.StatusGatewayTimeout: // 504
case http.StatusServiceUnavailable, // 503
http.StatusGatewayTimeout: // 504
return true
default:
return false
Expand Down
12 changes: 6 additions & 6 deletions circuitbreaker/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func TestExecute_PerModelIsolation(t *testing.T) {
sonnetModel := "claude-sonnet-4-20250514"
haikuModel := "claude-3-5-haiku-20241022"

// Trip circuit on sonnet model (returns 429)
// Trip circuit on sonnet model (returns 503)
w := httptest.NewRecorder()
err := cbs.Execute(endpoint, sonnetModel, w, func(rw http.ResponseWriter) error {
sonnetCalls.Add(1)
rw.WriteHeader(http.StatusTooManyRequests)
rw.WriteHeader(http.StatusServiceUnavailable)
return nil
})
assert.NoError(t, err)
Expand Down Expand Up @@ -79,11 +79,11 @@ func TestExecute_PerEndpointIsolation(t *testing.T) {

model := "test-model"

// Trip circuit on /v1/messages endpoint (returns 429)
// Trip circuit on /v1/messages endpoint (returns 503)
w := httptest.NewRecorder()
err := cbs.Execute("/v1/messages", model, w, func(rw http.ResponseWriter) error {
messagesCalls.Add(1)
rw.WriteHeader(http.StatusTooManyRequests)
rw.WriteHeader(http.StatusServiceUnavailable)
return nil
})
assert.NoError(t, err)
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestExecute_OnStateChange(t *testing.T) {
// Trip circuit
w := httptest.NewRecorder()
err := cbs.Execute(endpoint, model, w, func(rw http.ResponseWriter) error {
rw.WriteHeader(http.StatusTooManyRequests)
rw.WriteHeader(http.StatusServiceUnavailable)
return nil
})
assert.NoError(t, err)
Expand All @@ -202,7 +202,7 @@ func TestDefaultIsFailure(t *testing.T) {
{http.StatusOK, false},
{http.StatusBadRequest, false},
{http.StatusUnauthorized, false},
{http.StatusTooManyRequests, true}, // 429
{http.StatusTooManyRequests, false}, // 429 — handled by key failover, not circuit breaker
{http.StatusInternalServerError, false},
{http.StatusBadGateway, false},
{http.StatusServiceUnavailable, true}, // 503
Expand Down
42 changes: 21 additions & 21 deletions internal/integrationtest/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

// Common response bodies for circuit breaker tests.
const (
anthropicRateLimitError = `{"type":"error","error":{"type":"rate_limit_error","message":"rate limited"}}`
openAIRateLimitError = `{"error":{"type":"rate_limit_error","message":"rate limited","code":"rate_limit_exceeded"}}`
anthropicOverloadedError = `{"type":"error","error":{"type":"api_error","message":"Internal server error"}}`
openAIOverloadedError = `{"error":{"message":"Service Unavailable.","type":"cf_service_unavailable","code":503}}`
)

func anthropicSuccessResponse(model string) string {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestCircuitBreaker_FullRecoveryCycle(t *testing.T) {
expectProvider: config.ProviderAnthropic,
expectEndpoint: "/v1/messages",
expectModel: "claude-sonnet-4-20250514",
errorBody: anthropicRateLimitError,
errorBody: anthropicOverloadedError,
successBody: anthropicSuccessResponse("claude-sonnet-4-20250514"),
requestBody: `{"model":"claude-sonnet-4-20250514","max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{
Expand All @@ -80,7 +80,7 @@ func TestCircuitBreaker_FullRecoveryCycle(t *testing.T) {
expectProvider: config.ProviderOpenAI,
expectEndpoint: "/v1/chat/completions",
expectModel: "gpt-4o",
errorBody: openAIRateLimitError,
errorBody: openAIOverloadedError,
successBody: openAISuccessResponse("gpt-4o"),
requestBody: `{"model":"gpt-4o","messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{"Authorization": {"Bearer test-key"}},
Expand All @@ -103,14 +103,14 @@ func TestCircuitBreaker_FullRecoveryCycle(t *testing.T) {
var shouldFail atomic.Bool
shouldFail.Store(true)

// Mock upstream that returns 429 or 200 based on shouldFail flag.
// Mock upstream that returns 503 or 200 based on shouldFail flag.
// The "x-should-retry: false" response header is required to disable the SDK's automatic retries (default MaxRetries=2).
mockUpstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upstreamCalls.Add(1)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("x-should-retry", "false")
if shouldFail.Load() {
w.WriteHeader(http.StatusTooManyRequests)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(tc.errorBody))
} else {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -146,10 +146,10 @@ func TestCircuitBreaker_FullRecoveryCycle(t *testing.T) {
}

// Phase 1: Trip the circuit breaker
// First FailureThreshold requests hit upstream, get 429
// First FailureThreshold requests hit upstream, get 503
for i := uint32(0); i < cbConfig.FailureThreshold; i++ {
status := doRequest()
assert.Equal(t, http.StatusTooManyRequests, status)
assert.Equal(t, http.StatusServiceUnavailable, status)
}
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), upstreamCalls.Load())
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestCircuitBreaker_HalfOpenFailure(t *testing.T) {
expectProvider: config.ProviderAnthropic,
expectEndpoint: "/v1/messages",
expectModel: "claude-sonnet-4-20250514",
errorBody: anthropicRateLimitError,
errorBody: anthropicOverloadedError,
requestBody: `{"model":"claude-sonnet-4-20250514","max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{
"x-api-key": {"test"},
Expand All @@ -247,7 +247,7 @@ func TestCircuitBreaker_HalfOpenFailure(t *testing.T) {
expectProvider: config.ProviderOpenAI,
expectEndpoint: "/v1/chat/completions",
expectModel: "gpt-4o",
errorBody: openAIRateLimitError,
errorBody: openAIOverloadedError,
requestBody: `{"model":"gpt-4o","messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{"Authorization": {"Bearer test-key"}},
path: pathOpenAIChatCompletions,
Expand All @@ -267,12 +267,12 @@ func TestCircuitBreaker_HalfOpenFailure(t *testing.T) {

var upstreamCalls atomic.Int32

// Mock upstream that always returns 429.
// Mock upstream that always returns 503.
mockUpstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upstreamCalls.Add(1)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("x-should-retry", "false")
w.WriteHeader(http.StatusTooManyRequests)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(tc.errorBody))
}))
defer mockUpstream.Close()
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestCircuitBreaker_HalfOpenFailure(t *testing.T) {
// Phase 1: Trip the circuit
for i := uint32(0); i < cbConfig.FailureThreshold; i++ {
status := doRequest()
assert.Equal(t, http.StatusTooManyRequests, status)
assert.Equal(t, http.StatusServiceUnavailable, status)
}

// Verify circuit is open
Expand All @@ -321,7 +321,7 @@ func TestCircuitBreaker_HalfOpenFailure(t *testing.T) {
// Phase 3: Request in half-open state fails, circuit should re-open
upstreamCallsBefore := upstreamCalls.Load()
status = doRequest()
assert.Equal(t, http.StatusTooManyRequests, status, "Request should fail in half-open state")
assert.Equal(t, http.StatusServiceUnavailable, status, "Request should fail in half-open state")
assert.Equal(t, upstreamCallsBefore+1, upstreamCalls.Load(), "Request should reach upstream in half-open state")

// Circuit should be open again - next request should be rejected immediately
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestCircuitBreaker_HalfOpenMaxRequests(t *testing.T) {
expectProvider: config.ProviderAnthropic,
expectEndpoint: "/v1/messages",
expectModel: "claude-sonnet-4-20250514",
errorBody: anthropicRateLimitError,
errorBody: anthropicOverloadedError,
successBody: anthropicSuccessResponse("claude-sonnet-4-20250514"),
requestBody: `{"model":"claude-sonnet-4-20250514","max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{
Expand All @@ -384,7 +384,7 @@ func TestCircuitBreaker_HalfOpenMaxRequests(t *testing.T) {
expectProvider: config.ProviderOpenAI,
expectEndpoint: "/v1/chat/completions",
expectModel: "gpt-4o",
errorBody: openAIRateLimitError,
errorBody: openAIOverloadedError,
successBody: openAISuccessResponse("gpt-4o"),
requestBody: `{"model":"gpt-4o","messages":[{"role":"user","content":"hi"}]}`,
headers: http.Header{"Authorization": {"Bearer test-key"}},
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestCircuitBreaker_HalfOpenMaxRequests(t *testing.T) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("x-should-retry", "false")
if shouldFail.Load() {
w.WriteHeader(http.StatusTooManyRequests)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(tc.errorBody))
} else {
// Slow response to ensure requests overlap
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestCircuitBreaker_HalfOpenMaxRequests(t *testing.T) {
// Phase 1: Trip the circuit
for i := uint32(0); i < cbConfig.FailureThreshold; i++ {
status := doRequest()
assert.Equal(t, http.StatusTooManyRequests, status)
assert.Equal(t, http.StatusServiceUnavailable, status)
}

// Verify circuit is open
Expand Down Expand Up @@ -529,8 +529,8 @@ func TestCircuitBreaker_PerModelIsolation(t *testing.T) {
if strings.Contains(string(body), "claude-sonnet-4-20250514") {
sonnetCalls.Add(1)
if sonnetShouldFail.Load() {
w.WriteHeader(http.StatusTooManyRequests)
_, _ = w.Write([]byte(anthropicRateLimitError))
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(anthropicOverloadedError))
} else {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(anthropicSuccessResponse("claude-sonnet-4-20250514")))
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestCircuitBreaker_PerModelIsolation(t *testing.T) {
// Phase 1: Trip the circuit for sonnet model
for i := uint32(0); i < cbConfig.FailureThreshold; i++ {
status := doRequest("claude-sonnet-4-20250514")
assert.Equal(t, http.StatusTooManyRequests, status)
assert.Equal(t, http.StatusServiceUnavailable, status)
}
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), sonnetCalls.Load())
Expand Down
2 changes: 1 addition & 1 deletion provider/anthropic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func Test_anthropicIsFailure(t *testing.T) {
{http.StatusOK, false},
{http.StatusBadRequest, false},
{http.StatusUnauthorized, false},
{http.StatusTooManyRequests, true}, // 429
{http.StatusTooManyRequests, false}, // 429 — handled by key failover, not circuit breaker
{http.StatusInternalServerError, false},
{http.StatusBadGateway, false},
{http.StatusServiceUnavailable, true}, // 503
Expand Down
Loading