Skip to content

Commit

Permalink
add option to discard state writes in StateCompute and StateReplay.
Browse files Browse the repository at this point in the history
This required a small refactor in some execution APIs. Instead of
adding one more parameter in methods that are already parameter-heavy,
I chose to wrap arguments in an options struct.
  • Loading branch information
raulk committed Mar 12, 2023
1 parent 9412753 commit 530b1d1
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 19 deletions.
25 changes: 14 additions & 11 deletions chain/consensus/compute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
bms []FilecoinBlockMessages,
epoch abi.ChainEpoch,
r vm.Rand,
em stmgr.ExecMonitor,
vmTracing bool,
opts *stmgr.ExecutorOpts,
baseFee abi.TokenAmount,
ts *types.TipSet) (cid.Cid, cid.Cid, error) {
done := metrics.Timer(ctx, metrics.VMApplyBlocksTotal)
Expand All @@ -91,22 +90,27 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
partDone()
}()

bs := sm.ChainStore().StateBlockstore()
if opts.DiscardState {
bs = blockstore.NewBuffered(bs)
}

ctx = blockstore.WithHotView(ctx)
makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
vmopt := &vm.VMOpts{
StateBase: base,
Epoch: e,
Timestamp: timestamp,
Rand: r,
Bstore: sm.ChainStore().StateBlockstore(),
Bstore: bs,
Actors: NewActorRegistry(),
Syscalls: sm.Syscalls,
CircSupplyCalc: sm.GetVMCirculatingSupply,
NetworkVersion: sm.GetNetworkVersion(ctx, e),
BaseFee: baseFee,
LookbackState: stmgr.LookbackStateGetterForTipset(sm, ts),
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
Tracing: vmTracing,
Tracing: opts.VmTracing,
ReturnEvents: sm.ChainStore().IsStoringEvents(),
}

Expand All @@ -130,7 +134,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
return xerrors.Errorf("running cron: %w", err)
}

if em != nil {
if em := opts.ExecMonitor; em != nil {
if err := em.MessageApplied(ctx, ts, cronMsg.Cid(), cronMsg, ret, true); err != nil {
return xerrors.Errorf("callback failed on cron message: %w", err)
}
Expand Down Expand Up @@ -175,7 +179,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,

// handle state forks
// XXX: The state tree
pstate, err = sm.HandleStateForks(ctx, pstate, i, em, ts)
pstate, err = sm.HandleStateForks(ctx, pstate, i, opts.ExecMonitor, ts)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("error handling state forks: %w", err)
}
Expand Down Expand Up @@ -219,7 +223,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
events = append(events, r.Events)
}

if em != nil {
if em := opts.ExecMonitor; em != nil {
if err := em.MessageApplied(ctx, ts, cm.Cid(), m, r, false); err != nil {
return cid.Undef, cid.Undef, err
}
Expand All @@ -233,7 +237,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
GasReward: gasReward,
WinCount: b.WinCount,
}
rErr := t.reward(ctx, vmi, em, epoch, ts, params)
rErr := t.reward(ctx, vmi, opts.ExecMonitor, epoch, ts, params)
if rErr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("error applying reward: %w", rErr)
}
Expand Down Expand Up @@ -291,8 +295,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context,
sm *stmgr.StateManager,
ts *types.TipSet,
em stmgr.ExecMonitor,
vmTracing bool) (stateroot cid.Cid, rectsroot cid.Cid, err error) {
opts *stmgr.ExecutorOpts) (stateroot cid.Cid, rectsroot cid.Cid, err error) {
ctx, span := trace.StartSpan(ctx, "computeTipSetState")
defer span.End()

Expand Down Expand Up @@ -340,7 +343,7 @@ func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context,
}
baseFee := blks[0].ParentBaseFee

return t.ApplyBlocks(ctx, sm, parentEpoch, pstate, fbmsgs, blks[0].Height, r, em, vmTracing, baseFee, ts)
return t.ApplyBlocks(ctx, sm, parentEpoch, pstate, fbmsgs, blks[0].Height, r, opts, baseFee, ts)
}

