diff --git a/Cargo.lock b/Cargo.lock index 1424a2515ea89..bdbeb6a6d28ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -672,6 +672,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compat" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-compression" version = "0.4.3" @@ -4979,6 +4992,7 @@ dependencies = [ "arrow-udf-wasm", "async-backtrace", "async-channel 1.9.0", + "async-compat", "async-recursion", "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index eb73f1ccd05f1..b03924294e20b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,6 +233,7 @@ arrow-udf-python = { version = "0.4.0" } arrow-udf-wasm = { version = "0.4.1" } async-backtrace = "0.2" async-channel = "1.7.1" +async-compat = { version = "0.2" } async-compression = { git = "https://github.com/datafuse-extras/async-compression", rev = "dc81082", features = [ "futures-io", "all-algorithms", diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 4e1c9292b0c4f..045ca39167c6e 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -38,6 +38,7 @@ arrow-udf-python = { workspace = true, optional = true } arrow-udf-wasm = { workspace = true } async-backtrace = { workspace = true } async-channel = { workspace = true } +async-compat = { workspace = true } async-recursion = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } diff --git a/src/query/service/src/servers/http/v1/stage.rs b/src/query/service/src/servers/http/v1/stage.rs index daea164b230cd..aa9e35ea1571d 100644 --- a/src/query/service/src/servers/http/v1/stage.rs +++ b/src/query/service/src/servers/http/v1/stage.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_compat::CompatExt; use databend_common_meta_app::principal::StageInfo; use databend_common_storages_stage::StageTable; use databend_common_users::UserApiProvider; +use futures_util::io; +use futures_util::AsyncWriteExt; use http::StatusCode; use poem::error::InternalServerError; use poem::error::Result as PoemResult; @@ -115,13 +118,24 @@ pub async fn upload_to_stage( Some(name) => name.to_string(), None => uuid::Uuid::new_v4().to_string(), }; - let bytes = field.bytes().await.map_err(InternalServerError)?; let file_path = format!("{}/{}", args.relative_path, name) .trim_start_matches('/') .to_string(); - op.write(&file_path, bytes) + + // Read field with 1MiB buf. + let mut r = io::BufReader::with_capacity(1024 * 1024, field.into_async_read().compat()); + // Upload in 16MiB chunks. + let mut w = op + .writer_with(&file_path) + .chunk(16 * 1024 * 1024) + .await + .map_err(InternalServerError)? + .into_futures_async_write(); + + io::copy_buf(&mut r, &mut w) .await .map_err(InternalServerError)?; + w.close().await.map_err(InternalServerError)?; files.push(name.clone()); }