From dc47a2603ef4da9989b8b3c819919e6392be7e83 Mon Sep 17 00:00:00 2001 From: Yevhenii Shcherbina Date: Fri, 17 Apr 2026 15:14:09 +0000 Subject: [PATCH 1/6] feat: add bedrock provider --- api.go | 6 + config/config.go | 12 ++ intercept/bedrock/interceptor.go | 184 +++++++++++++++++++++++++++++++ provider/bedrock.go | 162 +++++++++++++++++++++++++++ 4 files changed, 364 insertions(+) create mode 100644 intercept/bedrock/interceptor.go create mode 100644 provider/bedrock.go diff --git a/api.go b/api.go index f8fc60f1..3669ae24 100644 --- a/api.go +++ b/api.go @@ -19,6 +19,7 @@ const ( ProviderAnthropic = config.ProviderAnthropic ProviderOpenAI = config.ProviderOpenAI ProviderCopilot = config.ProviderCopilot + ProviderBedrock = config.ProviderBedrock ) type ( @@ -37,6 +38,7 @@ type ( AnthropicConfig = config.Anthropic AWSBedrockConfig = config.AWSBedrock + BedrockConfig = config.Bedrock OpenAIConfig = config.OpenAI CopilotConfig = config.Copilot ) @@ -57,6 +59,10 @@ func NewCopilotProvider(cfg config.Copilot) provider.Provider { return provider.NewCopilot(cfg) } +func NewBedrockProvider(cfg config.Bedrock) provider.Provider { + return provider.NewBedrock(cfg) +} + func NewMetrics(reg prometheus.Registerer) *metrics.Metrics { return metrics.NewMetrics(reg) } diff --git a/config/config.go b/config/config.go index 17ce01e4..1e3e9651 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ const ( ProviderAnthropic = "anthropic" ProviderOpenAI = "openai" ProviderCopilot = "copilot" + ProviderBedrock = "bedrock" ) type Anthropic struct { @@ -34,6 +35,17 @@ type AWSBedrock struct { BaseURL string } +// Bedrock is a standalone Bedrock provider configuration. It acts as a +// SigV4-signing reverse proxy, forwarding native Bedrock API requests +// to AWS and adding centralized AWS credentials. +type Bedrock struct { + // Name is the provider instance name. If empty, defaults to "bedrock". + Name string + APIDumpDir string + CircuitBreaker *CircuitBreaker + AWSBedrock // Region, AccessKey, AccessKeySecret, SessionToken, Model, SmallFastModel, BaseURL +} + type OpenAI struct { // Name is the provider instance name. If empty, defaults to "openai". Name string diff --git a/intercept/bedrock/interceptor.go b/intercept/bedrock/interceptor.go new file mode 100644 index 00000000..b3150f2b --- /dev/null +++ b/intercept/bedrock/interceptor.go @@ -0,0 +1,184 @@ +// Package bedrock provides a SigV4-signing reverse proxy interceptor +// for native Bedrock API requests. It forwards requests to AWS Bedrock +// with centralized AWS credentials. +package bedrock + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "io" + "net/http" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "golang.org/x/xerrors" + + "cdr.dev/slog/v3" + "github.com/coder/aibridge/config" + "github.com/coder/aibridge/intercept" + "github.com/coder/aibridge/mcp" + "github.com/coder/aibridge/recorder" + "github.com/coder/aibridge/tracing" +) + +var _ intercept.Interceptor = &Interceptor{} + +// Interceptor is a SigV4-signing reverse proxy for native Bedrock API +// requests. It forwards the request body as-is, signs with centralized +// AWS credentials, and streams the response back to the client. +type Interceptor struct { + id uuid.UUID + modelID string + streaming bool + reqBody []byte + originalPath string // e.g. /model/us.anthropic.claude-sonnet-4-6/invoke-with-response-stream + bedrockCfg config.AWSBedrock + providerName string + tracer trace.Tracer + credential intercept.CredentialInfo + + logger slog.Logger + recorder recorder.Recorder +} + +func NewInterceptor( + id uuid.UUID, + modelID string, + streaming bool, + reqBody []byte, + originalPath string, + bedrockCfg config.AWSBedrock, + providerName string, + tracer trace.Tracer, + cred intercept.CredentialInfo, +) *Interceptor { + return &Interceptor{ + id: id, + modelID: modelID, + streaming: streaming, + reqBody: reqBody, + originalPath: originalPath, + bedrockCfg: bedrockCfg, + providerName: providerName, + tracer: tracer, + credential: cred, + } +} + +func (i *Interceptor) ID() uuid.UUID { return i.id } +func (i *Interceptor) Model() string { return i.modelID } + +func (i *Interceptor) Setup(logger slog.Logger, rec recorder.Recorder, _ mcp.ServerProxier) { + i.logger = logger + i.recorder = rec +} + +func (i *Interceptor) Streaming() bool { return i.streaming } +func (i *Interceptor) Credential() intercept.CredentialInfo { return i.credential } +func (i *Interceptor) CorrelatingToolCallID() *string { return nil } + +func (i *Interceptor) TraceAttributes(r *http.Request) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(tracing.Provider, i.providerName), + attribute.String(tracing.Model, i.modelID), + attribute.Bool(tracing.Streaming, i.streaming), + attribute.Bool(tracing.IsBedrock, true), + } +} + +// ProcessRequest signs the request with centralized AWS credentials +// and proxies it to the real Bedrock endpoint. +func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) error { + ctx := r.Context() + _, span := i.tracer.Start(ctx, "bedrock.ProcessRequest") + defer span.End() + + baseURL := i.bedrockCfg.BaseURL + if baseURL == "" { + baseURL = "https://bedrock-runtime." + i.bedrockCfg.Region + ".amazonaws.com" + } + + targetURL := baseURL + i.originalPath + + outReq, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(i.reqBody)) + if err != nil { + return xerrors.Errorf("create outbound request: %w", err) + } + outReq.Header.Set("Content-Type", "application/json") + + awsCreds, err := i.loadCredentials(ctx) + if err != nil { + return xerrors.Errorf("load AWS credentials: %w", err) + } + + hash := sha256.Sum256(i.reqBody) + signer := v4.NewSigner() + err = signer.SignHTTP(ctx, awsCreds, outReq, hex.EncodeToString(hash[:]), "bedrock", i.bedrockCfg.Region, time.Now()) + if err != nil { + return xerrors.Errorf("sign request: %w", err) + } + + resp, err := http.DefaultClient.Do(outReq) + if err != nil { + return xerrors.Errorf("send request to bedrock: %w", err) + } + defer resp.Body.Close() + + // Copy response headers. + for key, values := range resp.Header { + for _, val := range values { + w.Header().Add(key, val) + } + } + w.WriteHeader(resp.StatusCode) + + // Stream the response body to the client. + _, err = io.Copy(w, resp.Body) + if err != nil { + return xerrors.Errorf("copy response body: %w", err) + } + + return nil +} + +// loadCredentials resolves AWS credentials from the config. Static +// credentials are used when both AccessKey and AccessKeySecret are +// set; otherwise the SDK default credential chain is used. +func (i *Interceptor) loadCredentials(ctx context.Context) (aws.Credentials, error) { + loadOpts := []func(*awsconfig.LoadOptions) error{ + awsconfig.WithRegion(i.bedrockCfg.Region), + } + + switch { + case i.bedrockCfg.AccessKey != "" && i.bedrockCfg.AccessKeySecret != "": + loadOpts = append(loadOpts, awsconfig.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + i.bedrockCfg.AccessKey, + i.bedrockCfg.AccessKeySecret, + i.bedrockCfg.SessionToken, + ), + )) + case i.bedrockCfg.AccessKey != "" || i.bedrockCfg.AccessKeySecret != "": + return aws.Credentials{}, xerrors.New("both access key and access key secret must be provided together") + } + + cfg, err := awsconfig.LoadDefaultConfig(ctx, loadOpts...) + if err != nil { + return aws.Credentials{}, xerrors.Errorf("load AWS config: %w", err) + } + + creds, err := cfg.Credentials.Retrieve(ctx) + if err != nil { + return aws.Credentials{}, xerrors.Errorf("retrieve AWS credentials: %w", err) + } + + return creds, nil +} diff --git a/provider/bedrock.go b/provider/bedrock.go new file mode 100644 index 00000000..7c28de22 --- /dev/null +++ b/provider/bedrock.go @@ -0,0 +1,162 @@ +package provider + +import ( + "fmt" + "io" + "net/http" + "os" + "strings" + + "github.com/google/uuid" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "golang.org/x/xerrors" + + "github.com/coder/aibridge/config" + "github.com/coder/aibridge/intercept" + bedrockintercept "github.com/coder/aibridge/intercept/bedrock" + "github.com/coder/aibridge/tracing" +) + +var _ Provider = &Bedrock{} + +// Bedrock is a standalone Bedrock provider that accepts native Bedrock +// API requests and proxies them to AWS with centralized SigV4 signing. +type Bedrock struct { + cfg config.Bedrock +} + +func NewBedrock(cfg config.Bedrock) *Bedrock { + if cfg.Name == "" { + cfg.Name = config.ProviderBedrock + } + if cfg.APIDumpDir == "" { + cfg.APIDumpDir = os.Getenv("BRIDGE_DUMP_DIR") + } + if cfg.CircuitBreaker != nil { + // Bedrock returns Anthropic-compatible errors for Claude models, + // and also returns 429/503 for rate limiting across all models. + cfg.CircuitBreaker.IsFailure = anthropicIsFailure + cfg.CircuitBreaker.OpenErrorResponse = anthropicOpenErrorResponse + } + + return &Bedrock{cfg: cfg} +} + +func (*Bedrock) Type() string { + return config.ProviderBedrock +} + +func (p *Bedrock) Name() string { + return p.cfg.Name +} + +func (p *Bedrock) BaseURL() string { + if p.cfg.AWSBedrock.BaseURL != "" { + return p.cfg.AWSBedrock.BaseURL + } + return fmt.Sprintf("https://bedrock-runtime.%s.amazonaws.com", p.cfg.Region) +} + +func (p *Bedrock) RoutePrefix() string { + return fmt.Sprintf("/%s", p.Name()) +} + +// BridgedRoutes returns a prefix pattern that catches all Bedrock +// invoke paths: /model/{modelId}/invoke and +// /model/{modelId}/invoke-with-response-stream. +func (*Bedrock) BridgedRoutes() []string { + return []string{"/model/"} +} + +// PassthroughRoutes returns an empty slice. All Bedrock requests +// require SigV4 signing which cannot be done via simple header +// injection. +func (*Bedrock) PassthroughRoutes() []string { + return nil +} + +func (*Bedrock) AuthHeader() string { + return "Authorization" +} + +// InjectAuthHeader is a no-op for Bedrock. Authentication is handled +// by SigV4 signing inside the interceptor, not via header injection. +func (*Bedrock) InjectAuthHeader(_ *http.Header) {} + +func (p *Bedrock) CircuitBreakerConfig() *config.CircuitBreaker { + return p.cfg.CircuitBreaker +} + +func (p *Bedrock) APIDumpDir() string { + return p.cfg.APIDumpDir +} + +// parseBedrockPath extracts the model ID and streaming flag from a +// Bedrock invoke path. Expected format: +// +// /model/{modelId}/invoke +// /model/{modelId}/invoke-with-response-stream +func parseBedrockPath(path string) (modelID string, streaming bool, err error) { + // path should be like /model/us.anthropic.claude-sonnet-4-6/invoke-with-response-stream + const modelPrefix = "/model/" + if !strings.HasPrefix(path, modelPrefix) { + return "", false, xerrors.Errorf("path does not start with %s: %s", modelPrefix, path) + } + + rest := path[len(modelPrefix):] + // rest = "us.anthropic.claude-sonnet-4-6/invoke-with-response-stream" + + switch { + case strings.HasSuffix(rest, "/invoke-with-response-stream"): + modelID = strings.TrimSuffix(rest, "/invoke-with-response-stream") + streaming = true + case strings.HasSuffix(rest, "/invoke"): + modelID = strings.TrimSuffix(rest, "/invoke") + streaming = false + default: + return "", false, xerrors.Errorf("path does not end with /invoke or /invoke-with-response-stream: %s", path) + } + + if modelID == "" { + return "", false, xerrors.Errorf("empty model ID in path: %s", path) + } + + return modelID, streaming, nil +} + +func (p *Bedrock) CreateInterceptor(_ http.ResponseWriter, r *http.Request, tracer trace.Tracer) (_ intercept.Interceptor, outErr error) { + id := uuid.New() + _, span := tracer.Start(r.Context(), "Intercept.CreateInterceptor") + defer tracing.EndSpanErr(span, &outErr) + + path := strings.TrimPrefix(r.URL.Path, p.RoutePrefix()) + + modelID, streaming, err := parseBedrockPath(path) + if err != nil { + span.SetStatus(codes.Error, "unknown route: "+r.URL.Path) + return nil, ErrUnknownRoute + } + + body, err := io.ReadAll(r.Body) + if err != nil { + return nil, xerrors.Errorf("read body: %w", err) + } + + cred := intercept.NewCredentialInfo(intercept.CredentialKindCentralized, "") + + interceptor := bedrockintercept.NewInterceptor( + id, + modelID, + streaming, + body, + path, + p.cfg.AWSBedrock, + p.Name(), + tracer, + cred, + ) + + span.SetAttributes(interceptor.TraceAttributes(r)...) + return interceptor, nil +} From 9a2371e00731e2d14ac92ff4943fdfe64481b2a7 Mon Sep 17 00:00:00 2001 From: Yevhenii Shcherbina Date: Tue, 21 Apr 2026 13:02:22 +0000 Subject: [PATCH 2/6] feat: forward headers --- intercept/bedrock/interceptor.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/intercept/bedrock/interceptor.go b/intercept/bedrock/interceptor.go index b3150f2b..67116de2 100644 --- a/intercept/bedrock/interceptor.go +++ b/intercept/bedrock/interceptor.go @@ -13,9 +13,9 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" - v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/google/uuid" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -74,14 +74,14 @@ func NewInterceptor( } func (i *Interceptor) ID() uuid.UUID { return i.id } -func (i *Interceptor) Model() string { return i.modelID } +func (i *Interceptor) Model() string { return i.modelID } func (i *Interceptor) Setup(logger slog.Logger, rec recorder.Recorder, _ mcp.ServerProxier) { i.logger = logger i.recorder = rec } -func (i *Interceptor) Streaming() bool { return i.streaming } +func (i *Interceptor) Streaming() bool { return i.streaming } func (i *Interceptor) Credential() intercept.CredentialInfo { return i.credential } func (i *Interceptor) CorrelatingToolCallID() *string { return nil } @@ -112,6 +112,10 @@ func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) err if err != nil { return xerrors.Errorf("create outbound request: %w", err) } + + // Forward client headers with hop-by-hop, transport, and auth + // headers stripped. SigV4 signing below will add Authorization. + outReq.Header = intercept.PrepareClientHeaders(r.Header) outReq.Header.Set("Content-Type", "application/json") awsCreds, err := i.loadCredentials(ctx) @@ -126,6 +130,21 @@ func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) err return xerrors.Errorf("sign request: %w", err) } + i.logger.Warn( + r.Context(), "Bedrock OUT SIGNED Request", + slog.F("signedOutReq.Method", outReq.Method), + slog.F("signedOutReq.URL.String()", outReq.URL.String()), + slog.F("signedOutReq.Host", outReq.Host), + slog.F("len(signedOutReq.Header)", len(outReq.Header)), + ) + + for name, vals := range outReq.Header { + i.logger.Warn( + r.Context(), "Bedrock SIGNED OUT Header", + slog.F(name, vals), + ) + } + resp, err := http.DefaultClient.Do(outReq) if err != nil { return xerrors.Errorf("send request to bedrock: %w", err) From 8033e5f25252b2ada341a26f4c7a8ec36f860a35 Mon Sep 17 00:00:00 2001 From: Yevhenii Shcherbina Date: Wed, 22 Apr 2026 14:52:31 +0000 Subject: [PATCH 3/6] initial impl of audit --- intercept/bedrock/interceptor.go | 287 ++++++++++++++++++++++---- intercept/messages/blocking.go | 2 +- intercept/messages/reqpayload.go | 4 +- intercept/messages/reqpayload_test.go | 2 +- intercept/messages/streaming.go | 2 +- 5 files changed, 257 insertions(+), 40 deletions(-) diff --git a/intercept/bedrock/interceptor.go b/intercept/bedrock/interceptor.go index 67116de2..a9a05eea 100644 --- a/intercept/bedrock/interceptor.go +++ b/intercept/bedrock/interceptor.go @@ -1,22 +1,28 @@ // Package bedrock provides a SigV4-signing reverse proxy interceptor // for native Bedrock API requests. It forwards requests to AWS Bedrock -// with centralized AWS credentials. +// with centralized AWS credentials and extracts audit metadata from +// the response stream. package bedrock import ( "bytes" "context" "crypto/sha256" + "encoding/base64" "encoding/hex" + "encoding/json" "io" "net/http" "time" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream" + "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi" v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/google/uuid" + "github.com/tidwall/gjson" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/xerrors" @@ -24,6 +30,7 @@ import ( "cdr.dev/slog/v3" "github.com/coder/aibridge/config" "github.com/coder/aibridge/intercept" + "github.com/coder/aibridge/intercept/messages" "github.com/coder/aibridge/mcp" "github.com/coder/aibridge/recorder" "github.com/coder/aibridge/tracing" @@ -33,13 +40,13 @@ var _ intercept.Interceptor = &Interceptor{} // Interceptor is a SigV4-signing reverse proxy for native Bedrock API // requests. It forwards the request body as-is, signs with centralized -// AWS credentials, and streams the response back to the client. +// AWS credentials, and extracts audit metadata from the response. type Interceptor struct { id uuid.UUID modelID string streaming bool reqBody []byte - originalPath string // e.g. /model/us.anthropic.claude-sonnet-4-6/invoke-with-response-stream + originalPath string bedrockCfg config.AWSBedrock providerName string tracer trace.Tracer @@ -94,27 +101,28 @@ func (i *Interceptor) TraceAttributes(r *http.Request) []attribute.KeyValue { } } -// ProcessRequest signs the request with centralized AWS credentials -// and proxies it to the real Bedrock endpoint. func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) error { ctx := r.Context() _, span := i.tracer.Start(ctx, "bedrock.ProcessRequest") defer span.End() + // Extract user prompt before sending the request. + var promptText string + var promptFound bool + if reqPayload, err := messages.NewRequestPayload(i.reqBody); err == nil { + promptText, promptFound, _ = reqPayload.LastUserPrompt() + } + baseURL := i.bedrockCfg.BaseURL if baseURL == "" { baseURL = "https://bedrock-runtime." + i.bedrockCfg.Region + ".amazonaws.com" } - targetURL := baseURL + i.originalPath - - outReq, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bytes.NewReader(i.reqBody)) + outReq, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+i.originalPath, bytes.NewReader(i.reqBody)) if err != nil { return xerrors.Errorf("create outbound request: %w", err) } - // Forward client headers with hop-by-hop, transport, and auth - // headers stripped. SigV4 signing below will add Authorization. outReq.Header = intercept.PrepareClientHeaders(r.Header) outReq.Header.Set("Content-Type", "application/json") @@ -125,33 +133,16 @@ func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) err hash := sha256.Sum256(i.reqBody) signer := v4.NewSigner() - err = signer.SignHTTP(ctx, awsCreds, outReq, hex.EncodeToString(hash[:]), "bedrock", i.bedrockCfg.Region, time.Now()) - if err != nil { + if err = signer.SignHTTP(ctx, awsCreds, outReq, hex.EncodeToString(hash[:]), "bedrock", i.bedrockCfg.Region, time.Now()); err != nil { return xerrors.Errorf("sign request: %w", err) } - i.logger.Warn( - r.Context(), "Bedrock OUT SIGNED Request", - slog.F("signedOutReq.Method", outReq.Method), - slog.F("signedOutReq.URL.String()", outReq.URL.String()), - slog.F("signedOutReq.Host", outReq.Host), - slog.F("len(signedOutReq.Header)", len(outReq.Header)), - ) - - for name, vals := range outReq.Header { - i.logger.Warn( - r.Context(), "Bedrock SIGNED OUT Header", - slog.F(name, vals), - ) - } - resp, err := http.DefaultClient.Do(outReq) if err != nil { return xerrors.Errorf("send request to bedrock: %w", err) } defer resp.Body.Close() - // Copy response headers. for key, values := range resp.Header { for _, val := range values { w.Header().Add(key, val) @@ -159,18 +150,244 @@ func (i *Interceptor) ProcessRequest(w http.ResponseWriter, r *http.Request) err } w.WriteHeader(resp.StatusCode) - // Stream the response body to the client. - _, err = io.Copy(w, resp.Body) - if err != nil { - return xerrors.Errorf("copy response body: %w", err) + if resp.StatusCode != http.StatusOK { + _, _ = io.Copy(w, resp.Body) + return nil + } + + // Buffer the response while streaming to the client so we can + // parse it for audit data after the stream completes. + var auditBuf bytes.Buffer + tee := io.TeeReader(resp.Body, &auditBuf) + + if i.streaming { + flusher, ok := w.(http.Flusher) + buf := make([]byte, 32*1024) + for { + n, readErr := tee.Read(buf) + if n > 0 { + if _, writeErr := w.Write(buf[:n]); writeErr != nil { + return xerrors.Errorf("write streaming chunk: %w", writeErr) + } + if ok { + flusher.Flush() + } + } + if readErr != nil { + if readErr == io.EOF { + break + } + return xerrors.Errorf("read streaming chunk: %w", readErr) + } + } + } else { + if _, err = io.Copy(w, tee); err != nil { + return xerrors.Errorf("copy response body: %w", err) + } + } + + // Extract audit metadata from the buffered response. + if i.streaming { + i.extractStreamingAudit(ctx, auditBuf.Bytes(), promptText, promptFound) + } else { + i.extractBlockingAudit(ctx, auditBuf.Bytes(), promptText, promptFound) } return nil } -// loadCredentials resolves AWS credentials from the config. Static -// credentials are used when both AccessKey and AccessKeySecret are -// set; otherwise the SDK default credential chain is used. +// extractStreamingAudit parses a buffered AWS EventStream response +// and records audit metadata. +func (i *Interceptor) extractStreamingAudit(ctx context.Context, data []byte, promptText string, promptFound bool) { + decoder := eventstream.NewDecoder() + reader := bytes.NewReader(data) + + var msgID string + + // Accumulators for content blocks indexed by block position. + type toolBlock struct { + id string + name string + args bytes.Buffer + } + var toolBlocks []toolBlock + thinkingBlocks := map[int]*bytes.Buffer{} + blockTypes := map[int]string{} + + for { + msg, err := decoder.Decode(reader, nil) + if err != nil { + break + } + + messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader) + if messageType == nil || messageType.String() != eventstreamapi.EventMessageType { + continue + } + eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader) + if eventType == nil || eventType.String() != "chunk" { + continue + } + + var chunk struct { + Bytes string `json:"bytes"` + } + if err := json.Unmarshal(msg.Payload, &chunk); err != nil { + continue + } + decoded, err := base64.StdEncoding.DecodeString(chunk.Bytes) + if err != nil { + continue + } + + eventKind := gjson.GetBytes(decoded, "type").String() + + switch eventKind { + case "message_start": + msgID = gjson.GetBytes(decoded, "message.id").String() + usage := gjson.GetBytes(decoded, "message.usage") + if usage.Exists() { + _ = i.recorder.RecordTokenUsage(ctx, &recorder.TokenUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + Input: usage.Get("input_tokens").Int(), + Output: usage.Get("output_tokens").Int(), + CacheReadInputTokens: usage.Get("cache_read_input_tokens").Int(), + CacheWriteInputTokens: usage.Get("cache_creation_input_tokens").Int(), + }) + } + if promptFound { + _ = i.recorder.RecordPromptUsage(ctx, &recorder.PromptUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + Prompt: promptText, + }) + promptFound = false + } + + case "message_delta": + usage := gjson.GetBytes(decoded, "usage") + if usage.Exists() { + _ = i.recorder.RecordTokenUsage(ctx, &recorder.TokenUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + Output: usage.Get("output_tokens").Int(), + }) + } + + case "content_block_start": + idx := int(gjson.GetBytes(decoded, "index").Int()) + blockType := gjson.GetBytes(decoded, "content_block.type").String() + blockTypes[idx] = blockType + + if blockType == "tool_use" { + toolBlocks = append(toolBlocks, toolBlock{ + id: gjson.GetBytes(decoded, "content_block.id").String(), + name: gjson.GetBytes(decoded, "content_block.name").String(), + }) + } + if blockType == "thinking" { + thinkingBlocks[idx] = &bytes.Buffer{} + } + + case "content_block_delta": + idx := int(gjson.GetBytes(decoded, "index").Int()) + switch blockTypes[idx] { + case "tool_use": + partialJSON := gjson.GetBytes(decoded, "delta.partial_json").String() + for ti := range toolBlocks { + if toolBlocks[ti].id != "" { + toolBlocks[len(toolBlocks)-1].args.WriteString(partialJSON) + break + } + } + case "thinking": + if buf, ok := thinkingBlocks[idx]; ok { + buf.WriteString(gjson.GetBytes(decoded, "delta.thinking").String()) + } + } + + case "message_stop": + for _, tb := range toolBlocks { + var args json.RawMessage + if tb.args.Len() > 0 { + args = json.RawMessage(tb.args.Bytes()) + } + _ = i.recorder.RecordToolUsage(ctx, &recorder.ToolUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + ToolCallID: tb.id, + Tool: tb.name, + Args: args, + Injected: false, + }) + } + for _, buf := range thinkingBlocks { + if buf.Len() > 0 { + _ = i.recorder.RecordModelThought(ctx, &recorder.ModelThoughtRecord{ + InterceptionID: i.id.String(), + Content: buf.String(), + Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking}, + }) + } + } + } + } +} + +// extractBlockingAudit parses a JSON response body and records audit +// metadata. +func (i *Interceptor) extractBlockingAudit(ctx context.Context, data []byte, promptText string, promptFound bool) { + msgID := gjson.GetBytes(data, "id").String() + + if promptFound { + _ = i.recorder.RecordPromptUsage(ctx, &recorder.PromptUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + Prompt: promptText, + }) + } + + usage := gjson.GetBytes(data, "usage") + if usage.Exists() { + _ = i.recorder.RecordTokenUsage(ctx, &recorder.TokenUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + Input: usage.Get("input_tokens").Int(), + Output: usage.Get("output_tokens").Int(), + CacheReadInputTokens: usage.Get("cache_read_input_tokens").Int(), + CacheWriteInputTokens: usage.Get("cache_creation_input_tokens").Int(), + }) + } + + content := gjson.GetBytes(data, "content") + if content.IsArray() { + content.ForEach(func(_, block gjson.Result) bool { + switch block.Get("type").String() { + case "tool_use": + _ = i.recorder.RecordToolUsage(ctx, &recorder.ToolUsageRecord{ + InterceptionID: i.id.String(), + MsgID: msgID, + ToolCallID: block.Get("id").String(), + Tool: block.Get("name").String(), + Args: json.RawMessage(block.Get("input").Raw), + Injected: false, + }) + case "thinking": + thinking := block.Get("thinking").String() + if thinking != "" { + _ = i.recorder.RecordModelThought(ctx, &recorder.ModelThoughtRecord{ + InterceptionID: i.id.String(), + Content: thinking, + Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking}, + }) + } + } + return true + }) + } +} + func (i *Interceptor) loadCredentials(ctx context.Context) (aws.Credentials, error) { loadOpts := []func(*awsconfig.LoadOptions) error{ awsconfig.WithRegion(i.bedrockCfg.Region), diff --git a/intercept/messages/blocking.go b/intercept/messages/blocking.go index 7fb3f562..277e769f 100644 --- a/intercept/messages/blocking.go +++ b/intercept/messages/blocking.go @@ -77,7 +77,7 @@ func (i *BlockingInterception) ProcessRequest(w http.ResponseWriter, r *http.Req i.injectTools() var prompt *string - promptText, promptFound, promptErr := i.reqPayload.lastUserPrompt() + promptText, promptFound, promptErr := i.reqPayload.LastUserPrompt() if promptErr != nil { i.logger.Warn(ctx, "failed to retrieve last user prompt", slog.Error(promptErr)) } else if promptFound { diff --git a/intercept/messages/reqpayload.go b/intercept/messages/reqpayload.go index dfe52fc8..6374ae92 100644 --- a/intercept/messages/reqpayload.go +++ b/intercept/messages/reqpayload.go @@ -144,10 +144,10 @@ func (p RequestPayload) correlatingToolCallID() *string { return nil } -// lastUserPrompt returns the prompt text from the last user message. If no prompt +// LastUserPrompt returns the prompt text from the last user message. If no prompt // is found, it returns empty string, false, nil. Unexpected shapes are treated as // unsupported and do not fail the request path. -func (p RequestPayload) lastUserPrompt() (string, bool, error) { +func (p RequestPayload) LastUserPrompt() (string, bool, error) { messages := gjson.GetBytes(p, messagesReqPathMessages) if !messages.Exists() || messages.Type == gjson.Null { return "", false, nil diff --git a/intercept/messages/reqpayload_test.go b/intercept/messages/reqpayload_test.go index a5de61f8..d73eaeca 100644 --- a/intercept/messages/reqpayload_test.go +++ b/intercept/messages/reqpayload_test.go @@ -216,7 +216,7 @@ func TestRequestPayloadLastUserPrompt(t *testing.T) { t.Parallel() payload := mustMessagesPayload(t, testCase.requestBody) - prompt, found, err := payload.lastUserPrompt() + prompt, found, err := payload.LastUserPrompt() if testCase.expectError { require.Error(t, err) return diff --git a/intercept/messages/streaming.go b/intercept/messages/streaming.go index d62441a9..0c4d0dbe 100644 --- a/intercept/messages/streaming.go +++ b/intercept/messages/streaming.go @@ -111,7 +111,7 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re err error ) - prompt, promptFound, err = i.reqPayload.lastUserPrompt() + prompt, promptFound, err = i.reqPayload.LastUserPrompt() if err != nil { logger.Warn(ctx, "failed to determine last user prompt", slog.Error(err)) } From 096d9f01470d3732396f50f959665ff8ef26918b Mon Sep 17 00:00:00 2001 From: Yevhenii Shcherbina Date: Fri, 24 Apr 2026 22:07:31 +0000 Subject: [PATCH 4/6] Added TestBedrockSimple (streaming) --- fixtures/bedrock/parse_eventstream.go | 88 ++++ fixtures/bedrock/simple.req.json | 10 + fixtures/bedrock/simple.resp.bin | Bin 0 -> 10302 bytes fixtures/bedrock/simple.resp.decoded | 511 +++++++++++++++++++++++ fixtures/fixtures.go | 17 + intercept/bedrock/interceptor.go | 45 +- internal/integrationtest/bridge_test.go | 48 +++ internal/integrationtest/helpers.go | 14 + internal/integrationtest/mockupstream.go | 18 + internal/integrationtest/setupbridge.go | 11 +- internal/integrationtest/trace_test.go | 8 +- provider/bedrock.go | 1 + 12 files changed, 761 insertions(+), 10 deletions(-) create mode 100644 fixtures/bedrock/parse_eventstream.go create mode 100644 fixtures/bedrock/simple.req.json create mode 100644 fixtures/bedrock/simple.resp.bin create mode 100644 fixtures/bedrock/simple.resp.decoded diff --git a/fixtures/bedrock/parse_eventstream.go b/fixtures/bedrock/parse_eventstream.go new file mode 100644 index 00000000..51ca8529 --- /dev/null +++ b/fixtures/bedrock/parse_eventstream.go @@ -0,0 +1,88 @@ +//go:build ignore + +// Usage: go run parse_eventstream.go +// +// Decodes an AWS EventStream binary file and prints each frame's +// decoded JSON body. Use this to inspect captured Bedrock responses +// and verify fixture contents. +package main + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "os" + + "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream" + "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream/eventstreamapi" +) + +func main() { + if len(os.Args) < 2 { + fmt.Fprintf(os.Stderr, "usage: go run parse_eventstream.go \n") + os.Exit(1) + } + + data, err := os.ReadFile(os.Args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "read file: %v\n", err) + os.Exit(1) + } + + decoder := eventstream.NewDecoder() + reader := bytes.NewReader(data) + frameNum := 0 + + for { + msg, err := decoder.Decode(reader, nil) + if err != nil { + break + } + frameNum++ + + messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader) + eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader) + + fmt.Printf("=== Frame %d ===\n", frameNum) + fmt.Printf(" message-type: %s\n", headerStr(messageType)) + fmt.Printf(" event-type: %s\n", headerStr(eventType)) + + if headerStr(eventType) != "chunk" { + fmt.Printf(" payload: %s\n\n", string(msg.Payload)) + continue + } + + var chunk struct { + Bytes string `json:"bytes"` + } + if err := json.Unmarshal(msg.Payload, &chunk); err != nil { + fmt.Printf(" unmarshal error: %v\n\n", err) + continue + } + + decoded, err := base64.StdEncoding.DecodeString(chunk.Bytes) + if err != nil { + fmt.Printf(" base64 decode error: %v\n\n", err) + continue + } + + var pretty json.RawMessage + if err := json.Unmarshal(decoded, &pretty); err != nil { + fmt.Printf(" json: %s\n\n", string(decoded)) + continue + } + + indented, _ := json.MarshalIndent(pretty, " ", " ") + fmt.Printf(" body:\n %s\n\n", string(indented)) + } + + fmt.Printf("Total frames: %d\n", frameNum) +} + +func headerStr(h eventstream.Value) string { + if h == nil { + return "" + } + return h.String() +} diff --git a/fixtures/bedrock/simple.req.json b/fixtures/bedrock/simple.req.json new file mode 100644 index 00000000..b61b2ac2 --- /dev/null +++ b/fixtures/bedrock/simple.req.json @@ -0,0 +1,10 @@ +{ + "anthropic_version": "bedrock-2023-05-31", + "max_tokens": 50, + "messages": [ + { + "role": "user", + "content": "how many angels can dance on the head of a pin" + } + ] +} diff --git a/fixtures/bedrock/simple.resp.bin b/fixtures/bedrock/simple.resp.bin new file mode 100644 index 0000000000000000000000000000000000000000..056c54a028a12fdb51e472aa526e1b4c10225148 GIT binary patch literal 10302 zcmds7X^0zT7*4HXJ*xGhiYFgZZN28$tee%Uo6Tm9-AvqMcPR&MR)2`%`F>fqP2H@n>rrNZS+!;*dtm4QvrzX+i~G&8SKD)RNv2Y9Y%^1{ z%icM~YS~-d?`EsjOg=kxS^FNBBcoZ%R#897Hbp9{MjfG8whcvYnM~Z%j~5OrR67QZtz* z*E^;5y;aFm8L?!#wOk?=S0pwrjmi!s4OwxE*WFaiVUt7jut`}p!EV?V?=+27y)Dj} zFq}x2a|ZysRM41$qq3r-DXCmW2{$Y?m4oN6(UouucxiOKpWS}GV{!!t)}*N^eoj*8 znXcdKyo6HddR{KwunktGOkSo`HZ_r9s}xR^}Ji0A55@?lJT;E?s4Z_SX*B1fpR|N4` zA$D@05kvJ?%rs|G0>UHTwJrgmUlXurZ(Mpsz`@F;j3P4{l)K%MBf_Kaf7=A8pAxut zT>El4&~QygDjQ4}s0jBjPp*b+b{fHX{>}NP1B_EFko`_;MjKEKRSi~}fGRsYr>dwk zvoUNB!mF3Qr~yTv5SmWfcIlG=rb)T?l`q0;Z#sWFsNe_!{>WV`b_5taw5!+@l>AZGbR#u^C+#c0V)3Q=?3ID=^SK9M zJ6y1Y5OU5<3*`V4BIMItgw<8uF=wTQuKlugH#_$M7rzodB>u{ugUm-&;2cV1yO#*v zb`*pq{<`Nu4kr-!pRd}f1Q|YLNr|$Qc-7!be{WNrYM*u%j|kts{jHaQmw8e-rUzCJ zcL@`O?|C76I7r}lA_4Uxh6}I+#2QQ=ivPH8M!F*HZ-+Na*PI8md_$=D#=qc>AX8J+ z)hLxwBK2XrobM?Qj!)k;5PqW70J$$XgaGcld&6@91}x+)nADi=_?%J!!rQjK4k0l= zHcHQ&u|4z`1Er~qi0}*Rj#~wM>`$bzc^7kZprv8BeR(JxDlnX?YcNFZ)=8SSVIsU? z;nfYmzmsrq)QcCq8f*@fKkcL;{G+mntoVEwvk7@A6s|mPXKnG&X4!ZN!`8X$5Y2=Ia`I9A}8C^wmF1P z92j5$;}`H7e(&^$p9*joi{N#TO9G!N!*p!ghakesPkDO>#PCuA_0ESrj0YL2X*hme zhD}}I!Quio6H?6xoL>Rk1T>On8h}jS;SdhpKlTjJvXIcS=(U~u2bdNiZ?IxTXPmmG zlJ>ytXh8UlGd6Dq_!|iYTQ9$5*}kBlZTcZbAu!>TbTtoVp6K322!tPa<+6TQDu=x1 zQI8z9{y%@u4#U+{!U5Az$}#v`PK@kZ%4Cg^bi=g4`4KP| z?F&QG1el!uXmp|ze24J_;VU;C3cK8b4B`3<^wRSGlk44D&=k6mbtCl=MRastoY0tR z4fMf)VQA9WRiYh>7b_YxI*9r0cLUdW+YnZgCM;q;E0+D_;2v|8JZbKVfx)D0Mw7;f z5zMJuZlnld16GT0f`ktk&%$tRONtG&V4d6yR0`IR7=|R<`M)oXjW=dhGwx;3f=T Date: Sat, 25 Apr 2026 13:10:10 +0000 Subject: [PATCH 5/6] minor fixes in TestBedrockSimple --- internal/integrationtest/bridge_test.go | 19 +++++++++++-------- internal/integrationtest/setupbridge.go | 3 +-- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/internal/integrationtest/bridge_test.go b/internal/integrationtest/bridge_test.go index 6e7d32d1..24dc4e2a 100644 --- a/internal/integrationtest/bridge_test.go +++ b/internal/integrationtest/bridge_test.go @@ -832,6 +832,8 @@ func TestSimple(t *testing.T) { func TestBedrockSimple(t *testing.T) { t.Parallel() + const bedrockModel = "us.anthropic.claude-sonnet-4-5-20250929-v1:0" + ctx, cancel := context.WithTimeout(t.Context(), testutil.WaitLong) t.Cleanup(cancel) @@ -844,11 +846,10 @@ func TestBedrockSimple(t *testing.T) { Streaming: fix.Response, }) - bridgeServer := newBridgeTestServer(ctx, t, upstream.URL, - withProvider(config.ProviderBedrock), - ) + bridgeServer := newBridgeTestServer(ctx, t, upstream.URL) - resp, err := bridgeServer.makeRequest(t, http.MethodPost, pathBedrockInvokeStream, fix.Request) + path := "/bedrock/model/" + bedrockModel + "/invoke-with-response-stream" + resp, err := bridgeServer.makeRequest(t, http.MethodPost, path, fix.Request) require.NoError(t, err) defer resp.Body.Close() require.Equal(t, http.StatusOK, resp.StatusCode) @@ -861,17 +862,19 @@ func TestBedrockSimple(t *testing.T) { // Verify the upstream received the request at the correct path. received := upstream.receivedRequests() require.Len(t, received, 1) - assert.Contains(t, received[0].Path, "/model/") - assert.Contains(t, received[0].Path, "/invoke-with-response-stream") + assert.Equal(t, "/model/"+bedrockModel+"/invoke-with-response-stream", received[0].Path) // Verify prompt was recorded. promptUsages := bridgeServer.Recorder.RecordedPromptUsages() require.NotEmpty(t, promptUsages, "no prompts tracked") assert.Contains(t, promptUsages[0].Prompt, "how many angels can dance on the head of a pin") - // Verify token usage was recorded (at least 1 record). + // Streaming produces 2 token records: message_start + message_delta. tokenUsages := bridgeServer.Recorder.RecordedTokenUsages() - require.GreaterOrEqual(t, len(tokenUsages), 1, "no token usage recorded") + require.Len(t, tokenUsages, 2) + assert.EqualValues(t, 18, bridgeServer.Recorder.TotalInputTokens(), "input tokens") + // message_start reports output_tokens=1, message_delta reports output_tokens=50. + assert.EqualValues(t, 51, bridgeServer.Recorder.TotalOutputTokens(), "output tokens") // Verify interception lifecycle. bridgeServer.Recorder.VerifyAllInterceptionsEnded(t) diff --git a/internal/integrationtest/setupbridge.go b/internal/integrationtest/setupbridge.go index 9358b067..fd017f1e 100644 --- a/internal/integrationtest/setupbridge.go +++ b/internal/integrationtest/setupbridge.go @@ -32,8 +32,6 @@ const ( pathCopilotChatCompletions = "/copilot/chat/completions" pathCopilotResponses = "/copilot/responses" - pathBedrockInvokeStream = "/bedrock/model/us.anthropic.claude-sonnet-4-5-20250929-v1:0/invoke-with-response-stream" - // providerBedrockAnthropic identifies a Bedrock-via-Anthropic provider. providerBedrockAnthropic = "bedrock-anthropic" @@ -164,6 +162,7 @@ func newBridgeTestServer( providers = []aibridge.Provider{ newDefaultProvider(config.ProviderAnthropic, upstreamURL), newDefaultProvider(config.ProviderOpenAI, upstreamURL), + newDefaultProvider(config.ProviderBedrock, upstreamURL), } } From fd62722d691ffd0d7d3b44ee180f0d2b9eeff874 Mon Sep 17 00:00:00 2001 From: Yevhenii Shcherbina Date: Sat, 25 Apr 2026 13:18:23 +0000 Subject: [PATCH 6/6] minor fix --- internal/integrationtest/bridge_test.go | 105 ++++++++++++------------ 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/internal/integrationtest/bridge_test.go b/internal/integrationtest/bridge_test.go index 24dc4e2a..4332fb50 100644 --- a/internal/integrationtest/bridge_test.go +++ b/internal/integrationtest/bridge_test.go @@ -829,57 +829,6 @@ func TestSimple(t *testing.T) { } } -func TestBedrockSimple(t *testing.T) { - t.Parallel() - - const bedrockModel = "us.anthropic.claude-sonnet-4-5-20250929-v1:0" - - ctx, cancel := context.WithTimeout(t.Context(), testutil.WaitLong) - t.Cleanup(cancel) - - fix := fixtures.BedrockFixture{ - Request: fixtures.BedrockSimpleReq, - Response: fixtures.BedrockSimpleResp, - } - - upstream := newMockUpstream(ctx, t, upstreamResponse{ - Streaming: fix.Response, - }) - - bridgeServer := newBridgeTestServer(ctx, t, upstream.URL) - - path := "/bedrock/model/" + bedrockModel + "/invoke-with-response-stream" - resp, err := bridgeServer.makeRequest(t, http.MethodPost, path, fix.Request) - require.NoError(t, err) - defer resp.Body.Close() - require.Equal(t, http.StatusOK, resp.StatusCode) - - // Verify non-empty response was forwarded to client. - bodyBytes, err := io.ReadAll(resp.Body) - require.NoError(t, err) - assert.NotEmpty(t, bodyBytes, "should have received response body") - - // Verify the upstream received the request at the correct path. - received := upstream.receivedRequests() - require.Len(t, received, 1) - assert.Equal(t, "/model/"+bedrockModel+"/invoke-with-response-stream", received[0].Path) - - // Verify prompt was recorded. - promptUsages := bridgeServer.Recorder.RecordedPromptUsages() - require.NotEmpty(t, promptUsages, "no prompts tracked") - assert.Contains(t, promptUsages[0].Prompt, "how many angels can dance on the head of a pin") - - // Streaming produces 2 token records: message_start + message_delta. - tokenUsages := bridgeServer.Recorder.RecordedTokenUsages() - require.Len(t, tokenUsages, 2) - assert.EqualValues(t, 18, bridgeServer.Recorder.TotalInputTokens(), "input tokens") - // message_start reports output_tokens=1, message_delta reports output_tokens=50. - assert.EqualValues(t, 51, bridgeServer.Recorder.TotalOutputTokens(), "output tokens") - - // Verify interception lifecycle. - bridgeServer.Recorder.VerifyAllInterceptionsEnded(t) -} - func TestSessionIDTracking(t *testing.T) { t.Parallel() @@ -2183,3 +2132,57 @@ func TestActorHeaders(t *testing.T) { } } } + +// Native Bedrock tests use the AWS EventStream binary protocol +// (application/vnd.amazon.eventstream) instead of Anthropic's SSE format. +// The provider acts as a SigV4-signing reverse proxy. +func TestNativeBedrockSimple(t *testing.T) { + t.Parallel() + + const bedrockModel = "us.anthropic.claude-sonnet-4-5-20250929-v1:0" + + ctx, cancel := context.WithTimeout(t.Context(), testutil.WaitLong) + t.Cleanup(cancel) + + fix := fixtures.BedrockFixture{ + Request: fixtures.BedrockSimpleReq, + Response: fixtures.BedrockSimpleResp, + } + + upstream := newMockUpstream(ctx, t, upstreamResponse{ + Streaming: fix.Response, + }) + + bridgeServer := newBridgeTestServer(ctx, t, upstream.URL) + + path := "/bedrock/model/" + bedrockModel + "/invoke-with-response-stream" + resp, err := bridgeServer.makeRequest(t, http.MethodPost, path, fix.Request) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + // Verify non-empty response was forwarded to client. + bodyBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.NotEmpty(t, bodyBytes, "should have received response body") + + // Verify the upstream received the request at the correct path. + received := upstream.receivedRequests() + require.Len(t, received, 1) + assert.Equal(t, "/model/"+bedrockModel+"/invoke-with-response-stream", received[0].Path) + + // Verify prompt was recorded. + promptUsages := bridgeServer.Recorder.RecordedPromptUsages() + require.NotEmpty(t, promptUsages, "no prompts tracked") + assert.Contains(t, promptUsages[0].Prompt, "how many angels can dance on the head of a pin") + + // Streaming produces 2 token records: message_start + message_delta. + tokenUsages := bridgeServer.Recorder.RecordedTokenUsages() + require.Len(t, tokenUsages, 2) + assert.EqualValues(t, 18, bridgeServer.Recorder.TotalInputTokens(), "input tokens") + // message_start reports output_tokens=1, message_delta reports output_tokens=50. + assert.EqualValues(t, 51, bridgeServer.Recorder.TotalOutputTokens(), "output tokens") + + // Verify interception lifecycle. + bridgeServer.Recorder.VerifyAllInterceptionsEnded(t) +}