diff --git a/big_tests/default.spec b/big_tests/default.spec index e8e862774c8..9f4432b9b12 100644 --- a/big_tests/default.spec +++ b/big_tests/default.spec @@ -26,6 +26,7 @@ {suites, "tests", extdisco_SUITE}. {suites, "tests", gdpr_SUITE}. {suites, "tests", graphql_SUITE}. +{suites, "tests", graphql_sse_SUITE}. {suites, "tests", graphql_account_SUITE}. {suites, "tests", graphql_domain_SUITE}. {suites, "tests", graphql_inbox_SUITE}. diff --git a/big_tests/tests/graphql_helper.erl b/big_tests/tests/graphql_helper.erl index f3f880daeaa..791a49c339d 100644 --- a/big_tests/tests/graphql_helper.erl +++ b/big_tests/tests/graphql_helper.erl @@ -17,16 +17,18 @@ execute(EpName, Body, Creds) -> -spec execute(node(), atom(), binary(), {binary(), binary()} | undefined) -> {Status :: tuple(), Data :: map()}. execute(Node, EpName, Body, Creds) -> - Request = - #{port => get_listener_port(Node, EpName), - role => {graphql, EpName}, - method => <<"POST">>, - return_maps => true, - creds => Creds, - path => "/graphql", - body => Body}, + Request = build_request(Node, EpName, Body, Creds), rest_helper:make_request(Request). +build_request(Node, EpName, Body, Creds) -> + #{port => get_listener_port(Node, EpName), + role => {graphql, EpName}, + method => <<"POST">>, + return_maps => true, + creds => Creds, + path => "/graphql", + body => Body}. + execute_sse(EpName, Params, Creds) -> #{node := Node} = mim(), execute_sse(Node, EpName, Params, Creds). @@ -213,6 +215,9 @@ get_unauthorized({Code, #{<<"errors">> := Errors}}) -> get_bad_request({Code, _Msg}) -> assert_response_code(bad_request, Code). +get_method_not_allowed({Code, _Msg}) -> + assert_response_code(method_not_allowed, Code). + get_coercion_err_msg({Code, #{<<"errors">> := [Error]}}) -> assert_response_code(bad_request, Code), ?assertEqual(<<"input_coercion">>, get_value([extensions, code], Error)), @@ -236,8 +241,14 @@ get_ok_value(Path, {Code, Data}) -> assert_response_code(bad_request, {<<"400">>, <<"Bad Request">>}) -> ok; assert_response_code(unauthorized, {<<"401">>, <<"Unauthorized">>}) -> ok; +assert_response_code(method_not_allowed, {<<"405">>, <<"Method Not Allowed">>}) -> ok; assert_response_code(error, {<<"200">>, <<"OK">>}) -> ok; assert_response_code(ok, {<<"200">>, <<"OK">>}) -> ok; +assert_response_code(bad_request, 400) -> ok; +assert_response_code(unauthorized, 401) -> ok; +assert_response_code(method_not_allowed, 405) -> ok; +assert_response_code(error, 200) -> ok; +assert_response_code(ok, 200) -> ok; assert_response_code(bad_request, {exit_status, 1}) -> ok; assert_response_code(error, {exit_status, 1}) -> ok; assert_response_code(ok, {exit_status, 0}) -> ok; diff --git a/big_tests/tests/graphql_sse_SUITE.erl b/big_tests/tests/graphql_sse_SUITE.erl new file mode 100644 index 00000000000..48cb6321e49 --- /dev/null +++ b/big_tests/tests/graphql_sse_SUITE.erl @@ -0,0 +1,138 @@ +%% @doc Tests for the SSE handling of GraphQL subscriptions +-module(graphql_sse_SUITE). + +-compile([export_all, nowarn_export_all]). + +-import(distributed_helper, [mim/0, require_rpc_nodes/1, rpc/4]). +-import(graphql_helper, [get_bad_request/1, get_unauthorized/1, get_method_not_allowed/1, + build_request/4, execute_user_sse/3, build_request/4, make_creds/1, + execute_auth/2, execute_sse/3, execute_auth_sse/2]). + +%% common_test callbacks + +suite() -> + require_rpc_nodes([mim]) ++ escalus:suite(). + +all() -> + [{group, admin}, + {group, user}]. + +groups() -> + [{admin, [parallel], admin_tests()}, + {user, [parallel], user_tests()}]. + +init_per_suite(Config) -> + Config1 = escalus:init_per_suite(Config), + application:ensure_all_started(gun), + Config1. + +end_per_suite(Config) -> + escalus:end_per_suite(Config). + +init_per_group(user, Config) -> + graphql_helper:init_user(Config); +init_per_group(admin, Config) -> + graphql_helper:init_admin_handler(Config). + +end_per_group(user, _Config) -> + escalus_fresh:clean(), + graphql_helper:clean(); +end_per_group(_, _Config) -> + graphql_helper:clean(). + +init_per_testcase(CaseName, Config) -> + escalus:init_per_testcase(CaseName, Config). + +end_per_testcase(CaseName, Config) -> + escalus:end_per_testcase(CaseName, Config). + +admin_tests() -> + [admin_missing_query, + admin_invalid_query_string, + admin_missing_creds, + admin_invalid_creds, + admin_invalid_method, + admin_invalid_operation_type]. + +user_tests() -> + [user_missing_query, + user_invalid_query_string, + user_missing_creds, + user_invalid_creds, + user_invalid_method, + user_invalid_operation_type]. + +%% Test cases and stories + +admin_missing_query(Config) -> + get_bad_request(execute_auth_sse(#{}, Config)). + +user_missing_query(Config) -> + escalus:fresh_story_with_config(Config, [{alice, 1}], fun user_missing_query_story/2). + +user_missing_query_story(Config, Alice) -> + get_bad_request(execute_user_sse(#{}, Alice, Config)). + +admin_invalid_query_string(_Config) -> + Port = graphql_helper:get_listener_port(admin), + get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", undefined, #{})). + +user_invalid_query_string(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_query_string_story/1). + +user_invalid_query_string_story(Alice) -> + Port = graphql_helper:get_listener_port(user), + Creds = make_creds(Alice), + get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", Creds, #{})). + +admin_missing_creds(_Config) -> + get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, undefined)). + +user_missing_creds(_Config) -> + get_unauthorized(execute_sse(user, #{query => doc()}, undefined)). + +admin_invalid_creds(_Config) -> + Creds = {<<"invalid">>, <<"creds">>}, + get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, Creds)). + +user_invalid_creds(_Config) -> + get_unauthorized(execute_sse(user, #{query => doc()}, {<<"invalid">>, <<"creds">>})). + +admin_invalid_method(_Config) -> + #{node := Node} = mim(), + Request = build_request(Node, admin, #{query => doc(), variables => args()}, undefined), + %% POST was used, while SSE accepts only GET + get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})). + +user_invalid_method(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_method_story/1). + +user_invalid_method_story(Alice) -> + #{node := Node} = mim(), + Request = build_request(Node, user, #{query => doc()}, make_creds(Alice)), + %% POST was used, while SSE accepts only GET + get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})). + +admin_invalid_operation_type(Config) -> + Creds = graphql_helper:make_admin_creds(admin, Config), + get_bad_request(execute_sse(admin, #{query => query_doc(), variables => args()}, Creds)). + +user_invalid_operation_type(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_operation_type_story/1). + +user_invalid_operation_type_story(Alice) -> + get_bad_request(execute_sse(user, #{query => query_doc()}, make_creds(Alice))). + +%% Helpers + +%% Subscription - works only with the SSE handler +doc() -> + graphql_helper:get_doc(<<"stanza">>, <<"getMessages">>). + +%% Query - works only with the REST handler +query_doc() -> + graphql_helper:get_doc(<<"stanza">>, <<"getLastMessages">>). + +%% Same args used by both operations - only for Admin +args() -> + #{caller => <<"alice@localhost">>}. diff --git a/big_tests/tests/graphql_stanza_SUITE.erl b/big_tests/tests/graphql_stanza_SUITE.erl index ca623499f53..993cc6d35d8 100644 --- a/big_tests/tests/graphql_stanza_SUITE.erl +++ b/big_tests/tests/graphql_stanza_SUITE.erl @@ -52,7 +52,8 @@ admin_stanza_cases() -> admin_send_stanza_from_unknown_domain]. admin_sse_cases() -> - [admin_get_messages]. + [admin_get_messages, + admin_get_messages_for_unknown_user]. admin_mam_cases() -> [admin_get_last_messages, @@ -164,6 +165,12 @@ admin_send_message_story(Config, Alice, Bob) -> assert_not_empty(StanzaId), escalus:assert(is_message, escalus:wait_for_stanza(Bob)). +admin_get_messages_for_unknown_user(Config) -> + Domain = domain_helper:domain(), + Res = get_messages(<<"baduser@", Domain/binary>>, Config), + ?assertEqual(<<"unknown_user">>, get_err_code(Res)), + ?assertEqual(<<"User does not exist">>, get_err_msg(Res)). + admin_get_messages(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], fun admin_get_messages_story/3). @@ -172,6 +179,9 @@ admin_get_messages_story(Config, Alice, Bob) -> From = escalus_client:full_jid(Alice), To = escalus_client:short_jid(Bob), {200, Stream} = get_messages(To, Config), + %% Presence should be skipped + escalus:send(Alice, escalus_stanza:presence_direct(To, <<"available">>)), + %% Message should be delivered escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)), Event = sse_helper:wait_for_event(Stream), #{<<"stanza">> := StanzaBin, <<"sender">> := From} = diff --git a/big_tests/tests/sse_helper.erl b/big_tests/tests/sse_helper.erl index 7ad51d5c1dd..6fc4ef6d762 100644 --- a/big_tests/tests/sse_helper.erl +++ b/big_tests/tests/sse_helper.erl @@ -3,24 +3,28 @@ -compile([export_all, nowarn_export_all]). connect_to_sse(Port, Path, Creds, ExtraOpts) -> + ct:log("Connect to SSE, port: ~p, path: ~s, creds: ~p, extra opts: ~p", + [Port, Path, Creds, ExtraOpts]), Headers = auth_headers(Creds) ++ [{<<"host">>, <<"localhost">>}, {<<"accept">>, <<"text/event-stream">>}], Opts = maps:merge(basic_opts(), ExtraOpts), {ok, ConnPid} = gun:open("localhost", Port, Opts), {ok, _} = gun:await_up(ConnPid), StreamRef = gun:get(ConnPid, Path, Headers), - {response, nofin, Status, _Headers} = gun:await(ConnPid, StreamRef), - Result = case Status of - 200 -> + {response, nofin, Status, RespHeaders} = gun:await(ConnPid, StreamRef), + Result = case {Status, maps:from_list(RespHeaders)} of + {200, #{<<"content-type">> := <<"text/event-stream">>}} -> #{pid => ConnPid, stream_ref => StreamRef}; _ -> {ok, Body} = gun:await_body(ConnPid, StreamRef), jiffy:decode(Body, [return_maps]) end, + ct:log("SSE response code: ~p, headers: ~p, result: ~p", [Status, RespHeaders, Result]), {Status, Result}. wait_for_event(#{pid := Pid, stream_ref := StreamRef}) -> - {sse, #{data := [Response]}} = gun:await(Pid, StreamRef), + {sse, #{data := [Response]} = Event} = gun:await(Pid, StreamRef), + ct:log("Received SSE event: ~p", [Event]), jiffy:decode(Response, [return_maps]). stop_sse(#{pid := Pid}) -> diff --git a/priv/graphql/schemas/admin/stanza.gql b/priv/graphql/schemas/admin/stanza.gql index e6407ad345e..e17b315a9f8 100644 --- a/priv/graphql/schemas/admin/stanza.gql +++ b/priv/graphql/schemas/admin/stanza.gql @@ -20,6 +20,8 @@ type StanzaAdminMutation @protected{ @protected(type: GLOBAL) } -type StanzaAdminSubscription { +type StanzaAdminSubscription @protected{ + "Subscribe for the given user's incoming messages" getMessages(caller: JID!): StanzaMap + @protected(type: DOMAIN, args: ["caller"]) } diff --git a/priv/graphql/schemas/user/stanza.gql b/priv/graphql/schemas/user/stanza.gql index dbbc2f4ad3d..b18908feb8f 100644 --- a/priv/graphql/schemas/user/stanza.gql +++ b/priv/graphql/schemas/user/stanza.gql @@ -19,6 +19,7 @@ type StanzaUserMutation @protected{ sendStanza(stanza: Stanza): SendStanzaPayload } -type StanzaUserSubscription { +type StanzaUserSubscription @protected{ + "Subscribe to incoming messages" getMessages: StanzaMap } diff --git a/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl index b6e2d18c61e..65893bfe255 100644 --- a/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl +++ b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl @@ -13,8 +13,14 @@ execute(Ctx, _Obj, <<"getMessages">>, Args) -> get_messages(Ctx, Args). get_messages(#{event := terminate, stream := Session}, _) -> - {ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]}; + mongoose_stanza_api:close_session(Session), + {ok, null, [{stream, closed}]}; get_messages(#{event := Event}, _) -> mongoose_graphql_stanza_helper:handle_event(Event); get_messages(_Ctx, #{<<"caller">> := Jid}) -> - {ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}. + case mongoose_stanza_api:open_session(Jid, true) of + {ok, Stream} -> + {ok, null, [{stream, Stream}]}; + Error -> + format_result(Error, #{caller => Jid}) + end. diff --git a/src/graphql/mongoose_graphql_errors.erl b/src/graphql/mongoose_graphql_errors.erl index 385b066bfa3..6a67f74b4ee 100644 --- a/src/graphql/mongoose_graphql_errors.erl +++ b/src/graphql/mongoose_graphql_errors.erl @@ -121,6 +121,8 @@ decode_err_msg(no_query_supplied) -> "The query was not supplied in the request body"; decode_err_msg(invalid_json_body) -> "The request JSON body is invalid"; +decode_err_msg(invalid_query_parameters) -> + "The query string is invalid"; decode_err_msg(variables_invalid_json) -> "The variables' JSON is invalid". diff --git a/src/graphql/mongoose_graphql_sse_handler.erl b/src/graphql/mongoose_graphql_sse_handler.erl index c8b28805954..cea54be9dd4 100644 --- a/src/graphql/mongoose_graphql_sse_handler.erl +++ b/src/graphql/mongoose_graphql_sse_handler.erl @@ -54,15 +54,21 @@ handle_notify(Msg, State) -> handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) -> Ctx1 = Ctx#{event => Event}, - case mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}) of - {ok, #{data := null}} -> + {ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), + case has_non_null_value(Data) of + false -> {nosend, State}; - {ok, #{} = Response} -> - Data = mongoose_graphql_response:term_to_json(Response), - {send, #{id => integer_to_binary(Id), data => Data}, + true -> + EventData = mongoose_graphql_response:term_to_json(Response), + {send, #{id => integer_to_binary(Id), data => EventData}, State#{id := Id + 1}} end. +%% Check if there is any value that is non-null. Any list is considered non-null. +has_non_null_value(M) when is_map(M) -> + lists:any(fun has_non_null_value/1, maps:values(M)); +has_non_null_value(V) -> V =/= null. + handle_error(Msg, Reason, State) -> ?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed, reason => Reason, text => Msg}), diff --git a/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl index ac4594090b3..e209c959299 100644 --- a/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl +++ b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl @@ -13,8 +13,10 @@ execute(Ctx, _Obj, <<"getMessages">>, #{}) -> get_messages(Ctx). get_messages(#{event := terminate, stream := Session}) -> - {ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]}; + mongoose_stanza_api:close_session(Session), + {ok, null, [{stream, closed}]}; get_messages(#{event := Event}) -> mongoose_graphql_stanza_helper:handle_event(Event); get_messages(#{user := Jid}) -> - {ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}. + {ok, Stream} = mongoose_stanza_api:open_session(Jid, false), + {ok, null, [{stream, Stream}]}. diff --git a/src/mongoose_client_api/mongoose_client_api_sse.erl b/src/mongoose_client_api/mongoose_client_api_sse.erl index e62b1c15910..f75f32296be 100644 --- a/src/mongoose_client_api/mongoose_client_api_sse.erl +++ b/src/mongoose_client_api/mongoose_client_api_sse.erl @@ -24,7 +24,7 @@ init(_InitArgs, _LastEvtId, Req) -> maybe_init(Authorization, Req2, State#{id => 1}). maybe_init(true, Req, #{jid := JID} = State) -> - Session = mongoose_stanza_event_api:open_session(JID), + Session = mongoose_stanza_api:open_session(JID, false), {ok, Req, State#{session => Session}}; maybe_init(true, Req, State) -> %% This is for OPTIONS method @@ -54,7 +54,7 @@ handle_error(Msg, Reason, State) -> {nosend, State}. terminate(_Reason, _Req, #{session := Session}) -> - mongoose_stanza_event_api:close_session(Session), + mongoose_stanza_api:close_session(Session), ok; terminate(_Reason, _Req, _State) -> ok. diff --git a/src/mongoose_stanza_api.erl b/src/mongoose_stanza_api.erl index 03adb04843b..025a2c5de1e 100644 --- a/src/mongoose_stanza_api.erl +++ b/src/mongoose_stanza_api.erl @@ -1,10 +1,19 @@ -module(mongoose_stanza_api). + +%% API -export([send_chat_message/4, send_headline_message/5, send_stanza/2, lookup_recent_messages/5]). +%% Event API +-export([open_session/2, close_session/1]). + -include("jlib.hrl"). -include("mongoose_rsm.hrl"). -include("mongoose_logger.hrl"). +-type stream() :: #{jid := jid:jid(), + sid := ejabberd_sm:sid(), + host_type := mongooseim:host_type()}. + %% API -spec send_chat_message(User :: jid:jid() | undefined, From :: jid:jid() | undefined, @@ -40,6 +49,23 @@ lookup_recent_messages(User, With, Before, Limit, CheckUser) -> M = #{user => User, with => With, before => Before, limit => Limit, check_user => CheckUser}, fold(M, [fun get_host_type/1, fun check_user/1, fun lookup_messages/1]). +%% Event API + +-spec open_session(jid:jid(), boolean()) -> {unknown_user, iodata()} | {ok, stream()}. +open_session(User, CheckUser) -> + M = #{user => User, check_user => CheckUser}, + fold(M, [fun get_host_type/1, fun check_user/1, fun do_open_session/1]). + +-spec close_session(stream()) -> {ok, closed}. +close_session(#{jid := Jid = #jid{lserver = S}, sid := Sid, host_type := HostType}) -> + Acc = mongoose_acc:new( + #{location => ?LOCATION, + lserver => S, + host_type => HostType, + element => undefined}), + ejabberd_sm:close_session(Acc, Sid, Jid, normal), + {ok, closed}. + %% Steps %% @doc Determine the user's bare JID and the 'from' JID, and check if they match @@ -142,6 +168,14 @@ lookup_messages(#{user := UserJid, with := WithJid, before := Before, limit := L {ok, {_, _, Rows}} = mod_mam_pm:lookup_messages(HostType, Params), {ok, {Rows, Limit2}}. +do_open_session(#{host_type := HostType, user := JID}) -> + SID = ejabberd_sm:make_new_sid(), + UUID = uuid:uuid_to_string(uuid:get_v4(), binary_standard), + Resource = <<"sse-", UUID/binary>>, + NewJid = jid:replace_resource(JID, Resource), + ejabberd_sm:open_session(HostType, SID, NewJid, 1, #{}), + {ok, #{sid => SID, jid => NewJid, host_type => HostType}}. + %% Helpers -spec build_chat_message(jid:literal_jid(), jid:literal_jid(), binary()) -> exml:element(). diff --git a/src/mongoose_stanza_event_api.erl b/src/mongoose_stanza_event_api.erl deleted file mode 100644 index 11c81bb507d..00000000000 --- a/src/mongoose_stanza_event_api.erl +++ /dev/null @@ -1,24 +0,0 @@ --module(mongoose_stanza_event_api). - --export([open_session/1, close_session/1]). - --include("mongoose.hrl"). --include("jlib.hrl"). - -open_session(JID = #jid{lserver = Domain}) -> - SID = ejabberd_sm:make_new_sid(), - UUID = uuid:uuid_to_string(uuid:get_v4(), binary_standard), - Resource = <<"sse-", UUID/binary>>, - NewJid = jid:replace_resource(JID, Resource), - {ok, HostType} = mongoose_domain_api:get_domain_host_type(Domain), - ejabberd_sm:open_session(HostType, SID, NewJid, 1, #{}), - #{sid => SID, jid => NewJid, host_type => HostType}. - -close_session(#{jid := Jid = #jid{lserver = S}, sid := Sid, host_type := HostType}) -> - Acc = mongoose_acc:new( - #{location => ?LOCATION, - lserver => S, - host_type => HostType, - element => undefined}), - ejabberd_sm:close_session(Acc, Sid, Jid, normal), - closed.