diff --git a/src/libraries/System.Text.Json/src/System.Text.Json.csproj b/src/libraries/System.Text.Json/src/System.Text.Json.csproj index 687d263b4a3197..ef2714d1c81642 100644 --- a/src/libraries/System.Text.Json/src/System.Text.Json.csproj +++ b/src/libraries/System.Text.Json/src/System.Text.Json.csproj @@ -107,6 +107,7 @@ The System.Text.Json library is built-in as part of the shared framework in .NET + diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncEnumeratorState.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncEnumeratorState.cs new file mode 100644 index 00000000000000..1dca59f265b9d3 --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncEnumeratorState.cs @@ -0,0 +1,31 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Text.Json +{ + /// + /// Tracks the state of an async enumerator within a . + /// + internal enum AsyncEnumeratorState : byte + { + /// + /// No async enumerator is active; the enumerator has not been created yet. + /// + None, + + /// + /// The async enumerator has been created and is actively being iterated. + /// + Enumerating, + + /// + /// The converter has been suspended due to a pending MoveNextAsync() task. + /// + PendingMoveNext, + + /// + /// The converter has been suspended due to a pending DisposeAsync() task. + /// + PendingDisposal, + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs index 9c55998ef85c00..906cbdf76c23a8 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs @@ -49,47 +49,59 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va IAsyncEnumerator enumerator; ValueTask moveNextTask; - if (state.Current.AsyncDisposable is null) + switch (state.Current.AsyncEnumeratorState) { - enumerator = value.GetAsyncEnumerator(state.CancellationToken); - // async enumerators can only be disposed asynchronously; - // store in the WriteStack for future disposal - // by the root async serialization context. - state.Current.AsyncDisposable = enumerator; - // enumerator.MoveNextAsync() calls can throw, - // ensure the enumerator already is stored - // in the WriteStack for proper disposal. - moveNextTask = enumerator.MoveNextAsync(); + case AsyncEnumeratorState.None: + enumerator = value.GetAsyncEnumerator(state.CancellationToken); + // async enumerators can only be disposed asynchronously; + // store in the WriteStack for disposal on exception. + state.Current.AsyncEnumerator = enumerator; + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.Enumerating; + // enumerator.MoveNextAsync() calls can throw, + // ensure the enumerator already is stored + // in the WriteStack for proper disposal. + moveNextTask = enumerator.MoveNextAsync(); + + if (!moveNextTask.IsCompleted) + { + // It is common for first-time MoveNextAsync() calls to return pending tasks, + // since typically that is when underlying network connections are being established. + // For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of the written array) + // to give the stream owner the ability to recover in case of a connection error. + state.SuppressFlush = true; + goto SuspendDueToPendingTask; + } + break; + + case AsyncEnumeratorState.PendingMoveNext: + Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator); + enumerator = (IAsyncEnumerator)state.Current.AsyncEnumerator; - if (!moveNextTask.IsCompleted) - { - // It is common for first-time MoveNextAsync() calls to return pending tasks, - // since typically that is when underlying network connections are being established. - // For this case only, suppress flushing the current buffer contents (e.g. the leading '[' token of the written array) - // to give the stream owner the ability to recover in case of a connection error. - state.SuppressFlush = true; - goto SuspendDueToPendingTask; - } - } - else - { - Debug.Assert(state.Current.AsyncDisposable is IAsyncEnumerator); - enumerator = (IAsyncEnumerator)state.Current.AsyncDisposable; - - if (state.Current.AsyncEnumeratorIsPendingCompletion) - { // converter was previously suspended due to a pending MoveNextAsync() task Debug.Assert(state.PendingTask is Task && state.PendingTask.IsCompleted); moveNextTask = new ValueTask((Task)state.PendingTask); - state.Current.AsyncEnumeratorIsPendingCompletion = false; + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.Enumerating; state.PendingTask = null; - } - else - { + break; + + case AsyncEnumeratorState.PendingDisposal: + // Converter was previously suspended due to a pending DisposeAsync() task. + Debug.Assert(state.Current.AsyncEnumerator is null); + Debug.Assert(state.PendingTask is not null && state.PendingTask.IsCompleted); + state.PendingTask.GetAwaiter().GetResult(); + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.None; + state.PendingTask = null; + return true; + + default: + Debug.Assert(state.Current.AsyncEnumeratorState == AsyncEnumeratorState.Enumerating); + Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator); + enumerator = (IAsyncEnumerator)state.Current.AsyncEnumerator; + // converter was suspended for a different reason; // the last MoveNextAsync() call can only have completed with 'true'. moveNextTask = new ValueTask(true); - } + break; } Debug.Assert(moveNextTask.IsCompleted); @@ -100,10 +112,21 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va { if (!moveNextTask.Result) { - // we have completed serialization for the enumerator, - // clear from the stack and schedule for async disposal. - state.Current.AsyncDisposable = null; - state.AddCompletedAsyncDisposable(enumerator); + // Enumeration complete, dispose the enumerator inline. + // Clear from the stack first to prevent double disposal on exception. + state.Current.AsyncEnumerator = null; + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.None; + ValueTask disposeTask = enumerator.DisposeAsync(); + if (!disposeTask.IsCompleted) + { + // DisposeAsync is pending; store as a pending task + // and yield control to the root-level async serialization loop. + state.PendingTask = disposeTask.AsTask(); + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.PendingDisposal; + return false; + } + + disposeTask.GetAwaiter().GetResult(); return true; } @@ -128,7 +151,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va // mark the current stackframe as pending completion. Debug.Assert(state.PendingTask is null); state.PendingTask = moveNextTask.AsTask(); - state.Current.AsyncEnumeratorIsPendingCompletion = true; + state.Current.AsyncEnumeratorState = AsyncEnumeratorState.PendingMoveNext; return false; } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs index 6dcbdb5746fa4e..6cae18c524a3f9 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Metadata/JsonTypeInfoOfT.WriteHelpers.cs @@ -195,7 +195,7 @@ rootValue is not null && } finally { - // Await any pending resumable converter tasks (currently these can only be IAsyncEnumerator.MoveNextAsync() tasks). + // Await any pending resumable converter tasks (currently these can only be IAsyncEnumerator.MoveNextAsync() or DisposeAsync() tasks). // Note that pending tasks are always awaited, even if an exception has been thrown or the cancellation token has fired. if (state.PendingTask is not null) { @@ -210,12 +210,6 @@ rootValue is not null && catch { } #endif } - - // Dispose any pending async disposables (currently these can only be completed IAsyncEnumerators). - if (state.CompletedAsyncDisposables?.Count > 0) - { - await state.DisposeCompletedAsyncDisposables().ConfigureAwait(false); - } } } while (!isFinalBlock); diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs index 4941b6e3420d0a..568f70b8e2f1fa 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs @@ -2,7 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections; -using System.Collections.Generic; using System.Diagnostics; using System.IO.Pipelines; using System.Runtime.ExceptionServices; @@ -78,11 +77,6 @@ public readonly ref WriteStackFrame Parent /// public Task? PendingTask; - /// - /// List of completed IAsyncDisposables that have been scheduled for disposal by converters. - /// - public List? CompletedAsyncDisposables; - /// /// The amount of bytes to write before the underlying Stream should be flushed and the /// current buffer adjusted to remove the processed bytes. @@ -277,35 +271,6 @@ public void Pop(bool success) } } - public void AddCompletedAsyncDisposable(IAsyncDisposable asyncDisposable) - => (CompletedAsyncDisposables ??= new List()).Add(asyncDisposable); - - // Asynchronously dispose of any AsyncDisposables that have been scheduled for disposal - public readonly async ValueTask DisposeCompletedAsyncDisposables() - { - Debug.Assert(CompletedAsyncDisposables?.Count > 0); - Exception? exception = null; - - foreach (IAsyncDisposable asyncDisposable in CompletedAsyncDisposables) - { - try - { - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - } - catch (Exception e) - { - exception = e; - } - } - - if (exception is not null) - { - ExceptionDispatchInfo.Capture(exception).Throw(); - } - - CompletedAsyncDisposables.Clear(); - } - /// /// Walks the stack cleaning up any leftover IDisposables /// in the event of an exception on serialization @@ -314,7 +279,7 @@ public readonly void DisposePendingDisposablesOnException() { Exception? exception = null; - Debug.Assert(Current.AsyncDisposable is null); + Debug.Assert(Current.AsyncEnumerator is null); DisposeFrame(Current.CollectionEnumerator, ref exception); if (_stack is not null) @@ -323,7 +288,7 @@ public readonly void DisposePendingDisposablesOnException() int stackSize = Math.Max(currentIndex, _continuationCount); for (int i = 0; i < stackSize; i++) { - Debug.Assert(_stack[i].AsyncDisposable is null); + Debug.Assert(_stack[i].AsyncEnumerator is null); if (i == currentIndex) { @@ -365,7 +330,7 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync() { Exception? exception = null; - exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncDisposable, exception).ConfigureAwait(false); + exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncEnumerator, exception).ConfigureAwait(false); if (_stack is not null) { @@ -378,11 +343,11 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync() { // Matches the entry in Current, skip to avoid double disposal. Debug.Assert(_stack[i].CollectionEnumerator is null || ReferenceEquals(Current.CollectionEnumerator, _stack[i].CollectionEnumerator)); - Debug.Assert(_stack[i].AsyncDisposable is null || ReferenceEquals(Current.AsyncDisposable, _stack[i].AsyncDisposable)); + Debug.Assert(_stack[i].AsyncEnumerator is null || ReferenceEquals(Current.AsyncEnumerator, _stack[i].AsyncEnumerator)); continue; } - exception = await DisposeFrame(_stack[i].CollectionEnumerator, _stack[i].AsyncDisposable, exception).ConfigureAwait(false); + exception = await DisposeFrame(_stack[i].CollectionEnumerator, _stack[i].AsyncEnumerator, exception).ConfigureAwait(false); } } @@ -391,9 +356,9 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync() ExceptionDispatchInfo.Capture(exception).Throw(); } - static async ValueTask DisposeFrame(IEnumerator? collectionEnumerator, IAsyncDisposable? asyncDisposable, Exception? exception) + static async ValueTask DisposeFrame(IEnumerator? collectionEnumerator, object? asyncEnumerator, Exception? exception) { - Debug.Assert(!(collectionEnumerator is not null && asyncDisposable is not null)); + Debug.Assert(!(collectionEnumerator is not null && asyncEnumerator is not null)); try { @@ -401,7 +366,7 @@ public readonly async ValueTask DisposePendingDisposablesOnExceptionAsync() { disposable.Dispose(); } - else if (asyncDisposable is not null) + else if (asyncEnumerator is IAsyncDisposable asyncDisposable) { await asyncDisposable.DisposeAsync().ConfigureAwait(false); } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs index a2b6a787c6ac47..93a1af9f8d6dbf 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs @@ -19,15 +19,14 @@ internal struct WriteStackFrame public IEnumerator? CollectionEnumerator; /// - /// The enumerator for resumable async disposables. + /// The async enumerator for resumable async enumerable collections. /// - public IAsyncDisposable? AsyncDisposable; + public object? AsyncEnumerator; /// - /// The current stackframe has suspended serialization due to a pending task, - /// stored in the property. + /// The state of the async enumerator for the current stack frame. /// - public bool AsyncEnumeratorIsPendingCompletion; + public AsyncEnumeratorState AsyncEnumeratorState; /// /// The original JsonPropertyInfo that is not changed. It contains all properties. diff --git a/src/libraries/System.Text.Json/tests/Common/CollectionTests/CollectionTests.AsyncEnumerable.cs b/src/libraries/System.Text.Json/tests/Common/CollectionTests/CollectionTests.AsyncEnumerable.cs index e525d988769fde..7baaa72488c458 100644 --- a/src/libraries/System.Text.Json/tests/Common/CollectionTests/CollectionTests.AsyncEnumerable.cs +++ b/src/libraries/System.Text.Json/tests/Common/CollectionTests/CollectionTests.AsyncEnumerable.cs @@ -367,6 +367,182 @@ static async IAsyncEnumerable GetFailingAsyncEnumerable() } } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task WriteSequentialNestedAsyncEnumerables_EachEnumeratorDisposedBeforeNextStarts(bool asyncDisposal) + { + if (StreamingSerializer?.IsAsyncSerializer != true) + { + return; + } + + var events = new List(); + var enumerable1 = new DisposalTrackingAsyncEnumerable( + new[] { 1, 2, 3 }, "A", events, asyncDisposal); + var enumerable2 = new DisposalTrackingAsyncEnumerable( + new[] { 4, 5, 6 }, "B", events, asyncDisposal); + + using var stream = new Utf8MemoryStream(); + await StreamingSerializer.SerializeWrapper(stream, new AsyncEnumerableDtoWithTwoProperties + { + Data1 = enumerable1, + Data2 = enumerable2, + }); + + JsonTestHelper.AssertJsonEqual("""{"Data1":[1,2,3],"Data2":[4,5,6]}""", stream.AsString()); + + int disposeAIndex = events.IndexOf("A:Disposed"); + int startBIndex = events.IndexOf("B:MoveNext"); + Assert.True(disposeAIndex >= 0, "A should have been disposed"); + Assert.True(startBIndex >= 0, "B should have started enumeration"); + Assert.True(disposeAIndex < startBIndex, + $"A should be disposed before B starts enumeration. Events: [{string.Join(", ", events)}]"); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task WriteNestedAsyncEnumerableInsideAsyncEnumerable_InnerEnumeratorsDisposedPromptly(bool asyncDisposal) + { + if (StreamingSerializer?.IsAsyncSerializer != true) + { + return; + } + + // This test requires reflection to serialize custom IAsyncEnumerable types; + // skip in source-gen contexts where reflection is disabled. + if (!JsonSerializer.IsReflectionEnabledByDefault) + { + return; + } + + var events = new List(); + var inner1 = new DisposalTrackingAsyncEnumerable( + new[] { 1, 2 }, "Inner1", events, asyncDisposal); + var inner2 = new DisposalTrackingAsyncEnumerable( + new[] { 3, 4 }, "Inner2", events, asyncDisposal); + + var outer = new DisposalTrackingAsyncEnumerable>( + new IAsyncEnumerable[] { inner1, inner2 }, "Outer", events, asyncDisposal); + + using var stream = new Utf8MemoryStream(); + await JsonSerializer.SerializeAsync>>(stream, outer); + + JsonTestHelper.AssertJsonEqual("[[1,2],[3,4]]", stream.AsString()); + + Assert.Contains("Inner1:Disposed", events); + Assert.Contains("Inner2:Disposed", events); + Assert.Contains("Outer:Disposed", events); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task WriteEmptyNestedAsyncEnumerables_AllDisposed(bool asyncDisposal) + { + if (StreamingSerializer?.IsAsyncSerializer != true) + { + return; + } + + var events = new List(); + var enumerable1 = new DisposalTrackingAsyncEnumerable( + Array.Empty(), "A", events, asyncDisposal); + var enumerable2 = new DisposalTrackingAsyncEnumerable( + Array.Empty(), "B", events, asyncDisposal); + + using var stream = new Utf8MemoryStream(); + await StreamingSerializer.SerializeWrapper(stream, new AsyncEnumerableDtoWithTwoProperties + { + Data1 = enumerable1, + Data2 = enumerable2, + }); + + JsonTestHelper.AssertJsonEqual("""{"Data1":[],"Data2":[]}""", stream.AsString()); + + int disposeAIndex = events.IndexOf("A:Disposed"); + int startBIndex = events.IndexOf("B:MoveNext"); + Assert.True(disposeAIndex >= 0, "A should have been disposed"); + Assert.True(startBIndex >= 0, "B should have started enumeration"); + Assert.True(disposeAIndex < startBIndex, + $"A should be disposed before B starts enumeration. Events: [{string.Join(", ", events)}]"); + } + + [Fact] + public async Task WriteAsyncEnumerable_DisposeAsyncThrows_ExceptionPropagated() + { + if (StreamingSerializer?.IsAsyncSerializer != true) + { + return; + } + + using var stream = new Utf8MemoryStream(); + var enumerable = new ThrowingDisposeAsyncEnumerable(new[] { 1, 2 }); + + await Assert.ThrowsAsync(async () => + await StreamingSerializer.SerializeWrapper(stream, new AsyncEnumerableDto { Data = enumerable })); + } + + private sealed class DisposalTrackingAsyncEnumerable( + IEnumerable source, + string id, + List events, + bool asyncDisposal) : IAsyncEnumerable + { + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new Enumerator(source.GetEnumerator(), id, events, asyncDisposal); + + private sealed class Enumerator( + IEnumerator inner, + string id, + List events, + bool asyncDisposal) : IAsyncEnumerator + { + public TElement Current => inner.Current; + + public ValueTask MoveNextAsync() + { + bool result = inner.MoveNext(); + events.Add($"{id}:MoveNext"); + return new ValueTask(result); + } + + public ValueTask DisposeAsync() + { + inner.Dispose(); + events.Add($"{id}:Disposed"); + if (asyncDisposal) + { + return new ValueTask(Task.Delay(1)); + } + return default; + } + } + } + + private sealed class ThrowingDisposeAsyncEnumerable(IEnumerable source) : IAsyncEnumerable + { + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new Enumerator(source.GetEnumerator()); + + private sealed class Enumerator(IEnumerator inner) : IAsyncEnumerator + { + public TElement Current => inner.Current; + public ValueTask MoveNextAsync() + { + bool result = inner.MoveNext(); + return new ValueTask(result); + } + + public ValueTask DisposeAsync() + { + inner.Dispose(); + throw new InvalidOperationException("Simulated DisposeAsync failure for testing"); + } + } + } + public class MockedAsyncEnumerable : IAsyncEnumerable, IEnumerable { private readonly IEnumerable _source;