Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cets:insert_serial/2 #33

Merged
merged 5 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
insert/2,
insert_many/2,
insert_new/2,
insert_serial/2,
delete/2,
delete_many/2,
delete_object/2,
Expand Down Expand Up @@ -63,6 +64,7 @@
insert/2,
insert_many/2,
insert_new/2,
insert_serial/2,
delete/2,
delete_many/2,
delete_object/2,
Expand Down Expand Up @@ -249,6 +251,16 @@ insert_new(Server, Rec) when is_tuple(Rec) ->
handle_insert_new_result(ok) -> true;
handle_insert_new_result({error, rejected}) -> false.

%% @doc Serialized version of `insert/2'.
%%
%% All `insert_serial' calls are sent to the leader node first.
%%
%% Similar to `insert_new/2', but overwrites the data silently on conflict.
%% It could be used to update entries, which use not node-specific keys.
-spec insert_serial(server_ref(), tuple()) -> ok.
insert_serial(Server, Rec) when is_tuple(Rec) ->
ok = cets_call:send_leader_op(Server, {insert, Rec}).
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved

%% Removes an object with the key from all nodes in the cluster.
%% Ideally, nodes should only remove data that they've inserted, not data from other node.
-spec delete(server_ref(), term()) -> ok.
Expand Down Expand Up @@ -536,15 +548,17 @@ set_other_servers(Servers, State = #{tab := Tab, ack_pid := AckPid}) ->
pids_to_nodes(Pids) ->
lists:map(fun node/1, Pids).

-spec ets_delete_keys(table_name(), [term()]) -> ok.
%% ETS returns booleans instead of ok, because ETS API is old and inspired by Prolog.
%% So, match the API logic here.
-spec ets_delete_keys(table_name(), [term()]) -> true.
ets_delete_keys(Tab, Keys) ->
[ets:delete(Tab, Key) || Key <- Keys],
ok.
true.

-spec ets_delete_objects(table_name(), [tuple()]) -> ok.
-spec ets_delete_objects(table_name(), [tuple()]) -> true.
ets_delete_objects(Tab, Objects) ->
[ets:delete_object(Tab, Object) || Object <- Objects],
ok.
true.

%% Handle operation from a remote node
-spec handle_remote_op(op(), from(), ack_pid(), join_ref(), state()) -> ok.
Expand All @@ -563,11 +577,11 @@ handle_remote_op(Op, From, AckPid, RemoteJoinRef, #{join_ref := JoinRef}) ->
cets_ack:ack(AckPid, From, self()).

%% Apply operation for one local table only
-spec do_op(op(), state()) -> ok | boolean().
-spec do_op(op(), state()) -> boolean().
do_op(Op, #{tab := Tab}) ->
do_table_op(Op, Tab).

-spec do_table_op(op(), table_name()) -> ok | boolean().
-spec do_table_op(op(), table_name()) -> boolean().
do_table_op({insert, Rec}, Tab) ->
ets:insert(Tab, Rec);
do_table_op({delete, Key}, Tab) ->
Expand Down
194 changes: 194 additions & 0 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ cases() ->
insert_new_is_retried_when_leader_is_reelected,
insert_new_fails_if_the_leader_dies,
insert_new_fails_if_the_local_server_is_dead,
insert_serial_works,
insert_serial_overwrites_data,
insert_overwrites_data_inconsistently,
insert_new_does_not_overwrite_data,
insert_serial_overwrites_data_consistently,
insert_serial_works_when_leader_is_back,
insert_serial_blocks_when_leader_is_not_back,
leader_is_the_same_in_metadata_after_join,
join_with_the_same_pid,
join_ref_is_same_after_join,
Expand Down Expand Up @@ -381,6 +388,186 @@ insert_new_fails_if_the_local_server_is_dead(_Config) ->
exit:{noproc, {gen_server, call, _}} -> ok
end.

insert_serial_works(Config) ->
#{pid1 := Pid1, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config),
ok = cets:insert_serial(Pid1, {a, 1}),
[{a, 1}] = cets:dump(Tab1),
[{a, 1}] = cets:dump(Tab2).

insert_serial_overwrites_data(Config) ->
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved
#{pid1 := Pid1, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config),
ok = cets:insert_serial(Pid1, {a, 1}),
ok = cets:insert_serial(Pid1, {a, 2}),
[{a, 2}] = cets:dump(Tab1),
[{a, 2}] = cets:dump(Tab2).

%% Test case when both servers receive a request to update the same key.
%% Compare with insert_serial_overwrites_data_consistently
%% and insert_new_does_not_overwrite_data.
insert_overwrites_data_inconsistently(Config) ->
Me = self(),
#{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} =
given_two_joined_tables(Config),
spawn_link(fun() ->
sys:replace_state(Pid1, fun(State) ->
Me ! replacing_state1,
receive_message(continue_test),
State
end)
end),
spawn_link(fun() ->
sys:replace_state(Pid2, fun(State) ->
Me ! replacing_state2,
receive_message(continue_test),
State
end)
end),
receive_message(replacing_state1),
receive_message(replacing_state2),
%% Insert at the same time
spawn_link(fun() ->
ok = cets:insert(Tab1, {a, 1}),
Me ! inserted1
end),
spawn_link(fun() ->
ok = cets:insert(Tab2, {a, 2}),
Me ! inserted2
end),
%% Wait till got an insert op in the queue
wait_till_message_queue_length(Pid1, 1),
wait_till_message_queue_length(Pid2, 1),
Pid1 ! continue_test,
Pid2 ! continue_test,
receive_message(inserted1),
receive_message(inserted2),
%% Different values due to a race condition
[{a, 2}] = cets:dump(Tab1),
[{a, 1}] = cets:dump(Tab2).

insert_new_does_not_overwrite_data(Config) ->
Me = self(),
#{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config),
Leader = cets:get_leader(Pid1),
spawn_link(fun() ->
sys:replace_state(Pid1, fun(State) ->
Me ! replacing_state1,
receive_message(continue_test),
State
end)
end),
spawn_link(fun() ->
sys:replace_state(Pid2, fun(State) ->
Me ! replacing_state2,
receive_message(continue_test),
State
end)
end),
receive_message(replacing_state1),
receive_message(replacing_state2),
%% Insert at the same time
spawn_link(fun() ->
true = cets:insert_new(Tab1, {a, 1}),
Me ! inserted1
end),
wait_till_message_queue_length(Leader, 1),
spawn_link(fun() ->
false = cets:insert_new(Tab2, {a, 2}),
Me ! inserted2
end),
%% Wait till got the insert ops in the queue.
%% Leader gets both requests.
wait_till_message_queue_length(Leader, 2),
Pid1 ! continue_test,
Pid2 ! continue_test,
receive_message(inserted1),
receive_message(inserted2),
[{a, 1}] = cets:dump(Tab1),
[{a, 1}] = cets:dump(Tab2).

%% We have to use table names instead of pids to insert, because
%% get_leader is an ETS call, if ServerRef is a table name.
%% And get_leader is a gen_server call, if ServerRef is a pid.
insert_serial_overwrites_data_consistently(Config) ->
Me = self(),
#{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config),
Leader = cets:get_leader(Pid1),
spawn_link(fun() ->
sys:replace_state(Pid1, fun(State) ->
Me ! replacing_state1,
receive_message(continue_test),
State
end)
end),
spawn_link(fun() ->
sys:replace_state(Pid2, fun(State) ->
Me ! replacing_state2,
receive_message(continue_test),
State
end)
end),
receive_message(replacing_state1),
receive_message(replacing_state2),
%% Insert at the same time
spawn_link(fun() ->
ok = cets:insert_serial(Tab1, {a, 1}),
Me ! inserted1
end),
%% Ensure, that first insert comes before the second
%% (just to get a predictable value. The value would be still
%% consistent in case first insert comes after the second).
wait_till_message_queue_length(Leader, 1),
spawn_link(fun() ->
ok = cets:insert_serial(Tab2, {a, 2}),
Me ! inserted2
end),
%% Wait till got the insert ops in the queue.
%% Leader gets both requests.
wait_till_message_queue_length(Leader, 2),
Pid1 ! continue_test,
Pid2 ! continue_test,
receive_message(inserted1),
receive_message(inserted2),
[{a, 2}] = cets:dump(Tab1),
[{a, 2}] = cets:dump(Tab2).

%% Similar to insert_new_works_when_leader_is_back
insert_serial_works_when_leader_is_back(Config) ->
#{pid1 := Pid1, pid2 := Pid2} = given_two_joined_tables(Config),
Leader = cets:get_leader(Pid1),
NotLeader = not_leader(Pid1, Pid2, Leader),
cets:set_leader(Leader, false),
spawn(fun() ->
timer:sleep(100),
cets:set_leader(Leader, true)
end),
%% Blocks, until cets:set_leader sets leader back to true.
ok = cets:insert_serial(NotLeader, {alice, 32}).

insert_serial_blocks_when_leader_is_not_back(Config) ->
Me = self(),
F = fun(X) ->
put(test_stage, detected),
Me ! {wrong_leader_detected, X}
end,
#{pid1 := Pid1, pid2 := Pid2} = given_two_joined_tables(Config, #{handle_wrong_leader => F}),
Leader = cets:get_leader(Pid1),
NotLeader = not_leader(Pid1, Pid2, Leader),
cets:set_leader(Leader, false),
InserterPid = spawn(fun() ->
%% Will block indefinetely, because we set is_leader flag manually.
ok = cets:insert_serial(NotLeader, {alice, 32})
end),
receive
{wrong_leader_detected, Info} ->
ct:log("wrong_leader_detected ~p", [Info])
after 5000 ->
ct:fail(wrong_leader_not_detected)
end,
%% Still alive and blocking
pong = cets:ping(Pid1),
pong = cets:ping(Pid2),
?assert(erlang:is_process_alive(InserterPid)).

leader_is_the_same_in_metadata_after_join(Config) ->
#{tabs := [T1, T2], pids := [Pid1, Pid2]} = given_two_joined_tables(Config),
Leader = cets:get_leader(Pid1),
Expand Down Expand Up @@ -1856,6 +2043,13 @@ get_pd(Pid, Key) ->
wait_till_test_stage(Pid, Stage) ->
cets_test_wait:wait_until(fun() -> get_pd(Pid, test_stage) end, Stage).

wait_till_message_queue_length(Pid, Len) ->
cets_test_wait:wait_until(fun() -> get_message_queue_length(Pid) end, Len).

get_message_queue_length(Pid) ->
{message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
Len.

%% Disconnect node until manually connected
block_node(Node, Peer) when is_atom(Node), is_pid(Peer) ->
rpc(Peer, erlang, set_cookie, [node(), invalid_cookie]),
Expand Down