Skip to content

Commit

Permalink
Merge work on block expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Aug 30, 2023
2 parents a62de49 + b604415 commit eb88357
Show file tree
Hide file tree
Showing 28 changed files with 1,081 additions and 110 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ tokio = { version = "1.28.2", default-features = false }
tokio-stream = { version = "0.1.9", default-features = false }
tokio-util = "0.7.4"
tracing = { version = "0.1.35" }
tracing-subscriber = { version = "0.3.15" }
tracing-subscriber = { version = "0.3.17" }
turmoil = { git = "https://github.com/tokio-rs/turmoil" }

[profile.bench]
Expand Down
4 changes: 3 additions & 1 deletion bridge/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl ToErrorCode for Error {
AmbiguousEntry => ErrorCode::AmbiguousEntry,
DirectoryNotEmpty => ErrorCode::DirectoryNotEmpty,
OperationNotSupported => ErrorCode::OperationNotSupported,
NonUtf8FileName | OffsetOutOfRange => ErrorCode::InvalidArgument,
InvalidArgument | NonUtf8FileName | OffsetOutOfRange => {
ErrorCode::InvalidArgument
}
StorageVersionMismatch => ErrorCode::StorageVersionMismatch,
EntryIsFile | EntryIsDirectory | Writer(_) | Locked => ErrorCode::Other,
}
Expand Down
38 changes: 36 additions & 2 deletions bridge/src/repository.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
config::{ConfigError, ConfigKey, ConfigStore},
device_id,
error::Result,
error::{Error, Result},
protocol::remote::{Request, Response},
transport::RemoteClient,
};
Expand All @@ -11,10 +11,14 @@ use ouisync_lib::{
crypto::Password, Access, AccessMode, AccessSecrets, LocalSecret, ReopenToken, Repository,
RepositoryParams, ShareToken, StateMonitor, StorageSize,
};
use std::{borrow::Cow, sync::Arc};
use std::{borrow::Cow, sync::Arc, time::Duration};
use tokio_rustls::rustls;

const DEFAULT_QUOTA_KEY: ConfigKey<u64> = ConfigKey::new("default_quota", "Default storage quota");
const DEFAULT_BLOCK_EXPIRATION_MILLIS: ConfigKey<u64> = ConfigKey::new(
"default_block_expiration",
"Default time in seconds when blocks start to expire if not used",
);

/// Creates a new repository and set access to it based on the following table:
///
Expand Down Expand Up @@ -57,6 +61,9 @@ pub async fn create(
let quota = get_default_quota(config).await?;
repository.set_quota(quota).await?;

let block_expiration = get_default_block_expiration(config).await?;
repository.set_block_expiration(block_expiration).await?;

Ok(repository)
}

Expand Down Expand Up @@ -218,6 +225,33 @@ pub async fn get_default_quota(config: &ConfigStore) -> Result<Option<StorageSiz
}
}

pub async fn set_default_block_expiration(
config: &ConfigStore,
value: Option<Duration>,
) -> Result<()> {
let entry = config.entry(DEFAULT_BLOCK_EXPIRATION_MILLIS);

if let Some(value) = value {
entry
.set(&u64::try_from(value.as_millis()).map_err(|_| Error::InvalidArgument)?)
.await?;
} else {
entry.remove().await?;
}

Ok(())
}

pub async fn get_default_block_expiration(config: &ConfigStore) -> Result<Option<Duration>> {
let entry = config.entry::<u64>(DEFAULT_BLOCK_EXPIRATION_MILLIS);

match entry.get().await {
Ok(millis) => Ok(Some(Duration::from_millis(millis))),
Err(ConfigError::NotFound) => Ok(None),
Err(error) => Err(error.into()),
}
}

/// Mirror the repository to the storage servers
pub async fn mirror(
repository: &Repository,
Expand Down
36 changes: 35 additions & 1 deletion cli/src/handler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ouisync_bridge::{
transport::NotificationSender,
};
use ouisync_lib::{PeerAddr, ShareToken};
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};

