Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make retrieving inscriptions in block fast #2333

Merged
merged 11 commits into from
Aug 17, 2023
32 changes: 8 additions & 24 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
BlockHashValue, Entry, InscriptionEntry, InscriptionEntryValue, InscriptionIdValue,
OutPointValue, SatPointValue, SatRange,
},
index::block_index::BlockIndex,
reorg::*,
updater::Updater,
},
Expand All @@ -13,7 +14,6 @@ use {
bitcoincore_rpc::{json::GetBlockHeaderResult, Client},
chrono::SubsecRound,
indicatif::{ProgressBar, ProgressStyle},
itertools::Itertools,
log::log_enabled,
redb::{
Database, MultimapTable, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, Table,
Expand All @@ -23,6 +23,7 @@ use {
std::io::{BufWriter, Read, Write},
};

pub mod block_index;
mod entry;
mod fetcher;
mod reorg;
Expand Down Expand Up @@ -925,29 +926,12 @@ impl Index {
Ok((inscriptions, prev, next, lowest, highest))
}

pub(crate) fn get_inscriptions_in_block(&self, block_height: u64) -> Result<Vec<InscriptionId>> {
// This is a naive approach and will require optimization, but we don't have an index by block
let block_inscriptions = self
.database
.begin_read()?
.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?
.iter()?
.filter_map(|result| match result {
Ok((key, entry_value)) => {
let entry = InscriptionEntry::load(entry_value.value());
if entry.height == block_height {
Some((InscriptionId::load(*key.value()), entry.number))
} else {
None
}
}
Err(_) => None,
})
.sorted_by_key(|&(_id, number)| number)
.map(|(id, _)| id)
.collect();

Ok(block_inscriptions)
pub(crate) fn get_inscriptions_in_block(
&self,
block_index: &BlockIndex,
block_height: u64,
) -> Result<Vec<InscriptionId>> {
block_index.get_inscriptions_in_block(self, block_height)
}

pub(crate) fn get_feed_inscriptions(&self, n: usize) -> Result<Vec<(i64, InscriptionId)>> {
Expand Down
201 changes: 201 additions & 0 deletions src/index/block_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use super::*;

#[derive(Clone)]
pub struct BlockIndex {
first_inscription_height: u64,
lowest_blessed_by_block: Vec<i64>,
lowest_cursed_by_block: Vec<i64>,
highest_indexed_blessed: i64,
lowest_indexed_cursed: i64,
}

impl BlockIndex {
pub(crate) fn new(index: &Index) -> Result<BlockIndex> {
Ok(BlockIndex {
first_inscription_height: index.options.first_inscription_height(),
lowest_blessed_by_block: Vec::new(),
lowest_cursed_by_block: Vec::new(),
highest_indexed_blessed: i64::MIN,
lowest_indexed_cursed: i64::MAX,
})
}

pub(crate) fn update(&mut self, index: &Index) -> Result {
let index_height = index.block_count()?;
let inscribed_block_count = index_height.saturating_sub(self.first_inscription_height);
let indexed_up_to: isize = self
.lowest_blessed_by_block
.len()
.try_into()
.unwrap_or(isize::MAX);

let gap = inscribed_block_count.try_into().unwrap_or(isize::MAX) - indexed_up_to;
if gap <= 0 {
return Ok(());
}

log::debug!(
"Updating block index for {} new blocks ({} to {})",
gap,
indexed_up_to,
inscribed_block_count
);

self
.lowest_blessed_by_block
.resize(usize::try_from(inscribed_block_count)?, i64::MAX);

self
.lowest_cursed_by_block
.resize(usize::try_from(inscribed_block_count)?, i64::MAX);

let rtx = index.database.begin_read()?;

// Use a more efficient approach for the initial indexing - since we have
// to traverse all inscriptions, it is most efficient to do so using one table.
if indexed_up_to == 0 {
for result in rtx
.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?
.iter()?
{
let (_, entry) = result?;
let entry = InscriptionEntry::load(entry.value());
let height_index: usize = entry
.height
.try_into()
.unwrap_or(usize::MAX)
.saturating_sub(self.first_inscription_height.try_into().unwrap());

if entry.number < 0 {
self.lowest_cursed_by_block[height_index] =
cmp::min(self.lowest_cursed_by_block[height_index], entry.number);
self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, entry.number);
} else {
self.lowest_blessed_by_block[height_index] =
cmp::min(self.lowest_blessed_by_block[height_index], entry.number);
self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, entry.number);
}
}
} else {
// Use default approach where we iterate in order of inscription number
// so we can easily skip over already indexed inscriptions.
let mut prev_block_height = usize::MAX;

for result in rtx
.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?
.iter()?
{
let (number, id) = result?;

if number.value() >= self.lowest_indexed_cursed
&& number.value() <= self.highest_indexed_blessed
{
continue;
}

let inscription_id = InscriptionId::load(*id.value());

if let Some(entry) = index.get_inscription_entry(inscription_id)? {
let current_height = entry.height.try_into().unwrap_or(usize::MAX);

if prev_block_height != current_height {
prev_block_height = current_height;

if number.value() < 0 {
self.lowest_cursed_by_block[prev_block_height
.saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value();
self.lowest_indexed_cursed = cmp::min(self.lowest_indexed_cursed, number.value());
} else {
self.lowest_blessed_by_block[prev_block_height
.saturating_sub(usize::try_from(self.first_inscription_height)?)] = number.value();
self.highest_indexed_blessed = cmp::max(self.highest_indexed_blessed, number.value());
}
}
}
}
}

log::debug!(
"Updated block index for {} new blocks ({} to {})",
gap,
indexed_up_to,
inscribed_block_count
);

Ok(())
}

// Return all consecutively numbered inscriptions in the block at the given height, starting from the given number
fn get_inscriptions_in_block_from(
&self,
index: &Index,
block_height: u64,
from_number: i64,
cursed: bool,
) -> Result<Vec<InscriptionId>> {
let mut block_inscriptions = Vec::new();

let rtx = index.database.begin_read()?;
let inscription_id_by_number = rtx.open_table(INSCRIPTION_NUMBER_TO_INSCRIPTION_ID)?;

let highest = if cursed {
-1
} else {
match inscription_id_by_number.iter()?.next_back() {
Some(Ok((number, _id))) => number.value(),
Some(Err(err)) => return Err(err.into()),
None => i64::MIN,
}
};

for number in from_number..=highest {
match inscription_id_by_number.get(number)? {
Some(inscription_id) => {
let inscription_id = InscriptionId::load(*inscription_id.value());
if let Some(entry) = index.get_inscription_entry(inscription_id)? {
if entry.height != block_height {
break;
}
block_inscriptions.push(inscription_id);
}
}
None => break,
}
}

Ok(block_inscriptions)
}

pub(crate) fn get_inscriptions_in_block(
&self,
index: &Index,
block_height: u64,
) -> Result<Vec<InscriptionId>> {
if block_height >= index.block_count()? || block_height < self.first_inscription_height {
return Ok(Vec::new());
}
let lowest_cursed = self.lowest_cursed_by_block
[usize::try_from(block_height.saturating_sub(self.first_inscription_height))?];
let lowest_blessed = self.lowest_blessed_by_block
[usize::try_from(block_height.saturating_sub(self.first_inscription_height))?];

let mut inscriptions =
self.get_inscriptions_in_block_from(index, block_height, lowest_cursed, true)?;
inscriptions.extend(self.get_inscriptions_in_block_from(
index,
block_height,
lowest_blessed,
false,
)?);

log::debug!(
"Got {} inscriptions in block {} ({} - {})",
inscriptions.len(),
block_height,
lowest_cursed,
lowest_blessed
);

Ok(inscriptions)
}
}
38 changes: 31 additions & 7 deletions src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
error::{OptionExt, ServerError, ServerResult},
},
super::*,
crate::index::block_index::BlockIndex,
crate::page_config::PageConfig,
crate::templates::{
BlockHtml, ClockSvg, HomeHtml, InputHtml, InscriptionHtml, InscriptionJson, InscriptionsHtml,
Expand All @@ -29,7 +30,7 @@ use {
caches::DirCache,
AcmeConfig,
},
std::{cmp::Ordering, str, sync::Arc},
std::{cmp::Ordering, str, sync::Arc, sync::RwLock},
tokio_stream::StreamExt,
tower_http::{
compression::CompressionLayer,
Expand All @@ -46,6 +47,10 @@ pub struct ServerConfig {
pub is_json_api_enabled: bool,
}

struct BlockIndexState {
block_index: RwLock<BlockIndex>,
}

enum BlockQuery {
Height(u64),
Hash(BlockHash),
Expand Down Expand Up @@ -134,18 +139,38 @@ pub(crate) struct Server {
impl Server {
pub(crate) fn run(self, options: Options, index: Arc<Index>, handle: Handle) -> Result {
Runtime::new()?.block_on(async {
let block_index_state = BlockIndexState {
block_index: RwLock::new(BlockIndex::new(&index)?),
};

let block_index_state = Arc::new(block_index_state);

let index_clone = index.clone();
let block_index_clone = block_index_state.clone();

let index_thread = thread::spawn(move || loop {
if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) {
break;
}
if let Err(error) = index_clone.update() {
log::warn!("{error}");
log::warn!("Updating index: {error}");
}
if let Err(error) = block_index_clone
.block_index
.write()
.unwrap()
.update(&index_clone)
{
log::warn!("Updating block index: {error}");
}
thread::sleep(Duration::from_millis(5000));
});
INDEXER.lock().unwrap().replace(index_thread);

let server_config = Arc::new(ServerConfig {
is_json_api_enabled: index.is_json_api_enabled(),
});

let config = options.load_config()?;
let acme_domains = self.acme_domains()?;

Expand All @@ -154,10 +179,6 @@ impl Server {
domain: acme_domains.first().cloned(),
});

let server_config = Arc::new(ServerConfig {
is_json_api_enabled: index.is_json_api_enabled(),
});

let router = Router::new()
.route("/", get(Self::home))
.route("/block/:query", get(Self::block))
Expand Down Expand Up @@ -193,6 +214,7 @@ impl Server {
.layer(Extension(index))
.layer(Extension(page_config))
.layer(Extension(Arc::new(config)))
.layer(Extension(block_index_state))
.layer(SetResponseHeaderLayer::if_not_present(
header::CONTENT_SECURITY_POLICY,
HeaderValue::from_static("default-src 'self'"),
Expand Down Expand Up @@ -1007,10 +1029,12 @@ impl Server {
async fn inscriptions_in_block(
Extension(page_config): Extension<Arc<PageConfig>>,
Extension(index): Extension<Arc<Index>>,
Extension(block_index_state): Extension<Arc<BlockIndexState>>,
Path(block_height): Path<u64>,
accept_json: AcceptJson,
) -> ServerResult<Response> {
let inscriptions = index.get_inscriptions_in_block(block_height)?;
let inscriptions = index
.get_inscriptions_in_block(&block_index_state.block_index.read().unwrap(), block_height)?;
Ok(if accept_json.0 {
Json(InscriptionsJson::new(inscriptions, None, None, None, None)).into_response()
} else {
Expand Down
11 changes: 9 additions & 2 deletions tests/json_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,15 @@ fn get_inscriptions_in_block() {
}
rpc_server.mine_blocks(1);

let server = TestServer::spawn_with_args(&rpc_server, &["--index-sats", "--enable-json-api"]);
let server = TestServer::spawn_with_args(
&rpc_server,
&[
"--index-sats",
"--enable-json-api",
"--first-inscription-height",
"0",
],
);

// get all inscriptions from block 11
let response = server.json_request(format!("/inscriptions/block/{}", 11));
Expand All @@ -330,7 +338,6 @@ fn get_inscriptions_in_block() {
let inscriptions_json: InscriptionsJson =
serde_json::from_str(&response.text().unwrap()).unwrap();

assert_eq!(inscriptions_json.inscriptions.len(), 3);
pretty_assert_eq!(
inscriptions_json.inscriptions,
vec![
Expand Down