diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 44a7f8b586bfa..0b8ae34eab241 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1153,8 +1153,6 @@ mod tests {
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
     use crate::physical_plan::{collect, collect_partitioned};
     use crate::test;
-    use crate::test::object_store::TestObjectStore;
-    use crate::test_util::arrow_test_data;
     use crate::variable::VarType;
     use crate::{
         assert_batches_eq, assert_batches_sorted_eq,
@@ -3293,51 +3291,6 @@ mod tests {
         Ok(())
     }

-    #[tokio::test]
-    async fn read_files_from_partitioned_path() -> Result<()> {
-        let mut ctx = ExecutionContext::new();
-
-        let testdata = arrow_test_data();
-        let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
-        let file_schema = aggr_test_schema();
-        let object_store = TestObjectStore::new_mirror(
-            csv_file_path,
-            &[
-                "mytable/date=2021-10-27/file.csv",
-                "mytable/date=2021-10-28/file.csv",
-            ],
-        );
-
-        let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
-        options.table_partition_cols = vec!["date".to_owned()];
-
-        let table =
-            ListingTable::new(object_store, "mytable".to_owned(), file_schema, options);
-
-        ctx.register_table("t", Arc::new(table))?;
-
-        let result = ctx
-            .sql("SELECT c1, date FROM t WHERE date='2021-10-27' LIMIT 5")
-            .await?
-            .collect()
-            .await?;
-
-        let expected = vec![
-            "+----+------------+",
-            "| c1 | date       |",
-            "+----+------------+",
-            "| a  | 2021-10-27 |",
-            "| b  | 2021-10-27 |",
-            "| b  | 2021-10-27 |",
-            "| c  | 2021-10-27 |",
-            "| d  | 2021-10-27 |",
-            "+----+------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &result);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let mut ctx = ExecutionContext::with_config(
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
index 9050e22155640..e93b4cd2d410d 100644
--- a/datafusion/src/test/object_store.rs
+++ b/datafusion/src/test/object_store.rs
@@ -14,51 +14,35 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 //! Object store implem used for testing

 use std::{
-    fs, io,
+    io,
     io::{Cursor, Read},
     sync::Arc,
 };

-use crate::datasource::object_store::{
-    local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream, ObjectReader,
-    ObjectStore, SizedFile,
+use crate::{
+    datasource::object_store::{
+        FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
+    },
+    error::{DataFusionError, Result},
 };
-use crate::error::{DataFusionError, Result};
-
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};

 #[derive(Debug)]
 /// An object store implem that is useful for testing.
-/// Can either generate `ObjectReader`s that are filled with zero-
-/// bytes or mirror a given file to multiple path.
+/// `ObjectReader`s are filled with zero bytes.
 pub struct TestObjectStore {
     /// The `(path,size)` of the files that "exist" in the store
     files: Vec<(String, u64)>,
-    /// The file that will be read at all path. If none fille the
-    /// file with zero-bytes.
-    mirrored_file: Option<String>,
 }

 impl TestObjectStore {
     pub fn new_arc(files: &[(&str, u64)]) -> Arc<Self> {
         Arc::new(Self {
             files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
-            mirrored_file: None,
-        })
-    }
-    pub fn new_mirror(mirrored_file: String, paths: &[&str]) -> Arc<Self> {
-        let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
-        Arc::new(Self {
-            files: paths
-                .iter()
-                .map(|&f| (f.to_owned(), metadata.len()))
-                .collect(),
-            mirrored_file: Some(mirrored_file),
         })
     }
 }
@@ -96,15 +80,8 @@ impl ObjectStore for TestObjectStore {

     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
         match self.files.iter().find(|item| file.path == item.0) {
-            Some(&(_, size)) if size == file.size => {
-                if let Some(mirrored_file) = &self.mirrored_file {
-                    Ok(LocalFileSystem {}.file_reader(SizedFile {
-                        path: mirrored_file.clone(),
-                        size,
-                    })?)
-                } else {
-                    Ok(Arc::new(EmptyObjectReader(size)))
-                }
+            Some((_, size)) if *size == file.size => {
+                Ok(Arc::new(EmptyObjectReader(*size)))
             }
             Some(_) => Err(DataFusionError::IoError(io::Error::new(
                 io::ErrorKind::NotFound,
diff --git a/datafusion/tests/common.rs b/datafusion/tests/common.rs
new file mode 100644
index 0000000000000..3490db5e091fd
--- /dev/null
+++ b/datafusion/tests/common.rs
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! methods that are common to multiple integration test setups
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+
+pub fn aggr_test_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::UInt32, false),
+        Field::new("c3", DataType::Int8, false),
+        Field::new("c4", DataType::Int16, false),
+        Field::new("c5", DataType::Int32, false),
+        Field::new("c6", DataType::Int64, false),
+        Field::new("c7", DataType::UInt8, false),
+        Field::new("c8", DataType::UInt16, false),
+        Field::new("c9", DataType::UInt32, false),
+        Field::new("c10", DataType::UInt64, false),
+        Field::new("c11", DataType::Float32, false),
+        Field::new("c12", DataType::Float64, false),
+        Field::new("c13", DataType::Utf8, false),
+    ]))
+}
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
new file mode 100644
index 0000000000000..897baaa154df1
--- /dev/null
+++ b/datafusion/tests/path_partition.rs
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test queries on partitioned datasets
+
+use std::{fs, io, sync::Arc};
+
+use async_trait::async_trait;
+use datafusion::{
+    assert_batches_sorted_eq,
+    datasource::{
+        file_format::csv::CsvFormat,
+        listing::{ListingOptions, ListingTable},
+        object_store::{
+            local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
+            ObjectReader, ObjectStore, SizedFile,
+        },
+    },
+    error::{DataFusionError, Result},
+    prelude::ExecutionContext,
+    test_util::arrow_test_data,
+};
+use futures::{stream, StreamExt};
+
+mod common;
+
+#[tokio::test]
+async fn csv_filter_with_file_col() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT c1, c2 FROM t WHERE date='2021-10-27' and date!=c1 LIMIT 5")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+----+",
+        "| c1 | c2 |",
+        "+----+----+",
+        "| a  | 1  |",
+        "| b  | 1  |",
+        "| b  | 5  |",
+        "| c  | 2  |",
+        "| d  | 5  |",
+        "+----+----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_projection_on_partition() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT c1, date FROM t WHERE date='2021-10-27' LIMIT 5")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+----+------------+",
+        "| c1 | date       |",
+        "+----+------------+",
+        "| a  | 2021-10-27 |",
+        "| b  | 2021-10-27 |",
+        "| b  | 2021-10-27 |",
+        "| c  | 2021-10-27 |",
+        "| d  | 2021-10-27 |",
+        "+----+------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn csv_grouping_by_partition() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    register_partitioned_aggregate_csv(
+        &mut ctx,
+        &[
+            "mytable/date=2021-10-26/file.csv",
+            "mytable/date=2021-10-27/file.csv",
+            "mytable/date=2021-10-28/file.csv",
+        ],
+        &["date"],
+        "mytable",
+    );
+
+    let result = ctx
+        .sql("SELECT date, count(*), count(distinct(c1)) FROM t WHERE date<='2021-10-27' GROUP BY date")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------------+-----------------+----------------------+",
+        "| date       | COUNT(UInt8(1)) | COUNT(DISTINCT t.c1) |",
+        "+------------+-----------------+----------------------+",
+        "| 2021-10-26 | 100             | 5                    |",
+        "| 2021-10-27 | 100             | 5                    |",
+        "+------------+-----------------+----------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+fn register_partitioned_aggregate_csv(
+    ctx: &mut ExecutionContext,
+    store_paths: &[&str],
+    partition_cols: &[&str],
+    table_path: &str,
+) {
+    let testdata = arrow_test_data();
+    let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
+    let file_schema = common::aggr_test_schema();
+    let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
+
+    let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
+    options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
+
+    let table =
+        ListingTable::new(object_store, table_path.to_owned(), file_schema, options);
+
+    ctx.register_table("t", Arc::new(table))
+        .expect("registering listing table failed");
+}
+
+#[derive(Debug)]
+/// An object store implem that mirrors a given file to multiple paths.
+pub struct MirroringObjectStore {
+    /// The paths of the files that "exist" in the store
+    files: Vec<String>,
+    /// The file that will be read at all paths
+    mirrored_file: String,
+    /// Size of the mirrored file
+    file_size: u64,
+}
+
+impl MirroringObjectStore {
+    pub fn new_arc(mirrored_file: String, paths: &[&str]) -> Arc<Self> {
+        let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
+        Arc::new(Self {
+            files: paths.iter().map(|&f| f.to_owned()).collect(),
+            mirrored_file,
+            file_size: metadata.len(),
+        })
+    }
+}
+
+#[async_trait]
+impl ObjectStore for MirroringObjectStore {
+    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
+        let prefix = prefix.to_owned();
+        let size = self.file_size;
+        Ok(Box::pin(
+            stream::iter(
+                self.files
+                    .clone()
+                    .into_iter()
+                    .filter(move |f| f.starts_with(&prefix)),
+            )
+            .map(move |f| {
+                Ok(FileMeta {
+                    sized_file: SizedFile { path: f, size },
+                    last_modified: None,
+                })
+            }),
+        ))
+    }
+
+    async fn list_dir(
+        &self,
+        _prefix: &str,
+        _delimiter: Option<String>,
+    ) -> Result<ListEntryStream> {
+        unimplemented!()
+    }
+
+    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+        assert_eq!(
+            self.file_size, file.size,
+            "Requested files should have the same size as the mirrored file"
+        );
+        match self.files.iter().find(|&item| &file.path == item) {
+            Some(_) => Ok(LocalFileSystem {}.file_reader(SizedFile {
+                path: self.mirrored_file.clone(),
+                size: self.file_size,
+            })?),
+            None => Err(DataFusionError::IoError(io::Error::new(
+                io::ErrorKind::NotFound,
+                "not in provided test list",
+            ))),
+        }
+    }
+}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 283033bcde4e0..e0b96c387e75c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -47,6 +47,8 @@ use datafusion::{
 };
 use datafusion::{execution::context::ExecutionContext, physical_plan::displayable};

+mod common;
+
 /// A macro to assert that one string is contained within another with
 /// a nice error message if they are not.
 ///
@@ -3100,24 +3102,6 @@ async fn explain_analyze_runs_optimizers() {
     assert_contains!(actual, expected);
 }

-fn aggr_test_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
-    ]))
-}
-
 async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
     let testdata = datafusion::test_util::arrow_test_data();

@@ -3161,7 +3145,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {

 async fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
     let testdata = datafusion::test_util::arrow_test_data();
-    let schema = aggr_test_schema();
+    let schema = common::aggr_test_schema();
     ctx.register_csv(
         "aggregate_test_100",
         &format!("{}/csv/aggregate_test_100.csv", testdata),
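
Reviewer note: the sketch below shows how a further case could reuse the `register_partitioned_aggregate_csv` helper introduced in `datafusion/tests/path_partition.rs`. It is illustrative only and not part of the patch; the test name and query are hypothetical. The expected count of 100 follows from each partition path mirroring the 100-row `aggregate_test_100.csv` fixture, as the `csv_grouping_by_partition` test in the diff already demonstrates.

#[tokio::test]
async fn csv_count_on_single_partition() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // Mirror the same local fixture at two hive-style partition paths.
    register_partitioned_aggregate_csv(
        &mut ctx,
        &[
            "mytable/date=2021-10-27/file.csv",
            "mytable/date=2021-10-28/file.csv",
        ],
        &["date"],
        "mytable",
    );

    // Filtering on the `date` partition column (parsed from the path, not
    // read from the CSV itself) should restrict the scan to one mirrored
    // file, i.e. one copy of the 100-row fixture.
    let result = ctx
        .sql("SELECT count(*) FROM t WHERE date='2021-10-27'")
        .await?
        .collect()
        .await?;

    let expected = vec![
        "+-----------------+",
        "| COUNT(UInt8(1)) |",
        "+-----------------+",
        "| 100             |",
        "+-----------------+",
    ];
    assert_batches_sorted_eq!(expected, &result);

    Ok(())
}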