From cbf0957ff1207df5f610c3fdd962f159afcba477 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 25 May 2026 00:28:32 -0700 Subject: [PATCH] stream: reject pending reads on iterator throw Settle reads that are already waiting when a push stream iterator is returned or thrown. This prevents next() promises from hanging after consumer cancellation. Fixes: https://github.com/nodejs/node/issues/63554 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/push.js | 9 +++++ test/parallel/test-stream-iter-push-writer.js | 40 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index e531e65428ef6d..79339f8e1f02e1 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -426,6 +426,7 @@ class PushQueue { if (this.#consumerState !== 'active') return; this.#consumerState = 'returned'; this.#cleanup(); + this.#resolvePendingReads(); this.#rejectPendingWrites( new ERR_INVALID_STATE.TypeError('Stream closed by consumer')); // If closing, reject the pending end promise @@ -443,7 +444,12 @@ class PushQueue { this.#consumerState = 'thrown'; this.#error = error; this.#cleanup(); + this.#rejectPendingReads(error); this.#rejectPendingWrites(error); + if (this.#writerState === 'closing' && this.#pendingEnd) { + this.#pendingEnd.reject(error); + this.#pendingEnd = null; + } // Reject pending drains - the consumer errored this.#rejectPendingDrains(error); } @@ -485,6 +491,9 @@ class PushQueue { } else if (this.#writerState === 'errored' && this.#error) { const pending = this.#pendingReads.shift(); pending.reject(this.#error); + } else if (this.#consumerState === 'returned') { + const pending = this.#pendingReads.shift(); + pending.resolve({ __proto__: null, value: undefined, done: true }); } else { break; } diff --git a/test/parallel/test-stream-iter-push-writer.js b/test/parallel/test-stream-iter-push-writer.js index acac652b23941b..f07ccb575ec570 100644 --- a/test/parallel/test-stream-iter-push-writer.js +++ b/test/parallel/test-stream-iter-push-writer.js @@ -323,6 +323,44 @@ async function testFailRejectsPendingRead() { ); } +// iterator.return() resolves a pending read with done:true +async function testConsumerReturnResolvesPendingRead() { + const { readable } = push(); + + const iter = readable[Symbol.asyncIterator](); + const readPromise = iter.next(); + + await new Promise(setImmediate); + + const returnResult = await iter.return(); + assert.strictEqual(returnResult.value, undefined); + assert.strictEqual(returnResult.done, true); + + const readResult = await readPromise; + assert.strictEqual(readResult.value, undefined); + assert.strictEqual(readResult.done, true); +} + +// iterator.throw() rejects a pending read with the thrown error +async function testConsumerThrowRejectsPendingRead() { + const { readable } = push(); + + const iter = readable[Symbol.asyncIterator](); + const readPromise = iter.next(); + + await new Promise(setImmediate); + + const err = new Error('consumer read boom'); + const throwResult = await iter.throw(err); + assert.strictEqual(throwResult.value, undefined); + assert.strictEqual(throwResult.done, true); + + await assert.rejects( + () => readPromise, + (e) => e === err, + ); +} + // end() while writes are pending rejects those writes async function testEndRejectsPendingWrites() { const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); @@ -435,6 +473,8 @@ Promise.all([ testConsumerThrowRejectsWrites(), testEndResolvesPendingRead(), testFailRejectsPendingRead(), + testConsumerReturnResolvesPendingRead(), + testConsumerThrowRejectsPendingRead(), testEndRejectsPendingWrites(), testEndIdempotentWhenClosed(), testEndRejectsWhenErrored(),