-
Notifications
You must be signed in to change notification settings - Fork 0
feat(metrics): track thread pool backlog #358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| using EventStore.Core.Bus; | ||
| using Xunit; | ||
|
|
||
| namespace EventStore.Core.XUnit.Tests.Metrics; | ||
|
|
||
| public class QueueStatsCollectorTests | ||
| { | ||
| [Fact] | ||
| public void statistics_include_the_current_queue_length_in_the_peak() | ||
| { | ||
| var collector = new QueueStatsCollector("queue"); | ||
| collector.Start(); | ||
| try | ||
| { | ||
| var stats = collector.GetStatistics(currentQueueLength: 5); | ||
|
|
||
| Assert.Equal(5, stats.Length); | ||
| Assert.Equal(5, stats.LengthCurrentTryPeak); | ||
| Assert.Equal(5, stats.LengthLifetimePeak); | ||
| } | ||
| finally | ||
| { | ||
| collector.Stop(); | ||
| } | ||
| } | ||
|
|
||
| [Fact] | ||
| public void reported_queue_length_updates_the_peak() | ||
| { | ||
| var collector = new QueueStatsCollector("queue"); | ||
| collector.Start(); | ||
| try | ||
| { | ||
| collector.ReportQueueLength(7); | ||
| var stats = collector.GetStatistics(currentQueueLength: 3); | ||
|
|
||
| Assert.Equal(3, stats.Length); | ||
| Assert.Equal(7, stats.LengthCurrentTryPeak); | ||
| Assert.Equal(7, stats.LengthLifetimePeak); | ||
| } | ||
| finally | ||
| { | ||
| collector.Stop(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| using System; | ||
| using System.Threading; | ||
| using EventStore.Core.Bus; | ||
| using EventStore.Core.Services.Monitoring.Stats; | ||
| using EventStore.Core.Time; | ||
|
|
||
| namespace EventStore.Core.Metrics; | ||
|
|
||
| public sealed class ThreadPoolBacklogMonitor : IMonitoredQueue, IThreadPoolWorkItem, IDisposable | ||
| { | ||
| private static readonly TimeSpan SampleInterval = TimeSpan.FromSeconds(1); | ||
| private const string QueueName = "ThreadPoolBacklog"; | ||
|
|
||
| private readonly QueueStatsCollector _queueStats; | ||
| private readonly QueueTracker _tracker; | ||
| private readonly Timer _timer; | ||
| private Instant _enqueuedAt; | ||
| private int _stopped; | ||
|
|
||
| public ThreadPoolBacklogMonitor(QueueStatsManager queueStatsManager, QueueTrackers trackers) | ||
| { | ||
| _queueStats = queueStatsManager.CreateQueueStatsCollector(QueueName); | ||
| _tracker = trackers.GetTrackerForQueue(QueueName); | ||
| _timer = new Timer(_ => QueueWorkItem()); | ||
| } | ||
|
|
||
| public string Name => _queueStats.Name; | ||
|
|
||
| public void Start() | ||
| { | ||
| _queueStats.Start(); | ||
| QueueMonitor.Default.Register(this); | ||
| QueueWorkItem(); | ||
| } | ||
|
|
||
| public void Stop() | ||
| { | ||
| if (Interlocked.Exchange(ref _stopped, 1) != 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); | ||
| QueueMonitor.Default.Unregister(this); | ||
| _queueStats.Stop(); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| Stop(); | ||
| _timer.Dispose(); | ||
| } | ||
|
|
||
| public void Execute() | ||
| { | ||
| if (Volatile.Read(ref _stopped) != 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _queueStats.EnterBusy(); | ||
| try | ||
| { | ||
| var length = GetPendingWorkItemCount(); | ||
| _queueStats.ProcessingStarted<ThreadPoolBacklogSample>(length); | ||
| _queueStats.ReportQueueLength(length); | ||
| _tracker.RecordQueueLength(length); | ||
| _tracker.RecordMessageDequeued(_enqueuedAt); | ||
| _queueStats.ProcessingEnded(1); | ||
| } | ||
| finally | ||
| { | ||
| _queueStats.EnterIdle(); | ||
| } | ||
|
|
||
| if (Volatile.Read(ref _stopped) == 0) | ||
| { | ||
| _timer.Change(SampleInterval, Timeout.InfiniteTimeSpan); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Timer disposal race can throw during shutdownLow Severity A TOCTOU race exists between the Additional Locations (1)Reviewed by Cursor Bugbot for commit d460a51. Configure here. |
||
| } | ||
|
|
||
| public QueueStats GetStatistics() => | ||
| _queueStats.GetStatistics(GetPendingWorkItemCount()); | ||
|
|
||
| private void QueueWorkItem() | ||
| { | ||
| if (Volatile.Read(ref _stopped) != 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _enqueuedAt = _tracker.Now; | ||
| ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); | ||
| } | ||
|
|
||
| private static int GetPendingWorkItemCount() | ||
| { | ||
| var count = ThreadPool.PendingWorkItemCount; | ||
| return count > int.MaxValue ? int.MaxValue : (int)count; | ||
| } | ||
|
|
||
| private sealed class ThreadPoolBacklogSample | ||
| { | ||
| } | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant
ReportQueueLengthcall afterProcessingStartedLow Severity
ProcessingStarted<ThreadPoolBacklogSample>(length)already callsReportQueueLength(length)internally (via the overload inQueueStatsCollector). The explicit_queueStats.ReportQueueLength(length)call immediately after is redundant since it appliesMath.Maxwith the same value, producing no effect. This could mislead future maintainers into thinking the two calls serve distinct purposes.Reviewed by Cursor Bugbot for commit d460a51. Configure here.