Skip to content

Commit

Permalink
feat(chain): Improved changes RPC (#2148)
Browse files Browse the repository at this point in the history
Resolves: #2034 and #2048

The changes RPC API was broken implementation-wise (#2048), and design-wise (#2034).

This version has meaningful API design for all the exposed data, and it is also tested better. This PR is massive since, initially, we missed quite a point of exposing deserialized internal data (like account info, access key used to be returned as a Borsh-serialized blob, which is useless for the API user as they don't have easy access to the schema of those structures).

## Test plan

I did not succeed in writing Rust tests as we used a mocked runtime there. Thus, I had extended end-to-end tests (pytest) with:

* Test for account changes on account creation
* Test for access key changes on account creation and access key removal
* Test for code changes
* Test for several transactions on the same block/chunk
  • Loading branch information
frol authored Mar 10, 2020
1 parent ce75181 commit 6d07a2d
Show file tree
Hide file tree
Showing 48 changed files with 1,792 additions and 1,074 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde = "1.0"
serde_derive = "1.0"
cached = "0.11.0"
lazy_static = "1.4"
owning_ref = "0.4.0"

borsh = "0.5.0"

Expand Down
143 changes: 106 additions & 37 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use borsh::{BorshDeserialize, BorshSerialize};
use cached::{Cached, SizedCache};
use chrono::Utc;
use owning_ref::OwningRef;
use serde::Serialize;

use near_primitives::block::{Approval, BlockScore};
Expand All @@ -20,8 +21,8 @@ use near_primitives::transaction::{
ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, SignedTransaction,
};
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, ShardId, StateChangeCause,
StateChanges, StateChangesRequest,
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, RawStateChangesList,
ShardId, StateChanges, StateChangesExt, StateChangesRequest,
};
use near_primitives::utils::{index_to_bytes, to_timestamp};
use near_primitives::views::LightClientBlockView;
Expand Down Expand Up @@ -917,56 +918,124 @@ impl ChainStoreAccess for ChainStore {
.map_err(|e| e.into())
}

/// Retrieve the key-value changes from the store and decode them appropriately.
///
/// We store different types of data, so we need to take care of all the types. That is, the
/// account data and the access keys are internally-serialized and we have to deserialize those
/// values appropriately. Code and data changes are simple blobs of data, so we return them as
/// base64-encoded blobs.
fn get_key_value_changes(
&self,
block_hash: &CryptoHash,
state_changes_request: &StateChangesRequest,
) -> Result<StateChanges, Error> {
use near_primitives::utils;

// We store the trie changes under a compound key: `block_hash + trie_key`, so when we
// query the changes, we reverse the process by splitting the key using simple slicing of an
// array of bytes, essentially, extracting `trie_key`.
//
// Example: data changes are stored under a key:
//
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR + user_specified_key)
//
// Thus, to query all the changes by a user-specified key prefix, we do the following:
// 1. Query RocksDB for
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR + user_specified_key_prefix)
//
// 2. In the simplest case, to extract the full key we need to slice the RocksDB key by a length of
// block_hash + (col::ACCOUNT + account_id + ACCOUNT_DATA_SEPARATOR)
//
// In this implementation, however, we decoupled this process into two steps:
//
// 2.1. Split off the `block_hash` (using `common_storage_key_prefix_len`), thus we are
// left working with a key that was used in the trie.
// 2.2. Parse the trie key with a relevant KeyFor* implementation to ensure consistency

let mut storage_key = block_hash.as_ref().to_vec();
storage_key.extend(match state_changes_request {
let common_storage_key_prefix_len = storage_key.len();

let data_key: Vec<u8> = match state_changes_request {
StateChangesRequest::AccountChanges { account_id } => {
utils::key_for_account(account_id)
}
StateChangesRequest::DataChanges { account_id, key_prefix } => {
utils::key_for_data(account_id, key_prefix)
utils::KeyForAccount::new(account_id).into()
}
StateChangesRequest::SingleAccessKeyChanges { account_id, access_key_pk } => {
utils::key_for_access_key(account_id, access_key_pk)
utils::KeyForAccessKey::new(account_id, access_key_pk).into()
}
StateChangesRequest::AllAccessKeyChanges { account_id } => {
utils::key_for_all_access_keys(account_id)
utils::KeyForAccessKey::get_prefix(account_id).into()
}
StateChangesRequest::CodeChanges { account_id } => utils::key_for_code(account_id),
StateChangesRequest::SinglePostponedReceiptChanges { account_id, data_id } => {
utils::key_for_postponed_receipt_id(account_id, data_id)
StateChangesRequest::CodeChanges { account_id } => {
utils::KeyForCode::new(account_id).into()
}
StateChangesRequest::AllPostponedReceiptChanges { account_id } => {
utils::key_for_all_postponed_receipts(account_id)
StateChangesRequest::DataChanges { account_id, key_prefix } => {
utils::KeyForData::new(account_id, key_prefix.as_ref()).into()
}
});
let common_key_prefix_len = block_hash.as_ref().len()
+ match state_changes_request {
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. }
| StateChangesRequest::SinglePostponedReceiptChanges { .. }
| StateChangesRequest::AllPostponedReceiptChanges { .. } => storage_key.len(),
StateChangesRequest::DataChanges { account_id, .. } => {
utils::key_for_data(account_id, b"").len()
}
};
let mut changes = StateChanges::new();
let changes_iter = self.store.iter_prefix_ser::<Vec<(StateChangeCause, Option<Vec<u8>>)>>(
ColKeyValueChanges,
&storage_key,
);
for change in changes_iter {
let (key, value) = change?;
changes.insert(key[common_key_prefix_len..].to_owned(), value);
}
Ok(changes)
};
storage_key.extend(&data_key);

let mut changes_per_key_prefix = self
.store
.iter_prefix_ser::<RawStateChangesList>(ColKeyValueChanges, &storage_key)
.map(|change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
let key = OwningRef::new(key).map(|key| &key[common_storage_key_prefix_len..]);
Ok((key, state_changes))
});

// It is a lifetime workaround. We cannot return `&mut changes_per_key_prefix.filter_map(...`
// as that is a temporary object created there with `filter_map`, and you cannot leak a
// reference to a temporary object.
let mut changes_per_exact_key;

let changes_per_key: &mut dyn Iterator<Item = _> = match state_changes_request {
// These request types are expected to match the key exactly:
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. } => {
changes_per_exact_key = changes_per_key_prefix.filter_map(|change| {
let (key, state_changes) = match change {
Ok(change) => change,
error => {
return Some(error);
}
};
if key.len() != data_key.len() {
None
} else {
debug_assert_eq!(key.as_ref(), data_key.as_ref() as &[u8]);
Some(Ok((key, state_changes)))
}
});
&mut changes_per_exact_key
}

StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::DataChanges { .. } => &mut changes_per_key_prefix,
};

