Skip to content

Commit

Permalink
Merge pull request #17 from lbryio/precomputed-hashx-status
Browse files Browse the repository at this point in the history
Precomputed hashx status
  • Loading branch information
jackrobison authored Apr 7, 2022
2 parents 5763d38 + 881fdf3 commit 311db52
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 56 deletions.
110 changes: 110 additions & 0 deletions scribe/blockchain/mempool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import itertools
import attr
import typing
from collections import defaultdict
from scribe.blockchain.transaction.deserializer import Deserializer

if typing.TYPE_CHECKING:
from scribe.db import HubDB


@attr.s(slots=True)
class MemPoolTx:
prevouts = attr.ib()
# A pair is a (hashX, value) tuple
in_pairs = attr.ib()
out_pairs = attr.ib()
fee = attr.ib()
size = attr.ib()
raw_tx = attr.ib()


@attr.s(slots=True)
class MemPoolTxSummary:
hash = attr.ib()
fee = attr.ib()
has_unconfirmed_inputs = attr.ib()


class MemPool:
def __init__(self, coin, db: 'HubDB'):
self.coin = coin
self._db = db
self.txs = {}
self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key

def mempool_history(self, hashX: bytes) -> str:
result = ''
for tx_hash in self.touched_hashXs.get(hashX, ()):
if tx_hash not in self.txs:
continue # the tx hash for the touched address is an input that isn't in mempool anymore
result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
return result

def remove(self, to_remove: typing.Dict[bytes, bytes]):
# Remove txs that aren't in mempool anymore
for tx_hash in set(self.txs).intersection(to_remove.keys()):
tx = self.txs.pop(tx_hash)
tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
for hashX in tx_hashXs:
if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
self.touched_hashXs[hashX].remove(tx_hash)
if not self.touched_hashXs[hashX]:
self.touched_hashXs.pop(hashX)

def update_mempool(self, to_add: typing.List[typing.Tuple[bytes, bytes]]) -> typing.Set[bytes]:
prefix_db = self._db.prefix_db
touched_hashXs = set()

# Re-sync with the new set of hashes
tx_map = {}
for tx_hash, raw_tx in to_add:
if tx_hash in self.txs:
continue
tx, tx_size = Deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((self.coin.hashX_from_txo(txout), txout.value)
for txout in tx.outputs if txout.pk_script)
tx_map[tx_hash] = MemPoolTx(None, txin_pairs, txout_pairs, 0, tx_size, raw_tx)

for tx_hash, tx in tx_map.items():
prevouts = []
# Look up the prevouts
for prev_hash, prev_index in tx.in_pairs:
if prev_hash in self.txs: # accepted mempool
utxo = self.txs[prev_hash].out_pairs[prev_index]
elif prev_hash in tx_map: # this set of changes
utxo = tx_map[prev_hash].out_pairs[prev_index]
else: # get it from the db
prev_tx_num = prefix_db.tx_num.get(prev_hash)
if not prev_tx_num:
continue
prev_tx_num = prev_tx_num.tx_num
hashX_val = prefix_db.hashX_utxo.get(prev_hash[:4], prev_tx_num, prev_index)
if not hashX_val:
continue
hashX = hashX_val.hashX
utxo_value = prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
utxo = (hashX, utxo_value.amount)
prevouts.append(utxo)

# Save the prevouts, compute the fee and accept the TX
tx.prevouts = tuple(prevouts)
# Avoid negative fees if dealing with generation-like transactions
# because some in_parts would be missing
tx.fee = max(0, (sum(v for _, v in tx.prevouts) -
sum(v for _, v in tx.out_pairs)))
self.txs[tx_hash] = tx
for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
self.touched_hashXs[hashX].add(tx_hash)
touched_hashXs.add(hashX)

return touched_hashXs

def clear(self):
self.txs.clear()
self.touched_hashXs.clear()
95 changes: 75 additions & 20 deletions scribe/blockchain/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.error.base import ChainError
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256
from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
from scribe.blockchain.prefetcher import Prefetcher
from scribe.blockchain.mempool import MemPool
from scribe.schema.url import normalize_name
from scribe.service import BlockchainService
if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -45,6 +46,7 @@ class BlockchainProcessorService(BlockchainService):
def __init__(self, env: 'Env'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.daemon = LBCDaemon(env.coin, env.daemon_url)
self.mempool = MemPool(env.coin, self.db)
self.coin = env.coin
self.wait_for_blocks_duration = 0.1
self._ready_to_stop = asyncio.Event()
Expand Down Expand Up @@ -147,6 +149,10 @@ def fetch_mempool(mempool_prefix):
}

