diff --git a/big_tests/tests/graphql_SUITE.erl b/big_tests/tests/graphql_SUITE.erl index 2005f301134..660e5f7ae04 100644 --- a/big_tests/tests/graphql_SUITE.erl +++ b/big_tests/tests/graphql_SUITE.erl @@ -7,7 +7,7 @@ -compile([export_all, nowarn_export_all]). -import(distributed_helper, [mim/0, require_rpc_nodes/1, rpc/4]). --import(graphql_helper, [execute/3, execute_auth/2, execute_user/3]). +-import(graphql_helper, [execute/3, execute_sse/3, execute_auth/2, execute_user/3]). -define(assertAdminAuth(Domain, Type, Auth, Data), assert_auth(#{<<"domain">> => Domain, @@ -35,7 +35,10 @@ groups() -> cowboy_handler() -> [can_connect_to_admin, can_connect_to_domain_admin, - can_connect_to_user]. + can_connect_to_user, + can_connect_to_admin_sse, + can_connect_to_domain_admin_sse, + can_connect_to_user_sse]. user_handler() -> [user_checks_auth, @@ -96,6 +99,15 @@ can_connect_to_domain_admin(_Config) -> can_connect_to_user(_Config) -> ?assertMatch({{<<"400">>, <<"Bad Request">>}, _}, execute(user, #{}, undefined)). +can_connect_to_admin_sse(_Config) -> + ?assertMatch({400, _}, execute_sse(admin, #{}, undefined)). + +can_connect_to_domain_admin_sse(_Config) -> + ?assertMatch({400, _}, execute_sse(domain_admin, #{}, undefined)). + +can_connect_to_user_sse(_Config) -> + ?assertMatch({400, _}, execute_sse(user, #{}, undefined)). + can_load_graphiql(Config) -> Ep = ?config(schema_endpoint, Config), {Status, Html} = get_graphiql_website(Ep), diff --git a/big_tests/tests/graphql_helper.erl b/big_tests/tests/graphql_helper.erl index 09d00ea3d98..f3f880daeaa 100644 --- a/big_tests/tests/graphql_helper.erl +++ b/big_tests/tests/graphql_helper.erl @@ -27,10 +27,28 @@ execute(Node, EpName, Body, Creds) -> body => Body}, rest_helper:make_request(Request). +execute_sse(EpName, Params, Creds) -> + #{node := Node} = mim(), + execute_sse(Node, EpName, Params, Creds). + +execute_sse(Node, EpName, Params, Creds) -> + Port = get_listener_port(Node, EpName), + Path = "/api/graphql/sse", + QS = uri_string:compose_query([{atom_to_binary(K), encode_sse_value(V)} + || {K, V} <- maps:to_list(Params)]), + sse_helper:connect_to_sse(Port, [Path, "?", QS], Creds, #{}). + +encode_sse_value(M) when is_map(M) -> jiffy:encode(M); +encode_sse_value(V) when is_binary(V) -> V. + execute_user_command(Category, Command, User, Args, Config) -> - #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc = get_doc(Category, Command), execute_user(#{query => Doc, variables => Args}, User, Config). +execute_user_command_sse(Category, Command, User, Args, Config) -> + Doc = get_doc(Category, Command), + execute_user_sse(#{query => Doc, variables => Args}, User, Config). + execute_command(Category, Command, Args, Config) -> #{node := Node} = mim(), Protocol = ?config(protocol, Config), @@ -40,9 +58,13 @@ execute_command(Node, Category, Command, Args, Config) -> Protocol = ?config(protocol, Config), execute_command(Node, Category, Command, Args, Config, Protocol). +execute_command_sse(Category, Command, Args, Config) -> + Doc = get_doc(Category, Command), + execute_auth_sse(#{query => Doc, variables => Args}, Config). + %% Admin commands can be executed as GraphQL over HTTP or with CLI (mongooseimctl) execute_command(Node, Category, Command, Args, Config, http) -> - #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc = get_doc(Category, Command), execute_auth(Node, #{query => Doc, variables => Args}, Config); execute_command(Node, Category, Command, Args, Config, cli) -> CLIArgs = encode_cli_args(Args), @@ -50,6 +72,10 @@ execute_command(Node, Category, Command, Args, Config, cli) -> = mongooseimctl_helper:mongooseimctl(Node, Category, [Command | CLIArgs], Config), {{exit_status, Code}, rest_helper:decode(Result, #{return_maps => true})}. +get_doc(Category, Command) -> + #{Category := #{commands := #{Command := #{doc := Doc}}}} = get_specs(), + Doc. + encode_cli_args(Args) -> lists:flatmap(fun({Name, Value}) -> encode_cli_arg(Name, Value) end, maps:to_list(Args)). encode_cli_arg(_Name, null) -> @@ -71,14 +97,22 @@ execute_auth(Body, Config) -> execute_auth(Node, Body, Config). execute_auth(Node, Body, Config) -> - case Ep = ?config(schema_endpoint, Config) of - admin -> - #{username := Username, password := Password} = get_listener_opts(Ep), - execute(Node, Ep, Body, {Username, Password}); - domain_admin -> - Creds = ?config(domain_admin, Config), - execute(Node, Ep, Body, Creds) - end. + Ep = ?config(schema_endpoint, Config), + execute(Node, Ep, Body, make_admin_creds(Ep, Config)). + +execute_auth_sse(Body, Config) -> + #{node := Node} = mim(), + execute_auth_sse(Node, Body, Config). + +execute_auth_sse(Node, Body, Config) -> + Ep = ?config(schema_endpoint, Config), + execute_sse(Node, Ep, Body, make_admin_creds(Ep, Config)). + +make_admin_creds(admin = Ep, _Config) -> + #{username := Username, password := Password} = get_listener_opts(Ep), + {Username, Password}; +make_admin_creds(domain_admin, Config) -> + ?config(domain_admin, Config). execute_user(Body, User, Config) -> Ep = ?config(schema_endpoint, Config), @@ -86,6 +120,12 @@ execute_user(Body, User, Config) -> #{node := Node} = mim(), execute(Node, Ep, Body, Creds). +execute_user_sse(Body, User, Config) -> + Ep = ?config(schema_endpoint, Config), + Creds = make_creds(User), + #{node := Node} = mim(), + execute_sse(Node, Ep, Body, Creds). + -spec get_listener_port(binary()) -> integer(). get_listener_port(EpName) -> #{node := Node} = mim(), diff --git a/big_tests/tests/graphql_stanza_SUITE.erl b/big_tests/tests/graphql_stanza_SUITE.erl index 0b35efcb8c2..ca623499f53 100644 --- a/big_tests/tests/graphql_stanza_SUITE.erl +++ b/big_tests/tests/graphql_stanza_SUITE.erl @@ -1,6 +1,5 @@ -module(graphql_stanza_SUITE). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("exml/include/exml.hrl"). @@ -8,7 +7,9 @@ -import(distributed_helper, [mim/0, require_rpc_nodes/1]). -import(graphql_helper, [execute_user_command/5, execute_command/4, - get_ok_value/2, get_err_code/1, get_err_msg/1, get_coercion_err_msg/1, + execute_user_command_sse/5, execute_command_sse/4, + get_ok_value/2, get_value/2, + get_err_code/1, get_err_msg/1, get_coercion_err_msg/1, get_unauthorized/1, get_not_loaded/1]). suite() -> @@ -22,19 +23,25 @@ all() -> groups() -> [{admin_stanza_http, [], [{group, admin_mam}, - {group, admin_no_mam}]}, + {group, admin_no_mam_http}]}, {admin_stanza_cli, [], [{group, admin_mam}, - {group, admin_no_mam}]}, + {group, admin_no_mam_cli}]}, {domain_admin_stanza, [], [{group, admin_mam}, % same as for admin {group, domain_admin_no_mam}]}, {user_stanza, [], [{group, user_mam}, {group, user_no_mam}]}, {admin_mam, [parallel], admin_mam_cases()}, - {admin_no_mam, [parallel], admin_stanza_cases() ++ admin_no_mam_cases()}, + {admin_no_mam_http, [parallel], admin_stanza_cases(http) ++ admin_no_mam_cases()}, + {admin_no_mam_cli, [], admin_stanza_cases(cli) ++ admin_no_mam_cases()}, {domain_admin_no_mam, [parallel], domain_admin_stanza_cases() ++ admin_no_mam_cases()}, {user_mam, [parallel], user_mam_cases()}, {user_no_mam, [parallel], user_stanza_cases() ++ user_no_mam_cases()}]. +admin_stanza_cases(cli) -> + admin_stanza_cases(); +admin_stanza_cases(http) -> + admin_stanza_cases() ++ admin_sse_cases(). + admin_stanza_cases() -> [admin_send_message, admin_send_message_to_unparsable_jid, @@ -44,6 +51,9 @@ admin_stanza_cases() -> admin_send_stanza_from_unknown_user, admin_send_stanza_from_unknown_domain]. +admin_sse_cases() -> + [admin_get_messages]. + admin_mam_cases() -> [admin_get_last_messages, admin_get_last_messages_for_unknown_user, @@ -69,6 +79,7 @@ domain_admin_stanza_cases() -> user_stanza_cases() -> [user_send_message, + user_get_messages, user_send_message_without_from, user_send_message_with_spoofed_from, user_send_message_headline, @@ -89,7 +100,6 @@ init_per_suite(Config) -> dynamic_modules:save_modules(domain_helper:host_type(), Config2). end_per_suite(Config) -> - escalus_fresh:clean(), dynamic_modules:restore_modules(Config), escalus:end_per_suite(Config). @@ -105,7 +115,8 @@ init_per_group(GN, Config) when GN =:= admin_mam; GN =:= domain_admin_mam; GN =:= user_mam -> init_mam(Config); -init_per_group(GN, Config) when GN =:= admin_no_mam; +init_per_group(GN, Config) when GN =:= admin_no_mam_http; + GN =:= admin_no_mam_cli; GN =:= domain_admin_no_mam; GN =:= user_no_mam -> Mods = [{mod_mam, stopped}], @@ -118,7 +129,7 @@ end_per_group(GN, _Config) when GN =:= admin_stanza_http; GN =:= user_stanza -> graphql_helper:clean(); end_per_group(_GN, _Config) -> - ok. + escalus_fresh:clean(). init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). @@ -155,21 +166,19 @@ admin_send_message_story(Config, Alice, Bob) -> admin_get_messages(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], - fun admin_send_message_story/3). + fun admin_get_messages_story/3). admin_get_messages_story(Config, Alice, Bob) -> From = escalus_client:full_jid(Alice), To = escalus_client:short_jid(Bob), - - + {200, Stream} = get_messages(To, Config), escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)), - - - - Res = send_message(From, To, <<"Hi!">>, Config), - #{<<"id">> := StanzaId} = get_ok_value([data, stanza, sendMessage], Res), - assert_not_empty(StanzaId), - escalus:assert(is_message, escalus:wait_for_stanza(Bob)). + Event = sse_helper:wait_for_event(Stream), + #{<<"stanza">> := StanzaBin, <<"sender">> := From} = + graphql_helper:get_value([data, stanza, getMessages], Event), + {ok, Stanza} = exml:parse(StanzaBin), + escalus:assert(is_message, Stanza), + sse_helper:stop_sse(Stream). user_send_message(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], @@ -183,6 +192,22 @@ user_send_message_story(Config, Alice, Bob) -> assert_not_empty(StanzaId), escalus:assert(is_message, escalus:wait_for_stanza(Bob)). +user_get_messages(Config) -> + escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], + fun user_get_messages_story/3). + +user_get_messages_story(Config, Alice, Bob) -> + From = escalus_client:full_jid(Alice), + To = escalus_client:short_jid(Bob), + {200, Stream} = user_get_messages(Bob, Config), + escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)), + Event = sse_helper:wait_for_event(Stream), + #{<<"stanza">> := StanzaBin, <<"sender">> := From} = + graphql_helper:get_value([data, stanza, getMessages], Event), + {ok, Stanza} = exml:parse(StanzaBin), + escalus:assert(is_message, Stanza), + sse_helper:stop_sse(Stream). + user_send_message_without_from(Config) -> escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}], fun user_send_message_without_from_story/3). @@ -545,6 +570,13 @@ user_send_stanza(User, Stanza, Config) -> Vars = #{stanza => Stanza}, execute_user_command(<<"stanza">>, <<"sendStanza">>, User, Vars, Config). +get_messages(Caller, Config) -> + Vars = #{caller => Caller}, + execute_command_sse(<<"stanza">>, <<"getMessages">>, Vars, Config). + +user_get_messages(User, Config) -> + execute_user_command_sse(<<"stanza">>, <<"getMessages">>, User, #{}, Config). + get_last_messages(Caller, With, Before, Config) -> Vars = #{caller => Caller, with => With, before => Before}, execute_command(<<"stanza">>, <<"getLastMessages">>, Vars, Config). diff --git a/big_tests/tests/rest_client_SUITE.erl b/big_tests/tests/rest_client_SUITE.erl index 0e75d5e3e89..6655ef838bb 100644 --- a/big_tests/tests/rest_client_SUITE.erl +++ b/big_tests/tests/rest_client_SUITE.erl @@ -228,15 +228,12 @@ msg_is_sent_and_delivered_over_sse(ConfigIn) -> Bob = escalus_users:get_userspec(Config, bob), Alice = escalus_users:get_userspec(Config, alice), - Conn = connect_to_sse({alice, Alice}), + {200, Conn} = connect_to_sse({alice, Alice}), M = send_message(bob, Bob, Alice), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), - - assert_json_message(M, Data), - - stop_sse(Conn). + Event = sse_helper:wait_for_event(Conn), + assert_json_message(M, Event), + sse_helper:stop_sse(Conn). message_sending_errors(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> @@ -258,31 +255,27 @@ room_msg_is_sent_and_delivered_over_sse(ConfigIn) -> RoomID = given_new_room_with_users({alice, Alice}, [{bob, Bob}]), RoomInfo = get_room_info({alice, Alice}, RoomID), true = is_participant(Bob, <<"member">>, RoomInfo), - Conn = connect_to_sse({bob, Bob}), + {200, Conn} = connect_to_sse({bob, Bob}), Message = given_message_sent_to_room(RoomID, {alice, Alice}), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), - assert_json_room_sse_message(Message#{room => RoomID, type => <<"message">>}, - Data), - stop_sse(Conn). + Event = sse_helper:wait_for_event(Conn), + assert_json_room_sse_message(Message#{room => RoomID, type => <<"message">>}, Event), + sse_helper:stop_sse(Conn). aff_change_msg_is_delivered_over_sse(ConfigIn) -> Config = escalus_fresh:create_users(ConfigIn, [{alice, 1}, {bob, 1}]), Bob = escalus_users:get_userspec(Config, bob), Alice = escalus_users:get_userspec(Config, alice), RoomID = given_new_room({alice, Alice}), - Conn = connect_to_sse({bob, Bob}), + {200, Conn} = connect_to_sse({bob, Bob}), given_user_invited({alice, Alice}, RoomID, Bob), - Event = wait_for_event(Conn), - Data = jiffy:decode(maps:get(data, Event), [return_maps]), + Event = sse_helper:wait_for_event(Conn), BobJID = user_jid(Bob), RoomJID = room_jid(RoomID, Config), assert_json_room_sse_message(#{room => RoomID, from => RoomJID, type => <<"affiliation">>, - user => BobJID}, - Data), - stop_sse(Conn). + user => BobJID}, Event), + sse_helper:stop_sse(Conn). all_messages_are_archived(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> @@ -1132,32 +1125,7 @@ is_participant(User, Role, RoomInfo) -> connect_to_sse(User) -> Port = ct:get_config({hosts, mim, http_api_client_endpoint_port}), - {Username, Password} = credentials(User), - Base64 = base64:encode(binary_to_list(Username) ++ [$: | binary_to_list(Password)]), - Headers = [{<<"authorization">>, <<"basic ", Base64/binary>>}, - {<<"host">>, <<"localhost">>}, - {<<"accept">>, <<"text/event-stream">>}], - {ok, ConnPid} = gun:open("localhost", Port, #{ - transport => tls, - protocols => [http], - http_opts => #{content_handlers => [gun_sse_h, gun_data_h]} - }), - {ok, _} = gun:await_up(ConnPid), - StreamRef = gun:get(ConnPid, "/api/sse", Headers), - #{pid => ConnPid, stream_ref => StreamRef}. - -wait_for_event(#{pid := Pid, stream_ref := StreamRef} = Opts) -> - case gun:await(Pid, StreamRef) of - {response, nofin, _Status, _} -> - wait_for_event(Opts); - {sse, #{data := [Response]}} -> - Opts#{data => Response}; - Error -> - Error - end. - -stop_sse(#{pid := Pid}) -> - gun:close(Pid). + sse_helper:connect_to_sse(Port, "/api/sse", credentials(User), #{transport => tls}). assert_json_message(Sent, Received) -> #{<<"body">> := Body, diff --git a/big_tests/tests/sse_helper.erl b/big_tests/tests/sse_helper.erl new file mode 100644 index 00000000000..7ad51d5c1dd --- /dev/null +++ b/big_tests/tests/sse_helper.erl @@ -0,0 +1,37 @@ +-module(sse_helper). + +-compile([export_all, nowarn_export_all]). + +connect_to_sse(Port, Path, Creds, ExtraOpts) -> + Headers = auth_headers(Creds) ++ [{<<"host">>, <<"localhost">>}, + {<<"accept">>, <<"text/event-stream">>}], + Opts = maps:merge(basic_opts(), ExtraOpts), + {ok, ConnPid} = gun:open("localhost", Port, Opts), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:get(ConnPid, Path, Headers), + {response, nofin, Status, _Headers} = gun:await(ConnPid, StreamRef), + Result = case Status of + 200 -> + #{pid => ConnPid, stream_ref => StreamRef}; + _ -> + {ok, Body} = gun:await_body(ConnPid, StreamRef), + jiffy:decode(Body, [return_maps]) + end, + {Status, Result}. + +wait_for_event(#{pid := Pid, stream_ref := StreamRef}) -> + {sse, #{data := [Response]}} = gun:await(Pid, StreamRef), + jiffy:decode(Response, [return_maps]). + +stop_sse(#{pid := Pid}) -> + gun:close(Pid). + +auth_headers({Username, Password}) -> + Base64 = base64:encode(binary_to_list(Username) ++ [$: | binary_to_list(Password)]), + [{<<"authorization">>, <<"basic ", Base64/binary>>}]; +auth_headers(undefined) -> + []. + +basic_opts() -> + #{protocols => [http], + http_opts => #{content_handlers => [gun_sse_h, gun_data_h]}}. diff --git a/priv/graphql/schemas/user/stanza.gql b/priv/graphql/schemas/user/stanza.gql index 47e78590280..dbbc2f4ad3d 100644 --- a/priv/graphql/schemas/user/stanza.gql +++ b/priv/graphql/schemas/user/stanza.gql @@ -18,3 +18,7 @@ type StanzaUserMutation @protected{ "Send an arbitrary stanza" sendStanza(stanza: Stanza): SendStanzaPayload } + +type StanzaUserSubscription { + getMessages: StanzaMap +} diff --git a/priv/graphql/schemas/user/user_schema.gql b/priv/graphql/schemas/user/user_schema.gql index cbda662808f..b7b94c29829 100644 --- a/priv/graphql/schemas/user/user_schema.gql +++ b/priv/graphql/schemas/user/user_schema.gql @@ -1,6 +1,7 @@ schema{ query: UserQuery, - mutation: UserMutation + mutation: UserMutation, + subscription: UserSubscription } """ @@ -58,3 +59,7 @@ type UserMutation @protected{ "OAUTH token management" token: TokenUserMutation } + +type UserSubscription { + stanza: StanzaUserSubscription +} diff --git a/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl index 2777acc396f..b6e2d18c61e 100644 --- a/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl +++ b/src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl @@ -12,14 +12,9 @@ execute(Ctx, _Obj, <<"getMessages">>, Args) -> get_messages(Ctx, Args). -get_messages(#{event := Event}, _) -> - case mongoose_graphql_stanza_helper:handle_event(Event) of - {EventName, Data} -> - {ok, Data, [{event, EventName}]}; - no_event -> - {ok, null, [{event, none}]} - end; get_messages(#{event := terminate, stream := Session}, _) -> {ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]}; +get_messages(#{event := Event}, _) -> + mongoose_graphql_stanza_helper:handle_event(Event); get_messages(_Ctx, #{<<"caller">> := Jid}) -> {ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}. diff --git a/src/graphql/mongoose_graphql.erl b/src/graphql/mongoose_graphql.erl index d193fcb3a00..c14d70c340a 100644 --- a/src/graphql/mongoose_graphql.erl +++ b/src/graphql/mongoose_graphql.erl @@ -160,11 +160,11 @@ remove_null_args(Vars) -> admin_mapping_rules() -> #{objects => #{ 'AdminQuery' => mongoose_graphql_admin_query, + 'AdminMutation' => mongoose_graphql_admin_mutation, + 'AdminSubscription' => mongoose_graphql_admin_subscription, 'AdminAuthInfo' => mongoose_graphql_admin_auth_info, 'DomainAdminQuery' => mongoose_graphql_domain_admin_query, 'GdprAdminQuery' => mongoose_graphql_gdpr_admin_query, - 'AdminMutation' => mongoose_graphql_admin_mutation, - 'AdminSubscription' => mongoose_graphql_admin_subscription, 'DomainAdminMutation' => mongoose_graphql_domain_admin_mutation, 'InboxAdminMutation' => mongoose_graphql_inbox_admin_mutation, 'SessionAdminMutation' => mongoose_graphql_session_admin_mutation, @@ -208,6 +208,7 @@ user_mapping_rules() -> #{objects => #{ 'UserQuery' => mongoose_graphql_user_query, 'UserMutation' => mongoose_graphql_user_mutation, + 'UserSubscription' => mongoose_graphql_user_subscription, 'AccountUserQuery' => mongoose_graphql_account_user_query, 'AccountUserMutation' => mongoose_graphql_account_user_mutation, 'InboxUserMutation' => mongoose_graphql_inbox_user_mutation, @@ -227,6 +228,7 @@ user_mapping_rules() -> 'StanzaUserMutation' => mongoose_graphql_stanza_user_mutation, 'TokenUserMutation' => mongoose_graphql_token_user_mutation, 'StanzaUserQuery' => mongoose_graphql_stanza_user_query, + 'StanzaUserSubscription' => mongoose_graphql_stanza_user_subscription, 'HttpUploadUserMutation' => mongoose_graphql_http_upload_user_mutation, 'UserAuthInfo' => mongoose_graphql_user_auth_info, default => mongoose_graphql_default}, diff --git a/src/graphql/mongoose_graphql_cowboy_handler.erl b/src/graphql/mongoose_graphql_cowboy_handler.erl index 0fe7189acb4..000e3d35743 100644 --- a/src/graphql/mongoose_graphql_cowboy_handler.erl +++ b/src/graphql/mongoose_graphql_cowboy_handler.erl @@ -30,7 +30,10 @@ %% Data input/output callbacks -export([from_json/2, to_json/2, - to_html/2, + to_html/2]). + +%% Utilities used by the SSE handler +-export([check_auth_header/2, gather/1]). -ignore_xref([from_json/2, to_html/2, to_json/2]). @@ -91,18 +94,11 @@ charsets_provided(Req, State) -> {[<<"utf-8">>], Req, State}. is_authorized(Req, State) -> - try cowboy_req:parse_header(<<"authorization">>, Req) of - Auth -> - case check_auth(Auth, State) of - {ok, State2} -> - {true, Req, State2}; - error -> - Msg = make_error(authorize, wrong_credentials), - reply_error(Msg, Req, State) - end - catch - exit:Err -> - reply_error(make_error(authorize, Err), Req, State) + case check_auth_header(Req, State) of + {ok, State2} -> + {true, Req, State2}; + {error, Error} -> + reply_error(Error, Req, State) end. resource_exists(#{method := <<"GET">>} = Req, State) -> @@ -126,6 +122,34 @@ json_request(Req, State) -> from_json(Req, State) -> json_request(Req, State). to_json(Req, State) -> json_request(Req, State). +%% Utils used also by the SSE handler + +check_auth_header(Req, State) -> + try cowboy_req:parse_header(<<"authorization">>, Req) of + Auth -> + case check_auth(Auth, State) of + {ok, State2} -> + {ok, State2}; + error -> + {error, make_error(authorize, wrong_credentials)} + end + catch + exit:Err -> + {error, make_error(authorize, Err)} + end. + +gather(Req, Params) -> + QueryDocument = document(Params), + case variables(Params) of + {ok, Vars} -> + Operation = operation_name(Params), + {ok, Req, #{document => QueryDocument, + vars => Vars, + operation_name => Operation}}; + {error, Reason} -> + {error, Reason} + end. + %% Internal check_auth(Auth, #{schema_endpoint := domain_admin} = State) -> @@ -209,18 +233,6 @@ get_params(<<"POST">>, Req) -> catch _:_ -> {error, make_error(decode, invalid_json_body)} end. -gather(Req, Params) -> - QueryDocument = document(Params), - case variables(Params) of - {ok, Vars} -> - Operation = operation_name(Params), - {ok, Req, #{document => QueryDocument, - vars => Vars, - operation_name => Operation}}; - {error, Reason} -> - {error, Reason} - end. - document(#{<<"query">> := Q}) -> Q; document(#{}) -> undefined. diff --git a/src/graphql/mongoose_graphql_operations.erl b/src/graphql/mongoose_graphql_operations.erl index 485e6a8a34c..10ca5ab33c5 100644 --- a/src/graphql/mongoose_graphql_operations.erl +++ b/src/graphql/mongoose_graphql_operations.erl @@ -7,19 +7,24 @@ verify_operations(Ctx, #document{definitions = Definitions}) -> Method = maps:get(method, Ctx, undefined), - [verify_op_type(Method, op_type(Ty)) || #op{ty = Ty} <- Definitions], + [verify_op_type(Method, op_type(Ty)) || #op{id = Id, ty = Ty} <- Definitions, + is_requested_op(Ctx, Id)], ok. -op_type(undefined) -> query; -op_type({query, _}) -> query; -op_type({mutation, _}) -> mutation; -op_type({subscription, _}) -> subscription. +verify_op_type(Method, OpType) -> + case is_supported(Method, OpType) of + true -> + ok; + false -> + Error = {unsupported_operation, Method, OpType}, + graphql_err:abort([], verify, Error) + end. + +is_supported(Method, subscription) -> Method =:= sse; +is_supported(Method, _) -> Method =/= sse. -verify_op_type(http, query) -> ok; -verify_op_type(http, mutation) -> ok; -verify_op_type(sse, subscription) -> ok; -verify_op_type(cli, _) -> ok; -verify_op_type(undefined, query) -> ok; -verify_op_type(Method, Type) -> - Error = {unsupported_operation, Method, Type}, - graphql_err:abort([], verify, Error). +is_requested_op(#{operation_name := undefined}, _) -> true; +is_requested_op(#{operation_name := OpName}, {name, _, Name}) -> OpName =:= Name. + +op_type(undefined) -> query; +op_type({OpType, _}) -> OpType. diff --git a/src/graphql/mongoose_graphql_sse_handler.erl b/src/graphql/mongoose_graphql_sse_handler.erl index dbb20f7b9be..c8b28805954 100644 --- a/src/graphql/mongoose_graphql_sse_handler.erl +++ b/src/graphql/mongoose_graphql_sse_handler.erl @@ -10,16 +10,27 @@ -include("mongoose.hrl"). init(State, _LastEvtId, Req) -> - case mongoose_graphql_cowboy_handler:is_authorized(Req, State) of - {true, Req1, State1} -> - case mongoose_graphql_cowboy_handler:gather(Req1) of - {ok, Req2, Decoded} -> - run_request(Decoded, Req2, State1) - end + process_flag(trap_exit, true), % needed for 'terminate' to be called + case cowboy_req:method(Req) of + <<"GET">> -> + case mongoose_graphql_cowboy_handler:check_auth_header(Req, State) of + {ok, State2} -> + case mongoose_graphql_cowboy_handler:gather(Req) of + {ok, Req2, Decoded} -> + run_request(Decoded, Req2, State2); + {error, Reason} -> + reply_error(Reason, Req, State) + end; + {error, Reason} -> + reply_error(Reason, Req, State) + end; + _ -> + {ok, Req, State} % lasse returns 405: Method Not Allowed end. -run_request(GQLReq, Req, #{schema_endpoint := EpName, - authorized := AuthStatus} = State) -> +run_request(#{document := undefined}, Req, State) -> + reply_error(make_error(decode, no_query_supplied), Req, State); +run_request(GQLReq, Req, #{schema_endpoint := EpName, authorized := AuthStatus} = State) -> Ep = mongoose_graphql:get_endpoint(EpName), Ctx = maps:get(schema_ctx, State, #{}), GQLReq2 = GQLReq#{authorized => AuthStatus, ctx => Ctx#{method => sse}}, @@ -44,11 +55,11 @@ handle_notify(Msg, State) -> handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) -> Ctx1 = Ctx#{event => Event}, case mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}) of - {ok, #{aux := [{event, none}]}} -> + {ok, #{data := null}} -> {nosend, State}; - {ok, Response = #{aux := [{event, EventName}]}} -> - Data = mongoose_graphql_response:term_to_json(maps:remove(aux, Response)), - {send, #{id => integer_to_binary(Id), event => EventName, data => Data}, + {ok, #{} = Response} -> + Data = mongoose_graphql_response:term_to_json(Response), + {send, #{id => integer_to_binary(Id), data => Data}, State#{id := Id + 1}} end. @@ -60,9 +71,14 @@ handle_error(Msg, Reason, State) -> terminate(_Reason, _Req, #{ep := Ep, req := Req = #{ctx := Ctx}}) -> Ctx1 = Ctx#{event => terminate}, {ok, #{aux := [{stream, closed}]}} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}), + ok; +terminate(_Reason, _Req, #{}) -> ok. -reply_error(Msg, Req, State) -> - {Code, Error} = mongoose_graphql_errors:format_error(Msg), +make_error(Phase, Term) -> + #{error_term => Term, phase => Phase}. + +reply_error(Reason, Req, State) -> + {Code, Error} = mongoose_graphql_errors:format_error(Reason), Body = jiffy:encode(#{errors => [Error]}), {shutdown, Code, #{}, Body, Req, State}. diff --git a/src/graphql/mongoose_graphql_stanza_helper.erl b/src/graphql/mongoose_graphql_stanza_helper.erl index 5862ed24f6a..edf5e345f21 100644 --- a/src/graphql/mongoose_graphql_stanza_helper.erl +++ b/src/graphql/mongoose_graphql_stanza_helper.erl @@ -5,6 +5,7 @@ -export([get_last_messages/5, row_to_map/1, handle_event/1]). -include("mongoose.hrl"). +-include("jlib.hrl"). -spec get_last_messages(Caller :: jid:jid(), Limit :: null | non_neg_integer(), @@ -22,22 +23,27 @@ get_last_messages(Caller, Limit, With, Before, CheckUser) -> Error end. --spec handle_event(term()) -> {binary(), map()} | no_event. +-spec handle_event(term()) -> {ok, map() | null}. handle_event({route, From, _To, Acc}) -> - Stanza = mongoose_acc:element(Acc), - StanzaID = exml_query:attr(Stanza, <<"id">>, <<>>), - Timestamp = os:system_time(microsecond), - Map = #{<<"sender">> => From, <<"timestamp">> => Timestamp, - <<"stanza_id">> => StanzaID, <<"stanza">> => Stanza}, - {<<"message">>, Map}; + case mongoose_acc:element(Acc) of + Stanza = #xmlel{name = <<"message">>} -> + StanzaID = exml_query:attr(Stanza, <<"id">>, <<>>), + Timestamp = os:system_time(microsecond), + stanza_result(From, Timestamp, StanzaID, Stanza); + _ -> + {ok, null} % Skip other stanza types + end; handle_event(Msg) -> ?UNEXPECTED_INFO(Msg), - no_event. + {ok, null}. -spec row_to_map(mod_mam:message_row()) -> {ok, map()}. -row_to_map(#{id := Id, jid := From, packet := Msg}) -> +row_to_map(#{id := Id, jid := From, packet := Stanza}) -> {Microseconds, _} = mod_mam_utils:decode_compact_uuid(Id), StanzaID = mod_mam_utils:mess_id_to_external_binary(Id), - Map = #{<<"sender">> => From, <<"timestamp">> => Microseconds, - <<"stanza_id">> => StanzaID, <<"stanza">> => Msg}, + stanza_result(From, Microseconds, StanzaID, Stanza). + +stanza_result(From, Timestamp, StanzaID, Stanza) -> + Map = #{<<"sender">> => From, <<"timestamp">> => Timestamp, + <<"stanza_id">> => StanzaID, <<"stanza">> => Stanza}, {ok, Map}. diff --git a/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl new file mode 100644 index 00000000000..ac4594090b3 --- /dev/null +++ b/src/graphql/user/mongoose_graphql_stanza_user_subscription.erl @@ -0,0 +1,20 @@ +-module(mongoose_graphql_stanza_user_subscription). +-behaviour(mongoose_graphql). + +-import(mongoose_graphql_helper, [format_result/2]). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(Ctx, _Obj, <<"getMessages">>, #{}) -> + get_messages(Ctx). + +get_messages(#{event := terminate, stream := Session}) -> + {ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]}; +get_messages(#{event := Event}) -> + mongoose_graphql_stanza_helper:handle_event(Event); +get_messages(#{user := Jid}) -> + {ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}. diff --git a/src/graphql/user/mongoose_graphql_user_subscription.erl b/src/graphql/user/mongoose_graphql_user_subscription.erl new file mode 100644 index 00000000000..39721f5a6ae --- /dev/null +++ b/src/graphql/user/mongoose_graphql_user_subscription.erl @@ -0,0 +1,11 @@ +-module(mongoose_graphql_user_subscription). +-behaviour(mongoose_graphql). + +-export([execute/4]). + +-ignore_xref([execute/4]). + +-include("../mongoose_graphql_types.hrl"). + +execute(_Ctx, _Obj, <<"stanza">>, _Args) -> + {ok, stanza}.