[test] added proper integration tests
rdettai committed Oct 28, 2021
1 parent 1c7b0e7 commit 5c189dc
Showing 5 changed files with 287 additions and 98 deletions.
47 changes: 0 additions & 47 deletions datafusion/src/execution/context.rs
@@ -1153,8 +1153,6 @@ mod tests {
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
use crate::test;
use crate::test::object_store::TestObjectStore;
use crate::test_util::arrow_test_data;
use crate::variable::VarType;
use crate::{
assert_batches_eq, assert_batches_sorted_eq,
@@ -3293,51 +3291,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_files_from_partitioned_path() -> Result<()> {
let mut ctx = ExecutionContext::new();

let testdata = arrow_test_data();
let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
let file_schema = aggr_test_schema();
let object_store = TestObjectStore::new_mirror(
csv_file_path,
&[
"mytable/date=2021-10-27/file.csv",
"mytable/date=2021-10-28/file.csv",
],
);

let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
options.table_partition_cols = vec!["date".to_owned()];

let table =
ListingTable::new(object_store, "mytable".to_owned(), file_schema, options);

ctx.register_table("t", Arc::new(table))?;

let result = ctx
.sql("SELECT c1, date FROM t WHERE date='2021-10-27' LIMIT 5")
.await?
.collect()
.await?;

let expected = vec![
"+----+------------+",
"| c1 | date |",
"+----+------------+",
"| a | 2021-10-27 |",
"| b | 2021-10-27 |",
"| b | 2021-10-27 |",
"| c | 2021-10-27 |",
"| d | 2021-10-27 |",
"+----+------------+",
];
assert_batches_sorted_eq!(expected, &result);

Ok(())
}

#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let mut ctx = ExecutionContext::with_config(
41 changes: 9 additions & 32 deletions datafusion/src/test/object_store.rs
@@ -14,51 +14,35 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object store implem used for testing
use std::{
fs, io,
io,
io::{Cursor, Read},
sync::Arc,
};

use crate::datasource::object_store::{
local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore, SizedFile,
use crate::{
datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
},
error::{DataFusionError, Result},
};
use crate::error::{DataFusionError, Result};

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};

#[derive(Debug)]
/// An object store implem that is useful for testing.
/// Can either generate `ObjectReader`s that are filled with zero-
/// bytes or mirror a given file to multiple path.
/// `ObjectReader`s are filled with zero bytes.
pub struct TestObjectStore {
/// The `(path,size)` of the files that "exist" in the store
files: Vec<(String, u64)>,
/// The file that will be read at all path. If none fille the
/// file with zero-bytes.
mirrored_file: Option<String>,
}

impl TestObjectStore {
pub fn new_arc(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
Arc::new(Self {
files: files.iter().map(|f| (f.0.to_owned(), f.1)).collect(),
mirrored_file: None,
})
}
pub fn new_mirror(mirrored_file: String, paths: &[&str]) -> Arc<dyn ObjectStore> {
let metadata = fs::metadata(&mirrored_file).expect("Local file metadata");
Arc::new(Self {
files: paths
.iter()
.map(|&f| (f.to_owned(), metadata.len()))
.collect(),
mirrored_file: Some(mirrored_file),
})
}
}
@@ -96,15 +80,8 @@ impl ObjectStore for TestObjectStore {

fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
match self.files.iter().find(|item| file.path == item.0) {
Some(&(_, size)) if size == file.size => {
if let Some(mirrored_file) = &self.mirrored_file {
Ok(LocalFileSystem {}.file_reader(SizedFile {
path: mirrored_file.clone(),
size,
})?)
} else {
Ok(Arc::new(EmptyObjectReader(size)))
}
Some((_, size)) if *size == file.size => {
Ok(Arc::new(EmptyObjectReader(*size)))
}
Some(_) => Err(DataFusionError::IoError(io::Error::new(
io::ErrorKind::NotFound,
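After this change, TestObjectStore only serves zero-filled readers for a fixed list of `(path, size)` entries; the file-mirroring variant is removed. As a reading aid, here is a minimal in-crate sketch of the remaining API, grounded only in the constructor, `SizedFile`, and `file_reader` signatures visible in the diff above (the test name and file sizes are illustrative, not part of the commit):

// Sketch only, not part of the commit: exercise TestObjectStore from a unit
// test inside the crate, reusing the import the removed unit test in
// context.rs relied on.
use crate::datasource::object_store::SizedFile;
use crate::error::Result;
use crate::test::object_store::TestObjectStore;

#[test]
fn zero_filled_reader_sketch() -> Result<()> {
    // Two fake files that "exist" only as (path, size) entries in the store.
    let store = TestObjectStore::new_arc(&[
        ("mytable/date=2021-10-27/file.csv", 100),
        ("mytable/date=2021-10-28/file.csv", 100),
    ]);

    // file_reader succeeds only when both path and size match an entry;
    // the reader it returns yields `size` zero bytes.
    let _reader = store.file_reader(SizedFile {
        path: "mytable/date=2021-10-27/file.csv".to_owned(),
        size: 100,
    })?;

    Ok(())
}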
40 changes: 40 additions & 0 deletions datafusion/tests/common.rs
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! methods that are common to multiple integration test setups
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

pub fn aggr_test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]))
}
