Migrated code to here (#1)

* Added more content

* Added toolkit to read files

* Added examples

* Added tests, docs and coverage
jorgecarleitao authored Aug 1, 2022
1 parent b4fb45a commit ff2fc8b
Showing 26 changed files with 1,176 additions and 26 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/coverage.yml
@@ -0,0 +1,21 @@
name: Coverage

on: [pull_request, push]

jobs:
  coverage:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Rust
        run: rustup toolchain install stable --component llvm-tools-preview
      - name: Install cargo-llvm-cov
        uses: taiki-e/install-action@cargo-llvm-cov
      - uses: Swatinem/rust-cache@v1
      - name: Generate code coverage
        run: cargo llvm-cov --features full --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v1
        with:
          files: lcov.info
          fail_ci_if_error: true
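
The coverage job mirrors what can be run locally: `cargo llvm-cov --features full --lcov --output-path lcov.info` produces the `lcov.info` file that the final step uploads to Codecov.
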
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -12,4 +12,4 @@ jobs:
        run: rustup update stable
      - uses: Swatinem/rust-cache@v1
      - name: Run
-       run: cargo test
+       run: cargo test --features full
22 changes: 21 additions & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
name = "avro-schema"
version = "0.2.2"
license = "Apache-2.0"
description = "Implementation of Apache Avro spec"
description = "Apache Avro specification"
homepage = "https://github.com/DataEngineeringLabs/avro-schema"
repository = "https://github.com/DataEngineeringLabs/avro-schema"
authors = ["Jorge C. Leitao <[email protected]>"]
@@ -12,3 +12,23 @@ edition = "2018"
[dependencies]
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0", default-features = false }

fallible-streaming-iterator = { version = "0.1" }

libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }

# for async
futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

[features]
default = []
full = ["compression", "async"]
compression = [
    "libflate",
    "snap",
    "crc",
]
async = ["futures", "async-stream"]
5 changes: 5 additions & 0 deletions README.md
@@ -1,9 +1,14 @@
# Avro-schema - Avro schema in Rust

[![test](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml/badge.svg)](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/DataEngineeringLabs/avro-schema/branch/main/graph/badge.svg)](https://codecov.io/gh/DataEngineeringLabs/avro-schema)

This crate contains a complete implementation of the schemas in
the [Avro specification](https://avro.apache.org/docs/current/spec.html),
in native Rust.

See the API documentation for examples of how to read and write.

## License

Licensed under either of
22 changes: 22 additions & 0 deletions src/error.rs
@@ -0,0 +1,22 @@
//! Contains [`Error`]

/// Error from this crate
#[derive(Debug, Clone, Copy)]
pub enum Error {
    /// Generic error when the file is out of spec
    OutOfSpec,
    /// When reading or writing with compression but the feature flag "compression" is not active.
    RequiresCompression,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl From<std::io::Error> for Error {
    fn from(_: std::io::Error) -> Self {
        Error::OutOfSpec
    }
}
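
Since `Error` has exactly two variants and is `Copy`, callers can match on it exhaustively and cheaply. A minimal sketch of handling it (the `describe` helper is hypothetical, not part of the crate):

```rust
use avro_schema::error::Error;

// Hypothetical helper: map each variant to a human-readable hint.
fn describe(error: Error) -> &'static str {
    match error {
        Error::OutOfSpec => "the file does not conform to the Avro specification",
        Error::RequiresCompression => "this file needs the \"compression\" feature flag",
    }
}

fn main() {
    // `From<std::io::Error>` folds every I/O failure into `OutOfSpec`.
    let error: Error = std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into();
    println!("{}: {}", error, describe(error));
}
```
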
60 changes: 60 additions & 0 deletions src/file.rs
@@ -0,0 +1,60 @@
//! Contains structs found in Avro files
use crate::schema::Record;

/// Avro file's Metadata
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct FileMetadata {
    /// The Record represented in the file's Schema
    pub record: Record,
    /// The file's compression
    pub compression: Option<Compression>,
    /// The file's marker, present in every block
    pub marker: [u8; 16],
}

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompressedBlock {
    /// The number of rows
    pub number_of_rows: usize,
    /// The compressed data
    pub data: Vec<u8>,
}

impl CompressedBlock {
    /// Creates a new CompressedBlock
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Block {
    /// The number of rows
    pub number_of_rows: usize,
    /// The uncompressed data
    pub data: Vec<u8>,
}

impl Block {
    /// Creates a new Block
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Compression {
    /// Deflate
    Deflate,
    /// Snappy
    Snappy,
}
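
A short sketch of how these pieces fit together; the values are made up for illustration:

```rust
use avro_schema::file::{Block, Compression, CompressedBlock};

fn main() {
    // Two rows of a single little-endian f32 column, as raw bytes.
    let data: Vec<u8> = [1.0f32, 2.0]
        .iter()
        .flat_map(|v| v.to_le_bytes())
        .collect();
    let block = Block::new(2, data);
    assert_eq!(block.number_of_rows, 2);
    assert_eq!(block.data.len(), 8);

    // `Default` yields an empty block, handy as a reusable scratch buffer.
    let scratch = CompressedBlock::default();
    assert_eq!(scratch.number_of_rows, 0);

    // `None` means "no compression"; the enum covers the two spec codecs.
    let _codec: Option<Compression> = Some(Compression::Snappy);
}
```
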
81 changes: 81 additions & 0 deletions src/lib.md
Original file line number Diff line number Diff line change
@@ -4,3 +4,84 @@ This is a library containing declarations of the
[Avro specification](https://avro.apache.org/docs/current/spec.html)
in Rust structs and enums, together with serialization and deserialization
implementations based on `serde_json`.

It also contains basic functionality to read and deserialize an Avro file's
metadata and blocks.

Example of reading a file:

```rust
use std::convert::TryInto;
use std::fs::File;
use std::io::BufReader;

use avro_schema::error::Error;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;

fn read_avro(path: &str) -> Result<(), Error> {
    let file = &mut BufReader::new(File::open(path)?);

    let metadata = avro_schema::read::read_metadata(file)?;

    println!("{:#?}", metadata);

    let mut blocks =
        avro_schema::read::BlockStreamingIterator::new(file, metadata.compression, metadata.marker);

    while let Some(block) = blocks.next()? {
        let _fields = &metadata.record.fields;
        let length = block.number_of_rows;
        let mut block: &[u8] = block.data.as_ref();
        // At this point you can deserialize the block based on `_fields` according
        // to Avro's specification. Note that `Block` is already decompressed.
        // For example, if there was a single field with f32, we would use:
        for _ in 0..length {
            let (item, remaining) = block.split_at(4);
            block = remaining;
            let _value = f32::from_le_bytes(item.try_into().unwrap());
            // If there were more fields, we would need to consume (or skip) the
            // remaining bytes here. You can use
            // `avro_schema::read::decode::zigzag_i64` for integers :D
        }
    }

    Ok(())
}
```
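
The comment above points to `avro_schema::read::decode::zigzag_i64`. As a
reference for what that call does on the wire, here is a minimal standalone
sketch of Avro's variable-length zigzag decoding: an illustration of the
format, not the crate's implementation.

```rust
use std::io::Read;

/// Decodes one Avro `long`: a base-128 varint (little-endian groups of
/// 7 bits) followed by the zigzag mapping back to a signed integer.
fn zigzag_i64<R: Read>(reader: &mut R) -> std::io::Result<i64> {
    let mut unsigned: u64 = 0;
    let mut shift: u32 = 0;
    loop {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte)?;
        unsigned |= u64::from(byte[0] & 0x7f) << shift;
        if byte[0] & 0x80 == 0 {
            break;
        }
        shift += 7;
        if shift >= 64 {
            // More than ten continuation bytes: not a valid Avro long.
            return Err(std::io::ErrorKind::InvalidData.into());
        }
    }
    // Zigzag maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...; invert it.
    Ok((unsigned >> 1) as i64 ^ -((unsigned & 1) as i64))
}

fn main() {
    // zigzag(2) = 4, encoded as the single byte 0x04.
    assert_eq!(zigzag_i64(&mut &[0x04u8][..]).unwrap(), 2);
    // zigzag(-1) = 1, encoded as the single byte 0x01.
    assert_eq!(zigzag_i64(&mut &[0x01u8][..]).unwrap(), -1);
}
```
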

Example of writing a file:

```rust
use std::fs::File;

use avro_schema::error::Error;
use avro_schema::file::Block;
use avro_schema::schema::{Field, Record, Schema};

fn write_avro(compression: Option<avro_schema::file::Compression>) -> Result<(), Error> {
    let mut file = File::create("test.avro")?;

    let record = Record::new("", vec![Field::new("value", Schema::Float)]);

    avro_schema::write::write_metadata(&mut file, record, compression)?;

    // given some data:
    let array = vec![1.0f32, 2.0];

    // we need to create a `Block`
    let mut data: Vec<u8> = vec![];
    for item in array.iter() {
        let bytes = item.to_le_bytes();
        data.extend(bytes);
    }
    let mut block = Block::new(array.len(), data);

    // once completed, we compress it
    let mut compressed_block = avro_schema::file::CompressedBlock::default();
    let _ = avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;

    // and finally write it to the file
    avro_schema::write::write_block(&mut file, &compressed_block)?;

    Ok(())
}
```
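
Note the `compression` argument: passing `Some(Compression::Deflate)` or
`Some(Compression::Snappy)` requires building with the "compression" feature;
judging from the `Error` enum above, `write::compress` surfaces
`Error::RequiresCompression` otherwise.
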
18 changes: 14 additions & 4 deletions src/lib.rs
@@ -1,7 +1,17 @@
#![doc = include_str!("lib.md")]
#![forbid(unsafe_code)]
#![forbid(missing_docs)]

-mod de;
-mod schema;
-mod se;
-pub use schema::*;
+pub mod error;
+pub mod file;
+pub mod schema;
+
+pub mod read;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod read_async;
+
+pub mod write;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub mod write_async;
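
The `docsrs` attributes follow the usual docs.rs convention: combined with
`rustdoc-args = ["--cfg", "docsrs"]` under `[package.metadata.docs.rs]` (not
shown in this diff), they make rustdoc flag `read_async` and `write_async` as
gated behind the `async` feature.
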
96 changes: 96 additions & 0 deletions src/read/block.rs
@@ -0,0 +1,96 @@
//! APIs to read from Avro format to arrow.
use std::io::Read;

use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::{error::Error, file::CompressedBlock};

use super::decode;

fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize), Error> {
    let rows = match decode::internal_zigzag_i64(reader) {
        Ok(a) => a,
        Err(error) => match error {
            decode::DecodeError::EndOfFile => return Ok((0, 0)),
            decode::DecodeError::OutOfSpec => return Err(Error::OutOfSpec),
        },
    };
    let bytes = decode::zigzag_i64(reader)?;
    Ok((rows as usize, bytes as usize))
}
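
This mirrors the Avro spec: each file block starts with two zigzag-encoded longs (the row count, then the byte size of the possibly-compressed payload), so a clean end-of-file before the first long is reported as `(0, 0)` rather than as an error.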

/// Reads a [`CompressedBlock`] from the `reader`.
/// # Error
/// This function errors iff either the block cannot be read or the sync marker does not match.
fn read_block<R: Read>(
    reader: &mut R,
    block: &mut CompressedBlock,
    marker: [u8; 16],
) -> Result<(), Error> {
    let (rows, bytes) = read_size(reader)?;
    block.number_of_rows = rows;
    if rows == 0 {
        return Ok(());
    };

    block.data.clear();
    block
        .data
        .try_reserve(bytes)
        .map_err(|_| Error::OutOfSpec)?;
    reader.take(bytes as u64).read_to_end(&mut block.data)?;

    let mut block_marker = [0u8; 16];
    reader.read_exact(&mut block_marker)?;

    if block_marker != marker {
        return Err(Error::OutOfSpec);
    }
    Ok(())
}

/// [`FallibleStreamingIterator`] of [`CompressedBlock`].
pub struct CompressedBlockStreamingIterator<R: Read> {
    buf: CompressedBlock,
    reader: R,
    marker: [u8; 16],
}

impl<R: Read> CompressedBlockStreamingIterator<R> {
    /// Creates a new [`CompressedBlockStreamingIterator`].
    pub fn new(reader: R, marker: [u8; 16], scratch: Vec<u8>) -> Self {
        Self {
            reader,
            marker,
            buf: CompressedBlock::new(0, scratch),
        }
    }

    /// The buffer of [`CompressedBlockStreamingIterator`].
    pub fn buffer(&mut self) -> &mut CompressedBlock {
        &mut self.buf
    }

    /// Deconstructs itself
    pub fn into_inner(self) -> (R, Vec<u8>) {
        (self.reader, self.buf.data)
    }
}

impl<R: Read> FallibleStreamingIterator for CompressedBlockStreamingIterator<R> {
    type Error = Error;
    type Item = CompressedBlock;

    fn advance(&mut self) -> Result<(), Error> {
        read_block(&mut self.reader, &mut self.buf, self.marker)?;
        Ok(())
    }

    fn get(&self) -> Option<&Self::Item> {
        if self.buf.number_of_rows > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}
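
A minimal sketch of driving this iterator, assuming it is re-exported from
`avro_schema::read` and that `marker` came from the file's metadata (the
`count_rows` helper is hypothetical):

```rust
use std::io::Read;

use avro_schema::error::Error;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;

// Hypothetical helper: sum the row counts of all blocks without decompressing.
fn count_rows<R: Read>(reader: R, marker: [u8; 16]) -> Result<usize, Error> {
    // An empty Vec serves as the scratch buffer; it is reused across blocks.
    let mut blocks =
        avro_schema::read::CompressedBlockStreamingIterator::new(reader, marker, vec![]);
    let mut rows = 0;
    while let Some(block) = blocks.next()? {
        rows += block.number_of_rows;
    }
    // Recover the reader and the scratch buffer when done.
    let (_reader, _scratch) = blocks.into_inner();
    Ok(rows)
}
```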