Skip to content

Commit

Permalink
Add tests for SSE
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Nov 3, 2022
1 parent 199e75e commit 68c9ff2
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 50 deletions.
1 change: 1 addition & 0 deletions big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
{suites, "tests", extdisco_SUITE}.
{suites, "tests", gdpr_SUITE}.
{suites, "tests", graphql_SUITE}.
{suites, "tests", graphql_sse_SUITE}.
{suites, "tests", graphql_account_SUITE}.
{suites, "tests", graphql_domain_SUITE}.
{suites, "tests", graphql_inbox_SUITE}.
Expand Down
27 changes: 19 additions & 8 deletions big_tests/tests/graphql_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ execute(EpName, Body, Creds) ->
-spec execute(node(), atom(), binary(), {binary(), binary()} | undefined) ->
{Status :: tuple(), Data :: map()}.
execute(Node, EpName, Body, Creds) ->
Request =
#{port => get_listener_port(Node, EpName),
role => {graphql, EpName},
method => <<"POST">>,
return_maps => true,
creds => Creds,
path => "/graphql",
body => Body},
Request = build_request(Node, EpName, Body, Creds),
rest_helper:make_request(Request).

build_request(Node, EpName, Body, Creds) ->
#{port => get_listener_port(Node, EpName),
role => {graphql, EpName},
method => <<"POST">>,
return_maps => true,
creds => Creds,
path => "/graphql",
body => Body}.

