-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(store/v2): snapshot manager #18458
Changes from 6 commits
8ddbff9
2569a3f
c0dcf8f
0f462be
9f876dc
beb0ef4
c792bde
5c25952
c558a40
2c30672
07185be
d03866b
19d0cd2
0598995
6d728b1
5026749
fdf6419
91dd007
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,39 @@ | ||
package iavl | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math" | ||
|
||
dbm "github.com/cosmos/cosmos-db" | ||
protoio "github.com/cosmos/gogoproto/io" | ||
"github.com/cosmos/iavl" | ||
ics23 "github.com/cosmos/ics23/go" | ||
|
||
log "cosmossdk.io/log" | ||
"cosmossdk.io/store/v2" | ||
snapshottypes "cosmossdk.io/store/v2/snapshots/types" | ||
) | ||
|
||
var _ store.Committer = (*IavlTree)(nil) | ||
|
||
var _ snapshottypes.CommitSnapshotter = (*IavlTree)(nil) | ||
|
||
// IavlTree is a wrapper around iavl.MutableTree. | ||
type IavlTree struct { | ||
tree *iavl.MutableTree | ||
|
||
// storeKey is the identifier of the store. | ||
storeKey string | ||
} | ||
|
||
// NewIavlTree creates a new IavlTree instance. | ||
func NewIavlTree(db dbm.DB, logger log.Logger, cfg *Config) *IavlTree { | ||
func NewIavlTree(db dbm.DB, logger log.Logger, storeKey string, cfg *Config) *IavlTree { | ||
tree := iavl.NewMutableTree(db, cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger) | ||
return &IavlTree{ | ||
tree: tree, | ||
tree: tree, | ||
storeKey: storeKey, | ||
} | ||
tac0turtle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
|
@@ -83,6 +94,142 @@ func (t *IavlTree) Prune(version uint64) error { | |
return t.tree.DeleteVersionsTo(int64(version)) | ||
} | ||
|
||
// Snapshot implements snapshottypes.CommitSnapshotter. | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error { | ||
if version == 0 { | ||
return fmt.Errorf("the snapshot version must be greater than 0") | ||
} | ||
|
||
latestVersion := t.GetLatestVersion() | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if version > latestVersion { | ||
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) | ||
} | ||
|
||
tree, err := t.tree.GetImmutable(int64(version)) | ||
if err != nil { | ||
return fmt.Errorf("failed to get immutable tree for version %d: %w", version, err) | ||
} | ||
|
||
exporter, err := tree.Export() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we please call this
tac0turtle marked this conversation as resolved.
Show resolved
Hide resolved
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return fmt.Errorf("failed to export tree for version %d: %w", version, err) | ||
} | ||
|
||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer exporter.Close() | ||
|
||
err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ | ||
Item: &snapshottypes.SnapshotItem_Store{ | ||
Store: &snapshottypes.SnapshotStoreItem{ | ||
Name: t.storeKey, | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed to write store name: %w", err) | ||
} | ||
|
||
for { | ||
node, err := exporter.Next() | ||
if errors.Is(err, iavl.ErrorExportDone) { | ||
break | ||
} else if err != nil { | ||
return fmt.Errorf("failed to get the next export node: %w", err) | ||
} | ||
|
||
if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{ | ||
Item: &snapshottypes.SnapshotItem_IAVL{ | ||
IAVL: &snapshottypes.SnapshotIAVLItem{ | ||
Key: node.Key, | ||
Value: node.Value, | ||
Height: int32(node.Height), | ||
Version: node.Version, | ||
}, | ||
}, | ||
}); err != nil { | ||
return fmt.Errorf("failed to write iavl node: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Restore implements snapshottypes.CommitSnapshotter. | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (t *IavlTree) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) { | ||
var ( | ||
importer *iavl.Importer | ||
snapshotItem snapshottypes.SnapshotItem | ||
) | ||
|
||
loop: | ||
for { | ||
snapshotItem = snapshottypes.SnapshotItem{} | ||
err := protoReader.ReadMsg(&snapshotItem) | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} else if err != nil { | ||
return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) | ||
} | ||
|
||
switch item := snapshotItem.Item.(type) { | ||
case *snapshottypes.SnapshotItem_Store: | ||
t.storeKey = item.Store.Name | ||
importer, err = t.tree.Import(int64(version)) | ||
if err != nil { | ||
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) | ||
} | ||
defer importer.Close() | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
case *snapshottypes.SnapshotItem_IAVL: | ||
if importer == nil { | ||
return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") | ||
} | ||
if item.IAVL.Height > int32(math.MaxInt8) { | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", | ||
item.IAVL.Height, math.MaxInt8) | ||
} | ||
node := &iavl.ExportNode{ | ||
Key: item.IAVL.Key, | ||
Value: item.IAVL.Value, | ||
Height: int8(item.IAVL.Height), | ||
Version: item.IAVL.Version, | ||
} | ||
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does | ||
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. | ||
if node.Key == nil { | ||
node.Key = []byte{} | ||
} | ||
if node.Height == 0 { | ||
if node.Value == nil { | ||
node.Value = []byte{} | ||
} | ||
// If the node is a leaf node, it will be written to the storage. | ||
chStorage <- &store.KVPair{ | ||
StoreKey: t.storeKey, | ||
Key: node.Key, | ||
Value: node.Value, | ||
} | ||
} | ||
err := importer.Add(node) | ||
if err != nil { | ||
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if err := import.Add(node); err != nil {
return si, fmt.Errorf("failed to add node to impoorter: %w", err)
} |
||
default: | ||
break loop | ||
} | ||
} | ||
|
||
if importer != nil { | ||
err := importer.Commit() | ||
if err != nil { | ||
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) | ||
} | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
_, err := t.tree.LoadVersion(int64(version)) | ||
|
||
return snapshotItem, err | ||
} | ||
|
||
// Close closes the iavl tree. | ||
func (t *IavlTree) Close() error { | ||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,14 +26,12 @@ func TestPruningTestSuite(t *testing.T) { | |
} | ||
|
||
func (s *PruningTestSuite) SetupTest() { | ||
noopLog := log.NewNopLogger() | ||
|
||
ss, err := sqlite.New(s.T().TempDir()) | ||
s.Require().NoError(err) | ||
|
||
sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) | ||
sc := iavl.NewIavlTree(dbm.NewMemDB(), log.NewNopLogger(), "", iavl.DefaultConfig()) | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
s.manager = NewManager(noopLog, ss, sc) | ||
s.manager = NewManager(log.NewTestLogger(s.T()), ss, sc) | ||
s.ss = ss | ||
s.sc = sc | ||
} | ||
|
@@ -49,12 +47,16 @@ func (s *PruningTestSuite) TestPruning() { | |
s.manager.Start() | ||
|
||
latestVersion := uint64(100) | ||
kvCount := 10 | ||
|
||
// write 10 batches | ||
// write batches | ||
for i := uint64(0); i < latestVersion; i++ { | ||
version := i + 1 | ||
cs := store.NewChangeset() | ||
cs.Add([]byte("key"), []byte(fmt.Sprintf("value%d", version))) | ||
for j := 0; j < kvCount; j++ { | ||
cs.Add([]byte(fmt.Sprintf("key-%d", j)), []byte(fmt.Sprintf("value-%d-%d", version, j))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this cause a conflict of keys written given than they'll go kvCount times but be repeated latestVersion times? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The combination of |
||
} | ||
|
||
err := s.sc.WriteBatch(cs) | ||
s.Require().NoError(err) | ||
_, err = s.sc.Commit() | ||
|
@@ -68,20 +70,20 @@ func (s *PruningTestSuite) TestPruning() { | |
s.manager.Stop() | ||
|
||
// check the store for the version 96 | ||
val, err := s.ss.Get("", latestVersion-4, []byte("key")) | ||
val, err := s.ss.Get("", latestVersion-4, []byte("key-0")) | ||
s.Require().NoError(err) | ||
s.Require().Equal([]byte("value96"), val) | ||
s.Require().Equal([]byte("value-96-0"), val) | ||
// check the store for the version 50 | ||
val, err = s.ss.Get("", 50, []byte("key")) | ||
s.Require().NoError(err) | ||
s.Require().Nil(val) | ||
|
||
// check the commitment for the version 96 | ||
proof, err := s.sc.GetProof(latestVersion-4, []byte("key")) | ||
proof, err := s.sc.GetProof(latestVersion-4, []byte("key-0")) | ||
s.Require().NoError(err) | ||
s.Require().NotNil(proof.GetExist()) | ||
// check the commitment for the version 95 | ||
proof, err = s.sc.GetProof(latestVersion-5, []byte("key")) | ||
proof, err = s.sc.GetProof(latestVersion-5, []byte("key-0")) | ||
s.Require().Error(err) | ||
s.Require().Nil(proof) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i thought there was discussion around abstracting storekey away? does this change the direction of that conversation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the expectation is that
dbm.DB
is a PrefixDb (for iavl v0 & v1) so the store key prefix is already baked into that. I guess storekey is required for the snapshot API? (still reading this PR).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
db dbm.DB
is not required or used in iavl v2 (since it's an interface to an underlying generic KV store). Maybe we should fold this intoConfig
if we want to support multiple iavl versions in store v2?