Skip to content

Commit

Permalink
Fetch blocks in background (ordinals#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
casey authored Oct 28, 2022
1 parent 449787b commit f84c73d
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 131 deletions.
65 changes: 0 additions & 65 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ lazy_static = "1.4.0"
log = "0.4.14"
mime = "0.3.16"
mime_guess = "2.0.4"
rayon = "1.5.1"
redb = "0.8.0"
regex = "1.6.0"
reqwest = { version = "0.11.10", features = ["blocking"] }
Expand Down
43 changes: 11 additions & 32 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
bitcoincore_rpc::{json::GetBlockHeaderResult, Auth, Client},
indicatif::{ProgressBar, ProgressStyle},
log::log_enabled,
rayon::iter::{IntoParallelRefIterator, ParallelIterator},
redb::{
Database, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, Table,
TableDefinition, WriteStrategy, WriteTransaction,
Expand Down Expand Up @@ -46,13 +45,15 @@ fn encode_satpoint(satpoint: SatPoint) -> [u8; 44] {
}

pub(crate) struct Index {
auth: Auth,
client: Client,
database: Database,
database_path: PathBuf,
genesis_block_coinbase_transaction: Transaction,
genesis_block_coinbase_txid: Txid,
height_limit: Option<u64>,
reorged: AtomicBool,
genesis_block_coinbase_txid: Txid,
genesis_block_coinbase_transaction: Transaction,
rpc_url: String,
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -114,8 +115,9 @@ impl Index {
cookie_file.display()
);

let client = Client::new(&rpc_url, Auth::CookieFile(cookie_file))
.context("failed to connect to RPC URL")?;
let auth = Auth::CookieFile(cookie_file);

let client = Client::new(&rpc_url, auth.clone()).context("failed to connect to RPC URL")?;

let data_dir = options.data_dir()?;

Expand Down Expand Up @@ -161,13 +163,15 @@ impl Index {
options.chain.genesis_block().coinbase().unwrap().clone();

Ok(Self {
genesis_block_coinbase_txid: genesis_block_coinbase_transaction.txid(),
auth,
client,
database,
database_path,
genesis_block_coinbase_transaction,
height_limit: options.height_limit,
reorged: AtomicBool::new(false),
genesis_block_coinbase_txid: genesis_block_coinbase_transaction.txid(),
genesis_block_coinbase_transaction,
rpc_url,
})
}

Expand Down Expand Up @@ -327,31 +331,6 @@ impl Index {
)
}

pub(crate) fn block_with_retries(&self, height: u64) -> Result<Option<Block>> {
let mut errors = 0;
loop {
match self.block(height) {
Err(err) => {
if cfg!(test) {
return Err(err);
}

errors += 1;
let seconds = 1 << errors;
log::error!("failed to fetch block {height}, retrying in {seconds}s: {err}");

if seconds > 120 {
log::error!("would sleep for more than 120s, giving up");
return Err(err);
}

thread::sleep(Duration::from_secs(seconds));
}
Ok(result) => return Ok(result),
}
}
}

pub(crate) fn block_header(&self, hash: BlockHash) -> Result<Option<BlockHeader>> {
self.client.get_block_header(&hash).into_option()
}
Expand Down
134 changes: 101 additions & 33 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::*;
use {super::*, std::sync::mpsc};

pub struct Updater {
cache: HashMap<[u8; 36], Vec<u8>>,
Expand Down Expand Up @@ -47,35 +47,43 @@ impl Updater {
Some(progress_bar)
};

let rx = Self::fetch_blocks_from(
index,
wtx
.open_table(super::HEIGHT_TO_BLOCK_HASH)?
.range(0..)?
.rev()
.next()
.map(|(height, _hash)| height + 1)
.unwrap_or(0),
)?;

let mut uncomitted = 0;
for i in 0.. {
if let Some(height_limit) = index.height_limit {
if self.height > height_limit {
break;
}
}
let block = match rx.recv() {
Ok(block) => block,
Err(mpsc::RecvError) => break,
};

let done = self.index_block(index, &mut wtx)?;
self.index_block(index, &mut wtx, block)?;

if !done {
if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);

if progress_bar.position() > progress_bar.length().unwrap() {
progress_bar.set_length(index.client.get_block_count()?);
}
if progress_bar.position() > progress_bar.length().unwrap() {
progress_bar.set_length(index.client.get_block_count()?);
}

uncomitted += 1;
}

if uncomitted > 0 && i % 5000 == 0 {
uncomitted += 1;

if i % 5000 == 0 {
self.commit(wtx)?;
wtx = index.begin_write()?;
uncomitted = 0;
}

if done || INTERRUPTS.load(atomic::Ordering::Relaxed) > 0 {
if INTERRUPTS.load(atomic::Ordering::Relaxed) > 0 {
break;
}
}
Expand All @@ -91,7 +99,76 @@ impl Updater {
Ok(())
}

pub(crate) fn index_block(&mut self, index: &Index, wtx: &mut WriteTransaction) -> Result<bool> {
fn fetch_blocks_from(index: &Index, mut height: u64) -> Result<mpsc::Receiver<Block>> {
let (tx, rx) = mpsc::sync_channel(32);

let height_limit = index.height_limit;

let client =
Client::new(&index.rpc_url, index.auth.clone()).context("failed to connect to RPC URL")?;

thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
if height > height_limit {
break;
}
}

match Self::get_block_with_retries(&client, height) {
Ok(Some(block)) => {
if let Err(err) = tx.send(block) {
log::info!("Block receiver disconnected: {err}");
break;
}
height += 1;
}
Ok(None) => break,
Err(err) => {
log::error!("Failed to fetch block {height}: {err}");
break;
}
}
});

Ok(rx)
}

pub(crate) fn get_block_with_retries(client: &Client, height: u64) -> Result<Option<Block>> {
let mut errors = 0;
loop {
match client
.get_block_hash(height)
.into_option()?
.map(|hash| client.get_block(&hash))
.transpose()
{
Err(err) => {
if cfg!(test) {
return Err(err.into());
}

errors += 1;
let seconds = 1 << errors;
log::error!("failed to fetch block {height}, retrying in {seconds}s: {err}");

if seconds > 120 {
log::error!("would sleep for more than 120s, giving up");
return Err(err.into());
}

thread::sleep(Duration::from_secs(seconds));
}
Ok(result) => return Ok(result),
}
}
}

pub(crate) fn index_block(
&mut self,
index: &Index,
wtx: &mut WriteTransaction,
block: Block,
) -> Result<()> {
let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;
let mut ordinal_to_satpoint = wtx.open_table(ORDINAL_TO_SATPOINT)?;
let mut outpoint_to_ordinal_ranges = wtx.open_table(OUTPOINT_TO_ORDINAL_RANGES)?;
Expand All @@ -100,11 +177,6 @@ impl Updater {
let mut ordinal_ranges_written = 0;
let mut outputs_in_block = 0;

let block = match index.block_with_retries(self.height)? {
Some(block) => block,
None => return Ok(true),
};

let time = Utc.timestamp(block.header.time as i64, 0);

log::info!(
Expand All @@ -131,13 +203,9 @@ impl Updater {
coinbase_inputs.push_front((start.n(), (start + h.subsidy()).n()));
}

let txdata = block
.txdata
.par_iter()
.map(|tx| (tx.txid(), tx))
.collect::<Vec<(Txid, &Transaction)>>();
for (tx_offset, tx) in block.txdata.iter().enumerate().skip(1) {
let txid = tx.txid();

for (tx_offset, (txid, tx)) in txdata.iter().enumerate().skip(1) {
log::trace!("Indexing transaction {tx_offset}…");

let mut input_ordinal_ranges = VecDeque::new();
Expand All @@ -163,7 +231,7 @@ impl Updater {
}

self.index_transaction(
*txid,
txid,
tx,
&mut ordinal_to_satpoint,
&mut input_ordinal_ranges,
Expand All @@ -174,9 +242,9 @@ impl Updater {
coinbase_inputs.extend(input_ordinal_ranges);
}

if let Some((txid, tx)) = txdata.first() {
if let Some(tx) = block.coinbase() {
self.index_transaction(
*txid,
tx.txid(),
tx,
&mut ordinal_to_satpoint,
&mut coinbase_inputs,
Expand All @@ -195,7 +263,7 @@ impl Updater {
(Instant::now() - start).as_millis(),
);

Ok(false)
Ok(())
}

pub(crate) fn index_transaction(
Expand Down

0 comments on commit f84c73d

Please sign in to comment.