func (t *TipSetExecutor) StoreEventsAMT(ctx context.Context, cs *store.ChainStore, events []types.Event) (cid.Cid, error) {
Expand Down
6 changes: 5 additions & 1 deletion chain/stmgr/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ func (sm *StateManager) Replay(ctx context.Context, ts *types.TipSet, mcid cid.C
// message to find
finder.mcid = mcid

_, _, err := sm.tsExec.ExecuteTipSet(ctx, sm, ts, &finder, true)
_, _, err := sm.tsExec.ExecuteTipSet(ctx, sm, ts, &ExecutorOpts{
ExecMonitor: &finder,
VmTracing: true,
DiscardState: true, // safe to skip the write since this is throwaway state.
})
if err != nil && !xerrors.Is(err, errHaltExecution) {
return nil, nil, xerrors.Errorf("unexpected error during execution: %w", err)
}
Expand Down
20 changes: 18 additions & 2 deletions chain/stmgr/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ import (
)

func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st cid.Cid, rec cid.Cid, err error) {
return sm.TipSetStateWithOpts(ctx, ts, &TipSetStateOpts{})
}

type TipSetStateOpts struct {
// DiscardState: see ExecutorOpts.DiscardState.
DiscardState bool
}

func (sm *StateManager) TipSetStateWithOpts(ctx context.Context, ts *types.TipSet, opts *TipSetStateOpts) (st cid.Cid, rec cid.Cid, err error) {
ctx, span := trace.StartSpan(ctx, "tipSetState")
defer span.End()
if span.IsRecordingEvents() {
Expand Down Expand Up @@ -59,7 +68,10 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st c
return st, rec, nil
}

st, rec, err = sm.tsExec.ExecuteTipSet(ctx, sm, ts, sm.tsExecMonitor, false)
st, rec, err = sm.tsExec.ExecuteTipSet(ctx, sm, ts, &ExecutorOpts{
ExecMonitor: sm.tsExecMonitor,
DiscardState: opts.DiscardState,
})
if err != nil {
return cid.Undef, cid.Undef, err
}
Expand Down Expand Up @@ -113,7 +125,11 @@ func tryLookupTipsetState(ctx context.Context, cs *store.ChainStore, ts *types.T
}

func (sm *StateManager) ExecutionTraceWithMonitor(ctx context.Context, ts *types.TipSet, em ExecMonitor) (cid.Cid, error) {
st, _, err := sm.tsExec.ExecuteTipSet(ctx, sm, ts, em, true)
st, _, err := sm.tsExec.ExecuteTipSet(ctx, sm, ts, &ExecutorOpts{
ExecMonitor: em,
VmTracing: true,
DiscardState: true, // safe to discard the output state since we're creating no new state anyway.
})
return st, err
}

Expand Down
12 changes: 11 additions & 1 deletion chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,19 @@ func (m *migrationResultCache) Store(ctx context.Context, root cid.Cid, resultCi
return nil
}

// ExecutorOpts defines options for the execution.
type ExecutorOpts struct {
ExecMonitor ExecMonitor
VmTracing bool
// DiscardState, when enabled, skips writing the output state tree into the
// blockstore. This is useful when requesting execution traces for tipsets
// already in the chain, or for speculative executions.
DiscardState bool
}

type Executor interface {
NewActorRegistry() *vm.ActorRegistry
ExecuteTipSet(ctx context.Context, sm *StateManager, ts *types.TipSet, em ExecMonitor, vmTracing bool) (stateroot cid.Cid, rectsroot cid.Cid, err error)
ExecuteTipSet(ctx context.Context, sm *StateManager, ts *types.TipSet, opts *ExecutorOpts) (stateroot cid.Cid, rectsroot cid.Cid, err error)
}

type StateManager struct {
Expand Down
4 changes: 3 additions & 1 deletion chain/stmgr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"

"github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -86,13 +87,14 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
// future. It's not guaranteed to be accurate... but that's fine.
}

bs := blockstore.NewBuffered(sm.cs.StateBlockstore())
r := rand.NewStateRand(sm.cs, ts.Cids(), sm.beacon, sm.GetNetworkVersion)
vmopt := &vm.VMOpts{
StateBase: base,
Epoch: height,
Timestamp: ts.MinTimestamp(),
Rand: r,
Bstore: sm.cs.StateBlockstore(),
Bstore: bs,
Actors: sm.tsExec.NewActorRegistry(),
Syscalls: sm.Syscalls,
CircSupplyCalc: sm.GetVMCirculatingSupply,
Expand Down
6 changes: 4 additions & 2 deletions conformance/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params
blocks,
params.ExecEpoch,
params.Rand,
recordOutputs,
true,
&stmgr.ExecutorOpts{
ExecMonitor: recordOutputs,
VmTracing: true,
},
params.BaseFee,
nil,
)
Expand Down
2 changes: 1 addition & 1 deletion node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1843,7 +1843,7 @@ func messagesAndReceipts(ctx context.Context, ts *types.TipSet, cs *store.ChainS
return nil, nil, xerrors.Errorf("error loading messages for tipset: %v: %w", ts, err)
}

_, rcptRoot, err := sa.StateManager.TipSetState(ctx, ts)
_, rcptRoot, err := sa.StateManager.TipSetStateWithOpts(ctx, ts, &stmgr.TipSetStateOpts{DiscardState: true})
if err != nil {
return nil, nil, xerrors.Errorf("failed to compute state: %w", err)
}
Expand Down

0 comments on commit 530b1d1

Please sign in to comment.