[test] add integ tests for partitioned parquet
rdettai committed Oct 28, 2021
1 parent dfa6d0e commit f8edb38
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions datafusion/tests/path_partition.rs
@@ -23,7 +23,7 @@ use async_trait::async_trait;
use datafusion::{
assert_batches_sorted_eq,
datasource::{
file_format::csv::CsvFormat,
file_format::{csv::CsvFormat, parquet::ParquetFormat},
listing::{ListingOptions, ListingTable},
object_store::{
local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
@@ -32,7 +32,7 @@ use datafusion::{
},
error::{DataFusionError, Result},
prelude::ExecutionContext,
test_util::arrow_test_data,
test_util::{arrow_test_data, parquet_test_data},
};
use futures::{stream, StreamExt};

@@ -144,6 +144,73 @@ async fn csv_grouping_by_partition() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn parquet_multiple_partitions() -> Result<()> {
let mut ctx = ExecutionContext::new();

register_partitioned_alltypes_parquet(
&mut ctx,
&[
"year=2021/month=09/day=09/file.parquet",
"year=2021/month=10/day=09/file.parquet",
"year=2021/month=10/day=28/file.parquet",
],
&["year", "month", "day"],
"",
)
.await;
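// `day = month` only matches the month=09/day=09 partition, so the query
// below should return the 8 rows of the file mirrored at that path.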

let result = ctx
.sql("SELECT id, day FROM t WHERE day=month ORDER BY id")
.await?
.collect()
.await?;

let expected = vec![
"+----+-----+",
"| id | day |",
"+----+-----+",
"| 0 | 09 |",
"| 1 | 09 |",
"| 2 | 09 |",
"| 3 | 09 |",
"| 4 | 09 |",
"| 5 | 09 |",
"| 6 | 09 |",
"| 7 | 09 |",
"+----+-----+",
];
assert_batches_sorted_eq!(expected, &result);

Ok(())
}

#[tokio::test]
async fn parquet_overlapping_columns() -> Result<()> {
let mut ctx = ExecutionContext::new();

// `id` is both a column of the file and a partitioning column
register_partitioned_alltypes_parquet(
&mut ctx,
&[
"id=1/file.parquet",
"id=2/file.parquet",
"id=3/file.parquet",
],
&["id"],
"",
)
.await;
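// With `id` resolving to both a file column and a partition column,
// planning the query below should fail with a duplicate qualified name error.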

let result = ctx.sql("SELECT id FROM t WHERE id=1 ORDER BY id").await;

assert!(
result.is_err(),
"Dupplicate qualified name should raise error"
);
Ok(())
}

fn register_partitioned_aggregate_csv(
ctx: &mut ExecutionContext,
store_paths: &[&str],
@@ -165,6 +232,32 @@ fn register_partitioned_aggregate_csv(
.expect("registering listing table failed");
}

async fn register_partitioned_alltypes_parquet(
ctx: &mut ExecutionContext,
store_paths: &[&str],
partition_cols: &[&str],
table_path: &str,
) {
let testdata = parquet_test_data();
let parquet_file_path = format!("{}/alltypes_plain.parquet", testdata);
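// Serve the same alltypes_plain.parquet file at each of the given store paths.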
let object_store =
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths);

let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
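// These columns are derived from the Hive-style `key=value` path segments.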

let file_schema = options
.infer_schema(Arc::clone(&object_store), store_paths[0])
.await
.expect("Parquet schema inference failed");

let table =
ListingTable::new(object_store, table_path.to_owned(), file_schema, options);

ctx.register_table("t", Arc::new(table))
.expect("registering listing table failed");
}

#[derive(Debug)]
/// An object store implementation that mirrors a given file to multiple paths.
pub struct MirroringObjectStore {