
feat(store): Parallel write in the CacheMultiStore #20817

Closed · wants to merge 13 commits
store/CHANGELOG.md (1 addition, 0 deletions)

@@ -34,6 +34,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* [#20817](https://github.com/cosmos/cosmos-sdk/pull/20817) Parallelize the `CacheMultiStore.Write` method.
* [#19770](https://github.com/cosmos/cosmos-sdk/pull/19770) Upgrade IAVL to IAVL v1.1.1.

## v1.0.2 (January 10, 2024)
store/cachemulti/store.go (8 additions, 1 deletion)

@@ -3,6 +3,7 @@ package cachemulti
 import (
 	"fmt"
 	"io"
+	"sync"
 
 	dbm "github.com/cosmos/cosmos-db"
@@ -122,9 +123,15 @@ func (cms Store) GetStoreType() types.StoreType {
 // Write calls Write on each underlying store.
 func (cms Store) Write() {
 	cms.db.Write()
+	wg := sync.WaitGroup{}
+	wg.Add(len(cms.stores))
 	for _, store := range cms.stores {
-		store.Write()
+		go func(s types.CacheWrap) {
+			defer wg.Done()
+			s.Write()
+		}(store)
 	}
+	wg.Wait()
 }
 
 // Implements CacheWrapper.

Review thread on `for _, store := range cms.stores {`:

Member: should there be a concurrency limit? Or is it good enough to delegate this work completely to the Go scheduler?

Member: we should do some benchmarks with databases and come up with an optimal number of runners.

Contributor Author: as we can see in #20817 (comment), from 8 stores on there is not much improvement; let me benchmark with a limited number of runners.
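
As a reference for the limited-runner idea, a bounded variant could look like the minimal sketch below. It is hypothetical and not part of this PR: writeBounded and maxRunners are illustrative names, and the actual limit would have to come from the benchmarks discussed above.

// Hypothetical sketch, not part of this PR: a buffered channel acts as a
// semaphore so that at most maxRunners store writes run concurrently.
const maxRunners = 8 // illustrative; pick the value from benchmarks

func (cms Store) writeBounded() {
	cms.db.Write()
	sem := make(chan struct{}, maxRunners)
	wg := sync.WaitGroup{}
	wg.Add(len(cms.stores))
	for _, store := range cms.stores {
		sem <- struct{}{} // blocks while maxRunners writes are in flight
		go func(s types.CacheWrap) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this write returns
			s.Write()
		}(store)
	}
	wg.Wait()
}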

Review thread on `go func(s types.CacheWrap) {`:

Collaborator: most of the time the dirty writes are small or even empty; does that justify the overhead of a goroutine?

Contributor Author: IMO it would be helpful even if there were only two meaningful writes (two stores), since WorkingHash requires the IAVL tree re-building and root hash computation (one of the bottlenecks).

Member: we should get some benchmarks.
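
A benchmark along the lines below could answer that. This is a rough sketch rather than part of the PR, assuming an in-memory DB and reusing the newMultiStoreWithMounts helper and store keys that appear in the test further down.

// Rough benchmark sketch, not part of this PR: writes a few dirty keys per
// store so the per-goroutine overhead dominates the measurement.
func BenchmarkCacheMultiStoreWrite(b *testing.B) {
	db := dbm.NewMemDB()
	ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	if err := ms.LoadLatestVersion(); err != nil {
		b.Fatal(err)
	}
	storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3}
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cms := ms.CacheMultiStore()
		for _, key := range storeKeys {
			cms.GetKVStore(key).Set([]byte(fmt.Sprintf("key-%d", n)), []byte("value"))
		}
		cms.Write()
	}
}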


store/rootmulti/store_test.go (38 additions, 1 deletion)

@@ -116,7 +116,7 @@ func TestCacheMultiStoreWithVersion(t *testing.T) {
 	// require we cannot commit (write) to a cache-versioned multi-store
 	require.Panics(t, func() {
 		kvStore.Set(k, []byte("newValue"))
-		cms.Write()
+		kvStore.(types.CacheWrap).Write()
 	})
 }

Review thread on `kvStore.(types.CacheWrap).Write()`:

Contributor Author: hmm, I realized recovering panics from multiple goroutines is tricky, @alpe do you have any idea? The updated line performs the same check as the original one, but I am curious.

Contributor: I assume you want to test that cms.Write() panics. Your workaround with kvStore.(types.CacheWrap).Write() is pragmatic, but with cms.Write() you give away control from the main thread, so the caller cannot handle panics, which is the bigger problem. Another issue would be deterministic behaviour when more than one store panics. You could recover in the goroutines, then collect and sort the errors. Maybe it is worth refactoring to error return values instead of panics.

Contributor Author: good point!
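
The suggested refactor could look roughly like the sketch below: recover in each goroutine, collect per-store errors, and return them instead of panicking. This is hypothetical and not part of the PR; writeWithRecover is an illustrative name, and errors.Join requires Go 1.20+ plus the errors and fmt imports.

// Hypothetical sketch, not part of this PR: recover per goroutine and turn
// panics into a combined error the caller can handle on the main thread.
func (cms Store) writeWithRecover() error {
	cms.db.Write()
	errs := make([]error, len(cms.stores)) // one slot per goroutine, no mutex needed
	wg := sync.WaitGroup{}
	i := 0
	for _, store := range cms.stores {
		wg.Add(1)
		go func(idx int, s types.CacheWrap) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					errs[idx] = fmt.Errorf("store write panicked: %v", r)
				}
			}()
			s.Write()
		}(i, store)
		i++
	}
	wg.Wait()
	// errors.Join drops nil entries; sorting by store key first would make
	// the combined error deterministic when several stores panic.
	return errors.Join(errs...)
}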


@@ -1032,3 +1032,40 @@ func TestCommitStores(t *testing.T) {
})
}
}

func TestCacheMultiStoreWrite(t *testing.T) {
	// for testing the cacheMultiStore parallel write, a file-based DB is used
	db, err := dbm.NewDB("test", dbm.GoLevelDBBackend, t.TempDir())
	require.NoError(t, err)

	ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
	require.NoError(t, ms.LoadLatestVersion())

	cacheMulti := ms.CacheMultiStore()

	toVersion := int64(100)
	keyCount := 100
	storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3}
	for i := int64(1); i <= toVersion; i++ {
		for _, storeKey := range storeKeys {
			store := cacheMulti.GetKVStore(storeKey)
			for j := 0; j < keyCount; j++ {
				store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
			}
		}
		cacheMulti.Write()
		ms.Commit()
	}

Comment on lines +1045 to +1054

Contributor: Enhance readability by extracting the nested loops into a helper function. This will make the test function more concise and easier to understand.

+ func populateStore(t *testing.T, ms types.CommitMultiStore, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for i := int64(1); i <= toVersion; i++ {
+         for _, storeKey := range storeKeys {
+             store := cacheMulti.GetKVStore(storeKey)
+             for j := 0; j < keyCount; j++ {
+                 store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
+             }
+         }
+         cacheMulti.Write()
+         ms.Commit()
+     }
+ }


	// check the data
	for _, storeKey := range storeKeys {
		store := cacheMulti.GetKVStore(storeKey)
		for i := int64(1); i <= toVersion; i++ {
			for j := 0; j < keyCount; j++ {
				key := []byte(fmt.Sprintf("key-%d-%d", i, j))
				value := store.Get(key)
				require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
			}
		}
	}
}

Comment on lines +1056 to +1066

Contributor: Enhance readability by extracting the verification loop into a helper function. This will make the test function more concise and easier to understand.

+ func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for _, storeKey := range storeKeys {
+         store := cacheMulti.GetKVStore(storeKey)
+         for i := int64(1); i <= toVersion; i++ {
+             for j := 0; j < keyCount; j++ {
+                 key := []byte(fmt.Sprintf("key-%d-%d", i, j))
+                 value := store.Get(key)
+                 require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
+             }
+         }
+     }
+ }
