Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): Parallel write in the CacheMultiStore #20817

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions store/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* [#20817](https://github.com/cosmos/cosmos-sdk/pull/20817) Parallelize the `CacheMultiStore.Write` method.
* [#19770](https://github.com/cosmos/cosmos-sdk/pull/19770) Upgrade IAVL to IAVL v1.1.1.

## v1.0.2 (January 10, 2024)
Expand Down
70 changes: 70 additions & 0 deletions store/cachemulti/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cachemulti

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

coretesting "cosmossdk.io/core/testing"
"cosmossdk.io/log"
dbm "cosmossdk.io/store/db"
"cosmossdk.io/store/iavl"
"cosmossdk.io/store/types"
)

func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
b.Helper()

db := coretesting.NewMemDB()
storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preallocate maps to improve performance.

Preallocating the storeKeys and stores maps with the known size storeCount can improve performance.

-  storeKeys := make(map[string]types.StoreKey)
-  stores := make(map[types.StoreKey]types.CacheWrapper)
+  storeKeys := make(map[string]types.StoreKey, storeCount)
+  stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
storeKeys := make(map[string]types.StoreKey, storeCount)
stores := make(map[types.StoreKey]types.CacheWrapper, storeCount)

for i := uint(0); i < storeCount; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%d", i))
storeKeys[key.Name()] = key
sdb := dbm.NewPrefixDB(db, []byte(key.Name()))
istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil)
require.NoError(b, err)
stores[key] = types.KVStore(istore)
}

return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys
}

// benchmarkStore times writeStoresParallel on a cache multi-store made of
// storeCount stores, using runnerCount concurrent writers.
//
// Before every timed write, keyCount entries are set in each store while the
// timer is stopped, so only the write itself is measured. Allocation
// statistics are reported, and an error from the write fails the benchmark.
func benchmarkStore(b *testing.B, storeCount, runnerCount, keyCount uint) {
	b.Helper()

	cms, keysByName := setupStore(b, storeCount)
	workers := int(runnerCount)

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Populating the stores is setup work; keep it out of the timed region.
		b.StopTimer()
		for _, sk := range keysByName {
			name := sk.Name()
			kv := cms.GetKVStore(sk)
			for idx := uint(0); idx < keyCount; idx++ {
				k := fmt.Sprintf("key%s-%d", name, idx)
				v := fmt.Sprintf("value%s-%d", name, idx)
				kv.Set([]byte(k), []byte(v))
			}
		}
		b.StartTimer()

		require.NoError(b, cms.writeStoresParallel(workers))
	}
}

// BenchmarkCacheMultiStore runs benchmarkStore as a sub-benchmark for every
// combination of store count, key count and runner count listed below. The
// sub-benchmark name encodes the three parameters.
func BenchmarkCacheMultiStore(b *testing.B) {
	var (
		stores  = []uint{2, 4, 8, 16, 32}
		keys    = []uint{100, 1000, 10000}
		runners = []uint{1, 2, 4, 8, 16}
	)

	for _, sc := range stores {
		for _, kc := range keys {
			for _, rc := range runners {
				name := fmt.Sprintf("storeCount=%d/runnerCount=%d/keyCount=%d/", sc, rc, kc)
				b.Run(name, func(sub *testing.B) {
					benchmarkStore(sub, sc, rc, kc)
				})
			}
		}
	}
}
54 changes: 49 additions & 5 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cachemulti

import (
"errors"
"fmt"
"io"
"sync"

corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/cachekv"
Expand All @@ -11,9 +13,14 @@
"cosmossdk.io/store/types"
)

const (
	// storeNameCtxKey is the TraceContext metadata key that identifies
	// the store which emitted a given trace.
	storeNameCtxKey = "store_name"
	// maxRunners is the maximum number of concurrent goroutines that
	// can be used to write to the underlying stores in parallel.
	maxRunners = 4
)

