Skip to content

Commit

Permalink
perf(hummmock manager): remove unnecessary hummock version deltas (#3689
Browse files Browse the repository at this point in the history
)

* perf(hummmock manager): remove unnecessary hummock version deltas

* delete multiple version deltas in one trx

* do not use vartrx for hummock version

* do not use vartrx for hummockversion in commit_epoch

* remove checkpoint_version_id

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
soundOfDestiny and mergify[bot] authored Jul 11, 2022
1 parent 208ab51 commit 00422c6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
49 changes: 33 additions & 16 deletions src/meta/src/hummock/hummock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::hummock::model::{
};
use crate::hummock::CompactorManagerRef;
use crate::manager::{IdCategory, MetaSrvEnv};
use crate::model::{MetadataModel, ValTransaction, VarTransaction};
use crate::model::{MetadataModel, Transactional, ValTransaction, VarTransaction};
use crate::rpc::metrics::MetaMetrics;
use crate::rpc::{META_CF_NAME, META_LEADER_KEY};
use crate::storage::{MetaStore, Transaction};
Expand Down Expand Up @@ -773,7 +773,6 @@ where
let old_version = versioning_guard.current_version();
let versioning = versioning_guard.deref_mut();
let mut current_version_id = VarTransaction::new(&mut versioning.current_version_id);
let mut hummock_versions = VarTransaction::new(&mut versioning.hummock_versions);
let mut hummock_version_deltas =
VarTransaction::new(&mut versioning.hummock_version_deltas);
let mut stale_sstables = VarTransaction::new(&mut versioning.stale_sstables);
Expand Down Expand Up @@ -818,7 +817,6 @@ where
current_version_id.increase();
new_version.id = current_version_id.id();
version_delta.id = current_version_id.id();
hummock_versions.insert(new_version.id, new_version);
hummock_version_deltas.insert(version_delta.id, version_delta);

for SstableInfo { id: ref sst_id, .. } in &compact_task.sorted_output_ssts {
Expand All @@ -845,7 +843,9 @@ where
version_stale_sstables,
sstable_id_infos
)?;
hummock_versions.commit();
versioning
.hummock_versions
.insert(new_version.id, new_version);
} else {
// The compaction task is cancelled.
commit_multi_var!(
Expand Down Expand Up @@ -921,11 +921,11 @@ where
let old_version = versioning_guard.current_version();
let versioning = versioning_guard.deref_mut();
let mut current_version_id = VarTransaction::new(&mut versioning.current_version_id);
let mut hummock_versions = VarTransaction::new(&mut versioning.hummock_versions);
let mut hummock_version_deltas =
VarTransaction::new(&mut versioning.hummock_version_deltas);
let mut sstable_id_infos = VarTransaction::new(&mut versioning.sstable_id_infos);
current_version_id.increase();
let new_version_id = current_version_id.id();
let mut new_version_delta = hummock_version_deltas.new_entry_txn_or_default(
current_version_id.id(),
HummockVersionDelta {
Expand All @@ -934,8 +934,7 @@ where
..Default::default()
},
);
let mut new_hummock_version =
hummock_versions.new_entry_txn_or_default(current_version_id.id(), old_version);
let mut new_hummock_version = old_version.clone();
new_hummock_version.id = current_version_id.id();
new_version_delta.id = current_version_id.id();
if epoch <= new_hummock_version.max_committed_epoch {
Expand Down Expand Up @@ -1018,7 +1017,9 @@ where
current_version_id,
sstable_id_infos
)?;
new_hummock_version.commit();
versioning
.hummock_versions
.insert(new_version_id, new_hummock_version);
self.max_committed_epoch.store(epoch, Ordering::Release);

// Update metrics
Expand Down Expand Up @@ -1153,14 +1154,30 @@ where
}

pub async fn proceed_version_checkpoint(&self) -> risingwave_common::error::Result<()> {
let versioning_guard = self.versioning.read().await;
let new_checkpoint = versioning_guard
.hummock_versions
.first_key_value()
.unwrap()
.1
.clone();
new_checkpoint.insert(self.env.meta_store()).await?;
let mut version_deltas_to_delete = BTreeMap::new();

{
let mut versioning_guard = self.versioning.write().await;
let new_checkpoint = versioning_guard
.hummock_versions
.first_key_value()
.unwrap()
.1
.clone();
new_checkpoint.insert(self.env.meta_store()).await?;

version_deltas_to_delete.append(&mut versioning_guard.hummock_version_deltas);
versioning_guard.hummock_version_deltas =
version_deltas_to_delete.split_off(&(new_checkpoint.id + 1));
}

let mut trx = Transaction::default();
for (_, version_delta_item) in version_deltas_to_delete {
version_delta_item.delete_in_transaction(&mut trx)?;
}
self.commit_trx(self.env.meta_store(), trx, None, self.env.get_leader_info())
.await?;

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ where
hummock_manager.delete_versions(&versions_to_delete).await?;
versions_to_delete.clear();
}
hummock_manager.proceed_version_checkpoint().await.unwrap();
Ok(vacuum_count as u64)
}

Expand Down

0 comments on commit 00422c6

Please sign in to comment.