diff --git a/datafusion-examples/examples/dataframe-to-s3.rs b/datafusion-examples/examples/dataframe-to-s3.rs new file mode 100644 index 000000000000..5cd329009bdf --- /dev/null +++ b/datafusion-examples/examples/dataframe-to-s3.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::datasource::file_format::file_type::{FileType, GetExt}; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::prelude::*; + +use object_store::aws::AmazonS3Builder; +use std::env; +use std::sync::Arc; +use url::Url; + +/// This example demonstrates querying data from AmazonS3 and writing +/// the result of a query back to AmazonS3 +#[tokio::main] +async fn main() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + + //enter region and bucket to which your credentials have GET and PUT access + let region = ""; + let bucket_name = ""; + + let s3 = AmazonS3Builder::new() + .with_bucket_name(bucket_name) + .with_region(region) + .with_access_key_id(env::var("AWS_ACCESS_KEY_ID").unwrap()) + .with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap()) + .build()?; + + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + let arc_s3 = Arc::new(s3); + ctx.runtime_env() + .register_object_store(&s3_url, arc_s3.clone()); + + let path = format!("s3://{bucket_name}/test_data/"); + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(FileType::PARQUET.get_ext()); + ctx.register_listing_table("test", &path, listing_options, None, None) + .await?; + + // execute the query + let df = ctx.sql("SELECT * from test").await?; + + let out_path = format!("s3://{bucket_name}/test_write/"); + df.clone().write_parquet(&out_path, None).await?; + + //write as JSON to s3 + let json_out = format!("s3://{bucket_name}/json_out"); + df.clone().write_json(&json_out).await?; + + //write as csv to s3 + let csv_out = format!("s3://{bucket_name}/csv_out"); + df.write_csv(&csv_out).await?; + + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(FileType::PARQUET.get_ext()); + ctx.register_listing_table("test2", &out_path, listing_options, None, None) + .await?; + + let df = ctx + .sql( + "SELECT * \ + FROM test2 \ + ", + ) + .await?; + + df.show_limit(20).await?; + + Ok(()) +} diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index fc25893d3619..9a7602b792fe 100644 --- 
a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -18,7 +18,7 @@ //! Execution plan for reading CSV files use crate::datasource::file_format::file_type::FileCompressionType; -use crate::datasource::listing::FileRange; +use crate::datasource::listing::{FileRange, ListingTableUrl}; use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -34,6 +34,7 @@ use arrow::csv; use arrow::datatypes::SchemaRef; use datafusion_execution::TaskContext; use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties}; +use tokio::io::AsyncWriteExt; use super::FileScanConfig; @@ -43,10 +44,8 @@ use futures::{StreamExt, TryStreamExt}; use object_store::local::LocalFileSystem; use object_store::{GetOptions, GetResult, ObjectStore}; use std::any::Any; -use std::fs; use std::io::Cursor; use std::ops::Range; -use std::path::Path; use std::sync::Arc; use std::task::Poll; use tokio::task::JoinSet; @@ -566,30 +565,37 @@ pub async fn plan_to_csv( path: impl AsRef, ) -> Result<()> { let path = path.as_ref(); - // create directory to contain the CSV files (one per partition) - let fs_path = Path::new(path); - if let Err(e) = fs::create_dir(fs_path) { - return Err(DataFusionError::Execution(format!( - "Could not create directory {path}: {e:?}" - ))); - } - + let parsed = ListingTableUrl::parse(path)?; + let object_store_url = parsed.object_store(); + let store = task_ctx.runtime_env().object_store(&object_store_url)?; let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { - let plan = plan.clone(); - let filename = format!("part-{i}.csv"); - let path = fs_path.join(filename); - let file = fs::File::create(path)?; - let mut writer = csv::Writer::new(file); - let stream = plan.execute(i, task_ctx.clone())?; + let storeref = store.clone(); + let plan: Arc = plan.clone(); + let filename = format!("{}/part-{i}.csv", parsed.prefix()); + let file = object_store::path::Path::parse(filename)?; + let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { - let result: Result<()> = stream - .map(|batch| writer.write(&batch?)) - .try_collect() + let (_, mut multipart_writer) = storeref.put_multipart(&file).await?; + let mut buffer = Vec::with_capacity(1024); + //only write headers on first iteration + let mut write_headers = true; + while let Some(batch) = stream.next().await.transpose()? 
{ + let mut writer = csv::WriterBuilder::new() + .has_headers(write_headers) + .build(buffer); + writer.write(&batch)?; + buffer = writer.into_inner(); + multipart_writer.write_all(&buffer).await?; + buffer.clear(); + //prevent writing headers more than once + write_headers = false; + } + multipart_writer + .shutdown() .await - .map_err(DataFusionError::from); - result + .map_err(DataFusionError::from) }); } @@ -1033,14 +1039,20 @@ mod tests { #[tokio::test] async fn write_csv_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); + + // register a local file system object store + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); let options = CsvReadOptions::default() .schema_infer_max_records(2) .has_header(true); let df = ctx.read_csv("tests/data/corrupt.csv", options).await?; - let tmp_dir = TempDir::new()?; - let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + + let out_dir_url = "file://local/out"; let e = df - .write_csv(&out_dir) + .write_csv(out_dir_url) .await .expect_err("should fail because input file does not match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); @@ -1064,10 +1076,18 @@ mod tests { ) .await?; + // register a local file system object store + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + + ctx.runtime_env().register_object_store(&local_url, local); + // execute a simple query and write the results to CSV let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_csv(&out_dir).await?; + df.write_csv(out_dir_url).await?; // create a new context and verify that the results were saved to a partitioned csv file let ctx = SessionContext::new(); diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 541e448cfef7..327a10f5c3ef 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -17,6 +17,7 @@ //! 
Execution plan for reading line-delimited JSON files use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -36,13 +37,13 @@ use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties}; use bytes::{Buf, Bytes}; use futures::{ready, stream, StreamExt, TryStreamExt}; +use object_store; use object_store::{GetResult, ObjectStore}; use std::any::Any; -use std::fs; use std::io::BufReader; -use std::path::Path; use std::sync::Arc; use std::task::Poll; +use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; use super::FileScanConfig; @@ -259,29 +260,31 @@ pub async fn plan_to_json( path: impl AsRef, ) -> Result<()> { let path = path.as_ref(); - // create directory to contain the CSV files (one per partition) - let fs_path = Path::new(path); - if let Err(e) = fs::create_dir(fs_path) { - return Err(DataFusionError::Execution(format!( - "Could not create directory {path}: {e:?}" - ))); - } - + let parsed = ListingTableUrl::parse(path)?; + let object_store_url = parsed.object_store(); + let store = task_ctx.runtime_env().object_store(&object_store_url)?; let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { - let plan = plan.clone(); - let filename = format!("part-{i}.json"); - let path = fs_path.join(filename); - let file = fs::File::create(path)?; - let mut writer = json::LineDelimitedWriter::new(file); - let stream = plan.execute(i, task_ctx.clone())?; + let storeref = store.clone(); + let plan: Arc = plan.clone(); + let filename = format!("{}/part-{i}.json", parsed.prefix()); + let file = object_store::path::Path::parse(filename)?; + + let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { - let result: Result<()> = stream - .map(|batch| writer.write(&batch?)) - .try_collect() + let (_, mut multipart_writer) = storeref.put_multipart(&file).await?; + let mut buffer = Vec::with_capacity(1024); + while let Some(batch) = stream.next().await.transpose()? 
{ + let mut writer = json::LineDelimitedWriter::new(buffer); + writer.write(&batch)?; + buffer = writer.into_inner(); + multipart_writer.write_all(&buffer).await?; + buffer.clear(); + } + multipart_writer + .shutdown() .await - .map_err(DataFusionError::from); - result + .map_err(DataFusionError::from) }); } @@ -320,6 +323,7 @@ mod tests { use crate::test::partitioned_file_groups; use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array}; use rstest::*; + use std::path::Path; use tempfile::TempDir; use url::Url; @@ -649,7 +653,6 @@ mod tests { #[tokio::test] async fn write_json_results() -> Result<()> { // create partitioned input file and context - let tmp_dir = TempDir::new()?; let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(8)); @@ -659,10 +662,17 @@ mod tests { ctx.register_json("test", path.as_str(), NdJsonReadOptions::default()) .await?; + // register a local file system object store for /tmp directory + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); + // execute a simple query and write the results to CSV let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT a, b FROM test").await?; - df.write_json(&out_dir).await?; + df.write_json(out_dir_url).await?; // create a new context and verify that the results were saved to a partitioned csv file let ctx = SessionContext::new(); @@ -720,14 +730,18 @@ mod tests { #[tokio::test] async fn write_json_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); + // register a local file system object store for /tmp directory + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); let options = CsvReadOptions::default() .schema_infer_max_records(2) .has_header(true); let df = ctx.read_csv("tests/data/corrupt.csv", options).await?; - let tmp_dir = TempDir::new()?; - let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let out_dir_url = "file://local/out"; let e = df - .write_json(&out_dir) + .write_json(out_dir_url) .await .expect_err("should fail because input file does not match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index b61c1e1bfa0a..03e5f9ebdc57 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -26,6 +26,7 @@ use crate::datasource::physical_plan::{ }; use crate::{ config::ConfigOptions, + datasource::listing::ListingTableUrl, error::{DataFusionError, Result}, execution::context::TaskContext, physical_optimizer::pruning::PruningPredicate, @@ -37,11 +38,12 @@ use crate::{ }; use datafusion_physical_expr::PhysicalSortExpr; use fmt::Debug; +use object_store::path::Path; use std::any::Any; use std::fmt; -use std::fs; use std::ops::Range; use std::sync::Arc; +use tokio::task::JoinSet; use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::ArrowError; @@ -56,11 +58,10 @@ use log::debug; use object_store::ObjectStore; use 
parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; use parquet::schema::types::ColumnDescriptor; -use tokio::task::JoinSet; mod metrics; pub mod page_filter; @@ -634,31 +635,35 @@ pub async fn plan_to_parquet( writer_properties: Option, ) -> Result<()> { let path = path.as_ref(); - // create directory to contain the Parquet files (one per partition) - let fs_path = std::path::Path::new(path); - if let Err(e) = fs::create_dir(fs_path) { - return Err(DataFusionError::Execution(format!( - "Could not create directory {path}: {e:?}" - ))); - } - + let parsed = ListingTableUrl::parse(path)?; + let object_store_url = parsed.object_store(); + let store = task_ctx.runtime_env().object_store(&object_store_url)?; let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { - let plan = plan.clone(); - let filename = format!("part-{i}.parquet"); - let path = fs_path.join(filename); - let file = fs::File::create(path)?; - let mut writer = - ArrowWriter::try_new(file, plan.schema(), writer_properties.clone())?; - let stream = plan.execute(i, task_ctx.clone())?; + let plan: Arc = plan.clone(); + let filename = format!("{}/part-{i}.parquet", parsed.prefix()); + let file = Path::parse(filename)?; + let propclone = writer_properties.clone(); + + let storeref = store.clone(); + let (_, multipart_writer) = storeref.put_multipart(&file).await?; + let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { - stream - .map(|batch| writer.write(&batch?).map_err(DataFusionError::ParquetError)) - .try_collect() + let mut writer = AsyncArrowWriter::try_new( + multipart_writer, + plan.schema(), + 10485760, + propclone, + )?; + while let Some(next_batch) = stream.next().await { + let batch = next_batch?; + writer.write(&batch).await?; + } + writer + .close() .await - .map_err(DataFusionError::from)?; - - writer.close().map_err(DataFusionError::from).map(|_| ()) + .map_err(DataFusionError::from) + .map(|_| ()) }); } @@ -760,6 +765,7 @@ mod tests { use std::fs::File; use std::io::Write; use tempfile::TempDir; + use url::Url; struct RoundTripResult { /// Data that was read back from ParquetFiles @@ -908,14 +914,19 @@ mod tests { #[tokio::test] async fn write_parquet_results_error_handling() -> Result<()> { let ctx = SessionContext::new(); + // register a local file system object store for /tmp directory + let tmp_dir = TempDir::new()?; + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); + let options = CsvReadOptions::default() .schema_infer_max_records(2) .has_header(true); let df = ctx.read_csv("tests/data/corrupt.csv", options).await?; - let tmp_dir = TempDir::new()?; - let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let out_dir_url = "file://local/out"; let e = df - .write_parquet(&out_dir, None) + .write_parquet(out_dir_url, None) .await .expect_err("should fail because input file does not match inferred schema"); assert_eq!("Arrow error: Parser error: Error while parsing value d for column 0 at line 4", format!("{e}")); @@ -1927,10 
+1938,16 @@ mod tests { ) .await?; + // register a local file system object store for /tmp directory + let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); + let local_url = Url::parse("file://local").unwrap(); + ctx.runtime_env().register_object_store(&local_url, local); + // execute a simple query and write the results to parquet let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let out_dir_url = "file://local/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_parquet(&out_dir, None).await?; + df.write_parquet(out_dir_url, None).await?; // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?; // create a new context and verify that the results were saved to a partitioned csv file
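
The new `dataframe-to-s3.rs` example registers an S3 bucket as an object store and then writes query results straight back to `s3://` URLs. Below is a condensed sketch of that flow, assuming AWS credentials in the environment; the bucket name, region, and input CSV path are placeholders, not values from the PR.

```rust
use std::{env, sync::Arc};

use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Placeholders: fill in a bucket and region your credentials can GET and PUT to.
    let bucket_name = "my-bucket";
    let region = "us-east-1";

    let s3 = AmazonS3Builder::new()
        .with_bucket_name(bucket_name)
        .with_region(region)
        .with_access_key_id(env::var("AWS_ACCESS_KEY_ID").unwrap())
        .with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap())
        .build()?;

    // Registering the bucket once makes every `s3://{bucket_name}/...` path resolve
    // through this store, for reads and for the new write paths alike.
    let s3_url = Url::parse(&format!("s3://{bucket_name}")).unwrap();
    ctx.runtime_env().register_object_store(&s3_url, Arc::new(s3));

    // Placeholder local input; any DataFrame can be written back to the bucket.
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::default())
        .await?;
    df.clone()
        .write_parquet(&format!("s3://{bucket_name}/test_write/"), None)
        .await?;
    df.write_json(&format!("s3://{bucket_name}/json_out")).await?;
    Ok(())
}
```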
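
`plan_to_csv` now resolves the output URL through `ListingTableUrl`, opens one multipart upload per partition, and re-encodes each `RecordBatch` through an in-memory buffer, emitting the CSV header only once. A self-contained sketch of that pattern follows, with the store, object path, and batch stream as stand-ins for what DataFusion wires up internally; `write_csv_object` is a hypothetical helper, not part of the PR.

```rust
use std::sync::Arc;

use arrow::csv;
use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use futures::StreamExt;
use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Stream record batches into a single CSV object via a multipart upload.
async fn write_csv_object(
    store: Arc<dyn ObjectStore>,
    location: Path,
    mut batches: impl futures::Stream<Item = Result<RecordBatch>> + Unpin,
) -> Result<()> {
    let (_, mut multipart_writer) = store.put_multipart(&location).await?;
    let mut buffer = Vec::with_capacity(1024);
    // Only the first batch gets a header row.
    let mut write_headers = true;
    while let Some(batch) = batches.next().await.transpose()? {
        let mut writer = csv::WriterBuilder::new()
            .has_headers(write_headers)
            .build(buffer);
        writer.write(&batch)?;
        buffer = writer.into_inner();
        multipart_writer.write_all(&buffer).await?;
        buffer.clear();
        write_headers = false;
    }
    // Completing the multipart upload is what makes the object visible.
    multipart_writer
        .shutdown()
        .await
        .map_err(DataFusionError::from)
}
```

Rebuilding the `csv::Writer` each iteration is what allows `has_headers` to be toggled off after the first batch, at the small cost of reconstructing the builder per batch.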
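
`plan_to_json` follows the same multipart pattern with `json::LineDelimitedWriter`, which needs no header bookkeeping. A matching sketch, with `write_json_object` likewise a hypothetical helper:

```rust
use std::sync::Arc;

use arrow::json;
use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use futures::StreamExt;
use object_store::{path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Stream record batches into a single newline-delimited JSON object.
async fn write_json_object(
    store: Arc<dyn ObjectStore>,
    location: Path,
    mut batches: impl futures::Stream<Item = Result<RecordBatch>> + Unpin,
) -> Result<()> {
    let (_, mut multipart_writer) = store.put_multipart(&location).await?;
    let mut buffer = Vec::with_capacity(1024);
    while let Some(batch) = batches.next().await.transpose()? {
        // Encode the batch into the reusable buffer, then hand the bytes to the upload.
        let mut writer = json::LineDelimitedWriter::new(buffer);
        writer.write(&batch)?;
        buffer = writer.into_inner();
        multipart_writer.write_all(&buffer).await?;
        buffer.clear();
    }
    multipart_writer
        .shutdown()
        .await
        .map_err(DataFusionError::from)
}
```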
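
`plan_to_parquet` instead hands the multipart writer to parquet's `AsyncArrowWriter`, which buffers rows internally (10 MiB below, mirroring the constant hard-coded in the PR) and finishes the upload on `close()`. A sketch under the same assumptions; `write_parquet_object` is a hypothetical helper.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use futures::StreamExt;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;

/// Stream record batches into a single Parquet object via a multipart upload.
async fn write_parquet_object(
    store: Arc<dyn ObjectStore>,
    location: Path,
    schema: SchemaRef,
    props: Option<WriterProperties>,
    mut batches: impl futures::Stream<Item = Result<RecordBatch>> + Unpin,
) -> Result<()> {
    let (_, multipart_writer) = store.put_multipart(&location).await?;
    // 10 MiB of in-memory buffering before bytes are handed to the upload,
    // matching the value used in plan_to_parquet.
    let mut writer = AsyncArrowWriter::try_new(multipart_writer, schema, 10485760, props)?;
    while let Some(batch) = batches.next().await {
        writer.write(&batch?).await?;
    }
    // close() flushes the Parquet footer and completes the multipart upload.
    writer
        .close()
        .await
        .map(|_| ())
        .map_err(DataFusionError::from)
}
```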
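
Because the writers now go through the object-store registry, the updated tests register a `LocalFileSystem` rooted at a temporary directory under the `file://local` authority and pass URLs such as `file://local/out` instead of plain filesystem paths. A minimal reproduction of that setup; the input CSV path is a placeholder.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;
use tempfile::TempDir;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Everything written under `file://local/...` lands inside this temporary directory.
    let tmp_dir = TempDir::new()?;
    let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);

    // Placeholder input; any DataFrame is written the same way.
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::default())
        .await?;
    df.write_csv("file://local/out").await?;
    Ok(())
}
```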