Skip to content

Commit e4fd87e

Browse files
committed
feat(sdk): route chat session streaming to the realtime host by default
Chat clients now send realtime session reads and input appends to the dedicated realtime host on Trigger.dev Cloud instead of the API service, isolating long-lived chat streaming from regular API traffic. Custom and self-hosted base URLs are unchanged; the change takes effect as clients upgrade.
1 parent fa4804e commit e4fd87e

5 files changed

Lines changed: 172 additions & 8 deletions

File tree

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Chat clients (`TriggerChatTransport` and `AgentChat`) now stream over Trigger.dev Cloud's dedicated realtime host (`realtime.trigger.dev`) by default. A chat session's long-lived SSE reads and input appends no longer run through the main Cloud API host, keeping chat streaming isolated from regular API traffic.
6+
7+
This only changes the Cloud default. Custom and self-hosted base URLs are left untouched (they keep serving realtime on the same origin), and passing a `baseURL` resolver function opts out entirely:
8+
9+
```ts
10+
new TriggerChatTransport({
11+
task: "my-chat",
12+
// realtime in/out endpoints stay on your own host
13+
baseURL: ({ endpoint }) => "https://trigger.acme.internal",
14+
});
15+
```
16+
17+
If you gate chat traffic behind a CSP or network allowlist, add `realtime.trigger.dev`.

packages/trigger-sdk/src/v3/ai-shared.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,28 @@ import type { InferUITools, ModelMessage, ToolSet, UIDataTypes, UIMessage } from
2424
*/
2525
export const PENDING_MESSAGE_INJECTED_TYPE = "data-pending-message-injected" as const;
2626

