diff --git a/polars/polars-core/src/chunked_array/logical/categorical/builder.rs b/polars/polars-core/src/chunked_array/logical/categorical/builder.rs index afb6d93235f4..8da3ed730bff 100644 --- a/polars/polars-core/src/chunked_array/logical/categorical/builder.rs +++ b/polars/polars-core/src/chunked_array/logical/categorical/builder.rs @@ -37,7 +37,7 @@ impl RevMappingBuilder { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum RevMapping { /// Hashmap: maps the indexes from the global cache/categorical array to indexes in the local Utf8Array /// Utf8Array: caches the string values diff --git a/polars/polars-core/src/frame/row.rs b/polars/polars-core/src/frame/row.rs index d67dce53ef03..8fdaedadde4d 100644 --- a/polars/polars-core/src/frame/row.rs +++ b/polars/polars-core/src/frame/row.rs @@ -310,7 +310,7 @@ impl<'a> From<&AnyValue<'a>> for DataType { Int8(_) => DataType::Int8, Int16(_) => DataType::Int16, #[cfg(feature = "dtype-categorical")] - Categorical(_, _) => DataType::Categorical(None), + Categorical(_, rev_map) => DataType::Categorical(Some(Arc::new((*rev_map).clone()))), #[cfg(feature = "object")] Object(o) => DataType::Object(o.type_name()), } @@ -347,6 +347,8 @@ pub(crate) enum AnyValueBuffer<'a> { Float32(PrimitiveChunkedBuilder), Float64(PrimitiveChunkedBuilder), Utf8(Utf8ChunkedBuilder), + #[cfg(feature = "dtype-categorical")] + Categorical(CategoricalChunkedBuilder), All(Vec>), } @@ -413,6 +415,8 @@ impl<'a> AnyValueBuffer<'a> { Float32(b) => b.finish().into_series(), Float64(b) => b.finish().into_series(), Utf8(b) => b.finish().into_series(), + #[cfg(feature = "dtype-categorical")] + Categorical(b) => b.finish().into_series(), All(vals) => Series::new("", vals), } } @@ -440,6 +444,8 @@ impl From<(&DataType, usize)> for AnyValueBuffer<'_> { Float32 => AnyValueBuffer::Float32(PrimitiveChunkedBuilder::new("", len)), Float64 => AnyValueBuffer::Float64(PrimitiveChunkedBuilder::new("", len)), Utf8 => AnyValueBuffer::Utf8(Utf8ChunkedBuilder::new("", len, len * 5)), + #[cfg(feature = "dtype-categorical")] + Categorical(_) => AnyValueBuffer::Categorical(CategoricalChunkedBuilder::new("", len)), // Struct and List can be recursive so use anyvalues for that _ => AnyValueBuffer::All(Vec::with_capacity(len)), } diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 74922274cf8c..e8be361172f2 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -51,6 +51,8 @@ use mimalloc::MiMalloc; use polars::functions::{diag_concat_df, hor_concat_df}; use polars::prelude::Null; use polars_core::datatypes::TimeUnit; +use polars_core::frame::row::Row; +use polars_core::prelude::DataFrame; use polars_core::prelude::IntoSeries; use polars_core::POOL; use pyo3::panic::PanicException; @@ -263,18 +265,43 @@ fn py_duration( } #[pyfunction] -fn concat_df(dfs: &PyAny) -> PyResult { +fn concat_df(dfs: &PyAny, py: Python) -> PyResult { + use polars_core::utils::rayon::prelude::*; + let (seq, _len) = get_pyseq(dfs)?; let mut iter = seq.iter()?; let first = iter.next().unwrap()?; - let mut df = get_df(first)?; + let first_rdf = get_df(first)?; + let schema = first_rdf.schema(); - for res in iter { - let item = res?; - let other = get_df(item)?; - df.vstack_mut(&other).map_err(PyPolarsErr::from)?; + let mut rdfs: Vec> = vec![Ok(first_rdf)]; + + for item in iter { + let rdf = get_df(item?)?; + rdfs.push(Ok(rdf)); } + + let identity = || DataFrame::from_rows_and_schema(&[Row::default()], &schema); + + let df = py + .allow_threads(|| { + polars_core::POOL.install(|| { + rdfs.into_par_iter() + .fold(identity, |acc, df| { + let mut acc = acc?; + acc.vstack_mut(&df?)?; + Ok(acc) + }) + .reduce(identity, |acc, df| { + let mut acc = acc?; + acc.vstack_mut(&df?)?; + Ok(acc) + }) + }) + }) + .map_err(PyPolarsErr::from)?; + Ok(df.into()) }