Skip to content

Commit

Permalink
Write pubkey cache to disk after releasing lock
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Nov 18, 2022
1 parent 0435365 commit b9fa166
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 31 deletions.
17 changes: 12 additions & 5 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2638,10 +2638,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// If there are new validators in this block, update our pubkey cache.
//
// We perform this _before_ adding the block to fork choice because the pubkey cache is
// used by attestation processing which will only process an attestation if the block is
// known to fork choice. This ordering ensures that the pubkey cache is always up-to-date.
self.validator_pubkey_cache
// The only keys imported here will be ones for validators deposited in this block, because
// the cache *must* already have been updated for the parent block when it was imported.
// Newly deposited validators are not active and their keys are not required by other parts
// of block processing. The reason we do this here and not after making the block attestable
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
.import_new_pubkeys(&state)?;
Expand Down Expand Up @@ -2773,7 +2778,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutState(block.state_root(), &state));
let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically(ops) {
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);

if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
error!(
self.log,
"Database write failed!";
Expand Down
48 changes: 22 additions & 26 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use crate::{BeaconChainTypes, BeaconStore};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use store::{DBColumn, Error as StoreError, StoreItem};
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};

/// Provides a mapping of `validator_index -> validator_publickey`.
Expand All @@ -14,21 +15,17 @@ use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// 2. To reduce the amount of public key _decompression_ required. A `BeaconState` stores public
/// keys in compressed form and they are needed in decompressed form for signature verification.
/// Decompression is expensive when many keys are involved.
///
/// The cache has a `backing` that it uses to maintain a persistent, on-disk
/// copy of itself. This allows it to be restored between process invocations.
pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
pubkeys: Vec<PublicKey>,
indices: HashMap<PublicKeyBytes, usize>,
pubkey_bytes: Vec<PublicKeyBytes>,
store: BeaconStore<T>,
_phantom: PhantomData<T>,
}

impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
/// Create a new public key cache using the keys in `state.validators`.
///
/// Also creates a new persistence file, returning an error if there is already a file at
/// `persistence_path`.
/// The new cache will be updated with the keys from `state` and immediately written to disk.
pub fn new(
state: &BeaconState<T::EthSpec>,
store: BeaconStore<T>,
Expand All @@ -37,10 +34,11 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pubkeys: vec![],
indices: HashMap::new(),
pubkey_bytes: vec![],
store,
_phantom: PhantomData,
};

cache.import_new_pubkeys(state)?;
let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?;

Ok(cache)
}
Expand Down Expand Up @@ -69,55 +67,52 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pubkeys,
indices,
pubkey_bytes,
store,
_phantom: PhantomData,
})
}

/// Scan the given `state` and add any new validator public keys.
///
/// Does not delete any keys from `self` if they don't appear in `state`.
///
/// NOTE: The caller *must* commit the returned I/O batch as part of the block import process.
pub fn import_new_pubkeys(
&mut self,
state: &BeaconState<T::EthSpec>,
) -> Result<(), BeaconChainError> {
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
if state.validators().len() > self.pubkeys.len() {
self.import(
state.validators()[self.pubkeys.len()..]
.iter()
.map(|v| v.pubkey),
)
} else {
Ok(())
Ok(vec![])
}
}

/// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<(), BeaconChainError>
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{
self.pubkey_bytes.reserve(validator_keys.len());
self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len());

let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey in validator_keys {
let i = self.pubkeys.len();

if self.indices.contains_key(&pubkey) {
return Err(BeaconChainError::DuplicateValidatorPublicKey);
}

// The item is written to disk _before_ it is written into
// the local struct.
//
// This means that a pubkey cache read from disk will always be equivalent to or
// _later than_ the cache that was running in the previous instance of Lighthouse.
//
// The motivation behind this ordering is that we do not want to have states that
// reference a pubkey that is not in our cache. However, it's fine to have pubkeys
// that are never referenced in a state.
self.store
.put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?;
// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));

self.pubkeys.push(
(&pubkey)
Expand All @@ -129,7 +124,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.indices.insert(pubkey, i);
}

Ok(())
Ok(store_ops)
}

/// Get the public key for a validator with index `i`.
Expand Down Expand Up @@ -296,9 +291,10 @@ mod test {

// Add some more keypairs.
let (state, keypairs) = get_state(12);
cache
let ops = cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]);
drop(cache);

Expand Down

0 comments on commit b9fa166

Please sign in to comment.