Skip to content

Commit

Permalink
separate dbs
Browse files Browse the repository at this point in the history
  • Loading branch information
Geoff Stuart committed Jan 5, 2023
1 parent 41490ff commit b686a5e
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 67 deletions.
146 changes: 146 additions & 0 deletions chain/eth_transaction_hash_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package chain

import (
"database/sql"

"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

var pragmas = []string{
"PRAGMA synchronous = normal",
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA read_uncommitted = ON",
}

var ddls = []string{
`CREATE TABLE IF NOT EXISTS tx_hash_lookup (
hash TEXT PRIMARY KEY,
cid TEXT NOT NULL
)`,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,

// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
}

const schemaVersion = 1

const (
insertTxHash = `INSERT OR IGNORE INTO tx_hash_lookup
(hash, cid)
VALUES(?, ?)`
)

type TransactionHashLookup struct {
db *sql.DB
}

func (ei *TransactionHashLookup) InsertTxHash(txHash ethtypes.EthHash, c cid.Cid) error {
hashEntry, err := ei.db.Prepare(insertTxHash)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}

_, err = hashEntry.Exec(txHash.String(), c.String())
return err
}

func (ei *TransactionHashLookup) LookupCidFromTxHash(txHash ethtypes.EthHash) (cid.Cid, error) {
q, err := ei.db.Query("SELECT cid FROM tx_hash_lookup WHERE hash = :hash;", sql.Named("hash", txHash.String()))
if err != nil {
return cid.Undef, err
}

var c string
if !q.Next() {
return cid.Undef, xerrors.Errorf("transaction hash %s not found", txHash.String())
}
err = q.Scan(&c)
if err != nil {
return cid.Undef, err
}
return cid.Decode(c)
}

func (ei *TransactionHashLookup) LookupTxHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
q, err := ei.db.Query("SELECT hash FROM tx_hash_lookup WHERE cid = :cid;", sql.Named("cid", c.String()))
if err != nil {
return ethtypes.EmptyEthHash, err
}

var hashString string
if !q.Next() {
return ethtypes.EmptyEthHash, xerrors.Errorf("transaction hash %s not found", c.String())
}
err = q.Scan(&hashString)
if err != nil {
return ethtypes.EmptyEthHash, err
}
return ethtypes.EthHashFromHex(hashString)
}

func NewTransactionHashLookup(path string) (*TransactionHashLookup, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
}

for _, pragma := range pragmas {
if _, err := db.Exec(pragma); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err)
}
}

q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if err == sql.ErrNoRows || !q.Next() {
// empty database, create the schema
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err)
}
}
} else if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else {
// Ensure we don't open a database from a different schema version

row := db.QueryRow("SELECT max(version) FROM _meta")
var version int
err := row.Scan(&version)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found")
}
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
}
}

return &TransactionHashLookup{
db: db,
}, nil
}

func (ei *TransactionHashLookup) Close() error {
if ei.db == nil {
return nil
}
return ei.db.Close()
}
54 changes: 0 additions & 54 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

