This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed guide and improved examples #1247

Merged · 4 commits · Sep 12, 2022
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -49,6 +49,6 @@ jobs:
uses: peaceiris/actions-gh-pages@v3
with:
personal_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./target/doc/arrow2
publish_dir: ./target/doc
destination_dir: ./${{github.ref_name}}/docs
keep_files: false
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ implementation.

Check out [the guide](https://jorgecarleitao.github.io/arrow2/main/guide)
for a general introduction on how to use this crate, and
[API docs](https://jorgecarleitao.github.io/arrow2/main/docs)
[API docs](https://jorgecarleitao.github.io/arrow2/main/docs/arrow2)
for a detailed documentation of each of its APIs.

## Features
60 changes: 45 additions & 15 deletions examples/ipc_file_mmap.rs
@@ -1,13 +1,13 @@
//! Example showing how to memory map an Arrow IPC file into a [`Chunk`].
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow2::array::{Array, BooleanArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Error;

// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which
// usually `Arc<Mmap>` supports. Here we mock it
#[derive(Clone)]
// Arrow2 requires something that implements `AsRef<[u8]>`, which
// `Mmap` supports. Here we mock it
struct Mmap(Vec<u8>);

impl AsRef<[u8]> for Mmap {
@@ -17,19 +17,49 @@ impl AsRef<[u8]> for Mmap {
}
}

fn main() -> Result<()> {
// Auxiliary function to write an arrow file
// This function is guaranteed to produce a valid arrow file
fn write(
chunks: &[Chunk<Box<dyn Array>>],
schema: &Schema,
ipc_fields: Option<Vec<arrow2::io::ipc::IpcField>>,
compression: Option<arrow2::io::ipc::write::Compression>,
) -> Result<Vec<u8>, Error> {
let result = vec![];
let options = arrow2::io::ipc::write::WriteOptions { compression };
let mut writer =
arrow2::io::ipc::write::FileWriter::try_new(result, schema, ipc_fields.clone(), options)?;
for chunk in chunks {
writer.write(chunk, ipc_fields.as_ref().map(|x| x.as_ref()))?;
}
writer.finish()?;
Ok(writer.into_inner())
}

fn check_round_trip(array: Box<dyn Array>) -> Result<(), Error> {
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array.clone()])?;

// given a mmap
let mmap = Arc::new(Mmap(vec![]));
let data = Arc::new(write(&[columns], &schema, None, None)?);

// read the metadata
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
// we first read the files' metadata
let metadata =
arrow2::io::ipc::read::read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;

// mmap the dictionaries
let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
// next we mmap the dictionaries
// Safety: `write` above guarantees that this is a valid Arrow IPC file
let dictionaries =
unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };

// and finally mmap a chunk (0 in this case).
let chunk = unsafe { mmap_unchecked(&metadata, &dictionaries, mmap, 0) }?;

println!("{chunk:?}");
// Safety: `write` above guarantees that this is a valid Arrow IPC file
let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? };
assert_eq!(new_array.into_arrays()[0], array);
Ok(())
}

fn main() -> Result<(), Error> {
let array = BooleanArray::from([None, None, Some(true)]).boxed();
check_round_trip(array)
}
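
The updated example feeds the mmap API from an in-memory buffer produced by `write`. To exercise the same code path against an actual file on disk, the mock can be swapped for a real memory map. A minimal sketch, assuming the `memmap2` crate and a pre-existing, valid, uncompressed Arrow IPC file at an illustrative path — neither is part of this changeset:

```rust
//! Sketch only: read a chunk from a real memory-mapped Arrow IPC file.
//! Assumes `example.arrow` exists and is a valid, uncompressed IPC file;
//! otherwise the `_unchecked` calls below are undefined behaviour.
use std::fs::File;
use std::sync::Arc;

use arrow2::error::Error;
use arrow2::io::ipc::read;

fn main() -> Result<(), Error> {
    let file = File::open("example.arrow").expect("file to exist");
    // Safety: the file must not be truncated or modified while it is mapped.
    let data = Arc::new(unsafe { memmap2::Mmap::map(&file).expect("mmap to succeed") });

    // read the metadata
    let metadata = read::read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;

    // Safety: relies on the file being a valid Arrow IPC file
    let dictionaries =
        unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
    let chunk = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? };

    println!("{chunk:?}");
    Ok(())
}
```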
31 changes: 25 additions & 6 deletions guide/src/README.md
@@ -5,10 +5,29 @@ interoperability with the arrow format.

The typical use-case for this library is to perform CPU and memory-intensive analytics in a format that supports heterogeneous data structures, null values, and IPC and FFI interfaces across languages.

Arrow2 is divided into 5 main parts:
Arrow2 is divided in 5 main APIs:

* a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions;
* a [high-level API](./high_level.md) to operate with arrow arrays;
* a [metadata API](./metadata.md) to declare and operate with logical types and metadata;
* a [compute API](./compute.md) with operators to operate over arrays;
* an [IO API](./io/README.md) with interfaces to read from, and write to, other formats.
* a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions
* a [high-level API](./high_level.md) to operate with arrow arrays
* a [metadata API](./metadata.md) to declare and operate with logical types and metadata
* a [compute API](./compute.md) with operators to operate over arrays
* an IO API with interfaces to read from, and write to, other formats
* Arrow
* [Read files](./io/ipc_read.md)
* [Read streams](./io/ipc_stream_read.md)
* [Memory map files](./io/ipc_mmap.md)
* [Write](./io/ipc_write.md)
* CSV
* [Read](./io/csv_read.md)
* [Write](./io/csv_write.md)
* Parquet
* [Read](./io/parquet_read.md)
* [Write](./io/parquet_write.md)
* JSON and NDJSON
* [Read](./io/json_read.md)
* [Write](./io/json_write.md)
* Avro
* [Read](./io/avro_read.md)
* [Write](./io/avro_write.md)
* ODBC
* [Read and write](./io/odbc.md)
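
The new index above enumerates the five APIs. For orientation, a minimal sketch combining the high-level and metadata APIs, built only from types that already appear in the updated `ipc_file_mmap.rs` example; the column name `"a"` and the asserted values are illustrative:

```rust
use arrow2::array::{Array, BooleanArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Error;

fn main() -> Result<(), Error> {
    // high-level API: build a nullable boolean array
    let array = BooleanArray::from([Some(true), None, Some(false)]).boxed();

    // metadata API: declare the column's logical type in a schema ("a" is an illustrative name)
    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
    assert_eq!(schema.fields.len(), 1);

    // columns of equal length are grouped into a `Chunk`
    let chunk = Chunk::try_new(vec![array])?;
    assert_eq!(chunk.len(), 3);

    println!("{chunk:?}");
    Ok(())
}
```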
2 changes: 1 addition & 1 deletion guide/src/high_level.md
@@ -279,5 +279,5 @@ Below is a complete example of how to operate on a `Box<dyn Array>` without
extra allocations.

```rust,ignore
{{#include ../examples/cow.rs}}
{{#include ../../examples/cow.rs}}
```
11 changes: 0 additions & 11 deletions guide/src/io/README.md
@@ -1,12 +1 @@
# IO

This crate offers optional features that enable interoperability with different formats:

* Arrow (`io_ipc`)
* CSV (`io_csv`)
* Parquet (`io_parquet`)
* JSON and NDJSON (`io_json`)
* Avro (`io_avro` and `io_avro_async`)
* ODBC-compliant databases (`io_odbc`)

In this section you can find a guide and examples for each one of them.
2 changes: 1 addition & 1 deletion guide/src/io/odbc.md
@@ -4,5 +4,5 @@ When compiled with feature `io_odbc`, this crate can be used to read from, and w
any [ODBC](https://en.wikipedia.org/wiki/Open_Database_Connectivity) interface:

```rust
{{#include ../../../examples/odbc.rs}}
{{#include ../../../examples/io_odbc.rs}}
```
9 changes: 1 addition & 8 deletions guide/src/io/parquet_read.md
@@ -19,13 +19,6 @@ Here is how to read a single column chunk from a single row group:
The example above minimizes memory usage at the expense of mixing IO and CPU tasks
on the same thread, which may hurt performance if one of them is a bottleneck.

For single-threaded reading, buffers used to read and decompress pages can be re-used.
This create offers an API that encapsulates the above logic:

```rust
{{#include ../../../examples/parquet_read_record.rs}}
```

### Parallelism decoupling of CPU from IO

One important aspect of the pages created by the iterator above is that they can cross
@@ -38,7 +31,7 @@ and that it is advantageous to have a single thread performing all IO-intensive
by delegating all CPU-intensive tasks to separate threads.

```rust
{{#include ../../../examples/parquet_read_parallel.rs}}
{{#include ../../../examples/parquet_read_parallel/src/main.rs}}
```

This can of course be reversed; in configurations where IO is bounded (e.g. when a
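
The guide text kept above describes having a single thread perform all IO-intensive work while CPU-intensive decoding runs elsewhere. A language-level sketch of that producer/consumer split using only the standard library; `read_next_compressed_page` and `decode_page` are hypothetical placeholders, not arrow2 or parquet2 APIs:

```rust
use std::sync::mpsc;
use std::thread;

struct CompressedPage(Vec<u8>);
struct DecodedColumn(Vec<i32>);

// placeholder: one IO-bound read per page
fn read_next_compressed_page(i: usize) -> Option<CompressedPage> {
    if i < 4 { Some(CompressedPage(vec![0; 1024])) } else { None }
}

// placeholder: CPU-bound decompression and decoding
fn decode_page(page: CompressedPage) -> DecodedColumn {
    DecodedColumn(page.0.iter().map(|&b| b as i32).collect())
}

fn main() {
    // bounded channel so the IO thread cannot run arbitrarily far ahead
    let (tx, rx) = mpsc::sync_channel::<CompressedPage>(2);

    // a single thread performs all IO-intensive work
    let io_thread = thread::spawn(move || {
        let mut i = 0;
        while let Some(page) = read_next_compressed_page(i) {
            tx.send(page).unwrap();
            i += 1;
        }
    });

    // CPU-intensive decoding happens on the consuming side
    // (or on a pool of worker threads fed from `rx`)
    for page in rx {
        let column = decode_page(page);
        println!("decoded {} values", column.0.len());
    }

    io_thread.join().unwrap();
}
```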
12 changes: 6 additions & 6 deletions src/io/ipc/write/common.rs
@@ -172,14 +172,14 @@ fn encode_dictionary(
}

pub fn encode_chunk(
columns: &Chunk<Box<dyn Array>>,
chunk: &Chunk<Box<dyn Array>>,
fields: &[IpcField],
dictionary_tracker: &mut DictionaryTracker,
options: &WriteOptions,
) -> Result<(Vec<EncodedData>, EncodedData)> {
let mut encoded_dictionaries = vec![];

for (field, array) in fields.iter().zip(columns.as_ref()) {
for (field, array) in fields.iter().zip(chunk.as_ref()) {
encode_dictionary(
field,
array.as_ref(),
@@ -189,7 +189,7 @@ pub fn encode_chunk(
)?;
}

let encoded_message = columns_to_bytes(columns, options);
let encoded_message = chunk_to_bytes(chunk, options);

Ok((encoded_dictionaries, encoded_message))
}
@@ -213,12 +213,12 @@ fn serialize_compression(

/// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the
/// other for the batch's data
fn columns_to_bytes(columns: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData {
fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData {
let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![];
let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
let mut offset = 0;
for array in columns.arrays() {
for array in chunk.arrays() {
write(
array.as_ref(),
&mut buffers,
@@ -236,7 +236,7 @@ fn columns_to_bytes(columns: &Chunk<Box<dyn Array>>, options: &WriteOptions) ->
version: arrow_format::ipc::MetadataVersion::V5,
header: Some(arrow_format::ipc::MessageHeader::RecordBatch(Box::new(
arrow_format::ipc::RecordBatch {
length: columns.len() as i64,
length: chunk.len() as i64,
nodes: Some(nodes),
buffers: Some(buffers),
compression,
4 changes: 2 additions & 2 deletions src/io/ipc/write/writer.rs
@@ -118,7 +118,7 @@ impl<W: Write> FileWriter<W> {
/// Writes [`Chunk`] to the file
pub fn write(
&mut self,
columns: &Chunk<Box<dyn Array>>,
chunk: &Chunk<Box<dyn Array>>,
ipc_fields: Option<&[IpcField]>,
) -> Result<()> {
if self.state != State::Started {
@@ -134,7 +134,7 @@
};

let (encoded_dictionaries, encoded_message) = encode_chunk(
columns,
chunk,
ipc_fields,
&mut self.dictionary_tracker,
&self.options,
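
The parameter rename from `columns` to `chunk` in `encode_chunk` and `FileWriter::write` does not change any call site. For reference, a minimal caller-side sketch mirroring the `write` helper from the updated example; the output path is illustrative and not part of this changeset:

```rust
use std::fs::File;

use arrow2::array::{Array, BooleanArray};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Error;
use arrow2::io::ipc::write::{FileWriter, WriteOptions};

fn main() -> Result<(), Error> {
    let array = BooleanArray::from([Some(true), None]).boxed();
    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
    let chunk = Chunk::try_new(vec![array])?;

    // illustrative output path
    let file = File::create("example.arrow").expect("file to be writable");
    let options = WriteOptions { compression: None };
    let mut writer = FileWriter::try_new(file, &schema, None, options)?;

    // `write` takes the chunk under its new parameter name; callers are unaffected
    writer.write(&chunk, None)?;
    writer.finish()?;
    Ok(())
}
```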