Skip to content

Commit

Permalink
Periodically reconcile virtual host processes
Browse files Browse the repository at this point in the history
up to 10 times.

When a cluster is formed from scratch and a virtual
host is declared in the process (can be via
definitions, or plugins, or any other way),
that virtual host's process tree will be started
on all reachable cluster nodes at that moment
in time.

However, this can be just a subset of all nodes
expected to join the cluster.

The most effective solution is to run this
reconciliation process on a timer for up to
5 minutes by default. This matches how long
some other parts of RabbitMQ (3.x) expect
cluster formation to take, at most.

Per discussion with @dcorbacho @mkuratczyk.
  • Loading branch information
Michael Klishin authored and michaelklishin committed Jun 4, 2024
1 parent 096015b commit ff0cf27
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 8 deletions.
2 changes: 2 additions & 0 deletions deps/oauth2_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ APP_MODULE = "oauth2_client_app"
# gazelle:erlang_app_extra_app inets
# gazelle:erlang_app_extra_app crypto
# gazelle:erlang_app_extra_app public_key
# gazelle:erlang_app_extra_app cowlib

# gazelle:erlang_app_dep_exclude rabbit

Expand All @@ -44,6 +45,7 @@ rabbitmq_app(
"inets",
"ssl",
"public_key",
"cowlib",
],
license_files = [":license_files"],
priv = [":priv"],
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ _APP_ENV = """[
{dead_letter_worker_publisher_confirm_timeout, 180000},
%% EOL date for the current release series, if known/announced
{release_series_eol_date, none}
{release_series_eol_date, none},
{vhost_process_reconciliation_run_interval, 30}
]
"""

Expand Down
13 changes: 7 additions & 6 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ define PROJECT_ENV
%% interval at which connection/channel tracking executes post operations
{tracking_execution_timeout, 15000},
{stream_messages_soft_limit, 256},
{track_auth_attempt_source, false},
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
%% EOL date for the current release series, if known/announced
{release_series_eol_date, none}
{track_auth_attempt_source, false},
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
%% EOL date for the current release series, if known/announced
{release_series_eol_date, none},
{vhost_process_reconciliation_run_interval, 30}
]
endef

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down Expand Up @@ -511,6 +512,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down Expand Up @@ -802,6 +804,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_vhost_sup.erl",
"src/rabbit_vhost_sup_sup.erl",
"src/rabbit_vhost_sup_wrapper.erl",
"src/rabbit_vhosts.erl",
"src/rabbit_vm.erl",
"src/supervised_lifecycle.erl",
"src/tcp_listener.erl",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,15 @@
{mfa, {logger, debug, ["'networking' boot step skipped and moved to end of startup", [], #{domain => ?RMQLOG_DOMAIN_GLOBAL}]}},
{requires, notify_cluster}]}).

%% This mechanism is necessary in environments where a cluster is formed in parallel,
%% which is the case with many container orchestration tools.
%% In such scenarios, a virtual host can be declared before the cluster is formed and all
%% cluster members are known, e.g. via definition import.
%%
%% The step invokes rabbit_vhosts:boot/0 and is declared to require the
%% notify_cluster step.
-rabbit_boot_step({virtual_host_reconciliation,
[{description, "makes sure all virtual host have running processes on all nodes"},
{mfa, {rabbit_vhosts, boot, []}},
{requires, notify_cluster}]}).

