diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index 505f7c2e7eb..e01478545dd 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -71,6 +71,13 @@ pagination_before10/1, pagination_after10/1, pagination_simple_before10/1, + pagination_simple_before3/1, + pagination_simple_before6/1, + pagination_simple_before1_pagesize0/1, + pagination_simple_before2_pagesize0/1, + pagination_simple_after5/1, + pagination_simple_after10/1, + pagination_simple_after12/1, pagination_last_after_id5/1, pagination_last_after_id5_before_id11/1, pagination_empty_rset/1, @@ -178,9 +185,10 @@ rsm_send/3, stanza_page_archive_request/3, wait_empty_rset/2, + wait_message_range/2, wait_message_range/3, - message_id/2, wait_message_range/5, + message_id/2, stanza_prefs_set_request/4, stanza_prefs_get_request/1, stanza_query_get_request/1, @@ -498,6 +506,13 @@ rsm_cases() -> pagination_last_page_after_id4, %% Simple cases pagination_simple_before10, + pagination_simple_before3, + pagination_simple_before6, + pagination_simple_before1_pagesize0, + pagination_simple_before2_pagesize0, + pagination_simple_after5, + pagination_simple_after10, + pagination_simple_after12, %% item_not_found response for nonexistent message ID in before/after filters server_returns_item_not_found_for_before_filter_with_nonexistent_id, server_returns_item_not_found_for_after_filter_with_nonexistent_id, @@ -2477,7 +2492,6 @@ pagination_offset5_max0(Config) -> pagination_before10(Config) -> P = ?config(props, Config), F = fun(Alice) -> - %% Get the last page of size 5. RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config)}, rsm_send(Config, Alice, stanza_page_archive_request(P, <<"before10">>, RSM)), @@ -2487,17 +2501,37 @@ pagination_before10(Config) -> parallel_story(Config, [{alice, 1}], F). pagination_simple_before10(Config) -> - P = ?config(props, Config), - F = fun(Alice) -> - %% Get the last page of size 5. - RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config), simple=true}, - rsm_send(Config, Alice, - stanza_page_archive_request(P, <<"before10">>, RSM)), - %% wait_message_range(Client, TotalCount, Offset, FromN, ToN), - wait_message_range(Alice, undefined, undefined, 5, 9), - ok - end, - parallel_story(Config, [{alice, 1}], F). + RSM = #rsm_in{max = 5, direction = before, id = message_id(10, Config), simple = true}, + pagination_test(before10, RSM, simple_range(5, 9, false), Config). + +pagination_simple_before3(Config) -> + RSM = #rsm_in{max = 5, direction = before, id = message_id(3, Config), simple = true}, + pagination_test(before3, RSM, simple_range(1, 2, true), Config). + +pagination_simple_before6(Config) -> + RSM = #rsm_in{max = 5, direction = before, id = message_id(6, Config), simple = true}, + pagination_test(before6, RSM, simple_range(1, 5, true), Config). + +pagination_simple_before1_pagesize0(Config) -> + %% No messages forwarded, but is_complete is set + RSM = #rsm_in{max = 0, direction = before, id = message_id(1, Config), simple = true}, + pagination_test(before1, RSM, simple_range(undefined, undefined, true), Config). + +pagination_simple_before2_pagesize0(Config) -> + RSM = #rsm_in{max = 0, direction = before, id = message_id(2, Config), simple = true}, + pagination_test(before2, RSM, simple_range(undefined, undefined, false), Config). + +pagination_simple_after5(Config) -> + RSM = #rsm_in{max = 3, direction = 'after', id = message_id(5, Config), simple = true}, + pagination_test(after5, RSM, simple_range(6, 8, false), Config). + +pagination_simple_after10(Config) -> + RSM = #rsm_in{max = 5, direction = 'after', id = message_id(10, Config), simple = true}, + pagination_test(after10, RSM, simple_range(11, 15, true), Config). + +pagination_simple_after12(Config) -> + RSM = #rsm_in{max = 5, direction = 'after', id = message_id(12, Config), simple = true}, + pagination_test(after12, RSM, simple_range(13, 15, true), Config). pagination_after10(Config) -> P = ?config(props, Config), @@ -3134,3 +3168,15 @@ retract_element(stanza_id) -> origin_id() -> <<"orig-id-1">>. + +simple_range(From, To, IsComplete) -> + #{total_count => undefined, offset => undefined, + from => From, to => To, is_complete => IsComplete}. + +pagination_test(Name, RSM, Range, Config) -> + P = ?config(props, Config), + F = fun(Alice) -> + rsm_send(Config, Alice, stanza_page_archive_request(P, atom_to_binary(Name), RSM)), + wait_message_range(Alice, Range) + end, + parallel_story(Config, [{alice, 1}], F). diff --git a/big_tests/tests/mam_helper.erl b/big_tests/tests/mam_helper.erl index 770528eb603..9d22e0a6a1d 100644 --- a/big_tests/tests/mam_helper.erl +++ b/big_tests/tests/mam_helper.erl @@ -84,9 +84,10 @@ rsm_send/3, stanza_page_archive_request/3, wait_empty_rset/2, + wait_message_range/2, wait_message_range/3, - message_id/2, wait_message_range/5, + message_id/2, stanza_prefs_set_request/4, stanza_prefs_get_request/1, stanza_query_get_request/1, @@ -823,6 +824,12 @@ wait_message_range(Client, FromN, ToN) -> wait_message_range(Client, 15, FromN-1, FromN, ToN). wait_message_range(Client, TotalCount, Offset, FromN, ToN) -> + wait_message_range(Client, #{total_count => TotalCount, offset => Offset, + from => FromN, to => ToN}). + +wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offset, + from := FromN, to := ToN}) -> + IsComplete = maps:get(is_complete, Params, undefined), Result = wait_archive_respond(Client), Messages = respond_messages(Result), IQ = respond_iq(Result), @@ -833,8 +840,13 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) -> ?assert_equal(TotalCount, ParsedIQ#result_iq.count), ?assert_equal(Offset, ParsedIQ#result_iq.first_index), %% Compare body of the messages. - ?assert_equal([generate_message_text(N) || N <- lists:seq(FromN, ToN)], + ?assert_equal([generate_message_text(N) || N <- maybe_seq(FromN, ToN)], [B || #forwarded_message{message_body=B} <- ParsedMessages]), + case IsComplete of + true -> ?assert_equal(<<"true">>, ParsedIQ#result_iq.complete); + false -> ?assert_equal(<<"false">>, ParsedIQ#result_iq.complete); + undefined -> ok + end, ok catch Class:Reason:StackTrace -> ct:pal("IQ: ~p~n" @@ -845,6 +857,8 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) -> erlang:raise(Class, Reason, StackTrace) end. +maybe_seq(undefined, undefined) -> []; +maybe_seq(A, B) -> lists:seq(A, B). wait_empty_rset(Alice, TotalCount) -> Result = wait_archive_respond(Alice), diff --git a/src/mam/mod_mam.erl b/src/mam/mod_mam.erl index 88e0e7d79f7..ac1818c5af2 100644 --- a/src/mam/mod_mam.erl +++ b/src/mam/mod_mam.erl @@ -43,6 +43,11 @@ Offset :: non_neg_integer() | undefined, MessageRows :: [message_row()]}. +-type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined, + offset := Offset :: non_neg_integer() | undefined, + messages := MessageRows :: [message_row()], + is_complete => boolean()}. + %% Internal types -type iterator_fun() :: fun(() -> {'ok', {_, _}}). -type rewriter_fun() :: fun((JID :: jid:literal_jid()) @@ -73,6 +78,7 @@ unix_timestamp/0, archive_id/0, lookup_result/0, + lookup_result_map/0, message_row/0, message_id/0, restore_option/0, diff --git a/src/mam/mod_mam_muc.erl b/src/mam/mod_mam_muc.erl index bd7e6525c19..c173aed162b 100644 --- a/src/mam/mod_mam_muc.erl +++ b/src/mam/mod_mam_muc.erl @@ -89,8 +89,7 @@ %% Other -import(mod_mam_utils, - [mess_id_to_external_binary/1, - is_complete_result_page/4]). + [mess_id_to_external_binary/1]). -include_lib("mongoose.hrl"). -include_lib("jlib.hrl"). @@ -406,7 +405,7 @@ handle_set_message_form(HostType, #jid{} = From, #jid{} = ArcJID, IQ) -> jlib:iq() | ignore | {error, term(), jlib:iq()}. do_handle_set_message_form(HostType, From, ArcID, ArcJID, IQ, Params0) -> Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From), - Result = lookup_messages(HostType, Params), + Result = mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2), handle_lookup_result(Result, HostType, From, IQ, Params). -spec handle_lookup_result({ok, mod_mam:lookup_result()} | {error, term()}, @@ -421,16 +420,16 @@ handle_lookup_result(Result, HostType, From, IQ, #{owner_jid := ArcJID} = Params send_messages_and_iq_result(Res, HostType, From, IQ, Params) end. -send_messages_and_iq_result({TotalCount, Offset, MessageRows}, HostType, From, +send_messages_and_iq_result(#{total_count := TotalCount, offset := Offset, + messages := MessageRows, is_complete := IsComplete}, + HostType, From, #iq{xmlns = MamNs, sub_el = QueryEl} = IQ, #{owner_jid := ArcJID} = Params) -> %% Forward messages QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>), {FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs, QueryID, MessageRows, true), - %% Make fin iq - IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params), IsStable = true, ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount), ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType), diff --git a/src/mam/mod_mam_muc_cassandra_arch.erl b/src/mam/mod_mam_muc_cassandra_arch.erl index 4d60a2542d2..29da664ff62 100644 --- a/src/mam/mod_mam_muc_cassandra_arch.erl +++ b/src/mam/mod_mam_muc_cassandra_arch.erl @@ -757,5 +757,5 @@ db_message_format(HostType) -> gen_mod:get_module_opt(HostType, ?MODULE, db_message_format). -spec pool_name(HostType :: host_type()) -> mongoose_wpool:pool_name(). -pool_name(HostType) -> +pool_name(_HostType) -> default. diff --git a/src/mam/mod_mam_pm.erl b/src/mam/mod_mam_pm.erl index 6f2d939fbf6..6588bb2e205 100644 --- a/src/mam/mod_mam_pm.erl +++ b/src/mam/mod_mam_pm.erl @@ -95,8 +95,7 @@ %% Other -import(mod_mam_utils, - [mess_id_to_external_binary/1, - is_complete_result_page/4]). + [mess_id_to_external_binary/1]). %% ejabberd -import(mod_mam_utils, @@ -398,16 +397,16 @@ do_handle_set_message_form(Params0, From, ArcID, ArcJID, HostType) -> QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>), Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From), - case lookup_messages(HostType, Params) of + case mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2) of {error, Reason} -> report_issue(Reason, mam_lookup_failed, ArcJID, IQ), return_error_iq(IQ, Reason); - {ok, {TotalCount, Offset, MessageRows}} -> + {ok, #{total_count := TotalCount, offset := Offset, messages := MessageRows, + is_complete := IsComplete}} -> %% Forward messages {FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs, QueryID, MessageRows, true), %% Make fin iq - IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params), IsStable = true, ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount), ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType), diff --git a/src/mam/mod_mam_utils.erl b/src/mam/mod_mam_utils.erl index 987f233b2a3..83222a1f81e 100644 --- a/src/mam/mod_mam_utils.erl +++ b/src/mam/mod_mam_utils.erl @@ -80,7 +80,6 @@ calculate_msg_id_borders/3, calculate_msg_id_borders/4, maybe_encode_compact_uuid/2, - is_complete_result_page/4, wait_shaper/4, check_for_item_not_found/3]). @@ -90,7 +89,8 @@ is_jid_in_user_roster/3]). %% Shared logic --export([check_result_for_policy_violation/2]). +-export([check_result_for_policy_violation/2, + lookup/3]). -callback extra_fin_element(mongooseim:host_type(), mam_iq:lookup_params(), @@ -1039,14 +1039,15 @@ maybe_previous_id(X) -> %% It's the most efficient way to query archive, if the client side does %% not care about the total number of messages and if it's stateless %% (i.e. web interface). --spec is_complete_result_page(TotalCount, Offset, MessageRows, Params) -> +%% Handles case when we have TotalCount and Offset as integers +-spec is_complete_result_page_using_offset(Params, Result) -> boolean() when - TotalCount :: non_neg_integer()|undefined, - Offset :: non_neg_integer()|undefined, - MessageRows :: list(), - Params :: mam_iq:lookup_params(). -is_complete_result_page(TotalCount, Offset, MessageRows, - #{page_size := PageSize} = Params) -> + Params :: mam_iq:lookup_params(), + Result :: mod_mam:lookup_result_map(). +is_complete_result_page_using_offset(#{page_size := PageSize} = Params, + #{total_count := TotalCount, offset := Offset, + messages := MessageRows}) + when is_integer(TotalCount), is_integer(Offset) -> case maps:get(ordering_direction, Params, forward) of forward -> is_most_recent_page(PageSize, TotalCount, Offset, MessageRows); @@ -1155,19 +1156,75 @@ is_policy_violation(TotalCount, Offset, MaxResultLimit, LimitPassed) -> LookupResult :: mod_mam:lookup_result(), R :: {ok, mod_mam:lookup_result()} | {error, item_not_found}. check_for_item_not_found(#rsm_in{direction = before, id = ID}, - PageSize, {TotalCount, Offset, MessageRows}) -> + _PageSize, {TotalCount, Offset, MessageRows}) -> case maybe_last(MessageRows) of - {ok, #{id := ID}} = _IntervalEndpoint -> - Page = lists:sublist(MessageRows, PageSize), - {ok, {TotalCount, Offset, Page}}; + {ok, #{id := ID}} -> + {ok, {TotalCount, Offset, list_without_last(MessageRows)}}; undefined -> {error, item_not_found} end; check_for_item_not_found(#rsm_in{direction = aft, id = ID}, _PageSize, {TotalCount, Offset, MessageRows0}) -> case MessageRows0 of - [#{id := ID} = _IntervalEndpoint | MessageRows] -> + [#{id := ID} | MessageRows] -> {ok, {TotalCount, Offset, MessageRows}}; _ -> {error, item_not_found} end. + +-spec lookup(HostType :: mongooseim:host_type(), + Params :: mam_iq:lookup_params(), + F :: fun()) -> + {ok, mod_mam:lookup_result_map()} | {error, Reason :: term()}. +lookup(HostType, Params, F) -> + F1 = patch_fun_to_make_result_as_map(F), + process_lookup_with_complete_check(HostType, Params, F1). + +process_lookup_with_complete_check(HostType, Params = #{is_simple := true}, F) -> + process_simple_lookup_with_complete_check(HostType, Params, F); +process_lookup_with_complete_check(HostType, Params, F) -> + case F(HostType, Params) of + {ok, Result} -> + IsComplete = is_complete_result_page_using_offset(Params, Result), + {ok, Result#{is_complete => IsComplete}}; + Other -> + Other + end. + +patch_fun_to_make_result_as_map(F) -> + fun(HostType, Params) -> result_to_map(F(HostType, Params)) end. + +result_to_map({ok, {TotalCount, Offset, MessageRows}}) -> + {ok, #{total_count => TotalCount, offset => Offset, messages => MessageRows}}; +result_to_map(Other) -> + Other. + +%% We query an extra message by changing page_size. +%% After that we remove this message from the result set when returning. +process_simple_lookup_with_complete_check(HostType, Params = #{page_size := PageSize}, F) -> + Params2 = Params#{page_size => PageSize + 1}, + case F(HostType, Params2) of + {ok, Result} -> + {ok, set_complete_result_page_using_extra_message(PageSize, Params, Result)}; + Other -> + Other + end. + +set_complete_result_page_using_extra_message(PageSize, Params, Result = #{messages := MessageRows}) -> + case length(MessageRows) =:= (PageSize + 1) of + true -> + Result#{is_complete => false, messages => remove_extra_message(Params, MessageRows)}; + false -> + Result#{is_complete => true} + end. + +remove_extra_message(Params, Messages) -> + case maps:get(ordering_direction, Params, forward) of + forward -> + list_without_last(Messages); + backward -> + tl(Messages) + end. + +list_without_last(List) -> + lists:reverse(tl(lists:reverse(List))).