Skip to content

Commit

Permalink
feat(store/v2): snapshot manager (#18458)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Dec 7, 2023
1 parent 14bb52a commit f6df368
Show file tree
Hide file tree
Showing 26 changed files with 777 additions and 261 deletions.
5 changes: 3 additions & 2 deletions store/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ package store
// track writes. Deletion can be denoted by a nil value or explicitly by the
// Delete field.
type KVPair struct {
Key []byte
Value []byte
Key []byte
Value []byte
StoreKey string // Optional for snapshot restore
}

type KVPairs []KVPair
Expand Down
40 changes: 40 additions & 0 deletions store/commitment/iavl/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package iavl

import (
"errors"

"github.com/cosmos/iavl"

"cosmossdk.io/store/v2/commitment"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)

// Exporter is a wrapper around iavl.Exporter.
type Exporter struct {
exporter *iavl.Exporter
}

// Next returns the next item in the exporter.
func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) {
item, err := e.exporter.Next()
if err != nil {
if errors.Is(err, iavl.ErrorExportDone) {
return nil, commitment.ErrorExportDone
}
return nil, err
}

return &snapshotstypes.SnapshotIAVLItem{
Key: item.Key,
Value: item.Value,
Version: item.Version,
Height: int32(item.Height),
}, nil
}

// Close closes the exporter.
func (e *Exporter) Close() error {
e.exporter.Close()

return nil
}
34 changes: 34 additions & 0 deletions store/commitment/iavl/importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package iavl

import (
"github.com/cosmos/iavl"

snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)

// Importer is a wrapper around iavl.Importer.
type Importer struct {
importer *iavl.Importer
}

// Add adds the given item to the importer.
func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error {
return i.importer.Add(&iavl.ExportNode{
Key: item.Key,
Value: item.Value,
Version: item.Version,
Height: int8(item.Height),
})
}

// Commit commits the importer.
func (i *Importer) Commit() error {
return i.importer.Commit()
}

// Close closes the importer.
func (i *Importer) Close() error {
i.importer.Close()

return nil
}
28 changes: 28 additions & 0 deletions store/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
if err != nil {
return nil, err
}
exporter, err := tree.Export()
if err != nil {
return nil, err
}

return &Exporter{
exporter: exporter,
}, nil
}

// Import imports the tree importer at the given version.
func (t *IavlTree) Import(version uint64) (commitment.Importer, error) {
importer, err := t.tree.Import(int64(version))
if err != nil {
return nil, err
}

return &Importer{
importer: importer,
}, nil
}

// Close closes the iavl tree.
func (t *IavlTree) Close() error {
return nil
Expand Down
22 changes: 20 additions & 2 deletions store/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,37 @@ import (

dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"cosmossdk.io/log"
"cosmossdk.io/store/v2/commitment"
)

func generateTree(treeType string) *IavlTree {
func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db dbm.DB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, logger)
},
}

suite.Run(t, s)
}

func generateTree() *IavlTree {
cfg := DefaultConfig()
db := dbm.NewMemDB()
return NewIavlTree(db, log.NewNopLogger(), cfg)
}

func TestIavlTree(t *testing.T) {
// generate a new tree
tree := generateTree("iavl")
tree := generateTree()
require.NotNil(t, tree)

initVersion := tree.GetLatestVersion()
Expand Down
150 changes: 149 additions & 1 deletion store/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@ package commitment
import (
"errors"
"fmt"
"io"
"math"

protoio "github.com/cosmos/gogoproto/io"
ics23 "github.com/cosmos/ics23/go"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)

var _ store.Committer = (*CommitStore)(nil)
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
)

// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
// key. Each store key reflects dedicated and unique usage within a module. A caller
Expand Down Expand Up @@ -127,6 +135,146 @@ func (c *CommitStore) Prune(version uint64) (ferr error) {
return ferr
}

// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
return fmt.Errorf("the snapshot version must be greater than 0")
}

latestVersion, err := c.GetLatestVersion()
if err != nil {
return err
}
if version > latestVersion {
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion)
}

for storeKey, tree := range c.multiTrees {
// TODO: check the parallelism of this loop
if err := func() error {
exporter, err := tree.Export(version)
if err != nil {
return fmt.Errorf("failed to export tree for version %d: %w", version, err)
}
defer exporter.Close()

err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_Store{
Store: &snapshotstypes.SnapshotStoreItem{
Name: storeKey,
},
},
})
if err != nil {
return fmt.Errorf("failed to write store name: %w", err)
}

for {
item, err := exporter.Next()
if errors.Is(err, ErrorExportDone) {
break
} else if err != nil {
return fmt.Errorf("failed to get the next export node: %w", err)
}

if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{
Item: &snapshotstypes.SnapshotItem_IAVL{
IAVL: item,
},
}); err != nil {
return fmt.Errorf("failed to write iavl node: %w", err)
}
}

return nil
}(); err != nil {
return err
}
}

return nil
}

// Restore implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) {
var (
importer Importer
snapshotItem snapshotstypes.SnapshotItem
storeKey string
)

loop:
for {
snapshotItem = snapshotstypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err)
}

switch item := snapshotItem.Item.(type) {
case *snapshotstypes.SnapshotItem_Store:
if importer != nil {
if err := importer.Commit(); err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err)
}
importer.Close()
}
storeKey = item.Store.Name
tree := c.multiTrees[storeKey]
if tree == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey)
}
importer, err = tree.Import(version)
if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err)
}
defer importer.Close()

case *snapshotstypes.SnapshotItem_IAVL:
if importer == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item")
}
node := item.IAVL
if node.Height > int32(math.MaxInt8) {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 {
if node.Value == nil {
node.Value = []byte{}
}
// If the node is a leaf node, it will be written to the storage.
chStorage <- &store.KVPair{
Key: node.Key,
Value: node.Value,
StoreKey: storeKey,
}
}
err := importer.Add(node)
if err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err)
}
default:
break loop
}
}

if importer != nil {
if err := importer.Commit(); err != nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err)
}
}

return snapshotItem, c.LoadVersion(version)
}

func (c *CommitStore) Close() (ferr error) {
for _, tree := range c.multiTrees {
if err := tree.Close(); err != nil {
Expand Down
Loading

0 comments on commit f6df368

Please sign in to comment.