From 8890826ddb8dde1395f6851681e7ace51d007c72 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Wed, 13 May 2026 23:43:23 -0400 Subject: [PATCH] chore(bus): cover serial queue processing Signed-off-by: Yordis Prieto --- .../Bus/QueuedHandlerThreadPoolTests.cs | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 src/EventStore.Core.XUnit.Tests/Bus/QueuedHandlerThreadPoolTests.cs diff --git a/src/EventStore.Core.XUnit.Tests/Bus/QueuedHandlerThreadPoolTests.cs b/src/EventStore.Core.XUnit.Tests/Bus/QueuedHandlerThreadPoolTests.cs new file mode 100644 index 000000000..3fa54fc27 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Bus/QueuedHandlerThreadPoolTests.cs @@ -0,0 +1,99 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using EventStore.Core.Bus; +using EventStore.Core.Messaging; +using EventStore.Core.Metrics; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Bus; + +public class QueuedHandlerThreadPoolTests +{ + [Fact] + public async Task processes_messages_serially() + { + const int messageCount = 8; + var consumer = new RecordingConsumer(messageCount, TimeSpan.FromMilliseconds(20)); + var queue = new QueuedHandlerThreadPool( + consumer, + "serial-queue-test", + new QueueStatsManager(), + new QueueTrackers(), + watchSlowMsg: false, + threadStopWaitTimeout: TimeSpan.FromSeconds(5)); + + _ = queue.Start(); + try + { + for (var i = 0; i < messageCount; i++) + { + queue.Publish(new TestMessage()); + } + + await consumer.WaitUntilComplete(TimeSpan.FromSeconds(5)); + + Assert.Equal(1, consumer.MaxConcurrency); + Assert.Equal(messageCount, consumer.HandledCount); + } + finally + { + await queue.Stop(); + } + } + + private sealed class RecordingConsumer(int expectedCount, TimeSpan delay) : IAsyncHandle + { + private readonly TaskCompletionSource _complete = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + private int _handledCount; + private int _inFlight; + private int _maxConcurrency; + + public int HandledCount => Volatile.Read(ref _handledCount); + + public int MaxConcurrency => Volatile.Read(ref _maxConcurrency); + + public async ValueTask HandleAsync(Message message, CancellationToken token) + { + var current = Interlocked.Increment(ref _inFlight); + RecordMaxConcurrency(current); + try + { + await Task.Delay(delay, token); + } + finally + { + Interlocked.Decrement(ref _inFlight); + } + + if (Interlocked.Increment(ref _handledCount) == expectedCount) + { + _complete.TrySetResult(); + } + } + + public Task WaitUntilComplete(TimeSpan timeout) => + _complete.Task.WaitAsync(timeout); + + private void RecordMaxConcurrency(int current) + { + while (true) + { + var observed = Volatile.Read(ref _maxConcurrency); + if (current <= observed) + { + return; + } + + if (Interlocked.CompareExchange(ref _maxConcurrency, current, observed) == observed) + { + return; + } + } + } + } + + private sealed class TestMessage : Message; +}