Skip to content

Commit ad29045

Browse files
fix: handle COMPLETED status in post-seal race check too
The second race check at the seal updateMany only accepted sealed=true && status=PROCESSING. But the BatchQueue completion path can set status=COMPLETED (without sealed=true) between getEnqueuedCount and the seal updateMany, causing the check to reject a legitimate success state and throw ServiceValidationError. Also switch the post-seal re-query from findUnique to findFirst per webapp convention. Co-Authored-By: Matt Aitken <matt@mattaitken.com>
1 parent 43e8543 commit ad29045

2 files changed

Lines changed: 143 additions & 5 deletions

File tree

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,18 @@ export class StreamBatchItemsService extends WithRunEngine {
282282

283283
// Check if we won the race to seal the batch
284284
if (sealResult.count === 0) {
285-
// Another request sealed the batch first - re-query to check current state
286-
const currentBatch = await this._prisma.batchTaskRun.findUnique({
285+
// The conditional update failed because the batch was no longer in
286+
// PENDING status. Re-query to determine which path got there first:
287+
// - A concurrent streaming request already sealed and moved it to
288+
// PROCESSING.
289+
// - The BatchQueue completion path finished all runs and set it to
290+
// COMPLETED (without setting sealed=true — that's this endpoint's
291+
// job). This window exists between completionCallback (which calls
292+
// tryCompleteBatch) and cleanup() in BatchQueue — see
293+
// batch-queue/index.ts.
294+
// Either way the goal — a durable batch that the SDK stops retrying —
295+
// has been achieved, so we return sealed: true.
296+
const currentBatch = await this._prisma.batchTaskRun.findFirst({
287297
where: { id: batchId },
288298
select: {
289299
id: true,
@@ -293,13 +303,17 @@ export class StreamBatchItemsService extends WithRunEngine {
293303
},
294304
});
295305

296-
if (currentBatch?.sealed && currentBatch.status === "PROCESSING") {
297-
// The batch was sealed by another request - this is fine, the goal was achieved
298-
logger.info("Batch already sealed by concurrent request", {
306+
if (
307+
(currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
308+
currentBatch?.status === "COMPLETED"
309+
) {
310+
logger.info("Batch already sealed/completed by concurrent path", {
299311
batchId: batchFriendlyId,
300312
itemsAccepted,
301313
itemsDeduplicated,
302314
envId: environment.id,
315+
batchStatus: currentBatch.status,
316+
batchSealed: currentBatch.sealed,
303317
});
304318

305319
span.setAttribute("itemsAccepted", itemsAccepted);

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,130 @@ describe("StreamBatchItemsService", () => {
384384
}
385385
);
386386

387+
containerTest(
388+
"should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt",
389+
async ({ prisma, redisOptions }) => {
390+
const engine = new RunEngine({
391+
prisma,
392+
worker: {
393+
redis: redisOptions,
394+
workers: 1,
395+
tasksPerWorker: 10,
396+
pollIntervalMs: 100,
397+
disabled: true,
398+
},
399+
queue: {
400+
redis: redisOptions,
401+
},
402+
runLock: {
403+
redis: redisOptions,
404+
},
405+
machines: {
406+
defaultMachine: "small-1x",
407+
machines: {
408+
"small-1x": {
409+
name: "small-1x" as const,
410+
cpu: 0.5,
411+
memory: 0.5,
412+
centsPerMs: 0.0001,
413+
},
414+
},
415+
baseCostInCents: 0.0005,
416+
},
417+
batchQueue: {
418+
redis: redisOptions,
419+
},
420+
tracer: trace.getTracer("test", "0.0.0"),
421+
});
422+
423+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
424+
425+
// Create a batch in PENDING state
426+
const batch = await createBatch(prisma, authenticatedEnvironment.id, {
427+
runCount: 2,
428+
status: "PENDING",
429+
sealed: false,
430+
});
431+
432+
// Initialize the batch in Redis
433+
await engine.initializeBatch({
434+
batchId: batch.id,
435+
friendlyId: batch.friendlyId,
436+
environmentId: authenticatedEnvironment.id,
437+
environmentType: authenticatedEnvironment.type,
438+
organizationId: authenticatedEnvironment.organizationId,
439+
projectId: authenticatedEnvironment.projectId,
440+
runCount: 2,
441+
processingConcurrency: 10,
442+
});
443+
444+
// Enqueue items - the enqueued count check passes but the seal updateMany
445+
// will race with tryCompleteBatch moving status to COMPLETED.
446+
await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
447+
task: "test-task",
448+
payload: JSON.stringify({ data: "item1" }),
449+
payloadType: "application/json",
450+
});
451+
await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
452+
task: "test-task",
453+
payload: JSON.stringify({ data: "item2" }),
454+
payloadType: "application/json",
455+
});
456+
457+
// Simulate the race where BatchQueue's completionCallback runs
458+
// tryCompleteBatch between getEnqueuedCount and the seal updateMany.
459+
// tryCompleteBatch sets status=COMPLETED but NOT sealed=true.
460+
const racingPrisma = {
461+
...prisma,
462+
batchTaskRun: {
463+
...prisma.batchTaskRun,
464+
findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun),
465+
updateMany: async () => {
466+
await prisma.batchTaskRun.update({
467+
where: { id: batch.id },
468+
data: {
469+
status: "COMPLETED",
470+
},
471+
});
472+
// The conditional updateMany(where: status="PENDING") would now fail
473+
return { count: 0 };
474+
},
475+
findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
476+
},
477+
} as unknown as PrismaClient;
478+
479+
const service = new StreamBatchItemsService({
480+
prisma: racingPrisma,
481+
engine,
482+
});
483+
484+
const result = await service.call(
485+
authenticatedEnvironment,
486+
batch.friendlyId,
487+
itemsToAsyncIterable([]),
488+
{
489+
maxItemBytes: 1024 * 1024,
490+
}
491+
);
492+
493+
// The endpoint should accept the COMPLETED state as a success case so the
494+
// SDK does not retry a batch whose child runs have already finished.
495+
expect(result.sealed).toBe(true);
496+
expect(result.id).toBe(batch.friendlyId);
497+
498+
const updatedBatch = await prisma.batchTaskRun.findUnique({
499+
where: { id: batch.id },
500+
});
501+
502+
expect(updatedBatch?.status).toBe("COMPLETED");
503+
// sealed stays false because the BatchQueue completion path does not set
504+
// it - that's fine, the batch is terminal.
505+
expect(updatedBatch?.sealed).toBe(false);
506+
507+
await engine.quit();
508+
}
509+
);
510+
387511
containerTest(
388512
"should throw error when race condition leaves batch in unexpected state",
389513
async ({ prisma, redisOptions }) => {

0 commit comments

Comments
 (0)