diff --git a/crates/polars-io/src/ndjson/buffer.rs b/crates/polars-io/src/ndjson/buffer.rs index 8a3d8f917ca1..df526dc49ec4 100644 --- a/crates/polars-io/src/ndjson/buffer.rs +++ b/crates/polars-io/src/ndjson/buffer.rs @@ -195,7 +195,7 @@ fn deserialize_all<'a>( if ignore_errors { return Ok(AnyValue::Null); } - polars_bail!(ComputeError: "expected list/array in json value, got {}", dtype); + polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json); }; let vals: Vec = arr .iter() diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs index e74a3ef2559b..378b0d83e75f 100644 --- a/crates/polars-io/src/ndjson/core.rs +++ b/crates/polars-io/src/ndjson/core.rs @@ -84,6 +84,12 @@ where self.low_memory = toggle; self } + + /// Set values as `Null` if parsing fails because of schema mismatches. + pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self { + self.ignore_errors = ignore_errors; + self + } } impl<'a> JsonLineReader<'a, File> { diff --git a/crates/polars-json/src/json/infer_schema.rs b/crates/polars-json/src/json/infer_schema.rs index ae623e7fbe98..a525334a3d8c 100644 --- a/crates/polars-json/src/json/infer_schema.rs +++ b/crates/polars-json/src/json/infer_schema.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use arrow::datatypes::{ArrowDataType, Field}; use indexmap::map::Entry; -use indexmap::IndexMap; use simd_json::borrowed::Object; use simd_json::{BorrowedValue, StaticNode}; @@ -91,7 +90,7 @@ pub(crate) fn coerce_data_type>(datatypes: &[A]) -> Arr }); // group fields by unique let fields = fields.iter().fold( - IndexMap::<&str, PlHashSet<&ArrowDataType>, ahash::RandomState>::default(), + PlIndexMap::<&str, PlHashSet<&ArrowDataType>>::default(), |mut acc, field| { match acc.entry(field.name.as_str()) { Entry::Occupied(mut v) => { @@ -132,7 +131,13 @@ pub(crate) fn coerce_data_type>(datatypes: &[A]) -> Arr true, ))); } else if datatypes.len() > 2 { - return LargeUtf8; + return datatypes + .iter() + .map(|dt| dt.borrow().clone()) + .reduce(|a, b| coerce_data_type(&[a, b])) + .unwrap() + .borrow() + .clone(); } let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow()); @@ -142,7 +147,7 @@ pub(crate) fn coerce_data_type>(datatypes: &[A]) -> Arr let inner = coerce_data_type(&[lhs.data_type(), rhs.data_type()]); LargeList(Box::new(Field::new(ITEM_NAME, inner, true))) }, - (scalar, List(list)) => { + (scalar, LargeList(list)) => { let inner = coerce_data_type(&[scalar, list.data_type()]); LargeList(Box::new(Field::new(ITEM_NAME, inner, true))) }, @@ -154,6 +159,8 @@ pub(crate) fn coerce_data_type>(datatypes: &[A]) -> Arr (Int64, Float64) => Float64, (Int64, Boolean) => Int64, (Boolean, Int64) => Int64, + (Null, rhs) => rhs.clone(), + (lhs, Null) => lhs.clone(), (_, _) => LargeUtf8, }; } diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ndjson.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ndjson.rs index 880f66040c3d..88a4d5b1b4a0 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ndjson.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ndjson.rs @@ -13,21 +13,29 @@ impl AnonymousScan for LazyJsonLineReader { .with_chunk_size(self.batch_size) .low_memory(self.low_memory) .with_n_rows(scan_opts.n_rows) + .with_ignore_errors(self.ignore_errors) .with_chunk_size(self.batch_size) .finish() } fn schema(&self, infer_schema_length: Option) -> PolarsResult { - // Short-circuit schema inference if the schema has been explicitly provided. - if let Some(schema) = &self.schema { + // Short-circuit schema inference if the schema has been explicitly provided, + // or already inferred + if let Some(schema) = &(*self.schema.read().unwrap()) { return Ok(schema.clone()); } let f = polars_utils::open_file(&self.path)?; let mut reader = std::io::BufReader::new(f); - let schema = polars_io::ndjson::infer_schema(&mut reader, infer_schema_length)?; - Ok(Arc::new(schema)) + let schema = Arc::new(polars_io::ndjson::infer_schema( + &mut reader, + infer_schema_length, + )?); + let mut guard = self.schema.write().unwrap(); + *guard = Some(schema.clone()); + + Ok(schema) } fn allows_projection_pushdown(&self) -> bool { true diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index e7570b6ea831..4be317822977 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::RwLock; use polars_core::prelude::*; use polars_io::RowIndex; @@ -13,10 +14,11 @@ pub struct LazyJsonLineReader { pub(crate) batch_size: Option, pub(crate) low_memory: bool, pub(crate) rechunk: bool, - pub(crate) schema: Option, + pub(crate) schema: Arc>>, pub(crate) row_index: Option, pub(crate) infer_schema_length: Option, pub(crate) n_rows: Option, + pub(crate) ignore_errors: bool, } impl LazyJsonLineReader { @@ -31,9 +33,10 @@ impl LazyJsonLineReader { batch_size: None, low_memory: false, rechunk: false, - schema: None, + schema: Arc::new(Default::default()), row_index: None, infer_schema_length: Some(100), + ignore_errors: false, n_rows: None, } } @@ -43,6 +46,13 @@ impl LazyJsonLineReader { self.row_index = row_index; self } + + /// Set values as `Null` if parsing fails because of schema mismatches. + #[must_use] + pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self { + self.ignore_errors = ignore_errors; + self + } /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot /// be guaranteed. #[must_use] @@ -62,7 +72,7 @@ impl LazyJsonLineReader { /// Set the JSON file's schema #[must_use] pub fn with_schema(mut self, schema: Option) -> Self { - self.schema = schema; + self.schema = Arc::new(RwLock::new(schema)); self } @@ -87,7 +97,7 @@ impl LazyFileListReader for LazyJsonLineReader { infer_schema_length: self.infer_schema_length, n_rows: self.n_rows, row_index: self.row_index.clone(), - schema: self.schema.clone(), + schema: self.schema.read().unwrap().clone(), ..ScanArgsAnonymous::default() }; diff --git a/py-polars/polars/io/ndjson.py b/py-polars/polars/io/ndjson.py index f3ef6084a745..4f5dc87e61fb 100644 --- a/py-polars/polars/io/ndjson.py +++ b/py-polars/polars/io/ndjson.py @@ -63,10 +63,11 @@ def scan_ndjson( batch_size: int | None = 1024, n_rows: int | None = None, low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, row_index_name: str | None = None, row_index_offset: int = 0, schema: SchemaDefinition | None = None, + ignore_errors: bool = False, ) -> LazyFrame: """ Lazily read from a newline delimited JSON file or multiple files via glob patterns. @@ -103,6 +104,8 @@ def scan_ndjson( If you supply a list of column names that does not match the names in the underlying data, the names given here will overwrite them. The number of names given in the schema should match the underlying data dimensions. + ignore_errors + Return `Null` if parsing fails because of schema mismatches. """ return pl.LazyFrame._scan_ndjson( source, @@ -114,4 +117,5 @@ def scan_ndjson( rechunk=rechunk, row_index_name=row_index_name, row_index_offset=row_index_offset, + ignore_errors=ignore_errors, ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 199a1e49f343..cac404b8fb46 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -530,9 +530,10 @@ def _scan_ndjson( batch_size: int | None = None, n_rows: int | None = None, low_memory: bool = False, - rechunk: bool = True, + rechunk: bool = False, row_index_name: str | None = None, row_index_offset: int = 0, + ignore_errors: bool = False, ) -> Self: """ Lazily read from a newline delimited JSON file. @@ -561,6 +562,7 @@ def _scan_ndjson( low_memory, rechunk, _prepare_row_index_args(row_index_name, row_index_offset), + ignore_errors, ) return self diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 41a6f34e9f40..3f0b44662bfa 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -114,7 +114,7 @@ impl PyLazyFrame { #[staticmethod] #[cfg(feature = "json")] #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk, row_index))] + #[pyo3(signature = (path, paths, infer_schema_length, schema, batch_size, n_rows, low_memory, rechunk, row_index, ignore_errors))] fn new_from_ndjson( path: Option, paths: Vec, @@ -125,6 +125,7 @@ impl PyLazyFrame { low_memory: bool, rechunk: bool, row_index: Option<(String, IdxSize)>, + ignore_errors: bool, ) -> PyResult { let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); @@ -142,6 +143,7 @@ impl PyLazyFrame { .with_rechunk(rechunk) .with_schema(schema.map(|schema| Arc::new(schema.0))) .with_row_index(row_index) + .with_ignore_errors(ignore_errors) .finish() .map_err(PyPolarsErr::from)?;