CETS backend for mongoose_cluster_id (minimal logic) #4136

Merged · 9 commits · Oct 6, 2023
27 changes: 23 additions & 4 deletions big_tests/tests/distributed_helper.erl
@@ -24,20 +24,39 @@ add_node_to_cluster(Config) ->
Node2 = mim2(),
add_node_to_cluster(Node2, Config).

has_mnesia(Node) ->
%% TODO We should check that Mnesia is configured here instead of checking is_running.
%% But that would require fixing this issue first:
%% "MIM-2067 Actually disable mnesia from starting in tests in pgsql_cets"
rpc(Node, mnesia, system_info, [is_running]) =:= yes.

add_node_to_cluster(Node, Config) ->
case has_mnesia(Node) of
true ->
add_node_to_mnesia_cluster(Node, Config);
false ->
ok
end,
Config.

add_node_to_mnesia_cluster(Node, Config) ->
ClusterMemberNode = maps:get(node, mim()),
ok = rpc(Node#{timeout => cluster_op_timeout()},
mongoose_cluster, join, [ClusterMemberNode]),
verify_result(Node, add),
Config.
verify_result(Node, add).

remove_node_from_cluster(_Config) ->
Node = mim2(),
remove_node_from_cluster(Node, _Config).

remove_node_from_cluster(Node, _Config) ->
ok = rpc(Node#{timeout => cluster_op_timeout()}, mongoose_cluster, leave, []),
verify_result(Node, remove),
case has_mnesia(Node) of
true ->
ok = rpc(Node#{timeout => cluster_op_timeout()}, mongoose_cluster, leave, []),
verify_result(Node, remove);
false ->
ok
end,
ok.

ctl_path(Node, Config) ->
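
Side note on the TODO in `has_mnesia/1`: once MIM-2067 lands, the helper could read the node's configuration instead of Mnesia's runtime state. A minimal sketch under that assumption, reusing `rpc/4` and `mongoose_config:lookup_opt/1` (both already used in this PR); the helper name is hypothetical and this is not part of the change:

%% Hypothetical config-based check, not part of this PR:
%% true when the node is configured with [internal_databases.mnesia].
has_mnesia_configured(Node) ->
    case rpc(Node, mongoose_config, lookup_opt, [[internal_databases, mnesia]]) of
        {ok, _} -> true;
        {error, _} -> false
    end.
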
9 changes: 7 additions & 2 deletions big_tests/tests/metrics_api_SUITE.erl
@@ -71,8 +71,13 @@ end_per_group(GroupName, Config) ->
metrics_helper:finalise_by_all_metrics_are_global(Config, GroupName =:= all_metrics_are_global).

init_per_testcase(cluster_size = CN, Config) ->
Config1 = ensure_nodes_not_clustered(Config),
escalus:init_per_testcase(CN, Config1);
case distributed_helper:has_mnesia(mim()) of
true ->
Config1 = ensure_nodes_not_clustered(Config),
escalus:init_per_testcase(CN, Config1);
false ->
{skip, "Requires Mnesia"}
end;
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

31 changes: 28 additions & 3 deletions big_tests/tests/persistent_cluster_id_SUITE.erl
@@ -20,7 +20,8 @@
id_persists_after_restart/1,
same_cluster_id_in_backend_and_mnesia/1,
backed_up_id_if_rdbms_is_added/1,
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost/1
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost/1,
clean_start_and_two_nodes/1
]).

-import(distributed_helper, [mim/0, mim2/0]).
@@ -39,7 +40,8 @@ tests() ->
id_persists_after_restart,
same_cluster_id_in_backend_and_mnesia,
backed_up_id_if_rdbms_is_added,
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost
cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost,
clean_start_and_two_nodes
].

groups() ->
@@ -52,7 +54,7 @@ groups() ->
%%% Overall setup/teardown
%%%===================================================================
init_per_suite(Config) ->
Config.
distributed_helper:require_rpc_nodes([mim]) ++ Config.

end_per_suite(_Config) ->
ok.
@@ -144,3 +146,26 @@ cluster_id_is_restored_to_mnesia_from_rdbms_if_mnesia_lost(_Config) ->
{ok, SecondID} = mongoose_helper:successful_rpc(
Node, mongoose_cluster_id, get_cached_cluster_id, []),
?assertEqual(FirstID, SecondID).

clean_start_and_two_nodes(_Config) ->
{ok, MnesiaID} = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, get_cached_cluster_id, []),
{ok, MnesiaID2} = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, get_cached_cluster_id, []),
%% Sanity check: IDs are in sync
?assertEqual(MnesiaID, MnesiaID2),
%% Remove the old ID from everywhere
ok = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, clean_table, []),
ok = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, clean_cache, []),
ok = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, clean_cache, []),
{ok, AfterRestartID} = mongoose_helper:successful_rpc(
mim(), mongoose_cluster_id, start, []),
{ok, AfterRestartID2} = mongoose_helper:successful_rpc(
mim2(), mongoose_cluster_id, start, []),
%% We've created a new ID
?assertNotEqual(AfterRestartID, MnesiaID),
%% Both nodes have the same ID
?assertEqual(AfterRestartID, AfterRestartID2).
2 changes: 1 addition & 1 deletion doc/operation-and-maintenance/MongooseIM-metrics.md
@@ -153,7 +153,7 @@ Metrics specific to an extension, e.g. Message Archive Management, are described
| `[global, uniqueSessionCount]` | value | A number of unique users connected to a MongooseIM cluster (e.g. 3 sessions of the same user will be counted as 1 in this metric). |
| `[global, cache, unique_sessions_number]` | gauge | A cached value of `uniqueSessionCount`. It is automatically updated when a unique session count is calculated. |
| `[global, nodeUpTime]` | value | Node uptime. |
| `[global, clusterSize]` | value | A number of nodes in a MongooseIM cluster seen by a given MongooseIM node. |
| `[global, clusterSize]` | value | A number of nodes in a MongooseIM cluster seen by a given MongooseIM node (based on Mnesia). For CETS use `global.cets.system.joined_nodes` instead. |
| `[global, tcpPortsUsed]` | value | A number of open tcp connections. This should relate to the number of connected sessions and databases, as well as federations and http requests, in order to detect connection leaks. |
| `[global, processQueueLengths]` | probe | The number of queued messages in the internal message queue of every erlang process, and the internal queue of every fsm (ejabberd\_c2s). This is sampled every 30 seconds asynchronously. It is a good indicator of an overloaded system: if too many messages are queued at the same time, the system is not able to process the data at the rate it was designed for. |

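
A note on the `clusterSize` row above: with CETS the cluster view comes from CETS's own metrics rather than Mnesia. A rough sketch of reading the suggested metric directly, assuming it is registered under the exometer path `[global, cets, system, joined_nodes]` with a `value` datapoint (verify against your MongooseIM version; this is not part of the PR):

%% Hedged example: read the CETS joined-nodes gauge via exometer.
case exometer:get_value([global, cets, system, joined_nodes]) of
    {ok, DataPoints} -> proplists:get_value(value, DataPoints);
    {error, not_found} -> undefined
end.
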
2 changes: 1 addition & 1 deletion rebar.lock
@@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"def7da28917fe7e21ad2b50d1b9939b1da5046cf"}},
{ref,"e3ad43f3836ea457bcb54e2f8266e9d7c32b014f"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
24 changes: 1 addition & 23 deletions src/ejabberd_app.erl
@@ -58,7 +58,7 @@ do_start() ->
update_status_file(starting),
mongoose_config:start(),
mongoose_metrics:init(),
db_init(),
mongoose_internal_databases:init(),
mongoose_graphql:init(),
translate:start(),
ejabberd_commands:init(),
@@ -111,31 +111,9 @@ stop(_State) ->
%% That is why we call mnesia:stop() inside of db_init_mnesia() instead.
ok.


%%%
%%% Internal functions
%%%
db_init() ->
case mongoose_config:lookup_opt([internal_databases, mnesia]) of
{ok, _} ->
db_init_mnesia(),
mongoose_node_num_mnesia:init();
{error, _} ->
ok
end.

db_init_mnesia() ->
%% Mnesia should not be running at this point, unless it is started by tests.
%% Ensure Mnesia is stopped
mnesia:stop(),
case mnesia:system_info(extra_db_nodes) of
[] ->
mnesia:create_schema([node()]);
_ ->
ok
end,
application:start(mnesia, permanent),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).

-spec broadcast_c2s_shutdown_listeners() -> ok.
broadcast_c2s_shutdown_listeners() ->
72 changes: 59 additions & 13 deletions src/mongoose_cluster_id.erl
@@ -9,30 +9,35 @@
]).

% For testing purposes only
-export([clean_table/0]).
-export([clean_table/0, clean_cache/0]).

-ignore_xref([clean_table/0, get_backend_cluster_id/0]).
-ignore_xref([clean_table/0, clean_cache/0, get_backend_cluster_id/0]).

-record(mongoose_cluster_id, {key :: atom(), value :: cluster_id()}).
-type cluster_id() :: binary().
-type maybe_cluster_id() :: {ok, cluster_id()} | {error, any()}.
-type mongoose_backend() :: rdbms
| mnesia.
| mnesia
| cets.

-spec start() -> maybe_cluster_id().
start() ->
init_mnesia_cache(),
%% Consider rewriting this logic so that it does not block the starting process.
%% Currently, we have to do an SQL query each time we restart the MongooseIM
%% application in the tests.
init_cache(),
Backend = which_backend_available(),
IntBackend = which_volatile_backend_available(),
maybe_prepare_queries(Backend),
CachedRes = get_cached_cluster_id(),
CachedRes = get_cached_cluster_id(IntBackend),
BackendRes = get_backend_cluster_id(),
case {CachedRes, BackendRes} of
{{ok, ID}, {ok, ID}} ->
{ok, ID};
{{ok, ID}, {error, _}} ->
set_new_cluster_id(ID, Backend);
{{error, _}, {ok, ID}} ->
set_new_cluster_id(ID, mnesia);
set_new_cluster_id(ID, IntBackend);
{{error, _}, {error, _}} ->
make_and_set_new_cluster_id();
{{ok, CachedID}, {ok, BackendID}} ->
@@ -45,14 +50,25 @@ start() ->
%% Get cached version
-spec get_cached_cluster_id() -> maybe_cluster_id().
get_cached_cluster_id() ->
IntBackend = which_volatile_backend_available(),
get_cached_cluster_id(IntBackend).

get_cached_cluster_id(mnesia) ->
T = fun() -> mnesia:read(mongoose_cluster_id, cluster_id) end,
case mnesia:transaction(T) of
{atomic, [#mongoose_cluster_id{value = ClusterID}]} ->
{ok, ClusterID};
{atomic, []} ->
{error, cluster_id_not_in_mnesia};
{error, cluster_id_not_in_cache};
{aborted, Reason} ->
{error, Reason}
end;
get_cached_cluster_id(cets) ->
case ets:lookup(cets_cluster_id, cluster_id) of
[{cluster_id, ClusterID}] ->
{ok, ClusterID};
[] ->
{error, cluster_id_not_in_cache}
end.

%% ====================================================================
@@ -74,14 +90,23 @@ make_and_set_new_cluster_id() ->
%% ====================================================================
%% Internal functions
%% ====================================================================
init_mnesia_cache() ->
init_cache() ->
init_cache(which_volatile_backend_available()).

init_cache(mnesia) ->
mnesia:create_table(mongoose_cluster_id,
[{type, set},
{record_name, mongoose_cluster_id},
{attributes, record_info(fields, mongoose_cluster_id)},
{ram_copies, [node()]}
]),
mnesia:add_table_copy(mongoose_cluster_id, node(), ram_copies).
mnesia:add_table_copy(mongoose_cluster_id, node(), ram_copies);
init_cache(cets) ->
cets:start(cets_cluster_id, #{}),
cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id),
%% We have to do this because we want to read from across the cluster
%% in the start/0 function.
ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity).

-spec maybe_prepare_queries(mongoose_backend()) -> ok.
maybe_prepare_queries(mnesia) -> ok;
@@ -104,15 +129,23 @@ make_cluster_id() ->
-spec which_backend_available() -> mongoose_backend().
which_backend_available() ->
case mongoose_wpool:get_pool_settings(rdbms, global, default) of
undefined -> mnesia;
undefined -> which_volatile_backend_available();
_ -> rdbms
end.

which_volatile_backend_available() ->
case mongoose_config:get_opt(internal_databases) of
#{cets := _} ->
cets;
#{mnesia := _} ->
mnesia
end.

-spec set_new_cluster_id(cluster_id(), mongoose_backend()) -> ok | {error, any()}.
set_new_cluster_id(ID, rdbms) ->
try execute_cluster_insert_new(ID) of
{updated, 1} ->
set_new_cluster_id(ID, mnesia),
set_new_cluster_id(ID, which_volatile_backend_available()),
{ok, ID}
catch
Class:Reason:Stacktrace ->
@@ -129,7 +162,10 @@ set_new_cluster_id(ID, mnesia) ->
{ok, ID};
{aborted, Reason} ->
{error, Reason}
end.
end;
set_new_cluster_id(ID, cets) ->
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
{ok, ID}.

%% Get cluster ID
-spec get_backend_cluster_id(mongoose_backend()) -> maybe_cluster_id().
@@ -145,7 +181,9 @@ get_backend_cluster_id(rdbms) ->
{error, {Class, Reason}}
end;
get_backend_cluster_id(mnesia) ->
get_cached_cluster_id().
get_cached_cluster_id(mnesia);
get_backend_cluster_id(cets) ->
get_cached_cluster_id(cets).

clean_table() ->
clean_table(which_backend_available()).
@@ -166,3 +204,11 @@ clean_table(rdbms) ->
{error, {Class, Reason}}
end;
clean_table(_) -> ok.

clean_cache() ->
clean_cache(which_volatile_backend_available()).

clean_cache(mnesia) ->
mnesia:dirty_delete(mongoose_cluster_id, cluster_id);
clean_cache(cets) ->
cets:delete(cets_cluster_id, cluster_id).
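
To summarize the CETS path added here: the cache is a replicated ETS table named `cets_cluster_id`; writes go through `cets:insert_serial/2` (which, per its name, serialises writes so concurrent nodes should converge on a single ID), and reads are plain local `ets:lookup/2` once `cets_discovery:wait_for_ready/2` has confirmed the table is joined. A minimal round-trip sketch under those assumptions, with a made-up ID value, not part of the PR:

%% Hedged sketch: write an ID through CETS and read it back locally.
ID = <<"example-cluster-id">>,                         %% placeholder value
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
[{cluster_id, ID}] = ets:lookup(cets_cluster_id, cluster_id).
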
29 changes: 29 additions & 0 deletions src/mongoose_internal_databases.erl
@@ -0,0 +1,29 @@
-module(mongoose_internal_databases).
-export([init/0]).

init() ->
case mongoose_config:lookup_opt([internal_databases, mnesia]) of
{ok, _} ->
init_mnesia(),
mongoose_node_num_mnesia:init();
{error, _} ->
%% Ensure Mnesia is stopped when applying the test presets from the big tests,
%% so that we do not accidentally test with Mnesia enabled when starting the
%% test cases from a clean test build.
%% TODO Stopping here would currently break a lot of tests; stop Mnesia here once they are fixed.
% mnesia:stop(),
ok
end.

init_mnesia() ->
%% Mnesia should not be running at this point, unless it is started by tests.
%% Ensure Mnesia is stopped
mnesia:stop(),
case mnesia:system_info(extra_db_nodes) of
[] ->
mnesia:create_schema([node()]);
_ ->
ok
end,
application:start(mnesia, permanent),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
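
Given the commented-out `mnesia:stop()` above, it can be useful to confirm on a running node whether Mnesia actually came up after `init/0`. A quick shell check, using the same `mnesia:system_info(is_running)` call as `has_mnesia/1` in the big tests (illustrative only, not part of the PR):

%% Hedged check from an attached Erlang shell: did init/0 start Mnesia here?
case mnesia:system_info(is_running) of
    yes -> started;
    _   -> not_started
end.
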