From 0b1299573095c852548760eeec2b14bb13c79111 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 10 Jan 2024 03:55:24 -0500 Subject: [PATCH] fix(store/v2): Fix PebbleDB Iteration Edge Cases (#18948) --- store/storage/pebbledb/comparator.go | 13 +++++--- store/storage/pebbledb/iterator.go | 37 +++++++++++++++++------ store/storage/storage_test_suite.go | 44 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/store/storage/pebbledb/comparator.go b/store/storage/pebbledb/comparator.go index b6f5aef24a66..08b360e6c079 100644 --- a/store/storage/pebbledb/comparator.go +++ b/store/storage/pebbledb/comparator.go @@ -137,20 +137,25 @@ func (f mvccKeyFormatter) Format(s fmt.State, verb rune) { // SplitMVCCKey accepts an MVCC key and returns the "user" key, the MVCC version, // and a boolean indicating if the provided key is an MVCC key. +// +// Note, internally, we must make a copy of the provided mvccKey argument, which +// typically comes from the iterator's Key() method, whose result is not safe to retain. func SplitMVCCKey(mvccKey []byte) (key, version []byte, ok bool) { if len(mvccKey) == 0 { return nil, nil, false } - n := len(mvccKey) - 1 - tsLen := int(mvccKey[n]) + mvccKeyCopy := bytes.Clone(mvccKey) + + n := len(mvccKeyCopy) - 1 + tsLen := int(mvccKeyCopy[n]) if n < tsLen { return nil, nil, false } - key = mvccKey[:n-tsLen] + key = mvccKeyCopy[:n-tsLen] if tsLen > 0 { - version = mvccKey[n-tsLen+1 : len(mvccKey)-1] + version = mvccKeyCopy[n-tsLen+1 : len(mvccKeyCopy)-1] } return key, version, true diff --git a/store/storage/pebbledb/iterator.go b/store/storage/pebbledb/iterator.go index 10b03bbe1143..abb746863d88 100644 --- a/store/storage/pebbledb/iterator.go +++ b/store/storage/pebbledb/iterator.go @@ -113,15 +113,15 @@ func (itr *iterator) Value() []byte { } func (itr *iterator) Next() { + currKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. 
+ panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key())) + } + var next bool if itr.reverse { - currKey, _, ok := SplitMVCCKey(itr.source.Key()) - if !ok { - // XXX: This should not happen as that would indicate we have a malformed - // MVCC key. - panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key())) - } - // Since PebbleDB has no PrevPrefix API, we must manually seek to the next // key that is lexicographically less than the current key. next = itr.source.SeekLT(MVCCEncode(currKey, 0)) @@ -132,7 +132,7 @@ func (itr *iterator) Next() { // First move the iterator to the next prefix, which may not correspond to the // desired version for that key, e.g. if the key was written at a later version, - // so we seek back to the latest desired version, s.t. the version is <= itr.version. + // so we seek back to the latest desired version, s.t. the version <= itr.version. if next { nextKey, _, ok := SplitMVCCKey(itr.source.Key()) if !ok { @@ -147,10 +147,29 @@ func (itr *iterator) Next() { return } - // Move the iterator to the closest version to the desired version, so we + // Move the iterator to the closest version of the desired version, so we // append the current iterator key to the prefix and seek to that key. itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1)) + tmpKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + // There exist cases where the SeekLT() call moved us back to the same key + // we started at, so we must move to the next key, i.e. two keys forward. + if bytes.Equal(tmpKey, currKey) { + if itr.source.NextPrefix() { + itr.Next() + } else { + itr.valid = false + return + } + } + // The cursor might now be pointing at a key/value pair that is tombstoned. // If so, we must move the cursor. 
if itr.valid && itr.cursorTombstoned() { diff --git a/store/storage/storage_test_suite.go b/store/storage/storage_test_suite.go index 303d77bd13c2..b43ed408958a 100644 --- a/store/storage/storage_test_suite.go +++ b/store/storage/storage_test_suite.go @@ -391,10 +391,54 @@ func (s *StorageTestSuite) TestDatabase_IteratorMultiVersion() { i = (i + 1) % 10 count++ } + s.Require().Equal(10, count) s.Require().NoError(itr.Error()) } +func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() { + db, err := s.NewDB(s.T().TempDir()) + s.Require().NoError(err) + + defer db.Close() + + cs := store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { + {Key: []byte("keyC"), Value: []byte("value003")}, + }}) + s.Require().NoError(db.ApplyChangeset(58827506, cs)) + + cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { + {Key: []byte("keyE"), Value: []byte("value000")}, + }}) + s.Require().NoError(db.ApplyChangeset(58827506, cs)) + + cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { + {Key: []byte("keyF"), Value: []byte("value000")}, + }}) + s.Require().NoError(db.ApplyChangeset(58827506, cs)) + + cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { + {Key: []byte("keyC"), Value: []byte("value004")}, + }}) + s.Require().NoError(db.ApplyChangeset(58833605, cs)) + + cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { + {Key: []byte("keyD"), Value: []byte("value006")}, + }}) + s.Require().NoError(db.ApplyChangeset(58833606, cs)) + + itr, err := db.Iterator(storeKey1, 58831525, []byte("key"), nil) + s.Require().NoError(err) + defer itr.Close() + + count := make(map[string]struct{}) + for ; itr.Valid(); itr.Next() { + count[string(itr.Key())] = struct{}{} + } + + s.Require().Equal(3, len(count)) +} + func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() { db, err := s.NewDB(s.T().TempDir()) s.Require().NoError(err)