diff --git a/store/changeset.go b/store/changeset.go index b9ceaa50764f..6982344375af 100644 --- a/store/changeset.go +++ b/store/changeset.go @@ -4,8 +4,9 @@ package store // track writes. Deletion can be denoted by a nil value or explicitly by the // Delete field. type KVPair struct { - Key []byte - Value []byte + Key []byte + Value []byte + StoreKey string // Optional for snapshot restore } type KVPairs []KVPair diff --git a/store/commitment/iavl/exporter.go b/store/commitment/iavl/exporter.go new file mode 100644 index 000000000000..20f00d1a1722 --- /dev/null +++ b/store/commitment/iavl/exporter.go @@ -0,0 +1,40 @@ +package iavl + +import ( + "errors" + + "github.com/cosmos/iavl" + + "cosmossdk.io/store/v2/commitment" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +// Exporter is a wrapper around iavl.Exporter. +type Exporter struct { + exporter *iavl.Exporter +} + +// Next returns the next item in the exporter. +func (e *Exporter) Next() (*snapshotstypes.SnapshotIAVLItem, error) { + item, err := e.exporter.Next() + if err != nil { + if errors.Is(err, iavl.ErrorExportDone) { + return nil, commitment.ErrorExportDone + } + return nil, err + } + + return &snapshotstypes.SnapshotIAVLItem{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int32(item.Height), + }, nil +} + +// Close closes the exporter. +func (e *Exporter) Close() error { + e.exporter.Close() + + return nil +} diff --git a/store/commitment/iavl/importer.go b/store/commitment/iavl/importer.go new file mode 100644 index 000000000000..6f1b0eedf21f --- /dev/null +++ b/store/commitment/iavl/importer.go @@ -0,0 +1,34 @@ +package iavl + +import ( + "github.com/cosmos/iavl" + + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +// Importer is a wrapper around iavl.Importer. +type Importer struct { + importer *iavl.Importer +} + +// Add adds the given item to the importer. +func (i *Importer) Add(item *snapshotstypes.SnapshotIAVLItem) error { + return i.importer.Add(&iavl.ExportNode{ + Key: item.Key, + Value: item.Value, + Version: item.Version, + Height: int8(item.Height), + }) +} + +// Commit commits the importer. +func (i *Importer) Commit() error { + return i.importer.Commit() +} + +// Close closes the importer. +func (i *Importer) Close() error { + i.importer.Close() + + return nil +} diff --git a/store/commitment/iavl/tree.go b/store/commitment/iavl/tree.go index 5388769dba74..419240b5b365 100644 --- a/store/commitment/iavl/tree.go +++ b/store/commitment/iavl/tree.go @@ -77,6 +77,34 @@ func (t *IavlTree) Prune(version uint64) error { return t.tree.DeleteVersionsTo(int64(version)) } +// Export exports the tree exporter at the given version. +func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) { + tree, err := t.tree.GetImmutable(int64(version)) + if err != nil { + return nil, err + } + exporter, err := tree.Export() + if err != nil { + return nil, err + } + + return &Exporter{ + exporter: exporter, + }, nil +} + +// Import imports the tree importer at the given version. +func (t *IavlTree) Import(version uint64) (commitment.Importer, error) { + importer, err := t.tree.Import(int64(version)) + if err != nil { + return nil, err + } + + return &Importer{ + importer: importer, + }, nil +} + // Close closes the iavl tree. func (t *IavlTree) Close() error { return nil diff --git a/store/commitment/iavl/tree_test.go b/store/commitment/iavl/tree_test.go index 9b73c0b7b560..a1ea79bcc7c8 100644 --- a/store/commitment/iavl/tree_test.go +++ b/store/commitment/iavl/tree_test.go @@ -5,11 +5,29 @@ import ( dbm "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "cosmossdk.io/log" + "cosmossdk.io/store/v2/commitment" ) -func generateTree(treeType string) *IavlTree { +func TestCommitterSuite(t *testing.T) { + s := &commitment.CommitStoreTestSuite{ + NewStore: func(db dbm.DB, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) { + multiTrees := make(map[string]commitment.Tree) + cfg := DefaultConfig() + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(db, []byte(storeKey)) + multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg) + } + return commitment.NewCommitStore(multiTrees, logger) + }, + } + + suite.Run(t, s) +} + +func generateTree() *IavlTree { cfg := DefaultConfig() db := dbm.NewMemDB() return NewIavlTree(db, log.NewNopLogger(), cfg) @@ -17,7 +35,7 @@ func generateTree(treeType string) *IavlTree { func TestIavlTree(t *testing.T) { // generate a new tree - tree := generateTree("iavl") + tree := generateTree() require.NotNil(t, tree) initVersion := tree.GetLatestVersion() diff --git a/store/commitment/store.go b/store/commitment/store.go index 32952ef58f89..ad391b1df3a2 100644 --- a/store/commitment/store.go +++ b/store/commitment/store.go @@ -3,14 +3,22 @@ package commitment import ( "errors" "fmt" + "io" + "math" + protoio "github.com/cosmos/gogoproto/io" ics23 "github.com/cosmos/ics23/go" "cosmossdk.io/log" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) -var _ store.Committer = (*CommitStore)(nil) +var ( + _ store.Committer = (*CommitStore)(nil) + _ snapshots.CommitSnapshotter = (*CommitStore)(nil) +) // CommitStore is a wrapper around multiple Tree objects mapped by a unique store // key. Each store key reflects dedicated and unique usage within a module. A caller @@ -127,6 +135,146 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { return ferr } +// Snapshot implements snapshotstypes.CommitSnapshotter. +func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { + if version == 0 { + return fmt.Errorf("the snapshot version must be greater than 0") + } + + latestVersion, err := c.GetLatestVersion() + if err != nil { + return err + } + if version > latestVersion { + return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) + } + + for storeKey, tree := range c.multiTrees { + // TODO: check the parallelism of this loop + if err := func() error { + exporter, err := tree.Export(version) + if err != nil { + return fmt.Errorf("failed to export tree for version %d: %w", version, err) + } + defer exporter.Close() + + err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Store{ + Store: &snapshotstypes.SnapshotStoreItem{ + Name: storeKey, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to write store name: %w", err) + } + + for { + item, err := exporter.Next() + if errors.Is(err, ErrorExportDone) { + break + } else if err != nil { + return fmt.Errorf("failed to get the next export node: %w", err) + } + + if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_IAVL{ + IAVL: item, + }, + }); err != nil { + return fmt.Errorf("failed to write iavl node: %w", err) + } + } + + return nil + }(); err != nil { + return err + } + } + + return nil +} + +// Restore implements snapshotstypes.CommitSnapshotter. +func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) { + var ( + importer Importer + snapshotItem snapshotstypes.SnapshotItem + storeKey string + ) + +loop: + for { + snapshotItem = snapshotstypes.SnapshotItem{} + err := protoReader.ReadMsg(&snapshotItem) + if errors.Is(err, io.EOF) { + break + } else if err != nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) + } + + switch item := snapshotItem.Item.(type) { + case *snapshotstypes.SnapshotItem_Store: + if importer != nil { + if err := importer.Commit(); err != nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + } + importer.Close() + } + storeKey = item.Store.Name + tree := c.multiTrees[storeKey] + if tree == nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) + } + importer, err = tree.Import(version) + if err != nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) + } + defer importer.Close() + + case *snapshotstypes.SnapshotItem_IAVL: + if importer == nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") + } + node := item.IAVL + if node.Height > int32(math.MaxInt8) { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", + item.IAVL.Height, math.MaxInt8) + } + // Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does + // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. + if node.Key == nil { + node.Key = []byte{} + } + if node.Height == 0 { + if node.Value == nil { + node.Value = []byte{} + } + // If the node is a leaf node, it will be written to the storage. + chStorage <- &store.KVPair{ + Key: node.Key, + Value: node.Value, + StoreKey: storeKey, + } + } + err := importer.Add(node) + if err != nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) + } + default: + break loop + } + } + + if importer != nil { + if err := importer.Commit(); err != nil { + return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) + } + } + + return snapshotItem, c.LoadVersion(version) +} + func (c *CommitStore) Close() (ferr error) { for _, tree := range c.multiTrees { if err := tree.Close(); err != nil { diff --git a/store/commitment/store_test_suite.go b/store/commitment/store_test_suite.go new file mode 100644 index 000000000000..370958cb3f10 --- /dev/null +++ b/store/commitment/store_test_suite.go @@ -0,0 +1,121 @@ +package commitment + +import ( + "fmt" + "io" + "sync" + + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/suite" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +const ( + storeKey1 = "store1" + storeKey2 = "store2" +) + +// CommitStoreTestSuite is a test suite to be used for all tree backends. +type CommitStoreTestSuite struct { + suite.Suite + + NewStore func(db dbm.DB, storeKeys []string, logger log.Logger) (*CommitStore, error) +} + +func (s *CommitStoreTestSuite) TestSnapshotter() { + storeKeys := []string{storeKey1, storeKey2} + commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) + s.Require().NoError(err) + + latestVersion := uint64(10) + kvCount := 10 + for i := uint64(1); i <= latestVersion; i++ { + kvPairs := make(map[string]store.KVPairs) + for _, storeKey := range storeKeys { + kvPairs[storeKey] = store.KVPairs{} + for j := 0; j < kvCount; j++ { + key := []byte(fmt.Sprintf("key-%d-%d", i, j)) + value := []byte(fmt.Sprintf("value-%d-%d", i, j)) + kvPairs[storeKey] = append(kvPairs[storeKey], store.KVPair{Key: key, Value: value}) + } + } + s.Require().NoError(commitStore.WriteBatch(store.NewChangeset(kvPairs))) + + _, err = commitStore.Commit() + s.Require().NoError(err) + } + + latestStoreInfos := commitStore.WorkingStoreInfos(latestVersion) + s.Require().Equal(len(storeKeys), len(latestStoreInfos)) + + // create a snapshot + dummyExtensionItem := snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Extension{ + Extension: &snapshotstypes.SnapshotExtensionMeta{ + Name: "test", + Format: 1, + }, + }, + } + + targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger()) + s.Require().NoError(err) + + chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) + go func() { + streamWriter := snapshots.NewStreamWriter(chunks) + s.Require().NotNil(streamWriter) + defer streamWriter.Close() + err := commitStore.Snapshot(latestVersion, streamWriter) + s.Require().NoError(err) + // write an extension metadata + err = streamWriter.WriteMsg(&dummyExtensionItem) + s.Require().NoError(err) + }() + + streamReader, err := snapshots.NewStreamReader(chunks) + s.Require().NoError(err) + chStorage := make(chan *store.KVPair, 100) + leaves := make(map[string]string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + for kv := range chStorage { + leaves[fmt.Sprintf("%s_%s", kv.StoreKey, kv.Key)] = string(kv.Value) + } + wg.Done() + }() + nextItem, err := targetStore.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage) + s.Require().NoError(err) + s.Require().Equal(*dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) + + close(chStorage) + wg.Wait() + s.Require().Equal(len(storeKeys)*kvCount*int(latestVersion), len(leaves)) + for _, storeKey := range storeKeys { + for i := 1; i <= int(latestVersion); i++ { + for j := 0; j < kvCount; j++ { + key := fmt.Sprintf("%s_key-%d-%d", storeKey, i, j) + s.Require().Equal(leaves[key], fmt.Sprintf("value-%d-%d", i, j)) + } + } + } + + // check the restored tree hash + targetStoreInfos := targetStore.WorkingStoreInfos(latestVersion) + s.Require().Equal(len(storeKeys), len(targetStoreInfos)) + for _, storeInfo := range targetStoreInfos { + matched := false + for _, latestStoreInfo := range latestStoreInfos { + if storeInfo.Name == latestStoreInfo.Name { + s.Require().Equal(latestStoreInfo.GetHash(), storeInfo.GetHash()) + matched = true + } + } + s.Require().True(matched) + } +} diff --git a/store/commitment/tree.go b/store/commitment/tree.go index b55c90c5fad1..67acffdf3099 100644 --- a/store/commitment/tree.go +++ b/store/commitment/tree.go @@ -1,11 +1,17 @@ package commitment import ( + "errors" "io" ics23 "github.com/cosmos/ics23/go" + + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) +// ErrorExportDone is returned by Exporter.Next() when all items have been exported. +var ErrorExportDone = errors.New("export is complete") + // Tree is the interface that wraps the basic Tree methods. type Tree interface { Set(key, value []byte) error @@ -16,6 +22,23 @@ type Tree interface { Commit() ([]byte, error) GetProof(version uint64, key []byte) (*ics23.CommitmentProof, error) Prune(version uint64) error + Export(version uint64) (Exporter, error) + Import(version uint64) (Importer, error) + + io.Closer +} + +// Exporter is the interface that wraps the basic Export methods. +type Exporter interface { + Next() (*snapshotstypes.SnapshotIAVLItem, error) + + io.Closer +} + +// Importer is the interface that wraps the basic Import methods. +type Importer interface { + Add(*snapshotstypes.SnapshotIAVLItem) error + Commit() error io.Closer } diff --git a/store/kv/branch/store_test.go b/store/kv/branch/store_test.go index 4353f280f65f..424e396e0b2e 100644 --- a/store/kv/branch/store_test.go +++ b/store/kv/branch/store_test.go @@ -8,6 +8,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/kv/branch" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -25,7 +26,8 @@ func TestStorageTestSuite(t *testing.T) { } func (s *StoreTestSuite) SetupTest() { - storage, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) + ss := storage.NewStorageStore(sqliteDB) s.Require().NoError(err) cs := store.NewChangeset(map[string]store.KVPairs{storeKey: {}}) @@ -36,12 +38,12 @@ func (s *StoreTestSuite) SetupTest() { cs.AddKVPair(storeKey, store.KVPair{Key: []byte(key), Value: []byte(val)}) } - s.Require().NoError(storage.ApplyChangeset(1, cs)) + s.Require().NoError(ss.ApplyChangeset(1, cs)) - kvStore, err := branch.New(storeKey, storage) + kvStore, err := branch.New(storeKey, ss) s.Require().NoError(err) - s.storage = storage + s.storage = ss s.kvStore = kvStore } diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index 47cfd3025438..65e7fe1b09ec 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -11,6 +11,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -34,8 +35,9 @@ func (s *PruningTestSuite) SetupTest() { logger = log.NewTestLogger(s.T()) } - ss, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB) tree := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), iavl.DefaultConfig()) sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, logger) @@ -58,7 +60,7 @@ func (s *PruningTestSuite) TestPruning() { latestVersion := uint64(100) - // write 10 batches + // write batches for i := uint64(0); i < latestVersion; i++ { version := i + 1 diff --git a/store/root/store_test.go b/store/root/store_test.go index 0c94ed158ffc..ad020fe87193 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/commitment/iavl" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/pruning" "cosmossdk.io/store/v2/storage/sqlite" ) @@ -29,11 +30,12 @@ func TestStorageTestSuite(t *testing.T) { func (s *RootStoreTestSuite) SetupTest() { noopLog := log.NewNopLogger() - ss, err := sqlite.New(s.T().TempDir()) + sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB) tree := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) - sc, err := commitment.NewCommitStore(map[string]commitment.Tree{"default": tree}, noopLog) + sc, err := commitment.NewCommitStore(map[string]commitment.Tree{defaultStoreKey: tree}, noopLog) s.Require().NoError(err) rs, err := New(noopLog, ss, sc, pruning.DefaultOptions(), pruning.DefaultOptions(), nil) diff --git a/store/snapshots/chunk.go b/store/snapshots/chunk.go index c70fc074b0e0..874bf966871b 100644 --- a/store/snapshots/chunk.go +++ b/store/snapshots/chunk.go @@ -6,7 +6,7 @@ import ( "cosmossdk.io/errors" "cosmossdk.io/store/v2" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) // ChunkWriter reads an input stream, splits it into fixed-size chunks, and writes them to a @@ -169,15 +169,15 @@ func DrainChunks(chunks <-chan io.ReadCloser) { // ValidRestoreHeight will check height is valid for snapshot restore or not func ValidRestoreHeight(format uint32, height uint64) error { - if format != snapshottypes.CurrentFormat { - return errors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format) + if format != snapshotstypes.CurrentFormat { + return errors.Wrapf(snapshotstypes.ErrUnknownFormat, "format %v", format) } if height == 0 { return errors.Wrap(store.ErrLogic, "cannot restore snapshot at height 0") } if height > uint64(math.MaxInt64) { - return errors.Wrapf(snapshottypes.ErrInvalidMetadata, + return errors.Wrapf(snapshotstypes.ErrInvalidMetadata, "snapshot height %v cannot exceed %v", height, int64(math.MaxInt64)) } diff --git a/store/snapshots/helpers_test.go b/store/snapshots/helpers_test.go index 7c6cf04bcd72..9711ab538347 100644 --- a/store/snapshots/helpers_test.go +++ b/store/snapshots/helpers_test.go @@ -17,8 +17,9 @@ import ( errorsmod "cosmossdk.io/errors" "cosmossdk.io/log" + "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/snapshots" - snapshottypes "cosmossdk.io/store/v2/snapshots/types" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" ) func checksums(slice [][]byte) [][]byte { @@ -62,7 +63,7 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte { } // snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks. -func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]byte { +func snapshotItems(items [][]byte, ext snapshots.ExtensionSnapshotter) [][]byte { // copy the same parameters from the code snapshotChunkSize := uint64(10e6) snapshotBufferSize := int(snapshotChunkSize) @@ -74,19 +75,19 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b zWriter, _ := zlib.NewWriterLevel(bufWriter, 7) protoWriter := protoio.NewDelimitedWriter(zWriter) for _, item := range items { - _ = snapshottypes.WriteExtensionPayload(protoWriter, item) + _ = snapshotstypes.WriteExtensionPayload(protoWriter, item) } // write extension metadata - _ = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ - Item: &snapshottypes.SnapshotItem_Extension{ - Extension: &snapshottypes.SnapshotExtensionMeta{ + _ = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ + Item: &snapshotstypes.SnapshotItem_Extension{ + Extension: &snapshotstypes.SnapshotExtensionMeta{ Name: ext.SnapshotName(), Format: ext.SnapshotFormat(), }, }, }) _ = ext.SnapshotExtension(0, func(payload []byte) error { - return snapshottypes.WriteExtensionPayload(protoWriter, payload) + return snapshotstypes.WriteExtensionPayload(protoWriter, payload) }) _ = protoWriter.Close() _ = bufWriter.Flush() @@ -105,23 +106,21 @@ func snapshotItems(items [][]byte, ext snapshottypes.ExtensionSnapshotter) [][]b return chunks } -type mockSnapshotter struct { - items [][]byte - prunedHeights map[int64]struct{} - snapshotInterval uint64 +type mockCommitSnapshotter struct { + items [][]byte } -func (m *mockSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, -) (snapshottypes.SnapshotItem, error) { +func (m *mockCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, +) (snapshotstypes.SnapshotItem, error) { if format == 0 { - return snapshottypes.SnapshotItem{}, snapshottypes.ErrUnknownFormat + return snapshotstypes.SnapshotItem{}, snapshotstypes.ErrUnknownFormat } if m.items != nil { - return snapshottypes.SnapshotItem{}, errors.New("already has contents") + return snapshotstypes.SnapshotItem{}, errors.New("already has contents") } - var item snapshottypes.SnapshotItem + var item snapshotstypes.SnapshotItem m.items = [][]byte{} for { item.Reset() @@ -129,7 +128,7 @@ func (m *mockSnapshotter) Restore( if err == io.EOF { break } else if err != nil { - return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message") + return snapshotstypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message") } payload := item.GetExtensionPayload() if payload == nil { @@ -141,65 +140,49 @@ func (m *mockSnapshotter) Restore( return item, nil } -func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *mockCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { for _, item := range m.items { - if err := snapshottypes.WriteExtensionPayload(protoWriter, item); err != nil { + if err := snapshotstypes.WriteExtensionPayload(protoWriter, item); err != nil { return err } } return nil } -func (m *mockSnapshotter) SnapshotFormat() uint32 { - return snapshottypes.CurrentFormat +func (m *mockCommitSnapshotter) SnapshotFormat() uint32 { + return snapshotstypes.CurrentFormat } -func (m *mockSnapshotter) SupportedFormats() []uint32 { - return []uint32{snapshottypes.CurrentFormat} +func (m *mockCommitSnapshotter) SupportedFormats() []uint32 { + return []uint32{snapshotstypes.CurrentFormat} } -func (m *mockSnapshotter) PruneSnapshotHeight(height int64) { - m.prunedHeights[height] = struct{}{} -} - -func (m *mockSnapshotter) GetSnapshotInterval() uint64 { - return m.snapshotInterval -} +type mockStorageSnapshotter struct{} -func (m *mockSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { - m.snapshotInterval = snapshotInterval +func (m *mockStorageSnapshotter) Restore(version uint64, chStorage <-chan *store.KVPair) error { + return nil } -type mockErrorSnapshotter struct{} +type mockErrorCommitSnapshotter struct{} -var _ snapshottypes.Snapshotter = (*mockErrorSnapshotter)(nil) +var _ snapshots.CommitSnapshotter = (*mockErrorCommitSnapshotter)(nil) -func (m *mockErrorSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *mockErrorCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { return errors.New("mock snapshot error") } -func (m *mockErrorSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, -) (snapshottypes.SnapshotItem, error) { - return snapshottypes.SnapshotItem{}, errors.New("mock restore error") -} - -func (m *mockErrorSnapshotter) SnapshotFormat() uint32 { - return snapshottypes.CurrentFormat -} - -func (m *mockErrorSnapshotter) SupportedFormats() []uint32 { - return []uint32{snapshottypes.CurrentFormat} -} - -func (m *mockErrorSnapshotter) PruneSnapshotHeight(height int64) { +func (m *mockErrorCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, +) (snapshotstypes.SnapshotItem, error) { + return snapshotstypes.SnapshotItem{}, errors.New("mock restore error") } -func (m *mockErrorSnapshotter) GetSnapshotInterval() uint64 { - return 0 +func (m *mockErrorCommitSnapshotter) SnapshotFormat() uint32 { + return snapshotstypes.CurrentFormat } -func (m *mockErrorSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { +func (m *mockErrorCommitSnapshotter) SupportedFormats() []uint32 { + return []uint32{snapshotstypes.CurrentFormat} } // setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1. @@ -208,10 +191,8 @@ func setupBusyManager(t *testing.T) *snapshots.Manager { t.Helper() store, err := snapshots.NewStore(db.NewMemDB(), t.TempDir()) require.NoError(t, err) - hung := newHungSnapshotter() - hung.SetSnapshotInterval(opts.Interval) - mgr := snapshots.NewManager(store, opts, hung, nil, log.NewNopLogger()) - require.Equal(t, opts.Interval, hung.snapshotInterval) + hung := newHungCommitSnapshotter() + mgr := snapshots.NewManager(store, opts, hung, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) // Channel to ensure the test doesn't finish until the goroutine is done. // Without this, there are intermittent test failures about @@ -222,8 +203,6 @@ func setupBusyManager(t *testing.T) *snapshots.Manager { defer close(done) _, err := mgr.Create(1) require.NoError(t, err) - _, didPruneHeight := hung.prunedHeights[1] - require.True(t, didPruneHeight) }() time.Sleep(10 * time.Millisecond) @@ -236,40 +215,29 @@ func setupBusyManager(t *testing.T) *snapshots.Manager { return mgr } -// hungSnapshotter can be used to test operations in progress. Call close to end the snapshot. -type hungSnapshotter struct { - ch chan struct{} - prunedHeights map[int64]struct{} - snapshotInterval uint64 +// hungCommitSnapshotter can be used to test operations in progress. Call close to end the snapshot. +type hungCommitSnapshotter struct { + ch chan struct{} } -func newHungSnapshotter() *hungSnapshotter { - return &hungSnapshotter{ - ch: make(chan struct{}), - prunedHeights: make(map[int64]struct{}), +func newHungCommitSnapshotter() *hungCommitSnapshotter { + return &hungCommitSnapshotter{ + ch: make(chan struct{}), } } -func (m *hungSnapshotter) Close() { +func (m *hungCommitSnapshotter) Close() { close(m.ch) } -func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { +func (m *hungCommitSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error { <-m.ch return nil } -func (m *hungSnapshotter) PruneSnapshotHeight(height int64) { - m.prunedHeights[height] = struct{}{} -} - -func (m *hungSnapshotter) SetSnapshotInterval(snapshotInterval uint64) { - m.snapshotInterval = snapshotInterval -} - -func (m *hungSnapshotter) Restore( - height uint64, format uint32, protoReader protoio.Reader, -) (snapshottypes.SnapshotItem, error) { +func (m *hungCommitSnapshotter) Restore( + height uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair, +) (snapshotstypes.SnapshotItem, error) { panic("not implemented") } @@ -299,16 +267,16 @@ func (s *extSnapshotter) SupportedFormats() []uint32 { return []uint32{1} } -func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshottypes.ExtensionPayloadWriter) error { +func (s *extSnapshotter) SnapshotExtension(height uint64, payloadWriter snapshots.ExtensionPayloadWriter) error { for _, i := range s.state { - if err := payloadWriter(snapshottypes.Uint64ToBigEndian(i)); err != nil { + if err := payloadWriter(snapshotstypes.Uint64ToBigEndian(i)); err != nil { return err } } return nil } -func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshottypes.ExtensionPayloadReader) error { +func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadReader snapshots.ExtensionPayloadReader) error { for { payload, err := payloadReader() if err == io.EOF { @@ -316,7 +284,7 @@ func (s *extSnapshotter) RestoreExtension(height uint64, format uint32, payloadR } else if err != nil { return err } - s.state = append(s.state, snapshottypes.BigEndianToUint64(payload)) + s.state = append(s.state, snapshotstypes.BigEndianToUint64(payload)) } // finalize restoration return nil diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 8dd1381e1e2b..1c2d4ec65a31 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -31,13 +31,16 @@ import ( // 2. io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary // errors via io.Pipe.CloseWithError(). type Manager struct { - extensions map[string]types.ExtensionSnapshotter + extensions map[string]ExtensionSnapshotter // store is the snapshot store where all completed snapshots are persisted. store *Store - opts types.SnapshotOptions - // multistore is the store from which snapshots are taken. - multistore types.Snapshotter - logger log.Logger + opts SnapshotOptions + // commitSnapshotter is the snapshotter for the commitment state. + commitSnapshotter CommitSnapshotter + // storageSnapshotter is the snapshotter for the storage state. + storageSnapshotter StorageSnapshotter + + logger log.Logger mtx sync.Mutex operation operation @@ -62,8 +65,9 @@ const ( opPrune operation = "prune" opRestore operation = "restore" - chunkBufferSize = 4 - chunkIDBufferSize = 1024 + chunkBufferSize = 4 + chunkIDBufferSize = 1024 + defaultStorageChannelBufferSize = 1024 snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit ) @@ -71,23 +75,24 @@ const ( var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0") // NewManager creates a new manager. -func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager { +func NewManager(store *Store, opts SnapshotOptions, commitSnapshotter CommitSnapshotter, storageSnapshotter StorageSnapshotter, extensions map[string]ExtensionSnapshotter, logger log.Logger) *Manager { if extensions == nil { - extensions = map[string]types.ExtensionSnapshotter{} + extensions = map[string]ExtensionSnapshotter{} } return &Manager{ - store: store, - opts: opts, - multistore: multistore, - extensions: extensions, - logger: logger, + store: store, + opts: opts, + commitSnapshotter: commitSnapshotter, + storageSnapshotter: storageSnapshotter, + extensions: extensions, + logger: logger.With("module", "snapshot_manager"), } } // RegisterExtensions register extension snapshotters to manager -func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error { +func (m *Manager) RegisterExtensions(extensions ...ExtensionSnapshotter) error { if m.extensions == nil { - m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions)) + m.extensions = make(map[string]ExtensionSnapshotter, len(extensions)) } for _, extension := range extensions { name := extension.SnapshotName() @@ -161,11 +166,9 @@ func (m *Manager) GetSnapshotBlockRetentionHeights() int64 { // Create creates a snapshot and returns its metadata. func (m *Manager) Create(height uint64) (*types.Snapshot, error) { if m == nil { - return nil, errorsmod.Wrap(store.ErrLogic, "no snapshot store configured") + return nil, errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil") } - defer m.multistore.PruneSnapshotHeight(int64(height)) - err := m.begin(opSnapshot) if err != nil { return nil, err @@ -201,7 +204,7 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { } }() - if err := m.multistore.Snapshot(height, streamWriter); err != nil { + if err := m.commitSnapshotter.Snapshot(height, streamWriter); err != nil { streamWriter.CloseWithError(err) return } @@ -363,7 +366,20 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io. return payload.Payload, nil } - nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader) + // chStorage is the channel to pass the KV pairs to the storage snapshotter. + chStorage := make(chan *store.KVPair, defaultStorageChannelBufferSize) + defer close(chStorage) + + storageErrs := make(chan error, 1) + go func() { + defer close(storageErrs) + err := m.storageSnapshotter.Restore(snapshot.Height, chStorage) + if err != nil { + storageErrs <- err + } + }() + + nextItem, err = m.commitSnapshotter.Restore(snapshot.Height, snapshot.Format, streamReader, chStorage) if err != nil { return errorsmod.Wrap(err, "multistore restore") } @@ -393,6 +409,12 @@ func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io. return errorsmod.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name) } } + + // wait for storage snapshotter to complete + if err := <-storageErrs; err != nil { + return errorsmod.Wrap(err, "storage snapshotter") + } + return nil } @@ -495,7 +517,7 @@ func (m *Manager) sortedExtensionNames() []string { } // IsFormatSupported returns if the snapshotter supports restoration from given format. -func IsFormatSupported(snapshotter types.ExtensionSnapshotter, format uint32) bool { +func IsFormatSupported(snapshotter ExtensionSnapshotter, format uint32) bool { for _, i := range snapshotter.SupportedFormats() { if i == format { return true diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index c3276d01ed7e..af5b6eb1e130 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -13,14 +13,13 @@ import ( "cosmossdk.io/store/v2/snapshots/types" ) -var opts = types.NewSnapshotOptions(1500, 2) +var opts = snapshots.NewSnapshotOptions(1500, 2) func TestManager_List(t *testing.T) { store := setupStore(t) - snapshotter := &mockSnapshotter{} - snapshotter.SetSnapshotInterval(opts.Interval) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) - require.Equal(t, opts.Interval, snapshotter.GetSnapshotInterval()) + commitSnapshotter := &mockCommitSnapshotter{} + storageSnapshotter := &mockStorageSnapshotter{} + manager := snapshots.NewManager(store, opts, commitSnapshotter, storageSnapshotter, nil, log.NewNopLogger()) mgrList, err := manager.List() require.NoError(t, err) @@ -41,7 +40,7 @@ func TestManager_List(t *testing.T) { func TestManager_LoadChunk(t *testing.T) { store := setupStore(t) - manager := snapshots.NewManager(store, opts, &mockSnapshotter{}, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) // Existing chunk should return body chunk, err := manager.LoadChunk(2, 1, 1) @@ -67,14 +66,13 @@ func TestManager_Take(t *testing.T) { {4, 5, 6}, {7, 8, 9}, } - snapshotter := &mockSnapshotter{ - items: items, - prunedHeights: make(map[int64]struct{}), + commitSnapshotter := &mockCommitSnapshotter{ + items: items, } extSnapshotter := newExtSnapshotter(10) expectChunks := snapshotItems(items, extSnapshotter) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, commitSnapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -85,18 +83,14 @@ func TestManager_Take(t *testing.T) { // creating a snapshot at a lower height than the latest should error _, err = manager.Create(3) require.Error(t, err) - _, didPruneHeight := snapshotter.prunedHeights[3] - require.True(t, didPruneHeight) // creating a snapshot at a higher height should be fine, and should return it snapshot, err := manager.Create(5) require.NoError(t, err) - _, didPruneHeight = snapshotter.prunedHeights[5] - require.True(t, didPruneHeight) assert.Equal(t, &types.Snapshot{ Height: 5, - Format: snapshotter.SnapshotFormat(), + Format: commitSnapshotter.SnapshotFormat(), Chunks: 1, Hash: []uint8{0xc5, 0xf7, 0xfe, 0xea, 0xd3, 0x4d, 0x3e, 0x87, 0xff, 0x41, 0xa2, 0x27, 0xfa, 0xcb, 0x38, 0x17, 0xa, 0x5, 0xeb, 0x27, 0x4e, 0x16, 0x5e, 0xf3, 0xb2, 0x8b, 0x47, 0xd1, 0xe6, 0x94, 0x7e, 0x8b}, Metadata: types.Metadata{ @@ -117,9 +111,7 @@ func TestManager_Take(t *testing.T) { func TestManager_Prune(t *testing.T) { store := setupStore(t) - snapshotter := &mockSnapshotter{} - snapshotter.SetSnapshotInterval(opts.Interval) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, &mockCommitSnapshotter{}, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) pruned, err := manager.Prune(2) require.NoError(t, err) @@ -137,11 +129,9 @@ func TestManager_Prune(t *testing.T) { func TestManager_Restore(t *testing.T) { store := setupStore(t) - target := &mockSnapshotter{ - prunedHeights: make(map[int64]struct{}), - } + target := &mockCommitSnapshotter{} extSnapshotter := newExtSnapshotter(0) - manager := snapshots.NewManager(store, opts, target, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, target, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) err := manager.RegisterExtensions(extSnapshotter) require.NoError(t, err) @@ -191,8 +181,6 @@ func TestManager_Restore(t *testing.T) { // While the restore is in progress, any other operations fail _, err = manager.Create(4) require.Error(t, err) - _, didPruneHeight := target.prunedHeights[4] - require.True(t, didPruneHeight) _, err = manager.Prune(1) require.Error(t, err) @@ -248,10 +236,10 @@ func TestManager_Restore(t *testing.T) { } func TestManager_TakeError(t *testing.T) { - snapshotter := &mockErrorSnapshotter{} + snapshotter := &mockErrorCommitSnapshotter{} store, err := snapshots.NewStore(db.NewMemDB(), GetTempDir(t)) require.NoError(t, err) - manager := snapshots.NewManager(store, opts, snapshotter, nil, log.NewNopLogger()) + manager := snapshots.NewManager(store, opts, snapshotter, &mockStorageSnapshotter{}, nil, log.NewNopLogger()) _, err = manager.Create(1) require.Error(t, err) diff --git a/store/snapshots/types/options.go b/store/snapshots/options.go similarity index 96% rename from store/snapshots/types/options.go rename to store/snapshots/options.go index 9c6ec79a11e2..565a0ce105de 100644 --- a/store/snapshots/types/options.go +++ b/store/snapshots/options.go @@ -1,4 +1,4 @@ -package types +package snapshots // SnapshotOptions defines the snapshot strategy used when determining which // heights are snapshotted for state sync. diff --git a/store/snapshots/types/snapshotter.go b/store/snapshots/snapshotter.go similarity index 55% rename from store/snapshots/types/snapshotter.go rename to store/snapshots/snapshotter.go index de9fcfe3d3ff..7c8321f3c838 100644 --- a/store/snapshots/types/snapshotter.go +++ b/store/snapshots/snapshotter.go @@ -1,29 +1,26 @@ -package types +package snapshots import ( protoio "github.com/cosmos/gogoproto/io" + + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots/types" ) -// Snapshotter is something that can create and restore snapshots, consisting of streamed binary -// chunks - all of which must be read from the channel and closed. If an unsupported format is -// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf). -type Snapshotter interface { - // Snapshot writes snapshot items into the protobuf writer. - Snapshot(height uint64, protoWriter protoio.Writer) error - - // PruneSnapshotHeight prunes the given height according to the prune strategy. - // If PruneNothing, this is a no-op. - // If other strategy, this height is persisted until it is - // less than - KeepRecent and % Interval == 0 - PruneSnapshotHeight(height int64) - - // SetSnapshotInterval sets the interval at which the snapshots are taken. - // It is used by the store that implements the Snapshotter interface - // to determine which heights to retain until after the snapshot is complete. - SetSnapshotInterval(snapshotInterval uint64) - - // Restore restores a state snapshot, taking the reader of protobuf message stream as input. - Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error) +// CommitSnapshotter defines an API for creating and restoring snapshots of the +// commitment state. +type CommitSnapshotter interface { + // Snapshot writes a snapshot of the commitment state at the given version. + Snapshot(version uint64, protoWriter protoio.Writer) error + + // Restore restores the commitment state from the snapshot reader. + Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (types.SnapshotItem, error) +} + +// StorageSnapshotter defines an API for restoring snapshots of the storage state. +type StorageSnapshotter interface { + // Restore restores the storage state from the given channel. + Restore(version uint64, chStorage <-chan *store.KVPair) error } // ExtensionPayloadReader read extension payloads, diff --git a/store/storage/database.go b/store/storage/database.go new file mode 100644 index 000000000000..884981f6138f --- /dev/null +++ b/store/storage/database.go @@ -0,0 +1,25 @@ +package storage + +import ( + "io" + + "cosmossdk.io/store/v2" +) + +// Database is an interface that wraps the storage database methods. A wrapper +// is useful for instances where you want to perform logic that is identical for all SS +// backends, such as restoring snapshots. +type Database interface { + NewBatch(version uint64) (store.Batch, error) + Has(storeKey string, version uint64, key []byte) (bool, error) + Get(storeKey string, version uint64, key []byte) ([]byte, error) + GetLatestVersion() (uint64, error) + SetLatestVersion(version uint64) error + + Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) + ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) + + Prune(version uint64) error + + io.Closer +} diff --git a/store/storage/pebbledb/db.go b/store/storage/pebbledb/db.go index 4e61857f9352..42b9a2812ca4 100644 --- a/store/storage/pebbledb/db.go +++ b/store/storage/pebbledb/db.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/pebble" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" ) const ( @@ -25,7 +26,7 @@ const ( tombstoneVal = "TOMBSTONE" ) -var _ store.VersionedDatabase = (*Database)(nil) +var _ storage.Database = (*Database)(nil) type Database struct { storage *pebble.DB @@ -92,6 +93,15 @@ func (db *Database) Close() error { return err } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + b, err := NewBatch(db.storage, version, db.sync) + if err != nil { + return nil, err + } + + return b, nil +} + func (db *Database) SetLatestVersion(version uint64) error { var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], version) @@ -175,29 +185,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by return nil, nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b, err := NewBatch(db.storage, version, db.sync) - if err != nil { - return err - } - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune removes all versions of all keys that are <= the given version. // // Note, the implementation of this method is inefficient and can be potentially diff --git a/store/storage/pebbledb/db_test.go b/store/storage/pebbledb/db_test.go index 934660042167..a8310386bc37 100644 --- a/store/storage/pebbledb/db_test.go +++ b/store/storage/pebbledb/db_test.go @@ -19,7 +19,7 @@ func TestStorageTestSuite(t *testing.T) { db.SetSync(false) } - return db, err + return storage.NewStorageStore(db), err }, EmptyBatchSize: 12, } diff --git a/store/storage/rocksdb/db.go b/store/storage/rocksdb/db.go index d73fd29be5ee..d1806cce1e2d 100644 --- a/store/storage/rocksdb/db.go +++ b/store/storage/rocksdb/db.go @@ -12,6 +12,7 @@ import ( "golang.org/x/exp/slices" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/util" ) @@ -23,7 +24,7 @@ const ( ) var ( - _ store.VersionedDatabase = (*Database)(nil) + _ storage.Database = (*Database)(nil) defaultWriteOpts = grocksdb.NewDefaultWriteOptions() defaultReadOpts = grocksdb.NewDefaultReadOptions() @@ -90,6 +91,10 @@ func (db *Database) Close() error { return nil } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + return NewBatch(db, version), nil +} + func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) { if version < db.tsLow { return nil, store.ErrVersionPruned{EarliestVersion: db.tsLow} @@ -141,26 +146,6 @@ func (db *Database) Get(storeKey string, version uint64, key []byte) ([]byte, er return copyAndFreeSlice(slice), nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b := NewBatch(db, version) - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune attempts to prune all versions up to and including the provided version. // This is done internally by updating the full_history_ts_low RocksDB value on // the column families, s.t. all versions less than full_history_ts_low will be diff --git a/store/storage/rocksdb/db_test.go b/store/storage/rocksdb/db_test.go index c1d2868cc68f..8788bfe632e2 100644 --- a/store/storage/rocksdb/db_test.go +++ b/store/storage/rocksdb/db_test.go @@ -21,7 +21,8 @@ const ( func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ NewDB: func(dir string) (store.VersionedDatabase, error) { - return New(dir) + db, err := New(dir) + return storage.NewStorageStore(db), err }, EmptyBatchSize: 12, } @@ -33,15 +34,15 @@ func TestDatabase_ReverseIterator(t *testing.T) { require.NoError(t, err) defer db.Close() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch := NewBatch(db, 1) for i := 0; i < 100; i++ { key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099 val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099 - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(1, cs)) + require.NoError(t, batch.Write()) // reverse iterator without an end key iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index 9a1c3421b806..05fde45b03fd 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -11,6 +11,7 @@ import ( _ "github.com/mattn/go-sqlite3" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" ) const ( @@ -40,7 +41,7 @@ const ( ` ) -var _ store.VersionedDatabase = (*Database)(nil) +var _ storage.Database = (*Database)(nil) type Database struct { storage *sql.DB @@ -91,6 +92,10 @@ func (db *Database) Close() error { return err } +func (db *Database) NewBatch(version uint64) (store.Batch, error) { + return NewBatch(db.storage, version) +} + func (db *Database) GetLatestVersion() (uint64, error) { stmt, err := db.storage.Prepare("SELECT value FROM state_storage WHERE store_key = ? AND key = ?") if err != nil { @@ -168,29 +173,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by return nil, nil } -func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { - b, err := NewBatch(db.storage, version) - if err != nil { - return err - } - - for storeKey, pairs := range cs.Pairs { - for _, kvPair := range pairs { - if kvPair.Value == nil { - if err := b.Delete(storeKey, kvPair.Key); err != nil { - return err - } - } else { - if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { - return err - } - } - } - } - - return b.Write() -} - // Prune removes all versions of all keys that are <= the given version. It keeps // the latest (non-tombstoned) version of each key/value tuple to handle queries // above the prune version. This is analogous to RocksDB full_history_ts_low. diff --git a/store/storage/sqlite/db_test.go b/store/storage/sqlite/db_test.go index f80f88baf7e0..deff98ec4348 100644 --- a/store/storage/sqlite/db_test.go +++ b/store/storage/sqlite/db_test.go @@ -19,7 +19,8 @@ const ( func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ NewDB: func(dir string) (store.VersionedDatabase, error) { - return New(dir) + db, err := New(dir) + return storage.NewStorageStore(db), err }, EmptyBatchSize: 0, } @@ -31,15 +32,16 @@ func TestDatabase_ReverseIterator(t *testing.T) { require.NoError(t, err) defer db.Close() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(1) + require.NoError(t, err) for i := 0; i < 100; i++ { key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099 val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099 - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(1, cs)) + require.NoError(t, batch.Write()) // reverse iterator without an end key iter, err := db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) @@ -106,15 +108,16 @@ func TestParallelWrites(t *testing.T) { go func(i int) { <-triggerStartCh defer wg.Done() - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(uint64(i + 1)) + require.NoError(t, err) for j := 0; j < kvCount; j++ { key := fmt.Sprintf("key-%d-%03d", i, j) val := fmt.Sprintf("val-%d-%03d", i, j) - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(uint64(i+1), cs)) + require.NoError(t, batch.Write()) }(i) } @@ -155,15 +158,16 @@ func TestParallelWriteAndPruning(t *testing.T) { <-triggerStartCh defer wg.Done() for i := 0; i < latestVersion; i++ { - cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {}}) + batch, err := db.NewBatch(uint64(i + 1)) + require.NoError(t, err) for j := 0; j < kvCount; j++ { key := fmt.Sprintf("key-%d-%03d", i, j) val := fmt.Sprintf("val-%d-%03d", i, j) - cs.AddKVPair(storeKey1, store.KVPair{Key: []byte(key), Value: []byte(val)}) + require.NoError(t, batch.Set(storeKey1, []byte(key), []byte(val))) } - require.NoError(t, db.ApplyChangeset(uint64(i+1), cs)) + require.NoError(t, batch.Write()) } }() // start a goroutine that prunes the database diff --git a/store/storage/storage_bench_test.go b/store/storage/storage_bench_test.go index 08ba4c8093ac..5639d5d4e84f 100644 --- a/store/storage/storage_bench_test.go +++ b/store/storage/storage_bench_test.go @@ -1,7 +1,7 @@ //go:build rocksdb // +build rocksdb -package storage +package storage_test import ( "bytes" @@ -13,21 +13,29 @@ import ( "github.com/stretchr/testify/require" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/storage" "cosmossdk.io/store/v2/storage/pebbledb" "cosmossdk.io/store/v2/storage/rocksdb" "cosmossdk.io/store/v2/storage/sqlite" ) +const ( + storeKey1 = "store1" +) + var ( backends = map[string]func(dataDir string) (store.VersionedDatabase, error){ "rocksdb_versiondb_opts": func(dataDir string) (store.VersionedDatabase, error) { - return rocksdb.New(dataDir) + db, err := rocksdb.New(dataDir) + return storage.NewStorageStore(db), err }, "pebbledb_default_opts": func(dataDir string) (store.VersionedDatabase, error) { - return pebbledb.New(dataDir) + db, err := pebbledb.New(dataDir) + return storage.NewStorageStore(db), err }, "btree_sqlite": func(dataDir string) (store.VersionedDatabase, error) { - return sqlite.New(dataDir) + db, err := sqlite.New(dataDir) + return storage.NewStorageStore(db), err }, } rng = rand.New(rand.NewSource(567320)) diff --git a/store/storage/store.go b/store/storage/store.go new file mode 100644 index 000000000000..3a6fed65db2d --- /dev/null +++ b/store/storage/store.go @@ -0,0 +1,130 @@ +package storage + +import ( + "fmt" + + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" +) + +const ( + // TODO: it is a random number, need to be tuned + defaultBatchBufferSize = 100000 +) + +var ( + _ store.VersionedDatabase = (*StorageStore)(nil) + _ snapshots.StorageSnapshotter = (*StorageStore)(nil) +) + +// StorageStore is a wrapper around the store.VersionedDatabase interface. +type StorageStore struct { + db Database +} + +// NewStorageStore returns a reference to a new StorageStore. +func NewStorageStore(db Database) *StorageStore { + return &StorageStore{ + db: db, + } +} + +// Has returns true if the key exists in the store. +func (ss *StorageStore) Has(storeKey string, version uint64, key []byte) (bool, error) { + return ss.db.Has(storeKey, version, key) +} + +// Get returns the value associated with the given key. +func (ss *StorageStore) Get(storeKey string, version uint64, key []byte) ([]byte, error) { + return ss.db.Get(storeKey, version, key) +} + +// ApplyChangeset applies the given changeset to the storage. +func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) error { + b, err := ss.db.NewBatch(version) + if err != nil { + return err + } + + for storeKey, pairs := range cs.Pairs { + for _, kvPair := range pairs { + if kvPair.Value == nil { + if err := b.Delete(storeKey, kvPair.Key); err != nil { + return err + } + } else { + if err := b.Set(storeKey, kvPair.Key, kvPair.Value); err != nil { + return err + } + } + } + } + + return b.Write() +} + +// GetLatestVersion returns the latest version of the store. +func (ss *StorageStore) GetLatestVersion() (uint64, error) { + return ss.db.GetLatestVersion() +} + +// SetLatestVersion sets the latest version of the store. +func (ss *StorageStore) SetLatestVersion(version uint64) error { + return ss.db.SetLatestVersion(version) +} + +// Iterator returns an iterator over the specified domain and prefix. +func (ss *StorageStore) Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { + return ss.db.Iterator(storeKey, version, start, end) +} + +// ReverseIterator returns an iterator over the specified domain and prefix in reverse. +func (ss *StorageStore) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { + return ss.db.ReverseIterator(storeKey, version, start, end) +} + +// Prune prunes the store up to the given version. +func (ss *StorageStore) Prune(version uint64) error { + return ss.db.Prune(version) +} + +// Restore restores the store from the given channel. +func (ss *StorageStore) Restore(version uint64, chStorage <-chan *store.KVPair) error { + latestVersion, err := ss.db.GetLatestVersion() + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + if version <= latestVersion { + return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion) + } + + b, err := ss.db.NewBatch(version) + if err != nil { + return err + } + + for kvPair := range chStorage { + if err := b.Set(kvPair.StoreKey, kvPair.Key, kvPair.Value); err != nil { + return err + } + + if b.Size() > defaultBatchBufferSize { + if err := b.Write(); err != nil { + return err + } + } + } + + if b.Size() > 0 { + if err := b.Write(); err != nil { + return err + } + } + + return ss.db.SetLatestVersion(version) +} + +// Close closes the store. +func (ss *StorageStore) Close() error { + return ss.db.Close() +}