Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of eth_getLogs #583

Merged
merged 13 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ type SimulatedBackend struct {
database ethdb.Database // In memory database to store our testing data
blockchain *core.BlockChain // Ethereum blockchain to handle the consensus

mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on on request
mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig
}
Expand Down Expand Up @@ -94,9 +96,7 @@ func SimulateWalletAddressAndSignFn() (common.Address, func(account accounts.Acc

// XDC simulated backend for testing purpose.
func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfig *params.ChainConfig) *SimulatedBackend {
// database := ethdb.NewMemDatabase()
database := rawdb.NewMemoryDatabase()

genesis := core.Genesis{
GasLimit: gasLimit, // need this big, support initial smart contract
Config: chainConfig,
Expand Down Expand Up @@ -126,8 +126,12 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
}

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

blockchain.Client = backend
backend.rollback()
return backend
Expand All @@ -146,8 +150,12 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
}

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback()
return backend
}
Expand Down Expand Up @@ -400,7 +408,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}

// Include tx in chain.
blocks, _ := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
blocks, receipts := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
for _, tx := range b.pendingBlock.Transactions() {
block.AddTxWithChain(b.blockchain, tx)
}
Expand All @@ -410,6 +418,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
b.pendingReceipts = receipts[0]
return nil
}

Expand All @@ -421,7 +430,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
Expand All @@ -433,7 +442,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -523,8 +532,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
db ethdb.Database
bc *core.BlockChain
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
Expand All @@ -545,35 +555,51 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
func (fb *filterBackend) GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) {
if body := fb.bc.GetBody(hash); body != nil {
return body, nil
}
return nil, errors.New("block body not found")
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number)
return logs, nil
}

func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
return nullSubscription()
}

func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}

func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}

func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}

func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return nullSubscription()
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }

func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}

func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
1 change: 1 addition & 0 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
utils.CacheDatabaseFlag,
//utils.CacheGCFlag,
//utils.TrieCacheGenFlag,
utils.CacheLogSizeFlag,
utils.FDLimitFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
Expand Down
24 changes: 24 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ import (
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
"github.com/XinFinOrg/XDPoSChain/eth/filters"
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/internal/ethapi"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/metrics/exp"
Expand All @@ -54,6 +56,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rpc"
whisper "github.com/XinFinOrg/XDPoSChain/whisper/whisperv6"
gopsutil "github.com/shirou/gopsutil/mem"
"gopkg.in/urfave/cli.v1"
Expand Down Expand Up @@ -316,6 +319,11 @@ var (
Usage: "Percentage of cache memory allowance to use for trie pruning",
Value: 25,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache.blocklogs",
Usage: "Size (in number of blocks) of the log cache for filtering",
Value: ethconfig.Defaults.FilterLogCacheSize,
}
FDLimitFlag = cli.IntFlag{
Name: "fdlimit",
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
Expand Down Expand Up @@ -1244,6 +1252,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(GasPriceFlag.Name) {
cfg.GasPrice = GlobalBig(ctx, GasPriceFlag.Name)
}
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name)
Expand Down Expand Up @@ -1443,6 +1454,19 @@ func WalkMatch(root, pattern string) ([]string, error) {
return matches, nil
}

// RegisterFilterAPI adds the eth log filtering RPC API to the node.
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})
return filterSystem
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
Expand Down
3 changes: 1 addition & 2 deletions cmd/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
}
}

// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
// th egiven node.
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
func RegisterEthStatsService(stack *node.Node, url string) {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
// Retrieve both eth and les services
Expand Down
6 changes: 3 additions & 3 deletions core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func makeChainForBench(db ethdb.Database, full bool, count uint64) {
ReceiptHash: types.EmptyRootHash,
}
hash = header.Hash()
WriteHeader(db, header)
WriteCanonicalHash(db, hash, n)
rawdb.WriteHeader(db, header)
rawdb.WriteCanonicalHash(db, hash, n)
WriteTd(db, hash, n, big.NewInt(int64(n+1)))
if full || n == 0 {
block := types.NewBlockWithHeader(header)
WriteBody(db, hash, n, block.Body())
rawdb.WriteBody(db, hash, n, block.Body())
WriteBlockReceipts(db, hash, n, nil)
}
}
Expand Down
37 changes: 10 additions & 27 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
contractValidator "github.com/XinFinOrg/XDPoSChain/contracts/validator/contract"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
Expand Down Expand Up @@ -437,9 +438,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
if err := WriteHeadBlockHash(bc.db, currentBlock.Hash()); err != nil {
log.Crit("Failed to reset head full block", "err", err)
}
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil {
log.Crit("Failed to reset head fast block", "err", err)
}
Expand Down Expand Up @@ -586,9 +585,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
}
if err := WriteBlock(bc.db, genesis); err != nil {
log.Crit("Failed to write genesis block", "err", err)
}
rawdb.WriteBlock(bc.db, genesis)
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock, false)
bc.currentBlock.Store(bc.genesisBlock)
Expand Down Expand Up @@ -685,17 +682,9 @@ func (bc *BlockChain) insert(block *types.Block, writeBlock bool) {
updateHeads := GetCanonicalHash(bc.db, block.NumberU64()) != block.Hash()

// Add the block to the canonical chain number scheme and mark as the head
if err := WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()); err != nil {
log.Crit("Failed to insert block number", "err", err)
}
if err := WriteHeadBlockHash(bc.db, block.Hash()); err != nil {
log.Crit("Failed to insert head block hash", "err", err)
}
if writeBlock {
if err := WriteBlock(bc.db, block); err != nil {
log.Crit("Failed to insert block", "err", err)
}
}
JukLee0ira marked this conversation as resolved.
Show resolved Hide resolved
rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(bc.db, block.Hash())
rawdb.WriteBlock(bc.db, block)
bc.currentBlock.Store(block)

// save cache BlockSigners
Expand Down Expand Up @@ -1044,7 +1033,7 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
bc.currentBlock.Store(newBlock)
WriteHeadBlockHash(bc.db, newBlock.Hash())
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
}
}
}
Expand Down Expand Up @@ -1134,9 +1123,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return i, fmt.Errorf("failed to set receipts data: %v", err)
}
// Write all the data out into the database
if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return i, fmt.Errorf("failed to write block body: %v", err)
}
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return i, fmt.Errorf("failed to write block receipts: %v", err)
}
Expand Down Expand Up @@ -1196,9 +1183,7 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
return err
}
if err := WriteBlock(bc.db, block); err != nil {
return err
}
rawdb.WriteBlock(bc.db, block)
return nil
}

Expand Down Expand Up @@ -1226,9 +1211,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
}
// Write other block data using a batch.
batch := bc.db.NewBatch()
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
rawdb.WriteBlock(batch, block)
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
}
blockchain.mu.Lock()
WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
WriteBlock(blockchain.db, block)
rawdb.WriteBlock(blockchain.db, block)
statedb.Commit(true)
blockchain.mu.Unlock()
}
Expand All @@ -148,7 +148,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
blockchain.mu.Lock()
WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
WriteHeader(blockchain.db, header)
rawdb.WriteHeader(blockchain.db, header)
blockchain.mu.Unlock()
}
return nil
Expand Down
7 changes: 4 additions & 3 deletions core/chain_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package core

import (
"fmt"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"math/rand"
"testing"
"time"

"github.com/XinFinOrg/XDPoSChain/core/rawdb"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/types"
)
Expand Down Expand Up @@ -94,8 +95,8 @@ func testChainIndexer(t *testing.T, count int) {
if number > 0 {
header.ParentHash = GetCanonicalHash(db, number-1)
}
WriteHeader(db, header)
WriteCanonicalHash(db, header.Hash(), number)
rawdb.WriteHeader(db, header)
rawdb.WriteCanonicalHash(db, header.Hash(), number)
}
// Start indexer with an already existing chain
for i := uint64(0); i <= 100; i++ {
Expand Down
Loading