diff --git a/big_tests/default.spec b/big_tests/default.spec index e8e862774c8..9f4432b9b12 100644 --- a/big_tests/default.spec +++ b/big_tests/default.spec @@ -26,6 +26,7 @@ {suites, "tests", extdisco_SUITE}. {suites, "tests", gdpr_SUITE}. {suites, "tests", graphql_SUITE}. +{suites, "tests", graphql_sse_SUITE}. {suites, "tests", graphql_account_SUITE}. {suites, "tests", graphql_domain_SUITE}. {suites, "tests", graphql_inbox_SUITE}. diff --git a/big_tests/dynamic_domains.spec b/big_tests/dynamic_domains.spec index f83e7ed9e60..af849042f52 100644 --- a/big_tests/dynamic_domains.spec +++ b/big_tests/dynamic_domains.spec @@ -42,6 +42,7 @@ "at the moment mod_pubsub doesn't support dynamic domains"}. {suites, "tests", graphql_SUITE}. +{suites, "tests", graphql_sse_SUITE}. {suites, "tests", graphql_account_SUITE}. {suites, "tests", graphql_domain_SUITE}. {suites, "tests", graphql_inbox_SUITE}. diff --git a/big_tests/tests/graphql_helper.erl b/big_tests/tests/graphql_helper.erl index 09d00ea3d98..791a49c339d 100644 --- a/big_tests/tests/graphql_helper.erl +++ b/big_tests/tests/graphql_helper.erl @@ -17,20 +17,40 @@ execute(EpName, Body, Creds) -> -spec execute(node(), atom(), binary(), {binary(), binary()} | undefined) -> {Status :: tuple(), Data :: map()}. execute(Node, EpName, Body, Creds) -> - Request = - #{port => get_listener_port(Node, EpName), - role => {graphql, EpName}, - method => <<"POST">>, - return_maps => true, - creds => Creds, - path => "/graphql", - body => Body}, + Request = build_request(Node, EpName, Body, Creds), rest_helper:make_request(Request). +build_request(Node, EpName, Body, Creds) -> + #{port => get_listener_port(Node, EpName), + role => {graphql, EpName}, + method => <<"POST">>, + return_maps => true, + creds => Creds, + path => "/graphql", + body => Body}. + +execute_sse(EpName, Params, Creds) -> + #{node := Node} = mim(), + execute_sse(Node, EpName, Params, Creds). + +execute_sse(Node, EpName, Params, Creds) -> + Port = get_listener_port(Node, EpName), + Path = "/api/graphql/sse", + QS = uri_string:compose_query([{atom_to_binary(K), encode_sse_value(V)} + || {K, V} <- maps:to_list(Params)]), + sse_helper:connect_to_sse(Port, [Path, "?", QS], Creds, #{}). + +encode_sse_value(M) when is_map(M) -> jiffy:encode(M); +encode_sse_value(V) when is_binary(V) -> V. + execute_user_command(Category, Command, User, Args, Config) -> - #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc = get_doc(Category, Command), execute_user(#{query => Doc, variables => Args}, User, Config). +execute_user_command_sse(Category, Command, User, Args, Config) -> + Doc = get_doc(Category, Command), + execute_user_sse(#{query => Doc, variables => Args}, User, Config). + execute_command(Category, Command, Args, Config) -> #{node := Node} = mim(), Protocol = ?config(protocol, Config), @@ -40,9 +60,13 @@ execute_command(Node, Category, Command, Args, Config) -> Protocol = ?config(protocol, Config), execute_command(Node, Category, Command, Args, Config, Protocol). +execute_command_sse(Category, Command, Args, Config) -> + Doc = get_doc(Category, Command), + execute_auth_sse(#{query => Doc, variables => Args}, Config). + %% Admin commands can be executed as GraphQL over HTTP or with CLI (mongooseimctl) execute_command(Node, Category, Command, Args, Config, http) -> - #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc = get_doc(Category, Command), execute_auth(Node, #{query => Doc, variables => Args}, Config); execute_command(Node, Category, Command, Args, Config, cli) -> CLIArgs = encode_cli_args(Args), @@ -50,6 +74,10 @@ execute_command(Node, Category, Command, Args, Config, cli) -> = mongooseimctl_helper:mongooseimctl(Node, Category, [Command | CLIArgs], Config), {{exit_status, Code}, rest_helper:decode(Result, #{return_maps => true})}. +get_doc(Category, Command) -> + #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc. + encode_cli_args(Args) -> lists:flatmap(fun({Name, Value}) -> encode_cli_arg(Name, Value) end, maps:to_list(Args)). encode_cli_arg(_Name, null) -> @@ -71,14 +99,22 @@ execute_auth(Body, Config) -> execute_auth(Node, Body, Config). execute_auth(Node, Body, Config) -> - case Ep = ?config(schema_endpoint, Config) of - admin -> - #{username := Username, password := Password} = get_listener_opts(Ep), - execute(Node, Ep, Body, {Username, Password}); - domain_admin -> - Creds = ?config(domain_admin, Config), - execute(Node, Ep, Body, Creds) - end. + Ep = ?config(schema_endpoint, Config), + execute(Node, Ep, Body, make_admin_creds(Ep, Config)). + +execute_auth_sse(Body, Config) -> + #{node := Node} = mim(), + execute_auth_sse(Node, Body, Config). + +execute_auth_sse(Node, Body, Config) -> + Ep = ?config(schema_endpoint, Config), + execute_sse(Node, Ep, Body, make_admin_creds(Ep, Config)). + +make_admin_creds(admin = Ep, _Config) -> + #{username := Username, password := Password} = get_listener_opts(Ep), + {Username, Password}; +make_admin_creds(domain_admin, Config) -> + ?config(domain_admin, Config). execute_user(Body, User, Config) -> Ep = ?config(schema_endpoint, Config), @@ -86,6 +122,12 @@ execute_user(Body, User, Config) -> #{node := Node} = mim(), execute(Node, Ep, Body, Creds). +execute_user_sse(Body, User, Config) -> + Ep = ?config(schema_endpoint, Config), + Creds = make_creds(User), + #{node := Node} = mim(), + execute_sse(Node, Ep, Body, Creds). + -spec get_listener_port(binary()) -> integer(). get_listener_port(EpName) -> #{node := Node} = mim(), @@ -173,6 +215,9 @@ get_unauthorized({Code, #{<<"errors">> := Errors}}) -> get_bad_request({Code, _Msg}) -> assert_response_code(bad_request, Code). +get_method_not_allowed({Code, _Msg}) -> + assert_response_code(method_not_allowed, Code). + get_coercion_err_msg({Code, #{<<"errors">> := [Error]}}) -> assert_response_code(bad_request, Code), ?assertEqual(<<"input_coercion">>, get_value([extensions, code], Error)), @@ -196,8 +241,14 @@ get_ok_value(Path, {Code, Data}) -> assert_response_code(bad_request, {<<"400">>, <<"Bad Request">>}) -> ok; assert_response_code(unauthorized, {<<"401">>, <<"Unauthorized">>}) -> ok; +assert_response_code(method_not_allowed, {<<"405">>, <<"Method Not Allowed">>}) -> ok; assert_response_code(error, {<<"200">>, <<"OK">>}) -> ok; assert_response_code(ok, {<<"200">>, <<"OK">>}) -> ok; +assert_response_code(bad_request, 400) -> ok; +assert_response_code(unauthorized, 401) -> ok; +assert_response_code(method_not_allowed, 405) -> ok; +assert_response_code(error, 200) -> ok; +assert_response_code(ok, 200) -> ok; assert_response_code(bad_request, {exit_status, 1}) -> ok; assert_response_code(error, {exit_status, 1}) -> ok; assert_response_code(ok, {exit_status, 0}) -> ok; diff --git a/big_tests/tests/graphql_sse_SUITE.erl b/big_tests/tests/graphql_sse_SUITE.erl new file mode 100644 index 00000000000..29a039c9735 --- /dev/null +++ b/big_tests/tests/graphql_sse_SUITE.erl @@ -0,0 +1,138 @@ +%% @doc Tests for the SSE handling of GraphQL subscriptions +-module(graphql_sse_SUITE). + +-compile([export_all, nowarn_export_all]). + +-import(distributed_helper, [mim/0, require_rpc_nodes/1, rpc/4]). +-import(graphql_helper, [get_bad_request/1, get_unauthorized/1, get_method_not_allowed/1, + build_request/4, make_creds/1, execute_auth/2, + execute_sse/3, execute_user_sse/3, execute_auth_sse/2]). + +%% common_test callbacks + +suite() -> + require_rpc_nodes([mim]) ++ escalus:suite(). + +all() -> + [{group, admin}, + {group, user}]. + +groups() -> + [{admin, [parallel], admin_tests()}, + {user, [parallel], user_tests()}]. + +init_per_suite(Config) -> + Config1 = escalus:init_per_suite(Config), + application:ensure_all_started(gun), + Config1. + +end_per_suite(Config) -> + escalus:end_per_suite(Config). + +init_per_group(user, Config) -> + graphql_helper:init_user(Config); +init_per_group(admin, Config) -> + graphql_helper:init_admin_handler(Config). + +end_per_group(user, _Config) -> + escalus_fresh:clean(), + graphql_helper:clean(); +end_per_group(admin, _Config) -> + graphql_helper:clean(). + +init_per_testcase(CaseName, Config) -> + escalus:init_per_testcase(CaseName, Config). + +end_per_testcase(CaseName, Config) -> + escalus:end_per_testcase(CaseName, Config). + +admin_tests() -> + [admin_missing_query, + admin_invalid_query_string, + admin_missing_creds, + admin_invalid_creds, + admin_invalid_method, + admin_invalid_operation_type]. + +user_tests() -> + [user_missing_query, + user_invalid_query_string, + user_missing_creds, + user_invalid_creds, + user_invalid_method, + user_invalid_operation_type]. + +%% Test cases and stories + +admin_missing_query(Config) -> + get_bad_request(execute_auth_sse(#{}, Config)). + +user_missing_query(Config) -> + escalus:fresh_story_with_config(Config, [{alice, 1}], fun user_missing_query_story/2). + +user_missing_query_story(Config, Alice) -> + get_bad_request(execute_user_sse(#{}, Alice, Config)). + +admin_invalid_query_string(_Config) -> + Port = graphql_helper:get_listener_port(admin), + get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", undefined, #{})). + +user_invalid_query_string(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_query_string_story/1). + +user_invalid_query_string_story(Alice) -> + Port = graphql_helper:get_listener_port(user), + Creds = make_creds(Alice), + get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", Creds, #{})). + +admin_missing_creds(_Config) -> + get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, undefined)). + +user_missing_creds(_Config) -> + get_unauthorized(execute_sse(user, #{query => doc()}, undefined)). + +admin_invalid_creds(_Config) -> + Creds = {<<"invalid">>, <<"creds">>}, + get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, Creds)). + +user_invalid_creds(_Config) -> + get_unauthorized(execute_sse(user, #{query => doc()}, {<<"invalid">>, <<"creds">>})). + +admin_invalid_method(_Config) -> + #{node := Node} = mim(), + Request = build_request(Node, admin, #{query => doc(), variables => args()}, undefined), + %% POST was used, while SSE accepts only GET + get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})). + +user_invalid_method(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_method_story/1). + +user_invalid_method_story(Alice) -> + #{node := Node} = mim(), + Request = build_request(Node, user, #{query => doc()}, make_creds(Alice)), + %% POST was used, while SSE accepts only GET + get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})). + +admin_invalid_operation_type(Config) -> + Creds = graphql_helper:make_admin_creds(admin, Config), + get_bad_request(execute_sse(admin, #{query => query_doc(), variables => args()}, Creds)). + +user_invalid_operation_type(Config) -> + escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_operation_type_story/1). + +user_invalid_operation_type_story(Alice) -> + get_bad_request(execute_sse(user, #{query => query_doc()}, make_creds(Alice))). + +%% Helpers + +%% Subscription - works only with the SSE handler +doc() -> + graphql_helper:get_doc(<<"stanza">>, <<"getMessages">>). + +%% Query - works only with the REST handler +query_doc() -> + graphql_helper:get_doc(<<"stanza">>, <<"getLastMessages">>). + +%% Same args used by both operations - only for Admin +args() -> + #{caller => <<"alice@localhost">>}. diff --git a/big_tests/tests/graphql_stanza_SUITE.erl b/big_tests/tests/graphql_stanza_SUITE.erl index 88ca4fef9bc..993cc6d35d8 100644 --- a/big_tests/tests/graphql_stanza_SUITE.erl +++ b/big_tests/tests/graphql_stanza_SUITE.erl @@ -1,6 +1,5 @@ -module(graphql_stanza_SUITE). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("exml/include/exml.hrl"). @@ -8,7 +7,9 @@ -import(distributed_helper, [mim/0, require_rpc_nodes/1]). -import(graphql_helper, [execute_user_command/5, execute_command/4, - get_ok_value/2, get_err_code/1, get_err_msg/1, get_coercion_err_msg/1, + execute_user_command_sse/5, execute_command_sse/4, + get_ok_value/2, get_value/2, + get_err_code/1, get_err_msg/1, get_coercion_err_msg/1, get_unauthorized/1, get_not_loaded/1]). suite() -> @@ -22,19 +23,25 @@ all() -> groups() -> [{admin_stanza_http, [], [{group, admin_mam}, - {group, admin_no_mam}]}, + {group, admin_no_mam_http}]}, {admin_stanza_cli, [], [{group, admin_mam}, - {group, admin_no_mam}]}, + {group, admin_no_mam_cli}]}, {domain_admin_stanza, [], [{group, admin_mam}, % same as for admin {group, domain_admin_no_mam}]}, {user_stanza, [], [{group, user_mam}, {group, user_no_mam}]}, {admin_mam, [parallel], admin_mam_cases()}, - {admin_no_mam, [parallel], admin_stanza_cases() ++ admin_no_mam_cases()}, + {admin_no_mam_http, [parallel], admin_stanza_cases(http) ++ admin_no_mam_cases()}, + {admin_no_mam_cli, [], admin_stanza_cases(cli) ++ admin_no_mam_cases()}, {domain_admin_no_mam, [parallel], domain_admin_stanza_cases() ++ admin_no_mam_cases()}, {user_mam, [parallel], user_mam_cases()}, {user_no_mam, [parallel], user_stanza_cases() ++ user_no_mam_cases()}]. +admin_stanza_cases(cli) -> + admin_stanza_cases(); +admin_stanza_cases(http) -> + admin_stanza_cases() ++ admin_sse_cases(). + admin_stanza_cases() -> [admin_send_message, admin_send_message_to_unparsable_jid, @@ -44,6 +51,10 @@ admin_stanza_cases() -> admin_send_stanza_from_unknown_user, admin_send_stanza_from_unknown_domain]. +admin_sse_cases() -> + [admin_get_messages, + admin_get_messages_for_unknown_user]. + admin_mam_cases() -> [admin_get_last_messages, admin_get_last_messages_for_unknown_user, @@ -69,6 +80,7 @@ domain_admin_stanza_cases() -> user_stanza_cases() -> [user_send_message, + user_get_messages, user_send_message_without_from, user_send_message_with_spoofed_from, user_send_message_headline, @@ -89,7 +101,6 @@ init_per_suite(Config) -> dynamic_modules:save_modules(domain_helper:host_type(), Config2). end_per_suite(Config) -> - escalus_fresh:clean(), dynamic_modules:restore_modules(Config), escalus:end_per_suite(Config). @@ -105,7 +116,8 @@ init_per_group(GN, Config) when GN =:= admin_mam; GN =:= domain_admin_mam; GN =:= user_mam -> init_mam(Config); -init_per_group(GN, Config) when GN =:= admin_no_mam; +init_per_group(GN, Config) when GN =:= admin_no_mam_http; + GN =:= admin_no_mam_cli; GN =:= domain_admin_no_mam; GN =:= user_no_mam -> Mods = [{mod_mam, stopped}], @@ -118,7 +130,7 @@ end_per_group(GN, _Config) when GN =:= admin_stanza_http; GN =:= user_stanza -> graphql_helper:clean(); end_per_group(_GN, _Config) -> - ok. + escalus_fresh:clean(). init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). @@ -153,6 +165,31 @@ admin_send_message_story(Config, Alice, Bob) -> assert_not_empty(StanzaId), escalus:assert(is_message, escalus:wait_for_stanza(Bob)). +admin_get_messages_for_unknown_user(Config) -> + Domain = domain_helper:domain(), + Res = get_messages(<<"baduser@", Domain/binary>>, Config), + ?assertEqual(<<"unknown_user">>, get_err_code(Res)), + ?assertEqual(<<"User does not exist">>, get_err_msg(Res)). + +admin_get_messages(Config) -> + escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], + fun admin_get_messages_story/3). + +admin_get_messages_story(Config, Alice, Bob) -> + From = escalus_client:full_jid(Alice), + To = escalus_client:short_jid(Bob), + {200, Stream} = get_messages(To, Config), + %% Presence should be skipped + escalus:send(Alice, escalus_stanza:presence_direct(To, <<"available">>)), + %% Message should be delivered + escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)), + Event = sse_helper:wait_for_event(Stream), + #{<<"stanza">> := StanzaBin, <<"sender">> := From} = + graphql_helper:get_value([data, stanza, getMessages], Event), + {ok, Stanza} = exml:parse(StanzaBin), + escalus:assert(is_message, Stanza), + sse_helper:stop_sse(Stream). + user_send_message(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], fun user_send_message_story/3). @@ -165,6 +202,22 @@ user_send_message_story(Config, Alice, Bob) -> assert_not_empty(StanzaId), escalus:assert(is_message, escalus:wait_for_stanza(Bob)). +user_get_messages(Config) -> + escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], + fun user_get_messages_story/3). + +user_get_messages_story(Config, Alice, Bob) -> + From = escalus_client:full_jid(Alice), + To = escalus_client:short_jid(Bob), + {200, Stream} = user_get_messages(Bob, Config), + escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)), + Event = sse_helper:wait_for_event(Stream), + #{<<"stanza">> := StanzaBin, <<"sender">> := From} = + graphql_helper:get_value([data, stanza, getMessages], Event), + {ok, Stanza} = exml:parse(StanzaBin), + escalus:assert(is_message, Stanza), + sse_helper:stop_sse(Stream). + user_send_message_without_from(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], fun user_send_message_without_from_story/3). @@ -527,6 +580,13 @@ user_send_stanza(User, Stanza, Config) -> Vars = #{stanza => Stanza}, execute_user_command(<<"stanza">>, <<"sendStanza">>, User, Vars, Config). +get_messages(Caller, Config) -> + Vars = #{caller => Caller}, + execute_command_sse(<<"stanza">>, <<"getMessages">>, Vars, Config). + +user_get_messages(User, Config) -> + execute_user_command_sse(<<"stanza">>, <<"getMessages">>, User, #{}, Config). + get_last_messages(Caller, With, Before, Config) -> Vars = #{caller => Caller, with => With, before => Before}, execute_command(<<"stanza">>, <<"getLastMessages">>, Vars, Config). diff --git a/big_tests/tests/rest_client_SUITE.erl b/big_tests/tests/rest_client_SUITE.erl index 0e75d5e3e89..6655ef838bb 100644 --- a/big_tests/tests/rest_client_SUITE.erl +++ b/big_tests/tests/rest_client_SUITE.erl @@ -228,15 +228,12 @@ msg_is_sent_and_delivered_over_sse(ConfigIn) -> Bob = escalus_users:get_userspec(Config, bob), Alice = escalus_users:get_userspec(Config, alice), - Conn = connect_to_sse({alice, Alice}), + {200, Conn} = connect_to_sse({alice, Alice}), M = send_message(bob, Bob, Alice), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), - - assert_json_message(M, Data), - - stop_sse(Conn). + Event = sse_helper:wait_for_event(Conn), + assert_json_message(M, Event), + sse_helper:stop_sse(Conn). message_sending_errors(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> @@ -258,31 +255,27 @@ room_msg_is_sent_and_delivered_over_sse(ConfigIn) -> RoomID = given_new_room_with_users({alice, Alice}, [{bob, Bob}]), RoomInfo = get_room_info({alice, Alice}, RoomID), true = is_participant(Bob, <<"member">>, RoomInfo), - Conn = connect_to_sse({bob, Bob}), + {200, Conn} = connect_to_sse({bob, Bob}), Message = given_message_sent_to_room(RoomID, {alice, Alice}), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), - assert_json_room_sse_message(Message#{room => RoomID, type => <<"message">>}, - Data), - stop_sse(Conn). + Event = sse_helper:wait_for_event(Conn), + assert_json_room_sse_message(Message#{room => RoomID, type => <<"message">>}, Event), + sse_helper:stop_sse(Conn). aff_change_msg_is_delivered_over_sse(ConfigIn) -> Config = escalus_fresh:create_users(ConfigIn, [{alice, 1}, {bob, 1}]), Bob = escalus_users:get_userspec(Config, bob), Alice = escalus_users:get_userspec(Config, alice), RoomID = given_new_room({alice, Alice}), - Conn = connect_to_sse({bob, Bob}), + {200, Conn} = connect_to_sse({bob, Bob}), given_user_invited({alice, Alice}, RoomID, Bob), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), + Event = sse_helper:wait_for_event(Conn), BobJID = user_jid(Bob), RoomJID = room_jid(RoomID, Config), assert_json_room_sse_message(#{room => RoomID, from => RoomJID, type => <<"affiliation">>, - user => BobJID}, - Data), - stop_sse(Conn). + user => BobJID}, Event), + sse_helper:stop_sse(Conn). all_messages_are_archived(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> @@ -1132,32 +1125,7 @@ is_participant(User, Role, RoomInfo) -> connect_to_sse(User) -> Port = ct:get_config({hosts, mim, http_api_client_endpoint_port}), - {Username, Password} = credentials(User), - Base64 = base64:encode(binary_to_list(Username) ++ [$: | binary_to_list(Password)]), - Headers = [{<<"authorization">>, <<"basic ", Base64/binary>>}, - {<<"host">>, <<"localhost">>}, - {<<"accept">>, <<"text/event-stream">>}], - {ok, ConnPid} = gun:open("localhost", Port, #{ - transport => tls, - protocols => [http], - http_opts => #{content_handlers => [gun_sse_h, gun_data_h]} - }), - {ok, _} = gun:await_up(ConnPid), - StreamRef = gun:get(ConnPid, "/api/sse", Headers), - #{pid => ConnPid, stream_ref => StreamRef}. - -wait_for_event(#{pid := Pid, stream_ref := StreamRef} = Opts) -> - case gun:await(Pid, StreamRef) of - {response, nofin, _Status, _} -> - wait_for_event(Opts); - {sse, #{data := [Response]}} -> - Opts#{data => Response}; - Error -> - Error - end. - -stop_sse(#{pid := Pid}) -> - gun:close(Pid). + sse_helper:connect_to_sse(Port, "/api/sse", credentials(User), #{transport => tls}). assert_json_message(Sent, Received) -> #{<<"body">> := Body, diff --git a/big_tests/tests/sse_helper.erl b/big_tests/tests/sse_helper.erl new file mode 100644 index 00000000000..6fc4ef6d762 --- /dev/null +++ b/big_tests/tests/sse_helper.erl @@ -0,0 +1,41 @@ +-module(sse_helper). + +-compile([export_all, nowarn_export_all]). + +connect_to_sse(Port, Path, Creds, ExtraOpts) -> + ct:log("Connect to SSE, port: ~p, path: ~s, creds: ~p, extra opts: ~p", + [Port, Path, Creds, ExtraOpts]), + Headers = auth_headers(Creds) ++ [{<<"host">>, <<"localhost">>}, + {<<"accept">>, <<"text/event-stream">>}], + Opts = maps:merge(basic_opts(), ExtraOpts), + {ok, ConnPid} = gun:open("localhost", Port, Opts), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, Path, Headers), + {response, nofin, Status, RespHeaders} = gun:await(ConnPid, StreamRef), + Result = case {Status, maps:from_list(RespHeaders)} of + {200, #{<<"content-type">> := <<"text/event-stream">>}} -> + #{pid => ConnPid, stream_ref => StreamRef}; + _ -> + {ok, Body} = gun:await_body(ConnPid, StreamRef), + jiffy:decode(Body, [return_maps]) + end, + ct:log("SSE response code: ~p, headers: ~p, result: ~p", [Status, RespHeaders, Result]), + {Status, Result}. + +wait_for_event(#{pid := Pid, stream_ref := StreamRef}) -> + {sse, #{data := [Response]} = Event} = gun:await(Pid, StreamRef), + ct:log("Received SSE event: ~p", [Event]), + jiffy:decode(Response, [return_maps]). + +stop_sse(#{pid := Pid}) -> + gun:close(Pid). + +auth_headers({Username, Password}) -> + Base64 = base64:encode(binary_to_list(Username) ++ [$: | binary_to_list(Password)]), + [{<<"authorization">>, <<"basic ", Base64/binary>>}]; +auth_headers(undefined) -> + []. + +basic_opts() -> + #{protocols => [http], + http_opts => #{content_handlers => [gun_sse_h, gun_data_h]}}. diff --git a/priv/graphql/schemas/admin/admin_schema.gql b/priv/graphql/schemas/admin/admin_schema.gql index 6209c281d07..6f55d867275 100644 --- a/priv/graphql/schemas/admin/admin_schema.gql +++ b/priv/graphql/schemas/admin/admin_schema.gql @@ -1,6 +1,7 @@ schema{ query: AdminQuery, - mutation: AdminMutation + mutation: AdminMutation, + subscription: AdminSubscription } """ @@ -80,3 +81,7 @@ type AdminMutation @protected{ "Server info and management" server: ServerAdminMutation } + +type AdminSubscription { + stanza: StanzaAdminSubscription +} diff --git a/priv/graphql/schemas/admin/stanza.gql b/priv/graphql/schemas/admin/stanza.gql index 460b1733701..e17b315a9f8 100644 --- a/priv/graphql/schemas/admin/stanza.gql +++ b/priv/graphql/schemas/admin/stanza.gql @@ -19,3 +19,9 @@ type StanzaAdminMutation @protected{ sendStanza(stanza: Stanza): SendStanzaPayload @protected(type: GLOBAL) } + +type StanzaAdminSubscription @protected{ + "Subscribe for the given user's incoming messages" + getMessages(caller: JID!): StanzaMap + @protected(type: DOMAIN, args: ["caller"]) +} diff --git a/priv/graphql/schemas/user/stanza.gql b/priv/graphql/schemas/user/stanza.gql index 47e78590280..b18908feb8f 100644 --- a/priv/graphql/schemas/user/stanza.gql +++ b/priv/graphql/schemas/user/stanza.gql @@ -18,3 +18,8 @@ type StanzaUserMutation @protected{ "Send an arbitrary stanza" sendStanza(stanza: Stanza): SendStanzaPayload } + +type StanzaUserSubscription @protected{ + "Subscribe to incoming messages" + getMessages: StanzaMap +} diff --git a/priv/graphql/schemas/user/user_schema.gql b/priv/graphql/schemas/user/user_schema.gql index cbda662808f..b7b94c29829 100644 --- a/priv/graphql/schemas/user/user_schema.gql +++ b/priv/graphql/schemas/user/user_schema.gql @@ -1,6 +1,7 @@ schema{ query: UserQuery, - mutation: UserMutation + mutation: UserMutation, + subscription: UserSubscription } """ @@ -58,3 +59,7 @@ type UserMutation @protected{ "OAUTH token management" token: TokenUserMutation } + +type UserSubscription { + stanza: StanzaUserSubscription +} diff --git a/src/graphql/admin/mongoose_graphql_admin_subscription.erl b/src/graphql/admin/mongoose_graphql_admin_subscription.erl new file mode 100644 index 00000000000..fb8cdbbca8e --- /dev/null +++ b/src/graphql/admin/mongoose_graphql_admin_subscription.erl @@ -0,0 +1,11 @@ +-module(mongoose_graphql_admin_subscription). +-behaviour(mongoose_graphql). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(_Ctx, _Obj, <<"stanza">>, _Args) -> + {ok, stanza}. diff --git a/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl new file mode 100644 index 00000000000..65893bfe255 --- /dev/null +++ b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl @@ -0,0 +1,26 @@ +-module(mongoose_graphql_stanza_admin_subscription). +-behaviour(mongoose_graphql). + +-import(mongoose_graphql_helper, [format_result/2]). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(Ctx, _Obj, <<"getMessages">>, Args) -> + get_messages(Ctx, Args). + +get_messages(#{event := terminate, stream := Session}, _) -> + mongoose_stanza_api:close_session(Session), + {ok, null, [{stream, closed}]}; +get_messages(#{event := Event}, _) -> + mongoose_graphql_stanza_helper:handle_event(Event); +get_messages(_Ctx, #{<<"caller">> := Jid}) -> + case mongoose_stanza_api:open_session(Jid, true) of + {ok, Stream} -> + {ok, null, [{stream, Stream}]}; + Error -> + format_result(Error, #{caller => Jid}) + end. diff --git a/src/graphql/mongoose_graphql.erl b/src/graphql/mongoose_graphql.erl index 4264bcbe03f..c14d70c340a 100644 --- a/src/graphql/mongoose_graphql.erl +++ b/src/graphql/mongoose_graphql.erl @@ -9,7 +9,7 @@ -export([init/0, get_endpoint/1, create_endpoint/3, - execute/2, + execute/2, prepare/2, execute/3, execute_cli/3]). @@ -19,13 +19,14 @@ operation_name := binary() | undefined, vars := map(), authorized := boolean(), - ctx := map()}. + ctx := map(), + ast => graphql:document()}. -type context() :: map(). -type object() :: term(). -type field() :: binary(). -type args() :: map(). --type result() :: {ok, term()} | {error, term()}. +-type result() :: {ok, term()} | {ok, term(), Aux :: term()} | {error, term()}. -callback execute(Ctx :: context(), Obj :: object(), Field :: field(), Args :: args()) -> result(). @@ -65,24 +66,40 @@ create_endpoint(Name, Mapping, Patterns) -> %% @doc Execute request on a given endpoint. -spec execute(graphql:endpoint_context(), request()) -> {ok, map()} | {error, term()}. -execute(Ep, #{document := Doc, - operation_name := OpName, - authorized := AuthStatus, - vars := Vars, - ctx := Ctx}) -> - try - {ok, Ast} = graphql_parse(Doc), - {ok, #{ast := Ast2, - fun_env := FunEnv}} = graphql:type_check(Ep, Ast), - ok = graphql:validate(Ast2), - Vars2 = remove_null_args(Vars), - Coerced = graphql:type_check_params(Ep, FunEnv, OpName, Vars2), - Ctx2 = Ctx#{params => Coerced, - operation_name => OpName, - authorized => AuthStatus, - error_module => mongoose_graphql_errors}, - Ast3 = mongoose_graphql_directive:process_directives(Ctx2, Ast2), - {ok, graphql:execute(Ep, Ctx2, Ast3)} +execute(Ep, Req = #{ast := _}) -> + execute_graphql(Ep, Req); +execute(Ep, Req) -> + case prepare(Ep, Req) of + {error, _} = Error -> + Error; + {ok, Req1} -> + execute_graphql(Ep, Req1) + end. + +prepare_request(Ep, #{document := Doc, + operation_name := OpName, + authorized := AuthStatus, + vars := Vars, + ctx := Ctx} = Request) -> + {ok, Ast} = graphql_parse(Doc), + {ok, #{ast := Ast2, + fun_env := FunEnv}} = graphql:type_check(Ep, Ast), + ok = graphql:validate(Ast2), + Vars2 = remove_null_args(Vars), + Coerced = graphql:type_check_params(Ep, FunEnv, OpName, Vars2), + Ctx2 = Ctx#{params => Coerced, + operation_name => OpName, + authorized => AuthStatus, + error_module => mongoose_graphql_errors}, + Ast3 = mongoose_graphql_directive:process_directives(Ctx2, Ast2), + mongoose_graphql_operations:verify_operations(Ctx2, Ast3), + Request#{ast => Ast3, ctx := Ctx2}. + +execute_graphql(Ep, #{ast := Ast, ctx := Ctx}) -> + {ok, graphql:execute(Ep, Ctx, Ast)}. + +prepare(Ep, Req) -> + try {ok, prepare_request(Ep, Req)} catch throw:{error, Err} -> {error, Err}; @@ -143,10 +160,11 @@ remove_null_args(Vars) -> admin_mapping_rules() -> #{objects => #{ 'AdminQuery' => mongoose_graphql_admin_query, + 'AdminMutation' => mongoose_graphql_admin_mutation, + 'AdminSubscription' => mongoose_graphql_admin_subscription, 'AdminAuthInfo' => mongoose_graphql_admin_auth_info, 'DomainAdminQuery' => mongoose_graphql_domain_admin_query, 'GdprAdminQuery' => mongoose_graphql_gdpr_admin_query, - 'AdminMutation' => mongoose_graphql_admin_mutation, 'DomainAdminMutation' => mongoose_graphql_domain_admin_mutation, 'InboxAdminMutation' => mongoose_graphql_inbox_admin_mutation, 'SessionAdminMutation' => mongoose_graphql_session_admin_mutation, @@ -157,6 +175,7 @@ admin_mapping_rules() -> 'GlobalStats' => mongoose_graphql_stats_global, 'DomainStats' => mongoose_graphql_stats_domain, 'StanzaAdminQuery' => mongoose_graphql_stanza_admin_query, + 'StanzaAdminSubscription' => mongoose_graphql_stanza_admin_subscription, 'ServerAdminQuery' => mongoose_graphql_server_admin_query, 'ServerAdminMutation' => mongoose_graphql_server_admin_mutation, 'LastAdminMutation' => mongoose_graphql_last_admin_mutation, @@ -189,6 +208,7 @@ user_mapping_rules() -> #{objects => #{ 'UserQuery' => mongoose_graphql_user_query, 'UserMutation' => mongoose_graphql_user_mutation, + 'UserSubscription' => mongoose_graphql_user_subscription, 'AccountUserQuery' => mongoose_graphql_account_user_query, 'AccountUserMutation' => mongoose_graphql_account_user_mutation, 'InboxUserMutation' => mongoose_graphql_inbox_user_mutation, @@ -208,6 +228,7 @@ user_mapping_rules() -> 'StanzaUserMutation' => mongoose_graphql_stanza_user_mutation, 'TokenUserMutation' => mongoose_graphql_token_user_mutation, 'StanzaUserQuery' => mongoose_graphql_stanza_user_query, + 'StanzaUserSubscription' => mongoose_graphql_stanza_user_subscription, 'HttpUploadUserMutation' => mongoose_graphql_http_upload_user_mutation, 'UserAuthInfo' => mongoose_graphql_user_auth_info, default => mongoose_graphql_default}, diff --git a/src/graphql/mongoose_graphql_commands.erl b/src/graphql/mongoose_graphql_commands.erl index dce16741c2b..d83a10544ee 100644 --- a/src/graphql/mongoose_graphql_commands.erl +++ b/src/graphql/mongoose_graphql_commands.erl @@ -181,12 +181,17 @@ execute(#{doc := Doc, vars := Vars} = Ctx) -> -spec get_category_specs(ep()) -> [{category(), category_spec()}]. get_category_specs(Ep) -> - CSQuery = category_spec_query(), - Doc = iolist_to_binary(["{ __schema { queryType ", CSQuery, " mutationType ", CSQuery, " } }"]), - {ok, #{data := #{<<"__schema">> := Schema}}} = mongoose_graphql:execute(Ep, undefined, Doc), - #{<<"queryType">> := #{<<"fields">> := Queries}, - <<"mutationType">> := #{<<"fields">> := Mutations}} = Schema, - get_category_specs(Ep, <<"query">>, Queries) ++ get_category_specs(Ep, <<"mutation">>, Mutations). + lists:flatmap(fun(OpType) -> get_category_specs(Ep, OpType) end, op_types()). + +get_category_specs(Ep, OpType) -> + OpTypeName = <>, + Doc = iolist_to_binary(["{ __schema { ", OpTypeName, " ", category_spec_query(), " } }"]), + {ok, #{data := #{<<"__schema">> := Schema}}} = mongoose_graphql:execute(Ep, undefined, Doc), + #{OpTypeName := #{<<"fields">> := Categories}} = Schema, + get_category_specs(Ep, OpType, Categories). + +op_types() -> + [<<"query">>, <<"mutation">>, <<"subscription">>]. -spec get_category_specs(ep(), op_type(), [json_map()]) -> [{category(), category_spec()}]. get_category_specs(Ep, OpType, Categories) -> diff --git a/src/graphql/mongoose_graphql_cowboy_handler.erl b/src/graphql/mongoose_graphql_cowboy_handler.erl index 7d830eb4049..e2b666fa994 100644 --- a/src/graphql/mongoose_graphql_cowboy_handler.erl +++ b/src/graphql/mongoose_graphql_cowboy_handler.erl @@ -10,7 +10,8 @@ -behavior(cowboy_rest). %% mongoose_http_handler callbacks --export([config_spec/0]). +-export([config_spec/0, + routes/1]). %% config processing callbacks -export([process_config/1]). @@ -31,6 +32,10 @@ to_json/2, to_html/2]). +%% Utilities used by the SSE handler +-export([check_auth_header/2, + gather/1]). + -ignore_xref([from_json/2, to_html/2, to_json/2]). -include("mongoose_config_spec.hrl"). @@ -57,6 +62,12 @@ process_config(Opts) -> error(#{what => both_username_and_password_required, opts => Opts}) end. +-spec routes(mongoose_http_handler:options()) -> mongoose_http_handler:routes(). +routes(Opts = #{path := Path}) -> + [{Path, ?MODULE, Opts}, + {Path ++ "/sse", lasse_handler, #{module => mongoose_graphql_sse_handler, + init_args => Opts}}]. + %% cowboy_rest callbacks init(Req, Opts) -> @@ -84,18 +95,11 @@ charsets_provided(Req, State) -> {[<<"utf-8">>], Req, State}. is_authorized(Req, State) -> - try cowboy_req:parse_header(<<"authorization">>, Req) of - Auth -> - case check_auth(Auth, State) of - {ok, State2} -> - {true, Req, State2}; - error -> - Msg = make_error(authorize, wrong_credentials), - reply_error(Msg, Req, State) - end - catch - exit:Err -> - reply_error(make_error(authorize, Err), Req, State) + case check_auth_header(Req, State) of + {ok, State2} -> + {true, Req, State2}; + {error, Error} -> + reply_error(Error, Req, State) end. resource_exists(#{method := <<"GET">>} = Req, State) -> @@ -119,6 +123,34 @@ json_request(Req, State) -> from_json(Req, State) -> json_request(Req, State). to_json(Req, State) -> json_request(Req, State). +%% Utils used also by the SSE handler + +check_auth_header(Req, State) -> + try cowboy_req:parse_header(<<"authorization">>, Req) of + Auth -> + case check_auth(Auth, State) of + {ok, State2} -> + {ok, State2}; + error -> + {error, make_error(authorize, wrong_credentials)} + end + catch + exit:Err -> + {error, make_error(authorize, Err)} + end. + +gather(Req, Params) -> + QueryDocument = document(Params), + case variables(Params) of + {ok, Vars} -> + Operation = operation_name(Params), + {ok, Req, #{document => QueryDocument, + vars => Vars, + operation_name => Operation}}; + {error, Reason} -> + {error, Reason} + end. + %% Internal check_auth(Auth, #{schema_endpoint := domain_admin} = State) -> @@ -187,33 +219,25 @@ run_request(#{} = ReqCtx, Req, #{schema_endpoint := EpName, end. gather(Req) -> - {ok, Body, Req2} = cowboy_req:read_body(Req), - Bindings = cowboy_req:bindings(Req2), - try jiffy:decode(Body, [return_maps]) of - JSON -> - gather(Req2, JSON, Bindings) - catch - _:_ -> - {error, make_error(decode, invalid_json_body)} + case get_params(cowboy_req:method(Req), Req) of + {error, _} = Error -> Error; + Params -> gather(Req, Params) end. -gather(Req, Body, Params) -> - QueryDocument = document([Params, Body]), - case variables([Params, Body]) of - {ok, Vars} -> - Operation = operation_name([Params, Body]), - {ok, Req, #{document => QueryDocument, - vars => Vars, - operation_name => Operation}}; - {error, Reason} -> - {error, Reason} +get_params(<<"GET">>, Req) -> + try maps:from_list(cowboy_req:parse_qs(Req)) + catch _:_ -> {error, make_error(decode, invalid_query_parameters)} + end; +get_params(<<"POST">>, Req) -> + {ok, Body, _} = cowboy_req:read_body(Req), + try jiffy:decode(Body, [return_maps]) + catch _:_ -> {error, make_error(decode, invalid_json_body)} end. -document([#{<<"query">> := Q}|_]) -> Q; -document([_|Next]) -> document(Next); -document([]) -> undefined. +document(#{<<"query">> := Q}) -> Q; +document(#{}) -> undefined. -variables([#{<<"variables">> := Vars} | _]) -> +variables(#{<<"variables">> := Vars}) -> if is_binary(Vars) -> try jiffy:decode(Vars, [return_maps]) of @@ -229,16 +253,12 @@ variables([#{<<"variables">> := Vars} | _]) -> Vars == null -> {ok, #{}} end; -variables([_ | Next]) -> - variables(Next); -variables([]) -> +variables(#{}) -> {ok, #{}}. -operation_name([#{<<"operationName">> := OpName} | _]) -> +operation_name(#{<<"operationName">> := OpName}) -> OpName; -operation_name([_ | Next]) -> - operation_name(Next); -operation_name([]) -> +operation_name(#{}) -> undefined. make_error(Phase, Term) -> diff --git a/src/graphql/mongoose_graphql_errors.erl b/src/graphql/mongoose_graphql_errors.erl index d01b3d2c802..6a67f74b4ee 100644 --- a/src/graphql/mongoose_graphql_errors.erl +++ b/src/graphql/mongoose_graphql_errors.erl @@ -52,7 +52,8 @@ crash(_Ctx, Err = #{type := Type}) -> -spec format_error(term())-> {integer(), err_msg()}. format_error(#{phase := Phase, error_term := Term} = Err) when Phase =:= authorize; Phase =:= decode; - Phase =:= parse -> + Phase =:= parse; + Phase =:= verify -> Msg = #{extensions => #{code => err_code(Phase, Term)}, message => iolist_to_binary(err_msg(Phase, Term))}, {err_http_code(Phase), add_path(Err, Msg)}; @@ -91,7 +92,9 @@ err_msg(parse, Result) -> err_msg(decode, Result) -> decode_err_msg(Result); err_msg(authorize, Result) -> - authorize_err_msg(Result). + authorize_err_msg(Result); +err_msg(verify, Result) -> + verify_err_msg(Result). authorize_err_msg({request_error, {header, <<"authorization">>}, _}) -> "Malformed authorization header. Please consult the relevant specification"; @@ -118,9 +121,15 @@ decode_err_msg(no_query_supplied) -> "The query was not supplied in the request body"; decode_err_msg(invalid_json_body) -> "The request JSON body is invalid"; +decode_err_msg(invalid_query_parameters) -> + "The query string is invalid"; decode_err_msg(variables_invalid_json) -> "The variables' JSON is invalid". +verify_err_msg({unsupported_operation, Method, Operation}) -> + io_lib:format("The ~p execution method does not support ~p operations.", + [Method, Operation]). + add_path(#{path := Path}, ErrMsg) -> ErrMsg#{path => Path}; add_path(_, ErrMsg) -> diff --git a/src/graphql/mongoose_graphql_operations.erl b/src/graphql/mongoose_graphql_operations.erl new file mode 100644 index 00000000000..10ca5ab33c5 --- /dev/null +++ b/src/graphql/mongoose_graphql_operations.erl @@ -0,0 +1,30 @@ +-module(mongoose_graphql_operations). + +-export([verify_operations/2]). + +-include_lib("graphql/src/graphql_schema.hrl"). +-include_lib("graphql/src/graphql_internal.hrl"). + +verify_operations(Ctx, #document{definitions = Definitions}) -> + Method = maps:get(method, Ctx, undefined), + [verify_op_type(Method, op_type(Ty)) || #op{id = Id, ty = Ty} <- Definitions, + is_requested_op(Ctx, Id)], + ok. + +verify_op_type(Method, OpType) -> + case is_supported(Method, OpType) of + true -> + ok; + false -> + Error = {unsupported_operation, Method, OpType}, + graphql_err:abort([], verify, Error) + end. + +is_supported(Method, subscription) -> Method =:= sse; +is_supported(Method, _) -> Method =/= sse. + +is_requested_op(#{operation_name := undefined}, _) -> true; +is_requested_op(#{operation_name := OpName}, {name, _, Name}) -> OpName =:= Name. + +op_type(undefined) -> query; +op_type({OpType, _}) -> OpType. diff --git a/src/graphql/mongoose_graphql_sse_handler.erl b/src/graphql/mongoose_graphql_sse_handler.erl new file mode 100644 index 00000000000..0a10bd544e4 --- /dev/null +++ b/src/graphql/mongoose_graphql_sse_handler.erl @@ -0,0 +1,108 @@ +%% @doc An SSE handler for GraphQL subscriptions. +%% The graphql request is prepared, and then executed. +%% +%% 1. The first execution should return 'null' in 'data', and a new Stream in 'aux'. +%% 2. Then, whenever an Event is received, the prepared GraphQL request +%% is executed again, this time with the Stream and the Event in the context. +%% The resolver should then return either 'null' or the processed Event to send to the client. +%% 3. Upon termination, the request is executed one last time with the 'terminate' event. +%% This is an opportunity to clean up all stream resources. +-module(mongoose_graphql_sse_handler). + +-behaviour(lasse_handler). +-export([init/3, + handle_notify/2, + handle_info/2, + handle_error/3, + terminate/3]). + +-include("mongoose.hrl"). + +-type req() :: cowboy_req:req(). +-type state() :: #{atom() => term()}. + +-spec init(state(), any(), cowboy_req:req()) -> + {ok, req(), state()} | + {shutdown, cowboy:http_status(), cowboy:http_headers(), iodata(), req(), state()}. +init(State, _LastEvtId, Req) -> + process_flag(trap_exit, true), % needed for 'terminate' to be called + case cowboy_req:method(Req) of + <<"GET">> -> + case mongoose_graphql_cowboy_handler:check_auth_header(Req, State) of + {ok, State2} -> + case mongoose_graphql_cowboy_handler:gather(Req) of + {ok, Req2, Decoded} -> + run_request(Decoded, Req2, State2); + {error, Reason} -> + reply_error(Reason, Req, State) + end; + {error, Reason} -> + reply_error(Reason, Req, State) + end; + _ -> + {ok, Req, State} % lasse returns 405: Method Not Allowed + end. + +run_request(#{document := undefined}, Req, State) -> + reply_error(make_error(decode, no_query_supplied), Req, State); +run_request(GQLReq, Req, #{schema_endpoint := EpName, authorized := AuthStatus} = State) -> + Ep = mongoose_graphql:get_endpoint(EpName), + Ctx = maps:get(schema_ctx, State, #{}), + GQLReq2 = GQLReq#{authorized => AuthStatus, ctx => Ctx#{method => sse}}, + case mongoose_graphql:prepare(Ep, GQLReq2) of + {error, Reason} -> + reply_error(Reason, Req, State); + {ok, GQLReq3 = #{ctx := Ctx2}} -> + case mongoose_graphql:execute(Ep, GQLReq3) of + {ok, #{aux := [{stream, Stream}]}} -> + Ctx3 = Ctx2#{stream => Stream}, + {ok, Req, State#{id => 1, ep => Ep, req => GQLReq3#{ctx := Ctx3}}}; + {ok, Response} -> + Body = mongoose_graphql_response:term_to_json(Response), + {shutdown, 200, #{}, Body, Req, State} + end + end. + +-spec handle_notify(term(), state()) -> {nosend, state()}. +handle_notify(Msg, State) -> + ?UNEXPECTED_INFO(Msg), + {nosend, State}. + +-spec handle_info(term(), state()) -> {nosend, state()} | {send, lasse_handler:event(), state()}. +handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) -> + Ctx1 = Ctx#{event => Event}, + {ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), + case has_non_null_value(Data) of + false -> + {nosend, State}; + true -> + EventData = mongoose_graphql_response:term_to_json(Response), + SseEvent = #{id => integer_to_binary(Id), data => EventData}, + {send, SseEvent, State#{id := Id + 1}} + end. + +%% Check if there is any value that is non-null. Any list is considered non-null. +has_non_null_value(M) when is_map(M) -> + lists:any(fun has_non_null_value/1, maps:values(M)); +has_non_null_value(V) -> V =/= null. + +-spec handle_error(iodata(), term(), state()) -> ok. +handle_error(Msg, Reason, _State) -> + ?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed, + reason => Reason, text => Msg}). + +-spec terminate(term(), req(), state()) -> ok. +terminate(_Reason, _Req, #{ep := Ep, req := Req = #{ctx := Ctx}}) -> + Ctx1 = Ctx#{event => terminate}, + {ok, #{aux := [{stream, closed}]}} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), + ok; +terminate(_Reason, _Req, #{}) -> + ok. + +make_error(Phase, Term) -> + #{error_term => Term, phase => Phase}. + +reply_error(Reason, Req, State) -> + {Code, Error} = mongoose_graphql_errors:format_error(Reason), + Body = jiffy:encode(#{errors => [Error]}), + {shutdown, Code, #{}, Body, Req, State}. diff --git a/src/graphql/mongoose_graphql_stanza_helper.erl b/src/graphql/mongoose_graphql_stanza_helper.erl index 44ff40be168..edf5e345f21 100644 --- a/src/graphql/mongoose_graphql_stanza_helper.erl +++ b/src/graphql/mongoose_graphql_stanza_helper.erl @@ -2,7 +2,10 @@ -import(mongoose_graphql_helper, [null_to_undefined/1, make_error/2]). --export([get_last_messages/5, row_to_map/1]). +-export([get_last_messages/5, row_to_map/1, handle_event/1]). + +-include("mongoose.hrl"). +-include("jlib.hrl"). -spec get_last_messages(Caller :: jid:jid(), Limit :: null | non_neg_integer(), @@ -20,10 +23,27 @@ get_last_messages(Caller, Limit, With, Before, CheckUser) -> Error end. +-spec handle_event(term()) -> {ok, map() | null}. +handle_event({route, From, _To, Acc}) -> + case mongoose_acc:element(Acc) of + Stanza = #xmlel{name = <<"message">>} -> + StanzaID = exml_query:attr(Stanza, <<"id">>, <<>>), + Timestamp = os:system_time(microsecond), + stanza_result(From, Timestamp, StanzaID, Stanza); + _ -> + {ok, null} % Skip other stanza types + end; +handle_event(Msg) -> + ?UNEXPECTED_INFO(Msg), + {ok, null}. + -spec row_to_map(mod_mam:message_row()) -> {ok, map()}. -row_to_map(#{id := Id, jid := From, packet := Msg}) -> +row_to_map(#{id := Id, jid := From, packet := Stanza}) -> {Microseconds, _} = mod_mam_utils:decode_compact_uuid(Id), StanzaID = mod_mam_utils:mess_id_to_external_binary(Id), - Map = #{<<"sender">> => From, <<"timestamp">> => Microseconds, - <<"stanza_id">> => StanzaID, <<"stanza">> => Msg}, + stanza_result(From, Microseconds, StanzaID, Stanza). + +stanza_result(From, Timestamp, StanzaID, Stanza) -> + Map = #{<<"sender">> => From, <<"timestamp">> => Timestamp, + <<"stanza_id">> => StanzaID, <<"stanza">> => Stanza}, {ok, Map}. diff --git a/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl new file mode 100644 index 00000000000..e209c959299 --- /dev/null +++ b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl @@ -0,0 +1,22 @@ +-module(mongoose_graphql_stanza_user_subscription). +-behaviour(mongoose_graphql). + +-import(mongoose_graphql_helper, [format_result/2]). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(Ctx, _Obj, <<"getMessages">>, #{}) -> + get_messages(Ctx). + +get_messages(#{event := terminate, stream := Session}) -> + mongoose_stanza_api:close_session(Session), + {ok, null, [{stream, closed}]}; +get_messages(#{event := Event}) -> + mongoose_graphql_stanza_helper:handle_event(Event); +get_messages(#{user := Jid}) -> + {ok, Stream} = mongoose_stanza_api:open_session(Jid, false), + {ok, null, [{stream, Stream}]}. diff --git a/src/graphql/user/mongoose_graphql_user_subscription.erl b/src/graphql/user/mongoose_graphql_user_subscription.erl new file mode 100644 index 00000000000..39721f5a6ae --- /dev/null +++ b/src/graphql/user/mongoose_graphql_user_subscription.erl @@ -0,0 +1,11 @@ +-module(mongoose_graphql_user_subscription). +-behaviour(mongoose_graphql). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(_Ctx, _Obj, <<"stanza">>, _Args) -> + {ok, stanza}. diff --git a/src/mongoose_client_api/mongoose_client_api_sse.erl b/src/mongoose_client_api/mongoose_client_api_sse.erl index 18b3e88df4e..f75f32296be 100644 --- a/src/mongoose_client_api/mongoose_client_api_sse.erl +++ b/src/mongoose_client_api/mongoose_client_api_sse.erl @@ -23,14 +23,9 @@ init(_InitArgs, _LastEvtId, Req) -> {Authorization, Req2, State} = mongoose_client_api:is_authorized(Req1, State0), maybe_init(Authorization, Req2, State#{id => 1}). -maybe_init(true, Req, #{jid := JID, creds := Creds} = State) -> - SID = ejabberd_sm:make_new_sid(), - UUID = uuid:uuid_to_string(uuid:get_v4(), binary_standard), - Resource = <<"sse-", UUID/binary>>, - NewJid = jid:replace_resource(JID, Resource), - HostType = mongoose_credentials:host_type(Creds), - ejabberd_sm:open_session(HostType, SID, NewJid, 1, #{}), - {ok, Req, State#{sid => SID, jid => NewJid}}; +maybe_init(true, Req, #{jid := JID} = State) -> + Session = mongoose_stanza_api:open_session(JID, false), + {ok, Req, State#{session => Session}}; maybe_init(true, Req, State) -> %% This is for OPTIONS method {shutdown, 200, #{}, <<>>, Req, State}; @@ -58,20 +53,11 @@ handle_error(Msg, Reason, State) -> ?LOG_WARNING(#{what => sse_handle_error, msg => Msg, reason => Reason}), {nosend, State}. -terminate(_Reason, _Req, #{creds := Creds} = State) -> - case maps:get(sid, State, undefined) of - undefined -> - ok; - SID -> - JID = #jid{lserver = S} = maps:get(jid, State), - Acc = mongoose_acc:new( - #{location => ?LOCATION, - lserver => S, - host_type => mongoose_credentials:host_type(Creds), - element => undefined}), - ejabberd_sm:close_session(Acc, SID, JID, normal) - end, - State. +terminate(_Reason, _Req, #{session := Session}) -> + mongoose_stanza_api:close_session(Session), + ok; +terminate(_Reason, _Req, _State) -> + ok. maybe_send_message_event(<<"chat">>, Packet, Timestamp, #{id := ID} = State) -> Data = jiffy:encode(mongoose_client_api_messages:encode(Packet, Timestamp)), diff --git a/src/mongoose_stanza_api.erl b/src/mongoose_stanza_api.erl index 03adb04843b..025a2c5de1e 100644 --- a/src/mongoose_stanza_api.erl +++ b/src/mongoose_stanza_api.erl @@ -1,10 +1,19 @@ -module(mongoose_stanza_api). + +%% API -export([send_chat_message/4, send_headline_message/5, send_stanza/2, lookup_recent_messages/5]). +%% Event API +-export([open_session/2, close_session/1]). + -include("jlib.hrl"). -include("mongoose_rsm.hrl"). -include("mongoose_logger.hrl"). +-type stream() :: #{jid := jid:jid(), + sid := ejabberd_sm:sid(), + host_type := mongooseim:host_type()}. + %% API -spec send_chat_message(User :: jid:jid() | undefined, From :: jid:jid() | undefined, @@ -40,6 +49,23 @@ lookup_recent_messages(User, With, Before, Limit, CheckUser) -> M = #{user => User, with => With, before => Before, limit => Limit, check_user => CheckUser}, fold(M, [fun get_host_type/1, fun check_user/1, fun lookup_messages/1]). +%% Event API + +-spec open_session(jid:jid(), boolean()) -> {unknown_user, iodata()} | {ok, stream()}. +open_session(User, CheckUser) -> + M = #{user => User, check_user => CheckUser}, + fold(M, [fun get_host_type/1, fun check_user/1, fun do_open_session/1]). + +-spec close_session(stream()) -> {ok, closed}. +close_session(#{jid := Jid = #jid{lserver = S}, sid := Sid, host_type := HostType}) -> + Acc = mongoose_acc:new( + #{location => ?LOCATION, + lserver => S, + host_type => HostType, + element => undefined}), + ejabberd_sm:close_session(Acc, Sid, Jid, normal), + {ok, closed}. + %% Steps %% @doc Determine the user's bare JID and the 'from' JID, and check if they match @@ -142,6 +168,14 @@ lookup_messages(#{user := UserJid, with := WithJid, before := Before, limit := L {ok, {_, _, Rows}} = mod_mam_pm:lookup_messages(HostType, Params), {ok, {Rows, Limit2}}. +do_open_session(#{host_type := HostType, user := JID}) -> + SID = ejabberd_sm:make_new_sid(), + UUID = uuid:uuid_to_string(uuid:get_v4(), binary_standard), + Resource = <<"sse-", UUID/binary>>, + NewJid = jid:replace_resource(JID, Resource), + ejabberd_sm:open_session(HostType, SID, NewJid, 1, #{}), + {ok, #{sid => SID, jid => NewJid, host_type => HostType}}. + %% Helpers -spec build_chat_message(jid:literal_jid(), jid:literal_jid(), binary()) -> exml:element().