Skip to content

Commit

Permalink
fix use of non-active queue master pids on balance transitions
Browse files Browse the repository at this point in the history
References: #6
  • Loading branch information
Ayanda-D committed Nov 23, 2018
1 parent 1950b59 commit 3b76e04
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
.*.sw?
*.beam
.erlang.mk/
.DS_Store
MnesiaCore.*
/cover/
/deps/
/doc/
Expand Down
5 changes: 4 additions & 1 deletion include/rabbit_queue_master_balancer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
-define(DEFAULT_OPERATIONAL_PRIORITY, 5).
-define(DEFAULT_POLICY_TRANSITION_DELAY, 50).
-define(DEFAULT_SYNC_DELAY_TIMEOUT, 3000).
-define(DEFAULT_MASTER_VERIFICATION_TIMEOUT, 300000).
-define(DEFAULT_QLOOKUP_DELAY, 200).
-define(DELAY(T), timer:sleep(T)).

-define(STATE_IDLE, idle).
-define(STATE_READY, ready).
-define(STATE_BALANCING_QUEUES, balancing_queues).
-define(STATE_PAUSE, pause).
-define(STATE_PAUSE, pause).
50 changes: 34 additions & 16 deletions src/rabbit_queue_master_balancer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -303,27 +303,21 @@ shuffle_queue(Q, _MinMaster, _Priority, _PTD, _SynchTimeout, _VSNComp) ->

shuffle(_, QN, _, _, _, _SPids = [], _, _, _, _User, M) when M > 0 ->
{ok, QN};
shuffle(VHost, QN, Policy, MinMaster, _QPid, SPids, Priority, PTD, SynchTimeout,
shuffle(VHost, QN, Policy, MinMaster, _QPid, _SPids, Priority, PTD, SynchTimeout,
User, _M) ->
Pattern = list_to_binary(lists:concat(["^", binary_to_list(QN), "$"])),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(SPids, get_queue(VHost, QN)),
ok = policy_transition_delay(PTD),
ok = ensure_sync(VHost, QN, SynchTimeout),
ok = set_policy(VHost, QN, Pattern, [{<<"ha-mode">>, <<"nodes">>},{<<"ha-params">>,
[list_to_binary(atom_to_list(MinMaster))]}], Priority, <<"queues">>, User),
ok = policy_transition_delay(PTD),
ok = ensure_sync(VHost, QN, SynchTimeout),
ok = delete_policy(VHost, QN, User),
ok = policy_transition_delay(PTD),
ok = ensure_sync(VHost, QN, SynchTimeout),
ok = reset_policy(Policy, PTD, User),
ok = policy_transition_delay(PTD),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(SPids, get_queue(VHost, QN)),
try
rabbit_queue_master_balancer_sync:verify_sync(VHost, QN, SPids, SynchTimeout)
catch
_:Reason ->
error_logger:error_msg("Queue Master Balancer synchronisation error. "
"Queue: ~p, Reason: ~p~n", [QN, Reason]),
void
end,
ok = ensure_sync(VHost, QN, SynchTimeout),
{ok, QN}.

set_policy(VHost, QN, Pattern, Spec, Priority, ApplyTo, undefined) ->
Expand Down Expand Up @@ -374,8 +368,8 @@ get_queue(VHost, QN) ->
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(VHost, queue, QN)),
Q.

