Skip to content

Commit

Permalink
getting rid of ugly add_virtual_pubsub_host interface
Browse files Browse the repository at this point in the history
  • Loading branch information
DenysGonchar committed May 4, 2021
1 parent d536d35 commit a4a6480
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 110 deletions.
162 changes: 73 additions & 89 deletions big_tests/tests/push_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
-define(RPC_SPEC, distributed_helper:mim()).
-define(SESSION_KEY, publish_service).

-define(VIRTUAL_PUBSUB_DOMAIN,<<"virtual.domain">>).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -92,11 +94,9 @@ suite() ->
init_per_suite(Config) ->
%% For mocking with unnamed functions
mongoose_helper:inject_module(?MODULE),
start_execution_queue(),
escalus:init_per_suite(Config).
end_per_suite(Config) ->
escalus_fresh:clean(),
stop_execution_queue(),
escalus:end_per_suite(Config).

init_per_group(disco, Config) ->
Expand Down Expand Up @@ -134,14 +134,13 @@ init_per_testcase(CaseName = push_notifications_not_listed_disco_when_not_availa
escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config0) ->
Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}, {kate, 1}]),
Config = [{case_name, CaseName} | Config1],
Config = add_pubsub_jid([{case_name, CaseName} | Config1]),

case ?config(pubsub_host, Config0) of
virtual ->
add_virtual_host_to_pusher(pubsub_jid(Config)),
start_hook_listener();
start_hook_listener(Config);
_ ->
start_route_listener(CaseName)
start_route_listener(Config)
end,

escalus:init_per_testcase(CaseName, Config).
Expand All @@ -156,43 +155,9 @@ end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

%% --------------------- Helpers ------------------------
execution_queue() ->
receive
{Pid, Fun, Args} when is_pid(Pid), is_list(Args), is_function(Fun, length(Args)) ->
Ret = (catch apply(Fun, Args)),
Pid ! {ret, Ret},
execution_queue();
{Pid, stop} when is_pid(Pid) ->
Pid ! stopped
end.

start_execution_queue() ->
register(execution_queue, spawn(fun execution_queue/0)).

stop_execution_queue() ->
execution_queue ! {self(), stop},
receive
stopped -> ok
end.

queue(F, A) -> queue(F, A, 5000).

queue(F, A, Timeout) ->
execution_queue ! {self(), F, A},
receive
{ret, Ret} -> Ret
after Timeout ->
error({timeout, F, A, Timeout})
end.

add_virtual_host_to_pusher(VirtualHost) ->
%% this function is executed in parallel environment,
%% so to prevent race conditions queue rpc calls.
Args = [mod_event_pusher_push, add_virtual_pubsub_host, [<<"localhost">>, VirtualHost]],
queue(fun escalus_ejabberd:rpc/3, Args).

ensure_pusher_module_and_save_old_mods(Config) ->
PushOpts = [{backend, mongoose_helper:mnesia_or_rdbms_backend()}],
PushOpts = [{virtual_pubsub_hosts, [subhost_pattern(?VIRTUAL_PUBSUB_DOMAIN)]},
{backend, mongoose_helper:mnesia_or_rdbms_backend()}],
Host = ct:get_config({hosts, mim, domain}),
Config1 = dynamic_modules:save_modules(Host, Config),
PusherMod = {mod_event_pusher, [{backends, [{push, PushOpts}]}]},
Expand Down Expand Up @@ -488,7 +453,7 @@ pm_no_msg_notifications_if_not_enabled(Config) ->
become_unavailable(Bob),
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),

?assert(not truly(received_push(Config))),
?assert(not truly(received_push())),
ok
end).

Expand All @@ -503,7 +468,7 @@ pm_no_msg_notifications_if_user_online(Config) ->

escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),

?assert(not truly(received_push(Config))),
?assert(not truly(received_push())),
ok
end).

Expand All @@ -520,7 +485,7 @@ pm_msg_notify_if_user_offline(Config) ->

escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),

#{ payload := Payload } = received_push(Config),
#{ payload := Payload } = received_push(),
?assertMatch(<<"OH, HAI!">>, proplists:get_value(<<"last-message-body">>, Payload)),
?assertMatch(AliceJID,
proplists:get_value(<<"last-message-sender">>, Payload)),
Expand All @@ -542,7 +507,7 @@ pm_msg_notify_if_user_offline_with_publish_options(Config) ->

escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),

#{ publish_options := PublishOptions } = received_push(Config),
#{ publish_options := PublishOptions } = received_push(),

?assertMatch(<<"value1">>, proplists:get_value(<<"field1">>, PublishOptions)),
?assertMatch(<<"value2">>, proplists:get_value(<<"field2">>, PublishOptions)),
Expand All @@ -566,7 +531,7 @@ pm_msg_notify_stops_after_disabling(Config) ->

escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),

?assert(not received_push(Config)),
?assert(not received_push()),

ok
end).
Expand All @@ -593,7 +558,7 @@ pm_msg_notify_stops_after_removal(Config) ->
fun(Bob, Alice) ->
become_unavailable(Bob),
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
?assert(not truly(received_push(Config)))
?assert(not truly(received_push()))
end).

%%--------------------------------------------------------------------
Expand All @@ -614,7 +579,7 @@ muclight_no_msg_notifications_if_not_enabled(Config) ->

escalus:send(Bob, Stanza),

?assert(not truly(received_push(Config))),
?assert(not truly(received_push())),

ok
end).
Expand All @@ -635,7 +600,7 @@ muclight_no_msg_notifications_if_user_online(Config) ->
Stanza = escalus_stanza:groupchat_to(room_bin_jid(Room), Msg),
escalus:send(Bob, Stanza),

?assert(not truly(received_push(Config))),
?assert(not truly(received_push())),
ok
end).

Expand All @@ -656,7 +621,7 @@ muclight_msg_notify_if_user_offline(Config) ->
Stanza = escalus_stanza:groupchat_to(room_bin_jid(Room), Msg),
escalus:send(Bob, Stanza),

#{ payload := Payload } = received_push(Config),
#{ payload := Payload } = received_push(),

?assertMatch(Msg, proplists:get_value(<<"last-message-body">>, Payload)),
SenderId = <<(room_bin_jid(Room))/binary, "/" ,BobJID/binary>>,
Expand All @@ -683,7 +648,7 @@ muclight_msg_notify_if_user_offline_with_publish_options(Config) ->
Stanza = escalus_stanza:groupchat_to(room_bin_jid(Room), Msg),
escalus:send(Bob, Stanza),

#{ publish_options := PublishOptions } = received_push(Config),
#{ publish_options := PublishOptions } = received_push(),

?assertMatch(<<"value1">>, proplists:get_value(<<"field1">>, PublishOptions)),
?assertMatch(<<"value2">>, proplists:get_value(<<"field2">>, PublishOptions)),
Expand Down Expand Up @@ -711,7 +676,7 @@ muclight_msg_notify_stops_after_disabling(Config) ->
Stanza = escalus_stanza:groupchat_to(room_bin_jid(Room), Msg),
escalus:send(Bob, Stanza),

?assert(not truly(received_push(Config))),
?assert(not truly(received_push())),
ok
end).

Expand All @@ -720,13 +685,14 @@ muclight_msg_notify_stops_after_disabling(Config) ->
%% Functions that will be executed in MongooseIM context + helpers that set them up
%%--------------------------------------------------------------------

start_route_listener(CaseName) ->
start_route_listener(Config) ->
%% We put namespaces in the state to avoid injecting push_helper module to MIM as well
State = #{ pid => self(),
pub_options_ns => push_helper:ns_pubsub_pub_options(),
push_form_ns => push_helper:push_form_type() },
Handler = rpc(mongoose_packet_handler, new, [?MODULE, State]),
rpc(ejabberd_router, register_route, [atom_to_binary(CaseName, utf8), Handler]).
Domain = pubsub_domain(Config),
rpc(ejabberd_router, register_route, [Domain, Handler]).

