From 508d76c8360863c4a7baf29a3b6d01c992d6a8c2 Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Wed, 13 May 2026 18:17:51 -0400 Subject: [PATCH] feat(metrics): expose queued work backlog Signed-off-by: Yordis Prieto --- docs/diagnostics/metrics.md | 1 + .../Metrics/QueueTrackersTests.cs | 55 ++++++++++++++++++- .../Bus/QueuedHandlerThreadPool.cs | 2 + .../Metrics/QueueLengthTracker.cs | 35 ++++++++++++ src/EventStore.Core/Metrics/QueueTracker.cs | 5 ++ src/EventStore.Core/Metrics/QueueTrackers.cs | 27 +++++++-- src/EventStore.Core/MetricsBootstrapper.cs | 14 ++++- .../TimerService/ThreadBasedScheduler.cs | 2 + 8 files changed, 132 insertions(+), 9 deletions(-) create mode 100644 src/EventStore.Core/Metrics/QueueLengthTracker.cs diff --git a/docs/diagnostics/metrics.md b/docs/diagnostics/metrics.md index bec21debbb..6bd6ca4906 100644 --- a/docs/diagnostics/metrics.md +++ b/docs/diagnostics/metrics.md @@ -439,6 +439,7 @@ EventStoreDB uses various queues for asynchronous processing for which it also c | Time series | Type | Description | |:--------------------------------------------------------------------------------------------------------------------|:---------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `eventstore_queue_busy_seconds{queue=}` | [Counter](#common-types) | Total time spent processing in seconds, averaged across the queues in the _QUEUE_GROUP_. The rate of this metric is therefore the average busyness of the group during the period (from 0-1 s/s) | +| `eventstore_queue_length_items{queue=}` | [Gauge](#common-types) | Current number of queued items in queues belonging to the _QUEUE_GROUP_ | | `eventstore_queue_queueing_duration_max_seconds{`
`name=<QUEUE_GROUP>,range=<RANGE>}` | [RecentMax](#recentmax) | Recent maximum time in seconds for which any item was queued in queues belonging to the _QUEUE_GROUP_. This is essentially the length of the longest queue in the group in seconds | | `eventstore_queue_processing_duration_seconds_bucket{`
`message_type=,queue=,le=}` | [Histogram](#common-types) | Number of messages of type _TYPE_ processed by _QUEUE_GROUP_ group that took less than or equal to _DURATION_ in seconds | diff --git a/src/EventStore.Core.XUnit.Tests/Metrics/QueueTrackersTests.cs b/src/EventStore.Core.XUnit.Tests/Metrics/QueueTrackersTests.cs index 348e3951e8..f051e6b2f0 100644 --- a/src/EventStore.Core.XUnit.Tests/Metrics/QueueTrackersTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Metrics/QueueTrackersTests.cs @@ -120,15 +120,63 @@ public void matches_groups() Assert.Equal("Worker Queue 3", tracker.Name); } + [Fact] + public void matched_queue_records_item_count() + { + var lengthTracker = new FakeTracker { Name = "MainQueue" }; + var sut = new QueueTrackers( + [ + new Conf.LabelMappingCase + { + Regex = "MainQueue", + Label = "MainQueue", + }, + ], + x => new FakeTracker { Name = x }, + _ => lengthTracker, + x => new FakeTracker { Name = x }, + x => new FakeTracker { Name = x }); + + var tracker = sut.GetTrackerForQueue("MainQueue"); + tracker.RecordQueueLength(42); + + Assert.Equal(42, lengthTracker.Length); + } + + [Fact] + public void not_matched_queue_does_not_record_item_count() + { + var lengthTracker = new FakeTracker { Name = "MainQueue" }; + var sut = new QueueTrackers( + [ + new Conf.LabelMappingCase + { + Regex = "MainQueue", + Label = "MainQueue", + }, + ], + x => new FakeTracker { Name = x }, + _ => lengthTracker, + x => new FakeTracker { Name = x }, + x => new FakeTracker { Name = x }); + + var tracker = sut.GetTrackerForQueue("OtherQueue"); + tracker.RecordQueueLength(42); + + Assert.Null(lengthTracker.Length); + } + QueueTrackers GenSut(params Conf.LabelMappingCase[] map) => new(map, + x => new FakeTracker { Name = x }, x => new FakeTracker { Name = x }, x => new FakeTracker { Name = x }, x => new FakeTracker { Name = x }); - class FakeTracker : IDurationMaxTracker, IQueueProcessingTracker, IQueueBusyTracker + class FakeTracker : IDurationMaxTracker, 
IQueueProcessingTracker, IQueueBusyTracker, IQueueLengthTracker { public string Name { get; init; } + public int? Length { get; private set; } public void EnterBusy() { @@ -141,5 +189,10 @@ public void EnterIdle() public Instant RecordNow(Instant start) => start; public Instant RecordNow(Instant start, string messageType) => start; + + public void SetQueueLength(int length) + { + Length = length; + } } } diff --git a/src/EventStore.Core/Bus/QueuedHandlerThreadPool.cs b/src/EventStore.Core/Bus/QueuedHandlerThreadPool.cs index 44c76bd87d..533326530c 100644 --- a/src/EventStore.Core/Bus/QueuedHandlerThreadPool.cs +++ b/src/EventStore.Core/Bus/QueuedHandlerThreadPool.cs @@ -156,6 +156,7 @@ async void IThreadPoolWorkItem.Execute() while (!_lifetimeToken.IsCancellationRequested && _queue.TryDequeue(out var item)) { + _tracker.RecordQueueLength(_queue.Count); var start = _tracker.RecordMessageDequeued(item.EnqueuedAt); var msg = item.Message; #if DEBUG @@ -259,6 +260,7 @@ public void Publish(Message message) _queueStats.Enqueued(); #endif _queue.Enqueue(new(_tracker.Now, message)); + _tracker.RecordQueueLength(_queue.Count); if (!_lifetimeToken.IsCancellationRequested && Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0) { ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); diff --git a/src/EventStore.Core/Metrics/QueueLengthTracker.cs b/src/EventStore.Core/Metrics/QueueLengthTracker.cs new file mode 100644 index 0000000000..02cb1d39f9 --- /dev/null +++ b/src/EventStore.Core/Metrics/QueueLengthTracker.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using System.Threading; + +namespace EventStore.Core.Metrics; + +public interface IQueueLengthTracker +{ + void SetQueueLength(int length); +} + +public class QueueLengthTracker : IQueueLengthTracker +{ + private readonly KeyValuePair[] _tags; + private int _length; + + public QueueLengthTracker(ObservableUpDownMetric metric, string queueName) + { + _tags 
= [new("queue", queueName)]; + metric.Register(() => new Measurement(Volatile.Read(ref _length), _tags.AsSpan())); + } + + public void SetQueueLength(int length) + { + Volatile.Write(ref _length, length); + } + + public class NoOp : IQueueLengthTracker + { + public void SetQueueLength(int length) + { + } + } +} diff --git a/src/EventStore.Core/Metrics/QueueTracker.cs b/src/EventStore.Core/Metrics/QueueTracker.cs index ef0e73c540..a354a5354e 100644 --- a/src/EventStore.Core/Metrics/QueueTracker.cs +++ b/src/EventStore.Core/Metrics/QueueTracker.cs @@ -11,6 +11,7 @@ public class QueueTracker { private readonly string _name; private readonly IQueueBusyTracker _busyTracker; + private readonly IQueueLengthTracker _queueLengthTracker; private readonly IDurationMaxTracker _queueingDurationTracker; private readonly IQueueProcessingTracker _queueProcessingTracker; private readonly IClock _clock; @@ -18,6 +19,7 @@ public class QueueTracker public QueueTracker( string name, IQueueBusyTracker busyTracker, + IQueueLengthTracker queueLengthTracker, IDurationMaxTracker queueingDurationTracker, IQueueProcessingTracker processingDurationTracker, IClock clock = null) @@ -27,6 +29,7 @@ public QueueTracker( _queueingDurationTracker = queueingDurationTracker; _queueProcessingTracker = processingDurationTracker; _busyTracker = busyTracker; + _queueLengthTracker = queueLengthTracker; _clock = clock ?? 
Clock.Instance; } @@ -38,6 +41,8 @@ public QueueTracker( public void EnterIdle() => _busyTracker.EnterIdle(); + public void RecordQueueLength(int length) => _queueLengthTracker.SetQueueLength(length); + public Instant RecordMessageDequeued(Instant enqueuedAt) { return _queueingDurationTracker.RecordNow(enqueuedAt); diff --git a/src/EventStore.Core/Metrics/QueueTrackers.cs b/src/EventStore.Core/Metrics/QueueTrackers.cs index 13f3a5a8e7..a7cb1e47f0 100644 --- a/src/EventStore.Core/Metrics/QueueTrackers.cs +++ b/src/EventStore.Core/Metrics/QueueTrackers.cs @@ -16,15 +16,18 @@ public class QueueTrackers private readonly Dictionary _sharedTrackers = new(); private readonly PrivateTrackers _noOpPrivate = new( - new QueueBusyTracker.NoOp()); + new QueueBusyTracker.NoOp(), + new QueueLengthTracker.NoOp()); private readonly SharedTrackers _noOpShared = new( + false, "NoOp", new DurationMaxTracker.NoOp(), new QueueProcessingTracker.NoOp()); private readonly Conf.LabelMappingCase[] _cases; private readonly Func _busyTrackerFactory; + private readonly Func _lengthTrackerFactory; private readonly Func _durationTrackerFactory; private readonly Func _processingTrackerFactory; @@ -32,6 +35,7 @@ public QueueTrackers() { _cases = Array.Empty(); _busyTrackerFactory = _ => _noOpPrivate.QueueBusyTracker; + _lengthTrackerFactory = _ => _noOpPrivate.QueueLengthTracker; _durationTrackerFactory = _ => _noOpShared.QueueingDurationTracker; _processingTrackerFactory = _ => _noOpShared.QueueProcessingTracker; } @@ -39,12 +43,14 @@ public QueueTrackers() public QueueTrackers( Conf.LabelMappingCase[] cases, Func busyTrackerFactory, + Func lengthTrackerFactory, Func durationTrackerFactory, Func processingTrackerFactory) { _cases = cases; _busyTrackerFactory = busyTrackerFactory; + _lengthTrackerFactory = lengthTrackerFactory; _durationTrackerFactory = durationTrackerFactory; _processingTrackerFactory = processingTrackerFactory; } @@ -52,11 +58,12 @@ public QueueTrackers( public QueueTracker 
GetTrackerForQueue(string queueName) { var sharedTrackers = GetSharedTrackerForQueue(queueName); - var privateTrackers = GetPrivateTrackerForLabel(sharedTrackers.Label); + var privateTrackers = GetPrivateTracker(sharedTrackers); return new QueueTracker( sharedTrackers.Label, privateTrackers.QueueBusyTracker, + privateTrackers.QueueLengthTracker, sharedTrackers.QueueingDurationTracker, sharedTrackers.QueueProcessingTracker); } @@ -105,6 +112,7 @@ private SharedTrackers GetSharedTrackerForLabel(string label) if (!_sharedTrackers.TryGetValue(label, out var tracker)) { tracker = new( + true, label, _durationTrackerFactory(label), _processingTrackerFactory(label)); @@ -114,18 +122,27 @@ private SharedTrackers GetSharedTrackerForLabel(string label) return tracker; } - private PrivateTrackers GetPrivateTrackerForLabel(string label) + private PrivateTrackers GetPrivateTracker(SharedTrackers sharedTrackers) { - return new(_busyTrackerFactory(label)); + if (!sharedTrackers.Enabled) + { + return _noOpPrivate; + } + + return new( + _busyTrackerFactory(sharedTrackers.Label), + _lengthTrackerFactory(sharedTrackers.Label)); } // each queue gets its own instance of the busytracker because it deals with the aggregation // on observation rather than on measurement. two queues trying to start/stop the same stopwatch // wouldn't make sense. 
private record PrivateTrackers( - IQueueBusyTracker QueueBusyTracker); + IQueueBusyTracker QueueBusyTracker, + IQueueLengthTracker QueueLengthTracker); private record SharedTrackers( + bool Enabled, string Label, IDurationMaxTracker QueueingDurationTracker, IQueueProcessingTracker QueueProcessingTracker); diff --git a/src/EventStore.Core/MetricsBootstrapper.cs b/src/EventStore.Core/MetricsBootstrapper.cs index d8f1f89846..ab01793a4b 100644 --- a/src/EventStore.Core/MetricsBootstrapper.cs +++ b/src/EventStore.Core/MetricsBootstrapper.cs @@ -92,6 +92,7 @@ public static void Bootstrap( var queueQueueingDurationMaxMetric = new DurationMaxMetric(coreMeter, "eventstore-queue-queueing-duration-max"); var queueProcessingDurationMetric = new DurationMetric(coreMeter, "eventstore-queue-processing-duration"); var queueBusyMetric = new AverageMetric(coreMeter, "eventstore-queue-busy", "seconds", label => new("queue", label)); + var queueLengthMetric = new ObservableUpDownMetric(coreMeter, "eventstore-queue-length", "items"); var byteMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "bytes"); var eventMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "events"); var recordReadDurationMetric = new DurationMetric(coreMeter, "eventstore-io-record-read-duration"); @@ -277,7 +278,8 @@ public static void Bootstrap( // queue trackers Func busyTrackerFactory = name => new QueueBusyTracker.NoOp(); - Func lengthFactory = name => new DurationMaxTracker.NoOp(); + Func lengthTrackerFactory = name => new QueueLengthTracker.NoOp(); + Func queueingDurationFactory = name => new DurationMaxTracker.NoOp(); Func processingFactory = name => new QueueProcessingTracker.NoOp(); if (conf.Queues.TryGetValue(Conf.QueueTracker.Busy, out var busyEnabled) && busyEnabled) @@ -287,7 +289,8 @@ public static void Bootstrap( if (conf.Queues.TryGetValue(Conf.QueueTracker.Length, out var lengthEnabled) && lengthEnabled) { - lengthFactory = name => new DurationMaxTracker( + lengthTrackerFactory 
= name => new QueueLengthTracker(queueLengthMetric, name); + queueingDurationFactory = name => new DurationMaxTracker( name: name, metric: queueQueueingDurationMaxMetric, expectedScrapeIntervalSeconds: conf.ExpectedScrapeIntervalSeconds); @@ -298,7 +301,12 @@ public static void Bootstrap( processingFactory = name => new QueueProcessingTracker(queueProcessingDurationMetric, name); } - trackers.QueueTrackers = new QueueTrackers(conf.QueueLabels, busyTrackerFactory, lengthFactory, processingFactory); + trackers.QueueTrackers = new QueueTrackers( + conf.QueueLabels, + busyTrackerFactory, + lengthTrackerFactory, + queueingDurationFactory, + processingFactory); // kestrel if (conf.Kestrel.TryGetValue(Conf.KestrelTracker.ConnectionCount, out var kestrelConnections) && kestrelConnections) diff --git a/src/EventStore.Core/Services/TimerService/ThreadBasedScheduler.cs b/src/EventStore.Core/Services/TimerService/ThreadBasedScheduler.cs index 98de2dde05..d667f75d2e 100644 --- a/src/EventStore.Core/Services/TimerService/ThreadBasedScheduler.cs +++ b/src/EventStore.Core/Services/TimerService/ThreadBasedScheduler.cs @@ -101,6 +101,7 @@ private void DoTiming() } _queueStats.ProcessingEnded(pending); + _tracker.RecordQueueLength(_pending.Count + _tasks.Count); _queueStats.ProcessingStarted(_tasks.Count); int processed = 0; @@ -129,6 +130,7 @@ private void DoTiming() } _queueStats.ProcessingEnded(processed); + _tracker.RecordQueueLength(_pending.Count + _tasks.Count); if (processed == 0 && !_pendingEvent.IsSet) {