Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# GraphQLTransportWS

[![](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2FGraphQLSwift%2FGraphQLTransportWS%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/GraphQLSwift/GraphQLTransportWS)
[![](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2FGraphQLSwift%2FGraphQLTransportWS%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/GraphQLSwift/GraphQLTransportWS)

This implements the [graphql-transport-ws WebSocket subprotocol](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md).
It is mainly intended for server support, but there is a basic client implementation included.

Expand All @@ -14,7 +17,7 @@ Features:
To use this package, include it in your `Package.swift` dependencies:

```swift
.package(url: "git@gitlab.com:PassiveLogic/platform/GraphQLTransportWS.git", from: "<version>")
.package(url: "https://github.com/GraphQLSwift/GraphQLTransportWS", from: "<version>")
```

Then create a class to implement the `Messenger` protocol. Here's an example using
Expand All @@ -25,33 +28,18 @@ import WebSocketKit
import GraphQLTransportWS

/// Messenger wrapper for WebSockets
class WebSocketMessenger: Messenger {
private weak var websocket: WebSocket?
private var onReceive: (String) async throws -> Void = { _ in }

init(websocket: WebSocket) {
self.websocket = websocket
websocket.onText { _, message in
try await self.onReceive(message)
}
}
struct WebSocketMessenger: Messenger {
let websocket: WebSocket

func send<S>(_ message: S) where S: Collection, S.Element == Character async throws {
guard let websocket = websocket else { return }
try await websocket.send(message)
}

func onReceive(callback: @escaping (String) async throws -> Void) {
self.onReceive = callback
}

func error(_ message: String, code: Int) async throws {
guard let websocket = websocket else { return }
try await websocket.send("\(code): \(message)")
}

func close() async throws {
guard let websocket = websocket else { return }
try await websocket.close()
}
}
Expand Down Expand Up @@ -85,6 +73,12 @@ routes.webSocket(
)
}
)
let incoming = AsyncStream<String> { continuation in
websocket.onText { _, message in
continuation.yield(message)
}
}
try await server.listen(to: incoming)
}
)
```
Expand Down Expand Up @@ -125,12 +119,3 @@ This example would require `connection_init` message from the client to look lik
```

If the `payload` field is not required on your server, you may make Server's generic declaration optional like `Server<Payload?>`

## Memory Management

Memory ownership among the Server, Client, and Messenger may seem a little backwards. This is because the Swift/Vapor WebSocket
implementation persists WebSocket objects long after their callback and they are expected to retain strong memory references to the
objects required for responses. In order to align cleanly and avoid memory cycles, Server and Client are injected strongly into Messenger
callbacks, and only hold weak references to their Messenger. This means that Messenger objects (or their enclosing WebSocket) must
be persisted to have the connected Server or Client objects function. That is, if a Server's Messenger falls out of scope and deinitializes,
the Server will no longer respond to messages.
99 changes: 39 additions & 60 deletions Sources/GraphQLTransportWS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ import Foundation
import GraphQL