process_packet(_Acc, _From, To, El, State) ->
#{ pid := TestCasePid, pub_options_ns := PubOptionsNS, push_form_ns := PushFormNS } = State,
Expand All @@ -745,9 +711,7 @@ process_packet(_Acc, _From, To, El, State) ->
case valid_ns_if_defined(PubOptionsNS, PublishOptions) andalso
valid_ns_if_defined(PushFormNS, Payload) of
true ->
TestCasePid ! #{ publish_options => PublishOptions,
payload => Payload,
pubsub_jid_bin => jid:to_binary(To) };
TestCasePid ! push_notification(jid:to_binary(To), Payload, PublishOptions);
false ->
%% We use publish_options0 and payload0 to avoid accidental match in received_push
%% even after some tests updates and refactors
Expand All @@ -772,26 +736,27 @@ valid_ns_if_defined(_, undefined) ->
valid_ns_if_defined(NS, FormProplist) ->
NS =:= proplists:get_value(<<"FORM_TYPE">>, FormProplist).

start_hook_listener() ->
start_hook_listener(Config) ->
TestCasePid = self(),
rpc(?MODULE, rpc_start_hook_handler, [TestCasePid]).
PubSubJID = pubsub_jid(Config),
rpc(?MODULE, rpc_start_hook_handler, [TestCasePid, PubSubJID]).

rpc_start_hook_handler(TestCasePid) ->
rpc_start_hook_handler(TestCasePid, PubSubJID) ->
Handler = fun(Acc, _Host, [PayloadMap], OptionMap) ->
try jid:to_binary(mongoose_acc:get(push_notifications, pubsub_jid, Acc)) of
PubsubJIDBin ->
TestCasePid ! #{ publish_options => maps:to_list(OptionMap),
payload => maps:to_list(PayloadMap),
pubsub_jid_bin => PubsubJIDBin },
Acc
catch
C:R:S ->
TestCasePid ! #{ event => handler_error,
class => C,
reason => R,
stacktrace => S },
Acc
end
try jid:to_binary(mongoose_acc:get(push_notifications, pubsub_jid, Acc)) of
PubSubJIDBin when PubSubJIDBin =:= PubSubJID->
TestCasePid ! push_notification(PubSubJIDBin,
maps:to_list(PayloadMap),
maps:to_list(OptionMap));
_ -> ok
catch
C:R:S ->
TestCasePid ! #{ event => handler_error,
class => C,
reason => R,
stacktrace => S }
end,
Acc
end,
ejabberd_hooks:add(push_notifications, <<"localhost">>, Handler, 50).

Expand All @@ -804,13 +769,9 @@ create_room(Room, [Owner | Members], Config) ->
create_room(Room, <<"muclight.", Domain/binary>>, Owner, Members,
Config, <<"v1">>).

