Skip to content

Commit 1977bc3

Browse files
committed
Add queue depth telemetry gauge
1 parent f85ca00 commit 1977bc3

2 files changed

Lines changed: 185 additions & 0 deletions

File tree

src/Queue/Server.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Utopia\Servers\Hook;
99
use Utopia\Telemetry\Adapter as Telemetry;
1010
use Utopia\Telemetry\Adapter\None as NoTelemetry;
11+
use Utopia\Telemetry\Gauge;
1112
use Utopia\Telemetry\Histogram;
1213
use Utopia\Validator;
1314

@@ -67,6 +68,7 @@ class Server
6768

6869
private Histogram $jobWaitTime;
6970
private Histogram $processDuration;
71+
private Gauge $queueDepth;
7072

7173
/**
7274
* Creates an instance of a Queue server.
@@ -158,6 +160,30 @@ public function setTelemetry(Telemetry $telemetry): void
158160
],
159161
],
160162
);
163+
164+
$this->queueDepth = $telemetry->createGauge(
165+
'messaging.queue.depth',
166+
'{message}',
167+
'Number of pending messages in the queue.',
168+
);
169+
}
170+
171+
private function recordQueueDepth(): void
172+
{
173+
if (!$this->adapter->consumer instanceof Publisher) {
174+
return;
175+
}
176+
177+
try {
178+
$this->queueDepth->record(
179+
$this->adapter->consumer->getQueueSize($this->adapter->queue),
180+
[
181+
'messaging.destination.name' => $this->adapter->queue->name,
182+
'messaging.destination.namespace' => $this->adapter->queue->namespace,
183+
],
184+
);
185+
} catch (Throwable) {
186+
}
161187
}
162188

163189
/**
@@ -216,6 +242,8 @@ public function start(): self
216242
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
217243
}
218244

245+
$this->recordQueueDepth();
246+
219247
$this->adapter->consumer->consume(
220248
$this->adapter->queue,
221249
function (Message $message) {
@@ -269,6 +297,7 @@ function (Message $message) {
269297
$processDuration =
270298
microtime(true) - $receivedAtTimestamp;
271299
$this->processDuration->record($processDuration);
300+
$this->recordQueueDepth();
272301
}
273302
},
274303
function (Message $message) {
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use PHPUnit\Framework\TestCase;
6+
use Utopia\Queue\Adapter;
7+
use Utopia\Queue\Consumer;
8+
use Utopia\Queue\Message;
9+
use Utopia\Queue\Publisher;
10+
use Utopia\Queue\Queue;
11+
use Utopia\Queue\Server;
12+
use Utopia\Telemetry\Adapter\Test as TestTelemetry;
13+
14+
class ServerTelemetryTest extends TestCase
15+
{
16+
public function testRecordsQueueDepth(): void
17+
{
18+
$consumer = new ServerTelemetryPublisherConsumer([3, 2]);
19+
$adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite');
20+
$telemetry = new TestTelemetry();
21+
22+
$server = new Server($adapter);
23+
$server->setTelemetry($telemetry);
24+
$server
25+
->job()
26+
->inject('message')
27+
->action(fn (Message $message) => null);
28+
29+
$server->start();
30+
31+
$this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges);
32+
/** @var object{values: array<int, float|int>} $queueDepth */
33+
$queueDepth = $telemetry->gauges['messaging.queue.depth'];
34+
$this->assertObjectHasProperty('values', $queueDepth);
35+
$this->assertSame([3, 2], $queueDepth->values);
36+
}
37+
38+
public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void
39+
{
40+
$consumer = new ServerTelemetryConsumer();
41+
$adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite');
42+
$telemetry = new TestTelemetry();
43+
44+
$server = new Server($adapter);
45+
$server->setTelemetry($telemetry);
46+
$server
47+
->job()
48+
->inject('message')
49+
->action(fn (Message $message) => null);
50+
51+
$server->start();
52+
53+
$this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges);
54+
/** @var object{values: array<int, float|int>} $queueDepth */
55+
$queueDepth = $telemetry->gauges['messaging.queue.depth'];
56+
$this->assertObjectHasProperty('values', $queueDepth);
57+
$this->assertSame([], $queueDepth->values);
58+
}
59+
}
60+
61+
final class ServerTelemetryAdapter extends Adapter
62+
{
63+
/**
64+
* @var callable[]
65+
*/
66+
private array $onWorkerStart = [];
67+
68+
/**
69+
* @var callable[]
70+
*/
71+
private array $onWorkerStop = [];
72+
73+
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
74+
{
75+
parent::__construct($workerNum, $queue, $namespace);
76+
$this->consumer = $consumer;
77+
}
78+
79+
public function start(): self
80+
{
81+
foreach ($this->onWorkerStart as $callback) {
82+
$callback('0');
83+
}
84+
85+
foreach ($this->onWorkerStop as $callback) {
86+
$callback('0');
87+
}
88+
89+
return $this;
90+
}
91+
92+
public function stop(): self
93+
{
94+
return $this;
95+
}
96+
97+
public function workerStart(callable $callback): self
98+
{
99+
$this->onWorkerStart[] = $callback;
100+
return $this;
101+
}
102+
103+
public function workerStop(callable $callback): self
104+
{
105+
$this->onWorkerStop[] = $callback;
106+
return $this;
107+
}
108+
}
109+
110+
class ServerTelemetryConsumer implements Consumer
111+
{
112+
public function consume(
113+
Queue $queue,
114+
callable $messageCallback,
115+
callable $successCallback,
116+
callable $errorCallback
117+
): void {
118+
$message = new Message([
119+
'pid' => 'test-pid',
120+
'queue' => $queue->name,
121+
'timestamp' => time() - 1,
122+
'payload' => [],
123+
]);
124+
125+
$messageCallback($message);
126+
$successCallback($message);
127+
}
128+
129+
public function close(): void
130+
{
131+
}
132+
}
133+
134+
final class ServerTelemetryPublisherConsumer extends ServerTelemetryConsumer implements Publisher
135+
{
136+
/**
137+
* @param int[] $queueSizes
138+
*/
139+
public function __construct(private array $queueSizes)
140+
{
141+
}
142+
143+
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
144+
{
145+
return true;
146+
}
147+
148+
public function retry(Queue $queue, ?int $limit = null): void
149+
{
150+
}
151+
152+
public function getQueueSize(Queue $queue, bool $failedJobs = false): int
153+
{
154+
return array_shift($this->queueSizes) ?? 0;
155+
}
156+
}

0 commit comments

Comments
 (0)