From f00f609107707f731db0a9417a654726fe37195b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:29:57 +0000 Subject: [PATCH 1/3] perf(run-engine): merge dequeue snapshot creation into taskRun.update transaction Nest the TaskRunExecutionSnapshot create inside the preceding taskRun.update() call in the dequeue flow, reducing 2 explicit BEGIN/COMMIT transactions to 1 per dequeue operation. This follows the same pattern already used in the completion path (runAttemptSystem.ts:735) and trigger path (engine/index.ts:674-686). Side effects (heartbeat enqueue, executionSnapshotCreated event) are kept outside the transaction and fed the result of the merged write. Also adds a public enqueueHeartbeatIfNeeded() method to ExecutionSnapshotSystem for reuse by other flows that will adopt the same merged pattern. Refs: TRI-8450 Co-Authored-By: Eric Allam --- ...merge-dequeue-snapshot-into-transaction.md | 6 ++ .../src/engine/systems/dequeueSystem.ts | 66 ++++++++++++------- .../engine/systems/executionSnapshotSystem.ts | 21 ++++++ 3 files changed, 69 insertions(+), 24 deletions(-) create mode 100644 .server-changes/merge-dequeue-snapshot-into-transaction.md diff --git a/.server-changes/merge-dequeue-snapshot-into-transaction.md b/.server-changes/merge-dequeue-snapshot-into-transaction.md new file mode 100644 index 00000000000..62c9a0ec6ca --- /dev/null +++ b/.server-changes/merge-dequeue-snapshot-into-transaction.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Merge execution snapshot creation into the dequeue taskRun.update transaction, reducing 2 DB commits to 1 per dequeue operation diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 9476a081fee..f83d61a274e 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -435,6 +435,32 @@ export class DequeueSystem { cliVersion: result.worker.cliVersion, maxDurationInSeconds, maxAttempts: maxAttempts ?? undefined, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "PENDING_EXECUTING", + description: "Run was dequeued for execution", + // Map DEQUEUED -> PENDING for backwards compatibility with older runners + runStatus: "PENDING", + attemptNumber: result.run.attemptNumber ?? undefined, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + checkpointId: snapshot.checkpointId ?? undefined, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: { + connect: snapshot.completedWaitpoints.map((w) => ({ id: w.id })), + }, + completedWaitpointOrder: snapshot.completedWaitpoints + .filter((c) => c.index !== undefined) + .sort((a, b) => a.index! - b.index!) + .map((w) => w.id), + workerId, + runnerId, + }, + }, }, include: { runtimeEnvironment: true, @@ -516,30 +542,22 @@ export class DequeueSystem { hasPrivateLink = billingResult.val.hasPrivateLink; } - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - prisma, - { - run: { - id: runId, - status: lockedTaskRun.status, - attemptNumber: lockedTaskRun.attemptNumber, - }, - snapshot: { - executionStatus: "PENDING_EXECUTING", - description: "Run was dequeued for execution", - }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - checkpointId: snapshot.checkpointId ?? undefined, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: snapshot.completedWaitpoints, - workerId, - runnerId, - } - ); + // Snapshot was created as part of the taskRun.update above (single transaction). + // Fetch the enhanced snapshot and handle side effects (heartbeat + event) manually. + const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); + + this.$.eventBus.emit("executionSnapshotCreated", { + time: newSnapshot.createdAt, + run: { + id: newSnapshot.runId, + }, + snapshot: { + ...newSnapshot, + completedWaitpointIds: newSnapshot.completedWaitpoints.map((wp) => wp.id), + }, + }); + + await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded(newSnapshot); return { version: "1" as const, diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index a224e5a86b0..d615c066b85 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -518,6 +518,27 @@ export class ExecutionSnapshotSystem { return executionResultFromSnapshot(latestSnapshot); } + /** + * Enqueues a heartbeat job for a snapshot if the execution status requires one. + * Use this after nesting a snapshot create inside a taskRun.update() to replicate + * the heartbeat side effect that createExecutionSnapshot normally handles. + */ + public async enqueueHeartbeatIfNeeded(snapshot: { + id: string; + runId: string; + executionStatus: TaskRunExecutionStatus; + }) { + const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus); + if (intervalMs !== null) { + await this.$.worker.enqueue({ + id: `heartbeatSnapshot.${snapshot.runId}`, + job: "heartbeatSnapshot", + payload: { snapshotId: snapshot.id, runId: snapshot.runId }, + availableAt: new Date(Date.now() + intervalMs), + }); + } + } + #getHeartbeatIntervalMs(status: TaskRunExecutionStatus): number | null { switch (status) { case "PENDING_EXECUTING": { From c5134e906f3a7b38afb6a1e14d98e0c44f8c8061 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:45:17 +0000 Subject: [PATCH 2/3] perf: eliminate getLatestExecutionSnapshot read in dequeue flow Pre-generate the snapshot ID with SnapshotId.generate() and construct the event/return data from values already available in scope. This removes the extra DB read that was added in the initial merge commit. Co-Authored-By: Eric Allam --- .../src/engine/systems/dequeueSystem.ts | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index f83d61a274e..e5260584d6f 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -3,7 +3,7 @@ import { startSpan } from "@internal/tracing"; import { assertExhaustive, tryCatch } from "@trigger.dev/core"; import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3"; import { placementTag } from "@trigger.dev/core/v3/serverOnly"; -import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; +import { getMaxDuration, SnapshotId } from "@trigger.dev/core/v3/isomorphic"; import { BackgroundWorker, BackgroundWorkerTask, @@ -416,6 +416,9 @@ export class DequeueSystem { ? undefined : result.task.retryConfig; + // Pre-generate snapshot ID so we can construct the result without an extra read + const snapshotIds = SnapshotId.generate(); + const lockedTaskRun = await prisma.taskRun.update({ where: { id: runId, @@ -437,6 +440,7 @@ export class DequeueSystem { maxAttempts: maxAttempts ?? undefined, executionSnapshots: { create: { + id: snapshotIds.id, engine: "V2", executionStatus: "PENDING_EXECUTING", description: "Run was dequeued for execution", @@ -543,35 +547,49 @@ export class DequeueSystem { } // Snapshot was created as part of the taskRun.update above (single transaction). - // Fetch the enhanced snapshot and handle side effects (heartbeat + event) manually. - const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); + // Construct the snapshot info from data we already have and handle side effects + // (heartbeat + event) manually — no extra DB read needed. + const snapshotCreatedAt = new Date(); this.$.eventBus.emit("executionSnapshotCreated", { - time: newSnapshot.createdAt, + time: snapshotCreatedAt, run: { - id: newSnapshot.runId, + id: runId, }, snapshot: { - ...newSnapshot, - completedWaitpointIds: newSnapshot.completedWaitpoints.map((wp) => wp.id), + id: snapshotIds.id, + executionStatus: "PENDING_EXECUTING", + description: "Run was dequeued for execution", + runStatus: "PENDING", + attemptNumber: result.run.attemptNumber ?? null, + checkpointId: snapshot.checkpointId ?? null, + workerId: workerId ?? null, + runnerId: runnerId ?? null, + isValid: true, + error: null, + completedWaitpointIds: snapshot.completedWaitpoints.map((wp) => wp.id), }, }); - await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded(newSnapshot); + await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded({ + id: snapshotIds.id, + runId, + executionStatus: "PENDING_EXECUTING", + }); return { version: "1" as const, dequeuedAt: new Date(), workerQueueLength: message.workerQueueLength, snapshot: { - id: newSnapshot.id, - friendlyId: newSnapshot.friendlyId, - executionStatus: newSnapshot.executionStatus, - description: newSnapshot.description, - createdAt: newSnapshot.createdAt, + id: snapshotIds.id, + friendlyId: snapshotIds.friendlyId, + executionStatus: "PENDING_EXECUTING" as const, + description: "Run was dequeued for execution", + createdAt: snapshotCreatedAt, }, image: result.deployment?.imageReference ?? undefined, - checkpoint: newSnapshot.checkpoint ?? undefined, + checkpoint: snapshot.checkpoint ?? undefined, completedWaitpoints: snapshot.completedWaitpoints, backgroundWorker: { id: result.worker.id, From 2a874065524058afa23c751346332f92f6ab9d53 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:50:00 +0000 Subject: [PATCH 3/3] refactor: use plain cuid via generateInternalId instead of SnapshotId.generate() Co-Authored-By: Eric Allam --- .../run-engine/src/engine/systems/dequeueSystem.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index e5260584d6f..15d79e76baa 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -3,7 +3,7 @@ import { startSpan } from "@internal/tracing"; import { assertExhaustive, tryCatch } from "@trigger.dev/core"; import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3"; import { placementTag } from "@trigger.dev/core/v3/serverOnly"; -import { getMaxDuration, SnapshotId } from "@trigger.dev/core/v3/isomorphic"; +import { generateInternalId, getMaxDuration, SnapshotId } from "@trigger.dev/core/v3/isomorphic"; import { BackgroundWorker, BackgroundWorkerTask, @@ -417,7 +417,7 @@ export class DequeueSystem { : result.task.retryConfig; // Pre-generate snapshot ID so we can construct the result without an extra read - const snapshotIds = SnapshotId.generate(); + const snapshotId = generateInternalId(); const lockedTaskRun = await prisma.taskRun.update({ where: { @@ -440,7 +440,7 @@ export class DequeueSystem { maxAttempts: maxAttempts ?? undefined, executionSnapshots: { create: { - id: snapshotIds.id, + id: snapshotId, engine: "V2", executionStatus: "PENDING_EXECUTING", description: "Run was dequeued for execution", @@ -557,7 +557,7 @@ export class DequeueSystem { id: runId, }, snapshot: { - id: snapshotIds.id, + id: snapshotId, executionStatus: "PENDING_EXECUTING", description: "Run was dequeued for execution", runStatus: "PENDING", @@ -572,7 +572,7 @@ export class DequeueSystem { }); await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded({ - id: snapshotIds.id, + id: snapshotId, runId, executionStatus: "PENDING_EXECUTING", }); @@ -582,8 +582,8 @@ export class DequeueSystem { dequeuedAt: new Date(), workerQueueLength: message.workerQueueLength, snapshot: { - id: snapshotIds.id, - friendlyId: snapshotIds.friendlyId, + id: snapshotId, + friendlyId: SnapshotId.toFriendlyId(snapshotId), executionStatus: "PENDING_EXECUTING" as const, description: "Run was dequeued for execution", createdAt: snapshotCreatedAt,