Skip to content

Commit

Permalink
feat: add file-based transaction indexing (#546)
Browse files Browse the repository at this point in the history
# Description

This PR introduces file-based transaction indexing. As discussed in
#275, transaction index parsing should be done as a separate process
from the main node, meaning other services can be instantiated to index
transactions as readers.

The general architecture of the transaction indexers in this PR can be
described with the following image:
<img width="1481" alt="Architecture"
src="https://user-images.githubusercontent.com/16712663/221845299-ff552470-8efc-4134-8c3d-e71e74929acc.png">

Each concrete indexer implementation decides how to handle transaction
events, and where to store them.
Independent processes from the indexers themselves read these events (by
parsing files, logs, executing RPC queries...).

## File Indexer

The `file` transaction indexer that is included in this PR utilizes
`autofile.Group`s to write down transaction events.

Users can now specify to use the file-based indexer with the following
added flags to the `gnoland` command:
- `--tx-indexer-type` - specify the type of indexer (none is default)
- `--tx-indexer-path` - path for the file-based tx indexer

# Changes include

- [ ] Bugfix (non-breaking change that solves an issue)
- [ ] Hotfix (change that solves an urgent issue, and requires immediate
attention)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (change that is not backwards-compatible and/or
changes current functionality)

# Checklist (for contributors)

- [x] I have assigned this PR to myself
- [x] I have added at least 1 reviewer
- [x] I have added the relevant labels
- [ ] I have updated the official documentation
- [x] I have added sufficient documentation in code

# Testing

- [x] I have tested this code with the official test suite
- [x] I have tested this code manually

## Manual tests

- Manually executed transactions and verified they were saved to disk.
- Added unit tests that cover all added functionality.

# Additional comments

- [Relevant tendermint2
issue](tendermint/tendermint2#2)
- Resolves #275 

EDIT:
After comments from @jaekwon, this `Indexer` functionality has been
renamed to `EventStore`, and work on an independent indexer process
(process that can read from the event store) will begin soon that will
offer indexing functionality

cc @ilgooz
  • Loading branch information
zivkovicmilos authored Oct 19, 2023
1 parent 09dfe6e commit 0600d41
Show file tree
Hide file tree
Showing 18 changed files with 865 additions and 169 deletions.
62 changes: 62 additions & 0 deletions gno.land/cmd/gnoland/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"path/filepath"
Expand All @@ -17,6 +18,9 @@ import (
"github.com/gnolang/gno/tm2/pkg/bft/config"
"github.com/gnolang/gno/tm2/pkg/bft/node"
"github.com/gnolang/gno/tm2/pkg/bft/privval"
"github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file"
"github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/null"
eventstorecfg "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types"
bft "github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/commands"
"github.com/gnolang/gno/tm2/pkg/crypto"
Expand All @@ -35,6 +39,9 @@ type startCfg struct {
rootDir string
genesisMaxVMCycles int64
config string

txEventStoreType string
txEventStorePath string
}

func newStartCmd(io *commands.IO) *commands.Command {
Expand Down Expand Up @@ -116,6 +123,29 @@ func (c *startCfg) RegisterFlags(fs *flag.FlagSet) {
"",
"config file (optional)",
)

fs.StringVar(
&c.txEventStoreType,
"tx-event-store-type",
null.EventStoreType,
fmt.Sprintf(
"type of transaction event store [%s]",
strings.Join(
[]string{
null.EventStoreType,
file.EventStoreType,
},
", ",
),
),
)

fs.StringVar(
&c.txEventStorePath,
"tx-event-store-path",
"",
fmt.Sprintf("path for the file tx event store (required if event store is '%s')", file.EventStoreType),
)
}

func execStart(c *startCfg, args []string, io *commands.IO) error {
Expand Down Expand Up @@ -145,6 +175,14 @@ func execStart(c *startCfg, args []string, io *commands.IO) error {
writeGenesisFile(genDoc, genesisFilePath)
}

// Initialize the indexer config
txEventStoreCfg, err := getTxEventStoreConfig(c)
if err != nil {
return fmt.Errorf("unable to parse indexer config, %w", err)
}

cfg.TxEventStore = txEventStoreCfg

// create application and node.
gnoApp, err := gnoland.NewApp(rootDir, c.skipFailingGenesisTxs, logger, c.genesisMaxVMCycles)
if err != nil {
Expand Down Expand Up @@ -180,6 +218,30 @@ func execStart(c *startCfg, args []string, io *commands.IO) error {
select {} // run forever
}

// getTxEventStoreConfig constructs an event store config from provided user options
func getTxEventStoreConfig(c *startCfg) (*eventstorecfg.Config, error) {
var cfg *eventstorecfg.Config

switch c.txEventStoreType {
case file.EventStoreType:
if c.txEventStorePath == "" {
return nil, errors.New("unspecified file transaction indexer path")
}

// Fill out the configuration
cfg = &eventstorecfg.Config{
EventStoreType: file.EventStoreType,
Params: map[string]any{
file.Path: c.txEventStorePath,
},
}
default:
cfg = eventstorecfg.DefaultEventStoreConfig()
}

return cfg, nil
}

// Makes a local test genesis doc with local privValidator.
func makeGenesisDoc(
pvPub crypto.PubKey,
Expand Down
21 changes: 17 additions & 4 deletions tm2/pkg/autofile/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func (g *Group) OnStart() error {
// OnStop implements service.Service by stopping the goroutine described above.
// NOTE: g.Head must be closed separately using Close.
func (g *Group) OnStop() {
g.FlushAndSync()
if err := g.FlushAndSync(); err != nil {
g.Logger.Error(
fmt.Sprintf("unable to gracefully flush data, %s", err.Error()),
)
}
}

// Wait blocks until all internal goroutines are finished. Supposed to be
Expand All @@ -136,11 +140,20 @@ func (g *Group) Wait() {

// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
g.FlushAndSync()
if err := g.FlushAndSync(); err != nil {
g.Logger.Error(
fmt.Sprintf("unable to gracefully flush data, %s", err.Error()),
)
}

g.mtx.Lock()
_ = g.Head.Close()
g.mtx.Unlock()
defer g.mtx.Unlock()

if err := g.Head.Close(); err != nil {
g.Logger.Error(
fmt.Sprintf("unable to gracefully close group head, %s", err.Error()),
)
}
}

// HeadSizeLimit returns the current head size limit.
Expand Down
34 changes: 19 additions & 15 deletions tm2/pkg/bft/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
cns "github.com/gnolang/gno/tm2/pkg/bft/consensus/config"
mem "github.com/gnolang/gno/tm2/pkg/bft/mempool/config"
rpc "github.com/gnolang/gno/tm2/pkg/bft/rpc/config"
eventstore "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/types"
"github.com/gnolang/gno/tm2/pkg/errors"
osm "github.com/gnolang/gno/tm2/pkg/os"
p2p "github.com/gnolang/gno/tm2/pkg/p2p/config"
Expand All @@ -20,20 +21,22 @@ type Config struct {
BaseConfig `toml:",squash"`

// Options for services
RPC *rpc.RPCConfig `toml:"rpc"`
P2P *p2p.P2PConfig `toml:"p2p"`
Mempool *mem.MempoolConfig `toml:"mempool"`
Consensus *cns.ConsensusConfig `toml:"consensus"`
RPC *rpc.RPCConfig `toml:"rpc"`
P2P *p2p.P2PConfig `toml:"p2p"`
Mempool *mem.MempoolConfig `toml:"mempool"`
Consensus *cns.ConsensusConfig `toml:"consensus"`
TxEventStore *eventstore.Config `toml:"tx_event_store"`
}

// DefaultConfig returns a default configuration for a Tendermint node
func DefaultConfig() *Config {
return &Config{
BaseConfig: DefaultBaseConfig(),
RPC: rpc.DefaultRPCConfig(),
P2P: p2p.DefaultP2PConfig(),
Mempool: mem.DefaultMempoolConfig(),
Consensus: cns.DefaultConsensusConfig(),
BaseConfig: DefaultBaseConfig(),
RPC: rpc.DefaultRPCConfig(),
P2P: p2p.DefaultP2PConfig(),
Mempool: mem.DefaultMempoolConfig(),
Consensus: cns.DefaultConsensusConfig(),
TxEventStore: eventstore.DefaultEventStoreConfig(),
}
}

Expand Down Expand Up @@ -68,11 +71,12 @@ func LoadOrMakeConfigWithOptions(root string, options ConfigOptions) (cfg *Confi
// TestConfig returns a configuration that can be used for testing
func TestConfig() *Config {
return &Config{
BaseConfig: TestBaseConfig(),
RPC: rpc.TestRPCConfig(),
P2P: p2p.TestP2PConfig(),
Mempool: mem.TestMempoolConfig(),
Consensus: cns.TestConsensusConfig(),
BaseConfig: TestBaseConfig(),
RPC: rpc.TestRPCConfig(),
P2P: p2p.TestP2PConfig(),
Mempool: mem.TestMempoolConfig(),
Consensus: cns.TestConsensusConfig(),
TxEventStore: eventstore.DefaultEventStoreConfig(),
}
}

Expand Down Expand Up @@ -121,7 +125,7 @@ func (cfg *Config) ValidateBasic() error {
return nil
}

//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// BaseConfig

const (
Expand Down
Loading

0 comments on commit 0600d41

Please sign in to comment.