Skip to content

Commit

Permalink
feat(chain): Improved changes RPC
Browse files Browse the repository at this point in the history
The `changes` RPC API was broken implementation-wise (#2048), and
design-wise (#2034).

This version has meaningful API design for all the exposed data, and it
is also tested better.

 # Test plan

I did not succeed in writing Rust tests as we used a mocked runtime
there. Thus, I had extended end-to-end tests (pytest) with:

* Test for account changes on account creation
* Test for several transactions on the same block/chunk
  • Loading branch information
frol committed Mar 6, 2020
1 parent f127f55 commit f082d36
Show file tree
Hide file tree
Showing 47 changed files with 1,483 additions and 1,053 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 159 additions & 34 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use near_primitives::transaction::{
ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, SignedTransaction,
};
use near_primitives::types::{
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, ShardId, StateChangeCause,
StateChanges, StateChangesRequest,
AccountId, BlockExtra, BlockHeight, ChunkExtra, EpochId, NumBlocks, RawStateChangesList,
ShardId, StateChangeValue, StateChangeWithCause, StateChanges, StateChangesRequest,
};
use near_primitives::utils::{index_to_bytes, to_timestamp};
use near_primitives::utils::{index_to_bytes, to_timestamp, KeyForAccessKey, KeyForData};
use near_primitives::views::LightClientBlockView;
use near_store::{
read_with_cache, ColBlock, ColBlockExtra, ColBlockHeader, ColBlockHeight, ColBlockMisc,
Expand Down Expand Up @@ -923,50 +923,175 @@ impl ChainStoreAccess for ChainStore {
state_changes_request: &StateChangesRequest,
) -> Result<StateChanges, Error> {
use near_primitives::utils;

let mut storage_key = block_hash.as_ref().to_vec();
storage_key.extend(match state_changes_request {
let common_storage_key_prefix_len = storage_key.len();
let data_key: Vec<u8> = match state_changes_request {
StateChangesRequest::AccountChanges { account_id } => {
utils::key_for_account(account_id)
utils::KeyForAccount::new(account_id).into()
}
StateChangesRequest::DataChanges { account_id, key_prefix } => {
utils::key_for_data(account_id, key_prefix)
utils::KeyForData::new(account_id, key_prefix.as_ref()).into()
}
StateChangesRequest::SingleAccessKeyChanges { account_id, access_key_pk } => {
utils::key_for_access_key(account_id, access_key_pk)
utils::KeyForAccessKey::new(account_id, access_key_pk).into()
}
StateChangesRequest::AllAccessKeyChanges { account_id } => {
utils::key_for_all_access_keys(account_id)
utils::KeyForAccessKey::get_prefix(account_id).into()
}
StateChangesRequest::CodeChanges { account_id } => utils::key_for_code(account_id),
StateChangesRequest::SinglePostponedReceiptChanges { account_id, data_id } => {
utils::key_for_postponed_receipt_id(account_id, data_id)
StateChangesRequest::CodeChanges { account_id } => {
utils::KeyForCode::new(account_id).into()
}
StateChangesRequest::AllPostponedReceiptChanges { account_id } => {
utils::key_for_all_postponed_receipts(account_id)
};
storage_key.extend(&data_key);

let mut changes_iter =
self.store.iter_prefix_ser::<RawStateChangesList>(ColKeyValueChanges, &storage_key);
let mut exact_key_changes_iter;
let changes_iter: &mut dyn Iterator<Item = _> = match state_changes_request {
// These request types should have the key matching exactly
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. } => {
exact_key_changes_iter = changes_iter.filter_map(|change| {
let (key, state_changes) = match change {
Ok(change) => change,
error => {
return Some(error);
}
};
if key.len() != storage_key.len() {
None
} else {
debug_assert_eq!(key, storage_key.as_ref() as &[u8]);
Some(Ok((key, state_changes)))
}
});
&mut exact_key_changes_iter
}
});
let common_key_prefix_len = block_hash.as_ref().len()
+ match state_changes_request {
StateChangesRequest::AccountChanges { .. }
| StateChangesRequest::SingleAccessKeyChanges { .. }
| StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::CodeChanges { .. }
| StateChangesRequest::SinglePostponedReceiptChanges { .. }
| StateChangesRequest::AllPostponedReceiptChanges { .. } => storage_key.len(),
StateChangesRequest::DataChanges { account_id, .. } => {
utils::key_for_data(account_id, b"").len()

StateChangesRequest::AllAccessKeyChanges { .. }
| StateChangesRequest::DataChanges { .. } => &mut changes_iter,
};

match state_changes_request {
StateChangesRequest::AccountChanges { account_id, .. } => {
let mut changes = StateChanges::new();

for change in changes_iter {
let (_, state_changes) = change?;
changes.extend(state_changes.into_iter().map(|(cause, state_change)| {
StateChangeWithCause {
cause,
value: if let Some(state_change) = state_change {
StateChangeValue::AccountUpdate {
account_id: account_id.clone(),
account: <_>::try_from_slice(&state_change).expect(
"Failed to parse internally stored account information",
),
}
} else {
StateChangeValue::AccountDeletion { account_id: account_id.clone() }
},
}
}));
}
};
let mut changes = StateChanges::new();
let changes_iter = self.store.iter_prefix_ser::<Vec<(StateChangeCause, Option<Vec<u8>>)>>(
ColKeyValueChanges,
&storage_key,
);
for change in changes_iter {
let (key, value) = change?;
changes.insert(key[common_key_prefix_len..].to_owned(), value);

Ok(changes)
}

StateChangesRequest::SingleAccessKeyChanges { account_id, .. }
| StateChangesRequest::AllAccessKeyChanges { account_id, .. } => {
let mut changes = StateChanges::new();

for change in changes_iter {
let (key, state_changes) = change?;

let access_key_pk = match state_changes_request {
StateChangesRequest::SingleAccessKeyChanges { access_key_pk, .. } => {
access_key_pk.clone()
}
_ => KeyForAccessKey::parse_public_key(
&key[common_storage_key_prefix_len..],
&account_id,
)
.expect("Failed to parse internally stored public key"),
};

changes.extend(state_changes.into_iter().map(|(cause, state_change)| {
StateChangeWithCause {
cause,
value: if let Some(state_change) = state_change {
StateChangeValue::AccessKeyUpdate {
public_key: access_key_pk.clone(),
access_key: <_>::try_from_slice(&state_change)
.expect("Failed to parse internally stored access key"),
}
} else {
StateChangeValue::AccessKeyDeletion {
public_key: access_key_pk.clone(),
}
},
}
}));
}

Ok(changes)
}

StateChangesRequest::CodeChanges { account_id, .. } => {
let mut changes = StateChanges::new();

for change in changes_iter {
let (_, state_changes) = change?;
changes.extend(state_changes.into_iter().map(|(cause, state_change)| {
StateChangeWithCause {
cause,
value: if let Some(state_change) = state_change {
StateChangeValue::CodeUpdate {
account_id: account_id.clone(),
code: state_change.into(),
}
} else {
StateChangeValue::CodeDeletion { account_id: account_id.clone() }
},
}
}));
}

Ok(changes)
}

StateChangesRequest::DataChanges { account_id, .. } => {
let mut changes = StateChanges::new();

for change in changes_iter {
let (key, state_changes) = change?;

let key = KeyForData::parse_data_key(
&key[common_storage_key_prefix_len..],
&account_id,
)
.expect("Failed to parse internally stored data key");

changes.extend(state_changes.into_iter().map(|(cause, state_change)| {
StateChangeWithCause {
cause,
value: if let Some(state_change) = state_change {
StateChangeValue::DataUpdate {
key: key.to_vec().into(),
value: state_change.into(),
}
} else {
StateChangeValue::DataDeletion { key: key.to_vec().into() }
},
}
}));
}

Ok(changes)
}
}
Ok(changes)
}
}

Expand Down
24 changes: 13 additions & 11 deletions chain/client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use near_network::PeerInfo;
use near_primitives::hash::CryptoHash;
use near_primitives::sharding::ChunkHash;
use near_primitives::types::{
AccountId, BlockHeight, BlockId, MaybeBlockId, ShardId, StateChanges, StateChangesRequest,
AccountId, BlockCheckpoint, BlockHeight, MaybeBlockId, ShardId, StateChangesRequest,
};
use near_primitives::utils::generate_random_string;
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, Finality, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse,
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};

Expand Down Expand Up @@ -142,9 +142,12 @@ impl SyncStatus {
}

/// Actor message requesting block by id or hash.
pub enum GetBlock {
BlockId(BlockId),
Finality(Finality),
pub struct GetBlock(pub BlockCheckpoint);

impl GetBlock {
pub fn latest() -> Self {
Self(BlockCheckpoint::latest())
}
}

impl Message for GetBlock {
Expand All @@ -166,14 +169,13 @@ impl Message for GetChunk {
#[derive(Deserialize, Clone)]
pub struct Query {
pub query_id: String,
pub block_id: MaybeBlockId,
pub block_checkpoint: BlockCheckpoint,
pub request: QueryRequest,
pub finality: Finality,
}

impl Query {
pub fn new(block_id: MaybeBlockId, request: QueryRequest, finality: Finality) -> Self {
Query { query_id: generate_random_string(10), block_id, request, finality }
pub fn new(block_checkpoint: BlockCheckpoint, request: QueryRequest) -> Self {
Query { query_id: generate_random_string(10), block_checkpoint, request }
}
}

Expand Down Expand Up @@ -246,5 +248,5 @@ pub struct GetKeyValueChanges {
}

impl Message for GetKeyValueChanges {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;
}
40 changes: 24 additions & 16 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ use near_primitives::block::{BlockHeader, BlockScore, GenesisId};
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::verify_path;
use near_primitives::network::AnnounceAccount;
use near_primitives::types::{AccountId, BlockHeight, BlockId, MaybeBlockId, StateChanges};
use near_primitives::types::{
AccountId, BlockCheckpoint, BlockHeight, BlockId, Finality, MaybeBlockId,
};
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, FinalExecutionOutcomeView, FinalExecutionStatus,
Finality, GasPriceView, LightClientBlockView, QueryRequest, QueryResponse,
GasPriceView, LightClientBlockView, QueryRequest, QueryResponse, StateChangesView,
};
use near_store::Store;

Expand Down Expand Up @@ -147,12 +149,16 @@ impl ViewClientActor {
return response.map(Some);
}

let header = match msg.block_id {
Some(BlockId::Height(block_height)) => self.chain.get_header_by_height(block_height),
Some(BlockId::Hash(block_hash)) => self.chain.get_block_header(&block_hash),
None => {
let header = match msg.block_checkpoint {
BlockCheckpoint::BlockId(BlockId::Height(block_height)) => {
self.chain.get_header_by_height(block_height)
}
BlockCheckpoint::BlockId(BlockId::Hash(block_hash)) => {
self.chain.get_block_header(&block_hash)
}
BlockCheckpoint::Finality(ref finality) => {
let block_hash =
self.get_block_hash_by_finality(&msg.finality).map_err(|e| e.to_string())?;
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block_header(&block_hash)
}
};
Expand Down Expand Up @@ -199,9 +205,8 @@ impl ViewClientActor {
self.network_adapter.do_send(NetworkRequests::Query {
query_id: msg.query_id.clone(),
account_id: validator,
block_id: msg.block_id.clone(),
block_checkpoint: msg.block_checkpoint.clone(),
request: msg.request.clone(),
finality: msg.finality.clone(),
});
}

Expand Down Expand Up @@ -351,16 +356,18 @@ impl Handler<GetBlock> for ViewClientActor {
type Result = Result<BlockView, String>;

fn handle(&mut self, msg: GetBlock, _: &mut Context<Self>) -> Self::Result {
match msg {
GetBlock::Finality(finality) => {
match msg.0 {
BlockCheckpoint::Finality(finality) => {
let block_hash =
self.get_block_hash_by_finality(&finality).map_err(|e| e.to_string())?;
self.chain.get_block(&block_hash).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Height(height)) => {
BlockCheckpoint::BlockId(BlockId::Height(height)) => {
self.chain.get_block_by_height(height).map(Clone::clone)
}
GetBlock::BlockId(BlockId::Hash(hash)) => self.chain.get_block(&hash).map(Clone::clone),
BlockCheckpoint::BlockId(BlockId::Hash(hash)) => {
self.chain.get_block(&hash).map(Clone::clone)
}
}
.and_then(|block| {
self.runtime_adapter
Expand Down Expand Up @@ -443,12 +450,13 @@ impl Handler<GetValidatorInfo> for ViewClientActor {

/// Returns a list of changes in a store for a given block.
impl Handler<GetKeyValueChanges> for ViewClientActor {
type Result = Result<StateChanges, String>;
type Result = Result<StateChangesView, String>;

fn handle(&mut self, msg: GetKeyValueChanges, _: &mut Context<Self>) -> Self::Result {
self.chain
.store()
.get_key_value_changes(&msg.block_hash, &msg.state_changes_request)
.map(|state_changes| state_changes.into_iter().map(Into::into).collect())
.map_err(|e| e.to_string())
}
}
Expand Down Expand Up @@ -545,8 +553,8 @@ impl Handler<NetworkViewClientMessages> for ViewClientActor {
}
NetworkViewClientResponses::NoResponse
}
NetworkViewClientMessages::Query { query_id, block_id, request, finality } => {
let query = Query { query_id: query_id.clone(), block_id, request, finality };
NetworkViewClientMessages::Query { query_id, block_checkpoint, request } => {
let query = Query { query_id: query_id.clone(), block_checkpoint, request };
match self.handle_query(query) {
Ok(Some(r)) => {
NetworkViewClientResponses::QueryResponse { query_id, response: Ok(r) }
Expand Down
Loading

0 comments on commit f082d36

Please sign in to comment.