Skip to content

Commit

Permalink
Merge pull request #3485 from esl/rdbms_casts
Browse files Browse the repository at this point in the history
Rdbms casts
  • Loading branch information
arcusfelis authored Jan 4, 2022
2 parents 1262811 + 73e02d9 commit ac670f8
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 21 deletions.
24 changes: 21 additions & 3 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
-module(rdbms_SUITE).
-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").

%% We need assert from it
-include("mam_helper.hrl").
Expand Down Expand Up @@ -67,6 +65,7 @@ rdbms_queries_cases() ->
select_like_prep_case,

insert_batch_with_null_case,
test_cast_insert,
arguments_from_two_tables].

suite() ->
Expand Down Expand Up @@ -341,6 +340,19 @@ insert_batch_with_null_case(Config) ->
?assert_equal({selected, [{null}, {null}, {<<"check1">>}, {<<"check2">>}]},
selected_to_sorted(SelectResult)).

test_cast_insert(Config) ->
erase_table(Config),
sql_prepare(Config, insert_one, test_types, [unicode],
<<"INSERT INTO test_types(unicode) VALUES (?)">>),
sql_execute_cast(Config, insert_one, [<<"check1">>]),
sql_query_cast(Config, <<"INSERT INTO test_types(unicode) VALUES ('check2')">>),
mongoose_helper:wait_until(
fun() ->
SelectResult = sql_query(Config, "SELECT unicode FROM test_types"),
?assertEqual({selected, [{<<"check1">>}, {<<"check2">>}]},
selected_to_sorted(SelectResult))
end, ok, #{name => cast_queries}).

%%--------------------------------------------------------------------
%% Text searching
%%--------------------------------------------------------------------
Expand All @@ -365,6 +377,12 @@ sql_prepare(_Config, Name, Table, Fields, Query) ->
sql_execute(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]).

sql_execute_cast(_Config, Name, Parameters) ->
slow_rpc(mongoose_rdbms, execute_cast, [host_type(), Name, Parameters]).

sql_query_cast(_Config, Query) ->
slow_rpc(mongoose_rdbms, sql_query_cast, [host_type(), Query]).

escape_null(_Config) ->
escalus_ejabberd:rpc(mongoose_rdbms, escape_null, []).

Expand Down
68 changes: 50 additions & 18 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
-export([prepare/4,
prepared/1,
execute/3,
execute_cast/3,
execute_successfully/3,
sql_query/2,
sql_query_cast/2,
sql_query_t/1,
sql_transaction/2,
transaction_with_delayed_retry/3,
Expand Down Expand Up @@ -114,7 +116,9 @@
terminate/2,
code_change/3]).

-ignore_xref([behaviour_info/1, print_state/1, sql_query_t/1, use_escaped/1,
-ignore_xref([behaviour_info/1, print_state/1,
sql_query_cast/2, execute_cast/3,
sql_query_t/1, use_escaped/1,
escape_like/1, escape_like_prefix/1, use_escaped_like/1,
escape_binary/2, use_escaped_binary/1,
escape_integer/1, use_escaped_integer/1,
Expand Down Expand Up @@ -188,6 +192,11 @@ prepared(Name) ->
execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_call(HostType, {sql_execute, Name, Parameters}).

-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
sql_cast(HostType, {sql_execute, Name, Parameters}).

%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
query_result().
Expand Down Expand Up @@ -224,6 +233,10 @@ query_name_to_string(Name) ->
sql_query(HostType, Query) ->
sql_call(HostType, {sql_query, Query}).

-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
sql_cast(HostType, {sql_query, Query}).

%% @doc SQL transaction based on a list of queries
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Queries) when is_list(Queries) ->
Expand Down Expand Up @@ -282,6 +295,21 @@ sql_call0(HostType, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}).

-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, Msg) ->
case get_state() of
undefined -> sql_cast0(HostType, Msg);
State ->
{Res, NewState} = nested_op(Msg, State),
put_state(NewState),
Res
end.

-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, Msg) ->
Timestamp = erlang:monotonic_time(millisecond),
mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}).

-spec get_db_info(Target :: server() | pid()) ->
{ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Pid) when is_pid(Pid) ->
Expand Down Expand Up @@ -519,13 +547,23 @@ init(Opts) ->


handle_call({sql_cmd, Command, Timestamp}, From, State) ->
run_sql_cmd(Command, From, State, Timestamp);
{Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
case abort_on_driver_error(Result) of
{stop, Reason} -> {stop, Reason, Result, NewState};
continue -> {reply, Result, NewState}
end;
handle_call(get_db_info, _, #state{db_ref = DbRef} = State) ->
{reply, {ok, db_engine(global), DbRef}, State};
handle_call(Request, From, State) ->
?UNEXPECTED_CALL(Request, From),
{reply, {error, badarg}, State}.

handle_cast({sql_cmd, Command, Timestamp}, State) ->
{Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
case abort_on_driver_error(Result) of
{stop, Reason} -> {stop, Reason, NewState};
continue -> {noreply, NewState}
end;
handle_cast(Request, State) ->
?UNEXPECTED_CAST(Request),
{noreply, State}.
Expand Down Expand Up @@ -563,16 +601,15 @@ print_state(State) ->
%%%----------------------------------------------------------------------

-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
{reply, Reply :: any(), state()} | {stop, Reason :: term(), state()} |
{noreply, state()}.
{Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
Now = erlang:monotonic_time(millisecond),
case Now - Timestamp of
Age when Age < ?TRANSACTION_TIMEOUT ->
abort_on_driver_error(outer_op(Command, State));
outer_op(Command, State);
Age ->
?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
text => <<"Discarding request">>, age => Age, command => Command}),
text => <<"Discarding request">>, age => Age, command => Command}),
{reply, {error, timeout}, State}
end.

Expand Down Expand Up @@ -720,18 +757,13 @@ lookup_statement(Name) ->
[] -> error({lookup_statement_failed, Name})
end.

%% @doc Generate the OTP callback return tuple depending on the driver result.
-spec abort_on_driver_error({_, state()}) ->
{reply, Reply :: term(), state()} |
{stop, timeout | closed, state()}.
abort_on_driver_error({{error, "query timed out"} = Reply, State}) ->
%% mysql driver error
{stop, timeout, Reply, State};
abort_on_driver_error({{error, "Failed sending data on socket" ++ _} = Reply, State}) ->
%% mysql driver error
{stop, closed, Reply, State};
abort_on_driver_error({Reply, State}) ->
{reply, Reply, State}.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error
{stop, timeout};
abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error
{stop, closed};
abort_on_driver_error(_) ->
continue.

-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
Expand Down

0 comments on commit ac670f8

Please sign in to comment.