Skip to content

Commit

Permalink
v2.1: Add not unique leader discovery (backport of #3546) (#3658)
Browse files Browse the repository at this point in the history
Add not unique leader discovery (#3546)

* rename get_leader_sockets to get_unique_leader_sockets

* Add methods for getting not unique tpu sockets

(cherry picked from commit dffcdb4)

Co-authored-by: kirill lykov <[email protected]>
  • Loading branch information
mergify[bot] and KirillLykov authored Nov 22, 2024
1 parent f0e17c6 commit cefd3f1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
47 changes: 37 additions & 10 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use {
response::{RpcContactInfo, SlotUpdate},
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
clock::{Slot, DEFAULT_MS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS},
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
pubkey::Pubkey,
Expand Down Expand Up @@ -122,24 +122,43 @@ impl LeaderTpuCache {
)
}

// Get the TPU sockets for the current leader and upcoming leaders according to fanout size
// Get the TPU sockets for the current leader and upcoming *unique* leaders according to fanout size.
fn get_unique_leader_sockets(
&self,
estimated_current_slot: Slot,
fanout_slots: u64,
) -> Vec<SocketAddr> {
let all_leader_sockets = self.get_leader_sockets(estimated_current_slot, fanout_slots);

let mut unique_sockets = Vec::new();
let mut seen = HashSet::new();

for socket in all_leader_sockets {
if seen.insert(socket) {
unique_sockets.push(socket);
}
}

unique_sockets
}

// Get the TPU sockets for the current leader and upcoming leaders according to fanout size.
fn get_leader_sockets(
&self,
estimated_current_slot: Slot,
fanout_slots: u64,
) -> Vec<SocketAddr> {
let mut leader_set = HashSet::new();
let mut leader_sockets = Vec::new();
// `first_slot` might have been advanced since caller last read the `estimated_current_slot`
// value. Take the greater of the two values to ensure we are reading from the latest
// leader schedule.
let current_slot = std::cmp::max(estimated_current_slot, self.first_slot);
for leader_slot in current_slot..current_slot + fanout_slots {
for leader_slot in (current_slot..current_slot + fanout_slots)
.step_by(NUM_CONSECUTIVE_LEADER_SLOTS as usize)
{
if let Some(leader) = self.get_slot_leader(leader_slot) {
if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
if leader_set.insert(*leader) {
leader_sockets.push(*tpu_socket);
}
leader_sockets.push(*tpu_socket);
} else {
// The leader is probably delinquent
trace!("TPU not available for leader {}", leader);
Expand Down Expand Up @@ -411,7 +430,7 @@ where
) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
.unique_leader_tpu_sockets(self.fanout_slots);
let futures = leaders
.iter()
.map(|addr| {
Expand Down Expand Up @@ -455,7 +474,7 @@ where
) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
.unique_leader_tpu_sockets(self.fanout_slots);
let futures = leaders
.iter()
.map(|addr| {
Expand Down Expand Up @@ -568,7 +587,7 @@ where
let wire_transaction = serialize(transaction).unwrap();
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
.unique_leader_tpu_sockets(self.fanout_slots);
futures.extend(send_wire_transaction_futures(
&progress_bar,
&progress,
Expand Down Expand Up @@ -803,6 +822,14 @@ impl LeaderTpuService {
self.recent_slots.estimated_current_slot()
}

pub fn unique_leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
let current_slot = self.recent_slots.estimated_current_slot();
self.leader_tpu_cache
.read()
.unwrap()
.get_unique_leader_sockets(current_slot, fanout_slots)
}

pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
let current_slot = self.recent_slots.estimated_current_slot();
self.leader_tpu_cache
Expand Down
2 changes: 1 addition & 1 deletion tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where
let leaders = self
.tpu_client
.get_leader_tpu_service()
.leader_tpu_sockets(self.tpu_client.get_fanout_slots());
.unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots());

for tpu_address in &leaders {
let cache = self.tpu_client.get_connection_cache();
Expand Down

0 comments on commit cefd3f1

Please sign in to comment.