Skip to content

Commit

Permalink
Implement inefficient long polling
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Dec 14, 2022
1 parent 16714cd commit 99f244f
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 58 deletions.
133 changes: 103 additions & 30 deletions zebra-rpc/src/methods/get_block_template_rpcs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! RPC methods related to mining only available with `getblocktemplate-rpcs` rust feature.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use futures::{FutureExt, TryFutureExt};
use futures::{future::OptionFuture, FutureExt, TryFutureExt};
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tower::{buffer::Buffer, Service, ServiceExt};
Expand Down Expand Up @@ -333,40 +333,113 @@ where
// These checks always have the same result during long polling.
let miner_address = check_miner_address(miner_address)?;

let mut client_long_poll_id = None;
if let Some(parameters) = parameters {
check_block_template_parameters(parameters)?;
check_block_template_parameters(&parameters)?;

client_long_poll_id = parameters.long_poll_id;
}

// - Checks and fetches that can change during long polling

// Check if we are synced to the tip.
// The result of this check can change during long polling.
check_synced_to_tip(network, latest_chain_tip, sync_status)?;

// Fetch the state data and local time for the block template:
// - if the tip block hash changes, we must return from long polling,
// - if the local clock changes on testnet, we might return from long polling
//
// We also return after 90 minutes on mainnet, even if we have the same response.
let chain_tip_and_local_time = fetch_state_tip_and_local_time(state).await?;
// Set up the loop.
let mut max_time_reached = false;

// The loop returns the server long poll ID,
// which should be different to the client long poll ID.
let (server_long_poll_id, chain_tip_and_local_time, mempool_txs) = loop {
// Check if we are synced to the tip.
// The result of this check can change during long polling.
//
// TODO:
// - add `async changed()` methods to ChainTip and ChainSyncStatus
// (using `changed() -> Changed` and `impl Future<()> for Changed`)
check_synced_to_tip(network, latest_chain_tip.clone(), sync_status.clone())?;

// Fetch the state data and local time for the block template:
// - if the tip block hash changes, we must return from long polling,
// - if the local clock changes on testnet, we might return from long polling
//
// We always return after 90 minutes on mainnet, even if we have the same response,
// because the max time has been reached.
//
// TODO: timeout and exit the loop when max time is reached
let chain_tip_and_local_time =
fetch_state_tip_and_local_time(state.clone()).await?;

// Fetch the mempool data for the block template:
// - if the mempool transactions change, we might return from long polling.
let mempool_txs = fetch_mempool_transactions(mempool).await?;
// Fetch the mempool data for the block template:
// - if the mempool transactions change, we might return from long polling.
//
// TODO:
// - add a `MempoolChange` type with an `async changed()` method.
// - if we are long polling, pause between state and mempool,
// to allow transactions to re-verify (only works after PR #5841)
let mempool_txs = fetch_mempool_transactions(mempool.clone()).await?;

// - Long poll ID calculation
let server_long_poll_id = LongPollInput::new(
chain_tip_and_local_time.tip_height,
chain_tip_and_local_time.tip_hash,
chain_tip_and_local_time.max_time,
mempool_txs.iter().map(|tx| tx.transaction.id),
)
.generate_id();

// The loop finishes if:
// - the client didn't pass a long poll ID,
// - the server long poll ID is different to the client long poll ID, or
// - the previous loop iteration waited until the max time.
if Some(&server_long_poll_id) != client_long_poll_id.as_ref() || max_time_reached {
break (server_long_poll_id, chain_tip_and_local_time, mempool_txs);
}

// - Long poll ID calculation
//
// TODO: check if the client passed the same long poll id,
// if they did, wait until the inputs change,
// or the max time, or Zebra is no longer synced to the tip.

let long_poll_id = LongPollInput::new(
chain_tip_and_local_time.tip_height,
chain_tip_and_local_time.tip_hash,
chain_tip_and_local_time.max_time,
mempool_txs.iter().map(|tx| tx.transaction.id),
)
.into();
// This duration can be slightly lower than needed, if cur_time was clamped
// to min_time. In that case the wait is very long, and it's ok to return early.
//
// It can also be zero if cur_time was clamped to max_time.
// In that case, we want to wait for another change, and ignore this timeout.
let duration_until_max_time = chain_tip_and_local_time
.max_time
.saturating_duration_since(chain_tip_and_local_time.cur_time);
let wait_for_max_time: OptionFuture<_> = if duration_until_max_time.seconds() > 0 {
Some(tokio::time::sleep(duration_until_max_time.to_std()))
} else {
None
}
.into();

// TODO: remove this polling wait after we've switched to
// using futures to detect state tip, sync status, and mempool changes
let temp_wait_before_requests = tokio::time::sleep(Duration::from_secs(5));

tokio::select! {
// Poll the futures in the same order as they are listed here.
biased;

// TODO: change logging to debug after testing
Some(_elapsed) = wait_for_max_time => {
tracing::info!(
max_time = ?chain_tip_and_local_time.max_time,
cur_time = ?chain_tip_and_local_time.cur_time,
?server_long_poll_id,
?client_long_poll_id,
"returning from long poll because max time was reached"
);

max_time_reached = true;
}

_elapsed = temp_wait_before_requests => {
tracing::info!(
max_time = ?chain_tip_and_local_time.max_time,
cur_time = ?chain_tip_and_local_time.cur_time,
?server_long_poll_id,
?client_long_poll_id,
"checking long poll inputs again after waiting 5 seconds"
);
}
}
};