27+
/** Trigger.dev Cloud api host. */
28+
export const CLOUD_API_BASE_URL = "https://api.trigger.dev";
29+
30+
/**
31+
* Trigger.dev Cloud host that serves realtime chat-session endpoints
32+
* (`/realtime/v1/sessions/*`). These are long-lived SSE reads and frequent
33+
* input appends, so Cloud serves them from a dedicated host rather than
34+
* loading the api service. Mirrors the worker-side `STREAM_ORIGIN` config.
35+
*/
36+
export const CLOUD_STREAM_BASE_URL = "https://realtime.trigger.dev";
37+
38+
/**
39+
* Map a chat client base URL to the host that should serve its realtime
40+
* session endpoints. On Trigger.dev Cloud the realtime endpoints live on a
41+
* dedicated host, so the Cloud api host is routed there. Any other base URL (a
42+
* custom domain or a self-hosted instance) is returned unchanged, since those
43+
* serve realtime on the same origin as the api.
44+
*/
45+
export function resolveChatStreamBaseURL(baseURL: string): string {
46+
return baseURL.replace(/\/$/, "") === CLOUD_API_BASE_URL ? CLOUD_STREAM_BASE_URL : baseURL;
47+
}
48+
2749
/**
2850
* The wire payload shape sent by `TriggerChatTransport`.
2951
* Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.

packages/trigger-sdk/src/v3/chat-client.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ import {
2828
TRIGGER_CONTROL_SUBTYPE,
2929
} from "@trigger.dev/core/v3";
3030
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
31-
import { slimSubmitMessageForWire } from "./ai-shared.js";
31+
import {
32+
CLOUD_API_BASE_URL,
33+
resolveChatStreamBaseURL,
34+
slimSubmitMessageForWire,
35+
} from "./ai-shared.js";
3236
import { sessions } from "./sessions.js";
3337

3438
// ─── Type inference ────────────────────────────────────────────────
@@ -328,9 +332,18 @@ export class AgentChat<TAgent = unknown> {
328332
this.onTriggered = options.onTriggered;
329333
this.onTurnComplete = options.onTurnComplete;
330334
const baseURLOption = options.baseURL;
331-
this.baseURLResolver = typeof baseURLOption === "function"
332-
? baseURLOption
333-
: () => baseURLOption ?? apiClientManager.baseURL ?? "https://api.trigger.dev";
335+
// `AgentChat` only addresses realtime session endpoints (`in`/`out`). For a
336+
// string base URL pointing at Trigger.dev Cloud, route those to the
337+
// dedicated realtime host so the SSE reads and input appends don't load the
338+
// api service. Custom/self-hosted base URLs pass through unchanged, and a
339+
// resolver function is honored verbatim.
340+
this.baseURLResolver =
341+
typeof baseURLOption === "function"
342+
? baseURLOption
343+
: () =>
344+
resolveChatStreamBaseURL(
345+
baseURLOption ?? apiClientManager.baseURL ?? CLOUD_API_BASE_URL
346+
);
334347
this.fetchOverride = options.fetch;
335348

336349
// Hydration: a non-empty `session` means the caller knows the

packages/trigger-sdk/src/v3/chat.test.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,103 @@ describe("TriggerChatTransport", () => {
654654
expect(subscribe!.url.startsWith("https://stream.example.com/")).toBe(true);
655655
});
656656

657+
it("defaults realtime endpoints to realtime.trigger.dev on Trigger.dev Cloud", async () => {
658+
const requests: string[] = [];
659+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
660+
const urlStr = typeof url === "string" ? url : url.toString();
661+
requests.push(urlStr);
662+
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
663+
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
664+
throw new Error(`Unexpected URL: ${urlStr}`);
665+
});
666+
667+
// No baseURL -> defaults to https://api.trigger.dev (Cloud). The realtime
668+
// session endpoints should be routed to the dedicated realtime host.
669+
const transport = new TriggerChatTransport({
670+
task: "my-chat-task",
671+
accessToken: () => "pat",
672+
sessions: { "chat-cloud": { publicAccessToken: "p" } },
673+
});
674+
675+
const stream = await transport.sendMessages({
676+
trigger: "submit-message",
677+
chatId: "chat-cloud",
678+
messageId: undefined,
679+
messages: [createUserMessage("Hi")],
680+
abortSignal: undefined,
681+
});
682+
await drainChunks(stream);
683+
684+
const append = requests.find(isSessionStreamAppendUrl);
685+
const subscribe = requests.find(isSessionOutSubscribeUrl);
686+
expect(append!.startsWith("https://realtime.trigger.dev/")).toBe(true);
687+
expect(subscribe!.startsWith("https://realtime.trigger.dev/")).toBe(true);
688+
});
689+
690+
it("leaves a custom/self-hosted baseURL untouched for realtime endpoints", async () => {
691+
const requests: string[] = [];
692+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
693+
const urlStr = typeof url === "string" ? url : url.toString();
694+
requests.push(urlStr);
695+
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
696+
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
697+
throw new Error(`Unexpected URL: ${urlStr}`);
698+
});
699+
700+
const transport = new TriggerChatTransport({
701+
task: "my-chat-task",
702+
accessToken: () => "pat",
703+
baseURL: "https://trigger.acme.internal",
704+
sessions: { "chat-self": { publicAccessToken: "p" } },
705+
});
706+
707+
const stream = await transport.sendMessages({
708+
trigger: "submit-message",
709+
chatId: "chat-self",
710+
messageId: undefined,
711+
messages: [createUserMessage("Hi")],
712+
abortSignal: undefined,
713+
});
714+
await drainChunks(stream);
715+
716+
const append = requests.find(isSessionStreamAppendUrl);
717+
const subscribe = requests.find(isSessionOutSubscribeUrl);
718+
expect(append!.startsWith("https://trigger.acme.internal/")).toBe(true);
719+
expect(subscribe!.startsWith("https://trigger.acme.internal/")).toBe(true);
720+
});
721+
722+
it("does not remap non-prod cloud hosts (mapping is prod-only)", async () => {
723+
const requests: string[] = [];
724+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
725+
const urlStr = typeof url === "string" ? url : url.toString();
726+
requests.push(urlStr);
727+
if (isSessionStreamAppendUrl(urlStr)) return defaultAppendResponse();
728+
if (isSessionOutSubscribeUrl(urlStr)) return defaultSseResponse();
729+
throw new Error(`Unexpected URL: ${urlStr}`);
730+
});
731+
732+
const transport = new TriggerChatTransport({
733+
task: "my-chat-task",
734+
accessToken: () => "pat",
735+
baseURL: "https://test-api.trigger.dev",
736+
sessions: { "chat-test": { publicAccessToken: "p" } },
737+
});
738+
739+
const stream = await transport.sendMessages({
740+
trigger: "submit-message",
741+
chatId: "chat-test",
742+
messageId: undefined,
743+
messages: [createUserMessage("Hi")],
744+
abortSignal: undefined,
745+
});
746+
await drainChunks(stream);
747+
748+
const append = requests.find(isSessionStreamAppendUrl);
749+
const subscribe = requests.find(isSessionOutSubscribeUrl);
750+
expect(append!.startsWith("https://test-api.trigger.dev/")).toBe(true);
751+
expect(subscribe!.startsWith("https://test-api.trigger.dev/")).toBe(true);
752+
});
753+
657754
it("fetch override is invoked for both .in/append and .out SSE with endpoint ctx", async () => {
658755
const fetchCalls: Array<{ url: string; endpoint: string; chatId: string }> = [];
659756

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
} from "@trigger.dev/core/v3";
3434
import { ChatTabCoordinator } from "./chat-tab-coordinator.js";
3535
import type { ChatInputChunk, ChatTaskWirePayload } from "./ai-shared.js";
36-
import { slimSubmitMessageForWire } from "./ai-shared.js";
36+
import { resolveChatStreamBaseURL, slimSubmitMessageForWire } from "./ai-shared.js";
3737

3838
const DEFAULT_BASE_URL = "https://api.trigger.dev";
3939
const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
@@ -284,6 +284,11 @@ export type TriggerChatTransportOptions<TClientData = unknown> = {
284284
* endpoint, or a function called per request that picks a base URL from the
285285
* endpoint discriminator and chat ID. @default "https://api.trigger.dev"
286286
*
287+
* When a string base URL points at Trigger.dev Cloud (`https://api.trigger.dev`),
288+
* the realtime session endpoints (`in`/`out`) are routed to the dedicated
289+
* realtime host (`https://realtime.trigger.dev`) automatically. Pass a
290+
* resolver function to take full control and opt out of this.
291+
*
287292
* @example Route appends through a proxy, SSE direct:
288293
* ```ts
289294
* baseURL: ({ endpoint }) =>
@@ -462,9 +467,19 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
462467
| undefined;
463468
const baseURLOption = options.baseURL ?? DEFAULT_BASE_URL;
464469
const streamOverride = options.streamBaseURL;
465-
this.resolveBaseURLFn = typeof baseURLOption === "function"
466-
? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx))
467-
: (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption);
470+
// The transport only ever talks to realtime session endpoints (`in`/`out`).
471+
// For a string base URL pointing at Trigger.dev Cloud, route those to the
472+
// dedicated realtime host (`resolveChatStreamBaseURL`) so the long-lived
473+
// SSE reads and input appends don't load the api service. An explicit
474+
// `streamBaseURL` (SSE only, deprecated) and a `baseURL` resolver function
475+
// are honored verbatim — the customer owns routing in those cases.
476+
this.resolveBaseURLFn =
477+
typeof baseURLOption === "function"
478+
? (ctx) => (ctx.endpoint === "out" && streamOverride ? streamOverride : baseURLOption(ctx))
479+
: (ctx) =>
480+
ctx.endpoint === "out" && streamOverride
481+
? streamOverride
482+
: resolveChatStreamBaseURL(baseURLOption);
468483
this.fetchOverride = options.fetch;
469484
this.extraHeaders = options.headers ?? {};
470485
this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;

0 commit comments

Comments
 (0)