Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ethrpc: Support newPendingTransactions in eth_subscribe #10269

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,113 @@ func TestEthNewPendingTransactionFilter(t *testing.T) {
}
}

func TestEthNewPendingTransactionSub(t *testing.T) {
require := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

kit.QuietAllLogsExcept("events", "messagepool")

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)

// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(err)

// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(err)

// install filter
subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newPendingTransactions"})).Assert(require.NoError))
require.NoError(err)

var subResponses []ethtypes.EthSubscriptionResponse
err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
subResponses = append(subResponses, *resp)
return nil
})
require.NoError(err)

const iterations = 100

// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))

waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(err)
<-headChangeCh // skip hccurrent

defer func() {
close(waitAllCh)
}()

count := 0
for {
select {
case <-ctx.Done():
return
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(err)
count += len(msgs)
if count == iterations {
return
}
}
}
}
}
}()

var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}

sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(err)
require.EqualValues(i, sm.Message.Nonce)

sms = append(sms, sm)
}

select {
case <-waitAllCh:
case <-ctx.Done():
t.Errorf("timeout waiting to pack messages")
}

expected := make(map[string]bool)
for _, sm := range sms {
hash, err := ethtypes.EthHashFromCid(sm.Cid())
require.NoError(err)
expected[hash.String()] = false
}

// expect to have seen iteration number of mpool messages
require.Equal(len(subResponses), len(expected), "expected number of filter results to equal number of messages")

for _, txid := range subResponses {
expected[txid.Result.(string)] = true
}

for _, found := range expected {
require.True(found)
}
}

func TestEthNewBlockFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down
23 changes: 21 additions & 2 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,8 +1271,9 @@ func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error {
}

const (
EthSubscribeEventTypeHeads = "newHeads"
EthSubscribeEventTypeLogs = "logs"
EthSubscribeEventTypeHeads = "newHeads"
EthSubscribeEventTypeLogs = "logs"
EthSubscribeEventTypePendingTransactions = "newPendingTransactions"
)

func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
Expand Down Expand Up @@ -1334,6 +1335,15 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty
_, _ = e.EthUnsubscribe(ctx, sub.id)
return ethtypes.EthSubscriptionID{}, err
}
sub.addFilter(ctx, f)
case EthSubscribeEventTypePendingTransactions:
f, err := e.MemPoolFilterManager.Install(ctx)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return ethtypes.EthSubscriptionID{}, err
}

sub.addFilter(ctx, f)
default:
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType)
Expand Down Expand Up @@ -1654,6 +1664,15 @@ func (e *ethSubscription) start(ctx context.Context) {
}

e.send(ctx, ev)
case *types.SignedMessage: // mpool txid
evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}, e.StateAPI)
if err != nil {
continue
}

for _, r := range evs.Results {
e.send(ctx, r)
}
default:
log.Warnf("unexpected subscription value type: %T", vt)
}
Expand Down