Skip to content

Commit

Permalink
db state struct
Browse files Browse the repository at this point in the history
  • Loading branch information
jackrobison committed Feb 24, 2021
1 parent 66c50c1 commit 5698bb0
Showing 1 changed file with 53 additions and 75 deletions.
128 changes: 53 additions & 75 deletions lbry/wallet/server/leveldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,42 @@ class ResolveResult(typing.NamedTuple):
reposted_claim_hash: Optional[bytes]


DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')


class DBState(typing.NamedTuple):
genesis: bytes
height: int
tx_count: int
tip: bytes
utxo_flush_count: int
wall_time: int
first_sync: bool
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int

def pack(self) -> bytes:
return DB_STATE_STRUCT.pack(
self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count,
self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count,
self.comp_flush_count, self.comp_cursor
)

@classmethod
def unpack(cls, packed: bytes) -> 'DBState':
return cls(*DB_STATE_STRUCT.unpack(packed[:90]))


class LevelDB:
"""Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if
it was shutdown uncleanly.
"""

DB_VERSIONS = [6]
HIST_DB_VERSIONS = [0, 6]
DB_VERSIONS = HIST_DB_VERSIONS = [7]

class DBError(Exception):
"""Raised on general DB errors generally indicating corruption."""
Expand Down Expand Up @@ -469,16 +496,16 @@ async def _open_dbs(self, for_sync, compacting):
self.hist_flush_count = state['flush_count']
self.hist_comp_flush_count = state.get('comp_flush_count', -1)
self.hist_comp_cursor = state.get('comp_cursor', -1)
self.hist_db_version = state.get('db_version', 0)
self.hist_db_version = state.get('db_version', max(self.DB_VERSIONS))
else:
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.HIST_DB_VERSIONS)
self.hist_db_version = max(self.DB_VERSIONS)

self.logger.info(f'history DB version: {self.hist_db_version}')
if self.hist_db_version not in self.HIST_DB_VERSIONS:
msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
if self.hist_db_version not in self.DB_VERSIONS:
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
self.logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
Expand Down Expand Up @@ -630,17 +657,6 @@ def flush_utxo_db(self, batch, flush_data):
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip

def write_history_state(self, batch):
state = {
'flush_count': self.hist_flush_count,
'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.hist_comp_cursor,
'db_version': self.db_version,
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode())

def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
Expand Down Expand Up @@ -716,7 +732,6 @@ def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
for hashX in sorted(unflushed):
key = hashX + flush_id
batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
self.write_history_state(batch)

unflushed.clear()
self.hist_unflushed_count = 0
Expand Down Expand Up @@ -760,29 +775,18 @@ def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip

# self.flush_state(batch)
#
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)

# # Update and put the wall time again - otherwise we drop the
# # time it took to commit the batch
# # self.flush_state(self.db)
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)

self.write_db_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')

# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
Expand All @@ -794,14 +798,6 @@ def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')

# def flush_state(self, batch):
# """Flush chain state to the batch."""
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)

def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
assert not flush_data.headers
Expand Down Expand Up @@ -865,9 +861,6 @@ def flush_backup(self, flush_data, touched):
for key, value in puts.items():
batch_put(key, value)


self.write_history_state(batch)

# New undo information
for undo_info, height in flush_data.undo_infos:
batch.put(self.undo_key(height), b''.join(undo_info))
Expand Down Expand Up @@ -906,14 +899,12 @@ def flush_backup(self, flush_data, touched):
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip



# Flush state last as it reads the wall time.
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
self.write_db_state(batch)


self.logger.info(f'backing up removed {nremoves:,d} history entries')
Expand Down Expand Up @@ -1131,26 +1122,22 @@ def read_utxo_state(self):
self.wall_time = 0
self.first_sync = True
else:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
self.db_version = state['db_version']
state = DBState.unpack(state)
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
raise self.DBError(f'your UTXO DB version is {self.db_version} but this '
f'software only handles versions {self.DB_VERSIONS}')
# backwards compat
genesis_hash = state['genesis']
if isinstance(genesis_hash, bytes):
genesis_hash = genesis_hash.decode()
if genesis_hash != self.coin.GENESIS_HASH:
genesis_hash = state.genesis
if genesis_hash.hex() != self.coin.GENESIS_HASH:
raise self.DBError(f'DB genesis hash {genesis_hash} does not '
f'match coin {self.coin.GENESIS_HASH}')
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
self.db_height = state.height
self.db_tx_count = state.tx_count
self.db_tip = state.tip
self.utxo_flush_count = state.utxo_flush_count
self.wall_time = state.wall_time
self.first_sync = state.first_sync

# These are our state as we move ahead of DB state
self.fs_height = self.db_height
Expand All @@ -1169,24 +1156,15 @@ def read_utxo_state(self):
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')

def write_utxo_state(self, batch):
def write_db_state(self, batch):
"""Write (UTXO) state to the batch."""
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode())

def set_flush_count(self, count):
self.utxo_flush_count = count
with self.db.write_batch() as batch:
self.write_utxo_state(batch)
batch.put(
DB_PREFIXES.UTXO_STATE.value, DBState(
bytes.fromhex(self.coin.GENESIS_HASH), self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
).pack()
)

async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""
Expand Down

0 comments on commit 5698bb0

Please sign in to comment.