Skip to content

Commit

Permalink
Add parquet example
Browse files Browse the repository at this point in the history
  • Loading branch information
mbrobbel committed Dec 11, 2023
1 parent bc0f459 commit 094f3a0
Show file tree
Hide file tree
Showing 26 changed files with 1,505 additions and 31 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,26 @@ categories.workspace = true

[features]
default = ["arrow-rs", "derive"]
arrow-rs = ["dep:arrow-array", "dep:arrow-buffer", "dep:arrow-schema"]
arrow-rs = ["dep:arrow-array", "dep:arrow-buffer", "dep:arrow-schema", "narrow-derive/arrow-rs"]
derive = ["dep:narrow-derive"]

[dependencies]
# arrow-array = { version = "49.0.0", optional = true }
# arrow-buffer = { version = "49.0.0", optional = true }
# arrow-schema = { version = "49.0.0", optional = true }
arrow-array = { git = "https://github.com/apache/arrow-rs", branch = "master", optional = true }
arrow-buffer = { git = "https://github.com/apache/arrow-rs", branch = "master", optional = true }
arrow-schema = { git = "https://github.com/apache/arrow-rs", branch = "master", optional = true }
arrow-array = { git = "https://github.com/mbrobbel/arrow-rs", branch = "buffer-builder-scalar-buffer", optional = true }
# arrow-buffer = { git = "https://github.com/apache/arrow-rs", branch = "master", optional = true }
arrow-buffer = { git = "https://github.com/mbrobbel/arrow-rs", branch = "buffer-builder-scalar-buffer", optional = true }
arrow-schema = { git = "https://github.com/mbrobbel/arrow-rs", branch = "buffer-builder-scalar-buffer", optional = true }
narrow-derive = { path = "narrow-derive", version = "^0.3.4", optional = true }

[dev-dependencies]
# arrow-cast = { version = "49.0.0", default-features = false, features = ["prettyprint"] }
arrow-cast = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["prettyprint"] }
arrow-cast = { git = "https://github.com/mbrobbel/arrow-rs", branch = "buffer-builder-scalar-buffer", default-features = false, features = ["prettyprint"] }
bytes = "1.5.0"
criterion = { version = "0.5.1", default-features = false }
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", features = ["arrow"] }
parquet = { git = "https://github.com/mbrobbel/arrow-rs", branch = "buffer-builder-scalar-buffer", features = ["arrow"] }
# parquet = { version = "49.0.0", default-features = false, features = ["arrow"] }

[profile.bench]
Expand Down
56 changes: 56 additions & 0 deletions examples/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/// Round-trips a derived `narrow` struct array through an in-memory Parquet file.
///
/// The example collects two `Foo` values into a `narrow` `StructArray`, converts
/// it to an `arrow_array::StructArray` and then a `RecordBatch`, writes that batch
/// to a `Vec<u8>` with `ArrowWriter`, reads the first batch back with
/// `ParquetRecordBatchReader`, and asserts that it equals the batch that was
/// written. Both batches are pretty-printed along the way.
///
/// # Panics
///
/// Panics if printing, writing, or reading fails, or if the batch read back
/// differs from the batch that was written.
fn main() {
    use arrow_array::RecordBatch;
    use arrow_cast::pretty;
    use bytes::Bytes;
    use narrow::{array::StructArray, arrow::buffer_builder::ArrowBufferBuilder, ArrayType};
    use parquet::arrow::{arrow_reader::ParquetRecordBatchReader, ArrowWriter};

    #[derive(ArrayType, Default)]
    struct Bar(Option<bool>);

    #[derive(ArrayType, Default)]
    struct Foo {
        a: u32,
        b: Option<u8>,
        c: bool,
        d: String,
        e: Option<Vec<Option<bool>>>,
        f: Bar,
    }

    let input = [
        Foo {
            a: 1,
            b: Some(2),
            c: true,
            d: "hello world!".to_string(),
            e: Some(vec![Some(true), None]),
            f: Bar(Some(true)),
        },
        Foo {
            a: 42,
            b: None,
            c: false,
            d: "narrow".to_string(),
            e: None,
            f: Bar(None),
        },
    ];

    // Collect the rows into a narrow struct array backed by arrow buffer builders.
    let narrow_array = input
        .into_iter()
        .collect::<StructArray<Foo, false, ArrowBufferBuilder>>();

    // Convert to the arrow-rs representation and wrap it in a record batch.
    let arrow_struct_array = arrow_array::StructArray::from(narrow_array);
    let record_batch = RecordBatch::from(arrow_struct_array);
    // Borrow the batch as a one-element slice instead of cloning it just to print.
    pretty::print_batches(std::slice::from_ref(&record_batch)).unwrap();

    // Write the batch to an in-memory Parquet file.
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None).unwrap();
    writer.write(&record_batch).unwrap();
    writer.close().unwrap();

    // Read the first batch back (batch size 1024) and check the round trip.
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
    let read = reader.next().unwrap().unwrap();
    pretty::print_batches(std::slice::from_ref(&read)).unwrap();
    assert_eq!(record_batch, read);
}

