Skip to content

Commit

Permalink
Merge pull request #4317 from esl/instrument/global-metrics
Browse files Browse the repository at this point in the history
Instrument: global probes and counters
  • Loading branch information
jacekwegr authored Jul 16, 2024
2 parents 8abfb09 + 5c7ea1c commit 783dbea
Show file tree
Hide file tree
Showing 21 changed files with 469 additions and 497 deletions.
2 changes: 1 addition & 1 deletion big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
{suites, "tests", graphql_http_upload_SUITE}.
{suites, "tests", graphql_server_SUITE}.
{suites, "tests", graphql_metric_SUITE}.
{suites, "tests", instrument_cets_SUITE}.
{suites, "tests", inbox_SUITE}.
{suites, "tests", inbox_extensions_SUITE}.
{suites, "tests", jingle_SUITE}.
Expand Down Expand Up @@ -108,6 +107,7 @@
{suites, "tests", sic_SUITE}.
{suites, "tests", smart_markers_SUITE}.
{suites, "tests", sm_SUITE}.
{suites, "tests", system_probes_SUITE}.
{suites, "tests", vcard_SUITE}.
{suites, "tests", vcard_simple_SUITE}.
{suites, "tests", websockets_SUITE}.
Expand Down
3 changes: 1 addition & 2 deletions big_tests/dynamic_domains.spec
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
{suites, "tests", graphql_server_SUITE}.
{suites, "tests", graphql_metric_SUITE}.

{suites, "tests", instrument_cets_SUITE}.

{suites, "tests", inbox_SUITE}.
{suites, "tests", inbox_extensions_SUITE}.

Expand Down Expand Up @@ -154,6 +152,7 @@

{suites, "tests", smart_markers_SUITE}.
{suites, "tests", sm_SUITE}.
{suites, "tests", system_probes_SUITE}.
{suites, "tests", vcard_SUITE}.
{suites, "tests", vcard_simple_SUITE}.
{suites, "tests", websockets_SUITE}.
Expand Down
4 changes: 1 addition & 3 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,7 @@ start_cets_discovery_with_file_backnend(Config) ->

stop_cets_discovery() ->
ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
rpc(mim(), mongoose_instrument_probe_cets, stop, []),
rpc(mim2(), mongoose_instrument_probe_cets, stop, []).
ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).

