From 52772fe133562966811eb98ff10f148714a890d5 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 May 2026 23:33:53 +0200 Subject: [PATCH] stream: drop per-chunk Promise alloc in pipeTo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace setPromiseHandled with markPromiseAsHandled at the two pipeTo per-chunk call sites in readablestream.js (the fast-path drain loop and PipeToReadableStreamReadRequest's chunk handler). setPromiseHandled chains a `.then(undefined, () => {})` to the writer's per-chunk write Promise, purely to suppress its unhandled-rejection event. That chain Promise and the noop closure have no other observer in this code path: errors already propagate to pipeTo through the writer.closed promise that watchErrored monitors, and the LAST state.currentWrite is awaited by waitForCurrentWrite during shutdown (which still throws because markAsHandled only suppresses the event, not the rejection state — subsequent .then chains still observe it). markPromiseAsHandled sets V8's MarkAsHandled + MarkAsSilent flags directly via the util binding, with no per-chunk JS allocation. `benchmark/webstreams/pipe-to.js` at HWM=1024 improves by ~5% on my workstation (728K vs 694K ops/s mean over 6 alternating samples), and notably exhibits much tighter run-to-run variance because the per-chunk allocations no longer perturb GC timing. WPT streams parity preserved: 1403 subtests passing, 0 unexpected failures. --- lib/internal/webstreams/readablestream.js | 19 ++++++++++++++++--- lib/internal/webstreams/util.js | 2 ++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 876e3a5bf6e2f0..320ca1452fa30b 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -111,6 +111,7 @@ const { kState, kType, lazyTransfer, + markPromiseAsHandled, nonOpCancel, nonOpPull, nonOpStart, @@ -1569,9 +1570,18 @@ function readableStreamPipeTo( } // Write the chunk - we're already in a separate microtask from enqueue - // because we awaited writer[kState].ready.promise above + // because we awaited writer[kState].ready.promise above. + // + // setPromiseHandled's job here is purely to silence the per-chunk + // unhandled-rejection event — the chain Promise + noop closure it + // would allocate are unobserved by anything else (errors propagate + // through writer.closed, which pipeTo monitors via watchErrored). + // markPromiseAsHandled sets V8's MarkAsHandled + MarkAsSilent flags + // directly, with no per-chunk allocation. The LAST state.currentWrite + // is awaited in waitForCurrentWrite during shutdown — markAsHandled + // does not affect that subsequent .then chain. state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk); - setPromiseHandled(state.currentWrite); + markPromiseAsHandled(state.currentWrite); // Check backpressure after each write if (dest[kState].backpressure) { @@ -1675,7 +1685,10 @@ class PipeToReadableStreamReadRequest { // "ReadableStreamPipeTo" step 15's "chunk steps". queueMicrotask(() => { this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk); - setPromiseHandled(this.state.currentWrite); + // See comment in pipeTo's fast path on the markPromiseAsHandled vs + // setPromiseHandled choice. The per-chunk silencing has no other + // observer; errors reach pipeTo via writer.closed monitoring. + markPromiseAsHandled(this.state.currentWrite); this.promise.resolve(false); }); } diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 808b0b069e57f7..20299685a96ad6 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -35,6 +35,7 @@ const { kPending, }, getPromiseDetails, + markPromiseAsHandled, } = internalBinding('util'); const assert = require('internal/assert'); @@ -225,6 +226,7 @@ module.exports = { kState, kType, lazyTransfer, + markPromiseAsHandled, nonOpCancel, nonOpFlush, nonOpPull,