Further improvements.
jorgecarleitao committed Jun 7, 2021
1 parent 78fa620 commit d71f268
Showing 11 changed files with 55 additions and 94 deletions.

Cargo.toml (2 changes: 2 additions & 0 deletions)

@@ -26,3 +26,5 @@ members = [
     "ballista/rust/executor",
     "ballista/rust/scheduler",
 ]
+
+exclude = ["python"]

ballista/rust/core/Cargo.toml (4 changes: 2 additions & 2 deletions)

@@ -40,8 +40,8 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
-arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
+arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
 
 datafusion = { path = "../../../datafusion" }
 

ballista/rust/core/src/client.rs (9 changes: 4 additions & 5 deletions)

@@ -33,7 +33,7 @@ use crate::serde::scheduler::{
 
 use arrow::record_batch::RecordBatch;
 use arrow::{
-    array::{StringArray, StructArray},
+    array::{StructArray, Utf8Array},
     error::{ArrowError, Result as ArrowResult},
 };
 use arrow::{datatypes::Schema, datatypes::SchemaRef};
@@ -104,10 +104,8 @@ impl BallistaClient {
         let path = batch
             .column(0)
             .as_any()
-            .downcast_ref::<StringArray>()
-            .expect(
-                "execute_partition expected column 0 to be a StringArray",
-            );
+            .downcast_ref::<Utf8Array<i32>>()
+            .expect("execute_partition expected column 0 to be a Utf8Array");
 
         let stats = batch
             .column(1)
@@ -206,6 +204,7 @@ impl Stream for FlightDataStream {
                 flight_data_to_arrow_batch(
                     &flight_data_chunk,
                     self.schema.clone(),
+                    true,
                     &[],
                 )
             });
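
Note: arrow2 parameterises UTF-8 arrays by offset width, so `Utf8Array<i32>` is the counterpart of arrow-rs's `StringArray` (and `Utf8Array<i64>` of `LargeStringArray`). The extra `true` passed to `flight_data_to_arrow_batch` fills an endianness flag that arrow2 added to the signature — an assumption read off this call site. A minimal sketch of the new downcast, assuming arrow2's `Array` trait as used above (this repo renames the crate to `arrow` in Cargo.toml):

```rust
use arrow2::array::{Array, Utf8Array};

// Hypothetical helper mirroring the change above: read row 0 of a column
// expected to hold UTF-8 data with i32 offsets.
fn first_value(column: &dyn Array) -> Option<&str> {
    let strings = column.as_any().downcast_ref::<Utf8Array<i32>>()?;
    if strings.is_null(0) {
        None
    } else {
        Some(strings.value(0))
    }
}
```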

ballista/rust/core/src/serde/logical_plan/from_proto.rs (4 changes: 1 addition & 3 deletions)

@@ -26,7 +26,7 @@ use std::{
     unimplemented,
 };
 
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::logical_plan::{
     abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
     sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
@@ -356,7 +356,6 @@ impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
     type Error = BallistaError;
     fn try_into(self) -> Result<DataType, Self::Error> {
         use protobuf::arrow_type;
-        use DataType;
         Ok(match self {
             arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
             arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
@@ -470,7 +469,6 @@ impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
 #[allow(clippy::from_over_into)]
 impl Into<DataType> for protobuf::PrimitiveScalarType {
     fn into(self) -> DataType {
-        use DataType;
         match self {
             protobuf::PrimitiveScalarType::Bool => DataType::Boolean,
             protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8,

ballista/rust/core/src/serde/physical_plan/from_proto.rs (1 change: 0 additions & 1 deletion)

@@ -129,7 +129,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 &filenames,
                 Some(projection),
                 None,
-                scan.batch_size as usize,
                 scan.num_partitions as usize,
                 None,
             )?))

ballista/rust/core/src/serde/scheduler/mod.rs (49 changes: 13 additions & 36 deletions)

@@ -141,51 +141,28 @@ impl PartitionStats {
     }
 
     pub fn to_arrow_arrayref(&self) -> Result<Arc<StructArray>, BallistaError> {
-        let mut field_builders = Vec::new();
-
-        let mut num_rows_builder = UInt64Builder::new(1);
-        match self.num_rows {
-            Some(n) => num_rows_builder.append_value(n)?,
-            None => num_rows_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_rows_builder) as Box<dyn ArrayBuilder>);
-
-        let mut num_batches_builder = UInt64Builder::new(1);
-        match self.num_batches {
-            Some(n) => num_batches_builder.append_value(n)?,
-            None => num_batches_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_batches_builder) as Box<dyn ArrayBuilder>);
-
-        let mut num_bytes_builder = UInt64Builder::new(1);
-        match self.num_bytes {
-            Some(n) => num_bytes_builder.append_value(n)?,
-            None => num_bytes_builder.append_null()?,
-        }
-        field_builders.push(Box::new(num_bytes_builder) as Box<dyn ArrayBuilder>);
-
-        let mut struct_builder =
-            StructBuilder::new(self.arrow_struct_fields(), field_builders);
-        struct_builder.append(true)?;
-        Ok(Arc::new(struct_builder.finish()))
+        let num_rows = Arc::new(UInt64Array::from(&[self.num_rows])) as ArrayRef;
+        let num_batches = Arc::new(UInt64Array::from(&[self.num_batches])) as ArrayRef;
+        let num_bytes = Arc::new(UInt64Array::from(&[self.num_bytes])) as ArrayRef;
+        let values = vec![num_rows, num_batches, num_bytes];
+
+        Ok(Arc::new(StructArray::from_data(
+            self.arrow_struct_fields(),
+            values,
+            None,
+        )))
     }
 
     pub fn from_arrow_struct_array(struct_array: &StructArray) -> PartitionStats {
-        let num_rows = struct_array
-            .column_by_name("num_rows")
-            .expect("from_arrow_struct_array expected a field num_rows")
+        let num_rows = struct_array.values()[0]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_rows to be a UInt64Array");
-        let num_batches = struct_array
-            .column_by_name("num_batches")
-            .expect("from_arrow_struct_array expected a field num_batches")
+        let num_batches = struct_array.values()[1]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_batches to be a UInt64Array");
-        let num_bytes = struct_array
-            .column_by_name("num_bytes")
-            .expect("from_arrow_struct_array expected a field num_bytes")
+        let num_bytes = struct_array.values()[2]
             .as_any()
             .downcast_ref::<UInt64Array>()
             .expect("from_arrow_struct_array expected num_bytes to be a UInt64Array");
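
Note: the builder-based construction (`UInt64Builder`/`StructBuilder`) has no direct arrow2 equivalent here; instead `UInt64Array::from` accepts a slice of `Option<u64>` (`None` becomes a null) and `StructArray::from_data` assembles the struct from its fields, one child array per field, plus an optional validity bitmap. Field access on read becomes positional via `values()`. A condensed round-trip sketch under those assumptions:

```rust
use std::sync::Arc;
use arrow2::array::{Array, ArrayRef, StructArray, UInt64Array};
use arrow2::datatypes::{DataType, Field};

fn round_trip(num_rows: Option<u64>) -> Option<u64> {
    // Build: one nullable UInt64 child per field; trailing None means
    // the struct itself carries no top-level validity bitmap.
    let fields = vec![Field::new("num_rows", DataType::UInt64, true)];
    let child = Arc::new(UInt64Array::from(&[num_rows])) as ArrayRef;
    let array = StructArray::from_data(fields, vec![child], None);

    // Read back positionally instead of by field name.
    let col = array.values()[0]
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("child 0 should be a UInt64Array");
    if col.is_null(0) {
        None
    } else {
        Some(col.value(0))
    }
}
```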

ballista/rust/core/src/utils.rs (12 changes: 5 additions & 7 deletions)

@@ -26,12 +26,10 @@ use crate::error::{BallistaError, Result};
 use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
-use arrow::array::{
-    ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-};
+use arrow::array::*;
 use arrow::datatypes::{DataType, Field};
-use arrow::ipc::reader::FileReader;
-use arrow::ipc::writer::FileWriter;
+use arrow::io::ipc::read::FileReader;
+use arrow::io::ipc::write::FileWriter;
 use arrow::record_batch::RecordBatch;
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::logical_plan::Operator;
@@ -59,7 +57,7 @@ pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
 ) -> Result<PartitionStats> {
-    let file = File::create(&path).map_err(|e| {
+    let mut file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
             "Failed to create partition file at {}: {:?}",
             path, e
@@ -69,7 +67,7 @@
     let mut num_rows = 0;
     let mut num_batches = 0;
     let mut num_bytes = 0;
-    let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
+    let mut writer = FileWriter::try_new(&mut file, stream.schema().as_ref())?;
 
     while let Some(result) = stream.next().await {
         let batch = result?;
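
Note: arrow2 relocates the IPC readers and writers from `arrow::ipc::{reader, writer}` to `arrow::io::ipc::{read, write}`, and at this revision `FileWriter::try_new` borrows the file mutably rather than taking ownership — hence the new `let mut file`. A minimal sketch of the write path under those assumptions (the `write`/`finish` calls are inferred from arrow2's writer at this era; error handling is simplified to a boxed error):

```rust
use std::fs::File;
use arrow2::datatypes::Schema;
use arrow2::io::ipc::write::FileWriter;
use arrow2::record_batch::RecordBatch;

fn write_ipc(
    path: &str,
    schema: &Schema,
    batches: &[RecordBatch],
) -> Result<(), Box<dyn std::error::Error>> {
    // Must be `mut`: the arrow2 writer borrows the file mutably.
    let mut file = File::create(path)?;
    let mut writer = FileWriter::try_new(&mut file, schema)?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?; // writes the IPC file footer
    Ok(())
}
```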

benchmarks/Cargo.toml (1 change: 1 addition & 0 deletions)

@@ -32,6 +32,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 datafusion = { path = "../datafusion" }
+ballista = { path = "../ballista/rust/client" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
 futures = "0.3"

datafusion-cli/src/print_format.rs (24 changes: 15 additions & 9 deletions)

@@ -16,7 +16,11 @@
 // under the License.
 
 //! Print format variants
-use datafusion::arrow::io::{csv::write, json::Writer, print};
+use datafusion::arrow::io::{
+    csv::write,
+    json::{JsonArray, JsonFormat, LineDelimited, Writer},
+    print,
+};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use std::fmt;
@@ -69,10 +73,10 @@ impl fmt::Display for PrintFormat {
     }
 }
 
-fn print_batches_to_json(batches: &[RecordBatch]) -> Result<String> {
+fn print_batches_to_json<J: JsonFormat>(batches: &[RecordBatch]) -> Result<String> {
     let mut bytes = vec![];
     {
-        let mut writer = Writer::new(&mut bytes);
+        let mut writer = Writer::<_, J>::new(&mut bytes);
         writer.write_batches(batches)?;
     }
     let formatted = String::from_utf8(bytes)
@@ -103,9 +107,11 @@ impl PrintFormat {
             Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
             Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
             Self::Table => print::print(batches)?,
-            Self::Json => println!("{}", print_batches_to_json(batches)?),
+            Self::Json => {
+                println!("{}", print_batches_to_json::<JsonArray>(batches)?)
+            }
             Self::NdJson => {
-                todo!()
+                println!("{}", print_batches_to_json::<LineDelimited>(batches)?)
             }
         }
         Ok(())
@@ -180,10 +186,10 @@ mod tests {
     #[test]
     fn test_print_batches_to_json_empty() -> Result<()> {
         let batches = vec![];
-        let r = batches_to_json!(ArrayWriter, &batches);
+        let r = print_batches_to_json::<JsonArray>(&batches)?;
         assert_eq!("", r);
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
+        let r = print_batches_to_json::<LineDelimited>(&batches)?;
         assert_eq!("", r);
 
         let schema = Arc::new(Schema::new(vec![
@@ -203,10 +209,10 @@
             .unwrap();
 
         let batches = vec![batch];
-        let r = batches_to_json!(ArrayWriter, &batches);
+        let r = print_batches_to_json::<JsonArray>(&batches)?;
         assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r);
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
+        let r = print_batches_to_json::<LineDelimited>(&batches)?;
         assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
         Ok(())
     }
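
Note: arrow2's JSON writer is generic over a `JsonFormat` marker type, which is what lets the previously `todo!()` NdJson arm share one helper with Json: `JsonArray` emits a single JSON array, `LineDelimited` emits newline-delimited JSON. A condensed sketch of the same pattern, assuming those re-exports:

```rust
use arrow2::io::json::{JsonArray, JsonFormat, LineDelimited, Writer};
use arrow2::record_batch::RecordBatch;

// The output format is chosen purely by the type parameter `J`.
fn to_json<J: JsonFormat>(batches: &[RecordBatch]) -> arrow2::error::Result<String> {
    let mut bytes = Vec::new();
    {
        let mut writer = Writer::<_, J>::new(&mut bytes);
        writer.write_batches(batches)?;
    }
    Ok(String::from_utf8(bytes).expect("the JSON writer emits valid UTF-8"))
}

// to_json::<JsonArray>(&batches)     -> "[{...},{...}]"
// to_json::<LineDelimited>(&batches) -> "{...}\n{...}\n"
```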

datafusion/Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -46,7 +46,7 @@ simd = ["arrow/simd"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "05a415da0ec1bde2a46f45563f5d068719791466" }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "83ee995a0cf4c3415f54abc9fd39f26839247f7f" }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"

datafusion/tests/sql.rs (41 changes: 11 additions & 30 deletions)

@@ -1724,42 +1724,23 @@ async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
     result_vec(&results)
 }
 
-/// Specialised String representation
-fn col_str(column: &dyn Array, row_index: usize) -> String {
-    if column.is_null(row_index) {
-        return "NULL".to_string();
-    }
-
-    // Special case ListArray as there is no pretty print support for it yet
-    if let DataType::FixedSizeList(_, n) = column.data_type() {
-        let array = column
-            .as_any()
-            .downcast_ref::<FixedSizeListArray>()
-            .unwrap()
-            .value(row_index);
-
-        let mut r = Vec::with_capacity(*n as usize);
-        for i in 0..*n {
-            r.push(col_str(array.as_ref(), i as usize));
-        }
-        return format!("[{}]", r.join(","));
-    }
-
-    array_value_to_string(column, row_index)
-        .ok()
-        .unwrap_or_else(|| "???".to_string())
-}
-
 /// Converts the results into a 2d array of strings, `result[row][column]`
 /// Special cases nulls to NULL for testing
 fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
     let mut result = vec![];
     for batch in results {
+        let display_col = batch
+            .columns()
+            .iter()
+            .map(|x| {
+                get_display(x.as_ref())
+                    .unwrap_or_else(|_| Box::new(|x| "???".to_string()))
+            })
+            .collect::<Vec<_>>();
         for row_index in 0..batch.num_rows() {
-            let row_vec = batch
-                .columns()
+            let row_vec = display_col
                 .iter()
-                .map(|column| col_str(column.as_ref(), row_index))
+                .map(|display_col| display_col(row_index))
                 .collect();
             result.push(row_vec);
         }
@@ -2340,7 +2321,7 @@ async fn group_by_timestamp_millis() -> Result<()> {
     let data = RecordBatch::try_new(
         schema.clone(),
         vec![
-            Arc::new(Primitive::<i64>::from(timestamps).to(data_type)),
+            Arc::new(Primitive::<i64>::from_slice(&timestamps).to(data_type)),
            Arc::new(Int32Array::from_slice(&[10, 20, 30, 40, 50, 60])),
         ],
     )?;
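
Note: the hand-rolled `col_str` helper (including its FixedSizeList special case) is replaced by arrow2's `get_display`, which — as the usage above suggests — returns a fallible boxed closure from row index to display string, so per-column dispatch happens once per batch rather than once per cell. In the second hunk, `Primitive::<i64>::from_slice` builds the primitive array from plain values before `.to(data_type)` tags it with the timestamp type. A small sketch, assuming `get_display` is exported from `arrow2::array` with the `Result<Box<dyn Fn(usize) -> String>>` shape implied above (its exact path and signature are assumptions):

```rust
use arrow2::array::{get_display, Int32Array};

fn show_row() {
    let col = Int32Array::from_slice(&[10, 20, 30]);
    // One boxed formatter per column; call it once per row afterwards.
    let fmt = get_display(&col).expect("Int32 columns should be displayable");
    println!("{}", fmt(2)); // prints the display string for row 2, e.g. "30"
}
```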
