From a29824ff85d237757b1eeb02bc9dc145f50501a8 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 28 Feb 2024 22:58:36 +0000 Subject: [PATCH 1/2] remove `ThinClient` from `dos/` and replace `ThinClient` with `TpuClient` --- Cargo.lock | 4 +- client/src/tpu_client.rs | 6 +++ dos/Cargo.toml | 2 +- dos/src/main.rs | 83 ++++++++++++++++++------------- gossip/Cargo.toml | 2 +- gossip/src/gossip_service.rs | 60 ++++++++++++---------- gossip/src/legacy_contact_info.rs | 15 ------ programs/sbf/Cargo.lock | 3 +- 8 files changed, 93 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88f0fa0925dcac..46091cfbca5e82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6077,11 +6077,11 @@ dependencies = [ "solana-measure", "solana-net-utils", "solana-perf", + "solana-quic-client", "solana-rpc", "solana-rpc-client", "solana-sdk", "solana-streamer", - "solana-thin-client", "solana-tpu-client", "solana-version", ] @@ -6276,6 +6276,7 @@ dependencies = [ "solana-bloom", "solana-clap-utils", "solana-client", + "solana-connection-cache", "solana-entry", "solana-frozen-abi", "solana-frozen-abi-macro", @@ -6289,7 +6290,6 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-streamer", - "solana-thin-client", "solana-tpu-client", "solana-version", "solana-vote", diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 45394151340070..038dd86774ea98 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -13,6 +13,7 @@ use { transport::Result as TransportResult, }, solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient}, + solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool}, std::sync::Arc, }; pub use { @@ -20,6 +21,11 @@ pub use { solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, }; +pub enum TpuClientWrapper { + Quic(TpuClient), + Udp(TpuClient), +} + /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info /// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency. diff --git a/dos/Cargo.toml b/dos/Cargo.toml index 179fc40bf84820..0d7c76b007c4ea 100644 --- a/dos/Cargo.toml +++ b/dos/Cargo.toml @@ -26,6 +26,7 @@ solana-logger = { workspace = true } solana-measure = { workspace = true } solana-net-utils = { workspace = true } solana-perf = { workspace = true } +solana-quic-client = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } solana-sdk = { workspace = true } @@ -38,4 +39,3 @@ targets = ["x86_64-unknown-linux-gnu"] [dev-dependencies] solana-local-cluster = { workspace = true } -solana-thin-client = { workspace = true } diff --git a/dos/src/main.rs b/dos/src/main.rs index b9e0dceba40bf0..0f7b74bcb7b747 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -46,12 +46,16 @@ use { log::*, rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::{ + connection_cache::ConnectionCache, + tpu_client::TpuClientWrapper, + tpu_connection::TpuConnection, + }, solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ contact_info::Protocol, - gossip_service::{discover, get_multi_client}, + gossip_service::{discover, get_client}, legacy_contact_info::LegacyContactInfo as ContactInfo, }, solana_measure::measure::Measure, @@ -791,33 +795,30 @@ fn main() { DEFAULT_TPU_CONNECTION_POOL_SIZE, ), }; - let (client, num_clients) = get_multi_client( - &validators, - &SocketAddrSpace::Unspecified, - Arc::new(connection_cache), - ); - if validators.len() < num_clients { - eprintln!( - "Error: Insufficient nodes discovered. Expecting {} or more", - validators.len() - ); - exit(1); - } - (gossip_nodes, Some(Arc::new(client))) + let client = get_client(&validators, Arc::new(connection_cache)); + (gossip_nodes, Some(client)) } else { (vec![], None) }; info!("done found {} nodes", nodes.len()); - - run_dos(&nodes, 0, client, cmd_params); + if let Some(tpu_client) = client { + match tpu_client { + TpuClientWrapper::Quic(quic_client) => { + run_dos(&nodes, 0, Some(Arc::new(quic_client)), cmd_params); + } + TpuClientWrapper::Udp(udp_client) => { + run_dos(&nodes, 0, Some(Arc::new(udp_client)), cmd_params); + } + }; + } } #[cfg(test)] pub mod test { use { super::*, - solana_client::thin_client::ThinClient, + solana_client::tpu_client::TpuClient, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_gossip::contact_info::LegacyContactInfo, @@ -826,6 +827,7 @@ pub mod test { local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, }; @@ -835,7 +837,32 @@ pub mod test { // thin wrapper for the run_dos function // to avoid specifying everywhere generic parameters fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) { - run_dos::(nodes, iterations, None, params); + run_dos::>( + nodes, iterations, None, params, + ); + } + + fn build_tpu_quic_client( + cluster: &LocalCluster, + ) -> Arc> { + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { + panic!("Expected a Quic ConnectionCache."); + }; + + Arc::new( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ) } #[test] @@ -975,14 +1002,7 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc().unwrap(), - cluster - .entry_point_info - .tpu(cluster.connection_cache.protocol()) - .unwrap(), - cluster.connection_cache.clone(), - )); + let client = build_tpu_quic_client(&cluster); // creates one transaction with 8 valid signatures and sends it 10 times run_dos( @@ -1114,14 +1134,7 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc().unwrap(), - cluster - .entry_point_info - .tpu(cluster.connection_cache.protocol()) - .unwrap(), - cluster.connection_cache.clone(), - )); + let client = build_tpu_quic_client(&cluster); // creates one transaction and sends it 10 times // this is done in single thread diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index f9870ac1ee380c..2e62bc66f6866c 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -31,6 +31,7 @@ serde_derive = { workspace = true } solana-bloom = { workspace = true } solana-clap-utils = { workspace = true } solana-client = { workspace = true } +solana-connection-cache = { workspace = true } solana-entry = { workspace = true } solana-frozen-abi = { workspace = true } solana-frozen-abi-macro = { workspace = true } @@ -44,7 +45,6 @@ solana-rayon-threadlimit = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-streamer = { workspace = true } -solana-thin-client = { workspace = true } solana-tpu-client = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 9e1c56520993c5..0bd4750e269a48 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -4,7 +4,11 @@ use { crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo}, crossbeam_channel::{unbounded, Sender}, rand::{thread_rng, Rng}, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::{ + connection_cache::ConnectionCache, + rpc_client::RpcClient, + tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper}, + }, solana_perf::recycler::Recycler, solana_runtime::bank_forks::BankForks, solana_sdk::{ @@ -197,35 +201,37 @@ pub fn discover( #[deprecated(since = "1.18.6", note = "Interface will change")] pub fn get_client( nodes: &[ContactInfo], - socket_addr_space: &SocketAddrSpace, connection_cache: Arc, -) -> ThinClient { - let protocol = connection_cache.protocol(); - let nodes: Vec<_> = nodes - .iter() - .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) - .collect(); +) -> TpuClientWrapper { let select = thread_rng().gen_range(0..nodes.len()); - let (rpc, tpu) = nodes[select]; - ThinClient::new(rpc, tpu, connection_cache) -} -#[deprecated(since = "1.18.6", note = "Will be removed in favor of get_client")] -pub fn get_multi_client( - nodes: &[ContactInfo], - socket_addr_space: &SocketAddrSpace, - connection_cache: Arc, -) -> (ThinClient, usize) { - let protocol = connection_cache.protocol(); - let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes - .iter() - .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) - .unzip(); - let num_nodes = tpu_addrs.len(); - ( - ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache), - num_nodes, - ) + let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", nodes[select].rpc().unwrap()); + + match &*connection_cache { + ConnectionCache::Quic(cache) => TpuClientWrapper::Quic( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ), + ConnectionCache::Udp(cache) => TpuClientWrapper::Udp( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Udp Cache {err:?}"); + }), + ), + } } fn spy( diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index d3dead1910d6ab..882df75864e357 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -229,21 +229,6 @@ impl LegacyContactInfo { pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool { addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr) } - - pub(crate) fn valid_client_facing_addr( - &self, - protocol: Protocol, - socket_addr_space: &SocketAddrSpace, - ) -> Option<(SocketAddr, SocketAddr)> { - Some(( - self.rpc() - .ok() - .filter(|addr| socket_addr_space.check(addr))?, - self.tpu(protocol) - .ok() - .filter(|addr| socket_addr_space.check(addr))?, - )) - } } impl TryFrom<&ContactInfo> for LegacyContactInfo { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index c31befdf34c092..c634256de5cda2 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5169,6 +5169,7 @@ dependencies = [ "solana-bloom", "solana-clap-utils", "solana-client", + "solana-connection-cache", "solana-entry", "solana-frozen-abi", "solana-frozen-abi-macro", @@ -5178,11 +5179,11 @@ dependencies = [ "solana-metrics", "solana-net-utils", "solana-perf", + "solana-quic-client", "solana-rayon-threadlimit", "solana-runtime", "solana-sdk", "solana-streamer", - "solana-thin-client", "solana-tpu-client", "solana-version", "solana-vote", From 7d09b50c712186ac666ec03a59e6216d091abc69 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 6 Mar 2024 21:31:58 +0000 Subject: [PATCH 2/2] remove test for valid_client_facing_addr since it is no longer used --- dos/src/main.rs | 4 ++-- gossip/src/legacy_contact_info.rs | 18 ------------------ programs/sbf/Cargo.lock | 1 - 3 files changed, 2 insertions(+), 21 deletions(-) diff --git a/dos/src/main.rs b/dos/src/main.rs index 0f7b74bcb7b747..055b1f4bb65d4c 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -47,8 +47,7 @@ use { rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, solana_client::{ - connection_cache::ConnectionCache, - tpu_client::TpuClientWrapper, + connection_cache::ConnectionCache, tpu_client::TpuClientWrapper, tpu_connection::TpuConnection, }, solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, @@ -830,6 +829,7 @@ pub mod test { solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, + solana_tpu_client::tpu_client::TpuClientConfig, }; const TEST_SEND_BATCH_SIZE: usize = 1; diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index 882df75864e357..870f1c9aa49283 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -327,24 +327,6 @@ mod tests { assert!(ci.serve_repair.ip().is_unspecified()); } - #[test] - fn test_valid_client_facing() { - let mut ci = LegacyContactInfo::default(); - assert_eq!( - ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified), - None - ); - ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123); - assert_eq!( - ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified), - None - ); - ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234); - assert!(ci - .valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified) - .is_some()); - } - #[test] fn test_sanitize() { let mut ci = LegacyContactInfo::default(); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index c634256de5cda2..d148f0bea7b5d0 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5179,7 +5179,6 @@ dependencies = [ "solana-metrics", "solana-net-utils", "solana-perf", - "solana-quic-client", "solana-rayon-threadlimit", "solana-runtime", "solana-sdk",