Skip to content

Commit

Permalink
Merge pull request #4049 from esl/feature-cets-rdbms-disco-cleaning-u…
Browse files Browse the repository at this point in the history
…se-db-time

Cleaning logic for discovery_nodes table - using database timestamp
  • Loading branch information
chrzaszcz authored Sep 4, 2023
2 parents 5366d6d + d1dade1 commit c140a5b
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 28 deletions.
81 changes: 71 additions & 10 deletions big_tests/tests/cets_disco_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(cets_disco_SUITE).
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [mim/0, rpc/4]).
-import(distributed_helper, [mim/0, mim2/0, rpc/4]).
-include_lib("common_test/include/ct.hrl").

%%--------------------------------------------------------------------
Expand All @@ -19,10 +19,11 @@ file_cases() ->
[file_backend].

rdbms_cases() ->
[rdbms_backend].
[rdbms_backend,
rdbms_backend_supports_auto_cleaning].

suite() ->
escalus:suite().
distributed_helper:require_rpc_nodes([mim, mim2]) ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
Expand All @@ -45,9 +46,15 @@ init_per_group(_, Config) ->
end_per_group(_, Config) ->
Config.

init_per_testcase(rdbms_backend_supports_auto_cleaning = CaseName, Config) ->
mock_timestamp(mim(), month_ago()) ++
escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

end_per_testcase(rdbms_backend_supports_auto_cleaning = CaseName, Config) ->
unmock_timestamp(mim()),
escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

Expand All @@ -63,14 +70,68 @@ file_backend(Config) ->
['node1@localhost', 'node2@otherhost'] = lists:sort(Nodes).

rdbms_backend(_Config) ->
Opts1 = #{cluster_name => <<"big_test">>, node_name_to_insert => <<"test1">>},
Opts2 = #{cluster_name => <<"big_test">>, node_name_to_insert => <<"test2">>},
State1 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts1]),
rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State1]),
State2 = rpc(mim(), mongoose_cets_discovery_rdbms, init, [Opts2]),
{{ok, Nodes}, State2_2} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2]),
CN = <<"big_test">>,
Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
State1 = disco_init(mim(), Opts1),
disco_get_nodes(mim(), State1),
State2 = disco_init(mim2(), Opts2),
{{ok, Nodes}, State2_2} = disco_get_nodes(mim2(), State2),
%% "test2" node can see "test1"
true = lists:member(test1, Nodes),
{{ok, _}, State2_3} = rpc(mim(), mongoose_cets_discovery_rdbms, get_nodes, [State2_2]),
{{ok, _}, State2_3} = disco_get_nodes(mim2(), State2_2),
%% Check that we follow the right code branch
#{last_query_info := #{already_registered := true}} = State2_3.

rdbms_backend_supports_auto_cleaning(Config) ->
ensure_mocked(Config),
CN = <<"big_test2">>,
Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
%% test1 row is written with an old (mocked) timestamp
State1 = disco_init(mim(), Opts1),
{_, State1_2} = disco_get_nodes(mim(), State1),
{{ok, Nodes1}, State1_3} = disco_get_nodes(mim(), State1_2),
Timestamp = proplists:get_value(mocked_timestamp, Config),
#{last_query_info := #{timestamp := Timestamp}} = State1_3,
%% It is in DB
true = lists:member(test1, Nodes1),
%% test2 would clean test1 registration
%% We don't mock on mim2 node, so timestamps would differ
State2 = disco_init(mim2(), Opts2),
{{ok, Nodes2}, State2_2} = disco_get_nodes(mim2(), State2),
false = lists:member(test1, Nodes2),
#{last_query_info := #{run_cleaning_result := {removed, [test1]}}} = State2_2.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

disco_init(Node, Opts) ->
rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]).

disco_get_nodes(Node, State) ->
rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]).

timestamp() ->
os:system_time(second).

month_ago() ->
timestamp() - timer:hours(24 * 30) div 1000.

mock_timestamp(Node, Timestamp) ->
ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
%% Ensure that we mock
EnsureMocked = fun() ->
Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, [])
end,
EnsureMocked(),
[{ensure_mocked, EnsureMocked}, {mocked_timestamp, Timestamp}].

