Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaning logic for discovery_nodes table - using database timestamp #4049

Merged
merged 5 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 71 additions & 10 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(cets_disco_SUITE).
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [mim/0, rpc/4]).
-import(distributed_helper, [mim/0, mim2/0, rpc/4]).
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved
-include_lib("common_test/include/ct.hrl").

%%--------------------------------------------------------------------
Expand All @@ -19,10 +19,11 @@ file_cases() ->
[file_backend].

rdbms_cases() ->
[rdbms_backend].
[rdbms_backend,
rdbms_backend_supports_auto_cleaning].

suite() ->
escalus:suite().
distributed_helper:require_rpc_nodes([mim, mim2]) ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
Expand All @@ -45,9 +46,15 @@ init_per_group(_, Config) ->
%% Common Test group teardown: there is nothing to clean up for any group,
%% so Config is returned unchanged.
end_per_group(_, Config) ->
Config.

%% Per-testcase setup.
%% For rdbms_backend_supports_auto_cleaning the database timestamp is mocked
%% on mim() to a value one month in the past; the properties returned by
%% mock_timestamp/2 are prepended to the Config produced by escalus.
%% Every other case only runs the escalus setup.
init_per_testcase(rdbms_backend_supports_auto_cleaning = Case, Config) ->
    MockProps = mock_timestamp(mim(), month_ago()),
    MockProps ++ escalus:init_per_testcase(Case, Config);
init_per_testcase(Case, Config) ->
    escalus:init_per_testcase(Case, Config).

%% Per-testcase teardown.
%% For rdbms_backend_supports_auto_cleaning the mock of
%% mongoose_rdbms_timestamp is unloaded on mim() before the escalus teardown
%% runs; every other case only runs the escalus teardown.
end_per_testcase(rdbms_backend_supports_auto_cleaning = CaseName, Config) ->
unmock_timestamp(mim()),
escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

Expand All @@ -63,14 +70,68 @@ file_backend(Config) ->
['node1@localhost', 'node2@otherhost'] = lists:sort(Nodes).

rdbms_backend(_Config) ->
Opts1 = #{cluster_name => <<"big_test">>, node_name_to_insert => <<"test1">>},
Opts2 = #{cluster_name => <<"big_test">>, node_name_to_insert => <<"test2">>},
State1 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts1]),
rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State1]),
State2 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts2]),
{{ok, Nodes}, State2_2} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2]),
CN = <<"big_test">>,
Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
State1 = disco_init(mim(), Opts1),
disco_get_nodes(mim(), State1),
State2 = disco_init(mim2(), Opts2),
{{ok, Nodes}, State2_2} = disco_get_nodes(mim2(), State2),
%% "test2" node can see "test1"
true = lists:member(test1, Nodes),
{{ok, _}, State2_3} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2_2]),
{{ok, _}, State2_3} = disco_get_nodes(mim2(), State2_2),
%% Check that we follow the right code branch
#{last_query_info := #{already_registered := true}} = State2_3.

%% A node that registers later must remove the stale row left by another node.
%% The timestamp is mocked on mim() only (see init_per_testcase/2), so the row
%% of "test1" is written with an old timestamp while mim2() uses the real one.
rdbms_backend_supports_auto_cleaning(Config) ->
    %% Fail early if the mock installed during setup is not active
    ensure_mocked(Config),
    MockedTS = proplists:get_value(mocked_timestamp, Config),
    Cluster = <<"big_test2">>,
    StaleOpts = #{cluster_name => Cluster, node_name_to_insert => <<"test1">>},
    FreshOpts = #{cluster_name => Cluster, node_name_to_insert => <<"test2">>},
    %% Register "test1" through mim(), where the timestamp is mocked
    Stale0 = disco_init(mim(), StaleOpts),
    {_, Stale1} = disco_get_nodes(mim(), Stale0),
    {{ok, SeenByStale}, Stale2} = disco_get_nodes(mim(), Stale1),
    %% The last query must have used exactly the mocked timestamp
    #{last_query_info := #{timestamp := MockedTS}} = Stale2,
    %% The "test1" row is in the database
    true = lists:member(test1, SeenByStale),
    %% "test2" registers through mim2(), which is not mocked, so the
    %% timestamps differ and the "test1" registration gets cleaned
    Fresh0 = disco_init(mim2(), FreshOpts),
    {{ok, SeenByFresh}, Fresh1} = disco_get_nodes(mim2(), Fresh0),
    false = lists:member(test1, SeenByFresh),
    #{last_query_info := #{run_cleaning_result := {removed, [test1]}}} = Fresh1.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Calls mongoose_cets_discovery_rdbms:init(Opts) on Node over RPC and
