Skip to content

Commit

Permalink
Merge pull request #878 from freenet/184207118-add-anti-write-ampl-layer
Browse files Browse the repository at this point in the history
184207118 - Finish anti-write amplification layer
  • Loading branch information
iduartgomez authored Oct 24, 2023
2 parents cb51219 + 2d9c255 commit 618788a
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 218 deletions.
14 changes: 12 additions & 2 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ impl Display for Transaction {
}
}

impl PartialOrd for Transaction {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Transaction {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id)
}
}

/// Get the transaction type associated to a given message type.
pub(crate) trait TxType: sealed_msg_type::SealedTxType {
fn tx_type_id() -> TransactionTypeId;
Expand Down Expand Up @@ -91,7 +103,6 @@ mod sealed_msg_type {
Get,
Subscribe,
Update,
Canceled,
}

impl Display for TransactionType {
Expand All @@ -102,7 +113,6 @@ mod sealed_msg_type {
TransactionType::Get => write!(f, "get"),
TransactionType::Subscribe => write!(f, "subscribe"),
TransactionType::Update => write!(f, "update"),
TransactionType::Canceled => write!(f, "canceled"),
}
}
}
Expand Down
21 changes: 18 additions & 3 deletions crates/core/src/node/conn_manager/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,12 @@ impl ConnectionHandler for Handler {
if msg.track_stats() {
if let Ok(Some(mut op)) = self.op_manager.pop(op_id) {
op.record_transfer();
let _ = self.op_manager.push(*op_id, op);
let fut = self.op_manager.push(*op_id, op);
futures::pin_mut!(fut);
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
}
let op_id = *op_id;
Expand Down Expand Up @@ -954,7 +959,12 @@ impl ConnectionHandler for Handler {
if let Some(op_id) = op_id {
if let Ok(Some(mut op)) = self.op_manager.pop(&op_id) {
op.record_transfer();
let _ = self.op_manager.push(op_id, op);
let fut = self.op_manager.push(op_id, op);
futures::pin_mut!(fut);
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
}
stream = SubstreamState::WaitingMsg { substream, conn_id };
Expand Down Expand Up @@ -983,7 +993,12 @@ impl ConnectionHandler for Handler {
let op_id = msg.id();
if let Ok(Some(mut op)) = self.op_manager.pop(op_id) {
op.record_transfer();
let _ = self.op_manager.push(*op_id, op);
let fut = self.op_manager.push(*op_id, op);
futures::pin_mut!(fut);
match fut.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
if !msg.terminal() {
// received a message, the other peer is waiting for an answer
Expand Down
190 changes: 147 additions & 43 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
use std::{collections::BTreeMap, time::Instant};
use std::{cmp::Reverse, collections::BTreeSet, sync::Arc, time::Duration};

use dashmap::{DashMap, DashSet};
use either::Either;
use parking_lot::RwLock;
use tokio::sync::{mpsc::error::SendError, Mutex};
use tokio::sync::Mutex;

use crate::{
config::GlobalExecutor,
contract::{
ContractError, ContractHandlerEvent, ContractHandlerToEventLoopChannel,
NetEventListenerHalve,
},
dev_tool::ClientId,
message::{Message, Transaction, TransactionType},
operations::{
connect::ConnectOp, get::GetOp, put::PutOp, subscribe::SubscribeOp, update::UpdateOp,
connect::ConnectOp,
get::{self, GetOp},
put::PutOp,
subscribe::SubscribeOp,
update::UpdateOp,
OpEnum, OpError,
},
ring::Ring,
};

use super::{conn_manager::EventLoopNotificationsSender, PeerKey};

#[cfg(debug_assertions)]
macro_rules! check_id_op {
($get_ty:expr, $var:path) => {
if !matches!($get_ty, $var) {
return Err(OpError::IncorrectTxType($var, $get_ty));
}
};
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum OpNotAvailable {
#[error("operation running")]
Expand All @@ -32,49 +45,58 @@ pub(crate) enum OpNotAvailable {
/// Thread safe and friendly data structure to maintain state of the different operations
/// and enable their execution.
pub(crate) struct OpManager {
join_ring: DashMap<Transaction, ConnectOp>,
put: DashMap<Transaction, PutOp>,
get: DashMap<Transaction, GetOp>,
subscribe: DashMap<Transaction, SubscribeOp>,
update: DashMap<Transaction, UpdateOp>,
connect: Arc<DashMap<Transaction, ConnectOp>>,
put: Arc<DashMap<Transaction, PutOp>>,
get: Arc<DashMap<Transaction, GetOp>>,
subscribe: Arc<DashMap<Transaction, SubscribeOp>>,
update: Arc<DashMap<Transaction, UpdateOp>>,
completed: Arc<DashSet<Transaction>>,
under_progress: Arc<DashSet<Transaction>>,
to_event_listener: EventLoopNotificationsSender,
// todo: remove the need for a mutex here
// todo: remove the need for a mutex here if possible
ch_outbound: Mutex<ContractHandlerToEventLoopChannel<NetEventListenerHalve>>,
// FIXME: think of an optimal strategy to check for timeouts and clean up garbage
_ops_ttl: RwLock<BTreeMap<Instant, Vec<Transaction>>>,
// todo: improve this when the anti-write amplification functionality is added
completed: DashSet<Transaction>,
in_progress: DashSet<Transaction>,
new_transactions: tokio::sync::mpsc::Sender<Transaction>,
pub ring: Ring,
}

#[cfg(debug_assertions)]
macro_rules! check_id_op {
($get_ty:expr, $var:path) => {
if !matches!($get_ty, $var) {
return Err(OpError::IncorrectTxType($var, $get_ty));
}
};
}

impl OpManager {
pub(super) fn new(
ring: Ring,
notification_channel: EventLoopNotificationsSender,
contract_handler: ContractHandlerToEventLoopChannel<NetEventListenerHalve>,
) -> Self {
let connect = Arc::new(DashMap::new());
let put = Arc::new(DashMap::new());
let get = Arc::new(DashMap::new());
let subscribe = Arc::new(DashMap::new());
let update = Arc::new(DashMap::new());
let completed = Arc::new(DashSet::new());
let under_progress = Arc::new(DashSet::new());

let (new_transactions, rx) = tokio::sync::mpsc::channel(100);
GlobalExecutor::spawn(garbage_cleanup_task(
rx,
connect.clone(),
put.clone(),
get.clone(),
subscribe.clone(),
update.clone(),
completed.clone(),
under_progress.clone(),
));

Self {
join_ring: DashMap::default(),
put: DashMap::default(),
get: DashMap::default(),
subscribe: DashMap::default(),
update: DashMap::default(),
ring,
connect,
put,
get,
subscribe,
update,
completed,
under_progress,
to_event_listener: notification_channel,
ch_outbound: Mutex::new(contract_handler),
completed: DashSet::new(),
in_progress: DashSet::new(),
_ops_ttl: RwLock::new(BTreeMap::new()),
new_transactions,
ring,
}
}

Expand All @@ -88,13 +110,13 @@ impl OpManager {
msg: Message,
op: OpEnum,
client_id: Option<ClientId>,
) -> Result<(), SendError<(Message, Option<ClientId>)>> {
) -> Result<(), OpError> {
// push back the state to the stack
self.push(*msg.id(), op).expect("infallible");
self.push(*msg.id(), op).await?;
self.to_event_listener
.send(Either::Left((msg, client_id)))
.await
.map_err(|err| SendError(err.0.unwrap_left()))
.map_err(Into::into)
}

/// Send an event to the contract handler and await a response event from it if successful.
Expand All @@ -114,13 +136,14 @@ impl OpManager {
todo!()
}

pub fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> {
self.in_progress.remove(&id);
pub async fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> {
self.under_progress.remove(&id);
self.new_transactions.send(id).await?;
match op {
OpEnum::Connect(op) => {
#[cfg(debug_assertions)]
check_id_op!(id.tx_type(), TransactionType::Connect);
self.join_ring.insert(id, *op);
self.connect.insert(id, *op);
}
OpEnum::Put(op) => {
#[cfg(debug_assertions)]
Expand Down Expand Up @@ -150,12 +173,12 @@ impl OpManager {
if self.completed.contains(id) {
return Err(OpNotAvailable::Completed);
}
if self.in_progress.contains(id) {
if self.under_progress.contains(id) {
return Err(OpNotAvailable::Running);
}
let op = match id.tx_type() {
TransactionType::Connect => self
.join_ring
.connect
.remove(id)
.map(|(_k, v)| v)
.map(|op| OpEnum::Connect(Box::new(op))),
Expand All @@ -167,9 +190,8 @@ impl OpManager {
.map(|(_k, v)| v)
.map(OpEnum::Subscribe),
TransactionType::Update => self.update.remove(id).map(|(_k, v)| v).map(OpEnum::Update),
TransactionType::Canceled => unreachable!(),
};
self.in_progress.insert(*id);
self.under_progress.insert(*id);
Ok(op)
}

Expand All @@ -182,3 +204,85 @@ impl OpManager {
self.ring.prune_connection(peer);
}
}

#[allow(clippy::too_many_arguments)]
async fn garbage_cleanup_task(
mut new_transactions: tokio::sync::mpsc::Receiver<Transaction>,
connect: Arc<DashMap<Transaction, ConnectOp>>,
put: Arc<DashMap<Transaction, PutOp>>,
get: Arc<DashMap<Transaction, GetOp>>,
subscribe: Arc<DashMap<Transaction, SubscribeOp>>,
update: Arc<DashMap<Transaction, UpdateOp>>,
completed: Arc<DashSet<Transaction>>,
under_progress: Arc<DashSet<Transaction>>,
) {
const CLEANUP_INTERVAL: Duration = Duration::from_secs(5);
let mut tick = tokio::time::interval(CLEANUP_INTERVAL);
tick.tick().await;

let mut ttl_set = BTreeSet::new();

let remove_old = move |ttl_set: &mut BTreeSet<Reverse<Transaction>>,
delayed: &mut Vec<Transaction>| {
// generate a random id, since those are sortable by time
// it will allow to get any older transactions, notice the use of reverse
// so the older transactions are removed instead of the newer ones
let older_than: Reverse<Transaction> = Reverse(Transaction::new::<get::GetMsg>());
let mut old_missing = std::mem::replace(delayed, Vec::with_capacity(200));
for tx in old_missing.drain(..) {
if completed.remove(&tx).is_some() {
continue;
}
let still_waiting = match tx.tx_type() {
TransactionType::Connect => connect.remove(&tx).is_none(),
TransactionType::Put => put.remove(&tx).is_none(),
TransactionType::Get => get.remove(&tx).is_none(),
TransactionType::Subscribe => subscribe.remove(&tx).is_none(),
TransactionType::Update => update.remove(&tx).is_none(),
};
if still_waiting {
delayed.push(tx);
}
}
for Reverse(tx) in ttl_set.split_off(&older_than).into_iter() {
if under_progress.contains(&tx) {
delayed.push(tx);
continue;
}
if completed.remove(&tx).is_some() {
continue;
}
match tx.tx_type() {
TransactionType::Connect => {
connect.remove(&tx);
}
TransactionType::Put => {
put.remove(&tx);
}
TransactionType::Get => {
get.remove(&tx);
}
TransactionType::Subscribe => {
subscribe.remove(&tx);
}
TransactionType::Update => {
update.remove(&tx);
}
}
}
};

let mut delayed = vec![];
loop {
tokio::select! {
tx = new_transactions.recv() => {
if let Some(tx) = tx {
ttl_set.insert(Reverse(tx));
}
}
_ = tick.tick() => {
remove_old(&mut ttl_set, &mut delayed);
}
}
}
}
Loading

0 comments on commit 618788a

Please sign in to comment.