Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chain): Improved changes RPC #2148

Merged
merged 8 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde = "1.0"
serde_derive = "1.0"
cached = "0.11.0"
lazy_static = "1.4"
owning_ref = "0.4.0"

borsh = "0.5.0"

Expand Down
143 changes: 106 additions & 37 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use borsh::{BorshDeserialize, BorshSerialize};
use cached::{Cached, SizedCache};
use chrono::Utc;
use owning_ref::OwningRef;
use serde::Serialize;

use near_primitives::block::{Approval, BlockScore};
Expand All @@ -20,8 +21,8 @@ use near_primitives::transaction::{
ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, SignedTransaction,
};
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, ShardId, StateChangeCause,
StateChanges, StateChangesRequest,
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, RawStateChangesList,
ShardId, StateChanges, StateChangesExt, StateChangesRequest,
};
use near_primitives::utils::{index_to_bytes, to_timestamp};
use near_primitives::views::LightClientBlockView;
Expand Down Expand Up @@ -917,56 +918,124 @@ impl ChainStoreAccess for ChainStore {
.map_err(|e| e.into())
}

/// Retrieve the key-value changes from the store and decode them appropriately.
///
/// We store different types of data, so we need to take care of all the types. That is, the
/// account data and the access keys are internally-serialized and we have to deserialize those
/// values appropriately. Code and data changes are simple blobs of data, so we return them as
/// base64-encoded blobs.
fn get_key_value_changes(
&self,
block_hash: &CryptoHash,
state_changes_request: &StateChangesRequest,
) -> Result<StateChanges, Error> {
use near_primitives::utils;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, the essential changes are in this method.

// We store the trie changes under a compound key: `block_hash + trie_key`, so when we
// query the changes, we reverse the process by splitting the key using simple slicing of an
// array of bytes, essentially, extracting `trie_key`.
//
// Example: data changes are stored under a key:
//
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR + user_specified_key)
//
// Thus, to query all the changes by a user-specified key prefix, we do the following:
// 1. Query RocksDB for
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR + user_specified_key_prefix)
//
// 2. In the simplest case, to extract the full key we need to slice the RocksDB key by a length of
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR)
//
// In this implementation, however, we decoupled this process into two steps:
//
// 2.1. Split off the `block_hash` (using `common_storage_key_prefix_len`), thus we are
// left working with a key that was used in the trie.
// 2.2. Parse the trie key with a relevant KeyFor* implementation to ensure consistency

let mut storage_key = block_hash.as_ref().to_vec();
storage_key.extend(match state_changes_request {
let common_storage_key_prefix_len = storage_key.len();
frol marked this conversation as resolved.
Show resolved Hide resolved

let data_key: Vec<u8> = match state_changes_request {
StateChangesRequest::AccountChanges { account_id } => {
utils::key_for_account(account_id)
}
StateChangesRequest::DataChanges { account_id, key_prefix } => {
utils::key_for_data(account_id, key_prefix)
utils::KeyForAccount::new(account_id).into()
}
StateChangesRequest::SingleAccessKeyChanges { account_id, access_key_pk } => {
utils::key_for_access_key(account_id, access_key_pk)
utils::KeyForAccessKey::new(account_id, access_key_pk).into()
}
StateChangesRequest::AllAccessKeyChanges { account_id } => {
utils::key_for_all_access_keys(account_id)
utils::KeyForAccessKey::get_prefix(account_id).into()
}
StateChangesRequest::CodeChanges { account_id } => utils::key_for_code(account_id),
StateChangesRequest::SinglePostponedReceiptChanges { account_id, data_id } => {
utils::key_for_postponed_receipt_id(account_id, data_id)
StateChangesRequest::CodeChanges { account_id } => {
utils::KeyForCode::new(account_id).into()
}
StateChangesRequest::AllPostponedReceiptChanges { account_id } => {
utils::key_for_all_postponed_receipts(account_id)
StateChangesRequest::DataChanges { account_id, key_prefix } => {
utils::KeyForData::new(account_id, key_prefix.as_ref()).into()
}
});
let common_key_prefix_len = block_hash.as_ref().len()
+ match state_changes_request {
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. }
| StateChangesRequest::SinglePostponedReceiptChanges { .. }
| StateChangesRequest::AllPostponedReceiptChanges { .. } => storage_key.len(),
StateChangesRequest::DataChanges { account_id, .. } => {
utils::key_for_data(account_id, b"").len()
}
};
let mut changes = StateChanges::new();
let changes_iter = self.store.iter_prefix_ser::<Vec<(StateChangeCause, Option<Vec<u8>>)>>(
ColKeyValueChanges,
&storage_key,
);
for change in changes_iter {
let (key, value) = change?;
changes.insert(key[common_key_prefix_len..].to_owned(), value);
}
Ok(changes)
};
storage_key.extend(&data_key);

let mut changes_per_key_prefix = self
.store
.iter_prefix_ser::<RawStateChangesList>(ColKeyValueChanges, &storage_key)
.map(|change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
let key = OwningRef::new(key).map(|key| &key[common_storage_key_prefix_len..]);
Ok((key, state_changes))
});

