
MQTT: Support wildcards in topic filters matching retained messages#1

Open
getlarge wants to merge 57 commits into main from
rabbitmq-server-8096

Conversation

@getlarge
Owner

@getlarge getlarge commented Feb 3, 2025

See description and conversation in rabbitmq#13048

@robertsLando

When I delete a retained message, I see a publish of `{}` instead. After this, if I disconnect and re-connect, I don't see it anymore, so it's partially working.

Screencast.da.2025-02-12.10-50-09.webm

@mortdiggiddy

mortdiggiddy commented Mar 4, 2025

Hello there Mr. getLarge.

I have followed your work regarding this ticket and all of the progress you made, right up until the point where it would not be accepted by the RabbitMQ team :)

We are very interested in a feature like this; I work in a robotics lab in Dubai. I ran into this issue the other day while trying to understand why retained topics with a '+' wildcard were not working when the retain flag is set by the publisher.

I have thoroughly reviewed the entire history of this request, all the way back to July 2018, when it seems to have started.

How can my team use your work and adopt this (Erlang plugin?) into our running RabbitMQ single instance, as well as our 3.13 instances running inside the RabbitMQ Cluster Operator on Kubernetes?

I am willing to help write any code to make this happen.

@getlarge @robertsLando

michaelklishin and others added 8 commits January 15, 2026 10:04
This yields very significant efficiency gains
with hundreds or thousands of links.

To avoid overwhelming the upstream schema data store
(which could be a 7-9 node cluster on 3.x with Mnesia),
we limit the degree of parallelism and add configurable
throttling delays into the process.

Technical design pair: @ansd.
without it, the new keys (or rather, their defaults) will spill into the `config_schema_SUITE`s of other plugins.
We implement the `revive/0` part for symmetry. As with the revive command in general, it serves as a last resort available for rollback.

Usually nodes put into maintenance mode are shortly stopped for upgrading or reconfiguration.
Previously, the following three supervisors used the wrong `shutdown`
and wrong `type`:
* rabbit_exchange_federation_sup
* rabbit_federation_sup
* rabbit_queue_federation_sup

For `shutdown` Erlang/OTP recommends:
"If the child process is another supervisor, the shutdown time must be
set to infinity to give the subtree ample time to shut down. Setting the
shutdown time to anything other than infinity for a child of type supervisor
can cause a race condition where the child in question unlinks its own children,
but fails to terminate them before it is killed."

For `type` Erlang/OTP recommends:
"type specifies if the child process is a supervisor or a worker.
The type key is optional. If it is not specified, it defaults to
worker."

This commit fixes the wrong child spec by using a timeout of `infinity`
and type `supervisor`.

(cherry picked from commit cfcf6cf)
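The corrected child spec can be sketched as follows. This is a minimal illustration following the OTP recommendations quoted above; the id and module names are placeholders, not the exact ones used in the RabbitMQ source:

```erlang
%% Hedged sketch: a child spec for a supervisor-type child.
%% Module and id names here are illustrative only.
child_spec() ->
    #{id => rabbit_federation_sup,                      %% illustrative id
      start => {rabbit_federation_sup, start_link, []},
      restart => transient,
      shutdown => infinity,  %% must be infinity for supervisor children
      type => supervisor}.   %% defaults to worker if omitted
```

With `shutdown => infinity`, the parent waits for the subtree to terminate instead of brutally killing the child supervisor mid-cleanup.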
 ## What?

Federation links started in the federation plugins are put
under the `rabbit` app supervision tree (unfortunately).

This commit ensures that the entire federation supervision hierarchies
(including all federation links) are stopped **before** stopping app
`rabbit` when stopping RabbitMQ.

 ## Why?

Previously, we've seen cases where hundreds of federation links are
stopped during the shutdown procedure in app `rabbit` leading to
federation link restarts happening in parallel to vhosts being stopped.
In one case, the shutdown of app `rabbit` even got stuck (although there
is no evidence that federation was the problem).

Either way, the cleaner approach is to gracefully stop all federation
links, i.e. the entire supervision hierarchy under
`rabbit_exchange_federation_sup` and `rabbit_queue_federation_sup`
when stopping the federation apps, i.e. **before** proceeding to stop
app `rabbit`.

 ## How?

The boot step cleanup steps for the federation plugins are skipped when
stopping RabbitMQ.

Hence, this commit ensures that the supervisors are stopped in the
stop/1 application callback.

This commit does something similar to rabbitmq#14054
but uses a simpler approach.

(cherry picked from commit 8bffa58)
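In outline, this amounts to tearing down the federation supervision trees from the application `stop/1` callback. A hedged sketch (the supervisor names come from the commit message above; the callback body and the `stop_supervisor/1` helper are illustrative, not the plugin's actual code):

```erlang
%% Hedged sketch of an application stop/1 callback that stops the
%% federation supervision hierarchies before app `rabbit` shuts down.
stop(_State) ->
    _ = stop_supervisor(rabbit_exchange_federation_sup),
    _ = stop_supervisor(rabbit_queue_federation_sup),
    ok.

%% Illustrative helper: ask a registered supervisor to shut down
%% gracefully if it is running; do nothing if it is not.
stop_supervisor(Name) ->
    case whereis(Name) of
        undefined -> ok;
        Pid -> exit(Pid, shutdown)
    end.
```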
when the core now interacts with a part of the
supervision tree owned by this plugin for
more efficient shutdown.
getlarge pushed a commit that referenced this pull request Jan 16, 2026
Fix the following test flake in CI:
```
direct_reply_to_amqp_SUITE > cluster_size_1 > many_replies
    #1. {error,
            {test_case_failed,{timeout,{num_received,2959},{num_missing,41}}}}
```
As documented in https://www.rabbitmq.com/docs/direct-reply-to#caveats-amqp
the volatile queue may drop messages if the writer proc can't send fast
enough.
getlarge pushed a commit that referenced this pull request Jan 16, 2026
Test case `resends_lost_command` of `rabbit_fifo_int_SUITE` succeeded
without testing what it is supposed to test.
That's because the mock used the wrong arity.
Applying the following diff
```
-    meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
+    meck:expect(ra, pipeline_command, fun (_, _, _, low) -> ok end),
```
made the test case then fail with:
```
rabbit_fifo_int_SUITE > tests > resends_lost_command
    #1. {error,
            {{badmatch,
                 {empty,
                     {state,
                         {cfg,
                             [{resends_lost_command,ct_rabbit@VQD7JFK37T}],
                             32,10000},
                         {resends_lost_command,ct_rabbit@VQD7JFK37T},
                         go,3,4,false,#{},
                         #{1 => {undefined,{e,2,msg2,{0,0}}},
                           2 => {undefined,{e,3,msg3,{0,0}}}},
                         #{},undefined,undefined}}},
             [{rabbit_fifo_int_SUITE,resends_lost_command,1,
                  [{file,"rabbit_fifo_int_SUITE.erl"},{line,315}]},
              {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1796}]},
              {test_server,run_test_case_eval1,6,
                  [{file,"test_server.erl"},{line,1305}]},
              {test_server,run_test_case_eval,9,
                  [{file,"test_server.erl"},{line,1237}]}]}}
```

The timer was never actually scheduled in `rabbit_fifo_client`.
This commit therefore removes all the dead code.

Note that `resend_all_pending/1` is nowadays executed upon leader changes.
`nodeup` in the queue proc also causes `resend_all_pending/1` to be executed
even if the leader didn't change.
lhoguin and others added 17 commits January 16, 2026 12:58
reusing an auto-delete exchange is asking for trouble
[Why]
For a long time, there has been race condition when deleting
exclusive queues - if a connection was re-established and a queue
with the same name was declared, we could delete the new queue.

For example, with many MQTT consumers, if we performed a rolling restart
of the cluster and the clients reconnected without any delay, after the
restart, we sometimes had the expected number of connections but a lower
number of queues, even though there should be a queue for each consumer.

[How]
Check that the exclusive_owner has the value we expect when requesting
deletion. If the value is different, this means this is effectively a
different queue (same name, but a different connection), so we should
not delete it.
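The check can be sketched as follows, with a map standing in for the queue record; the field name and function are hypothetical, chosen only to illustrate the ownership comparison described above:

```erlang
%% Hedged sketch of the exclusive-owner check. A queue is modelled as a
%% map carrying the pid of the connection that declared it exclusively.
maybe_delete(#{exclusive_owner := Owner} = Q, Owner) ->
    {delete, Q};             %% owner matches the requester: safe to delete
maybe_delete(_Q, _RequestingOwner) ->
    skip.                    %% same name, different connection: keep it
```

The non-linear pattern in the first clause (the variable `Owner` bound in both arguments) makes Erlang check that the stored owner and the requesting owner are the same pid.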
…l-disconnect

Federation: disconnect links before stopping, in parallel
…ting-exclusive-queue

Check exclusive queue owner before deleting a queue
Wait for both federation links to be established before proceeding
with expect_federation calls. The test was flaky because it declared
fed1.downstream2 inside the test function and immediately tried to
verify federation without waiting for the asynchronous link setup.
to reduce flakiness and speed up test execution.
…r negative

They can become negative due to ETS tables being concurrently
added, removed and updated.

So simply report 0 in such cases.
michaelklishin and others added 27 commits January 17, 2026 10:45
Add a test enabling all stable feature flags with exchange logging enabled.

This is a more general test case for rabbitmq#11652
that should catch all future feature flag issues when `rabbit_ff_controller` publishes a message.

Relates
rabbitmq#11652
rabbitmq#14069
rabbitmq#14796
Add test enabling all feature flags with exchange logger
@getlarge getlarge force-pushed the rabbitmq-server-8096 branch from 94495e1 to 95e8547 on January 19, 2026 at 13:02
@mortdiggiddy

Hey there friend. Got a notification about this PR in email automation.

Is this work revitalized?

Hallelujah if so.

@getlarge
Owner Author

@mortdiggiddy I will soon get in touch with the RabbitMQ maintainers and see if they have plans to improve the retained message store for the MQTT plugin.
Until then, this fork and branch are still alive; we are using them in our custom Docker image for RabbitMQ.
We also use this new plugin to add extra metadata about the MQTT clients.
