Skip to content

Commit

Permalink
chore: add logging for compaction and vacuum. (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Mar 16, 2022
1 parent 591f67b commit 38fe36a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
8 changes: 8 additions & 0 deletions rust/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

pub use compactor_manager::*;
pub use hummock_manager::*;
use itertools::Itertools;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
pub use vacuum::*;
Expand Down Expand Up @@ -59,6 +60,11 @@ where
continue;
}
};
let input_ssts = compact_task
.input_ssts
.iter()
.flat_map(|v| v.level.as_ref().unwrap().table_ids.clone())
.collect_vec();
if !compactor_manager_ref
.try_assign_compact_task(Some(compact_task.clone()), None)
.await
Expand All @@ -71,7 +77,9 @@ where
{
tracing::warn!("failed to report_compact_task {}", e);
}
continue;
}
tracing::debug!("Try to compact SSTs {:?}", input_ssts);
}
});

Expand Down
14 changes: 8 additions & 6 deletions rust/storage/src/hummock/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use bytes::{Bytes, BytesMut};
use futures::stream::{self, StreamExt};
use futures::Future;
use itertools::Itertools;
use risingwave_common::config::StorageConfig;
use risingwave_common::error::RwError;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -288,12 +289,6 @@ impl Compactor {
) -> HummockResult<()> {
let result = Compactor::run_compact(context, &mut compact_task).await;
if result.is_err() {
for _sst_to_delete in &compact_task.sorted_output_ssts {
// TODO: delete these tables in (S3) storage
// However, if we request a table_id from hummock storage service every time we
// generate a table, we would not delete here, or we should notify
// hummock storage service to delete them.
}
compact_task.sorted_output_ssts.clear();
}

Expand Down Expand Up @@ -376,13 +371,20 @@ impl Compactor {
vacuum_task,
})) => {
if let Some(compact_task) = compact_task {
let input_ssts = compact_task
.input_ssts
.iter()
.flat_map(|v| v.level.as_ref().unwrap().table_ids.clone())
.collect_vec();
tracing::debug!("Try to compact SSTs {:?}", input_ssts);
if let Err(e) =
Compactor::compact(&sub_compact_context, compact_task).await
{
tracing::warn!("failed to compact. {}", RwError::from(e));
}
}
if let Some(vacuum_task) = vacuum_task {
tracing::debug!("Try to vacuum SSTs {:?}", vacuum_task.sstable_ids);
if let Err(e) = Vacuum::vacuum(
sstable_store.clone(),
vacuum_task,
Expand Down

0 comments on commit 38fe36a

Please sign in to comment.