Communication via fanout exchange works fine.
using AMQPClient
ENV["JULIA_DEBUG"] = "all"
const HOST = "localhost"
const PORT = 5672
const EXCHANGE = "exchange"
const AUTH = Dict{String,Any}(
"MECHANISM" => "AMQPLAIN",
"LOGIN" => "guest",
"PASSWORD" => "guest"
)
function produce()
conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
success = exchange_declare(chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
() -> begin
timestamp = round(Int, time())
msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
basic_publish(chnl, msg; exchange=EXCHANGE, routing_key="")
@info "sent" data=timestamp
end
end
function consume()
conn = connection(;virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
success, queue_name, message_count, consumer_count = queue_declare(chnl, "")
success = queue_bind(chnl, queue_name, EXCHANGE, "")
function consumer(msg)
@info "recieved" data=parse(Int, String(msg.data))
basic_ack(chnl, msg.delivery_tag)
end
success, consumer_tag = basic_consume(chnl, queue_name, consumer)
end
consumer = consume()
producer = produce()
producer()
┌ Info: sent
└ data = 1572692964
┌ Info: recieved
└ data = 1572692964
But when I restart RabbitMQ (docker restart ...) and call producer again julia process hangs and eats all available memory without any error reporting.
The RabbitMQ reliability guide says that in such cases it is necessary to create new connection and channel. How can I do that with AMQPClient?
Communication via fanout exchange works fine.
But when I restart RabbitMQ (
docker restart ...) and callproduceragain julia process hangs and eats all available memory without any error reporting.The RabbitMQ reliability guide says that in such cases it is necessary to create new connection and channel. How can I do that with AMQPClient?