Skip to content

Commit

Permalink
GC trie
Browse files Browse the repository at this point in the history
  • Loading branch information
Kouprin committed Feb 5, 2020
1 parent 772bb73 commit dbbb295
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 60 deletions.
10 changes: 5 additions & 5 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use near_primitives::transaction::{
};
use near_primitives::types::{
AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, ChunkExtra, EpochId, Gas,
NumBlocks, ShardId, ValidatorStake,
NumBlocks, ShardId, StateHeaderKey, ValidatorStake,
};
use near_primitives::unwrap_or_return;
use near_primitives::views::{
ExecutionOutcomeWithIdView, ExecutionStatusView, FinalExecutionOutcomeView,
FinalExecutionStatus, LightClientBlockView,
};
use near_store::{ColStateHeaders, ColStateParts, Store};
use near_store::{ColStateHeaders, ColStateParts, Store, Trie};

use crate::error::{Error, ErrorKind};
use crate::finality::{ApprovalVerificationError, FinalityGadget, FinalityGadgetQuorums};
Expand All @@ -39,7 +39,7 @@ use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, ShardInfo, St
use crate::types::{
AcceptedBlock, ApplyTransactionResult, Block, BlockHeader, BlockStatus, Provenance,
ReceiptList, ReceiptProofResponse, ReceiptResponse, RootProof, RuntimeAdapter,
ShardStateSyncResponseHeader, StateHeaderKey, StatePartKey, Tip,
ShardStateSyncResponseHeader, StatePartKey, Tip,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_transactions,
Expand Down Expand Up @@ -505,14 +505,14 @@ impl Chain {
});
}