// It is a lifetime workaround. We cannot return `&mut changes_per_key_prefix.filter_map(...`
// as that is a temporary object created there with `filter_map`, and you cannot leak a
// reference to a temporary object.
let mut changes_per_exact_key;

let changes_per_key: &mut dyn Iterator<Item = _> = match state_changes_request {
// These request types are expected to match the key exactly:
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. } => {
changes_per_exact_key = changes_per_key_prefix.filter_map(|change| {
let (key, state_changes) = match change {
Ok(change) => change,
error => {
return Some(error);
}
};
if key.len() != data_key.len() {
None
} else {
debug_assert_eq!(key.as_ref(), data_key.as_ref() as &[u8]);
Some(Ok((key, state_changes)))
}
});
&mut changes_per_exact_key
}

StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::DataChanges { .. } => &mut changes_per_key_prefix,
};

Ok(match state_changes_request {
StateChangesRequest::AccountChanges { account_id, .. } => {
StateChanges::from_account_changes(changes_per_key, account_id)?
}
StateChangesRequest::SingleAccessKeyChanges { account_id, .. }
| StateChangesRequest::AllAccessKeyChanges { account_id, .. } => {
let access_key_pk = match state_changes_request {
StateChangesRequest::SingleAccessKeyChanges { access_key_pk, .. } => {
Some(access_key_pk)
}
_ => None,
};
StateChanges::from_access_key_changes(changes_per_key, account_id, access_key_pk)?
}
StateChangesRequest::CodeChanges { account_id, .. } => {
StateChanges::from_code_changes(changes_per_key, account_id)?
}
StateChangesRequest::DataChanges { account_id, .. } => {
StateChanges::from_data_changes(changes_per_key, account_id)?
}
})
}
}

Expand Down
24 changes: 13 additions & 11 deletions chain/client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use near_network::PeerInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ChunkHash;
use near_primitives::types::{
AccountId, BlockHeight, BlockId, MaybeBlockId, ShardId, StateChanges, StateChangesRequest,
AccountId, BlockHeight, BlockIdOrFinality, MaybeBlockId, ShardId, StateChangesRequest,
};
use near_primitives::utils::generate_random_string;
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, Finality, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse,
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};

Expand Down Expand Up @@ -142,9 +142,12 @@ impl SyncStatus {
}