execute_sse(EpName, Params, Creds) ->
#{node := Node} = mim(),
execute_sse(Node, EpName, Params, Creds).
Expand Down Expand Up @@ -213,6 +215,9 @@ get_unauthorized({Code, #{<<"errors">> := Errors}}) ->
get_bad_request({Code, _Msg}) ->
assert_response_code(bad_request, Code).

get_method_not_allowed({Code, _Msg}) ->
assert_response_code(method_not_allowed, Code).

get_coercion_err_msg({Code, #{<<"errors">> := [Error]}}) ->
assert_response_code(bad_request, Code),
?assertEqual(<<"input_coercion">>, get_value([extensions, code], Error)),
Expand All @@ -236,8 +241,14 @@ get_ok_value(Path, {Code, Data}) ->

assert_response_code(bad_request, {<<"400">>, <<"Bad Request">>}) -> ok;
assert_response_code(unauthorized, {<<"401">>, <<"Unauthorized">>}) -> ok;
assert_response_code(method_not_allowed, {<<"405">>, <<"Method Not Allowed">>}) -> ok;
assert_response_code(error, {<<"200">>, <<"OK">>}) -> ok;
assert_response_code(ok, {<<"200">>, <<"OK">>}) -> ok;
assert_response_code(bad_request, 400) -> ok;
assert_response_code(unauthorized, 401) -> ok;
assert_response_code(method_not_allowed, 405) -> ok;
assert_response_code(error, 200) -> ok;
assert_response_code(ok, 200) -> ok;
assert_response_code(bad_request, {exit_status, 1}) -> ok;
assert_response_code(error, {exit_status, 1}) -> ok;
assert_response_code(ok, {exit_status, 0}) -> ok;
Expand Down
138 changes: 138 additions & 0 deletions big_tests/tests/graphql_sse_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
%% @doc Tests for the SSE handling of GraphQL subscriptions
-module(graphql_sse_SUITE).

-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [mim/0, require_rpc_nodes/1, rpc/4]).
-import(graphql_helper, [get_bad_request/1, get_unauthorized/1, get_method_not_allowed/1,
build_request/4, execute_user_sse/3, build_request/4, make_creds/1,
execute_auth/2, execute_sse/3, execute_auth_sse/2]).

%% common_test callbacks

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

all() ->
[{group, admin},
{group, user}].

groups() ->
[{admin, [parallel], admin_tests()},
{user, [parallel], user_tests()}].

init_per_suite(Config) ->
Config1 = escalus:init_per_suite(Config),
application:ensure_all_started(gun),
Config1.

end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(user, Config) ->
graphql_helper:init_user(Config);
init_per_group(admin, Config) ->
graphql_helper:init_admin_handler(Config).

end_per_group(user, _Config) ->
escalus_fresh:clean(),
graphql_helper:clean();
end_per_group(_, _Config) ->
graphql_helper:clean().

init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

admin_tests() ->
[admin_missing_query,
admin_invalid_query_string,
admin_missing_creds,
admin_invalid_creds,
admin_invalid_method,
admin_invalid_operation_type].

user_tests() ->
[user_missing_query,
user_invalid_query_string,
user_missing_creds,
user_invalid_creds,
user_invalid_method,
user_invalid_operation_type].

%% Test cases and stories

admin_missing_query(Config) ->
get_bad_request(execute_auth_sse(#{}, Config)).

user_missing_query(Config) ->
escalus:fresh_story_with_config(Config, [{alice, 1}], fun user_missing_query_story/2).

user_missing_query_story(Config, Alice) ->
get_bad_request(execute_user_sse(#{}, Alice, Config)).

admin_invalid_query_string(_Config) ->
Port = graphql_helper:get_listener_port(admin),
get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", undefined, #{})).

user_invalid_query_string(Config) ->
escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_query_string_story/1).

user_invalid_query_string_story(Alice) ->
Port = graphql_helper:get_listener_port(user),
Creds = make_creds(Alice),
get_bad_request(sse_helper:connect_to_sse(Port, "/api/graphql/sse?=invalid", Creds, #{})).

admin_missing_creds(_Config) ->
get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, undefined)).

user_missing_creds(_Config) ->
get_unauthorized(execute_sse(user, #{query => doc()}, undefined)).

admin_invalid_creds(_Config) ->
Creds = {<<"invalid">>, <<"creds">>},
get_unauthorized(execute_sse(admin, #{query => doc(), variables => args()}, Creds)).

user_invalid_creds(_Config) ->
get_unauthorized(execute_sse(user, #{query => doc()}, {<<"invalid">>, <<"creds">>})).

admin_invalid_method(_Config) ->
#{node := Node} = mim(),
Request = build_request(Node, admin, #{query => doc(), variables => args()}, undefined),
%% POST was used, while SSE accepts only GET
get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})).

user_invalid_method(Config) ->
escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_method_story/1).

user_invalid_method_story(Alice) ->
#{node := Node} = mim(),
Request = build_request(Node, user, #{query => doc()}, make_creds(Alice)),
%% POST was used, while SSE accepts only GET
get_method_not_allowed(rest_helper:make_request(Request#{path => "/graphql/sse"})).

admin_invalid_operation_type(Config) ->
Creds = graphql_helper:make_admin_creds(admin, Config),
get_bad_request(execute_sse(admin, #{query => query_doc(), variables => args()}, Creds)).

user_invalid_operation_type(Config) ->
escalus:fresh_story(Config, [{alice, 1}], fun user_invalid_operation_type_story/1).

user_invalid_operation_type_story(Alice) ->
get_bad_request(execute_sse(user, #{query => query_doc()}, make_creds(Alice))).

%% Helpers

%% Subscription - works only with the SSE handler
doc() ->
graphql_helper:get_doc(<<"stanza">>, <<"getMessages">>).

%% Query - works only with the REST handler
query_doc() ->
graphql_helper:get_doc(<<"stanza">>, <<"getLastMessages">>).

%% Same args used by both operations - only for Admin
args() ->
#{caller => <<"alice@localhost">>}.
12 changes: 11 additions & 1 deletion big_tests/tests/graphql_stanza_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ admin_stanza_cases() ->
admin_send_stanza_from_unknown_domain].

admin_sse_cases() ->
[admin_get_messages].
[admin_get_messages,
admin_get_messages_for_unknown_user].

admin_mam_cases() ->
[admin_get_last_messages,
Expand Down Expand Up @@ -164,6 +165,12 @@ admin_send_message_story(Config, Alice, Bob) ->
assert_not_empty(StanzaId),
escalus:assert(is_message, escalus:wait_for_stanza(Bob)).

admin_get_messages_for_unknown_user(Config) ->
Domain = domain_helper:domain(),
Res = get_messages(<<"baduser@", Domain/binary>>, Config),
?assertEqual(<<"unknown_user">>, get_err_code(Res)),
?assertEqual(<<"User does not exist">>, get_err_msg(Res)).

admin_get_messages(Config) ->
escalus:fresh_story_with_config(Config, [{alice, 1}, {bob, 1}],
fun admin_get_messages_story/3).
Expand All @@ -172,6 +179,9 @@ admin_get_messages_story(Config, Alice, Bob) ->
From = escalus_client:full_jid(Alice),
To = escalus_client:short_jid(Bob),
{200, Stream} = get_messages(To, Config),
%% Presence should be skipped
escalus:send(Alice, escalus_stanza:presence_direct(To, <<"available">>)),
%% Message should be delivered
escalus:send(Alice, escalus_stanza:chat(From, To, <<"Hi!">>)),
Event = sse_helper:wait_for_event(Stream),
#{<<"stanza">> := StanzaBin, <<"sender">> := From} =
Expand Down
12 changes: 8 additions & 4 deletions big_tests/tests/sse_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,28 @@
-compile([export_all, nowarn_export_all]).

connect_to_sse(Port, Path, Creds, ExtraOpts) ->
ct:log("Connect to SSE, port: ~p, path: ~s, creds: ~p, extra opts: ~p",
[Port, Path, Creds, ExtraOpts]),
Headers = auth_headers(Creds) ++ [{<<"host">>, <<"localhost">>},
{<<"accept">>, <<"text/event-stream">>}],
Opts = maps:merge(basic_opts(), ExtraOpts),
{ok, ConnPid} = gun:open("localhost", Port, Opts),
{ok, _} = gun:await_up(ConnPid),
StreamRef = gun:get(ConnPid, Path, Headers),
{response, nofin, Status, _Headers} = gun:await(ConnPid, StreamRef),
Result = case Status of
200 ->
{response, nofin, Status, RespHeaders} = gun:await(ConnPid, StreamRef),
Result = case {Status, maps:from_list(RespHeaders)} of
{200, #{<<"content-type">> := <<"text/event-stream">>}} ->
#{pid => ConnPid, stream_ref => StreamRef};
_ ->
{ok, Body} = gun:await_body(ConnPid, StreamRef),
jiffy:decode(Body, [return_maps])
end,
ct:log("SSE response code: ~p, headers: ~p, result: ~p", [Status, RespHeaders, Result]),
{Status, Result}.

wait_for_event(#{pid := Pid, stream_ref := StreamRef}) ->
{sse, #{data := [Response]}} = gun:await(Pid, StreamRef),
{sse, #{data := [Response]} = Event} = gun:await(Pid, StreamRef),
ct:log("Received SSE event: ~p", [Event]),
jiffy:decode(Response, [return_maps]).

stop_sse(#{pid := Pid}) ->
Expand Down
4 changes: 3 additions & 1 deletion priv/graphql/schemas/admin/stanza.gql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type StanzaAdminMutation @protected{
@protected(type: GLOBAL)
}

type StanzaAdminSubscription {
type StanzaAdminSubscription @protected{
"Subscribe for the given user's incoming messages"
getMessages(caller: JID!): StanzaMap
@protected(type: DOMAIN, args: ["caller"])
}
3 changes: 2 additions & 1 deletion priv/graphql/schemas/user/stanza.gql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type StanzaUserMutation @protected{
sendStanza(stanza: Stanza): SendStanzaPayload
}

type StanzaUserSubscription {
type StanzaUserSubscription @protected{
"Subscribe to incoming messages"
getMessages: StanzaMap
}
10 changes: 8 additions & 2 deletions src/graphql/admin/mongoose_graphql_stanza_admin_subscription.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ execute(Ctx, _Obj, <<"getMessages">>, Args) ->
get_messages(Ctx, Args).

get_messages(#{event := terminate, stream := Session}, _) ->
{ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]};
mongoose_stanza_api:close_session(Session),
{ok, null, [{stream, closed}]};
get_messages(#{event := Event}, _) ->
mongoose_graphql_stanza_helper:handle_event(Event);
get_messages(_Ctx, #{<<"caller">> := Jid}) ->
{ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}.
case mongoose_stanza_api:open_session(Jid, true) of
{ok, Stream} ->
{ok, null, [{stream, Stream}]};
Error ->
format_result(Error, #{caller => Jid})
end.
2 changes: 2 additions & 0 deletions src/graphql/mongoose_graphql_errors.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ decode_err_msg(no_query_supplied) ->
"The query was not supplied in the request body";
decode_err_msg(invalid_json_body) ->
"The request JSON body is invalid";
decode_err_msg(invalid_query_parameters) ->
"The query string is invalid";
decode_err_msg(variables_invalid_json) ->
"The variables' JSON is invalid".

Expand Down
16 changes: 11 additions & 5 deletions src/graphql/mongoose_graphql_sse_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ handle_notify(Msg, State) ->

handle_info(Event, State = #{ep := Ep, req := Req = #{ctx := Ctx}, id := Id}) ->
Ctx1 = Ctx#{event => Event},
case mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}) of
{ok, #{data := null}} ->
{ok, #{data := Data} = Response} = mongoose_graphql:execute(Ep, Req#{ctx := Ctx1}),
case has_non_null_value(Data) of
false ->
{nosend, State};
{ok, #{} = Response} ->
Data = mongoose_graphql_response:term_to_json(Response),
{send, #{id => integer_to_binary(Id), data => Data},
true ->
EventData = mongoose_graphql_response:term_to_json(Response),
{send, #{id => integer_to_binary(Id), data => EventData},
State#{id := Id + 1}}
end.

%% Check if there is any value that is non-null. Any list is considered non-null.
has_non_null_value(M) when is_map(M) ->
lists:any(fun has_non_null_value/1, maps:values(M));
has_non_null_value(V) -> V =/= null.

handle_error(Msg, Reason, State) ->
?LOG_ERROR(#{what => mongoose_graphql_sse_handler_failed,
reason => Reason, text => Msg}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ execute(Ctx, _Obj, <<"getMessages">>, #{}) ->
get_messages(Ctx).

get_messages(#{event := terminate, stream := Session}) ->
{ok, null, [{stream, mongoose_stanza_event_api:close_session(Session)}]};
mongoose_stanza_api:close_session(Session),
{ok, null, [{stream, closed}]};
get_messages(#{event := Event}) ->
mongoose_graphql_stanza_helper:handle_event(Event);
get_messages(#{user := Jid}) ->
{ok, null, [{stream, mongoose_stanza_event_api:open_session(Jid)}]}.
{ok, Stream} = mongoose_stanza_api:open_session(Jid, false),
{ok, null, [{stream, Stream}]}.
4 changes: 2 additions & 2 deletions src/mongoose_client_api/mongoose_client_api_sse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ init(_InitArgs, _LastEvtId, Req) ->
maybe_init(Authorization, Req2, State#{id => 1}).

maybe_init(true, Req, #{jid := JID} = State) ->
Session = mongoose_stanza_event_api:open_session(JID),
Session = mongoose_stanza_api:open_session(JID, false),
{ok, Req, State#{session => Session}};
maybe_init(true, Req, State) ->
%% This is for OPTIONS method
Expand Down Expand Up @@ -54,7 +54,7 @@ handle_error(Msg, Reason, State) ->
{nosend, State}.

terminate(_Reason, _Req, #{session := Session}) ->
mongoose_stanza_event_api:close_session(Session),
mongoose_stanza_api:close_session(Session),
ok;
terminate(_Reason, _Req, _State) ->
ok.
Expand Down
Loading

0 comments on commit 68c9ff2

Please sign in to comment.