Skip to content

Commit

Permalink
Added tests, docs and coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 1, 2022
1 parent f979242 commit a42daee
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 11 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Measures test coverage with cargo-llvm-cov and uploads the lcov report to Codecov.
name: Coverage

on: [pull_request, push]

jobs:
  coverage:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Rust
        # llvm-tools-preview is required by cargo-llvm-cov
        run: rustup toolchain install stable --component llvm-tools-preview
      - name: Install cargo-llvm-cov
        uses: taiki-e/install-action@cargo-llvm-cov
      - uses: Swatinem/rust-cache@v1
      - name: Generate code coverage
        run: cargo llvm-cov --features full --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        # v1 wraps the deprecated bash uploader; v3 uses the maintained uploader
        uses: codecov/codecov-action@v3
        with:
          files: lcov.info
          fail_ci_if_error: true
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ jobs:
run: rustup update stable
- uses: Swatinem/rust-cache@v1
- name: Run
run: cargo test
run: cargo test --features full
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async-stream = { version = "0.3.2", optional = true }

[features]
default = []
full = ["compression", "async"]
compression = [
"libflate",
"snap",
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Avro-schema - Avro schema in Rust

[![test](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml/badge.svg)](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/DataEngineeringLabs/avro-schema/branch/main/graph/badge.svg)](https://codecov.io/gh/DataEngineeringLabs/avro-schema)

This crate contains the complete implementation of the schemas of
the [Avro specification](https://avro.apache.org/docs/current/spec.html) in
native Rust.
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! Contains [`Error`]
/// Error from this crate
#[derive(Debug, Clone, Copy)]
pub enum Error {
/// Generic error when the file is out of spec
OutOfSpec,
/// When reading or writing with compression but the feature flag "compression" is not active.
RequiresCompression,
Expand Down
4 changes: 4 additions & 0 deletions src/file.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
//! Contains structs found in Avro files
use crate::schema::Record;

/// Metadata of an Avro file: its schema, compression and block marker.
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct FileMetadata {
/// The [`Record`] represented in the file's schema
pub record: Record,
/// The file's compression, if any
pub compression: Option<Compression>,
/// The file's 16-byte marker, present in every block
pub marker: [u8; 16],
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![doc = include_str!("lib.md")]
#![forbid(unsafe_code)]
#![forbid(missing_docs)]

pub mod error;
pub mod file;
Expand Down
1 change: 1 addition & 0 deletions src/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Functions to read and decompress Files' metadata and blocks
mod block;
mod decode;
pub(crate) mod decompress;
Expand Down
55 changes: 55 additions & 0 deletions src/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,72 @@
//! Contains structs defining Avro's logical types
mod de;
mod se;

/// An Avro Schema. It describes all _physical_ and _logical_ types.
/// See [the spec](https://avro.apache.org/docs/current/spec.html) for details.
#[derive(Debug, Clone, PartialEq, Hash)]
pub enum Schema {
/// A null type
Null,
/// Boolean (physically represented as a single byte)
Boolean,
/// 32 bit signed integer (physically represented as a zigzag encoded variable number of bytes)
Int(Option<IntLogical>),
/// 64 bit signed integer (physically represented as a zigzag encoded variable number of bytes)
Long(Option<LongLogical>),
/// 32 bit float (physically represented as 4 bytes in little endian)
Float,
/// 64 bit float (physically represented as 8 bytes in little endian)
Double,
/// variable length bytes (physically represented by a zigzag encoded positive integer followed by its number of bytes)
Bytes(Option<BytesLogical>),
/// variable length utf8 (physically represented by a zigzag encoded positive integer followed by its number of bytes)
String(Option<StringLogical>),
/// A record, described by [`Record`]
Record(Record),
/// Enum with a known number of variants, described by [`Enum`]
Enum(Enum),
/// Array of a uniform type with N entries
Array(Box<Schema>),
/// A map String -> type
Map(Box<Schema>),
/// A union of heterogeneous types
Union(Vec<Schema>),
/// A fixed-size type, described by [`Fixed`] (name, size and optional logical type)
Fixed(Fixed),
}

/// Sort order of a [`Field`].
///
/// Implements `Eq` in addition to `Hash`, so values can be used as keys of
/// hashed collections such as `HashMap` and `HashSet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Order {
    /// Ascending order
    Ascending,
    /// Descending order
    Descending,
    /// Order is to be ignored
    Ignore,
}

/// An Avro field, i.e. one named entry of a [`Record`].
/// See [the spec](https://avro.apache.org/docs/current/spec.html) for details.
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct Field {
/// Its name
pub name: String,
/// Its optional documentation
pub doc: Option<String>,
/// Its [`Schema`]
pub schema: Schema,
/// Its optional default value.
/// NOTE(review): typed as a [`Schema`] rather than as a value - confirm this is intended.
pub default: Option<Schema>,
/// Its optional sort [`Order`]
pub order: Option<Order>,
/// Its aliases (alternative names)
pub aliases: Vec<String>,
}

impl Field {
/// Returns a new [`Field`] without a doc, default, order or aliases
pub fn new<I: Into<String>>(name: I, schema: Schema) -> Self {
Self {
name: name.into(),
Expand All @@ -57,14 +82,20 @@ impl Field {
/// Struct to hold data from a [`Schema::Record`]: a named collection of [`Field`]s.
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct Record {
/// Its name
pub name: String,
/// Its optional namespace
pub namespace: Option<String>,
/// Its optional documentation
pub doc: Option<String>,
/// Its aliases (alternative names)
pub aliases: Vec<String>,
/// Its child fields
pub fields: Vec<Field>,
}

impl Record {
/// Returns a new [`Record`] without a namespace, doc or aliases
pub fn new<I: Into<String>>(name: I, fields: Vec<Field>) -> Self {
Self {
name: name.into(),
Expand All @@ -79,15 +110,22 @@ impl Record {
/// Struct to hold data from a [`Schema::Fixed`].
#[derive(Debug, Clone, PartialEq, Hash)]
pub struct Fixed {
/// Its name
pub name: String,
/// Its optional namespace
pub namespace: Option<String>,
/// Its optional documentation
pub doc: Option<String>,
/// Its aliases
pub aliases: Vec<String>,
/// Its size
pub size: usize,
/// Its optional logical type
pub logical: Option<FixedLogical>,
}

impl Fixed {
/// Returns a new [`Fixed`] without a namespace, doc or aliases
pub fn new<I: Into<String>>(name: I, size: usize) -> Self {
Self {
name: name.into(),
Expand All @@ -103,11 +141,17 @@ impl Fixed {
/// Struct to hold data from a [`Schema::Enum`].
///
/// Implements `Eq` in addition to `Hash`, so values can be used as keys of
/// hashed collections such as `HashMap` and `HashSet`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Enum {
    /// Its name
    pub name: String,
    /// Its optional namespace
    pub namespace: Option<String>,
    /// Its aliases (alternative names)
    pub aliases: Vec<String>,
    /// Its optional documentation
    pub doc: Option<String>,
    /// Its set of symbols
    pub symbols: Vec<String>,
    /// Its optional default symbol
    pub default: Option<String>,
}

Expand Down Expand Up @@ -146,35 +190,46 @@ impl From<Fixed> for Schema {
/// Enum of all logical types of [`Schema::Int`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IntLogical {
/// A date
Date,
/// A time of day.
/// NOTE(review): presumably Avro's `time-millis` - confirm against the (de)serializers.
Time,
}

/// Enum of all logical types of [`Schema::Long`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LongLogical {
/// A time of day.
/// NOTE(review): presumably Avro's `time-micros` - confirm against the (de)serializers.
Time,
/// A timestamp in milliseconds
TimestampMillis,
/// A timestamp in microseconds
TimestampMicros,
/// A timestamp without timezone, in milliseconds
LocalTimestampMillis,
/// A timestamp without timezone, in microseconds
LocalTimestampMicros,
}

/// Enum of all logical types of [`Schema::String`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StringLogical {
/// A UUID, stored as a string
Uuid,
}

/// Enum of all logical types of [`Schema::Fixed`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FixedLogical {
/// A decimal.
/// NOTE(review): the two values are presumably `(precision, scale)` as in the Avro spec - confirm the order.
Decimal(usize, usize),
/// A duration
Duration,
}

/// Enum of all logical types of [`Schema::Bytes`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BytesLogical {
/// A decimal.
/// NOTE(review): the two values are presumably `(precision, scale)` as in the Avro spec - confirm the order.
Decimal(usize, usize),
}
2 changes: 2 additions & 0 deletions src/write/encode.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Functions used to encode Avro physical types
use crate::error::Error;

/// Zigzag encoding of a signed integer.
#[inline]
pub fn zigzag_encode<W: std::io::Write>(n: i64, writer: &mut W) -> Result<(), Error> {
_zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer)
Expand Down
1 change: 1 addition & 0 deletions src/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Functions to compress and write Files' metadata and blocks
mod compression;
pub use compression::compress;
mod block;
Expand Down
1 change: 1 addition & 0 deletions src/write_async.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Functions to asynchronously write Files' metadata and blocks
use futures::{AsyncWrite, AsyncWriteExt};

use crate::{
Expand Down
91 changes: 91 additions & 0 deletions tests/it/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::convert::TryInto;

use avro_schema::error::Error;
use avro_schema::file::{Block, Compression};
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
use avro_schema::schema::{Field, Record, Schema};

fn read_avro(mut data: &[u8]) -> Result<Vec<f32>, Error> {
let metadata = avro_schema::read::read_metadata(&mut data)?;

let mut blocks = avro_schema::read::BlockStreamingIterator::new(
&mut data,
metadata.compression,
metadata.marker,
);

let mut values = vec![];
while let Some(block) = blocks.next()? {
let _fields = &metadata.record.fields;
let length = block.number_of_rows;
let mut block: &[u8] = block.data.as_ref();
// at this point you can deserialize the block based on `_fields` according
// to avro's specification. Note that `Block` is already decompressed.
// for example, if there was a single field with f32, we would use
for _ in 0..length {
let (item, remaining) = block.split_at(4);
block = remaining;
let value = f32::from_le_bytes(item.try_into().unwrap());
values.push(value)
// if there were more fields, we would need to consume (or skip) the remaining
// here. You can use `avro_schema::read::decode::zigzag_i64` for integers :D
}
}

Ok(values)
}

/// Serializes `array` as an Avro file with a single `f32` field named `"value"`,
/// stored in one block and compressed with `compression` (if any).
fn write_avro(
    compression: Option<avro_schema::file::Compression>,
    array: &[f32],
) -> Result<Vec<u8>, Error> {
    // the schema: an unnamed record with one float column
    let schema = Record::new("", vec![Field::new("value", Schema::Float)]);

    let mut out = vec![];
    avro_schema::write::write_metadata(&mut out, schema, compression)?;

    // the block's payload: every value as 4 little-endian bytes, back to back
    let payload: Vec<u8> = array.iter().flat_map(|value| value.to_le_bytes()).collect();
    let mut raw = Block::new(array.len(), payload);

    // compress the block ...
    let mut compressed = avro_schema::file::CompressedBlock::default();
    let _ = avro_schema::write::compress(&mut raw, &mut compressed, compression)?;

    // ... and append it to the file
    avro_schema::write::write_block(&mut out, &compressed)?;

    Ok(out)
}

#[test]
fn round_trip() -> Result<(), Error> {
let original = vec![0.1, 0.2];
let file = write_avro(None, &original)?;
let read = read_avro(&file)?;
assert_eq!(read, original);
Ok(())
}

#[test]
fn round_trip_deflate() -> Result<(), Error> {
let original = vec![0.1, 0.2];
let file = write_avro(Some(Compression::Deflate), &original)?;
let read = read_avro(&file)?;
assert_eq!(read, original);
Ok(())
}

#[test]
fn round_trip_snappy() -> Result<(), Error> {
let original = vec![0.1, 0.2];
let file = write_avro(Some(Compression::Snappy), &original)?;
let read = read_avro(&file)?;
assert_eq!(read, original);
Ok(())
}
12 changes: 2 additions & 10 deletions tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod file;

use serde_json::Result;

use avro_schema::schema::{BytesLogical, Field, LongLogical, Schema};
Expand Down Expand Up @@ -154,13 +156,3 @@ fn test_deserialize() -> Result<()> {
}
Ok(())
}

#[test]
fn test_round_trip() -> Result<()> {
for (_, expected) in cases() {
let serialized = serde_json::to_string(&expected)?;
let v: avro_schema::schema::Schema = serde_json::from_str(&serialized)?;
assert_eq!(expected, v);
}
Ok(())
}

0 comments on commit a42daee

Please sign in to comment.