diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index a1bdbf1775b..24e51f7ecba 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -17,9 +17,7 @@ -module(rdbms_SUITE). -compile([export_all, nowarn_export_all]). --include_lib("escalus/include/escalus.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("exml/include/exml.hrl"). +-include_lib("eunit/include/eunit.hrl"). %% We need assert from it -include("mam_helper.hrl"). @@ -67,6 +65,7 @@ rdbms_queries_cases() -> select_like_prep_case, insert_batch_with_null_case, + test_cast_insert, arguments_from_two_tables]. suite() -> @@ -341,6 +340,19 @@ insert_batch_with_null_case(Config) -> ?assert_equal({selected, [{null}, {null}, {<<"check1">>}, {<<"check2">>}]}, selected_to_sorted(SelectResult)). +test_cast_insert(Config) -> + erase_table(Config), + sql_prepare(Config, insert_one, test_types, [unicode], + <<"INSERT INTO test_types(unicode) VALUES (?)">>), + sql_execute_cast(Config, insert_one, [<<"check1">>]), + sql_query_cast(Config, <<"INSERT INTO test_types(unicode) VALUES ('check2')">>), + mongoose_helper:wait_until( + fun() -> + SelectResult = sql_query(Config, "SELECT unicode FROM test_types"), + ?assertEqual({selected, [{<<"check1">>}, {<<"check2">>}]}, + selected_to_sorted(SelectResult)) + end, ok, #{name => cast_queries}). + %%-------------------------------------------------------------------- %% Text searching %%-------------------------------------------------------------------- @@ -365,6 +377,12 @@ sql_prepare(_Config, Name, Table, Fields, Query) -> sql_execute(_Config, Name, Parameters) -> slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]). +sql_execute_cast(_Config, Name, Parameters) -> + slow_rpc(mongoose_rdbms, execute_cast, [host_type(), Name, Parameters]). + +sql_query_cast(_Config, Query) -> + slow_rpc(mongoose_rdbms, sql_query_cast, [host_type(), Query]). + escape_null(_Config) -> escalus_ejabberd:rpc(mongoose_rdbms, escape_null, []). diff --git a/src/rdbms/mongoose_rdbms.erl b/src/rdbms/mongoose_rdbms.erl index 8646b9883d6..94f11d512ca 100644 --- a/src/rdbms/mongoose_rdbms.erl +++ b/src/rdbms/mongoose_rdbms.erl @@ -60,8 +60,10 @@ -export([prepare/4, prepared/1, execute/3, + execute_cast/3, execute_successfully/3, sql_query/2, + sql_query_cast/2, sql_query_t/1, sql_transaction/2, transaction_with_delayed_retry/3, @@ -114,7 +116,9 @@ terminate/2, code_change/3]). --ignore_xref([behaviour_info/1, print_state/1, sql_query_t/1, use_escaped/1, +-ignore_xref([behaviour_info/1, print_state/1, + sql_query_cast/2, execute_cast/3, + sql_query_t/1, use_escaped/1, escape_like/1, escape_like_prefix/1, use_escaped_like/1, escape_binary/2, use_escaped_binary/1, escape_integer/1, use_escaped_integer/1, @@ -188,6 +192,11 @@ prepared(Name) -> execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> sql_call(HostType, {sql_execute, Name, Parameters}). +-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> + query_result(). +execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> + sql_cast(HostType, {sql_execute, Name, Parameters}). + %% Same as execute/3, but would fail loudly on any error. -spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> query_result(). @@ -224,6 +233,10 @@ query_name_to_string(Name) -> sql_query(HostType, Query) -> sql_call(HostType, {sql_query, Query}). +-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result(). +sql_query_cast(HostType, Query) -> + sql_cast(HostType, {sql_query, Query}). + %% @doc SQL transaction based on a list of queries -spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result(). sql_transaction(HostType, Queries) when is_list(Queries) -> @@ -282,6 +295,21 @@ sql_call0(HostType, Msg) -> Timestamp = erlang:monotonic_time(millisecond), mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}). +-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any(). +sql_cast(HostType, Msg) -> + case get_state() of + undefined -> sql_cast0(HostType, Msg); + State -> + {Res, NewState} = nested_op(Msg, State), + put_state(NewState), + Res + end. + +-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any(). +sql_cast0(HostType, Msg) -> + Timestamp = erlang:monotonic_time(millisecond), + mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + -spec get_db_info(Target :: server() | pid()) -> {ok, DbType :: atom(), DbRef :: term()} | {error, any()}. get_db_info(Pid) when is_pid(Pid) -> @@ -519,13 +547,23 @@ init(Opts) -> handle_call({sql_cmd, Command, Timestamp}, From, State) -> - run_sql_cmd(Command, From, State, Timestamp); + {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp), + case abort_on_driver_error(Result) of + {stop, Reason} -> {stop, Reason, Result, NewState}; + continue -> {reply, Result, NewState} + end; handle_call(get_db_info, _, #state{db_ref = DbRef} = State) -> {reply, {ok, db_engine(global), DbRef}, State}; handle_call(Request, From, State) -> ?UNEXPECTED_CALL(Request, From), {reply, {error, badarg}, State}. +handle_cast({sql_cmd, Command, Timestamp}, State) -> + {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp), + case abort_on_driver_error(Result) of + {stop, Reason} -> {stop, Reason, NewState}; + continue -> {noreply, NewState} + end; handle_cast(Request, State) -> ?UNEXPECTED_CAST(Request), {noreply, State}. @@ -563,16 +601,15 @@ print_state(State) -> %%%---------------------------------------------------------------------- -spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) -> - {reply, Reply :: any(), state()} | {stop, Reason :: term(), state()} | - {noreply, state()}. + {Result :: term(), state()}. run_sql_cmd(Command, _From, State, Timestamp) -> Now = erlang:monotonic_time(millisecond), case Now - Timestamp of Age when Age < ?TRANSACTION_TIMEOUT -> - abort_on_driver_error(outer_op(Command, State)); + outer_op(Command, State); Age -> ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow, - text => <<"Discarding request">>, age => Age, command => Command}), + text => <<"Discarding request">>, age => Age, command => Command}), {reply, {error, timeout}, State} end. @@ -720,18 +757,13 @@ lookup_statement(Name) -> [] -> error({lookup_statement_failed, Name}) end. -%% @doc Generate the OTP callback return tuple depending on the driver result. --spec abort_on_driver_error({_, state()}) -> - {reply, Reply :: term(), state()} | - {stop, timeout | closed, state()}. -abort_on_driver_error({{error, "query timed out"} = Reply, State}) -> - %% mysql driver error - {stop, timeout, Reply, State}; -abort_on_driver_error({{error, "Failed sending data on socket" ++ _} = Reply, State}) -> - %% mysql driver error - {stop, closed, Reply, State}; -abort_on_driver_error({Reply, State}) -> - {reply, Reply, State}. +-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}. +abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error + {stop, timeout}; +abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error + {stop, closed}; +abort_on_driver_error(_) -> + continue. -spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined. db_engine(_HostType) ->