Skip to content

Commit

Permalink
Merge pull request #5695 from filecoin-project/feat/events-alignment
Browse files Browse the repository at this point in the history
feat: evm: align events implementation with FIP-0049 and FIP-0054
  • Loading branch information
diwufeiwen authored Feb 7, 2023
2 parents 964b0ad + 10eac96 commit 554a814
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 163 deletions.
2 changes: 1 addition & 1 deletion app/submodule/eth/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ func parseEthTopics(topics types.EthTopicSpec) (map[string][][]byte, error) {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
key := fmt.Sprintf("t%d", idx+1)
for _, v := range vals {
encodedVal, err := cborEncodeTopicValue(v[:])
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions app/submodule/eth/eth_event_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-varint"
"golang.org/x/xerrors"
)

const ChainHeadConfidence = 1
Expand Down Expand Up @@ -434,7 +433,7 @@ func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ty

ethCb, ok := jsonrpc.ExtractReverseClient[v1.EthSubscriberMethods](ctx)
if !ok {
return types.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
return types.EthSubscriptionID{}, fmt.Errorf("connection doesn't support callbacks")
}

sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
Expand Down Expand Up @@ -469,7 +468,7 @@ func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ty
for _, ea := range params.Params.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
return types.EthSubscriptionID{}, xerrors.Errorf("invalid address %x", ea)
return types.EthSubscriptionID{}, fmt.Errorf("invalid address %x", ea)
}
addresses = append(addresses, a)
}
Expand Down
1 change: 1 addition & 0 deletions app/submodule/syncer/syncer_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewSyncerSubmodule(ctx context.Context,
chn.SystemCall,
circulatingSupplyCalculator,
config.Repo().Config().NetworkParams.ActorDebugging,
config.Repo().Config().FevmConfig.EnableEthRPC,
)

