From 24f4522206f5beb3a560da3a3a608f82033f0adb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Jan 2022 09:58:48 -0500 Subject: [PATCH 1/3] Move some tests out of context.rs and into sql --- datafusion/src/execution/context.rs | 165 ---------------------------- datafusion/tests/sql/aggregates.rs | 116 +++++++++++++++++++ datafusion/tests/sql/joins.rs | 51 +++++++++ 3 files changed, 167 insertions(+), 165 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 9cc54dfe1f37..6ed8223f0c52 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -2265,121 +2265,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn aggregate_timestamps_sum() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let results = plan_and_collect( - &mut ctx, - "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t", - ) - .await - .unwrap_err(); - - assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Timestamp(Nanosecond, None)."); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_timestamps_count() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let results = plan_and_collect( - &mut ctx, - "SELECT count(nanos), count(micros), count(millis), count(secs) FROM t", - ) - .await - .unwrap(); - - let expected = vec![ - "+----------------+-----------------+-----------------+---------------+", - "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |", - "+----------------+-----------------+-----------------+---------------+", - "| 3 | 3 | 3 | 3 |", - "+----------------+-----------------+-----------------+---------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_timestamps_min() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let results = plan_and_collect( - &mut ctx, - "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t", - ) - .await - .unwrap(); - - let expected = vec![ - "+----------------------------+----------------------------+-------------------------+---------------------+", - "| MIN(t.nanos) | MIN(t.micros) | MIN(t.millis) | MIN(t.secs) |", - "+----------------------------+----------------------------+-------------------------+---------------------+", - "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |", - "+----------------------------+----------------------------+-------------------------+---------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_timestamps_max() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let results = plan_and_collect( - &mut ctx, - "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t", - ) - .await - .unwrap(); - - let expected = vec![ - "+-------------------------+-------------------------+-------------------------+---------------------+", - "| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis) | MAX(t.secs) |", - 
"+-------------------------+-------------------------+-------------------------+---------------------+", - "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 |", - "+-------------------------+-------------------------+-------------------------+---------------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn aggregate_timestamps_avg() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let results = plan_and_collect( - &mut ctx, - "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t", - ) - .await - .unwrap_err(); - - assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None)."); - Ok(()) - } - #[tokio::test] async fn aggregate_avg_add() -> Result<()> { let results = execute( @@ -2418,56 +2303,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn join_timestamp() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); - - let expected = vec![ - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - "| nanos | micros | millis | secs | name | nanos | micros | millis | secs | name |", - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 |", - "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 |", - "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 |", - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - ]; - - let results = plan_and_collect( - &mut ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.nanos = t2.nanos", - ) - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect( - &mut ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.micros = t2.micros", - ) - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect( - &mut ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.millis = t2.millis", - ) - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - #[tokio::test] async fn count_basic() -> Result<()> { let results = execute("SELECT COUNT(c1), COUNT(c2) FROM 
test", 1).await?; diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index 9d72752b091d..91c19801a8e5 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -473,3 +473,119 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Ok(()) } + + +#[tokio::test] +async fn aggregate_timestamps_sum() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let results = plan_and_collect( + &mut ctx, + "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t", + ) + .await + .unwrap_err(); + + assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Timestamp(Nanosecond, None)."); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_timestamps_count() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let results = plan_and_collect( + &mut ctx, + "SELECT count(nanos), count(micros), count(millis), count(secs) FROM t", + ) + .await + .unwrap(); + + let expected = vec![ + "+----------------+-----------------+-----------------+---------------+", + "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |", + "+----------------+-----------------+-----------------+---------------+", + "| 3 | 3 | 3 | 3 |", + "+----------------+-----------------+-----------------+---------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_timestamps_min() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let results = plan_and_collect( + &mut ctx, + "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t", + ) + .await + .unwrap(); + + let expected = vec![ + "+----------------------------+----------------------------+-------------------------+---------------------+", + "| MIN(t.nanos) | MIN(t.micros) | MIN(t.millis) | MIN(t.secs) |", + "+----------------------------+----------------------------+-------------------------+---------------------+", + "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |", + "+----------------------------+----------------------------+-------------------------+---------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_timestamps_max() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let results = plan_and_collect( + &mut ctx, + "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t", + ) + .await + .unwrap(); + + let expected = vec![ + "+-------------------------+-------------------------+-------------------------+---------------------+", + "| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis) | MAX(t.secs) |", + "+-------------------------+-------------------------+-------------------------+---------------------+", + "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 |", + "+-------------------------+-------------------------+-------------------------+---------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) 
+} + +#[tokio::test] +async fn aggregate_timestamps_avg() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let results = plan_and_collect( + &mut ctx, + "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t", + ) + .await + .unwrap_err(); + + assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None)."); + Ok(()) +} diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs index 70d824b12e1a..4d2348482602 100644 --- a/datafusion/tests/sql/joins.rs +++ b/datafusion/tests/sql/joins.rs @@ -882,3 +882,54 @@ async fn join_tables_with_duplicated_column_name_not_in_on_constraint() -> Resul assert_batches_eq!(expected, &actual); Ok(()) } + + +#[tokio::test] +async fn join_timestamp() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, 1).await?; + ctx.register_table("t", test::table_with_timestamps()) + .unwrap(); + + let expected = vec![ + "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", + "| nanos | micros | millis | secs | name | nanos | micros | millis | secs | name |", + "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", + "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 |", + "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 |", + "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 |", + "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", + ]; + + let results = plan_and_collect( + &mut ctx, + "SELECT * FROM t as t1 \ + JOIN (SELECT * FROM t) as t2 \ + ON t1.nanos = t2.nanos", + ) + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); + + let results = plan_and_collect( + &mut ctx, + "SELECT * FROM t as t1 \ + JOIN (SELECT * FROM t) as t2 \ + ON t1.micros = t2.micros", + ) + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); + + let results = plan_and_collect( + &mut ctx, + "SELECT * FROM t as t1 \ + JOIN (SELECT * FROM t) as t2 \ + ON t1.millis = t2.millis", + ) + .await + .unwrap(); + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} From 82dce0f82bf392493456e30f6153842e38c460c8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Jan 2022 10:02:18 -0500 Subject: [PATCH 2/3] Move support test out of context.rs and into sql tests --- datafusion/src/test/mod.rs | 87 ----------------------------------- 
 datafusion/tests/sql/mod.rs        | 90 +++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 87 deletions(-)

diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 844d03188eae..13c4a6548d4d 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -185,14 +185,6 @@ pub fn make_partition(sz: i32) -> RecordBatch {
     RecordBatch::try_new(schema, vec![arr]).unwrap()
 }
 
-/// Return a new table provider containing all of the supported timestamp types
-pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
-    let batch = make_timestamps();
-    let schema = batch.schema();
-    let partitions = vec![vec![batch]];
-    Arc::new(MemTable::try_new(schema, partitions).unwrap())
-}
-
 /// Return a new table which provide this decimal column
 pub fn table_with_decimal() -> Arc<dyn TableProvider> {
     let batch_decimal = make_decimal();
@@ -214,85 +206,6 @@ fn make_decimal() -> RecordBatch {
     RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
 }
 
-/// Return record batch with all of the supported timestamp types
-/// values
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "secs" --> TimestampSecondArray
-/// "names" --> StringArray
-pub fn make_timestamps() -> RecordBatch {
-    let ts_strings = vec![
-        Some("2018-11-13T17:11:10.011375885995"),
-        Some("2011-12-13T11:13:10.12345"),
-        None,
-        Some("2021-1-1T05:11:10.432"),
-    ];
-
-    let ts_nanos = ts_strings
-        .into_iter()
-        .map(|t| {
-            t.map(|t| {
-                t.parse::<chrono::NaiveDateTime>()
-                    .unwrap()
-                    .timestamp_nanos()
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let ts_micros = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
-        .collect::<Vec<_>>();
-
-    let ts_millis = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
-        .collect::<Vec<_>>();
-
-    let ts_secs = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
-        .collect::<Vec<_>>();
-
-    let names = ts_nanos
-        .iter()
-        .enumerate()
-        .map(|(i, _)| format!("Row {}", i))
-        .collect::<Vec<_>>();
-
-    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
-    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
-    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
-    let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
-
-    let names = names.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
-    let arr_names = StringArray::from(names);
-
-    let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), true),
-        Field::new("micros", arr_micros.data_type().clone(), true),
-        Field::new("millis", arr_millis.data_type().clone(), true),
-        Field::new("secs", arr_secs.data_type().clone(), true),
-        Field::new("name", arr_names.data_type().clone(), true),
-    ]);
-    let schema = Arc::new(schema);
-
-    RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(arr_nanos),
-            Arc::new(arr_micros),
-            Arc::new(arr_millis),
-            Arc::new(arr_secs),
-            Arc::new(arr_names),
-        ],
-    )
-    .unwrap()
-}
-
 /// Asserts that given future is pending.
 pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
     let waker = futures::task::noop_waker();
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index f2496c36814b..e3f95e67c0e0 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -734,6 +734,96 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
         .collect::<Vec<_>>()
 }
 
