Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/correct eth sub #5693

Merged
merged 3 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (b *Builder) build(ctx context.Context) (*Node, error) {
if err != nil {
return nil, err
}
if nd.eth, err = eth.NewEthSubModule(b.repo.Config(), nd.chain, nd.mpool, sqlitePath); err != nil {
if nd.eth, err = eth.NewEthSubModule(ctx, b.repo.Config(), nd.chain, nd.mpool, sqlitePath); err != nil {
return nil, err
}

Expand Down
7 changes: 6 additions & 1 deletion app/node/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ func (builder *RPCBuilder) AddService(service RPCService) error {
}

func (builder *RPCBuilder) Build(version string, limiter *ratelimit.RateLimiter) *jsonrpc.RPCServer {
var server *jsonrpc.RPCServer
serverOptions := make([]jsonrpc.ServerOption, 0)
serverOptions = append(serverOptions, jsonrpc.WithProxyBind(jsonrpc.PBMethod))

server := jsonrpc.NewServer(serverOptions...)
switch version {
case "v0":
server = jsonrpc.NewServer(serverOptions...)

var fullNodeV0 v0api.FullNodeStruct
for _, apiStruct := range builder.v0APIStruct {
permission.PermissionProxy(apiStruct, &fullNodeV0)
Expand All @@ -139,6 +141,9 @@ func (builder *RPCBuilder) Build(version string, limiter *ratelimit.RateLimiter)
server.Register(nameSpace, &fullNodeV0)
}
case "v1":
serverOptions = append(serverOptions, jsonrpc.WithReverseClient[v1api.EthSubscriberMethods](v1api.MethodNamespace))
server = jsonrpc.NewServer(serverOptions...)

var fullNode v1api.FullNodeStruct
for _, apiStruct := range builder.v1APIStruct {
permission.PermissionProxy(apiStruct, &fullNode)
Expand Down
75 changes: 53 additions & 22 deletions app/submodule/eth/eth_event_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package eth

import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/venus/pkg/chain"
Expand All @@ -20,20 +22,22 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-varint"
"golang.org/x/xerrors"
)

const ChainHeadConfidence = 1

var _ v1.IETHEvent = (*ethEventAPI)(nil)

func newEthEventAPI(em *EthSubModule) (*ethEventAPI, error) {
func newEthEventAPI(ctx context.Context, em *EthSubModule) (*ethEventAPI, error) {
chainAPI := em.chainModule.API()
bsstore := em.chainModule.ChainReader.Blockstore()
cfg := em.cfg.FevmConfig
ee := &ethEventAPI{
em: em,
ChainAPI: chainAPI,
MaxFilterHeightRange: abi.ChainEpoch(cfg.Event.MaxFilterHeightRange),
SubscribtionCtx: ctx,
}

if !cfg.EnableEthRPC || cfg.Event.EnableRealTimeFilterAPI {
Expand Down Expand Up @@ -112,6 +116,7 @@ type ethEventAPI struct {
FilterStore filter.FilterStore
SubManager *EthSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch
SubscribtionCtx context.Context
}

func (e *ethEventAPI) Start(ctx context.Context) error {
Expand Down Expand Up @@ -416,52 +421,72 @@ const (
EthSubscribeEventTypeLogs = "logs"
)

func (e *ethEventAPI) EthSubscribe(ctx context.Context, eventType string, params *types.EthSubscriptionParams) (<-chan types.EthSubscriptionResponse, error) {
func (e *ethEventAPI) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (types.EthSubscriptionID, error) {
params, err := jsonrpc.DecodeParams[types.EthSubscribeParams](p)
if err != nil {
return types.EthSubscriptionID{}, fmt.Errorf("decoding params: %w", err)
}
if e.SubManager == nil {
return nil, api.ErrNotSupported
return types.EthSubscriptionID{}, api.ErrNotSupported
}
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice.

sub, err := e.SubManager.StartSubscription(ctx)
ethCb, ok := jsonrpc.ExtractReverseClient[v1.EthSubscriberMethods](ctx)
if !ok {
return types.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
}

sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
if err != nil {
return nil, err
return types.EthSubscriptionID{}, err
}

switch eventType {
switch params.EventType {
case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err
return types.EthSubscriptionID{}, err
}
sub.addFilter(ctx, f)

case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
if params != nil {
if params.Params != nil {
var err error
keys, err = parseEthTopics(params.Topics)
keys, err = parseEthTopics(params.Params.Topics)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err
return types.EthSubscriptionID{}, err
}
}

var addresses []address.Address
if params.Params != nil {
for _, ea := range params.Params.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
return types.EthSubscriptionID{}, xerrors.Errorf("invalid address %x", ea)
}
addresses = append(addresses, a)
}
}

f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, addresses, keys)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err
return types.EthSubscriptionID{}, err
}
sub.addFilter(ctx, f)
default:
return nil, fmt.Errorf("unsupported event type: %s", eventType)
return types.EthSubscriptionID{}, fmt.Errorf("unsupported event type: %s", params.EventType)
}

return sub.out, nil
return sub.id, nil
}

func (e *ethEventAPI) EthUnsubscribe(ctx context.Context, id types.EthSubscriptionID) (bool, error) {
Expand Down Expand Up @@ -610,7 +635,7 @@ type EthSubscriptionManager struct { // nolint
subs map[types.EthSubscriptionID]*ethSubscription
}

func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { // nolint
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint
rawid, err := uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("new uuid: %w", err)
Expand All @@ -625,7 +650,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
messageStore: e.messageStore,
id: id,
in: make(chan interface{}, 200),
out: make(chan types.EthSubscriptionResponse, 20),
out: out,
quit: quit,
}

Expand Down Expand Up @@ -655,12 +680,14 @@ func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id types.
return sub.filters, nil
}

type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error

type ethSubscription struct {
chainAPI v1.IChain
messageStore *chain.MessageStore
id types.EthSubscriptionID
in chan interface{}
out chan types.EthSubscriptionResponse
out ethSubscriptionCallback

mu sync.Mutex
filters []filter.Filter
Expand Down Expand Up @@ -704,10 +731,15 @@ func (e *ethSubscription) start(ctx context.Context) {
continue
}

select {
case e.out <- resp:
default:
// Skip if client is not reading responses
outParam, err := json.Marshal(resp)
if err != nil {
log.Warnw("marshaling subscription response", "sub", e.id, "error", err)
continue
}

if err := e.out(ctx, outParam); err != nil {
log.Warnw("sending subscription response", "sub", e.id, "error", err)
continue
}
}
}
Expand All @@ -719,7 +751,6 @@ func (e *ethSubscription) stop() {

if e.quit != nil {
e.quit()
close(e.out)
e.quit = nil
}
}
Expand Down
5 changes: 3 additions & 2 deletions app/submodule/eth/eth_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
)

func NewEthSubModule(cfg *config.Config,
func NewEthSubModule(ctx context.Context,
cfg *config.Config,
chainModule *chain.ChainSubmodule,
mpoolModule *mpool.MessagePoolSubmodule,
sqlitePath string,
Expand All @@ -21,7 +22,7 @@ func NewEthSubModule(cfg *config.Config,
mpoolModule: mpoolModule,
sqlitePath: sqlitePath,
}
ee, err := newEthEventAPI(em)
ee, err := newEthEventAPI(ctx, em)
if err != nil {
return nil, fmt.Errorf("create eth event api error %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,6 @@ require (

replace (
github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.4-0.20210731021807-68e5207079bc
github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230202064402-ca8d607e4c4b
github.com/filecoin-project/test-vectors => ./extern/test-vectors
)
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.2.2/go.mod h1:fa/d1lAdUHxuc1jedx3
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs-force-community/go-jsonrpc v0.1.4-0.20210731021807-68e5207079bc h1:L4JH2Ltl/Embq4qYezs3RsIYW1BB/fB9TfUkk42FOzU=
github.com/ipfs-force-community/go-jsonrpc v0.1.4-0.20210731021807-68e5207079bc/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230202064402-ca8d607e4c4b h1:aB4WyUMOacSm2OguMkptzA8d8+E7NI1X2PBCRGOYBsw=
github.com/ipfs-force-community/go-jsonrpc v0.1.7-0.20230202064402-ca8d607e4c4b/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/ipfs-force-community/metrics v1.0.1-0.20211022060227-11142a08b729 h1:elS3KmzAMVrcZpmP2RMEjs9Zlwh6LfhJTfYQdj4TREs=
github.com/ipfs-force-community/metrics v1.0.1-0.20211022060227-11142a08b729/go.mod h1:mn40SioMuKtjmRumHFy/fJ26Pn028XuDjUJE9dorjyw=
github.com/ipfs/bbloom v0.0.1/go.mod h1:oqo8CVWsJFMOZqTglBG4wydCE4IQA/G2/SEofB0rjUI=
Expand Down Expand Up @@ -812,7 +812,6 @@ github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW
github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
Expand Down
57 changes: 57 additions & 0 deletions venus-devtool/api-gen/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading