Skip to content

Commit

Permalink
Brought back parquet options and stream write.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 7, 2021
1 parent fe6b846 commit 78fa620
Show file tree
Hide file tree
Showing 106 changed files with 395 additions and 460 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5179,7 +5179,7 @@
* [ARROW-4384](https://issues.apache.org/jira/browse/ARROW-4384) - [C++] Running "format" target on new Windows 10 install opens "how do you want to open this file" dialog
* [ARROW-4385](https://issues.apache.org/jira/browse/ARROW-4385) - [Python] default\_version of a release should not include SNAPSHOT
* [ARROW-4389](https://issues.apache.org/jira/browse/ARROW-4389) - [R] Installing clang-tools in CI is failing on trusty
* [ARROW-4395](https://issues.apache.org/jira/browse/ARROW-4395) - ts-node throws type error running \`bin/arrow2csv.js\`
* [ARROW-4395](https://issues.apache.org/jira/browse/ARROW-4395) - ts-node throws type error running \`bin/arrowcsv.js\`
* [ARROW-4400](https://issues.apache.org/jira/browse/ARROW-4400) - [CI] install of clang tools failing
* [ARROW-4403](https://issues.apache.org/jira/browse/ARROW-4403) - [Rust] CI fails due to formatting errors
* [ARROW-4404](https://issues.apache.org/jira/browse/ARROW-4404) - [CI] AppVeyor toolchain build does not build anything
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
Expand All @@ -92,8 +92,8 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
use arrow2::util::pretty::print_batches;
use arrow2::record_batch::RecordBatch;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }

arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }

datafusion = { path = "../../../datafusion" }

Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{
unimplemented,
};

use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
use core::panic;
use datafusion::arrow2::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::physical_plan::functions::BuiltinScalarFunction::Sqrt;
use datafusion::{
logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{

use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};
use datafusion::arrow2::datatypes::{DataType, Schema};
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::datasource::CsvFile;
use datafusion::logical_plan::{Expr, JoinType, LogicalPlan};
use datafusion::physical_plan::aggregates::AggregateFunction;
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod roundtrip_tests {
use datafusion::physical_plan::hash_utils::JoinType;
use std::{convert::TryInto, sync::Arc};

use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::ColumnarValue;
use datafusion::physical_plan::{
empty::EmptyExec,
Expand Down Expand Up @@ -146,7 +146,7 @@ mod roundtrip_tests {

#[test]
fn roundtrip_sort() -> Result<()> {
use datafusion::arrow2::compute::sort::SortOptions;
use datafusion::arrow::compute::sort::SortOptions;
let field_a = Field::new("a", DataType::Boolean, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

use std::{collections::HashMap, sync::Arc};

use datafusion::arrow2::array::*;
use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use serde::Serialize;
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/bin/nyctaxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::path::PathBuf;
use std::process;
use std::time::Instant;

use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow2::io::print;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::io::print;

use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
Expand Down
34 changes: 28 additions & 6 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ use futures::StreamExt;

//use ballista::context::BallistaContext;

use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow2::io::parquet::write::CompressionCodec;
use datafusion::arrow2::io::print;
use datafusion::arrow2::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::io::parquet::write::{CompressionCodec, WriteOptions};
use datafusion::arrow::io::print;
use datafusion::arrow::record_batch::RecordBatch;

use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{CsvFile, MemTable, TableProvider};
Expand Down Expand Up @@ -406,7 +406,29 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
);
match opt.file_format.as_str() {
"csv" => ctx.write_csv(csv, output_path).await?,
"parquet" => ctx.write_parquet(csv, output_path).await?,
"parquet" => {
let compression = match opt.compression.as_str() {
"none" => CompressionCodec::Uncompressed,
"snappy" => CompressionCodec::Snappy,
"brotli" => CompressionCodec::Brotli,
"gzip" => CompressionCodec::Gzip,
"lz4" => CompressionCodec::Lz4,
"lz0" => CompressionCodec::Lzo,
"zstd" => CompressionCodec::Zstd,
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {}",
other
)))
}
};

let options = WriteOptions {
compression,
write_statistics: false,
};
ctx.write_parquet(csv, options, output_path).await?
}
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid output format: {}",
Expand Down Expand Up @@ -557,7 +579,7 @@ mod tests {
use std::env;
use std::sync::Arc;

use datafusion::arrow2::array::*;
use datafusion::arrow::array::*;

use datafusion::logical_plan::Expr;
use datafusion::logical_plan::Expr::Cast;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.
pub mod print_format;

use datafusion::arrow2::record_batch::RecordBatch;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use print_format::PrintFormat;
use std::time::Instant;
Expand Down
8 changes: 4 additions & 4 deletions datafusion-cli/src/print_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

//! Print format variants
use datafusion::arrow2::io::{csv::write, json::Writer, print};
use datafusion::arrow2::record_batch::RecordBatch;
use datafusion::arrow::io::{csv::write, json::Writer, print};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use std::fmt;
use std::str::FromStr;
Expand Down Expand Up @@ -115,8 +115,8 @@ impl PrintFormat {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow2::array::Int32Array;
use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow2::io::print;
use datafusion::arrow::io::print;

use datafusion::error::Result;
use datafusion::prelude::*;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl FlightService for FlightServiceImpl {
let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();

let options =
datafusion::arrow2::io::ipc::write::common::IpcWriteOptions::default();
datafusion::arrow::io::ipc::write::common::IpcWriteOptions::default();
let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema(
table.schema().as_ref(),
&options,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
crypto_expressions = ["md-5", "sha2"]
regex_expressions = ["regex", "lazy_static"]
unicode_expressions = ["unicode-segmentation"]
simd = ["arrow2/simd"]
simd = ["arrow/simd"]

[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "5e82a98787934829409bff249b84d437707cdde5" }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;

use arrow2::{
use arrow::{
array::*,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/filter_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow2::{
use arrow::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/math_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};

use tokio::runtime::Runtime;

use arrow2::{
use arrow::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use criterion::Criterion;

use std::sync::{Arc, Mutex};

use arrow2::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::datasource::{CsvFile, CsvReadOptions, MemTable};
use datafusion::execution::context::ExecutionContext;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::{any, sync::Arc};

use arrow2::{
use arrow::{
array::*,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFrame API for building and executing query plans.
use crate::arrow2::record_batch::RecordBatch;
use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
Expand Down
8 changes: 3 additions & 5 deletions datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use std::io::{Read, Seek};
use std::string::String;
use std::sync::{Arc, Mutex};

use arrow2::datatypes::Schema;
use arrow2::io::csv::read as csv_read;
use arrow::datatypes::SchemaRef;
use arrow::io::csv::read as csv_read;

use crate::datasource::datasource::Statistics;
use crate::datasource::{Source, TableProvider};
Expand All @@ -49,8 +49,6 @@ use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};

type SchemaRef = Arc<Schema>;

/// Represents a CSV file with a provided schema
pub struct CsvFile {
source: Source,
Expand Down Expand Up @@ -225,7 +223,7 @@ mod tests {
use super::*;
use crate::prelude::*;

use arrow2::array::*;
use arrow::array::*;

#[tokio::test]
async fn csv_file_from_reader() -> Result<()> {
Expand Down
4 changes: 1 addition & 3 deletions datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use std::sync::Arc;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
use crate::{arrow2::datatypes::Schema, scalar::ScalarValue};

type SchemaRef = Arc<Schema>;
use crate::{arrow::datatypes::SchemaRef, scalar::ScalarValue};

/// This table statistics are estimates.
/// It can not be used directly in the precise compute
Expand Down
4 changes: 1 addition & 3 deletions datafusion/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
use std::any::Any;
use std::sync::Arc;

use arrow2::datatypes::*;

type SchemaRef = Arc<Schema>;
use arrow::datatypes::*;

use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/datasource/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
ExecutionPlan,
},
};
use arrow2::datatypes::Schema;
use arrow2::io::json::infer_json_schema_from_seekable;
use arrow::datatypes::Schema;
use arrow::io::json::infer_json_schema_from_seekable;

use super::datasource::Statistics;

Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
batches[0]
.column(0)
.as_any()
.downcast_ref::<arrow2::array::Int64Array>()
.downcast_ref::<arrow::array::Int64Array>()
.unwrap()
.value(0),
100000000000011
Expand Down
10 changes: 4 additions & 6 deletions datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ use log::debug;
use std::any::Any;
use std::sync::Arc;

use arrow2::datatypes::{Field, Schema};
use arrow2::record_batch::RecordBatch;

type SchemaRef = Arc<Schema>;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -240,8 +238,8 @@ impl TableProvider for MemTable {
#[cfg(test)]
mod tests {
use super::*;
use arrow2::array::Int32Array;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
use std::collections::HashMap;

Expand Down
Loading

0 comments on commit 78fa620

Please sign in to comment.