get_queue_node(Q) ->
node(case Q of
get_queue_node(AMQQueue) ->
node(case AMQQueue of
{amqqueue, {resource, _,queue,_},_,_,_,_,Pid,_,_,_,_,_,_,live} ->
Pid;
{amqqueue, {resource, _,queue,_},_,_,_,_,Pid,_,_,_,_,_,_,live,_} ->
Expand All @@ -385,6 +379,17 @@ get_queue_node(Q) ->
Other -> error({unsupported_version, Other})
end).

get_queue_slaves(AMQQueue) ->
case AMQQueue of
{amqqueue, {resource, _, queue, _},_,_,_,_,_,SPids,_,_,_,_,_,live} ->
SPids;
{amqqueue, {resource, _, queue, _},_,_,_,_,_,SPids,_,_,_,_,_,live,_} ->
SPids;
{amqqueue,{resource, _, queue, _},_,_,_,_,_,SPids,_,_,_,_,_,_,live,_,_,_,_} ->
SPids;
Other -> error({unsupported_version, Other})
end.

ts() ->
{Mega, Sec, USec} = os:timestamp(),
(Mega * 1000000 + Sec) * 1000 + round(USec/1000).
Expand All @@ -401,9 +406,7 @@ get_policy_trans_delay() ->
?DEFAULT_POLICY_TRANSITION_DELAY
end.

policy_transition_delay(PTD) -> delay(PTD).

delay(T) -> timer:sleep(T).
policy_transition_delay(PTD) -> ?DELAY(PTD).

messages(Q) ->
[{messages, Messages}] = rabbit_amqqueue:info(Q, [messages]),
Expand All @@ -423,3 +426,18 @@ maybe_drop(Key) -> ets:delete(?TAB, Key).

get_config(Tag, Default) ->
rabbit_misc:get_env(rabbitmq_queue_master_balancer, Tag, Default).

ensure_sync(VHost, QN, SynchTimeout) ->
try
%% TODO: Distinct queue master verification timeout!
ok = rabbit_queue_master_balancer_sync:verify_master(VHost, QN),
AMQQueue = get_queue(VHost, QN),
SPids = get_queue_slaves(AMQQueue),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(AMQQueue),
ok = rabbit_queue_master_balancer_sync:verify_sync(VHost, QN, SPids, SynchTimeout)
catch
_:Reason ->
error_logger:error_msg("Queue Master Balancer synchronisation error. "
"Queue: ~p, Reason: ~p~n", [QN, Reason]),
exit(Reason)
end.
58 changes: 54 additions & 4 deletions src/rabbit_queue_master_balancer_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@

-module(rabbit_queue_master_balancer_sync).

-export([sync_mirrors/2, verify_sync/3, verify_sync/4]).
-export([sync_mirrors/1, verify_sync/3, verify_sync/4]).
-export([verify_master/2, verify_master/3, verify_master/4]).

-include("rabbit_queue_master_balancer.hrl").

% ---------------------------------------------------------------
-spec sync_mirrors(list(), rabbit_types:amqqueue() | pid()) -> 'ok'.
-spec sync_mirrors(rabbit_types:amqqueue() | pid()) -> 'ok'.
-spec verify_sync(binary(), binary(), list()) -> 'ok'.
-spec verify_sync(binary(), binary(), list(), integer()) -> 'ok'.
% ----------------------------------------------------------------

sync_mirrors([], _Q) -> ok;
sync_mirrors(_SPids, Q) ->
-define(SLEEP, (?DELAY(?DEFAULT_QLOOKUP_DELAY))).

sync_mirrors(Q) ->
_Any = rabbit_amqqueue:sync_mirrors(Q),
ok.

Expand All @@ -54,10 +56,58 @@ verify_sync(VHost, SynchPPid, SynchRef, QN, SPids) ->
end.

synchronised_slave_pids(VHost, Queue) ->
?SLEEP,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(VHost, queue, Queue)),
SSP = synchronised_slave_pids,
[{SSP, Pids}] = rabbit_amqqueue:info(Q, [SSP]),
case Pids of
'' -> [];
_ -> Pids
end.

verify_master(VHost, QN) ->
verify_master(VHost, QN, ?DEFAULT_MASTER_VERIFICATION_TIMEOUT).
verify_master(VHost, QN, Timeout) ->
VerifierPPid = self(),
VerifierRef = make_ref(),
Verifier = spawn(fun() ->
verify_master(VHost, VerifierPPid, VerifierRef, QN)
end),
receive
{Verifier, _VerifierRef, alive} -> ok;
{'EXIT', _VerifierRef, Reason} -> throw({verify_master_termination, Reason})
after Timeout ->
exit(Verifier, {verify_master_timeout, ?MODULE})
end.

verify_master(VHost, VerifierPPid, VerifierRef, QN) ->
IsQMasterAlive = is_queue_master_alive(VHost, QN),
if IsQMasterAlive -> VerifierPPid ! {self(), VerifierRef, alive};
true -> verify_master(VHost, VerifierPPid, VerifierRef, QN)
end.

is_queue_master_alive(VHost, Queue) ->
?SLEEP,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(VHost, queue, Queue)),
{Pid, State} = get_pid_and_state(Q),
is_pid_alive(Pid) andalso (State =:= live).

%% Queue process can now be exisisting on remote node after migration operations
is_pid_alive(Pid) when is_pid(Pid) ->
LocalNode = node(),
case node(Pid) of
LocalNode -> is_process_alive(Pid);
RemoteNode ->
rpc:call(RemoteNode, erlang, is_process_alive, [Pid])
end.

