Skip to content

Commit

Permalink
Speedup based on number of files marked for compaction (#12306)
Browse files Browse the repository at this point in the history
Summary:
RocksDB self throttles per-DB compaction parallelism until it detects compaction pressure. This PR adds pressure detection based on the number of files marked for compaction.

Pull Request resolved: #12306

Reviewed By: cbi42

Differential Revision: D53200559

Pulled By: ajkr

fbshipit-source-id: 63402ee336881a4539204d255960f04338ab7a0e
  • Loading branch information
ajkr authored and facebook-github-bot committed Jan 30, 2024
1 parent 61ed0de commit aacf60d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
18 changes: 18 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,14 @@ uint64_t GetPendingCompactionBytesForCompactionSpeedup(
uint64_t size_threshold = bottommost_files_size / kBottommostSizeDivisor;
return std::min(size_threshold, slowdown_threshold);
}

// Returns the minimum number of files marked for compaction that justifies
// raising the per-DB compaction parallelism limit (compaction "speedup").
//
// When just one file is marked, it is not clear that parallel compaction will
// help the compaction that the user nicely requested to happen sooner. When
// multiple files are marked, however, it is pretty clearly helpful, except
// for the rare case in which a single compaction grabs all the marked files.
//
// The threshold is a fixed policy constant, so the function is `constexpr`
// to make that explicit and allow compile-time evaluation.
constexpr uint64_t GetMarkedFileCountForCompactionSpeedup() { return 2; }
} // anonymous namespace

std::pair<WriteStallCondition, WriteStallCause>
Expand Down Expand Up @@ -1074,6 +1082,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"compaction "
"bytes %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
} else if (uint64_t(vstorage->FilesMarkedForCompaction().size()) >=
GetMarkedFileCountForCompactionSpeedup()) {
write_controller_token_ =
write_controller->GetCompactionPressureToken();
ROCKS_LOG_INFO(
ioptions_.logger,
"[%s] Increasing compaction threads because we have %" PRIu64
" files marked for compaction",
name_.c_str(),
uint64_t(vstorage->FilesMarkedForCompaction().size()));
} else {
write_controller_token_.reset();
}
Expand Down
69 changes: 69 additions & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3020,6 +3020,75 @@ TEST_P(ColumnFamilyTest, CompactionSpeedupForCompactionDebt) {
}
}

TEST_P(ColumnFamilyTest, CompactionSpeedupForMarkedFiles) {
  const int kParallelismLimit = 3;

  // Collector that flags every table file it sees as needing compaction.
  class AlwaysCompactTpc : public TablePropertiesCollector {
   public:
    const char* Name() const override { return "AlwaysCompactTpc"; }

    bool NeedCompact() const override { return true; }

    Status Finish(UserCollectedProperties* /* properties */) override {
      return Status::OK();
    }

    UserCollectedProperties GetReadableProperties() const override {
      return UserCollectedProperties{};
    }
  };

  // Factory that hands out an `AlwaysCompactTpc` for every new table file.
  class AlwaysCompactTpcf : public TablePropertiesCollectorFactory {
   public:
    const char* Name() const override { return "AlwaysCompactTpcf"; }

    TablePropertiesCollector* CreateTablePropertiesCollector(
        TablePropertiesCollectorFactory::Context /* context */) override {
      return new AlwaysCompactTpc();
    }
  };

  column_family_options_.num_levels = 2;
  column_family_options_.table_properties_collector_factories.emplace_back(
      std::make_shared<AlwaysCompactTpcf>());
  db_options_.max_background_compactions = kParallelismLimit;
  Open();

  // Seed a nonempty last level; only marked files in upper levels count
  // toward the speedup threshold.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  WaitForCompaction();
  AssertFilesPerLevel("0,1", 0 /* cf */);

  // Saturate the LOW-priority pool so flushed (marked) files pile up in L0
  // rather than being compacted away immediately.
  test::SleepingBackgroundTask blockers[kParallelismLimit];
  for (auto& blocker : blockers) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &blocker,
                   Env::Priority::LOW);
    blocker.WaitUntilSleeping();
  }

  // No marked upper-level files yet: parallelism stays at the minimum.
  ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
  AssertFilesPerLevel("0,1", 0 /* cf */);

  // One marked upper-level file: still below the threshold, no speedup.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
  AssertFilesPerLevel("1,1", 0 /* cf */);

  // Two marked upper-level files: threshold crossed, full parallelism.
  ASSERT_OK(db_->Put(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(kParallelismLimit, dbfull()->TEST_BGCompactionsAllowed());
  AssertFilesPerLevel("2,1", 0 /* cf */);

  for (auto& blocker : blockers) {
    blocker.WakeUp();
    blocker.WaitUntilDone();
  }
}

TEST_P(ColumnFamilyTest, CreateAndDestroyOptions) {
std::unique_ptr<ColumnFamilyOptions> cfo(new ColumnFamilyOptions());
ColumnFamilyHandle* cfh;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Compactions can now be scheduled in parallel in an additional scenario: when multiple files are marked for compaction within a single column family.

0 comments on commit aacf60d

Please sign in to comment.