From a42daeecc87111dd9c298e41b71f581db717f8fe Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 1 Aug 2022 20:55:55 +0000 Subject: [PATCH] Added tests, docs and coverage --- .github/workflows/coverage.yml | 21 ++++++++ .github/workflows/test.yml | 2 +- Cargo.toml | 1 + README.md | 3 ++ src/error.rs | 4 ++ src/file.rs | 4 ++ src/lib.rs | 1 + src/read/mod.rs | 1 + src/schema/mod.rs | 55 ++++++++++++++++++++ src/write/encode.rs | 2 + src/write/mod.rs | 1 + src/write_async.rs | 1 + tests/it/file.rs | 91 ++++++++++++++++++++++++++++++++++ tests/it/main.rs | 12 +---- 14 files changed, 188 insertions(+), 11 deletions(-) create mode 100644 .github/workflows/coverage.yml create mode 100644 tests/it/file.rs diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml new file mode 100644 index 0000000..effc75b --- /dev/null +++ b/.github/workflows/coverage.yml @@ -0,0 +1,21 @@ +name: Coverage + +on: [pull_request, push] + +jobs: + coverage: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup toolchain install stable --component llvm-tools-preview + - name: Install cargo-llvm-cov + uses: taiki-e/install-action@cargo-llvm-cov + - uses: Swatinem/rust-cache@v1 + - name: Generate code coverage + run: cargo llvm-cov --features full --lcov --output-path lcov.info + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v1 + with: + files: lcov.info + fail_ci_if_error: true diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ac8fd8d..407844b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,4 +12,4 @@ jobs: run: rustup update stable - uses: Swatinem/rust-cache@v1 - name: Run - run: cargo test + run: cargo test --features full diff --git a/Cargo.toml b/Cargo.toml index 7bf7d14..3f3886e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ async-stream = { version = "0.3.2", optional = true } [features] default = [] +full = ["compression", 
"async"] compression = [ "libflate", "snap", diff --git a/README.md b/README.md index d41d663..2ef6a2a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ # Avro-schema - Avro schema in Rust +[![test](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml/badge.svg)](https://github.com/DataEngineeringLabs/avro-schema/actions/workflows/test.yml) +[![codecov](https://codecov.io/gh/DataEngineeringLabs/avro-schema/branch/main/graph/badge.svg)](https://codecov.io/gh/DataEngineeringLabs/avro-schema) + This crate contains the complete implementation of the schemas of the [Avro specification](https://avro.apache.org/docs/current/spec.html) in native Rust. diff --git a/src/error.rs b/src/error.rs index 0e5ee7d..490ebdb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,9 @@ +//! Contains [`Error`] + +/// Error from this crate #[derive(Debug, Clone, Copy)] pub enum Error { + /// Generic error when the file is out of spec OutOfSpec, /// When reading or writing with compression but the feature flag "compression" is not active. RequiresCompression, diff --git a/src/file.rs b/src/file.rs index 0096439..0412ce5 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,10 +1,14 @@ +//! Contains structs found in Avro files use crate::schema::Record; /// Avro file's Metadata #[derive(Debug, Clone, PartialEq, Hash)] pub struct FileMetadata { + /// The Record represented in the file's Schema pub record: Record, + /// The file's compression pub compression: Option<Compression>, + /// The file's marker, present in every block pub marker: [u8; 16], } diff --git a/src/lib.rs b/src/lib.rs index 932ca52..c9ba6a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![doc = include_str!("lib.md")] #![forbid(unsafe_code)] +#![forbid(missing_docs)] pub mod error; pub mod file; diff --git a/src/read/mod.rs b/src/read/mod.rs index 5b3db68..a62119e 100644 --- a/src/read/mod.rs +++ b/src/read/mod.rs @@ -1,3 +1,4 @@ +//! 
Functions to read and decompress Files' metadata and blocks mod block; mod decode; pub(crate) mod decompress; diff --git a/src/schema/mod.rs b/src/schema/mod.rs index d128155..c7c4a3f 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -1,3 +1,4 @@ +//! Contains structs defining Avro's logical types mod de; mod se; @@ -5,27 +6,44 @@ mod se; /// See [the spec](https://avro.apache.org/docs/current/spec.html) for details. #[derive(Debug, Clone, PartialEq, Hash)] pub enum Schema { + /// A null type Null, + /// Boolean (physically represented as a single byte) Boolean, + /// 32 bit signed integer (physically represented as a zigzag encoded variable number of bytes) Int(Option<IntLogical>), + /// 64 bit signed integer (physically represented as a zigzag encoded variable number of bytes) Long(Option<LongLogical>), + /// 32 bit float (physically represented as 4 bytes in little endian) Float, + /// 64 bit float (physically represented as 8 bytes in little endian) Double, + /// variable length bytes (physically represented by a zigzag encoded positive integer followed by its number of bytes) Bytes(Option<BytesLogical>), + /// variable length utf8 (physically represented by a zigzag encoded positive integer followed by its number of bytes) String(Option<StringLogical>), + /// Record Record(Record), + /// Enum with a known number of variants Enum(Enum), + /// Array of a uniform type with N entries Array(Box<Schema>), + /// A map String -> type Map(Box<Schema>), + /// A union of a heterogeneous number of types Union(Vec<Schema>), + /// todo Fixed(Fixed), } /// Order of a [`Field`]. #[derive(Debug, Clone, Copy, PartialEq, Hash)] pub enum Order { + /// Ascending order Ascending, + /// Descending order Descending, + /// Order is to be ignored Ignore, } @@ -33,15 +51,22 @@ pub enum Order { /// See [the spec](https://avro.apache.org/docs/current/spec.html) for details. 
#[derive(Debug, Clone, PartialEq, Hash)] pub struct Field { + /// Its name pub name: String, + /// Its optional documentation pub doc: Option, + /// Its Schema pub schema: Schema, + /// Its default value pub default: Option, + /// Its optional order pub order: Option, + /// Its aliases pub aliases: Vec, } impl Field { + /// Returns a new [`Field`] without a doc, default, order or aliases pub fn new>(name: I, schema: Schema) -> Self { Self { name: name.into(), @@ -57,14 +82,20 @@ impl Field { /// Struct to hold data from a [`Schema::Record`]. #[derive(Debug, Clone, PartialEq, Hash)] pub struct Record { + /// Its name pub name: String, + /// Its optional namespace pub namespace: Option, + /// Its optional documentation pub doc: Option, + /// Its aliases pub aliases: Vec, + /// Its children fields pub fields: Vec, } impl Record { + /// Returns a new [`Record`] without a namespace, doc or aliases pub fn new>(name: I, fields: Vec) -> Self { Self { name: name.into(), @@ -79,15 +110,22 @@ impl Record { /// Struct to hold data from a [`Schema::Fixed`]. #[derive(Debug, Clone, PartialEq, Hash)] pub struct Fixed { + /// Its name pub name: String, + /// Its optional namespace pub namespace: Option, + /// Its optional documentation pub doc: Option, + /// Its aliases pub aliases: Vec, + /// Its size pub size: usize, + /// Its optional logical type pub logical: Option, } impl Fixed { + /// Returns a new [`Fixed`] without a namespace, doc or aliases pub fn new>(name: I, size: usize) -> Self { Self { name: name.into(), @@ -103,11 +141,17 @@ impl Fixed { /// Struct to hold data from a [`Schema::Enum`]. 
#[derive(Debug, Clone, PartialEq, Hash)] pub struct Enum { + /// Its name pub name: String, + /// Its optional namespace pub namespace: Option, + /// Its aliases pub aliases: Vec, + /// Its optional documentation pub doc: Option, + /// Its set of symbols pub symbols: Vec, + /// Its default symbol pub default: Option, } @@ -146,35 +190,46 @@ impl From for Schema { /// Enum of all logical types of [`Schema::Int`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum IntLogical { + /// A date Date, + /// A time Time, } /// Enum of all logical types of [`Schema::Long`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum LongLogical { + /// A time Time, + /// A timestamp TimestampMillis, + /// A timestamp TimestampMicros, + /// A timestamp without timezone LocalTimestampMillis, + /// A timestamp without timezone LocalTimestampMicros, } /// Enum of all logical types of [`Schema::String`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum StringLogical { + /// A UUID Uuid, } /// Enum of all logical types of [`Schema::Fixed`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum FixedLogical { + /// A decimal Decimal(usize, usize), + /// A duration Duration, } /// Enum of all logical types of [`Schema::Bytes`] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum BytesLogical { + /// A decimal Decimal(usize, usize), } diff --git a/src/write/encode.rs b/src/write/encode.rs index 24c2a25..4746a77 100644 --- a/src/write/encode.rs +++ b/src/write/encode.rs @@ -1,5 +1,7 @@ +//! Functions used to encode Avro physical types use crate::error::Error; +/// Zigzag encoding of a signed integer. #[inline] pub fn zigzag_encode(n: i64, writer: &mut W) -> Result<(), Error> { _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer) diff --git a/src/write/mod.rs b/src/write/mod.rs index 6b12fb0..03f26cf 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -1,3 +1,4 @@ +//! 
Functions to compress and write Files' metadata and blocks mod compression; pub use compression::compress; mod block; diff --git a/src/write_async.rs b/src/write_async.rs index 7256e01..96b2bbd 100644 --- a/src/write_async.rs +++ b/src/write_async.rs @@ -1,3 +1,4 @@ +//! Functions to asynchronously write Files' metadata and blocks use futures::{AsyncWrite, AsyncWriteExt}; use crate::{ diff --git a/tests/it/file.rs b/tests/it/file.rs new file mode 100644 index 0000000..913ce7b --- /dev/null +++ b/tests/it/file.rs @@ -0,0 +1,91 @@ +use std::convert::TryInto; + +use avro_schema::error::Error; +use avro_schema::file::{Block, Compression}; +use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator; +use avro_schema::schema::{Field, Record, Schema}; + +fn read_avro(mut data: &[u8]) -> Result<Vec<f32>, Error> { + let metadata = avro_schema::read::read_metadata(&mut data)?; + + let mut blocks = avro_schema::read::BlockStreamingIterator::new( + &mut data, + metadata.compression, + metadata.marker, + ); + + let mut values = vec![]; + while let Some(block) = blocks.next()? { + let _fields = &metadata.record.fields; + let length = block.number_of_rows; + let mut block: &[u8] = block.data.as_ref(); + // at this point you can deserialize the block based on `_fields` according + // to avro's specification. Note that `Block` is already decompressed. + // for example, if there was a single field with f32, we would use + for _ in 0..length { + let (item, remaining) = block.split_at(4); + block = remaining; + let value = f32::from_le_bytes(item.try_into().unwrap()); + values.push(value) + // if there were more fields, we would need to consume (or skip) the remaining + // here. 
You can use `avro_schema::read::decode::zigzag_i64` for integers :D + } + } + + Ok(values) +} + +fn write_avro( + compression: Option<Compression>, + array: &[f32], +) -> Result<Vec<u8>, Error> { + let mut file = vec![]; + + let record = Record::new("", vec![Field::new("value", Schema::Float)]); + + avro_schema::write::write_metadata(&mut file, record, compression)?; + + // we need to create a `Block` + let mut data: Vec<u8> = vec![]; + for item in array.iter() { + let bytes = item.to_le_bytes(); + data.extend(bytes); + } + let mut block = Block::new(array.len(), data); + + // once completed, we compress it + let mut compressed_block = avro_schema::file::CompressedBlock::default(); + let _ = avro_schema::write::compress(&mut block, &mut compressed_block, compression)?; + + // and finally write it to the file + avro_schema::write::write_block(&mut file, &compressed_block)?; + + Ok(file) +} + +#[test] +fn round_trip() -> Result<(), Error> { + let original = vec![0.1, 0.2]; + let file = write_avro(None, &original)?; + let read = read_avro(&file)?; + assert_eq!(read, original); + Ok(()) +} + +#[test] +fn round_trip_deflate() -> Result<(), Error> { + let original = vec![0.1, 0.2]; + let file = write_avro(Some(Compression::Deflate), &original)?; + let read = read_avro(&file)?; + assert_eq!(read, original); + Ok(()) +} + +#[test] +fn round_trip_snappy() -> Result<(), Error> { + let original = vec![0.1, 0.2]; + let file = write_avro(Some(Compression::Snappy), &original)?; + let read = read_avro(&file)?; + assert_eq!(read, original); + Ok(()) +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 708004d..099bdcc 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -1,3 +1,5 @@ +mod file; + use serde_json::Result; use avro_schema::schema::{BytesLogical, Field, LongLogical, Schema}; @@ -154,13 +156,3 @@ fn test_deserialize() -> Result<()> { } Ok(()) } - -#[test] -fn test_round_trip() -> Result<()> { - for (_, expected) in cases() { - let serialized = serde_json::to_string(&expected)?; - let v: 
avro_schema::schema::Schema = serde_json::from_str(&serialized)?; - assert_eq!(expected, v); - } - Ok(()) -}