gossip: notify state machine of duplicate proofs (#32963)

* gossip: notify state machine of duplicate proofs

* Add feature flag for ingesting duplicate proofs from Gossip.

* Use the Epoch the shred is in instead of the root bank epoch.

* Fix unittest by activating the feature.

* Add a test for feature disabled case.

* EpochSchedule is now not copyable, clone it explicitly.

* pr feedback: read epoch schedule on startup, add guard for ff recache

* pr feedback: bank_forks lock, -cached_slots_in_epoch, init ff

* pr feedback: bank.forks_try_read() -> read()

* pr feedback: fix local-cluster setup

* local-cluster: do not expose gossip internals, use retry mechanism instead

* local-cluster: split out case 4b into separate test and ignore

* pr feedback: avoid taking lock if ff is already found

* pr feedback: do not cache ff epoch

* pr feedback: bank_forks lock, revert to cached_slots_in_epoch

* pr feedback: move local variable into helper function

* pr feedback: use let else, remove epoch 0 hack

---------

Co-authored-by: Wen <[email protected]>
AshwinSekar and wen-coding authored Jan 26, 2024
1 parent 663a1bb commit 93271d9
Showing 7 changed files with 206 additions and 49 deletions.
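
The substance of the change: when the gossip-side duplicate-shred handler assembles and verifies a complete duplicate proof, it now forwards the slot to the duplicate-slot consensus state machine over a crossbeam channel, gated on the enable_gossip_duplicate_proof_ingestion feature having been active for at least a full epoch. A minimal sketch of the notification path; everything outside the channel API itself is a hypothetical stand-in, not the actual handler code:

use crossbeam_channel::{unbounded, Receiver, Sender};

type Slot = u64;

// Stand-in for the handler's decision point: forward the slot only when the
// feature gate allows it, and surface a send failure as a dedicated error.
fn maybe_notify(sender: &Sender<Slot>, slot: Slot, feature_gate_open: bool) -> Result<(), String> {
    if feature_gate_open {
        sender
            .send(slot)
            .map_err(|_| "unable to send duplicate slot to state machine".to_string())?;
    }
    Ok(())
}

fn main() {
    let (duplicate_slots_sender, duplicate_slots_receiver): (Sender<Slot>, Receiver<Slot>) =
        unbounded();
    maybe_notify(&duplicate_slots_sender, 42, true).unwrap();
    assert_eq!(duplicate_slots_receiver.try_recv(), Ok(42));
}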
3 changes: 2 additions & 1 deletion core/src/tvu.rs
@@ -228,7 +228,7 @@ impl Tvu {
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
- duplicate_slots_sender,
+ duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -337,6 +337,7 @@ impl Tvu {
blockstore,
leader_schedule_cache.clone(),
bank_forks.clone(),
+ duplicate_slots_sender,
),
);

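Note the asymmetry in the two hunks above: the existing call site gets a clone() of duplicate_slots_sender while the new gossip handler consumes the original. Crossbeam senders are cheap multi-producer handles onto the same channel, so both components feed one receiver; a self-contained sketch of that fan-in (the thread setup is illustrative only):

use crossbeam_channel::unbounded;
use std::thread;

fn main() {
    let (sender, receiver) = unbounded::<u64>();
    let sender_clone = sender.clone(); // same underlying channel

    // Two producers, standing in for the window service and the gossip handler.
    let a = thread::spawn(move || sender.send(1).unwrap());
    let b = thread::spawn(move || sender_clone.send(2).unwrap());
    a.join().unwrap();
    b.join().unwrap();

    // One consumer, standing in for the duplicate-slot state machine.
    let mut slots: Vec<u64> = receiver.try_iter().collect();
    slots.sort_unstable();
    assert_eq!(slots, vec![1, 2]);
}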
2 changes: 2 additions & 0 deletions gossip/src/duplicate_shred.rs
@@ -56,6 +56,8 @@ pub enum Error {
BlockstoreInsertFailed(#[from] BlockstoreError),
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("unable to send duplicate slot to state machine")]
DuplicateSlotSenderFailure,
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
#[error("invalid duplicate shreds")]
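The new DuplicateSlotSenderFailure variant follows the enum's existing thiserror pattern: the handler discards crossbeam's SendError payload and surfaces this variant instead. A self-contained sketch of how a disconnected channel turns into this error; the channel setup here is purely illustrative:

use crossbeam_channel::{unbounded, Sender};
use thiserror::Error;

#[derive(Debug, Error)]
enum Error {
    #[error("unable to send duplicate slot to state machine")]
    DuplicateSlotSenderFailure,
}

fn notify(sender: &Sender<u64>, slot: u64) -> Result<(), Error> {
    // send() fails only when every receiver has been dropped.
    sender.send(slot).map_err(|_| Error::DuplicateSlotSenderFailure)
}

fn main() {
    let (sender, receiver) = unbounded::<u64>();
    drop(receiver); // disconnect, so the next send must fail
    assert!(matches!(notify(&sender, 7), Err(Error::DuplicateSlotSenderFailure)));
}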
147 changes: 129 additions & 18 deletions gossip/src/duplicate_shred_handler.rs
@@ -3,11 +3,13 @@ use {
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
+ crossbeam_channel::Sender,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Epoch, Slot},
+ feature_set,
pubkey::Pubkey,
},
std::{
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
+ // Used to notify duplicate consensus state machine
+ duplicate_slots_sender: Sender<Slot>,
}

impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
+ duplicate_slots_sender: Sender<Slot>,
) -> Self {
Self {
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@
blockstore,
leader_schedule_cache,
bank_forks,
+ duplicate_slots_sender,
}
}

@@ -131,12 +137,30 @@
shred1.into_payload(),
shred2.into_payload(),
)?;
+ if self.should_notify_state_machine(slot) {
+ // Notify duplicate consensus state machine
+ self.duplicate_slots_sender
+ .send(slot)
+ .map_err(|_| Error::DuplicateSlotSenderFailure)?;
+ }
}
self.consumed.insert(slot, true);
}
Ok(())
}

+ fn should_notify_state_machine(&self, slot: Slot) -> bool {
+ let root_bank = self.bank_forks.read().unwrap().root_bank();
+ let Some(activated_slot) = root_bank
+ .feature_set
+ .activated_slot(&feature_set::enable_gossip_duplicate_proof_ingestion::id())
+ else {
+ return false;
+ };
+ root_bank.epoch_schedule().get_epoch(slot)
+ > root_bank.epoch_schedule().get_epoch(activated_slot)
+ }

fn should_consume_slot(&mut self, slot: Slot) -> bool {
slot > self.last_root
&& slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
@@ -211,12 +235,14 @@ mod tests {
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
duplicate_shred::{from_shred, tests::new_rand_shred},
},
+ crossbeam_channel::unbounded,
+ itertools::Itertools,
solana_ledger::{
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
shred::Shredder,
},
- solana_runtime::bank::Bank,
+ solana_runtime::{accounts_background_service::AbsRequestSender, bank::Bank},
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
@@ -271,37 +297,59 @@
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
- let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+ let mut bank = Bank::new_for_tests(&genesis_config);
+ bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+ let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+ let bank_forks_arc = BankForks::new_rw_arc(bank);
+ {
+ let mut bank_forks = bank_forks_arc.write().unwrap();
+ let bank0 = bank_forks.get(0).unwrap();
+ bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+ bank_forks.set_root(9, &AbsRequestSender::default(), None);
+ }
+ blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
- let mut duplicate_shred_handler =
- DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
+ let (sender, receiver) = unbounded();
+ // The feature will only be activated at Epoch 1.
+ let start_slot: Slot = slots_in_epoch + 1;
+
+ let mut duplicate_shred_handler = DuplicateShredHandler::new(
+ blockstore.clone(),
+ leader_schedule_cache,
+ bank_forks_arc,
+ sender,
+ );
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
- 1,
+ start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
let chunks1 = create_duplicate_proof(
my_keypair.clone(),
None,
- 2,
+ start_slot + 1,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
- assert!(!blockstore.has_duplicate_shreds_in_slot(1));
- assert!(!blockstore.has_duplicate_shreds_in_slot(2));
+ assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
// Test that two proofs are mixed together, but we can store the proofs fine.
for (chunk1, chunk2) in chunks.zip(chunks1) {
duplicate_shred_handler.handle(chunk1);
duplicate_shred_handler.handle(chunk2);
}
- assert!(blockstore.has_duplicate_shreds_in_slot(1));
- assert!(blockstore.has_duplicate_shreds_in_slot(2));
+ assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
+ assert_eq!(
+ receiver.try_iter().collect_vec(),
+ vec![start_slot, start_slot + 1]
+ );

// Test all kinds of bad proofs.
for error in [
@@ -312,7 +360,7 @@
match create_duplicate_proof(
my_keypair.clone(),
None,
- 3,
+ start_slot + 2,
Some(error),
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
) {
@@ -321,7 +369,8 @@
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
- assert!(!blockstore.has_duplicate_shreds_in_slot(3));
+ assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 2));
+ assert!(receiver.is_empty());
}
}
}
@@ -337,13 +386,29 @@
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
- let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+ let mut bank = Bank::new_for_tests(&genesis_config);
+ bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+ let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+ let bank_forks_arc = BankForks::new_rw_arc(bank);
+ {
+ let mut bank_forks = bank_forks_arc.write().unwrap();
+ let bank0 = bank_forks.get(0).unwrap();
+ bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+ bank_forks.set_root(9, &AbsRequestSender::default(), None);
+ }
+ blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
- let mut duplicate_shred_handler =
- DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
- let start_slot: Slot = 1;
+ let (sender, receiver) = unbounded();
+ let mut duplicate_shred_handler = DuplicateShredHandler::new(
+ blockstore.clone(),
+ leader_schedule_cache,
+ bank_forks_arc,
+ sender,
+ );
+ // The feature will only be activated at Epoch 1.
+ let start_slot: Slot = slots_in_epoch + 1;

// This proof will not be accepted because num_chunks is too large.
let chunks = create_duplicate_proof(
@@ -358,6 +423,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(receiver.is_empty());

// This proof will be rejected because the slot is too far away in the future.
let future_slot =
@@ -374,6 +440,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
+ assert!(receiver.is_empty());

// Send in two proofs, the first proof showing up will be accepted, the following
// proofs will be discarded.
@@ -388,10 +455,54 @@
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(receiver.is_empty());
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
}

+ #[test]
+ fn test_feature_disabled() {
+ let ledger_path = get_tmp_ledger_path_auto_delete!();
+ let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+ let my_keypair = Arc::new(Keypair::new());
+ let my_pubkey = my_keypair.pubkey();
+ let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
+ let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
+ let mut bank = Bank::new_for_tests(&genesis_config);
+ bank.deactivate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+ assert!(!bank
+ .feature_set
+ .is_active(&feature_set::enable_gossip_duplicate_proof_ingestion::id()));
+ let bank_forks_arc = BankForks::new_rw_arc(bank);
+ let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
+ &bank_forks_arc.read().unwrap().working_bank(),
+ ));
+ let (sender, receiver) = unbounded();
+
+ let mut duplicate_shred_handler = DuplicateShredHandler::new(
+ blockstore.clone(),
+ leader_schedule_cache,
+ bank_forks_arc,
+ sender,
+ );
+ let chunks = create_duplicate_proof(
+ my_keypair.clone(),
+ None,
+ 1,
+ None,
+ DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+ )
+ .unwrap();
+ assert!(!blockstore.has_duplicate_shreds_in_slot(1));
+ for chunk in chunks {
+ duplicate_shred_handler.handle(chunk);
+ }
+ // If feature disabled, blockstore gets signal but state machine doesn't see it.
+ assert!(blockstore.has_duplicate_shreds_in_slot(1));
+ assert!(receiver.try_iter().collect_vec().is_empty());
+ }
}
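
Two details of the handler are worth calling out. First, should_notify_state_machine uses a strict inequality: a proof is forwarded only when its slot falls in an epoch strictly after the feature's activation epoch, so every validator applies the same rule for a whole epoch regardless of where the activation slot landed inside it. Second, that is why these tests activate the feature at genesis (epoch 0) and use start_slot = slots_in_epoch + 1, a slot in epoch 1. A worked sketch of the arithmetic with a warmup-free epoch schedule; the slot numbers are illustrative:

use solana_sdk::epoch_schedule::EpochSchedule;

fn main() {
    let schedule = EpochSchedule::without_warmup(); // fixed-length epochs
    let slots_in_epoch = schedule.slots_per_epoch;

    let activated_slot = 10; // pretend the feature activated early in epoch 0
    let activation_epoch = schedule.get_epoch(activated_slot);
    assert_eq!(activation_epoch, 0);

    // Last slot of epoch 0: same epoch as activation, so no notification.
    assert!(schedule.get_epoch(slots_in_epoch - 1) <= activation_epoch);
    // First slot of epoch 1: strictly later epoch, so the proof is forwarded.
    assert!(schedule.get_epoch(slots_in_epoch) > activation_epoch);
}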
48 changes: 35 additions & 13 deletions local-cluster/src/cluster_tests.rs
@@ -41,7 +41,7 @@ use {
solana_vote_program::vote_transaction,
std::{
borrow::Borrow,
- collections::{HashMap, HashSet},
+ collections::{HashMap, HashSet, VecDeque},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::Path,
sync::{
@@ -489,6 +489,9 @@ pub fn start_gossip_voter(
+ std::marker::Send
+ 'static,
sleep_ms: u64,
+ num_expected_peers: usize,
+ refresh_ms: u64,
+ max_votes_to_refresh: usize,
) -> GossipVoter {
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
@@ -503,6 +506,15 @@
SocketAddrSpace::Unspecified,
);

+ // Wait for peer discovery
+ while cluster_info.gossip_peers().len() < num_expected_peers {
+ sleep(Duration::from_millis(sleep_ms));
+ }
+
+ let mut latest_voted_slot = 0;
+ let mut refreshable_votes: VecDeque<(Transaction, VoteTransaction)> = VecDeque::new();
+ let mut latest_push_attempt = Instant::now();

let t_voter = {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
@@ -514,6 +526,18 @@
}

let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
+ if labels.is_empty() {
+ if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
+ for (leader_vote_tx, parsed_vote) in refreshable_votes.iter().rev() {
+ let vote_slot = parsed_vote.last_voted_slot().unwrap();
+ info!("gossip voter refreshing vote {}", vote_slot);
+ process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+ latest_push_attempt = Instant::now();
+ }
+ }
+ sleep(Duration::from_millis(sleep_ms));
+ continue;
+ }
let mut parsed_vote_iter: Vec<_> = labels
.into_iter()
.zip(votes)
@@ -527,22 +551,20 @@
});

for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
- if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
- info!("received vote for {}", latest_vote_slot);
- process_vote_tx(
- latest_vote_slot,
- leader_vote_tx,
- parsed_vote,
- &cluster_info,
- )
+ if let Some(vote_slot) = parsed_vote.last_voted_slot() {
+ info!("received vote for {}", vote_slot);
+ if vote_slot > latest_voted_slot {
+ latest_voted_slot = vote_slot;
+ refreshable_votes
+ .push_front((leader_vote_tx.clone(), parsed_vote.clone()));
+ refreshable_votes.truncate(max_votes_to_refresh);
+ }
+ process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+ latest_push_attempt = Instant::now();
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}

- if parsed_vote_iter.is_empty() {
- sleep(Duration::from_millis(sleep_ms));
- }
}
})
};
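
start_gossip_voter gains three parameters: it now waits until num_expected_peers show up in gossip before voting, retains up to max_votes_to_refresh of the newest distinct votes, and re-pushes them (oldest first, via iter().rev()) whenever no fresh votes arrive within refresh_ms, so local-cluster tests can rely on this retry mechanism instead of reaching into gossip internals. A self-contained sketch of just the retention policy, with bare slot numbers standing in for the real (Transaction, VoteTransaction) pairs:

use std::collections::VecDeque;

fn main() {
    let max_votes_to_refresh = 3;
    let mut latest_voted_slot = 0u64;
    let mut refreshable_votes: VecDeque<u64> = VecDeque::new();

    for vote_slot in [5, 3, 7, 9, 9, 12] {
        // Only strictly newer votes are retained, newest at the front.
        if vote_slot > latest_voted_slot {
            latest_voted_slot = vote_slot;
            refreshable_votes.push_front(vote_slot);
            refreshable_votes.truncate(max_votes_to_refresh); // drop the oldest beyond the cap
        }
    }
    let retained: Vec<u64> = refreshable_votes.iter().copied().collect();
    assert_eq!(retained, vec![12, 9, 7]);
}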
(The diffs for the remaining three changed files were not loaded.)