// - Processing fetched data to create a transaction template
//
Expand Down Expand Up @@ -405,7 +478,7 @@ where
let response = GetBlockTemplate::new(
next_block_height,
&chain_tip_and_local_time,
long_poll_id,
server_long_poll_id,
coinbase_txn,
&mempool_txs,
default_roots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use crate::methods::get_block_template_rpcs::types::get_block_template::*;

/// Returns an error if the get block template RPC `parameters` are invalid.
pub fn check_block_template_parameters(
parameters: get_block_template::JsonParameters,
parameters: &get_block_template::JsonParameters,
) -> Result<()> {
if parameters.data.is_some() || parameters.mode == GetBlockTemplateRequestMode::Proposal {
return Err(Error {
Expand Down
53 changes: 26 additions & 27 deletions zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ impl LongPollInput {
mempool_transaction_mined_ids,
}
}

/// Returns the [`LongPollId`] for this [`LongPollInput`].
/// Performs lossy conversion on some fields.
pub fn generate_id(&self) -> LongPollId {
let mut tip_hash_checksum = 0;
update_checksum(&mut tip_hash_checksum, self.tip_hash.0);

let mut mempool_transaction_content_checksum: u32 = 0;
for tx_mined_id in self.mempool_transaction_mined_ids.iter() {
update_checksum(&mut mempool_transaction_content_checksum, tx_mined_id.0);
}

LongPollId {
tip_height: self.tip_height.0,

tip_hash_checksum,

max_timestamp: self.max_time.timestamp(),

// It's ok to do wrapping conversions here,
// because long polling checks are probabilistic.
mempool_transaction_count: self.mempool_transaction_mined_ids.len() as u32,

mempool_transaction_content_checksum,
}
}
}

/// The encoded long poll ID, generated from the [`LongPollInput`].
Expand Down Expand Up @@ -161,33 +187,6 @@ pub struct LongPollId {
pub mempool_transaction_content_checksum: u32,
}

impl From<LongPollInput> for LongPollId {
/// Lossy conversion from LongPollInput to LongPollId.
fn from(input: LongPollInput) -> Self {
let mut tip_hash_checksum = 0;
update_checksum(&mut tip_hash_checksum, input.tip_hash.0);

let mut mempool_transaction_content_checksum: u32 = 0;
for tx_mined_id in input.mempool_transaction_mined_ids.iter() {
update_checksum(&mut mempool_transaction_content_checksum, tx_mined_id.0);
}

Self {
tip_height: input.tip_height.0,

tip_hash_checksum,

max_timestamp: input.max_time.timestamp(),

// It's ok to do wrapping conversions here,
// because long polling checks are probabilistic.
mempool_transaction_count: input.mempool_transaction_mined_ids.len() as u32,

mempool_transaction_content_checksum,
}
}
}

/// Update `checksum` from `item`, so changes in `item` are likely to also change `checksum`.
///
/// This checksum is not cryptographically secure.
Expand Down

0 comments on commit 99f244f

Please sign in to comment.