Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] limit persistent index compaction by disk #36681

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,10 @@ CONF_Bool(enable_pindex_minor_compaction, "true");
// if l2 num is larger than this, stop doing async compaction,
// add this config to prevent l2 grow too large.
CONF_mInt64(max_allow_pindex_l2_num, "5");
// control the background compaction threads
CONF_mInt64(pindex_major_compaction_num_threads, "0");
// Number of max major compaction threads
CONF_mInt32(pindex_major_compaction_num_threads, "0");
// Limit of major compaction per disk.
CONF_mInt32(pindex_major_compaction_limit_per_disk, "2");
// control the persistent index schedule compaction interval
CONF_mInt64(pindex_major_compaction_schedule_interval_seconds, "15");
// control the local persistent index in shared_data gc/evict interval
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
PersistentIndexCompactionManager* mgr =
StorageEngine::instance()->update_manager()->get_pindex_compaction_mgr();
if (mgr != nullptr) {
(void)mgr->update_max_threads(config::pindex_major_compaction_num_threads);
const int max_pk_index_compaction_thread_cnt = std::max(1, config::pindex_major_compaction_num_threads);
(void)mgr->update_max_threads(max_pk_index_compaction_thread_cnt);
}
});
_config_callback.emplace("update_memory_limit_percent", [&]() {
Expand Down
6 changes: 4 additions & 2 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,11 @@ void* StorageEngine::_pk_index_major_compaction_thread_callback(void* arg) {
ProfilerRegisterThread();
#endif
while (!_bg_worker_stopped.load(std::memory_order_consume)) {
SLEEP_IN_BG_WORKER(config::pindex_major_compaction_schedule_interval_seconds);
SLEEP_IN_BG_WORKER(1);
// schedule persistent index compaction
_update_manager->get_pindex_compaction_mgr()->schedule();
_update_manager->get_pindex_compaction_mgr()->schedule([&]() {
return StorageEngine::instance()->tablet_manager()->pick_tablets_to_do_pk_index_major_compaction();
});
}

return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4842,8 +4842,8 @@ StatusOr<EditVersion> PersistentIndex::_major_compaction_impl(
return new_l2_version;
}

static void modify_l2_versions(const std::vector<EditVersion>& input_l2_versions, const EditVersion& output_l2_version,
PersistentIndexMetaPB& index_meta) {
void PersistentIndex::modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta) {
// delete input l2 versions, and add output l2 version
std::vector<EditVersion> new_l2_versions;
std::vector<bool> new_l2_version_merged;
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ class PersistentIndex {

void reset_cancel_major_compaction();

static void modify_l2_versions(const std::vector<EditVersion>& input_l2_versions,
const EditVersion& output_l2_version, PersistentIndexMetaPB& index_meta);

protected:
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
const EditVersionWithMerge& min_l2_version);
Expand Down
61 changes: 38 additions & 23 deletions be/src/storage/persistent_index_compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PersistentIndexCompactionManager::~PersistentIndexCompactionManager() {
}

Status PersistentIndexCompactionManager::init() {
int max_pk_index_compaction_thread_cnt =
const int max_pk_index_compaction_thread_cnt =
config::pindex_major_compaction_num_threads > 0
? config::pindex_major_compaction_num_threads
: std::max((size_t)1, StorageEngine::instance()->get_store_num() * 2);
Expand All @@ -50,56 +50,71 @@ class PkIndexMajorCompactionTask : public Runnable {

void run() override {
WARN_IF_ERROR(_tablet->updates()->pk_index_major_compaction(), "Failed to run PkIndexMajorCompactionTask");
_mgr->unmark_running(_tablet->tablet_id());
_mgr->unmark_running(_tablet.get());
}

private:
TabletSharedPtr _tablet;
PersistentIndexCompactionManager* _mgr;
};

void PersistentIndexCompactionManager::schedule() {
if (_too_many_tasks()) {
// too many running tasks, stop schedule
LOG(WARNING) << "PersistentIndex compaction schedule failed, too many running tasks";
return;
}
std::vector<TabletAndScore> pick_tablets =
StorageEngine::instance()->tablet_manager()->pick_tablets_to_do_pk_index_major_compaction();
for (auto& tablet_score : pick_tablets) {
const int64_t tablet_id = tablet_score.first->tablet_id();
if (_need_skip(tablet_id)) {
void PersistentIndexCompactionManager::schedule(const std::function<std::vector<TabletAndScore>()>& pick_algo) {
update_ready_tablet_queue(pick_algo);
for (auto it = _ready_tablets_queue.begin(); it != _ready_tablets_queue.end();) {
auto& tablet_score = *it;
if (is_running(tablet_score.first.get())) {
// remove this tablet because it is already running
it = _ready_tablets_queue.erase(it);
continue;
}
if (disk_limit(tablet_score.first.get())) {
// skip it, may re-run it next round.
++it;
continue;
}
mark_running(tablet_id);
mark_running(tablet_score.first.get());
std::shared_ptr<Runnable> r = std::make_shared<PkIndexMajorCompactionTask>(tablet_score.first, this);
auto st = _worker_thread_pool->submit(std::move(r));
if (!st.ok()) {
unmark_running(tablet_id);
// Resource busy, break and quit
unmark_running(tablet_score.first.get());
LOG(ERROR) << strings::Substitute("submit pk index compaction task failed: $0", st.to_string());
break;
}
it = _ready_tablets_queue.erase(it);
}
}

void PersistentIndexCompactionManager::update_ready_tablet_queue(
const std::function<std::vector<TabletAndScore>()>& pick_algo) {
size_t current_time = time(nullptr);
if (current_time - _last_schedule_time > config::pindex_major_compaction_schedule_interval_seconds) {
// need re-schedule
_ready_tablets_queue = pick_algo();
_last_schedule_time = current_time;
}
}

void PersistentIndexCompactionManager::mark_running(int64_t tablet_id) {
void PersistentIndexCompactionManager::mark_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
_running_tablets.insert(tablet_id);
_running_tablets.insert(tablet->tablet_id());
_data_dir_to_task_num_map[tablet->data_dir()]++;
}

void PersistentIndexCompactionManager::unmark_running(int64_t tablet_id) {
void PersistentIndexCompactionManager::unmark_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
_running_tablets.erase(tablet_id);
_running_tablets.erase(tablet->tablet_id());
_data_dir_to_task_num_map[tablet->data_dir()]--;
}

bool PersistentIndexCompactionManager::_too_many_tasks() {
bool PersistentIndexCompactionManager::is_running(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
return _running_tablets.size() > MAX_RUNNING_TABLETS;
return _running_tablets.count(tablet->tablet_id()) > 0;
}

bool PersistentIndexCompactionManager::_need_skip(int64_t tablet_id) {
bool PersistentIndexCompactionManager::disk_limit(Tablet* tablet) {
std::lock_guard<std::mutex> guard(_mutex);
return _running_tablets.count(tablet_id) > 0;
return _data_dir_to_task_num_map[tablet->data_dir()] >= std::max(1, config::pindex_major_compaction_limit_per_disk);
}

Status PersistentIndexCompactionManager::update_max_threads(int max_threads) {
luohaha marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
24 changes: 17 additions & 7 deletions be/src/storage/persistent_index_compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "storage/olap_common.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"

namespace starrocks {

Expand All @@ -31,19 +32,28 @@ class PersistentIndexCompactionManager {
PersistentIndexCompactionManager() {}
~PersistentIndexCompactionManager();
Status init();
void schedule();
void mark_running(int64_t tablet_id);
void unmark_running(int64_t tablet_id);
void schedule(const std::function<std::vector<TabletAndScore>()>& pick_algo);
// Mark tablet is running and increase disk concurrency
void mark_running(Tablet* tablet);
// Mark tablet is no running and decrease disk concurrency
void unmark_running(Tablet* tablet);
// change the thread pool thread count
Status update_max_threads(int max_threads);
// Call pick algo function, and refresh ready tablet queue
void update_ready_tablet_queue(const std::function<std::vector<TabletAndScore>()>& pick_algo);
// Is tablet in running state
bool is_running(Tablet* tablet);
// Is tablet's disk out of concurrency limit
bool disk_limit(Tablet* tablet);

private:
bool _too_many_tasks();
bool _need_skip(int64_t tablet_id);
static const uint32_t MAX_RUNNING_TABLETS = 1000;

std::mutex _mutex;
// Sorted by prority
std::vector<TabletAndScore> _ready_tablets_queue;
std::unordered_set<int64_t> _running_tablets;
std::unique_ptr<ThreadPool> _worker_thread_pool;
std::unordered_map<DataDir*, uint64_t> _data_dir_to_task_num_map;
size_t _last_schedule_time = 0;
};

} // namespace starrocks
Loading
Loading