Migrated code from arrow2 to here #1

Merged 4 commits on Aug 1, 2022
21 changes: 21 additions & 0 deletions .github/workflows/coverage.yml
@@ -0,0 +1,21 @@
name: Coverage

on: [pull_request, push]

jobs:
  coverage:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Rust
        run: rustup toolchain install stable --component llvm-tools-preview
      - name: Install cargo-llvm-cov
        uses: taiki-e/install-action@cargo-llvm-cov
      - uses: Swatinem/rust-cache@v1
      - name: Generate code coverage
        run: cargo llvm-cov --features full --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v1
        with:
          files: lcov.info
          fail_ci_if_error: true
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -12,4 +12,4 @@ jobs:
        run: rustup update stable
      - uses: Swatinem/rust-cache@v1
      - name: Run
-        run: cargo test
+        run: cargo test --features full
22 changes: 21 additions & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
name = "avro-schema"
version = "0.2.2"
license = "Apache-2.0"
description = "Implementation of Apache Avro spec"
description = "Apache Avro specification"
homepage = "https://github.com/DataEngineeringLabs/avro-schema"
repository = "https://github.com/DataEngineeringLabs/avro-schema"
authors = ["Jorge C. Leitao <[email protected]>"]
@@ -12,3 +12,23 @@ edition = "2018"
[dependencies]
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0", default-features = false }

fallible-streaming-iterator = { version = "0.1" }

libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }

# for async
futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

[features]
default = []
full = ["compression", "async"]
compression = [
    "libflate",
    "snap",
    "crc",
]
async = ["futures", "async-stream"]
5 changes: 5 additions & 0 deletions README.md
@@ -1,9 +1,14 @@
# Avro-schema - Avro schema in Rust

[![test](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml/badge.svg)](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/DataEngineeringLabs/avro-schema/branch/main/graph/badge.svg)](https://codecov.io/gh/DataEngineeringLabs/avro-schema)

This crate contains a complete implementation of the schemas of
the [Avro specification](https://avro.apache.org/docs/current/spec.html) in
native Rust.

See the API documentation for examples of how to read and write Avro files, and the short sketch below.
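
A minimal sketch of declaring a schema with this crate's types (the `Record`, `Field`, and `Schema` items are the ones used in the examples later in this diff):

```rust
use avro_schema::schema::{Field, Record, Schema};

fn main() {
    // A record named "measurement" with a single `float` field called "value".
    let record = Record::new("measurement", vec![Field::new("value", Schema::Float)]);
    println!("{:?}", record);
}
```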

## License

Licensed under either of
22 changes: 22 additions & 0 deletions src/error.rs
@@ -0,0 +1,22 @@
//! Contains [`Error`]

/// Error from this crate
#[derive(Debug, Clone, Copy)]
pub enum Error {
    /// Generic error when the file is out of spec
    OutOfSpec,
    /// When reading or writing with compression but the feature flag "compression" is not active.
    RequiresCompression,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl From<std::io::Error> for Error {
    fn from(_: std::io::Error) -> Self {
        Error::OutOfSpec
    }
}
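
A small usage sketch; `open` is a hypothetical helper, not part of this crate:

```rust
use avro_schema::error::Error;

// Hypothetical helper: the `From<std::io::Error>` impl above lets `?` turn any
// I/O failure into `Error::OutOfSpec`, so callers deal with a single error type.
fn open(path: &str) -> Result<std::fs::File, Error> {
    let file = std::fs::File::open(path)?; // io::Error -> Error::OutOfSpec
    Ok(file)
}

fn main() {
    // `Display` falls back to the `Debug` representation, so both print "OutOfSpec".
    if let Err(error) = open("missing.avro") {
        println!("{} / {:?}", error, error);
    }
}
```
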
60 changes: 60 additions & 0 deletions src/file.rs
@@ -0,0 +1,60 @@
//! Contains structs found in Avro files
use crate::schema::Record;

/// Avro file's Metadata
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct FileMetadata {
    /// The Record represented in the file's Schema
    pub record: Record,
    /// The file's compression
    pub compression: Option<Compression>,
    /// The file's marker, present in every block
    pub marker: [u8; 16],
}

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompressedBlock {
    /// The number of rows
    pub number_of_rows: usize,
    /// The compressed data
    pub data: Vec<u8>,
}

impl CompressedBlock {
    /// Creates a new CompressedBlock
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Block {
    /// The number of rows
    pub number_of_rows: usize,
    /// The uncompressed data
    pub data: Vec<u8>,
}

impl Block {
    /// Creates a new Block
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Compression {
    /// Deflate
    Deflate,
    /// Snappy
    Snappy,
}
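
A short sketch of how these types fit together; the values are illustrative only:

```rust
use avro_schema::file::{Block, CompressedBlock, Compression};

fn main() {
    // An uncompressed block holding two rows of little-endian f32 values.
    let rows = vec![1.0f32, 2.0];
    let data: Vec<u8> = rows.iter().flat_map(|v| v.to_le_bytes()).collect();
    let block = Block::new(rows.len(), data);
    assert_eq!(block.number_of_rows, 2);
    assert_eq!(block.data.len(), 8);

    // Readers reuse an empty `CompressedBlock` as a scratch buffer and fill it
    // in place, block after block.
    let scratch = CompressedBlock::default();
    assert!(scratch.data.is_empty());

    // `FileMetadata::compression` records which codec, if any, was used.
    let _codec: Option<Compression> = Some(Compression::Deflate);
}
```
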
81 changes: 81 additions & 0 deletions src/lib.md
@@ -4,3 +4,84 @@ This is a library containing declarations of the
[Avro specification](https://avro.apache.org/docs/current/spec.html)
in Rust structs and enums, together with serialization and deserialization
implementations based on `serde_json`.

It also contains basic functionality to read and deserialize an Avro file's metadata
and blocks.

Example of reading a file:

```rust
use std::convert::TryInto;
use std::fs::File;
use std::io::BufReader;

use avro_schema::error::Error;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;

fn read_avro(path: &str) -> Result<(), Error> {
    let file = &mut BufReader::new(File::open(path)?);

    let metadata = avro_schema::read::read_metadata(file)?;

    println!("{:#?}", metadata);

    let mut blocks =
        avro_schema::read::BlockStreamingIterator::new(file, metadata.compression, metadata.marker);

    while let Some(block) = blocks.next()? {
        let _fields = &metadata.record.fields;
        let length = block.number_of_rows;
        let mut block: &[u8] = block.data.as_ref();
        // at this point you can deserialize the block based on `_fields` according
        // to Avro's specification. Note that `Block` is already decompressed.
        // For example, if there were a single field with f32, we would use:
        for _ in 0..length {
            let (item, remaining) = block.split_at(4);
            block = remaining;
            let _value = f32::from_le_bytes(item.try_into().unwrap());
            // if there were more fields, we would need to consume (or skip) the
            // remaining bytes here. You can use
            // `avro_schema::read::decode::zigzag_i64` for integers; a sketch
            // follows this example.
        }
    }

    Ok(())
}
```
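
Where the comment above mentions consuming the remaining fields, here is a hedged sketch of decoding one Avro `long` followed by one `string`, assuming `zigzag_i64` is public under `read::decode` as the comment suggests:

```rust
use avro_schema::error::Error;

// Sketch: Avro encodes `long` as a zigzag varint and `string` as a
// zigzag-encoded byte length followed by that many UTF-8 bytes.
fn read_long_then_string(mut block: &[u8]) -> Result<(i64, String), Error> {
    let value = avro_schema::read::decode::zigzag_i64(&mut block)?;
    let len = avro_schema::read::decode::zigzag_i64(&mut block)? as usize;
    if len > block.len() {
        return Err(Error::OutOfSpec);
    }
    let (bytes, _remaining) = block.split_at(len);
    let text = String::from_utf8(bytes.to_vec()).map_err(|_| Error::OutOfSpec)?;
    Ok((value, text))
}
```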

Example of writing a file:

```rust
use std::fs::File;

use avro_schema::error::Error;
use avro_schema::file::Block;
use avro_schema::schema::{Field, Record, Schema};

fn write_avro(compression: Option<avro_schema::file::Compression>) -> Result<(), Error> {
    let mut file = File::create("test.avro")?;

    let record = Record::new("", vec![Field::new("value", Schema::Float)]);

    avro_schema::write::write_metadata(&mut file, record, compression)?;

    // given some data:
    let array = vec![1.0f32, 2.0];

    // we need to create a `Block`
    let mut data: Vec<u8> = vec![];
    for item in array.iter() {
        let bytes = item.to_le_bytes();
        data.extend(bytes);
    }
    let mut block = Block::new(array.len(), data);

    // once completed, we compress it
    let mut compressed_block = avro_schema::file::CompressedBlock::default();
    let _ = avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;

    // and finally write it to the file
    avro_schema::write::write_block(&mut file, &compressed_block)?;

    Ok(())
}
```
18 changes: 14 additions & 4 deletions src/lib.rs
@@ -1,7 +1,17 @@
#![doc = include_str!("lib.md")]
#![forbid(unsafe_code)]
#![forbid(missing_docs)]

-mod de;
-mod schema;
-mod se;
-pub use schema::*;
+pub mod error;
+pub mod file;
+pub mod schema;
+
+pub mod read;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod read_async;
+
+pub mod write;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod write_async;
96 changes: 96 additions & 0 deletions src/read/block.rs
@@ -0,0 +1,96 @@
//! APIs to read Avro blocks.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::{error::Error, file::CompressedBlock};

use super::decode;

fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize), Error> {
    let rows = match decode::internal_zigzag_i64(reader) {
        Ok(a) => a,
        Err(error) => match error {
            decode::DecodeError::EndOfFile => return Ok((0, 0)),
            decode::DecodeError::OutOfSpec => return Err(Error::OutOfSpec),
        },
    };
    let bytes = decode::zigzag_i64(reader)?;
    Ok((rows as usize, bytes as usize))
}

/// Reads a [`CompressedBlock`] from the `reader`.
/// # Errors
/// This function errors iff either the block cannot be read or the sync marker does not match.
fn read_block<R: Read>(
    reader: &mut R,
    block: &mut CompressedBlock,
    marker: [u8; 16],
) -> Result<(), Error> {
    let (rows, bytes) = read_size(reader)?;
    block.number_of_rows = rows;
    if rows == 0 {
        return Ok(());
    };

    block.data.clear();
    block
        .data
        .try_reserve(bytes)
        .map_err(|_| Error::OutOfSpec)?;
    reader.take(bytes as u64).read_to_end(&mut block.data)?;

    let mut block_marker = [0u8; 16];
    reader.read_exact(&mut block_marker)?;

    if block_marker != marker {
        return Err(Error::OutOfSpec);
    }
    Ok(())
}

/// [`FallibleStreamingIterator`] of [`CompressedBlock`].
pub struct CompressedBlockStreamingIterator<R: Read> {
    buf: CompressedBlock,
    reader: R,
    marker: [u8; 16],
}

impl<R: Read> CompressedBlockStreamingIterator<R> {
    /// Creates a new [`CompressedBlockStreamingIterator`].
    pub fn new(reader: R, marker: [u8; 16], scratch: Vec<u8>) -> Self {
        Self {
            reader,
            marker,
            buf: CompressedBlock::new(0, scratch),
        }
    }

    /// The buffer of [`CompressedBlockStreamingIterator`].
    pub fn buffer(&mut self) -> &mut CompressedBlock {
        &mut self.buf
    }

    /// Deconstructs itself into its internal reader and scratch buffer.
    pub fn into_inner(self) -> (R, Vec<u8>) {
        (self.reader, self.buf.data)
    }
}

impl<R: Read> FallibleStreamingIterator for CompressedBlockStreamingIterator<R> {
    type Error = Error;
    type Item = CompressedBlock;

    fn advance(&mut self) -> Result<(), Error> {
        read_block(&mut self.reader, &mut self.buf, self.marker)?;
        Ok(())
    }

    fn get(&self) -> Option<&Self::Item> {
        if self.buf.number_of_rows > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}
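
A hedged usage sketch of the iterator above, assuming it is re-exported under `avro_schema::read` and combined with the `read_metadata` call from the earlier example:

```rust
use std::fs::File;
use std::io::BufReader;

use avro_schema::error::Error;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;

// Sketch: stream the still-compressed blocks of a file, reusing one scratch
// buffer, and count rows without ever decompressing the data.
fn count_rows(path: &str) -> Result<usize, Error> {
    let mut file = BufReader::new(File::open(path)?);
    let metadata = avro_schema::read::read_metadata(&mut file)?;

    let mut blocks = avro_schema::read::CompressedBlockStreamingIterator::new(
        file,
        metadata.marker,
        vec![],
    );
    let mut rows = 0;
    while let Some(block) = blocks.next()? {
        rows += block.number_of_rows;
    }
    Ok(rows)
}
```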