From 1d1b87f7e630a83ee1b9860511835f92d759bb93 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 25 May 2026 12:28:33 -0700 Subject: [PATCH] stream: fix merge() idle source draining Defer pulling a merged source again until the consumer requests the next merged item. This prevents fast sources from being drained while the merged iterator is idle. Fixes: https://github.com/nodejs/node/issues/63566 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/consumers.js | 159 ++++++++++++------ .../test-stream-iter-consumers-merge.js | 26 +++ 2 files changed, 138 insertions(+), 47 deletions(-) diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index bcfe5d5ab29749..ae7341fc846792 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -17,6 +17,7 @@ const { ArrayPrototypeSlice, Promise, PromisePrototypeThen, + PromiseResolve, SafePromiseAllReturnVoid, SymbolAsyncIterator, TypedArrayPrototypeGetBuffer, @@ -412,19 +413,22 @@ function merge(...args) { return { __proto__: null, - async *[SymbolAsyncIterator]() { + [SymbolAsyncIterator]() { const signal = options?.signal; signal?.throwIfAborted(); - if (normalized.length === 0) return; + if (normalized.length === 0) { + return (async function*() {})(); + } if (normalized.length === 1) { - for await (const batch of normalized[0]) { - signal?.throwIfAborted(); - yield batch; - } - return; + return (async function*() { + for await (const batch of normalized[0]) { + signal?.throwIfAborted(); + yield batch; + } + })(); } // Multiple sources - use a ready queue so that batches that settle @@ -434,27 +438,22 @@ function merge(...args) { const ready = []; let activeCount = normalized.length; let waitResolve = null; + let lastIterator = null; + let started = false; + let done = false; // Called when a source's .next() settles. Pushes the result into // the ready queue and wakes the consumer if it's waiting. const onSettled = (iterator, result) => { + if (done) return; if (result.done) { activeCount--; } else { - ArrayPrototypePush(ready, result.value); - // Immediately request the next value from this source - // (at most one pending .next() per source) - PromisePrototypeThen( - iterator.next(), - (r) => onSettled(iterator, r), - (err) => { - ArrayPrototypePush(ready, { __proto__: null, error: err }); - if (waitResolve) { - waitResolve(); - waitResolve = null; - } - }, - ); + ArrayPrototypePush(ready, { + __proto__: null, + iterator, + value: result.value, + }); } if (waitResolve) { waitResolve(); @@ -462,15 +461,12 @@ function merge(...args) { } }; - // Start one .next() per source - const iterators = []; - for (let i = 0; i < normalized.length; i++) { - const iterator = normalized[i][SymbolAsyncIterator](); - ArrayPrototypePush(iterators, iterator); + const requestNext = (iterator) => { PromisePrototypeThen( iterator.next(), (r) => onSettled(iterator, r), (err) => { + if (done) return; ArrayPrototypePush(ready, { __proto__: null, error: err }); if (waitResolve) { waitResolve(); @@ -478,30 +474,27 @@ function merge(...args) { } }, ); - } + }; - try { - while (activeCount > 0 || ready.length > 0) { - signal?.throwIfAborted(); + const iterators = []; - // Drain ready queue synchronously - while (ready.length > 0) { - const item = ArrayPrototypeShift(ready); - if (item?.error) { - throw item.error; - } - yield item; - } + const start = () => { + if (started) return; + started = true; + for (let i = 0; i < normalized.length; i++) { + const iterator = normalized[i][SymbolAsyncIterator](); + ArrayPrototypePush(iterators, iterator); + requestNext(iterator); + } + }; - // If sources are still active, wait for the next settlement - if (activeCount > 0) { - await new Promise((resolve) => { - waitResolve = resolve; - }); - } + const cleanup = async () => { + if (done) return; + done = true; + if (waitResolve) { + waitResolve(); + waitResolve = null; } - } finally { - // Clean up: return all iterators await SafePromiseAllReturnVoid(iterators, async (iterator) => { if (iterator.return) { try { @@ -511,7 +504,79 @@ function merge(...args) { } } }); - } + }; + + const nextImpl = async () => { + try { + if (done) { + return { __proto__: null, done: true, value: undefined }; + } + + signal?.throwIfAborted(); + + start(); + + if (lastIterator !== null) { + requestNext(lastIterator); + lastIterator = null; + } + + while (!done) { + signal?.throwIfAborted(); + + while (ready.length > 0) { + const item = ArrayPrototypeShift(ready); + if (item?.error) { + await cleanup(); + throw item.error; + } + lastIterator = item.iterator; + return { __proto__: null, done: false, value: item.value }; + } + + if (activeCount === 0) { + await cleanup(); + return { __proto__: null, done: true, value: undefined }; + } + + await new Promise((resolve) => { + waitResolve = resolve; + }); + } + + return { __proto__: null, done: true, value: undefined }; + } catch (err) { + await cleanup(); + throw err; + } + }; + + const returnImpl = async () => { + await cleanup(); + return { __proto__: null, done: true, value: undefined }; + }; + + let nextQueue = PromiseResolve(); + const enqueue = (fn) => { + const result = PromisePrototypeThen(nextQueue, fn, fn); + nextQueue = PromisePrototypeThen(result, () => {}, () => {}); + return result; + }; + + return { + __proto__: null, + [SymbolAsyncIterator]() { + return this; + }, + + next() { + return enqueue(nextImpl); + }, + + return() { + return enqueue(returnImpl); + }, + }; }, }; } diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index b777a3ee205fad..0b9bcadc95ca7f 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -131,6 +131,31 @@ async function testMergeConsumerBreak() { assert.strictEqual(source1Return && source2Return, true); } +async function testMergeDoesNotDrainIdleSources() { + function source(n) { + return { + pulls: 0, + async *[Symbol.asyncIterator]() { + while (this.pulls < n) { + yield [Buffer.from(`${++this.pulls}`)]; + } + }, + }; + } + + const source1 = source(5); + const source2 = source(5); + const iterator = merge(source1, source2)[Symbol.asyncIterator](); + + await iterator.next(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + assert.ok(source1.pulls <= 1); + assert.ok(source2.pulls <= 1); + + await iterator.return?.(); +} + async function testMergeSignalMidIteration() { const ac = new AbortController(); async function* slowSource() { @@ -190,6 +215,7 @@ Promise.all([ testMergeSyncSources(), testMergeSourceError(), testMergeConsumerBreak(), + testMergeDoesNotDrainIdleSources(), testMergeSignalMidIteration(), testMergeStringSources(), testMergeObjectLikeSources(),