Skip to content

Commit 3c46d4c

Browse files
Ekaterina Bulatovacursoragent
authored andcommitted
feat(sdk,core): offload large trigger payloads via object storage
Upload oversized trigger payloads before the API request and send an application/store pointer instead of embedding large JSON in the trigger body. Validate pointer payloads in TriggerTaskRequestBody. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9211032 commit 3c46d4c

4 files changed

Lines changed: 93 additions & 20 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Offload large trigger payloads to object storage before sending the trigger API request. The SDK uploads packets at or above the existing 128KB limit and sends an `application/store` pointer instead of embedding large JSON in the request body. `TriggerTaskRequestBody` now validates that `application/store` payloads are non-empty storage paths.

packages/core/src/v3/schemas/api-type.test.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, it, expect } from "vitest";
2-
import { InitializeDeploymentRequestBody } from "./api.js";
2+
import { InitializeDeploymentRequestBody, TriggerTaskRequestBody } from "./api.js";
33
import type { InitializeDeploymentRequestBody as InitializeDeploymentRequestBodyType } from "./api.js";
44

55
describe("InitializeDeploymentRequestBody", () => {
@@ -139,3 +139,41 @@ describe("InitializeDeploymentRequestBody", () => {
139139
});
140140
});
141141
});
142+
143+
describe("TriggerTaskRequestBody", () => {
144+
it("accepts application/store payload as a non-empty string", () => {
145+
const result = TriggerTaskRequestBody.safeParse({
146+
payload: "packets/payloads/file.json",
147+
context: {},
148+
options: {
149+
payloadType: "application/store",
150+
},
151+
});
152+
153+
expect(result.success).toBe(true);
154+
});
155+
156+
it("rejects application/store payload when payload is not a string", () => {
157+
const result = TriggerTaskRequestBody.safeParse({
158+
payload: { foo: "bar" },
159+
context: {},
160+
options: {
161+
payloadType: "application/store",
162+
},
163+
});
164+
165+
expect(result.success).toBe(false);
166+
});
167+
168+
it("rejects application/store payload when payload is an empty string", () => {
169+
const result = TriggerTaskRequestBody.safeParse({
170+
payload: "",
171+
context: {},
172+
options: {
173+
payloadType: "application/store",
174+
},
175+
});
176+
177+
expect(result.success).toBe(false);
178+
});
179+
});

