gossip: notify state machine of duplicate proofs #32963

Merged

Changes from all commits (17 commits)

3 changes: 2 additions & 1 deletion core/src/tvu.rs
@@ -228,7 +228,7 @@ impl Tvu {
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -337,6 +337,7 @@ impl Tvu {
blockstore,
leader_schedule_cache.clone(),
bank_forks.clone(),
duplicate_slots_sender,
),
);

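The tvu.rs change above clones the existing `duplicate_slots_sender` so a second producer (the gossip duplicate-shred handler added below) can feed the same duplicate-slot channel that replay already consumes. A minimal sketch of that fan-in pattern, using the crossbeam channel API but with stand-in producer names rather than the actual Tvu wiring:

```rust
// Sketch only: one crossbeam channel, two cloned senders, one consumer.
use crossbeam_channel::unbounded;

type Slot = u64;

fn main() {
    let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded::<Slot>();

    // One clone goes to the first producer; the original moves to the second.
    let window_service_sender = duplicate_slots_sender.clone();
    let gossip_handler_sender = duplicate_slots_sender;

    window_service_sender.send(5).unwrap();
    gossip_handler_sender.send(7).unwrap();

    // The replay-side state machine drains everything that was reported.
    let reported: Vec<Slot> = duplicate_slots_receiver.try_iter().collect();
    assert_eq!(reported, vec![5, 7]);
}
```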
2 changes: 2 additions & 0 deletions gossip/src/duplicate_shred.rs
@@ -56,6 +56,8 @@ pub enum Error {
BlockstoreInsertFailed(#[from] BlockstoreError),
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("unable to send duplicate slot to state machine")]
DuplicateSlotSenderFailure,
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
#[error("invalid duplicate shreds")]
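The new `DuplicateSlotSenderFailure` variant carries no payload, so a failed channel send is mapped into it explicitly rather than converted via `#[from]`. A small illustrative sketch (not the full `Error` enum from duplicate_shred.rs), assuming the thiserror and crossbeam-channel crates:

```rust
use crossbeam_channel::Sender;
use thiserror::Error;

type Slot = u64;

#[derive(Debug, Error)]
pub enum Error {
    #[error("data chunk mismatch")]
    DataChunkMismatch,
    #[error("unable to send duplicate slot to state machine")]
    DuplicateSlotSenderFailure,
}

fn notify(sender: &Sender<Slot>, slot: Slot) -> Result<(), Error> {
    // The SendError payload is discarded; only the hung-up receiver matters.
    sender.send(slot).map_err(|_| Error::DuplicateSlotSenderFailure)
}

fn main() {
    let (sender, receiver) = crossbeam_channel::unbounded();
    drop(receiver); // Simulate a state machine that has gone away.
    assert!(matches!(notify(&sender, 42), Err(Error::DuplicateSlotSenderFailure)));
}
```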
147 changes: 129 additions & 18 deletions gossip/src/duplicate_shred_handler.rs
@@ -3,11 +3,13 @@ use {
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
crossbeam_channel::Sender,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey,
},
std::{
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
// Used to notify duplicate consensus state machine
duplicate_slots_sender: Sender<Slot>,
}

impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
duplicate_slots_sender: Sender<Slot>,
) -> Self {
Self {
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@
blockstore,
leader_schedule_cache,
bank_forks,
duplicate_slots_sender,
}
}

@@ -131,12 +137,30 @@ impl DuplicateShredHandler {
shred1.into_payload(),
shred2.into_payload(),
)?;
if self.should_notify_state_machine(slot) {
// Notify duplicate consensus state machine
self.duplicate_slots_sender
.send(slot)
.map_err(|_| Error::DuplicateSlotSenderFailure)?;
}
}
self.consumed.insert(slot, true);
}
Ok(())
}

fn should_notify_state_machine(&self, slot: Slot) -> bool {
let root_bank = self.bank_forks.read().unwrap().root_bank();
let Some(activated_slot) = root_bank
.feature_set
.activated_slot(&feature_set::enable_gossip_duplicate_proof_ingestion::id())
else {
return false;
};
root_bank.epoch_schedule().get_epoch(slot)
> root_bank.epoch_schedule().get_epoch(activated_slot)
}

fn should_consume_slot(&mut self, slot: Slot) -> bool {
slot > self.last_root
&& slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
@@ -211,12 +235,14 @@ mod tests {
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
duplicate_shred::{from_shred, tests::new_rand_shred},
},
crossbeam_channel::unbounded,
itertools::Itertools,
solana_ledger::{
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
shred::Shredder,
},
solana_runtime::bank::Bank,
solana_runtime::{accounts_background_service::AbsRequestSender, bank::Bank},
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
@@ -271,37 +297,59 @@
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let mut bank = Bank::new_for_tests(&genesis_config);
bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
let bank_forks_arc = BankForks::new_rw_arc(bank);
{
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let (sender, receiver) = unbounded();
// The feature will only be activated at Epoch 1.
let start_slot: Slot = slots_in_epoch + 1;

let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
1,
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
let chunks1 = create_duplicate_proof(
my_keypair.clone(),
None,
2,
start_slot + 1,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(1));
assert!(!blockstore.has_duplicate_shreds_in_slot(2));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
// Test that even when chunks from two proofs arrive interleaved, both proofs are stored correctly.
for (chunk1, chunk2) in chunks.zip(chunks1) {
duplicate_shred_handler.handle(chunk1);
duplicate_shred_handler.handle(chunk2);
}
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(blockstore.has_duplicate_shreds_in_slot(2));
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
assert_eq!(
receiver.try_iter().collect_vec(),
vec![start_slot, start_slot + 1]
);

// Test all kinds of bad proofs.
for error in [
@@ -312,7 +360,7 @@
match create_duplicate_proof(
my_keypair.clone(),
None,
3,
start_slot + 2,
Some(error),
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
) {
@@ -321,7 +369,8 @@
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(3));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 2));
assert!(receiver.is_empty());
}
}
}
@@ -337,13 +386,29 @@
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let mut bank = Bank::new_for_tests(&genesis_config);
bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
let bank_forks_arc = BankForks::new_rw_arc(bank);
{
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let start_slot: Slot = 1;
let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
// The feature will only be activated at Epoch 1.
let start_slot: Slot = slots_in_epoch + 1;

// This proof will not be accepted because num_chunks is too large.
let chunks = create_duplicate_proof(
@@ -358,6 +423,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());

// This proof will be rejected because the slot is too far away in the future.
let future_slot =
@@ -374,6 +440,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
assert!(receiver.is_empty());

// Send in two proofs; the first proof to arrive will be accepted, and any subsequent
// proofs will be discarded.
@@ -388,10 +455,54 @@
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
}

#[test]
fn test_feature_disabled() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let my_keypair = Arc::new(Keypair::new());
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let mut bank = Bank::new_for_tests(&genesis_config);
bank.deactivate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
assert!(!bank
.feature_set
.is_active(&feature_set::enable_gossip_duplicate_proof_ingestion::id()));
let bank_forks_arc = BankForks::new_rw_arc(bank);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks_arc.read().unwrap().working_bank(),
));
let (sender, receiver) = unbounded();

let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
1,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(1));
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
// If the feature is disabled, the blockstore still records the proof but the state machine is not notified.
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(receiver.try_iter().collect_vec().is_empty());
}
}
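The gating in `should_notify_state_machine` only notifies the state machine for slots whose epoch is strictly later than the epoch in which `enable_gossip_duplicate_proof_ingestion` was activated on the root bank, which is why the tests above start at `slots_in_epoch + 1`. A simplified model of that check, assuming fixed-size epochs purely for illustration (the real code asks the bank's `EpochSchedule`):

```rust
type Slot = u64;
type Epoch = u64;

// Toy epoch layout; the real schedule comes from the root bank.
const SLOTS_PER_EPOCH: u64 = 32;

fn get_epoch(slot: Slot) -> Epoch {
    slot / SLOTS_PER_EPOCH
}

fn should_notify_state_machine(proof_slot: Slot, feature_activated_slot: Option<Slot>) -> bool {
    match feature_activated_slot {
        // Feature not active on the root bank: never notify.
        None => false,
        // Only notify once the proof's slot is in a later epoch than the activation slot.
        Some(activated_slot) => get_epoch(proof_slot) > get_epoch(activated_slot),
    }
}

fn main() {
    // Activated in epoch 0: proofs for epoch-0 slots are ignored, epoch 1 onward notify.
    assert!(!should_notify_state_machine(10, Some(3)));
    assert!(should_notify_state_machine(SLOTS_PER_EPOCH + 1, Some(3)));
    assert!(!should_notify_state_machine(10, None));
}
```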
48 changes: 35 additions & 13 deletions local-cluster/src/cluster_tests.rs
@@ -41,7 +41,7 @@ use {
solana_vote_program::vote_transaction,
std::{
borrow::Borrow,
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::Path,
sync::{
@@ -489,6 +503,9 @@ pub fn start_gossip_voter(
+ std::marker::Send
+ 'static,
sleep_ms: u64,
num_expected_peers: usize,
refresh_ms: u64,
max_votes_to_refresh: usize,
) -> GossipVoter {
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
Expand All @@ -503,6 +506,15 @@ pub fn start_gossip_voter(
SocketAddrSpace::Unspecified,
);

// Wait for peer discovery
while cluster_info.gossip_peers().len() < num_expected_peers {
sleep(Duration::from_millis(sleep_ms));
}

let mut latest_voted_slot = 0;
let mut refreshable_votes: VecDeque<(Transaction, VoteTransaction)> = VecDeque::new();
let mut latest_push_attempt = Instant::now();

let t_voter = {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
Expand All @@ -514,6 +526,18 @@ pub fn start_gossip_voter(
}

let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
if labels.is_empty() {
if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
for (leader_vote_tx, parsed_vote) in refreshable_votes.iter().rev() {
let vote_slot = parsed_vote.last_voted_slot().unwrap();
info!("gossip voter refreshing vote {}", vote_slot);
process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
latest_push_attempt = Instant::now();
}
}
sleep(Duration::from_millis(sleep_ms));
continue;
}
let mut parsed_vote_iter: Vec<_> = labels
.into_iter()
.zip(votes)
Expand All @@ -527,22 +551,20 @@ pub fn start_gossip_voter(
});

for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", latest_vote_slot);
process_vote_tx(
latest_vote_slot,
leader_vote_tx,
parsed_vote,
&cluster_info,
)
if let Some(vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", vote_slot);
if vote_slot > latest_voted_slot {
latest_voted_slot = vote_slot;
refreshable_votes
.push_front((leader_vote_tx.clone(), parsed_vote.clone()));
refreshable_votes.truncate(max_votes_to_refresh);
}
process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
latest_push_attempt = Instant::now();
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}

if parsed_vote_iter.is_empty() {
sleep(Duration::from_millis(sleep_ms));
}
}
})
};
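The cluster_tests.rs change makes the gossip voter remember its most recent votes in a bounded `VecDeque` and re-push them when no new votes arrive within `refresh_ms`, so the votes do not age out of gossip during the test. A condensed, self-contained model of that loop; `Vote` and `push_vote` here are stand-ins for the real vote transaction types and CRDS push calls:

```rust
use std::{
    collections::VecDeque,
    time::{Duration, Instant},
};

#[derive(Clone, Debug)]
struct Vote {
    slot: u64,
}

fn push_vote(vote: &Vote) {
    println!("pushing vote for slot {}", vote.slot);
}

fn main() {
    let refresh_ms = 100;
    let max_votes_to_refresh = 2;

    let mut latest_voted_slot = 0u64;
    let mut refreshable_votes: VecDeque<Vote> = VecDeque::new();
    let mut latest_push_attempt = Instant::now();

    // Imagine each element as one poll of gossip for new votes.
    for incoming in [vec![Vote { slot: 3 }], vec![], vec![Vote { slot: 5 }]] {
        if incoming.is_empty() {
            // Nothing new arrived: after refresh_ms, re-push remembered votes oldest-first.
            if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
                for vote in refreshable_votes.iter().rev() {
                    push_vote(vote);
                }
                latest_push_attempt = Instant::now();
            }
            continue;
        }
        for vote in incoming {
            if vote.slot > latest_voted_slot {
                latest_voted_slot = vote.slot;
                // Newest vote goes to the front; cap the history length.
                refreshable_votes.push_front(vote.clone());
                refreshable_votes.truncate(max_votes_to_refresh);
            }
            push_vote(&vote);
            latest_push_attempt = Instant::now();
        }
    }
}
```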