Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast various requests as per #4684 #4920

Merged
merged 9 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 64 additions & 6 deletions lighthouse/tests/validator_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use validator_client::Config;
use validator_client::{ApiTopic, Config};

use crate::exec::CommandLineTestExec;
use bls::{Keypair, PublicKeyBytes};
Expand Down Expand Up @@ -493,20 +493,78 @@ fn monitoring_endpoint() {
assert_eq!(api_conf.update_period_secs, Some(30));
});
}

#[test]
fn disable_run_on_all_default() {
fn disable_run_on_all_flag() {
CommandLineTest::new()
.flag("disable-run-on-all", None)
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![]);
});
// --broadcast flag takes precedence
CommandLineTest::new()
.flag("disable-run-on-all", None)
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}

#[test]
fn no_broadcast_flag() {
CommandLineTest::new().run().with_config(|config| {
assert!(!config.disable_run_on_all);
assert_eq!(config.broadcast_topics, vec![ApiTopic::Subscriptions]);
});
}

#[test]
fn disable_run_on_all() {
fn broadcast_flag() {
// "none" variant
CommandLineTest::new()
.flag("disable-run-on-all", None)
.flag("broadcast", Some("none"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![]);
});
// "none" with other values is ignored
CommandLineTest::new()
.flag("broadcast", Some("none,sync-committee"))
.run()
.with_config(|config| {
assert!(config.disable_run_on_all);
assert_eq!(config.broadcast_topics, vec![ApiTopic::SyncCommittee]);
});
// Other valid variants
CommandLineTest::new()
.flag("broadcast", Some("blocks, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
// Omitted "subscription" overrides default
CommandLineTest::new()
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}

#[test]
#[should_panic(expected = "Unknown API topic")]
fn wrong_broadcast_flag() {
CommandLineTest::new()
.flag("broadcast", Some("foo, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
}

Expand Down
2 changes: 1 addition & 1 deletion testing/node_test_rig/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ types = { workspace = true }
tempfile = { workspace = true }
eth2 = { workspace = true }
validator_client = { workspace = true }
validator_dir = { workspace = true }
validator_dir = { workspace = true, features = ["insecure_keys"] }
sensitive_url = { workspace = true }
execution_layer = { workspace = true }
tokio = { workspace = true }
2 changes: 1 addition & 1 deletion testing/node_test_rig/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use eth2;
pub use execution_layer::test_utils::{
Config as MockServerConfig, MockExecutionConfig, MockServer,
};
pub use validator_client::Config as ValidatorConfig;
pub use validator_client::{ApiTopic, Config as ValidatorConfig};

/// The global timeout for HTTP requests to the beacon node.
const HTTP_TIMEOUT: Duration = Duration::from_secs(8);
Expand Down
26 changes: 21 additions & 5 deletions testing/simulator/src/eth1_sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use futures::prelude::*;
use node_test_rig::environment::RuntimeContext;
use node_test_rig::{
environment::{EnvironmentBuilder, LoggerConfig},
testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles,
testing_client_config, testing_validator_config, ApiTopic, ClientConfig, ClientGenesis,
ValidatorFiles,
};
use rayon::prelude::*;
use sensitive_url::SensitiveUrl;
Expand Down Expand Up @@ -159,10 +160,25 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into());
}
println!("Adding validator client {}", i);
network_1
.add_validator_client(validator_config, i, files, i % 2 == 0)
.await
.expect("should add validator");

// Enable broadcast on every 4th node.
if i % 4 == 0 {
validator_config.broadcast_topics = ApiTopic::all();
let beacon_nodes = vec![i, (i + 1) % node_count];
network_1
.add_validator_client_with_fallbacks(
validator_config,
i,
beacon_nodes,
files,
)
.await
} else {
network_1
.add_validator_client(validator_config, i, files, i % 2 == 0)
.await
}
.expect("should add validator");
},
"vc",
);
Expand Down
42 changes: 42 additions & 0 deletions testing/simulator/src/local_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,48 @@ impl<E: EthSpec> LocalNetwork<E> {
Ok(())
}