packages/core/src/v3/schemas/api.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,12 @@ export const IdempotencyKeyOptionsSchema = z.object({
157157

158158
export type IdempotencyKeyOptionsSchema = z.infer<typeof IdempotencyKeyOptionsSchema>;
159159

160-
export const TriggerTaskRequestBody = z.object({
161-
payload: z.any(),
162-
context: z.any(),
163-
options: z
164-
.object({
160+
export const TriggerTaskRequestBody = z
161+
.object({
162+
payload: z.any(),
163+
context: z.any(),
164+
options: z
165+
.object({
165166
/** @deprecated engine v1 only */
166167
dependentAttempt: z.string().optional(),
167168
/** @deprecated engine v1 only */
@@ -227,9 +228,22 @@ export const TriggerTaskRequestBody = z.object({
227228
maxDelay: z.string().optional(),
228229
})
229230
.optional(),
230-
})
231-
.optional(),
232-
});
231+
})
232+
.optional(),
233+
})
234+
.superRefine((value, ctx) => {
235+
if (value.options?.payloadType !== "application/store") {
236+
return;
237+
}
238+
239+
if (typeof value.payload !== "string" || value.payload.length === 0) {
240+
ctx.addIssue({
241+
code: z.ZodIssueCode.custom,
242+
message: "payload must be a non-empty string when options.payloadType is application/store",
243+
path: ["payload"],
244+
});
245+
}
246+
});
233247

234248
export type TriggerTaskRequestBody = z.infer<typeof TriggerTaskRequestBody>;
235249

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ import { SpanKind } from "@opentelemetry/api";
22
import { SerializableJson } from "@trigger.dev/core";
33
import {
44
accessoryAttributes,
5+
ApiClient,
56
ApiError,
67
apiClientManager,
78
ApiRequestOptions,
9+
conditionallyExportPacket,
810
conditionallyImportPacket,
911
convertToolParametersToSchema,
1012
createErrorTaskError,
@@ -25,6 +27,7 @@ import {
2527
sdkScope,
2628
SemanticInternalAttributes,
2729
stringifyIO,
30+
type IOPacket,
2831
SubtaskUnwrapError,
2932
taskContext,
3033
TaskFromIdentifier,
@@ -2212,8 +2215,7 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
22122215
const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig);
22132216

22142217
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
2215-
2216-
const payloadPacket = await stringifyIO(parsedPayload);
2218+
const triggerPayloadPacket = await prepareTriggerPayload(parsedPayload, apiClient, id);
22172219

22182220
// Process idempotency key and extract options for storage
22192221
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
@@ -2224,12 +2226,12 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
22242226
const handle = await apiClient.triggerTask(
22252227
id,
22262228
{
2227-
payload: payloadPacket.data,
2229+
payload: triggerPayloadPacket.data,
22282230
options: {
22292231
queue: options?.queue ? { name: options.queue } : undefined,
22302232
concurrencyKey: options?.concurrencyKey,
22312233
test: taskContext.ctx?.run.isTest,
2232-
payloadType: payloadPacket.dataType,
2234+
payloadType: triggerPayloadPacket.dataType,
22332235
idempotencyKey: processedIdempotencyKey?.toString(),
22342236
idempotencyKeyTTL: options?.idempotencyKeyTTL,
22352237
idempotencyKeyOptions,
@@ -2468,8 +2470,7 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24682470
const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig);
24692471

24702472
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
2471-
2472-
const payloadPacket = await stringifyIO(parsedPayload);
2473+
const triggerPayloadPacket = await prepareTriggerPayload(parsedPayload, apiClient, id);
24732474

24742475
// Process idempotency key and extract options for storage
24752476
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
@@ -2483,13 +2484,13 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24832484
const response = await apiClient.triggerTask(
24842485
id,
24852486
{
2486-
payload: payloadPacket.data,
2487+
payload: triggerPayloadPacket.data,
24872488
options: {
24882489
lockToVersion: taskContext.worker?.version, // Lock to current version because we're waiting for it to finish
24892490
queue: options?.queue ? { name: options.queue } : undefined,
24902491
concurrencyKey: options?.concurrencyKey,
24912492
test: taskContext.ctx?.run.isTest,
2492-
payloadType: payloadPacket.dataType,
2493+
payloadType: triggerPayloadPacket.dataType,
24932494
delay: options?.delay,
24942495
ttl: options?.ttl,
24952496
tags: options?.tags,
@@ -2555,7 +2556,7 @@ async function triggerAndSubscribe_internal<TIdentifier extends string, TPayload
25552556
const apiClient = apiClientManager.clientOrThrow(requestOptions?.clientConfig);
25562557

25572558
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
2558-
const payloadPacket = await stringifyIO(parsedPayload);
2559+
const triggerPayloadPacket = await prepareTriggerPayload(parsedPayload, apiClient, id);
25592560

25602561
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
25612562
const idempotencyKeyOptions = processedIdempotencyKey
@@ -2568,13 +2569,13 @@ async function triggerAndSubscribe_internal<TIdentifier extends string, TPayload
25682569
const response = await apiClient.triggerTask(
25692570
id,
25702571
{
2571-
payload: payloadPacket.data,
2572+
payload: triggerPayloadPacket.data,
25722573
options: {
25732574
lockToVersion: taskContext.worker?.version,
25742575
queue: options?.queue ? { name: options.queue } : undefined,
25752576
concurrencyKey: options?.concurrencyKey,
25762577
test: taskContext.ctx?.run.isTest,
2577-
payloadType: payloadPacket.dataType,
2578+
payloadType: triggerPayloadPacket.dataType,
25782579
delay: options?.delay,
25792580
ttl: options?.ttl,
25802581
tags: options?.tags,
@@ -3074,3 +3075,17 @@ function registerTaskLifecycleHooks<
30743075
});
30753076
}
30763077
}
3078+
3079+
async function prepareTriggerPayload(
3080+
payload: unknown,
3081+
apiClient: ApiClient,
3082+
taskId: string
3083+
): Promise<IOPacket> {
3084+
const payloadPacket = await stringifyIO(payload);
3085+
return await conditionallyExportPacket(payloadPacket, createTriggerPayloadPathPrefix(taskId));
3086+
}
3087+
3088+
function createTriggerPayloadPathPrefix(taskId: string): string {
3089+
const safeTaskId = encodeURIComponent(taskId);
3090+
return `trigger/${safeTaskId}/${Date.now()}-${Math.random().toString(36).slice(2)}/payload`;
3091+
}

0 commit comments

Comments
 (0)