Revert blocks during reorg
junderw committed Dec 14, 2024
1 parent 249848d commit d0a2a94
Showing 4 changed files with 130 additions and 36 deletions.
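
In outline, the commit threads a new Operation value through the indexing path so the same row-generation code either writes rows (normal sync) or deletes them (reorg rollback), and HeaderList::apply now reports which headers were rolled back. The toy model below illustrates that dispatch idea only; all types in it are stand-ins, not the electrs ones from the diff.

use std::collections::BTreeMap;

// Toy model of the Operation dispatch introduced by this commit: the same rows are
// produced either way, and the Operation decides whether they are written or deleted.
enum Operation {
    AddBlocks,
    DeleteBlocks,
}

fn apply(store: &mut BTreeMap<Vec<u8>, Vec<u8>>, rows: Vec<(Vec<u8>, Vec<u8>)>, op: &Operation) {
    match op {
        Operation::AddBlocks => store.extend(rows),
        Operation::DeleteBlocks => {
            for (key, _) in rows {
                store.remove(&key);
            }
        }
    }
}

fn main() {
    let mut store = BTreeMap::new();
    let rows = vec![(b"H-script-1".to_vec(), vec![]), (b"H-script-2".to_vec(), vec![])];
    apply(&mut store, rows.clone(), &Operation::AddBlocks);
    assert_eq!(store.len(), 2);
    // A reorg re-derives the same rows for the stale blocks and deletes them.
    apply(&mut store, rows, &Operation::DeleteBlocks);
    assert!(store.is_empty());
}
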
10 changes: 8 additions & 2 deletions src/elements/asset.rs
@@ -11,7 +11,7 @@ use crate::chain::{BNetwork, BlockHash, Network, Txid};
use crate::elements::peg::{get_pegin_data, get_pegout_data, PeginInfo, PegoutInfo};
use crate::elements::registry::{AssetMeta, AssetRegistry};
use crate::errors::*;
use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::schema::{Operation, TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
use crate::util::{bincode_util, full_hash, Bytes, FullHash, TransactionStatus, TxInput};

@@ -178,11 +178,17 @@ pub fn index_confirmed_tx_assets(
network: Network,
parent_network: BNetwork,
rows: &mut Vec<DBRow>,
op: &Operation,
) {
let (history, issuances) = index_tx_assets(tx, network, parent_network);

rows.extend(history.into_iter().map(|(asset_id, info)| {
asset_history_row(&asset_id, confirmed_height, tx_position, info).into_row()
let history_row = asset_history_row(&asset_id, confirmed_height, tx_position, info);
if let Operation::DeleteBlocksWithHistory(tx) = op {
tx.send(history_row.key.hash)
.expect("unbounded channel won't fail");
}
history_row.into_row()
}));

// the initial issuance is kept twice: once in the history index under I<asset><height><txid:vin>,
9 changes: 9 additions & 0 deletions src/new_index/db.rs
@@ -242,6 +242,15 @@ impl DB {
self.db.write_opt(batch, &opts).unwrap();
}

pub fn delete(&self, keys: Vec<Vec<u8>>) {
debug!("deleting {} rows from {:?}", keys.len(), self.db);
for key in keys {
let _ = self.db.delete(key).inspect_err(|err| {
warn!("Error while deleting DB row: {err}");
});
}
}

pub fn flush(&self) {
self.db.flush().unwrap();
}
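
The new DB::delete above issues point deletes one key at a time and only warns on failure, so a single bad key cannot abort a rollback. If atomicity mattered more than progress, the same removal could be batched; the sketch below assumes the rocksdb crate's WriteBatch API and is an illustrative alternative, not part of this commit.

use rocksdb::{WriteBatch, DB};

// Hypothetical batched variant (not in this commit): collect the deletes into one
// WriteBatch so RocksDB applies them in a single write instead of key by key.
fn delete_batched(db: &DB, keys: Vec<Vec<u8>>) -> Result<(), rocksdb::Error> {
    let mut batch = WriteBatch::default();
    for key in keys {
        batch.delete(key);
    }
    db.write(batch)
}
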
138 changes: 107 additions & 31 deletions src/new_index/schema.rs
@@ -209,6 +209,13 @@ pub struct ChainQuery {
network: Network,
}

#[derive(Debug, Clone)]
pub enum Operation {
AddBlocks,
DeleteBlocks,
DeleteBlocksWithHistory(crossbeam_channel::Sender<[u8; 32]>),
}

// TODO: &[Block] should be an iterator / a queue.
impl Indexer {
pub fn open(store: Arc<Store>, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self {
@@ -273,13 +280,53 @@ impl Indexer {
let tip = daemon.getbestblockhash()?;
let new_headers = self.get_new_headers(&daemon, &tip)?;

// Must rollback blocks before rolling forward
let headers_len = {
let mut headers = self.store.indexed_headers.write().unwrap();
let reorged = headers.apply(new_headers.clone());
assert_eq!(tip, *headers.tip());
let headers_len = headers.len();
drop(headers);

if !reorged.is_empty() {
if reorged.len() > 10 {
warn!("reorg of over 10 blocks detected! Wonky stuff might happen!");
}
let (tx, rx) = crossbeam_channel::unbounded();
// Delete txstore
start_fetcher(self.from, &daemon, reorged.clone())?
.map(|blocks| self.add(&blocks, Operation::DeleteBlocks));
// Delete history_db
start_fetcher(self.from, &daemon, reorged)?.map(|blocks| {
self.index(&blocks, Operation::DeleteBlocksWithHistory(tx.clone()))
});
// `tx` was cloned into the closure above; drop the original sender so the channel can disconnect
drop(tx);

// All senders are dropped by now, so the receiver will iterate until the
// end of the unbounded queue.
let scripts = rx.into_iter().collect::<HashSet<_>>();
for script in scripts {
// Invalidate the cache DB rows for these scripts, since they might have data from the reverted blocks mixed in.
self.store.cache_db.delete(vec![
StatsCacheRow::key(&script),
UtxoCacheRow::key(&script),
#[cfg(feature = "liquid")]
[b"z", &script[..]].concat(), // asset cache key
]);
}
}
headers_len
};

let to_add = self.headers_to_add(&new_headers);
debug!(
"adding transactions from {} blocks using {:?}",
to_add.len(),
self.from
);
start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks));
start_fetcher(self.from, &daemon, to_add)?
.map(|blocks| self.add(&blocks, Operation::AddBlocks));
self.start_auto_compactions(&self.store.txstore_db);

let to_index = self.headers_to_index(&new_headers);
@@ -288,7 +335,8 @@
to_index.len(),
self.from
);
start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks));
start_fetcher(self.from, &daemon, to_index)?
.map(|blocks| self.index(&blocks, Operation::AddBlocks));
self.start_auto_compactions(&self.store.history_db);

if let DBFlush::Disable = self.flush {
@@ -302,20 +350,16 @@
debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip));

let mut headers = self.store.indexed_headers.write().unwrap();
headers.apply(new_headers);
assert_eq!(tip, *headers.tip());

if let FetchFrom::BlkFiles = self.from {
self.from = FetchFrom::Bitcoind;
}

self.tip_metric.set(headers.len() as i64 - 1);
self.tip_metric.set(headers_len as i64 - 1);

Ok(tip)
}

fn add(&self, blocks: &[BlockEntry]) {
fn add(&self, blocks: &[BlockEntry], op: Operation) {
debug!("Adding {} blocks to Indexer", blocks.len());
// TODO: skip orphaned blocks?
let rows = {
@@ -324,43 +368,64 @@
};
{
let _timer = self.start_timer("add_write");
self.store.txstore_db.write(rows, self.flush);
if let Operation::AddBlocks = op {
self.store.txstore_db.write(rows, self.flush);
} else {
self.store
.txstore_db
.delete(rows.into_iter().map(|r| r.key).collect());
}
}

self.store
.added_blockhashes
.write()
.unwrap()
.extend(blocks.iter().map(|b| {
if b.entry.height() % 10_000 == 0 {
info!("Tx indexing is up to height={}", b.entry.height());
}
b.entry.hash()
}));
if let Operation::AddBlocks = op {
self.store
.added_blockhashes
.write()
.unwrap()
.extend(blocks.iter().map(|b| {
if b.entry.height() % 10_000 == 0 {
info!("Tx indexing is up to height={}", b.entry.height());
}
b.entry.hash()
}));
} else {
let mut added_blockhashes = self.store.added_blockhashes.write().unwrap();
for b in blocks {
added_blockhashes.remove(b.entry.hash());
}
}
}

fn index(&self, blocks: &[BlockEntry]) {
fn index(&self, blocks: &[BlockEntry], op: Operation) {
debug!("Indexing {} blocks with Indexer", blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
};
let rows = {
let _timer = self.start_timer("index_process");
let added_blockhashes = self.store.added_blockhashes.read().unwrap();
for b in blocks {
if b.entry.height() % 10_000 == 0 {
info!("History indexing is up to height={}", b.entry.height());
}
let blockhash = b.entry.hash();
// TODO: replace by lookup into txstore_db?
if !added_blockhashes.contains(blockhash) {
panic!("cannot index block {} (missing from store)", blockhash);
if let Operation::AddBlocks = op {
let added_blockhashes = self.store.added_blockhashes.read().unwrap();
for b in blocks {
if b.entry.height() % 10_000 == 0 {
info!("History indexing is up to height={}", b.entry.height());
}
let blockhash = b.entry.hash();
// TODO: replace by lookup into txstore_db?
if !added_blockhashes.contains(blockhash) {
panic!("cannot index block {} (missing from store)", blockhash);
}
}
}
index_blocks(blocks, &previous_txos_map, &self.iconfig)
index_blocks(blocks, &previous_txos_map, &self.iconfig, &op)
};
self.store.history_db.write(rows, self.flush);
if let Operation::AddBlocks = op {
self.store.history_db.write(rows, self.flush);
} else {
self.store
.history_db
.delete(rows.into_iter().map(|r| r.key).collect());
}
}
}

@@ -1370,6 +1435,7 @@ fn index_blocks(
block_entries: &[BlockEntry],
previous_txos_map: &HashMap<OutPoint, TxOut>,
iconfig: &IndexerConfig,
op: &Operation,
) -> Vec<DBRow> {
block_entries
.par_iter() // serialization is CPU-intensive
@@ -1384,6 +1450,7 @@
previous_txos_map,
&mut rows,
iconfig,
op,
);
}
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
@@ -1401,13 +1468,19 @@
previous_txos_map: &HashMap<OutPoint, TxOut>,
rows: &mut Vec<DBRow>,
iconfig: &IndexerConfig,
op: &Operation,
) {
// persist history index:
// H{funding-scripthash}{spending-height}{spending-block-pos}S{spending-txid:vin}{funding-txid:vout} → ""
// H{funding-scripthash}{funding-height}{funding-block-pos}F{funding-txid:vout} → ""
// persist "edges" for fast is-this-TXO-spent check
// S{funding-txid:vout}{spending-txid:vin} → ""
let txid = full_hash(&tx.txid()[..]);
let script_callback = |script_hash| {
if let Operation::DeleteBlocksWithHistory(tx) = op {
tx.send(script_hash).expect("unbounded channel won't fail");
}
};
for (txo_index, txo) in tx.output.iter().enumerate() {
if is_spendable(txo) || iconfig.index_unspendables {
let history = TxHistoryRow::new(
@@ -1420,6 +1493,7 @@
value: txo.value,
}),
);
script_callback(history.key.hash);
rows.push(history.into_row());

if iconfig.address_search {
@@ -1449,6 +1523,7 @@
value: prev_txo.value,
}),
);
script_callback(history.key.hash);
rows.push(history.into_row());

let edge = TxEdgeRow::new(
@@ -1469,6 +1544,7 @@
iconfig.network,
iconfig.parent_network,
rows,
op,
);
}

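
The history-deletion path above relies on a standard crossbeam-channel pattern: every place that emits rows holds a clone of the unbounded sender, the original sender is dropped once the work is dispatched, and the receiver's iterator terminates as soon as the last clone goes away. A minimal, self-contained illustration of that pattern (plain threads and dummy 32-byte hashes, independent of the electrs types; it assumes the crossbeam-channel crate already used in this diff):

use std::collections::HashSet;
use std::thread;

fn main() {
    let (tx, rx) = crossbeam_channel::unbounded::<[u8; 32]>();

    // Each worker gets its own clone of the sender, like the closures in the indexer's update path.
    let workers: Vec<_> = (0u8..4)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send([i; 32]).expect("unbounded channel won't fail");
            })
        })
        .collect();

    // Drop the original sender; once every worker finishes, no senders remain
    // and the receiver's iterator terminates instead of blocking forever.
    drop(tx);
    for worker in workers {
        worker.join().unwrap();
    }

    // Deduplicate before acting on the hashes, as the indexer does with the script hashes.
    let hashes: HashSet<[u8; 32]> = rx.into_iter().collect();
    assert_eq!(hashes.len(), 4);
}
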
9 changes: 6 additions & 3 deletions src/util/block.rs
@@ -155,7 +155,8 @@ impl HeaderList {
.collect()
}

pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
/// Returns any rolled-back headers, ordered from the old tip first down to the first block of the abandoned fork last
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@@ -175,21 +176,23 @@
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height
}
None => return,
None => return vec![],
};
debug!(
"applying {} new headers from height {}",
new_headers.len(),
new_height
);
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
let mut removed = self.headers.split_off(new_height); // keep [0..new_height) entries
for new_header in new_headers {
let height = new_header.height();
assert_eq!(height, self.headers.len());
self.tip = *new_header.hash();
self.headers.push(new_header);
self.heights.insert(self.tip, height);
}
removed.reverse();
removed
}

pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
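
apply now reports the rolled-back entries by returning what split_off removed, reversed so the caller sees the old tip first. A tiny standalone example of that ordering, with plain u32 heights standing in for HeaderEntry values:

fn main() {
    // Heights 0..=5 are indexed; a competing fork replaces everything from height 3 up.
    let mut headers: Vec<u32> = (0..=5).collect();
    let new_height = 3;

    // split_off keeps [0..new_height) and returns the rolled-back tail [3, 4, 5].
    let mut removed = headers.split_off(new_height);
    removed.reverse();

    assert_eq!(headers, vec![0, 1, 2]);
    assert_eq!(removed, vec![5, 4, 3]); // old tip first, fork point last
}
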
