Skip to content

Commit

Permalink
Add jittered exponential backoff (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
sylane authored Feb 24, 2025
1 parent e35e74e commit b4e28c8
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ filtering out all progress messages, as it wasn't working reliably.
road for namespaces. foo.bar.Buz is parsed into [foo, bar, <<"Buz">>] (if foo
and bar are already existing atoms, but 'Buz' is not).
- Upgrade grisp dependency to 2.8.0.
- Add jittered exponential backoff for reconnection.

## Fixed

Expand Down
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{<<"gun">>,{pkg,<<"gun">>,<<"2.1.0">>},1},
{<<"jarl">>,
{git,"https://github.com/grisp/jarl.git",
{ref,"24d53cc7b521b126588be1f36afecd1d4eb59db3"}},
{ref,"8d20c3ca314aa5c448d5e16f52a2b887e11b26f7"}},
0},
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},0},
{<<"mapz">>,{pkg,<<"mapz">>,<<"2.4.0">>},1}]}.
Expand Down
47 changes: 32 additions & 15 deletions src/grisp_connect_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@

-define(GRISP_IO_PROTOCOL, <<"grisp-io-v1">>).
-define(FORMAT(FMT, ARGS), iolist_to_binary(io_lib:format(FMT, ARGS))).
-define(STD_TIMEOUT, 1000).
-define(CONNECT_TIMEOUT, 5000).
-define(ENV(KEY, GUARDS), fun() ->
case application:get_env(grisp_connect, KEY) of
Expand Down Expand Up @@ -156,26 +155,38 @@ idle(cast, connect, Data) ->
{next_state, waiting_ip, Data};
?HANDLE_COMMON.

waiting_ip(enter, _OldState, Data) ->
Delay = case Data#data.retry_count > 0 of
true -> ?STD_TIMEOUT;
false -> 0
end,
{keep_state_and_data, [{state_timeout, Delay, retry}]};
waiting_ip(state_timeout, retry, Data = #data{retry_count = RetryCount}) ->
% @doc State waiting_ip is used to check that the device has an IP address.
% The first time entering this state, the check will be performed right away.
% If the device does not have an IP address, it will wait a fixed amount of
% time and check again, without incrementing the retry counter.
waiting_ip(enter, _OldState, _Data) ->
% The first IP check does not have any delay
{keep_state_and_data, [{state_timeout, 0, check_ip}]};
waiting_ip(state_timeout, check_ip, Data) ->
case check_inet_ipv4() of
{ok, IP} ->
?LOG_INFO(#{event => checked_ip, ip => IP}),
{next_state, connecting, Data};
invalid ->
?LOG_DEBUG(#{event => waiting_ip}),
{repeat_state, Data#data{retry_count = RetryCount + 1,
last_error = no_ip_available}}
{keep_state_and_data, [{state_timeout, 1000, check_ip}]}
end;
?HANDLE_COMMON.

connecting(enter, _OldState, Data) ->
{keep_state, Data, [{state_timeout, 0, connect}]};
% @doc State connecting is used to establish a connection to grisp.io.
connecting(enter, _OldState, #data{retry_count = 0}) ->
{keep_state_and_data, [{state_timeout, 0, connect}]};
connecting(enter, _OldState, #data{retry_count = RetryCount}) ->
%% Calculate the connection delay in milliseconds with exponential backoff.
%% The delay is selected randomly between `1000' and
%% `1000 * 2 ^ RETRY_COUNT' milliseconds, capped at a maximum of `64000'.
%% Loosely inspired by https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
MinDelay = 1000,
MaxDelay = 64000,
MaxRandomDelay = min(MaxDelay, (1 bsl RetryCount) * 1000) - MinDelay,
Delay = MinDelay + rand:uniform(MaxRandomDelay),
?LOG_DEBUG("Scheduling connection attempt in ~w ms", [Delay]),
{keep_state_and_data, [{state_timeout, Delay, connect}]};
connecting(state_timeout, connect, Data = #data{conn = undefined}) ->
?LOG_INFO(#{description => <<"Connecting to grisp.io">>,
event => connecting}),
Expand Down Expand Up @@ -307,17 +318,23 @@ handle_connection_message(Data, Msg) ->
keep_state_and_data
end.

% @doc Set up the state machine to retry connecting to grisp.io if the maximum
% number of allowed attempts has not been reached.
% Otherwise, the state machine will give up and go back to idle.
reconnect(Data = #data{retry_count = RetryCount,
max_retries = MaxRetries,
last_error = LastError}, Reason)
when MaxRetries =/= infinity, RetryCount > MaxRetries ->
when MaxRetries =/= infinity, RetryCount >= MaxRetries ->
Error = case Reason of undefined -> LastError; E -> E end,
?LOG_ERROR(#{description => <<"Max retries reached, giving up connecting to grisp.io">>,
event => max_retries_reached, last_error => LastError}),
{next_state, idle, Data#data{retry_count = 0, last_error = Error}};
reconnect(Data = #data{retry_count = RetryCount,
last_error = LastError}, Reason) ->
reconnect(Data = #data{retry_count = RetryCount, last_error = LastError},
Reason) ->
Error = case Reason of undefined -> LastError; E -> E end,
% When reconnecting we always increment the retry counter: even if we
% were connected and it was reset to 0, the next step will always be
% retry number 1. It should never reconnect right away.
{next_state, waiting_ip,
Data#data{retry_count = RetryCount + 1, last_error = Error}}.

Expand Down
152 changes: 147 additions & 5 deletions test/grisp_connect_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ all() ->

init_per_testcase(TestCase, Config)
when TestCase =:= bad_client_version_test;
TestCase =:= bad_server_version_test ->
TestCase =:= bad_server_version_test;
TestCase =:= exponential_backoff_test ->
Config;
init_per_testcase(TestCase, Config) ->
CertDir = cert_dir(),
Expand All @@ -51,7 +52,8 @@ init_per_testcase(TestCase, Config) ->

end_per_testcase(TestCase, Config)
when TestCase =:= bad_client_version_test;
TestCase =:= bad_server_version_test ->
TestCase =:= bad_server_version_test;
TestCase =:= exponential_backoff_test ->
Config;
end_per_testcase(_, Config) ->
ok = application:stop(grisp_connect),
Expand All @@ -62,6 +64,140 @@ end_per_testcase(_, Config) ->

%--- Tests ---------------------------------------------------------------------

% Checks the delays between successive connection attempts of grisp_connect
% against a local test server, in three phases:
%   1. The server rejects every attempt with HTTP 400; the first attempt must
%      arrive within 300 ms of calling connect, and the following ones
%      between 900 ms and (100 + 1000 * 2^N) ms after the previous attempt.
%   2. The server is restarted and accepts the websocket upgrade; the client
%      must connect, still within the bounds for the current retry number.
%   3. The server is stopped and restarted rejecting again; the delays must
%      start over from the bounds of the first retry.
%
% All timestamps are taken with erlang:monotonic_time(millisecond) so that the
% measured durations cannot be skewed by wall-clock adjustments. The callback
% timestamps are taken in the test server process; this assumes the server
% runs on the same node as the test, otherwise the readings are not comparable.
exponential_backoff_test(_) ->
    CertDir = cert_dir(),
    CallRef = make_ref(),
    Self = self(),

    % First we test the exponential backoff algorithm when failing to connect

    Apps = grisp_connect_test_server:start(#{
        cert_dir => CertDir,
        init_callback => fun(Req, Opts) ->
            % Report the arrival time of each connection attempt to the test
            Self ! {CallRef, init, erlang:monotonic_time(millisecond)},
            {ok, cowboy_req:reply(400, #{}, <<"Canceled">>, Req), Opts}
        end}),

    {ok, _} = application:ensure_all_started(grisp_emulation),
    application:load(grisp_connect),
    {ok, OldConnectEnv} = application:get_env(grisp_connect, connect),
    % Disable the connect setting so the test decides when to start connecting
    application:set_env(grisp_connect, connect, false),
    {ok, _} = application:ensure_all_started(grisp_connect),

    {T1, T2, T3, T4} = try
        T1a = erlang:monotonic_time(millisecond),
        grisp_connect:connect(),
        T2a = receive
            {CallRef, init, V2} -> V2
        after 300 ->
            erlang:error(timeout2)
        end,
        T3a = receive
            {CallRef, init, V3} -> V3
        after 2300 ->
            erlang:error(timeout3)
        end,
        T4a = receive
            {CallRef, init, V4} -> V4
        after 4300 ->
            erlang:error(timeout4)
        end,
        {T1a, T2a, T3a, T4a}
    catch
        C1:R1:S1 ->
            % Restore the environment before propagating the failure
            ok = application:stop(grisp_connect),
            application:set_env(grisp_connect, connect, OldConnectEnv),
            erlang:raise(C1, R1, S1)
    after
        grisp_connect_test_server:wait_disconnection(),
        ?assertEqual([], flush()),
        grisp_connect_test_server:stop(Apps)
    end,

    % Then we allow the connection to succeed

    % NOTE(review): the result of this start/1 call is discarded and the
    % cleanup below reuses Apps from the first start - confirm this is intended.
    grisp_connect_test_server:start(#{
        cert_dir => CertDir,
        init_callback => fun(Req, Opts) ->
            Self ! {CallRef, init, erlang:monotonic_time(millisecond)},
            Req2 = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"grisp-io-v1">>, Req),
            {cowboy_websocket, Req2, Opts}
        end}),
    try
        T5 = receive
            {CallRef, init, V5} -> V5
        after 8300 ->
            erlang:error(timeout5)
        end,

        ?assertMatch(ok, wait_connection()),
        % Durations in milliseconds between consecutive connection attempts
        D1 = T2 - T1,
        D2 = T3 - T2,
        D3 = T4 - T3,
        D4 = T5 - T4,
        ?assert(D1 < 300, D1),
        ?assert(D2 > 900, D2), % 100 ms less for reliability
        ?assert(D2 < 100 + 1000 * (1 bsl 1), D2), % Extra 100 ms for reliability
        ?assert(D3 > 900, D3), % 100 ms less for reliability
        ?assert(D3 < 100 + 1000 * (1 bsl 2), D3), % Extra 100 ms for reliability
        ?assert(D4 > 900, D4), % 100 ms less for reliability
        ?assert(D4 < 100 + 1000 * (1 bsl 3), D4) % Extra 100 ms for reliability

    catch
        C2:R2:S2 ->
            % Restore the environment before propagating the failure
            ok = application:stop(grisp_connect),
            application:set_env(grisp_connect, connect, OldConnectEnv),
            erlang:raise(C2, R2, S2)
    after
        grisp_connect_test_server:stop(Apps),
        ?assertEqual([], flush())
    end,

    % Wait for grisp_connect to not be connected anymore
    fun WaitNotConnected() ->
        case grisp_connect:is_connected() of
            false -> ok;
            true ->
                timer:sleep(50),
                WaitNotConnected()
        end
    end(),
    T6 = erlang:monotonic_time(millisecond),

    % Finally we test the delay is reset when reconnecting

    % NOTE(review): as above, the start/1 result is discarded and Apps from
    % the first start is used for cleanup.
    grisp_connect_test_server:start(#{
        cert_dir => CertDir,
        init_callback => fun(Req, Opts) ->
            Self ! {CallRef, init, erlang:monotonic_time(millisecond)},
            {ok, cowboy_req:reply(400, #{}, <<"Canceled">>, Req), Opts}
        end}),
    try
        T7 = receive
            {CallRef, init, V7} -> V7
        after 2300 ->
            erlang:error(timeout7)
        end,
        T8 = receive
            {CallRef, init, V8} -> V8
        after 4300 ->
            erlang:error(timeout8)
        end,
        % Durations in milliseconds, counted from the detected disconnection
        D5 = T7 - T6,
        D6 = T8 - T7,
        ?assert(D5 > 900, D5), % 100 ms less for reliability
        ?assert(D5 < 100 + 1000 * (1 bsl 1), D5), % Extra 100 ms for reliability
        ?assert(D6 > 900, D6), % 100 ms less for reliability
        ?assert(D6 < 100 + 1000 * (1 bsl 2), D6) % Extra 100 ms for reliability
    after
        ok = application:stop(grisp_connect),
        application:set_env(grisp_connect, connect, OldConnectEnv),
        grisp_connect_test_server:wait_disconnection(),
        ?assertEqual([], flush()),
        grisp_connect_test_server:stop(Apps)
    end,
    ok.

bad_client_version_test(_) ->
CertDir = cert_dir(),
Apps = grisp_connect_test_server:start(#{
Expand All @@ -70,12 +206,14 @@ bad_client_version_test(_) ->
try
{ok, _} = application:ensure_all_started(grisp_emulation),
application:load(grisp_connect),
{ok, OldMaxRetryEnv} = application:get_env(grisp_connect, ws_max_retries),
application:set_env(grisp_connect, ws_max_retries, 2),
{ok, _} = application:ensure_all_started(grisp_connect),
try
?assertMatch({error, ws_upgrade_failed}, wait_connection())
after
ok = application:stop(grisp_connect)
ok = application:stop(grisp_connect),
application:set_env(grisp_connect, ws_max_retries, OldMaxRetryEnv)
end
after
grisp_connect_test_server:wait_disconnection(),
Expand All @@ -92,13 +230,17 @@ bad_server_version_test(_) ->
try
{ok, _} = application:ensure_all_started(grisp_emulation),
application:load(grisp_connect),
{ok, OldMaxRetryEnv} = application:get_env(grisp_connect, ws_max_retries),
application:set_env(grisp_connect, ws_max_retries, 2),
{ok, _} = application:ensure_all_started(grisp_connect),
try
% There is no way to know the reason why gun closed the connection
?assertMatch({error, {closed, _}}, wait_connection())
% but we don't want jarl to crash because the protocol couldn't be
% negotiated.
?assertMatch({error, normal}, wait_connection())
after
ok = application:stop(grisp_connect)
ok = application:stop(grisp_connect),
application:set_env(grisp_connect, ws_max_retries, OldMaxRetryEnv)
end
after
grisp_connect_test_server:wait_disconnection(),
Expand Down
12 changes: 6 additions & 6 deletions test/grisp_connect_reconnect_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ end_per_testcase(_, Config) ->
%--- Tests ---------------------------------------------------------------------

reconnect_on_gun_crash_test(_) ->
?assertMatch(ok, wait_connection(100)),
?assertMatch(ok, wait_connection()),
GunPid = connection_gun_pid(),
proc_lib:stop(GunPid),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection()).
?assertMatch(ok, wait_connection(2300)).

reconnect_on_disconnection_test(Config) ->
?assertMatch(ok, wait_connection()),
stop_cowboy(),
?assertMatch(ok, wait_disconnection()),
start_cowboy(#{cert_dir => cert_dir()}),
?assertMatch(ok, wait_connection(1200)),
?assertMatch(ok, wait_connection(2300)),
Config.

reconnect_on_ping_timeout_test(_) ->
Expand All @@ -73,16 +73,16 @@ reconnect_on_ping_timeout_test(_) ->
% Now decrease ping timeout so that the WS closes after just 1 second
application:set_env(grisp_connect, ws_ping_timeout, 1500),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(1200)),
?assertMatch(ok, wait_connection(2300)),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(1200)),
?assertMatch(ok, wait_connection(2300)),
?assertMatch(ok, wait_disconnection()).

reconnect_on_closed_frame_test(_) ->
?assertMatch(ok, wait_connection()),
close_websocket(),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(1200)).
?assertMatch(ok, wait_connection(2300)).


%--- Internal Functions --------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions test/grisp_connect_test_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ wait_disconnection() ->

%--- Websocket Callbacks -------------------------------------------------------

init(Req, Opts = #{init_callback := Fun}) when is_function(Fun, 2) ->
Fun(Req, Opts);
init(Req, Opts) ->
ExpVer = maps:get(expected_protocol, Opts, <<"grisp-io-v1">>),
SelVer = maps:get(selected_protocol, Opts, <<"grisp-io-v1">>),
Expand Down

0 comments on commit b4e28c8

Please sign in to comment.