get_pid_and_state(AMQQueue) ->
case AMQQueue of
{amqqueue, {resource, _, queue, _},_,_,_,_,Pid,_,_,_,_,_,_,State} ->
{Pid, State};
{amqqueue, {resource, _, queue, _},_,_,_,_,Pid,_,_,_,_,_,_,State,_} ->
{Pid, State};
{amqqueue,{resource, _, queue, _},_,_,_,_,Pid,_,_,_,_,_,_,_,State,_,_,_,_} ->
{Pid, State};
Other -> error({unsupported_version, Other})
end.
99 changes: 74 additions & 25 deletions test/rabbit_queue_master_balancer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ groups() ->
successful_queue_balancing_no_ha_no_messages,
successful_queue_balancing_with_ha_no_messages,
successful_queue_balancing_with_ha_and_messages,
successful_queue_balancing_with_ha_75_queues_300_messages_notraffic,
successful_queue_balancing_with_ha_75_queues_300_messages_traffic,
unsuccessful_queue_balancing_no_ha_with_messages
]}
].
Expand Down Expand Up @@ -87,30 +89,55 @@ successful_queue_balancing_no_ha_no_messages(Config) ->
setup_queue_master_balancer(Config, A),
Queues = 4,
Messages= 0,
Delay = 1000,
init_queues(Config, 0, Queues, Messages),
passed = successful_run(Config, Queues, Nodes, Delay).
Delay = 50,
{ok, _Ch} = init_queues(Config, 0, Queues, Messages),
passed = successful_run(Config, Queues, Nodes, Delay).

successful_queue_balancing_with_ha_no_messages(Config) ->
[A|_] = Nodes = get_node_names(Config, 3),
setup_queue_master_balancer(Config, A),
Queues = 4,
Messages= 0,
Delay = 1000,
init_queues(Config, 0, Queues, Messages),
Delay = 50,
{ok, _Ch} = init_queues(Config, 0, Queues, Messages),
passed = successful_run(Config, Queues, Nodes, Delay),
verify_ha(Config).
passed = verify_ha(Config).

successful_queue_balancing_with_ha_and_messages(Config) ->
[A|_] = Nodes = get_node_names(Config, 3),
setup_queue_master_balancer(Config, A),
Queues = 4,
Messages = 10,
Delay = 1000,
init_queues(Config, 0, Queues, Messages),
Delay = 50,
{ok, _Ch} = init_queues(Config, 0, Queues, Messages),
passed = successful_run(Config, Queues, Nodes, Delay),
verify_ha(Config),
verify_messages(Config, 0, Messages).
passed = verify_ha(Config),
passed = verify_messages(Config, 0, Messages).

successful_queue_balancing_with_ha_75_queues_300_messages_notraffic(Config) ->
[A|_] = Nodes = get_node_names(Config, 3),
setup_queue_master_balancer(Config, A),
Queues = 75,
Messages = 300,
Delay = 50,
{ok, _Ch} = init_queues(Config, 0, Queues, Messages),
passed = successful_run(Config, Queues, Nodes, Delay),
passed = verify_ha(Config),
passed = verify_messages(Config, 0, Messages).

successful_queue_balancing_with_ha_75_queues_300_messages_traffic(Config) ->
[A|_] = Nodes = get_node_names(Config, 3),
setup_queue_master_balancer(Config, A),
Queues = 75,
Messages = 200,
ActiveMessages = 100,
Rate = 10, %% Messages per second
Delay = 50,
{ok, Ch} = init_queues(Config, 0, Queues, Messages),
generate_traffic(Ch, ActiveMessages, Rate),
passed = successful_run(Config, Queues, Nodes, Delay),
passed = verify_ha(Config),
passed = verify_messages(Config, 0, Messages + ActiveMessages).

unsuccessful_queue_balancing_no_ha_with_messages(Config) ->
[A|_] = Nodes = get_node_names(Config, 3),
Expand All @@ -119,8 +146,8 @@ unsuccessful_queue_balancing_no_ha_with_messages(Config) ->
Messages= 10,
Delay = 50,
init_queues(Config, 0, Queues, Messages),
passed = unsuccessful_run(Config, Queues, Nodes, Delay),
verify_messages(Config, 0, Messages).
passed = unsuccessful_run(Config, Queues, Nodes, Delay),
passed = verify_messages(Config, 0, Messages).


successful_run(Config, N, [A, B, C], Delay) ->
Expand Down Expand Up @@ -156,11 +183,8 @@ successful_run(Config, N, [A, B, C], Delay) ->
ok = continue(Config, A),
Info4 = info(Config, A),