%% returns its result, which the test cases use as the discovery state.
disco_init(Node, Opts) ->
rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]).

%% Calls mongoose_cets_discovery_rdbms:get_nodes(State) on Node over RPC and
%% returns its result; the test cases match it as {Result, NewState}.
disco_get_nodes(Node, State) ->
rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]).

%% Current OS system time in seconds.
timestamp() ->
os:system_time(second).

%% Timestamp (in seconds) lying 30 days before the current time.
month_ago() ->
    %% timer:hours/1 returns milliseconds, hence the conversion to seconds
    ThirtyDaysInSeconds = timer:hours(24 * 30) div 1000,
    timestamp() - ThirtyDaysInSeconds.

mock_timestamp(Node, Timestamp) ->
ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
%% Ensure that we mock
EnsureMocked = fun() ->
Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, [])
end,
EnsureMocked(),
Copy link
Member

@chrzaszcz chrzaszcz Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why execute and ignore result? Seems to do nothing.

Copy link
Contributor Author

@arcusfelis arcusfelis Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am matching against the already-bound Timestamp variable here.
So EnsureMocked will crash if select() returns something different.

Copy link
Contributor Author

@arcusfelis arcusfelis Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I.e. if meck fails to mock for some reason, we would get:

=== ERROR! init_per_testcase crashed!
	Location: [{cets_disco_SUITE,'-mock_timestamp/2-fun-0-',[127](file:///Users/mikhailuvarov/erlang/esl/MongooseIM/big_tests/ct_report/[email protected]_17.25.22/big_tests.tests.cets_disco_SUITE.logs/run.2023-09-01_17.25.23/cets_disco_suite.src.html#127)},
              {cets_disco_SUITE,mock_timestamp,[129](file:///Users/mikhailuvarov/erlang/esl/MongooseIM/big_tests/ct_report/[email protected]_17.25.22/big_tests.tests.cets_disco_SUITE.logs/run.2023-09-01_17.25.23/cets_disco_suite.src.html#129)},
              {cets_disco_SUITE,init_per_testcase,[50](file:///Users/mikhailuvarov/erlang/esl/MongooseIM/big_tests/ct_report/[email protected]_17.25.22/big_tests.tests.cets_disco_SUITE.logs/run.2023-09-01_17.25.23/cets_disco_suite.src.html#50)},
              {test_server,do_init_per_testcase,1552},
              {test_server,run_test_case_eval1,1253},
              {test_server,run_test_case_eval,1223}]
	Reason: {{badmatch,1693581924},
 [{cets_disco_SUITE,'-mock_timestamp/2-fun-0-',2,
                    [{file,"cets_disco_SUITE.erl"},{line,127}]},
  {cets_disco_SUITE,mock_timestamp,2,
                    [{file,"cets_disco_SUITE.erl"},{line,129}]},
  {cets_disco_SUITE,init_per_testcase,2,
                    [{file,"cets_disco_SUITE.erl"},{line,50}]},
  {test_server,do_init_per_testcase,2,[{file,"test_server.erl"},{line,1552}]},
  {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1253}]},
  {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1223}]}]}

I could remove EnsureMocked though; it is only there to verify that the mocking is working.

[{ensure_mocked, EnsureMocked}, {mocked_timestamp, Timestamp}].

%% Runs the verification fun stored under the ensure_mocked key of Config
%% and returns whatever that fun returns.
ensure_mocked(Config) ->
    (proplists:get_value(ensure_mocked, Config))().

%% Unloads the meck mock of mongoose_rdbms_timestamp on Node.
%% Crashes with badmatch unless the remote meck:unload/1 call returns ok.
unmock_timestamp(Node) ->
ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
8 changes: 8 additions & 0 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ rdbms_queries_cases() ->
read_prep_binary_16m_case,
read_prep_enum_char_case,
read_prep_boolean_case,
select_current_timestamp_case,

select_like_case,
select_like_prep_case,
Expand Down Expand Up @@ -322,6 +323,13 @@ read_prep_enum_char_case(Config) ->
%% Runs the prepared-statement boolean check for both stored values, 0 then 1,
%% and returns the two results as a list.
read_prep_boolean_case(Config) ->
    [check_prep_boolean(Config, 0),
     check_prep_boolean(Config, 1)].

%% Prepares the timestamp query on mim() and checks that selecting the
%% current database timestamp yields an integer.
select_current_timestamp_case(_Config) ->
    ok = rpc(mim(), mongoose_rdbms_timestamp, prepare, []),
    Selected = rpc(mim(), mongoose_rdbms_timestamp, select, []),
    assert_is_integer(Selected).

%% Returns Value when it is an integer; any other term fails with
%% function_clause, which is how the calling test case reports the failure.
assert_is_integer(Value) when is_integer(Value) ->
    Value.

%% Applies truncate_binary/2 with Len to every element of List and returns
%% the results in the same order.
truncate_binaries(Len, List) ->
[truncate_binary(Len, Bin) || Bin <- List].

Expand Down
2 changes: 1 addition & 1 deletion priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT UNSIGNED NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"9965e3b35f3776dff5879effaab538a0ab94592d"}},
{ref,"4cf589591966d6dbc9201f55399360ad2b6db35b"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
Expand Down
70 changes: 56 additions & 14 deletions src/mongoose_cets_discovery_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-type opts() :: #{cluster_name => binary(), node_name_to_insert => binary(), last_query_info => map()}.
-type opts() :: #{cluster_name => binary(), node_name_to_insert => binary(), last_query_info => map(),
expire_time := non_neg_integer()}.
-type state() :: opts().

