Skip to content

Commit

Permalink
[Enhancement] Delete files with retry in shared data mode (#35566)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Zhu <[email protected]>
(cherry picked from commit 03df07a)
  • Loading branch information
sduzh authored and github-actions[bot] committed Nov 23, 2023
1 parent a14d866 commit f850d9a
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 4 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,9 @@ CONF_mInt64(experimental_lake_wait_per_get_ms, "0");
CONF_mInt64(experimental_lake_wait_per_delete_ms, "0");
CONF_mInt64(lake_publish_version_slow_log_ms, "1000");
CONF_mBool(lake_enable_publish_version_trace_log, "false");
// Wildcard pattern matched against the error message of a failed vacuum
// file deletion; a match makes the failure eligible for a retry.
CONF_mString(lake_vacuum_retry_pattern, "*request rate*");
// Upper bound on the number of retries performed for one vacuum deletion
// request. A value of 0 disables retrying.
CONF_mInt64(lake_vacuum_retry_max_attempts, "5");
// Base delay, in milliseconds, before the first retry; the delay is doubled
// for each further retry.
CONF_mInt64(lake_vacuum_retry_min_delay_ms, "10");

CONF_mBool(dependency_librdkafka_debug_enable, "false");

Expand Down
2 changes: 2 additions & 0 deletions be/src/fs/fs_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "gutil/strings/substitute.h"
#include "gutil/strings/util.h"
#include "io/fd_input_stream.h"
#include "testutil/sync_point.h"
#include "util/errno.h"
#include "util/slice.h"

Expand Down Expand Up @@ -443,6 +444,7 @@ class PosixFileSystem : public FileSystem {
}

Status delete_file(const std::string& fname) override {
TEST_ERROR_POINT("PosixFileSystem::delete_file");
if (config::file_descriptor_cache_capacity > 0 && enable_fd_cache(fname)) {
FdCache::Instance()->erase(fname);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/staros_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ Status to_status(absl::Status absl_status) {
return Status::InvalidArgument(fmt::format("starlet err {}", absl_status.message()));
case absl::StatusCode::kNotFound:
return Status::NotFound(fmt::format("starlet err {}", absl_status.message()));
case absl::StatusCode::kResourceExhausted:
return Status::ResourceBusy(fmt::format("starlet err {}", absl_status.message()));
default:
return Status::InternalError(fmt::format("starlet err {}", absl_status.message()));
}
Expand Down
33 changes: 31 additions & 2 deletions be/src/storage/lake/vacuum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/config.h"
#include "common/status.h"
#include "fs/fs.h"
#include "gutil/strings/util.h"
#include "storage/lake/filenames.h"
#include "storage/lake/join_path.h"
#include "storage/lake/location_provider.h"
Expand Down Expand Up @@ -72,6 +73,35 @@ std::future<Status> completed_future(Status value) {
return p.get_future();
}

bool should_retry(const Status& st, int64_t attempted_retries) {
if (attempted_retries >= config::lake_vacuum_retry_max_attempts) {
return false;
}
if (st.is_resource_busy()) {
return true;
}
Slice message = st.message();
return MatchPattern(StringPiece(message.data, message.size), config::lake_vacuum_retry_pattern);
}

// Computes the exponential back-off delay, in milliseconds, to wait before the
// next retry: config::lake_vacuum_retry_min_delay_ms * 2^attempted_retries.
//
// |attempted_retries| is the number of retries already performed. The exponent
// is clamped to [0, 30] and the product saturates at INT64_MAX, so a large
// (runtime-configurable) retry budget can neither shift past the width of the
// operand nor overflow the multiplication. A non-positive base delay yields 0,
// i.e. "retry immediately".
int64_t calculate_retry_delay(int64_t attempted_retries) {
    // Cap on the doubling exponent; 2^30 times any sane base delay is already
    // far longer than a useful back-off.
    constexpr int64_t kMaxBackoffShift = 30;

    const int64_t min_delay = config::lake_vacuum_retry_min_delay_ms;
    if (min_delay <= 0) {
        return 0;
    }

    int64_t shift = attempted_retries;
    if (shift < 0) {
        shift = 0;
    } else if (shift > kMaxBackoffShift) {
        shift = kMaxBackoffShift;
    }

    // Shift a 64-bit one: the plain literal `1` is an int, and shifting it by
    // 31 or more is undefined behaviour.
    const int64_t factor = int64_t{1} << shift;
    if (min_delay > INT64_MAX / factor) {
        return INT64_MAX;
    }
    return min_delay * factor;
}

// Deletes |paths| through |fs|, retrying retryable failures with a growing
// delay between attempts.
//
// Returns the status of the last attempt: OK on success, or the first failure
// for which should_retry() declines another attempt. The calling thread sleeps
// between attempts.
Status delete_files_with_retry(FileSystem* fs, const std::vector<std::string>& paths) {
    int64_t retries_done = 0;
    while (true) {
        auto status = fs->delete_files(paths);
        // Finished: either the deletion succeeded or the failure is final.
        if (status.ok() || !should_retry(status, retries_done)) {
            return status;
        }
        const int64_t delay_ms = calculate_retry_delay(retries_done);
        LOG(WARNING) << "Fail to delete: " << status << " will retry after " << delay_ms << "ms";
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        ++retries_done;
    }
}

// Batch delete files with specified FileSystem object |fs|
Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
if (UNLIKELY(paths.empty())) {
Expand All @@ -90,8 +120,7 @@ Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
}

auto t0 = butil::gettimeofday_us();
auto st = fs->delete_files(paths);
TEST_SYNC_POINT_CALLBACK("vacuum.delete_files", &st);
auto st = delete_files_with_retry(fs, paths);
if (st.ok()) {
auto t1 = butil::gettimeofday_us();
g_del_file_latency << (t1 - t0);
Expand Down
122 changes: 120 additions & 2 deletions be/test/storage/lake/vacuum_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) {
}
)DEL")));

SyncPoint::GetInstance()->SetCallBack("vacuum.delete_files", [](void* arg) {
SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [](void* arg) {
auto st = (Status*)arg;
EXPECT_TRUE(st->ok()) << *st;
st->update(Status::IOError("injected error"));
Expand All @@ -921,7 +921,7 @@ TEST_P(LakeVacuumTest, test_delete_file_failed) {
EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 2)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(500, 3)));

SyncPoint::GetInstance()->ClearCallBack("vacuum.delete_files");
SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
SyncPoint::GetInstance()->DisableProcessing();
}

Expand Down Expand Up @@ -1211,4 +1211,122 @@ TEST(LakeVacuumTest2, test_delete_files_thread_pool_full) {
delete_files_async({"any_non_exist_file"});
}

// A deletion whose first two attempts fail with a message matching the retry
// pattern must be retried and eventually succeed.
TEST(LakeVacuumTest2, test_delete_files_retry) {
    const std::string target = "test_vacuum_delete_files_retry.txt";

    // Create the file that the vacuum code is asked to delete.
    WritableFileOptions write_opts;
    write_opts.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
    ASSIGN_OR_ABORT(auto file, fs::new_writable_file(write_opts, target));
    ASSERT_OK(file->append("111"));
    ASSERT_OK(file->close());

    // Inject a throttling-like error into the first two delete attempts.
    int attempts = 0;
    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
        const int previous_attempts = attempts++;
        if (previous_attempts >= 2) {
            return;
        }
        auto* injected = static_cast<Status*>(arg);
        EXPECT_TRUE(injected->ok()) << *injected;
        injected->update(Status::InternalError("Reduce your request rate"));
    });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp restore_sync_point([&]() {
        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    auto result = delete_files_callable({target});
    ASSERT_TRUE(result.valid());
    ASSERT_TRUE(result.get().ok());
    ASSERT_FALSE(fs::path_exist(target));
    EXPECT_GT(attempts, 1);
}

// With an empty retry pattern, a failure that is not "resource busy" must not
// be retried: the deletion fails after a single attempt and the file stays.
TEST(LakeVacuumTest2, test_delete_files_retry2) {
    WritableFileOptions options;
    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry2.txt"));
    ASSERT_OK(f1->append("111"));
    ASSERT_OK(f1->close());

    auto backup = config::lake_vacuum_retry_pattern;
    config::lake_vacuum_retry_pattern = ""; // Disable retry
    DeferOp defer0([&]() { config::lake_vacuum_retry_pattern = backup; });

    int attempts = 0;
    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
        // Count every delete attempt here, inside the callback; counting in the
        // cleanup handler would only run at scope exit and make the final
        // check meaningless.
        attempts++;
        auto st = (Status*)arg;
        EXPECT_TRUE(st->ok()) << *st;
        st->update(Status::InternalError("Reduce your request rate"));
    });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp defer([&]() {
        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    auto future2 = delete_files_callable({"test_vacuum_delete_files_retry2.txt"});
    ASSERT_TRUE(future2.valid());
    ASSERT_FALSE(future2.get().ok());
    ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry2.txt"));
    // Exactly one attempt: the initial try, with no retry afterwards.
    EXPECT_EQ(1, attempts);
}

// With the retry budget set to zero, even a failure whose message matches the
// retry pattern must not be retried: the deletion fails after a single attempt
// and the file stays.
TEST(LakeVacuumTest2, test_delete_files_retry3) {
    WritableFileOptions options;
    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry3.txt"));
    ASSERT_OK(f1->append("111"));
    ASSERT_OK(f1->close());

    auto backup = config::lake_vacuum_retry_max_attempts;
    config::lake_vacuum_retry_max_attempts = 0; // Disable retry
    DeferOp defer0([&]() { config::lake_vacuum_retry_max_attempts = backup; });

    int attempts = 0;
    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
        // Count every delete attempt here, inside the callback; counting in the
        // cleanup handler would only run at scope exit and make the final
        // check meaningless.
        attempts++;
        auto st = (Status*)arg;
        EXPECT_TRUE(st->ok()) << *st;
        st->update(Status::InternalError("Reduce your request rate"));
    });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp defer([&]() {
        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    auto future = delete_files_callable({"test_vacuum_delete_files_retry3.txt"});
    ASSERT_TRUE(future.valid());
    ASSERT_FALSE(future.get().ok());
    ASSERT_TRUE(fs::path_exist("test_vacuum_delete_files_retry3.txt"));
    // Exactly one attempt: the initial try, with no retry afterwards.
    EXPECT_EQ(1, attempts);
}

// A deletion whose first two attempts fail with a "resource busy" status must
// be retried even though the status message is empty, and eventually succeed.
// Uses its own file name so that it cannot interfere with
// test_delete_files_retry when tests share a working directory.
TEST(LakeVacuumTest2, test_delete_files_retry4) {
    WritableFileOptions options;
    options.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE;
    ASSIGN_OR_ABORT(auto f1, fs::new_writable_file(options, "test_vacuum_delete_files_retry4.txt"));
    ASSERT_OK(f1->append("111"));
    ASSERT_OK(f1->close());

    int attempts = 0;
    SyncPoint::GetInstance()->SetCallBack("PosixFileSystem::delete_file", [&](void* arg) {
        // Fail only the first two attempts; later ones go through untouched.
        if (attempts++ < 2) {
            auto st = (Status*)arg;
            EXPECT_TRUE(st->ok()) << *st;
            st->update(Status::ResourceBusy(""));
        }
    });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp defer([&]() {
        SyncPoint::GetInstance()->ClearCallBack("PosixFileSystem::delete_file");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    auto future = delete_files_callable({"test_vacuum_delete_files_retry4.txt"});
    ASSERT_TRUE(future.valid());
    ASSERT_TRUE(future.get().ok());
    ASSERT_FALSE(fs::path_exist("test_vacuum_delete_files_retry4.txt"));
    EXPECT_GT(attempts, 1);
}

} // namespace starrocks::lake

0 comments on commit f850d9a

Please sign in to comment.