diff --git a/.server-changes/cancel-dequeued-runs.md b/.server-changes/cancel-dequeued-runs.md new file mode 100644 index 00000000000..4e393411010 --- /dev/null +++ b/.server-changes/cancel-dequeued-runs.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Show the cancel button in the runs list for runs in `DEQUEUED` status. `DEQUEUED` was missing from `NON_FINAL_RUN_STATUSES` so the list hid the button even though the single run page allowed it. diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index b5e1d915cf7..8606bcdafce 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -18,6 +18,7 @@ export const NON_FINAL_RUN_STATUSES = [ "PENDING", "PENDING_VERSION", "WAITING_FOR_DEPLOY", + "DEQUEUED", "EXECUTING", "WAITING_TO_RESUME", "RETRYING_AFTER_FAILURE", diff --git a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts index 1e5947cfbc1..aecae7a2632 100644 --- a/internal-packages/run-engine/src/engine/tests/cancelling.test.ts +++ b/internal-packages/run-engine/src/engine/tests/cancelling.test.ts @@ -322,5 +322,132 @@ describe("RunEngine cancelling", () => { } }); + containerTest("Cancelling a run (dequeued)", async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const parentTask = "parent-task"; + + //create background worker + await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]); + + //trigger the run + const parentRun = await engine.trigger( + { + number: 1, + friendlyId: "run_p1234", + environment: authenticatedEnvironment, + taskIdentifier: parentTask, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: `task/${parentTask}`, + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run, but don't start an attempt — this leaves TaskRun.status = DEQUEUED + //and execution snapshot = PENDING_EXECUTING (a worker has claimed the run) + await setTimeout(500); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test_12345", + workerQueue: "main", + }); + expect(dequeued.length).toBe(1); + + const dequeuedRun = await prisma.taskRun.findFirstOrThrow({ + where: { id: parentRun.id }, + }); + expect(dequeuedRun.status).toBe("DEQUEUED"); + + //cancel the dequeued run — a worker has already claimed it, so the snapshot goes to + //PENDING_CANCEL pending the worker ack. TaskRun.status flips to CANCELED immediately + //so the UI reflects cancellation without waiting. + const result = await engine.cancelRun({ + runId: parentRun.id, + completedAt: new Date(), + reason: "Cancelled by the user", + }); + expect(result.snapshot.executionStatus).toBe("PENDING_CANCEL"); + + const pendingCancel = await engine.getRunExecutionData({ runId: parentRun.id }); + expect(pendingCancel?.snapshot.executionStatus).toBe("PENDING_CANCEL"); + expect(pendingCancel?.run.status).toBe("CANCELED"); + + let cancelledEventData: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (result) => { + cancelledEventData.push(result); + }); + + //simulate worker acknowledging the cancellation + const completeResult = await engine.completeRunAttempt({ + runId: parentRun.id, + snapshotId: pendingCancel!.snapshot.id, + completion: { + ok: false, + id: parentRun.id, + error: { + type: "INTERNAL_ERROR" as const, + code: "TASK_RUN_CANCELLED" as const, + }, + }, + }); + expect(completeResult.snapshot.executionStatus).toBe("FINISHED"); + expect(completeResult.run.status).toBe("CANCELED"); + + //check emitted event after worker ack + expect(cancelledEventData.length).toBe(1); + const parentEvent = cancelledEventData.find((r) => r.run.id === parentRun.id); + assertNonNullable(parentEvent); + expect(parentEvent.run.spanId).toBe(parentRun.spanId); + + //concurrency should have been released + const envConcurrencyCompleted = await engine.runQueue.currentConcurrencyOfEnvironment( + authenticatedEnvironment + ); + expect(envConcurrencyCompleted).toBe(0); + } finally { + await engine.quit(); + } + }); + //todo bulk cancelling runs });