From f850d9a02dbf630be1a9ed1e260831201b4ff343 Mon Sep 17 00:00:00 2001
From: Alex Zhu
Date: Thu, 23 Nov 2023 11:08:02 +0800
Subject: [PATCH] [Enhancement] Delete files with retry in shared data mode (#35566)

Signed-off-by: Alex Zhu
(cherry picked from commit 03df07a4f6e3b8ddba43805acce58bd12ead8860)
---
Reviewer notes (this section is not part of the commit message):
 * This patch was restored from a whitespace-collapsed copy. Indentation and
   blank lines in context lines were reconstructed from the hunk line counts
   and may need `git apply --ignore-whitespace` (or a manual check) to apply.
 * Template arguments lost in that copy (std::vector<std::string>,
   std::future<Status>) were restored. The author e-mail address was also
   lost and is still missing from the From:/Signed-off-by: lines.
 * calculate_retry_delay() now shifts a 64-bit value with a clamped exponent;
   `1 << attempted_retries` was undefined for 31 or more retries.
 * test_delete_files_retry2/3 now count delete_file() calls inside the sync
   point callback and expect exactly one call; the counter used to be bumped
   only in the cleanup DeferOp, which made EXPECT_EQ(0, attempts) vacuous.
 * test_delete_files_retry4 now uses its own scratch file name.
 * The post-image hashes on the `index` lines of config.h, vacuum.cpp and
   vacuum_test.cpp are those of the original commit.

 be/src/common/config.h               |   7 ++
 be/src/fs/fs_posix.cpp               |   2 +
 be/src/service/staros_worker.cpp     |   2 +
 be/src/storage/lake/vacuum.cpp       |  43 ++++++++-
 be/test/storage/lake/vacuum_test.cpp | 122 ++++++++++++++++++++++++++-
 5 files changed, 172 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index e3910eb889775..af88dac30ed1d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -925,6 +925,13 @@ CONF_mInt64(experimental_lake_wait_per_get_ms, "0");
 CONF_mInt64(experimental_lake_wait_per_delete_ms, "0");
 CONF_mInt64(lake_publish_version_slow_log_ms, "1000");
 CONF_mBool(lake_enable_publish_version_trace_log, "false");
+// Wildcard pattern matched against the error message of a failed vacuum delete; a match
+// makes the failure retryable. Set to an empty string to disable pattern-based retry.
+CONF_mString(lake_vacuum_retry_pattern, "*request rate*");
+// Maximum number of retries for a failed vacuum delete. 0 disables retry.
+CONF_mInt64(lake_vacuum_retry_max_attempts, "5");
+// Delay in milliseconds before the first retry; doubled on each subsequent retry.
+CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10");
 
 CONF_mBool(dependency_librdkafka_debug_enable, "false");
 
diff --git a/be/src/fs/fs_posix.cpp b/be/src/fs/fs_posix.cpp
index 5b145640ab6de..5a4ac019fea68 100644
--- a/be/src/fs/fs_posix.cpp
+++ b/be/src/fs/fs_posix.cpp
@@ -29,6 +29,7 @@
 #include "gutil/strings/substitute.h"
 #include "gutil/strings/util.h"
 #include "io/fd_input_stream.h"
+#include "testutil/sync_point.h"
 #include "util/errno.h"
 #include "util/slice.h"
 
@@ -443,6 +444,7 @@ class PosixFileSystem : public FileSystem {
     }
 
     Status delete_file(const std::string& fname) override {
+        TEST_ERROR_POINT("PosixFileSystem::delete_file");
         if (config::file_descriptor_cache_capacity > 0 && enable_fd_cache(fname)) {
             FdCache::Instance()->erase(fname);
         }
diff --git a/be/src/service/staros_worker.cpp b/be/src/service/staros_worker.cpp
index c74f852908f86..5cfab36b2ad83 100644
--- a/be/src/service/staros_worker.cpp
+++ b/be/src/service/staros_worker.cpp
@@ -405,6 +405,8 @@ Status to_status(absl::Status absl_status) {
         return Status::InvalidArgument(fmt::format("starlet err {}", absl_status.message()));
     case absl::StatusCode::kNotFound:
         return Status::NotFound(fmt::format("starlet err {}", absl_status.message()));
+    case absl::StatusCode::kResourceExhausted:
+        return Status::ResourceBusy(fmt::format("starlet err {}", absl_status.message()));
     default:
         return Status::InternalError(fmt::format("starlet err {}", absl_status.message()));
     }
diff --git a/be/src/storage/lake/vacuum.cpp b/be/src/storage/lake/vacuum.cpp
index 977cd41de427d..42ad1fa82288b 100644
--- a/be/src/storage/lake/vacuum.cpp
+++ b/be/src/storage/lake/vacuum.cpp
@@ -23,6 +23,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "fs/fs.h"
+#include "gutil/strings/util.h"
 #include "storage/lake/filenames.h"
 #include "storage/lake/join_path.h"
 #include "storage/lake/location_provider.h"
@@ -72,6 +73,45 @@ std::future<Status> completed_future(Status value) {
     return p.get_future();
 }
 
+// Returns true when a failed delete with status |st| is worth another attempt.
+// |attempted_retries| is the number of retries already performed for this request.
+bool should_retry(const Status& st, int64_t attempted_retries) {
+    if (attempted_retries >= config::lake_vacuum_retry_max_attempts) {
+        return false;
+    }
+    if (st.is_resource_busy()) {
+        return true;
+    }
+    Slice message = st.message();
+    return MatchPattern(StringPiece(message.data, message.size), config::lake_vacuum_retry_pattern);
+}
+
+// Returns the delay in milliseconds to wait before the next retry:
+// lake_vacuum_retry_min_delay_ms doubled once per retry already performed.
+int64_t calculate_retry_delay(int64_t attempted_retries) {
+    // Shift a 64-bit one and clamp the exponent: `1 << n` on a 32-bit int is undefined for
+    // n >= 31, which a large lake_vacuum_retry_max_attempts would otherwise make reachable.
+    constexpr int64_t kMaxShift = 30;
+    int64_t min_delay = config::lake_vacuum_retry_min_delay_ms;
+    int64_t shift = attempted_retries < kMaxShift ? attempted_retries : kMaxShift;
+    return min_delay * (int64_t{1} << shift);
+}
+
+// Deletes |paths| through |fs|, sleeping and trying again for as long as should_retry()
+// accepts the failure. Returns the status of the last attempt.
+Status delete_files_with_retry(FileSystem* fs, const std::vector<std::string>& paths) {
+    for (int64_t attempted_retries = 0; /**/; attempted_retries++) {
+        auto st = fs->delete_files(paths);
+        if (!st.ok() && should_retry(st, attempted_retries)) {
+            int64_t delay = calculate_retry_delay(attempted_retries);
+            LOG(WARNING) << "Fail to delete: " << st << " will retry after " << delay << "ms";
+            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
+        } else {
+            return st;
+        }
+    }
+}
+
 // Batch delete files with specified FileSystem object |fs|
 Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
     if (UNLIKELY(paths.empty())) {
@@ -90,8 +130,7 @@ Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
     }
 
     auto t0 = butil::gettimeofday_us();
-    auto st = fs->delete_files(paths);
-    TEST_SYNC_POINT_CALLBACK("vacuum.delete_files", &st);
+    auto st = delete_files_with_retry(fs, paths);
     if (st.ok()) {
         auto t1 = butil::gettimeofday_us();
         g_del_file_latency << (t1 - t0);
diff --git a/be/test/storage/lake/vacuum_test.cpp b/be/test/storage/lake/vacuum_test.cpp
index 86e208fadcdd2..4bbc56494f194 100644
--- a/be/test/storage/lake/vacuum_test.cpp
+++ b/be/test/storage/lake/vacuum_test.cpp
@@ -896,7 +896,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) {
         }
         )DEL")));
 
-    SyncPoint::GetInstance()->SetCallBack("vacuum.delete_files", [](void* arg) {
+    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [](void* arg) {
         auto st = (Status*)arg;
         EXPECT_TRUE(st->ok()) << *st;
         st->update(Status::IOError("injected error"));
@@ -921,7 +921,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) {
     EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 2)));
     EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 3)));
 
