Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use maps for mongoose_wpool options #3645

Merged
merged 17 commits into from
May 13, 2022
218 changes: 90 additions & 128 deletions src/config/mongoose_config_spec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
-export([root/0]).

%% spec parts used by http handlers, modules and services
-export([wpool_items/0,
wpool_defaults/0,
-export([wpool/1,
iqdisc/0,
xmpp_listener_extra/1]).

Expand All @@ -23,11 +22,8 @@
process_sasl_mechanism/1,
process_auth/1,
process_pool/2,
process_cassandra_auth/1,
process_rdbms_connection/1,
process_ldap_connection/1,
process_riak_tls/1,
process_cassandra_server/1,
process_riak_credentials/1,
process_iqdisc/1,
process_acl_condition/1,
process_s2s_host_policy/1,
Expand Down Expand Up @@ -487,136 +483,137 @@ outgoing_pools() ->
Items = [{Type, #section{items = #{default => outgoing_pool(Type)},
validate_keys = non_empty,
wrap = none}} || Type <- PoolTypes],
#section{
items = maps:from_list(Items),
wrap = global_config
}.
#section{items = maps:from_list(Items),
wrap = global_config,
include = always}.

%% path: outgoing_pools.*.*
outgoing_pool(Type) ->
WPool = wpool_items(),
#section{
items = WPool#{<<"scope">> => #option{type = atom,
validate = {enum, [global, host, single_host]}},
<<"host">> => #option{type = binary,
validate = non_empty},
<<"connection">> => outgoing_pool_connection(Type)
},
process = fun ?MODULE:process_pool/2,
format_items = map,
wrap = item,
defaults = maps:merge(#{<<"scope">> => global}, wpool_defaults(Type))
}.

wpool_items() ->
#{<<"workers">> => #option{type = integer,
validate = positive},
<<"strategy">> => #option{type = atom,
validate = {enum, wpool_strategy_values()}},
<<"call_timeout">> => #option{type = integer,
validate = positive}
}.

wpool_defaults(<<"cassandra">>) ->
maps:merge(wpool_defaults(), #{<<"workers">> => 20});
wpool_defaults(<<"rdbms">>) ->
maps:merge(wpool_defaults(), #{<<"call_timeout">> => 60000});
wpool_defaults(_) ->
wpool_defaults().

wpool_defaults() ->
#{<<"workers">> => 10,
<<"strategy">> => best_worker,
<<"call_timeout">> => 5000}.
ExtraDefaults = extra_wpool_defaults(Type),
Pool = mongoose_config_utils:merge_sections(wpool(ExtraDefaults), outgoing_pool_extra(Type)),
Pool#section{wrap = item}.

extra_wpool_defaults(<<"cassandra">>) ->
#{<<"workers">> => 20};
extra_wpool_defaults(<<"rdbms">>) ->
#{<<"call_timeout">> => 60000};
extra_wpool_defaults(_) ->
#{}.

wpool(ExtraDefaults) ->
#section{items = #{<<"workers">> => #option{type = integer,
validate = positive},
<<"strategy">> => #option{type = atom,
validate = {enum, wpool_strategy_values()}},
<<"call_timeout">> => #option{type = integer,
validate = positive}
},
defaults = maps:merge(#{<<"workers">> => 10,
<<"strategy">> => best_worker,
<<"call_timeout">> => 5000}, ExtraDefaults),
format_items = map}.

outgoing_pool_extra(Type) ->
#section{items = #{<<"scope">> => #option{type = atom,
validate = {enum, [global, host, single_host]}},
<<"host">> => #option{type = binary,
validate = non_empty},
<<"connection">> => outgoing_pool_connection(Type)
},
process = fun ?MODULE:process_pool/2,
format_items = map,
defaults = #{<<"scope">> => global}
}.

