Skip to content

Commit

Permalink
Untrack blocks from expiration tracker only after successful removal
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Sep 14, 2023
1 parent 9684493 commit c07391e
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 20 deletions.
48 changes: 44 additions & 4 deletions lib/src/store/block_expiration_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,57 @@ impl BlockExpirationTracker {
self.watch_tx.send(()).unwrap_or(());
}

pub fn handle_block_removed(&self, block: &BlockId) {
self.shared.lock().unwrap().remove_block(block);
}

pub fn set_expiration_time(&self, expiration_time: Duration) {
self.expiration_time_tx.send(expiration_time).unwrap_or(());
}

pub fn block_expiration(&self) -> Duration {
*self.expiration_time_tx.borrow()
}

pub fn begin_untrack_blocks(&self) -> UntrackTransaction {
UntrackTransaction {
shared: self.shared.clone(),
block_ids: Default::default(),
}
}

#[cfg(test)]
pub fn has_block(&self, block: &BlockId) -> bool {
self.shared.lock().unwrap().blocks_by_id.contains_key(block)
}
}

/// This struct is used to stop tracking blocks inside the BlockExpirationTracker. The reason for
/// "untracking" blocks in a transaction - as opposed to just removing blocks through a simple
/// BlockExpirationTracker method - is that we only want to actually untrack the block once the
/// blocks have been removed from the main DB and the removing DB transaction has been committed
/// successfully.
///
/// Not doing so could result in untracking blocks while those blocks are still in the main DB,
/// therefore they would never expire.
pub(crate) struct UntrackTransaction {
shared: Arc<BlockingMutex<Shared>>,
block_ids: HashSet<BlockId>,
}

impl UntrackTransaction {
pub fn untrack(&mut self, block_id: BlockId) {
self.block_ids.insert(block_id);
}

pub fn commit(self) {
if self.block_ids.is_empty() {
return;
}

let mut shared = self.shared.lock().unwrap();

println!("Untrack commit on {:?}", self.block_ids);
for block_id in &self.block_ids {
shared.remove_block(block_id);
}
}
}

// For semantics
Expand Down
86 changes: 70 additions & 16 deletions lib/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl Store {
block_expiration_tracker: self.block_expiration_tracker.read().await.clone(),
},
},
untrack_blocks: None,
})
}

Expand Down Expand Up @@ -435,6 +436,7 @@ impl DerefMut for ReadTransaction {

pub(crate) struct WriteTransaction {
inner: ReadTransaction,
untrack_blocks: Option<block_expiration_tracker::UntrackTransaction>,
}

impl WriteTransaction {
Expand All @@ -450,8 +452,21 @@ impl WriteTransaction {
index::update_summaries(db, cache, parent_hashes, UpdateSummaryReason::BlockRemoved)
.await?;

if let Some(tracker) = &self.block_expiration_tracker {
tracker.handle_block_removed(id);
let WriteTransaction {
inner:
ReadTransaction {
inner:
Reader {
block_expiration_tracker,
..
},
},
untrack_blocks,
} = self;

if let Some(tracker) = block_expiration_tracker {
let untrack_tx = untrack_blocks.get_or_insert_with(|| tracker.begin_untrack_blocks());
untrack_tx.untrack(*id);
}

Ok(())
Expand Down Expand Up @@ -588,11 +603,31 @@ impl WriteTransaction {
let inner = self.inner.inner.inner.into_write();
let cache = self.inner.inner.cache;

if cache.is_dirty() {
inner.commit_and_then(move || cache.commit()).await?;
} else {
inner.commit().await?;
}
match (cache.is_dirty(), self.untrack_blocks) {
(true, Some(tx)) => {
inner
.commit_and_then(move || {
cache.commit();
tx.commit();
})
.await?
}
(false, Some(tx)) => {
inner
.commit_and_then(move || {
tx.commit();
})
.await?
}
(true, None) => {
inner
.commit_and_then(move || {
cache.commit();
})
.await?
}
(false, None) => inner.commit().await?,
};

Ok(())
}
Expand Down Expand Up @@ -642,15 +677,34 @@ impl WriteTransaction {
let inner = self.inner.inner.inner.into_write();
let cache = self.inner.inner.cache;

if cache.is_dirty() {
let f = move || {
cache.commit();
f()
};
Ok(inner.commit_and_then(f).await?)
} else {
Ok(inner.commit_and_then(f).await?)
}
Ok(match (cache.is_dirty(), self.untrack_blocks) {
(true, Some(tx)) => {
inner
.commit_and_then(move || {
cache.commit();
tx.commit();
f()
})
.await?
}
(false, Some(tx)) => {
inner
.commit_and_then(move || {
tx.commit();
f()
})
.await?
}
(true, None) => {
inner
.commit_and_then(move || {
cache.commit();
f()
})
.await?
}
(false, None) => inner.commit_and_then(f).await?,
})
}

// Access the underlying database transaction.
Expand Down

0 comments on commit c07391e

Please sign in to comment.