Skip to content

Commit

Permalink
Updates:
Browse files Browse the repository at this point in the history
  - Introduces tests.
  - Adds report/0 to FSM.
  - Ignores non-mirrored queues holding messages.
  - Updates documentation.
  • Loading branch information
Ayanda-D committed Oct 22, 2017
1 parent 2bb3e9f commit 123cc0d
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 42 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@
/logs/
/plugins/
/xrefr
/test

rabbitmq_queue_master_balancer.d
70 changes: 58 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@

RabbitMQ Queue Master Balancer is a tool used for attaining queue master equilibrium across a RabbitMQ cluster installation. The plugin achieves this by computing queue master counts on nodes and engaging in shuffling procedures, with the ultimate goal of evenly distributing `queue masters` across RabbitMQ cluster nodes. Internally, the tool comprises of an FSM engine which transitions between different states of operation, to allow the procedures to be carried in a fully controllable manner.

**NOTE:** This plugin is still experimental and yet to mature. We strictly recommend usage for RabbitMQ support operations only, at planned and scheduled time periods when the cluster nodes are under minimal, or most preferrably, zero traffic load.
We define **Queue Equilibrium** as the state in which the following condition is/has been satisfied across all running cluster nodes:

![inline fit](./priv/images/QueueMasterBalancerQueueEquilibrium.png)

Points to consider prior usage:

