Skip to content

Commit

Permalink
migrate datasource from store.KVStore to datastore.Datastore (cosmos#665
Browse files Browse the repository at this point in the history
)

* fixing merge conflicts

* fix breaking changes related to substore, prefixing, and keys

* fix all the failing tests

* test that is no longer needed

* fix missing merge conflict

* fixing golangci-lint errors

* more lint errors

* goimports fixes

* fixing golangci-lint failures

* changing from DataStore to TxnDatasource, cutting some LOC, prefix entires as iterator, etc

Co-authored-by: Ganesha Upadhyaya <[email protected]>
  • Loading branch information
gupadhyaya and Ganesha Upadhyaya authored Jan 9, 2023
1 parent de83cbc commit 90ff206
Show file tree
Hide file tree
Showing 34 changed files with 396 additions and 701 deletions.
8 changes: 6 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package block

import (
"context"
"crypto/rand"
"testing"
"time"
Expand Down Expand Up @@ -33,9 +34,12 @@ func TestInitialState(t *testing.T) {
NextValidators: getRandomValidatorSet(),
}

emptyStore := store.New(store.NewDefaultInMemoryKVStore())
ctx := context.Background()
es, _ := store.NewDefaultInMemoryKVStore()
emptyStore := store.New(ctx, es)

fullStore := store.New(store.NewDefaultInMemoryKVStore())
es2, _ := store.NewDefaultInMemoryKVStore()
fullStore := store.New(ctx, es2)
err := fullStore.UpdateState(sampleState)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"time"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"

"github.com/celestiaorg/go-cnc"
"github.com/celestiaorg/rollmint/da"
"github.com/celestiaorg/rollmint/log"
"github.com/celestiaorg/rollmint/store"
"github.com/celestiaorg/rollmint/types"
pb "github.com/celestiaorg/rollmint/types/pb/rollmint"
)
Expand All @@ -37,7 +37,7 @@ type Config struct {
}

// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore store.KVStore, logger log.Logger) error {
func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error {
c.namespaceID = namespaceID
c.logger = logger

Expand Down
6 changes: 5 additions & 1 deletion da/celestia/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func NewServer(blockTime time.Duration, logger log.Logger) *Server {

// Start starts HTTP server with given listener.
func (s *Server) Start(listener net.Listener) error {
err := s.mock.Init([8]byte{}, []byte(s.blockTime.String()), store.NewDefaultInMemoryKVStore(), s.logger)
kvStore, err := store.NewDefaultInMemoryKVStore()
if err != nil {
return err
}
err = s.mock.Init([8]byte{}, []byte(s.blockTime.String()), kvStore, s.logger)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package da
import (
"context"

ds "github.com/ipfs/go-datastore"

"github.com/celestiaorg/rollmint/log"
"github.com/celestiaorg/rollmint/store"
"github.com/celestiaorg/rollmint/types"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ type ResultRetrieveBlocks struct {
// It also contains life-cycle methods.
type DataAvailabilityLayerClient interface {
// Init is called once to allow DA client to read configuration and initialize resources.
Init(namespaceID types.NamespaceID, config []byte, kvStore store.KVStore, logger log.Logger) error
Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error

// Start is called once, after Init. It's implementation should start operation of DataAvailabilityLayerClient.
Start() error
Expand Down
5 changes: 3 additions & 2 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (

"google.golang.org/grpc"

ds "github.com/ipfs/go-datastore"

"github.com/celestiaorg/rollmint/da"
"github.com/celestiaorg/rollmint/log"
"github.com/celestiaorg/rollmint/store"
"github.com/celestiaorg/rollmint/types"
"github.com/celestiaorg/rollmint/types/pb/dalc"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
var _ da.BlockRetriever = &DataAvailabilityLayerClient{}

// Init sets the configuration options.
func (d *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, _ store.KVStore, logger log.Logger) error {
func (d *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, _ ds.Datastore, logger log.Logger) error {
d.logger = logger
if len(config) == 0 {
d.config = DefaultConfig
Expand Down
5 changes: 4 additions & 1 deletion da/grpc/mockserv/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ func main() {
flag.StringVar(&conf.Host, "host", "0.0.0.0", "listening address")
flag.Parse()

kv := store.NewDefaultKVStore(".", "db", "rollmint")
kv, err := store.NewDefaultKVStore(".", "db", "rollmint")
if err != nil {
log.Panic(err)
}
lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port))
if err != nil {
log.Panic(err)
Expand Down
4 changes: 2 additions & 2 deletions da/grpc/mockserv/mockserv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ package mockserv
import (
"context"

ds "github.com/ipfs/go-datastore"
tmlog "github.com/tendermint/tendermint/libs/log"
"google.golang.org/grpc"

grpcda "github.com/celestiaorg/rollmint/da/grpc"
"github.com/celestiaorg/rollmint/da/mock"
"github.com/celestiaorg/rollmint/store"
"github.com/celestiaorg/rollmint/types"
"github.com/celestiaorg/rollmint/types/pb/dalc"
"github.com/celestiaorg/rollmint/types/pb/rollmint"
)

// GetServer creates and returns gRPC server instance.
func GetServer(kv store.KVStore, conf grpcda.Config, mockConfig []byte, logger tmlog.Logger) *grpc.Server {
func GetServer(kv ds.Datastore, conf grpcda.Config, mockConfig []byte, logger tmlog.Logger) *grpc.Server {
srv := grpc.NewServer()
mockImpl := &mockImpl{}
err := mockImpl.mock.Init([8]byte{}, mockConfig, kv, logger)
Expand Down
40 changes: 18 additions & 22 deletions da/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package mock

import (
"context"
"encoding/binary"
"encoding/hex"
"math/rand"
"sync/atomic"
"time"

ds "github.com/ipfs/go-datastore"

"github.com/celestiaorg/rollmint/da"
"github.com/celestiaorg/rollmint/log"
"github.com/celestiaorg/rollmint/store"
Expand All @@ -17,7 +19,7 @@ import (
// It does actually ensures DA - it stores data in-memory.
type DataAvailabilityLayerClient struct {
logger log.Logger
dalcKV store.KVStore
dalcKV ds.Datastore
daHeight uint64
config config
}
Expand All @@ -32,7 +34,7 @@ var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
var _ da.BlockRetriever = &DataAvailabilityLayerClient{}

// Init is called once to allow DA client to read configuration and initialize resources.
func (m *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, dalcKV store.KVStore, logger log.Logger) error {
func (m *DataAvailabilityLayerClient) Init(_ types.NamespaceID, config []byte, dalcKV ds.Datastore, logger log.Logger) error {
m.logger = logger
m.dalcKV = dalcKV
m.daHeight = 1
Expand Down Expand Up @@ -79,11 +81,12 @@ func (m *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

err = m.dalcKV.Set(getKey(daHeight, block.Header.Height), hash[:])
err = m.dalcKV.Put(ctx, getKey(daHeight, block.Header.Height), hash[:])
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
err = m.dalcKV.Set(hash[:], blob)

err = m.dalcKV.Put(ctx, ds.NewKey(hex.EncodeToString(hash[:])), blob)
if err != nil {
return da.ResultSubmitBlock{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
Expand All @@ -109,14 +112,14 @@ func (m *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeig
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: "block not found"}}
}

iter := m.dalcKV.PrefixIterator(getPrefix(daHeight))
defer iter.Discard()
results, err := store.PrefixEntries(ctx, m.dalcKV, getPrefix(daHeight))
if err != nil {
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

var blocks []*types.Block
for iter.Valid() {
hash := iter.Value()

blob, err := m.dalcKV.Get(hash)
for result := range results.Next() {
blob, err := m.dalcKV.Get(ctx, ds.NewKey(hex.EncodeToString(result.Entry.Value)))
if err != nil {
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
Expand All @@ -127,24 +130,17 @@ func (m *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, daHeig
return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}
blocks = append(blocks, block)

iter.Next()
}

return da.ResultRetrieveBlocks{BaseResult: da.BaseResult{Code: da.StatusSuccess}, Blocks: blocks}
}

func getPrefix(daHeight uint64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, daHeight)
return b
func getPrefix(daHeight uint64) string {
return store.GenerateKey([]interface{}{daHeight})
}

func getKey(daHeight uint64, height uint64) []byte {
b := make([]byte, 16)
binary.BigEndian.PutUint64(b, daHeight)
binary.BigEndian.PutUint64(b[8:], height)
return b
func getKey(daHeight uint64, height uint64) ds.Key {
return ds.NewKey(store.GenerateKey([]interface{}{daHeight, height}))
}

func (m *DataAvailabilityLayerClient) updateDAHeight() {
Expand Down
9 changes: 6 additions & 3 deletions da/test/da_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) {
}
conf, _ = json.Marshal(config)
}
err := dalc.Init(testNamespaceID, conf, store.NewDefaultInMemoryKVStore(), test.NewLogger(t))
kvStore, _ := store.NewDefaultInMemoryKVStore()
err := dalc.Init(testNamespaceID, conf, kvStore, test.NewLogger(t))
require.NoError(err)

err = dalc.Start()
Expand Down Expand Up @@ -149,7 +150,8 @@ func startMockGRPCServ(t *testing.T) *grpc.Server {
conf := grpcda.DefaultConfig
logger := tmlog.NewTMLogger(os.Stdout)

srv := mockserv.GetServer(store.NewDefaultInMemoryKVStore(), conf, []byte(mockDaBlockTime.String()), logger)
kvStore, _ := store.NewDefaultInMemoryKVStore()
srv := mockserv.GetServer(kvStore, conf, []byte(mockDaBlockTime.String()), logger)
lis, err := net.Listen("tcp", conf.Host+":"+strconv.Itoa(conf.Port))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -192,7 +194,8 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {
}
conf, _ = json.Marshal(config)
}
err := dalc.Init(testNamespaceID, conf, store.NewDefaultInMemoryKVStore(), test.NewLogger(t))
kvStore, _ := store.NewDefaultInMemoryKVStore()
err := dalc.Init(testNamespaceID, conf, kvStore, test.NewLogger(t))
require.NoError(err)

err = dalc.Start()
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/rpc v1.2.0
github.com/gorilla/websocket v1.5.0
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger3 v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-kad-dht v0.20.0
Expand Down Expand Up @@ -73,7 +75,6 @@ require (
github.com/huin/goupnp v1.0.3 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/ipfs/go-cid v0.3.2 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipns v0.2.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger3 v0.0.2 h1:+pME0YfRnbUKhvySnakNMuCMsUUhmGfwIsH/nnHZ7QY=
github.com/ipfs/go-ds-badger3 v0.0.2/go.mod h1:6/yjF1KaOU+IpCaqMV43yoWIdxHqOAJlO9EhWLnZSkI=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
github.com/ipfs/go-ipns v0.2.0 h1:BgmNtQhqOw5XEZ8RAfWEpK4DhqaYiuP6h71MhIp7xXU=
Expand Down
3 changes: 2 additions & 1 deletion mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package mempool
import (
"container/list"

"github.com/tendermint/tendermint/types"
"sync"

"github.com/tendermint/tendermint/types"
)

// TxCache defines an interface for raw transaction caching in a mempool.
Expand Down
2 changes: 0 additions & 2 deletions mempool/clist/clist.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
const MaxLength = int(^uint(0) >> 1)

/*
CElement is an element of a linked-list
Traversal from a CElement is goroutine-safe.
Expand All @@ -41,7 +40,6 @@ the for-loop. Use sync.Cond when you need serial access to the
"condition". In our case our condition is if `next != nil || removed`,
and there's no reason to serialize that condition for goroutines
waiting on NextWait() (since it's just a read operation).
*/
type CElement struct {
mtx tmsync.RWMutex
Expand Down
2 changes: 2 additions & 0 deletions mempool/clist/clist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestSmall(t *testing.T) {

// This test is quite hacky because it relies on SetFinalizer
// which isn't guaranteed to run at all.
//
//nolint:unused,deadcode
func _TestGCFifo(t *testing.T) {
if runtime.GOARCH != "amd64" {
Expand Down Expand Up @@ -117,6 +118,7 @@ func _TestGCFifo(t *testing.T) {

// This test is quite hacky because it relies on SetFinalizer
// which isn't guaranteed to run at all.
//
//nolint:unused,deadcode
func _TestGCRandom(t *testing.T) {
if runtime.GOARCH != "amd64" {
Expand Down
1 change: 1 addition & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"errors"
"fmt"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/types"
)
Expand Down
3 changes: 2 additions & 1 deletion node/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func createNodes(aggCtx, ctx context.Context, num int, isMalicious bool, wg *syn
nodes := make([]*Node, num)
apps := make([]*mocks.Application, num)
dalc := &mockda.DataAvailabilityLayerClient{}
_ = dalc.Init([8]byte{}, nil, store.NewDefaultInMemoryKVStore(), log.TestingLogger())
ds, _ := store.NewDefaultInMemoryKVStore()
_ = dalc.Init([8]byte{}, nil, ds, log.TestingLogger())
_ = dalc.Start()
nodes[0], apps[0] = createNode(aggCtx, 0, isMalicious, true, dalc, keys, wg, t)
for i := 1; i < num; i++ {
Expand Down
Loading

0 comments on commit 90ff206

Please sign in to comment.