%% path: outgoing_pools.*.*.connection
outgoing_pool_connection(<<"cassandra">>) ->
#section{
items = #{<<"servers">> => #list{items = cassandra_server()},
items = #{<<"servers">> => #list{items = cassandra_server(),
validate = unique_non_empty},
<<"keyspace">> => #option{type = atom,
validate = non_empty},
<<"auth">> => #section{items = #{<<"plain">> => cassandra_auth_plain()},
required = all,
process = fun ?MODULE:process_cassandra_auth/1},
format_items = map},
<<"tls">> => #section{items = tls_items(),
wrap = {kv, ssl},
process = fun ?MODULE:process_tls_sni/1}
},
format_items = map,
include = always,
defaults = #{<<"servers">> => [{"localhost", 9042}],
defaults = #{<<"servers">> => [#{host => "localhost", port => 9042}],
<<"keyspace">> => mongooseim}
};
outgoing_pool_connection(<<"elastic">>) ->
#section{
items = #{<<"host">> => #option{type = string,
items = #{<<"host">> => #option{type = binary,
validate = non_empty},
<<"port">> => #option{type = integer,
validate = port}
},
format_items = map,
include = always,
defaults = #{<<"host">> => "localhost",
defaults = #{<<"host">> => <<"localhost">>,
<<"port">> => 9200}
};
outgoing_pool_connection(<<"http">>) ->
#section{
items = #{<<"host">> => #option{type = string,
validate = non_empty,
wrap = {kv, server}},
<<"path_prefix">> => #option{type = string,
validate = non_empty},
<<"path_prefix">> => #option{type = binary,
validate = non_empty},
<<"request_timeout">> => #option{type = integer,
validate = non_negative},
<<"tls">> => #section{items = tls_items(),
wrap = {kv, http_opts},
process = fun ?MODULE:process_tls_sni/1}
},
format_items = map,
include = always,
defaults = #{<<"path_prefix">> => "/",
required = [<<"host">>],
defaults = #{<<"path_prefix">> => <<"/">>,
<<"request_timeout">> => 2000}
};
outgoing_pool_connection(<<"ldap">>) ->
#section{
items = #{<<"port">> => #option{type = integer,
items = #{<<"servers">> => #list{items = #option{type = string},
validate = unique_non_empty},
<<"port">> => #option{type = integer,
validate = port},
<<"rootdn">> => #option{type = binary},
<<"root_dn">> => #option{type = binary},
<<"password">> => #option{type = binary},
<<"encrypt">> => #option{type = atom,
validate = {enum, [none, tls]}},
<<"servers">> => #list{items = #option{type = string}},
<<"connect_interval">> => #option{type = integer,
validate = positive},
<<"tls">> => #section{items = tls_items(),
wrap = {kv, tls_options},
process = fun ?MODULE:process_tls_sni/1}
},
format_items = map,
include = always,
defaults = #{<<"rootdn">> => <<>>,
defaults = #{<<"servers">> => ["localhost"],
<<"root_dn">> => <<>>,
<<"password">> => <<>>,
<<"encrypt">> => none,
<<"servers">> => ["localhost"],
<<"connect_interval">> => 10000}
<<"connect_interval">> => 10000},
process = fun ?MODULE:process_ldap_connection/1
};
outgoing_pool_connection(<<"rabbit">>) ->
#section{
items = #{<<"amqp_host">> => #option{type = string,
validate = non_empty},
<<"amqp_port">> => #option{type = integer,
validate = port},
<<"amqp_username">> => #option{type = binary,
validate = non_empty},
<<"amqp_password">> => #option{type = binary,
validate = non_empty},
items = #{<<"host">> => #option{type = string,
validate = non_empty},
<<"port">> => #option{type = integer,
validate = port},
<<"username">> => #option{type = binary,
validate = non_empty},
<<"password">> => #option{type = binary,
validate = non_empty},
<<"confirms_enabled">> => #option{type = boolean},
<<"max_worker_queue_len">> => #option{type = int_or_infinity,
validate = non_negative}
},
format_items = map,
include = always,
defaults = #{<<"confirms_enabled">> => false,
defaults = #{<<"host">> => "localhost",
<<"port">> => 5672,
<<"username">> => <<"guest">>,
<<"password">> => <<"guest">>,
<<"confirms_enabled">> => false,
<<"max_worker_queue_len">> => 1000}
};
outgoing_pool_connection(<<"rdbms">>) ->
Expand All @@ -625,6 +622,8 @@ outgoing_pool_connection(<<"rdbms">>) ->
validate = {enum, [odbc, pgsql, mysql]}},
<<"keepalive_interval">> => #option{type = integer,
validate = positive},
<<"max_start_interval">> => #option{type = integer,
validate = positive},

% odbc
<<"settings">> => #option{type = string},
Expand All @@ -642,7 +641,9 @@ outgoing_pool_connection(<<"rdbms">>) ->
validate = port},
<<"tls">> => sql_tls()
},
process = fun ?MODULE:process_rdbms_connection/1,
required = [<<"driver">>],
defaults = #{<<"max_start_interval">> => 30},
process = fun mongoose_rdbms:process_options/1,
format_items = map
};
outgoing_pool_connection(<<"redis">>) ->
Expand Down Expand Up @@ -670,27 +671,29 @@ outgoing_pool_connection(<<"riak">>) ->
validate = port},
<<"credentials">> => riak_credentials(),
<<"tls">> => #section{items = tls_items(),
process = fun ?MODULE:process_riak_tls/1,
wrap = none}},
process = fun ?MODULE:process_riak_tls/1}},
required = [<<"address">>, <<"port">>],
format_items = map
}.

cassandra_server() ->
#section{
items = #{<<"ip_address">> => #option{type = string,
validate = non_empty},
items = #{<<"host">> => #option{type = string,
validate = non_empty},
<<"port">> => #option{type = integer,
validate = port}},
required = [<<"ip_address">>],
process = fun ?MODULE:process_cassandra_server/1
required = [<<"host">>],
defaults = #{<<"port">> => 9042},
format_items = map
}.

%% path: outgoing_pools.cassandra.*.connection.auth.plain
cassandra_auth_plain() ->
#section{
items = #{<<"username">> => #option{type = binary},
<<"password">> => #option{type = binary}},
required = all
required = all,
format_items = map
}.

