From 5e2413fad252138269080449ec841e1cb4ea7acb Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Wed, 22 Nov 2023 12:48:40 +0800 Subject: [PATCH 1/4] [Enhancement] Delete files with retry Signed-off-by: Alex Zhu --- be/src/common/config.h | 3 +++ be/src/fs/fs_posix.cpp | 2 ++ be/src/storage/lake/vacuum.cpp | 31 +++++++++++++++++++++++-- be/test/storage/lake/vacuum_test.cpp | 34 ++++++++++++++++++++++++++-- 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 82aec267a7edc..b28b11950961b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -913,6 +913,9 @@ CONF_mInt64(experimental_lake_wait_per_get_ms, "0"); CONF_mInt64(experimental_lake_wait_per_delete_ms, "0"); CONF_mInt64(lake_publish_version_slow_log_ms, "1000"); CONF_mBool(lake_enable_publish_version_trace_log, "false"); +CONF_mString(lake_vacuum_retry_pattern, "*request rate*"); +CONF_mInt64(lake_vacuum_retry_max_attempts, "5"); +CONF_mInt64(lake_vacuum_retry_min_delay, "10"); CONF_mBool(dependency_librdkafka_debug_enable, "false"); diff --git a/be/src/fs/fs_posix.cpp b/be/src/fs/fs_posix.cpp index 5b145640ab6de..5a4ac019fea68 100644 --- a/be/src/fs/fs_posix.cpp +++ b/be/src/fs/fs_posix.cpp @@ -29,6 +29,7 @@ #include "gutil/strings/substitute.h" #include "gutil/strings/util.h" #include "io/fd_input_stream.h" +#include "testutil/sync_point.h" #include "util/errno.h" #include "util/slice.h" @@ -443,6 +444,7 @@ class PosixFileSystem : public FileSystem { } Status delete_file(const std::string& fname) override { + TEST_ERROR_POINT("PosixFileSystem::delete_file"); if (config::file_descriptor_cache_capacity > 0 && enable_fd_cache(fname)) { FdCache::Instance()->erase(fname); } diff --git a/be/src/storage/lake/vacuum.cpp b/be/src/storage/lake/vacuum.cpp index 977cd41de427d..e709f9875eae9 100644 --- a/be/src/storage/lake/vacuum.cpp +++ b/be/src/storage/lake/vacuum.cpp @@ -34,6 +34,7 @@ #include "testutil/sync_point.h" #include "util/defer_op.h" #include "util/raw_container.h" +#include "gutil/strings/util.h" namespace starrocks::lake { @@ -72,6 +73,33 @@ std::future completed_future(Status value) { return p.get_future(); } +bool should_retry(const Status& st, int64_t attempted_retries) { + if (attempted_retries >= config::lake_vacuum_retry_max_attempts) { + return false; + } + Slice message = st.message(); + return MatchPattern(StringPiece(message.data, message.size), config::lake_vacuum_retry_pattern); +} + +int64_t calculate_retry_delay(int64_t attempted_retries) { + int64_t min_delay = config::lake_vacuum_retry_min_delay; + return min_delay * (1 << attempted_retries); +} + +Status delete_files_with_retry(FileSystem*fs, const std::vector& paths) { + for (int64_t attempted_retries = 0; /**/; attempted_retries++) { + auto st = fs->delete_files(paths); + if (!st.ok() && should_retry(st, attempted_retries)) { + int64_t delay = calculate_retry_delay(attempted_retries); + LOG(WARNING) << "Fail to delete: " << st << " will retry after " << delay << "ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(delay)); + } else { + LOG_IF(WARNING, !st.ok()) << "Fail to delete: " << st; + return st; + } + } +} + // Batch delete files with specified FileSystem object |fs| Status do_delete_files(FileSystem* fs, const std::vector& paths) { if (UNLIKELY(paths.empty())) { @@ -90,8 +118,7 @@ Status do_delete_files(FileSystem* fs, const std::vector& paths) { } auto t0 = butil::gettimeofday_us(); - auto st = fs->delete_files(paths); - TEST_SYNC_POINT_CALLBACK("vacuum.delete_files", &st); + auto st = delete_files_with_retry(fs, paths); if (st.ok()) { auto t1 = butil::gettimeofday_us(); g_del_file_latency << (t1 - t0); diff --git a/be/test/storage/lake/vacuum_test.cpp b/be/test/storage/lake/vacuum_test.cpp index 86e208fadcdd2..2a2753e1532aa 100644 --- a/be/test/storage/lake/vacuum_test.cpp +++ b/be/test/storage/lake/vacuum_test.cpp @@ -896,7 +896,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) { } )DEL"))); - SyncPoint::GetInstance()->SetCallBack("vacuum.delete_files", [](void* arg) { + SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [](void* arg) { auto st = (Status*)arg; EXPECT_TRUE(st->ok()) << *st; st->update(Status::IOError("injected error")); @@ -921,7 +921,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) { EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 2))); EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 3))); - SyncPoint::GetInstance()->ClearCallBack("vacuum.delete_files"); + SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file"); SyncPoint::GetInstance()->DisableProcessing(); } @@ -1211,4 +1211,34 @@ TEST(LakeVacuumTest2, test_delete_files_thread_pool_full) { delete_files_async({"any_non_exist_file"}); } +TEST(LakeVacuumTest2, test_delete_files_retry) { + auto future = delete_files_callable({}); + ASSERT_TRUE(future.valid()); + ASSERT_TRUE(future.get().ok()); + + ASSIGN_OR_ABORT(auto f1, fs::new_writable_file("test_vacuum_delete_files_retry.txt")); + ASSERT_OK(f1->append("111")); + ASSERT_OK(f1->close()); + + int attempts = 0; + SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) { + if (attempts++ < 2) { + auto st = (Status*)arg; + EXPECT_TRUE(st->ok()) << *st; + st->update(Status::InternalError("Reduce your request rate")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&]() { + SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + + auto future2 = delete_files_callable({"test_vacuum_delete_files_retry.txt"}); + ASSERT_TRUE(future2.valid()); + ASSERT_TRUE(future2.get().ok()); + ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry.txt")); + EXPECT_GT(attempts, 1); +} + } // namespace starrocks::lake From 4aba6bf369673a0538c148462d8732efb5831b8c Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Wed, 22 Nov 2023 13:15:31 +0800 Subject: [PATCH 2/4] [UT] more tests Signed-off-by: Alex Zhu --- be/src/storage/lake/vacuum.cpp | 4 +- be/test/storage/lake/vacuum_test.cpp | 76 +++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/be/src/storage/lake/vacuum.cpp b/be/src/storage/lake/vacuum.cpp index e709f9875eae9..70cf5fa62b0f9 100644 --- a/be/src/storage/lake/vacuum.cpp +++ b/be/src/storage/lake/vacuum.cpp @@ -23,6 +23,7 @@ #include "common/config.h" #include "common/status.h" #include "fs/fs.h" +#include "gutil/strings/util.h" #include "storage/lake/filenames.h" #include "storage/lake/join_path.h" #include "storage/lake/location_provider.h" @@ -34,7 +35,6 @@ #include "testutil/sync_point.h" #include "util/defer_op.h" #include "util/raw_container.h" -#include "gutil/strings/util.h" namespace starrocks::lake { @@ -86,7 +86,7 @@ int64_t calculate_retry_delay(int64_t attempted_retries) { return min_delay * (1 << attempted_retries); } -Status delete_files_with_retry(FileSystem*fs, const std::vector& paths) { +Status delete_files_with_retry(FileSystem* fs, const std::vector& paths) { for (int64_t attempted_retries = 0; /**/; attempted_retries++) { auto st = fs->delete_files(paths); if (!st.ok() && should_retry(st, attempted_retries)) { diff --git a/be/test/storage/lake/vacuum_test.cpp b/be/test/storage/lake/vacuum_test.cpp index 2a2753e1532aa..c9552f079dc85 100644 --- a/be/test/storage/lake/vacuum_test.cpp +++ b/be/test/storage/lake/vacuum_test.cpp @@ -1212,11 +1212,9 @@ TEST(LakeVacuumTest2, test_delete_files_thread_pool_full) { } TEST(LakeVacuumTest2, test_delete_files_retry) { - auto future = delete_files_callable({}); - ASSERT_TRUE(future.valid()); - ASSERT_TRUE(future.get().ok()); - - ASSIGN_OR_ABORT(auto f1, fs::new_writable_file("test_vacuum_delete_files_retry.txt")); + WritableFileOptions options; + options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE; + ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry.txt")); ASSERT_OK(f1->append("111")); ASSERT_OK(f1->close()); @@ -1234,11 +1232,73 @@ TEST(LakeVacuumTest2, test_delete_files_retry) { SyncPoint::GetInstance()->DisableProcessing(); }); - auto future2 = delete_files_callable({"test_vacuum_delete_files_retry.txt"}); - ASSERT_TRUE(future2.valid()); - ASSERT_TRUE(future2.get().ok()); + auto future = delete_files_callable({"test_vacuum_delete_files_retry.txt"}); + ASSERT_TRUE(future.valid()); + ASSERT_TRUE(future.get().ok()); ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry.txt")); EXPECT_GT(attempts, 1); } +TEST(LakeVacuumTest2, test_delete_files_retry2) { + WritableFileOptions options; + options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE; + ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry2.txt")); + ASSERT_OK(f1->append("111")); + ASSERT_OK(f1->close()); + + auto backup = config::lake_vacuum_retry_pattern; + config::lake_vacuum_retry_pattern = ""; // Disable retry + DeferOp defer0([&]() { config::lake_vacuum_retry_pattern = backup; }); + + int attempts = 0; + SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) { + auto st = (Status*)arg; + EXPECT_TRUE(st->ok()) << *st; + st->update(Status::InternalError("Reduce your request rate")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&]() { + attempts++; + SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + + auto future2 = delete_files_callable({"test_vacuum_delete_files_retry2.txt"}); + ASSERT_TRUE(future2.valid()); + ASSERT_FALSE(future2.get().ok()); + ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry2.txt")); + EXPECT_EQ(0, attempts); +} + +TEST(LakeVacuumTest2, test_delete_files_retry3) { + WritableFileOptions options; + options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE; + ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry3.txt")); + ASSERT_OK(f1->append("111")); + ASSERT_OK(f1->close()); + + auto backup = config::lake_vacuum_retry_max_attempts; + config::lake_vacuum_retry_max_attempts = 0; // Disable retry + DeferOp defer0([&]() { config::lake_vacuum_retry_max_attempts = backup; }); + + int attempts = 0; + SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) { + auto st = (Status*)arg; + EXPECT_TRUE(st->ok()) << *st; + st->update(Status::InternalError("Reduce your request rate")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&]() { + attempts++; + SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + + auto future = delete_files_callable({"test_vacuum_delete_files_retry3.txt"}); + ASSERT_TRUE(future.valid()); + ASSERT_FALSE(future.get().ok()); + ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry3.txt")); + EXPECT_EQ(0, attempts); +} + } // namespace starrocks::lake From 9b2b246bf460e4db91157fbf916652644e8dcd66 Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Wed, 22 Nov 2023 14:14:23 +0800 Subject: [PATCH 3/4] [Refactor] Check ResourceBusy error Signed-off-by: Alex Zhu --- be/src/service/staros_worker.cpp | 2 ++ be/src/storage/lake/vacuum.cpp | 3 +++ be/test/storage/lake/vacuum_test.cpp | 28 ++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/be/src/service/staros_worker.cpp b/be/src/service/staros_worker.cpp index d882dc11ee760..f5c03f0eb05a0 100644 --- a/be/src/service/staros_worker.cpp +++ b/be/src/service/staros_worker.cpp @@ -329,6 +329,8 @@ Status to_status(const absl::Status& absl_status) { return Status::InvalidArgument(fmt::format("starlet err {}", absl_status.message())); case absl::StatusCode::kNotFound: return Status::NotFound(fmt::format("starlet err {}", absl_status.message())); + case absl::StatusCode::kResourceExhausted: + return Status::ResourceBusy(fmt::format("starlet err {}", absl_status.message())); default: return Status::InternalError(fmt::format("starlet err {}", absl_status.message())); } diff --git a/be/src/storage/lake/vacuum.cpp b/be/src/storage/lake/vacuum.cpp index 70cf5fa62b0f9..f95f4199ee6a7 100644 --- a/be/src/storage/lake/vacuum.cpp +++ b/be/src/storage/lake/vacuum.cpp @@ -77,6 +77,9 @@ bool should_retry(const Status& st, int64_t attempted_retries) { if (attempted_retries >= config::lake_vacuum_retry_max_attempts) { return false; } + if (st.is_resource_busy()) { + return true; + } Slice message = st.message(); return MatchPattern(StringPiece(message.data, message.size), config::lake_vacuum_retry_pattern); } diff --git a/be/test/storage/lake/vacuum_test.cpp b/be/test/storage/lake/vacuum_test.cpp index c9552f079dc85..4bbc56494f194 100644 --- a/be/test/storage/lake/vacuum_test.cpp +++ b/be/test/storage/lake/vacuum_test.cpp @@ -1301,4 +1301,32 @@ TEST(LakeVacuumTest2, test_delete_files_retry3) { EXPECT_EQ(0, attempts); } +TEST(LakeVacuumTest2, test_delete_files_retry4) { + WritableFileOptions options; + options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE; + ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry.txt")); + ASSERT_OK(f1->append("111")); + ASSERT_OK(f1->close()); + + int attempts = 0; + SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) { + if (attempts++ < 2) { + auto st = (Status*)arg; + EXPECT_TRUE(st->ok()) << *st; + st->update(Status::ResourceBusy("")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + DeferOp defer([&]() { + SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file"); + SyncPoint::GetInstance()->DisableProcessing(); + }); + + auto future = delete_files_callable({"test_vacuum_delete_files_retry.txt"}); + ASSERT_TRUE(future.valid()); + ASSERT_TRUE(future.get().ok()); + ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry.txt")); + EXPECT_GT(attempts, 1); +} + } // namespace starrocks::lake From 79e53688288a76388cbe57994e39f1015d8b8f36 Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Wed, 22 Nov 2023 20:43:58 +0800 Subject: [PATCH 4/4] [Refactor] Rename Signed-off-by: Alex Zhu --- be/src/common/config.h | 2 +- be/src/storage/lake/vacuum.cpp | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index b28b11950961b..dc3022be5b731 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -915,7 +915,7 @@ CONF_mInt64(lake_publish_version_slow_log_ms, "1000"); CONF_mBool(lake_enable_publish_version_trace_log, "false"); CONF_mString(lake_vacuum_retry_pattern, "*request rate*"); CONF_mInt64(lake_vacuum_retry_max_attempts, "5"); -CONF_mInt64(lake_vacuum_retry_min_delay, "10"); +CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10"); CONF_mBool(dependency_librdkafka_debug_enable, "false"); diff --git a/be/src/storage/lake/vacuum.cpp b/be/src/storage/lake/vacuum.cpp index f95f4199ee6a7..42ad1fa82288b 100644 --- a/be/src/storage/lake/vacuum.cpp +++ b/be/src/storage/lake/vacuum.cpp @@ -85,7 +85,7 @@ bool should_retry(const Status& st, int64_t attempted_retries) { } int64_t calculate_retry_delay(int64_t attempted_retries) { - int64_t min_delay = config::lake_vacuum_retry_min_delay; + int64_t min_delay = config::lake_vacuum_retry_min_delay_ms; return min_delay * (1 << attempted_retries); } @@ -97,7 +97,6 @@ Status delete_files_with_retry(FileSystem* fs, const std::vector& p LOG(WARNING) << "Fail to delete: " << st << " will retry after " << delay << "ms"; std::this_thread::sleep_for(std::chrono::milliseconds(delay)); } else { - LOG_IF(WARNING, !st.ok()) << "Fail to delete: " << st; return st; } }