Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,47 +65,52 @@ extension NIOHTTPServer {

func setupHTTP1_1ServerChannels(
bindTargets: [NIOHTTPServerConfiguration.BindTarget]
) async throws -> [NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>] {
let bootstrap = ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
) async throws -> [(
NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>, ServerQuiescingHelper
)] {
let bootstrap = ServerBootstrap(group: self.eventLoopGroup)
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
self.serverQuiescingHelper.makeServerChannelHandler(channel: channel)
)
}
}

var serverChannels = [NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>]()
var serverChannels = [
(NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>, ServerQuiescingHelper)
]()

do {
for bindTarget in bindTargets {
switch bindTarget.backing {
case .hostAndPort(let host, let port):
let serverChannel =
try await bootstrap.bind(host: host, port: port) { channel in
self.setupHTTP1_1Connection(
channel: channel,
asyncChannelConfiguration: .init(
backPressureStrategy: .init(self.configuration.backpressureStrategy),
isOutboundHalfClosureEnabled: true
),
isSecure: false
let serverQuiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)

let serverChannel = try await bootstrap.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
serverQuiescingHelper.makeServerChannelHandler(channel: channel)
)
}
serverChannels.append(serverChannel)
}.bind(host: host, port: port) { channel in
self.setupHTTP1_1Connection(
channel: channel,
asyncChannelConfiguration: .init(
backPressureStrategy: .init(self.configuration.backpressureStrategy),
isOutboundHalfClosureEnabled: true
),
isSecure: false
)
}
serverChannels.append((serverChannel, serverQuiescingHelper))
}
}
} catch {
// A later bind failed: close any channels we already bound to avoid leaking sockets.
// We await the closes so the sockets are fully released by the time we throw, giving the
// caller deterministic semantics: when `serve` throws, all cleanup is done.
for serverChannel in serverChannels {
for (serverChannel, _) in serverChannels {
try? await serverChannel.channel.close()
}
throw error
}

try self.addressesBound(serverChannels.map { $0.channel.localAddress })
try self.addressesBound(serverChannels.map { (serverChannel, _) in serverChannel.channel.localAddress })

return serverChannels
}
Expand Down
38 changes: 19 additions & 19 deletions Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,44 +157,44 @@ extension NIOHTTPServer {
bindTargets: [NIOHTTPServerConfiguration.BindTarget],
supportedHTTPVersions: Set<NIOHTTPServerConfiguration.HTTPVersion>,
sslContext: NIOSSLContext
) async throws -> [NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>] {
let bootstrap = ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
) async throws -> [(NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>, ServerQuiescingHelper)] {
let bootstrap = ServerBootstrap(group: self.eventLoopGroup)
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
self.serverQuiescingHelper.makeServerChannelHandler(channel: channel)
)
}
}

