using lzma to compress/decompress state_parts
Kouprin committed Mar 4, 2020
1 parent 4774eba commit 942002d
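
In short: state parts are now LZMA-compressed before being served to peers, and the receiving side caps decompression at MAX_PART_LEN bytes so a small compressed payload cannot blow up memory. Below is a minimal round-trip sketch, not part of the commit, assuming the rust-lzma 0.5 API (lzma::compress, lzma::LzmaReader) that the diff uses:

use std::io::Read;

const MAX_PART_LEN: u64 = 2 * 1024 * 1024; // mirrors chain/chain/src/chain.rs

fn main() {
    let state_part = vec![42u8; 1024]; // stand-in for a real state part
    // Sender side: compress at preset 6, as in the serving path in chain.rs.
    let compressed = lzma::compress(&state_part, 6).expect("compressor should not fail");
    // Receiver side: read at most MAX_PART_LEN + 1 decompressed bytes, so an
    // oversized part is detected without unbounded allocation.
    let reader = lzma::LzmaReader::new_decompressor(compressed.as_slice()).unwrap();
    let mut buf = vec![];
    reader.take(MAX_PART_LEN + 1).read_to_end(&mut buf).unwrap();
    assert!(buf.len() <= MAX_PART_LEN as usize);
    assert_eq!(buf, state_part);
}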
Showing 4 changed files with 119 additions and 13 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
@@ -16,6 +16,7 @@
 serde = "1.0"
 serde_derive = "1.0"
 cached = "0.11.0"
 lazy_static = "1.4"
+rust-lzma = "0.5"

 borsh = "0.2.10"

116 changes: 104 additions & 12 deletions chain/chain/src/chain.rs
@@ -1,4 +1,5 @@
 use std::collections::{HashMap, HashSet};
+use std::io::Read;
 use std::sync::Arc;
 use std::time::{Duration as TimeDuration, Instant};

@@ -61,6 +62,9 @@
 const MAX_ORPHAN_AGE_SECS: u64 = 300;
 /// Refuse blocks more than this many block intervals in the future (as in bitcoin).
 const ACCEPTABLE_TIME_DIFFERENCE: i64 = 12 * 10;

+/// Maximum size of a state part that we accept.
+pub const MAX_PART_LEN: u64 = 2 * 1024 * 1024;
+
 /// If a tx targets a height more than this many blocks ahead and we are not the chunk producer, route it to upcoming validators.
 pub const TX_ROUTING_HEIGHT_HORIZON: BlockHeightDelta = 4;

@@ -1366,8 +1370,9 @@ impl Chain {
             return Err(ErrorKind::InvalidStateRequest("part_id out of bound".to_string()).into());
         }
         let state_part = self.runtime_adapter.obtain_state_part(&state_root, part_id, num_parts);
+        let compressed_part = lzma::compress(&state_part, 6).expect("compressor should not fail");

-        Ok(state_part)
+        Ok(compressed_part)
     }

     pub fn set_state_header(
@@ -1576,19 +1581,36 @@
         let shard_state_header = self.get_received_state_header(shard_id, sync_hash)?;
         let ShardStateSyncResponseHeader { chunk, .. } = shard_state_header;
         let state_root = chunk.header.inner.prev_state_root;
-        if !self.runtime_adapter.validate_state_part(&state_root, part_id, num_parts, data) {
-            byzantine_assert!(false);
-            return Err(ErrorKind::Other(
-                "set_state_part failed: validate_state_part failed".into(),
-            )
-            .into());
+        match lzma_decompress(data) {
+            Ok(decompressed_data) => {
+                if !self.runtime_adapter.validate_state_part(
+                    &state_root,
+                    part_id,
+                    num_parts,
+                    &decompressed_data,
+                ) {
+                    byzantine_assert!(false);
+                    return Err(ErrorKind::Other(
+                        "set_state_part failed: validate_state_part failed".into(),
+                    )
+                    .into());
+                }
+
+                // Saving the part data.
+                let mut store_update = self.store.owned_store().store_update();
+                let key = StatePartKey(sync_hash, shard_id, part_id).try_to_vec()?;
+                store_update.set_ser(ColStateParts, &key, &decompressed_data)?;
+                store_update.commit()?;
+            }
+            _ => {
+                byzantine_assert!(false);
+                return Err(ErrorKind::Other(
+                    "set_state_part failed: lzma::decompress failed".into(),
+                )
+                .into());
+            }
         }
-
-        // Saving the part data.
-        let mut store_update = self.store.owned_store().store_update();
-        let key = StatePartKey(sync_hash, shard_id, part_id).try_to_vec()?;
-        store_update.set_ser(ColStateParts, &key, data)?;
-        store_update.commit()?;
         Ok(())
     }

@@ -3286,3 +3308,73 @@ pub fn collect_receipts_from_response(
     let receipt_proofs = &receipt_proof_response.iter().map(|x| x.1.clone()).flatten().collect();
     collect_receipts(receipt_proofs)
 }

+pub fn lzma_decompress(compressed_data: &Vec<u8>) -> Result<Vec<u8>, Error> {
+    let reader = lzma::LzmaReader::new_decompressor(compressed_data.as_slice())
+        .map_err(|_| Error::from(ErrorKind::Other("cannot create lzma decompressor".into())))?;
+    let mut buf = vec![];
+    // Read at most MAX_PART_LEN + 1 bytes so an oversized part is detected
+    // without decompressing unbounded data into memory.
+    let mut handle = reader.take(MAX_PART_LEN + 1);
+    handle
+        .read_to_end(&mut buf)
+        .map_err(|_| Error::from(ErrorKind::Other("lzma decompression failed".into())))?;
+    if buf.len() > MAX_PART_LEN as usize {
+        return Err(ErrorKind::Other("Invalid decompressed data size".into()).into());
+    }
+    Ok(buf)
+}

+#[cfg(test)]
+mod tests {
+    use crate::{lzma_decompress, MAX_PART_LEN};
+    use rand::Rng;
+
+    fn compress_decompress(data: &Vec<u8>) -> Vec<u8> {
+        println!("len(data) = {:?}", data.len());
+        let compressed_data = lzma::compress(&data, 6).unwrap();
+        println!("len(compressed_data) = {:?}", compressed_data.len());
+        match lzma_decompress(&compressed_data) {
+            Ok(data) => data,
+            _ => vec![],
+        }
+    }
+
+    // Round-trips data just below, at, and just above MAX_PART_LEN.
+    fn check_compression_limit(is_random: bool) {
+        let mut data = vec![];
+        let mut rng = rand::thread_rng();
+        for i in 0..MAX_PART_LEN - 1 {
+            if is_random {
+                data.push(rng.gen());
+            } else {
+                data.push((i % 256) as u8);
+            }
+        }
+
+        // Limit minus one
+        let buf = compress_decompress(&data);
+        assert_eq!(data.len(), buf.len());
+        for (a, b) in data.iter().zip(&buf) {
+            assert_eq!(*a, *b)
+        }
+
+        // Limit
+        if is_random {
+            data.push(rng.gen());
+        } else {
+            data.push(255);
+        }
+        let buf = compress_decompress(&data);
+        assert_eq!(data.len(), buf.len());
+        for (a, b) in data.iter().zip(&buf) {
+            assert_eq!(*a, *b)
+        }
+
+        // Limit exceeded
+        data.push(123);
+        let buf = compress_decompress(&data);
+        assert!(buf.len() <= MAX_PART_LEN as usize);
+        assert_ne!(data.len(), buf.len());
+    }
+
+    #[test]
+    fn test_compression_limit() {
+        check_compression_limit(false);
+        check_compression_limit(true);
+    }
+}
5 changes: 4 additions & 1 deletion chain/chain/src/lib.rs
@@ -1,7 +1,10 @@
 #[macro_use]
 extern crate lazy_static;
+extern crate lzma;

-pub use chain::{collect_receipts, Chain, ChainGenesis, MAX_ORPHAN_SIZE};
+pub use chain::{
+    collect_receipts, lzma_decompress, Chain, ChainGenesis, MAX_ORPHAN_SIZE, MAX_PART_LEN,
+};
 pub use doomslug::{Doomslug, DoomslugBlockProductionReadiness, DoomslugThresholdMode};
 pub use error::{Error, ErrorKind};
 pub use finality::{FinalityGadget, FinalityGadgetQuorums};
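With lzma_decompress and MAX_PART_LEN exported above, other crates in the workspace can reuse the bounded decompressor. A hypothetical caller might look like this (the near_chain import path is an assumption, not shown in this diff):

// Hypothetical downstream use; the `near_chain` crate name is assumed.
use near_chain::{lzma_decompress, MAX_PART_LEN};

fn accept_part(compressed: &Vec<u8>) -> Option<Vec<u8>> {
    match lzma_decompress(compressed) {
        // lzma_decompress already enforces the MAX_PART_LEN bound.
        Ok(part) => {
            debug_assert!(part.len() <= MAX_PART_LEN as usize);
            Some(part)
        }
        // Malformed or oversized input: reject rather than panic.
        Err(_) => None,
    }
}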
