MQTT: Support wildcards in topic filters matching retained messages #13048
base: main
Conversation
@getlarge thank you for taking the time to contribute. Have you seen CONTRIBUTING.md? There really isn't anything to add for Make, and running the documented command will update the Bazel files; it would be a good idea to do so. As for the implementation, are you referring to the test cases? Because I don't see a complete implementation in the GitHub diff, only a stub of sorts.
Yes, I read it but saw all those declarations in the Bazel files, so I wondered if the same had to be done for the new module and test files.
I'll try that.
The implementation has not been pushed yet. I want the rabbit_globber unit tests to pass first, but I'm stuck there at the moment, and I can't make much sense of the error message. Compiling and running the tests with EUnit from the Erlang shell:

```erlang
c(rabbit_globber).
c(rabbit_globber_tests).
eunit:test(rabbit_globber_tests).
```

works, but not in this setup:

```shell
# run the command from CONTRIBUTING.md, then:
cd deps/rabbitmq_mqtt
gmake ct-globber
```

which outputs:

```
== globber_SUITE ==
* [tests]
Updating /Users/edouard/Dev/rabbitmq/rabbitmq-server/logs/index.html ... done
Updating /Users/edouard/Dev/rabbitmq/rabbitmq-server/logs/all_runs.html ... done
gmake: *** [../../erlang.mk:6083: ct-globber] Error 1
```

Is there anything obvious that I am missing?

To hint at the eventual globber's usage, here's a suggested implementation of a lookup function:

```erlang
-spec lookup_by_pattern(topic(), store_state()) -> [mqtt_msg()].
lookup_by_pattern(Pattern, #store_state{table = T}) ->
    Globber = rabbit_globber:new(<<"/">>, <<"+">>, <<"#">>),
    Globber1 = rabbit_globber:add(Globber, Pattern),
    Matcher =
        fun(#retained_message{topic = Topic}) -> rabbit_globber:test(Globber1, Topic) end,
    Msgs = ets:tab2list(T),
    lists:filter(Matcher, Msgs).
```
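For reference, the MQTT topic-filter semantics the globber has to reproduce are: `+` matches exactly one topic level, and `#` matches the remaining levels, including none. A minimal, illustrative sketch in Python (the function name and parameters are hypothetical, not code from this PR):

```python
def topic_matches(pattern, topic, sep="/", wc_one="+", wc_some="#"):
    """Return True if an MQTT-style topic filter matches a concrete topic."""
    p_parts = pattern.split(sep)
    t_parts = topic.split(sep)
    for i, p in enumerate(p_parts):
        if p == wc_some:
            # '#' matches this level and everything below it (including nothing)
            return True
        if i >= len(t_parts):
            return False
        if p != wc_one and p != t_parts[i]:
            # '+' matches any single level; anything else must match literally
            return False
    return len(p_parts) == len(t_parts)
```

Because the separator and wildcard symbols are parameters, the same logic covers AMQP-style filters too, e.g. `topic_matches("test.*", "test.bar", sep=".", wc_one="*")`.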
@getlarge there is no shortage of unit tests (tests that do not start and stop nodes) in the repository, to list just a few examples.
We run all of our suites with Common Test; EUnit is only used as an assertion macro library in RabbitMQ (individual dependencies such as Ra or Khepri can use EUnit, but you are contributing to RabbitMQ itself). That's the "ct" in `ct-globber`. So you are comparing apples to oranges: EUnit tests run from the shell versus Common Test suites driven by erlang.mk.
The error listed by Common Test is misleading. The problem is not with the return value; CT tests can return anything. The problem is that these functions do not accept the mandatory argument for CT tests: a CT `Config`. CT also seemingly has a problem with a test function named `test`.
@getlarge with the following changes, all tests succeed with `gmake ct-globber`:

diff --git a/deps/rabbitmq_mqtt/test/globber_SUITE.erl b/deps/rabbitmq_mqtt/test/globber_SUITE.erl
index cc9f19e42a..fcdeceb977 100644
--- a/deps/rabbitmq_mqtt/test/globber_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/globber_SUITE.erl
@@ -1,3 +1,8 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module(globber_SUITE).
-compile([export_all, nowarn_export_all]).
@@ -8,10 +13,22 @@
all() ->
[{group, tests}].
-groups() ->
- [{tests, [shuffle], [new]}].
+ groups() ->
+ [
+ {tests, [parallel], [
+ new,
+ add,
+ remove,
+ match,
+ matching,
+ match_iter,
+ clear,
+ multiple_patterns
+ ]
+ }
+ ].
-new() ->
+new(_Config) ->
Globber = rabbit_globber:new(),
?assertEqual(#globber{}, Globber),
Globber2 = rabbit_globber:new(<<"/">>, <<"*">>, <<"#">>),
@@ -20,20 +37,20 @@ new() ->
wildcard_some = <<"#">>},
Globber2).
-add() ->
+add(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>, <<"matches">>),
?assertMatch(#globber{trie = _}, Globber1),
Globber2 = rabbit_globber:add(Globber1, <<"test.#">>, <<"it n">>),
?assertMatch(#globber{trie = _}, Globber2).
-remove() ->
+remove(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>, <<"matches">>),
Globber2 = rabbit_globber:remove(Globber1, <<"test.*">>, <<"matches">>),
?assertEqual(Globber, Globber2).
-match() ->
+match(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>, <<"it matches">>),
Result = rabbit_globber:match(Globber1, <<"test.bar">>),
@@ -43,25 +60,25 @@ match() ->
Result3 = rabbit_globber:match(Globber1, <<"not.foo">>),
?assertEqual([], Result3).
-test() ->
+matching(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>),
?assertEqual(true, rabbit_globber:test(Globber1, <<"test.bar">>)),
?assertEqual(false, rabbit_globber:test(Globber1, <<"foo.bar">>)).
-match_iter() ->
+match_iter(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>, <<"matches">>),
Result = rabbit_globber:match_iter(Globber1, <<"test.bar">>),
?assertEqual([<<"matches">>], Result).
-clear() ->
+clear(_Config) ->
Globber = rabbit_globber:new(),
Globber1 = rabbit_globber:add(Globber, <<"test.*">>, <<"matches">>),
Globber2 = rabbit_globber:clear(Globber1),
?assertEqual(Globber, Globber2).
-multiple_patterns() ->
+multiple_patterns(_Config) ->
Globber = rabbit_globber:new(<<".">>, <<"*">>, <<"#">>),
Globber1 = rabbit_globber:add(Globber, <<"foo.#">>, <<"catchall">>),
Globber2 = rabbit_globber:add(Globber1, <<"foo.*.bar">>, <<"single_wildcard">>),
The module and suite need a better name; at the very least, they must be prefixed. The namespace for modules in Erlang is flat, so a single-word module name is rarely appropriate. And yes, we ended up with a module named that way once before.
Note that for unit tests that have no shared state, the `parallel` group option can be used.
@getlarge I just updated the description in #8824 (comment) to add requirements a solution should comply with. Specifically, the solution should not perform full ETS / DETS table scans every time a client subscribes with a topic filter containing a wildcard, because this will be prohibitively expensive if there are many retained messages.
@ansd, as mentioned in my note in this comment, we could derive a pattern compatible with ETS from the topic filter containing wildcards and use it for the lookup.
What would be a reasonable amount?
Thanks for the explanation.
@michaelklishin I did not use the complete plugin name as a prefix because the module's usage could be extended to other protocols. To keep it simple, I will do as you suggest, and we'll see in the future whether you want to consider using this matcher for more general-purpose usage.
@getlarge Either way, the bottom line is that we want to avoid full O(n) table scans. O(log n) is acceptable.
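One way to meet that requirement is the trie approach suggested earlier: store each topic exploded into its levels, and walk only the edges a filter can match, so lookup cost depends on the filter depth and the number of matches rather than the table size. An illustrative Python sketch (hypothetical names; not the PR's Erlang/ETS implementation):

```python
def trie_insert(trie, topic, value, sep="/"):
    """Store value under the exploded topic; the None key holds terminal values."""
    node = trie
    for level in topic.split(sep):
        node = node.setdefault(level, {})
    node.setdefault(None, []).append(value)

def trie_match(trie, pattern, sep="/"):
    """Collect values of all stored topics matched by an MQTT-style filter."""
    results = []

    def collect_all(node):
        # '#' matched: gather this node's values and everything below it
        results.extend(node.get(None, []))
        for edge, child in node.items():
            if edge is not None:
                collect_all(child)

    def walk(node, parts):
        if not parts:
            results.extend(node.get(None, []))
            return
        head, rest = parts[0], parts[1:]
        if head == "#":
            collect_all(node)
            return
        for edge, child in node.items():
            if edge is None:
                continue
            # '+' follows every edge at this level; literals follow one edge
            if head == "+" or head == edge:
                walk(child, rest)

    walk(trie, pattern.split(sep))
    return results
```

An exact lookup visits one node per topic level; a wildcard lookup branches only where `+` or `#` appears, instead of filtering every row in the table.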
I changed the strategy to simplify things: instead of creating an abstract module that would support X, Y, Z storage, I focused on the ETS storage. Following @ansd's suggestion, I implemented a trie to store exploded topics.

Test

I added a new test module.

Benchmark

I also ran a benchmark to measure its performance:

Environment

Model Name: Mac mini

Results

Exact matches are fast (logarithmic scaling?), while wildcard matches are slower but perform reasonably well, even with large numbers of topics stored.
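One quick way to sanity-check the "logarithmic scaling?" guess from timing data like this: if lookup time grows O(log n), then time divided by log2(n) stays roughly constant as n doubles, while time divided by n shrinks; for O(n) growth it is the other way around. A small illustrative Python helper (not part of the PR):

```python
import math

def scaling_ratio(counts, times):
    """Normalize each timing by log2(n) and by n.

    Roughly flat log_ratios across doublings of n suggest O(log n);
    roughly flat linear_ratios suggest O(n)."""
    log_ratios = [t / math.log2(n) for n, t in zip(counts, times)]
    linear_ratios = [t / n for n, t in zip(counts, times)]
    return log_ratios, linear_ratios
```

Feeding in the benchmark's topic counts and average lookup times per row of the results table gives a cheap eyeball test without curve fitting.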
Code

This is the code used to generate these charts:

-module(rabbit_mqtt_topic_storage_bench).
-export([run/0, run/1]).
-include("rabbit_mqtt_topic_storage_ets.hrl").
% Test with exponentially increasing topic counts
-define(TOPIC_COUNTS, [1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000]).
-define(DEPTH, 7).
-define(WARMUP_ITERATIONS, 1000).
-define(TEST_ITERATIONS, 5000).
-define(BATCH_SIZE, 100).
run() ->
init_random(),
run(?TOPIC_COUNTS).
run(TopicCounts) when is_list(TopicCounts) ->
Results = lists:map(fun run_scenario/1, TopicCounts),
print_results(Results),
generate_charts(Results).
run_scenario(TopicCount) ->
io:format("~nTesting with ~p topics~n", [TopicCount]),
{ok, State} = rabbit_mqtt_topic_storage_ets:init(),
% Create varied test data
TestData = create_test_data(TopicCount, ?DEPTH),
PopulatedState = populate_store(State, TestData),
% Warm up heavily to ensure JIT stabilization
_ = bench_warmup(PopulatedState, ?WARMUP_ITERATIONS),
% Run actual benchmark
{ExactTimes, WildcardTimes} = bench_lookups(PopulatedState, ?TEST_ITERATIONS, TestData),
cleanup(PopulatedState),
#{topic_count => TopicCount,
exact_times => analyze_times(ExactTimes),
wildcard_times => analyze_times(WildcardTimes)}.
% test data section
create_test_data(Count, Depth) ->
% Create diverse topics that will match our wildcard patterns
lists:map(fun(N) ->
% Generate base topic like "1/level1/level2/level3/level4"
Topic = generate_topic(N, Depth),
% Create wildcard pattern that will match this and similar topics
% For a topic "1/level1/level2/level3/level4"
% Pattern will be "1/+/level2/#" - guaranteed to match some topics
Parts = binary:split(Topic, <<"/">>, [global]),
WildcardPattern =
case Parts of
[First | _] ->
% Create pattern that will match this and similar topics
iolist_to_binary([First, "/+/level2/#"])
end,
{N, Topic, WildcardPattern}
end,
lists:seq(1, Count)).
populate_store(State, TestData) ->
lists:foldl(fun({N, Topic, _}, AccState) ->
Value = iolist_to_binary(["msg", integer_to_list(N)]),
{ok, NewState} = rabbit_mqtt_topic_storage_ets:insert(Topic, Value, AccState),
NewState
end,
State,
TestData).
generate_topic(N, Depth) ->
% For each N, create several similar topics that will match the same wildcard
TopicNum = N div 10, % Group topics by tens to ensure wildcard matches
Variation = N rem 10, % Use remainder to create variations
Parts =
[integer_to_list(TopicNum), % First level is the group number
lists:concat(["var", integer_to_list(Variation)]), % Second level varies
"level2" % Fixed level that wildcards will match
| [lists:concat(["level", integer_to_list(I)]) || I <- lists:seq(3, Depth - 1)]],
iolist_to_binary(string:join(Parts, "/")).
cleanup(State) ->
ets:delete(State#state.node_table),
ets:delete(State#state.edge_table),
ets:delete(State#state.topic_table).
% benchmark
bench_warmup(State, Iterations) ->
Topics = [generate_topic(N, ?DEPTH) || N <- lists:seq(1, 10)],
Patterns = [iolist_to_binary([integer_to_list(N), "/+/#"]) || N <- lists:seq(1, 10)],
lists:foreach(fun(_) ->
[rabbit_mqtt_topic_storage_ets:lookup(T, State) || T <- Topics],
[rabbit_mqtt_topic_storage_ets:lookup(P, State) || P <- Patterns]
end,
lists:seq(1, Iterations)).
bench_lookups(State, Iterations, TestData) ->
% Select random test cases for each batch
BatchCount = Iterations div ?BATCH_SIZE,
ExactBatches =
[bench_exact_batch(State, TestData, ?BATCH_SIZE) || _ <- lists:seq(1, BatchCount)],
WildBatches =
[bench_wildcard_batch(State, TestData, ?BATCH_SIZE) || _ <- lists:seq(1, BatchCount)],
{lists:flatten(ExactBatches), lists:flatten(WildBatches)}.
bench_exact_batch(State, TestData, BatchSize) ->
% Take random samples for each batch
Samples = random_samples(TestData, BatchSize),
[{Time, Matches}
|| {_, Topic, _} <- Samples,
{Time, {ok, Matches}}
<- [timer:tc(fun() -> rabbit_mqtt_topic_storage_ets:lookup(Topic, State) end)]].
bench_wildcard_batch(State, TestData, BatchSize) ->
Samples = random_samples(TestData, BatchSize),
[{Time, Matches}
|| {_, _, Pattern} <- Samples,
{Time, {ok, Matches}}
<- [timer:tc(fun() -> rabbit_mqtt_topic_storage_ets:lookup(Pattern, State) end)]].
random_samples(List, N) ->
    % Select N random elements without replacement by shuffling the indices
    Length = length(List),
    Indices = lists:sort([{rand:uniform(), X} || X <- lists:seq(1, Length)]),
    Selected = lists:sublist([I || {_, I} <- Indices], N),
    [lists:nth(I, List) || I <- Selected].
% Initialize the random seed at the start; the legacy 'random' module was
% removed in modern OTP releases, so 'rand' is used instead
init_random() ->
    _ = rand:seed(exsss, os:timestamp()),
    ok.
% measure
analyze_times(TimedResults) ->
Times = [Time / 1000.0 || {Time, _} <- TimedResults], % Convert to ms
Matches = [length(M) || {_, M} <- TimedResults],
#{times =>
#{min => lists:min(Times),
max => lists:max(Times),
avg => lists:sum(Times) / length(Times),
median => median(Times),
p95 => percentile(Times, 95)},
matches =>
#{min => lists:min(Matches),
max => lists:max(Matches),
avg => lists:sum(Matches) / length(Matches)}}.
median(List) ->
Sorted = lists:sort(List),
Length = length(Sorted),
Middle = Length div 2,
case Length rem 2 of
0 ->
(lists:nth(Middle, Sorted) + lists:nth(Middle + 1, Sorted)) / 2;
1 ->
lists:nth(Middle + 1, Sorted)
end.
percentile(List, P) when P >= 0, P =< 100 ->
Sorted = lists:sort(List),
Length = length(Sorted),
N = round(P * Length / 100),
lists:nth(max(1, min(N, Length)), Sorted).
print_results(Results) ->
io:format("~n=== Benchmark Results for depth ~B ===~n", [?DEPTH]),
io:format("~-12s ~-15s ~-15s ~-15s ~-15s ~-15s~n",
["Topics",
"Exact Avg(ms)",
"Exact P95(ms)",
"Wild Avg(ms)",
"Wild P95(ms)",
"Wild Matches"]),
io:format("~s~n", [string:copies("-", 87)]),
lists:foreach(fun(R) ->
#{topic_count := Count,
exact_times := #{times := #{avg := ExactAvg, p95 := ExactP95}},
wildcard_times :=
#{times := #{avg := WildAvg, p95 := WildP95},
matches := #{avg := MatchAvg}}} =
R,
io:format("~-12B ~-15.3f ~-15.3f ~-15.3f ~-15.3f ~-15.1f~n",
[Count, ExactAvg, ExactP95, WildAvg, WildP95, MatchAvg])
end,
Results).
% generate charts section
generate_charts(Results) ->
Charts = [generate_time_chart(Results), generate_matches_chart(Results)],
file:write_file("complexity_analysis.md", Charts).
generate_time_chart(Results) ->
XAxis = [integer_to_list(Count) || #{topic_count := Count} <- Results],
YExact =
[maps:get(avg, maps:get(times, ExactTimes)) || #{exact_times := ExactTimes} <- Results],
YWild =
[maps:get(avg, maps:get(times, WildTimes)) || #{wildcard_times := WildTimes} <- Results],
["```mermaid\n",
"%%{init: {'theme': 'base'}}%%\n",
"xychart-beta\n",
" title \"Average Lookup Time vs Topic Count with depth = ",
integer_to_list(?DEPTH),
"\"\n",
" x-axis [",
string:join(XAxis, ", "),
"]\n",
" y-axis \"Time (ms)\" 0 --> ",
io_lib:format("~.3f", [lists:max(YWild) * 1.2]),
"\n",
" bar [",
string:join([io_lib:format("~.3f", [Y]) || Y <- YWild], ", "),
"]\n",
" bar [",
string:join([io_lib:format("~.3f", [Y]) || Y <- YExact], ", "),
"]\n",
"```\n\n"].
generate_matches_chart(Results) ->
XAxis = [integer_to_list(Count) || #{topic_count := Count} <- Results],
YMatches =
[maps:get(avg, maps:get(matches, WildTimes))
|| #{wildcard_times := WildTimes} <- Results],
["```mermaid\n",
"%%{init: {'theme': 'base'}}%%\n",
"xychart-beta\n",
" title \"Average Wildcard Matches vs Topic Count\"\n",
" x-axis [",
string:join(XAxis, ", "),
"]\n",
" y-axis \"Matches\" 0 --> ",
io_lib:format("~.1f", [lists:max(YMatches) * 1.2]),
"\n",
" bar [",
string:join([io_lib:format("~.1f", [Y]) || Y <- YMatches], ", "),
"]\n",
"```\n"].
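For reference, the `percentile/2` helper in the benchmark above uses the nearest-rank method. The same calculation in Python, as an illustrative cross-check (not part of the PR; note that Python's `round` uses banker's rounding, so results can differ by one rank from Erlang's round-half-away-from-zero for exact .5 ranks):

```python
def percentile_nearest_rank(values, p):
    """Nearest-rank percentile: sort, take the element at 1-based rank
    round(p * len / 100), clamped into [1, len]."""
    assert 0 <= p <= 100 and values
    s = sorted(values)
    rank = round(p * len(s) / 100)
    rank = max(1, min(rank, len(s)))
    return s[rank - 1]
```

Unlike interpolating percentile implementations, nearest-rank always returns an observed sample, which is usually what you want for latency reporting.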
@getlarge these numbers look promising. Can you compare the throughput (and latency, if you have the MQTT tooling that makes that easy) vs. the current implementation? I understand that we are talking retained messages here; this is mostly to make sure there are no meaningful regressions.
To be sure I got it right: you asked to benchmark RabbitMQ with the MQTT plugin using this new ETS store versus RabbitMQ 4.0.5 with the current MQTT plugin retained-message (ETS) store, correct?
@getlarge I wasn't talking about the retention case. Here is an example of what our team does, but you don't have to be that thorough; just a basic benchmark, 3 runs or so, will give you an idea. Retention specifically can be tested next. You can try 10K retained messages, then 100K, then 1M, then 10M. Just measure how long it takes for them to be delivered, for example.
Proposed Changes

This PR would fix #8824.

The first step is to add a glob matcher (`deps/rabbitmq_mqtt/src/rabbit_globber.erl`) that can work with different separators and wildcard symbols (e.g. `/`, `.`, `+`). It is inspired by `rabbit_db_topic_exchange`, except that it is storage agnostic, to accommodate the current and future message store modules (#8096).

The second step is to integrate `rabbit_globber` in the storage modules, either via a separate exported function or inside the `lookup` function. This would require `rabbit_mqtt_retainer:handle_call` to accept and reply with a list of `#mqtt_msg` records.
records.Questions
rabbit_globber
tests, but not in this more complex setup (usinggmake ct-globber
). I am new to Erlang and Bazel.rabbit_mqtt_retained_msg_store