%% path: outgoing_pools.riak.*.connection.credentials
Expand All @@ -701,7 +704,7 @@ riak_credentials() ->
<<"password">> => #option{type = string,
validate = non_empty}},
required = all,
process = fun ?MODULE:process_riak_credentials/1
format_items = map
}.

%% path: outgoing_pools.rdbms.*.connection.tls
Expand Down Expand Up @@ -1129,9 +1132,8 @@ check_auth_method(Method, Opts) ->
false -> error(#{what => missing_section_for_auth_method, auth_method => Method})
end.

process_pool([Tag, Type|_], AllOpts = #{scope := ScopeIn}) ->
process_pool([Tag, Type|_], AllOpts = #{scope := ScopeIn, connection := Connection}) ->
Scope = pool_scope(ScopeIn, maps:get(host, AllOpts, none)),
Connection = maps:get(connection, AllOpts, #{}),
Opts = maps:without([scope, host, connection], AllOpts),
#{type => b2a(Type),
scope => Scope,
Expand All @@ -1146,45 +1148,9 @@ pool_scope(single_host, Host) -> Host;
pool_scope(host, none) -> host;
pool_scope(global, none) -> global.
Premwoik marked this conversation as resolved.
Show resolved Hide resolved

process_cassandra_server(KVs) ->
{[[{ip_address, IPAddr}]], Opts} = proplists:split(KVs, [ip_address]),
case Opts of
[] -> IPAddr;
[{port, Port}] -> {IPAddr, Port}
end.

process_cassandra_auth([{plain, KVs}]) ->
{[[{username, User}], [{password, Pass}]], []} = proplists:split(KVs, [username, password]),
{cqerl_auth_plain_handler, [{User, Pass}]}.

process_rdbms_connection(Map) ->
KIMap = maps:with([keepalive_interval], Map),
maps:merge(KIMap, #{server => rdbms_server(Map)}).

rdbms_server(Opts = #{driver := odbc}) ->
maps:get(settings, Opts);
rdbms_server(Opts = #{host := Host, database := DB, username := User, password := Pass, driver := Driver}) ->
PortOpts = maps:get(port, Opts, none),
TLSOpts = maps:get(tls, Opts, none),
list_to_tuple([Driver, Host] ++ db_port(PortOpts) ++
[DB, User, Pass] ++ db_tls(Driver, TLSOpts)).

db_port(none) -> [];
db_port(Port) -> [Port].

db_tls(_, none) -> [];
db_tls(Driver, KVs) ->
{[ModeOpts], Opts} = proplists:split(KVs, [required]),
[ssl_mode(Driver, ModeOpts) ++ ssl_opts(Driver, Opts)].

ssl_mode(pgsql, [{required, true}]) -> [{ssl, required}];
ssl_mode(pgsql, [{required, false}]) -> [{ssl, true}];
ssl_mode(pgsql, []) -> [{ssl, true}];
ssl_mode(mysql, []) -> [].

ssl_opts(pgsql, []) -> [];
ssl_opts(pgsql, Opts) -> [{ssl_opts, Opts}];
ssl_opts(mysql, Opts) -> Opts.
process_ldap_connection(ConnOpts = #{port := _}) -> ConnOpts;
process_ldap_connection(ConnOpts = #{tls := _}) -> ConnOpts#{port => 636};
process_ldap_connection(ConnOpts) -> ConnOpts#{port => 389}.

process_riak_tls(KVs) ->
KVsWithSNI = process_tls_sni(KVs),
Expand Down Expand Up @@ -1216,10 +1182,6 @@ process_tls_sni(KVs) ->
[{server_name_indication, SNIHost} | SSLOpts]
end.

process_riak_credentials(KVs) ->
{[[{user, User}], [{password, Pass}]], []} = proplists:split(KVs, [user, password]),
{User, Pass}.

b2a(B) -> binary_to_atom(B, utf8).

a2b(A) -> atom_to_binary(A, utf8).
Expand Down
7 changes: 2 additions & 5 deletions src/event_pusher/mod_event_pusher_push.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ config_spec() ->
}.

wpool_spec() ->
WpoolDefaults = mongoose_config_spec:wpool_defaults(),
#section{items = mongoose_config_spec:wpool_items(),
defaults = WpoolDefaults#{<<"strategy">> := available_worker},
format_items = map,
include = always}.
Wpool = mongoose_config_spec:wpool(#{<<"strategy">> => available_worker}),
Wpool#section{include = always}.

%%--------------------------------------------------------------------
%% mod_event_pusher callbacks
Expand Down
2 changes: 1 addition & 1 deletion src/system_metrics/mongoose_system_metrics_collector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ extract_tls_options(#{tls := Opts}) ->
extract_tls_options(_) -> [].

get_outgoing_pools() ->
OutgoingPools = mongoose_config:get_opt(outgoing_pools, []),
OutgoingPools = mongoose_config:get_opt(outgoing_pools),
[#{report_name => outgoing_pools,
key => type,
value => Type} || #{type := Type} <- OutgoingPools].
Expand Down