Skip to content

Commit

Permalink
feat: Add storage endpoint check (#13550)
Browse files Browse the repository at this point in the history
* feat: Add storage endpoint check

Signed-off-by: Xuanwo <[email protected]>

* Update src/meta/app/src/storage/storage_params.rs

Co-authored-by: Yang Xiufeng <[email protected]>

* Fix stage test

Signed-off-by: Xuanwo <[email protected]>

* Fix test

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Yang Xiufeng <[email protected]>
  • Loading branch information
Xuanwo and youngsofun authored Nov 8, 2023
1 parent a24cb74 commit 09edee5
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 9 deletions.
40 changes: 36 additions & 4 deletions src/meta/app/src/storage/storage_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::time::Duration;

use common_exception::ErrorCode;
use common_exception::Result;
use serde::Deserialize;
use serde::Serialize;

Expand Down Expand Up @@ -117,17 +120,46 @@ impl StorageParams {
/// auto_detect is used to do auto detect for some storage params under async context.
///
/// - This action should be taken before storage params been passed out.
/// - This action should not return errors, we will return it as is if any error happened.
pub async fn auto_detect(self) -> Self {
match self {
pub async fn auto_detect(self) -> Result<Self> {
let sp = match self {
StorageParams::S3(mut s3) if s3.region.is_empty() => {
// TODO: endpoint related logic should be moved out from opendal as a new API.
// Remove the possible trailing `/` in endpoint.
let endpoint = s3.endpoint_url.trim_end_matches('/');

// Make sure the endpoint contains the scheme.
let endpoint = if endpoint.starts_with("http") {
endpoint.to_string()
} else {
// Prefix https if endpoint doesn't start with scheme.
format!("https://{}", endpoint)
};

// We should not return error if client create failed, just ignore it.
if let Ok(client) = opendal::raw::HttpClient::new() {
// The response itself doesn't important.
let _ = client
.client()
.get(&endpoint)
.timeout(Duration::from_secs(10))
.send()
.await
.map_err(|err| {
ErrorCode::InvalidConfig(format!(
"s3 endpoint_url {} is invalid or incomplete: {err:?}",
s3.endpoint_url
))
})?;
}
s3.region = opendal::services::S3::detect_region(&s3.endpoint_url, &s3.bucket)
.await
.unwrap_or_default();
StorageParams::S3(s3)
}
v => v,
}
};

Ok(sp)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl InnerConfig {
cfg.query.node_id = GlobalUniqName::unique();

// Handle auto detect for storage params.
cfg.storage.params = cfg.storage.params.auto_detect().await;
cfg.storage.params = cfg.storage.params.auto_detect().await?;

// Only check meta config when cmd is empty.
if cfg.subcommand.is_none() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/sharing_endpoint/src/configs/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Config {
/// In the future, we could have `ConfigV1` and `ConfigV2`.
pub async fn load() -> Result<Self> {
let mut cfg: Self = OuterV0Config::load(true)?.try_into()?;
cfg.storage.params = cfg.storage.params.auto_detect().await;
cfg.storage.params = cfg.storage.params.auto_detect().await?;

Ok(cfg)
}
Expand Down
7 changes: 6 additions & 1 deletion src/query/sql/src/planner/binder/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,12 @@ pub async fn parse_uri_location(l: &mut UriLocation) -> Result<(StorageParams, S
}
};

let sp = sp.auto_detect().await;
let sp = sp.auto_detect().await.map_err(|err| {
Error::new(
ErrorKind::InvalidInput,
anyhow!("storage params is invalid for it's auto detect failed for {err:?}"),
)
})?;

Ok((sp, path))
}
2 changes: 1 addition & 1 deletion src/query/storages/iceberg/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Database for IcebergDatabase {
}

let table_sp = self.db_root.params().map_root(|r| format!("{r}{path}"));
let table_sp = table_sp.auto_detect().await;
let table_sp = table_sp.auto_detect().await?;
let tbl_root = DataOperator::try_create(&table_sp).await?;

let tbl = IcebergTable::try_create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ DROP STAGE ~
statement error 2506
CREATE STAGE ~

statement error 1005
statement error 4000
create stage tpch100_data url='s3://wubx/tb101' connection=(aws_key_id='minioadmin' aws_secret_key='minioadmin' endpoint_url='http://127.0.0.1:9900');

statement ok
Expand Down

0 comments on commit 09edee5

Please sign in to comment.