Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store/v2): Fix PebbleDB Iteration Edge Cases #18948

Merged
merged 11 commits into from
Jan 10, 2024
13 changes: 9 additions & 4 deletions store/storage/pebbledb/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,25 @@ func (f mvccKeyFormatter) Format(s fmt.State, verb rune) {

// SplitMVCCKey accepts an MVCC key and returns the "user" key, the MVCC version,
// and a boolean indicating if the provided key is an MVCC key.
//
// Note, internally, we must make a copy of the provided mvccKey argument, which
// typically comes from the Key() method as it's not safe.
func SplitMVCCKey(mvccKey []byte) (key, version []byte, ok bool) {
if len(mvccKey) == 0 {
return nil, nil, false
}

n := len(mvccKey) - 1
tsLen := int(mvccKey[n])
mvccKeyCopy := bytes.Clone(mvccKey)

n := len(mvccKeyCopy) - 1
tsLen := int(mvccKeyCopy[n])
if n < tsLen {
return nil, nil, false
}

key = mvccKey[:n-tsLen]
key = mvccKeyCopy[:n-tsLen]
if tsLen > 0 {
version = mvccKey[n-tsLen+1 : len(mvccKey)-1]
version = mvccKeyCopy[n-tsLen+1 : len(mvccKeyCopy)-1]
}

return key, version, true
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
39 changes: 30 additions & 9 deletions store/storage/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ func (itr *iterator) Value() []byte {
}

func (itr *iterator) Next() bool {
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
Comment on lines +116 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of panic should be replaced with proper error handling to prevent application crashes due to malformed MVCC keys.

- panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
+ // Handle error properly instead of panic
+ return fmt.Errorf("invalid PebbleDB MVCC key: %s", itr.source.Key())

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
// Handle error properly instead of panic
return fmt.Errorf("invalid PebbleDB MVCC key: %s", itr.source.Key())
}

The panic in the Next method when encountering a malformed MVCC key is a drastic measure that could cause the entire application to crash. Consider replacing this with proper error handling that allows the application to recover gracefully from such an error.


var next bool
if itr.reverse {
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}

// Since PebbleDB has no PrevPrefix API, we must manually seek to the next
// key that is lexicographically less than the current key.
next = itr.source.SeekLT(MVCCEncode(currKey, 0))
Expand All @@ -135,7 +135,7 @@ func (itr *iterator) Next() bool {

// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
// so we seek back to the latest desired version, s.t. the version <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
Comment on lines 133 to 138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for handling reverse iteration and seeking to the next prefix seems to be missing the actual implementation for reverse iteration, as the SeekLT method is called regardless of the itr.reverse value. This could be an oversight or incomplete implementation. The code should either implement the reverse iteration logic or remove the itr.reverse check if reverse iteration is not supported.

Expand All @@ -150,10 +150,31 @@ func (itr *iterator) Next() bool {
return itr.valid
}

// Move the iterator to the closest version to the desired version, so we
// Move the iterator to the closest version of the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))

tmpKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return itr.valid
}

// There exists cases where the SeekLT() call moved us back to the same key
// we started at, so we must move to next key, i.e. two keys forward.
if bytes.Equal(tmpKey, currKey) {
if itr.source.SeekGE(MVCCEncode(nextKey, 0)) {
if itr.source.NextPrefix() {
return itr.Next()
}
}

itr.valid = false
return itr.valid
}
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
Expand Down
30 changes: 30 additions & 0 deletions store/storage/storage_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,40 @@ func (s *StorageTestSuite) TestDatabase_IteratorMultiVersion() {
i = (i + 1) % 10
count++
}

s.Require().Equal(10, count)
s.Require().NoError(itr.Error())
}

func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)

defer db.Close()

cs := store.NewChangeset(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyC"), Value: []byte("value003")},
}})
s.Require().NoError(db.ApplyChangeset(58827506, cs))

cs = store.NewChangeset(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyC"), Value: []byte("value004")},
}})
s.Require().NoError(db.ApplyChangeset(58833605, cs))

cs = store.NewChangeset(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyD"), Value: []byte("value006")},
}})
s.Require().NoError(db.ApplyChangeset(58833606, cs))

itr, err := db.Iterator(storeKey1, 58831525, []byte("key"), nil)
s.Require().NoError(err)
defer itr.Close()

for ; itr.Valid(); itr.Next() {
}
}
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)
Expand Down