def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
for tx_hash, raw_tx in to_put:
mempool_prefix.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
Expand All @@ -157,17 +163,17 @@ def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx)
_to_put = []
try:
mempool_hashes = await self.daemon.mempool_hashes()
mempool_txids = await self.daemon.mempool_hashes()
except (TypeError, RPCError) as err:
self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err)
return
for hh in mempool_hashes:
tx_hash = bytes.fromhex(hh)[::-1]
for mempool_txid in mempool_txids:
tx_hash = bytes.fromhex(mempool_txid)[::-1]
if tx_hash in current_mempool:
current_mempool.pop(tx_hash)
else:
try:
_to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh))))
_to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(mempool_txid))))
except (TypeError, RPCError):
self.log.warning("failed to get a mempool tx, reorg underway?")
return
Expand Down Expand Up @@ -1238,6 +1244,50 @@ def _get_cumulative_update_ops(self, height: int):
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes)

def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
existing = self.db.prefix_db.hashX_status.get(hashX)
if existing:
self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None)
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_hash, height in new_history:
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))

def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None)
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
history += self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))

def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
if height > self.env.reorg_limit: # compactify existing history
hist_txs = b''
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
for k, hist in self.db.prefix_db.hashX_history.iterate(
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)

def advance_block(self, block: Block):
height = self.height + 1
# print("advance ", height)
Expand Down Expand Up @@ -1326,22 +1376,16 @@ def advance_block(self, block: Block):

self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))

for k, v in self.db.prefix_db.hashX_mempool_status.iterate(
start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False):
self.db.prefix_db.stage_raw_delete(k, v)

for hashX, new_history in self.hashXs_by_tx.items():
if height > self.env.reorg_limit: # compactify existing history
hist_txs = b''
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
for k, hist in self.db.prefix_db.hashX_history.iterate(
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
# TODO: combine this with compaction so that we only read the history once
self._get_update_hashX_status_ops(
hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
)
self._get_compactify_hashX_history_ops(height, hashX)
if not new_history:
continue
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
Expand Down Expand Up @@ -1418,6 +1462,7 @@ def clear_after_advance_or_reorg(self):
self.pending_transactions.clear()
self.pending_support_amount_change.clear()
self.touched_hashXs.clear()
self.mempool.clear()

def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0
Expand Down Expand Up @@ -1592,6 +1637,16 @@ def flush():
await self.run_in_thread_with_lock(flush)

def _iter_start_tasks(self):
while self.db.db_version < max(self.db.DB_VERSIONS):
if self.db.db_version == 7:
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
else:
raise RuntimeError("unknown db version")
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
migrate(self.db)
self.log.info("finished migration")
self.db.read_db_state()

self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
Expand Down
2 changes: 2 additions & 0 deletions scribe/db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class DB_PREFIXES(enum.Enum):
trending_notifications = b'c'
mempool_tx = b'd'
touched_hashX = b'e'
hashX_status = b'f'
hashX_mempool_status = b'g'


COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass
Expand Down
16 changes: 13 additions & 3 deletions scribe/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
from scribe.db.prefixes import HashXMempoolStatusPrefixRow


TXO_STRUCT = struct.Struct(b'>LH')
Expand All @@ -31,7 +32,7 @@


class HubDB:
DB_VERSIONS = HIST_DB_VERSIONS = [7]
DB_VERSIONS = [7, 8]

def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
Expand Down Expand Up @@ -804,7 +805,8 @@ def open_db(self):
self.prefix_db = PrefixDB(
db_path, cache_mb=self._cache_MB,
reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix}, secondary_path=secondary_path
unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
secondary_path=secondary_path
)

if secondary_path != '':
Expand Down Expand Up @@ -848,6 +850,14 @@ def close(self):
self.prefix_db.close()
self.prefix_db = None

def get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status:
return mempool_status.hex()
status = self.prefix_db.hashX_status.get(hashX, deserialize_value=False)
if status:
return status.hex()

def get_tx_hash(self, tx_num: int) -> bytes:
if self._cache_all_tx_hashes:
return self.total_transactions[tx_num]
Expand Down Expand Up @@ -1017,7 +1027,7 @@ def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]:
txs_extend = txs.extend
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
txs_extend(hist)
if len(txs) >= limit:
if limit and len(txs) >= limit:
break
return txs

Expand Down
Empty file added scribe/db/migrators/__init__.py
Empty file.
Loading

0 comments on commit 311db52

Please sign in to comment.