From ebe6f539844ba781553c87bdaa2dd25190047c49 Mon Sep 17 00:00:00 2001 From: askoa <112126368+askoa@users.noreply.github.com> Date: Thu, 23 Feb 2023 04:52:12 -0500 Subject: [PATCH] feat: Sort kernel for `RunArray` (#3695) * Handle sliced array in run array iterator * sort_to_indices for RunArray * better loop * sort for run array * improve docs * some minor tweaks * doc fix * format fix * fix sort run to return all logical indices * pr comment * rename test function, pull sort run logic into a separate function --------- Co-authored-by: ask --- arrow-array/src/array/run_array.rs | 28 ++- arrow-array/src/run_iterator.rs | 9 +- arrow-ord/src/sort.rs | 331 +++++++++++++++++++++++++++++ arrow/benches/sort_kernel.rs | 21 +- 4 files changed, 374 insertions(+), 15 deletions(-) diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 709933e1b103..9dba3ddab6ae 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -112,17 +112,37 @@ impl RunArray { /// Returns a reference to run_ends array /// - /// Note: any slicing of this array is not applied to the returned array + /// Note: any slicing of this [`RunArray`] array is not applied to the returned array /// and must be handled separately pub fn run_ends(&self) -> &PrimitiveArray { &self.run_ends } /// Returns a reference to values array + /// + /// Note: any slicing of this [`RunArray`] array is not applied to the returned array + /// and must be handled separately pub fn values(&self) -> &ArrayRef { &self.values } + /// Returns the physical index at which the array slice starts. + pub fn get_start_physical_index(&self) -> usize { + if self.offset() == 0 { + return 0; + } + self.get_zero_offset_physical_index(self.offset()).unwrap() + } + + /// Returns the physical index at which the array slice ends. + pub fn get_end_physical_index(&self) -> usize { + if self.offset() + self.len() == Self::logical_len(&self.run_ends) { + return self.run_ends.len() - 1; + } + self.get_zero_offset_physical_index(self.offset() + self.len() - 1) + .unwrap() + } + /// Downcast this [`RunArray`] to a [`TypedRunArray`] /// /// ``` @@ -230,11 +250,7 @@ impl RunArray { } // Skip some physical indices based on offset. - let skip_value = if self.offset() > 0 { - self.get_zero_offset_physical_index(self.offset()).unwrap() - } else { - 0 - }; + let skip_value = self.get_start_physical_index(); let mut physical_indices = vec![0; indices_len]; diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs index a79969c3cb91..fbf173b1dbe0 100644 --- a/arrow-array/src/run_iterator.rs +++ b/arrow-array/src/run_iterator.rs @@ -57,13 +57,8 @@ where { /// create a new iterator pub fn new(array: TypedRunArray<'a, R, V>) -> Self { - let current_front_physical: usize = - array.run_array().get_physical_index(0).unwrap(); - let current_back_physical: usize = array - .run_array() - .get_physical_index(array.len() - 1) - .unwrap() - + 1; + let current_front_physical = array.run_array().get_start_physical_index(); + let current_back_physical = array.run_array().get_end_physical_index() + 1; RunArrayIter { array, current_front_logical: array.offset(), diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs index 207f499ef275..c4baa2283885 100644 --- a/arrow-ord/src/sort.rs +++ b/arrow-ord/src/sort.rs @@ -18,14 +18,17 @@ //! Defines sort kernel for `ArrayRef` use crate::ord::{build_compare, DynComparator}; +use arrow_array::builder::BufferBuilder; use arrow_array::cast::*; use arrow_array::types::*; use arrow_array::*; use arrow_buffer::{ArrowNativeType, MutableBuffer}; use arrow_data::ArrayData; +use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, IntervalUnit, TimeUnit}; use arrow_select::take::take; use std::cmp::Ordering; +use std::sync::Arc; pub use arrow_schema::SortOptions; @@ -55,6 +58,9 @@ pub fn sort( values: &ArrayRef, options: Option, ) -> Result { + if let DataType::RunEndEncoded(_, _) = values.data_type() { + return sort_run(values, options, None); + } let indices = sort_to_indices(values, options, None)?; take(values.as_ref(), &indices, None) } @@ -94,6 +100,9 @@ pub fn sort_limit( options: Option, limit: Option, ) -> Result { + if let DataType::RunEndEncoded(_, _) = values.data_type() { + return sort_run(values, options, limit); + } let indices = sort_to_indices(values, options, limit)?; take(values.as_ref(), &indices, None) } @@ -357,6 +366,16 @@ pub fn sort_to_indices( sort_binary::(values, v, n, &options, limit) } DataType::LargeBinary => sort_binary::(values, v, n, &options, limit), + DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() { + DataType::Int16 => sort_run_to_indices::(values, &options, limit), + DataType::Int32 => sort_run_to_indices::(values, &options, limit), + DataType::Int64 => sort_run_to_indices::(values, &options, limit), + dt => { + return Err(ArrowError::ComputeError(format!( + "Inavlid run end data type: {dt}" + ))) + } + }, t => { return Err(ArrowError::ComputeError(format!( "Sort not supported for data type {t:?}" @@ -599,6 +618,194 @@ fn insert_valid_values(result_slice: &mut [u32], offset: usize, valids: &[(u3 append_valids(&mut result_slice[offset..offset + valids.len()]); } +// Sort run array and return sorted run array. +// The output RunArray will be encoded at the same level as input run array. +// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] } +// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] } +// and not RunArray { run_ends = [4,8], values = [1,2] } +fn sort_run( + values: &ArrayRef, + options: Option, + limit: Option, +) -> Result { + match values.data_type() { + DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() { + DataType::Int16 => sort_run_downcasted::(values, options, limit), + DataType::Int32 => sort_run_downcasted::(values, options, limit), + DataType::Int64 => sort_run_downcasted::(values, options, limit), + dt => unreachable!("Not valid run ends data type {dt}"), + }, + dt => Err(ArrowError::InvalidArgumentError(format!( + "Input is not a run encoded array. Input data type {dt}" + ))), + } +} + +fn sort_run_downcasted( + values: &ArrayRef, + options: Option, + limit: Option, +) -> Result { + let run_array = values.as_any().downcast_ref::>().unwrap(); + + // Determine the length of output run array. + let output_len = if let Some(limit) = limit { + limit.min(run_array.len()) + } else { + run_array.len() + }; + + let run_ends = run_array.run_ends(); + + let mut new_run_ends_builder = BufferBuilder::::new(run_ends.len()); + let mut new_run_end: usize = 0; + let mut new_physical_len: usize = 0; + + let consume_runs = |run_length, _| { + new_run_end += run_length; + new_physical_len += 1; + new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap()); + }; + + let (values_indices, run_values) = + sort_run_inner(run_array, options, output_len, consume_runs); + + let new_run_ends = unsafe { + // Safety: + // The function builds a valid run_ends array and hence need not be validated. + ArrayDataBuilder::new(run_array.run_ends().data_type().clone()) + .len(new_physical_len) + .null_count(0) + .add_buffer(new_run_ends_builder.finish()) + .build_unchecked() + }; + + // slice the sorted value indices based on limit. + let new_values_indices: PrimitiveArray = values_indices + .slice(0, new_run_ends.len()) + .into_data() + .into(); + + let new_values = take(&run_values, &new_values_indices, None)?; + + // Build sorted run array + let builder = ArrayDataBuilder::new(run_array.data_type().clone()) + .len(new_run_end) + .add_child_data(new_run_ends) + .add_child_data(new_values.into_data()); + let array_data: RunArray = unsafe { + // Safety: + // This function builds a valid run array and hence can skip validation. + builder.build_unchecked().into() + }; + Ok(Arc::new(array_data)) +} + +// Sort to indices for run encoded array. +// This function will be slow for run array as it decodes the physical indices to +// logical indices and to get the run array back, the logical indices has to be +// encoded back to run array. +fn sort_run_to_indices( + values: &ArrayRef, + options: &SortOptions, + limit: Option, +) -> UInt32Array { + let run_array = values.as_any().downcast_ref::>().unwrap(); + let output_len = if let Some(limit) = limit { + limit.min(run_array.len()) + } else { + run_array.len() + }; + let mut result: Vec = Vec::with_capacity(output_len); + + //Add all logical indices belonging to a physical index to the output + let consume_runs = |run_length, logical_start| { + result.extend(logical_start as u32..(logical_start + run_length) as u32); + }; + sort_run_inner(run_array, Some(*options), output_len, consume_runs); + + UInt32Array::from(result) +} + +fn sort_run_inner( + run_array: &RunArray, + options: Option, + output_len: usize, + mut consume_runs: F, +) -> (PrimitiveArray, ArrayRef) +where + F: FnMut(usize, usize), +{ + // slice the run_array.values based on offset and length. + let start_physical_index = run_array.get_start_physical_index(); + let end_physical_index = run_array.get_end_physical_index(); + let physical_len = end_physical_index - start_physical_index + 1; + let run_values = run_array.values().slice(start_physical_index, physical_len); + + // All the values have to be sorted irrespective of input limit. + let values_indices = sort_to_indices(&run_values, options, None).unwrap(); + + let mut remaining_len = output_len; + + let run_ends = run_array.run_ends(); + + assert_eq!( + 0, + values_indices.null_count(), + "The output of sort_to_indices should not have null values. Its values is {}", + values_indices.null_count() + ); + + // Calculate `run length` of sorted value indices. + // Find the `logical index` at which the run starts. + // Call the consumer using the run length and starting logical index. + for physical_index in values_indices.values() { + // As the values were sliced with offset = start_physical_index, it has to be added back + // before accesing `RunArray::run_ends` + let physical_index = *physical_index as usize + start_physical_index; + + // calculate the run length and logical index of sorted values + let (run_length, logical_index_start) = unsafe { + // Safety: + // The index will be within bounds as its in bounds of start_physical_index + // and len, both of which are within bounds of run_array + if physical_index == start_physical_index { + ( + run_ends.value_unchecked(physical_index).as_usize() + - run_array.offset(), + 0, + ) + } else if physical_index == end_physical_index { + let prev_run_end = + run_ends.value_unchecked(physical_index - 1).as_usize(); + ( + run_array.offset() + run_array.len() - prev_run_end, + prev_run_end - run_array.offset(), + ) + } else { + let prev_run_end = + run_ends.value_unchecked(physical_index - 1).as_usize(); + ( + run_ends.value_unchecked(physical_index).as_usize() - prev_run_end, + prev_run_end - run_array.offset(), + ) + } + }; + let new_run_length = run_length.min(remaining_len); + consume_runs(new_run_length, logical_index_start); + remaining_len -= new_run_length; + + if remaining_len == 0 { + break; + } + } + + if remaining_len > 0 { + panic!("Remaining length should be zero its values is {remaining_len}") + } + (values_indices, run_values) +} + /// Sort strings fn sort_string( values: &ArrayRef, @@ -1057,6 +1264,7 @@ fn sort_valids_array( #[cfg(test)] mod tests { use super::*; + use arrow_array::builder::PrimitiveRunBuilder; use arrow_buffer::i256; use rand::rngs::StdRng; use rand::{Rng, RngCore, SeedableRng}; @@ -2882,6 +3090,129 @@ mod tests { ); } + #[test] + fn test_sort_run_to_run() { + test_sort_run_inner(|array, sort_options, limit| { + sort_run(array, sort_options, limit) + }); + } + + #[test] + fn test_sort_run_to_indices() { + test_sort_run_inner(|array, sort_options, limit| { + let indices = sort_to_indices(array, sort_options, limit).unwrap(); + take(array, &indices, None) + }); + } + + fn test_sort_run_inner(sort_fn: F) + where + F: Fn( + &ArrayRef, + Option, + Option, + ) -> Result, + { + // Create an input array for testing + let total_len = 80; + let vals: Vec> = + vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)]; + let repeats: Vec = vec![1, 3, 2, 4]; + let mut input_array: Vec> = Vec::with_capacity(total_len); + for ix in 0_usize..32 { + let repeat: usize = repeats[ix % repeats.len()]; + let val: Option = vals[ix % vals.len()]; + input_array.resize(input_array.len() + repeat, val); + } + + // create run array using input_array + // Encode the input_array to run array + let mut builder = + PrimitiveRunBuilder::::with_capacity(input_array.len()); + builder.extend(input_array.iter().copied()); + let run_array = builder.finish(); + + // slice lengths that are tested + let slice_lens = [ + 1, 2, 3, 4, 5, 6, 7, 37, 38, 39, 40, 41, 42, 43, 74, 75, 76, 77, 78, 79, 80, + ]; + for slice_len in slice_lens { + test_sort_run_inner2( + input_array.as_slice(), + &run_array, + 0, + slice_len, + None, + &sort_fn, + ); + test_sort_run_inner2( + input_array.as_slice(), + &run_array, + total_len - slice_len, + slice_len, + None, + &sort_fn, + ); + // Test with non zero limit + if slice_len > 1 { + test_sort_run_inner2( + input_array.as_slice(), + &run_array, + 0, + slice_len, + Some(slice_len / 2), + &sort_fn, + ); + test_sort_run_inner2( + input_array.as_slice(), + &run_array, + total_len - slice_len, + slice_len, + Some(slice_len / 2), + &sort_fn, + ); + } + } + } + + fn test_sort_run_inner2( + input_array: &[Option], + run_array: &RunArray, + offset: usize, + length: usize, + limit: Option, + sort_fn: &F, + ) where + F: Fn( + &ArrayRef, + Option, + Option, + ) -> Result, + { + // Run the sort and build actual result + let sliced_array = run_array.slice(offset, length); + let sorted_sliced_array = sort_fn(&sliced_array, None, limit).unwrap(); + let sorted_run_array = sorted_sliced_array + .as_any() + .downcast_ref::>() + .unwrap(); + let typed_run_array = sorted_run_array + .downcast::>() + .unwrap(); + let actual: Vec> = typed_run_array.into_iter().collect(); + + // build expected result. + let mut sliced_input = input_array[offset..(offset + length)].to_owned(); + sliced_input.sort(); + let expected = if let Some(limit) = limit { + sliced_input.iter().take(limit).copied().collect() + } else { + sliced_input + }; + + assert_eq!(expected, actual) + } + #[test] fn test_sort_string_dicts() { test_sort_string_dict_arrays::( diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs index c4c6819df097..43a9a84d9a74 100644 --- a/arrow/benches/sort_kernel.rs +++ b/arrow/benches/sort_kernel.rs @@ -24,8 +24,8 @@ use std::sync::Arc; extern crate arrow; use arrow::compute::kernels::sort::{lexsort, SortColumn}; -use arrow::compute::sort_to_indices; -use arrow::datatypes::Int32Type; +use arrow::compute::{sort_limit, sort_to_indices}; +use arrow::datatypes::{Int16Type, Int32Type}; use arrow::util::bench_util::*; use arrow::{array::*, datatypes::Float32Type}; @@ -61,6 +61,10 @@ fn bench_sort_to_indices(array: &ArrayRef, limit: Option) { criterion::black_box(sort_to_indices(array, None, limit).unwrap()); } +fn bench_sort_run(array: &ArrayRef, limit: Option) { + criterion::black_box(sort_limit(array, None, limit).unwrap()); +} + fn add_benchmark(c: &mut Criterion) { let arr_a = create_f32_array(2u64.pow(10) as usize, false); let arr_b = create_f32_array(2u64.pow(10) as usize, false); @@ -107,6 +111,19 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_sort_to_indices(&dict_arr, None)) }); + let run_encoded_array = Arc::new(create_primitive_run_array::( + 2u64.pow(12) as usize, + 2u64.pow(10) as usize, + )) as ArrayRef; + + c.bench_function("sort primitive run to indices 2^12", |b| { + b.iter(|| bench_sort_to_indices(&run_encoded_array, None)) + }); + + c.bench_function("sort primitive run to run 2^12", |b| { + b.iter(|| bench_sort_run(&run_encoded_array, None)) + }); + // with limit { let arr_a = create_f32_array(2u64.pow(12) as usize, false);