+
+/// Return a new table provider containing all of the supported timestamp types
+pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
+    let batch = make_timestamps();
+    let schema = batch.schema();
+    let partitions = vec![vec![batch]];
+    Arc::new(MemTable::try_new(schema, partitions).unwrap())
+}
+
+
+/// Return record batch with all of the supported timestamp types
+/// values
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "secs" --> TimestampSecondArray
+/// "names" --> StringArray
+pub fn make_timestamps() -> RecordBatch {
+    let ts_strings = vec![
+        Some("2018-11-13T17:11:10.011375885995"),
+        Some("2011-12-13T11:13:10.12345"),
+        None,
+        Some("2021-1-1T05:11:10.432"),
+    ];
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                t.parse::<chrono::NaiveDateTime>()
+                    .unwrap()
+                    .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_secs = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {}", i))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+    let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("secs", arr_secs.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_secs),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
+
+
 #[tokio::test]
 async fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files

From c8347f15ee7385b2bff33ebd90399ed472876958 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Jan 2022 10:10:10 -0500
Subject: [PATCH 3/3] Fixup tests and make them compile

---
 datafusion/src/test/mod.rs         |  5 +--
 datafusion/tests/sql/aggregates.rs | 54 +++++++++++-------------------
 datafusion/tests/sql/joins.rs      | 25 ++++--------
 datafusion/tests/sql/mod.rs        | 14 +++++---
 4 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 13c4a6548d4d..497bfe59e1a1 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -22,10 +22,7 @@ use crate::datasource::{MemTable, PartitionedFile, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; -use array::{ - Array, ArrayRef, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; +use array::{Array, ArrayRef}; use arrow::array::{self, DecimalBuilder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index 91c19801a8e5..2d4287054388 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -474,20 +474,17 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Ok(()) } - #[tokio::test] async fn aggregate_timestamps_sum() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); + let mut ctx = ExecutionContext::new(); + ctx.register_table("t", table_with_timestamps()).unwrap(); let results = plan_and_collect( &mut ctx, "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t", ) - .await - .unwrap_err(); + .await + .unwrap_err(); assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Timestamp(Nanosecond, None)."); @@ -496,17 +493,14 @@ async fn aggregate_timestamps_sum() -> Result<()> { #[tokio::test] async fn aggregate_timestamps_count() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); + let mut ctx = ExecutionContext::new(); + ctx.register_table("t", table_with_timestamps()).unwrap(); - let results = plan_and_collect( + let results = execute_to_batches( &mut ctx, "SELECT count(nanos), count(micros), count(millis), count(secs) FROM t", ) - .await - .unwrap(); + .await; let expected = vec![ "+----------------+-----------------+-----------------+---------------+", @@ -522,17 +516,14 @@ async fn aggregate_timestamps_count() -> Result<()> { #[tokio::test] async fn aggregate_timestamps_min() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); + let mut ctx = ExecutionContext::new(); + ctx.register_table("t", table_with_timestamps()).unwrap(); - let results = plan_and_collect( + let results = execute_to_batches( &mut ctx, "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t", ) - .await - .unwrap(); + .await; let expected = vec![ "+----------------------------+----------------------------+-------------------------+---------------------+", @@ -548,17 +539,14 @@ async fn aggregate_timestamps_min() -> Result<()> { #[tokio::test] async fn aggregate_timestamps_max() -> Result<()> { - let tmp_dir = TempDir::new()?; - let mut ctx = create_ctx(&tmp_dir, 1).await?; - ctx.register_table("t", test::table_with_timestamps()) - .unwrap(); + let mut ctx = ExecutionContext::new(); + ctx.register_table("t", table_with_timestamps()).unwrap(); - let results = plan_and_collect( + let results = execute_to_batches( &mut ctx, "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t", ) - .await - .unwrap(); + .await; let expected = vec![ "+-------------------------+-------------------------+-------------------------+---------------------+", @@ -574,17 +562,15 @@ async fn aggregate_timestamps_max() -> Result<()> { #[tokio::test] async fn aggregate_timestamps_avg() -> Result<()> { - let tmp_dir = 
TempDir::new()?;
-    let mut ctx = create_ctx(&tmp_dir, 1).await?;
-    ctx.register_table("t", test::table_with_timestamps())
-        .unwrap();
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
 
     let results = plan_and_collect(
         &mut ctx,
         "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
     )
-    .await
-    .unwrap_err();
+    .await
+    .unwrap_err();
 
     assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None).");
     Ok(())
 }
diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs
index 4d2348482602..04436ed460b1 100644
--- a/datafusion/tests/sql/joins.rs
+++ b/datafusion/tests/sql/joins.rs
@@ -883,13 +883,10 @@ async fn join_tables_with_duplicated_column_name_not_in_on_constraint() -> Resul
     Ok(())
 }
 