-rabbit_boot_step({pg_local,
[{description, "local-only pg scope"},
{mfa, {rabbit, pg_local, []}},
Expand Down
109 changes: 109 additions & 0 deletions deps/rabbit/src/rabbit_vhosts.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% This module exists to avoid circular module dependencies between
%% several other virtual host-related modules.
-module(rabbit_vhosts).

-define(PERSISTENT_TERM_COUNTER_KEY, rabbit_vhosts_reconciliation_run_counter).

%% API

-export([
list_names/0,
exists/1,
boot/0,
reconcile/0,
start_processes_for_all/0,
start_on_all_nodes/2
]).

%% Tells whether a virtual host with the given name exists.
%% Delegates to rabbit_db_vhost:exists/1; same as rabbit_vhost:exists/1.
-spec exists(vhost:name()) -> boolean().
exists(Name) ->
    rabbit_db_vhost:exists(Name).

%% Returns the virtual host names reported by rabbit_db_vhost:list/0.
%% Same as rabbit_vhost:list_names/0.
-spec list_names() -> [vhost:name()].
list_names() ->
    rabbit_db_vhost:list().

%% Entry point of the virtual_host_reconciliation boot step.
%% Runs the first reconciliation round, bumps the run counter and, unless the
%% run limit has already been reached, schedules reconcile/0 on a timer.
-spec boot() -> 'ok'.
boot() ->
    %% The intermediate results are intentionally discarded: this function
    %% always reports 'ok' to the boot step machinery.
    _ = start_processes_for_all(),
    _ = increment_run_counter(),
    _ = maybe_start_timer(reconcile),
    ok.

%% Runs one round of virtual host process reconciliation
%% (see start_processes_for_all/1), records it in the run counter and
%% asks maybe_start_timer/1 to schedule the next round of this function.
-spec reconcile() -> 'ok'.
reconcile() ->
    rabbit_log:debug("Will reconcile virtual host processes on all cluster members..."),
    _ = start_processes_for_all(),
    _ = increment_run_counter(),
    %% Read the counter back after the increment so the log shows the number
    %% of the run that has just completed.
    RunNumber = get_run_counter(),
    rabbit_log:debug("Done with virtual host processes reconciliation (run ~tp)", [RunNumber]),
    _ = maybe_start_timer(reconcile),
    ok.

%% Starts the processes of every known virtual host on each of the specified nodes.
%% Only exists to allow for "virtual host process repair"
%% in clusters where nodes are booted in parallel and seeded
%% (e.g. using definitions) at the same time.
%%
%% In that case, during virtual host insertion into the schema database,
%% some processes predictably won't be started on the yet-to-be-discovered nodes.
%%
%% Nodes is the list of nodes to start the processes on.
%%
%% Always returns 'ok', in line with the declared spec: a failure to reconcile
%% one virtual host is logged and does not prevent the remaining ones from
%% being processed.
-spec start_processes_for_all([node()]) -> rabbit_types:ok_or_error(any()).
start_processes_for_all(Nodes) ->
    Names = list_names(),
    rabbit_log:debug("Will make sure that processes of ~p virtual hosts are running on all reachable cluster nodes", [length(Names)]),
    %% lists:foreach/2 is used for its side effects only and returns 'ok',
    %% so no list of per-virtual host results leaks out to the caller.
    lists:foreach(
      fun(VH) ->
              try
                  %% NOTE(review): a non-exception error value returned here is
                  %% ignored, as before; confirm that is intended.
                  start_on_all_nodes(VH, Nodes)
              catch
                  %% Best effort: log and move on to the next virtual host.
                  _:Err:_Stacktrace ->
                      rabbit_log:error("Could not reconcile virtual host ~ts: ~tp", [VH, Err])
              end
      end,
      Names).

%% Runs start_processes_for_all/1 against the nodes returned by
%% rabbit_nodes:list_reachable/0.
-spec start_processes_for_all() -> rabbit_types:ok_or_error(any()).
start_processes_for_all() ->
    ReachableNodes = rabbit_nodes:list_reachable(),
    start_processes_for_all(ReachableNodes).

%% Starts the processes of a single virtual host on the given nodes.
%% Same as rabbit_vhost_sup_sup:start_on_all_nodes/2, to which it delegates.
%% NOTE(review): the spec declares boolean(); confirm this matches what
%% rabbit_vhost_sup_sup:start_on_all_nodes/2 actually returns.
-spec start_on_all_nodes(vhost:name(), [node()]) -> boolean().
start_on_all_nodes(Name, TargetNodes) ->
    rabbit_vhost_sup_sup:start_on_all_nodes(Name, TargetNodes).

%%
%% Implementation
%%

%% Reads the number of reconciliation runs recorded so far from persistent_term.
%% Falls back to 0 when the counter key has not been stored yet.
-spec get_run_counter() -> non_neg_integer().
get_run_counter() ->
    InitialCount = 0,
    persistent_term:get(?PERSISTENT_TERM_COUNTER_KEY, InitialCount).

%% Bumps the reconciliation run counter by one.
%% Returns the counter value as it was BEFORE the increment.
%% The read and the write are two separate persistent_term operations.
-spec increment_run_counter() -> non_neg_integer().
increment_run_counter() ->
    Previous = get_run_counter(),
    ok = persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, Previous + 1),
    Previous.

%% Schedules another call to ?MODULE:FunName/0 unless ten runs have already
%% been recorded in the run counter.
%%
%% The delay is read from the rabbit application's
%% vhost_process_reconciliation_run_interval setting (in seconds, 30 if unset)
%% and converted to milliseconds for timer:apply_after/4.
%%
%% Returns 'ok' when no further run is scheduled, otherwise whatever
%% timer:apply_after/4 returns.
-spec maybe_start_timer(atom()) -> ok | {ok, timer:tref()} | {error, any()}.
maybe_start_timer(FunName) ->
    RunsSoFar = get_run_counter(),
    IntervalSecs = application:get_env(rabbit, vhost_process_reconciliation_run_interval, 30),
    if
        RunsSoFar >= 10 ->
            %% Do nothing after ten runs
            rabbit_log:debug("Will stop virtual host process reconciliation after ~tp runs", [RunsSoFar]),
            ok;
        true ->
            rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [IntervalSecs]),
            IntervalMillis = IntervalSecs * 1000,
            timer:apply_after(IntervalMillis, ?MODULE, FunName, [])
    end.
4 changes: 3 additions & 1 deletion deps/rabbitmq_auth_backend_http/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ all_srcs(name = "all_srcs")

test_suite_beam_files(name = "test_suite_beam_files")

# gazelle:erlang_app_extra_app crypto
# gazelle:erlang_app_extra_app inets
# gazelle:erlang_app_extra_app ssl
# gazelle:erlang_app_extra_app public_key
# gazelle:erlang_app_dep rabbit

Expand Down Expand Up @@ -106,7 +108,7 @@ rabbitmq_integration_suite(
"test/auth_http_mock.beam",
],
deps = [
"@cowboy//:erlang_app"
"@cowboy//:erlang_app",
],
)

Expand Down
1 change: 1 addition & 0 deletions moduleindex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ rabbit:
- rabbit_vhost_sup
- rabbit_vhost_sup_sup
- rabbit_vhost_sup_wrapper
- rabbit_vhosts
- rabbit_vm
- supervised_lifecycle
- tcp_listener
Expand Down

0 comments on commit ff0cf27

Please sign in to comment.