From f1e0903442a6c7cb88580236e7897298ed85ae11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 14 Feb 2023 12:32:02 +0100 Subject: [PATCH] feat: ethrpc: Support newPendingTransactions --- itests/eth_filter_test.go | 107 ++++++++++++++++++++++++++++++++++++++ node/impl/full/eth.go | 23 +++++++- 2 files changed, 128 insertions(+), 2 deletions(-) diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 9540c5ebead..1104bec1346 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -136,6 +136,113 @@ func TestEthNewPendingTransactionFilter(t *testing.T) { } } +func TestEthNewPendingTransactionSub(t *testing.T) { + require := require.New(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + kit.QuietAllLogsExcept("events", "messagepool") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(err) + + // install filter + subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newPendingTransactions"})).Assert(require.NoError)) + require.NoError(err) + + var subResponses []ethtypes.EthSubscriptionResponse + err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + subResponses = append(subResponses, *resp) + return nil + }) + require.NoError(err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(err) + <-headChangeCh // skip hccurrent + + defer func() { + close(waitAllCh) + }() + + count := 0 + for { + select { + case <-ctx.Done(): + return + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(err) + count += len(msgs) + if count == iterations { + return + } + } + } + } + } + }() + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(err) + require.EqualValues(i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + select { + case <-waitAllCh: + case <-ctx.Done(): + t.Errorf("timeout waiting to pack messages") + } + + expected := make(map[string]bool) + for _, sm := range sms { + hash, err := ethtypes.EthHashFromCid(sm.Cid()) + require.NoError(err) + expected[hash.String()] = false + } + + // expect to have seen iteration number of mpool messages + require.Equal(len(subResponses), len(expected), "expected number of filter results to equal number of messages") + + for _, txid := range subResponses { + expected[txid.Result.(string)] = true + } + + for _, found := range expected { + require.True(found) + } +} + func TestEthNewBlockFilter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 69571ca1f3b..775550be230 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1271,8 +1271,9 @@ func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error { } const ( - EthSubscribeEventTypeHeads = "newHeads" - EthSubscribeEventTypeLogs = "logs" + EthSubscribeEventTypeHeads = "newHeads" + EthSubscribeEventTypeLogs = "logs" + EthSubscribeEventTypePendingTransactions = "newPendingTransactions" ) func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) { @@ -1334,6 +1335,15 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty _, _ = e.EthUnsubscribe(ctx, sub.id) return ethtypes.EthSubscriptionID{}, err } + sub.addFilter(ctx, f) + case EthSubscribeEventTypePendingTransactions: + f, err := e.MemPoolFilterManager.Install(ctx) + if err != nil { + // clean up any previous filters added and stop the sub + _, _ = e.EthUnsubscribe(ctx, sub.id) + return ethtypes.EthSubscriptionID{}, err + } + sub.addFilter(ctx, f) default: return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType) @@ -1654,6 +1664,15 @@ func (e *ethSubscription) start(ctx context.Context) { } e.send(ctx, ev) + case *types.SignedMessage: // mpool txid + evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}, e.StateAPI) + if err != nil { + continue + } + + for _, r := range evs.Results { + e.send(ctx, r) + } default: log.Warnf("unexpected subscription value type: %T", vt) }