diff --git a/docs/navigate/advanced-programming/toc.yml b/docs/navigate/advanced-programming/toc.yml index efb7756c4eade..7353e67a2d9ea 100644 --- a/docs/navigate/advanced-programming/toc.yml +++ b/docs/navigate/advanced-programming/toc.yml @@ -34,6 +34,12 @@ items: href: ../../standard/asynchronous-programming-patterns/common-async-bugs.md - name: Async lambda pitfalls href: ../../standard/asynchronous-programming-patterns/async-lambda-pitfalls.md + - name: Coordination primitives + items: + - name: Build async coordination primitives + href: ../../standard/asynchronous-programming-patterns/async-coordination-primitives.md + - name: Async semaphores, locks, and reader/writer coordination + href: ../../standard/asynchronous-programming-patterns/async-coordination-primitives-advanced.md - name: Event-based asynchronous pattern (EAP) items: - name: Documentation overview diff --git a/docs/standard/asynchronous-programming-patterns/async-coordination-primitives-advanced.md b/docs/standard/asynchronous-programming-patterns/async-coordination-primitives-advanced.md new file mode 100644 index 0000000000000..3432204772f2a --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/async-coordination-primitives-advanced.md @@ -0,0 +1,105 @@ +--- +title: "Async semaphores, locks, and reader/writer coordination" +description: Learn to use SemaphoreSlim.WaitAsync for async throttling, build async locks for mutual exclusion, and coordinate readers and writers in async code. +ms.date: 04/16/2026 +ai-usage: ai-assisted +dev_langs: + - "csharp" + - "vb" +helpviewer_keywords: + - "SemaphoreSlim.WaitAsync" + - "async lock" + - "async semaphore" + - "AsyncLock" + - "reader/writer lock, async" + - "ConcurrentExclusiveSchedulerPair" + - "System.Threading.Channels" +--- + +# Async semaphores, locks, and reader/writer coordination + +When async code needs throttling, mutual exclusion, or reader/writer coordination, use the built-in .NET types rather than building your own. This article shows how to apply those types, and then walks through custom implementations to explain how they work internally. + +## Async semaphore — throttle concurrent access + +A semaphore limits how many callers can access a resource concurrently. provides a method that lets you await entry without blocking a thread: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="SemaphoreSlimUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="SemaphoreSlimUsage"::: + +Always pair `WaitAsync` with `Release` in a `try`/`finally` block. If you forget to release, the semaphore count never increases, and other callers wait indefinitely. + +### How an async semaphore works + +Internally, an async semaphore maintains a count and a queue of waiters. When the count is above zero, `WaitAsync` decrements the count and returns immediately. When the count is zero, `WaitAsync` enqueues a and returns its task. `Release` either dequeues a waiter and completes it, or increments the count: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="AsyncSemaphore"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="AsyncSemaphore"::: + +The `Release` method completes the `TaskCompletionSource` outside the lock, just like the `AsyncAutoResetEvent` in [Build async coordination primitives](async-coordination-primitives.md). This approach prevents synchronous continuations from running while the lock is held. + +> [!TIP] +> In production code, use instead of this custom type. `SemaphoreSlim.WaitAsync` supports cancellation tokens, timeouts, and has been thoroughly tested. + +## Async lock: mutual exclusion across awaits + +A lock with a count of 1 provides mutual exclusion. The C# `lock` statement and (.NET 9+) don't work across `await` boundaries because they're thread-affine. The thread that acquires the lock might not be the thread that resumes after the `await`. Use with a count of 1 instead: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="SemaphoreSlimAsLock"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="SemaphoreSlimAsLock"::: + +### How an async lock works + +You can wrap the semaphore pattern in a type that supports `using` for automatic release. The `LockAsync` method returns a disposable `Releaser`; when the `Releaser` is disposed, it releases the semaphore: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="AsyncLock"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="AsyncLock"::: + +Usage is concise and safe: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="AsyncLockUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="AsyncLockUsage"::: + +> [!TIP] +> In production code, use `SemaphoreSlim(1, 1)` directly with `try`/`finally`. The custom `AsyncLock` type shown here illustrates the disposable-releaser pattern but adds complexity without adding capabilities beyond what `SemaphoreSlim` provides. + +## Async reader/writer coordination + +A reader/writer lock allows multiple concurrent readers but only one exclusive writer. .NET provides , which offers reader/writer scheduling for tasks through two instances: + +- — runs tasks concurrently (like readers), as long as no exclusive task is active. +- — runs tasks exclusively (like writers), with no other tasks running. + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="ConcurrentExclusiveUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="ConcurrentExclusiveUsage"::: + +> [!IMPORTANT] +> `ConcurrentExclusiveSchedulerPair` protects at the task level, not across `await` boundaries. If a task queued to the `ExclusiveScheduler` contains an `await` on an incomplete operation, the exclusive lock releases when the `await` yields and reacquires when the continuation runs. Another exclusive or concurrent task can run during that gap. This behavior works well when you protect in-memory data structures and ensure no `await` interrupts the critical section. For scenarios that require holding the lock across awaits, use a custom `AsyncReaderWriterLock` like the one shown in the following section. + +### Custom async reader/writer lock + +The following implementation gives writers priority over readers. When a writer is waiting, new readers queue behind it. When a writer finishes and no other writers are waiting, all queued readers run together: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="AsyncReaderWriterLock"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="AsyncReaderWriterLock"::: + +Usage follows the same disposable-releaser pattern as `AsyncLock`: + +:::code language="csharp" source="./snippets/async-coordination-primitives-advanced/csharp/Program.cs" id="AsyncReaderWriterLockUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives-advanced/vb/Program.vb" id="AsyncReaderWriterLockUsage"::: + +> [!TIP] +> A production reader/writer lock requires thorough testing for edge cases: reentrancy, error paths, cancellation, and fairness policies. Consider established libraries (such as [Nito.AsyncEx](https://github.com/StephenCleary/AsyncEx)) before building your own. + +## Channels as an alternative coordination pattern + + provides a thread-safe producer-consumer queue that supports `async` reads and writes. Bounded channels () provide natural back-pressure, replacing some scenarios where you'd otherwise use a semaphore for throttling. + +For more information, see [System.Threading.Channels](/dotnet/core/extensions/channels). + +## See also + +- [Build async coordination primitives](async-coordination-primitives.md) +- [Keeping async methods alive](keeping-async-methods-alive.md) +- [Complete your tasks](complete-your-tasks.md) +- [Consuming the Task-based Asynchronous Pattern](consuming-the-task-based-asynchronous-pattern.md) diff --git a/docs/standard/asynchronous-programming-patterns/async-coordination-primitives.md b/docs/standard/asynchronous-programming-patterns/async-coordination-primitives.md new file mode 100644 index 0000000000000..2bea3b3243587 --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/async-coordination-primitives.md @@ -0,0 +1,104 @@ +--- +title: "Build async coordination primitives" +description: Learn how to build async coordination primitives using TaskCompletionSource, including manual-reset events, auto-reset events, countdown events, and barriers. +ms.date: 04/16/2026 +ai-usage: ai-assisted +dev_langs: + - "csharp" + - "vb" +helpviewer_keywords: + - "async coordination" + - "TaskCompletionSource" + - "AsyncManualResetEvent" + - "AsyncAutoResetEvent" + - "AsyncCountdownEvent" + - "AsyncBarrier" + - "coordination primitives" +--- + +# Build async coordination primitives + +Synchronous coordination primitives like , , and block the calling thread while waiting. In async code, blocking a thread wastes a resource that could be doing other work. Use to build async equivalents that let callers `await` instead of blocking. + +A `TaskCompletionSource` produces a that you complete manually by calling , , or . Code that awaits that task suspends without blocking a thread, and resumes when you complete the source. This pattern forms the building block for every primitive in this article. + +> [!NOTE] +> The primitives in this article are educational implementations. For production throttling and mutual exclusion, use the built-in types covered in [Async semaphores, locks, and reader/writer coordination](async-coordination-primitives-advanced.md). Always complete every `TaskCompletionSource` you create; see [Complete your tasks](complete-your-tasks.md) for guidance. + +## Async manual-reset event + +A manual-reset event starts in a non-signaled state. Callers wait for the event, and all waiters resume when another party signals (sets) the event. The event stays signaled until you explicitly reset it. The synchronous equivalent is . + +`TaskCompletionSource` is itself a one-shot manual-reset event: its `Task` is incomplete until you call a `Set*` method, and then all awaiters resume. Add a `Reset` method that swaps in a new `TaskCompletionSource`, and you have a reusable async manual-reset event. + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncManualResetEvent"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncManualResetEvent"::: + +Key implementation details: + +- The constructor passes to prevent `Set` from running waiter continuations synchronously on the calling thread. Without this flag, `Set` could block for an unpredictable amount of time. +- `Reset` uses to swap in a new `TaskCompletionSource` only when the current one is already completed. This atomic swap prevents orphaning a task that a waiter already received. + +The following example shows how two tasks coordinate through the event: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncManualResetEventUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncManualResetEventUsage"::: + +## Async auto-reset event + +An auto-reset event is similar to a manual-reset event, but it automatically returns to the non-signaled state after releasing exactly one waiter. If multiple callers are waiting when the event is signaled, only one waiter resumes. The synchronous equivalent is . + +Because each signal releases only one waiter, you need a collection of `TaskCompletionSource` instances—one per waiter—so you can complete them individually: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncAutoResetEvent"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncAutoResetEvent"::: + +Key implementation details: + +- The `Set` method completes the `TaskCompletionSource` *outside* the lock. Completing a TCS inside the lock runs synchronous continuations while the lock is held, which could cause deadlocks or unexpected reentrancy. +- When `Set` is called and no waiter is queued, the signal is stored so the next `WaitAsync` call completes immediately. + +The following example shows a producer signaling a consumer through the event: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncAutoResetEventUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncAutoResetEventUsage"::: + +## Async countdown event + +A countdown event waits for a specified number of signals before it allows waiters to proceed. This pattern is useful for fork/join scenarios where you start N operations and want to await all N completions. The synchronous equivalent is . + +Build the async version by composing the `AsyncManualResetEvent` from the previous section with an atomic counter: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncCountdownEvent"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncCountdownEvent"::: + +The `Signal` method decrements the count atomically with . When the count reaches zero, it sets the inner event, and all waiters resume. + +The following example uses a countdown event to await three concurrent operations: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncCountdownEventUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncCountdownEventUsage"::: + +## Async barrier + +A barrier coordinates a fixed set of participants across multiple rounds. Each participant signals when it finishes its work for the current round and then waits for all other participants to finish. When the last participant signals, all participants resume, and the barrier resets for the next round. The synchronous equivalent is . + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncBarrier"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncBarrier"::: + +Key implementation details: + +- Before completing the shared `TaskCompletionSource`, the method resets the count and swaps in a new `TaskCompletionSource` for the next round. This ordering ensures that when waiters resume, the barrier is already ready for the next round. +- All participants share the same `Task`, which means all synchronous continuations run in series on the thread that completes the task. If that serialization is a concern, give each participant its own `TaskCompletionSource` and complete them in parallel. + +The following example runs three participants through two rounds of a barrier: + +:::code language="csharp" source="./snippets/async-coordination-primitives/csharp/Program.cs" id="AsyncBarrierUsage"::: +:::code language="vb" source="./snippets/async-coordination-primitives/vb/Program.vb" id="AsyncBarrierUsage"::: + +## See also + +- [Async semaphores, locks, and reader/writer coordination](async-coordination-primitives-advanced.md) +- [Task-based asynchronous pattern (TAP)](task-based-asynchronous-pattern-tap.md) +- [Complete your tasks](complete-your-tasks.md) +- [Keeping async methods alive](keeping-async-methods-alive.md) diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/AsyncCoordinationPrimitivesAdvanced.csproj b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/AsyncCoordinationPrimitivesAdvanced.csproj new file mode 100644 index 0000000000000..dfb40caafcf9a --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/AsyncCoordinationPrimitivesAdvanced.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/Program.cs b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/Program.cs new file mode 100644 index 0000000000000..a6200e281f630 --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/csharp/Program.cs @@ -0,0 +1,375 @@ +using System.Collections.Concurrent; + +// +public static class SemaphoreSlimDemo +{ + public static async Task RunAsync() + { + using var semaphore = new SemaphoreSlim(3); + + Task[] tasks = Enumerable.Range(1, 6).Select(id => Task.Run(async () => + { + await semaphore.WaitAsync(); + try + { + Console.WriteLine($"Task {id}: entered (count = {semaphore.CurrentCount})"); + await Task.Delay(100); + } + finally + { + semaphore.Release(); + Console.WriteLine($"Task {id}: released"); + } + })).ToArray(); + + await Task.WhenAll(tasks); + } +} +// + +// +public class AsyncSemaphore +{ + private readonly Queue _waiters = new(); + private int _currentCount; + + public AsyncSemaphore(int initialCount) + { + ArgumentOutOfRangeException.ThrowIfNegative(initialCount, nameof(initialCount)); + _currentCount = initialCount; + } + + public Task WaitAsync() + { + lock (_waiters) + { + if (_currentCount > 0) + { + _currentCount--; + return Task.CompletedTask; + } + else + { + var waiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _waiters.Enqueue(waiter); + return waiter.Task; + } + } + } + + public void Release() + { + TaskCompletionSource? toRelease = null; + + lock (_waiters) + { + if (_waiters.Count > 0) + toRelease = _waiters.Dequeue(); + else + _currentCount++; + } + + toRelease?.TrySetResult(); + } +} +// + +// +public static class SemaphoreSlimAsLockDemo +{ + private static readonly SemaphoreSlim s_lock = new(1, 1); + private static int s_sharedCounter; + + public static async Task RunAsync() + { + Task[] tasks = Enumerable.Range(1, 5).Select(_ => Task.Run(async () => + { + await s_lock.WaitAsync(); + try + { + int before = s_sharedCounter; + await Task.Delay(10); + s_sharedCounter = before + 1; + } + finally + { + s_lock.Release(); + } + })).ToArray(); + + await Task.WhenAll(tasks); + Console.WriteLine($"Counter = {s_sharedCounter} (expected 5)"); + } +} +// + +// +public class AsyncLock : IDisposable +{ + private readonly SemaphoreSlim _semaphore = new(1, 1); + private readonly Task _releaser; + + public AsyncLock() + { + _releaser = Task.FromResult(new Releaser(this)); + } + + public Task LockAsync() + { + Task wait = _semaphore.WaitAsync(); + return wait.IsCompleted + ? _releaser + : wait.ContinueWith( + (_, state) => new Releaser((AsyncLock)state!), + this, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + + public struct Releaser : IDisposable + { + private readonly AsyncLock? _toRelease; + + internal Releaser(AsyncLock toRelease) => _toRelease = toRelease; + + public void Dispose() => _toRelease?._semaphore.Release(); + } + + public void Dispose() => _semaphore.Dispose(); +} +// + +// +public static class AsyncLockDemo +{ + private static readonly AsyncLock s_lock = new(); + private static int s_sharedValue; + + public static async Task RunAsync() + { + Task[] tasks = Enumerable.Range(1, 5).Select(id => Task.Run(async () => + { + using (await s_lock.LockAsync()) + { + int before = s_sharedValue; + await Task.Delay(10); + s_sharedValue = before + 1; + Console.WriteLine($"Task {id}: incremented to {s_sharedValue}"); + } + })).ToArray(); + + await Task.WhenAll(tasks); + Console.WriteLine($"Final value = {s_sharedValue} (expected 5)"); + } +} +// + +// +public static class ConcurrentExclusiveDemo +{ + public static async Task RunAsync() + { + var pair = new ConcurrentExclusiveSchedulerPair(); + var factory = new TaskFactory(pair.ExclusiveScheduler); + + int sharedValue = 0; + + Task writerTask = factory.StartNew(() => + { + sharedValue = 42; + Console.WriteLine($"Writer: set value to {sharedValue}"); + }); + + var readerFactory = new TaskFactory(pair.ConcurrentScheduler); + + Task[] readerTasks = Enumerable.Range(1, 3).Select(id => + readerFactory.StartNew(() => + { + Console.WriteLine($"Reader {id}: value = {sharedValue}"); + })).ToArray(); + + await writerTask; + await Task.WhenAll(readerTasks); + } +} +// + +// +public class AsyncReaderWriterLock +{ + private readonly Queue> _waitingWriters = new(); + private TaskCompletionSource _waitingReader = new(); + private int _readersWaiting; + private int _status; // 0 = free, -1 = writer active, >0 = reader count + + private readonly Task _readerReleaser; + private readonly Task _writerReleaser; + + public AsyncReaderWriterLock() + { + _readerReleaser = Task.FromResult(new Releaser(this, isWriter: false)); + _writerReleaser = Task.FromResult(new Releaser(this, isWriter: true)); + } + + public Task ReaderLockAsync() + { + lock (_waitingWriters) + { + if (_status >= 0 && _waitingWriters.Count == 0) + { + _status++; + return _readerReleaser; + } + else + { + _readersWaiting++; + return _waitingReader.Task.ContinueWith(t => t.Result); + } + } + } + + public Task WriterLockAsync() + { + lock (_waitingWriters) + { + if (_status == 0) + { + _status = -1; + return _writerReleaser; + } + else + { + var waiter = new TaskCompletionSource(); + _waitingWriters.Enqueue(waiter); + return waiter.Task; + } + } + } + + private void ReaderRelease() + { + TaskCompletionSource? toWake = null; + + lock (_waitingWriters) + { + _status--; + if (_status == 0 && _waitingWriters.Count > 0) + { + _status = -1; + toWake = _waitingWriters.Dequeue(); + } + } + + toWake?.SetResult(new Releaser(this, isWriter: true)); + } + + private void WriterRelease() + { + TaskCompletionSource? toWake = null; + bool toWakeIsWriter = false; + + lock (_waitingWriters) + { + if (_waitingWriters.Count > 0) + { + toWake = _waitingWriters.Dequeue(); + toWakeIsWriter = true; + } + else if (_readersWaiting > 0) + { + toWake = _waitingReader; + _status = _readersWaiting; + _readersWaiting = 0; + _waitingReader = new TaskCompletionSource(); + } + else + { + _status = 0; + } + } + + toWake?.SetResult(new Releaser(this, toWakeIsWriter)); + } + + public struct Releaser : IDisposable + { + private readonly AsyncReaderWriterLock? _lock; + private readonly bool _isWriter; + + internal Releaser(AsyncReaderWriterLock lockObj, bool isWriter) + { + _lock = lockObj; + _isWriter = isWriter; + } + + public void Dispose() + { + if (_lock is not null) + { + if (_isWriter) _lock.WriterRelease(); + else _lock.ReaderRelease(); + } + } + } +} +// + +// +public static class AsyncReaderWriterLockDemo +{ + private static readonly AsyncReaderWriterLock s_rwLock = new(); + private static string s_data = "initial"; + + public static async Task RunAsync() + { + Task writer = Task.Run(async () => + { + using (await s_rwLock.WriterLockAsync()) + { + Console.WriteLine("Writer: acquired exclusive lock"); + await Task.Delay(50); + s_data = "updated"; + Console.WriteLine("Writer: data updated"); + } + }); + + Task[] readers = Enumerable.Range(1, 3).Select(id => Task.Run(async () => + { + await Task.Delay(10); + using (await s_rwLock.ReaderLockAsync()) + { + Console.WriteLine($"Reader {id}: data = {s_data}"); + } + })).ToArray(); + + await writer; + await Task.WhenAll(readers); + } +} +// + +public static class Program +{ + public static async Task Main() + { + Console.WriteLine("--- SemaphoreSlim ---"); + await SemaphoreSlimDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- SemaphoreSlim as lock ---"); + await SemaphoreSlimAsLockDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- AsyncLock ---"); + await AsyncLockDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- ConcurrentExclusiveSchedulerPair ---"); + await ConcurrentExclusiveDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- AsyncReaderWriterLock ---"); + await AsyncReaderWriterLockDemo.RunAsync(); + } +} diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/AsyncCoordinationPrimitivesAdvanced.vbproj b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/AsyncCoordinationPrimitivesAdvanced.vbproj new file mode 100644 index 0000000000000..219b1b9e39ace --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/AsyncCoordinationPrimitivesAdvanced.vbproj @@ -0,0 +1,9 @@ + + + + Exe + AsyncCoordinationPrimitivesAdvanced + net10.0 + + + diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/Program.vb b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/Program.vb new file mode 100644 index 0000000000000..4c4ce7483885f --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives-advanced/vb/Program.vb @@ -0,0 +1,331 @@ +Imports System.Collections.Concurrent +Imports System.Threading + +' +Public Module SemaphoreSlimDemo + Public Async Function RunAsync() As Task + Using semaphore As New SemaphoreSlim(3) + Dim tasks As Task() = Enumerable.Range(1, 6).Select( + Function(id) Task.Run(Async Function() + Await semaphore.WaitAsync() + Try + Console.WriteLine($"Task {id}: entered (count = {semaphore.CurrentCount})") + Await Task.Delay(100) + Finally + semaphore.Release() + Console.WriteLine($"Task {id}: released") + End Try + End Function)).ToArray() + + Await Task.WhenAll(tasks) + End Using + End Function +End Module +' + +' +Public Class AsyncSemaphore + Private ReadOnly _waiters As New Queue(Of TaskCompletionSource)() + Private _currentCount As Integer + + Public Sub New(initialCount As Integer) + If initialCount < 0 Then Throw New ArgumentOutOfRangeException(NameOf(initialCount)) + _currentCount = initialCount + End Sub + + Public Function WaitAsync() As Task + SyncLock _waiters + If _currentCount > 0 Then + _currentCount -= 1 + Return Task.CompletedTask + Else + Dim waiter As New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) + _waiters.Enqueue(waiter) + Return waiter.Task + End If + End SyncLock + End Function + + Public Sub Release() + Dim toRelease As TaskCompletionSource = Nothing + + SyncLock _waiters + If _waiters.Count > 0 Then + toRelease = _waiters.Dequeue() + Else + _currentCount += 1 + End If + End SyncLock + + toRelease?.TrySetResult() + End Sub +End Class +' + +' +Public Module SemaphoreSlimAsLockDemo + Private ReadOnly s_lock As New SemaphoreSlim(1, 1) + Private s_sharedCounter As Integer + + Public Async Function RunAsync() As Task + Dim tasks As Task() = Enumerable.Range(1, 5).Select( + Function(unused) Task.Run(Async Function() + Await s_lock.WaitAsync() + Try + Dim before As Integer = s_sharedCounter + Await Task.Delay(10) + s_sharedCounter = before + 1 + Finally + s_lock.Release() + End Try + End Function)).ToArray() + + Await Task.WhenAll(tasks) + Console.WriteLine($"Counter = {s_sharedCounter} (expected 5)") + End Function +End Module +' + +' +Public Class AsyncLock + Implements IDisposable + + Private ReadOnly _semaphore As New SemaphoreSlim(1, 1) + Private ReadOnly _releaser As Task(Of Releaser) + + Public Sub New() + _releaser = Task.FromResult(New Releaser(Me)) + End Sub + + Public Function LockAsync() As Task(Of Releaser) + Dim wait As Task = _semaphore.WaitAsync() + If wait.IsCompleted Then + Return _releaser + Else + Return wait.ContinueWith( + Function(unused, state) New Releaser(DirectCast(state, AsyncLock)), + Me, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default) + End If + End Function + + Public Structure Releaser + Implements IDisposable + + Private ReadOnly _toRelease As AsyncLock + + Friend Sub New(toRelease As AsyncLock) + _toRelease = toRelease + End Sub + + Public Sub Dispose() Implements IDisposable.Dispose + _toRelease?._semaphore.Release() + End Sub + End Structure + + Public Sub Dispose() Implements IDisposable.Dispose + _semaphore.Dispose() + End Sub +End Class +' + +' +Public Module AsyncLockDemo + Private ReadOnly s_lock As New AsyncLock() + Private s_sharedValue As Integer + + Public Async Function RunAsync() As Task + Dim tasks As Task() = Enumerable.Range(1, 5).Select( + Function(id) Task.Run(Async Function() + Using Await s_lock.LockAsync() + Dim before As Integer = s_sharedValue + Await Task.Delay(10) + s_sharedValue = before + 1 + Console.WriteLine($"Task {id}: incremented to {s_sharedValue}") + End Using + End Function)).ToArray() + + Await Task.WhenAll(tasks) + Console.WriteLine($"Final value = {s_sharedValue} (expected 5)") + End Function +End Module +' + +' +Public Module ConcurrentExclusiveDemo + Public Async Function RunAsync() As Task + Dim pair As New ConcurrentExclusiveSchedulerPair() + Dim exclusiveFactory As New TaskFactory(pair.ExclusiveScheduler) + + Dim sharedValue As Integer = 0 + + Dim writerTask As Task = exclusiveFactory.StartNew(Sub() + sharedValue = 42 + Console.WriteLine($"Writer: set value to {sharedValue}") + End Sub) + + Dim readerFactory As New TaskFactory(pair.ConcurrentScheduler) + + Dim readerTasks As Task() = Enumerable.Range(1, 3).Select( + Function(id) readerFactory.StartNew(Sub() + Console.WriteLine($"Reader {id}: value = {sharedValue}") + End Sub)).ToArray() + + Await writerTask + Await Task.WhenAll(readerTasks) + End Function +End Module +' + +' +Public Class AsyncReaderWriterLock + Private ReadOnly _waitingWriters As New Queue(Of TaskCompletionSource(Of Releaser))() + Private _waitingReader As New TaskCompletionSource(Of Releaser)() + Private _readersWaiting As Integer + Private _status As Integer ' 0 = free, -1 = writer active, >0 = reader count + + Private ReadOnly _readerReleaser As Task(Of Releaser) + Private ReadOnly _writerReleaser As Task(Of Releaser) + + Public Sub New() + _readerReleaser = Task.FromResult(New Releaser(Me, isWriter:=False)) + _writerReleaser = Task.FromResult(New Releaser(Me, isWriter:=True)) + End Sub + + Public Function ReaderLockAsync() As Task(Of Releaser) + SyncLock _waitingWriters + If _status >= 0 AndAlso _waitingWriters.Count = 0 Then + _status += 1 + Return _readerReleaser + Else + _readersWaiting += 1 + Return _waitingReader.Task.ContinueWith(Function(t) t.Result) + End If + End SyncLock + End Function + + Public Function WriterLockAsync() As Task(Of Releaser) + SyncLock _waitingWriters + If _status = 0 Then + _status = -1 + Return _writerReleaser + Else + Dim waiter As New TaskCompletionSource(Of Releaser)() + _waitingWriters.Enqueue(waiter) + Return waiter.Task + End If + End SyncLock + End Function + + Private Sub ReaderRelease() + Dim toWake As TaskCompletionSource(Of Releaser) = Nothing + + SyncLock _waitingWriters + _status -= 1 + If _status = 0 AndAlso _waitingWriters.Count > 0 Then + _status = -1 + toWake = _waitingWriters.Dequeue() + End If + End SyncLock + + toWake?.SetResult(New Releaser(Me, isWriter:=True)) + End Sub + + Private Sub WriterRelease() + Dim toWake As TaskCompletionSource(Of Releaser) = Nothing + Dim toWakeIsWriter As Boolean = False + + SyncLock _waitingWriters + If _waitingWriters.Count > 0 Then + toWake = _waitingWriters.Dequeue() + toWakeIsWriter = True + ElseIf _readersWaiting > 0 Then + toWake = _waitingReader + _status = _readersWaiting + _readersWaiting = 0 + _waitingReader = New TaskCompletionSource(Of Releaser)() + Else + _status = 0 + End If + End SyncLock + + toWake?.SetResult(New Releaser(Me, toWakeIsWriter)) + End Sub + + Public Structure Releaser + Implements IDisposable + + Private ReadOnly _lock As AsyncReaderWriterLock + Private ReadOnly _isWriter As Boolean + + Friend Sub New(lockObj As AsyncReaderWriterLock, isWriter As Boolean) + _lock = lockObj + _isWriter = isWriter + End Sub + + Public Sub Dispose() Implements IDisposable.Dispose + If _lock IsNot Nothing Then + If _isWriter Then + _lock.WriterRelease() + Else + _lock.ReaderRelease() + End If + End If + End Sub + End Structure +End Class +' + +' +Public Module AsyncReaderWriterLockDemo + Private ReadOnly s_rwLock As New AsyncReaderWriterLock() + Private s_data As String = "initial" + + Public Async Function RunAsync() As Task + Dim writer As Task = Task.Run(Async Function() + Using Await s_rwLock.WriterLockAsync() + Console.WriteLine("Writer: acquired exclusive lock") + Await Task.Delay(50) + s_data = "updated" + Console.WriteLine("Writer: data updated") + End Using + End Function) + + Dim readers As Task() = Enumerable.Range(1, 3).Select( + Function(id) Task.Run(Async Function() + Await Task.Delay(10) + Using Await s_rwLock.ReaderLockAsync() + Console.WriteLine($"Reader {id}: data = {s_data}") + End Using + End Function)).ToArray() + + Await writer + Await Task.WhenAll(readers) + End Function +End Module +' + +Module Program + Sub Main() + Console.WriteLine("--- SemaphoreSlim ---") + SemaphoreSlimDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- SemaphoreSlim as lock ---") + SemaphoreSlimAsLockDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- AsyncLock ---") + AsyncLockDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- ConcurrentExclusiveSchedulerPair ---") + ConcurrentExclusiveDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- AsyncReaderWriterLock ---") + AsyncReaderWriterLockDemo.RunAsync().Wait() + End Sub +End Module diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/AsyncCoordinationPrimitives.csproj b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/AsyncCoordinationPrimitives.csproj new file mode 100644 index 0000000000000..dfb40caafcf9a --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/AsyncCoordinationPrimitives.csproj @@ -0,0 +1,10 @@ + + + + Exe + net10.0 + enable + enable + + + diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/Program.cs b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/Program.cs new file mode 100644 index 0000000000000..659f2ce3c41c7 --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/csharp/Program.cs @@ -0,0 +1,256 @@ +// +public class AsyncManualResetEvent +{ + private volatile TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public Task WaitAsync() => _tcs.Task; + + public void Set() => _tcs.TrySetResult(); + + public void Reset() + { + while (true) + { + TaskCompletionSource tcs = _tcs; + if (!tcs.Task.IsCompleted || + Interlocked.CompareExchange( + ref _tcs, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + tcs) == tcs) + { + return; + } + } + } +} +// + +// +public static class AsyncManualResetEventDemo +{ + public static async Task RunAsync() + { + var gate = new AsyncManualResetEvent(); + + Task waiter = Task.Run(async () => + { + Console.WriteLine("Waiter: waiting for signal..."); + await gate.WaitAsync(); + Console.WriteLine("Waiter: signal received!"); + }); + + await Task.Delay(100); + Console.WriteLine("Signaler: setting the event."); + gate.Set(); + + await waiter; + } +} +// + +// +public class AsyncAutoResetEvent +{ + private readonly Queue _waiters = new(); + private bool _signaled; + + public Task WaitAsync() + { + lock (_waiters) + { + if (_signaled) + { + _signaled = false; + return Task.CompletedTask; + } + else + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _waiters.Enqueue(tcs); + return tcs.Task; + } + } + } + + public void Set() + { + TaskCompletionSource? toRelease = null; + + lock (_waiters) + { + if (_waiters.Count > 0) + { + toRelease = _waiters.Dequeue(); + } + else if (!_signaled) + { + _signaled = true; + } + } + + toRelease?.TrySetResult(); + } +} +// + +// +public static class AsyncAutoResetEventDemo +{ + public static async Task RunAsync() + { + var autoEvent = new AsyncAutoResetEvent(); + + Task consumer = Task.Run(async () => + { + for (int i = 0; i < 3; i++) + { + await autoEvent.WaitAsync(); + Console.WriteLine($"Consumer: received signal {i + 1}"); + } + }); + + for (int i = 0; i < 3; i++) + { + await Task.Delay(50); + Console.WriteLine($"Producer: sending signal {i + 1}"); + autoEvent.Set(); + } + + await consumer; + } +} +// + +// +public class AsyncCountdownEvent +{ + private readonly AsyncManualResetEvent _event = new(); + private int _count; + + public AsyncCountdownEvent(int initialCount) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(initialCount, nameof(initialCount)); + _count = initialCount; + } + + public Task WaitAsync() => _event.WaitAsync(); + + public void Signal() + { + if (_count <= 0) + throw new InvalidOperationException("The event is already signaled."); + + int newCount = Interlocked.Decrement(ref _count); + + if (newCount == 0) + _event.Set(); + else if (newCount < 0) + throw new InvalidOperationException("Too many signals."); + } + + public Task SignalAndWait() + { + Signal(); + return WaitAsync(); + } +} +// + +// +public static class AsyncCountdownEventDemo +{ + public static async Task RunAsync() + { + var countdown = new AsyncCountdownEvent(3); + + for (int i = 1; i <= 3; i++) + { + int id = i; + _ = Task.Run(async () => + { + await Task.Delay(id * 30); + Console.WriteLine($"Worker {id}: done."); + countdown.Signal(); + }); + } + + await countdown.WaitAsync(); + Console.WriteLine("All workers finished."); + } +} +// + +// +public class AsyncBarrier +{ + private readonly int _participantCount; + private int _remainingParticipants; + private TaskCompletionSource _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public AsyncBarrier(int participantCount) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(participantCount, nameof(participantCount)); + _remainingParticipants = _participantCount = participantCount; + } + + public Task SignalAndWait() + { + TaskCompletionSource tcs = _tcs; + + if (Interlocked.Decrement(ref _remainingParticipants) == 0) + { + _remainingParticipants = _participantCount; + _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + tcs.SetResult(); + } + + return tcs.Task; + } +} +// + +// +public static class AsyncBarrierDemo +{ + public static async Task RunAsync() + { + var barrier = new AsyncBarrier(3); + int rounds = 2; + + Task[] participants = Enumerable.Range(1, 3).Select(id => Task.Run(async () => + { + for (int round = 1; round <= rounds; round++) + { + Console.WriteLine($"Participant {id}: working on round {round}"); + await Task.Delay(id * 20); + Console.WriteLine($"Participant {id}: finished round {round}, waiting at barrier"); + await barrier.SignalAndWait(); + } + })).ToArray(); + + await Task.WhenAll(participants); + Console.WriteLine("All rounds complete."); + } +} +// + +public static class Program +{ + public static async Task Main() + { + Console.WriteLine("--- AsyncManualResetEvent ---"); + await AsyncManualResetEventDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- AsyncAutoResetEvent ---"); + await AsyncAutoResetEventDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- AsyncCountdownEvent ---"); + await AsyncCountdownEventDemo.RunAsync(); + + Console.WriteLine(); + Console.WriteLine("--- AsyncBarrier ---"); + await AsyncBarrierDemo.RunAsync(); + } +} diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/AsyncCoordinationPrimitives.vbproj b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/AsyncCoordinationPrimitives.vbproj new file mode 100644 index 0000000000000..d91391285e5e7 --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/AsyncCoordinationPrimitives.vbproj @@ -0,0 +1,9 @@ + + + + Exe + AsyncCoordinationPrimitives + net10.0 + + + diff --git a/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/Program.vb b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/Program.vb new file mode 100644 index 0000000000000..4b017fa20c951 --- /dev/null +++ b/docs/standard/asynchronous-programming-patterns/snippets/async-coordination-primitives/vb/Program.vb @@ -0,0 +1,227 @@ +Imports System.Threading + +' +Public Class AsyncManualResetEvent + Private _tcs As TaskCompletionSource = New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) + + Public Function WaitAsync() As Task + Return _tcs.Task + End Function + + Public Sub [Set]() + _tcs.TrySetResult() + End Sub + + Public Sub Reset() + Do + Dim tcs As TaskCompletionSource = _tcs + If Not tcs.Task.IsCompleted OrElse + Interlocked.CompareExchange( + _tcs, + New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), + tcs) Is tcs Then + Return + End If + Loop + End Sub +End Class +' + +' +Public Module AsyncManualResetEventDemo + Public Async Function RunAsync() As Task + Dim gate As New AsyncManualResetEvent() + + Dim waiter As Task = Task.Run(Async Function() + Console.WriteLine("Waiter: waiting for signal...") + Await gate.WaitAsync() + Console.WriteLine("Waiter: signal received!") + End Function) + + Await Task.Delay(100) + Console.WriteLine("Signaler: setting the event.") + gate.Set() + + Await waiter + End Function +End Module +' + +' +Public Class AsyncAutoResetEvent + Private ReadOnly _waiters As New Queue(Of TaskCompletionSource)() + Private _signaled As Boolean + + Public Function WaitAsync() As Task + SyncLock _waiters + If _signaled Then + _signaled = False + Return Task.CompletedTask + Else + Dim tcs As New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) + _waiters.Enqueue(tcs) + Return tcs.Task + End If + End SyncLock + End Function + + Public Sub [Set]() + Dim toRelease As TaskCompletionSource = Nothing + + SyncLock _waiters + If _waiters.Count > 0 Then + toRelease = _waiters.Dequeue() + ElseIf Not _signaled Then + _signaled = True + End If + End SyncLock + + toRelease?.TrySetResult() + End Sub +End Class +' + +' +Public Module AsyncAutoResetEventDemo + Public Async Function RunAsync() As Task + Dim autoEvent As New AsyncAutoResetEvent() + + Dim consumer As Task = Task.Run(Async Function() + For i As Integer = 0 To 2 + Await autoEvent.WaitAsync() + Console.WriteLine($"Consumer: received signal {i + 1}") + Next + End Function) + + For i As Integer = 0 To 2 + Await Task.Delay(50) + Console.WriteLine($"Producer: sending signal {i + 1}") + autoEvent.Set() + Next + + Await consumer + End Function +End Module +' + +' +Public Class AsyncCountdownEvent + Private ReadOnly _event As New AsyncManualResetEvent() + Private _count As Integer + + Public Sub New(initialCount As Integer) + If initialCount <= 0 Then Throw New ArgumentOutOfRangeException(NameOf(initialCount)) + _count = initialCount + End Sub + + Public Function WaitAsync() As Task + Return _event.WaitAsync() + End Function + + Public Sub Signal() + If _count <= 0 Then + Throw New InvalidOperationException("The event is already signaled.") + End If + + Dim newCount As Integer = Interlocked.Decrement(_count) + + If newCount = 0 Then + _event.Set() + ElseIf newCount < 0 Then + Throw New InvalidOperationException("Too many signals.") + End If + End Sub + + Public Function SignalAndWait() As Task + Signal() + Return WaitAsync() + End Function +End Class +' + +' +Public Module AsyncCountdownEventDemo + Public Async Function RunAsync() As Task + Dim countdown As New AsyncCountdownEvent(3) + + For i As Integer = 1 To 3 + Dim id As Integer = i + Dim backgroundTask As Task = Task.Run(Async Function() + Await Task.Delay(id * 30) + Console.WriteLine($"Worker {id}: done.") + countdown.Signal() + End Function) + Next + + Await countdown.WaitAsync() + Console.WriteLine("All workers finished.") + End Function +End Module +' + +' +Public Class AsyncBarrier + Private ReadOnly _participantCount As Integer + Private _remainingParticipants As Integer + Private _tcs As TaskCompletionSource = New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) + + Public Sub New(participantCount As Integer) + If participantCount <= 0 Then Throw New ArgumentOutOfRangeException(NameOf(participantCount)) + _participantCount = participantCount + _remainingParticipants = participantCount + End Sub + + Public Function SignalAndWait() As Task + Dim tcs As TaskCompletionSource = _tcs + + If Interlocked.Decrement(_remainingParticipants) = 0 Then + _remainingParticipants = _participantCount + _tcs = New TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) + tcs.SetResult() + End If + + Return tcs.Task + End Function +End Class +' + +' +Public Module AsyncBarrierDemo + Public Async Function RunAsync() As Task + Dim barrier As New AsyncBarrier(3) + Dim rounds As Integer = 2 + + Dim participants As Task() = Enumerable.Range(1, 3).Select( + Function(id) Task.Run(Async Function() + For round As Integer = 1 To rounds + Console.WriteLine($"Participant {id}: working on round {round}") + Await Task.Delay(id * 20) + Console.WriteLine($"Participant {id}: finished round {round}, waiting at barrier") + Await barrier.SignalAndWait() + Next + End Function)).ToArray() + + Await Task.WhenAll(participants) + Console.WriteLine("All rounds complete.") + End Function +End Module +' + +Module Program + Sub Main() + Console.WriteLine("--- AsyncManualResetEvent ---") + AsyncManualResetEventDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- AsyncAutoResetEvent ---") + AsyncAutoResetEventDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- AsyncCountdownEvent ---") + AsyncCountdownEventDemo.RunAsync().Wait() + + Console.WriteLine() + Console.WriteLine("--- AsyncBarrier ---") + AsyncBarrierDemo.RunAsync().Wait() + End Sub +End Module