diff --git a/benches/filter_kernels.rs b/benches/filter_kernels.rs index be29dd58ae9..b6981f0e4f0 100644 --- a/benches/filter_kernels.rs +++ b/benches/filter_kernels.rs @@ -19,9 +19,9 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter}; +use arrow2::columns::Columns; +use arrow2::compute::filter::{build_filter, filter, filter_columns, Filter}; use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::record_batch::RecordBatch; use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array}; fn bench_filter(data_array: &dyn Array, filter_array: &BooleanArray) { @@ -125,13 +125,10 @@ fn add_benchmark(c: &mut Criterion) { let data_array = create_primitive_array::(size, 0.0); - let field = Field::new("c1", data_array.data_type().clone(), true); - let schema = Schema::new(vec![field]); - - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data_array)]).unwrap(); + let columns = Columns::try_new(vec![Arc::new(data_array)]).unwrap(); c.bench_function("filter single record batch", |b| { - b.iter(|| filter_record_batch(&batch, &filter_array)) + b.iter(|| filter_record_batch(&columns, &filter_array)) }); } diff --git a/benches/write_ipc.rs b/benches/write_ipc.rs index 77b1ab95905..52dff65a324 100644 --- a/benches/write_ipc.rs +++ b/benches/write_ipc.rs @@ -4,21 +4,21 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; +use arrow2::columns::Columns; use arrow2::datatypes::{Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::write::*; -use arrow2::record_batch::RecordBatch; use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array}; fn write(array: &dyn Array) -> Result<()> { let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::new(vec![field]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?; + let columns = Columns::try_new(vec![clone(array).into()])?; let writer = Cursor::new(vec![]); - let mut writer = FileWriter::try_new(writer, &schema, Default::default())?; + let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?; - writer.write(&batch) + writer.write(&columns, None) } fn add_benchmark(c: &mut Criterion) { diff --git a/benches/write_json.rs b/benches/write_json.rs index 354ebfab0e0..3c3ab4328b1 100644 --- a/benches/write_json.rs +++ b/benches/write_json.rs @@ -3,19 +3,19 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; +use arrow2::columns::Columns; use arrow2::error::Result; use arrow2::io::json::write; -use arrow2::record_batch::RecordBatch; use arrow2::util::bench_util::*; -fn write_batch(batch: &RecordBatch) -> Result<()> { +fn write_batch(columns: &Columns>) -> Result<()> { let mut writer = vec![]; let format = write::JsonArray::default(); - let batches = vec![Ok(batch.clone())].into_iter(); + let batches = vec![Ok(columns.clone())].into_iter(); // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded) - let blocks = write::Serializer::new(batches, vec![], format); + let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format); // the operation of writing is IO-bounded. 
write::write(&mut writer, format, blocks)?; @@ -23,8 +23,8 @@ fn write_batch(batch: &RecordBatch) -> Result<()> { Ok(()) } -fn make_batch(array: impl Array + 'static) -> RecordBatch { - RecordBatch::try_from_iter([("a", Arc::new(array) as Arc)]).unwrap() +fn make_columns(array: impl Array + 'static) -> Columns> { + Columns::new(vec![Arc::new(array) as Arc]) } fn add_benchmark(c: &mut Criterion) { @@ -32,24 +32,24 @@ fn add_benchmark(c: &mut Criterion) { let size = 2usize.pow(log2_size); let array = create_primitive_array::(size, 0.1); - let batch = make_batch(array); + let columns = make_columns(array); c.bench_function(&format!("json write i32 2^{}", log2_size), |b| { - b.iter(|| write_batch(&batch)) + b.iter(|| write_batch(&columns)) }); let array = create_string_array::(size, 100, 0.1, 42); - let batch = make_batch(array); + let columns = make_columns(array); c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| { - b.iter(|| write_batch(&batch)) + b.iter(|| write_batch(&columns)) }); let array = create_primitive_array::(size, 0.1); - let batch = make_batch(array); + let columns = make_columns(array); c.bench_function(&format!("json write f64 2^{}", log2_size), |b| { - b.iter(|| write_batch(&batch)) + b.iter(|| write_batch(&columns)) }); }); } diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index df89c36d777..ee06a4e24ad 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -3,13 +3,13 @@ use std::io::Cursor; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::{clone, Array}; +use arrow2::columns::Columns; use arrow2::error::Result; use arrow2::io::parquet::write::*; -use arrow2::record_batch::RecordBatch; use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array}; fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { - let batch = RecordBatch::try_from_iter([("c1", clone(array).into())])?; + let columns = Columns::new(vec![clone(array).into()]); let schema = batch.schema().clone(); let options = WriteOptions { @@ -19,7 +19,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { }; let row_groups = RowGroupIterator::try_new( - vec![Ok(batch)].into_iter(), + vec![Ok(columns)].into_iter(), &schema, options, vec![encoding], diff --git a/examples/avro_read.rs b/examples/avro_read.rs index 7e4e501740e..f03237e9e46 100644 --- a/examples/avro_read.rs +++ b/examples/avro_read.rs @@ -1,6 +1,5 @@ use std::fs::File; use std::io::BufReader; -use std::sync::Arc; use arrow2::error::Result; use arrow2::io::avro::read; @@ -20,12 +19,12 @@ fn main() -> Result<()> { let reader = read::Reader::new( read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec), avro_schema, - Arc::new(schema), + schema.fields, ); - for batch in reader { - let batch = batch?; - assert!(batch.len() > 0); + for maybe_columns in reader { + let columns = maybe_columns?; + assert!(!columns.is_empty()); } Ok(()) } diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs index 8aa8f260d49..d3c5940b398 100644 --- a/examples/avro_read_async.rs +++ b/examples/avro_read_async.rs @@ -34,7 +34,7 @@ async fn main() -> Result<()> { deserialize(&decompressed, schema.fields(), &avro_schemas) }); let batch = handle.await.unwrap()?; - assert!(batch.len() > 0); + assert!(!batch.is_empty()); } Ok(()) diff --git a/examples/csv_write.rs b/examples/csv_write.rs index afba2b6c0a6..c1225c77399 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -1,22 +1,22 @@ -use 
std::sync::Arc; - use arrow2::{ - array::Int32Array, - datatypes::{Field, Schema}, + array::{Array, Int32Array}, + columns::Columns, error::Result, io::csv::write, - record_batch::RecordBatch, }; -fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> { +fn write_batch>( + path: &str, + columns: &[Columns], +) -> Result<()> { let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, batches[0].schema())?; + write::write_header(writer, &["c1"])?; let options = write::SerializeOptions::default(); - batches + columns .iter() - .try_for_each(|batch| write::write_batch(writer, batch, &options)) + .try_for_each(|batch| write::write_columns(writer, batch, &options)) } fn main() -> Result<()> { @@ -29,9 +29,7 @@ fn main() -> Result<()> { Some(5), Some(6), ]); - let field = Field::new("c1", array.data_type().clone(), true); - let schema = Schema::new(vec![field]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + let batch = Columns::try_new(vec![&array as &dyn Array])?; write_batch("example.csv", &[batch]) } diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs index 54e8453fcfa..a12a26a2e5f 100644 --- a/examples/csv_write_parallel.rs +++ b/examples/csv_write_parallel.rs @@ -4,19 +4,18 @@ use std::sync::Arc; use std::thread; use arrow2::{ - array::Int32Array, - datatypes::{Field, Schema}, + array::{Array, Int32Array}, + columns::Columns, error::Result, io::csv::write, - record_batch::RecordBatch, }; -fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> { +fn parallel_write(path: &str, batches: [Columns>; 2]) -> Result<()> { let options = write::SerializeOptions::default(); // write a header let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, batches[0].schema())?; + write::write_header(writer, &["c1"])?; // prepare a channel to send serialized records from threads let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); @@ -61,9 +60,7 @@ fn main() -> Result<()> { Some(5), Some(6), ]); - let field = Field::new("c1", array.data_type().clone(), true); - let schema = Schema::new(vec![field]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + let columns = Columns::new(vec![Arc::new(array) as Arc]); - parallel_write("example.csv", [batch.clone(), batch]) + parallel_write("example.csv", [columns.clone(), columns]) } diff --git a/examples/extension.rs b/examples/extension.rs index a9659a821ae..cf185bb6897 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -2,11 +2,11 @@ use std::io::{Cursor, Seek, Write}; use std::sync::Arc; use arrow2::array::*; +use arrow2::columns::Columns; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::ipc::read; use arrow2::io::ipc::write; -use arrow2::record_batch::RecordBatch; fn main() -> Result<()> { // declare an extension. 
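// Usage sketch of the reworked CSV write API shown above: `write_header` now takes the
// column names directly and `write_columns` takes a `Columns` of arrays. The path
// "example.csv", the name "c1", and the helper name `write_csv` are illustrative assumptions.
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::columns::Columns;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn write_csv() -> Result<()> {
    let array = Int32Array::from_slice(&[1, 2, 3]);
    // every array in a `Columns` must have the same length
    let columns = Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

    let writer = &mut write::WriterBuilder::new().from_path("example.csv")?;
    write::write_header(writer, &["c1"])?;

    let options = write::SerializeOptions::default();
    write::write_columns(writer, &columns, &options)
}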
@@ -40,14 +40,14 @@ fn write_ipc(writer: W, array: impl Array + 'static) -> Result< let options = write::WriteOptions { compression: None }; let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?; - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + let batch = Columns::try_new(vec![Arc::new(array) as Arc])?; writer.write(&batch, None)?; Ok(writer.into_inner()) } -fn read_ipc(buf: &[u8]) -> Result { +fn read_ipc(buf: &[u8]) -> Result>> { let mut cursor = Cursor::new(buf); let metadata = read::read_file_metadata(&mut cursor)?; let mut reader = read::FileReader::new(cursor, metadata, None); diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index a1792f10399..5193ef09eee 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -1,20 +1,26 @@ use std::fs::File; +use std::sync::Arc; +use arrow2::array::Array; +use arrow2::columns::Columns; +use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read::{read_file_metadata, FileReader}; use arrow2::io::print; -use arrow2::record_batch::RecordBatch; -fn read_batches(path: &str) -> Result> { +fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { let mut file = File::open(path)?; // read the files' metadata. At this point, we can distribute the read whatever we like. let metadata = read_file_metadata(&mut file)?; + let schema = metadata.schema().as_ref().clone(); + // Simplest way: use the reader, an iterator over batches. let reader = FileReader::new(file, metadata, None); - reader.collect() + let columns = reader.collect::>>()?; + Ok((schema, columns)) } fn main() -> Result<()> { @@ -23,7 +29,8 @@ fn main() -> Result<()> { let file_path = &args[1]; - let batches = read_batches(file_path)?; - print::print(&batches); + let (schema, batches) = read_batches(file_path)?; + let names = schema.fields().iter().map(|f| f.name()).collect::>(); + println!("{}", print::write(&batches, &names)); Ok(()) } diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs index 757622b8ced..ca91783fef3 100644 --- a/examples/ipc_file_write.rs +++ b/examples/ipc_file_write.rs @@ -1,20 +1,20 @@ use std::fs::File; use std::sync::Arc; -use arrow2::array::{Int32Array, Utf8Array}; +use arrow2::array::{Array, Int32Array, Utf8Array}; +use arrow2::columns::Columns; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::write; -use arrow2::record_batch::RecordBatch; -fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> { +fn write_batches(path: &str, schema: &Schema, columns: &[Columns>]) -> Result<()> { let file = File::create(path)?; let options = write::WriteOptions { compression: None }; let mut writer = write::FileWriter::try_new(file, schema, None, options)?; - for batch in batches { - writer.write(batch, None)? + for columns in columns { + writer.write(columns, None)? 
} writer.finish() } @@ -34,7 +34,7 @@ fn main() -> Result<()> { let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]); let b = Utf8Array::::from_slice(&["a", "b", "c", "d", "e"]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)])?; + let batch = Columns::try_new(vec![Arc::new(a) as Arc, Arc::new(b)])?; // write it write_batches(file_path, &schema, &[batch])?; diff --git a/examples/metadata.rs b/examples/metadata.rs index 6b020d7b288..ab57146c78d 100644 --- a/examples/metadata.rs +++ b/examples/metadata.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use arrow2::datatypes::{DataType, Field, Schema}; diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index fd474201c19..01a146129d9 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -6,11 +6,11 @@ use std::time::SystemTime; use crossbeam_channel::unbounded; use arrow2::{ - array::Array, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator, - record_batch::RecordBatch, + array::Array, columns::Columns, error::Result, io::parquet::read, + io::parquet::read::MutStreamingIterator, }; -fn parallel_read(path: &str, row_group: usize) -> Result { +fn parallel_read(path: &str, row_group: usize) -> Result>> { // prepare a channel to send compressed pages across threads. let (tx, rx) = unbounded(); @@ -99,7 +99,7 @@ fn parallel_read(path: &str, row_group: usize) -> Result { let columns = columns.into_iter().map(|x| x.1.into()).collect(); println!("Finished - {:?}", start.elapsed().unwrap()); - RecordBatch::try_new(arrow_schema, columns) + Columns::try_new(columns) } fn main() -> Result<()> { @@ -109,7 +109,7 @@ fn main() -> Result<()> { let start = SystemTime::now(); let batch = parallel_read(file_path, 0)?; - assert!(batch.num_rows() > 0); + assert!(!batch.is_empty()); println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) } diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs index ed0ba5b9ed9..1342db14f94 100644 --- a/examples/parquet_read_record.rs +++ b/examples/parquet_read_record.rs @@ -14,9 +14,9 @@ fn main() -> Result<()> { let reader = read::RecordReader::try_new(reader, None, None, None, None)?; let start = SystemTime::now(); - for maybe_batch in reader { - let batch = maybe_batch?; - assert!(batch.len() > 0); + for maybe_columns in reader { + let columns = maybe_columns?; + assert!(!columns.is_empty()); } println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) diff --git a/examples/parquet_write_record.rs b/examples/parquet_write_record.rs index e08c6d0b8b0..86800a491a7 100644 --- a/examples/parquet_write_record.rs +++ b/examples/parquet_write_record.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::sync::Arc; use arrow2::{ - array::Int32Array, + array::{Array, Int32Array}, columns::Columns, datatypes::{Field, Schema}, error::Result, @@ -51,7 +51,7 @@ fn main() -> Result<()> { ]); let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::new(vec![field]); - let columns = Columns::new(vec![Arc::new(array)]); + let columns = Columns::new(vec![Arc::new(array) as Arc]); write_batch("test.parquet", schema, columns) } diff --git a/src/array/dictionary/mutable.rs b/src/array/dictionary/mutable.rs index bf152bd693b..c322ed63d49 100644 --- a/src/array/dictionary/mutable.rs +++ b/src/array/dictionary/mutable.rs @@ -101,12 +101,18 @@ impl MutableDictionaryArray { &self.values } - /// converts itself 
into `Arc` + /// converts itself into [`Arc`] pub fn into_arc(self) -> Arc { let a: DictionaryArray = self.into(); Arc::new(a) } + /// converts itself into [`Box`] + pub fn into_box(self) -> Box { + let a: DictionaryArray = self.into(); + Box::new(a) + } + /// Shrinks the capacity of the [`MutableDictionaryArray`] to fit its current length. pub fn shrink_to_fit(&mut self) { self.values.shrink_to_fit(); diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 52bb9ec5704..23e4aae5121 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -183,6 +183,12 @@ impl MutableListArray { Arc::new(a) } + /// converts itself into [`Box`] + pub fn into_box(self) -> Box { + let a: ListArray = self.into(); + Box::new(a) + } + /// Shrinks the capacity of the [`MutableListArray`] to fit its current length. pub fn shrink_to_fit(&mut self) { self.values.shrink_to_fit(); diff --git a/src/columns.rs b/src/columns.rs index edf562bf9cf..3957049796b 100644 --- a/src/columns.rs +++ b/src/columns.rs @@ -1,18 +1,17 @@ //! Contains [`Columns`], a container of [`Array`] where every array has the //! same length. -use std::sync::Arc; use crate::array::Array; use crate::error::{ArrowError, Result}; -use crate::record_batch::RecordBatch; -/// A vector of [`Array`] where every array has the same length. +/// A vector of trait objects of [`Array`] where every item has +/// the same length, [`Columns::len`]. #[derive(Debug, Clone, PartialEq)] -pub struct Columns> { +pub struct Columns> { arrays: Vec, } -impl> Columns { +impl> Columns { /// Creates a new [`Columns`]. /// # Panic /// Iff the arrays do not have the same length @@ -25,10 +24,10 @@ impl> Columns { /// Iff the arrays do not have the same length pub fn try_new(arrays: Vec) -> Result { if !arrays.is_empty() { - let len = arrays.first().unwrap().as_ref().len(); + let len = arrays.first().unwrap().borrow().len(); if arrays .iter() - .map(|array| array.as_ref()) + .map(|array| array.borrow()) .any(|array| array.len() != len) { return Err(ArrowError::InvalidArgumentError( @@ -39,19 +38,29 @@ impl> Columns { Ok(Self { arrays }) } - /// returns the [`Array`]s in [`Columns`]. + /// returns the [`Array`]s in [`Columns`] pub fn arrays(&self) -> &[A] { &self.arrays } - /// returns the length (number of rows) + /// returns the [`Array`]s in [`Columns`] + pub fn columns(&self) -> &[A] { + &self.arrays + } + + /// returns the number of rows of every array pub fn len(&self) -> usize { self.arrays .first() - .map(|x| x.as_ref().len()) + .map(|x| x.borrow().len()) .unwrap_or_default() } + /// returns whether the columns have any rows + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Consumes [`Columns`] into its underlying arrays. 
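// Basic usage sketch of the `Columns` container introduced above, assuming the API as it
// appears in this diff: construction validates that all arrays share one length, and
// `len`/`is_empty` report the number of rows.
use std::sync::Arc;

use arrow2::array::{Array, Int32Array, Utf8Array};
use arrow2::columns::Columns;
use arrow2::error::Result;

fn build_columns() -> Result<()> {
    let a = Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>;
    let b = Arc::new(Utf8Array::<i32>::from_slice(&["x", "y", "z"])) as Arc<dyn Array>;

    // errors if the arrays do not have the same length
    let columns = Columns::try_new(vec![a, b])?;
    assert_eq!(columns.len(), 3);
    assert!(!columns.is_empty());

    // consume it back into the underlying arrays
    let _arrays: Vec<Arc<dyn Array>> = columns.into_arrays();
    Ok(())
}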
/// The arrays are guaranteed to have the same length pub fn into_arrays(self) -> Vec { @@ -59,13 +68,13 @@ impl> Columns { } } -impl> From> for Vec { +impl> From> for Vec { fn from(c: Columns) -> Self { c.into_arrays() } } -impl> std::ops::Deref for Columns { +impl> std::ops::Deref for Columns { type Target = [A]; #[inline] @@ -73,11 +82,3 @@ impl> std::ops::Deref for Columns { self.arrays() } } - -impl From for Columns> { - fn from(batch: RecordBatch) -> Self { - Self { - arrays: batch.into_inner().0, - } - } -} diff --git a/src/compute/filter.rs b/src/compute/filter.rs index 262ac2b9add..eab6041e991 100644 --- a/src/compute/filter.rs +++ b/src/compute/filter.rs @@ -154,7 +154,7 @@ pub fn filter(array: &dyn Array, filter: &BooleanArray) -> Result /// Returns a new [Columns] with arrays containing only values matching the filter. /// This is a convenience function: filter multiple columns is embarassingly parallel. -pub fn filter_columns>( +pub fn filter_columns>( columns: &Columns, filter_values: &BooleanArray, ) -> Result>> { @@ -164,11 +164,11 @@ pub fn filter_columns>( let filtered_arrays = match num_colums { 1 => { - vec![filter(columns.arrays()[0].as_ref(), filter_values)?] + vec![filter(columns.arrays()[0].borrow(), filter_values)?] } _ => { let filter = build_filter(filter_values)?; - arrays.iter().map(|a| filter(a.as_ref())).collect() + arrays.iter().map(|a| filter(a.borrow())).collect() } }; Columns::try_new(filtered_arrays) diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs index 858595c2c08..b391cfc3993 100644 --- a/src/datatypes/mod.rs +++ b/src/datatypes/mod.rs @@ -200,30 +200,6 @@ pub enum IntervalUnit { } impl DataType { - /// Compares the datatype with another, ignoring nested field names - /// and metadata. - pub(crate) fn equals_datatype(&self, other: &DataType) -> bool { - match (&self, other) { - (DataType::List(a), DataType::List(b)) - | (DataType::LargeList(a), DataType::LargeList(b)) => { - a.is_nullable() == b.is_nullable() && a.data_type().equals_datatype(b.data_type()) - } - (DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => { - a_size == b_size - && a.is_nullable() == b.is_nullable() - && a.data_type().equals_datatype(b.data_type()) - } - (DataType::Struct(a), DataType::Struct(b)) => { - a.len() == b.len() - && a.iter().zip(b).all(|(a, b)| { - a.is_nullable() == b.is_nullable() - && a.data_type().equals_datatype(b.data_type()) - }) - } - _ => self == other, - } - } - /// the [`PhysicalType`] of this [`DataType`]. pub fn to_physical_type(&self) -> PhysicalType { use DataType::*; diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index a442b8c7587..156543ba582 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -1,4 +1,5 @@ use std::convert::TryInto; +use std::sync::Arc; use avro_schema::{Enum, Schema as AvroSchema}; @@ -246,7 +247,7 @@ pub fn deserialize( block: &Block, fields: &[Field], avro_schemas: &[AvroSchema], -) -> Result>> { +) -> Result>> { let rows = block.number_of_rows; let mut block = block.data.as_ref(); @@ -270,5 +271,5 @@ pub fn deserialize( block = deserialize_item(array.as_mut(), field.is_nullable(), avro_field, block)? } } - Columns::try_new(arrays.iter_mut().map(|array| array.as_box()).collect()) + Columns::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect()) } diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index c7386500ec4..6c6f88ca7c5 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -1,5 +1,6 @@ //! 
APIs to read from Avro format to arrow. use std::io::Read; +use std::sync::Arc; use avro_schema::{Record, Schema as AvroSchema}; use fallible_streaming_iterator::FallibleStreamingIterator; @@ -66,7 +67,7 @@ impl Reader { } impl Iterator for Reader { - type Item = Result>>; + type Item = Result>>; fn next(&mut self) -> Option { let fields = &self.fields[..]; diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 7de8e2ef75f..1e8788f3fa1 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -16,20 +16,20 @@ use crate::error::Result; /// Creates serializers that iterate over each column that serializes each item according /// to `options`. -fn new_serializers<'a, A: AsRef>( +fn new_serializers<'a, A: std::borrow::Borrow>( columns: &'a [A], options: &'a SerializeOptions, ) -> Result + 'a>>> { columns .iter() - .map(|column| new_serializer(column.as_ref(), options)) + .map(|column| new_serializer(column.borrow(), options)) .collect() } /// Serializes [`Columns`] to a vector of `ByteRecord`. /// The vector is guaranteed to have `columns.len()` entries. /// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields. -pub fn serialize>( +pub fn serialize>( columns: &Columns, options: &SerializeOptions, ) -> Result> { @@ -47,7 +47,7 @@ pub fn serialize>( } /// Writes [`Columns`] to `writer` according to the serialization options `options`. -pub fn write_batch>( +pub fn write_columns>( writer: &mut Writer, columns: &Columns, options: &SerializeOptions, @@ -70,8 +70,8 @@ pub fn write_batch>( Ok(()) } -/// Writes a header to `writer` -pub fn write_header(writer: &mut Writer, names: &[T]) -> Result<()> +/// Writes a CSV header to `writer` +pub fn write_header(writer: &mut Writer, names: &[T]) -> Result<()> where T: AsRef, { diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 21ef52dc572..be74b11c6e1 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -4,28 +4,28 @@ use arrow_format::flight::data::{FlightData, SchemaResult}; use arrow_format::ipc; use crate::{ + array::Array, + columns::Columns, datatypes::*, error::{ArrowError, Result}, io::ipc::read, io::ipc::write, io::ipc::write::common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions}, - record_batch::RecordBatch, }; use super::ipc::{IpcField, IpcSchema}; -/// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries +/// Serializes a [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries /// and a [`FlightData`] representing the batch. pub fn serialize_batch( - batch: &RecordBatch, + columns: &Columns>, fields: &[IpcField], options: &WriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); - let columns = batch.clone().into(); let (encoded_dictionaries, encoded_batch) = - encode_columns(&columns, fields, &mut dictionary_tracker, options) + encode_columns(columns, fields, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -99,13 +99,13 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> { } } -/// Deserializes [`FlightData`] to a [`RecordBatch`]. +/// Deserializes [`FlightData`] to [`Columns`]. 
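// Rough sketch of the new flight serialization entry point above, which now takes `Columns`
// plus the `IpcField`s instead of a `RecordBatch`. Assumes `IpcField` and `WriteOptions`
// are reachable from `arrow2::io::ipc` as used elsewhere in this diff; the helper name
// `to_flight` is illustrative.
use std::sync::Arc;

use arrow_format::flight::data::FlightData;

use arrow2::array::Array;
use arrow2::columns::Columns;
use arrow2::io::flight::serialize_batch;
use arrow2::io::ipc::write::WriteOptions;
use arrow2::io::ipc::IpcField;

fn to_flight(
    columns: &Columns<Arc<dyn Array>>,
    fields: &[IpcField],
) -> (Vec<FlightData>, FlightData) {
    let options = WriteOptions { compression: None };
    // returns the serialized dictionaries followed by the serialized batch itself
    serialize_batch(columns, fields, &options)
}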
pub fn deserialize_batch( data: &FlightData, schema: Arc, ipc_schema: &IpcSchema, dictionaries: &read::Dictionaries, -) -> Result { +) -> Result>> { // check that the data_header is a record batch message let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| { ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index dd4b93590af..33bda3491b4 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -10,9 +10,8 @@ //! using the more integrated approach that is exposed in this module. //! //! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) -//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or -//! [`DictionaryBatch`](gen::Message::DictionaryBatch) to be passed -//! around due to its reliance on a pre-defined data scheme. This limitation +//! allows only batch or dictionary columns to be passed +//! around due to its reliance on a pre-defined data scheme. This constraint //! provides a large performance gain because serialized data will always have a //! known structutre, i.e. the same fields and datatypes, with the only variance //! being the number of rows and the actual data inside the Batch. This dramatically diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 1bc9b87ac31..251640f9df5 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -6,10 +6,10 @@ use arrow_format::ipc; use arrow_format::ipc::Schema::MetadataVersion; use crate::array::*; +use crate::columns::Columns; use crate::datatypes::{DataType, Field, Schema}; use crate::error::{ArrowError, Result}; use crate::io::ipc::{IpcField, IpcSchema}; -use crate::record_batch::RecordBatch; use super::deserialize::{read, skip}; use super::Dictionaries; @@ -82,12 +82,12 @@ pub fn read_record_batch( batch: ipc::Message::RecordBatch, schema: Arc, ipc_schema: &IpcSchema, - projection: Option<(&[usize], Arc)>, + projection: Option<&[usize]>, dictionaries: &Dictionaries, version: MetadataVersion, reader: &mut R, block_offset: u64, -) -> Result { +) -> Result>> { assert_eq!(schema.fields().len(), ipc_schema.fields.len()); let buffers = batch.buffers().ok_or_else(|| { ArrowError::OutOfSpec("Unable to get buffers from IPC RecordBatch".to_string()) @@ -99,15 +99,13 @@ pub fn read_record_batch( let mut field_nodes = field_nodes.iter().collect::>(); - let (schema, columns) = if let Some(projection) = projection { - let projected_schema = projection.1.clone(); - + let columns = if let Some(projection) = projection { let projection = ProjectionIter::new( - projection.0, + projection, schema.fields().iter().zip(ipc_schema.fields.iter()), ); - let arrays = projection + projection .map(|maybe_field| match maybe_field { ProjectionResult::Selected((field, ipc_field)) => Some(read( &mut field_nodes, @@ -127,10 +125,9 @@ pub fn read_record_batch( } }) .flatten() - .collect::>>()?; - (projected_schema, arrays) + .collect::>>()? } else { - let arrays = schema + schema .fields() .iter() .zip(ipc_schema.fields.iter()) @@ -148,10 +145,9 @@ pub fn read_record_batch( version, ) }) - .collect::>>()?; - (schema.clone(), arrays) + .collect::>>()? 
}; - RecordBatch::try_new(schema, columns) + Columns::try_new(columns) } fn find_first_dict_field_d<'a>( @@ -242,7 +238,7 @@ pub fn read_dictionary( }; assert_eq!(ipc_schema.fields.len(), schema.fields().len()); // Read a single column - let record_batch = read_record_batch( + let columns = read_record_batch( batch.data().unwrap(), schema, &ipc_schema, @@ -252,7 +248,8 @@ pub fn read_dictionary( reader, block_offset, )?; - Some(record_batch.column(0).clone()) + let mut arrays = columns.into_arrays(); + Some(arrays.pop().unwrap()) } _ => None, } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index af980b7ef75..4aa7b994e8b 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -5,10 +5,11 @@ use arrow_format::ipc; use arrow_format::ipc::flatbuffers::VerifierOptions; use arrow_format::ipc::File::Block; +use crate::array::Array; +use crate::columns::Columns; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; use crate::io::ipc::IpcSchema; -use crate::record_batch::RecordBatch; use super::super::{ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::*; @@ -205,10 +206,10 @@ fn get_serialized_batch<'a>( pub fn read_batch( reader: &mut R, metadata: &FileMetadata, - projection: Option<(&[usize], Arc)>, + projection: Option<&[usize]>, block: usize, block_data: &mut Vec, -) -> Result { +) -> Result>> { let block = metadata.blocks[block]; // read length @@ -293,7 +294,7 @@ impl FileReader { } impl Iterator for FileReader { - type Item = Result; + type Item = Result>>; fn next(&mut self) -> Option { // get current block @@ -303,9 +304,7 @@ impl Iterator for FileReader { Some(read_batch( &mut self.reader, &self.metadata, - self.projection - .as_ref() - .map(|x| (x.0.as_ref(), x.1.clone())), + self.projection.as_ref().map(|x| x.0.as_ref()), block, &mut self.buffer, )) diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index ebc6a83c4f1..4b8983e6c36 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -4,10 +4,11 @@ use std::sync::Arc; use arrow_format::ipc; use arrow_format::ipc::Schema::MetadataVersion; +use crate::array::Array; +use crate::columns::Columns; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; use crate::io::ipc::IpcSchema; -use crate::record_batch::RecordBatch; use super::super::CONTINUATION_MARKER; use super::common::*; @@ -64,15 +65,15 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { /// A stream is an iterator, and an iterator returns `Option`. The `Item` /// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow /// stream may yield one of three values: (1) `None`, which signals that the stream -/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was +/// is done; (2) [`StreamState::Some`], which signals that there was /// data waiting in the stream and we read it; and finally (3) -/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it +/// [`Some(StreamState::Waiting)`], which means that the stream is still "live", it /// just doesn't hold any data right now. pub enum StreamState { /// A live stream without data Waiting, /// Next item in the stream - Some(RecordBatch), + Some(Columns>), } impl StreamState { @@ -81,7 +82,7 @@ impl StreamState { /// # Panics /// /// If the `StreamState` was `Waiting`. 
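// Read-side sketch matching the new `FileReader` item type above: the reader now yields
// `Columns<Arc<dyn Array>>`, and the schema comes from the file metadata rather than from
// each batch. The helper name `read_all` is illustrative.
use std::fs::File;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::columns::Columns;
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};

fn read_all(path: &str) -> Result<Vec<Columns<Arc<dyn Array>>>> {
    let mut file = File::open(path)?;
    let metadata = read_file_metadata(&mut file)?;
    // `None`: no projection, read all columns
    FileReader::new(file, metadata, None).collect()
}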
- pub fn unwrap(self) -> RecordBatch { + pub fn unwrap(self) -> Columns> { if let StreamState::Some(batch) = self { batch } else { @@ -184,7 +185,7 @@ fn read_next( 0, )?; - // read the next message until we encounter a RecordBatch + // read the next message until we encounter a RecordBatch message read_next(reader, metadata, dictionaries, message_buffer, data_buffer) } ipc::Message::MessageHeader::NONE => Ok(Some(StreamState::Waiting)), diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index 20890c93538..d2d56796140 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -195,7 +195,7 @@ pub fn encode_columns( Ok((encoded_dictionaries, encoded_message)) } -/// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Schema::Message) and the +/// Write [`Columns`] into two sets of bytes, one for the header (ipc::Schema::Message) and the /// other for the batch's data fn columns_to_bytes(columns: &Columns>, options: &WriteOptions) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index b19a76feeb8..6c76d69bbcd 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -4,15 +4,17 @@ //! however the `FileWriter` expects a reader that supports `Seek`ing use std::io::Write; +use std::sync::Arc; use super::super::IpcField; use super::common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions}; use super::common_sync::{write_continuation, write_message}; use super::schema_to_bytes; +use crate::array::Array; +use crate::columns::Columns; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::record_batch::RecordBatch; /// Arrow stream writer /// @@ -52,8 +54,8 @@ impl StreamWriter { Ok(()) } - /// Writes [`RecordBatch`] to the stream - pub fn write(&mut self, batch: &RecordBatch, fields: &[IpcField]) -> Result<()> { + /// Writes [`Columns`] to the stream + pub fn write(&mut self, columns: &Columns>, fields: &[IpcField]) -> Result<()> { if self.finished { return Err(ArrowError::Io(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, @@ -61,9 +63,8 @@ impl StreamWriter { ))); } - let columns = batch.clone().into(); let (encoded_dictionaries, encoded_message) = encode_columns( - &columns, + columns, fields, &mut self.dictionary_tracker, &self.write_options, diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index e84df52342c..1606a11f268 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -1,4 +1,5 @@ //! `async` writing of arrow streams +use std::sync::Arc; use futures::AsyncWrite; @@ -8,9 +9,10 @@ use super::common::{encode_columns, DictionaryTracker, EncodedData}; use super::common_async::{write_continuation, write_message}; use super::{default_ipc_fields, schema_to_bytes}; +use crate::array::Array; +use crate::columns::Columns; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::record_batch::RecordBatch; /// An `async` writer to the Apache Arrow stream format. 
pub struct StreamWriter { @@ -53,10 +55,11 @@ impl StreamWriter { Ok(()) } - /// Writes [`RecordBatch`] to the stream + /// Writes [`Columns`] to the stream pub async fn write( &mut self, - batch: &RecordBatch, + columns: &Columns>, + schema: &Schema, ipc_fields: Option<&[IpcField]>, ) -> Result<()> { if self.finished { @@ -67,18 +70,16 @@ impl StreamWriter { } let (encoded_dictionaries, encoded_message) = if let Some(ipc_fields) = ipc_fields { - let columns = batch.clone().into(); encode_columns( - &columns, + columns, ipc_fields, &mut self.dictionary_tracker, &self.write_options, )? } else { - let ipc_fields = default_ipc_fields(batch.schema().fields()); - let columns = batch.clone().into(); + let ipc_fields = default_ipc_fields(schema.fields()); encode_columns( - &columns, + columns, &ipc_fields, &mut self.dictionary_tracker, &self.write_options, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 4591f9a3953..b5fe5d94232 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -1,4 +1,4 @@ -use std::io::Write; +use std::{io::Write, sync::Arc}; use arrow_format::ipc; use arrow_format::ipc::flatbuffers::FlatBufferBuilder; @@ -11,9 +11,10 @@ use super::{ default_ipc_fields, schema, schema_to_bytes, }; +use crate::array::Array; +use crate::columns::Columns; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::record_batch::RecordBatch; /// Arrow file writer pub struct FileWriter { @@ -78,8 +79,12 @@ impl FileWriter { self.writer } - /// Writes [`RecordBatch`] to the file - pub fn write(&mut self, batch: &RecordBatch, ipc_fields: Option<&[IpcField]>) -> Result<()> { + /// Writes [`Columns`] to the file + pub fn write( + &mut self, + columns: &Columns>, + ipc_fields: Option<&[IpcField]>, + ) -> Result<()> { if self.finished { return Err(ArrowError::Io(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, @@ -93,9 +98,8 @@ impl FileWriter { self.ipc_fields.as_ref() }; - let columns = batch.clone().into(); let (encoded_dictionaries, encoded_message) = encode_columns( - &columns, + columns, ipc_fields, &mut self.dictionary_tracker, &self.options, diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index 766d58a5fa6..3e5cf224859 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -30,12 +30,12 @@ where Ok(()) } -/// [`FallibleStreamingIterator`] that serializes a [`RecordBatch`] to bytes. +/// [`FallibleStreamingIterator`] that serializes a [`Columns`] to bytes. /// Advancing it is CPU-bounded pub struct Serializer where F: JsonFormat, - A: AsRef, + A: std::borrow::Borrow, I: Iterator>>, { batches: I, @@ -47,7 +47,7 @@ where impl Serializer where F: JsonFormat, - A: AsRef, + A: std::borrow::Borrow, I: Iterator>>, { /// Creates a new [`Serializer`]. 
@@ -64,7 +64,7 @@ where impl FallibleStreamingIterator for Serializer where F: JsonFormat, - A: AsRef, + A: std::borrow::Borrow, I: Iterator>>, { type Item = [u8]; diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 4618989f2f6..2899f2cf874 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -83,7 +83,7 @@ fn struct_serializer<'a>( serializers .iter_mut() .zip(names) - // `unwrap` is infalible because `array.len()` equals `num_rows` on a `RecordBatch` + // `unwrap` is infalible because `array.len()` equals `len` on `Columns` .for_each(|(iter, name)| { let item = iter.next().unwrap(); record.push((name, item)); @@ -202,7 +202,7 @@ fn serialize_item( pub fn serialize(names: &[N], columns: &Columns, format: F, buffer: &mut Vec) where N: AsRef, - A: AsRef, + A: std::borrow::Borrow, F: JsonFormat, { let num_rows = columns.len(); @@ -210,7 +210,7 @@ where let mut serializers: Vec<_> = columns .arrays() .iter() - .map(|array| new_serializer(array.as_ref())) + .map(|array| new_serializer(array.borrow())) .collect(); let mut is_first_row = true; @@ -219,7 +219,7 @@ where serializers .iter_mut() .zip(names.iter()) - // `unwrap` is infalible because `array.len()` equals `num_rows` on a `RecordBatch` + // `unwrap` is infalible because `array.len()` equals `len` on `Columns` .for_each(|(iter, name)| { let item = iter.next().unwrap(); record.push((name.as_ref(), item)); diff --git a/src/io/json_integration/read/array.rs b/src/io/json_integration/read/array.rs index 6579d7f815c..a6913139b39 100644 --- a/src/io/json_integration/read/array.rs +++ b/src/io/json_integration/read/array.rs @@ -7,10 +7,10 @@ use crate::{ array::*, bitmap::{Bitmap, MutableBitmap}, buffer::Buffer, + columns::Columns, datatypes::{DataType, PhysicalType, PrimitiveType, Schema}, error::{ArrowError, Result}, io::ipc::IpcField, - record_batch::RecordBatch, types::{days_ms, months_days_ns, NativeType}, }; @@ -410,13 +410,13 @@ pub fn to_array( } } -pub fn to_record_batch( +pub fn deserialize_columns( schema: &Schema, ipc_fields: &[IpcField], json_batch: &ArrowJsonBatch, json_dictionaries: &HashMap, -) -> Result { - let columns = schema +) -> Result>> { + let arrays = schema .fields() .iter() .zip(&json_batch.columns) @@ -429,7 +429,7 @@ pub fn to_record_batch( json_dictionaries, ) }) - .collect::>>()?; + .collect::>()?; - RecordBatch::try_new(Arc::new(schema.clone()), columns) + Columns::try_new(arrays) } diff --git a/src/io/json_integration/write/array.rs b/src/io/json_integration/write/array.rs index ea0bc1c7669..edb01823e3f 100644 --- a/src/io/json_integration/write/array.rs +++ b/src/io/json_integration/write/array.rs @@ -1,19 +1,24 @@ -use crate::record_batch::RecordBatch; -use crate::{array::PrimitiveArray, datatypes::DataType}; +use std::sync::Arc; + +use crate::{ + array::{Array, PrimitiveArray}, + columns::Columns, + datatypes::DataType, +}; use super::super::{ArrowJsonBatch, ArrowJsonColumn}; -/// Serializes a [`RecordBatch`] to [`ArrowJsonBatch`]. -pub fn from_record_batch(batch: &RecordBatch) -> ArrowJsonBatch { - let mut json_batch = ArrowJsonBatch { - count: batch.num_rows(), - columns: Vec::with_capacity(batch.num_columns()), - }; +/// Serializes a [`Columns`] to [`ArrowJsonBatch`]. 
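// Sketch of the reworked JSON serializer above, mirroring the benchmark change earlier in
// this diff: column names are now passed explicitly to `Serializer::new` instead of being
// taken from a `RecordBatch` schema. The name "c1" and helper `to_json` are assumptions.
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::columns::Columns;
use arrow2::error::Result;
use arrow2::io::json::write;

fn to_json(columns: Columns<Arc<dyn Array>>) -> Result<Vec<u8>> {
    let format = write::JsonArray::default();
    let batches = vec![Ok(columns)].into_iter();
    // advancing the serializer is CPU-bounded; writing the blocks is IO-bounded
    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);
    let mut buffer = vec![];
    write::write(&mut buffer, format, blocks)?;
    Ok(buffer)
}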
+pub fn serialize_columns(columns: &Columns>, names: &[&str]) -> ArrowJsonBatch { + let count = columns.len(); - for (col, field) in batch.columns().iter().zip(batch.schema().fields.iter()) { - let json_col = match field.data_type() { + let columns = columns + .arrays() + .iter() + .zip(names.iter()) + .map(|(array, name)| match array.data_type() { DataType::Int8 => { - let array = col.as_any().downcast_ref::>().unwrap(); + let array = array.as_any().downcast_ref::>().unwrap(); let (validity, data) = array .iter() @@ -21,8 +26,8 @@ pub fn from_record_batch(batch: &RecordBatch) -> ArrowJsonBatch { .unzip(); ArrowJsonColumn { - name: field.name().clone(), - count: col.len(), + name: name.to_string(), + count: array.len(), validity: Some(validity), data: Some(data), offset: None, @@ -31,18 +36,16 @@ pub fn from_record_batch(batch: &RecordBatch) -> ArrowJsonBatch { } } _ => ArrowJsonColumn { - name: field.name().clone(), - count: col.len(), + name: name.to_string(), + count: array.len(), validity: None, data: None, offset: None, type_id: None, children: None, }, - }; - - json_batch.columns.push(json_col); - } + }) + .collect(); - json_batch + ArrowJsonBatch { count, columns } } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index d442254f4e2..832527ee271 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -12,18 +12,23 @@ use crate::{ error::{ArrowError, Result}, }; -/// An iterator adapter that converts an iterator over [`RecordBatch`] into an iterator +/// An iterator adapter that converts an iterator over [`Columns`] into an iterator /// of row groups. /// Use it to create an iterator consumable by the parquet's API. -pub struct RowGroupIterator + 'static, I: Iterator>>> { +pub struct RowGroupIterator< + A: std::borrow::Borrow + 'static, + I: Iterator>>, +> { iter: I, options: WriteOptions, parquet_schema: SchemaDescriptor, encodings: Vec, } -impl + 'static, I: Iterator>>> RowGroupIterator { - /// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatch`]. +impl + 'static, I: Iterator>>> + RowGroupIterator +{ + /// Creates a new [`RowGroupIterator`] from an iterator over [`Columns`]. pub fn try_new( iter: I, schema: &Schema, @@ -48,7 +53,7 @@ impl + 'static, I: Iterator>>> RowG } } -impl + 'static, I: Iterator>>> Iterator +impl + 'static, I: Iterator>>> Iterator for RowGroupIterator { type Item = Result>; @@ -66,7 +71,7 @@ impl + 'static, I: Iterator>>> Iter .zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) .map(move |((array, descriptor), encoding)| { - array_to_pages(array.as_ref(), descriptor, options, encoding).map( + array_to_pages(array.borrow(), descriptor, options, encoding).map( move |pages| { let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); let compressed_pages = diff --git a/src/io/print.rs b/src/io/print.rs index 4cc46f31e73..e47639e4a8f 100644 --- a/src/io/print.rs +++ b/src/io/print.rs @@ -1,51 +1,42 @@ -//! APIs to represent [`RecordBatch`] as a formatted table. +//! APIs to represent [`Columns`] as a formatted table. -use crate::{array::get_display, record_batch::RecordBatch}; +use crate::{ + array::{get_display, Array}, + columns::Columns, +}; use comfy_table::{Cell, Table}; -/// Returns a visual representation of multiple [`RecordBatch`]es. 
-pub fn write(batches: &[RecordBatch]) -> String { - create_table(batches).to_string() -} - -/// Prints a visual representation of record batches to stdout -pub fn print(results: &[RecordBatch]) { - println!("{}", create_table(results)) -} - -/// Convert a series of record batches into a table -fn create_table(results: &[RecordBatch]) -> Table { +/// Returns a visual representation of [`Columns`] +pub fn write, N: AsRef>( + batches: &[Columns], + names: &[N], +) -> String { let mut table = Table::new(); table.load_preset("||--+-++| ++++++"); - if results.is_empty() { - return table; + if batches.is_empty() { + return table.to_string(); } - let schema = results[0].schema(); - - let mut header = Vec::new(); - for field in schema.fields() { - header.push(Cell::new(field.name())); - } + let header = names.iter().map(|name| Cell::new(name.as_ref())); table.set_header(header); - for batch in results { + for batch in batches { let displayes = batch - .columns() + .arrays() .iter() - .map(|array| get_display(array.as_ref())) + .map(|array| get_display(array.borrow())) .collect::>(); - for row in 0..batch.num_rows() { + for row in 0..batch.len() { let mut cells = Vec::new(); - (0..batch.num_columns()).for_each(|col| { + (0..batch.arrays().len()).for_each(|col| { let string = displayes[col](row); cells.push(Cell::new(&string)); }); table.add_row(cells); } } - table + table.to_string() } diff --git a/src/lib.rs b/src/lib.rs index fd62ca0465a..de2ed0cd1d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ pub mod types; pub mod compute; pub mod io; -pub mod record_batch; +//pub mod record_batch; pub mod temporal_conversions; pub mod datatypes; diff --git a/src/record_batch.rs b/src/record_batch.rs deleted file mode 100644 index 369e74a0481..00000000000 --- a/src/record_batch.rs +++ /dev/null @@ -1,340 +0,0 @@ -//! Contains [`RecordBatch`]. -use std::sync::Arc; - -use crate::array::*; -use crate::datatypes::*; -use crate::error::{ArrowError, Result}; - -/// A two-dimensional dataset with a number of -/// columns ([`Array`]) and rows and defined [`Schema`](crate::datatypes::Schema). -/// # Implementation -/// Cloning is `O(C)` where `C` is the number of columns. -#[derive(Clone, Debug, PartialEq)] -pub struct RecordBatch { - schema: Arc, - columns: Vec>, -} - -impl RecordBatch { - /// Creates a [`RecordBatch`] from a schema and columns. - /// # Errors - /// This function errors iff - /// * `columns` is empty - /// * the schema and column data types do not match - /// * `columns` have a different length - /// # Example - /// - /// ``` - /// # use std::sync::Arc; - /// # use arrow2::array::PrimitiveArray; - /// # use arrow2::datatypes::{Schema, Field, DataType}; - /// # use arrow2::record_batch::RecordBatch; - /// # fn main() -> arrow2::error::Result<()> { - /// let id_array = PrimitiveArray::from_slice([1i32, 2, 3, 4, 5]); - /// let schema = Arc::new(Schema::new(vec![ - /// Field::new("id", DataType::Int32, false) - /// ])); - /// - /// let batch = RecordBatch::try_new( - /// schema, - /// vec![Arc::new(id_array)] - /// )?; - /// # Ok(()) - /// # } - /// ``` - pub fn try_new(schema: Arc, columns: Vec>) -> Result { - let options = RecordBatchOptions::default(); - Self::validate_new_batch(&schema, columns.as_slice(), &options)?; - Ok(RecordBatch { schema, columns }) - } - - /// Creates a [`RecordBatch`] from a schema and columns, with additional options, - /// such as whether to strictly validate field names. - /// - /// See [`Self::try_new()`] for the expected conditions. 
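// Sketch of the new `print::write` above: it now receives the column names, which previously
// came from the `RecordBatch` schema, and returns the formatted table as a `String`. The
// name "c1" is an assumption.
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::columns::Columns;
use arrow2::io::print;

fn show_table() {
    let array = Int32Array::from_slice(&[1, 2, 3]);
    let columns = Columns::new(vec![Arc::new(array) as Arc<dyn Array>]);
    println!("{}", print::write(&[columns], &["c1"]));
}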
- pub fn try_new_with_options( - schema: Arc, - columns: Vec>, - options: &RecordBatchOptions, - ) -> Result { - Self::validate_new_batch(&schema, &columns, options)?; - Ok(RecordBatch { schema, columns }) - } - - /// Creates a new empty [`RecordBatch`]. - pub fn new_empty(schema: Arc) -> Self { - let columns = schema - .fields() - .iter() - .map(|field| new_empty_array(field.data_type().clone()).into()) - .collect(); - RecordBatch { schema, columns } - } - - /// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error - /// if any validation check fails. - fn validate_new_batch( - schema: &Schema, - columns: &[Arc], - options: &RecordBatchOptions, - ) -> Result<()> { - // check that there are some columns - if columns.is_empty() { - return Err(ArrowError::InvalidArgumentError( - "at least one column must be defined to create a record batch".to_string(), - )); - } - // check that number of fields in schema match column length - if schema.fields().len() != columns.len() { - return Err(ArrowError::InvalidArgumentError(format!( - "number of columns({}) must match number of fields({}) in schema", - columns.len(), - schema.fields().len(), - ))); - } - // check that all columns have the same row count, and match the schema - let len = columns[0].len(); - - // This is a bit repetitive, but it is better to check the condition outside the loop - if options.match_field_names { - for (i, column) in columns.iter().enumerate() { - if column.len() != len { - return Err(ArrowError::InvalidArgumentError( - "all columns in a record batch must have the same length".to_string(), - )); - } - if column.data_type() != schema.field(i).data_type() { - return Err(ArrowError::InvalidArgumentError(format!( - "column types must match schema types, expected {:?} but found {:?} at column index {}", - schema.field(i).data_type(), - column.data_type(), - i))); - } - } - } else { - for (i, column) in columns.iter().enumerate() { - if column.len() != len { - return Err(ArrowError::InvalidArgumentError( - "all columns in a record batch must have the same length".to_string(), - )); - } - if !column - .data_type() - .equals_datatype(schema.field(i).data_type()) - { - return Err(ArrowError::InvalidArgumentError(format!( - "column types must match schema types, expected {:?} but found {:?} at column index {}", - schema.field(i).data_type(), - column.data_type(), - i))); - } - } - } - - Ok(()) - } - - /// Returns the [`Schema`](crate::datatypes::Schema) of the record batch. - pub fn schema(&self) -> &Arc { - &self.schema - } - - /// Returns the number of columns in the record batch. - /// - /// # Example - /// - /// ``` - /// # use std::sync::Arc; - /// # use arrow2::array::PrimitiveArray; - /// # use arrow2::datatypes::{Schema, Field, DataType}; - /// # use arrow2::record_batch::RecordBatch; - /// # fn main() -> arrow2::error::Result<()> { - /// let id_array = PrimitiveArray::from_slice([1i32, 2, 3, 4, 5]); - /// let schema = Arc::new(Schema::new(vec![ - /// Field::new("id", DataType::Int32, false) - /// ])); - /// - /// let batch = RecordBatch::try_new(schema, vec![Arc::new(id_array)])?; - /// - /// assert_eq!(batch.num_columns(), 1); - /// # Ok(()) - /// # } - /// ``` - pub fn num_columns(&self) -> usize { - self.columns.len() - } - - /// Returns the number of rows in each column. - /// - /// # Panics - /// - /// Panics if the `RecordBatch` contains no columns. 
- /// - /// # Example - /// - /// ``` - /// # use std::sync::Arc; - /// # use arrow2::array::PrimitiveArray; - /// # use arrow2::datatypes::{Schema, Field, DataType}; - /// # use arrow2::record_batch::RecordBatch; - /// # fn main() -> arrow2::error::Result<()> { - /// let id_array = PrimitiveArray::from_slice([1i32, 2, 3, 4, 5]); - /// let schema = Arc::new(Schema::new(vec![ - /// Field::new("id", DataType::Int32, false) - /// ])); - /// - /// let batch = RecordBatch::try_new(schema, vec![Arc::new(id_array)])?; - /// - /// assert_eq!(batch.num_rows(), 5); - /// # Ok(()) - /// # } - /// ``` - pub fn num_rows(&self) -> usize { - self.columns[0].len() - } - - /// Get a reference to a column's array by index. - /// - /// # Panics - /// - /// Panics if `index` is outside of `0..num_columns`. - pub fn column(&self, index: usize) -> &Arc { - &self.columns[index] - } - - /// Get a reference to all columns in the record batch. - pub fn columns(&self) -> &[Arc] { - &self.columns[..] - } - - /// Create a `RecordBatch` from an iterable list of pairs of the - /// form `(field_name, array)`, with the same requirements on - /// fields and arrays as [`RecordBatch::try_new`]. This method is - /// often used to create a single `RecordBatch` from arrays, - /// e.g. for testing. - /// - /// The resulting schema is marked as nullable for each column if - /// the array for that column is has any nulls. To explicitly - /// specify nullibility, use [`RecordBatch::try_from_iter_with_nullable`] - /// - /// Example: - /// ``` - /// use std::sync::Arc; - /// use arrow2::array::*; - /// use arrow2::datatypes::DataType; - /// use arrow2::record_batch::RecordBatch; - /// - /// let a: Arc = Arc::new(Int32Array::from_slice(&[1, 2])); - /// let b: Arc = Arc::new(Utf8Array::::from_slice(&["a", "b"])); - /// - /// let record_batch = RecordBatch::try_from_iter(vec![ - /// ("a", a), - /// ("b", b), - /// ]); - /// ``` - pub fn try_from_iter(value: I) -> Result - where - I: IntoIterator)>, - F: AsRef, - { - // TODO: implement `TryFrom` trait, once - // https://github.com/rust-lang/rust/issues/50133 is no longer an - // issue - let iter = value.into_iter().map(|(field_name, array)| { - let nullable = array.null_count() > 0; - (field_name, array, nullable) - }); - - Self::try_from_iter_with_nullable(iter) - } - - /// Create a `RecordBatch` from an iterable list of tuples of the - /// form `(field_name, array, nullable)`, with the same requirements on - /// fields and arrays as [`RecordBatch::try_new`]. This method is often - /// used to create a single `RecordBatch` from arrays, e.g. for - /// testing. 
- /// - /// Example: - /// ``` - /// use std::sync::Arc; - /// use arrow2::array::*; - /// use arrow2::datatypes::DataType; - /// use arrow2::record_batch::RecordBatch; - /// - /// let a: Arc = Arc::new(Int32Array::from_slice(&[1, 2])); - /// let b: Arc = Arc::new(Utf8Array::::from_slice(&["a", "b"])); - /// - /// // Note neither `a` nor `b` has any actual nulls, but we mark - /// // b an nullable - /// let record_batch = RecordBatch::try_from_iter_with_nullable(vec![ - /// ("a", a, false), - /// ("b", b, true), - /// ]); - /// ``` - pub fn try_from_iter_with_nullable(value: I) -> Result - where - I: IntoIterator, bool)>, - F: AsRef, - { - // TODO: implement `TryFrom` trait, once - // https://github.com/rust-lang/rust/issues/50133 is no longer an - // issue - let (fields, columns) = value - .into_iter() - .map(|(field_name, array, nullable)| { - let field_name = field_name.as_ref(); - let field = Field::new(field_name, array.data_type().clone(), nullable); - (field, array) - }) - .unzip(); - - let schema = Arc::new(Schema::new(fields)); - RecordBatch::try_new(schema, columns) - } - - /// Deconstructs itself into its internal components - pub fn into_inner(self) -> (Vec>, Arc) { - let Self { columns, schema } = self; - (columns, schema) - } -} - -/// Options that control the behaviour used when creating a [`RecordBatch`]. -#[derive(Debug)] -pub struct RecordBatchOptions { - /// Match field names of structs and lists. If set to `true`, the names must match. - pub match_field_names: bool, -} - -impl Default for RecordBatchOptions { - fn default() -> Self { - Self { - match_field_names: true, - } - } -} - -impl From for RecordBatch { - /// # Panics iff the null count of the array is not null. - fn from(array: StructArray) -> Self { - assert!(array.null_count() == 0); - let (fields, values, _) = array.into_data(); - RecordBatch { - schema: Arc::new(Schema::new(fields)), - columns: values, - } - } -} - -impl From for StructArray { - fn from(batch: RecordBatch) -> Self { - let (fields, values) = batch - .schema - .fields - .iter() - .zip(batch.columns.iter()) - .map(|t| (t.0.clone(), t.1.clone())) - .unzip(); - StructArray::from_data(DataType::Struct(fields), values, None) - } -} diff --git a/testing/arrow-testing b/testing/arrow-testing index e8ce32338f2..83ada4ec0f2 160000 --- a/testing/arrow-testing +++ b/testing/arrow-testing @@ -1 +1 @@ -Subproject commit e8ce32338f2dfeca3a5126f7677bdee159604000 +Subproject commit 83ada4ec0f2cfe36f4168628d7f470e6199e663a diff --git a/testing/parquet-testing b/testing/parquet-testing index 8e7badc6a38..d4d485956a6 160000 --- a/testing/parquet-testing +++ b/testing/parquet-testing @@ -1 +1 @@ -Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890 +Subproject commit d4d485956a643c693b5549e1a62d52ca61c170f1 diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index bd75048efc4..4bd43676d3b 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use arrow2::columns::Columns; use avro_rs::types::{Record, Value}; use avro_rs::{Codec, Writer}; use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; @@ -8,7 +9,6 @@ use arrow2::array::*; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::avro::read; -use arrow2::record_batch::RecordBatch; pub(super) fn schema() -> (AvroSchema, Schema) { let raw_schema = r#" @@ -69,7 +69,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) { (AvroSchema::parse_str(raw_schema).unwrap(), schema) } -pub(super) fn data() -> RecordBatch { 
+pub(super) fn data() -> Columns> { let data = vec![ Some(vec![Some(1i32), None, Some(3)]), Some(vec![Some(1i32), None, Some(3)]), @@ -80,21 +80,21 @@ pub(super) fn data() -> RecordBatch { let columns = vec![ Arc::new(Int64Array::from_slice([27, 47])) as Arc, - Arc::new(Utf8Array::::from_slice(["foo", "bar"])) as Arc, - Arc::new(Int32Array::from_slice([1, 1])) as Arc, - Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc, - Arc::new(BinaryArray::::from_slice([b"foo", b"bar"])) as Arc, - Arc::new(PrimitiveArray::::from_slice([1.0, 2.0])) as Arc, - Arc::new(BooleanArray::from_slice([true, false])) as Arc, - Arc::new(Utf8Array::::from([Some("foo"), None])) as Arc, + Arc::new(Utf8Array::::from_slice(["foo", "bar"])), + Arc::new(Int32Array::from_slice([1, 1])), + Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)), + Arc::new(BinaryArray::::from_slice([b"foo", b"bar"])), + Arc::new(PrimitiveArray::::from_slice([1.0, 2.0])), + Arc::new(BooleanArray::from_slice([true, false])), + Arc::new(Utf8Array::::from([Some("foo"), None])), array.into_arc(), Arc::new(DictionaryArray::::from_data( Int32Array::from_slice([1, 0]), Arc::new(Utf8Array::::from_slice(["SPADES", "HEARTS"])), - )) as Arc, + )), ]; - RecordBatch::try_new(Arc::new(schema().1), columns).unwrap() + Columns::try_new(columns).unwrap() } pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs::Error> { @@ -149,7 +149,7 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Ok(writer.into_inner().unwrap()) } -pub(super) fn read_avro(mut avro: &[u8]) -> Result { +pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Columns>, Schema)> { let file = &mut avro; let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?; @@ -157,18 +157,20 @@ pub(super) fn read_avro(mut avro: &[u8]) -> Result { let mut reader = read::Reader::new( read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec), avro_schema, - Arc::new(schema), + schema.fields.clone(), ); - reader.next().unwrap() + reader.next().unwrap().map(|x| (x, schema)) } fn test(codec: Codec) -> Result<()> { let avro = write_avro(codec).unwrap(); let expected = data(); + let (_, expected_schema) = schema(); - let result = read_avro(&avro)?; + let (result, schema) = read_avro(&avro)?; + assert_eq!(schema, expected_schema); assert_eq!(result, expected); Ok(()) } diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs index 5d0c688b7c7..3dd04cdb479 100644 --- a/tests/it/io/avro/read_async.rs +++ b/tests/it/io/avro/read_async.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use avro_rs::Codec; use futures::pin_mut; @@ -8,18 +6,17 @@ use futures::StreamExt; use arrow2::error::Result; use arrow2::io::avro::read_async::*; -use super::read::{data, write_avro}; +use super::read::{schema, write_avro}; async fn test(codec: Codec) -> Result<()> { let avro_data = write_avro(codec).unwrap(); - let expected = data(); + let (_, expected_schema) = schema(); let mut reader = &mut &avro_data[..]; let (_, schema, _, marker) = read_metadata(&mut reader).await?; - let schema = Arc::new(schema); - assert_eq!(schema.as_ref(), expected.schema().as_ref()); + assert_eq!(schema, expected_schema); let blocks = block_stream(&mut reader, marker).await; diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index 6e605828dab..bf0cad5e22e 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use arrow2::array::*; +use arrow2::columns::Columns; use 
arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::avro::write; -use arrow2::record_batch::RecordBatch; fn schema() -> Schema { Schema::new(vec![ @@ -19,7 +19,7 @@ fn schema() -> Schema { ]) } -fn data() -> RecordBatch { +fn data() -> Columns> { let columns = vec![ Arc::new(Int64Array::from_slice([27, 47])) as Arc, Arc::new(Utf8Array::::from_slice(["foo", "bar"])) as Arc, @@ -31,25 +31,26 @@ fn data() -> RecordBatch { Arc::new(Utf8Array::::from([Some("foo"), None])) as Arc, ]; - RecordBatch::try_new(Arc::new(schema()), columns).unwrap() + Columns::try_new(columns).unwrap() } use super::read::read_avro; -fn write_avro>( - arrays: &[R], +fn write_avro>( + columns: &Columns, schema: &Schema, compression: Option, ) -> Result> { let avro_fields = write::to_avro_schema(schema)?; - let mut serializers = arrays + let mut serializers = columns + .arrays() .iter() - .map(|x| x.as_ref()) + .map(|x| x.borrow()) .zip(avro_fields.iter()) .map(|(array, field)| write::new_serializer(array, &field.schema)) .collect::>(); - let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]); + let mut block = write::Block::new(columns.len(), vec![]); write::serialize(&mut serializers, &mut block); @@ -68,19 +69,16 @@ fn write_avro>( fn roundtrip(compression: Option) -> Result<()> { let expected = data(); + let expected_schema = schema(); - let arrays = expected.columns(); - let schema = expected.schema(); + let data = write_avro(&expected, &expected_schema, compression)?; - let data = write_avro(arrays, schema, compression)?; + let (result, read_schema) = read_avro(&data)?; - let result = read_avro(&data)?; - - assert_eq!(result.schema(), expected.schema()); + assert_eq!(expected_schema, read_schema); for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) { - assert_eq!(c1, c2); + assert_eq!(c1.as_ref(), c2.as_ref()); } - assert_eq!(result, expected); Ok(()) } diff --git a/tests/it/io/csv/read.rs b/tests/it/io/csv/read.rs index 40e5344ea3c..d7981230871 100644 --- a/tests/it/io/csv/read.rs +++ b/tests/it/io/csv/read.rs @@ -32,7 +32,7 @@ fn read() -> Result<()> { let mut rows = vec![ByteRecord::default(); 100]; let rows_read = read_rows(&mut reader, 0, &mut rows)?; - let batch = deserialize_batch( + let columns = deserialize_batch( &rows[..rows_read], schema.fields(), None, @@ -40,21 +40,16 @@ fn read() -> Result<()> { deserialize_column, )?; - let batch_schema = batch.schema(); + assert_eq!(14, columns.len()); + assert_eq!(3, columns.arrays().len()); - assert_eq!(&schema, batch_schema); - assert_eq!(14, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - let lat = batch - .column(1) + let lat = columns.arrays()[1] .as_any() .downcast_ref::() .unwrap(); assert!((57.653484 - lat.value(0)).abs() < f64::EPSILON); - let city = batch - .column(0) + let city = columns.arrays()[0] .as_any() .downcast_ref::>() .unwrap(); diff --git a/tests/it/io/csv/read_async.rs b/tests/it/io/csv/read_async.rs index 6d22004ad7c..73c2ad60078 100644 --- a/tests/it/io/csv/read_async.rs +++ b/tests/it/io/csv/read_async.rs @@ -1,5 +1,4 @@ use futures::io::Cursor; -use std::sync::Arc; use arrow2::array::*; use arrow2::error::Result; @@ -24,12 +23,12 @@ async fn read() -> Result<()> { "Aberdeen, Aberdeen City, UK",57.149651,-2.099075"#; let mut reader = AsyncReaderBuilder::new().create_reader(Cursor::new(data.as_bytes())); - let schema = Arc::new(infer_schema(&mut reader, None, true, &infer).await?); + let schema = infer_schema(&mut reader, None, true, &infer).await?; let mut rows = 
vec![ByteRecord::default(); 100]; let rows_read = read_rows(&mut reader, 0, &mut rows).await?; - let batch = deserialize_batch( + let columns = deserialize_batch( &rows[..rows_read], schema.fields(), None, @@ -37,21 +36,16 @@ async fn read() -> Result<()> { deserialize_column, )?; - let batch_schema = batch.schema(); + assert_eq!(14, columns.len()); + assert_eq!(3, columns.arrays().len()); - assert_eq!(&schema, batch_schema); - assert_eq!(14, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - let lat = batch - .column(1) + let lat = columns.arrays()[1] .as_any() .downcast_ref::() .unwrap(); assert!((57.653484 - lat.value(0)).abs() < f64::EPSILON); - let city = batch - .column(0) + let city = columns.arrays()[0] .as_any() .downcast_ref::>() .unwrap(); diff --git a/tests/it/io/csv/write.rs b/tests/it/io/csv/write.rs index c8a8fc0984f..c59ceadf400 100644 --- a/tests/it/io/csv/write.rs +++ b/tests/it/io/csv/write.rs @@ -2,12 +2,12 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::*; +use arrow2::columns::Columns; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::csv::write::*; -use arrow2::record_batch::RecordBatch; -fn data() -> RecordBatch { +fn data() -> Columns> { let c1 = Utf8Array::::from_slice(["a b", "c", "d"]); let c2 = Float64Array::from([Some(123.564532), None, Some(-556132.25)]); let c3 = UInt32Array::from_slice(&[3, 2, 1]); @@ -19,28 +19,27 @@ fn data() -> RecordBatch { let keys = UInt32Array::from_slice(&[2, 0, 1]); let c7 = DictionaryArray::from_data(keys, Arc::new(c1.clone())); - RecordBatch::try_from_iter(vec![ - ("c1", Arc::new(c1) as Arc), - ("c2", Arc::new(c2) as Arc), - ("c3", Arc::new(c3) as Arc), - ("c4", Arc::new(c4) as Arc), - ("c5", Arc::new(c5) as Arc), - ("c6", Arc::new(c6) as Arc), - ("c7", Arc::new(c7) as Arc), + Columns::new(vec![ + Box::new(c1) as Box, + Box::new(c2), + Box::new(c3), + Box::new(c4), + Box::new(c5), + Box::new(c6), + Box::new(c7), ]) - .unwrap() } #[test] fn write_csv() -> Result<()> { - let batch = data(); + let columns = data(); let write = Cursor::new(Vec::::new()); let mut writer = WriterBuilder::new().from_writer(write); - write_header(&mut writer, batch.schema())?; + write_header(&mut writer, &["c1", "c2", "c3", "c4", "c5", "c6", "c7"])?; let options = SerializeOptions::default(); - write_batch(&mut writer, &batch, &options)?; + write_columns(&mut writer, &columns, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); @@ -68,7 +67,7 @@ fn write_csv_custom_options() -> Result<()> { time64_format: Some("%r".to_string()), ..Default::default() }; - write_batch(&mut writer, &batch, &options)?; + write_columns(&mut writer, &batch, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); @@ -83,7 +82,7 @@ d|-556132.25|1||2019-04-18 02:45:55.555|11:46:03 PM|c Ok(()) } -fn data_array(column: usize) -> (RecordBatch, Vec<&'static str>) { +fn data_array(column: usize) -> (Columns>, Vec<&'static str>) { let (array, expected) = match column { 0 => ( Arc::new(Utf8Array::::from_slice(["a b", "c", "d"])) as Arc, @@ -208,21 +207,18 @@ fn data_array(column: usize) -> (RecordBatch, Vec<&'static str>) { _ => todo!(), }; - ( - RecordBatch::try_from_iter(vec![("c1", array)]).unwrap(), - expected, - ) + (Columns::new(vec![array]), expected) } fn write_single(column: usize) -> Result<()> { - let (batch, data) = data_array(column); + let (columns, data) = data_array(column); let write = Cursor::new(Vec::::new()); let mut writer = 
WriterBuilder::new().delimiter(b'|').from_writer(write); - write_header(&mut writer, batch.schema())?; + write_header(&mut writer, &["c1"])?; let options = SerializeOptions::default(); - write_batch(&mut writer, &batch, &options)?; + write_columns(&mut writer, &columns, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); @@ -230,7 +226,7 @@ fn write_single(column: usize) -> Result<()> { let mut expected = "c1\n".to_owned(); expected.push_str(&data.join("\n")); expected.push('\n'); - assert_eq!(expected, String::from_utf8(buffer).unwrap(),); + assert_eq!(expected, String::from_utf8(buffer).unwrap()); Ok(()) } diff --git a/tests/it/io/ipc/common.rs b/tests/it/io/ipc/common.rs index 52a75e0052a..6778dc92c02 100644 --- a/tests/it/io/ipc/common.rs +++ b/tests/it/io/ipc/common.rs @@ -1,9 +1,9 @@ -use std::{collections::HashMap, fs::File, io::Read}; +use std::{collections::HashMap, fs::File, io::Read, sync::Arc}; use arrow2::{ - datatypes::Schema, error::Result, io::ipc::read::read_stream_metadata, - io::ipc::read::StreamReader, io::ipc::IpcField, io::json_integration::read, - io::json_integration::ArrowJson, record_batch::RecordBatch, + array::Array, columns::Columns, datatypes::Schema, error::Result, + io::ipc::read::read_stream_metadata, io::ipc::read::StreamReader, io::ipc::IpcField, + io::json_integration::read, io::json_integration::ArrowJson, }; use flate2::read::GzDecoder; @@ -12,7 +12,7 @@ use flate2::read::GzDecoder; pub fn read_gzip_json( version: &str, file_name: &str, -) -> Result<(Schema, Vec, Vec)> { +) -> Result<(Schema, Vec, Vec>>)> { let testdata = crate::test_util::arrow_test_data(); let file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.json.gz", @@ -41,7 +41,7 @@ pub fn read_gzip_json( let batches = arrow_json .batches .iter() - .map(|batch| read::to_record_batch(&schema, &ipc_fields, batch, &dictionaries)) + .map(|batch| read::deserialize_columns(&schema, &ipc_fields, batch, &dictionaries)) .collect::>>()?; Ok((schema, ipc_fields, batches)) @@ -50,7 +50,7 @@ pub fn read_gzip_json( pub fn read_arrow_stream( version: &str, file_name: &str, -) -> (Schema, Vec, Vec) { +) -> (Schema, Vec, Vec>>) { let testdata = crate::test_util::arrow_test_data(); let mut file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.stream", diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 8cce26b81c5..4578a4b0e02 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -168,7 +168,7 @@ fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> assert_eq!(reader.schema().fields().len(), 1); reader.try_for_each(|rhs| { - assert_eq!(rhs?.num_columns(), 1); + assert_eq!(rhs?.arrays().len(), 1); Result::Ok(()) })?; Ok(()) diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 979d970c150..8156b834a59 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -1,16 +1,17 @@ use std::io::Cursor; +use std::sync::Arc; use arrow2::array::*; -use arrow2::datatypes::Schema; +use arrow2::columns::Columns; +use arrow2::datatypes::{Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::read::{read_file_metadata, FileReader}; use arrow2::io::ipc::{write::*, IpcField}; -use arrow2::record_batch::RecordBatch; use crate::io::ipc::common::read_gzip_json; fn write_( - batches: &[RecordBatch], + batches: &[Columns>], schema: &Schema, ipc_fields: Option>, compression: Option, @@ -25,18 +26,26 @@ fn write_( Ok(writer.into_inner()) } -fn 
round_trip(batch: RecordBatch, ipc_fields: Option>) -> Result<()> { - let (expected_schema, expected_batches) = (batch.schema().clone(), vec![batch.clone()]); - - let schema = batch.schema().clone(); - let result = write_(&[batch], &schema, ipc_fields, Some(Compression::ZSTD))?; +fn round_trip( + columns: Columns>, + schema: Schema, + ipc_fields: Option>, +) -> Result<()> { + let (expected_schema, expected_batches) = (schema.clone(), vec![columns]); + + let result = write_( + &expected_batches, + &schema, + ipc_fields, + Some(Compression::ZSTD), + )?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema().clone(); let reader = FileReader::new(reader, metadata, None); - assert_eq!(schema.as_ref(), expected_schema.as_ref()); + assert_eq!(schema.as_ref(), &expected_schema); let batches = reader.collect::>>()?; @@ -329,8 +338,9 @@ fn write_boolean() -> Result<()> { None, Some(true), ])) as Arc; - let batch = RecordBatch::try_from_iter(vec![("a", array)])?; - round_trip(batch, None) + let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Columns::try_new(vec![array])?; + round_trip(columns, schema, None) } #[test] @@ -338,8 +348,9 @@ fn write_boolean() -> Result<()> { fn write_sliced_utf8() -> Result<()> { use std::sync::Arc; let array = Arc::new(Utf8Array::::from_slice(["aa", "bb"]).slice(1, 1)) as Arc; - let batch = RecordBatch::try_from_iter(vec![("a", array)])?; - round_trip(batch, None) + let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Columns::try_new(vec![array])?; + round_trip(columns, schema, None) } #[test] @@ -353,7 +364,9 @@ fn write_sliced_list() -> Result<()> { let mut array = MutableListArray::>::new(); array.try_extend(data).unwrap(); - let array = array.into_arc().slice(1, 2).into(); - let batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap(); - round_trip(batch, None) + let array: Arc = array.into_arc().slice(1, 2).into(); + + let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Columns::try_new(vec![array])?; + round_trip(columns, schema, None) } diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs index 80fe5f505fe..db13c2b734a 100644 --- a/tests/it/io/ipc/write/stream.rs +++ b/tests/it/io/ipc/write/stream.rs @@ -1,17 +1,23 @@ use std::io::Cursor; +use std::sync::Arc; +use arrow2::array::Array; +use arrow2::columns::Columns; use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read::read_stream_metadata; use arrow2::io::ipc::read::StreamReader; use arrow2::io::ipc::write::{StreamWriter, WriteOptions}; use arrow2::io::ipc::IpcField; -use arrow2::record_batch::RecordBatch; use crate::io::ipc::common::read_arrow_stream; use crate::io::ipc::common::read_gzip_json; -fn write_(schema: &Schema, ipc_fields: &[IpcField], batches: &[RecordBatch]) -> Vec { +fn write_( + schema: &Schema, + ipc_fields: &[IpcField], + batches: &[Columns>], +) -> Vec { let mut result = vec![]; let options = WriteOptions { compression: None }; diff --git a/tests/it/io/ipc/write_async.rs b/tests/it/io/ipc/write_async.rs index e77558ce510..6f803bb4667 100644 --- a/tests/it/io/ipc/write_async.rs +++ b/tests/it/io/ipc/write_async.rs @@ -1,11 +1,13 @@ use std::io::Cursor; +use std::sync::Arc; +use arrow2::array::Array; +use arrow2::columns::Columns; use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read; use 
arrow2::io::ipc::write::stream_async; use arrow2::io::ipc::IpcField; -use arrow2::record_batch::RecordBatch; use futures::io::Cursor as AsyncCursor; use crate::io::ipc::common::read_arrow_stream; @@ -14,7 +16,7 @@ use crate::io::ipc::common::read_gzip_json; async fn write_( schema: &Schema, ipc_fields: &[IpcField], - batches: &[RecordBatch], + batches: &[Columns>], ) -> Result> { let mut result = AsyncCursor::new(vec![]); @@ -22,7 +24,7 @@ async fn write_( let mut writer = stream_async::StreamWriter::new(&mut result, options); writer.start(schema, Some(ipc_fields)).await?; for batch in batches { - writer.write(batch, Some(ipc_fields)).await?; + writer.write(batch, schema, Some(ipc_fields)).await?; } writer.finish().await?; Ok(result.into_inner()) diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index df54fa0c404..3cb5314a024 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -4,16 +4,14 @@ mod write; use std::io::Cursor; use std::sync::Arc; -use serde_json::Value; - use arrow2::array::*; +use arrow2::columns::Columns; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::json::read as json_read; use arrow2::io::json::write as json_write; -use arrow2::record_batch::RecordBatch; -fn read_batch(data: String, fields: Vec) -> Result { +fn read_batch(data: String, fields: &[Field]) -> Result>> { let mut reader = Cursor::new(data); let mut rows = vec![String::default(); 1024]; @@ -22,12 +20,15 @@ fn read_batch(data: String, fields: Vec) -> Result { json_read::deserialize(rows, fields) } -fn write_batch(batch: RecordBatch) -> Result> { +fn write_batch>( + batch: Columns, + names: Vec, +) -> Result> { let format = json_write::LineDelimited::default(); let batches = vec![Ok(batch)].into_iter(); - let blocks = json_write::Serializer::new(batches, vec![], format); + let blocks = json_write::Serializer::new(batches, names, vec![], format); let mut buf = Vec::new(); json_write::write(&mut buf, format, blocks)?; @@ -39,27 +40,16 @@ fn round_trip(data: String) -> Result<()> { let fields = json_read::infer(&mut reader, None)?; let data = reader.into_inner(); - let batch = read_batch(data.clone(), fields)?; - - let buf = write_batch(batch)?; - - let result = String::from_utf8(buf).unwrap(); - println!("{}", result); - for (r, e) in result.lines().zip(data.lines()) { - let mut result_json = serde_json::from_str::(r).unwrap(); - let expected_json = serde_json::from_str::(e).unwrap(); - if let Value::Object(e) = &expected_json { - // remove null value from object to make comparison consistent: - if let Value::Object(r) = result_json { - result_json = Value::Object( - r.into_iter() - .filter(|(k, v)| e.contains_key(k) || *v != Value::Null) - .collect(), - ); - } - assert_eq!(result_json, expected_json); - } - } + let columns = read_batch(data, &fields)?; + + let buf = write_batch( + columns.clone(), + fields.iter().map(|x| x.name().to_string()).collect(), + )?; + + let new_columns = read_batch(String::from_utf8(buf).unwrap(), &fields)?; + + assert_eq!(columns, new_columns); Ok(()) } diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index c0aa763032b..d8eccb931a9 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -11,9 +11,7 @@ use super::*; fn basic() -> Result<()> { let (data, schema, columns) = case_basics(); - let batch = read_batch(data, schema.fields.clone())?; - - assert_eq!(&schema, batch.schema().as_ref()); + let batch = read_batch(data, &schema.fields)?; columns .iter() @@ -26,9 +24,7 @@ fn basic() -> Result<()> { fn 
basic_projection() -> Result<()> { let (data, schema, columns) = case_basics_schema(); - let batch = read_batch(data, schema.fields.clone())?; - - assert_eq!(&schema, batch.schema().as_ref()); + let batch = read_batch(data, &schema.fields)?; columns .iter() @@ -41,9 +37,7 @@ fn basic_projection() -> Result<()> { fn lists() -> Result<()> { let (data, schema, columns) = case_list(); - let batch = read_batch(data, schema.fields.clone())?; - - assert_eq!(&schema, batch.schema().as_ref()); + let batch = read_batch(data, &schema.fields)?; columns .iter() @@ -60,10 +54,7 @@ fn line_break_in_values() -> Result<()> { {"a":null} "#; - let batch = read_batch( - data.to_string(), - vec![Field::new("a", DataType::Utf8, true)], - )?; + let batch = read_batch(data.to_string(), &[Field::new("a", DataType::Utf8, true)])?; let expected = Utf8Array::::from(&[Some("aa\n\n"), Some("aa\n"), None]); @@ -88,7 +79,7 @@ fn invalid_read_record() -> Result<()> { DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]), true, )]; - let batch = read_batch("city,lat,lng".to_string(), fields); + let batch = read_batch("city,lat,lng".to_string(), &fields); assert_eq!( batch.err().unwrap().to_string(), @@ -101,9 +92,7 @@ fn invalid_read_record() -> Result<()> { fn nested_struct_arrays() -> Result<()> { let (data, schema, columns) = case_struct(); - let batch = read_batch(data, schema.fields.clone())?; - - assert_eq!(&schema, batch.schema().as_ref()); + let batch = read_batch(data, &schema.fields)?; columns .iter() @@ -124,7 +113,6 @@ fn nested_list_arrays() -> Result<()> { ); let a_list_data_type = DataType::List(Box::new(a_struct_field)); let a_field = Field::new("a", a_list_data_type.clone(), true); - let fields = vec![a_field]; let data = r#" {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]} @@ -134,7 +122,7 @@ fn nested_list_arrays() -> Result<()> { {"a": []} "#; - let batch = read_batch(data.to_string(), fields)?; + let batch = read_batch(data.to_string(), &[a_field])?; // build expected output let d = Utf8Array::::from(&vec![ @@ -168,7 +156,7 @@ fn nested_list_arrays() -> Result<()> { Some(Bitmap::from_u8_slice([0b00010111], 5)), ); - assert_eq!(expected, batch.column(0).as_ref()); + assert_eq!(expected, batch.columns()[0].as_ref()); Ok(()) } @@ -181,14 +169,10 @@ fn skip_empty_lines() { {\"a\": 3}"; - let batch = read_batch( - data.to_string(), - vec![Field::new("a", DataType::Int64, true)], - ) - .unwrap(); + let batch = read_batch(data.to_string(), &[Field::new("a", DataType::Int64, true)]).unwrap(); - assert_eq!(1, batch.num_columns()); - assert_eq!(3, batch.num_rows()); + assert_eq!(1, batch.arrays().len()); + assert_eq!(3, batch.len()); } #[test] @@ -208,9 +192,7 @@ fn row_type_validation() { fn list_of_string_dictionary_from_with_nulls() -> Result<()> { let (data, schema, columns) = case_dict(); - let batch = read_batch(data, schema.fields.clone())?; - - assert_eq!(&schema, batch.schema().as_ref()); + let batch = read_batch(data, &schema.fields)?; assert_eq!(columns[0].as_ref(), batch.columns()[0].as_ref()); Ok(()) diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index 9bbec6dd155..3a6f6019bda 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -4,26 +4,20 @@ use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, - datatypes::{DataType, Field, Schema}, + datatypes::{DataType, Field}, error::Result, - record_batch::RecordBatch, }; use super::*; #[test] fn write_simple_rows() -> Result<()> { - let schema = Schema::new(vec![ - 
Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Utf8, false), - ]); - let a = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap(); + let batch = Columns::try_new(vec![&a as &dyn Array, &b]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -47,10 +41,6 @@ fn write_nested_struct_with_validity() -> Result<()> { Field::new("c11", DataType::Int32, false), Field::new("c12", DataType::Struct(inner.clone()), false), ]; - let schema = Schema::new(vec![ - Field::new("c1", DataType::Struct(fields.clone()), false), - Field::new("c2", DataType::Utf8, false), - ]); let c1 = StructArray::from_data( DataType::Struct(fields), @@ -69,9 +59,9 @@ fn write_nested_struct_with_validity() -> Result<()> { ); let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -90,10 +80,6 @@ fn write_nested_structs() -> Result<()> { Field::new("c11", DataType::Int32, false), Field::new("c12", DataType::Struct(vec![c121.clone()]), false), ]; - let schema = Schema::new(vec![ - Field::new("c1", DataType::Struct(fields.clone()), false), - Field::new("c2", DataType::Utf8, false), - ]); let c1 = StructArray::from_data( DataType::Struct(fields), @@ -114,9 +100,9 @@ fn write_nested_structs() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -130,11 +116,6 @@ fn write_nested_structs() -> Result<()> { #[test] fn write_struct_with_list_field() -> Result<()> { - let list_datatype = DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false))); - let field_c1 = Field::new("c1", list_datatype, false); - let field_c2 = Field::new("c2", DataType::Int32, false); - let schema = Schema::new(vec![field_c1, field_c2]); - let iter = vec![vec!["a", "a1"], vec!["b"], vec!["c"], vec!["d"], vec!["e"]]; let iter = iter @@ -151,9 +132,9 @@ fn write_struct_with_list_field() -> Result<()> { let b = PrimitiveArray::from_slice([1, 2, 3, 4, 5]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap(); + let batch = Columns::try_new(vec![&a as &dyn Array, &b]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -169,12 +150,6 @@ fn write_struct_with_list_field() -> Result<()> { #[test] fn write_nested_list() -> Result<()> { - let list_inner = DataType::List(Box::new(Field::new("b", DataType::Int32, false))); - let list_datatype = DataType::List(Box::new(Field::new("a", list_inner, false))); - let field_c1 = Field::new("c1", list_datatype, true); - let field_c2 = Field::new("c2", DataType::Utf8, true); 
- let schema = Schema::new(vec![field_c1, field_c2]); - let iter = vec![ vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])], vec![], @@ -197,9 +172,9 @@ fn write_nested_list() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -223,9 +198,6 @@ fn write_list_of_struct() -> Result<()> { DataType::Struct(fields.clone()), false, ))); - let field_c1 = Field::new("c1", c1_datatype.clone(), true); - let field_c2 = Field::new("c2", DataType::Int32, false); - let schema = Schema::new(vec![field_c1, field_c2]); let s = StructArray::from_data( DataType::Struct(fields), @@ -257,9 +229,9 @@ fn write_list_of_struct() -> Result<()> { let c2 = Int32Array::from_slice(&[1, 2, 3]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string(), "c2".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap(), @@ -273,12 +245,11 @@ fn write_list_of_struct() -> Result<()> { #[test] fn write_escaped_utf8() -> Result<()> { - let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]); let a = Utf8Array::::from(&vec![Some("a\na"), None]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + let batch = Columns::try_new(vec![&a as &dyn Array]).unwrap(); - let buf = write_batch(batch)?; + let buf = write_batch(batch, vec!["c1".to_string()])?; assert_eq!( String::from_utf8(buf).unwrap().as_bytes(), diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index ada15b32615..954838ee3e2 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -3,9 +3,8 @@ use std::sync::Arc; use arrow2::error::ArrowError; use arrow2::{ - array::*, bitmap::Bitmap, buffer::Buffer, datatypes::*, error::Result, + array::*, bitmap::Bitmap, buffer::Buffer, columns::Columns, datatypes::*, error::Result, io::parquet::read::statistics::*, io::parquet::read::*, io::parquet::write::*, - record_batch::RecordBatch, }; use crate::io::ipc::read_gzip_json; @@ -621,7 +620,7 @@ pub fn pyarrow_struct_statistics(column: usize) -> Option> { } /// Round-trip with parquet using the same integration files used for IPC integration tests. 
-fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result> { +fn integration_write(schema: &Schema, batches: &[Columns>]) -> Result> { let options = WriteOptions { write_statistics: true, compression: Compression::Uncompressed, @@ -668,7 +667,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result Ok(writer.into_inner()) } -fn integration_read(data: &[u8]) -> Result<(Arc, Vec)> { +fn integration_read(data: &[u8]) -> Result<(Arc, Vec>>)> { let reader = Cursor::new(data); let reader = RecordReader::try_new(reader, None, None, None, None)?; let schema = reader.schema().clone(); @@ -719,10 +718,7 @@ fn arrow_type() -> Result<()> { Field::new("a1", dt1, true), Field::new("a2", array2.data_type().clone(), true), ]); - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(array), Arc::new(array2)], - )?; + let batch = Columns::try_new(vec![Arc::new(array) as Arc, Arc::new(array2)])?; let r = integration_write(&schema, &[batch.clone()])?; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 800770248ec..ee4b7723b20 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -373,15 +373,13 @@ fn all_types() -> Result<()> { let batches = reader.collect::>>()?; assert_eq!(batches.len(), 1); - let result = batches[0] - .column(0) + let result = batches[0].columns()[0] .as_any() .downcast_ref::() .unwrap(); assert_eq!(result, &Int32Array::from_slice([4, 5, 6, 7, 2, 3, 0, 1])); - let result = batches[0] - .column(6) + let result = batches[0].columns()[6] .as_any() .downcast_ref::() .unwrap(); @@ -390,8 +388,7 @@ fn all_types() -> Result<()> { &Float32Array::from_slice([0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]) ); - let result = batches[0] - .column(9) + let result = batches[0].columns()[9] .as_any() .downcast_ref::>() .unwrap(); diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 4feb8405d52..a7733a1860a 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -1,7 +1,7 @@ use std::io::Cursor; +use arrow2::error::Result; use arrow2::io::parquet::write::*; -use arrow2::{error::Result, record_batch::RecordBatch}; use super::*; @@ -42,10 +42,7 @@ fn round_trip( let parquet_schema = to_parquet_schema(&schema)?; - let iter = vec![RecordBatch::try_new( - Arc::new(schema.clone()), - vec![array.clone()], - )]; + let iter = vec![Columns::try_new(vec![array.clone()])]; let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encoding])?; diff --git a/tests/it/io/print.rs b/tests/it/io/print.rs index b1fd5f5040c..5497e112915 100644 --- a/tests/it/io/print.rs +++ b/tests/it/io/print.rs @@ -1,33 +1,23 @@ use std::sync::Arc; use arrow2::{ - array::*, bitmap::Bitmap, buffer::Buffer, datatypes::*, error::Result, io::print::*, - record_batch::RecordBatch, + array::*, + bitmap::Bitmap, + buffer::Buffer, + columns::Columns, + datatypes::{DataType, Field, TimeUnit, UnionMode}, + error::Result, + io::print::*, }; #[test] fn write_basics() -> Result<()> { - // define a schema. - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, true), - Field::new("b", DataType::Int32, true), - ])); - - // define data. 
- let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(Utf8Array::::from(vec![ - Some("a"), - Some("b"), - None, - Some("d"), - ])), - Arc::new(Int32Array::from(vec![Some(1), None, Some(10), Some(100)])), - ], - )?; - - let table = write(&[batch]); + let a = Utf8Array::::from(vec![Some("a"), Some("b"), None, Some("d")]); + let b = Int32Array::from(vec![Some(1), None, Some(10), Some(100)]); + + let batch = Columns::try_new(vec![&a as &dyn Array, &b])?; + + let table = write(&[batch], &["a".to_string(), "b".to_string()]); let expected = vec![ "+---+-----+", @@ -49,23 +39,16 @@ fn write_basics() -> Result<()> { #[test] fn write_null() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Null, true), - ])); - let num_rows = 4; - let arrays = schema - .fields() + let arrays = [DataType::Utf8, DataType::Int32, DataType::Null] .iter() - .map(|f| new_null_array(f.data_type().clone(), num_rows).into()) + .map(|dt| new_null_array(dt.clone(), num_rows)) .collect(); // define data (null) - let batch = RecordBatch::try_new(schema, arrays)?; + let columns = Columns::try_new(arrays)?; - let table = write(&[batch]); + let table = write(&[columns], &["a", "b", "c"]); let expected = vec![ "+---+---+---+", @@ -86,18 +69,14 @@ fn write_null() -> Result<()> { #[test] fn write_dictionary() -> Result<()> { - // define a schema. - let field_type = DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false); - let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)])); - let mut array = MutableDictionaryArray::>::new(); array.try_extend(vec![Some("one"), None, Some("three")])?; - let array = array.into_arc(); + let array = array.into_box(); - let batch = RecordBatch::try_new(schema, vec![array])?; + let batch = Columns::new(vec![array]); - let table = write(&[batch]); + let table = write(&[batch], &["d1"]); let expected = vec![ "+-------+", @@ -118,17 +97,13 @@ fn write_dictionary() -> Result<()> { #[test] fn dictionary_validities() -> Result<()> { - // define a schema. - let field_type = DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Int32), false); - let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)])); - let keys = PrimitiveArray::::from([Some(1), None, Some(0)]); let values = PrimitiveArray::::from([None, Some(10)]); let array = DictionaryArray::::from_data(keys, Arc::new(values)); - let batch = RecordBatch::try_new(schema, vec![Arc::new(array)])?; + let columns = Columns::new(vec![&array as &dyn Array]); - let table = write(&[batch]); + let table = write(&[columns], &["d1"]); let expected = vec![ "+----+", "| d1 |", "+----+", "| 10 |", "| |", "| |", "+----+", @@ -146,16 +121,10 @@ fn dictionary_validities() -> Result<()> { /// formatting that array with `write` macro_rules! 
check_datetime { ($ty:ty, $datatype:expr, $value:expr, $EXPECTED_RESULT:expr) => { - let array = Arc::new(PrimitiveArray::<$ty>::from(&[Some($value), None]).to($datatype)); - - let schema = Arc::new(Schema::new(vec![Field::new( - "f", - array.data_type().clone(), - true, - )])); - let batch = RecordBatch::try_new(schema, vec![array]).unwrap(); + let array = PrimitiveArray::<$ty>::from(&[Some($value), None]).to($datatype); + let batch = Columns::new(vec![&array as &dyn Array]); - let table = write(&[batch]); + let table = write(&[batch], &["f"]); let expected = $EXPECTED_RESULT; let actual: Vec<&str> = table.lines().collect(); @@ -362,11 +331,9 @@ fn write_struct() -> Result<()> { let array = StructArray::from_data(DataType::Struct(fields), values, validity); - let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Columns::new(vec![&array as &dyn Array]); - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; - - let table = write(&[batch]); + let table = write(&[columns], &["a"]); let expected = vec![ "+--------------+", @@ -400,11 +367,9 @@ fn write_union() -> Result<()> { let array = UnionArray::from_data(data_type, types, fields, None); - let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); - - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + let batch = Columns::new(vec![&array as &dyn Array]); - let table = write(&[batch]); + let table = write(&[batch], &["a"]); let expected = vec![ "+---+", "| a |", "+---+", "| 1 |", "| |", "| c |", "+---+", diff --git a/tests/it/record_batch.rs b/tests/it/record_batch.rs deleted file mode 100644 index 8bda0484b88..00000000000 --- a/tests/it/record_batch.rs +++ /dev/null @@ -1,108 +0,0 @@ -use arrow2::array::*; -use arrow2::datatypes::*; -use arrow2::record_batch::RecordBatch; - -#[test] -fn basic() { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ]); - - let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]); - let b = Utf8Array::::from_slice(&["a", "b", "c", "d", "e"]); - - let record_batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap(); - check_batch(record_batch) -} - -fn check_batch(record_batch: RecordBatch) { - assert_eq!(5, record_batch.num_rows()); - assert_eq!(2, record_batch.num_columns()); - assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type()); - assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type()); - assert_eq!(5, record_batch.column(0).len()); - assert_eq!(5, record_batch.column(1).len()); -} - -#[test] -fn try_from_iter() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - Some(2), - None, - Some(4), - Some(5), - ])); - let b: ArrayRef = Arc::new(Utf8Array::::from_slice(&["a", "b", "c", "d", "e"])); - - let record_batch = - RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).expect("valid conversion"); - - let expected_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Utf8, false), - ]); - assert_eq!(record_batch.schema().as_ref(), &expected_schema); - check_batch(record_batch); -} - -#[test] -fn try_from_iter_with_nullable() { - let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5])); - let b: ArrayRef = Arc::new(Utf8Array::::from_slice(&["a", "b", "c", "d", "e"])); - - // Note there are no nulls in a or b, but we specify that b is nullable - let record_batch = - 
RecordBatch::try_from_iter_with_nullable(vec![("a", a, false), ("b", b, true)]) - .expect("valid conversion"); - - let expected_schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, true), - ]); - assert_eq!(record_batch.schema().as_ref(), &expected_schema); - check_batch(record_batch); -} - -#[test] -fn type_mismatch() { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let a = Int64Array::from_slice(&[1, 2, 3, 4, 5]); - - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]); - assert!(batch.is_err()); -} - -#[test] -fn number_of_fields_mismatch() { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]); - let b = Int32Array::from_slice(&[1, 2, 3, 4, 5]); - - let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]); - assert!(batch.is_err()); -} - -#[test] -fn from_struct_array() { - let boolean = Arc::new(BooleanArray::from_slice(&[false, false, true, true])) as ArrayRef; - let int = Arc::new(Int32Array::from_slice(&[42, 28, 19, 31])) as ArrayRef; - - let fields = vec![ - Field::new("b", DataType::Boolean, false), - Field::new("c", DataType::Int32, false), - ]; - - let array = StructArray::from_data(fields.clone(), vec![boolean.clone(), int.clone()], None); - - let batch = RecordBatch::from(array); - assert_eq!(2, batch.num_columns()); - assert_eq!(4, batch.num_rows()); - assert_eq!(&fields, batch.schema().fields()); - assert_eq!(boolean.as_ref(), batch.column(0).as_ref()); - assert_eq!(int.as_ref(), batch.column(1).as_ref()); -}