Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple MAM queries with complete result flag #3734

Merged
merged 5 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 59 additions & 13 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
pagination_before10/1,
pagination_after10/1,
pagination_simple_before10/1,
pagination_simple_before3/1,
pagination_simple_before6/1,
pagination_simple_before1_pagesize0/1,
pagination_simple_before2_pagesize0/1,
pagination_simple_after5/1,
pagination_simple_after10/1,
pagination_simple_after12/1,
pagination_last_after_id5/1,
pagination_last_after_id5_before_id11/1,
pagination_empty_rset/1,
Expand Down Expand Up @@ -178,9 +185,10 @@
rsm_send/3,
stanza_page_archive_request/3,
wait_empty_rset/2,
wait_message_range/2,
wait_message_range/3,
message_id/2,
wait_message_range/5,
message_id/2,
stanza_prefs_set_request/4,
stanza_prefs_get_request/1,
stanza_query_get_request/1,
Expand Down Expand Up @@ -498,6 +506,13 @@ rsm_cases() ->
pagination_last_page_after_id4,
%% Simple cases
pagination_simple_before10,
pagination_simple_before3,
pagination_simple_before6,
pagination_simple_before1_pagesize0,
pagination_simple_before2_pagesize0,
pagination_simple_after5,
pagination_simple_after10,
pagination_simple_after12,
%% item_not_found response for nonexistent message ID in before/after filters
server_returns_item_not_found_for_before_filter_with_nonexistent_id,
server_returns_item_not_found_for_after_filter_with_nonexistent_id,
Expand Down Expand Up @@ -2477,7 +2492,6 @@ pagination_offset5_max0(Config) ->
pagination_before10(Config) ->
P = ?config(props, Config),
F = fun(Alice) ->
%% Get the last page of size 5.
RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config)},
rsm_send(Config, Alice,
stanza_page_archive_request(P, <<"before10">>, RSM)),
Expand All @@ -2487,17 +2501,37 @@ pagination_before10(Config) ->
parallel_story(Config, [{alice, 1}], F).

pagination_simple_before10(Config) ->
P = ?config(props, Config),
F = fun(Alice) ->
%% Get the last page of size 5.
RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config), simple=true},
rsm_send(Config, Alice,
stanza_page_archive_request(P, <<"before10">>, RSM)),
%% wait_message_range(Client, TotalCount, Offset, FromN, ToN),
wait_message_range(Alice, undefined, undefined, 5, 9),
ok
end,
parallel_story(Config, [{alice, 1}], F).
RSM = #rsm_in{max = 5, direction = before, id = message_id(10, Config), simple = true},
pagination_test(before10, RSM, simple_range(5, 9, false), Config).

pagination_simple_before3(Config) ->
RSM = #rsm_in{max = 5, direction = before, id = message_id(3, Config), simple = true},
pagination_test(before3, RSM, simple_range(1, 2, true), Config).

pagination_simple_before6(Config) ->
RSM = #rsm_in{max = 5, direction = before, id = message_id(6, Config), simple = true},
pagination_test(before6, RSM, simple_range(1, 5, true), Config).

pagination_simple_before1_pagesize0(Config) ->
%% No messages forwarded, but is_complete is set
RSM = #rsm_in{max = 0, direction = before, id = message_id(1, Config), simple = true},
pagination_test(before1, RSM, simple_range(undefined, undefined, true), Config).

pagination_simple_before2_pagesize0(Config) ->
RSM = #rsm_in{max = 0, direction = before, id = message_id(2, Config), simple = true},
pagination_test(before2, RSM, simple_range(undefined, undefined, false), Config).

pagination_simple_after5(Config) ->
RSM = #rsm_in{max = 3, direction = 'after', id = message_id(5, Config), simple = true},
pagination_test(after5, RSM, simple_range(6, 8, false), Config).

pagination_simple_after10(Config) ->
RSM = #rsm_in{max = 5, direction = 'after', id = message_id(10, Config), simple = true},
pagination_test(after10, RSM, simple_range(11, 15, true), Config).