ensure_mocked(Config) ->
EnsureMocked = proplists:get_value(ensure_mocked, Config),
EnsureMocked().

unmock_timestamp(Node) ->
ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
8 changes: 8 additions & 0 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ rdbms_queries_cases() ->
read_prep_binary_16m_case,
read_prep_enum_char_case,
read_prep_boolean_case,
select_current_timestamp_case,

select_like_case,
select_like_prep_case,
Expand Down Expand Up @@ -322,6 +323,13 @@ read_prep_enum_char_case(Config) ->
read_prep_boolean_case(Config) ->
[check_prep_boolean(Config, Value) || Value <- [0, 1]].

select_current_timestamp_case(Config) ->
ok = rpc(mim(), mongoose_rdbms_timestamp, prepare, []),
assert_is_integer(rpc(mim(), mongoose_rdbms_timestamp, select, [])).

assert_is_integer(X) when is_integer(X) ->
X.

truncate_binaries(Len, List) ->
[truncate_binary(Len, Bin) || Bin <- List].

Expand Down
2 changes: 1 addition & 1 deletion priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT UNSIGNED NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion priv/pg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ CREATE INDEX i_domain_events_domain ON domain_events(domain);
CREATE TABLE discovery_nodes (
node_name varchar(250),
cluster_name varchar(250),
updated_timestamp BIGINT NOT NULL, -- in microseconds
updated_timestamp BIGINT NOT NULL, -- in seconds
node_num INT NOT NULL,
PRIMARY KEY (cluster_name, node_name)
);
Expand Down
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
{<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},1},
{<<"cets">>,
{git,"https://github.com/esl/cets.git",
{ref,"9965e3b35f3776dff5879effaab538a0ab94592d"}},
{ref,"4cf589591966d6dbc9201f55399360ad2b6db35b"}},
0},
{<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},0},
{<<"cowboy_swagger">>,{pkg,<<"cowboy_swagger">>,<<"2.5.1">>},0},
Expand Down
70 changes: 56 additions & 14 deletions src/mongoose_cets_discovery_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-type opts() :: #{cluster_name => binary(), node_name_to_insert => binary(), last_query_info => map()}.
-type opts() :: #{cluster_name => binary(), node_name_to_insert => binary(), last_query_info => map(),
expire_time := non_neg_integer()}.
-type state() :: opts().

-spec init(opts()) -> state().
init(Opts = #{cluster_name := _, node_name_to_insert := _}) ->
Opts#{last_query_info => #{}}.
maps:merge(defaults(), Opts).

defaults() ->
#{expire_time => 60 * 60 * 1, %% 1 hour in seconds
last_query_info => #{}}.

-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
try
case is_rdbms_running() of
true ->
try_register(ClusterName, Node);
try_register(ClusterName, Node, State);
false ->
skip
end
Expand All @@ -45,19 +50,19 @@ is_rdbms_running() ->
false
end.

try_register(ClusterName, NodeBin) when is_binary(NodeBin), is_binary(ClusterName) ->
Node = binary_to_atom(NodeBin),
try_register(ClusterName, NodeBin, State) when is_binary(NodeBin), is_binary(ClusterName) ->
prepare(),
Timestamp = timestamp(),
Node = binary_to_atom(NodeBin),
{selected, Rows} = select(ClusterName),
Pairs = [{binary_to_atom(DbNodeBin), Num} || {DbNodeBin, Num} <- Rows],
{Nodes, Nums} = lists:unzip(Pairs),
Zipped = [{binary_to_atom(DbNodeBin), Num, TS} || {DbNodeBin, Num, TS} <- Rows],
{Nodes, Nums, _Timestamps} = lists:unzip3(Zipped),
AlreadyRegistered = lists:member(Node, Nodes),
Timestamp = timestamp(),
NodeNum =
case AlreadyRegistered of
true ->
update_existing(ClusterName, NodeBin, Timestamp),
{value, {_, Num}} = lists:keysearch(Node, 1, Pairs),
{value, {_, Num, _TS}} = lists:keysearch(Node, 1, Zipped),
Num;
false ->
Num = next_free_num(lists:usort(Nums)),
Expand All @@ -66,21 +71,52 @@ try_register(ClusterName, NodeBin) when is_binary(NodeBin), is_binary(ClusterNam
insert_new(ClusterName, NodeBin, Timestamp, Num),
Num
end,
RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
%% This could be used for debugging
Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
node_num => Num, last_rows => Rows},
{NodeNum, Nodes, Info}.
node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult},
Nodes2 = skip_expired_nodes(Nodes, RunCleaningResult),
{NodeNum, Nodes2, Info}.

