Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 2 additions & 35 deletions src/serve-static.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type { Context, Env, MiddlewareHandler } from 'hono'
import { getMimeType } from 'hono/utils/mime'
import type { ReadStream, Stats } from 'node:fs'
import type { Stats } from 'node:fs'
import { createReadStream, statSync, existsSync } from 'node:fs'
import { join } from 'node:path'
import { versions } from 'node:process'
import { Readable } from 'node:stream'
import { createStreamBody } from './utils/stream'

export type ServeStaticOptions<E extends Env = Env> = {
/**
Expand All @@ -28,38 +27,6 @@ const ENCODINGS = {
} as const
const ENCODINGS_ORDERED_KEYS = Object.keys(ENCODINGS) as (keyof typeof ENCODINGS)[]

// In Node.js versions that do not have the following PR applied, using Readable.toWeb may cause unexpected exceptions.
// https://github.com/nodejs/node/pull/54206
const pr54206Applied = () => {
const [major, minor] = versions.node.split('.').map((component) => parseInt(component))
return major >= 23 || (major === 22 && minor >= 7) || (major === 20 && minor >= 18)
}
const useReadableToWeb = pr54206Applied()

const createStreamBody = (stream: ReadStream) => {
if (useReadableToWeb) {
return Readable.toWeb(stream) as ReadableStream
}
const body = new ReadableStream({
start(controller) {
stream.on('data', (chunk) => {
controller.enqueue(chunk)
})
stream.on('error', (err) => {
controller.error(err)
})
stream.on('end', () => {
controller.close()
})
},

cancel() {
stream.destroy()
},
})
return body
}

const getStats = (path: string) => {
let stats: Stats | undefined
try {
Expand Down
88 changes: 88 additions & 0 deletions src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type { ReadStream } from 'node:fs'
import { versions } from 'node:process'
import { Readable } from 'node:stream'

// In Node.js versions that do not have the following PR applied, using Readable.toWeb may cause unexpected exceptions.
// https://github.com/nodejs/node/pull/54206
const pr54206Applied = () => {
const [major, minor] = versions.node.split('.').map((component) => parseInt(component))
return major >= 23 || (major === 22 && minor >= 7) || (major === 20 && minor >= 18)
}
const useReadableToWeb = pr54206Applied()

export const createStreamBody = (
stream: ReadStream,
useNativeReadableToWeb = useReadableToWeb
): ReadableStream<Uint8Array> => {
if (useNativeReadableToWeb) {
return Readable.toWeb(stream) as ReadableStream<Uint8Array>
}

let controller: ReadableStreamDefaultController<Uint8Array> | undefined
let settled = false

const cleanup = () => {
stream.off('data', onData)
stream.off('error', onError)
stream.off('end', onTerminate)
stream.off('close', onTerminate)
}

const settle = (callback?: () => void) => {
if (settled) {
return
}
settled = true
cleanup()
callback?.()
}

const onData = (chunk: Buffer | string) => {
if (settled || !controller) {
return
}
// createReadStream is called without `encoding`, so chunks are always Buffer.
controller.enqueue(chunk as Buffer)
if ((controller.desiredSize ?? 0) <= 0) {
stream.pause()
}
}

const onError = (error: Error) => {
settle(() => {
controller?.error(error)
})
}

const onTerminate = () => {
settle(() => {
controller?.close()
})
}

return new ReadableStream<Uint8Array>({
start(streamController) {
controller = streamController
stream.on('data', onData)
stream.on('error', onError)
stream.on('end', onTerminate)
stream.on('close', onTerminate)
stream.pause()
},

pull() {
if (!settled) {
stream.resume()
}
},

cancel() {
settle()
// Suppress late `error` emitted between destroy() and the terminal `close`.
const ignoreError = () => {}
stream.on('error', ignoreError)
stream.once('close', () => stream.off('error', ignoreError))
stream.destroy()
},
})
}
75 changes: 75 additions & 0 deletions test/utils/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { EventEmitter } from 'node:events'
import type { ReadStream } from 'node:fs'
import { createStreamBody } from '../../src/utils/stream'

class FakeReadStream extends EventEmitter {
pause = vi.fn(() => this)
resume = vi.fn(() => this)
destroy = vi.fn(() => this)
}

const asReadStream = (stream: FakeReadStream) => stream as unknown as ReadStream

describe('createStreamBody fallback', () => {
it('pauses the node stream when the web stream queue is full and resumes on pull', async () => {
const stream = new FakeReadStream()
const body = createStreamBody(asReadStream(stream), false)
const pauseCallsAfterStart = stream.pause.mock.calls.length

stream.emit('data', Buffer.from('a'))

expect(stream.pause.mock.calls.length).toBeGreaterThan(pauseCallsAfterStart)

const reader = body.getReader()
const resumeCallsBeforeRead = stream.resume.mock.calls.length
const result = await reader.read()
await Promise.resolve()

expect(result.done).toBe(false)
expect(Buffer.from(result.value ?? []).toString()).toBe('a')
expect(stream.resume.mock.calls.length).toBeGreaterThan(resumeCallsBeforeRead)

await reader.cancel()
})

it('destroys the node stream on cancel and ignores later terminal events', async () => {
const stream = new FakeReadStream()
const body = createStreamBody(asReadStream(stream), false)
const reader = body.getReader()

await reader.cancel()

expect(stream.destroy).toHaveBeenCalledTimes(1)
expect(stream.listenerCount('data')).toBe(0)
expect(stream.listenerCount('end')).toBe(0)
expect(() => stream.emit('end')).not.toThrow()
expect(() => stream.emit('error', new Error('late error'))).not.toThrow()

stream.emit('close')
expect(stream.listenerCount('error')).toBe(0)
})

it('propagates node stream errors to the web stream reader', async () => {
const stream = new FakeReadStream()
const body = createStreamBody(asReadStream(stream), false)
const reader = body.getReader()
const error = new Error('read failed')
const readPromise = reader.read()

stream.emit('error', error)

await expect(readPromise).rejects.toBe(error)
})

it('closes the web stream on end and ignores a later close event', async () => {
const stream = new FakeReadStream()
const body = createStreamBody(asReadStream(stream), false)
const reader = body.getReader()
const readPromise = reader.read()

stream.emit('end')

await expect(readPromise).resolves.toEqual({ done: true, value: undefined })
expect(() => stream.emit('close')).not.toThrow()
})
})
Loading