pagination_simple_after12(Config) ->
RSM = #rsm_in{max = 5, direction = 'after', id = message_id(12, Config), simple = true},
pagination_test(after12, RSM, simple_range(13, 15, true), Config).

pagination_after10(Config) ->
P = ?config(props, Config),
Expand Down Expand Up @@ -3134,3 +3168,15 @@ retract_element(stanza_id) ->

origin_id() ->
<<"orig-id-1">>.

simple_range(From, To, IsComplete) ->
#{total_count => undefined, offset => undefined,
from => From, to => To, is_complete => IsComplete}.

pagination_test(Name, RSM, Range, Config) ->
P = ?config(props, Config),
F = fun(Alice) ->
rsm_send(Config, Alice, stanza_page_archive_request(P, atom_to_binary(Name), RSM)),
wait_message_range(Alice, Range)
end,
parallel_story(Config, [{alice, 1}], F).
18 changes: 16 additions & 2 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@
rsm_send/3,
stanza_page_archive_request/3,
wait_empty_rset/2,
wait_message_range/2,
wait_message_range/3,
message_id/2,
wait_message_range/5,
message_id/2,
stanza_prefs_set_request/4,
stanza_prefs_get_request/1,
stanza_query_get_request/1,
Expand Down Expand Up @@ -823,6 +824,12 @@ wait_message_range(Client, FromN, ToN) ->
wait_message_range(Client, 15, FromN-1, FromN, ToN).

wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
wait_message_range(Client, #{total_count => TotalCount, offset => Offset,
from => FromN, to => ToN}).

wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offset,
from := FromN, to := ToN}) ->
IsComplete = maps:get(is_complete, Params, undefined),
Result = wait_archive_respond(Client),
Messages = respond_messages(Result),
IQ = respond_iq(Result),
Expand All @@ -833,8 +840,13 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
?assert_equal(TotalCount, ParsedIQ#result_iq.count),
?assert_equal(Offset, ParsedIQ#result_iq.first_index),
%% Compare body of the messages.
?assert_equal([generate_message_text(N) || N <- lists:seq(FromN, ToN)],
?assert_equal([generate_message_text(N) || N <- maybe_seq(FromN, ToN)],
[B || #forwarded_message{message_body=B} <- ParsedMessages]),
case IsComplete of
true -> ?assert_equal(<<"true">>, ParsedIQ#result_iq.complete);
false -> ?assert_equal(<<"false">>, ParsedIQ#result_iq.complete);
undefined -> ok
end,
ok
catch Class:Reason:StackTrace ->
ct:pal("IQ: ~p~n"
Expand All @@ -845,6 +857,8 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
erlang:raise(Class, Reason, StackTrace)
end.

maybe_seq(undefined, undefined) -> [];
maybe_seq(A, B) -> lists:seq(A, B).

wait_empty_rset(Alice, TotalCount) ->
Result = wait_archive_respond(Alice),
Expand Down
6 changes: 6 additions & 0 deletions src/mam/mod_mam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
Offset :: non_neg_integer() | undefined,
MessageRows :: [message_row()]}.

-type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined,
offset := Offset :: non_neg_integer() | undefined,
messages := MessageRows :: [message_row()],
is_complete => boolean()}.

%% Internal types
-type iterator_fun() :: fun(() -> {'ok', {_, _}}).
-type rewriter_fun() :: fun((JID :: jid:literal_jid())
Expand Down Expand Up @@ -73,6 +78,7 @@
unix_timestamp/0,
archive_id/0,
lookup_result/0,
lookup_result_map/0,
message_row/0,
message_id/0,
restore_option/0,
Expand Down
11 changes: 5 additions & 6 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@

%% Other
-import(mod_mam_utils,
[mess_id_to_external_binary/1,
is_complete_result_page/4]).
[mess_id_to_external_binary/1]).

-include_lib("mongoose.hrl").
-include_lib("jlib.hrl").
Expand Down Expand Up @@ -406,7 +405,7 @@ handle_set_message_form(HostType, #jid{} = From, #jid{} = ArcJID, IQ) ->
jlib:iq() | ignore | {error, term(), jlib:iq()}.
do_handle_set_message_form(HostType, From, ArcID, ArcJID, IQ, Params0) ->
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From),
Result = lookup_messages(HostType, Params),
Result = mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2),
handle_lookup_result(Result, HostType, From, IQ, Params).

-spec handle_lookup_result({ok, mod_mam:lookup_result()} | {error, term()},
Expand All @@ -421,16 +420,16 @@ handle_lookup_result(Result, HostType, From, IQ, #{owner_jid := ArcJID} = Params
send_messages_and_iq_result(Res, HostType, From, IQ, Params)
end.

send_messages_and_iq_result({TotalCount, Offset, MessageRows}, HostType, From,
send_messages_and_iq_result(#{total_count := TotalCount, offset := Offset,
messages := MessageRows, is_complete := IsComplete},
HostType, From,
#iq{xmlns = MamNs, sub_el = QueryEl} = IQ,
#{owner_jid := ArcJID} = Params) ->
%% Forward messages
QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>),
{FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs,
QueryID, MessageRows, true),

%% Make fin iq
IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params),
IsStable = true,
ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount),
ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType),
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_muc_cassandra_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -757,5 +757,5 @@ db_message_format(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).

-spec pool_name(HostType :: host_type()) -> mongoose_wpool:pool_name().
pool_name(HostType) ->
pool_name(_HostType) ->
default.
9 changes: 4 additions & 5 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@

%% Other
-import(mod_mam_utils,
[mess_id_to_external_binary/1,
is_complete_result_page/4]).
[mess_id_to_external_binary/1]).

%% ejabberd
-import(mod_mam_utils,
Expand Down Expand Up @@ -398,16 +397,16 @@ do_handle_set_message_form(Params0, From, ArcID, ArcJID,
HostType) ->
QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>),
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From),
case lookup_messages(HostType, Params) of
case mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2) of
{error, Reason} ->
report_issue(Reason, mam_lookup_failed, ArcJID, IQ),
return_error_iq(IQ, Reason);
{ok, {TotalCount, Offset, MessageRows}} ->
{ok, #{total_count := TotalCount, offset := Offset, messages := MessageRows,
is_complete := IsComplete}} ->
%% Forward messages
{FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs,
QueryID, MessageRows, true),
%% Make fin iq
IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params),
IsStable = true,
ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount),
ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType),
Expand Down
85 changes: 71 additions & 14 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
calculate_msg_id_borders/3,
calculate_msg_id_borders/4,
maybe_encode_compact_uuid/2,
is_complete_result_page/4,
wait_shaper/4,
check_for_item_not_found/3]).

Expand All @@ -90,7 +89,8 @@
is_jid_in_user_roster/3]).

%% Shared logic
-export([check_result_for_policy_violation/2]).
-export([check_result_for_policy_violation/2,
lookup/3]).

-callback extra_fin_element(mongooseim:host_type(),
mam_iq:lookup_params(),
Expand Down Expand Up @@ -1039,14 +1039,15 @@ maybe_previous_id(X) ->
%% It's the most efficient way to query archive, if the client side does
%% not care about the total number of messages and if it's stateless
%% (i.e. web interface).
-spec is_complete_result_page(TotalCount, Offset, MessageRows, Params) ->
%% Handles case when we have TotalCount and Offset as integers
-spec is_complete_result_page_using_offset(Params, Result) ->
boolean() when
TotalCount :: non_neg_integer()|undefined,
Offset :: non_neg_integer()|undefined,
MessageRows :: list(),
Params :: mam_iq:lookup_params().
is_complete_result_page(TotalCount, Offset, MessageRows,
#{page_size := PageSize} = Params) ->
Params :: mam_iq:lookup_params(),
Result :: mod_mam:lookup_result_map().
is_complete_result_page_using_offset(#{page_size := PageSize} = Params,
#{total_count := TotalCount, offset := Offset,
messages := MessageRows})
when is_integer(TotalCount), is_integer(Offset) ->
case maps:get(ordering_direction, Params, forward) of
forward ->
is_most_recent_page(PageSize, TotalCount, Offset, MessageRows);
Expand Down Expand Up @@ -1155,19 +1156,75 @@ is_policy_violation(TotalCount, Offset, MaxResultLimit, LimitPassed) ->
LookupResult :: mod_mam:lookup_result(),
R :: {ok, mod_mam:lookup_result()} | {error, item_not_found}.
check_for_item_not_found(#rsm_in{direction = before, id = ID},
PageSize, {TotalCount, Offset, MessageRows}) ->
_PageSize, {TotalCount, Offset, MessageRows}) ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this parameter is not needed it can be removed, maybe?

