This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Validity taken into account when writing StructArray to json #511

Merged · 13 commits · Oct 10, 2021
src/io/json/write/serialize.rs — 50 changes: 43 additions & 7 deletions

```diff
@@ -18,6 +18,8 @@
 use serde_json::map::Map;
 use serde_json::{Number, Value};
 
+use crate::bitmap::utils::zip_validity;
+use crate::bitmap::Bitmap;
 use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType};
 
 trait JsonSerializable {
```

```diff
@@ -117,7 +119,17 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<Map<String, Value>>
         .take(row_count)
         .collect::<Vec<Map<String, Value>>>();
 
-    // todo: use validity...
+    // Combine the two incoming validity `Bitmap`s using `and` logic
+    let combine_validity = |col_validity: Option<&Bitmap>, array_validity: Option<&Bitmap>| match (
+        col_validity,
+        array_validity,
+    ) {
+        (Some(cv), Some(av)) => Some(cv & av),
+        (Some(cv), None) => Some(cv.clone()),
+        (None, Some(av)) => Some(av.clone()),
+        (None, None) => None,
+    };
 
     array
         .values()
         .iter()
```
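
For intuition, here is a standalone sketch of what `combine_validity` computes, assuming arrow2's public `Bitmap` type (which implements `FromIterator<bool>`, bitwise `&` on references as used in the closure above, and an `iter()` over `bool`): a slot stays valid only when both the child column and the parent `StructArray` consider it valid.

```rust
use arrow2::bitmap::Bitmap;

fn main() {
    // Validity of a child column and of the parent StructArray.
    let col_validity: Bitmap = [true, true, false, true].iter().copied().collect();
    let array_validity: Bitmap = [true, false, true, true].iter().copied().collect();

    // `and` logic, as in the closure above: a slot survives only if it is
    // valid in both bitmaps.
    let combined = &col_validity & &array_validity;

    assert_eq!(
        combined.iter().collect::<Vec<bool>>(),
        vec![true, false, false, true]
    );
}
```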

```diff
@@ -127,6 +139,8 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<Map<String, Value>>
                 &mut inner_objs,
                 row_count,
                 struct_col.as_ref(),
+                // For each column we combine its validity with the `StructArray`'s validity
+                combine_validity(struct_col.validity(), array.validity()).as_ref(),
                 fields[j].name(),
             );
         });
```

```diff
@@ -234,6 +248,7 @@ fn set_column_for_json_rows(
     rows: &mut [Map<String, Value>],
     row_count: usize,
     array: &dyn Array,
+    validity: Option<&Bitmap>,
     col_name: &str,
 ) {
     match array.data_type() {
```

```diff
@@ -243,7 +258,10 @@
         DataType::Boolean => {
             let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
             rows.iter_mut()
-                .zip(array.iter())
+                .zip(zip_validity(
+                    array.values().iter(),
+                    validity.map(|v| v.iter()),
+                ))
                 .take(row_count)
                 .for_each(|(row, value)| {
                     row.insert(
```
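
The same pattern repeats for every branch below: instead of the array's own null-aware `iter()`, the raw values iterator is zipped against the combined validity via `zip_validity`. A sketch of its behavior, assuming the helper as exported from `arrow2::bitmap::utils` with the signature used above (a values iterator plus an `Option<BitmapIter>`, yielding `Option<T>` per slot and treating a missing bitmap as all-valid):

```rust
use arrow2::bitmap::{utils::zip_validity, Bitmap};

fn main() {
    let values = [10i32, 20, 30];
    let validity: Bitmap = [true, false, true].iter().copied().collect();

    // With a bitmap: null slots come out as `None`.
    let masked: Vec<Option<&i32>> =
        zip_validity(values.iter(), Some(validity.iter())).collect();
    assert_eq!(masked, vec![Some(&10), None, Some(&30)]);

    // Without a bitmap: every slot is considered valid.
    let unmasked: Vec<Option<&i32>> = zip_validity(values.iter(), None).collect();
    assert_eq!(unmasked, vec![Some(&10), Some(&20), Some(&30)]);
}
```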

```diff
@@ -265,7 +283,10 @@
         DataType::Utf8 => {
             let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
             rows.iter_mut()
-                .zip(array.iter())
+                .zip(zip_validity(
+                    array.values().iter(),
+                    validity.map(|v| v.iter()),
+                ))
                 .take(row_count)
                 .for_each(|(row, value)| {
                     row.insert(
```

```diff
@@ -279,7 +300,10 @@
         DataType::LargeUtf8 => {
             let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
             rows.iter_mut()
-                .zip(array.iter())
+                .zip(zip_validity(
+                    array.values().iter(),
+                    validity.map(|v| v.iter()),
+                ))
                 .take(row_count)
                 .for_each(|(row, value)| {
                     row.insert(
```

```diff
@@ -303,7 +327,10 @@
         DataType::List(_) => {
             let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
             rows.iter_mut()
-                .zip(array.iter())
+                .zip(zip_validity(
+                    array.values_iter(),
+                    validity.map(|x| x.iter()),
+                ))
                 .take(row_count)
                 .for_each(|(row, value)| {
                     row.insert(
```

```diff
@@ -317,7 +344,10 @@
         DataType::LargeList(_) => {
             let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
             rows.iter_mut()
-                .zip(array.iter())
+                .zip(zip_validity(
+                    array.values_iter(),
+                    validity.map(|x| x.iter()),
+                ))
                 .take(row_count)
                 .for_each(|(row, value)| {
                     row.insert(
```

```diff
@@ -365,7 +395,13 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec<Map<String, Value>>
         let row_count = batch.num_rows();
         batch.columns().iter().enumerate().for_each(|(j, col)| {
             let col_name = schema.field(j).name();
-            set_column_for_json_rows(&mut rows[base..], row_count, col.as_ref(), col_name);
+            set_column_for_json_rows(
+                &mut rows[base..],
+                row_count,
+                col.as_ref(),
+                None,
+                col_name,
+            );
         });
         base += row_count;
     });
```
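
End to end, the change means a `StructArray`'s own nulls now mask its children's values when serialized to JSON. The sketch below is a hypothetical usage example, not code from this PR: the `StructArray::from_data` signature and the re-export path of `write_record_batches` are assumptions about the arrow2 API of this era.

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array, StructArray};
use arrow2::bitmap::Bitmap;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::json::write::write_record_batches; // assumed re-export path
use arrow2::record_batch::RecordBatch;

fn main() -> Result<()> {
    let fields = vec![Field::new("a", DataType::Int32, true)];
    let child = Int32Array::from_slice(&[1, 2, 3]);

    // Row 1 of the struct itself is null, even though the child holds `2`.
    let validity: Bitmap = [true, false, true].iter().copied().collect();
    let strukt = StructArray::from_data(
        fields.clone(),
        vec![Arc::new(child) as Arc<dyn Array>],
        Some(validity),
    );

    let schema = Arc::new(Schema::new(vec![Field::new(
        "s",
        DataType::Struct(fields),
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(strukt) as Arc<dyn Array>])?;

    // Before this PR, row 1 would leak `{"a": 2}`; with the combined
    // validity, the field serializes as null instead.
    let rows = write_record_batches(&[batch]);
    println!("{:#?}", rows);
    Ok(())
}
```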