Skip to content

Commit

Permalink
feat: implement binview for polars-json (pola-rs#13737)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and r-brink committed Jan 24, 2024
1 parent dc97062 commit 4fe54d6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 188 deletions.
220 changes: 35 additions & 185 deletions crates/polars-json/src/json/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use std::fmt::Write;

use arrow::array::*;
use arrow::bitmap::MutableBitmap;
use arrow::chunk::Chunk;
use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit};
use arrow::datatypes::{ArrowDataType, IntervalUnit};
use arrow::offset::{Offset, Offsets};
use arrow::temporal_conversions;
use arrow::types::{f16, NativeType};
use arrow::types::NativeType;
use num_traits::NumCast;
use simd_json::{BorrowedValue, StaticNode};

Expand Down Expand Up @@ -69,6 +68,27 @@ fn deserialize_utf8_into<'a, O: Offset, A: Borrow<BorrowedValue<'a>>>(
}
}

fn deserialize_utf8view_into<'a, A: Borrow<BorrowedValue<'a>>>(
target: &mut MutableBinaryViewArray<str>,
rows: &[A],
) {
let mut scratch = String::new();
for row in rows {
match row.borrow() {
BorrowedValue::String(v) => target.push_value(v.as_ref()),
BorrowedValue::Static(StaticNode::Bool(v)) => {
target.push_value(if *v { "true" } else { "false" })
},
BorrowedValue::Static(node) if !matches!(node, StaticNode::Null) => {
write!(scratch, "{node}").unwrap();
target.push_value(scratch.as_str());
scratch.clear();
},
_ => target.push_null(),
}
}
}

fn deserialize_list<'a, A: Borrow<BorrowedValue<'a>>>(
rows: &[A],
data_type: ArrowDataType,
Expand Down Expand Up @@ -106,104 +126,6 @@ fn deserialize_list<'a, A: Borrow<BorrowedValue<'a>>>(
ListArray::<i64>::new(data_type, offsets.into(), values, validity.into())
}

// TODO: due to nesting, deduplicating this from the above is trickier than
// other `deserialize_xxx_into` functions. Punting on that for now.
fn deserialize_list_into<'a, A: Borrow<BorrowedValue<'a>>>(
target: &mut MutableListArray<i64, Box<dyn MutableArray>>,
rows: &[A],
) {
let empty = vec![];
let inner: Vec<_> = rows
.iter()
.flat_map(|row| match row.borrow() {
BorrowedValue::Array(value) => value.iter(),
_ => empty.iter(),
})
.collect();

deserialize_into(target.mut_values(), &inner);

let lengths = rows.iter().map(|row| match row.borrow() {
BorrowedValue::Array(value) => Some(value.len()),
_ => None,
});

target
.try_extend_from_lengths(lengths)
.expect("Offsets overflow");
}

fn primitive_dispatch<'a, A: Borrow<BorrowedValue<'a>>, T: NativeType>(
target: &mut Box<dyn MutableArray>,
rows: &[A],
deserialize_into: fn(&mut MutablePrimitiveArray<T>, &[A]) -> (),
) {
generic_deserialize_into(target, rows, deserialize_into)
}

fn generic_deserialize_into<'a, A: Borrow<BorrowedValue<'a>>, M: 'static>(
target: &mut Box<dyn MutableArray>,
rows: &[A],
deserialize_into: fn(&mut M, &[A]) -> (),
) {
deserialize_into(target.as_mut_any().downcast_mut::<M>().unwrap(), rows);
}

/// Deserialize `rows` by extending them into the given `target`
fn deserialize_into<'a, A: Borrow<BorrowedValue<'a>>>(
target: &mut Box<dyn MutableArray>,
rows: &[A],
) {
match target.data_type() {
ArrowDataType::Boolean => generic_deserialize_into(target, rows, deserialize_boolean_into),
ArrowDataType::Float32 => {
primitive_dispatch::<_, f32>(target, rows, deserialize_primitive_into)
},
ArrowDataType::Float64 => {
primitive_dispatch::<_, f64>(target, rows, deserialize_primitive_into)
},
ArrowDataType::Int8 => {
primitive_dispatch::<_, i8>(target, rows, deserialize_primitive_into)
},
ArrowDataType::Int16 => {
primitive_dispatch::<_, i16>(target, rows, deserialize_primitive_into)
},
ArrowDataType::Int32 => {
primitive_dispatch::<_, i32>(target, rows, deserialize_primitive_into)
},
ArrowDataType::Int64 => {
primitive_dispatch::<_, i64>(target, rows, deserialize_primitive_into)
},
ArrowDataType::UInt8 => {
primitive_dispatch::<_, u8>(target, rows, deserialize_primitive_into)
},
ArrowDataType::UInt16 => {
primitive_dispatch::<_, u16>(target, rows, deserialize_primitive_into)
},
ArrowDataType::UInt32 => {
primitive_dispatch::<_, u32>(target, rows, deserialize_primitive_into)
},
ArrowDataType::UInt64 => {
primitive_dispatch::<_, u64>(target, rows, deserialize_primitive_into)
},
ArrowDataType::LargeUtf8 => generic_deserialize_into::<_, MutableUtf8Array<i64>>(
target,
rows,
deserialize_utf8_into,
),
ArrowDataType::LargeList(_) => deserialize_list_into(
target
.as_mut_any()
.downcast_mut::<MutableListArray<i64, Box<dyn MutableArray>>>()
.unwrap(),
rows,
),
_ => {
todo!()
},
}
}