var serverChannels = [NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>]()
var serverChannels = [(NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>, ServerQuiescingHelper)]()
do {
for bindTarget in bindTargets {
switch bindTarget.backing {
case .hostAndPort(let host, let port):
let serverChannel =
try await bootstrap.bind(host: host, port: port) { channel in
self.setupSecureUpgradeConnectionChildChannel(
channel: channel,
supportedHTTPVersions: supportedHTTPVersions,
sslContext: sslContext
let serverQuiescingHelper = ServerQuiescingHelper(group: self.eventLoopGroup)

let serverChannel = try await bootstrap.serverChannelInitializer { channel in
channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(
serverQuiescingHelper.makeServerChannelHandler(channel: channel)
)
}
serverChannels.append(serverChannel)
}.bind(host: host, port: port) { channel in
self.setupSecureUpgradeConnectionChildChannel(
channel: channel,
supportedHTTPVersions: supportedHTTPVersions,
sslContext: sslContext
)
}
serverChannels.append((serverChannel, serverQuiescingHelper))
}
}
} catch {
// A later bind failed: close any channels we already bound to avoid leaking sockets.
// We await the closes so the sockets are fully released by the time we throw, giving the
// caller deterministic semantics: when `serve` throws, all cleanup is done.
for serverChannel in serverChannels {
for (serverChannel, _) in serverChannels {
try? await serverChannel.channel.close()
}
throw error
}

try self.addressesBound(serverChannels.map { $0.channel.localAddress })
try self.addressesBound(serverChannels.map { (serverChannel, _) in serverChannel.channel.localAddress })

return serverChannels
}
Expand Down
63 changes: 38 additions & 25 deletions Sources/NIOHTTPServer/NIOHTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ public struct NIOHTTPServer: HTTPServer {
let logger: Logger
let configuration: NIOHTTPServerConfiguration

let serverQuiescingHelper: ServerQuiescingHelper
/// The event loop group on which the server runs.
///
/// This event loop group is used for every channel the server binds. It also provides the event loop that fulfills
/// the listening address promise and the group from which a `ServerQuiescingHelper` is created for each bound
/// channel.
let eventLoopGroup: MultiThreadedEventLoopGroup

var listeningAddressState: NIOLockedValueBox<State>

Expand All @@ -109,10 +114,8 @@ public struct NIOHTTPServer: HTTPServer {
self.configuration = configuration

// TODO: If we allow users to pass in an event loop, use that instead of the singleton MTELG.
let eventLoopGroup: MultiThreadedEventLoopGroup = .singletonMultiThreadedEventLoopGroup
self.listeningAddressState = .init(.idle(eventLoopGroup.any().makePromise()))

self.serverQuiescingHelper = .init(group: eventLoopGroup)
self.eventLoopGroup = .singletonMultiThreadedEventLoopGroup
self.listeningAddressState = .init(.idle(self.eventLoopGroup.any().makePromise()))
}

/// Starts an HTTP server with the specified request handler.
Expand All @@ -123,13 +126,9 @@ public struct NIOHTTPServer: HTTPServer {
///
/// ## All-or-nothing listening
///
/// The server treats its set of listening addresses as a single unit. If any one of the bound addresses
/// stops listening — whether due to its underlying socket closing, an unrecoverable error on the
/// listening channel, or any other reason — the server stops listening on **all** remaining addresses
/// and this method returns. After that point, ``listeningAddresses`` will throw
/// ``ListeningAddressError/serverClosed``.
///
/// This also applies during graceful shutdown and task cancellation: all channels are shut down together.
/// The server treats its set of listening addresses as a single unit. If an unrecoverable error occurs on any of
/// the listening channels, the server stops listening on **all** remaining addresses and this method returns. After
/// that point, ``listeningAddresses`` will throw ``ListeningAddressError/serverClosed``.
///
/// - Parameter handler: A ``HTTPServerRequestHandler`` implementation that processes incoming HTTP
/// requests. The handler receives each request along with a body reader and response sender function.
Expand Down Expand Up @@ -164,7 +163,7 @@ public struct NIOHTTPServer: HTTPServer {
try await withGracefulShutdownHandler {
try await self._serve(serverChannels: serverChannels, handler: handler)
} onGracefulShutdown: {
self.beginGracefulShutdown()
self.beginGracefulShutdown(serverChannels: serverChannels)
}
} onCancel: {
// Forcefully close down the server channels
Expand All @@ -177,7 +176,9 @@ public struct NIOHTTPServer: HTTPServer {
switch self.configuration.transportSecurity.backing {
case .plaintext:
return try await self.setupHTTP1_1ServerChannels(bindTargets: self.configuration.bindTargets)
.map { .plaintextHTTP1_1($0) }
.map { channel, quiescingHelper in
.plaintextHTTP1_1(channel: channel, quiescingHelper: quiescingHelper)
}

case .tls, .mTLS:
return try await self.setupSecureUpgradeServerChannels(
Expand All @@ -187,7 +188,9 @@ public struct NIOHTTPServer: HTTPServer {
transportSecurity: self.configuration.transportSecurity,
alpnIdentifiers: self.configuration.supportedHTTPVersions.alpnIdentifiers
),
).map { .secureUpgrade($0) }
).map { channel, quiescingHelper in
.secureUpgrade(channel: channel, quiescingHelper: quiescingHelper)
}
}
}

Expand All @@ -199,19 +202,22 @@ public struct NIOHTTPServer: HTTPServer {
for serverChannel in serverChannels {
group.addTask {
switch serverChannel {
case .plaintextHTTP1_1(let http1Channel):
case .plaintextHTTP1_1(let http1Channel, _):
try await self.serveInsecureHTTP1_1(serverChannel: http1Channel, handler: handler)

case .secureUpgrade(let secureUpgradeChannel):
case .secureUpgrade(let secureUpgradeChannel, _):
try await self.serveSecureUpgrade(serverChannel: secureUpgradeChannel, handler: handler)
}
}
}

// Wait for the first channel to complete (either normally or by throwing).
// If any channel stops serving, bring down all remaining channels.
try await group.next()
group.cancelAll()
// If an error occurs in any channel, bring down all other channels too and propagate the error.
do {
for try await _ in group {}
} catch {
// Propagate the error. This will cancel the entire group.
throw error
}
}
}

Expand Down Expand Up @@ -303,9 +309,16 @@ public struct NIOHTTPServer: HTTPServer {
}

/// Initiates a graceful shutdown, allowing existing connections to drain before closing.
private func beginGracefulShutdown() {
private func beginGracefulShutdown(serverChannels: [ServerChannel]) {
self.finishListeningAddressPromise()
self.serverQuiescingHelper.initiateShutdown(promise: nil)

for serverChannel in serverChannels {
switch serverChannel {
case .plaintextHTTP1_1(_, let quiescingHelper),
.secureUpgrade(_, let quiescingHelper):
quiescingHelper.initiateShutdown(promise: nil)
}
}
}

/// Forcefully closes the server channels without waiting for existing connections to drain.
Expand All @@ -314,10 +327,10 @@ public struct NIOHTTPServer: HTTPServer {

for serverChannel in serverChannels {
switch serverChannel {
case .plaintextHTTP1_1(let http1Channel):
case .plaintextHTTP1_1(let http1Channel, _):
http1Channel.channel.close(promise: nil)

case .secureUpgrade(let secureUpgradeChannel):
case .secureUpgrade(let secureUpgradeChannel, _):
secureUpgradeChannel.channel.close(promise: nil)
}
}
Expand Down
12 changes: 10 additions & 2 deletions Sources/NIOHTTPServer/ServerChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@
//===----------------------------------------------------------------------===//

import NIOCore
import NIOExtras
import NIOHTTPTypes

@available(anyAppleOS 26.0, *)
extension NIOHTTPServer {
/// Abstracts over the two types of server channels ``NIOHTTPServer`` can create: plaintext HTTP/1.1 and Secure
/// Upgrade.
enum ServerChannel {
case plaintextHTTP1_1(NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>)
case secureUpgrade(NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>)
case plaintextHTTP1_1(
channel: NIOAsyncChannel<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>, Never>,
quiescingHelper: ServerQuiescingHelper
)

case secureUpgrade(
channel: NIOAsyncChannel<EventLoopFuture<NegotiatedChannel>, Never>,
quiescingHelper: ServerQuiescingHelper
)
}
}
4 changes: 2 additions & 2 deletions Tests/NIOHTTPServerTests/HTTPKeepAliveHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,13 @@ struct HTTPKeepAliveHandlerTests {
try await writer.writeAndConclude("".utf8.span, finalElement: nil)
},
body: { serverAddress in
let clientChannel = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup)
let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup)
.connectToTestSecureUpgradeHTTPServer(
at: serverAddress,
trustRoots: serverChain.chain,
applicationProtocol: HTTPVersion.http1_1.alpnIdentifier
)
let client = try await NIOHTTPServerTests.unwrapNegotiatedChannel(clientChannel, .http1_1)
.unwrapChannel(expectedHTTPVersion: .http1_1)

try await client.executeThenClose { inbound, outbound in
try await outbound.write(
Expand Down
Loading
Loading