diff --git a/.changeset/fix-bedrock-streaming-usage.md b/.changeset/fix-bedrock-streaming-usage.md new file mode 100644 index 00000000000..879d268c534 --- /dev/null +++ b/.changeset/fix-bedrock-streaming-usage.md @@ -0,0 +1,5 @@ +--- +"@effect/ai-amazon-bedrock": patch +--- + +Fix `@effect/ai-amazon-bedrock` streaming so the terminal `"finish"` part carries real token counts. The Bedrock Converse stream sends `metadata` (with the populated `usage` block) **after** `messageStop`, but the SDK was emitting `"finish"` synchronously on `messageStop`, capturing the still-empty `usage` defaults. Buffer the finish reason on `messageStop` and emit `"finish"` from the `metadata` case once `inputTokens` / `outputTokens` / `totalTokens` are filled in. diff --git a/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts b/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts index 72e87f4c08d..1c24e0d1f4c 100644 --- a/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts +++ b/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts @@ -656,6 +656,7 @@ const makeStreamResponse: ( let trace: ConverseTrace | undefined = undefined let cacheWriteInputTokens: number | undefined = undefined + let finishReason: Response.FinishReason | undefined = undefined const usage: Mutable = { inputTokens: undefined, outputTokens: undefined, @@ -677,16 +678,12 @@ const makeStreamResponse: ( } case "messageStop": { - const reason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason) - parts.push({ - type: "finish", - reason, - usage, - metadata: { - bedrock: { trace, usage: { cacheWriteInputTokens } } - } - }) - + // Buffer the finish reason — the Bedrock Converse stream sends + // `metadata` (with usage / cache / trace) AFTER `messageStop`, + // so emitting the `"finish"` part here would leak the empty + // `usage` defaults from the case below to the caller. Wait for + // `metadata` to land and emit `"finish"` there. + finishReason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason) break } @@ -908,6 +905,22 @@ const makeStreamResponse: ( if (Predicate.isNotUndefined(event.metadata.trace)) { trace = event.metadata.trace } + // Bedrock sends `metadata` after `messageStop`, so by the time + // we land here we have both the finish reason (buffered above) + // and the populated usage / cache / trace. Emit the `"finish"` + // part now so the caller sees real token counts on the + // terminal stream event. + if (Predicate.isNotUndefined(finishReason)) { + parts.push({ + type: "finish", + reason: finishReason, + usage, + metadata: { + bedrock: { trace, usage: { cacheWriteInputTokens } } + } + }) + finishReason = undefined + } break }