Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: evm: align events implementation with FIP-0049 and FIP-0054 #5695

Merged
merged 3 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/submodule/eth/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ func parseEthTopics(topics types.EthTopicSpec) (map[string][][]byte, error) {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
key := fmt.Sprintf("t%d", idx+1)
for _, v := range vals {
encodedVal, err := cborEncodeTopicValue(v[:])
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions app/submodule/eth/eth_event_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-varint"
"golang.org/x/xerrors"
)

const ChainHeadConfidence = 1
Expand Down Expand Up @@ -434,7 +433,7 @@ func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ty

ethCb, ok := jsonrpc.ExtractReverseClient[v1.EthSubscriberMethods](ctx)
if !ok {
return types.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
return types.EthSubscriptionID{}, fmt.Errorf("connection doesn't support callbacks")
}

sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
Expand Down Expand Up @@ -469,7 +468,7 @@ func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ty
for _, ea := range params.Params.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
return types.EthSubscriptionID{}, xerrors.Errorf("invalid address %x", ea)
return types.EthSubscriptionID{}, fmt.Errorf("invalid address %x", ea)
}
addresses = append(addresses, a)
}
Expand Down
1 change: 1 addition & 0 deletions app/submodule/syncer/syncer_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewSyncerSubmodule(ctx context.Context,
chn.SystemCall,
circulatingSupplyCalculator,
config.Repo().Config().NetworkParams.ActorDebugging,
config.Repo().Config().FevmConfig.EnableEthRPC,
)

