From 70324f5ec208225f2a04350910aef76a6556e7d1 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 28 Feb 2024 22:58:36 +0000 Subject: [PATCH] remove `ThinClient` from `dos/`. requires testing --- Cargo.lock | 4 +- bench-tps/Cargo.toml | 1 - bench-tps/src/bench_tps_client.rs | 1 - bench-tps/src/bench_tps_client/thin_client.rs | 113 ------------------ client/src/tpu_client.rs | 6 + dos/Cargo.toml | 1 + dos/src/main.rs | 85 ++++++++----- gossip/Cargo.toml | 2 + gossip/src/gossip_service.rs | 68 ++++++----- gossip/src/legacy_contact_info.rs | 15 --- 10 files changed, 110 insertions(+), 186 deletions(-) delete mode 100644 bench-tps/src/bench_tps_client/thin_client.rs diff --git a/Cargo.lock b/Cargo.lock index 78ff40111ee0b0..8a82b1d9251aff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5589,7 +5589,6 @@ dependencies = [ "solana-sdk", "solana-streamer", "solana-test-validator", - "solana-thin-client", "solana-tpu-client", "solana-version", "spl-instruction-padding", @@ -6077,6 +6076,7 @@ dependencies = [ "solana-measure", "solana-net-utils", "solana-perf", + "solana-quic-client", "solana-rpc", "solana-rpc-client", "solana-sdk", @@ -6276,6 +6276,7 @@ dependencies = [ "solana-bloom", "solana-clap-utils", "solana-client", + "solana-connection-cache", "solana-entry", "solana-frozen-abi", "solana-frozen-abi-macro", @@ -6285,6 +6286,7 @@ dependencies = [ "solana-metrics", "solana-net-utils", "solana-perf", + "solana-quic-client", "solana-rayon-threadlimit", "solana-runtime", "solana-sdk", diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index cd40eb1c833c1c..8ebdbbcf4a2425 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -35,7 +35,6 @@ solana-rpc-client-nonce-utils = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-streamer = { workspace = true } -solana-thin-client = { workspace = true } solana-tpu-client = { workspace = true } solana-version = { workspace = true } spl-instruction-padding = { workspace = true } diff --git a/bench-tps/src/bench_tps_client.rs b/bench-tps/src/bench_tps_client.rs index 3ab15bec11f7ee..4aae9ee266d38e 100644 --- a/bench-tps/src/bench_tps_client.rs +++ b/bench-tps/src/bench_tps_client.rs @@ -97,5 +97,4 @@ pub trait BenchTpsClient { mod bank_client; mod rpc_client; -mod thin_client; mod tpu_client; diff --git a/bench-tps/src/bench_tps_client/thin_client.rs b/bench-tps/src/bench_tps_client/thin_client.rs deleted file mode 100644 index 6696774d679a8a..00000000000000 --- a/bench-tps/src/bench_tps_client/thin_client.rs +++ /dev/null @@ -1,113 +0,0 @@ -use { - crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result}, - solana_client::thin_client::ThinClient, - solana_sdk::{ - account::Account, - client::{AsyncClient, Client, SyncClient}, - commitment_config::CommitmentConfig, - epoch_info::EpochInfo, - hash::Hash, - message::Message, - pubkey::Pubkey, - signature::Signature, - transaction::Transaction, - }, -}; - -impl BenchTpsClient for ThinClient { - fn send_transaction(&self, transaction: Transaction) -> Result { - AsyncClient::async_send_transaction(self, transaction).map_err(|err| err.into()) - } - fn send_batch(&self, transactions: Vec) -> Result<()> { - AsyncClient::async_send_batch(self, transactions).map_err(|err| err.into()) - } - fn get_latest_blockhash(&self) -> Result { - SyncClient::get_latest_blockhash(self).map_err(|err| err.into()) - } - - fn get_latest_blockhash_with_commitment( - &self, - commitment_config: CommitmentConfig, - ) -> Result<(Hash, u64)> { - SyncClient::get_latest_blockhash_with_commitment(self, commitment_config) - .map_err(|err| err.into()) - } - - fn get_transaction_count(&self) -> Result { - SyncClient::get_transaction_count(self).map_err(|err| err.into()) - } - - fn get_transaction_count_with_commitment( - &self, - commitment_config: CommitmentConfig, - ) -> Result { - SyncClient::get_transaction_count_with_commitment(self, commitment_config) - .map_err(|err| err.into()) - } - - fn get_epoch_info(&self) -> Result { - SyncClient::get_epoch_info(self).map_err(|err| err.into()) - } - - fn get_balance(&self, pubkey: &Pubkey) -> Result { - SyncClient::get_balance(self, pubkey).map_err(|err| err.into()) - } - - fn get_balance_with_commitment( - &self, - pubkey: &Pubkey, - commitment_config: CommitmentConfig, - ) -> Result { - SyncClient::get_balance_with_commitment(self, pubkey, commitment_config) - .map_err(|err| err.into()) - } - - fn get_fee_for_message(&self, message: &Message) -> Result { - SyncClient::get_fee_for_message(self, message).map_err(|err| err.into()) - } - - fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result { - SyncClient::get_minimum_balance_for_rent_exemption(self, data_len).map_err(|err| err.into()) - } - - fn addr(&self) -> String { - Client::tpu_addr(self) - } - - fn request_airdrop_with_blockhash( - &self, - pubkey: &Pubkey, - lamports: u64, - recent_blockhash: &Hash, - ) -> Result { - self.rpc_client() - .request_airdrop_with_blockhash(pubkey, lamports, recent_blockhash) - .map_err(|err| err.into()) - } - - fn get_account(&self, pubkey: &Pubkey) -> Result { - self.rpc_client() - .get_account(pubkey) - .map_err(|err| err.into()) - } - - fn get_account_with_commitment( - &self, - pubkey: &Pubkey, - commitment_config: CommitmentConfig, - ) -> Result { - SyncClient::get_account_with_commitment(self, pubkey, commitment_config) - .map_err(|err| err.into()) - .and_then(|account| { - account.ok_or_else(|| { - BenchTpsError::Custom(format!("AccountNotFound: pubkey={pubkey}")) - }) - }) - } - - fn get_multiple_accounts(&self, pubkeys: &[Pubkey]) -> Result>> { - self.rpc_client() - .get_multiple_accounts(pubkeys) - .map_err(|err| err.into()) - } -} diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 45394151340070..038dd86774ea98 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -13,6 +13,7 @@ use { transport::Result as TransportResult, }, solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient}, + solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool}, std::sync::Arc, }; pub use { @@ -20,6 +21,11 @@ pub use { solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, }; +pub enum TpuClientWrapper { + Quic(TpuClient), + Udp(TpuClient), +} + /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info /// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency. diff --git a/dos/Cargo.toml b/dos/Cargo.toml index 179fc40bf84820..47f7aab0f19625 100644 --- a/dos/Cargo.toml +++ b/dos/Cargo.toml @@ -26,6 +26,7 @@ solana-logger = { workspace = true } solana-measure = { workspace = true } solana-net-utils = { workspace = true } solana-perf = { workspace = true } +solana-quic-client = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } solana-sdk = { workspace = true } diff --git a/dos/src/main.rs b/dos/src/main.rs index 8e6c3c5b2b11b5..173e5299174eaf 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -45,7 +45,11 @@ use { log::*, rand::{thread_rng, Rng}, solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient}, - solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, + solana_client::{ + connection_cache::ConnectionCache, + tpu_client::{TpuClientConfig, TpuClientWrapper}, + tpu_connection::TpuConnection, + }, solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair}, solana_dos::cli::*, solana_gossip::{ @@ -790,11 +794,7 @@ fn main() { DEFAULT_TPU_CONNECTION_POOL_SIZE, ), }; - let (client, num_clients) = get_multi_client( - &validators, - &SocketAddrSpace::Unspecified, - Arc::new(connection_cache), - ); + let (client, num_clients) = get_multi_client(&validators, Arc::new(connection_cache)); if validators.len() < num_clients { eprintln!( "Error: Insufficient nodes discovered. Expecting {} or more", @@ -802,21 +802,29 @@ fn main() { ); exit(1); } - (gossip_nodes, Some(Arc::new(client))) + (gossip_nodes, Some(client)) } else { (vec![], None) }; info!("done found {} nodes", nodes.len()); - - run_dos(&nodes, 0, client, cmd_params); + if let Some(tpu_client) = client { + match tpu_client { + TpuClientWrapper::Quic(quic_client) => { + run_dos(&nodes, 0, Some(Arc::new(quic_client)), cmd_params); + } + TpuClientWrapper::Udp(udp_client) => { + run_dos(&nodes, 0, Some(Arc::new(udp_client)), cmd_params); + } + }; + } } #[cfg(test)] pub mod test { use { super::*, - solana_client::thin_client::ThinClient, + solana_client::tpu_client::TpuClient, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_gossip::contact_info::LegacyContactInfo, @@ -825,6 +833,7 @@ pub mod test { local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, }; @@ -834,7 +843,9 @@ pub mod test { // thin wrapper for the run_dos function // to avoid specifying everywhere generic parameters fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) { - run_dos::(nodes, iterations, None, params); + run_dos::>( + nodes, iterations, None, params, + ); } #[test] @@ -974,14 +985,24 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc().unwrap(), - cluster - .entry_point_info - .tpu(cluster.connection_cache.protocol()) - .unwrap(), - cluster.connection_cache.clone(), - )); + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { + panic!("Expected a Quic ConnectionCache."); + }; + + let client = Arc::new( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); // creates one transaction with 8 valid signatures and sends it 10 times run_dos( @@ -1113,14 +1134,24 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = Arc::new(ThinClient::new( - cluster.entry_point_info.rpc().unwrap(), - cluster - .entry_point_info - .tpu(cluster.connection_cache.protocol()) - .unwrap(), - cluster.connection_cache.clone(), - )); + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { + panic!("Expected a Quic ConnectionCache."); + }; + + let client = Arc::new( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); // creates one transaction and sends it 10 times // this is done in single thread diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index f9870ac1ee380c..77255baacc7afb 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -31,6 +31,7 @@ serde_derive = { workspace = true } solana-bloom = { workspace = true } solana-clap-utils = { workspace = true } solana-client = { workspace = true } +solana-connection-cache = { workspace = true } solana-entry = { workspace = true } solana-frozen-abi = { workspace = true } solana-frozen-abi-macro = { workspace = true } @@ -40,6 +41,7 @@ solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-net-utils = { workspace = true } solana-perf = { workspace = true } +solana-quic-client = { workspace = true } solana-rayon-threadlimit = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 806ee23a4fb0be..db85f5655e70ef 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -4,7 +4,11 @@ use { crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo}, crossbeam_channel::{unbounded, Sender}, rand::{thread_rng, Rng}, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::{ + connection_cache::ConnectionCache, + rpc_client::RpcClient, + tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper}, + }, solana_perf::recycler::Recycler, solana_runtime::bank_forks::BankForks, solana_sdk::{ @@ -193,37 +197,45 @@ pub fn discover( )) } -/// Creates a ThinClient by selecting a valid node at random -pub fn get_client( +pub fn get_multi_client( nodes: &[ContactInfo], - socket_addr_space: &SocketAddrSpace, connection_cache: Arc, -) -> ThinClient { - let protocol = connection_cache.protocol(); - let nodes: Vec<_> = nodes - .iter() - .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) - .collect(); +) -> (TpuClientWrapper, usize) { let select = thread_rng().gen_range(0..nodes.len()); - let (rpc, tpu) = nodes[select]; - ThinClient::new(rpc, tpu, connection_cache) -} -pub fn get_multi_client( - nodes: &[ContactInfo], - socket_addr_space: &SocketAddrSpace, - connection_cache: Arc, -) -> (ThinClient, usize) { - let protocol = connection_cache.protocol(); - let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes - .iter() - .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space)) - .unzip(); - let num_nodes = tpu_addrs.len(); - ( - ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache), - num_nodes, - ) + let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", nodes[select].rpc().unwrap()); + + match &*connection_cache { + ConnectionCache::Quic(cache) => { + let client = TpuClientWrapper::Quic( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + }), + ); + (client, nodes.len()) + } + ConnectionCache::Udp(cache) => { + let client = TpuClientWrapper::Udp( + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap_or_else(|err| { + panic!("Could not create TpuClient with Udp Cache {err:?}"); + }), + ); + (client, nodes.len()) + } + } } fn spy( diff --git a/gossip/src/legacy_contact_info.rs b/gossip/src/legacy_contact_info.rs index d3dead1910d6ab..882df75864e357 100644 --- a/gossip/src/legacy_contact_info.rs +++ b/gossip/src/legacy_contact_info.rs @@ -229,21 +229,6 @@ impl LegacyContactInfo { pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool { addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr) } - - pub(crate) fn valid_client_facing_addr( - &self, - protocol: Protocol, - socket_addr_space: &SocketAddrSpace, - ) -> Option<(SocketAddr, SocketAddr)> { - Some(( - self.rpc() - .ok() - .filter(|addr| socket_addr_space.check(addr))?, - self.tpu(protocol) - .ok() - .filter(|addr| socket_addr_space.check(addr))?, - )) - } } impl TryFrom<&ContactInfo> for LegacyContactInfo {