pub fn clear_old_data(&mut self) -> Result<(), Error> {
pub fn clear_old_data(&mut self, trie: Arc<Trie>) -> Result<(), Error> {
let mut chain_store_update = self.store.store_update();
let head = chain_store_update.head()?;
let height_diff = NUM_EPOCHS_TO_KEEP_STORE_DATA * self.epoch_length;
if head.height >= height_diff {
let last_height = head.height - height_diff;
for height in last_height.saturating_sub(HEIGHTS_TO_CLEAR)..last_height {
match chain_store_update.clear_old_data_on_height(height) {
match chain_store_update.clear_old_data_on_height(trie.clone(), height) {
Ok(_) => {}
Err(err) => {
error!(target: "client", "Error clearing old data on height {:?}, {:?}", height, err);
Expand Down
103 changes: 73 additions & 30 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use near_primitives::transaction::{
};
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, ShardId, StateChangeCause,
StateChanges, StateChangesRequest,
StateChanges, StateChangesRequest, StateHeaderKey,
};
use near_primitives::utils::{index_to_bytes, to_timestamp};
use near_store::{
Expand All @@ -28,8 +28,9 @@ use near_store::{
ColChunkPerHeightShard, ColChunks, ColEpochLightClientBlocks, ColIncomingReceipts,
ColInvalidChunks, ColKeyValueChanges, ColLastApprovalPerAccount, ColLastBlockWithNewChunk,
ColMyLastApprovalsPerChain, ColNextBlockHashes, ColNextBlockWithNewChunk, ColOutgoingReceipts,
ColPartialChunks, ColReceiptIdToShardId, ColStateDlInfos, ColTransactionResult,
ColTransactions, Store, StoreUpdate, WrappedTrieChanges,
ColPartialChunks, ColReceiptIdToShardId, ColStateDlInfos, ColStateHeaders,
ColTransactionResult, ColTransactions, ColTrieChanges, Store, StoreUpdate, Trie, TrieChanges,
WrappedTrieChanges,
};

use crate::byzantine_assert;
Expand Down Expand Up @@ -1734,14 +1735,47 @@ impl<'a> ChainStoreUpdate<'a> {
.insert((height, shard_id), chunk_hash);
}

pub fn clear_old_data_on_height(&mut self, height: BlockHeight) -> Result<(), Error> {
pub fn clear_old_data_on_height(
&mut self,
trie: Arc<Trie>,
height: BlockHeight,
) -> Result<(), Error> {
let mut store_update = self.store().store_update();
let blocks_current_height = match self.get_all_block_hashes_by_height(height) {
Ok(blocks_current_height) => {
blocks_current_height.values().flatten().cloned().collect()
}
_ => vec![],
};

// 1. Trie cleaning
if let Ok(canonical_hash) = self.get_block_hash_by_height(height) {
for block_hash in blocks_current_height.iter() {
if *block_hash == canonical_hash {
// Block on canonical chain must be processed after all
continue;
}
// 1a. Apply revert insertions from ColTrieChanges, cleaning forks
self.store()
.get_ser(ColTrieChanges, block_hash.as_ref())?
.map(|trie_changes: TrieChanges| {
trie_changes
.revert_insertions_into(trie.clone(), &mut store_update)
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or_else(|| Ok(()))?;
}
// 1b. Apply deletions from ColTrieChanges, cleaning main chain
self.store()
.get_ser(ColTrieChanges, canonical_hash.as_ref())?
.map(|trie_changes: TrieChanges| {
trie_changes
.deletions_into(trie.clone(), &mut store_update)
.map_err(|err| ErrorKind::Other(err.to_string()))
})
.unwrap_or_else(|| Ok(()))?;
}

for block_hash in blocks_current_height {
let block = match self.get_block(&block_hash) {
Ok(block) => block.clone(),
Expand All @@ -1751,93 +1785,99 @@ impl<'a> ChainStoreUpdate<'a> {
}
};

// 1. Delete shard_id-indexed data (shards, receipts, transactions)
// 2. Delete shard_id-indexed data (shards, receipts, transactions)
for shard_id in 0..block.header.inner_rest.chunk_mask.len() {
let shard_id = shard_id as ShardId;
// 1a. Delete outgoing receipts (ColOutgoingReceipts)
// 2a. Delete outgoing receipts (ColOutgoingReceipts)
store_update
.delete(ColOutgoingReceipts, &get_block_shard_id(&block_hash, shard_id));
self.chain_store
.outgoing_receipts
.cache_remove(&get_block_shard_id(&block_hash, shard_id));
// 1b. Delete incoming receipts (ColIncomingReceipts)
// 2b. Delete incoming receipts (ColIncomingReceipts)
store_update
.delete(ColIncomingReceipts, &get_block_shard_id(&block_hash, shard_id));
self.chain_store
.incoming_receipts
.cache_remove(&get_block_shard_id(&block_hash, shard_id));
// 1c. Delete from chunk_hash_per_height_shard (ColChunkPerHeightShard)
// 2c. Delete from chunk_hash_per_height_shard (ColChunkPerHeightShard)
store_update.delete(ColChunkPerHeightShard, &get_height_shard_id(height, shard_id));
self.chain_store
.chunk_hash_per_height_shard
.cache_remove(&get_height_shard_id(height, shard_id));
// 1d. Delete from next_block_with_new_chunk (ColNextBlockWithNewChunk)
// 2d. Delete from next_block_with_new_chunk (ColNextBlockWithNewChunk)
store_update
.delete(ColNextBlockWithNewChunk, &get_block_shard_id(&block_hash, shard_id));
self.chain_store
.next_block_with_new_chunk
.cache_remove(&get_block_shard_id(&block_hash, shard_id));
// 2e. Delete from ColStateHeaders
let key = StateHeaderKey(shard_id, block_hash).try_to_vec()?;
store_update.delete(ColStateHeaders, &key);
// 2f. Delete from ColStateParts
// Already done, check chain.clear_downloaded_parts()
}
for chunk_header in block.chunks {
let (receipts, transactions) = match self.get_chunk_clone_from_header(&chunk_header)
{
Ok(chunk) => (chunk.receipts, chunk.transactions),
_ => (vec![], vec![]),
};
// 1e. Delete from receipt_id_to_shard_id (ColReceiptIdToShardId)
// 2e. Delete from receipt_id_to_shard_id (ColReceiptIdToShardId)
for receipt in receipts {
store_update.delete(ColReceiptIdToShardId, receipt.receipt_id.as_ref());
self.chain_store
.receipt_id_to_shard_id
.cache_remove(&receipt.receipt_id.into());
}
// 1f. Delete from ColTransactions
// 2f. Delete from ColTransactions
for transaction in transactions {
store_update.delete(ColTransactions, transaction.get_hash().as_ref());
self.chain_store.transactions.cache_remove(&transaction.get_hash().into());
}

// 2. Delete chunk_hash-indexed data
// 2a. Delete chunks (ColChunks)
// 3. Delete chunk_hash-indexed data
// 3a. Delete chunks (ColChunks)
store_update.delete(ColChunks, chunk_header.hash.as_ref());
self.chain_store.chunks.cache_remove(&chunk_header.hash.clone().into());
// 2b. Delete chunk extras (ColChunkExtra)
// 3b. Delete chunk extras (ColChunkExtra)
store_update.delete(ColChunkExtra, chunk_header.hash.as_ref());
self.chain_store.chunk_extras.cache_remove(&chunk_header.hash.clone().into());
// 2c. Delete partial_chunks (ColPartialChunks)
// 3c. Delete partial_chunks (ColPartialChunks)
store_update.delete(ColPartialChunks, chunk_header.hash.as_ref());
self.chain_store.partial_chunks.cache_remove(&chunk_header.hash.clone().into());
// 2d. Delete invalid chunks (ColInvalidChunks)
// 3d. Delete invalid chunks (ColInvalidChunks)
store_update.delete(ColInvalidChunks, chunk_header.hash.as_ref());
self.chain_store.invalid_chunks.cache_remove(&chunk_header.hash.clone().into());
}

// 3. Delete block_hash-indexed data
// 3a. Delete block (ColBlock) if not genesis
// 4. Delete block_hash-indexed data
// 4a. Delete block (ColBlock) if not genesis
if height > 0 {
store_update.delete(ColBlock, block_hash.as_ref());
self.chain_store.blocks.cache_remove(&block_hash.clone().into());
}
// 3b. Delete block header (ColBlockHeader) - don't do because header sync needs headers
// 3c. Delete block extras (ColBlockExtra)
// 4b. Delete block header (ColBlockHeader) - don't do because header sync needs headers
// 4c. Delete block extras (ColBlockExtra)
store_update.delete(ColBlockExtra, block_hash.as_ref());
self.chain_store.block_extras.cache_remove(&block_hash.clone().into());
// 3d. Delete from next_block_hashes (ColNextBlockHashes)
// 4d. Delete from next_block_hashes (ColNextBlockHashes)
store_update.delete(ColNextBlockHashes, block_hash.as_ref());
self.chain_store.next_block_hashes.cache_remove(&block_hash.clone().into());
// 3e. Delete from my_last_approvals (ColMyLastApprovalsPerChain)
// 4e. Delete from my_last_approvals (ColMyLastApprovalsPerChain)
store_update.delete(ColMyLastApprovalsPerChain, block_hash.as_ref());
self.chain_store.my_last_approvals.cache_remove(&block_hash.clone().into());
// 3f. Delete from ColChallengedBlocks
// 4f. Delete from ColChallengedBlocks
store_update.delete(ColChallengedBlocks, block_hash.as_ref());
// 3g. Delete from ColBlocksToCatchup
// 4g. Delete from ColBlocksToCatchup
store_update.delete(ColBlocksToCatchup, block_hash.as_ref());
}
// 4. Delete height-indexed data
// 4a. Delete blocks with current height (ColBlockPerHeight)

// 5. Delete height-indexed data
// 5a. Delete blocks with current height (ColBlockPerHeight)
store_update.delete(ColBlockPerHeight, &index_to_bytes(height));
self.chain_store.block_hash_per_height.cache_remove(&index_to_bytes(height));
// 4b. Delete from ColBlockHeight if not genesis
// 5b. Delete from ColBlockHeight if not genesis
if height > 0 {
store_update.delete(ColBlockHeight, &index_to_bytes(height));
}
Expand Down Expand Up @@ -2023,7 +2063,8 @@ impl<'a> ChainStoreUpdate<'a> {
trie_changes
.key_value_changes_into(&mut store_update)
.map_err(|err| ErrorKind::Other(err.to_string()))?;
// TODO: save deletions separately for garbage collection.
let (block_hash, trie_changes) = trie_changes.get_trie_changes();
store_update.set_ser(ColTrieChanges, block_hash.as_ref(), &trie_changes)?;
}

let mut affected_catchup_blocks = HashSet::new();
Expand Down Expand Up @@ -2471,8 +2512,9 @@ mod tests {
);
assert!(chain.mut_store().get_next_block_hash(&blocks[5].hash()).is_ok());

let trie = chain.runtime_adapter.get_trie();
let mut store_update = chain.mut_store().store_update();
assert!(store_update.clear_old_data_on_height(5).is_ok());
assert!(store_update.clear_old_data_on_height(trie, 5).is_ok());
store_update.commit().unwrap();

assert!(chain.get_block(&blocks[4].hash()).is_ok());
Expand Down Expand Up @@ -2512,7 +2554,8 @@ mod tests {
store_update.commit().unwrap();

chain.epoch_length = 1;
assert!(chain.clear_old_data().is_ok());
let trie = chain.runtime_adapter.get_trie();
assert!(chain.clear_old_data(trie).is_ok());

assert!(chain.get_block(&blocks[0].hash()).is_ok());
for i in 1..15 {
Expand Down
4 changes: 4 additions & 0 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ impl RuntimeAdapter for KeyValueRuntime {
)
}

/// Returns a shared handle to this runtime's trie.
/// Cloning the `Arc` only bumps the refcount — no trie data is copied.
fn get_trie(&self) -> Arc<Trie> {
    self.trie.clone()
}

fn verify_block_signature(&self, header: &BlockHeader) -> Result<(), Error> {
let validators = &self.validators
[self.get_epoch_and_valset(header.prev_hash).map_err(|err| err.to_string())?.1];
Expand Down
9 changes: 5 additions & 4 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use borsh::{BorshDeserialize, BorshSerialize};
use serde::Serialize;
Expand All @@ -20,7 +21,7 @@ use near_primitives::types::{
StateChangesRequest, StateRoot, StateRootNode, ValidatorStake, ValidatorStats,
};
use near_primitives::views::{EpochValidatorInfo, QueryRequest, QueryResponse};
use near_store::{PartialStorage, StoreUpdate, WrappedTrieChanges};
use near_store::{PartialStorage, StoreUpdate, Trie, WrappedTrieChanges};

use crate::error::Error;

Expand All @@ -33,9 +34,6 @@ pub struct ReceiptProofResponse(pub CryptoHash, pub Vec<ReceiptProof>);
/// A state/merkle root paired with the merkle path proving it.
#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize, Serialize)]
pub struct RootProof(pub CryptoHash, pub MerklePath);

#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize, Serialize)]
pub struct StateHeaderKey(pub ShardId, pub CryptoHash);

/// Storage key for a downloaded state part: (hash, shard id, part id).
/// NOTE(review): the `CryptoHash` is presumably a block/sync hash — confirm against callers.
#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize, Serialize)]
pub struct StatePartKey(pub CryptoHash, pub ShardId, pub u64 /* PartId */);

Expand Down Expand Up @@ -116,6 +114,9 @@ pub trait RuntimeAdapter: Send + Sync {
/// StoreUpdate can be discarded if the chain past the genesis.
fn genesis_state(&self) -> (StoreUpdate, Vec<StateRoot>);

/// Returns trie.
fn get_trie(&self) -> Arc<Trie>;

/// Verify block producer validity
fn verify_block_signature(&self, header: &BlockHeader) -> Result<(), Error>;

Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ impl Client {

if status.is_new_head() {
self.shards_mgr.update_largest_seen_height(block.header.inner_lite.height);
if let Err(err) = self.chain.clear_old_data() {
if let Err(err) = self.chain.clear_old_data(self.runtime_adapter.get_trie()) {
error!(target: "client", "Can't clear old data, {:?}", err);
};
}
Expand Down
3 changes: 3 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,6 @@ pub struct BlockChunkValidatorStats {
pub block_stats: ValidatorStats,
pub chunk_stats: ValidatorStats,
}

/// Storage key for a state-sync header in `ColStateHeaders`:
/// (shard id, block hash) — see `clear_old_data_on_height`, which builds it
/// via `StateHeaderKey(shard_id, block_hash).try_to_vec()`.
#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize, Serialize)]
pub struct StateHeaderKey(pub ShardId, pub CryptoHash);
4 changes: 3 additions & 1 deletion core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub enum DBCol {
ColChunkPerHeightShard = 35,
/// Changes to key-values that we have recorded.
ColKeyValueChanges = 36,
ColTrieChanges = 37,
}

impl std::fmt::Display for DBCol {
Expand Down Expand Up @@ -126,12 +127,13 @@ impl std::fmt::Display for DBCol {
Self::ColTransactions => "transactions",
Self::ColChunkPerHeightShard => "hash of chunk per height and shard_id",
Self::ColKeyValueChanges => "key value changes",
Self::ColTrieChanges => "trie changes",
};
write!(formatter, "{}", desc)
}
}

const NUM_COLS: usize = 37;
const NUM_COLS: usize = 38;

pub struct DBTransaction {
pub ops: Vec<DBOp>,
Expand Down
Loading

0 comments on commit dbbb295

Please sign in to comment.