Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
sc-consensus-beefy: improve beefy gossip validator (#13606)
Browse files Browse the repository at this point in the history
* sc-consensus-beefy: improve beefy gossip validator

Old gossip validator was pretty dumb, being very permissive with
incoming votes - only condition it had was to be newer than best
finalized.

New filter conditions:
 - voter rounds are initialized (discarding votes until voter is
   actually active),
 - only votes for current active set id are accepted,
 - only votes for rounds in the current voting session are accepted,
 - only votes for GRANDPA finalized blocks are accepted,
 - when BEEFY voter reaches mandatory round, only votes for said
   mandatory round are accepted.

New validator uses the VoterOracle to easily implement above conditions
and only allow through votes that are immediately useful to the voter.

After every GRANDPA or BEEFY finality, the gossip validator filter is
updated.

* sc-consensus-beefy: remove votes enqueueing

Since gossip validator will simply disallow votes for future rounds,
and only allow votes that the voter can immediately process, there
is no need for the voter to enqueue votes.

It will see these "future" votes later in rebroadcasts, when voter
will also be able to process them. Only at that point does gossip
accept and consume them.

* sc-consensus-beefy: refactor persistent state

Move best-beefy and best-grandpa into VoterOracle instead
of passing them around as params.
VoterOracle ultimately needs to know best-beefy and/or best-grandpa
for most of its functions.

* sc-consensus-beefy: further restrict gossip validator

Assuming mandatory done in current session:
Instead of allowing votes for any round in the current session, only
accept votes for rounds equal or better than best BEEFY finalized.

* sc-consensus-beefy: add a couple of comments

* sc-consensus-beefy: fix tests involving multiple tasks

Finalize blocks one a time in tests where we want gossip to happen
in a certain round. Otherwise, some tasks may be left behind in
terms of gossip round numbers because once "scheduled" a task will
greedily process as much as possible.

This change should be in line with the real-world scenario where
voters run "in parallel" across nodes, the only points of
synchronization being the finality notifications.

* sc-consensus-beefy: address review comments

---------

Signed-off-by: acatangiu <[email protected]>
  • Loading branch information
acatangiu authored Mar 16, 2023
1 parent 9f933c4 commit b0700a6
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 574 deletions.
6 changes: 3 additions & 3 deletions client/consensus/beefy/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_runtime::traits::Block as BlockT;
const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";

const CURRENT_VERSION: u32 = 2;
const CURRENT_VERSION: u32 = 3;

pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
Expand Down Expand Up @@ -63,8 +63,8 @@ where

match version {
None => (),
Some(1) => (), // version 1 is totally obsolete and should be simply ignored
Some(2) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
Some(1) | Some(2) => (), // versions 1 & 2 are obsolete and should be simply ignored
Some(3) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
other =>
return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
}
Expand Down
204 changes: 81 additions & 123 deletions client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ use wasm_timer::Instant;
use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET};
use sp_consensus_beefy::{
crypto::{Public, Signature},
VoteMessage,
ValidatorSetId, VoteMessage,
};

// Timeout for rebroadcasting messages.
const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5);
#[cfg(not(test))]
const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
#[cfg(test)]
const REBROADCAST_AFTER: Duration = Duration::from_secs(5);

/// Gossip engine messages topic
pub(crate) fn topic<B: Block>() -> B::Hash
Expand All @@ -45,45 +48,51 @@ where
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
}

#[derive(Debug)]
pub(crate) struct GossipVoteFilter<B: Block> {
pub start: NumberFor<B>,
pub end: NumberFor<B>,
pub validator_set_id: ValidatorSetId,
}

/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];

