diff --git a/lbry/schema/result.py b/lbry/schema/result.py index f3c761bec2..a4dd7b773d 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -161,7 +161,6 @@ def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked=None) @classmethod def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes: - extra_txo_rows = {row['claim_hash']: row for row in extra_txo_rows} page = OutputsMessage() page.offset = offset if total is not None: @@ -208,7 +207,7 @@ def encode_txo(cls, txo_message, resolve_result: Union['ResolveResult', Exceptio if resolve_result.canonical_url is not None: txo_message.claim.canonical_url = resolve_result.canonical_url - if resolve_result.last_take_over_height is not None: - txo_message.claim.take_over_height = resolve_result.last_take_over_height + if resolve_result.last_takeover_height is not None: + txo_message.claim.take_over_height = resolve_result.last_takeover_height if resolve_result.claims_in_channel is not None: txo_message.claim.claims_in_channel = resolve_result.claims_in_channel diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 64e060c57e..ed58903edf 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -18,6 +18,9 @@ from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport, get_expiration_height +from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, get_force_activate_ops, get_delay_for_name +from lbry.wallet.server.db.prefixes import PendingClaimActivationPrefixRow, Prefixes +from lbry.wallet.server.db.revertable import RevertablePut from lbry.wallet.server.udp import StatusServer if typing.TYPE_CHECKING: from lbry.wallet.server.leveldb import LevelDB @@ -202,13 +205,12 @@ def __init__(self, env, db: 'LevelDB', daemon, notifications): self.history_cache = {} self.status_server = StatusServer() self.effective_amount_changes = defaultdict(list) - self.pending_claims = {} - self.pending_claim_txos = {} + self.pending_claims: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} + self.pending_claim_txos: typing.Dict[bytes, Tuple[int, int]] = {} self.pending_supports = defaultdict(set) self.pending_support_txos = {} self.pending_abandon = set() - - + self.staged_pending_abandoned = {} async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -239,7 +241,6 @@ async def check_and_advance_blocks(self, raw_blocks): try: for block in blocks: await self.run_in_thread_with_lock(self.advance_block, block) - print("advanced\n") except: self.logger.exception("advance blocks failed") raise @@ -412,7 +413,8 @@ def check_cache_size(self): return None def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: int, tx_count: int, txout, - spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]) -> List['RevertableOp']: + spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]], + zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: try: claim_name = txo.normalized_name except UnicodeDecodeError: @@ -425,7 +427,13 @@ def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: in signing_channel_hash = None channel_claims_count = 0 - activation_height = 0 + activation_delay = self.db.get_activation_delay(claim_hash, claim_name) + if activation_delay == 0: + zero_delay_claims[(claim_name, claim_hash)] = tx_count, idx + # else: + # print("delay activation ", claim_name, activation_delay, height) + + activation_height = activation_delay + height try: signable = txo.signable except: # google.protobuf.message.DecodeError: Could not parse JSON. @@ -455,9 +463,13 @@ def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: in root_idx = previous_claim.root_claim_tx_position # prev_amount = previous_claim.amount else: - root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( + k, v = self.db.get_root_claim_txo_and_current_amount( claim_hash ) + root_tx_num = v.root_tx_num + root_idx = v.root_position + prev_amount = v.amount + pending = StagedClaimtrieItem( claim_name, claim_hash, txout.value, support_amount + txout.value, activation_height, get_expiration_height(height), tx_count, idx, root_tx_num, root_idx, @@ -469,11 +481,25 @@ def _add_claim_or_update(self, height: int, txo, script, tx_hash: bytes, idx: in self.effective_amount_changes[claim_hash].append(txout.value) return pending.get_add_claim_utxo_ops() - def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']: + def _add_support(self, height, txo, txout, idx, tx_count, + zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: supported_claim_hash = txo.claim_hash[::-1] + claim_info = self.db.get_root_claim_txo_and_current_amount( + supported_claim_hash + ) + controlling_claim = None + supported_tx_num = supported_position = supported_activation_height = supported_name = None + if claim_info: + k, v = claim_info + supported_name = v.name + supported_tx_num = k.tx_num + supported_position = k.position + supported_activation_height = v.activation + controlling_claim = self.db.get_controlling_claim(v.name) + if supported_claim_hash in self.effective_amount_changes: - # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") + # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}") self.effective_amount_changes[supported_claim_hash].append(txout.value) self.pending_supports[supported_claim_hash].add((tx_count, idx)) self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value @@ -481,42 +507,84 @@ def _add_support(self, txo, txout, idx, tx_count) -> List['RevertableOp']: supported_claim_hash, tx_count, idx, txout.value ).get_add_support_utxo_ops() elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: - if self.db.claim_exists(supported_claim_hash): - _, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount( - supported_claim_hash - ) + # print(f"\tsupport claim {supported_claim_hash.hex()} {txout.value}") + ops = [] + if claim_info: starting_amount = self.db.get_effective_amount(supported_claim_hash) + if supported_claim_hash not in self.effective_amount_changes: self.effective_amount_changes[supported_claim_hash].append(starting_amount) self.effective_amount_changes[supported_claim_hash].append(txout.value) + supported_amount = self._get_pending_effective_amount(supported_claim_hash) + + if controlling_claim and supported_claim_hash != controlling_claim.claim_hash: + if supported_amount + txo.amount > self._get_pending_effective_amount(controlling_claim.claim_hash): + # takeover could happen + if (supported_name, supported_claim_hash) not in zero_delay_claims: + takeover_delay = get_delay_for_name(height - supported_activation_height) + if takeover_delay == 0: + zero_delay_claims[(supported_name, supported_claim_hash)] = ( + supported_tx_num, supported_position + ) + else: + ops.append( + RevertablePut( + *Prefixes.pending_activation.pack_item( + height + takeover_delay, supported_tx_num, supported_position, + supported_claim_hash, supported_name + ) + ) + ) + self.pending_supports[supported_claim_hash].add((tx_count, idx)) self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") - return StagedClaimtrieSupport( + ops.extend(StagedClaimtrieSupport( supported_claim_hash, tx_count, idx, txout.value - ).get_add_support_utxo_ops() + ).get_add_support_utxo_ops()) + return ops else: print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") return [] def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_count: int, idx: int, txo, txout, script, - spent_claims: typing.Dict[bytes, Tuple[int, int, str]]) -> List['RevertableOp']: + spent_claims: typing.Dict[bytes, Tuple[int, int, str]], + zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]]) -> List['RevertableOp']: if script.is_claim_name or script.is_update_claim: - return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims) + return self._add_claim_or_update(height, txo, script, tx_hash, idx, tx_count, txout, spent_claims, + zero_delay_claims) elif script.is_support_claim or script.is_support_claim_data: - return self._add_support(txo, txout, idx, tx_count) + return self._add_support(height, txo, txout, idx, tx_count, zero_delay_claims) return [] - def _spend_support(self, txin): + def _remove_support(self, txin, zero_delay_claims): txin_num = self.db.transaction_num_mapping[txin.prev_hash] - + supported_name = None if (txin_num, txin.prev_idx) in self.pending_support_txos: spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx)) + supported_name = self._get_pending_claim_name(spent_support) self.pending_supports[spent_support].remove((txin_num, txin.prev_idx)) else: spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) + if spent_support: + supported_name = self._get_pending_claim_name(spent_support) + if spent_support and support_amount is not None and spent_support not in self.pending_abandon: - # print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx})") + controlling = self.db.get_controlling_claim(supported_name) + if controlling: + bid_queue = { + claim_hash: self._get_pending_effective_amount(claim_hash) + for claim_hash in self.db.get_claims_for_name(supported_name) + if claim_hash not in self.pending_abandon + } + bid_queue[spent_support] -= support_amount + sorted_claims = sorted( + list(bid_queue.keys()), key=lambda claim_hash: bid_queue[claim_hash], reverse=True + ) + if controlling.claim_hash == spent_support and sorted_claims.index(controlling.claim_hash) > 0: + print("takeover due to abandoned support") + + # print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx}) {supported_name}") if spent_support not in self.effective_amount_changes: assert spent_support not in self.pending_claims prev_effective_amount = self.db.get_effective_amount(spent_support) @@ -527,7 +595,7 @@ def _spend_support(self, txin): ).get_spend_support_txo_ops() return [] - def _spend_claim(self, txin, spent_claims): + def _remove_claim(self, txin, spent_claims, zero_delay_claims): txin_num = self.db.transaction_num_mapping[txin.prev_hash] if (txin_num, txin.prev_idx) in self.pending_claims: spent = self.pending_claims[(txin_num, txin.prev_idx)] @@ -540,7 +608,7 @@ def _spend_claim(self, txin, spent_claims): ) if not spent_claim_hash_and_name: # txo is not a claim return [] - prev_claim_hash, txi_len_encoded_name = spent_claim_hash_and_name + prev_claim_hash = spent_claim_hash_and_name.claim_hash prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash) prev_claims_in_channel_count = None @@ -551,8 +619,14 @@ def _spend_claim(self, txin, spent_claims): prev_effective_amount = self.db.get_effective_amount( prev_claim_hash ) - claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) - activation_height = 0 + k, v = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) + claim_root_tx_num = v.root_tx_num + claim_root_idx = v.root_position + prev_amount = v.amount + name = v.name + tx_num = k.tx_num + position = k.position + activation_height = v.activation height = bisect_right(self.db.tx_counts, tx_num) spent = StagedClaimtrieItem( name, prev_claim_hash, prev_amount, prev_effective_amount, @@ -564,23 +638,29 @@ def _spend_claim(self, txin, spent_claims): if spent.claim_hash not in self.effective_amount_changes: self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount) self.effective_amount_changes[spent.claim_hash].append(-spent.amount) + if (name, spent.claim_hash) in zero_delay_claims: + zero_delay_claims.pop((name, spent.claim_hash)) return spent.get_spend_claim_txo_ops() - def _spend_claim_or_support(self, txin, spent_claims): - spend_claim_ops = self._spend_claim(txin, spent_claims) + def _remove_claim_or_support(self, txin, spent_claims, zero_delay_claims): + spend_claim_ops = self._remove_claim(txin, spent_claims, zero_delay_claims) if spend_claim_ops: return spend_claim_ops - return self._spend_support(txin) + return self._remove_support(txin, zero_delay_claims) - def _abandon(self, spent_claims): + def _abandon(self, spent_claims) -> typing.Tuple[List['RevertableOp'], typing.Set[str]]: # Handle abandoned claims ops = [] + controlling_claims = {} + need_takeover = set() + for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items(): # print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}") if (prev_tx_num, prev_idx) in self.pending_claims: pending = self.pending_claims.pop((prev_tx_num, prev_idx)) + self.staged_pending_abandoned[pending.claim_hash] = pending claim_root_tx_num = pending.root_claim_tx_num claim_root_idx = pending.root_claim_tx_position prev_amount = pending.amount @@ -588,9 +668,12 @@ def _abandon(self, spent_claims): prev_effective_amount = pending.effective_amount prev_claims_in_channel_count = pending.claims_in_channel_count else: - claim_root_tx_num, claim_root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( + k, v = self.db.get_root_claim_txo_and_current_amount( abandoned_claim_hash ) + claim_root_tx_num = v.root_tx_num + claim_root_idx = v.root_position + prev_amount = v.amount prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash) prev_claims_in_channel_count = None if prev_signing_hash: @@ -601,6 +684,13 @@ def _abandon(self, spent_claims): abandoned_claim_hash ) + if name not in controlling_claims: + controlling_claims[name] = self.db.get_controlling_claim(name) + controlling = controlling_claims[name] + if controlling and controlling.claim_hash == abandoned_claim_hash: + need_takeover.add(name) + # print("needs takeover") + for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]: _, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx)) ops.extend( @@ -628,7 +718,7 @@ def _abandon(self, spent_claims): self.effective_amount_changes.pop(abandoned_claim_hash) self.pending_abandon.add(abandoned_claim_hash) - # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}") + # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}, {len(need_takeover)} names need takeovers") ops.extend( StagedClaimtrieItem( name, abandoned_claim_hash, prev_amount, prev_effective_amount, @@ -636,20 +726,120 @@ def _abandon(self, spent_claims): claim_root_idx, prev_signing_hash, prev_claims_in_channel_count ).get_abandon_ops(self.db.db) ) - return ops + return ops, need_takeover - def _expire_claims(self, height: int): + def _expire_claims(self, height: int, zero_delay_claims): expired = self.db.get_expired_by_height(height) spent_claims = {} ops = [] + names_needing_takeover = set() for expired_claim_hash, (tx_num, position, name, txi) in expired.items(): if (tx_num, position) not in self.pending_claims: - ops.extend(self._spend_claim(txi, spent_claims)) + ops.extend(self._remove_claim(txi, spent_claims, zero_delay_claims)) if expired: + # do this to follow the same content claim removing pathway as if a claim (possible channel) was abandoned + abandon_ops, _names_needing_takeover = self._abandon(spent_claims) + if abandon_ops: + ops.extend(abandon_ops) + names_needing_takeover.update(_names_needing_takeover) ops.extend(self._abandon(spent_claims)) + return ops, names_needing_takeover + + def _get_pending_claim_amount(self, claim_hash: bytes) -> int: + if claim_hash in self.pending_claim_txos: + return self.pending_claims[self.pending_claim_txos[claim_hash]].amount + return self.db.get_claim_amount(claim_hash) + + def _get_pending_claim_name(self, claim_hash: bytes) -> str: + assert claim_hash is not None + if claim_hash in self.pending_claims: + return self.pending_claims[claim_hash].name + claim = self.db.get_claim_from_txo(claim_hash) + return claim.name + + def _get_pending_effective_amount(self, claim_hash: bytes) -> int: + claim_amount = self._get_pending_claim_amount(claim_hash) or 0 + support_amount = self.db.get_support_amount(claim_hash) or 0 + return claim_amount + support_amount + sum( + self.pending_support_txos[support_txnum, support_n][1] + for (support_txnum, support_n) in self.pending_supports.get(claim_hash, []) + ) # TODO: subtract pending spend supports + + def _get_name_takeover_ops(self, height: int, name: str, + activated_claims: typing.Set[bytes]) -> List['RevertableOp']: + controlling = self.db.get_controlling_claim(name) + if not controlling or controlling.claim_hash in self.pending_abandon: + # print("no controlling claim for ", name) + bid_queue = { + claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims + } + winning_claim = max(bid_queue, key=lambda k: bid_queue[k]) + if winning_claim in self.pending_claim_txos: + s = self.pending_claims[self.pending_claim_txos[winning_claim]] + else: + s = self.db.make_staged_claim_item(winning_claim) + ops = [] + if s.activation_height > height: + ops.extend(get_force_activate_ops( + name, s.tx_num, s.position, s.claim_hash, s.root_claim_tx_num, s.root_claim_tx_position, + s.amount, s.effective_amount, s.activation_height, height + )) + ops.extend(get_takeover_name_ops(name, winning_claim, height)) + return ops + else: + # print(f"current controlling claim for {name}#{controlling.claim_hash.hex()}") + controlling_effective_amount = self._get_pending_effective_amount(controlling.claim_hash) + bid_queue = { + claim_hash: self._get_pending_effective_amount(claim_hash) for claim_hash in activated_claims + } + highest_newly_activated = max(bid_queue, key=lambda k: bid_queue[k]) + if bid_queue[highest_newly_activated] > controlling_effective_amount: + # print(f"takeover controlling claim for {name}#{controlling.claim_hash.hex()}") + return get_takeover_name_ops(name, highest_newly_activated, height, controlling) + print(bid_queue[highest_newly_activated], controlling_effective_amount) + # print("no takeover") + return [] + + def _get_takeover_ops(self, height: int, zero_delay_claims) -> List['RevertableOp']: + ops = [] + pending = defaultdict(set) + + # get non delayed takeovers for new names + for (name, claim_hash) in zero_delay_claims: + if claim_hash not in self.pending_abandon: + pending[name].add(claim_hash) + # print("zero delay activate", name, claim_hash.hex()) + + # get takeovers from claims activated at this block + for activated in self.db.get_activated_claims_at_height(height): + if activated.claim_hash not in self.pending_abandon: + pending[activated.name].add(activated.claim_hash) + # print("delayed activate") + + # get takeovers from supports for controlling claims being abandoned + for abandoned_claim_hash in self.pending_abandon: + if abandoned_claim_hash in self.staged_pending_abandoned: + abandoned = self.staged_pending_abandoned[abandoned_claim_hash] + controlling = self.db.get_controlling_claim(abandoned.name) + if controlling and controlling.claim_hash == abandoned_claim_hash and abandoned.name not in pending: + pending[abandoned.name].update(self.db.get_claims_for_name(abandoned.name)) + else: + k, v = self.db.get_root_claim_txo_and_current_amount(abandoned_claim_hash) + controlling_claim = self.db.get_controlling_claim(v.name) + if controlling_claim and abandoned_claim_hash == controlling_claim.claim_hash and v.name not in pending: + pending[v.name].update(self.db.get_claims_for_name(v.name)) + # print("check abandoned winning") + + + + # get takeovers from controlling claims being abandoned + + for name, activated_claims in pending.items(): + ops.extend(self._get_name_takeover_ops(height, name, activated_claims)) return ops def advance_block(self, block): + # print("advance ", height) height = self.height + 1 txs: List[Tuple[Tx, bytes]] = block.transactions block_hash = self.coin.header_hash(block.header) @@ -672,7 +862,8 @@ def advance_block(self, block): append_hashX_by_tx = hashXs_by_tx.append hashX_from_script = self.coin.hashX_from_script - # unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} + zero_delay_claims: typing.Dict[Tuple[str, bytes], Tuple[int, int]] = {} + abandoned_or_expired_controlling = set() for tx, tx_hash in txs: spent_claims = {} @@ -690,7 +881,7 @@ def advance_block(self, block): undo_info_append(cache_value) append_hashX(cache_value[:-12]) - spend_claim_or_support_ops = self._spend_claim_or_support(txin, spent_claims) + spend_claim_or_support_ops = self._remove_claim_or_support(txin, spent_claims, zero_delay_claims) if spend_claim_or_support_ops: claimtrie_stash_extend(spend_claim_or_support_ops) @@ -708,15 +899,16 @@ def advance_block(self, block): txo = Output(txout.value, script) claim_or_support_ops = self._add_claim_or_support( - height, tx_hash, tx_count, idx, txo, txout, script, spent_claims + height, tx_hash, tx_count, idx, txo, txout, script, spent_claims, zero_delay_claims ) if claim_or_support_ops: claimtrie_stash_extend(claim_or_support_ops) # Handle abandoned claims - abandon_ops = self._abandon(spent_claims) + abandon_ops, abandoned_controlling_need_takeover = self._abandon(spent_claims) if abandon_ops: claimtrie_stash_extend(abandon_ops) + abandoned_or_expired_controlling.update(abandoned_controlling_need_takeover) append_hashX_by_tx(hashXs) update_touched(hashXs) @@ -725,11 +917,17 @@ def advance_block(self, block): tx_count += 1 # handle expired claims - expired_ops = self._expire_claims(height) + expired_ops, expired_need_takeover = self._expire_claims(height, zero_delay_claims) if expired_ops: - print(f"************\nexpire claims at block {height}\n************") + # print(f"************\nexpire claims at block {height}\n************") + abandoned_or_expired_controlling.update(expired_need_takeover) claimtrie_stash_extend(expired_ops) + # activate claims and process takeovers + takeover_ops = self._get_takeover_ops(height, zero_delay_claims) + if takeover_ops: + claimtrie_stash_extend(takeover_ops) + # self.db.add_unflushed(hashXs_by_tx, self.tx_count) _unflushed = self.db.hist_unflushed _count = 0 @@ -789,7 +987,6 @@ def backup_blocks(self, raw_blocks): coin = self.coin for raw_block in raw_blocks: self.logger.info("backup block %i", self.height) - print("backup", self.height) # Check and update self.tip block = coin.block(raw_block, self.height) header_hash = coin.header_hash(block.header) diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index f41fb5b7a5..52b9f4e331 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -15,6 +15,9 @@ class DB_PREFIXES(enum.Enum): claim_effective_amount_prefix = b'D' claim_expiration = b'O' + claim_takeover = b'P' + pending_activation = b'Q' + undo_claimtrie = b'M' HISTORY_PREFIX = b'A' diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 055e689122..9bc02a8956 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -2,7 +2,7 @@ from typing import Optional from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES -from lbry.wallet.server.db.prefixes import Prefixes +from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue nOriginalClaimExpirationTime = 262974 nExtendedClaimExpirationTime = 2102400 @@ -13,6 +13,12 @@ nWitnessForkHeight = 680770 # targeting 11 Dec 2019 nAllClaimsInMerkleForkHeight = 658310 # targeting 30 Oct 2019 proportionalDelayFactor = 32 +maxTakeoverDelay = 4032 + + +def get_delay_for_name(blocks_of_continuous_ownership: int) -> int: + return min(blocks_of_continuous_ownership // proportionalDelayFactor, maxTakeoverDelay) + def get_expiration_height(last_updated_height: int) -> int: if last_updated_height < nExtendedClaimExpirationForkHeight: @@ -57,18 +63,21 @@ def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]: def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int, position: int, root_tx_num: int, root_position: int, claim_hash: bytes, + activation_height: int, prev_activation_height: int, signing_hash: Optional[bytes] = None, claims_in_channel_count: Optional[int] = None): assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}" ops = [ RevertableDelete( *Prefixes.claim_effective_amount.pack_item( - name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position + name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position, + prev_activation_height ) ), RevertablePut( *Prefixes.claim_effective_amount.pack_item( - name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position + name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position, + activation_height ) ) ] @@ -88,6 +97,89 @@ def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_e return ops +def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, + previous_winning: Optional[ClaimTakeoverValue] = None): + if previous_winning: + # print(f"takeover previously owned {name} - {claim_hash.hex()} at {takeover_height}") + return [ + RevertableDelete( + *Prefixes.claim_takeover.pack_item( + name, previous_winning.claim_hash, previous_winning.height + ) + ), + RevertablePut( + *Prefixes.claim_takeover.pack_item( + name, claim_hash, takeover_height + ) + ) + ] + # print(f"takeover {name} - {claim_hash[::-1].hex()} at {takeover_height}") + return [ + RevertablePut( + *Prefixes.claim_takeover.pack_item( + name, claim_hash, takeover_height + ) + ) + ] + + +def get_force_activate_ops(name: str, tx_num: int, position: int, claim_hash: bytes, root_claim_tx_num: int, + root_claim_tx_position: int, amount: int, effective_amount: int, + prev_activation_height: int, new_activation_height: int): + return [ + # delete previous + RevertableDelete( + *Prefixes.claim_effective_amount.pack_item( + name, effective_amount, tx_num, position, claim_hash, + root_claim_tx_num, root_claim_tx_position, prev_activation_height + ) + ), + RevertableDelete( + *Prefixes.claim_to_txo.pack_item( + claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position, + amount, prev_activation_height, name + ) + ), + RevertableDelete( + *Prefixes.claim_short_id.pack_item( + name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num, + position, prev_activation_height + ) + ), + RevertableDelete( + *Prefixes.pending_activation.pack_item( + prev_activation_height, tx_num, position, claim_hash, name + ) + ), + + # insert new + RevertablePut( + *Prefixes.claim_effective_amount.pack_item( + name, effective_amount, tx_num, position, claim_hash, + root_claim_tx_num, root_claim_tx_position, new_activation_height + ) + ), + RevertablePut( + *Prefixes.claim_to_txo.pack_item( + claim_hash, tx_num, position, root_claim_tx_num, root_claim_tx_position, + amount, new_activation_height, name + ) + ), + RevertablePut( + *Prefixes.claim_short_id.pack_item( + name, claim_hash, root_claim_tx_num, root_claim_tx_position, tx_num, + position, new_activation_height + ) + ), + RevertablePut( + *Prefixes.pending_activation.pack_item( + new_activation_height, tx_num, position, claim_hash, name + ) + ) + + ] + + class StagedClaimtrieItem(typing.NamedTuple): name: str claim_hash: bytes @@ -119,21 +211,21 @@ def _get_add_remove_claim_utxo_ops(self, add=True): op( *Prefixes.claim_effective_amount.pack_item( self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash, - self.root_claim_tx_num, self.root_claim_tx_position + self.root_claim_tx_num, self.root_claim_tx_position, self.activation_height ) ), # claim tip by claim hash op( *Prefixes.claim_to_txo.pack_item( self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position, - self.amount, self.name + self.amount, self.activation_height, self.name ) ), # short url resolution op( *Prefixes.claim_short_id.pack_item( self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num, - self.position + self.position, self.activation_height ) ), # claim hash by txo @@ -146,6 +238,12 @@ def _get_add_remove_claim_utxo_ops(self, add=True): self.expiration_height, self.tx_num, self.position, self.claim_hash, self.name ) + ), + # claim activation + op( + *Prefixes.pending_activation.pack_item( + self.activation_height, self.tx_num, self.position, self.claim_hash, self.name + ) ) ] if self.signing_hash and self.claims_in_channel_count is not None: @@ -187,4 +285,3 @@ def get_abandon_ops(self, db) -> typing.List[RevertableOp]: delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash) invalidate_channel_ops = self.get_invalidate_channel_ops(db) return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops - diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index c69d4f3aca..c34bbb0e53 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -46,6 +46,7 @@ class EffectiveAmountValue(typing.NamedTuple): claim_hash: bytes root_tx_num: int root_position: int + activation: int class ClaimToTXOKey(typing.NamedTuple): @@ -58,6 +59,7 @@ class ClaimToTXOValue(typing.NamedTuple): root_tx_num: int root_position: int amount: int + activation: int name: str @@ -81,6 +83,7 @@ class ClaimShortIDKey(typing.NamedTuple): class ClaimShortIDValue(typing.NamedTuple): tx_num: int position: int + activation: int class ClaimToChannelKey(typing.NamedTuple): @@ -134,10 +137,30 @@ class ClaimExpirationValue(typing.NamedTuple): name: str +class ClaimTakeoverKey(typing.NamedTuple): + name: str + + +class ClaimTakeoverValue(typing.NamedTuple): + claim_hash: bytes + height: int + + +class PendingActivationKey(typing.NamedTuple): + height: int + tx_num: int + position: int + + +class PendingActivationValue(typing.NamedTuple): + claim_hash: bytes + name: str + + class EffectiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_effective_amount_prefix.value key_struct = struct.Struct(b'>QLH') - value_struct = struct.Struct(b'>20sLH') + value_struct = struct.Struct(b'>20sLHL') @classmethod def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): @@ -160,20 +183,20 @@ def unpack_value(cls, data: bytes) -> EffectiveAmountValue: return EffectiveAmountValue(*super().unpack_value(data)) @classmethod - def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int) -> bytes: - return super().pack_value(claim_hash, root_tx_num, root_position) + def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int, activation: int) -> bytes: + return super().pack_value(claim_hash, root_tx_num, root_position, activation) @classmethod def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes, - root_tx_num: int, root_position: int): + root_tx_num: int, root_position: int, activation: int): return cls.pack_key(name, effective_amount, tx_num, position), \ - cls.pack_value(claim_hash, root_tx_num, root_position) + cls.pack_value(claim_hash, root_tx_num, root_position, activation) class ClaimToTXOPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_to_txo.value key_struct = struct.Struct(b'>20sLH') - value_struct = struct.Struct(b'>LHQ') + value_struct = struct.Struct(b'>LHQL') @classmethod def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): @@ -190,21 +213,21 @@ def unpack_key(cls, key: bytes) -> ClaimToTXOKey: ) @classmethod - def unpack_value(cls, data: bytes) ->ClaimToTXOValue: - root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14]) - name_len = int.from_bytes(data[14:16], byteorder='big') - name = data[16:16 + name_len].decode() - return ClaimToTXOValue(root_tx_num, root_position, amount, name) + def unpack_value(cls, data: bytes) -> ClaimToTXOValue: + root_tx_num, root_position, amount, activation = cls.value_struct.unpack(data[:18]) + name_len = int.from_bytes(data[18:20], byteorder='big') + name = data[20:20 + name_len].decode() + return ClaimToTXOValue(root_tx_num, root_position, amount, activation, name) @classmethod - def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes: - return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name) + def pack_value(cls, root_tx_num: int, root_position: int, amount: int, activation: int, name: str) -> bytes: + return cls.value_struct.pack(root_tx_num, root_position, amount, activation) + length_encoded_name(name) @classmethod def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int, - amount: int, name: str): + amount: int, activation: int, name: str): return cls.pack_key(claim_hash, tx_num, position), \ - cls.pack_value(root_tx_num, root_position, amount, name) + cls.pack_value(root_tx_num, root_position, amount, activation, name) class TXOToClaimPrefixRow(PrefixRow): @@ -240,15 +263,15 @@ def pack_item(cls, tx_num: int, position: int, claim_hash: bytes, name: str): class ClaimShortIDPrefixRow(PrefixRow): prefix = DB_PREFIXES.claim_short_id_prefix.value key_struct = struct.Struct(b'>20sLH') - value_struct = struct.Struct(b'>LH') + value_struct = struct.Struct(b'>LHL') @classmethod def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int): return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position) @classmethod - def pack_value(cls, tx_num: int, position: int): - return super().pack_value(tx_num, position) + def pack_value(cls, tx_num: int, position: int, activation: int): + return super().pack_value(tx_num, position, activation) @classmethod def unpack_key(cls, key: bytes) -> ClaimShortIDKey: @@ -263,9 +286,9 @@ def unpack_value(cls, data: bytes) -> ClaimShortIDValue: @classmethod def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int, - tx_num: int, position: int): + tx_num: int, position: int, activation: int): return cls.pack_key(name, claim_hash, root_tx_num, root_position), \ - cls.pack_value(tx_num, position) + cls.pack_value(tx_num, position, activation) class ClaimToChannelPrefixRow(PrefixRow): @@ -418,6 +441,63 @@ def unpack_item(cls, key: bytes, value: bytes) -> typing.Tuple[ClaimExpirationKe return cls.unpack_key(key), cls.unpack_value(value) +class ClaimTakeoverPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_takeover.value + value_struct = struct.Struct(b'>20sL') + + @classmethod + def pack_key(cls, name: str): + return cls.prefix + length_encoded_name(name) + + @classmethod + def pack_value(cls, claim_hash: bytes, takeover_height: int): + return super().pack_value(claim_hash, takeover_height) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimTakeoverKey: + assert key[:1] == cls.prefix + name_len = int.from_bytes(key[1:3], byteorder='big') + name = key[3:3 + name_len].decode() + return ClaimTakeoverKey(name) + + @classmethod + def unpack_value(cls, data: bytes) -> ClaimTakeoverValue: + return ClaimTakeoverValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, name: str, claim_hash: bytes, takeover_height: int): + return cls.pack_key(name), cls.pack_value(claim_hash, takeover_height) + + +class PendingClaimActivationPrefixRow(PrefixRow): + prefix = DB_PREFIXES.pending_activation.value + key_struct = struct.Struct(b'>LLH') + + @classmethod + def pack_key(cls, height: int, tx_num: int, position: int): + return super().pack_key(height, tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> PendingActivationKey: + return PendingActivationKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, claim_hash: bytes, name: str) -> bytes: + return claim_hash + length_encoded_name(name) + + @classmethod + def unpack_value(cls, data: bytes) -> PendingActivationValue: + claim_hash = data[:20] + name_len = int.from_bytes(data[20:22], byteorder='big') + name = data[22:22 + name_len].decode() + return PendingActivationValue(claim_hash, name) + + @classmethod + def pack_item(cls, height: int, tx_num: int, position: int, claim_hash: bytes, name: str): + return cls.pack_key(height, tx_num, position), \ + cls.pack_value(claim_hash, name) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -432,4 +512,7 @@ class Prefixes: claim_effective_amount = EffectiveAmountPrefixRow claim_expiration = ClaimExpirationPrefixRow + claim_takeover = ClaimTakeoverPrefixRow + pending_activation = PendingClaimActivationPrefixRow + # undo_claimtrie = b'M' diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 533f7a780c..e081000a3e 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -16,6 +16,8 @@ import typing import struct import attr +import zlib +import base64 from typing import Optional, Iterable from functools import partial from asyncio import sleep @@ -34,9 +36,10 @@ from lbry.wallet.server.storage import db_class from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES -from lbry.wallet.server.db.prefixes import Prefixes +from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name -from lbry.wallet.server.db.claimtrie import get_expiration_height +from lbry.wallet.server.db.claimtrie import get_expiration_height, get_delay_for_name + from lbry.wallet.server.db.elasticsearch import SearchIndex @@ -79,7 +82,7 @@ class FlushData: adds = attr.ib() deletes = attr.ib() tip = attr.ib() - undo_claimtrie = attr.ib() + undo = attr.ib() class ResolveResult(typing.NamedTuple): @@ -89,6 +92,7 @@ class ResolveResult(typing.NamedTuple): position: int tx_hash: bytes height: int + amount: int short_url: str is_controlling: bool canonical_url: str @@ -97,12 +101,14 @@ class ResolveResult(typing.NamedTuple): expiration_height: int effective_amount: int support_amount: int - last_take_over_height: Optional[int] + last_takeover_height: Optional[int] claims_in_channel: Optional[int] channel_hash: Optional[bytes] reposted_claim_hash: Optional[bytes] +OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]] + DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll') DB_STATE_STRUCT_SIZE = 92 @@ -183,12 +189,10 @@ def __init__(self, env): self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int): - claim_hash_and_name = self.db.get( - DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx) - ) + claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx)) if not claim_hash_and_name: return - return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:] + return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name) def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]: key = Prefixes.support_to_claim.pack_key(tx_num, position) @@ -213,20 +217,22 @@ def get_supports(self, claim_hash: bytes): unpacked_k = Prefixes.claim_to_support.unpack_key(k) unpacked_v = Prefixes.claim_to_support.unpack_value(v) supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount)) - return supports def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int, - root_position: int) -> ResolveResult: + root_position: int, activation_height: int) -> ResolveResult: + controlling_claim = self.get_controlling_claim(name) + tx_hash = self.total_transactions[tx_num] height = bisect_right(self.tx_counts, tx_num) created_height = bisect_right(self.tx_counts, root_tx_num) - last_take_over_height = 0 - activation_height = created_height + last_take_over_height = controlling_claim.height expiration_height = get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) - effective_amount = self.get_effective_amount(claim_hash) + claim_amount = self.get_claim_txo_amount(claim_hash, tx_num, position) + + effective_amount = support_amount + claim_amount channel_hash = self.get_channel_for_claim(claim_hash) claims_in_channel = None @@ -235,27 +241,35 @@ def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, if channel_hash: channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash) if channel_vals: - _, _, _, channel_name, _, _ = channel_vals + channel_name = channel_vals[1].name claims_in_channel = self.get_claims_in_channel_count(channel_hash) canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}' return ResolveResult( - name, claim_hash, tx_num, position, tx_hash, height, short_url=short_url, - is_controlling=False, canonical_url=canonical_url, last_take_over_height=last_take_over_height, - claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height, + name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, + is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url, + last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, + creation_height=created_height, activation_height=activation_height, expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, channel_hash=channel_hash, reposted_claim_hash=None ) def _resolve(self, normalized_name: str, claim_id: Optional[str] = None, - amount_order: int = 1) -> Optional[ResolveResult]: + amount_order: Optional[int] = None) -> Optional[ResolveResult]: """ :param normalized_name: name :param claim_id: partial or complete claim id :param amount_order: '$' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided """ + if not amount_order and not claim_id: + # winning resolution + controlling = self.get_controlling_claim(normalized_name) + if not controlling: + return + return self._fs_get_claim_by_hash(controlling.claim_hash) encoded_name = length_encoded_name(normalized_name) amount_order = max(int(amount_order or 1), 1) + if claim_id: # resolve by partial/complete claim id short_claim_hash = bytes.fromhex(claim_id) @@ -263,19 +277,22 @@ def _resolve(self, normalized_name: str, claim_id: Optional[str] = None, for k, v in self.db.iterator(prefix=prefix): key = Prefixes.claim_short_id.unpack_key(k) claim_txo = Prefixes.claim_short_id.unpack_value(v) - return self._prepare_resolve_result(claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, - key.root_tx_num, key.root_position) + return self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, key.root_tx_num, + key.root_position, claim_txo.activation + ) return # resolve by amount ordering, 1 indexed - for idx, (k, v) in enumerate(self.db.iterator(prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)): + for idx, (k, v) in enumerate(self.db.iterator( + prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)): if amount_order > idx + 1: continue key = Prefixes.claim_effective_amount.unpack_key(k) claim_val = Prefixes.claim_effective_amount.unpack_value(v) return self._prepare_resolve_result( key.tx_num, key.position, claim_val.claim_hash, key.name, claim_val.root_tx_num, - claim_val.root_position + claim_val.root_position, claim_val.activation ) return @@ -293,7 +310,7 @@ def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str): return return list(sorted(candidates, key=lambda item: item[1]))[0] - def _fs_resolve(self, url): + def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: try: parsed = URL.parse(url) except ValueError as e: @@ -326,7 +343,7 @@ def _fs_resolve(self, url): return resolved_stream, resolved_channel - async def fs_resolve(self, url): + async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) def _fs_get_claim_by_hash(self, claim_hash): @@ -335,7 +352,7 @@ def _fs_get_claim_by_hash(self, claim_hash): unpacked_v = Prefixes.claim_to_txo.unpack_value(v) return self._prepare_resolve_result( unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name, - unpacked_v.root_tx_num, unpacked_v.root_position + unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.activation ) async def fs_getclaimbyid(self, claim_id): @@ -352,17 +369,21 @@ def get_root_claim_txo_and_current_amount(self, claim_hash): for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): unpacked_k = Prefixes.claim_to_txo.unpack_key(k) unpacked_v = Prefixes.claim_to_txo.unpack_value(v) - return unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.amount, unpacked_v.name,\ - unpacked_k.tx_num, unpacked_k.position + return unpacked_k, unpacked_v - def make_staged_claim_item(self, claim_hash: bytes) -> StagedClaimtrieItem: - root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount( - claim_hash - ) + def make_staged_claim_item(self, claim_hash: bytes) -> Optional[StagedClaimtrieItem]: + claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) + k, v = claim_info + root_tx_num = v.root_tx_num + root_idx = v.root_position + value = v.amount + name = v.name + tx_num = k.tx_num + idx = k.position height = bisect_right(self.tx_counts, tx_num) - effective_amount = self.db.get_support_amount(claim_hash) + value + effective_amount = self.get_support_amount(claim_hash) + value signing_hash = self.get_channel_for_claim(claim_hash) - activation_height = 0 + activation_height = v.activation if signing_hash: count = self.get_claims_in_channel_count(signing_hash) else: @@ -372,17 +393,36 @@ def make_staged_claim_item(self, claim_hash: bytes) -> StagedClaimtrieItem: root_tx_num, root_idx, signing_hash, count ) - def get_effective_amount(self, claim_hash): + def get_claim_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]: + v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash, tx_num, position)) + if v: + return Prefixes.claim_to_txo.unpack_value(v).amount + + def get_claim_from_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: + assert claim_hash for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False): - return Prefixes.claim_to_txo.unpack_value(v).amount + self.get_support_amount(claim_hash) - fnord - return None + return Prefixes.claim_to_txo.unpack_value(v) + + def get_claim_amount(self, claim_hash: bytes) -> Optional[int]: + claim = self.get_claim_from_txo(claim_hash) + if claim: + return claim.amount + + def get_effective_amount(self, claim_hash: bytes): + return (self.get_claim_amount(claim_hash) or 0) + self.get_support_amount(claim_hash) def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int): claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) if not claim_info: return [] - root_tx_num, root_position, amount, name, tx_num, position = claim_info + + root_tx_num = claim_info[1].root_tx_num + root_position = claim_info[1].root_position + amount = claim_info[1].amount + name = claim_info[1].name + tx_num = claim_info[0].tx_num + position = claim_info[0].position + activation = claim_info[1].activation signing_hash = self.get_channel_for_claim(claim_hash) claims_in_channel_count = None if signing_hash: @@ -390,7 +430,8 @@ def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: i prev_effective_amount = self.get_effective_amount(claim_hash) return get_update_effective_amount_ops( name, effective_amount, prev_effective_amount, tx_num, position, - root_tx_num, root_position, claim_hash, signing_hash, claims_in_channel_count + root_tx_num, root_position, claim_hash, activation, activation, signing_hash, + claims_in_channel_count ) def get_claims_in_channel_count(self, channel_hash) -> int: @@ -415,6 +456,35 @@ def get_expired_by_height(self, height: int): ) return expired + def get_controlling_claim(self, name: str) -> Optional[ClaimTakeoverValue]: + controlling = self.db.get(Prefixes.claim_takeover.pack_key(name)) + if not controlling: + return + return Prefixes.claim_takeover.unpack_value(controlling) + + def get_claims_for_name(self, name: str): + claim_hashes = set() + for k in self.db.iterator(prefix=Prefixes.claim_short_id.prefix + length_encoded_name(name), + include_value=False): + claim_hashes.add(Prefixes.claim_short_id.unpack_key(k).claim_hash) + return claim_hashes + + def get_activated_claims_at_height(self, height: int) -> typing.Set[PendingActivationValue]: + claims = set() + prefix = Prefixes.pending_activation.prefix + height.to_bytes(4, byteorder='big') + for _v in self.db.iterator(prefix=prefix, include_key=False): + v = Prefixes.pending_activation.unpack_value(_v) + claims.add(v) + return claims + + def get_activation_delay(self, claim_hash: bytes, name: str) -> int: + controlling = self.get_controlling_claim(name) + if not controlling: + return 0 + if claim_hash == controlling.claim_hash: + return 0 + return get_delay_for_name(self.db_height - controlling.height) + async def _read_tx_counts(self): if self.tx_counts is not None: return @@ -685,9 +755,9 @@ def flush_dbs(self, flush_data: FlushData): batch_delete(staged_change.key) flush_data.claimtrie_stash.clear() - for undo_claims, height in flush_data.undo_claimtrie: - batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims) - flush_data.undo_claimtrie.clear() + for undo_ops, height in flush_data.undo: + batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_ops) + flush_data.undo.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count @@ -788,11 +858,10 @@ def flush_backup(self, flush_data, touched): claim_reorg_height = self.fs_height # print("flush undos", flush_data.undo_claimtrie) - for (ops, height) in reversed(flush_data.undo_claimtrie): - claimtrie_ops = RevertableOp.unpack_stack(ops) - print("%i undo ops for %i" % (len(claimtrie_ops), height)) - for op in reversed(claimtrie_ops): - print("REWIND", op) + for (packed_ops, height) in reversed(flush_data.undo): + undo_ops = RevertableOp.unpack_stack(packed_ops) + for op in reversed(undo_ops): + # print("REWIND", op) if op.is_put: batch_put(op.key, op.value) else: @@ -800,7 +869,7 @@ def flush_backup(self, flush_data, touched): batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height)) claim_reorg_height -= 1 - flush_data.undo_claimtrie.clear() + flush_data.undo.clear() flush_data.claimtrie_stash.clear() while self.fs_height > flush_data.height: @@ -828,9 +897,9 @@ def flush_backup(self, flush_data, touched): batch_put(key, value) # New undo information - for undo_info, height in flush_data.undo_infos: + for undo_info, height in flush_data.undo: batch.put(self.undo_key(height), b''.join(undo_info)) - flush_data.undo_infos.clear() + flush_data.undo.clear() # Spends for key in sorted(flush_data.deletes): @@ -1023,9 +1092,9 @@ def min_undo_height(self, max_height): """Returns a height from which we should store undo info.""" return max_height - self.env.reorg_limit + 1 - def undo_key(self, height): + def undo_key(self, height: int) -> bytes: """DB key for undo information at the given height.""" - return UNDO_PREFIX + pack('>I', height) + return DB_PREFIXES.UNDO_PREFIX.value + pack('>I', height) def read_undo_info(self, height): """Read undo information from a file for the current height."""