Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:6.2
// swift-tools-version:6.4
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift HTTP Server open source project
Expand All @@ -16,7 +16,7 @@
import PackageDescription

let extraSettings: [SwiftSetting] = [
.enableExperimentalFeature("SuppressedAssociatedTypes"),
.enableExperimentalFeature("SuppressedAssociatedTypesWithDefaults"),
.enableExperimentalFeature("LifetimeDependence"),
.enableExperimentalFeature("Lifetimes"),
.enableUpcomingFeature("LifetimeDependence"),
Expand All @@ -29,6 +29,13 @@ let extraSettings: [SwiftSetting] = [

let package = Package(
name: "swift-http-server",
platforms: [ // TODO: Needed until https://github.com/swiftlang/swift/issues/89028 is fixed
.macOS(.v15),
.iOS(.v18),
.watchOS(.v11),
.tvOS(.v18),
.visionOS(.v2),
],
products: [
.library(
name: "NIOHTTPServer",
Expand All @@ -42,7 +49,12 @@ let package = Package(
dependencies: [
.package(
url: "https://github.com/apple/swift-http-api-proposal.git",
branch: "2cb0ef6722e2086a04b5a14c0c40971b8038099a"
revision: "140b8c2aa773514e2464c15db8d94f4eca46d4a1"
),
.package(
url: "https://github.com/apple/swift-async-algorithms.git",
from: "1.1.4",
traits: ["UnstableAsyncStreaming"]
),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"),
Expand Down Expand Up @@ -71,7 +83,7 @@ let package = Package(
.target(
name: "NIOHTTPServer",
dependencies: [
.product(name: "AsyncStreaming", package: "swift-http-api-proposal"),
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
.product(name: "X509", package: "swift-certificates"),
.product(name: "HTTPTypes", package: "swift-http-types"),
.product(name: "NIOCore", package: "swift-nio"),
Expand Down
144 changes: 35 additions & 109 deletions Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
//===----------------------------------------------------------------------===//

public import AsyncStreaming
public import BasicContainers
public import HTTPAPIs
public import HTTPTypes
import NIOCore
import NIOHTTPTypes
Expand All @@ -38,93 +40,19 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
/// The type of errors that can occur during reading operations.
public typealias ReadFailure = any Error

/// The buffer type used to hand elements to the caller.
public typealias Buffer = UniqueArray<UInt8>
Comment thread
FranzBusch marked this conversation as resolved.

/// The HTTP trailer fields captured at the end of the request.
fileprivate var state: ReaderState

struct RequestBodyStateMachine {
enum State {
// The request body is currently being read: expecting more request body parts or a request end part.
case readingBody(ReadingBodyState)

// The request end part was received. We have finished.
case finished

enum ReadingBodyState {
// All received bytes have been consumed; no excess bytes need to be stored.
case noExcess

// `read` was called with a `maximumCount` value that was lower than the bytes available. The excess
// bytes are stored here so they can be dispensed in future calls to `read`.
case excess(ByteBuffer)
}
}

private var state: State

/// The iterator that provides HTTP request parts from the underlying channel.
private var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator

init(iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator) {
self.state = .readingBody(.noExcess)
self.iterator = iterator
}

enum ReadResult {
case readBody(ByteBuffer)
case readEnd(HTTPFields?)
case streamFinished
}

mutating func read(limit: Int?) async throws -> ReadResult {
switch self.state {
case .readingBody(let readingBodyState):
var bodyElement: ByteBuffer

switch readingBodyState {
case .excess(let excessElement):
// There was an excess of bytes from the previous call to `read`. We read directly from this
// excess and don't advance the iterator.
bodyElement = excessElement

case .noExcess:
// There is no excess from previous reads. We obtain the next element from the stream.
let requestPart = try await self.iterator.next(isolation: #isolation)

switch requestPart {
case .head:
fatalError("Unexpectedly received a request head.")
/// The iterator that provides HTTP request parts from the underlying channel.
private var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator

case .none:
throw RequestBodyReadError.streamEndedBeforeReceivingRequestEnd

case .body(let element):
bodyElement = element

case .end(let trailers):
self.state = .finished
return .readEnd(trailers)
}
}

if let limit, limit < bodyElement.readableBytes,
let truncated = bodyElement.readSlice(length: limit)
{
// There are more bytes available than `limit`. We must store the excess in a buffer for it to
// be consumed in the next call to `read`.
self.state = .readingBody(.excess(bodyElement))
return .readBody(truncated)
}

self.state = .readingBody(.noExcess)
return .readBody(bodyElement)

case .finished:
return .streamFinished
}
}
}

var requestBodyStateMachine: RequestBodyStateMachine
/// A reusable buffer handed to the body closure on each call to ``read(body:)``.
/// Reusing it across calls preserves the allocation; the buffer is cleared
/// (while keeping its capacity) at the start of every read.
private var buffer: UniqueArray<UInt8>

/// Initializes a new request body reader with the given NIO async channel iterator.
///
Expand All @@ -133,42 +61,40 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.requestBodyStateMachine = .init(iterator: iterator)
self.iterator = iterator
self.state = readerState
self.buffer = UniqueArray<UInt8>()
}

/// Reads a chunk of request body data.
///
/// - Parameter body: A function that consumes the read element (or nil for end of stream)
/// and returns a value of type `Return`.
/// - Returns: The value returned by the body function after processing the read element.
/// - Throws: An error if the reading operation fails.
public mutating func read<Return, Failure: Error>(
maximumCount: Int?,
body: nonisolated(nonsending) (consuming Span<ReadElement>) async throws(Failure) -> Return
public mutating func read<Return: ~Copyable, Failure: Error>(
body: nonisolated(nonsending) (inout Buffer) async throws(Failure) -> Return
) async throws(EitherError<ReadFailure, Failure>) -> Return {
let readResult: RequestBodyStateMachine.ReadResult
let requestPart: HTTPRequestPart?
do {
readResult = try await self.requestBodyStateMachine.read(limit: maximumCount)
requestPart = try await self.iterator.next(isolation: #isolation)
} catch {
throw .first(error)
}

do {
switch readResult {
case .readBody(let readElement):
return try await body(Array(buffer: readElement).span)

case .readEnd(let trailers):
self.state.wrapped.withLock { state in
state.trailers = trailers
state.finishedReading = true
}
return try await body(.init())

case .streamFinished:
return try await body(.init())
self.buffer.removeAll(keepingCapacity: true)
switch requestPart {
case .head:
fatalError()
case .body(let element):
self.buffer.reserveCapacity(element.readableBytes)
self.buffer.append(copying: element.readableBytesUInt8Span)
case .end(let trailers):
self.state.wrapped.withLock { state in
state.trailers = trailers
state.finishedReading = true
}
case .none:
throw .first(RequestBodyReadError.streamEndedBeforeReceivingRequestEnd)
}

do {
return try await body(&self.buffer)
} catch {
throw .second(error)
}
Expand Down Expand Up @@ -208,7 +134,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.iterator = Disconnected(value: iterator)
self.iterator = .init(value: iterator)
self.state = readerState
}

Expand Down
44 changes: 25 additions & 19 deletions Sources/NIOHTTPServer/HTTPResponseConcludingAsyncWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
//===----------------------------------------------------------------------===//

public import AsyncStreaming
import BasicContainers
public import BasicContainers
public import HTTPAPIs
public import HTTPTypes
import NIOCore
import NIOHTTPTypes
Expand All @@ -34,47 +35,52 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl
///
/// This writer handles the body parts of an HTTP response, allowing them to be written
/// incrementally as spans of bytes.
public struct ResponseBodyAsyncWriter: AsyncWriter {
public struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable {
/// The type of elements this writer accepts (byte arrays representing body chunks).
public typealias WriteElement = UInt8

/// The type of errors that can occur during writing operations.
public typealias WriteFailure = Error
public typealias WriteFailure = any Error

/// The buffer type used to receive elements from the caller.
public typealias Buffer = UniqueArray<UInt8>
Comment thread
FranzBusch marked this conversation as resolved.

/// The underlying NIO writer for HTTP response parts.
private var writer: NIOAsyncChannelOutboundWriter<HTTPResponsePart>

/// A reusable buffer handed to the body closure on each call to ``write(_:)``.
/// Reusing it across calls preserves the allocation; the buffer is cleared
/// (while keeping its capacity) at the start of every write.
private var buffer: UniqueArray<UInt8>

/// Initializes a new response body writer with the given NIO async channel writer.
///
/// - Parameter writer: The NIO async channel outbound writer to use for writing response parts.
init(writer: NIOAsyncChannelOutboundWriter<HTTPResponsePart>) {
self.writer = writer
self.buffer = UniqueArray<UInt8>()
}

/// Writes a chunk of response body data to the underlying writer.
///
/// - Parameter element: A span of bytes representing the body chunk to write.
/// - Throws: An error if the writing operation fails.
public mutating func write<Result, Failure: Error>(
_ body: nonisolated(nonsending) (inout OutputSpan<WriteElement>) async throws(Failure) -> Result
) async throws(EitherError<any WriteFailure, Failure>) -> Result {
var buffer = RigidArray<WriteElement>.init(capacity: 1024)

let result: Result
public mutating func write<Return: ~Copyable, Failure: Error>(
_ body: nonisolated(nonsending) (inout Buffer) async throws(Failure) -> Return
) async throws(EitherError<WriteFailure, Failure>) -> Return {
self.buffer.removeAll(keepingCapacity: true)
let result: Return
do {
result = try await buffer.append(count: 1024) { (span) async throws(Failure) -> Result in
try await body(&span)
}
result = try await body(&self.buffer)
} catch {
throw .second(error)
}

var byteBuffer = ByteBuffer()
byteBuffer.reserveCapacity(buffer.count)
for index in buffer.indices {
byteBuffer.writeInteger(buffer[index])
if self.buffer.count == 0 {
return result
}

var byteBuffer = ByteBuffer()
byteBuffer.reserveCapacity(self.buffer.count)
byteBuffer.writeBytes(self.buffer.span.bytes)

do {
try await self.writer.write(.body(byteBuffer))
} catch {
Expand Down
Loading
Loading