Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 112 additions & 47 deletions lib/internal/streams/iter/consumers.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const {
ArrayPrototypeSlice,
Promise,
PromisePrototypeThen,
PromiseResolve,
SafePromiseAllReturnVoid,
SymbolAsyncIterator,
TypedArrayPrototypeGetBuffer,
Expand Down Expand Up @@ -412,19 +413,22 @@ function merge(...args) {

return {
__proto__: null,
async *[SymbolAsyncIterator]() {
[SymbolAsyncIterator]() {
const signal = options?.signal;

signal?.throwIfAborted();

if (normalized.length === 0) return;
if (normalized.length === 0) {
return (async function*() {})();
}

if (normalized.length === 1) {
for await (const batch of normalized[0]) {
signal?.throwIfAborted();
yield batch;
}
return;
return (async function*() {
for await (const batch of normalized[0]) {
signal?.throwIfAborted();
yield batch;
}
})();
}

// Multiple sources - use a ready queue so that batches that settle
Expand All @@ -434,74 +438,63 @@ function merge(...args) {
const ready = [];
let activeCount = normalized.length;
let waitResolve = null;
let lastIterator = null;
let started = false;
let done = false;

// Called when a source's .next() settles. Pushes the result into
// the ready queue and wakes the consumer if it's waiting.
const onSettled = (iterator, result) => {
if (done) return;
if (result.done) {
activeCount--;
} else {
ArrayPrototypePush(ready, result.value);
// Immediately request the next value from this source
// (at most one pending .next() per source)
PromisePrototypeThen(
iterator.next(),
(r) => onSettled(iterator, r),
(err) => {
ArrayPrototypePush(ready, { __proto__: null, error: err });
if (waitResolve) {
waitResolve();
waitResolve = null;
}
},
);
ArrayPrototypePush(ready, {
__proto__: null,
iterator,
value: result.value,
});
}
if (waitResolve) {
waitResolve();
waitResolve = null;
}
};

// Start one .next() per source
const iterators = [];
for (let i = 0; i < normalized.length; i++) {
const iterator = normalized[i][SymbolAsyncIterator]();
ArrayPrototypePush(iterators, iterator);
const requestNext = (iterator) => {
PromisePrototypeThen(
iterator.next(),
(r) => onSettled(iterator, r),
(err) => {
if (done) return;
ArrayPrototypePush(ready, { __proto__: null, error: err });
if (waitResolve) {
waitResolve();
waitResolve = null;
}
},
);
}
};

try {
while (activeCount > 0 || ready.length > 0) {
signal?.throwIfAborted();
const iterators = [];

// Drain ready queue synchronously
while (ready.length > 0) {
const item = ArrayPrototypeShift(ready);
if (item?.error) {
throw item.error;
}
yield item;
}
const start = () => {
if (started) return;
started = true;
for (let i = 0; i < normalized.length; i++) {
const iterator = normalized[i][SymbolAsyncIterator]();
ArrayPrototypePush(iterators, iterator);
requestNext(iterator);
}
};

// If sources are still active, wait for the next settlement
if (activeCount > 0) {
await new Promise((resolve) => {
waitResolve = resolve;
});
}
const cleanup = async () => {
if (done) return;
done = true;
if (waitResolve) {
waitResolve();
waitResolve = null;
}
} finally {
// Clean up: return all iterators
await SafePromiseAllReturnVoid(iterators, async (iterator) => {
if (iterator.return) {
try {
Expand All @@ -511,7 +504,79 @@ function merge(...args) {
}
}
});
}
};

const nextImpl = async () => {
try {
if (done) {
return { __proto__: null, done: true, value: undefined };
}

signal?.throwIfAborted();

start();

if (lastIterator !== null) {
requestNext(lastIterator);
lastIterator = null;
}

while (!done) {
signal?.throwIfAborted();

while (ready.length > 0) {
const item = ArrayPrototypeShift(ready);
if (item?.error) {
await cleanup();
throw item.error;
}
lastIterator = item.iterator;
return { __proto__: null, done: false, value: item.value };
}

if (activeCount === 0) {
await cleanup();
return { __proto__: null, done: true, value: undefined };
}

await new Promise((resolve) => {
waitResolve = resolve;
});
}

return { __proto__: null, done: true, value: undefined };
} catch (err) {
await cleanup();
throw err;
}
};

const returnImpl = async () => {
await cleanup();
return { __proto__: null, done: true, value: undefined };
};

let nextQueue = PromiseResolve();
const enqueue = (fn) => {
const result = PromisePrototypeThen(nextQueue, fn, fn);
nextQueue = PromisePrototypeThen(result, () => {}, () => {});
return result;
};

return {
__proto__: null,
[SymbolAsyncIterator]() {
return this;
},

next() {
return enqueue(nextImpl);
},

return() {
return enqueue(returnImpl);
},
};
},
};
}
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-iter-consumers-merge.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,31 @@ async function testMergeConsumerBreak() {
assert.strictEqual(source1Return && source2Return, true);
}

async function testMergeDoesNotDrainIdleSources() {
function source(n) {
return {
pulls: 0,
async *[Symbol.asyncIterator]() {
while (this.pulls < n) {
yield [Buffer.from(`${++this.pulls}`)];
}
},
};
}

const source1 = source(5);
const source2 = source(5);
const iterator = merge(source1, source2)[Symbol.asyncIterator]();

await iterator.next();
await new Promise((resolve) => setTimeout(resolve, 20));

assert.ok(source1.pulls <= 1);
assert.ok(source2.pulls <= 1);

await iterator.return?.();
}

async function testMergeSignalMidIteration() {
const ac = new AbortController();
async function* slowSource() {
Expand Down Expand Up @@ -190,6 +215,7 @@ Promise.all([
testMergeSyncSources(),
testMergeSourceError(),
testMergeConsumerBreak(),
testMergeDoesNotDrainIdleSources(),
testMergeSignalMidIteration(),
testMergeStringSources(),
testMergeObjectLikeSources(),
Expand Down
Loading