#[derive(Clone)]
pub(crate) struct LocalHandler {
Expand Down Expand Up @@ -365,6 +365,40 @@ impl ouisync_bridge::transport::Handler for LocalHandler {
Ok(().into())
}
}
Request::BlockExpiration {
name,
remove,
value,
} => {
let value = value.map(Duration::from_secs);
let value = if remove { Some(None) } else { value.map(Some) };

if let Some(name) = name {
let holder = self.state.repositories.find(&name)?;

if let Some(value) = value {
holder.repository.set_block_expiration(value).await?;
Ok(().into())
} else {
let block_expiration = holder.repository.block_expiration().await;
Ok(Response::BlockExpiration(block_expiration))
}
} else if let Some(value) = value {
ouisync_bridge::repository::set_default_block_expiration(
&self.state.config,
value,
)
.await?;
Ok(().into())
} else {
let block_expiration =
ouisync_bridge::repository::get_default_block_expiration(
&self.state.config,
)
.await?;
Ok(Response::BlockExpiration(block_expiration))
}
}
}
}
}
22 changes: 21 additions & 1 deletion cli/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::{builder::BoolishValueParser, Subcommand};
use ouisync_bridge::logger::LogFormat;
use ouisync_lib::{AccessMode, PeerAddr, PeerInfo, StorageSize};
use serde::{Deserialize, Serialize};
use std::{fmt, net::SocketAddr, path::PathBuf};
use std::{fmt, net::SocketAddr, path::PathBuf, time::Duration};

#[derive(Subcommand, Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -201,6 +201,24 @@ pub(crate) enum Request {
/// Ti, Gi, ...) and decimal (k, M, T, G, ...) suffixes.
value: Option<StorageSize>,
},
/// Get or set block expiration
BlockExpiration {
/// Name of the repository to get/set the quota for
#[arg(
short,
long,
required_unless_present = "default",
conflicts_with = "default"
)]
name: Option<String>,

/// Remove the quota
#[arg(short, long, conflicts_with = "value")]
remove: bool,

/// Set duration after which blocks are removed if not used (in seconds).
value: Option<u64>,
},
}

#[derive(Serialize, Deserialize)]
Expand All @@ -213,6 +231,7 @@ pub(crate) enum Response {
SocketAddrs(Vec<SocketAddr>),
StorageSize(StorageSize),
QuotaInfo(QuotaInfo),
BlockExpiration(Option<Duration>),
}

impl From<()> for Response {
Expand Down Expand Up @@ -296,6 +315,7 @@ impl fmt::Display for Response {
}
Self::StorageSize(value) => write!(f, "{value}"),
Self::QuotaInfo(info) => write!(f, "{info}"),
Self::BlockExpiration(info) => write!(f, "{info:?}"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use self::{hash_map::HashMap, hash_set::HashSet};

pub mod hash_map {
pub use rand::RandomState;
pub use std::collections::hash_map::Entry;
pub use std::collections::hash_map::{Entry, OccupiedEntry, VacantEntry};

pub type HashMap<K, V, S = RandomState> = std::collections::HashMap<K, V, S>;
}
Expand Down
2 changes: 2 additions & 0 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum Error {
// TODO: remove
#[error("data is malformed")]
MalformedData,
#[error("invalid argument")]
InvalidArgument,
#[error("not a directory or directory malformed")]
MalformedDirectory,
#[error("entry already exists")]
Expand Down
40 changes: 38 additions & 2 deletions lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
crypto::{sign::PublicKey, CacheHash, Hashable},
error::{Error, Result},
protocol::{
BlockData, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence, UntrustedProof,
BlockData, BlockId, BlockNonce, InnerNodeMap, LeafNodeSet, MultiBlockPresence,
UntrustedProof,
},
repository::{BlockRequestMode, RepositoryMonitor, Vault},
store::{self, ReceiveFilter},
Expand Down Expand Up @@ -64,6 +65,8 @@ impl Client {
pub async fn run(&mut self, rx: &mut mpsc::Receiver<Response>) -> Result<()> {
self.receive_filter.reset().await?;

let mut reload_index_rx = self.vault.store().client_reload_index_tx.subscribe();

let mut block_promise_acceptor = self.block_tracker.acceptor();

// We're making sure to not send more requests than MAX_PENDING_RESPONSES, but there may be
Expand Down Expand Up @@ -108,6 +111,12 @@ impl Client {
result?;
break;
}
branch_to_reload = reload_index_rx.recv() => {
match branch_to_reload {
Ok(branch_to_reload) => self.reload_index(&branch_to_reload),
Err(_) => (),
}
}
}
}

Expand Down Expand Up @@ -186,6 +195,17 @@ impl Client {
.measure_ok(self.handle_block(data, nonce, block_promise, debug))
.await
}
PendingResponse::BlockNotFound {
block_id,
permit: _permit,
debug,
} => {
self.vault
.monitor
.handle_block_not_found_metric
.measure_ok(self.handle_block_not_found(block_id, debug))
.await
}
}
}

Expand Down Expand Up @@ -347,6 +367,18 @@ impl Client {
}
}

