diff --git a/Cargo.toml b/Cargo.toml
index 3596ebec6f6b2..c0f56094f4721 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,3 +26,5 @@ members = [
     "ballista/rust/executor",
     "ballista/rust/scheduler",
 ]
+
+exclude = ["python"]
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 25f326c01b08e..642ffaa828660 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -40,8 +40,8 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
-arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
+arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
 
 datafusion = { path = "../../../datafusion" }
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index 1d0fedca7b4ef..69aff18f1da19 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -33,7 +33,7 @@ use crate::serde::scheduler::{
 
 use arrow::record_batch::RecordBatch;
 use arrow::{
-    array::{StringArray, StructArray},
+    array::{StructArray, Utf8Array},
     error::{ArrowError, Result as ArrowResult},
 };
 use arrow::{datatypes::Schema, datatypes::SchemaRef};
@@ -104,10 +104,8 @@ impl BallistaClient {
                 let path = batch
                     .column(0)
                     .as_any()
-                    .downcast_ref::<StringArray>()
-                    .expect(
-                        "execute_partition expected column 0 to be a StringArray",
-                    );
+                    .downcast_ref::<Utf8Array<i32>>()
+                    .expect("execute_partition expected column 0 to be a Utf8Array");
 
                 let stats = batch
                     .column(1)
@@ -206,6 +204,7 @@ impl Stream for FlightDataStream {
                     flight_data_to_arrow_batch(
                         &flight_data_chunk,
                         self.schema.clone(),
+                        true,
                         &[],
                     )
                 });
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index ac58e5955d9dd..10c4670e809aa 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -26,7 +26,7 @@ use std::{
     unimplemented,
 };
 
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::logical_plan::{
     abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum,
     sin, sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
@@ -356,7 +356,6 @@ impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
     type Error = BallistaError;
     fn try_into(self) -> Result<DataType, Self::Error> {
         use protobuf::arrow_type;
-        use DataType;
         Ok(match self {
             arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
             arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
@@ -470,7 +469,6 @@ impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
 #[allow(clippy::from_over_into)]
 impl Into<DataType> for protobuf::PrimitiveScalarType {
     fn into(self) -> DataType {
-        use DataType;
         match self {
             protobuf::PrimitiveScalarType::Bool => DataType::Boolean,
             protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8,
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 0d8cab9e2b08c..080c2c297cd76 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -129,7 +129,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     &filenames,
                     Some(projection),
                     None,
-                    scan.batch_size as usize,
                     scan.num_partitions as usize,
                     None,
                 )?))
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index 9ccea71c91946..9780b7941ae6f 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -141,51 +141,28 @@ impl PartitionStats {
     }
 
     pub fn to_arrow_arrayref(&self) -> Result<Arc<StructArray>, BallistaError> {
-        let mut field_builders = Vec::new();
-
-        let mut num_rows_builder = UInt64Builder::new(1);
-        match self.num_rows {
-            Some(n) => num_rows_builder.append_value(n)?,
-            None => num_rows_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_rows_builder) as Box<dyn ArrayBuilder>);
-
-        let mut num_batches_builder = UInt64Builder::new(1);
-        match self.num_batches {
-            Some(n) => num_batches_builder.append_value(n)?,
-            None => num_batches_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_batches_builder) as Box<dyn ArrayBuilder>);
-
-        let mut num_bytes_builder = UInt64Builder::new(1);
-        match self.num_bytes {
-            Some(n) => num_bytes_builder.append_value(n)?,
-            None => num_bytes_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_bytes_builder) as Box<dyn ArrayBuilder>);
-
-        let mut struct_builder =
-            StructBuilder::new(self.arrow_struct_fields(), field_builders);
-        struct_builder.append(true)?;
-        Ok(Arc::new(struct_builder.finish()))
+        let num_rows = Arc::new(UInt64Array::from(&[self.num_rows])) as ArrayRef;
+        let num_batches = Arc::new(UInt64Array::from(&[self.num_batches])) as ArrayRef;
+        let num_bytes = Arc::new(UInt64Array::from(&[self.num_bytes])) as ArrayRef;
+        let values = vec![num_rows, num_batches, num_bytes];
+
+        Ok(Arc::new(StructArray::from_data(
+            self.arrow_struct_fields(),
+            values,
+            None,
+        )))
     }
 
     pub fn from_arrow_struct_array(struct_array: &StructArray) -> PartitionStats {
-        let num_rows = struct_array
-            .column_by_name("num_rows")
-            .expect("from_arrow_struct_array expected a field num_rows")
+        let num_rows = struct_array.values()[0]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_rows to be a UInt64Array");
-        let num_batches = struct_array
-            .column_by_name("num_batches")
-            .expect("from_arrow_struct_array expected a field num_batches")
+        let num_batches = struct_array.values()[1]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_batches to be a UInt64Array");
-        let num_bytes = struct_array
-            .column_by_name("num_bytes")
-            .expect("from_arrow_struct_array expected a field num_bytes")
+        let num_bytes = struct_array.values()[2]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_bytes to be a UInt64Array");
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index dc570f81f2c7e..3c735c0080cde 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -26,12 +26,10 @@ use crate::error::{BallistaError, Result};
 use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
-use arrow::array::{
-    ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-};
+use arrow::array::*;
 use arrow::datatypes::{DataType, Field};
-use arrow::ipc::reader::FileReader;
-use arrow::ipc::writer::FileWriter;
+use arrow::io::ipc::read::FileReader;
+use arrow::io::ipc::write::FileWriter;
 use arrow::record_batch::RecordBatch;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::logical_plan::Operator;
@@ -59,7 +57,7 @@ pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
 ) -> Result<PartitionStats> {
-    let file = File::create(&path).map_err(|e| {
+    let mut file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
             "Failed to create partition file at {}: {:?}",
             path, e
@@ -69,7 +67,7 @@ pub async fn write_stream_to_disk(
     let mut num_rows = 0;
     let mut num_batches = 0;
     let mut num_bytes = 0;
-    let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+    let mut writer = FileWriter::try_new(&mut file, stream.schema().as_ref())?;
 
     while let Some(result) = stream.next().await {
         let batch = result?;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 7ab5ad15c6976..6a763420c7823 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -32,6 +32,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
 futures = "0.3"
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 86461d4fe6883..1914e9bc06c31 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 //! Print format variants
-use datafusion::arrow::io::{csv::write, json::Writer, print};
+use datafusion::arrow::io::{
+    csv::write,
+    json::{JsonArray, JsonFormat, LineDelimited, Writer},
+    print,
+};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use std::fmt;
@@ -69,10 +73,10 @@ impl fmt::Display for PrintFormat {
     }
 }
 
-fn print_batches_to_json(batches: &[RecordBatch]) -> Result<String> {
+fn print_batches_to_json<J: JsonFormat>(batches: &[RecordBatch]) -> Result<String> {
     let mut bytes = vec![];
     {
-        let mut writer = Writer::new(&mut bytes);
+        let mut writer = Writer::<_, J>::new(&mut bytes);
         writer.write_batches(batches)?;
     }
     let formatted = String::from_utf8(bytes)
@@ -103,9 +107,11 @@ impl PrintFormat {
             Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
             Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
             Self::Table => print::print(batches)?,
-            Self::Json => println!("{}", print_batches_to_json(batches)?),
+            Self::Json => {
+                println!("{}", print_batches_to_json::<JsonArray>(batches)?)
+            }
             Self::NdJson => {
-                todo!()
+                println!("{}", print_batches_to_json::<LineDelimited>(batches)?)
             }
         }
         Ok(())
@@ -180,10 +186,10 @@ mod tests {
     #[test]
     fn test_print_batches_to_json_empty() -> Result<()> {
         let batches = vec![];
-        let r = batches_to_json!(ArrayWriter, &batches);
+        let r = print_batches_to_json::<JsonArray>(&batches)?;
         assert_eq!("", r);
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
+        let r = print_batches_to_json::<LineDelimited>(&batches)?;
         assert_eq!("", r);
 
         let schema = Arc::new(Schema::new(vec![
@@ -203,10 +209,10 @@ mod tests {
         .unwrap();
         let batches = vec![batch];
 
-        let r = batches_to_json!(ArrayWriter, &batches);
+        let r = print_batches_to_json::<JsonArray>(&batches)?;
         assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r);
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
+        let r = print_batches_to_json::<LineDelimited>(&batches)?;
         assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
         Ok(())
     }
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index f45df094d3f01..2fb81b40efc51 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,7 +46,7 @@ simd = ["arrow/simd"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 405c58c9976aa..804111939828c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1724,42 +1724,23 @@ async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
     result_vec(&results)
 }
 
-/// Specialised String representation
-fn col_str(column: &dyn Array, row_index: usize) -> String {
-    if column.is_null(row_index) {
-        return "NULL".to_string();
-    }
-
-    // Special case ListArray as there is no pretty print support for it yet
-    if let DataType::FixedSizeList(_, n) = column.data_type() {
-        let array = column
-            .as_any()
-            .downcast_ref::<FixedSizeListArray>()
-            .unwrap()
-            .value(row_index);
-
-        let mut r = Vec::with_capacity(*n as usize);
-        for i in 0..*n {
-            r.push(col_str(array.as_ref(), i as usize));
-        }
-        return format!("[{}]", r.join(","));
-    }
-
-    array_value_to_string(column, row_index)
-        .ok()
-        .unwrap_or_else(|| "???".to_string())
-}
-
 /// Converts the results into a 2d array of strings, `result[row][column]`
 /// Special cases nulls to NULL for testing
 fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
     let mut result = vec![];
     for batch in results {
+        let display_col = batch
+            .columns()
+            .iter()
+            .map(|x| {
+                get_display(x.as_ref())
+                    .unwrap_or_else(|_| Box::new(|x| "???".to_string()))
+            })
+            .collect::<Vec<_>>();
         for row_index in 0..batch.num_rows() {
-            let row_vec = batch
-                .columns()
+            let row_vec = display_col
                 .iter()
-                .map(|column| col_str(column.as_ref(), row_index))
+                .map(|display_col| display_col(row_index))
                 .collect();
             result.push(row_vec);
         }
     }
@@ -2340,7 +2321,7 @@ async fn group_by_timestamp_millis() -> Result<()> {
     let data = RecordBatch::try_new(
         schema.clone(),
         vec![
-            Arc::new(Primitive::<i64>::from(timestamps).to(data_type)),
+            Arc::new(Primitive::<i64>::from_slice(&timestamps).to(data_type)),
            Arc::new(Int32Array::from_slice(&[10, 20, 30, 40, 50, 60])),
        ],
    )?;