Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: block sync on big blocks #3304

Merged
merged 5 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ version = "0.12"
[dependencies.rand]
version = "0.8"

[dependencies.rayon]
version = "1.10"

[dependencies.reqwest]
version = "0.11"

Expand Down
21 changes: 17 additions & 4 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use snarkvm::prelude::{

use anyhow::{anyhow, bail, Result};
use snarkos_node_tcp::is_bogon_ip;
use std::{net::SocketAddr, time::Instant};
use std::net::SocketAddr;
use tokio::task::spawn_blocking;

/// The max number of peers to send in a `PeerResponse` message.
Expand Down Expand Up @@ -76,6 +76,9 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {

trace!("Received '{}' from '{peer_ip}'", message.name());

// Update the last seen timestamp of the peer.
self.router().update_last_seen_for_connected_peer(peer_ip);

// This match statement handles the inbound message by deserializing the message,
// checking that the message is valid, and then calling the appropriate (trait) handler.
match message {
Expand Down Expand Up @@ -110,7 +113,19 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
}
// Perform the deferred non-blocking deserialization of the blocks.
let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?;
// The deserialization can take a long time (minutes). We should not be running
// this on a blocking task, but on a rayon thread pool.
let (send, recv) = tokio::sync::oneshot::channel();
rayon::spawn_fifo(move || {
let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
Comment on lines +116 to +120
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider moving away from JSON to a more efficient serialization format, such as Cap'n Proto, Protocol Buffers, or MessagePack.

let _ = send.send(blocks);
});
let blocks = match recv.await {
Ok(Ok(blocks)) => blocks,
Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
};

// Ensure the block response is well-formed.
blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;

Expand Down Expand Up @@ -169,8 +184,6 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
peer.set_version(message.version);
// Update the node type of the peer.
peer.set_node_type(message.node_type);
// Update the last seen timestamp of the peer.
peer.set_last_seen(Instant::now());
})
{
bail!("[Ping] {error}");
Expand Down
6 changes: 6 additions & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,12 @@ impl<N: Network> Router<N> {
Ok(())
}

/// Sets the last seen timestamp of the connected peer at `peer_ip` to the current instant.
///
/// Does nothing if `peer_ip` is not present in the connected peers map.
pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
    // Keep the write guard in a named binding; it is released when the function returns.
    let mut connected_peers = self.connected_peers.write();
    if let Some(entry) = connected_peers.get_mut(&peer_ip) {
        entry.set_last_seen(Instant::now());
    }
}

/// Removes the connected peer and adds them to the candidate peers.
pub fn remove_connected_peer(&self, peer_ip: SocketAddr) {
// Removes the bidirectional map between the listener address and (ambiguous) peer address.
Expand Down
21 changes: 20 additions & 1 deletion node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {

/// Processes a message received from the network.
///
/// `BlockRequest` and `BlockResponse` messages are handed to a separately spawned task so
/// that they do not hold up the inbound queue; every other message is processed inline.
/// Failures are dealt with inside `process_message_inner`, so this method always returns
/// `Ok(())`.
async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
    if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
        // Handle BlockRequest and BlockResponse messages in a separate task to not block the
        // inbound queue.
        // Clone the node only on this path: the spawned task needs an owned handle, whereas
        // the inline path below can keep borrowing `self`.
        let node = self.clone();
        tokio::spawn(async move {
            node.process_message_inner(peer_addr, message).await;
        });
    } else {
        self.process_message_inner(peer_addr, message).await;
    }
    Ok(())
}
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
async fn process_message_inner(
&self,
peer_addr: SocketAddr,
message: <Client<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
) {
// Process the message. Disconnect if the peer violated the protocol.
if let Err(error) = self.inbound(peer_addr, message).await {
if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
Expand All @@ -124,7 +144,6 @@ impl<N: Network, C: ConsensusStorage<N>> Reading for Client<N, C> {
self.router().disconnect(peer_ip);
}
}
Ok(())
}
}

Expand Down
21 changes: 20 additions & 1 deletion node/src/validator/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,26 @@ impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> {

/// Processes a message received from the network.
///
/// `BlockRequest` and `BlockResponse` messages are handed to a separately spawned task so
/// that they do not hold up the inbound queue; every other message is processed inline.
/// Failures are dealt with inside `process_message_inner`, so this method always returns
/// `Ok(())`.
async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
    if matches!(message, Message::BlockRequest(_) | Message::BlockResponse(_)) {
        // Handle BlockRequest and BlockResponse messages in a separate task to not block the
        // inbound queue.
        // Clone the node only on this path: the spawned task needs an owned handle, whereas
        // the inline path below can keep borrowing `self`.
        let node = self.clone();
        tokio::spawn(async move {
            node.process_message_inner(peer_addr, message).await;
        });
    } else {
        self.process_message_inner(peer_addr, message).await;
    }
    Ok(())
}
}

impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
async fn process_message_inner(
&self,
peer_addr: SocketAddr,
message: <Validator<N, C> as snarkos_node_tcp::protocols::Reading>::Message,
) {
// Process the message. Disconnect if the peer violated the protocol.
if let Err(error) = self.inbound(peer_addr, message).await {
if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) {
Expand All @@ -120,7 +140,6 @@ impl<N: Network, C: ConsensusStorage<N>> Reading for Validator<N, C> {
self.router().disconnect(peer_ip);
}
}
Ok(())
}
}

Expand Down