fn deserialize_struct<'a, A: Borrow<BorrowedValue<'a>>>(
rows: &[A],
data_type: ArrowDataType,
Expand Down Expand Up @@ -287,6 +209,15 @@ impl Container for MutableFixedSizeBinaryArray {
}
}

impl Container for MutableBinaryViewArray<str> {
fn with_capacity(capacity: usize) -> Self
where
Self: Sized,
{
MutableBinaryViewArray::with_capacity(capacity)
}
}

impl<O: Offset, M: MutableArray + Default + 'static> Container for MutableListArray<O, M> {
fn with_capacity(capacity: usize) -> Self {
MutableListArray::with_capacity(capacity)
Expand Down Expand Up @@ -399,6 +330,9 @@ pub(crate) fn _deserialize<'a, A: Borrow<BorrowedValue<'a>>>(
ArrowDataType::LargeUtf8 => {
fill_generic_array_from::<_, _, Utf8Array<i64>>(deserialize_utf8_into, rows)
},
ArrowDataType::Utf8View => {
fill_generic_array_from::<_, _, Utf8ViewArray>(deserialize_utf8view_into, rows)
},
ArrowDataType::LargeList(_) => Box::new(deserialize_list(rows, data_type)),
ArrowDataType::LargeBinary => Box::new(deserialize_binary(rows)),
ArrowDataType::Struct(_) => Box::new(deserialize_struct(rows, data_type)),
Expand All @@ -415,87 +349,3 @@ pub fn deserialize(json: &BorrowedValue, data_type: ArrowDataType) -> PolarsResu
_ => Ok(_deserialize(&[json], data_type)),
}
}

fn allocate_array(f: &Field) -> Box<dyn MutableArray> {
match f.data_type() {
ArrowDataType::Int8 => Box::new(MutablePrimitiveArray::<i8>::new()),
ArrowDataType::Int16 => Box::new(MutablePrimitiveArray::<i16>::new()),
ArrowDataType::Int32 => Box::new(MutablePrimitiveArray::<i32>::new()),
ArrowDataType::Int64 => Box::new(MutablePrimitiveArray::<i64>::new()),
ArrowDataType::UInt8 => Box::new(MutablePrimitiveArray::<u8>::new()),
ArrowDataType::UInt16 => Box::new(MutablePrimitiveArray::<u16>::new()),
ArrowDataType::UInt32 => Box::new(MutablePrimitiveArray::<u32>::new()),
ArrowDataType::UInt64 => Box::new(MutablePrimitiveArray::<u64>::new()),
ArrowDataType::Float16 => Box::new(MutablePrimitiveArray::<f16>::new()),
ArrowDataType::Float32 => Box::new(MutablePrimitiveArray::<f32>::new()),
ArrowDataType::Float64 => Box::new(MutablePrimitiveArray::<f64>::new()),
ArrowDataType::LargeList(inner) => match inner.data_type() {
ArrowDataType::LargeList(_) => Box::new(MutableListArray::<i64, _>::new_from(
allocate_array(inner),
inner.data_type().clone(),
0,
)),
_ => allocate_array(inner),
},
_ => todo!(),
}
}

/// Deserializes a `json` [`simd_json::value::Value`] serialized in Pandas record format into
/// a [`Chunk`].
///
/// Uses the `Schema` provided, which can be inferred from arbitrary JSON with
/// [`infer_records_schema`].
///
/// This is CPU-bounded.
///
/// # Errors
///
/// This function errors iff either:
///
/// * `json` is not an [`Array`]
/// * `data_type` contains any incompatible types:
/// * [`ArrowDataType::Struct`]
/// * [`ArrowDataType::Dictionary`]
/// * [`ArrowDataType::LargeList`]
pub fn deserialize_records(
json: &BorrowedValue,
schema: &ArrowSchema,
) -> PolarsResult<Chunk<ArrayRef>> {
let mut results = schema
.fields
.iter()
.map(|f| (f.name.as_str(), allocate_array(f)))
.collect::<PlHashMap<_, _>>();

match json {
BorrowedValue::Array(rows) => {
for row in rows.iter() {
match row {
BorrowedValue::Object(record) => {
for (key, value) in record.iter() {
let arr = results.get_mut(key.as_ref()).ok_or_else(|| {
PolarsError::ComputeError(format!("unexpected key: '{key}'").into())
})?;
deserialize_into(arr, &[value]);
}
},
_ => {
return Err(PolarsError::ComputeError(
"each row must be an Object".into(),
))
},
}
}
},
_ => {
return Err(PolarsError::ComputeError(
"outer type must be an Array".into(),
))
},
}

Ok(Chunk::new(
results.into_values().map(|mut ma| ma.as_box()).collect(),
))
}
21 changes: 18 additions & 3 deletions crates/polars-json/src/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ fn utf8_serializer<'a, O: Offset>(
materialize_serializer(f, array.iter(), offset, take)
}

fn utf8view_serializer<'a>(
array: &'a Utf8ViewArray,
offset: usize,
take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
if let Some(x) = x {
utf8::write_str(buf, x).unwrap();
} else {
buf.extend_from_slice(b"null")
}
};
materialize_serializer(f, array.iter(), offset, take)
}

fn struct_serializer<'a>(
array: &'a StructArray,
offset: usize,
Expand Down Expand Up @@ -406,12 +421,12 @@ pub(crate) fn new_serializer<'a>(
ArrowDataType::Float64 => {
float_serializer::<f64>(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Utf8 => {
utf8_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::LargeUtf8 => {
utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Utf8View => {
utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
ArrowDataType::Struct(_) => {
struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
},
Expand Down

0 comments on commit 4fe54d6

Please sign in to comment.