Skip to content

Commit 43e8543

Browse files
committed
fix: handle fast-completion race in StreamBatchItemsService count check
When all batch runs complete before getBatchEnqueuedCount() is called, cleanup() has already deleted the enqueuedItemsKey in Redis, causing it to return 0. The existing Postgres fallback only checked sealed, but the BatchQueue completion path sets status=COMPLETED without setting sealed=true. Add the status check so the endpoint returns sealed:true instead of triggering SDK retries into a dead BatchQueue. Also switch findUnique to findFirst per webapp convention.
1 parent 7d7ebdd commit 43e8543

1 file changed

Lines changed: 8 additions & 5 deletions

File tree

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine {
212212
// Validate we received the expected number of items
213213
if (enqueuedCount !== batch.runCount) {
214214
// The batch queue consumers may have already processed all items and
215-
// cleaned up the Redis keys before we got here (especially likely when
216-
// items include pre-failed runs that complete instantly). Check if the
217-
// batch was already sealed/completed in Postgres.
218-
const currentBatch = await this._prisma.batchTaskRun.findUnique({
215+
// cleaned up the Redis keys before we got here. This happens when all
216+
// runs complete fast enough that cleanup() deletes the enqueuedItemsKey
217+
// before we read it — typically when the last item executes in the
218+
// milliseconds between the loop ending and getBatchEnqueuedCount() being called.
219+
// Check both sealed (sealed by this endpoint on a concurrent request) and
220+
// COMPLETED (sealed by the BatchQueue completion path before we got here).
221+
const currentBatch = await this._prisma.batchTaskRun.findFirst({
219222
where: { id: batchId },
220223
select: { sealed: true, status: true },
221224
});
222225

223-
if (currentBatch?.sealed) {
226+
if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
224227
logger.info("Batch already sealed before count check (fast completion)", {
225228
batchId: batchFriendlyId,
226229
itemsAccepted,

0 commit comments

Comments
 (0)