Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Sep 13, 2023
1 parent ec51784 commit 887c216
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 38 deletions.
81 changes: 61 additions & 20 deletions lib/src/store/block_expiration_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,50 @@ impl BlockExpirationTracker {
*self.expiration_time_tx.borrow()
}

pub fn begin_untrack_blocks(&self) -> UntrackTransaction {
UntrackTransaction {
shared: self.shared.clone(),
block_ids: Default::default(),
}
}

#[cfg(test)]
pub fn has_block(&self, block: &BlockId) -> bool {
self.shared.lock().unwrap().blocks_by_id.contains_key(block)
}
}

/// This struct is used to stop tracking blocks inside the BlockExpirationTracker. The reason for
/// "untracking" blocks in a transaction - as opposed to just removing blocks through a simple
/// BlockExpirationTracker method - is that we only want to actually untrack the block once the
/// blocks have been removed from the main DB and the removing DB transaction has been committed
/// successfully.
///
/// Not doing so could result in untracking blocks while those blocks are still in the main DB,
/// therefore they would never expire.
pub(crate) struct UntrackTransaction {
shared: Arc<BlockingMutex<Shared>>,
block_ids: HashSet<BlockId>,
}

impl UntrackTransaction {
pub fn untrack(&mut self, block_id: BlockId) {
self.block_ids.insert(block_id);
}

pub fn commit(self) {
if self.block_ids.is_empty() {
return;
}

let mut shared = self.shared.lock().unwrap();

for block_id in &self.block_ids {
shared.remove_block(block_id);
}
}
}

// For semantics
type TimeUpdated = SystemTime;

Expand Down Expand Up @@ -493,7 +531,10 @@ mod test {
async fn expiration_race() {
let (_base_dir, store) = setup().await;
store
.set_block_expiration(Some(Duration::from_secs(60)), BlockDownloadTracker::new())
.set_block_expiration(
Some(Duration::from_secs(60 * 60)),
BlockDownloadTracker::new(),
)
.await
.unwrap();

Expand All @@ -512,26 +553,13 @@ mod test {

tokio::task::spawn(async move {
let block: Block = rand::random();
add_block(block.clone(), &write_keys, &branch_id, &store).await;

link_block(block.id, &write_keys, &branch_id, &store).await;
//link_block(block.id, &write_keys, &branch_id, &store).await;

let mut tx = store.begin_write().await.unwrap();
tx.receive_block(&block).await.unwrap();
tx.commit().await.unwrap();

let task_rm = tokio::task::spawn({
let block_id = block.id;
let store = store.clone();
async move {
store
.begin_write()
.await
.unwrap()
.remove_block(&block_id)
.await
.unwrap();
}
});
//let mut tx = store.begin_write().await.unwrap();
//tx.receive_block(&block).await.unwrap();
//tx.commit().await.unwrap();

let task_update = tokio::task::spawn({
let block = block.clone();
Expand All @@ -545,14 +573,27 @@ mod test {
.read_block(&block.id, &mut content)
.await;

println!(">>>>>>>>>>>>>>>>>>> {:?} {:?}", block.id, r);
r.is_ok()
}
});

let _ = task_add.await;
let task_rm = tokio::task::spawn({
let block_id = block.id;

Check warning on line 582 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (android)

unused variable: `block_id`

Check warning on line 582 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (windows)

unused variable: `block_id`
let store = store.clone();

Check warning on line 583 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (android)

unused variable: `store`

Check warning on line 583 in lib/src/store/block_expiration_tracker.rs

View workflow job for this annotation

GitHub Actions / check (windows)

unused variable: `store`
async move {
//let mut tx = store.begin_write().await.unwrap();

//tx.remove_block(&block_id).await.unwrap();

//tx.commit().await.unwrap();
}
});

let _ = task_rm.await;
let was_in_db = task_update.await.unwrap();

assert!(was_in_db);
assert!(
!was_in_db
|| store
Expand Down
102 changes: 84 additions & 18 deletions lib/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Store {
block_expiration_tracker: self.block_expiration_tracker.read().await.clone(),
},
},
untrack_blocks: None,
})
}

Expand Down Expand Up @@ -441,6 +442,7 @@ impl DerefMut for ReadTransaction {

pub(crate) struct WriteTransaction {
inner: ReadTransaction,
untrack_blocks: Option<block_expiration_tracker::UntrackTransaction>,
}

impl WriteTransaction {
Expand All @@ -456,9 +458,23 @@ impl WriteTransaction {
index::update_summaries(db, cache, parent_hashes, UpdateSummaryReason::BlockRemoved)
.await?;

if let Some(tracker) = &self.block_expiration_tracker {
std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
tracker.handle_block_removed(id);
let WriteTransaction {
inner:
ReadTransaction {
inner:
Reader {
block_expiration_tracker,
..
},
},
untrack_blocks,
} = self;

if let Some(tracker) = block_expiration_tracker {
//std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
let untrack_tx = untrack_blocks.get_or_insert_with(|| tracker.begin_untrack_blocks());
//tracker.handle_block_removed(id);
untrack_tx.untrack(*id);
}

Ok(())
Expand Down Expand Up @@ -585,7 +601,8 @@ impl WriteTransaction {
let result = block::receive(db, cache, block).await;

if let Some(tracker) = &self.block_expiration_tracker {
std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
//std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 200)).await;
tracker.handle_block_update(&block.id, false);
}

Expand All @@ -596,11 +613,39 @@ impl WriteTransaction {
let inner = self.inner.inner.inner.into_write();
let cache = self.inner.inner.cache;

if cache.is_dirty() {
inner.commit_and_then(move || cache.commit()).await?;
} else {
inner.commit().await?;
}
match (cache.is_dirty(), self.untrack_blocks) {
(true, Some(tx)) => {
inner
.commit_and_then(move || {
cache.commit();
std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
tx.commit();
})
.await?
}
(false, Some(tx)) => {
inner
.commit_and_then(move || {
std::thread::sleep(Duration::from_millis(rand::random::<u64>() % 200));
tx.commit();
})
.await?
}
(true, None) => {
inner
.commit_and_then(move || {
cache.commit();
})
.await?
}
(false, None) => inner.commit().await?,
};

//if let Some(then) = then {
// inner.commit_and_then(then).await?;
//} else {
// inner.commit().await?;
//}

Ok(())
}
Expand Down Expand Up @@ -650,15 +695,36 @@ impl WriteTransaction {
let inner = self.inner.inner.inner.into_write();
let cache = self.inner.inner.cache;

if cache.is_dirty() {
let f = move || {
cache.commit();
f()
};
Ok(inner.commit_and_then(f).await?)
} else {
Ok(inner.commit_and_then(f).await?)
}
Ok(match (cache.is_dirty(), self.untrack_blocks) {
(true, Some(tx)) => {
inner
.commit_and_then(move || {
cache.commit();
tx.commit();
f()
})
.await?
}
(false, Some(tx)) => {
inner
.commit_and_then(move || {
tx.commit();
f()
})
.await?
}
(true, None) => {
inner
.commit_and_then(move || {
cache.commit();
f()
})
.await?
}
(false, None) => inner.commit_and_then(f).await?,
})

//Ok(inner.commit_and_then(then).await?)
}

// Access the underlying database transaction.
Expand Down

0 comments on commit 887c216

Please sign in to comment.