Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: add global block logs cache #25459

Merged
merged 13 commits into from
Aug 19, 2022
31 changes: 13 additions & 18 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type SimulatedBackend struct {
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig
}
Expand All @@ -86,7 +87,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
blockchain: blockchain,
config: genesis.Config,
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback(blockchain.CurrentBlock())
return backend
}
Expand Down Expand Up @@ -689,7 +694,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaries to run from genesis to chain head
from := int64(0)
Expand All @@ -701,7 +706,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -827,7 +832,8 @@ type filterBackend struct {
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }

func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }

func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
Expand All @@ -853,19 +859,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
return nil, nil
}
receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config())
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config())
return logs, nil
}

Expand Down
11 changes: 9 additions & 2 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Warn users to migrate if they have a legacy freezer format.
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
firstIdx := uint64(0)
Expand All @@ -181,10 +183,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
}
}
// Configure GraphQL if requested

// Configure log filter RPC API.
filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth)

// Configure GraphQL if requested.
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
utils.RegisterGraphQLService(stack, backend, cfg.Node)
utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node)
}

// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
utils.CachePreimagesFlag,
utils.CacheLogSizeFlag,
utils.FDLimitFlag,
utils.ListenPortFlag,
utils.DiscoveryPortFlag,
Expand Down
34 changes: 29 additions & 5 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -491,6 +493,12 @@ var (
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
Category: flags.PerfCategory,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache.blocklogs",
Usage: "Size (in number of blocks) of the log cache for filtering",
Category: flags.PerfCategory,
Value: ethconfig.Defaults.FilterLogCacheSize,
}
FDLimitFlag = &cli.IntFlag{
Name: "fdlimit",
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
Expand Down Expand Up @@ -1808,6 +1816,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
}
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if !ctx.Bool(SnapshotFlag.Name) {
// If snap-sync is requested, this flag is also required
if cfg.SyncMode == downloader.SnapSync {
Expand Down Expand Up @@ -2005,21 +2016,34 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend
return backend.APIBackend, backend
}

// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
// the given node.
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) {
if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil {
Fatalf("Failed to register the Ethereum Stats service: %v", err)
}
}

// RegisterGraphQLService is a utility function to construct a new service and register it against a node.
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) {
if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil {
// RegisterGraphQLService adds the GraphQL API to the node.
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg *node.Config) {
err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts)
if err != nil {
Fatalf("Failed to register the GraphQL service: %v", err)
}
}

// RegisterFilterAPI adds the eth log filtering RPC API to the node.
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})
return filterSystem
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
Expand Down
14 changes: 2 additions & 12 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package eth
import (
"context"
"errors"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -202,17 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
return b.eth.blockchain.GetReceiptsByHash(hash), nil
}

func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
db := b.eth.ChainDb()
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, fmt.Errorf("failed to get block number for hash %#x", hash)
}
logs := rawdb.ReadLogs(db, hash, *number, b.eth.blockchain.Config())
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", *number, hash.TerminalString())
}
return logs, nil
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil
}

func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
Expand Down
5 changes: 0 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
}, {
Namespace: "eth",
Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute),
}, {
Namespace: "admin",
Service: NewAdminAPI(s),
Expand Down
4 changes: 4 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var Defaults = Config{
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
Miner: miner.Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Expand Down Expand Up @@ -171,6 +172,9 @@ type Config struct {
SnapshotCache int
Preimages bool

// This is the number of blocks for which logs will be cached in the filter system.
FilterLogCacheSize int

// Mining options
Miner miner.Config

Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,22 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type FilterAPI struct {
backend Backend
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
api := &FilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode),
sys: system,
events: NewEventSystem(system, lightMode),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
timeout: system.cfg.Timeout,
}
go api.timeoutLoop(timeout)
go api.timeoutLoop(system.cfg.Timeout)

return api
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
Loading