Skip to content

Commit

Permalink
Add a failing expiration_race test
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Sep 14, 2023
1 parent c07391e commit 79e8e75
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 6 deletions.
30 changes: 28 additions & 2 deletions lib/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use tempfile::TempDir;
use thiserror::Error;
use tokio::{fs, sync::OwnedMutexGuard as AsyncOwnedMutexGuard, task};

#[cfg(test)]
use crate::sync::break_point::BreakPoint;

const WARN_AFTER_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3);

pub(crate) use self::connection::Connection;
Expand Down Expand Up @@ -182,9 +185,10 @@ impl fmt::Debug for ReadTransaction {
impl_executor_by_deref!(ReadTransaction);

/// Transaction that allows both reading and writing.
#[derive(Debug)]
pub(crate) struct WriteTransaction {
inner: ReadTransaction,
#[cfg(test)]
break_on_commit: Option<BreakPoint>,
}

impl WriteTransaction {
Expand All @@ -200,6 +204,8 @@ impl WriteTransaction {
inner: tx,
track_lifetime: Some(track_lifetime),
},
#[cfg(test)]
break_on_commit: None,
})
}

Expand All @@ -211,7 +217,16 @@ impl WriteTransaction {
/// is guaranteed to be either committed or rolled back but there is no way to tell in advance
/// which of the two operations happens.
pub async fn commit(self) -> Result<(), sqlx::Error> {
self.inner.inner.commit().await
let result = self.inner.inner.commit().await;

#[cfg(test)]
if let Some(mut break_point) = self.break_on_commit {
// Unwrap is OK because this is code is only executed in tests and we want to make sure
// the BreakPointController is used appropriately.
break_point.hit().await.unwrap();
}

result

Check warning on line 229 in lib/src/db/mod.rs

View workflow job for this annotation

GitHub Actions / check (windows)

returning the result of a `let` binding from a block

Check warning on line 229 in lib/src/db/mod.rs

View workflow job for this annotation

GitHub Actions / check (android)

returning the result of a `let` binding from a block
}

/// Commits the transaction and if (and only if) the commit completes successfully, runs the
Expand Down Expand Up @@ -259,6 +274,11 @@ impl WriteTransaction {
.await
.unwrap()
}

#[cfg(test)]
pub fn break_on_commit(&mut self, break_point: BreakPoint) {
self.break_on_commit = Some(break_point);
}
}

impl Deref for WriteTransaction {
Expand All @@ -275,6 +295,12 @@ impl DerefMut for WriteTransaction {
}
}

impl std::fmt::Debug for WriteTransaction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(f, "WriteTransaction{{ inner:{:?} }}", self.inner)
}
}

impl_executor_by_deref!(WriteTransaction);

