Skip to content

Commit

Permalink
nearcore: support downloading compressed genesis.json and config.json (#5732)
Browse files Browse the repository at this point in the history

Introduce `AutoXzDecoder` struct (which automatically detects
xz-compressed files and transparently decompresses them) and use it in
`download_file` to support compressed `genesis.json` and `config.json`
files.

Especially the former file may get quite large resulting in long
downloads and wasted bandwidth.  Being able to download compressed
JSON files will reduce the time it takes for `neard init` to finish.
For example, testnet’s `genesis.json` file is 5.5 GB and shrinks to 45
MB when compressed.

The default download URLs still point to uncompressed files (since
compressed ones are not yet available).

Issue: #4951
  • Loading branch information
mina86 authored Feb 1, 2022
1 parent cf61e17 commit a25386c
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 6 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ near-rust-allocator-proxy = { version = "0.4", optional = true }
lazy-static-include = "3"
tempfile = "3"
indicatif = "0.15.0"
xz2 = "0.1.6"

near-crypto = { path = "../core/crypto" }
near-primitives = { path = "../core/primitives" }
Expand Down
242 changes: 236 additions & 6 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use near_primitives::time::Clock;
use num_rational::Rational;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tracing::info;
use tracing::{error, info};

use near_chain_configs::{
get_initial_supply, ClientConfig, Genesis, GenesisConfig, GenesisValidationMode,
Expand Down Expand Up @@ -1227,6 +1227,10 @@ pub enum FileDownloadError {
OpenError(#[source] std::io::Error),
#[error("Failed to write to temporary file at {0:?}")]
WriteError(PathBuf, #[source] std::io::Error),
#[error("Failed to decompress XZ stream: {0}")]
XzDecodeError(#[from] xz2::stream::Error),
#[error("Failed to decompress XZ stream: internal error: unexpected status {0:?}")]
XzStatusError(String),
#[error("Failed to rename temporary file {0:?} to {1:?}")]
RenameError(PathBuf, PathBuf, #[source] std::io::Error),
#[error("Invalid URI")]
Expand All @@ -1235,13 +1239,240 @@ pub enum FileDownloadError {
RemoveTemporaryFileError(std::io::Error, #[source] Box<FileDownloadError>),
}

/// Object which allows transparent XZ decoding when saving data to a file.
/// It automatically detects whether the data being read is compressed by
/// looking at the magic at the beginning of the file.
struct AutoXzDecoder<'a> {
    /// Path of the output file; used only when constructing error values.
    path: &'a std::path::Path,
    /// Output file the (possibly decompressed) data is written to.
    file: tokio::fs::File,
    /// Probing / pass-through / decompressing state machine.
    state: AutoXzState,
}

/// State of an [`AutoXzDecoder`] object: either still probing the input for
/// the XZ header magic, passing the data through verbatim, or decompressing
/// it.
enum AutoXzState {
    /// Given number of bytes have been read so far and all of them match bytes
    /// in [`XZ_HEADER_MAGIC`]. The object starts in `Probing(0)` state and the
    /// number never reaches the length of the [`XZ_HEADER_MAGIC`] buffer
    /// (once it would, the state switches to `Compressed`).
    Probing(usize),

    /// The header did not match XZ stream header and thus the data is passed
    /// through.
    PlainText,

    /// The header did match XZ stream header and thus the data is being
    /// decompressed.  Holds the lzma decoder state and a scratch output
    /// buffer used by the decompression loop.
    Compressed(xz2::stream::Stream, Box<[u8]>),
}

/// Header that every XZ stream starts with. See
/// <https://tukaani.org/xz/xz-file-format-1.0.4.txt> § 2.1.1.1.
static XZ_HEADER_MAGIC: [u8; 6] = [0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00];

impl<'a> AutoXzDecoder<'a> {
    /// Creates a new decoder writing to `file`.  `path` is used only when
    /// constructing error values.
    fn new(path: &'a std::path::Path, file: tokio::fs::File) -> Self {
        Self { path, file, state: AutoXzState::Probing(0) }
    }

    /// Writes data from the chunk to the output file automatically
    /// decompressing it if the stream is XZ-compressed. Note that once all the
    /// data has been written the [`AutoXzDecoder::finish`] function must be
    /// called to flush internal buffers.
    async fn write_all(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
        if let Some(len) = self.probe(chunk) {
            // `len` bytes of the XZ header magic were consumed by earlier
            // probing calls and must be replayed ahead of the current chunk.
            if len != 0 {
                self.write_all_impl(&XZ_HEADER_MAGIC[..len]).await?;
            }
            self.write_all_impl(chunk).await?;
        }
        Ok(())
    }

    /// Flushes all internal buffers and closes the output file.
    async fn finish(mut self) -> Result<(), FileDownloadError> {
        match self.state {
            // The stream ended while we were still probing, i.e. it was
            // shorter than the XZ magic.  Whatever prefix we buffered is
            // plain data and must be written out.
            AutoXzState::Probing(pos) => self.write_all_raw(&XZ_HEADER_MAGIC[..pos]).await?,
            AutoXzState::PlainText => (),
            AutoXzState::Compressed(ref mut stream, ref mut buffer) => {
                // An empty chunk tells `decompress` to finalise the stream.
                Self::decompress(self.path, &mut self.file, stream, buffer, b"").await?
            }
        }
        self.file
            .flush()
            .await
            .map_err(|e| FileDownloadError::WriteError(self.path.to_path_buf(), e))
    }

    /// If object is still in `Probing` state, read more data from the input to
    /// determine whether it’s XZ stream or not. Updates `state` accordingly.
    /// If probing succeeded, returns number of bytes from XZ header magic that
    /// need to be processed before `chunk` is processed. If the entire data
    /// from `chunk` has been processed and it should be discarded by the
    /// caller, returns `None`.
    fn probe(&mut self, chunk: &[u8]) -> Option<usize> {
        if chunk.is_empty() {
            None
        } else if let AutoXzState::Probing(pos) = self.state {
            let len = std::cmp::min(XZ_HEADER_MAGIC.len() - pos, chunk.len());
            if XZ_HEADER_MAGIC[pos..(pos + len)] != chunk[..len] {
                self.state = AutoXzState::PlainText;
                Some(pos)
            } else if pos + len == XZ_HEADER_MAGIC.len() {
                // Full magic seen — switch to decompression.  No memory limit
                // on the decoder; concatenated streams are not expected.
                let stream = xz2::stream::Stream::new_stream_decoder(u64::MAX, 0).unwrap();
                // TODO(mina86): Once ‘new_uninit’ feature gets stabilised
                // replace buffer initialisation by:
                //     let buffer = Box::new_uninit_slice(64 << 10);
                //     let buffer = unsafe { buffer.assume_init() };
                let buffer = vec![0u8; 64 << 10].into_boxed_slice();
                self.state = AutoXzState::Compressed(stream, buffer);
                Some(pos)
            } else {
                self.state = AutoXzState::Probing(pos + len);
                None
            }
        } else {
            // Probing already finished; process the whole chunk with no
            // replayed magic prefix.
            Some(0)
        }
    }

    /// Writes data to the output file. Panics if the object is still in
    /// probing stage.
    async fn write_all_impl(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
        match self.state {
            AutoXzState::Probing(_) => unreachable!(),
            AutoXzState::PlainText => self.write_all_raw(chunk).await,
            AutoXzState::Compressed(ref mut stream, ref mut buffer) => {
                Self::decompress(self.path, &mut self.file, stream, buffer, chunk).await
            }
        }
    }

    /// Writes data to output file directly.
    async fn write_all_raw(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
        self.file
            .write_all(chunk)
            .await
            .map_err(|e| FileDownloadError::WriteError(self.path.to_path_buf(), e))
    }

    /// Internal implementation for [`AutoXzDecoder::write_all`] and
    /// [`AutoXzDecoder::finish`] methods used when performing decompression.
    /// Calling it with an empty `chunk` indicates the end of the compressed
    /// data.
    async fn decompress(
        path: &std::path::Path,
        file: &mut tokio::fs::File,
        stream: &mut xz2::stream::Stream,
        buffer: &mut [u8],
        mut chunk: &[u8],
    ) -> Result<(), FileDownloadError> {
        let action =
            if chunk.is_empty() { xz2::stream::Action::Finish } else { xz2::stream::Action::Run };
        loop {
            let total_in = stream.total_in();
            let total_out = stream.total_out();
            let status = stream.process(chunk, buffer, action)?;
            match status {
                xz2::stream::Status::Ok | xz2::stream::Status::StreamEnd => (),
                status => {
                    let status = format!("{:?}", status);
                    error!(target: "near", "Got unexpected status ‘{}’ when decompressing downloaded file.", status);
                    return Err(FileDownloadError::XzStatusError(status));
                }
            };
            let read = (stream.total_in() - total_in).try_into().unwrap();
            chunk = &chunk[read..];
            let out = (stream.total_out() - total_out).try_into().unwrap();
            file.write_all(&buffer[..out])
                .await
                .map_err(|e| FileDownloadError::WriteError(path.to_path_buf(), e))?;
            // Stop once the decoder reports end of stream, or once all input
            // has been consumed AND the output buffer was not completely
            // filled.  A completely filled buffer may mean more decompressed
            // data is still pending inside the decoder; breaking early in
            // that situation (in particular when finishing) would silently
            // truncate the output.
            if matches!(status, xz2::stream::Status::StreamEnd)
                || (chunk.is_empty() && out < buffer.len())
            {
                break Ok(());
            }
        }
    }
}

/// Writes `buffer` through an [`AutoXzDecoder`] in `chunk_size`-byte pieces
/// and returns the bytes that end up in the output file.
#[cfg(test)]
fn auto_xz_test_write_file(buffer: &[u8], chunk_size: usize) -> Result<Vec<u8>, FileDownloadError> {
    let (file, path) = tempfile::NamedTempFile::new().unwrap().into_parts();
    let mut decoder = AutoXzDecoder::new(&path, tokio::fs::File::from_std(file));
    let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    runtime.block_on(async move {
        for piece in buffer.chunks(chunk_size) {
            decoder.write_all(piece).await?;
        }
        decoder.finish().await
    })?;
    Ok(std::fs::read(path).unwrap())
}

/// Tests writing plain text of varying lengths through [`AutoXzDecoder`].
/// Includes test cases where a prefix of the XZ header is present at the
/// beginning of the stream being written, verifying that the object is not
/// fooled by a partial magic.
#[test]
fn test_auto_xz_decode_plain() {
    let mut data: [u8; 38] = *b"A quick brow fox jumps over a lazy dog";
    // The first pass exercises purely plain data.  Each subsequent pass
    // overwrites one more leading byte with the XZ magic, so the stream
    // starts with a partial header yet must still pass through unchanged.
    for (pos, &magic_byte) in XZ_HEADER_MAGIC.iter().enumerate() {
        for len in [0, 1, 2, 3, 4, 5, 6, 10, 20, data.len()] {
            let input = &data[..len];
            for chunk_size in 1..11 {
                let output = auto_xz_test_write_file(input, chunk_size).unwrap();
                assert_eq!(output, input, "pos={}, len={}, chunk_size={}", pos, len, chunk_size);
            }
        }
        data[pos] = magic_byte;
    }
}

/// Tests writing an XZ stream through [`AutoXzDecoder`].  The stream must be
/// transparently decompressed regardless of how the input is chunked.
#[test]
fn test_auto_xz_decode_compressed() {
    let compressed = b"\xfd\x37\x7a\x58\x5a\x00\x00\x04\xe6\xd6\xb4\x46\
                       \x02\x00\x21\x01\x1c\x00\x00\x00\x10\xcf\x58\xcc\
                       \x01\x00\x19\x5a\x61\xc5\xbc\xc3\xb3\xc5\x82\xc4\
                       \x87\x20\x67\xc4\x99\xc5\x9b\x6c\xc4\x85\x20\x6a\
                       \x61\xc5\xba\xc5\x84\x00\x00\x00\x89\x4e\xdf\x72\
                       \x66\xbe\xa9\x51\x00\x01\x32\x1a\x20\x18\x94\x30\
                       \x1f\xb6\xf3\x7d\x01\x00\x00\x00\x00\x04\x59\x5a";
    let expected = "Zażółć gęślą jaźń".as_bytes();
    for chunk_size in 1..11 {
        let decoded = auto_xz_test_write_file(compressed, chunk_size).unwrap();
        assert_eq!(decoded, expected);
    }
}

/// Tests [`AutoXzDecoder`]’s handling of corrupt XZ streams.  The data being
/// processed starts with a proper XZ header but what follows is invalid XZ
/// data.  This must result in [`FileDownloadError::XzDecodeError`].
#[test]
fn test_auto_xz_decode_corrupted() {
    let corrupted = b"\xfd\x37\x7a\x58\x5a\x00A quick brown fox";
    for chunk_size in 1..11 {
        let result = auto_xz_test_write_file(corrupted, chunk_size);
        let is_data_error = matches!(
            result,
            Err(FileDownloadError::XzDecodeError(xz2::stream::Error::Data))
        );
        assert!(is_data_error, "got {:?}", result);
    }
}

/// Downloads resource at given `uri` and saves it to `file`. On failure,
/// `file` may be left in inconsistent state (i.e. may contain partial data).
///
/// If the downloaded file is an XZ stream (i.e. starts with the XZ 6-byte magic
/// number), transparently decompresses the file as it’s being downloaded.
async fn download_file_impl(
uri: hyper::Uri,
path: &tempfile::TempPath,
mut file: tokio::fs::File,
path: &std::path::Path,
file: tokio::fs::File,
) -> anyhow::Result<(), FileDownloadError> {
let mut out = AutoXzDecoder::new(path, file);
let https_connector = hyper_tls::HttpsConnector::new();
let client = hyper::Client::builder().build::<_, hyper::Body>(https_connector);
let mut resp = client.get(uri).await.map_err(FileDownloadError::HttpError)?;
Expand All @@ -1263,11 +1494,10 @@ async fn download_file_impl(
};
while let Some(next_chunk_result) = resp.data().await {
let next_chunk = next_chunk_result.map_err(FileDownloadError::HttpError)?;
file.write_all(next_chunk.as_ref())
.await
.map_err(|e| FileDownloadError::WriteError(path.to_path_buf(), e))?;
out.write_all(next_chunk.as_ref()).await?;
bar.inc(next_chunk.len() as u64);
}
out.finish().await?;
bar.finish();
Ok(())
}
Expand Down

0 comments on commit a25386c

Please sign in to comment.