Skip to content

Commit

Permalink
core, eth/filter: implement bloombits matcher, remove mip map filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Aug 19, 2017
1 parent b5042bd commit 900d968
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 463 deletions.
15 changes: 0 additions & 15 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
log.Crit("Failed to write block receipts", "err", err)
return
}
if err := WriteMipmapBloom(bc.chainDb, block.NumberU64(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
atomic.AddInt32(&failed, 1)
log.Crit("Failed to write log blooms", "err", err)
return
}
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
errs[index] = fmt.Errorf("failed to write lookup metadata: %v", err)
atomic.AddInt32(&failed, 1)
Expand Down Expand Up @@ -1018,10 +1012,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
return i, err
}
// Write map map bloom filters
if err := WriteMipmapBloom(bc.chainDb, block.NumberU64(), receipts); err != nil {
return i, err
}
// Write hash preimages
if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil {
return i, err
Expand Down Expand Up @@ -1179,11 +1169,6 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
return err
}
// Write map map bloom filters
receipts := GetBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64())
if err := WriteMipmapBloom(bc.chainDb, block.NumberU64(), receipts); err != nil {
return err
}
addedTxs = append(addedTxs, block.Transactions()...)
}

Expand Down
69 changes: 21 additions & 48 deletions core/database_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -48,9 +47,6 @@ var (
lookupPrefix = []byte("l") // lookupPrefix + hash -> transaction/receipt lookup metadata
preimagePrefix = "secure-key-" // preimagePrefix + hash -> preimage

mipmapPre = []byte("mipmap-log-bloom-")
MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000}

configPrefix = []byte("ethereum-config-") // config prefix for the db

// used by old db, now only used for conversion
Expand All @@ -59,10 +55,10 @@ var (

ErrChainConfigNotFound = errors.New("ChainConfig not found") // general config not found error

mipmapBloomMu sync.Mutex // protect against race condition when updating mipmap blooms

preimageCounter = metrics.NewCounter("db/preimage/total")
preimageHitCounter = metrics.NewCounter("db/preimage/hits")

bloomBitsPrefix = []byte("bloomBits-")
)

// txLookupEntry is a positional metadata to help looking up the data content of
Expand Down Expand Up @@ -497,48 +493,6 @@ func DeleteTxLookupEntry(db ethdb.Database, hash common.Hash) {
db.Delete(append(lookupPrefix, hash.Bytes()...))
}

// returns a formatted MIP mapped key by adding prefix, canonical number and level
//
// ex. fn(98, 1000) = (prefix || 1000 || 0)
func mipmapKey(num, level uint64) []byte {
lkey := make([]byte, 8)
binary.BigEndian.PutUint64(lkey, level)
key := new(big.Int).SetUint64(num / level * level)

return append(mipmapPre, append(lkey, key.Bytes()...)...)
}

// WriteMipmapBloom writes each address included in the receipts' logs to the
// MIP bloom bin.
func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error {
mipmapBloomMu.Lock()
defer mipmapBloomMu.Unlock()

batch := db.NewBatch()
for _, level := range MIPMapLevels {
key := mipmapKey(number, level)
bloomDat, _ := db.Get(key)
bloom := types.BytesToBloom(bloomDat)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
bloom.Add(log.Address.Big())
}
}
batch.Put(key, bloom.Bytes())
}
if err := batch.Write(); err != nil {
return fmt.Errorf("mipmap write fail for: %d: %v", number, err)
}
return nil
}

// GetMipmapBloom returns a bloom filter using the number and level as input
// parameters. For available levels see MIPMapLevels.
func GetMipmapBloom(db ethdb.Database, number, level uint64) types.Bloom {
bloomDat, _ := db.Get(mipmapKey(number, level))
return types.BytesToBloom(bloomDat)
}

// PreimageTable returns a Database instance with the key prefix for preimage entries.
func PreimageTable(db ethdb.Database) ethdb.Database {
return ethdb.NewTable(db, preimagePrefix)
Expand Down Expand Up @@ -637,3 +591,22 @@ func FindCommonAncestor(db ethdb.Database, a, b *types.Header) *types.Header {
}
return a
}

// GetBloomBits reads the compressed bloomBits vector belonging to the given section and bit index from the db
func GetBloomBits(db ethdb.Database, bitIdx, sectionIdx uint64, sectionHead common.Hash) ([]byte, error) {
var encKey [10]byte
binary.BigEndian.PutUint16(encKey[0:2], uint16(bitIdx))
binary.BigEndian.PutUint64(encKey[2:10], sectionIdx)
key := append(append(bloomBitsPrefix, encKey[:]...), sectionHead.Bytes()...)
bloomBits, err := db.Get(key)
return bloomBits, err
}

// StoreBloomBits writes the compressed bloomBits vector belonging to the given section and bit index to the db
func StoreBloomBits(db ethdb.Database, bitIdx, sectionIdx uint64, sectionHead common.Hash, bloomBits []byte) {
var encKey [10]byte
binary.BigEndian.PutUint16(encKey[0:2], uint16(bitIdx))
binary.BigEndian.PutUint64(encKey[2:10], sectionIdx)
key := append(append(bloomBitsPrefix, encKey[:]...), sectionHead.Bytes()...)
db.Put(key, bloomBits)
}
108 changes: 0 additions & 108 deletions core/database_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ package core