//----------------------------------------
// Store
Expand Down Expand Up @@ -121,9 +128,46 @@
// Write calls Write on each underlying store.
func (cms Store) Write() {
cms.db.Write()
for _, store := range cms.stores {
store.Write()

if err := cms.writeStoresParallel(maxRunners); err != nil {
panic(err)
}
}

func (cms Store) writeStoresParallel(runnerCount int) error {
sem := make(chan struct{}, runnerCount) // Semaphore to limit number of concurrent goroutines
errChan := make(chan error, len(cms.stores)) // Channel to collect errors from goroutines
var wg sync.WaitGroup

for storeKey, store := range cms.stores {
wg.Add(1)
sem <- struct{}{}

go func() {
defer func() {
wg.Done()
<-sem // Release the slot

if r := recover(); r != nil {
errChan <- fmt.Errorf("panic in Write for store %s: %v", storeKey.Name(), r)
}
}()
store.Write()
}()
Comment on lines +146 to +156

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
Comment on lines +142 to +157

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism

go func() {
wg.Wait()
close(errChan)
}()
Comment on lines +159 to +162

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

// Collect errors from goroutines
var allErrors []error
for err := range errChan {
allErrors = append(allErrors, err)
}

return errors.Join(allErrors...)
}

// Implements CacheWrapper.
Expand Down
34 changes: 34 additions & 0 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,3 +1031,37 @@ func TestCommitStores(t *testing.T) {
})
}
}

func TestCacheMultiStoreWrite(t *testing.T) {
db := coretesting.NewMemDB()
ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))
require.NoError(t, ms.LoadLatestVersion())

cacheMulti := ms.CacheMultiStore()

toVersion := int64(100)
keyCount := 100
storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
Comment on lines +1045 to +1054
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the nested loops into a helper function.

This will make the test function more concise and easier to understand.

+ func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for i := int64(1); i <= toVersion; i++ {
+         for _, storeKey := range storeKeys {
+             store := cacheMulti.GetKVStore(storeKey)
+             for j := 0; j < keyCount; j++ {
+                 store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
+             }
+         }
+         cacheMulti.Write()
+         ms.Commit()
+     }
+ }

for i := int64(1); i <= toVersion; i++ {
    for _, storeKey := range storeKeys {
        store := cacheMulti.GetKVStore(storeKey)
        for j := 0; j < keyCount; j++ {
            store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
        }
    }
    cacheMulti.Write()
    ms.Commit()
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
func populateStore(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}
}
for i := int64(1); i <= toVersion; i++ {
for _, storeKey := storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for j := 0; j < keyCount; j++ {
store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j)))
}
}
cacheMulti.Write()
ms.Commit()
}


// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
Comment on lines +1056 to +1066
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance readability by extracting the verification loop into a helper function.

This will make the test function more concise and easier to understand.

+ func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
+     for _, storeKey := range storeKeys {
+         store := cacheMulti.GetKVStore(storeKey)
+         for i := int64(1); i <= toVersion; i++ {
+             for j := 0; j < keyCount; j++ {
+                 key := []byte(fmt.Sprintf("key-%d-%d", i, j))
+                 value := store.Get(key)
+                 require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
+             }
+         }
+     }
+ }

for _, storeKey := range storeKeys {
    store := cacheMulti.GetKVStore(storeKey)
    for i := int64(1); i <= toVersion; i++ {
        for j := 0; j < keyCount; j++ {
            key := []byte(fmt.Sprintf("key-%d-%d", i, j))
            value := store.Get(key)
            require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
        }
    }
}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// check the data
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
func verifyStoreData(t *testing.T, cacheMulti types.CacheMultiStore, storeKeys []types.StoreKey, toVersion int64, keyCount int) {
for _, storeKey := range storeKeys {
store := cacheMulti.GetKVStore(storeKey)
for i := int64(1); i <= toVersion; i++ {
for j := 0; j < keyCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := store.Get(key)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value)
}
}
}
}
// check the data
verifyStoreData(t, cacheMulti, storeKeys, toVersion, keyCount)

}
Loading