Check warning on line 56 in examples/parquet.rs

View check run for this annotation

Codecov / codecov/patch

examples/parquet.rs#L1-L56

Added lines #L1 - L56 were not covered by tests
4 changes: 4 additions & 0 deletions narrow-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ license.workspace = true
keywords.workspace = true
categories.workspace = true

[features]
default = ["arrow-rs"]
arrow-rs = []

[lib]
proc-macro = true

Expand Down
132 changes: 129 additions & 3 deletions narrow-derive/src/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use quote::{format_ident, quote, ToTokens, TokenStreamExt};
use std::iter::{Enumerate, Map};
use syn::{
parse2, parse_quote, punctuated, token::Paren, visit_mut::VisitMut, DeriveInput, Field, Fields,
File, Generics, Ident, Index, ItemImpl, ItemStruct, Type, TypeParamBound, Visibility,
WherePredicate,
Generics, Ident, Index, ItemImpl, ItemStruct, Type, TypeParamBound, Visibility, WherePredicate,
};

pub(super) fn derive(input: &DeriveInput, fields: &Fields) -> TokenStream {
Expand All @@ -23,6 +22,12 @@ pub(super) fn derive(input: &DeriveInput, fields: &Fields) -> TokenStream {
// Generate the StructArrayType impl.
let struct_array_type_impl = input.struct_array_type_impl();

// Optionally generate the StructArrayTypeFields impl.
let struct_array_type_fields_impl = input.struct_array_type_fields_impl();

// Optionally generate the conversion to a vec of array refs.
let struct_array_into_array_refs = input.struct_array_into_array_refs();

// Generate the array wrapper struct definition.
let array_struct_def = input.array_struct_def();

Expand All @@ -45,6 +50,10 @@ pub(super) fn derive(input: &DeriveInput, fields: &Fields) -> TokenStream {

#struct_array_type_impl

#struct_array_type_fields_impl

#struct_array_into_array_refs

#array_struct_def

#array_default_impl
Expand Down Expand Up @@ -164,7 +173,7 @@ impl Struct<'_> {
}

/// Add a `StructArrayType` implementation for the derive input.
fn struct_array_type_impl(&self) -> File {
fn struct_array_type_impl(&self) -> ItemImpl {
let narrow = util::narrow();

// Generics
Expand All @@ -188,6 +197,111 @@ impl Struct<'_> {
parse2(tokens).expect("struct_array_type_impl")
}

/// Add an `StructArrayTypeFields` implementation for the derive input.

Check warning on line 200 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L200

Added line #L200 was not covered by tests
fn struct_array_type_fields_impl(&self) -> ItemImpl {
let narrow = util::narrow();

// Generics
let mut generics = self.generics.clone();
SelfReplace::new(self.ident, &generics).visit_generics_mut(&mut generics);
AddTypeParamBound(Self::array_type_bound()).visit_generics_mut(&mut generics);
AddTypeParam(parse_quote!(Buffer: #narrow::buffer::BufferType))
.visit_generics_mut(&mut generics);
generics
.make_where_clause()
.predicates
.extend(self.where_predicate_fields(parse_quote!(#narrow::arrow::ArrowArray)));
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

// Fields
let field_ident = self.field_idents().map(|ident| ident.to_string());
let field_ty = self.field_types();
let fields = quote!(
#(
::std::sync::Arc::new(<<#field_ty as ::narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA> as #narrow::arrow::ArrowArray>::as_field(#field_ident)),
)*
);

Check warning on line 224 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L224

Added line #L224 was not covered by tests
let ident = self.array_struct_ident();
let tokens = quote!(
impl #impl_generics #narrow::arrow::StructArrayTypeFields for #ident #ty_generics #where_clause {
fn fields() -> ::arrow_schema::Fields {
::arrow_schema::Fields::from([
#fields
])
}
}
);
parse2(tokens).expect("struct_array_type_fields_impl")
}

/// Add an `Into` implementation for the array to convert to a vec of array refs

Check warning on line 238 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L237-L238

Added lines #L237 - L238 were not covered by tests
fn struct_array_into_array_refs(&self) -> ItemImpl {
let narrow = util::narrow();

// Generics
let mut generics = self.generics.clone();
SelfReplace::new(self.ident, &generics).visit_generics_mut(&mut generics);
AddTypeParamBound(Self::array_type_bound()).visit_generics_mut(&mut generics);
AddTypeParam(parse_quote!(Buffer: #narrow::buffer::BufferType))
.visit_generics_mut(&mut generics);
generics
.make_where_clause()
.predicates
.extend(self.where_predicate_fields_arrow_array_into());
let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

// Fields
let field_ty = self.field_types();
let field_arrays = match self.fields {
Fields::Named(_) => {

Check warning on line 257 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L257

Added line #L257 was not covered by tests
let field_ident = self.field_idents();
quote!(
#(
::std::sync::Arc::<
<<#field_ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA> as #narrow::arrow::ArrowArray>::Array
>::new(value.#field_ident.into()),
)*
)
}

Check warning on line 266 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L266

Added line #L266 was not covered by tests
Fields::Unnamed(_) => {
let field_idx = self
.fields
.iter()
.enumerate()
.map(|(idx, _)| Index::from(idx));
quote!(
#(
::std::sync::Arc::<
<<#field_ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA> as #narrow::arrow::ArrowArray>::Array
>::new(value.#field_idx.into()),
)*
)
}
Fields::Unit => {
quote!(
#(
::std::sync::Arc::<
<<#field_ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA> as #narrow::arrow::ArrowArray>::Array
>::new(value.0.into())
)*
)
}
};

Check warning on line 291 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L282-L291

Added lines #L282 - L291 were not covered by tests
let ident = self.array_struct_ident();
let tokens = quote!(
impl #impl_generics ::std::convert::From<#ident #ty_generics> for ::std::vec::Vec<::std::sync::Arc<dyn ::arrow_array::Array>> #where_clause {
fn from(value: #ident #ty_generics) -> Self {
vec![
#field_arrays
]
}
}
);
parse2(tokens).expect("struct_array_into_array_refs")
}

Check warning on line 304 in narrow-derive/src/struct.rs

View check run for this annotation

Codecov / codecov/patch

narrow-derive/src/struct.rs#L304

Added line #L304 was not covered by tests
/// Returns the struct definition of the Array wrapper struct.
fn array_struct_def(&self) -> ItemStruct {
let narrow = util::narrow();
Expand Down Expand Up @@ -467,6 +581,18 @@ impl Struct<'_> {
self.field_types()
.map(move |ty| parse_quote!(<#ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA>: #bound))
}

fn where_predicate_fields_arrow_array_into(&self) -> impl Iterator<Item = WherePredicate> + '_ {
let narrow = util::narrow();
self.field_types()
.map(move |ty| parse_quote!(
<#ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA>:
::std::convert::Into<
<<#ty as #narrow::array::ArrayType>::Array<Buffer, #narrow::offset::NA, #narrow::array::union::NA>
as #narrow::arrow::ArrowArray>::Array
>
))
}
}

#[cfg(test)]
Expand Down
60 changes: 60 additions & 0 deletions narrow-derive/tests/expand/struct/named/generic.expanded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,66 @@ where
{
type Array<Buffer: narrow::buffer::BufferType> = FooArray<'a, T, Buffer>;
}
impl<
'a,
T: narrow::array::ArrayType,
Buffer: narrow::buffer::BufferType,
> narrow::arrow::StructArrayTypeFields for FooArray<'a, T, Buffer>
where
T: Copy,
<&'a T as narrow::array::ArrayType>::Array<
Buffer,
narrow::offset::NA,
narrow::array::union::NA,
>: narrow::arrow::ArrowArray,
{
fn fields() -> ::arrow_schema::Fields {
::arrow_schema::Fields::from([
::std::sync::Arc::new(
<<&'a T as ::narrow::array::ArrayType>::Array<
Buffer,
narrow::offset::NA,
narrow::array::union::NA,
> as narrow::arrow::ArrowArray>::as_field("a"),
),
])
}
}
impl<
'a,
T: narrow::array::ArrayType,
Buffer: narrow::buffer::BufferType,
> ::std::convert::From<FooArray<'a, T, Buffer>>
for ::std::vec::Vec<::std::sync::Arc<dyn ::arrow_array::Array>>
where
T: Copy,
<&'a T as narrow::array::ArrayType>::Array<
Buffer,
narrow::offset::NA,
narrow::array::union::NA,
>: ::std::convert::Into<
<<&'a T as narrow::array::ArrayType>::Array<
Buffer,
narrow::offset::NA,
narrow::array::union::NA,
> as narrow::arrow::ArrowArray>::Array,
>,
{
fn from(value: FooArray<'a, T, Buffer>) -> Self {
<[_]>::into_vec(
#[rustc_box]
::alloc::boxed::Box::new([
::std::sync::Arc::<
<<&'a T as narrow::array::ArrayType>::Array<
Buffer,
narrow::offset::NA,
narrow::array::union::NA,
> as narrow::arrow::ArrowArray>::Array,
>::new(value.a.into()),
]),
)
}
}
struct FooArray<'a, T: narrow::array::ArrayType, Buffer: narrow::buffer::BufferType>
where
T: Copy,
Expand Down
Loading

0 comments on commit 094f3a0

Please sign in to comment.