diff --git a/juniper/CHANGELOG.md b/juniper/CHANGELOG.md index d3ac63d19..c725666dd 100644 --- a/juniper/CHANGELOG.md +++ b/juniper/CHANGELOG.md @@ -20,6 +20,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi - Renamed `ast::Operation::variable_definitions` field to `variables_definition`. - Changed `ScalarToken::String` to contain raw quoted and escaped `StringLiteral` (was unquoted but escaped string before). ([#1349]) - Added `LexerError::UnterminatedBlockString` variant. ([#1349]) +- Fixed `ValuesStream` to return batch of `ExecutionError`s instead of a single one. ([#1371]) ### Added @@ -62,6 +63,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi - Incorrect `__Type.specifiedByUrl` field to `__Type.specifiedByURL`. ([#1348]) - Missing `@specifiedBy(url:)` directive in [SDL] generated by `RootNode::as_sdl()` and `RootNode::as_document()` methods. ([#1348]) - Incorrect double escaping in `ScalarToken::String` `Display`ing. ([#1349]) +- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371]) [#864]: /../../issues/864 [#1055]: /../../issues/1055 @@ -75,6 +77,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi [#1355]: /../../pull/1355 [#1358]: /../../pull/1358 [#1361]: /../../pull/1361 +[#1371]: /../../pull/1371 [graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525 [graphql/graphql-spec#687]: https://github.com/graphql/graphql-spec/issues/687 [graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805 diff --git a/juniper/src/executor/mod.rs b/juniper/src/executor/mod.rs index 33a2477d0..accddefda 100644 --- a/juniper/src/executor/mod.rs +++ b/juniper/src/executor/mod.rs @@ -5,6 +5,8 @@ use std::{ cmp::Ordering, collections::HashMap, fmt::{Debug, Display}, + mem, + pin::Pin, sync::{Arc, RwLock}, }; @@ -228,17 +230,17 @@ impl FieldError { } } -/// The result of resolving the value of a field of type `T` +/// [`Result`] of resolving the value of a field of type `T`. pub type FieldResult = Result>; -/// The result of resolving an unspecified field +/// [`Result`] of resolving an unspecified field. pub type ExecutionResult = Result, FieldError>; -/// Boxed `Stream` yielding `Result, ExecutionError>` +/// [`Box`]ed [`Stream`] yielding `Result, Vec>>`. pub type ValuesStream<'a, S = DefaultScalarValue> = - std::pin::Pin, ExecutionError>> + Send + 'a>>; + Pin, Vec>>> + Send + 'a>>; -/// The map of variables used for substitution during query execution +/// [`HashMap`] of variables used for substitution during query execution. pub type Variables = HashMap>; /// Custom error handling trait to enable error types other than [`FieldError`] @@ -682,6 +684,12 @@ where } } + /// Takes errors from this [`Executor`] clearing its internal [`ExecutionError`]s buffer. + #[must_use] + pub fn take_errors(&self) -> Vec> { + mem::take(&mut self.errors.write().unwrap()) + } + /// Construct a lookahead selection for the current selection. /// /// This allows seeing the whole selection and perform operations diff --git a/juniper/src/tests/subscriptions.rs b/juniper/src/tests/subscriptions.rs index 80a23b160..9edab8f4b 100644 --- a/juniper/src/tests/subscriptions.rs +++ b/juniper/src/tests/subscriptions.rs @@ -90,7 +90,7 @@ fn create_and_execute( ) -> Result< ( Vec, - Vec, ExecutionError>>>, + Vec, Vec>>>>, ), Vec>, > { diff --git a/juniper_codegen/CHANGELOG.md b/juniper_codegen/CHANGELOG.md index 928f59798..b1bcd67f1 100644 --- a/juniper_codegen/CHANGELOG.md +++ b/juniper_codegen/CHANGELOG.md @@ -18,12 +18,17 @@ All user visible changes to `juniper_codegen` crate will be documented in this f - Placing `#[graphql(deprecated)]` attribute on method arguments in `#[graphql_object]` and `#[graphql_interface]` macros. - Support of `#[graphql(rename_all = "snake_case")]` attribute. ([#1354]) +### Fixed + +- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371]) + [#864]: /../../issues/864 [#1055]: /../../issues/1055 [#1062]: /../../issues/1062 [#1347]: /../../issues/1347 [#1348]: /../../pull/1348 [#1354]: /../../pull/1354 +[#1371]: /../../pull/1371 [graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525 [graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805 [graphql/graphql-spec#825]: https://github.com/graphql/graphql-spec/pull/825 diff --git a/juniper_codegen/src/common/field/mod.rs b/juniper_codegen/src/common/field/mod.rs index bc87b2fb7..e7b9bca9b 100644 --- a/juniper_codegen/src/common/field/mod.rs +++ b/juniper_codegen/src/common/field/mod.rs @@ -359,15 +359,20 @@ impl Definition { ::core::option::Option::Some((ctx, r)), ) => { let sub = ex.replaced_context(ctx); - sub.resolve_with_ctx_async(&(), &r) + let val = sub.resolve_with_ctx_async(&(), &r) .await - .map_err(|e| ex.new_error(e)) + .map_err(|e| ::std::vec![ex.new_error(e)])?; + let errs = sub.take_errors(); + if !errs.is_empty() { + return ::core::result::Result::Err(errs) + } + ::core::result::Result::Ok(val) } ::core::result::Result::Ok(::core::option::Option::None) => { ::core::result::Result::Ok(::juniper::Value::null()) } ::core::result::Result::Err(e) => { - ::core::result::Result::Err(ex.new_error(e)) + ::core::result::Result::Err(::std::vec![ex.new_error(e)]) } } } diff --git a/juniper_graphql_ws/CHANGELOG.md b/juniper_graphql_ws/CHANGELOG.md index 9c40e4f40..d2bdb12e1 100644 --- a/juniper_graphql_ws/CHANGELOG.md +++ b/juniper_graphql_ws/CHANGELOG.md @@ -16,13 +16,22 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi - Made [WebSocket] connection closed once `ConnectionConfig::keep_alive::timeout` is reached in [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-6.0.7]. ([#1367]) > **COMPATIBILITY**: Previously, a [WebSocket] connection was kept alive, even when clients do not respond to server's `Pong` messages at all. To preserve the previous behavior, the `ConnectionConfig::keep_alive::timeout` should be set to `Duration:::ZERO`. +### Added + +- `ConnectionConfig::panic_handler` field and `ConnectionConfig::with_panic_handler()` method allowing to specify `PanicHandler` for panics happened during execution of [GraphQL] operations. ([#1371]) + +### Changed + +- Merged `graphql_transport_ws::NextPayload` and `graphql_ws::DataPayload` into a single struct. ([#1371]) + ### Fixed -- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368]) +- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368]) [#1367]: /../../pull/1367 [#1368]: /../../pull/1368 [#1369]: /../../pull/1369 +[#1371]: /../../pull/1371 [proto-6.0.7]: https://github.com/enisdenjo/graphql-ws/blob/v6.0.7/PROTOCOL.md diff --git a/juniper_graphql_ws/src/graphql_transport_ws/mod.rs b/juniper_graphql_ws/src/graphql_transport_ws/mod.rs index 53c334f02..ec918d6f8 100644 --- a/juniper_graphql_ws/src/graphql_transport_ws/mod.rs +++ b/juniper_graphql_ws/src/graphql_transport_ws/mod.rs @@ -14,13 +14,13 @@ mod client_message; mod server_message; use std::{ - collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin, - sync::Arc, time::Duration, + collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, + panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration, }; use derive_more::with_trait::From; use juniper::{ - GraphQLError, RuleError, ScalarValue, + GraphQLError, RuleError, ScalarValue, Value, futures::{ Sink, Stream, channel::oneshot, @@ -40,7 +40,7 @@ pub use self::{ struct ExecutionParams { subscribe_payload: SubscribePayload, - config: ConnectionConfig, + config: ConnectionConfig, schema: S, } @@ -71,9 +71,9 @@ pub enum Output { } impl Output { - /// Converts the reaction into a one-item stream. - fn into_stream(self) -> BoxStream<'static, Self> { - stream::once(future::ready(self)).boxed() + /// Wraps this [`Output`] into a singe-item [`Stream`]. + fn into_stream(self) -> stream::Once> { + stream::once(future::ready(self)) } } @@ -82,7 +82,7 @@ enum ConnectionState> { PreInit { init: I, schema: S }, /// Active is the state after a ConnectionInit message has been accepted. Active { - config: ConnectionConfig, + config: ConnectionConfig, stoppers: HashMap>, ping: Arc, schema: S, @@ -107,26 +107,23 @@ impl> ConnectionState { let ping = Arc::new(Notify::new()); - let mut s = Output::Message(ServerMessage::ConnectionAck) - .into_stream() - .boxed(); + let s = Output::Message(ServerMessage::ConnectionAck).into_stream(); - if keep_alive_interval > Duration::from_secs(0) { - s = s - .chain(Output::Message(ServerMessage::Pong).into_stream()) - .boxed(); - s = s + let s = if keep_alive_interval > Duration::from_secs(0) { + s.chain(Output::Message(ServerMessage::Pong).into_stream()) .chain(stream::repeat(()).then(move |()| { tokio::time::sleep(keep_alive_interval) .map(|()| Output::Message(ServerMessage::Pong)) })) - .boxed(); - } + .right_stream() + } else { + s.left_stream() + }; - if keep_alive_timeout > Duration::from_secs(0) { + let s = if keep_alive_timeout > Duration::from_secs(0) { let ping_rx = ping.clone(); - s = stream::select_all([ - s, + stream::select_all([ + s.boxed(), stream::repeat(()) .then(move |()| { let ping_rx = ping_rx.clone(); @@ -143,8 +140,10 @@ impl> ConnectionState { .filter_map(future::ready) .boxed(), ]) - .boxed(); - } + .boxed() + } else { + s.boxed() + }; ( Self::Active { @@ -199,6 +198,7 @@ impl> ConnectionState { message: format!("Subscriber for {id} already exists"), } .into_stream() + .boxed() } else if config.max_in_flight_operations > 0 && stoppers.len() >= config.max_in_flight_operations { @@ -286,35 +286,80 @@ impl> ConnectionState { let params = Arc::new(params); - // Try to execute this as a query or mutation. - match juniper::execute( + let fut = juniper::execute( ¶ms.subscribe_payload.query, params.subscribe_payload.operation_name.as_deref(), params.schema.root_node(), ¶ms.subscribe_payload.variables, ¶ms.config.context, ) - .await - { - Ok((data, errors)) => { - return Output::Message(ServerMessage::Next { - id: id.clone(), - payload: NextPayload { data, errors }, - }) - .into_stream(); - } - Err(GraphQLError::IsSubscription) => {} - Err(e) => { - return Output::Message(ServerMessage::Error { + .map_ok(|(data, errors)| { + Output::Message(ServerMessage::Next { + id: id.clone(), + payload: NextPayload { data, errors }, + }) + .into_stream() + .left_stream() + }) + .unwrap_or_else(|e| { + if matches!(e, GraphQLError::IsSubscription) { + SubscriptionStart::new(id.clone(), params.clone()).right_stream() + } else { + Output::Message(ServerMessage::Error { id: id.clone(), payload: ErrorPayload::new(Box::new(params.clone()), e), }) - .into_stream(); + .into_stream() + .left_stream() } + }); + if let Some(panic_handler) = params.config.panic_handler.as_ref().map(Arc::clone) { + let stream = AssertUnwindSafe(fut) + .catch_unwind() + .await + .unwrap_or_else(|e| { + if let Some(e) = panic_handler(e, ¶ms.config.context) { + Output::Message(ServerMessage::Next { + id: id.clone(), + payload: NextPayload { + data: Value::null(), + errors: vec![e], + }, + }) + } else { + Output::Close { + code: 1000, + message: "Operation execution panicked".into(), + } + } + .into_stream() + .left_stream() + }); + AssertUnwindSafe(stream) + .catch_unwind() + .map(move |res| match res { + Ok(item) => item, + Err(e) => { + if let Some(e) = panic_handler(e, ¶ms.config.context) { + Output::Message(ServerMessage::Next { + id: id.clone(), + payload: NextPayload { + data: Value::null(), + errors: vec![e], + }, + }) + } else { + Output::Close { + code: 1000, + message: "Subscription execution panicked".into(), + } + } + } + }) + .boxed() + } else { + fut.await.boxed() } - - // Try to execute as a subscription. - SubscriptionStart::new(id, params.clone()).boxed() } } @@ -349,12 +394,12 @@ struct SubscriptionStart { } impl SubscriptionStart { - fn new(id: String, params: Arc>) -> Pin> { - Box::pin(Self { + fn new(id: String, params: Arc>) -> Self { + Self { params, state: SubscriptionStartState::Init { id }, _marker: PhantomPinned, - }) + } } } @@ -543,7 +588,8 @@ where code: 1000, message: "Normal Closure".into(), } - .into_stream(), + .into_stream() + .boxed(), ); ConnectionSinkState::Closed } @@ -554,7 +600,8 @@ where code: 4400, message: e.to_string(), } - .into_stream(), + .into_stream() + .boxed(), ); ConnectionSinkState::Closed } diff --git a/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs b/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs index 48eb48d18..7b65a4f09 100644 --- a/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs +++ b/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs @@ -1,21 +1,6 @@ -use juniper::{ExecutionError, Value}; use serde::Serialize; -pub use crate::server_message::ErrorPayload; - -/// Sent after execution of an operation. For queries and mutations, this is sent to the client -/// once. For subscriptions, this is sent for every event in the event stream. -#[derive(Debug, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct NextPayload { - /// The result data. - pub data: Value, - - /// The errors that have occurred during execution. Note that parse and validation errors are - /// not included here. They are sent via Error messages. - #[serde(skip_serializing_if = "Vec::is_empty")] - pub errors: Vec>, -} +pub use crate::server_message::{ErrorPayload, NextPayload}; /// ServerMessage defines the message types that servers can send. #[derive(Debug, PartialEq, Serialize)] diff --git a/juniper_graphql_ws/src/graphql_ws/mod.rs b/juniper_graphql_ws/src/graphql_ws/mod.rs index 3b389e52e..558bfde22 100644 --- a/juniper_graphql_ws/src/graphql_ws/mod.rs +++ b/juniper_graphql_ws/src/graphql_ws/mod.rs @@ -16,12 +16,12 @@ mod client_message; mod server_message; use std::{ - collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin, - sync::Arc, time::Duration, + collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, + panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration, }; use juniper::{ - GraphQLError, RuleError, + GraphQLError, RuleError, Value, futures::{ Sink, Stream, channel::oneshot, @@ -40,7 +40,7 @@ pub use self::{ struct ExecutionParams { start_payload: StartPayload, - config: ConnectionConfig, + config: ConnectionConfig, schema: S, } @@ -50,9 +50,9 @@ enum Reaction { } impl Reaction { - /// Converts the reaction into a one-item stream. - fn into_stream(self) -> BoxStream<'static, Self> { - stream::once(future::ready(self)).boxed() + /// Wraps this [`Reaction`] into a singe-item [`Stream`]. + fn into_stream(self) -> stream::Once> { + stream::once(future::ready(self)) } } @@ -61,7 +61,7 @@ enum ConnectionState> { PreInit { init: I, schema: S }, /// Active is the state after a ConnectionInit message has been accepted. Active { - config: ConnectionConfig, + config: ConnectionConfig, stoppers: HashMap>, schema: S, }, @@ -77,7 +77,7 @@ impl> ConnectionState { msg: ClientMessage, ) -> (Self, BoxStream<'static, Reaction>) { if let ClientMessage::ConnectionTerminate = msg { - return (self, Reaction::EndStream.into_stream()); + return (self, Reaction::EndStream.into_stream().boxed()); } match self { @@ -86,29 +86,27 @@ impl> ConnectionState { Ok(config) => { let keep_alive_interval = config.keep_alive.interval; - let mut s = stream::iter(vec![Reaction::ServerMessage( + let s = stream::iter(vec![Reaction::ServerMessage( ServerMessage::ConnectionAck, - )]) - .boxed(); + )]); #[expect(closure_returning_async_block, reason = "not possible")] - if keep_alive_interval > Duration::from_secs(0) { - s = s - .chain( - Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive) - .into_stream(), - ) - .boxed(); - s = s - .chain(stream::unfold((), move |_| async move { - tokio::time::sleep(keep_alive_interval).await; - Some(( - Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive), - (), - )) - })) - .boxed(); - } + let s = if keep_alive_interval > Duration::from_secs(0) { + s.chain( + Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive) + .into_stream(), + ) + .chain(stream::unfold((), move |_| async move { + tokio::time::sleep(keep_alive_interval).await; + Some(( + Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive), + (), + )) + })) + .boxed() + } else { + s.boxed() + }; ( Self::Active { @@ -228,35 +226,74 @@ impl> ConnectionState { let params = Arc::new(params); - // Try to execute this as a query or mutation. - match juniper::execute( + let fut = juniper::execute( ¶ms.start_payload.query, params.start_payload.operation_name.as_deref(), params.schema.root_node(), ¶ms.start_payload.variables, ¶ms.config.context, ) - .await - { - Ok((data, errors)) => { - return Reaction::ServerMessage(ServerMessage::Data { - id: id.clone(), - payload: DataPayload { data, errors }, - }) - .into_stream(); - } - Err(GraphQLError::IsSubscription) => {} - Err(e) => { - return Reaction::ServerMessage(ServerMessage::Error { + .map_ok(|(data, errors)| { + Reaction::ServerMessage(ServerMessage::Data { + id: id.clone(), + payload: DataPayload { data, errors }, + }) + .into_stream() + .left_stream() + }) + .unwrap_or_else(|e| { + if matches!(e, GraphQLError::IsSubscription) { + SubscriptionStart::new(id.clone(), params.clone()).right_stream() + } else { + Reaction::ServerMessage(ServerMessage::Error { id: id.clone(), payload: ErrorPayload::new(Box::new(params.clone()), e), }) - .into_stream(); + .into_stream() + .left_stream() } + }); + if let Some(panic_handler) = params.config.panic_handler.as_ref().map(Arc::clone) { + let stream = AssertUnwindSafe(fut) + .catch_unwind() + .await + .unwrap_or_else(|e| { + if let Some(e) = panic_handler(e, ¶ms.config.context) { + Reaction::ServerMessage(ServerMessage::Data { + id: id.clone(), + payload: DataPayload { + data: Value::null(), + errors: vec![e], + }, + }) + } else { + Reaction::EndStream + } + .into_stream() + .left_stream() + }); + AssertUnwindSafe(stream) + .catch_unwind() + .map(move |res| match res { + Ok(item) => item, + Err(e) => { + if let Some(e) = panic_handler(e, ¶ms.config.context) { + Reaction::ServerMessage(ServerMessage::Data { + id: id.clone(), + payload: DataPayload { + data: Value::null(), + errors: vec![e], + }, + }) + } else { + Reaction::EndStream + } + } + }) + .boxed() + } else { + fut.await.boxed() } - - // Try to execute as a subscription. - SubscriptionStart::new(id, params.clone()).boxed() } } @@ -291,12 +328,12 @@ struct SubscriptionStart { } impl SubscriptionStart { - fn new(id: String, params: Arc>) -> Pin> { - Box::pin(Self { + fn new(id: String, params: Arc>) -> Self { + Self { params, state: SubscriptionStartState::Init { id }, _marker: PhantomPinned, - }) + } } } @@ -468,7 +505,8 @@ where message: e.to_string(), }, }) - .into_stream(), + .into_stream() + .boxed(), ); ConnectionSinkState::Ready { state } } diff --git a/juniper_graphql_ws/src/graphql_ws/server_message.rs b/juniper_graphql_ws/src/graphql_ws/server_message.rs index bfefc2312..d073240fd 100644 --- a/juniper_graphql_ws/src/graphql_ws/server_message.rs +++ b/juniper_graphql_ws/src/graphql_ws/server_message.rs @@ -1,7 +1,6 @@ -use juniper::{ExecutionError, Value}; use serde::Serialize; -pub use crate::server_message::ErrorPayload; +pub use crate::server_message::{ErrorPayload, NextPayload as DataPayload}; /// The payload for errors that are not associated with a GraphQL operation. #[derive(Debug, Eq, PartialEq, Serialize)] @@ -11,20 +10,6 @@ pub struct ConnectionErrorPayload { pub message: String, } -/// Sent after execution of an operation. For queries and mutations, this is sent to the client -/// once. For subscriptions, this is sent for every event in the event stream. -#[derive(Debug, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DataPayload { - /// The result data. - pub data: Value, - - /// The errors that have occurred during execution. Note that parse and validation errors are - /// not included here. They are sent via Error messages. - #[serde(skip_serializing_if = "Vec::is_empty")] - pub errors: Vec>, -} - /// ServerMessage defines the message types that servers can send. #[derive(Debug, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] diff --git a/juniper_graphql_ws/src/lib.rs b/juniper_graphql_ws/src/lib.rs index 3a299b486..07d01395b 100644 --- a/juniper_graphql_ws/src/lib.rs +++ b/juniper_graphql_ws/src/lib.rs @@ -15,16 +15,17 @@ mod schema; mod server_message; mod util; -use std::{convert::Infallible, error::Error, future, time::Duration}; +use std::{any::Any, convert::Infallible, error::Error, future, sync::Arc, time::Duration}; -use juniper::{ScalarValue, Variables}; +use derive_more::with_trait::Debug; +use juniper::{ExecutionError, ScalarValue, Variables}; pub use self::schema::{ArcSchema, Schema}; /// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit /// message. -#[derive(Clone, Copy, Debug)] -pub struct ConnectionConfig { +#[derive(Clone, Debug)] +pub struct ConnectionConfig { /// Custom-provided [`juniper::Context`]. pub context: CtxT, @@ -36,15 +37,20 @@ pub struct ConnectionConfig { /// Keep-alive configuration. pub keep_alive: KeepAliveConfig, + + /// Optional [`PanicHandler`] for panics happened during execution of operations. + #[debug(ignore)] + pub panic_handler: Option>>, } -impl ConnectionConfig { +impl ConnectionConfig { /// Constructs the configuration required for a connection to be accepted. pub fn new(context: CtxT) -> Self { Self { context, max_in_flight_operations: 0, keep_alive: KeepAliveConfig::default(), + panic_handler: None, } } @@ -95,9 +101,26 @@ impl ConnectionConfig { self.keep_alive.timeout = timeout; self } + + /// Specifies the [`PanicHandler`] for panics happened during execution of operations. + /// + /// # [`PanicHandler`]'s result + /// + /// - If an [`ExecutionError`] is returned from the [`PanicHandler`], it will be emitted to + /// clients as a regular operation result. + /// - Otherwise (if [`None`] is returned), then no data will be emitted to clients, and the + /// whole connection will be terminated immediately. + #[must_use] + pub fn with_panic_handler( + mut self, + panic_handler: impl PanicHandler + 'static, + ) -> Self { + self.panic_handler = Some(Arc::new(panic_handler)); + self + } } -impl Init for ConnectionConfig { +impl Init for ConnectionConfig { type Error = Infallible; type Future = future::Ready>; @@ -146,6 +169,26 @@ impl Default for KeepAliveConfig { } } +/// Handler of panics used in [`catch_unwind()`]. +/// +/// # Result +/// +/// - If an [`ExecutionError`] is returned, it will be emitted to clients as a regular operation +/// result. +/// - Otherwise (if [`None`] is returned), then no data will be emitted to clients, and the whole +/// connection will be terminated immediately. +/// +/// [`catch_unwind()`]: std::panic::catch_unwind +pub trait PanicHandler: + Fn(Box, &CtxT) -> Option> + Send + Sync +{ +} + +impl PanicHandler for T where + T: Fn(Box, &CtxT) -> Option> + Send + Sync + ?Sized +{ +} + /// Init defines the requirements for types that can provide connection configurations when /// ConnectionInit messages are received. Implementations are provided for `ConnectionConfig` and /// closures that meet the requirements. @@ -155,7 +198,7 @@ pub trait Init: Unpin + 'static { type Error: Error; /// The future configuration type. - type Future: Future, Self::Error>> + Send + 'static; + type Future: Future, Self::Error>> + Send + 'static; /// Returns a future for the configuration to use. fn init(self, params: Variables) -> Self::Future; @@ -165,7 +208,7 @@ impl Init for F where S: ScalarValue, F: FnOnce(Variables) -> Fut + Unpin + 'static, - Fut: Future, E>> + Send + 'static, + Fut: Future, E>> + Send + 'static, E: Error, { type Error = E; diff --git a/juniper_graphql_ws/src/server_message.rs b/juniper_graphql_ws/src/server_message.rs index 46c78074f..7ea21b79b 100644 --- a/juniper_graphql_ws/src/server_message.rs +++ b/juniper_graphql_ws/src/server_message.rs @@ -3,9 +3,29 @@ use std::{any::Any, marker::PhantomPinned}; use derive_more::with_trait::Debug; -use juniper::GraphQLError; +#[cfg(doc)] +use juniper::futures::Stream; +use juniper::{ExecutionError, GraphQLError, Value}; use serde::{Serialize, Serializer}; +/// Payload to be send after execution of an operation. +/// +/// - For queries and mutations, this is sent to the client once. +/// - For subscriptions, this is sent for every event in the event [`Stream`]. +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NextPayload { + /// Execution result data. + pub data: Value, + + /// Errors that have occurred during execution. + /// + /// Note, that parse and validation errors are not included here. They are sent via + /// [`ErrorPayload`]. + #[serde(skip_serializing_if = "Vec::is_empty")] + pub errors: Vec>, +} + /// Payload for errors that can happen before execution. /// /// Errors that happen during execution are instead sent to the client via diff --git a/juniper_subscriptions/src/lib.rs b/juniper_subscriptions/src/lib.rs index 327d101c6..948a150aa 100644 --- a/juniper_subscriptions/src/lib.rs +++ b/juniper_subscriptions/src/lib.rs @@ -144,9 +144,9 @@ where )))), Value::Scalar(s) => Box::pin(s.map(|res| match res { Ok(val) => ExecutionOutput::from_data(val), - Err(err) => ExecutionOutput { + Err(errors) => ExecutionOutput { data: Value::null(), - errors: vec![err], + errors, }, })), Value::List(list) => { @@ -217,7 +217,7 @@ where match val { Ok(value) => (name, value), Err(e) => { - errors.push(e); + errors.extend(e); (name, Value::Null) } } @@ -288,7 +288,7 @@ mod whole_responses_stream { assert_eq!(result, expected); } - type PollResult = Result, ExecutionError>; + type PollResult = Result, Vec>>; #[tokio::test] async fn value_scalar() { diff --git a/tests/integration/tests/common/mod.rs b/tests/integration/tests/common/mod.rs index 323a4cc07..8c02455d4 100644 --- a/tests/integration/tests/common/mod.rs +++ b/tests/integration/tests/common/mod.rs @@ -65,7 +65,7 @@ pub mod util { if let Value::Scalar(ref mut stream) = val { return match stream.next().await { Some(Ok(val)) => Ok((graphql_value!({ name: val }), vec![])), - Some(Err(e)) => Ok((Value::Null, vec![e])), + Some(Err(errs)) => Ok((Value::Null, errs)), None => Ok((Value::Null, vec![])), }; }