From d1485e592797205c731013f1646764823db9790c Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Thu, 7 May 2026 09:13:51 +0900 Subject: [PATCH] fix: handle serveStatic stream fallback backpressure --- src/serve-static.ts | 37 +--------------- src/utils/stream.ts | 88 +++++++++++++++++++++++++++++++++++++++ test/utils/stream.test.ts | 75 +++++++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 35 deletions(-) create mode 100644 src/utils/stream.ts create mode 100644 test/utils/stream.test.ts diff --git a/src/serve-static.ts b/src/serve-static.ts index 2e1ef85..842c6c7 100644 --- a/src/serve-static.ts +++ b/src/serve-static.ts @@ -1,10 +1,9 @@ import type { Context, Env, MiddlewareHandler } from 'hono' import { getMimeType } from 'hono/utils/mime' -import type { ReadStream, Stats } from 'node:fs' +import type { Stats } from 'node:fs' import { createReadStream, statSync, existsSync } from 'node:fs' import { join } from 'node:path' -import { versions } from 'node:process' -import { Readable } from 'node:stream' +import { createStreamBody } from './utils/stream' export type ServeStaticOptions = { /** @@ -28,38 +27,6 @@ const ENCODINGS = { } as const const ENCODINGS_ORDERED_KEYS = Object.keys(ENCODINGS) as (keyof typeof ENCODINGS)[] -// In Node.js versions that do not have the following PR applied, using Readable.toWeb may cause unexpected exceptions. -// https://github.com/nodejs/node/pull/54206 -const pr54206Applied = () => { - const [major, minor] = versions.node.split('.').map((component) => parseInt(component)) - return major >= 23 || (major === 22 && minor >= 7) || (major === 20 && minor >= 18) -} -const useReadableToWeb = pr54206Applied() - -const createStreamBody = (stream: ReadStream) => { - if (useReadableToWeb) { - return Readable.toWeb(stream) as ReadableStream - } - const body = new ReadableStream({ - start(controller) { - stream.on('data', (chunk) => { - controller.enqueue(chunk) - }) - stream.on('error', (err) => { - controller.error(err) - }) - stream.on('end', () => { - controller.close() - }) - }, - - cancel() { - stream.destroy() - }, - }) - return body -} - const getStats = (path: string) => { let stats: Stats | undefined try { diff --git a/src/utils/stream.ts b/src/utils/stream.ts new file mode 100644 index 0000000..01158d9 --- /dev/null +++ b/src/utils/stream.ts @@ -0,0 +1,88 @@ +import type { ReadStream } from 'node:fs' +import { versions } from 'node:process' +import { Readable } from 'node:stream' + +// In Node.js versions that do not have the following PR applied, using Readable.toWeb may cause unexpected exceptions. +// https://github.com/nodejs/node/pull/54206 +const pr54206Applied = () => { + const [major, minor] = versions.node.split('.').map((component) => parseInt(component)) + return major >= 23 || (major === 22 && minor >= 7) || (major === 20 && minor >= 18) +} +const useReadableToWeb = pr54206Applied() + +export const createStreamBody = ( + stream: ReadStream, + useNativeReadableToWeb = useReadableToWeb +): ReadableStream => { + if (useNativeReadableToWeb) { + return Readable.toWeb(stream) as ReadableStream + } + + let controller: ReadableStreamDefaultController | undefined + let settled = false + + const cleanup = () => { + stream.off('data', onData) + stream.off('error', onError) + stream.off('end', onTerminate) + stream.off('close', onTerminate) + } + + const settle = (callback?: () => void) => { + if (settled) { + return + } + settled = true + cleanup() + callback?.() + } + + const onData = (chunk: Buffer | string) => { + if (settled || !controller) { + return + } + // createReadStream is called without `encoding`, so chunks are always Buffer. + controller.enqueue(chunk as Buffer) + if ((controller.desiredSize ?? 0) <= 0) { + stream.pause() + } + } + + const onError = (error: Error) => { + settle(() => { + controller?.error(error) + }) + } + + const onTerminate = () => { + settle(() => { + controller?.close() + }) + } + + return new ReadableStream({ + start(streamController) { + controller = streamController + stream.on('data', onData) + stream.on('error', onError) + stream.on('end', onTerminate) + stream.on('close', onTerminate) + stream.pause() + }, + + pull() { + if (!settled) { + stream.resume() + } + }, + + cancel() { + settle() + // Suppress late `error` emitted between destroy() and the terminal `close`. + const ignoreError = () => {} + stream.on('error', ignoreError) + stream.once('close', () => stream.off('error', ignoreError)) + stream.destroy() + }, + }) +} diff --git a/test/utils/stream.test.ts b/test/utils/stream.test.ts new file mode 100644 index 0000000..08c38e5 --- /dev/null +++ b/test/utils/stream.test.ts @@ -0,0 +1,75 @@ +import { EventEmitter } from 'node:events' +import type { ReadStream } from 'node:fs' +import { createStreamBody } from '../../src/utils/stream' + +class FakeReadStream extends EventEmitter { + pause = vi.fn(() => this) + resume = vi.fn(() => this) + destroy = vi.fn(() => this) +} + +const asReadStream = (stream: FakeReadStream) => stream as unknown as ReadStream + +describe('createStreamBody fallback', () => { + it('pauses the node stream when the web stream queue is full and resumes on pull', async () => { + const stream = new FakeReadStream() + const body = createStreamBody(asReadStream(stream), false) + const pauseCallsAfterStart = stream.pause.mock.calls.length + + stream.emit('data', Buffer.from('a')) + + expect(stream.pause.mock.calls.length).toBeGreaterThan(pauseCallsAfterStart) + + const reader = body.getReader() + const resumeCallsBeforeRead = stream.resume.mock.calls.length + const result = await reader.read() + await Promise.resolve() + + expect(result.done).toBe(false) + expect(Buffer.from(result.value ?? []).toString()).toBe('a') + expect(stream.resume.mock.calls.length).toBeGreaterThan(resumeCallsBeforeRead) + + await reader.cancel() + }) + + it('destroys the node stream on cancel and ignores later terminal events', async () => { + const stream = new FakeReadStream() + const body = createStreamBody(asReadStream(stream), false) + const reader = body.getReader() + + await reader.cancel() + + expect(stream.destroy).toHaveBeenCalledTimes(1) + expect(stream.listenerCount('data')).toBe(0) + expect(stream.listenerCount('end')).toBe(0) + expect(() => stream.emit('end')).not.toThrow() + expect(() => stream.emit('error', new Error('late error'))).not.toThrow() + + stream.emit('close') + expect(stream.listenerCount('error')).toBe(0) + }) + + it('propagates node stream errors to the web stream reader', async () => { + const stream = new FakeReadStream() + const body = createStreamBody(asReadStream(stream), false) + const reader = body.getReader() + const error = new Error('read failed') + const readPromise = reader.read() + + stream.emit('error', error) + + await expect(readPromise).rejects.toBe(error) + }) + + it('closes the web stream on end and ignores a later close event', async () => { + const stream = new FakeReadStream() + const body = createStreamBody(asReadStream(stream), false) + const reader = body.getReader() + const readPromise = reader.read() + + stream.emit('end') + + await expect(readPromise).resolves.toEqual({ done: true, value: undefined }) + expect(() => stream.emit('close')).not.toThrow() + }) +})