Skip to content

Commit

Permalink
feat: add CARv2 support to CarStream (#5232)
Browse files Browse the repository at this point in the history
Co-authored-by: Hubert <[email protected]>
  • Loading branch information
hanabi1224 and LesnyRumcajs authored Feb 7, 2025
1 parent c44dfeb commit ee7b9b5
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 62 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

- [#4954](https://github.com/ChainSafe/forest/issues/4954) Add `--format json` to `forest-cli chain head` command.

- [#5232](https://github.com/ChainSafe/forest/issues/5232) Support `CARv2` stream decoding.

- [#5230](https://github.com/ChainSafe/forest/issues/5230) Add `CARv2` support to `forest-tool archive` command.

### Changed
Expand Down
43 changes: 33 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ dialoguer = "0.11"
digest = "0.10"
directories = "6"
displaydoc = "0.2"
either = "1"
ethereum-types = { version = "0.15", features = ["ethbloom"] }
ez-jsonrpc-types = "0.5"
fil_actor_account_state = { version = "19" }
Expand Down
4 changes: 2 additions & 2 deletions src/daemon/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn load_actor_bundles_from_path(
.await?;

// Validate the bundle
let roots = HashSet::from_iter(car_stream.header.roots.iter());
let roots = HashSet::from_iter(car_stream.header_v1.roots.iter());
for ActorBundleInfo {
manifest, network, ..
} in ACTOR_BUNDLES.iter().filter(|bundle| {
Expand Down Expand Up @@ -119,7 +119,7 @@ pub async fn load_actor_bundles_from_server(
while let Some(block) = stream.try_next().await? {
db.put_keyed_persistent(&block.cid, &block.data)?;
}
let header = stream.header;
let header = stream.header_v1;
anyhow::ensure!(header.roots.len() == 1);
anyhow::ensure!(header.roots.first() == root);
Ok(*header.roots.first())
Expand Down
2 changes: 1 addition & 1 deletion src/daemon/db_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()>
tokio::fs::File::open(from).await?,
))
.await?;
let roots = car_stream.header.roots.clone();
let roots = car_stream.header_v1.roots.clone();

let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
let frames = crate::db::car::forest::Encoder::compress_stream_default(
Expand Down
67 changes: 52 additions & 15 deletions src/db/car/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,15 @@ where
#[cfg(test)]
mod tests {
use super::PlainCar;
use crate::utils::db::car_util::load_car;
use futures::executor::block_on;
use fvm_ipld_blockstore::{Blockstore as _, MemoryBlockstore};
use crate::utils::db::{
car_stream::{CarStream, CarV1Header},
car_util::load_car,
};
use futures::{executor::block_on, TryStreamExt as _};
use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
use once_cell::sync::Lazy;
use tokio::io::AsyncBufRead;
use std::io::Cursor;
use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};

#[test]
fn test_uncompressed_v1() {
Expand All @@ -500,11 +504,17 @@ mod tests {
assert_eq!(car_backed.roots().len(), 1);
assert_eq!(car_backed.cids().len(), 1222);

let reference = reference(car);
let reference_car = reference(Cursor::new(car));
let reference_car_zst = reference(Cursor::new(chain4_car_zst()));
let reference_car_zst_unsafe = reference_unsafe(chain4_car_zst());
for cid in car_backed.cids() {
let expected = reference.get(&cid).unwrap().unwrap();
let expected = reference_car.get(&cid).unwrap().unwrap();
let expected2 = reference_car_zst.get(&cid).unwrap().unwrap();
let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap();
let actual = car_backed.get(&cid).unwrap().unwrap();
assert_eq!(expected, actual);
assert_eq!(expected2, actual);
assert_eq!(expected3, actual);
}
}

Expand All @@ -517,23 +527,50 @@ mod tests {
assert_eq!(car_backed.roots().len(), 1);
assert_eq!(car_backed.cids().len(), 7153);

// Uncomment below lines once CarStream supports CARv2
// let reference = reference(car);
// for cid in car_backed.cids() {
// let expected = reference.get(&cid).unwrap().unwrap();
// let actual = car_backed.get(&cid).unwrap().unwrap();
// assert_eq!(expected, actual);
// }
let reference_car = reference(Cursor::new(car));
let reference_car_zst = reference(Cursor::new(carv2_car_zst()));
let reference_car_zst_unsafe = reference_unsafe(carv2_car_zst());
for cid in car_backed.cids() {
let expected = reference_car.get(&cid).unwrap().unwrap();
let expected2 = reference_car_zst.get(&cid).unwrap().unwrap();
let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap();
let actual = car_backed.get(&cid).unwrap().unwrap();
assert_eq!(expected, actual);
assert_eq!(expected2, actual);
assert_eq!(expected3, actual);
}
}

fn reference(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
let blockstore = MemoryBlockstore::new();
block_on(load_car(&blockstore, reader)).unwrap();
blockstore
}

fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
let blockstore = MemoryBlockstore::new();
block_on(load_car_unsafe(&blockstore, reader)).unwrap();
blockstore
}

pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
where
R: AsyncBufRead + Unpin,
{
let mut stream = CarStream::new_unsafe(BufReader::new(reader)).await?;
while let Some(block) = stream.try_next().await? {
db.put_keyed(&block.cid, &block.data)?;
}
Ok(stream.header_v1)
}

fn chain4_car_zst() -> &'static [u8] {
include_bytes!("../../../test-snapshots/chain4.car.zst")
}

fn chain4_car() -> &'static [u8] {
include_bytes!("../../../test-snapshots/chain4.car")
static CAR: Lazy<Vec<u8>> = Lazy::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
CAR.as_slice()
}

fn carv2_car_zst() -> &'static [u8] {
Expand Down
13 changes: 8 additions & 5 deletions src/genesis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::path::Path;
use std::{io::Cursor, path::Path};

use crate::blocks::CachingBlockHeader;
use crate::state_manager::StateManager;
use crate::utils::db::car_util::load_car;
use anyhow::Context as _;
use fvm_ipld_blockstore::Blockstore;
use tokio::{fs::File, io::AsyncBufRead, io::BufReader};
use tokio::{
fs::File,
io::{AsyncBufRead, AsyncSeek, BufReader},
};
use tracing::{debug, info};

#[cfg(test)]
Expand All @@ -33,7 +36,7 @@ where
None => {
debug!("No specified genesis in config. Using default genesis.");
let genesis_bytes = genesis_bytes.context("No default genesis.")?;
process_car(genesis_bytes, db).await?
process_car(Cursor::new(genesis_bytes), db).await?
}
};

Expand All @@ -57,7 +60,7 @@ where

async fn process_car<R, BS>(reader: R, db: &BS) -> Result<CachingBlockHeader, anyhow::Error>
where
R: AsyncBufRead + Unpin,
R: AsyncBufRead + AsyncSeek + Unpin,
BS: Blockstore,
{
// Load genesis state into the database and get the Cid
Expand Down Expand Up @@ -98,6 +101,6 @@ mod tests {

async fn load_header_from_car(genesis_bytes: &[u8]) -> CachingBlockHeader {
let db = crate::db::MemoryDB::default();
process_car(genesis_bytes, &db).await.unwrap()
process_car(Cursor::new(genesis_bytes), &db).await.unwrap()
}
}
4 changes: 2 additions & 2 deletions src/libp2p/chain_exchange/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ mod tests {
use crate::shim::address::Address;
use crate::utils::db::car_util::load_car;
use nunny::Vec as NonEmpty;
use std::sync::Arc;
use std::{io::Cursor, sync::Arc};

async fn populate_db() -> (NonEmpty<Cid>, Arc<MemoryDB>) {
let db = Arc::new(MemoryDB::default());
// The cids are the tipset cids of the most recent tipset (39th)
let header = load_car(&db, EXPORT_SR_40).await.unwrap();
let header = load_car(&db, Cursor::new(EXPORT_SR_40)).await.unwrap();
(header.roots, db)
}

Expand Down
10 changes: 5 additions & 5 deletions src/networks/actors_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> {

let bytes = std::fs::read(&result.path)?;
let car = CarStream::new(Cursor::new(bytes)).await?;
ensure!(car.header.version == 1);
ensure!(car.header.roots.len() == 1);
ensure!(car.header.roots.first() == root);
ensure!(car.header_v1.version == 1);
ensure!(car.header_v1.roots.len() == 1);
ensure!(car.header_v1.roots.first() == root);
anyhow::Ok((*root, car.try_collect::<Vec<_>>().await?))
},
))
Expand Down Expand Up @@ -291,11 +291,11 @@ mod tests {
let car_secondary = CarStream::new(Cursor::new(alt)).await?;

assert_eq!(
car_primary.header.roots, car_secondary.header.roots,
car_primary.header_v1.roots, car_secondary.header_v1.roots,
"Roots for {url} and {alt_url} do not match"
);
assert_eq!(
car_primary.header.roots.first(),
car_primary.header_v1.roots.first(),
manifest,
"Manifest for {url} and {alt_url} does not match"
);
Expand Down
2 changes: 1 addition & 1 deletion src/tool/subcommands/benchmark_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async fn benchmark_forest_encoding(

let mut block_stream = CarStream::new(file).await?;
let roots = std::mem::replace(
&mut block_stream.header.roots,
&mut block_stream.header_v1.roots,
nunny::vec![Default::default()],
);

Expand Down
2 changes: 1 addition & 1 deletion src/tool/subcommands/car_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl CarCommands {
let all_roots = NonEmpty::new(
car_streams
.iter()
.flat_map(|it| it.header.roots.iter())
.flat_map(|it| it.header_v1.roots.iter())
.unique()
.cloned()
.collect_vec(),
Expand Down
2 changes: 1 addition & 1 deletion src/tool/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl SnapshotCommands {

let mut block_stream = CarStream::new(file).await?;
let roots = std::mem::replace(
&mut block_stream.header.roots,
&mut block_stream.header_v1.roots,
nunny::vec![Default::default()],
);

Expand Down
Loading

0 comments on commit ee7b9b5

Please sign in to comment.