Skip to content

Commit

Permalink
feat: walk state for 4 finality
Browse files Browse the repository at this point in the history
  • Loading branch information
LinZexiao committed Dec 25, 2023
1 parent dc9d2be commit 2e41814
Show file tree
Hide file tree
Showing 14 changed files with 1,589 additions and 332 deletions.
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func main() {
_ = logging.SetLogLevel("pubsub", "error")
_ = logging.SetLogLevel("relay", "error")
_ = logging.SetLogLevel("dht/RtRefreshManager", "error")
// todo: remove it
_ = logging.SetLogLevel("splitstore", "debug")
_ = logging.SetLogLevel("chainsync.syncer", "debug")

} else {
level, err := logging.LevelFromString(lvl)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ func newDefaultAPIConfig() *APIConfig {
// DatastoreConfig holds all the configuration options for the datastore.
// TODO: use the advanced datastore configuration from ipfs
type DatastoreConfig struct {
Type string `json:"type"`
Path string `json:"path"`
SplitstoreSize int64 `json:"splitstore_size"`
SplitstoreCount int `json:"splitstore_count"`
Type string `json:"type"`
Path string `json:"path"`
SplitstoreSize int64 `json:"splitstoreSize"`
SplitstoreCount int `json:"splitstoreCount"`
SplitstoreInitProtectEpoch int64 `json:"splitstoreInitProtectEpoch"`
}

// Validators hold the list of validation functions for each configuration
Expand All @@ -98,7 +99,7 @@ func newDefaultDatastoreConfig() *DatastoreConfig {
return &DatastoreConfig{
Type: "badgerds",
Path: "badger",
SplitstoreSize: int64(3 * policy.ChainFinality),
SplitstoreSize: int64(5 * policy.ChainFinality),
SplitstoreCount: 3,
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/consensus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/venus/pkg/fvm"
"github.com/filecoin-project/venus/pkg/vm/vmcontext"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/reward"
"github.com/filecoin-project/venus/venus-shared/blockstore"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
Expand Down Expand Up @@ -95,6 +96,11 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
)

makeVM := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
bs := vmOpts.Bsstore
if ts.Height() == 1185554 {
bs = blockstore.NewLogStore("./bs.log", bs)
// _, _ = bs.Has(ctx, base)
}
vmOpt := vm.VmOption{
CircSupplyCalculator: vmOpts.CircSupplyCalculator,
LookbackStateGetter: vmOpts.LookbackStateGetter,
Expand All @@ -107,7 +113,7 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
Timestamp: timestamp,
GasPriceSchedule: vmOpts.GasPriceSchedule,
PRoot: base,
Bsstore: vmOpts.Bsstore,
Bsstore: bs,
SysCallsImpl: vmOpts.SysCallsImpl,
TipSetGetter: vmOpts.TipSetGetter,
Tracing: vmOpts.Tracing,
Expand Down
5 changes: 3 additions & 2 deletions pkg/repo/fsrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,9 @@ func (r *FSRepo) openDatastore() error {

ssPath := filepath.Join(r.path, splitstorePrefix)
opt := splitstore.Option{
MaxStoreCount: r.cfg.Datastore.SplitstoreCount,
StoreSize: abi.ChainEpoch(r.cfg.Datastore.SplitstoreSize),
MaxLayerCount: Config.Datastore.SplitstoreCount,
LayerSize: abi.ChainEpoch(Config.Datastore.SplitstoreSize),
InitSyncProtect: abi.ChainEpoch(Config.Datastore.SplitstoreInitProtectEpoch),
}
splitstore, err := splitstore.NewSplitstore(ssPath, ds, opt)
if err != nil {
Expand Down
102 changes: 102 additions & 0 deletions venus-shared/blockstore/log_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package blockstore

import (
"context"
llog "log"
"os"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)

type LogStore struct {
logger *llog.Logger
bs Blockstore
}

// DeleteMany implements Blockstore.
func (l *LogStore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
for _, c := range cids {
l.log(c, "delete", "")
}
return l.bs.DeleteMany(ctx, cids)
}

// Flush implements Blockstore.
func (l *LogStore) Flush(ctx context.Context) error {
l.logger.Println("flush")
return l.bs.Flush(ctx)
}

// View implements Blockstore.
func (l *LogStore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
l.log(cid, "view", "")
return l.bs.View(ctx, cid, callback)
}

// AllKeysChan implements blockstore.Blockstore.
func (l *LogStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return l.AllKeysChan(ctx)

Check failure on line 39 in venus-shared/blockstore/log_store.go

View workflow job for this annotation

GitHub Actions / check

SA5007: infinite recursive call (staticcheck)
}

// DeleteBlock implements blockstore.Blockstore.
func (l *LogStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
l.log(c, "delete", "")
return l.bs.DeleteBlock(ctx, c)
}

// Get implements blockstore.Blockstore.
func (l *LogStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
l.log(c, "get", "")
return l.bs.Get(ctx, c)
}

// GetSize implements blockstore.Blockstore.
func (l *LogStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
l.log(c, "getsize", "")
return l.bs.GetSize(ctx, c)
}

// Has implements blockstore.Blockstore.
func (l *LogStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
l.log(c, "has", "")
return l.bs.Has(ctx, c)
}

// HashOnRead implements blockstore.Blockstore.
func (l *LogStore) HashOnRead(enabled bool) {
l.bs.HashOnRead(enabled)
}

// Put implements blockstore.Blockstore.
func (l *LogStore) Put(ctx context.Context, b blocks.Block) error {
l.log(b.Cid(), "put", "")
return l.bs.Put(ctx, b)
}

// DeleteMany implements blockstore.Blockstore.
func (l *LogStore) PutMany(ctx context.Context, bs []blocks.Block) error {
for _, b := range bs {
l.log(b.Cid(), "put", "")
}
return l.bs.PutMany(ctx, bs)
}

var _ Blockstore = (*LogStore)(nil)

func NewLogStore(path string, bs Blockstore) *LogStore {
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
panic(err)
}
logger := llog.New(file, "", llog.LstdFlags)
logger.Println("log store opened")
return &LogStore{
logger: logger,
bs: bs,
}
}

func (l *LogStore) log(c cid.Cid, op, msg string) {
l.logger.Printf("%s %s %s", c.String(), op, msg)
}
43 changes: 36 additions & 7 deletions venus-shared/blockstore/splitstore/compose_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,48 @@ var log = logging.New("splitstore")
// Write: write to primary store only
// Delete: Delete from all store
type ComposeStore struct {
primary blockstore.Blockstore
secondary blockstore.Blockstore
shouldSync bool
primary blockstore.Blockstore
secondary blockstore.Blockstore
}

// NewComposeStore create a new ComposeStore with a list of blockstore
// low priority come first
func NewComposeStore(bs ...blockstore.Blockstore) blockstore.Blockstore {
if len(bs) == 0 {
switch len(bs) {
case 0:
return nil
case 1:
return bs[0]
}
ret := bs[0]
for i := 1; i < len(bs); i++ {
return Compose(bs...)
}

func Compose(bs ...blockstore.Blockstore) *ComposeStore {
switch len(bs) {
case 0:
return nil
case 1:
return &ComposeStore{
shouldSync: false,
primary: bs[0],
secondary: bs[0],
}
}

ret := &ComposeStore{
shouldSync: false,
primary: bs[1],
secondary: bs[0],
}
for i := 2; i < len(bs); i++ {
ret = &ComposeStore{
primary: bs[i],
secondary: ret,
shouldSync: i == len(bs)-1,
primary: bs[i],
secondary: ret,
}
}

return ret
}

Expand Down Expand Up @@ -187,6 +212,10 @@ func (cs *ComposeStore) View(ctx context.Context, c cid.Cid, cb func([]byte) err

// sync sync block from secondly to primary
func (cs *ComposeStore) sync(ctx context.Context, c cid.Cid, b blocks.Block) {
if !cs.shouldSync {
return
}

go func() {
select {
case <-ctx.Done():
Expand Down
24 changes: 10 additions & 14 deletions venus-shared/blockstore/splitstore/compose_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import (
"github.com/stretchr/testify/require"
)

func TestNewComposeStore(t *testing.T) {
s := NewComposeStore(nil, nil)
require.True(t, s.(*ComposeStore).shouldSync)

s = NewComposeStore(nil, nil, nil)
require.True(t, s.(*ComposeStore).shouldSync)
require.False(t, s.(*ComposeStore).secondary.(*ComposeStore).shouldSync)
require.Nil(t, s.(*ComposeStore).secondary.(*ComposeStore).secondary)
}

func TestComposeStoreGet(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t)
Expand Down Expand Up @@ -340,20 +350,6 @@ func TestComposeStoreDelete(t *testing.T) {
})
}

func TestNewComposeStore(t *testing.T) {
tempDir := t.TempDir()

primaryPath := filepath.Join(tempDir, "primary")
optPri, err := blockstore.BadgerBlockstoreOptions(primaryPath, false)
require.NoError(t, err)
dsPri, err := blockstore.Open(optPri)
require.NoError(t, err)

cs := NewComposeStore(dsPri)
_, err = cs.Has(context.Background(), cid.Undef)
require.NoError(t, err)
}

func getBlockstore(t *testing.T) (compose, primary, secondary, tertiary blockstore.Blockstore) {
tempDir := t.TempDir()

Expand Down
Loading

0 comments on commit 2e41814

Please sign in to comment.