diff --git a/src/EventStore.ClusterNode/metricsconfig.json b/src/EventStore.ClusterNode/metricsconfig.json index b8389696e..b1bc02497 100644 --- a/src/EventStore.ClusterNode/metricsconfig.json +++ b/src/EventStore.ClusterNode/metricsconfig.json @@ -167,6 +167,10 @@ "Regex": "Timer", "Label": "Timer" }, + { + "Regex": "ThreadPoolBacklog", + "Label": "ThreadPoolBacklog" + }, { "Regex": "StorageReaderQueue #.*", "Label": "Readers" diff --git a/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs b/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs new file mode 100644 index 000000000..3709238ac --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs @@ -0,0 +1,46 @@ +using EventStore.Core.Bus; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Metrics; + +public class QueueStatsCollectorTests +{ + [Fact] + public void statistics_include_the_current_queue_length_in_the_peak() + { + var collector = new QueueStatsCollector("queue"); + collector.Start(); + try + { + var stats = collector.GetStatistics(currentQueueLength: 5); + + Assert.Equal(5, stats.Length); + Assert.Equal(5, stats.LengthCurrentTryPeak); + Assert.Equal(5, stats.LengthLifetimePeak); + } + finally + { + collector.Stop(); + } + } + + [Fact] + public void reported_queue_length_updates_the_peak() + { + var collector = new QueueStatsCollector("queue"); + collector.Start(); + try + { + collector.ReportQueueLength(7); + var stats = collector.GetStatistics(currentQueueLength: 3); + + Assert.Equal(3, stats.Length); + Assert.Equal(7, stats.LengthCurrentTryPeak); + Assert.Equal(7, stats.LengthLifetimePeak); + } + finally + { + collector.Stop(); + } + } +} diff --git a/src/EventStore.Core/Bus/QueueStatsCollector.cs b/src/EventStore.Core/Bus/QueueStatsCollector.cs index 467194c70..c760bd114 100644 --- a/src/EventStore.Core/Bus/QueueStatsCollector.cs +++ b/src/EventStore.Core/Bus/QueueStatsCollector.cs @@ -89,12 +89,17 @@ public void ProcessingStarted(int queueLength) public void ProcessingStarted(Type msgType, int queueLength) { - _lifetimeQueueLengthPeak = _lifetimeQueueLengthPeak > queueLength ? _lifetimeQueueLengthPeak : queueLength; - _currentQueueLengthPeak = _currentQueueLengthPeak > queueLength ? _currentQueueLengthPeak : queueLength; + ReportQueueLength(queueLength); _inProgressMsgType = msgType; } + public void ReportQueueLength(int queueLength) + { + _lifetimeQueueLengthPeak = Math.Max(_lifetimeQueueLengthPeak, queueLength); + _currentQueueLengthPeak = Math.Max(_currentQueueLengthPeak, queueLength); + } + public void ProcessingEnded(int itemsProcessed) { Interlocked.Add(ref _totalItems, itemsProcessed); @@ -159,6 +164,8 @@ public QueueStats GetStatistics(int currentQueueLength) { lock (_statisticsLock) { + ReportQueueLength(currentQueueLength); + var totalTime = _totalTimeWatch.Elapsed; var totalIdleTime = _totalIdleWatch.Elapsed; var totalBusyTime = _totalBusyWatch.Elapsed; diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index d7bfea6b4..a85980d0e 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -31,6 +31,7 @@ using EventStore.Core.LogAbstraction; using EventStore.Core.Messages; using EventStore.Core.Messaging; +using EventStore.Core.Metrics; using EventStore.Core.Services; using EventStore.Core.Services.Archive; using EventStore.Core.Services.Archive.Naming; @@ -205,6 +206,7 @@ internal MultiQueuedHandler WorkersHandler private readonly Func _start; private readonly INodeHttpClientFactory _nodeHttpClientFactory; private readonly EventStoreClusterClientCache _eventStoreClusterClientCache; + private ThreadPoolBacklogMonitor _threadPoolBacklogMonitor; private int _stopCalled; private int _systemInitPublished; @@ -634,6 +636,9 @@ void StartSubsystems() monitoringInnerBus.Subscribe(monitoring); monitoringInnerBus.Subscribe(monitoring); + _threadPoolBacklogMonitor = new ThreadPoolBacklogMonitor(_queueStatsManager, trackers.QueueTrackers); + _threadPoolBacklogMonitor.Start(); + var indexPath = options.Database.Index ?? Path.Combine(Db.Config.Path, ESConsts.DefaultIndexDirectoryName); var pTableMaxReaderCount = GetPTableMaxReaderCount(readerThreadsCount); @@ -2007,6 +2012,8 @@ public async ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, Can _reloadConfigSignalRegistration?.Dispose(); _reloadConfigSignalRegistration = null; + _threadPoolBacklogMonitor?.Dispose(); + _threadPoolBacklogMonitor = null; foreach (var subsystem in _subsystems ?? []) { diff --git a/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs b/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs new file mode 100644 index 000000000..b92cf3373 --- /dev/null +++ b/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs @@ -0,0 +1,105 @@ +using System; +using System.Threading; +using EventStore.Core.Bus; +using EventStore.Core.Services.Monitoring.Stats; +using EventStore.Core.Time; + +namespace EventStore.Core.Metrics; + +public sealed class ThreadPoolBacklogMonitor : IMonitoredQueue, IThreadPoolWorkItem, IDisposable +{ + private static readonly TimeSpan SampleInterval = TimeSpan.FromSeconds(1); + private const string QueueName = "ThreadPoolBacklog"; + + private readonly QueueStatsCollector _queueStats; + private readonly QueueTracker _tracker; + private readonly Timer _timer; + private Instant _enqueuedAt; + private int _stopped; + + public ThreadPoolBacklogMonitor(QueueStatsManager queueStatsManager, QueueTrackers trackers) + { + _queueStats = queueStatsManager.CreateQueueStatsCollector(QueueName); + _tracker = trackers.GetTrackerForQueue(QueueName); + _timer = new Timer(_ => QueueWorkItem()); + } + + public string Name => _queueStats.Name; + + public void Start() + { + _queueStats.Start(); + QueueMonitor.Default.Register(this); + QueueWorkItem(); + } + + public void Stop() + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } + + _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + QueueMonitor.Default.Unregister(this); + _queueStats.Stop(); + } + + public void Dispose() + { + Stop(); + _timer.Dispose(); + } + + public void Execute() + { + if (Volatile.Read(ref _stopped) != 0) + { + return; + } + + _queueStats.EnterBusy(); + try + { + var length = GetPendingWorkItemCount(); + _queueStats.ProcessingStarted(length); + _queueStats.ReportQueueLength(length); + _tracker.RecordQueueLength(length); + _tracker.RecordMessageDequeued(_enqueuedAt); + _queueStats.ProcessingEnded(1); + } + finally + { + _queueStats.EnterIdle(); + } + + if (Volatile.Read(ref _stopped) == 0) + { + _timer.Change(SampleInterval, Timeout.InfiniteTimeSpan); + } + } + + public QueueStats GetStatistics() => + _queueStats.GetStatistics(GetPendingWorkItemCount()); + + private void QueueWorkItem() + { + if (Volatile.Read(ref _stopped) != 0) + { + return; + } + + _enqueuedAt = _tracker.Now; + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + + private static int GetPendingWorkItemCount() + { + var count = ThreadPool.PendingWorkItemCount; + return count > int.MaxValue ? int.MaxValue : (int)count; + } + + private sealed class ThreadPoolBacklogSample + { + } +}