Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rdbms casts #3485

Merged
merged 2 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
-module(rdbms_SUITE).
-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").

%% We need assert from it
-include("mam_helper.hrl").
Expand Down Expand Up @@ -67,6 +65,7 @@ rdbms_queries_cases() ->
select_like_prep_case,

insert_batch_with_null_case,
test_cast_insert,
arguments_from_two_tables].

suite() ->
Expand Down Expand Up @@ -341,6 +340,19 @@ insert_batch_with_null_case(Config) ->
?assert_equal({selected, [{null}, {null}, {<<"check1">>}, {<<"check2">>}]},
selected_to_sorted(SelectResult)).

test_cast_insert(Config) ->
erase_table(Config),
sql_prepare(Config, insert_one, test_types, [unicode],
<<"INSERT INTO test_types(unicode) VALUES (?)">>),
sql_execute_cast(Config, insert_one, [<<"check1">>]),
sql_query_cast(Config, <<"INSERT INTO test_types(unicode) VALUES ('check2')">>),
mongoose_helper:wait_until(
fun() ->
SelectResult = sql_query(Config, "SELECT unicode FROM test_types"),
?assertEqual({selected, [{<<"check1">>}, {<<"check2">>}]},
selected_to_sorted(SelectResult))
end, ok, #{name => cast_queries}).

%%--------------------------------------------------------------------
%% Text searching
%%--------------------------------------------------------------------
Expand All @@ -365,6 +377,12 @@ sql_prepare(_Config, Name, Table, Fields, Query) ->
sql_execute(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]).

sql_execute_cast(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute_cast, [host_type(), Name, Parameters]).

sql_query_cast(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_query_cast, [host_type(), Query]).

escape_null(_Config) ->
escalus_ejabberd:rpc(mongoose_rdbms, escape_null, []).

Expand Down
68 changes: 50 additions & 18 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
-export([prepare/4,
prepared/1,
execute/3,
execute_cast/3,
execute_successfully/3,
sql_query/2,
sql_query_cast/2,
sql_query_t/1,
sql_transaction/2,
transaction_with_delayed_retry/3,
Expand Down Expand Up @@ -114,7 +116,9 @@
terminate/2,
code_change/3]).

-ignore_xref([behaviour_info/1, print_state/1, sql_query_t/1, use_escaped/1,
-ignore_xref([behaviour_info/1, print_state/1,
sql_query_cast/2, execute_cast/3,
sql_query_t/1, use_escaped/1,
escape_like/1, escape_like_prefix/1, use_escaped_like/1,
escape_binary/2, use_escaped_binary/1,
escape_integer/1, use_escaped_integer/1,
Expand Down Expand Up @@ -188,6 +192,11 @@ prepared(Name) ->
execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_call(HostType, {sql_execute, Name, Parameters}).

-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_cast(HostType, {sql_execute, Name, Parameters}).

%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
Expand Down Expand Up @@ -224,6 +233,10 @@ query_name_to_string(Name) ->
sql_query(HostType, Query) ->
sql_call(HostType, {sql_query, Query}).

-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
sql_cast(HostType, {sql_query, Query}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Queries) when is_list(Queries) ->
Expand Down Expand Up @@ -282,6 +295,21 @@ sql_call0(HostType, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}).

-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, Msg) ->
case get_state() of
undefined -> sql_cast0(HostType, Msg);
State ->
{Res, NewState} = nested_op(Msg, State),
put_state(NewState),
Res
end.

-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}).

-spec get_db_info(Target :: server() | pid()) ->
{ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Pid) when is_pid(Pid) ->
Expand Down Expand Up @@ -519,13 +547,23 @@ init(Opts) ->


handle_call({sql_cmd, Command, Timestamp}, From, State) ->
run_sql_cmd(Command, From, State, Timestamp);
{Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
case abort_on_driver_error(Result) of
{stop, Reason} -> {stop, Reason, Result, NewState};
continue -> {reply, Result, NewState}
end;
handle_call(get_db_info, _, #state{db_ref = DbRef} = State) ->
{reply, {ok, db_engine(global), DbRef}, State};
handle_call(Request, From, State) ->
?UNEXPECTED_CALL(Request, From),
{reply, {error, badarg}, State}.

handle_cast({sql_cmd, Command, Timestamp}, State) ->
{Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
case abort_on_driver_error(Result) of
{stop, Reason} -> {stop, Reason, NewState};
continue -> {noreply, NewState}
end;
handle_cast(Request, State) ->
?UNEXPECTED_CAST(Request),
{noreply, State}.
Expand Down Expand Up @@ -563,16 +601,15 @@ print_state(State) ->
%%%----------------------------------------------------------------------

-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
{reply, Reply :: any(), state()} | {stop, Reason :: term(), state()} |
{noreply, state()}.
{Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
Now = erlang:monotonic_time(millisecond),
case Now - Timestamp of
Age when Age < ?TRANSACTION_TIMEOUT ->
abort_on_driver_error(outer_op(Command, State));
outer_op(Command, State);
Age ->
?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
text => <<"Discarding request">>, age => Age, command => Command}),
text => <<"Discarding request">>, age => Age, command => Command}),
{reply, {error, timeout}, State}
end.

Expand Down Expand Up @@ -720,18 +757,13 @@ lookup_statement(Name) ->
[] -> error({lookup_statement_failed, Name})
end.

%% @doc Generate the OTP callback return tuple depending on the driver result.
-spec abort_on_driver_error({_, state()}) ->
{reply, Reply :: term(), state()} |
{stop, timeout | closed, state()}.
abort_on_driver_error({{error, "query timed out"} = Reply, State}) ->
%% mysql driver error
{stop, timeout, Reply, State};
abort_on_driver_error({{error, "Failed sending data on socket" ++ _} = Reply, State}) ->
%% mysql driver error
{stop, closed, Reply, State};
abort_on_driver_error({Reply, State}) ->
{reply, Reply, State}.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error
{stop, timeout};
abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error
{stop, closed};
abort_on_driver_error(_) ->
continue.

-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
Expand Down