- This plugin will only apply to **mirrored queues (with or without messages)**, and **non-mirrored queues without messages**. For non-mirrored queues holding messages, we recommend users first setup an [HA policy](https://www.rabbitmq.com/ha.html) of choice, based on their cluster setup and needs, and then make use of this plugin.
- The plugin considers **single, non-mirrored queues holding messages** a risk to migrate around cluster nodes, for example, in case of any network problems during migration procedures of such a queue, possibility of losing messages cannot be ignored. It therefore ignores such queues, up until users have setup an [HA policy](https://www.rabbitmq.com/ha.html) of choice and have slave queues running on the cluster for message redundancy. This is a safety precaution to protect users from any possibility of message loss during queue migration.
- The plugin is young, and yet to mature. We strictly recommend usage for RabbitMQ support operations only, at planned and scheduled time periods when the cluster nodes are under minimal, or most preferably, zero traffic load.

## Design

Expand All @@ -23,7 +31,7 @@ In addition, regardless of its current state, the Queue Master Balancer plugin i

## Supported RabbitMQ Versions

This plugin is compatible with RabbitMQ 3.6.x and beyond.
This plugin is compatible with RabbitMQ 3.6.x and beyond, to the latest release.


## Installation
Expand All @@ -34,18 +42,22 @@ Download pre-compiled versions from [https://github.com/Ayanda-D/rabbitmq-queue-

### Build

Clone and build the plugin by executing `make`. To create a package, execute `make dist` and find the `.ez` package file in the `plugins` directory.
Clone and switch to branch `stable` of this plugin, then build the plugin by executing `make`. To create a package, execute `make dist` and find the `.ez` package file in the `plugins` directory.

### Testing

Likewise, clone and switch to branch `stable`, then execute `make tests` to test the plugin. View test results from the generated HTML files.

## Configuration

The Queue Master Balancer may be configured in the `rabbitmq.config` file as follows:
The Queue Master Balancer is configured in the `rabbitmq.config` file as follows:

```
[{rabbitmq_queue_master_balancer,
[{operational_priority, 5},
{preload_queues, false},
{sync_delay_timeout, 3000},
{inter_policy_delay, 50}]
[{operational_priority, 5},
{preload_queues, false},
{sync_delay_timeout, 3000},
{policy_transition_delay, 50}]
}].
```
Expand All @@ -57,8 +69,8 @@ The following table summarizes the meaning of these configuration parameters.
|---|---|---|---|
| operational\_priority | Priority level the plugin will use to balance queues across the cluster. This should be higher than the highest configured policy priority | Integer | 5 |
| preload\_queues | Determines whether queues are automatically loaded on plugin start-up before the balancing operation is started | Boolean | false |
| sync\_delay_timeout | Time period (in milliseconds) the plugin should wait for slave queues to synchronize to the master queue before the balancing procedure is completed | Integer | 3000 |
| inter\_policy_delay | Time period (in milliseconds) the plugin should wait while changing/transitioning from one policy to another. The plugin undergoes `4` transitions when balancing a queue | Integer | 50 |
| sync\_delay_timeout | Time period (in milliseconds) the plugin should wait for slave queues to synchronize to the master queue before balancing procedure is marked complete | Integer | 3000 |
| policy\_transition_delay | Time period (in milliseconds) the plugin should wait while changing/transitioning from one policy to another. The plugin undergoes `4` transitions when balancing a queue | Integer | 50 |


## Operation
Expand Down Expand Up @@ -113,20 +125,54 @@ In context of the plugin, stopping is similar to `reset`, in that it is set back

`rabbitmqctl eval 'rabbit_queue_master_balancer:stop().'`

### 9. Disable plugin
### 9. Report

To acquire a report illustrating the distribution of queues across the cluster (at any moment in time), the following command may be used:

`rabbitmqctl eval 'rabbit_queue_master_balancer:report().'`

### 10. Disable plugin

The plugin is disabled as follows:

`rabbitmq-plugins disable rabbitmq_queue_master_balancer`


## Additional information

The following aspects must be put into consideration when putting it to use:

- Queue balancing is a delicate operation which **must** be carried out in a very controlled manner and environment not prone to network partitions. Ensure your network is in a stable condition prior to executing queue balancing procedures.
- Configuration parameters such as `inter_policy_delay` need to be bumped up as aspects such as cluster size, queue slave count and message size increase.
- Configuration parameters such as `policy_transition_delay` need to be bumped up as aspects such as cluster size, queue slave count and message size increase.
- The distribution of Queues within a cluster is non-deterministic and a single execution round of the Queue Master Balancer may not be enough to attain immediate equilibria. The plugin may (or may not) need multiple execution rounds before satisfactory queue equilibrium is attained.

## Example Usage

This [link illustrates](https://gist.github.com/Ayanda-D/ddd5fcb5d87c8761fbf2c663fdd07ce6) the plugin in full use. An **unbalanced 3-node** cluster exists, consisting of 21 queues, in which all queue masters reside on a single node: `'rabbit@Ayandas-MacBook-Pro'`. The Queue Master Balancer is then executed, and queried for its status information during the process until balancing procedures have completed. To illustrate its results, we generate queue distribution report of the cluster prior balancing the queues, and another after the balancing procedures have completed. The results summary are as follows:

- Report **before** Queue Master Balancer is executed:

```
Ayandas-MacBook-Pro:sbin ayandadube$ ./rabbitmqctl eval 'rabbit_queue_master_balancer:report().'
{ok,[{'rabbit_2@Ayandas-MacBook-Pro',{queues,0}},
{'rabbit@Ayandas-MacBook-Pro',{queues,21}},
{'rabbit_1@Ayandas-MacBook-Pro',{queues,0}}]}
```

- Report **after** Queue Master Balancer is executed:

```
Ayandas-MacBook-Pro:sbin ayandadube$ ./rabbitmqctl eval 'rabbit_queue_master_balancer:report().'
{ok,[{'rabbit_2@Ayandas-MacBook-Pro',{queues,7}},
{'rabbit@Ayandas-MacBook-Pro',{queues,6}},
{'rabbit_1@Ayandas-MacBook-Pro',{queues,8}}]}
```
The resulting distribution of queues across the cluster is near even, a state in which we have attained an acceptable level of queue equilibrium. Computing Queue Equilibrium for the node with the least number of queues, `'rabbit@Ayandas-MacBook-Pro'`, with 6 queues, yields the following percentage:

![inline fit](./priv/images/QueueMasterBalancerQueueEquilibriumResult.png)

which is satisfies condition/equation [i], being **>= 70%**.

## License and Copyright

(c) Erlang Solutions Ltd. 2017-2018
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
86 changes: 62 additions & 24 deletions src/rabbit_queue_master_balancer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
-behaviour(gen_fsm).

-export([start_link/0, load_queues/0, go/0, pause/0, continue/0,
info/0, reset/0, stop/0, shutdown/0]).
info/0, reset/0, report/0, stop/0, shutdown/0]).

-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
code_change/4, terminate/3]).

-export([idle/2, ready/2, balancing_queues/2, pause/2]).

-import(rabbit_misc, [pget/2]).

-include("rabbit_queue_master_balancer.hrl").

% ------------------------------------------------------------
Expand Down Expand Up @@ -71,6 +69,9 @@ continue() ->
info() ->
gen_fsm:sync_send_all_state_event(?MODULE, '$info').

