MQTT: Support wildcards in topic filters matching retained messages#1
Conversation
When I delete a retained message, I see a publish of `{}` instead. If I then disconnect and reconnect, I no longer see it, so it is partially working. Screencast.da.2025-02-12.10-50-09.webm
Hello there, Mr. getLarge. I have followed your work on this ticket and all the progress you made, right up to the point where it would not be accepted by the RabbitMQ team :) We are very interested in a feature like this; I work in a robotics lab in Dubai. I ran into this issue the other day while trying to understand why retained topics with a '+' for single-level substitution were not working when the retain flag is set by the publisher. I have thoroughly reviewed the entire history of this request, all the way back to July 2018, where it seems to have started. How can my team adopt your work (the Erlang plugin?) into our running single-instance RabbitMQ, as well as our 3.13 instances running under the RabbitMQ Cluster Operator on Kubernetes? I am willing to help write any code to make this happen.
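For context, MQTT topic filters match exactly one topic level with `+` and all remaining levels with `#`. A minimal Python sketch of that matching rule (an illustration only, not RabbitMQ's implementation; it ignores edge cases such as `$`-prefixed system topics):

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Illustrative MQTT topic-filter match: '+' matches one level, '#' the rest."""
    flevels = topic_filter.split("/")
    tlevels = topic.split("/")
    for i, f in enumerate(flevels):
        if f == "#":                      # multi-level wildcard: matches the remainder
            return True
        if i >= len(tlevels):
            return False
        if f != "+" and f != tlevels[i]:  # '+' matches exactly one level
            return False
    return len(flevels) == len(tlevels)
```

A retained-message store that only supports exact-name lookup cannot answer such a wildcard filter, which is why subscribing with `+` against retained topics needs dedicated support.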
This yields very significant efficiency gains with hundreds or thousands of links. To avoid overwhelming the upstream schema data store (which could be a 7-9 node cluster on 3.x with Mnesia), we limit the degree of parallelism and introduce configurable throttling delays. Technical design pair: @ansd.
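The pattern described above, bounded parallelism plus pacing delays, can be sketched generically. A hypothetical Python illustration (not the plugin's Erlang implementation; `max_parallel` and `delay` stand in for the configurable limits mentioned):

```python
import threading
import time

def run_throttled(tasks, max_parallel=8, delay=0.01):
    """Run callables with bounded concurrency and a pacing delay between starts."""
    sem = threading.Semaphore(max_parallel)
    threads = []
    for task in tasks:
        sem.acquire()                # blocks once max_parallel tasks are in flight
        def worker(t=task):
            try:
                t()
            finally:
                sem.release()        # free a slot for the next task
        th = threading.Thread(target=worker)
        th.start()
        threads.append(th)
        time.sleep(delay)            # pacing to spare the upstream store
    for th in threads:
        th.join()
```

The semaphore caps how many links are being handled at once, while the sleep spreads start-up load over time instead of hammering the schema store in one burst.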
without it, the new keys (or rather, their defaults) will spill into the `config_schema_SUITE`s of other plugins.
We implement the `revive/0` part for symmetry. As with the revive command in general, it serves as a last resort available for rollback. Usually, nodes put into maintenance mode are stopped shortly afterwards for an upgrade or reconfiguration.
Previously, the following three supervisors used the wrong `shutdown` and wrong `type`:

* rabbit_exchange_federation_sup
* rabbit_federation_sup
* rabbit_queue_federation_sup

For `shutdown`, Erlang/OTP recommends: "If the child process is another supervisor, the shutdown time must be set to infinity to give the subtree ample time to shut down. Setting the shutdown time to anything other than infinity for a child of type supervisor can cause a race condition where the child in question unlinks its own children, but fails to terminate them before it is killed."

For `type`, Erlang/OTP recommends: "type specifies if the child process is a supervisor or a worker. The type key is optional. If it is not specified, it defaults to worker."

This commit fixes the wrong child spec by using a timeout of `infinity` and type `supervisor`.

(cherry picked from commit cfcf6cf)
## What?

Federation links started in the federation plugins are put under the `rabbit` app supervision tree (unfortunately). This commit ensures that the entire federation supervision hierarchies (including all federation links) are stopped **before** stopping app `rabbit` when stopping RabbitMQ.

## Why?

Previously, we've seen cases where hundreds of federation links were stopped during the shutdown procedure of app `rabbit`, leading to federation link restarts happening in parallel with vhosts being stopped. In one case, the shutdown of app `rabbit` even got stuck (although there is no evidence that federation was the problem). Either way, the cleaner approach is to gracefully stop all federation links, i.e. the entire supervision hierarchy under `rabbit_exchange_federation_sup` and `rabbit_queue_federation_sup`, when stopping the federation apps, i.e. **before** proceeding to stop app `rabbit`.

## How?

The boot step cleanup steps for the federation plugins are skipped when stopping RabbitMQ. Hence, this commit ensures that the supervisors are stopped in the `stop/1` application callback. This commit does something similar to rabbitmq#14054 but uses a simpler approach.

(cherry picked from commit 8bffa58)
when the core now interacts with a part of the supervision tree owned by this plugin for more efficient shutdown.
Fix the following test flake in CI:
```
direct_reply_to_amqp_SUITE > cluster_size_1 > many_replies
#1. {error,
{test_case_failed,{timeout,{num_received,2959},{num_missing,41}}}}
```
As documented in https://www.rabbitmq.com/docs/direct-reply-to#caveats-amqp
the volatile queue may drop messages if the writer proc can't send fast
enough.
Test case `resends_lost_command` of `rabbit_fifo_int_SUITE` succeeded
without testing what it is supposed to test.
That's because the mock used the wrong arity.
Applying the following diff
```
- meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
+ meck:expect(ra, pipeline_command, fun (_, _, _, low) -> ok end),
```
made the test case then fail with:
```
rabbit_fifo_int_SUITE > tests > resends_lost_command
#1. {error,
{{badmatch,
{empty,
{state,
{cfg,
[{resends_lost_command,ct_rabbit@VQD7JFK37T}],
32,10000},
{resends_lost_command,ct_rabbit@VQD7JFK37T},
go,3,4,false,#{},
#{1 => {undefined,{e,2,msg2,{0,0}}},
2 => {undefined,{e,3,msg3,{0,0}}}},
#{},undefined,undefined}}},
[{rabbit_fifo_int_SUITE,resends_lost_command,1,
[{file,"rabbit_fifo_int_SUITE.erl"},{line,315}]},
{test_server,ts_tc,3,[{file,"test_server.erl"},{line,1796}]},
{test_server,run_test_case_eval1,6,
[{file,"test_server.erl"},{line,1305}]},
{test_server,run_test_case_eval,9,
[{file,"test_server.erl"},{line,1237}]}]}}
```
The timer was never actually scheduled in `rabbit_fifo_client`.
This commit therefore removes all the dead code.
Note that `resend_all_pending/1` is nowadays executed upon leader changes.
`nodeup` in the queue proc also causes `resend_all_pending/1` to be executed
even if the leader didn't change.
CI: Remove unused shard5 node
reusing an auto-delete exchange is asking for trouble
[Why]

For a long time, there has been a race condition when deleting exclusive queues: if a connection was re-established and a queue with the same name was declared, we could delete the new queue. For example, with many MQTT consumers, if we performed a rolling restart of the cluster and the clients reconnected without any delay, we sometimes ended up after the restart with the expected number of connections but a lower number of queues, even though there should be a queue for each consumer.

[How]

Check that the exclusive_owner has the value we expect when requesting deletion. If the value is different, this is effectively a different queue (same name, but a different connection), so we should not delete it.
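The guarded delete described in [How] amounts to a compare-and-delete on the queue's owner. A hypothetical Python sketch (the real check lives in RabbitMQ's Erlang queue-deletion path; `QueueRegistry` and its method names are invented for illustration):

```python
import threading

class QueueRegistry:
    """Sketch: delete a queue only if its exclusive owner matches the requester."""

    def __init__(self):
        self._queues = {}           # queue name -> owning connection id
        self._lock = threading.Lock()

    def declare(self, name, owner):
        with self._lock:
            self._queues[name] = owner

    def delete_if_owner(self, name, expected_owner):
        with self._lock:
            if self._queues.get(name) == expected_owner:
                del self._queues[name]
                return True
            # Same name but a different connection: this is effectively a
            # different queue, so the stale delete must be a no-op.
            return False
```

With this guard, a delete issued on behalf of a dead connection becomes a no-op once a reconnecting client has re-declared a queue under the same name.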
…l-disconnect Federation: disconnect links before stopping, in parallel
…t_condition/2 (cherry picked from commit 7c3f039)
…ting-exclusive-queue Check exclusive queue owner before deleting a queue
Wait for both federation links to be established before proceeding with expect_federation calls. The test was flaky because it declared fed1.downstream2 inside the test function and immediately tried to verify federation without waiting for the asynchronous link setup.
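Waiting for an asynchronously established link is a generic poll-until-true pattern. A hedged Python sketch of the idea (the actual test uses the suite's own Erlang helpers, not this code):

```python
import time

def wait_until(condition, timeout=30.0, interval=0.5):
    """Poll `condition` until it returns truthy or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

The fix is conceptually `wait_until(links_are_running)` before the first federation assertion, rather than asserting immediately after the declare returns.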
to reduce flakiness and speed up test execution.
…r negative They can become negative due to ETS tables being concurrently added, removed, and updated. In such cases, simply report 0.
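Reporting 0 for a transiently negative reading is just a clamp at the reporting boundary. A one-line Python illustration (`report_count` is a hypothetical name):

```python
def report_count(raw: int) -> int:
    """Clamp transiently negative counter readings to zero before reporting."""
    return max(0, raw)
```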
Add a test enabling all stable feature flags with exchange logging enabled. This is a more general test case for rabbitmq#11652 that should catch all future feature flag issues when `rabbit_ff_controller` publishes a message. Relates rabbitmq#11652 rabbitmq#14069 rabbitmq#14796
Add test enabling all feature flags with exchange logger
…le path in DETS store
Hey there, friend. I got a notification about this PR via email automation. Is this work revitalized? Hallelujah if so.
@mortdiggiddy I will soon get in touch with the RabbitMQ maintainers and see if they have plans to improve the retained message store for the MQTT plugin.
See description and conversation in rabbitmq#13048