Skip to content

Commit

Permalink
ethdb, core: implement delete for db batch (#17101)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 authored and karalabe committed Jul 2, 2018
1 parent fdfd6d3 commit a4a2343
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 14 deletions.
9 changes: 6 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ func (bc *BlockChain) SetHead(head uint64) error {
defer bc.mu.Unlock()

// Rewind the header chain, deleting all block bodies until then
delFn := func(hash common.Hash, num uint64) {
rawdb.DeleteBody(bc.db, hash, num)
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
rawdb.DeleteBody(db, hash, num)
}
bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader()
Expand Down Expand Up @@ -1340,9 +1340,12 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
batch := bc.db.NewBatch()
for _, tx := range diff {
rawdb.DeleteTxLookupEntry(bc.db, tx.Hash())
rawdb.DeleteTxLookupEntry(batch, tx.Hash())
}
batch.Write()

if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
Expand Down
19 changes: 12 additions & 7 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// Delete any canonical number assignments above the new head
batch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(hc.chainDb, i)
rawdb.DeleteCanonicalHash(batch, i)
}
batch.Write()

// Overwrite any stale canonical number assignments
var (
headHash = header.ParentHash
Expand Down Expand Up @@ -438,7 +441,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {

// DeleteCallback is a callback function that is called by SetHead before
// each header is deleted.
type DeleteCallback func(common.Hash, uint64)
type DeleteCallback func(rawdb.DatabaseDeleter, common.Hash, uint64)

// SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set.
Expand All @@ -448,22 +451,24 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64()
}

batch := hc.chainDb.NewBatch()
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash()
num := hdr.Number.Uint64()
if delFn != nil {
delFn(hash, num)
delFn(batch, hash, num)
}
rawdb.DeleteHeader(hc.chainDb, hash, num)
rawdb.DeleteTd(hc.chainDb, hash, num)
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)

hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
rawdb.DeleteCanonicalHash(hc.chainDb, i)
rawdb.DeleteCanonicalHash(batch, i)
}
batch.Write()

// Clear out any stale content from the caches
hc.headerCache.Purge()
hc.tdCache.Purge()
Expand Down
10 changes: 10 additions & 0 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ func (b *ldbBatch) Put(key, value []byte) error {
return nil
}

func (b *ldbBatch) Delete(key []byte) error {
b.b.Delete(key)
b.size += 1
return nil
}

func (b *ldbBatch) Write() error {
return b.db.Write(b.b, nil)
}
Expand Down Expand Up @@ -453,6 +459,10 @@ func (tb *tableBatch) Put(key, value []byte) error {
return tb.batch.Put(append([]byte(tb.prefix), key...), value)
}

func (tb *tableBatch) Delete(key []byte) error {
return tb.batch.Delete(append([]byte(tb.prefix), key...))
}

func (tb *tableBatch) Write() error {
return tb.batch.Write()
}
Expand Down
8 changes: 7 additions & 1 deletion ethdb/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ type Putter interface {
Put(key []byte, value []byte) error
}

// Deleter wraps the database delete operation supported by both batches and regular databases.
type Deleter interface {
Delete(key []byte) error
}

// Database wraps all database operations. All methods are safe for concurrent use.
type Database interface {
Putter
Deleter
Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error)
Delete(key []byte) error
Close()
NewBatch() Batch
}
Expand All @@ -39,6 +44,7 @@ type Database interface {
// when Write is called. Batch cannot be used concurrently.
type Batch interface {
Putter
Deleter
ValueSize() int // amount of data in the batch
Write() error
// Reset resets the batch for reuse
Expand Down
9 changes: 9 additions & 0 deletions ethdb/memory_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,20 @@ func (b *memBatch) Put(key, value []byte) error {
return nil
}

func (b *memBatch) Delete(key []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), nil})
return nil
}

func (b *memBatch) Write() error {
b.db.lock.Lock()
defer b.db.lock.Unlock()

for _, kv := range b.writes {
if kv.v == nil {
delete(b.db.db, string(kv.k))
continue
}
b.db.db[string(kv.k)] = kv.v
}
return nil
Expand Down
10 changes: 7 additions & 3 deletions light/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,17 @@ func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number
// rollbackTxs marks the transactions contained in recently rolled back blocks
// as rolled back. It also removes any positional lookup entries.
func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
batch := pool.chainDb.NewBatch()
if list, ok := pool.mined[hash]; ok {
for _, tx := range list {
txHash := tx.Hash()
rawdb.DeleteTxLookupEntry(pool.chainDb, txHash)
rawdb.DeleteTxLookupEntry(batch, txHash)
pool.pending[txHash] = tx
txc.setState(txHash, false)
}
delete(pool.mined, hash)
}
batch.Write()
}

// reorgOnNewHead sets a new head header, processing (and rolling back if necessary)
Expand Down Expand Up @@ -504,14 +506,16 @@ func (self *TxPool) Content() (map[common.Address]types.Transactions, map[common
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()

var hashes []common.Hash
batch := self.chainDb.NewBatch()
for _, tx := range txs {
//self.RemoveTx(tx.Hash())
hash := tx.Hash()
delete(self.pending, hash)
self.chainDb.Delete(hash[:])
batch.Delete(hash.Bytes())
hashes = append(hashes, hash)
}
batch.Write()
self.relay.Discard(hashes)
}

Expand Down

0 comments on commit a4a2343

Please sign in to comment.