-spec init(opts()) -> state().
init(Opts = #{cluster_name := _, node_name_to_insert := _}) ->
Opts#{last_query_info => #{}}.
maps:merge(defaults(), Opts).

%% Default values merged under the user-supplied options by init/1.
defaults() ->
    %% Expiration period of a registration, in seconds
    OneHour = 60 * 60,
    #{last_query_info => #{},
      expire_time => OneHour}.

-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
try
case is_rdbms_running() of
true ->
try_register(ClusterName, Node);
try_register(ClusterName, Node, State);
false ->
skip
end
Expand All @@ -45,19 +50,19 @@ is_rdbms_running() ->
false
end.

try_register(ClusterName, NodeBin) when is_binary(NodeBin), is_binary(ClusterName) ->
Node = binary_to_atom(NodeBin),
try_register(ClusterName, NodeBin, State) when is_binary(NodeBin), is_binary(ClusterName) ->
prepare(),
Timestamp = timestamp(),
Node = binary_to_atom(NodeBin),
{selected, Rows} = select(ClusterName),
Pairs = [{binary_to_atom(DbNodeBin), Num} || {DbNodeBin, Num} <- Rows],
{Nodes, Nums} = lists:unzip(Pairs),
Zipped = [{binary_to_atom(DbNodeBin), Num, TS} || {DbNodeBin, Num, TS} <- Rows],
{Nodes, Nums, _Timestamps} = lists:unzip3(Zipped),
AlreadyRegistered = lists:member(Node, Nodes),
Timestamp = timestamp(),
NodeNum =
case AlreadyRegistered of
true ->
update_existing(ClusterName, NodeBin, Timestamp),
{value, {_, Num}} = lists:keysearch(Node, 1, Pairs),
{value, {_, Num, _TS}} = lists:keysearch(Node, 1, Zipped),
Num;
false ->
Num = next_free_num(lists:usort(Nums)),
Expand All @@ -66,21 +71,52 @@ try_register(ClusterName, NodeBin) when is_binary(NodeBin), is_binary(ClusterNam
insert_new(ClusterName, NodeBin, Timestamp, Num),
Num
end,
RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
%% This could be used for debugging
Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
node_num => Num, last_rows => Rows},
{NodeNum, Nodes, Info}.
node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult},
Nodes2 = skip_expired_nodes(Nodes, RunCleaningResult),
{NodeNum, Nodes2, Info}.