stmgr := statemanger.NewStateManger(chn.ChainReader, nodeConsensus, rnd,
Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
4 changes: 4 additions & 0 deletions pkg/consensus/expected.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type Expected struct {
blockValidator *BlockValidator

actorDebugging bool
returnEvents bool
}

// NewExpected is the constructor for the Expected consenus.Protocol module.
Expand All @@ -143,6 +144,7 @@ func NewExpected(cs cbor.IpldStore,
syscalls vm.SyscallsImpl,
circulatingSupplyCalculator chain.ICirculatingSupplyCalcualtor,
actorDebugging bool,
returnEvents bool,
) *Expected {
processor := NewDefaultProcessor(syscalls, circulatingSupplyCalculator)
return &Expected{
Expand All @@ -157,6 +159,7 @@ func NewExpected(cs cbor.IpldStore,
gasPirceSchedule: gasPirceSchedule,
blockValidator: blockValidator,
actorDebugging: actorDebugging,
returnEvents: returnEvents,
}
}

Expand Down Expand Up @@ -215,6 +218,7 @@ func (c *Expected) RunStateTransition(ctx context.Context, ts *types.TipSet, cb
TipSetGetter: vmcontext.TipSetGetterForTipset(c.chainState.GetTipSetByHeight, ts),
Tracing: false,
ActorDebugging: c.actorDebugging,
ReturnEvents: c.returnEvents,
}

var parentEpoch abi.ChainEpoch
Expand Down
43 changes: 41 additions & 2 deletions pkg/consensus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/venus/venus-shared/actors/builtin/reward"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-actors/v7/actors/builtin"
Expand All @@ -19,7 +20,9 @@ import (
"github.com/filecoin-project/venus/venus-shared/actors/builtin/cron"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
)

var processLog = logging.Logger("process block")
Expand Down Expand Up @@ -71,8 +74,12 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
cb vm.ExecCallBack,
) (cid.Cid, []types.MessageReceipt, error) {
toProcessTipset := time.Now()
var receipts []types.MessageReceipt
var err error
var (
receipts []types.MessageReceipt
err error
storingEvents = vmOpts.ReturnEvents
events [][]types.Event
)

makeVMWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (vm.Interface, error) {
vmOpt := vm.VmOption{
Expand Down Expand Up @@ -168,6 +175,12 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
minerPenaltyTotal = big.Add(minerPenaltyTotal, ret.OutPuts.MinerPenalty)
minerGasRewardTotal = big.Add(minerGasRewardTotal, ret.OutPuts.MinerTip)
receipts = append(receipts, ret.Receipt)

if storingEvents {
// Appends nil when no events are returned to preserve positional alignment.
events = append(events, ret.Events)
}

if cb != nil {
if err := cb(mcid, m.VMMessage(), ret); err != nil {
return cid.Undef, nil, err
Expand Down Expand Up @@ -210,6 +223,23 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
}
}

// Slice will be empty if not storing events.
for i, evs := range events {
if len(evs) == 0 {
continue
}
switch root, err := storeEventsAMT(ctx, vmOpts.Bsstore, evs); {
case err != nil:
return cid.Undef, nil, fmt.Errorf("failed to store events amt: %w", err)
case i >= len(receipts):
return cid.Undef, nil, fmt.Errorf("assertion failed: receipt and events array lengths inconsistent")
case receipts[i].EventsRoot == nil:
return cid.Undef, nil, fmt.Errorf("assertion failed: VM returned events with no events root")
case root != *receipts[i].EventsRoot:
return cid.Undef, nil, fmt.Errorf("assertion failed: returned events AMT root does not match derived")
}
}

processLog.Debugf("process cron: %v", time.Since(toProcessCron).Milliseconds())

root, err := vm.Flush(ctx)
Expand Down Expand Up @@ -263,3 +293,12 @@ func makeBlockRewardMessage(blockMiner address.Address,
Params: buf.Bytes(),
}
}

func storeEventsAMT(ctx context.Context, bs cbor.IpldBlockstore, events []types.Event) (cid.Cid, error) {
cst := cbor.NewCborStore(bs)
objs := make([]cbg.CBORMarshaler, len(events))
for i := 0; i < len(events); i++ {
objs[i] = &events[i]
}
return amt4.FromArray(ctx, cst, objs, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
}
33 changes: 27 additions & 6 deletions pkg/fvm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func (x *FvmExtern) workerKeyAtLookback(ctx context.Context, minerID address.Add
type FVM struct {
fvm *ffi.FVM
nv network.Version

// returnEvents specifies whether to parse and return events when applying messages.
returnEvents bool
}

func defaultFVMOpts(ctx context.Context, opts *vm.VmOption) (*ffi.FVMOpts, error) {
Expand Down Expand Up @@ -350,8 +353,9 @@ func NewFVM(ctx context.Context, opts *vm.VmOption) (*FVM, error) {
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}, nil
}

Expand Down Expand Up @@ -452,8 +456,9 @@ func NewDebugFVM(ctx context.Context, opts *vm.VmOption) (*FVM, error) {
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}, nil
}

Expand Down Expand Up @@ -506,7 +511,7 @@ func (fvm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret,
et.Error = aerr.Error()
}

return &vm.Ret{
applyRet := &vm.Ret{
Receipt: receipt,
OutPuts: gas.GasOutputs{
BaseFeeBurn: ret.BaseFeeBurn,
Expand All @@ -522,7 +527,16 @@ func (fvm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret,
ExecutionTrace: et,
},
Duration: duration,
}, nil
}

if fvm.returnEvents && len(ret.EventsBytes) > 0 {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

return applyRet, nil
}

func (fvm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg types.ChainMsg) (*vm.Ret, error) {
Expand Down Expand Up @@ -584,6 +598,13 @@ func (fvm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg types.ChainMsg) (
Duration: duration,
}

if fvm.returnEvents && len(ret.EventsBytes) > 0 {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

if ret.ExitCode != 0 {
return applyRet, fmt.Errorf("implicit message failed with exit code: %d and error: %w", ret.ExitCode, applyRet.ActorErr)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/vmcontext/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type VmOption struct { //nolint
Tracing bool

ActorDebugging bool
// ReturnEvents decodes and returns emitted events.
ReturnEvents bool
}

type ILookBack interface {
Expand Down Expand Up @@ -93,6 +95,7 @@ type Ret struct {
Receipt types.MessageReceipt
ActorErr error
Duration time.Duration
Events []types.Event
}

// Failure returns with a non-zero exit code.
Expand Down
Binary file modified venus-shared/actors/builtin-actors-code/v10.tar.zst
Binary file not shown.
Loading