Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/green-birds-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/voice": patch
---

Support AI SDK fullStream responses in voice turns and warn when textStream is used.
2 changes: 1 addition & 1 deletion docs/voice.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async onTurn(transcript: string, context: VoiceTurnContext) {
abortSignal: context.signal
});

return result.textStream;
return result.fullStream;
}
```

Expand Down
2 changes: 1 addition & 1 deletion examples/playground/src/demos/voice/voice-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class PlaygroundVoiceAgent extends VoiceAgent<Env> {
abortSignal: context.signal
});

return result.textStream;
return result.fullStream;
}

async onCallStart(connection: Connection) {
Expand Down
2 changes: 1 addition & 1 deletion examples/telnyx-voice-agent/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class MyVoiceAgent extends VoiceAgent<Env> {
]
});

return result.textStream;
return result.fullStream;
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/voice-agent/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class MyVoiceAgent extends VoiceAgent<Env> {
abortSignal: context.signal
});

return result.textStream;
return result.fullStream;
}

async onCallStart(connection: Connection) {
Expand Down
4 changes: 2 additions & 2 deletions experimental/voice.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class MyAgent extends VoiceAgent<Env> {
abortSignal: context.signal
});

return result.textStream;
return result.fullStream;
}
}

Expand Down Expand Up @@ -188,7 +188,7 @@ async onTurn(
**Return value:**

- `string` — The agent's full response. Synthesized as a single TTS call.
- `AsyncIterable<string>` — A token stream (e.g. from `streamText().textStream`). The pipeline chunks it into sentences and synthesizes TTS per-sentence for lower latency.
- AI SDK `fullStream` or `AsyncIterable<string>` — The pipeline chunks text into sentences and synthesizes TTS per-sentence for lower latency.

### Lifecycle hooks — optional

Expand Down
2 changes: 1 addition & 1 deletion ideas/voice.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class RestaurantAgent extends VoiceAgent<Env> {
tools: { book_table: tool({...}) },
abortSignal: context.signal
});
return result.textStream; // streamed through sentence chunker → TTS
return result.fullStream; // streamed through sentence chunker → TTS
}

async onCallStart(connection) {
Expand Down
6 changes: 3 additions & 3 deletions packages/voice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class MyAgent extends VoiceAgent<Env> {
}
```

`onTurn()` can also return streaming text, including AI SDK `textStream` values:
`onTurn()` can also return streaming text, including AI SDK `fullStream` values:

```typescript
import { streamText } from "ai";
Expand All @@ -55,7 +55,7 @@ async onTurn(transcript: string) {
messages: [{ role: "user", content: transcript }]
});

return result.textStream;
return result.fullStream;
}
```

