-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMessageQueue.php
More file actions
63 lines (53 loc) · 1.62 KB
/
MessageQueue.php
File metadata and controls
63 lines (53 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?php
// namespace TO-DO ;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPNoDataException;
class MessageQueue
{
private $config;
private $channel;
private string $queue;
private bool $durable;
private AMQPStreamConnection $connection;
public function __construct(string $queue, bool $durable = false)
{
$this->queue = $queue;
$this->durable = $durable;
// $this->config = TO-DO ;
$this->connection = new AMQPStreamConnection(
/* TO-DO
$this->config["host"],
$this->config["porta"],
$this->config["user"],
$this->config["passwd"]
*/
);
}
public function connect(): void
{
$this->channel = $this->connection->channel();
$this->channel->queue_declare($this->queue, false, $this->durable, false, false);
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
public function publish(array $message): void
{
$message = new AMQPMessage(json_encode($message));
$this->channel->basic_publish($message, "", $this->queue);
}
public function consume(callable $callback, bool $autoAck = false, string $consumer_tag = ""): void
{
try {
$this->channel->basic_consume($this->queue, $consumer_tag, false, $autoAck, false, false, $callback);
while ($this->channel->is_open()) {
$this->channel->wait();
}
} catch (AMQPNoDataException $e) {
return;
}
}
}