diff --git a/big_tests/tests/pubsub_SUITE.erl b/big_tests/tests/pubsub_SUITE.erl index 56dcb40debd..5c03f94d69b 100644 --- a/big_tests/tests/pubsub_SUITE.erl +++ b/big_tests/tests/pubsub_SUITE.erl @@ -179,9 +179,11 @@ last_item_cache_tests() -> %%-------------------------------------------------------------------- init_per_suite(Config) -> + instrument_helper:start(instrument_helper:declared_events(mod_pubsub)), escalus:init_per_suite(Config). end_per_suite(Config) -> + instrument_helper:stop(), escalus_fresh:clean(), escalus:end_per_suite(Config). @@ -352,7 +354,17 @@ subscribe_unsubscribe_test(Config) -> pubsub_tools:subscribe(Bob, Node, [{jid_type, bare}]), pubsub_tools:unsubscribe(Bob, Node, [{jid_type, bare}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_set_subscribe, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end), + instrument_helper:assert(mod_pubsub_set_unsubscribe, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end) end). subscribe_options_test(Config) -> @@ -379,7 +391,11 @@ subscribe_options_test(Config) -> pubsub_tools:get_subscription_options(Bob, {node_addr(), NodeName}, [{expected_result, BobOpts}]), - pubsub_tools:delete_node(Alice, Node, []) + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_get_options, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end) end). subscribe_options_deliver_option_test(Config) -> @@ -428,7 +444,13 @@ subscribe_options_separate_request_test(Config) -> [{expected_result, [OptionAfterUpdate]}]) || Client <- Clients ], - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_options, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). publish_test(Config) -> @@ -443,7 +465,13 @@ publish_test(Config) -> Node = pubsub_node(), pubsub_tools:publish(Alice, <<"item1">>, Node, []), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_publish, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). publish_with_max_items_test(Config) -> @@ -541,7 +569,13 @@ request_all_items_test(Config) -> [{expected_result, [<<"item2">>, <<"item1">>]}]), %% TODO check ordering (although XEP does not specify this) - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_get_items, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end) end). request_particular_item_test(Config) -> @@ -633,7 +667,13 @@ purge_all_items_test(Config) -> pubsub_tools:get_all_items(Bob, Node, [{expected_result, []}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_purge, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). publish_only_retract_items_scope_test(Config) -> @@ -659,7 +699,13 @@ publish_only_retract_items_scope_test(Config) -> pubsub_tools:retract_item(Bob, Node, <<"item2">>, [{expected_error_type, <<"auth">>}]), pubsub_tools:get_all_items(Alice, Node, [{expected_result, [<<"item2">>]}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_set_retract, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end) end). @@ -694,7 +740,14 @@ retrieve_default_configuration_test(Config) -> fun(Alice) -> NodeAddr = node_addr(), pubsub_tools:get_default_configuration(Alice, NodeAddr, - [{expected_result, default_config()}]) + [{expected_result, default_config()}]), + + AliceJid = escalus_utils:get_jid(Alice), + Measurements = instrument_helper:wait_for(mod_pubsub_get_default, #{host_type => domain()}), + instrument_helper:assert(mod_pubsub_get_default, #{host_type => domain()}, Measurements, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). retrieve_configuration_test(Config) -> @@ -708,7 +761,13 @@ retrieve_configuration_test(Config) -> NodeConfig = pubsub_tools:get_configuration(Alice, Node, []), verify_config_fields(NodeConfig), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_get_configure, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). set_configuration_test(Config) -> @@ -724,7 +783,13 @@ set_configuration_test(Config) -> [{response_timeout, 10000}]), pubsub_tools:get_configuration(Alice, Node, [{expected_result, ValidNodeConfig}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_configure, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). set_configuration_errors_test(Config) -> @@ -935,7 +1000,13 @@ get_affiliations_test(Config) -> verify_affiliations(pubsub_tools:get_affiliations(Alice, Node, []), [{Alice, <<"owner">>}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_get_affiliations, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). add_publisher_and_member_test(Config) -> @@ -960,7 +1031,21 @@ add_publisher_and_member_test(Config) -> pubsub_tools:publish(Bob, <<"item1">>, Node, []), pubsub_tools:receive_item_notification(Kate, <<"item1">>, Node, []), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_create, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end), + instrument_helper:assert(mod_pubsub_set_affiliations, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end), + Measurements = instrument_helper:wait_for(mod_pubsub_set_delete, #{host_type => domain()}), + instrument_helper:assert(mod_pubsub_set_delete, #{host_type => domain()}, Measurements, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). swap_owners_test(Config) -> @@ -1033,7 +1118,13 @@ retrieve_user_subscriptions_test(Config) -> pubsub_tools:get_user_subscriptions(Alice, node_addr(), [{expected_result, []}]), pubsub_tools:delete_node(Alice, Node, []), - pubsub_tools:delete_node(Alice, Node2, []) + pubsub_tools:delete_node(Alice, Node2, []), + + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_get_subscriptions, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso BobJid =:= jid:to_binary(From) + end) end). retrieve_node_subscriptions_test(Config) -> @@ -1097,7 +1188,18 @@ modify_node_subscriptions_test(Config) -> ModSubs = [{Geralt, bare, <<"subscribed">>}, {Geralt, full, <<"subscribed">>}], pubsub_tools:get_node_subscriptions(Alice, Node, [{expected_result, ModSubs}]), - pubsub_tools:delete_node(Alice, Node, []) + pubsub_tools:delete_node(Alice, Node, []), + + BobJid = escalus_utils:get_jid(Bob), + instrument_helper:assert(mod_pubsub_set_subscriptions, #{host_type => domain()}, + fun(#{errors := 1, jid := From}) -> + BobJid =:= jid:to_binary(From) + end), + AliceJid = escalus_utils:get_jid(Alice), + instrument_helper:assert(mod_pubsub_set_subscriptions, #{host_type => domain()}, + fun(#{count := 1, jid := From, time := T}) -> + T >= 0 andalso AliceJid =:= jid:to_binary(From) + end) end). process_subscription_requests_test(Config) -> diff --git a/src/pubsub/mod_pubsub.erl b/src/pubsub/mod_pubsub.erl index a33ca713b9a..189998a5834 100644 --- a/src/pubsub/mod_pubsub.erl +++ b/src/pubsub/mod_pubsub.erl @@ -101,7 +101,7 @@ %% API and gen_server callbacks -export([start_link/2, start/2, stop/1, deps/2, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, instrumentation/1]). %% Config callbacks -export([config_spec/0, process_pep_mapping/1]). @@ -279,7 +279,6 @@ start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 1000, worker, [?MODULE]}, - ensure_metrics(Host), start_pool(Host, Opts), ejabberd_sup:start_child(ChildSpec). @@ -1313,18 +1312,16 @@ iq_pubsub(Host, ServerHost, From, IQType, #xmlel{children = SubEls} = QueryEl, Lang, Access, Plugins) -> case xml:remove_cdata(SubEls) of [#xmlel{name = Name} = ActionEl | _] -> - report_iq_action_metrics_before_result(ServerHost, IQType, Name), Node = exml_query:attr(ActionEl, <<"node">>, <<>>), - {Time, Result} = timer:tc(fun iq_pubsub_action/6, - [IQType, Name, Host, Node, From, - #{server_host => ServerHost, - plugins => Plugins, - access => Access, - action_el => ActionEl, - query_el => QueryEl, - lang => Lang}]), - report_iq_action_metrics_after_return(ServerHost, Result, Time, IQType, Name), - Result; + mongoose_instrument:span(event_name(IQType, Name), #{host_type => ServerHost}, fun iq_pubsub_action/6, + [IQType, Name, Host, Node, From, + #{server_host => ServerHost, + plugins => Plugins, + access => Access, + action_el => ActionEl, + query_el => QueryEl, + lang => Lang}], + fun(Time, Result) -> ip_action_result_to_measurements(Time, Result, From) end); Other -> ?LOG_INFO(#{what => pubsub_bad_request, exml_packet => Other}), {error, mongoose_xmpp_errors:bad_request()} @@ -1356,12 +1353,11 @@ iq_pubsub_action(IQType, Name, Host, Node, From, ExtraArgs) -> {error, mongoose_xmpp_errors:feature_not_implemented()} end. -ensure_metrics(Host) -> - [mongoose_metrics:ensure_metric(Host, metric_name(IQType, Name, MetricSuffix), Type) || - {IQType, Name} <- all_metrics(), - {MetricSuffix, Type} <- [{count, spiral}, - {errors, spiral}, - {time, histogram}]]. +-spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()]. +instrumentation(HostType) -> + Measurements = #{count => spiral, errors => spiral, time => histogram}, + [{event_name(IQType, Name), #{host_type => HostType}, #{metrics => Measurements}} || + {IQType, Name} <- all_metrics()]. all_metrics() -> [{set, create}, @@ -1396,23 +1392,48 @@ iq_action_to_metric_name(<<"purge">>) -> purge; iq_action_to_metric_name(<<"subscriptions">>) -> subscriptions; iq_action_to_metric_name(<<"affiliations">>) -> affiliations. - -metric_name(IQType, Name, MetricSuffix) when is_binary(Name) -> +event_name(IQType, Name) when is_binary(Name) -> NameAtom = iq_action_to_metric_name(Name), - metric_name(IQType, NameAtom, MetricSuffix); -metric_name(IQType, Name, MetricSuffix) when is_atom(Name) -> - [pubsub, IQType, Name, MetricSuffix]. - -report_iq_action_metrics_before_result(Host, IQType, Name) -> - mongoose_metrics:update(Host, metric_name(IQType, Name, count), 1). - -report_iq_action_metrics_after_return(Host, Result, Time, IQType, Name) -> - case Result of - {error, _} -> - mongoose_metrics:update(Host, metric_name(IQType, Name, erros), 1); - _ -> - mongoose_metrics:update(Host, metric_name(IQType, Name, time), Time) - end. + event_name(IQType, NameAtom); +event_name(set, create) -> + mod_pubsub_set_create; +event_name(set, publish) -> + mod_pubsub_set_publish; +event_name(set, retract) -> + mod_pubsub_set_retract; +event_name(set, subscribe) -> + mod_pubsub_set_subscribe; +event_name(set, unsubscribe) -> + mod_pubsub_set_unsubscribe; +event_name(get, items) -> + mod_pubsub_get_items; +event_name(get, options) -> + mod_pubsub_get_options; +event_name(set, options) -> + mod_pubsub_set_options; +event_name(get, configure) -> + mod_pubsub_get_configure; +event_name(set, configure) -> + mod_pubsub_set_configure; +event_name(get, default) -> + mod_pubsub_get_default; +event_name(set, delete) -> + mod_pubsub_set_delete; +event_name(set, purge) -> + mod_pubsub_set_purge; +event_name(get, subscriptions) -> + mod_pubsub_get_subscriptions; +event_name(set, subscriptions) -> + mod_pubsub_set_subscriptions; +event_name(get, affiliations) -> + mod_pubsub_get_affiliations; +event_name(set, affiliations) -> + mod_pubsub_set_affiliations. + +ip_action_result_to_measurements(_Time, {error, _}, From) -> + #{errors => 1, jid => From}; +ip_action_result_to_measurements(Time, _Result, From) -> + #{time => Time, count => 1, jid => From}. iq_pubsub_set_create(Host, Node, From, #{server_host := ServerHost, access := Access, plugins := Plugins, @@ -1522,15 +1543,13 @@ iq_pubsub_owner(Host, ServerHost, From, IQType, SubEl, Lang) -> Action = xml:remove_cdata(SubEls), case Action of [#xmlel{name = Name} = ActionEl] -> - report_iq_action_metrics_before_result(ServerHost, IQType, Name), Node = exml_query:attr(ActionEl, <<"node">>, <<>>), - {Time, Result} = timer:tc(fun iq_pubsub_owner_action/6, - [IQType, Name, Host, From, Node, - #{server_host => ServerHost, - action_el => ActionEl, - lang => Lang}]), - report_iq_action_metrics_after_return(ServerHost, Result, Time, IQType, Name), - Result; + mongoose_instrument:span(event_name(IQType, Name), #{host_type => ServerHost}, fun iq_pubsub_owner_action/6, + [IQType, Name, Host, From, Node, + #{server_host => ServerHost, + action_el => ActionEl, + lang => Lang}], + fun(Time, Result) -> ip_action_result_to_measurements(Time, Result, From) end); _ -> ?LOG_INFO(#{what => pubsub_too_many_actions, exml_packet => Action}), {error, mongoose_xmpp_errors:bad_request()}