import (
"bytes"
"io/ioutil"
"math/big"
"os"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)

Expand Down Expand Up @@ -390,107 +386,3 @@ func TestBlockReceiptStorage(t *testing.T) {
t.Fatalf("deleted receipts returned: %v", rs)
}
}

func TestMipmapBloom(t *testing.T) {
db, _ := ethdb.NewMemDatabase()

receipt1 := new(types.Receipt)
receipt1.Logs = []*types.Log{
{Address: common.BytesToAddress([]byte("test"))},
{Address: common.BytesToAddress([]byte("address"))},
}
receipt2 := new(types.Receipt)
receipt2.Logs = []*types.Log{
{Address: common.BytesToAddress([]byte("test"))},
{Address: common.BytesToAddress([]byte("address1"))},
}

WriteMipmapBloom(db, 1, types.Receipts{receipt1})
WriteMipmapBloom(db, 2, types.Receipts{receipt2})

for _, level := range MIPMapLevels {
bloom := GetMipmapBloom(db, 2, level)
if !bloom.Test(new(big.Int).SetBytes([]byte("address1"))) {
t.Error("expected test to be included on level:", level)
}
}

// reset
db, _ = ethdb.NewMemDatabase()
receipt := new(types.Receipt)
receipt.Logs = []*types.Log{
{Address: common.BytesToAddress([]byte("test"))},
}
WriteMipmapBloom(db, 999, types.Receipts{receipt1})

receipt = new(types.Receipt)
receipt.Logs = []*types.Log{
{Address: common.BytesToAddress([]byte("test 1"))},
}
WriteMipmapBloom(db, 1000, types.Receipts{receipt})

bloom := GetMipmapBloom(db, 1000, 1000)
if bloom.TestBytes([]byte("test")) {
t.Error("test should not have been included")
}
}

func TestMipmapChain(t *testing.T) {
dir, err := ioutil.TempDir("", "mipmap")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

var (
db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))

hash1 = common.BytesToHash([]byte("topic1"))
)
defer db.Close()

gspec := &Genesis{
Config: params.TestChainConfig,
Alloc: GenesisAlloc{addr: {Balance: big.NewInt(1000000)}},
}
genesis := gspec.MustCommit(db)
chain, receipts := GenerateChain(params.TestChainConfig, genesis, db, 1010, func(i int, gen *BlockGen) {
var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr, Topics: []common.Hash{hash1}}}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
case 1000:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.Logs = []*types.Log{{Address: addr2}}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}

}

// store the receipts
WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
WriteBlock(db, block)
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
if err := WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}

bloom := GetMipmapBloom(db, 0, 1000)
if bloom.TestBytes(addr2[:]) {
t.Error("address was included in bloom and should not have")
}
}
14 changes: 14 additions & 0 deletions core/types/bloom9.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ func LogsBloom(logs []*Log) *big.Int {
return bin
}

type BloomIndexList [3]uint

// BloomIndexes returns the bloom filter bit indexes belonging to the given key
func BloomIndexes(b []byte) BloomIndexList {
b = crypto.Keccak256(b[:])

var r [3]uint
for i, _ := range r {
r[i] = (uint(b[i+i+1]) + (uint(b[i+i]) << 8)) & 2047
}

return r
}

func bloom9(b []byte) *big.Int {
b = crypto.Keccak256(b[:])

Expand Down
27 changes: 27 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -202,3 +203,29 @@ func (b *EthApiBackend) EventMux() *event.TypeMux {
func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}

func (b *EthApiBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
results := make([][]byte, len(sectionIdxList))
var err error
for i, sectionIdx := range sectionIdxList {
sectionHead := core.GetCanonicalHash(b.eth.chainDb, (sectionIdx+1)*bloomBitsSection-1)
results[i], err = core.GetBloomBits(b.eth.chainDb, bitIdx, sectionIdx, sectionHead)
if err != nil {
return nil, err
}
}
return results, nil
}

func (b *EthApiBackend) BloomBitsSections() uint64 {
sections, _, _ := b.eth.bbIndexer.Sections()
return sections
}

func (b *EthApiBackend) BloomBitsConfig() filters.BloomConfig {
return filters.BloomConfig{
SectionSize: bloomBitsSection,
MaxRequestLen: 16,
MaxRequestWait: 0,
}
}
10 changes: 6 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager

bbIndexer *core.ChainIndexer

ApiBackend *EthApiBackend

miner *miner.Miner
Expand Down Expand Up @@ -121,11 +123,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bbIndexer: NewBloomBitsProcessor(chainDb, bloomBitsSection),
}

if err := addMipmapBloomBins(chainDb); err != nil {
return nil, err
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

if !config.SkipBcVersionCheck {
Expand All @@ -147,6 +147,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bbIndexer.Start(eth.blockchain)

if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
Expand Down Expand Up @@ -268,7 +269,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
Service: filters.NewPublicFilterAPI(s.ApiBackend, false, bloomBitsSection),
Public: true,
}, {
Namespace: "admin",
Expand Down Expand Up @@ -389,6 +390,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
s.bbIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
Expand Down
Loading

0 comments on commit 900d968

Please sign in to comment.