diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 95871e99037d58..e0c5fcc6ea0da3 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -723,6 +723,18 @@ async function* createAsyncPipeline(source, transforms, signal) { } } +/** + * Check if a false sync write result means accepted backpressure. + * @param {object} writer - The writer whose sync method returned. + * @param {*} result - The return value from writeSync() or writevSync(). + * @returns {boolean} + */ +function isAcceptedSyncWriteBackpressure(writer, result) { + return result === false && + writer[kSyncWriteAcceptedOnFalse] === true && + writer.desiredSize === 0; +} + // ============================================================================= // Public API: pull() and pullSync() // ============================================================================= @@ -810,16 +822,29 @@ function pipeToSync(source, ...args) { const hasEndSync = typeof writer.endSync === 'function'; try { + let canContinue = true; for (const batch of pipeline) { + if (!canContinue) { + break; + } if (hasWritevSync && batch.length > 1) { - writer.writevSync(batch); + const result = writer.writevSync(batch); + if (result === false && + !isAcceptedSyncWriteBackpressure(writer, result)) { + break; + } for (let i = 0; i < batch.length; i++) { totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); } } else { for (let i = 0; i < batch.length; i++) { const chunk = batch[i]; - writer.writeSync(chunk); + const result = writer.writeSync(chunk); + if (result === false && + !isAcceptedSyncWriteBackpressure(writer, result)) { + canContinue = false; + break; + } totalBytes += TypedArrayPrototypeGetByteLength(chunk); } } @@ -871,11 +896,6 @@ async function pipeTo(source, ...args) { const hasWritev = typeof writer.writev === 'function'; const hasWritevSync = typeof writer.writevSync === 'function'; const hasEndSync = typeof writer.endSync === 'function'; - const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true; - - function syncFalseWasAccepted() { - return syncFalseCanBeAccepted && writer.desiredSize === 0; - } function waitForSyncBackpressure() { const ondrain = writer[drainableProtocol]; @@ -892,9 +912,10 @@ async function pipeTo(source, ...args) { async function writeBatchAsyncFallback(batch, startIndex) { for (let i = startIndex; i < batch.length; i++) { const chunk = batch[i]; - if (hasWriteSync && writer.writeSync(chunk)) { + const result = hasWriteSync && writer.writeSync(chunk); + if (result) { // Sync retry succeeded - } else if (syncFalseWasAccepted()) { + } else if (isAcceptedSyncWriteBackpressure(writer, result)) { totalBytes += TypedArrayPrototypeGetByteLength(chunk); await waitForSyncBackpressure(); continue; @@ -914,8 +935,9 @@ async function pipeTo(source, ...args) { // is required. Callers must check: const p = writeBatch(b); if (p) await p; function writeBatch(batch) { if (hasWritev && batch.length > 1) { - if (!hasWritevSync || !writer.writevSync(batch)) { - if (hasWritevSync && syncFalseWasAccepted()) { + const result = hasWritevSync && writer.writevSync(batch); + if (!result) { + if (isAcceptedSyncWriteBackpressure(writer, result)) { for (let i = 0; i < batch.length; i++) { totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); } @@ -935,8 +957,9 @@ async function pipeTo(source, ...args) { } for (let i = 0; i < batch.length; i++) { const chunk = batch[i]; - if (!hasWriteSync || !writer.writeSync(chunk)) { - if (hasWriteSync && syncFalseWasAccepted()) { + const result = hasWriteSync && writer.writeSync(chunk); + if (!result) { + if (isAcceptedSyncWriteBackpressure(writer, result)) { totalBytes += TypedArrayPrototypeGetByteLength(chunk); return writeBatchAfterAcceptedBackpressure(batch, i + 1); } diff --git a/test/parallel/test-stream-iter-pipeto-writev.js b/test/parallel/test-stream-iter-pipeto-writev.js index 71a691dd6627b7..3383a2a30405b3 100644 --- a/test/parallel/test-stream-iter-pipeto-writev.js +++ b/test/parallel/test-stream-iter-pipeto-writev.js @@ -134,6 +134,43 @@ async function testPushWriterBlockSyncFalseAccepted() { })(), 'abcd', 4); } +async function testPipeToSyncPushWriterStrictFalseRejected() { + const decoder = new TextDecoder(); + const { writer, readable } = push({ highWaterMark: 1 }); + + const total = pipeToSync(['a', 'b'], writer, { preventClose: true }); + assert.strictEqual(total, 1); + + const iter = readable[Symbol.asyncIterator](); + const first = await iter.next(); + assert.strictEqual(first.done, false); + assert.strictEqual(decoder.decode(first.value[0]), 'a'); + + const second = await Promise.race([ + iter.next().then((result) => { + return result.done ? '' : decoder.decode(result.value[0]); + }), + setImmediatePromise().then(() => ''), + ]); + assert.strictEqual(second, ''); + + await iter.return?.(); +} + +async function testPipeToSyncWritevFalseNotCounted() { + const writer = { + writevSync() { return false; }, + writeSync: common.mustNotCall(), + endSync() { return 0; }, + }; + function* source() { + yield [new Uint8Array([1]), new Uint8Array([2])]; + } + + const total = pipeToSync(source(), writer); + assert.strictEqual(total, 0); +} + // pipeToSync with writevSync async function testPipeToSyncWritev() { const batches = []; @@ -194,6 +231,8 @@ Promise.all([ testWriteSyncFailsMidBatch(), testWriteSyncAlwaysFails(), testPushWriterBlockSyncFalseAccepted(), + testPipeToSyncPushWriterStrictFalseRejected(), + testPipeToSyncWritevFalseNotCounted(), testPipeToSyncWritev(), testPipeToSyncPlainChunksWritev(), testPipeToSyncWriteFallback(),