received_push(Config) ->
ExpectedPubsubJIDBin = pubsub_jid(Config),
%% With parallel test cases execution we might receive notifications from other cases
%% so it's essential to filter by our pubsub JID
received_push() ->
receive
#{ pubsub_jid_bin := PubsubJIDBin } = Push when PubsubJIDBin =:= ExpectedPubsubJIDBin ->
Push
#{ push_notification := true } = Push -> Push
after
timer:seconds(5) ->
ct:pal("~p", [#{ result => nomatch, msg_inbox => process_info(self(), messages) }]),
Expand All @@ -819,22 +780,45 @@ received_push(Config) ->

truly(false) ->
false;
truly(undefined) ->
false;
truly(_) ->
truly(#{ push_notification := true }) ->
true.

push_notification(PubsubJID, Payload, PublishOpts) ->
#{push_notification => true, pubsub_jid_bin => PubsubJID,
publish_options => PublishOpts, payload => Payload}.

bare_jid(JIDOrClient) ->
ShortJID = escalus_client:short_jid(JIDOrClient),
list_to_binary(string:to_lower(binary_to_list(ShortJID))).

pubsub_jid(Config) ->
add_pubsub_jid(Config) ->
CaseName = proplists:get_value(case_name, Config),
CaseNameBin = atom_to_binary(CaseName, utf8),
case ?config(pubsub_host, Config) of
virtual -> <<CaseNameBin/binary, ".hyperion">>;
_ -> <<"pubsub@", CaseNameBin/binary>>
end.
NameSuffix = uniq_name_suffix(),
UniqID = <<CaseNameBin/binary, "_", NameSuffix/binary>>,
{PubSubNodeName, PubSubDomain} =
case ?config(pubsub_host, Config) of
virtual ->
%% unique node name, but preconfigured domain name
{UniqID, ?VIRTUAL_PUBSUB_DOMAIN};
_ ->
%% any node name, but unique domain. unique domain
%% is required to intercept safely message routing
{<<"pub-sub">>, UniqID}
end,
PubSubJID = <<PubSubNodeName/binary, "@", PubSubDomain/binary>>,
[{pubsub_jid, PubSubJID}, {pubsub_domain, PubSubDomain} | Config].

uniq_name_suffix() ->
{_, S, US} = erlang:timestamp(),
L = lists:flatten([integer_to_list(S rem 100), ".", integer_to_list(US)]),
list_to_binary(L).

pubsub_domain(Config) ->
proplists:get_value(pubsub_domain, Config).

pubsub_jid(Config) ->
proplists:get_value(pubsub_jid, Config).

room_name(Config) ->
CaseName = proplists:get_value(case_name, Config),
Expand Down
26 changes: 5 additions & 21 deletions src/event_pusher/mod_event_pusher_push.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
-export([is_virtual_pubsub_host/3]).
-export([disable_node/3]).

%% Debug & testing
-export([add_virtual_pubsub_host/2]).

%% Types
-type publish_service() :: {PubSub :: jid:jid(), Node :: pubsub_node(), Form :: form()}.
-type pubsub_node() :: binary().
Expand Down Expand Up @@ -153,7 +150,8 @@ iq_handler(_From, _To, Acc, IQ = #iq{type = get, sub_el = SubEl}) ->
iq_handler(From, _To, Acc, IQ = #iq{type = set, sub_el = Request}) ->
Res = case parse_request(Request) of
{enable, BarePubSubJID, Node, FormFields} ->
maybe_enable_node(From, BarePubSubJID, Node, FormFields, IQ);
enable_node(From, BarePubSubJID, Node, FormFields, IQ),
IQ#iq{type = result, sub_el = []};
{disable, BarePubsubJID, Node} ->
ok = disable_node(From, BarePubsubJID, Node),
IQ#iq{type = result, sub_el = []};
Expand Down Expand Up @@ -188,19 +186,6 @@ is_virtual_pubsub_host(HostType, RecipientDomain, VirtPubsubDomain) ->
end,
lists:any(PredFn, Templates).

%%--------------------------------------------------------------------
%% Debug & testing API
%%--------------------------------------------------------------------
-spec add_virtual_pubsub_host(Host :: mongooseim:host_type(),
VirtualHostTemplate :: binary()) -> any().
add_virtual_pubsub_host(HostType, VirtualHostTemplate) ->
%% add_virtual_pubsub_host/2 is non-atomic interface, so execution in parallel
%% environment can result in race conditions.
Patterns = gen_mod:get_module_opt(HostType, ?MODULE, virtual_pubsub_hosts, []),
NewPattern = mongoose_subdomain_utils:make_subdomain_pattern(VirtualHostTemplate),
NewPatterns = lists:usort([NewPattern | Patterns]),
gen_mod:set_module_opt(HostType, ?MODULE, virtual_pubsub_hosts, NewPatterns).

%%--------------------------------------------------------------------
%% local functions
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -271,11 +256,10 @@ parse_form(Form) ->
invalid_form
end.

-spec maybe_enable_node(jid:jid(), jid:jid(), pubsub_node(), form(), jlib:iq()) -> jlib:iq().
maybe_enable_node(From, BarePubSubJID, Node, FormFields, IQ) ->
-spec enable_node(jid:jid(), jid:jid(), pubsub_node(), form(), jlib:iq()) -> jlib:iq().
enable_node(From, BarePubSubJID, Node, FormFields, IQ) ->
ok = mod_event_pusher_push_backend:enable(jid:to_bare(From), BarePubSubJID, Node, FormFields),
store_session_info(From, {BarePubSubJID, Node, FormFields}),
IQ#iq{type = result, sub_el = []}.
store_session_info(From, {BarePubSubJID, Node, FormFields}).

-spec store_session_info(jid:jid(), publish_service()) -> any().
store_session_info(Jid, Service) ->
Expand Down

0 comments on commit a4a6480

Please sign in to comment.