diff --git a/block/manager_test.go b/block/manager_test.go index a29fdd517e1..24bb05258d3 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -1,6 +1,7 @@ package block import ( + "context" "crypto/rand" "testing" "time" @@ -33,9 +34,12 @@ func TestInitialState(t *testing.T) { NextValidators: getRandomValidatorSet(), } - emptyStore := store.New(store.NewDefaultInMemoryKVStore()) + ctx := context.Background() + es, _ := store.NewDefaultInMemoryKVStore() + emptyStore := store.New(ctx, es) - fullStore := store.New(store.NewDefaultInMemoryKVStore()) + es2, _ := store.NewDefaultInMemoryKVStore() + fullStore := store.New(ctx, es2) err := fullStore.UpdateState(sampleState) require.NoError(t, err) diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index 33c0e1aabe9..0ea10862af4 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -7,11 +7,11 @@ import ( "time" "github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-datastore" "github.com/celestiaorg/go-cnc" "github.com/celestiaorg/rollmint/da" "github.com/celestiaorg/rollmint/log" - "github.com/celestiaorg/rollmint/store" "github.com/celestiaorg/rollmint/types" pb "github.com/celestiaorg/rollmint/types/pb/rollmint" ) @@ -37,7 +37,7 @@ type Config struct { } // Init initializes DataAvailabilityLayerClient instance. -func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore store.KVStore, logger log.Logger) error { +func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error { c.namespaceID = namespaceID c.logger = logger diff --git a/da/celestia/mock/server.go b/da/celestia/mock/server.go index e6665ec6158..55a665f8980 100644 --- a/da/celestia/mock/server.go +++ b/da/celestia/mock/server.go @@ -39,7 +39,11 @@ func NewServer(blockTime time.Duration, logger log.Logger) *Server { // Start starts HTTP server with given listener. 
func (s *Server) Start(listener net.Listener) error { - err := s.mock.Init([8]byte{}, []byte(s.blockTime.String()), store.NewDefaultInMemoryKVStore(), s.logger) + kvStore, err := store.NewDefaultInMemoryKVStore() + if err != nil { + return err + } + err = s.mock.Init([8]byte{}, []byte(s.blockTime.String()), kvStore, s.logger) if err != nil { return err } diff --git a/da/da.go b/da/da.go index 2d9d149c5ec..b7cd3648708 100644 --- a/da/da.go +++ b/da/da.go @@ -3,8 +3,9 @@ package da import ( "context" + ds "github.com/ipfs/go-datastore" + "github.com/celestiaorg/rollmint/log" - "github.com/celestiaorg/rollmint/store" "github.com/celestiaorg/rollmint/types" ) @@ -60,7 +61,7 @@ type ResultRetrieveBlocks struct { // It also contains life-cycle methods. type DataAvailabilityLayerClient interface { // Init is called once to allow DA client to read configuration and initialize resources. - Init(namespaceID types.NamespaceID, config []byte, kvStore store.KVStore, logger log.Logger) error + Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error // Start is called once, after Init. It's implementation should start operation of DataAvailabilityLayerClient. Start() error diff --git a/da/grpc/grpc.go b/da/grpc/grpc.go index 485778b5735..40b278372f6 100644 --- a/da/grpc/grpc.go +++ b/da/grpc/grpc.go @@ -7,9 +7,10 @@ import ( "google.golang.org/grpc" + ds "github.com/ipfs/go-datastore" + "github.com/celestiaorg/rollmint/da" "github.com/celestiaorg/rollmint/log" - "github.com/celestiaorg/rollmint/store" "github.com/celestiaorg/rollmint/types" "github.com/celestiaorg/rollmint/types/pb/dalc" ) @@ -41,7 +42,7 @@ var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{} var _ da.BlockRetriever = &DataAvailabilityLayerClient{} // Init sets the configuration options. 
-func (d *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, _ store.KVStore, logger log.Logger) error { +func (d *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, _ ds.Datastore, logger log.Logger) error { d.logger = logger if len(config) == 0 { d.config = DefaultConfig diff --git a/da/grpc/mockserv/cmd/main.go b/da/grpc/mockserv/cmd/main.go index ee16c67b26c..a1dd93aabf3 100644 --- a/da/grpc/mockserv/cmd/main.go +++ b/da/grpc/mockserv/cmd/main.go @@ -22,7 +22,10 @@ func main() { flag.StringVar(&conf.Host, "host", "0.0.0.0", "listening address") flag.Parse() - kv := store.NewDefaultKVStore(".", "db", "rollmint") + kv, err := store.NewDefaultKVStore(".", "db", "rollmint") + if err != nil { + log.Panic(err) + } lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port)) if err != nil { log.Panic(err) diff --git a/da/grpc/mockserv/mockserv.go b/da/grpc/mockserv/mockserv.go index 67cdbae4893..69d0f1601e3 100644 --- a/da/grpc/mockserv/mockserv.go +++ b/da/grpc/mockserv/mockserv.go @@ -3,19 +3,19 @@ package mockserv import ( "context" + ds "github.com/ipfs/go-datastore" tmlog "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" grpcda "github.com/celestiaorg/rollmint/da/grpc" "github.com/celestiaorg/rollmint/da/mock" - "github.com/celestiaorg/rollmint/store" "github.com/celestiaorg/rollmint/types" "github.com/celestiaorg/rollmint/types/pb/dalc" "github.com/celestiaorg/rollmint/types/pb/rollmint" ) // GetServer creates and returns gRPC server instance. 
-func GetServer(kv store.KVStore, conf grpcda.Config, mockConfig []byte, logger tmlog.Logger) *grpc.Server { +func GetServer(kv ds.Datastore, conf grpcda.Config, mockConfig []byte, logger tmlog.Logger) *grpc.Server { srv := grpc.NewServer() mockImpl := &mockImpl{} err := mockImpl.mock.Init([8]byte{}, mockConfig, kv, logger) diff --git a/da/mock/mock.go b/da/mock/mock.go index 1912552024f..66233240208 100644 --- a/da/mock/mock.go +++ b/da/mock/mock.go @@ -2,11 +2,13 @@ package mock import ( "context" - "encoding/binary" + "encoding/hex" "math/rand" "sync/atomic" "time" + ds "github.com/ipfs/go-datastore" + "github.com/celestiaorg/rollmint/da" "github.com/celestiaorg/rollmint/log" "github.com/celestiaorg/rollmint/store" @@ -17,7 +19,7 @@ import ( // It does actually ensures DA - it stores data in-memory. type DataAvailabilityLayerClient struct { logger log.Logger - dalcKV store.KVStore + dalcKV ds.Datastore daHeight uint64 config config } @@ -32,7 +34,7 @@ var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{} var _ da.BlockRetriever = &DataAvailabilityLayerClient{} // Init is called once to allow DA client to read configuration and initialize resources. 
-func (m *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, dalcKV store.KVStore, logger log.Logger) error { +func (m *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, dalcKV ds.Datastore, logger log.Logger) error { m.logger = logger m.dalcKV = dalcKV m.daHeight = 1 @@ -79,11 +81,12 @@ func (m *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} } - err = m.dalcKV.Set(getKey(daHeight, block.Header.Height), hash[:]) + err = m.dalcKV.Put(ctx, getKey(daHeight, block.Header.Height), hash[:]) if err != nil { return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} } - err = m.dalcKV.Set(hash[:], blob) + + err = m.dalcKV.Put(ctx, ds.NewKey(hex.EncodeToString(hash[:])), blob) if err != nil { return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} } @@ -109,14 +112,14 @@ func (m *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeig return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: "block not found"}} } - iter := m.dalcKV.PrefixIterator(getPrefix(daHeight)) - defer iter.Discard() + results, err := store.PrefixEntries(ctx, m.dalcKV, getPrefix(daHeight)) + if err != nil { + return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} + } var blocks []*types.Block - for iter.Valid() { - hash := iter.Value() - - blob, err := m.dalcKV.Get(hash) + for result := range results.Next() { + blob, err := m.dalcKV.Get(ctx, ds.NewKey(hex.EncodeToString(result.Entry.Value))) if err != nil { return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} } @@ -127,24 +130,17 @@ func (m *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeig return da.ResultRetrieveBlocks{BaseResult: 
da.BaseResult{Code: da.StatusError, Message: err.Error()}} } blocks = append(blocks, block) - - iter.Next() } return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusSuccess}, Blocks: blocks} } -func getPrefix(daHeight uint64) []byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, daHeight) - return b +func getPrefix(daHeight uint64) string { + return store.GenerateKey([]interface{}{daHeight}) } -func getKey(daHeight uint64, height uint64) []byte { - b := make([]byte, 16) - binary.BigEndian.PutUint64(b, daHeight) - binary.BigEndian.PutUint64(b[8:], height) - return b +func getKey(daHeight uint64, height uint64) ds.Key { + return ds.NewKey(store.GenerateKey([]interface{}{daHeight, height})) } func (m *DataAvailabilityLayerClient) updateDAHeight() { diff --git a/da/test/da_test.go b/da/test/da_test.go index c4b8967742d..11e044d9bc3 100644 --- a/da/test/da_test.go +++ b/da/test/da_test.go @@ -88,7 +88,8 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) { } conf, _ = json.Marshal(config) } - err := dalc.Init(testNamespaceID, conf, store.NewDefaultInMemoryKVStore(), test.NewLogger(t)) + kvStore, _ := store.NewDefaultInMemoryKVStore() + err := dalc.Init(testNamespaceID, conf, kvStore, test.NewLogger(t)) require.NoError(err) err = dalc.Start() @@ -149,7 +150,8 @@ func startMockGRPCServ(t *testing.T) *grpc.Server { conf := grpcda.DefaultConfig logger := tmlog.NewTMLogger(os.Stdout) - srv := mockserv.GetServer(store.NewDefaultInMemoryKVStore(), conf, []byte(mockDaBlockTime.String()), logger) + kvStore, _ := store.NewDefaultInMemoryKVStore() + srv := mockserv.GetServer(kvStore, conf, []byte(mockDaBlockTime.String()), logger) lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port)) if err != nil { t.Fatal(err) @@ -192,7 +194,8 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) { } conf, _ = json.Marshal(config) } - err := dalc.Init(testNamespaceID, conf, store.NewDefaultInMemoryKVStore(), 
test.NewLogger(t)) + kvStore, _ := store.NewDefaultInMemoryKVStore() + err := dalc.Init(testNamespaceID, conf, kvStore, test.NewLogger(t)) require.NoError(err) err = dalc.Start() diff --git a/go.mod b/go.mod index 19e2ae79a5b..4ebf9a0c407 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/rpc v1.2.0 github.com/gorilla/websocket v1.5.0 + github.com/ipfs/go-datastore v0.6.0 + github.com/ipfs/go-ds-badger3 v0.0.2 github.com/ipfs/go-log v1.0.5 github.com/libp2p/go-libp2p v0.22.0 github.com/libp2p/go-libp2p-kad-dht v0.20.0 @@ -73,7 +75,6 @@ require ( github.com/huin/goupnp v1.0.3 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/ipfs/go-cid v0.3.2 // indirect - github.com/ipfs/go-datastore v0.6.0 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect github.com/ipfs/go-ipns v0.2.0 // indirect github.com/ipfs/go-log/v2 v2.5.1 // indirect diff --git a/go.sum b/go.sum index 07c2f55cde9..cd8b08c0167 100644 --- a/go.sum +++ b/go.sum @@ -300,6 +300,8 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= +github.com/ipfs/go-ds-badger3 v0.0.2 h1:+pME0YfRnbUKhvySnakNMuCMsUUhmGfwIsH/nnHZ7QY= +github.com/ipfs/go-ds-badger3 v0.0.2/go.mod h1:6/yjF1KaOU+IpCaqMV43yoWIdxHqOAJlO9EhWLnZSkI= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= github.com/ipfs/go-ipns v0.2.0 h1:BgmNtQhqOw5XEZ8RAfWEpK4DhqaYiuP6h71MhIp7xXU= diff --git a/mempool/cache.go b/mempool/cache.go index 49428a02462..79bc8029366 100644 --- a/mempool/cache.go +++ b/mempool/cache.go @@ -3,8 +3,9 @@ package mempool 
import ( "container/list" - "github.com/tendermint/tendermint/types" "sync" + + "github.com/tendermint/tendermint/types" ) // TxCache defines an interface for raw transaction caching in a mempool. diff --git a/mempool/clist/clist.go b/mempool/clist/clist.go index 5579b1d0f23..2e4171b1c79 100644 --- a/mempool/clist/clist.go +++ b/mempool/clist/clist.go @@ -24,7 +24,6 @@ import ( const MaxLength = int(^uint(0) >> 1) /* - CElement is an element of a linked-list Traversal from a CElement is goroutine-safe. @@ -41,7 +40,6 @@ the for-loop. Use sync.Cond when you need serial access to the "condition". In our case our condition is if `next != nil || removed`, and there's no reason to serialize that condition for goroutines waiting on NextWait() (since it's just a read operation). - */ type CElement struct { mtx tmsync.RWMutex diff --git a/mempool/clist/clist_test.go b/mempool/clist/clist_test.go index d10a1e5ae9b..ccb50ca8392 100644 --- a/mempool/clist/clist_test.go +++ b/mempool/clist/clist_test.go @@ -68,6 +68,7 @@ func TestSmall(t *testing.T) { // This test is quite hacky because it relies on SetFinalizer // which isn't guaranteed to run at all. +// //nolint:unused,deadcode func _TestGCFifo(t *testing.T) { if runtime.GOARCH != "amd64" { @@ -117,6 +118,7 @@ func _TestGCFifo(t *testing.T) { // This test is quite hacky because it relies on SetFinalizer // which isn't guaranteed to run at all. 
+// //nolint:unused,deadcode func _TestGCRandom(t *testing.T) { if runtime.GOARCH != "amd64" { diff --git a/mempool/mempool.go b/mempool/mempool.go index 425e57d7f5a..f1819e488d8 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "errors" "fmt" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/types" ) diff --git a/node/integration_test.go b/node/integration_test.go index 34fa5830471..7fce22d1eda 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -257,7 +257,8 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn nodes := make([]*Node, num) apps := make([]*mocks.Application, num) dalc := &mockda.DataAvailabilityLayerClient{} - _ = dalc.Init([8]byte{}, nil, store.NewDefaultInMemoryKVStore(), log.TestingLogger()) + ds, _ := store.NewDefaultInMemoryKVStore() + _ = dalc.Init([8]byte{}, nil, ds, log.TestingLogger()) _ = dalc.Start() nodes[0], apps[0] = createNode(aggCtx, 0, isMalicious, true, dalc, keys, wg, t) for i := 1; i < num; i++ { diff --git a/node/node.go b/node/node.go index 52f434042d7..22b54950713 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" "github.com/libp2p/go-libp2p/core/crypto" "go.uber.org/multierr" @@ -35,9 +37,9 @@ import ( // prefixes used in KV store to separate main node data from DALC data var ( - mainPrefix = []byte{0} - dalcPrefix = []byte{1} - indexerPrefix = []byte{2} + mainPrefix = "0" + dalcPrefix = "1" + indexerPrefix = "2" // indexPrefix uses "i", so using "0-2" to avoid clash ) const ( @@ -99,18 +101,23 @@ func NewNode( return nil, err } - var baseKV store.KVStore + var baseKV ds.TxnDatastore + if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing logger.Info("WARNING: working in in-memory mode") - baseKV = store.NewDefaultInMemoryKVStore() + baseKV, err = 
store.NewDefaultInMemoryKVStore() } else { - baseKV = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") + baseKV, err = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "rollmint") + } + if err != nil { + return nil, err } - mainKV := store.NewPrefixKV(baseKV, mainPrefix) - dalcKV := store.NewPrefixKV(baseKV, dalcPrefix) - indexerKV := store.NewPrefixKV(baseKV, indexerPrefix) - s := store.New(mainKV) + mainKV := newPrefixKV(baseKV, mainPrefix) + dalcKV := newPrefixKV(baseKV, dalcPrefix) + indexerKV := newPrefixKV(baseKV, indexerPrefix) + + s := store.New(ctx, mainKV) dalc := registry.GetClient(conf.DALayer) if dalc == nil { @@ -121,7 +128,7 @@ func NewNode( return nil, fmt.Errorf("data availability layer client initialization error: %w", err) } - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, indexerKV, eventBus, logger) + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(ctx, conf, indexerKV, eventBus, logger) if err != nil { return nil, err } @@ -367,9 +374,14 @@ func (n *Node) newFraudProofValidator() p2p.GossipValidator { } } +func newPrefixKV(kvStore ds.Datastore, prefix string) ds.TxnDatastore { + return (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)}).Children()[0]).(ds.TxnDatastore) +} + func createAndStartIndexerService( + ctx context.Context, conf config.NodeConfig, - kvStore store.KVStore, + kvStore ds.TxnDatastore, eventBus *tmtypes.EventBus, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { @@ -379,8 +391,8 @@ func createAndStartIndexerService( blockIndexer indexer.BlockIndexer ) - txIndexer = kv.NewTxIndex(kvStore) - blockIndexer = blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) + txIndexer = kv.NewTxIndex(ctx, kvStore) + blockIndexer = blockidxkv.New(ctx, newPrefixKV(kvStore, "block_events")) indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) 
indexerService.SetLogger(logger.With("module", "txindex")) diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go index bc7f26333ba..d1cc7765462 100644 --- a/state/indexer/block/kv/kv.go +++ b/state/indexer/block/kv/kv.go @@ -8,7 +8,7 @@ import ( "strconv" "strings" - "github.com/google/orderedcode" + ds "github.com/ipfs/go-datastore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -24,25 +24,23 @@ var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) // events with an underlying KV store. Block events are indexed by their height, // such that matching search criteria returns the respective block height(s). type BlockerIndexer struct { - store store.KVStore + store ds.TxnDatastore + + ctx context.Context } -func New(store store.KVStore) *BlockerIndexer { +func New(ctx context.Context, store ds.TxnDatastore) *BlockerIndexer { return &BlockerIndexer{ store: store, + ctx: ctx, } } // Has returns true if the given height has been indexed. An error is returned // upon database query failure. 
func (idx *BlockerIndexer) Has(height int64) (bool, error) { - key, err := heightKey(height) - if err != nil { - return false, fmt.Errorf("failed to create block height index key: %w", err) - } - - _, err = idx.store.Get(key) - if err == store.ErrKeyNotFound { + _, err := idx.store.Get(idx.ctx, ds.NewKey(heightKey(height))) + if err == ds.ErrNotFound { return false, nil } return err == nil, err @@ -55,17 +53,16 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) { // BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height) // EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height) func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error { - batch := idx.store.NewBatch() - defer batch.Discard() + batch, err := idx.store.NewTransaction(idx.ctx, false) + if err != nil { + return fmt.Errorf("failed to create a new batch for transaction: %w", err) + } + defer batch.Discard(idx.ctx) height := bh.Header.Height // 1. index by height - key, err := heightKey(height) - if err != nil { - return fmt.Errorf("failed to create block height index key: %w", err) - } - if err := batch.Set(key, int64ToBytes(height)); err != nil { + if err := batch.Put(idx.ctx, ds.NewKey(heightKey(height)), int64ToBytes(height)); err != nil { return err } @@ -79,7 +76,7 @@ func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error { return fmt.Errorf("failed to index EndBlock events: %w", err) } - return batch.Commit() + return batch.Commit(idx.ctx) } // Search performs a query for block heights that match a given BeginBlock @@ -126,17 +123,13 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, // Extract ranges. If both upper and lower bounds exist, it's better to get // them in order as to not iterate over kvs that are not within range. 
ranges, rangeIndexes := indexer.LookForRanges(conditions) + if len(ranges) > 0 { skipIndexes = append(skipIndexes, rangeIndexes...) for _, qr := range ranges { - prefix, err := orderedcode.Append(nil, qr.Key) - if err != nil { - return nil, fmt.Errorf("failed to create prefix key: %w", err) - } - if !heightsInitialized { - filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true) + filteredHeights, err = idx.matchRange(ctx, qr, ds.NewKey(qr.Key).String(), filteredHeights, true) if err != nil { return nil, err } @@ -149,7 +142,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, break } } else { - filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false) + filteredHeights, err = idx.matchRange(ctx, qr, ds.NewKey(qr.Key).String(), filteredHeights, false) if err != nil { return nil, err } @@ -163,13 +156,10 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, continue } - startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand)) - if err != nil { - return nil, err - } + startKey := store.GenerateKey([]interface{}{c.CompositeKey, c.Operand}) if !heightsInitialized { - filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true) + filteredHeights, err = idx.match(ctx, c, ds.NewKey(startKey).String(), filteredHeights, true) if err != nil { return nil, err } @@ -182,7 +172,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, break } } else { - filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false) + filteredHeights, err = idx.match(ctx, c, ds.NewKey(string(startKey)).String(), filteredHeights, false) if err != nil { return nil, err } @@ -229,7 +219,7 @@ func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, func (idx *BlockerIndexer) matchRange( ctx context.Context, qr indexer.QueryRange, - startKey []byte, + startKey string, filteredHeights 
map[string][]byte, firstRun bool, ) (map[string][]byte, error) { @@ -244,34 +234,31 @@ func (idx *BlockerIndexer) matchRange( lowerBound := qr.LowerBoundValue() upperBound := qr.UpperBoundValue() - it := idx.store.PrefixIterator(startKey) - defer it.Discard() + results, err := store.PrefixEntries(ctx, idx.store, startKey) + if err != nil { + return nil, err + } -LOOP: - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true var ( - eventValue string - err error + v int64 + vStr string + err error ) if qr.Key == types.BlockHeightKey { - eventValue, err = parseValueFromPrimaryKey(it.Key()) + v, err = parseValueFromPrimaryKey(result.Entry.Key) } else { - eventValue, err = parseValueFromEventKey(it.Key()) + vStr = parseValueFromEventKey(result.Entry.Key) + v, err = strconv.ParseInt(vStr, 10, 64) } - if err != nil { continue } if _, ok := qr.AnyBound().(int64); ok { - v, err := strconv.ParseInt(eventValue, 10, 64) - if err != nil { - continue LOOP - } - include := true if lowerBound != nil && v < lowerBound.(int64) { include = false @@ -282,7 +269,7 @@ LOOP: } if include { - tmpHeights[string(it.Value())] = it.Value() + tmpHeights[string(result.Entry.Value)] = result.Entry.Value } } @@ -298,10 +285,6 @@ LOOP: } } - if err := it.Error(); err != nil { - return nil, err - } - if len(tmpHeights) == 0 || firstRun { // Either: // @@ -346,7 +329,7 @@ LOOP: func (idx *BlockerIndexer) match( ctx context.Context, c query.Condition, - startKeyBz []byte, + startKeyBz string, filteredHeights map[string][]byte, firstRun bool, ) (map[string][]byte, error) { @@ -361,34 +344,30 @@ func (idx *BlockerIndexer) match( switch { case c.Op == query.OpEqual: - it := idx.store.PrefixIterator(startKeyBz) - defer it.Discard() + results, err := store.PrefixEntries(ctx, idx.store, startKeyBz) + if err != nil { + return nil, err + } - for ; it.Valid(); it.Next() { - tmpHeights[string(it.Value())] = it.Value() + for result := range results.Next() { + 
tmpHeights[string(result.Entry.Value)] = result.Entry.Value if err := ctx.Err(); err != nil { break } } - if err := it.Error(); err != nil { - return nil, err - } - case c.Op == query.OpExists: - prefix, err := orderedcode.Append(nil, c.CompositeKey) + + results, err := store.PrefixEntries(ctx, idx.store, ds.NewKey(c.CompositeKey).String()) if err != nil { return nil, err } - it := idx.store.PrefixIterator(prefix) - defer it.Discard() - - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - tmpHeights[string(it.Value())] = it.Value() + tmpHeights[string(result.Entry.Value)] = result.Entry.Value select { case <-ctx.Done(): @@ -402,29 +381,20 @@ func (idx *BlockerIndexer) match( } } - if err := it.Error(); err != nil { - return nil, err - } - case c.Op == query.OpContains: - prefix, err := orderedcode.Append(nil, c.CompositeKey) + + results, err := store.PrefixEntries(ctx, idx.store, ds.NewKey(c.CompositeKey).String()) if err != nil { return nil, err } - it := idx.store.PrefixIterator(prefix) - defer it.Discard() - - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - eventValue, err := parseValueFromEventKey(it.Key()) - if err != nil { - continue - } + eventValue := parseValueFromEventKey(result.Entry.Key) if strings.Contains(eventValue, c.Operand.(string)) { - tmpHeights[string(it.Value())] = it.Value() + tmpHeights[string(result.Entry.Value)] = result.Entry.Value } select { @@ -438,9 +408,6 @@ func (idx *BlockerIndexer) match( break } } - if err := it.Error(); err != nil { - return nil, err - } default: return nil, errors.New("other operators should be handled already") @@ -481,7 +448,7 @@ func (idx *BlockerIndexer) match( return filteredHeights, nil } -func (idx *BlockerIndexer) indexEvents(batch store.Batch, events []abci.Event, typ string, height int64) error { +func (idx *BlockerIndexer) indexEvents(batch ds.Txn, events []abci.Event, typ string, height int64) error { heightBz := 
int64ToBytes(height) for _, event := range events { @@ -502,12 +469,9 @@ func (idx *BlockerIndexer) indexEvents(batch store.Batch, events []abci.Event, t } if attr.GetIndex() { - key, err := eventKey(compositeKey, typ, string(attr.Value), height) - if err != nil { - return fmt.Errorf("failed to create block index key: %w", err) - } + key := eventKey(compositeKey, typ, string(attr.Value), height) - if err := batch.Set(key, heightBz); err != nil { + if err := batch.Put(idx.ctx, ds.NewKey(key), heightBz); err != nil { return err } } diff --git a/state/indexer/block/kv/kv_test.go b/state/indexer/block/kv/kv_test.go index 59a15b77f43..c0d4af6db3a 100644 --- a/state/indexer/block/kv/kv_test.go +++ b/state/indexer/block/kv/kv_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -15,8 +17,10 @@ import ( ) func TestBlockIndexer(t *testing.T) { - prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events")) - indexer := blockidxkv.New(prefixStore) + kvStore, err := store.NewDefaultInMemoryKVStore() + require.NoError(t, err) + prefixStore := (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey("block_events")}).Children()[0]).(ds.TxnDatastore) + indexer := blockidxkv.New(context.Background(), prefixStore) require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ Header: types.Header{Height: 1}, diff --git a/state/indexer/block/kv/util.go b/state/indexer/block/kv/util.go index c0b88018e06..844a67f36c4 100644 --- a/state/indexer/block/kv/util.go +++ b/state/indexer/block/kv/util.go @@ -4,8 +4,9 @@ import ( "encoding/binary" "fmt" "strconv" + "strings" - "github.com/google/orderedcode" + "github.com/celestiaorg/rollmint/store" "github.com/tendermint/tendermint/libs/pubsub/query" 
"github.com/tendermint/tendermint/types" ) @@ -31,58 +32,26 @@ func int64ToBytes(i int64) []byte { return buf[:n] } -func heightKey(height int64) ([]byte, error) { - return orderedcode.Append( - nil, - types.BlockHeightKey, - height, - ) +func heightKey(height int64) string { + return store.GenerateKey([]interface{}{types.BlockHeightKey, height}) } -func eventKey(compositeKey, typ, eventValue string, height int64) ([]byte, error) { - return orderedcode.Append( - nil, - compositeKey, - eventValue, - height, - typ, - ) +func eventKey(compositeKey, typ, eventValue string, height int64) string { + return store.GenerateKey([]interface{}{compositeKey, eventValue, height, typ}) } -func parseValueFromPrimaryKey(key []byte) (string, error) { - var ( - compositeKey string - height int64 - ) - - remaining, err := orderedcode.Parse(string(key), &compositeKey, &height) +func parseValueFromPrimaryKey(key string) (int64, error) { + parts := strings.SplitN(key, "/", 3) + height, err := strconv.ParseInt(parts[2], 10, 64) if err != nil { - return "", fmt.Errorf("failed to parse event key: %w", err) - } - - if len(remaining) != 0 { - return "", fmt.Errorf("unexpected remainder in key: %s", remaining) + return 0, fmt.Errorf("failed to parse event key: %w", err) } - - return strconv.FormatInt(height, 10), nil + return height, nil } -func parseValueFromEventKey(key []byte) (string, error) { - var ( - compositeKey, typ, eventValue string - height int64 - ) - - remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ) - if err != nil { - return "", fmt.Errorf("failed to parse event key: %w", err) - } - - if len(remaining) != 0 { - return "", fmt.Errorf("unexpected remainder in key: %s", remaining) - } - - return eventValue, nil +func parseValueFromEventKey(key string) string { + parts := strings.SplitN(key, "/", 5) + return parts[2] } func lookForHeight(conditions []query.Condition) (int64, bool) { diff --git a/state/indexer/block/null/null.go 
b/state/indexer/block/null/null.go index cf4c681e9d8..6917f112f87 100644 --- a/state/indexer/block/null/null.go +++ b/state/indexer/block/null/null.go @@ -4,9 +4,10 @@ import ( "context" "errors" - "github.com/celestiaorg/rollmint/state/indexer" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/rollmint/state/indexer" ) var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 382ae2ed895..81e8ffbcc64 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -3,9 +3,10 @@ package txindex import ( "context" - "github.com/celestiaorg/rollmint/state/indexer" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/rollmint/state/indexer" ) // XXX/TODO: These types should be moved to the indexer package. diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index 0dd9290f4bf..276c89d3c7c 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -1,9 +1,12 @@ package txindex_test import ( + "context" "testing" "time" + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" @@ -28,9 +31,10 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { }) // tx indexer - kvStore := store.NewDefaultInMemoryKVStore() - txIndexer := kv.NewTxIndex(kvStore) - blockIndexer := blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) + kvStore, _ := store.NewDefaultInMemoryKVStore() + txIndexer := kv.NewTxIndex(context.Background(), kvStore) + prefixStore := (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey("block_events")}).Children()[0]).(ds.TxnDatastore) + blockIndexer := 
blockidxkv.New(context.Background(), prefixStore) service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) service.SetLogger(log.TestingLogger()) diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 7d1a3593f79..5c9fc6c0ef4 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -1,7 +1,6 @@ package kv import ( - "bytes" "context" "encoding/hex" "fmt" @@ -9,6 +8,7 @@ import ( "strings" "github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-datastore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" @@ -27,13 +27,16 @@ var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). type TxIndex struct { - store store.KVStore + store ds.TxnDatastore + + ctx context.Context } // NewTxIndex creates new KV indexer. -func NewTxIndex(store store.KVStore) *TxIndex { +func NewTxIndex(ctx context.Context, store ds.TxnDatastore) *TxIndex { return &TxIndex{ store: store, + ctx: ctx, } } @@ -44,7 +47,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { return nil, txindex.ErrorEmptyHash } - rawBytes, err := txi.store.Get(hash) + rawBytes, err := txi.store.Get(txi.ctx, ds.NewKey(hex.EncodeToString(hash))) if err != nil { panic(err) } @@ -66,8 +69,11 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { // the respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. 
func (txi *TxIndex) AddBatch(b *txindex.Batch) error { - storeBatch := txi.store.NewBatch() - defer storeBatch.Discard() + storeBatch, err := txi.store.NewTransaction(txi.ctx, false) + if err != nil { + return fmt.Errorf("failed to create a new batch for transaction: %w", err) + } + defer storeBatch.Discard(txi.ctx) for _, result := range b.Ops { hash := types.Tx(result.Tx).Hash() @@ -79,7 +85,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { } // index by height (always) - err = storeBatch.Set(keyForHeight(result), hash) + err = storeBatch.Put(txi.ctx, ds.NewKey(keyForHeight(result)), hash) if err != nil { return err } @@ -89,13 +95,13 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { return err } // index by hash (always) - err = storeBatch.Set(hash, rawBytes) + err = storeBatch.Put(txi.ctx, ds.NewKey(hex.EncodeToString(hash)), rawBytes) if err != nil { return err } } - return storeBatch.Commit() + return storeBatch.Commit(txi.ctx) } // Index indexes a single transaction using the given list of events. Each key @@ -103,19 +109,22 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. 
func (txi *TxIndex) Index(result *abci.TxResult) error { - b := txi.store.NewBatch() - defer b.Discard() + b, err := txi.store.NewTransaction(txi.ctx, false) + if err != nil { + return fmt.Errorf("failed to create a new batch for transaction: %w", err) + } + defer b.Discard(txi.ctx) hash := types.Tx(result.Tx).Hash() // index tx by events - err := txi.indexEvents(result, hash, b) + err = txi.indexEvents(result, hash, b) if err != nil { return err } // index by height (always) - err = b.Set(keyForHeight(result), hash) + err = b.Put(txi.ctx, ds.NewKey(keyForHeight(result)), hash) if err != nil { return err } @@ -125,15 +134,15 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { return err } // index by hash (always) - err = b.Set(hash, rawBytes) + err = b.Put(txi.ctx, ds.NewKey(hex.EncodeToString(hash)), rawBytes) if err != nil { return err } - return b.Commit() + return b.Commit(txi.ctx) } -func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store.Batch) error { +func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store ds.Txn) error { for _, event := range result.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { @@ -148,7 +157,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store. 
// index if `index: true` is set compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) if attr.GetIndex() { - err := store.Set(keyForEvent(compositeTag, attr.Value, result), hash) + err := store.Put(txi.ctx, ds.NewKey(keyForEvent(compositeTag, attr.Value, result)), hash) if err != nil { return err } @@ -305,7 +314,7 @@ func lookForHeight(conditions []query.Condition) (height int64) { func (txi *TxIndex) match( ctx context.Context, c query.Condition, - startKeyBz []byte, + startKeyBz string, filteredHashes map[string][]byte, firstRun bool, ) map[string][]byte { @@ -319,13 +328,15 @@ func (txi *TxIndex) match( switch { case c.Op == query.OpEqual: - it := txi.store.PrefixIterator(startKeyBz) - defer it.Discard() + results, err := store.PrefixEntries(ctx, txi.store, startKeyBz) + if err != nil { + panic(err) + } - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - tmpHashes[string(it.Value())] = it.Value() + tmpHashes[string(result.Entry.Value)] = result.Entry.Value // Potentially exit early. select { @@ -338,20 +349,19 @@ func (txi *TxIndex) match( break } } - if err := it.Error(); err != nil { - panic(err) - } case c.Op == query.OpExists: // XXX: can't use startKeyBz here because c.Operand is nil // (e.g. "account.owner//" won't match w/ a single row) - it := txi.store.PrefixIterator(startKey(c.CompositeKey)) - defer it.Discard() + results, err := store.PrefixEntries(ctx, txi.store, startKey(c.CompositeKey)) + if err != nil { + panic(err) + } - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - tmpHashes[string(it.Value())] = it.Value() + tmpHashes[string(result.Entry.Value)] = result.Entry.Value // Potentially exit early. select { @@ -364,26 +374,25 @@ func (txi *TxIndex) match( break } } - if err := it.Error(); err != nil { - panic(err) - } case c.Op == query.OpContains: // XXX: startKey does not apply here. 
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" - it := txi.store.PrefixIterator(startKey(c.CompositeKey)) - defer it.Discard() + results, err := store.PrefixEntries(ctx, txi.store, startKey(c.CompositeKey)) + if err != nil { + panic(err) + } - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - if !isTagKey(it.Key()) { + if !isTagKey([]byte(result.Entry.Key)) { continue } - if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { - tmpHashes[string(it.Value())] = it.Value() + if strings.Contains(extractValueFromKey([]byte(result.Entry.Key)), c.Operand.(string)) { + tmpHashes[string(result.Entry.Value)] = result.Entry.Value } // Potentially exit early. @@ -397,9 +406,6 @@ func (txi *TxIndex) match( break } } - if err := it.Error(); err != nil { - panic(err) - } default: panic("other operators should be handled already") } @@ -447,7 +453,7 @@ func (txi *TxIndex) match( func (txi *TxIndex) matchRange( ctx context.Context, qr indexer.QueryRange, - startKey []byte, + startKey string, filteredHashes map[string][]byte, firstRun bool, ) map[string][]byte { @@ -461,19 +467,21 @@ func (txi *TxIndex) matchRange( lowerBound := qr.LowerBoundValue() upperBound := qr.UpperBoundValue() - it := txi.store.PrefixIterator(startKey) - defer it.Discard() + results, err := store.PrefixEntries(ctx, txi.store, startKey) + if err != nil { + panic(err) + } LOOP: - for ; it.Valid(); it.Next() { + for result := range results.Next() { cont := true - if !isTagKey(it.Key()) { + if !isTagKey([]byte(result.Entry.Key)) { continue } if _, ok := qr.AnyBound().(int64); ok { - v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + v, err := strconv.ParseInt(extractValueFromKey([]byte(result.Entry.Key)), 10, 64) if err != nil { continue LOOP } @@ -488,7 +496,7 @@ LOOP: } if include { - 
tmpHashes[string(it.Value())] = it.Value() + tmpHashes[string(result.Entry.Value)] = result.Entry.Value } // XXX: passing time in a ABCI Events is not yet implemented @@ -510,9 +518,6 @@ LOOP: break } } - if err := it.Error(); err != nil { - panic(err) - } if len(tmpHashes) == 0 || firstRun { // Either: @@ -552,43 +557,39 @@ LOOP: // Keys func isTagKey(key []byte) bool { - return strings.Count(string(key), tagKeySeparator) == 3 + return strings.Count(string(key), tagKeySeparator) == 4 } func extractValueFromKey(key []byte) string { - parts := strings.SplitN(string(key), tagKeySeparator, 3) - return parts[1] + parts := strings.SplitN(string(key), tagKeySeparator, 4) + return parts[2] } -func keyForEvent(key string, value []byte, result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%s/%d/%d", +func keyForEvent(key string, value []byte, result *abci.TxResult) string { + return fmt.Sprintf("%s/%s/%d/%d", key, value, result.Height, result.Index, - )) + ) } -func keyForHeight(result *abci.TxResult) []byte { - return []byte(fmt.Sprintf("%s/%d/%d/%d", +func keyForHeight(result *abci.TxResult) string { + return fmt.Sprintf("%s/%d/%d/%d", types.TxHeightKey, result.Height, result.Height, result.Index, - )) + ) } -func startKeyForCondition(c query.Condition, height int64) []byte { +func startKeyForCondition(c query.Condition, height int64) string { if height > 0 { return startKey(c.CompositeKey, c.Operand, height) } return startKey(c.CompositeKey, c.Operand) } -func startKey(fields ...interface{}) []byte { - var b bytes.Buffer - for _, f := range fields { - b.Write([]byte(fmt.Sprintf("%v", f) + tagKeySeparator)) - } - return b.Bytes() +func startKey(fields ...interface{}) string { + return store.GenerateKey(fields) } diff --git a/state/txindex/kv/kv_bench_test.go b/state/txindex/kv/kv_bench_test.go index 566046306b7..2ec6973b8a7 100644 --- a/state/txindex/kv/kv_bench_test.go +++ b/state/txindex/kv/kv_bench_test.go @@ -20,12 +20,12 @@ func BenchmarkTxSearch(b 
*testing.B) { b.Errorf("failed to create temporary directory: %s", err) } - db := store.NewDefaultKVStore(dbDir, "db", "benchmark_tx_search_test") + db, _ := store.NewDefaultKVStore(dbDir, "db", "benchmark_tx_search_test") if err != nil { b.Errorf("failed to create database: %s", err) } - indexer := NewTxIndex(db) + indexer := NewTxIndex(context.Background(), db) for i := 0; i < 35000; i++ { events := []abci.Event{ diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 48b9c058891..193239bb18f 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -2,12 +2,14 @@ package kv import ( "context" + "encoding/hex" "fmt" "io/ioutil" "os" "testing" "github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-datastore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,7 +23,8 @@ import ( ) func TestTxIndex(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(context.Background(), kvStore) tx := types.Tx("HELLO WORLD") txResult := &abci.TxResult{ @@ -67,7 +70,8 @@ func TestTxIndex(t *testing.T) { } func TestTxSearch(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(context.Background(), kvStore) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -141,7 +145,8 @@ func TestTxSearch(t *testing.T) { } func TestTxSearchWithCancelation(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(context.Background(), kvStore) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -159,7 +164,9 @@ func 
TestTxSearchWithCancelation(t *testing.T) { } func TestTxSearchDeprecatedIndexing(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + ctx := context.Background() + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(ctx, kvStore) // index tx using events indexing (composite key) txResult1 := txResultWithEvents([]abci.Event{ @@ -175,25 +182,27 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { txResult2.Tx = types.Tx("HELLO WORLD 2") hash2 := types.Tx(txResult2.Tx).Hash() - b := indexer.store.NewBatch() + + b, _ := indexer.store.NewTransaction(ctx, false) + defer b.Discard(ctx) rawBytes, err := proto.Marshal(txResult2) require.NoError(t, err) - depKey := []byte(fmt.Sprintf("%s/%s/%d/%d", + depKey := fmt.Sprintf("%s/%s/%d/%d", "sender", "addr1", txResult2.Height, txResult2.Index, - )) + ) - err = b.Set(depKey, hash2) + err = b.Put(ctx, ds.NewKey(depKey), hash2) require.NoError(t, err) - err = b.Set(keyForHeight(txResult2), hash2) + err = b.Put(ctx, ds.NewKey(keyForHeight(txResult2)), hash2) require.NoError(t, err) - err = b.Set(hash2, rawBytes) + err = b.Put(ctx, ds.NewKey(hex.EncodeToString(hash2)), rawBytes) require.NoError(t, err) - err = b.Commit() + err = b.Commit(ctx) require.NoError(t, err) testCases := []struct { @@ -221,8 +230,6 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { {"sender = 'addr1'", []*abci.TxResult{txResult2}}, } - ctx := context.Background() - for _, tc := range testCases { tc := tc t.Run(tc.q, func(t *testing.T) { @@ -238,7 +245,8 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { } func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(context.Background(), kvStore) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -260,7 
+268,8 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { } func TestTxSearchMultipleTxs(t *testing.T) { - indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) + kvStore, _ := store.NewDefaultInMemoryKVStore() + indexer := NewTxIndex(context.Background(), kvStore) // indexed first, but bigger height (to test the order of transactions) txResult := txResultWithEvents([]abci.Event{ @@ -333,9 +342,9 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - store := store.NewDefaultKVStore(dir, "db", "tx_index") + store, err := store.NewDefaultKVStore(dir, "db", "tx_index") require.NoError(b, err) - indexer := NewTxIndex(store) + indexer := NewTxIndex(context.Background(), store) batch := txindex.NewBatch(txsCount) txIndex := uint32(0) diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 40f7362b114..fadd9a642a9 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -4,9 +4,10 @@ import ( "context" "errors" - "github.com/celestiaorg/rollmint/state/txindex" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" + + "github.com/celestiaorg/rollmint/state/txindex" ) var _ txindex.TxIndexer = (*TxIndex)(nil) diff --git a/store/badger.go b/store/badger.go deleted file mode 100644 index 2e6d0457082..00000000000 --- a/store/badger.go +++ /dev/null @@ -1,151 +0,0 @@ -package store - -import ( - "errors" - - "github.com/dgraph-io/badger/v3" -) - -var _ KVStore = &BadgerKV{} -var _ Batch = &BadgerBatch{} - -var ( - // ErrKeyNotFound is returned if key is not found in KVStore. - ErrKeyNotFound = errors.New("key not found") -) - -// BadgerKV is a implementation of KVStore using Badger v3. -type BadgerKV struct { - db *badger.DB -} - -// Get returns value for given key, or error. 
-func (b *BadgerKV) Get(key []byte) ([]byte, error) { - txn := b.db.NewTransaction(false) - defer txn.Discard() - item, err := txn.Get(key) - if errors.Is(err, badger.ErrKeyNotFound) { - return nil, ErrKeyNotFound - } - if err != nil { - return nil, err - } - return item.ValueCopy(nil) -} - -// Set saves key-value mapping in store. -func (b *BadgerKV) Set(key []byte, value []byte) error { - txn := b.db.NewTransaction(true) - err := txn.Set(key, value) - if err != nil { - txn.Discard() - return err - } - return txn.Commit() -} - -// Delete removes key and corresponding value from store. -func (b *BadgerKV) Delete(key []byte) error { - txn := b.db.NewTransaction(true) - err := txn.Delete(key) - if err != nil { - txn.Discard() - return err - } - return txn.Commit() -} - -// NewBatch creates new batch. -// Note: badger batches should be short lived as they use extra resources. -func (b *BadgerKV) NewBatch() Batch { - return &BadgerBatch{ - txn: b.db.NewTransaction(true), - } -} - -// BadgerBatch encapsulates badger transaction -type BadgerBatch struct { - txn *badger.Txn -} - -// Set accumulates key-value entries in a transaction -func (bb *BadgerBatch) Set(key, value []byte) error { - if err := bb.txn.Set(key, value); err != nil { - return err - } - - return nil -} - -// Delete removes the key and associated value from store -func (bb *BadgerBatch) Delete(key []byte) error { - return bb.txn.Delete(key) -} - -// Commit commits a transaction -func (bb *BadgerBatch) Commit() error { - return bb.txn.Commit() -} - -// Discard cancels a transaction -func (bb *BadgerBatch) Discard() { - bb.txn.Discard() -} - -var _ Iterator = &BadgerIterator{} - -// PrefixIterator returns instance of prefix Iterator for BadgerKV. 
-func (b *BadgerKV) PrefixIterator(prefix []byte) Iterator { - txn := b.db.NewTransaction(false) - iter := txn.NewIterator(badger.DefaultIteratorOptions) - iter.Seek(prefix) - return &BadgerIterator{ - txn: txn, - iter: iter, - prefix: prefix, - lastError: nil, - } -} - -// BadgerIterator encapsulates prefix iterator for badger kv store. -type BadgerIterator struct { - txn *badger.Txn - iter *badger.Iterator - prefix []byte - lastError error -} - -// Valid returns true if iterator is inside its prefix, false otherwise. -func (i *BadgerIterator) Valid() bool { - return i.iter.ValidForPrefix(i.prefix) -} - -// Next progresses iterator to the next key-value pair. -func (i *BadgerIterator) Next() { - i.iter.Next() -} - -// Key returns key pointed by iterator. -func (i *BadgerIterator) Key() []byte { - return i.iter.Item().KeyCopy(nil) -} - -// Value returns value pointer by iterator. -func (i *BadgerIterator) Value() []byte { - val, err := i.iter.Item().ValueCopy(nil) - if err != nil { - i.lastError = err - } - return val -} - -// Error returns last error that occurred during iteration. -func (i *BadgerIterator) Error() error { - return i.lastError -} - -// Discard has to be called to free iterator resources. 
-func (i *BadgerIterator) Discard() { - i.iter.Close() - i.txn.Discard() -} diff --git a/store/badger_test.go b/store/badger_test.go deleted file mode 100644 index c2be027f9d5..00000000000 --- a/store/badger_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package store - -import ( - "errors" - "testing" - - "github.com/dgraph-io/badger/v3" -) - -func TestGetErrors(t *testing.T) { - dalcKV := NewDefaultInMemoryKVStore() - - tc := []struct { - name string - key []byte - err error - }{ - {"empty key", []byte{}, badger.ErrEmptyKey}, - {"not found key", []byte("missing key"), ErrKeyNotFound}, - } - - for _, tt := range tc { - t.Run(tt.name, func(t *testing.T) { - _, err := dalcKV.Get(tt.key) - if !errors.Is(err, tt.err) { - t.Errorf("Invalid err, got: %v expected %v", err, tt.err) - } - }) - } -} - -func TestSetErrors(t *testing.T) { - dalcKV := NewDefaultInMemoryKVStore() - - tc := []struct { - name string - key []byte - value []byte - err error - }{ - {"empty key", []byte{}, []byte{}, badger.ErrEmptyKey}, - {"invalid key", []byte("!badger!key"), []byte("invalid header"), badger.ErrInvalidKey}, - } - - for _, tt := range tc { - t.Run(tt.name, func(t *testing.T) { - err := dalcKV.Set(tt.key, tt.value) - if !errors.Is(tt.err, err) { - t.Errorf("Invalid err, got: %v expected %v", err, tt.err) - } - }) - } -} - -func TestDeleteErrors(t *testing.T) { - dalcKV := NewDefaultInMemoryKVStore() - - tc := []struct { - name string - key []byte - err error - }{ - {"empty key", []byte{}, badger.ErrEmptyKey}, - {"invalid key", []byte("!badger!key"), badger.ErrInvalidKey}, - } - - for _, tt := range tc { - t.Run(tt.name, func(t *testing.T) { - err := dalcKV.Delete(tt.key) - if !errors.Is(err, tt.err) { - t.Errorf("Invalid err, got: %v expected %v", err, tt.err) - } - }) - } -} diff --git a/store/kv.go b/store/kv.go index e118d2c0ccd..456d941eb4e 100644 --- a/store/kv.go +++ b/store/kv.go @@ -1,61 +1,53 @@ package store import ( + "bytes" + "context" + "fmt" + "path" "path/filepath" + "time" 
"github.com/dgraph-io/badger/v3" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + badger3 "github.com/ipfs/go-ds-badger3" ) -// KVStore encapsulates key-value store abstraction, in minimalistic interface. -// -// KVStore MUST be thread safe. -type KVStore interface { - Get(key []byte) ([]byte, error) // Get gets the value for a key. - Set(key []byte, value []byte) error // Set updates the value for a key. - Delete(key []byte) error // Delete deletes a key. - NewBatch() Batch // NewBatch creates a new batch. - PrefixIterator(prefix []byte) Iterator // PrefixIterator creates iterator to traverse given prefix. -} - -// Batch enables batching of transactions. -type Batch interface { - Set(key, value []byte) error // Accumulates KV entries in a transaction. - Delete(key []byte) error // Deletes the given key. - Commit() error // Commits the transaction. - Discard() // Discards the transaction. -} - -// Iterator enables traversal over a given prefix. -type Iterator interface { - Valid() bool - Next() - Key() []byte - Value() []byte - Error() error - Discard() -} - // NewDefaultInMemoryKVStore builds KVStore that works in-memory (without accessing disk). -func NewDefaultInMemoryKVStore() KVStore { - db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true)) - if err != nil { - panic(err) - } - return &BadgerKV{ - db: db, +func NewDefaultInMemoryKVStore() (ds.TxnDatastore, error) { + inMemoryOptions := &badger3.Options{ + GcDiscardRatio: 0.2, + GcInterval: 15 * time.Minute, + GcSleep: 10 * time.Second, + Options: badger.DefaultOptions("").WithInMemory(true), } + return badger3.NewDatastore("", inMemoryOptions) } // NewDefaultKVStore creates instance of default key-value store. 
-func NewDefaultKVStore(rootDir, dbPath, dbName string) KVStore { +func NewDefaultKVStore(rootDir, dbPath, dbName string) (ds.TxnDatastore, error) { path := filepath.Join(rootify(rootDir, dbPath), dbName) - db, err := badger.Open(badger.DefaultOptions(path)) + return badger3.NewDatastore(path, nil) +} + +// PrefixEntries retrieves all entries in the datastore whose keys have the supplied prefix +func PrefixEntries(ctx context.Context, store ds.Datastore, prefix string) (dsq.Results, error) { results, err := store.Query(ctx, dsq.Query{Prefix: prefix}) if err != nil { - panic(err) + return nil, err } - return &BadgerKV{ - db: db, + return results, nil +} + +// GenerateKey builds a datastore key from the given fields: it formats each field with %v, joins them with "/" under a leading "/", and normalizes the result with path.Clean. +func GenerateKey(fields []interface{}) string { + var b bytes.Buffer + b.WriteString("/") + for _, f := range fields { + b.Write([]byte(fmt.Sprintf("%v", f) + "/")) } + return path.Clean(b.String()) } // rootify works just like in cosmos-sdk diff --git a/store/prefix.go b/store/prefix.go deleted file mode 100644 index 84255b5478c..00000000000 --- a/store/prefix.go +++ /dev/null @@ -1,72 +0,0 @@ -package store - -var _ KVStore = &PrefixKV{} -var _ Batch = &PrefixKVBatch{} - -// PrefixKV is a key-value store that prepends all keys with given prefix. -type PrefixKV struct { - kv KVStore - prefix []byte -} - -// NewPrefixKV creates new PrefixKV on top of other KVStore. -func NewPrefixKV(kv KVStore, prefix []byte) *PrefixKV { - return &PrefixKV{ - kv: kv, - prefix: prefix, - } -} - -// Get returns value for given key. -func (p *PrefixKV) Get(key []byte) ([]byte, error) { - return p.kv.Get(append(p.prefix, key...)) -} - -// Set updates the value for given key. -func (p *PrefixKV) Set(key []byte, value []byte) error { - return p.kv.Set(append(p.prefix, key...), value) -} - -// Delete deletes key-value pair for given key. -func (p *PrefixKV) Delete(key []byte) error { - return p.kv.Delete(append(p.prefix, key...)) -} - -// NewBatch creates a new batch. 
-func (p *PrefixKV) NewBatch() Batch { - return &PrefixKVBatch{ - b: p.kv.NewBatch(), - prefix: p.prefix, - } -} - -// PrefixIterator creates iterator to traverse given prefix. -func (p *PrefixKV) PrefixIterator(prefix []byte) Iterator { - return p.kv.PrefixIterator(append(p.prefix, prefix...)) -} - -// PrefixKVBatch enables batching of operations on PrefixKV. -type PrefixKVBatch struct { - b Batch - prefix []byte -} - -// Set adds key-value pair to batch. -func (pb *PrefixKVBatch) Set(key, value []byte) error { - return pb.b.Set(append(pb.prefix, key...), value) -} - -// Delete adds delete operation to batch. -func (pb *PrefixKVBatch) Delete(key []byte) error { - return pb.b.Delete(append(pb.prefix, key...)) -} - -// Commit applies all operations in the batch atomically. -func (pb *PrefixKVBatch) Commit() error { - return pb.b.Commit() -} - -// Discard discards all operations in the batch. -func (pb *PrefixKVBatch) Discard() { - pb.b.Discard() -} diff --git a/store/prefix_test.go b/store/prefix_test.go index b95e40cf595..0578afb2bf6 100644 --- a/store/prefix_test.go +++ b/store/prefix_test.go @@ -1,25 +1,29 @@ package store import ( + "context" "testing" + ds "github.com/ipfs/go-datastore" + ktds "github.com/ipfs/go-datastore/keytransform" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestPrefixKV(t *testing.T) { +func TestPrefixKV1(t *testing.T) { t.Parallel() assert := assert.New(t) require := require.New(t) - base := NewDefaultInMemoryKVStore() + ctx := context.Background() + base, _ := NewDefaultInMemoryKVStore() - p1 := NewPrefixKV(base, []byte{1}) - p2 := NewPrefixKV(base, []byte{2}) + p1 := ktds.Wrap(base, ktds.PrefixTransform{Prefix: ds.NewKey("1")}) + p2 := ktds.Wrap(base, ktds.PrefixTransform{Prefix: ds.NewKey("2")}) - key1 := []byte("key1") - key2 := []byte("key2") + key1 := ds.NewKey("key1") + key2 := ds.NewKey("key2") val11 := []byte("val11") val21 := []byte("val21") @@ -27,47 +31,47 @@ func TestPrefixKV(t 
*testing.T) { val22 := []byte("val22") // set different values in each preffix - err := p1.Set(key1, val11) + err := p1.Put(ctx, key1, val11) require.NoError(err) - err = p1.Set(key2, val12) + err = p1.Put(ctx, key2, val12) require.NoError(err) - err = p2.Set(key1, val21) + err = p2.Put(ctx, key1, val21) require.NoError(err) - err = p2.Set(key2, val22) + err = p2.Put(ctx, key2, val22) require.NoError(err) // ensure that each PrefixKV returns proper data - v, err := p1.Get(key1) + v, err := p1.Get(ctx, key1) require.NoError(err) assert.Equal(val11, v) - v, err = p2.Get(key1) + v, err = p2.Get(ctx, key1) require.NoError(err) assert.Equal(val21, v) - v, err = p1.Get(key2) + v, err = p1.Get(ctx, key2) require.NoError(err) assert.Equal(val12, v) - v, err = p2.Get(key2) + v, err = p2.Get(ctx, key2) require.NoError(err) assert.Equal(val22, v) // delete from one prefix, ensure that second contains data - err = p1.Delete(key1) + err = p1.Delete(ctx, key1) require.NoError(err) - err = p1.Delete(key2) + err = p1.Delete(ctx, key2) require.NoError(err) - v, err = p2.Get(key1) + v, err = p2.Get(ctx, key1) require.NoError(err) assert.Equal(val21, v) - v, err = p2.Get(key2) + v, err = p2.Get(ctx, key2) require.NoError(err) assert.Equal(val22, v) } @@ -78,29 +82,33 @@ func TestPrefixKVBatch(t *testing.T) { assert := assert.New(t) require := require.New(t) - basekv := NewDefaultInMemoryKVStore() - prefixkv := NewPrefixKV(basekv, []byte("prefix1")) - prefixbatchkv1 := prefixkv.NewBatch() + ctx := context.Background() - keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3"), []byte("key4")} + basekv, _ := NewDefaultInMemoryKVStore() + prefixkv := ktds.Wrap(basekv, ktds.PrefixTransform{Prefix: ds.NewKey("prefix1")}).Children()[0] + + badgerPrefixkv, _ := prefixkv.(ds.TxnDatastore) + prefixbatchkv1, _ := badgerPrefixkv.NewTransaction(ctx, false) + + keys := []ds.Key{ds.NewKey("key1"), ds.NewKey("key2"), ds.NewKey("key3"), ds.NewKey("key4")} values := [][]byte{[]byte("value1"), 
[]byte("value2"), []byte("value3"), []byte("value4")} for i := 0; i < len(keys); i++ { - err := prefixbatchkv1.Set(keys[i], values[i]) + err := prefixbatchkv1.Put(ctx, keys[i], values[i]) require.NoError(err) } - err := prefixbatchkv1.Commit() + err := prefixbatchkv1.Commit(ctx) require.NoError(err) for i := 0; i < len(keys); i++ { - vals, err := prefixkv.Get(keys[i]) + vals, err := prefixkv.Get(ctx, keys[i]) assert.Equal(vals, values[i]) require.NoError(err) } - prefixbatchkv2 := prefixkv.NewBatch() - err = prefixbatchkv2.Delete([]byte("key1")) + prefixbatchkv2, _ := badgerPrefixkv.NewTransaction(ctx, false) + err = prefixbatchkv2.Delete(ctx, ds.NewKey("key1")) require.NoError(err) } diff --git a/store/store.go b/store/store.go index e3af8d6121c..ae0210fa611 100644 --- a/store/store.go +++ b/store/store.go @@ -1,11 +1,13 @@ package store import ( - "encoding/binary" + "context" + "encoding/hex" "errors" "fmt" "sync/atomic" + ds "github.com/ipfs/go-datastore" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" tmtypes "github.com/tendermint/tendermint/types" @@ -16,27 +18,29 @@ import ( ) var ( - blockPrefix = [1]byte{1} - indexPrefix = [1]byte{2} - commitPrefix = [1]byte{3} - statePrefix = [1]byte{4} - responsesPrefix = [1]byte{5} - validatorsPrefix = [1]byte{6} + blockPrefix = "b" + indexPrefix = "i" + commitPrefix = "c" + statePrefix = "s" + responsesPrefix = "r" + validatorsPrefix = "v" ) // DefaultStore is a default store implmementation. type DefaultStore struct { - db KVStore + db ds.TxnDatastore height uint64 + ctx context.Context } var _ Store = &DefaultStore{} // New returns new, default store. 
-func New(kv KVStore) Store { +func New(ctx context.Context, ds ds.TxnDatastore) Store { return &DefaultStore{ - db: kv, + db: ds, + ctx: ctx, } } @@ -67,17 +71,21 @@ func (s *DefaultStore) SaveBlock(block *types.Block, commit *types.Commit) error return fmt.Errorf("failed to marshal Commit to binary: %w", err) } - bb := s.db.NewBatch() - err = multierr.Append(err, bb.Set(getBlockKey(hash), blockBlob)) - err = multierr.Append(err, bb.Set(getCommitKey(hash), commitBlob)) - err = multierr.Append(err, bb.Set(getIndexKey(block.Header.Height), hash[:])) + bb, err := s.db.NewTransaction(s.ctx, false) + if err != nil { + return fmt.Errorf("failed to create a new batch for transaction: %w", err) + } + + err = multierr.Append(err, bb.Put(s.ctx, ds.NewKey(getBlockKey(hash)), blockBlob)) + err = multierr.Append(err, bb.Put(s.ctx, ds.NewKey(getCommitKey(hash)), commitBlob)) + err = multierr.Append(err, bb.Put(s.ctx, ds.NewKey(getIndexKey(block.Header.Height)), hash[:])) if err != nil { - bb.Discard() + bb.Discard(s.ctx) return err } - if err = bb.Commit(); err != nil { + if err = bb.Commit(s.ctx); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } @@ -98,7 +106,7 @@ func (s *DefaultStore) LoadBlock(height uint64) (*types.Block, error) { // LoadBlockByHash returns block with given block header hash, or error if it's not found in Store. 
func (s *DefaultStore) LoadBlockByHash(hash [32]byte) (*types.Block, error) { - blockData, err := s.db.Get(getBlockKey(hash)) + blockData, err := s.db.Get(s.ctx, ds.NewKey(getBlockKey(hash))) if err != nil { return nil, fmt.Errorf("failed to load block data: %w", err) } @@ -117,12 +125,12 @@ func (s *DefaultStore) SaveBlockResponses(height uint64, responses *tmstate.ABCI if err != nil { return fmt.Errorf("failed to marshal response: %w", err) } - return s.db.Set(getResponsesKey(height), data) + return s.db.Put(s.ctx, ds.NewKey(getResponsesKey(height)), data) } // LoadBlockResponses returns block results at given height, or error if it's not found in Store. func (s *DefaultStore) LoadBlockResponses(height uint64) (*tmstate.ABCIResponses, error) { - data, err := s.db.Get(getResponsesKey(height)) + data, err := s.db.Get(s.ctx, ds.NewKey(getResponsesKey(height))) if err != nil { return nil, fmt.Errorf("failed to retrieve block results from height %v: %w", height, err) } @@ -145,7 +153,7 @@ func (s *DefaultStore) LoadCommit(height uint64) (*types.Commit, error) { // LoadCommitByHash returns commit for a block with given block header hash, or error if it's not found in Store. func (s *DefaultStore) LoadCommitByHash(hash [32]byte) (*types.Commit, error) { - commitData, err := s.db.Get(getCommitKey(hash)) + commitData, err := s.db.Get(s.ctx, ds.NewKey(getCommitKey(hash))) if err != nil { return nil, fmt.Errorf("failed to retrieve commit from hash %v: %w", hash, err) } @@ -168,12 +176,12 @@ func (s *DefaultStore) UpdateState(state types.State) error { if err != nil { return err } - return s.db.Set(getStateKey(), data) + return s.db.Put(s.ctx, ds.NewKey(getStateKey()), data) } // LoadState returns last state saved with UpdateState. 
func (s *DefaultStore) LoadState() (types.State, error) { - blob, err := s.db.Get(getStateKey()) + blob, err := s.db.Get(s.ctx, ds.NewKey(getStateKey())) if err != nil { return types.State{}, fmt.Errorf("failed to retrieve state: %w", err) } @@ -200,12 +208,12 @@ func (s *DefaultStore) SaveValidators(height uint64, validatorSet *tmtypes.Valid return fmt.Errorf("failed to marshal ValidatorSet: %w", err) } - return s.db.Set(getValidatorsKey(height), blob) + return s.db.Put(s.ctx, ds.NewKey(getValidatorsKey(height)), blob) } // LoadValidators loads validator set at given block height from store. func (s *DefaultStore) LoadValidators(height uint64) (*tmtypes.ValidatorSet, error) { - blob, err := s.db.Get(getValidatorsKey(height)) + blob, err := s.db.Get(s.ctx, ds.NewKey(getValidatorsKey(height))) if err != nil { return nil, fmt.Errorf("failed to load Validators for height %v: %w", height, err) } @@ -218,8 +226,9 @@ func (s *DefaultStore) LoadValidators(height uint64) (*tmtypes.ValidatorSet, err return tmtypes.ValidatorSetFromProto(&pbValSet) } +// loadHashFromIndex returns the hash of a block given its height func (s *DefaultStore) loadHashFromIndex(height uint64) ([32]byte, error) { - blob, err := s.db.Get(getIndexKey(height)) + blob, err := s.db.Get(s.ctx, ds.NewKey(getIndexKey(height))) var hash [32]byte if err != nil { @@ -232,32 +241,26 @@ func (s *DefaultStore) loadHashFromIndex(height uint64) ([32]byte, error) { return hash, nil } -func getBlockKey(hash [32]byte) []byte { - return append(blockPrefix[:], hash[:]...) +func getBlockKey(hash [32]byte) string { + return GenerateKey([]interface{}{blockPrefix, hex.EncodeToString(hash[:])}) } -func getCommitKey(hash [32]byte) []byte { - return append(commitPrefix[:], hash[:]...) 
+func getCommitKey(hash [32]byte) string { + return GenerateKey([]interface{}{commitPrefix, hex.EncodeToString(hash[:])}) } -func getIndexKey(height uint64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, height) - return append(indexPrefix[:], buf[:]...) +func getIndexKey(height uint64) string { + return GenerateKey([]interface{}{indexPrefix, height}) } -func getStateKey() []byte { - return statePrefix[:] +func getStateKey() string { + return statePrefix } -func getResponsesKey(height uint64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, height) - return append(responsesPrefix[:], buf[:]...) +func getResponsesKey(height uint64) string { + return GenerateKey([]interface{}{responsesPrefix, height}) } -func getValidatorsKey(height uint64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, height) - return append(validatorsPrefix[:], buf[:]...) +func getValidatorsKey(height uint64) string { + return GenerateKey([]interface{}{validatorsPrefix, height}) } diff --git a/store/store_test.go b/store/store_test.go index 9c66e17b2b5..9dee2c8cc95 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,10 +1,12 @@ package store import ( + "context" "math/rand" "os" "testing" + ds "github.com/ipfs/go-datastore" abcitypes "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/ed25519" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" @@ -43,7 +45,8 @@ func TestStoreHeight(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t) - bstore := New(NewDefaultInMemoryKVStore()) + ds, _ := NewDefaultInMemoryKVStore() + bstore := New(context.Background(), ds) assert.Equal(uint64(0), bstore.Height()) for _, block := range c.blocks { @@ -87,13 +90,15 @@ func TestStoreLoad(t *testing.T) { } }() - for _, kv := range []KVStore{NewDefaultInMemoryKVStore(), NewDefaultKVStore(tmpDir, "db", "test")} { + mKV, _ := 
NewDefaultInMemoryKVStore() + dKV, _ := NewDefaultKVStore(tmpDir, "db", "test") + for _, kv := range []ds.TxnDatastore{mKV, dKV} { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t) require := require.New(t) - bstore := New(kv) + bstore := New(context.Background(), kv) lastCommit := &types.Commit{} for _, block := range c.blocks { @@ -132,8 +137,9 @@ func TestRestart(t *testing.T) { validatorSet := getRandomValidatorSet() - kv := NewDefaultInMemoryKVStore() - s1 := New(kv) + ctx := context.Background() + kv, _ := NewDefaultInMemoryKVStore() + s1 := New(ctx, kv) expectedHeight := uint64(10) err := s1.UpdateState(types.State{ LastBlockHeight: int64(expectedHeight), @@ -143,7 +149,7 @@ func TestRestart(t *testing.T) { }) assert.NoError(err) - s2 := New(kv) + s2 := New(ctx, kv) _, err = s2.LoadState() assert.NoError(err) @@ -154,8 +160,8 @@ func TestBlockResponses(t *testing.T) { t.Parallel() assert := assert.New(t) - kv := NewDefaultInMemoryKVStore() - s := New(kv) + kv, _ := NewDefaultInMemoryKVStore() + s := New(context.Background(), kv) expected := &tmstate.ABCIResponses{ BeginBlock: &abcitypes.ResponseBeginBlock{