Commit e52f844

Add partitioned_csv setup code to sql_integration test (#1743)

alamb authored Feb 5, 2022
1 parent 40df55f
Showing 5 changed files with 347 additions and 239 deletions.
239 changes: 1 addition & 238 deletions datafusion/src/execution/context.rs
@@ -1281,11 +1281,9 @@ mod tests {
     use super::*;
     use crate::execution::context::QueryPlanner;
     use crate::from_slice::FromSlice;
-    use crate::logical_plan::plan::Projection;
-    use crate::logical_plan::TableScan;
     use crate::logical_plan::{binary_expr, lit, Operator};
+    use crate::physical_plan::collect;
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
-    use crate::physical_plan::{collect, collect_partitioned};
     use crate::test;
     use crate::variable::VarType;
     use crate::{
@@ -1311,7 +1309,6 @@
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
    use tempfile::TempDir;
-    use test::*;
 
     #[tokio::test]
     async fn shared_memory_and_disk_manager() {
@@ -1347,62 +1344,6 @@
         ));
     }
 
-    #[tokio::test]
-    async fn parallel_projection() -> Result<()> {
-        let partition_count = 4;
-        let results = execute("SELECT c1, c2 FROM test", partition_count).await?;
-
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 3  | 1  |",
-            "| 3  | 2  |",
-            "| 3  | 3  |",
-            "| 3  | 4  |",
-            "| 3  | 5  |",
-            "| 3  | 6  |",
-            "| 3  | 7  |",
-            "| 3  | 8  |",
-            "| 3  | 9  |",
-            "| 3  | 10 |",
-            "| 2  | 1  |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "| 2  | 10 |",
-            "| 1  | 1  |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 1  | 10 |",
-            "| 0  | 1  |",
-            "| 0  | 2  |",
-            "| 0  | 3  |",
-            "| 0  | 4  |",
-            "| 0  | 5  |",
-            "| 0  | 6  |",
-            "| 0  | 7  |",
-            "| 0  | 8  |",
-            "| 0  | 9  |",
-            "| 0  | 10 |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn create_variable_expr() -> Result<()> {
         let tmp_dir = TempDir::new()?;
@@ -1447,184 +1388,6 @@
         Ok(())
     }
 
-    #[tokio::test]
-    async fn parallel_query_with_filter() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
-        let logical_plan =
-            ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
-        let logical_plan = ctx.optimize(&logical_plan)?;
-
-        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let results = collect_partitioned(physical_plan, runtime).await?;
-
-        // note that the order of partitions is not deterministic
-        let mut num_rows = 0;
-        for partition in &results {
-            for batch in partition {
-                num_rows += batch.num_rows();
-            }
-        }
-        assert_eq!(20, num_rows);
-
-        let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 1  | 1  |",
-            "| 1  | 10 |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 2  | 1  |",
-            "| 2  | 10 |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_table_scan() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-        let runtime = ctx.state.lock().runtime_env.clone();
-
-        let table = ctx.table("test")?;
-        let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
-            .project(vec![col("c2")])?
-            .build()?;
-
-        let optimized_plan = ctx.optimize(&logical_plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be TableScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = "Projection: #test.c2\
-        \n  TableScan: test projection=Some([1])";
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn preserve_nullability_on_projection() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-
-        let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
-        assert!(!schema.field_with_name("c1")?.is_nullable());
-
-        let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
-            .project(vec![col("c1")])?
-            .build()?;
-
-        let plan = ctx.optimize(&plan)?;
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
-        assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_memory_scan() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, false),
-        ]);
-        let schema = SchemaRef::new(schema);
-
-        let partitions = vec![vec![RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
-                Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
-                Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
-            ],
-        )?]];
-
-        let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
-            .project(vec![col("b")])?
-            .build()?;
-        assert_fields_eq(&plan, vec!["b"]);
-
-        let ctx = ExecutionContext::new();
-        let optimized_plan = ctx.optimize(&plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be InMemoryScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = format!(
-            "Projection: #{}.b\
-            \n  TableScan: {} projection=Some([1])",
-            UNNAMED_TABLE, UNNAMED_TABLE
-        );
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(4, batches[0].num_rows());
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn sort() -> Result<()> {
         let results =
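The expectations in the tests deleted above (40 rows across four partitions, c2 running from 1 to 10) come from the shared partitioned CSV fixture. Assuming the old in-module helper laid out data the same way as the partitioned_csv module added below, each partition file contains eleven generated lines; for partition 0 (the doubled dot comes from passing ".csv" as the extension):

partition-0..csv
0,0,true
0,1,false
0,2,true
...
0,10,true

CsvReadOptions::new() defaults to has_header = true, so the first generated line of each file is consumed as a header when the table is read back. That leaves ten data rows per partition, which is why no c2 = 0 row appears in the expected tables.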
1 change: 1 addition & 0 deletions datafusion/tests/sql/mod.rs
@@ -98,6 +98,7 @@ pub mod window;
 
 mod explain;
 pub mod information_schema;
+mod partitioned_csv;
 #[cfg_attr(not(feature = "unicode_expressions"), ignore)]
 pub mod unicode;
 
95 changes: 95 additions & 0 deletions datafusion/tests/sql/partitioned_csv.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for running with a partitioned csv dataset.
+
+use std::{io::Write, sync::Arc};
+
+use arrow::{
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use datafusion::{
+    error::Result,
+    prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext},
+};
+use tempfile::TempDir;
+
+/// Execute SQL against the given context and return results
+async fn plan_and_collect(
+    ctx: &mut ExecutionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Execute SQL in a fresh partitioned-CSV context and return results
+pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
+    let tmp_dir = TempDir::new()?;
+    let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
+    plan_and_collect(&mut ctx, sql).await
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+    file_extension: &str,
+) -> Result<SchemaRef> {
+    // define schema for data source (csv file)
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+
+    // generate one CSV file per partition
+    for partition in 0..partition_count {
+        let filename = format!("partition-{}.{}", partition, file_extension);
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = std::fs::File::create(file_path)?;
+
+        // generate some data
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    Ok(schema)
+}
+
+/// Generate partitioned CSV files and register them with an execution context
+pub async fn create_ctx(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+) -> Result<ExecutionContext> {
+    let mut ctx =
+        ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8));
+
+    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema),
+    )
+    .await?;
+
+    Ok(ctx)
+}
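With the module wired into the suite, a test can drive a partitioned query in a few lines. A minimal sketch, not code from this commit (the remaining changed files are not shown above): the test name is hypothetical, and it assumes sibling modules under datafusion/tests/sql/ reach the helpers as super::partitioned_csv:

use super::partitioned_csv;
use datafusion::error::Result;

#[tokio::test]
async fn select_over_partitioned_csv() -> Result<()> {
    // Four partition files; with the default header handling, ten data
    // rows per partition survive, so 40 rows come back in total.
    let results = partitioned_csv::execute("SELECT c1, c2 FROM test", 4).await?;

    let num_rows: usize = results.iter().map(|b| b.num_rows()).sum();
    assert_eq!(40, num_rows);
    Ok(())
}

Tests that need per-partition control (like the deleted parallel_query_with_filter) can instead call partitioned_csv::create_ctx and build their own plans against the returned ExecutionContext.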