diff --git a/doc/configuration/outgoing-connections.md b/doc/configuration/outgoing-connections.md index ed99a563ffa..87808ad81bd 100644 --- a/doc/configuration/outgoing-connections.md +++ b/doc/configuration/outgoing-connections.md @@ -30,6 +30,10 @@ This allows you to create multiple dedicated pools of the same type. * `host` - the pool will be started for each XMPP host or host type served by MongooseIM * `single_host` - the pool will be started for the selected host or host type only (you must provide the name). + !!! Note + A global pool with name `default` is used by services that are not configured by host_type, like `service_domain_db` or `service_mongoose_system_metrics`, or by modules that don't support dynamic domains, like `mod_pubsub`. + If a global pool is not configured, these services will fail. + ## Worker pool options All pools are managed by the [inaka/worker_pool](https://github.com/inaka/worker_pool) library. diff --git a/src/rdbms/mongoose_rdbms.erl b/src/rdbms/mongoose_rdbms.erl index 19ba47ad29b..c24f4ca52f2 100644 --- a/src/rdbms/mongoose_rdbms.erl +++ b/src/rdbms/mongoose_rdbms.erl @@ -67,19 +67,19 @@ %% External exports -export([prepare/4, prepared/1, - execute/3, - execute_cast/3, - execute_request/3, - execute_wrapped_request/4, - execute_successfully/3, - sql_query/2, - sql_query_cast/2, - sql_query_request/2, + execute/3, execute/4, + execute_cast/3, execute_cast/4, + execute_request/3, execute_request/4, + execute_wrapped_request/4, execute_wrapped_request/5, + execute_successfully/3, execute_successfully/4, + sql_query/2, sql_query/3, + sql_query_cast/2, sql_query_cast/3, + sql_query_request/2, sql_query_request/3, + sql_transaction/2, sql_transaction/3, + sql_transaction_request/2, sql_transaction_request/3, + sql_dirty/2, sql_dirty/3, sql_query_t/1, - sql_transaction/2, - sql_transaction_request/2, transaction_with_delayed_retry/3, - sql_dirty/2, to_bool/1, db_engine/1, db_type/0, @@ -127,10 +127,16 @@ terminate/2, code_change/3]). --ignore_xref([sql_query_cast/2, sql_query_request/2, - execute_cast/3, execute_request/3, +-ignore_xref([ + sql_query_cast/2, sql_query_request/2, + execute/4, execute_wrapped_request/5, + sql_query/3, sql_query_cast/3, sql_query_request/3, + sql_dirty/3, sql_transaction/3, + execute_successfully/4, send_request/3, + execute_cast/3, execute_cast/4, + execute_request/3, execute_request/4, execute_wrapped_request/4, - sql_transaction_request/2, + sql_transaction_request/2, sql_transaction_request/3, sql_query_t/1, use_escaped/1, escape_like/1, escape_like_prefix/1, use_escaped_like/1, escape_binary/2, use_escaped_binary/1, @@ -152,6 +158,7 @@ }). -type state() :: #state{}. +-define(DEFAULT_POOL, default). -define(STATE_KEY, mongoose_rdbms_state). -define(MAX_TRANSACTION_RESTARTS, 10). -define(TRANSACTION_TIMEOUT, 60000). % milliseconds @@ -232,33 +239,62 @@ prepared(Name) -> -spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> query_result(). -execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_call(HostType, {sql_execute, Name, Parameters}). +execute(HostType, Name, Parameters) -> + execute(HostType, ?DEFAULT_POOL, Name, Parameters). + +-spec execute(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) -> + query_result(). +execute(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) -> + sql_call(HostType, PoolName, {sql_execute, Name, Parameters}). -spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - query_result(). -execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_cast(HostType, {sql_execute, Name, Parameters}). + query_result(). +execute_cast(HostType, Name, Parameters) -> + execute_cast(HostType, ?DEFAULT_POOL, Name, Parameters). + +-spec execute_cast(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) -> + query_result(). +execute_cast(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) -> + sql_cast(HostType, PoolName, {sql_execute, Name, Parameters}). -spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - request_id(). + request_id(). execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) -> - sql_request(HostType, {sql_execute, Name, Parameters}). + execute_request(HostType, ?DEFAULT_POOL, Name, Parameters). + +-spec execute_request(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) -> + request_id(). +execute_request(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) -> + sql_request(HostType, PoolName, {sql_execute, Name, Parameters}). + +-spec execute_wrapped_request( + HostType :: server(), + Name :: atom(), + Parameters :: [term()], + Wrapper :: request_wrapper()) -> request_id(). +execute_wrapped_request(HostType, Name, Parameters, Wrapper) -> + execute_wrapped_request(HostType, ?DEFAULT_POOL, Name, Parameters, Wrapper). -spec execute_wrapped_request( HostType :: server(), + PoolName :: atom(), Name :: atom(), Parameters :: [term()], Wrapper :: request_wrapper()) -> request_id(). -execute_wrapped_request(HostType, Name, Parameters, Wrapper) - when is_atom(Name), is_list(Parameters), is_function(Wrapper) -> - sql_request(HostType, {sql_execute_wrapped, Name, Parameters, Wrapper}). +execute_wrapped_request(HostType, PoolName, Name, Parameters, Wrapper) + when is_atom(PoolName), is_atom(Name), is_list(Parameters), is_function(Wrapper) -> + sql_request(HostType, PoolName, {sql_execute_wrapped, Name, Parameters, Wrapper}). %% Same as execute/3, but would fail loudly on any error. -spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) -> - query_result(). + query_result(). execute_successfully(HostType, Name, Parameters) -> - try execute(HostType, Name, Parameters) of + execute_successfully(HostType, ?DEFAULT_POOL, Name, Parameters). + +-spec execute_successfully(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) -> + query_result(). +execute_successfully(HostType, PoolName, Name, Parameters) -> + try execute(HostType, PoolName, Name, Parameters) of {selected, _} = Result -> Result; {updated, _} = Result -> @@ -288,33 +324,53 @@ query_name_to_string(Name) -> -spec sql_query(HostType :: server(), Query :: any()) -> query_result(). sql_query(HostType, Query) -> - sql_call(HostType, {sql_query, Query}). + sql_query(HostType, ?DEFAULT_POOL, Query). + +-spec sql_query(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result(). +sql_query(HostType, PoolName, Query) -> + sql_call(HostType, PoolName, {sql_query, Query}). -spec sql_query_request(HostType :: server(), Query :: any()) -> request_id(). sql_query_request(HostType, Query) -> - sql_request(HostType, {sql_query, Query}). + sql_query_request(HostType, ?DEFAULT_POOL, Query). + +-spec sql_query_request(HostType :: server(), PoolName :: atom(), Query :: any()) -> request_id(). +sql_query_request(HostType, PoolName, Query) -> + sql_request(HostType, PoolName, {sql_query, Query}). -spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result(). sql_query_cast(HostType, Query) -> - sql_cast(HostType, {sql_query, Query}). + sql_query_cast(HostType, ?DEFAULT_POOL, Query). + +-spec sql_query_cast(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result(). +sql_query_cast(HostType, PoolName, Query) -> + sql_cast(HostType, PoolName, {sql_query, Query}). %% @doc SQL transaction based on a list of queries -spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result(). -sql_transaction(HostType, Queries) when is_list(Queries) -> +sql_transaction(HostType, Msg) -> + sql_transaction(HostType, ?DEFAULT_POOL, Msg). + +-spec sql_transaction(server(), atom(), fun() | maybe_improper_list()) -> transaction_result(). +sql_transaction(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) -> F = fun() -> lists:map(fun sql_query_t/1, Queries) end, - sql_transaction(HostType, F); + sql_transaction(HostType, PoolName, F); %% SQL transaction, based on a erlang anonymous function (F = fun) -sql_transaction(HostType, F) when is_function(F) -> - sql_call(HostType, {sql_transaction, F}). +sql_transaction(HostType, PoolName, F) when is_atom(PoolName), is_function(F) -> + sql_call(HostType, PoolName, {sql_transaction, F}). %% @doc SQL transaction based on a list of queries -spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id(). -sql_transaction_request(HostType, Queries) when is_list(Queries) -> +sql_transaction_request(HostType, Queries) -> + sql_transaction_request(HostType, ?DEFAULT_POOL, Queries). + +-spec sql_transaction_request(server(), atom(), fun() | maybe_improper_list()) -> request_id(). +sql_transaction_request(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) -> F = fun() -> lists:map(fun sql_query_t/1, Queries) end, - sql_transaction_request(HostType, F); + sql_transaction_request(HostType, PoolName, F); %% SQL transaction, based on a erlang anonymous function (F = fun) -sql_transaction_request(HostType, F) when is_function(F) -> - sql_request(HostType, {sql_transaction, F}). +sql_transaction_request(HostType, PoolName, F) when is_atom(PoolName), is_function(F) -> + sql_request(HostType, PoolName, {sql_transaction, F}). %% This function allows to specify delay between retries. -spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result(). @@ -343,57 +399,61 @@ do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) -> end. -spec sql_dirty(server(), fun()) -> any() | no_return(). -sql_dirty(HostType, F) when is_function(F) -> - case sql_call(HostType, {sql_dirty, F}) of +sql_dirty(HostType, F) -> + sql_dirty(HostType, ?DEFAULT_POOL, F). + +-spec sql_dirty(server(), atom(), fun()) -> any() | no_return(). +sql_dirty(HostType, PoolName, F) when is_function(F) -> + case sql_call(HostType, PoolName, {sql_dirty, F}) of {ok, Result} -> Result; {error, Error} -> throw(Error) end. %% TODO: Better spec for RPC calls --spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_call(HostType, Msg) -> +-spec sql_call(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_call(HostType, PoolName, Msg) -> case get_state() of - undefined -> sql_call0(HostType, Msg); + undefined -> sql_call0(HostType, PoolName, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_call0(HostType, Msg) -> +-spec sql_call0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_call0(HostType, PoolName, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:call(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}). --spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_request(HostType, Msg) -> +-spec sql_request(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_request(HostType, PoolName, Msg) -> case get_state() of - undefined -> sql_request0(HostType, Msg); + undefined -> sql_request0(HostType, PoolName, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_request0(HostType, Msg) -> +-spec sql_request0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_request0(HostType, PoolName, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:send_request(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:send_request(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}). --spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_cast(HostType, Msg) -> +-spec sql_cast(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_cast(HostType, PoolName, Msg) -> case get_state() of - undefined -> sql_cast0(HostType, Msg); + undefined -> sql_cast0(HostType, PoolName, Msg); State -> {Res, NewState} = nested_op(Msg, State), put_state(NewState), Res end. --spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any(). -sql_cast0(HostType, Msg) -> +-spec sql_cast0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any(). +sql_cast0(HostType, PoolName, Msg) -> Timestamp = erlang:monotonic_time(millisecond), - mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}). + mongoose_wpool:cast(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}). -spec get_db_info(Target :: server() | pid()) -> {ok, DbType :: atom(), DbRef :: term()} | {error, any()}. diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index 52f5acf26b6..81397e16168 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -38,7 +38,7 @@ -export([expand_pools/2]). -ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2, - send_request/2, send_request/4, send_request/5, + send_request/2, send_request/3, send_request/4, send_request/5, is_configured/2, is_configured/1, is_configured/1, start/2, start/3, start/5, start_configured_pools/1, start_configured_pools/2, stats/3, stop/1, stop/2]).