Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Adding iter methods to io::ndjson::read
Browse files Browse the repository at this point in the history
  • Loading branch information
cjermain committed May 15, 2022
1 parent 00d300e commit d0017a3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
20 changes: 20 additions & 0 deletions src/io/ndjson/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,23 @@ pub fn deserialize(rows: &[String], data_type: DataType) -> Result<Arc<dyn Array
// deserialize &[Value] to Array
Ok(_deserialize(&rows, data_type))
}


/// Deserializes an iterator of optional JSON rows into an [`Array`] of [`DataType`].
///
/// Every `None` item is parsed as the JSON literal `null`, so each item
/// yielded by `rows` contributes exactly one value to the result.
/// # Implementation
/// This function is CPU-bounded.
/// This function is guaranteed to return an array of length equal to the number
/// of items yielded by `rows`.
/// # Errors
/// This function errors iff any of the rows is not a valid JSON (i.e. the format is not valid NDJSON).
pub fn deserialize_iter<'a>(
    rows: impl Iterator<Item = Option<&'a str>>,
    data_type: DataType,
) -> Result<Arc<dyn Array>, ArrowError> {
    // Parse every row into a `Value`, stopping at the first invalid one.
    let mut values: Vec<Value> = Vec::new();
    for row in rows {
        // A missing row stands in for an explicit JSON `null`.
        let text = match row {
            Some(text) => text,
            None => "null",
        };
        // `?` converts the `serde_json` error into an `ArrowError` via `From`.
        values.push(serde_json::from_str(text)?);
    }

    // Convert the parsed `Value`s into an array of the requested type.
    Ok(_deserialize(&values, data_type))
}
20 changes: 20 additions & 0 deletions src/io/ndjson/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,23 @@ pub fn infer<R: std::io::BufRead>(
let v: Vec<&DataType> = data_types.iter().collect();
Ok(coerce_data_type(&v))
}

/// Infers a single [`DataType`] from an iterator of optional JSON strings.
///
/// `None` items are skipped. To limit how many rows are inspected, pass
/// `rows.take(number_of_rows)` as the input.
///
/// # Implementation
/// The iterator is consumed to the end (or until the first error). Each
/// present row is parsed and inferred individually; rows inferred as
/// [`DataType::Null`] are ignored, and the remaining distinct types are
/// coerced into one.
///
/// # Errors
/// Errors if a row is not valid JSON or if its type cannot be inferred.
pub fn infer_iter<'a>(rows: impl Iterator<Item = Option<&'a str>>) -> Result<DataType> {
    let mut seen = HashSet::new();
    for row in rows {
        // Absent rows carry no type information.
        let text = match row {
            Some(text) => text,
            None => continue,
        };
        let value: Value = serde_json::from_str(text)?;
        let inferred = infer_json(&value)?;
        // Null rows must not influence the coerced type.
        if inferred == DataType::Null {
            continue;
        }
        seen.insert(inferred);
    }

    let candidates: Vec<&DataType> = seen.iter().collect();
    Ok(coerce_data_type(&candidates))
}
4 changes: 2 additions & 2 deletions src/io/ndjson/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ pub use fallible_streaming_iterator::FallibleStreamingIterator;

mod deserialize;
mod file;
pub use deserialize::deserialize;
pub use file::{infer, FileReader};
pub use deserialize::{deserialize, deserialize_iter};
pub use file::{infer, infer_iter, FileReader};
23 changes: 23 additions & 0 deletions tests/it/io/ndjson/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,26 @@ fn skip_empty_lines() -> Result<()> {
assert_eq!(3, arrays[0].len());
Ok(())
}

/// Round-trips a nullable `Utf8Array` of JSON documents through
/// `infer_iter` + `deserialize_iter` and checks that length, validity and
/// field names are preserved in the resulting `StructArray`.
#[test]
fn utf8_array() -> Result<()> {
    // Two JSON rows interleaved with two null slots.
    let input = Utf8Array::<i64>::from([
        Some(r#"{"a": 1, "b": [{"c": 0}, {"c": 1}]}"#),
        None,
        Some(r#"{"a": 2, "b": [{"c": 2}, {"c": 5}]}"#),
        None,
    ]);

    let inferred = ndjson_read::infer_iter(input.iter()).unwrap();
    let output = ndjson_read::deserialize_iter(input.iter(), inferred).unwrap();

    // The rows are JSON objects, so the result must be a `StructArray`.
    let output = output.as_any().downcast_ref::<StructArray>().unwrap();

    // Null slots in the input must surface as null slots in the output.
    assert_eq!(input.len(), output.len());
    assert_eq!(input.null_count(), output.null_count());
    assert_eq!(input.validity().unwrap(), output.validity().unwrap());

    let names: Vec<&str> = output.fields().iter().map(|f| f.name.as_str()).collect();
    assert_eq!(names, ["a", "b"]);
    Ok(())
}

0 comments on commit d0017a3

Please sign in to comment.