Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions juniper/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
- Renamed `ast::Operation::variable_definitions` field to `variables_definition`.
- Changed `ScalarToken::String` to contain raw quoted and escaped `StringLiteral` (was unquoted but escaped string before). ([#1349])
- Added `LexerError::UnterminatedBlockString` variant. ([#1349])
- Fixed `ValuesStream` to return batch of `ExecutionError`s instead of a single one. ([#1371])

### Added

Expand Down Expand Up @@ -62,6 +63,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
- Incorrect `__Type.specifiedByUrl` field to `__Type.specifiedByURL`. ([#1348])
- Missing `@specifiedBy(url:)` directive in [SDL] generated by `RootNode::as_sdl()` and `RootNode::as_document()` methods. ([#1348])
- Incorrect double escaping in `ScalarToken::String` `Display`ing. ([#1349])
- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371])

[#864]: /../../issues/864
[#1055]: /../../issues/1055
Expand All @@ -75,6 +77,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
[#1355]: /../../pull/1355
[#1358]: /../../pull/1358
[#1361]: /../../pull/1361
[#1371]: /../../pull/1371
[graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525
[graphql/graphql-spec#687]: https://github.com/graphql/graphql-spec/issues/687
[graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805
Expand Down
18 changes: 13 additions & 5 deletions juniper/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::{
cmp::Ordering,
collections::HashMap,
fmt::{Debug, Display},
mem,
pin::Pin,
sync::{Arc, RwLock},
};

Expand Down Expand Up @@ -228,17 +230,17 @@ impl<S> FieldError<S> {
}
}

/// The result of resolving the value of a field of type `T`
/// [`Result`] of resolving the value of a field of type `T`.
pub type FieldResult<T, S = DefaultScalarValue> = Result<T, FieldError<S>>;

/// The result of resolving an unspecified field
/// [`Result`] of resolving an unspecified field.
pub type ExecutionResult<S = DefaultScalarValue> = Result<Value<S>, FieldError<S>>;

/// Boxed `Stream` yielding `Result<Value<S>, ExecutionError<S>>`
/// [`Box`]ed [`Stream`] yielding `Result<Value<S>, Vec<ExecutionError<S>>>`.
pub type ValuesStream<'a, S = DefaultScalarValue> =
std::pin::Pin<Box<dyn Stream<Item = Result<Value<S>, ExecutionError<S>>> + Send + 'a>>;
Pin<Box<dyn Stream<Item = Result<Value<S>, Vec<ExecutionError<S>>>> + Send + 'a>>;

/// The map of variables used for substitution during query execution
/// [`HashMap`] of variables used for substitution during query execution.
pub type Variables<S = DefaultScalarValue> = HashMap<String, InputValue<S>>;

/// Custom error handling trait to enable error types other than [`FieldError`]
Expand Down Expand Up @@ -682,6 +684,12 @@ where
}
}

/// Takes errors from this [`Executor`] clearing its internal [`ExecutionError`]s buffer.
#[must_use]
pub fn take_errors(&self) -> Vec<ExecutionError<S>> {
mem::take(&mut self.errors.write().unwrap())
}

/// Construct a lookahead selection for the current selection.
///
/// This allows seeing the whole selection and perform operations
Expand Down
2 changes: 1 addition & 1 deletion juniper/src/tests/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn create_and_execute(
) -> Result<
(
Vec<String>,
Vec<Vec<Result<Value<DefaultScalarValue>, ExecutionError<DefaultScalarValue>>>>,
Vec<Vec<Result<Value<DefaultScalarValue>, Vec<ExecutionError<DefaultScalarValue>>>>>,
),
Vec<ExecutionError<DefaultScalarValue>>,
> {
Expand Down
5 changes: 5 additions & 0 deletions juniper_codegen/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ All user visible changes to `juniper_codegen` crate will be documented in this f
- Placing `#[graphql(deprecated)]` attribute on method arguments in `#[graphql_object]` and `#[graphql_interface]` macros.
- Support of `#[graphql(rename_all = "snake_case")]` attribute. ([#1354])

### Fixed

- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371])

[#864]: /../../issues/864
[#1055]: /../../issues/1055
[#1062]: /../../issues/1062
[#1347]: /../../issues/1347
[#1348]: /../../pull/1348
[#1354]: /../../pull/1354
[#1371]: /../../pull/1371
[graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525
[graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805
[graphql/graphql-spec#825]: https://github.com/graphql/graphql-spec/pull/825
Expand Down
11 changes: 8 additions & 3 deletions juniper_codegen/src/common/field/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,20 @@ impl Definition {
::core::option::Option::Some((ctx, r)),
) => {
let sub = ex.replaced_context(ctx);
sub.resolve_with_ctx_async(&(), &r)
let val = sub.resolve_with_ctx_async(&(), &r)
.await
.map_err(|e| ex.new_error(e))
.map_err(|e| ::std::vec![ex.new_error(e)])?;
let errs = sub.take_errors();
if !errs.is_empty() {
return ::core::result::Result::Err(errs)
}
::core::result::Result::Ok(val)
}
::core::result::Result::Ok(::core::option::Option::None) => {
::core::result::Result::Ok(::juniper::Value::null())
}
::core::result::Result::Err(e) => {
::core::result::Result::Err(ex.new_error(e))
::core::result::Result::Err(::std::vec![ex.new_error(e)])
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion juniper_graphql_ws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi
- Made [WebSocket] connection closed once `ConnectionConfig::keep_alive::timeout` is reached in [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-6.0.7]. ([#1367])
> **COMPATIBILITY**: Previously, a [WebSocket] connection was kept alive, even when clients do not respond to server's `Pong` messages at all. To preserve the previous behavior, the `ConnectionConfig::keep_alive::timeout` should be set to `Duration:::ZERO`.

### Added

- `ConnectionConfig::panic_handler` field and `ConnectionConfig::with_panic_handler()` method allowing to specify `PanicHandler` for panics happened during execution of [GraphQL] operations. ([#1371])

### Changed

- Merged `graphql_transport_ws::NextPayload` and `graphql_ws::DataPayload` into a single struct. ([#1371])

### Fixed

- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])
- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])

[#1367]: /../../pull/1367
[#1368]: /../../pull/1368
[#1369]: /../../pull/1369
[#1371]: /../../pull/1371
[proto-6.0.7]: https://github.com/enisdenjo/graphql-ws/blob/v6.0.7/PROTOCOL.md


Expand Down
139 changes: 93 additions & 46 deletions juniper_graphql_ws/src/graphql_transport_ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ mod client_message;
mod server_message;

use std::{
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin,
sync::Arc, time::Duration,
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned,
panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration,
};

use derive_more::with_trait::From;
use juniper::{
GraphQLError, RuleError, ScalarValue,
GraphQLError, RuleError, ScalarValue, Value,
futures::{
Sink, Stream,
channel::oneshot,
Expand All @@ -40,7 +40,7 @@ pub use self::{

struct ExecutionParams<S: Schema> {
subscribe_payload: SubscribePayload<S::ScalarValue>,
config: ConnectionConfig<S::Context>,
config: ConnectionConfig<S::Context, S::ScalarValue>,
schema: S,
}

Expand Down Expand Up @@ -71,9 +71,9 @@ pub enum Output<S: ScalarValue> {
}

impl<S: ScalarValue + Send> Output<S> {
/// Converts the reaction into a one-item stream.
fn into_stream(self) -> BoxStream<'static, Self> {
stream::once(future::ready(self)).boxed()
/// Wraps this [`Output`] into a singe-item [`Stream`].
fn into_stream(self) -> stream::Once<future::Ready<Self>> {
stream::once(future::ready(self))
}
}

Expand All @@ -82,7 +82,7 @@ enum ConnectionState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
PreInit { init: I, schema: S },
/// Active is the state after a ConnectionInit message has been accepted.
Active {
config: ConnectionConfig<S::Context>,
config: ConnectionConfig<S::Context, S::ScalarValue>,
stoppers: HashMap<String, oneshot::Sender<()>>,
ping: Arc<Notify>,
schema: S,
Expand All @@ -107,26 +107,23 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {

let ping = Arc::new(Notify::new());

let mut s = Output::Message(ServerMessage::ConnectionAck)
.into_stream()
.boxed();
let s = Output::Message(ServerMessage::ConnectionAck).into_stream();

if keep_alive_interval > Duration::from_secs(0) {
s = s
.chain(Output::Message(ServerMessage::Pong).into_stream())
.boxed();
s = s
let s = if keep_alive_interval > Duration::from_secs(0) {
s.chain(Output::Message(ServerMessage::Pong).into_stream())
.chain(stream::repeat(()).then(move |()| {
tokio::time::sleep(keep_alive_interval)
.map(|()| Output::Message(ServerMessage::Pong))
}))
.boxed();
}
.right_stream()
} else {
s.left_stream()
};

if keep_alive_timeout > Duration::from_secs(0) {
let s = if keep_alive_timeout > Duration::from_secs(0) {
let ping_rx = ping.clone();
s = stream::select_all([
s,
stream::select_all([
s.boxed(),
stream::repeat(())
.then(move |()| {
let ping_rx = ping_rx.clone();
Expand All @@ -143,8 +140,10 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
.filter_map(future::ready)
.boxed(),
])
.boxed();
}
.boxed()
} else {
s.boxed()
};

(
Self::Active {
Expand Down Expand Up @@ -199,6 +198,7 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
message: format!("Subscriber for {id} already exists"),
}
.into_stream()
.boxed()
} else if config.max_in_flight_operations > 0
&& stoppers.len() >= config.max_in_flight_operations
{
Expand Down Expand Up @@ -286,35 +286,80 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {

let params = Arc::new(params);

// Try to execute this as a query or mutation.
match juniper::execute(
let fut = juniper::execute(
&params.subscribe_payload.query,
params.subscribe_payload.operation_name.as_deref(),
params.schema.root_node(),
&params.subscribe_payload.variables,
&params.config.context,
)
.await
{
Ok((data, errors)) => {
return Output::Message(ServerMessage::Next {
id: id.clone(),
payload: NextPayload { data, errors },
})
.into_stream();
}
Err(GraphQLError::IsSubscription) => {}
Err(e) => {
return Output::Message(ServerMessage::Error {
.map_ok(|(data, errors)| {
Output::Message(ServerMessage::Next {
id: id.clone(),
payload: NextPayload { data, errors },
})
.into_stream()
.left_stream()
})
.unwrap_or_else(|e| {
if matches!(e, GraphQLError::IsSubscription) {
SubscriptionStart::new(id.clone(), params.clone()).right_stream()
} else {
Output::Message(ServerMessage::Error {
id: id.clone(),
payload: ErrorPayload::new(Box::new(params.clone()), e),
})
.into_stream();
.into_stream()
.left_stream()
}
});
if let Some(panic_handler) = params.config.panic_handler.as_ref().map(Arc::clone) {
let stream = AssertUnwindSafe(fut)
.catch_unwind()
.await
.unwrap_or_else(|e| {
if let Some(e) = panic_handler(e, &params.config.context) {
Output::Message(ServerMessage::Next {
id: id.clone(),
payload: NextPayload {
data: Value::null(),
errors: vec![e],
},
})
} else {
Output::Close {
code: 1000,
message: "Operation execution panicked".into(),
}
}
.into_stream()
.left_stream()
});
AssertUnwindSafe(stream)
.catch_unwind()
.map(move |res| match res {
Ok(item) => item,
Err(e) => {
if let Some(e) = panic_handler(e, &params.config.context) {
Output::Message(ServerMessage::Next {
id: id.clone(),
payload: NextPayload {
data: Value::null(),
errors: vec![e],
},
})
} else {
Output::Close {
code: 1000,
message: "Subscription execution panicked".into(),
}
}
}
})
.boxed()
} else {
fut.await.boxed()
}

// Try to execute as a subscription.
SubscriptionStart::new(id, params.clone()).boxed()
}
}

Expand Down Expand Up @@ -349,12 +394,12 @@ struct SubscriptionStart<S: Schema> {
}

impl<S: Schema> SubscriptionStart<S> {
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Pin<Box<Self>> {
Box::pin(Self {
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Self {
Self {
params,
state: SubscriptionStartState::Init { id },
_marker: PhantomPinned,
})
}
}
}

Expand Down Expand Up @@ -543,7 +588,8 @@ where
code: 1000,
message: "Normal Closure".into(),
}
.into_stream(),
.into_stream()
.boxed(),
);
ConnectionSinkState::Closed
}
Expand All @@ -554,7 +600,8 @@ where
code: 4400,
message: e.to_string(),
}
.into_stream(),
.into_stream()
.boxed(),
);
ConnectionSinkState::Closed
}
Expand Down
Loading