stmgr := statemanger.NewStateManger(chn.ChainReader, nodeConsensus, rnd,
Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
4 changes: 4 additions & 0 deletions pkg/consensus/expected.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type Expected struct {
blockValidator *BlockValidator

actorDebugging bool
returnEvents bool
}

// NewExpected is the constructor for the Expected consenus.Protocol module.
Expand All @@ -143,6 +144,7 @@ func NewExpected(cs cbor.IpldStore,
syscalls vm.SyscallsImpl,
circulatingSupplyCalculator chain.ICirculatingSupplyCalcualtor,
actorDebugging bool,
returnEvents bool,
) *Expected {
processor := NewDefaultProcessor(syscalls, circulatingSupplyCalculator)
return &Expected{
Expand All @@ -157,6 +159,7 @@ func NewExpected(cs cbor.IpldStore,
gasPirceSchedule: gasPirceSchedule,
blockValidator: blockValidator,
actorDebugging: actorDebugging,
returnEvents: returnEvents,
}
}

Expand Down Expand Up @@ -215,6 +218,7 @@ func (c *Expected) RunStateTransition(ctx context.Context, ts *types.TipSet, cb
TipSetGetter: vmcontext.TipSetGetterForTipset(c.chainState.GetTipSetByHeight, ts),
Tracing: false,
ActorDebugging: c.actorDebugging,
ReturnEvents: c.returnEvents,
}

var parentEpoch abi.ChainEpoch
Expand Down
43 changes: 41 additions & 2 deletions pkg/consensus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/venus/venus-shared/actors/builtin/reward"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-actors/v7/actors/builtin"
Expand All @@ -19,7 +20,9 @@ import (
"github.com/filecoin-project/venus/venus-shared/actors/builtin/cron"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
)

var processLog = logging.Logger("process block")
Expand Down Expand Up @@ -71,8 +74,12 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
cb vm.ExecCallBack,
) (cid.Cid, []types.MessageReceipt, error) {
toProcessTipset := time.Now()
var receipts []types.MessageReceipt
var err error
var (
receipts []types.MessageReceipt
err error
storingEvents = vmOpts.ReturnEvents
events [][]types.Event
)

makeVMWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (vm.Interface, error) {
vmOpt := vm.VmOption{
Expand Down Expand Up @@ -168,6 +175,12 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
minerPenaltyTotal = big.Add(minerPenaltyTotal, ret.OutPuts.MinerPenalty)
minerGasRewardTotal = big.Add(minerGasRewardTotal, ret.OutPuts.MinerTip)
receipts = append(receipts, ret.Receipt)

if storingEvents {
// Appends nil when no events are returned to preserve positional alignment.
events = append(events, ret.Events)
}

if cb != nil {
if err := cb(mcid, m.VMMessage(), ret); err != nil {
return cid.Undef, nil, err
Expand Down Expand Up @@ -210,6 +223,23 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
}
}

// Slice will be empty if not storing events.
for i, evs := range events {
if len(evs) == 0 {
continue
}
switch root, err := storeEventsAMT(ctx, vmOpts.Bsstore, evs); {
case err != nil:
return cid.Undef, nil, fmt.Errorf("failed to store events amt: %w", err)
case i >= len(receipts):
return cid.Undef, nil, fmt.Errorf("assertion failed: receipt and events array lengths inconsistent")
case receipts[i].EventsRoot == nil:
return cid.Undef, nil, fmt.Errorf("assertion failed: VM returned events with no events root")
case root != *receipts[i].EventsRoot:
return cid.Undef, nil, fmt.Errorf("assertion failed: returned events AMT root does not match derived")
}
}

processLog.Debugf("process cron: %v", time.Since(toProcessCron).Milliseconds())

root, err := vm.Flush(ctx)
Expand Down Expand Up @@ -263,3 +293,12 @@ func makeBlockRewardMessage(blockMiner address.Address,
Params: buf.Bytes(),
}
}

func storeEventsAMT(ctx context.Context, bs cbor.IpldBlockstore, events []types.Event) (cid.Cid, error) {
cst := cbor.NewCborStore(bs)
objs := make([]cbg.CBORMarshaler, len(events))
for i := 0; i < len(events); i++ {
objs[i] = &events[i]
}
return amt4.FromArray(ctx, cst, objs, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
}
33 changes: 27 additions & 6 deletions pkg/fvm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func (x *FvmExtern) workerKeyAtLookback(ctx context.Context, minerID address.Add
type FVM struct {
fvm *ffi.FVM
nv network.Version

// returnEvents specifies whether to parse and return events when applying messages.
returnEvents bool
}

func defaultFVMOpts(ctx context.Context, opts *vm.VmOption) (*ffi.FVMOpts, error) {
Expand Down Expand Up @@ -350,8 +353,9 @@ func NewFVM(ctx context.Context, opts *vm.VmOption) (*FVM, error) {
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}, nil
}

Expand Down Expand Up @@ -452,8 +456,9 @@ func NewDebugFVM(ctx context.Context, opts *vm.VmOption) (*FVM, error) {
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}, nil
}

Expand Down Expand Up @@ -506,7 +511,7 @@ func (fvm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret,
et.Error = aerr.Error()
}

return &vm.Ret{
applyRet := &vm.Ret{
Receipt: receipt,
OutPuts: gas.GasOutputs{
BaseFeeBurn: ret.BaseFeeBurn,
Expand All @@ -522,7 +527,16 @@ func (fvm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret,
ExecutionTrace: et,
},
Duration: duration,
}, nil
}

if fvm.returnEvents && len(ret.EventsBytes) > 0 {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

return applyRet, nil
}

func (fvm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret, error) {
Expand Down Expand Up @@ -584,6 +598,13 @@ func (fvm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg types.ChainMsg) (
Duration: duration,
}

if fvm.returnEvents && len(ret.EventsBytes) > 0 {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

if ret.ExitCode != 0 {
return applyRet, fmt.Errorf("implicit message failed with exit code: %d and error: %w", ret.ExitCode, applyRet.ActorErr)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/vmcontext/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type VmOption struct { //nolint
Tracing bool

ActorDebugging bool
// ReturnEvents decodes and returns emitted events.
ReturnEvents bool
}

type ILookBack interface {
Expand Down Expand Up @@ -93,6 +95,7 @@ type Ret struct {
Receipt types.MessageReceipt
ActorErr error
Duration time.Duration
Events []types.Event
}

// Failure returns with a non-zero exit code.
Expand Down
Binary file modified venus-shared/actors/builtin-actors-code/v10.tar.zst
Binary file not shown.
Loading

0 comments on commit 554a814

Please sign in to comment.