Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/EventStore.ClusterNode/metricsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@
"Regex": "Timer",
"Label": "Timer"
},
{
"Regex": "ThreadPoolBacklog",
"Label": "ThreadPoolBacklog"
},
{
"Regex": "StorageReaderQueue #.*",
"Label": "Readers"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using EventStore.Core.Bus;
using Xunit;

namespace EventStore.Core.XUnit.Tests.Metrics;

/// <summary>
/// Checks how <see cref="QueueStatsCollector"/> folds queue lengths into the
/// peak values exposed by its statistics.
/// </summary>
public class QueueStatsCollectorTests
{
    /// <summary>
    /// The length handed to <c>GetStatistics</c> must be reported as the current
    /// length and must also be reflected in both peak values.
    /// </summary>
    [Fact]
    public void statistics_include_the_current_queue_length_in_the_peak()
    {
        const int currentLength = 5;

        var sut = new QueueStatsCollector("queue");
        sut.Start();
        try
        {
            var snapshot = sut.GetStatistics(currentLength);

            Assert.Equal(currentLength, snapshot.Length);
            Assert.Equal(currentLength, snapshot.LengthCurrentTryPeak);
            Assert.Equal(currentLength, snapshot.LengthLifetimePeak);
        }
        finally
        {
            // Always stop the collector, even when an assertion fails.
            sut.Stop();
        }
    }

    /// <summary>
    /// A length reported earlier that is larger than the current one must remain
    /// visible as the peak, while the current length is reported unchanged.
    /// </summary>
    [Fact]
    public void reported_queue_length_updates_the_peak()
    {
        const int reportedLength = 7;
        const int currentLength = 3;

        var sut = new QueueStatsCollector("queue");
        sut.Start();
        try
        {
            sut.ReportQueueLength(reportedLength);
            var snapshot = sut.GetStatistics(currentLength);

            Assert.Equal(currentLength, snapshot.Length);
            Assert.Equal(reportedLength, snapshot.LengthCurrentTryPeak);
            Assert.Equal(reportedLength, snapshot.LengthLifetimePeak);
        }
        finally
        {
            // Always stop the collector, even when an assertion fails.
            sut.Stop();
        }
    }
}
11 changes: 9 additions & 2 deletions src/EventStore.Core/Bus/QueueStatsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,17 @@ public void ProcessingStarted<T>(int queueLength)

/// <summary>
/// Notes that processing of a message has begun: folds the observed queue length
/// into the lifetime and current peak values and remembers the type of the message
/// that is now in progress.
/// </summary>
/// <param name="msgType">Type of the message whose processing is starting.</param>
/// <param name="queueLength">Queue length observed when processing started.</param>
public void ProcessingStarted(Type msgType, int queueLength)
{
// NOTE(review): the two inline peak updates below compute the same maxima that
// ReportQueueLength applies on the following line; they look like the pre-change
// lines of this diff hunk shown next to their replacement - confirm before merging.
_lifetimeQueueLengthPeak = _lifetimeQueueLengthPeak > queueLength ? _lifetimeQueueLengthPeak : queueLength;
_currentQueueLengthPeak = _currentQueueLengthPeak > queueLength ? _currentQueueLengthPeak : queueLength;
ReportQueueLength(queueLength);

_inProgressMsgType = msgType;
}

/// <summary>
/// Raises the lifetime and current queue length peaks to
/// <paramref name="queueLength"/> when it exceeds the value recorded so far.
/// </summary>
/// <param name="queueLength">The queue length that was just observed.</param>
public void ReportQueueLength(int queueLength)
{
    if (queueLength > _lifetimeQueueLengthPeak)
    {
        _lifetimeQueueLengthPeak = queueLength;
    }

    if (queueLength > _currentQueueLengthPeak)
    {
        _currentQueueLengthPeak = queueLength;
    }
}

public void ProcessingEnded(int itemsProcessed)
{
Interlocked.Add(ref _totalItems, itemsProcessed);
Expand Down Expand Up @@ -159,6 +164,8 @@ public QueueStats GetStatistics(int currentQueueLength)
{
lock (_statisticsLock)
{
ReportQueueLength(currentQueueLength);

var totalTime = _totalTimeWatch.Elapsed;
var totalIdleTime = _totalIdleWatch.Elapsed;
var totalBusyTime = _totalBusyWatch.Elapsed;
Expand Down
7 changes: 7 additions & 0 deletions src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using EventStore.Core.LogAbstraction;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Core.Metrics;
using EventStore.Core.Services;
using EventStore.Core.Services.Archive;
using EventStore.Core.Services.Archive.Naming;
Expand Down Expand Up @@ -205,6 +206,7 @@ internal MultiQueuedHandler WorkersHandler
private readonly Func<CancellationToken, Task> _start;
private readonly INodeHttpClientFactory _nodeHttpClientFactory;
private readonly EventStoreClusterClientCache _eventStoreClusterClientCache;
private ThreadPoolBacklogMonitor _threadPoolBacklogMonitor;

private int _stopCalled;
private int _systemInitPublished;
Expand Down Expand Up @@ -634,6 +636,9 @@ void StartSubsystems()
monitoringInnerBus.Subscribe<MonitoringMessage.GetFreshStats>(monitoring);
monitoringInnerBus.Subscribe<MonitoringMessage.GetFreshTcpConnectionStats>(monitoring);

_threadPoolBacklogMonitor = new ThreadPoolBacklogMonitor(_queueStatsManager, trackers.QueueTrackers);
_threadPoolBacklogMonitor.Start();

var indexPath = options.Database.Index ?? Path.Combine(Db.Config.Path, ESConsts.DefaultIndexDirectoryName);

var pTableMaxReaderCount = GetPTableMaxReaderCount(readerThreadsCount);
Expand Down Expand Up @@ -2007,6 +2012,8 @@ public async ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, Can

_reloadConfigSignalRegistration?.Dispose();
_reloadConfigSignalRegistration = null;
_threadPoolBacklogMonitor?.Dispose();
_threadPoolBacklogMonitor = null;

foreach (var subsystem in _subsystems ?? [])
{
Expand Down
105 changes: 105 additions & 0 deletions src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using System;
using System.Threading;
using EventStore.Core.Bus;
using EventStore.Core.Services.Monitoring.Stats;
using EventStore.Core.Time;

namespace EventStore.Core.Metrics;

/// <summary>
/// Periodically samples <see cref="ThreadPool.PendingWorkItemCount"/> by queueing itself
/// as a thread pool work item, and publishes every sample through a
/// <see cref="QueueStatsCollector"/> and a <see cref="QueueTracker"/> registered under
/// the name "ThreadPoolBacklog".
/// </summary>
public sealed class ThreadPoolBacklogMonitor : IMonitoredQueue, IThreadPoolWorkItem, IDisposable
{
    // Delay between the end of one sample and the queueing of the next one.
    private static readonly TimeSpan SampleInterval = TimeSpan.FromSeconds(1);
    private const string QueueName = "ThreadPoolBacklog";

    private readonly QueueStatsCollector _queueStats;
    private readonly QueueTracker _tracker;
    private readonly Timer _timer;

    // Timestamp taken just before the probe work item was handed to the thread pool.
    private Instant _enqueuedAt;

    // 0 while running, 1 once Stop() has been called; accessed via Interlocked/Volatile.
    private int _stopped;

    /// <summary>
    /// Creates the monitor. Sampling does not begin until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="queueStatsManager">Factory for the statistics collector of this pseudo queue.</param>
    /// <param name="trackers">Source of the tracker used for this pseudo queue.</param>
    public ThreadPoolBacklogMonitor(QueueStatsManager queueStatsManager, QueueTrackers trackers)
    {
        _queueStats = queueStatsManager.CreateQueueStatsCollector(QueueName);
        _tracker = trackers.GetTrackerForQueue(QueueName);

        // Created without a due time, so it stays idle until Execute() arms it.
        _timer = new Timer(_ => QueueWorkItem());
    }

    /// <summary>Name under which this monitor reports its statistics.</summary>
    public string Name => _queueStats.Name;

    /// <summary>
    /// Starts the statistics collector, registers the monitor with the default
    /// <see cref="QueueMonitor"/> and queues the first sample.
    /// </summary>
    public void Start()
    {
        _queueStats.Start();
        QueueMonitor.Default.Register(this);
        QueueWorkItem();
    }

    /// <summary>
    /// Stops sampling, unregisters the monitor and stops the statistics collector.
    /// Only the first call has an effect.
    /// </summary>
    public void Stop()
    {
        if (Interlocked.Exchange(ref _stopped, 1) != 0)
        {
            return;
        }

        // Another thread may already be inside Dispose(), so tolerate a disposed timer.
        TryChangeTimer(Timeout.InfiniteTimeSpan);
        QueueMonitor.Default.Unregister(this);
        _queueStats.Stop();
    }

    /// <summary>Stops the monitor and releases its timer.</summary>
    public void Dispose()
    {
        Stop();
        _timer.Dispose();
    }

    /// <summary>
    /// Work item body run by the thread pool: takes one backlog sample, records it and
    /// schedules the next sample unless the monitor has been stopped.
    /// </summary>
    public void Execute()
    {
        if (Volatile.Read(ref _stopped) != 0)
        {
            return;
        }

        _queueStats.EnterBusy();
        try
        {
            var length = GetPendingWorkItemCount();

            // ProcessingStarted already folds the length into the collector's peaks,
            // so no separate ReportQueueLength call is needed here.
            _queueStats.ProcessingStarted<ThreadPoolBacklogSample>(length);
            _tracker.RecordQueueLength(length);

            // Hands the enqueue timestamp to the tracker, presumably so it can derive
            // how long the probe waited in the pool's queue - verify against QueueTracker.
            _tracker.RecordMessageDequeued(_enqueuedAt);
            _queueStats.ProcessingEnded(1);
        }
        finally
        {
            _queueStats.EnterIdle();
        }

        if (Volatile.Read(ref _stopped) == 0)
        {
            // Dispose() may run between the check above and this call. An unhandled
            // ObjectDisposedException on a pool thread would take the process down,
            // so a disposed timer is treated as "stopped".
            TryChangeTimer(SampleInterval);
        }
    }

    /// <summary>
    /// Returns the collector's statistics, using the thread pool's current pending
    /// work item count as the queue length.
    /// </summary>
    public QueueStats GetStatistics() =>
        _queueStats.GetStatistics(GetPendingWorkItemCount());

    // Stamps the enqueue time and hands this instance to the thread pool's global queue.
    private void QueueWorkItem()
    {
        if (Volatile.Read(ref _stopped) != 0)
        {
            return;
        }

        _enqueuedAt = _tracker.Now;
        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
    }

    // Re-arms (or disarms) the one-shot timer, ignoring the case where it was disposed concurrently.
    private void TryChangeTimer(TimeSpan dueTime)
    {
        try
        {
            _timer.Change(dueTime, Timeout.InfiniteTimeSpan);
        }
        catch (ObjectDisposedException)
        {
            // The monitor is shutting down; there is nothing left to schedule.
        }
    }

    // ThreadPool.PendingWorkItemCount is a long; clamp it to the int range the stats APIs take.
    private static int GetPendingWorkItemCount()
    {
        var count = ThreadPool.PendingWorkItemCount;
        return count > int.MaxValue ? int.MaxValue : (int)count;
    }

    // Marker type passed to ProcessingStarted<T> as the "message" type of a sample.
    private sealed class ThreadPoolBacklogSample
    {
    }
}
Loading