/// Actor message requesting block by id or hash.
pub enum GetBlock {
BlockId(BlockId),
Finality(Finality),
pub struct GetBlock(pub BlockIdOrFinality);

impl GetBlock {
pub fn latest() -> Self {
Self(BlockIdOrFinality::latest())
}
}

impl Message for GetBlock {
Expand All @@ -166,14 +169,13 @@ impl Message for GetChunk {
#[derive(Deserialize, Clone)]
pub struct Query {
pub query_id: String,
pub block_id: MaybeBlockId,
pub block_checkpoint: BlockIdOrFinality,
pub request: QueryRequest,
pub finality: Finality,
}

impl Query {
pub fn new(block_id: MaybeBlockId, request: QueryRequest, finality: Finality) -> Self {
Query { query_id: generate_random_string(10), block_id, request, finality }
pub fn new(block_checkpoint: BlockIdOrFinality, request: QueryRequest) -> Self {
Query { query_id: generate_random_string(10), block_checkpoint, request }
}
}

Expand Down Expand Up @@ -246,5 +248,5 @@ pub struct GetKeyValueChanges {
}

impl Message for GetKeyValueChanges {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;
}
40 changes: 24 additions & 16 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ use near_primitives::block::{BlockHeader, BlockScore, GenesisId};
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::verify_path;
use near_primitives::network::AnnounceAccount;
use near_primitives::types::{AccountId, BlockHeight, BlockId, MaybeBlockId, StateChanges};
use near_primitives::types::{
AccountId, BlockHeight, BlockId, BlockIdOrFinality, Finality, MaybeBlockId,
};
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, FinalExecutionStatus,
Finality, GasPriceView, LightClientBlockView, QueryRequest, QueryResponse,
GasPriceView, LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
use near_store::Store;

Expand Down Expand Up @@ -147,12 +149,16 @@ impl ViewClientActor {
return response.map(Some);
}

let header = match msg.block_id {
Some(BlockId::Height(block_height)) => self.chain.get_header_by_height(block_height),
Some(BlockId::Hash(block_hash)) => self.chain.get_block_header(&block_hash),
None => {
let header = match msg.block_checkpoint {
BlockIdOrFinality::BlockId(BlockId::Height(block_height)) => {
self.chain.get_header_by_height(block_height)
}
BlockIdOrFinality::BlockId(BlockId::Hash(block_hash)) => {
self.chain.get_block_header(&block_hash)
}
BlockIdOrFinality::Finality(ref finality) => {
let block_hash =
self.get_block_hash_by_finality(&msg.finality).map_err(|e| e.to_string())?;
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block_header(&block_hash)
}
};
Expand Down Expand Up @@ -199,9 +205,8 @@ impl ViewClientActor {
self.network_adapter.do_send(NetworkRequests::Query {
query_id: msg.query_id.clone(),
account_id: validator,
block_id: msg.block_id.clone(),
block_checkpoint: msg.block_checkpoint.clone(),
request: msg.request.clone(),
finality: msg.finality.clone(),
});
}

Expand Down Expand Up @@ -351,16 +356,18 @@ impl Handler<GetBlock> for ViewClientActor {
type Result = Result<BlockView, String>;

fn handle(&mut self, msg: GetBlock, _: &mut Context<Self>) -> Self::Result {
match msg {
GetBlock::Finality(finality) => {
match msg.0 {
BlockIdOrFinality::Finality(finality) => {
let block_hash =
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block(&block_hash).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Height(height)) => {
BlockIdOrFinality::BlockId(BlockId::Height(height)) => {
self.chain.get_block_by_height(height).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Hash(hash)) => self.chain.get_block(&hash).map(Clone::clone),
BlockIdOrFinality::BlockId(BlockId::Hash(hash)) => {
self.chain.get_block(&hash).map(Clone::clone)
}
}
.and_then(|block| {
self.runtime_adapter
Expand Down Expand Up @@ -443,12 +450,13 @@ impl Handler<GetValidatorInfo> for ViewClientActor {

/// Returns a list of changes in a store for a given block.
impl Handler<GetKeyValueChanges> for ViewClientActor {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;

fn handle(&mut self, msg: GetKeyValueChanges, _: &mut Context<Self>) -> Self::Result {
self.chain
.store()
.get_key_value_changes(&msg.block_hash, &msg.state_changes_request)
.map(|state_changes| state_changes.into_iter().map(Into::into).collect())
.map_err(|e| e.to_string())
}
}
Expand Down Expand Up @@ -545,8 +553,8 @@ impl Handler<NetworkViewClientMessages> for ViewClientActor {
}
NetworkViewClientResponses::NoResponse
}
NetworkViewClientMessages::Query { query_id, block_id, request, finality } => {
let query = Query { query_id: query_id.clone(), block_id, request, finality };
NetworkViewClientMessages::Query { query_id, block_checkpoint, request } => {
let query = Query { query_id: query_id.clone(), block_checkpoint, request };
match self.handle_query(query) {
Ok(Some(r)) => {
NetworkViewClientResponses::QueryResponse { query_id, response: Ok(r) }
Expand Down
Loading