struct KnownVotes<B: Block> {
last_done: Option<NumberFor<B>>,
struct VotesFilter<B: Block> {
filter: Option<GossipVoteFilter<B>>,
live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
}

impl<B: Block> KnownVotes<B> {
impl<B: Block> VotesFilter<B> {
pub fn new() -> Self {
Self { last_done: None, live: BTreeMap::new() }
}

/// Create new round votes set if not already present.
fn insert(&mut self, round: NumberFor<B>) {
self.live.entry(round).or_default();
Self { filter: None, live: BTreeMap::new() }
}

/// Remove `round` and older from live set, update `last_done` accordingly.
fn conclude(&mut self, round: NumberFor<B>) {
self.live.retain(|&number, _| number > round);
self.last_done = self.last_done.max(Some(round));
/// Update filter to new `start` and `set_id`.
fn update(&mut self, filter: GossipVoteFilter<B>) {
self.live.retain(|&round, _| round >= filter.start && round <= filter.end);
self.filter = Some(filter);
}

/// Return true if `round` is newer than previously concluded rounds.
/// Return true if `round` is >= than `max(session_start, best_beefy)`,
/// and vote set id matches session set id.
///
/// Latest concluded round is still considered alive to allow proper gossiping for it.
fn is_live(&self, round: &NumberFor<B>) -> bool {
Some(*round) >= self.last_done
fn is_live(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
self.filter
.as_ref()
.map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end)
.unwrap_or(false)
}

/// Add new _known_ `hash` to the round's known votes.
fn add_known(&mut self, round: &NumberFor<B>, hash: MessageHash) {
self.live.get_mut(round).map(|known| known.insert(hash));
fn add_known(&mut self, round: NumberFor<B>, hash: MessageHash) {
self.live.entry(round).or_default().insert(hash);
}

/// Check if `hash` is already part of round's known votes.
fn is_known(&self, round: &NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(round).map(|known| known.contains(hash)).unwrap_or(false)
fn is_known(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
}
}

Expand All @@ -100,7 +109,7 @@ where
B: Block,
{
topic: B::Hash,
known_votes: RwLock<KnownVotes<B>>,
votes_filter: RwLock<VotesFilter<B>>,
next_rebroadcast: Mutex<Instant>,
known_peers: Arc<Mutex<KnownPeers<B>>>,
}
Expand All @@ -112,26 +121,18 @@ where
pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
GossipValidator {
topic: topic::<B>(),
known_votes: RwLock::new(KnownVotes::new()),
votes_filter: RwLock::new(VotesFilter::new()),
next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
known_peers,
}
}

/// Note a voting round.
///
/// Noting round will track gossiped votes for `round`.
pub(crate) fn note_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to note gossip round #{}", round);
self.known_votes.write().insert(round);
}

/// Conclude a voting round.
/// Update gossip validator filter.
///
/// This can be called once round is complete so we stop gossiping for it.
pub(crate) fn conclude_round(&self, round: NumberFor<B>) {
debug!(target: LOG_TARGET, "🥩 About to drop gossip round #{}", round);
self.known_votes.write().conclude(round);
/// Only votes for `set_id` and rounds `start <= round <= end` will be accepted.
pub(crate) fn update_filter(&self, filter: GossipVoteFilter<B>) {
debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter);
self.votes_filter.write().update(filter);
}
}

Expand All @@ -152,25 +153,26 @@ where
if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
let msg_hash = twox_64(data);
let round = msg.commitment.block_number;
let set_id = msg.commitment.validator_set_id;
self.known_peers.lock().note_vote_for(*sender, round);

// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
{
let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();

if !known_votes.is_live(&round) {
if !filter.is_live(round, set_id) {
return ValidationResult::Discard
}

if known_votes.is_known(&round, &msg_hash) {
if filter.is_known(round, &msg_hash) {
return ValidationResult::ProcessAndKeep(self.topic)
}
}

if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
self.known_votes.write().add_known(&round, msg_hash);
self.known_peers.lock().note_vote_for(*sender, round);
self.votes_filter.write().add_known(round, msg_hash);
return ValidationResult::ProcessAndKeep(self.topic)
} else {
// TODO: report peer
Expand All @@ -185,15 +187,16 @@ where
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();
Box::new(move |_topic, mut data| {
let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
Ok(vote) => vote,
Err(_) => return true,
};

let round = msg.commitment.block_number;
let expired = !known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let expired = !filter.is_live(round, set_id);

trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired);

Expand All @@ -208,14 +211,15 @@ where
let now = Instant::now();
let mut next_rebroadcast = self.next_rebroadcast.lock();
if now >= *next_rebroadcast {
trace!(target: LOG_TARGET, "🥩 Gossip rebroadcast");
*next_rebroadcast = now + REBROADCAST_AFTER;
true
} else {
false
}
};

let known_votes = self.known_votes.read();
let filter = self.votes_filter.read();
Box::new(move |_who, intent, _topic, mut data| {
if let MessageIntent::PeriodicRebroadcast = intent {
return do_rebroadcast
Expand All @@ -227,7 +231,8 @@ where
};

let round = msg.commitment.block_number;
let allowed = known_votes.is_live(&round);
let set_id = msg.commitment.validator_set_id;
let allowed = filter.is_live(round, set_id);

trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed);

Expand All @@ -252,81 +257,35 @@ mod tests {

#[test]
fn known_votes_insert_remove() {
let mut kv = KnownVotes::<Block>::new();
let mut kv = VotesFilter::<Block>::new();
let msg_hash = twox_64(b"data");

kv.insert(1);
kv.insert(1);
kv.insert(2);
kv.add_known(1, msg_hash);
kv.add_known(1, msg_hash);
kv.add_known(2, msg_hash);
assert_eq!(kv.live.len(), 2);

let mut kv = KnownVotes::<Block>::new();
kv.insert(1);
kv.insert(2);
kv.insert(3);
kv.add_known(3, msg_hash);
assert!(kv.is_known(3, &msg_hash));
assert!(!kv.is_known(3, &twox_64(b"other")));
assert!(!kv.is_known(4, &msg_hash));
assert_eq!(kv.live.len(), 3);

assert!(kv.last_done.is_none());
kv.conclude(2);
assert_eq!(kv.live.len(), 1);
assert!(!kv.live.contains_key(&2));
assert_eq!(kv.last_done, Some(2));
assert!(kv.filter.is_none());
assert!(!kv.is_live(1, 1));

kv.conclude(1);
assert_eq!(kv.last_done, Some(2));
kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 });
assert_eq!(kv.live.len(), 1);
assert!(kv.live.contains_key(&3));
assert!(!kv.is_live(2, 1));
assert!(kv.is_live(3, 1));
assert!(kv.is_live(4, 1));
assert!(!kv.is_live(4, 2));

kv.conclude(3);
assert_eq!(kv.last_done, Some(3));
kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 });
assert!(kv.live.is_empty());
}

#[test]
fn note_and_drop_round_works() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

gv.note_round(1u64);

assert!(gv.known_votes.read().is_live(&1u64));

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

assert_eq!(gv.known_votes.read().live.len(), 4);

gv.conclude_round(7u64);

let votes = gv.known_votes.read();

// rounds 1 and 3 are outdated, don't gossip anymore
assert!(!votes.is_live(&1u64));
assert!(!votes.is_live(&3u64));
// latest concluded round is still gossiped
assert!(votes.is_live(&7u64));
// round 10 is alive and in-progress
assert!(votes.is_live(&10u64));
}