Expand All @@ -70,7 +70,7 @@ async onTurn(transcript: string) {

| Method | Description |
| -------------------------------- | -------------------------------------------------------------------------------------------------------------- |
| `onTurn(transcript, context)` | **Required.** Handle a user utterance. Return `string` or `AsyncIterable<string>`. |
| `onTurn(transcript, context)` | **Required.** Handle a user utterance. Return `string`, AI SDK `fullStream`, or `AsyncIterable<string>`. |
| `createTranscriber(connection)` | Override to create a transcriber dynamically per connection. |
| `onCallStart(connection)` | Called when a voice call begins. |
| `onCallEnd(connection)` | Called when a voice call ends. |
Expand Down
272 changes: 272 additions & 0 deletions packages/voice/src/tests/agents/voice.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { Agent, type Connection, type WSMessage } from "agents";
import {
stepCountIs,
streamText,
tool,
type LanguageModel,
type ToolSet
} from "ai";
import { z } from "zod";
import { withVoice, type VoiceTurnContext } from "../../voice";
import type {
TTSProvider,
Expand Down Expand Up @@ -70,6 +78,202 @@ class TestTranscriber implements Transcriber {
}
}

const v3FinishReason = (unified: "stop" | "tool-calls") => ({
unified,
raw: undefined
});

const v3Usage = (inputTokens: number, outputTokens: number) => ({
inputTokens: {
total: inputTokens,
noCache: inputTokens,
cacheRead: 0,
cacheWrite: 0
},
outputTokens: { total: outputTokens, text: outputTokens, reasoning: 0 }
});

type MockTextStreamPart =
| { type: "text"; text: string }
| {
type: "tool-call";
toolName: string;
input: Record<string, unknown>;
output?: unknown;
outputDelayMs?: number;
toolCallId?: string;
};

type MockTextStreamResponse = MockTextStreamPart[][];

const defaultMockTextStreamResponse: MockTextStreamResponse = [
[
{ type: "text", text: "I can get the weather for you." },
{
type: "tool-call",
toolName: "getWeather",
input: { location: "San Francisco" },
output: "warm"
}
],
[{ type: "text", text: "The weather is warm" }]
];

function createToolCallingTextStreamModel(
response: MockTextStreamResponse
): LanguageModel {
let callCount = 0;

return {
specificationVersion: "v3",
provider: "test",
modelId: "mock-tool-text-stream",
supportedUrls: {},
doGenerate() {
throw new Error("doGenerate not implemented");
},
doStream(_options: Record<string, unknown>) {
callCount++;
const step = response[callCount - 1] ?? [];
const hasToolCall = step.some((part) => part.type === "tool-call");

const stream = new ReadableStream({
start(controller) {
controller.enqueue({ type: "stream-start", warnings: [] });

for (let i = 0; i < step.length; i++) {
const part = step[i];
if (part.type === "text") {
const id = `t-${callCount}-${i}`;
controller.enqueue({ type: "text-start", id });
controller.enqueue({
type: "text-delta",
id,
delta: part.text
});
controller.enqueue({ type: "text-end", id });
} else {
const id = part.toolCallId ?? `tc-${callCount}-${i}`;
controller.enqueue({
type: "tool-input-start",
id,
toolName: part.toolName
});
controller.enqueue({
type: "tool-input-delta",
id,
delta: JSON.stringify(part.input)
});
controller.enqueue({ type: "tool-input-end", id });
controller.enqueue({
type: "tool-call",
toolCallId: id,
toolName: part.toolName,
input: JSON.stringify(part.input)
});
}
}

controller.enqueue({
type: "finish",
finishReason: v3FinishReason(hasToolCall ? "tool-calls" : "stop"),
usage: v3Usage(10 * callCount, 5 * callCount)
});

controller.close();
}
});

return Promise.resolve({ stream });
}
} as LanguageModel;
}

function createMockTools(response: MockTextStreamResponse): ToolSet {
const toolOutputs = new Map<
string,
{ output: unknown; outputDelayMs?: number }[]
>();
for (const step of response) {
for (const part of step) {
if (part.type === "tool-call") {
const outputs = toolOutputs.get(part.toolName) ?? [];
outputs.push({
output: part.output ?? `${part.toolName} result`,
...(part.outputDelayMs === undefined
? {}
: { outputDelayMs: part.outputDelayMs })
});
toolOutputs.set(part.toolName, outputs);
}
}
}

const tools: ToolSet = {};
for (const [toolName, outputs] of toolOutputs) {
tools[toolName] = tool({
description: `Mock ${toolName} tool`,
inputSchema: z.record(z.string(), z.unknown()),
execute: async (_input: Record<string, unknown>) => {
const result = outputs.shift();
if (!result) return `${toolName} result`;
if (result.outputDelayMs) {
await new Promise((resolve) =>
setTimeout(resolve, result.outputDelayMs)
);
}
return result.output;
}
});
}

return tools;
}

function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isMockTextStreamResponse(
value: unknown
): value is MockTextStreamResponse {
return (
Array.isArray(value) &&
value.every(
(step) =>
Array.isArray(step) &&
step.every((part) => {
if (!isRecord(part)) return false;
if (part.type === "text") return typeof part.text === "string";
return (
part.type === "tool-call" &&
typeof part.toolName === "string" &&
isRecord(part.input) &&
(part.output === undefined || isJsonValue(part.output)) &&
(part.outputDelayMs === undefined ||
typeof part.outputDelayMs === "number") &&
(part.toolCallId === undefined ||
typeof part.toolCallId === "string")
);
})
)
);
}

function isJsonValue(value: unknown): boolean {
if (
value === null ||
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean"
) {
return true;
}
if (Array.isArray(value)) return value.every(isJsonValue);
if (isRecord(value)) return Object.values(value).every(isJsonValue);
return false;
}

// --- Test agents ---

const VoiceBase = withVoice(Agent);
Expand Down Expand Up @@ -247,3 +451,71 @@ export class TestEmptyResponseVoiceAgent extends VoiceBase {
);
}
}

export class TestAiSdkFullStreamVoiceAgent extends VoiceBase {
static options = { hibernate: false };

transcriber = new TestTranscriber();
tts = new TestTTS();
#mockResponse = defaultMockTextStreamResponse;

async onTurn(_transcript: string, _context: VoiceTurnContext) {
const result = streamText({
model: createToolCallingTextStreamModel(this.#mockResponse),
tools: createMockTools(this.#mockResponse),
stopWhen: stepCountIs(3),
prompt: "Check the weather, then answer."
});

return result.fullStream;
}

onMessage(connection: Connection, message: WSMessage) {
if (typeof message !== "string") return;
try {
const parsed = JSON.parse(message) as Record<string, unknown>;
if (parsed.type === "_set_mock_response") {
if (isMockTextStreamResponse(parsed.response)) {
this.#mockResponse = parsed.response;
}
connection.send(JSON.stringify({ type: "_ack", command: parsed.type }));
}
} catch {
// ignore
}
}
}

export class TestAiSdkTextStreamVoiceAgent extends VoiceBase {
static options = { hibernate: false };

transcriber = new TestTranscriber();
tts = new TestTTS();
#mockResponse = defaultMockTextStreamResponse;

async onTurn(_transcript: string, _context: VoiceTurnContext) {
const result = streamText({
model: createToolCallingTextStreamModel(this.#mockResponse),
tools: createMockTools(this.#mockResponse),
stopWhen: stepCountIs(3),
prompt: "Check the weather, then answer."
});

return result.textStream;
}

onMessage(connection: Connection, message: WSMessage) {
if (typeof message !== "string") return;
try {
const parsed = JSON.parse(message) as Record<string, unknown>;
if (parsed.type === "_set_mock_response") {
if (isMockTextStreamResponse(parsed.response)) {
this.#mockResponse = parsed.response;
}
connection.send(JSON.stringify({ type: "_ack", command: parsed.type }));
}
} catch {
// ignore
}
}
}
Loading
Loading