Skip to content

Commit

Permalink
Enable global_distrib and fix stream_mgmt timeouts
Browse files Browse the repository at this point in the history
Note that for `gen_statem`, timeouts of the type `{timeout, Time, Msg}` are
considered event timeouts, and are canceled upon any event delivered before
them, so they can get lost for example upon the process receiving any other
routed message. So we need to name them, `{{timeout, ?MODULE}, Time, Msg}`.
  • Loading branch information
NelsonVides committed Jan 4, 2023
1 parent 2e94f64 commit 270412a
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
{suites, "tests", mod_event_pusher_http_SUITE}.
{suites, "tests", mod_event_pusher_rabbit_SUITE}.
{suites, "tests", mod_event_pusher_sns_SUITE}.
% {suites, "tests", mod_global_distrib_SUITE}.
{suites, "tests", mod_global_distrib_SUITE}.
{suites, "tests", mod_http_upload_SUITE}.
{suites, "tests", mod_ping_SUITE}.
{suites, "tests", mod_time_SUITE}.
Expand Down
1 change: 0 additions & 1 deletion big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,6 @@ do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResum

%% Trigger rerouting
ok = rpc(asia_node, sys, resume, [C2sPid]),
C2sPid ! resume_timeout,

%% Let C2sPid to process the message and reroute (and die finally, poor little thing)
mongoose_helper:wait_for_pid_to_die(C2sPid),
Expand Down
4 changes: 3 additions & 1 deletion src/global_distrib/mod_global_distrib_receiver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ start_listeners() ->
RetriesLeft :: non_neg_integer()) -> any().
start_listener({Addr, Port} = Ref, RetriesLeft) ->
?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}),
case ranch:start_listener(Ref, ranch_tcp, #{num_acceptors => 10, ip => Addr, port => Port}, ?MODULE, []) of
SocketOpts = [{ip, Addr}, {port, Port}],
RanchOpts = #{max_connections => infinity, num_acceptors => 10, socket_opts => SocketOpts},
case ranch:start_listener(Ref, ranch_tcp, RanchOpts, ?MODULE, []) of
{ok, _} -> ok;
{error, eaddrinuse} when RetriesLeft > 0 ->
?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
Expand Down
8 changes: 4 additions & 4 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ handle_buffer_and_ack(Acc, C2SState, Jid, #sm_state{buffer = Buffer, buffer_max
NewBufferSize = BufferSize + 1,
MaybeActions = case is_buffer_full(NewBufferSize, BufferMax) of
true ->
{timeout, ?CONSTRAINT_CHECK_TIMEOUT, check_buffer_full};
{{timeout, ?MODULE}, ?CONSTRAINT_CHECK_TIMEOUT, check_buffer_full};
false ->
[]
end,
Expand Down Expand Up @@ -290,7 +290,7 @@ foreign_event(Acc, #{c2s_data := StateData,
error ->
{stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, bad_stream_management_request)}
end;
foreign_event(Acc, #{c2s_data := StateData, event_type := timeout, event_content := check_buffer_full}, _Extra) ->
foreign_event(Acc, #{c2s_data := StateData, event_type := {timeout, ?MODULE}, event_content := check_buffer_full}, _Extra) ->
#sm_state{buffer_size = BufferSize, buffer_max = BufferMax} = get_mod_state(StateData),
case is_buffer_full(BufferSize, BufferMax) of
true ->
Expand All @@ -316,7 +316,7 @@ handle_user_stopping(Acc, #{c2s_data := StateData}, #{host_type := HostType}) ->
SmState ->
Timeout = get_resume_timeout(HostType),
NewSmState = notify_unacknowledged_messages(Acc, StateData, SmState),
Actions = [{push_callback_module, ?MODULE}, {timeout, Timeout, resume_timeout}, hibernate],
Actions = [{push_callback_module, ?MODULE}, {{timeout, ?MODULE}, Timeout, resume_timeout}, hibernate],
ToAcc = [{c2s_state, ?EXT_C2S_STATE(resume_session)}, {actions, Actions}, {state_mod, {?MODULE, NewSmState}}],
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}
end.
Expand Down Expand Up @@ -767,7 +767,7 @@ handle_event({call, From}, {stream_mgmt_resume, H}, ?EXT_C2S_STATE(resume_sessio
error ->
{stop, {shutdown, resumed}}
end;
handle_event(timeout, resume_timeout, ?EXT_C2S_STATE(resume_session), StateData) ->
handle_event({timeout, ?MODULE}, resume_timeout, ?EXT_C2S_STATE(resume_session), StateData) ->
{stop, {shutdown, ?MODULE}, StateData};
handle_event(EventType, EventContent, C2SState, StateData) ->
mongoose_c2s:handle_event(EventType, EventContent, C2SState, StateData).
Expand Down

0 comments on commit 270412a

Please sign in to comment.