#[test]
fn note_same_round_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

assert_eq!(gv.known_votes.read().live.len(), 3);

// note round #7 again -> should not change anything
gv.note_round(7u64);

let votes = gv.known_votes.read();

assert_eq!(votes.live.len(), 3);

assert!(votes.is_live(&3u64));
assert!(votes.is_live(&7u64));
assert!(votes.is_live(&10u64));
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
Expand Down Expand Up @@ -368,21 +327,18 @@ mod tests {
#[test]
fn should_avoid_verifying_signatures_twice() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let mut context = TestContext;

let vote = dummy_vote(3);

gv.note_round(3u64);
gv.note_round(7u64);
gv.note_round(10u64);

// first time the cache should be populated
let res = gv.validate(&mut context, &sender, &vote.encode());

assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
assert_eq!(
gv.known_votes.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
Some(1)
);

Expand All @@ -392,9 +348,11 @@ mod tests {
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));

// next we should quickly reject if the round is not live
gv.conclude_round(7_u64);
gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 });

assert!(!gv.known_votes.read().is_live(&vote.commitment.block_number));
let number = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
assert!(!gv.votes_filter.read().is_live(number, set_id));

let res = gv.validate(&mut context, &sender, &vote.encode());

Expand All @@ -404,14 +362,13 @@ mod tests {
#[test]
fn messages_allowed_and_expired() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();
let intent = MessageIntent::Broadcast;

// note round 2 and 3, then conclude 2
gv.note_round(2u64);
gv.note_round(3u64);
gv.conclude_round(2u64);
// conclude 2
gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 });
let mut allowed = gv.message_allowed();
let mut expired = gv.message_expired();

Expand Down Expand Up @@ -447,6 +404,7 @@ mod tests {
#[test]
fn messages_rebroadcast() {
let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
let sender = sc_network::PeerId::random();
let topic = Default::default();

Expand Down
Loading

0 comments on commit b0700a6

Please sign in to comment.