/// Client is an open-ended implementation of the client side of the protocol. It parses and adds callbacks for each type of server respose.
public class Client<InitPayload: Equatable & Codable> {
// We keep this weak because we strongly inject this object into the messenger callback
weak var messenger: Messenger?
public actor Client<InitPayload: Equatable & Codable> {
let messenger: Messenger

var onConnectionAck: (ConnectionAckResponse, Client) async throws -> Void = { _, _ in }
var onNext: (NextResponse, Client) async throws -> Void = { _, _ in }
var onError: (ErrorResponse, Client) async throws -> Void = { _, _ in }
var onComplete: (CompleteResponse, Client) async throws -> Void = { _, _ in }
var onMessage: (String, Client) async throws -> Void = { _, _ in }
let onConnectionAck: (ConnectionAckResponse, Client) async throws -> Void
let onNext: (NextResponse, Client) async throws -> Void
let onError: (ErrorResponse, Client) async throws -> Void
let onComplete: (CompleteResponse, Client) async throws -> Void

let encoder = GraphQLJSONEncoder()
let decoder = JSONDecoder()
Expand All @@ -19,96 +17,80 @@ public class Client<InitPayload: Equatable & Codable> {
///
/// - Parameters:
/// - messenger: The messenger to bind the client to.
/// - onConnectionAck: The callback run on receipt of a `connection_ack` message
/// - onNext: The callback run on receipt of a `next` message
/// - onError: The callback run on receipt of an `error` message
/// - onComplete: The callback run on receipt of a `complete` message
public init(
messenger: Messenger
messenger: Messenger,
onConnectionAck: @escaping (ConnectionAckResponse, Client) async throws -> Void = { _, _ in },
onNext: @escaping (NextResponse, Client) async throws -> Void = { _, _ in },
onError: @escaping (ErrorResponse, Client) async throws -> Void = { _, _ in },
onComplete: @escaping (CompleteResponse, Client) async throws -> Void = { _, _ in }
) {
self.messenger = messenger
messenger.onReceive { message in
try await self.onMessage(message, self)
self.onConnectionAck = onConnectionAck
self.onNext = onNext
self.onError = onError
self.onComplete = onComplete
}

/// Listen and react to the provided async sequence of server messages. This function will block until the stream is completed.
/// - Parameter incoming: The server message sequence that the client should react to.
public func listen<A: AsyncSequence & Sendable>(to incoming: A) async throws -> Void where A.Element == String {
for try await message in incoming {
// Detect and ignore error responses.
if message.starts(with: "44") {
// TODO: Determine what to do with returned error messages
return
}

guard let json = message.data(using: .utf8) else {
try await self.error(.invalidEncoding())
try await error(.invalidEncoding())
return
}

let response: Response
do {
response = try self.decoder.decode(Response.self, from: json)
response = try decoder.decode(Response.self, from: json)
} catch {
try await self.error(.noType())
return
}

switch response.type {
case .connectionAck:
guard let connectionAckResponse = try? self.decoder.decode(ConnectionAckResponse.self, from: json) else {
try await self.error(.invalidResponseFormat(messageType: .connectionAck))
guard let connectionAckResponse = try? decoder.decode(ConnectionAckResponse.self, from: json) else {
try await error(.invalidResponseFormat(messageType: .connectionAck))
return
}
try await self.onConnectionAck(connectionAckResponse, self)
try await onConnectionAck(connectionAckResponse, self)
case .next:
guard let nextResponse = try? self.decoder.decode(NextResponse.self, from: json) else {
try await self.error(.invalidResponseFormat(messageType: .next))
guard let nextResponse = try? decoder.decode(NextResponse.self, from: json) else {
try await error(.invalidResponseFormat(messageType: .next))
return
}
try await self.onNext(nextResponse, self)
try await onNext(nextResponse, self)
case .error:
guard let errorResponse = try? self.decoder.decode(ErrorResponse.self, from: json) else {
try await self.error(.invalidResponseFormat(messageType: .error))
guard let errorResponse = try? decoder.decode(ErrorResponse.self, from: json) else {
try await error(.invalidResponseFormat(messageType: .error))
return
}
try await self.onError(errorResponse, self)
try await onError(errorResponse, self)
case .complete:
guard let completeResponse = try? self.decoder.decode(CompleteResponse.self, from: json) else {
try await self.error(.invalidResponseFormat(messageType: .complete))
guard let completeResponse = try? decoder.decode(CompleteResponse.self, from: json) else {
try await error(.invalidResponseFormat(messageType: .complete))
return
}
try await self.onComplete(completeResponse, self)
try await onComplete(completeResponse, self)
default:
try await self.error(.invalidType())
try await error(.invalidType())
}
}
}

/// Define the callback run on receipt of a `connection_ack` message
/// - Parameter callback: The callback to assign
public func onConnectionAck(_ callback: @escaping (ConnectionAckResponse, Client) async throws -> Void) {
onConnectionAck = callback
}

/// Define the callback run on receipt of a `next` message
/// - Parameter callback: The callback to assign
public func onNext(_ callback: @escaping (NextResponse, Client) async throws -> Void) {
onNext = callback
}

/// Define the callback run on receipt of an `error` message
/// - Parameter callback: The callback to assign
public func onError(_ callback: @escaping (ErrorResponse, Client) async throws -> Void) {
onError = callback
}

/// Define the callback run on receipt of a `complete` message
/// - Parameter callback: The callback to assign
public func onComplete(_ callback: @escaping (CompleteResponse, Client) async throws -> Void) {
onComplete = callback
}

/// Define the callback run on receipt of any message
/// - Parameter callback: The callback to assign
public func onMessage(_ callback: @escaping (String, Client) async throws -> Void) {
onMessage = callback
}

/// Send a `connection_init` request through the messenger
public func sendConnectionInit(payload: InitPayload) async throws {
guard let messenger = messenger else { return }
try await messenger.send(
ConnectionInitRequest(
payload: payload
Expand All @@ -118,7 +100,6 @@ public class Client<InitPayload: Equatable & Codable> {

/// Send a `subscribe` request through the messenger
public func sendStart(payload: GraphQLRequest, id: String) async throws {
guard let messenger = messenger else { return }
try await messenger.send(
SubscribeRequest(
payload: payload,
Expand All @@ -129,7 +110,6 @@ public class Client<InitPayload: Equatable & Codable> {

/// Send a `complete` request through the messenger
public func sendStop(id: String) async throws {
guard let messenger = messenger else { return }
try await messenger.send(
CompleteRequest(
id: id
Expand All @@ -139,7 +119,6 @@ public class Client<InitPayload: Equatable & Codable> {

/// Send an error through the messenger and close the connection
private func error(_ error: GraphQLTransportWSError) async throws {
guard let messenger = messenger else { return }
try await messenger.error(error.message, code: error.code.rawValue)
}
}
2 changes: 1 addition & 1 deletion Sources/GraphQLTransportWS/GraphqlTransportWSError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct GraphQLTransportWSError: Error {

/// Error codes for miscellaneous issues
public enum ErrorCode: Int, CustomStringConvertible, Sendable {
// Miscellaneous
/// Miscellaneous
case miscellaneous = 4400

// Internal errors
Expand Down
11 changes: 3 additions & 8 deletions Sources/GraphQLTransportWS/Messenger.swift
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import Foundation

/// Protocol for an object that can send and recieve messages. This allows mocking in tests
public protocol Messenger: AnyObject {
// AnyObject compliance requires that the implementing object is a class and we can reference it weakly

/// Protocol for an object that can send messages.
public protocol Messenger: Sendable {
/// Send a message through this messenger
/// - Parameter message: The message to send
func send<S>(_ message: S) async throws -> Void where S: Collection, S.Element == Character

/// Set the callback that should be run when a message is recieved
func onReceive(callback: @escaping (String) async throws -> Void)
func send<S: Sendable & Collection>(_ message: S) async throws -> Void where S.Element == Character

/// Close the messenger
func close() async throws
Expand Down
Loading