%% Removes the nodes deleted by run_cleaning/4 from the list of discovered
%% nodes. When the cleaning was skipped the list is returned untouched.
skip_expired_nodes(AllNodes, {skip, _Reason}) ->
    AllNodes;
skip_expired_nodes(AllNodes, {removed, Removed}) ->
    lists:subtract(AllNodes, Removed).

%% Deletes the expired rows of a cluster from the discovery_nodes table.
%%
%% ClusterName - cluster whose rows are being cleaned.
%% Timestamp   - current time in seconds, compared with each row's timestamp.
%% Rows        - {NodeNameBinary, NodeNum, UpdatedTimestamp} tuples.
%% State       - discovery state; expire_time is read by is_expired/3 and
%%               node_name_to_insert identifies the current node.
%%
%% Returns {skip, nothing_expired} when there is nothing to delete, or
%% {removed, ExpiredNodes} with the names of the deleted nodes as atoms.
run_cleaning(ClusterName, Timestamp, Rows, State) ->
    %% The caller selects Rows before it refreshes the row of the current
    %% node, so that row may still carry a stale timestamp here (for example
    %% after a long downtime). The current node is never treated as expired:
    %% otherwise the registration that has just been refreshed would be
    %% deleted and the node would drop itself from the discovered nodes.
    CurrentNodeBin = maps:get(node_name_to_insert, State, undefined),
    ExpiredBins = [DbNodeBin || {DbNodeBin, _Num, DbTS} <- Rows,
                                DbNodeBin =/= CurrentNodeBin,
                                is_expired(DbTS, Timestamp, State)],
    case ExpiredBins of
        [] ->
            {skip, nothing_expired};
        _ ->
            ExpiredNodes = [binary_to_atom(DbNodeBin) || DbNodeBin <- ExpiredBins],
            lists:foreach(fun(DbNodeBin) ->
                                  delete_node_from_db(ClusterName, DbNodeBin)
                          end, ExpiredBins),
            ?LOG_WARNING(#{what => cets_expired_nodes,
                           text => <<"Expired nodes are detected in discovery_nodes table">>,
                           expired_nodes => ExpiredNodes}),
            {removed, ExpiredNodes}
    end.

%% Tells whether a row last updated at RowTS is older than the configured
%% expire_time when observed at Now. All values are in seconds; a row whose
%% age equals expire_time exactly is not expired yet.
is_expired(RowTS, Now, #{expire_time := MaxAge}) when is_integer(RowTS) ->
    Age = Now - RowTS,
    Age > MaxAge.

%% Executes the prepared cets_delete_node_from_db statement for the given
%% cluster and node name. Node is passed to the query as is; run_cleaning/4
%% passes the binary node name read from the row.
delete_node_from_db(ClusterName, Node) ->
mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [ClusterName, Node]).

