diff --git a/Package.swift b/Package.swift index 859d9fa..50eec46 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:6.2 +// swift-tools-version:6.4 //===----------------------------------------------------------------------===// // // This source file is part of the Swift HTTP Server open source project @@ -16,7 +16,7 @@ import PackageDescription let extraSettings: [SwiftSetting] = [ - .enableExperimentalFeature("SuppressedAssociatedTypes"), + .enableExperimentalFeature("SuppressedAssociatedTypesWithDefaults"), .enableExperimentalFeature("LifetimeDependence"), .enableExperimentalFeature("Lifetimes"), .enableUpcomingFeature("LifetimeDependence"), @@ -29,6 +29,13 @@ let extraSettings: [SwiftSetting] = [ let package = Package( name: "swift-http-server", + platforms: [ // TODO: Needed until https://github.com/swiftlang/swift/issues/89028 is fixed + .macOS(.v15), + .iOS(.v18), + .watchOS(.v11), + .tvOS(.v18), + .visionOS(.v2), + ], products: [ .library( name: "NIOHTTPServer", @@ -42,7 +49,12 @@ let package = Package( dependencies: [ .package( url: "https://github.com/apple/swift-http-api-proposal.git", - branch: "2cb0ef6722e2086a04b5a14c0c40971b8038099a" + revision: "140b8c2aa773514e2464c15db8d94f4eca46d4a1" + ), + .package( + url: "https://github.com/apple/swift-async-algorithms.git", + from: "1.1.4", + traits: ["UnstableAsyncStreaming"] ), .package(url: "https://github.com/apple/swift-http-types.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), @@ -71,7 +83,7 @@ let package = Package( .target( name: "NIOHTTPServer", dependencies: [ - .product(name: "AsyncStreaming", package: "swift-http-api-proposal"), + .product(name: "AsyncStreaming", package: "swift-async-algorithms"), .product(name: "X509", package: "swift-certificates"), .product(name: "HTTPTypes", package: "swift-http-types"), .product(name: "NIOCore", package: "swift-nio"), diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index b70c4dd..d22323c 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -13,6 +13,8 @@ //===----------------------------------------------------------------------===// public import AsyncStreaming +public import BasicContainers +public import HTTPAPIs public import HTTPTypes import NIOCore import NIOHTTPTypes @@ -38,93 +40,19 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias ReadFailure = any Error + /// The buffer type used to hand elements to the caller. + public typealias Buffer = UniqueArray + /// The HTTP trailer fields captured at the end of the request. fileprivate var state: ReaderState - struct RequestBodyStateMachine { - enum State { - // The request body is currently being read: expecting more request body parts or a request end part. - case readingBody(ReadingBodyState) - - // The request end part was received. We have finished. - case finished - - enum ReadingBodyState { - // All received bytes have been consumed; no excess bytes need to be stored. - case noExcess - - // `read` was called with a `maximumCount` value that was lower than the bytes available. The excess - // bytes are stored here so they can be dispensed in future calls to `read`. - case excess(ByteBuffer) - } - } - - private var state: State - - /// The iterator that provides HTTP request parts from the underlying channel. - private var iterator: NIOAsyncChannelInboundStream.AsyncIterator - - init(iterator: NIOAsyncChannelInboundStream.AsyncIterator) { - self.state = .readingBody(.noExcess) - self.iterator = iterator - } - - enum ReadResult { - case readBody(ByteBuffer) - case readEnd(HTTPFields?) - case streamFinished - } - - mutating func read(limit: Int?) async throws -> ReadResult { - switch self.state { - case .readingBody(let readingBodyState): - var bodyElement: ByteBuffer - - switch readingBodyState { - case .excess(let excessElement): - // There was an excess of bytes from the previous call to `read`. We read directly from this - // excess and don't advance the iterator. - bodyElement = excessElement - - case .noExcess: - // There is no excess from previous reads. We obtain the next element from the stream. - let requestPart = try await self.iterator.next(isolation: #isolation) - - switch requestPart { - case .head: - fatalError("Unexpectedly received a request head.") + /// The iterator that provides HTTP request parts from the underlying channel. + private var iterator: NIOAsyncChannelInboundStream.AsyncIterator - case .none: - throw RequestBodyReadError.streamEndedBeforeReceivingRequestEnd - - case .body(let element): - bodyElement = element - - case .end(let trailers): - self.state = .finished - return .readEnd(trailers) - } - } - - if let limit, limit < bodyElement.readableBytes, - let truncated = bodyElement.readSlice(length: limit) - { - // There are more bytes available than `limit`. We must store the excess in a buffer for it to - // be consumed in the next call to `read`. - self.state = .readingBody(.excess(bodyElement)) - return .readBody(truncated) - } - - self.state = .readingBody(.noExcess) - return .readBody(bodyElement) - - case .finished: - return .streamFinished - } - } - } - - var requestBodyStateMachine: RequestBodyStateMachine + /// A reusable buffer handed to the body closure on each call to ``read(body:)``. + /// Reusing it across calls preserves the allocation; the buffer is cleared + /// (while keeping its capacity) at the start of every read. + private var buffer: UniqueArray /// Initializes a new request body reader with the given NIO async channel iterator. /// @@ -133,42 +61,40 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, readerState: ReaderState ) { - self.requestBodyStateMachine = .init(iterator: iterator) + self.iterator = iterator self.state = readerState + self.buffer = UniqueArray() } /// Reads a chunk of request body data. - /// - /// - Parameter body: A function that consumes the read element (or nil for end of stream) - /// and returns a value of type `Return`. - /// - Returns: The value returned by the body function after processing the read element. - /// - Throws: An error if the reading operation fails. - public mutating func read( - maximumCount: Int?, - body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + public mutating func read( + body: nonisolated(nonsending) (inout Buffer) async throws(Failure) -> Return ) async throws(EitherError) -> Return { - let readResult: RequestBodyStateMachine.ReadResult + let requestPart: HTTPRequestPart? do { - readResult = try await self.requestBodyStateMachine.read(limit: maximumCount) + requestPart = try await self.iterator.next(isolation: #isolation) } catch { throw .first(error) } - do { - switch readResult { - case .readBody(let readElement): - return try await body(Array(buffer: readElement).span) - - case .readEnd(let trailers): - self.state.wrapped.withLock { state in - state.trailers = trailers - state.finishedReading = true - } - return try await body(.init()) - - case .streamFinished: - return try await body(.init()) + self.buffer.removeAll(keepingCapacity: true) + switch requestPart { + case .head: + fatalError() + case .body(let element): + self.buffer.reserveCapacity(element.readableBytes) + self.buffer.append(copying: element.readableBytesUInt8Span) + case .end(let trailers): + self.state.wrapped.withLock { state in + state.trailers = trailers + state.finishedReading = true } + case .none: + throw .first(RequestBodyReadError.streamEndedBeforeReceivingRequestEnd) + } + + do { + return try await body(&self.buffer) } catch { throw .second(error) } @@ -208,7 +134,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, readerState: ReaderState ) { - self.iterator = Disconnected(value: iterator) + self.iterator = .init(value: iterator) self.state = readerState } diff --git a/Sources/NIOHTTPServer/HTTPResponseConcludingAsyncWriter.swift b/Sources/NIOHTTPServer/HTTPResponseConcludingAsyncWriter.swift index 16f0af5..7e36128 100644 --- a/Sources/NIOHTTPServer/HTTPResponseConcludingAsyncWriter.swift +++ b/Sources/NIOHTTPServer/HTTPResponseConcludingAsyncWriter.swift @@ -13,7 +13,8 @@ //===----------------------------------------------------------------------===// public import AsyncStreaming -import BasicContainers +public import BasicContainers +public import HTTPAPIs public import HTTPTypes import NIOCore import NIOHTTPTypes @@ -34,47 +35,52 @@ public struct HTTPResponseConcludingAsyncWriter: ConcludingAsyncWriter, ~Copyabl /// /// This writer handles the body parts of an HTTP response, allowing them to be written /// incrementally as spans of bytes. - public struct ResponseBodyAsyncWriter: AsyncWriter { + public struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable { /// The type of elements this writer accepts (byte arrays representing body chunks). public typealias WriteElement = UInt8 /// The type of errors that can occur during writing operations. - public typealias WriteFailure = Error + public typealias WriteFailure = any Error + + /// The buffer type used to receive elements from the caller. + public typealias Buffer = UniqueArray /// The underlying NIO writer for HTTP response parts. private var writer: NIOAsyncChannelOutboundWriter + /// A reusable buffer handed to the body closure on each call to ``write(_:)``. + /// Reusing it across calls preserves the allocation; the buffer is cleared + /// (while keeping its capacity) at the start of every write. + private var buffer: UniqueArray + /// Initializes a new response body writer with the given NIO async channel writer. /// /// - Parameter writer: The NIO async channel outbound writer to use for writing response parts. init(writer: NIOAsyncChannelOutboundWriter) { self.writer = writer + self.buffer = UniqueArray() } /// Writes a chunk of response body data to the underlying writer. - /// - /// - Parameter element: A span of bytes representing the body chunk to write. - /// - Throws: An error if the writing operation fails. - public mutating func write( - _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result - ) async throws(EitherError) -> Result { - var buffer = RigidArray.init(capacity: 1024) - - let result: Result + public mutating func write( + _ body: nonisolated(nonsending) (inout Buffer) async throws(Failure) -> Return + ) async throws(EitherError) -> Return { + self.buffer.removeAll(keepingCapacity: true) + let result: Return do { - result = try await buffer.append(count: 1024) { (span) async throws(Failure) -> Result in - try await body(&span) - } + result = try await body(&self.buffer) } catch { throw .second(error) } - var byteBuffer = ByteBuffer() - byteBuffer.reserveCapacity(buffer.count) - for index in buffer.indices { - byteBuffer.writeInteger(buffer[index]) + if self.buffer.count == 0 { + return result } + var byteBuffer = ByteBuffer() + byteBuffer.reserveCapacity(self.buffer.count) + byteBuffer.writeBytes(self.buffer.span.bytes) + do { try await self.writer.write(.body(byteBuffer)) } catch { diff --git a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift index 7095595..36641b6 100644 --- a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import AsyncStreaming +import BasicContainers import HTTPTypes import NIOCore import NIOHTTP1 @@ -42,7 +43,7 @@ struct HTTPRequestConcludingAsyncReaderTests { _ = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader - try await bodyReader.read(maximumCount: nil) { _ in } + try await bodyReader.read { _ in } } } } @@ -65,9 +66,9 @@ struct HTTPRequestConcludingAsyncReaderTests { _ = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader - try await bodyReader.read(maximumCount: nil) { _ in } + try await bodyReader.read { _ in } // The stream has finished without an end part. Calling `read` now should result in a fatal error. - try await bodyReader.read(maximumCount: nil) { _ in } + try await bodyReader.read { _ in } } } } @@ -94,20 +95,19 @@ struct HTTPRequestConcludingAsyncReaderTests { let (requestBody, finalElement) = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader - var buffer = ByteBuffer() + var requestBody = ByteBuffer() // Read the body chunk - try await bodyReader.read(maximumCount: nil) { element in - buffer.writeBytes(element.bytes) - return + try await bodyReader.read { buffer in + _ = requestBody.writeBytes(buffer.span.bytes) } // Now read the trailer. We should get back an empty element here, but the trailer should be available in // the tuple returned by `consumeAndConclude` - try await bodyReader.read(maximumCount: nil) { element in + try await bodyReader.read { element in #expect(element.count == 0) } - return buffer + return requestBody } #expect(requestBody == body) @@ -146,11 +146,10 @@ struct HTTPRequestConcludingAsyncReaderTests { let (_, finalElement) = try await requestReader.consumeAndConclude { bodyReader in // Read all body chunks var chunksProcessed = 0 - // swift-format-ignore: ReplaceForEachWithForLoop - try await bodyReader.forEach { element in - var buffer = ByteBuffer() - buffer.writeBytes(element.bytes) - #expect(bodyChunks[chunksProcessed] == buffer) + try await bodyReader.forEachBuffer { buffer in + var chunk = ByteBuffer() + chunk.writeBytes(buffer.span.bytes) + #expect(bodyChunks[chunksProcessed] == chunk) chunksProcessed += 1 } @@ -184,7 +183,7 @@ struct HTTPRequestConcludingAsyncReaderTests { // Check that the read error is propagated await #expect(throws: TestError.errorWhileReading) { do { - try await bodyReader.read(maximumCount: nil) { (element) throws(TestError) in + try await bodyReader.read { (element) throws(TestError) in throw TestError.errorWhileReading } } catch let eitherError as EitherError { @@ -208,114 +207,23 @@ struct HTTPRequestConcludingAsyncReaderTests { readerState: .init() ) - _ = try await requestReader.consumeAndConclude { requestBodyReader in + _ = await requestReader.consumeAndConclude { requestBodyReader in var requestBodyReader = requestBodyReader // There are more bytes available than our limit. - let collected = try await requestBodyReader.collect(upTo: 9) { element in - var buffer = ByteBuffer() - buffer.writeBytes(element.bytes) - return buffer - } - - // We should only collect up to the limit (the first 9 bytes). - #expect(collected == .init(repeating: 5, count: 9)) - } - } - - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - @Test("Multiple body chunks; multiple reads with limits") - func testReadWithLimits() async throws { - let (stream, source) = NIOAsyncChannelInboundStream.makeTestingStream() - - // First write 10 bytes; - source.yield(.body(.init(repeating: 1, count: 10))) - // Then write another 5 bytes. - source.yield(.body(.init(repeating: 2, count: 5))) - source.yield(.end(nil)) - source.finish() - - let streamIterator = stream.makeAsyncIterator() - - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) - _ = try await requestReader.consumeAndConclude { requestBodyReader in - var requestBodyReader = requestBodyReader - - // Collect 8 bytes (partial of first write). - let collectedPartOne = try await requestBodyReader.collect(upTo: 8) { element in - var buffer = ByteBuffer() - buffer.writeBytes(element.bytes) - return buffer - } - - // Then collect 4 more bytes (overlap of first and second write). - let collectedPartTwo = try await requestBodyReader.collect(upTo: 4) { element in - var buffer = ByteBuffer() - buffer.writeBytes(element.bytes) - return buffer - } - - // Then collect 3 more bytes (partial of second write). - let collectedPartThree = try await requestBodyReader.collect(upTo: 3) { element in - var buffer = ByteBuffer() - buffer.writeBytes(element.bytes) - return buffer - } - - #expect(collectedPartOne == .init(repeating: 1, count: 8)) - #expect(collectedPartTwo == .init([1, 1, 2, 2])) - #expect(collectedPartThree == .init(repeating: 2, count: 3)) - } - } - - @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) - @Test("Multiple random-length chunks; multiple reads with random limits") - func testMultipleReadsWithRandomLimits() async throws { - let (stream, source) = NIOAsyncChannelInboundStream.makeTestingStream() - - // Generate random ByteBuffers of varying length and write them to the stream. - var randomBuffer = ByteBuffer() - for _ in 0..<100 { - let randomNumber = UInt8.random(in: 1...50) - let randomCount = Int.random(in: 1...50) - - let randomData = ByteBuffer(repeating: randomNumber, count: randomCount) - // Store the data so we can track what we have wrote - randomBuffer.writeImmutableBuffer(randomData) - - source.yield(.body(randomData)) - } - source.yield(.end(nil)) - source.finish() - - let streamIterator = stream.makeAsyncIterator() - - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) - _ = try await requestReader.consumeAndConclude { requestBodyReader in - var requestBodyReader = requestBodyReader - - var collectedBuffer = ByteBuffer() - while true { - let randomMaxCount = Int.random(in: 1...100) - - let collected = try await requestBodyReader.collect(upTo: randomMaxCount) { element in - var localBuffer = ByteBuffer() - localBuffer.writeBytes(element.bytes) - return localBuffer - } - - if collected.readableBytes == 0 { - break + await #expect(throws: AsyncReaderLeftOverElementsError.self) { + do { + try await requestBodyReader.collect(upTo: 9) { _ in } + } catch let eitherEitherError + as EitherError, Never> + { + do { + try eitherEitherError.unwrap() + } catch let eitherError as EitherError { + try eitherError.unwrap() + } } - - // The collected buffer should never exceed the specified max count. - try #require(collected.readableBytes <= randomMaxCount) - - collectedBuffer.writeImmutableBuffer(collected) } - - // Check if the collected buffer exactly matches what was written to the stream. - try #require(randomBuffer == collectedBuffer) } } } diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift index ce64962..1b2175c 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift @@ -57,11 +57,11 @@ struct NIOHTTPServiceLifecycleTests { try await server.serve { request, requestContext, requestReader, responseSender in _ = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } firstChunkReadPromise.succeed() - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } } let responseBodyWriter = try await responseSender.send(.init(status: .ok)) @@ -146,13 +146,13 @@ struct NIOHTTPServiceLifecycleTests { var bodyReader = bodyReader let error = try await #require(throws: EitherError.self) { - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } firstChunkReadPromise.succeed() // The following call will block: the client will never send a request end part. This is // intentional because we want to keep the connection alive. - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } } #expect(throws: CancellationError.self) { try error.unwrap() } } @@ -226,13 +226,13 @@ struct NIOHTTPServiceLifecycleTests { var bodyReader = bodyReader let error = try await #require(throws: EitherError.self) { - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } firstChunkReadPromise.succeed() // The following call will block: the client will never send a request end part. This is // intentional because we want to keep the connection alive until the grace timer (500ms) fires. - try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } + try await bodyReader.read { _ in } } #expect(throws: RequestBodyReadError.streamEndedBeforeReceivingRequestEnd) { try error.unwrap() } } diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift index de5f0cd..00bf10b 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift @@ -84,7 +84,7 @@ struct NIOHTTPServerTests { let (_, finalElement) = try await reader.consumeAndConclude { bodyReader in var bodyReader = bodyReader return try await bodyReader.collect(upTo: Self.bodyData.readableBytes + 1) { body in - buffer.writeBytes(body.bytes) + buffer.writeBytes(body.span.bytes) } } #expect(buffer == Self.bodyData) @@ -157,7 +157,7 @@ struct NIOHTTPServerTests { var bodyReader = bodyReader var buffer = ByteBuffer() _ = try await bodyReader.collect(upTo: Self.bodyData.readableBytes + 1) { body in - buffer.writeBytes(body.bytes) + buffer.writeBytes(body.span.bytes) } return buffer } @@ -268,7 +268,7 @@ struct NIOHTTPServerTests { // This should fail: the client has closed the stream without sending an end part. let error = try await #require(throws: EitherError.self) { - try await bodyReader.read(maximumCount: nil) { _ in } + try await bodyReader.read { _ in } } switch httpVersion { @@ -326,14 +326,13 @@ struct NIOHTTPServerTests { let (_, finalElement) = try await reader.consumeAndConclude { bodyAsyncReader in var count = 1 - // swift-format-ignore: ReplaceForEachWithForLoop - try await bodyAsyncReader.forEach { span in - var buffer = ByteBuffer() - buffer.writeBytes(span.bytes) - #expect(buffer == ByteBuffer(bytes: [UInt8(count)])) + try await bodyAsyncReader.forEachBuffer { buffer in + var chunk = ByteBuffer() + chunk.writeBytes(buffer.span.bytes) + #expect(chunk == ByteBuffer(bytes: [UInt8(count)])) count += 1 - try await responseBodyWriter.write(span) + try await responseBodyWriter.write(buffer.span) } } #expect(finalElement == Self.trailer) @@ -959,9 +958,9 @@ extension NIOHTTPServerTests { ) async throws { let (requestBody, trailers) = try await reader.consumeAndConclude { bodyReader in var bodyReader = bodyReader - return try await bodyReader.collect(upTo: limit) { span in + return try await bodyReader.collect(upTo: limit) { inputSpan in var buffer = ByteBuffer() - buffer.writeBytes(span.bytes) + buffer.writeBytes(inputSpan.span.bytes) return buffer } }