var pragmas = []string{
Expand Down Expand Up @@ -53,11 +52,6 @@ var ddls = []string{
value BLOB NOT NULL
)`,

`CREATE TABLE IF NOT EXISTS tx_hash_lookup (
hash TEXT PRIMARY KEY,
cid TEXT NOT NULL
)`,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
Expand All @@ -77,60 +71,12 @@ const (
insertEntry = `INSERT OR IGNORE INTO event_entry
(event_id, indexed, flags, key, value)
VALUES(?, ?, ?, ?, ?)`

insertTxHash = `INSERT OR IGNORE INTO tx_hash_lookup
(hash, cid)
VALUES(?, ?)`
)

type EventIndex struct {
db *sql.DB
}

func (ei *EventIndex) InsertTxHash(txHash ethtypes.EthHash, c cid.Cid) error {
hashEntry, err := ei.db.Prepare(insertTxHash)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}

_, err = hashEntry.Exec(txHash.String(), c.String())
return err
}

func (ei *EventIndex) LookupCidFromTxHash(txHash ethtypes.EthHash) (cid.Cid, error) {
q, err := ei.db.Query("SELECT cid FROM tx_hash_lookup WHERE hash = :hash;", sql.Named("hash", txHash.String()))
if err != nil {
return cid.Undef, err
}

var c string
if !q.Next() {
return cid.Undef, xerrors.Errorf("transaction hash %s not found", txHash.String())
}
err = q.Scan(&c)
if err != nil {
return cid.Undef, err
}
return cid.Decode(c)
}

func (ei *EventIndex) LookupTxHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
q, err := ei.db.Query("SELECT hash FROM tx_hash_lookup WHERE cid = :cid;", sql.Named("cid", c.String()))
if err != nil {
return ethtypes.EmptyEthHash, err
}

var hashString string
if !q.Next() {
return ethtypes.EmptyEthHash, xerrors.Errorf("transaction hash %s not found", c.String())
}
err = q.Scan(&hashString)
if err != nil {
return ethtypes.EmptyEthHash, err
}
return ethtypes.EthHashFromHex(hashString)
}

func NewEventIndex(path string) (*EventIndex, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/actors"
builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/events/filter"
Expand Down Expand Up @@ -258,7 +259,7 @@ func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtype
var cid cid.Cid
if a.EthTxHashManager != nil {
var err error
cid, err = a.EthTxHashManager.EventIndex.LookupCidFromTxHash(*txHash)
cid, err = a.EthTxHashManager.TransactionHashLookup.LookupCidFromTxHash(*txHash)
if err != nil {
log.Debug("could not find transaction hash %s in lookup table", txHash.String())

Expand Down Expand Up @@ -324,7 +325,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash ethtype
var cid cid.Cid
if a.EthTxHashManager != nil {
var err error
cid, err = a.EthTxHashManager.EventIndex.LookupCidFromTxHash(txHash)
cid, err = a.EthTxHashManager.TransactionHashLookup.LookupCidFromTxHash(txHash)
if err != nil {
log.Debug("could not find transaction hash %s in lookup table", txHash.String())

Expand Down Expand Up @@ -1778,7 +1779,7 @@ func (m EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) err
return err
}

err = m.EventIndex.InsertTxHash(hash, smsg.Cid())
err = m.TransactionHashLookup.InsertTxHash(hash, smsg.Cid())
if err != nil {
return err
}
Expand All @@ -1789,8 +1790,8 @@ func (m EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) err
}

type EthTxHashManager struct {
StateAPI StateAPI
EventIndex *filter.EventIndex
StateAPI StateAPI
TransactionHashLookup *chain.TransactionHashLookup
}

func (m EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
Expand All @@ -1806,7 +1807,6 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
if u.Type != api.MpoolAdd {
continue
}
fmt.Println(">>>>>> Mpool Add ", u.Message.Signature.Type)
if u.Message.Signature.Type != crypto.SigTypeDelegated {
continue
}
Expand All @@ -1816,8 +1816,7 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
log.Errorf("error converting filecoin message to eth tx: %s", err)
}

fmt.Println(">>>>>> DB ADD ")
err = manager.EventIndex.InsertTxHash(ethTx.Hash, u.Message.Cid())
err = manager.TransactionHashLookup.InsertTxHash(ethTx.Hash, u.Message.Cid())
if err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
}
Expand Down
10 changes: 5 additions & 5 deletions node/modules/ethmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"go.uber.org/fx"

"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
Expand All @@ -31,20 +31,20 @@ func EthModuleAPI(cfg config.EthTxHashConfig) func(helpers.MetricsCtx, fx.Lifecy
return em, nil
}

eventIndex, err := filter.NewEventIndex(cfg.TransactionHashLookupDatabasePath)
transactionHashLookup, err := chain.NewTransactionHashLookup(cfg.TransactionHashLookupDatabasePath)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return eventIndex.Close()
return transactionHashLookup.Close()
},
})

ethTxHashManager := full.EthTxHashManager{
StateAPI: stateapi,
EventIndex: eventIndex,
StateAPI: stateapi,
TransactionHashLookup: transactionHashLookup,
}

em.EthTxHashManager = &ethTxHashManager
Expand Down

0 comments on commit b686a5e

Please sign in to comment.