Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Validity taken into account when writing StructArray to json #511

Merged
merged 13 commits into from
Oct 10, 2021
Merged
107 changes: 87 additions & 20 deletions src/io/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use serde_json::map::Map;
use serde_json::{Number, Value};

use crate::bitmap::utils::zip_validity;
use crate::bitmap::Bitmap;
use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType};

trait JsonSerializable {
Expand Down Expand Up @@ -102,7 +104,11 @@ where
array.iter().map(to_json).collect()
}

fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<Map<String, Value>> {
fn struct_array_to_jsonmap_array(
array: &StructArray,
validity: Option<&Bitmap>,
row_count: usize,
) -> Vec<Map<String, Value>> {
// {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}
// [
// {"a": 1, "b": a, "c": {"a": 1}},
Expand All @@ -117,7 +123,6 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<M
.take(row_count)
.collect::<Vec<Map<String, Value>>>();

// todo: use validity...
array
.values()
.iter()
Expand All @@ -127,6 +132,7 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<M
&mut inner_objs,
row_count,
struct_col.as_ref(),
combine_validity(struct_col.validity(), validity).as_ref(),
fields[j].name(),
);
});
Expand Down Expand Up @@ -200,6 +206,7 @@ fn write_array(array: &dyn Array) -> Value {
DataType::Struct(_) => {
let jsonmaps = struct_array_to_jsonmap_array(
array.as_any().downcast_ref::<StructArray>().unwrap(),
array.validity(),
array.len(),
);
jsonmaps.into_iter().map(Value::Object).collect()
Expand All @@ -217,12 +224,16 @@ fn set_column_by_primitive_type<T: NativeType + JsonSerializable>(
rows: &mut [Map<String, Value>],
row_count: usize,
array: &dyn Array,
validity: Option<&Bitmap>,
col_name: &str,
) {
let primitive_arr = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();

rows.iter_mut()
.zip(primitive_arr.iter())
.zip(zip_validity(
primitive_arr.values().iter(),
validity.map(|v| v.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
let value = to_json::<T>(value);
Expand All @@ -234,6 +245,7 @@ fn set_column_for_json_rows(
rows: &mut [Map<String, Value>],
row_count: usize,
array: &dyn Array,
validity: Option<&Bitmap>,
col_name: &str,
) {
match array.data_type() {
Expand All @@ -243,7 +255,10 @@ fn set_column_for_json_rows(
DataType::Boolean => {
let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
rows.iter_mut()
.zip(array.iter())
.zip(zip_validity(
array.values().iter(),
validity.map(|v| v.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
row.insert(
Expand All @@ -252,20 +267,43 @@ fn set_column_for_json_rows(
);
});
}
DataType::Int8 => set_column_by_primitive_type::<i8>(rows, row_count, array, col_name),
DataType::Int16 => set_column_by_primitive_type::<i16>(rows, row_count, array, col_name),
DataType::Int32 => set_column_by_primitive_type::<i32>(rows, row_count, array, col_name),
DataType::Int64 => set_column_by_primitive_type::<i64>(rows, row_count, array, col_name),
DataType::UInt8 => set_column_by_primitive_type::<u8>(rows, row_count, array, col_name),
DataType::UInt16 => set_column_by_primitive_type::<u16>(rows, row_count, array, col_name),
DataType::UInt32 => set_column_by_primitive_type::<u32>(rows, row_count, array, col_name),
DataType::UInt64 => set_column_by_primitive_type::<u64>(rows, row_count, array, col_name),
DataType::Float32 => set_column_by_primitive_type::<f32>(rows, row_count, array, col_name),
DataType::Float64 => set_column_by_primitive_type::<f64>(rows, row_count, array, col_name),
DataType::Int8 => {
set_column_by_primitive_type::<i8>(rows, row_count, array, validity, col_name)
}
DataType::Int16 => {
set_column_by_primitive_type::<i16>(rows, row_count, array, validity, col_name)
}
DataType::Int32 => {
set_column_by_primitive_type::<i32>(rows, row_count, array, validity, col_name)
}
DataType::Int64 => {
set_column_by_primitive_type::<i64>(rows, row_count, array, validity, col_name)
}
DataType::UInt8 => {
set_column_by_primitive_type::<u8>(rows, row_count, array, validity, col_name)
}
DataType::UInt16 => {
set_column_by_primitive_type::<u16>(rows, row_count, array, validity, col_name)
}
DataType::UInt32 => {
set_column_by_primitive_type::<u32>(rows, row_count, array, validity, col_name)
}
DataType::UInt64 => {
set_column_by_primitive_type::<u64>(rows, row_count, array, validity, col_name)
}
DataType::Float32 => {
set_column_by_primitive_type::<f32>(rows, row_count, array, validity, col_name)
}
DataType::Float64 => {
set_column_by_primitive_type::<f64>(rows, row_count, array, validity, col_name)
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
rows.iter_mut()
.zip(array.iter())
.zip(zip_validity(
array.values_iter(),
validity.map(|v| v.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
row.insert(
Expand All @@ -279,7 +317,10 @@ fn set_column_for_json_rows(
DataType::LargeUtf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
rows.iter_mut()
.zip(array.iter())
.zip(zip_validity(
array.values().iter(),
validity.map(|v| v.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
row.insert(
Expand All @@ -292,7 +333,11 @@ fn set_column_for_json_rows(
}
DataType::Struct(_) => {
let array = array.as_any().downcast_ref::<StructArray>().unwrap();
let inner_objs = struct_array_to_jsonmap_array(array, row_count);
let inner_objs = struct_array_to_jsonmap_array(
array,
combine_validity(validity, array.validity()).as_ref(),
row_count,
);
rows.iter_mut()
.take(row_count)
.zip(inner_objs.into_iter())
Expand All @@ -303,7 +348,10 @@ fn set_column_for_json_rows(
DataType::List(_) => {
let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
rows.iter_mut()
.zip(array.iter())
.zip(zip_validity(
array.values_iter(),
validity.map(|x| x.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
row.insert(
Expand All @@ -317,7 +365,10 @@ fn set_column_for_json_rows(
DataType::LargeList(_) => {
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
rows.iter_mut()
.zip(array.iter())
.zip(zip_validity(
array.values_iter(),
validity.map(|x| x.iter()),
))
.take(row_count)
.for_each(|(row, value)| {
row.insert(
Expand Down Expand Up @@ -365,10 +416,26 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec<Map<String, Value>>
let row_count = batch.num_rows();
batch.columns().iter().enumerate().for_each(|(j, col)| {
let col_name = schema.field(j).name();
set_column_for_json_rows(&mut rows[base..], row_count, col.as_ref(), col_name);
set_column_for_json_rows(
&mut rows[base..],
row_count,
col.as_ref(),
col.validity(),
col_name,
);
});
base += row_count;
});
}
rows
}

/// Merge two optional validity [`Bitmap`]s into one by intersecting them.
///
/// A slot is valid in the result only when it is valid in *both* inputs
/// (bitwise `and`). When only one bitmap is present, the result is a clone
/// of it; when neither is present, the result is `None` (all slots valid).
fn combine_validity(validity1: Option<&Bitmap>, validity2: Option<&Bitmap>) -> Option<Bitmap> {
    match (validity1, validity2) {
        // Both present: intersect via the `BitAnd` impl on `&Bitmap`.
        (Some(lhs), Some(rhs)) => Some(lhs & rhs),
        // Exactly one present: it alone determines validity.
        (Some(only), None) | (None, Some(only)) => Some(only.clone()),
        // Neither present: every slot is implicitly valid.
        (None, None) => None,
    }
}
49 changes: 49 additions & 0 deletions tests/it/io/json/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,55 @@ fn write_simple_rows() {
);
}

#[test]
fn write_nested_struct_with_validity() {
    // Inner struct `c12` has two non-nullable children, but the struct
    // itself carries a validity bitmap — children must be masked by it.
    let inner = vec![
        Field::new("c121", DataType::Utf8, false),
        Field::new("c122", DataType::Int32, false),
    ];
    let fields = vec![
        Field::new("c11", DataType::Int32, false),
        Field::new("c12", DataType::Struct(inner.clone()), false),
    ];
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Struct(fields.clone()), false),
        Field::new("c2", DataType::Utf8, false),
    ]);

    let c1 = StructArray::from_data(
        DataType::Struct(fields),
        vec![
            Arc::new(Int32Array::from(&[Some(1), None, Some(5)])),
            Arc::new(StructArray::from_data(
                DataType::Struct(inner),
                vec![
                    Arc::new(Utf8Array::<i32>::from(&vec![None, Some("f"), Some("g")])),
                    Arc::new(Int32Array::from(&[Some(20), None, Some(43)])),
                ],
                // Row 0 of `c12` is null: its children must serialize as null.
                Some(Bitmap::from([false, true, true])),
            )),
        ],
        // Row 2 of `c1` is null: all nested values in that row must be null.
        Some(Bitmap::from([true, true, false])),
    );
    let c2 = Utf8Array::<i32>::from(&vec![Some("a"), Some("b"), Some("c")]);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[batch]).unwrap();
    }

    // Expected: parent and child validity bitmaps are intersected, so nested
    // values are null wherever any enclosing struct is null.
    assert_eq!(
        String::from_utf8(buf).unwrap(),
        r#"{"c1":{"c11":1,"c12":{"c121":null,"c122":null}},"c2":"a"}
{"c1":{"c11":null,"c12":{"c121":"f","c122":null}},"c2":"b"}
{"c1":{"c11":null,"c12":{"c121":null,"c122":null}},"c2":"c"}
"#
    );
}
VasanthakumarV marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn write_nested_structs() {
let c121 = Field::new("c121", DataType::Utf8, false);
Expand Down