diff --git a/Cargo.lock b/Cargo.lock
index b05be05030e..810009a43ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2305,6 +2305,17 @@ dependencies = [
  "hashbrown",
 ]
 
+[[package]]
+name = "lzma-sys"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdb4b7c3eddad11d3af9e86c487607d2d2442d185d848575365c4856ba96d619"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+]
+
 [[package]]
 name = "mach"
 version = "0.3.2"
@@ -3246,6 +3257,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tracing",
+ "xz2",
 ]
 
 [[package]]
@@ -5997,6 +6009,15 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
 
+[[package]]
+name = "xz2"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c179869f34fc7c01830d3ce7ea2086bc3a07e0d35289b667d0a8bf910258926c"
+dependencies = [
+ "lzma-sys",
+]
+
 [[package]]
 name = "yaml-rust"
 version = "0.4.5"
diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml
index 75c3eea585a..fadf10c6d49 100644
--- a/nearcore/Cargo.toml
+++ b/nearcore/Cargo.toml
@@ -35,6 +35,7 @@ near-rust-allocator-proxy = { version = "0.4", optional = true }
 lazy-static-include = "3"
 tempfile = "3"
 indicatif = "0.15.0"
+xz2 = "0.1.6"
 
 near-crypto = { path = "../core/crypto" }
 near-primitives = { path = "../core/primitives" }
diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index 5dbedbb28a0..9c2909f5190 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -12,7 +12,7 @@ use near_primitives::time::Clock;
 use num_rational::Rational;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncWriteExt;
-use tracing::info;
+use tracing::{error, info};
 
 use near_chain_configs::{
     get_initial_supply, ClientConfig, Genesis, GenesisConfig, GenesisValidationMode,
@@ -1227,6 +1227,10 @@ pub enum FileDownloadError {
     OpenError(#[source] std::io::Error),
     #[error("Failed to write to temporary file at {0:?}")]
     WriteError(PathBuf, #[source] std::io::Error),
+    #[error("Failed to decompress XZ stream: {0}")]
+    XzDecodeError(#[from] xz2::stream::Error),
+    #[error("Failed to decompress XZ stream: internal error: unexpected status {0:?}")]
+    XzStatusError(String),
     #[error("Failed to rename temporary file {0:?} to {1:?}")]
     RenameError(PathBuf, PathBuf, #[source] std::io::Error),
     #[error("Invalid URI")]
@@ -1235,13 +1239,240 @@
     RemoveTemporaryFileError(std::io::Error, #[source] Box<FileDownloadError>),
 }
 
+/// Object which allows transparent XZ decoding when saving data to a file.
+/// It automatically detects whether the data being written is compressed by
+/// looking at the magic bytes at the beginning of the stream.
+struct AutoXzDecoder<'a> {
+    path: &'a std::path::Path,
+    file: tokio::fs::File,
+    state: AutoXzState,
+}
+
+/// State in which the [`AutoXzDecoder`] can be.
+enum AutoXzState {
+    /// The given number of bytes have been read so far and all of them match
+    /// the bytes in [`XZ_HEADER_MAGIC`]. The object starts in the `Probing(0)`
+    /// state and the number never reaches the length of the
+    /// [`XZ_HEADER_MAGIC`] buffer.
+    Probing(usize),
+
+    /// The header did not match the XZ stream header and thus the data is
+    /// passed through unchanged.
+    PlainText,
+
+    /// The header did match the XZ stream header and thus the data is being
+    /// decompressed.
+    Compressed(xz2::stream::Stream, Box<[u8]>),
+}
+
+/// Header that every XZ stream starts with. See § 2.1.1.1 of the XZ file
+/// format specification.
+static XZ_HEADER_MAGIC: [u8; 6] = [0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00];
+
+impl<'a> AutoXzDecoder<'a> {
+    fn new(path: &'a std::path::Path, file: tokio::fs::File) -> Self {
+        Self { path, file, state: AutoXzState::Probing(0) }
+    }
+
+    /// Writes data from the chunk to the output file, automatically
+    /// decompressing it if the stream is XZ-compressed. Note that once all
+    /// the data has been written, the [`finish`] function must be called to
+    /// flush internal buffers.
+    async fn write_all(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
+        if let Some(len) = self.probe(chunk) {
+            if len != 0 {
+                self.write_all_impl(&XZ_HEADER_MAGIC[..len]).await?;
+            }
+            self.write_all_impl(chunk).await?;
+        }
+        Ok(())
+    }
+
+    /// Flushes all internal buffers and closes the output file.
+    async fn finish(mut self) -> Result<(), FileDownloadError> {
+        match self.state {
+            AutoXzState::Probing(pos) => self.write_all_raw(&XZ_HEADER_MAGIC[..pos]).await?,
+            AutoXzState::PlainText => (),
+            AutoXzState::Compressed(ref mut stream, ref mut buffer) => {
+                Self::decompress(self.path, &mut self.file, stream, buffer, b"").await?
+            }
+        }
+        self.file
+            .flush()
+            .await
+            .map_err(|e| FileDownloadError::WriteError(self.path.to_path_buf(), e))
+    }
+
+    /// If the object is still in the `Probing` state, looks at more data from
+    /// the input to determine whether it’s an XZ stream or not and updates
+    /// `state` accordingly. If probing finished, returns the number of bytes
+    /// of the XZ header magic that need to be written out before `chunk` is
+    /// processed. If the entire `chunk` has been consumed by the probe and
+    /// the caller should discard it, returns `None`.
+    fn probe(&mut self, chunk: &[u8]) -> Option<usize> {
+        if chunk.is_empty() {
+            None
+        } else if let AutoXzState::Probing(pos) = self.state {
+            let len = std::cmp::min(XZ_HEADER_MAGIC.len() - pos, chunk.len());
+            if XZ_HEADER_MAGIC[pos..(pos + len)] != chunk[..len] {
+                self.state = AutoXzState::PlainText;
+                Some(pos)
+            } else if pos + len == XZ_HEADER_MAGIC.len() {
+                let stream = xz2::stream::Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
+                // TODO(mina86): Once the ‘new_uninit’ feature gets stabilised,
+                // replace the buffer initialisation with:
+                //     let buffer = Box::new_uninit_slice(64 << 10);
+                //     let buffer = unsafe { buffer.assume_init() };
+                let buffer = vec![0u8; 64 << 10].into_boxed_slice();
+                self.state = AutoXzState::Compressed(stream, buffer);
+                Some(pos)
+            } else {
+                self.state = AutoXzState::Probing(pos + len);
+                None
+            }
+        } else {
+            Some(0)
+        }
+    }
+
+    /// Writes data to the output file. Panics if the object is still in the
+    /// probing stage.
+    async fn write_all_impl(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
+        match self.state {
+            AutoXzState::Probing(_) => unreachable!(),
+            AutoXzState::PlainText => self.write_all_raw(chunk).await,
+            AutoXzState::Compressed(ref mut stream, ref mut buffer) => {
+                Self::decompress(self.path, &mut self.file, stream, buffer, chunk).await
+            }
+        }
+    }
+
+    /// Writes data directly to the output file.
+    async fn write_all_raw(&mut self, chunk: &[u8]) -> Result<(), FileDownloadError> {
+        self.file
+            .write_all(chunk)
+            .await
+            .map_err(|e| FileDownloadError::WriteError(self.path.to_path_buf(), e))
+    }
+
+    /// Internal implementation of the [`write_all`] and [`finish`] methods,
+    /// used when performing decompression. Calling it with an empty `chunk`
+    /// indicates the end of the compressed data.
+    async fn decompress(
+        path: &std::path::Path,
+        file: &mut tokio::fs::File,
+        stream: &mut xz2::stream::Stream,
+        buffer: &mut [u8],
+        mut chunk: &[u8],
+    ) -> Result<(), FileDownloadError> {
+        let action =
+            if chunk.is_empty() { xz2::stream::Action::Finish } else { xz2::stream::Action::Run };
+        loop {
+            let total_in = stream.total_in();
+            let total_out = stream.total_out();
+            let status = stream.process(chunk, buffer, action)?;
+            match status {
+                xz2::stream::Status::Ok => (),
+                xz2::stream::Status::StreamEnd => (),
+                status => {
+                    let status = format!("{:?}", status);
+                    error!(target: "near", "Got unexpected status ‘{}’ when decompressing downloaded file.", status);
+                    return Err(FileDownloadError::XzStatusError(status));
+                }
+            };
+            let read = (stream.total_in() - total_in).try_into().unwrap();
+            chunk = &chunk[read..];
+            let out = (stream.total_out() - total_out).try_into().unwrap();
+            file.write_all(&buffer[..out])
+                .await
+                .map_err(|e| FileDownloadError::WriteError(path.to_path_buf(), e))?;
+            if chunk.is_empty() {
+                break Ok(());
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+fn auto_xz_test_write_file(buffer: &[u8], chunk_size: usize) -> Result<Vec<u8>, FileDownloadError> {
+    let (file, path) = tempfile::NamedTempFile::new().unwrap().into_parts();
+    let mut out = AutoXzDecoder::new(&path, tokio::fs::File::from_std(file));
+    tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
+        async move {
+            for chunk in buffer.chunks(chunk_size) {
+                out.write_all(chunk).await?;
+            }
+            out.finish().await
+        },
+    )?;
+    Ok(std::fs::read(path).unwrap())
+}
+
+/// Tests writing plain text of varying lengths through [`AutoXzDecoder`].
+/// Includes test cases where a prefix of the XZ header is present at the
+/// beginning of the stream being written, to check that the object is not
+/// fooled by a partial magic prefix.
+#[test]
+fn test_auto_xz_decode_plain() {
+    let mut data: [u8; 39] = *b"A quick brown fox jumps over a lazy dog";
+    // On the first iteration we’re testing just plain text data. On
+    // subsequent iterations, we’re testing uncompressed data whose first few
+    // bytes match the XZ header.
+    for (pos, &ch) in XZ_HEADER_MAGIC.iter().enumerate() {
+        for len in [0, 1, 2, 3, 4, 5, 6, 10, 20, data.len()] {
+            let buffer = &data[0..len];
+            for chunk_size in 1..11 {
+                let got = auto_xz_test_write_file(buffer, chunk_size).unwrap();
+                assert_eq!(got, buffer, "pos={}, len={}, chunk_size={}", pos, len, chunk_size);
+            }
+        }
+        data[pos] = ch;
+    }
+}
+
+/// Tests writing an XZ stream through [`AutoXzDecoder`]. The stream should be
+/// properly decompressed.
+#[test]
+fn test_auto_xz_decode_compressed() {
+    let buffer = b"\xfd\x37\x7a\x58\x5a\x00\x00\x04\xe6\xd6\xb4\x46\
+                   \x02\x00\x21\x01\x1c\x00\x00\x00\x10\xcf\x58\xcc\
+                   \x01\x00\x19\x5a\x61\xc5\xbc\xc3\xb3\xc5\x82\xc4\
+                   \x87\x20\x67\xc4\x99\xc5\x9b\x6c\xc4\x85\x20\x6a\
+                   \x61\xc5\xba\xc5\x84\x00\x00\x00\x89\x4e\xdf\x72\
+                   \x66\xbe\xa9\x51\x00\x01\x32\x1a\x20\x18\x94\x30\
+                   \x1f\xb6\xf3\x7d\x01\x00\x00\x00\x00\x04\x59\x5a";
+    for chunk_size in 1..11 {
+        let got = auto_xz_test_write_file(buffer, chunk_size).unwrap();
+        assert_eq!(got, "Zażółć gęślą jaźń".as_bytes());
+    }
+}
+
+/// Tests [`AutoXzDecoder`]’s handling of corrupt XZ streams. The data being
+/// processed starts with a proper XZ header but what follows is invalid XZ
+/// data. This should result in [`FileDownloadError::XzDecodeError`].
+#[test]
+fn test_auto_xz_decode_corrupted() {
+    let buffer = b"\xfd\x37\x7a\x58\x5a\x00A quick brown fox";
+    for chunk_size in 1..11 {
+        let got = auto_xz_test_write_file(buffer, chunk_size);
+        assert!(
+            matches!(got, Err(FileDownloadError::XzDecodeError(xz2::stream::Error::Data))),
+            "got {:?}",
+            got
+        );
+    }
+}
+
 /// Downloads resource at given `uri` and saves it to `file`. On failure,
 /// `file` may be left in inconsistent state (i.e. may contain partial data).
+///
+/// If the downloaded file is an XZ stream (i.e. starts with the XZ 6-byte magic
+/// number), transparently decompresses the file as it’s being downloaded.
 async fn download_file_impl(
     uri: hyper::Uri,
-    path: &tempfile::TempPath,
-    mut file: tokio::fs::File,
+    path: &std::path::Path,
+    file: tokio::fs::File,
 ) -> anyhow::Result<(), FileDownloadError> {
+    let mut out = AutoXzDecoder::new(path, file);
     let https_connector = hyper_tls::HttpsConnector::new();
     let client = hyper::Client::builder().build::<_, hyper::Body>(https_connector);
     let mut resp = client.get(uri).await.map_err(FileDownloadError::HttpError)?;
@@ -1263,11 +1494,10 @@ async fn download_file_impl(
     };
     while let Some(next_chunk_result) = resp.data().await {
         let next_chunk = next_chunk_result.map_err(FileDownloadError::HttpError)?;
-        file.write_all(next_chunk.as_ref())
-            .await
-            .map_err(|e| FileDownloadError::WriteError(path.to_path_buf(), e))?;
+        out.write_all(next_chunk.as_ref()).await?;
         bar.inc(next_chunk.len() as u64);
     }
+    out.finish().await?;
    bar.finish();
     Ok(())
 }
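
For reference, the decompression bookkeeping in `AutoXzDecoder::decompress` can be exercised outside the async download path. The sketch below is not part of the patch: it is a minimal synchronous analogue, assuming only the `xz2::stream` API used above (`new_stream_decoder`, `process`, `total_in`, `total_out`), and the helper name `xz_decode_all` is made up for illustration. It shows how the `total_in()`/`total_out()` counters turn each `process` call into "bytes consumed from the input" and "bytes produced into the output buffer".

use xz2::stream::{Action, Status, Stream};

/// Hypothetical helper (illustration only): decodes a complete XZ stream held
/// in memory, mirroring the loop in `AutoXzDecoder::decompress`.
fn xz_decode_all(mut input: &[u8]) -> Result<Vec<u8>, xz2::stream::Error> {
    let mut stream = Stream::new_stream_decoder(u64::MAX, 0)?;
    let mut buffer = vec![0u8; 64 << 10];
    let mut out = Vec::new();
    loop {
        let total_in = stream.total_in();
        let total_out = stream.total_out();
        // `Run` while input remains; `Finish` tells liblzma that no more input
        // is coming so it can flush remaining output and verify the footer.
        let action = if input.is_empty() { Action::Finish } else { Action::Run };
        let status = stream.process(input, &mut buffer, action)?;
        // The counters only ever grow; their deltas are what this call
        // consumed and produced.
        let read = (stream.total_in() - total_in) as usize;
        let written = (stream.total_out() - total_out) as usize;
        input = &input[read..];
        out.extend_from_slice(&buffer[..written]);
        if matches!(status, Status::StreamEnd) {
            return Ok(out);
        }
    }
}

Feeding it the fixture from `test_auto_xz_decode_compressed` should produce the same "Zażółć gęślą jaźń" bytes the test expects.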
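
Fixtures like the byte string in `test_auto_xz_decode_compressed`, or a pre-compressed file to serve for download, can also be produced with the same crate's encoder instead of the `xz` command-line tool. A minimal sketch, assuming the `xz2::write::XzEncoder` API; the `xz_compress` helper is hypothetical and not part of the patch:

use std::io::Write as _;

/// Hypothetical helper (illustration only): compresses `data` into a complete
/// .xz stream at preset level 6. The result starts with the XZ header magic
/// bytes and is therefore detected and decoded by `AutoXzDecoder`.
fn xz_compress(data: &[u8]) -> Vec<u8> {
    let mut encoder = xz2::write::XzEncoder::new(Vec::new(), 6);
    encoder.write_all(data).unwrap();
    // finish() flushes the stream footer and returns the inner Vec<u8>.
    encoder.finish().unwrap()
}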