Copy link
Contributor Author

@arcusfelis arcusfelis Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course xd
Remained for compat/lazy.
Will get refactored with the tuple. And we probably wanna pass Params, instead of just RSM.

case maybe_last(MessageRows) of
{ok, #{id := ID}} = _IntervalEndpoint ->
Page = lists:sublist(MessageRows, PageSize),
{ok, {TotalCount, Offset, Page}};
{ok, #{id := ID}} ->
{ok, {TotalCount, Offset, list_without_last(MessageRows)}};
undefined ->
{error, item_not_found}
end;
check_for_item_not_found(#rsm_in{direction = aft, id = ID},
_PageSize, {TotalCount, Offset, MessageRows0}) ->
case MessageRows0 of
[#{id := ID} = _IntervalEndpoint | MessageRows] ->
[#{id := ID} | MessageRows] ->
{ok, {TotalCount, Offset, MessageRows}};
_ ->
{error, item_not_found}
end.

-spec lookup(HostType :: mongooseim:host_type(),
Params :: mam_iq:lookup_params(),
F :: fun()) ->
{ok, mod_mam:lookup_result_map()} | {error, Reason :: term()}.
lookup(HostType, Params, F) ->
F1 = patch_fun_to_make_result_as_map(F),
process_lookup_with_complete_check(HostType, Params, F1).

process_lookup_with_complete_check(HostType, Params = #{is_simple := true}, F) ->
process_simple_lookup_with_complete_check(HostType, Params, F);
process_lookup_with_complete_check(HostType, Params, F) ->
case F(HostType, Params) of
{ok, Result} ->
IsComplete = is_complete_result_page_using_offset(Params, Result),
{ok, Result#{is_complete => IsComplete}};
Other ->
Other
end.

patch_fun_to_make_result_as_map(F) ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh god, this is patching extreme dynamic typing 😱 anyway, for future major refactorings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew you will love it!
Yea, lookup_result_map should replace the tuple. Bad news is that it is like 50 places we use the tuple :D

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I imagined 😱

fun(HostType, Params) -> result_to_map(F(HostType, Params)) end.

result_to_map({ok, {TotalCount, Offset, MessageRows}}) ->
{ok, #{total_count => TotalCount, offset => Offset, messages => MessageRows}};
result_to_map(Other) ->
Other.

%% We query an extra message by changing page_size.
%% After that we remove this message from the result set when returning.
process_simple_lookup_with_complete_check(HostType, Params = #{page_size := PageSize}, F) ->
Params2 = Params#{page_size => PageSize + 1},
case F(HostType, Params2) of
{ok, Result} ->
{ok, set_complete_result_page_using_extra_message(PageSize, Params, Result)};
Other ->
Other
end.

set_complete_result_page_using_extra_message(PageSize, Params, Result = #{messages := MessageRows}) ->
case length(MessageRows) =:= (PageSize + 1) of
true ->
Result#{is_complete => false, messages => remove_extra_message(Params, MessageRows)};
false ->
Result#{is_complete => true}
end.

remove_extra_message(Params, Messages) ->
case maps:get(ordering_direction, Params, forward) of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be pattern-matched on the function clause, it'll be even faster (see maps performance guide if curious).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, in case key exists - yea

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in the case it does not, a second catch-all clause just throws the default.

forward ->
list_without_last(Messages);
backward ->
tl(Messages)
end.

list_without_last(List) ->
lists:reverse(tl(lists:reverse(List))).