prepare() ->
T = discovery_nodes,
mongoose_rdbms_timestamp:prepare(),
mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()),
mongoose_rdbms:prepare(cets_disco_insert_new, T,
[cluster_name, node_name, node_num, updated_timestamp], insert_new()),
mongoose_rdbms:prepare(cets_disco_update_existing, T,
[updated_timestamp, cluster_name, node_name], update_existing()).
[updated_timestamp, cluster_name, node_name], update_existing()),
mongoose_rdbms:prepare(cets_delete_node_from_db, T,
[cluster_name, node_name], delete_node_from_db()).

select() ->
<<"SELECT node_name, node_num FROM discovery_nodes WHERE cluster_name = ?">>.
<<"SELECT node_name, node_num, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>.

%% Executes the prepared cets_disco_select statement for ClusterName and
%% returns the query result; the caller matches it as {selected, Rows}.
select(ClusterName) ->
mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]).
Expand All @@ -95,11 +131,17 @@ insert_new(ClusterName, Node, Timestamp, Num) ->
%% SQL text that refreshes updated_timestamp of a single node row.
%% Placeholder order: updated_timestamp, cluster_name, node_name.
update_existing() ->
<<"UPDATE discovery_nodes SET updated_timestamp = ? WHERE cluster_name = ? AND node_name = ?">>.

%% SQL text that deletes a single node row.
%% Placeholder order: cluster_name, node_name.
delete_node_from_db() ->
<<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>.

%% Executes the prepared cets_disco_update_existing statement.
%% The statement takes Timestamp first, so the parameter list is reordered
%% relative to this function's arguments. The result of
%% mongoose_rdbms:execute/3 is returned as is, without asserting success.
update_existing(ClusterName, Node, Timestamp) ->
mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]).

%% in seconds
timestamp() ->
os:system_time(microsecond).
% We could use Erlang timestamp os:system_time(second).
% But we use the database server time as a central source of truth.
mongoose_rdbms_timestamp:select().

%% Returns a next free node id based on the currently registered ids
next_free_num([]) ->
Expand Down
25 changes: 25 additions & 0 deletions src/rdbms/mongoose_rdbms_timestamp.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-module(mongoose_rdbms_timestamp).
-export([prepare/0,
select/0]).

%% Prepares the mim_timestamp statement with the engine-specific query
%% returned by select_query/0. The result of mongoose_rdbms:prepare/4 is
%% ignored and ok is always returned.
%% NOTE(review): the `users` table argument looks like a placeholder, since
%% the query itself reads no table - confirm.
-spec prepare() -> ok.
prepare() ->
mongoose_rdbms:prepare(mim_timestamp, users, [], select_query()),
ok.

%% SQL text that returns the current database time as seconds since the
%% Unix epoch, chosen by the configured database engine and type.
%% Fails with {prepare_timestamp_query_failed, {Engine, Type}} for an
%% unsupported combination.
select_query() ->
    select_query(mongoose_rdbms:db_engine(global), mongoose_rdbms:db_type()).

%% Dispatches on the {Engine, Type} pair; the type only matters for odbc.
select_query(mysql, _Type) ->
    <<"SELECT UNIX_TIMESTAMP()">>;
select_query(pgsql, _Type) ->
    <<"SELECT ROUND(extract(epoch from now()))">>;
select_query(odbc, mssql) ->
    <<"SELECT DATEDIFF_BIG(second, '1970-01-01 00:00:00', GETUTCDATE())">>;
select_query(Engine, Type) ->
    error({prepare_timestamp_query_failed, {Engine, Type}}).

%% Executes the prepared mim_timestamp statement and returns the current
%% database time as an integer number of seconds.
-spec select() -> integer().
select() ->
    %% selected_to_integer/1 ensures the selected value is an integer
    mongoose_rdbms:selected_to_integer(
      mongoose_rdbms:execute_successfully(global, mim_timestamp, [])).