Ok(match state_changes_request {
StateChangesRequest::AccountChanges { account_id, .. } => {
StateChanges::from_account_changes(changes_per_key, account_id)?
}
StateChangesRequest::SingleAccessKeyChanges { account_id, .. }
| StateChangesRequest::AllAccessKeyChanges { account_id, .. } => {
let access_key_pk = match state_changes_request {
StateChangesRequest::SingleAccessKeyChanges { access_key_pk, .. } => {
Some(access_key_pk)
}
_ => None,
};
StateChanges::from_access_key_changes(changes_per_key, account_id, access_key_pk)?
}
StateChangesRequest::CodeChanges { account_id, .. } => {
StateChanges::from_code_changes(changes_per_key, account_id)?
}
StateChangesRequest::DataChanges { account_id, .. } => {
StateChanges::from_data_changes(changes_per_key, account_id)?
}
})
}
}

Expand Down
24 changes: 13 additions & 11 deletions chain/client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use near_network::PeerInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ChunkHash;
use near_primitives::types::{
AccountId, BlockHeight, BlockId, MaybeBlockId, ShardId, StateChanges, StateChangesRequest,
AccountId, BlockHeight, BlockIdOrFinality, MaybeBlockId, ShardId, StateChangesRequest,
};
use near_primitives::utils::generate_random_string;
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, Finality, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse,
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};

Expand Down Expand Up @@ -142,9 +142,12 @@ impl SyncStatus {
}

/// Actor message requesting block by id or hash.
pub enum GetBlock {
BlockId(BlockId),
Finality(Finality),
pub struct GetBlock(pub BlockIdOrFinality);

impl GetBlock {
pub fn latest() -> Self {
Self(BlockIdOrFinality::latest())
}
}

impl Message for GetBlock {
Expand All @@ -166,14 +169,13 @@ impl Message for GetChunk {
#[derive(Deserialize, Clone)]
pub struct Query {
pub query_id: String,
pub block_id: MaybeBlockId,
pub block_id_or_finality: BlockIdOrFinality,
pub request: QueryRequest,
pub finality: Finality,
}

impl Query {
pub fn new(block_id: MaybeBlockId, request: QueryRequest, finality: Finality) -> Self {
Query { query_id: generate_random_string(10), block_id, request, finality }
pub fn new(block_id_or_finality: BlockIdOrFinality, request: QueryRequest) -> Self {
Query { query_id: generate_random_string(10), block_id_or_finality, request }
}
}

Expand Down Expand Up @@ -246,5 +248,5 @@ pub struct GetKeyValueChanges {
}

impl Message for GetKeyValueChanges {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;
}
40 changes: 24 additions & 16 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ use near_primitives::block::{BlockHeader, BlockScore, GenesisId};
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::verify_path;
use near_primitives::network::AnnounceAccount;
use near_primitives::types::{AccountId, BlockHeight, BlockId, MaybeBlockId, StateChanges};
use near_primitives::types::{
AccountId, BlockHeight, BlockId, BlockIdOrFinality, Finality, MaybeBlockId,
};
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, FinalExecutionStatus,
Finality, GasPriceView, LightClientBlockView, QueryRequest, QueryResponse,
GasPriceView, LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
use near_store::Store;

Expand Down Expand Up @@ -147,12 +149,16 @@ impl ViewClientActor {
return response.map(Some);
}

let header = match msg.block_id {
Some(BlockId::Height(block_height)) => self.chain.get_header_by_height(block_height),
Some(BlockId::Hash(block_hash)) => self.chain.get_block_header(&block_hash),
None => {
let header = match msg.block_id_or_finality {
BlockIdOrFinality::BlockId(BlockId::Height(block_height)) => {
self.chain.get_header_by_height(block_height)
}
BlockIdOrFinality::BlockId(BlockId::Hash(block_hash)) => {
self.chain.get_block_header(&block_hash)
}
BlockIdOrFinality::Finality(ref finality) => {
let block_hash =
self.get_block_hash_by_finality(&msg.finality).map_err(|e| e.to_string())?;
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block_header(&block_hash)
}
};
Expand Down Expand Up @@ -199,9 +205,8 @@ impl ViewClientActor {
self.network_adapter.do_send(NetworkRequests::Query {
query_id: msg.query_id.clone(),
account_id: validator,
block_id: msg.block_id.clone(),
block_id_or_finality: msg.block_id_or_finality.clone(),
request: msg.request.clone(),
finality: msg.finality.clone(),
});
}

Expand Down Expand Up @@ -351,16 +356,18 @@ impl Handler<GetBlock> for ViewClientActor {
type Result = Result<BlockView, String>;

fn handle(&mut self, msg: GetBlock, _: &mut Context<Self>) -> Self::Result {
match msg {
GetBlock::Finality(finality) => {
match msg.0 {
BlockIdOrFinality::Finality(finality) => {
let block_hash =
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block(&block_hash).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Height(height)) => {
BlockIdOrFinality::BlockId(BlockId::Height(height)) => {
self.chain.get_block_by_height(height).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Hash(hash)) => self.chain.get_block(&hash).map(Clone::clone),
BlockIdOrFinality::BlockId(BlockId::Hash(hash)) => {
self.chain.get_block(&hash).map(Clone::clone)
}
}
.and_then(|block| {
self.runtime_adapter
Expand Down Expand Up @@ -443,12 +450,13 @@ impl Handler<GetValidatorInfo> for ViewClientActor {

/// Returns a list of changes in a store for a given block.
impl Handler<GetKeyValueChanges> for ViewClientActor {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;

fn handle(&mut self, msg: GetKeyValueChanges, _: &mut Context<Self>) -> Self::Result {
self.chain
.store()
.get_key_value_changes(&msg.block_hash, &msg.state_changes_request)
.map(|state_changes| state_changes.into_iter().map(Into::into).collect())
.map_err(|e| e.to_string())
}
}
Expand Down Expand Up @@ -545,8 +553,8 @@ impl Handler<NetworkViewClientMessages> for ViewClientActor {
}
NetworkViewClientResponses::NoResponse
}
NetworkViewClientMessages::Query { query_id, block_id, request, finality } => {
let query = Query { query_id: query_id.clone(), block_id, request, finality };
NetworkViewClientMessages::Query { query_id, block_id_or_finality, request } => {
let query = Query { query_id: query_id.clone(), block_id_or_finality, request };
match self.handle_query(query) {
Ok(Some(r)) => {
NetworkViewClientResponses::QueryResponse { query_id, response: Ok(r) }
Expand Down
Loading

0 comments on commit 6d07a2d

Please sign in to comment.