Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-bedrock-streaming-usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/ai-amazon-bedrock": patch
---

Fix `@effect/ai-amazon-bedrock` streaming so the terminal `"finish"` part carries real token counts. The Bedrock Converse stream sends `metadata` (with the populated `usage` block) **after** `messageStop`, but the language model was emitting `"finish"` synchronously on `messageStop`, capturing the still-empty `usage` defaults. The fix buffers the finish reason on `messageStop` and emits `"finish"` from the `metadata` case once `inputTokens` / `outputTokens` / `totalTokens` are filled in.
33 changes: 23 additions & 10 deletions packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ const makeStreamResponse: (

let trace: ConverseTrace | undefined = undefined
let cacheWriteInputTokens: number | undefined = undefined
let finishReason: Response.FinishReason | undefined = undefined
const usage: Mutable<typeof Response.Usage.Encoded> = {
inputTokens: undefined,
outputTokens: undefined,
Expand All @@ -677,16 +678,12 @@ const makeStreamResponse: (
}

case "messageStop": {
const reason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason)
parts.push({
type: "finish",
reason,
usage,
metadata: {
bedrock: { trace, usage: { cacheWriteInputTokens } }
}
})

// Buffer the finish reason — the Bedrock Converse stream sends
// `metadata` (with usage / cache / trace) AFTER `messageStop`,
// so emitting the `"finish"` part here would hand the caller the
// still-empty `usage` defaults, which only the `metadata` case
// below fills in. Wait for `metadata` to land and emit `"finish"` there.
finishReason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason)
break
}

Expand Down Expand Up @@ -908,6 +905,22 @@ const makeStreamResponse: (
if (Predicate.isNotUndefined(event.metadata.trace)) {
trace = event.metadata.trace
}
// Bedrock sends `metadata` after `messageStop`, so by the time
// we land here we have both the finish reason (buffered above)
// and the populated usage / cache / trace. Emit the `"finish"`
// part now so the caller sees real token counts on the
// terminal stream event.
if (Predicate.isNotUndefined(finishReason)) {
parts.push({
type: "finish",
reason: finishReason,
usage,
metadata: {
bedrock: { trace, usage: { cacheWriteInputTokens } }
}
})
finishReason = undefined
}
break
}

Expand Down
Loading