%% Wait for Balancing Complete
delay(Delay),

%% Balancing Complete: Info5
Info5 = info(Config, A),
%% Wait for Balancing Complete: Info5
{ok, Info5} = wait_until_complete(Config, A, Delay),

%% Stop: Info6
ok = stop(Config, A),
Expand Down Expand Up @@ -290,7 +314,8 @@ init_queues(Config, Node, NQs, NMsgs) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
setup_exchange(Ch),
declare_queues(Ch, NQs),
publish(Ch, NMsgs).
publish(Ch, NMsgs),
{ok, Ch}.

setup_exchange(Ch) ->
XD = #'exchange.declare'{exchange = ?EXHANGE,
Expand All @@ -310,6 +335,17 @@ declare_queues(Ch, NQs) ->
end || N <- lists:seq(1, NQs)],
ok.

%% Very simple traffic generator. Spawn a publisher at passed Rate
%% every second till messages aggregate have been published
generate_traffic(_Ch, 0, _Rate) -> ok;
generate_traffic(Ch, Aggregate, Rate) when Aggregate < Rate ->
%% Send remaining messages
_Pid = spawn(fun() -> publish(Ch, Aggregate) end);
generate_traffic(Ch, Aggregate, Rate) when Aggregate >= Rate ->
_Pid = spawn(fun() -> publish(Ch, Rate) end),
timer:sleep(1000),
generate_traffic(Ch, Aggregate - Rate, Rate).

publish(Ch, NMsgs) ->
[begin
Msg = list_to_binary("test.message."++integer_to_list(N)),
Expand Down Expand Up @@ -360,7 +396,7 @@ setup_queue_master_balancer(Config, Node) ->
init_queue_master_balancer_remote() ->
application:set_env(rabbitmq_queue_master_balancer, operational_priority, 15),
application:set_env(rabbitmq_queue_master_balancer, preload_queues, false),
application:set_env(rabbitmq_queue_master_balancer, sync_delay_timeout, 100),
application:set_env(rabbitmq_queue_master_balancer, sync_delay_timeout, 6000000),
application:set_env(rabbitmq_queue_master_balancer, policy_transition_delay, 100),

ok = application:start(rabbitmq_queue_master_balancer).
Expand All @@ -380,13 +416,13 @@ occurance(N, [N|Rem], C) -> occurance(N, Rem, C+1);
occurance(N, [_|Rem], C) -> occurance(N, Rem, C).

info(Config, N) ->
fire_fsm_event(Config, N, info).
fire_fsm_event(Config, N, info, [infinity]).

status(Config, N) ->
fire_fsm_event(Config, N, status).

report(Config, N) ->
fire_fsm_event(Config, N, report).
fire_fsm_event(Config, N, report, [infinity]).

load_queues(Config, N) ->
fire_fsm_event(Config, N, load_queues).
Expand All @@ -407,13 +443,16 @@ stop(Config, N) ->
fire_fsm_event(Config, N, stop).

fire_fsm_event(Config, N, Cmd) ->
fire_fsm_event(Config, N, Cmd, []).
fire_fsm_event(Config, N, Cmd, Args) ->
rabbit_ct_broker_helpers:rpc(Config, N,
rabbit_queue_master_balancer, Cmd, []).
rabbit_queue_master_balancer, Cmd, Args).

verify_ha(Config) ->
Cluster = get_node_names(Config, 3),
QNs = get_queue_names_and_nodes(Config),
[passed = assert_slaves(Config, 0, QN, {N, Cluster--[N]}) || {QN, N} <- QNs].
[passed = assert_slaves(Config, 0, QN, {N, Cluster--[N]}) || {QN, N} <- QNs],
passed.

assert_slaves(Config, RPCNode, QName, {ExpMNode, ExpSNodes}) ->
Q = find_queue(Config, QName, RPCNode),
Expand Down Expand Up @@ -467,6 +506,16 @@ verify_messages(Config, Node, ExpectedMessageCount) ->
rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info,
[Q, [messages]])
|| Q <- rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, list, [])
].
],
passed.

delay(T) -> timer:sleep(T).

delay(T) -> timer:sleep(T).
%% CT tests will timeout and fail if completion is prolonged!
wait_until_complete(Config, A, T) ->
case pget(phase, Info = info(Config, A)) of
?STATE_IDLE -> {ok, Info};
_ ->
delay(T),
wait_until_complete(Config, A, T)
end.

0 comments on commit 3b76e04

Please sign in to comment.