Skip to content

Commit

Permalink
gossip: notify state machine of duplicate proofs
Browse files Browse the repository at this point in the history
  • Loading branch information
AshwinSekar committed Nov 30, 2023
1 parent c3323c0 commit bd931bf
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
3 changes: 2 additions & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl Tvu {
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
Expand Down Expand Up @@ -327,6 +327,7 @@ impl Tvu {
blockstore,
leader_schedule_cache.clone(),
bank_forks.clone(),
duplicate_slots_sender,
),
);

Expand Down
2 changes: 2 additions & 0 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum Error {
BlockstoreInsertFailed(#[from] BlockstoreError),
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("unable to send duplicate slot to state machine")]
DuplicateSlotSenderFailure,
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
#[error("invalid duplicate shreds")]
Expand Down
35 changes: 31 additions & 4 deletions gossip/src/duplicate_shred_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
crossbeam_channel::Sender,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
Expand Down Expand Up @@ -44,6 +45,8 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
// Used to notify duplicate consensus state machine
duplicate_slots_sender: Sender<Slot>,
}

impl DuplicateShredHandlerTrait for DuplicateShredHandler {
Expand All @@ -63,6 +66,7 @@ impl DuplicateShredHandler {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
duplicate_slots_sender: Sender<Slot>,
) -> Self {
Self {
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
Expand All @@ -74,6 +78,7 @@ impl DuplicateShredHandler {
blockstore,
leader_schedule_cache,
bank_forks,
duplicate_slots_sender,
}
}

Expand Down Expand Up @@ -131,6 +136,10 @@ impl DuplicateShredHandler {
shred1.into_payload(),
shred2.into_payload(),
)?;
// Notify duplicate consensus state machine
self.duplicate_slots_sender
.send(slot)
.map_err(|_| Error::DuplicateSlotSenderFailure)?;
}
self.consumed.insert(slot, true);
}
Expand Down Expand Up @@ -211,6 +220,8 @@ mod tests {
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
duplicate_shred::{from_shred, tests::new_rand_shred},
},
crossbeam_channel::unbounded,
itertools::Itertools,
solana_ledger::{
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
Expand Down Expand Up @@ -275,8 +286,13 @@ mod tests {
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks,
sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
Expand All @@ -302,6 +318,7 @@ mod tests {
}
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(blockstore.has_duplicate_shreds_in_slot(2));
assert_eq!(receiver.try_iter().collect_vec(), vec![1, 2]);

// Test all kinds of bad proofs.
for error in [
Expand All @@ -322,6 +339,7 @@ mod tests {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(3));
assert!(receiver.is_empty());
}
}
}
Expand All @@ -341,8 +359,13 @@ mod tests {
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks,
sender,
);
let start_slot: Slot = 1;

// This proof will not be accepted because num_chunks is too large.
Expand All @@ -358,6 +381,7 @@ mod tests {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());

// This proof will be rejected because the slot is too far away in the future.
let future_slot =
Expand All @@ -374,6 +398,7 @@ mod tests {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
assert!(receiver.is_empty());

// Send in two proofs, the first proof showing up will be accepted, the following
// proofs will be discarded.
Expand All @@ -388,10 +413,12 @@ mod tests {
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
}
}

0 comments on commit bd931bf

Please sign in to comment.