From f44bffee015ccdf2543206a1649bcb35190e6c7b Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Mon, 1 Aug 2022 16:13:43 +0200 Subject: [PATCH 01/13] core,eth: cache logs in blockchain --- core/blockchain.go | 5 +++++ core/blockchain_reader.go | 17 +++++++++++++++++ eth/api_backend.go | 12 +----------- eth/filters/filter.go | 4 ++++ 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 3638a1dcea16..42e96ed1ec7b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -88,6 +88,7 @@ const ( bodyCacheLimit = 256 blockCacheLimit = 256 receiptsCacheLimit = 32 + logsCacheLimit = 32 txLookupCacheLimit = 1024 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 @@ -198,6 +199,7 @@ type BlockChain struct { bodyCache *lru.Cache // Cache for the most recent block bodies bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format receiptsCache *lru.Cache // Cache for the most recent receipts per block + logsCache *lru.Cache // Cache for the most recent logs per block blockCache *lru.Cache // Cache for the most recent entire blocks txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing @@ -225,6 +227,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) receiptsCache, _ := lru.New(receiptsCacheLimit) + logsCache, _ := lru.New(logsCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) @@ -244,6 +247,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, + logsCache: logsCache, blockCache: blockCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, @@ -681,6 +685,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.receiptsCache.Purge() + bc.logsCache.Purge() bc.blockCache.Purge() bc.txLookupCache.Purge() bc.futureBlocks.Purge() diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 96e9f80b6aac..dc3fd5b22df8 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -222,6 +222,23 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { return receipts } +// GetLogsByHash retrieves the logs for all transactions in a given block. +func (bc *BlockChain) GetLogsByHash(hash common.Hash) [][]*types.Log { + if logs, ok := bc.logsCache.Get(hash); ok { + return logs.([][]*types.Log) + } + number := rawdb.ReadHeaderNumber(bc.db, hash) + if number == nil { + return nil + } + logs := rawdb.ReadLogs(bc.db, hash, *number, bc.chainConfig) + if logs == nil { + return nil + } + bc.logsCache.Add(hash, logs) + return logs +} + // GetUnclesInChain retrieves all the uncles from a given block backwards until // a specific distance is reached. func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header { diff --git a/eth/api_backend.go b/eth/api_backend.go index 1d8ba8ea5cae..eb312e89d8c9 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -19,7 +19,6 @@ package eth import ( "context" "errors" - "fmt" "math/big" "time" @@ -203,16 +202,7 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type } func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - db := b.eth.ChainDb() - number := rawdb.ReadHeaderNumber(db, hash) - if number == nil { - return nil, fmt.Errorf("failed to get block number for hash %#x", hash) - } - logs := rawdb.ReadLogs(db, hash, *number, b.eth.blockchain.Config()) - if logs == nil { - return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", *number, hash.TerminalString()) - } - return logs, nil + return b.eth.blockchain.GetLogsByHash(hash), nil } func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 9ff7ab7f55e1..b009b90b5b0f 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -19,6 +19,7 @@ package filters import ( "context" "errors" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -266,6 +267,9 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ if err != nil { return nil, err } + if logsList == nil { + return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", header.Number.Uint64(), header.Hash().TerminalString()) + } var unfiltered []*types.Log for _, logs := range logsList { unfiltered = append(unfiltered, logs...) From c1229c8118a99335645ded57c84838402fa3f1e4 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 2 Aug 2022 19:20:53 +0200 Subject: [PATCH 02/13] eth/filters: add global logs cache This adds a cache for block logs which is shared by all filters. In order to share the cache reference, filters now need to be created through a 'filter system' object. --- accounts/abi/bind/backends/simulated.go | 31 +++++----- cmd/geth/config.go | 20 ++++++- cmd/utils/flags.go | 6 +- core/blockchain.go | 5 -- core/blockchain_reader.go | 17 ------ eth/api_backend.go | 4 +- eth/backend.go | 5 -- eth/filters/api.go | 20 +++---- eth/filters/bench_test.go | 16 +++-- eth/filters/filter.go | 61 ++++++++++++-------- eth/filters/filter_system.go | 46 ++++++++++++++- eth/filters/filter_system_test.go | 77 +++++++++++-------------- eth/filters/filter_test.go | 20 +++---- graphql/graphql.go | 7 ++- graphql/graphql_test.go | 9 ++- graphql/service.go | 9 +-- internal/ethapi/backend.go | 13 ++--- les/api_backend.go | 7 +-- les/client.go | 4 -- 19 files changed, 205 insertions(+), 172 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index cd14afa14755..d03ea760483d 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -68,7 +68,8 @@ type SimulatedBackend struct { pendingState *state.StateDB // Currently pending state that will be the active on request pendingReceipts types.Receipts // Currently receipts for the pending block - events *filters.EventSystem // Event system for filtering log events live + events *filters.EventSystem // for filtering log events live + filterSystem *filters.FilterSystem // for filtering database logs config *params.ChainConfig } @@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis blockchain: blockchain, config: genesis.Config, } - backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false) + + filterBackend := &filterBackend{database, blockchain, backend} + backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) + backend.events = filters.NewEventSystem(backend.filterSystem, false) + backend.rollback(blockchain.CurrentBlock()) return backend } @@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter var filter *filters.Filter if query.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics) + filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics) } else { // Initialize unset filter boundaries to run from genesis to chain head from := int64(0) @@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter to = query.ToBlock.Int64() } // Construct the range filter - filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics) + filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -827,7 +832,8 @@ type filterBackend struct { backend *SimulatedBackend } -func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db } +func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db } + func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") } func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) { @@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil } -func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - number := rawdb.ReadHeaderNumber(fb.db, hash) - if number == nil { - return nil, nil - } - receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()) - if receipts == nil { - return nil, nil - } - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config()) return logs, nil } diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 2562de8ae9ea..2319a0e7972b 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -32,13 +32,16 @@ import ( "github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/naoina/toml" ) @@ -163,7 +166,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name) cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override } + backend, eth := utils.RegisterEthService(stack, &cfg.Eth) + // Warn users to migrate if they have a legacy freezer format. if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) { firstIdx := uint64(0) @@ -181,10 +186,21 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.") } } - // Configure GraphQL if requested + + // Enable filter RPC API. + filterBackend := backend.(filters.Backend) + filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{}) + isLightClient := cfg.Eth.SyncMode == downloader.LightSync + stack.RegisterAPIs([]rpc.API{{ + Namespace: "eth", + Service: filters.NewFilterAPI(filterSystem, isLightClient), + }}) + + // Configure GraphQL if requested. if ctx.IsSet(utils.GraphQLEnabledFlag.Name) { - utils.RegisterGraphQLService(stack, backend, cfg.Node) + utils.RegisterGraphQLService(stack, backend, filterSystem, cfg.Node) } + // Add the Ethereum Stats daemon if requested. if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ff85e259aea3..1c101917783a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -43,6 +43,7 @@ import ( ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" @@ -2014,8 +2015,9 @@ func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url strin } // RegisterGraphQLService is a utility function to construct a new service and register it against a node. -func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) { - if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil { +func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg node.Config) { + err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts) + if err != nil { Fatalf("Failed to register the GraphQL service: %v", err) } } diff --git a/core/blockchain.go b/core/blockchain.go index 42e96ed1ec7b..3638a1dcea16 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -88,7 +88,6 @@ const ( bodyCacheLimit = 256 blockCacheLimit = 256 receiptsCacheLimit = 32 - logsCacheLimit = 32 txLookupCacheLimit = 1024 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 @@ -199,7 +198,6 @@ type BlockChain struct { bodyCache *lru.Cache // Cache for the most recent block bodies bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format receiptsCache *lru.Cache // Cache for the most recent receipts per block - logsCache *lru.Cache // Cache for the most recent logs per block blockCache *lru.Cache // Cache for the most recent entire blocks txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing @@ -227,7 +225,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) receiptsCache, _ := lru.New(receiptsCacheLimit) - logsCache, _ := lru.New(logsCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) @@ -247,7 +244,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, - logsCache: logsCache, blockCache: blockCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, @@ -685,7 +681,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.receiptsCache.Purge() - bc.logsCache.Purge() bc.blockCache.Purge() bc.txLookupCache.Purge() bc.futureBlocks.Purge() diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index dc3fd5b22df8..96e9f80b6aac 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -222,23 +222,6 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { return receipts } -// GetLogsByHash retrieves the logs for all transactions in a given block. -func (bc *BlockChain) GetLogsByHash(hash common.Hash) [][]*types.Log { - if logs, ok := bc.logsCache.Get(hash); ok { - return logs.([][]*types.Log) - } - number := rawdb.ReadHeaderNumber(bc.db, hash) - if number == nil { - return nil - } - logs := rawdb.ReadLogs(bc.db, hash, *number, bc.chainConfig) - if logs == nil { - return nil - } - bc.logsCache.Add(hash, logs) - return logs -} - // GetUnclesInChain retrieves all the uncles from a given block backwards until // a specific distance is reached. func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header { diff --git a/eth/api_backend.go b/eth/api_backend.go index eb312e89d8c9..00ecacc31df7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -201,8 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type return b.eth.blockchain.GetReceiptsByHash(hash), nil } -func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - return b.eth.blockchain.GetLogsByHash(hash), nil +func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil } func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { diff --git a/eth/backend.go b/eth/backend.go index ebe7001c7994..778207636344 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -25,7 +25,6 @@ import ( "strings" "sync" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -41,7 +40,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), - }, { - Namespace: "eth", - Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute), }, { Namespace: "admin", Service: NewAdminAPI(s), diff --git a/eth/filters/api.go b/eth/filters/api.go index 07714791d263..98566d4a01bd 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -46,7 +46,7 @@ type filter struct { // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type FilterAPI struct { - backend Backend + sys *FilterSystem events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter @@ -54,14 +54,14 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI { +func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { api := &FilterAPI{ - backend: backend, - events: NewEventSystem(backend, lightMode), + sys: system, + events: NewEventSystem(system, lightMode), filters: make(map[rpc.ID]*filter), - timeout: timeout, + timeout: system.cfg.Timeout, } - go api.timeoutLoop(timeout) + go api.timeoutLoop(system.cfg.Timeout) return api } @@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type var filter *Filter if crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type end = crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo var filter *Filter if f.crit.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) } else { // Convert the RPC block numbers into internal representations begin := rpc.LatestBlockNumber.Int64() @@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo end = f.crit.ToBlock.Int64() } // Construct the range filter - filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index 694d73735028..73b96b77af62 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -122,22 +122,27 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { b.Log("Running filter benchmarks...") start = time.Now() - var backend *testBackend + var ( + backend *testBackend + sys *FilterSystem + ) for i := 0; i < benchFilterCnt; i++ { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "", false) backend = &testBackend{db: db, sections: cnt} + sys = NewFilterSystem(backend, Config{}) } var addr common.Address addr[0] = byte(i) addr[1] = byte(i / 256) - filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + filter := sys.NewRangeFilter(0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) if _, err := filter.Logs(context.Background()); err != nil { - b.Error("filter.Find error:", err) + b.Error("filter.Logs error:", err) } } + d = time.Since(start) b.Log("Finished running filter benchmarks") b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") @@ -171,10 +176,11 @@ func BenchmarkNoBloomBits(b *testing.B) { clearBloomBits(db) + _, sys := newTestFilterSystem(b, db, Config{}) + b.Log("Running filter benchmarks...") start := time.Now() - backend := &testBackend{db: db} - filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil) + filter := sys.NewRangeFilter(0, int64(*headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) b.Log("Finished running filter benchmarks") diff --git a/eth/filters/filter.go b/eth/filters/filter.go index b009b90b5b0f..53686f55e8aa 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) - GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) + GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) PendingBlockAndReceipts() (*types.Block, types.Receipts) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription @@ -49,9 +49,27 @@ type Backend interface { ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) } +// cachedGetLogs loads block logs from the backend and caches the result. +func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { + cached, ok := sys.logsCache.Get(blockHash) + if ok { + return cached.([][]*types.Log), nil + } + + logs, err := sys.backend.GetLogs(ctx, blockHash, number) + if err != nil { + return nil, err + } + if logs == nil { + return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) + } + sys.logsCache.Add(blockHash, logs) + return logs, nil +} + // Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend + sys *FilterSystem db ethdb.Database addresses []common.Address @@ -65,7 +83,7 @@ type Filter struct { // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. -func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -84,10 +102,10 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres } filters = append(filters, filter) } - size, _ := backend.BloomStatus() + size, _ := sys.backend.BloomStatus() // Create a generic filter and convert it into a range filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.matcher = bloombits.NewMatcher(size, filters) filter.begin = begin @@ -98,21 +116,21 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres // NewBlockFilter creates a new filter which directly inspects the contents of // a block to figure out whether it is interesting or not. -func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { +func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter - filter := newFilter(backend, addresses, topics) + filter := newFilter(sys, addresses, topics) filter.block = block return filter } // newFilter creates a generic filter that can either filter based on a block hash, // or based on range queries. The search criteria needs to be explicitly set. -func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { +func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ - backend: backend, + sys: sys, addresses: addresses, topics: topics, - db: backend.ChainDb(), + db: sys.backend.ChainDb(), } } @@ -121,7 +139,7 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return if f.block != (common.Hash{}) { - header, err := f.backend.HeaderByHash(ctx, f.block) + header, err := f.sys.backend.HeaderByHash(ctx, f.block) if err != nil { return nil, err } @@ -138,7 +156,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.pendingLogs() } // Figure out the limits of the filter range - header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } @@ -157,7 +175,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { var ( logs []*types.Log err error - size, sections = f.backend.BloomStatus() + size, sections = f.sys.backend.BloomStatus() ) if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { @@ -193,7 +211,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err } defer session.Close() - f.backend.ServiceFilter(ctx, session) + f.sys.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log @@ -212,7 +230,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err f.begin = int64(number) + 1 // Retrieve the suggested block and pull any truly matching logs - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } @@ -234,7 +252,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { - header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } @@ -262,14 +280,11 @@ func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*t // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { - // Get the logs of the block - logsList, err := f.backend.GetLogs(ctx, header.Hash()) + logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - if logsList == nil { - return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", header.Number.Uint64(), header.Hash().TerminalString()) - } + var unfiltered []*types.Log for _, logs := range logsList { unfiltered = append(unfiltered, logs...) @@ -278,7 +293,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client if logs[0].TxHash == (common.Hash{}) { - receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + receipts, err := f.sys.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } @@ -295,7 +310,7 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ // pendingLogs returns the logs matching the filter criteria within the pending block. func (f *Filter) pendingLogs() ([]*types.Log, error) { - block, receipts := f.backend.PendingBlockAndReceipts() + block, receipts := f.sys.backend.PendingBlockAndReceipts() if bloomFilter(block.Bloom(), f.addresses, f.topics) { var unfiltered []*types.Log for _, r := range receipts { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index c1a1b408b7a7..bcfa87d2c6c5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -32,8 +32,46 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" + lru "github.com/hashicorp/golang-lru" ) +// Config represents the configuration of the filter system. +type Config struct { + LogCacheSize int // maximum number of cached blocks (default: 32) + Timeout time.Duration // how long filters stay active (default: 5min) +} + +func (cfg Config) withDefaults() Config { + if cfg.Timeout == 0 { + cfg.Timeout = 5 * time.Minute + } + if cfg.LogCacheSize == 0 { + cfg.LogCacheSize = 32 + } + return cfg +} + +// FilterSystem holds resources shared by all filters. +type FilterSystem struct { + backend Backend + logsCache *lru.Cache + cfg *Config +} + +func NewFilterSystem(backend Backend, config Config) *FilterSystem { + config = config.withDefaults() + + cache, err := lru.New(config.LogCacheSize) + if err != nil { + panic(err) + } + return &FilterSystem{ + backend: backend, + logsCache: cache, + cfg: &config, + } +} + // Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. type Type byte @@ -84,6 +122,7 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { backend Backend + sys *FilterSystem lightMode bool lastHead *types.Header @@ -110,9 +149,10 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m := &EventSystem{ - backend: backend, + sys: sys, + backend: sys.backend, lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), @@ -405,7 +445,7 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // Get the logs of the block ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - logsList, err := es.backend.GetLogs(ctx, header.Hash()) + logsList, err := es.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c7fc4331b222..51bda29b4244 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -39,10 +39,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -var ( - deadline = 5 * time.Minute -) - type testBackend struct { db ethdb.Database sections uint64 @@ -91,17 +87,8 @@ func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types. return nil, nil } -func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - number := rawdb.ReadHeaderNumber(b.db, hash) - if number == nil { - return nil, nil - } - receipts := rawdb.ReadReceipts(b.db, hash, *number, params.TestChainConfig) - - logs := make([][]*types.Log, len(receipts)) - for i, receipt := range receipts { - logs[i] = receipt.Logs - } +func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + logs := rawdb.ReadLogs(b.db, hash, number, params.TestChainConfig) return logs, nil } @@ -160,6 +147,12 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { + backend := &testBackend{db: db} + sys := NewFilterSystem(backend, cfg) + return backend, sys +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -169,12 +162,12 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) - genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) - chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) + genesis = (&core.Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) + chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) for _, blk := range chain { @@ -221,9 +214,9 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -276,9 +269,9 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) testCases = []struct { crit FilterCriteria @@ -323,9 +316,9 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) ) // different situations where log filter creation should fail. @@ -346,8 +339,8 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + _, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -370,9 +363,9 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -484,9 +477,9 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, deadline) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -668,10 +661,10 @@ func TestPendingTxFilterDeadlock(t *testing.T) { timeout := 100 * time.Millisecond var ( - db = rawdb.NewMemoryDatabase() - backend = &testBackend{db: db} - api = NewFilterAPI(backend, false, timeout) - done = make(chan struct{}) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) + api = NewFilterAPI(sys, false) + done = make(chan struct{}) ) go func() { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index ae4f069048f3..3860cae69723 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -44,7 +44,7 @@ func BenchmarkFilters(b *testing.B) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false) - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(b, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) @@ -82,7 +82,7 @@ func BenchmarkFilters(b *testing.B) { } b.ResetTimer() - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { logs, _ := filter.Logs(context.Background()) @@ -97,7 +97,7 @@ func TestFilters(t *testing.T) { var ( db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false) - backend = &testBackend{db: db} + _, sys = newTestFilterSystem(t, db, Config{}) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -161,14 +161,14 @@ func TestFilters(t *testing.T) { rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) } - filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) + filter := sys.NewRangeFilter(0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -177,7 +177,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = sys.NewRangeFilter(990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -186,7 +186,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) + filter = sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { @@ -194,7 +194,7 @@ func TestFilters(t *testing.T) { } failHash := common.BytesToHash([]byte("fail")) - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { @@ -202,14 +202,14 @@ func TestFilters(t *testing.T) { } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil) + filter = sys.NewRangeFilter(0, -1, []common.Address{failAddr}, nil) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) + filter = sys.NewRangeFilter(0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { diff --git a/graphql/graphql.go b/graphql/graphql.go index 0949c34803cf..04af530d0c3d 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -978,7 +978,7 @@ func (b *Block) Logs(ctx context.Context, args struct{ Filter BlockFilterCriteri hash = header.Hash() } // Construct the range filter - filter := filters.NewBlockFilter(b.r.backend, hash, addresses, topics) + filter := b.r.filterSystem.NewBlockFilter(hash, addresses, topics) // Run the filter and return all the logs return runFilter(ctx, b.r, filter) @@ -1137,7 +1137,8 @@ func (p *Pending) EstimateGas(ctx context.Context, args struct { // Resolver is the top-level object in the GraphQL hierarchy. type Resolver struct { - backend ethapi.Backend + backend ethapi.Backend + filterSystem *filters.FilterSystem } func (r *Resolver) Block(ctx context.Context, args struct { @@ -1284,7 +1285,7 @@ func (r *Resolver) Logs(ctx context.Context, args struct{ Filter FilterCriteria topics = *args.Filter.Topics } // Construct the range filter - filter := filters.NewRangeFilter(r.backend, begin, end, addresses, topics) + filter := r.filterSystem.NewRangeFilter(begin, end, addresses, topics) return runFilter(ctx, r, filter) } diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index 4b7f7bf96021..d55f4e063486 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" @@ -50,7 +51,7 @@ func TestBuildSchema(t *testing.T) { } defer stack.Close() // Make sure the schema can be parsed and matched up to the object model. - if err := newHandler(stack, nil, []string{}, []string{}); err != nil { + if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { t.Errorf("Could not construct GraphQL handler: %v", err) } } @@ -263,7 +264,8 @@ func createGQLService(t *testing.T, stack *node.Node) { t.Fatalf("could not create import blocks: %v", err) } // create gql service - err = New(stack, ethBackend.APIBackend, []string{}, []string{}) + filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) + err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } @@ -348,7 +350,8 @@ func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { t.Fatalf("could not create import blocks: %v", err) } // create gql service - err = New(stack, ethBackend.APIBackend, []string{}, []string{}) + filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) + err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } diff --git a/graphql/service.go b/graphql/service.go index 1a2ffaa9469d..019026bc7ea7 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -20,6 +20,7 @@ import ( "encoding/json" "net/http" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/node" "github.com/graph-gophers/graphql-go" @@ -55,14 +56,14 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // New constructs a new GraphQL service instance. -func New(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error { - return newHandler(stack, backend, cors, vhosts) +func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { + return newHandler(stack, backend, filterSystem, cors, vhosts) } // newHandler returns a new `http.Handler` that will answer GraphQL queries. // It additionally exports an interactive query browser on the / endpoint. -func newHandler(stack *node.Node, backend ethapi.Backend, cors, vhosts []string) error { - q := Resolver{backend} +func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { + q := Resolver{backend, filterSystem} s, err := graphql.ParseSchema(schema, &q) if err != nil { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d13547f234a3..fa42139dded6 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -84,16 +83,12 @@ type Backend interface { TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription - // Filter API - BloomStatus() (uint64, uint64) - GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) - SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription - ChainConfig() *params.ChainConfig Engine() consensus.Engine + + // eth/filters needs to be initialized from this backend type, so methods needed by + // it must also be included here. + // filters.Backend } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/les/api_backend.go b/les/api_backend.go index 11a9ca128aab..5b4213134b24 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -168,11 +168,8 @@ func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (type return nil, nil } -func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return light.GetBlockLogs(ctx, b.eth.odr, hash, *number) - } - return nil, nil +func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) { + return light.GetBlockLogs(ctx, b.eth.odr, hash, number) } func (b *LesApiBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int { diff --git a/les/client.go b/les/client.go index 7caaf2c18a58..6504fe2af8f6 100644 --- a/les/client.go +++ b/les/client.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -298,9 +297,6 @@ func (s *LightEthereum) APIs() []rpc.API { }, { Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), - }, { - Namespace: "eth", - Service: filters.NewFilterAPI(s.ApiBackend, true, 5*time.Minute), }, { Namespace: "net", Service: s.netRPCService, From 7e10c2956b3017734435f3f449ec8b39d8340651 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 01:43:50 +0200 Subject: [PATCH 03/13] cmd/geth: add flag for log cache size --- cmd/geth/config.go | 13 +++---------- cmd/geth/main.go | 1 + cmd/utils/flags.go | 30 ++++++++++++++++++++++++++---- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 2319a0e7972b..02074650d836 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -34,14 +34,12 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rpc" "github.com/naoina/toml" ) @@ -187,18 +185,13 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } } - // Enable filter RPC API. - filterBackend := backend.(filters.Backend) - filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{}) + // Configure log filter RPC API. isLightClient := cfg.Eth.SyncMode == downloader.LightSync - stack.RegisterAPIs([]rpc.API{{ - Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, isLightClient), - }}) + filterSystem := utils.RegisterFilterAPI(ctx, stack, backend, isLightClient) // Configure GraphQL if requested. if ctx.IsSet(utils.GraphQLEnabledFlag.Name) { - utils.RegisterGraphQLService(stack, backend, filterSystem, cfg.Node) + utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node) } // Add the Ethereum Stats daemon if requested. diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c0f636fb26ff..8c32e0ea2d06 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -117,6 +117,7 @@ var ( utils.CacheSnapshotFlag, utils.CacheNoPrefetchFlag, utils.CachePreimagesFlag, + utils.CacheLogCacheSizeFlag, utils.FDLimitFlag, utils.ListenPortFlag, utils.DiscoveryPortFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1c101917783a..3fc4e16ca3b3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -65,6 +65,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" pcsclite "github.com/gballet/go-libpcsclite" gopsutil "github.com/shirou/gopsutil/mem" "github.com/urfave/cli/v2" @@ -492,6 +493,11 @@ var ( Usage: "Enable recording the SHA3/keccak preimages of trie keys", Category: flags.PerfCategory, } + CacheLogCacheSizeFlag = &cli.IntFlag{ + Name: "cache.logcache", + Usage: "Size of the block logs cache (for RPC filters)", + Category: flags.PerfCategory, + } FDLimitFlag = &cli.IntFlag{ Name: "fdlimit", Usage: "Raise the open file descriptor resource limit (default = system fd limit)", @@ -2006,22 +2012,38 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend return backend.APIBackend, backend } -// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to -// the given node. +// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node. func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) { if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil { Fatalf("Failed to register the Ethereum Stats service: %v", err) } } -// RegisterGraphQLService is a utility function to construct a new service and register it against a node. -func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg node.Config) { +// RegisterGraphQLService adds the GraphQL API to the node. +func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg *node.Config) { err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts) if err != nil { Fatalf("Failed to register the GraphQL service: %v", err) } } +// RegisterFilterAPI adds the eth log filtering RPC API to the node. +func RegisterFilterAPI(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isLightClient bool) *filters.FilterSystem { + var cfg filters.Config + if ctx.IsSet(CacheLogCacheSizeFlag.Name) { + cfg.LogCacheSize = ctx.Int(CacheLogCacheSizeFlag.Name) + } + + filterBackend := backend.(filters.Backend) + + filterSystem := filters.NewFilterSystem(filterBackend, cfg) + stack.RegisterAPIs([]rpc.API{{ + Namespace: "eth", + Service: filters.NewFilterAPI(filterSystem, isLightClient), + }}) + return filterSystem +} + func SetupMetrics(ctx *cli.Context) { if metrics.Enabled { log.Info("Enabling metrics collection") From 3eab20d95ccad642a674efc677ddd52d34791eba Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 01:55:43 +0200 Subject: [PATCH 04/13] eth/ethconfig: add setting for log cache size --- cmd/geth/config.go | 4 +--- cmd/utils/flags.go | 17 +++++++++-------- eth/ethconfig/config.go | 4 ++++ eth/ethconfig/gen_config.go | 6 ++++++ 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 02074650d836..30565fda6185 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" @@ -186,8 +185,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } // Configure log filter RPC API. - isLightClient := cfg.Eth.SyncMode == downloader.LightSync - filterSystem := utils.RegisterFilterAPI(ctx, stack, backend, isLightClient) + filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth) // Configure GraphQL if requested. if ctx.IsSet(utils.GraphQLEnabledFlag.Name) { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 3fc4e16ca3b3..fbd25f8cfeb1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -497,6 +497,7 @@ var ( Name: "cache.logcache", Usage: "Size of the block logs cache (for RPC filters)", Category: flags.PerfCategory, + Value: ethconfig.Defaults.FilterLogCacheSize, } FDLimitFlag = &cli.IntFlag{ Name: "fdlimit", @@ -1815,6 +1816,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100 } + if ctx.IsSet(CacheLogCacheSizeFlag.Name) { + cfg.FilterLogCacheSize = ctx.Int(CacheLogCacheSizeFlag.Name) + } if !ctx.Bool(SnapshotFlag.Name) { // If snap-sync is requested, this flag is also required if cfg.SyncMode == downloader.SnapSync { @@ -2028,15 +2032,12 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst } // RegisterFilterAPI adds the eth log filtering RPC API to the node. -func RegisterFilterAPI(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isLightClient bool) *filters.FilterSystem { - var cfg filters.Config - if ctx.IsSet(CacheLogCacheSizeFlag.Name) { - cfg.LogCacheSize = ctx.Int(CacheLogCacheSizeFlag.Name) - } - +func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem { filterBackend := backend.(filters.Backend) - - filterSystem := filters.NewFilterSystem(filterBackend, cfg) + isLightClient := ethcfg.SyncMode == downloader.LightSync + filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{ + LogCacheSize: ethcfg.FilterLogCacheSize, + }) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", Service: filters.NewFilterAPI(filterSystem, isLightClient), diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 7ba2faf791b4..69e43e3ecbe1 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -83,6 +83,7 @@ var Defaults = Config{ TrieDirtyCache: 256, TrieTimeout: 60 * time.Minute, SnapshotCache: 102, + FilterLogCacheSize: 32, Miner: miner.Config{ GasCeil: 30000000, GasPrice: big.NewInt(params.GWei), @@ -171,6 +172,9 @@ type Config struct { SnapshotCache int Preimages bool + // This is the number of blocks for which logs will be cached in the filter system. + FilterLogCacheSize int + // Mining options Miner miner.Config diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index a6528c8df5f7..9c7a04364d20 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -48,6 +48,7 @@ func (c Config) MarshalTOML() (interface{}, error) { TrieTimeout time.Duration SnapshotCache int Preimages bool + FilterLogCacheSize int Miner miner.Config Ethash ethash.Config TxPool core.TxPoolConfig @@ -93,6 +94,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TrieTimeout = c.TrieTimeout enc.SnapshotCache = c.SnapshotCache enc.Preimages = c.Preimages + enc.FilterLogCacheSize = c.FilterLogCacheSize enc.Miner = c.Miner enc.Ethash = c.Ethash enc.TxPool = c.TxPool @@ -142,6 +144,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { TrieTimeout *time.Duration SnapshotCache *int Preimages *bool + FilterLogCacheSize *int Miner *miner.Config Ethash *ethash.Config TxPool *core.TxPoolConfig @@ -250,6 +253,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.Preimages != nil { c.Preimages = *dec.Preimages } + if dec.FilterLogCacheSize != nil { + c.FilterLogCacheSize = *dec.FilterLogCacheSize + } if dec.Miner != nil { c.Miner = *dec.Miner } From 601bd8a91c48bc7f66233afbaaa2dcfd4daf4949 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 01:57:07 +0200 Subject: [PATCH 05/13] internal/ethapi: re-add filter methods to backend --- cmd/utils/flags.go | 3 +-- internal/ethapi/backend.go | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index fbd25f8cfeb1..f172f49b4ca1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2033,9 +2033,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSyst // RegisterFilterAPI adds the eth log filtering RPC API to the node. func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem { - filterBackend := backend.(filters.Backend) isLightClient := ethcfg.SyncMode == downloader.LightSync - filterSystem := filters.NewFilterSystem(filterBackend, filters.Config{ + filterSystem := filters.NewFilterSystem(backend, filters.Config{ LogCacheSize: ethcfg.FilterLogCacheSize, }) stack.RegisterAPIs([]rpc.API{{ diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index fa42139dded6..5b4ceb631069 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" @@ -88,7 +89,7 @@ type Backend interface { // eth/filters needs to be initialized from this backend type, so methods needed by // it must also be included here. - // filters.Backend + filters.Backend } func GetAPIs(apiBackend Backend) []rpc.API { From a05dbf1c55d13a77a2e74e6d15a9772424a757fa Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 13:23:50 +0200 Subject: [PATCH 06/13] eth/filters: move more code to filter_system.go --- eth/filters/filter.go | 39 ------------------------------------ eth/filters/filter_system.go | 39 ++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 53686f55e8aa..430b77a09235 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -19,54 +19,15 @@ package filters import ( "context" "errors" - "fmt" "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) -type Backend interface { - ChainDb() ethdb.Database - HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) - HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) - GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) - GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) - - SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription - SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription - SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription - SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription - - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) -} - -// cachedGetLogs loads block logs from the backend and caches the result. -func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { - cached, ok := sys.logsCache.Get(blockHash) - if ok { - return cached.([][]*types.Log), nil - } - - logs, err := sys.backend.GetLogs(ctx, blockHash, number) - if err != nil { - return nil, err - } - if logs == nil { - return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) - } - sys.logsCache.Add(blockHash, logs) - return logs, nil -} - // Filter can be used to retrieve and filter logs. type Filter struct { sys *FilterSystem diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bcfa87d2c6c5..79a9b089f422 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -27,8 +27,10 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" @@ -51,6 +53,24 @@ func (cfg Config) withDefaults() Config { return cfg } +type Backend interface { + ChainDb() ethdb.Database + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) + PendingBlockAndReceipts() (*types.Block, types.Receipts) + + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription + + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) +} + // FilterSystem holds resources shared by all filters. type FilterSystem struct { backend Backend @@ -58,6 +78,7 @@ type FilterSystem struct { cfg *Config } +// NewFilterSystem creates a filter system. func NewFilterSystem(backend Backend, config Config) *FilterSystem { config = config.withDefaults() @@ -72,6 +93,24 @@ func NewFilterSystem(backend Backend, config Config) *FilterSystem { } } +// cachedGetLogs loads block logs from the backend and caches the result. +func (sys *FilterSystem) cachedGetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) { + cached, ok := sys.logsCache.Get(blockHash) + if ok { + return cached.([][]*types.Log), nil + } + + logs, err := sys.backend.GetLogs(ctx, blockHash, number) + if err != nil { + return nil, err + } + if logs == nil { + return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", number, blockHash.TerminalString()) + } + sys.logsCache.Add(blockHash, logs) + return logs, nil +} + // Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. type Type byte From a0edae2da9848d95cf65113db38f34cb47c920a7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 13:55:35 +0200 Subject: [PATCH 07/13] eth/filters: remove db reference in Filter --- eth/filters/filter.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 430b77a09235..f2a22f1bfdb2 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rpc" ) @@ -32,7 +31,6 @@ import ( type Filter struct { sys *FilterSystem - db ethdb.Database addresses []common.Address topics [][]common.Hash @@ -91,7 +89,6 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common. sys: sys, addresses: addresses, topics: topics, - db: sys.backend.ChainDb(), } } From dfb6bd7d6bc1b3e166794f5375e0840dfdaf509a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Aug 2022 13:58:48 +0200 Subject: [PATCH 08/13] cmd/utils: update description of --cache.logcache --- cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f172f49b4ca1..cd871f91f9e5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -495,7 +495,7 @@ var ( } CacheLogCacheSizeFlag = &cli.IntFlag{ Name: "cache.logcache", - Usage: "Size of the block logs cache (for RPC filters)", + Usage: "Size (in number of blocks) of the log cache for filtering", Category: flags.PerfCategory, Value: ethconfig.Defaults.FilterLogCacheSize, } From e7d49e209a6d7677381d5c7a654164b6c871508a Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Thu, 4 Aug 2022 16:28:47 +0200 Subject: [PATCH 09/13] eth/filters: add shortcut for empty filter criteria --- eth/filters/filter.go | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f2a22f1bfdb2..0a70c9ece1db 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -104,7 +104,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { if header == nil { return nil, errors.New("unknown block") } - return f.blockLogs(ctx, header) + return f.blockLogs(ctx, header, false) } // Short-cut if all we care about is pending logs if f.begin == rpc.PendingBlockNumber.Int64() { @@ -192,7 +192,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err if header == nil || err != nil { return logs, err } - found, err := f.checkMatches(ctx, header) + found, err := f.blockLogs(ctx, header, true) if err != nil { return logs, err } @@ -214,7 +214,7 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e if header == nil || err != nil { return logs, err } - found, err := f.blockLogs(ctx, header) + found, err := f.blockLogs(ctx, header, false) if err != nil { return logs, err } @@ -224,30 +224,30 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e } // blockLogs returns the logs matching the filter criteria within a single block. -func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { - if bloomFilter(header.Bloom, f.addresses, f.topics) { - found, err := f.checkMatches(ctx, header) +func (f *Filter) blockLogs(ctx context.Context, header *types.Header, skipBloom bool) ([]*types.Log, error) { + // Fast track: no filtering criteria + if len(f.addresses) == 0 && len(f.topics) == 0 { + list, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { - return logs, err + return nil, err } - logs = append(logs, found...) + return flatten(list), nil + } else if skipBloom || bloomFilter(header.Bloom, f.addresses, f.topics) { + return f.checkMatches(ctx, header) } - return logs, nil + return nil, nil } // checkMatches checks if the receipts belonging to the given header contain any log events that // match the filter criteria. This function is called when the bloom filter signals a potential match. -func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*types.Log, error) { logsList, err := f.sys.cachedGetLogs(ctx, header.Hash(), header.Number.Uint64()) if err != nil { return nil, err } - var unfiltered []*types.Log - for _, logs := range logsList { - unfiltered = append(unfiltered, logs...) - } - logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + unfiltered := flatten(logsList) + logs := filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client if logs[0].TxHash == (common.Hash{}) { @@ -353,3 +353,11 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } return true } + +func flatten(list [][]*types.Log) []*types.Log { + var flat []*types.Log + for _, logs := range list { + flat = append(flat, logs...) + } + return flat +} From 66845892fd4bb9c9567424d92d1f0e33925b2521 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Fri, 5 Aug 2022 12:50:59 +0200 Subject: [PATCH 10/13] graphql: go through filter for tx logs --- graphql/graphql.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/graphql/graphql.go b/graphql/graphql.go index 04af530d0c3d..97b460c205ce 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -450,12 +450,36 @@ func (t *Transaction) CreatedContract(ctx context.Context, args BlockNumberArgs) } func (t *Transaction) Logs(ctx context.Context) (*[]*Log, error) { - receipt, err := t.getReceipt(ctx) - if err != nil || receipt == nil { + if _, err := t.resolve(ctx); err != nil { return nil, err } - ret := make([]*Log, 0, len(receipt.Logs)) - for _, log := range receipt.Logs { + if t.block == nil { + return nil, nil + } + if _, ok := t.block.numberOrHash.Hash(); !ok { + header, err := t.r.backend.HeaderByNumberOrHash(ctx, *t.block.numberOrHash) + if err != nil { + return nil, err + } + hash := header.Hash() + t.block.numberOrHash.BlockHash = &hash + } + return t.getLogs(ctx) +} + +// getLogs returns log objects for the given tx. +// Assumes block hash is resolved. +func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) { + var ( + hash, _ = t.block.numberOrHash.Hash() + filter = t.r.filterSystem.NewBlockFilter(hash, nil, nil) + logs, err = filter.Logs(ctx) + ) + if err != nil { + return nil, err + } + ret := make([]*Log, 0, len(logs)) + for _, log := range logs { ret = append(ret, &Log{ r: t.r, transaction: t, From 79c524c9f2ee8bffcde6312ef093870fea7efd84 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Fri, 5 Aug 2022 12:53:52 +0200 Subject: [PATCH 11/13] cmd: rename flag to cache.blocklogs --- cmd/geth/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 8c32e0ea2d06..b9e3ed31e813 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -117,7 +117,7 @@ var ( utils.CacheSnapshotFlag, utils.CacheNoPrefetchFlag, utils.CachePreimagesFlag, - utils.CacheLogCacheSizeFlag, + utils.CacheLogSizeFlag, utils.FDLimitFlag, utils.ListenPortFlag, utils.DiscoveryPortFlag, From 515f9f4a861da4647a24dcc0ae933e7de1a74b5a Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Mon, 8 Aug 2022 11:15:01 +0200 Subject: [PATCH 12/13] forgot to commit other file --- cmd/utils/flags.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index cd871f91f9e5..41da60998212 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -493,8 +493,8 @@ var ( Usage: "Enable recording the SHA3/keccak preimages of trie keys", Category: flags.PerfCategory, } - CacheLogCacheSizeFlag = &cli.IntFlag{ - Name: "cache.logcache", + CacheLogSizeFlag = &cli.IntFlag{ + Name: "cache.blocklogs", Usage: "Size (in number of blocks) of the log cache for filtering", Category: flags.PerfCategory, Value: ethconfig.Defaults.FilterLogCacheSize, @@ -1816,8 +1816,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100 } - if ctx.IsSet(CacheLogCacheSizeFlag.Name) { - cfg.FilterLogCacheSize = ctx.Int(CacheLogCacheSizeFlag.Name) + if ctx.IsSet(CacheLogSizeFlag.Name) { + cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name) } if !ctx.Bool(SnapshotFlag.Name) { // If snap-sync is requested, this flag is also required From eb4dc73eced4e1786ad9a7edfee36da6ac9a4a61 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Mon, 8 Aug 2022 16:42:57 +0200 Subject: [PATCH 13/13] register filter api in gethclient test --- ethclient/gethclient/gethclient_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ethclient/gethclient/gethclient_test.go b/ethclient/gethclient/gethclient_test.go index a0c17a034226..3bc01b13b289 100644 --- a/ethclient/gethclient/gethclient_test.go +++ b/ethclient/gethclient/gethclient_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" @@ -60,6 +61,12 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { if err != nil { t.Fatalf("can't create new ethereum service: %v", err) } + filterSystem := filters.NewFilterSystem(ethservice.APIBackend, filters.Config{}) + n.RegisterAPIs([]rpc.API{{ + Namespace: "eth", + Service: filters.NewFilterAPI(filterSystem, false), + }}) + // Import the test chain. if err := n.Start(); err != nil { t.Fatalf("can't start test node: %v", err)