stop_and_delete_cets_discovery() ->
stop_cets_discovery(),
Expand Down
33 changes: 24 additions & 9 deletions big_tests/tests/cluster_commands_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,18 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
init_per_group(clustered, Config) ->
Node2 = mim2(),
Config1 = add_node_to_cluster(Node2, Config),
case is_sm_distributed() of
true ->
escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
instrument_helper:start([{system_dist_data, #{}}]),
Config2 = mongoose_helper:backup_and_set_config_option(
Config1, [instrumentation, probe_interval], 1),
restart(mongoose_system_probes),
escalus:create_users(Config2, escalus:get_users([alice, clusterguy]));
{false, Backend} ->
ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
Node2 = mim2(),
remove_node_from_cluster(Node2, Config1),
{skip, nondistributed_sm}
end;
Expand All @@ -116,10 +119,13 @@ init_per_group(Group, _Config) when Group == clustering_two orelse Group == clus
init_per_group(_GroupName, Config) ->
escalus:create_users(Config).

end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
end_per_group(clustered, Config) ->
escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
Node2 = mim2(),
remove_node_from_cluster(Node2, Config);
remove_node_from_cluster(Node2, Config),
mongoose_helper:restore_config_option(Config, [instrumentation, probe_interval]),
restart(mongoose_system_probes),
instrument_helper:stop();

%% Users are gone after mnesia cleaning
%% hence there is no need to delete them manually
Expand Down Expand Up @@ -164,10 +170,10 @@ end_per_testcase(CaseName, Config) ->
%% Message tests
%%--------------------------------------------------------------------

one_to_one_message(ConfigIn) ->
one_to_one_message(Config) ->
%% Given Alice connected to node one and ClusterGuy connected to node two
Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
Config = [{mongoose_metrics, Metrics} | ConfigIn],
#{send_oct := InitialSendOct, recv_oct := InitialRecvOct}
= rpc(mim(), mongoose_system_probes, probe, [system_dist_data, #{}]),
escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
%% When Alice sends a message to ClusterGuy
Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
Expand All @@ -182,7 +188,12 @@ one_to_one_message(ConfigIn) ->
%% Then Alice also receives it
Stanza2 = escalus:wait_for_stanza(Alice, 5000),
escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
end).
end),
instrument_helper:wait_and_assert(
system_dist_data, #{},
fun(#{send_oct := SendOct, recv_oct := RecvOct}) ->
SendOct > InitialSendOct andalso RecvOct > InitialRecvOct
end).

%%--------------------------------------------------------------------
%% Manage cluster commands tests
Expand Down Expand Up @@ -469,3 +480,7 @@ wait_for_process_to_stop(Pid, Timeout) ->
after Timeout ->
ct:fail(wait_for_process_to_stop_timeout)
end.

restart(Module) ->
rpc(mim(), Module, stop, []),
rpc(mim(), Module, start, []).
58 changes: 20 additions & 38 deletions big_tests/tests/graphql_metric_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,23 @@ type_to_keys(<<"counter">>) ->
type_to_keys(<<"spiral">>) ->
[<<"one">>, <<"count">>];
type_to_keys(<<"gauge">>) ->
[<<"value">>];
type_to_keys(<<"merged_inet_stats">>) ->
[<<"connections">>, <<"recv_cnt">>, <<"recv_max">>, <<"recv_oct">>,
<<"send_cnt">>, <<"send_max">>, <<"send_oct">>, <<"send_pend">>];
type_to_keys(<<"rdbms_stats">>) ->
[<<"workers">>, <<"recv_cnt">>, <<"recv_max">>, <<"recv_oct">>,
<<"send_cnt">>, <<"send_max">>, <<"send_oct">>, <<"send_pend">>];
type_to_keys(<<"vm_stats_memory">>) ->
[<<"atom_used">>, <<"binary">>, <<"ets">>,
<<"processes_used">>, <<"system">>, <<"total">>];
type_to_keys(<<"vm_system_info">>) ->
[<<"ets_limit">>, <<"port_count">>, <<"port_limit">>,
<<"process_count">>, <<"process_limit">>];
type_to_keys(<<"probe_queues">>) ->
[<<"fsm">>, <<"regular">>, <<"total">>].
[<<"value">>].

cets_info_keys() ->
[<<"available_nodes">>, <<"unavailable_nodes">>,
<<"remote_nodes_without_disco">>, <<"joined_nodes">>,
<<"remote_nodes_with_unknown_tables">>, <<"remote_unknown_tables">>,
<<"remote_nodes_with_missing_tables">>, <<"remote_missing_tables">>,
<<"conflict_nodes">>, <<"conflict_tables">>,
<<"discovered_nodes">>, <<"discovery_works">>].
<<"remote_nodes_without_disco">>, <<"joined_nodes">>,
<<"remote_nodes_with_unknown_tables">>, <<"remote_unknown_tables">>,
<<"remote_nodes_with_missing_tables">>, <<"remote_missing_tables">>,
<<"conflict_nodes">>, <<"conflict_tables">>,
<<"discovered_nodes">>, <<"discovery_works">>].

get_by_name_global_erlang_metrics(Config) ->
%% Filter by name works
Result = get_metrics([<<"global">>, <<"erlang">>], Config),
Result = get_metrics([<<"global">>, <<"system_info">>], Config),
ParsedResult = get_ok_value([data, metric, getMetrics], Result),
Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
Info = maps:get([<<"global">>, <<"erlang">>, <<"system_info">>], Map),
%% VMSystemInfoMetric type
#{<<"type">> := <<"vm_system_info">>} = Info,
Info = maps:get([<<"global">>, <<"system_info">>, <<"port_count">>], Map),
#{<<"type">> := <<"counter">>} = Info,
check_metric_by_type(Info),
%% Other metrics are filtered out
undef = maps:get(roster_reads_key(), Map, undef).
Expand Down Expand Up @@ -178,30 +163,27 @@ get_metrics_for_specific_host_type(Config) ->
[_|_] = ParsedResult.

get_process_queue_length(Config) ->
Result = get_metrics([<<"global">>, <<"processQueueLengths">>], Config),
Result = get_metrics([<<"global">>, <<"system_process_queue_lengths">>], Config),
ParsedResult = get_ok_value([data, metric, getMetrics], Result),
Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
Lens = maps:get([<<"global">>, <<"processQueueLengths">>], Map),
%% ProbeQueuesMetric type
#{<<"type">> := <<"probe_queues">>} = Lens,
Lens = maps:get([<<"global">>, <<"system_process_queue_lengths">>, <<"total">>], Map),
#{<<"type">> := <<"counter">>} = Lens,
check_metric_by_type(Lens).

get_inet_stats(Config) ->
Result = get_metrics([<<"global">>, <<"data">>, <<"dist">>], Config),
Result = get_metrics([<<"global">>, <<"system_dist_data">>], Config),
ParsedResult = get_ok_value([data, metric, getMetrics], Result),
Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
Stats = maps:get([<<"global">>, <<"data">>, <<"dist">>], Map),
%% MergedInetStatsMetric type
#{<<"type">> := <<"merged_inet_stats">>} = Stats,
Stats = maps:get([<<"global">>, <<"system_dist_data">>, <<"connections">>], Map),
#{<<"type">> := <<"counter">>} = Stats,
check_metric_by_type(Stats).

get_vm_stats_memory(Config) ->
Result = get_metrics([<<"global">>], Config),
ParsedResult = get_ok_value([data, metric, getMetrics], Result),
Map = maps:from_list([{Name, X} || X = #{<<"name">> := Name} <- ParsedResult]),
Mem = maps:get([<<"global">>, <<"erlang">>, <<"memory">>], Map),
%% VMStatsMemoryMetric type
#{<<"type">> := <<"vm_stats_memory">>} = Mem,
Mem = maps:get([<<"global">>, <<"system_memory">>, <<"total">>], Map),
#{<<"type">> := <<"counter">>} = Mem,
check_metric_by_type(Mem).

get_cets_system(Config) ->
Expand Down Expand Up @@ -265,17 +247,17 @@ get_metrics_as_dicts_empty_args(Config) ->
[#{<<"key">> := <<"median">>, <<"value">> := Median}] = maps:get(RecvName, Map),
?assert(is_integer(Median)),
%% Empty keys
Result2 = get_metrics_as_dicts([<<"global">>, <<"erlang">>], [], Config),
Result2 = get_metrics_as_dicts([<<"global">>, <<"system_info">>], [], Config),
ParsedResult2 = get_ok_value([data, metric, getMetricsAsDicts], Result2),
?assertEqual(length(ParsedResult2), 2).
?assertEqual(6, length(ParsedResult2)).

get_metrics_as_dicts_empty_strings(Config) ->
%% Name is an empty string
Result = get_metrics_as_dicts([<<>>], [<<"median">>], Config),
ParsedResult = get_ok_value([data, metric, getMetricsAsDicts], Result),
[] = ParsedResult,
%% Key is an empty string
Result2 = get_metrics_as_dicts([<<"global">>, <<"erlang">>], [<<>>], Config),
Result2 = get_metrics_as_dicts([<<"global">>, <<"system_info">>], [<<>>], Config),
ParsedResult2 = get_ok_value([data, metric, getMetricsAsDicts], Result2),
[_|_] = ParsedResult2.

Expand Down
69 changes: 0 additions & 69 deletions big_tests/tests/instrument_cets_SUITE.erl

This file was deleted.

47 changes: 22 additions & 25 deletions big_tests/tests/metrics_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ init_per_suite(Config) ->
dynamic_modules:ensure_stopped(HostType, [mod_offline]),
Config2 = mongoose_helper:backup_and_set_config_option(Config1,
[instrumentation, probe_interval], 1),
restart_sm_probes(),
restart_probes(),
escalus:init_per_suite(Config2).

end_per_suite(Config) ->
dynamic_modules:restore_modules(Config),
escalus:end_per_suite(Config),
mongoose_helper:restore_config_option(Config, [instrumentation, probe_interval]),
restart_sm_probes().
restart_probes().

init_per_group(GroupName, Config) ->
metrics_helper:prepare_by_all_metrics_are_global(Config, GroupName =:= all_metrics_are_global).
Expand Down Expand Up @@ -234,32 +234,21 @@ session_counters(Config) ->
escalus:story
(Config, [{alice, 2}, {bob, 1}],
fun(_User11, _User12, _User2) ->
timer:sleep(timer:seconds(1)),
?assertEqual(3, fetch_global_gauge_value('sm_total_sessions.count', Config)),
?assertEqual(2, fetch_global_gauge_value('sm_unique_sessions.count', Config)),
?assertEqual(3, fetch_global_gauge_value('sm_node_sessions.count', Config))
wait_for_global_gauge_value('sm_total_sessions.count', 3, Config),
wait_for_global_gauge_value('sm_unique_sessions.count', 2, Config),
wait_for_global_gauge_value('sm_node_sessions.count', 3, Config)
end).

node_uptime(Config) ->
X = fetch_global_incrementing_gauge_value(nodeUpTime, Config),
timer:sleep(timer:seconds(1)),
Y = fetch_global_incrementing_gauge_value(nodeUpTime, Config),
?assertEqual(true, Y > X, [{counter, nodeUpTime}, {first, X}, {second, Y}]).
UpTime = fetch_global_incrementing_gauge_value('system_up_time.seconds', Config),
?assert(UpTime >= 0).

cluster_size(Config) ->
SingleNodeClusterState =
fetch_global_incrementing_gauge_value(clusterSize, Config),
?assertEqual(1, SingleNodeClusterState),

distributed_helper:add_node_to_cluster(Config),
TwoNodesClusterState =
fetch_global_incrementing_gauge_value(clusterSize, Config),
?assertEqual(2, TwoNodesClusterState),

distributed_helper:remove_node_from_cluster(Config),
SingleNodeClusterState2 =
fetch_global_incrementing_gauge_value(clusterSize, Config),
?assertEqual(1, SingleNodeClusterState2).
wait_for_global_gauge_value('mnesia_info.running_db_nodes', 1, Config),
distributed_helper:add_node_to_cluster(Config),
wait_for_global_gauge_value('mnesia_info.running_db_nodes', 2, Config),
distributed_helper:remove_node_from_cluster(Config),
wait_for_global_gauge_value('mnesia_info.running_db_nodes', 1, Config).

%%--------------------------------------------------------------------
%% Helpers
Expand Down Expand Up @@ -410,6 +399,12 @@ fetch_counter_value(Counter, _Config) ->

[HostTypeValue, HostTypeValueList, TotalValue, TotalValueList].

%% Wait until the two different API calls to get the metric return the same expected value.
%% The values could disagree temporarily while the gauge is being updated.
wait_for_global_gauge_value(Name, Value, Config) ->
mongoose_helper:wait_until(fun() -> fetch_global_gauge_values(Name, Config) end,
[Value, Value], #{name => Name}).

%% @doc Fetch counter that is static.
fetch_global_gauge_value(Counter, Config) ->
[Value, ValueList] = fetch_global_gauge_values(Counter, Config),
Expand Down Expand Up @@ -494,6 +489,8 @@ request(Method, Path, Server) ->
make_request(#{role => admin, method => Method, path => iolist_to_binary(Path),
return_headers => true, return_maps => true, server => Server}).

restart_sm_probes() ->
restart_probes() ->
rpc_call(ejabberd_sm, stop_probes, []),
rpc_call(ejabberd_sm, start_probes, []).
rpc_call(ejabberd_sm, start_probes, []),
rpc_call(mongoose_system_probes, stop, []),
rpc_call(mongoose_system_probes, start, []).
Loading

0 comments on commit 783dbea

Please sign in to comment.