From 78fa62010b5902adb50fa4cbd6376ff47fa6f619 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 6 Jun 2021 17:40:41 +0000 Subject: [PATCH] Brought back parquet options and stream write. --- CHANGELOG.md | 2 +- README.md | 8 +- ballista/rust/core/Cargo.toml | 4 +- .../core/src/serde/logical_plan/from_proto.rs | 2 +- .../rust/core/src/serde/logical_plan/mod.rs | 2 +- .../core/src/serde/logical_plan/to_proto.rs | 2 +- .../rust/core/src/serde/physical_plan/mod.rs | 4 +- ballista/rust/core/src/serde/scheduler/mod.rs | 4 +- benchmarks/src/bin/nyctaxi.rs | 4 +- benchmarks/src/bin/tpch.rs | 34 ++++++-- datafusion-cli/src/lib.rs | 2 +- datafusion-cli/src/print_format.rs | 8 +- datafusion-examples/examples/dataframe.rs | 2 +- datafusion-examples/examples/flight_server.rs | 2 +- datafusion/Cargo.toml | 4 +- datafusion/benches/aggregate_query_sql.rs | 2 +- datafusion/benches/filter_query_sql.rs | 2 +- datafusion/benches/math_query_sql.rs | 2 +- datafusion/benches/sort_limit_query_sql.rs | 2 +- datafusion/src/catalog/information_schema.rs | 2 +- datafusion/src/dataframe.rs | 2 +- datafusion/src/datasource/csv.rs | 8 +- datafusion/src/datasource/datasource.rs | 4 +- datafusion/src/datasource/empty.rs | 4 +- datafusion/src/datasource/json.rs | 6 +- datafusion/src/datasource/memory.rs | 10 +-- datafusion/src/datasource/parquet.rs | 8 +- datafusion/src/error.rs | 4 +- datafusion/src/execution/context.rs | 84 ++++++++++--------- datafusion/src/execution/dataframe_impl.rs | 4 +- datafusion/src/lib.rs | 24 +++--- datafusion/src/logical_plan/builder.rs | 7 +- datafusion/src/logical_plan/dfschema.rs | 6 +- datafusion/src/logical_plan/display.rs | 6 +- datafusion/src/logical_plan/expr.rs | 18 ++-- datafusion/src/logical_plan/plan.rs | 12 ++- datafusion/src/optimizer/constant_folding.rs | 6 +- datafusion/src/optimizer/filter_push_down.rs | 10 +-- .../src/optimizer/hash_build_probe_order.rs | 4 +- .../src/optimizer/projection_push_down.rs | 6 +- datafusion/src/optimizer/utils.rs | 4 +- .../src/physical_optimizer/repartition.rs | 2 +- datafusion/src/physical_plan/aggregates.rs | 2 +- .../src/physical_plan/array_expressions.rs | 6 +- .../src/physical_plan/coalesce_batches.rs | 13 ++- datafusion/src/physical_plan/common.rs | 7 +- datafusion/src/physical_plan/cross_join.rs | 8 +- .../src/physical_plan/crypto_expressions.rs | 2 +- datafusion/src/physical_plan/csv.rs | 10 +-- .../src/physical_plan/datetime_expressions.rs | 10 +-- .../src/physical_plan/distinct_expressions.rs | 12 +-- datafusion/src/physical_plan/empty.rs | 8 +- datafusion/src/physical_plan/explain.rs | 4 +- .../src/physical_plan/expressions/average.rs | 15 ++-- .../src/physical_plan/expressions/binary.rs | 14 ++-- .../src/physical_plan/expressions/case.rs | 14 ++-- .../src/physical_plan/expressions/cast.rs | 8 +- .../src/physical_plan/expressions/coercion.rs | 8 +- .../src/physical_plan/expressions/column.rs | 2 +- .../src/physical_plan/expressions/count.rs | 10 +-- .../src/physical_plan/expressions/in_list.rs | 8 +- .../physical_plan/expressions/is_not_null.rs | 6 +- .../src/physical_plan/expressions/is_null.rs | 6 +- .../src/physical_plan/expressions/literal.rs | 6 +- .../src/physical_plan/expressions/min_max.rs | 9 +- .../src/physical_plan/expressions/mod.rs | 8 +- .../src/physical_plan/expressions/negative.rs | 4 +- .../src/physical_plan/expressions/not.rs | 10 +-- .../src/physical_plan/expressions/nullif.rs | 6 +- .../physical_plan/expressions/row_number.rs | 12 ++- .../src/physical_plan/expressions/sum.rs | 14 ++-- .../src/physical_plan/expressions/try_cast.rs | 8 +- datafusion/src/physical_plan/filter.rs | 12 ++- datafusion/src/physical_plan/functions.rs | 8 +- .../src/physical_plan/hash_aggregate.rs | 21 ++--- datafusion/src/physical_plan/hash_join.rs | 11 ++- datafusion/src/physical_plan/hash_utils.rs | 2 +- datafusion/src/physical_plan/json.rs | 17 ++-- datafusion/src/physical_plan/limit.rs | 13 ++- .../src/physical_plan/math_expressions.rs | 8 +- datafusion/src/physical_plan/memory.rs | 7 +- datafusion/src/physical_plan/merge.rs | 8 +- datafusion/src/physical_plan/mod.rs | 13 ++- datafusion/src/physical_plan/parquet.rs | 6 +- datafusion/src/physical_plan/planner.rs | 10 +-- datafusion/src/physical_plan/projection.rs | 8 +- .../src/physical_plan/regex_expressions.rs | 6 +- datafusion/src/physical_plan/repartition.rs | 9 +- datafusion/src/physical_plan/sort.rs | 19 ++--- .../src/physical_plan/string_expressions.rs | 8 +- datafusion/src/physical_plan/type_coercion.rs | 4 +- datafusion/src/physical_plan/udaf.rs | 2 +- datafusion/src/physical_plan/udf.rs | 2 +- .../src/physical_plan/unicode_expressions.rs | 4 +- datafusion/src/physical_plan/union.rs | 5 +- .../src/physical_plan/window_functions.rs | 2 +- datafusion/src/physical_plan/windows.rs | 9 +- datafusion/src/scalar.rs | 11 ++- datafusion/src/sql/planner.rs | 4 +- datafusion/src/test/exec.rs | 7 +- datafusion/src/test/mod.rs | 10 +-- datafusion/tests/custom_sources.rs | 8 +- datafusion/tests/dataframe.rs | 4 +- datafusion/tests/provider_filter_pushdown.rs | 6 +- datafusion/tests/sql.rs | 6 +- datafusion/tests/user_defined_plan.rs | 2 +- 106 files changed, 395 insertions(+), 460 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ed715d7f4fc4..7f5e9ad779427 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5179,7 +5179,7 @@ * [ARROW-4384](https://issues.apache.org/jira/browse/ARROW-4384) - [C++] Running "format" target on new Windows 10 install opens "how do you want to open this file" dialog * [ARROW-4385](https://issues.apache.org/jira/browse/ARROW-4385) - [Python] default\_version of a release should not include SNAPSHOT * [ARROW-4389](https://issues.apache.org/jira/browse/ARROW-4389) - [R] Installing clang-tools in CI is failing on trusty -* [ARROW-4395](https://issues.apache.org/jira/browse/ARROW-4395) - ts-node throws type error running \`bin/arrow2csv.js\` +* [ARROW-4395](https://issues.apache.org/jira/browse/ARROW-4395) - ts-node throws type error running \`bin/arrowcsv.js\` * [ARROW-4400](https://issues.apache.org/jira/browse/ARROW-4400) - [CI] install of clang tools failing * [ARROW-4403](https://issues.apache.org/jira/browse/ARROW-4403) - [Rust] CI fails due to formatting errors * [ARROW-4404](https://issues.apache.org/jira/browse/ARROW-4404) - [CI] AppVeyor toolchain build does not build anything diff --git a/README.md b/README.md index a3ffd4106f37e..f3ae412fde940 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,8 @@ Run a SQL query against data stored in a CSV: ```rust use datafusion::prelude::*; -use arrow2::util::pretty::print_batches; -use arrow2::record_batch::RecordBatch; +use arrow::util::pretty::print_batches; +use arrow::record_batch::RecordBatch; #[tokio::main] async fn main() -> datafusion::error::Result<()> { @@ -92,8 +92,8 @@ Use the DataFrame API to process data stored in a CSV: ```rust use datafusion::prelude::*; -use arrow2::util::pretty::print_batches; -use arrow2::record_batch::RecordBatch; +use arrow::util::pretty::print_batches; +use arrow::record_batch::RecordBatch; #[tokio::main] async fn main() -> datafusion::error::Result<()> { diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 70e612b1f455e..25f326c01b08e 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -40,8 +40,8 @@ tokio = "1.0" tonic = "0.4" uuid = { version = "0.8", features = ["v4"] } -arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" } -arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" } +arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" } datafusion = { path = "../../../datafusion" } diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index ecd1c480b67dc..ac58e5955d9dd 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -26,7 +26,7 @@ use std::{ unimplemented, }; -use datafusion::arrow2::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::{ abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin, sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 7257650b545fb..dfad7c5be8802 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -25,7 +25,7 @@ mod roundtrip_tests { use super::super::{super::error::Result, protobuf}; use crate::error::BallistaError; use core::panic; - use datafusion::arrow2::datatypes::{DataType, Field, Schema, TimeUnit}; + use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::physical_plan::functions::BuiltinScalarFunction::Sqrt; use datafusion::{ logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder}, diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 884f81b93b241..5b1dc7abafc12 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -26,7 +26,7 @@ use std::{ use crate::datasource::DfTableAdapter; use crate::serde::{protobuf, BallistaError}; -use datafusion::arrow2::datatypes::{DataType, Schema}; +use datafusion::arrow::datatypes::{DataType, Schema}; use datafusion::datasource::CsvFile; use datafusion::logical_plan::{Expr, JoinType, LogicalPlan}; use datafusion::physical_plan::aggregates::AggregateFunction; diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index a2eb3e5176d58..b9330217898b7 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -23,7 +23,7 @@ mod roundtrip_tests { use datafusion::physical_plan::hash_utils::JoinType; use std::{convert::TryInto, sync::Arc}; - use datafusion::arrow2::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::physical_plan::ColumnarValue; use datafusion::physical_plan::{ empty::EmptyExec, @@ -146,7 +146,7 @@ mod roundtrip_tests { #[test] fn roundtrip_sort() -> Result<()> { - use datafusion::arrow2::compute::sort::SortOptions; + use datafusion::arrow::compute::sort::SortOptions; let field_a = Field::new("a", DataType::Boolean, false); let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 2b9b6b91e56f4..9ccea71c91946 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -17,8 +17,8 @@ use std::{collections::HashMap, sync::Arc}; -use datafusion::arrow2::array::*; -use datafusion::arrow2::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::array::*; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; use serde::Serialize; diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index 88b6d0f2cfd5d..731e81cb4ac86 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -22,8 +22,8 @@ use std::path::PathBuf; use std::process; use std::time::Instant; -use datafusion::arrow2::datatypes::{DataType, Field, Schema}; -use datafusion::arrow2::io::print; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::io::print; use datafusion::error::Result; use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 711c980d305bc..ee4eb21a19c23 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -30,10 +30,10 @@ use futures::StreamExt; //use ballista::context::BallistaContext; -use datafusion::arrow2::datatypes::{DataType, Field, Schema}; -use datafusion::arrow2::io::parquet::write::CompressionCodec; -use datafusion::arrow2::io::print; -use datafusion::arrow2::record_batch::RecordBatch; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::io::parquet::write::{CompressionCodec, WriteOptions}; +use datafusion::arrow::io::print; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::{CsvFile, MemTable, TableProvider}; @@ -406,7 +406,29 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { ); match opt.file_format.as_str() { "csv" => ctx.write_csv(csv, output_path).await?, - "parquet" => ctx.write_parquet(csv, output_path).await?, + "parquet" => { + let compression = match opt.compression.as_str() { + "none" => CompressionCodec::Uncompressed, + "snappy" => CompressionCodec::Snappy, + "brotli" => CompressionCodec::Brotli, + "gzip" => CompressionCodec::Gzip, + "lz4" => CompressionCodec::Lz4, + "lz0" => CompressionCodec::Lzo, + "zstd" => CompressionCodec::Zstd, + other => { + return Err(DataFusionError::NotImplemented(format!( + "Invalid compression format: {}", + other + ))) + } + }; + + let options = WriteOptions { + compression, + write_statistics: false, + }; + ctx.write_parquet(csv, options, output_path).await? + } other => { return Err(DataFusionError::NotImplemented(format!( "Invalid output format: {}", @@ -557,7 +579,7 @@ mod tests { use std::env; use std::sync::Arc; - use datafusion::arrow2::array::*; + use datafusion::arrow::array::*; use datafusion::logical_plan::Expr; use datafusion::logical_plan::Expr::Cast; diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs index b15fb89fa26a1..5bd16e333030c 100644 --- a/datafusion-cli/src/lib.rs +++ b/datafusion-cli/src/lib.rs @@ -16,7 +16,7 @@ // under the License. pub mod print_format; -use datafusion::arrow2::record_batch::RecordBatch; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result; use print_format::PrintFormat; use std::time::Instant; diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 551bcf854c00d..86461d4fe6883 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -16,8 +16,8 @@ // under the License. //! Print format variants -use datafusion::arrow2::io::{csv::write, json::Writer, print}; -use datafusion::arrow2::record_batch::RecordBatch; +use datafusion::arrow::io::{csv::write, json::Writer, print}; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use std::fmt; use std::str::FromStr; @@ -115,8 +115,8 @@ impl PrintFormat { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow2::array::Int32Array; - use datafusion::arrow2::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; use std::sync::Arc; #[test] diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 0d4cc51b2bb9a..8df2ce4b84d89 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::arrow2::io::print; +use datafusion::arrow::io::print; use datafusion::error::Result; use datafusion::prelude::*; diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 8425d08f1a459..5ad5253c43f4d 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -67,7 +67,7 @@ impl FlightService for FlightServiceImpl { let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap(); let options = - datafusion::arrow2::io::ipc::write::common::IpcWriteOptions::default(); + datafusion::arrow::io::ipc::write::common::IpcWriteOptions::default(); let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema( table.schema().as_ref(), &options, diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index d1073171008bc..f45df094d3f01 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -41,12 +41,12 @@ default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] crypto_expressions = ["md-5", "sha2"] regex_expressions = ["regex", "lazy_static"] unicode_expressions = ["unicode-segmentation"] -simd = ["arrow2/simd"] +simd = ["arrow/simd"] [dependencies] ahash = "0.7" hashbrown = "0.11" -arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "5e82a98787934829409bff249b84d437707cdde5" } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" } sqlparser = "0.9.0" paste = "^1.0" num_cpus = "1.13.0" diff --git a/datafusion/benches/aggregate_query_sql.rs b/datafusion/benches/aggregate_query_sql.rs index b1ee06f526b18..73e2b93455b83 100644 --- a/datafusion/benches/aggregate_query_sql.rs +++ b/datafusion/benches/aggregate_query_sql.rs @@ -23,7 +23,7 @@ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -use arrow2::{ +use arrow::{ array::*, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, diff --git a/datafusion/benches/filter_query_sql.rs b/datafusion/benches/filter_query_sql.rs index fd7aff6a87a3f..607e1b642e412 100644 --- a/datafusion/benches/filter_query_sql.rs +++ b/datafusion/benches/filter_query_sql.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow2::{ +use arrow::{ array::{Float32Array, Float64Array}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, diff --git a/datafusion/benches/math_query_sql.rs b/datafusion/benches/math_query_sql.rs index 54a663c8bc33f..5bbc4038ef936 100644 --- a/datafusion/benches/math_query_sql.rs +++ b/datafusion/benches/math_query_sql.rs @@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -use arrow2::{ +use arrow::{ array::{Float32Array, Float64Array}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs index 0b4821f418a04..fd389d62be931 100644 --- a/datafusion/benches/sort_limit_query_sql.rs +++ b/datafusion/benches/sort_limit_query_sql.rs @@ -21,7 +21,7 @@ use criterion::Criterion; use std::sync::{Arc, Mutex}; -use arrow2::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Schema}; use datafusion::datasource::{CsvFile, CsvReadOptions, MemTable}; use datafusion::execution::context::ExecutionContext; diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs index 4a58698e0083d..770ab4209836a 100644 --- a/datafusion/src/catalog/information_schema.rs +++ b/datafusion/src/catalog/information_schema.rs @@ -21,7 +21,7 @@ use std::{any, sync::Arc}; -use arrow2::{ +use arrow::{ array::*, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index c244b2d1d71ea..9c7c2ef96d6be 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -17,7 +17,7 @@ //! DataFrame API for building and executing query plans. -use crate::arrow2::record_batch::RecordBatch; +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::logical_plan::{ DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning, diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs index 4dc2c327f051f..c4331d7148b80 100644 --- a/datafusion/src/datasource/csv.rs +++ b/datafusion/src/datasource/csv.rs @@ -38,8 +38,8 @@ use std::io::{Read, Seek}; use std::string::String; use std::sync::{Arc, Mutex}; -use arrow2::datatypes::Schema; -use arrow2::io::csv::read as csv_read; +use arrow::datatypes::SchemaRef; +use arrow::io::csv::read as csv_read; use crate::datasource::datasource::Statistics; use crate::datasource::{Source, TableProvider}; @@ -49,8 +49,6 @@ use crate::physical_plan::csv::CsvExec; pub use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::{common, ExecutionPlan}; -type SchemaRef = Arc; - /// Represents a CSV file with a provided schema pub struct CsvFile { source: Source, @@ -225,7 +223,7 @@ mod tests { use super::*; use crate::prelude::*; - use arrow2::array::*; + use arrow::array::*; #[tokio::test] async fn csv_file_from_reader() -> Result<()> { diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs index 0c8065ae6f074..0349a49e491ba 100644 --- a/datafusion/src/datasource/datasource.rs +++ b/datafusion/src/datasource/datasource.rs @@ -23,9 +23,7 @@ use std::sync::Arc; use crate::error::Result; use crate::logical_plan::Expr; use crate::physical_plan::ExecutionPlan; -use crate::{arrow2::datatypes::Schema, scalar::ScalarValue}; - -type SchemaRef = Arc; +use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue}; /// This table statistics are estimates. /// It can not be used directly in the precise compute diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs index e0033f29df2e1..e6140cdb8de69 100644 --- a/datafusion/src/datasource/empty.rs +++ b/datafusion/src/datasource/empty.rs @@ -20,9 +20,7 @@ use std::any::Any; use std::sync::Arc; -use arrow2::datatypes::*; - -type SchemaRef = Arc; +use arrow::datatypes::*; use crate::datasource::datasource::Statistics; use crate::datasource::TableProvider; diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs index 8912603f3312f..1e095194ce8cd 100644 --- a/datafusion/src/datasource/json.rs +++ b/datafusion/src/datasource/json.rs @@ -35,8 +35,8 @@ use crate::{ ExecutionPlan, }, }; -use arrow2::datatypes::Schema; -use arrow2::io::json::infer_json_schema_from_seekable; +use arrow::datatypes::Schema; +use arrow::io::json::infer_json_schema_from_seekable; use super::datasource::Statistics; @@ -181,7 +181,7 @@ mod tests { batches[0] .column(0) .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .value(0), 100000000000011 diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs index 7131c33e105aa..6a1ac53cc52dc 100644 --- a/datafusion/src/datasource/memory.rs +++ b/datafusion/src/datasource/memory.rs @@ -24,10 +24,8 @@ use log::debug; use std::any::Any; use std::sync::Arc; -use arrow2::datatypes::{Field, Schema}; -use arrow2::record_batch::RecordBatch; - -type SchemaRef = Arc; +use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -240,8 +238,8 @@ impl TableProvider for MemTable { #[cfg(test)] mod tests { use super::*; - use arrow2::array::Int32Array; - use arrow2::datatypes::{DataType, Field, Schema}; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; use futures::StreamExt; use std::collections::HashMap; diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index d1d1c2adb991b..0e0c782f307a5 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::string::String; use std::sync::Arc; -use arrow2::datatypes::*; +use arrow::datatypes::*; use crate::datasource::datasource::Statistics; use crate::datasource::TableProvider; @@ -32,8 +32,6 @@ use crate::physical_plan::ExecutionPlan; use super::datasource::TableProviderFilterPushDown; -type SchemaRef = Arc; - /// Table-based representation of a `ParquetFile`. pub struct ParquetTable { path: String, @@ -105,8 +103,8 @@ impl TableProvider for ParquetTable { #[cfg(test)] mod tests { use super::*; - use arrow2::array::*; - use arrow2::record_batch::RecordBatch; + use arrow::array::*; + use arrow::record_batch::RecordBatch; use futures::StreamExt; #[tokio::test] diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index b7b656733fde0..a229198e9daea 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -22,7 +22,7 @@ use std::fmt::{Display, Formatter}; use std::io; use std::result; -use arrow2::error::ArrowError; +use arrow::error::ArrowError; use sqlparser::parser::ParserError; /// Result type for operations that could result in an [DataFusionError] @@ -56,7 +56,7 @@ pub enum DataFusionError { } impl DataFusionError { - /// Wraps this [DataFusionError] as an [arrow2::error::ArrowError]. + /// Wraps this [DataFusionError] as an [arrow::error::ArrowError]. pub fn into_arrow_external_error(self) -> ArrowError { ArrowError::from_external_error(Box::new(self)) } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 98884af579f40..1266712c29c37 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -39,9 +39,10 @@ use std::{ use futures::{StreamExt, TryStreamExt}; use tokio::task::{self, JoinHandle}; -use arrow2::error::{ArrowError, Result as ArrowResult}; -use arrow2::io::csv::write as csv_write; -use arrow2::io::parquet::write; +use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::io::csv::write as csv_write; +use arrow::io::parquet::write; +use arrow::record_batch::RecordBatch; use crate::catalog::{ catalog::{CatalogProvider, MemoryCatalogProvider}, @@ -536,6 +537,7 @@ impl ExecutionContext { &self, plan: Arc, path: String, + options: write::WriteOptions, ) -> Result<()> { // create directory to contain the Parquet files (one per partition) let fs_path = Path::new(&path); @@ -551,38 +553,39 @@ impl ExecutionContext { let mut file = fs::File::create(path)?; let stream = plan.execute(i).await?; - let compression = write::CompressionCodec::Uncompressed; - let handle: JoinHandle> = task::spawn(async move { - let parquet_types = schema - .fields() - .iter() - .map(write::to_parquet_type) - .collect::>>()?; - - // do not do this. - let batches = - crate::physical_plan::common::collect(stream).await?; - - let groups = batches.iter().map(|batch| { - Ok(batch.columns().iter().zip(parquet_types.iter()).map( - |(array, type_)| { - Ok(std::iter::once(write::array_to_page( - array.as_ref(), - type_, - compression, - ))) - }, - )) + let parquet_schema = write::to_parquet_schema(&schema)?; + + let a = parquet_schema.clone(); + let stream = stream.map(|batch: ArrowResult| { + batch.map(|batch| { + let columns = batch.columns().to_vec(); + write::DynIter::new( + columns + .into_iter() + .zip(a.columns().to_vec().into_iter()) + .map(|(array, type_)| { + Ok(write::DynIter::new(std::iter::once( + write::array_to_page( + array.as_ref(), + type_, + options, + ), + ))) + }), + ) + }) }); - Ok(write::write_file( + Ok(write::stream::write_stream( &mut file, - groups, + stream, schema.as_ref(), - compression, + parquet_schema, + options, None, - )?) + ) + .await?) }); tasks.push(handle); } @@ -936,18 +939,15 @@ mod tests { logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator, }; - use arrow2::array::*; - use arrow2::datatypes::*; - use arrow2::record_batch::RecordBatch; + use arrow::array::*; + use arrow::datatypes::*; + use arrow::record_batch::RecordBatch; use std::fs::File; use std::thread::{self, JoinHandle}; use std::{io::prelude::*, sync::Mutex}; use tempfile::TempDir; use test::*; - type ArrayRef = Arc; - type SchemaRef = Arc; - #[tokio::test] async fn parallel_projection() -> Result<()> { let partition_count = 4; @@ -2463,10 +2463,7 @@ mod tests { .as_any() .downcast_ref::() .expect("cast failed"); - Ok( - Arc::new(arrow2::compute::arithmetics::basic::add::add(l, r)?) - as ArrayRef, - ) + Ok(Arc::new(arrow::compute::arithmetics::basic::add::add(l, r)?) as ArrayRef) }; let myfunc = make_scalar_function(myfunc); @@ -3312,7 +3309,14 @@ mod tests { let logical_plan = ctx.create_logical_plan(sql)?; let logical_plan = ctx.optimize(&logical_plan)?; let physical_plan = ctx.create_physical_plan(&logical_plan)?; - ctx.write_parquet(physical_plan, out_dir.to_string()).await + + let options = write::WriteOptions { + compression: write::CompressionCodec::Uncompressed, + write_statistics: false, + }; + + ctx.write_parquet(physical_plan, out_dir.to_string(), options) + .await } /// Generate CSV partitions within the supplied directory diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 02d8522e275bb..d5ba3f8b609fc 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, Mutex}; -use crate::arrow2::record_batch::RecordBatch; +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ @@ -184,7 +184,7 @@ mod tests { use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext}; use crate::{datasource::csv::CsvReadOptions, physical_plan::ColumnarValue}; use crate::{physical_plan::functions::ScalarFunctionImplementation, test}; - use arrow2::datatypes::DataType; + use arrow::datatypes::DataType; #[test] fn select_columns() -> Result<()> { diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index ced5a9be6ba3c..17de8e2682dd1 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -39,7 +39,7 @@ //! ```rust //! # use datafusion::prelude::*; //! # use datafusion::error::Result; -//! # use arrow2::record_batch::RecordBatch; +//! # use arrow::record_batch::RecordBatch; //! //! # #[tokio::main] //! # async fn main() -> Result<()> { @@ -57,7 +57,7 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = arrow2::util::pretty::pretty_format_batches(&results)?; +//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?; //! //! let expected = vec![ //! "+---+--------+", @@ -77,7 +77,7 @@ //! ``` //! # use datafusion::prelude::*; //! # use datafusion::error::Result; -//! # use arrow2::record_batch::RecordBatch; +//! # use arrow::record_batch::RecordBatch; //! //! # #[tokio::main] //! # async fn main() -> Result<()> { @@ -92,7 +92,7 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = arrow2::util::pretty::pretty_format_batches(&results)?; +//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?; //! //! let expected = vec![ //! "+---+--------+", @@ -130,7 +130,7 @@ //! ### Logical plan //! //! Logical planning yields [`logical plans`](logical_plan::LogicalPlan) and [`logical expressions`](logical_plan::Expr). -//! These are [`Schema`](arrow2::datatypes::Schema)-aware traits that represent statements whose result is independent of how it should physically be executed. +//! These are [`Schema`](arrow::datatypes::Schema)-aware traits that represent statements whose result is independent of how it should physically be executed. //! //! A [`LogicalPlan`](logical_plan::LogicalPlan) is a Direct Asyclic graph of other [`LogicalPlan`s](logical_plan::LogicalPlan) and each node contains logical expressions ([`Expr`s](logical_plan::Expr)). //! All of these are located in [`logical_plan`](logical_plan). @@ -152,12 +152,12 @@ //! Broadly speaking, //! //! * an [`ExecutionPlan`](physical_plan::ExecutionPlan) receives a partition number and asyncronosly returns -//! an iterator over [`RecordBatch`](arrow2::record_batch::RecordBatch) -//! (a node-specific struct that implements [`RecordBatchReader`](arrow2::record_batch::RecordBatchReader)) -//! * a [`PhysicalExpr`](physical_plan::PhysicalExpr) receives a [`RecordBatch`](arrow2::record_batch::RecordBatch) -//! and returns an [`Array`](arrow2::array::Array) -//! * an [`AggregateExpr`](physical_plan::AggregateExpr) receives [`RecordBatch`es](arrow2::record_batch::RecordBatch) -//! and returns a [`RecordBatch`](arrow2::record_batch::RecordBatch) of a single row(*) +//! an iterator over [`RecordBatch`](arrow::record_batch::RecordBatch) +//! (a node-specific struct that implements [`RecordBatchReader`](arrow::record_batch::RecordBatchReader)) +//! * a [`PhysicalExpr`](physical_plan::PhysicalExpr) receives a [`RecordBatch`](arrow::record_batch::RecordBatch) +//! and returns an [`Array`](arrow::array::Array) +//! * an [`AggregateExpr`](physical_plan::AggregateExpr) receives [`RecordBatch`es](arrow::record_batch::RecordBatch) +//! and returns a [`RecordBatch`](arrow::record_batch::RecordBatch) of a single row(*) //! //! (*) Technically, it aggregates the results on each partition and then merges the results into a single partition. //! @@ -200,7 +200,7 @@ pub mod sql; pub mod variable; // re-export dependencies from arrow-rs to minimise version maintenance for crate users -pub use arrow2; +pub use arrow; #[cfg(test)] pub mod test; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 7d63fd3c0fc0e..a159cdbb019c9 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -19,9 +19,8 @@ use std::{collections::HashMap, sync::Arc}; -use arrow2::{datatypes::Schema, record_batch::RecordBatch}; +use arrow::{datatypes::{Schema, SchemaRef}, record_batch::RecordBatch}; -type SchemaRef = Arc; use super::dfschema::ToDFSchema; use super::{ col, exprlist_to_fields, Expr, JoinType, LogicalPlan, PlanType, StringifiedPlan, @@ -42,7 +41,7 @@ use std::collections::HashSet; /// # use datafusion::prelude::*; /// # use datafusion::logical_plan::LogicalPlanBuilder; /// # use datafusion::error::Result; -/// # use arrow2::datatypes::{Schema, DataType, Field}; +/// # use arrow::datatypes::{Schema, DataType, Field}; /// # /// # fn main() -> Result<()> { /// # @@ -460,7 +459,7 @@ fn validate_unique_names<'a>( #[cfg(test)] mod tests { - use arrow2::datatypes::{DataType, Field}; + use arrow::datatypes::{DataType, Field}; use super::super::{lit, sum}; use super::*; diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs index 651eecb9aa185..9adb22b43d075 100644 --- a/datafusion/src/logical_plan/dfschema.rs +++ b/datafusion/src/logical_plan/dfschema.rs @@ -24,11 +24,9 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use arrow2::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use std::fmt::{Display, Formatter}; -type SchemaRef = Arc; - /// A reference-counted reference to a `DFSchema`. pub type DFSchemaRef = Arc; @@ -358,7 +356,7 @@ impl DFField { #[cfg(test)] mod tests { use super::*; - use arrow2::datatypes::DataType; + use arrow::datatypes::DataType; #[test] fn from_unqualified_field() { diff --git a/datafusion/src/logical_plan/display.rs b/datafusion/src/logical_plan/display.rs index 8fe96ecf8aeec..f285534fdf1b6 100644 --- a/datafusion/src/logical_plan/display.rs +++ b/datafusion/src/logical_plan/display.rs @@ -17,7 +17,7 @@ //! This module provides logic for displaying LogicalPlans in various styles use super::{LogicalPlan, PlanVisitor}; -use arrow2::datatypes::Schema; +use arrow::datatypes::Schema; use std::fmt; /// Formats plans with a single line per node. For example: @@ -81,7 +81,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> { /// `foo:Utf8;N` if `foo` is nullable. /// /// ``` -/// use arrow2::datatypes::{Field, Schema, DataType}; +/// use arrow::datatypes::{Field, Schema, DataType}; /// # use datafusion::logical_plan::display_schema; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), @@ -238,7 +238,7 @@ impl<'a, 'b> PlanVisitor for GraphvizVisitor<'a, 'b> { #[cfg(test)] mod tests { - use arrow2::datatypes::{DataType, Field}; + use arrow::datatypes::{DataType, Field}; use super::*; diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index e9b8704ccacf0..d5c972cdfce54 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -24,7 +24,7 @@ use std::fmt; use std::sync::Arc; use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction}; -use arrow2::{compute::cast::can_cast_types, datatypes::DataType}; +use arrow::{compute::cast::can_cast_types, datatypes::DataType}; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{DFField, DFSchema}; @@ -40,7 +40,7 @@ use std::collections::HashSet; /// represent logical expressions such as `A + 1`, or `CAST(c1 AS /// int)`. /// -/// An `Expr` can compute its [DataType](arrow2::datatypes::DataType) +/// An `Expr` can compute its [DataType](arrow::datatypes::DataType) /// and nullability, and has functions for building up complex /// expressions. /// @@ -219,11 +219,11 @@ pub enum Expr { } impl Expr { - /// Returns the [arrow2::datatypes::DataType] of the expression based on [arrow2::datatypes::Schema]. + /// Returns the [arrow::datatypes::DataType] of the expression based on [arrow::datatypes::Schema]. /// /// # Errors /// - /// This function errors when it is not possible to compute its [arrow2::datatypes::DataType]. + /// This function errors when it is not possible to compute its [arrow::datatypes::DataType]. /// This happens when e.g. the expression refers to a column that does not exist in the schema, or when /// the expression is incorrectly typed (e.g. `[utf8] + [bool]`). pub fn get_type(&self, schema: &DFSchema) -> Result { @@ -295,7 +295,7 @@ impl Expr { } } - /// Returns the nullability of the expression based on [arrow2::datatypes::Schema]. + /// Returns the nullability of the expression based on [arrow::datatypes::Schema]. /// /// # Errors /// @@ -352,14 +352,14 @@ impl Expr { } } - /// Returns the name of this expression based on [arrow2::datatypes::Schema]. + /// Returns the name of this expression based on [arrow::datatypes::Schema]. /// /// This represents how a column with this expression is named when no alias is chosen pub fn name(&self, input_schema: &DFSchema) -> Result { create_name(self, input_schema) } - /// Returns a [arrow2::datatypes::Field] compatible with this expression. + /// Returns a [arrow::datatypes::Field] compatible with this expression. pub fn to_field(&self, input_schema: &DFSchema) -> Result { Ok(DFField::new( None, //TODO qualifier @@ -369,12 +369,12 @@ impl Expr { )) } - /// Wraps this expression in a cast to a target [arrow2::datatypes::DataType]. + /// Wraps this expression in a cast to a target [arrow::datatypes::DataType]. /// /// # Errors /// /// This function errors when it is impossible to cast the - /// expression to the target [arrow2::datatypes::DataType]. + /// expression to the target [arrow::datatypes::DataType]. pub fn cast_to(self, cast_to_type: &DataType, schema: &DFSchema) -> Result { let this_type = self.get_type(schema)?; if this_type == *cast_to_type { diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 7311b81b5e372..92a5c778b668f 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -use arrow2::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use crate::datasource::TableProvider; use crate::sql::parser::FileType; @@ -36,8 +36,6 @@ use super::{ }; use crate::logical_plan::dfschema::DFSchemaRef; -type SchemaRef = Arc; - /// Join type #[derive(Debug, Clone, Copy)] pub enum JoinType { @@ -501,7 +499,7 @@ impl LogicalPlan { /// ``` /// /// ``` - /// use arrow2::datatypes::{Field, Schema, DataType}; + /// use arrow::datatypes::{Field, Schema, DataType}; /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), @@ -542,7 +540,7 @@ impl LogicalPlan { /// ``` /// /// ``` - /// use arrow2::datatypes::{Field, Schema, DataType}; + /// use arrow::datatypes::{Field, Schema, DataType}; /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), @@ -582,7 +580,7 @@ impl LogicalPlan { /// structure, and one with additional details such as schema. /// /// ``` - /// use arrow2::datatypes::{Field, Schema, DataType}; + /// use arrow::datatypes::{Field, Schema, DataType}; /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), @@ -641,7 +639,7 @@ impl LogicalPlan { /// Projection: #id /// ``` /// ``` - /// use arrow2::datatypes::{Field, Schema, DataType}; + /// use arrow::datatypes::{Field, Schema, DataType}; /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index a38c224fec880..bc70168ad5d23 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -20,8 +20,8 @@ use std::sync::Arc; -use arrow2::compute::cast::utf8_to_timestamp_ns_scalar; -use arrow2::datatypes::DataType; +use arrow::compute::cast::utf8_to_timestamp_ns_scalar; +use arrow::datatypes::DataType; use crate::error::Result; use crate::execution::context::ExecutionProps; @@ -263,7 +263,7 @@ mod tests { col, lit, max, min, DFField, DFSchema, LogicalPlanBuilder, }; - use arrow2::datatypes::*; + use arrow::datatypes::*; use chrono::{DateTime, Utc}; fn test_table_scan() -> Result { diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index 0565d14237891..da627ed647d0c 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -458,11 +458,9 @@ mod tests { use crate::physical_plan::ExecutionPlan; use crate::test::*; use crate::{logical_plan::col, prelude::JoinType}; - use arrow2::datatypes::Schema; + use arrow::datatypes::SchemaRef; use std::sync::Arc; - type SchemaRef = Arc; - fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); rule.optimize(plan, &ExecutionProps::new()) @@ -969,10 +967,10 @@ mod tests { impl TableProvider for PushDownProvider { fn schema(&self) -> SchemaRef { - Arc::new(arrow2::datatypes::Schema::new(vec![ - arrow2::datatypes::Field::new( + Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new( "a", - arrow2::datatypes::DataType::Int32, + arrow::datatypes::DataType::Int32, true, ), ])) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index 5bda5024fc584..32ba136875d9a 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -229,9 +229,7 @@ mod tests { logical_plan::{DFSchema, Expr}, test::*, }; - use arrow2::datatypes::Schema; - - type SchemaRef = Arc; + use arrow::datatypes::SchemaRef; struct TestTableProvider { num_rows: usize, diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 1b3ea349c7e27..e47832b07f921 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -23,8 +23,8 @@ use crate::execution::context::ExecutionProps; use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; -use arrow2::datatypes::Schema; -use arrow2::error::Result as ArrowResult; +use arrow::datatypes::Schema; +use arrow::error::Result as ArrowResult; use std::{collections::HashSet, sync::Arc}; use utils::optimize_explain; @@ -379,7 +379,7 @@ mod tests { use crate::logical_plan::{col, lit}; use crate::logical_plan::{max, min, Expr, LogicalPlanBuilder}; use crate::test::*; - use arrow2::datatypes::DataType; + use arrow::datatypes::DataType; #[test] fn aggregate_no_group_by() -> Result<()> { diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index cb9d251d7cc38..284ead252ac67 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -19,7 +19,7 @@ use std::{collections::HashSet, sync::Arc}; -use arrow2::datatypes::Schema; +use arrow::datatypes::Schema; use super::optimizer::OptimizerRule; use crate::execution::context::ExecutionProps; @@ -442,7 +442,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { mod tests { use super::*; use crate::logical_plan::{col, LogicalPlanBuilder}; - use arrow2::datatypes::DataType; + use arrow::datatypes::DataType; use std::collections::HashSet; #[test] diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 020713d9e424f..744735067c8b0 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -106,7 +106,7 @@ impl PhysicalOptimizerRule for Repartition { } #[cfg(test)] mod tests { - use arrow2::datatypes::Schema; + use arrow::datatypes::Schema; use super::*; use crate::datasource::datasource::Statistics; diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 5ab92a36a1ee3..3607f29debba1 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -34,7 +34,7 @@ use super::{ use crate::error::{DataFusionError, Result}; use crate::physical_plan::distinct_expressions; use crate::physical_plan::expressions; -use arrow2::datatypes::{DataType, Schema, TimeUnit}; +use arrow::datatypes::{DataType, Schema, TimeUnit}; use expressions::{avg_return_type, sum_return_type}; use std::{fmt, str::FromStr, sync::Arc}; /// the implementation of an aggregate function diff --git a/datafusion/src/physical_plan/array_expressions.rs b/datafusion/src/physical_plan/array_expressions.rs index add079314eae6..a416512e0c48e 100644 --- a/datafusion/src/physical_plan/array_expressions.rs +++ b/datafusion/src/physical_plan/array_expressions.rs @@ -18,9 +18,9 @@ //! Array expressions use crate::error::{DataFusionError, Result}; -use arrow2::array::*; -use arrow2::compute::concat; -use arrow2::datatypes::DataType; +use arrow::array::*; +use arrow::compute::concat; +use arrow::datatypes::DataType; use std::sync::Arc; use super::ColumnarValue; diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs index bc7a199fd7d3e..2fba73c149e46 100644 --- a/datafusion/src/physical_plan/coalesce_batches.rs +++ b/datafusion/src/physical_plan/coalesce_batches.rs @@ -29,11 +29,10 @@ use crate::physical_plan::{ SendableRecordBatchStream, }; -use arrow2::compute::concat::concatenate; -use arrow2::datatypes::Schema; -type SchemaRef = Arc; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; +use arrow::compute::concat::concatenate; +use arrow::datatypes::{SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -261,8 +260,8 @@ pub fn concat_batches( mod tests { use super::*; use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec}; - use arrow2::array::UInt32Array; - use arrow2::datatypes::{DataType, Field, Schema}; + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; #[tokio::test(flavor = "multi_thread")] async fn test_concat_batches() -> Result<()> { diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index 761eab48fbc2d..f1ed3742340b0 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -25,10 +25,9 @@ use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; -use arrow2::datatypes::Schema; -type SchemaRef = Arc; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; /// Stream of record batches diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 06f209858a857..8c0c07e534008 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -22,9 +22,9 @@ use futures::{lock::Mutex, StreamExt}; use std::{any::Any, sync::Arc, task::Poll}; use crate::physical_plan::memory::MemoryStream; -use arrow2::datatypes::Schema; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; @@ -42,8 +42,6 @@ use super::{ }; use log::debug; -type SchemaRef = Arc; - /// Data of the left side type JoinLeftData = RecordBatch; diff --git a/datafusion/src/physical_plan/crypto_expressions.rs b/datafusion/src/physical_plan/crypto_expressions.rs index 07a68d30207ce..4a65bf2f91661 100644 --- a/datafusion/src/physical_plan/crypto_expressions.rs +++ b/datafusion/src/physical_plan/crypto_expressions.rs @@ -28,7 +28,7 @@ use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; -use arrow2::{ +use arrow::{ array::{Array, BinaryArray, Offset, Utf8Array}, datatypes::DataType, }; diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs index 89bc3e69c1c42..15f7b40f1be29 100644 --- a/datafusion/src/physical_plan/csv.rs +++ b/datafusion/src/physical_plan/csv.rs @@ -24,10 +24,10 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; -use arrow2::datatypes::Schema; -use arrow2::error::{ArrowError, Result as ArrowResult}; -use arrow2::io::csv::read; -use arrow2::record_batch::RecordBatch; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::io::csv::read; +use arrow::record_batch::RecordBatch; use futures::Stream; use std::any::Any; @@ -41,8 +41,6 @@ use crate::physical_plan::{ common, source::Source, DisplayFormatType, ExecutionPlan, Partitioning, }; -type SchemaRef = Arc; - use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index c63e99dfc7429..ba5969c9d12a1 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -23,13 +23,13 @@ use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; -use arrow2::{ +use arrow::{ array::*, compute::cast, datatypes::{DataType, TimeUnit}, types::NativeType, }; -use arrow2::{compute::temporal, temporal_conversions::timestamp_ns_to_datetime}; +use arrow::{compute::temporal, temporal_conversions::timestamp_ns_to_datetime}; use chrono::prelude::{DateTime, Local, NaiveDateTime, Utc}; use chrono::Datelike; use chrono::Duration; @@ -37,8 +37,6 @@ use chrono::LocalResult; use chrono::TimeZone; use chrono::Timelike; -type ArrayRef = Arc; - #[inline] /// Accepts a string in RFC3339 / ISO8601 standard format and some /// variants and converts it to a nanosecond precision timestamp. @@ -430,8 +428,8 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { mod tests { use std::sync::Arc; - use arrow2::array::*; - use arrow2::datatypes::*; + use arrow::array::*; + use arrow::datatypes::*; use super::*; diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index da51b7611398c..db90f11928e01 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -25,9 +25,8 @@ use std::sync::Arc; use ahash::RandomState; -use arrow2::array::new_empty_array; -use arrow2::{ - array::{Array, ListArray}, +use arrow::{ + array::*, datatypes::{DataType, Field}, }; @@ -182,7 +181,7 @@ impl Accumulator for DistinctCountAccumulator { .map(|distinct_values| ScalarValue::from(&distinct_values[i]).to_array()) .collect::>(); let arrays = arrays.iter().map(|x| x.as_ref()).collect::>(); - Ok(arrow2::compute::concat::concatenate(&arrays).map(|x| (x, type_))?) + Ok(arrow::compute::concat::concatenate(&arrays).map(|x| (x, type_))?) }); a.map(|values: Result<(Box, &DataType)>| { values.map(|(values, type_)| { @@ -208,14 +207,11 @@ impl Accumulator for DistinctCountAccumulator { #[cfg(test)] mod tests { - type ArrayRef = Arc; - use std::iter::FromIterator; use super::*; - use arrow2::array::*; - use arrow2::datatypes::DataType; + use arrow::datatypes::DataType; macro_rules! state_to_vec { ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs index 1cb57b716d907..98f4aac111c6d 100644 --- a/datafusion/src/physical_plan/empty.rs +++ b/datafusion/src/physical_plan/empty.rs @@ -25,16 +25,14 @@ use crate::physical_plan::{ memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, }; -use arrow2::array::NullArray; -use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::array::NullArray; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use super::SendableRecordBatchStream; use async_trait::async_trait; -type SchemaRef = Arc; - /// Execution plan for empty relation (produces no rows) #[derive(Debug)] pub struct EmptyExec { diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index 801e72a24ea39..f207d9294fc11 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -26,9 +26,7 @@ use crate::{ physical_plan::Partitioning, physical_plan::{common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan}, }; -use arrow2::{array::*, datatypes::Schema, record_batch::RecordBatch}; - -type SchemaRef = Arc; +use arrow::{array::*, datatypes::SchemaRef, record_batch::RecordBatch}; use super::SendableRecordBatchStream; use async_trait::async_trait; diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index ca94d344ef95a..fba65d74dd9ee 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -24,14 +24,9 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow2::compute; -use arrow2::datatypes::DataType; -use arrow2::{ - array::{Array, UInt64Array}, - datatypes::Field, -}; - -type ArrayRef = Arc; +use arrow::compute; +use arrow::datatypes::DataType; +use arrow::{array::*, datatypes::Field}; use super::{format_state_name, sum}; @@ -198,8 +193,8 @@ mod tests { use super::*; use crate::physical_plan::expressions::col; use crate::{error::Result, generic_test_op}; - use arrow2::record_batch::RecordBatch; - use arrow2::{array::*, datatypes::*}; + use arrow::datatypes::*; + use arrow::record_batch::RecordBatch; #[test] fn avg_i32() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 95dc63449e43c..6067fcb67b65f 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -17,10 +17,10 @@ use std::{any::Any, convert::TryInto, sync::Arc}; -use arrow2::array::*; -use arrow2::compute; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::array::*; +use arrow::compute; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; use crate::error::{DataFusionError, Result}; use crate::logical_plan::Operator; @@ -448,9 +448,9 @@ pub fn binary( #[cfg(test)] mod tests { - use arrow2::datatypes::*; - use arrow2::error::Result as ArrowResult; - use arrow2::{array::*, types::NativeType}; + use arrow::datatypes::*; + use arrow::error::Result as ArrowResult; + use arrow::{array::*, types::NativeType}; use super::*; use crate::error::Result; diff --git a/datafusion/src/physical_plan/expressions/case.rs b/datafusion/src/physical_plan/expressions/case.rs index 225c464f1f21b..176598fd59278 100644 --- a/datafusion/src/physical_plan/expressions/case.rs +++ b/datafusion/src/physical_plan/expressions/case.rs @@ -17,11 +17,11 @@ use std::{any::Any, sync::Arc}; -use arrow2::array::*; -use arrow2::compute::comparison; -use arrow2::compute::if_then_else; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::array::*; +use arrow::compute::comparison; +use arrow::compute::if_then_else; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ColumnarValue, PhysicalExpr}; @@ -268,8 +268,8 @@ mod tests { physical_plan::expressions::{binary, col, lit}, scalar::ScalarValue, }; - use arrow2::array::Utf8Array; - use arrow2::datatypes::*; + use arrow::array::Utf8Array; + use arrow::datatypes::*; #[test] fn case_with_expr() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/cast.rs b/datafusion/src/physical_plan/expressions/cast.rs index 20cea215393b6..735a9d4d37fe0 100644 --- a/datafusion/src/physical_plan/expressions/cast.rs +++ b/datafusion/src/physical_plan/expressions/cast.rs @@ -23,9 +23,9 @@ use super::ColumnarValue; use crate::error::{DataFusionError, Result}; use crate::physical_plan::PhysicalExpr; use crate::scalar::ScalarValue; -use arrow2::compute::cast; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::compute::cast; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; /// CAST expression casts an expression to a specific data type and returns a runtime error on invalid cast #[derive(Debug)] @@ -129,7 +129,7 @@ mod tests { use super::*; use crate::error::Result; use crate::physical_plan::expressions::col; - use arrow2::{array::*, datatypes::*}; + use arrow::{array::*, datatypes::*}; type StringArray = Utf8Array; diff --git a/datafusion/src/physical_plan/expressions/coercion.rs b/datafusion/src/physical_plan/expressions/coercion.rs index 73470d5428492..e9949f5199e88 100644 --- a/datafusion/src/physical_plan/expressions/coercion.rs +++ b/datafusion/src/physical_plan/expressions/coercion.rs @@ -17,7 +17,7 @@ //! Coercion rules used to coerce types to match existing expressions' implementations -use arrow2::datatypes::DataType; +use arrow::datatypes::DataType; /// Determine if a DataType is signed numeric or not pub fn is_signed_numeric(dt: &DataType) -> bool { @@ -79,7 +79,7 @@ pub fn dictionary_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { - use arrow2::datatypes::DataType::*; + use arrow::datatypes::DataType::*; match (lhs_type, rhs_type) { (Utf8, Utf8) => Some(Utf8), (LargeUtf8, Utf8) => Some(LargeUtf8), @@ -92,7 +92,7 @@ pub fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { - use arrow2::datatypes::DataType::*; + use arrow::datatypes::DataType::*; match (lhs_type, rhs_type) { (Utf8, Date32) => Some(Date32), (Date32, Utf8) => Some(Date32), @@ -106,7 +106,7 @@ pub fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { - use arrow2::datatypes::DataType::*; + use arrow::datatypes::DataType::*; // error on any non-numeric type if !is_numeric(lhs_type) || !is_numeric(rhs_type) { diff --git a/datafusion/src/physical_plan/expressions/column.rs b/datafusion/src/physical_plan/expressions/column.rs index de64f8fed98a9..7e0304e51fe73 100644 --- a/datafusion/src/physical_plan/expressions/column.rs +++ b/datafusion/src/physical_plan/expressions/column.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use arrow2::{ +use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs index 4390fb52d2798..47f5e9cca2ba1 100644 --- a/datafusion/src/physical_plan/expressions/count.rs +++ b/datafusion/src/physical_plan/expressions/count.rs @@ -24,9 +24,9 @@ use super::ArrayRef; use crate::error::Result; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow2::compute; -use arrow2::datatypes::DataType; -use arrow2::{array::UInt64Array, datatypes::Field}; +use arrow::compute; +use arrow::datatypes::DataType; +use arrow::{array::UInt64Array, datatypes::Field}; use super::format_state_name; @@ -148,8 +148,8 @@ mod tests { use crate::physical_plan::expressions::col; use crate::physical_plan::expressions::tests::aggregate; use crate::{error::Result, generic_test_op}; - use arrow2::record_batch::RecordBatch; - use arrow2::{array::*, datatypes::*}; + use arrow::record_batch::RecordBatch; + use arrow::{array::*, datatypes::*}; #[test] fn count_elements() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index cab06344793e9..fc9ce37d7f2c9 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -20,9 +20,9 @@ use std::any::Any; use std::sync::Arc; -use arrow2::array::Utf8Array; -use arrow2::array::*; -use arrow2::{ +use arrow::array::Utf8Array; +use arrow::array::*; +use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -283,7 +283,7 @@ pub fn in_list( #[cfg(test)] mod tests { - use arrow2::{array::Utf8Array, datatypes::Field}; + use arrow::{array::Utf8Array, datatypes::Field}; type StringArray = Utf8Array; diff --git a/datafusion/src/physical_plan/expressions/is_not_null.rs b/datafusion/src/physical_plan/expressions/is_not_null.rs index 16694b384accf..081ad37815f1a 100644 --- a/datafusion/src/physical_plan/expressions/is_not_null.rs +++ b/datafusion/src/physical_plan/expressions/is_not_null.rs @@ -19,8 +19,8 @@ use std::{any::Any, sync::Arc}; -use arrow2::compute; -use arrow2::{ +use arrow::compute; +use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -89,7 +89,7 @@ pub fn is_not_null(arg: Arc) -> Result> mod tests { use super::*; use crate::physical_plan::expressions::col; - use arrow2::{ + use arrow::{ array::{BooleanArray, Utf8Array}, datatypes::*, record_batch::RecordBatch, diff --git a/datafusion/src/physical_plan/expressions/is_null.rs b/datafusion/src/physical_plan/expressions/is_null.rs index 2fb4acbbd3171..ab7f4a174c8c5 100644 --- a/datafusion/src/physical_plan/expressions/is_null.rs +++ b/datafusion/src/physical_plan/expressions/is_null.rs @@ -19,8 +19,8 @@ use std::{any::Any, sync::Arc}; -use arrow2::compute; -use arrow2::{ +use arrow::compute; +use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -89,7 +89,7 @@ pub fn is_null(arg: Arc) -> Result> { mod tests { use super::*; use crate::physical_plan::expressions::col; - use arrow2::{ + use arrow::{ array::{BooleanArray, Utf8Array}, datatypes::*, record_batch::RecordBatch, diff --git a/datafusion/src/physical_plan/expressions/literal.rs b/datafusion/src/physical_plan/expressions/literal.rs index 0bf71d63d89b6..45ecf5c9f9fe4 100644 --- a/datafusion/src/physical_plan/expressions/literal.rs +++ b/datafusion/src/physical_plan/expressions/literal.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::sync::Arc; -use arrow2::{ +use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -80,8 +80,8 @@ pub fn lit(value: ScalarValue) -> Arc { mod tests { use super::*; use crate::error::Result; - use arrow2::array::*; - use arrow2::datatypes::*; + use arrow::array::*; + use arrow::datatypes::*; #[test] fn literal_i32() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index f0dce1f497a43..b702485f410db 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -21,9 +21,9 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use arrow2::array::*; -use arrow2::compute::aggregate::*; -use arrow2::datatypes::*; +use arrow::array::*; +use arrow::compute::aggregate::*; +use arrow::datatypes::*; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; @@ -31,7 +31,6 @@ use crate::scalar::ScalarValue; type StringArray = Utf8Array; type LargeStringArray = Utf8Array; -type ArrayRef = Arc; use super::format_state_name; @@ -444,7 +443,7 @@ mod tests { use crate::physical_plan::expressions::col; use crate::physical_plan::expressions::tests::aggregate; use crate::{error::Result, generic_test_op}; - use arrow2::record_batch::RecordBatch; + use arrow::record_batch::RecordBatch; #[test] fn max_i32() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 9983794637393..8e5cdde69eb92 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -22,11 +22,9 @@ use std::sync::Arc; use super::ColumnarValue; use crate::error::{DataFusionError, Result}; use crate::physical_plan::PhysicalExpr; -use arrow2::array::Array; -use arrow2::compute::sort::SortOptions; -use arrow2::record_batch::RecordBatch; - -type ArrayRef = Arc; +use arrow::array::*; +use arrow::compute::sort::SortOptions; +use arrow::record_batch::RecordBatch; /// One column to be used in lexicographical sort #[derive(Clone, Debug)] diff --git a/datafusion/src/physical_plan/expressions/negative.rs b/datafusion/src/physical_plan/expressions/negative.rs index d17acf899b09d..8eefc0406742d 100644 --- a/datafusion/src/physical_plan/expressions/negative.rs +++ b/datafusion/src/physical_plan/expressions/negative.rs @@ -20,15 +20,13 @@ use std::any::Any; use std::sync::Arc; -use arrow2::{ +use arrow::{ array::*, compute::arithmetics::negate, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -type ArrayRef = Arc; - use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ColumnarValue, PhysicalExpr}; diff --git a/datafusion/src/physical_plan/expressions/not.rs b/datafusion/src/physical_plan/expressions/not.rs index ac467a8cc344d..ebcbf6deedb9f 100644 --- a/datafusion/src/physical_plan/expressions/not.rs +++ b/datafusion/src/physical_plan/expressions/not.rs @@ -25,9 +25,9 @@ use super::ColumnarValue; use crate::error::{DataFusionError, Result}; use crate::physical_plan::PhysicalExpr; use crate::scalar::ScalarValue; -use arrow2::array::BooleanArray; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::array::BooleanArray; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; /// Not expression #[derive(Debug)] @@ -82,7 +82,7 @@ impl PhysicalExpr for NotExpr { ) })?; Ok(ColumnarValue::Array(Arc::new( - arrow2::compute::boolean::not(array), + arrow::compute::boolean::not(array), ))) } ColumnarValue::Scalar(scalar) => { @@ -121,7 +121,7 @@ mod tests { use super::*; use crate::error::Result; use crate::physical_plan::expressions::col; - use arrow2::datatypes::*; + use arrow::datatypes::*; #[test] fn neg_op() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/nullif.rs b/datafusion/src/physical_plan/expressions/nullif.rs index b632cc45c67f4..e6be0a8c8e901 100644 --- a/datafusion/src/physical_plan/expressions/nullif.rs +++ b/datafusion/src/physical_plan/expressions/nullif.rs @@ -17,8 +17,8 @@ use super::ColumnarValue; use crate::error::{DataFusionError, Result}; -use arrow2::compute::nullif; -use arrow2::datatypes::DataType; +use arrow::compute::nullif; +use arrow::datatypes::DataType; /// Implements NULLIF(expr1, expr2) /// Args: 0 - left expr is any array @@ -73,7 +73,7 @@ mod tests { use super::*; use crate::{error::Result, scalar::ScalarValue}; - use arrow2::array::Int32Array; + use arrow::array::Int32Array; #[test] fn nullif_int32() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 67303388c9951..f491fabbfb856 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -22,14 +22,12 @@ use crate::physical_plan::{ window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator, }; use crate::scalar::ScalarValue; -use arrow2::array::{Array, UInt64Array}; -use arrow2::buffer::Buffer; -use arrow2::datatypes::{DataType, Field}; +use arrow::array::{Array, UInt64Array}; +use arrow::buffer::Buffer; +use arrow::datatypes::{DataType, Field}; use std::any::Any; use std::sync::Arc; -type ArrayRef = Arc; - /// row_number expression #[derive(Debug)] pub struct RowNumber { @@ -111,8 +109,8 @@ impl WindowAccumulator for RowNumberAccumulator { mod tests { use super::*; use crate::error::Result; - use arrow2::record_batch::RecordBatch; - use arrow2::{array::*, datatypes::*}; + use arrow::record_batch::RecordBatch; + use arrow::{array::*, datatypes::*}; #[test] fn row_number_all_null() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs index f7e611de439c4..b8988810b470e 100644 --- a/datafusion/src/physical_plan/expressions/sum.rs +++ b/datafusion/src/physical_plan/expressions/sum.rs @@ -24,11 +24,11 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow2::compute; -use arrow2::datatypes::DataType; -use arrow2::{array::*, datatypes::Field}; - -type ArrayRef = Arc; +use arrow::compute; +use arrow::{ + array::*, + datatypes::{DataType, Field}, +}; use super::format_state_name; @@ -272,8 +272,8 @@ mod tests { use super::*; use crate::physical_plan::expressions::col; use crate::{error::Result, generic_test_op}; - use arrow2::datatypes::*; - use arrow2::record_batch::RecordBatch; + use arrow::datatypes::*; + use arrow::record_batch::RecordBatch; #[test] fn sum_i32() -> Result<()> { diff --git a/datafusion/src/physical_plan/expressions/try_cast.rs b/datafusion/src/physical_plan/expressions/try_cast.rs index 1d34fde9f70f2..6457f175853b0 100644 --- a/datafusion/src/physical_plan/expressions/try_cast.rs +++ b/datafusion/src/physical_plan/expressions/try_cast.rs @@ -23,9 +23,9 @@ use super::ColumnarValue; use crate::error::{DataFusionError, Result}; use crate::physical_plan::PhysicalExpr; use crate::scalar::ScalarValue; -use arrow2::compute; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::record_batch::RecordBatch; +use arrow::compute; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; use compute::cast; /// TRY_CAST expression casts an expression to a specific data type and retuns NULL on invalid cast @@ -118,7 +118,7 @@ mod tests { use super::*; use crate::error::Result; use crate::physical_plan::expressions::col; - use arrow2::{array::*, datatypes::*}; + use arrow::{array::*, datatypes::*}; type StringArray = Utf8Array; diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index 1bdd9a90042ae..227e6fd6d860e 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -29,13 +29,11 @@ use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, }; -use arrow2::array::BooleanArray; -use arrow2::compute::filter::filter_record_batch; -use arrow2::datatypes::{DataType, Schema}; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; - -type SchemaRef = Arc; +use arrow::array::BooleanArray; +use arrow::compute::filter::filter_record_batch; +use arrow::datatypes::{DataType, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index fd06bdddbc3cd..dd3b8c6e31a84 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -46,8 +46,8 @@ use crate::{ execution::context::ExecutionContextState, physical_plan::array_expressions::SUPPORTED_ARRAY_TYPES, }; -use arrow2::{ - array::{Array, NullArray}, +use arrow::{ + array::*, compute::length::length, datatypes::TimeUnit, datatypes::{DataType, Field, Schema}, @@ -57,8 +57,6 @@ use fmt::{Debug, Formatter}; use std::convert::From; use std::{any::Any, fmt, str::FromStr, sync::Arc}; -type ArrayRef = Arc; - /// A function's signature, which defines the function's supported argument types. #[derive(Debug, Clone, PartialEq)] pub enum Signature { @@ -1277,7 +1275,7 @@ mod tests { physical_plan::expressions::{col, lit}, scalar::ScalarValue, }; - use arrow2::{array::*, datatypes::Field, record_batch::RecordBatch}; + use arrow::{datatypes::Field, record_batch::RecordBatch}; type StringArray = Utf8Array; diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 1a02ac662600d..771194f45adf2 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -35,20 +35,17 @@ use crate::{ scalar::ScalarValue, }; -use arrow2::error::{ArrowError, Result as ArrowResult}; -use arrow2::{array::*, compute}; -use arrow2::{buffer::MutableBuffer, datatypes::*}; -use arrow2::{ - datatypes::{DataType, Field, Schema}, +use arrow::error::{ArrowError, Result as ArrowResult}; +use arrow::{array::*, compute}; +use arrow::{buffer::MutableBuffer, datatypes::*}; +use arrow::{ + datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; use hashbrown::HashMap; use ordered_float::OrderedFloat; use pin_project_lite::pin_project; -type SchemaRef = Arc; -type ArrayRef = Arc; - use async_trait::async_trait; use super::hash_join::{combine_hashes, IdHashBuilder}; @@ -337,10 +334,10 @@ fn hash_(group_values: &[ArrayRef]) -> Result> { // todo: think about how to perform this more efficiently // * first hash, then unpack // * do not unpack at all, and instead figure out a way to leverage dictionary-encoded. - let unpacked = arrow2::compute::cast::cast(x.as_ref(), d)?; - arrow2::compute::hash::hash(unpacked.as_ref()) + let unpacked = arrow::compute::cast::cast(x.as_ref(), d)?; + arrow::compute::hash::hash(unpacked.as_ref()) } - _ => arrow2::compute::hash::hash(x.as_ref()), + _ => arrow::compute::hash::hash(x.as_ref()), }; Ok(a?) }) @@ -1023,7 +1020,7 @@ pub(crate) fn create_group_by_values( #[cfg(test)] mod tests { - use arrow2::array::Float64Array; + use arrow::array::Float64Array; use super::*; use crate::physical_plan::expressions::{col, Avg}; diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 33f50bc280e33..a787c67a31c32 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -31,12 +31,12 @@ use futures::{Stream, StreamExt, TryStreamExt}; use hashbrown::HashMap; use tokio::sync::Mutex; -use arrow2::datatypes::*; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; -use arrow2::{array::*, buffer::MutableBuffer}; +use arrow::datatypes::*; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use arrow::{array::*, buffer::MutableBuffer}; -use arrow2::compute::take; +use arrow::compute::take; use super::{expressions::col, ArrayRef}; use super::{ @@ -52,7 +52,6 @@ use super::{ use crate::physical_plan::coalesce_batches::concat_batches; use log::debug; -type SchemaRef = Arc; type StringArray = Utf8Array; type LargeStringArray = Utf8Array; diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 5bcf74547d444..7e030af3a124c 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -18,7 +18,7 @@ //! Functionality used both on logical and physical plans use crate::error::{DataFusionError, Result}; -use arrow2::datatypes::{Field, Schema}; +use arrow::datatypes::{Field, Schema}; use std::collections::HashSet; /// All valid types of joins. diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs index 9b3756afbf281..7835192b96fea 100644 --- a/datafusion/src/physical_plan/json.rs +++ b/datafusion/src/physical_plan/json.rs @@ -21,8 +21,11 @@ use futures::Stream; use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream}; use crate::error::{DataFusionError, Result}; -use arrow2::{ - datatypes::Schema, error::Result as ArrowResult, io::json, record_batch::RecordBatch, +use arrow::{ + datatypes::{Schema, SchemaRef}, + error::Result as ArrowResult, + io::json, + record_batch::RecordBatch, }; use std::fs::File; use std::{any::Any, io::Seek}; @@ -33,8 +36,6 @@ use std::{ task::{Context, Poll}, }; -type SchemaRef = Arc; - /// Line-delimited JSON read options #[derive(Clone)] pub struct NdJsonReadOptions<'a> { @@ -268,7 +269,7 @@ impl ExecutionPlan for NdJsonExec { partition: usize, ) -> Result { let mut builder = json::ReaderBuilder::new() - .with_schema(self.schema.as_ref().clone()) + .with_schema(self.schema.clone()) .with_batch_size(self.batch_size); if let Some(proj) = &self.projection { builder = builder.with_projection( @@ -362,15 +363,15 @@ impl Stream for NdJsonStream { impl RecordBatchStream for NdJsonStream { fn schema(&self) -> SchemaRef { - Arc::new(self.reader.schema().clone()) + self.reader.schema().clone() } } #[cfg(test)] mod tests { use super::*; - use arrow2::array::{Int64Array, Utf8Array}; - use arrow2::datatypes::DataType; + use arrow::array::{Int64Array, Utf8Array}; + use arrow::datatypes::DataType; use futures::StreamExt; const TEST_DATA_BASE: &str = "tests/jsons"; diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index 33ba2a28f4783..a6cdd73fc1a9d 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -30,14 +30,11 @@ use crate::physical_plan::{ DisplayFormatType, Distribution, ExecutionPlan, Partitioning, }; -use arrow2::array::Array; -use arrow2::compute::limit::limit; -use arrow2::datatypes::Schema; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; - -type SchemaRef = Arc; -type ArrayRef = Arc; +use arrow::array::ArrayRef; +use arrow::compute::limit::limit; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; diff --git a/datafusion/src/physical_plan/math_expressions.rs b/datafusion/src/physical_plan/math_expressions.rs index ce71711c60a92..79cd419232f17 100644 --- a/datafusion/src/physical_plan/math_expressions.rs +++ b/datafusion/src/physical_plan/math_expressions.rs @@ -20,9 +20,9 @@ use rand::{thread_rng, Rng}; use std::iter; use std::sync::Arc; -use arrow2::array::Float64Array; -use arrow2::compute::arity::unary; -use arrow2::datatypes::DataType; +use arrow::array::Float64Array; +use arrow::compute::arity::unary; +use arrow::datatypes::DataType; use super::{ColumnarValue, ScalarValue}; use crate::error::{DataFusionError, Result}; @@ -115,7 +115,7 @@ pub fn random(args: &[ColumnarValue]) -> Result { mod tests { use super::*; - use arrow2::array::{Array, Float64Array, NullArray}; + use arrow::array::{Array, Float64Array, NullArray}; #[test] fn test_random_expression() { diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index e29d3d227fbb2..85d8aeef073c1 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -27,10 +27,9 @@ use super::{ SendableRecordBatchStream, }; use crate::error::{DataFusionError, Result}; -use arrow2::datatypes::Schema; -type SchemaRef = Arc; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::Stream; diff --git a/datafusion/src/physical_plan/merge.rs b/datafusion/src/physical_plan/merge.rs index 6cc4a8dcd7637..5b096760fca6d 100644 --- a/datafusion/src/physical_plan/merge.rs +++ b/datafusion/src/physical_plan/merge.rs @@ -28,9 +28,9 @@ use futures::Stream; use async_trait::async_trait; -use arrow2::record_batch::RecordBatch; -use arrow2::{ - datatypes::Schema, +use arrow::record_batch::RecordBatch; +use arrow::{ + datatypes::SchemaRef, error::{ArrowError, Result as ArrowResult}, }; @@ -41,8 +41,6 @@ use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use super::SendableRecordBatchStream; use pin_project_lite::pin_project; -type SchemaRef = Arc; - /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. #[derive(Debug)] diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 2140cfbbb443e..48fd71659d442 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -26,13 +26,10 @@ use crate::error::DataFusionError; use crate::execution::context::ExecutionContextState; use crate::logical_plan::LogicalPlan; use crate::{error::Result, scalar::ScalarValue}; -use arrow2::array::Array; -use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; - -type ArrayRef = Arc; -type SchemaRef = Arc; +use arrow::array::ArrayRef; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; pub use display::DisplayFormatType; @@ -42,7 +39,7 @@ use std::{any::Any, pin::Pin}; use self::{display::DisplayableExecutionPlan, merge::MergeExec}; use hashbrown::HashMap; -/// Trait for types that stream [arrow2::record_batch::RecordBatch] +/// Trait for types that stream [arrow::record_batch::RecordBatch] pub trait RecordBatchStream: Stream> { /// Returns the schema of this `RecordBatchStream`. /// diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index feeb3354a8936..2e283623b0699 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -30,7 +30,7 @@ use crate::{ logical_plan::Expr, }; -use arrow2::{ +use arrow::{ datatypes::*, error::Result as ArrowResult, io::parquet::read, record_batch::RecordBatch, }; @@ -46,8 +46,6 @@ use crate::datasource::datasource::{ColumnStatistics, Statistics}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; -type SchemaRef = Arc; - /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { @@ -422,7 +420,7 @@ impl RecordBatchStream for ParquetStream { mod tests { use super::*; use arrow::datatypes::{DataType, Field}; - use arrow2::array::{Int32Array, StringArray}; + use arrow::array::{Int32Array, StringArray}; use futures::StreamExt; use parquet::{ basic::Type as PhysicalType, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 5c2ff2ceccb3d..b703a51ba187c 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -52,14 +52,12 @@ use crate::{ physical_plan::displayable, }; -use arrow2::compute::cast::can_cast_types; -use arrow2::compute::sort::SortOptions; -use arrow2::datatypes::*; +use arrow::compute::cast::can_cast_types; +use arrow::compute::sort::SortOptions; +use arrow::datatypes::*; use expressions::col; use log::debug; -type SchemaRef = Arc; - /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`]. pub trait ExtensionPlanner { /// Create a physical plan for a [`UserDefinedLogicalNode`]. @@ -850,7 +848,7 @@ mod tests { logical_plan::{col, lit, sum, LogicalPlanBuilder}, physical_plan::SendableRecordBatchStream, }; - use arrow2::datatypes::{DataType, Field}; + use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; use fmt::Debug; use std::{any::Any, fmt}; diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index e97bb7ca0e419..e1811fe03bc11 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -30,11 +30,9 @@ use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, }; -use arrow2::datatypes::{Field, Schema}; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; - -type SchemaRef = Arc; +use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use super::{RecordBatchStream, SendableRecordBatchStream}; use async_trait::async_trait; diff --git a/datafusion/src/physical_plan/regex_expressions.rs b/datafusion/src/physical_plan/regex_expressions.rs index b31eb1f38c2cd..945dea5ea1070 100644 --- a/datafusion/src/physical_plan/regex_expressions.rs +++ b/datafusion/src/physical_plan/regex_expressions.rs @@ -25,13 +25,11 @@ use std::any::type_name; use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use arrow2::array::*; -use arrow2::error::ArrowError; +use arrow::array::*; +use arrow::error::ArrowError; use hashbrown::HashMap; use regex::Regex; -type ArrayRef = Arc; - macro_rules! downcast_string_arg { ($ARG:expr, $NAME:expr, $T:ident) => {{ $ARG.as_any() diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index 146140574b4a1..feee66480dc8d 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -27,7 +27,7 @@ use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; -use arrow2::{ +use arrow::{ array::*, buffer::MutableBuffer, compute::take, datatypes::*, error::Result as ArrowResult, record_batch::RecordBatch, }; @@ -47,7 +47,6 @@ use tokio::sync::{ }; use tokio::task::JoinHandle; -type SchemaRef = Arc; type MaybeBatch = Option>; /// The repartition operator maps N input partitions to M output partitions based on a @@ -368,9 +367,9 @@ impl RecordBatchStream for RepartitionStream { mod tests { use super::*; use crate::physical_plan::memory::MemoryExec; - use arrow2::array::UInt32Array; - use arrow2::datatypes::{DataType, Field, Schema}; - use arrow2::record_batch::RecordBatch; + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; #[tokio::test] async fn one_to_many_round_robin() -> Result<()> { diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 4f91de7d5d470..a5ddbd1a3585b 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -30,12 +30,12 @@ use hashbrown::HashMap; use pin_project_lite::pin_project; -pub use arrow2::compute::sort::SortOptions; -use arrow2::compute::{concat, sort::lexsort_to_indices, sort::SortColumn, take}; -use arrow2::datatypes::Schema; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; -use arrow2::{array::Array, error::ArrowError}; +pub use arrow::compute::sort::SortOptions; +use arrow::compute::{concat, sort::lexsort_to_indices, sort::SortColumn, take}; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use arrow::{array::ArrayRef, error::ArrowError}; use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; @@ -44,9 +44,6 @@ use crate::physical_plan::{ common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric, }; -type SchemaRef = Arc; -type ArrayRef = Arc; - /// Sort execution plan #[derive(Debug)] pub struct SortExec { @@ -348,8 +345,8 @@ mod tests { csv::{CsvExec, CsvReadOptions}, }; use crate::test; - use arrow2::array::*; - use arrow2::datatypes::*; + use arrow::array::*; + use arrow::datatypes::*; #[tokio::test] async fn test_sort() -> Result<()> { diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs index 7c82b8d269266..2ce79c176f1f6 100644 --- a/datafusion/src/physical_plan/string_expressions.rs +++ b/datafusion/src/physical_plan/string_expressions.rs @@ -28,17 +28,11 @@ use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; -use arrow2::{ - array::{ - Array, BooleanArray, Int32Array, Int64Array, Offset, PrimitiveArray, Utf8Array, - }, - datatypes::DataType, -}; +use arrow::{array::*, datatypes::DataType}; use super::ColumnarValue; type StringArray = Utf8Array; -type ArrayRef = Arc; macro_rules! downcast_string_arg { ($ARG:expr, $NAME:expr, $T:ident) => {{ diff --git a/datafusion/src/physical_plan/type_coercion.rs b/datafusion/src/physical_plan/type_coercion.rs index 6b64a75e8207f..06d3739b53b27 100644 --- a/datafusion/src/physical_plan/type_coercion.rs +++ b/datafusion/src/physical_plan/type_coercion.rs @@ -31,7 +31,7 @@ use std::{sync::Arc, vec}; -use arrow2::datatypes::{DataType, Schema, TimeUnit}; +use arrow::datatypes::{DataType, Schema, TimeUnit}; use super::{functions::Signature, PhysicalExpr}; use crate::error::{DataFusionError, Result}; @@ -212,7 +212,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool { mod tests { use super::*; use crate::physical_plan::expressions::col; - use arrow2::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{DataType, Field, Schema}; #[test] fn test_maybe_data_types() { diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs index c50e1991c8153..f7515d326d0a5 100644 --- a/datafusion/src/physical_plan/udaf.rs +++ b/datafusion/src/physical_plan/udaf.rs @@ -21,7 +21,7 @@ use fmt::{Debug, Formatter}; use std::any::Any; use std::fmt; -use arrow2::{ +use arrow::{ datatypes::Field, datatypes::{DataType, Schema}, }; diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs index 78f9f018cb9ca..a79c0a8a36059 100644 --- a/datafusion/src/physical_plan/udf.rs +++ b/datafusion/src/physical_plan/udf.rs @@ -20,7 +20,7 @@ use fmt::{Debug, Formatter}; use std::fmt; -use arrow2::datatypes::Schema; +use arrow::datatypes::Schema; use crate::error::Result; use crate::{logical_plan::Expr, physical_plan::PhysicalExpr}; diff --git a/datafusion/src/physical_plan/unicode_expressions.rs b/datafusion/src/physical_plan/unicode_expressions.rs index 00ac6ed93abb2..6340a63a264e4 100644 --- a/datafusion/src/physical_plan/unicode_expressions.rs +++ b/datafusion/src/physical_plan/unicode_expressions.rs @@ -25,14 +25,12 @@ use std::any::type_name; use std::cmp::Ordering; use std::sync::Arc; -use arrow2::array::*; +use arrow::array::*; use hashbrown::HashMap; use unicode_segmentation::UnicodeSegmentation; use crate::error::{DataFusionError, Result}; -type ArrayRef = Arc; - macro_rules! downcast_string_arg { ($ARG:expr, $NAME:expr, $T:ident) => {{ $ARG.as_any() diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index 836045354821e..cbab728a8428b 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -23,8 +23,7 @@ use std::{any::Any, sync::Arc}; -use arrow2::datatypes::Schema; -type SchemaRef = Arc; +use arrow::datatypes::SchemaRef; use super::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use crate::error::Result; @@ -105,7 +104,7 @@ mod tests { csv::{CsvExec, CsvReadOptions}, }; use crate::test; - use arrow2::record_batch::RecordBatch; + use arrow::record_batch::RecordBatch; #[tokio::test] async fn test_union_partitions() -> Result<()> { diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index 03a468ca29457..774464c20420d 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -25,7 +25,7 @@ use crate::physical_plan::{ aggregates, aggregates::AggregateFunction, functions::Signature, type_coercion::data_types, PhysicalExpr, WindowAccumulator, }; -use arrow2::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field}; use std::any::Any; use std::sync::Arc; use std::{fmt, str::FromStr}; diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 0c941d3b00546..801d9723b620f 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -27,10 +27,10 @@ use crate::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, WindowAccumulator, WindowExpr, }; use crate::scalar::ScalarValue; -use arrow2::compute::concat; -use arrow2::{ +use arrow::compute::concat; +use arrow::{ array::*, - datatypes::{Field, Schema}, + datatypes::{Field, Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; @@ -44,9 +44,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -type ArrayRef = Arc; -type SchemaRef = Arc; - /// Window execution plan #[derive(Debug)] pub struct WindowAggExec { diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index ac5af03f02f5c..6b3e95fd0a7df 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -19,7 +19,7 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; -use arrow2::{ +use arrow::{ array::*, bitmap::MutableBitmap, buffer::MutableBuffer, @@ -28,7 +28,6 @@ use arrow2::{ types::days_ms, }; -type ArrayRef = Arc; type StringArray = Utf8Array; type LargeStringArray = Utf8Array; type SmallBinaryArray = BinaryArray; @@ -382,7 +381,7 @@ impl ScalarValue { offsets.push(length); length }); - let values = arrow2::compute::concat::concatenate(&values)?; + let values = arrow::compute::concat::concatenate(&values)?; Arc::new(ListArray::from_data( data_type, offsets.into(), @@ -490,8 +489,8 @@ impl ScalarValue { .take(size) .collect::>(); let values = - arrow2::compute::concat::concatenate(&refs).unwrap().into(); - let offsets: arrow2::buffer::Buffer = + arrow::compute::concat::concatenate(&refs).unwrap().into(); + let offsets: arrow::buffer::Buffer = (0..=size).map(|i| (i * length) as i32).collect(); Arc::new(ListArray::from_data( data_type.clone(), @@ -861,7 +860,7 @@ impl fmt::Debug for ScalarValue { #[cfg(test)] mod tests { - use arrow2::datatypes::Field; + use arrow::datatypes::Field; use super::*; diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index d8490ce9d53b8..df10f8f29c437 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -39,8 +39,8 @@ use crate::{ sql::parser::{CreateExternalTable, FileType, Statement as DFStatement}, }; -use arrow2::datatypes::*; -use arrow2::types::days_ms; +use arrow::datatypes::*; +use arrow::types::days_ms; use hashbrown::HashMap; diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 3ec49548ea8c4..514508cfe143c 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -17,16 +17,13 @@ //! Simple iterator over batches for use in testing -use std::sync::Arc; use std::task::{Context, Poll}; -use arrow2::{ - datatypes::Schema, error::Result as ArrowResult, record_batch::RecordBatch, +use arrow::{ + datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, }; use futures::Stream; -type SchemaRef = Arc; - use crate::physical_plan::RecordBatchStream; /// Index into the data that has been returned so far diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index 12c9c1fe369ed..b4b8ad48202ec 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -25,9 +25,9 @@ use std::{env, error::Error, path::PathBuf}; use tempfile::TempDir; -use arrow2::array::*; -use arrow2::datatypes::*; -use arrow2::record_batch::RecordBatch; +use arrow::array::*; +use arrow::datatypes::*; +use arrow::record_batch::RecordBatch; use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; @@ -292,7 +292,7 @@ macro_rules! assert_batches_eq { let expected_lines: Vec = $EXPECTED_LINES.iter().map(|&s| s.into()).collect(); - let formatted = arrow2::io::print::write($CHUNKS).unwrap(); + let formatted = arrow::io::print::write($CHUNKS).unwrap(); let actual_lines: Vec<&str> = formatted.trim().lines().collect(); @@ -326,7 +326,7 @@ macro_rules! assert_batches_sorted_eq { expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable() } - let formatted = arrow2::io::print::write($CHUNKS).unwrap(); + let formatted = arrow::io::print::write($CHUNKS).unwrap(); // fix for windows: \r\n --> let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs index 2f8cd03199b3f..99690d8f2b93b 100644 --- a/datafusion/tests/custom_sources.rs +++ b/datafusion/tests/custom_sources.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use arrow2::array::Int32Array; -use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::error::Result as ArrowResult; -use arrow2::record_batch::RecordBatch; +use arrow::array::Int32Array; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; use datafusion::{ datasource::{datasource::Statistics, TableProvider}, diff --git a/datafusion/tests/dataframe.rs b/datafusion/tests/dataframe.rs index 14d8f203af909..2af8d5320af2f 100644 --- a/datafusion/tests/dataframe.rs +++ b/datafusion/tests/dataframe.rs @@ -17,8 +17,8 @@ use std::sync::Arc; -use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::{ +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::{ array::{Int32Array, Utf8Array}, record_batch::RecordBatch, }; diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs index 8316649511938..40b3bceb0c6a9 100644 --- a/datafusion/tests/provider_filter_pushdown.rs +++ b/datafusion/tests/provider_filter_pushdown.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use arrow2::array::*; -use arrow2::datatypes::*; -use arrow2::record_batch::RecordBatch; +use arrow::array::*; +use arrow::datatypes::*; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::datasource::datasource::{ Statistics, TableProvider, TableProviderFilterPushDown, diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 6db940223823e..405c58c9976aa 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use chrono::Duration; -use arrow2::{array::*, datatypes::*, record_batch::RecordBatch}; +use arrow::{array::*, datatypes::*, record_batch::RecordBatch}; use chrono::TimeZone; use datafusion::logical_plan::LogicalPlan; @@ -2078,7 +2078,7 @@ async fn query_on_string_dictionary() -> Result<()> { )])); let data = vec![Some("one"), None, Some("three")]; - let data = data.into_iter().map(arrow2::error::Result::Ok); + let data = data.into_iter().map(arrow::error::Result::Ok); let array = DictionaryPrimitive::, _>::try_from_iter(data)? .into_arc(); @@ -2955,7 +2955,7 @@ async fn test_cast_expressions_error() -> Result<()> { Ok(_) => panic!("expected error"), Err(e) => { assert!(e.to_string().contains( - "Cast error: Cannot cast string 'c' to value of arrow2::datatypes::types::Int32Type type" + "Cast error: Cannot cast string 'c' to value of arrow::datatypes::types::Int32Type type" )) } } diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index aeaac360db0be..1f9487734b689 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -60,7 +60,7 @@ use futures::{Stream, StreamExt}; -use arrow2::{ +use arrow::{ array::{Int64Array, Utf8Array}, datatypes::Schema, error::ArrowError,