skip_expired_nodes(Nodes, {removed, ExpiredNodes}) ->
Nodes -- ExpiredNodes;
skip_expired_nodes(Nodes, {skip, _}) ->
Nodes.

run_cleaning(ClusterName, Timestamp, Rows, State) ->
Expired = [{DbNodeBin, Num, DbTS} || {DbNodeBin, Num, DbTS} <- Rows,
is_expired(DbTS, Timestamp, State)],
ExpiredNodes = [binary_to_atom(DbNodeBin) || {DbNodeBin, _Num, _TS} <- Expired],
case Expired of
[] ->
{skip, nothing_expired};
_ ->
[delete_node_from_db(ClusterName, DbNodeBin) || {DbNodeBin, _Num, _TS} <- Expired],
?LOG_WARNING(#{what => cets_expired_nodes,
text => <<"Expired nodes are detected in discovery_nodes table">>,
expired_nodes => ExpiredNodes}),
{removed, ExpiredNodes}
end.

is_expired(DbTS, Timestamp, #{expire_time := ExpireTime}) when is_integer(DbTS) ->
(Timestamp - DbTS) > ExpireTime. %% compare seconds

delete_node_from_db(ClusterName, Node) ->
mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [ClusterName, Node]).

prepare() ->
T = discovery_nodes,
mongoose_rdbms_timestamp:prepare(),
mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()),
mongoose_rdbms:prepare(cets_disco_insert_new, T,
[cluster_name, node_name, node_num, updated_timestamp], insert_new()),
mongoose_rdbms:prepare(cets_disco_update_existing, T,
[updated_timestamp, cluster_name, node_name], update_existing()).
[updated_timestamp, cluster_name, node_name], update_existing()),
mongoose_rdbms:prepare(cets_delete_node_from_db, T,
[cluster_name, node_name], delete_node_from_db()).

select() ->
<<"SELECT node_name, node_num FROM discovery_nodes WHERE cluster_name = ?">>.
<<"SELECT node_name, node_num, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>.

select(ClusterName) ->
mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]).
Expand All @@ -95,11 +131,17 @@ insert_new(ClusterName, Node, Timestamp, Num) ->
update_existing() ->
<<"UPDATE discovery_nodes SET updated_timestamp = ? WHERE cluster_name = ? AND node_name = ?">>.

delete_node_from_db() ->
<<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>.

update_existing(ClusterName, Node, Timestamp) ->
mongoose_rdbms:execute(global, cets_disco_update_existing, [Timestamp, ClusterName, Node]).

%% in seconds
timestamp() ->
os:system_time(microsecond).
% We could use Erlang timestamp os:system_time(second).
% But we use the database server time as a central source of truth.
mongoose_rdbms_timestamp:select().

%% Returns a next free node id based on the currently registered ids
next_free_num([]) ->
Expand Down
25 changes: 25 additions & 0 deletions src/rdbms/mongoose_rdbms_timestamp.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-module(mongoose_rdbms_timestamp).
-export([prepare/0,
select/0]).

-spec prepare() -> ok.
prepare() ->
mongoose_rdbms:prepare(mim_timestamp, users, [], select_query()),
ok.

select_query() ->
case {mongoose_rdbms:db_engine(global), mongoose_rdbms:db_type()} of
{mysql, _} ->
<<"SELECT UNIX_TIMESTAMP()">>;
{pgsql, _} ->
<<"SELECT ROUND(extract(epoch from now()))">>;
{odbc, mssql} ->
<<"SELECT DATEDIFF_BIG(second, '1970-01-01 00:00:00', GETUTCDATE())">>;
Other ->
error({prepare_timestamp_query_failed, Other})
end.

-spec select() -> integer().
select() ->
Res = mongoose_rdbms:execute_successfully(global, mim_timestamp, []),
mongoose_rdbms:selected_to_integer(Res). %% ensure it is an integer

0 comments on commit c140a5b

Please sign in to comment.