#[instrument(skip_all, fields(block_id), err(Debug))]
async fn handle_block_not_found(
&self,
block_id: BlockId,
_debug: DebugReceivedResponse,
) -> Result<()> {
tracing::trace!("Client received block not found {:?}", block_id);
self.vault
.receive_block_not_found(block_id, &self.receive_filter)
.await
}

// Request again the branches that became completed. This is to cover the following edge
// case:
//
Expand All @@ -366,7 +398,7 @@ impl Client {
// requested as soon as possible.
fn refresh_branches(&self, branches: &[PublicKey]) {
for branch_id in branches {
self.enqueue_request(PendingRequest::RootNode(*branch_id, DebugRequest::start()));
self.reload_index(branch_id);
}
}

Expand Down Expand Up @@ -406,6 +438,10 @@ impl Client {
);
}
}

fn reload_index(&self, branch_id: &PublicKey) {
self.enqueue_request(PendingRequest::RootNode(*branch_id, DebugRequest::start()));
}
}

fn start_sender(
Expand Down
6 changes: 0 additions & 6 deletions lib/src/network/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ pub const DHT_ROUTERS: &[&str] = &[
pub const MIN_DHT_ANNOUNCE_DELAY: Duration = Duration::from_secs(3 * 60);
pub const MAX_DHT_ANNOUNCE_DELAY: Duration = Duration::from_secs(6 * 60);

#[derive(Clone)]
pub struct ActiveDhtNodes {
pub good: HashSet<SocketAddr>,
pub questionable: HashSet<SocketAddr>,
}

#[async_trait]
pub trait DhtContactsStoreTrait: Sync + Send + 'static {
async fn load_v4(&self) -> io::Result<HashSet<SocketAddrV4>>;
Expand Down
12 changes: 12 additions & 0 deletions lib/src/network/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ pub(super) enum PendingResponse {
permit: Option<ClientPermit>,
debug: DebugReceivedResponse,
},
BlockNotFound {
block_id: BlockId,
permit: Option<ClientPermit>,
debug: DebugReceivedResponse,
},
}

pub(super) struct PendingRequests {
Expand Down Expand Up @@ -184,6 +189,13 @@ impl PendingRequests {
};
Some(r)
}
ProcessedResponse::Failure(processed_response::Failure::Block(block_id, debug)) => {
Some(PendingResponse::BlockNotFound {
block_id,
permit,
debug,
})
}
ProcessedResponse::Failure(_) => None,
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl<'a> Responder<'a> {
.store()
.acquire_read()
.await?
.read_block(&id, &mut content)
.read_block_on_peer_request(&id, &mut content, &self.vault.block_tracker)
.await;

match result {
Expand Down
23 changes: 8 additions & 15 deletions lib/src/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
server::Server,
};
use crate::{
block_tracker::{BlockTracker, OfferState},
block_tracker::OfferState,
crypto::sign::{Keypair, PublicKey},
db,
event::{Event, EventSender, Payload},
Expand All @@ -14,9 +14,8 @@ use crate::{
test_utils::{receive_blocks, receive_nodes, Snapshot},
BlockId, RootNode, SingleBlockPresence, VersionVectorOp, BLOCK_SIZE,
},
repository::{BlockRequestMode, LocalId, RepositoryId, RepositoryMonitor, Vault},
repository::{BlockRequestMode, RepositoryId, RepositoryMonitor, Vault},
state_monitor::StateMonitor,
store::Store,
test_utils,
version_vector::VersionVector,
};
Expand Down Expand Up @@ -371,24 +370,18 @@ async fn create_repository<R: Rng + CryptoRng>(
write_keys: &Keypair,
) -> (TempDir, Vault, PublicKey) {
let (base_dir, db) = db::create_temp().await.unwrap();
let store = Store::new(db);
let writer_id = PublicKey::generate(rng);
let repository_id = RepositoryId::from(write_keys.public);
let event_tx = EventSender::new(1);

let state = Vault {

let state = Vault::new(
repository_id,
store,
event_tx,
block_tracker: BlockTracker::new(),
block_request_mode: BlockRequestMode::Greedy,
local_id: LocalId::new(),
monitor: Arc::new(RepositoryMonitor::new(
StateMonitor::make_root(),
Metrics::new(),
"test",
)),
};
db,
BlockRequestMode::Greedy,
RepositoryMonitor::new(StateMonitor::make_root(), Metrics::new(), "test"),
);

(base_dir, state, writer_id)
}
Expand Down
Loading

0 comments on commit eb88357

Please sign in to comment.