pub async fn add_validator_client_with_fallbacks(
&self,
mut validator_config: ValidatorConfig,
validator_index: usize,
beacon_nodes: Vec<usize>,
validator_files: ValidatorFiles,
) -> Result<(), String> {
let context = self
.context
.service_context(format!("validator_{}", validator_index));
let self_1 = self.clone();
let mut beacon_node_urls = vec![];
for beacon_node in beacon_nodes {
let socket_addr = {
let read_lock = self.beacon_nodes.read();
let beacon_node = read_lock
.get(beacon_node)
.ok_or_else(|| format!("No beacon node for index {}", beacon_node))?;
beacon_node
.client
.http_api_listen_addr()
.expect("Must have http started")
};
let beacon_node = SensitiveUrl::parse(
format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(),
)
.unwrap();
beacon_node_urls.push(beacon_node);
}

validator_config.beacon_nodes = beacon_node_urls;

let validator_client = LocalValidatorClient::production_with_insecure_keypairs(
context,
validator_config,
validator_files,
)
.await?;
self_1.validator_clients.write().push(validator_client);
Ok(())
}

/// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API.
pub fn remote_nodes(&self) -> Result<Vec<BeaconNodeHttpClient>, String> {
let beacon_nodes = self.beacon_nodes.read();
Expand Down
1 change: 1 addition & 0 deletions validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ malloc_utils = { workspace = true }
sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }
5 changes: 3 additions & 2 deletions validator_client/src/attestation_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
Expand Down Expand Up @@ -433,9 +433,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Attestations,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
Expand Down
57 changes: 47 additions & 10 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::fmt;
Expand All @@ -15,6 +16,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use tokio::{sync::RwLock, time::sleep};
use types::{ChainSpec, Config, EthSpec};

Expand Down Expand Up @@ -330,22 +332,22 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
pub struct BeaconNodeFallback<T, E> {
candidates: Vec<CandidateBeaconNode<E>>,
slot_clock: Option<T>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
}

impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(
candidates: Vec<CandidateBeaconNode<E>>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
) -> Self {
Self {
candidates,
slot_clock: None,
disable_run_on_all,
broadcast_topics,
spec,
log,
}
Expand Down Expand Up @@ -579,7 +581,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It returns a list of errors along with the beacon node id that failed for `func`.
/// Since this ignores the actual result of `func`, this function should only be used for beacon
/// node calls whose results we do not care about, only that they completed successfully.
pub async fn run_on_all<'a, F, O, Err, R>(
pub async fn broadcast<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
Expand Down Expand Up @@ -687,25 +689,60 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}

/// Call `func` on first beacon node that returns success or on all beacon nodes
/// depending on the value of `disable_run_on_all`.
pub async fn run<'a, F, Err, R>(
/// depending on the `topic` and configuration.
pub async fn request<'a, F, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
topic: ApiTopic,
func: F,
) -> Result<(), Errors<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
if self.disable_run_on_all {
if self.broadcast_topics.contains(&topic) {
self.broadcast(require_synced, offline_on_failure, func)
.await
} else {
self.first_success(require_synced, offline_on_failure, func)
.await?;
Ok(())
} else {
self.run_on_all(require_synced, offline_on_failure, func)
.await
}
}
}

/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum ApiTopic {
Attestations,
Blocks,
Subscriptions,
SyncCommittee,
}

impl ApiTopic {
pub fn all() -> Vec<ApiTopic> {
use ApiTopic::*;
vec![Attestations, Blocks, Subscriptions, SyncCommittee]
}
}

#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
use strum::VariantNames;

#[test]
fn api_topic_all() {
let all = ApiTopic::all();
assert_eq!(all.len(), ApiTopic::VARIANTS.len());
assert!(ApiTopic::VARIANTS
.iter()
.map(|topic| ApiTopic::from_str(topic).unwrap())
.eq(all.into_iter()));
}
}
Loading