Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): Parallel write in the CacheMultiStore #20817

Closed
wants to merge 13 commits into from
10 changes: 6 additions & 4 deletions store/cachemulti/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"fmt"
"testing"

dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"

"cosmossdk.io/core/log"
"cosmossdk.io/store/iavl"
"cosmossdk.io/store/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"
)

func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) {
db := dbm.NewMemDB()
b.Helper()

db := dbm.NewMemDB()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing against a LSM db would be more illustrative of performance improvements, let's try leveldb and pebbledb?

Copy link
Contributor Author

@cool-develope cool-develope Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not for i/o, mostly for tree updates in memory, in WorkingHash liefcycle

storeKeys := make(map[string]types.StoreKey)
stores := make(map[types.StoreKey]types.CacheWrapper)
for i := uint(0); i < storeCount; i++ {
Expand All @@ -29,6 +31,7 @@ func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey
}

func benchmarkStore(b *testing.B, storeCount, keyCount uint) {
b.Helper()
store, storeKeys := setupStore(b, storeCount)
b.ResetTimer()

Expand Down Expand Up @@ -59,5 +62,4 @@ func BenchmarkCacheMultiStore(b *testing.B) {
})
}
}

}
21 changes: 14 additions & 7 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package cachemulti
import (
"fmt"
"io"
"sync"

dbm "github.com/cosmos/cosmos-db"
"golang.org/x/sync/errgroup"

"cosmossdk.io/store/cachekv"
"cosmossdk.io/store/dbadapter"
Expand Down Expand Up @@ -123,15 +123,22 @@ func (cms Store) GetStoreType() types.StoreType {
// Write calls Write on each underlying store.
func (cms Store) Write() {
cms.db.Write()
wg := sync.WaitGroup{}
wg.Add(len(cms.stores))
eg := new(errgroup.Group)
for _, store := range cms.stores {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a concurrency limit? or is it good enough to delegate this work completely to the go scheduler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should do some benchmarks with databases and come up with a optimal number of runners

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we can see in #20817 (comment), from 8 stores, there is not much improvement, let me benchmark with limited number of runners

go func(s types.CacheWrap) {
defer wg.Done()
s := store // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in Write: %v", r)
}
}()
s.Write()
}(store)
return nil
})
}
if err := eg.Wait(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if the panic is handled somewhere else in case: this is not deterministic for multiple errors.

In the godoc for Wait:

returns the first non-nil error (if any) from them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to care about the multiple errors, the Write is the memory action and it won't affect the finalized state

panic(err)
}
wg.Wait()
}

// Implements CacheWrapper.
Expand Down
1 change: 1 addition & 0 deletions store/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/tidwall/btree v1.7.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
gotest.tools/v3 v3.5.1
Expand Down
2 changes: 1 addition & 1 deletion store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestCacheMultiStoreWithVersion(t *testing.T) {
// require we cannot commit (write) to a cache-versioned multi-store
require.Panics(t, func() {
kvStore.Set(k, []byte("newValue"))
kvStore.(types.CacheWrap).Write()
cms.Write()
})
}

Expand Down
Loading