diff --git a/src/index.rs b/src/index.rs index 19f6f7c513..c09a50c31f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -4,6 +4,7 @@ use { BlockHashValue, Entry, InscriptionEntry, InscriptionEntryValue, InscriptionIdValue, OutPointValue, SatPointValue, SatRange, }, + index::block_index::BlockIndex, reorg::*, updater::Updater, }, @@ -13,7 +14,6 @@ use { bitcoincore_rpc::{json::GetBlockHeaderResult, Client}, chrono::SubsecRound, indicatif::{ProgressBar, ProgressStyle}, - itertools::Itertools, log::log_enabled, redb::{ Database, MultimapTable, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, Table, @@ -23,6 +23,7 @@ use { std::io::{BufWriter, Read, Write}, }; +pub mod block_index; mod entry; mod fetcher; mod reorg; @@ -925,29 +926,12 @@ impl Index { Ok((inscriptions, prev, next, lowest, highest)) } - pub(crate) fn get_inscriptions_in_block(&self, block_height: u64) -> Result> { - // This is a naive approach and will require optimization, but we don't have an index by block - let block_inscriptions = self - .database - .begin_read()? - .open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)? - .iter()? - .filter_map(|result| match result { - Ok((key, entry_value)) => { - let entry = InscriptionEntry::load(entry_value.value()); - if entry.height == block_height { - Some((InscriptionId::load(*key.value()), entry.number)) - } else { - None - } - } - Err(_) => None, - }) - .sorted_by_key(|&(_id, number)| number) - .map(|(id, _)| id) - .collect(); - - Ok(block_inscriptions) + pub(crate) fn get_inscriptions_in_block( + &self, + block_index: &BlockIndex, + block_height: u64, + ) -> Result> { + block_index.get_inscriptions_in_block(self, block_height) } pub(crate) fn get_feed_inscriptions(&self, n: usize) -> Result> { diff --git a/src/index/block_index.rs b/src/index/block_index.rs new file mode 100644 index 0000000000..c036080f22 --- /dev/null +++ b/src/index/block_index.rs @@ -0,0 +1,201 @@ +use super::*; + +#[derive(Clone)] +pub struct BlockIndex { + first_inscription_height: u64, + lowest_blessed_by_block: Vec, + lowest_cursed_by_block: Vec, + highest_indexed_blessed: i64, + lowest_indexed_cursed: i64, +} + +impl BlockIndex { + pub(crate) fn new(index: &Index) -> Result { + Ok(BlockIndex { + first_inscription_height: index.options.first_inscription_height(), + lowest_blessed_by_block: Vec::new(), + lowest_cursed_by_block: Vec::new(), + highest_indexed_blessed: i64::MIN, + lowest_indexed_cursed: i64::MAX, + }) + } + + pub(crate) fn update(&mut self, index: &Index) -> Result { + let index_height = index.block_count()?; + let inscribed_block_count = index_height.saturating_sub(self.first_inscription_height); + let indexed_up_to: isize = self + .lowest_blessed_by_block + .len() + .try_into() + .unwrap_or(isize::MAX); + + let gap = inscribed_block_count.try_into().unwrap_or(isize::MAX) - indexed_up_to; + if gap <= 0 { + return Ok(()); + } + + log::debug!( + "Updating block index for {} new blocks ({} to {})", + gap, + indexed_up_to, + inscribed_block_count + ); + + self + .lowest_blessed_by_block + .resize(usize::try_from(inscribed_block_count)?, i64::MAX); + + self + .lowest_cursed_by_block + .resize(usize::try_from(inscribed_block_count)?, i64::MAX); + + let rtx = index.database.begin_read()?; + + // Use a more efficient approach for the initial indexing - since we have + // to traverse all inscriptions, it is most efficient to do so using one table. + if indexed_up_to == 0 { + for result in rtx + .open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)? + .iter()? + { + let (_, entry) = result?; + let entry = InscriptionEntry::load(entry.value()); + let height_index: usize = entry + .height + .try_into() + .unwrap_or(usize::MAX) + .saturating_sub(self.first_inscription_height.try_into().unwrap()); + + if entry.number < 0 { + self.lowest_cursed_by_block[height_index] = + cmp::min(self.lowest_cursed_by_block[height_index], entry.number); + self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, entry.number); + } else { + self.lowest_blessed_by_block[height_index] = + cmp::min(self.lowest_blessed_by_block[height_index], entry.number); + self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, entry.number); + } + } + } else { + // Use default approach where we iterate in order of inscription number + // so we can easily skip over already indexed inscriptions. + let mut prev_block_height = usize::MAX; + + for result in rtx + .open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)? + .iter()? + { + let (number, id) = result?; + + if number.value() >= self.lowest_indexed_cursed + && number.value() <= self.highest_indexed_blessed + { + continue; + } + + let inscription_id = InscriptionId::load(*id.value()); + + if let Some(entry) = index.get_inscription_entry(inscription_id)? { + let current_height = entry.height.try_into().unwrap_or(usize::MAX); + + if prev_block_height != current_height { + prev_block_height = current_height; + + if number.value() < 0 { + self.lowest_cursed_by_block[prev_block_height + .saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value(); + self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, number.value()); + } else { + self.lowest_blessed_by_block[prev_block_height + .saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value(); + self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, number.value()); + } + } + } + } + } + + log::debug!( + "Updated block index for {} new blocks ({} to {})", + gap, + indexed_up_to, + inscribed_block_count + ); + + Ok(()) + } + + // Return all consecutively numbered inscriptions in the block at the given height, starting from the given number + fn get_inscriptions_in_block_from( + &self, + index: &Index, + block_height: u64, + from_number: i64, + cursed: bool, + ) -> Result> { + let mut block_inscriptions = Vec::new(); + + let rtx = index.database.begin_read()?; + let inscription_id_by_number = rtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?; + + let highest = if cursed { + -1 + } else { + match inscription_id_by_number.iter()?.next_back() { + Some(Ok((number, _id))) => number.value(), + Some(Err(err)) => return Err(err.into()), + None => i64::MIN, + } + }; + + for number in from_number..=highest { + match inscription_id_by_number.get(number)? { + Some(inscription_id) => { + let inscription_id = InscriptionId::load(*inscription_id.value()); + if let Some(entry) = index.get_inscription_entry(inscription_id)? { + if entry.height != block_height { + break; + } + block_inscriptions.push(inscription_id); + } + } + None => break, + } + } + + Ok(block_inscriptions) + } + + pub(crate) fn get_inscriptions_in_block( + &self, + index: &Index, + block_height: u64, + ) -> Result> { + if block_height >= index.block_count()? || block_height < self.first_inscription_height { + return Ok(Vec::new()); + } + let lowest_cursed = self.lowest_cursed_by_block + [usize::try_from(block_height.saturating_sub(self.first_inscription_height))?]; + let lowest_blessed = self.lowest_blessed_by_block + [usize::try_from(block_height.saturating_sub(self.first_inscription_height))?]; + + let mut inscriptions = + self.get_inscriptions_in_block_from(index, block_height, lowest_cursed, true)?; + inscriptions.extend(self.get_inscriptions_in_block_from( + index, + block_height, + lowest_blessed, + false, + )?); + + log::debug!( + "Got {} inscriptions in block {} ({} - {})", + inscriptions.len(), + block_height, + lowest_cursed, + lowest_blessed + ); + + Ok(inscriptions) + } +} diff --git a/src/subcommand/server.rs b/src/subcommand/server.rs index 8cc8f672a1..46b9fac786 100644 --- a/src/subcommand/server.rs +++ b/src/subcommand/server.rs @@ -5,6 +5,7 @@ use { error::{OptionExt, ServerError, ServerResult}, }, super::*, + crate::index::block_index::BlockIndex, crate::page_config::PageConfig, crate::templates::{ BlockHtml, ClockSvg, HomeHtml, InputHtml, InscriptionHtml, InscriptionJson, InscriptionsHtml, @@ -29,7 +30,7 @@ use { caches::DirCache, AcmeConfig, }, - std::{cmp::Ordering, str, sync::Arc}, + std::{cmp::Ordering, str, sync::Arc, sync::RwLock}, tokio_stream::StreamExt, tower_http::{ compression::CompressionLayer, @@ -46,6 +47,10 @@ pub struct ServerConfig { pub is_json_api_enabled: bool, } +struct BlockIndexState { + block_index: RwLock, +} + enum BlockQuery { Height(u64), Hash(BlockHash), @@ -134,18 +139,38 @@ pub(crate) struct Server { impl Server { pub(crate) fn run(self, options: Options, index: Arc, handle: Handle) -> Result { Runtime::new()?.block_on(async { + let block_index_state = BlockIndexState { + block_index: RwLock::new(BlockIndex::new(&index)?), + }; + + let block_index_state = Arc::new(block_index_state); + let index_clone = index.clone(); + let block_index_clone = block_index_state.clone(); + let index_thread = thread::spawn(move || loop { if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) { break; } if let Err(error) = index_clone.update() { - log::warn!("{error}"); + log::warn!("Updating index: {error}"); + } + if let Err(error) = block_index_clone + .block_index + .write() + .unwrap() + .update(&index_clone) + { + log::warn!("Updating block index: {error}"); } thread::sleep(Duration::from_millis(5000)); }); INDEXER.lock().unwrap().replace(index_thread); + let server_config = Arc::new(ServerConfig { + is_json_api_enabled: index.is_json_api_enabled(), + }); + let config = options.load_config()?; let acme_domains = self.acme_domains()?; @@ -154,10 +179,6 @@ impl Server { domain: acme_domains.first().cloned(), }); - let server_config = Arc::new(ServerConfig { - is_json_api_enabled: index.is_json_api_enabled(), - }); - let router = Router::new() .route("/", get(Self::home)) .route("/block/:query", get(Self::block)) @@ -193,6 +214,7 @@ impl Server { .layer(Extension(index)) .layer(Extension(page_config)) .layer(Extension(Arc::new(config))) + .layer(Extension(block_index_state)) .layer(SetResponseHeaderLayer::if_not_present( header::CONTENT_SECURITY_POLICY, HeaderValue::from_static("default-src 'self'"), @@ -1007,10 +1029,12 @@ impl Server { async fn inscriptions_in_block( Extension(page_config): Extension>, Extension(index): Extension>, + Extension(block_index_state): Extension>, Path(block_height): Path, accept_json: AcceptJson, ) -> ServerResult { - let inscriptions = index.get_inscriptions_in_block(block_height)?; + let inscriptions = index + .get_inscriptions_in_block(&block_index_state.block_index.read().unwrap(), block_height)?; Ok(if accept_json.0 { Json(InscriptionsJson::new(inscriptions, None, None, None, None)).into_response() } else { diff --git a/tests/json_api.rs b/tests/json_api.rs index 893ede9327..893d7c7673 100644 --- a/tests/json_api.rs +++ b/tests/json_api.rs @@ -321,7 +321,15 @@ fn get_inscriptions_in_block() { } rpc_server.mine_blocks(1); - let server = TestServer::spawn_with_args(&rpc_server, &["--index-sats", "--enable-json-api"]); + let server = TestServer::spawn_with_args( + &rpc_server, + &[ + "--index-sats", + "--enable-json-api", + "--first-inscription-height", + "0", + ], + ); // get all inscriptions from block 11 let response = server.json_request(format!("/inscriptions/block/{}", 11)); @@ -330,7 +338,6 @@ fn get_inscriptions_in_block() { let inscriptions_json: InscriptionsJson = serde_json::from_str(&response.text().unwrap()).unwrap(); - assert_eq!(inscriptions_json.inscriptions.len(), 3); pretty_assert_eq!( inscriptions_json.inscriptions, vec![