-
 #[tokio::test]
 async fn join_timestamp() -> Result<()> {
-    let tmp_dir = TempDir::new()?;
-    let mut ctx = create_ctx(&tmp_dir, 1).await?;
-    ctx.register_table("t", test::table_with_timestamps())
-        .unwrap();
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
 
     let expected = vec![
         "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
         "| nanos                         | micros                     | millis                  | secs                | name  | nanos                         | micros                     | millis                  | secs                | name  |",
         "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
         "| 2011-12-13 11:13:10.123450    | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 11:13:10.123450    | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 |",
         "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 |",
         "| 2021-01-01 05:11:10.432       | 2021-01-01 05:11:10.432    | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432       | 2021-01-01 05:11:10.432    | 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 |",
         "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
     ];
 
-    let results = plan_and_collect(
+    let results = execute_to_batches(
         &mut ctx,
         "SELECT * FROM t as t1 \
             JOIN (SELECT * FROM t) as t2 \
             ON t1.nanos = t2.nanos",
     )
-    .await
-    .unwrap();
+    .await;
+
     assert_batches_sorted_eq!(expected, &results);
 
-    let results = plan_and_collect(
+    let results = execute_to_batches(
         &mut ctx,
         "SELECT * FROM t as t1 \
             JOIN (SELECT * FROM t) as t2 \
             ON t1.micros = t2.micros",
     )
-    .await
-    .unwrap();
+    .await;
+
     assert_batches_sorted_eq!(expected, &results);
 
-    let results = plan_and_collect(
+    let results = execute_to_batches(
         &mut ctx,
         "SELECT * FROM t as t1 \
             JOIN (SELECT * FROM t) as t2 \
             ON t1.millis = t2.millis",
     )
-    .await
-    .unwrap();
+    .await;
+
     assert_batches_sorted_eq!(expected, &results);
 
     Ok(())
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index e3f95e67c0e0..90fe5138ac44 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -521,8 +521,15 @@ async fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
     Ok(())
 }
 
-/// Execute query and return result set as 2-d table of Vecs
-/// `result[row][column]`
+/// Execute SQL and return results as a RecordBatch
+async fn plan_and_collect(
+    ctx: &mut ExecutionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Execute query and return results as a Vec of RecordBatches
 async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<RecordBatch> {
     let msg = format!("Creating logical plan for '{}'", sql);
     let plan = ctx.create_logical_plan(sql).expect(&msg);
@@ -734,7 +741,6 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
         .collect::<Vec<_>>()
 }
 
-
 /// Return a new table provider containing all of the supported timestamp types
 pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
     let batch = make_timestamps();
     let schema = batch.schema();
@@ -743,7 +749,6 @@ pub fn 
table_with_timestamps() -> Arc<dyn TableProvider> {
     Arc::new(MemTable::try_new(schema, partitions).unwrap())
 }
 
-
 /// Return record batch with all of the supported timestamp types
 /// values
 ///
@@ -823,7 +828,6 @@ pub fn make_timestamps() -> RecordBatch {
     .unwrap()
 }
 
-
 #[tokio::test]
 async fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files