-    SyncPoint::GetInstance()->ClearCallBack("vacuum.delete_files");
+    SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
     SyncPoint::GetInstance()->DisableProcessing();
 }
 
@@ -1211,4 +1211,122 @@ TEST(LakeVacuumTest2, test_delete_files_thread_pool_full) {
     delete_files_async({"any_non_exist_file"});
 }
 
+TEST(LakeVacuumTest2, test_delete_files_retry) {
+    WritableFileOptions options;
+    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
+    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry.txt"));
+    ASSERT_OK(f1->append("111"));
+    ASSERT_OK(f1->close());
+
+    int attempts = 0;
+    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
+        if (attempts++ < 2) {
+            auto st = (Status*)arg;
+            EXPECT_TRUE(st->ok()) << *st;
+            st->update(Status::InternalError("Reduce your request rate"));
+        }
+    });
+    SyncPoint::GetInstance()->EnableProcessing();
+    DeferOp defer([&]() {
+        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
+        SyncPoint::GetInstance()->DisableProcessing();
+    });
+
+    auto future = delete_files_callable({"test_vacuum_delete_files_retry.txt"});
+    ASSERT_TRUE(future.valid());
+    ASSERT_TRUE(future.get().ok());
+    ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry.txt"));
+    EXPECT_GT(attempts, 1);
+}
+
+TEST(LakeVacuumTest2, test_delete_files_retry2) {
+    WritableFileOptions options;
+    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
+    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry2.txt"));
+    ASSERT_OK(f1->append("111"));
+    ASSERT_OK(f1->close());
+
+    auto backup = config::lake_vacuum_retry_pattern;
+    config::lake_vacuum_retry_pattern = ""; // Disable retry
+    DeferOp defer0([&]() { config::lake_vacuum_retry_pattern = backup; });
+
+    int attempts = 0;
+    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
+        attempts++; // Count every delete_file() invocation.
+        auto st = (Status*)arg;
+        EXPECT_TRUE(st->ok()) << *st;
+        st->update(Status::InternalError("Reduce your request rate"));
+    });
+    SyncPoint::GetInstance()->EnableProcessing();
+    DeferOp defer([&]() {
+        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
+        SyncPoint::GetInstance()->DisableProcessing();
+    });
+
+    auto future2 = delete_files_callable({"test_vacuum_delete_files_retry2.txt"});
+    ASSERT_TRUE(future2.valid());
+    ASSERT_FALSE(future2.get().ok());
+    ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry2.txt"));
+    EXPECT_EQ(1, attempts); // Retry disabled: exactly one attempt.
+}
+
+TEST(LakeVacuumTest2, test_delete_files_retry3) {
+    WritableFileOptions options;
+    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
+    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry3.txt"));
+    ASSERT_OK(f1->append("111"));
+    ASSERT_OK(f1->close());
+
+    auto backup = config::lake_vacuum_retry_max_attempts;
+    config::lake_vacuum_retry_max_attempts = 0; // Disable retry
+    DeferOp defer0([&]() { config::lake_vacuum_retry_max_attempts = backup; });
+
+    int attempts = 0;
+    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
+        attempts++; // Count every delete_file() invocation.
+        auto st = (Status*)arg;
+        EXPECT_TRUE(st->ok()) << *st;
+        st->update(Status::InternalError("Reduce your request rate"));
+    });
+    SyncPoint::GetInstance()->EnableProcessing();
+    DeferOp defer([&]() {
+        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
+        SyncPoint::GetInstance()->DisableProcessing();
+    });
+
+    auto future = delete_files_callable({"test_vacuum_delete_files_retry3.txt"});
+    ASSERT_TRUE(future.valid());
+    ASSERT_FALSE(future.get().ok());
+    ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry3.txt"));
+    EXPECT_EQ(1, attempts); // Retry disabled: exactly one attempt.
+}
+
+TEST(LakeVacuumTest2, test_delete_files_retry4) {
+    WritableFileOptions options;
+    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
+    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry4.txt"));
+    ASSERT_OK(f1->append("111"));
+    ASSERT_OK(f1->close());
+
+    int attempts = 0;
+    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
+        if (attempts++ < 2) {
+            auto st = (Status*)arg;
+            EXPECT_TRUE(st->ok()) << *st;
+            st->update(Status::ResourceBusy(""));
+        }
+    });
+    SyncPoint::GetInstance()->EnableProcessing();
+    DeferOp defer([&]() {
+        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
+        SyncPoint::GetInstance()->DisableProcessing();
+    });
+
+    auto future = delete_files_callable({"test_vacuum_delete_files_retry4.txt"});
+    ASSERT_TRUE(future.valid());
+    ASSERT_TRUE(future.get().ok());
+    ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry4.txt"));
+    EXPECT_GT(attempts, 1);
+}
+
 } // namespace starrocks::lake