Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] configure S3 client rename_file operation timeout (backport #48860) #49706

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,9 @@ CONF_Int64(object_storage_connect_timeout_ms, "-1");
// value is greater than 0 and less than 1000.
// When it's 0, low speed limit check will be disabled.
CONF_Int64(object_storage_request_timeout_ms, "-1");
// Request timeout (in milliseconds) for object storage clients created specifically for the
// rename_file operation.
// A value >= 0 (including 0) is applied as the request timeout of such clients.
// If this parameter is negative, object_storage_request_timeout_ms is used instead.
CONF_Int64(object_storage_rename_file_request_timeout_ms, "30000");

CONF_Strings(fallback_to_hadoop_fs_list, "");
CONF_Strings(s3_compatible_fs_list, "s3n://, s3a://, s3://, oss://, cos://, cosn://, obs://, ks3://, tos://");
Expand Down
55 changes: 47 additions & 8 deletions be/src/fs/fs_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,23 @@ class S3ClientFactory {
return obj;
}

// Identifies the S3 operation a client is being created for.
// This enum lets new_client() apply operation-specific configuration:
//   UNKNOWN     - default when the caller does not name an operation.
//   RENAME_FILE - the client is created for rename_file; it may get a dedicated request timeout
//                 (see config::object_storage_rename_file_request_timeout_ms).
enum class OperationType {
UNKNOWN,
RENAME_FILE,
};

~S3ClientFactory() = default;

S3ClientFactory(const S3ClientFactory&) = delete;
void operator=(const S3ClientFactory&) = delete;
S3ClientFactory(S3ClientFactory&&) = delete;
void operator=(S3ClientFactory&&) = delete;

S3ClientPtr new_client(const TCloudConfiguration& cloud_configuration);
S3ClientPtr new_client(const TCloudConfiguration& cloud_configuration,
S3ClientFactory::OperationType operation_type = S3ClientFactory::OperationType::UNKNOWN);
S3ClientPtr new_client(const ClientConfiguration& config, const FSOptions& opts);

void close();
Expand All @@ -102,6 +111,12 @@ class S3ClientFactory {
return instance;
}

// Only used by unit tests.
// Reports whether the client cache currently holds an entry whose key matches |config| combined
// with |cloud_config|. When |cloud_config| is nullptr, a default-constructed
// AWSCloudConfiguration is used for the comparison.
bool find_client_cache_keys_by_config_TEST(const Aws::Client::ClientConfiguration& config,
                                           AWSCloudConfiguration* cloud_config = nullptr) {
    // Forward |cloud_config| as well: dropping it would make every lookup compare against a
    // default-constructed AWSCloudConfiguration, no matter what the caller passed in.
    return _find_client_cache_keys_by_config_TEST(config, cloud_config);
}

private:
S3ClientFactory();

Expand All @@ -120,6 +135,17 @@ class S3ClientFactory {

constexpr static int kMaxItems = 8;

// Only used by unit tests.
// Returns true if one of the first |_items| entries of |_client_cache_keys| equals the key built
// from |config| and |cloud_config| (a default-constructed AWSCloudConfiguration when
// |cloud_config| is nullptr); returns false otherwise.
// NOTE(review): the cache is read here without taking |_lock| -- presumably fine for
// single-threaded tests; confirm before using it anywhere else.
bool _find_client_cache_keys_by_config_TEST(const Aws::Client::ClientConfiguration& config,
                                            AWSCloudConfiguration* cloud_config = nullptr) {
    // The lookup key does not depend on the loop index, so build it (and copy |config|) once.
    const ClientCacheKey wanted{config, cloud_config == nullptr ? AWSCloudConfiguration{} : *cloud_config};
    // |_items| is an int; index with int to avoid a signed/unsigned comparison.
    for (int i = 0; i < _items; ++i) {
        if (_client_cache_keys[i] == wanted) {
            return true;
        }
    }
    return false;
}

std::mutex _lock;
int _items{0};
// _client_cache_keys[i] is the client cache key of |_clients[i]|.
Expand Down Expand Up @@ -171,7 +197,8 @@ void S3ClientFactory::close() {
}
}

S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfiguration& t_cloud_configuration) {
S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfiguration& t_cloud_configuration,
S3ClientFactory::OperationType operation_type) {
const AWSCloudConfiguration aws_cloud_configuration = CloudConfigurationFactory::create_aws(t_cloud_configuration);

Aws::Client::ClientConfiguration config = S3ClientFactory::getClientConfig();
Expand All @@ -194,7 +221,12 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const TCloudConfigurati
if (config::object_storage_connect_timeout_ms > 0) {
config.connectTimeoutMs = config::object_storage_connect_timeout_ms;
}
if (config::object_storage_request_timeout_ms >= 0) {

if (operation_type == S3ClientFactory::OperationType::RENAME_FILE &&
config::object_storage_rename_file_request_timeout_ms >= 0) {
config.requestTimeoutMs = config::object_storage_rename_file_request_timeout_ms;
} else if (config::object_storage_request_timeout_ms >= 0) {
// 0 is meaningful for object_storage_request_timeout_ms
config.requestTimeoutMs = config::object_storage_request_timeout_ms;
}

Expand Down Expand Up @@ -280,7 +312,9 @@ S3ClientFactory::S3ClientPtr S3ClientFactory::new_client(const ClientConfigurati
}

// If you find yourself changing this code, see also `bool operator==(const Aws::Client::ClientConfiguration&, const Aws::Client::ClientConfiguration&)`
static std::shared_ptr<Aws::S3::S3Client> new_s3client(const S3URI& uri, const FSOptions& opts) {
static std::shared_ptr<Aws::S3::S3Client> new_s3client(
const S3URI& uri, const FSOptions& opts,
S3ClientFactory::OperationType operation_type = S3ClientFactory::OperationType::UNKNOWN) {
Aws::Client::ClientConfiguration config = S3ClientFactory::getClientConfig();
const THdfsProperties* hdfs_properties = opts.hdfs_properties();
// TODO(SmithCruise) If CloudType is DEFAULT, we should use hadoop sdk to access file,
Expand All @@ -291,7 +325,7 @@ static std::shared_ptr<Aws::S3::S3Client> new_s3client(const S3URI& uri, const F
const TCloudConfiguration& tCloudConfiguration = (opts.cloud_configuration != nullptr)
? *opts.cloud_configuration
: hdfs_properties->cloud_configuration;
return S3ClientFactory::instance().new_client(tCloudConfiguration);
return S3ClientFactory::instance().new_client(tCloudConfiguration, operation_type);
} else if (hdfs_properties != nullptr) {
DCHECK(hdfs_properties->__isset.end_point);
if (hdfs_properties->__isset.end_point) {
Expand Down Expand Up @@ -326,10 +360,15 @@ static std::shared_ptr<Aws::S3::S3Client> new_s3client(const S3URI& uri, const F
if (config::object_storage_connect_timeout_ms > 0) {
config.connectTimeoutMs = config::object_storage_connect_timeout_ms;
}
// 0 is meaningful for object_storage_request_timeout_ms
if (config::object_storage_request_timeout_ms >= 0) {

if (operation_type == S3ClientFactory::OperationType::RENAME_FILE &&
config::object_storage_rename_file_request_timeout_ms >= 0) {
config.requestTimeoutMs = config::object_storage_rename_file_request_timeout_ms;
} else if (config::object_storage_request_timeout_ms >= 0) {
// 0 is meaningful for object_storage_request_timeout_ms
config.requestTimeoutMs = config::object_storage_request_timeout_ms;
}

return S3ClientFactory::instance().new_client(config, opts);
}

Expand Down Expand Up @@ -501,7 +540,7 @@ Status S3FileSystem::rename_file(const std::string& src, const std::string& targ
if (!dest_uri.parse(target)) {
return Status::InvalidArgument(fmt::format("Invalid target S3 URI: {}", target));
}
auto client = new_s3client(src_uri, _options);
auto client = new_s3client(src_uri, _options, S3ClientFactory::OperationType::RENAME_FILE);
Aws::S3::Model::CopyObjectRequest copy_request;
copy_request.WithCopySource(src_uri.bucket() + "/" + src_uri.key());
copy_request.WithBucket(dest_uri.bucket());
Expand Down
68 changes: 68 additions & 0 deletions be/test/fs/fs_s3_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fstream>

#include "common/config.h"
#include "fs/fs_s3.cpp"
#include "gutil/strings/join.h"
#include "testutil/assert.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -511,4 +512,71 @@ TEST_F(S3FileSystemTest, test_delete_nonexist_file) {
ASSERT_OK(fs->delete_file(S3Path("/nonexist.dat")));
}

// Checks which request timeout ends up in the cached S3 client configuration when a client is
// created for the rename_file operation:
//   * object_storage_rename_file_request_timeout_ms is used when it is >= 0;
//   * object_storage_request_timeout_ms is used when the rename-specific value is negative.
// Both the FSOptions-based path (via S3FileSystem::rename_file) and the TCloudConfiguration-based
// path (via S3ClientFactory::new_client) are exercised.
TEST_F(S3FileSystemTest, test_new_S3_client_with_rename_operation) {
    // Snapshot both timeout configs and restore them on every exit path. ASSERT_* returns from
    // the test body on failure, so restoring only at the end of the function would be skipped
    // and the modified global config would leak into later tests. decltype keeps the saved
    // values at the configs' own width instead of narrowing them to int.
    struct TimeoutConfigGuard {
        decltype(config::object_storage_rename_file_request_timeout_ms) rename_timeout_ms =
                config::object_storage_rename_file_request_timeout_ms;
        decltype(config::object_storage_request_timeout_ms) request_timeout_ms =
                config::object_storage_request_timeout_ms;
        ~TimeoutConfigGuard() {
            config::object_storage_rename_file_request_timeout_ms = rename_timeout_ms;
            config::object_storage_request_timeout_ms = request_timeout_ms;
        }
    };
    TimeoutConfigGuard original_timeouts;

    // Timeout values used by the test; they only need to be distinguishable from each other.
    constexpr int kRenameTimeoutMs = 2000;
    constexpr int kGenericTimeoutMs = 1000;

    config::object_storage_rename_file_request_timeout_ms = kRenameTimeoutMs;
    ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString("s3://"));
    // only used for generate a new S3 client into global cache
    (void)fs->rename_file(S3Path("/dir/source_name"), S3Path("/dir/target_name"));

    // Rebuild the client configuration that rename_file is expected to have cached.
    Aws::Client::ClientConfiguration expected_config = S3ClientFactory::getClientConfig();
    S3URI src_uri;
    ASSERT_TRUE(src_uri.parse(S3Path("/dir/source_name")));
    if (!src_uri.endpoint().empty()) {
        expected_config.endpointOverride = src_uri.endpoint();
    } else if (!config::object_storage_endpoint.empty()) {
        expected_config.endpointOverride = config::object_storage_endpoint;
    } else if (config::object_storage_endpoint_use_https) {
        expected_config.scheme = Aws::Http::Scheme::HTTPS;
    } else {
        expected_config.scheme = Aws::Http::Scheme::HTTP;
    }
    if (!config::object_storage_region.empty()) {
        expected_config.region = config::object_storage_region;
    }
    expected_config.maxConnections = config::object_storage_max_connection;
    if (config::object_storage_connect_timeout_ms > 0) {
        expected_config.connectTimeoutMs = config::object_storage_connect_timeout_ms;
    }

    // Case 1 (FSOptions path): the rename-specific timeout is >= 0, so it must be the one cached.
    expected_config.requestTimeoutMs = config::object_storage_rename_file_request_timeout_ms;
    ASSERT_TRUE(S3ClientFactory::instance().find_client_cache_keys_by_config_TEST(expected_config));

    // Case 2 (FSOptions path): the rename-specific timeout is negative, so
    // config::object_storage_request_timeout_ms must be used instead.
    config::object_storage_rename_file_request_timeout_ms = -1;
    config::object_storage_request_timeout_ms = kGenericTimeoutMs;
    // only used for generate a new S3 client into global cache
    (void)fs->rename_file(S3Path("/dir/source_name"), S3Path("/dir/target_name"));
    expected_config.requestTimeoutMs = config::object_storage_request_timeout_ms;
    ASSERT_TRUE(S3ClientFactory::instance().find_client_cache_keys_by_config_TEST(expected_config));
    // Back to the state of case 1 before exercising the TCloudConfiguration path.
    config::object_storage_rename_file_request_timeout_ms = kRenameTimeoutMs;
    config::object_storage_request_timeout_ms = original_timeouts.request_timeout_ms;

    std::map<std::string, std::string> test_properties;
    test_properties[AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR] = "true";
    TCloudConfiguration tCloudConfiguration;
    tCloudConfiguration.__set_cloud_type(TCloudType::AWS);
    tCloudConfiguration.__set_cloud_properties_v2(test_properties);
    auto cloud_config = CloudConfigurationFactory::create_aws(tCloudConfiguration);

    // Case 3 (TCloudConfiguration path): the rename-specific timeout is >= 0.
    expected_config.requestTimeoutMs = config::object_storage_rename_file_request_timeout_ms;
    (void)S3ClientFactory::instance().new_client(tCloudConfiguration, S3ClientFactory::OperationType::RENAME_FILE);
    ASSERT_TRUE(S3ClientFactory::instance().find_client_cache_keys_by_config_TEST(expected_config, &cloud_config));

    // Case 4 (TCloudConfiguration path): the rename-specific timeout is negative, so the generic
    // request timeout is used.
    config::object_storage_rename_file_request_timeout_ms = -1;
    config::object_storage_request_timeout_ms = kGenericTimeoutMs;
    (void)S3ClientFactory::instance().new_client(tCloudConfiguration, S3ClientFactory::OperationType::RENAME_FILE);
    expected_config.requestTimeoutMs = config::object_storage_request_timeout_ms;
    ASSERT_TRUE(S3ClientFactory::instance().find_client_cache_keys_by_config_TEST(expected_config, &cloud_config));
    // |original_timeouts| restores both configs when it goes out of scope.
}

} // namespace starrocks
Loading