Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/diagnostics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ EventStoreDB uses various queues for asynchronous processing for which it also c
| Time series | Type | Description |
|:--------------------------------------------------------------------------------------------------------------------|:---------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `eventstore_queue_busy_seconds{queue=<QUEUE_GROUP>}` | [Counter](#common-types) | Total time spent processing in seconds, averaged across the queues in the _QUEUE_GROUP_. The rate of this metric is therefore the average busyness of the group during the period (from 0-1 s/s) |
| `eventstore_queue_length_items{queue=<QUEUE_GROUP>}` | [Gauge](#common-types) | Current number of queued items in queues belonging to the _QUEUE_GROUP_ |
| `eventstore_queue_queueing_duration_max_seconds{`<br/>`name=<QUEUE_GROUP>,range=<RANGE>}` | [RecentMax](#recentmax) | Recent maximum time in seconds for which any item was queued in queues belonging to the _QUEUE_GROUP_. This is essentially the length of the longest queue in the group in seconds |
| `eventstore_queue_processing_duration_seconds_bucket{`<br/>`message_type=<TYPE>,queue=<QUEUE_GROUP>,le=<DURATION>}` | [Histogram](#common-types) | Number of messages of type _TYPE_ processed by _QUEUE_GROUP_ group that took less than or equal to _DURATION_ in seconds |

Expand Down
55 changes: 54 additions & 1 deletion src/EventStore.Core.XUnit.Tests/Metrics/QueueTrackersTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,63 @@ public void matches_groups()
Assert.Equal("Worker Queue 3", tracker.Name);
}

[Fact]
public void matched_queue_records_item_count()
{
    // The length factory always hands back this one instance, so the test can
    // observe whatever length the resulting queue tracker forwards to it.
    var recorder = new FakeTracker { Name = "MainQueue" };

    // Every other factory produces a throwaway fake named after the requested label.
    static FakeTracker Fake(string label) => new() { Name = label };

    var mainQueueCase = new Conf.LabelMappingCase
    {
        Regex = "MainQueue",
        Label = "MainQueue",
    };

    var sut = new QueueTrackers(
        cases: [mainQueueCase],
        busyTrackerFactory: Fake,
        lengthTrackerFactory: _ => recorder,
        durationTrackerFactory: Fake,
        processingTrackerFactory: Fake);

    // The queue name matches the configured regex.
    sut.GetTrackerForQueue("MainQueue").RecordQueueLength(42);

    Assert.Equal(42, recorder.Length);
}

[Fact]
public void not_matched_queue_does_not_record_item_count()
{
    // The length factory always hands back this one instance, so the test can
    // check that nothing was ever written to it.
    var recorder = new FakeTracker { Name = "MainQueue" };

    // Every other factory produces a throwaway fake named after the requested label.
    static FakeTracker Fake(string label) => new() { Name = label };

    var mainQueueCase = new Conf.LabelMappingCase
    {
        Regex = "MainQueue",
        Label = "MainQueue",
    };

    var sut = new QueueTrackers(
        cases: [mainQueueCase],
        busyTrackerFactory: Fake,
        lengthTrackerFactory: _ => recorder,
        durationTrackerFactory: Fake,
        processingTrackerFactory: Fake);

    // "OtherQueue" is not covered by the only configured mapping.
    sut.GetTrackerForQueue("OtherQueue").RecordQueueLength(42);

    Assert.Null(recorder.Length);
}

// Builds a QueueTrackers over the given label mappings in which every factory
// hands out a fresh FakeTracker named after the label it was asked for.
QueueTrackers GenSut(params Conf.LabelMappingCase[] map)
{
    return new QueueTrackers(
        cases: map,
        busyTrackerFactory: label => new FakeTracker { Name = label },
        lengthTrackerFactory: label => new FakeTracker { Name = label },
        durationTrackerFactory: label => new FakeTracker { Name = label },
        processingTrackerFactory: label => new FakeTracker { Name = label });
}

class FakeTracker : IDurationMaxTracker, IQueueProcessingTracker, IQueueBusyTracker
class FakeTracker : IDurationMaxTracker, IQueueProcessingTracker, IQueueBusyTracker, IQueueLengthTracker
{
public string Name { get; init; }
public int? Length { get; private set; }

public void EnterBusy()
{
Expand All @@ -141,5 +189,10 @@ public void EnterIdle()
public Instant RecordNow(Instant start) => start;

public Instant RecordNow(Instant start, string messageType) => start;

public void SetQueueLength(int length)
{
Length = length;
}
}
}
2 changes: 2 additions & 0 deletions src/EventStore.Core/Bus/QueuedHandlerThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ async void IThreadPoolWorkItem.Execute()

while (!_lifetimeToken.IsCancellationRequested && _queue.TryDequeue(out var item))
{
_tracker.RecordQueueLength(_queue.Count);
var start = _tracker.RecordMessageDequeued(item.EnqueuedAt);
var msg = item.Message;
#if DEBUG
Expand Down Expand Up @@ -259,6 +260,7 @@ public void Publish(Message message)
_queueStats.Enqueued();
#endif
_queue.Enqueue(new(_tracker.Now, message));
_tracker.RecordQueueLength(_queue.Count);
if (!_lifetimeToken.IsCancellationRequested && Interlocked.CompareExchange(ref _isRunning, 1, 0) == 0)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
Expand Down
35 changes: 35 additions & 0 deletions src/EventStore.Core/Metrics/QueueLengthTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Threading;

namespace EventStore.Core.Metrics;

/// <summary>
/// Receives the length of a queue each time a caller reports it.
/// </summary>
public interface IQueueLengthTracker
{
/// <summary>
/// Reports a queue length to the tracker.
/// </summary>
/// <param name="length">The queue length being reported.</param>
void SetQueueLength(int length);
}

/// <summary>
/// Remembers the last length passed to <see cref="SetQueueLength"/> and exposes it
/// through a callback registered with an <c>ObservableUpDownMetric&lt;int&gt;</c>,
/// tagged with the queue name.
/// </summary>
public class QueueLengthTracker : IQueueLengthTracker
{
    // Single "queue" tag attached to every measurement produced by this tracker.
    private readonly KeyValuePair<string, object>[] _queueTags;

    // Last reported length; only accessed through Volatile.Read / Volatile.Write.
    private int _currentLength;

    /// <summary>
    /// Creates the tracker and registers its measurement callback with <paramref name="metric"/>.
    /// </summary>
    /// <param name="metric">Metric the measurement callback is registered with.</param>
    /// <param name="queueName">Value of the "queue" tag on each measurement.</param>
    public QueueLengthTracker(ObservableUpDownMetric<int> metric, string queueName)
    {
        _queueTags = [new KeyValuePair<string, object>("queue", queueName)];

        metric.Register(() =>
        {
            var observed = Volatile.Read(ref _currentLength);
            return new Measurement<int>(observed, _queueTags.AsSpan());
        });
    }

    /// <summary>Stores <paramref name="length"/> as the value the callback will report.</summary>
    public void SetQueueLength(int length) => Volatile.Write(ref _currentLength, length);

    /// <summary>Tracker that ignores every reported length.</summary>
    public class NoOp : IQueueLengthTracker
    {
        public void SetQueueLength(int length)
        {
        }
    }
}
5 changes: 5 additions & 0 deletions src/EventStore.Core/Metrics/QueueTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ public class QueueTracker
{
private readonly string _name;
private readonly IQueueBusyTracker _busyTracker;
private readonly IQueueLengthTracker _queueLengthTracker;
private readonly IDurationMaxTracker _queueingDurationTracker;
private readonly IQueueProcessingTracker _queueProcessingTracker;
private readonly IClock _clock;

public QueueTracker(
string name,
IQueueBusyTracker busyTracker,
IQueueLengthTracker queueLengthTracker,
IDurationMaxTracker queueingDurationTracker,
IQueueProcessingTracker processingDurationTracker,
IClock clock = null)
Expand All @@ -27,6 +29,7 @@ public QueueTracker(
_queueingDurationTracker = queueingDurationTracker;
_queueProcessingTracker = processingDurationTracker;
_busyTracker = busyTracker;
_queueLengthTracker = queueLengthTracker;
_clock = clock ?? Clock.Instance;
}

Expand All @@ -38,6 +41,8 @@ public QueueTracker(

public void EnterIdle() => _busyTracker.EnterIdle();

public void RecordQueueLength(int length) => _queueLengthTracker.SetQueueLength(length);

public Instant RecordMessageDequeued(Instant enqueuedAt)
{
return _queueingDurationTracker.RecordNow(enqueuedAt);
Expand Down
27 changes: 22 additions & 5 deletions src/EventStore.Core/Metrics/QueueTrackers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,54 @@ public class QueueTrackers
private readonly Dictionary<string, SharedTrackers> _sharedTrackers = new();

private readonly PrivateTrackers _noOpPrivate = new(
new QueueBusyTracker.NoOp());
new QueueBusyTracker.NoOp(),
new QueueLengthTracker.NoOp());

private readonly SharedTrackers _noOpShared = new(
false,
"NoOp",
new DurationMaxTracker.NoOp(),
new QueueProcessingTracker.NoOp());

private readonly Conf.LabelMappingCase[] _cases;
private readonly Func<string, IQueueBusyTracker> _busyTrackerFactory;
private readonly Func<string, IQueueLengthTracker> _lengthTrackerFactory;
private readonly Func<string, IDurationMaxTracker> _durationTrackerFactory;
private readonly Func<string, IQueueProcessingTracker> _processingTrackerFactory;

// Creates a QueueTrackers with no label mappings. Every factory ignores the
// label it is given and hands back the corresponding shared no-op tracker.
public QueueTrackers()
{
    _cases = Array.Empty<Conf.LabelMappingCase>();

    // factories backed by the no-op shared trackers
    _processingTrackerFactory = unusedLabel => _noOpShared.QueueProcessingTracker;
    _durationTrackerFactory = unusedLabel => _noOpShared.QueueingDurationTracker;

    // factories backed by the no-op private trackers
    _lengthTrackerFactory = unusedLabel => _noOpPrivate.QueueLengthTracker;
    _busyTrackerFactory = unusedLabel => _noOpPrivate.QueueBusyTracker;
}

// Creates a QueueTrackers from the given label mappings and one factory per
// kind of tracker. The arguments are stored as-is; no tracker is created here.
public QueueTrackers(
    Conf.LabelMappingCase[] cases,
    Func<string, IQueueBusyTracker> busyTrackerFactory,
    Func<string, IQueueLengthTracker> lengthTrackerFactory,
    Func<string, IDurationMaxTracker> durationTrackerFactory,
    Func<string, IQueueProcessingTracker> processingTrackerFactory)
{
    (_cases, _busyTrackerFactory, _lengthTrackerFactory, _durationTrackerFactory, _processingTrackerFactory) =
        (cases, busyTrackerFactory, lengthTrackerFactory, durationTrackerFactory, processingTrackerFactory);
}

public QueueTracker GetTrackerForQueue(string queueName)
{
var sharedTrackers = GetSharedTrackerForQueue(queueName);
var privateTrackers = GetPrivateTrackerForLabel(sharedTrackers.Label);
var privateTrackers = GetPrivateTracker(sharedTrackers);

return new QueueTracker(
sharedTrackers.Label,
privateTrackers.QueueBusyTracker,
privateTrackers.QueueLengthTracker,
sharedTrackers.QueueingDurationTracker,
sharedTrackers.QueueProcessingTracker);
}
Expand Down Expand Up @@ -105,6 +112,7 @@ private SharedTrackers GetSharedTrackerForLabel(string label)
if (!_sharedTrackers.TryGetValue(label, out var tracker))
{
tracker = new(
true,
label,
_durationTrackerFactory(label),
_processingTrackerFactory(label));
Expand All @@ -114,18 +122,27 @@ private SharedTrackers GetSharedTrackerForLabel(string label)
return tracker;
}

private PrivateTrackers GetPrivateTrackerForLabel(string label)
private PrivateTrackers GetPrivateTracker(SharedTrackers sharedTrackers)
{
return new(_busyTrackerFactory(label));
if (!sharedTrackers.Enabled)
{
return _noOpPrivate;
}

return new(
_busyTrackerFactory(sharedTrackers.Label),
_lengthTrackerFactory(sharedTrackers.Label));
}

// each queue gets its own instance of the busytracker because it deals with the aggregation
// on observation rather than on measurement. two queues trying to start/stop the same stopwatch
// wouldn't make sense.
private record PrivateTrackers(
IQueueBusyTracker QueueBusyTracker);
IQueueBusyTracker QueueBusyTracker,
IQueueLengthTracker QueueLengthTracker);

// trackers resolved per label, together with the label itself.
// Enabled is false on the no-op instance and true on the instances created per label;
// when it is false the queue is given the no-op private trackers instead of new ones.
private record SharedTrackers(
bool Enabled,
string Label,
IDurationMaxTracker QueueingDurationTracker,
IQueueProcessingTracker QueueProcessingTracker);
Expand Down
14 changes: 11 additions & 3 deletions src/EventStore.Core/MetricsBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public static void Bootstrap(
var queueQueueingDurationMaxMetric = new DurationMaxMetric(coreMeter, "eventstore-queue-queueing-duration-max");
var queueProcessingDurationMetric = new DurationMetric(coreMeter, "eventstore-queue-processing-duration");
var queueBusyMetric = new AverageMetric(coreMeter, "eventstore-queue-busy", "seconds", label => new("queue", label));
var queueLengthMetric = new ObservableUpDownMetric<int>(coreMeter, "eventstore-queue-length", "items");
var byteMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "bytes");
var eventMetric = new CounterMetric(coreMeter, "eventstore-io", unit: "events");
var recordReadDurationMetric = new DurationMetric(coreMeter, "eventstore-io-record-read-duration");
Expand Down Expand Up @@ -277,7 +278,8 @@ public static void Bootstrap(

// queue trackers
Func<string, IQueueBusyTracker> busyTrackerFactory = name => new QueueBusyTracker.NoOp();
Func<string, IDurationMaxTracker> lengthFactory = name => new DurationMaxTracker.NoOp();
Func<string, IQueueLengthTracker> lengthTrackerFactory = name => new QueueLengthTracker.NoOp();
Func<string, IDurationMaxTracker> queueingDurationFactory = name => new DurationMaxTracker.NoOp();
Func<string, IQueueProcessingTracker> processingFactory = name => new QueueProcessingTracker.NoOp();

if (conf.Queues.TryGetValue(Conf.QueueTracker.Busy, out var busyEnabled) && busyEnabled)
Expand All @@ -287,7 +289,8 @@ public static void Bootstrap(

if (conf.Queues.TryGetValue(Conf.QueueTracker.Length, out var lengthEnabled) && lengthEnabled)
{
lengthFactory = name => new DurationMaxTracker(
lengthTrackerFactory = name => new QueueLengthTracker(queueLengthMetric, name);
queueingDurationFactory = name => new DurationMaxTracker(
name: name,
metric: queueQueueingDurationMaxMetric,
expectedScrapeIntervalSeconds: conf.ExpectedScrapeIntervalSeconds);
Expand All @@ -298,7 +301,12 @@ public static void Bootstrap(
processingFactory = name => new QueueProcessingTracker(queueProcessingDurationMetric, name);
}

trackers.QueueTrackers = new QueueTrackers(conf.QueueLabels, busyTrackerFactory, lengthFactory, processingFactory);
trackers.QueueTrackers = new QueueTrackers(
conf.QueueLabels,
busyTrackerFactory,
lengthTrackerFactory,
queueingDurationFactory,
processingFactory);

// kestrel
if (conf.Kestrel.TryGetValue(Conf.KestrelTracker.ConnectionCount, out var kestrelConnections) && kestrelConnections)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void DoTiming()
}

_queueStats.ProcessingEnded(pending);
_tracker.RecordQueueLength(_pending.Count + _tasks.Count);

_queueStats.ProcessingStarted<ExecuteScheduledTasks>(_tasks.Count);
int processed = 0;
Expand Down Expand Up @@ -129,6 +130,7 @@ private void DoTiming()
}

_queueStats.ProcessingEnded(processed);
_tracker.RecordQueueLength(_pending.Count + _tasks.Count);

if (processed == 0 && !_pendingEvent.IsSet)
{
Expand Down
Loading