
Removed RecordBatch
jorgecarleitao committed Dec 28, 2021
1 parent bb5d46f commit c070ad7
Showing 61 changed files with 452 additions and 1,009 deletions.
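
In short: every RecordBatch (a Schema plus columns) becomes a Columns (just the columns), and the Schema is threaded through explicitly wherever a reader or writer needs it. A schematic before/after of the pattern, condensed from the hunks below (a sketch only; the two halves cannot coexist after this commit):

    // before: a RecordBatch owns an Arc<Schema> together with the arrays
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

    // after: Columns holds only the arrays; the Schema is passed separately,
    // e.g. to write::FileWriter::try_new or write::write_header
    let columns = Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;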
11 changes: 4 additions & 7 deletions benches/filter_kernels.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
-use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter};
+use arrow2::columns::Columns;
+use arrow2::compute::filter::{build_filter, filter, filter_columns, Filter};
 use arrow2::datatypes::{DataType, Field, Schema};
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

 fn bench_filter(data_array: &dyn Array, filter_array: &BooleanArray) {
@@ -125,13 +125,10 @@ fn add_benchmark(c: &mut Criterion) {

         let data_array = create_primitive_array::<f32>(size, 0.0);

-        let field = Field::new("c1", data_array.data_type().clone(), true);
-        let schema = Schema::new(vec![field]);
-
-        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data_array)]).unwrap();
+        let columns = Columns::try_new(vec![Arc::new(data_array)]).unwrap();

         c.bench_function("filter single record batch", |b| {
-            b.iter(|| filter_record_batch(&batch, &filter_array))
+            b.iter(|| filter_columns(&columns, &filter_array))
         });
     }

8 changes: 4 additions & 4 deletions benches/write_ipc.rs
@@ -4,21 +4,21 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
+use arrow2::columns::Columns;
 use arrow2::datatypes::{Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write::*;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

 fn write(array: &dyn Array) -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::new(vec![field]);
-    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?;
+    let columns = Columns::try_new(vec![clone(array).into()])?;

     let writer = Cursor::new(vec![]);
-    let mut writer = FileWriter::try_new(writer, &schema, Default::default())?;
+    let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;

-    writer.write(&batch)
+    writer.write(&columns, None)
 }

 fn add_benchmark(c: &mut Criterion) {
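The IPC path shows the new division of labor: FileWriter::try_new now receives the Schema (plus a new optional argument, None here) up front, and each write call takes the Columns plus an optional second argument. A self-contained condensation of the bench body above, using only calls that appear in this diff:

    use std::io::Cursor;
    use std::sync::Arc;
    use arrow2::array::{Array, Int32Array};
    use arrow2::columns::Columns;
    use arrow2::datatypes::{Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::ipc::write::FileWriter;

    fn main() -> Result<()> {
        let array = Int32Array::from_slice(&[1, 2, 3]);
        // the schema is built by hand and kept next to the columns
        let field = Field::new("c1", array.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let columns = Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

        let writer = Cursor::new(vec![]);
        let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;
        writer.write(&columns, None)
    }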
24 changes: 12 additions & 12 deletions benches/write_json.rs
@@ -3,53 +3,53 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
+use arrow2::columns::Columns;
 use arrow2::error::Result;
 use arrow2::io::json::write;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::*;

-fn write_batch(batch: &RecordBatch) -> Result<()> {
+fn write_batch(columns: &Columns<Arc<dyn Array>>) -> Result<()> {
     let mut writer = vec![];
     let format = write::JsonArray::default();

-    let batches = vec![Ok(batch.clone())].into_iter();
+    let batches = vec![Ok(columns.clone())].into_iter();

     // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, vec![], format);
+    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);

     // the operation of writing is IO-bounded.
     write::write(&mut writer, format, blocks)?;

     Ok(())
 }

-fn make_batch(array: impl Array + 'static) -> RecordBatch {
-    RecordBatch::try_from_iter([("a", Arc::new(array) as Arc<dyn Array>)]).unwrap()
+fn make_columns(array: impl Array + 'static) -> Columns<Arc<dyn Array>> {
+    Columns::new(vec![Arc::new(array) as Arc<dyn Array>])
 }

 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);

         let array = create_primitive_array::<i32>(size, 0.1);
-        let batch = make_batch(array);
+        let columns = make_columns(array);

         c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
         });

         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let batch = make_batch(array);
+        let columns = make_columns(array);

         c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
         });

         let array = create_primitive_array::<f64>(size, 0.1);
-        let batch = make_batch(array);
+        let columns = make_columns(array);

         c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&batch))
+            b.iter(|| write_batch(&columns))
         });
     });
 }
6 changes: 3 additions & 3 deletions benches/write_parquet.rs
@@ -3,13 +3,13 @@ use std::io::Cursor;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::{clone, Array};
+use arrow2::columns::Columns;
 use arrow2::error::Result;
 use arrow2::io::parquet::write::*;
-use arrow2::record_batch::RecordBatch;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

 fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
-    let batch = RecordBatch::try_from_iter([("c1", clone(array).into())])?;
+    let columns = Columns::new(vec![clone(array).into()]);
     let schema = batch.schema().clone();

     let options = WriteOptions {
@@ -19,7 +19,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
     };

     let row_groups = RowGroupIterator::try_new(
-        vec![Ok(batch)].into_iter(),
+        vec![Ok(columns)].into_iter(),
         &schema,
         options,
         vec![encoding],
9 changes: 4 additions & 5 deletions examples/avro_read.rs
@@ -1,6 +1,5 @@
 use std::fs::File;
 use std::io::BufReader;
-use std::sync::Arc;

 use arrow2::error::Result;
 use arrow2::io::avro::read;
@@ -20,12 +19,12 @@ fn main() -> Result<()> {
     let reader = read::Reader::new(
         read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
         avro_schema,
-        Arc::new(schema),
+        schema.fields,
     );

-    for batch in reader {
-        let batch = batch?;
-        assert!(batch.len() > 0);
+    for maybe_columns in reader {
+        let columns = maybe_columns?;
+        assert!(!columns.is_empty());
     }
     Ok(())
 }
2 changes: 1 addition & 1 deletion examples/avro_read_async.rs
@@ -34,7 +34,7 @@ async fn main() -> Result<()> {
             deserialize(&decompressed, schema.fields(), &avro_schemas)
         });
         let batch = handle.await.unwrap()?;
-        assert!(batch.len() > 0);
+        assert!(!batch.is_empty());
     }

     Ok(())
22 changes: 10 additions & 12 deletions examples/csv_write.rs
@@ -1,22 +1,22 @@
-use std::sync::Arc;
-
 use arrow2::{
-    array::Int32Array,
-    datatypes::{Field, Schema},
+    array::{Array, Int32Array},
+    columns::Columns,
     error::Result,
     io::csv::write,
-    record_batch::RecordBatch,
 };

-fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> {
+fn write_batch<A: std::borrow::Borrow<dyn Array>>(
+    path: &str,
+    columns: &[Columns<A>],
+) -> Result<()> {
     let writer = &mut write::WriterBuilder::new().from_path(path)?;

-    write::write_header(writer, batches[0].schema())?;
+    write::write_header(writer, &["c1"])?;

     let options = write::SerializeOptions::default();
-    batches
+    columns
         .iter()
-        .try_for_each(|batch| write::write_batch(writer, batch, &options))
+        .try_for_each(|batch| write::write_columns(writer, batch, &options))
 }
@@ -29,9 +29,7 @@ fn main() -> Result<()> {
         Some(5),
         Some(6),
     ]);
-    let field = Field::new("c1", array.data_type().clone(), true);
-    let schema = Schema::new(vec![field]);
-    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
+    let batch = Columns::try_new(vec![&array as &dyn Array])?;

     write_batch("example.csv", &[batch])
 }
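
Note the new signature above: write_batch is generic over A: std::borrow::Borrow<dyn Array>, so a Columns can hold owned or borrowed arrays alike. A minimal sketch of both forms (the explicit type annotations are added here for illustration; both constructions appear verbatim in this commit):

    use std::sync::Arc;
    use arrow2::array::{Array, Int32Array};
    use arrow2::columns::Columns;
    use arrow2::error::Result;

    fn main() -> Result<()> {
        let a = Int32Array::from_slice(&[1, 2, 3]);

        // owned columns, as in csv_write_parallel.rs below
        let owned: Columns<Arc<dyn Array>> =
            Columns::new(vec![Arc::new(a.clone()) as Arc<dyn Array>]);

        // borrowed columns, as in the example above
        let borrowed: Columns<&dyn Array> = Columns::try_new(vec![&a as &dyn Array])?;

        assert!(!owned.is_empty() && !borrowed.is_empty());
        Ok(())
    }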
15 changes: 6 additions & 9 deletions examples/csv_write_parallel.rs
@@ -4,19 +4,18 @@ use std::sync::Arc;
 use std::thread;

 use arrow2::{
-    array::Int32Array,
-    datatypes::{Field, Schema},
+    array::{Array, Int32Array},
+    columns::Columns,
     error::Result,
     io::csv::write,
-    record_batch::RecordBatch,
 };

-fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> {
+fn parallel_write(path: &str, batches: [Columns<Arc<dyn Array>>; 2]) -> Result<()> {
     let options = write::SerializeOptions::default();

     // write a header
     let writer = &mut write::WriterBuilder::new().from_path(path)?;
-    write::write_header(writer, batches[0].schema())?;
+    write::write_header(writer, &["c1"])?;

     // prepare a channel to send serialized records from threads
     let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
@@ -61,9 +60,7 @@ fn main() -> Result<()> {
         Some(5),
         Some(6),
     ]);
-    let field = Field::new("c1", array.data_type().clone(), true);
-    let schema = Schema::new(vec![field]);
-    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
+    let columns = Columns::new(vec![Arc::new(array) as Arc<dyn Array>]);

-    parallel_write("example.csv", [batch.clone(), batch])
+    parallel_write("example.csv", [columns.clone(), columns])
 }
6 changes: 3 additions & 3 deletions examples/extension.rs
@@ -2,11 +2,11 @@ use std::io::{Cursor, Seek, Write};
 use std::sync::Arc;

 use arrow2::array::*;
+use arrow2::columns::Columns;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
 use arrow2::io::ipc::read;
 use arrow2::io::ipc::write;
-use arrow2::record_batch::RecordBatch;

 fn main() -> Result<()> {
     // declare an extension.
@@ -40,14 +40,14 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     let options = write::WriteOptions { compression: None };
     let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;

-    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
+    let batch = Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

     writer.write(&batch, None)?;

     Ok(writer.into_inner())
 }

-fn read_ipc(buf: &[u8]) -> Result<RecordBatch> {
+fn read_ipc(buf: &[u8]) -> Result<Columns<Arc<dyn Array>>> {
     let mut cursor = Cursor::new(buf);
     let metadata = read::read_file_metadata(&mut cursor)?;
     let mut reader = read::FileReader::new(cursor, metadata, None);
17 changes: 12 additions & 5 deletions examples/ipc_file_read.rs
@@ -1,20 +1,26 @@
 use std::fs::File;
 use std::sync::Arc;

+use arrow2::array::Array;
+use arrow2::columns::Columns;
+use arrow2::datatypes::Schema;
 use arrow2::error::Result;
 use arrow2::io::ipc::read::{read_file_metadata, FileReader};
 use arrow2::io::print;
-use arrow2::record_batch::RecordBatch;

-fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
+fn read_batches(path: &str) -> Result<(Schema, Vec<Columns<Arc<dyn Array>>>)> {
     let mut file = File::open(path)?;

     // read the files' metadata. At this point, we can distribute the read whatever we like.
     let metadata = read_file_metadata(&mut file)?;

+    let schema = metadata.schema().as_ref().clone();
+
     // Simplest way: use the reader, an iterator over batches.
     let reader = FileReader::new(file, metadata, None);

-    reader.collect()
+    let columns = reader.collect::<Result<Vec<_>>>()?;
+    Ok((schema, columns))
 }
@@ -23,7 +29,8 @@ fn main() -> Result<()> {

     let file_path = &args[1];

-    let batches = read_batches(file_path)?;
-    print::print(&batches);
+    let (schema, batches) = read_batches(file_path)?;
+    let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
+    println!("{}", print::write(&batches, &names));
     Ok(())
 }
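
Since Columns carries no field names, printing now requires the caller to pass names explicitly (above they are recovered from the Schema in the file metadata). A standalone sketch, assuming print::write accepts any slice of string-like names next to the batches:

    use std::sync::Arc;
    use arrow2::array::{Array, Int32Array};
    use arrow2::columns::Columns;
    use arrow2::io::print;

    fn main() {
        let a = Int32Array::from_slice(&[1, 2, 3]);
        let batch = Columns::new(vec![Arc::new(a) as Arc<dyn Array>]);
        // the names come from the caller, not from an embedded schema
        println!("{}", print::write(&[batch], &["c1"]));
    }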
12 changes: 6 additions & 6 deletions examples/ipc_file_write.rs
@@ -1,20 +1,20 @@
 use std::fs::File;
 use std::sync::Arc;

-use arrow2::array::{Int32Array, Utf8Array};
+use arrow2::array::{Array, Int32Array, Utf8Array};
+use arrow2::columns::Columns;
 use arrow2::datatypes::{DataType, Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write;
-use arrow2::record_batch::RecordBatch;

-fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> {
+fn write_batches(path: &str, schema: &Schema, columns: &[Columns<Arc<dyn Array>>]) -> Result<()> {
     let file = File::create(path)?;

     let options = write::WriteOptions { compression: None };
     let mut writer = write::FileWriter::try_new(file, schema, None, options)?;

-    for batch in batches {
-        writer.write(batch, None)?
+    for columns in columns {
+        writer.write(columns, None)?
     }
     writer.finish()
 }
@@ -34,7 +34,7 @@ fn main() -> Result<()> {
     let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
     let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

-    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)])?;
+    let batch = Columns::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;

     // write it
     write_batches(file_path, &schema, &[batch])?;
2 changes: 1 addition & 1 deletion examples/metadata.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;

 use arrow2::datatypes::{DataType, Field, Schema};
