Skip to content

Commit

Permalink
refactor to support str and bin
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Jan 17, 2022
1 parent f027e5f commit 8ad8c0a
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 61 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/dataframe_in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() -> Result<()> {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(StringArray::from_slice(&["a", "b", "c", "d"])),
Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
],
)?;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3885,8 +3885,8 @@ mod tests {
Arc::new(Float64Array::from_slice(&[1.0])),
Arc::new(StringArray::from(vec![Some("foo")])),
Arc::new(LargeStringArray::from(vec![Some("bar")])),
Arc::new(BinaryArray::from(vec![b"foo" as &[u8]])),
Arc::new(LargeBinaryArray::from(vec![b"foo" as &[u8]])),
Arc::new(BinaryArray::from_slice(&[b"foo" as &[u8]])),
Arc::new(LargeBinaryArray::from_slice(&[b"foo" as &[u8]])),
Arc::new(TimestampNanosecondArray::from_opt_vec(
vec![Some(123)],
None,
Expand Down Expand Up @@ -4150,7 +4150,7 @@ mod tests {

// create mock record batch
let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
let names = Arc::new(StringArray::from(vec!["test"]));
let names = Arc::new(StringArray::from_slice(&["test"]));
let rec_batch =
RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

Expand Down
114 changes: 103 additions & 11 deletions datafusion/src/from_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,119 @@
//!
//! This file essentially exists to ease the transition onto arrow2
use arrow::array::{ArrayData, PrimitiveArray};
use arrow::buffer::Buffer;
use arrow::datatypes::ArrowPrimitiveType;
use arrow::array::{
ArrayData, BinaryOffsetSizeTrait, BooleanArray, GenericBinaryArray,
GenericStringArray, PrimitiveArray, StringOffsetSizeTrait,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{ArrowPrimitiveType, DataType};
use arrow::util::bit_util;

/// A trait to define from_slice functions for arrow primitive array types
pub trait FromSlice<T>
pub trait FromSlice<S, E>
where
T: ArrowPrimitiveType,
S: AsRef<[E]>,
{
/// convert a slice of native types into a primitive array (without nulls)
fn from_slice(slice: &[T::Native]) -> PrimitiveArray<T>;
fn from_slice(slice: S) -> Self;
}

/// default implementation for primitive types
// #[cfg(test)]
impl<T: ArrowPrimitiveType> FromSlice<T> for PrimitiveArray<T> {
fn from_slice(slice: &[T::Native]) -> PrimitiveArray<T> {
/// default implementation for primitive array types, adapted from `From<Vec<_>>`
impl<S, T> FromSlice<S, T::Native> for PrimitiveArray<T>
where
T: ArrowPrimitiveType,
S: AsRef<[T::Native]>,
{
fn from_slice(slice: S) -> Self {
let slice = slice.as_ref();
let array_data = ArrayData::builder(T::DATA_TYPE)
.len(slice.len())
.add_buffer(Buffer::from_slice_ref(&slice));
let array_data = unsafe { array_data.build_unchecked() };
PrimitiveArray::<T>::from(array_data)
Self::from(array_data)
}
}

/// default implementation for binary array types, adapted from `From<Vec<_>>`
impl<S, I, OffsetSize> FromSlice<S, I> for GenericBinaryArray<OffsetSize>
where
OffsetSize: BinaryOffsetSizeTrait,
S: AsRef<[I]>,
I: AsRef<[u8]>,
{
fn from_slice(slice: S) -> Self {
let slice = slice.as_ref();
let mut offsets = Vec::with_capacity(slice.len() + 1);
let mut values = Vec::new();
let mut length_so_far: OffsetSize = OffsetSize::zero();
offsets.push(length_so_far);
for s in slice {
let s = s.as_ref();
length_so_far += OffsetSize::from_usize(s.len()).unwrap();
offsets.push(length_so_far);
values.extend_from_slice(s);
}
let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
.len(slice.len())
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_buffer(Buffer::from_slice_ref(&values));
let array_data = unsafe { array_data.build_unchecked() };
Self::from(array_data)
}
}

/// default implementation for utf8 array types, adapted from `From<Vec<_>>`
impl<S, I, OffsetSize> FromSlice<S, I> for GenericStringArray<OffsetSize>
where
OffsetSize: StringOffsetSizeTrait,
S: AsRef<[I]>,
I: AsRef<str>,
{
fn from_slice(slice: S) -> Self {
let slice = slice.as_ref();
let mut offsets =
MutableBuffer::new((slice.len() + 1) * std::mem::size_of::<OffsetSize>());
let mut values = MutableBuffer::new(0);

let mut length_so_far = OffsetSize::zero();
offsets.push(length_so_far);

for s in slice {
let s = s.as_ref();
length_so_far += OffsetSize::from_usize(s.len()).unwrap();
offsets.push(length_so_far);
values.extend_from_slice(s.as_bytes());
}
let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
.len(slice.len())
.add_buffer(offsets.into())
.add_buffer(values.into());
let array_data = unsafe { array_data.build_unchecked() };
Self::from(array_data)
}
}

/// default implementation for boolean array type, adapted from `From<Vec<bool>>`
impl<S> FromSlice<S, bool> for BooleanArray
where
S: AsRef<[bool]>,
{
fn from_slice(slice: S) -> Self {
let slice = slice.as_ref();
let mut mut_buf = MutableBuffer::new_null(slice.len());
{
let mut_slice = mut_buf.as_slice_mut();
for (i, b) in slice.iter().enumerate() {
if *b {
bit_util::set_bit(mut_slice, i);
}
}
}
let array_data = ArrayData::builder(DataType::Boolean)
.len(slice.len())
.add_buffer(mut_buf.into());

let array_data = unsafe { array_data.build_unchecked() };
Self::from(array_data)
}
}
6 changes: 3 additions & 3 deletions datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,15 +706,15 @@ enum StatisticsType {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::from_slice::FromSlice;
use crate::logical_plan::{col, lit};
use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType};
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::{DataType, TimeUnit},
};
use std::collections::HashMap;

#[derive(Debug)]
/// Test for container stats
Expand Down Expand Up @@ -972,7 +972,7 @@ mod tests {

// Note the statistics return binary (which can't be cast to string)
let statistics = OneContainerStats {
min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
min_values: Some(Arc::new(BinaryArray::from_slice(&[&[255u8] as &[u8]]))),
max_values: None,
num_containers: 1,
};
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,12 @@ mod tests {

let zero_count_values = BooleanArray::from(Vec::<bool>::new());

let one_count_values = BooleanArray::from(vec![false, false]);
let one_count_values = BooleanArray::from_slice(&[false, false]);
let one_count_values_with_null =
BooleanArray::from(vec![Some(true), Some(true), None, None]);

let two_count_values = BooleanArray::from(vec![true, false, true, false, true]);
let two_count_values =
BooleanArray::from_slice(&[true, false, true, false, true]);
let two_count_values_with_null = BooleanArray::from(vec![
Some(true),
Some(false),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub fn cast(
mod tests {
use super::*;
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::col;
use arrow::{
array::{
Expand Down Expand Up @@ -458,7 +459,7 @@ mod tests {
fn invalid_cast_with_options_error() -> Result<()> {
// Ensure a useful error happens at plan time if invalid casts are used
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
let a = StringArray::from(vec!["9.1"]);
let a = StringArray::from_slice(&["9.1"]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
let expression = cast_with_options(
col("a", &schema)?,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {
#[test]
fn count_utf8() -> Result<()> {
let a: ArrayRef =
Arc::new(StringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"]));
Arc::new(StringArray::from_slice(&["a", "bb", "ccc", "dddd", "ad"]));
generic_test_op!(
a,
DataType::Utf8,
Expand All @@ -221,8 +221,9 @@ mod tests {

#[test]
fn count_large_utf8() -> Result<()> {
let a: ArrayRef =
Arc::new(LargeStringArray::from(vec!["a", "bb", "ccc", "dddd", "ad"]));
let a: ArrayRef = Arc::new(LargeStringArray::from_slice(&[
"a", "bb", "ccc", "dddd", "ad",
]));
generic_test_op!(
a,
DataType::LargeUtf8,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub fn is_not_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::col;
use arrow::{
array::{BooleanArray, StringArray},
Expand All @@ -110,7 +111,7 @@ mod tests {
.downcast_ref::<BooleanArray>()
.expect("failed to downcast to BooleanArray");

let expected = &BooleanArray::from(vec![true, false]);
let expected = &BooleanArray::from_slice(&[true, false]);

assert_eq!(expected, result);

Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub fn is_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::expressions::col;
use arrow::{
array::{BooleanArray, StringArray},
Expand All @@ -111,7 +112,7 @@ mod tests {
.downcast_ref::<BooleanArray>()
.expect("failed to downcast to BooleanArray");

let expected = &BooleanArray::from(vec![false, true]);
let expected = &BooleanArray::from_slice(&[false, true]);

assert_eq!(expected, result);

Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/physical_plan/expressions/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ mod tests {

#[test]
fn max_utf8() -> Result<()> {
let a: ArrayRef = Arc::new(StringArray::from(vec!["d", "a", "c", "b"]));
let a: ArrayRef = Arc::new(StringArray::from_slice(&["d", "a", "c", "b"]));
generic_test_op!(
a,
DataType::Utf8,
Expand All @@ -787,7 +787,7 @@ mod tests {

#[test]
fn max_large_utf8() -> Result<()> {
let a: ArrayRef = Arc::new(LargeStringArray::from(vec!["d", "a", "c", "b"]));
let a: ArrayRef = Arc::new(LargeStringArray::from_slice(&["d", "a", "c", "b"]));
generic_test_op!(
a,
DataType::LargeUtf8,
Expand All @@ -799,7 +799,7 @@ mod tests {

#[test]
fn min_utf8() -> Result<()> {
let a: ArrayRef = Arc::new(StringArray::from(vec!["d", "a", "c", "b"]));
let a: ArrayRef = Arc::new(StringArray::from_slice(&["d", "a", "c", "b"]));
generic_test_op!(
a,
DataType::Utf8,
Expand All @@ -811,7 +811,7 @@ mod tests {

#[test]
fn min_large_utf8() -> Result<()> {
let a: ArrayRef = Arc::new(LargeStringArray::from(vec!["d", "a", "c", "b"]));
let a: ArrayRef = Arc::new(LargeStringArray::from_slice(&["d", "a", "c", "b"]));
generic_test_op!(
a,
DataType::LargeUtf8,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl PartitionEvaluator for NumRowsEvaluator {
mod tests {
use super::*;
use crate::error::Result;
use crate::from_slice::FromSlice;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};

Expand All @@ -105,7 +106,7 @@ mod tests {

#[test]
fn row_number_all_values() -> Result<()> {
let arr: ArrayRef = Arc::new(BooleanArray::from(vec![
let arr: ArrayRef = Arc::new(BooleanArray::from_slice(&[
true, false, true, false, false, true, false, true,
]));
let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3989,8 +3989,8 @@ mod tests {
#[test]
fn test_array() -> Result<()> {
generic_test_array(
Arc::new(StringArray::from(vec!["aa"])),
Arc::new(StringArray::from(vec!["bb"])),
Arc::new(StringArray::from_slice(&["aa"])),
Arc::new(StringArray::from_slice(&["bb"])),
DataType::Utf8,
"StringArray\n[\n \"aa\",\n \"bb\",\n]",
)?;
Expand Down Expand Up @@ -4019,7 +4019,7 @@ mod tests {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
let ctx_state = ExecutionContextState::new();

let col_value: ArrayRef = Arc::new(StringArray::from(vec!["aaa-555"]));
let col_value: ArrayRef = Arc::new(StringArray::from_slice(&["aaa-555"]));
let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string())));
let columns: Vec<ArrayRef> = vec![col_value];
let expr = create_physical_expr(
Expand Down
11 changes: 6 additions & 5 deletions datafusion/src/physical_plan/regex_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ pub fn regexp_replace<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Arr
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use arrow::array::*;

#[test]
fn test_case_sensitive_regexp_match() {
let values = StringArray::from(vec!["abc"; 5]);
let values = StringArray::from_slice(&["abc"; 5]);
let patterns =
StringArray::from(vec!["^(a)", "^(A)", "(b|d)", "(B|D)", "^(b|c)"]);
StringArray::from_slice(&["^(a)", "^(A)", "(b|d)", "(B|D)", "^(b|c)"]);

let elem_builder: GenericStringBuilder<i32> = GenericStringBuilder::new(0);
let mut expected_builder = ListBuilder::new(elem_builder);
Expand All @@ -208,10 +209,10 @@ mod tests {

#[test]
fn test_case_insensitive_regexp_match() {
let values = StringArray::from(vec!["abc"; 5]);
let values = StringArray::from_slice(&["abc"; 5]);
let patterns =
StringArray::from(vec!["^(a)", "^(A)", "(b|d)", "(B|D)", "^(b|c)"]);
let flags = StringArray::from(vec!["i"; 5]);
StringArray::from_slice(&["^(a)", "^(A)", "(b|d)", "(B|D)", "^(b|c)"]);
let flags = StringArray::from_slice(&["i"; 5]);

let elem_builder: GenericStringBuilder<i32> = GenericStringBuilder::new(0);
let mut expected_builder = ListBuilder::new(elem_builder);
Expand Down
Loading

0 comments on commit 8ad8c0a

Please sign in to comment.