/// Shared write transaction
Expand Down
100 changes: 96 additions & 4 deletions lib/src/store/block_expiration_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ mod test {
let write_keys = Keypair::random();
let branch_id = PublicKey::random();

add_block(&write_keys, &branch_id, &store).await;
add_block(rand::random(), &write_keys, &branch_id, &store).await;

assert_eq!(count_blocks(store.db()).await, 1);

Expand All @@ -510,7 +510,7 @@ mod test {

sleep(Duration::from_millis(700)).await;

let block_id = add_block(&write_keys, &branch_id, &store).await;
let block_id = add_block(rand::random(), &write_keys, &branch_id, &store).await;
tracker.handle_block_update(&block_id, false);

assert_eq!(count_blocks(store.db()).await, 2);
Expand All @@ -524,11 +524,103 @@ mod test {
assert_eq!(count_blocks(store.db()).await, 0);
}

async fn add_block(write_keys: &Keypair, branch_id: &PublicKey, store: &Store) -> BlockId {
/// This test checks the condition that "if there is a block in the main database, then it must
/// be in the expiration tracker" in the presence of a race condition as described in the
/// following example:
///
/// When adding a block (`add`), we do "add the block into the main database" (`add.db`), and then
/// "add the block into the expiration tracker" (`add.ex`). Similarly, when removing a block (`rm`)
/// we do "remove the block from the main database" (`rm.db`) and then "remove the block from the
/// expiration tracker" (`rm.ex`).
///
/// As such calling `rm` and `add` concurrently could result in the `{add.db, add.ex, rm.db,
/// rm.ex}` operations to be executed in the following order:
///
/// rm.db -> add.db -> add.ex -> rm.ex
///
/// Unless we explicitly take care of this situation, we'll end up with the block being present
/// in the main database, but not in the expiration tracker. Which would be a violation of the
/// condition from the first paragraph of this comment.
#[tokio::test]
async fn expiration_race() {
let (_base_dir, store) = setup().await;
store
.set_block_expiration(
// Setting expiration time to something big, we don't care about blocks actually
// expiring in this test.
Some(Duration::from_secs(60 * 60 /* one hour */)),
BlockDownloadTracker::new(),
)
.await
.unwrap();

let write_keys = Arc::new(Keypair::random());
let branch_id = PublicKey::random();

let store = store.clone();
let write_keys = write_keys.clone();
let branch_id = branch_id.clone();

Check warning on line 562 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (windows)

using `clone` on type `PublicKey` which implements the `Copy` trait

Check warning on line 562 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (android)

using `clone` on type `PublicKey` which implements the `Copy` trait

let block: Block = rand::random();
add_block(block.clone(), &write_keys, &branch_id, &store).await;

let (on_rm, mut on_rm_controll) = crate::sync::break_point::new();

let task_rm = tokio::task::spawn({
let block_id = block.id;
let store = store.clone();

async move {
let mut tx = store.begin_write().await.unwrap();
tx.break_on_commit(on_rm);
tx.remove_block(&block_id).await.unwrap();
tx.commit().await.unwrap();
}
});

let task_update = tokio::task::spawn({
let block = block.clone();
let store = store.clone();

async move {
on_rm_controll.on_hit().await.unwrap();

let mut tx = store.begin_write().await.unwrap();
tx.receive_block(&block).await.unwrap();
tx.commit().await.unwrap();

on_rm_controll.release();
}
});

task_rm.await.unwrap();
task_update.await.unwrap();

let is_in_exp_tracker = store
.block_expiration_tracker()
.await
.unwrap()
.has_block(&block.id);

let is_in_db = block::exists(&mut store.db().acquire().await.unwrap(), &block.id)
.await
.unwrap();

assert!(
(!is_in_db) || (is_in_db && is_in_exp_tracker),

Check warning on line 610 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (windows)

this boolean expression can be simplified
"is_in_db:{is_in_db:?} is_in_exp_tracker:{is_in_exp_tracker:?}"
);
}

async fn add_block(
block: Block,
write_keys: &Keypair,
branch_id: &PublicKey,
store: &Store,
) -> BlockId {
let mut writer = store.begin_write().await.unwrap();
let mut changeset = Changeset::new();

let block: Block = rand::random();
let block_id = block.id;

changeset.write_block(block);
Expand Down
14 changes: 14 additions & 0 deletions lib/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ mod root_node;

#[cfg(test)]
mod tests;
#[cfg(test)]
use crate::sync::break_point::BreakPoint;

pub use error::Error;

Expand Down Expand Up @@ -116,6 +118,11 @@ impl Store {
.map(|tracker| tracker.block_expiration())
}

#[cfg(test)]
pub async fn block_expiration_tracker(&self) -> Option<Arc<BlockExpirationTracker>> {
self.block_expiration_tracker.read().await.as_ref().cloned()
}

/// Acquires a `Reader`
pub async fn acquire_read(&self) -> Result<Reader, Error> {
Ok(Reader {
Expand Down Expand Up @@ -705,6 +712,8 @@ impl WriteTransaction {
}
(false, None) => inner.commit_and_then(f).await?,
})

//Ok(inner.commit_and_then(then).await?)
}

// Access the underlying database transaction.
Expand All @@ -718,6 +727,11 @@ impl WriteTransaction {
&mut self.inner.inner.cache,
)
}

#[cfg(test)]
pub fn break_on_commit(&mut self, break_point: BreakPoint) {
self.db().break_on_commit(break_point)
}
}

impl Deref for WriteTransaction {
Expand Down

0 comments on commit 79e8e75

Please sign in to comment.