From 916bd84aa8821925476b65548040e2956178365c Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Fri, 8 Oct 2021 23:21:32 +0530 Subject: [PATCH 01/13] Use validity when serializing `StructArray` --- src/io/json/write/serialize.rs | 50 +++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 708019b6f89..13cc9ec3b08 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -18,6 +18,8 @@ use serde_json::map::Map; use serde_json::{Number, Value}; +use crate::bitmap::utils::zip_validity; +use crate::bitmap::Bitmap; use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType}; trait JsonSerializable { @@ -117,7 +119,17 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec>>(); - // todo: use validity... + // Combine the two incoming validity `Bitmaps` using `and` logic + let combine_validity = |col_validity: Option<&Bitmap>, array_validity: Option<&Bitmap>| match ( + col_validity, + array_validity, + ) { + (Some(cv), Some(av)) => Some(cv & av), + (Some(cv), None) => Some(cv.clone()), + (None, Some(av)) => Some(av.clone()), + (None, None) => None, + }; + array .values() .iter() @@ -127,6 +139,8 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec], row_count: usize, array: &dyn Array, + validity: Option<&Bitmap>, col_name: &str, ) { match array.data_type() { @@ -243,7 +258,10 @@ fn set_column_for_json_rows( DataType::Boolean => { let array = array.as_any().downcast_ref::().unwrap(); rows.iter_mut() - .zip(array.iter()) + .zip(zip_validity( + array.values().iter(), + validity.map(|v| v.iter()), + )) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -265,7 +283,10 @@ fn set_column_for_json_rows( DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(array.iter()) + .zip(zip_validity( + array.values().iter(), + validity.map(|v| v.iter()), + )) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -279,7 +300,10 @@ fn set_column_for_json_rows( DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(array.iter()) + .zip(zip_validity( + array.values().iter(), + validity.map(|v| v.iter()), + )) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -303,7 +327,10 @@ fn set_column_for_json_rows( DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(array.iter()) + .zip(zip_validity( + array.values_iter(), + validity.map(|x| x.iter()), + )) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -317,7 +344,10 @@ fn set_column_for_json_rows( DataType::LargeList(_) => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(array.iter()) + .zip(zip_validity( + array.values_iter(), + validity.map(|x| x.iter()), + )) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -365,7 +395,13 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec> let row_count = batch.num_rows(); batch.columns().iter().enumerate().for_each(|(j, col)| { let col_name = schema.field(j).name(); - set_column_for_json_rows(&mut rows[base..], row_count, col.as_ref(), col_name); + set_column_for_json_rows( + &mut rows[base..], + row_count, + col.as_ref(), + None, + col_name, + ); }); base += row_count; }); From 4cc39ace04606167172a81a844e87ed15a50565c Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Fri, 8 Oct 2021 23:42:39 +0530 Subject: [PATCH 02/13] Pass validity when serializing record batches --- src/io/json/write/serialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 13cc9ec3b08..b0675324091 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -399,7 +399,7 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec> &mut rows[base..], row_count, col.as_ref(), - None, + col.validity(), col_name, ); }); From 9a21d894d3a33acfb984b464c19ecb6b953202cb Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sat, 9 Oct 2021 00:28:13 +0530 Subject: [PATCH 03/13] Pass validity for serializing primitive array --- src/io/json/write/serialize.rs | 46 ++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index b0675324091..c5cd4a7b94e 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -231,12 +231,16 @@ fn set_column_by_primitive_type( rows: &mut [Map], row_count: usize, array: &dyn Array, + validity: Option<&Bitmap>, col_name: &str, ) { let primitive_arr = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(primitive_arr.iter()) + .zip(zip_validity( + primitive_arr.values().iter(), + validity.map(|v| v.iter()), + )) .take(row_count) .for_each(|(row, value)| { let value = to_json::(value); @@ -270,16 +274,36 @@ fn set_column_for_json_rows( ); }); } - DataType::Int8 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::Int16 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::Int32 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::Int64 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::UInt8 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::UInt16 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::UInt32 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::UInt64 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::Float32 => set_column_by_primitive_type::(rows, row_count, array, col_name), - DataType::Float64 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Int8 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::Int16 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::Int32 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::Int64 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::UInt8 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::UInt16 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::UInt32 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::UInt64 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::Float32 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } + DataType::Float64 => { + set_column_by_primitive_type::(rows, row_count, array, validity, col_name) + } DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() From 36e1032111212724e3e985daafbd61755fa5505e Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sat, 9 Oct 2021 00:28:42 +0530 Subject: [PATCH 04/13] Use the correct iterator for Utf8 values --- src/io/json/write/serialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index c5cd4a7b94e..e6fa360e882 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -308,7 +308,7 @@ fn set_column_for_json_rows( let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() .zip(zip_validity( - array.values().iter(), + array.values_iter(), validity.map(|v| v.iter()), )) .take(row_count) From 3be5049c23eb213cdaa654db553ac2b93d26e2ff Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sat, 9 Oct 2021 13:40:26 +0530 Subject: [PATCH 05/13] Propagate validity across nested `StructArray`s --- src/io/json/write/serialize.rs | 37 ++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index e6fa360e882..589732dda0f 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -104,7 +104,11 @@ where array.iter().map(to_json).collect() } -fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec> { +fn struct_array_to_jsonmap_array( + array: &StructArray, + validity: Option<&Bitmap>, + row_count: usize, +) -> Vec> { // {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}} // [ // {"a": 1, "b": a, "c": {"a": 1}}, @@ -119,17 +123,6 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec>>(); - // Combine the two incoming validity `Bitmaps` using `and` logic - let combine_validity = |col_validity: Option<&Bitmap>, array_validity: Option<&Bitmap>| match ( - col_validity, - array_validity, - ) { - (Some(cv), Some(av)) => Some(cv & av), - (Some(cv), None) => Some(cv.clone()), - (None, Some(av)) => Some(av.clone()), - (None, None) => None, - }; - array .values() .iter() @@ -139,8 +132,7 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec Value { DataType::Struct(_) => { let jsonmaps = struct_array_to_jsonmap_array( array.as_any().downcast_ref::().unwrap(), + array.validity(), array.len(), ); jsonmaps.into_iter().map(Value::Object).collect() @@ -340,7 +333,11 @@ fn set_column_for_json_rows( } DataType::Struct(_) => { let array = array.as_any().downcast_ref::().unwrap(); - let inner_objs = struct_array_to_jsonmap_array(array, row_count); + let inner_objs = struct_array_to_jsonmap_array( + array, + combine_validity(validity, array.validity()).as_ref(), + row_count, + ); rows.iter_mut() .take(row_count) .zip(inner_objs.into_iter()) @@ -432,3 +429,13 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec> } rows } + +// Combine the two incoming validity `Bitmaps` using `and` logic +fn combine_validity(validity1: Option<&Bitmap>, validity2: Option<&Bitmap>) -> Option { + match (validity1, validity2) { + (Some(v1), Some(v2)) => Some(v1 & v2), + (Some(v1), None) => Some(v1.clone()), + (None, Some(v2)) => Some(v2.clone()), + (None, None) => None, + } +} From 7ad5f8448b5aedc42bb4ad23247dd878338dd8cb Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sat, 9 Oct 2021 13:41:14 +0530 Subject: [PATCH 06/13] Add test for serializing nested `StructArray` with validity --- tests/it/io/json/write.rs | 49 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index f2131778503..e744206ba52 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -38,6 +38,55 @@ fn write_simple_rows() { ); } +#[test] +fn write_nested_struct_with_validity() { + let inner = vec![ + Field::new("c121", DataType::Utf8, false), + Field::new("c122", DataType::Int32, false), + ]; + let fields = vec![ + Field::new("c11", DataType::Int32, false), + Field::new("c12", DataType::Struct(inner.clone()), false), + ]; + let schema = Schema::new(vec![ + Field::new("c1", DataType::Struct(fields.clone()), false), + Field::new("c2", DataType::Utf8, false), + ]); + + let c1 = StructArray::from_data( + DataType::Struct(fields), + vec![ + Arc::new(Int32Array::from(&[Some(1), None, Some(5)])), + Arc::new(StructArray::from_data( + DataType::Struct(inner), + vec![ + Arc::new(Utf8Array::::from(&vec![None, Some("f"), Some("g")])), + Arc::new(Int32Array::from(&[Some(20), None, Some(43)])), + ], + Some(Bitmap::from([false, true, true])), + )), + ], + Some(Bitmap::from([true, true, false])), + ); + let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); + + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + + let mut buf = Vec::new(); + { + let mut writer = LineDelimitedWriter::new(&mut buf); + writer.write_batches(&[batch]).unwrap(); + } + + assert_eq!( + String::from_utf8(buf).unwrap(), + r#"{"c1":{"c11":1,"c12":{"c121":null,"c122":null}},"c2":"a"} +{"c1":{"c11":null,"c12":{"c121":"f","c122":null}},"c2":"b"} +{"c1":{"c11":null,"c12":{"c121":null,"c122":null}},"c2":"c"} +"# + ); +} + #[test] fn write_nested_structs() { let c121 = Field::new("c121", DataType::Utf8, false); From 452631846fd76f0c12ee11bc78e7550eb30202ec Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 11:45:50 +0530 Subject: [PATCH 07/13] Revert changes for adding validity to serialization --- src/io/json/write/serialize.rs | 107 ++++++--------------------------- 1 file changed, 20 insertions(+), 87 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 589732dda0f..708019b6f89 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -18,8 +18,6 @@ use serde_json::map::Map; use serde_json::{Number, Value}; -use crate::bitmap::utils::zip_validity; -use crate::bitmap::Bitmap; use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType}; trait JsonSerializable { @@ -104,11 +102,7 @@ where array.iter().map(to_json).collect() } -fn struct_array_to_jsonmap_array( - array: &StructArray, - validity: Option<&Bitmap>, - row_count: usize, -) -> Vec> { +fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec> { // {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}} // [ // {"a": 1, "b": a, "c": {"a": 1}}, @@ -123,6 +117,7 @@ fn struct_array_to_jsonmap_array( .take(row_count) .collect::>>(); + // todo: use validity... array .values() .iter() @@ -132,7 +127,6 @@ fn struct_array_to_jsonmap_array( &mut inner_objs, row_count, struct_col.as_ref(), - combine_validity(struct_col.validity(), validity).as_ref(), fields[j].name(), ); }); @@ -206,7 +200,6 @@ fn write_array(array: &dyn Array) -> Value { DataType::Struct(_) => { let jsonmaps = struct_array_to_jsonmap_array( array.as_any().downcast_ref::().unwrap(), - array.validity(), array.len(), ); jsonmaps.into_iter().map(Value::Object).collect() @@ -224,16 +217,12 @@ fn set_column_by_primitive_type( rows: &mut [Map], row_count: usize, array: &dyn Array, - validity: Option<&Bitmap>, col_name: &str, ) { let primitive_arr = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(zip_validity( - primitive_arr.values().iter(), - validity.map(|v| v.iter()), - )) + .zip(primitive_arr.iter()) .take(row_count) .for_each(|(row, value)| { let value = to_json::(value); @@ -245,7 +234,6 @@ fn set_column_for_json_rows( rows: &mut [Map], row_count: usize, array: &dyn Array, - validity: Option<&Bitmap>, col_name: &str, ) { match array.data_type() { @@ -255,10 +243,7 @@ fn set_column_for_json_rows( DataType::Boolean => { let array = array.as_any().downcast_ref::().unwrap(); rows.iter_mut() - .zip(zip_validity( - array.values().iter(), - validity.map(|v| v.iter()), - )) + .zip(array.iter()) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -267,43 +252,20 @@ fn set_column_for_json_rows( ); }); } - DataType::Int8 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::Int16 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::Int32 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::Int64 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::UInt8 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::UInt16 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::UInt32 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::UInt64 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::Float32 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } - DataType::Float64 => { - set_column_by_primitive_type::(rows, row_count, array, validity, col_name) - } + DataType::Int8 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Int16 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Int32 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Int64 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::UInt8 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::UInt16 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::UInt32 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::UInt64 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Float32 => set_column_by_primitive_type::(rows, row_count, array, col_name), + DataType::Float64 => set_column_by_primitive_type::(rows, row_count, array, col_name), DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(zip_validity( - array.values_iter(), - validity.map(|v| v.iter()), - )) + .zip(array.iter()) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -317,10 +279,7 @@ fn set_column_for_json_rows( DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(zip_validity( - array.values().iter(), - validity.map(|v| v.iter()), - )) + .zip(array.iter()) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -333,11 +292,7 @@ fn set_column_for_json_rows( } DataType::Struct(_) => { let array = array.as_any().downcast_ref::().unwrap(); - let inner_objs = struct_array_to_jsonmap_array( - array, - combine_validity(validity, array.validity()).as_ref(), - row_count, - ); + let inner_objs = struct_array_to_jsonmap_array(array, row_count); rows.iter_mut() .take(row_count) .zip(inner_objs.into_iter()) @@ -348,10 +303,7 @@ fn set_column_for_json_rows( DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(zip_validity( - array.values_iter(), - validity.map(|x| x.iter()), - )) + .zip(array.iter()) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -365,10 +317,7 @@ fn set_column_for_json_rows( DataType::LargeList(_) => { let array = array.as_any().downcast_ref::>().unwrap(); rows.iter_mut() - .zip(zip_validity( - array.values_iter(), - validity.map(|x| x.iter()), - )) + .zip(array.iter()) .take(row_count) .for_each(|(row, value)| { row.insert( @@ -416,26 +365,10 @@ pub fn write_record_batches(batches: &[RecordBatch]) -> Vec> let row_count = batch.num_rows(); batch.columns().iter().enumerate().for_each(|(j, col)| { let col_name = schema.field(j).name(); - set_column_for_json_rows( - &mut rows[base..], - row_count, - col.as_ref(), - col.validity(), - col_name, - ); + set_column_for_json_rows(&mut rows[base..], row_count, col.as_ref(), col_name); }); base += row_count; }); } rows } - -// Combine the two incoming validity `Bitmaps` using `and` logic -fn combine_validity(validity1: Option<&Bitmap>, validity2: Option<&Bitmap>) -> Option { - match (validity1, validity2) { - (Some(v1), Some(v2)) => Some(v1 & v2), - (Some(v1), None) => Some(v1.clone()), - (None, Some(v2)) => Some(v2.clone()), - (None, None) => None, - } -} From 9238c72fc02182c9318638aedbbef46caeacb8d6 Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 13:28:51 +0530 Subject: [PATCH 08/13] Revamp logic for validity inclusion --- src/io/json/write/serialize.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 708019b6f89..6504b09d07a 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -117,7 +117,6 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec>>(); - // todo: use validity... array .values() .iter() @@ -299,6 +298,17 @@ fn set_column_for_json_rows( .for_each(|(row, obj)| { row.insert(col_name.to_string(), Value::Object(obj)); }); + + if let Some(validity) = array.validity() { + rows.iter_mut() + .zip(validity) + .filter(|(_, v)| !*v) + .for_each(|(row, _)| { + if let Some(value) = row.get_mut(col_name) { + *value = Value::Null; + } + }); + } } DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); From 2acd91c4d164bdcef4468724aca6ff17a99fd771 Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 13:29:47 +0530 Subject: [PATCH 09/13] Make test reflect the desired outcome --- tests/it/io/json/write.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index e744206ba52..80029b2023d 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -80,9 +80,9 @@ fn write_nested_struct_with_validity() { assert_eq!( String::from_utf8(buf).unwrap(), - r#"{"c1":{"c11":1,"c12":{"c121":null,"c122":null}},"c2":"a"} + r#"{"c1":{"c11":1,"c12":null},"c2":"a"} {"c1":{"c11":null,"c12":{"c121":"f","c122":null}},"c2":"b"} -{"c1":{"c11":null,"c12":{"c121":null,"c122":null}},"c2":"c"} +{"c1":null,"c2":"c"} "# ); } From 3a04402cea59af504e8ad10bd9e67d19f95dffbb Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 14:13:59 +0530 Subject: [PATCH 10/13] Blend two iterators into one --- src/io/json/write/serialize.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 6504b09d07a..ccf85499836 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -18,6 +18,7 @@ use serde_json::map::Map; use serde_json::{Number, Value}; +use crate::bitmap::utils::zip_validity; use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType}; trait JsonSerializable { @@ -294,21 +295,16 @@ fn set_column_for_json_rows( let inner_objs = struct_array_to_jsonmap_array(array, row_count); rows.iter_mut() .take(row_count) - .zip(inner_objs.into_iter()) + .zip(zip_validity( + inner_objs.into_iter(), + array.validity().map(|v| v.iter()), + )) .for_each(|(row, obj)| { - row.insert(col_name.to_string(), Value::Object(obj)); + row.insert( + col_name.to_string(), + obj.map(|o| Value::Object(o)).unwrap_or(Value::Null), + ); }); - - if let Some(validity) = array.validity() { - rows.iter_mut() - .zip(validity) - .filter(|(_, v)| !*v) - .for_each(|(row, _)| { - if let Some(value) = row.get_mut(col_name) { - *value = Value::Null; - } - }); - } } DataType::List(_) => { let array = array.as_any().downcast_ref::>().unwrap(); From 6e10337f4d6caefcdd192c98c1aae68287e7f05b Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 16:10:31 +0530 Subject: [PATCH 11/13] Check validity of Structs in Lists --- src/io/json/write/serialize.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index ccf85499836..18dbdfdc826 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -202,7 +202,9 @@ fn write_array(array: &dyn Array) -> Value { array.as_any().downcast_ref::().unwrap(), array.len(), ); - jsonmaps.into_iter().map(Value::Object).collect() + zip_validity(jsonmaps.into_iter(), array.validity().map(|v| v.iter())) + .map(|m| m.map(|o| Value::Object(o)).unwrap_or(Value::Null)) + .collect() } _ => { panic!( From 9ea077df3915b659dd66419500fa8c36e0834744 Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 16:11:14 +0530 Subject: [PATCH 12/13] Modify list_of_struct test to include validity --- tests/it/io/json/write.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index 80029b2023d..3d514f333cd 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -251,10 +251,10 @@ fn write_list_of_struct() { Some("f"), Some("g"), ]))], - None, + Some(Bitmap::from([false, true, true])), )), ], - None, + Some(Bitmap::from([true, true, false])), ); // list column rows (c1): @@ -280,9 +280,9 @@ fn write_list_of_struct() { assert_eq!( String::from_utf8(buf).unwrap(), - r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c11":null,"c12":{"c121":"f"}}],"c2":1} + r#"{"c1":[{"c11":1,"c12":null},{"c11":null,"c12":{"c121":"f"}}],"c2":1} {"c1":null,"c2":2} -{"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3} +{"c1":[null],"c2":3} "# ); } From ddf2e9fc5efb48a5c887bd4cdcc647557f5bcad1 Mon Sep 17 00:00:00 2001 From: VasanthakumarV Date: Sun, 10 Oct 2021 16:26:37 +0530 Subject: [PATCH 13/13] Shorten the closures --- src/io/json/write/serialize.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 18dbdfdc826..8aa43ce946e 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -203,7 +203,7 @@ fn write_array(array: &dyn Array) -> Value { array.len(), ); zip_validity(jsonmaps.into_iter(), array.validity().map(|v| v.iter())) - .map(|m| m.map(|o| Value::Object(o)).unwrap_or(Value::Null)) + .map(|m| m.map(Value::Object).unwrap_or(Value::Null)) .collect() } _ => { @@ -304,7 +304,7 @@ fn set_column_for_json_rows( .for_each(|(row, obj)| { row.insert( col_name.to_string(), - obj.map(|o| Value::Object(o)).unwrap_or(Value::Null), + obj.map(Value::Object).unwrap_or(Value::Null), ); }); }