report() ->
gen_fsm:sync_send_all_state_event(?MODULE, '$report').

reset() ->
gen_fsm:send_all_state_event(?MODULE, '$reset').

Expand Down Expand Up @@ -103,16 +104,19 @@ handle_sync_event('$load_queues', _From, _StateName, State) ->
error_logger:info_msg("Queue Master Balancer loading ~p queues~n",
[Count = length(Queues)]),
{reply, {ok, Count}, ?STATE_READY,
State#state{queues = Queues,
phase = ?STATE_READY}};
State#state{queues = Queues,
phase = ?STATE_READY}};
handle_sync_event('$info', _From, StateName, State) ->
Reply = to_info(State),
{reply, Reply, StateName, State}.
{reply, Reply, StateName, State};
handle_sync_event('$report', _From, StateName, State) ->
Reply = make_report(),
{reply, {ok, Reply}, StateName, State}.

handle_event('$reset', _StateName, State) ->
{next_state, ?STATE_IDLE, State#state{queues = [],
balanced = [],
phase = ?STATE_IDLE}};
{next_state, ?STATE_IDLE, State#state{queues = [],
balanced = [],
phase = ?STATE_IDLE}};
handle_event('$stop', _StateName, State) ->
{next_state, ?STATE_IDLE, State#state{phase = ?STATE_IDLE}}.

Expand Down Expand Up @@ -158,7 +162,7 @@ balancing_queues('$balance_queues',
phase = ?STATE_BALANCING_QUEUES,
balance_ts = ts()}};
balancing_queues('$pause', State = #state{queues = Queues, balanced = B}) ->
error_logger:info_msg("Queue Master Balancer paused. ~p queues pending",
error_logger:info_msg("Queue Master Balancer paused. ~p queues pending "
"and ~p queues balanced", [length(Queues), length(B)]),
{next_state, ?STATE_PAUSE, State#state{phase = ?STATE_PAUSE}};
balancing_queues(_Event, State = #state{phase = ?STATE_BALANCING_QUEUES}) ->
Expand Down Expand Up @@ -209,29 +213,42 @@ balance_queue(Q, Priority, PTD, SynchTimeout) ->
_:Reason -> {error, Reason}
end.

%% Re-shuffle live queues only
shuffle_queue({amqqueue, {resource, VHost, queue, QName},_,_,_,_,QPid,SPids,_,_,
%% 3.6.0 <--> 3.6.5
shuffle_queue(Q = {amqqueue, {resource, VHost, queue, QName},_,_,_,_,QPid,SPids,_,_,
Policy,_,_,live}, MinMaster, Priority, PTD, SynchTimeout) ->
execute_shuffle(VHost, QName, Policy, MinMaster, QPid, SPids, Priority, PTD,
SynchTimeout);
shuffle_queue({amqqueue, {resource, VHost, queue, QName},_,_,_,_,QPid,SPids,_,_,
shuffle(VHost, QName, Policy, MinMaster, QPid, SPids, Priority, PTD,
SynchTimeout, messages(Q));
shuffle_queue({amqqueue, {resource, _, queue, QName},_,_,_,_,_,_,_,_,_,_,_,_},
_MinMaster, _Priority, _PTD, _SynchTimeout) ->
{ok, QName};

%% 3.6.6 <--> 3.6.x
shuffle_queue(Q = {amqqueue, {resource, VHost, queue, QName},_,_,_,_,QPid,SPids,_,_,
Policy,_,_,live,_}, MinMaster, Priority, PTD, SynchTimeout) ->
execute_shuffle(VHost, QName, Policy, MinMaster, QPid, SPids, Priority, PTD,
SynchTimeout);
shuffle_queue(_, _MinMaster, _Priority, _PTD, _SynchTimeout) -> ok.

execute_shuffle(VHost, QN, Policy, MinMaster, QPid, SPids, Priority, PTD, SynchTimeout) ->
ok = rabbit_queue_master_balancer_sync:sync_mirrors(QPid),
shuffle(VHost, QName, Policy, MinMaster, QPid, SPids, Priority, PTD,
SynchTimeout, messages(Q));
shuffle_queue({amqqueue, {resource, _, queue, QName},_,_,_,_,_,_,_,_,_,_,_,_,_},
_MinMaster, _Priority, _PTD, _SynchTimeout) ->
{ok, QName};

%% Unsupported version
shuffle_queue(Q, _MinMaster, _Priority, _PTD, _SynchTimeout) ->
throw({unsupported_version, Q}).

shuffle(_, QN, _, _, _, _SPids = [], _, _, _, M) when M > 0 -> {ok, QN};
shuffle(VHost, QN, Policy, MinMaster, _QPid, SPids, Priority, PTD, SynchTimeout, _M) ->
Pattern = list_to_binary(lists:concat(["^", binary_to_list(QN), "$"])),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(SPids, get_queue(VHost, QN)),
ok = policy_transition_delay(PTD),
ok = rabbit_policy:set(VHost, QN, Pattern,
[{<<"ha-mode">>, <<"nodes">>},{<<"ha-params">>,
[{<<"ha-mode">>, <<"nodes">>},{<<"ha-params">>,
[list_to_binary(atom_to_list(MinMaster))]}], Priority, <<"queues">>),
ok = policy_transition_delay(PTD),
ok = rabbit_policy:delete(VHost, QN),
ok = policy_transition_delay(PTD),
ok = reset_policy(Policy, PTD),
ok = policy_transition_delay(PTD),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(get_queue(VHost, QN)),
ok = rabbit_queue_master_balancer_sync:sync_mirrors(SPids, get_queue(VHost, QN)),
try
rabbit_queue_master_balancer_sync:verify_sync(VHost, QN, SPids, SynchTimeout)
catch
Expand All @@ -255,12 +272,29 @@ reset_policy(Policy, PTD) ->
ok = policy_transition_delay(PTD),
ok = rabbit_policy:set(VHost, Name, Pattern, Def, Priority, ApplyTo).

fetch_queues() -> rabbit_amqqueue:list().
fetch_queues() -> rabbit_amqqueue:list().

make_report() ->
QNs = lists:foldl(fun(Q, Acc) -> [get_queue_node(Q)|Acc] end, [], fetch_queues()),
[count(N, QNs, 0) || N <- rabbit_mnesia:cluster_nodes(running)].

count(N, [], C) -> {N, {queues, C}};
count(N, [N|Rem], C) -> count(N, Rem, C+1);
count(N, [_|Rem], C) -> count(N, Rem, C).

get_queue(VHost, QN) ->
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(VHost, queue, QN)),
Q.

get_queue_node(Q) ->
node(case Q of
{amqqueue, {resource, _,queue,_},_,_,_,_,Pid,_,_,_,_,_,_,live} ->
Pid;
{amqqueue, {resource, _,queue,_},_,_,_,_,Pid,_,_,_,_,_,_,live,_} ->
Pid;
Other -> error({unsupported_version, Other})
end).

ts() ->
{Mega, Sec, USec} = os:timestamp(),
(Mega * 1000000 + Sec) * 1000 + round(USec/1000).
Expand All @@ -276,3 +310,7 @@ get_policy_trans_delay() ->
end.

policy_transition_delay(PTD) -> timer:sleep(PTD).

messages(Q) ->
[{messages, Messages}] = rabbit_amqqueue:info(Q, [messages]),
Messages.
11 changes: 6 additions & 5 deletions src/rabbit_queue_master_balancer_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

-module(rabbit_queue_master_balancer_sync).

-export([sync_mirrors/1, verify_sync/3, verify_sync/4]).
-export([sync_mirrors/2, verify_sync/3, verify_sync/4]).

-include("rabbit_queue_master_balancer.hrl").

% ---------------------------------------------------------------
-spec sync_mirrors(rabbit_types:amqqueue() | pid()) -> 'ok'.
-spec verify_sync(binary(), binary(), list()) -> 'ok'.
-spec verify_sync(binary(), binary(), list(), integer()) -> 'ok'.
-spec sync_mirrors(list(), rabbit_types:amqqueue() | pid()) -> 'ok'.
-spec verify_sync(binary(), binary(), list()) -> 'ok'.
-spec verify_sync(binary(), binary(), list(), integer()) -> 'ok'.
% ----------------------------------------------------------------

sync_mirrors(Q) ->
sync_mirrors([], _Q) -> ok;
sync_mirrors(_SPids, Q) ->
_Any = rabbit_amqqueue:sync_mirrors(Q),
ok.

Expand Down
Loading

0 comments on commit 123cc0d

Please sign in to comment.